Skip to content

Commit

Permalink
Merge branch 'sbruens/proxy' into sbruens/proxy-protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
sbruens committed Aug 7, 2024
2 parents 2622b5e + 30bbdfa commit 4270b56
Show file tree
Hide file tree
Showing 4 changed files with 389 additions and 205 deletions.
59 changes: 37 additions & 22 deletions cmd/outline-ss-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func init() {
}

type SSServer struct {
stopConfig func()
stopConfig func() error
lnManager service.ListenerManager
natTimeout time.Duration
m *outlineMetrics
Expand All @@ -83,12 +83,14 @@ func (s *SSServer) loadConfig(filename string) error {
// We hot swap the config by having the old and new listeners both live at
// the same time. This means we create listeners for the new config first,
// and then close the old ones after.
stopConfig, err := s.runConfig(*config)
sopConfig, err := s.runConfig(*config)
if err != nil {
return err
}
go s.stopConfig()
s.stopConfig = stopConfig
if err := s.Stop(); err != nil {
return fmt.Errorf("unable to stop old config: %v", err)
}
s.stopConfig = sopConfig
return nil
}

Expand Down Expand Up @@ -151,35 +153,35 @@ type listenerSet struct {
listenersMu sync.Mutex
}

// ListenStream announces on a given TCP network address. Trying to listen on
// the same address twice will result in an error.
// ListenStream announces on a given network address. Trying to listen for stream connections
// on the same address twice will result in an error.
func (ls *listenerSet) ListenStream(addr string, proxy bool) (service.StreamListener, error) {
ls.listenersMu.Lock()
defer ls.listenersMu.Unlock()

lnKey := "tcp/" + addr
lnKey := "stream/" + addr
if _, exists := ls.listenerCloseFuncs[lnKey]; exists {
return nil, fmt.Errorf("listener %s already exists", lnKey)
return nil, fmt.Errorf("stream listener for %s already exists", addr)
}
ln, err := ls.manager.ListenStream("tcp", addr, proxy)
ln, err := ls.manager.ListenStream(addr, proxy)
if err != nil {
return nil, err
}
ls.listenerCloseFuncs[lnKey] = ln.Close
return ln, nil
}

// ListenPacket announces on a given UDP network address. Trying to listen on
// the same address twice will result in an error.
// ListenPacket announces on a given network address. Trying to listen for packet connections
// on the same address twice will result in an error.
func (ls *listenerSet) ListenPacket(addr string, proxy bool) (net.PacketConn, error) {
ls.listenersMu.Lock()
defer ls.listenersMu.Unlock()

lnKey := "udp/" + addr
lnKey := "packet/" + addr
if _, exists := ls.listenerCloseFuncs[lnKey]; exists {
return nil, fmt.Errorf("listener %s already exists", lnKey)
return nil, fmt.Errorf("packet listener for %s already exists", addr)
}
ln, err := ls.manager.ListenPacket("udp", addr, proxy)
ln, err := ls.manager.ListenPacket(addr, proxy)
if err != nil {
return nil, err
}
Expand All @@ -189,13 +191,14 @@ func (ls *listenerSet) ListenPacket(addr string, proxy bool) (net.PacketConn, er

// Close closes all the listeners in the set, after which the set can't be used again.
func (ls *listenerSet) Close() error {
ls.listenersMu.Lock()
defer ls.listenersMu.Unlock()

for addr, listenerCloseFunc := range ls.listenerCloseFuncs {
if err := listenerCloseFunc(); err != nil {
return fmt.Errorf("listener on address %s failed to stop: %w", addr, err)
}
}
ls.listenersMu.Lock()
defer ls.listenersMu.Unlock()
ls.listenerCloseFuncs = nil
return nil
}
Expand All @@ -205,16 +208,19 @@ func (ls *listenerSet) Len() int {
return len(ls.listenerCloseFuncs)
}

func (s *SSServer) runConfig(config Config) (func(), error) {
func (s *SSServer) runConfig(config Config) (func() error, error) {
startErrCh := make(chan error)
stopErrCh := make(chan error)
stopCh := make(chan struct{})

go func() {
lnSet := &listenerSet{
manager: s.lnManager,
listenerCloseFuncs: make(map[string]func() error),
}
defer lnSet.Close() // This closes all the listeners in the set.
defer func() {
stopErrCh <- lnSet.Close()
}()

startErrCh <- func() error {
totalCipherCount := len(config.Keys)
Expand Down Expand Up @@ -281,11 +287,13 @@ func (s *SSServer) runConfig(config Config) (func(), error) {
if err != nil {
return nil, err
}
return func() {
return func() error {
logger.Infof("Stopping running config.")
// TODO(sbruens): Actually wait for all handlers to be stopped, e.g. by
// using a https://pkg.go.dev/sync#WaitGroup.
stopCh <- struct{}{}
stopErr := <-stopErrCh
return stopErr
}, nil
}

Expand Down Expand Up @@ -341,15 +349,22 @@ func (s *SSServer) startListenerFromConfig(lnSet *listenerSet, serviceConfig Ser
}

// Stop stops serving the current config.
func (s *SSServer) Stop() {
go s.stopConfig()
func (s *SSServer) Stop() error {
stopFunc := s.stopConfig
if stopFunc == nil {
return nil
}
if err := stopFunc(); err != nil {
logger.Errorf("Error stopping config: %v", err)
return err
}
logger.Info("Stopped all listeners for running config")
return nil
}

// RunSSServer starts a shadowsocks server running, and returns the server or an error.
func RunSSServer(filename string, natTimeout time.Duration, sm *outlineMetrics, replayHistory int) (*SSServer, error) {
server := &SSServer{
stopConfig: func() {},
lnManager: service.NewListenerManager(),
natTimeout: natTimeout,
m: sm,
Expand Down
4 changes: 3 additions & 1 deletion cmd/outline-ss-server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@ func TestRunSSServer(t *testing.T) {
if err != nil {
t.Fatalf("RunSSServer() error = %v", err)
}
server.Stop()
if err := server.Stop(); err != nil {
t.Errorf("Error while stopping server: %v", err)
}
}
Loading

0 comments on commit 4270b56

Please sign in to comment.