Skip to content

Commit

Permalink
initialize stream when reader called
Browse files Browse the repository at this point in the history
  • Loading branch information
tasshi-me committed Feb 11, 2023
1 parent 141c544 commit 53935f7
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 42 deletions.
10 changes: 5 additions & 5 deletions src/record/import/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { createSchema } from "./schema";
import { noop as defaultTransformer } from "./schema/transformers/noop";
import { userSelected } from "./schema/transformers/userSelected";
import { logger } from "../../utils/log";
import { buildLocalRecordRepositoryFromStream } from "./repositories";
import { LocalRecordRepositoryByStream } from "./repositories";

export type Options = {
app: string;
Expand Down Expand Up @@ -42,14 +42,14 @@ export const run: (
? userSelected(fields, fieldsJson, updateKey)
: defaultTransformer()
);
const { stream, format } = openFsStreamWithEncode(filePath, encoding);
const localRecordRepository = buildLocalRecordRepositoryFromStream(
stream,
const { format } = openFsStreamWithEncode(filePath, encoding);
const localRecordRepository = new LocalRecordRepositoryByStream(
() => openFsStreamWithEncode(filePath, encoding).stream,
format,
schema
);

if (localRecordRepository.length === 0) {
if ((await localRecordRepository.length()) === 0) {
logger.warn("The input file does not have any records");
return;
}
Expand Down
51 changes: 22 additions & 29 deletions src/record/import/repositories/index.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,27 @@
import type { LocalRecordRepository } from "../usecases/interface";
import type { RecordSchema } from "../types/schema";
import { RepositoryError } from "./error";
import { csvParser } from "./parsers/parseCsv";
import { countRecordsFromCsv, csvReader } from "./parsers/parseCsv";

export const buildLocalRecordRepositoryFromStream = <
T extends NodeJS.ReadableStream
>(
source: T,
format: string,
schema: RecordSchema
): LocalRecordRepository => {
switch (format) {
case "csv":
return buildLocalRecordRepositoryFromCsvStream(source, schema);
default:
throw new RepositoryError(
`Unexpected file type: ${format} is unacceptable.`
);
}
};
export class LocalRecordRepositoryByStream<T extends NodeJS.ReadableStream>
implements LocalRecordRepository
{
readonly format;
readonly length;
readonly reader;

constructor(source: () => T, format: string, schema: RecordSchema) {
this.format = format;
this.length = () => countRecordsFromCsv(source());

const buildLocalRecordRepositoryFromCsvStream = <
T extends NodeJS.ReadableStream
>(
source: T,
schema: RecordSchema
): LocalRecordRepository => {
return {
length: 0, // TODO
format: "csv",
reader: () => csvParser(source, schema),
};
};
switch (format) {
case "csv":
this.reader = () => csvReader(source, schema);
break;
default:
throw new RepositoryError(
`Unexpected file type: ${format} is unacceptable.`
);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { LocalRecord } from "../../../../types/record";
import type { RecordSchema } from "../../../../types/schema";

import { csvParser } from "../index";
import { csvReader } from "../index";

import { pattern as withoutSubtable } from "./fixtures/withoutSubtable";
import { pattern as withSubtable } from "./fixtures/withSubtable";
Expand Down Expand Up @@ -33,7 +33,7 @@ describe("parseCsv", () => {
];
it.each(patterns)("$description", async (pattern) => {
const records = [];
for await (const record of csvParser(
for await (const record of csvReader(
Readable.from(pattern.input),
pattern.schema
)) {
Expand Down
29 changes: 26 additions & 3 deletions src/record/import/repositories/parsers/parseCsv/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import type { LocalRecordRepository } from "../../../usecases/interface";

// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Functions/Arrow_functions#use_of_the_yield_keyword
// eslint-disable-next-line func-style
export async function* csvParser<T extends NodeJS.ReadableStream>(
source: T,
export async function* csvReader<T extends NodeJS.ReadableStream>(
source: () => T,
schema: RecordSchema
): ReturnType<LocalRecordRepository["reader"]> {
try {
const csvStream = source.pipe(
const csvStream = source().pipe(
csvParse({
columns: true,
skip_empty_lines: true,
Expand All @@ -33,3 +33,26 @@ export async function* csvParser<T extends NodeJS.ReadableStream>(
throw new ParserError(e);
}
}

export const countRecordsFromCsv = async <T extends NodeJS.ReadableStream>(
source: T
): Promise<number> => {
try {
const csvStream = source.pipe(
csvParse({
columns: true,
skip_empty_lines: true,
delimiter: SEPARATOR,
})
);

let count = 0;
for await (const recordRows of recordReader(csvStream)) {
count++;
}
return count;
} catch (e) {
console.error(e);
throw new ParserError(e);
}
};
7 changes: 5 additions & 2 deletions src/record/import/usecases/add.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ export const addRecords: (
{ attachmentsDir, skipMissingFields = true }
) => {
let currentIndex = 0;
const progressLogger = new ProgressLogger(logger, recordSource.length);
const progressLogger = new ProgressLogger(
logger,
await recordSource.length()
);
try {
logger.info("Starting to import records...");
for await (const [recordsNext, index] of recordsReader(recordSource)) {
Expand Down Expand Up @@ -92,7 +95,7 @@ async function* recordsReader(
): AsyncGenerator<[LocalRecord[], number], void, undefined> {
let records = [];
let currentIndex = 0;
for await (const localRecord of localRecordReader.reader) {
for await (const localRecord of localRecordReader.reader()) {
records.push(localRecord);
if (records.length >= CHUNK_SIZE) {
yield [records, currentIndex];
Expand Down
3 changes: 2 additions & 1 deletion src/record/import/usecases/interface.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import type { LocalRecord } from "../types/record";

export type LocalRecordRepository = {
readonly length: number;
readonly format: string;
readonly length: () => Promise<number>;

readonly reader: () => AsyncGenerator<LocalRecord, void, undefined>;
};

0 comments on commit 53935f7

Please sign in to comment.