Skip to content

Commit

Permalink
Enhance panic recovery and trace finishing: replace FinishWithRecoverPanic with explicit deferred recover blocks in workers and publishers, and make the noop tracer's Finish honor recover/finish options
Browse files Browse the repository at this point in the history
  • Loading branch information
agungdwiprasetyo committed Jun 19, 2024
1 parent aeccc2e commit 0b5f83c
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 72 deletions.
9 changes: 6 additions & 3 deletions broker/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,12 @@ func NewKafkaPublisher(client sarama.Client, async bool) interfaces.Publisher {
// PublishMessage method
func (p *kafkaPublisher) PublishMessage(ctx context.Context, args *candishared.PublisherArgument) (err error) {
trace, _ := tracer.StartTraceWithContext(ctx, "kafka:publish_message")
defer trace.Finish(
tracer.FinishWithRecoverPanic(func(panicMessage any) { err = fmt.Errorf("%v", panicMessage) }),
)
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("%v", r)
}
trace.Finish(tracer.FinishWithError(err))
}()

var payload []byte
if len(args.Message) > 0 {
Expand Down
9 changes: 6 additions & 3 deletions broker/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,12 @@ func NewRabbitMQPublisher(conn *amqp.Connection, exchange string) *rabbitMQPubli
// PublishMessage method
func (r *rabbitMQPublisher) PublishMessage(ctx context.Context, args *candishared.PublisherArgument) (err error) {
trace, _ := tracer.StartTraceWithContext(ctx, "rabbitmq:publish_message")
defer trace.Finish(
tracer.FinishWithRecoverPanic(func(panicMessage any) { err = fmt.Errorf("%v", panicMessage) }),
)
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("%v", r)
}
trace.Finish(tracer.FinishWithError(err))
}()

