diff --git a/backoff.go b/backoff.go new file mode 100644 index 0000000..8ee06f6 --- /dev/null +++ b/backoff.go @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Matt Brittan + * Daichi Tomaru + */ + +package mqtt + +import ( + "sync" + "time" +) + +// Controller for sleep with backoff when the client attempts reconnection +// It has statuses for each situations cause reconnection. +type backoffController struct { + sync.RWMutex + statusMap map[string]*backoffStatus +} + +type backoffStatus struct { + lastSleepPeriod time.Duration + lastErrorTime time.Time +} + +func newBackoffController() *backoffController { + return &backoffController{ + statusMap: map[string]*backoffStatus{}, + } +} + +// Calculate next sleep period from the specified parameters. +// Returned values are next sleep period and whether the error situation is continual. +// If connection errors continuouslly occurs, its sleep period is exponentially increased. +// Also if there is a lot of time between last and this error, sleep period is initialized. +func (b *backoffController) getBackoffSleepTime( + situation string, initSleepPeriod time.Duration, maxSleepPeriod time.Duration, processTime time.Duration, skipFirst bool, +) (time.Duration, bool) { + // Decide first sleep time if the situation is not continual. + var firstProcess = func(status *backoffStatus, init time.Duration, skip bool) (time.Duration, bool) { + if skip { + status.lastSleepPeriod = 0 + return 0, false + } + status.lastSleepPeriod = init + return init, false + } + + // Prioritize maxSleep. + if initSleepPeriod > maxSleepPeriod { + initSleepPeriod = maxSleepPeriod + } + b.Lock() + defer b.Unlock() + + status, exist := b.statusMap[situation] + if !exist { + b.statusMap[situation] = &backoffStatus{initSleepPeriod, time.Now()} + return firstProcess(b.statusMap[situation], initSleepPeriod, skipFirst) + } + + oldTime := status.lastErrorTime + status.lastErrorTime = time.Now() + + // When there is a lot of time between last and this error, sleep period is initialized. + if status.lastErrorTime.Sub(oldTime) > (processTime * 2 + status.lastSleepPeriod) { + return firstProcess(status, initSleepPeriod, skipFirst) + } + + if status.lastSleepPeriod == 0 { + status.lastSleepPeriod = initSleepPeriod + return initSleepPeriod, true + } + + if nextSleepPeriod := status.lastSleepPeriod * 2; nextSleepPeriod <= maxSleepPeriod { + status.lastSleepPeriod = nextSleepPeriod + } else { + status.lastSleepPeriod = maxSleepPeriod + } + + return status.lastSleepPeriod, true +} + +// Execute sleep the time returned from getBackoffSleepTime. +func (b *backoffController) sleepWithBackoff( + situation string, initSleepPeriod time.Duration, maxSleepPeriod time.Duration, processTime time.Duration, skipFirst bool, +) (time.Duration, bool) { + sleep, isFirst := b.getBackoffSleepTime(situation, initSleepPeriod, maxSleepPeriod, processTime, skipFirst) + if sleep != 0 { + time.Sleep(sleep) + } + return sleep, isFirst +} diff --git a/backoff_test.go b/backoff_test.go new file mode 100644 index 0000000..bb6c343 --- /dev/null +++ b/backoff_test.go @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Matt Brittan + * Daichi Tomaru + */ + +package mqtt + +import ( + "testing" + "time" +) + +func TestGetBackoffSleepTime(t *testing.T) { + // Test for adding new situation + controller := newBackoffController() + if s, c := controller.getBackoffSleepTime("not-exist", 1 * time.Second, 5 * time.Second, 1 * time.Second, false); !((s == 1 * time.Second) && !c) { + t.Errorf("When new situation is added, period should be initSleepPeriod and naturally it shouldn't be continual error. s:%d c%t", s, c) + } + + // Test for the continual error in the same situation and suppression of sleep period by maxSleepPeriod + controller.getBackoffSleepTime("multi", 10 * time.Second, 30 * time.Second, 1 * time.Second, false) + if s, c := controller.getBackoffSleepTime("multi", 10 * time.Second, 30 * time.Second, 1 * time.Second, false); !((s == 20 * time.Second) && c) { + t.Errorf("When same situation is called again, period should be increased and it should be regarded as a continual error. s:%d c%t", s, c) + } + if s, c := controller.getBackoffSleepTime("multi", 10 * time.Second, 30 * time.Second, 1 * time.Second, false); !((s == 30 * time.Second) && c) { + t.Errorf("A same situation is called three times. 10 * 2 * 2 = 40 but maxSleepPeriod is 30. So the next period should be 30. s:%d c%t", s, c) + } + + // Test for initialization by elapsed time. + controller.getBackoffSleepTime("elapsed", 1 * time.Second, 128 * time.Second, 1 * time.Second, false) + controller.getBackoffSleepTime("elapsed", 1 * time.Second, 128 * time.Second, 1 * time.Second, false) + time.Sleep((1 * 2 + 1 * 2 + 1) * time.Second) + if s, c := controller.getBackoffSleepTime("elapsed", 1 * time.Second, 128 * time.Second, 1 * time.Second, false); !((s == 1 * time.Second) && !c) { + t.Errorf("Initialization should be triggered by elapsed time. s:%d c%t", s, c) + } + + // Test when initial and max period is same. + controller.getBackoffSleepTime("same", 2 * time.Second, 2 * time.Second, 1 * time.Second, false) + if s, c := controller.getBackoffSleepTime("same", 2 * time.Second, 2 * time.Second, 1 * time.Second, false); !((s == 2 * time.Second) && c) { + t.Errorf("Sleep time should be always 2. s:%d c%t", s, c) + } + + // Test when initial period > max period. + controller.getBackoffSleepTime("bigger", 5 * time.Second, 2 * time.Second, 1 * time.Second, false) + if s, c := controller.getBackoffSleepTime("bigger", 5 * time.Second, 2 * time.Second, 1 * time.Second, false); !((s == 2 * time.Second) && c) { + t.Errorf("Sleep time should be 2. s:%d c%t", s, c) + } + + // Test when first sleep is skipped. + if s, c := controller.getBackoffSleepTime("skip", 3 * time.Second, 12 * time.Second, 1 * time.Second, true); !((s == 0) && !c) { + t.Errorf("Sleep time should be 0 because of skip. s:%d c%t", s, c) + } + if s, c := controller.getBackoffSleepTime("skip", 3 * time.Second, 12 * time.Second, 1 * time.Second, true); !((s == 3 * time.Second) && c) { + t.Errorf("Sleep time should be 3. s:%d c%t", s, c) + } +} diff --git a/client.go b/client.go index 200daa8..9fe349e 100644 --- a/client.go +++ b/client.go @@ -141,6 +141,8 @@ type client struct { stop chan struct{} // Closed to request that workers stop workers sync.WaitGroup // used to wait for workers to complete (ping, keepalive, errwatch, resume) commsStopped chan struct{} // closed when the comms routines have stopped (kept running until after workers have closed to avoid deadlocks) + + backoff *backoffController } // NewClient will create an MQTT v3.1.1 client with all of the options specified @@ -169,6 +171,7 @@ func NewClient(o *ClientOptions) Client { c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler) c.obound = make(chan *PacketAndToken) c.oboundP = make(chan *PacketAndToken) + c.backoff = newBackoffController() return c } @@ -302,10 +305,16 @@ func (c *client) Connect() Token { func (c *client) reconnect(connectionUp connCompletedFn) { DEBUG.Println(CLI, "enter reconnect") var ( - sleep = 1 * time.Second + initSleep = 1 * time.Second conn net.Conn ) + // If the reason of connection lost is same as the before one, sleep timer is set before attempting connection is started. + // Sleep time is exponentially increased as the same situation continues + if slp, isContinual := c.backoff.sleepWithBackoff("connectionLost", initSleep, c.options.MaxReconnectInterval, 3 * time.Second, true); isContinual { + DEBUG.Println(CLI, "Detect continual connection lost after reconnect, slept for", int(slp.Seconds()), "seconds") + } + for { if nil != c.options.OnReconnecting { c.options.OnReconnecting(c, &c.options) @@ -315,15 +324,8 @@ func (c *client) reconnect(connectionUp connCompletedFn) { if err == nil { break } - DEBUG.Println(CLI, "Reconnect failed, sleeping for", int(sleep.Seconds()), "seconds:", err) - time.Sleep(sleep) - if sleep < c.options.MaxReconnectInterval { - sleep *= 2 - } - - if sleep > c.options.MaxReconnectInterval { - sleep = c.options.MaxReconnectInterval - } + sleep, _ := c.backoff.sleepWithBackoff("attemptReconnection", initSleep, c.options.MaxReconnectInterval, c.options.ConnectTimeout, false) + DEBUG.Println(CLI, "Reconnect failed, slept for", int(sleep.Seconds()), "seconds:", err) if c.status.ConnectionStatus() != reconnecting { // Disconnect may have been called if err := connectionUp(false); err != nil { // Should always return an error