Skip to content

Commit

Permalink
Store resource
Browse files Browse the repository at this point in the history
  • Loading branch information
hugoboos committed May 29, 2024
1 parent bd26f38 commit 3244585
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 18 deletions.
2 changes: 1 addition & 1 deletion libs/logboek-go/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func TestProcessingOperationFromContext(t *testing.T) {
ctx := context.Background()

operator := logboek.NewProcessingOperator(nil)
operator := logboek.NewProcessingOperator(logboek.Resource{}, nil)
ctx, op := operator.StartProcessing(ctx, "test")

assert.Same(t, op, logboek.ProcessingOperationFromContext(ctx))
Expand Down
4 changes: 4 additions & 0 deletions libs/logboek-go/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (h *GRPCProcessingOperationHandler) OnEnd(op *ProcessingOperation) {
}

req := proto_v1.ExportOperationsRequest{
Resource: &proto_v1.Resource{
Name: op.resource.Name,
Version: op.resource.Version,
},
Operations: []*proto_v1.ProcessingOperation{&out},
}

Expand Down
17 changes: 13 additions & 4 deletions libs/logboek-go/logboek.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ type ProcessingOperation struct {
statusCode StatusCode
attributes []attribute.Attribute

handler ProcessingOperationHandler
resource Resource
handler ProcessingOperationHandler
}

func (op *ProcessingOperation) Start() {
Expand Down Expand Up @@ -97,7 +98,8 @@ func (op *ProcessingOperation) SetAttributes(attrs ...attribute.Attribute) {
}

type ProcessingOperator struct {
handler ProcessingOperationHandler
resource Resource
handler ProcessingOperationHandler
}

func (o *ProcessingOperator) StartProcessing(ctx context.Context, processingName string) (context.Context, *ProcessingOperation) {
Expand Down Expand Up @@ -126,6 +128,7 @@ func (o *ProcessingOperator) StartProcessing(ctx context.Context, processingName
operationID: operationID,
},
parentContext: parentContext,
resource: o.resource,
handler: o.handler,
}

Expand All @@ -136,8 +139,14 @@ func (o *ProcessingOperator) StartProcessing(ctx context.Context, processingName
return ctx, op
}

func NewProcessingOperator(handler ProcessingOperationHandler) *ProcessingOperator {
func NewProcessingOperator(resource Resource, handler ProcessingOperationHandler) *ProcessingOperator {
return &ProcessingOperator{
handler: handler,
resource: resource,
handler: handler,
}
}

type Resource struct {
Name string
Version string
}
2 changes: 1 addition & 1 deletion libs/logboek-go/logboek_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func TestStartProcessingParentContext(t *testing.T) {
ctx := context.Background()

operator := logboek.NewProcessingOperator(nil)
operator := logboek.NewProcessingOperator(logboek.Resource{}, nil)
ctx, a := operator.StartProcessing(ctx, "a")
_, b := operator.StartProcessing(ctx, "b")

Expand Down
22 changes: 16 additions & 6 deletions libs/logboek-python/src/logboek/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
from contextvars import ContextVar
from enum import Enum
from time import time_ns
from typing import Dict, Iterator, Optional
from typing import Dict, Iterator, NamedTuple, Optional


class Resource(NamedTuple):
name: str
version: str


class ProcessingContext:
Expand Down Expand Up @@ -35,11 +40,13 @@ class ProcessingOperation:
def __init__(
self,
name: str,
resource: Resource,
context: ProcessingContext,
parent_context: Optional[ProcessingContext] = None,
handler: ProcessingOperationHandler = ProcessingOperationHandler(),
):
self._name = name
self._resource = resource
self._context = context
self._parent_context = parent_context
self._handler = handler
Expand All @@ -64,7 +71,7 @@ def set_attribute(self, key: str, value: str) -> None:
self.set_attributes({key: value})


_NONE_PROCESSING = ProcessingOperation("", None)
_NONE_PROCESSING = ProcessingOperation("", None, None)
_CURRENT_PROCESSING = ContextVar("current-processing", default=_NONE_PROCESSING)


Expand All @@ -73,7 +80,8 @@ def get_current_proccessing() -> ProcessingOperation:


class ProcessingOperator:
def __init__(self, handler: ProcessingOperationHandler) -> None:
def __init__(self, resource: Resource, handler: ProcessingOperationHandler) -> None:
self._resource = resource
self._handler = handler

def start_proccessing(self, processing_name: str) -> ProcessingOperation:
Expand All @@ -87,7 +95,7 @@ def start_proccessing(self, processing_name: str) -> ProcessingOperation:

context = ProcessingContext(trace_id=trace_id, operation_id=self._generate_operation_id())

op = ProcessingOperation(processing_name, context, parent_context, self._handler)
op = ProcessingOperation(processing_name, self._resource, context, parent_context, self._handler)
op.start()

return op
Expand Down Expand Up @@ -117,12 +125,14 @@ def _generate_operation_id(self) -> int:
_PROCESSING_OPERATOR: Optional["ProcessingOperator"] = None


def init_processing_operator(handler: Optional[ProcessingOperationHandler] = ProcessingOperationHandler()):
def init_processing_operator(
resource: Resource, handler: Optional[ProcessingOperationHandler] = ProcessingOperationHandler()
):
global _PROCESSING_OPERATOR
if _PROCESSING_OPERATOR is not None:
raise RuntimeError("Processing operator already initialized")

_PROCESSING_OPERATOR = ProcessingOperator(handler)
_PROCESSING_OPERATOR = ProcessingOperator(resource, handler)


def get_processing_operator() -> ProcessingOperator:
Expand Down
5 changes: 4 additions & 1 deletion libs/logboek-python/src/logboek/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ def on_end(self, op: ProcessingOperation) -> None:
status_code=op._status_code.value,
attributes=attributes,
)
request = logboek_pb2.ExportOperationsRequest(operations=(pb_op,))

request = logboek_pb2.ExportOperationsRequest(
resource=logboek_pb2.Resource(name=op._resource.name, version=op._resource.version), operations=(pb_op,)
)

try:
self._client.Export(request)
Expand Down
14 changes: 12 additions & 2 deletions server/pkg/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,18 @@ type LogboekService struct {
}

func (s LogboekService) Export(ctx context.Context, in *proto_v1.ExportOperationsRequest) (resp *proto_v1.ExportOperationsResponse, err error) {
if in.Resource == nil {
return nil, status.Errorf(codes.InvalidArgument, "missing resource")
}

resource := storage.Resource{
Name: in.Resource.Name,
Version: in.Resource.Version,
}

for _, reqOp := range in.Operations {
log.Printf("name=%s, root=%t, status_code=%s", reqOp.GetName(), reqOp.GetParentOperationId() == nil, reqOp.GetStatusCode())
resp, err = s.handleOp(ctx, reqOp)
resp, err = s.handleOp(ctx, resource, reqOp)
if err != nil {
break
}
Expand All @@ -31,7 +40,7 @@ func (s LogboekService) Export(ctx context.Context, in *proto_v1.ExportOperation
return resp, err
}

func (s LogboekService) handleOp(ctx context.Context, reqOp *proto_v1.ProcessingOperation) (*proto_v1.ExportOperationsResponse, error) {
func (s LogboekService) handleOp(ctx context.Context, resource storage.Resource, reqOp *proto_v1.ProcessingOperation) (*proto_v1.ExportOperationsResponse, error) {
traceID, err := logboek.TraceIDFromBytes(reqOp.TraceId)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "trace_id: %v", err)
Expand Down Expand Up @@ -95,6 +104,7 @@ func (s LogboekService) handleOp(ctx context.Context, reqOp *proto_v1.Processing
ForeignTraceID: foreignTraceID,
ForeignOperationID: foreignOperationID,
Attributes: attributes,
Resource: resource,
}

err = s.store.Write(ctx, op)
Expand Down
8 changes: 6 additions & 2 deletions server/pkg/storage/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ func (s *CassandraStore) Write(ctx context.Context, op ProcessingOperation) erro
INSERT INTO processing_operations (
trace_id, operation_id, parent_operation_id,
name, start_time, end_time, status_code,
foreign_trace_id, foreign_operation_id, attributes
foreign_trace_id, foreign_operation_id, attributes,
resource_name, resource_version
) VALUES (
?, ?, ?,
?, ?, ?, ?,
?, ?, ?
?, ?, ?,
?, ?
)`

var parentOperationID *string
Expand All @@ -85,6 +87,8 @@ func (s *CassandraStore) Write(ctx context.Context, op ProcessingOperation) erro
foreignTraceID,
foreignOperationID,
op.Attributes,
op.Resource.Name,
op.Resource.Version,
).WithContext(ctx)

return query.Exec()
Expand Down
2 changes: 2 additions & 0 deletions server/pkg/storage/schema/cassandra.cql
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ CREATE TABLE IF NOT EXISTS logboek.processing_operations (
foreign_trace_id text,
foreign_operation_id text,
attributes list<frozen<attribute>>,
resource_name text,
resource_version text,

PRIMARY KEY (trace_id, operation_id)
);
2 changes: 2 additions & 0 deletions server/pkg/storage/schema/sqlite.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ CREATE TABLE IF NOT EXISTS processing_operations (
status_code VARCHAR(5) NOT NULL,
foreign_trace_id CHAR(32),
foreign_operation_id CHAR(16),
resource_name VARCHAR,
resource_version VARCHAR,

PRIMARY KEY (trace_id, operation_id)
) WITHOUT ROWID;
Expand Down
6 changes: 5 additions & 1 deletion server/pkg/storage/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ func (s *SqliteStore) Initialize() error {
INSERT INTO processing_operations (
trace_id, operation_id, parent_operation_id,
name, start_time, end_time, status_code,
foreign_trace_id, foreign_operation_id
foreign_trace_id, foreign_operation_id,
resource_name, resource_version
) VALUES (
?, ?, ?,
?, ?, ?, ?,
?, ?,
?, ?
)`
s.insertOperationStmt, err = s.db.Prepare(insertOperation)
Expand Down Expand Up @@ -114,6 +116,8 @@ func (s *SqliteStore) Write(ctx context.Context, op ProcessingOperation) error {
op.StatusCode.String(),
foreignTraceID,
foreignOperationID,
op.Resource.Name,
op.Resource.Version,
)
if err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions server/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,10 @@ type ProcessingOperation struct {
ForeignTraceID logboek.TraceID
ForeignOperationID logboek.OperationID
Attributes []Attribute
Resource Resource
}

type Resource struct {
Name string
Version string
}

0 comments on commit 3244585

Please sign in to comment.