diff --git a/packages/blockstore-fs/package.json b/packages/blockstore-fs/package.json
index 1fb0e6ad..7ae8d596 100644
--- a/packages/blockstore-fs/package.json
+++ b/packages/blockstore-fs/package.json
@@ -162,16 +162,17 @@
     "release": "aegir release"
   },
   "dependencies": {
-    "fast-write-atomic": "^0.2.1",
     "interface-blockstore": "^5.0.0",
     "interface-store": "^6.0.0",
     "it-glob": "^3.0.1",
     "it-map": "^3.1.1",
     "it-parallel-batch": "^3.0.6",
-    "multiformats": "^13.2.3"
+    "multiformats": "^13.2.3",
+    "steno": "^4.0.2"
   },
   "devDependencies": {
     "aegir": "^44.1.1",
-    "interface-blockstore-tests": "^7.0.0"
+    "interface-blockstore-tests": "^7.0.0",
+    "threads": "^1.7.0"
   }
 }
diff --git a/packages/blockstore-fs/src/index.ts b/packages/blockstore-fs/src/index.ts
index dbe490e6..9b725ee8 100644
--- a/packages/blockstore-fs/src/index.ts
+++ b/packages/blockstore-fs/src/index.ts
@@ -14,31 +14,27 @@
 import fs from 'node:fs/promises'
 import path from 'node:path'
-import { promisify } from 'node:util'
-// @ts-expect-error no types
-import fwa from 'fast-write-atomic'
 import { OpenFailedError, type AwaitIterable, PutFailedError, NotFoundError, DeleteFailedError } from 'interface-store'
 import glob from 'it-glob'
 import map from 'it-map'
 import parallelBatch from 'it-parallel-batch'
+import { Writer } from 'steno'
 import { NextToLast } from './sharding.js'
 import type { ShardingStrategy } from './sharding.js'
 import type { Blockstore, Pair } from 'interface-blockstore'
 import type { CID } from 'multiformats/cid'
 
-const writeAtomic = promisify(fwa)
-
 /**
  * Write a file atomically
  */
 async function writeFile (file: string, contents: Uint8Array): Promise<void> {
   try {
-    await writeAtomic(file, contents)
+    const writer = new Writer(file)
+    await writer.write(contents)
   } catch (err: any) {
-    if (err.code === 'EPERM' && err.syscall === 'rename') {
-      // fast-write-atomic writes a file to a temp location before renaming it.
-      // On Windows, if the final file already exists this error is thrown.
-      // No such error is thrown on Linux/Mac
+    if (err.syscall === 'rename' && ['ENOENT', 'EPERM'].includes(err.code)) {
+      // steno writes a file to a temp location before renaming it.
+      // If the final file already exists this error is thrown.
       // Make sure we can read & write to this file
       await fs.access(file, fs.constants.F_OK | fs.constants.W_OK)
 
@@ -46,7 +42,6 @@ async function writeFile (file: string, contents: Uint8Array): Promise<void> {
       // attempts to write the same block by two different function calls
       return
     }
 
-    throw err
   }
 }
diff --git a/packages/blockstore-fs/test/fixtures/writer-worker.ts b/packages/blockstore-fs/test/fixtures/writer-worker.ts
new file mode 100644
index 00000000..67a7d9cc
--- /dev/null
+++ b/packages/blockstore-fs/test/fixtures/writer-worker.ts
@@ -0,0 +1,28 @@
+import { CID } from 'multiformats/cid'
+import { expose } from 'threads/worker'
+import { FsBlockstore } from '../../src/index.js'
+
+let fs: FsBlockstore
+expose({
+  async isReady (path) {
+    fs = new FsBlockstore(path)
+    try {
+      await fs.open()
+      return true
+    } catch (err) {
+      // eslint-disable-next-line no-console
+      console.error('Error opening blockstore', err)
+      throw err
+    }
+  },
+  async put (cidString, value) {
+    const key = CID.parse(cidString)
+    try {
+      return await fs.put(key, value)
+    } catch (err) {
+      // eslint-disable-next-line no-console
+      console.error('Error putting block', err)
+      throw err
+    }
+  }
+})
diff --git a/packages/blockstore-fs/test/index.spec.ts b/packages/blockstore-fs/test/index.spec.ts
index 7e327b33..776744f9 100644
--- a/packages/blockstore-fs/test/index.spec.ts
+++ b/packages/blockstore-fs/test/index.spec.ts
@@ -5,6 +5,7 @@
 import { expect } from 'aegir/chai'
 import { interfaceBlockstoreTests } from 'interface-blockstore-tests'
 import { base32 } from 'multiformats/bases/base32'
 import { CID } from 'multiformats/cid'
+import { spawn, Thread, Worker } from 'threads'
 import { FsBlockstore } from '../src/index.js'
 import { FlatDirectory, NextToLast } from '../src/sharding.js'
@@ -157,4 +158,35 @@ describe('FsBlockstore', () => {
 
     expect(res).to.deep.equal(value)
   })
+
+  /**
+   * This test spawns 10 workers that concurrently write to the same blockstore.
+   * It differs from the previous test in that it uses workers to write to the blockstore,
+   * which means the writes happen in parallel in different threads.
+   */
+  it('can survive concurrent worker writes', async () => {
+    const dir = path.join(os.tmpdir(), `test-${Math.random()}`)
+    const key = CID.parse('QmeimKZyjcBnuXmAD9zMnSjM9JodTbgGT3gutofkTqz9rE')
+    const workers = await Promise.all(new Array(10).fill(0).map(async () => {
+      const worker = await spawn(new Worker('./fixtures/writer-worker.js'))
+      await worker.isReady(dir)
+      return worker
+    }))
+
+    try {
+      const value = utf8Encoder.encode('Hello world')
+      // 100 iterations of looping over all workers and putting the same key value pair
+      await Promise.all(new Array(100).fill(0).map(async () => {
+        return Promise.all(workers.map(async (worker) => worker.put(key.toString(), value)))
+      }))
+
+      const fs = new FsBlockstore(dir)
+      await fs.open()
+      const res = await fs.get(key)
+
+      expect(res).to.deep.equal(value)
+    } finally {
+      await Promise.all(workers.map(async (worker) => Thread.terminate(worker)))
+    }
+  })
 })
diff --git a/packages/datastore-fs/package.json b/packages/datastore-fs/package.json
index 1dee7e52..020a4fbd 100644
--- a/packages/datastore-fs/package.json
+++ b/packages/datastore-fs/package.json
@@ -141,16 +141,17 @@
   },
   "dependencies": {
     "datastore-core": "^10.0.0",
-    "fast-write-atomic": "^0.2.1",
     "interface-datastore": "^8.0.0",
     "interface-store": "^6.0.0",
     "it-glob": "^3.0.1",
     "it-map": "^3.1.1",
-    "it-parallel-batch": "^3.0.6"
+    "it-parallel-batch": "^3.0.6",
+    "steno": "^4.0.2"
   },
   "devDependencies": {
     "aegir": "^44.1.1",
     "interface-datastore-tests": "^6.0.0",
-    "ipfs-utils": "^9.0.14"
+    "ipfs-utils": "^9.0.14",
+    "threads": "^1.7.0"
   }
 }
diff --git a/packages/datastore-fs/src/index.ts b/packages/datastore-fs/src/index.ts
index 0f6e2f0f..7461368c 100644
--- a/packages/datastore-fs/src/index.ts
+++ b/packages/datastore-fs/src/index.ts
@@ -14,31 +14,27 @@
 import fs from 'node:fs/promises'
 import path from 'node:path'
-import { promisify } from 'util'
 import { BaseDatastore } from 'datastore-core'
-// @ts-expect-error no types
-import fwa from 'fast-write-atomic'
 import { Key } from 'interface-datastore'
 import { OpenFailedError, NotFoundError, PutFailedError, DeleteFailedError } from 'interface-store'
 import glob from 'it-glob'
 import map from 'it-map'
 import parallel from 'it-parallel-batch'
+import { Writer } from 'steno'
 import type { KeyQuery, Pair, Query } from 'interface-datastore'
 import type { AwaitIterable } from 'interface-store'
 
-const writeAtomic = promisify(fwa)
-
 /**
  * Write a file atomically
  */
 async function writeFile (path: string, contents: Uint8Array): Promise<void> {
   try {
-    await writeAtomic(path, contents)
+    const writer = new Writer(path)
+    await writer.write(contents)
   } catch (err: any) {
-    if (err.code === 'EPERM' && err.syscall === 'rename') {
-      // fast-write-atomic writes a file to a temp location before renaming it.
-      // On Windows, if the final file already exists this error is thrown.
-      // No such error is thrown on Linux/Mac
+    if (err.syscall === 'rename' && ['ENOENT', 'EPERM'].includes(err.code)) {
+      // steno writes a file to a temp location before renaming it.
+      // If the final file already exists this error is thrown.
       // Make sure we can read & write to this file
       await fs.access(path, fs.constants.F_OK | fs.constants.W_OK)
diff --git a/packages/datastore-fs/test/fixtures/writer-worker.ts b/packages/datastore-fs/test/fixtures/writer-worker.ts
new file mode 100644
index 00000000..49cdfef2
--- /dev/null
+++ b/packages/datastore-fs/test/fixtures/writer-worker.ts
@@ -0,0 +1,28 @@
+import { Key } from 'interface-datastore'
+import { expose } from 'threads/worker'
+import { FsDatastore } from '../../src/index.js'
+
+let fs: FsDatastore
+expose({
+  async isReady (path) {
+    fs = new FsDatastore(path)
+    try {
+      await fs.open()
+      return true
+    } catch (err) {
+      // eslint-disable-next-line no-console
+      console.error('Error opening datastore', err)
+      throw err
+    }
+  },
+  async put (keyString, value) {
+    const key = new Key(keyString)
+    try {
+      return await fs.put(key, value)
+    } catch (err) {
+      // eslint-disable-next-line no-console
+      console.error('Error putting value', err)
+      throw err
+    }
+  }
+})
diff --git a/packages/datastore-fs/test/index.spec.ts b/packages/datastore-fs/test/index.spec.ts
index 39b74536..44e1384c 100644
--- a/packages/datastore-fs/test/index.spec.ts
+++ b/packages/datastore-fs/test/index.spec.ts
@@ -6,6 +6,7 @@
 import { ShardingDatastore, shard } from 'datastore-core'
 import { Key } from 'interface-datastore'
 import { interfaceDatastoreTests } from 'interface-datastore-tests'
 import tempdir from 'ipfs-utils/src/temp-dir.js'
+import { spawn, Thread, Worker } from 'threads'
 import { FsDatastore } from '../src/index.js'
 const utf8Encoder = new TextEncoder()
@@ -170,4 +171,35 @@ describe('FsDatastore', () => {
 
     expect(res).to.deep.equal(value)
   })
+
+  /**
+   * This test spawns 10 workers that concurrently write to the same datastore.
+   * It differs from the previous test in that it uses workers to write to the datastore,
+   * which means the writes happen in parallel in different threads.
+   */
+  it('can survive concurrent worker writes', async () => {
+    const dir = tempdir()
+    const key = new Key('CIQGFTQ7FSI2COUXWWLOQ45VUM2GUZCGAXLWCTOKKPGTUWPXHBNIVOY')
+    const workers = await Promise.all(new Array(10).fill(0).map(async () => {
+      const worker = await spawn(new Worker('./fixtures/writer-worker.js'))
+      await worker.isReady(dir)
+      return worker
+    }))
+
+    try {
+      const value = utf8Encoder.encode('Hello world')
+      // 100 iterations of looping over all workers and putting the same key value pair
+      await Promise.all(new Array(100).fill(0).map(async () => {
+        return Promise.all(workers.map(async (worker) => worker.put(key.toString(), value)))
+      }))
+
+      const fs = new FsDatastore(dir)
+      await fs.open()
+      const res = await fs.get(key)
+
+      expect(res).to.deep.equal(value)
+    } finally {
+      await Promise.all(workers.map(async (worker) => Thread.terminate(worker)))
+    }
+  })
 })
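For reviewers who want to poke at the change locally, a minimal smoke-test sketch follows. It is not part of the patch; the temp directory name is illustrative. It exercises same-process concurrent puts of one block, which now go through steno's write-to-temp-then-rename path, while the worker tests above cover the cross-thread case.

import os from 'node:os'
import path from 'node:path'
import { FsBlockstore } from 'blockstore-fs'
import { CID } from 'multiformats/cid'

// Illustrative only: ten simultaneous writes of the same block from a single
// thread should all settle without an EPERM/ENOENT rename error surfacing.
const store = new FsBlockstore(path.join(os.tmpdir(), `steno-smoke-${Math.random()}`))
await store.open()

const cid = CID.parse('QmeimKZyjcBnuXmAD9zMnSjM9JodTbgGT3gutofkTqz9rE')
const block = new TextEncoder().encode('Hello world')

await Promise.all(new Array(10).fill(0).map(async () => store.put(cid, block)))

console.info(await store.get(cid)) // Uint8Array containing "Hello world"
await store.close()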