diff --git a/client/batch.go b/client/batch.go index c7bb718..1667581 100644 --- a/client/batch.go +++ b/client/batch.go @@ -3,6 +3,8 @@ package client import ( "encoding/json" "fmt" + + "github.com/contribsys/faktory/util" ) type BatchStatus struct { @@ -176,7 +178,7 @@ func (c *Client) BatchStatus(bid string) (*BatchStatus, error) { } var stat BatchStatus - err = json.Unmarshal(data, &stat) + err = util.JsonUnmarshal(data, &stat) if err != nil { return nil, err } diff --git a/client/client.go b/client/client.go index 2c770d7..48034df 100644 --- a/client/client.go +++ b/client/client.go @@ -15,6 +15,7 @@ import ( "time" "github.com/contribsys/faktory/internal/pool" + "github.com/contribsys/faktory/util" ) const ( @@ -152,7 +153,7 @@ func DefaultServer() *Server { // // Use the URL to configure any necessary password: // -// tcp://:mypassword@localhost:7419 +// tcp://:mypassword@localhost:7419 // // By default Open assumes localhost with no password // which is appropriate for local development. @@ -182,8 +183,7 @@ func OpenWithDialer(dialer Dialer) (*Client, error) { // a *tls.Dialer if "tcp+tls" and a *net.Dialer if // not. // -// client.Dial(client.Localhost, "topsecret") -// +// client.Dial(client.Localhost, "topsecret") func Dial(srv *Server, password string) (*Client, error) { d := &net.Dialer{Timeout: srv.Timeout} dialer := Dialer(d) @@ -198,6 +198,12 @@ func DialWithDialer(srv *Server, password string, dialer Dialer) (*Client, error return dial(srv, password, dialer) } +type HIv2 struct { + V int `json:"v"` // version, should be 2 + I int `json:"i,omitempty"` // iterations + S string `json:"s,omitempty"` // salt +} + // dial connects to the remote faktory server. func dial(srv *Server, password string, dialer Dialer) (*Client, error) { client := emptyClientData() @@ -227,27 +233,19 @@ func dial(srv *Server, password string, dialer Dialer) (*Client, error) { if strings.HasPrefix(line, "HI ") { str := strings.TrimSpace(line)[3:] - var hi map[string]interface{} - err = json.Unmarshal([]byte(str), &hi) + var hi HIv2 + err = util.JsonUnmarshal([]byte(str), &hi) if err != nil { conn.Close() return nil, err } - v, ok := hi["v"].(float64) - if ok { - if ExpectedProtocolVersion != int(v) { - fmt.Println("Warning: server and client protocol versions out of sync:", v, ExpectedProtocolVersion) - } + if ExpectedProtocolVersion != hi.V { + util.Infof("Warning: server and client protocol versions out of sync: want %d, got %d", ExpectedProtocolVersion, hi.V) } - salt, ok := hi["s"].(string) - if ok { - iter := 1 - iterVal, ok := hi["i"] - if ok { - iter = int(iterVal.(float64)) - } - + salt := hi.S + if salt != "" { + iter := hi.I client.PasswordHash = hash(password, salt, iter) } } else { @@ -303,7 +301,7 @@ func (c *Client) PushBulk(jobs []*Job) (map[string]string, error) { return nil, err } results := map[string]string{} - err = json.Unmarshal(data, &results) + err = util.JsonUnmarshal(data, &results) if err != nil { return nil, err } @@ -342,7 +340,7 @@ func (c *Client) Fetch(q ...string) (*Job, error) { } var job Job - err = json.Unmarshal(data, &job) + err = util.JsonUnmarshal(data, &job) if err != nil { return nil, err } @@ -421,7 +419,11 @@ func (c *Client) ResumeQueues(names ...string) error { return c.ok(c.rdr) } +// deprecated, this returns an untyped map. +// use CurrentState() instead which provides strong typing func (c *Client) Info() (map[string]interface{}, error) { + util.Info("client.Info() is deprecated, use client.CurrentState() instead") + err := c.writeLine(c.wtr, "INFO", nil) if err != nil { return nil, err @@ -435,42 +437,43 @@ func (c *Client) Info() (map[string]interface{}, error) { return nil, nil } - var hash map[string]interface{} - err = json.Unmarshal(data, &hash) + var cur map[string]interface{} + err = util.JsonUnmarshal(data, &cur) if err != nil { return nil, err } - return hash, nil + return cur, nil } -func (c *Client) QueueSizes() (map[string]uint64, error) { - hash, err := c.Info() +func (c *Client) CurrentState() (*FaktoryState, error) { + err := c.writeLine(c.wtr, "INFO", nil) if err != nil { return nil, err } - faktory, ok := hash["faktory"].(map[string]interface{}) - if !ok { - return nil, fmt.Errorf("invalid info hash: %s", hash) + data, err := c.readResponse(c.rdr) + if err != nil { + return nil, err } - - queues, ok := faktory["queues"].(map[string]interface{}) - if !ok { - return nil, fmt.Errorf("invalid info hash: %s", hash) + if len(data) == 0 { + return nil, nil } - sizes := make(map[string]uint64) - for name, size := range queues { - size, ok := size.(float64) - if !ok { - return nil, fmt.Errorf("invalid queue size: %v", size) - } - - sizes[name] = uint64(size) + var cur FaktoryState + err = util.JsonUnmarshal(data, &cur) + if err != nil { + return nil, err } + return &cur, nil +} - return sizes, nil +func (c *Client) QueueSizes() (map[string]uint64, error) { + state, err := c.CurrentState() + if err != nil { + return nil, err + } + return state.Data.Queues, nil } func (c *Client) Generic(cmdline string) (string, error) { diff --git a/client/client_test.go b/client/client_test.go index 4f20930..b3295b2 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -2,7 +2,6 @@ package client import ( "bufio" - "fmt" "log" "net" "os" @@ -122,13 +121,13 @@ func TestClientOperations(t *testing.T) { resp <- "$36\r\n{\"faktory\":{\"queues\":{\"default\":2}}}\r\n" sizes, err := cl.QueueSizes() assert.NoError(t, err) - assert.Equal(t, sizes["default"], uint64(2)) + assert.EqualValues(t, 2, sizes["default"]) assert.Contains(t, <-req, "INFO") resp <- "$39\r\n{\"faktory\":{\"queues\":{\"invalid\":null}}}\r\n" sizes, err = cl.QueueSizes() - assert.Error(t, err) - assert.Nil(t, sizes) + assert.NoError(t, err) + assert.EqualValues(t, 0, sizes["invalid"]) assert.Contains(t, <-req, "INFO") err = cl.Close() @@ -157,7 +156,6 @@ func withFakeServer(t *testing.T, fn func(chan string, chan string, string)) { buf := bufio.NewReader(conn) line, err := buf.ReadString('\n') if err != nil { - fmt.Println(err) conn.Close() break } diff --git a/client/faktory.go b/client/faktory.go index 110bfe1..4a2d49a 100644 --- a/client/faktory.go +++ b/client/faktory.go @@ -4,3 +4,30 @@ var ( Name = "Faktory" Version = "1.9.0" ) + +// Structs for parsing the INFO response +type FaktoryState struct { + Now string `json:"now"` + ServerUtcTime string `json:"server_utc_time"` + Data DataSnapshot `json:"faktory"` + Server ServerSnapshot `json:"server"` +} + +type DataSnapshot struct { + TotalFailures uint64 `json:"total_failures"` + TotalProcessed uint64 `json:"total_processed"` + TotalEnqueued uint64 `json:"total_enqueued"` + TotalQueues uint64 `json:"total_queues"` + Queues map[string]uint64 `json:"queues"` + Sets map[string]uint64 `json:"sets"` + Tasks map[string]map[string]interface{} `json:"tasks"` // deprecated +} + +type ServerSnapshot struct { + Description string `json:"description"` + Version string `json:"faktory_version"` + Uptime uint64 `json:"uptime"` + Connections uint64 `json:"connections"` + CommandCount uint64 `json:"command_count"` + UsedMemoryMB uint64 `json:"used_memory_mb"` +} diff --git a/client/tracking.go b/client/tracking.go index 271cfff..4f281f1 100644 --- a/client/tracking.go +++ b/client/tracking.go @@ -28,7 +28,7 @@ func (c *Client) TrackGet(jid string) (*JobTrack, error) { } var trck JobTrack - err = json.Unmarshal(data, &trck) + err = util.JsonUnmarshal(data, &trck) if err != nil { return nil, err } diff --git a/go.mod b/go.mod index f7342fa..c1fab30 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,11 @@ module github.com/contribsys/faktory -go 1.21 +go 1.22 require ( github.com/BurntSushi/toml v0.4.1 github.com/justinas/nosurf v1.1.1 - github.com/stretchr/testify v1.8.1 + github.com/stretchr/testify v1.9.0 ) require github.com/redis/go-redis/v9 v9.2.0 diff --git a/go.sum b/go.sum index 3eb4964..6d4d06f 100644 --- a/go.sum +++ b/go.sum @@ -8,7 +8,6 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/contribsys/faktory_worker_go v1.6.0 h1:ov69BLHL62i/wRLJwvuj5UphwgjMOINRCGW3KzrKOjk= github.com/contribsys/faktory_worker_go v1.6.0/go.mod h1:XMNGn3sBJdqFGfTH4SkmYkMovhdkq5cDJj36wowfbNY= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= @@ -19,15 +18,9 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.2.0 h1:zwMdX0A4eVzse46YN18QhuDiM4uf3JmkOB4VZrdt5uI= github.com/redis/go-redis/v9 v9.2.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/manager/fetch.go b/manager/fetch.go index a138a97..de6e284 100644 --- a/manager/fetch.go +++ b/manager/fetch.go @@ -2,7 +2,6 @@ package manager import ( "context" - "encoding/json" "fmt" "time" @@ -158,7 +157,7 @@ func (el *simpleLease) Job() (*client.Job, error) { } if el.job == nil { var job client.Job - err := json.Unmarshal(el.payload, &job) + err := util.JsonUnmarshal(el.payload, &job) if err != nil { return nil, fmt.Errorf("cannot unmarshal job payload: %w", err) } diff --git a/manager/scheduler.go b/manager/scheduler.go index 7510f73..052cf3f 100644 --- a/manager/scheduler.go +++ b/manager/scheduler.go @@ -2,7 +2,6 @@ package manager import ( "context" - "encoding/json" "fmt" "time" @@ -38,7 +37,7 @@ func (m *manager) schedule(ctx context.Context, when time.Time, set storage.Sort for { count, err := set.RemoveBefore(ctx, util.Thens(when), 100, func(data []byte) error { var job client.Job - if err := json.Unmarshal(data, &job); err != nil { + if err := util.JsonUnmarshal(data, &job); err != nil { return fmt.Errorf("cannot unmarshal job payload: %w", err) } diff --git a/manager/working.go b/manager/working.go index 758cfd6..6215904 100644 --- a/manager/working.go +++ b/manager/working.go @@ -83,7 +83,7 @@ func (m *manager) loadWorkingSet(ctx context.Context) error { addedCount := 0 err := m.store.Working().Each(ctx, func(idx int, entry storage.SortedEntry) error { var res Reservation - err := json.Unmarshal(entry.Value(), &res) + err := util.JsonUnmarshal(entry.Value(), &res) if err != nil { // We can't return an error here, this method is best effort // as we are booting the server. We can't allow corrupted data @@ -193,7 +193,7 @@ func (m *manager) ReapExpiredJobs(ctx context.Context, when time.Time) (int64, e tm := util.Thens(when) count, err := m.store.Working().RemoveBefore(ctx, tm, 10, func(data []byte) error { var res Reservation - err := json.Unmarshal(data, &res) + err := util.JsonUnmarshal(data, &res) if err != nil { return fmt.Errorf("cannot unmarshal reservation payload: %w", err) } diff --git a/server/commands.go b/server/commands.go index 2d5db62..2108332 100644 --- a/server/commands.go +++ b/server/commands.go @@ -100,7 +100,7 @@ func pushBulk(c *Connection, s *Server, cmd string) { data := cmd[6:] jobs := make([]client.Job, 0) - err := json.Unmarshal([]byte(data), &jobs) + err := util.JsonUnmarshal([]byte(data), &jobs) if err != nil { _ = c.Error(cmd, fmt.Errorf("invalid JSON: %w", err)) return @@ -147,7 +147,7 @@ func push(c *Connection, s *Server, cmd string) { var job client.Job job.Retry = &client.RetryPolicyDefault - err := json.Unmarshal([]byte(data), &job) + err := util.JsonUnmarshal([]byte(data), &job) if err != nil { _ = c.Error(cmd, fmt.Errorf("invalid JSON: %w", err)) return @@ -201,7 +201,7 @@ func ack(c *Connection, s *Server, cmd string) { data := cmd[4:] var hash map[string]string - err := json.Unmarshal([]byte(data), &hash) + err := util.JsonUnmarshal([]byte(data), &hash) if err != nil { _ = c.Error(cmd, fmt.Errorf("invalid ACK %s", data)) return @@ -225,7 +225,7 @@ func fail(c *Connection, s *Server, cmd string) { data := cmd[5:] var failure manager.FailPayload - err := json.Unmarshal([]byte(data), &failure) + err := util.JsonUnmarshal([]byte(data), &failure) if err != nil { _ = c.Error(cmd, fmt.Errorf("invalid FAIL %s", data)) return @@ -266,7 +266,7 @@ func heartbeat(c *Connection, s *Server, cmd string) { data := cmd[5:] var beat ClientBeat - err := json.Unmarshal([]byte(data), &beat) + err := util.JsonUnmarshal([]byte(data), &beat) if err != nil { _ = c.Error(cmd, fmt.Errorf("invalid BEAT %s", data)) return diff --git a/server/mutate.go b/server/mutate.go index 6ad0071..df666e4 100644 --- a/server/mutate.go +++ b/server/mutate.go @@ -2,7 +2,6 @@ package server import ( "context" - "encoding/json" "fmt" "strings" "time" @@ -10,6 +9,7 @@ import ( "github.com/contribsys/faktory/client" "github.com/contribsys/faktory/manager" "github.com/contribsys/faktory/storage" + "github.com/contribsys/faktory/util" ) var ( @@ -119,7 +119,7 @@ func mutate(c *Connection, s *Server, cmd string) { var err error var op client.Operation - err = json.Unmarshal([]byte(parts[1]), &op) + err = util.JsonUnmarshal([]byte(parts[1]), &op) if err != nil { _ = c.Error(cmd, err) return diff --git a/server/mutate_test.go b/server/mutate_test.go index 4f0b91e..b8d5743 100644 --- a/server/mutate_test.go +++ b/server/mutate_test.go @@ -20,10 +20,6 @@ func TestMutateCommands(t *testing.T) { err = cl.Clear(faktory.Retries) assert.NoError(t, err) - hash, err := cl.Info() - assert.NoError(t, err) - assert.EqualValues(t, 0, hash["faktory"].(map[string]interface{})["tasks"].(map[string]interface{})["Scheduled"].(map[string]interface{})["size"]) - j := faktory.NewJob("AnotherJob", "truid:67123", 3) j.At = util.Thens(time.Now().Add(10 * time.Second)) err = cl.Push(j) @@ -49,17 +45,17 @@ func TestMutateCommands(t *testing.T) { err = cl.Push(j) assert.NoError(t, err) - hash, err = cl.Info() + state, err := cl.CurrentState() assert.NoError(t, err) - assert.EqualValues(t, 4, hash["faktory"].(map[string]interface{})["tasks"].(map[string]interface{})["Scheduled"].(map[string]interface{})["size"]) + assert.EqualValues(t, 4, state.Data.Sets["scheduled"]) err = cl.Discard(faktory.Scheduled, faktory.OfType("SomeJob").Matching("*uid:67123*")) assert.NoError(t, err) - hash, err = cl.Info() + state, err = cl.CurrentState() assert.NoError(t, err) - assert.EqualValues(t, 3, hash["faktory"].(map[string]interface{})["tasks"].(map[string]interface{})["Scheduled"].(map[string]interface{})["size"]) - assert.EqualValues(t, 0, hash["faktory"].(map[string]interface{})["tasks"].(map[string]interface{})["Dead"].(map[string]interface{})["size"]) + assert.EqualValues(t, 3, state.Data.Sets["scheduled"]) + assert.EqualValues(t, 0, state.Data.Sets["dead"]) err = cl.Kill(faktory.Scheduled, faktory.OfType("AnotherJob")) assert.NoError(t, err) @@ -67,26 +63,27 @@ func TestMutateCommands(t *testing.T) { err = cl.Kill("", faktory.OfType("AnotherJob")) assert.Error(t, err) - hash, err = cl.Info() + state, err = cl.CurrentState() assert.NoError(t, err) - assert.EqualValues(t, 2, hash["faktory"].(map[string]interface{})["tasks"].(map[string]interface{})["Scheduled"].(map[string]interface{})["size"]) - assert.EqualValues(t, 1, hash["faktory"].(map[string]interface{})["tasks"].(map[string]interface{})["Dead"].(map[string]interface{})["size"]) - assert.EqualValues(t, 1, hash["faktory"].(map[string]interface{})["queues"].(map[string]interface{})["default"]) + assert.EqualValues(t, 2, state.Data.Sets["scheduled"]) + assert.EqualValues(t, 1, state.Data.Sets["dead"]) + assert.EqualValues(t, 1, state.Data.Queues["default"]) err = cl.Requeue(faktory.Scheduled, faktory.WithJids(targetJid)) assert.NoError(t, err) - hash, err = cl.Info() + state, err = cl.CurrentState() assert.NoError(t, err) - assert.EqualValues(t, 1, hash["faktory"].(map[string]interface{})["tasks"].(map[string]interface{})["Scheduled"].(map[string]interface{})["size"]) - assert.EqualValues(t, 2, hash["faktory"].(map[string]interface{})["queues"].(map[string]interface{})["default"]) - assert.EqualValues(t, 1, hash["faktory"].(map[string]interface{})["tasks"].(map[string]interface{})["Dead"].(map[string]interface{})["size"]) + assert.EqualValues(t, 1, state.Data.Sets["scheduled"]) + assert.EqualValues(t, 1, state.Data.Sets["dead"]) + assert.EqualValues(t, 2, state.Data.Queues["default"]) err = cl.Clear(faktory.Dead) assert.NoError(t, err) - hash, err = cl.Info() + + state, err = cl.CurrentState() assert.NoError(t, err) - assert.EqualValues(t, 0, hash["faktory"].(map[string]interface{})["tasks"].(map[string]interface{})["Dead"].(map[string]interface{})["size"]) + assert.EqualValues(t, 0, state.Data.Sets["dead"]) }) } diff --git a/server/server.go b/server/server.go index a7e0f92..6aefe58 100644 --- a/server/server.go +++ b/server/server.go @@ -403,51 +403,71 @@ func (s *Server) processLines(conn *Connection) { } } -func (s *Server) uptimeInSeconds() int { - return int(time.Since(s.Stats.StartedAt).Seconds()) +func (s *Server) uptimeInSeconds() uint64 { + return uint64(time.Since(s.Stats.StartedAt).Seconds()) } -func (s *Server) CurrentState() (map[string]interface{}, error) { +func (s *Server) CurrentState() (*client.FaktoryState, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() + queueCmd := map[string]*redis.IntCmd{} + setCmd := map[string]*redis.IntCmd{} _, err := s.store.Redis().Pipelined(ctx, func(pipe redis.Pipeliner) error { s.store.EachQueue(ctx, func(q storage.Queue) { queueCmd[q.Name()] = pipe.LLen(ctx, q.Name()) }) + setCmd["scheduled"] = pipe.ZCard(ctx, "scheduled") + setCmd["retries"] = pipe.ZCard(ctx, "retries") + setCmd["dead"] = pipe.ZCard(ctx, "dead") + setCmd["working"] = pipe.ZCard(ctx, "working") + setCmd["failures"] = pipe.IncrBy(ctx, "failures", 0) + setCmd["processed"] = pipe.IncrBy(ctx, "processed", 0) return nil }) if err != nil { return nil, err } - queues := map[string]int64{} - totalQueued := int64(0) + queues := map[string]uint64{} + totalQueued := uint64(0) totalQueues := len(queueCmd) for name, cmd := range queueCmd { - qsize := cmd.Val() + qsize, _ := cmd.Uint64() totalQueued += qsize queues[name] = qsize } - return map[string]interface{}{ - "now": util.Nows(), - "server_utc_time": time.Now().UTC().Format("15:04:05 UTC"), - "faktory": map[string]interface{}{ - "total_failures": s.store.TotalFailures(ctx), - "total_processed": s.store.TotalProcessed(ctx), - "total_enqueued": totalQueued, - "total_queues": totalQueues, - "queues": queues, - "tasks": s.taskRunner.Stats(), + snap := &client.FaktoryState{ + Now: util.Nows(), + ServerUtcTime: time.Now().UTC().Format("15:04:05 UTC"), + Data: client.DataSnapshot{ + TotalFailures: size(setCmd["failures"]), + TotalProcessed: size(setCmd["processed"]), + TotalEnqueued: totalQueued, + TotalQueues: uint64(totalQueues), + Queues: queues, + Tasks: s.taskRunner.Stats(), + Sets: map[string]uint64{ + "scheduled": size(setCmd["scheduled"]), + "retries": size(setCmd["retries"]), + "dead": size(setCmd["dead"]), + "working": size(setCmd["working"]), + }, }, - "server": map[string]interface{}{ - "description": client.Name, - "faktory_version": client.Version, - "uptime": s.uptimeInSeconds(), - "connections": atomic.LoadUint64(&s.Stats.Connections), - "command_count": atomic.LoadUint64(&s.Stats.Commands), - "used_memory_mb": util.MemoryUsageMB(), + Server: client.ServerSnapshot{ + Description: client.Name, + Version: client.Version, + Uptime: s.uptimeInSeconds(), + Connections: atomic.LoadUint64(&s.Stats.Connections), + CommandCount: atomic.LoadUint64(&s.Stats.Commands), + UsedMemoryMB: util.MemoryUsageMB(), }, - }, nil + } + return snap, nil +} + +func size(cmd *redis.IntCmd) uint64 { + s, _ := cmd.Uint64() + return s } diff --git a/server/server_test.go b/server/server_test.go index 8f8b222..ee0c1aa 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/contribsys/faktory/storage" + "github.com/contribsys/faktory/util" "github.com/stretchr/testify/assert" ) @@ -104,7 +105,7 @@ func TestServerStart(t *testing.T) { assert.Regexp(t, "\"retry\":", result) hash := make(map[string]interface{}) - err = json.Unmarshal([]byte(result), &hash) + err = util.JsonUnmarshal([]byte(result), &hash) assert.NoError(t, err) // fmt.Println(hash) assert.Equal(t, "12345678901234567890abcd", hash["jid"]) @@ -127,7 +128,7 @@ func TestServerStart(t *testing.T) { assert.NoError(t, err) var stats map[string]interface{} - err = json.Unmarshal([]byte(result), &stats) + err = util.JsonUnmarshal([]byte(result), &stats) assert.NoError(t, err) assert.Equal(t, 4, len(stats)) diff --git a/server/workers.go b/server/workers.go index b22ee12..2a24938 100644 --- a/server/workers.go +++ b/server/workers.go @@ -1,7 +1,6 @@ package server import ( - "encoding/json" "io" "sync" "time" @@ -9,7 +8,6 @@ import ( "github.com/contribsys/faktory/util" ) -// // This represents a single client process. It may have many network // connections open to Faktory. // @@ -30,7 +28,7 @@ import ( // // A worker process has a simple three-state lifecycle: // -// running -> quiet -> terminate +// running -> quiet -> terminate // // - Running means the worker is alive and processing jobs. // - Quiet means the worker should stop FETCHing new jobs but continue working on existing jobs. @@ -47,7 +45,6 @@ import ( // // Workers will typically also respond to standard Unix signals. // faktory_worker_ruby uses TSTP ("Threads SToP") as the quiet signal and TERM as the terminate signal. -// type ClientData struct { Hostname string `json:"hostname"` Wid string `json:"wid"` @@ -98,7 +95,7 @@ func stateFromString(state string) WorkerState { func clientDataFromHello(data string) (*ClientData, error) { var client ClientData - err := json.Unmarshal([]byte(data), &client) + err := util.JsonUnmarshal([]byte(data), &client) if err != nil { return nil, err } diff --git a/storage/sorted_redis.go b/storage/sorted_redis.go index 99747c0..c7aabcc 100644 --- a/storage/sorted_redis.go +++ b/storage/sorted_redis.go @@ -154,7 +154,7 @@ func (e *setEntry) Job() (*client.Job, error) { } var job client.Job - err := json.Unmarshal(e.value, &job) + err := util.JsonUnmarshal(e.value, &job) if err != nil { return nil, err } diff --git a/test/go_system_test.go b/test/go_system_test.go index 928a640..8256bd3 100644 --- a/test/go_system_test.go +++ b/test/go_system_test.go @@ -135,12 +135,12 @@ func pushAndPop(t *testing.T, count int) { } util.Info("Done") - hash, err := cl.Info() + hash, err := cl.CurrentState() if err != nil { handleError(err) return } - util.Infof("%v", hash) + util.Infof("%+v", hash) } func pushJob(cl *client.Client, idx int) error { diff --git a/util/json.go b/util/json.go new file mode 100644 index 0000000..24a5994 --- /dev/null +++ b/util/json.go @@ -0,0 +1,32 @@ +package util + +import ( + "bytes" + "encoding/json" +) + +func Must[T any](obj T, err error) T { + if err != nil { + panic(err) + } + return obj +} + +var ( + // If true, activates encoding/json's Decoder and its UseNumber() + // option to preserve number precision. + // Defaults to false in Faktory 1.x. + // Will default to true in Faktory 2.x + JsonUseNumber bool = Faktory2Preview +) + +func JsonUnmarshal(data []byte, target any) error { + if !JsonUseNumber { + return json.Unmarshal(data, target) + } + + buf := bytes.NewBuffer(data) + dec := json.NewDecoder(buf) + dec.UseNumber() + return dec.Decode(target) +} diff --git a/util/util.go b/util/util.go index cd01467..c8a3e30 100644 --- a/util/util.go +++ b/util/util.go @@ -1,6 +1,7 @@ package util import ( + "cmp" "context" cryptorand "crypto/rand" "encoding/base64" @@ -8,6 +9,7 @@ import ( "math/big" "os" "runtime" + "strconv" "time" ) @@ -20,6 +22,11 @@ const ( maxInt63 = int64(^uint64(0) >> 1) ) +var ( + // Set FAKTORY2_PREVIEW=true to enable breaking changes coming in Faktory 2.0. + Faktory2Preview bool = Must(strconv.ParseBool(cmp.Or(os.Getenv("FAKTORY2_PREVIEW"), "false"))) +) + func Retryable(ctx context.Context, name string, count int, fn func() error) error { var err error for i := 0; i < count; i++ { diff --git a/webui/helpers.go b/webui/helpers.go index 3c3eee5..f6dcf55 100644 --- a/webui/helpers.go +++ b/webui/helpers.go @@ -143,7 +143,7 @@ func queueJobs(r *http.Request, q storage.Queue, count, currentPage uint64, fn f c := r.Context() err := q.Page(c, int64((currentPage-1)*count), int64(count), func(idx int, data []byte) error { var job client.Job - err := json.Unmarshal(data, &job) + err := util.JsonUnmarshal(data, &job) if err != nil { util.Warnf("Error parsing JSON: %s", string(data)) return err @@ -205,7 +205,7 @@ func busyReservations(req *http.Request, fn func(worker *manager.Reservation)) { c := req.Context() err := ctx(req).Store().Working().Each(c, func(idx int, entry storage.SortedEntry) error { var res manager.Reservation - err := json.Unmarshal(entry.Value(), &res) + err := util.JsonUnmarshal(entry.Value(), &res) if err != nil { util.Error("Cannot unmarshal reservation", err) } else { diff --git a/webui/helpers_test.go b/webui/helpers_test.go index e89c920..1d633c5 100644 --- a/webui/helpers_test.go +++ b/webui/helpers_test.go @@ -1,11 +1,11 @@ package webui import ( - "encoding/json" "fmt" "testing" "github.com/contribsys/faktory/client" + "github.com/contribsys/faktory/util" ) type testJob struct { @@ -47,7 +47,7 @@ func makeActiveJob(jobType string) *client.Job { } `, jobType)) job := &client.Job{} - err := json.Unmarshal(payload, job) + err := util.JsonUnmarshal(payload, job) if err != nil { panic(err) } @@ -99,7 +99,7 @@ func makeActionMailerJob(jobType string, mailerClass string, mailerMethod string } `, jobType, mailerClass, mailerMethod)) job := &client.Job{} - err := json.Unmarshal(payload, job) + err := util.JsonUnmarshal(payload, job) if err != nil { panic(err) } diff --git a/webui/pages.go b/webui/pages.go index fd42c18..586542b 100644 --- a/webui/pages.go +++ b/webui/pages.go @@ -13,12 +13,12 @@ import ( ) func statsHandler(w http.ResponseWriter, r *http.Request) { - hash, err := ctx(r).Server().CurrentState() + thing, err := ctx(r).Server().CurrentState() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - data, err := json.Marshal(hash) + data, err := json.Marshal(thing) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/webui/pages_test.go b/webui/pages_test.go index 33f9283..c18dc4d 100644 --- a/webui/pages_test.go +++ b/webui/pages_test.go @@ -58,19 +58,21 @@ func TestPages(t *testing.T) { assert.Equal(t, 200, w.Code) assert.Equal(t, "application/json", w.Header().Get("Content-Type")) - var content map[string]interface{} + var content client.FaktoryState err = json.Unmarshal(w.Body.Bytes(), &content) assert.NoError(t, err) - s := content["server"].(map[string]interface{}) - uid := s["uptime"].(float64) - assert.Equal(t, float64(1234567), uid) + uid := content.Server.Uptime + assert.NoError(t, err) + assert.EqualValues(t, 1234567, uid) - queues := content["faktory"].(map[string]interface{})["queues"].(map[string]interface{}) - defaultQ := queues["default"].(float64) - assert.Equal(t, 0.0, defaultQ) - foobarQ := queues["foobar"] - assert.Nil(t, foobarQ) + queues := content.Data.Queues + defaultQ := queues["default"] + assert.NoError(t, err) + assert.EqualValues(t, 0, defaultQ) + foobarQ, ok := queues["foobar"] + assert.False(t, ok) + assert.EqualValues(t, 0, foobarQ) }) t.Run("Queues", func(t *testing.T) { @@ -546,7 +548,6 @@ func findCSRFTokens(w http.ResponseWriter, body string) (string, string) { // parse body token results := searchBody.FindStringSubmatch(body) if len(results) > 1 { - fmt.Println(results) bodyToken = results[1] } @@ -556,7 +557,6 @@ func findCSRFTokens(w http.ResponseWriter, body string) (string, string) { results2 := searchCookie.FindStringSubmatch(rawCookie) if len(results2) > 1 { cookieToken = results2[1] - fmt.Println(rawCookie) } } return bodyToken, cookieToken