Skip to content

Commit

Permalink
Fix lost peer change update
Browse files Browse the repository at this point in the history
This is an old informer issue where a delete followed by an add (or an add
followed by a delete) could get converted to an update call, which messes up
the peer syncing. This causes ingestors to sometimes get stuck with old
addresses and accumulate old segments that they think they should transfer.
This fixes the first part of the problem so that the addresses are eventually
kept in sync.
  • Loading branch information
jwilder committed Jun 22, 2023
1 parent 745ec19 commit 8b81fa7
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 22 deletions.
58 changes: 39 additions & 19 deletions ingestor/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type coordinator struct {
hostname string
groupName string
cancel context.CancelFunc
wg sync.WaitGroup
}

type CoordinatorOpts struct {
Expand Down Expand Up @@ -142,6 +143,11 @@ func (c *coordinator) OnDelete(obj interface{}) {
}

func (c *coordinator) isPeer(p *v1.Pod) bool {
// Determine if peer discovery is enabled or not
if c.namespace == "" || c.groupName == "" {
return false
}

if p.Namespace != c.namespace {
return false
}
Expand Down Expand Up @@ -189,7 +195,6 @@ func (c *coordinator) Open(ctx context.Context) error {
set := make(map[string]string)
set[c.hostname] = c.hostEntpoint
c.peers = set
logger.Info("Peer changed %s addr=%s ready=%v", hostName, myIP.To4().String(), "true")

if _, err := podsInformer.AddEventHandler(c); err != nil {
return err
Expand All @@ -199,6 +204,9 @@ func (c *coordinator) Open(ctx context.Context) error {
return err
}

c.wg.Add(1)
go c.resyncPeers(ctx)

return nil
}

Expand All @@ -210,6 +218,7 @@ func (c *coordinator) Owner(b []byte) (string, string) {

// Close stops the coordinator and always returns nil.
//
// The steps run in a fixed order: invoke the stored cancel function, wait for
// every goroutine registered on c.wg to finish, and only then shut down
// c.factory.
func (c *coordinator) Close() error {
// Signal cancellation. NOTE(review): presumably this cancels the context that
// resyncPeers selects on — confirm against Open, otherwise Wait below blocks.
c.cancel()
// Block until each goroutine tracked by the wait group has called Done.
c.wg.Wait()
c.factory.Shutdown()
return nil
}
Expand All @@ -228,6 +237,7 @@ func (c *coordinator) syncPeers() error {
return fmt.Errorf("list pods: %w", err)
}

set := make(map[string]string, len(c.peers))
for _, p := range pods {
if p.Status.PodIP == "" {
continue
Expand All @@ -237,41 +247,51 @@ func (c *coordinator) syncPeers() error {
continue
}

_, isPeer := c.peers[p.Name]
ready := IsPodReady(p)

// if the peer is not ready and already in our peer set, remove it.
if !ready && isPeer {
logger.Info("Peer changed %s addr=%s ready=%v", p.Name, p.Status.PodIP, ready)
delete(c.peers, p.Name)
continue
}

// If the peer is not ready and not already in our peer set, skip it.
if !ready || isPeer {
if !ready {
continue
}

logger.Info("Peer changed %s addr=%s ready=%v", p.Name, p.Status.PodIP, ready)

c.peers[p.Name] = fmt.Sprintf("https://%s:9090/transfer", p.Status.PodIP)
set[p.Name] = fmt.Sprintf("https://%s:9090/transfer", p.Status.PodIP)
}

set := make(map[string]string, len(c.peers))
for peer, addr := range c.peers {
set[peer] = addr
c.peers = make(map[string]string, len(set))
for peer, addr := range set {
c.peers[peer] = addr
}

part, err := NewPartition(set)
if err != nil {
return err
}
c.part = part
c.peers = set

return nil
}

// resyncPeers runs until ctx is cancelled, calling syncPeers once a minute and
// then logging every entry currently held in c.peers.
//
// It marks c.wg as done when it returns, so the caller must have called
// c.wg.Add(1) before starting it as a goroutine. A failed sync is logged and
// does not stop the loop; the peer set is logged afterwards either way.
func (c *coordinator) resyncPeers(ctx context.Context) {
	defer c.wg.Done()

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	done := ctx.Done()
	for {
		// Wait for either cancellation or the next tick.
		select {
		case <-done:
			return
		case <-ticker.C:
		}

		if syncErr := c.syncPeers(); syncErr != nil {
			logger.Error("Failed to reconfigure peers: %s", syncErr)
		}

		// Hold the read lock only while iterating the peer map.
		c.mu.RLock()
		for name, endpoint := range c.peers {
			logger.Info("Peers updated %s addr=%s ready=%v", name, endpoint, "true")
		}
		c.mu.RUnlock()
	}
}

// Get preferred outbound ip of this machine
func GetOutboundIP() (net.IP, error) {
conn, err := net.Dial("udp", "169.254.169.25:80")
Expand Down
6 changes: 3 additions & 3 deletions ingestor/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestCoordinator_DiscoveryDisabled(t *testing.T) {
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "ingestor-0",
Name: "ingestor-1",
Namespace: "adx-mon",
OwnerReferences: []metav1.OwnerReference{
{
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestCoordinator_DiscoveryDisabled(t *testing.T) {
WriteTimeSeriesFn: nil,
K8sCli: kcli,
Namespace: "",
Hostname: "",
Hostname: "ingestor-0",
InsecureSkipVerify: false,
})
require.NoError(t, c.Open(context.Background()))
Expand All @@ -144,7 +144,7 @@ func TestCoordinator_DiscoveryDisabled(t *testing.T) {
require.NoError(t, err)

coord := c.(*coordinator)
require.Equal(t, 1, len(coord.peers))
require.Equal(t, 0, len(coord.peers))

}

Expand Down

0 comments on commit 8b81fa7

Please sign in to comment.