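Java Client: Database Sharding
This page describes how to use sharding with DAL, including:
- How to configure a sharding strategy
- How to use shards in operations
- SQL-based cross-shard operations
- Cross-shard operations on a single table
Unless otherwise noted, all field names, table names and other string values are case-sensitive.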
Sample configuration
<databaseSet name="HA_Test" provider="mySqlProvider" shardingStrategy="">
    <add name="dao_test_M" databaseType="Master" sharding="" connectionString="dao_test"/>
    <add name="dao_test_S1" databaseType="Slave" sharding="" connectionString="dao_test"/>
    <add name="dao_test_S2" databaseType="Slave" sharding="" connectionString="dao_test_1"/>
    <add name="dao_test_S3" databaseType="Slave" sharding="" connectionString="dao_test_2"/>
</databaseSet>
<databaseSet name="dao_test_mod" provider="sqlProvider" shardingStrategy="class=com.ctrip.platform.dal.dao.strategy.ShardColModShardStrategy;columns=id;mod=2">
    <add name="dao_test_sqlsvr_M" databaseType="Master" sharding="0" connectionString="HotelPubDB"/>
    <add name="dao_test_sqlsvr_S" databaseType="Slave" sharding="0" connectionString="HotelPubDB"/>
    <add name="dao_test_mysql_M" databaseType="Master" sharding="1" connectionString="dao_test"/>
    <add name="dao_test_mysql_S" databaseType="Slave" sharding="1" connectionString="dao_test"/>
</databaseSet>
<databaseSet name="dao_test_simple" provider="sqlProvider" shardingStrategy="class=com.ctrip.platform.dal.dao.strategy.SimpleShardHintStrategy;shardByDb=true">
    <add name="dao_test_sqlsvr_M" databaseType="Master" sharding="0" connectionString="HotelPubDB"/>
    <add name="dao_test_mysql_M" databaseType="Master" sharding="1" connectionString="dao_test"/>
</databaseSet>
<databaseSet name="dao_test_sqlsvr_tableShard" provider="sqlProvider" shardingStrategy="class=com.ctrip.platform.dal.dao.strategy.ShardColModShardStrategy;tableColumns=index,tableIndex;tableMod=4;separator=_;shardedTables=dal_client_test">
    <add name="dao_test_sqlsvr_M" databaseType="Master" sharding="" connectionString="HotelPubDB"/>
</databaseSet>
<databaseSet name="dao_test_sqlsvr_dbShard" provider="sqlProvider" shardingStrategy="class=com.ctrip.platform.dal.dao.strategy.ShardColModShardStrategy;columns=index,tableIndex;mod=2;">
    <add name="dao_test_sqlsvr_dbShard_0" databaseType="Master" sharding="0" connectionString="SimpleShard_0"/>
    <add name="dao_test_sqlsvr_dbShard_1" databaseType="Master" sharding="1" connectionString="SimpleShard_1"/>
</databaseSet>
<databaseSet name="dao_test_sqlsvr_dbTableShard" provider="sqlProvider" shardingStrategy="class=com.ctrip.platform.dal.dao.strategy.ShardColModShardStrategy;columns=index,dbIndex;mod=2;tableColumns=table,tableIndex;tableMod=4;separator=_;shardedTables=dal_client_test">
    <add name="dao_test_sqlsvr_dbShard_0" databaseType="Master" sharding="0" connectionString="SimpleShard_0"/>
    <add name="dao_test_sqlsvr_dbShard_1" databaseType="Master" sharding="1" connectionString="SimpleShard_1"/>
</databaseSet>
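SimpleShardHintStrategy: the user tells the system which shard to use by specifying the shard ID directly. Shards can be selected by DB, by table, or both.
Parameters
- shardByDb: whether to shard by DB, e.g. shardByDb=true
- shardByTable: whether to shard by table, e.g. shardByTable=true
- shardedTables: the names of the tables that are sharded by table, e.g. shardedTables=dal_client_test
- separator: the delimiter between the table name and the shard ID, e.g. separator=_
To shard by DB, specify shardByDb=true.
To shard by table, specify shardByTable=true and list the tables that may be sharded, e.g. shardedTables=dal_client_test.
shardByDb=true and shardByTable=true can be specified together.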
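The separator and shardedTables settings together determine which physical table a table shard refers to. The sketch below is a minimal illustration, assuming the physical name is the logical table name, the separator and the table shard ID concatenated (dal_client_test + _ + 2 gives dal_client_test_2), and assuming that tables not listed in shardedTables keep their logical name. TableShardNaming is a hypothetical helper, not a DAL class.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not a DAL class: builds a physical table name from the
// logical name, the configured separator and a table shard ID.
final class TableShardNaming {
    private final Set<String> shardedTables = new HashSet<>();
    private final String separator;

    TableShardNaming(String shardedTablesParam, String separator) {
        // shardedTables is a comma-separated list, e.g. "dal_client_test"
        for (String table : shardedTablesParam.split(",")) {
            shardedTables.add(table.trim());
        }
        this.separator = separator;
    }

    // e.g. ("dal_client_test", "2") -> "dal_client_test_2" with separator "_".
    // Assumption: tables not listed in shardedTables keep their logical name.
    // Matching is case-sensitive, like all names in this configuration.
    String physicalName(String logicalTable, String tableShardId) {
        if (!shardedTables.contains(logicalTable)) {
            return logicalTable;
        }
        return logicalTable + separator + tableShardId;
    }
}
```

Because lookups are case-sensitive, "DAL_CLIENT_TEST" would not be treated as a sharded table in this sketch.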
Sample code
@Test
public void testSimple() {
try {
DalClient client = DalClientFactory.getClient(DATABASE_NAME_SIMPLE);
String sql = "select id from " + TABLE_NAME;
StatementParameters parameters = new StatementParameters();
// Query shard 0, master only
DalHints hints = new DalHints().inShard("0").masterOnly();
// The scalar's Java type depends on the JDBC driver, so read it as a Number
Number o = (Number)client.query(sql, parameters, hints, new DalScalarExtractor());
assertNotNull(o);
assertEquals(4, o.longValue());
// Query shard 1, master only
hints = new DalHints().inShard("1").masterOnly();
Number l = (Number)client.query(sql, parameters, hints, new DalScalarExtractor());
assertNotNull(l);
assertEquals(1, l.longValue());
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
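ShardColModShardStrategy determines the shard from the shardValue, shardColValues and parameters in DalHints (and, for entities, from their fields), taking the value modulo mod for DB shards or tableMod for table shards. Parameters:
- columns: the column names used for DB sharding. They may be virtual columns, i.e. names that do not exist in the table. e.g. columns=index,tableIndex
- mod: the modulus for DB sharding, e.g. mod=2
- shardedTables: the tables to which table sharding applies, e.g. shardedTables=dal_client_test
- tableColumns: the column names used for table sharding. They may also be virtual columns. e.g. tableColumns=index,tableIndex
- tableMod: the modulus for table sharding, e.g. tableMod=4
- separator: the delimiter between the table name and the shard ID, e.g. separator=_
Note: parameters are separated by ';', and multiple values of one parameter are separated by ','.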
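To make the ';' and ',' conventions concrete, here is a minimal parser sketch for a shardingStrategy attribute value. StrategyConfig is hypothetical and is not how DAL itself reads the configuration; it only shows how a string such as the one used by dao_test_sqlsvr_dbShard breaks down into parameters and values.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical parser, not part of DAL: parameters are separated by ';',
// key and value by the first '=', and multiple values by ','.
final class StrategyConfig {
    static Map<String, List<String>> parse(String strategy) {
        Map<String, List<String>> params = new LinkedHashMap<>();
        for (String pair : strategy.split(";")) {
            if (pair.trim().isEmpty()) {
                continue; // tolerate an empty segment, e.g. from ";;"
            }
            int eq = pair.indexOf('=');
            if (eq < 0) {
                throw new IllegalArgumentException("Missing '=' in: " + pair);
            }
            List<String> values = new ArrayList<>();
            for (String value : pair.substring(eq + 1).split(",")) {
                values.add(value.trim());
            }
            params.put(pair.substring(0, eq).trim(), values);
        }
        return params;
    }
}
```

For "class=...ShardColModShardStrategy;columns=index,tableIndex;mod=2;" this yields three entries, with columns mapped to [index, tableIndex] and mod mapped to [2].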
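Because a shard ID can be derived from several factors, DAL applies them in a fixed priority order; you need to know it to set the right parameters. From highest to lowest priority:
shardId > shardValue > shardColValue > parameters > fields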
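The priority order can be pictured as a chain of fallbacks. The sketch below is a simplification under stated assumptions: ShardResolver is hypothetical, parameters and entity fields are modeled as plain name-to-value maps (DAL's StatementParameters is richer), and every value-based factor is reduced with value % mod, as the name ShardColModShardStrategy suggests. It covers DB shards only; table shards would follow the same pattern with tableColumns and tableMod.

```java
import java.util.Map;

// Hypothetical sketch, not DAL code: resolves a DB shard ID using the priority
// shardId > shardValue > shardColValue > parameters > fields.
final class ShardResolver {
    private final String[] columns; // shard columns, e.g. columns=index,tableIndex
    private final int mod;          // DB shard modulus, e.g. mod=2

    ShardResolver(String[] columns, int mod) {
        this.columns = columns.clone();
        this.mod = mod;
    }

    String resolve(String shardId, Object shardValue, Map<String, Object> shardColValues,
                   Map<String, Object> parameters, Map<String, Object> fields) {
        if (shardId != null) {
            return shardId; // an explicit shard ID is used as-is, no modulo
        }
        if (shardValue != null) {
            return modOf(shardValue);
        }
        String shard = fromColumns(shardColValues);
        if (shard == null) {
            shard = fromColumns(parameters); // assumption: parameters looked up by name
        }
        if (shard == null) {
            shard = fromColumns(fields); // e.g. entity fields such as tableIndex
        }
        return shard; // null: no factor available, shard cannot be determined
    }

    // Returns the shard for the first configured column present in the map, if any.
    private String fromColumns(Map<String, Object> values) {
        if (values == null) {
            return null;
        }
        for (String column : columns) {
            Object value = values.get(column);
            if (value != null) {
                return modOf(value);
            }
        }
        return null;
    }

    // Accepts Integer, Long or a numeric String, like setShardValue(i) and
    // setShardValue(String.valueOf(i)) in the tests.
    private String modOf(Object value) {
        long n = Long.parseLong(value.toString().trim());
        return String.valueOf(Math.floorMod(n, (long) mod));
    }
}
```

With columns=index,tableIndex and mod=2, a shardColValue of tableIndex=3 resolves to shard "1", but an explicit shardValue of 4 would win and give shard "0".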
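Sharding by DB
Configuration: the dao_test_sqlsvr_dbShard databaseSet in the sample configuration (columns=index,tableIndex;mod=2)
Code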
@Test
public void testQueryByPk() throws SQLException {
ClientTestModel model = null;
for(int i = 0; i < mod; i++) {
// By shard
if(i%2 == 0)
model = dao.queryByPk(1, new DalHints().inShard(String.valueOf(i)));
else
model = dao.queryByPk(1, new DalHints().inShard(i));
Assert.assertEquals(1, model.getId().intValue());
Assert.assertEquals(i, model.getTableIndex().intValue());
// By shardValue
if(i%2 == 0)
model = dao.queryByPk(1, new DalHints().setShardValue(String.valueOf(i)));
else
model = dao.queryByPk(1, new DalHints().setShardValue(i));
Assert.assertEquals(1, model.getId().intValue());
Assert.assertEquals(i, model.getTableIndex().intValue());
// By shardColValue
if(i%2 == 0)
model = dao.queryByPk(1, new DalHints().setShardColValue("index", String.valueOf(i)));
else
model = dao.queryByPk(1, new DalHints().setShardColValue("index", i));
Assert.assertEquals(1, model.getId().intValue());
Assert.assertEquals(i, model.getTableIndex().intValue());
// By shardColValue
if(i%2 == 0)
model = dao.queryByPk(1, new DalHints().setShardColValue("tableIndex", String.valueOf(i)));
else
model = dao.queryByPk(1, new DalHints().setShardColValue("tableIndex", i));
Assert.assertEquals(1, model.getId().intValue());
Assert.assertEquals(i, model.getTableIndex().intValue());
}
}
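Sharding by table
Configuration: the dao_test_sqlsvr_tableShard databaseSet in the sample configuration (tableColumns=index,tableIndex;tableMod=4)
Code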
@Test
public void testQueryByPkWithEntity() throws SQLException{
ClientTestModel pk = null;
ClientTestModel model = null;
for(int i = 0; i < mod; i++) {
pk = new ClientTestModel();
pk.setId(1);
// By tableShard
model = dao.queryByPk(pk, new DalHints().inTableShard(i));
Assert.assertEquals(1, model.getId().intValue());
Assert.assertEquals(i, model.getTableIndex().intValue());
// By tableShardValue
model = dao.queryByPk(pk, new DalHints().setTableShardValue(i));
Assert.assertEquals(1, model.getId().intValue());
Assert.assertEquals(i, model.getTableIndex().intValue());
// By shardColValue
model = dao.queryByPk(pk, new DalHints().setShardColValue("index", i));
Assert.assertEquals(1, model.getId().intValue());
Assert.assertEquals(i, model.getTableIndex().intValue());
// By shardColValue
model = dao.queryByPk(pk, new DalHints().setShardColValue("tableIndex", i));
Assert.assertEquals(1, model.getId().intValue());
Assert.assertEquals(i, model.getTableIndex().intValue());
// By fields
pk.setTableIndex(i);
model = dao.queryByPk(pk, new DalHints());
Assert.assertEquals(1, model.getId().intValue());
Assert.assertEquals(i, model.getTableIndex().intValue());
}
}
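Sharding by both DB and table
Configuration: the dao_test_sqlsvr_dbTableShard databaseSet in the sample configuration (columns=index,dbIndex;mod=2;tableColumns=table,tableIndex;tableMod=4)
Code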
@Test
public void testQueryByPk() throws SQLException {
ClientTestModel model = null;
for(int i = 0; i < mod; i++) {
// By shard
if(i%2 == 0)
testQueryByPk(i, new DalHints().inShard(String.valueOf(i)));
else
testQueryByPk(i, new DalHints().inShard(i));
// By shardValue
if(i%2 == 0)
testQueryByPk(i, new DalHints().setShardValue(String.valueOf(i)));
else
testQueryByPk(i, new DalHints().setShardValue(i));
// By shardColValue
if(i%2 == 0)
testQueryByPk(i, new DalHints().setShardColValue("index", String.valueOf(i)));
else
testQueryByPk(i, new DalHints().setShardColValue("index", i));
// By shardColValue
if(i%2 == 0)
testQueryByPk(i, new DalHints().setShardColValue("dbIndex", String.valueOf(i)));
else
testQueryByPk(i, new DalHints().setShardColValue("dbIndex", i));
}
}
private void testQueryByPk(int shardId, DalHints hints) throws SQLException {
ClientTestModel model = null;
for(int i = 0; i < tableMod; i++) {
int id = 1;
// By tableShard
if(i%2 == 0)
model = dao.queryByPk(1, hints.clone().inTableShard(String.valueOf(i)));
else
model = dao.queryByPk(1, hints.clone().inTableShard(i));
assertQueryByPk(shardId, model, i, id);
// By tableShardValue
if(i%2 == 0)
model = dao.queryByPk(1, hints.clone().setTableShardValue(String.valueOf(i)));
else
model = dao.queryByPk(1, hints.clone().setTableShardValue(i));
assertQueryByPk(shardId, model, i, id);
// By shardColValue
if(i%2 == 0)
model = dao.queryByPk(1, hints.clone().setShardColValue("table", String.valueOf(i)));
else
model = dao.queryByPk(1, hints.clone().setShardColValue("table", i));
assertQueryByPk(shardId, model, i, id);
// By shardColValue
if(i%2 == 0)
model = dao.queryByPk(1, hints.clone().setShardColValue("tableIndex", String.valueOf(i)));
else
model = dao.queryByPk(1, hints.clone().setShardColValue("tableIndex", i));
assertQueryByPk(shardId, model, i, id);
}
}
private void assertQueryByPk(int shardId, ClientTestModel model, int i,
int id) {
assertQueryFirstWithWhereClause(shardId, model, i);
Assert.assertEquals(id * (shardId + 1) * (i+1), model.getQuantity().intValue());
}
Example of sharding by entity fields
/**
* Test Insert multiple entities with key-holder
* @throws SQLException
*/
@Test
public void testInsertMultipleAsListWithKeyHolderFail() throws SQLException{
List<ClientTestModel> entities = createListNoId(3);
KeyHolder holder = createKeyHolder();
int res;
try {
res = dao.insert(new DalHints(), holder, entities);
Assert.fail();
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testInsertMultipleAsListWithKeyHolderByShard() throws SQLException{
for(int i = 0; i < mod; i++) {
// By shard
testInsertMultipleAsListWithKeyHolder(i, new DalHints().inShard(i));
}
}
@Test
public void testInsertMultipleAsListWithKeyHolderByShardValue() throws SQLException{
for(int i = 0; i < mod; i++) {
// By shardValue
testInsertMultipleAsListWithKeyHolder(i, new DalHints().setShardValue(i));
}
}
@Test
public void testInsertMultipleAsListWithKeyHolderByShardCol() throws SQLException{
for(int i = 0; i < mod; i++) {
// By shardColValue
testInsertMultipleAsListWithKeyHolder(i, new DalHints().setShardColValue("index", i));
}
}
@Test
public void testInsertMultipleAsListWithKeyHolderByShardCol2() throws SQLException{
for(int i = 0; i < mod; i++) {
// By shardColValue
testInsertMultipleAsListWithKeyHolder(i, new DalHints().setShardColValue("dbIndex", i));
}
}
@Test
public void testInsertMultipleAsListWithKeyHolderByFields() throws SQLException{
List<ClientTestModel> entities = createListNoId(3);
int res;
KeyHolder holder = createKeyHolder();
deleteAllShardsByDbTable(dao, mod, tableMod);
// By fields not same shard
holder = createKeyHolder();
entities.get(0).setTableIndex(0);
entities.get(0).setDbIndex(0);
entities.get(1).setTableIndex(1);
entities.get(1).setDbIndex(1);
entities.get(2).setTableIndex(2);
entities.get(2).setDbIndex(2);
res = dao.insert(new DalHints(), holder, entities);
assertResEquals(3, res);
Assert.assertEquals(1, getCount(0, 0));
Assert.assertEquals(1, getCount(1, 1));
Assert.assertEquals(1, getCount(0, 2));
assertKeyHolder(holder);
}
/**
* Test Insert multiple entities with key-holder
* @throws SQLException
*/
public void testInsertMultipleAsListWithKeyHolder(int shardId, DalHints hints) throws SQLException{
List<ClientTestModel> entities = createListNoId(3);
KeyHolder holder = new KeyHolder();
int res;
try {
res = dao.insert(hints.clone(), holder, entities);
Assert.fail();
} catch (Exception e) {
e.printStackTrace();
}
for(int i = 0; i < tableMod; i++) {
int j = 1;
// By tableShard
holder = createKeyHolder();
res = dao.insert(hints.clone().inTableShard(i), holder, entities);
assertResEquals(3, res);
Assert.assertEquals((i + 1) + j++ * 3, getCount(shardId, i));
assertKeyHolder(holder);
// By tableShardValue
holder = createKeyHolder();
res = dao.insert(hints.clone().setTableShardValue(i), holder, entities);
assertResEquals(3, res);
Assert.assertEquals((i + 1) + j++ * 3, getCount(shardId, i));
assertKeyHolder(holder);
// By shardColValue
holder = createKeyHolder();
res = dao.insert(hints.clone().setShardColValue("table", i), holder, entities);
assertResEquals(3, res);
Assert.assertEquals((i + 1) + j++ * 3, getCount(shardId, i));
assertKeyHolder(holder);
// By shardColValue
holder = createKeyHolder();
res = dao.insert(hints.clone().setShardColValue("tableIndex", i), holder, entities);
assertResEquals(3, res);
Assert.assertEquals((i + 1) + j++ * 3, getCount(shardId, i));
assertKeyHolder(holder);
// By fields same shard
holder = createKeyHolder();
entities.get(0).setTableIndex(i);
entities.get(1).setTableIndex(i);
entities.get(2).setTableIndex(i);
res = dao.insert(hints.clone(), holder, entities);
assertResEquals(3, res);
Assert.assertEquals((i + 1) + j++ * 3, getCount(shardId, i));
assertKeyHolder(holder);
}
deleteAllShards(shardId);
// By fields not same shard
holder = createKeyHolder();
entities.get(0).setTableIndex(0);
entities.get(1).setTableIndex(1);
entities.get(2).setTableIndex(2);
res = dao.insert(hints.clone(), holder, entities);
Assert.assertEquals(1, getCount(shardId, 0));
Assert.assertEquals(1, getCount(shardId, 1));
Assert.assertEquals(1, getCount(shardId, 2));
assertResEquals(3, res);
assertKeyHolder(holder);
}
The following DalTableDao operations support batch operations across shards:
- combinedInsert
- batchInsert
- batchDelete
For example:
@Test
public void testCrossShardBatchInsert() {
try {
deleteAllShardsByDbTable(dao, mod, tableMod);
// Table j of every DB shard receives j + 1 rows: mod * (1 + 2 + ... + tableMod) rows in total
ClientTestModel[] pList = new ClientTestModel[mod * (1 + tableMod)*tableMod/2];
int x = 0;
for(int i = 0; i < mod; i++) {
for(int j = 0; j < tableMod; j++) {
for(int k = 0; k < j + 1; k ++) {
ClientTestModel p = new ClientTestModel();
p.setId(1 + k);
p.setAddress("aaa");
p.setDbIndex(i);
p.setTableIndex(j);
pList[x++] = p;
}
}
}
dao.batchInsert(new DalHints(), pList);
for(int i = 0; i < mod; i++) {
for(int j = 0; j < tableMod; j++) {
Assert.assertEquals(j + 1, getCount(i, j));
}
}
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
}
}
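Before a cross-shard batch insert can run, the entities have to be split into one sub-batch per target DB shard and physical table. The hypothetical sketch below shows that grouping step for the dbIndex/tableIndex fields used in the tests, assuming both are reduced with a plain modulo (mod and tableMod) and that the physical table name is table + separator + table shard. It is an illustration, not DAL's internal code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not DAL internals: groups rows into one sub-batch per
// (DB shard, physical table) before a cross-shard batch insert.
final class BatchPartitioner {
    // Minimal stand-in for ClientTestModel: only the two shard fields matter here.
    static final class Row {
        final int dbIndex;
        final int tableIndex;

        Row(int dbIndex, int tableIndex) {
            this.dbIndex = dbIndex;
            this.tableIndex = tableIndex;
        }
    }

    static Map<String, List<Row>> partition(List<Row> rows, int mod, int tableMod,
                                            String table, String separator) {
        Map<String, List<Row>> groups = new LinkedHashMap<>();
        for (Row row : rows) {
            int dbShard = Math.floorMod(row.dbIndex, mod);            // assumption: dbIndex % mod
            int tableShard = Math.floorMod(row.tableIndex, tableMod); // assumption: tableIndex % tableMod
            // Key "<dbShard>/<physical table>", e.g. "0/dal_client_test_2"
            String key = dbShard + "/" + table + separator + tableShard;
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
        return groups;
    }
}
```

With the three entities from testInsertMultipleAsListWithKeyHolderByFields (dbIndex/tableIndex 0/0, 1/1, 2/2, with mod=2 and tableMod=4), the sketch yields the groups 0/dal_client_test_0, 1/dal_client_test_1 and 0/dal_client_test_2, one row each, which matches the counts that test asserts.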
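A SQL-based cross-shard operation executes the same statement on multiple shards through a single API call. It is enabled through DalHints; no new API was added. It is implemented at the DAO level by DalRequestExecutor and requires no change to DalClient. It applies only to APIs that take a SQL statement, such as delete and update (an insert can be performed through the update API); it has no effect on APIs that take a POJO or a list of POJOs.
SQL-based cross-shard operations currently support DB-level sharding only. Table-level sharding is not supported yet, because table names in the SQL cannot be parsed and replaced safely and efficiently.
There are two kinds of cross-shard operation:
To execute the operation on all shards, set the following on DalHints: hints.inAllShards();
To execute the operation on a given set of shards, set the following on DalHints:
// By shards
Set<String> shards = new HashSet<>();
shards.add("0");
shards.add("1");
hints.inShards(shards);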
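The five examples below return, respectively, a list, a single object, the first row, the first N rows, and a range of rows starting at an offset: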
private List<Short> queryListInAllShard(DalHints hints) throws SQLException {
return new DalQueryDao(DATABASE_NAME).query(
sqlList, parameters,
hints.inAllShards(),
new ShortRowMapper());
}
private ClientTestModel queryForObjectInAllShard(DalHints hints) throws SQLException {
StatementParameters parameters = new StatementParameters();
parameters.set(1, 1);
return new DalQueryDao(DATABASE_NAME).queryForObject(
sqlObject, parameters,
hints.inAllShards(),
new ClientTestDalRowMapper());
}
private ClientTestModel queryFirstInAllShard(DalHints hints) throws SQLException {
StatementParameters parameters = new StatementParameters();
parameters.set(1, 1);
return new DalQueryDao(DATABASE_NAME).queryFirst(
sqlFirst, parameters,
hints.inAllShards(),
new ClientTestDalRowMapper());
}
private List<Short> queryTopInAllShard(DalHints hints) throws SQLException {
return new DalQueryDao(DATABASE_NAME).queryTop(
sqlList, parameters,
hints.inAllShards(),
new ShortRowMapper(), 4);
}
private List<Short> queryFromInAllShard(DalHints hints) throws SQLException {
return new DalQueryDao(DATABASE_NAME).queryFrom(
sqlList, parameters,
hints.inAllShards(),
new ShortRowMapper(), 2, 4);
}
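Because a query may run on multiple shards, DAL lets you control how the partial results are combined so that they come back in the order you want: either supply your own implementation of the ResultMerger interface, or simply pass a Comparator to be used as a sorter. DAL also provides default implementations of ResultMerger for common cases.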
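public interface ResultMerger<T> {
// Receives the partial result produced by one shard
void addPartial(String shard, T partial) throws SQLException;
// Returns the combined result once all partial results have been added
T merge() throws SQLException;
// Example implementation: sums integer results such as row counts
static class IntSummary implements ResultMerger<Integer>{
private int total;
@Override
public void addPartial(String shard, Integer partial) {
total += partial.intValue();
}
@Override
public Integer merge() {
return total;
}
}
}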
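The relationship between the two options can be sketched without DAL. The hypothetical SortedListMerger below implements a local copy of the ResultMerger shape (named PartialResultMerger so it compiles on its own) and concatenates the per-shard lists before sorting them with a Comparator. Treating a sorter as "concatenate, then sort" is an assumption for illustration, not DAL's implementation. The methods are synchronized because, with parallel execution, partial results may arrive from different threads.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Local copy of the ResultMerger shape shown above, so this sketch compiles without DAL.
interface PartialResultMerger<T> {
    void addPartial(String shard, T partial) throws SQLException;
    T merge() throws SQLException;
}

// Hypothetical merger: collects every shard's list, then sorts the union once.
final class SortedListMerger<T> implements PartialResultMerger<List<T>> {
    private final List<T> all = new ArrayList<>();
    private final Comparator<? super T> sorter;

    SortedListMerger(Comparator<? super T> sorter) {
        this.sorter = sorter;
    }

    @Override
    public synchronized void addPartial(String shard, List<T> partial) {
        all.addAll(partial); // shards may report in any order
    }

    @Override
    public synchronized List<T> merge() {
        List<T> result = new ArrayList<>(all);
        result.sort(sorter); // restore a global order across shards
        return result;
    }
}
```

For example, partial results [5, 2] from shard 1 and [3] from shard 0 merge to [2, 3, 5] with natural ordering, regardless of which shard reports first.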
Code example 1: using a ResultMerger
@Test
public void testQueryListAllShardsWithMerger() {
try {
DalHints hints = new DalHints();
List<Short> result = queryListInAllShard(hints.mergeBy(new TestResultMerger()));
Short t = result.get(0);
assertEquals(Short.valueOf((short) 3), t);
} catch (Exception e) {
fail();
}
}
Code example 2: using a Comparator
@Test
public void testQueryListAllShardsWithSorter() {
try {
DalHints hints = new DalHints();
List<Short> result = queryListInAllShard(hints.sortBy(new TestComparator()));
assertEquals(6, result.size());
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
private class TestComparator implements Comparator<Short>{
@Override
public int compare(Short o1, Short o2) {
return o1.compareTo(o2);
}
}
Asynchronous execution and cross-shard execution can be combined, as follows:
@Test
public void testQueryListAllShardsWithRowCallbackSequentialAsync() {
try {
DalHints hints = new DalHints();
TestDalRowCallback callback = new TestDalRowCallback();
new DalQueryDao(DATABASE_NAME).query(
sqlListQuantity, parameters,
hints.inAllShards().sequentialExecute().asyncExecution(),
callback);
// Make sure the execution is completed
hints.getAsyncResult().get();
// 66 = (10 + 11 + 12)*2
assertEquals(66, callback.result.get());
} catch (Exception e) {
fail();
}
}
private static class TestDalRowCallback implements DalRowCallback {
AtomicInteger result = new AtomicInteger();
public void process(ResultSet rs) throws SQLException {
result.addAndGet(rs.getShort("quantity"));
}
}
To update across shards, just specify inAllShards() or inShards() on the hints before the call.
Code example
@Test
public void testUpdatePlainAllShardsCallback() throws SQLException{
String sql = "UPDATE " + TABLE_NAME
+ " SET address = 'CTRIP' WHERE id = 1";
StatementParameters parameters = new StatementParameters();
IntCallback callback = new IntCallback();
DalHints hints = new DalHints().callbackWith(callback);
int res;
// By allShards
res = dao.update(sql, parameters, hints.inAllShards());
// With callbackWith(), the test expects update() itself to return 0;
// the combined affected-row count (one row per shard) is delivered to the callback
assertEquals(0, res);
res = callback.getInt();
assertResEquals(2, res);
assertEquals("CTRIP", dao.queryByPk(1, new DalHints().inShard(0)).getAddress());
assertEquals("CTRIP", dao.queryByPk(1, new DalHints().inShard(1)).getAddress());
}
@Test
public void testUpdatePlainShards() throws SQLException{
String sql = "UPDATE " + TABLE_NAME
+ " SET address = 'CTRIP' WHERE id = 1";
StatementParameters parameters = new StatementParameters();
DalHints hints = new DalHints();
int res;
// By shards
Set<String> shards = new HashSet<>();
shards.add("0");
shards.add("1");
sql = "UPDATE " + TABLE_NAME
+ " SET address = 'CTRIP' WHERE id = 1";
res = dao.update(sql, parameters, new DalHints().inShards(shards));
assertResEquals(2, res);
assertEquals("CTRIP", dao.queryByPk(1, new DalHints().inShard(0)).getAddress());
assertEquals("CTRIP", dao.queryByPk(1, new DalHints().inShard(1)).getAddress());
}
To delete across shards, just specify inAllShards() or inShards() on the hints before the call.
@Test
public void testDeleteWithWhereClauseShards() throws SQLException{
String whereClause = "type=?";
StatementParameters parameters = new StatementParameters();
parameters.set(1, Types.SMALLINT, 1);
DalHints hints = new DalHints();
int res;
// By shards
Set<String> shards = new HashSet<>();
shards.add("0");
shards.add("1");
assertEquals(3, getCountByDb(dao, 0));
assertEquals(3, getCountByDb(dao, 1));
res = dao.delete(whereClause, parameters, new DalHints().inShards(shards));
assertResEquals(6, res);
assertEquals(0, dao.query(whereClause, parameters, new DalHints().inShards(shards)).size());
}
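DAL can optimize statements that contain IN when you name the partitioning parameter with the shardBy hint. DAL splits the values inside IN into groups according to the shard each value belongs to, then executes the SQL only on the shards that need it, passing each shard only its own subset of values.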
For more detail, please refer to DalTableDao and DalQueryDao enhancement
/*
* Indicate name of the parameter that will partition shards for the request.
*/
shardBy,
Sample code
private List<Short> queryListForInParamBuilder(DalHints hints) throws SQLException {
StatementParameters parameters = new StatementParameters();
List<Integer> inParam = new ArrayList<>();
inParam.add(0);
inParam.add(1);
inParam.add(2);
inParam.add(3);
inParam.add(4);
parameters.setInParameter(1, "type", Types.INTEGER, inParam);
FreeSelectSqlBuilder<List<Short>> builder = new FreeSelectSqlBuilder<>(dbCategory);
builder.setTemplate(sqlInParam);
builder.mapWith(new ShortRowMapper());
return new DalQueryDao(DATABASE_NAME).query(
builder, parameters,
hints.shardBy("type"));
}
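The partitioning step behind shardBy can be sketched as follows. It assumes the shard of each IN value is value % mod, which would hold only if the parameter (here type) is one of the strategy's shard columns; InParamPartitioner is hypothetical and not part of DAL.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the shardBy idea: split the IN-list values by the shard
// each one maps to, so each shard only receives its own values.
final class InParamPartitioner {
    static Map<String, List<Integer>> partition(List<Integer> inValues, int mod) {
        Map<String, List<Integer>> byShard = new TreeMap<>(); // sorted by shard ID
        for (Integer value : inValues) {
            // Assumption: shard = value % mod (floorMod keeps negative values in range)
            String shard = String.valueOf(Math.floorMod(value, mod));
            byShard.computeIfAbsent(shard, k -> new ArrayList<>()).add(value);
        }
        return byShard;
    }
}
```

For the values 0 to 4 used above and mod=2, shard 0 would receive 0, 2 and 4, and shard 1 would receive 1 and 3; a shard with no matching values would not be queried at all.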
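Whether the shards are processed in parallel or one after another is controlled through DalHints. By default the operation runs on all shards in parallel; set sequentialExecute to run it on the shards sequentially, for example:
hints.inAllShards().sequentialExecute().asyncExecution();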
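The difference between the default parallel mode and sequentialExecute can be sketched with a plain ExecutorService. ShardFanOut is hypothetical and does not reflect how DalRequestExecutor is implemented; it runs one task per shard, either concurrently or in a loop, and sums the per-shard row counts, as the cross-shard update examples above do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch, not DalRequestExecutor: runs one task per shard and sums the
// per-shard row counts, either concurrently (default) or one shard at a time.
final class ShardFanOut {
    static int run(List<String> shards, Function<String, Integer> perShard, boolean sequential) {
        if (sequential) {
            int total = 0;
            for (String shard : shards) {
                total += perShard.apply(shard); // next shard starts only after this one returns
            }
            return total;
        }
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, shards.size()));
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (String shard : shards) {
                futures.add(pool.submit(() -> perShard.apply(shard))); // shards run concurrently
            }
            int total = 0;
            for (Future<Integer> future : futures) {
                total += future.get(); // wait for every shard
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for shards", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A shard failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Both modes return the same total; the sequential mode trades latency for a predictable order and no extra threads.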