Skip to content

Commit

Permalink
Merge pull request #676 from bstasyszyn/casdatasizemetric
Browse files Browse the repository at this point in the history
chore: Add CAS write size metric
  • Loading branch information
bstasyszyn authored Jun 20, 2022
2 parents a1567c3 + 3f84728 commit bdd5338
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 22 deletions.
3 changes: 2 additions & 1 deletion pkg/batch/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,8 @@ func newMockProtocolClient() *mocks.MockProtocolClient {
oa := operationapplier.New(pc.Protocol, parser, dc)

pc.CasClient = mocks.NewMockCasClient(nil)
th := txnprovider.NewOperationHandler(pc.Protocol, pc.CasClient, compression.New(compression.WithDefaultAlgorithms()), parser)
th := txnprovider.NewOperationHandler(pc.Protocol, pc.CasClient, compression.New(compression.WithDefaultAlgorithms()),
parser, &mocks.MetricsProvider{})

pv := mocks.GetProtocolVersion(pc.Protocol)

Expand Down
2 changes: 1 addition & 1 deletion pkg/dochandler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,7 @@ func newMockProtocolClient() *mocks.MockProtocolClient {

pc.CasClient = mocks.NewMockCasClient(nil)
cp := compression.New(compression.WithDefaultAlgorithms())
oh := txnprovider.NewOperationHandler(pc.Protocol, pc.CasClient, cp, parser)
oh := txnprovider.NewOperationHandler(pc.Protocol, pc.CasClient, cp, parser, &mocks.MetricsProvider{})

v.OperationParserReturns(parser)
v.OperationApplierReturns(oa)
Expand Down
4 changes: 4 additions & 0 deletions pkg/mocks/metricsprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,7 @@ func (m *MetricsProvider) HTTPCreateUpdateTime(value time.Duration) {
// HTTPResolveTime records the time rest call for resolve.
func (m *MetricsProvider) HTTPResolveTime(value time.Duration) {
}

// CASWriteSize records the size of the data written to CAS.
func (m *MetricsProvider) CASWriteSize(dataType string, size int) {
}
18 changes: 16 additions & 2 deletions pkg/versions/1_0/txnprovider/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,29 @@ type compressionProvider interface {
Compress(alg string, data []byte) ([]byte, error)
}

type metricsProvider interface {
CASWriteSize(dataType string, size int)
}

// OperationHandler creates batch files(chunk, map, anchor) from batch operations.
type OperationHandler struct {
cas cas.Client
protocol protocol.Protocol
parser OperationParser
cp compressionProvider
metrics metricsProvider
}

// NewOperationHandler returns new operations handler.
func NewOperationHandler(p protocol.Protocol, cas cas.Client, cp compressionProvider, parser OperationParser) *OperationHandler {
return &OperationHandler{cas: cas, protocol: p, cp: cp, parser: parser}
func NewOperationHandler(p protocol.Protocol, cas cas.Client, cp compressionProvider, parser OperationParser,
metrics metricsProvider) *OperationHandler {
return &OperationHandler{
cas: cas,
protocol: p,
parser: parser,
cp: cp,
metrics: metrics,
}
}

// PrepareTxnFiles will create batch files(core index, core proof, provisional index, provisional proof and chunk)
Expand Down Expand Up @@ -283,6 +295,8 @@ func (h *OperationHandler) writeModelToCAS(model interface{}, alias string) (str
return "", fmt.Errorf("failed to store %s file: %s", alias, err.Error())
}

h.metrics.CASWriteSize(alias, len(compressedBytes))

return address, nil
}

Expand Down
31 changes: 21 additions & 10 deletions pkg/versions/1_0/txnprovider/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func TestNewOperationHandler(t *testing.T) {
protocol,
mocks.NewMockCasClient(nil),
compression.New(compression.WithDefaultAlgorithms()),
operationparser.New(protocol))
operationparser.New(protocol),
&mocks.MetricsProvider{})
require.NotNil(t, handler)
}

Expand All @@ -73,7 +74,8 @@ func TestOperationHandler_PrepareTxnFiles(t *testing.T) {
protocol,
mocks.NewMockCasClient(nil),
compression,
operationparser.New(protocol))
operationparser.New(protocol),
&mocks.MetricsProvider{})

