Skip to content

Commit

Permalink
fix: mysql: Replace the user_id%10 in batch_id with a global counter.
Browse files Browse the repository at this point in the history
Since the batch_uploads primary key already contains user_id, it does
nothing to increase time resolution. The counter may still see
conflicts with other processes, but the probably is much lower
now. (Spanner uses UUIDv4 for batch IDs.)

The user_id hack was added in e9b455f "to match the Python
implementation". E2E tests are failing because we can now
generate more than one batch within 10 ms.
  • Loading branch information
tommie committed Dec 29, 2024
1 parent 2538c85 commit 1402c55
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
15 changes: 9 additions & 6 deletions syncstorage-mysql/src/batch.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use base64::Engine;
use std::collections::HashSet;
use std::{
collections::HashSet,
sync::atomic::{AtomicU32, Ordering},
};

use diesel::{
self,
Expand All @@ -21,6 +24,8 @@ use super::{

const MAXTTL: i32 = 2_100_000_000;

static COUNTER: AtomicU32 = AtomicU32::new(0);

pub fn create(db: &MysqlDb, params: params::CreateBatch) -> DbResult<results::CreateBatch> {
let user_id = params.user_id.legacy_id as i64;
let collection_id = db.get_collection_id(&params.collection)?;
Expand All @@ -32,11 +37,9 @@ pub fn create(db: &MysqlDb, params: params::CreateBatch) -> DbResult<results::Cr
// sharding writes via (batchid % num_tables), and leaving it as zero would
// skew the sharding distribution.
//
// So we mix in the lowest digit of the uid to improve the distribution
// while still letting us treat these ids as millisecond timestamps. It's
// yuck, but it works and it keeps the weirdness contained to this single
// line of code.
let batch_id = db.timestamp().as_i64() + (user_id % 10);
// We mix in a per-process counter to make batch IDs (more) unique within
// a timestamp.
let batch_id = db.timestamp().as_i64() * 100 + COUNTER.fetch_add(1, Ordering::Relaxed) as i64 % 1000;
insert_into(batch_uploads::table)
.values((
batch_uploads::batch_id.eq(&batch_id),
Expand Down
5 changes: 3 additions & 2 deletions syncstorage-mysql/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,10 @@ impl MysqlDb {

// Lock the db
self.begin(true)?;
// SyncTimestamp only has 10 ms resolution.
let result = user_collections::table
.select((
sql::<BigInt>("UNIX_TIMESTAMP(NOW(2))*1000"),
sql::<BigInt>("UNIX_TIMESTAMP(UTC_TIMESTAMP(2))*1000"),
user_collections::modified,
))
.filter(user_collections::user_id.eq(user_id))
Expand All @@ -236,7 +237,7 @@ impl MysqlDb {
.insert((user_id as u32, collection_id), modified);
now
} else {
let result = sql_query("SELECT UNIX_TIMESTAMP(NOW(2))*1000 AS timestamp")
let result = sql_query("SELECT UNIX_TIMESTAMP(UTC_TIMESTAMP(2))*1000 AS timestamp")
.get_result::<TimestampResult>(&mut *self.conn.write().unwrap())
.unwrap();
SyncTimestamp::from_i64(result.timestamp)?
Expand Down

0 comments on commit 1402c55

Please sign in to comment.