Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-pousette committed Dec 27, 2024
1 parent 5dcc720 commit fa27128
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 296 deletions.
161 changes: 73 additions & 88 deletions packages/programs/data/shared-log/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ export type DynamicReplicationOptions<R extends "u32" | "u64"> = {
cpu?: number | { max: number; monitor?: CPUUsage };
};
} & (
| { offset: number; normalized?: true | undefined }
| { offset: NumberFromType<R>; normalized: false }
| { offset?: undefined; normalized?: undefined }
);
| { offset: number; normalized?: true | undefined }
| { offset: NumberFromType<R>; normalized: false }
| { offset?: undefined; normalized?: undefined }
);

export type FixedReplicationOptions = {
id?: Uint8Array;
Expand Down Expand Up @@ -284,8 +284,8 @@ export type SharedLogOptions<
T,
D extends ReplicationDomain<any, T, R>,
R extends "u32" | "u64" = D extends ReplicationDomain<any, T, infer I>
? I
: "u32",
? I
: "u32",
> = {
replicate?: ReplicationOptions<R>;
replicas?: ReplicationLimitsOptions;
Expand Down Expand Up @@ -330,8 +330,8 @@ export type Args<
T,
D extends ReplicationDomain<any, T, R>,
R extends "u32" | "u64" = D extends ReplicationDomain<any, T, infer I>
? I
: "u32",
? I
: "u32",
> = LogProperties<T> & LogEvents<T> & SharedLogOptions<T, D, R>;

export type SharedAppendOptions<T> = AppendOptions<T> & {
Expand All @@ -357,8 +357,8 @@ export class SharedLog<
T,
D extends ReplicationDomain<any, T, R>,
R extends "u32" | "u64" = D extends ReplicationDomain<any, T, infer I>
? I
: "u32",
? I
: "u32",
> extends Program<Args<T, D, R>, SharedLogEvents> {
@field({ type: Log })
log: Log<T>;
Expand Down Expand Up @@ -639,8 +639,8 @@ export class SharedLog<
rangeArg.offset != null
? normalized
? this.indexableDomain.numbers.denormalize(
rangeArg.offset as number,
)
rangeArg.offset as number,
)
: rangeArg.offset
: this.indexableDomain.numbers.random();
let factor = rangeArg.factor;
Expand All @@ -659,7 +659,7 @@ export class SharedLog<
? fullWidth
: factor === "right"
? // @ts-ignore
fullWidth - offset
fullWidth - offset
: factorDenormalized) as NumberFromType<R>,
/* typeof factor === "number"
? factor
Expand Down Expand Up @@ -740,11 +740,11 @@ export class SharedLog<
cpu:
options?.limits?.cpu != null
? {
max:
typeof options?.limits?.cpu === "object"
? options.limits.cpu.max
: options?.limits?.cpu,
}
max:
typeof options?.limits?.cpu === "object"
? options.limits.cpu.max
: options?.limits?.cpu,
}
: undefined,
},
);
Expand Down Expand Up @@ -1577,18 +1577,18 @@ export class SharedLog<

this.syncronizer = options?.syncronizer
? new options.syncronizer({
entryIndex: this.entryCoordinatesIndex,
log: this.log,
rangeIndex: this._replicationRangeIndex,
rpc: this.rpc,
coordinateToHash: this.coordinateToHash,
})
entryIndex: this.entryCoordinatesIndex,
log: this.log,
rangeIndex: this._replicationRangeIndex,
rpc: this.rpc,
coordinateToHash: this.coordinateToHash,
})
: new SimpleSyncronizer({
log: this.log,
rpc: this.rpc,
entryIndex: this.entryCoordinatesIndex,
coordinateToHash: this.coordinateToHash,
});
log: this.log,
rpc: this.rpc,
entryIndex: this.entryCoordinatesIndex,
coordinateToHash: this.coordinateToHash,
});

// Open for communcation
await this.rpc.open({
Expand Down Expand Up @@ -1695,7 +1695,7 @@ export class SharedLog<
if (!key) {
throw new Error(
"Failed to resolve public key from hash: " +
segment.value.hash,
segment.value.hash,
);
}
this.events.dispatchEvent(
Expand Down Expand Up @@ -1772,10 +1772,10 @@ export class SharedLog<
options?: {
roleAge?: number;
eager?:
| {
unmaturedFetchCoverSize?: number;
}
| boolean;
| {
unmaturedFetchCoverSize?: number;
}
| boolean;
},
) {
let roleAge = options?.roleAge ?? (await this.getDefaultMinRoleAge());
Expand Down Expand Up @@ -1884,9 +1884,6 @@ export class SharedLog<
return this.log.recover();
}

recievedOnce: Set<string> = new Set();
recievedAndIsLeaderOnce: Set<string> = new Set();

// Callback for receiving a message from the network
async _onMessage(
msg: TransportMessage,
Expand All @@ -1910,18 +1907,14 @@ export class SharedLog<
const { heads } = msg;

logger.debug(
`${this.node.identity.publicKey.hashcode()}: Recieved heads: ${heads.length === 1 ? heads[0].entry.hash : "#" + heads.length
`${this.node.identity.publicKey.hashcode()}: Recieved heads: ${
heads.length === 1 ? heads[0].entry.hash : "#" + heads.length
}, logId: ${this.log.idString}`,
);
if (heads.length > 1)
console.log("RECEIVED " + heads.length + " FROM " + context.from!.hashcode());

if (heads) {
const filteredHeads: EntryWithRefs<any>[] = [];
for (const head of heads) {
(this.recievedOnce || (this.recievedOnce = new Set())).add(
head.entry.hash,
);
if (!(await this.log.has(head.entry.hash))) {
head.entry.init({
// we need to init because we perhaps need to decrypt gid
Expand Down Expand Up @@ -2043,11 +2036,6 @@ export class SharedLog<

outer: for (const entry of entries) {
if (isLeader || this.sync?.(entry.entry)) {
(
this.recievedAndIsLeaderOnce ||
(this.recievedAndIsLeaderOnce = new Set())
).add(entry.entry.hash);

toMerge.push(entry.entry);
} else {
for (const ref of entry.gidRefrences) {
Expand All @@ -2061,7 +2049,8 @@ export class SharedLog<
}

logger.debug(
`${this.node.identity.publicKey.hashcode()}: Dropping heads with gid: ${entry.entry.meta.gid
`${this.node.identity.publicKey.hashcode()}: Dropping heads with gid: ${
entry.entry.meta.gid
}. Because not leader`,
);
}
Expand Down Expand Up @@ -2373,7 +2362,7 @@ export class SharedLog<
}
logger.error(
"Failed to find peer who updated replication settings: " +
e?.message,
e?.message,
);
});
} else if (msg instanceof StoppedReplicating) {
Expand Down Expand Up @@ -2538,10 +2527,10 @@ export class SharedLog<
verifySignatures?: boolean;
timeout?: number;
replicate?:
| boolean
| {
mergeSegments?: boolean;
};
| boolean
| {
mergeSegments?: boolean;
};
},
): Promise<void> {
let entriesToReplicate: Entry<T>[] = [];
Expand Down Expand Up @@ -2571,14 +2560,14 @@ export class SharedLog<

const onChangeForReplication = options?.replicate
? async (change: Change<T>) => {
if (change.added) {
for (const entry of change.added) {
if (entry.head) {
entriesToReplicate.push(entry.entry);
if (change.added) {
for (const entry of change.added) {
if (entry.head) {
entriesToReplicate.push(entry.entry);
}
}
}
}
}
: undefined;

const persistCoordinate = async (entry: Entry<T>) => {
Expand Down Expand Up @@ -2696,10 +2685,10 @@ export class SharedLog<
onLeader?: (key: string) => void;
// persist even if not leader
persist?:
| {
prev?: EntryReplicated<R>;
}
| false;
| {
prev?: EntryReplicated<R>;
}
| false;
} = { timeout: this.waitForReplicatorTimeout },
): Promise<Map<string, { intersecting: boolean }> | false> {
const timeout = options.timeout ?? this.waitForReplicatorTimeout;
Expand Down Expand Up @@ -2889,13 +2878,13 @@ export class SharedLog<
coordinates: NumberFromType<R>[];
entry: ShallowOrFullEntry<any> | EntryReplicated<R>;
leaders:
| Map<
string,
{
intersecting: boolean;
}
>
| false;
| Map<
string,
{
intersecting: boolean;
}
>
| false;
replicas: number;
prev?: EntryReplicated<R>;
}) {
Expand All @@ -2908,7 +2897,7 @@ export class SharedLog<
properties.prev &&
properties.prev.assignedToRangeBoundary === assignedToRangeBoundary
) {
console.log("SKIP", properties.entry.hash, properties.coordinates)
console.log("SKIP", properties.entry.hash, properties.coordinates);
return; // no change
}

Expand Down Expand Up @@ -2975,10 +2964,10 @@ export class SharedLog<
onLeader?: (key: string) => void;
// persist even if not leader
persist?:
| {
prev?: EntryReplicated<R>;
}
| false;
| {
prev?: EntryReplicated<R>;
}
| false;
},
): Promise<Map<string, { intersecting: boolean }>> {
// we consume a list of coordinates in this method since if we are leader of one coordinate we want to persist all of them
Expand Down Expand Up @@ -3018,10 +3007,10 @@ export class SharedLog<
onLeader?: (key: string) => void;
// persist even if not leader
persist?:
| {
prev?: EntryReplicated<R>;
}
| false;
| {
prev?: EntryReplicated<R>;
}
| false;
},
): Promise<boolean> {
let cursors: NumberFromType<R>[] = await this.createCoordinates(
Expand Down Expand Up @@ -3173,9 +3162,6 @@ export class SharedLog<
>,
options?: { timeout?: number; unchecked?: boolean },
): Promise<any>[] {
if (true as any) {
return [];
}
if (options?.unchecked) {
return [...entries.values()].map((x) => {
this._gidPeersHistory.delete(x.entry.meta.gid);
Expand Down Expand Up @@ -3272,7 +3258,7 @@ export class SharedLog<
replicas: minReplicas.getValue(this),
})
) {
console.error("HERE")
console.error("Is leader after delete");
}
});
}, this.waitForPruneDelay),
Expand Down Expand Up @@ -3309,10 +3295,10 @@ export class SharedLog<
if (
!(await this._waitForReplicators(
cursor ??
(cursor = await this.createCoordinates(
entry,
minReplicasValue,
)),
(cursor = await this.createCoordinates(
entry,
minReplicasValue,
)),
entry,
[
{ key: publicKeyHash, replicator: true },
Expand Down Expand Up @@ -3596,7 +3582,6 @@ export class SharedLog<
}
}
for (const [target, entries] of uncheckedDeliver) {
console.log("MAYBE MISSING ENTRIES", target, entries.size)
this.syncronizer.onMaybeMissingEntries({
entries,
targets: [target],
Expand Down
Loading

0 comments on commit fa27128

Please sign in to comment.