Skip to content

Commit

Permalink
Merge pull request lightninglabs#90 from lightninglabs/gbnCleanup
Browse files Browse the repository at this point in the history
gbn+mailbox: cleanup & prefixed logger
  • Loading branch information
ellemouton authored Nov 28, 2023
2 parents 9cf86bc + 72e5adf commit fa60171
Show file tree
Hide file tree
Showing 12 changed files with 366 additions and 242 deletions.
62 changes: 62 additions & 0 deletions gbn/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package gbn

import "time"

// config holds the configuration values for an instance of GoBackNConn.
type config struct {
// n is the window size. The sender can send a maximum of n packets
// before requiring an ack from the receiver for the first packet in
// the window. The value of n is chosen by the client during the
// GoBN handshake.
n uint8

// s is the maximum sequence number used to label packets. Packets
// are labelled with incrementing sequence numbers modulo s.
// s must be strictly larger than the window size, n. This
// is so that the receiver can tell if the sender is resending the
// previous window (maybe the sender did not receive the acks) or if
// they are sending the next window. If s <= n then there would be
// no way to tell.
s uint8

// maxChunkSize is the maximum payload size in bytes allowed per
// message. If the payload to be sent is larger than maxChunkSize then
// the payload will be split between multiple packets.
// If maxChunkSize is zero then it is disabled and data won't be split
// between packets.
maxChunkSize int

// resendTimeout is the duration that will be waited before resending
// the packets in the current queue.
resendTimeout time.Duration

// recvFromStream is the function that will be used to acquire the next
// available packet.
recvFromStream recvBytesFunc

// sendToStream is the function that will be used to send over our next
// packet.
sendToStream sendBytesFunc

// handshakeTimeout is the time after which the server or client
// will abort and restart the handshake if the expected response is
// not received from the peer.
handshakeTimeout time.Duration

pingTime time.Duration
pongTime time.Duration
}

// newConfig constructs a new config struct.
func newConfig(sendFunc sendBytesFunc, recvFunc recvBytesFunc,
n uint8) *config {

return &config{
n: n,
s: n + 1,
recvFromStream: recvFunc,
sendToStream: sendFunc,
resendTimeout: defaultResendTimeout,
handshakeTimeout: defaultHandshakeTimeout,
}
}
38 changes: 22 additions & 16 deletions gbn/gbn_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ func NewClientConn(ctx context.Context, n uint8, sendFunc sendBytesFunc,
math.MaxUint8)
}

conn := newGoBackNConn(ctx, sendFunc, receiveFunc, false, n)
cfg := newConfig(sendFunc, receiveFunc, n)

// Apply functional options
for _, o := range opts {
o(conn)
o(cfg)
}

conn := newGoBackNConn(ctx, cfg, "client")

if err := conn.clientHandshake(); err != nil {
if err := conn.Close(); err != nil {
log.Errorf("error closing gbn ClientConn: %v", err)
conn.log.Errorf("error closing gbn ClientConn: %v", err)
}
return nil, err
}
Expand Down Expand Up @@ -76,7 +78,7 @@ func (g *GoBackNConn) clientHandshake() error {
case <-recvNext:
}

b, err := g.recvFromStream(g.ctx)
b, err := g.cfg.recvFromStream(g.ctx)
if err != nil {
errChan <- err
return
Expand All @@ -101,21 +103,22 @@ func (g *GoBackNConn) clientHandshake() error {
handshake:
for {
// start Handshake
msg := &PacketSYN{N: g.n}
msg := &PacketSYN{N: g.cfg.n}
msgBytes, err := msg.Serialize()
if err != nil {
return err
}

// Send SYN
log.Debugf("Client sending SYN")
if err := g.sendToStream(g.ctx, msgBytes); err != nil {
g.log.Debugf("Sending SYN")
if err := g.cfg.sendToStream(g.ctx, msgBytes); err != nil {
return err
}

for {
// Wait for SYN
log.Debugf("Client waiting for SYN")
g.log.Debugf("Waiting for SYN")

select {
case recvNext <- 1:
case <-g.quit:
Expand All @@ -127,8 +130,10 @@ handshake:

var b []byte
select {
case <-time.After(g.handshakeTimeout):
log.Debugf("SYN resendTimeout. Resending SYN.")
case <-time.After(g.cfg.handshakeTimeout):
g.log.Debugf("SYN resendTimeout. Resending " +
"SYN.")

continue handshake
case <-g.quit:
return nil
Expand All @@ -144,7 +149,8 @@ handshake:
return err
}

log.Debugf("Client got %T", resp)
g.log.Debugf("Got %T", resp)

switch r := resp.(type) {
case *PacketSYN:
respSYN = r
Expand All @@ -159,24 +165,24 @@ handshake:
}
}

log.Debugf("Client got SYN")
g.log.Debugf("Got SYN")

if respSYN.N != g.n {
if respSYN.N != g.cfg.n {
return io.EOF
}

// Send SYNACK
log.Debugf("Client sending SYNACK")
g.log.Debugf("Sending SYNACK")
synack, err := new(PacketSYNACK).Serialize()
if err != nil {
return err
}

if err := g.sendToStream(g.ctx, synack); err != nil {
if err := g.cfg.sendToStream(g.ctx, synack); err != nil {
return err
}

log.Debugf("Client Handshake complete")
g.log.Debugf("Handshake complete")

return nil
}
Loading

0 comments on commit fa60171

Please sign in to comment.