Skip to content

Commit

Permalink
Merge branch 'master' into release/0.5
Browse files Browse the repository at this point in the history
  • Loading branch information
pjenvey committed Jul 21, 2020
2 parents cdb2bbd + 40b97fc commit 75d49a7
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 49 deletions.
10 changes: 5 additions & 5 deletions migrations/2018-08-28-010336_init/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

-- DROP TABLE IF EXISTS `bso`;
-- XXX: bsov1, etc
CREATE TABLE `bso` (
CREATE TABLE IF NOT EXISTS `bso`(
`user_id` INT NOT NULL,
`collection_id` INT NOT NULL,
`id` VARCHAR(64) NOT NULL,
Expand All @@ -22,8 +22,8 @@ CREATE TABLE `bso` (
) ENGINE=InnoDB DEFAULT CHARSET=latin1;


-- DROP TABLE IF EXISTS `collections`;
CREATE TABLE `collections` (
DROP TABLE IF EXISTS `collections`;
CREATE TABLE `collections`(
`id` INT PRIMARY KEY NOT NULL AUTO_INCREMENT,
`name` VARCHAR(32) UNIQUE NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
Expand All @@ -44,7 +44,7 @@ INSERT INTO collections (id, name) VALUES


-- DROP TABLE IF EXISTS `user_collections`;
CREATE TABLE `user_collections` (
CREATE TABLE IF NOT EXISTS `user_collections`(
`user_id` INT NOT NULL,
`collection_id` INT NOT NULL,
-- last modified time in milliseconds since epoch
Expand All @@ -55,7 +55,7 @@ CREATE TABLE `user_collections` (

-- XXX: based on the go version (bsos is a concatenated blob of BSO jsons separated by newlines)
-- DROP TABLE IF EXISTS `batches`;
CREATE TABLE `batches` (
CREATE TABLE IF NOT EXISTS `batches`(
`user_id` INT NOT NULL,
`collection_id` INT NOT NULL,
`id` BIGINT NOT NULL,
Expand Down
5 changes: 5 additions & 0 deletions migrations/2019-09-11-164500/down.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
-- At this point, it's sanest to just drop the tables rather than revert them
-- there are a number of non-backwards compatible changes performed and data
-- corruption is HIGHLY likely.
-- Best just try and install the python version (probably in a docker), and
-- let the client try and reconnect and restore.
DROP TABLE IF EXISTS `bso`;
DROP TABLE IF EXISTS `collections`;
DROP TABLE IF EXISTS `user_collections`;
Expand Down
65 changes: 54 additions & 11 deletions migrations/2019-09-11-164500/up.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,54 @@
alter table `batches` change column `user_id` `userid` int(11) not null;
alter table `batches` change column `collection_id` `collection` int(11) not null;
alter table `bso` change column `user_id` `userid` int(11) not null;
alter table `bso` change column `collection_id` `collection` int(11) not null;
alter table `bso` change column `expiry` `ttl` bigint(20) not null;
alter table `user_collections` change column `user_id` `userid` int(11) not null;
alter table `user_collections` change column `collection_id` `collection` int(11) not null;
alter table `user_collections` change column `modified` `last_modified` bigint(20) not null;
-- must be last in case of error
-- the following column is not used, but preserved for legacy and stand alone systems.
alter table `bso` add column `payload_size` int(11) default 0;
-- tests to see if columns need adjustment
-- this is because the `init` may create the tables
-- with column names already correct

CREATE PROCEDURE UPDATE_165600()
BEGIN
IF EXISTS( SELECT column_name
FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_name='bso' AND column_name="user_id")
THEN
BEGIN
alter table `bso` change column `user_id` `userid` int(11) not null;
alter table `bso` change column `collection_id` `collection` int(11) not null;
alter table `bso` change column `expiry` `ttl` bigint(20) not null;
END;
END IF;

IF EXISTS( SELECT column_name
FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_name='batches' AND column_name="user_id")
THEN
BEGIN
alter table `batches` change column `user_id` `userid` int(11) not null;
alter table `batches` change column `collection_id` `collection` int(11) not null;
END;
END IF;

IF EXISTS( SELECT column_name
FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_name='user_collections' AND column_name="user_id")
THEN
BEGIN
alter table `user_collections` change column `user_id` `userid` int(11) not null;
alter table `user_collections` change column `collection_id` `collection` int(11) not null;
alter table `user_collections` change column `modified` `last_modified` bigint(20) not null;
END;
END IF;

-- must be last in case of error
-- the following column is not used, but preserved for legacy and stand alone systems.
IF NOT EXISTS( SELECT column_name
FROM INFORMATION_SCHEMA.COLUMNS
where table_name='bso' AND column_name="payload_size")
THEN
BEGIN
alter table `bso` add column `payload_size` int(11) default 0;
END;

END IF;
END;

CALL UPDATE_165600();

DROP PROCEDURE UPDATE_165600;
1 change: 0 additions & 1 deletion migrations/2020-04-03-102015_change_userid/up.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
-- Your SQL goes here
alter table `batches` change column `userid` `userid` bigint(20) not null;
alter table `bso` change column `userid` `userid` bigint(20) not null;
alter table `user_collections` change column `userid` `userid` bigint(20) not null;
41 changes: 22 additions & 19 deletions src/db/mysql/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ use crate::db::{
use crate::server::metrics::Metrics;
use crate::web::extractors::{BsoQueryParams, HawkIdentifier};

no_arg_sql_function!(last_insert_id, Integer);

pub type Result<T> = std::result::Result<T, DbError>;
type Conn = PooledConnection<ConnectionManager<MysqlConnection>>;

Expand Down Expand Up @@ -314,25 +312,27 @@ impl MysqlDb {
self.get_storage_timestamp_sync(params.user_id)
}

pub(super) fn create_collection(&self, name: &str) -> Result<i32> {
// XXX: handle concurrent attempts at inserts
pub(super) fn get_or_create_collection_id(&self, name: &str) -> Result<i32> {
if let Some(id) = self.coll_cache.get_id(name)? {
return Ok(id);
}

let id = self.conn.transaction(|| {
sql_query(
"INSERT INTO collections (name)
VALUES (?)",
)
.bind::<Text, _>(name)
.execute(&self.conn)?;
collections::table.select(last_insert_id).first(&self.conn)
diesel::insert_or_ignore_into(collections::table)
.values(collections::name.eq(name))
.execute(&self.conn)?;

collections::table
.select(collections::id)
.filter(collections::name.eq(name))
.first(&self.conn)
})?;
Ok(id)
}

fn get_or_create_collection_id(&self, name: &str) -> Result<i32> {
self.get_collection_id(name).or_else(|e| match e.kind() {
DbErrorKind::CollectionNotFound => self.create_collection(name),
_ => Err(e),
})
if !self.session.borrow().in_write_transaction {
self.coll_cache.put(id, name.to_owned())?;
}

Ok(id)
}

pub(super) fn get_collection_id(&self, name: &str) -> Result<i32> {
Expand Down Expand Up @@ -989,7 +989,10 @@ impl<'a> Db<'a> for MysqlDb {
#[cfg(test)]
fn create_collection(&self, name: String) -> DbFuture<'_, i32> {
let db = self.clone();
Box::pin(block(move || db.create_collection(&name).map_err(Into::into)).map_err(Into::into))
Box::pin(
block(move || db.get_or_create_collection_id(&name).map_err(Into::into))
.map_err(Into::into),
)
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion src/db/mysql/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn static_collection_id() -> Result<()> {
assert_eq!(result, *id);
}

let cid = db.create_collection("col1")?;
let cid = db.get_or_create_collection_id("col1")?;
assert!(cid >= 100);
Ok(())
}
32 changes: 20 additions & 12 deletions src/db/spanner/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,23 @@ impl<T: std::marker::Send + std::marker::Sync + 'static> ManageConnection
type Error = grpcio::Error;

async fn connect(&self) -> Result<Self::Connection, Self::Error> {
// Requires GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
let creds = ChannelCredentials::google_default_credentials()?;

// Create a Spanner client.
let chan = ChannelBuilder::new(self.env.clone())
.max_send_message_len(100 << 20)
.max_receive_message_len(100 << 20)
.secure_connect(SPANNER_ADDRESS, creds);
let chan = {
// Requires
// GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
// XXX: issue732: Could google_default_credentials (or
// ChannelBuilder::secure_connect) block?!
let creds = ChannelCredentials::google_default_credentials()?;

// Create a Spanner client.
ChannelBuilder::new(self.env.clone())
.max_send_message_len(100 << 20)
.max_receive_message_len(100 << 20)
.secure_connect(SPANNER_ADDRESS, creds)
};
let client = SpannerClient::new(chan);

// Connect to the instance and create a Spanner session.
let session = create_session(&client, &self.database_name)?;
let session = create_session(&client, &self.database_name).await?;

Ok(SpannerSession {
client,
Expand All @@ -100,7 +105,7 @@ impl<T: std::marker::Send + std::marker::Sync + 'static> ManageConnection
grpcio::Error::RpcFailure(ref status)
if status.status == grpcio::RpcStatusCode::NOT_FOUND =>
{
conn.session = create_session(&conn.client, &self.database_name)?;
conn.session = create_session(&conn.client, &self.database_name).await?;
}
_ => return Err(e),
}
Expand All @@ -113,12 +118,15 @@ impl<T: std::marker::Send + std::marker::Sync + 'static> ManageConnection
}
}

fn create_session(client: &SpannerClient, database_name: &str) -> Result<Session, grpcio::Error> {
async fn create_session(
client: &SpannerClient,
database_name: &str,
) -> Result<Session, grpcio::Error> {
let mut req = CreateSessionRequest::new();
req.database = database_name.to_owned();
let mut meta = MetadataBuilder::new();
meta.add_str("google-cloud-resource-prefix", database_name)?;
meta.add_str("x-goog-api-client", "gcp-grpc-rs")?;
let opt = CallOption::default().headers(meta.build());
client.create_session_opt(&req, opt)
client.create_session_async_opt(&req, opt)?.await
}

0 comments on commit 75d49a7

Please sign in to comment.