Skip to content

Commit

Permalink
Merge branch 'use_json_numbers'
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed May 1, 2024
2 parents 5671882 + c14dd84 commit 428506c
Show file tree
Hide file tree
Showing 24 changed files with 218 additions and 143 deletions.
4 changes: 3 additions & 1 deletion client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package client
import (
"encoding/json"
"fmt"

"github.com/contribsys/faktory/util"
)

type BatchStatus struct {
Expand Down Expand Up @@ -176,7 +178,7 @@ func (c *Client) BatchStatus(bid string) (*BatchStatus, error) {
}

var stat BatchStatus
err = json.Unmarshal(data, &stat)
err = util.JsonUnmarshal(data, &stat)
if err != nil {
return nil, err
}
Expand Down
85 changes: 44 additions & 41 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/contribsys/faktory/internal/pool"
"github.com/contribsys/faktory/util"
)

const (
Expand Down Expand Up @@ -152,7 +153,7 @@ func DefaultServer() *Server {
//
// Use the URL to configure any necessary password:
//
// tcp://:mypassword@localhost:7419
// tcp://:mypassword@localhost:7419
//
// By default Open assumes localhost with no password
// which is appropriate for local development.
Expand Down Expand Up @@ -182,8 +183,7 @@ func OpenWithDialer(dialer Dialer) (*Client, error) {
// a *tls.Dialer if "tcp+tls" and a *net.Dialer if
// not.
//
// client.Dial(client.Localhost, "topsecret")
//
// client.Dial(client.Localhost, "topsecret")
func Dial(srv *Server, password string) (*Client, error) {
d := &net.Dialer{Timeout: srv.Timeout}
dialer := Dialer(d)
Expand All @@ -198,6 +198,12 @@ func DialWithDialer(srv *Server, password string, dialer Dialer) (*Client, error
return dial(srv, password, dialer)
}

type HIv2 struct {
V int `json:"v"` // version, should be 2
I int `json:"i,omitempty"` // iterations
S string `json:"s,omitempty"` // salt
}

// dial connects to the remote faktory server.
func dial(srv *Server, password string, dialer Dialer) (*Client, error) {
client := emptyClientData()
Expand Down Expand Up @@ -227,27 +233,19 @@ func dial(srv *Server, password string, dialer Dialer) (*Client, error) {
if strings.HasPrefix(line, "HI ") {
str := strings.TrimSpace(line)[3:]

var hi map[string]interface{}
err = json.Unmarshal([]byte(str), &hi)
var hi HIv2
err = util.JsonUnmarshal([]byte(str), &hi)
if err != nil {
conn.Close()
return nil, err
}
v, ok := hi["v"].(float64)
if ok {
if ExpectedProtocolVersion != int(v) {
fmt.Println("Warning: server and client protocol versions out of sync:", v, ExpectedProtocolVersion)
}
if ExpectedProtocolVersion != hi.V {
util.Infof("Warning: server and client protocol versions out of sync: want %d, got %d", ExpectedProtocolVersion, hi.V)
}

salt, ok := hi["s"].(string)
if ok {
iter := 1
iterVal, ok := hi["i"]
if ok {
iter = int(iterVal.(float64))
}

salt := hi.S
if salt != "" {
iter := hi.I
client.PasswordHash = hash(password, salt, iter)
}
} else {
Expand Down Expand Up @@ -303,7 +301,7 @@ func (c *Client) PushBulk(jobs []*Job) (map[string]string, error) {
return nil, err
}
results := map[string]string{}
err = json.Unmarshal(data, &results)
err = util.JsonUnmarshal(data, &results)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -342,7 +340,7 @@ func (c *Client) Fetch(q ...string) (*Job, error) {
}

var job Job
err = json.Unmarshal(data, &job)
err = util.JsonUnmarshal(data, &job)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -421,7 +419,11 @@ func (c *Client) ResumeQueues(names ...string) error {
return c.ok(c.rdr)
}

// deprecated, this returns an untyped map.
// use CurrentState() instead which provides strong typing
func (c *Client) Info() (map[string]interface{}, error) {
util.Info("client.Info() is deprecated, use client.CurrentState() instead")

err := c.writeLine(c.wtr, "INFO", nil)
if err != nil {
return nil, err
Expand All @@ -435,42 +437,43 @@ func (c *Client) Info() (map[string]interface{}, error) {
return nil, nil
}

var hash map[string]interface{}
err = json.Unmarshal(data, &hash)
var cur map[string]interface{}
err = util.JsonUnmarshal(data, &cur)
if err != nil {
return nil, err
}

return hash, nil
return cur, nil
}

func (c *Client) QueueSizes() (map[string]uint64, error) {
hash, err := c.Info()
func (c *Client) CurrentState() (*FaktoryState, error) {
err := c.writeLine(c.wtr, "INFO", nil)
if err != nil {
return nil, err
}

faktory, ok := hash["faktory"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("invalid info hash: %s", hash)
data, err := c.readResponse(c.rdr)
if err != nil {
return nil, err
}

queues, ok := faktory["queues"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("invalid info hash: %s", hash)
if len(data) == 0 {
return nil, nil
}

sizes := make(map[string]uint64)
for name, size := range queues {
size, ok := size.(float64)
if !ok {
return nil, fmt.Errorf("invalid queue size: %v", size)
}

sizes[name] = uint64(size)
var cur FaktoryState
err = util.JsonUnmarshal(data, &cur)
if err != nil {
return nil, err
}
return &cur, nil
}

return sizes, nil
func (c *Client) QueueSizes() (map[string]uint64, error) {
state, err := c.CurrentState()
if err != nil {
return nil, err
}
return state.Data.Queues, nil
}

func (c *Client) Generic(cmdline string) (string, error) {
Expand Down
8 changes: 3 additions & 5 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package client

import (
"bufio"
"fmt"
"log"
"net"
"os"
Expand Down Expand Up @@ -122,13 +121,13 @@ func TestClientOperations(t *testing.T) {
resp <- "$36\r\n{\"faktory\":{\"queues\":{\"default\":2}}}\r\n"
sizes, err := cl.QueueSizes()
assert.NoError(t, err)
assert.Equal(t, sizes["default"], uint64(2))
assert.EqualValues(t, 2, sizes["default"])
assert.Contains(t, <-req, "INFO")

resp <- "$39\r\n{\"faktory\":{\"queues\":{\"invalid\":null}}}\r\n"
sizes, err = cl.QueueSizes()
assert.Error(t, err)
assert.Nil(t, sizes)
assert.NoError(t, err)
assert.EqualValues(t, 0, sizes["invalid"])
assert.Contains(t, <-req, "INFO")

err = cl.Close()
Expand Down Expand Up @@ -157,7 +156,6 @@ func withFakeServer(t *testing.T, fn func(chan string, chan string, string)) {
buf := bufio.NewReader(conn)
line, err := buf.ReadString('\n')
if err != nil {
fmt.Println(err)
conn.Close()
break
}
Expand Down
27 changes: 27 additions & 0 deletions client/faktory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,30 @@ var (
Name = "Faktory"
Version = "1.9.0"
)

// Structs for parsing the INFO response
type FaktoryState struct {
Now string `json:"now"`
ServerUtcTime string `json:"server_utc_time"`
Data DataSnapshot `json:"faktory"`
Server ServerSnapshot `json:"server"`
}

type DataSnapshot struct {
TotalFailures uint64 `json:"total_failures"`
TotalProcessed uint64 `json:"total_processed"`
TotalEnqueued uint64 `json:"total_enqueued"`
TotalQueues uint64 `json:"total_queues"`
Queues map[string]uint64 `json:"queues"`
Sets map[string]uint64 `json:"sets"`
Tasks map[string]map[string]interface{} `json:"tasks"` // deprecated
}

type ServerSnapshot struct {
Description string `json:"description"`
Version string `json:"faktory_version"`
Uptime uint64 `json:"uptime"`
Connections uint64 `json:"connections"`
CommandCount uint64 `json:"command_count"`
UsedMemoryMB uint64 `json:"used_memory_mb"`
}
2 changes: 1 addition & 1 deletion client/tracking.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (c *Client) TrackGet(jid string) (*JobTrack, error) {
}

var trck JobTrack
err = json.Unmarshal(data, &trck)
err = util.JsonUnmarshal(data, &trck)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
module github.com/contribsys/faktory

go 1.21
go 1.22

require (
github.com/BurntSushi/toml v0.4.1
github.com/justinas/nosurf v1.1.1
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.9.0
)

require github.com/redis/go-redis/v9 v9.2.0
Expand Down
11 changes: 2 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/contribsys/faktory_worker_go v1.6.0 h1:ov69BLHL62i/wRLJwvuj5UphwgjMOINRCGW3KzrKOjk=
github.com/contribsys/faktory_worker_go v1.6.0/go.mod h1:XMNGn3sBJdqFGfTH4SkmYkMovhdkq5cDJj36wowfbNY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
Expand All @@ -19,15 +18,9 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.2.0 h1:zwMdX0A4eVzse46YN18QhuDiM4uf3JmkOB4VZrdt5uI=
github.com/redis/go-redis/v9 v9.2.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
3 changes: 1 addition & 2 deletions manager/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package manager

import (
"context"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -158,7 +157,7 @@ func (el *simpleLease) Job() (*client.Job, error) {
}
if el.job == nil {
var job client.Job
err := json.Unmarshal(el.payload, &job)
err := util.JsonUnmarshal(el.payload, &job)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal job payload: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions manager/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package manager

import (
"context"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -38,7 +37,7 @@ func (m *manager) schedule(ctx context.Context, when time.Time, set storage.Sort
for {
count, err := set.RemoveBefore(ctx, util.Thens(when), 100, func(data []byte) error {
var job client.Job
if err := json.Unmarshal(data, &job); err != nil {
if err := util.JsonUnmarshal(data, &job); err != nil {
return fmt.Errorf("cannot unmarshal job payload: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions manager/working.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (m *manager) loadWorkingSet(ctx context.Context) error {
addedCount := 0
err := m.store.Working().Each(ctx, func(idx int, entry storage.SortedEntry) error {
var res Reservation
err := json.Unmarshal(entry.Value(), &res)
err := util.JsonUnmarshal(entry.Value(), &res)
if err != nil {
// We can't return an error here, this method is best effort
// as we are booting the server. We can't allow corrupted data
Expand Down Expand Up @@ -193,7 +193,7 @@ func (m *manager) ReapExpiredJobs(ctx context.Context, when time.Time) (int64, e
tm := util.Thens(when)
count, err := m.store.Working().RemoveBefore(ctx, tm, 10, func(data []byte) error {
var res Reservation
err := json.Unmarshal(data, &res)
err := util.JsonUnmarshal(data, &res)
if err != nil {
return fmt.Errorf("cannot unmarshal reservation payload: %w", err)
}
Expand Down
10 changes: 5 additions & 5 deletions server/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func pushBulk(c *Connection, s *Server, cmd string) {
data := cmd[6:]
jobs := make([]client.Job, 0)

err := json.Unmarshal([]byte(data), &jobs)
err := util.JsonUnmarshal([]byte(data), &jobs)
if err != nil {
_ = c.Error(cmd, fmt.Errorf("invalid JSON: %w", err))
return
Expand Down Expand Up @@ -147,7 +147,7 @@ func push(c *Connection, s *Server, cmd string) {
var job client.Job
job.Retry = &client.RetryPolicyDefault

err := json.Unmarshal([]byte(data), &job)
err := util.JsonUnmarshal([]byte(data), &job)
if err != nil {
_ = c.Error(cmd, fmt.Errorf("invalid JSON: %w", err))
return
Expand Down Expand Up @@ -201,7 +201,7 @@ func ack(c *Connection, s *Server, cmd string) {
data := cmd[4:]

var hash map[string]string
err := json.Unmarshal([]byte(data), &hash)
err := util.JsonUnmarshal([]byte(data), &hash)
if err != nil {
_ = c.Error(cmd, fmt.Errorf("invalid ACK %s", data))
return
Expand All @@ -225,7 +225,7 @@ func fail(c *Connection, s *Server, cmd string) {
data := cmd[5:]

var failure manager.FailPayload
err := json.Unmarshal([]byte(data), &failure)
err := util.JsonUnmarshal([]byte(data), &failure)
if err != nil {
_ = c.Error(cmd, fmt.Errorf("invalid FAIL %s", data))
return
Expand Down Expand Up @@ -266,7 +266,7 @@ func heartbeat(c *Connection, s *Server, cmd string) {
data := cmd[5:]

var beat ClientBeat
err := json.Unmarshal([]byte(data), &beat)
err := util.JsonUnmarshal([]byte(data), &beat)
if err != nil {
_ = c.Error(cmd, fmt.Errorf("invalid BEAT %s", data))
return
Expand Down
Loading

0 comments on commit 428506c

Please sign in to comment.