[server] AAWC should ignore NO_OP new field from update record (#1159)
If a field in the update record is new (i.e., it has no entry in the old RMD timestamp record) but its operation is NO_OP, we should skip it instead of letting it fall through to the timestamp check against the old timestamp record, which would throw an exception and halt ingestion.
sixpluszero committed Sep 6, 2024
1 parent 96e6512 commit 4b1dfd4
Showing 3 changed files with 142 additions and 13 deletions.
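For context, the core of the fixed per-field check can be sketched in isolation as below. This is a simplified sketch, not the actual MergeConflictResolver code: the class name IgnoreNewUpdateSketch is made up, the isNoOp predicate stands in for getFieldOperationType(...) == NO_OP_ON_FIELD, and per-field timestamps are assumed to be plain long values rather than the full RMD timestamp structures.

import java.util.function.Predicate;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

final class IgnoreNewUpdateSketch {
  /**
   * Returns true when the incoming partial update can be ignored under per-field timestamps.
   * A field that the old timestamp record does not know about is skipped when its operation
   * is NO_OP (nothing to update, nothing to look up) and rejected otherwise, instead of
   * falling through to a timestamp lookup that would throw.
   */
  static boolean ignoreNewUpdate(
      GenericRecord writeComputeRecord,
      GenericRecord oldTimestampRecord,
      long updateOperationTimestamp,
      Predicate<Object> isNoOp) {
    for (Schema.Field field: writeComputeRecord.getSchema().getFields()) {
      if (!oldTimestampRecord.hasField(field.name())) {
        if (isNoOp.test(writeComputeRecord.get(field.pos()))) {
          continue; // New field with NO_OP does not perform an actual update.
        }
        return false; // Partial update tries to update a non-existing field.
      }
      // Assumes plain long per-field timestamps for illustration.
      long oldFieldTimestamp = ((Number) oldTimestampRecord.get(field.name())).longValue();
      if (oldFieldTimestamp < updateOperationTimestamp) {
        return false; // At least one existing field would be updated.
      }
    }
    return true; // Ignore: new fields are all NO_OP and no existing field is older than the update.
  }
}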
@@ -764,13 +764,15 @@ private boolean ignoreNewUpdate(
         return true; // Write Compute does not try to update any non-existing fields in the old value (schema).
 
       case PER_FIELD_TIMESTAMP:
-        GenericRecord timestampRecord = (GenericRecord) oldTimestampObject;
+        GenericRecord oldTimestampRecord = (GenericRecord) oldTimestampObject;
         for (Schema.Field field: writeComputeRecord.getSchema().getFields()) {
-          if (getFieldOperationType(writeComputeRecord.get(field.pos())) != NO_OP_ON_FIELD
-              && timestampRecord.get(field.name()) == null) {
-            return false; // Write Compute tries to update a non-existing field.
+          if (!oldTimestampRecord.hasField(field.name())) {
+            if (getFieldOperationType(writeComputeRecord.get(field.pos())) == NO_OP_ON_FIELD) {
+              continue; // New field does not perform actual update.
+            }
+            return false; // Partial update tries to update a non-existing field.
           }
-          if (isRmdFieldTimestampSmaller(timestampRecord, field.name(), updateOperationTimestamp, false)) {
+          if (isRmdFieldTimestampSmaller(oldTimestampRecord, field.name(), updateOperationTimestamp, false)) {
             return false; // One existing field must be updated.
           }
         }
@@ -14,6 +14,7 @@
 import com.linkedin.venice.meta.ReadOnlySchemaRepository;
 import com.linkedin.venice.schema.SchemaEntry;
 import com.linkedin.venice.schema.rmd.RmdConstants;
+import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
 import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
 import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
 import com.linkedin.venice.utils.AvroSchemaUtils;
@@ -97,6 +98,132 @@ public void testUpdateIgnoredFieldUpdate() {
"When the Update request is ignored, replication_checkpoint_vector should stay the same (empty).");
}

@Test
public void testUpdateIgnoredFieldUpdateWithNewSchema() {
final int incomingValueSchemaId = 3;
final int incomingWriteComputeSchemaId = 1;
final int oldValueSchemaId = 2;
// Set up
Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(personSchemaV3);
GenericRecord updateFieldWriteComputeRecord = new UpdateBuilderImpl(writeComputeSchema).setNewFieldValue("age", 66)
.setNewFieldValue("name", "Venice")
.build();
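// Only "age" and "name" are set above; all other fields in the V3 write-compute schema (including fields
// absent from the old V2-based RMD) are left as NO_OP, which is exactly what the fixed skip logic handles.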
ByteBuffer writeComputeBytes = ByteBuffer.wrap(
MapOrderPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldWriteComputeRecord));
final long valueLevelTimestamp = 10L;
Map<String, Long> fieldNameToTimestampMap = new HashMap<>();
fieldNameToTimestampMap.put("age", 10L);
fieldNameToTimestampMap.put("favoritePet", 10L);
fieldNameToTimestampMap.put("name", 10L);
fieldNameToTimestampMap.put("intArray", 10L);
fieldNameToTimestampMap.put("stringArray", 10L);

GenericRecord rmdRecord = createRmdWithFieldLevelTimestamp(personRmdSchemaV2, fieldNameToTimestampMap);
RmdWithValueSchemaId rmdWithValueSchemaId = new RmdWithValueSchemaId(oldValueSchemaId, RMD_VERSION_ID, rmdRecord);
ReadOnlySchemaRepository readOnlySchemaRepository = mock(ReadOnlySchemaRepository.class);

doReturn(new DerivedSchemaEntry(incomingValueSchemaId, incomingWriteComputeSchemaId, writeComputeSchema))
.when(readOnlySchemaRepository)
.getDerivedSchema(storeName, incomingValueSchemaId, incomingWriteComputeSchemaId);
doReturn(new SchemaEntry(oldValueSchemaId, personSchemaV2)).when(readOnlySchemaRepository)
.getValueSchema(storeName, oldValueSchemaId);
doReturn(new SchemaEntry(incomingValueSchemaId, personSchemaV3)).when(readOnlySchemaRepository)
.getValueSchema(storeName, incomingValueSchemaId);
doReturn(new SchemaEntry(incomingValueSchemaId, personSchemaV3)).when(readOnlySchemaRepository)
.getSupersetSchema(storeName);
StringAnnotatedStoreSchemaCache stringAnnotatedStoreSchemaCache =
new StringAnnotatedStoreSchemaCache(storeName, readOnlySchemaRepository);
// Update happens below
MergeConflictResolver mergeConflictResolver = MergeConflictResolverFactory.getInstance()
.createMergeConflictResolver(
stringAnnotatedStoreSchemaCache,
new RmdSerDe(stringAnnotatedStoreSchemaCache, RMD_VERSION_ID),
storeName);
MergeConflictResult mergeConflictResult = mergeConflictResolver.update(
Lazy.of(() -> null),
rmdWithValueSchemaId,
writeComputeBytes,
incomingValueSchemaId,
incomingWriteComputeSchemaId,
valueLevelTimestamp - 1, // Slightly lower than existing timestamp. Thus update should be ignored.
1,
1,
1,
null);
Assert.assertEquals(mergeConflictResult, MergeConflictResult.getIgnoredResult());
Assert.assertTrue(
((List<?>) rmdRecord.get(RmdConstants.REPLICATION_CHECKPOINT_VECTOR_FIELD_NAME)).isEmpty(),
"When the Update request is ignored, replication_checkpoint_vector should stay the same (empty).");
}

@Test
public void testUpdateAppliedFieldUpdateWithNewSchema() {
final int incomingValueSchemaId = 3;
final int oldValueSchemaId = 2;
// Set up
Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(personSchemaV3);
GenericRecord updateFieldWriteComputeRecord = new UpdateBuilderImpl(writeComputeSchema).setNewFieldValue("age", 66)
.setNewFieldValue("name", "Venice")
.setNewFieldValue("nullableListField", null)
.build();
ByteBuffer writeComputeBytes = ByteBuffer.wrap(
MapOrderPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldWriteComputeRecord));
final long valueLevelTimestamp = 10L;
Map<String, Long> fieldNameToTimestampMap = new HashMap<>();
fieldNameToTimestampMap.put("age", 10L);
fieldNameToTimestampMap.put("favoritePet", 10L);
fieldNameToTimestampMap.put("name", 10L);
fieldNameToTimestampMap.put("intArray", 10L);
fieldNameToTimestampMap.put("stringArray", 10L);

GenericRecord rmdRecord = createRmdWithFieldLevelTimestamp(personRmdSchemaV2, fieldNameToTimestampMap);
RmdWithValueSchemaId rmdWithValueSchemaId = new RmdWithValueSchemaId(oldValueSchemaId, RMD_VERSION_ID, rmdRecord);
ReadOnlySchemaRepository readOnlySchemaRepository = mock(ReadOnlySchemaRepository.class);

doReturn(new DerivedSchemaEntry(incomingValueSchemaId, 1, writeComputeSchema)).when(readOnlySchemaRepository)
.getDerivedSchema(storeName, incomingValueSchemaId, 1);
doReturn(new SchemaEntry(oldValueSchemaId, personSchemaV2)).when(readOnlySchemaRepository)
.getValueSchema(storeName, oldValueSchemaId);
doReturn(new SchemaEntry(incomingValueSchemaId, personSchemaV3)).when(readOnlySchemaRepository)
.getValueSchema(storeName, incomingValueSchemaId);
doReturn(new SchemaEntry(incomingValueSchemaId, personSchemaV3)).when(readOnlySchemaRepository)
.getSupersetSchema(storeName);
doReturn(new RmdSchemaEntry(oldValueSchemaId, 1, personRmdSchemaV2)).when(readOnlySchemaRepository)
.getReplicationMetadataSchema(storeName, oldValueSchemaId, 1);
doReturn(new RmdSchemaEntry(incomingValueSchemaId, 1, personRmdSchemaV3)).when(readOnlySchemaRepository)
.getReplicationMetadataSchema(storeName, incomingValueSchemaId, 1);
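// RMD schemas for both the old (V2) and incoming (V3) value schemas are stubbed here; unlike the
// ignored-update test above, this update is expected to be applied, so the resolver also needs them.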
doReturn(new SchemaEntry(incomingValueSchemaId, personSchemaV3)).when(readOnlySchemaRepository)
.getValueSchema(storeName, incomingValueSchemaId);

StringAnnotatedStoreSchemaCache stringAnnotatedStoreSchemaCache =
new StringAnnotatedStoreSchemaCache(storeName, readOnlySchemaRepository);
// Update happens below
MergeConflictResolver mergeConflictResolver = MergeConflictResolverFactory.getInstance()
.createMergeConflictResolver(
stringAnnotatedStoreSchemaCache,
new RmdSerDe(stringAnnotatedStoreSchemaCache, RMD_VERSION_ID),
storeName);
MergeConflictResult mergeConflictResult = mergeConflictResolver.update(
Lazy.of(() -> null),
rmdWithValueSchemaId,
writeComputeBytes,
incomingValueSchemaId,
1,
valueLevelTimestamp + 1, // Slightly higher than existing timestamp. Thus update should be applied.
1,
1,
1,
null);
Assert.assertFalse(mergeConflictResult.isUpdateIgnored());

ByteBuffer newValueOptional = mergeConflictResult.getNewValue();
Assert.assertNotNull(newValueOptional);
GenericRecord newValueRecord = getDeserializer(personSchemaV3, personSchemaV3).deserialize(newValueOptional);
Assert.assertEquals(newValueRecord.get("age").toString(), "66");
Assert.assertEquals(newValueRecord.get("name").toString(), "Venice");
Assert.assertNull(newValueRecord.get("nullableListField"));
}

@Test
public void testWholeFieldUpdate() {
final int incomingValueSchemaId = 3;
16 changes: 8 additions & 8 deletions clients/da-vinci-client/src/test/resources/avro/PersonV3.avsc
@@ -2,6 +2,14 @@
   "name": "Person",
   "type": "record",
   "fields": [
+    {
+      "name": "stringMap",
+      "type": {
+        "type": "map",
+        "values": "string"
+      },
+      "default": {}
+    },
     {
       "name": "nullableListField",
       "type": [
@@ -43,14 +51,6 @@
         "items": "string"
       },
       "default": []
-    },
-    {
-      "name": "stringMap",
-      "type": {
-        "type": "map",
-        "values": "string"
-      },
-      "default": {}
     }
   ]
 }
