diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolver.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolver.java
index ad999612bb..844fa298c2 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolver.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolver.java
@@ -764,13 +764,15 @@ private boolean ignoreNewUpdate(
         return true; // Write Compute does not try to update any non-existing fields in the old value (schema).
       case PER_FIELD_TIMESTAMP:
-        GenericRecord timestampRecord = (GenericRecord) oldTimestampObject;
+        GenericRecord oldTimestampRecord = (GenericRecord) oldTimestampObject;
         for (Schema.Field field: writeComputeRecord.getSchema().getFields()) {
-          if (getFieldOperationType(writeComputeRecord.get(field.pos())) != NO_OP_ON_FIELD
-              && timestampRecord.get(field.name()) == null) {
-            return false; // Write Compute tries to update a non-existing field.
+          if (!oldTimestampRecord.hasField(field.name())) {
+            if (getFieldOperationType(writeComputeRecord.get(field.pos())) == NO_OP_ON_FIELD) {
+              continue; // New field does not perform actual update.
+            }
+            return false; // Partial update tries to update a non-existing field.
           }
-          if (isRmdFieldTimestampSmaller(timestampRecord, field.name(), updateOperationTimestamp, false)) {
+          if (isRmdFieldTimestampSmaller(oldTimestampRecord, field.name(), updateOperationTimestamp, false)) {
            return false; // One existing field must be updated.
          }
        }
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeUpdateWithFieldLevelTimestamp.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeUpdateWithFieldLevelTimestamp.java
index 8ed300e372..cb9091678e 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeUpdateWithFieldLevelTimestamp.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeUpdateWithFieldLevelTimestamp.java
@@ -14,6 +14,7 @@
 import com.linkedin.venice.meta.ReadOnlySchemaRepository;
 import com.linkedin.venice.schema.SchemaEntry;
 import com.linkedin.venice.schema.rmd.RmdConstants;
+import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
 import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
 import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
 import com.linkedin.venice.utils.AvroSchemaUtils;
@@ -97,6 +98,132 @@ public void testUpdateIgnoredFieldUpdate() {
         "When the Update request is ignored, replication_checkpoint_vector should stay the same (empty).");
   }
 
+  @Test
+  public void testUpdateIgnoredFieldUpdateWithNewSchema() {
+    final int incomingValueSchemaId = 3;
+    final int incomingWriteComputeSchemaId = 1;
+    final int oldValueSchemaId = 2;
+    // Set up
+    Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(personSchemaV3);
+    GenericRecord updateFieldWriteComputeRecord = new UpdateBuilderImpl(writeComputeSchema).setNewFieldValue("age", 66)
+        .setNewFieldValue("name", "Venice")
+        .build();
+    ByteBuffer writeComputeBytes = ByteBuffer.wrap(
+        MapOrderPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldWriteComputeRecord));
+    final long valueLevelTimestamp = 10L;
+    Map<String, Long> fieldNameToTimestampMap = new HashMap<>();
+    fieldNameToTimestampMap.put("age", 10L);
+    fieldNameToTimestampMap.put("favoritePet", 10L);
+    fieldNameToTimestampMap.put("name", 10L);
+    fieldNameToTimestampMap.put("intArray", 10L);
+    fieldNameToTimestampMap.put("stringArray", 10L);
+
+    GenericRecord rmdRecord = createRmdWithFieldLevelTimestamp(personRmdSchemaV2, fieldNameToTimestampMap);
+    RmdWithValueSchemaId rmdWithValueSchemaId = new RmdWithValueSchemaId(oldValueSchemaId, RMD_VERSION_ID, rmdRecord);
+    ReadOnlySchemaRepository readOnlySchemaRepository = mock(ReadOnlySchemaRepository.class);
+
+    doReturn(new DerivedSchemaEntry(incomingValueSchemaId, incomingWriteComputeSchemaId, writeComputeSchema))
+        .when(readOnlySchemaRepository)
+        .getDerivedSchema(storeName, incomingValueSchemaId, incomingWriteComputeSchemaId);
+    doReturn(new SchemaEntry(oldValueSchemaId, personSchemaV2)).when(readOnlySchemaRepository)
+        .getValueSchema(storeName, oldValueSchemaId);
+    doReturn(new SchemaEntry(incomingValueSchemaId, personSchemaV3)).when(readOnlySchemaRepository)
+        .getValueSchema(storeName, incomingValueSchemaId);
+    doReturn(new SchemaEntry(incomingValueSchemaId, personSchemaV3)).when(readOnlySchemaRepository)
+        .getSupersetSchema(storeName);
+    StringAnnotatedStoreSchemaCache stringAnnotatedStoreSchemaCache =
+        new StringAnnotatedStoreSchemaCache(storeName, readOnlySchemaRepository);
+    // Update happens below
+    MergeConflictResolver mergeConflictResolver = MergeConflictResolverFactory.getInstance()
+        .createMergeConflictResolver(
+            stringAnnotatedStoreSchemaCache,
+            new RmdSerDe(stringAnnotatedStoreSchemaCache, RMD_VERSION_ID),
+            storeName);
+    MergeConflictResult mergeConflictResult = mergeConflictResolver.update(
+        Lazy.of(() -> null),
+        rmdWithValueSchemaId,
+        writeComputeBytes,
+        incomingValueSchemaId,
+        incomingWriteComputeSchemaId,
+        valueLevelTimestamp - 1, // Slightly lower than existing timestamp. Thus update should be ignored.
+        1,
+        1,
+        1,
+        null);
+    Assert.assertEquals(mergeConflictResult, MergeConflictResult.getIgnoredResult());
+    Assert.assertTrue(
+        ((List<Long>) rmdRecord.get(RmdConstants.REPLICATION_CHECKPOINT_VECTOR_FIELD_NAME)).isEmpty(),
+        "When the Update request is ignored, replication_checkpoint_vector should stay the same (empty).");
+  }
+
+  @Test
+  public void testUpdateAppliedFieldUpdateWithNewSchema() {
+    final int incomingValueSchemaId = 3;
+    final int oldValueSchemaId = 2;
+    // Set up
+    Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(personSchemaV3);
+    GenericRecord updateFieldWriteComputeRecord = new UpdateBuilderImpl(writeComputeSchema).setNewFieldValue("age", 66)
+        .setNewFieldValue("name", "Venice")
+        .setNewFieldValue("nullableListField", null)
+        .build();
+    ByteBuffer writeComputeBytes = ByteBuffer.wrap(
+        MapOrderPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldWriteComputeRecord));
+    final long valueLevelTimestamp = 10L;
+    Map<String, Long> fieldNameToTimestampMap = new HashMap<>();
+    fieldNameToTimestampMap.put("age", 10L);
+    fieldNameToTimestampMap.put("favoritePet", 10L);
+    fieldNameToTimestampMap.put("name", 10L);
+    fieldNameToTimestampMap.put("intArray", 10L);
+    fieldNameToTimestampMap.put("stringArray", 10L);
+
+    GenericRecord rmdRecord = createRmdWithFieldLevelTimestamp(personRmdSchemaV2, fieldNameToTimestampMap);
+    RmdWithValueSchemaId rmdWithValueSchemaId = new RmdWithValueSchemaId(oldValueSchemaId, RMD_VERSION_ID, rmdRecord);
+    ReadOnlySchemaRepository readOnlySchemaRepository = mock(ReadOnlySchemaRepository.class);
+
+    doReturn(new DerivedSchemaEntry(incomingValueSchemaId, 1, writeComputeSchema)).when(readOnlySchemaRepository)
+        .getDerivedSchema(storeName, incomingValueSchemaId, 1);
+    doReturn(new SchemaEntry(oldValueSchemaId, personSchemaV2)).when(readOnlySchemaRepository)
+        .getValueSchema(storeName, oldValueSchemaId);
+    doReturn(new SchemaEntry(incomingValueSchemaId, personSchemaV3)).when(readOnlySchemaRepository)
+        .getValueSchema(storeName, incomingValueSchemaId);
+    doReturn(new SchemaEntry(incomingValueSchemaId, personSchemaV3)).when(readOnlySchemaRepository)
+        .getSupersetSchema(storeName);
+    doReturn(new RmdSchemaEntry(oldValueSchemaId, 1, personRmdSchemaV2)).when(readOnlySchemaRepository)
+        .getReplicationMetadataSchema(storeName, oldValueSchemaId, 1);
+    doReturn(new RmdSchemaEntry(incomingValueSchemaId, 1, personRmdSchemaV3)).when(readOnlySchemaRepository)
+        .getReplicationMetadataSchema(storeName, incomingValueSchemaId, 1);
+    doReturn(new SchemaEntry(incomingValueSchemaId, personSchemaV3)).when(readOnlySchemaRepository)
+        .getValueSchema(storeName, incomingValueSchemaId);
+
+    StringAnnotatedStoreSchemaCache stringAnnotatedStoreSchemaCache =
+        new StringAnnotatedStoreSchemaCache(storeName, readOnlySchemaRepository);
+    // Update happens below
+    MergeConflictResolver mergeConflictResolver = MergeConflictResolverFactory.getInstance()
+        .createMergeConflictResolver(
+            stringAnnotatedStoreSchemaCache,
+            new RmdSerDe(stringAnnotatedStoreSchemaCache, RMD_VERSION_ID),
+            storeName);
+    MergeConflictResult mergeConflictResult = mergeConflictResolver.update(
+        Lazy.of(() -> null),
+        rmdWithValueSchemaId,
+        writeComputeBytes,
+        incomingValueSchemaId,
+        1,
+        valueLevelTimestamp + 1, // Slightly higher than existing timestamp. Thus update should be applied.
+        1,
+        1,
+        1,
+        null);
+    Assert.assertFalse(mergeConflictResult.isUpdateIgnored());
+
+    ByteBuffer newValueOptional = mergeConflictResult.getNewValue();
+    Assert.assertNotNull(newValueOptional);
+    GenericRecord newValueRecord = getDeserializer(personSchemaV3, personSchemaV3).deserialize(newValueOptional);
+    Assert.assertEquals(newValueRecord.get("age").toString(), "66");
+    Assert.assertEquals(newValueRecord.get("name").toString(), "Venice");
+    Assert.assertNull(newValueRecord.get("nullableListField"));
+  }
+
   @Test
   public void testWholeFieldUpdate() {
     final int incomingValueSchemaId = 3;
diff --git a/clients/da-vinci-client/src/test/resources/avro/PersonV3.avsc b/clients/da-vinci-client/src/test/resources/avro/PersonV3.avsc
index 110cae16b3..b9c5a9c6b1 100644
--- a/clients/da-vinci-client/src/test/resources/avro/PersonV3.avsc
+++ b/clients/da-vinci-client/src/test/resources/avro/PersonV3.avsc
@@ -2,6 +2,14 @@
   "name": "Person",
   "type": "record",
   "fields": [
+    {
+      "name": "stringMap",
+      "type": {
+        "type": "map",
+        "values": "string"
+      },
+      "default": {}
+    },
     {
       "name": "nullableListField",
       "type": [
@@ -43,14 +51,6 @@
         "items": "string"
       },
       "default": []
-    },
-    {
-      "name": "stringMap",
-      "type": {
-        "type": "map",
-        "values": "string"
-      },
-      "default": {}
     }
   ]
 }
\ No newline at end of file
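For reviewers, the behavioral change in `ignoreNewUpdate` boils down to one rule: a partial-update field that has no timestamp entry in the old replication-metadata record (i.e. a field introduced by a newer value schema) no longer forces the update to be applied when the operation on that field is a NO_OP. Below is a minimal, self-contained sketch of that rule, not the Venice implementation: `IgnoreUpdateSketch`, `UpdateField`, and the plain `Map` of per-field timestamps are hypothetical stand-ins for the partial-update record and the RMD timestamp record.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the fixed PER_FIELD_TIMESTAMP decision rule, under simplified
// stand-in types; the real code walks an Avro GenericRecord instead.
public final class IgnoreUpdateSketch {
  /** One field of an incoming partial update: its name and whether it is a NO_OP. */
  static final class UpdateField {
    final String name;
    final boolean isNoOp;

    UpdateField(String name, boolean isNoOp) {
      this.name = name;
      this.isNoOp = isNoOp;
    }
  }

  /** Returns true when the incoming update loses to every per-field timestamp. */
  static boolean ignoreNewUpdate(
      List<UpdateField> updateFields,
      Map<String, Long> oldFieldTimestamps,
      long updateOperationTimestamp) {
    for (UpdateField field: updateFields) {
      Long oldTimestamp = oldFieldTimestamps.get(field.name);
      if (oldTimestamp == null) {
        // Field exists only in the newer value schema, so the old RMD has no
        // timestamp for it. Before the fix this always forced the update to be
        // applied; now a NO_OP on such a field no longer blocks ignoring.
        if (field.isNoOp) {
          continue;
        }
        return false; // The update actually writes a brand-new field: apply it.
      }
      if (oldTimestamp < updateOperationTimestamp) {
        return false; // At least one existing field would be updated: apply it.
      }
    }
    return true; // Every field either loses the timestamp comparison or is a NO_OP.
  }

  public static void main(String[] args) {
    Map<String, Long> oldFieldTimestamps = new HashMap<>();
    oldFieldTimestamps.put("age", 10L);
    oldFieldTimestamps.put("name", 10L);

    // Loosely mirrors testUpdateIgnoredFieldUpdateWithNewSchema: the update
    // arrives with a lower timestamp (9 < 10) and only NO_OPs the schema-new
    // field, so it is ignored.
    List<UpdateField> noOpOnNewField = Arrays.asList(
        new UpdateField("age", false),
        new UpdateField("name", false),
        new UpdateField("nullableListField", true));
    System.out.println(ignoreNewUpdate(noOpOnNewField, oldFieldTimestamps, 9L)); // true

    // An update that actually writes the schema-new field is applied even at a
    // lower timestamp, since the old RMD has nothing for it to lose against.
    List<UpdateField> writesNewField = Arrays.asList(
        new UpdateField("age", false),
        new UpdateField("nullableListField", false));
    System.out.println(ignoreNewUpdate(writesNewField, oldFieldTimestamps, 9L)); // false
  }
}
```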