Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/td 30531 #62

Merged
merged 20 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions nodejs/example/basicBatchTmq.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import { WSConfig } from "../src/common/config";
import { TMQConstants } from "../src/tmq/constant";
import { destroy, setLogLevel, sqlConnect, tmqConnect } from "../src";
import { WsConsumer } from "../src/tmq/wsTmq";

const db = 'power';
const stable = 'meters';
const url = 'ws://localhost:6041';
const topic = 'topic_meters'
const topics = [topic];
const groupId = "group-50";
const clientId = "client-50";

async function createConsumer() {
let configMap = new Map([
[TMQConstants.GROUP_ID, groupId],
[TMQConstants.CLIENT_ID, clientId],
[TMQConstants.CONNECT_USER, "root"],
[TMQConstants.CONNECT_PASS, "taosdata"],
[TMQConstants.AUTO_OFFSET_RESET, "earliest"],
[TMQConstants.WS_URL, url],
[TMQConstants.ENABLE_AUTO_COMMIT, 'false'],
[TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);
try {
let conn = await tmqConnect(configMap);
console.log(`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`)
return conn;
} catch (err: any) {
console.error(`Failed to create websocket consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}

}
// ANCHOR_END: create_consumer

async function prepare() {
let conf = new WSConfig('ws://localhost:6041');
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb(db);
const createDB = `CREATE DATABASE IF NOT EXISTS ${db}`;
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;

let wsSql = await sqlConnect(conf);
await wsSql.exec(createDB);
await wsSql.exec(createStable);

let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic);
wsSql.close();
}

async function insert() {
let conf = new WSConfig('ws://localhost:6041');
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb(db);
let wsSql = await sqlConnect(conf);
for (let i = 0; i < 10000; i++) {
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW + ${i}a, ${10 + i}, ${200 + i}, ${0.32 + i})`);
}
await wsSql.close();
console.log("insert fininsh!!!!!")
}

async function subscribe(consumer: WsConsumer) {
// ANCHOR: commit
try {
let count = 0;
await consumer.subscribe(topics);
let bFinish = false;
let bBegin = false;
const startTime = new Date().getTime();
while (!bFinish) {
let res = await consumer.poll(100);
for (let [key, value] of res) {
// Add your data processing logic here
let data = value.getData();
if (data) {
if (data.length == 0 && bBegin) {
bFinish = true;
break;
} else if(data.length > 0){
bBegin = true;
}
count += data.length
console.log("poll end ------>", count);
}

}
// await consumer.commit();
}
const endTime = new Date().getTime();
console.log(count, endTime - startTime);
} catch (err: any) {
console.error(`Failed to poll data, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
// ANCHOR_END: commit
}

async function consumer() {
// ANCHOR: unsubscribe
setLogLevel("debug");
let consumer = null;
try {
// await prepare();
consumer = await createConsumer();
// const allPromises = [];
// allPromises.push(subscribe(consumer));
// allPromises.push(insert());
// await Promise.all(allPromises);
// await insert();
await subscribe(consumer);
await consumer.unsubscribe();
console.log("Consumer unsubscribed successfully.");
}
catch (err: any) {
console.error(`Failed to unsubscribe consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (consumer) {
await consumer.close();
console.log("Consumer closed successfully.");
}
destroy();
}
// ANCHOR_END: unsubscribe
}

async function test() {
await consumer();
}

test()
20 changes: 11 additions & 9 deletions nodejs/example/basicSql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@ let dsn = 'ws://root:taosdata@localhost:6041';

taosResult = await wsSql.exec('INSERT INTO d1001 USING meters TAGS ("California.SanFrancisco", 3) VALUES (NOW, 10.2, 219, 0.32)', reqId++)
console.log(taosResult);

wsRows = await wsSql.query('select * from meters', reqId++);
let meta = wsRows.getMeta()
console.log("wsRow:meta:=>", meta);

while (await wsRows.next()) {
let result = wsRows.getData();
console.log('queryRes.Scan().then=>', result);
for (let i = 0; i< 100; i++) {
wsRows = await wsSql.query('select * from meters', reqId++);
let meta = wsRows.getMeta()
console.log("wsRow:meta:=>", meta);

while (await wsRows.next()) {
let result = wsRows.getData();
console.log('queryRes.Scan().then=>', result);
}
wsRows.close()
}
wsRows.close()


} catch (err: any) {
console.error(err.code, err.message);
Expand Down
44 changes: 23 additions & 21 deletions nodejs/example/basicTmq.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import { WSConfig } from "../src/common/config";
import { TMQConstants } from "../src/tmq/constant";
import { destroy, sqlConnect, tmqConnect } from "../src";
import { destroy, setLogLevel, sqlConnect, tmqConnect } from "../src";

const stable = 'meters';
const db = 'power'
const topics:string[] = ['pwer_meters_topic']
const topics:string[] = ['topic_ws_map']
let dropTopic = `DROP TOPIC IF EXISTS ${topics[0]};`
let configMap = new Map([
[TMQConstants.GROUP_ID, "gId"],
[TMQConstants.GROUP_ID, "gId_11"],
[TMQConstants.CONNECT_USER, "root"],
[TMQConstants.CONNECT_PASS, "taosdata"],
[TMQConstants.AUTO_OFFSET_RESET, "earliest"],
[TMQConstants.CLIENT_ID, 'test_tmq_client'],
[TMQConstants.CLIENT_ID, 'test_tmq_client11'],
[TMQConstants.WS_URL, 'ws://localhost:6041'],
[TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[TMQConstants.ENABLE_AUTO_COMMIT, 'false'],
[TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);
let dsn = 'ws://root:taosdata@localhost:6041';
Expand All @@ -29,37 +29,39 @@ async function Prepare() {
await ws.exec(useDB);
await ws.exec(createStable);
await ws.exec(createTopic);
for (let i = 0; i < 10; i++) {
await ws.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10+i}, ${200+i}, ${0.32 + i})`)
for (let i = 0; i < 1000; i++) {
await ws.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW + ${i}a, ${10+i}, ${200+i}, ${0.32 + i})`)
}
ws.close()
await ws.close()

}

(async () => {
let consumer = null
try {
setLogLevel("debug")
await Prepare()
consumer = await tmqConnect(configMap);
await consumer.subscribe(topics);
for (let i = 0; i < 5; i++) {
let res = await consumer.poll(500);
let res = await consumer.poll(100);
for (let [key, value] of res) {
console.log(key, value);
console.log(key, value.getMeta());
let data = value.getData();
if (data) {
console.log(data.length)
}
}
if (res.size == 0) {
break;
}
await consumer.commit();
// await consumer.commit();
}

let assignment = await consumer.assignment()
console.log(assignment)
await consumer.seekToBeginning(assignment)
assignment = await consumer.assignment()
for(let i in assignment) {
console.log("seek after:", assignment[i])
}
// let assignment = await consumer.assignment()
// console.log(assignment)
// await consumer.seekToBeginning(assignment)
// assignment = await consumer.assignment()
// for(let i in assignment) {
// console.log("seek after:", assignment[i])
// }
await consumer.unsubscribe()
} catch (e) {
console.error(e);
Expand Down
4 changes: 2 additions & 2 deletions nodejs/package.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"name": "@tdengine/websocket",
"version": "3.1.0",
"version": "3.1.1",
"description": "TDengine Connector for nodejs and browser using WebSocket.",
"source": "index.ts",
"main": "lib/index.js",
"module": "lib/module/index.mjs",
"module": "lib/index.js",
"types": "lib/index.d.ts",
"directories": {
"example": "example",
Expand Down
10 changes: 7 additions & 3 deletions nodejs/src/client/wsClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,19 @@ export class WsClient {


// need to construct Response.
async sendBinaryMsg(reqId: bigint, action:string, message: ArrayBuffer, bSqlQuery:boolean = true): Promise<any> {
async sendBinaryMsg(reqId: bigint, action:string, message: ArrayBuffer, bSqlQuery:boolean = true, bResultBinary: boolean = false): Promise<any> {
return new Promise((resolve, reject) => {
if (this._wsConnector && this._wsConnector.readyState() > 0) {
this._wsConnector.sendBinaryMsg(reqId, action, message).then((e: any) => {
if (bResultBinary) {
resolve(e);
}

if (e.msg.code == 0) {
if (bSqlQuery) {
resolve(new WSQueryResponse(e));
}else{
resolve(e)
resolve(e);
}
} else {
reject(new WebSocketInterfaceError(e.msg.code, e.msg.message));
Expand Down Expand Up @@ -184,7 +188,7 @@ export class WsClient {
taosResult.addTotalTime(resp.totalTime)
// if retrieve JSON then reject with message
// else is binary , so parse raw block to TaosResult
parseBlock(fetchResponse.rows, new WSFetchBlockResponse(resp.msg), taosResult)
parseBlock(new WSFetchBlockResponse(resp.msg), taosResult)
resolve(taosResult);
}).catch((e) => reject(e));
} else {
Expand Down
6 changes: 3 additions & 3 deletions nodejs/src/client/wsConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ export class WebSocketConnector {

private _onmessage(event: any) {
let data = event.data;
logger.debug("wsClient._onMessage()===="+ (Object.prototype.toString.call(data)))
logger.debug("wsClient._onMessage()====" + (Object.prototype.toString.call(data)))
if (Object.prototype.toString.call(data) === '[object ArrayBuffer]') {
let id = new DataView(data, 8, 8).getBigUint64(0, true);
let id = new DataView(data, 26, 8).getBigUint64(0, true);
WsEventCallback.instance().handleEventCallback({id:id, action:'', req_id:BigInt(0)},
OnMessageType.MESSAGE_TYPE_ARRAYBUFFER, data);

Expand Down Expand Up @@ -140,7 +140,7 @@ export class WebSocketConnector {
WsEventCallback.instance().registerCallback({ action: action, req_id: reqId,
timeout:this._timeout, id: reqId}, resolve, reject);
}
logger.debug("[wsClient.sendBinaryMsg()]===>" + reqId, action, message.byteLength)
logger.debug("[wsClient.sendBinaryMsg()]===>" + reqId + action + message.byteLength)
this._wsConn.send(message)
} else {
reject(new WebSocketQueryError(ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL,
Expand Down
48 changes: 41 additions & 7 deletions nodejs/src/client/wsResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* define ws Response type|class, for query?
*/

import { MessageResp } from "../common/taosResult";
import { MessageResp, readVarchar } from "../common/taosResult";

export class WSVersionResponse {
version: string;
Expand Down Expand Up @@ -87,14 +87,48 @@ export class WSFetchResponse {
}

export class WSFetchBlockResponse {

id: bigint
data: ArrayBuffer
data: DataView | undefined
action: bigint
timing: bigint
reqId: bigint
code: number
blockLen: number
message: string | undefined
resultId: bigint | undefined
finished: number | undefined
metaType: number | undefined
textDecoder: TextDecoder
constructor(msg: ArrayBuffer) {
this.timing = new DataView(msg, 0, 8).getBigUint64(0, true)
this.id = new DataView(msg, 8, 8).getBigUint64(0, true)
this.data = msg.slice(16)
let dataView = new DataView(msg);
this.action = dataView.getBigUint64(8, true)
this.timing = dataView.getBigUint64(18, true)
this.reqId = dataView.getBigUint64(26, true)
this.code = dataView.getUint32(34, true)
this.textDecoder = new TextDecoder()
this.blockLen = 0;
if (this.code != 0) {
let len = dataView.getUint32(38, true)
this.message = readVarchar(msg, 42, len, this.textDecoder);
return;
}
this.resultId = dataView.getBigUint64(42, true)
let offset = 50;
if (this.action == BigInt(8)) {
this.metaType = dataView.getUint16(50, true)
offset += 2;
}else {
this.finished = dataView.getUint8(50)
if (this.finished == 1) {
return;
}
offset += 1;
}

this.blockLen = dataView.getUint32(offset, true)
if (this.blockLen > 0) {
this.data = new DataView(msg, offset + 4);
}

}
}

Expand Down
Loading
Loading