diff --git a/src/backend/main.ts b/src/backend/main.ts index 3180709..60eff3d 100644 --- a/src/backend/main.ts +++ b/src/backend/main.ts @@ -221,8 +221,14 @@ export async function bootstrapWorkspace(opts: { await certStorage.readyEvent // Sync Agents + let resolver = (newConnState: ConnState) => { + console.log("ConnState changed to: " + newConnState) + if (newConnState == "DISCONNECTED"){ + disconnect() + } + } syncAgent = await SyncAgent.create( - nodeId, persistStore, endpoint, certStorage.signer!, certStorage.verifier, + nodeId, persistStore, endpoint, certStorage.signer!, certStorage.verifier, resolver ) // Root doc using CRDT and Sync diff --git a/src/backend/sync-agent/deliveries.ts b/src/backend/sync-agent/deliveries.ts index 7e9c83a..df03746 100644 --- a/src/backend/sync-agent/deliveries.ts +++ b/src/backend/sync-agent/deliveries.ts @@ -7,6 +7,7 @@ import { SvStateVector } from "@ndn/sync" import { getNamespace } from "./namespace" import { Storage } from "../storage" import { panic } from "../../utils" +import { ConnState } from "../../backend/main" export function encodeSyncState(state: SvStateVector): Uint8Array { return Encoder.encode(state) @@ -34,6 +35,7 @@ const fireSvSync = (inst: SvSync) => { */ export abstract class SyncDelivery { readonly baseName: Name + connState: ConnState = "CONNECTED" protected _syncInst?: SvSync protected _syncNode?: SyncNode protected _ready: boolean = false @@ -127,30 +129,25 @@ export abstract class SyncDelivery { } } - reset() { + public reset() { if (this._syncInst === undefined || !this._ready) { throw new Error('Please do not reset before start.') } console.warn('A Sync reset is scheduled.') this._syncInst.close() this._syncNode = undefined - return new Promise(r => setTimeout(r, 600000)).then(() => { - SvSync.create({ - endpoint: this.endpoint, - syncPrefix: this.syncPrefix, - signer: digestSigning, - initialStateVector: new SvStateVector(this.state), - initialize: async (svSync) => { - this._syncInst = svSync - this._syncInst.addEventListener("update", update => this.handleSyncUpdate(update)) - // const nodeId = getNamespace().nodeIdFromSigner(this.signer.name) - this._syncNode = this._syncInst.add(this.nodeId) - // this._ready should already be true - } - }).then(value => { - // Force trigger the SvSync to fire - fireSvSync(value) - }) + SvSync.create({ + endpoint: this.endpoint, + syncPrefix: this.syncPrefix, + signer: digestSigning, + initialStateVector: new SvStateVector(this.state), + initialize: async (svSync) => { + this._syncInst = svSync + this._syncInst.addEventListener("update", update => this.handleSyncUpdate(update)) + // const nodeId = getNamespace().nodeIdFromSigner(this.signer.name) + this._syncNode = this._syncInst.add(this.nodeId) + // this._ready should already be true + } }) } @@ -238,11 +235,12 @@ export class AtLeastOnceDelivery extends SyncDelivery { } catch (error) { // TODO: Find a better way to handle this console.error(`Unable to fetch or verify ${name.toString()} due to: `, error) - console.warn('The current SVS protocol cannot recover from this error. A reset is scheduled in 10 min.') - this.reset() + console.warn('The current SVS protocol cannot recover from this error. A reset will be triggered') + // this.reset() // TODO: Since it takes time to fix, let's core dump first - panic(`Unable to fetch or verify ${name.toString()} due to: ${error}`) + this.connState = "DISCONNECTED" + // panic(`Unable to fetch or verify ${name.toString()} due to: ${error}`) return } diff --git a/src/backend/sync-agent/sync-agent.ts b/src/backend/sync-agent/sync-agent.ts index 007ee30..8de4a34 100644 --- a/src/backend/sync-agent/sync-agent.ts +++ b/src/backend/sync-agent/sync-agent.ts @@ -9,6 +9,7 @@ import { getNamespace } from "./namespace" import { InMemoryStorage, Storage } from "../storage" import { SvStateVector } from "@ndn/sync" import { panic } from "../../utils" +import { ConnState } from "../main" export type ChannelType = 'update' | 'blob' | 'status' | 'blobUpdate' @@ -29,7 +30,8 @@ export class SyncAgent { readonly signer: Signer, readonly verifier: Verifier, readonly atLeastOnce: AtLeastOnceDelivery, - readonly latestOnly: LatestOnlyDelivery + readonly latestOnly: LatestOnlyDelivery, + readonly connUpdateCallback: (connState: ConnState) => void ) { // Serve stored packets // TODO: Design a better namespace @@ -115,6 +117,12 @@ export class SyncAgent { } private async onUpdate(wire: Uint8Array, id: Name) { + if (this.atLeastOnce.connState == "DISCONNECTED") { + console.log(this.atLeastOnce.connState) + this.connUpdateCallback(this.atLeastOnce.connState) + this.atLeastOnce.reset() + this.latestOnly.reset() + } if (!this._ready) { console.error('[SyncAgent] FATAL: NOT READY YET') panic('[SyncAgent] Not ready for update. Check program flow.') @@ -354,6 +362,7 @@ export class SyncAgent { endpoint: Endpoint, signer: Signer, verifier: Verifier, + connUpdateCallback: (connState: ConnState) => void ) { const tempStorage = new InMemoryStorage() // Note: we need the signer name to be /[appPrefix]//KEY/ @@ -378,7 +387,8 @@ export class SyncAgent { signer, verifier, atLeastOnce, - latestOnly + latestOnly, + connUpdateCallback ) resolver!((content, id) => ret.onUpdate(content, id)) return ret