Skip to content

Commit

Permalink
Merge pull request #64 from taosdata/fix-covered
Browse files Browse the repository at this point in the history
remove deprecated code
  • Loading branch information
menshibin authored Nov 1, 2024
2 parents 436febd + fddf41d commit 33a2049
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 120 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
cd TDengine
mkdir debug
cd debug
cmake .. -DBUILD_HTTP=false -DBUILD_JDBC=false -DBUILD_TOOLS=false -DBUILD_TEST=off
cmake .. -DBUILD_HTTP=false -DBUILD_JDBC=false -DBUILD_TOOLS=false -DBUILD_TEST=off -DBUILD_DEPENDENCY_TESTS=false
make -j 4
sudo make install
which taosd
Expand Down
56 changes: 0 additions & 56 deletions nodejs/src/client/wsClient.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import JSONBig from 'json-bigint';
import { WebSocketConnector } from './wsConnector';
import { WebSocketConnectionPool } from './wsConnectorPool'
import { parseBlock, MessageResp, TaosResult } from '../common/taosResult';
import { ErrorCode, TDWebSocketClientError, WebSocketInterfaceError, WebSocketQueryError } from '../common/wsError';
import {
WSVersionResponse,
WSFetchBlockResponse,
WSQueryResponse,
WSFetchResponse,
} from './wsResponse';
import { ReqId } from '../common/reqid';
import logger from '../common/log';
Expand Down Expand Up @@ -144,59 +141,6 @@ export class WsClient {

}

async fetch(res: WSQueryResponse): Promise<WSFetchResponse> {
let fetchMsg = {
action: 'fetch',
args: {
req_id: ReqId.getReqID(),
id: res.id,
},
};
return new Promise((resolve, reject) => {
let jsonStr = JSONBig.stringify(fetchMsg);
logger.debug('[wsQueryInterface.fetch.fetchMsg]===>' + jsonStr);
if (this._wsConnector && this._wsConnector.readyState() > 0) {
this._wsConnector.sendMsg(jsonStr).then((e: any) => {
if (e.msg.code == 0) {
resolve(new WSFetchResponse(e));
} else {
reject(new WebSocketInterfaceError(e.msg.code, e.msg.message));
}
}).catch((e) => {reject(e);});
} else {
reject(new TDWebSocketClientError(ErrorCode.ERR_CONNECTION_CLOSED, "invalid websocket connect"));
}

});
}

async fetchBlock(fetchResponse: WSFetchResponse, taosResult: TaosResult): Promise<TaosResult> {
let fetchBlockMsg = {
action: 'fetch_block',
args: {
req_id: ReqId.getReqID(),
id: fetchResponse.id,
},
};

return new Promise((resolve, reject) => {
let jsonStr = JSONBig.stringify(fetchBlockMsg);
logger.debug("[wsQueryInterface.fetchBlock.fetchBlockMsg]===>" + jsonStr)
if (this._wsConnector && this._wsConnector.readyState() > 0) {
this._wsConnector.sendMsg(jsonStr).then((e: any) => {
let resp:MessageResp = e
taosResult.addTotalTime(resp.totalTime)
// if retrieve JSON then reject with message
// else is binary , so parse raw block to TaosResult
parseBlock(new WSFetchBlockResponse(resp.msg), taosResult)
resolve(taosResult);
}).catch((e) => reject(e));
} else {
reject(new TDWebSocketClientError(ErrorCode.ERR_CONNECTION_CLOSED, "invalid websocket connect"));
}
});
}

