Skip to content

Commit

Permalink
formatMessage for the verify result
Browse files Browse the repository at this point in the history
  • Loading branch information
driv3r committed Sep 3, 2024
1 parent 6fd817c commit 2dd8c90
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 36 deletions.
75 changes: 42 additions & 33 deletions inline_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -433,51 +434,65 @@ func (v *InlineVerifier) VerifyBeforeCutover() error {
return nil
}

func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) {
v.verifyDuringCutoverStarted.Set(true)

mismatchFound, mismatches, err := v.verifyAllEventsInStore()
if err != nil {
v.logger.WithError(err).Error("failed to VerifyDuringCutover")
return VerificationResult{}, err
}

if !mismatchFound {
return VerificationResult{
DataCorrect: true,
}, nil
}

// TODO make this into a func:
// func FormatMismatches(mismatches map[string]map[string][]InlineVerifierMismatches)) string
func formatMismatches(mismatches map[string]map[string][]InlineVerifierMismatches) (string, []string) {
// Build error message for display
var messageBuf bytes.Buffer
messageBuf.WriteString("cutover verification failed for: ")
incorrectTables := make([]string, 0)

for schemaName, _ := range mismatches {
for tableName, mismatches := range mismatches[schemaName] {
tableName = fmt.Sprintf("%s.%s", schemaName, tableName)
incorrectTables = append(incorrectTables, tableName)
sortedTables := make([]string, 0, len(mismatches[schemaName]))
for tableName, _ := range mismatches[schemaName] {
sortedTables = append(sortedTables, tableName)
}
sort.Strings(sortedTables)

for _, tableName := range sortedTables {
tableNameWithSchema := fmt.Sprintf("%s.%s", schemaName, tableName)
incorrectTables = append(incorrectTables, tableNameWithSchema)

messageBuf.WriteString(tableName)
messageBuf.WriteString(tableNameWithSchema)
messageBuf.WriteString(" [paginationKeys: ")
for _, mismatch := range mismatches {
for _, mismatch := range mismatches[schemaName][tableName] {
messageBuf.WriteString(strconv.FormatUint(mismatch.Pk, 10))
messageBuf.WriteString(" (source: ")
messageBuf.WriteString(mismatch.SourceChecksum)
messageBuf.WriteString(", target: ")
messageBuf.WriteString(mismatch.TargetChecksum)
// TODO add the mismatch details somehow
// ([paginationKeys: (source: "..", target: "..")])
// to
// ([paginationKeys: (source: "..", target: "..", type: "missing on source", column: "")])
messageBuf.WriteString(", type: ")
messageBuf.WriteString(string(mismatch.Mismatch.mismatchType))
if mismatch.Mismatch.column != "" {
messageBuf.WriteString(", column: ")
messageBuf.WriteString(mismatch.Mismatch.column)
}

messageBuf.WriteString(") ")
}
messageBuf.WriteString("] ")
}
}

message := messageBuf.String()
return messageBuf.String(), incorrectTables
}

func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) {
v.verifyDuringCutoverStarted.Set(true)

mismatchFound, mismatches, err := v.verifyAllEventsInStore()

if err != nil {
v.logger.WithError(err).Error("failed to VerifyDuringCutover")
return VerificationResult{}, err
}

if !mismatchFound {
return VerificationResult{
DataCorrect: true,
}, nil
}

message, incorrectTables := formatMismatches(mismatches)

v.logger.WithField("incorrect_tables", incorrectTables).Error(message)

return VerificationResult{
Expand Down Expand Up @@ -611,7 +626,6 @@ const (
)

type mismatch struct {
paginationKey uint64
column string
mismatchType mismatchType
}
Expand All @@ -624,7 +638,6 @@ func compareDecompressedData(source, target map[uint64]map[string][]byte) map[ui
if !exists {
// row missing on source
mismatchSet[paginationKey] = mismatch{
paginationKey: paginationKey,
mismatchType: MismatchRowMissingOnSource,
}
continue
Expand All @@ -634,14 +647,12 @@ func compareDecompressedData(source, target map[uint64]map[string][]byte) map[ui
sourceData, exists := sourceDecompressedColumns[colName]
if !exists {
mismatchSet[paginationKey] = mismatch{
paginationKey: paginationKey,
column: colName,
mismatchType: MismatchColumnMissingOnSource,
}
break // no need to compare other columns
} else if !bytes.Equal(sourceData, targetData) {
mismatchSet[paginationKey] = mismatch{
paginationKey: paginationKey,
column: colName,
mismatchType: MismatchContentDifference,
}
Expand All @@ -655,7 +666,6 @@ func compareDecompressedData(source, target map[uint64]map[string][]byte) map[ui
if !exists {
// row missing on target
mismatchSet[paginationKey] = mismatch{
paginationKey: paginationKey,
mismatchType: MismatchRowMissingOnTarget,
}
continue
Expand All @@ -665,7 +675,6 @@ func compareDecompressedData(source, target map[uint64]map[string][]byte) map[ui
_, exists := targetDecompressedColumns[colName]
if !exists {
mismatchSet[paginationKey] = mismatch{
paginationKey: paginationKey,
column: colName,
mismatchType: MismatchColumnMissingOnTarget,
}
Expand Down
78 changes: 75 additions & 3 deletions inline_verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestCompareDecompressedDataContentDifference(t *testing.T) {

result := compareDecompressedData(source, target)

assert.Equal(t, map[uint64]mismatch{1: {paginationKey: 1, mismatchType: MismatchContentDifference, column: "name"}}, result)
assert.Equal(t, map[uint64]mismatch{1: {mismatchType: MismatchContentDifference, column: "name"}}, result)
}

func TestCompareDecompressedDataMissingTarget(t *testing.T) {
Expand All @@ -40,7 +40,7 @@ func TestCompareDecompressedDataMissingTarget(t *testing.T) {

result := compareDecompressedData(source, target)

assert.Equal(t, map[uint64]mismatch{1: {paginationKey: 1, mismatchType: MismatchRowMissingOnTarget}}, result)
assert.Equal(t, map[uint64]mismatch{1: {mismatchType: MismatchRowMissingOnTarget}}, result)
}

func TestCompareDecompressedDataMissingSource(t *testing.T) {
Expand All @@ -51,5 +51,77 @@ func TestCompareDecompressedDataMissingSource(t *testing.T) {

result := compareDecompressedData(source, target)

assert.Equal(t, map[uint64]mismatch{3: {paginationKey: 3, mismatchType: MismatchRowMissingOnSource}}, result)
assert.Equal(t, map[uint64]mismatch{3: {mismatchType: MismatchRowMissingOnSource}}, result)
}

func TestFormatMismatch(t *testing.T) {
mismatches := map[string]map[string][]InlineVerifierMismatches{
"default": {
"users": {
InlineVerifierMismatches{
Pk: 1,
SourceChecksum: "",
TargetChecksum: "bar",
Mismatch: mismatch{
mismatchType: MismatchRowMissingOnSource,
},
},
},
},
}
message, tables := formatMismatches(mismatches)

assert.Equal(t, string("cutover verification failed for: default.users [paginationKeys: 1 (source: , target: bar, type: row missing on source) ] "), message)
assert.Equal(t, []string{string("default.users")}, tables)
}

func TestFormatMismatches(t *testing.T) {
mismatches := map[string]map[string][]InlineVerifierMismatches{
"default": {
"users": {
InlineVerifierMismatches{
Pk: 1,
SourceChecksum: "",
TargetChecksum: "bar",
Mismatch: mismatch{
mismatchType: MismatchRowMissingOnSource,
},
},
InlineVerifierMismatches{
Pk: 5,
SourceChecksum: "baz",
TargetChecksum: "",
Mismatch: mismatch{
mismatchType: MismatchRowMissingOnTarget,
},
},
},
"posts": {
InlineVerifierMismatches{
Pk: 9,
SourceChecksum: "boo",
TargetChecksum: "aaa",
Mismatch: mismatch{
mismatchType: MismatchContentDifference,
column: string("title"),
},
},
},
"attachments": {
InlineVerifierMismatches{
Pk: 7,
SourceChecksum: "boo",
TargetChecksum: "aaa",
Mismatch: mismatch{
mismatchType: MismatchContentDifference,
column: string("name"),
},
},
},
},
}
message, tables := formatMismatches(mismatches)

assert.Equal(t, string("cutover verification failed for: default.attachments [paginationKeys: 7 (source: boo, target: aaa, type: content difference, column: name) ] default.posts [paginationKeys: 9 (source: boo, target: aaa, type: content difference, column: title) ] default.users [paginationKeys: 1 (source: , target: bar, type: row missing on source) 5 (source: baz, target: , type: row missing on target) ] "), message)
assert.Equal(t, []string{string("default.attachments"), string("default.posts"), string("default.users")}, tables)
}

0 comments on commit 2dd8c90

Please sign in to comment.