Skip to content

Commit

Permalink
Update to use sfomuseum/go-pubsub v0.0.18 (#14)
Browse files Browse the repository at this point in the history
* update vendor deps, still testing

* update to use sfomuseum/go-pubsub v0.0.18

---------

Co-authored-by: sfomuseumbot <sfomuseumbot@localhost>
  • Loading branch information
thisisaaronland and sfomuseumbot authored Sep 4, 2024
1 parent 80f7780 commit 9fcb402
Show file tree
Hide file tree
Showing 1,471 changed files with 473,341 additions and 31,429 deletions.
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
GOMOD=$(shell test -f "go.work" && echo "readonly" || echo "vendor")
LDFLAGS=-s -w

cli:
go build -mod vendor -ldflags="-s -w" -o bin/pubssed-broadcast cmd/pubssed-broadcast/main.go
go build -mod vendor -ldflags="-s -w" -o bin/pubssed-client cmd/pubssed-client/main.go
go build -mod vendor -ldflags="-s -w" -o bin/pubssed-server cmd/pubssed-server/main.go
go build -mod $(GOMOD) -ldflags="$(LDFLAGS)" -o bin/pubssed-broadcast cmd/pubssed-broadcast/main.go
go build -mod $(GOMOD) -ldflags="$(LDFLAGS)" -o bin/pubssed-client cmd/pubssed-client/main.go
go build -mod $(GOMOD) -ldflags="$(LDFLAGS)" -o bin/pubssed-server cmd/pubssed-server/main.go

docker:
docker build -t go-pubssed .
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ Documentation is incomplete still.
```
import (
"context"
"net/http"
"github.com/sfomuseum/go-pubsub/subscriber"
"github.com/whosonfirst/go-pubssed/broker"
"net/http"
)
ctx := context.Background()
Expand All @@ -40,8 +41,9 @@ _Note that all error handling has been removed for the sake of brevity._

```
import (
"github.com/whosonfirst/go-pubssed/listener"
"log"
"github.com/whosonfirst/go-pubssed/listener"
)
callback := func(msg string) error {
Expand Down
35 changes: 18 additions & 17 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package broker
import (
"context"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"time"

"github.com/sfomuseum/go-pubsub/subscriber"
Expand All @@ -16,19 +15,15 @@ type Broker struct {
messages chan string
new_clients chan chan string
bunk_clients chan chan string
Logger *log.Logger
}

func NewBroker() (*Broker, error) {

logger := log.New(os.Stdout, "[pubssed] ", log.LstdFlags)

b := Broker{
clients: make(map[chan string]bool),
messages: make(chan string),
new_clients: make(chan (chan string)),
bunk_clients: make(chan (chan string)),
Logger: logger,
}

return &b, nil
Expand All @@ -38,20 +33,22 @@ func (b *Broker) Start(ctx context.Context, sub subscriber.Subscriber) error {

// set up the SSE monitor

logger := slog.Default()
logger.Debug("Start broker", "subscriber", fmt.Sprintf("%T", sub))

go func() {

for {

select {

case <-ctx.Done():
// log.Println("Done")
logger.Debug("Broker received done signal, exiting")
return

case s := <-b.new_clients:

b.clients[s] = true
// log.Println("Added new client")

case s := <-b.bunk_clients:

Expand All @@ -64,11 +61,11 @@ func (b *Broker) Start(ctx context.Context, sub subscriber.Subscriber) error {

case msg := <-b.messages:

logger.Debug("Broadcast message to clients", "count", len(b.clients))

for s, _ := range b.clients {
s <- msg
}

// log.Printf("Broadcast message to %d clients", len(b.clients))
}
}
}()
Expand All @@ -78,7 +75,7 @@ func (b *Broker) Start(ctx context.Context, sub subscriber.Subscriber) error {
go func() {

// something something error handling...

logger.Debug("Listen for pub sub messages")
sub.Listen(ctx, b.messages)
}()

Expand All @@ -93,19 +90,24 @@ func (b *Broker) HandlerFuncWithTimeout(ttl *time.Duration) (http.HandlerFunc, e

f := func(w http.ResponseWriter, r *http.Request) {

logger := slog.Default()
logger = logger.With("remote addr", r.RemoteAddr)

if ttl != nil {
b.Logger.Printf("SSE start handler from %s with TTL %v", r.RemoteAddr, ttl)
} else {
b.Logger.Printf("SSE start handler from %s", r.RemoteAddr)
logger = logger.With("ttl", ttl)
}

t1 := time.Now()
logger.Debug("Start broker HTTP handler", "time", t1)

defer func() {
b.Logger.Printf("SSE finish handler from %s", r.RemoteAddr)
logger.Debug("Finish handler", "time", time.Since(t1))
}()

fl, ok := w.(http.Flusher)

if !ok {
logger.Error("Writer does not support streaming")
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
Expand All @@ -128,7 +130,6 @@ func (b *Broker) HandlerFuncWithTimeout(ttl *time.Duration) (http.HandlerFunc, e

go func() {
<-notify
b.Logger.Println("Received ctx.Done notification.")
b.bunk_clients <- messageChan
// Don't close(messageChan) since it's unnecessary and
// seems to cause CPU to spike to 100% Computers, amirite?
Expand All @@ -151,7 +152,7 @@ func (b *Broker) HandlerFuncWithTimeout(ttl *time.Duration) (http.HandlerFunc, e

select {
case <-notify:
b.Logger.Println("SSE stop handler")
logger.Debug("Handler received signal, stopping handler")
return
case msg := <-messageChan:
fmt.Fprintf(w, "data: %s\n\n", msg)
Expand Down
34 changes: 22 additions & 12 deletions cmd/pubssed-broadcast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,56 @@ import (
"context"
"flag"
"fmt"
_ "log"
"log"
"log/slog"
"os"
"time"

"github.com/go-redis/redis/v8"
"github.com/sfomuseum/go-pubsub/publisher"
)

func main() {

var clients = flag.Int("clients", 200, "Number of concurrent clients")

var redis_host = flag.String("redis-host", "localhost", "Redis host")
var redis_port = flag.Int("redis-port", 6379, "Redis port")
var redis_channel = flag.String("redis-channel", "pubssed", "Redis channel")
var publisher_uri = flag.String("publisher-uri", "redis://?host=localhost&port=6379&channel=pubssed", "...")

var verbose = flag.Bool("verbose", false, "Enable verbose (debug) logging")

flag.Parse()

if *verbose {
slog.SetLogLoggerLevel(slog.LevelDebug)
slog.Debug("Verbose logging enabled")
}

ctx := context.Background()

redis_endpoint := fmt.Sprintf("%s:%d", *redis_host, *redis_port)
pub, err := publisher.NewPublisher(ctx, *publisher_uri)

redis_client := redis.NewClient(&redis.Options{
Addr: redis_endpoint,
})
if err != nil {
log.Fatalf("Failed to create publisher, %w", err)
}

defer redis_client.Close()
defer pub.Close()

ch := make(chan bool, *clients)

for i := 0; i < *clients; i++ {
ch <- true
}

logger := slog.Default()
logger = logger.With("publisher", *publisher_uri)

for {

<-ch

now := fmt.Sprintf("%v", time.Now())
// log.Println(*redis_channel, now)
redis_client.Publish(ctx, *redis_channel, now)
logger.Info("Publish", "message", now)

pub.Publish(ctx, now)

ch <- true
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/pubssed-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"flag"
"fmt"
"log"
"log/slog"
"os"
"path/filepath"
"time"
Expand All @@ -18,8 +19,15 @@ func main() {
var append_root = flag.String("append-root", ".", "The destination to write log files if the 'append' callback is invoked.")
var retry = flag.Bool("retry-on-eof", false, "Try to reconnect to the SSE endpoint if an EOF error is triggered. This is sometimes necessary if an SSE endpoint is configured with a too-short HTTP timeout (for example if running behind an AWS load balancer).")

var verbose = flag.Bool("verbose", false, "Enable verbose (debug) logging")

flag.Parse()

if *verbose {
slog.SetLogLoggerLevel(slog.LevelDebug)
slog.Debug("Verbose logging enabled")
}

if *endpoint == "" {
log.Fatal("Missing pubssed endpoint")
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/pubssed-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"flag"
"fmt"
"log"
"log/slog"
"net/http"
"os"

Expand All @@ -24,12 +25,19 @@ func main() {

var subscription_uri = flag.String("subscription-uri", "redis://?host=localhost&port=6379&channel=pubssed", "...")

var verbose = flag.Bool("verbose", false, "Enable verbose (debug) logging")
enable_demo := flag.Bool("enable-demo", false, "...")

flag.Parse()

if *verbose {
slog.SetLogLoggerLevel(slog.LevelDebug)
slog.Debug("Verbose logging enabled")
}

ctx := context.Background()

slog.Debug("SERVER", "sub", *subscription_uri)
sub, err := subscriber.NewSubscriber(ctx, *subscription_uri)

if err != nil {
Expand Down
58 changes: 39 additions & 19 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,33 +1,53 @@
module github.com/whosonfirst/go-pubssed

go 1.21
go 1.23

replace github.com/hpcloud/tail v1.0.0 => github.com/sfomuseum/tail v1.0.1
replace github.com/hpcloud/tail v1.0.0 => github.com/sfomuseum/tail v1.0.2

require (
github.com/go-redis/redis/v8 v8.11.5
github.com/sfomuseum/go-pubsub v0.0.15
)
require github.com/sfomuseum/go-pubsub v0.0.18

require (
github.com/aaronland/go-aws-auth v1.6.4 // indirect
github.com/aaronland/go-roster v1.0.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/aws/aws-sdk-go v1.55.5 // indirect
github.com/aws/aws-sdk-go-v2 v1.30.5 // indirect
github.com/aws/aws-sdk-go-v2/config v1.27.27 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/service/cognitoidentity v1.25.5 // indirect
github.com/aws/aws-sdk-go-v2/service/iam v1.34.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 // indirect
github.com/aws/aws-sdk-go-v2/service/sns v1.31.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sqs v1.34.7 // indirect
github.com/aws/aws-sdk-go-v2/service/ssm v1.52.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 // indirect
github.com/aws/smithy-go v1.20.4 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-ini/ini v1.67.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/google/wire v0.6.0 // indirect
github.com/googleapis/gax-go/v2 v2.13.0 // indirect
github.com/hpcloud/tail v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/redis/go-redis/v9 v9.6.1 // indirect
go.opencensus.io v0.24.0 // indirect
gocloud.dev v0.36.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/api v0.151.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gocloud.dev v0.39.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/xerrors v0.0.0-20240716161551-93cc26a95ae9 // indirect
google.golang.org/api v0.191.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240812133136-8ffd90a71988 // indirect
google.golang.org/grpc v1.65.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
)
Loading

0 comments on commit 9fcb402

Please sign in to comment.