diff --git a/tlaplus/ghostferry.tla b/tlaplus/ghostferry.tla index 02b13962..3ed22fa2 100644 --- a/tlaplus/ghostferry.tla +++ b/tlaplus/ghostferry.tla @@ -165,12 +165,13 @@ ***************************************************************************) -EXTENDS Integers, Sequences, TLC +EXTENDS Integers, Sequences, TLC, FiniteSets \* Helper Methods \* ============== SetMin(S) == CHOOSE i \in S: \A j \in S : i <= j +Range(S) == { S[x] : x \in DOMAIN S } \* Constant Declarations \* ===================== @@ -189,6 +190,45 @@ updated to an element \in Records. ***************************************************************************) CONSTANT TableCapacity +(*************************************************************************** +In the case where Ghostferry is restarted, we need to start at a binlog +position that is not the current position. Since the model does not store past +binlog events, we cannot model it that way. Instead, we simply generate +some binlog entries (and the same number of changes on the SourceTable). + +Another couple of ways to think about this parameter: + +- Number of changes that occurred while Ghostferry was down before being resumed. +- The current master position of the source database at the moment Ghostferry + starts, while the last successful binlog position would be 0. + +Example: 0 if not running the reconciliation step. + 2, 3, some small-ish number if running the reconciliation step. + ***************************************************************************) +CONSTANT InitialTargetBinlogPos + +(*************************************************************************** +In the case where Ghostferry is restarted, we can restart at a particular +location saved previously. This parameter does exactly that. This number +should be 0 if you don't care about this step. 
+ +Example: 1 + ***************************************************************************) +CONSTANT LastSuccessfulPrimaryKey + +(*************************************************************************** +The number of rows/binlogs that have already been copied to the target +database. This is to emulate a case where Ghostferry is shut down but the saved +cursor and binlog positions are out of date. + +Note that RowsOvercopied must be lower than TableCapacity - +LastSuccessfulPrimaryKey. BinlogOvercopied must be smaller than MaxBinlogSize. + +Example: 1 + ***************************************************************************) +CONSTANT RowsOvercopied +CONSTANT BinlogOvercopied + (*************************************************************************** Records defines the set of possible records that can be written to the database. The set of model values can be symmetrical because the starting order @@ -213,30 +253,34 @@ CONSTANTS TableIterator, BinlogStreamer, Application, Ferry \* A set of possible records for TypeOK PossibleRecords == Records \cup {NoRecordHere} +\* A set of tables up to TableCapacity, but have different entries in them. InitialTables == [1..TableCapacity -> PossibleRecords] \* The set of all possible primary key PrimaryKeys == 1..TableCapacity +\* This generates all the permutations of the rows that are changed. The number +\* of changes must equal InitialTargetBinlogPos. 
+RowsChanged == {key \in [1..InitialTargetBinlogPos -> 1..TableCapacity] : Cardinality(Range(key)) = InitialTargetBinlogPos} + +ComputeMaxPrimaryKey(table, capacity) == IF table[capacity] # NoRecordHere + THEN capacity + ELSE + IF \A i \in DOMAIN table : table[i] = NoRecordHere + THEN 0 + ELSE (CHOOSE i \in DOMAIN table : \E j \in DOMAIN table : table[i] # NoRecordHere /\ table[j] = NoRecordHere /\ i = j - 1) + (*************************************************************************** --algorithm ghostferry { variables - - \* The source table is initialized with the given InitialTable. - \* The target table is initialized with the same capacity but has no records - \* associated with any of the primary key (in the real world: no rows exists) - SourceTable \in InitialTables, - TargetTable = [k \in PrimaryKeys |-> NoRecordHere], - - \* CurrentMaxPrimaryKey indicates the length of the table as opposed to the - \* capacity of the table. - MaxPrimaryKey = IF SourceTable[TableCapacity] # NoRecordHere - THEN TableCapacity - ELSE - IF \A i \in DOMAIN SourceTable : SourceTable[i] = NoRecordHere - THEN 0 - ELSE (CHOOSE i \in DOMAIN SourceTable : \E j \in DOMAIN SourceTable : SourceTable[i] # NoRecordHere /\ SourceTable[j] = NoRecordHere /\ i = j - 1), - CurrentMaxPrimaryKey = MaxPrimaryKey, + \* We can pick an InitialTable from the permutations of possible tables. + \* The target table is initialized with the same capacity but has no records, + \* unless LastSuccessfulPrimaryKey is set to >0, in which case the record + \* is set to equal the ones found on the SourceTable as we have copied it + \* previously. + InitialTable \in InitialTables, + SourceTable = InitialTable, + TargetTable = [k \in PrimaryKeys |-> IF k <= LastSuccessfulPrimaryKey + RowsOvercopied THEN SourceTable[k] ELSE NoRecordHere], \* The binlogs are modeled as a list of binlog events. 
\* The size of the binlog is constrainted to MaxBinlogSize via @@ -257,12 +301,29 @@ PrimaryKeys == 1..TableCapacity \* This is set to TRUE to stop all components of Ghostferry and Ghostferry \* should terminate after finishing streaming all the binlog events. - BinlogStreamingStopRequested = FALSE; + BinlogStreamingStopRequested = FALSE, + + ChangesApplied = FALSE, + FerryThreadsStarted = FALSE, + + \* In the actual application, we have to set this as an argument. In the + \* model, it is always 0. + LastSuccessfulBinlogPos = 0, + + \* MaxPrimaryKey, initialized so the application model can use it to not + \* create holes in the source database. + MaxPrimaryKey = ComputeMaxPrimaryKey(SourceTable, TableCapacity), + CurrentMaxPrimaryKey = MaxPrimaryKey; + + macro DetermineMaxPrimaryKey() { + MaxPrimaryKey := ComputeMaxPrimaryKey(SourceTable, TableCapacity); + CurrentMaxPrimaryKey := MaxPrimaryKey; + }; fair process (ProcTableIterator = TableIterator) variables - lastSuccessfulPK = 0, \* Last PK successfully applied to the target db. - currentRow; \* The current row's data + lastSuccessfulPK = LastSuccessfulPrimaryKey, \* Last PK successfully applied to the target db. + currentRow; \* The current row's data { \* Note that tblit_rw is an atomic step. If the read and write steps are \* two distinct steps, this could cause a race condition that will cause @@ -281,6 +342,8 @@ PrimaryKeys == 1..TableCapacity \* It may be possible to perform some sort of locking via the Application, \* but this seems cumbersome and prone to implementation level error. \* TODO: model this with TLA+ and validate its correctness. 
+ tblit_wait: await FerryThreadsStarted = TRUE; + DetermineMaxPrimaryKey(); tblit_loop: while (lastSuccessfulPK < MaxPrimaryKey) { tblit_rw: currentRow := SourceTable[lastSuccessfulPK + 1]; if (currentRow # NoRecordHere) { @@ -292,22 +355,22 @@ PrimaryKeys == 1..TableCapacity fair process (ProcBinlogStreamer = BinlogStreamer) variables - lastSuccessfulBinlogPos = 0, \* Last binlog pos successfully applied on the target db currentBinlogEntry; \* The binlog event that is currently being read { - binlog_loop: while (BinlogStreamingStopRequested = FALSE \/ (BinlogStreamingStopRequested = TRUE /\ lastSuccessfulBinlogPos < TargetBinlogPos)) { + binlog_wait: await FerryThreadsStarted = TRUE; + binlog_loop: while (BinlogStreamingStopRequested = FALSE \/ (BinlogStreamingStopRequested = TRUE /\ LastSuccessfulBinlogPos < TargetBinlogPos)) { \* We cannot use an await as there could be a deadlock for \* when the application is set to read only and thus nothing \* else writes to the database. \* \* This also means in the real implementation we need a \* non-blocking read for the binlog. - binlog_read: if (lastSuccessfulBinlogPos < Len(SourceBinlog)) { - currentBinlogEntry := SourceBinlog[lastSuccessfulBinlogPos + 1]; + binlog_read: if (LastSuccessfulBinlogPos < Len(SourceBinlog)) { + currentBinlogEntry := SourceBinlog[LastSuccessfulBinlogPos + 1]; binlog_write: if (TargetTable[currentBinlogEntry.pk] = currentBinlogEntry.oldr) { TargetTable[currentBinlogEntry.pk] := currentBinlogEntry.newr; }; - binlog_upkey: lastSuccessfulBinlogPos := lastSuccessfulBinlogPos + 1; + binlog_upkey: LastSuccessfulBinlogPos := LastSuccessfulBinlogPos + 1; }; } } @@ -330,8 +393,11 @@ PrimaryKeys == 1..TableCapacity variables oldRecord, newRecord, - chosenPK, + chosenPK; { + app_wait: if (InitialTargetBinlogPos > 0) { + await ChangesApplied = TRUE; + }; app_loop: while (ApplicationReadonly = FALSE) { \* Choose a "random" PK to update. 
app_write: with (pk \in 1..SetMin({TableCapacity, CurrentMaxPrimaryKey + 1})) { @@ -363,26 +429,69 @@ PrimaryKeys == 1..TableCapacity if (oldRecord = NoRecordHere /\ chosenPK > CurrentMaxPrimaryKey) { assert (chosenPK - 1 = CurrentMaxPrimaryKey); CurrentMaxPrimaryKey := chosenPK; - } + }; } } (*********************************************************************** - In the actual code, the Ferry class would have started all of the above. - This is unnecessary here as it is done via the Next definition in TLA+ - (automatically generated from PlusCal). Thus, the Ferry here is really - the Ferry that each application must implement: + In the model, the ferry code actually performs the changes while + Ghostferry is down, before running the reconciliation. This is obviously + not needed in the real code. - 1. Waiting until the DataITerator is finished copying data. - 2. Perform the cutover operation (setting the source to be read only). - 3. Instruct the BinlogStreamer to quit after streaming. + Each application only needs to implement these steps: + + 1. Perform reconciliation if applicable. + 2. Start all threads/goroutines. + 3. Wait until the DataIterator is finished copying data. + 4. Perform the cutover operation (setting the source to be read only). + 5. Instruct the BinlogStreamer to quit after streaming. Note that setting the target binlog position and requesting binlog streaming to stop are two distinct steps. Making them one atomic step is not realistic unless we implement a lock. With two distinct steps, if the steps are reversed, a race condition will be present. 
***********************************************************************) - fair process (ProcFerry = Ferry) { + fair process (ProcFerry = Ferry) + variables + currentChangeToPerform = 1, + changedRowIds; + { + ferry_reconcile: if (InitialTargetBinlogPos > 0) { + ferry_pickchanges: with (rowIds \in RowsChanged) { + changedRowIds := rowIds; + }; + + ferry_runchanges: while (currentChangeToPerform <= InitialTargetBinlogPos) { + with (pk = changedRowIds[currentChangeToPerform]) { + with (r \in PossibleRecords \ {SourceTable[pk]}) { + SourceBinlog := Append( + SourceBinlog, + [ + pk |-> pk, + oldr |-> SourceTable[pk], + newr |-> r + ] + ); + + SourceTable[pk] := r; + if (currentChangeToPerform <= BinlogOvercopied) { + TargetTable[pk] := r; + }; + currentChangeToPerform := currentChangeToPerform + 1; + }; + }; + }; + ChangesApplied := TRUE; \* Start the application inserting now + + ferry_reconc_loop: while (LastSuccessfulBinlogPos < InitialTargetBinlogPos) { + LastSuccessfulBinlogPos := LastSuccessfulBinlogPos + 1; + ferry_reconc_del: TargetTable[SourceBinlog[LastSuccessfulBinlogPos].pk] := NoRecordHere; + ferry_reconc_ins: if (SourceBinlog[LastSuccessfulBinlogPos].pk <= LastSuccessfulPrimaryKey) { + TargetTable[SourceBinlog[LastSuccessfulBinlogPos].pk] := SourceTable[SourceBinlog[LastSuccessfulBinlogPos].pk]; + }; + }; + }; + ferry_start: FerryThreadsStarted := TRUE; ferry_setro: await pc[TableIterator] = "Done"; ApplicationReadonly := TRUE; ferry_waitro: await pc[Application] = "Done"; @@ -394,60 +503,79 @@ PrimaryKeys == 1..TableCapacity ***************************************************************************) \* BEGIN TRANSLATION CONSTANT defaultInitValue -VARIABLES SourceTable, TargetTable, MaxPrimaryKey, CurrentMaxPrimaryKey, - SourceBinlog, ApplicationReadonly, TargetBinlogPos, - BinlogStreamingStopRequested, pc, lastSuccessfulPK, currentRow, - lastSuccessfulBinlogPos, currentBinlogEntry, oldRecord, newRecord, - chosenPK - -vars == << SourceTable, TargetTable, 
MaxPrimaryKey, CurrentMaxPrimaryKey, - SourceBinlog, ApplicationReadonly, TargetBinlogPos, - BinlogStreamingStopRequested, pc, lastSuccessfulPK, currentRow, - lastSuccessfulBinlogPos, currentBinlogEntry, oldRecord, newRecord, - chosenPK >> +VARIABLES InitialTable, SourceTable, TargetTable, SourceBinlog, + ApplicationReadonly, TargetBinlogPos, BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, LastSuccessfulBinlogPos, + MaxPrimaryKey, CurrentMaxPrimaryKey, pc, lastSuccessfulPK, + currentRow, currentBinlogEntry, oldRecord, newRecord, chosenPK, + currentChangeToPerform, changedRowIds + +vars == << InitialTable, SourceTable, TargetTable, SourceBinlog, + ApplicationReadonly, TargetBinlogPos, BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, LastSuccessfulBinlogPos, + MaxPrimaryKey, CurrentMaxPrimaryKey, pc, lastSuccessfulPK, + currentRow, currentBinlogEntry, oldRecord, newRecord, chosenPK, + currentChangeToPerform, changedRowIds >> ProcSet == {TableIterator} \cup {BinlogStreamer} \cup {Application} \cup {Ferry} Init == (* Global variables *) - /\ SourceTable \in InitialTables - /\ TargetTable = [k \in PrimaryKeys |-> NoRecordHere] - /\ MaxPrimaryKey = (IF SourceTable[TableCapacity] # NoRecordHere - THEN TableCapacity - ELSE - IF \A i \in DOMAIN SourceTable : SourceTable[i] = NoRecordHere - THEN 0 - ELSE (CHOOSE i \in DOMAIN SourceTable : \E j \in DOMAIN SourceTable : SourceTable[i] # NoRecordHere /\ SourceTable[j] = NoRecordHere /\ i = j - 1)) - /\ CurrentMaxPrimaryKey = MaxPrimaryKey + /\ InitialTable \in InitialTables + /\ SourceTable = InitialTable + /\ TargetTable = [k \in PrimaryKeys |-> IF k <= LastSuccessfulPrimaryKey + RowsOvercopied THEN SourceTable[k] ELSE NoRecordHere] /\ SourceBinlog = <<>> /\ ApplicationReadonly = FALSE /\ TargetBinlogPos = 0 /\ BinlogStreamingStopRequested = FALSE + /\ ChangesApplied = FALSE + /\ FerryThreadsStarted = FALSE + /\ LastSuccessfulBinlogPos = 0 + /\ MaxPrimaryKey = 
ComputeMaxPrimaryKey(SourceTable, TableCapacity) + /\ CurrentMaxPrimaryKey = MaxPrimaryKey (* Process ProcTableIterator *) - /\ lastSuccessfulPK = 0 + /\ lastSuccessfulPK = LastSuccessfulPrimaryKey /\ currentRow = defaultInitValue (* Process ProcBinlogStreamer *) - /\ lastSuccessfulBinlogPos = 0 /\ currentBinlogEntry = defaultInitValue (* Process ProcApplication *) /\ oldRecord = defaultInitValue /\ newRecord = defaultInitValue /\ chosenPK = defaultInitValue - /\ pc = [self \in ProcSet |-> CASE self = TableIterator -> "tblit_loop" - [] self = BinlogStreamer -> "binlog_loop" - [] self = Application -> "app_loop" - [] self = Ferry -> "ferry_setro"] + (* Process ProcFerry *) + /\ currentChangeToPerform = 1 + /\ changedRowIds = defaultInitValue + /\ pc = [self \in ProcSet |-> CASE self = TableIterator -> "tblit_wait" + [] self = BinlogStreamer -> "binlog_wait" + [] self = Application -> "app_wait" + [] self = Ferry -> "ferry_reconcile"] + +tblit_wait == /\ pc[TableIterator] = "tblit_wait" + /\ FerryThreadsStarted = TRUE + /\ MaxPrimaryKey' = ComputeMaxPrimaryKey(SourceTable, TableCapacity) + /\ CurrentMaxPrimaryKey' = MaxPrimaryKey' + /\ pc' = [pc EXCEPT ![TableIterator] = "tblit_loop"] + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, ApplicationReadonly, + TargetBinlogPos, BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, + LastSuccessfulBinlogPos, lastSuccessfulPK, + currentRow, currentBinlogEntry, oldRecord, + newRecord, chosenPK, currentChangeToPerform, + changedRowIds >> tblit_loop == /\ pc[TableIterator] = "tblit_loop" /\ IF lastSuccessfulPK < MaxPrimaryKey THEN /\ pc' = [pc EXCEPT ![TableIterator] = "tblit_rw"] ELSE /\ pc' = [pc EXCEPT ![TableIterator] = "Done"] - /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, - CurrentMaxPrimaryKey, SourceBinlog, - ApplicationReadonly, TargetBinlogPos, - BinlogStreamingStopRequested, lastSuccessfulPK, - currentRow, lastSuccessfulBinlogPos, - currentBinlogEntry, oldRecord, 
newRecord, - chosenPK >> + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, ApplicationReadonly, + TargetBinlogPos, BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, + LastSuccessfulBinlogPos, MaxPrimaryKey, + CurrentMaxPrimaryKey, lastSuccessfulPK, + currentRow, currentBinlogEntry, oldRecord, + newRecord, chosenPK, currentChangeToPerform, + changedRowIds >> tblit_rw == /\ pc[TableIterator] = "tblit_rw" /\ currentRow' = SourceTable[lastSuccessfulPK + 1] @@ -456,48 +584,70 @@ tblit_rw == /\ pc[TableIterator] = "tblit_rw" ELSE /\ TRUE /\ UNCHANGED TargetTable /\ pc' = [pc EXCEPT ![TableIterator] = "tblit_upkey"] - /\ UNCHANGED << SourceTable, MaxPrimaryKey, CurrentMaxPrimaryKey, - SourceBinlog, ApplicationReadonly, TargetBinlogPos, - BinlogStreamingStopRequested, lastSuccessfulPK, - lastSuccessfulBinlogPos, currentBinlogEntry, - oldRecord, newRecord, chosenPK >> + /\ UNCHANGED << InitialTable, SourceTable, SourceBinlog, + ApplicationReadonly, TargetBinlogPos, + BinlogStreamingStopRequested, ChangesApplied, + FerryThreadsStarted, LastSuccessfulBinlogPos, + MaxPrimaryKey, CurrentMaxPrimaryKey, + lastSuccessfulPK, currentBinlogEntry, oldRecord, + newRecord, chosenPK, currentChangeToPerform, + changedRowIds >> tblit_upkey == /\ pc[TableIterator] = "tblit_upkey" /\ lastSuccessfulPK' = lastSuccessfulPK + 1 /\ pc' = [pc EXCEPT ![TableIterator] = "tblit_loop"] - /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, - CurrentMaxPrimaryKey, SourceBinlog, - ApplicationReadonly, TargetBinlogPos, - BinlogStreamingStopRequested, currentRow, - lastSuccessfulBinlogPos, currentBinlogEntry, - oldRecord, newRecord, chosenPK >> - -ProcTableIterator == tblit_loop \/ tblit_rw \/ tblit_upkey + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, ApplicationReadonly, + TargetBinlogPos, BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, + LastSuccessfulBinlogPos, MaxPrimaryKey, + CurrentMaxPrimaryKey, 
currentRow, + currentBinlogEntry, oldRecord, newRecord, + chosenPK, currentChangeToPerform, changedRowIds >> + +ProcTableIterator == tblit_wait \/ tblit_loop \/ tblit_rw \/ tblit_upkey + +binlog_wait == /\ pc[BinlogStreamer] = "binlog_wait" + /\ FerryThreadsStarted = TRUE + /\ pc' = [pc EXCEPT ![BinlogStreamer] = "binlog_loop"] + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, ApplicationReadonly, + TargetBinlogPos, BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, + LastSuccessfulBinlogPos, MaxPrimaryKey, + CurrentMaxPrimaryKey, lastSuccessfulPK, + currentRow, currentBinlogEntry, oldRecord, + newRecord, chosenPK, currentChangeToPerform, + changedRowIds >> binlog_loop == /\ pc[BinlogStreamer] = "binlog_loop" - /\ IF BinlogStreamingStopRequested = FALSE \/ (BinlogStreamingStopRequested = TRUE /\ lastSuccessfulBinlogPos < TargetBinlogPos) + /\ IF BinlogStreamingStopRequested = FALSE \/ (BinlogStreamingStopRequested = TRUE /\ LastSuccessfulBinlogPos < TargetBinlogPos) THEN /\ pc' = [pc EXCEPT ![BinlogStreamer] = "binlog_read"] ELSE /\ pc' = [pc EXCEPT ![BinlogStreamer] = "Done"] - /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, - CurrentMaxPrimaryKey, SourceBinlog, - ApplicationReadonly, TargetBinlogPos, - BinlogStreamingStopRequested, lastSuccessfulPK, - currentRow, lastSuccessfulBinlogPos, - currentBinlogEntry, oldRecord, newRecord, - chosenPK >> + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, ApplicationReadonly, + TargetBinlogPos, BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, + LastSuccessfulBinlogPos, MaxPrimaryKey, + CurrentMaxPrimaryKey, lastSuccessfulPK, + currentRow, currentBinlogEntry, oldRecord, + newRecord, chosenPK, currentChangeToPerform, + changedRowIds >> binlog_read == /\ pc[BinlogStreamer] = "binlog_read" - /\ IF lastSuccessfulBinlogPos < Len(SourceBinlog) - THEN /\ currentBinlogEntry' = SourceBinlog[lastSuccessfulBinlogPos + 1] + /\ IF 
LastSuccessfulBinlogPos < Len(SourceBinlog) + THEN /\ currentBinlogEntry' = SourceBinlog[LastSuccessfulBinlogPos + 1] /\ pc' = [pc EXCEPT ![BinlogStreamer] = "binlog_write"] ELSE /\ pc' = [pc EXCEPT ![BinlogStreamer] = "binlog_loop"] /\ UNCHANGED currentBinlogEntry - /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, - CurrentMaxPrimaryKey, SourceBinlog, - ApplicationReadonly, TargetBinlogPos, - BinlogStreamingStopRequested, lastSuccessfulPK, - currentRow, lastSuccessfulBinlogPos, oldRecord, - newRecord, chosenPK >> + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, ApplicationReadonly, + TargetBinlogPos, BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, + LastSuccessfulBinlogPos, MaxPrimaryKey, + CurrentMaxPrimaryKey, lastSuccessfulPK, + currentRow, oldRecord, newRecord, chosenPK, + currentChangeToPerform, changedRowIds >> binlog_write == /\ pc[BinlogStreamer] = "binlog_write" /\ IF TargetTable[currentBinlogEntry.pk] = currentBinlogEntry.oldr @@ -505,37 +655,58 @@ binlog_write == /\ pc[BinlogStreamer] = "binlog_write" ELSE /\ TRUE /\ UNCHANGED TargetTable /\ pc' = [pc EXCEPT ![BinlogStreamer] = "binlog_upkey"] - /\ UNCHANGED << SourceTable, MaxPrimaryKey, - CurrentMaxPrimaryKey, SourceBinlog, - ApplicationReadonly, TargetBinlogPos, - BinlogStreamingStopRequested, lastSuccessfulPK, - currentRow, lastSuccessfulBinlogPos, - currentBinlogEntry, oldRecord, newRecord, - chosenPK >> + /\ UNCHANGED << InitialTable, SourceTable, SourceBinlog, + ApplicationReadonly, TargetBinlogPos, + BinlogStreamingStopRequested, ChangesApplied, + FerryThreadsStarted, LastSuccessfulBinlogPos, + MaxPrimaryKey, CurrentMaxPrimaryKey, + lastSuccessfulPK, currentRow, + currentBinlogEntry, oldRecord, newRecord, + chosenPK, currentChangeToPerform, + changedRowIds >> binlog_upkey == /\ pc[BinlogStreamer] = "binlog_upkey" - /\ lastSuccessfulBinlogPos' = lastSuccessfulBinlogPos + 1 + /\ LastSuccessfulBinlogPos' = LastSuccessfulBinlogPos + 1 /\ pc' = 
[pc EXCEPT ![BinlogStreamer] = "binlog_loop"] - /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, - CurrentMaxPrimaryKey, SourceBinlog, - ApplicationReadonly, TargetBinlogPos, - BinlogStreamingStopRequested, lastSuccessfulPK, - currentRow, currentBinlogEntry, oldRecord, - newRecord, chosenPK >> - -ProcBinlogStreamer == binlog_loop \/ binlog_read \/ binlog_write - \/ binlog_upkey + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, ApplicationReadonly, + TargetBinlogPos, BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, + MaxPrimaryKey, CurrentMaxPrimaryKey, + lastSuccessfulPK, currentRow, + currentBinlogEntry, oldRecord, newRecord, + chosenPK, currentChangeToPerform, + changedRowIds >> + +ProcBinlogStreamer == binlog_wait \/ binlog_loop \/ binlog_read + \/ binlog_write \/ binlog_upkey + +app_wait == /\ pc[Application] = "app_wait" + /\ IF InitialTargetBinlogPos > 0 + THEN /\ ChangesApplied = TRUE + ELSE /\ TRUE + /\ pc' = [pc EXCEPT ![Application] = "app_loop"] + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, ApplicationReadonly, TargetBinlogPos, + BinlogStreamingStopRequested, ChangesApplied, + FerryThreadsStarted, LastSuccessfulBinlogPos, + MaxPrimaryKey, CurrentMaxPrimaryKey, + lastSuccessfulPK, currentRow, currentBinlogEntry, + oldRecord, newRecord, chosenPK, + currentChangeToPerform, changedRowIds >> app_loop == /\ pc[Application] = "app_loop" /\ IF ApplicationReadonly = FALSE THEN /\ pc' = [pc EXCEPT ![Application] = "app_write"] ELSE /\ pc' = [pc EXCEPT ![Application] = "Done"] - /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, - CurrentMaxPrimaryKey, SourceBinlog, - ApplicationReadonly, TargetBinlogPos, - BinlogStreamingStopRequested, lastSuccessfulPK, - currentRow, lastSuccessfulBinlogPos, - currentBinlogEntry, oldRecord, newRecord, chosenPK >> + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, ApplicationReadonly, TargetBinlogPos, + 
BinlogStreamingStopRequested, ChangesApplied, + FerryThreadsStarted, LastSuccessfulBinlogPos, + MaxPrimaryKey, CurrentMaxPrimaryKey, + lastSuccessfulPK, currentRow, currentBinlogEntry, + oldRecord, newRecord, chosenPK, + currentChangeToPerform, changedRowIds >> app_write == /\ pc[Application] = "app_write" /\ \E pk \in 1..SetMin({TableCapacity, CurrentMaxPrimaryKey + 1}): @@ -553,65 +724,199 @@ app_write == /\ pc[Application] = "app_write" ) /\ SourceTable' = [SourceTable EXCEPT ![chosenPK'] = newRecord'] /\ IF oldRecord' = NoRecordHere /\ chosenPK' > CurrentMaxPrimaryKey - THEN /\ Assert((chosenPK' - 1 = CurrentMaxPrimaryKey), - "Failure of assertion at line 365, column 21.") + THEN /\ Assert((chosenPK' - 1 = CurrentMaxPrimaryKey), + "Failure of assertion at line 430, column 21.") /\ CurrentMaxPrimaryKey' = chosenPK' ELSE /\ TRUE /\ UNCHANGED CurrentMaxPrimaryKey /\ pc' = [pc EXCEPT ![Application] = "app_loop"] - /\ UNCHANGED << TargetTable, MaxPrimaryKey, ApplicationReadonly, - TargetBinlogPos, BinlogStreamingStopRequested, - lastSuccessfulPK, currentRow, - lastSuccessfulBinlogPos, currentBinlogEntry >> - -ProcApplication == app_loop \/ app_write + /\ UNCHANGED << InitialTable, TargetTable, ApplicationReadonly, + TargetBinlogPos, BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, + LastSuccessfulBinlogPos, MaxPrimaryKey, + lastSuccessfulPK, currentRow, currentBinlogEntry, + currentChangeToPerform, changedRowIds >> + +ProcApplication == app_wait \/ app_loop \/ app_write + +ferry_reconcile == /\ pc[Ferry] = "ferry_reconcile" + /\ IF InitialTargetBinlogPos > 0 + THEN /\ pc' = [pc EXCEPT ![Ferry] = "ferry_pickchanges"] + ELSE /\ pc' = [pc EXCEPT ![Ferry] = "ferry_start"] + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, ApplicationReadonly, + TargetBinlogPos, + BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, + LastSuccessfulBinlogPos, MaxPrimaryKey, + CurrentMaxPrimaryKey, lastSuccessfulPK, + currentRow, 
currentBinlogEntry, oldRecord, + newRecord, chosenPK, currentChangeToPerform, + changedRowIds >> + +ferry_pickchanges == /\ pc[Ferry] = "ferry_pickchanges" + /\ \E rowIds \in RowsChanged: + changedRowIds' = rowIds + /\ pc' = [pc EXCEPT ![Ferry] = "ferry_runchanges"] + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, ApplicationReadonly, + TargetBinlogPos, + BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, + LastSuccessfulBinlogPos, MaxPrimaryKey, + CurrentMaxPrimaryKey, lastSuccessfulPK, + currentRow, currentBinlogEntry, oldRecord, + newRecord, chosenPK, + currentChangeToPerform >> + +ferry_runchanges == /\ pc[Ferry] = "ferry_runchanges" + /\ IF currentChangeToPerform <= InitialTargetBinlogPos + THEN /\ LET pk == changedRowIds[currentChangeToPerform] IN + \E r \in PossibleRecords \ {SourceTable[pk]}: + /\ SourceBinlog' = Append( + SourceBinlog, + [ + pk |-> pk, + oldr |-> SourceTable[pk], + newr |-> r + ] + ) + /\ SourceTable' = [SourceTable EXCEPT ![pk] = r] + /\ IF currentChangeToPerform <= BinlogOvercopied + THEN /\ TargetTable' = [TargetTable EXCEPT ![pk] = r] + ELSE /\ TRUE + /\ UNCHANGED TargetTable + /\ currentChangeToPerform' = currentChangeToPerform + 1 + /\ pc' = [pc EXCEPT ![Ferry] = "ferry_runchanges"] + /\ UNCHANGED ChangesApplied + ELSE /\ ChangesApplied' = TRUE + /\ pc' = [pc EXCEPT ![Ferry] = "ferry_reconc_loop"] + /\ UNCHANGED << SourceTable, TargetTable, + SourceBinlog, + currentChangeToPerform >> + /\ UNCHANGED << InitialTable, ApplicationReadonly, + TargetBinlogPos, + BinlogStreamingStopRequested, + FerryThreadsStarted, + LastSuccessfulBinlogPos, MaxPrimaryKey, + CurrentMaxPrimaryKey, lastSuccessfulPK, + currentRow, currentBinlogEntry, oldRecord, + newRecord, chosenPK, changedRowIds >> + +ferry_reconc_loop == /\ pc[Ferry] = "ferry_reconc_loop" + /\ IF LastSuccessfulBinlogPos < InitialTargetBinlogPos + THEN /\ LastSuccessfulBinlogPos' = LastSuccessfulBinlogPos + 1 + /\ pc' = [pc EXCEPT ![Ferry] = 
"ferry_reconc_del"] + ELSE /\ pc' = [pc EXCEPT ![Ferry] = "ferry_start"] + /\ UNCHANGED LastSuccessfulBinlogPos + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, ApplicationReadonly, + TargetBinlogPos, + BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, + MaxPrimaryKey, CurrentMaxPrimaryKey, + lastSuccessfulPK, currentRow, + currentBinlogEntry, oldRecord, newRecord, + chosenPK, currentChangeToPerform, + changedRowIds >> + +ferry_reconc_del == /\ pc[Ferry] = "ferry_reconc_del" + /\ TargetTable' = [TargetTable EXCEPT ![SourceBinlog[LastSuccessfulBinlogPos].pk] = NoRecordHere] + /\ pc' = [pc EXCEPT ![Ferry] = "ferry_reconc_ins"] + /\ UNCHANGED << InitialTable, SourceTable, SourceBinlog, + ApplicationReadonly, TargetBinlogPos, + BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, + LastSuccessfulBinlogPos, MaxPrimaryKey, + CurrentMaxPrimaryKey, lastSuccessfulPK, + currentRow, currentBinlogEntry, oldRecord, + newRecord, chosenPK, + currentChangeToPerform, changedRowIds >> + +ferry_reconc_ins == /\ pc[Ferry] = "ferry_reconc_ins" + /\ IF SourceBinlog[LastSuccessfulBinlogPos].pk <= LastSuccessfulPrimaryKey + THEN /\ TargetTable' = [TargetTable EXCEPT ![SourceBinlog[LastSuccessfulBinlogPos].pk] = SourceTable[SourceBinlog[LastSuccessfulBinlogPos].pk]] + ELSE /\ TRUE + /\ UNCHANGED TargetTable + /\ pc' = [pc EXCEPT ![Ferry] = "ferry_reconc_loop"] + /\ UNCHANGED << InitialTable, SourceTable, SourceBinlog, + ApplicationReadonly, TargetBinlogPos, + BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, + LastSuccessfulBinlogPos, MaxPrimaryKey, + CurrentMaxPrimaryKey, lastSuccessfulPK, + currentRow, currentBinlogEntry, oldRecord, + newRecord, chosenPK, + currentChangeToPerform, changedRowIds >> + +ferry_start == /\ pc[Ferry] = "ferry_start" + /\ FerryThreadsStarted' = TRUE + /\ pc' = [pc EXCEPT ![Ferry] = "ferry_setro"] + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, 
ApplicationReadonly, + TargetBinlogPos, BinlogStreamingStopRequested, + ChangesApplied, LastSuccessfulBinlogPos, + MaxPrimaryKey, CurrentMaxPrimaryKey, + lastSuccessfulPK, currentRow, + currentBinlogEntry, oldRecord, newRecord, + chosenPK, currentChangeToPerform, changedRowIds >> ferry_setro == /\ pc[Ferry] = "ferry_setro" /\ pc[TableIterator] = "Done" /\ ApplicationReadonly' = TRUE /\ pc' = [pc EXCEPT ![Ferry] = "ferry_waitro"] - /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, - CurrentMaxPrimaryKey, SourceBinlog, - TargetBinlogPos, BinlogStreamingStopRequested, - lastSuccessfulPK, currentRow, - lastSuccessfulBinlogPos, currentBinlogEntry, - oldRecord, newRecord, chosenPK >> + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, TargetBinlogPos, + BinlogStreamingStopRequested, ChangesApplied, + FerryThreadsStarted, LastSuccessfulBinlogPos, + MaxPrimaryKey, CurrentMaxPrimaryKey, + lastSuccessfulPK, currentRow, + currentBinlogEntry, oldRecord, newRecord, + chosenPK, currentChangeToPerform, changedRowIds >> ferry_waitro == /\ pc[Ferry] = "ferry_waitro" /\ pc[Application] = "Done" /\ pc' = [pc EXCEPT ![Ferry] = "ferry_binlogpos"] - /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, - CurrentMaxPrimaryKey, SourceBinlog, - ApplicationReadonly, TargetBinlogPos, - BinlogStreamingStopRequested, lastSuccessfulPK, - currentRow, lastSuccessfulBinlogPos, - currentBinlogEntry, oldRecord, newRecord, - chosenPK >> + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, ApplicationReadonly, + TargetBinlogPos, BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, + LastSuccessfulBinlogPos, MaxPrimaryKey, + CurrentMaxPrimaryKey, lastSuccessfulPK, + currentRow, currentBinlogEntry, oldRecord, + newRecord, chosenPK, currentChangeToPerform, + changedRowIds >> ferry_binlogpos == /\ pc[Ferry] = "ferry_binlogpos" /\ TargetBinlogPos' = Len(SourceBinlog) /\ pc' = [pc EXCEPT ![Ferry] = "ferry_binlogstop"] - /\ UNCHANGED << 
SourceTable, TargetTable, MaxPrimaryKey, - CurrentMaxPrimaryKey, SourceBinlog, - ApplicationReadonly, - BinlogStreamingStopRequested, - lastSuccessfulPK, currentRow, - lastSuccessfulBinlogPos, currentBinlogEntry, - oldRecord, newRecord, chosenPK >> + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, ApplicationReadonly, + BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, + LastSuccessfulBinlogPos, MaxPrimaryKey, + CurrentMaxPrimaryKey, lastSuccessfulPK, + currentRow, currentBinlogEntry, oldRecord, + newRecord, chosenPK, currentChangeToPerform, + changedRowIds >> ferry_binlogstop == /\ pc[Ferry] = "ferry_binlogstop" /\ BinlogStreamingStopRequested' = TRUE /\ pc' = [pc EXCEPT ![Ferry] = "Done"] - /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, - CurrentMaxPrimaryKey, SourceBinlog, - ApplicationReadonly, TargetBinlogPos, - lastSuccessfulPK, currentRow, - lastSuccessfulBinlogPos, - currentBinlogEntry, oldRecord, newRecord, - chosenPK >> - -ProcFerry == ferry_setro \/ ferry_waitro \/ ferry_binlogpos - \/ ferry_binlogstop + /\ UNCHANGED << InitialTable, SourceTable, TargetTable, + SourceBinlog, ApplicationReadonly, + TargetBinlogPos, ChangesApplied, + FerryThreadsStarted, + LastSuccessfulBinlogPos, MaxPrimaryKey, + CurrentMaxPrimaryKey, lastSuccessfulPK, + currentRow, currentBinlogEntry, oldRecord, + newRecord, chosenPK, + currentChangeToPerform, changedRowIds >> + +ProcFerry == ferry_reconcile \/ ferry_pickchanges \/ ferry_runchanges + \/ ferry_reconc_loop \/ ferry_reconc_del + \/ ferry_reconc_ins \/ ferry_start \/ ferry_setro + \/ ferry_waitro \/ ferry_binlogpos \/ ferry_binlogstop Next == ProcTableIterator \/ ProcBinlogStreamer \/ ProcApplication \/ ProcFerry @@ -631,7 +936,7 @@ Termination == <>(\A self \in ProcSet: pc[self] = "Done") \* Safety Constraints \* ================== -SourceTargetEquality == (\A self \in ProcSet: pc[self] = "Done") => (SourceTable = TargetTable) /\ PrintT(<<"Source", SourceTable, "Target", 
TargetTable>>) +SourceTargetEquality == (\A self \in ProcSet: pc[self] = "Done") => (SourceTable = TargetTable) \* Action Constraints \* ================== @@ -647,5 +952,5 @@ BinlogSizeActionConstraint == Len(SourceBinlog) <= MaxBinlogSize ============================================================================= \* Modification History -\* Last modified Mon Jun 25 16:25:19 EDT 2018 by shuhao +\* Last modified Mon Jul 09 10:43:17 EDT 2018 by shuhao \* Created Thu Jan 18 11:35:40 EST 2018 by shuhao diff --git a/tlaplus/ghostferry.toolbox/ghostferry___GhostferryPrimaryModel.launch b/tlaplus/ghostferry.toolbox/ghostferry___GhostferryPrimaryModel.launch index 28a4afc6..1c0cca52 100644 --- a/tlaplus/ghostferry.toolbox/ghostferry___GhostferryPrimaryModel.launch +++ b/tlaplus/ghostferry.toolbox/ghostferry___GhostferryPrimaryModel.launch @@ -19,11 +19,12 @@ - - + + + @@ -40,6 +41,10 @@ + + + + diff --git a/tlaplus/ghostferry.toolbox/ghostferry___GhostferryWithOverCopiedReconciliationModel.launch b/tlaplus/ghostferry.toolbox/ghostferry___GhostferryWithOverCopiedReconciliationModel.launch new file mode 100644 index 00000000..1d144804 --- /dev/null +++ b/tlaplus/ghostferry.toolbox/ghostferry___GhostferryWithOverCopiedReconciliationModel.launch @@ -0,0 +1,61 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tlaplus/ghostferry.toolbox/ghostferry___GhostferryWithReconciliationModel.launch b/tlaplus/ghostferry.toolbox/ghostferry___GhostferryWithReconciliationModel.launch new file mode 100644 index 00000000..b08d2fba --- /dev/null +++ b/tlaplus/ghostferry.toolbox/ghostferry___GhostferryWithReconciliationModel.launch @@ -0,0 +1,61 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +