Skip to content

Commit

Permalink
fix skip label check
Browse files Browse the repository at this point in the history
Memberlist allows having label and to check them on receiving a message
to ensure rings don't collide unexpectedly. The option to skip that
checks currently doesn't work and will fail the message unless the
labels are exactly the same.

This PR change that behaviour to ensure that the option to skip the
label check does that.

Signed-off-by: Loic Reyreaud <loic@weaviate.io>
  • Loading branch information
reyreaud-l committed Sep 11, 2024
1 parent 3f82dc1 commit 6abd7fd
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 12 deletions.
11 changes: 1 addition & 10 deletions net.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,6 @@ func (m *Memberlist) handleConn(conn net.Conn) {
}

if m.config.SkipInboundLabelCheck {
if streamLabel != "" {
m.logger.Printf("[ERR] memberlist: unexpected double stream label header: %s", LogConn(conn))
return
}
// Set this from config so that the auth data assertions work below.
streamLabel = m.config.Label
}
Expand Down Expand Up @@ -372,10 +368,6 @@ func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time
}

if m.config.SkipInboundLabelCheck {
if packetLabel != "" {
m.logger.Printf("[ERR] memberlist: unexpected double packet label header: %s", LogAddress(from))
return
}
// Set this from config so that the auth data assertions work below.
packetLabel = m.config.Label
}
Expand Down Expand Up @@ -1118,10 +1110,9 @@ func (m *Memberlist) decryptRemoteState(bufConn io.Reader, streamLabel string) (

if moreBytes > maxPushStateBytes {
return nil, fmt.Errorf("Remote node state is larger than limit (%d)", moreBytes)

}

//Start reporting the size before you cross the limit
// Start reporting the size before you cross the limit
if moreBytes > uint32(math.Floor(.6*maxPushStateBytes)) {
m.logger.Printf("[WARN] memberlist: Remote node state size is (%d) limit is (%d)", moreBytes, maxPushStateBytes)
}
Expand Down
57 changes: 55 additions & 2 deletions transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func TestTransport_Join(t *testing.T) {
if m2.estNumNodes() != 2 {
t.Fatalf("bad: %v", m2.Members())
}

}

func TestTransport_Send(t *testing.T) {
Expand Down Expand Up @@ -155,7 +154,6 @@ func (tw testCountingWriter) Write(p []byte) (n int, err error) {
// do not result in a tight loop and spam the log. We verify this here by counting the number
// of entries logged in a given time period.
func TestTransport_TcpListenBackoff(t *testing.T) {

// testTime is the amount of time we will allow NetTransport#tcpListen() to run
// This needs to be long enough that to verify that maxDelay is in force,
// but not so long as to be obnoxious when running the test suite.
Expand Down Expand Up @@ -206,3 +204,58 @@ func TestTransport_TcpListenBackoff(t *testing.T) {
// no connections should have been accepted and sent to the channel
require.Equal(t, len(transport.streamCh), 0)
}

func TestTransport_LabelsAndSkip(t *testing.T) {
net := &MockNetwork{}

tests := []struct {
label1 string
label2 string
skip1 bool
skip2 bool
expectSuccess bool
}{
{label1: "label1", label2: "label2"},
{label1: "label1", label2: "label1", expectSuccess: true},
{label1: "label1", label2: "label2", skip1: true, skip2: true, expectSuccess: true},
{label1: "label1", label2: "label1", skip1: true, skip2: true, expectSuccess: true},
}
for _, test := range tests {
t1 := net.NewTransport("node1")
c1 := DefaultLANConfig()
c1.Name = "node1"
c1.Label = test.label1
c1.SkipInboundLabelCheck = test.skip1
c1.Transport = t1
m1, err := Create(c1)
if err != nil {
t.Fatalf("err: %v", err)
}
m1.setAlive()
m1.schedule()

c2 := DefaultLANConfig()
c2.Name = "node2"
c2.Label = test.label2
c2.SkipInboundLabelCheck = test.skip2
c2.Transport = net.NewTransport("node2")
m2, err := Create(c2)
if err != nil {
t.Fatalf("err: %v", err)
}
m2.setAlive()
m2.schedule()

_, err = m2.Join([]string{c1.Name + "/" + t1.addr.String()})
// First shutdown everything so that the next iteration can set it up again
m1.Shutdown()
m2.Shutdown()

// Then check if we expected success or not
if test.expectSuccess && err != nil {
t.Fatalf("unexpected error: %v", err)
} else if !test.expectSuccess && err == nil {
t.Fatalf("expected an error")
}
}
}

0 comments on commit 6abd7fd

Please sign in to comment.