Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve memory usage of record import #218

Merged
merged 27 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
311bf75
wip
tasshi-me Dec 21, 2022
aea3a4c
Revert "wip"
tasshi-me Feb 1, 2023
141c544
iterative CSV parser
tasshi-me Feb 4, 2023
53935f7
initialize stream when reader called
tasshi-me Feb 11, 2023
3829ac2
fix addRecords
tasshi-me Feb 11, 2023
0cbc06f
fix upsertRecords
tasshi-me Feb 11, 2023
18705f7
debug
tasshi-me Feb 11, 2023
980de5d
Merge branch 'main' into feat/improve-memory-usage-of-import
tasshi-me Feb 14, 2023
7a77a54
Revert "debug"
tasshi-me Feb 14, 2023
8cede92
propagate error between steams
tasshi-me Feb 14, 2023
28a002d
remove recordIndex from LocalRecord
tasshi-me Feb 14, 2023
19652c8
apply review: don't use generics for NodeJS.ReadableStream because it…
tasshi-me Feb 15, 2023
fe1bbd6
separate asyncGeneratorWithIndexAndNextValueFromStream to two helpers
tasshi-me Feb 15, 2023
8e3a726
remove unnecessary async
tasshi-me Feb 15, 2023
0336314
create and refactor iterator utils
tasshi-me Feb 15, 2023
3f446be
add types to fields of LocalRecordRepository impls
tasshi-me Feb 15, 2023
8968307
remove debug console.log
tasshi-me Feb 15, 2023
341b77d
remove file format validation from `openFsStreamWithEncode()`
tasshi-me Feb 15, 2023
fb0b2ea
fix typo
tasshi-me Feb 15, 2023
262505d
remove unused imports
tasshi-me Feb 15, 2023
68b10e5
remove unnecessary callback
tasshi-me Feb 16, 2023
70cb9bf
remove unnecessary input from LocalRecordRepositoryMock
tasshi-me Feb 16, 2023
d8b6abb
use `withIndex()` on validateUpdateKeyInRecords
tasshi-me Feb 16, 2023
c2f3133
remove `unknow` as the default type of generics
tasshi-me Feb 16, 2023
ec4882d
Merge remote-tracking branch 'origin/main' into feat/improve-memory-u…
tasshi-me Feb 21, 2023
e5721db
move iterator utils to common directory
tasshi-me Feb 21, 2023
9d44cb7
remove unused import
tasshi-me Feb 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions src/record/import/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import type { RestAPIClientOptions } from "../../kintone/client";
import { buildRestAPIClient } from "../../kintone/client";
import type { SupportedImportEncoding } from "./utils/file";
import { readFile } from "./utils/file";
import { parseRecords } from "./parsers";
import { extractFileFormat, openFsStreamWithEncode } from "./utils/file";
import { addRecords } from "./usecases/add";
import { upsertRecords } from "./usecases/upsert";
import { createSchema } from "./schema";
import { noop as defaultTransformer } from "./schema/transformers/noop";
import { userSelected } from "./schema/transformers/userSelected";
import { logger } from "../../utils/log";
import { LocalRecordRepositoryFromStream } from "./repositories/localRecordRepositoryFromStream";

