-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
109 lines (88 loc) · 3.03 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/redis/go-redis/v9"
)
// modelSession is the process-wide model session. It is created once in init
// and shared by every consumer goroutine started from main.
var modelSession *ModelSession

// init creates the shared model session before main runs.
//
// A failure here is fatal: continuing with a nil session would only defer the
// crash to the first Destroy or RunModel call, so the program stops right away
// with the underlying error attached.
func init() {
	session, err := initSession()
	if err != nil {
		panic(fmt.Errorf("failed to initialize session: %w", err))
	}
	modelSession = session
}
// main runs a small Redis Streams load demo: it makes sure the consumer group
// exists, starts numPublishers goroutines that each publish a handful of frame
// messages, starts numConsumers goroutines that run the model on every frame
// they receive, and then waits for all of them.
func main() {
	// Guard against a session that was never created; calling Destroy or
	// RunModel on a nil session would crash later and less legibly.
	if modelSession == nil {
		fmt.Println("Model session is not initialized; exiting.")
		return
	}
	defer modelSession.Destroy()

	imagePath := "car.png"
	redisAddr := "localhost:6379"
	streamName := "camera_stream"
	consumerGroup := "camera_group"
	numConsumers := 30  // Number of consumers
	numPublishers := 20 // Number of publishers

	client := redis.NewClient(&redis.Options{
		Addr: redisAddr,
	})
	defer client.Close()

	// Ensure the consumer group (and the stream itself) exists. "$" makes the
	// group start from messages added after its creation.
	ctx := context.Background()
	err := client.XGroupCreateMkStream(ctx, streamName, consumerGroup, "$").Err()
	// BUSYGROUP only means the group is already there, which is fine. Match on
	// the error prefix instead of the full server message text.
	if err != nil && !redis.HasErrorPrefix(err, "BUSYGROUP") {
		fmt.Printf("Failed to create consumer group: %v\n", err)
		return
	}
	fmt.Println("Consumer group created or already exists.")

	// No extra semaphores are needed: exactly numPublishers and numConsumers
	// goroutines are started, so the loop bounds already cap concurrency.
	var wg sync.WaitGroup

	// A single publisher is shared by all publisher goroutines.
	// NOTE(review): assumes the value returned by NewRedisPublisher is safe
	// for concurrent use - confirm against its implementation.
	publisher := NewRedisPublisher(redisAddr, streamName)

	for i := 0; i < numPublishers; i++ {
		wg.Add(1)
		go func(publisherName string) {
			defer wg.Done()

			// Mimic publishing frames
			const frames = 5 // Number of frames to publish
			fmt.Printf("Publisher %s starting...\n", publisherName)
			for frame := 0; frame < frames; frame++ {
				err := publisher.Publish(ctx, map[string]interface{}{
					"frame_path": imagePath,
					"timestamp":  time.Now().Format(time.RFC3339),
				})
				if err != nil {
					// A failed publish is reported but does not stop the publisher.
					fmt.Printf("Publisher %s failed to publish frame %d: %v\n", publisherName, frame, err)
				}
				time.Sleep(500 * time.Millisecond) // Simulate a delay between frame publishing
			}
			fmt.Printf("Publisher %s finished.\n", publisherName)
		}(fmt.Sprintf("publisher-%d", i+1))
	}

	for i := 0; i < numConsumers; i++ {
		wg.Add(1)
		go func(consumerName string) {
			defer wg.Done()

			consumer := NewRedisConsumer(redisAddr, streamName, consumerGroup, consumerName)

			// Process messages with the consumer.
			// NOTE(review): ctx is never cancelled, so wg.Wait below only
			// returns if ProcessMessages terminates on its own - confirm.
			consumer.ProcessMessages(ctx, func(msgID string, values map[string]interface{}) error {
				framePath, ok := values["frame_path"].(string)
				if !ok {
					return fmt.Errorf("frame_path not found or invalid in message %s", msgID)
				}

				if err := RunModel(modelSession, framePath); err != nil {
					fmt.Printf("Consumer %s error processing frame %s: %v\n", consumerName, framePath, err)
					return nil // Continue processing other frames even if this one fails
				}

				fmt.Printf("Consumer %s processed frame %s from message %s\n", consumerName, framePath, msgID)
				return nil
			})
		}(fmt.Sprintf("consumer-%d", i+1))
	}

	// Wait for all consumers and publishers to finish
	wg.Wait()
}