From b7e1171ab4b748cfde4c36d1236f1185ee9a5638 Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Mon, 25 Jun 2018 14:06:36 -0400 Subject: [PATCH 1/3] ASSUME NoRecordHere \notin Records --- tlaplus/ghostferry.tla | 14 ++++++++------ .../ghostferry___GhostferryPrimaryModel.launch | 5 ++--- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/tlaplus/ghostferry.tla b/tlaplus/ghostferry.tla index f281d5de4..0eb5a8a8a 100644 --- a/tlaplus/ghostferry.tla +++ b/tlaplus/ghostferry.tla @@ -191,12 +191,18 @@ CONSTANT InitialTable CONSTANT MaxPrimaryKey (*************************************************************************** -This defines the set of possible records that can be written to the +Records defines the set of possible records that can be written to the database. Example: {r0, r1} + +NoRecordHere serves as a placeholder for saying that the row with that id +does not exist in the database. This can be defined as a regular model value +in TLC. ***************************************************************************) CONSTANT Records +CONSTANT NoRecordHere +ASSUME NoRecordHere \notin Records (*************************************************************************** These are defined as ModelValues that will serve as the identifier to the @@ -210,10 +216,6 @@ TableCapacity == Len(InitialTable) \* The set of all possible primary key PrimaryKeys == 1..TableCapacity -\* This value cannot be computed by the TLC. Use the Model to override it to be -\* a model value instead. 
-NoRecordHere == CHOOSE r : r \notin Records - \* A set of possible records for TypeOK PossibleRecords == Records \cup {NoRecordHere} @@ -626,5 +628,5 @@ BinlogSizeActionConstraint == Len(SourceBinlog) <= MaxBinlogSize ============================================================================= \* Modification History -\* Last modified Thu Jan 25 10:09:10 EST 2018 by shuhao +\* Last modified Mon Jun 25 14:04:50 EDT 2018 by shuhao \* Created Thu Jan 18 11:35:40 EST 2018 by shuhao diff --git a/tlaplus/ghostferry.toolbox/ghostferry___GhostferryPrimaryModel.launch b/tlaplus/ghostferry.toolbox/ghostferry___GhostferryPrimaryModel.launch index 669284d05..a5ec0b410 100644 --- a/tlaplus/ghostferry.toolbox/ghostferry___GhostferryPrimaryModel.launch +++ b/tlaplus/ghostferry.toolbox/ghostferry___GhostferryPrimaryModel.launch @@ -40,11 +40,10 @@ - - - + + From 52e15cb54f2a28d261bd8d5034766a0c5e8fb45a Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Mon, 25 Jun 2018 16:25:32 -0400 Subject: [PATCH 2/3] Starting table defined by size, not state. Instead of checking only a particular table, we can now check all tables and ensure that Ghostferry works on all of them. The check time on my machine is not super large either, at about 3 minutes for a table size of 3. --- tlaplus/ghostferry.tla | 189 ++++++++++-------- ...ghostferry___GhostferryPrimaryModel.launch | 9 +- 2 files changed, 108 insertions(+), 90 deletions(-) diff --git a/tlaplus/ghostferry.tla b/tlaplus/ghostferry.tla index 0eb5a8a8a..02b139622 100644 --- a/tlaplus/ghostferry.tla +++ b/tlaplus/ghostferry.tla @@ -187,12 +187,12 @@ The example table will also have a TableCapacity of 3 and a CurrentMaxPrimaryKey of 2. The CurrentMaxPrimaryKey can be increased when the third element is updated to an element \in Records. 
***************************************************************************) -CONSTANT InitialTable -CONSTANT MaxPrimaryKey +CONSTANT TableCapacity (*************************************************************************** Records defines the set of possible records that can be written to the -database. +database. The set of model values can be symmetrical because the starting order +does not matter. Example: {r0, r1} @@ -210,28 +210,34 @@ different processes running. ***************************************************************************) CONSTANTS TableIterator, BinlogStreamer, Application, Ferry -\* Maximum table capacity -TableCapacity == Len(InitialTable) +\* A set of possible records for TypeOK +PossibleRecords == Records \cup {NoRecordHere} + +InitialTables == [1..TableCapacity -> PossibleRecords] \* The set of all possible primary key PrimaryKeys == 1..TableCapacity -\* A set of possible records for TypeOK -PossibleRecords == Records \cup {NoRecordHere} - (*************************************************************************** --algorithm ghostferry { variables - \* CurrentMaxPrimaryKey indicates the length of the table as opposed to the - \* capacity of the table. - CurrentMaxPrimaryKey = MaxPrimaryKey, \* The source table is initialized with the given InitialTable. \* The target table is initialized with the same capacity but has no records \* associated with any of the primary key (in the real world: no rows exists) - SourceTable = InitialTable, + SourceTable \in InitialTables, TargetTable = [k \in PrimaryKeys |-> NoRecordHere], + \* CurrentMaxPrimaryKey indicates the length of the table as opposed to the + \* capacity of the table. 
+ MaxPrimaryKey = IF SourceTable[TableCapacity] # NoRecordHere + THEN TableCapacity + ELSE + IF \A i \in DOMAIN SourceTable : SourceTable[i] = NoRecordHere + THEN 0 + ELSE (CHOOSE i \in DOMAIN SourceTable : \E j \in DOMAIN SourceTable : SourceTable[i] # NoRecordHere /\ SourceTable[j] = NoRecordHere /\ i = j - 1), + CurrentMaxPrimaryKey = MaxPrimaryKey, + \* The binlogs are modeled as a list of binlog events. \* The size of the binlog is constrainted to MaxBinlogSize via \* ActionConstraint. This serves to ensure that the binlog do not increase @@ -388,22 +394,30 @@ PossibleRecords == Records \cup {NoRecordHere} ***************************************************************************) \* BEGIN TRANSLATION CONSTANT defaultInitValue -VARIABLES CurrentMaxPrimaryKey, SourceTable, TargetTable, SourceBinlog, - ApplicationReadonly, TargetBinlogPos, BinlogStreamingStopRequested, - pc, lastSuccessfulPK, currentRow, lastSuccessfulBinlogPos, - currentBinlogEntry, oldRecord, newRecord, chosenPK - -vars == << CurrentMaxPrimaryKey, SourceTable, TargetTable, SourceBinlog, - ApplicationReadonly, TargetBinlogPos, BinlogStreamingStopRequested, - pc, lastSuccessfulPK, currentRow, lastSuccessfulBinlogPos, - currentBinlogEntry, oldRecord, newRecord, chosenPK >> +VARIABLES SourceTable, TargetTable, MaxPrimaryKey, CurrentMaxPrimaryKey, + SourceBinlog, ApplicationReadonly, TargetBinlogPos, + BinlogStreamingStopRequested, pc, lastSuccessfulPK, currentRow, + lastSuccessfulBinlogPos, currentBinlogEntry, oldRecord, newRecord, + chosenPK + +vars == << SourceTable, TargetTable, MaxPrimaryKey, CurrentMaxPrimaryKey, + SourceBinlog, ApplicationReadonly, TargetBinlogPos, + BinlogStreamingStopRequested, pc, lastSuccessfulPK, currentRow, + lastSuccessfulBinlogPos, currentBinlogEntry, oldRecord, newRecord, + chosenPK >> ProcSet == {TableIterator} \cup {BinlogStreamer} \cup {Application} \cup {Ferry} Init == (* Global variables *) - /\ CurrentMaxPrimaryKey = MaxPrimaryKey - /\ SourceTable = 
InitialTable + /\ SourceTable \in InitialTables /\ TargetTable = [k \in PrimaryKeys |-> NoRecordHere] + /\ MaxPrimaryKey = (IF SourceTable[TableCapacity] # NoRecordHere + THEN TableCapacity + ELSE + IF \A i \in DOMAIN SourceTable : SourceTable[i] = NoRecordHere + THEN 0 + ELSE (CHOOSE i \in DOMAIN SourceTable : \E j \in DOMAIN SourceTable : SourceTable[i] # NoRecordHere /\ SourceTable[j] = NoRecordHere /\ i = j - 1)) + /\ CurrentMaxPrimaryKey = MaxPrimaryKey /\ SourceBinlog = <<>> /\ ApplicationReadonly = FALSE /\ TargetBinlogPos = 0 @@ -427,12 +441,13 @@ tblit_loop == /\ pc[TableIterator] = "tblit_loop" /\ IF lastSuccessfulPK < MaxPrimaryKey THEN /\ pc' = [pc EXCEPT ![TableIterator] = "tblit_rw"] ELSE /\ pc' = [pc EXCEPT ![TableIterator] = "Done"] - /\ UNCHANGED << CurrentMaxPrimaryKey, SourceTable, TargetTable, - SourceBinlog, ApplicationReadonly, - TargetBinlogPos, BinlogStreamingStopRequested, - lastSuccessfulPK, currentRow, - lastSuccessfulBinlogPos, currentBinlogEntry, - oldRecord, newRecord, chosenPK >> + /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, + CurrentMaxPrimaryKey, SourceBinlog, + ApplicationReadonly, TargetBinlogPos, + BinlogStreamingStopRequested, lastSuccessfulPK, + currentRow, lastSuccessfulBinlogPos, + currentBinlogEntry, oldRecord, newRecord, + chosenPK >> tblit_rw == /\ pc[TableIterator] = "tblit_rw" /\ currentRow' = SourceTable[lastSuccessfulPK + 1] @@ -441,8 +456,8 @@ tblit_rw == /\ pc[TableIterator] = "tblit_rw" ELSE /\ TRUE /\ UNCHANGED TargetTable /\ pc' = [pc EXCEPT ![TableIterator] = "tblit_upkey"] - /\ UNCHANGED << CurrentMaxPrimaryKey, SourceTable, SourceBinlog, - ApplicationReadonly, TargetBinlogPos, + /\ UNCHANGED << SourceTable, MaxPrimaryKey, CurrentMaxPrimaryKey, + SourceBinlog, ApplicationReadonly, TargetBinlogPos, BinlogStreamingStopRequested, lastSuccessfulPK, lastSuccessfulBinlogPos, currentBinlogEntry, oldRecord, newRecord, chosenPK >> @@ -450,12 +465,12 @@ tblit_rw == /\ pc[TableIterator] = "tblit_rw" 
tblit_upkey == /\ pc[TableIterator] = "tblit_upkey" /\ lastSuccessfulPK' = lastSuccessfulPK + 1 /\ pc' = [pc EXCEPT ![TableIterator] = "tblit_loop"] - /\ UNCHANGED << CurrentMaxPrimaryKey, SourceTable, TargetTable, - SourceBinlog, ApplicationReadonly, - TargetBinlogPos, BinlogStreamingStopRequested, - currentRow, lastSuccessfulBinlogPos, - currentBinlogEntry, oldRecord, newRecord, - chosenPK >> + /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, + CurrentMaxPrimaryKey, SourceBinlog, + ApplicationReadonly, TargetBinlogPos, + BinlogStreamingStopRequested, currentRow, + lastSuccessfulBinlogPos, currentBinlogEntry, + oldRecord, newRecord, chosenPK >> ProcTableIterator == tblit_loop \/ tblit_rw \/ tblit_upkey @@ -463,12 +478,13 @@ binlog_loop == /\ pc[BinlogStreamer] = "binlog_loop" /\ IF BinlogStreamingStopRequested = FALSE \/ (BinlogStreamingStopRequested = TRUE /\ lastSuccessfulBinlogPos < TargetBinlogPos) THEN /\ pc' = [pc EXCEPT ![BinlogStreamer] = "binlog_read"] ELSE /\ pc' = [pc EXCEPT ![BinlogStreamer] = "Done"] - /\ UNCHANGED << CurrentMaxPrimaryKey, SourceTable, TargetTable, - SourceBinlog, ApplicationReadonly, - TargetBinlogPos, BinlogStreamingStopRequested, - lastSuccessfulPK, currentRow, - lastSuccessfulBinlogPos, currentBinlogEntry, - oldRecord, newRecord, chosenPK >> + /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, + CurrentMaxPrimaryKey, SourceBinlog, + ApplicationReadonly, TargetBinlogPos, + BinlogStreamingStopRequested, lastSuccessfulPK, + currentRow, lastSuccessfulBinlogPos, + currentBinlogEntry, oldRecord, newRecord, + chosenPK >> binlog_read == /\ pc[BinlogStreamer] = "binlog_read" /\ IF lastSuccessfulBinlogPos < Len(SourceBinlog) @@ -476,12 +492,12 @@ binlog_read == /\ pc[BinlogStreamer] = "binlog_read" /\ pc' = [pc EXCEPT ![BinlogStreamer] = "binlog_write"] ELSE /\ pc' = [pc EXCEPT ![BinlogStreamer] = "binlog_loop"] /\ UNCHANGED currentBinlogEntry - /\ UNCHANGED << CurrentMaxPrimaryKey, SourceTable, TargetTable, - SourceBinlog, 
ApplicationReadonly, - TargetBinlogPos, BinlogStreamingStopRequested, - lastSuccessfulPK, currentRow, - lastSuccessfulBinlogPos, oldRecord, newRecord, - chosenPK >> + /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, + CurrentMaxPrimaryKey, SourceBinlog, + ApplicationReadonly, TargetBinlogPos, + BinlogStreamingStopRequested, lastSuccessfulPK, + currentRow, lastSuccessfulBinlogPos, oldRecord, + newRecord, chosenPK >> binlog_write == /\ pc[BinlogStreamer] = "binlog_write" /\ IF TargetTable[currentBinlogEntry.pk] = currentBinlogEntry.oldr @@ -489,22 +505,23 @@ binlog_write == /\ pc[BinlogStreamer] = "binlog_write" ELSE /\ TRUE /\ UNCHANGED TargetTable /\ pc' = [pc EXCEPT ![BinlogStreamer] = "binlog_upkey"] - /\ UNCHANGED << CurrentMaxPrimaryKey, SourceTable, - SourceBinlog, ApplicationReadonly, - TargetBinlogPos, BinlogStreamingStopRequested, - lastSuccessfulPK, currentRow, - lastSuccessfulBinlogPos, currentBinlogEntry, - oldRecord, newRecord, chosenPK >> + /\ UNCHANGED << SourceTable, MaxPrimaryKey, + CurrentMaxPrimaryKey, SourceBinlog, + ApplicationReadonly, TargetBinlogPos, + BinlogStreamingStopRequested, lastSuccessfulPK, + currentRow, lastSuccessfulBinlogPos, + currentBinlogEntry, oldRecord, newRecord, + chosenPK >> binlog_upkey == /\ pc[BinlogStreamer] = "binlog_upkey" /\ lastSuccessfulBinlogPos' = lastSuccessfulBinlogPos + 1 /\ pc' = [pc EXCEPT ![BinlogStreamer] = "binlog_loop"] - /\ UNCHANGED << CurrentMaxPrimaryKey, SourceTable, TargetTable, - SourceBinlog, ApplicationReadonly, - TargetBinlogPos, BinlogStreamingStopRequested, - lastSuccessfulPK, currentRow, - currentBinlogEntry, oldRecord, newRecord, - chosenPK >> + /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, + CurrentMaxPrimaryKey, SourceBinlog, + ApplicationReadonly, TargetBinlogPos, + BinlogStreamingStopRequested, lastSuccessfulPK, + currentRow, currentBinlogEntry, oldRecord, + newRecord, chosenPK >> ProcBinlogStreamer == binlog_loop \/ binlog_read \/ binlog_write \/ binlog_upkey @@ 
-513,8 +530,9 @@ app_loop == /\ pc[Application] = "app_loop" /\ IF ApplicationReadonly = FALSE THEN /\ pc' = [pc EXCEPT ![Application] = "app_write"] ELSE /\ pc' = [pc EXCEPT ![Application] = "Done"] - /\ UNCHANGED << CurrentMaxPrimaryKey, SourceTable, TargetTable, - SourceBinlog, ApplicationReadonly, TargetBinlogPos, + /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, + CurrentMaxPrimaryKey, SourceBinlog, + ApplicationReadonly, TargetBinlogPos, BinlogStreamingStopRequested, lastSuccessfulPK, currentRow, lastSuccessfulBinlogPos, currentBinlogEntry, oldRecord, newRecord, chosenPK >> @@ -536,15 +554,15 @@ app_write == /\ pc[Application] = "app_write" /\ SourceTable' = [SourceTable EXCEPT ![chosenPK'] = newRecord'] /\ IF oldRecord' = NoRecordHere /\ chosenPK' > CurrentMaxPrimaryKey THEN /\ Assert((chosenPK' - 1 = CurrentMaxPrimaryKey), - "Failure of assertion at line 230, column 21.") + "Failure of assertion at line 365, column 21.") /\ CurrentMaxPrimaryKey' = chosenPK' ELSE /\ TRUE /\ UNCHANGED CurrentMaxPrimaryKey /\ pc' = [pc EXCEPT ![Application] = "app_loop"] - /\ UNCHANGED << TargetTable, ApplicationReadonly, TargetBinlogPos, - BinlogStreamingStopRequested, lastSuccessfulPK, - currentRow, lastSuccessfulBinlogPos, - currentBinlogEntry >> + /\ UNCHANGED << TargetTable, MaxPrimaryKey, ApplicationReadonly, + TargetBinlogPos, BinlogStreamingStopRequested, + lastSuccessfulPK, currentRow, + lastSuccessfulBinlogPos, currentBinlogEntry >> ProcApplication == app_loop \/ app_write @@ -552,28 +570,29 @@ ferry_setro == /\ pc[Ferry] = "ferry_setro" /\ pc[TableIterator] = "Done" /\ ApplicationReadonly' = TRUE /\ pc' = [pc EXCEPT ![Ferry] = "ferry_waitro"] - /\ UNCHANGED << CurrentMaxPrimaryKey, SourceTable, TargetTable, - SourceBinlog, TargetBinlogPos, - BinlogStreamingStopRequested, lastSuccessfulPK, - currentRow, lastSuccessfulBinlogPos, - currentBinlogEntry, oldRecord, newRecord, - chosenPK >> + /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, + 
CurrentMaxPrimaryKey, SourceBinlog, + TargetBinlogPos, BinlogStreamingStopRequested, + lastSuccessfulPK, currentRow, + lastSuccessfulBinlogPos, currentBinlogEntry, + oldRecord, newRecord, chosenPK >> ferry_waitro == /\ pc[Ferry] = "ferry_waitro" /\ pc[Application] = "Done" /\ pc' = [pc EXCEPT ![Ferry] = "ferry_binlogpos"] - /\ UNCHANGED << CurrentMaxPrimaryKey, SourceTable, TargetTable, - SourceBinlog, ApplicationReadonly, - TargetBinlogPos, BinlogStreamingStopRequested, - lastSuccessfulPK, currentRow, - lastSuccessfulBinlogPos, currentBinlogEntry, - oldRecord, newRecord, chosenPK >> + /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, + CurrentMaxPrimaryKey, SourceBinlog, + ApplicationReadonly, TargetBinlogPos, + BinlogStreamingStopRequested, lastSuccessfulPK, + currentRow, lastSuccessfulBinlogPos, + currentBinlogEntry, oldRecord, newRecord, + chosenPK >> ferry_binlogpos == /\ pc[Ferry] = "ferry_binlogpos" /\ TargetBinlogPos' = Len(SourceBinlog) /\ pc' = [pc EXCEPT ![Ferry] = "ferry_binlogstop"] - /\ UNCHANGED << CurrentMaxPrimaryKey, SourceTable, - TargetTable, SourceBinlog, + /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, + CurrentMaxPrimaryKey, SourceBinlog, ApplicationReadonly, BinlogStreamingStopRequested, lastSuccessfulPK, currentRow, @@ -583,8 +602,8 @@ ferry_binlogpos == /\ pc[Ferry] = "ferry_binlogpos" ferry_binlogstop == /\ pc[Ferry] = "ferry_binlogstop" /\ BinlogStreamingStopRequested' = TRUE /\ pc' = [pc EXCEPT ![Ferry] = "Done"] - /\ UNCHANGED << CurrentMaxPrimaryKey, SourceTable, - TargetTable, SourceBinlog, + /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey, + CurrentMaxPrimaryKey, SourceBinlog, ApplicationReadonly, TargetBinlogPos, lastSuccessfulPK, currentRow, lastSuccessfulBinlogPos, @@ -628,5 +647,5 @@ BinlogSizeActionConstraint == Len(SourceBinlog) <= MaxBinlogSize ============================================================================= \* Modification History -\* Last modified Mon Jun 25 14:04:50 EDT 2018 by shuhao 
+\* Last modified Mon Jun 25 16:25:19 EDT 2018 by shuhao \* Created Thu Jan 18 11:35:40 EST 2018 by shuhao diff --git a/tlaplus/ghostferry.toolbox/ghostferry___GhostferryPrimaryModel.launch b/tlaplus/ghostferry.toolbox/ghostferry___GhostferryPrimaryModel.launch index a5ec0b410..28a4afc69 100644 --- a/tlaplus/ghostferry.toolbox/ghostferry___GhostferryPrimaryModel.launch +++ b/tlaplus/ghostferry.toolbox/ghostferry___GhostferryPrimaryModel.launch @@ -19,28 +19,27 @@ - + - + - - + - + From bd4ff7c6a38567c0fee677db816cf041f650a22b Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Mon, 25 Jun 2018 18:36:05 -0400 Subject: [PATCH 3/3] Reconciliation algorithm for Ghostferry This adds the ability to model reconciliation in Ghostferry. --- tlaplus/ghostferry.tla | 625 +++++++++++++----- ...ghostferry___GhostferryPrimaryModel.launch | 9 +- ...ryWithOverCopiedReconciliationModel.launch | 61 ++ ...__GhostferryWithReconciliationModel.launch | 61 ++ 4 files changed, 594 insertions(+), 162 deletions(-) create mode 100644 tlaplus/ghostferry.toolbox/ghostferry___GhostferryWithOverCopiedReconciliationModel.launch create mode 100644 tlaplus/ghostferry.toolbox/ghostferry___GhostferryWithReconciliationModel.launch diff --git a/tlaplus/ghostferry.tla b/tlaplus/ghostferry.tla index 02b139622..3ed22fa2d 100644 --- a/tlaplus/ghostferry.tla +++ b/tlaplus/ghostferry.tla @@ -165,12 +165,13 @@ ***************************************************************************) -EXTENDS Integers, Sequences, TLC +EXTENDS Integers, Sequences, TLC, FiniteSets \* Helper Methods \* ============== SetMin(S) == CHOOSE i \in S: \A j \in S : i <= j +Range(S) == { S[x] : x \in DOMAIN S } \* Constant Declarations \* ===================== @@ -189,6 +190,45 @@ updated to an element \in Records. 
***************************************************************************) CONSTANT TableCapacity +(*************************************************************************** +In the case where Ghostferry is restarted, we need to start at a binlog +position that is not the current position. Since the model does not store past +binlog events, we cannot model it that way. Instead, we simply generate +some binlog entries (and the same number of changes on the SourceTable). + +Another couple of ways to think about this parameter: + +- Number of changes that occurred while Ghostferry is down before being resumed. +- The current master position of the source database at the moment Ghostferry + starts, while the last successful binlog position would be 0. + +Example: 0 if not running the reconciliation step. + 2, 3, some small-ish number if running the reconciliation step. + ***************************************************************************) +CONSTANT InitialTargetBinlogPos + +(*************************************************************************** +In the case where Ghostferry is restarted, we can restart at a particular +location saved from previously. This parameter does exactly that. This number +should be 0 if you don't care about this step. + +Example: 1 + ***************************************************************************) +CONSTANT LastSuccessfulPrimaryKey + +(*************************************************************************** +The number of rows/binlogs that have already been copied to the target +database. This is to emulate a case where Ghostferry is shut down but the saved +cursor and binlog positions are out of date. + +Note that RowsOvercopied must be lower than TableCapacity - +LastSuccessfulPrimaryKey. BinlogOvercopied must be smaller than MaxBinlogSize. 
+ +Example: 1 + ***************************************************************************) +CONSTANT RowsOvercopied +CONSTANT BinlogOvercopied + (*************************************************************************** Records defines the set of possible records that can be written to the database. The set of model values can be symmetrical because the starting order @@ -213,30 +253,34 @@ CONSTANTS TableIterator, BinlogStreamer, Application, Ferry \* A set of possible records for TypeOK PossibleRecords == Records \cup {NoRecordHere} +\* A set of tables up to TableCapacity, but have different entries in them. InitialTables == [1..TableCapacity -> PossibleRecords] \* The set of all possible primary key PrimaryKeys == 1..TableCapacity +\* This generates all the permutations of the rows that are changed. The number +\* of changes must equal InitialTargetBinlogPos. +RowsChanged == {key \in [1..InitialTargetBinlogPos -> 1..TableCapacity] : Cardinality(Range(key)) = InitialTargetBinlogPos} + +ComputeMaxPrimaryKey(table, capacity) == IF table[capacity] # NoRecordHere + THEN capacity + ELSE + IF \A i \in DOMAIN table : table[i] = NoRecordHere + THEN 0 + ELSE (CHOOSE i \in DOMAIN table : \E j \in DOMAIN table : table[i] # NoRecordHere /\ table[j] = NoRecordHere /\ i = j - 1) + (*************************************************************************** --algorithm ghostferry { variables - - \* The source table is initialized with the given InitialTable. - \* The target table is initialized with the same capacity but has no records - \* associated with any of the primary key (in the real world: no rows exists) - SourceTable \in InitialTables, - TargetTable = [k \in PrimaryKeys |-> NoRecordHere], - - \* CurrentMaxPrimaryKey indicates the length of the table as opposed to the - \* capacity of the table. 
- MaxPrimaryKey = IF SourceTable[TableCapacity] # NoRecordHere - THEN TableCapacity - ELSE - IF \A i \in DOMAIN SourceTable : SourceTable[i] = NoRecordHere - THEN 0 - ELSE (CHOOSE i \in DOMAIN SourceTable : \E j \in DOMAIN SourceTable : SourceTable[i] # NoRecordHere /\ SourceTable[j] = NoRecordHere /\ i = j - 1), - CurrentMaxPrimaryKey = MaxPrimaryKey, + \* We can pick an InitialTable from the permutations of possible tables. + \* The target table is initialized with the same capacity but has no records, + \* unless LastSuccessfulPrimaryKey is set to >0, in which case the record + \* is set to equal the ones found on the SourceTable as we have copied it + \* previously. + InitialTable \in InitialTables, + SourceTable = InitialTable, + TargetTable = [k \in PrimaryKeys |-> IF k <= LastSuccessfulPrimaryKey + RowsOvercopied THEN SourceTable[k] ELSE NoRecordHere], \* The binlogs are modeled as a list of binlog events. \* The size of the binlog is constrainted to MaxBinlogSize via @@ -257,12 +301,29 @@ PrimaryKeys == 1..TableCapacity \* This is set to TRUE to stop all components of Ghostferry and Ghostferry \* should terminate after finishing streaming all the binlog events. - BinlogStreamingStopRequested = FALSE; + BinlogStreamingStopRequested = FALSE, + + ChangesApplied = FALSE, + FerryThreadsStarted = FALSE, + + \* In the actual application, we have to set this as an argument. In the + \* model, it is always 0. + LastSuccessfulBinlogPos = 0, + + \* MaxPrimaryKey, initialized so the application model can use it to not + \* create holes in the source database. + MaxPrimaryKey = ComputeMaxPrimaryKey(SourceTable, TableCapacity), + CurrentMaxPrimaryKey = MaxPrimaryKey; + + macro DetermineMaxPrimaryKey() { + MaxPrimaryKey := ComputeMaxPrimaryKey(SourceTable, TableCapacity); + CurrentMaxPrimaryKey := MaxPrimaryKey; + }; fair process (ProcTableIterator = TableIterator) variables - lastSuccessfulPK = 0, \* Last PK successfully applied to the target db. 
- currentRow; \* The current row's data + lastSuccessfulPK = LastSuccessfulPrimaryKey, \* Last PK successfully applied to the target db. + currentRow; \* The current row's data { \* Note that tblit_rw is an atomic step. If the read and write steps are \* two distinct steps, this could cause a race condition that will cause @@ -281,6 +342,8 @@ PrimaryKeys == 1..TableCapacity \* It may be possible to perform some sort of locking via the Application, \* but this seems cumbersome and prone to implementation level error. \* TODO: model this with TLA+ and validate its correctness. + tblit_wait: await FerryThreadsStarted = TRUE; + DetermineMaxPrimaryKey(); tblit_loop: while (lastSuccessfulPK < MaxPrimaryKey) { tblit_rw: currentRow := SourceTable[lastSuccessfulPK + 1]; if (currentRow # NoRecordHere) { @@ -292,22 +355,22 @@ PrimaryKeys == 1..TableCapacity fair process (ProcBinlogStreamer = BinlogStreamer) variables - lastSuccessfulBinlogPos = 0, \* Last binlog pos successfully applied on the target db currentBinlogEntry; \* The binlog event that is currently being read { - binlog_loop: while (BinlogStreamingStopRequested = FALSE \/ (BinlogStreamingStopRequested = TRUE /\ lastSuccessfulBinlogPos < TargetBinlogPos)) { + binlog_wait: await FerryThreadsStarted = TRUE; + binlog_loop: while (BinlogStreamingStopRequested = FALSE \/ (BinlogStreamingStopRequested = TRUE /\ LastSuccessfulBinlogPos < TargetBinlogPos)) { \* We cannot use an await as there could be a deadlock for \* when the application is set to read only and thus nothing \* else writes to the database. \* \* This also means in the real implementation we need a \* non-blocking read for the binlog. 
- binlog_read: if (lastSuccessfulBinlogPos < Len(SourceBinlog)) { - currentBinlogEntry := SourceBinlog[lastSuccessfulBinlogPos + 1]; + binlog_read: if (LastSuccessfulBinlogPos < Len(SourceBinlog)) { + currentBinlogEntry := SourceBinlog[LastSuccessfulBinlogPos + 1]; binlog_write: if (TargetTable[currentBinlogEntry.pk] = currentBinlogEntry.oldr) { TargetTable[currentBinlogEntry.pk] := currentBinlogEntry.newr; }; - binlog_upkey: lastSuccessfulBinlogPos := lastSuccessfulBinlogPos + 1; + binlog_upkey: LastSuccessfulBinlogPos := LastSuccessfulBinlogPos + 1; }; } } @@ -330,8 +393,11 @@ PrimaryKeys == 1..TableCapacity variables oldRecord, newRecord, - chosenPK, + chosenPK; { + app_wait: if (InitialTargetBinlogPos > 0) { + await ChangesApplied = TRUE; + }; app_loop: while (ApplicationReadonly = FALSE) { \* Choose a "random" PK to update. app_write: with (pk \in 1..SetMin({TableCapacity, CurrentMaxPrimaryKey + 1})) { @@ -363,26 +429,69 @@ PrimaryKeys == 1..TableCapacity if (oldRecord = NoRecordHere /\ chosenPK > CurrentMaxPrimaryKey) { assert (chosenPK - 1 = CurrentMaxPrimaryKey); CurrentMaxPrimaryKey := chosenPK; - } + }; } } (*********************************************************************** - In the actual code, the Ferry class would have started all of the above. - This is unnecessary here as it is done via the Next definition in TLA+ - (automatically generated from PlusCal). Thus, the Ferry here is really - the Ferry that each application must implement: + In the model, the ferry code actually performs the changes while + Ghostferry is down, before running the reconciliation. This is obviously + not needed in the real code. - 1. Waiting until the DataITerator is finished copying data. - 2. Perform the cutover operation (setting the source to be read only). - 3. Instruct the BinlogStreamer to quit after streaming. + Each application only needs to implement these steps: + + 1. Perform reconciliation if applicable. + 2. Start all threads/goroutines. + 3. 
Waiting until the DataIterator is finished copying data. + 4. Perform the cutover operation (setting the source to be read only). + 5. Instruct the BinlogStreamer to quit after streaming. Note that setting the target binlog position and requesting binlog streaming to stop are two distinct steps. Making them one atomic step is not realistic unless we implement a lock. With two distinct steps, if the steps are reversed, a race condition will be present. ***********************************************************************) - fair process (ProcFerry = Ferry) { + fair process (ProcFerry = Ferry) + variables + currentChangeToPerform = 1, + changedRowIds; + { + ferry_reconcile: if (InitialTargetBinlogPos > 0) { + ferry_pickchanges: with (rowIds \in RowsChanged) { + changedRowIds := rowIds; + }; + + ferry_runchanges: while (currentChangeToPerform <= InitialTargetBinlogPos) { + with (pk = changedRowIds[currentChangeToPerform]) { + with (r \in PossibleRecords \ {SourceTable[pk]}) { + SourceBinlog := Append( + SourceBinlog, + [ + pk |-> pk, + oldr |-> SourceTable[pk], + newr |-> r + ] + ); + + SourceTable[pk] := r; + if (currentChangeToPerform <= BinlogOvercopied) { + TargetTable[pk] := r; + }; + currentChangeToPerform := currentChangeToPerform + 1; + }; + }; + }; + ChangesApplied := TRUE; \* Start the application inserting now + + ferry_reconc_loop: while (LastSuccessfulBinlogPos < InitialTargetBinlogPos) { + LastSuccessfulBinlogPos := LastSuccessfulBinlogPos + 1; + ferry_reconc_del: TargetTable[SourceBinlog[LastSuccessfulBinlogPos].pk] := NoRecordHere; + ferry_reconc_ins: if (SourceBinlog[LastSuccessfulBinlogPos].pk <= LastSuccessfulPrimaryKey) { + TargetTable[SourceBinlog[LastSuccessfulBinlogPos].pk] := SourceTable[SourceBinlog[LastSuccessfulBinlogPos].pk]; + }; + }; + }; + ferry_start: FerryThreadsStarted := TRUE; ferry_setro: await pc[TableIterator] = "Done"; ApplicationReadonly := TRUE; ferry_waitro: await pc[Application] = "Done"; @@ -394,60 +503,79 @@ PrimaryKeys 
== 1..TableCapacity ***************************************************************************) \* BEGIN TRANSLATION CONSTANT defaultInitValue -VARIABLES SourceTable, TargetTable, MaxPrimaryKey, CurrentMaxPrimaryKey, - SourceBinlog, ApplicationReadonly, TargetBinlogPos, - BinlogStreamingStopRequested, pc, lastSuccessfulPK, currentRow, - lastSuccessfulBinlogPos, currentBinlogEntry, oldRecord, newRecord, - chosenPK - -vars == << SourceTable, TargetTable, MaxPrimaryKey, CurrentMaxPrimaryKey, - SourceBinlog, ApplicationReadonly, TargetBinlogPos, - BinlogStreamingStopRequested, pc, lastSuccessfulPK, currentRow, - lastSuccessfulBinlogPos, currentBinlogEntry, oldRecord, newRecord, - chosenPK >> +VARIABLES InitialTable, SourceTable, TargetTable, SourceBinlog, + ApplicationReadonly, TargetBinlogPos, BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, LastSuccessfulBinlogPos, + MaxPrimaryKey, CurrentMaxPrimaryKey, pc, lastSuccessfulPK, + currentRow, currentBinlogEntry, oldRecord, newRecord, chosenPK, + currentChangeToPerform, changedRowIds + +vars == << InitialTable, SourceTable, TargetTable, SourceBinlog, + ApplicationReadonly, TargetBinlogPos, BinlogStreamingStopRequested, + ChangesApplied, FerryThreadsStarted, LastSuccessfulBinlogPos, + MaxPrimaryKey, CurrentMaxPrimaryKey, pc, lastSuccessfulPK, + currentRow, currentBinlogEntry, oldRecord, newRecord, chosenPK, + currentChangeToPerform, changedRowIds >> ProcSet == {TableIterator} \cup {BinlogStreamer} \cup {Application} \cup {Ferry} Init == (* Global variables *) - /\ SourceTable \in InitialTables - /\ TargetTable = [k \in PrimaryKeys |-> NoRecordHere] - /\ MaxPrimaryKey = (IF SourceTable[TableCapacity] # NoRecordHere - THEN TableCapacity - ELSE - IF \A i \in DOMAIN SourceTable : SourceTable[i] = NoRecordHere - THEN 0 - ELSE (CHOOSE i \in DOMAIN SourceTable : \E j \in DOMAIN SourceTable : SourceTable[i] # NoRecordHere /\ SourceTable[j] = NoRecordHere /\ i = j - 1)) - /\ CurrentMaxPrimaryKey = MaxPrimaryKey 
+        /\ InitialTable \in InitialTables
+        /\ SourceTable = InitialTable
+        /\ TargetTable = [k \in PrimaryKeys |-> IF k <= LastSuccessfulPrimaryKey + RowsOvercopied THEN SourceTable[k] ELSE NoRecordHere]
         /\ SourceBinlog = <<>>
         /\ ApplicationReadonly = FALSE
         /\ TargetBinlogPos = 0
         /\ BinlogStreamingStopRequested = FALSE
+        /\ ChangesApplied = FALSE
+        /\ FerryThreadsStarted = FALSE
+        /\ LastSuccessfulBinlogPos = 0
+        /\ MaxPrimaryKey = ComputeMaxPrimaryKey(SourceTable, TableCapacity)
+        /\ CurrentMaxPrimaryKey = MaxPrimaryKey
         (* Process ProcTableIterator *)
-        /\ lastSuccessfulPK = 0
+        /\ lastSuccessfulPK = LastSuccessfulPrimaryKey
         /\ currentRow = defaultInitValue
         (* Process ProcBinlogStreamer *)
-        /\ lastSuccessfulBinlogPos = 0
         /\ currentBinlogEntry = defaultInitValue
         (* Process ProcApplication *)
         /\ oldRecord = defaultInitValue
         /\ newRecord = defaultInitValue
         /\ chosenPK = defaultInitValue
-        /\ pc = [self \in ProcSet |-> CASE self = TableIterator -> "tblit_loop"
-                                        [] self = BinlogStreamer -> "binlog_loop"
-                                        [] self = Application -> "app_loop"
-                                        [] self = Ferry -> "ferry_setro"]
+        (* Process ProcFerry *)
+        /\ currentChangeToPerform = 1
+        /\ changedRowIds = defaultInitValue
+        /\ pc = [self \in ProcSet |-> CASE self = TableIterator -> "tblit_wait"
+                                        [] self = BinlogStreamer -> "binlog_wait"
+                                        [] self = Application -> "app_wait"
+                                        [] self = Ferry -> "ferry_reconcile"]
+
+tblit_wait == /\ pc[TableIterator] = "tblit_wait"
+              /\ FerryThreadsStarted = TRUE
+              /\ MaxPrimaryKey' = ComputeMaxPrimaryKey(SourceTable, TableCapacity)
+              /\ CurrentMaxPrimaryKey' = MaxPrimaryKey'
+              /\ pc' = [pc EXCEPT ![TableIterator] = "tblit_loop"]
+              /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                              SourceBinlog, ApplicationReadonly,
+                              TargetBinlogPos, BinlogStreamingStopRequested,
+                              ChangesApplied, FerryThreadsStarted,
+                              LastSuccessfulBinlogPos, lastSuccessfulPK,
+                              currentRow, currentBinlogEntry, oldRecord,
+                              newRecord, chosenPK, currentChangeToPerform,
+                              changedRowIds >>

 tblit_loop == /\ pc[TableIterator] = "tblit_loop"
               /\ IF lastSuccessfulPK < MaxPrimaryKey
                     THEN /\ pc' = [pc EXCEPT ![TableIterator] = "tblit_rw"]
                     ELSE /\ pc' = [pc EXCEPT ![TableIterator] = "Done"]
-              /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey,
-                              CurrentMaxPrimaryKey, SourceBinlog,
-                              ApplicationReadonly, TargetBinlogPos,
-                              BinlogStreamingStopRequested, lastSuccessfulPK,
-                              currentRow, lastSuccessfulBinlogPos,
-                              currentBinlogEntry, oldRecord, newRecord,
-                              chosenPK >>
+              /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                              SourceBinlog, ApplicationReadonly,
+                              TargetBinlogPos, BinlogStreamingStopRequested,
+                              ChangesApplied, FerryThreadsStarted,
+                              LastSuccessfulBinlogPos, MaxPrimaryKey,
+                              CurrentMaxPrimaryKey, lastSuccessfulPK,
+                              currentRow, currentBinlogEntry, oldRecord,
+                              newRecord, chosenPK, currentChangeToPerform,
+                              changedRowIds >>

 tblit_rw == /\ pc[TableIterator] = "tblit_rw"
             /\ currentRow' = SourceTable[lastSuccessfulPK + 1]
@@ -456,48 +584,70 @@ tblit_rw == /\ pc[TableIterator] = "tblit_rw"
                   ELSE /\ TRUE
                        /\ UNCHANGED TargetTable
             /\ pc' = [pc EXCEPT ![TableIterator] = "tblit_upkey"]
-            /\ UNCHANGED << SourceTable, MaxPrimaryKey, CurrentMaxPrimaryKey,
-                            SourceBinlog, ApplicationReadonly, TargetBinlogPos,
-                            BinlogStreamingStopRequested, lastSuccessfulPK,
-                            lastSuccessfulBinlogPos, currentBinlogEntry,
-                            oldRecord, newRecord, chosenPK >>
+            /\ UNCHANGED << InitialTable, SourceTable, SourceBinlog,
+                            ApplicationReadonly, TargetBinlogPos,
+                            BinlogStreamingStopRequested, ChangesApplied,
+                            FerryThreadsStarted, LastSuccessfulBinlogPos,
+                            MaxPrimaryKey, CurrentMaxPrimaryKey,
+                            lastSuccessfulPK, currentBinlogEntry, oldRecord,
+                            newRecord, chosenPK, currentChangeToPerform,
+                            changedRowIds >>

 tblit_upkey == /\ pc[TableIterator] = "tblit_upkey"
                /\ lastSuccessfulPK' = lastSuccessfulPK + 1
                /\ pc' = [pc EXCEPT ![TableIterator] = "tblit_loop"]
-               /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey,
-                               CurrentMaxPrimaryKey, SourceBinlog,
-                               ApplicationReadonly, TargetBinlogPos,
-                               BinlogStreamingStopRequested, currentRow,
-                               lastSuccessfulBinlogPos, currentBinlogEntry,
-                               oldRecord, newRecord, chosenPK >>
-
-ProcTableIterator == tblit_loop \/ tblit_rw \/ tblit_upkey
+               /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                               SourceBinlog, ApplicationReadonly,
+                               TargetBinlogPos, BinlogStreamingStopRequested,
+                               ChangesApplied, FerryThreadsStarted,
+                               LastSuccessfulBinlogPos, MaxPrimaryKey,
+                               CurrentMaxPrimaryKey, currentRow,
+                               currentBinlogEntry, oldRecord, newRecord,
+                               chosenPK, currentChangeToPerform, changedRowIds >>
+
+ProcTableIterator == tblit_wait \/ tblit_loop \/ tblit_rw \/ tblit_upkey
+
+binlog_wait == /\ pc[BinlogStreamer] = "binlog_wait"
+               /\ FerryThreadsStarted = TRUE
+               /\ pc' = [pc EXCEPT ![BinlogStreamer] = "binlog_loop"]
+               /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                               SourceBinlog, ApplicationReadonly,
+                               TargetBinlogPos, BinlogStreamingStopRequested,
+                               ChangesApplied, FerryThreadsStarted,
+                               LastSuccessfulBinlogPos, MaxPrimaryKey,
+                               CurrentMaxPrimaryKey, lastSuccessfulPK,
+                               currentRow, currentBinlogEntry, oldRecord,
+                               newRecord, chosenPK, currentChangeToPerform,
+                               changedRowIds >>

 binlog_loop == /\ pc[BinlogStreamer] = "binlog_loop"
-               /\ IF BinlogStreamingStopRequested = FALSE \/ (BinlogStreamingStopRequested = TRUE /\ lastSuccessfulBinlogPos < TargetBinlogPos)
+               /\ IF BinlogStreamingStopRequested = FALSE \/ (BinlogStreamingStopRequested = TRUE /\ LastSuccessfulBinlogPos < TargetBinlogPos)
                      THEN /\ pc' = [pc EXCEPT ![BinlogStreamer] = "binlog_read"]
                      ELSE /\ pc' = [pc EXCEPT ![BinlogStreamer] = "Done"]
-               /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey,
-                               CurrentMaxPrimaryKey, SourceBinlog,
-                               ApplicationReadonly, TargetBinlogPos,
-                               BinlogStreamingStopRequested, lastSuccessfulPK,
-                               currentRow, lastSuccessfulBinlogPos,
-                               currentBinlogEntry, oldRecord, newRecord,
-                               chosenPK >>
+               /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                               SourceBinlog, ApplicationReadonly,
+                               TargetBinlogPos, BinlogStreamingStopRequested,
+                               ChangesApplied, FerryThreadsStarted,
+                               LastSuccessfulBinlogPos, MaxPrimaryKey,
+                               CurrentMaxPrimaryKey, lastSuccessfulPK,
+                               currentRow, currentBinlogEntry, oldRecord,
+                               newRecord, chosenPK, currentChangeToPerform,
+                               changedRowIds >>

 binlog_read == /\ pc[BinlogStreamer] = "binlog_read"
-               /\ IF lastSuccessfulBinlogPos < Len(SourceBinlog)
-                     THEN /\ currentBinlogEntry' = SourceBinlog[lastSuccessfulBinlogPos + 1]
+               /\ IF LastSuccessfulBinlogPos < Len(SourceBinlog)
+                     THEN /\ currentBinlogEntry' = SourceBinlog[LastSuccessfulBinlogPos + 1]
                           /\ pc' = [pc EXCEPT ![BinlogStreamer] = "binlog_write"]
                      ELSE /\ pc' = [pc EXCEPT ![BinlogStreamer] = "binlog_loop"]
                           /\ UNCHANGED currentBinlogEntry
-               /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey,
-                               CurrentMaxPrimaryKey, SourceBinlog,
-                               ApplicationReadonly, TargetBinlogPos,
-                               BinlogStreamingStopRequested, lastSuccessfulPK,
-                               currentRow, lastSuccessfulBinlogPos, oldRecord,
-                               newRecord, chosenPK >>
+               /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                               SourceBinlog, ApplicationReadonly,
+                               TargetBinlogPos, BinlogStreamingStopRequested,
+                               ChangesApplied, FerryThreadsStarted,
+                               LastSuccessfulBinlogPos, MaxPrimaryKey,
+                               CurrentMaxPrimaryKey, lastSuccessfulPK,
+                               currentRow, oldRecord, newRecord, chosenPK,
+                               currentChangeToPerform, changedRowIds >>

 binlog_write == /\ pc[BinlogStreamer] = "binlog_write"
                 /\ IF TargetTable[currentBinlogEntry.pk] = currentBinlogEntry.oldr
@@ -505,37 +655,58 @@ binlog_write == /\ pc[BinlogStreamer] = "binlog_write"
                       ELSE /\ TRUE
                            /\ UNCHANGED TargetTable
                 /\ pc' = [pc EXCEPT ![BinlogStreamer] = "binlog_upkey"]
-                /\ UNCHANGED << SourceTable, MaxPrimaryKey,
-                                CurrentMaxPrimaryKey, SourceBinlog,
-                                ApplicationReadonly, TargetBinlogPos,
-                                BinlogStreamingStopRequested, lastSuccessfulPK,
-                                currentRow, lastSuccessfulBinlogPos,
-                                currentBinlogEntry, oldRecord, newRecord,
-                                chosenPK >>
+                /\ UNCHANGED << InitialTable, SourceTable, SourceBinlog,
+                                ApplicationReadonly, TargetBinlogPos,
+                                BinlogStreamingStopRequested, ChangesApplied,
+                                FerryThreadsStarted, LastSuccessfulBinlogPos,
+                                MaxPrimaryKey, CurrentMaxPrimaryKey,
+                                lastSuccessfulPK, currentRow,
+                                currentBinlogEntry, oldRecord, newRecord,
+                                chosenPK, currentChangeToPerform,
+                                changedRowIds >>

 binlog_upkey == /\ pc[BinlogStreamer] = "binlog_upkey"
-                /\ lastSuccessfulBinlogPos' = lastSuccessfulBinlogPos + 1
+                /\ LastSuccessfulBinlogPos' = LastSuccessfulBinlogPos + 1
                 /\ pc' = [pc EXCEPT ![BinlogStreamer] = "binlog_loop"]
-                /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey,
-                                CurrentMaxPrimaryKey, SourceBinlog,
-                                ApplicationReadonly, TargetBinlogPos,
-                                BinlogStreamingStopRequested, lastSuccessfulPK,
-                                currentRow, currentBinlogEntry, oldRecord,
-                                newRecord, chosenPK >>
-
-ProcBinlogStreamer == binlog_loop \/ binlog_read \/ binlog_write
-                         \/ binlog_upkey
+                /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                                SourceBinlog, ApplicationReadonly,
+                                TargetBinlogPos, BinlogStreamingStopRequested,
+                                ChangesApplied, FerryThreadsStarted,
+                                MaxPrimaryKey, CurrentMaxPrimaryKey,
+                                lastSuccessfulPK, currentRow,
+                                currentBinlogEntry, oldRecord, newRecord,
+                                chosenPK, currentChangeToPerform,
+                                changedRowIds >>
+
+ProcBinlogStreamer == binlog_wait \/ binlog_loop \/ binlog_read
+                         \/ binlog_write \/ binlog_upkey
+
+app_wait == /\ pc[Application] = "app_wait"
+            /\ IF InitialTargetBinlogPos > 0
+                  THEN /\ ChangesApplied = TRUE
+                  ELSE /\ TRUE
+            /\ pc' = [pc EXCEPT ![Application] = "app_loop"]
+            /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                            SourceBinlog, ApplicationReadonly, TargetBinlogPos,
+                            BinlogStreamingStopRequested, ChangesApplied,
+                            FerryThreadsStarted, LastSuccessfulBinlogPos,
+                            MaxPrimaryKey, CurrentMaxPrimaryKey,
+                            lastSuccessfulPK, currentRow, currentBinlogEntry,
+                            oldRecord, newRecord, chosenPK,
+                            currentChangeToPerform, changedRowIds >>

 app_loop == /\ pc[Application] = "app_loop"
             /\ IF ApplicationReadonly = FALSE
                   THEN /\ pc' = [pc EXCEPT ![Application] = "app_write"]
                   ELSE /\ pc' = [pc EXCEPT ![Application] = "Done"]
-            /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey,
-                            CurrentMaxPrimaryKey, SourceBinlog,
-                            ApplicationReadonly, TargetBinlogPos,
-                            BinlogStreamingStopRequested, lastSuccessfulPK,
-                            currentRow, lastSuccessfulBinlogPos,
-                            currentBinlogEntry, oldRecord, newRecord, chosenPK >>
+            /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                            SourceBinlog, ApplicationReadonly, TargetBinlogPos,
+                            BinlogStreamingStopRequested, ChangesApplied,
+                            FerryThreadsStarted, LastSuccessfulBinlogPos,
+                            MaxPrimaryKey, CurrentMaxPrimaryKey,
+                            lastSuccessfulPK, currentRow, currentBinlogEntry,
+                            oldRecord, newRecord, chosenPK,
+                            currentChangeToPerform, changedRowIds >>

 app_write == /\ pc[Application] = "app_write"
              /\ \E pk \in 1..SetMin({TableCapacity, CurrentMaxPrimaryKey + 1}):
@@ -553,65 +724,199 @@ app_write == /\ pc[Application] = "app_write"
                                        )
                   /\ SourceTable' = [SourceTable EXCEPT ![chosenPK'] = newRecord']
                   /\ IF oldRecord' = NoRecordHere /\ chosenPK' > CurrentMaxPrimaryKey
-                        THEN /\ Assert((chosenPK' - 1 = CurrentMaxPrimaryKey),
-                                       "Failure of assertion at line 365, column 21.")
+                        THEN /\ Assert((chosenPK' - 1 = CurrentMaxPrimaryKey),
+                                       "Failure of assertion at line 430, column 21.")
                              /\ CurrentMaxPrimaryKey' = chosenPK'
                         ELSE /\ TRUE
                              /\ UNCHANGED CurrentMaxPrimaryKey
              /\ pc' = [pc EXCEPT ![Application] = "app_loop"]
-             /\ UNCHANGED << TargetTable, MaxPrimaryKey, ApplicationReadonly,
-                             TargetBinlogPos, BinlogStreamingStopRequested,
-                             lastSuccessfulPK, currentRow,
-                             lastSuccessfulBinlogPos, currentBinlogEntry >>
-
-ProcApplication == app_loop \/ app_write
+             /\ UNCHANGED << InitialTable, TargetTable, ApplicationReadonly,
+                             TargetBinlogPos, BinlogStreamingStopRequested,
+                             ChangesApplied, FerryThreadsStarted,
+                             LastSuccessfulBinlogPos, MaxPrimaryKey,
+                             lastSuccessfulPK, currentRow, currentBinlogEntry,
+                             currentChangeToPerform, changedRowIds >>
+
+ProcApplication == app_wait \/ app_loop \/ app_write
+
+ferry_reconcile == /\ pc[Ferry] = "ferry_reconcile"
+                   /\ IF InitialTargetBinlogPos > 0
+                         THEN /\ pc' = [pc EXCEPT ![Ferry] = "ferry_pickchanges"]
+                         ELSE /\ pc' = [pc EXCEPT ![Ferry] = "ferry_start"]
+                   /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                                   SourceBinlog, ApplicationReadonly,
+                                   TargetBinlogPos,
+                                   BinlogStreamingStopRequested,
+                                   ChangesApplied, FerryThreadsStarted,
+                                   LastSuccessfulBinlogPos, MaxPrimaryKey,
+                                   CurrentMaxPrimaryKey, lastSuccessfulPK,
+                                   currentRow, currentBinlogEntry, oldRecord,
+                                   newRecord, chosenPK, currentChangeToPerform,
+                                   changedRowIds >>
+
+ferry_pickchanges == /\ pc[Ferry] = "ferry_pickchanges"
+                     /\ \E rowIds \in RowsChanged:
+                          changedRowIds' = rowIds
+                     /\ pc' = [pc EXCEPT ![Ferry] = "ferry_runchanges"]
+                     /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                                     SourceBinlog, ApplicationReadonly,
+                                     TargetBinlogPos,
+                                     BinlogStreamingStopRequested,
+                                     ChangesApplied, FerryThreadsStarted,
+                                     LastSuccessfulBinlogPos, MaxPrimaryKey,
+                                     CurrentMaxPrimaryKey, lastSuccessfulPK,
+                                     currentRow, currentBinlogEntry, oldRecord,
+                                     newRecord, chosenPK,
+                                     currentChangeToPerform >>
+
+ferry_runchanges == /\ pc[Ferry] = "ferry_runchanges"
+                    /\ IF currentChangeToPerform <= InitialTargetBinlogPos
+                          THEN /\ LET pk == changedRowIds[currentChangeToPerform] IN
+                                    \E r \in PossibleRecords \ {SourceTable[pk]}:
+                                      /\ SourceBinlog' = Append(
+                                                           SourceBinlog,
+                                                           [
+                                                             pk |-> pk,
+                                                             oldr |-> SourceTable[pk],
+                                                             newr |-> r
+                                                           ]
+                                                         )
+                                      /\ SourceTable' = [SourceTable EXCEPT ![pk] = r]
+                                      /\ IF currentChangeToPerform <= BinlogOvercopied
+                                            THEN /\ TargetTable' = [TargetTable EXCEPT ![pk] = r]
+                                            ELSE /\ TRUE
+                                                 /\ UNCHANGED TargetTable
+                               /\ currentChangeToPerform' = currentChangeToPerform + 1
+                               /\ pc' = [pc EXCEPT ![Ferry] = "ferry_runchanges"]
+                               /\ UNCHANGED ChangesApplied
+                          ELSE /\ ChangesApplied' = TRUE
+                               /\ pc' = [pc EXCEPT ![Ferry] = "ferry_reconc_loop"]
+                               /\ UNCHANGED << SourceTable, TargetTable,
+                                               SourceBinlog,
+                                               currentChangeToPerform >>
+                    /\ UNCHANGED << InitialTable, ApplicationReadonly,
+                                    TargetBinlogPos,
+                                    BinlogStreamingStopRequested,
+                                    FerryThreadsStarted,
+                                    LastSuccessfulBinlogPos, MaxPrimaryKey,
+                                    CurrentMaxPrimaryKey, lastSuccessfulPK,
+                                    currentRow, currentBinlogEntry, oldRecord,
+                                    newRecord, chosenPK, changedRowIds >>
+
+ferry_reconc_loop == /\ pc[Ferry] = "ferry_reconc_loop"
+                     /\ IF LastSuccessfulBinlogPos < InitialTargetBinlogPos
+                           THEN /\ LastSuccessfulBinlogPos' = LastSuccessfulBinlogPos + 1
+                                /\ pc' = [pc EXCEPT ![Ferry] = "ferry_reconc_del"]
+                           ELSE /\ pc' = [pc EXCEPT ![Ferry] = "ferry_start"]
+                                /\ UNCHANGED LastSuccessfulBinlogPos
+                     /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                                     SourceBinlog, ApplicationReadonly,
+                                     TargetBinlogPos,
+                                     BinlogStreamingStopRequested,
+                                     ChangesApplied, FerryThreadsStarted,
+                                     MaxPrimaryKey, CurrentMaxPrimaryKey,
+                                     lastSuccessfulPK, currentRow,
+                                     currentBinlogEntry, oldRecord, newRecord,
+                                     chosenPK, currentChangeToPerform,
+                                     changedRowIds >>
+
+ferry_reconc_del == /\ pc[Ferry] = "ferry_reconc_del"
+                    /\ TargetTable' = [TargetTable EXCEPT ![SourceBinlog[LastSuccessfulBinlogPos].pk] = NoRecordHere]
+                    /\ pc' = [pc EXCEPT ![Ferry] = "ferry_reconc_ins"]
+                    /\ UNCHANGED << InitialTable, SourceTable, SourceBinlog,
+                                    ApplicationReadonly, TargetBinlogPos,
+                                    BinlogStreamingStopRequested,
+                                    ChangesApplied, FerryThreadsStarted,
+                                    LastSuccessfulBinlogPos, MaxPrimaryKey,
+                                    CurrentMaxPrimaryKey, lastSuccessfulPK,
+                                    currentRow, currentBinlogEntry, oldRecord,
+                                    newRecord, chosenPK,
+                                    currentChangeToPerform, changedRowIds >>
+
+ferry_reconc_ins == /\ pc[Ferry] = "ferry_reconc_ins"
+                    /\ IF SourceBinlog[LastSuccessfulBinlogPos].pk <= LastSuccessfulPrimaryKey
+                          THEN /\ TargetTable' = [TargetTable EXCEPT ![SourceBinlog[LastSuccessfulBinlogPos].pk] = SourceTable[SourceBinlog[LastSuccessfulBinlogPos].pk]]
+                          ELSE /\ TRUE
+                               /\ UNCHANGED TargetTable
+                    /\ pc' = [pc EXCEPT ![Ferry] = "ferry_reconc_loop"]
+                    /\ UNCHANGED << InitialTable, SourceTable, SourceBinlog,
+                                    ApplicationReadonly, TargetBinlogPos,
+                                    BinlogStreamingStopRequested,
+                                    ChangesApplied, FerryThreadsStarted,
+                                    LastSuccessfulBinlogPos, MaxPrimaryKey,
+                                    CurrentMaxPrimaryKey, lastSuccessfulPK,
+                                    currentRow, currentBinlogEntry, oldRecord,
+                                    newRecord, chosenPK,
+                                    currentChangeToPerform, changedRowIds >>
+
+ferry_start == /\ pc[Ferry] = "ferry_start"
+               /\ FerryThreadsStarted' = TRUE
+               /\ pc' = [pc EXCEPT ![Ferry] = "ferry_setro"]
+               /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                               SourceBinlog, ApplicationReadonly,
+                               TargetBinlogPos, BinlogStreamingStopRequested,
+                               ChangesApplied, LastSuccessfulBinlogPos,
+                               MaxPrimaryKey, CurrentMaxPrimaryKey,
+                               lastSuccessfulPK, currentRow,
+                               currentBinlogEntry, oldRecord, newRecord,
+                               chosenPK, currentChangeToPerform, changedRowIds >>

 ferry_setro == /\ pc[Ferry] = "ferry_setro"
                /\ pc[TableIterator] = "Done"
                /\ ApplicationReadonly' = TRUE
                /\ pc' = [pc EXCEPT ![Ferry] = "ferry_waitro"]
-               /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey,
-                               CurrentMaxPrimaryKey, SourceBinlog,
-                               TargetBinlogPos, BinlogStreamingStopRequested,
-                               lastSuccessfulPK, currentRow,
-                               lastSuccessfulBinlogPos, currentBinlogEntry,
-                               oldRecord, newRecord, chosenPK >>
+               /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                               SourceBinlog, TargetBinlogPos,
+                               BinlogStreamingStopRequested, ChangesApplied,
+                               FerryThreadsStarted, LastSuccessfulBinlogPos,
+                               MaxPrimaryKey, CurrentMaxPrimaryKey,
+                               lastSuccessfulPK, currentRow,
+                               currentBinlogEntry, oldRecord, newRecord,
+                               chosenPK, currentChangeToPerform, changedRowIds >>

 ferry_waitro == /\ pc[Ferry] = "ferry_waitro"
                 /\ pc[Application] = "Done"
                 /\ pc' = [pc EXCEPT ![Ferry] = "ferry_binlogpos"]
-                /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey,
-                                CurrentMaxPrimaryKey, SourceBinlog,
-                                ApplicationReadonly, TargetBinlogPos,
-                                BinlogStreamingStopRequested, lastSuccessfulPK,
-                                currentRow, lastSuccessfulBinlogPos,
-                                currentBinlogEntry, oldRecord, newRecord,
-                                chosenPK >>
+                /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                                SourceBinlog, ApplicationReadonly,
+                                TargetBinlogPos, BinlogStreamingStopRequested,
+                                ChangesApplied, FerryThreadsStarted,
+                                LastSuccessfulBinlogPos, MaxPrimaryKey,
+                                CurrentMaxPrimaryKey, lastSuccessfulPK,
+                                currentRow, currentBinlogEntry, oldRecord,
+                                newRecord, chosenPK, currentChangeToPerform,
+                                changedRowIds >>

 ferry_binlogpos == /\ pc[Ferry] = "ferry_binlogpos"
                    /\ TargetBinlogPos' = Len(SourceBinlog)
                    /\ pc' = [pc EXCEPT ![Ferry] = "ferry_binlogstop"]
-                   /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey,
-                                   CurrentMaxPrimaryKey, SourceBinlog,
-                                   ApplicationReadonly,
-                                   BinlogStreamingStopRequested,
-                                   lastSuccessfulPK, currentRow,
-                                   lastSuccessfulBinlogPos, currentBinlogEntry,
-                                   oldRecord, newRecord, chosenPK >>
+                   /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                                   SourceBinlog, ApplicationReadonly,
+                                   BinlogStreamingStopRequested,
+                                   ChangesApplied, FerryThreadsStarted,
+                                   LastSuccessfulBinlogPos, MaxPrimaryKey,
+                                   CurrentMaxPrimaryKey, lastSuccessfulPK,
+                                   currentRow, currentBinlogEntry, oldRecord,
+                                   newRecord, chosenPK, currentChangeToPerform,
+                                   changedRowIds >>

 ferry_binlogstop == /\ pc[Ferry] = "ferry_binlogstop"
                     /\ BinlogStreamingStopRequested' = TRUE
                     /\ pc' = [pc EXCEPT ![Ferry] = "Done"]
-                    /\ UNCHANGED << SourceTable, TargetTable, MaxPrimaryKey,
-                                    CurrentMaxPrimaryKey, SourceBinlog,
-                                    ApplicationReadonly, TargetBinlogPos,
-                                    lastSuccessfulPK, currentRow,
-                                    lastSuccessfulBinlogPos,
-                                    currentBinlogEntry, oldRecord, newRecord,
-                                    chosenPK >>
-
-ProcFerry == ferry_setro \/ ferry_waitro \/ ferry_binlogpos
-                \/ ferry_binlogstop
+                    /\ UNCHANGED << InitialTable, SourceTable, TargetTable,
+                                    SourceBinlog, ApplicationReadonly,
+                                    TargetBinlogPos, ChangesApplied,
+                                    FerryThreadsStarted,
+                                    LastSuccessfulBinlogPos, MaxPrimaryKey,
+                                    CurrentMaxPrimaryKey, lastSuccessfulPK,
+                                    currentRow, currentBinlogEntry, oldRecord,
+                                    newRecord, chosenPK,
+                                    currentChangeToPerform, changedRowIds >>
+
+ProcFerry == ferry_reconcile \/ ferry_pickchanges \/ ferry_runchanges
+                \/ ferry_reconc_loop \/ ferry_reconc_del
+                \/ ferry_reconc_ins \/ ferry_start \/ ferry_setro
+                \/ ferry_waitro \/ ferry_binlogpos \/ ferry_binlogstop

 Next == ProcTableIterator \/ ProcBinlogStreamer \/ ProcApplication \/ ProcFerry
@@ -631,7 +936,7 @@ Termination == <>(\A self \in ProcSet: pc[self] = "Done")
 \* Safety Constraints
 \* ==================

-SourceTargetEquality == (\A self \in ProcSet: pc[self] = "Done") => (SourceTable = TargetTable) /\ PrintT(<<"Source", SourceTable, "Target", TargetTable>>)
+SourceTargetEquality == (\A self \in ProcSet: pc[self] = "Done") => (SourceTable = TargetTable)

 \* Action Constraints
 \* ==================
@@ -647,5 +952,5 @@ BinlogSizeActionConstraint == Len(SourceBinlog) <= MaxBinlogSize

 =============================================================================
 \* Modification History
-\* Last modified Mon Jun 25 16:25:19 EDT 2018 by shuhao
+\* Last modified Mon Jul 09 10:43:17 EDT 2018 by shuhao
 \* Created Thu Jan 18 11:35:40 EST 2018 by shuhao
diff --git a/tlaplus/ghostferry.toolbox/ghostferry___GhostferryPrimaryModel.launch b/tlaplus/ghostferry.toolbox/ghostferry___GhostferryPrimaryModel.launch
index 28a4afc69..1c0cca528 100644
--- a/tlaplus/ghostferry.toolbox/ghostferry___GhostferryPrimaryModel.launch
+++ b/tlaplus/ghostferry.toolbox/ghostferry___GhostferryPrimaryModel.launch
@@ -19,11 +19,12 @@
- - + + +
@@ -40,6 +41,10 @@
+ + + +
diff --git a/tlaplus/ghostferry.toolbox/ghostferry___GhostferryWithOverCopiedReconciliationModel.launch b/tlaplus/ghostferry.toolbox/ghostferry___GhostferryWithOverCopiedReconciliationModel.launch
new file mode 100644
index 000000000..1d1448043
--- /dev/null
+++ b/tlaplus/ghostferry.toolbox/ghostferry___GhostferryWithOverCopiedReconciliationModel.launch
@@ -0,0 +1,61 @@
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
diff --git a/tlaplus/ghostferry.toolbox/ghostferry___GhostferryWithReconciliationModel.launch b/tlaplus/ghostferry.toolbox/ghostferry___GhostferryWithReconciliationModel.launch
new file mode 100644
index 000000000..b08d2fba5
--- /dev/null
+++ b/tlaplus/ghostferry.toolbox/ghostferry___GhostferryWithReconciliationModel.launch
@@ -0,0 +1,61 @@
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +