RS-418: Implement remove record API #92

Merged
merged 4 commits on Sep 10, 2024
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added:

- RS-418: `Bucket.removeRecord`, `Bucket.beginRemoveBatch` and `Bucket.removeQuery` methods to remove records, [PR-92](https://github.com/reductstore/reduct-js/pull/92)

## [1.11.0] - 2024-08-20

### Added:
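For reviewers, a minimal usage sketch of the single-record removal introduced in this PR; the client URL, bucket name, entry name, and timestamp below are illustrative and not taken from the diff:

```ts
import { Client } from "reduct-js";

// Connect to a ReductStore instance and get an existing bucket
// (URL and names are placeholders).
const client = new Client("http://127.0.0.1:8383");
const bucket = await client.getBucket("my-bucket");

// Remove a single record by its UNIX timestamp in microseconds.
await bucket.removeRecord("my-entry", 1700000000000000n);
```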
71 changes: 49 additions & 22 deletions src/Batch.ts
@@ -11,6 +11,7 @@ import Stream from "stream";
export enum BatchType {
WRITE,
UPDATE,
REMOVE,
}

export class Batch {
@@ -67,7 +68,7 @@ export class Batch {
/**
* Add only labels to batch
* Use for updating labels
* @param ts
* @param ts timestamp of record as a UNIX timestamp in microseconds
* @param labels
*/
public addOnlyLabels(ts: bigint, labels: LabelMap): void {
@@ -78,6 +79,19 @@
});
}

/**
* Add only timestamp to batch
* Use for removing records
* @param ts timestamp of record as a UNIX timestamp in microseconds
*/
public addOnlyTimestamp(ts: bigint): void {
this.records.set(ts, {
data: Buffer.from(""),
contentType: "",
labels: {},
});
}

/**
* Write batch to entry
*/
@@ -108,27 +122,40 @@
}

let response;
if (this.type == BatchType.UPDATE) {
headers["Content-Length"] = "0";
response = await this.httpClient.patch(
`/b/${this.bucketName}/${this.entryName}/batch`,
"",
{
headers,
},
);
} else {
headers["Content-Length"] = contentLength.toString();
headers["Content-Type"] = "application/octet-stream";

const stream = Stream.Readable.from(chunks);
response = await this.httpClient.post(
`/b/${this.bucketName}/${this.entryName}/batch`,
stream,
{
headers,
},
);
switch (this.type) {
case BatchType.WRITE: {
headers["Content-Length"] = contentLength.toString();
headers["Content-Type"] = "application/octet-stream";

const stream = Stream.Readable.from(chunks);
response = await this.httpClient.post(
`/b/${this.bucketName}/${this.entryName}/batch`,
stream,
{
headers,
},
);
break;
}
case BatchType.UPDATE:
headers["Content-Length"] = "0";
response = await this.httpClient.patch(
`/b/${this.bucketName}/${this.entryName}/batch`,
"",
{
headers,
},
);
break;
case BatchType.REMOVE:
headers["Content-Length"] = "0";
response = await this.httpClient.delete(
`/b/${this.bucketName}/${this.entryName}/batch`,
{
headers,
},
);
break;
}

const errors = new Map<bigint, APIError>();
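The REMOVE branch above flushes the collected timestamps as a single DELETE to the `/b/<bucket>/<entry>/batch` endpoint. A rough sketch of how the batch flow is meant to be driven via `Bucket.beginRemoveBatch` (assumes an existing `bucket` instance; the timestamps are illustrative, and the error map returned by `write()` follows the existing batch behavior):

```ts
// Collect timestamps of the records to delete, then flush them in one request.
const batch = await bucket.beginRemoveBatch("my-entry");
batch.addOnlyTimestamp(1700000000000000n);
batch.addOnlyTimestamp(1700000000100000n);

// write() issues the DELETE request; per-record failures, if any,
// come back keyed by timestamp.
const errors = await batch.write();
for (const [ts, err] of errors) {
  console.error(`Failed to remove record at ${ts}: ${err.message}`);
}
```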
114 changes: 84 additions & 30 deletions src/Bucket.ts
@@ -5,11 +5,10 @@ import { BucketInfo } from "./messages/BucketInfo";
import { EntryInfo } from "./messages/EntryInfo";
import { LabelMap, ReadableRecord, WritableRecord } from "./Record";
import { APIError } from "./APIError";
import { Readable } from "stream";
import Stream, { Readable } from "stream";
import { Buffer } from "buffer";
import { Batch, BatchType } from "./Batch";
import { isCompatibale } from "./Client";
import Stream from "stream";

/**
* Options for querying records
@@ -22,7 +21,7 @@ export interface QueryOptions {
eachN?: number; // return each N-th record
limit?: number; // limit number of records
continuous?: boolean; // await for new records
poolInterval?: number; // interval for pooling new records (only for continue=true)
pollInterval?: number; // interval for polling new records (only for continue=true)
head?: boolean; // return only head of the record
}

@@ -120,6 +119,45 @@
await this.httpClient.delete(`/b/${this.name}/${entry}`);
}

/**
* Remove a record
* @param entry {string} name of the entry
* @param ts {BigInt} timestamp of record in microseconds
*/
async removeRecord(entry: string, ts: bigint): Promise<void> {
await this.httpClient.delete(`/b/${this.name}/${entry}?ts=${ts}`);
}

/**
* Create a batch for removing records
* @param entry {string} name of the entry
* @returns {Promise<Batch>} batch to add record timestamps to and write
*/
async beginRemoveBatch(entry: string): Promise<Batch> {
return new Batch(this.name, entry, this.httpClient, BatchType.REMOVE);
}

/**
* Remove records by query
* @param entry {string} name of the entry
* @param start {BigInt} start point of the time period. If undefined, the query starts from the first record
* @param stop {BigInt} stop point of the time period. If undefined, the query stops at the last record
* @param QueryOptions {QueryOptions} options for the query. Only include, exclude, eachS, and eachN are used; other options are ignored
* @returns {Promise<number>} number of removed records
*/
async removeQuery(
entry: string,
start?: bigint,
stop?: bigint,
QueryOptions?: QueryOptions,
): Promise<number> {
const ret = this.parse_query_params(start, stop, QueryOptions);

const { data } = await this.httpClient.delete(
`/b/${this.name}/${entry}/q?${ret.query}`,
);
return Promise.resolve(data["removed_records"]);
}
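A hedged sketch of `removeQuery` in use (bucket, entry, time range, and labels are illustrative; per the doc comment, only include, exclude, eachS, and eachN are honored here):

```ts
// Remove all records in a time range that carry the label status=bad.
const removed = await bucket.removeQuery(
  "my-entry",
  1700000000000000n, // start, UNIX timestamp in microseconds
  1700000600000000n, // stop, UNIX timestamp in microseconds
  { include: { status: "bad" } },
);
console.log(`Removed ${removed} records`);
```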

/**
* Start writing a record into an entry
* @param entry name of the entry
@@ -217,9 +255,39 @@
stop?: bigint,
options?: number | QueryOptions,
): AsyncGenerator<ReadableRecord> {
const params: string[] = [];
const ret = this.parse_query_params(start, stop, options);

const url = `/b/${this.name}/${entry}/q?` + ret.query;
const { data, headers } = await this.httpClient.get(url);
const { id } = data;
const header_api_version = headers["x-reduct-api"];
if (isCompatibale("1.5", header_api_version) && !this.isBrowser) {
yield* this.fetchAndParseBatchedRecords(
entry,
id,
ret.continuous,
ret.pollInterval,
ret.head,
);
} else {
yield* this.fetchAndParseSingleRecord(
entry,
id,
ret.continuous,
ret.pollInterval,
ret.head,
);
}
}

private parse_query_params(
start?: bigint,
stop?: bigint,
options?: QueryOptions | number,
) {
let continueQuery = false;
let poolInterval = 1;
const params: string[] = [];
let head = false;

if (start !== undefined) {
@@ -267,49 +335,35 @@
params.push(`continuous=${options.continuous ? "true" : "false"}`);
continueQuery = options.continuous;

if (options.poolInterval !== undefined) {
if (options.pollInterval !== undefined) {
// eslint-disable-next-line prefer-destructuring
poolInterval = options.poolInterval;
poolInterval = options.pollInterval;
}

// Set default TTL for a continuous query to 2 * poolInterval
if (options.ttl === undefined) {
params.push(`ttl=${poolInterval * 2}`);
}
}

continueQuery = options.continuous ?? false;
poolInterval = options.pollInterval ?? 1;
head = options.head ?? false;
}
}

const url = `/b/${this.name}/${entry}/q?` + params.join("&");
const { data, headers } = await this.httpClient.get(url);
const { id } = data;
const header_api_version = headers["x-reduct-api"];
if (isCompatibale("1.5", header_api_version) && !this.isBrowser) {
yield* this.fetchAndParseBatchedRecords(
entry,
id,
continueQuery,
poolInterval,
head,
);
} else {
yield* this.fetchAndParseSingleRecord(
entry,
id,
continueQuery,
poolInterval,
head,
);
}
return {
continuous: continueQuery,
pollInterval: poolInterval,
head: head,
query: params.join("&"),
};
}

private async *fetchAndParseSingleRecord(
entry: string,
id: string,
continueQuery: boolean,
poolInterval: number,
pollInterval: number,
head: boolean,
) {
while (true) {
Expand All @@ -320,7 +374,7 @@ export class Bucket {
if (e instanceof APIError && e.status === 204) {
if (continueQuery) {
await new Promise((resolve) =>
setTimeout(resolve, poolInterval * 1000),
setTimeout(resolve, pollInterval * 1000),
);
continue;
}
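Finally, a short sketch of the renamed polling option in a continuous query (the entry name and record handling are illustrative; `record.time` and `record.read()` follow the existing `ReadableRecord` API):

```ts
// Stream records as they arrive, polling every 2 seconds once the query
// has caught up with the latest data.
for await (const record of bucket.query("my-entry", undefined, undefined, {
  continuous: true,
  pollInterval: 2,
})) {
  const data = await record.read();
  console.log(record.time, data.length);
}
```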