Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Portal-Hive client-interop fixes #429

Merged
merged 10 commits into from
Aug 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions packages/cli/src/rpc/modules/portal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ export class portal {
const shortEnr = encodedENR.nodeId.slice(0, 15) + '...'
this.logger(`portal_historyAddEnr request received for ${shortEnr}`)
try {
if (this._history.routingTable.getValue(encodedENR.nodeId)) {
if (this._history.routingTable.getWithPending(encodedENR.nodeId)?.value) {
return true
}
this._history.routingTable.insertOrUpdate(encodedENR, EntryStatus.Disconnected)
Expand Down Expand Up @@ -284,7 +284,7 @@ export class portal {
if (!isValidId(dstId)) {
return 'invalid node id'
}
if (!this._history.routingTable.getValue(dstId)) {
if (!this._history.routingTable.getWithPending(dstId)?.value) {
const pong = await this._history.sendPing(enr)
if (!pong) {
return ''
Expand All @@ -303,7 +303,7 @@ export class portal {
const [dstId, distances] = params
this.logger(`portal_historySendFindNodes`)
try {
const enr = this._history.routingTable.getValue(dstId)
const enr = this._history.routingTable.getWithPending(dstId)?.value
if (!enr) {
return
}
Expand All @@ -317,7 +317,7 @@ export class portal {
const [dstId, enrs, requestId] = params
this.logger(`portal_historySendNodes`)
try {
const enr = this._history.routingTable.getValue(dstId)
const enr = this._history.routingTable.getWithPending(dstId)?.value
if (!enr) {
return
}
Expand Down Expand Up @@ -351,18 +351,19 @@ export class portal {
this.logger(`historyRecursiveFindNodes request returned ${res}`)
return res ?? ''
}
async historyLocalContent(params: [string]): Promise<string> {
async historyLocalContent(params: [string]): Promise<string | undefined> {
const [contentKey] = params
this.logger(`Received historyLocalContent request for ${contentKey}`)

const res = await this._history.findContentLocally(fromHexString(contentKey))
this.logger(`historyLocalContent request returned ${res.length} bytes`)
return toHexString(res)
this.logger.extend(`historyLocalContent`)(`request returned ${res.length} bytes`)
this.logger.extend(`historyLocalContent`)(`${toHexString(res)}`)
return res.length > 0 ? toHexString(res) : undefined
}
async historyFindContent(params: [string, string]) {
const [enr, contentKey] = params
const nodeId = ENR.decodeTxt(enr).nodeId
if (!this._history.routingTable.getValue(nodeId)) {
if (!this._history.routingTable.getWithPending(nodeId)?.value) {
const pong = await this._history.sendPing(enr)
if (!pong) {
return ''
Expand Down Expand Up @@ -393,14 +394,14 @@ export class portal {
res.selector === 0 && this.logger.extend('findContent')('utp')
this.logger.extend('findContent')(content)
return {
content: toHexString(content),
content: content.length > 0 ? toHexString(content) : '',
utpTransfer: res.selector === 0,
}
}
async historySendFindContent(params: [string, string]) {
const [nodeId, contentKey] = params
const res = await this._history.sendFindContent(nodeId, fromHexString(contentKey))
const enr = this._history.routingTable.getValue(nodeId)
const enr = this._history.routingTable.getWithPending(nodeId)?.value
return res && enr && '0x' + enr.seq.toString(16)
}
async historySendContent(params: [string, string]) {
Expand All @@ -409,7 +410,7 @@ export class portal {
selector: 1,
value: fromHexString(content),
})
const enr = this._history.routingTable.getValue(nodeId)
const enr = this._history.routingTable.getWithPending(nodeId)?.value
this._client.sendPortalNetworkResponse(
{ nodeId, socketAddr: enr?.getLocationMultiaddr('udp')! },
enr!.seq,
Expand All @@ -430,7 +431,7 @@ export class portal {
const [enrHex, contentKeyHex, contentValueHex] = params
const enr = ENR.decodeTxt(enrHex)
const contentKey = decodeContentKey(contentKeyHex)
if (this._history.routingTable.getValue(enr.nodeId) === undefined) {
if (this._history.routingTable.getWithPending(enr.nodeId)?.value === undefined) {
const res = await this._history.sendPing(enr)
if (res === undefined) {
return '0x'
Expand All @@ -448,7 +449,7 @@ export class portal {
const [dstId, contentKeys] = params
const keys = contentKeys.map((key) => fromHexString(key))
const res = await this._history.sendOffer(dstId, keys)
const enr = this._history.routingTable.getValue(dstId)
const enr = this._history.routingTable.getWithPending(dstId)?.value
return res && enr && '0x' + enr.seq.toString(16)
}
async historySendAccept(params: [string, string, string[]]) {
Expand Down
4 changes: 2 additions & 2 deletions packages/portalnetwork/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent
this.discv5.on('talkReqReceived', this.onTalkReq)
this.discv5.on('talkRespReceived', this.onTalkResp)
this.uTP.on('Send', async (peerId: string, msg: Buffer, protocolId: ProtocolId) => {
const enr = this.protocols.get(protocolId)?.routingTable.getValue(peerId)
const enr = this.protocols.get(protocolId)?.routingTable.getWithPending(peerId)?.value
try {
await this.sendPortalNetworkMessage(enr ?? peerId, msg, protocolId, true)
this.uTP.emit('Sent')
Expand Down Expand Up @@ -365,7 +365,7 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent
// If ENR is not provided, look up ENR in protocol routing table by nodeId
const protocol = this.protocols.get(protocolId)
if (protocol) {
nodeAddr = protocol.routingTable.getValue(enr)
nodeAddr = protocol.routingTable.getWithPending(enr)?.value
if (!nodeAddr) {
// Check in unverified sessions cache if no ENR found in routing table
nodeAddr = this.unverifiedSessionCache.get(enr)
Expand Down
4 changes: 2 additions & 2 deletions packages/portalnetwork/src/subprotocols/contentLookup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ export class ContentLookup {
continue
}
// Send a PING request to check liveness of any unknown nodes
if (!this.protocol.routingTable.getValue(decodedEnr.nodeId)) {
if (!this.protocol.routingTable.getWithPending(decodedEnr.nodeId)?.value) {
const ping = await this.protocol.sendPing(decodedEnr)
if (!ping) {
this.protocol.routingTable.evictNode(decodedEnr.nodeId)
continue
}
}
if (!this.protocol.routingTable.getValue(decodedEnr.nodeId)) {
if (!this.protocol.routingTable.getWithPending(decodedEnr.nodeId)?.value) {
continue
}
// Calculate distance and add to list of lookup peers
Expand Down
2 changes: 1 addition & 1 deletion packages/portalnetwork/src/subprotocols/history/history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export class HistoryProtocol extends BaseProtocol {
* @returns the value of the FOUNDCONTENT response or undefined
*/
public sendFindContent = async (dstId: string, key: Uint8Array) => {
const enr = this.routingTable.getValue(dstId)
const enr = this.routingTable.getWithPending(dstId)?.value
if (!enr) {
this.logger(`No ENR found for ${shortId(dstId)}. FINDCONTENT aborted.`)
return
Expand Down
2 changes: 1 addition & 1 deletion packages/portalnetwork/src/subprotocols/nodeLookup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,6 @@ export class NodeLookup {
const res = await this.protocol.sendPing(enr)
if (res) this.protocol.routingTable.insertOrUpdate(enr, EntryStatus.Connected)
}
return this.protocol.routingTable.getValue(this.nodeSought)?.encodeTxt()
return this.protocol.routingTable.getWithPending(this.nodeSought)?.value.encodeTxt()
}
}
17 changes: 11 additions & 6 deletions packages/portalnetwork/src/subprotocols/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ export abstract class BaseProtocol extends EventEmitter {
abstract store(contentType: any, hashKey: string, value: Uint8Array): Promise<void>

public handle(message: ITalkReqMessage, src: INodeAddress) {
const enr = this.findEnr(src.nodeId)
if (enr !== undefined) {
this.updateRoutingTable(enr)
}
const id = message.id
const protocol = message.protocol
const request = message.request
Expand Down Expand Up @@ -164,7 +168,7 @@ export abstract class BaseProtocol extends EventEmitter {
}

handlePing = async (src: INodeAddress, id: bigint, pingMessage: PingMessage) => {
if (!this.routingTable.getValue(src.nodeId)) {
if (!this.routingTable.getWithPending(src.nodeId)?.value) {
// Check to see if node is already in corresponding network routing table and add if not
const enr = this.findEnr(src.nodeId)
if (enr !== undefined) {
Expand Down Expand Up @@ -207,7 +211,7 @@ export abstract class BaseProtocol extends EventEmitter {
})

try {
const enr = this.routingTable.getValue(dstId)
const enr = this.routingTable.getWithPending(dstId)?.value
if (!enr) {
return
}
Expand All @@ -221,7 +225,7 @@ export abstract class BaseProtocol extends EventEmitter {
(enr) => !this.routingTable.isIgnored(ENR.decode(Buffer.from(enr)).nodeId)
)
const unknown = notIgnored.filter(
(enr) => !this.routingTable.getValue(ENR.decode(Buffer.from(enr)).nodeId)
(enr) => !this.routingTable.getWithPending(ENR.decode(Buffer.from(enr)).nodeId)?.value
)
// Ping node if not currently in subprotocol routing table
for (const enr of unknown) {
Expand Down Expand Up @@ -315,7 +319,7 @@ export abstract class BaseProtocol extends EventEmitter {
selector: MessageCodes.OFFER,
value: offerMsg,
})
const enr = this.routingTable.getValue(dstId)
const enr = this.routingTable.getWithPending(dstId)?.value
if (!enr) {
this.logger(`No ENR found for ${shortId(dstId)}. OFFER aborted.`)
return
Expand All @@ -341,6 +345,7 @@ export abstract class BaseProtocol extends EventEmitter {
this.logger.extend('ACCEPT')(`No content ACCEPTed by ${shortId(dstId)}`)
return []
}
this.logger.extend(`OFFER`)(`ACCEPT message received with uTP id: ${id}`)

const requestedData: Uint8Array[] = []
for await (const key of requestedKeys) {
Expand Down Expand Up @@ -563,7 +568,7 @@ export abstract class BaseProtocol extends EventEmitter {
try {
const nodeId = enr.nodeId
// Only add node to the routing table if we have an ENR
this.routingTable.getValue(enr.nodeId) === undefined &&
this.routingTable.getWithPending(enr.nodeId)?.value === undefined &&
this.logger(`adding ${nodeId} to ${this.protocolName} routing table`)
this.routingTable.insertOrUpdate(enr, EntryStatus.Connected)
if (customPayload) {
Expand All @@ -573,7 +578,7 @@ export abstract class BaseProtocol extends EventEmitter {
} catch (err) {
this.logger(`Something went wrong: ${(err as any).message}`)
try {
this.routingTable.getValue(enr as any) === undefined &&
this.routingTable.getWithPending(enr as any)?.value === undefined &&
this.logger(`adding ${enr as any} to ${this.protocolName} routing table`)
this.routingTable.insertOrUpdate(enr, EntryStatus.Connected)
if (customPayload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
// this.logger(
// `Received Rendezvous FIND request for ${shortId(dstId)} on ${protocolId} network`
// )
// let enr = protocol.routingTable.getValue(dstId)
// let enr = protocol.routingTable.getWithPending(dstId)
// if (!enr) {
// enr = this.client.discv5.getKadValue(dstId)
// if (!enr) {
Expand Down Expand Up @@ -108,7 +108,7 @@
// this.logger(
// `Received Rendezvous SYNC from requestor ${shortId(srcId)} for target ${shortId(dstId)}`
// )
// const srcEnr = protocol.routingTable.getValue(srcId)
// const srcEnr = protocol.routingTable.getWithPending(srcId)
// const payload = Buffer.concat([
// Uint8Array.from([2]),
// Buffer.from(protocolId.slice(2), 'hex'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class PacketManager {
...opts,
version: 1,
timestampMicroseconds: Bytes32TimeStamp(),
connectionId: this.rcvConnectionId,
connectionId: opts.connectionId ?? this.rcvConnectionId,
timestampDifferenceMicroseconds: this.congestionControl.reply_micro,
wndSize: Math.min(
2 ** 32 - 1,
Expand Down
7 changes: 7 additions & 0 deletions packages/portalnetwork/src/wire/utp/Packets/PacketTyping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ interface IHeaderInput<T extends PacketType> {
interface IBasicHeaderInput<T extends PacketType> extends IHeaderInput<T> {
pType: PacketType
extension: HeaderExtension.none
connectionId: Uint16
}
export interface ISelectiveAckHeaderInput extends IHeaderInput<PacketType.ST_STATE> {
pType: PacketType.ST_STATE
extension: HeaderExtension.selectiveAck
bitmask: Uint8Array
connectionId: Uint16
}
export type HeaderInput<T extends PacketType> = IBasicHeaderInput<T> | ISelectiveAckHeaderInput

Expand Down Expand Up @@ -73,21 +75,25 @@ export interface IPacket<T extends PacketType> {
pType: T
seqNr: number
ackNr: number
connectionId: number
payload?: Uint8Array
}
export interface IBasic<T extends PacketType> extends IPacket<T> {
pType: T
extension: HeaderExtension.none
connectionId: number
}
export interface ISelectiveAck extends IPacket<PacketType.ST_STATE> {
pType: PacketType.ST_STATE
extension: HeaderExtension.selectiveAck
bitmask: Uint8Array
connectionId: number
}
export interface IData extends IPacket<PacketType.ST_DATA> {
pType: PacketType.ST_DATA
extension: HeaderExtension.none
payload: Uint8Array
connectionId: number
}
export type ICreate<T extends PacketType> = IBasic<T> | ISelectiveAck | IData

Expand All @@ -105,6 +111,7 @@ export interface UtpSocketOptions {

interface ICreatePacket<T> {
pType: T
connectionId?: number
}
interface ICreateSelectiveAck extends ICreatePacket<PacketType.ST_STATE> {
bitmask: Uint8Array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ export class ContentRequest {
}

async sendSyn(): Promise<void> {
await this.socket.sendSynPacket()
await this.socket.sendSynPacket(
this.requestCode === RequestCode.OFFER_WRITE
? this.socket.sndConnectionId
: this.socket.rcvConnectionId
)
this.socket.state = ConnectionState.SynSent
}
}
Loading