async sendMsg(msg:string): Promise<any> {
return new Promise((resolve, reject) => {
logger.debug("[wsQueryInterface.sendMsg]===>" + msg)
Expand Down
25 changes: 0 additions & 25 deletions nodejs/src/client/wsResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,31 +61,6 @@ export class WSQueryResponse {
}
}

export class WSFetchResponse {
code: number;
message: string;
action: string;
req_id: number;
timing: bigint;
id: bigint;
completed: boolean;
length: Array<number>;
rows: number;
totalTime: number;
constructor(resp:MessageResp) {
this.totalTime = resp.totalTime
this.code = resp.msg.code;
this.message = resp.msg.message;
this.action = resp.msg.action;
this.req_id = resp.msg.req_id;
this.timing = BigInt(resp.msg.timing);
this.id = BigInt(resp.msg.id);
this.completed = resp.msg.completed;
this.length = resp.msg.length;
this.rows = resp.msg.rows;
}
}

export class WSFetchBlockResponse {
data: DataView | undefined
action: bigint
Expand Down
1 change: 0 additions & 1 deletion nodejs/src/sql/wsSql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { WsStmt } from '../stmt/wsStmt'
import { ReqId } from '../common/reqid'
import { BinaryQueryMessage, FetchRawBlockMessage, PrecisionLength } from '../common/constant'
import logger from '../common/log'
import { log } from 'console'

export class WsSql{
private wsConfig:WSConfig;
Expand Down
22 changes: 2 additions & 20 deletions nodejs/src/tmq/wsTmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ export class WsConsumer {
},
};

let resp = await this._wsClient.exec(JSON.stringify(queryMsg), false);
return new CommittedResp(resp).setTopicPartitions(offsets);
let resp = await this._wsClient.exec(JSON.stringify(queryMsg), false);
return new CommittedResp(resp).setTopicPartitions(offsets);
}

async commitOffsets(partitions:Array<TopicPartition>):Promise<Array<TopicPartition>> {
Expand Down Expand Up @@ -237,20 +237,6 @@ export class WsConsumer {
await this._wsClient.close();
}

private async fetch(pollResp: WsPollResponse):Promise<WsTmqQueryResponse> {
let fetchMsg = {
action: 'fetch',
args: {
req_id: ReqId.getReqID(),
message_id:pollResp.message_id,
},
};
let jsonStr = JSON.stringify(fetchMsg);
logger.debug('[wsQueryInterface.fetch.fetchMsg]===>' + jsonStr);
let result = await this._wsClient.exec(jsonStr, false);
return new WsTmqQueryResponse(result);
}

private async fetchBlockData(pollResp: WsPollResponse, taosResult: TaosTmqResult):Promise<boolean> {
let fetchMsg = {
action: 'fetch_raw_data',
Expand All @@ -261,16 +247,12 @@ export class WsConsumer {
};
let jsonStr = JSON.stringify(fetchMsg);
logger.debug('[wsQueryInterface.fetch.fetchMsg]===>' + jsonStr);
// const startTime = new Date().getTime();
let result = await this._wsClient.sendMsg(jsonStr)
let wsResponse = new WSFetchBlockResponse(result.msg)
if (wsResponse && wsResponse.data && wsResponse.blockLen > 0) {
// const parseStartTime = new Date().getTime();
let wsTmqResponse = new WSTmqFetchBlockInfo(wsResponse.data, taosResult);
logger.debug('[WSTmqFetchBlockInfo.fetchBlockData]===>' + wsTmqResponse.taosResult);
if (wsTmqResponse.rows > 0) {
// const endTime = new Date().getTime();
// console.log(endTime - parseStartTime, endTime - startTime);
return true;
}
}
Expand Down
2 changes: 1 addition & 1 deletion nodejs/test/bulkPulling/stmt.func.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ beforeAll(async () => {
})
describe('TDWebSocket.Stmt()', () => {
jest.setTimeout(20 * 1000)
let tags = ['California.SanFrancisco', 3];
let tags = ['California', 3];
let multi = [
// [1709183268567],
// [10.2],
Expand Down
7 changes: 7 additions & 0 deletions nodejs/test/bulkPulling/tmq.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,16 @@ describe('TDWebSocket.Tmq()', () => {
}
// await Sleep(100)
}
let topicArray = await consumer.subscription();
expect(topics.length).toEqual(topicArray.length);
for (let index = 0; index < topicArray.length; index++) {
expect(topics[index]).toEqual(topicArray[index]);
}

assignment = await consumer.commit();
console.log(assignment)
assignment = await consumer.committed(assignment)
assignment = await consumer.commitOffsets(assignment)
console.log(assignment)
await consumer.unsubscribe()
await consumer.close();
Expand Down
32 changes: 16 additions & 16 deletions nodejs/test/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,24 +285,24 @@ export function hexToBytes(hex: string): ArrayBuffer {
return a.buffer
}

export function createStmtData(varbinary:string = "ab",
geoHex:string = "0101000020E6100000000000000000F03F0000000000000040"):Array<Array<any>> {
let multi:any[][] = [
[1709183268567, 1709183268568, 1709183268569],
[10.2, 10.3, 10.4],
[292, 293, 294],
[0.32, 0.33, 0.34],
];
let res = hexToBytes(geoHex)
let geom = Array.from(new Uint8Array(res))
multi.push([geom, geom, geom])
// export function createStmtData(varbinary:string = "ab",
// geoHex:string = "0101000020E6100000000000000000F03F0000000000000040"):Array<Array<any>> {
// let multi:any[][] = [
// [1709183268567, 1709183268568, 1709183268569],
// [10.2, 10.3, 10.4],
// [292, 293, 294],
// [0.32, 0.33, 0.34],
// ];
// let res = hexToBytes(geoHex)
// let geom = Array.from(new Uint8Array(res))
// multi.push([geom, geom, geom])


res = new TextEncoder().encode(varbinary)
let binary = Array.from(new Uint8Array(res))
multi.push([binary, binary, binary])
return multi
}
// res = new TextEncoder().encode(varbinary)
// let binary = Array.from(new Uint8Array(res))
// multi.push([binary, binary, binary])
// return multi
// }

export function compareUint8Arrays(arr1: Uint8Array, arr2: Uint8Array): boolean {
if (arr1.length !== arr2.length) {
Expand Down

0 comments on commit 33a2049

Please sign in to comment.