中间件包括ShardingSphere分库分表的封装、Redis的封装、Kafka的封装、Elastic-Search的封装、db2es-admin的封装、 db2es-client等的封装等。
database_0数据库容纳数据表table_0, table_1, …, table_15
database_1数据库容纳数据表table_16, table_17, …, table_31
database_2数据库容纳数据表table_32, table_33, …, table_47
database_3数据库容纳数据表table_48, table_49, …, table_63
id : 主键, BIGINT UNSIGNED类型 (分布式主键, 例如:雪花算法)
row_create_time : 记录创建时间, datetime(3), 必须设置默认值: CURRENT_TIMESTAMP, 业务代码不可修改该字段
row_update_time : 记录最后一次修改时间, datetime(3), 必须勾选根据当前时间戳更新, 业务代码不可修改该字段
<dependency> <groupId>org.wyyt</groupId> <artifactId>elasticsearch-starter</artifactId> <version>${lastest_version}</version> </dependency>
sharding: # 是否开启ShardingSphere数据源 enabled: true # 分布式集群编号id, 不能重复(取值范围0~1023) work-id: 1 # 是否输出执行的sql(true:打印; false:不打印) show-sql: true # ACM配置信息 acm: datasource: data-id: scfs.xml.datasource.encrypt group: SIJIBAO_ORDER_CENTER_GROUP dimension: data-id: scfs.xml.dimension group: SIJIBAO_ORDER_CENTER_GROUP table: data-id: scfs.xml.table group: SIJIBAO_ORDER_CENTER_GROUP acmConfigPath: acmConfig.properties nacosLocalSnapshotPath: /wyyt/etc/acm/sql_tool nacosLogPath: /wyyt/logs/tomcat/sql_tool/
其中, scfs.xml.datasource.encrypt数据源配置信息如下:
<?xml version="1.0" encoding="UTF-8"?> <datasources> <!-- name: 数据库的逻辑名称. 必填项, 必须唯一 --> <!-- index: 数据库的索引(分库时使用), 从0开始, 默认为0 --> <datasource name="finance_center_main_0" index="0"> <!-- 数据库IP地址. 必填项 --> <host>192.168.0.197</host> <!-- 数据库端口. 必填项 --> <port>3306</port> <!-- 数据库的真实物理名称. 必填项 --> <databaseName>finance_center_main_0</databaseName> <!-- 数据库的账号. 必填项 --> <username>root</username> <!-- 数据库的密码. 必填项 --> <password>EqkPepuq0FN49w=</password> <!-- 配置连接池中最小可用连接的个数 --> <minIdle>10</minIdle> <!-- 配置连接池中最大可用连接的个数 --> <maxActive>20</maxActive> </datasource> <datasource name="finance_center_main_1" index="1"> <host>192.168.0.197</host> <port>3306</port> <databaseName>finance_center_main_1</databaseName> <username>root</username> <password>EqkPepuq0FNoCe49w=</password> <minIdle>10</minIdle> <maxActive>20</maxActive> </datasource> <!--******当SQL所涉及的数据表在以上数据源中查询不到时, 会自动去isDefault=true(该属性默认为false)的数据源中寻找, 最多只能拥有一个isDefault=true的数据源******--> <datasource name="finance_other" isDefault="true"> <host>192.168.5.110</host> <port>3306</port> <databaseName>finance_dev</databaseName> <username>fin</username> <password>TdAvSNMlMQhNY2MG9pzKY=</password> <minIdle>10</minIdle> <maxActive>20</maxActive> </datasource> </datasources>
scfs.xml.dimension维度配置信息如下:
<?xml version="1.0" encoding="UTF-8"?> <dimensions> <!-- name: 维度名称,必须唯一。不允许为空 priority: 当多个拆分键在同一条SQL中出现时,维度的优先级,数值越低,优先级越高。 不允许为空。 当priority="0"时,优先级最高,被视为是主维度,多个维度之间只能有一个主维度 description: 当前维度的描述信息,不允许为空 --> <dimension name="order-no" priority="0" description="订单维度"> <!-- ref: 数据库的逻辑名称。不允许为空 --> <datasource ref="finance_center_main_0"/> <datasource ref="finance_center_main_1"/> </dimension> </dimensions>
scfs.xml.table数据表配置信息如下:
<?xml version="1.0" encoding="UTF-8"?> <tables> <!-- name: 数据表的逻辑名称,必须唯一。不允许为空 pkName: 主键。 可以为空,为空默认为id rowCreateTime: 记录创建时间字段(时间精确到毫秒),为空默认为row_create_time rowUpdateTime: 记录最后一次修改时间字段(时间精确到毫秒),为空默认为row_update_time bindingName: 具有相同绑定名称的表为一组绑定表, 为空表示不和任何表组成绑定表 broadcast: 是否是广播表(true: 是广播表; false: 不是)。为空表示false --> <table name="fin_pay_fund_flow_out_fund" pkName="id"> <!-- ref: 维度信息xml配置中的维度名称name tableCountNum: 逻辑表在该维度下的分表总个数, 默认为1 shardingColumn: 逻辑表在该维度下的拆分键字段, 默认为id tableNameFormat: 逻辑表与物理表之间的映射关系表达式, 为空默认是{逻辑名称} (也可以是: {逻辑名称}_%s, 其中, %s为下标索引, 从0开始到{tableCountNum-1}) --> <dimension ref="order-no" tableCountNum="64" shardingColumn="order_no"/> </table> <table name="fin_sjb_order_out_fund" pkName="id"> <dimension ref="order-no" tableCountNum="64" shardingColumn="order_no"/> </table> <table name="fin_sjb_order" pkName="id"> <dimension ref="order-no" tableCountNum="64" shardingColumn="sjb_stock_no"/> </table> <table name="fin_sjb_order_sub_line" pkName="id"> <dimension ref="order-no" tableCountNum="64" shardingColumn="sjb_stock_no"/> </table> <table name="fin_sjb_order_feerate_content" pkName="id"> <dimension ref="order-no" tableCountNum="64" shardingColumn="sjb_stock_no"/> </table> <table name="fin_payment_days_info" pkName="id"> <dimension ref="order-no" tableCountNum="64" shardingColumn="sjb_stock_no"/> </table> <table name="fin_sjb_order_pay_line" pkName="id"> <dimension ref="order-no" tableCountNum="64" shardingColumn="sjb_stock_no"/> </table> <table name="fin_pay_fund_flow_detail" pkName="id"> <dimension ref="order-no" tableCountNum="64" shardingColumn="stock_no"/> </table> <table name="fin_external_capital_change_wide" pkName="id"> <dimension ref="order-no" tableCountNum="64" shardingColumn="order_no"/> </table> <table name="fin_ac_out_fund_chg" pkName="id"> <dimension ref="order-no" tableCountNum="64" shardingColumn="trade_no"/> </table> </tables>
@Configuration public class DataSourceConfig { @Autowired private ShardingDataSource shardingDataSource; @Bean public DataSource dataSource() { return this.shardingDataSource; } }
acm: data-id: scfs.tool group: SIJIBAO_ORDER_CENTER_GROUP acmConfigPath: acmConfig.properties nacosLocalSnapshotPath: /wyyt/etc/acm/db2es/ nacosLogPath: /wyyt/logs/sql_tool/其中, scfs.tool配置如下:sharding: enabled: true work-id: 300 show-sql: false
acm: datasource: data-id: scfs.xml.datasource.encrypt group: SIJIBAO_ORDER_CENTER_GROUP dimension: data-id: scfs.xml.dimension group: SIJIBAO_ORDER_CENTER_GROUP table: data-id: scfs.xml.table group: SIJIBAO_ORDER_CENTER_GROUP acmConfigPath: acmConfig.properties nacosLocalSnapshotPath: /wyyt/etc/acm/sql_tool nacosLogPath: /wyyt/logs/tomcat/springcloud/sql_tool/
#sql tool工具端口 sql.tool.port=10086 #sql tool数据库配置 db.host=192.168.0.197 db.port=3306 db.username=root encrypt.db.password=Xzl9H5z0zWOGu5nh= db.dbName=scfs_sql_developer
<dependency> <groupId>org.wyyt</groupId> <artifactId>redis-starter</artifactId> <version>${lastest_version}</version> </dependency>
spring: redis: host: 192.168.6.167 port: 6379 password: ******** timeout: 2000 database: 0 jedis: pool: max-idle: 1000 max-wait: -1 min-idle: 0
@Autowired private RedisService redisService; //读写 public void setAndGet() { this.redisService.set(KEY, System.currentTimeMillis()); Assert.notNull(this.redisService.get(KEY), "set & get 失败"); }//分布式锁 public void lock() { try (RedisService.Lock lock = this.redisService.getLock(KEY, 10000L, 6000L)) { if ( lock.hasLock()) { System.out.println("拿到锁了: " + lock.lockKey() + " " + lock.requestId()); } else { System.err.println(" 没有拿到锁"); } } Assert.isNull(this.redisService.get(KEY), "lock失败"); }
<dependency> <groupId>org.wyyt</groupId> <artifactId>kafka-starter</artifactId> <version>${lastest_version}</version> </dependency>
spring: kafka: bootstrap-servers: 192.168.6.164:9092,192.168.6.165:9092,192.168.6.166:9092 listener: missing-topics-fatal: false producer: retries: 3 batch-size: 1024 buffer-memory: 33554432 acks: all compression-type: lz4 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
@Autowired private KafkaTest kafkaTest; //同步发送 public void send() throws Exception { this.kafkaService.send(TOPIC_NAME, "KEY", String.valueOf(System.currentTimeMillis())); }//同步发送(带事务,当方法体失败时,该消息不会被消费) @TranKafka public void sendTran() throws Exception { this.kafkaService.send(TOPIC_NAME, "KEY", String.valueOf( System.currentTimeMillis())); }
//异步发送(带事务) @TranKafka public void sendTranAsync() { this.kafkaService.sendAsync(TOPIC_NAME, "KEY", String.valueOf( System.currentTimeMillis()), (sendResult, throwable) -> { log.info(sendResult.toString()); Assert.isTrue(false, " 回调方法中的异常是不会回滚的"); }); Assert.isTrue(false, "能够正常回滚"); }
server.port = 9999 zookeeper.servers = 192.168.6.166:2181,192.168.6.167:2181,192.168.0.197:2181 db.host = 192.168.0.197 db.port = 3306 db.name = kafka_monitor db.username = root db.password = XXXXX retention.days = 3 topic.blacklist =
<dependency> <groupId>org.wyyt</groupId> <artifactId>elaticsearch-starter</artifactId> <version>${lastest_version}</version> </dependency>
elasticsearch: enabled: true hostnames: 192.168.6.165:9900,192.168.6.166:9900,192.168.6.167:9900 username: elastic password: ****** max-conn-total: 100 max-conn-per-route: 20
@Autowired private ElasticSearchService elasticSearchService;//根据主键查询 public void getById() throws Exception { String response = this.elasticSearchService.getById(INDEX_NAME, PRIMARY_KEY_VALUE, String.class); System.out.println(response); }
//条件查询 public void test06_search() throws Exception { SearchRequest searchRequest = new SearchRequest(INDEX_NAME); BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); boolQueryBuilder.must(QueryBuilders.rangeQuery("id").gte(1) .lte(20)); //范围查询。must相当于SQL where字句中的AND; should则相当于OR boolQueryBuilder.must(QueryBuilders.matchQuery("remark", " 颚ABCDEFGHIJKLMNOPQRSTUVWXYZ_1234567890987654321")); //match查询 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(boolQueryBuilder); searchSourceBuilder.from(0); //获取的起始位置,可用以分页 searchSourceBuilder.size(10);//获取的document记录数,可用于分页 searchSourceBuilder.sort("row_create_time", SortOrder.ASC); //排序 searchSourceBuilder.fetchSource(new String[]{"id", "name", "remark"}, new String[]{}); searchRequest.source( searchSourceBuilder); List response = this.elasticSearchService.select(searchRequest, String.class); for (String s : response) { System.out.println(s); } }
//分页查询 public void page() throws IOException { SearchRequest searchRequest = new SearchRequest(INDEX_NAME); BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); boolQueryBuilder.must(QueryBuilders.matchPhraseQuery("id", " 1")); //match查询 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query( boolQueryBuilder); searchRequest.source(searchSourceBuilder); IPage page = this.elasticSearchService.page( searchRequest, TestEntity.class, new Page<>(1, 10)); System.out.println(page.getRecords()); }
## kafka集群所使用的zookeeper集群地址, 多个用逗号隔开 zookeeper.servers=192.168.6.166:2181,192.168.6.167:2181 ## ## 目标ElasticSearch的地址, 多个用逗号隔开 elasticsearch.hostnames=192.168.6.165:9900,192.168.6.166:9900,192.168.6.167:9900 ## ElasticSearch的用户名 elasticsearch.username=finance ## ElasticSearch的密码 encrypt.elasticsearch.password=AQZRHONdKs= ## ## db2es数据库的地址 db.host=192.168.0.197 ## db2es数据库的端口 db.port=3306 ## db2es数据库的库名 db.databaseName=scfs_db2es ## db2es数据库的用户名 db.username=root ## db2es数据库的密码 encrypt.db.password=APgXwToHDGFNOz0=