diff --git a/pkg/gofr/datasource/clickhouse/clickhouse.go b/pkg/gofr/datasource/clickhouse/clickhouse.go index 9ab49d2e5..4f24ee5ac 100644 --- a/pkg/gofr/datasource/clickhouse/clickhouse.go +++ b/pkg/gofr/datasource/clickhouse/clickhouse.go @@ -3,9 +3,12 @@ package clickhouse import ( "context" "errors" + "fmt" "strings" "time" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/ClickHouse/clickhouse-go/v2" @@ -120,9 +123,13 @@ func pushDBMetrics(conn Conn, metrics Metrics) { // Exec should be used for DDL and simple statements. // It should not be used for larger inserts or query iterations. func (c *client) Exec(ctx context.Context, query string, args ...any) error { - defer c.sendOperationStats(time.Now(), "Exec", query, args...) + tracedCtx, span := c.addTrace(ctx, "exec", query) + + err := c.conn.Exec(tracedCtx, query, args...) - return c.conn.Exec(ctx, query, args...) + defer c.sendOperationStats(time.Now(), "Exec", query, "exec", span, args...) + + return err } // Select method allows a set of response rows to be marshaled into a slice of structs with a single invocation.. @@ -139,20 +146,29 @@ func (c *client) Exec(ctx context.Context, query string, args ...any) error { // // err = ctx.Clickhouse.Select(ctx, &user, "SELECT * FROM users") . func (c *client) Select(ctx context.Context, dest any, query string, args ...any) error { - defer c.sendOperationStats(time.Now(), "Select", query, args...) + tracedCtx, span := c.addTrace(ctx, "select", query) + + err := c.conn.Select(tracedCtx, dest, query, args...) - return c.conn.Select(ctx, dest, query, args...) + defer c.sendOperationStats(time.Now(), "Select", query, "select", span, args...) + + return err } // AsyncInsert allows the user to specify whether the client should wait for the server to complete the insert or // respond once the data has been received. func (c *client) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error { - defer c.sendOperationStats(time.Now(), "AsyncInsert", query, args...) + tracedCtx, span := c.addTrace(ctx, "async-insert", query) + + err := c.conn.AsyncInsert(tracedCtx, query, wait, args...) - return c.conn.AsyncInsert(ctx, query, wait, args...) + defer c.sendOperationStats(time.Now(), "AsyncInsert", query, "async-insert", span, args...) + + return err } -func (c *client) sendOperationStats(start time.Time, methodType, query string, args ...interface{}) { +func (c *client) sendOperationStats(start time.Time, methodType, query string, method string, + span trace.Span, args ...interface{}) { duration := time.Since(start).Milliseconds() c.logger.Debug(&Log{ @@ -162,6 +178,11 @@ func (c *client) sendOperationStats(start time.Time, methodType, query string, a Args: args, }) + if span != nil { + defer span.End() + span.SetAttributes(attribute.Int64(fmt.Sprintf("clickhouse.%v.duration", method), duration)) + } + c.metrics.RecordHistogram(context.Background(), "app_clickhouse_stats", float64(duration), "hosts", c.config.Hosts, "database", c.config.Database, "type", getOperationType(query)) } @@ -198,3 +219,17 @@ func (c *client) HealthCheck(ctx context.Context) (any, error) { return &h, nil } + +func (c *client) addTrace(ctx context.Context, method, query string) (context.Context, trace.Span) { + if c.tracer != nil { + contextWithTrace, span := c.tracer.Start(ctx, fmt.Sprintf("clickhouse-%v", method)) + + span.SetAttributes( + attribute.String("clickhouse.query", query), + ) + + return contextWithTrace, span + } + + return ctx, nil +} diff --git a/pkg/gofr/datasource/clickhouse/clickhouse_test.go b/pkg/gofr/datasource/clickhouse/clickhouse_test.go index 4bd4861d7..51f6cf38f 100644 --- a/pkg/gofr/datasource/clickhouse/clickhouse_test.go +++ b/pkg/gofr/datasource/clickhouse/clickhouse_test.go @@ -14,7 +14,7 @@ import ( "go.uber.org/mock/gomock" ) -func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, client) { +func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, *MockLogger, client) { t.Helper() ctrl := gomock.NewController(t) @@ -30,36 +30,37 @@ func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, client) Database: "test", }, logger: mockLogger, metrics: mockMetric} - return mockConn, mockMetric, c + return mockConn, mockMetric, mockLogger, c } func Test_ClickHouse_ConnectAndMetricRegistrationAndPingFailure(t *testing.T) { - logs := stderrOutputForFunc(func() { - _, mockMetric, _ := getClickHouseTestConnection(t) - mockLogger := NewMockLogger(gomock.NewController(t)) + _, mockMetric, _, _ := getClickHouseTestConnection(t) + mockLogger := NewMockLogger(gomock.NewController(t)) - cl := New(Config{ - Hosts: "localhost:8000", - Username: "user", - Password: "pass", - Database: "test", - }) + cl := New(Config{ + Hosts: "localhost:8000", + Username: "user", + Password: "pass", + Database: "test", + }) - cl.UseLogger(mockLogger) - cl.UseMetrics(mockMetric) + cl.UseLogger(mockLogger) + cl.UseMetrics(mockMetric) - mockMetric.EXPECT().NewHistogram("app_clickhouse_stats", "Response time of Clickhouse queries in milliseconds.", gomock.Any()) - mockMetric.EXPECT().NewGauge("app_clickhouse_open_connections", "Number of open Clickhouse connections.") - mockMetric.EXPECT().NewGauge("app_clickhouse_idle_connections", "Number of idle Clickhouse connections.") - mockMetric.EXPECT().SetGauge("app_clickhouse_open_connections", gomock.Any()).AnyTimes() - mockMetric.EXPECT().SetGauge("app_clickhouse_idle_connections", gomock.Any()).AnyTimes() + mockMetric.EXPECT().NewHistogram("app_clickhouse_stats", "Response time of Clickhouse queries in milliseconds.", gomock.Any()) + mockMetric.EXPECT().NewGauge("app_clickhouse_open_connections", "Number of open Clickhouse connections.") + mockMetric.EXPECT().NewGauge("app_clickhouse_idle_connections", "Number of idle Clickhouse connections.") + mockMetric.EXPECT().SetGauge("app_clickhouse_open_connections", gomock.Any()).AnyTimes() + mockMetric.EXPECT().SetGauge("app_clickhouse_idle_connections", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Logf("connecting to clickhouse db at %v to database %v", "localhost:8000", "test") + mockLogger.EXPECT().Errorf("ping failed with error %v", gomock.Any()) - cl.Connect() + cl.Connect() - time.Sleep(100 * time.Millisecond) - }) + time.Sleep(100 * time.Millisecond) - assert.Contains(t, logs, "ping failed with error dial tcp [::1]:8000: connect: connection refused") + assert.True(t, mockLogger.ctrl.Satisfied()) + assert.True(t, mockMetric.ctrl.Satisfied()) } func stderrOutputForFunc(f func()) string { @@ -78,7 +79,7 @@ func stderrOutputForFunc(f func()) string { } func Test_ClickHouse_HealthUP(t *testing.T) { - mockConn, _, c := getClickHouseTestConnection(t) + mockConn, _, _, c := getClickHouseTestConnection(t) mockConn.EXPECT().Ping(gomock.Any()).Return(nil) @@ -88,7 +89,7 @@ func Test_ClickHouse_HealthUP(t *testing.T) { } func Test_ClickHouse_HealthDOWN(t *testing.T) { - mockConn, _, c := getClickHouseTestConnection(t) + mockConn, _, _, c := getClickHouseTestConnection(t) mockConn.EXPECT().Ping(gomock.Any()).Return(sql.ErrConnDone) @@ -100,13 +101,15 @@ func Test_ClickHouse_HealthDOWN(t *testing.T) { } func Test_ClickHouse_Exec(t *testing.T) { - mockConn, mockMetric, c := getClickHouseTestConnection(t) + mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t) ctx := context.Background() mockConn.EXPECT().Exec(ctx, "INSERT INTO users (id, name, age) VALUES (?, ?, ?)", "8f165e2d-feef-416c-95f6-913ce3172e15", "gofr", "10").Return(nil) + mockLogger.EXPECT().Debug(gomock.Any()) + mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts, "database", c.config.Database, "type", "INSERT") @@ -116,7 +119,7 @@ func Test_ClickHouse_Exec(t *testing.T) { } func Test_ClickHouse_Select(t *testing.T) { - mockConn, mockMetric, c := getClickHouseTestConnection(t) + mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t) type User struct { ID string `ch:"id"` @@ -130,6 +133,8 @@ func Test_ClickHouse_Select(t *testing.T) { mockConn.EXPECT().Select(ctx, &user, "SELECT * FROM users").Return(nil) + mockLogger.EXPECT().Debug(gomock.Any()) + mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts, "database", c.config.Database, "type", "SELECT") @@ -139,7 +144,7 @@ func Test_ClickHouse_Select(t *testing.T) { } func Test_ClickHouse_AsyncInsert(t *testing.T) { - mockConn, mockMetric, c := getClickHouseTestConnection(t) + mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t) ctx := context.Background() @@ -149,6 +154,8 @@ func Test_ClickHouse_AsyncInsert(t *testing.T) { mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts, "database", c.config.Database, "type", "INSERT") + mockLogger.EXPECT().Debug(gomock.Any()) + err := c.AsyncInsert(ctx, "INSERT INTO users (id, name, age) VALUES (?, ?, ?)", true, "8f165e2d-feef-416c-95f6-913ce3172e15", "user", "10") diff --git a/pkg/gofr/external_db.go b/pkg/gofr/external_db.go index 9b5103bdf..5ef8fb4e0 100644 --- a/pkg/gofr/external_db.go +++ b/pkg/gofr/external_db.go @@ -1,6 +1,7 @@ package gofr import ( + "go.opentelemetry.io/otel" "gofr.dev/pkg/gofr/container" "gofr.dev/pkg/gofr/datasource/file" ) @@ -52,6 +53,10 @@ func (a *App) AddClickhouse(db container.ClickhouseProvider) { db.UseLogger(a.Logger()) db.UseMetrics(a.Metrics()) + tracer := otel.GetTracerProvider().Tracer("gofr-clickhouse") + + db.UseTracer(tracer) + db.Connect() a.container.Clickhouse = db diff --git a/pkg/gofr/external_db_test.go b/pkg/gofr/external_db_test.go index ae5a4952c..b2f2adb06 100644 --- a/pkg/gofr/external_db_test.go +++ b/pkg/gofr/external_db_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel" "go.uber.org/mock/gomock" "gofr.dev/pkg/gofr/container" @@ -78,6 +79,7 @@ func TestApp_AddClickhouse(t *testing.T) { mock.EXPECT().UseLogger(app.Logger()) mock.EXPECT().UseMetrics(app.Metrics()) + mock.EXPECT().UseTracer(otel.GetTracerProvider().Tracer("gofr-clickhouse")) mock.EXPECT().Connect() app.AddClickhouse(mock)