From 94c36f990d230d8e84239f4b3634e6f908717436 Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Mon, 26 Jun 2023 12:12:36 -0700 Subject: [PATCH 1/4] [VIP-2] Removing Per Record Offset Metadata From Venice-Server Storage With Heartbeats Add VIP-2 --- docs/proposals/VIP-2.md | 117 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 docs/proposals/VIP-2.md diff --git a/docs/proposals/VIP-2.md b/docs/proposals/VIP-2.md new file mode 100644 index 0000000000..f0b4535518 --- /dev/null +++ b/docs/proposals/VIP-2.md @@ -0,0 +1,117 @@ +--- +layout: default +title: VIP-1 +parent: Community Guides +permalink: /docs/proposals/VIP_TEMPLATE.md +--- + +# [VIP-2] Removing Per Record Offset Metadata From Venice-Server Storage + +* **Status**: Discussion +* **Author(s)**: Zac Policzer +* **Pull Request**: _TODO_ +* **Release**: _TODO_ + + + +## Introduction + +This VIP explores strategies for removing the offset metadata stored per record by utilizing heartbeats as well as an exploration of what other system benefits can be had by adopting replica heartbeats. + +## Problem Statement + +Today, Venice stores which have Active/Active replication enabled store N number of offsets per record where N is the number of replicating sites. This is done so as to enable data drift detection via a Venice algorithm called Leap Frog (which we will document in a later section) so that it's possible to detect and flag inconsistencies triggered by outages and bugs with very high granularity. + +In addition, these offsets are utilized by Venice Change Capture consumers for filtering out duplicated events across version pushes, so that downstream consumers do not get callbacks triggered for events that they already consumed. + +This was a fine approach, however, it presumes that for a given message in an underlying pubsub implementation, that the object used to describe it's position (in Kafka's case, a long) is reasonably small. So long as it's small, the added overhead per record is tolerable. + +This is not neceassrily the case. **In at least one PubSub implementation that Venice will need to support, the position object is as large as over 20 Bytes.** + +Venice (at least in LinkedIn deploments at time of writing) is a storage bound service, and this high overhead has real cost implications. + +## Scope + +_Call out explicitly:_ + +1. **What is in scope?** + + We must be able to shrink down or remove this per record overhead in the venice server, but still be able to retain the functionality that we depend on (diff detection and change capture client filtering). + +2. **What is out of scope?** + + We will not seek to reduce the footprint of this data in the underlying PubSub (where it's presumed that disk is cheaper). + +## Project Justification + +This project seeks to improve the **performance, scalability, and cost to serve of Venice**. By shrinking the per record metadata we'll not only save on disk, but we should be able to improve the ingestion speed slightly (less bytes is less bytes afterall) and reduce the potential increase in storage cost as new replicating sites (henceforth refered to as **colos**) are added. + +## Proposed Design + +Before we can discuss proposed designs, it's important to clear up some aspects of per record pubsub positions (henceforth referred to as **offsets**) which have heretofore not been documented in Venice open source (but for which there are some mentions of in the code). + +### How Are Per Record Offsets Used Today? 
+* #### **Leap Frog** + +* #### **Venice Change Capture** + +_This section must describe the proposed design and implementation in detail, and how it +addresses the problems outlined in the problem statement. It must provide details on alternative solutions +that were considered, stating clearly why they lost out to the final design. This latter point is particularly +important or else we tend to vacillate on old decisions because we cannot remember the original rationale. +Phrase this section in terms of trade offs since it is rare that one approach is better in all ways; there is +generally a tradeoff and making that explicit makes it much easier to understand._ + +_The following aspects of the design must be covered when applicable:_ + +1. _Changes to APIs or protocols or data format highlighting any compatibility issues and how they will be addressed._ +2. _Major components and sequence diagrams between them._ +3. _Multi-region implications including Parent/Child Controller communication._ +4. _Alternative designs considered, and brief explanation of tradeoffs and decision versus the chosen design, +and important aspects to consider_: + 1. Security. + 2. Performance. + 3. Maintainability. + 4. Operability. + 5. Scalability. + 6. Compatibility. + 7. Testability. + 8. Risks/Mitigations. + 9. Overall effort. + 10. Leverageable. + 11. Align with the long-term vision. +5. Preferred option by the designer/author. + 1. _Conclusions/Decisions made in the design review sessions. Explanation why it’s selected_ + +## Development Milestones +_This section described milestones for rather large projects so that we can set up intermediate goals for +better progress management. +For small projects, this may not be necessary._ + +## Test Plan + +_Describe in few sentences how the functionality will be tested. + How will we know that the implementation works as expected? How will we know nothing broke?_ + +1. _What unit tests would be added to cover the critical logic?_ +2. _What integration tests would be added to cover major components, if they cannot be tested by unit tests?_ +3. _Any tests to cover performance or Security concerns._ + + +## References + +_List any references here._ + +## Appendix + +_Add supporting information here, such as the results of performance studies and back-of-the-envelope calculations. 
+Organize it however you want but make sure it is comprehensible to your readers._ + + + + + + + + + From 3b1ad665162f0375713111568951966c90f3ecfb Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Mon, 26 Jun 2023 12:13:47 -0700 Subject: [PATCH 2/4] Add leap frog TLA+ spec --- specs/TLA+/LeapFrog/MCleapfrog.cfg | 14 + specs/TLA+/LeapFrog/MCleapfrog.tla | 17 ++ specs/TLA+/LeapFrog/leapfrog.tla | 424 +++++++++++++++++++++++++++++ 3 files changed, 455 insertions(+) create mode 100644 specs/TLA+/LeapFrog/MCleapfrog.cfg create mode 100644 specs/TLA+/LeapFrog/MCleapfrog.tla create mode 100644 specs/TLA+/LeapFrog/leapfrog.tla diff --git a/specs/TLA+/LeapFrog/MCleapfrog.cfg b/specs/TLA+/LeapFrog/MCleapfrog.cfg new file mode 100644 index 0000000000..be20e56514 --- /dev/null +++ b/specs/TLA+/LeapFrog/MCleapfrog.cfg @@ -0,0 +1,14 @@ +SPECIFICATION Spec +CONSTRAINTS + TerminateComparison + NoSuccessiveControlMessages +CONSTANTS + COLOS = {"lor1", "ltx1"} + KEYS = {"key1", "key2"} + VALUES = {"squirrel", "elephant"} + MAX_WRITES = 3 + +INVARIANT MaxDiameter +PROPERTIES + Safe + Live \ No newline at end of file diff --git a/specs/TLA+/LeapFrog/MCleapfrog.tla b/specs/TLA+/LeapFrog/MCleapfrog.tla new file mode 100644 index 0000000000..f2cba428b4 --- /dev/null +++ b/specs/TLA+/LeapFrog/MCleapfrog.tla @@ -0,0 +1,17 @@ +----------- MODULE MCleapfrog ---- +EXTENDS TLC, leapfrog + +(* Do not explore states that have more writes then whats been configured *) +TerminateComparison == + \/ WRITES <= MAX_WRITES + +(* Do not explore states where we're just running comparisons infinitely *) +NoSuccessiveControlMessages == + /\ \A i,j \in 1..Len(WALs.coloA): + (j = i + 1 /\ WALs.coloA[i].key = "controlKey") => WALs.coloA[j].key # WALs.coloA[i].key + /\ \A i,j \in 1..Len(WALs.coloB): + (j = i + 1 /\ WALs.coloB[i].key = "controlKey") => WALs.coloB[j].key # WALs.coloB[i].key + +(* INVARIANT meant to police state explosion (possible bug) *) +MaxDiameter == TLCGet("level") < 50 +==== \ No newline at end of file diff --git a/specs/TLA+/LeapFrog/leapfrog.tla b/specs/TLA+/LeapFrog/leapfrog.tla new file mode 100644 index 0000000000..8c8e56744e --- /dev/null +++ b/specs/TLA+/LeapFrog/leapfrog.tla @@ -0,0 +1,424 @@ +----------------------------- MODULE leapfrog ------------------------------- +(***************************************************************************) +(* LeapFrog is an algorithm that is meant to detect data divergence of a *) +(* pair of active replicas for a datastore. It is able to take advantage *) +(* of the efficiencies of batch processing of incremental changes in *) +(* either a streaming environment, or, a map reduce environment. It treats *) +(* the upstream datastore (for the most part) as a black box. It does not *) +(* make assumptions about any conflict resolution strategy or consensus *) +(* protocols which might be employed, therefore, it does not make *) +(* determinations on the correctness of those strategies. This algorithm *) +(* is only capable of making judgements on the consistency of COMMITTED *) +(* data of replicas. *) +(* *) +(* COMMITED data is defined as data which has undergone conflict *) +(* resolution/concensus and been written to a write ahead log (WAL) *) +(* *) +(* We also assume that active replicas of the store leverage a WAL where *) +(* committed data is recorded. 
Events written to this WAL have a *) +(* coordinate system where events in a WAL can be causally compared (given *) +(* two events, we can determine which event got committed before the *) +(* other) and we can trace where these writes originated from. As an *) +(* example, given an entry in a WAL by looking at the entries metadata we *) +(* can determine the order in which it was committed to the attached *) +(* replica relative to other commits, and we can determine it's ordering *) +(* relative to events committed from it's original source replica. *) +(***************************************************************************) + +EXTENDS Naturals, Sequences + +(***************************************************************************) +(* We first declare the following inputs to represent our replicating *) +(* environment: *) +(***************************************************************************) +(***************************************************************************) +(* COLOS: A set of active replicas that are applying updates *) +(***************************************************************************) +(***************************************************************************) +(* KEYS: The keyspace for updates being applied to active replicas *) +(***************************************************************************) +(***************************************************************************) +(* VALUES: The set of values to write for those keys *) +(***************************************************************************) +(***************************************************************************) +(* MAX_WRITES: An upper bound on the number of events we write (just to *) +(* keep runtime of the model checker within reason) *) +(***************************************************************************) + +CONSTANTS COLOS, KEYS, VALUES, MAX_WRITES + +(***************************************************************************) +(* Global Variables *) +(***************************************************************************) +(* A pair of collections to represent the WAL's for the active replicas we *) +(* mean to compare. Each collection is an ordered list of updates to a *) +(* given replica of the upstream store. *) +(* This variable is a function with a domain of coloA and coloB meant to *) +(* represent the two replicas we're comparing. *) +VARIABLE WALs + +(* A set of monotonically increasing counters for each event committed by *) +(* each replica. In the Venice implementation, this corresponds to the *) +(* incrementing offsets given to messages in the kafka real time topic. In *) +(* MysQL it would correspond to binlog GTID's *) +VARIABLE coloCounters + +(* A boolean not used for implementation, but used in this specification. *) +(* This is meant do denote that in a given run, data was not symmetrically *) +(* delivered to a replica, therefore, the checker MUST flag this. If it's *) +(* false, the checker MUST NOT flag this. *) +VARIABLE dataDiverges + +(* A boolean meant to represent the flag the consistency checker will mark *) +(* once it detects that data has diverged. *) +VARIABLE divergenceDetected + +(* A pair of key value stores meant to simulate the replicas we mean to *) +(* compare. Each update delivered will merge the record and record *) +(* metadata and add an event to it's respective WAL. 
*) +VARIABLE keyValueStores + +(* In the implementation of this algorithm, we perform a stepwise *) +(* computation of data in a WAL at a given time. To get equivalent *) +(* behavior, we'll increment a token per destination replica in order to *) +(* represent a snapshot in time of the pair of WALs. We'll advance this *) +(* token in one of the two comparingReplicas with each check. *) +VARIABLE leapFrogIndexes + +(* As leap frog scans WAL entries, it maintains a pair of vectors where *) +(* each component part of the vector is the high water mark of a given WAL *) +(* coordinate for a given active replica that has been thus far scanned. *) +(* watermark of a source replica write. *) +VARIABLE leapFrogHighWaterMarks + +(* A pair of maps keep the last update in a given WAL to each key as *) +(* of the position of the last leapFrogIndexes. Functionally, it's a way *) +(* to track intermediate states for each key in a replica. *) +VARIABLE leapFrogLatestUpdates + +(* The combined key space including control keys (KEYS \union controlKey) *) +VARIABLE KEY_SPACE + +(* A simple counter of submitted writes meant to track termination in TLC *) +VARIABLE WRITES + +(* This spec assumes that we are comparing two replicas at a time. *) +comparingReplicas == {"coloA", "coloB"} + +(* These are special keys/values that we leverage to deal with the edge *) +(* case of a dropped write being the last write to one of a pair of *) +(* replicas. It's assumed that there will always be another write in most *) +(* production use cases, but in order to account for all scenarios in TLC *) +(* we must account for this by writing a new event to each replica as a *) +(* kind of 'capstone' write when initiating a comparison. In a production *) +(* environment it is likely not needed to impelemnt this as part of the *) +(* the consistency checker. Automatic replica heartbeats which exist in *) +(* most replicating sytems can fill the same role and catch such cases as *) +(* a completely broken replicating pipeline. *) +controlKey == "controlKey" +controlValue == "controlValue" + +(* This is a special value that is meant to be temporary. It is *) +(* committed, and then overriden and is meant to simulate a system where *) +(* there is an eventually consistent model in play (data may temporarily *) +(* diverge but eventually will become consistent). Usually this means *) +(* that we'll see asymmetric WAL entries (one commit in one colo, and two *) +(* commits in another). Such an event should not be flagged as an *) +(* an inconsistency as it's part of normal operation. *) +temporaryValue == "temporaryRecord" + +(* Default value placeholder for initialization *) +null == "null" + +(* Enumeration of variables for TLC state tracking *) +vars == + <> + +(* Variables which only change when we run leapfrog comparison *) +leapfrogVars == <> +---- +(***************************************************************************) +(* Properties: *) +(***************************************************************************) +(* Live: If data diverges, then we should detect it eventually and not *) +(* flip the state back. 
*) +Live == + dataDiverges ~> []divergenceDetected + +(* Safe: we should never detect divergence when there is none at any state *) +Safe == + [](divergenceDetected => dataDiverges) +---- +(**************************************************************************) +(* KV Store Helper functions *) +(**************************************************************************) +(* These Helpers describe a key value store with the following behaviors: *) +(* *) +(* Updates/Inserts to a key are accompanied by metadata which designates *) +(* a source replica and a monotonically increasing counter with values *) +(* that are not reused across writes from a given replica. When a key is *) +(* updated, metadata for that update is merged with the existing record *) +(* maintaining the highest counter value from each active replica that *) +(* has applied an update on this record. *) +(* *) +(* This merge behavior is described with MergeRowReplicaCounters which *) +(* returns the union of DOMAIN of two functions and keeps the greater of *) +(* each RANGE value *) +(* Ex: if r1=[lor1 |-> 20, lva1 |-> 10] *) +(* r2=[lor1 |-> 9, lva1 |-> 15, ltx1 |-> 5] *) +(* *) +(* Then return [lor1 |-> 20, lva1 |-> 15, ltx1 |-> 5] *) +(**************************************************************************) +MergeRowReplicaCounters(r1, r2) == + LET D1 == DOMAIN r1 + D2 == DOMAIN r2 IN + [k \in (D1 \cup D2) |-> + IF k \in D1 THEN + IF k \in D2 + THEN IF r1[k] > r2[k] THEN r1[k] ELSE r2[k] + ELSE r1[k] + ELSE r2[k] + ] + +(**************************************************************************) +(* Merge together two KV records based on the semantic that a new update *) +(* should have it's value overwrite from the previous one, and then we *) +(* merge the metadata of the new value with the old one with *) +(**************************************************************************) +MergeKVRecords(r1, r2) == + [value |-> r2.value, replicaCounters |-> + MergeRowReplicaCounters(r1.replicaCounters, r2.replicaCounters)] + +(**************************************************************************) +(* Build a record which conforms to the structure we expect when parsing *) +(* WAL events *) +(**************************************************************************) +BuildWALRecord(newKey, newValue, newreplicaCounters) == + [key |-> newKey, value |-> newValue, replicaCounters |-> newreplicaCounters] + +(***************************************************************************) +(* Write a replicated event to each replica. 
This is used to simulate an *) +(* update getting applied to an active replica and then replicating to *) +(* the two active replicas we are comparing *) +(***************************************************************************) +SymmetricWrite(sourceColo, newKey, newValue) == + /\ keyValueStores' = [ keyValueStores EXCEPT + !.coloA[newKey].value = newValue, + !.coloA[newKey].replicaCounters[sourceColo] = coloCounters[sourceColo], + !.coloB[newKey].value = newValue, + !.coloB[newKey].replicaCounters[sourceColo] = coloCounters[sourceColo]] + /\ WALs' = [WALs EXCEPT + !.coloA = Append(WALs.coloA, [key |-> newKey, + value |-> keyValueStores.coloA[newKey].value', + replicaCounters |-> keyValueStores.coloA[newKey].replicaCounters']), + !.coloB = Append(WALs.coloB, [key |-> newKey, + value |-> keyValueStores.coloB[newKey].value', + replicaCounters |-> keyValueStores.coloB[newKey].replicaCounters'])] + /\ coloCounters' = [coloCounters EXCEPT + ![sourceColo] = coloCounters[sourceColo] + 1] + /\ UNCHANGED dataDiverges + +(***************************************************************************) +(* This is where things go wrong. This write is meant to simulate a bug *) +(* in the consistency model of this store and results in divergent data. *) +(* We model this as a situation where a write is applied to only one *) +(* but it does not replicate. *) +(***************************************************************************) +AsymmetricWrite(sourceColo, newKey, newValue) == + /\ keyValueStores' = [ keyValueStores EXCEPT + !.coloA[newKey].value = "EVILPOISONPILLVALUE", + !.coloA[newKey].replicaCounters[sourceColo] = coloCounters[sourceColo]] + /\ WALs' = [WALs EXCEPT + !.coloA = Append(WALs.coloA, [key |-> newKey, + value |-> keyValueStores.coloA[newKey].value', + replicaCounters |-> keyValueStores.coloA[newKey].replicaCounters'])] + /\ coloCounters' = [coloCounters EXCEPT + ![sourceColo] = coloCounters[sourceColo] + 1] + /\ dataDiverges'=TRUE + +(**************************************************************************) +(* This function applies two conflicting writes to the same key to two *) +(* replicas, and then immediately overrides one of the keys in one of the *) +(* repolicas with the write from the other replica. The effect of this *) +(* is that data converges for the two replicas, but, there is an *) +(* asymmetric history of writes for the two replicas recorded in the two *) +(* WAL's. This scenario is typical of data systems which utilize async *) +(* replication and eventual consistency. Leap Frog should observe these *) +(* differences, but recognize that they don't mean that the replicas *) +(* reached a state of permanent inconsistency. Rather, it treats this as *) +(* normal operation, and does not flag it. 
*) +(**************************************************************************) +EventuallyConsistentWrite(sourceColo, conflictingColo, newKey, newValue) == + /\ keyValueStores' = [ keyValueStores EXCEPT + !.coloA[newKey].value = newValue, + !.coloA[newKey].replicaCounters[sourceColo] = coloCounters[sourceColo], + !.coloA[newKey].replicaCounters[conflictingColo] = coloCounters[conflictingColo], + !.coloB[newKey].value = newValue, + !.coloB[newKey].replicaCounters[sourceColo] = coloCounters[sourceColo]] + /\ WALs' = [WALs EXCEPT + !.coloA = WALs.coloA \o + < coloCounters[conflictingColo]])) + ,BuildWALRecord(newKey, newValue, keyValueStores.coloA[newKey].replicaCounters')>>, + !.coloB = WALs.coloB \o + <>] + /\ coloCounters' = [coloCounters EXCEPT + ![sourceColo] = coloCounters[sourceColo] + 1] + /\ UNCHANGED dataDiverges + +(***************************************************************************) +(* Write an event to each replica that increments the counter from each *) +(* source replica. A production equivalent would be a set of heartbeats *) +(* from all active source replicas getting written to the pair of replicas *) +(* that we're comparing *) +(***************************************************************************) +SymmetricWriteWithFullVector(newKey, newValue) == + /\ keyValueStores' = [ keyValueStores EXCEPT + !.coloA[newKey].value = newValue, + !.coloA[newKey].replicaCounters = coloCounters, + !.coloB[newKey].value = newValue, + !.coloB[newKey].replicaCounters = coloCounters] + /\ WALs' = [WALs EXCEPT + !.coloA = Append(WALs.coloA, [key |-> newKey, + value |-> keyValueStores.coloA[newKey].value', + replicaCounters |-> keyValueStores.coloA[newKey].replicaCounters']), + !.coloB = Append(WALs.coloB, [key |-> newKey, + value |-> keyValueStores.coloB[newKey].value', + replicaCounters |-> keyValueStores.coloB[newKey].replicaCounters'])] + /\ coloCounters' = [k \in COLOS |-> coloCounters[k] + 1] + /\ UNCHANGED dataDiverges + +---- +(**************************************************************************) +(* LeapFrog functions *) +(**************************************************************************) +(* These functions describe the core steps of the leap frog algorithm *) +(**************************************************************************) + +(**************************************************************************) +(* Based on the leap frog state variables, determine if divergence is *) +(* detected. Records diverge iff ALL of the following criteria are met: *) +(**************************************************************************) +(* 1. Records share a key. Records which don't share a key are not *) +(* comparable. *) +(* 2. Values are not the same. If values are the same for a key, then *) +(* they don't diverge. An implementation should define value as the *) +(* user data as well as any metadata which may will effect system *) +(* behavior. Things like timestamps and TTL or whatever metadata *) +(* which might effect future rounds of conflict resolution). *) +(* 3. If the high watermark of observed offsets for all writers to the *) +(* first replica is greater then the offsets recorded for all writers *) +(* have touched this record in the second replica. Another way to put *) +(* it, the first replica has seen at least all events which have *) +(* touched this record in the second replica. *) +(* 3. 
If the high watermark of observed offsets for all writers to the *) +(* second repolica is greater then the offsets recorded for all all *) +(* writers that have touched this record in the first repolica. *) +(* Another way to put it, the second repolica has seen at least all *) +(* events which have touched this record in the first replica. *) +(**************************************************************************) +DoRecordsDiverge(keyA, keyB, valueA, valueB, HWMreplicaCountersA, + HWMreplicaCountersB, keyreplicaCountersA, keyreplicaCountersB) == + /\ keyA = keyB + /\ valueA # valueB + /\ \A x \in (DOMAIN HWMreplicaCountersA): + HWMreplicaCountersA[x] <= keyreplicaCountersB[x] + /\ \A x \in (DOMAIN HWMreplicaCountersA): + HWMreplicaCountersB[x] <= keyreplicaCountersA[x] + + +(**************************************************************************) +(* At a given step, update all states. This includes adjusting the *) +(* observed high water marks for each WAL at the current WAL position as *) +(* well as the state of those keys at that position. We also increment *) +(* the index at which we've advanced in a given WAL based on which *) +(* high water mark has advanced completely ahead of the other. That is *) +(* if one high watermark for a given replica is ahead for all components *) +(* then we advance the token of the one that is behind. Otherwise we *) +(* advance the other. We don't advance any index beyond the length of *) +(* either WAL *) +(**************************************************************************) +UpdateLeapFrogStates == + /\ leapFrogHighWaterMarks' = [leapFrogHighWaterMarks EXCEPT + !.coloA = MergeRowReplicaCounters(leapFrogHighWaterMarks.coloA, + WALs.coloA[leapFrogIndexes.coloA].replicaCounters), + !.coloB = MergeRowReplicaCounters(leapFrogHighWaterMarks.coloB, + WALs.coloB[leapFrogIndexes.coloB].replicaCounters)] + /\ leapFrogLatestUpdates' = [leapFrogLatestUpdates EXCEPT + !.coloA[WALs.coloA[leapFrogIndexes.coloA].key] = + [value |-> WALs.coloA[leapFrogIndexes.coloA].value, + replicaCounters |-> WALs.coloA[leapFrogIndexes.coloA].replicaCounters], + !.coloB[WALs.coloB[leapFrogIndexes.coloB].key] = + [value |-> WALs.coloB[leapFrogIndexes.coloB].value, + replicaCounters |-> WALs.coloB[leapFrogIndexes.coloB].replicaCounters]] + /\ IF \A n \in DOMAIN leapFrogHighWaterMarks.coloA': + leapFrogHighWaterMarks.coloA[n]' > leapFrogHighWaterMarks.coloB[n]' + THEN leapFrogIndexes' = + [leapFrogIndexes EXCEPT !.coloB = + IF leapFrogIndexes.coloB >= Len(WALs.coloB) + THEN leapFrogIndexes.coloB + ELSE leapFrogIndexes.coloB + 1] + ELSE leapFrogIndexes' = + [leapFrogIndexes EXCEPT !.coloA = + IF leapFrogIndexes.coloA >= Len(WALs.coloA) + THEN leapFrogIndexes.coloA + ELSE leapFrogIndexes.coloA + 1] + + +(**************************************************************************) +(* Optional for production environments, delivers a capstone write to all *) +(* replicas *) +(**************************************************************************) +DeliverControlWrite == + SymmetricWriteWithFullVector(controlKey, controlValue) + +(**************************************************************************) +(* Run the comparison for a given step. Here we deliver our control *) +(* write to all replicas, update our states, and then check if we have *) +(* detected divergence and flag it. 
*) +(**************************************************************************) +LeapFrogCompare == + /\ DeliverControlWrite + /\ divergenceDetected' = + \E n \in KEY_SPACE : DoRecordsDiverge(n, n, + leapFrogLatestUpdates.coloA[n].value, + leapFrogLatestUpdates.coloB[n].value, + leapFrogHighWaterMarks.coloA, + leapFrogHighWaterMarks.coloB, + leapFrogLatestUpdates.coloA[n].replicaCounters, + leapFrogLatestUpdates.coloB[n].replicaCounters) + /\ UpdateLeapFrogStates + /\ UNCHANGED <> + +---- +(**************************************************************************) +(* Control Functions *) +(**************************************************************************) + +(* Selects a colo from the set of colos to compete with a given colo *) +SelectCompeteingColo(colo) == + CHOOSE otherColo \in COLOS \ {colo} : TRUE + +(**************************************************************************) +(* Write a new random record to a random colo and randomly choose it to *) +(* to be a symmetric write, an asymmetric write (bug) or an eventually *) +(* consistent write *) +(**************************************************************************) +DeliverWrite == + /\ WRITES' = WRITES + 1 + /\ \E <> \in COLOS \X KEYS \X VALUES : + /\ \/ SymmetricWrite(n, k, v) + \/ AsymmetricWrite(n, k, v) + \/ EventuallyConsistentWrite(n, SelectCompeteingColo(n), k, v) + /\ UNCHANGED KEY_SPACE + /\ UNCHANGED leapfrogVars + +Next == + \/ DeliverWrite + \/ LeapFrogCompare + +Init == \ No newline at end of file From ff5d7ccbecb24c9aa9d8fcac3a1daa9e439acbe2 Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Mon, 26 Jun 2023 14:58:56 -0700 Subject: [PATCH 3/4] more updates --- docs/proposals/VIP-2.md | 225 +++++++++++++++++++++++++++++++--------- 1 file changed, 174 insertions(+), 51 deletions(-) diff --git a/docs/proposals/VIP-2.md b/docs/proposals/VIP-2.md index f0b4535518..5582f5104d 100644 --- a/docs/proposals/VIP-2.md +++ b/docs/proposals/VIP-2.md @@ -5,7 +5,7 @@ parent: Community Guides permalink: /docs/proposals/VIP_TEMPLATE.md --- -# [VIP-2] Removing Per Record Offset Metadata From Venice-Server Storage +# [VIP-2] Removing Per Record Offset Metadata From Venice-Server Storage With Heartbeats * **Status**: Discussion * **Author(s)**: Zac Policzer @@ -16,24 +16,22 @@ permalink: /docs/proposals/VIP_TEMPLATE.md ## Introduction -This VIP explores strategies for removing the offset metadata stored per record by utilizing heartbeats as well as an exploration of what other system benefits can be had by adopting replica heartbeats. +This VIP explores a strategy for removing the offset metadata stored per record in Venice by utilizing replica heartbeats. This VIP also explores what possible other benefits might be had by introducing heartbeats, and why it would be a strategic feature to adopt. ## Problem Statement -Today, Venice stores which have Active/Active replication enabled store N number of offsets per record where N is the number of replicating sites. This is done so as to enable data drift detection via a Venice algorithm called Leap Frog (which we will document in a later section) so that it's possible to detect and flag inconsistencies triggered by outages and bugs with very high granularity. +Today, Venice stores which have Active/Active replication enabled store N number of offsets per record where N is the number of replicating sites. 
Venice does this in order to enable data drift detection between active sites via a Venice algorithm called Leap Frog (which we will document in a later section of this VIP and accompanying PR). At a high level, Leap Frog enables Venice operators to detect and flag inconsistencies triggered by outages and bugs with very high granularity.

-In addition, these offsets are utilized by Venice Change Capture consumers for filtering out duplicated events across version pushes, so that downstream consumers do not get callbacks triggered for events that they already consumed.
+In addition, these per record offsets are utilized by Venice Change Capture consumers for filtering out duplicated events across version pushes, so that downstream consumers do not get callbacks triggered for events that they already consumed.

-This was a fine approach, however, it presumes that for a given message in an underlying pubsub implementation, that the object used to describe it's position (in Kafka's case, a long) is reasonably small. So long as it's small, the added overhead per record is tolerable.
+This was a fine approach; however, it presumes that, for a given message in an underlying pubsub implementation, the object used to describe its position (in Kafka's case, a long to represent an offset) is reasonably small. So long as it's small, the added overhead per record is tolerable.

 This is not necessarily the case. **In at least one PubSub implementation that Venice will need to support, the position object is as large as over 20 Bytes.**

-Venice (at least in LinkedIn deploments at time of writing) is a storage bound service, and this high overhead has real cost implications.
+Venice (at least in LinkedIn deployments at time of writing) is a storage bound service, and this high overhead has real cost and performance implications.

 ## Scope

-_Call out explicitly:_
-
 1. **What is in scope?**

    We must be able to shrink down or remove this per record overhead in the venice server, but still be able to retain the functionality that we depend on (diff detection and change capture client filtering).
@@ -44,68 +42,193 @@ _Call out explicitly:_

 ## Project Justification

-This project seeks to improve the **performance, scalability, and cost to serve of Venice**. By shrinking the per record metadata we'll not only save on disk, but we should be able to improve the ingestion speed slightly (less bytes is less bytes afterall) and reduce the potential increase in storage cost as new replicating sites (henceforth refered to as **colos**) are added.
+This project seeks to improve the **performance, scalability, and cost to serve of Venice**. By shrinking the per record metadata we'll not only save on disk, but we should be able to improve ingestion speed slightly (by removing some per record overhead) and reduce the potential increase in storage cost as new replicating sites (henceforth referred to as **colos**) are added.

 ## Proposed Design

 Before we can discuss proposed designs, it's important to clear up some aspects of per record pubsub positions (henceforth referred to as **offsets**) which have heretofore not been documented in Venice open source (but for which there are some mentions in the code).

 ### How Are Per Record Offsets Used Today?
-* #### **Leap Frog**
+#### **Leap Frog**
+
+Leap Frog works on the notion that we can detect that two replicas have diverged for a record if the following holds true when comparing replicas A and B:
+
+1. The values for a certain key on replicas A and B are different AND
+2. The record on replica A was received from an offset that falls below a high watermark of applied events on replica B AND
+3. The record on replica B was received from an offset that falls below a high watermark of applied events on replica A
+
+If all three are true, then we can detect divergence. The implementation of Leap Frog is based on the consumption of a Venice version topic (as opposed to interrogating the on disk state of Venice).
+
+We won't deep dive Leap Frog here, but in order to give the reader more context, there is [a TLA+ spec included with this PR](../../specs/TLA%2B/LeapFrog/leapfrog.tla) which explains and provides some validation of the correctness of the detection algorithm.
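+
+To make the check above concrete, here is a rough Java sketch of the comparison (the class, method names, and `Map`-based offset vectors are purely illustrative assumptions for this document, not the actual Venice or Leap Frog classes):
+
+```java
+import java.util.Map;
+import java.util.Objects;
+
+// A minimal sketch of the divergence check described above. Offset vectors are
+// modeled as colo -> offset maps; all names here are hypothetical.
+public final class LeapFrogCheckSketch {
+  // True only when, for the same key, the values differ AND each record's offsets
+  // fall at or below the other replica's high watermark of applied events.
+  public static boolean recordsDiverge(
+      Object valueA, Object valueB,
+      Map<String, Long> recordOffsetsA, Map<String, Long> recordOffsetsB,
+      Map<String, Long> highWatermarkA, Map<String, Long> highWatermarkB) {
+    if (Objects.equals(valueA, valueB)) {
+      return false; // same value for the key => no divergence
+    }
+    return coveredBy(recordOffsetsA, highWatermarkB) && coveredBy(recordOffsetsB, highWatermarkA);
+  }
+
+  // True when every component of the record's offset vector has already been applied
+  // by the replica that produced the given high watermark.
+  private static boolean coveredBy(Map<String, Long> recordOffsets, Map<String, Long> highWatermark) {
+    for (Map.Entry<String, Long> entry : recordOffsets.entrySet()) {
+      if (entry.getValue() > highWatermark.getOrDefault(entry.getKey(), 0L)) {
+        return false; // the other replica has not applied this write yet; comparison is premature
+      }
+    }
+    return true;
+  }
+}
+```
+
+Note how the sketch abstains (returns false) whenever one replica has not yet applied all of the writes behind the other replica's record; that is what keeps an eventually consistent but not-yet-converged state from being flagged as divergence.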
+
+#### **Venice Change Capture**
+
+Venice change capture relies on per record metadata in order for the client to be able to screen out duplicate events. Duplicated events either come in the form of duplicate events sent from the PubSub Broker, or events which are applied following a rewind on a version push. It works by maintaining a high watermark state on the client which screens out events that fall below the cached high watermark.
+
+#### How are offsets stored
+
+When a record is consumed from a given colo's RT, the offset position at which it was consumed is noted and merged into a vector of offsets stored alongside that record. The vector is a set of high watermark offsets consumed from each replicating site. So if we have three sites actively replicating a given partition, we'll store at most three offsets per record. We store this because of the way Venice handles write compute and AA. The state of the record can be the accumulated state of disjoint updates from different colos, and this is why it's not sufficient to store just the offset of the LAST update to a given record.
+
+### What are the base common requirements
+
+If you squint at both of these use cases, they boil down to relying on the following properties:
+
+* Being able to build a high watermark based on events consumed
+* Being able to causally compare a consumed event/record to this high watermark and take action.
+
+We're able to meet the first requirement by just starting at an arbitrary point in a version topic stream and consuming. Every single event has an RMD field that can help a consumer of this data build a high watermark. And since every single record we consume carries the same information, it's very straightforward to determine the causality of that event relative to a built-up high watermark.
+
+That said, it's not actually a requirement to be able to do this on every single event we consume. It's possible to meet these two requirements at a coarser granularity of updates.
+
+### Heartbeat Algorithm
+
+Here we introduce the notion of heartbeats as a means of checkpointing the stream. Today, a stream of events looks like this:
+
+>Record 1: {1, 200, 5000}
+>
+>Record 2: {0, 201, 4000}
+>
+>Record 3: {5, 195, 3000}
+>
+>Record 4: {3, 205, 5001}
+
+From the above, if we consumed events 1-3 we'd construct a high water mark of {5, 201, 5000}, and if we looked at Record 4, we could see that it comes after our high watermark, as its middle vector component (205) is higher than the middle vector component of our high water mark (which is at 201).
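+
+As a small illustration of this bookkeeping (helper names and colo labels below are hypothetical, and this is a sketch rather than actual client code), the numbers above work out as follows:
+
+```java
+import java.util.HashMap;
+import java.util.Map;
+
+// Sketch: build a high water mark by component-wise max and check causality against it.
+public final class HighWatermarkSketch {
+  // Merge a consumed record's offset vector into the running high water mark.
+  static void mergeInto(Map<String, Long> highWatermark, Map<String, Long> recordOffsets) {
+    recordOffsets.forEach((colo, offset) -> highWatermark.merge(colo, offset, Math::max));
+  }
+
+  // A record comes after the high water mark if any component exceeds what we have seen so far.
+  static boolean comesAfter(Map<String, Long> highWatermark, Map<String, Long> recordOffsets) {
+    return recordOffsets.entrySet().stream()
+        .anyMatch(e -> e.getValue() > highWatermark.getOrDefault(e.getKey(), Long.MIN_VALUE));
+  }
+
+  public static void main(String[] args) {
+    Map<String, Long> hwm = new HashMap<>();
+    mergeInto(hwm, Map.of("colo1", 1L, "colo2", 200L, "colo3", 5000L)); // Record 1
+    mergeInto(hwm, Map.of("colo1", 0L, "colo2", 201L, "colo3", 4000L)); // Record 2
+    mergeInto(hwm, Map.of("colo1", 5L, "colo2", 195L, "colo3", 3000L)); // Record 3
+    // hwm is now {colo1=5, colo2=201, colo3=5000}
+    System.out.println(comesAfter(hwm, Map.of("colo1", 3L, "colo2", 205L, "colo3", 5001L))); // true: Record 4 is newer
+  }
+}
+```
+
+This is also, in essence, what a change capture client does when it screens out events that fall below its cached high watermark.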
+
+Now, let's see if we can come to a similar conclusion with minimal information. **Minimal information** means that we only have access to which active site an event was triggered by, and at what offset in that colo's upstream real time topic. This is essentially all the information we have on hand after ingesting an event in the server. If we publish just that information into the version topic, we would have an event stream which looked like:
+
+>Record 1: {_, 200, _}
+>
+>Record 2: {_, 201, _}
+>
+>Record 3: {5, _, _}
+>
+>Record 4: {_, _, 5001}
+
+If we consume events 1-3, we'll build a high water mark of {5, 201, _}, and when consuming Record 4 it's very hard to determine whether this record actually comes from before or after our high water mark. We don't have enough information about the current state to definitively come to that conclusion.
+
+So let's see if we can make this work with heartbeats. The original issue with the aforementioned scenario was that we didn't have enough information about our third active site to make a call on the causality of Record 4.
+
+>HEARTBEAT: {_, _, 4500}
+>
+>Record 1: {_, 200, _}
+>
+>Record 2: {_, 201, _}
+>
+>Record 3: {5, _, _}
+>
+>Record 4: {_, _, 5001}
+
+Now, if we consume that heartbeat and read up to Record 3, we'll have a high water mark of {5, 201, 4500}, and when looking at Record 4, we can see that it has a vector component that is higher than our high water mark.
+
+The reason the above works is that we're able to take for granted that at some point, either in the past or the future, we'll get at least one update from each site to help us build our high water mark.
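+
+Adjusting the earlier sketch for this scenario (again, hypothetical names) shows how the heartbeat supplies the component of the high water mark that the sparse per record information alone could not:
+
+```java
+import java.util.HashMap;
+import java.util.Map;
+
+// Sketch: a heartbeat seeds the missing component, after which the causality check succeeds.
+public final class HeartbeatSeededHwmSketch {
+  static void mergeInto(Map<String, Long> highWatermark, Map<String, Long> observed) {
+    observed.forEach((colo, offset) -> highWatermark.merge(colo, offset, Math::max));
+  }
+
+  public static void main(String[] args) {
+    Map<String, Long> hwm = new HashMap<>();
+    mergeInto(hwm, Map.of("colo3", 4500L)); // HEARTBEAT {_, _, 4500}
+    mergeInto(hwm, Map.of("colo2", 200L));  // Record 1 {_, 200, _}
+    mergeInto(hwm, Map.of("colo2", 201L));  // Record 2 {_, 201, _}
+    mergeInto(hwm, Map.of("colo1", 5L));    // Record 3 {5, _, _}
+    // hwm is now {colo1=5, colo2=201, colo3=4500}; Record 4 only carries {_, _, 5001}.
+    boolean comesAfter = 5001L > hwm.getOrDefault("colo3", Long.MIN_VALUE);
+    System.out.println(comesAfter); // true: 5001 is beyond the 4500 baseline the heartbeat gave us
+  }
+}
+```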
+
+Now, the presented example is somewhat simplistic and there are some edge cases. For example, what if our sequence of events came with a slightly different heartbeat? Something like:
+
+>HEARTBEAT: {_, 199, _}
+>
+>Record 1: {_, 200, _}
+>
+>Record 2: {_, 201, _}
+>
+>Record 3: {5, _, _}
+>
+>Record 4: {_, _, 5001}
+
+This is problematic. If we read from the heartbeat up to Record 3, then we're stuck in the same situation as the minimal data example. This tells us that we need to adhere to a property where, when doing this kind of evaluation, we're able to assemble some information about all replicating colos. We can avoid this in all cases by making sure the heartbeat carries all potentially relevant information. This would now look like:
+
+>HEARTBEAT: {3, 199, 4500}
+>
+>Record 1: {_, 200, _}
+>
+>Record 2: {_, 201, _}
+>
+>Record 3: {5, _, _}
+>
+>Record 4: {_, _, 5001}
+
+The heartbeat is now a published baseline that we can use to build our high watermark when doing the comparison.
+
+**This approach is advantageous because it means we no longer have to merge together offset vectors in per-row RMDs on Venice servers. We only need to publish on-hand information to the version topic for a given store partition.**
+
+#### Leap Frog with heartbeats (two colos)
+
+Let's look at what this implies for Leap Frog. Leap Frog compares a high water mark and a record which are remote from each other.
+
+>**COLO A**
+>
+>HEARTBEAT: {3, 199, 4500}
+>
+>Record 1: {_, 200, _}
+>
+>Record 2: {_, 201, _}
+>
+>Record 3: {5, _, _}
+>
+>Record 4: {_, _, 5001}
+
+>**COLO B**
+>
+>HEARTBEAT: {4, 199, 4900}
+>
+>Record 1: {_, 200, _}
+>
+>Record 2: {_, _, 4950}
+>
+>Record 3: {_, 300, _}
+>
+>Record 4: {20, _, _}
+
+Originally, we only needed the high watermark of coloA and the RMD of an individual record in coloB (and vice versa going the other way). However, in this context we need more information. Each record will only have the offset of the LAST upstream RT message which touched the row. We can backfill a bit more information based on the last heartbeat. Since everything is produced serially with increasing offsets, we know that the other updates came from an offset previous to the published high watermark in the last received heartbeat. So, this looks like:
+
+>**COLO A**
+>
+>HEARTBEAT: {3, 199, 4500}
+>
+>Record 1: {<3, 200, <4500}
+>
+>Record 2: {<3, 201, <4500}
+>
+>Record 3: {5, <199, <4500}
+>
+>Record 4: {<3, <199, 5001}
+
+>**COLO B**
+>
+>HEARTBEAT: {4, 199, 4900}
+>
+>Record 1: {<4, 200, <4900}
+>
+>Record 2: {<4, <199, 4950}
+>
+>Record 3: {<4, 300, <4900}
+>
+>Record 4: {20, <199, <4900}
+
+Now this starts to look a bit more like what we have today, just at a coarser granularity. This has advantages and disadvantages. One nice advantage is that heartbeat records can be used as logical markers for chunks of the version topic stream which can be directly compared (keys which have a value mismatch or don't appear within comparable ranges are either divergent or completely missing).
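+
+A sketch of this backfilling step might look like the following (Java 16+; the `Bound` representation and names are assumptions made for illustration, not how Venice would necessarily store these bounds):
+
+```java
+import java.util.HashMap;
+import java.util.Map;
+
+// Sketch: components we did not observe directly are upper-bounded by the last heartbeat.
+public final class BackfilledVectorSketch {
+  record Bound(long offset, boolean exact) {
+    @Override
+    public String toString() {
+      return exact ? Long.toString(offset) : "<" + offset;
+    }
+  }
+
+  static Map<String, Bound> backfill(String sourceColo, long sourceOffset, Map<String, Long> lastHeartbeat) {
+    Map<String, Bound> effective = new HashMap<>();
+    // Every component is at most what the last heartbeat published...
+    lastHeartbeat.forEach((colo, hbOffset) -> effective.put(colo, new Bound(hbOffset, false)));
+    // ...except the source colo's component, which the record carries exactly.
+    effective.put(sourceColo, new Bound(sourceOffset, true));
+    return effective;
+  }
+
+  public static void main(String[] args) {
+    Map<String, Long> heartbeat = Map.of("colo1", 3L, "colo2", 199L, "colo3", 4500L);
+    // Record 1 of COLO A above only carries {_, 200, _}.
+    System.out.println(backfill("colo2", 200L, heartbeat)); // e.g. {colo1=<3, colo2=200, colo3=<4500}
+  }
+}
+```
+
+A Leap Frog style comparison can then consume these bounded vectors in place of the exact per record vectors that are stored today.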
+
+There is a downside, however. Previously, we could detect divergence at a very fine granularity (essentially, at every write). In previous testing, this was valuable because in order to trigger some bugs we had to write very aggressively to a small keyspace, so it was useful to be able to detect and flag divergence in situations where the divergence only existed for less than a second. **With the above approach, it is only possible to detect that records diverge based on the compacted view of records between heartbeats**.
+
+### Other Advantages To Be Had From Heartbeats
+
+Heartbeats are something that we've talked about internally in a few contexts. Some possible advantages that come to mind:
+
+* **Time Based Checkpointing Ecosystem:**
+If a heartbeat were to also carry with it some notion of time, we could use it to checkpoint the upstream RT with all systems which follow. Version topics, change capture topics, views, and ETLs can all be checkpointed based on these upstream heartbeats and can now be tied together in a coherent way.
+
+* **Transmit and Restore DIV State:**
+If a leader were broadcasting its state with each heartbeat message to downstream consumers like followers via VT, then DIV state could be restored should a leader node go offline. This would clean up DIV error false positives (or negatives?) potentially triggered from follower rewind.
+
+* **Proactive Fault Detection:**
+Today, our lag monitoring relies on users transmitting writes to Venice. If a fault has occurred and no one is transmitting data, we are none the wiser. Heartbeats coupled with adequate metric emission on the time since the last received heartbeat would help us detect these problems before they become a problem for end users.
+
+### Implementation
+
+**THIS SECTION IS WIP**
+
+#### **Leaders Emit Heartbeats**
+The suggestion here is that for every store partition, the leader emits a heartbeat control message directly to its local RT. It then consumes this message and publishes another control message into the local VT, which will then be consumed by followers. Meanwhile, other leaders in remote colos also consume the remote RT event and do the same. The idea here is that by emitting directly to the top of the replication pipeline, all parties which participate in consuming this stream of data (PubSub broker, leader, followers, remote nodes, clients, etc.) can all individually give testimony to being in working order. Leaders, upon receiving the message, publish relevant metadata in the form of their local upstream RT high watermarks and DIV/timestamp information.
+
+### **Other Ideas (RFCs)**

From e53ff9079762fc669e569b697496f7646a215 Mon Sep 17 00:00:00 2001
From: Zac Policzer <zpolicze@linkedin.com>
Date: Mon, 26 Jun 2023 15:01:46 -0700
Subject: [PATCH 4/4] Remove vip-1 copy pasta

---
 docs/proposals/VIP-2.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/proposals/VIP-2.md b/docs/proposals/VIP-2.md
index 5582f5104d..cf76c21cc5 100644
--- a/docs/proposals/VIP-2.md
+++ b/docs/proposals/VIP-2.md
@@ -1,6 +1,6 @@
 ---
 layout: default
-title: VIP-1
+title: [VIP-2] Removing Per Record Offset Metadata From Venice-Server Storage With Heartbeats
 parent: Community Guides
 permalink: /docs/proposals/VIP_TEMPLATE.md
 ---