diff --git a/broker/kafka.go b/broker/kafka.go index 4ef3dd1..a8dbbd0 100644 --- a/broker/kafka.go +++ b/broker/kafka.go @@ -140,7 +140,7 @@ func (k *KafkaBroker) Health() map[string]error { // Disconnect method func (k *KafkaBroker) Disconnect(ctx context.Context) error { - defer logger.LogWithDefer("kafka: disconnect...")() + defer logger.LogWithDefer("\x1b[33;5mkafka_broker\x1b[0m: disconnect...")() return k.Client.Close() } diff --git a/broker/rabbitmq.go b/broker/rabbitmq.go index 9fe8368..24aa598 100644 --- a/broker/rabbitmq.go +++ b/broker/rabbitmq.go @@ -138,26 +138,26 @@ func (r *RabbitMQBroker) Health() map[string]error { // Disconnect method func (r *RabbitMQBroker) Disconnect(ctx context.Context) error { - defer logger.LogWithDefer("rabbitmq: disconnect...")() + defer logger.LogWithDefer("\x1b[33;5mrabbitmq_broker\x1b[0m: disconnect...")() return r.Conn.Close() } -// rabbitMQPublisher rabbitmq -type rabbitMQPublisher struct { +// RabbitMQPublisher rabbitmq +type RabbitMQPublisher struct { conn *amqp.Connection exchange string } // NewRabbitMQPublisher setup only rabbitmq publisher with client connection -func NewRabbitMQPublisher(conn *amqp.Connection, exchange string) *rabbitMQPublisher { - return &rabbitMQPublisher{ +func NewRabbitMQPublisher(conn *amqp.Connection, exchange string) *RabbitMQPublisher { + return &RabbitMQPublisher{ conn: conn, exchange: exchange, } } // PublishMessage method -func (r *rabbitMQPublisher) PublishMessage(ctx context.Context, args *candishared.PublisherArgument) (err error) { +func (r *RabbitMQPublisher) PublishMessage(ctx context.Context, args *candishared.PublisherArgument) (err error) { trace, _ := tracer.StartTraceWithContext(ctx, "rabbitmq:publish_message") defer func() { if r := recover(); r != nil { diff --git a/broker/redis.go b/broker/redis.go index f5258db..7a96946 100644 --- a/broker/redis.go +++ b/broker/redis.go @@ -113,7 +113,7 @@ func (r *RedisBroker) Health() map[string]error { // Disconnect method func (r *RedisBroker) Disconnect(ctx context.Context) error { - defer logger.LogWithDefer("redis: closing pool...")() + defer logger.LogWithDefer("\x1b[33;5mredis_broker\x1b[0m: closing pool...")() return r.Pool.Close() } diff --git a/codebase/app/cron_worker/cron_worker.go b/codebase/app/cron_worker/cron_worker.go index 5c82911..922e9cb 100644 --- a/codebase/app/cron_worker/cron_worker.go +++ b/codebase/app/cron_worker/cron_worker.go @@ -9,6 +9,7 @@ import ( "fmt" "log" "reflect" + "strings" "sync" "time" @@ -136,7 +137,8 @@ func (c *cronWorker) Serve() { func (c *cronWorker) Shutdown(ctx context.Context) { defer func() { - log.Println("\x1b[33;1mStopping Cron Job Scheduler:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m") + fmt.Printf("\r%s \x1b[33;1mStopping Cron Job Scheduler:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m%s\n", + time.Now().Format(candihelper.TimeFormatLogger), strings.Repeat(" ", 20)) }() if len(c.activeJobs) == 0 { @@ -149,9 +151,11 @@ func (c *cronWorker) Shutdown(ctx context.Context) { for _, sem := range c.semaphore { runningJob += len(sem) } + waitingJob := "... " if runningJob != 0 { - fmt.Printf("\x1b[34;1mCron Job Scheduler:\x1b[0m waiting %d job until done...\n", runningJob) + waitingJob = fmt.Sprintf("waiting %d job until done... ", runningJob) } + fmt.Printf("\r%s \x1b[33;1mStopping Cron Job Scheduler:\x1b[0m %s", time.Now().Format(candihelper.TimeFormatLogger), waitingJob) c.wg.Wait() c.ctxCancelFunc() diff --git a/codebase/app/kafka_worker/default_handler.go b/codebase/app/kafka_worker/default_handler.go index 82dfecf..b1baaf7 100644 --- a/codebase/app/kafka_worker/default_handler.go +++ b/codebase/app/kafka_worker/default_handler.go @@ -41,7 +41,6 @@ func (c *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, clai for { select { case message := <-claim.Messages(): - c.processMessage(session, message) case <-session.Context().Done(): @@ -52,6 +51,9 @@ func (c *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, clai } func (c *consumerHandler) processMessage(session sarama.ConsumerGroupSession, message *sarama.ConsumerMessage) { + if message == nil { + return + } handler, ok := c.handlerFuncs[message.Topic] if !ok { return @@ -95,7 +97,8 @@ func (c *consumerHandler) processMessage(session sarama.ConsumerGroupSession, me trace.Log("message", message.Value) if c.opt.debugMode { - log.Printf("\x1b[35;3mKafka Consumer%s: message consumed, timestamp = %v, topic = %s\x1b[0m", getWorkerTypeLog(c.bk.WorkerType), message.Timestamp, message.Topic) + log.Printf("\x1b[35;3mKafka Consumer%s: message consumed, timestamp = %v, topic = %s, partition = %d, offset = %d\x1b[0m", + getWorkerTypeLog(c.bk.WorkerType), message.Timestamp, message.Topic, message.Partition, message.Offset) } eventContext := c.messagePool.Get().(*candishared.EventContext) diff --git a/codebase/app/kafka_worker/kafka_worker.go b/codebase/app/kafka_worker/kafka_worker.go index 1ffe329..e131f1a 100644 --- a/codebase/app/kafka_worker/kafka_worker.go +++ b/codebase/app/kafka_worker/kafka_worker.go @@ -11,6 +11,7 @@ import ( "github.com/IBM/sarama" "github.com/golangid/candi/broker" + "github.com/golangid/candi/candihelper" "github.com/golangid/candi/candishared" "github.com/golangid/candi/codebase/factory" "github.com/golangid/candi/codebase/factory/types" @@ -117,8 +118,12 @@ func (h *kafkaWorker) Serve() { } func (h *kafkaWorker) Shutdown(ctx context.Context) { - defer log.Printf("\x1b[33;1mStopping Kafka Consumer%s:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m\n", getWorkerTypeLog(h.bk.WorkerType)) + defer func() { + fmt.Printf("\r%s \x1b[33;1mStopping Kafka Consumer%s:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m%s\n", + time.Now().Format(candihelper.TimeFormatLogger), getWorkerTypeLog(h.bk.WorkerType), strings.Repeat(" ", 20)) + }() + fmt.Printf("\r%s \x1b[33;1mStopping Kafka Consumer%s:\x1b[0m ... ", time.Now().Format(candihelper.TimeFormatLogger), getWorkerTypeLog(h.bk.WorkerType)) h.cancelFunc() h.engine.Close() } diff --git a/codebase/app/postgres_worker/postgres_worker.go b/codebase/app/postgres_worker/postgres_worker.go index e8bdad6..1faa514 100644 --- a/codebase/app/postgres_worker/postgres_worker.go +++ b/codebase/app/postgres_worker/postgres_worker.go @@ -7,7 +7,9 @@ import ( "fmt" "log" "reflect" + "strings" "sync" + "time" "github.com/golangid/candi/candihelper" "github.com/golangid/candi/candishared" @@ -159,7 +161,8 @@ func (p *postgresWorker) Serve() { func (p *postgresWorker) Shutdown(ctx context.Context) { defer func() { - log.Printf("\x1b[33;1mStopping Postgres Event Listener%s:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m\n", getWorkerTypeLog(p.opt.workerType)) + fmt.Printf("\r%s \x1b[33;1mPostgres Event Listener%s:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m%s\n", + time.Now().Format(candihelper.TimeFormatLogger), getWorkerTypeLog(p.opt.workerType), strings.Repeat(" ", 20)) }() p.shutdown <- struct{}{} @@ -167,9 +170,12 @@ func (p *postgresWorker) Shutdown(ctx context.Context) { for _, sem := range p.semaphores { runningJob += len(sem) } + waitingJob := "... " if runningJob != 0 { - fmt.Printf("\x1b[34;1mPostgres Event Listener%s:\x1b[0m waiting %d job until done...\n", getWorkerTypeLog(p.opt.workerType), runningJob) + waitingJob = fmt.Sprintf("waiting %d job until done... ", runningJob) } + fmt.Printf("\r%s \x1b[33;1mPostgres Event Listener%s:\x1b[0m %s", + time.Now().Format(candihelper.TimeFormatLogger), getWorkerTypeLog(p.opt.workerType), waitingJob) for _, source := range p.opt.sources { source.listener.Close() diff --git a/codebase/app/rabbitmq_worker/rabbitmq_worker.go b/codebase/app/rabbitmq_worker/rabbitmq_worker.go index 3233c08..f745d1a 100644 --- a/codebase/app/rabbitmq_worker/rabbitmq_worker.go +++ b/codebase/app/rabbitmq_worker/rabbitmq_worker.go @@ -6,7 +6,9 @@ import ( "fmt" "log" "reflect" + "strings" "sync" + "time" "github.com/golangid/candi/broker" "github.com/golangid/candi/candihelper" @@ -114,17 +116,23 @@ func (r *rabbitmqWorker) Serve() { } func (r *rabbitmqWorker) Shutdown(ctx context.Context) { - defer log.Printf("\x1b[33;1mStopping RabbitMQ Worker%s:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m\n", getWorkerTypeLog(r.bk.WorkerType)) + defer func() { + fmt.Printf("\r%s \x1b[33;1mStopping RabbitMQ Worker%s:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m%s\n", + time.Now().Format(candihelper.TimeFormatLogger), getWorkerTypeLog(r.bk.WorkerType), strings.Repeat(" ", 20)) + }() r.shutdown <- struct{}{} r.isShutdown = true - var runningJob int + runningJob := 0 for _, sem := range r.semaphore { runningJob += len(sem) } + waitingJob := "... " if runningJob != 0 { - fmt.Printf("\x1b[34;1mRabbitMQ Worker%s:\x1b[0m waiting %d job until done...\x1b[0m\n", getWorkerTypeLog(r.bk.WorkerType), runningJob) + waitingJob = fmt.Sprintf("waiting %d job until done... ", runningJob) } + fmt.Printf("\r%s \x1b[33;1mStopping RabbitMQ Worker%s:\x1b[0m %s", + time.Now().Format(candihelper.TimeFormatLogger), getWorkerTypeLog(r.bk.WorkerType), waitingJob) r.wg.Wait() r.bk.Channel.Close() diff --git a/codebase/app/redis_worker/redis_worker.go b/codebase/app/redis_worker/redis_worker.go index 83050ba..f6c60a5 100644 --- a/codebase/app/redis_worker/redis_worker.go +++ b/codebase/app/redis_worker/redis_worker.go @@ -7,9 +7,12 @@ import ( "context" "fmt" "log" + "strings" "sync" + "time" "github.com/golangid/candi/broker" + "github.com/golangid/candi/candihelper" "github.com/golangid/candi/candishared" "github.com/golangid/candi/candiutils" "github.com/golangid/candi/codebase/factory" @@ -20,10 +23,6 @@ import ( "github.com/gomodule/redigo/redis" ) -var ( - refreshWorkerNotif, shutdown chan struct{} -) - type ( redisWorker struct { ctx context.Context @@ -84,8 +83,6 @@ func NewWorker(service factory.ServiceFactory, bk interfaces.Broker, opts ...Opt fmt.Printf("\x1b[34;1m⇨ Redis pubsub worker%s running with %d keys\x1b[0m\n\n", getWorkerTypeLog(workerInstance.bk.WorkerType), len(handlers)) } - shutdown = make(chan struct{}, 1) - workerInstance.handlers = handlers workerInstance.isHaveJob = len(handlers) != 0 workerInstance.ctx, workerInstance.ctxCancelFunc = context.WithCancel(context.Background()) @@ -129,20 +126,25 @@ func (r *redisWorker) Serve() { } func (r *redisWorker) Shutdown(ctx context.Context) { - defer log.Printf("\x1b[33;1mStopping Redis Subscriber%s:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m\n", getWorkerTypeLog(r.bk.WorkerType)) + defer func() { + fmt.Printf("\r%s \x1b[33;1mStopping Redis Subscriber%s:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m%s\n", + time.Now().Format(candihelper.TimeFormatLogger), getWorkerTypeLog(r.bk.WorkerType), strings.Repeat(" ", 20)) + }() if !r.isHaveJob { return } - shutdown <- struct{}{} runningJob := 0 for _, sem := range r.semaphore { runningJob += len(sem) } + waitingJob := "... " if runningJob != 0 { - fmt.Printf("\x1b[34;1mRedis Subscriber:\x1b[0m waiting %d job until done...\n", runningJob) + waitingJob = fmt.Sprintf("waiting %d job until done... ", runningJob) } + fmt.Printf("\r%s \x1b[33;1mStopping Redis Subscriber%s:\x1b[0m %s", + time.Now().Format(candihelper.TimeFormatLogger), getWorkerTypeLog(r.bk.WorkerType), waitingJob) r.wg.Wait() r.ctxCancelFunc() diff --git a/codebase/app/task_queue_worker/task_queue_worker.go b/codebase/app/task_queue_worker/task_queue_worker.go index e0858e1..5b3abd8 100644 --- a/codebase/app/task_queue_worker/task_queue_worker.go +++ b/codebase/app/task_queue_worker/task_queue_worker.go @@ -3,12 +3,12 @@ package taskqueueworker import ( "context" "fmt" - "log" "reflect" "strings" "sync" "time" + "github.com/golangid/candi/candihelper" "github.com/golangid/candi/candishared" "github.com/golangid/candi/codebase/factory" "github.com/golangid/candi/codebase/factory/types" @@ -183,7 +183,10 @@ func (t *taskQueueWorker) Serve() { } func (t *taskQueueWorker) Shutdown(ctx context.Context) { - defer log.Println("\x1b[33;1mStopping Task Queue Worker:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m") + defer func() { + fmt.Printf("\r%s \x1b[33;1mStopping Task Queue Worker:\x1b[0m \x1b[32;1mSUCCESS\x1b[0m%s\n", + time.Now().Format(candihelper.TimeFormatLogger), strings.Repeat(" ", 20)) + }() if len(t.registeredTaskWorkerIndex) == 0 { return @@ -194,13 +197,15 @@ func (t *taskQueueWorker) Shutdown(ctx context.Context) { t.isShutdown = true t.opt.locker.Reset(t.getLockKey("*")) - var runningJob int + runningJob := 0 for _, sem := range t.semaphore { runningJob += len(sem) } + waitingJob := "... " if runningJob != 0 { - fmt.Printf("\x1b[34;1mTask Queue Worker:\x1b[0m waiting %d job until done...\x1b[0m\n", runningJob) + waitingJob = fmt.Sprintf("waiting %d job until done... ", runningJob) } + fmt.Printf("\r%s \x1b[33;1mStopping Task Queue Worker:\x1b[0m %s", time.Now().Format(candihelper.TimeFormatLogger), waitingJob) t.wg.Wait() for _, task := range t.tasks { diff --git a/config/database/mongo.go b/config/database/mongo.go index 24abc9f..d3b8f58 100644 --- a/config/database/mongo.go +++ b/config/database/mongo.go @@ -45,7 +45,7 @@ func (m *mongoInstance) Health() map[string]error { } } func (m *mongoInstance) Disconnect(ctx context.Context) (err error) { - defer logger.LogWithDefer("mongodb: disconnect...")() + defer logger.LogWithDefer("\x1b[33;5mmongodb\x1b[0m: disconnect...")() if err := m.write.Client().Disconnect(ctx); err != nil { return err diff --git a/config/database/redis.go b/config/database/redis.go index 918eeb0..b71bb7c 100644 --- a/config/database/redis.go +++ b/config/database/redis.go @@ -57,7 +57,7 @@ func (m *redisInstance) Cache() interfaces.Cache { return m.cache } func (m *redisInstance) Disconnect(ctx context.Context) (err error) { - defer logger.LogWithDefer("redis: disconnect...")() + defer logger.LogWithDefer("\x1b[33;5mredis\x1b[0m: disconnect...")() if err := m.read.Close(); err != nil { return err diff --git a/config/database/sql.go b/config/database/sql.go index 6b8e805..876ab68 100644 --- a/config/database/sql.go +++ b/config/database/sql.go @@ -29,7 +29,7 @@ func (s *sqlInstance) Health() map[string]error { return mErr } func (s *sqlInstance) Disconnect(ctx context.Context) (err error) { - defer logger.LogWithDefer("sql: disconnect...")() + defer logger.LogWithDefer("\x1b[33;5msql\x1b[0m: disconnect...")() if err := s.read.Close(); err != nil { return err diff --git a/init.go b/init.go index 94c7a40..88c90d8 100644 --- a/init.go +++ b/init.go @@ -2,5 +2,5 @@ package candi const ( // Version of this library - Version = "v1.17.11" + Version = "v1.17.12" )