diff --git a/v2/eventlog/eventlog.go b/v2/eventlog/eventlog.go new file mode 100644 index 0000000..f113c5c --- /dev/null +++ b/v2/eventlog/eventlog.go @@ -0,0 +1,61 @@ +package eventlog + +import ( + "fmt" + "io" + "sync" + "time" +) + +type Recent interface { + Add(timestamp time.Time, message string) + Format(w io.Writer) +} + +type entry struct { + tstamp time.Time + message string + count int +} + +type recent struct { + mu sync.Mutex + elements []entry + sizelimit int +} + +func NewRecent(sizelimit int) Recent { + return &recent{ + elements: []entry{}, + sizelimit: sizelimit, + } +} + +func (p *recent) Add(timestamp time.Time, message string) { + p.mu.Lock() + defer p.mu.Unlock() + if len(p.elements) > 0 && p.elements[len(p.elements)-1].message == message { + p.elements[len(p.elements)-1].count++ + } + p.elements = append(p.elements, entry{timestamp, message, 1}) + if len(p.elements) >= 2*p.sizelimit { + copy(p.elements, p.elements[len(p.elements)-p.sizelimit:]) + p.elements = p.elements[:p.sizelimit] + } +} + +func (p *recent) Format(w io.Writer) { + p.mu.Lock() + defer p.mu.Unlock() + n := max(0, len(p.elements)-p.sizelimit) + for i := len(p.elements) - 1; i >= n; i-- { + entry := p.elements[i] + var s string + if entry.count == 1 { + s = fmt.Sprintf("%s %s\n", entry.tstamp.Format(time.RFC3339), entry.message) + } else { + s = fmt.Sprintf("%s %s (x%d)\n", entry.tstamp.Format(time.RFC3339), entry.message, entry.count) + } + w.Write([]byte(s)) + } +} diff --git a/v2/go.mod b/v2/go.mod index 2d09b27..131454f 100644 --- a/v2/go.mod +++ b/v2/go.mod @@ -1,8 +1,8 @@ module github.com/hexian000/tlswrapper/v2 -go 1.20 +go 1.21 require ( github.com/hashicorp/yamux v0.1.1 - github.com/hexian000/gosnippets v0.0.0-20231124140902-d2fe8bf2355d + github.com/hexian000/gosnippets v0.0.0-20240124083900-b330d695ba90 ) diff --git a/v2/go.sum b/v2/go.sum index 191fe33..577ab95 100644 --- a/v2/go.sum +++ b/v2/go.sum @@ -1,4 +1,4 @@ github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE= github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ= -github.com/hexian000/gosnippets v0.0.0-20231124140902-d2fe8bf2355d h1:OTTGcMF8+0pFlYuM8h0LjNly0TIsOOt5etdL8oSbmMw= -github.com/hexian000/gosnippets v0.0.0-20231124140902-d2fe8bf2355d/go.mod h1:SItpHKzoWxn8Lj2kJiw1ridZGDunCyUIbpL1N/7j6FQ= +github.com/hexian000/gosnippets v0.0.0-20240124083900-b330d695ba90 h1:wdmi39Ie1Afh28nw7XZnEmoMtRL33pFH8W23IXP6ICA= +github.com/hexian000/gosnippets v0.0.0-20240124083900-b330d695ba90/go.mod h1:SItpHKzoWxn8Lj2kJiw1ridZGDunCyUIbpL1N/7j6FQ= diff --git a/v2/handler.go b/v2/handler.go index 270d79e..f69fc0c 100644 --- a/v2/handler.go +++ b/v2/handler.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "errors" + "fmt" "net" "sync/atomic" "time" @@ -51,12 +52,12 @@ func (h *TLSHandler) Serve(ctx context.Context, conn net.Conn) { } else { slog.Warningf("%q <= %v: connection is not encrypted", h.t.name, conn.RemoteAddr()) } - tun := h.t + t := h.t handshake := &proto.Handshake{ Identity: c.Identity, } - if tun.c.LocalIdentity != "" { - handshake.Identity = tun.c.LocalIdentity + if t.c.LocalIdentity != "" { + handshake.Identity = t.c.LocalIdentity } if err := proto.RunHandshake(conn, handshake); err != nil { slog.Errorf("%q <= %v: %s", h.t.name, conn.RemoteAddr(), formats.Error(err)) @@ -70,20 +71,21 @@ func (h *TLSHandler) Serve(ctx context.Context, conn net.Conn) { } h.s.stats.authorized.Add(1) if handshake.Identity != "" { - if t := h.s.findTunnel(handshake.Identity); t != nil { - tun = t + if tun := h.s.findTunnel(handshake.Identity); tun != nil { + t = tun } else { - slog.Infof("%q <= %v: unknown identity %q", tun.name, conn.RemoteAddr(), handshake.Identity) + slog.Infof("%q <= %v: unknown identity %q", t.name, conn.RemoteAddr(), handshake.Identity) } } if err := h.s.g.Go(func() { - tun.Serve(mux) + t.Serve(mux) }); err != nil { - slog.Errorf("%q <= %v: %s", tun.name, conn.RemoteAddr(), formats.Error(err)) + slog.Errorf("%q <= %v: %s", t.name, conn.RemoteAddr(), formats.Error(err)) ioClose(mux) return } - slog.Infof("%q <= %v: setup %v", tun.name, conn.RemoteAddr(), formats.Duration(time.Since(start))) + slog.Infof("%q <= %v: setup %v", t.name, conn.RemoteAddr(), formats.Duration(time.Since(start))) + h.s.events.Add(time.Now(), fmt.Sprintf("%q <= %v: established", t.name, mux.RemoteAddr())) } // ForwardHandler forwards connections to another plain address diff --git a/v2/metric.go b/v2/metric.go index cc6707c..29fb565 100644 --- a/v2/metric.go +++ b/v2/metric.go @@ -219,6 +219,9 @@ func (h *apiStatsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { fprintf(w, "%-20q: never seen\n", t.Name) } } + + fprintf(w, "\n> Recent Events\n") + h.s.events.Format(w) } func RunHTTPServer(l net.Listener, s *Server) error { diff --git a/v2/server.go b/v2/server.go index 674cd4c..999673e 100644 --- a/v2/server.go +++ b/v2/server.go @@ -17,6 +17,7 @@ import ( snet "github.com/hexian000/gosnippets/net" "github.com/hexian000/gosnippets/routines" "github.com/hexian000/gosnippets/slog" + "github.com/hexian000/tlswrapper/v2/eventlog" "github.com/hexian000/tlswrapper/v2/forwarder" ) @@ -37,6 +38,7 @@ type Server struct { f forwarder.Forwarder flowStats *snet.FlowStats + events eventlog.Recent listeners map[string]net.Listener tunnels map[string]*Tunnel // map[identity]tunnel @@ -58,7 +60,7 @@ type Server struct { // NewServer creates a server object func NewServer(cfg *Config) *Server { - g := routines.NewGroup(0) + g := routines.NewGroup() return &Server{ listeners: make(map[string]net.Listener), tunnels: make(map[string]*Tunnel), @@ -68,6 +70,7 @@ func NewServer(cfg *Config) *Server { }, f: forwarder.New(cfg.MaxConn, g), flowStats: &snet.FlowStats{}, + events: eventlog.NewRecent(16), g: g, c: cfg, } diff --git a/v2/tunnel.go b/v2/tunnel.go index a98ac4b..1863044 100644 --- a/v2/tunnel.go +++ b/v2/tunnel.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "errors" + "fmt" "net" "sync" "time" @@ -118,6 +119,7 @@ func (t *Tunnel) runWithRedial() { select { case mux := <-t.muxCloseSig: slog.Infof("tunnel %q: connection lost %v", t.name, mux.RemoteAddr()) + t.s.events.Add(time.Now(), fmt.Sprintf("%q => %v: connection lost", t.name, mux.RemoteAddr())) case <-t.scheduleRedial(): case <-t.s.g.CloseC(): // server shutdown @@ -143,6 +145,7 @@ func (t *Tunnel) run() { select { case mux := <-t.muxCloseSig: slog.Infof("tunnel %q: connection lost %v", t.name, mux.RemoteAddr()) + t.s.events.Add(time.Now(), fmt.Sprintf("%q => %v: connection lost", t.name, mux.RemoteAddr())) case <-t.s.g.CloseC(): // server shutdown return @@ -268,6 +271,7 @@ func (t *Tunnel) dial(ctx context.Context) (*yamux.Session, error) { return nil, err } slog.Infof("%q => %v: setup %v", t.name, conn.RemoteAddr(), formats.Duration(time.Since(start))) + t.s.events.Add(time.Now(), fmt.Sprintf("%q => %v: established", t.name, mux.RemoteAddr())) return mux, nil }