From 0336314191f5c27bba1527760000e8b94c32953d Mon Sep 17 00:00:00 2001 From: Masaharu TASHIRO Date: Wed, 15 Feb 2023 18:51:20 +0900 Subject: [PATCH] create and refactor iterator utils --- .../repositories/parsers/parseCsv/record.ts | 31 +-- src/record/import/usecases/add.ts | 24 +-- src/record/import/usecases/upsert.ts | 50 ++--- .../import/utils/__tests__/iterator.test.ts | 187 ++++++++++++++++++ src/record/import/utils/iterator.ts | 87 ++++++++ 5 files changed, 293 insertions(+), 86 deletions(-) create mode 100644 src/record/import/utils/__tests__/iterator.test.ts create mode 100644 src/record/import/utils/iterator.ts diff --git a/src/record/import/repositories/parsers/parseCsv/record.ts b/src/record/import/repositories/parsers/parseCsv/record.ts index 3f495c5b9b..2c96b48060 100644 --- a/src/record/import/repositories/parsers/parseCsv/record.ts +++ b/src/record/import/repositories/parsers/parseCsv/record.ts @@ -7,6 +7,7 @@ import { convertSubtableField, subtableFieldReader } from "./subtable"; import { PRIMARY_MARK } from "./constants"; import type csvParse from "csv-parse"; import { Readable } from "stream"; +import { withIndex, withNext } from "../../../utils/iterator"; type RecordCsv = { rows: CsvRow[]; @@ -53,9 +54,7 @@ export async function* recordReader( } const stream = unshiftToStream(csvStream, firstRow); - const generator = withIndexIterator( - withNextIterator(stream[Symbol.asyncIterator]()) - ); + const generator = withIndex(withNext(stream[Symbol.asyncIterator]())); if (!hasSubtable(firstRow)) { for await (const { @@ -104,29 +103,3 @@ const unshiftToStream = (stream: Readable, element: unknown) => yield* stream; })() ); - -// eslint-disable-next-line func-style -async function* withNextIterator( - source: AsyncIterableIterator | AsyncGenerator -): AsyncGenerator<{ current: T; next?: T }> { - let { value: prev, done } = await source.next(); - if (done) { - return; - } - for await (const value of source) { - yield { current: prev, next: value }; - prev = value; - } - yield { current: prev, next: undefined }; -} - -// eslint-disable-next-line func-style -async function* withIndexIterator( - source: AsyncIterableIterator | AsyncGenerator -): AsyncGenerator<{ data: T; index: number }> { - let index = 0; - for await (const value of source) { - yield { data: value, index }; - index++; - } -} diff --git a/src/record/import/usecases/add.ts b/src/record/import/usecases/add.ts index a5b1ea1ba2..f0489632af 100644 --- a/src/record/import/usecases/add.ts +++ b/src/record/import/usecases/add.ts @@ -8,6 +8,7 @@ import { AddRecordsError } from "./add/error"; import { logger } from "../../../utils/log"; import { ProgressLogger } from "./add/progress"; import type { LocalRecordRepository } from "./interface"; +import { chunked } from "../utils/iterator"; const CHUNK_SIZE = 2000; @@ -35,7 +36,10 @@ export const addRecords: ( ); try { logger.info("Starting to import records..."); - for await (const recordsByChunk of recordsReader(recordSource)) { + for await (const recordsByChunk of chunked( + recordSource.reader(), + CHUNK_SIZE + )) { currentRecords = recordsByChunk; const recordsToUpload = await convertRecordsToApiRequestParameter( apiClient, @@ -89,21 +93,3 @@ const convertRecordsToApiRequestParameter = async ( } return kintoneRecords; }; - -// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Functions/Arrow_functions#use_of_the_yield_keyword -// eslint-disable-next-line func-style -async function* recordsReader( - localRecordReader: LocalRecordRepository -): AsyncGenerator { - let records = []; - for await (const localRecord of localRecordReader.reader()) { - records.push(localRecord); - if (records.length >= CHUNK_SIZE) { - yield records; - records = []; - } - } - if (records.length > 0) { - yield records; - } -} diff --git a/src/record/import/usecases/upsert.ts b/src/record/import/usecases/upsert.ts index 0b7d814666..ad87234ba8 100644 --- a/src/record/import/usecases/upsert.ts +++ b/src/record/import/usecases/upsert.ts @@ -12,6 +12,7 @@ import { UpsertRecordsError } from "./upsert/error"; import { logger } from "../../../utils/log"; import { ProgressLogger } from "./add/progress"; import type { LocalRecordRepository } from "./interface"; +import { groupByKeyChunked } from "../utils/iterator"; const CHUNK_SIZE = 2000; @@ -44,14 +45,18 @@ export const upsertRecords = async ( await updateKey.validateUpdateKeyInRecords(recordSource); logger.info("Starting to import records..."); - for await (const recordsByChunk of recordReader(recordSource, updateKey)) { - currentRecords = recordsByChunk.records; - - if (recordsByChunk.type === "update") { + for await (const recordsByChunk of groupByKeyChunked( + recordSource.reader(), + (record) => (updateKey.isUpdate(record) ? "update" : "add"), + CHUNK_SIZE + )) { + currentRecords = recordsByChunk.data; + + if (recordsByChunk.key === "update") { const recordsToUpload = await convertToKintoneRecordForUpdate( apiClient, app, - recordsByChunk.records, + recordsByChunk.data, schema, updateKey, { attachmentsDir, skipMissingFields } @@ -64,7 +69,7 @@ export const upsertRecords = async ( const recordsToUpload = await convertToKintoneRecordForAdd( apiClient, app, - recordsByChunk.records, + recordsByChunk.data, schema, updateKey, { attachmentsDir, skipMissingFields } @@ -74,7 +79,7 @@ export const upsertRecords = async ( records: recordsToUpload, }); } - currentIndex += recordsByChunk.records.length; + currentIndex += recordsByChunk.data.length; progressLogger.update(currentIndex); } progressLogger.done(); @@ -166,34 +171,3 @@ const convertToKintoneRecordForAdd = async ( return kintoneRecords; }; - -// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Functions/Arrow_functions#use_of_the_yield_keyword -// eslint-disable-next-line func-style -async function* recordReader( - localRecordReader: LocalRecordRepository, - updateKey: UpdateKey -): AsyncGenerator< - { type: "add" | "update"; records: LocalRecord[] }, - void, - undefined -> { - let records: LocalRecord[] = []; - let isUpdate: boolean | undefined; - - for await (const localRecord of localRecordReader.reader()) { - const isUpdateCurrent = updateKey.isUpdate(localRecord); - if (isUpdate === undefined) { - isUpdate = isUpdateCurrent; - } - if (isUpdateCurrent === isUpdate && records.length < CHUNK_SIZE) { - records.push(localRecord); - } else { - yield { type: isUpdate ? "update" : "add", records: records }; - records = [localRecord]; - isUpdate = isUpdateCurrent; - } - } - if (records.length > 0) { - yield { type: isUpdate ? "update" : "add", records: records }; - } -} diff --git a/src/record/import/utils/__tests__/iterator.test.ts b/src/record/import/utils/__tests__/iterator.test.ts new file mode 100644 index 0000000000..4d8eced8e4 --- /dev/null +++ b/src/record/import/utils/__tests__/iterator.test.ts @@ -0,0 +1,187 @@ +import { + chunked, + groupByKey, + groupByKeyChunked, + iterToAsyncIer, + withIndex, + withNext, +} from "../iterator"; +import { Readable } from "stream"; + +const arrayToAsyncIter = (arr: T[]): AsyncIterableIterator => + iterToAsyncIer(arr[Symbol.iterator]()); + +describe("chunked", () => { + it("can separate data source by chunk", async () => { + const source = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + const expected = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]; + const actual = []; + for await (const chunk of chunked(arrayToAsyncIter(source), 3)) { + actual.push(chunk); + } + expect(actual).toStrictEqual(expected); + }); +}); + +describe("groupByKey", () => { + it("can separate date source by group", async () => { + const source = [ + { name: "banana", type: "fruit" }, + { name: "orange", type: "fruit" }, + { name: "tomato", type: "vegetables" }, + { name: "carrot", type: "vegetables" }, + { name: "asparagus", type: "vegetables" }, + { name: "beef", type: "meat" }, + { name: "cherries", type: "fruit" }, + { name: "pork", type: "meat" }, + { name: "chicken", type: "meat" }, + ]; + const expected = [ + { + key: "fruit", + data: [ + { name: "banana", type: "fruit" }, + { name: "orange", type: "fruit" }, + ], + }, + { + key: "vegetables", + data: [ + { name: "tomato", type: "vegetables" }, + { name: "carrot", type: "vegetables" }, + { name: "asparagus", type: "vegetables" }, + ], + }, + { + key: "meat", + data: [{ name: "beef", type: "meat" }], + }, + { + key: "fruit", + data: [{ name: "cherries", type: "fruit" }], + }, + { + key: "meat", + data: [ + { name: "pork", type: "meat" }, + { name: "chicken", type: "meat" }, + ], + }, + ]; + const actual = []; + for await (const chunk of groupByKey( + arrayToAsyncIter(source), + (el) => el.type + )) { + actual.push(chunk); + } + expect(actual).toStrictEqual(expected); + }); +}); + +describe("groupByKeyChunked", () => { + it("can separate date source by group and chunked", async () => { + const chunkSize = 2; + const source = [ + { name: "banana", type: "fruit" }, + { name: "orange", type: "fruit" }, + { name: "tomato", type: "vegetables" }, + { name: "carrot", type: "vegetables" }, + { name: "asparagus", type: "vegetables" }, + { name: "beef", type: "meat" }, + { name: "cherries", type: "fruit" }, + { name: "pork", type: "meat" }, + { name: "chicken", type: "meat" }, + ]; + const expected = [ + { + key: "fruit", + data: [ + { name: "banana", type: "fruit" }, + { name: "orange", type: "fruit" }, + ], + }, + { + key: "vegetables", + data: [ + { name: "tomato", type: "vegetables" }, + { name: "carrot", type: "vegetables" }, + ], + }, + { + key: "vegetables", + data: [{ name: "asparagus", type: "vegetables" }], + }, + { + key: "meat", + data: [{ name: "beef", type: "meat" }], + }, + { + key: "fruit", + data: [{ name: "cherries", type: "fruit" }], + }, + { + key: "meat", + data: [ + { name: "pork", type: "meat" }, + { name: "chicken", type: "meat" }, + ], + }, + ]; + const actual = []; + for await (const chunk of groupByKeyChunked( + arrayToAsyncIter(source), + (el) => el.type, + chunkSize + )) { + actual.push(chunk); + } + expect(actual).toStrictEqual(expected); + }); +}); + +describe("withNextIterator", () => { + it("can iterate from data source with next value", async () => { + const source = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + const expected = [ + { current: 1, next: 2 }, + { current: 2, next: 3 }, + { current: 3, next: 4 }, + { current: 4, next: 5 }, + { current: 5, next: 6 }, + { current: 6, next: 7 }, + { current: 7, next: 8 }, + { current: 8, next: 9 }, + { current: 9, next: 10 }, + { current: 10, next: undefined }, + ]; + const actual = []; + for await (const data of withNext(arrayToAsyncIter(source))) { + actual.push(data); + } + expect(actual).toStrictEqual(expected); + }); +}); + +describe("withIndexIterator", () => { + it("can iterate from data source with index", async () => { + const source = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + const expected = [ + { data: 1, index: 0 }, + { data: 2, index: 1 }, + { data: 3, index: 2 }, + { data: 4, index: 3 }, + { data: 5, index: 4 }, + { data: 6, index: 5 }, + { data: 7, index: 6 }, + { data: 8, index: 7 }, + { data: 9, index: 8 }, + { data: 10, index: 9 }, + ]; + const actual = []; + for await (const data of withIndex(arrayToAsyncIter(source))) { + actual.push(data); + } + expect(actual).toStrictEqual(expected); + }); +}); diff --git a/src/record/import/utils/iterator.ts b/src/record/import/utils/iterator.ts new file mode 100644 index 0000000000..1bdc24517d --- /dev/null +++ b/src/record/import/utils/iterator.ts @@ -0,0 +1,87 @@ +type Iterators = IterableIterator | Generator; +type AsyncIterators = AsyncIterableIterator | AsyncGenerator; + +// eslint-disable-next-line func-style +export async function* iterToAsyncIer( + source: Iterators +): AsyncIterators { + for (const element of source) { + yield element; + } +} + +// eslint-disable-next-line func-style +export async function* chunked( + source: AsyncIterators, + size: number +): AsyncGenerator { + let chunk = []; + for await (const element of source) { + chunk.push(element); + if (chunk.length >= size) { + yield chunk; + chunk = []; + } + } + if (chunk.length > 0) { + yield chunk; + } +} + +// eslint-disable-next-line func-style +export async function* groupByKey( + source: AsyncIterableIterator | AsyncGenerator, + keyFn: (element: T) => K +): AsyncGenerator<{ key: K; data: T[] }, void, undefined> { + let array = []; + for await (const { current, next } of withNext(source)) { + array.push(current); + const currentKey = keyFn(current); + if (next === undefined || currentKey !== keyFn(next)) { + yield { key: currentKey, data: array }; + array = []; + } + } +} + +// eslint-disable-next-line func-style +export async function* groupByKeyChunked( + source: AsyncIterators, + keyFn: (element: T) => K, + size: number +): AsyncGenerator<{ key: K; data: T[] }, void, undefined> { + for await (const { key, data } of groupByKey(source, keyFn)) { + for await (const chunk of chunked( + iterToAsyncIer(data[Symbol.iterator]()), + size + )) { + yield { key, data: chunk }; + } + } +} + +// eslint-disable-next-line func-style +export async function* withNext( + source: AsyncIterators +): AsyncGenerator<{ current: T; next?: T }> { + let { value: prev, done } = await source.next(); + if (done) { + return; + } + for await (const value of source) { + yield { current: prev, next: value }; + prev = value; + } + yield { current: prev, next: undefined }; +} + +// eslint-disable-next-line func-style +export async function* withIndex( + source: AsyncIterators +): AsyncGenerator<{ data: T; index: number }> { + let index = 0; + for await (const value of source) { + yield { data: value, index }; + index++; + } +}