ch, err := r.conn.Channel()
if err != nil {
Expand Down
12 changes: 9 additions & 3 deletions codebase/app/cron_worker/cron_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,15 @@ func (c *cronWorker) processJob(job *Job) {
}
defer c.opt.locker.Unlock(c.getLockKey(job.HandlerName))

var err error
trace, ctx := tracer.StartTraceFromHeader(ctx, "CronScheduler", make(map[string]string, 0))
defer trace.Finish(tracer.FinishWithRecoverPanic(func(any) {}))
defer func() {
if r := recover(); r != nil {
trace.SetTag("panic", true)
err = fmt.Errorf("%v", r)
}
trace.Finish(tracer.FinishWithError(err))
}()

trace.SetTag("job_name", job.HandlerName)
trace.Log("job_param", job.Params)
Expand All @@ -195,9 +202,8 @@ func (c *cronWorker) processJob(job *Job) {
eventContext.Write([]byte(job.Params))

for _, handlerFunc := range job.Handler.HandlerFuncs {
if err := handlerFunc(eventContext); err != nil {
if err = handlerFunc(eventContext); err != nil {
eventContext.SetError(err)
trace.SetError(err)
}
}
}
Expand Down
30 changes: 14 additions & 16 deletions codebase/app/grpc_server/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,13 @@ func (i *interceptor) unaryTracerInterceptor(ctx context.Context, req interface{
}

trace, ctx := tracer.StartTraceFromHeader(ctx, "GRPC-Server", header)
defer trace.Finish(
tracer.FinishWithRecoverPanic(func(message any) {
err = status.Errorf(codes.Aborted, "%v", message)
}),
tracer.FinishWithFunc(func() {
i.logInterceptor(start, err, info.FullMethod, "GRPC")
}),
)
defer func() {
if rec := recover(); rec != nil {
err = status.Errorf(codes.Aborted, "%v", rec)
}
i.logInterceptor(start, err, info.FullMethod, "GRPC")
trace.Finish(tracer.FinishWithError(err))
}()

trace.SetTag("method", info.FullMethod)
trace.Log("metadata", meta)
Expand Down Expand Up @@ -151,14 +150,13 @@ func (i *interceptor) streamTracerInterceptor(srv interface{}, stream grpc.Serve
}

trace, ctx := tracer.StartTraceFromHeader(ctx, "GRPC-STREAM", header)
defer trace.Finish(
tracer.FinishWithRecoverPanic(func(message any) {
err = status.Errorf(codes.Aborted, "%v", message)
}),
tracer.FinishWithFunc(func() {
i.logInterceptor(start, err, info.FullMethod, "GRPC-STREAM")
}),
)
defer func() {
if rec := recover(); rec != nil {
err = status.Errorf(codes.Aborted, "%v", rec)
}
i.logInterceptor(start, err, info.FullMethod, "GRPC-STREAM")
trace.Finish(tracer.FinishWithError(err))
}()

trace.SetTag("method", info.FullMethod)
trace.Log("metadata", meta)
Expand Down
24 changes: 14 additions & 10 deletions codebase/app/kafka_worker/default_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafkaworker

import (
"fmt"
"log"
"strconv"
"strings"
Expand Down Expand Up @@ -70,15 +71,18 @@ func (c *consumerHandler) processMessage(session sarama.ConsumerGroupSession, me
header[string(val.Key)] = string(val.Value)
}

var err error
trace, ctx := tracer.StartTraceFromHeader(ctx, "KafkaConsumer", header)
defer trace.Finish(
tracer.FinishWithRecoverPanic(func(any) {}),
tracer.FinishWithFunc(func() {
if handler.AutoACK {
session.MarkMessage(message, "")
}
}),
)
defer func() {
if r := recover(); r != nil {
trace.SetTag("panic", true)
err = fmt.Errorf("%v", r)
}
if handler.AutoACK {
session.MarkMessage(message, "")
}
trace.Finish(tracer.FinishWithError(err))
}()

trace.SetTag("brokers", strings.Join(c.bk.BrokerHost, ","))
trace.SetTag("topic", message.Topic)
Expand All @@ -104,9 +108,9 @@ func (c *consumerHandler) processMessage(session sarama.ConsumerGroupSession, me
eventContext.Write(message.Value)

for _, handlerFunc := range handler.HandlerFuncs {
if err := handlerFunc(eventContext); err != nil {
err = handlerFunc(eventContext)
if err != nil {
eventContext.SetError(err)
trace.SetError(err)
}
}
}
Expand Down
12 changes: 9 additions & 3 deletions codebase/app/postgres_worker/postgres_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,15 @@ func (p *postgresWorker) execEvent(workerIndex int, data *EventPayload) {
ctx = tracer.SkipTraceContext(ctx)
}

var err error
trace, ctx := tracer.StartTraceFromHeader(ctx, "PostgresEventListener", make(map[string]string, 0))
defer trace.Finish(tracer.FinishWithRecoverPanic(func(any) {}))
defer func() {
if r := recover(); r != nil {
trace.SetTag("panic", true)
err = fmt.Errorf("%v", r)
}
trace.Finish(tracer.FinishWithError(err))
}()

if p.opt.debugMode {
var sourceLog string
Expand Down Expand Up @@ -258,9 +265,8 @@ func (p *postgresWorker) execEvent(workerIndex int, data *EventPayload) {
trace.Log("payload", data)

for _, handlerFunc := range handler.HandlerFuncs {
if err := handlerFunc(eventContext); err != nil {
if err = handlerFunc(eventContext); err != nil {
eventContext.SetError(err)
trace.SetError(err)
}
}
}
Expand Down
22 changes: 12 additions & 10 deletions codebase/app/rabbitmq_worker/rabbitmq_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,18 @@ func (r *rabbitmqWorker) processMessage(message amqp.Delivery) {
header[key] = string(candihelper.ToBytes(val))
}

var err error
trace, ctx := tracer.StartTraceFromHeader(ctx, "RabbitMQConsumer", header)
defer trace.Finish(
tracer.FinishWithRecoverPanic(func(any) {}),
tracer.FinishWithFunc(func() {
if selectedHandler.AutoACK {
message.Ack(false)
}
}),
)
defer func() {
if r := recover(); r != nil {
trace.SetTag("panic", true)
err = fmt.Errorf("%v", r)
}
if selectedHandler.AutoACK {
message.Ack(false)
}
trace.Finish(tracer.FinishWithError(err))
}()

trace.SetTag("broker", candihelper.MaskingPasswordURL(r.bk.BrokerHost))
trace.SetTag("exchange", message.Exchange)
Expand All @@ -184,9 +187,8 @@ func (r *rabbitmqWorker) processMessage(message amqp.Delivery) {
eventContext.Write(message.Body)

for _, handlerFunc := range selectedHandler.HandlerFuncs {
if err := handlerFunc(eventContext); err != nil {
if err = handlerFunc(eventContext); err != nil {
eventContext.SetError(err)
trace.SetError(err)
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion codebase/app/redis_worker/redis_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,13 @@ func (r *redisWorker) processMessage(param broker.RedisMessage) {
}

trace, ctx := tracer.StartTraceFromHeader(ctx, "RedisSubscriber", make(map[string]string, 0))
defer trace.Finish(tracer.FinishWithRecoverPanic(func(any) {}))
defer func() {
if r := recover(); r != nil {
trace.SetTag("panic", true)
err = fmt.Errorf("%v", r)
}
trace.Finish(tracer.FinishWithError(err))
}()

if r.opt.debugMode {
log.Printf("\x1b[35;3mRedis Key Expired Subscriber%s: executing event topic '%s'\x1b[0m", getWorkerTypeLog(r.bk.WorkerType), param.HandlerName)
Expand Down
9 changes: 5 additions & 4 deletions codebase/app/rest_server/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,12 @@ func HTTPMiddlewareTracer() func(http.Handler) http.Handler {
}

trace, ctx := tracer.StartTraceFromHeader(req.Context(), "REST-Server", header)
defer trace.Finish(
tracer.FinishWithRecoverPanic(func(any) {
defer func() {
if rec := recover(); rec != nil {
wrapper.NewHTTPResponse(http.StatusInternalServerError, "Something error").JSON(rw)
}),
)
}
trace.Finish()
}()

httpDump, _ := httputil.DumpRequest(req, false)
trace.Log("http.request", httpDump)
Expand Down
29 changes: 14 additions & 15 deletions codebase/app/task_queue_worker/trigger_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,23 @@ func (t *taskQueueWorker) execJob(ctx context.Context, runningTask *Task) {
result, traceID string
})
go func(ctx context.Context, job Job) {
defer close(eventResultChan)
result := struct {
err error
result, traceID string
}{}

trace, ctx := tracer.StartTraceFromHeader(ctx, "TaskQueueWorker", make(map[string]string, 0))
defer trace.Finish(
tracer.FinishWithRecoverPanic(func(panicMessage any) {
eventResultChan <- struct {
err error
result, traceID string
}{err: fmt.Errorf("panic: %v", panicMessage), traceID: tracer.GetTraceID(ctx)}
}),
)
defer func() {
if r := recover(); r != nil {
trace.SetTag("panic", true)
result.err = fmt.Errorf("%v", r)
}
eventResultChan <- result
close(eventResultChan)
trace.Finish(tracer.FinishWithError(result.err))
}()

result.traceID = tracer.GetTraceID(ctx)
trace.SetTag("job_id", job.ID)
trace.SetTag("task_name", job.TaskName)
trace.SetTag("retries", job.Retries)
Expand All @@ -157,26 +162,20 @@ func (t *taskQueueWorker) execJob(ctx context.Context, runningTask *Task) {
eventContext.SetKey(job.ID)
eventContext.WriteString(job.Arguments)

result := struct {
err error
result, traceID string
}{traceID: tracer.GetTraceID(ctx)}
for i, h := range selectedHandler.HandlerFuncs {
if err := h(eventContext); err != nil {
if _, isRetry := err.(*candishared.ErrorRetrier); isRetry {
trace.SetTag("is_retry", true)
}
eventContext.SetError(err)
if i == 0 { // set for main handler
trace.SetError(err)
result.err = err
}
}
}
if respBuff := eventContext.GetResponse(); respBuff != nil {
result.result = respBuff.String()
}
eventResultChan <- result
}(ctx, job)

select {
Expand Down
22 changes: 18 additions & 4 deletions tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,24 @@ func (noopTracer) SetTag(key string, value interface{}) { return }
func (noopTracer) InjectRequestHeader(header map[string]string) { return }
func (noopTracer) SetError(err error) { return }
func (noopTracer) Log(key string, value interface{}) { return }
func (noopTracer) Finish(opts ...FinishOptionFunc) { return }
func (noopTracer) GetTraceID(ctx context.Context) (u string) { return }
func (noopTracer) GetTraceURL(ctx context.Context) (u string) { return }
func (noopTracer) Disconnect(ctx context.Context) error { return nil }
// Finish applies the supplied finish options for the noop tracer. Although no
// span data is recorded, the options carry caller side effects that must still
// run: RecoverFunc is invoked with a recovered panic value (if any), and
// OnFinish is always invoked when set.
//
// NOTE(review): recover() only intercepts a panic when Finish itself is the
// directly deferred call (defer trace.Finish(...)); if Finish is called from
// inside another deferred closure, recover() here returns nil — confirm that
// callers relying on RecoverFunc defer Finish directly.
func (noopTracer) Finish(opts ...FinishOptionFunc) {
	var finishOpt FinishOption
	for _, opt := range opts {
		opt(&finishOpt)
	}
	if finishOpt.RecoverFunc != nil {
		if rec := recover(); rec != nil {
			finishOpt.RecoverFunc(rec)
		}
	}
	if finishOpt.OnFinish != nil {
		finishOpt.OnFinish()
	}
}
// GetTraceID returns an empty trace identifier for the noop tracer.
func (noopTracer) GetTraceID(ctx context.Context) (u string) { return }

// GetTraceURL returns an empty trace dashboard URL for the noop tracer.
func (noopTracer) GetTraceURL(ctx context.Context) (u string) { return }

// Disconnect is a no-op and always reports success.
func (noopTracer) Disconnect(ctx context.Context) error { return nil }
func (n noopTracer) StartSpan(ctx context.Context, opName string) Tracer {
n.ctx = ctx
return &n
Expand Down

0 comments on commit 0b5f83c

Please sign in to comment.