anchoringInfo, err := handler.PrepareTxnFiles(ops)
require.NoError(t, err)
Expand Down Expand Up @@ -192,7 +194,8 @@ func TestOperationHandler_PrepareTxnFiles(t *testing.T) {
protocol,
mocks.NewMockCasClient(nil),
compression,
operationparser.New(protocol, operationparser.WithAnchorTimeValidator(&mockTimeValidator{})))
operationparser.New(protocol, operationparser.WithAnchorTimeValidator(&mockTimeValidator{})),
&mocks.MetricsProvider{})

anchoringInfo, err := handler.PrepareTxnFiles(ops)
require.NoError(t, err)
Expand All @@ -210,7 +213,8 @@ func TestOperationHandler_PrepareTxnFiles(t *testing.T) {
protocol,
mocks.NewMockCasClient(nil),
compression,
operationparser.New(protocol))
operationparser.New(protocol),
&mocks.MetricsProvider{})

anchoringInfo, err := handler.PrepareTxnFiles(ops)
require.NoError(t, err)
Expand Down Expand Up @@ -270,7 +274,8 @@ func TestOperationHandler_PrepareTxnFiles(t *testing.T) {
protocol,
mocks.NewMockCasClient(nil),
compression,
operationparser.New(protocol))
operationparser.New(protocol),
&mocks.MetricsProvider{})

anchoringInfo, err := handler.PrepareTxnFiles(nil)
require.Error(t, err)
Expand All @@ -283,7 +288,8 @@ func TestOperationHandler_PrepareTxnFiles(t *testing.T) {
protocol,
mocks.NewMockCasClient(nil),
compression,
operationparser.New(protocol))
operationparser.New(protocol),
&mocks.MetricsProvider{})

op := &operation.QueuedOperation{
OperationRequest: []byte(`{"key":"value"}`),
Expand All @@ -304,7 +310,8 @@ func TestOperationHandler_PrepareTxnFiles(t *testing.T) {
protocol,
mocks.NewMockCasClient(errors.New("CAS error")),
compression,
operationparser.New(protocol))
operationparser.New(protocol),
&mocks.MetricsProvider{})

anchoringInfo, err := handler.PrepareTxnFiles(ops)
require.Error(t, err)
Expand All @@ -319,7 +326,8 @@ func TestOperationHandler_PrepareTxnFiles(t *testing.T) {
protocol,
mocks.NewMockCasClient(errors.New("CAS error")),
compression,
operationparser.New(protocol))
operationparser.New(protocol),
&mocks.MetricsProvider{})

anchoringInfo, err := handler.PrepareTxnFiles(ops)
require.Error(t, err)
Expand All @@ -335,7 +343,8 @@ func TestWriteModelToCAS(t *testing.T) {
protocol,
mocks.NewMockCasClient(nil),
compression.New(compression.WithDefaultAlgorithms()),
operationparser.New(protocol))
operationparser.New(protocol),
&mocks.MetricsProvider{})

