Skip to content

Commit

Permalink
enhance shutdown logging for all worker & db config
Browse files Browse the repository at this point in the history
  • Loading branch information
agungdwiprasetyo committed Jul 5, 2024
1 parent 88bdd4d commit 4213f93
Show file tree
Hide file tree
Showing 14 changed files with 68 additions and 35 deletions.
2 changes: 1 addition & 1 deletion broker/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (k *KafkaBroker) Health() map[string]error {

// Disconnect method
func (k *KafkaBroker) Disconnect(ctx context.Context) error {
defer logger.LogWithDefer("kafka: disconnect...")()
defer logger.LogWithDefer("\x1b[33;5mkafka_broker\x1b[0m: disconnect...")()

return k.Client.Close()
}
Expand Down
12 changes: 6 additions & 6 deletions broker/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,26 +138,26 @@ func (r *RabbitMQBroker) Health() map[string]error {

// Disconnect method
func (r *RabbitMQBroker) Disconnect(ctx context.Context) error {
defer logger.LogWithDefer("rabbitmq: disconnect...")()
defer logger.LogWithDefer("\x1b[33;5mrabbitmq_broker\x1b[0m: disconnect...")()

return r.Conn.Close()
}

// rabbitMQPublisher rabbitmq
type rabbitMQPublisher struct {
// RabbitMQPublisher rabbitmq
type RabbitMQPublisher struct {
conn *amqp.Connection
exchange string
}

// NewRabbitMQPublisher setup only rabbitmq publisher with client connection
func NewRabbitMQPublisher(conn *amqp.Connection, exchange string) *rabbitMQPublisher {
return &rabbitMQPublisher{
func NewRabbitMQPublisher(conn *amqp.Connection, exchange string) *RabbitMQPublisher {
return &RabbitMQPublisher{
conn: conn, exchange: exchange,
}
}

// PublishMessage method
func (r *rabbitMQPublisher) PublishMessage(ctx context.Context, args *candishared.PublisherArgument) (err error) {
func (r *RabbitMQPublisher) PublishMessage(ctx context.Context, args *candishared.PublisherArgument) (err error) {
trace, _ := tracer.StartTraceWithContext(ctx, "rabbitmq:publish_message")
defer func() {
if r := recover(); r != nil {
Expand Down
2 changes: 1 addition & 1 deletion broker/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (r *RedisBroker) Health() map[string]error {

// Disconnect method
func (r *RedisBroker) Disconnect(ctx context.Context) error {
defer logger.LogWithDefer("redis: closing pool...")()
defer logger.LogWithDefer("\x1b[33;5mredis_broker\x1b[0m: closing pool...")()

return r.Pool.Close()
}
Expand Down
8 changes: 6 additions & 2 deletions codebase/app/cron_worker/cron_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"log"
"reflect"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -136,7 +137,8 @@ func (c *cronWorker) Serve() {

func (c *cronWorker) Shutdown(ctx context.Context) {
defer func() {
log.Println("\x1b[33;1mStopping Cron Job Scheduler:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m")
fmt.Printf("\r%s \x1b[33;1mStopping Cron Job Scheduler:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m%s\n",
time.Now().Format(candihelper.TimeFormatLogger), strings.Repeat(" ", 20))
}()

if len(c.activeJobs) == 0 {
Expand All @@ -149,9 +151,11 @@ func (c *cronWorker) Shutdown(ctx context.Context) {
for _, sem := range c.semaphore {
runningJob += len(sem)
}
waitingJob := "... "
if runningJob != 0 {
fmt.Printf("\x1b[34;1mCron Job Scheduler:\x1b[0m waiting %d job until done...\n", runningJob)
waitingJob = fmt.Sprintf("waiting %d job until done... ", runningJob)
}
fmt.Printf("\r%s \x1b[33;1mStopping Cron Job Scheduler:\x1b[0m %s", time.Now().Format(candihelper.TimeFormatLogger), waitingJob)

c.wg.Wait()
c.ctxCancelFunc()
Expand Down
7 changes: 5 additions & 2 deletions codebase/app/kafka_worker/default_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func (c *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, clai
for {
select {
case message := <-claim.Messages():

c.processMessage(session, message)

case <-session.Context().Done():
Expand All @@ -52,6 +51,9 @@ func (c *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, clai
}

func (c *consumerHandler) processMessage(session sarama.ConsumerGroupSession, message *sarama.ConsumerMessage) {
if message == nil {
return
}
handler, ok := c.handlerFuncs[message.Topic]
if !ok {
return
Expand Down Expand Up @@ -95,7 +97,8 @@ func (c *consumerHandler) processMessage(session sarama.ConsumerGroupSession, me
trace.Log("message", message.Value)

if c.opt.debugMode {
log.Printf("\x1b[35;3mKafka Consumer%s: message consumed, timestamp = %v, topic = %s\x1b[0m", getWorkerTypeLog(c.bk.WorkerType), message.Timestamp, message.Topic)
log.Printf("\x1b[35;3mKafka Consumer%s: message consumed, timestamp = %v, topic = %s, partition = %d, offset = %d\x1b[0m",
getWorkerTypeLog(c.bk.WorkerType), message.Timestamp, message.Topic, message.Partition, message.Offset)
}

eventContext := c.messagePool.Get().(*candishared.EventContext)
Expand Down
7 changes: 6 additions & 1 deletion codebase/app/kafka_worker/kafka_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/IBM/sarama"
"github.com/golangid/candi/broker"
"github.com/golangid/candi/candihelper"
"github.com/golangid/candi/candishared"
"github.com/golangid/candi/codebase/factory"
"github.com/golangid/candi/codebase/factory/types"
Expand Down Expand Up @@ -117,8 +118,12 @@ func (h *kafkaWorker) Serve() {
}

func (h *kafkaWorker) Shutdown(ctx context.Context) {
defer log.Printf("\x1b[33;1mStopping Kafka Consumer%s:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m\n", getWorkerTypeLog(h.bk.WorkerType))
defer func() {
fmt.Printf("\r%s \x1b[33;1mStopping Kafka Consumer%s:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m%s\n",
time.Now().Format(candihelper.TimeFormatLogger), getWorkerTypeLog(h.bk.WorkerType), strings.Repeat(" ", 20))
}()

fmt.Printf("\r%s \x1b[33;1mStopping Kafka Consumer%s:\x1b[0m ... ", time.Now().Format(candihelper.TimeFormatLogger), getWorkerTypeLog(h.bk.WorkerType))
h.cancelFunc()
h.engine.Close()
}
Expand Down
10 changes: 8 additions & 2 deletions codebase/app/postgres_worker/postgres_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"fmt"
"log"
"reflect"
"strings"
"sync"
"time"

"github.com/golangid/candi/candihelper"
"github.com/golangid/candi/candishared"
Expand Down Expand Up @@ -159,17 +161,21 @@ func (p *postgresWorker) Serve() {

func (p *postgresWorker) Shutdown(ctx context.Context) {
defer func() {
log.Printf("\x1b[33;1mStopping Postgres Event Listener%s:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m\n", getWorkerTypeLog(p.opt.workerType))
fmt.Printf("\r%s \x1b[33;1mPostgres Event Listener%s:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m%s\n",
time.Now().Format(candihelper.TimeFormatLogger), getWorkerTypeLog(p.opt.workerType), strings.Repeat(" ", 20))
}()

p.shutdown <- struct{}{}
runningJob := 0
for _, sem := range p.semaphores {
runningJob += len(sem)
}
waitingJob := "... "
if runningJob != 0 {
fmt.Printf("\x1b[34;1mPostgres Event Listener%s:\x1b[0m waiting %d job until done...\n", getWorkerTypeLog(p.opt.workerType), runningJob)
waitingJob = fmt.Sprintf("waiting %d job until done... ", runningJob)
}
fmt.Printf("\r%s \x1b[33;1mPostgres Event Listener%s:\x1b[0m %s",
time.Now().Format(candihelper.TimeFormatLogger), getWorkerTypeLog(p.opt.workerType), waitingJob)

for _, source := range p.opt.sources {
source.listener.Close()
Expand Down
14 changes: 11 additions & 3 deletions codebase/app/rabbitmq_worker/rabbitmq_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"fmt"
"log"
"reflect"
"strings"
"sync"
"time"

"github.com/golangid/candi/broker"
"github.com/golangid/candi/candihelper"
Expand Down Expand Up @@ -114,17 +116,23 @@ func (r *rabbitmqWorker) Serve() {
}

func (r *rabbitmqWorker) Shutdown(ctx context.Context) {
defer log.Printf("\x1b[33;1mStopping RabbitMQ Worker%s:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m\n", getWorkerTypeLog(r.bk.WorkerType))
defer func() {
fmt.Printf("\r%s \x1b[33;1mStopping RabbitMQ Worker%s:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m%s\n",
time.Now().Format(candihelper.TimeFormatLogger), getWorkerTypeLog(r.bk.WorkerType), strings.Repeat(" ", 20))
}()

r.shutdown <- struct{}{}
r.isShutdown = true
var runningJob int
runningJob := 0
for _, sem := range r.semaphore {
runningJob += len(sem)
}
waitingJob := "... "
if runningJob != 0 {
fmt.Printf("\x1b[34;1mRabbitMQ Worker%s:\x1b[0m waiting %d job until done...\x1b[0m\n", getWorkerTypeLog(r.bk.WorkerType), runningJob)
waitingJob = fmt.Sprintf("waiting %d job until done... ", runningJob)
}
fmt.Printf("\r%s \x1b[33;1mStopping RabbitMQ Worker%s:\x1b[0m %s",
time.Now().Format(candihelper.TimeFormatLogger), getWorkerTypeLog(r.bk.WorkerType), waitingJob)

r.wg.Wait()
r.bk.Channel.Close()
Expand Down
20 changes: 11 additions & 9 deletions codebase/app/redis_worker/redis_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"context"
"fmt"
"log"
"strings"
"sync"
"time"

"github.com/golangid/candi/broker"
"github.com/golangid/candi/candihelper"
"github.com/golangid/candi/candishared"
"github.com/golangid/candi/candiutils"
"github.com/golangid/candi/codebase/factory"
Expand All @@ -20,10 +23,6 @@ import (
"github.com/gomodule/redigo/redis"
)

var (
refreshWorkerNotif, shutdown chan struct{}
)

type (
redisWorker struct {
ctx context.Context
Expand Down Expand Up @@ -84,8 +83,6 @@ func NewWorker(service factory.ServiceFactory, bk interfaces.Broker, opts ...Opt
fmt.Printf("\x1b[34;1m⇨ Redis pubsub worker%s running with %d keys\x1b[0m\n\n", getWorkerTypeLog(workerInstance.bk.WorkerType), len(handlers))
}

shutdown = make(chan struct{}, 1)

workerInstance.handlers = handlers
workerInstance.isHaveJob = len(handlers) != 0
workerInstance.ctx, workerInstance.ctxCancelFunc = context.WithCancel(context.Background())
Expand Down Expand Up @@ -129,20 +126,25 @@ func (r *redisWorker) Serve() {
}

func (r *redisWorker) Shutdown(ctx context.Context) {
defer log.Printf("\x1b[33;1mStopping Redis Subscriber%s:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m\n", getWorkerTypeLog(r.bk.WorkerType))
defer func() {
fmt.Printf("\r%s \x1b[33;1mStopping Redis Subscriber%s:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m%s\n",
time.Now().Format(candihelper.TimeFormatLogger), getWorkerTypeLog(r.bk.WorkerType), strings.Repeat(" ", 20))
}()

if !r.isHaveJob {
return
}

shutdown <- struct{}{}
runningJob := 0
for _, sem := range r.semaphore {
runningJob += len(sem)
}
waitingJob := "... "
if runningJob != 0 {
fmt.Printf("\x1b[34;1mRedis Subscriber:\x1b[0m waiting %d job until done...\n", runningJob)
waitingJob = fmt.Sprintf("waiting %d job until done... ", runningJob)
}
fmt.Printf("\r%s \x1b[33;1mStopping Redis Subscriber%s:\x1b[0m %s",
time.Now().Format(candihelper.TimeFormatLogger), getWorkerTypeLog(r.bk.WorkerType), waitingJob)

r.wg.Wait()
r.ctxCancelFunc()
Expand Down
13 changes: 9 additions & 4 deletions codebase/app/task_queue_worker/task_queue_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package taskqueueworker
import (
"context"
"fmt"
"log"
"reflect"
"strings"
"sync"
"time"

"github.com/golangid/candi/candihelper"
"github.com/golangid/candi/candishared"
"github.com/golangid/candi/codebase/factory"
"github.com/golangid/candi/codebase/factory/types"
Expand Down Expand Up @@ -183,7 +183,10 @@ func (t *taskQueueWorker) Serve() {
}

func (t *taskQueueWorker) Shutdown(ctx context.Context) {
defer log.Println("\x1b[33;1mStopping Task Queue Worker:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m")
defer func() {
fmt.Printf("\r%s \x1b[33;1mStopping Task Queue Worker:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m%s\n",
time.Now().Format(candihelper.TimeFormatLogger), strings.Repeat(" ", 20))
}()

if len(t.registeredTaskWorkerIndex) == 0 {
return
Expand All @@ -194,13 +197,15 @@ func (t *taskQueueWorker) Shutdown(ctx context.Context) {
t.isShutdown = true
t.opt.locker.Reset(t.getLockKey("*"))

var runningJob int
runningJob := 0
for _, sem := range t.semaphore {
runningJob += len(sem)
}
waitingJob := "... "
if runningJob != 0 {
fmt.Printf("\x1b[34;1mTask Queue Worker:\x1b[0m waiting %d job until done...\x1b[0m\n", runningJob)
waitingJob = fmt.Sprintf("waiting %d job until done... ", runningJob)
}
fmt.Printf("\r%s \x1b[33;1mStopping Task Queue Worker:\x1b[0m %s", time.Now().Format(candihelper.TimeFormatLogger), waitingJob)

t.wg.Wait()
for _, task := range t.tasks {
Expand Down
2 changes: 1 addition & 1 deletion config/database/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (m *mongoInstance) Health() map[string]error {
}
}
func (m *mongoInstance) Disconnect(ctx context.Context) (err error) {
defer logger.LogWithDefer("mongodb: disconnect...")()
defer logger.LogWithDefer("\x1b[33;5mmongodb\x1b[0m: disconnect...")()

if err := m.write.Client().Disconnect(ctx); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion config/database/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (m *redisInstance) Cache() interfaces.Cache {
return m.cache
}
func (m *redisInstance) Disconnect(ctx context.Context) (err error) {
defer logger.LogWithDefer("redis: disconnect...")()
defer logger.LogWithDefer("\x1b[33;5mredis\x1b[0m: disconnect...")()

if err := m.read.Close(); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion config/database/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *sqlInstance) Health() map[string]error {
return mErr
}
func (s *sqlInstance) Disconnect(ctx context.Context) (err error) {
defer logger.LogWithDefer("sql: disconnect...")()
defer logger.LogWithDefer("\x1b[33;5msql\x1b[0m: disconnect...")()

if err := s.read.Close(); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package candi

const (
// Version of this library
Version = "v1.17.11"
Version = "v1.17.12"
)

0 comments on commit 4213f93

Please sign in to comment.