forked from parksben/mysql_2_elasticsearch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
89 lines (76 loc) · 2.94 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
var elasticsearch = require('elasticsearch');
var mysql = require('mysql');
var es_jdbc = require('./lib/pull-push.js');
var pullDataFromMysql = es_jdbc.pull;
var pushDataToElastic = es_jdbc.push;
var transSingleTable = function(pool, client, config, $table, sqlPhrase, $type, filter_map, exception_handler, callback) {
// 配置项
var es_config = {
host: config.elasticsearch.host_config,
chunkSize: config.elasticsearch.chunkSize,
timeout: config.elasticsearch.timeout,
src_table: $table,
index: config.elasticsearch.index,
type: $type
};
// 导出 mysql 数据表 到 bulk文件
pullDataFromMysql(pool, $table, sqlPhrase, filter_map, exception_handler, function(obj) {
if (obj.message == 'success') {
console.log('==>> ' + obj.table + '.bulk.json 文件构造完毕!');
}
// 导入 bulk 数据 到 es
pushDataToElastic(client, es_config, function(obj) {
if (obj.message == 'success') {
console.log('====>> /' + obj.index + '/' + obj.type + ' 导入数据成功!');
callback(true);
} else {
console.log('====>> /' + obj.index + '/' + obj.type + ' 导入数据失败!');
callback(false);
}
});
});
};
module.exports = function(config, callback) {
// 开始建立 MySQL 数据库连接
var pool = mysql.createPool( config.mysql );
console.log('开始连接数据库:' + config.mysql.host + ':' + config.mysql.port + '/' + config.mysql.database);
// 开始建立 elasticsearch 客户端服务
var client = new elasticsearch.Client(config.elasticsearch.host_config);
console.log('开始建立 elasticsearch 客户端连接...');
var mapLen = JSON.stringify(config.riverMap).match(/\=\>/g).length;
var successArr = [];
var failedArr = [];
for (var key in config.riverMap) {
var curTable = key.split('=>')[0].replace(/\s+/g, '');
var curType = key.split('=>')[1].replace(/\s+/g, '');
// filter_out
if (!config.riverMap[key].filter_out || config.riverMap[key].filter_out.length == 0) {
var filter_map = [];
} else {
var filter_map = config.riverMap[key].filter_out;
}
// exception_handler
if (!config.riverMap[key].exception_handler) {
var exception_handler = {};
} else {
var exception_handler = config.riverMap[key].exception_handler;
}
// 表名为SQL,则表示将SQL查询结果存入对应type
var sqlPhrase = !config.riverMap[key].SQL ? '' : config.riverMap[key].SQL;
transSingleTable(pool, client, config, curTable, sqlPhrase, curType, filter_map, exception_handler, function(state) {
if (state) {
successArr.push(curTable);
} else {
failedArr.push(curTable);
}
if (successArr.length + failedArr.length == mapLen) {
callback({
total: mapLen,
success: successArr.length,
failed: failedArr.length,
result: successArr.length == mapLen ? 'success' : 'failed'
});
}
});
}
};