t.Run("success", func(t *testing.T) {
address, err := handler.writeModelToCAS(&models.CoreIndexFile{}, "alias")
Expand All @@ -355,7 +364,8 @@ func TestWriteModelToCAS(t *testing.T) {
protocol,
mocks.NewMockCasClient(errors.New("CAS error")),
compression.New(compression.WithDefaultAlgorithms()),
operationparser.New(protocol))
operationparser.New(protocol),
&mocks.MetricsProvider{})

address, err := handlerWithCASError.writeModelToCAS(&models.CoreIndexFile{}, "alias")
require.Error(t, err)
Expand All @@ -372,6 +382,7 @@ func TestWriteModelToCAS(t *testing.T) {
mocks.NewMockCasClient(nil),
compression.New(compression.WithDefaultAlgorithms()),
operationparser.New(pc.Protocol),
&mocks.MetricsProvider{},
)

address, err := handlerWithProtocolError.writeModelToCAS(&models.CoreIndexFile{}, "alias")
Expand Down
24 changes: 16 additions & 8 deletions pkg/versions/1_0/txnprovider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ func TestHandler_GetTxnOperations(t *testing.T) {

t.Run("success", func(t *testing.T) {
cas := mocks.NewMockCasClient(nil)
handler := NewOperationHandler(pc.Protocol, cas, cp, operationparser.New(pc.Protocol))
handler := NewOperationHandler(pc.Protocol, cas, cp, operationparser.New(pc.Protocol),
&mocks.MetricsProvider{})

ops := getTestOperations(createOpsNum, updateOpsNum, deactivateOpsNum, recoverOpsNum)

Expand All @@ -84,7 +85,8 @@ func TestHandler_GetTxnOperations(t *testing.T) {

t.Run("error - delta exceeds maximum delta size in chunk file", func(t *testing.T) {
cas := mocks.NewMockCasClient(nil)
handler := NewOperationHandler(pc.Protocol, cas, cp, operationparser.New(pc.Protocol))
handler := NewOperationHandler(pc.Protocol, cas, cp, operationparser.New(pc.Protocol),
&mocks.MetricsProvider{})

ops := getTestOperations(createOpsNum, updateOpsNum, deactivateOpsNum, recoverOpsNum)

Expand Down Expand Up @@ -112,7 +114,8 @@ func TestHandler_GetTxnOperations(t *testing.T) {

t.Run("error - number of operations doesn't match", func(t *testing.T) {
cas := mocks.NewMockCasClient(nil)
handler := NewOperationHandler(pc.Protocol, cas, cp, operationparser.New(pc.Protocol))
handler := NewOperationHandler(pc.Protocol, cas, cp, operationparser.New(pc.Protocol),
&mocks.MetricsProvider{})

ops := getTestOperations(createOpsNum, updateOpsNum, deactivateOpsNum, recoverOpsNum)

Expand Down Expand Up @@ -159,7 +162,8 @@ func TestHandler_GetTxnOperations(t *testing.T) {

t.Run("error - parse core index operations error", func(t *testing.T) {
cas := mocks.NewMockCasClient(nil)
handler := NewOperationHandler(pc.Protocol, cas, cp, operationparser.New(pc.Protocol))
handler := NewOperationHandler(pc.Protocol, cas, cp, operationparser.New(pc.Protocol),
&mocks.MetricsProvider{})

ops := getTestOperations(createOpsNum, updateOpsNum, deactivateOpsNum, recoverOpsNum)

Expand Down Expand Up @@ -206,7 +210,8 @@ func TestHandler_GetTxnOperations(t *testing.T) {
ops = append(ops, generateOperations(deactivateOpsNum, operation.TypeDeactivate)...)

cas := mocks.NewMockCasClient(nil)
handler := NewOperationHandler(pc.Protocol, cas, cp, operationparser.New(pc.Protocol))
handler := NewOperationHandler(pc.Protocol, cas, cp, operationparser.New(pc.Protocol),
&mocks.MetricsProvider{})

anchoringInfo, err := handler.PrepareTxnFiles(ops)
require.NoError(t, err)
Expand Down Expand Up @@ -237,7 +242,8 @@ func TestHandler_GetTxnOperations(t *testing.T) {
ops = append(ops, generateOperations(updateOpsNum, operation.TypeUpdate)...)

cas := mocks.NewMockCasClient(nil)
handler := NewOperationHandler(pc.Protocol, cas, cp, operationparser.New(pc.Protocol))
handler := NewOperationHandler(pc.Protocol, cas, cp, operationparser.New(pc.Protocol),
&mocks.MetricsProvider{})

anchoringInfo, err := handler.PrepareTxnFiles(ops)
require.NoError(t, err)
Expand Down Expand Up @@ -268,7 +274,8 @@ func TestHandler_GetTxnOperations(t *testing.T) {
ops = append(ops, generateOperations(createOpsNum, operation.TypeCreate)...)

cas := mocks.NewMockCasClient(nil)
handler := NewOperationHandler(pc.Protocol, cas, cp, operationparser.New(pc.Protocol))
handler := NewOperationHandler(pc.Protocol, cas, cp, operationparser.New(pc.Protocol),
&mocks.MetricsProvider{})

anchoringInfo, err := handler.PrepareTxnFiles(ops)
require.NoError(t, err)
Expand Down Expand Up @@ -299,7 +306,8 @@ func TestHandler_GetTxnOperations(t *testing.T) {
ops = append(ops, generateOperations(recoverOpsNum, operation.TypeRecover)...)

cas := mocks.NewMockCasClient(nil)
handler := NewOperationHandler(pc.Protocol, cas, cp, operationparser.New(pc.Protocol))
handler := NewOperationHandler(pc.Protocol, cas, cp, operationparser.New(pc.Protocol),
&mocks.MetricsProvider{})

anchoringInfo, err := handler.PrepareTxnFiles(ops)
require.NoError(t, err)
Expand Down

0 comments on commit bdd5338

Please sign in to comment.