Skip to content

Commit

Permalink
Fixed the error by disconnecting the connection and resetting the syn…
Browse files Browse the repository at this point in the history
…c instance.

ConnState field is added to SyncDelivery, which is possibly a duplicate of the existing fields.
  • Loading branch information
markverick committed Nov 19, 2023
1 parent 0c8381b commit 288e208
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 24 deletions.
8 changes: 7 additions & 1 deletion src/backend/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,14 @@ export async function bootstrapWorkspace(opts: {
await certStorage.readyEvent

// Sync Agents
let resolver = (newConnState: ConnState) => {

Check warning on line 224 in src/backend/main.ts

View workflow job for this annotation

GitHub Actions / ESLint

src/backend/main.ts#L224

'resolver' is never reassigned. Use 'const' instead (prefer-const)
console.log("ConnState changed to: " + newConnState)
if (newConnState == "DISCONNECTED"){
disconnect()
}
}
syncAgent = await SyncAgent.create(
nodeId, persistStore, endpoint, certStorage.signer!, certStorage.verifier,
nodeId, persistStore, endpoint, certStorage.signer!, certStorage.verifier, resolver
)

// Root doc using CRDT and Sync
Expand Down
40 changes: 19 additions & 21 deletions src/backend/sync-agent/deliveries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { SvStateVector } from "@ndn/sync"
import { getNamespace } from "./namespace"
import { Storage } from "../storage"
import { panic } from "../../utils"
import { ConnState } from "../../backend/main"

export function encodeSyncState(state: SvStateVector): Uint8Array {
return Encoder.encode(state)
Expand Down Expand Up @@ -34,6 +35,7 @@ const fireSvSync = (inst: SvSync) => {
*/
export abstract class SyncDelivery {
readonly baseName: Name
connState: ConnState = "CONNECTED"
protected _syncInst?: SvSync
protected _syncNode?: SyncNode
protected _ready: boolean = false
Expand Down Expand Up @@ -127,30 +129,25 @@ export abstract class SyncDelivery {
}
}

reset() {
public reset() {
if (this._syncInst === undefined || !this._ready) {
throw new Error('Please do not reset before start.')
}
console.warn('A Sync reset is scheduled.')
this._syncInst.close()
this._syncNode = undefined
return new Promise(r => setTimeout(r, 600000)).then(() => {
SvSync.create({
endpoint: this.endpoint,
syncPrefix: this.syncPrefix,
signer: digestSigning,
initialStateVector: new SvStateVector(this.state),
initialize: async (svSync) => {
this._syncInst = svSync
this._syncInst.addEventListener("update", update => this.handleSyncUpdate(update))
// const nodeId = getNamespace().nodeIdFromSigner(this.signer.name)
this._syncNode = this._syncInst.add(this.nodeId)
// this._ready should already be true
}
}).then(value => {
// Force trigger the SvSync to fire
fireSvSync(value)
})
SvSync.create({
endpoint: this.endpoint,
syncPrefix: this.syncPrefix,
signer: digestSigning,
initialStateVector: new SvStateVector(this.state),
initialize: async (svSync) => {
this._syncInst = svSync
this._syncInst.addEventListener("update", update => this.handleSyncUpdate(update))
// const nodeId = getNamespace().nodeIdFromSigner(this.signer.name)
this._syncNode = this._syncInst.add(this.nodeId)
// this._ready should already be true
}
})
}

Expand Down Expand Up @@ -238,11 +235,12 @@ export class AtLeastOnceDelivery extends SyncDelivery {
} catch (error) {
// TODO: Find a better way to handle this
console.error(`Unable to fetch or verify ${name.toString()} due to: `, error)
console.warn('The current SVS protocol cannot recover from this error. A reset is scheduled in 10 min.')
this.reset()
console.warn('The current SVS protocol cannot recover from this error. A reset will be triggered')
// this.reset()

// TODO: Since it takes time to fix, let's core dump first
panic(`Unable to fetch or verify ${name.toString()} due to: ${error}`)
this.connState = "DISCONNECTED"
// panic(`Unable to fetch or verify ${name.toString()} due to: ${error}`)

return
}
Expand Down
14 changes: 12 additions & 2 deletions src/backend/sync-agent/sync-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { getNamespace } from "./namespace"
import { InMemoryStorage, Storage } from "../storage"
import { SvStateVector } from "@ndn/sync"
import { panic } from "../../utils"
import { ConnState } from "../main"


export type ChannelType = 'update' | 'blob' | 'status' | 'blobUpdate'
Expand All @@ -29,7 +30,8 @@ export class SyncAgent {
readonly signer: Signer,
readonly verifier: Verifier,
readonly atLeastOnce: AtLeastOnceDelivery,
readonly latestOnly: LatestOnlyDelivery
readonly latestOnly: LatestOnlyDelivery,
readonly connUpdateCallback: (connState: ConnState) => void
) {
// Serve stored packets
// TODO: Design a better namespace
Expand Down Expand Up @@ -115,6 +117,12 @@ export class SyncAgent {
}

private async onUpdate(wire: Uint8Array, id: Name) {
if (this.atLeastOnce.connState == "DISCONNECTED") {
console.log(this.atLeastOnce.connState)
this.connUpdateCallback(this.atLeastOnce.connState)
this.atLeastOnce.reset()
this.latestOnly.reset()
}
if (!this._ready) {
console.error('[SyncAgent] FATAL: NOT READY YET')
panic('[SyncAgent] Not ready for update. Check program flow.')
Expand Down Expand Up @@ -354,6 +362,7 @@ export class SyncAgent {
endpoint: Endpoint,
signer: Signer,
verifier: Verifier,
connUpdateCallback: (connState: ConnState) => void
) {
const tempStorage = new InMemoryStorage()
// Note: we need the signer name to be /[appPrefix]/<nodeId>/KEY/<keyID>
Expand All @@ -378,7 +387,8 @@ export class SyncAgent {
signer,
verifier,
atLeastOnce,
latestOnly
latestOnly,
connUpdateCallback
)
resolver!((content, id) => ret.onUpdate(content, id))
return ret
Expand Down

0 comments on commit 288e208

Please sign in to comment.