Skip to content

Commit

Permalink
Fix overflow issues in seqNr and ackNr (#692)
Browse files Browse the repository at this point in the history
* switch contentWRiter data chunks to array

* Fix test

* remove `only` modifier

* Use robots suggestion for cleaner code
  • Loading branch information
acolytec3 authored Dec 17, 2024
1 parent c652435 commit c7e5cf8
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { VERSION } from '../Utils/constants.js'

import { SelectiveAckHeaderExtension } from './Extensions.js'

import type { Uint8, Uint16, Uint32 } from '../index.js'
import type { Uint16, Uint32, Uint8 } from '../index.js'
import type {
HeaderInput,
ISelectiveAckHeaderInput,
Expand Down
3 changes: 2 additions & 1 deletion packages/portalnetwork/src/wire/utp/Socket/ContentReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ export class ContentReader {
}

readPacket(payload: Uint8Array) {
this.nextDataNr!++
// Wrap seqNr back to 0 when it exceeds 16-bit max integer
this.nextDataNr! = (this.nextDataNr! + 1) % 65536
this.bytes.push(...payload)
this.logger.extend('BYTES')(
`Current stream: ${this.bytes.length} / ${this.bytesExpected} bytes. ${this.bytesExpected - this.bytes.length} bytes till end of content.`,
Expand Down
29 changes: 15 additions & 14 deletions packages/portalnetwork/src/wire/utp/Socket/ContentWriter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,31 @@ export class ContentWriter {
content: Uint8Array
writing: boolean
sentChunks: number[]
dataChunks: Record<number, Uint8Array>
dataNrs: number[]
dataChunks: Array<[number, Uint8Array]>

constructor(socket: WriteSocket, content: Uint8Array, startingSeqNr: number, logger: Debugger) {
this.socket = socket
this.content = content
this.startingSeqNr = startingSeqNr
this.seqNr = startingSeqNr
this.writing = false
this.sentChunks = []
this.dataNrs = []
this.logger = logger.extend('WRITING')
this.dataChunks = {}
this.dataChunks = []
}

async write(): Promise<void> {
if (!this.writing) return
const chunks = Object.keys(this.dataChunks).length
const totalChunks = this.dataChunks.length
let bytes: Uint8Array
if (this.sentChunks.length < chunks) {
bytes = this.dataChunks[this.seqNr] ?? []
!this.sentChunks.includes(this.seqNr) && this.sentChunks.push(this.seqNr)
if (this.sentChunks.length < totalChunks) {
bytes = this.dataChunks[this.sentChunks.length][1] ?? []
this.sentChunks.push(this.seqNr)
this.logger(
`Sending ST-DATA ${this.seqNr - this.startingSeqNr + 1}/${chunks} -- SeqNr: ${this.seqNr}`,
`Sending ST-DATA ${this.sentChunks.length}/${totalChunks} -- SeqNr: ${this.seqNr}`,
)
this.seqNr = this.sentChunks.slice(-1)[0] + 1
// Wrap seqNr back to 0 when it exceeds 16-bit max integer
this.seqNr = (this.sentChunks[this.sentChunks.length - 1] + 1) % 65536
await this.socket.sendDataPacket(bytes)
return
}
Expand All @@ -48,17 +48,18 @@ export class ContentWriter {
await this.write()
}

chunk(): Record<number, Uint8Array> {
chunk(): [number, Uint8Array][] {
let arrayMod = this.content
const total = Math.ceil(this.content.length / BUFFER_SIZE)
this.logger(`Preparing content for transfer as ${total} ${BUFFER_SIZE} byte chunks.`)
const dataChunks: Record<number, Uint8Array> = {}
const dataChunks = new Array<[number, Uint8Array]>(total)
let seqNr = this.startingSeqNr
for (let i = 0; i < total; i++) {
const start = 0
const end = arrayMod.length > 512 ? 512 : undefined
dataChunks[i + this.startingSeqNr] = arrayMod.subarray(start, end)
dataChunks[i] = [seqNr, arrayMod.subarray(start, end)]
arrayMod = arrayMod.subarray(end)
this.dataNrs.push(i + this.startingSeqNr)
seqNr = (seqNr + 1) % 65536 // Wrap seqNr back to 0 when it exceeds 16-bit max integer
}
this.logger(`Ready to send ${total} Packets starting at SeqNr: ${this.startingSeqNr}`)
return dataChunks
Expand Down
3 changes: 3 additions & 0 deletions packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ export abstract class UtpSocket {
extension,
}
opts.pType === PacketType.ST_DATA && this.seqNr++
if (this.seqNr > 65535) {
this.seqNr = 0
}
return this.packetManager.createPacket<T>(params)
}

Expand Down
8 changes: 3 additions & 5 deletions packages/portalnetwork/src/wire/utp/Socket/WriteSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class WriteSocket extends UtpSocket {
this.close()
}
compare(): boolean {
if (!this.ackNrs.includes(undefined) && this.ackNrs.length === this.writer!.dataNrs.length) {
if (!this.ackNrs.includes(undefined) && this.ackNrs.length === this.writer!.dataChunks.length) {
return true
}
return false
Expand All @@ -71,7 +71,7 @@ export class WriteSocket extends UtpSocket {
this._clearTimeout()
}
logProgress() {
const needed = this.writer!.dataNrs.filter((n) => !this.ackNrs.includes(n))
const needed = this.writer!.dataChunks.filter((n) => !this.ackNrs.includes(n[0]))
this.logger(
`AckNr's received (${this.ackNrs.length}/${
this.writer!.sentChunks.length
Expand All @@ -89,9 +89,7 @@ export class WriteSocket extends UtpSocket {
`)
}
updateAckNrs(ackNr: number) {
this.ackNrs = Object.keys(this.writer!.dataChunks)
.filter((n) => parseInt(n) <= ackNr)
.map((n) => parseInt(n))
this.ackNrs = this.writer!.dataChunks.filter((n) => n[0] <= ackNr).map((n) => n[0])
}
async sendDataPacket(bytes: Uint8Array): Promise<void> {
this.state = ConnectionState.Connected
Expand Down
38 changes: 32 additions & 6 deletions packages/portalnetwork/test/wire/utp/socket.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,21 @@ const _read = async (networkId: NetworkId): Promise<ReadSocket> => {
networkId,
ackNr: DEFAULT_RAND_ACKNR,
seqNr: DEFAULT_RAND_SEQNR,
remoteAddress: '1234',
enr: 'enr:1234' as any,
rcvId: readId,
sndId: writeId,
logger: debug('test'),
type: UtpSocketType.READ,
}) as ReadSocket
}
const _write = async (networkId: NetworkId): Promise<WriteSocket> => {
const _write = async (networkId: NetworkId, seqNr?: number): Promise<WriteSocket> => {
const client = await PortalNetwork.create({ bindAddress: '127.0.0.1' })
return createUtpSocket({
utp: new PortalNetworkUTP(client),
networkId,
ackNr: DEFAULT_RAND_ACKNR,
seqNr: DEFAULT_RAND_SEQNR,
remoteAddress: '1234',
seqNr: seqNr ?? DEFAULT_RAND_SEQNR,
enr: 'enr:1234' as any,
rcvId: writeId,
sndId: readId,
logger: debug('test'),
Expand Down Expand Up @@ -327,9 +327,24 @@ describe('uTP Socket Tests', async () => {
s.setWriter(s.getSeqNr())
it('socket.compare()', () => {
s.ackNrs = [0, 1, 2, 3, 4, 5]
s.writer!.dataNrs = [0, 1, 2, 3, 4, 5]
s.writer!.dataChunks = [
[0, Uint8Array.from([111])],
[1, Uint8Array.from([222])],
[2, Uint8Array.from([333])],
[3, Uint8Array.from([444])],
[4, Uint8Array.from([555])],
[5, Uint8Array.from([666])],
]
assert.ok(s.compare(), 'socket.compare() returns true for matching ackNrs and dataNrs')
s.writer!.dataNrs = [0, 1, 2, 3, 4, 5, 6]
s.writer!.dataChunks = [
[0, Uint8Array.from([111])],
[1, Uint8Array.from([222])],
[2, Uint8Array.from([333])],
[3, Uint8Array.from([444])],
[4, Uint8Array.from([555])],
[5, Uint8Array.from([666])],
[6, Uint8Array.from([777])],
]
assert.notOk(s.compare(), 'socket.compare() returns false for mismatched ackNrs and dataNrs')
s.ackNrs = [0, 1, 2, 3, 4, 6, 5]
assert.ok(
Expand Down Expand Up @@ -378,3 +393,14 @@ describe('uTP Socket Tests', async () => {
)
})
})
describe('seqNr overflow', () => {
it('should reset seqNr to 0', async () => {
const s = await _write(NetworkId.HistoryNetwork, 65535)
s.logger = debug('test')
s.content = new Uint8Array(1024)
s.setWriter(s.getSeqNr())
await s.writer?.write()
await s.writer?.write()
assert.equal(s.getSeqNr(), 1, 'seqNr should be reset to 0')
})
})
16 changes: 8 additions & 8 deletions packages/portalnetwork/test/wire/utp/utp.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ describe('uTP Reader/Writer tests', async () => {
Math.ceil(sampleSize / 512),
'ContentWriter chunked',
)
const totalLength = Object.values(contentChunks).reduce((acc, chunk) => acc + chunk.length, 0)
const totalLength = contentChunks.reduce((acc, chunk) => acc + chunk[1].length, 0)
assert.equal(totalLength, sampleSize, 'ContentWriter chunked all bytes')
const packets = Object.values(contentChunks).map((chunk, i) => {
const packets = contentChunks.map((chunk, i) => {
return Packet.fromOpts({
header: {
seqNr: i,
Expand All @@ -59,10 +59,10 @@ describe('uTP Reader/Writer tests', async () => {
timestampMicroseconds: 0,
wndSize: 0,
},
payload: chunk,
payload: chunk[1],
})
})
assert.equal(packets.length, Object.values(contentChunks).length, 'Packets created')
assert.equal(packets.length, contentChunks.length, 'Packets created')
let sent = 0
for (const [i, packet] of packets.entries()) {
reader.addPacket(packet)
Expand All @@ -87,9 +87,9 @@ describe('uTP Reader/Writer tests', async () => {
Math.ceil(content.length / 512),
'ContentWriter chunked',
)
const totalLength = Object.values(contentChunks).reduce((acc, chunk) => acc + chunk.length, 0)
const totalLength = contentChunks.reduce((acc, chunk) => acc + chunk[1].length, 0)
assert.equal(totalLength, content.length, 'ContentWriter chunked all bytes')
const packets = Object.values(contentChunks).map((chunk, i) => {
const packets = contentChunks.map((chunk, i) => {
return Packet.fromOpts({
header: {
seqNr: i,
Expand All @@ -102,10 +102,10 @@ describe('uTP Reader/Writer tests', async () => {
timestampMicroseconds: 0,
wndSize: 0,
},
payload: chunk,
payload: chunk[1],
})
})
assert.equal(packets.length, Object.values(contentChunks).length, 'Packets created')
assert.equal(packets.length, contentChunks.length, 'Packets created')
let sent = 0
for (const [i, packet] of packets.entries()) {
reader.addPacket(packet)
Expand Down

0 comments on commit c7e5cf8

Please sign in to comment.