From 7fbb1c42e178b8dd14949ae13e05eec1ce16337f Mon Sep 17 00:00:00 2001 From: menshibin Date: Sat, 21 Sep 2024 19:11:44 +0800 Subject: [PATCH 01/20] add query new interface --- nodejs/example/basicSql.ts | 2 +- nodejs/example/basicTmq.ts | 23 +- nodejs/src/client/wsClient.ts | 10 +- nodejs/src/client/wsConnector.ts | 4 +- nodejs/src/client/wsResponse.ts | 41 ++- nodejs/src/common/constant.ts | 3 + nodejs/src/common/taosResult.ts | 52 ++- nodejs/src/common/utils.ts | 33 +- nodejs/src/sql/wsRows.ts | 34 +- nodejs/src/sql/wsSql.ts | 65 ++-- nodejs/src/tmq/constant.ts | 25 ++ nodejs/src/tmq/tmqResponse.ts | 316 ++++++++++++------ nodejs/src/tmq/wsTmq.ts | 65 ++-- nodejs/test/bulkPulling/sql.test.ts | 2 +- nodejs/test/bulkPulling/tmq.test.ts | 28 +- nodejs/test/bulkPulling/wsConnectPool.test.ts | 8 +- 16 files changed, 503 insertions(+), 208 deletions(-) diff --git a/nodejs/example/basicSql.ts b/nodejs/example/basicSql.ts index 85189ab..200a9f9 100644 --- a/nodejs/example/basicSql.ts +++ b/nodejs/example/basicSql.ts @@ -1,7 +1,7 @@ import { WSConfig } from '../src/common/config'; import { sqlConnect, destroy, setLogLevel } from '../src' -let dsn = 'ws://root:taosdata@localhost:6041'; +let dsn = 'ws://root:taosdata@192.168.1.98:6041'; (async () => { let wsSql = null; let wsRows = null; diff --git a/nodejs/example/basicTmq.ts b/nodejs/example/basicTmq.ts index 34e7749..356d821 100644 --- a/nodejs/example/basicTmq.ts +++ b/nodejs/example/basicTmq.ts @@ -1,10 +1,10 @@ import { WSConfig } from "../src/common/config"; import { TMQConstants } from "../src/tmq/constant"; -import { destroy, sqlConnect, tmqConnect } from "../src"; +import { destroy, setLogLevel, sqlConnect, tmqConnect } from "../src"; const stable = 'meters'; -const db = 'power' -const topics:string[] = ['pwer_meters_topic'] +const db = 'power18' +const topics:string[] = ['topic_ws_map'] let dropTopic = `DROP TOPIC IF EXISTS ${topics[0]};` let configMap = new Map([ [TMQConstants.GROUP_ID, "gId"], @@ -12,11 +12,11 @@ let configMap = new Map([ [TMQConstants.CONNECT_PASS, "taosdata"], [TMQConstants.AUTO_OFFSET_RESET, "earliest"], [TMQConstants.CLIENT_ID, 'test_tmq_client'], - [TMQConstants.WS_URL, 'ws://localhost:6041'], + [TMQConstants.WS_URL, 'ws://192.168.1.98:6041'], [TMQConstants.ENABLE_AUTO_COMMIT, 'true'], [TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000'] ]); -let dsn = 'ws://root:taosdata@localhost:6041'; +let dsn = 'ws://root:taosdata@192.168.1.98:6041'; async function Prepare() { let conf :WSConfig = new WSConfig(dsn) const createDB = `create database if not exists ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;` @@ -39,17 +39,20 @@ async function Prepare() { (async () => { let consumer = null try { + setLogLevel("debug") await Prepare() consumer = await tmqConnect(configMap); await consumer.subscribe(topics); for (let i = 0; i < 5; i++) { let res = await consumer.poll(500); - for (let [key, value] of res) { - console.log(key, value); - } - if (res.size == 0) { - break; + console.log(res.getTopic(), res.getMeta()); + let data = res.getData(); + if (data) { + for (let record of data ) { + console.log(record) + } } + await consumer.commit(); } diff --git a/nodejs/src/client/wsClient.ts b/nodejs/src/client/wsClient.ts index 7f181d0..44850fa 100644 --- a/nodejs/src/client/wsClient.ts +++ b/nodejs/src/client/wsClient.ts @@ -95,15 +95,19 @@ export class WsClient { // need to construct Response. - async sendBinaryMsg(reqId: bigint, action:string, message: ArrayBuffer, bSqlQuery:boolean = true): Promise { + async sendBinaryMsg(reqId: bigint, action:string, message: ArrayBuffer, bSqlQuery:boolean = true, bResultBinary: boolean = false): Promise { return new Promise((resolve, reject) => { if (this._wsConnector && this._wsConnector.readyState() > 0) { this._wsConnector.sendBinaryMsg(reqId, action, message).then((e: any) => { + if (bResultBinary) { + resolve(e); + } + if (e.msg.code == 0) { if (bSqlQuery) { resolve(new WSQueryResponse(e)); }else{ - resolve(e) + resolve(e); } } else { reject(new WebSocketInterfaceError(e.msg.code, e.msg.message)); @@ -184,7 +188,7 @@ export class WsClient { taosResult.addTotalTime(resp.totalTime) // if retrieve JSON then reject with message // else is binary , so parse raw block to TaosResult - parseBlock(fetchResponse.rows, new WSFetchBlockResponse(resp.msg), taosResult) + parseBlock(new WSFetchBlockResponse(resp.msg), taosResult) resolve(taosResult); }).catch((e) => reject(e)); } else { diff --git a/nodejs/src/client/wsConnector.ts b/nodejs/src/client/wsConnector.ts index 99af734..6e7d2c7 100644 --- a/nodejs/src/client/wsConnector.ts +++ b/nodejs/src/client/wsConnector.ts @@ -56,7 +56,7 @@ export class WebSocketConnector { let data = event.data; logger.debug("wsClient._onMessage()===="+ (Object.prototype.toString.call(data))) if (Object.prototype.toString.call(data) === '[object ArrayBuffer]') { - let id = new DataView(data, 8, 8).getBigUint64(0, true); + let id = new DataView(data, 26, 8).getBigUint64(0, true); WsEventCallback.instance().handleEventCallback({id:id, action:'', req_id:BigInt(0)}, OnMessageType.MESSAGE_TYPE_ARRAYBUFFER, data); @@ -140,7 +140,7 @@ export class WebSocketConnector { WsEventCallback.instance().registerCallback({ action: action, req_id: reqId, timeout:this._timeout, id: reqId}, resolve, reject); } - logger.debug("[wsClient.sendBinaryMsg()]===>" + reqId, action, message.byteLength) + logger.debug("[wsClient.sendBinaryMsg()]===>" + reqId + action + message.byteLength) this._wsConn.send(message) } else { reject(new WebSocketQueryError(ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, diff --git a/nodejs/src/client/wsResponse.ts b/nodejs/src/client/wsResponse.ts index d96915c..76f3d2c 100644 --- a/nodejs/src/client/wsResponse.ts +++ b/nodejs/src/client/wsResponse.ts @@ -2,7 +2,7 @@ * define ws Response type|class, for query? */ -import { MessageResp } from "../common/taosResult"; +import { MessageResp, readVarchar } from "../common/taosResult"; export class WSVersionResponse { version: string; @@ -87,14 +87,41 @@ export class WSFetchResponse { } export class WSFetchBlockResponse { - - id: bigint - data: ArrayBuffer + data: ArrayBuffer | undefined + action: bigint timing: bigint + reqId: bigint + code: number + message: string | undefined + resultId: bigint | undefined + finished: number | undefined + blockLen: number | undefined + metaType: number | undefined constructor(msg: ArrayBuffer) { - this.timing = new DataView(msg, 0, 8).getBigUint64(0, true) - this.id = new DataView(msg, 8, 8).getBigUint64(0, true) - this.data = msg.slice(16) + this.action = new DataView(msg, 8, 8).getBigUint64(0, true) + this.timing = new DataView(msg, 18, 8).getBigUint64(0, true) + this.reqId = new DataView(msg, 26, 8).getBigUint64(0, true) + this.code = new DataView(msg, 34, 4).getUint32(0, true) + if (this.code != 0) { + let len = new DataView(msg, 38, 4).getUint32(0, true) + this.message = readVarchar(msg, 42, len); + return; + } + this.resultId = new DataView(msg, 42, 8).getBigUint64(0, true) + let offset = 50; + if (this.action == BigInt(8)) { + this.metaType = new DataView(msg, 50, 2).getUint16(0, true) + offset += 2; + }else { + this.finished = new DataView(msg, 50, 1).getUint8(0) + if (this.finished == 1) { + return; + } + offset += 1; + } + + this.blockLen = new DataView(msg, offset, 4).getUint32(0, true) + this.data = msg.slice(offset + 4); } } diff --git a/nodejs/src/common/constant.ts b/nodejs/src/common/constant.ts index ac7bbfe..a7b83fd 100644 --- a/nodejs/src/common/constant.ts +++ b/nodejs/src/common/constant.ts @@ -6,6 +6,9 @@ export interface StringIndexable { [index: string]: number } +export const BinaryQueryMessage: bigint = BigInt(6); +export const FetchRawBlockMessage: bigint = BigInt(7); + export const TDengineTypeName: IndexableString = { 0: 'NULL', 1: 'BOOL', diff --git a/nodejs/src/common/taosResult.ts b/nodejs/src/common/taosResult.ts index 63f4841..a432371 100644 --- a/nodejs/src/common/taosResult.ts +++ b/nodejs/src/common/taosResult.ts @@ -22,18 +22,21 @@ export interface MessageResp { } export class TaosResult { + private _topic?: string private _meta: Array | null; private _data: Array> | null; private _precision: number | null | undefined; protected _affectRows: number | null | undefined; private _totalTime = 0; + /** unit nano seconds */ private _timing: bigint | null | undefined; constructor(queryResponse?: WSQueryResponse) { + if (queryResponse == null) { - this._meta = null - this._data = null + this._meta = [] + this._data = [] this._timing = BigInt(0) return } @@ -75,11 +78,25 @@ export class TaosResult { } } - + public getTopic(): string { + if (this._topic) { + return this._topic; + } + return ""; + } + public setTopic(topic: string = "") { + this._topic = topic; + } public getMeta(): Array | null { return this.getTDengineMeta(); } + public setMeta(metaData: ResponseMeta){ + if (this._meta) { + this._meta.push(metaData); + } + } + public getData(): Array> | null { return this._data; } @@ -132,13 +149,17 @@ export class TaosResult { } } -export function parseBlock(rows: number, blocks: WSFetchBlockResponse, taosResult: TaosResult): TaosResult { +export function parseBlock(blocks: WSFetchBlockResponse, taosResult: TaosResult): TaosResult { let metaList = taosResult.getTaosMeta() let dataList = taosResult.getData() - if (metaList && dataList) { + if (metaList && dataList && blocks && blocks.data) { + let rows = new DataView(blocks.data, 8, 4).getUint32(0, true); + if (rows == 0) { + return taosResult; + } + taosResult.setTiming(blocks.timing) const INT_32_SIZE = 4; - // Offset num of bytes from rawBlockBuffer. let bufferOffset = (4 * 5) + 8 + (4 + 1) * metaList.length let colLengthBlockSize = INT_32_SIZE * metaList.length @@ -160,7 +181,7 @@ export function parseBlock(rows: number, blocks: WSFetchBlockResponse, taosResul // traverse row after row. for (let j = 0; j < metaList.length; j++) { - let isVarType = _isVarType(metaList[j]) + let isVarType = _isVarType(metaList[j].type) if (isVarType == ColumnsBlockType.SOLID) { colDataHead = colBlockHead + bitMapSize + metaList[j].length * i @@ -207,8 +228,8 @@ export function parseBlock(rows: number, blocks: WSFetchBlockResponse, taosResul } } -export function _isVarType(meta: ResponseMeta): Number { - switch (meta.type) { +export function _isVarType(metaType: number): Number { + switch (metaType) { case TDengineTypeCode['NCHAR']: { return ColumnsBlockType['NCHAR'] } @@ -233,14 +254,14 @@ export function _isVarType(meta: ResponseMeta): Number { } } export function readSolidDataToArray(buffer: ArrayBuffer, colBlockHead:number, - rows:number, meta: ResponseMeta, bitMapArr: ArrayBuffer): any[] { + rows:number, metaType: number, bitMapArr: ArrayBuffer): any[] { let dataBuffer = new DataView(buffer) let result:any[] = [] - switch (meta.type) { + switch (metaType) { case TDengineTypeCode['BOOL']: case TDengineTypeCode['TINYINT']: - case TDengineTypeCode['TINYINT UNSIGNED']:{ + case TDengineTypeCode['TINYINT UNSIGNED']:{ for (let i = 0; i < rows; i++, colBlockHead++) { if (isNull(bitMapArr, i)) { result.push(null); @@ -343,7 +364,7 @@ export function readSolidDataToArray(buffer: ArrayBuffer, colBlockHead:number, break; } default: { - throw new WebSocketQueryInterFaceError(ErrorCode.ERR_UNSUPPORTED_TDENGINE_TYPE, `unspported type ${meta.type} for column ${meta.name}`) + throw new WebSocketQueryInterFaceError(ErrorCode.ERR_UNSUPPORTED_TDENGINE_TYPE, `unspported type ${metaType}`) } } return result; @@ -417,6 +438,11 @@ export function readNchar(dataBuffer: ArrayBuffer, colDataHead: number, length: return data; } +export function getString(dataBuffer: ArrayBuffer, colDataHead: number, length: number): string { + let buff = dataBuffer.slice(colDataHead, colDataHead + length - 1) + let decoder = new TextDecoder('utf-8'); + return decoder.decode(new Uint8Array(buff)); +} function iteratorBuff(arr: ArrayBuffer) { let buf = Buffer.from(arr); diff --git a/nodejs/src/common/utils.ts b/nodejs/src/common/utils.ts index fd960d5..13d0a81 100644 --- a/nodejs/src/common/utils.ts +++ b/nodejs/src/common/utils.ts @@ -28,4 +28,35 @@ export function isEmpty(value: any): boolean { if (Array.isArray(value) && value.length === 0) return true; // if (typeof value === 'object' && Object.keys(value).length === 0) return true; return false; -} \ No newline at end of file +} + +export function getBinarySql(action:bigint, reqId:bigint, resultId:bigint, sql?:string): ArrayBuffer{ + // construct msg + let messageLen = 26; + if (sql) { + messageLen = 30 + sql.length; + } + + let sqlBuffer = new ArrayBuffer(messageLen); + let sqlView = new DataView(sqlBuffer); + sqlView.setBigUint64(0, reqId, true); + sqlView.setBigInt64(8, resultId, true); + sqlView.setBigInt64(16, action, true); + sqlView.setInt16(24, 1, true); + if (sql) { + sqlView.setInt32(26, sql.length, true); + const encoder = new TextEncoder(); + const buffer = encoder.encode(sql); + let offset = 30; + for (let i = 0; i < buffer.length; i++) { + sqlView.setUint8(offset + i, buffer[i]); + } + + } + + return sqlBuffer; +} + +export function zigzagDecode(n: number): number { + return (n >> 1) ^ (-(n & 1)) +} diff --git a/nodejs/src/sql/wsRows.ts b/nodejs/src/sql/wsRows.ts index ad43536..fd08a2e 100644 --- a/nodejs/src/sql/wsRows.ts +++ b/nodejs/src/sql/wsRows.ts @@ -1,14 +1,18 @@ -import { TDengineMeta, TaosResult } from '../common/taosResult'; +import { TDengineMeta, TaosResult, parseBlock } from '../common/taosResult'; import { TaosResultError } from '../common/wsError'; -import { WSQueryResponse } from '../client/wsResponse'; +import { WSFetchBlockResponse, WSQueryResponse } from '../client/wsResponse'; import { WsClient } from '../client/wsClient'; import logger from '../common/log'; +import { ReqId } from '../common/reqid'; +import { getBinarySql } from '../common/utils'; +import { BinaryQueryMessage, FetchRawBlockMessage } from '../common/constant'; export class WSRows { private _wsClient: WsClient; private readonly _wsQueryResponse: WSQueryResponse; private _taosResult: TaosResult; private _isClose : boolean; + constructor(wsInterface: WsClient, resp: WSQueryResponse) { this._wsClient = wsInterface; this._wsQueryResponse = resp; @@ -38,14 +42,24 @@ export class WSRows { private async getBlockData():Promise { try { - let wsFetchResponse = await this._wsClient.fetch(this._wsQueryResponse); - logger.debug("[wsQuery.execute.wsFetchResponse]==>\n", wsFetchResponse) - if (wsFetchResponse.completed) { - this.close(); - this._taosResult.setData(null); - } else { - this._taosResult.setRowsAndTime(wsFetchResponse.rows, wsFetchResponse.timing); - return await this._wsClient.fetchBlock(wsFetchResponse, this._taosResult); + if (this._wsQueryResponse.id) { + let bigintReqId = BigInt(ReqId.getReqID()); + let resp = await this._wsClient.sendBinaryMsg(bigintReqId, + "binary_query", getBinarySql(FetchRawBlockMessage, bigintReqId, BigInt(this._wsQueryResponse.id)), false, true); + + this._taosResult.addTotalTime(resp.totalTime) + let wsResponse = new WSFetchBlockResponse(resp.msg); + if (wsResponse.code != 0) { + logger.error("Executing SQL statement returns error: ", wsResponse.code, wsResponse.message); + throw new TaosResultError(wsResponse.code, wsResponse.message); + } + + if (wsResponse.finished == 1) { + this.close(); + this._taosResult.setData(null); + } else { + parseBlock(wsResponse, this._taosResult); + } } return this._taosResult; }catch(err:any){ diff --git a/nodejs/src/sql/wsSql.ts b/nodejs/src/sql/wsSql.ts index 3a8ff85..2ec6997 100644 --- a/nodejs/src/sql/wsSql.ts +++ b/nodejs/src/sql/wsSql.ts @@ -1,19 +1,20 @@ import { WSRows } from './wsRows' -import { TaosResult } from '../common/taosResult' +import { parseBlock, TaosResult } from '../common/taosResult' import { WsClient } from '../client/wsClient' import { ErrorCode, TDWebSocketClientError, TaosResultError, WebSocketInterfaceError } from '../common/wsError' import { WSConfig } from '../common/config' -import { getUrl } from '../common/utils' -import { WSQueryResponse } from '../client/wsResponse' +import { getBinarySql, getUrl } from '../common/utils' +import { WSFetchBlockResponse, WSQueryResponse } from '../client/wsResponse' import { Precision, SchemalessMessageInfo, SchemalessProto } from './wsProto' import { WsStmt } from '../stmt/wsStmt' import { ReqId } from '../common/reqid' -import { PrecisionLength } from '../common/constant' +import { BinaryQueryMessage, FetchRawBlockMessage, PrecisionLength } from '../common/constant' import logger from '../common/log' +import { log } from 'console' export class WsSql{ private wsConfig:WSConfig; - private _wsClient: WsClient; + private _wsClient: WsClient; constructor(wsConfig:WSConfig) { let url = getUrl(wsConfig); this._wsClient = new WsClient(url, wsConfig.getTimeOut()); @@ -103,29 +104,44 @@ export class WsSql{ throw(new TDWebSocketClientError(ErrorCode.ERR_CONNECTION_CLOSED, "stmt connect closed")); } - async exec(sql: string, reqId?: number, action:string = 'query'): Promise { + async exec(sql: string, reqId?: number, action:string = 'binary_query'): Promise { try { - let wsQueryResponse:WSQueryResponse = await this._wsClient.exec(this.getSql(sql, reqId, action)); + let bigintReqId = BigInt(ReqId.getReqID(reqId)); + let wsQueryResponse:WSQueryResponse = await this._wsClient.sendBinaryMsg(bigintReqId, + action, getBinarySql(BinaryQueryMessage, bigintReqId, BigInt(0), sql)); let taosResult = new TaosResult(wsQueryResponse); if (wsQueryResponse.is_update) { return taosResult; - } else { - try{ - while (true) { - let wsFetchResponse = await this._wsClient.fetch(wsQueryResponse) - if (wsFetchResponse.completed) { - break; - } else { - taosResult.setRowsAndTime(wsFetchResponse.rows, wsFetchResponse.timing); - taosResult = await this._wsClient.fetchBlock(wsFetchResponse, taosResult); + } else { + if (wsQueryResponse.id) { + try{ + while (true) { + let bigintReqId = BigInt(ReqId.getReqID(reqId)); + let resp = await this._wsClient.sendBinaryMsg(bigintReqId, + action, getBinarySql(FetchRawBlockMessage, bigintReqId, BigInt(wsQueryResponse.id)), false, true); + + taosResult.addTotalTime(resp.totalTime) + let wsResponse = new WSFetchBlockResponse(resp.msg); + if (wsResponse.code != 0) { + logger.error("Executing SQL statement returns error: ", wsResponse.code, wsResponse.message); + throw new TaosResultError(wsResponse.code, wsResponse.message); + } + console.log(wsResponse.finished) + if (wsResponse.finished == 1) { + break; + } + parseBlock(wsResponse, taosResult); } - } - return taosResult; - } catch(err: any){ - throw new TaosResultError(err.code, err.message); - } finally { - this._wsClient.freeResult(wsQueryResponse) + + return taosResult; + } catch(err: any){ + throw new TaosResultError(err.code, err.message); + } finally { + this._wsClient.freeResult(wsQueryResponse) + } } + throw new TaosResultError(ErrorCode.ERR_INVALID_FETCH_MESSAGE_DATA, "The result data of the query is incorrect"); + } } catch(err: any) { throw new TaosResultError(err.code, err.message); @@ -147,7 +163,9 @@ export class WsSql{ async query(sql: string, reqId?:number): Promise { try { - let wsQueryResponse:WSQueryResponse = await this._wsClient.exec(this.getSql(sql, reqId)); + let bigintReqId = BigInt(ReqId.getReqID(reqId)); + let wsQueryResponse:WSQueryResponse = await this._wsClient.sendBinaryMsg(bigintReqId, + 'binary_query', getBinarySql(BinaryQueryMessage, bigintReqId, BigInt(0), sql)); return new WSRows(this._wsClient, wsQueryResponse); } catch (err: any) { throw new TaosResultError(err.code, err.message); @@ -166,4 +184,5 @@ export class WsSql{ } return JSON.stringify(queryMsg) } + } \ No newline at end of file diff --git a/nodejs/src/tmq/constant.ts b/nodejs/src/tmq/constant.ts index f40b7e8..cdc9560 100644 --- a/nodejs/src/tmq/constant.ts +++ b/nodejs/src/tmq/constant.ts @@ -101,4 +101,29 @@ export class TMQMessageType { public static ResDataType: number = 1; } +export class TMQBlockInfo { + rawBlock?: ArrayBuffer; + precision?: number; + schema: Array; + tableName?: string; + constructor() { + this.schema = []; + } +} + +export class TMQRawDataSchema { + colType: number; + flag: number; + bytes: bigint; + colID: number + name: string; + constructor() { + this.bytes = BigInt(0); + this.colID = -1; + this.colType = -1; + this.flag = -1; + this.name = ""; + + } +} diff --git a/nodejs/src/tmq/tmqResponse.ts b/nodejs/src/tmq/tmqResponse.ts index e9410b9..fbac024 100644 --- a/nodejs/src/tmq/tmqResponse.ts +++ b/nodejs/src/tmq/tmqResponse.ts @@ -1,7 +1,11 @@ -import { WSQueryResponse } from "../client/wsResponse"; +import { off } from "process"; +import { WSFetchBlockResponse, WSQueryResponse } from "../client/wsResponse"; import { ColumnsBlockType, TDengineTypeLength } from "../common/constant"; -import { MessageResp, TaosResult, _isVarType, readBinary, readNchar, readSolidDataToArray, readVarchar } from "../common/taosResult"; -import { WebSocketInterfaceError, ErrorCode } from "../common/wsError"; +import { MessageResp, TaosResult, _isVarType, getString, readBinary, readNchar, readSolidDataToArray, readVarchar } from "../common/taosResult"; +import { WebSocketInterfaceError, ErrorCode, TDWebSocketClientError } from "../common/wsError"; +import { TMQBlockInfo, TMQRawDataSchema } from "./constant"; +import { zigzagDecode } from "../common/utils"; +import { parseBlock } from "@tdengine/websocket"; export class WsPollResponse { code: number; @@ -55,29 +59,227 @@ export class WsTmqQueryResponse extends WSQueryResponse{ } export class TaosTmqResult extends TaosResult { - topic: string; database: string; vgroup_id:number; - table_name:string; - constructor(resp: WsTmqQueryResponse, pollResp:WsPollResponse) { - super(resp); - this.table_name = resp.table_name; - // this._affectRows = resp.rows; - this.topic = pollResp.topic; + constructor(pollResp:WsPollResponse) { + super(); + this.setTopic(pollResp.topic); this.database = pollResp.database; this.vgroup_id = pollResp.vgroup_id; - } } -export class WSTmqFetchBlockResponse { - totalTime : number; - blockData : ArrayBuffer; - constructor(resp:MessageResp) { - this.totalTime = resp.totalTime - this.blockData = resp.msg +export class WSTmqFetchBlockInfo { + // totalTime: number; + // blockData: ArrayBuffer; + blockNum?: number; + withTableName?: boolean; + withSchema?: boolean; + blockInfos?: Array; + schema: Array; + tableName?: string; + taosResult: TaosResult; + constructor(blockData: ArrayBuffer, taosResult: TaosResult) { + // this.totalTime = resp.totalTime + // this.blockData = resp.msg + this.taosResult = taosResult; + this.schema = []; + let dataView = new DataView(blockData); + blockData = this.skipHead(dataView); + this.parseBlockInfos(blockData); + } + + private skipHead(dataView: DataView) { + let v = dataView.getUint8(0); + console.log("------>", v, dataView.buffer); + if (v >= 100) { + let skip = dataView.getUint32(1, true); + console.log("------>", skip); + console.log("------>", v, dataView.buffer.slice(skip + 5)); + return dataView.buffer.slice(skip + 5) + } + let skip1 = this.getTypeSkip(v); + console.log("------>", skip1); + v = dataView.getUint8(1 + skip1); + let skip2 = this.getTypeSkip(v); + console.log("------>", skip2); + return dataView.buffer.slice(skip1 + 2 + skip2) + } + + private getTypeSkip(v: number) { + switch (v) { + case 1: + return 8; + case 2: + case 3: + return 16; + default: + throw(new TDWebSocketClientError(ErrorCode.ERR_INVALID_FETCH_MESSAGE_DATA, `FetchBlockRawResp getTypeSkip error, type: ${v}`)); + } + } + + private parseBlockInfos(blockData: ArrayBuffer) { + let dataView = new DataView(blockData) + this.blockNum = dataView.getUint32(0, true); + this.withTableName = dataView.getUint8(4) == 1? true : false; + this.withSchema = dataView.getUint8(5) == 1? true : false; + console.log("------>", this.blockNum, this.withTableName, this.withSchema) + this.blockInfos = []; + let dataBuffer = dataView.buffer.slice(6) + for (let i = 0; i < this.blockNum; i++) { + let blockInfo = new TMQBlockInfo(); + let variableInfo = this.parseVariableByteInteger(dataBuffer); + console.log("---1-->", variableInfo) + dataView = new DataView(variableInfo[1].slice(17)); + blockInfo.precision = dataView.getUint8(0); + + let offset = variableInfo[0] - 17; + dataBuffer = this.parseSchemaInfo(dataView.buffer.slice(offset)); + console.log(this.schema) + + this.parseTmqBlock(dataView.buffer.slice(1)); + // dataBuffer = variableInfo[1].slice(variableInfo[0]); + + } + + } + + private parseSchemaInfo(dataBuffer: ArrayBuffer) { + if (this.withSchema) { + let variableInfo = this.parseVariableByteInteger(dataBuffer); + let cols = zigzagDecode(variableInfo[0]); + variableInfo = this.parseVariableByteInteger(variableInfo[1]); + let dataView = new DataView(variableInfo[1]) + let isSkip = this.schema.length > 0 + for (let index = 0; index < cols; index++) { + let schema = new TMQRawDataSchema(); + schema.colType = dataView.getInt8(0); + schema.flag = dataView.getInt8(1); + variableInfo = this.parseVariableByteInteger(dataView.buffer.slice(2)); + schema.bytes = BigInt(zigzagDecode(variableInfo[0])); + variableInfo = this.parseVariableByteInteger(variableInfo[1]); + schema.colID = zigzagDecode(variableInfo[0]); + variableInfo = this.parseVariableByteInteger(variableInfo[1]); + schema.name = getString(variableInfo[1], 0, variableInfo[0]); + if (!isSkip) { + this.taosResult.setMeta({ + name: schema.name, + type: schema.colType, + length: Number(schema.bytes) + } ); + this.schema.push(schema); + } + dataView = new DataView(variableInfo[1].slice(variableInfo[0])) + + } + + if(this.withTableName) { + variableInfo = this.parseVariableByteInteger(dataView.buffer); + this.tableName = readVarchar(variableInfo[1], 0, variableInfo[0]); + dataView = new DataView(variableInfo[1].slice(variableInfo[0])) + } + return dataView.buffer; + } + return dataBuffer; + } + + private parseVariableByteInteger(dataBuffer: ArrayBuffer): [number, ArrayBuffer] { + let value = 0; + let multiplier = 1; + let dataView = new DataView(dataBuffer); + let count = 0; + while (true) { + let encodedByte = dataView.getUint8(count); + value += (encodedByte&127) * multiplier; + if ((encodedByte & 128) == 0) { + break; + } + multiplier *= 128; + count++; + } + + return [value, dataView.buffer.slice(count+1)] + } + + private parseTmqBlock(dataBuffer: ArrayBuffer) { + let dataView = new DataView(dataBuffer) + let rows = dataView.getInt32(8, true); + console.log("rows------->", rows) + let taosdata = this.taosResult.getData() + let metaData = this.taosResult.getMeta() + if (metaData && rows && taosdata) { + let dataList:any[][] = new Array(rows); + //get bitmap length + let bitMapOffset:number = getBitmapLen(rows); + //skip data head + let bufferOffset = 28 + 5 * this.schema.length + + dataBuffer = dataBuffer.slice(bufferOffset); + let metaLens:number[]= [] + for (let i = 0; i< this.schema.length; i++) { + //get data len + metaLens.push(new DataView(dataBuffer, i*4, 4).getInt32(0, true)) + } + bufferOffset = this.schema.length * 4; + + for (let i = 0; i < this.schema.length; i++) { + let data:any[] = []; + //get type code + let isVarType = _isVarType(this.schema[i].colType) + //fixed length type + if (isVarType == ColumnsBlockType.SOLID) { + let bitMapArr = dataBuffer.slice(bufferOffset, bufferOffset + bitMapOffset); + bufferOffset += bitMapOffset; + //decode column data, data is array + data = readSolidDataToArray(dataBuffer, bufferOffset, rows, this.schema[i].colType, bitMapArr); + } else { + //Variable length type + let offset = bufferOffset; + let offsets:number[]= []; + for (let i = 0; i< rows; i++, offset += TDengineTypeLength['INT']) { + //get data length, -1 is null + offsets.push(new DataView(dataBuffer, offset, 4).getInt32(0, true)) + } + let start = offset + for (let i = 0; i< rows; i++) { + let value:any = '' + if (-1 == offsets[i]) { + value = null + }else{ + let header = start + offsets[i]; + let dataLength = new DataView(dataBuffer, header, 2).getInt16(0, true) & 0xFFFF; + if (isVarType == ColumnsBlockType.VARCHAR) { + //decode var char + value = readVarchar(dataBuffer, header + 2, dataLength) + } else if(isVarType == ColumnsBlockType.GEOMETRY || isVarType == ColumnsBlockType.VARBINARY) { + //decode binary + value = readBinary(dataBuffer, header + 2, dataLength) + } else { + //decode nchar + value = readNchar(dataBuffer, header + 2, dataLength) + } + + } + data.push(value); + } + bufferOffset += rows * 4 + } + bufferOffset += metaLens[i] + //column data to row data + for (let row = 0; row < data.length; row++) { + if (dataList[row] == null) { + dataList[row] = [] + } + dataList[row].push(data[row]) + } + } + console.log("data-->", dataList); + taosdata.push(...dataList); + + } } } + export class AssignmentResp{ req_id: number; code: number; @@ -170,86 +372,6 @@ export class TopicPartition { } } -export function parseTmqBlock(rows:number, resp: WSTmqFetchBlockResponse, taosResult: TaosResult): TaosResult { - let dataList:any[][] = new Array(rows); - if (!resp || !taosResult) { - return taosResult; - } - - let metaList = taosResult.getTaosMeta() - let taosdata = taosResult.getData() - if (metaList && rows && taosdata) { - //get bitmap length - let bitMapOffset:number = getBitmapLen(rows); - //skip data head - let bufferOffset = 24 + 28 + 5 * metaList.length - - let dataBuffer:ArrayBuffer = resp.blockData.slice(bufferOffset); - let metaLens:number[]= [] - for (let i = 0; i< metaList.length; i++) { - //get data len - metaLens.push(new DataView(dataBuffer, i*4, 4).getInt32(0, true)) - } - bufferOffset = metaList.length * 4; - - for (let i = 0; i < metaList.length; i++) { - let data:any[] = []; - //get type code - let isVarType = _isVarType(metaList[i]) - //fixed length type - if (isVarType == ColumnsBlockType.SOLID) { - let bitMapArr = dataBuffer.slice(bufferOffset, bufferOffset + bitMapOffset); - bufferOffset += bitMapOffset; - //decode column data, data is array - data = readSolidDataToArray(dataBuffer, bufferOffset, rows, metaList[i], bitMapArr); - } else { - //Variable length type - let offset = bufferOffset; - let offsets:number[]= []; - for (let i = 0; i< rows; i++, offset += TDengineTypeLength['INT']) { - //get data length, -1 is null - offsets.push(new DataView(dataBuffer, offset, 4).getInt32(0, true)) - } - let start = offset - for (let i = 0; i< rows; i++) { - let value:any = '' - if (-1 == offsets[i]) { - value = null - }else{ - let header = start + offsets[i]; - let dataLength = new DataView(dataBuffer, header, 2).getInt16(0, true) & 0xFFFF; - if (isVarType == ColumnsBlockType.VARCHAR) { - //decode var char - value = readVarchar(dataBuffer, header + 2, dataLength) - } else if(isVarType == ColumnsBlockType.GEOMETRY || isVarType == ColumnsBlockType.VARBINARY) { - //decode binary - value = readBinary(dataBuffer, header + 2, dataLength) - } else { - //decode nchar - value = readNchar(dataBuffer, header + 2, dataLength) - } - - } - data.push(value); - } - bufferOffset += rows * 4 - } - bufferOffset += metaLens[i] - //column data to row data - for (let row = 0; row < data.length; row++) { - if (dataList[row] == null) { - dataList[row] = [] - } - dataList[row].push(data[row]) - } - } - taosdata.push(...dataList); - } - - - return taosResult; -} - function getBitmapLen(n:number) { return (n + 0x7) >> 3; } diff --git a/nodejs/src/tmq/wsTmq.ts b/nodejs/src/tmq/wsTmq.ts index cbc4670..f19bce2 100644 --- a/nodejs/src/tmq/wsTmq.ts +++ b/nodejs/src/tmq/wsTmq.ts @@ -2,10 +2,11 @@ import { TmqConfig } from './config'; import { TMQConstants, TMQMessageType } from './constant'; import { WsClient } from '../client/wsClient'; import { TaosResult } from '../common/taosResult'; -import { ErrorCode, TaosResultError, WebSocketInterfaceError } from '../common/wsError'; -import { AssignmentResp, CommittedResp, PartitionsResp, SubscriptionResp, TaosTmqResult, TopicPartition, WSTmqFetchBlockResponse, WsPollResponse, WsTmqQueryResponse, parseTmqBlock} from './tmqResponse'; +import { ErrorCode, TaosResultError, WebSocketInterfaceError, WebSocketQueryError } from '../common/wsError'; +import { AssignmentResp, CommittedResp, PartitionsResp, SubscriptionResp, TaosTmqResult, TopicPartition, WSTmqFetchBlockInfo, WsPollResponse, WsTmqQueryResponse} from './tmqResponse'; import { ReqId } from '../common/reqid'; import logger from '../common/log'; +import { WSFetchBlockResponse } from '../client/wsResponse'; export class WsConsumer { private _wsClient: WsClient; @@ -68,7 +69,7 @@ export class WsConsumer { return await this._wsClient.exec(JSON.stringify(queryMsg)); } - async poll(timeoutMs: number, reqId?:number):Promise> { + async poll(timeoutMs: number, reqId?:number):Promise { if (this._wsConfig.auto_commit) { if (this._commitTime) { let currTime = new Date().getTime(); @@ -252,22 +253,32 @@ export class WsConsumer { return new WsTmqQueryResponse(result); } - private async fetchBlockData(fetchResponse: WsTmqQueryResponse, taosResult: TaosResult):Promise { + private async fetchBlockData(pollResp: WsPollResponse, taosResult: TaosTmqResult):Promise { let fetchMsg = { - action: 'fetch_block', + action: 'fetch_raw_data', args: { - req_id: fetchResponse.req_id, - message_id: fetchResponse.message_id, + req_id: ReqId.getReqID(), + message_id: pollResp.message_id, }, }; let jsonStr = JSON.stringify(fetchMsg); logger.debug('[wsQueryInterface.fetch.fetchMsg]===>' + jsonStr); let result = await this._wsClient.sendMsg(jsonStr) - parseTmqBlock(fetchResponse.rows, new WSTmqFetchBlockResponse(result), taosResult) - return taosResult; + let wsResponse = new WSFetchBlockResponse(result.msg) + + if (wsResponse && wsResponse.data) { + console.log(wsResponse) + let wsTmqResponse = new WSTmqFetchBlockInfo(wsResponse.data, taosResult); + console.log(wsTmqResponse); + } + + + throw new WebSocketQueryError(ErrorCode.ERR_UNSUPPORTED_TDENGINE_TYPE, `SSSSSSSSSSSSSSS`); + // parseTmqBlock(fetchResponse.rows, new WSTmqFetchBlockResponse(result), taosResult) + // return taosResult; } - private async pollData(timeoutMs: number, reqId?:number): Promise> { + private async pollData(timeoutMs: number, reqId?:number): Promise { let queryMsg = { action: TMQMessageType.Poll, args: { @@ -276,27 +287,29 @@ export class WsConsumer { }, }; - var taosResults: Map = new Map(); + var taosResults: TaosResult = new TaosResult(); let resp = await this._wsClient.exec(JSON.stringify(queryMsg), false); let pollResp = new WsPollResponse(resp) if (!pollResp.have_message || pollResp.message_type != TMQMessageType.ResDataType) { return taosResults; - } - while (true) { - let fetchResp = await this.fetch(pollResp) - if (fetchResp.completed || fetchResp.rows == 0) { - break; - } - let taosResult = taosResults.get(pollResp.topic + pollResp.vgroup_id) - if (taosResult == null) { - taosResult = new TaosTmqResult(fetchResp, pollResp) - taosResults.set(pollResp.topic + pollResp.vgroup_id, taosResult) - } else { - taosResult.setRowsAndTime(fetchResp.rows); - } - await this.fetchBlockData(fetchResp, taosResult) + } + let taosResult = new TaosTmqResult(pollResp) + await this.fetchBlockData(pollResp, taosResult) + // while (true) { + // let fetchResp = await this.fetch(pollResp) + // if (fetchResp.completed || fetchResp.rows == 0) { + // break; + // } + // let taosResult = taosResults.get(pollResp.topic + pollResp.vgroup_id) + // if (taosResult == null) { + // taosResult = new TaosTmqResult(fetchResp, pollResp) + // taosResults.set(pollResp.topic + pollResp.vgroup_id, taosResult) + // } else { + // taosResult.setRowsAndTime(fetchResp.rows); + // } - } + + // } return taosResults; } diff --git a/nodejs/test/bulkPulling/sql.test.ts b/nodejs/test/bulkPulling/sql.test.ts index f76130b..6029c2d 100644 --- a/nodejs/test/bulkPulling/sql.test.ts +++ b/nodejs/test/bulkPulling/sql.test.ts @@ -2,7 +2,7 @@ import { WebSocketConnectionPool } from "../../src/client/wsConnectorPool"; import { WSConfig } from "../../src/common/config"; import { WsSql } from "../../src/sql/wsSql"; -let dns = 'ws://localhost:6041' +let dns = 'ws://192.168.1.98:6041' beforeAll(async () => { let conf :WSConfig = new WSConfig(dns) conf.setUser('root') diff --git a/nodejs/test/bulkPulling/tmq.test.ts b/nodejs/test/bulkPulling/tmq.test.ts index 578848c..b647c9a 100644 --- a/nodejs/test/bulkPulling/tmq.test.ts +++ b/nodejs/test/bulkPulling/tmq.test.ts @@ -118,13 +118,15 @@ describe('TDWebSocket.Tmq()', () => { let res = await consumer.poll(500); let currTime = new Date().getTime(); useTime.push(Math.abs(currTime - startTime)); - for (let [key, value] of res) { - let data = value.getData() - if (data) { - counts[0] += data.length; - } + console.log(res.getTopic(), res.getMeta()); + let data = res.getData(); + if (data) { + for (let record of data ) { + console.log(record) + } } - if (res.size == 0) { + + if (data == null || data.length == 0) { break; } @@ -138,13 +140,15 @@ describe('TDWebSocket.Tmq()', () => { let res = await consumer.poll(500); let currTime = new Date().getTime(); useTime.push(Math.abs(currTime - startTime)); - for (let [key, value] of res) { - let data = value.getData() - if (data) { - counts[1] += data.length; - } + console.log(res.getTopic(), res.getMeta()); + let data = res.getData(); + if (data) { + for (let record of data ) { + console.log(record) + } } - if (res.size == 0) { + + if (data == null || data.length == 0) { break; } // await Sleep(100) diff --git a/nodejs/test/bulkPulling/wsConnectPool.test.ts b/nodejs/test/bulkPulling/wsConnectPool.test.ts index 4bc5330..8e54cee 100644 --- a/nodejs/test/bulkPulling/wsConnectPool.test.ts +++ b/nodejs/test/bulkPulling/wsConnectPool.test.ts @@ -84,8 +84,12 @@ async function tmqConnect() { await consumer.subscribe(topics); let res = await consumer.poll(500); - for (let [key, value] of res) { - console.log(key, value); + console.log(res.getTopic(), res.getMeta()); + let data = res.getData(); + if (data) { + for (let record of data ) { + console.log(record) + } } await consumer.commit(); From d6eccd1310b238bb2144527a7915223be71394c7 Mon Sep 17 00:00:00 2001 From: menshibin Date: Mon, 23 Sep 2024 19:19:28 +0800 Subject: [PATCH 02/20] fetchBlockData parse --- nodejs/example/basicTmq.ts | 6 ++-- nodejs/src/client/wsConnector.ts | 2 +- nodejs/src/client/wsResponse.ts | 10 ++++-- nodejs/src/common/log.ts | 6 ++-- nodejs/src/common/taosResult.ts | 4 +++ nodejs/src/tmq/tmqResponse.ts | 61 +++++++++++++++----------------- nodejs/src/tmq/wsTmq.ts | 42 +++++++--------------- 7 files changed, 59 insertions(+), 72 deletions(-) diff --git a/nodejs/example/basicTmq.ts b/nodejs/example/basicTmq.ts index 356d821..1c38759 100644 --- a/nodejs/example/basicTmq.ts +++ b/nodejs/example/basicTmq.ts @@ -7,13 +7,13 @@ const db = 'power18' const topics:string[] = ['topic_ws_map'] let dropTopic = `DROP TOPIC IF EXISTS ${topics[0]};` let configMap = new Map([ - [TMQConstants.GROUP_ID, "gId"], + [TMQConstants.GROUP_ID, "gId_7"], [TMQConstants.CONNECT_USER, "root"], [TMQConstants.CONNECT_PASS, "taosdata"], [TMQConstants.AUTO_OFFSET_RESET, "earliest"], [TMQConstants.CLIENT_ID, 'test_tmq_client'], [TMQConstants.WS_URL, 'ws://192.168.1.98:6041'], - [TMQConstants.ENABLE_AUTO_COMMIT, 'true'], + [TMQConstants.ENABLE_AUTO_COMMIT, 'false'], [TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000'] ]); let dsn = 'ws://root:taosdata@192.168.1.98:6041'; @@ -53,7 +53,7 @@ async function Prepare() { } } - await consumer.commit(); + // await consumer.commit(); } let assignment = await consumer.assignment() diff --git a/nodejs/src/client/wsConnector.ts b/nodejs/src/client/wsConnector.ts index 6e7d2c7..090788f 100644 --- a/nodejs/src/client/wsConnector.ts +++ b/nodejs/src/client/wsConnector.ts @@ -54,7 +54,7 @@ export class WebSocketConnector { private _onmessage(event: any) { let data = event.data; - logger.debug("wsClient._onMessage()===="+ (Object.prototype.toString.call(data))) + logger.debug("wsClient._onMessage()====" + (Object.prototype.toString.call(data))) if (Object.prototype.toString.call(data) === '[object ArrayBuffer]') { let id = new DataView(data, 26, 8).getBigUint64(0, true); WsEventCallback.instance().handleEventCallback({id:id, action:'', req_id:BigInt(0)}, diff --git a/nodejs/src/client/wsResponse.ts b/nodejs/src/client/wsResponse.ts index 76f3d2c..f4e96bd 100644 --- a/nodejs/src/client/wsResponse.ts +++ b/nodejs/src/client/wsResponse.ts @@ -92,16 +92,17 @@ export class WSFetchBlockResponse { timing: bigint reqId: bigint code: number + blockLen: number message: string | undefined resultId: bigint | undefined finished: number | undefined - blockLen: number | undefined metaType: number | undefined constructor(msg: ArrayBuffer) { this.action = new DataView(msg, 8, 8).getBigUint64(0, true) this.timing = new DataView(msg, 18, 8).getBigUint64(0, true) this.reqId = new DataView(msg, 26, 8).getBigUint64(0, true) this.code = new DataView(msg, 34, 4).getUint32(0, true) + this.blockLen = 0; if (this.code != 0) { let len = new DataView(msg, 38, 4).getUint32(0, true) this.message = readVarchar(msg, 42, len); @@ -120,8 +121,11 @@ export class WSFetchBlockResponse { offset += 1; } - this.blockLen = new DataView(msg, offset, 4).getUint32(0, true) - this.data = msg.slice(offset + 4); + this.blockLen = new DataView(msg, offset, 4).getUint32(0, true) + if (this.blockLen > 0) { + this.data = msg.slice(offset + 4); + } + } } diff --git a/nodejs/src/common/log.ts b/nodejs/src/common/log.ts index 34ab833..eb10e7c 100644 --- a/nodejs/src/common/log.ts +++ b/nodejs/src/common/log.ts @@ -16,7 +16,7 @@ const transport = new DailyRotateFile({ handleExceptions: true, // Whether to handle exceptions json: false, // Whether to output logs in JSON format format: winston.format.combine( - winston.format.timestamp(), + winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }), customFormat ), level: 'info', // set log level @@ -29,8 +29,8 @@ const logger = winston.createLogger({ // 设置 BigInt 类型的序列化处理 transport.format = winston.format((info) => { - if (info && info.message && typeof info.message === 'object' && typeof info.message.toJSON === 'function') { - info.message = info.message.toJSON(); + if (info && info.message && typeof info.message === 'object' && typeof info.message.toJSON === 'function') { + info.message = info.message.toJSON(); } return info; })(); diff --git a/nodejs/src/common/taosResult.ts b/nodejs/src/common/taosResult.ts index a432371..d27ad34 100644 --- a/nodejs/src/common/taosResult.ts +++ b/nodejs/src/common/taosResult.ts @@ -67,6 +67,10 @@ export class TaosResult { this._totalTime = queryResponse.totalTime } + public setPrecision(precision: number) { + this._precision = precision; + } + public setRowsAndTime(rows: number, timing?:bigint) { if (this._affectRows) { this._affectRows += rows; diff --git a/nodejs/src/tmq/tmqResponse.ts b/nodejs/src/tmq/tmqResponse.ts index fbac024..17c8a3c 100644 --- a/nodejs/src/tmq/tmqResponse.ts +++ b/nodejs/src/tmq/tmqResponse.ts @@ -1,11 +1,10 @@ -import { off } from "process"; -import { WSFetchBlockResponse, WSQueryResponse } from "../client/wsResponse"; +import { WSQueryResponse } from "../client/wsResponse"; import { ColumnsBlockType, TDengineTypeLength } from "../common/constant"; import { MessageResp, TaosResult, _isVarType, getString, readBinary, readNchar, readSolidDataToArray, readVarchar } from "../common/taosResult"; import { WebSocketInterfaceError, ErrorCode, TDWebSocketClientError } from "../common/wsError"; import { TMQBlockInfo, TMQRawDataSchema } from "./constant"; import { zigzagDecode } from "../common/utils"; -import { parseBlock } from "@tdengine/websocket"; +import logger from "../common/log"; export class WsPollResponse { code: number; @@ -70,15 +69,13 @@ export class TaosTmqResult extends TaosResult { } export class WSTmqFetchBlockInfo { - // totalTime: number; - // blockData: ArrayBuffer; - blockNum?: number; withTableName?: boolean; withSchema?: boolean; blockInfos?: Array; schema: Array; tableName?: string; taosResult: TaosResult; + rows: number; constructor(blockData: ArrayBuffer, taosResult: TaosResult) { // this.totalTime = resp.totalTime // this.blockData = resp.msg @@ -86,23 +83,20 @@ export class WSTmqFetchBlockInfo { this.schema = []; let dataView = new DataView(blockData); blockData = this.skipHead(dataView); - this.parseBlockInfos(blockData); + this.rows = this.parseBlockInfos(blockData); + } + public getRows(): number{ + return this.rows; } - private skipHead(dataView: DataView) { let v = dataView.getUint8(0); - console.log("------>", v, dataView.buffer); if (v >= 100) { let skip = dataView.getUint32(1, true); - console.log("------>", skip); - console.log("------>", v, dataView.buffer.slice(skip + 5)); return dataView.buffer.slice(skip + 5) } let skip1 = this.getTypeSkip(v); - console.log("------>", skip1); v = dataView.getUint8(1 + skip1); let skip2 = this.getTypeSkip(v); - console.log("------>", skip2); return dataView.buffer.slice(skip1 + 2 + skip2) } @@ -118,29 +112,26 @@ export class WSTmqFetchBlockInfo { } } - private parseBlockInfos(blockData: ArrayBuffer) { + private parseBlockInfos(blockData: ArrayBuffer): number { let dataView = new DataView(blockData) - this.blockNum = dataView.getUint32(0, true); + let blockNum = dataView.getUint32(0, true); + if (blockNum == 0) { + return 0; + } this.withTableName = dataView.getUint8(4) == 1? true : false; this.withSchema = dataView.getUint8(5) == 1? true : false; - console.log("------>", this.blockNum, this.withTableName, this.withSchema) - this.blockInfos = []; + logger.debug("parseBlockInfos blockNum="+ blockNum + ", withTableName=" + this.withTableName + ", withSchema=" + this.withSchema) let dataBuffer = dataView.buffer.slice(6) - for (let i = 0; i < this.blockNum; i++) { - let blockInfo = new TMQBlockInfo(); + let rows = 0; + for (let i = 0; i < blockNum; i++) { let variableInfo = this.parseVariableByteInteger(dataBuffer); - console.log("---1-->", variableInfo) dataView = new DataView(variableInfo[1].slice(17)); - blockInfo.precision = dataView.getUint8(0); - + this.taosResult.setPrecision(dataView.getUint8(0)); let offset = variableInfo[0] - 17; dataBuffer = this.parseSchemaInfo(dataView.buffer.slice(offset)); - console.log(this.schema) - - this.parseTmqBlock(dataView.buffer.slice(1)); - // dataBuffer = variableInfo[1].slice(variableInfo[0]); - + rows += this.parseTmqBlock(dataView.buffer.slice(1)); } + return rows; } @@ -201,13 +192,16 @@ export class WSTmqFetchBlockInfo { return [value, dataView.buffer.slice(count+1)] } - private parseTmqBlock(dataBuffer: ArrayBuffer) { + private parseTmqBlock(dataBuffer: ArrayBuffer): number { let dataView = new DataView(dataBuffer) let rows = dataView.getInt32(8, true); - console.log("rows------->", rows) - let taosdata = this.taosResult.getData() + if (rows == 0) { + return rows; + } + + let taosData = this.taosResult.getData() let metaData = this.taosResult.getMeta() - if (metaData && rows && taosdata) { + if (metaData && rows && taosData) { let dataList:any[][] = new Array(rows); //get bitmap length let bitMapOffset:number = getBitmapLen(rows); @@ -273,11 +267,12 @@ export class WSTmqFetchBlockInfo { dataList[row].push(data[row]) } } - console.log("data-->", dataList); - taosdata.push(...dataList); + taosData.push(...dataList); } + return rows; } + } export class AssignmentResp{ diff --git a/nodejs/src/tmq/wsTmq.ts b/nodejs/src/tmq/wsTmq.ts index f19bce2..8ea5d40 100644 --- a/nodejs/src/tmq/wsTmq.ts +++ b/nodejs/src/tmq/wsTmq.ts @@ -5,7 +5,7 @@ import { TaosResult } from '../common/taosResult'; import { ErrorCode, TaosResultError, WebSocketInterfaceError, WebSocketQueryError } from '../common/wsError'; import { AssignmentResp, CommittedResp, PartitionsResp, SubscriptionResp, TaosTmqResult, TopicPartition, WSTmqFetchBlockInfo, WsPollResponse, WsTmqQueryResponse} from './tmqResponse'; import { ReqId } from '../common/reqid'; -import logger from '../common/log'; +import logger from "../common/log"; import { WSFetchBlockResponse } from '../client/wsResponse'; export class WsConsumer { @@ -253,7 +253,7 @@ export class WsConsumer { return new WsTmqQueryResponse(result); } - private async fetchBlockData(pollResp: WsPollResponse, taosResult: TaosTmqResult):Promise { + private async fetchBlockData(pollResp: WsPollResponse, taosResult: TaosTmqResult):Promise { let fetchMsg = { action: 'fetch_raw_data', args: { @@ -265,17 +265,14 @@ export class WsConsumer { logger.debug('[wsQueryInterface.fetch.fetchMsg]===>' + jsonStr); let result = await this._wsClient.sendMsg(jsonStr) let wsResponse = new WSFetchBlockResponse(result.msg) - - if (wsResponse && wsResponse.data) { - console.log(wsResponse) + if (wsResponse && wsResponse.data && wsResponse.blockLen > 0) { let wsTmqResponse = new WSTmqFetchBlockInfo(wsResponse.data, taosResult); - console.log(wsTmqResponse); + logger.debug('[WSTmqFetchBlockInfo.fetchBlockData]===>' + wsTmqResponse); + if (wsTmqResponse.rows > 0) { + return true; + } } - - - throw new WebSocketQueryError(ErrorCode.ERR_UNSUPPORTED_TDENGINE_TYPE, `SSSSSSSSSSSSSSS`); - // parseTmqBlock(fetchResponse.rows, new WSTmqFetchBlockResponse(result), taosResult) - // return taosResult; + return false; } private async pollData(timeoutMs: number, reqId?:number): Promise { @@ -294,24 +291,11 @@ export class WsConsumer { return taosResults; } let taosResult = new TaosTmqResult(pollResp) - await this.fetchBlockData(pollResp, taosResult) - // while (true) { - // let fetchResp = await this.fetch(pollResp) - // if (fetchResp.completed || fetchResp.rows == 0) { - // break; - // } - // let taosResult = taosResults.get(pollResp.topic + pollResp.vgroup_id) - // if (taosResult == null) { - // taosResult = new TaosTmqResult(fetchResp, pollResp) - // taosResults.set(pollResp.topic + pollResp.vgroup_id, taosResult) - // } else { - // taosResult.setRowsAndTime(fetchResp.rows); - // } - - - // } - - return taosResults; + let finish = false; + while (!finish) { + finish = await this.fetchBlockData(pollResp, taosResult) + } + return taosResult; } private async sendAssignmentReq(topic:string):Promise> { From c0e0081f1929fba59d048f1cd98fbedd0b6c4050 Mon Sep 17 00:00:00 2001 From: menshibin Date: Fri, 27 Sep 2024 11:29:51 +0800 Subject: [PATCH 03/20] modify poll interface to map --- nodejs/example/basicTmq.ts | 39 +++++++++---------- nodejs/src/tmq/wsTmq.ts | 18 +++++---- nodejs/test/bulkPulling/tmq.test.ts | 36 +++++++++-------- nodejs/test/bulkPulling/wsConnectPool.test.ts | 13 +++++-- 4 files changed, 57 insertions(+), 49 deletions(-) diff --git a/nodejs/example/basicTmq.ts b/nodejs/example/basicTmq.ts index 1c38759..63ab597 100644 --- a/nodejs/example/basicTmq.ts +++ b/nodejs/example/basicTmq.ts @@ -4,14 +4,14 @@ import { destroy, setLogLevel, sqlConnect, tmqConnect } from "../src"; const stable = 'meters'; const db = 'power18' -const topics:string[] = ['topic_ws_map'] +const topics:string[] = ['topic_ws_map11111'] let dropTopic = `DROP TOPIC IF EXISTS ${topics[0]};` let configMap = new Map([ - [TMQConstants.GROUP_ID, "gId_7"], + [TMQConstants.GROUP_ID, "gId_11"], [TMQConstants.CONNECT_USER, "root"], [TMQConstants.CONNECT_PASS, "taosdata"], [TMQConstants.AUTO_OFFSET_RESET, "earliest"], - [TMQConstants.CLIENT_ID, 'test_tmq_client'], + [TMQConstants.CLIENT_ID, 'test_tmq_client11'], [TMQConstants.WS_URL, 'ws://192.168.1.98:6041'], [TMQConstants.ENABLE_AUTO_COMMIT, 'false'], [TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000'] @@ -29,10 +29,10 @@ async function Prepare() { await ws.exec(useDB); await ws.exec(createStable); await ws.exec(createTopic); - for (let i = 0; i < 10; i++) { - await ws.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10+i}, ${200+i}, ${0.32 + i})`) + for (let i = 0; i < 1000; i++) { + await ws.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW + ${i}a, ${10+i}, ${200+i}, ${0.32 + i})`) } - ws.close() + await ws.close() } @@ -44,25 +44,24 @@ async function Prepare() { consumer = await tmqConnect(configMap); await consumer.subscribe(topics); for (let i = 0; i < 5; i++) { - let res = await consumer.poll(500); - console.log(res.getTopic(), res.getMeta()); - let data = res.getData(); - if (data) { - for (let record of data ) { - console.log(record) + let res = await consumer.poll(5); + for (let [key, value] of res) { + console.log(key, value.getMeta()); + let data = value.getData(); + if (data) { + console.log(data.length) } } - // await consumer.commit(); } - let assignment = await consumer.assignment() - console.log(assignment) - await consumer.seekToBeginning(assignment) - assignment = await consumer.assignment() - for(let i in assignment) { - console.log("seek after:", assignment[i]) - } + // let assignment = await consumer.assignment() + // console.log(assignment) + // await consumer.seekToBeginning(assignment) + // assignment = await consumer.assignment() + // for(let i in assignment) { + // console.log("seek after:", assignment[i]) + // } await consumer.unsubscribe() } catch (e) { console.error(e); diff --git a/nodejs/src/tmq/wsTmq.ts b/nodejs/src/tmq/wsTmq.ts index 8ea5d40..9ab67cd 100644 --- a/nodejs/src/tmq/wsTmq.ts +++ b/nodejs/src/tmq/wsTmq.ts @@ -69,7 +69,7 @@ export class WsConsumer { return await this._wsClient.exec(JSON.stringify(queryMsg)); } - async poll(timeoutMs: number, reqId?:number):Promise { + async poll(timeoutMs: number, reqId?:number):Promise> { if (this._wsConfig.auto_commit) { if (this._commitTime) { let currTime = new Date().getTime(); @@ -85,8 +85,6 @@ export class WsConsumer { return await this.pollData(timeoutMs,reqId) } - - async subscription(reqId?:number):Promise> { let queryMsg = { action: TMQMessageType.ListTopics, @@ -267,7 +265,7 @@ export class WsConsumer { let wsResponse = new WSFetchBlockResponse(result.msg) if (wsResponse && wsResponse.data && wsResponse.blockLen > 0) { let wsTmqResponse = new WSTmqFetchBlockInfo(wsResponse.data, taosResult); - logger.debug('[WSTmqFetchBlockInfo.fetchBlockData]===>' + wsTmqResponse); + logger.debug('[WSTmqFetchBlockInfo.fetchBlockData]===>' + wsTmqResponse.taosResult); if (wsTmqResponse.rows > 0) { return true; } @@ -275,7 +273,7 @@ export class WsConsumer { return false; } - private async pollData(timeoutMs: number, reqId?:number): Promise { + private async pollData(timeoutMs: number, reqId?:number): Promise> { let queryMsg = { action: TMQMessageType.Poll, args: { @@ -284,18 +282,22 @@ export class WsConsumer { }, }; - var taosResults: TaosResult = new TaosResult(); + let resp = await this._wsClient.exec(JSON.stringify(queryMsg), false); let pollResp = new WsPollResponse(resp) + let taosResult = new TaosTmqResult(pollResp) + var taosResults: Map = new Map(); + taosResults.set(pollResp.topic, taosResult); if (!pollResp.have_message || pollResp.message_type != TMQMessageType.ResDataType) { return taosResults; } - let taosResult = new TaosTmqResult(pollResp) + let finish = false; while (!finish) { finish = await this.fetchBlockData(pollResp, taosResult) } - return taosResult; + + return taosResults; } private async sendAssignmentReq(topic:string):Promise> { diff --git a/nodejs/test/bulkPulling/tmq.test.ts b/nodejs/test/bulkPulling/tmq.test.ts index b647c9a..b37f773 100644 --- a/nodejs/test/bulkPulling/tmq.test.ts +++ b/nodejs/test/bulkPulling/tmq.test.ts @@ -118,18 +118,19 @@ describe('TDWebSocket.Tmq()', () => { let res = await consumer.poll(500); let currTime = new Date().getTime(); useTime.push(Math.abs(currTime - startTime)); - console.log(res.getTopic(), res.getMeta()); - let data = res.getData(); - if (data) { + + for (let [key, value] of res) { + console.log(key, value.getMeta()); + let data = value.getData(); + if (data == null || data.length == 0) { + break; + } + for (let record of data ) { console.log(record) - } - } - - if (data == null || data.length == 0) { - break; + } + } - // await Sleep(100) } @@ -140,16 +141,17 @@ describe('TDWebSocket.Tmq()', () => { let res = await consumer.poll(500); let currTime = new Date().getTime(); useTime.push(Math.abs(currTime - startTime)); - console.log(res.getTopic(), res.getMeta()); - let data = res.getData(); - if (data) { + for (let [key, value] of res) { + console.log(key, value.getMeta()); + let data = value.getData(); + if (data == null || data.length == 0) { + break; + } + for (let record of data ) { console.log(record) - } - } - - if (data == null || data.length == 0) { - break; + } + } // await Sleep(100) } diff --git a/nodejs/test/bulkPulling/wsConnectPool.test.ts b/nodejs/test/bulkPulling/wsConnectPool.test.ts index 8e54cee..e4baaa1 100644 --- a/nodejs/test/bulkPulling/wsConnectPool.test.ts +++ b/nodejs/test/bulkPulling/wsConnectPool.test.ts @@ -84,12 +84,17 @@ async function tmqConnect() { await consumer.subscribe(topics); let res = await consumer.poll(500); - console.log(res.getTopic(), res.getMeta()); - let data = res.getData(); - if (data) { + for (let [key, value] of res) { + console.log(key, value.getMeta()); + let data = value.getData(); + if (data == null || data.length == 0) { + break; + } + for (let record of data ) { console.log(record) - } + } + } await consumer.commit(); From b4d31a98133e54be5010a26b8276b6b7b29c0ee6 Mon Sep 17 00:00:00 2001 From: menshibin Date: Fri, 11 Oct 2024 13:25:38 +0800 Subject: [PATCH 04/20] Optimize analysis --- nodejs/example/basicBatchTmq.ts | 136 ++++++++++++++++++++++++++++++++ nodejs/src/sql/wsSql.ts | 2 +- nodejs/src/tmq/tmqResponse.ts | 85 ++++++++++++-------- nodejs/src/tmq/wsTmq.ts | 7 ++ 4 files changed, 197 insertions(+), 33 deletions(-) create mode 100644 nodejs/example/basicBatchTmq.ts diff --git a/nodejs/example/basicBatchTmq.ts b/nodejs/example/basicBatchTmq.ts new file mode 100644 index 0000000..d1954e6 --- /dev/null +++ b/nodejs/example/basicBatchTmq.ts @@ -0,0 +1,136 @@ +import { WSConfig } from "../src/common/config"; +import { TMQConstants } from "../src/tmq/constant"; +import { destroy, setLogLevel, sqlConnect, tmqConnect } from "../src"; +import { WsConsumer } from "../src/tmq/wsTmq"; + +const db = 'power'; +const stable = 'meters'; +const url = 'ws://192.168.1.98:6041'; +const topic = 'topic_meters' +const topics = [topic]; +const groupId = "group-28"; +const clientId = "client-28"; + +async function createConsumer() { + let configMap = new Map([ + [TMQConstants.GROUP_ID, groupId], + [TMQConstants.CLIENT_ID, clientId], + [TMQConstants.CONNECT_USER, "root"], + [TMQConstants.CONNECT_PASS, "taosdata"], + [TMQConstants.AUTO_OFFSET_RESET, "earliest"], + [TMQConstants.WS_URL, url], + [TMQConstants.ENABLE_AUTO_COMMIT, 'true'], + [TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000'] + ]); + try { + let conn = await tmqConnect(configMap); + console.log(`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`) + return conn; + } catch (err: any) { + console.error(`Failed to create websocket consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`); + throw err; + } + +} +// ANCHOR_END: create_consumer + +async function prepare() { + let conf = new WSConfig('ws://192.168.1.98:6041'); + conf.setUser('root'); + conf.setPwd('taosdata'); + conf.setDb(db); + const createDB = `CREATE DATABASE IF NOT EXISTS ${db}`; + const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`; + + let wsSql = await sqlConnect(conf); + await wsSql.exec(createDB); + await wsSql.exec(createStable); + + let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`; + await wsSql.exec(createTopic); + wsSql.close(); +} + +async function insert() { + let conf = new WSConfig('ws://192.168.1.98:6041'); + conf.setUser('root'); + conf.setPwd('taosdata'); + conf.setDb(db); + let wsSql = await sqlConnect(conf); + for (let i = 0; i < 10000; i++) { + await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW + ${i}a, ${10 + i}, ${200 + i}, ${0.32 + i})`); + } + await wsSql.close(); + console.log("insert fininsh!!!!!") +} + +async function subscribe(consumer: WsConsumer) { + // ANCHOR: commit + try { + let count = 0; + await consumer.subscribe(topics); + let bFinish = false; + let bBegin = false; + const startTime = new Date().getTime(); + while (!bFinish) { + let res = await consumer.poll(100); + for (let [key, value] of res) { + // Add your data processing logic here + let data = value.getData(); + if (data) { + if (data.length == 0 && bBegin) { + bFinish = true; + break; + } else if(data.length > 0){ + bBegin = true; + } + count += data.length + console.log("poll end ------>", count); + } + + } + // await consumer.commit(); + } + const endTime = new Date().getTime(); + console.log(count, endTime - startTime); + } catch (err: any) { + console.error(`Failed to poll data, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`); + throw err; + } + // ANCHOR_END: commit +} + +async function consumer() { + // ANCHOR: unsubscribe + let consumer = null; + try { + // await prepare(); + consumer = await createConsumer(); + // const allPromises = []; + // allPromises.push(subscribe(consumer)); + // allPromises.push(insert()); + // await Promise.all(allPromises); + // await insert(); + await subscribe(consumer); + await consumer.unsubscribe(); + console.log("Consumer unsubscribed successfully."); + } + catch (err: any) { + console.error(`Failed to unsubscribe consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`); + throw err; + } + finally { + if (consumer) { + await consumer.close(); + console.log("Consumer closed successfully."); + } + destroy(); + } + // ANCHOR_END: unsubscribe +} + +async function test() { + await consumer(); +} + +test() diff --git a/nodejs/src/sql/wsSql.ts b/nodejs/src/sql/wsSql.ts index 2ec6997..7a859a2 100644 --- a/nodejs/src/sql/wsSql.ts +++ b/nodejs/src/sql/wsSql.ts @@ -126,7 +126,7 @@ export class WsSql{ logger.error("Executing SQL statement returns error: ", wsResponse.code, wsResponse.message); throw new TaosResultError(wsResponse.code, wsResponse.message); } - console.log(wsResponse.finished) + if (wsResponse.finished == 1) { break; } diff --git a/nodejs/src/tmq/tmqResponse.ts b/nodejs/src/tmq/tmqResponse.ts index 17c8a3c..12c49f7 100644 --- a/nodejs/src/tmq/tmqResponse.ts +++ b/nodejs/src/tmq/tmqResponse.ts @@ -75,12 +75,14 @@ export class WSTmqFetchBlockInfo { schema: Array; tableName?: string; taosResult: TaosResult; + schemaLen: number; rows: number; constructor(blockData: ArrayBuffer, taosResult: TaosResult) { // this.totalTime = resp.totalTime // this.blockData = resp.msg this.taosResult = taosResult; this.schema = []; + this.schemaLen = 0; let dataView = new DataView(blockData); blockData = this.skipHead(dataView); this.rows = this.parseBlockInfos(blockData); @@ -120,61 +122,80 @@ export class WSTmqFetchBlockInfo { } this.withTableName = dataView.getUint8(4) == 1? true : false; this.withSchema = dataView.getUint8(5) == 1? true : false; - logger.debug("parseBlockInfos blockNum="+ blockNum + ", withTableName=" + this.withTableName + ", withSchema=" + this.withSchema) + let dataBuffer = dataView.buffer.slice(6) let rows = 0; + // const parseStartTime = new Date().getTime(); for (let i = 0; i < blockNum; i++) { let variableInfo = this.parseVariableByteInteger(dataBuffer); + dataView = new DataView(variableInfo[1].slice(17)); this.taosResult.setPrecision(dataView.getUint8(0)); let offset = variableInfo[0] - 17; - dataBuffer = this.parseSchemaInfo(dataView.buffer.slice(offset)); + dataBuffer = this.parseSchemaInfo(dataView.buffer.slice(offset)); rows += this.parseTmqBlock(dataView.buffer.slice(1)); + } + // const parseEndTime = new Date().getTime(); + // console.log("------------->", parseEndTime- parseStartTime, rows); + logger.info("parseBlockInfos blockNum="+ blockNum + ", withTableName=" + this.withTableName + ", withSchema=" + this.withSchema + ", rows=" + rows) return rows; } private parseSchemaInfo(dataBuffer: ArrayBuffer) { if (this.withSchema) { - let variableInfo = this.parseVariableByteInteger(dataBuffer); - let cols = zigzagDecode(variableInfo[0]); - variableInfo = this.parseVariableByteInteger(variableInfo[1]); - let dataView = new DataView(variableInfo[1]) let isSkip = this.schema.length > 0 - for (let index = 0; index < cols; index++) { - let schema = new TMQRawDataSchema(); - schema.colType = dataView.getInt8(0); - schema.flag = dataView.getInt8(1); - variableInfo = this.parseVariableByteInteger(dataView.buffer.slice(2)); - schema.bytes = BigInt(zigzagDecode(variableInfo[0])); - variableInfo = this.parseVariableByteInteger(variableInfo[1]); - schema.colID = zigzagDecode(variableInfo[0]); + if (!isSkip) { + let variableInfo = this.parseVariableByteInteger(dataBuffer); + this.schemaLen = variableInfo[2]; + let cols = zigzagDecode(variableInfo[0]); variableInfo = this.parseVariableByteInteger(variableInfo[1]); - schema.name = getString(variableInfo[1], 0, variableInfo[0]); - if (!isSkip) { - this.taosResult.setMeta({ - name: schema.name, - type: schema.colType, - length: Number(schema.bytes) - } ); - this.schema.push(schema); + this.schemaLen += variableInfo[2]; + let dataView = new DataView(variableInfo[1]) + for (let index = 0; index < cols; index++) { + let schema = new TMQRawDataSchema(); + schema.colType = dataView.getInt8(0); + schema.flag = dataView.getInt8(1); + variableInfo = this.parseVariableByteInteger(dataView.buffer.slice(2)); + this.schemaLen += 2 + variableInfo[2]; + schema.bytes = BigInt(zigzagDecode(variableInfo[0])); + variableInfo = this.parseVariableByteInteger(variableInfo[1]); + this.schemaLen += variableInfo[2]; + schema.colID = zigzagDecode(variableInfo[0]); + variableInfo = this.parseVariableByteInteger(variableInfo[1]); + this.schemaLen += variableInfo[2]; + schema.name = getString(variableInfo[1], 0, variableInfo[0]); + + if (!isSkip) { + this.taosResult.setMeta({ + name: schema.name, + type: schema.colType, + length: Number(schema.bytes) + } ); + this.schema.push(schema); + } + dataView = new DataView(variableInfo[1].slice(variableInfo[0])) + this.schemaLen += variableInfo[0]; } - dataView = new DataView(variableInfo[1].slice(variableInfo[0])) - - } - if(this.withTableName) { - variableInfo = this.parseVariableByteInteger(dataView.buffer); - this.tableName = readVarchar(variableInfo[1], 0, variableInfo[0]); - dataView = new DataView(variableInfo[1].slice(variableInfo[0])) + if(this.withTableName) { + variableInfo = this.parseVariableByteInteger(dataView.buffer); + this.schemaLen += variableInfo[2]; + this.tableName = readVarchar(variableInfo[1], 0, variableInfo[0]); + dataView = new DataView(variableInfo[1].slice(variableInfo[0])); + this.schemaLen += variableInfo[0]; + } + console.log("this.schemaLen==>", this.schemaLen); + return dataView.buffer; + } else { + return dataBuffer.slice(this.schemaLen); } - return dataView.buffer; } return dataBuffer; } - private parseVariableByteInteger(dataBuffer: ArrayBuffer): [number, ArrayBuffer] { + private parseVariableByteInteger(dataBuffer: ArrayBuffer): [number, ArrayBuffer, number] { let value = 0; let multiplier = 1; let dataView = new DataView(dataBuffer); @@ -189,7 +210,7 @@ export class WSTmqFetchBlockInfo { count++; } - return [value, dataView.buffer.slice(count+1)] + return [value, dataView.buffer.slice(count+1), count+1] } private parseTmqBlock(dataBuffer: ArrayBuffer): number { diff --git a/nodejs/src/tmq/wsTmq.ts b/nodejs/src/tmq/wsTmq.ts index 9ab67cd..cd977e0 100644 --- a/nodejs/src/tmq/wsTmq.ts +++ b/nodejs/src/tmq/wsTmq.ts @@ -261,15 +261,21 @@ export class WsConsumer { }; let jsonStr = JSON.stringify(fetchMsg); logger.debug('[wsQueryInterface.fetch.fetchMsg]===>' + jsonStr); + // const startTime = new Date().getTime(); let result = await this._wsClient.sendMsg(jsonStr) let wsResponse = new WSFetchBlockResponse(result.msg) if (wsResponse && wsResponse.data && wsResponse.blockLen > 0) { + // const parseStartTime = new Date().getTime(); let wsTmqResponse = new WSTmqFetchBlockInfo(wsResponse.data, taosResult); logger.debug('[WSTmqFetchBlockInfo.fetchBlockData]===>' + wsTmqResponse.taosResult); if (wsTmqResponse.rows > 0) { + // const endTime = new Date().getTime(); + // console.log(endTime - parseStartTime, endTime - startTime); return true; } } + + return false; } @@ -293,6 +299,7 @@ export class WsConsumer { } let finish = false; + while (!finish) { finish = await this.fetchBlockData(pollResp, taosResult) } From 9a71a79b5c36e1cf64dfa98c13befe9b93a37155 Mon Sep 17 00:00:00 2001 From: menshibin Date: Fri, 11 Oct 2024 14:28:48 +0800 Subject: [PATCH 05/20] Optimize analysis --- nodejs/src/tmq/tmqResponse.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nodejs/src/tmq/tmqResponse.ts b/nodejs/src/tmq/tmqResponse.ts index 12c49f7..06bc560 100644 --- a/nodejs/src/tmq/tmqResponse.ts +++ b/nodejs/src/tmq/tmqResponse.ts @@ -186,7 +186,7 @@ export class WSTmqFetchBlockInfo { dataView = new DataView(variableInfo[1].slice(variableInfo[0])); this.schemaLen += variableInfo[0]; } - console.log("this.schemaLen==>", this.schemaLen); + return dataView.buffer; } else { return dataBuffer.slice(this.schemaLen); From dd7e944ba8dd73d393493dfe9340e32d2f38cfe1 Mon Sep 17 00:00:00 2001 From: menshibin Date: Fri, 11 Oct 2024 17:21:06 +0800 Subject: [PATCH 06/20] Optimize analysis --- nodejs/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nodejs/package.json b/nodejs/package.json index 24249cc..ff06053 100644 --- a/nodejs/package.json +++ b/nodejs/package.json @@ -4,7 +4,7 @@ "description": "TDengine Connector for nodejs and browser using WebSocket.", "source": "index.ts", "main": "lib/index.js", - "module": "lib/module/index.mjs", + "module": "lib/index.js", "types": "lib/index.d.ts", "directories": { "example": "example", From 2006d1f89ff0925be43fda45ec163149b671f80f Mon Sep 17 00:00:00 2001 From: menshibin Date: Sat, 12 Oct 2024 10:26:03 +0800 Subject: [PATCH 07/20] modify dns address --- nodejs/example/basicBatchTmq.ts | 6 +++--- nodejs/example/basicSql.ts | 2 +- nodejs/example/basicTmq.ts | 4 ++-- nodejs/test/bulkPulling/sql.test.ts | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/nodejs/example/basicBatchTmq.ts b/nodejs/example/basicBatchTmq.ts index d1954e6..0e4efbc 100644 --- a/nodejs/example/basicBatchTmq.ts +++ b/nodejs/example/basicBatchTmq.ts @@ -5,7 +5,7 @@ import { WsConsumer } from "../src/tmq/wsTmq"; const db = 'power'; const stable = 'meters'; -const url = 'ws://192.168.1.98:6041'; +const url = 'ws://localhost:6041'; const topic = 'topic_meters' const topics = [topic]; const groupId = "group-28"; @@ -35,7 +35,7 @@ async function createConsumer() { // ANCHOR_END: create_consumer async function prepare() { - let conf = new WSConfig('ws://192.168.1.98:6041'); + let conf = new WSConfig('ws://localhost:6041'); conf.setUser('root'); conf.setPwd('taosdata'); conf.setDb(db); @@ -52,7 +52,7 @@ async function prepare() { } async function insert() { - let conf = new WSConfig('ws://192.168.1.98:6041'); + let conf = new WSConfig('ws://localhost:6041'); conf.setUser('root'); conf.setPwd('taosdata'); conf.setDb(db); diff --git a/nodejs/example/basicSql.ts b/nodejs/example/basicSql.ts index 200a9f9..85189ab 100644 --- a/nodejs/example/basicSql.ts +++ b/nodejs/example/basicSql.ts @@ -1,7 +1,7 @@ import { WSConfig } from '../src/common/config'; import { sqlConnect, destroy, setLogLevel } from '../src' -let dsn = 'ws://root:taosdata@192.168.1.98:6041'; +let dsn = 'ws://root:taosdata@localhost:6041'; (async () => { let wsSql = null; let wsRows = null; diff --git a/nodejs/example/basicTmq.ts b/nodejs/example/basicTmq.ts index 63ab597..cde8d77 100644 --- a/nodejs/example/basicTmq.ts +++ b/nodejs/example/basicTmq.ts @@ -12,11 +12,11 @@ let configMap = new Map([ [TMQConstants.CONNECT_PASS, "taosdata"], [TMQConstants.AUTO_OFFSET_RESET, "earliest"], [TMQConstants.CLIENT_ID, 'test_tmq_client11'], - [TMQConstants.WS_URL, 'ws://192.168.1.98:6041'], + [TMQConstants.WS_URL, 'ws://localhost:6041'], [TMQConstants.ENABLE_AUTO_COMMIT, 'false'], [TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000'] ]); -let dsn = 'ws://root:taosdata@192.168.1.98:6041'; +let dsn = 'ws://root:taosdata@localhost:6041'; async function Prepare() { let conf :WSConfig = new WSConfig(dsn) const createDB = `create database if not exists ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;` diff --git a/nodejs/test/bulkPulling/sql.test.ts b/nodejs/test/bulkPulling/sql.test.ts index 6029c2d..f76130b 100644 --- a/nodejs/test/bulkPulling/sql.test.ts +++ b/nodejs/test/bulkPulling/sql.test.ts @@ -2,7 +2,7 @@ import { WebSocketConnectionPool } from "../../src/client/wsConnectorPool"; import { WSConfig } from "../../src/common/config"; import { WsSql } from "../../src/sql/wsSql"; -let dns = 'ws://192.168.1.98:6041' +let dns = 'ws://localhost:6041' beforeAll(async () => { let conf :WSConfig = new WSConfig(dns) conf.setUser('root') From f8a517ed33965a813c5df408199c842f837143b1 Mon Sep 17 00:00:00 2001 From: menshibin Date: Sat, 12 Oct 2024 10:29:32 +0800 Subject: [PATCH 08/20] modify dns address --- nodejs/example/basicBatchTmq.ts | 4 ++-- nodejs/example/basicTmq.ts | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/nodejs/example/basicBatchTmq.ts b/nodejs/example/basicBatchTmq.ts index 0e4efbc..8a50821 100644 --- a/nodejs/example/basicBatchTmq.ts +++ b/nodejs/example/basicBatchTmq.ts @@ -8,8 +8,8 @@ const stable = 'meters'; const url = 'ws://localhost:6041'; const topic = 'topic_meters' const topics = [topic]; -const groupId = "group-28"; -const clientId = "client-28"; +const groupId = "group-1"; +const clientId = "client-1"; async function createConsumer() { let configMap = new Map([ diff --git a/nodejs/example/basicTmq.ts b/nodejs/example/basicTmq.ts index cde8d77..f4bbc9a 100644 --- a/nodejs/example/basicTmq.ts +++ b/nodejs/example/basicTmq.ts @@ -3,8 +3,8 @@ import { TMQConstants } from "../src/tmq/constant"; import { destroy, setLogLevel, sqlConnect, tmqConnect } from "../src"; const stable = 'meters'; -const db = 'power18' -const topics:string[] = ['topic_ws_map11111'] +const db = 'power' +const topics:string[] = ['topic_ws_map'] let dropTopic = `DROP TOPIC IF EXISTS ${topics[0]};` let configMap = new Map([ [TMQConstants.GROUP_ID, "gId_11"], @@ -44,7 +44,7 @@ async function Prepare() { consumer = await tmqConnect(configMap); await consumer.subscribe(topics); for (let i = 0; i < 5; i++) { - let res = await consumer.poll(5); + let res = await consumer.poll(100); for (let [key, value] of res) { console.log(key, value.getMeta()); let data = value.getData(); From b9e2609fa910b05030622c16701e507720836aab Mon Sep 17 00:00:00 2001 From: menshibin Date: Sat, 12 Oct 2024 14:15:20 +0800 Subject: [PATCH 09/20] modify encode length bug --- nodejs/src/common/utils.ts | 28 ++++++++++++---------- nodejs/test/bulkPulling/schemaless.test.ts | 15 ++++++------ nodejs/test/bulkPulling/sql.test.ts | 2 +- 3 files changed, 24 insertions(+), 21 deletions(-) diff --git a/nodejs/src/common/utils.ts b/nodejs/src/common/utils.ts index 13d0a81..6cacfe4 100644 --- a/nodejs/src/common/utils.ts +++ b/nodejs/src/common/utils.ts @@ -32,28 +32,32 @@ export function isEmpty(value: any): boolean { export function getBinarySql(action:bigint, reqId:bigint, resultId:bigint, sql?:string): ArrayBuffer{ // construct msg - let messageLen = 26; - if (sql) { - messageLen = 30 + sql.length; - } - let sqlBuffer = new ArrayBuffer(messageLen); - let sqlView = new DataView(sqlBuffer); - sqlView.setBigUint64(0, reqId, true); - sqlView.setBigInt64(8, resultId, true); - sqlView.setBigInt64(16, action, true); - sqlView.setInt16(24, 1, true); if (sql) { - sqlView.setInt32(26, sql.length, true); const encoder = new TextEncoder(); const buffer = encoder.encode(sql); + let messageLen = 30 + buffer.length; + let sqlBuffer = new ArrayBuffer(messageLen); + let sqlView = new DataView(sqlBuffer); + sqlView.setBigUint64(0, reqId, true); + sqlView.setBigInt64(8, resultId, true); + sqlView.setBigInt64(16, action, true); + sqlView.setInt16(24, 1, true); + sqlView.setInt32(26, buffer.length, true); let offset = 30; for (let i = 0; i < buffer.length; i++) { sqlView.setUint8(offset + i, buffer[i]); } - + return sqlBuffer; } + let messageLen = 26; + let sqlBuffer = new ArrayBuffer(messageLen); + let sqlView = new DataView(sqlBuffer); + sqlView.setBigUint64(0, reqId, true); + sqlView.setBigInt64(8, resultId, true); + sqlView.setBigInt64(16, action, true); + sqlView.setInt16(24, 1, true); return sqlBuffer; } diff --git a/nodejs/test/bulkPulling/schemaless.test.ts b/nodejs/test/bulkPulling/schemaless.test.ts index 52935c7..04ffe54 100644 --- a/nodejs/test/bulkPulling/schemaless.test.ts +++ b/nodejs/test/bulkPulling/schemaless.test.ts @@ -4,16 +4,15 @@ import { Precision, SchemalessProto } from "../../src/sql/wsProto"; import { WsSql } from "../../src/sql/wsSql"; let dns = 'ws://localhost:6041' - - beforeAll(async () => { let conf :WSConfig = new WSConfig(dns) conf.setUser('root') conf.setPwd('taosdata') let wsSql = await WsSql.open(conf) - await wsSql.exec('create database if not exists power KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;'); - await wsSql.exec('CREATE STABLE if not exists power.meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);'); + await wsSql.exec('drop database if exists power_schemaless;') + await wsSql.exec('create database if not exists power_schemaless KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;'); + await wsSql.exec('CREATE STABLE if not exists power_schemaless.meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);'); await wsSql.close() }) @@ -28,7 +27,7 @@ describe('TDWebSocket.WsSchemaless()', () => { let conf :WSConfig = new WSConfig(dns) conf.setUser('root') conf.setPwd('taosdata') - conf.setDb('power') + conf.setDb('power_schemaless') let wsSchemaless = await WsSql.open(conf) expect(wsSchemaless.state()).toBeGreaterThan(0) await wsSchemaless.close(); @@ -57,7 +56,7 @@ describe('TDWebSocket.WsSchemaless()', () => { let conf :WSConfig = new WSConfig(dns) conf.setUser('root') conf.setPwd('taosdata') - conf.setDb('power') + conf.setDb('power_schemaless') let wsSchemaless = await WsSql.open(conf) expect(wsSchemaless.state()).toBeGreaterThan(0) await wsSchemaless.schemalessInsert([influxdbData], SchemalessProto.InfluxDBLineProtocol, Precision.NANO_SECONDS, 0); @@ -71,7 +70,7 @@ describe('TDWebSocket.WsSchemaless()', () => { let conf :WSConfig = new WSConfig(dns) conf.setUser('root') conf.setPwd('taosdata') - conf.setDb('power') + conf.setDb('power_schemaless') let wsSchemaless = await WsSql.open(conf) expect(wsSchemaless.state()).toBeGreaterThan(0) await wsSchemaless.schemalessInsert([influxdbData], SchemalessProto.InfluxDBLineProtocol, Precision.NOT_CONFIGURED, 0); @@ -87,7 +86,7 @@ describe('TDWebSocket.WsSchemaless()', () => { let conf :WSConfig = new WSConfig(dns) conf.setUser('root') conf.setPwd('taosdata') - conf.setDb('power') + conf.setDb('power_schemaless') let wsSchemaless = await WsSql.open(conf) expect(wsSchemaless.state()).toBeGreaterThan(0) try { diff --git a/nodejs/test/bulkPulling/sql.test.ts b/nodejs/test/bulkPulling/sql.test.ts index f76130b..c88b8eb 100644 --- a/nodejs/test/bulkPulling/sql.test.ts +++ b/nodejs/test/bulkPulling/sql.test.ts @@ -105,7 +105,7 @@ describe('TDWebSocket.WsSql()', () => { taosResult = await wsSql.exec('describe meters') console.log(taosResult); - taosResult = await wsSql.exec('INSERT INTO d1001 USING meters (location, groupid) TAGS ("California.SanFrancisco", 3) VALUES (NOW, 10.2, 219, 0.32)') + taosResult = await wsSql.exec('INSERT INTO d1001 USING meters (location, groupid) TAGS ("California", 3) VALUES (NOW, 10.2, 219, 0.32)') console.log(taosResult); expect(taosResult.getAffectRows()).toBeGreaterThanOrEqual(1) await wsSql.close() From 4fc44d75473cc4bf3f25677560068cf3fefff8f2 Mon Sep 17 00:00:00 2001 From: menshibin Date: Sat, 12 Oct 2024 14:31:29 +0800 Subject: [PATCH 10/20] modify test case --- nodejs/test/bulkPulling/tmq.test.ts | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/nodejs/test/bulkPulling/tmq.test.ts b/nodejs/test/bulkPulling/tmq.test.ts index b37f773..acace84 100644 --- a/nodejs/test/bulkPulling/tmq.test.ts +++ b/nodejs/test/bulkPulling/tmq.test.ts @@ -15,8 +15,8 @@ let createTopic = `create topic if not exists ${topics[0]} as select * from ${db let dropTopic = `DROP TOPIC IF EXISTS ${topics[0]};` // let dropTopic2 = `DROP TOPIC IF EXISTS ${topic2};` -let dsn = 'ws://root:taosdata@localhost:6041'; -let tmqDsn = 'ws://localhost:6041' +let dsn = 'ws://root:taosdata@192.168.1.98:6041'; +let tmqDsn = 'ws://192.168.1.98:6041' beforeAll(async () => { let conf :WSConfig = new WSConfig(dsn) @@ -111,7 +111,7 @@ describe('TDWebSocket.Tmq()', () => { let assignment = await consumer.assignment() console.log(assignment) - let counts:number[]=[0, 0] + let counts:number = 0; let useTime:number[] = []; for (let i = 0; i < 5; i++) { let startTime = new Date().getTime(); @@ -148,9 +148,7 @@ describe('TDWebSocket.Tmq()', () => { break; } - for (let record of data ) { - console.log(record) - } + counts += data.length } // await Sleep(100) @@ -163,7 +161,7 @@ describe('TDWebSocket.Tmq()', () => { await consumer.close(); console.log("------------->", useTime) console.log("------------->", counts) - expect(counts).toEqual([10, 10]) + expect(counts).toEqual(10) }); test('Topic not exist', async() => { From 480985aef1df433a926bbaa5848995721d5a62ea Mon Sep 17 00:00:00 2001 From: menshibin Date: Sat, 12 Oct 2024 14:42:06 +0800 Subject: [PATCH 11/20] modify test case --- nodejs/test/bulkPulling/tmq.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nodejs/test/bulkPulling/tmq.test.ts b/nodejs/test/bulkPulling/tmq.test.ts index acace84..60905f6 100644 --- a/nodejs/test/bulkPulling/tmq.test.ts +++ b/nodejs/test/bulkPulling/tmq.test.ts @@ -15,8 +15,8 @@ let createTopic = `create topic if not exists ${topics[0]} as select * from ${db let dropTopic = `DROP TOPIC IF EXISTS ${topics[0]};` // let dropTopic2 = `DROP TOPIC IF EXISTS ${topic2};` -let dsn = 'ws://root:taosdata@192.168.1.98:6041'; -let tmqDsn = 'ws://192.168.1.98:6041' +let dsn = 'ws://root:taosdata@localhost:6041'; +let tmqDsn = 'ws://localhost:6041' beforeAll(async () => { let conf :WSConfig = new WSConfig(dsn) From a8c4dca1210d44f7a34f90d590ba39df93e81d2c Mon Sep 17 00:00:00 2001 From: menshibin Date: Sat, 12 Oct 2024 14:43:04 +0800 Subject: [PATCH 12/20] modify version --- nodejs/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nodejs/package.json b/nodejs/package.json index ff06053..4fe1b13 100644 --- a/nodejs/package.json +++ b/nodejs/package.json @@ -1,6 +1,6 @@ { "name": "@tdengine/websocket", - "version": "3.1.0", + "version": "3.1.1", "description": "TDengine Connector for nodejs and browser using WebSocket.", "source": "index.ts", "main": "lib/index.js", From bce4a7ad203bfc81be6a92dbbbe6ce1b24e68fa1 Mon Sep 17 00:00:00 2001 From: menshibin Date: Mon, 14 Oct 2024 13:43:27 +0800 Subject: [PATCH 13/20] Optimize analysis dataBuffer slice copy --- nodejs/example/basicBatchTmq.ts | 9 +- nodejs/src/client/wsResponse.ts | 23 ++--- nodejs/src/common/taosResult.ts | 79 ++++++++------- nodejs/src/tmq/tmqResponse.ts | 101 ++++++++++---------- nodejs/src/tmq/wsTmq.ts | 4 +- nodejs/test/bulkPulling/queryTables.test.ts | 2 +- nodejs/test/bulkPulling/tmq.test.ts | 6 +- 7 files changed, 117 insertions(+), 107 deletions(-) diff --git a/nodejs/example/basicBatchTmq.ts b/nodejs/example/basicBatchTmq.ts index 8a50821..91faa6e 100644 --- a/nodejs/example/basicBatchTmq.ts +++ b/nodejs/example/basicBatchTmq.ts @@ -5,11 +5,11 @@ import { WsConsumer } from "../src/tmq/wsTmq"; const db = 'power'; const stable = 'meters'; -const url = 'ws://localhost:6041'; +const url = 'ws://192.168.1.98:6041'; const topic = 'topic_meters' const topics = [topic]; -const groupId = "group-1"; -const clientId = "client-1"; +const groupId = "group-50"; +const clientId = "client-50"; async function createConsumer() { let configMap = new Map([ @@ -19,7 +19,7 @@ async function createConsumer() { [TMQConstants.CONNECT_PASS, "taosdata"], [TMQConstants.AUTO_OFFSET_RESET, "earliest"], [TMQConstants.WS_URL, url], - [TMQConstants.ENABLE_AUTO_COMMIT, 'true'], + [TMQConstants.ENABLE_AUTO_COMMIT, 'false'], [TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000'] ]); try { @@ -102,6 +102,7 @@ async function subscribe(consumer: WsConsumer) { async function consumer() { // ANCHOR: unsubscribe + setLogLevel("debug"); let consumer = null; try { // await prepare(); diff --git a/nodejs/src/client/wsResponse.ts b/nodejs/src/client/wsResponse.ts index f4e96bd..81f77da 100644 --- a/nodejs/src/client/wsResponse.ts +++ b/nodejs/src/client/wsResponse.ts @@ -87,7 +87,7 @@ export class WSFetchResponse { } export class WSFetchBlockResponse { - data: ArrayBuffer | undefined + data: DataView | undefined action: bigint timing: bigint reqId: bigint @@ -98,32 +98,33 @@ export class WSFetchBlockResponse { finished: number | undefined metaType: number | undefined constructor(msg: ArrayBuffer) { - this.action = new DataView(msg, 8, 8).getBigUint64(0, true) - this.timing = new DataView(msg, 18, 8).getBigUint64(0, true) - this.reqId = new DataView(msg, 26, 8).getBigUint64(0, true) - this.code = new DataView(msg, 34, 4).getUint32(0, true) + let dataView = new DataView(msg); + this.action = dataView.getBigUint64(8, true) + this.timing = dataView.getBigUint64(18, true) + this.reqId = dataView.getBigUint64(26, true) + this.code = dataView.getUint32(34, true) this.blockLen = 0; if (this.code != 0) { - let len = new DataView(msg, 38, 4).getUint32(0, true) + let len = dataView.getUint32(38, true) this.message = readVarchar(msg, 42, len); return; } - this.resultId = new DataView(msg, 42, 8).getBigUint64(0, true) + this.resultId = dataView.getBigUint64(42, true) let offset = 50; if (this.action == BigInt(8)) { - this.metaType = new DataView(msg, 50, 2).getUint16(0, true) + this.metaType = dataView.getUint16(50, true) offset += 2; }else { - this.finished = new DataView(msg, 50, 1).getUint8(0) + this.finished = dataView.getUint8(50) if (this.finished == 1) { return; } offset += 1; } - this.blockLen = new DataView(msg, offset, 4).getUint32(0, true) + this.blockLen = dataView.getUint32(offset, true) if (this.blockLen > 0) { - this.data = msg.slice(offset + 4); + this.data = new DataView(msg, offset + 4); } } diff --git a/nodejs/src/common/taosResult.ts b/nodejs/src/common/taosResult.ts index d27ad34..2eb7620 100644 --- a/nodejs/src/common/taosResult.ts +++ b/nodejs/src/common/taosResult.ts @@ -157,7 +157,7 @@ export function parseBlock(blocks: WSFetchBlockResponse, taosResult: TaosResult) let metaList = taosResult.getTaosMeta() let dataList = taosResult.getData() if (metaList && dataList && blocks && blocks.data) { - let rows = new DataView(blocks.data, 8, 4).getUint32(0, true); + let rows = blocks.data.getUint32(8, true); if (rows == 0) { return taosResult; } @@ -172,8 +172,10 @@ export function parseBlock(blocks: WSFetchBlockResponse, taosResult: TaosResult) let bitMapSize = (rows + (1 << 3) - 1) >> 3 // whole raw block ArrayBuffer - let dataBuffer = blocks.data.slice(bufferOffset); - + // let dataBuffer = blocks.data.slice(bufferOffset); + let headOffset = blocks.data.byteOffset + bufferOffset; + let dataView = new DataView(blocks.data.buffer, headOffset); + console.log("---", headOffset, blocks.data.byteOffset, bufferOffset); // record the head of column in block let colBlockHead = 0; for (let i = 0; i < rows; i++) { @@ -192,34 +194,35 @@ export function parseBlock(blocks: WSFetchBlockResponse, taosResult: TaosResult) let byteArrayIndex = i >> 3; let bitwiseOffset = 7 - (i & 7) - let bitMapArr = dataBuffer.slice(colBlockHead, colBlockHead + bitMapSize) - let bitFlag = ((new DataView(bitMapArr).getUint8(byteArrayIndex)) & (1 << bitwiseOffset)) >> bitwiseOffset + // let bitMapArr = dataBuffer.slice(colBlockHead, colBlockHead + bitMapSize) + let bitMapArr = new DataView(dataView.buffer, dataView.byteOffset + colBlockHead, bitMapSize); + let bitFlag = (bitMapArr.getUint8(byteArrayIndex) & (1 << bitwiseOffset)) >> bitwiseOffset if (bitFlag == 1) { row.push("NULL") } else { - row.push(readSolidData(dataBuffer, colDataHead, metaList[j])) + row.push(readSolidData(dataView, colDataHead, metaList[j])) } - colBlockHead = colBlockHead + bitMapSize + (new DataView(dataBuffer, INT_32_SIZE * j, INT_32_SIZE).getInt32(0, true)) + colBlockHead = colBlockHead + bitMapSize + dataView.getInt32(INT_32_SIZE * j, true) } else { // if null check - let varOffset = new DataView(dataBuffer, colBlockHead + (INT_32_SIZE * i), INT_32_SIZE).getInt32(0, true) + let varOffset = dataView.getInt32(colBlockHead + (INT_32_SIZE * i), true) if (varOffset == -1) { row.push("NULL") - colBlockHead = colBlockHead + INT_32_SIZE * rows + (new DataView(dataBuffer, j * INT_32_SIZE, INT_32_SIZE).getInt32(0, true)) + colBlockHead = colBlockHead + INT_32_SIZE * rows + dataView.getInt32(j * INT_32_SIZE, true); } else { colDataHead = colBlockHead + INT_32_SIZE * rows + varOffset - let dataLength = (new DataView(dataBuffer, colDataHead, 2).getInt16(0, true)) + let dataLength = dataView.getInt16(colDataHead, true); if (isVarType == ColumnsBlockType.VARCHAR) { - row.push(readVarchar(dataBuffer, colDataHead + 2, dataLength)) + row.push(readVarchar(dataView.buffer, dataView.byteOffset + colDataHead + 2, dataLength)) } else if(isVarType == ColumnsBlockType.GEOMETRY || isVarType == ColumnsBlockType.VARBINARY) { - row.push(readBinary(dataBuffer, colDataHead + 2, dataLength)) + row.push(readBinary(dataView.buffer, dataView.byteOffset + colDataHead + 2, dataLength)) } else { - row.push(readNchar(dataBuffer, colDataHead + 2, dataLength)) + row.push(readNchar(dataView.buffer, dataView.byteOffset+ colDataHead + 2, dataLength)) } - colBlockHead = colBlockHead + INT_32_SIZE * rows + (new DataView(dataBuffer, j * INT_32_SIZE, INT_32_SIZE).getInt32(0, true)) + colBlockHead = colBlockHead + INT_32_SIZE * rows + dataView.getInt32(j * INT_32_SIZE, true); } } } @@ -257,10 +260,9 @@ export function _isVarType(metaType: number): Number { } } } -export function readSolidDataToArray(buffer: ArrayBuffer, colBlockHead:number, - rows:number, metaType: number, bitMapArr: ArrayBuffer): any[] { +export function readSolidDataToArray(dataBuffer: DataView, colBlockHead:number, + rows:number, metaType: number, bitMapArr: Uint8Array): any[] { - let dataBuffer = new DataView(buffer) let result:any[] = [] switch (metaType) { case TDengineTypeCode['BOOL']: @@ -374,44 +376,44 @@ export function readSolidDataToArray(buffer: ArrayBuffer, colBlockHead:number, return result; } -export function readSolidData(dataBuffer: ArrayBuffer, colDataHead: number, meta: ResponseMeta): Number | Boolean | BigInt { +export function readSolidData(dataBuffer: DataView, colDataHead: number, meta: ResponseMeta): Number | Boolean | BigInt { switch (meta.type) { case TDengineTypeCode['BOOL']: { - return (Boolean)(new DataView(dataBuffer, colDataHead, 1).getInt8(0)) + return (Boolean)(dataBuffer.getInt8(colDataHead)); } case TDengineTypeCode['TINYINT']: { - return (new DataView(dataBuffer, colDataHead, 1).getInt8(0)) + return dataBuffer.getInt8(colDataHead); } case TDengineTypeCode['SMALLINT']: { - return (new DataView(dataBuffer, colDataHead, 2).getInt16(0, true)) + return dataBuffer.getInt16(colDataHead, true); } case TDengineTypeCode['INT']: { - return (new DataView(dataBuffer, colDataHead, 4).getInt32(0, true)) + return dataBuffer.getInt32(colDataHead, true); } case TDengineTypeCode['BIGINT']: { - return (new DataView(dataBuffer, colDataHead, 8).getBigInt64(0, true)) + return dataBuffer.getBigInt64(colDataHead, true); } case TDengineTypeCode['TINYINT UNSIGNED']: { - return (new DataView(dataBuffer, colDataHead, 1).getUint8(0)) + return dataBuffer.getUint8(colDataHead); } case TDengineTypeCode['SMALLINT UNSIGNED']: { - return (new DataView(dataBuffer, colDataHead, 2).getUint16(0, true)) + return dataBuffer.getUint16(colDataHead, true); } case TDengineTypeCode['INT UNSIGNED']: { - return (new DataView(dataBuffer, colDataHead, 4).getUint32(0, true)) + return dataBuffer.getUint32(colDataHead, true); } case TDengineTypeCode['BIGINT UNSIGNED']: { - return (new DataView(dataBuffer, colDataHead, 8).getBigUint64(0, true)) + return dataBuffer.getBigUint64(colDataHead, true); } case TDengineTypeCode['FLOAT']: { - return (parseFloat(new DataView(dataBuffer, colDataHead, 4).getFloat32(0, true).toFixed(5)) ) + return parseFloat(dataBuffer.getFloat32(colDataHead, true).toFixed(5)); } case TDengineTypeCode['DOUBLE']: { - return (parseFloat(new DataView(dataBuffer, colDataHead, 8).getFloat64(0, true).toFixed(15))) + return parseFloat(dataBuffer.getFloat64(colDataHead, true).toFixed(15)); } case TDengineTypeCode['TIMESTAMP']: { - return (new DataView(dataBuffer, colDataHead, 8).getBigInt64(0, true)) + return dataBuffer.getBigInt64(colDataHead, true); // could change } default: { @@ -427,25 +429,28 @@ export function readBinary(dataBuffer: ArrayBuffer, colDataHead: number, length: export function readVarchar(dataBuffer: ArrayBuffer, colDataHead: number, length: number): string { let data = ""; - let buff = dataBuffer.slice(colDataHead, colDataHead + length) - data += new TextDecoder().decode(buff) + // let buff = dataBuffer.slice(colDataHead, colDataHead + length) + let dataView = new DataView(dataBuffer, colDataHead, length); + data += new TextDecoder().decode(dataView) return data; } export function readNchar(dataBuffer: ArrayBuffer, colDataHead: number, length: number): string { let data = ""; - let buff: ArrayBuffer = dataBuffer.slice(colDataHead, colDataHead + length); + // let buff: ArrayBuffer = dataBuffer.slice(colDataHead, colDataHead + length); + let dataView = new DataView(dataBuffer, colDataHead, length); for (let i = 0; i < length / 4; i++) { - data += appendRune(new DataView(buff, i * 4, 4).getUint32(0, true)) + data += appendRune(dataView.getUint32(i * 4, true)) } return data; } -export function getString(dataBuffer: ArrayBuffer, colDataHead: number, length: number): string { - let buff = dataBuffer.slice(colDataHead, colDataHead + length - 1) +export function getString(dataBuffer: DataView, colDataHead: number, length: number): string { + // let buff = dataBuffer.slice(colDataHead, colDataHead + length - 1) + let dataView = new Uint8Array(dataBuffer.buffer, dataBuffer.byteOffset + colDataHead, length - 1); let decoder = new TextDecoder('utf-8'); - return decoder.decode(new Uint8Array(buff)); + return decoder.decode(dataView); } function iteratorBuff(arr: ArrayBuffer) { diff --git a/nodejs/src/tmq/tmqResponse.ts b/nodejs/src/tmq/tmqResponse.ts index 06bc560..0069a9b 100644 --- a/nodejs/src/tmq/tmqResponse.ts +++ b/nodejs/src/tmq/tmqResponse.ts @@ -5,6 +5,7 @@ import { WebSocketInterfaceError, ErrorCode, TDWebSocketClientError } from "../c import { TMQBlockInfo, TMQRawDataSchema } from "./constant"; import { zigzagDecode } from "../common/utils"; import logger from "../common/log"; +import { off } from "process"; export class WsPollResponse { code: number; @@ -77,29 +78,28 @@ export class WSTmqFetchBlockInfo { taosResult: TaosResult; schemaLen: number; rows: number; - constructor(blockData: ArrayBuffer, taosResult: TaosResult) { + constructor(dataView: DataView, taosResult: TaosResult) { // this.totalTime = resp.totalTime // this.blockData = resp.msg this.taosResult = taosResult; this.schema = []; this.schemaLen = 0; - let dataView = new DataView(blockData); - blockData = this.skipHead(dataView); - this.rows = this.parseBlockInfos(blockData); + let blockDataView: DataView = this.skipHead(dataView); + this.rows = this.parseBlockInfos(blockDataView); } public getRows(): number{ return this.rows; } - private skipHead(dataView: DataView) { + private skipHead(dataView: DataView): DataView{ let v = dataView.getUint8(0); if (v >= 100) { let skip = dataView.getUint32(1, true); - return dataView.buffer.slice(skip + 5) + return new DataView(dataView.buffer, dataView.byteOffset + skip + 5); } let skip1 = this.getTypeSkip(v); v = dataView.getUint8(1 + skip1); let skip2 = this.getTypeSkip(v); - return dataView.buffer.slice(skip1 + 2 + skip2) + return new DataView(dataView.buffer, dataView.byteOffset + skip1 + 2 + skip2); } private getTypeSkip(v: number) { @@ -114,8 +114,7 @@ export class WSTmqFetchBlockInfo { } } - private parseBlockInfos(blockData: ArrayBuffer): number { - let dataView = new DataView(blockData) + private parseBlockInfos(dataView: DataView): number { let blockNum = dataView.getUint32(0, true); if (blockNum == 0) { return 0; @@ -123,18 +122,18 @@ export class WSTmqFetchBlockInfo { this.withTableName = dataView.getUint8(4) == 1? true : false; this.withSchema = dataView.getUint8(5) == 1? true : false; - let dataBuffer = dataView.buffer.slice(6) + // let dataBuffer = dataView.buffer.slice(6) + let dataBuffer = new DataView(dataView.buffer, dataView.byteOffset + 6); let rows = 0; // const parseStartTime = new Date().getTime(); + // console.log("parseBlockInfos blockNum="+ blockNum) for (let i = 0; i < blockNum; i++) { let variableInfo = this.parseVariableByteInteger(dataBuffer); - - dataView = new DataView(variableInfo[1].slice(17)); - this.taosResult.setPrecision(dataView.getUint8(0)); + this.taosResult.setPrecision(variableInfo[1].getUint8(17)); + dataView = new DataView(variableInfo[1].buffer, variableInfo[1].byteOffset + 17); let offset = variableInfo[0] - 17; - dataBuffer = this.parseSchemaInfo(dataView.buffer.slice(offset)); - rows += this.parseTmqBlock(dataView.buffer.slice(1)); - + dataBuffer = this.parseSchemaInfo(dataView, offset); + rows += this.parseTmqBlock(dataView, 1); } // const parseEndTime = new Date().getTime(); // console.log("------------->", parseEndTime- parseStartTime, rows); @@ -143,21 +142,22 @@ export class WSTmqFetchBlockInfo { } - private parseSchemaInfo(dataBuffer: ArrayBuffer) { + private parseSchemaInfo(dataBuffer: DataView, offset: number) { if (this.withSchema) { let isSkip = this.schema.length > 0 if (!isSkip) { + dataBuffer = new DataView(dataBuffer.buffer, dataBuffer.byteOffset + offset); let variableInfo = this.parseVariableByteInteger(dataBuffer); this.schemaLen = variableInfo[2]; let cols = zigzagDecode(variableInfo[0]); variableInfo = this.parseVariableByteInteger(variableInfo[1]); this.schemaLen += variableInfo[2]; - let dataView = new DataView(variableInfo[1]) + let dataView = variableInfo[1]; for (let index = 0; index < cols; index++) { let schema = new TMQRawDataSchema(); schema.colType = dataView.getInt8(0); schema.flag = dataView.getInt8(1); - variableInfo = this.parseVariableByteInteger(dataView.buffer.slice(2)); + variableInfo = this.parseVariableByteInteger(dataView, 2); this.schemaLen += 2 + variableInfo[2]; schema.bytes = BigInt(zigzagDecode(variableInfo[0])); variableInfo = this.parseVariableByteInteger(variableInfo[1]); @@ -175,33 +175,33 @@ export class WSTmqFetchBlockInfo { } ); this.schema.push(schema); } - dataView = new DataView(variableInfo[1].slice(variableInfo[0])) + dataView = new DataView(variableInfo[1].buffer, variableInfo[1].byteOffset + variableInfo[0]); this.schemaLen += variableInfo[0]; } if(this.withTableName) { - variableInfo = this.parseVariableByteInteger(dataView.buffer); + variableInfo = this.parseVariableByteInteger(dataView); this.schemaLen += variableInfo[2]; - this.tableName = readVarchar(variableInfo[1], 0, variableInfo[0]); - dataView = new DataView(variableInfo[1].slice(variableInfo[0])); + this.tableName = readVarchar(variableInfo[1].buffer, variableInfo[1].byteOffset, variableInfo[0]); + dataView = new DataView(variableInfo[1].buffer, variableInfo[1].byteOffset + variableInfo[0]); this.schemaLen += variableInfo[0]; } - return dataView.buffer; + return dataView; } else { - return dataBuffer.slice(this.schemaLen); + return new DataView(dataBuffer.buffer, dataBuffer.byteOffset + this.schemaLen + offset); + } } return dataBuffer; } - private parseVariableByteInteger(dataBuffer: ArrayBuffer): [number, ArrayBuffer, number] { + private parseVariableByteInteger(dataView: DataView, offset: number = 0): [number, DataView, number] { let value = 0; let multiplier = 1; - let dataView = new DataView(dataBuffer); let count = 0; while (true) { - let encodedByte = dataView.getUint8(count); + let encodedByte = dataView.getUint8(count + offset); value += (encodedByte&127) * multiplier; if ((encodedByte & 128) == 0) { break; @@ -210,16 +210,16 @@ export class WSTmqFetchBlockInfo { count++; } - return [value, dataView.buffer.slice(count+1), count+1] + return [value, new DataView(dataView.buffer, dataView.byteOffset + count + 1 + offset), count+1] } - private parseTmqBlock(dataBuffer: ArrayBuffer): number { - let dataView = new DataView(dataBuffer) - let rows = dataView.getInt32(8, true); + private parseTmqBlock(dataView: DataView, startOffset: number): number { + // let dataView = new DataView(dataBuffer) + let rows = dataView.getInt32(8 + startOffset, true); if (rows == 0) { return rows; } - + let taosData = this.taosResult.getData() let metaData = this.taosResult.getMeta() if (metaData && rows && taosData) { @@ -227,58 +227,60 @@ export class WSTmqFetchBlockInfo { //get bitmap length let bitMapOffset:number = getBitmapLen(rows); //skip data head - let bufferOffset = 28 + 5 * this.schema.length - - dataBuffer = dataBuffer.slice(bufferOffset); + let bufferOffset = 28 + 5 * this.schema.length + startOffset; let metaLens:number[]= [] for (let i = 0; i< this.schema.length; i++) { //get data len - metaLens.push(new DataView(dataBuffer, i*4, 4).getInt32(0, true)) + metaLens.push(dataView.getInt32(bufferOffset + i*4, true)); } - bufferOffset = this.schema.length * 4; - + + bufferOffset += this.schema.length * 4; for (let i = 0; i < this.schema.length; i++) { let data:any[] = []; //get type code let isVarType = _isVarType(this.schema[i].colType) //fixed length type if (isVarType == ColumnsBlockType.SOLID) { - let bitMapArr = dataBuffer.slice(bufferOffset, bufferOffset + bitMapOffset); + // let bitMapArr = dataBuffer.slice(bufferOffset, bufferOffset + bitMapOffset); + let bitMapArr = new Uint8Array(dataView.buffer, dataView.byteOffset + bufferOffset, bitMapOffset) bufferOffset += bitMapOffset; //decode column data, data is array - data = readSolidDataToArray(dataBuffer, bufferOffset, rows, this.schema[i].colType, bitMapArr); + data = readSolidDataToArray(dataView, bufferOffset, rows, this.schema[i].colType, bitMapArr); + } else { //Variable length type - let offset = bufferOffset; + let start = bufferOffset; let offsets:number[]= []; - for (let i = 0; i< rows; i++, offset += TDengineTypeLength['INT']) { + for (let i = 0; i< rows; i++, start += TDengineTypeLength['INT']) { //get data length, -1 is null - offsets.push(new DataView(dataBuffer, offset, 4).getInt32(0, true)) + offsets.push(dataView.getInt32(start, true)) } - let start = offset + for (let i = 0; i< rows; i++) { let value:any = '' if (-1 == offsets[i]) { value = null }else{ let header = start + offsets[i]; - let dataLength = new DataView(dataBuffer, header, 2).getInt16(0, true) & 0xFFFF; + let dataLength = dataView.getInt16(header, true) & 0xFFFF; if (isVarType == ColumnsBlockType.VARCHAR) { //decode var char - value = readVarchar(dataBuffer, header + 2, dataLength) + value = readVarchar(dataView.buffer, dataView.byteOffset + header + 2, dataLength) } else if(isVarType == ColumnsBlockType.GEOMETRY || isVarType == ColumnsBlockType.VARBINARY) { //decode binary - value = readBinary(dataBuffer, header + 2, dataLength) + value = readBinary(dataView.buffer, dataView.byteOffset + header + 2, dataLength) } else { //decode nchar - value = readNchar(dataBuffer, header + 2, dataLength) + value = readNchar(dataView.buffer, dataView.byteOffset + header + 2, dataLength) } } + data.push(value); } bufferOffset += rows * 4 } + bufferOffset += metaLens[i] //column data to row data for (let row = 0; row < data.length; row++) { @@ -291,6 +293,7 @@ export class WSTmqFetchBlockInfo { taosData.push(...dataList); } + return rows; } diff --git a/nodejs/src/tmq/wsTmq.ts b/nodejs/src/tmq/wsTmq.ts index cd977e0..2f78cce 100644 --- a/nodejs/src/tmq/wsTmq.ts +++ b/nodejs/src/tmq/wsTmq.ts @@ -275,7 +275,7 @@ export class WsConsumer { } } - + return false; } @@ -303,7 +303,7 @@ export class WsConsumer { while (!finish) { finish = await this.fetchBlockData(pollResp, taosResult) } - + return taosResults; } diff --git a/nodejs/test/bulkPulling/queryTables.test.ts b/nodejs/test/bulkPulling/queryTables.test.ts index 9ecbf4d..0ad918e 100644 --- a/nodejs/test/bulkPulling/queryTables.test.ts +++ b/nodejs/test/bulkPulling/queryTables.test.ts @@ -4,7 +4,7 @@ import { WSConfig } from "../../src/common/config"; import { WsSql } from "../../src/sql/wsSql"; import { compareUint8Arrays, createSTable, createSTableJSON, createTable, expectStableData, hexToBytes, insertNTable, insertStable, jsonMeta, tableMeta, tagMeta } from "../utils"; // const DSN = 'ws://root:taosdata@127.0.0.1:6041' -let dsn = 'ws://root:taosdata@localhost:6041'; +let dsn = 'ws://root:taosdata@192.168.1.98:6041'; let conf :WSConfig = new WSConfig(dsn) const resultMap:Map = new Map(); resultMap.set("POINT (4.0 8.0)", hexToBytes("010100000000000000000010400000000000002040")); diff --git a/nodejs/test/bulkPulling/tmq.test.ts b/nodejs/test/bulkPulling/tmq.test.ts index 60905f6..cedf927 100644 --- a/nodejs/test/bulkPulling/tmq.test.ts +++ b/nodejs/test/bulkPulling/tmq.test.ts @@ -15,8 +15,8 @@ let createTopic = `create topic if not exists ${topics[0]} as select * from ${db let dropTopic = `DROP TOPIC IF EXISTS ${topics[0]};` // let dropTopic2 = `DROP TOPIC IF EXISTS ${topic2};` -let dsn = 'ws://root:taosdata@localhost:6041'; -let tmqDsn = 'ws://localhost:6041' +let dsn = 'ws://root:taosdata@192.168.1.98:6041'; +let tmqDsn = 'ws://192.168.1.98:6041' beforeAll(async () => { let conf :WSConfig = new WSConfig(dsn) @@ -127,7 +127,7 @@ describe('TDWebSocket.Tmq()', () => { } for (let record of data ) { - console.log(record) + console.log("-----===>>", record) } } From a735d44f4b89f934a423ef3e59c01ba0359ad429 Mon Sep 17 00:00:00 2001 From: menshibin Date: Mon, 14 Oct 2024 13:46:59 +0800 Subject: [PATCH 14/20] Optimize analysis dataBuffer slice copy --- nodejs/example/basicBatchTmq.ts | 2 +- nodejs/test/bulkPulling/queryTables.test.ts | 2 +- nodejs/test/bulkPulling/tmq.test.ts | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/nodejs/example/basicBatchTmq.ts b/nodejs/example/basicBatchTmq.ts index 91faa6e..1060d21 100644 --- a/nodejs/example/basicBatchTmq.ts +++ b/nodejs/example/basicBatchTmq.ts @@ -5,7 +5,7 @@ import { WsConsumer } from "../src/tmq/wsTmq"; const db = 'power'; const stable = 'meters'; -const url = 'ws://192.168.1.98:6041'; +const url = 'ws://localhost:6041'; const topic = 'topic_meters' const topics = [topic]; const groupId = "group-50"; diff --git a/nodejs/test/bulkPulling/queryTables.test.ts b/nodejs/test/bulkPulling/queryTables.test.ts index 0ad918e..9ecbf4d 100644 --- a/nodejs/test/bulkPulling/queryTables.test.ts +++ b/nodejs/test/bulkPulling/queryTables.test.ts @@ -4,7 +4,7 @@ import { WSConfig } from "../../src/common/config"; import { WsSql } from "../../src/sql/wsSql"; import { compareUint8Arrays, createSTable, createSTableJSON, createTable, expectStableData, hexToBytes, insertNTable, insertStable, jsonMeta, tableMeta, tagMeta } from "../utils"; // const DSN = 'ws://root:taosdata@127.0.0.1:6041' -let dsn = 'ws://root:taosdata@192.168.1.98:6041'; +let dsn = 'ws://root:taosdata@localhost:6041'; let conf :WSConfig = new WSConfig(dsn) const resultMap:Map = new Map(); resultMap.set("POINT (4.0 8.0)", hexToBytes("010100000000000000000010400000000000002040")); diff --git a/nodejs/test/bulkPulling/tmq.test.ts b/nodejs/test/bulkPulling/tmq.test.ts index cedf927..b446612 100644 --- a/nodejs/test/bulkPulling/tmq.test.ts +++ b/nodejs/test/bulkPulling/tmq.test.ts @@ -15,8 +15,8 @@ let createTopic = `create topic if not exists ${topics[0]} as select * from ${db let dropTopic = `DROP TOPIC IF EXISTS ${topics[0]};` // let dropTopic2 = `DROP TOPIC IF EXISTS ${topic2};` -let dsn = 'ws://root:taosdata@192.168.1.98:6041'; -let tmqDsn = 'ws://192.168.1.98:6041' +let dsn = 'ws://root:taosdata@localhost:6041'; +let tmqDsn = 'ws://localhost:6041' beforeAll(async () => { let conf :WSConfig = new WSConfig(dsn) From 0fdfbd22bc7a6c516a49547c77339cf1f78c1435 Mon Sep 17 00:00:00 2001 From: menshibin Date: Mon, 14 Oct 2024 14:10:41 +0800 Subject: [PATCH 15/20] Optimize analysis dataBuffer slice copy --- nodejs/example/basicSql.ts | 22 ++++++++++++---------- nodejs/src/common/taosResult.ts | 1 - 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/nodejs/example/basicSql.ts b/nodejs/example/basicSql.ts index 85189ab..ec3be1e 100644 --- a/nodejs/example/basicSql.ts +++ b/nodejs/example/basicSql.ts @@ -1,7 +1,7 @@ import { WSConfig } from '../src/common/config'; import { sqlConnect, destroy, setLogLevel } from '../src' -let dsn = 'ws://root:taosdata@localhost:6041'; +let dsn = 'ws://root:taosdata@192.168.1.98:6041'; (async () => { let wsSql = null; let wsRows = null; @@ -33,16 +33,18 @@ let dsn = 'ws://root:taosdata@localhost:6041'; taosResult = await wsSql.exec('INSERT INTO d1001 USING meters TAGS ("California.SanFrancisco", 3) VALUES (NOW, 10.2, 219, 0.32)', reqId++) console.log(taosResult); - - wsRows = await wsSql.query('select * from meters', reqId++); - let meta = wsRows.getMeta() - console.log("wsRow:meta:=>", meta); - - while (await wsRows.next()) { - let result = wsRows.getData(); - console.log('queryRes.Scan().then=>', result); + for (let i = 0; i< 100; i++) { + wsRows = await wsSql.query('select * from meters', reqId++); + let meta = wsRows.getMeta() + console.log("wsRow:meta:=>", meta); + + while (await wsRows.next()) { + let result = wsRows.getData(); + console.log('queryRes.Scan().then=>', result); + } + wsRows.close() } - wsRows.close() + } catch (err: any) { console.error(err.code, err.message); diff --git a/nodejs/src/common/taosResult.ts b/nodejs/src/common/taosResult.ts index 2eb7620..49665c4 100644 --- a/nodejs/src/common/taosResult.ts +++ b/nodejs/src/common/taosResult.ts @@ -175,7 +175,6 @@ export function parseBlock(blocks: WSFetchBlockResponse, taosResult: TaosResult) // let dataBuffer = blocks.data.slice(bufferOffset); let headOffset = blocks.data.byteOffset + bufferOffset; let dataView = new DataView(blocks.data.buffer, headOffset); - console.log("---", headOffset, blocks.data.byteOffset, bufferOffset); // record the head of column in block let colBlockHead = 0; for (let i = 0; i < rows; i++) { From 8181a2538548ef1f82059730d417dd6a8025ea9d Mon Sep 17 00:00:00 2001 From: menshibin Date: Mon, 14 Oct 2024 15:13:24 +0800 Subject: [PATCH 16/20] Optimize analysis dataBuffer slice copy --- nodejs/src/client/wsResponse.ts | 4 +++- nodejs/src/common/taosResult.ts | 20 +++++++++----------- nodejs/src/tmq/tmqResponse.ts | 11 ++++++----- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/nodejs/src/client/wsResponse.ts b/nodejs/src/client/wsResponse.ts index 81f77da..b66259a 100644 --- a/nodejs/src/client/wsResponse.ts +++ b/nodejs/src/client/wsResponse.ts @@ -97,16 +97,18 @@ export class WSFetchBlockResponse { resultId: bigint | undefined finished: number | undefined metaType: number | undefined + textDecoder: TextDecoder constructor(msg: ArrayBuffer) { let dataView = new DataView(msg); this.action = dataView.getBigUint64(8, true) this.timing = dataView.getBigUint64(18, true) this.reqId = dataView.getBigUint64(26, true) this.code = dataView.getUint32(34, true) + this.textDecoder = new TextDecoder() this.blockLen = 0; if (this.code != 0) { let len = dataView.getUint32(38, true) - this.message = readVarchar(msg, 42, len); + this.message = readVarchar(msg, 42, len, this.textDecoder); return; } this.resultId = dataView.getBigUint64(42, true) diff --git a/nodejs/src/common/taosResult.ts b/nodejs/src/common/taosResult.ts index 49665c4..e25dadd 100644 --- a/nodejs/src/common/taosResult.ts +++ b/nodejs/src/common/taosResult.ts @@ -156,6 +156,7 @@ export class TaosResult { export function parseBlock(blocks: WSFetchBlockResponse, taosResult: TaosResult): TaosResult { let metaList = taosResult.getTaosMeta() let dataList = taosResult.getData() + let textDecoder = new TextDecoder() if (metaList && dataList && blocks && blocks.data) { let rows = blocks.data.getUint32(8, true); if (rows == 0) { @@ -215,7 +216,7 @@ export function parseBlock(blocks: WSFetchBlockResponse, taosResult: TaosResult) colDataHead = colBlockHead + INT_32_SIZE * rows + varOffset let dataLength = dataView.getInt16(colDataHead, true); if (isVarType == ColumnsBlockType.VARCHAR) { - row.push(readVarchar(dataView.buffer, dataView.byteOffset + colDataHead + 2, dataLength)) + row.push(readVarchar(dataView.buffer, dataView.byteOffset + colDataHead + 2, dataLength, textDecoder)) } else if(isVarType == ColumnsBlockType.GEOMETRY || isVarType == ColumnsBlockType.VARBINARY) { row.push(readBinary(dataView.buffer, dataView.byteOffset + colDataHead + 2, dataLength)) } else { @@ -426,30 +427,27 @@ export function readBinary(dataBuffer: ArrayBuffer, colDataHead: number, length: return buff } -export function readVarchar(dataBuffer: ArrayBuffer, colDataHead: number, length: number): string { - let data = ""; +export function readVarchar(dataBuffer: ArrayBuffer, colDataHead: number, length: number, textDecoder: TextDecoder): string { // let buff = dataBuffer.slice(colDataHead, colDataHead + length) let dataView = new DataView(dataBuffer, colDataHead, length); - data += new TextDecoder().decode(dataView) - return data; + return textDecoder.decode(dataView); } export function readNchar(dataBuffer: ArrayBuffer, colDataHead: number, length: number): string { - let data = ""; + let data: string[] = []; // let buff: ArrayBuffer = dataBuffer.slice(colDataHead, colDataHead + length); let dataView = new DataView(dataBuffer, colDataHead, length); for (let i = 0; i < length / 4; i++) { - data += appendRune(dataView.getUint32(i * 4, true)) + data.push(appendRune(dataView.getUint32(i * 4, true))); } - return data; + return data.join(''); } -export function getString(dataBuffer: DataView, colDataHead: number, length: number): string { +export function getString(dataBuffer: DataView, colDataHead: number, length: number, textDecoder: TextDecoder): string { // let buff = dataBuffer.slice(colDataHead, colDataHead + length - 1) let dataView = new Uint8Array(dataBuffer.buffer, dataBuffer.byteOffset + colDataHead, length - 1); - let decoder = new TextDecoder('utf-8'); - return decoder.decode(dataView); + return textDecoder.decode(dataView); } function iteratorBuff(arr: ArrayBuffer) { diff --git a/nodejs/src/tmq/tmqResponse.ts b/nodejs/src/tmq/tmqResponse.ts index 0069a9b..1fab866 100644 --- a/nodejs/src/tmq/tmqResponse.ts +++ b/nodejs/src/tmq/tmqResponse.ts @@ -5,7 +5,6 @@ import { WebSocketInterfaceError, ErrorCode, TDWebSocketClientError } from "../c import { TMQBlockInfo, TMQRawDataSchema } from "./constant"; import { zigzagDecode } from "../common/utils"; import logger from "../common/log"; -import { off } from "process"; export class WsPollResponse { code: number; @@ -20,7 +19,7 @@ export class WsPollResponse { id: bigint; message_type:number; totalTime:number; - + constructor(resp:MessageResp) { this.totalTime = resp.totalTime this.code = resp.msg.code; @@ -78,9 +77,11 @@ export class WSTmqFetchBlockInfo { taosResult: TaosResult; schemaLen: number; rows: number; + textDecoder: TextDecoder; constructor(dataView: DataView, taosResult: TaosResult) { // this.totalTime = resp.totalTime // this.blockData = resp.msg + this.textDecoder = new TextDecoder(); this.taosResult = taosResult; this.schema = []; this.schemaLen = 0; @@ -165,7 +166,7 @@ export class WSTmqFetchBlockInfo { schema.colID = zigzagDecode(variableInfo[0]); variableInfo = this.parseVariableByteInteger(variableInfo[1]); this.schemaLen += variableInfo[2]; - schema.name = getString(variableInfo[1], 0, variableInfo[0]); + schema.name = getString(variableInfo[1], 0, variableInfo[0], this.textDecoder); if (!isSkip) { this.taosResult.setMeta({ @@ -182,7 +183,7 @@ export class WSTmqFetchBlockInfo { if(this.withTableName) { variableInfo = this.parseVariableByteInteger(dataView); this.schemaLen += variableInfo[2]; - this.tableName = readVarchar(variableInfo[1].buffer, variableInfo[1].byteOffset, variableInfo[0]); + this.tableName = readVarchar(variableInfo[1].buffer, variableInfo[1].byteOffset, variableInfo[0], this.textDecoder); dataView = new DataView(variableInfo[1].buffer, variableInfo[1].byteOffset + variableInfo[0]); this.schemaLen += variableInfo[0]; } @@ -265,7 +266,7 @@ export class WSTmqFetchBlockInfo { let dataLength = dataView.getInt16(header, true) & 0xFFFF; if (isVarType == ColumnsBlockType.VARCHAR) { //decode var char - value = readVarchar(dataView.buffer, dataView.byteOffset + header + 2, dataLength) + value = readVarchar(dataView.buffer, dataView.byteOffset + header + 2, dataLength, this.textDecoder) } else if(isVarType == ColumnsBlockType.GEOMETRY || isVarType == ColumnsBlockType.VARBINARY) { //decode binary value = readBinary(dataView.buffer, dataView.byteOffset + header + 2, dataLength) From 43fbc5597dec61efdcab7c979b22ad760c0136b9 Mon Sep 17 00:00:00 2001 From: menshibin Date: Mon, 14 Oct 2024 15:22:43 +0800 Subject: [PATCH 17/20] Optimize analysis dataBuffer slice copy --- nodejs/src/tmq/tmqResponse.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nodejs/src/tmq/tmqResponse.ts b/nodejs/src/tmq/tmqResponse.ts index 1fab866..4418192 100644 --- a/nodejs/src/tmq/tmqResponse.ts +++ b/nodejs/src/tmq/tmqResponse.ts @@ -18,8 +18,7 @@ export class WsPollResponse { message_id: number; id: bigint; message_type:number; - totalTime:number; - + totalTime:number; constructor(resp:MessageResp) { this.totalTime = resp.totalTime this.code = resp.msg.code; @@ -77,7 +76,7 @@ export class WSTmqFetchBlockInfo { taosResult: TaosResult; schemaLen: number; rows: number; - textDecoder: TextDecoder; + textDecoder: TextDecoder; constructor(dataView: DataView, taosResult: TaosResult) { // this.totalTime = resp.totalTime // this.blockData = resp.msg @@ -127,7 +126,7 @@ export class WSTmqFetchBlockInfo { let dataBuffer = new DataView(dataView.buffer, dataView.byteOffset + 6); let rows = 0; // const parseStartTime = new Date().getTime(); - // console.log("parseBlockInfos blockNum="+ blockNum) + console.log("parseBlockInfos blockNum="+ blockNum) for (let i = 0; i < blockNum; i++) { let variableInfo = this.parseVariableByteInteger(dataBuffer); this.taosResult.setPrecision(variableInfo[1].getUint8(17)); @@ -266,6 +265,7 @@ export class WSTmqFetchBlockInfo { let dataLength = dataView.getInt16(header, true) & 0xFFFF; if (isVarType == ColumnsBlockType.VARCHAR) { //decode var char + value = readVarchar(dataView.buffer, dataView.byteOffset + header + 2, dataLength, this.textDecoder) } else if(isVarType == ColumnsBlockType.GEOMETRY || isVarType == ColumnsBlockType.VARBINARY) { //decode binary From 1de7c2809816ab2d49aef5b2a7059a9e58fa1ec5 Mon Sep 17 00:00:00 2001 From: menshibin Date: Mon, 14 Oct 2024 15:44:08 +0800 Subject: [PATCH 18/20] Optimize analysis StringIndexable to enum --- nodejs/example/basicBatchTmq.ts | 2 +- nodejs/src/common/constant.ts | 40 ++++++++--------- nodejs/src/common/taosResult.ts | 62 ++++++++++++------------- nodejs/src/stmt/wsParams.ts | 80 ++++++++++++++++----------------- 4 files changed, 92 insertions(+), 92 deletions(-) diff --git a/nodejs/example/basicBatchTmq.ts b/nodejs/example/basicBatchTmq.ts index 1060d21..91faa6e 100644 --- a/nodejs/example/basicBatchTmq.ts +++ b/nodejs/example/basicBatchTmq.ts @@ -5,7 +5,7 @@ import { WsConsumer } from "../src/tmq/wsTmq"; const db = 'power'; const stable = 'meters'; -const url = 'ws://localhost:6041'; +const url = 'ws://192.168.1.98:6041'; const topic = 'topic_meters' const topics = [topic]; const groupId = "group-50"; diff --git a/nodejs/src/common/constant.ts b/nodejs/src/common/constant.ts index a7b83fd..3b695f4 100644 --- a/nodejs/src/common/constant.ts +++ b/nodejs/src/common/constant.ts @@ -39,26 +39,26 @@ export const ColumnsBlockType: StringIndexable = { } -export const TDengineTypeCode: StringIndexable = { - 'NULL': 0, - 'BOOL': 1, - 'TINYINT': 2, - 'SMALLINT': 3, - 'INT': 4, - 'BIGINT': 5, - 'FLOAT': 6, - 'DOUBLE': 7, - 'BINARY': 8, - 'VARCHAR': 8, - 'TIMESTAMP': 9, - 'NCHAR': 10, - 'TINYINT UNSIGNED': 11, - 'SMALLINT UNSIGNED': 12, - 'INT UNSIGNED': 13, - 'BIGINT UNSIGNED': 14, - 'JSON': 15, - 'VARBINARY': 16, - 'GEOMETRY': 20, +export enum TDengineTypeCode { + NULL = 0, + BOOL = 1, + TINYINT = 2, + SMALLINT = 3, + INT = 4, + BIGINT = 5, + FLOAT = 6, + DOUBLE = 7, + BINARY = 8, + VARCHAR = 8, + TIMESTAMP = 9, + NCHAR = 10, + TINYINT_UNSIGNED = 11, + SMALLINT_UNSIGNED = 12, + INT_UNSIGNED = 13, + BIGINT_UNSIGNED = 14, + JSON = 15, + VARBINARY = 16, + GEOMETRY = 20, } export const TDenginePrecision: IndexableString = { diff --git a/nodejs/src/common/taosResult.ts b/nodejs/src/common/taosResult.ts index e25dadd..e8d9768 100644 --- a/nodejs/src/common/taosResult.ts +++ b/nodejs/src/common/taosResult.ts @@ -237,23 +237,23 @@ export function parseBlock(blocks: WSFetchBlockResponse, taosResult: TaosResult) export function _isVarType(metaType: number): Number { switch (metaType) { - case TDengineTypeCode['NCHAR']: { + case TDengineTypeCode.NCHAR: { return ColumnsBlockType['NCHAR'] } - case TDengineTypeCode['VARCHAR']: { + case TDengineTypeCode.VARCHAR: { return ColumnsBlockType['VARCHAR'] } - case TDengineTypeCode['BINARY']: { + case TDengineTypeCode.BINARY: { return ColumnsBlockType['VARCHAR'] } - case TDengineTypeCode['JSON']: { + case TDengineTypeCode.JSON: { return ColumnsBlockType['VARCHAR'] } - case TDengineTypeCode['GEOMETRY']: { + case TDengineTypeCode.GEOMETRY: { return ColumnsBlockType['GEOMETRY'] } - case TDengineTypeCode['VARBINARY']: { - return ColumnsBlockType['VARBINARY'] + case TDengineTypeCode.VARBINARY: { + return ColumnsBlockType.VARBINARY } default: { return ColumnsBlockType['SOLID'] @@ -265,9 +265,9 @@ export function readSolidDataToArray(dataBuffer: DataView, colBlockHead:number, let result:any[] = [] switch (metaType) { - case TDengineTypeCode['BOOL']: - case TDengineTypeCode['TINYINT']: - case TDengineTypeCode['TINYINT UNSIGNED']:{ + case TDengineTypeCode.BOOL: + case TDengineTypeCode.TINYINT: + case TDengineTypeCode.TINYINT_UNSIGNED:{ for (let i = 0; i < rows; i++, colBlockHead++) { if (isNull(bitMapArr, i)) { result.push(null); @@ -278,7 +278,7 @@ export function readSolidDataToArray(dataBuffer: DataView, colBlockHead:number, } break; } - case TDengineTypeCode['SMALLINT']: { + case TDengineTypeCode.SMALLINT: { for (let i = 0; i < rows; i++, colBlockHead+=2) { if (isNull(bitMapArr, i)) { result.push(null); @@ -288,7 +288,7 @@ export function readSolidDataToArray(dataBuffer: DataView, colBlockHead:number, } break; } - case TDengineTypeCode['INT']: { + case TDengineTypeCode.INT: { for (let i = 0; i < rows; i++, colBlockHead+=4) { if (isNull(bitMapArr, i)) { result.push(null); @@ -298,7 +298,7 @@ export function readSolidDataToArray(dataBuffer: DataView, colBlockHead:number, } break; } - case TDengineTypeCode['BIGINT']: { + case TDengineTypeCode.BIGINT: { for (let i = 0; i < rows; i++, colBlockHead+=8) { if (isNull(bitMapArr, i)) { result.push(null); @@ -308,7 +308,7 @@ export function readSolidDataToArray(dataBuffer: DataView, colBlockHead:number, } break; } - case TDengineTypeCode['SMALLINT UNSIGNED']: { + case TDengineTypeCode.SMALLINT_UNSIGNED: { for (let i = 0; i < rows; i++, colBlockHead+=2) { if (isNull(bitMapArr, i)) { result.push(null); @@ -318,7 +318,7 @@ export function readSolidDataToArray(dataBuffer: DataView, colBlockHead:number, } break; } - case TDengineTypeCode['INT UNSIGNED']: { + case TDengineTypeCode.INT_UNSIGNED: { for (let i = 0; i < rows; i++, colBlockHead+=4) { if (isNull(bitMapArr, i)) { result.push(null); @@ -328,7 +328,7 @@ export function readSolidDataToArray(dataBuffer: DataView, colBlockHead:number, } break; } - case TDengineTypeCode['BIGINT UNSIGNED']: { + case TDengineTypeCode.BIGINT_UNSIGNED: { for (let i = 0; i < rows; i++, colBlockHead+=8) { if (isNull(bitMapArr, i)) { result.push(null); @@ -338,7 +338,7 @@ export function readSolidDataToArray(dataBuffer: DataView, colBlockHead:number, } break; } - case TDengineTypeCode['FLOAT']: { + case TDengineTypeCode.FLOAT: { for (let i = 0; i < rows; i++, colBlockHead+=4) { if (isNull(bitMapArr, i)) { result.push(null); @@ -348,7 +348,7 @@ export function readSolidDataToArray(dataBuffer: DataView, colBlockHead:number, } break; } - case TDengineTypeCode['DOUBLE']: { + case TDengineTypeCode.DOUBLE: { for (let i = 0; i < rows; i++, colBlockHead += 8) { if (isNull(bitMapArr, i)) { @@ -359,7 +359,7 @@ export function readSolidDataToArray(dataBuffer: DataView, colBlockHead:number, } break; } - case TDengineTypeCode['TIMESTAMP']: { + case TDengineTypeCode.TIMESTAMP: { for (let i = 0; i < rows; i++, colBlockHead += 8) { if (isNull(bitMapArr, i)) { result.push(null); @@ -379,40 +379,40 @@ export function readSolidDataToArray(dataBuffer: DataView, colBlockHead:number, export function readSolidData(dataBuffer: DataView, colDataHead: number, meta: ResponseMeta): Number | Boolean | BigInt { switch (meta.type) { - case TDengineTypeCode['BOOL']: { + case TDengineTypeCode.BOOL: { return (Boolean)(dataBuffer.getInt8(colDataHead)); } - case TDengineTypeCode['TINYINT']: { + case TDengineTypeCode.TINYINT: { return dataBuffer.getInt8(colDataHead); } - case TDengineTypeCode['SMALLINT']: { + case TDengineTypeCode.SMALLINT: { return dataBuffer.getInt16(colDataHead, true); } - case TDengineTypeCode['INT']: { + case TDengineTypeCode.INT: { return dataBuffer.getInt32(colDataHead, true); } - case TDengineTypeCode['BIGINT']: { + case TDengineTypeCode.BIGINT: { return dataBuffer.getBigInt64(colDataHead, true); } - case TDengineTypeCode['TINYINT UNSIGNED']: { + case TDengineTypeCode.TINYINT_UNSIGNED: { return dataBuffer.getUint8(colDataHead); } - case TDengineTypeCode['SMALLINT UNSIGNED']: { + case TDengineTypeCode.SMALLINT_UNSIGNED: { return dataBuffer.getUint16(colDataHead, true); } - case TDengineTypeCode['INT UNSIGNED']: { + case TDengineTypeCode.INT_UNSIGNED: { return dataBuffer.getUint32(colDataHead, true); } - case TDengineTypeCode['BIGINT UNSIGNED']: { + case TDengineTypeCode.BIGINT_UNSIGNED: { return dataBuffer.getBigUint64(colDataHead, true); } - case TDengineTypeCode['FLOAT']: { + case TDengineTypeCode.FLOAT: { return parseFloat(dataBuffer.getFloat32(colDataHead, true).toFixed(5)); } - case TDengineTypeCode['DOUBLE']: { + case TDengineTypeCode.DOUBLE: { return parseFloat(dataBuffer.getFloat64(colDataHead, true).toFixed(15)); } - case TDengineTypeCode['TIMESTAMP']: { + case TDengineTypeCode.TIMESTAMP: { return dataBuffer.getBigInt64(colDataHead, true); // could change } diff --git a/nodejs/src/stmt/wsParams.ts b/nodejs/src/stmt/wsParams.ts index b10e992..bc237ad 100644 --- a/nodejs/src/stmt/wsParams.ts +++ b/nodejs/src/stmt/wsParams.ts @@ -45,33 +45,33 @@ export class StmtBindParams { if (!params || params.length == 0) { throw new TaosError(ErrorCode.ERR_INVALID_PARAMS, "SetBooleanColumn params is invalid!"); } - let arrayBuffer = this.encodeDigitColumns(params, "boolean", TDengineTypeLength['BOOL'], TDengineTypeCode['BOOL']) + let arrayBuffer = this.encodeDigitColumns(params, "boolean", TDengineTypeLength['BOOL'], TDengineTypeCode.BOOL) - this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode['BOOL'], TDengineTypeLength['BOOL'])) ; + this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode.BOOL, TDengineTypeLength['BOOL'])) ; } setTinyInt(params :any[]) { if (!params || params.length == 0) { throw new TaosError(ErrorCode.ERR_INVALID_PARAMS, "SetTinyIntColumn params is invalid!"); } - let arrayBuffer = this.encodeDigitColumns(params, "number", TDengineTypeLength['TINYINT'], TDengineTypeCode['TINYINT']) - this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode['TINYINT'], TDengineTypeLength['TINYINT'])); + let arrayBuffer = this.encodeDigitColumns(params, "number", TDengineTypeLength['TINYINT'], TDengineTypeCode.TINYINT) + this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode.TINYINT, TDengineTypeLength['TINYINT'])); } setUTinyInt(params :any[]) { if (!params || params.length == 0) { throw new TaosError(ErrorCode.ERR_INVALID_PARAMS, "SetUTinyIntColumn params is invalid!"); } - let arrayBuffer = this.encodeDigitColumns(params, "number", TDengineTypeLength['TINYINT UNSIGNED'], TDengineTypeCode['TINYINT UNSIGNED']) - this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode['TINYINT UNSIGNED'], TDengineTypeLength['TINYINT UNSIGNED'])); + let arrayBuffer = this.encodeDigitColumns(params, "number", TDengineTypeLength['TINYINT UNSIGNED'], TDengineTypeCode.TINYINT_UNSIGNED) + this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode.TINYINT_UNSIGNED, TDengineTypeLength['TINYINT UNSIGNED'])); } setSmallInt(params :any[]) { if (!params || params.length == 0) { throw new TaosError(ErrorCode.ERR_INVALID_PARAMS, "SetSmallIntColumn params is invalid!"); } - let arrayBuffer = this.encodeDigitColumns(params, "number", TDengineTypeLength['SMALLINT'], TDengineTypeCode['SMALLINT']) - this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode['SMALLINT'], TDengineTypeLength['SMALLINT'])); + let arrayBuffer = this.encodeDigitColumns(params, "number", TDengineTypeLength['SMALLINT'], TDengineTypeCode.SMALLINT) + this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode.SMALLINT, TDengineTypeLength['SMALLINT'])); } @@ -79,81 +79,81 @@ export class StmtBindParams { if (!params || params.length == 0) { throw new TaosError(ErrorCode.ERR_INVALID_PARAMS, "SetSmallIntColumn params is invalid!"); } - let arrayBuffer = this.encodeDigitColumns(params, "number", TDengineTypeLength['SMALLINT UNSIGNED'], TDengineTypeCode['SMALLINT UNSIGNED']) - this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode['SMALLINT UNSIGNED'], TDengineTypeLength['SMALLINT UNSIGNED'])); + let arrayBuffer = this.encodeDigitColumns(params, "number", TDengineTypeLength['SMALLINT UNSIGNED'], TDengineTypeCode.SMALLINT_UNSIGNED) + this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode.SMALLINT_UNSIGNED, TDengineTypeLength['SMALLINT UNSIGNED'])); } setInt(params :any[]) { if (!params || params.length == 0) { throw new TaosError(ErrorCode.ERR_INVALID_PARAMS, "SetIntColumn params is invalid!"); } - let arrayBuffer = this.encodeDigitColumns(params, "number", TDengineTypeLength['INT'], TDengineTypeCode['INT']) - this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode['INT'], TDengineTypeLength['INT'])); + let arrayBuffer = this.encodeDigitColumns(params, "number", TDengineTypeLength['INT'], TDengineTypeCode.INT) + this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode.INT, TDengineTypeLength['INT'])); } setUInt(params :any[]) { if (!params || params.length == 0) { throw new TaosError(ErrorCode.ERR_INVALID_PARAMS, "SetUIntColumn params is invalid!"); } - let arrayBuffer = this.encodeDigitColumns(params, "number", TDengineTypeLength['INT UNSIGNED'], TDengineTypeCode['INT UNSIGNED']) - this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode['INT UNSIGNED'], TDengineTypeLength['INT UNSIGNED'])); + let arrayBuffer = this.encodeDigitColumns(params, "number", TDengineTypeLength['INT UNSIGNED'], TDengineTypeCode.INT_UNSIGNED) + this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode.INT_UNSIGNED, TDengineTypeLength['INT UNSIGNED'])); } setBigint(params :any[]) { if (!params || params.length == 0) { throw new TaosError(ErrorCode.ERR_INVALID_PARAMS, "SetBigIntColumn params is invalid!"); } - let arrayBuffer = this.encodeDigitColumns(params, "bigint", TDengineTypeLength['BIGINT'], TDengineTypeCode['BIGINT']) - this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode['INT'], TDengineTypeLength['BIGINT'])); + let arrayBuffer = this.encodeDigitColumns(params, "bigint", TDengineTypeLength['BIGINT'], TDengineTypeCode.BIGINT) + this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode.INT, TDengineTypeLength['BIGINT'])); } setUBigint(params :any[]) { if (!params || params.length == 0) { throw new TaosError(ErrorCode.ERR_INVALID_PARAMS, "SetUBigIntColumn params is invalid!"); } - let arrayBuffer = this.encodeDigitColumns(params, "bigint", TDengineTypeLength['BIGINT UNSIGNED'], TDengineTypeCode['BIGINT UNSIGNED']) - this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode['BIGINT UNSIGNED'], TDengineTypeLength['BIGINT UNSIGNED'])); + let arrayBuffer = this.encodeDigitColumns(params, "bigint", TDengineTypeLength['BIGINT UNSIGNED'], TDengineTypeCode.BIGINT_UNSIGNED) + this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode.BIGINT_UNSIGNED, TDengineTypeLength['BIGINT UNSIGNED'])); } setFloat(params :any[]) { if (!params || params.length == 0) { throw new TaosError(ErrorCode.ERR_INVALID_PARAMS, "SetFloatColumn params is invalid!"); } - let arrayBuffer = this.encodeDigitColumns(params, "number", TDengineTypeLength['FLOAT'], TDengineTypeCode['FLOAT']) - this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode['FLOAT'], TDengineTypeLength['FLOAT'])); + let arrayBuffer = this.encodeDigitColumns(params, "number", TDengineTypeLength['FLOAT'], TDengineTypeCode.FLOAT) + this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode.FLOAT, TDengineTypeLength['FLOAT'])); } setDouble(params :any[]) { if (!params || params.length == 0) { throw new TaosError(ErrorCode.ERR_INVALID_PARAMS, "SetDoubleColumn params is invalid!"); } - let arrayBuffer = this.encodeDigitColumns(params, "number", TDengineTypeLength['DOUBLE'], TDengineTypeCode['DOUBLE']) - this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode['DOUBLE'], TDengineTypeLength['DOUBLE'])); + let arrayBuffer = this.encodeDigitColumns(params, "number", TDengineTypeLength['DOUBLE'], TDengineTypeCode.DOUBLE) + this._params.push(new ColumnInfo(arrayBuffer, TDengineTypeCode.DOUBLE, TDengineTypeLength['DOUBLE'])); } setVarchar(params :any[]) { let data = this.encodeVarLengthColumn(params) - this._params.push(new ColumnInfo(data, TDengineTypeCode['VARCHAR'], 0)); + this._params.push(new ColumnInfo(data, TDengineTypeCode.VARCHAR, 0)); } setBinary(params :any[]) { - this._params.push(new ColumnInfo(this.encodeVarLengthColumn(params), TDengineTypeCode['BINARY'], 0)); + this._params.push(new ColumnInfo(this.encodeVarLengthColumn(params), TDengineTypeCode.BINARY, 0)); } setNchar(params :any[]) { - this._params.push(new ColumnInfo(this.encodeNcharColumn(params), TDengineTypeCode['NCHAR'], 0)); + this._params.push(new ColumnInfo(this.encodeNcharColumn(params), TDengineTypeCode.NCHAR, 0)); } setJson(params :any[]) { - this._params.push(new ColumnInfo(this.encodeVarLengthColumn(params), TDengineTypeCode['JSON'], 0)); + this._params.push(new ColumnInfo(this.encodeVarLengthColumn(params), TDengineTypeCode.JSON, 0)); } setVarBinary(params :any[]) { - this._params.push(new ColumnInfo(this.encodeVarLengthColumn(params), TDengineTypeCode['VARBINARY'], 0)); + this._params.push(new ColumnInfo(this.encodeVarLengthColumn(params), TDengineTypeCode.VARBINARY, 0)); } setGeometry(params :any[]) { - this._params.push(new ColumnInfo(this.encodeVarLengthColumn(params), TDengineTypeCode['GEOMETRY'], 0)); + this._params.push(new ColumnInfo(this.encodeVarLengthColumn(params), TDengineTypeCode.GEOMETRY, 0)); } setTimestamp(params :any[]) { @@ -224,7 +224,7 @@ export class StmtBindParams { } this._dataTotalLen += arrayBuffer.byteLength; - this._params.push(new ColumnInfo([TDengineTypeLength['TIMESTAMP'] * params.length, arrayBuffer], TDengineTypeCode['TIMESTAMP'], TDengineTypeLength['TIMESTAMP'])); + this._params.push(new ColumnInfo([TDengineTypeLength['TIMESTAMP'] * params.length, arrayBuffer], TDengineTypeCode.TIMESTAMP, TDengineTypeLength['TIMESTAMP'])); } @@ -245,7 +245,7 @@ export class StmtBindParams { if (!isEmpty(params[i])) { if (typeof params[i] == dataType) { switch (columnType) { - case TDengineTypeCode['BOOL']: { + case TDengineTypeCode.BOOL: { if (params[i]) { dataBuffer.setInt8(i, 1); } else { @@ -253,48 +253,48 @@ export class StmtBindParams { } break; } - case TDengineTypeCode['TINYINT']: { + case TDengineTypeCode.TINYINT: { dataBuffer.setInt8(i, params[i]); break; } - case TDengineTypeCode['TINYINT UNSIGNED']: { + case TDengineTypeCode.TINYINT_UNSIGNED: { dataBuffer.setUint8(i, params[i]); break; } - case TDengineTypeCode['SMALLINT']: { + case TDengineTypeCode.SMALLINT: { dataBuffer.setInt16(i * 2, params[i], true); break; } - case TDengineTypeCode['SMALLINT UNSIGNED']: { + case TDengineTypeCode.SMALLINT_UNSIGNED: { dataBuffer.setUint16(i * 2, params[i], true); break; } - case TDengineTypeCode['INT']: { + case TDengineTypeCode.INT: { dataBuffer.setInt32(i * 4, params[i], true); break; } - case TDengineTypeCode['INT UNSIGNED']: { + case TDengineTypeCode.INT_UNSIGNED: { dataBuffer.setUint32(i * 4, params[i], true); break; } - case TDengineTypeCode['BIGINT']: { + case TDengineTypeCode.BIGINT: { dataBuffer.setBigInt64(i * 8, params[i], true); break; } - case TDengineTypeCode['BIGINT UNSIGNED']: { + case TDengineTypeCode.BIGINT_UNSIGNED: { dataBuffer.setBigUint64(i * 8, params[i], true); break; } - case TDengineTypeCode['FLOAT']: { + case TDengineTypeCode.FLOAT: { dataBuffer.setFloat32(i * 4, params[i], true); break; } - case TDengineTypeCode['DOUBLE']: { + case TDengineTypeCode.DOUBLE: { dataBuffer.setFloat64(i * 8, params[i], true); break; } From f4c02418f33b94d3479ed960c51fd5eb817cb2cc Mon Sep 17 00:00:00 2001 From: menshibin Date: Mon, 14 Oct 2024 15:44:58 +0800 Subject: [PATCH 19/20] modify test ip --- nodejs/example/basicBatchTmq.ts | 2 +- nodejs/example/basicSql.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nodejs/example/basicBatchTmq.ts b/nodejs/example/basicBatchTmq.ts index 91faa6e..1060d21 100644 --- a/nodejs/example/basicBatchTmq.ts +++ b/nodejs/example/basicBatchTmq.ts @@ -5,7 +5,7 @@ import { WsConsumer } from "../src/tmq/wsTmq"; const db = 'power'; const stable = 'meters'; -const url = 'ws://192.168.1.98:6041'; +const url = 'ws://localhost:6041'; const topic = 'topic_meters' const topics = [topic]; const groupId = "group-50"; diff --git a/nodejs/example/basicSql.ts b/nodejs/example/basicSql.ts index ec3be1e..7001502 100644 --- a/nodejs/example/basicSql.ts +++ b/nodejs/example/basicSql.ts @@ -1,7 +1,7 @@ import { WSConfig } from '../src/common/config'; import { sqlConnect, destroy, setLogLevel } from '../src' -let dsn = 'ws://root:taosdata@192.168.1.98:6041'; +let dsn = 'ws://root:taosdata@localhost:6041'; (async () => { let wsSql = null; let wsRows = null; From 5d278c5f627cec9345810864c6d9d1f31cec6d41 Mon Sep 17 00:00:00 2001 From: menshibin Date: Mon, 14 Oct 2024 16:48:51 +0800 Subject: [PATCH 20/20] Optimize analysis StringIndexable to enum --- nodejs/src/tmq/tmqResponse.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nodejs/src/tmq/tmqResponse.ts b/nodejs/src/tmq/tmqResponse.ts index 4418192..18caf11 100644 --- a/nodejs/src/tmq/tmqResponse.ts +++ b/nodejs/src/tmq/tmqResponse.ts @@ -126,7 +126,7 @@ export class WSTmqFetchBlockInfo { let dataBuffer = new DataView(dataView.buffer, dataView.byteOffset + 6); let rows = 0; // const parseStartTime = new Date().getTime(); - console.log("parseBlockInfos blockNum="+ blockNum) + // console.log("parseBlockInfos blockNum="+ blockNum) for (let i = 0; i < blockNum; i++) { let variableInfo = this.parseVariableByteInteger(dataBuffer); this.taosResult.setPrecision(variableInfo[1].getUint8(17));