# Architecture


## Overview

ecChronos is built to continuously repair data in the background. Each ecChronos instance keeps track of the repairs of a single Cassandra node. A lock table in Cassandra makes sure that only a subset of repairs run at any one time. Repairs can be configured to run only during certain time periods, or not at all. Settings for backpressure are provided to spread repair out over the interval, while alarms signal when a job has not run for longer than expected.

```mermaid
graph TB
    c[ecChronos Instance]---C((Cassandra Node));
    a[ecChronos Instance]---A((Cassandra Node));
    A((Cassandra Node))===B((Cassandra Node));
    A((Cassandra Node))===C((Cassandra Node));
    C((Cassandra Node))===D((Cassandra Node))
    b[ecChronos Instance]---B((Cassandra Node))
    B((Cassandra Node))===D((Cassandra Node))
    D((Cassandra Node))---d[ecChronos Instance]
```

Figure 1: ecChronos and Cassandra Nodes.

## Concepts

### Leases

```mermaid
flowchart TB
  A[Create Lease] -->|Failed| b[Sleep]
  b[Sleep] --> A[Create Lease]
  A[Create Lease] -->|Succeeded| c[Get the Lock]
  c[Get the Lock] --> D[Periodically Renew Lease]
  D[Periodically Renew Lease] -->|Failed| A[Create Lease]
  D[Periodically Renew Lease] -->|Succeeded| D[Periodically Renew Lease]
```

Figure 2: Typical Lease Election.

In order to perform distributed scheduling, ecChronos relies on two things: deterministic priorities and distributed leases. Deterministic priorities means that all nodes use the same algorithm to decide how important the local work is. Because the priorities are deterministic, they can be compared between nodes to achieve fair scheduling. Each time a node wants to perform some work it needs to acquire a lease, which should typically go to the node with the highest priority. The lease entitles the node to conduct repair during a specific time frame. Within this duration, the node assesses the data, ensuring that all replicas are updated and consistent. The lease also prevents multiple nodes from concurrently initiating repairs on the same data, thereby mitigating potential consistency issues and cluster overload. Once the node completes the repair, the lease is released, enabling other nodes to request and carry out their own repairs as needed. This helps distribute the repair load efficiently within the cluster [1].

The default implementation of leases in ecChronos is based on CAS (Compare-And-Set) with Apache Cassandra as the backend. When the local node tries to obtain a lease it first announces its own priority and checks what other nodes have announced. If the local node has the highest priority it will try to obtain the lease. The announcement is done to avoid node starvation and to promote the highest prioritized work in the cluster.

The leases are created with a TTL of 10 minutes to avoid locking in case of failure. As some jobs might take more than 10 minutes to run, the lease is renewed every minute until the job finishes.

```mermaid
flowchart LR
    A(((Local Node))) --> a[DeclarePriority]
    B(((Other Node))) --> b[DeclarePriority]
    a[DeclarePriority] --> CheckOtherNodePriority
    b[DeclarePriority] --> CheckOtherNodePriority
    CheckOtherNodePriority --> ObtainTheLease
```

Figure 3: Compare-And-Set.
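
To make the lease flow concrete, below is a minimal sketch of CAS-based lease handling using Cassandra lightweight transactions (`IF NOT EXISTS` and a conditional `UPDATE`) with the DataStax Java driver. The keyspace, table and column names (`ecchronos.lock`, `resource`, `node`, `priority`) and the class itself are assumptions for illustration, not the actual ecChronos implementation.

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import java.util.UUID;

public class LeaseSketch
{
    private static final int LEASE_TTL_SECONDS = 600; // 10 minute lease, renewed every minute

    private final CqlSession session;
    private final UUID localNode;

    public LeaseSketch(CqlSession session, UUID localNode)
    {
        this.session = session;
        this.localNode = localNode;
    }

    /**
     * Try to take the lease for a resource using compare-and-set (IF NOT EXISTS).
     * Returns true only if no other node currently holds the lease.
     */
    public boolean tryAcquire(String resource, int priority)
    {
        ResultSet result = session.execute(String.format(
                "INSERT INTO ecchronos.lock (resource, node, priority) VALUES ('%s', %s, %d) IF NOT EXISTS USING TTL %d",
                resource, localNode, priority, LEASE_TTL_SECONDS));
        return result.wasApplied();
    }

    /**
     * Renew the lease before the TTL expires, but only if this node still owns it.
     */
    public boolean renew(String resource)
    {
        ResultSet result = session.execute(String.format(
                "UPDATE ecchronos.lock USING TTL %d SET node = %s WHERE resource = '%s' IF node = %s",
                LEASE_TTL_SECONDS, localNode, resource, localNode));
        return result.wasApplied();
    }
}
```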

### Scheduling flow

The scheduling in ecChronos is handled by the schedule manager. The schedule manager is responsible for keeping track of the local work queue, checking with run policies whether a job should run, and acquiring the leases for jobs before running them.

#### Scheduled jobs

The work a node needs to perform is split into different jobs. The most common example of a job is keeping a single table repaired from the local node's point of view. The priority of a job is calculated based on the last time the table was repaired. Repairs performed outside of the local ecChronos instance are counted towards the progress.

When the job is executed the work is split into one or more tasks. In the case of repairs one task could correspond to the repair of one virtual node. When all virtual nodes are repaired the job is considered to be finished and will be added back to the work queue.

As repair is a resource intensive operation, the leases are used to make sure that a node is only part of one repair at a time. It is configurable whether the leases should be on a data center level or on a node level.
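
As a rough illustration of deterministic priorities, the sketch below derives a job priority from the time elapsed since the last repair relative to the configured repair interval. The method and the exact formula are assumptions for the example; they are not the precise calculation used by ecChronos.

```java
import java.time.Duration;
import java.time.Instant;

public final class JobPrioritySketch
{
    private JobPrioritySketch()
    {
    }

    /**
     * Deterministic priority: the longer a table has waited past its repair interval,
     * the higher the priority. Every node computes the same value for the same input,
     * so priorities can be compared fairly across the cluster.
     *
     * @return hours overdue, or -1 if the table is not yet eligible for repair
     */
    public static long priority(Instant lastRepairedAt, Duration repairInterval, Instant now)
    {
        Instant eligibleAt = lastRepairedAt.plus(repairInterval);
        if (now.isBefore(eligibleAt))
        {
            return -1; // not runnable yet
        }
        return Duration.between(eligibleAt, now).toHours() + 1;
    }

    public static void main(String[] args)
    {
        Instant now = Instant.now();
        // Repaired 8 days ago with a 7 day interval -> roughly 24 hours overdue
        System.out.println(priority(now.minus(Duration.ofDays(8)), Duration.ofDays(7), now));
    }
}
```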

#### Run policies

Run policies are used to prevent jobs from running. Before a job is started the run policies are consulted to see if it is appropriate for the job to run at this time.

The default implementation is time based and reads its configuration from a table in Apache Cassandra. For more information about the time based run policy, refer to Time based run policy.
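
The general shape of a time-based run policy could look like the sketch below, where a job is rejected while the current time falls inside a configured window. The class and its fields are hypothetical; in ecChronos the windows are read from a table in Apache Cassandra as described above.

```java
import java.time.LocalTime;

public class TimeBasedRunPolicySketch
{
    // Hypothetical rejection window, normally read from a Cassandra table
    private final LocalTime rejectStart;
    private final LocalTime rejectEnd;

    public TimeBasedRunPolicySketch(LocalTime rejectStart, LocalTime rejectEnd)
    {
        this.rejectStart = rejectStart;
        this.rejectEnd = rejectEnd;
    }

    /**
     * @return true if the job is allowed to run at the given time
     */
    public boolean shouldRun(LocalTime now)
    {
        boolean inWindow;
        if (rejectStart.isBefore(rejectEnd))
        {
            inWindow = !now.isBefore(rejectStart) && now.isBefore(rejectEnd);
        }
        else
        {
            // Window wraps around midnight, e.g. 22:00 - 06:00
            inWindow = !now.isBefore(rejectStart) || now.isBefore(rejectEnd);
        }
        return !inWindow;
    }

    public static void main(String[] args)
    {
        TimeBasedRunPolicySketch policy =
                new TimeBasedRunPolicySketch(LocalTime.of(8, 0), LocalTime.of(17, 0));
        System.out.println(policy.shouldRun(LocalTime.of(12, 0))); // false, inside the rejection window
        System.out.println(policy.shouldRun(LocalTime.of(20, 0))); // true
    }
}
```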

## Repair scheduling

The repair scheduling begins by providing a RepairConfiguration to the RepairScheduler. The repair scheduler then creates a TableRepairJob or IncrementalRepairJob and schedules it using the ScheduleManager [2].

```mermaid
stateDiagram
    direction LR
    state RepairScheduler {
      direction LR
      RepairConfiguration1
      RepairConfiguration2
      RepairConfiguration3
    }
    state SchedulerManager {
      [...]
    }
    state ShouldJobRunCondition <<choice>>
    RepairScheduler --> CreateRepairJobs
    CreateRepairJobs --> SchedulerManager
    SchedulerManager --> RefreshPriorities
    RefreshPriorities --> PickJobWithHighestPriority
    PickJobWithHighestPriority --> ShouldJobRun
    ShouldJobRun --> ShouldJobRunCondition
    ShouldJobRunCondition --> PickJobWithHighestPriority: No
    ShouldJobRunCondition --> CreateJobTasks: Yes
    CreateJobTasks --> ExecuteTasks
    ExecuteTasks --> SchedulerManager
```

Figure 4: Scheduling flow.
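
One iteration of the flow in Figure 4 could be sketched roughly as below. The interfaces (ScheduledJob, RunPolicy, LockFactory) and their methods are invented for the illustration and do not mirror the real class signatures.

```java
import java.util.Comparator;
import java.util.List;

public final class SchedulingFlowSketch
{
    interface Task
    {
        void execute() throws Exception;
    }

    interface ScheduledJob
    {
        long priority();          // deterministic, comparable across nodes
        List<Task> createTasks(); // e.g. one task per vnode/token range
    }

    interface RunPolicy
    {
        boolean shouldRun(ScheduledJob job);
    }

    interface LockFactory
    {
        AutoCloseable tryLock(ScheduledJob job) throws Exception; // distributed lease
    }

    /** One pass over the local work queue, mirroring Figure 4. */
    static void runOnce(List<ScheduledJob> jobs, RunPolicy runPolicy, LockFactory lockFactory)
    {
        jobs.stream()
                .sorted(Comparator.comparingLong(ScheduledJob::priority).reversed())
                .filter(runPolicy::shouldRun) // skip jobs the run policies reject
                .findFirst()
                .ifPresent(job -> {
                    try (AutoCloseable lease = lockFactory.tryLock(job))
                    {
                        for (Task task : job.createTasks())
                        {
                            task.execute();
                        }
                    }
                    catch (Exception e)
                    {
                        // lease could not be acquired or a task failed; the job stays in the queue
                    }
                });
    }

    private SchedulingFlowSketch()
    {
    }
}
```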

### Vnode repairs

Each TableRepairJob keeps a representation of the repair history in the RepairState. This information is used to determine when the table is eligible for the next repair and when to send alarms if necessary.

When a table is able to run repair, the RepairState calculates the next tokens to repair and collects them into an ordered list of ReplicaRepairGroups. The calculation is performed by the VnodeRepairGroupFactory by default. The TableRepairJob then generates RepairGroups, which are snapshots of the state as it was when calculated. When a RepairGroup is executed it generates one VnodeRepairTask per token range to repair. The VnodeRepairTask is the class that performs the repair.
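
A simplified view of how a repair group expands into per-range tasks is sketched below; the record types are stand-ins for the real classes mentioned above.

```java
import java.util.List;
import java.util.stream.Collectors;

public final class VnodeRepairSketch
{
    /** A single vnode token range owned by the local node. */
    record TokenRange(long start, long end)
    {
    }

    /** Snapshot of the token ranges selected for one repair group. */
    record ReplicaRepairGroup(List<TokenRange> ranges)
    {
    }

    /** Stand-in for VnodeRepairTask: repairs exactly one token range. */
    record VnodeRepairTask(String keyspace, String table, TokenRange range)
    {
    }

    /** One repair task is generated per token range in the group. */
    static List<VnodeRepairTask> createTasks(String keyspace, String table, ReplicaRepairGroup group)
    {
        return group.ranges().stream()
                .map(range -> new VnodeRepairTask(keyspace, table, range))
                .collect(Collectors.toList());
    }

    private VnodeRepairSketch()
    {
    }
}
```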

### Sub-range repairs

As of #96 the repair scheduler in ecChronos has support for sub-range repairs within virtual nodes. This is activated by specifying a target repair size in the RepairConfiguration. For the standalone version the option is called repair.size.target in the configuration file. Each sub-range repair session aims to handle the target amount of data.

Note: Without this option specified, the repair mechanism will handle full virtual nodes only (including how it interprets the repair history).
Note: The target repair size assumes a uniform data distribution across partitions on the local node.

#### Example

Consider a local table containing 100 bytes of data and a total of 100 tokens locally in a continuous range (0, 100]. When the repair target is set to 2 bytes, the range is divided into 50 sub-ranges, each handling two tokens. The sub-ranges would be:

* (0, 2]
* (2, 4]
* ...
* (96, 98]
* (98, 100]
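
The arithmetic from the example can be sketched as follows; the class and method names are made up for illustration and assume the uniform data distribution noted above.

```java
import java.util.ArrayList;
import java.util.List;

public final class SubRangeSketch
{
    record TokenRange(long start, long end) // half-open on the left: (start, end]
    {
        long tokens()
        {
            return end - start;
        }
    }

    /**
     * Split a vnode into sub-ranges so that each sub-range covers roughly
     * targetSizeInBytes of data, assuming a uniform distribution over the tokens.
     */
    static List<TokenRange> split(TokenRange vnode, long dataSizeInBytes, long targetSizeInBytes)
    {
        long tokensPerSubRange = Math.max(1,
                (long) Math.floor((double) vnode.tokens() * targetSizeInBytes / dataSizeInBytes));

        List<TokenRange> subRanges = new ArrayList<>();
        for (long start = vnode.start(); start < vnode.end(); start += tokensPerSubRange)
        {
            subRanges.add(new TokenRange(start, Math.min(start + tokensPerSubRange, vnode.end())));
        }
        return subRanges;
    }

    public static void main(String[] args)
    {
        // 100 bytes over the range (0, 100] with a 2 byte target -> 50 sub-ranges of 2 tokens each
        List<TokenRange> subRanges = split(new TokenRange(0, 100), 100, 2);
        System.out.println(subRanges.size());                     // 50
        System.out.println(subRanges.get(0));                     // TokenRange[start=0, end=2]
        System.out.println(subRanges.get(subRanges.size() - 1));  // TokenRange[start=98, end=100]
    }
}
```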

#### Repair history

Sub-ranges are handled in two parts: one part builds an internal state of the repair history and the other performs the repairs. While building the internal repair history state, all sub-ranges that are fully contained within a local virtual node are collected from the repair history. This means that for a virtual node (1, 5] it will collect ranges such as (1, 3] and (2, 4]. It will not collect (0, 3], since it is not fully contained in the virtual node even though it intersects it.

As the sub-range repair mechanism uses dynamic sizes for the sub-ranges, overlapping sub-ranges need to be handled. E.g. there could be entries for both (1, 3] and (2, 4] within one virtual node, repaired at different times. This is handled by splitting the history into (1, 2], (2, 3] and (3, 4], where the middle range gets the latest repair time of the two. In order to keep the memory footprint small these ranges are later consolidated, merging adjacent ranges that have been repaired closely together.

In a larger context this also works for repairs covering the full virtual node. Given a virtual node (0, 30] that was repaired at timestamp X and a repair history entry containing the sub-range (15, 20] repaired at Y, and assuming that X is more than one hour before Y, this produces three sub-ranges in the internal representation:

* (0, 15] repaired at X
* (15, 20] repaired at Y
* (20, 30] repaired at X
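
The splitting described above can be sketched as follows: every range boundary starts a new sub-range, and each sub-range gets the latest repair time of the history entries that cover it. The types and the normalize method are hypothetical and omit the later consolidation step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public final class RepairHistorySketch
{
    record RepairedRange(long start, long end, long repairedAt) // (start, end]
    {
    }

    /**
     * Split overlapping history entries at every boundary and give each resulting
     * sub-range the latest repair time of the entries that cover it.
     */
    static List<RepairedRange> normalize(List<RepairedRange> history)
    {
        TreeSet<Long> boundaries = new TreeSet<>();
        for (RepairedRange entry : history)
        {
            boundaries.add(entry.start());
            boundaries.add(entry.end());
        }

        List<RepairedRange> result = new ArrayList<>();
        Long previous = null;
        for (Long boundary : boundaries)
        {
            if (previous != null)
            {
                long start = previous;
                long end = boundary;
                long repairedAt = history.stream()
                        .filter(entry -> entry.start() <= start && entry.end() >= end)
                        .mapToLong(RepairedRange::repairedAt)
                        .max()
                        .orElse(-1);
                if (repairedAt >= 0)
                {
                    result.add(new RepairedRange(start, end, repairedAt));
                }
            }
            previous = boundary;
        }
        return result;
    }

    public static void main(String[] args)
    {
        // (1, 3] repaired at t=100 and (2, 4] repaired at t=200
        // -> (1, 2] at 100, (2, 3] at 200 (the latest of the two), (3, 4] at 200
        normalize(List.of(new RepairedRange(1, 3, 100), new RepairedRange(2, 4, 200)))
                .forEach(System.out::println);
    }
}
```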

### Incremental repairs

Incremental repairs do not use the ecChronos repair history.

Each IncrementalRepairJob uses the Cassandra metrics maxRepairedAt and percentRepaired. This information is used to determine when the job is eligible for the next repair and when to send alarms if necessary. The job skips intervals if there is nothing to repair, i.e. percentRepaired is 100%.

When the job runs, it calculates the replicas that might be involved in the repair using ReplicationStateImpl. Afterwards a single RepairGroup is created. When the RepairGroup is executed it generates one IncrementalRepairTask. The IncrementalRepairTask is the class that performs the incremental repair [3].
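
A minimal sketch of the eligibility check, assuming the decision is based only on the two metrics above: if percentRepaired already reports 100% the interval is skipped, otherwise the job runs once the repair interval has passed since maxRepairedAt. The method is hypothetical.

```java
public final class IncrementalRepairSketch
{
    private static final double FULLY_REPAIRED = 100.0;

    /**
     * Decide whether an incremental repair should run, based on the Cassandra
     * table metrics maxRepairedAt and percentRepaired.
     */
    static boolean shouldRun(long maxRepairedAt, double percentRepaired, long repairIntervalMs, long nowMs)
    {
        if (percentRepaired >= FULLY_REPAIRED)
        {
            return false; // nothing unrepaired, skip this interval
        }
        return nowMs - maxRepairedAt >= repairIntervalMs;
    }

    public static void main(String[] args)
    {
        long now = System.currentTimeMillis();
        long eightDaysAgo = now - 8L * 24 * 60 * 60 * 1000;
        long sevenDays = 7L * 24 * 60 * 60 * 1000;

        System.out.println(shouldRun(eightDaysAgo, 100.0, sevenDays, now)); // false, fully repaired
        System.out.println(shouldRun(eightDaysAgo, 63.5, sevenDays, now));  // true, overdue and not fully repaired
    }

    private IncrementalRepairSketch()
    {
    }
}
```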

## References

[1]: Consensus on Cassandra

[2]: Incremental and Full Repairs

[3]: Cassandra Metrics