
fix: improve assertions
marcus-pousette committed Dec 15, 2023
1 parent 3bf4fec commit 6cbceb4
Showing 2 changed files with 111 additions and 112 deletions.
221 changes: 110 additions & 111 deletions packages/programs/data/shared-log/src/__tests__/replicate.test.ts
@@ -259,140 +259,139 @@ describe(`exchange`, function () {
}
});

it("emits correct replication info", async () => {
db1.log.distribute = async () => {
return true; // do a noop becaus in this test we want to make sure that writes are only treated once
// and we don't want extra replication events
};

db2 = (await EventStore.open<EventStore<string>>(
db1.address!,
session.peers[1]
))!;

await db1.waitFor(session.peers[1].peerId);
await db2.waitFor(session.peers[0].peerId);

const entryCount = 99;

// Trigger replication
let adds: number[] = [];
for (let i = 0; i < entryCount; i++) {
adds.push(i);
await db1.add("hello " + i, { meta: { next: [] } });
// TODO when nexts is omitted, entrise will dependon each other,
// When entries arrive in db2 unecessary fetches occur because there is already a sync in progress?
}

//await mapSeries(adds, (i) => db1.add("hello " + i));
describe("info", () => {
it("insertion", async () => {
db1.log.distribute = async () => {
return true; // do a noop becaus in this test we want to make sure that writes are only treated once
// and we don't want extra replication events
};

db2 = (await EventStore.open<EventStore<string>>(
db1.address!,
session.peers[1]
))!;

await db1.waitFor(session.peers[1].peerId);
await db2.waitFor(session.peers[0].peerId);

const entryCount = 99;

// Trigger replication
let adds: number[] = [];
for (let i = 0; i < entryCount; i++) {
adds.push(i);
await db1.add("hello " + i, { meta: { next: [] } });
// TODO when nexts is omitted, entrise will dependon each other,
// When entries arrive in db2 unecessary fetches occur because there is already a sync in progress?
}

// All entries should be in the database
await waitFor(() => db2.log.log.length === entryCount);
//await mapSeries(adds, (i) => db1.add("hello " + i));

// All entries should be in the database
expect((await db2.iterator({ limit: -1 })).collect().length).toEqual(
entryCount
);
// All entries should be in the database
await waitForResolved(() =>
expect(db2.log.log.length).toEqual(entryCount)
);

// progress events should increase monotonically
expect(fetchEvents).toEqual(fetchHashes.size);
expect(fetchEvents).toEqual(0); // becausel all entries were sent
});
// All entries should be in the database
expect((await db2.iterator({ limit: -1 })).collect().length).toEqual(
entryCount
);

it("emits correct replication info on fresh replication", async () => {
const entryCount = 15;
// progress events should increase monotonically
expect(fetchEvents).toEqual(fetchHashes.size);
expect(fetchEvents).toEqual(0); // becausel all entries were sent
});
it("open after insertion", async () => {
const entryCount = 15;

// Trigger replication
const adds: number[] = [];
for (let i = 0; i < entryCount; i++) {
adds.push(i);
}
// Trigger replication
const adds: number[] = [];
for (let i = 0; i < entryCount; i++) {
adds.push(i);
}

const add = async (i: number) => {
process.stdout.write("\rWriting " + (i + 1) + " / " + entryCount + " ");
await db1.add("hello " + i);
};
const add = async (i: number) => {
process.stdout.write("\rWriting " + (i + 1) + " / " + entryCount + " ");
await db1.add("hello " + i);
};

await mapSeries(adds, add);
await mapSeries(adds, add);

db2 = (await EventStore.open<EventStore<string>>(
db1.address!,
session.peers[1]
))!;
db2 = (await EventStore.open<EventStore<string>>(
db1.address!,
session.peers[1]
))!;

// All entries should be in the database
await waitForResolved(() => expect(db2.log.log.length).toEqual(entryCount));
// All entries should be in the database
await waitForResolved(() =>
expect(db2.log.log.length).toEqual(entryCount)
);

// progress events should (increase monotonically)
expect((await db2.iterator({ limit: -1 })).collect().length).toEqual(
entryCount
);
expect(fetchEvents).toEqual(fetchHashes.size);
expect(fetchEvents).toEqual(entryCount - 1); // - 1 because we also send some references for faster syncing (see exchange-heads.ts)
});
// progress events should (increase monotonically)
expect((await db2.iterator({ limit: -1 })).collect().length).toEqual(
entryCount
);
expect(fetchEvents).toEqual(fetchHashes.size);
expect(fetchEvents).toEqual(entryCount - 1); // - 1 because we also send some references for faster syncing (see exchange-heads.ts)
});

it("emits correct replication info in two-way replication", async () => {
const entryCount = 15;
it("two-way replication", async () => {
const entryCount = 15;

// Trigger replication
const adds: number[] = [];
for (let i = 0; i < entryCount; i++) {
adds.push(i);
}
// Trigger replication
const adds: number[] = [];
for (let i = 0; i < entryCount; i++) {
adds.push(i);
}

const add = async (i: number) => {
process.stdout.write("\rWriting " + (i + 1) + " / " + entryCount + " ");
await Promise.all([db1.add("hello-1-" + i), db2.add("hello-2-" + i)]);
};
const add = async (i: number) => {
process.stdout.write("\rWriting " + (i + 1) + " / " + entryCount + " ");
await Promise.all([db1.add("hello-1-" + i), db2.add("hello-2-" + i)]);
};

// Open second instance again
db1.log.distribute = async () => {
return true; // do a noop becaus in this test we want to make sure that writes are only treated once
// and we don't want extra replication events
};
// Open second instance again
db1.log.distribute = async () => {
return true; // do a noop becaus in this test we want to make sure that writes are only treated once
// and we don't want extra replication events
};

db2 = (await EventStore.open<EventStore<string>>(
db1.address!,
session.peers[1]
))!;
db2 = (await EventStore.open<EventStore<string>>(
db1.address!,
session.peers[1]
))!;

await db1.waitFor(session.peers[1].peerId);
await db2.waitFor(session.peers[0].peerId);
await db1.waitFor(session.peers[1].peerId);
await db2.waitFor(session.peers[0].peerId);

expect(db1.address).toBeDefined();
expect(db2.address).toBeDefined();
expect(db1.address!.toString()).toEqual(db2.address!.toString());
expect(db1.address).toBeDefined();
expect(db2.address).toBeDefined();
expect(db1.address!.toString()).toEqual(db2.address!.toString());

await mapSeries(adds, add);
await mapSeries(adds, add);

// All entries should be in the database
await waitFor(
async () =>
(await db2.iterator({ limit: -1 })).collect().length === entryCount * 2,
{ delayInterval: 200, timeout: 20000 }
);
// All entries should be in the database
await waitForResolved(async () =>
expect((await db2.iterator({ limit: -1 })).collect()).toHaveLength(
entryCount * 2
)
);

// Database values should match
// Database values should match

try {
await waitFor(
() => db1.log.log.values.length === db2.log.log.values.length
);
} catch (error) {
throw new Error(
`${db1.log.log.values.length} +" --- " + ${db2.log.log.values.length}`
await waitForResolved(() =>
expect(db1.log.log.values.length).toEqual(db2.log.log.values.length)
);
}

const values1 = (await db1.iterator({ limit: -1 })).collect();
const values2 = (await db2.iterator({ limit: -1 })).collect();
expect(values1.length).toEqual(values2.length);
for (let i = 0; i < values1.length; i++) {
assert(values1[i].equals(values2[i]));
}
// All entries should be in the database
expect(values1.length).toEqual(entryCount * 2);
expect(values2.length).toEqual(entryCount * 2);
const values1 = (await db1.iterator({ limit: -1 })).collect();
const values2 = (await db2.iterator({ limit: -1 })).collect();
expect(values1.length).toEqual(values2.length);
for (let i = 0; i < values1.length; i++) {
assert(values1[i].equals(values2[i]));
}
// All entries should be in the database
expect(values1.length).toEqual(entryCount * 2);
expect(values2.length).toEqual(entryCount * 2);
});
});
});
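
The assertion change above swaps waitFor(booleanCondition) for waitForResolved(assertionFn): the helper retries a function that throws (for example a Jest expect) until it stops throwing, so a timeout reports the last assertion failure instead of a bare "condition never became true". The snippet below is only a minimal sketch of such a polling helper to illustrate the pattern; the name waitForResolvedSketch, the default timeout, and the polling interval are assumptions, not this repository's actual test utility.

// Illustrative sketch only: a polling helper in the spirit of waitForResolved.
// Assumed behavior (not the repository's implementation): retry `fn` until it
// no longer throws, or rethrow its most recent error once the timeout elapses.
const waitForResolvedSketch = async (
  fn: () => unknown | Promise<unknown>,
  options: { timeout?: number; delayInterval?: number } = {}
): Promise<void> => {
  const { timeout = 10_000, delayInterval = 100 } = options;
  const started = Date.now();
  let lastError: unknown = new Error("waitForResolvedSketch: timed out");
  while (Date.now() - started < timeout) {
    try {
      await fn(); // resolves as soon as the assertion stops throwing
      return;
    } catch (error) {
      lastError = error; // keep the most recent assertion failure
      await new Promise((resolve) => setTimeout(resolve, delayInterval));
    }
  }
  throw lastError;
};

// Usage in the style of the tests above (illustrative):
// await waitForResolvedSketch(() => expect(db2.log.log.length).toEqual(entryCount));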

2 changes: 1 addition & 1 deletion packages/transport/blocks/src/libp2p.ts
@@ -26,7 +26,7 @@ export class DirectBlock extends DirectStream implements IBlocks {
      messageProcessingConcurrency?: number;
    }
  ) {
-    super(components, ["/lazyblock/1.0.0"], {
+    super(components, ["/lazyblock/0.0.0"], {
      signaturePolicy: "StrictNoSign",
      messageProcessingConcurrency: options?.messageProcessingConcurrency || 10,
      canRelayMessage: options?.canRelayMessage ?? true,
