Skip to content

Commit

Permalink
generalized redial routine
Browse files Browse the repository at this point in the history
Signed-off-by: He Xian <hexian000@outlook.com>
  • Loading branch information
hexian000 committed Sep 17, 2024
1 parent 0669dc5 commit 0eb60be
Showing 1 changed file with 21 additions and 23 deletions.
44 changes: 21 additions & 23 deletions v2/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,19 @@ type Tunnel struct {
l hlistener.Listener
mu sync.RWMutex
mux map[*yamux.Session]string // map[mux]tag
redialSig chan struct{}
redialCount int
dialMu sync.Mutex
lastChanged time.Time
}

func NewTunnel(s *Server, c *TunnelConfig) *Tunnel {
return &Tunnel{
name: c.Identity,
s: s,
c: c,
mux: make(map[*yamux.Session]string),
name: c.Identity,
s: s,
c: c,
mux: make(map[*yamux.Session]string),
redialSig: make(chan struct{}, 1),
}
}

Expand Down Expand Up @@ -96,10 +98,10 @@ func (t *Tunnel) redial() {
}

func (t *Tunnel) scheduleRedial() <-chan time.Time {
n := t.redialCount - 1
if n < 0 {
if !t.s.c.Redial || t.c.MuxDial == "" || t.redialCount < 1 {
return make(<-chan time.Time)
}
n := t.redialCount - 1
var waitTimeConst = [...]time.Duration{
200 * time.Millisecond,
2 * time.Second,
Expand All @@ -118,18 +120,6 @@ func (t *Tunnel) scheduleRedial() <-chan time.Time {
return time.After(waitTime)
}

func (t *Tunnel) runWithRedial() {
for {
t.redial()
select {
case <-t.scheduleRedial():
case <-t.s.g.CloseC():
// server shutdown
return
}
}
}

func (t *Tunnel) run() {
defer func() {
t.mu.Lock()
Expand All @@ -139,11 +129,16 @@ func (t *Tunnel) run() {
delete(t.mux, mux)
}
}()
if t.s.c.Redial {
t.runWithRedial()
return
for {
t.redial()
select {
case <-t.redialSig:
case <-t.scheduleRedial():
case <-t.s.g.CloseC():
// server shutdown
return
}
}
<-t.s.g.CloseC()
}

func (t *Tunnel) addMux(mux *yamux.Session, isDialed bool) {
Expand Down Expand Up @@ -197,6 +192,10 @@ func (t *Tunnel) delMux(mux *yamux.Session) {
}
t.s.numSessions.Add(uint32(len(t.mux) - num))
t.lastChanged = now
select {
case t.redialSig <- struct{}{}:
default:
}
}

func (t *Tunnel) getMux() *yamux.Session {
Expand Down Expand Up @@ -233,7 +232,6 @@ func (t *Tunnel) dial(ctx context.Context) (*yamux.Session, error) {
return nil, ErrDialInProgress
}
defer t.dialMu.Unlock()
// try again with dialMu acquired
if mux := t.getMux(); mux != nil {
return mux, nil
}
Expand Down

0 comments on commit 0eb60be

Please sign in to comment.