Skip to content

Commit

Permalink
fix: mutated cached block and encoding length for batched shards
Browse files Browse the repository at this point in the history
  • Loading branch information
Alan Shaw committed Jan 25, 2024
1 parent 586fd06 commit 7c327d4
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 10 deletions.
14 changes: 8 additions & 6 deletions src/batch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Batcher {
constructor ({ blocks, entries, prefix, maxSize, maxKeyLength, base }) {
this.blocks = blocks
this.prefix = prefix
this.entries = entries
this.entries = [...entries]
this.base = base
this.maxSize = maxSize
this.maxKeyLength = maxKeyLength
Expand Down Expand Up @@ -51,7 +51,7 @@ class Batcher {
static async create ({ blocks, link, prefix }) {
const shards = new ShardFetcher(blocks)
const base = await shards.get(link)
return new Batcher({ blocks, entries: base.value.entries, prefix, base, ...Shard.configure(base.value) })
return new Batcher({ blocks, prefix, base, ...base.value })
}
}

Expand Down Expand Up @@ -108,7 +108,7 @@ export const put = async (blocks, shard, key, value) => {
shard.entries = Shard.putEntry(asShardEntries(shard.entries), asShardEntry(entry))

// TODO: adjust size automatically
const size = Shard.encodedLength(Shard.withEntries(asShardEntries(shard.entries), shard))
const size = BatcherShard.encodedLength(shard)
if (size > shard.maxSize) {
const common = Shard.findCommonPrefix(
asShardEntries(shard.entries),
Expand Down Expand Up @@ -164,13 +164,15 @@ export const put = async (blocks, shard, key, value) => {
* @returns {Promise<{ shard: API.BatcherShard, key: string }>}
*/
export const traverse = async (shards, key, shard) => {
for (const e of shard.entries) {
const [k, v] = e
for (let i = 0; i < shard.entries.length; i++) {
const [k, v] = shard.entries[i]
if (key <= k) break
if (key.startsWith(k) && Array.isArray(v)) {
if (Shard.isShardLink(v[0])) {
const blk = await shards.get(v[0], shard.prefix + k)
v[0] = BatcherShard.create({ base: blk, prefix: blk.prefix, ...blk.value })
const batcher = BatcherShard.create({ base: blk, prefix: blk.prefix, ...blk.value })
shard.entries[i] = [k, v[1] == null ? [batcher] : [batcher, v[1]]]
return traverse(shards, key.slice(k.length), batcher)
}
return traverse(shards, key.slice(k.length), v[0])
}
Expand Down
53 changes: 52 additions & 1 deletion src/batch/shard.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,65 @@
// eslint-disable-next-line no-unused-vars
import * as Link from 'multiformats/link'
import { tokensToLength } from 'cborg/length'
import { Token, Type } from 'cborg'
import * as API from './api.js'
import { configure } from '../shard.js'

/** Byte length of a v1, dag-cbor, sha-256 CID */
const ShardLinkByteLength = 36

const CID_TAG = new Token(Type.tag, 42)

/**
 * Create a new batcher shard.
 *
 * The `entries` array is defensively copied so that in-place mutations made
 * while batching (e.g. `putEntry` updates) never alter a caller-owned or
 * cached entries array.
 *
 * @param {API.BatcherShardInit} [init]
 * @returns {API.BatcherShard}
 */
export const create = init => ({
  base: init?.base,
  prefix: init?.prefix ?? '',
  // Copy to avoid mutating a shared/cached entries array.
  entries: [...init?.entries ?? []],
  ...configure(init)
})

/**
 * Projected dag-cbor encoded byte length of a batcher shard, computed from
 * CBOR tokens without performing a real encode.
 *
 * @param {API.BatcherShard} shard
 * @returns {number} encoded length in bytes
 */
export const encodedLength = (shard) => {
  // The shard encodes as a 3-key map: entries, maxKeyLength, maxSize.
  const headerTokens = [
    new Token(Type.map, 3),
    new Token(Type.string, 'entries'),
    new Token(Type.array, shard.entries.length),
    new Token(Type.string, 'maxKeyLength'),
    new Token(Type.uint, shard.maxKeyLength),
    new Token(Type.string, 'maxSize'),
    new Token(Type.uint, shard.maxSize)
  ]
  // Sum the per-entry lengths, then add the map/keys overhead.
  const entriesLength = shard.entries.reduce(
    (total, entry) => total + entryEncodedLength(entry),
    0
  )
  return entriesLength + tokensToLength(headerTokens)
}

/**
 * Projected dag-cbor encoded byte length of a single shard entry, computed
 * from CBOR tokens without performing a real encode.
 *
 * @param {API.BatcherShardEntry} entry
 * @returns {number} encoded length in bytes
 */
const entryEncodedLength = entry => {
  // An entry encodes as an array: [key, value].
  const tokens = [
    new Token(Type.array, entry.length),
    new Token(Type.string, entry[0])
  ]
  if (Array.isArray(entry[1])) {
    // Value is itself an array (e.g. [shard] or [shard, value]); each item
    // encodes as a tag-42 CID.
    tokens.push(new Token(Type.array, entry[1].length))
    for (const item of entry[1]) {
      tokens.push(CID_TAG)
      if (Link.isLink(item)) {
        // +1 for the extra leading byte dag-cbor places before the CID bytes.
        tokens.push(new Token(Type.bytes, { length: item.byteLength + 1 }))
      } else {
        // `item` is a BatcherShard and does not have a CID yet, however, when
        // it does, it will be this long (v1 dag-cbor sha-256 CID).
        tokens.push(new Token(Type.bytes, { length: ShardLinkByteLength + 1 }))
      }
    }
  } else {
    // Value is a single link: tag-42 CID, +1 leading byte as above.
    tokens.push(CID_TAG)
    tokens.push(new Token(Type.bytes, { length: entry[1].byteLength + 1 }))
  }
  return tokensToLength(tokens)
}
2 changes: 1 addition & 1 deletion src/crdt/batch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Batcher {
this.blocks = blocks
this.head = head
this.prefix = prefix
this.entries = entries
this.entries = [...entries]
this.base = base
this.maxSize = maxSize
this.maxKeyLength = maxKeyLength
Expand Down
4 changes: 2 additions & 2 deletions test/batch.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ describe('batch', () => {
})

it('batches puts (shard on max size)', async () => {
const rootblk = await ShardBlock.create({ maxSize: 1000 })
const rootblk = await ShardBlock.create({ maxSize: 2000 })
const blocks = new Blockstore()
await blocks.put(rootblk.cid, rootblk.bytes)

Expand Down Expand Up @@ -73,7 +73,7 @@ describe('batch', () => {
assert.equal(value.toString(), o.value.toString())
}

// vis(blocks, root)
vis(blocks, root)
})

it('create the same DAG as non-batched puts', async () => {
Expand Down

0 comments on commit 7c327d4

Please sign in to comment.