为何自研路由组件? 随着业务体量的增加,原先的库表存储已经不能支撑海量的并发请求。因此,可能需要考虑分库分表。 无论是业务之初就考虑分库分表,还是项目中期进行分库分表迁移,考虑自研数据库路由组件的出发点都是:** 现有的技术方案无法实现(不适合、不方便)个性化的业务需求,并且自研组件小而精,易于迭代维护,后续也可加入新的功能(例如事务支持)** 分库、分表是两回事,可能只分库不分表,可能分表不分库,也可能分库分表
分库分表前 | 分库分表后 | |
---|---|---|
并发支撑情况 | MySQL 单机部署,扛不住高并发 | MySQL 从单机到多机,能承受的并发增加了多倍 |
磁盘使用情况 | MySQL 单机磁盘容量几乎撑满 | 拆分为多个库,数据库服务器磁盘使用率大大降低 |
SQL 执行性能 | 单表数据量太大,SQL 越跑越慢 | 单表数据量减少,SQL 执行效率明显提升 |
现有的分库分表组件主要有如下两种:
在应用和数据中间加了一个代理层。应用程序所有的数据请求都交给代理层处理,代理层负责分离读写请求,将它们路由到对应的数据库中。 提供类似功能的中间件有 MySQL Router(官方)、Atlas(基于 MySQL Proxy)、MaxScale、MyCat。
基于组件的则直接基于独立的 jar 包就可以进行开发,不用部署,运维成本低,不需要代理层的二次转发请求,性能很高**。**比较经典的就是 Sharding-JDBC
- 实现层面:组件需要知道数据需要从哪个具体的数据库的子表中获取,并且对用户透明
- 数据源切换:如何在组件中实现_动态数据源切换_
- 路由算法:如何实现比较均匀的_路由散列算法_
- SQL 改写:如何拦截并修改 SQL
- 引入数据分片带来的问题:主要考虑跨表连接查询、跨库事务问题
- 跨表连接查询:通常两种方案:(1)解决跨表查询;(2)规避跨表连接,采用第三方中间件汇总查询
- 跨库事务:这块也是痛点,通常有两种做法:(1)分布式事务;(2)规避分布式事务问题,采用最终一致性方案
Tips: 关于以上跨表连接查询、跨库事务具体解决方案,需要根据业务场景、对于数据一致性的要求、综合性能等综合考虑。 例如:
- 秒杀场景下,关于库存的处理,就不太适合使用繁重的分布式事务,采用最终一致性方案(MQ+JOB兜底)比较合适;反之,对于金融等场景,考虑分布式事务比较合适
- 对于 B 端系统的跨表查询场景,业务访问量也不会很大,考虑适配跨表连接方案代价就比较高了,相反采用 ES 汇总查询,相对来说容易接受一点
- AOP 切面拦截:拦截需要使用DB 路由的方法,这里采用自定义注解
- 数据库连接池配置:分库分表需要按需配置数据库连接源,在这些连接池的集合中进行动态数据源切换
AbstractRoutingDataSource
:是用于动态数据源切换的 Spring 服务类,提供了数据源切换的抽象方法determineCurrentLookupKey
- 路由哈希算法设计:在路由设计时,需要根据分库分表字段进行路由计算,让数据均匀地分布至各个库表之中。这里参考 HashMap 的 扰动函数设计
- MyBatis 拦截器:实现 sql 动态拦截和修改
├─src
│ ├─main
│ │ ├─java
│ │ │ └─io
│ │ │ └─github
│ │ │ └─mokeeqian
│ │ │ └─router
│ │ │ ├─annotation
│ │ │ ├─aop
│ │ │ ├─config
│ │ │ ├─context
│ │ │ ├─dynamic
│ │ │ ├─model
│ │ │ ├─strategy
│ │ │ │ └─impl
│ │ │ └─util
│ │ └─resources
│ │ └─META-INF
│ └─test
│ └─java
路由注解 为切面提供切点,同时获取被注解的方法入参属性中的路由字段
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface DBRouter {
/**
* 路由字段
* @return
*/
String key() default "";
}
- @Retention:告诉编译程序如何处理,也可理解为注解类的生命周期。
- @Target:该注解的作用点,主要有:TYPE(类、接口)、METHOD(方法)、PACKAGE、FIELD、PARAMETER等。这里我们选择作用在方法上。
路由策略注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface DBRouterStrategy {
/**
* 是否分表
* @return
*/
boolean splitTable() default false;
}
继承抽象类 AbstractRoutingDataSource
,实现其 determineCurrentLookupKey
方法,从 DBContextHolder中获取 DB key,用以实现动态切换数据源
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
// db01, db02, ...
return "db" + RouterContext.getDatabaseKey();
}
}
AbstractRoutingDataSource的getConnection() ⽅法根据查找 lookup key 键对不同⽬标数据源的调⽤, 通常是通过(但不⼀定)某些线程绑定的事物上下⽂来实现 AbstractRoutingDataSource的多数据源动态 切换的核⼼逻辑是:在程序运⾏时,把数据源数据源通过AbstractRoutingDataSource 动态织⼊到程序 中,灵活的进⾏数据源切换 基于AbstractRoutingDataSource的多数据源动态切换,可以实现读写分离,这么做缺点也很明显,⽆法 动态的增加数据源
对于较复杂的数据源配置,一般使用 org.springframework.context.EnvironmentAware
来实现:
EnvironmentAware#setEnvironment:读取 yml 配置文件中的自定义分库分表配置
public void setEnvironment(Environment environment){
String prefix="mini-db-router.jdbc.datasource.";
databaseCount=Integer.parseInt(Objects.requireNonNull(environment.getProperty(prefix+"dbCount")));
tableCount=Integer.parseInt(Objects.requireNonNull(environment.getProperty(prefix+"tbCount")));
routerKey=environment.getProperty(prefix+"routerKey");
// 分库列表 db01,db02
String dataSources=environment.getProperty(prefix+"list");
assert dataSources!=null;
for(String dbInfo:dataSources.split(",")){
Map<String, Object> dataSourceProps=PropertyUtil.handle(environment,prefix+dbInfo,Map.class);
dataSourceMap.put(dbInfo,dataSourceProps);
}
// 默认数据源
String defaultData=environment.getProperty(prefix+"default");
defaultDataSourceConfig=PropertyUtil.handle(environment,prefix+defaultData,Map.class);
}
router:
jdbc:
datasource:
# 从这里开始就是数据源的配置了
dbCount: 2
tbCount: 4
default: db00
routerKey: uId # 路由字段
list: db01,db02 # 分库
db00:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/lottery?useUnicode=true
username: root
password: xxx
db01:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/lottery_01?useUnicode=true
username: root
password: xxx
db02:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/lottery_02?useUnicode=true
username: root
password: xxx
创建数据源:DynamicDataSource#setTargetDataSources,DynamicDataSource#setDefaultTargetDataSource
@Bean
public DataSource dataSource(){
Map<Object, Object> targetDataSources=new HashMap<>();
for(String dbInfo:dataSourceMap.keySet()){
Map<String, Object> objMap=dataSourceMap.get(dbInfo);
targetDataSources.put(dbInfo,new DriverManagerDataSource(
objMap.get("url").toString(),objMap.get("username").toString(),objMap.get("password").toString())
);
}
// 设置数据源
DynamicDataSource dynamicDataSource=new DynamicDataSource();
dynamicDataSource.setTargetDataSources(targetDataSources);
dynamicDataSource.setDefaultTargetDataSource(
new DriverManagerDataSource(
defaultDataSourceConfig.get("url").toString(),
defaultDataSourceConfig.get("username").toString(),
defaultDataSourceConfig.get("password").toString()
)
);
return dynamicDataSource;
}
使用 ThreadLocal 保存分库、分表的路由结果,借鉴 SecurityContextHolder
public class RouterContext {
private static final ThreadLocal<String> DATABASE_KEY = new ThreadLocal<>();
private static final ThreadLocal<String> TABLE_KEY = new ThreadLocal<>();
public static String getDatabaseKey() {
return DATABASE_KEY.get();
}
public static void setDatabaseKey(String databaseKey) {
DATABASE_KEY.set(databaseKey);
}
public static String getTableKey() {
return TABLE_KEY.get();
}
public static void setTableKey(String tableKey) {
TABLE_KEY.set(tableKey);
}
public static void clearDatabaseKey() {
DATABASE_KEY.remove();
}
public static void clearTableKey() {
TABLE_KEY.remove();
}
}
这里采用接口 IDBRouterStrategy
,后续可以实现该接口,进行个性化的路由策略配置
基于HashMap 扰动函数思想实现路由分发
@Override
public void doRouter(String databaseKeyFieldValue){
// 总的库表数目
int size=routerConfig.getDatabaseCount()*routerConfig.getTableCount();
// 扰动函数;在 JDK 的 HashMap 中,对于一个元素的存放,需要进行哈希散列。而为了让散列更加均匀,所以添加了扰动函数。
int idx=(size-1)&(databaseKeyFieldValue.hashCode()^(databaseKeyFieldValue.hashCode()>>>16));
// 库表索引;相当于是把一个长条的桶,切割成段,对应分库分表中的库编号和表编号
int dbIdx=idx/routerConfig.getTableCount()+1;
int tbIdx=idx-routerConfig.getTableCount()*(dbIdx-1);
// 设置路由到 context
RouterContext.setDatabaseKey(String.format("%02d",dbIdx));
RouterContext.setTableKey(String.format("%03d",tbIdx));
}
基于以上,分库功能已经实现,但是,如何分表?即将逻辑 SQL 转化为 **物理 SQL,**例如: 逻辑SQL:SELECT * FROM tb_user WHERE id = 123;
物理SQL:SELECT * FROM tb_user_01 WHERE id = 123;
一种思路是:使用 MyBatis 的 Interceptor 进行 SQL 拦截,然后动态修改 SQL
mybatis:自定义实现拦截器插件Interceptor
-
@Intercepts注解:拦截器 可以被拦截的四种类型:
- Executor:拦截执行器的方法
- ParameterHandler:拦截参数的处理
- ResultHandler:拦截结果集的处理
- StatementHandler:拦截Sql语法构建的处理
-
@Signature注解:拦截点,指定拦截哪个对象里面的哪个方法 其参数如下:
- type:要被拦截的类型(上述四种之一)
- method:在类型基础上,指定被拦截的方法
- args:在方法基础上,指定方法入参参数(Java里可能存在重载,故要注意参数顺序和类型)
-
类型&方法一览
拦截类型 拦截方法 Executor update、query、flushStatements、commit、rollback、getTransaction、close、isClosed ParameterHandler getParameterObject、setParameters ResultHandler handleResultSets、handleOutputParameters StatementHandler prepare、parameterize、batch、update、query
StatementHandler 的具体方法:
- prepare: ⽤于创建⼀个具体的 Statement 对象的实现类或者是 Statement 对象
- parametersize: ⽤于初始化 Statement 对象以及对sql的占位符进⾏赋值
- update: ⽤于通知 Statement 对象将 insert、update、delete 操作推送到数据库
- query: ⽤于通知 Statement 对象将 select 操作推送数据库并返回对应的查询结果
我们主要使用 StatementHandler 的 prepare 方法,拦截 sql 语句
@Intercepts(
@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})
)
这里的 Interceptor#intercept 方法就是我们要实现的方法,其中,invocation 就是被拦截的对象(StatementHandler#prepare方法)
/**
* @author Clinton Begin
*/
public interface Interceptor {
Object intercept(Invocation invocation) throws Throwable;
default Object plugin(Object target) {
return Plugin.wrap(target, this);
}
default void setProperties(Properties properties) {
// NOP
}
}
如何获取 MyBatis 中的 SQL 语句?
基于 StatementHandler
,然后 获取其 BoundSql
// 获取 StatementHandler
StatementHandler statementHandler=(StatementHandler)invocation.getTarget();
MetaObject metaObject=MetaObject.forObject(statementHandler,SystemMetaObject.DEFAULT_OBJECT_FACTORY,
SystemMetaObject.DEFAULT_OBJECT_WRAPPER_FACTORY,new DefaultReflectorFactory());
MappedStatement mappedStatement=(MappedStatement)metaObject.getValue("delegate.mappedStatement");
// 获取 MyBatis 原始 SQL
BoundSql boundSql=statementHandler.getBoundSql();
String originalSql=boundSql.getSql();
如何识别 SQL 中的表名称? 使用正则表达式匹配:from,into,update 这三个关键字,其之后就是表名称
private Pattern pattern=Pattern.compile("(from|into|update)[\\s]{1,}(\\w{1,})",Pattern.CASE_INSENSITIVE);
识别之后如何替换?
使用反射,直接修改 BoundSql#sql
字段
// 通过反射修改 sql 语句
// getDeclaredField:可以获取所有已声明字段(无视访问限定符); getField:只能获取public 字段
Field field=boundSql.getClass().getDeclaredField("sql");
field.setAccessible(true);
field.set(boundSql,replacedSql);
field.setAccessible(false);
- 使用反射可以访问 Java 类的私有成员、私有方法。在框架开发中,用处十分广泛
目前为止,底层逻辑已经全部实现,现在只需要使用AOP对调用方法进行拦截处理即可 定义切点 这里直接拦截先前定义的自定义注解,也可以是使用表达式匹配
/**
* 切点,拦截 @DBRouter
*/
@Pointcut("@annotation(io.github.mokeeqian.router.annotation.DBRouter)")
public void pointCut(){}
定义切面拦截具体逻辑
@Around("pointCut() && @annotation(dbRouter)")
public Object aroundAnnotationDBRouter(ProceedingJoinPoint proceedingJoinPoint,DBRouter dbRouter)throws Throwable{
// 从 @DBRouter 注解中拿到路由 Key
String dbKey=dbRouter.key();
if(StringUtils.isBlank(dbKey)||StringUtils.isBlank(routerConfig.getRouterKey())){
throw new RuntimeException("annotation @DBRouter key is null");
}
// 如果 @DBRouter key 属性未指定,则默认使用 application.yml 中的 routerKey
if(StringUtils.isBlank(dbKey)){
dbKey=routerConfig.getRouterKey();
}
// 获取路由字段的属性值
String dbKeyFieldValue=parseRouterKeyFieldValue(dbKey,proceedingJoinPoint.getArgs());
// 路由下发
routerStrategy.doRouter(dbKeyFieldValue);
// 放行
try{
return proceedingJoinPoint.proceed();
}finally{
// 清空路由
routerStrategy.clear();
}
}
- 几种切面环绕逻辑:
- @Before:前置通知,在方法执行之前执行
- @After:后置通知,在方法执行之后执行(即使出现异常,后置通知也会执行)
- @Around:环绕通知,围绕着方法执行(可以实现其他四种通知)
- @AfterReturning:返回通知,在方法返回结果之后执行
- @AfterThrowing:异常通知,在方法抛出异常之后
- AspectJ 注解的执行顺序:
@Around 都会出现两次:@Before、@AfterReturning、@AfterReturning、@After 这四个都会在两次@Around 执行之间被执行
- 无异常时:@Aspect、@Pointcut、@Around、@Before、@AfterReturning、@After、@Around
- 有异常时:@Aspect、@Pointcut、@Around、@Before、@AfterThrowing、@After、@Around
最后的最后,将项目封装成 SpringBoot 起步依赖,编写配置类,然后利用自动装配机制。
package io.github.mokeeqian.router.config;
import io.github.mokeeqian.router.aop.RouterAspect;
import io.github.mokeeqian.router.dynamic.DynamicDataSource;
import io.github.mokeeqian.router.dynamic.DynamicMybatisPlugin;
import io.github.mokeeqian.router.model.RouterConfig;
import io.github.mokeeqian.router.strategy.IRouterStrategy;
import io.github.mokeeqian.router.strategy.impl.RouterStrategyHashCode;
import io.github.mokeeqian.router.util.PropertyUtil;
import org.apache.ibatis.plugin.Interceptor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.transaction.support.TransactionTemplate;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* @description: 数据源配置
* @author:mokeeqian
* @date: 2023/8/27
* @Copyright: mokeeqian@gmail.com
*/
@Configuration
public class DataSourceAutoConfig implements EnvironmentAware {
/**
* 数据源
* db -> Config
*/
private Map<String, Map<String, Object>> dataSourceMap = new HashMap<>();
/**
* 默认数据源配置
*/
private Map<String, Object> defaultDataSourceConfig;
/**
* database 数目
*/
private int databaseCount;
/**
* table 数目
*/
private int tableCount;
/**
* 路由 key
*/
private String routerKey;
@Bean
public RouterConfig routerConfig() {
return new RouterConfig(this.databaseCount, this.tableCount, this.routerKey);
}
@Bean
public IRouterStrategy routerStrategy(RouterConfig routerConfig) {
return new RouterStrategyHashCode(routerConfig);
}
@Bean
public Interceptor plugin() {
return new DynamicMybatisPlugin();
}
@Bean(name = "routerAspect")
@ConditionalOnMissingBean
public RouterAspect routerAspect(RouterConfig routerConfig, IRouterStrategy routerStrategy) {
return new RouterAspect(routerConfig, routerStrategy);
}
/**
* 这个数据源就会被 MyBatis SpringBoot Starter 中 SqlSessionFactory sqlSessionFactory(DataSource dataSource) 注入使用
*
* @return
*/
@Bean
public DataSource dataSource() {
Map<Object, Object> targetDataSources = new HashMap<>();
for (String dbInfo : dataSourceMap.keySet()) {
Map<String, Object> objMap = dataSourceMap.get(dbInfo);
targetDataSources.put(dbInfo, new DriverManagerDataSource(
objMap.get("url").toString(), objMap.get("username").toString(), objMap.get("password").toString())
);
}
// 设置数据源
DynamicDataSource dynamicDataSource = new DynamicDataSource();
dynamicDataSource.setTargetDataSources(targetDataSources);
dynamicDataSource.setDefaultTargetDataSource(
new DriverManagerDataSource(
defaultDataSourceConfig.get("url").toString(),
defaultDataSourceConfig.get("username").toString(),
defaultDataSourceConfig.get("password").toString()
)
);
return dynamicDataSource;
}
@Bean
public TransactionTemplate transactionTemplate(DataSource dataSource) {
DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();
dataSourceTransactionManager.setDataSource(dataSource);
TransactionTemplate transactionTemplate = new TransactionTemplate();
transactionTemplate.setTransactionManager(dataSourceTransactionManager);
transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
return transactionTemplate;
}
/**
* 读取 yml 数据源配置
*
* @param environment
*/
@Override
public void setEnvironment(Environment environment) {
String prefix = "mini-db-router.jdbc.datasource.";
databaseCount = Integer.parseInt(Objects.requireNonNull(environment.getProperty(prefix + "dbCount")));
tableCount = Integer.parseInt(Objects.requireNonNull(environment.getProperty(prefix + "tbCount")));
routerKey = environment.getProperty(prefix + "routerKey");
// 分库列表 db01,db02
String dataSources = environment.getProperty(prefix + "list");
assert dataSources != null;
for (String dbInfo : dataSources.split(",")) {
Map<String, Object> dataSourceProps = PropertyUtil.handle(environment, prefix + dbInfo, Map.class);
dataSourceMap.put(dbInfo, dataSourceProps);
}
// 默认数据源
String defaultData = environment.getProperty(prefix + "default");
defaultDataSourceConfig = PropertyUtil.handle(environment, prefix + defaultData, Map.class);
}
}
随后,需要编写 resources/META_INF/spring.factories
文件,配置数据源配置类
org.springframework.boot.autoconfigure.EnableAutoConfiguration=io.github.mokeeqian.router.config.DataSourceAutoConfig
最后,将项目打包到 maven 仓库,即可使用啦
基于原先的单库单表,现在如何做迁移?
最简单的就是直接停机迁移,停止一切写入。然后将旧库数据迁移至新库。
如果线上业务不能停机怎么办?
- 我们对老库的更新操作(增删改),同时也要写入新库(双写)。如果操作的数据不存在于新库的话,需要插入到新库中。 这样就能保证,咱们新库里的数据是最新的。
- 在迁移过程,双写只会让被更新操作过的老库中的数据同步到新库,我们_还需要自己写脚本将老库中的数据和新库的数据做比对_。如果新库中没有,那咱们就把数据插入到新库。如果新库有,旧库没有,就把新库对应的数据删除(冗余数据清理)。
- 重复上一步的操作,直到老库和新库的数据一致为止