Skip to content

Commit

Permalink
create and refactor iterator utils
Browse files Browse the repository at this point in the history
  • Loading branch information
tasshi-me committed Feb 15, 2023
1 parent 8e3a726 commit 0336314
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 86 deletions.
31 changes: 2 additions & 29 deletions src/record/import/repositories/parsers/parseCsv/record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { convertSubtableField, subtableFieldReader } from "./subtable";
import { PRIMARY_MARK } from "./constants";
import type csvParse from "csv-parse";
import { Readable } from "stream";
import { withIndex, withNext } from "../../../utils/iterator";

type RecordCsv = {
rows: CsvRow[];
Expand Down Expand Up @@ -53,9 +54,7 @@ export async function* recordReader(
}

const stream = unshiftToStream(csvStream, firstRow);
const generator = withIndexIterator(
withNextIterator<CsvRow>(stream[Symbol.asyncIterator]())
);
const generator = withIndex(withNext<CsvRow>(stream[Symbol.asyncIterator]()));

if (!hasSubtable(firstRow)) {
for await (const {
Expand Down Expand Up @@ -104,29 +103,3 @@ const unshiftToStream = (stream: Readable, element: unknown) =>
yield* stream;
})()
);

/**
 * Pairs every item of an async source with the item that follows it.
 *
 * The first item is pulled eagerly with `next()`; the remaining items are
 * consumed from the same iterator with `for await`. The final item is emitted
 * with `next: undefined`. An empty source produces no output at all.
 *
 * @param source async iterator to read from
 * @returns async generator of `{ current, next }` pairs
 */
// eslint-disable-next-line func-style
async function* withNextIterator<T = unknown>(
  source: AsyncIterableIterator<T> | AsyncGenerator<T>
): AsyncGenerator<{ current: T; next?: T }> {
  const head = await source.next();
  if (head.done) {
    // Nothing to pair up: the source was empty.
    return;
  }

  let pending: T = head.value;
  for await (const upcoming of source) {
    yield { current: pending, next: upcoming };
    pending = upcoming;
  }

  // The last item has no successor.
  yield { current: pending, next: undefined };
}

/**
 * Attaches a zero-based running index to every item of an async source.
 *
 * @param source async iterator to read from
 * @returns async generator of `{ data, index }` objects, in source order
 */
// eslint-disable-next-line func-style
async function* withIndexIterator<T = unknown>(
  source: AsyncIterableIterator<T> | AsyncGenerator<T>
): AsyncGenerator<{ data: T; index: number }> {
  let position = 0;
  for await (const data of source) {
    yield { data, index: position++ };
  }
}
24 changes: 5 additions & 19 deletions src/record/import/usecases/add.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { AddRecordsError } from "./add/error";
import { logger } from "../../../utils/log";
import { ProgressLogger } from "./add/progress";
import type { LocalRecordRepository } from "./interface";
import { chunked } from "../utils/iterator";

const CHUNK_SIZE = 2000;

Expand Down Expand Up @@ -35,7 +36,10 @@ export const addRecords: (
);
try {
logger.info("Starting to import records...");
for await (const recordsByChunk of recordsReader(recordSource)) {
for await (const recordsByChunk of chunked(
recordSource.reader(),
CHUNK_SIZE
)) {
currentRecords = recordsByChunk;
const recordsToUpload = await convertRecordsToApiRequestParameter(
apiClient,
Expand Down Expand Up @@ -89,21 +93,3 @@ const convertRecordsToApiRequestParameter = async (
}
return kintoneRecords;
};

/**
 * Reads records from `localRecordReader.reader()` and emits them in arrays of
 * `CHUNK_SIZE` records. A trailing, smaller array is emitted when records are
 * left over once the reader is exhausted; nothing is emitted for an empty reader.
 *
 * @param localRecordReader repository whose `reader()` supplies the records
 * @returns async generator of record arrays
 */
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Functions/Arrow_functions#use_of_the_yield_keyword
// eslint-disable-next-line func-style
async function* recordsReader(
  localRecordReader: LocalRecordRepository
): AsyncGenerator<LocalRecord[], void, undefined> {
  let buffer: LocalRecord[] = [];
  for await (const record of localRecordReader.reader()) {
    buffer.push(record);
    if (buffer.length < CHUNK_SIZE) {
      continue;
    }
    // Buffer is full: hand it over and start a fresh one.
    yield buffer;
    buffer = [];
  }

  // Flush whatever is left after the reader is exhausted.
  if (buffer.length !== 0) {
    yield buffer;
  }
}
50 changes: 12 additions & 38 deletions src/record/import/usecases/upsert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { UpsertRecordsError } from "./upsert/error";
import { logger } from "../../../utils/log";
import { ProgressLogger } from "./add/progress";
import type { LocalRecordRepository } from "./interface";
import { groupByKeyChunked } from "../utils/iterator";

const CHUNK_SIZE = 2000;

Expand Down Expand Up @@ -44,14 +45,18 @@ export const upsertRecords = async (
await updateKey.validateUpdateKeyInRecords(recordSource);

logger.info("Starting to import records...");
for await (const recordsByChunk of recordReader(recordSource, updateKey)) {
currentRecords = recordsByChunk.records;

if (recordsByChunk.type === "update") {
for await (const recordsByChunk of groupByKeyChunked(
recordSource.reader(),
(record) => (updateKey.isUpdate(record) ? "update" : "add"),
CHUNK_SIZE
)) {
currentRecords = recordsByChunk.data;

if (recordsByChunk.key === "update") {
const recordsToUpload = await convertToKintoneRecordForUpdate(
apiClient,
app,
recordsByChunk.records,
recordsByChunk.data,
schema,
updateKey,
{ attachmentsDir, skipMissingFields }
Expand All @@ -64,7 +69,7 @@ export const upsertRecords = async (
const recordsToUpload = await convertToKintoneRecordForAdd(
apiClient,
app,
recordsByChunk.records,
recordsByChunk.data,
schema,
updateKey,
{ attachmentsDir, skipMissingFields }
Expand All @@ -74,7 +79,7 @@ export const upsertRecords = async (
records: recordsToUpload,
});
}
currentIndex += recordsByChunk.records.length;
currentIndex += recordsByChunk.data.length;
progressLogger.update(currentIndex);
}
progressLogger.done();
Expand Down Expand Up @@ -166,34 +171,3 @@ const convertToKintoneRecordForAdd = async (

return kintoneRecords;
};

/**
 * Reads records from `localRecordReader.reader()` and emits them in batches
 * where every record of a batch has the same `updateKey.isUpdate(...)` result.
 *
 * A batch is closed when the next record's result differs from the batch's
 * result, or when the batch already holds `CHUNK_SIZE` records. Each batch is
 * tagged `"update"` or `"add"` accordingly. Nothing is emitted for an empty
 * reader.
 *
 * @param localRecordReader repository whose `reader()` supplies the records
 * @param updateKey used to decide, per record, whether it is an update
 * @returns async generator of `{ type, records }` batches
 */
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Functions/Arrow_functions#use_of_the_yield_keyword
// eslint-disable-next-line func-style
async function* recordReader(
  localRecordReader: LocalRecordRepository,
  updateKey: UpdateKey
): AsyncGenerator<
  { type: "add" | "update"; records: LocalRecord[] },
  void,
  undefined
> {
  let batch: LocalRecord[] = [];
  let batchIsUpdate: boolean | undefined;

  for await (const record of localRecordReader.reader()) {
    const recordIsUpdate = updateKey.isUpdate(record);
    if (batchIsUpdate === undefined) {
      // The very first record decides the kind of the first batch.
      batchIsUpdate = recordIsUpdate;
    }

    const mustStartNewBatch =
      recordIsUpdate !== batchIsUpdate || batch.length >= CHUNK_SIZE;
    if (mustStartNewBatch) {
      yield { type: batchIsUpdate ? "update" : "add", records: batch };
      batch = [record];
      batchIsUpdate = recordIsUpdate;
      continue;
    }

    batch.push(record);
  }

  // Flush the batch that was still being filled when the reader ended.
  if (batch.length > 0) {
    yield { type: batchIsUpdate ? "update" : "add", records: batch };
  }
}
187 changes: 187 additions & 0 deletions src/record/import/utils/__tests__/iterator.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
import {
chunked,
groupByKey,
groupByKeyChunked,
iterToAsyncIer,
withIndex,
withNext,
} from "../iterator";
import { Readable } from "stream";

/**
 * Test helper: takes the synchronous iterator of `arr` and passes it through
 * `iterToAsyncIer` so the utilities under test receive an async iterator.
 */
const arrayToAsyncIter = <T = unknown>(arr: T[]): AsyncIterableIterator<T> => {
  const syncIterator = arr[Symbol.iterator]();
  return iterToAsyncIer(syncIterator);
};

// `chunked`: ten numbers with a chunk size of three are expected to come out
// as three full chunks followed by one chunk holding the remaining element.
describe("chunked", () => {
  it("can separate data source by chunk", async () => {
    const chunkSize = 3;
    const input = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

    const collected = [];
    for await (const piece of chunked(arrayToAsyncIter(input), chunkSize)) {
      collected.push(piece);
    }

    expect(collected).toStrictEqual([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]);
  });
});

// `groupByKey`: items are grouped by the key returned from the callback.
// The expected output below keeps source order and contains separate groups
// for items that share a key but are not adjacent (see "fruit" and "meat").
describe("groupByKey", () => {
  it("can separate data source by group", async () => {
    const source = [
      { name: "banana", type: "fruit" },
      { name: "orange", type: "fruit" },
      { name: "tomato", type: "vegetables" },
      { name: "carrot", type: "vegetables" },
      { name: "asparagus", type: "vegetables" },
      { name: "beef", type: "meat" },
      { name: "cherries", type: "fruit" },
      { name: "pork", type: "meat" },
      { name: "chicken", type: "meat" },
    ];
    const expected = [
      {
        key: "fruit",
        data: [
          { name: "banana", type: "fruit" },
          { name: "orange", type: "fruit" },
        ],
      },
      {
        key: "vegetables",
        data: [
          { name: "tomato", type: "vegetables" },
          { name: "carrot", type: "vegetables" },
          { name: "asparagus", type: "vegetables" },
        ],
      },
      {
        key: "meat",
        data: [{ name: "beef", type: "meat" }],
      },
      {
        key: "fruit",
        data: [{ name: "cherries", type: "fruit" }],
      },
      {
        key: "meat",
        data: [
          { name: "pork", type: "meat" },
          { name: "chicken", type: "meat" },
        ],
      },
    ];
    const actual = [];
    for await (const chunk of groupByKey(
      arrayToAsyncIter(source),
      (el) => el.type
    )) {
      actual.push(chunk);
    }
    expect(actual).toStrictEqual(expected);
  });
});

// `groupByKeyChunked`: same grouping as in the `groupByKey` suite, but with a
// chunk size of two. The expected output below shows the three consecutive
// "vegetables" items split into a group of two followed by a group of one.
describe("groupByKeyChunked", () => {
  it("can separate data source by group and chunked", async () => {
    const chunkSize = 2;
    const source = [
      { name: "banana", type: "fruit" },
      { name: "orange", type: "fruit" },
      { name: "tomato", type: "vegetables" },
      { name: "carrot", type: "vegetables" },
      { name: "asparagus", type: "vegetables" },
      { name: "beef", type: "meat" },
      { name: "cherries", type: "fruit" },
      { name: "pork", type: "meat" },
      { name: "chicken", type: "meat" },
    ];
    const expected = [
      {
        key: "fruit",
        data: [
          { name: "banana", type: "fruit" },
          { name: "orange", type: "fruit" },
        ],
      },
      {
        key: "vegetables",
        data: [
          { name: "tomato", type: "vegetables" },
          { name: "carrot", type: "vegetables" },
        ],
      },
      {
        key: "vegetables",
        data: [{ name: "asparagus", type: "vegetables" }],
      },
      {
        key: "meat",
        data: [{ name: "beef", type: "meat" }],
      },
      {
        key: "fruit",
        data: [{ name: "cherries", type: "fruit" }],
      },
      {
        key: "meat",
        data: [
          { name: "pork", type: "meat" },
          { name: "chicken", type: "meat" },
        ],
      },
    ];
    const actual = [];
    for await (const chunk of groupByKeyChunked(
      arrayToAsyncIter(source),
      (el) => el.type,
      chunkSize
    )) {
      actual.push(chunk);
    }
    expect(actual).toStrictEqual(expected);
  });
});

// `withNext`: every element is expected to be paired with its successor, and
// the final element with `next: undefined`.
// The suite is titled after the exported function name, like the other suites
// in this file.
describe("withNext", () => {
  it("can iterate from data source with next value", async () => {
    const source = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    const expected = [
      { current: 1, next: 2 },
      { current: 2, next: 3 },
      { current: 3, next: 4 },
      { current: 4, next: 5 },
      { current: 5, next: 6 },
      { current: 6, next: 7 },
      { current: 7, next: 8 },
      { current: 8, next: 9 },
      { current: 9, next: 10 },
      { current: 10, next: undefined },
    ];
    const actual = [];
    for await (const data of withNext(arrayToAsyncIter(source))) {
      actual.push(data);
    }
    expect(actual).toStrictEqual(expected);
  });
});

// `withIndex`: every element is expected to be emitted together with its
// zero-based position in the source.
// The suite is titled after the exported function name, like the other suites
// in this file.
describe("withIndex", () => {
  it("can iterate from data source with index", async () => {
    const source = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    const expected = [
      { data: 1, index: 0 },
      { data: 2, index: 1 },
      { data: 3, index: 2 },
      { data: 4, index: 3 },
      { data: 5, index: 4 },
      { data: 6, index: 5 },
      { data: 7, index: 6 },
      { data: 8, index: 7 },
      { data: 9, index: 8 },
      { data: 10, index: 9 },
    ];
    const actual = [];
    for await (const data of withIndex(arrayToAsyncIter(source))) {
      actual.push(data);
    }
    expect(actual).toStrictEqual(expected);
  });
});
Loading

0 comments on commit 0336314

Please sign in to comment.