Skip to content

Commit

Permalink
Merge branch 'development' into en/add_support_for_encoded_forms
Browse files Browse the repository at this point in the history
  • Loading branch information
Umang01-hash authored Oct 4, 2024
2 parents d35b407 + 47069d0 commit c8c1c50
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 56 deletions.
4 changes: 2 additions & 2 deletions pkg/gofr/datasource/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func (c *client) UseMetrics(metrics interface{}) {

// UseTracer sets the tracer for Clickhouse client.
func (c *client) UseTracer(tracer any) {
if tracer, ok := tracer.(trace.Tracer); ok {
c.tracer = tracer
if t, ok := tracer.(trace.Tracer); ok {
c.tracer = t
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/gofr/datasource/clickhouse/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ go 1.22
require (
github.com/ClickHouse/clickhouse-go/v2 v2.25.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel v1.26.0
go.opentelemetry.io/otel/trace v1.26.0
go.uber.org/mock v0.4.0
)

Expand All @@ -22,8 +24,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
go.opentelemetry.io/otel v1.26.0 // indirect
go.opentelemetry.io/otel/trace v1.26.0 // indirect
golang.org/x/sys v0.18.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
117 changes: 90 additions & 27 deletions pkg/gofr/datasource/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"go.mongodb.org/mongo-driver/bson"
Expand Down Expand Up @@ -49,7 +50,7 @@ i.e. by default observability features gets initialised when used with GoFr.
// client := New(config)
// client.UseLogger(loggerInstance)
// client.UseMetrics(metricsInstance)
// client.Connect()
// client.Connect().
func New(c Config) *Client {
return &Client{config: c}
}
Expand Down Expand Up @@ -101,28 +102,36 @@ func (c *Client) Connect() {

// InsertOne inserts a single document into the specified collection.
func (c *Client) InsertOne(ctx context.Context, collection string, document interface{}) (interface{}, error) {
defer c.sendOperationStats(&QueryLog{Query: "insertOne", Collection: collection, Filter: document}, time.Now())
tracerCtx, span := c.addTrace(ctx, "insertOne", collection)

return c.Database.Collection(collection).InsertOne(ctx, document)
result, err := c.Database.Collection(collection).InsertOne(tracerCtx, document)

defer c.sendOperationStats(&QueryLog{Query: "insertOne", Collection: collection, Filter: document}, time.Now(),
"insert", span)

return result, err
}

// InsertMany inserts multiple documents into the specified collection.
func (c *Client) InsertMany(ctx context.Context, collection string, documents []interface{}) ([]interface{}, error) {
defer c.sendOperationStats(&QueryLog{Query: "insertMany", Collection: collection, Filter: documents}, time.Now())
tracerCtx, span := c.addTrace(ctx, "insertMany", collection)

res, err := c.Database.Collection(collection).InsertMany(ctx, documents)
res, err := c.Database.Collection(collection).InsertMany(tracerCtx, documents)
if err != nil {
return nil, err
}

defer c.sendOperationStats(&QueryLog{Query: "insertMany", Collection: collection, Filter: documents}, time.Now(),
"insertMany", span)

return res.InsertedIDs, nil
}

// Find retrieves documents from the specified collection based on the provided filter and binds response to result.
func (c *Client) Find(ctx context.Context, collection string, filter, results interface{}) error {
defer c.sendOperationStats(&QueryLog{Query: "find", Collection: collection, Filter: filter}, time.Now())
tracerCtx, span := c.addTrace(ctx, "find", collection)

cur, err := c.Database.Collection(collection).Find(ctx, filter)
cur, err := c.Database.Collection(collection).Find(tracerCtx, filter)
if err != nil {
return err
}
Expand All @@ -133,94 +142,129 @@ func (c *Client) Find(ctx context.Context, collection string, filter, results in
return err
}

defer c.sendOperationStats(&QueryLog{Query: "find", Collection: collection, Filter: filter}, time.Now(), "find",
span)

return nil
}

// FindOne retrieves a single document from the specified collection based on the provided filter and binds response to result.
func (c *Client) FindOne(ctx context.Context, collection string, filter, result interface{}) error {
defer c.sendOperationStats(&QueryLog{Query: "findOne", Collection: collection, Filter: filter}, time.Now())
tracerCtx, span := c.addTrace(ctx, "findOne", collection)

b, err := c.Database.Collection(collection).FindOne(ctx, filter).Raw()
b, err := c.Database.Collection(collection).FindOne(tracerCtx, filter).Raw()
if err != nil {
return err
}

defer c.sendOperationStats(&QueryLog{Query: "findOne", Collection: collection, Filter: filter}, time.Now(),
"findOne", span)

return bson.Unmarshal(b, result)
}

// UpdateByID updates a document in the specified collection by its ID.
func (c *Client) UpdateByID(ctx context.Context, collection string, id, update interface{}) (int64, error) {
defer c.sendOperationStats(&QueryLog{Query: "updateByID", Collection: collection, ID: id, Update: update}, time.Now())
tracerCtx, span := c.addTrace(ctx, "updateByID", collection)

res, err := c.Database.Collection(collection).UpdateByID(tracerCtx, id, update)

res, err := c.Database.Collection(collection).UpdateByID(ctx, id, update)
defer c.sendOperationStats(&QueryLog{Query: "updateByID", Collection: collection, ID: id, Update: update}, time.Now(),
"updateByID", span)

return res.ModifiedCount, err
}

// UpdateOne updates a single document in the specified collection based on the provided filter.
func (c *Client) UpdateOne(ctx context.Context, collection string, filter, update interface{}) error {
defer c.sendOperationStats(&QueryLog{Query: "updateOne", Collection: collection, Filter: filter, Update: update}, time.Now())
tracerCtx, span := c.addTrace(ctx, "updateOne", collection)

_, err := c.Database.Collection(collection).UpdateOne(tracerCtx, filter, update)

_, err := c.Database.Collection(collection).UpdateOne(ctx, filter, update)
defer c.sendOperationStats(&QueryLog{Query: "updateOne", Collection: collection, Filter: filter, Update: update},
time.Now(), "updateOne", span)

return err
}

// UpdateMany updates multiple documents in the specified collection based on the provided filter.
func (c *Client) UpdateMany(ctx context.Context, collection string, filter, update interface{}) (int64, error) {
defer c.sendOperationStats(&QueryLog{Query: "updateMany", Collection: collection, Filter: filter, Update: update}, time.Now())
tracerCtx, span := c.addTrace(ctx, "updateMany", collection)

res, err := c.Database.Collection(collection).UpdateMany(ctx, filter, update)
res, err := c.Database.Collection(collection).UpdateMany(tracerCtx, filter, update)

defer c.sendOperationStats(&QueryLog{Query: "updateMany", Collection: collection, Filter: filter, Update: update}, time.Now(),
"updateMany", span)

return res.ModifiedCount, err
}

// CountDocuments counts the number of documents in the specified collection based on the provided filter.
func (c *Client) CountDocuments(ctx context.Context, collection string, filter interface{}) (int64, error) {
defer c.sendOperationStats(&QueryLog{Query: "countDocuments", Collection: collection, Filter: filter}, time.Now())
tracerCtx, span := c.addTrace(ctx, "countDocuments", collection)

result, err := c.Database.Collection(collection).CountDocuments(tracerCtx, filter)

defer c.sendOperationStats(&QueryLog{Query: "countDocuments", Collection: collection, Filter: filter}, time.Now(),
"countDocuments", span)

return c.Database.Collection(collection).CountDocuments(ctx, filter)
return result, err
}

// DeleteOne deletes a single document from the specified collection based on the provided filter.
func (c *Client) DeleteOne(ctx context.Context, collection string, filter interface{}) (int64, error) {
defer c.sendOperationStats(&QueryLog{Query: "deleteOne", Collection: collection, Filter: filter}, time.Now())
tracerCtx, span := c.addTrace(ctx, "deleteOne", collection)

res, err := c.Database.Collection(collection).DeleteOne(ctx, filter)
res, err := c.Database.Collection(collection).DeleteOne(tracerCtx, filter)
if err != nil {
return 0, err
}

defer c.sendOperationStats(&QueryLog{Query: "deleteOne", Collection: collection, Filter: filter}, time.Now(),
"deleteOne", span)

return res.DeletedCount, nil
}

// DeleteMany deletes multiple documents from the specified collection based on the provided filter.
func (c *Client) DeleteMany(ctx context.Context, collection string, filter interface{}) (int64, error) {
defer c.sendOperationStats(&QueryLog{Query: "deleteMany", Collection: collection, Filter: filter}, time.Now())
tracerCtx, span := c.addTrace(ctx, "deleteMany", collection)

res, err := c.Database.Collection(collection).DeleteMany(ctx, filter)
res, err := c.Database.Collection(collection).DeleteMany(tracerCtx, filter)
if err != nil {
return 0, err
}

defer c.sendOperationStats(&QueryLog{Query: "deleteMany", Collection: collection, Filter: filter}, time.Now(),
"deleteMany", span)

return res.DeletedCount, nil
}

// Drop drops the specified collection from the database.
func (c *Client) Drop(ctx context.Context, collection string) error {
defer c.sendOperationStats(&QueryLog{Query: "drop", Collection: collection}, time.Now())
tracerCtx, span := c.addTrace(ctx, "drop", collection)

err := c.Database.Collection(collection).Drop(tracerCtx)

return c.Database.Collection(collection).Drop(ctx)
defer c.sendOperationStats(&QueryLog{Query: "drop", Collection: collection}, time.Now(), "drop", span)

return err
}

// CreateCollection creates the specified collection in the database.
func (c *Client) CreateCollection(ctx context.Context, name string) error {
defer c.sendOperationStats(&QueryLog{Query: "createCollection", Collection: name}, time.Now())
tracerCtx, span := c.addTrace(ctx, "createCollection", name)

err := c.Database.CreateCollection(tracerCtx, name)

defer c.sendOperationStats(&QueryLog{Query: "createCollection", Collection: name}, time.Now(), "createCollection",
span)

return c.Database.CreateCollection(ctx, name)
return err
}

func (c *Client) sendOperationStats(ql *QueryLog, startTime time.Time) {
func (c *Client) sendOperationStats(ql *QueryLog, startTime time.Time, method string, span trace.Span) {
duration := time.Since(startTime).Milliseconds()

ql.Duration = duration
Expand All @@ -229,6 +273,11 @@ func (c *Client) sendOperationStats(ql *QueryLog, startTime time.Time) {

c.metrics.RecordHistogram(context.Background(), "app_mongo_stats", float64(duration), "hostname", c.uri,
"database", c.database, "type", ql.Query)

if span != nil {
defer span.End()
span.SetAttributes(attribute.Int64(fmt.Sprintf("mongo.%v.duration", method), duration))
}
}

type Health struct {
Expand Down Expand Up @@ -258,7 +307,7 @@ func (c *Client) HealthCheck(ctx context.Context) (any, error) {
}

func (c *Client) StartSession() (interface{}, error) {
defer c.sendOperationStats(&QueryLog{Query: "startSession"}, time.Now())
defer c.sendOperationStats(&QueryLog{Query: "startSession"}, time.Now(), "", nil)

s, err := c.Client().StartSession()
ses := &session{s}
Expand All @@ -280,3 +329,17 @@ type Transaction interface {
CommitTransaction(context.Context) error
EndSession(context.Context)
}

func (c *Client) addTrace(ctx context.Context, method, collection string) (context.Context, trace.Span) {
if c.tracer != nil {
contextWithTrace, span := c.tracer.Start(ctx, fmt.Sprintf("mongodb-%v", method))

span.SetAttributes(
attribute.String("mongo.collection", collection),
)

return contextWithTrace, span
}

return ctx, nil
}
Loading

0 comments on commit c8c1c50

Please sign in to comment.