diff --git a/tokenserver-db-mysql/src/lib.rs b/tokenserver-db-mysql/src/lib.rs index 30caa23e1a..2dd39ce6f5 100644 --- a/tokenserver-db-mysql/src/lib.rs +++ b/tokenserver-db-mysql/src/lib.rs @@ -3,3 +3,4 @@ extern crate diesel; extern crate diesel_migrations; pub mod pool; +pub mod models; diff --git a/tokenserver-db-mysql/src/models.rs b/tokenserver-db-mysql/src/models.rs new file mode 100644 index 0000000000..692454ad99 --- /dev/null +++ b/tokenserver-db-mysql/src/models.rs @@ -0,0 +1,108 @@ +pub const LAST_INSERT_ID_QUERY: &'static str = "SELECT LAST_INSERT_ID() AS id"; +pub const GET_NODE_ID_SYNC_QUERY: &str = r#" +SELECT id +FROM nodes +WHERE service = ? +AND node = ?"#; +pub const REPLACE_USERS_SYNC_QUERY: &str = r#" +UPDATE users +SET replaced_at = ? +WHERE service = ? +AND email = ? +AND replaced_at IS NULL +AND created_at < ?"#; +pub const REPLACE_USER_SYNC_QUERY: &str = r#" +UPDATE users +SET replaced_at = ? +WHERE service = ? +AND uid = ?"#; +// The `where` clause on this statement is designed as an extra layer of +// protection, to ensure that concurrent updates don't accidentally move +// timestamp fields backwards in time. The handling of `keys_changed_at` +// is additionally weird because we want to treat the default `NULL` value +// as zero. +pub const PUT_USER_SYNC_QUERY: &str = r#" +UPDATE users +SET generation = ?, +keys_changed_at = ? +WHERE service = ? +AND email = ? +AND generation <= ? +AND COALESCE(keys_changed_at, 0) <= COALESCE(?, keys_changed_at, 0) +AND replaced_at IS NULL"#; +pub const POST_USER_SYNC_QUERY: &str = r#" +INSERT INTO users (service, email, generation, client_state, created_at, nodeid, keys_changed_at, replaced_at) +VALUES (?, ?, ?, ?, ?, ?, ?, NULL);"#; +pub const CHECK_SYNC_QUERY: &str = "SHOW STATUS LIKE \"Uptime\""; +pub const GET_BEST_NODE_QUERY: &str = r#" +SELECT id, node +FROM nodes +WHERE service = ? +AND available > 0 +AND capacity > current_load +AND downed = 0 +AND backoff = 0 +ORDER BY LOG(current_load) / LOG(capacity) +LIMIT 1"#; +pub const GET_BEST_NODE_RELEASE_CAPACITY_QUERY: &str = r#" +UPDATE nodes +SET available = LEAST(capacity * ?, capacity - current_load) +WHERE service = ? +AND available <= 0 +AND capacity > current_load +AND downed = 0"#; +pub const GET_BEST_NODE_SPANNER_QUERY: &str = r#" +SELECT id, node +FROM nodes +WHERE id = ? +LIMIT 1"#; +pub const ADD_USER_TO_NODE_SYNC_QUERY: &str = r#" +UPDATE nodes +SET current_load = current_load + 1, +available = GREATEST(available - 1, 0) +WHERE service = ? +AND node = ?"#; +pub const ADD_USER_TO_NODE_SYNC_SPANNER_QUERY: &str = r#" +UPDATE nodes +SET current_load = current_load + 1 +WHERE service = ? +AND node = ?"#; +pub const GET_USERS_SYNC_QUERY: &str = r#" +SELECT uid, nodes.node, generation, keys_changed_at, client_state, created_at, replaced_at +FROM users +LEFT OUTER JOIN nodes ON users.nodeid = nodes.id +WHERE email = ? +AND users.service = ? +ORDER BY created_at DESC, uid DESC +LIMIT 20"#; +pub const GET_SERVICE_ID_SYNC_QUERY: &str = r#" +SELECT id +FROM services +WHERE service = ?"#; +pub const SET_USER_CREATED_AT_SYNC_QUERY: &str = r#" +UPDATE users +SET created_at = ? +WHERE uid = ?"#; +pub const SET_USER_REPLACED_AT_SYNC_QUERY: &str = r#" +UPDATE users +SET replaced_at = ? +WHERE uid = ?"#; +pub const GET_USER_SYNC_QUERY: &str = r#" +SELECT service, email, generation, client_state, replaced_at, nodeid, keys_changed_at +FROM users +WHERE uid = ?"#; +pub const POST_NODE_SYNC_QUERY: &str = r#" +INSERT INTO nodes (service, node, available, current_load, capacity, downed, backoff) +VALUES (?, ?, ?, ?, ?, ?, ?)"#; +pub const GET_NODE_SYNC_QUERY: &str = r#" +SELECT * +FROM nodes +WHERE id = ?"#; +pub const UNASSIGNED_NODE_SYNC_QUERY: &str = r#" +UPDATE users +SET replaced_at = ? +WHERE nodeid = ?"#; +pub const REMOVE_NODE_SYNC_QUERY: &str = "DELETE FROM nodes WHERE id = ?"; +pub const POST_SERVICE_INSERT_SERVICE_QUERY: &str = r#" +INSERT INTO services (service, pattern) +VALUES (?, ?)"#; diff --git a/tokenserver-db-sqlite/src/lib.rs b/tokenserver-db-sqlite/src/lib.rs index 30caa23e1a..2dd39ce6f5 100644 --- a/tokenserver-db-sqlite/src/lib.rs +++ b/tokenserver-db-sqlite/src/lib.rs @@ -3,3 +3,4 @@ extern crate diesel; extern crate diesel_migrations; pub mod pool; +pub mod models; diff --git a/tokenserver-db-sqlite/src/models.rs b/tokenserver-db-sqlite/src/models.rs new file mode 100644 index 0000000000..f552739eb2 --- /dev/null +++ b/tokenserver-db-sqlite/src/models.rs @@ -0,0 +1,108 @@ +pub const LAST_INSERT_ID_QUERY: &'static str = "SELECT LAST_INSERT_ROWID() AS id"; +pub const GET_NODE_ID_SYNC_QUERY: &str = r#" +SELECT rowid as id +FROM nodes +WHERE service = ? +AND node = ?"#; +pub const REPLACE_USERS_SYNC_QUERY: &str = r#" +UPDATE users +SET replaced_at = ? +WHERE service = ? +AND email = ? +AND replaced_at IS NULL +AND created_at < ?"#; +pub const REPLACE_USER_SYNC_QUERY: &str = r#" +UPDATE users +SET replaced_at = ? +WHERE service = ? +AND uid = ?"#; +// The `where` clause on this statement is designed as an extra layer of +// protection, to ensure that concurrent updates don't accidentally move +// timestamp fields backwards in time. The handling of `keys_changed_at` +// is additionally weird because we want to treat the default `NULL` value +// as zero. +pub const PUT_USER_SYNC_QUERY: &str = r#" +UPDATE users +SET generation = ?, +keys_changed_at = ? +WHERE service = ? +AND email = ? +AND generation <= ? +AND COALESCE(keys_changed_at, 0) <= COALESCE(?, keys_changed_at, 0) +AND replaced_at IS NULL"#; +pub const POST_USER_SYNC_QUERY: &str = r#" +INSERT INTO users (service, email, generation, client_state, created_at, nodeid, keys_changed_at, replaced_at) +VALUES (?, ?, ?, ?, ?, ?, ?, ?);"#; +pub const CHECK_SYNC_QUERY: &str = "SHOW STATUS LIKE \"Uptime\""; +pub const GET_BEST_NODE_QUERY: &str = r#" +SELECT id, node +FROM nodes +WHERE service = ? +AND available > 0 +AND capacity > current_load +AND downed = 0 +AND backoff = 0 +ORDER BY LOG(current_load) / LOG(capacity) +LIMIT 1"#; +pub const GET_BEST_NODE_RELEASE_CAPACITY_QUERY: &str = r#" +UPDATE nodes +SET available = LEAST(capacity * ?, capacity - current_load) +WHERE service = ? +AND available <= 0 +AND capacity > current_load +AND downed = 0"#; +pub const GET_BEST_NODE_SPANNER_QUERY: &str = r#" +SELECT id, node +FROM nodes +WHERE id = ? +LIMIT 1"#; +pub const ADD_USER_TO_NODE_SYNC_QUERY: &str = r#" +UPDATE nodes +SET current_load = current_load + 1, +available = GREATEST(available - 1, 0) +WHERE service = ? +AND node = ?"#; +pub const ADD_USER_TO_NODE_SYNC_SPANNER_QUERY: &str = r#" +UPDATE nodes +SET current_load = current_load + 1 +WHERE service = ? +AND node = ?"#; +pub const GET_USERS_SYNC_QUERY: &str = r#" +SELECT uid, nodes.node, generation, keys_changed_at, client_state, created_at, replaced_at +FROM users +LEFT OUTER JOIN nodes ON users.nodeid = nodes.id +WHERE email = ? +AND users.service = ? +ORDER BY created_at DESC, uid DESC +LIMIT 20"#; +pub const GET_SERVICE_ID_SYNC_QUERY: &str = r#" +SELECT id +FROM services +WHERE service = ?"#; +pub const SET_USER_CREATED_AT_SYNC_QUERY: &str = r#" +UPDATE users +SET created_at = ? +WHERE uid = ?"#; +pub const SET_USER_REPLACED_AT_SYNC_QUERY: &str = r#" +UPDATE users +SET replaced_at = ? +WHERE uid = ?"#; +pub const GET_USER_SYNC_QUERY: &str = r#" +SELECT service, email, generation, client_state, replaced_at, nodeid, keys_changed_at +FROM users +WHERE uid = ?"#; +pub const POST_NODE_SYNC_QUERY: &str = r#" +INSERT INTO nodes (service, node, available, current_load, capacity, downed, backoff) +VALUES (?, ?, ?, ?, ?, ?, ?)"#; +pub const GET_NODE_SYNC_QUERY: &str = r#" +SELECT * +FROM nodes +WHERE id = ?"#; +pub const UNASSIGNED_NODE_SYNC_QUERY: &str = r#" +UPDATE users +SET replaced_at = ? +WHERE nodeid = ?"#; +pub const REMOVE_NODE_SYNC_QUERY: &str = "DELETE FROM nodes WHERE id = ?"; +pub const POST_SERVICE_INSERT_SERVICE_QUERY: &str = r#" +INSERT INTO services (service, pattern) +VALUES (?, ?)"#; diff --git a/tokenserver-db/src/models.rs b/tokenserver-db/src/models.rs index b2d7538a01..5b0f5f49db 100644 --- a/tokenserver-db/src/models.rs +++ b/tokenserver-db/src/models.rs @@ -13,6 +13,10 @@ use http::StatusCode; use syncserver_common::{BlockingThreadpool, Metrics}; use syncserver_db_common::{sync_db_method, DbFuture}; use tokenserver_db_common::error::{DbError, DbResult}; +#[cfg(feature = "mysql")] +use tokenserver_db_mysql::models::*; +#[cfg(feature = "sqlite")] +use tokenserver_db_sqlite::models::*; use super::{params, results, PooledConn}; @@ -56,12 +60,6 @@ impl TokenserverDb { // If connections were shared across requests, using this function would introduce a race condition, // as we could potentially get IDs from records created during other requests. - // TODO: Move this in backend specific crates - #[cfg(feature = "mysql")] - const LAST_INSERT_ID_QUERY: &'static str = "SELECT LAST_INSERT_ID() AS id"; - #[cfg(feature = "sqlite")] - const LAST_INSERT_ID_QUERY: &'static str = "SELECT LAST_INSERT_ROWID() AS id"; - pub fn new( conn: PooledConn, metrics: &Metrics, @@ -90,20 +88,6 @@ impl TokenserverDb { } fn get_node_id_sync(&self, params: params::GetNodeId) -> DbResult { - #[cfg(feature = "mysql")] - const QUERY: &str = r#" - SELECT id - FROM nodes - WHERE service = ? - AND node = ? - "#; - #[cfg(feature = "sqlite")] - const QUERY: &str = r#" - SELECT rowid as id - FROM nodes - WHERE service = ? - AND node = ? - "#; if let Some(id) = self.spanner_node_id { Ok(results::GetNodeId { id: id as i64 }) @@ -111,7 +95,7 @@ impl TokenserverDb { let mut metrics = self.metrics.clone(); metrics.start_timer("storage.get_node_id", None); - diesel::sql_query(QUERY) + diesel::sql_query(GET_NODE_ID_SYNC_QUERY) .bind::(params.service_id) .bind::(¶ms.node) .get_result(&self.inner.conn) @@ -121,19 +105,10 @@ impl TokenserverDb { /// Mark users matching the given email and service ID as replaced. fn replace_users_sync(&self, params: params::ReplaceUsers) -> DbResult { - const QUERY: &str = r#" - UPDATE users - SET replaced_at = ? - WHERE service = ? - AND email = ? - AND replaced_at IS NULL - AND created_at < ? - "#; - let mut metrics = self.metrics.clone(); metrics.start_timer("storage.replace_users", None); - diesel::sql_query(QUERY) + diesel::sql_query(REPLACE_USERS_SYNC_QUERY) .bind::(params.replaced_at) .bind::(¶ms.service_id) .bind::(¶ms.email) @@ -145,14 +120,7 @@ impl TokenserverDb { /// Mark the user with the given uid and service ID as being replaced. fn replace_user_sync(&self, params: params::ReplaceUser) -> DbResult { - const QUERY: &str = r#" - UPDATE users - SET replaced_at = ? - WHERE service = ? - AND uid = ? - "#; - - diesel::sql_query(QUERY) + diesel::sql_query(REPLACE_USER_SYNC_QUERY) .bind::(params.replaced_at) .bind::(params.service_id) .bind::(params.uid) @@ -164,26 +132,10 @@ impl TokenserverDb { /// Update the user with the given email and service ID with the given `generation` and /// `keys_changed_at`. fn put_user_sync(&self, params: params::PutUser) -> DbResult { - // The `where` clause on this statement is designed as an extra layer of - // protection, to ensure that concurrent updates don't accidentally move - // timestamp fields backwards in time. The handling of `keys_changed_at` - // is additionally weird because we want to treat the default `NULL` value - // as zero. - const QUERY: &str = r#" - UPDATE users - SET generation = ?, - keys_changed_at = ? - WHERE service = ? - AND email = ? - AND generation <= ? - AND COALESCE(keys_changed_at, 0) <= COALESCE(?, keys_changed_at, 0) - AND replaced_at IS NULL - "#; - let mut metrics = self.metrics.clone(); metrics.start_timer("storage.put_user", None); - diesel::sql_query(QUERY) + diesel::sql_query(PUT_USER_SYNC_QUERY) .bind::(params.generation) .bind::, _>(params.keys_changed_at) .bind::(¶ms.service_id) @@ -197,21 +149,10 @@ impl TokenserverDb { /// Create a new user. fn post_user_sync(&self, user: params::PostUser) -> DbResult { - #[cfg(feature = "mysql")] - const QUERY: &str = r#" - INSERT INTO users (service, email, generation, client_state, created_at, nodeid, keys_changed_at, replaced_at) - VALUES (?, ?, ?, ?, ?, ?, ?, NULL); - "#; - #[cfg(feature = "sqlite")] - const QUERY: &str = r#" - INSERT INTO users (service, email, generation, client_state, created_at, nodeid, keys_changed_at, replaced_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?); - "#; - let mut metrics = self.metrics.clone(); metrics.start_timer("storage.post_user", None); - diesel::sql_query(QUERY) + diesel::sql_query(POST_USER_SYNC_QUERY) .bind::(user.service_id) .bind::(&user.email) .bind::(user.generation) @@ -221,7 +162,7 @@ impl TokenserverDb { .bind::, _>(user.keys_changed_at) .execute(&self.inner.conn)?; - diesel::sql_query(Self::LAST_INSERT_ID_QUERY) + diesel::sql_query(LAST_INSERT_ID_QUERY) .bind::(&user.email) .get_result::(&self.inner.conn) .map_err(Into::into) @@ -229,44 +170,19 @@ impl TokenserverDb { fn check_sync(&self) -> DbResult { // has the database been up for more than 0 seconds? - let result = diesel::sql_query("SHOW STATUS LIKE \"Uptime\"").execute(&self.inner.conn)?; + let result = diesel::sql_query(CHECK_SYNC_QUERY).execute(&self.inner.conn)?; Ok(result as u64 > 0) } /// Gets the least-loaded node that has available slots. fn get_best_node_sync(&self, params: params::GetBestNode) -> DbResult { const DEFAULT_CAPACITY_RELEASE_RATE: f32 = 0.1; - const GET_BEST_NODE_QUERY: &str = r#" - SELECT id, node - FROM nodes - WHERE service = ? - AND available > 0 - AND capacity > current_load - AND downed = 0 - AND backoff = 0 - ORDER BY LOG(current_load) / LOG(capacity) - LIMIT 1 - "#; - const RELEASE_CAPACITY_QUERY: &str = r#" - UPDATE nodes - SET available = LEAST(capacity * ?, capacity - current_load) - WHERE service = ? - AND available <= 0 - AND capacity > current_load - AND downed = 0 - "#; - const SPANNER_QUERY: &str = r#" - SELECT id, node - FROM nodes - WHERE id = ? - LIMIT 1 - "#; let mut metrics = self.metrics.clone(); metrics.start_timer("storage.get_best_node", None); if let Some(spanner_node_id) = self.spanner_node_id { - diesel::sql_query(SPANNER_QUERY) + diesel::sql_query(GET_BEST_NODE_SPANNER_QUERY) .bind::(spanner_node_id) .get_result::(&self.inner.conn) .map_err(|e| { @@ -290,7 +206,7 @@ impl TokenserverDb { // There were no available nodes. Try to release additional capacity from any nodes // that are not fully occupied. - let affected_rows = diesel::sql_query(RELEASE_CAPACITY_QUERY) + let affected_rows = diesel::sql_query(GET_BEST_NODE_RELEASE_CAPACITY_QUERY) .bind::( params .capacity_release_rate @@ -318,24 +234,10 @@ impl TokenserverDb { let mut metrics = self.metrics.clone(); metrics.start_timer("storage.add_user_to_node", None); - const QUERY: &str = r#" - UPDATE nodes - SET current_load = current_load + 1, - available = GREATEST(available - 1, 0) - WHERE service = ? - AND node = ? - "#; - const SPANNER_QUERY: &str = r#" - UPDATE nodes - SET current_load = current_load + 1 - WHERE service = ? - AND node = ? - "#; - let query = if self.spanner_node_id.is_some() { - SPANNER_QUERY + ADD_USER_TO_NODE_SYNC_SPANNER_QUERY } else { - QUERY + ADD_USER_TO_NODE_SYNC_QUERY }; diesel::sql_query(query) @@ -350,18 +252,7 @@ impl TokenserverDb { let mut metrics = self.metrics.clone(); metrics.start_timer("storage.get_users", None); - const QUERY: &str = r#" - SELECT uid, nodes.node, generation, keys_changed_at, client_state, created_at, - replaced_at - FROM users - LEFT OUTER JOIN nodes ON users.nodeid = nodes.id - WHERE email = ? - AND users.service = ? - ORDER BY created_at DESC, uid DESC - LIMIT 20 - "#; - - diesel::sql_query(QUERY) + diesel::sql_query(GET_USERS_SYNC_QUERY) .bind::(¶ms.email) .bind::(params.service_id) .load::(&self.inner.conn) @@ -532,16 +423,11 @@ impl TokenserverDb { &self, params: params::GetServiceId, ) -> DbResult { - const QUERY: &str = r#" - SELECT id - FROM services - WHERE service = ? - "#; if let Some(id) = self.service_id { Ok(results::GetServiceId { id }) } else { - diesel::sql_query(QUERY) + diesel::sql_query(GET_SERVICE_ID_SYNC_QUERY) .bind::(params.service) .get_result::(&self.inner.conn) .map_err(Into::into) @@ -553,12 +439,7 @@ impl TokenserverDb { &self, params: params::SetUserCreatedAt, ) -> DbResult { - const QUERY: &str = r#" - UPDATE users - SET created_at = ? - WHERE uid = ? - "#; - diesel::sql_query(QUERY) + diesel::sql_query(SET_USER_CREATED_AT_SYNC_QUERY) .bind::(params.created_at) .bind::(¶ms.uid) .execute(&self.inner.conn) @@ -571,12 +452,7 @@ impl TokenserverDb { &self, params: params::SetUserReplacedAt, ) -> DbResult { - const QUERY: &str = r#" - UPDATE users - SET replaced_at = ? - WHERE uid = ? - "#; - diesel::sql_query(QUERY) + diesel::sql_query(SET_USER_REPLACED_AT_SYNC_QUERY) .bind::(params.replaced_at) .bind::(¶ms.uid) .execute(&self.inner.conn) @@ -586,13 +462,7 @@ impl TokenserverDb { #[cfg(test)] fn get_user_sync(&self, params: params::GetUser) -> DbResult { - const QUERY: &str = r#" - SELECT service, email, generation, client_state, replaced_at, nodeid, keys_changed_at - FROM users - WHERE uid = ? - "#; - - diesel::sql_query(QUERY) + diesel::sql_query(GET_USER_SYNC_QUERY) .bind::(params.id) .get_result::(&self.inner.conn) .map_err(Into::into) @@ -600,11 +470,7 @@ impl TokenserverDb { #[cfg(test)] fn post_node_sync(&self, params: params::PostNode) -> DbResult { - const QUERY: &str = r#" - INSERT INTO nodes (service, node, available, current_load, capacity, downed, backoff) - VALUES (?, ?, ?, ?, ?, ?, ?) - "#; - diesel::sql_query(QUERY) + diesel::sql_query(POST_NODE_SYNC_QUERY) .bind::(params.service_id) .bind::(¶ms.node) .bind::(params.available) @@ -614,20 +480,14 @@ impl TokenserverDb { .bind::(params.backoff) .execute(&self.inner.conn)?; - diesel::sql_query(Self::LAST_INSERT_ID_QUERY) + diesel::sql_query(LAST_INSERT_ID_QUERY) .get_result::(&self.inner.conn) .map_err(Into::into) } #[cfg(test)] fn get_node_sync(&self, params: params::GetNode) -> DbResult { - const QUERY: &str = r#" - SELECT * - FROM nodes - WHERE id = ? - "#; - - diesel::sql_query(QUERY) + diesel::sql_query(GET_NODE_SYNC_QUERY) .bind::(params.id) .get_result::(&self.inner.conn) .map_err(Into::into) @@ -635,18 +495,12 @@ impl TokenserverDb { #[cfg(test)] fn unassign_node_sync(&self, params: params::UnassignNode) -> DbResult { - const QUERY: &str = r#" - UPDATE users - SET replaced_at = ? - WHERE nodeid = ? - "#; - let current_time = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as i64; - diesel::sql_query(QUERY) + diesel::sql_query(UNASSIGNED_NODE_SYNC_QUERY) .bind::(current_time) .bind::(params.node_id) .execute(&self.inner.conn) @@ -656,9 +510,7 @@ impl TokenserverDb { #[cfg(test)] fn remove_node_sync(&self, params: params::RemoveNode) -> DbResult { - const QUERY: &str = "DELETE FROM nodes WHERE id = ?"; - - diesel::sql_query(QUERY) + diesel::sql_query(UNASSIGNED_NODE_SYNC_QUERY) .bind::(params.node_id) .execute(&self.inner.conn) .map(|_| ()) @@ -667,17 +519,12 @@ impl TokenserverDb { #[cfg(test)] fn post_service_sync(&self, params: params::PostService) -> DbResult { - const INSERT_SERVICE_QUERY: &str = r#" - INSERT INTO services (service, pattern) - VALUES (?, ?) - "#; - - diesel::sql_query(INSERT_SERVICE_QUERY) + diesel::sql_query(POST_SERVICE_INSERT_SERVICE_QUERY) .bind::(¶ms.service) .bind::(¶ms.pattern) .execute(&self.inner.conn)?; - diesel::sql_query(Self::LAST_INSERT_ID_QUERY) + diesel::sql_query(LAST_INSERT_ID_QUERY) .get_result::(&self.inner.conn) .map(|result| results::PostService { id: result.id as i32,