-
Notifications
You must be signed in to change notification settings - Fork 16
/
timeout_manager.go
101 lines (84 loc) · 3.05 KB
/
timeout_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package nex
import (
"context"
"time"
)
// TimeoutManager is an implementation of rdv::TimeoutManager and manages the resending of reliable PRUDP packets
type TimeoutManager struct {
// ctx is the parent of every per-packet timeout context; cancelling it ends all pending timeouts
ctx context.Context
// cancel cancels ctx; it is invoked by Stop
cancel context.CancelFunc
// packets holds the packets still awaiting acknowledgement, keyed by sequence ID
packets *MutexMap[uint16, PRUDPPacketInterface]
// streamSettings supplies the RTTRetransmit and MaxPacketRetransmissions limits used by this manager
streamSettings *StreamSettings
}
// SchedulePacketTimeout registers a packet as pending and starts its retransmit timer.
//
// The retransmit timeout is computed by the sending connection's endpoint, a
// timeout context derived from the manager's context is attached to the packet,
// and a goroutine is launched that wakes once that context is done.
func (tm *TimeoutManager) SchedulePacketTimeout(packet PRUDPPacketInterface) {
	retransmitTimeout := packet.Sender().Endpoint().(*PRUDPEndPoint).ComputeRetransmitTimeout(packet)
	timeoutCtx, timeoutCancel := context.WithTimeout(tm.ctx, retransmitTimeout)

	pending := NewTimeout()
	pending.SetRTO(retransmitTimeout)
	pending.ctx, pending.cancel = timeoutCtx, timeoutCancel
	packet.setTimeout(pending)

	// * Track the packet before the waiter starts so the waiter can find it
	tm.packets.Set(packet.SequenceID(), packet)

	go tm.start(packet)
}
// AcknowledgePacket marks a pending packet as acknowledged and removes it from
// the set of tracked packets. The packet's waiting goroutine will find it gone
// at the next resend attempt and exit without resending.
//
// If the packet's send count is still within streamSettings.RTTRetransmit, the
// time elapsed since it was last sent is fed into the connection's RTT
// estimator. Packets resent more often than that are skipped, since the
// acknowledgement cannot be matched to a specific transmission.
func (tm *TimeoutManager) AcknowledgePacket(sequenceID uint16) {
	// * Acknowledge the packet
	tm.packets.RunAndDelete(sequenceID, func(_ uint16, packet PRUDPPacketInterface) {
		// * Update the RTT on the connection only if the packet hasn't been resent
		// * beyond the configured threshold. This must be `<=`: using `>=` would
		// * sample RTT exclusively from heavily retransmitted packets
		if packet.SendCount() <= tm.streamSettings.RTTRetransmit {
			rttm := time.Since(packet.SentAt())
			packet.Sender().(*PRUDPConnection).rtt.Adjust(rttm)
		}
	})
}
// start blocks until the packet's current timeout context is done and then
// decides what to do with the packet. It is run in its own goroutine, once per
// armed timeout.
//
// Outcomes, in the order they are checked:
//   - the manager's context is cancelled (Stop was called): return
//   - the sending connection is not in StateConnected: return
//   - the packet is no longer tracked (acknowledged or cleared): return
//   - the send count is below MaxPacketRetransmissions: re-arm the timeout,
//     spawn a new waiter and resend the packet
//   - otherwise: treat the connection as dead and clean it up
func (tm *TimeoutManager) start(packet PRUDPPacketInterface) {
	<-packet.getTimeout().ctx.Done()

	// * Every packet timeout is derived from tm.ctx, so cancelling it wakes all
	// * waiters at once. Without this check a woken waiter could resend and re-arm
	// * against the already-cancelled parent, which fires immediately and repeats
	// * until the retransmission limit is reached
	if tm.ctx.Err() != nil {
		return
	}

	connection := packet.Sender().(*PRUDPConnection)

	// * If the connection is closed stop trying to resend
	if connection.ConnectionState != StateConnected {
		return
	}

	// * The packet was acknowledged (or cleared) while we were waiting
	if !tm.packets.Has(packet.SequenceID()) {
		return
	}

	// * This is `<` instead of `<=` for accuracy with observed behavior, even though we're comparing send count vs _resend_ max
	if packet.SendCount() < tm.streamSettings.MaxPacketRetransmissions {
		endpoint := packet.Sender().Endpoint().(*PRUDPEndPoint)

		packet.incrementSendCount()
		packet.setSentAt(time.Now())
		rto := endpoint.ComputeRetransmitTimeout(packet)

		ctx, cancel := context.WithTimeout(tm.ctx, rto)

		// * Reuse the packet's existing timeout object, replacing its expired context
		timeout := packet.getTimeout()
		timeout.timeout = rto
		timeout.ctx = ctx
		timeout.cancel = cancel

		// * Schedule the packet to be resent
		go tm.start(packet)

		// * Resend the packet to the connection
		server := connection.endpoint.Server
		data := packet.Bytes()
		server.sendRaw(connection.Socket, data)

		return
	}

	// * Packet has been retried too many times, consider the connection dead
	connection.Lock()
	defer connection.Unlock()

	connection.cleanup()
}
// Stop shuts the resend scheduler down: it cancels the manager's context, which
// ends every pending packet timeout derived from it, and then empties the set
// of tracked packets.
func (tm *TimeoutManager) Stop() {
	// * Nothing needs to happen per packet when the map is emptied
	discard := func(uint16, PRUDPPacketInterface) {}

	tm.cancel()
	tm.packets.Clear(discard)
}
// NewTimeoutManager returns a ready-to-use TimeoutManager with its own
// cancellable context, an empty pending-packet map and a fresh set of stream
// settings.
func NewTimeoutManager() *TimeoutManager {
	tm := new(TimeoutManager)

	tm.ctx, tm.cancel = context.WithCancel(context.Background())
	tm.packets = NewMutexMap[uint16, PRUDPPacketInterface]()
	tm.streamSettings = NewStreamSettings()

	return tm
}