说明
- 顶顶大名的分库分表中间件,废话不多说,官网地址:https://shardingsphere.apache.org/
- 本文中数据库用的是mysql5.7,并且实现了一主一从。
- 场景是订单表的分表,并且要支持只根据user_id进行查询的场景,所以要将用户的标识信息放到主键order_id中,这样才能既能只根据主键order_id进行查询,又能只根据user_id进行查询。
- 顺便支持一下主从分离,这个比较简单,加一下配置即可
- 完整代码地址在结尾!!
官方简介
- 定位为轻量级Java框架,在Java的JDBC层提供的额外服务。 它使用客户端直连数据库,以jar包形式提供服务,无需额外部署和依赖,可理解为增强版的JDBC驱动,完全兼容JDBC和各种ORM框架。
- 适用于任何基于JDBC的ORM框架,如:JPA, Hibernate, Mybatis, Spring JDBC Template或直接使用JDBC。
- 支持任何第三方的数据库连接池,如:DBCP, C3P0, BoneCP, Druid, HikariCP等。
- 支持任意实现JDBC规范的数据库。目前支持MySQL,Oracle,SQLServer,PostgreSQL以及任何遵循SQL92标准的数据库。
第一步,在pom.xml加入依赖,如下
<!-- MySQL驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- mybatisPlus 核心库 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.1</version>
</dependency>
<!-- sharding-jdbc -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
<version>4.1.1</version>
</dependency>
<!-- hutool工具 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.5</version>
</dependency>
注:
- 本文的ORM框架使用的是MyBatis-Plus。
- hutool工具用于生成自定义id。
第二步,在application.yml配置shardingsphere,mybatis-plus相关配置
spring:
application:
name: shardingjdbc-demo-server
shardingsphere:
datasource:
# 数据源
names: master,salve
master:
driver-class-name: com.mysql.cj.jdbc.Driver
password: root
type: com.zaxxer.hikari.HikariDataSource
jdbc-url: jdbc:mysql://xxx:3306/db1
username: root
salve:
driver-class-name: com.mysql.cj.jdbc.Driver
password: root
type: com.zaxxer.hikari.HikariDataSource
jdbc-url: jdbc:mysql://xxx:3306/db2
username: root
sharding:
# 主从分离
master-slave-rules:
master:
master-data-source-name: master
slave-data-source-names: salve
# 表分片
tables:
my_order:
# 主表分片规则表名
actual-data-nodes: master.my_order_$->{0..3}
# 主键策略
# key-generator:
# column: id
# type: MyShardingKeyGenerator
table-strategy:
# 行表达式分片
# inline:
# algorithm-expression: order_$->{id.longValue() % 4}
# sharding-column: id
# 标准分片
# standard:
# sharding-column: id
# 指定自定义分片算法类的全路径
# precise-algorithm-class-name: com.jinhx.shardingjdbc.config.MyPreciseShardingAlgorithm
# 复合分片
complex:
# 分片键
sharding-columns: order_id,user_id
# 指定自定义分片算法类的全路径
algorithm-class-name: com.jinhx.shardingjdbc.config.MyComplexKeysShardingAlgorithm
# defaultTableStrategy:
# 打开sql控制台输出日志
props:
sql:
show: true
# mybatis-plus相关配置
mybatis-plus:
# xml扫描,多个目录用逗号或者分号分隔(告诉 Mapper 所对应的 XML 文件位置)
mapper-locations: classpath:com/jinhx/shardingjdbc/mapper/xml/*.xml
# 别名包扫描路径,通过该属性可以给包中的类注册别名
type-aliases-package: com.jinhx.shardingjdbc.entity
configuration:
# 不开启二级缓存
cache-enabled: false
# 是否开启自动驼峰命名规则映射:从数据库列名到Java属性驼峰命名的类似映射
map-underscore-to-camel-case: true
# 如果查询结果中包含空值的列,则 MyBatis 在映射的时候,不会映射这个字段
call-setters-on-nulls: true
# 这个配置会将执行的sql打印出来,在开发或测试的时候可以用
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
server:
port: 8093
第三步,在主库创建数据库db1,创建订单表,如下
说明
- 本文不进行分库,只分4个表,分别为my_order_0,my_order_1,my_order_2,my_order_3
sql
CREATE DATABASE db1;
use db1;
create table my_order_0
(
order_id bigint not null comment '订单id主键'
primary key,
user_id bigint not null comment '用户id',
money bigint not null comment '金额'
)
comment '用户订单表';
其他表结构一致,此处省略
第四步,创建表操作相应类
- 使用mybatis-plus的代码生成器对数据库的表生成相应的类,不懂的请参考另外一篇文章-SpringBoot 2.2.5 整合MyBatis-Plus 3.3.1 教程,配置多数据源并支持事务,附带代码生成器使用教程
- 手动创建,包括Order,IOrderService,OrderServiceImpl等,如下
Order
package com.jinhx.shardingjdbc.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.jinhx.shardingjdbc.util.SnowFlakeUtil;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.util.Objects;
/**
* Order
*
* @author jinhx
* @since 2021-07-27
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("my_order")
public class Order implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 分表的数量,一定要2的n次方
*/
public static final int TABLE_COUNT = 4;
/**
* 订单id主键
*/
@TableId(type = IdType.INPUT)
private Long orderId;
/**
* 用户id
*/
private Long userId;
/**
* 金额
*/
private Long money;
public void buildOrderId(){
if (Objects.isNull(this.userId)){
throw new RuntimeException("userId为空,无法生成orderId");
}
this.orderId = SnowFlakeUtil.getSnowflakeId(SnowFlakeUtil.getDataCenterId(this.userId) & (TABLE_COUNT - 1));
}
public void buildUserId(Integer dataCenterId){
if (Objects.isNull(dataCenterId)){
throw new RuntimeException("dataCenterId为空,无法生成userId");
}
this.userId = SnowFlakeUtil.getSnowflakeId(dataCenterId & (TABLE_COUNT - 1));
}
}
IOrderService
package com.jinhx.shardingjdbc.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.jinhx.shardingjdbc.entity.Order;
import java.util.List;
/**
* IOrderService
*
* @author jinhx
* @since 2021-07-27
*/
public interface IOrderService extends IService<Order> {
/**
* 根据orderIds查询
*
* @param orderIds orderIds
* @return List<Order>
*/
List<Order> selectByOrderIds(List<Long> orderIds);
/**
* 根据userIds查询
*
* @param userIds userIds
* @return List<Order>
*/
List<Order> selectByUserIds(List<Long> userIds);
/**
* 批量插入
*
* @param orders orders
*/
void insertOrders(List<Order> orders);
}
OrderServiceImpl
package com.jinhx.shardingjdbc.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.jinhx.shardingjdbc.entity.Order;
import com.jinhx.shardingjdbc.mapper.OrderMapper;
import com.jinhx.shardingjdbc.service.IOrderService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
* OrderServiceImpl
*
* @author jinhx
* @since 2021-07-27
*/
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {
/**
* 根据orderIds查询
*
* @param orderIds orderIds
* @return List<Order>
*/
@Override
public List<Order> selectByOrderIds(List<Long> orderIds) {
return baseMapper.selectBatchIds(orderIds);
}
/**
* 根据userIds查询
*
* @param userIds userIds
* @return List<Order>
*/
@Override
public List<Order> selectByUserIds(List<Long> userIds) {
return baseMapper.selectList(new LambdaQueryWrapper<Order>()
.in(CollectionUtils.isNotEmpty(userIds), Order::getUserId, userIds));
}
/**
* 批量插入
*
* @param orders orders
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void insertOrders(List<Order> orders) {
if (CollectionUtils.isNotEmpty(orders)){
if (orders.stream().mapToInt(item -> baseMapper.insert(item)).sum() != orders.size()){
log.error("批量插入order表失败 orders={}" + orders);
throw new RuntimeException("批量插入order表失败");
}
}
}
}
第五步,配置MybatisPlus,如下
1. 在启动类MybatisplusApplication新增@MapperScan注解,里面写入生成的文件中的mapper存放的路径,用于扫描mapper文件。
package com.jinhx.shardingjdbc;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@MapperScan("com.jinhx.shardingjdbc.mapper")
@SpringBootApplication
public class ShardingjdbcApplication {
public static void main(String[] args) {
SpringApplication.run(ShardingjdbcApplication.class, args);
}
}
2. 创建MybatisPlus配置类,MybatisPlusConfig,主要是配置一些插件的使用,此步可省略
package com.jinhx.shardingjdbc.config;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import com.baomidou.mybatisplus.extension.plugins.pagination.optimize.JsqlParserCountOptimize;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
* Mybatis-Plus配置类
*
* @author jinhx
* @since 2021-07-27
*/
@EnableTransactionManagement
@Configuration
public class MybatisPlusConfig {
/**
* mybatis-plus SQL执行效率插件【生产环境可以关闭】,设置 dev test 环境开启
*/
// @Bean
// @Profile({"dev", "qa"})
// public PerformanceInterceptor performanceInterceptor() {
// PerformanceInterceptor performanceInterceptor = new PerformanceInterceptor();
// performanceInterceptor.setMaxTime(1000);
// performanceInterceptor.setFormat(true);
// return performanceInterceptor;
// }
/**
* 分页插件
*/
@Bean
public PaginationInterceptor paginationInterceptor() {
PaginationInterceptor paginationInterceptor = new PaginationInterceptor();
// 设置请求的页面大于最大页后操作, true调回到首页,false 继续请求 默认false
paginationInterceptor.setOverflow(false);
// 设置最大单页限制数量,默认 500 条,-1 不受限制
paginationInterceptor.setLimit(500);
// 开启 count 的 join 优化,只针对部分 left join
paginationInterceptor.setCountSqlParser(new JsqlParserCountOptimize(true));
return paginationInterceptor;
}
}
第六步,创建自定义复合分片算法类MyComplexKeysShardingAlgorithm,注意自己替换application.yml里面的全路径
package com.jinhx.shardingjdbc.config;
import com.jinhx.shardingjdbc.util.SnowFlakeUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingValue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* 配置Sharding-JDBC复合分片算法
* 根据id和age计算,来确定是路由到那个表中
* 目前处理 = 和 in 操作,其余的操作,比如 >、< 等范围操作均不支持。
*
* @author jinhx
* @since 2021-07-27
*/
@Slf4j
public class MyComplexKeysShardingAlgorithm implements ComplexKeysShardingAlgorithm<Long> {
/**
* orderId
*/
private static final String COLUMN_ORDER_ID = "order_id";
/**
* userId
*/
private static final String COLUMN_USER_ID = "user_id";
/**
* 重写复合分片算法
*/
@Override
public Collection<String> doSharding(Collection<String> availableTargetNames, ComplexKeysShardingValue<Long> shardingValue) {
if (!shardingValue.getColumnNameAndRangeValuesMap().isEmpty()) {
throw new RuntimeException("条件全部为空,无法路由到具体的表,暂时不支持范围查询");
}
// 获取orderId
Collection<Long> orderIds = shardingValue.getColumnNameAndShardingValuesMap().getOrDefault(COLUMN_ORDER_ID, new ArrayList<>(1));
// 获取userId
Collection<Long> userIds = shardingValue.getColumnNameAndShardingValuesMap().getOrDefault(COLUMN_USER_ID, new ArrayList<>(1));
if (CollectionUtils.isEmpty(orderIds) && CollectionUtils.isEmpty(userIds)) {
throw new RuntimeException("orderId,userId字段同时为空,无法路由到具体的表,暂时不支持范围查询");
}
// 获取最终要查询的表后缀序号的集合,入参顺序不能颠倒
List<Integer> tableNos = getTableNoList(orderIds, userIds);
return tableNos.stream()
// 对可用的表数量求余数,获取到真实的表的后缀
// .map(idSuffix -> String.valueOf(idSuffix % availableTargetNames.size()))
// 拼接获取到真实的表
.map(tableSuffix -> availableTargetNames.stream().filter(targetName -> targetName.endsWith(String.valueOf(tableSuffix))).findFirst().orElse(null))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
/**
* 获取最终要查询的表后缀序号的集合
*
* @param orderIds orderId字段集合
* @param userIds userId字段集合
* @return 最终要查询的表后缀序号的集合
*/
private List<Integer> getTableNoList(Collection<Long> orderIds, Collection<Long> userIds) {
List<Integer> result = new ArrayList<>();
if (CollectionUtils.isNotEmpty(orderIds)){
// 获取表位信息
result.addAll(orderIds.stream()
.filter(item -> Objects.nonNull(item) && item > 0)
.map(item -> (int) SnowFlakeUtil.getDataCenterId(item))
.collect(Collectors.toList()));
}
if (CollectionUtils.isNotEmpty(userIds)) {
// 获取表位信息
result.addAll(userIds.stream().filter(item -> Objects.nonNull(item) && item > 0)
.map(item -> (int) SnowFlakeUtil.getDataCenterId(item))
.collect(Collectors.toList()));
}
if (CollectionUtils.isNotEmpty(result)) {
log.info("SharingJDBC解析路由表后缀成功 redEnvelopeIds={} uids={} 路由表后缀列表={}", orderIds, userIds, result);
// 合并去重
return result.stream().distinct().collect(Collectors.toList());
}
log.error("SharingJDBC解析路由表后缀失败 redEnvelopeIds={} uids={}", orderIds, userIds);
throw new RuntimeException("orderId,userId解析路由表后缀为空,无法路由到具体的表,暂时不支持范围查询");
}
}
第七步,编写单元测试类,ShardingjdbcApplicationTests,并进行测试
测试步骤
- 先运行insertOrdersTest方法向数据库插入数据,跑完分别查看四个表,是否都有数据,且理论上来说应该是数据均匀
- 分别从4个表里面随机捞几条数据的order_id出来,然后运行selectByOrderIdsTest,看是否都能查出数据,此步骤是为了验证只根据order_id进行路由查询是否正常
- 分别从4个表里面随机捞几条数据的user_id出来,然后运行selectByUserIdsTest,看是否都能查出数据,此步骤是为了验证只根据user_id进行路由查询是否正常
ShardingjdbcApplicationTests
package com.jinhx.shardingjdbc;
import com.jinhx.shardingjdbc.entity.Order;
import com.jinhx.shardingjdbc.service.IOrderService;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.List;
@Slf4j
// 获取启动类,加载配置,确定装载 Spring 程序的装载方法,它回去寻找 主配置启动类(被 @SpringBootApplication 注解的)
@SpringBootTest
class ShardingjdbcApplicationTests {
@Autowired
private IOrderService iOrderService;
@Test
void selectByOrderIdsTest() {
List<Long> orderIds = Lists.newArrayList(1443844581547311109L, 1443844581547442181L, 1443844581547573255L, 1443844581547704327L);
log.info(iOrderService.selectByOrderIds(orderIds).toString());
}
@Test
void selectByUserIdsTest() {
List<Long> userIds = Lists.newArrayList(1443844581547311108L, 1443844581547311106L, 1443844581547442180L, 1443844581547704326L);
log.info(iOrderService.selectByUserIds(userIds).toString());
}
@Test
void insertOrdersTest() {
List<Order> orders = Lists.newArrayList();
for (int i = 1;i < 100;i++){
Order order = new Order();
order.buildUserId(i);
order.setMoney(i * 1000L);
order.buildOrderId();
orders.add(order);
}
log.info("orders={}", orders);
iOrderService.insertOrders(orders);
}
@BeforeEach
void testBefore(){
log.info("测试开始!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
}
@AfterEach
void testAfter(){
log.info("测试结束!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
}
}
注:此工程包含多个module,本文所用代码均在shardingjdbc-demo模块下