From 5e5367a024bbfef6a578d2aeae4f96f735919a38 Mon Sep 17 00:00:00 2001 From: Mark Drobnak Date: Fri, 17 Jul 2020 12:11:07 -0400 Subject: [PATCH 1/3] Avoid concurrent inserts when creating a collection in MysqlDb (#722) * Avoid concurrent inserts when creating a collection in MysqlDb Simply wrapping the SELECT + INSERT in a transaction is not sufficient for avoiding concurrency issues. Both transactions could try to insert the same row, which would cause one transaction to fail. Using `INSERT IGNORE` first and then selecting the ID does work in a concurrent setting. * Remove unused last_insert_id Closes #678 Co-authored-by: JR Conlin --- src/db/mysql/models.rs | 41 ++++++++++++++++++++++------------------- src/db/mysql/test.rs | 2 +- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/src/db/mysql/models.rs b/src/db/mysql/models.rs index 7f9082ed8d..29b91419d2 100644 --- a/src/db/mysql/models.rs +++ b/src/db/mysql/models.rs @@ -33,8 +33,6 @@ use crate::db::{ use crate::server::metrics::Metrics; use crate::web::extractors::{BsoQueryParams, HawkIdentifier}; -no_arg_sql_function!(last_insert_id, Integer); - pub type Result = std::result::Result; type Conn = PooledConnection>; @@ -314,25 +312,27 @@ impl MysqlDb { self.get_storage_timestamp_sync(params.user_id) } - pub(super) fn create_collection(&self, name: &str) -> Result { - // XXX: handle concurrent attempts at inserts + pub(super) fn get_or_create_collection_id(&self, name: &str) -> Result { + if let Some(id) = self.coll_cache.get_id(name)? 
{ + return Ok(id); + } + let id = self.conn.transaction(|| { - sql_query( - "INSERT INTO collections (name) - VALUES (?)", - ) - .bind::(name) - .execute(&self.conn)?; - collections::table.select(last_insert_id).first(&self.conn) + diesel::insert_or_ignore_into(collections::table) + .values(collections::name.eq(name)) + .execute(&self.conn)?; + + collections::table + .select(collections::id) + .filter(collections::name.eq(name)) + .first(&self.conn) })?; - Ok(id) - } - fn get_or_create_collection_id(&self, name: &str) -> Result { - self.get_collection_id(name).or_else(|e| match e.kind() { - DbErrorKind::CollectionNotFound => self.create_collection(name), - _ => Err(e), - }) + if !self.session.borrow().in_write_transaction { + self.coll_cache.put(id, name.to_owned())?; + } + + Ok(id) } pub(super) fn get_collection_id(&self, name: &str) -> Result { @@ -989,7 +989,10 @@ impl<'a> Db<'a> for MysqlDb { #[cfg(test)] fn create_collection(&self, name: String) -> DbFuture<'_, i32> { let db = self.clone(); - Box::pin(block(move || db.create_collection(&name).map_err(Into::into)).map_err(Into::into)) + Box::pin( + block(move || db.get_or_create_collection_id(&name).map_err(Into::into)) + .map_err(Into::into), + ) } #[cfg(test)] diff --git a/src/db/mysql/test.rs b/src/db/mysql/test.rs index 7b6afad304..9270e56f4d 100644 --- a/src/db/mysql/test.rs +++ b/src/db/mysql/test.rs @@ -93,7 +93,7 @@ fn static_collection_id() -> Result<()> { assert_eq!(result, *id); } - let cid = db.create_collection("col1")?; + let cid = db.get_or_create_collection_id("col1")?; assert!(cid >= 100); Ok(()) } From 7cd04bc9b4245bfb2ffca5e09de99cf3dd5753a8 Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Sat, 18 Jul 2020 05:13:58 -0700 Subject: [PATCH 2/3] fix: switch create_session to async (#733) note google_default_credentials blocking concerns (#732) Closes #731 Issue #732 --- src/db/spanner/manager.rs | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git 
a/src/db/spanner/manager.rs b/src/db/spanner/manager.rs index a264c1f9f4..fd4f0df17b 100644 --- a/src/db/spanner/manager.rs +++ b/src/db/spanner/manager.rs @@ -72,18 +72,23 @@ impl ManageConnection type Error = grpcio::Error; async fn connect(&self) -> Result { - // Requires GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json - let creds = ChannelCredentials::google_default_credentials()?; - - // Create a Spanner client. - let chan = ChannelBuilder::new(self.env.clone()) - .max_send_message_len(100 << 20) - .max_receive_message_len(100 << 20) - .secure_connect(SPANNER_ADDRESS, creds); + let chan = { + // Requires + // GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json + // XXX: issue732: Could google_default_credentials (or + // ChannelBuilder::secure_connect) block?! + let creds = ChannelCredentials::google_default_credentials()?; + + // Create a Spanner client. + ChannelBuilder::new(self.env.clone()) + .max_send_message_len(100 << 20) + .max_receive_message_len(100 << 20) + .secure_connect(SPANNER_ADDRESS, creds) + }; let client = SpannerClient::new(chan); // Connect to the instance and create a Spanner session. 
- let session = create_session(&client, &self.database_name)?; + let session = create_session(&client, &self.database_name).await?; Ok(SpannerSession { client, @@ -100,7 +105,7 @@ impl ManageConnection grpcio::Error::RpcFailure(ref status) if status.status == grpcio::RpcStatusCode::NOT_FOUND => { - conn.session = create_session(&conn.client, &self.database_name)?; + conn.session = create_session(&conn.client, &self.database_name).await?; } _ => return Err(e), } @@ -113,12 +118,15 @@ impl ManageConnection } } -fn create_session(client: &SpannerClient, database_name: &str) -> Result { +async fn create_session( + client: &SpannerClient, + database_name: &str, +) -> Result { let mut req = CreateSessionRequest::new(); req.database = database_name.to_owned(); let mut meta = MetadataBuilder::new(); meta.add_str("google-cloud-resource-prefix", database_name)?; meta.add_str("x-goog-api-client", "gcp-grpc-rs")?; let opt = CallOption::default().headers(meta.build()); - client.create_session_opt(&req, opt) + client.create_session_async_opt(&req, opt)?.await } From 40b97fc331d088462e09cbc5949b961ef5b6d4a5 Mon Sep 17 00:00:00 2001 From: JR Conlin Date: Mon, 20 Jul 2020 17:00:01 -0700 Subject: [PATCH 3/3] feat: make migrations play nice with existing databases. 
(#721) Closes #663 Co-authored-by: Donovan Preston --- migrations/2018-08-28-010336_init/up.sql | 10 +-- migrations/2019-09-11-164500/down.sql | 5 ++ migrations/2019-09-11-164500/up.sql | 65 +++++++++++++++---- .../2020-04-03-102015_change_userid/up.sql | 1 - 4 files changed, 64 insertions(+), 17 deletions(-) diff --git a/migrations/2018-08-28-010336_init/up.sql b/migrations/2018-08-28-010336_init/up.sql index aadfc5c8b1..3463796ed1 100644 --- a/migrations/2018-08-28-010336_init/up.sql +++ b/migrations/2018-08-28-010336_init/up.sql @@ -2,7 +2,7 @@ -- DROP TABLE IF EXISTS `bso`; -- XXX: bsov1, etc -CREATE TABLE `bso` ( +CREATE TABLE IF NOT EXISTS `bso`( `user_id` INT NOT NULL, `collection_id` INT NOT NULL, `id` VARCHAR(64) NOT NULL, @@ -22,8 +22,8 @@ CREATE TABLE `bso` ( ) ENGINE=InnoDB DEFAULT CHARSET=latin1; --- DROP TABLE IF EXISTS `collections`; -CREATE TABLE `collections` ( +DROP TABLE IF EXISTS `collections`; +CREATE TABLE `collections`( `id` INT PRIMARY KEY NOT NULL AUTO_INCREMENT, `name` VARCHAR(32) UNIQUE NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=latin1; @@ -44,7 +44,7 @@ INSERT INTO collections (id, name) VALUES -- DROP TABLE IF EXISTS `user_collections`; -CREATE TABLE `user_collections` ( +CREATE TABLE IF NOT EXISTS `user_collections`( `user_id` INT NOT NULL, `collection_id` INT NOT NULL, -- last modified time in milliseconds since epoch @@ -55,7 +55,7 @@ CREATE TABLE `user_collections` ( -- XXX: based on the go version (bsos is a concatenated blob of BSO jsons separated by newlines) -- DROP TABLE IF EXISTS `batches`; -CREATE TABLE `batches` ( +CREATE TABLE IF NOT EXISTS `batches`( `user_id` INT NOT NULL, `collection_id` INT NOT NULL, `id` BIGINT NOT NULL, diff --git a/migrations/2019-09-11-164500/down.sql b/migrations/2019-09-11-164500/down.sql index 591b22ed48..d054b1b6c2 100644 --- a/migrations/2019-09-11-164500/down.sql +++ b/migrations/2019-09-11-164500/down.sql @@ -1,3 +1,8 @@ +-- At this point, it's sanest to just drop the tables rather than revert 
them: +-- there are a number of non-backwards-compatible changes performed, and data +-- corruption is HIGHLY likely. +-- The best option is to try installing the Python version (probably in a Docker container), and +-- let the client try to reconnect and restore. DROP TABLE IF EXISTS `bso`; DROP TABLE IF EXISTS `collections`; DROP TABLE IF EXISTS `user_collections`; diff --git a/migrations/2019-09-11-164500/up.sql b/migrations/2019-09-11-164500/up.sql index e7d3d99227..43198a4ed8 100644 --- a/migrations/2019-09-11-164500/up.sql +++ b/migrations/2019-09-11-164500/up.sql @@ -1,11 +1,54 @@ -alter table `batches` change column `user_id` `userid` int(11) not null; -alter table `batches` change column `collection_id` `collection` int(11) not null; -alter table `bso` change column `user_id` `userid` int(11) not null; -alter table `bso` change column `collection_id` `collection` int(11) not null; -alter table `bso` change column `expiry` `ttl` bigint(20) not null; -alter table `user_collections` change column `user_id` `userid` int(11) not null; -alter table `user_collections` change column `collection_id` `collection` int(11) not null; -alter table `user_collections` change column `modified` `last_modified` bigint(20) not null; --- must be last in case of error --- the following column is not used, but preserved for legacy and stand alone systems. 
-alter table `bso` add column `payload_size` int(11) default 0; +-- tests to see if columns need adjustment +-- this is because the `init` may create the tables +-- with column names already correct + +CREATE PROCEDURE UPDATE_165600() +BEGIN + IF EXISTS( SELECT column_name + FROM INFORMATION_SCHEMA.COLUMNS + WHERE table_name='bso' AND column_name="user_id") + THEN + BEGIN + alter table `bso` change column `user_id` `userid` int(11) not null; + alter table `bso` change column `collection_id` `collection` int(11) not null; + alter table `bso` change column `expiry` `ttl` bigint(20) not null; + END; + END IF; + + IF EXISTS( SELECT column_name + FROM INFORMATION_SCHEMA.COLUMNS + WHERE table_name='batches' AND column_name="user_id") + THEN + BEGIN + alter table `batches` change column `user_id` `userid` int(11) not null; + alter table `batches` change column `collection_id` `collection` int(11) not null; + END; + END IF; + + IF EXISTS( SELECT column_name + FROM INFORMATION_SCHEMA.COLUMNS + WHERE table_name='user_collections' AND column_name="user_id") + THEN + BEGIN + alter table `user_collections` change column `user_id` `userid` int(11) not null; + alter table `user_collections` change column `collection_id` `collection` int(11) not null; + alter table `user_collections` change column `modified` `last_modified` bigint(20) not null; + END; + END IF; + + -- must be last in case of error + -- the following column is not used, but preserved for legacy and stand alone systems. 
+ IF NOT EXISTS( SELECT column_name + FROM INFORMATION_SCHEMA.COLUMNS + where table_name='bso' AND column_name="payload_size") + THEN + BEGIN + alter table `bso` add column `payload_size` int(11) default 0; + END; + + END IF; +END; + +CALL UPDATE_165600(); + +DROP PROCEDURE UPDATE_165600; \ No newline at end of file diff --git a/migrations/2020-04-03-102015_change_userid/up.sql b/migrations/2020-04-03-102015_change_userid/up.sql index 42448b9df5..6c48ac4eb1 100644 --- a/migrations/2020-04-03-102015_change_userid/up.sql +++ b/migrations/2020-04-03-102015_change_userid/up.sql @@ -1,4 +1,3 @@ --- Your SQL goes here alter table `batches` change column `userid` `userid` bigint(20) not null; alter table `bso` change column `userid` `userid` bigint(20) not null; alter table `user_collections` change column `userid` `userid` bigint(20) not null;