export type Options = {
app: string;
Expand Down Expand Up @@ -42,24 +42,33 @@ export const run: (
? userSelected(fields, fieldsJson, updateKey)
: defaultTransformer()
);
const { content, format } = await readFile(filePath, encoding);
const records = await parseRecords({
source: content,
const format = extractFileFormat(filePath);
const localRecordRepository = new LocalRecordRepositoryFromStream(
() => openFsStreamWithEncode(filePath, encoding),
format,
schema,
});
if (records.length === 0) {
schema
);

if ((await localRecordRepository.length()) === 0) {
logger.warn("The input file does not have any records");
return;
}

const skipMissingFields = !fields;
if (updateKey) {
await upsertRecords(apiClient, app, records, schema, updateKey, {
attachmentsDir,
skipMissingFields,
});
await upsertRecords(
apiClient,
app,
localRecordRepository,
schema,
updateKey,
{
attachmentsDir,
skipMissingFields,
}
);
} else {
await addRecords(apiClient, app, records, schema, {
await addRecords(apiClient, app, localRecordRepository, schema, {
attachmentsDir,
skipMissingFields,
});
Expand Down
19 changes: 0 additions & 19 deletions src/record/import/parsers/index.ts

This file was deleted.

This file was deleted.

This file was deleted.

33 changes: 0 additions & 33 deletions src/record/import/parsers/parseCsv/index.ts

This file was deleted.

85 changes: 0 additions & 85 deletions src/record/import/parsers/parseCsv/record.ts

This file was deleted.

30 changes: 30 additions & 0 deletions src/record/import/repositories/error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { ParserError } from "./parsers/error";

export class RepositoryError extends Error {
private readonly cause: unknown;

constructor(cause: unknown) {
const message =
"An error occurred while loading records from the data source";
super(message);

this.name = "RepositoryError";
this.message = message;
this.cause = cause;

// https://github.com/Microsoft/TypeScript/wiki/Breaking-Changes#extending-built-ins-like-error-array-and-map-may-no-longer-work
// Set the prototype explicitly.
Object.setPrototypeOf(this, RepositoryError.prototype);
}

toString(): string {
let errorMessage = "";
errorMessage += this.message + "\n";
if (this.cause instanceof ParserError) {
tasshi-me marked this conversation as resolved.
Show resolved Hide resolved
errorMessage += this.cause.toString();
} else {
errorMessage += this.cause + "\n";
}
return errorMessage;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type { LocalRecordRepository } from "../usecases/interface";
import type { RecordSchema } from "../types/schema";
import { RepositoryError } from "./error";
import { countRecordsFromCsv, csvReader } from "./parsers/parseCsv";
import { number } from "yargs";
import type { LocalRecord } from "../types/record";

export class LocalRecordRepositoryFromStream implements LocalRecordRepository {
readonly format: string;
readonly length: () => Promise<number>;

readonly reader: () => AsyncGenerator<LocalRecord, void, undefined>;

constructor(
source: () => NodeJS.ReadableStream,
format: string,
schema: RecordSchema
) {
this.format = format;
this.length = () => countRecordsFromCsv(source());

switch (format) {
case "csv":
this.reader = () => csvReader(source, schema);
break;
default:
throw new RepositoryError(
`Unexpected file type: ${format} is unacceptable.`
);
}
}
}
25 changes: 25 additions & 0 deletions src/record/import/repositories/localRecordRepositoryMock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import type { LocalRecordRepository } from "../usecases/interface";
import type { RecordSchema } from "../types/schema";
import type { LocalRecord } from "../types/record";
import { Readable } from "stream";
import { number } from "yargs";

export class LocalRecordRepositoryMock implements LocalRecordRepository {
readonly format: string;
readonly length: () => Promise<number>;

readonly reader: () => AsyncGenerator<LocalRecord, void, undefined>;

constructor(source: LocalRecord[], format: string, length: number) {
tasshi-me marked this conversation as resolved.
Show resolved Hide resolved
this.format = format;
this.length = async () => length;
this.reader = () => asyncGeneratorFromStream(source);
}
}

// eslint-disable-next-line func-style
async function* asyncGeneratorFromStream(
source: LocalRecord[]
): AsyncGenerator<LocalRecord, void, undefined> {
yield* source;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type * as Fields from "../../../types/field";
import type { FieldSchema } from "../../../types/schema";
import type * as Fields from "../../../../types/field";
import type { FieldSchema } from "../../../../types/schema";

import { convertFieldValue } from "../fieldValue";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import type { LocalRecord } from "../../../../../../types/record";

export const expected: LocalRecord[] = [];
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { RecordSchema } from "../../../../../types/schema";
import type { RecordSchema } from "../../../../../../types/schema";

export const schema: RecordSchema = {
fields: [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { KintoneRecord } from "../../../../../types/record";
import type { LocalRecord } from "../../../../../../types/record";

export const expected: KintoneRecord[] = [
export const expected: LocalRecord[] = [
{
data: {
singleLineText: {
Expand All @@ -13,7 +13,9 @@ export const expected: KintoneRecord[] = [
value: ['"sample3"', "sample4,sample5"],
},
},
metadata: { format: { type: "csv", firstRowIndex: 1, lastRowIndex: 1 } },
metadata: {
format: { type: "csv", firstRowIndex: 1, lastRowIndex: 1 },
},
},
{
data: {
Expand All @@ -27,6 +29,8 @@ export const expected: KintoneRecord[] = [
value: ['"sample4"', "sample5,sample6"],
},
},
metadata: { format: { type: "csv", firstRowIndex: 2, lastRowIndex: 2 } },
metadata: {
format: { type: "csv", firstRowIndex: 2, lastRowIndex: 2 },
},
},
];
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { RecordSchema } from "../../../../../types/schema";
import type { RecordSchema } from "../../../../../../types/schema";

export const schema: RecordSchema = {
fields: [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { KintoneRecord } from "../../../../../types/record";
import type { LocalRecord } from "../../../../../../types/record";

export const expected: KintoneRecord[] = [
export const expected: LocalRecord[] = [
{
data: {
singleLineText: {
Expand All @@ -13,7 +13,9 @@ export const expected: KintoneRecord[] = [
value: ['"sample3"', "sample4,sample5"],
},
},
metadata: { format: { type: "csv", firstRowIndex: 1, lastRowIndex: 1 } },
metadata: {
format: { type: "csv", firstRowIndex: 1, lastRowIndex: 1 },
},
},
{
data: {
Expand All @@ -27,6 +29,8 @@ export const expected: KintoneRecord[] = [
value: ['"sample4"', "sample5,sample6"],
},
},
metadata: { format: { type: "csv", firstRowIndex: 2, lastRowIndex: 2 } },
metadata: {
format: { type: "csv", firstRowIndex: 2, lastRowIndex: 2 },
},
},
];
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { RecordSchema } from "../../../../../types/schema";
import type { RecordSchema } from "../../../../../../types/schema";

export const schema: RecordSchema = {
fields: [
Expand Down
Loading