From 4fba06bbb3491333c8c436a3d7c34e70632882b6 Mon Sep 17 00:00:00 2001 From: Jingcheng Yang Date: Wed, 13 Mar 2024 16:57:49 -0400 Subject: [PATCH] [Fix Bug] Improve the performance of the importer. --- README.md | 33 ++++-- src/bin/biomedgps-cli.rs | 85 ++++++++++++++-- src/lib.rs | 19 ++-- src/model/graph.rs | 24 ++--- src/model/init_db.rs | 210 ++++++++++++++++++++++++++------------- 5 files changed, 262 insertions(+), 109 deletions(-) diff --git a/README.md b/README.md index eccb32c..8185fe2 100644 --- a/README.md +++ b/README.md @@ -64,39 +64,50 @@ docker-compose up -d `Step 3`: Init Database, Check and Import Data +- Init database + ```bash -# Init database export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && biomedgps-cli initdb -# Check and import data, -t is the table name, -f is the data file path, -D is the delete flag -# Import entity data +## Check and import data, -t is the table name, -f is the data file path, -D is the delete flag +## Import entity data export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && biomedgps-cli importdb -f /data/entity.tsv -t entity -D -# Import relation data +## Import relation data export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && biomedgps-cli importdb -f /data/relation.tsv -t relation -D --dataset drkg -# Generate metadata for entity +## Generate metadata for entity export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && biomedgps-cli importdb -f /data/entity.tsv -t entity_metadata -D -# Generate metadata for relation +## Generate metadata for relation export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && biomedgps-cli importdb -f /data/relation_types.tsv -t relation_metadata -D -# Import entity2d data +## Import entity2d data export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && biomedgps-cli importdb -f /data/entity2d.tsv -t entity2d -D -# Import 
entity embeddings +## Import entity embeddings export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && biomedgps-cli importdb -f /data/entity_embeddings.tsv -t entity_embedding -D -# Import relation embeddings +## Import relation embeddings export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && biomedgps-cli importdb -f /data/relation_embeddings.tsv -t relation_embedding -D +``` -# Import entity data to graph database +- Init Graph Database + +```bash +## Import entity data to graph database export NEO4J_URL=neo4j://neo4j:password@localhost:7687 && biomedgps-cli importgraph -f /data/entities.tsv -t entity -b 1000 -# Import relation data to graph database +## Import relation data to graph database export NEO4J_URL=neo4j://neo4j:password@localhost:7687 && biomedgps-cli importgraph -f /data/relations.tsv -t relation -b 1000 ``` +- Make several cache tables for performance + +```bash +export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && export NEO4J_URL=neo4j://neo4j:password@localhost:7687 && biomedgps-cli cachetable --table knowledge-score -T biomedgps +``` + `Step 4`: Launch the platform, see more details on usage [here](#usage). 
```bash diff --git a/src/bin/biomedgps-cli.rs b/src/bin/biomedgps-cli.rs index aefca11..e1c9f8f 100644 --- a/src/bin/biomedgps-cli.rs +++ b/src/bin/biomedgps-cli.rs @@ -3,7 +3,7 @@ extern crate log; use biomedgps::model::init_db::create_kg_score_table; use biomedgps::model::kge::{init_kge_models, DEFAULT_MODEL_NAME}; use biomedgps::model::{ - init_db::{create_score_table, kg_score_table2graphdb}, + init_db::{create_score_table, get_kg_score_table_name, kg_score_table2graphdb}, util::read_annotation_file, }; use biomedgps::{ @@ -11,6 +11,8 @@ use biomedgps::{ run_migrations, }; use log::*; +use regex::Regex; +use sqlx::Row; use std::path::PathBuf; use std::sync::Arc; use structopt::StructOpt; @@ -32,14 +34,16 @@ struct Opt { enum SubCommands { #[structopt(name = "initdb")] InitDB(InitDbArguments), - #[structopt(name = "inittable")] - InitTable(InitTableArguments), + #[structopt(name = "cachetable")] + CacheTable(CacheTableArguments), #[structopt(name = "importdb")] ImportDB(ImportDBArguments), #[structopt(name = "importgraph")] ImportGraph(ImportGraphArguments), #[structopt(name = "importkge")] ImportKGE(ImportKGEArguments), + #[structopt(name = "cleandb")] + CleanDB(CleanDBArguments), } /// Init database. @@ -51,6 +55,19 @@ pub struct InitDbArguments { database_url: Option<String>, } +/// Clean database +#[derive(StructOpt, PartialEq, Debug)] +#[structopt(setting=structopt::clap::AppSettings::ColoredHelp, name="BioMedGPS - cleandb", author="Jingcheng Yang ")] +pub struct CleanDBArguments { + /// Database url, such as postgres://postgres:postgres@localhost:5432/rnmpdb or neo4j://<user>:<password>@localhost:7687, if not set, use the value of environment variable DATABASE_URL or NEO4J_URL. + #[structopt(name = "database_url", short = "d", long = "database-url")] + database_url: Option<String>, + + /// Which table to clean. e.g. entity, relation, entity_metadata, relation_metadata, knowledge_curation, subgraph, entity2d, compound-disease-symptom, knowledge-score, embedding, graph etc.
+ #[structopt(name = "table", short = "t", long = "table")] + table: String, +} + /// Import data files into database. #[derive(StructOpt, PartialEq, Debug)] #[structopt(setting=structopt::clap::AppSettings::ColoredHelp, name="BioMedGPS - importdb", author="Jingcheng Yang ")] pub struct ImportDBArguments { @@ -108,14 +125,18 @@ pub struct ImportDBArguments { show_all_errors: bool, } -/// Init tables for performance. You must run this command after the importdb command. +/// Cache tables for performance. You must run this command after the importdb command. #[derive(StructOpt, PartialEq, Debug)] -#[structopt(setting=structopt::clap::AppSettings::ColoredHelp, name="BioMedGPS - inittable", author="Jingcheng Yang ")] -pub struct InitTableArguments { +#[structopt(setting=structopt::clap::AppSettings::ColoredHelp, name="BioMedGPS - cachetable", author="Jingcheng Yang ")] +pub struct CacheTableArguments { /// [Required] Database url, such as postgres://postgres:postgres@localhost:5432/rnmpdb, if not set, use the value of environment variable DATABASE_URL. #[structopt(name = "database_url", short = "d", long = "database-url")] database_url: Option<String>, + /// [Optional] Database host, such as postgres-ml:5432. Only needed when you run your application in a docker container and the database is in another container. + #[structopt(name = "db_host", short = "D", long = "db-host")] + db_host: Option<String>, + /// [Optional] Database url, such as neo4j://<user>:<password>@localhost:7687, if not set, use the value of environment variable NEO4J_URL. #[structopt(name = "neo4j_url", short = "n", long = "neo4j-url")] neo4j_url: Option<String>, @@ -136,6 +157,15 @@ pub struct InitTableArguments { default_value = DEFAULT_MODEL_NAME )] table_prefix: String, + + /// [Optional] The batch size for caching table. + #[structopt( + name = "batch_size", + short = "b", + long = "batch-size", + default_value = "10000" + )] + batch_size: usize, } /// Import data files into a graph database.
@@ -290,7 +320,7 @@ async fn main() { Err(e) => error!("Init database failed: {}", e), } } - SubCommands::InitTable(arguments) => { + SubCommands::CacheTable(arguments) => { let database_url = arguments.database_url; let database_url = if database_url.is_none() { @@ -390,9 +420,43 @@ async fn main() { error!("{}", "NEO4J_URL is not set, skip to import kg score table to graph database."); std::process::exit(0); } else { + let table_prefix = &arguments.table_prefix; + let table_name = get_kg_score_table_name(table_prefix); + let total = match sqlx::query(&format!( + "SELECT count(*) FROM {}", + table_name + )) + .fetch_one(&pool) + .await + { + Ok(row) => row.get::<i64, &str>("count"), + Err(e) => { + error!( + "Failed to get the total number of the records in the score table: {}", + e + ); + std::process::exit(1); + } + }; + // Use the regex to replace the database host and port. + let re = Regex::new(r"(.*//.*?@)[^/]*(/.*)").unwrap(); + let database_url = if arguments.db_host.is_none() { + database_url + } else { + let caps = re.captures(&database_url).unwrap(); + let db_host = arguments.db_host.unwrap(); + format!("{}{}{}", &caps[1], db_host, &caps[2]) + }; let graph = Arc::new(connect_graph_db(&neo4j_url).await); - match kg_score_table2graphdb(&pool, &graph, Some(&arguments.table_prefix)) - .await + match kg_score_table2graphdb( + &database_url, + &graph, + Some(table_prefix), + total as usize, + arguments.batch_size, + false, + ) + .await { Ok(_) => { info!("Import kg score table to graph database successfully.") @@ -572,5 +636,8 @@ async fn main() { ) .await } + SubCommands::CleanDB(arguments) => { + info!("To be implemented.") + } } } diff --git a/src/lib.rs b/src/lib.rs index 501d850..d00c7ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,10 +30,9 @@ use std::os::unix::fs::PermissionsExt; use std::vec; use crate::model::core::{ - CheckData, Entity, Entity2D, KnowledgeCuration, Relation, RelationMetadata, - Subgraph, + CheckData, Entity, Entity2D, KnowledgeCuration,
Relation, RelationMetadata, Subgraph, }; -use crate::model::graph::Node; +use crate::model::graph::{Edge, Node}; use crate::model::kge::{EntityEmbedding, LegacyRelationEmbedding, RelationEmbedding}; use crate::model::util::{ drop_records, drop_table, get_delimiter, import_file_in_loop, show_errors, @@ -269,15 +268,15 @@ pub async fn prepare_relation_queries( let query_string = if check_exist { format!( "MATCH (e1:{} {{idx: $source_idx}}) - MATCH (e2:{} {{idx: $target_idx}}) - MERGE (e1)-[r:`{}` {{resource: $resource, key_sentence: $key_sentence, pmids: $pmids, dataset: $dataset}}]->(e2)", + MATCH (e2:{} {{idx: $target_idx}}) + MERGE (e1)-[r:`{}` {{resource: $resource, key_sentence: $key_sentence, pmids: $pmids, dataset: $dataset, idx: $relation_idx}}]->(e2)", record.source_type, record.target_type, label ) } else { format!( "MATCH (e1:{} {{idx: $source_idx}}) - MATCH (e2:{} {{idx: $target_idx}}) - CREATE (e1)-[r:`{}` {{resource: $resource, key_sentence: $key_sentence, pmids: $pmids, dataset: $dataset}}]->(e2)", + MATCH (e2:{} {{idx: $target_idx}}) + CREATE (e1)-[r:`{}` {{resource: $resource, key_sentence: $key_sentence, pmids: $pmids, dataset: $dataset, idx: $relation_idx}}]->(e2)", record.source_type, record.target_type, label ) }; @@ -294,7 +293,11 @@ pub async fn prepare_relation_queries( .param("pmids", pmids) .param("resource", record.resource) .param("key_sentence", key_sentence) - .param("dataset", dataset); + .param("dataset", dataset) + .param( + "relation_idx", + Edge::format_id(&record.source_id, &label, &record.target_id), + ); queries.push(query); } diff --git a/src/model/graph.rs b/src/model/graph.rs index 9999605..17b289c 100644 --- a/src/model/graph.rs +++ b/src/model/graph.rs @@ -518,6 +518,10 @@ pub struct Edge { } impl Edge { + pub fn format_id(source_id: &str, relation_type: &str, target_id: &str) -> String { + format!("{}-{}-{}", source_id, relation_type, target_id) + } + /// Create a new edge. 
pub fn new( relation_type: &str, @@ -527,7 +531,7 @@ impl Edge { target_type: &str, distance: Option, ) -> Self { - let relid = format!("{}-{}-{}", source_id, relation_type, target_id); + let relid = Edge::format_id(source_id, relation_type, target_id); Edge { relid: relid.clone(), @@ -554,10 +558,7 @@ impl Edge { /// Create a new edge from an EdgeData struct. pub fn from_edge_data(edge: &EdgeData) -> Self { Edge { - relid: format!( - "{}-{}-{}", - edge.source_id, edge.relation_type, edge.target_id - ), + relid: Edge::format_id(&edge.source_id, &edge.relation_type, &edge.target_id), source: Node::format_id(&edge.source_type, &edge.source_id), category: "edge".to_string(), target: Node::format_id(&edge.target_type, &edge.target_id), @@ -569,10 +570,7 @@ impl Edge { /// It will convert the [`Relation`](struct.Relation.html) struct to the [`Edge`](struct.Edge.html) struct. pub fn from_relation(relation: &Relation) -> Self { - let relid = format!( - "{}-{}-{}", - relation.source_id, relation.relation_type, relation.target_id - ); + let relid = Edge::format_id(&relation.source_id, &relation.relation_type, &relation.target_id); Edge { relid: relid.clone(), source: Node::format_id(&relation.source_type, &relation.source_id), @@ -585,10 +583,12 @@ impl Edge { } pub fn from_curated_knowledge(knowledge: &KnowledgeCuration) -> Self { - let relid = format!( - "{}-{}-{}", - knowledge.source_id, knowledge.relation_type, knowledge.target_id + let relid = Edge::format_id( + &knowledge.source_id, + &knowledge.relation_type, + &knowledge.target_id, ); + Edge { relid: relid.clone(), source: Node::format_id(&knowledge.source_type, &knowledge.source_id), diff --git a/src/model/init_db.rs b/src/model/init_db.rs index 9341893..76dff69 100644 --- a/src/model/init_db.rs +++ b/src/model/init_db.rs @@ -1,14 +1,13 @@ //! SQL initialization strings for creating tables. 
-use crate::model::core::Relation; use crate::model::kge::{ get_embedding_metadata, get_entity_emb_table_name, get_relation_emb_table_name, EmbeddingMetadata, DEFAULT_MODEL_NAME, }; use crate::model::util::ValidationError; -use futures::stream::{FuturesUnordered, StreamExt}; -use log::{debug, error, info}; -use neo4rs::{query, Graph}; +use anyhow::anyhow; +use log::{debug, error, info, warn}; +use neo4rs::{query, Graph, Node as NeoNode}; use sqlx::PgPool; use std::sync::Arc; @@ -528,87 +527,160 @@ fn get_score_attr_name(table_prefix: &str) -> String { format!("{}_score", table_prefix) } +/// Convert the score table of the triple entity to the graph database. +/// +/// # Arguments +/// * `database_url` - The URL of the database; it will be converted to a JDBC URL internally. +/// * `graphdb` - The graph database. +/// * `table_prefix` - Optional prefix for the table name. If not provided, the default model name will be used. +/// * `total` - The total number of the records in the score table. +/// * `batch_size` - The batch size for the iteration. +/// * `only_score` - If true, only the score will be set for the relation, otherwise the resource, dataset, pmids and key_sentence will also be set. If you want to update all the attributes, you need to set it to false. Otherwise, we assume that you have an idx attribute for the relation.
pub async fn kg_score_table2graphdb( - pool: &PgPool, + database_url: &str, graphdb: &Arc, table_prefix: Option<&str>, -) -> Result<(), ValidationError> { + total: usize, + batch_size: usize, + only_score: bool, +) -> Result<(), anyhow::Error> { + let jdbc_url = database_url.replace("postgres://", "jdbc:postgresql://"); let table_prefix = table_prefix.unwrap_or(DEFAULT_MODEL_NAME); let score_table_name = get_kg_score_table_name(table_prefix); - let score_attr_name = get_score_attr_name(table_prefix); + info!("jdbc_url: {}", jdbc_url); - let query_str = format!( - r#" - SELECT - id, - source_id, - source_type, - target_id, - target_type, - relation_type, - formatted_relation_type, - key_sentence, - resource, - dataset, - pmids, - score - FROM {score_table}; - "#, - score_table = score_table_name, - ); - - let rows = match sqlx::query_as::<_, Relation>(&query_str) - .fetch_all(pool) - .await - { - Ok(rows) => rows, - Err(e) => { - error!("Failed to get the rows from the score table: {}", e); - return Err(ValidationError::new( - &format!("Failed to get the rows from the score table: {}", e), - vec![], - )); - } - }; + info!("Need to convert {} records to the graph database", total); + let batch = total as usize / batch_size; - for row in rows { - let query_str = format!( - r#" - CALL apoc.periodic.iterate( - "MATCH (source:{source_type} {{idx: '{source_node_id}'}}) - MATCH (target:{target_type} {{idx: '{target_node_id}'}}) - MATCH (source)-[r:`{relation_type}`]->(target) - RETURN r", - "SET r.{score_attr_name} = {score}", - {{batchSize:1000, parallel: true, iterateList: true, retries: 3}} - ); - "#, - source_type = row.source_type, - source_node_id = format!("{}::{}", row.source_type, row.source_id), - target_type = row.target_type, - target_node_id = format!("{}::{}", row.target_type, row.target_id), - relation_type = row.relation_type, - score_attr_name = score_attr_name, - score = row.score.unwrap_or(0.0), + for i in 0..batch { + info!( + "Run the batch: {}/{}, 
each batch has {} records", + i, batch, batch_size ); + let offset = i * batch_size; + // https://github.com/pgjdbc/pgjdbc + let query_str = if !only_score { + format!( + r#" + CALL apoc.periodic.iterate( + 'CALL apoc.load.jdbc( + "{jdbc_url}", + "SELECT id, source_id, source_type, target_id, target_type, relation_type, + formatted_relation_type, key_sentence, resource, dataset, pmids, score, + COALESCE(source_type, \'\') || \'::\' || COALESCE(source_id, \'\') AS source_node_id, + COALESCE(target_type, \'\') || \'::\' || COALESCE(target_id, \'\') AS target_node_id, + COALESCE(source_id, \'\') || \'-\' || COALESCE(relation_type, \'\') || \'-\' || COALESCE(target_id, \'\') AS idx + FROM {score_table} LIMIT {limit} OFFSET {offset}") YIELD row RETURN row', + 'WITH row + CALL apoc.merge.node([row.source_type], {{ idx: row.source_node_id }}) YIELD node as source + CALL apoc.merge.node([row.target_type], {{ idx: row.target_node_id }}) YIELD node as target + CALL apoc.merge.relationship(source, row.relation_type, {{}}, {{}}, target, {{}}) YIELD rel + SET rel.{score_attr_name} = row.score, + rel.idx = row.idx, + rel.resource = row.resource, + rel.dataset = row.dataset, + rel.pmids = row.pmids, + rel.key_sentence = row.key_sentence', + {{batchSize: {batch_size}, parallel: true, iterateList: true, retries: 0}} + ) + YIELD batches, total, errorMessages, failedOperations + RETURN batches, total, errorMessages, failedOperations + "#, + limit = batch_size * 5, + offset = offset, + jdbc_url = jdbc_url, + score_table = score_table_name, + score_attr_name = score_attr_name, + batch_size = batch_size, + ) + } else { + format!( + r#" + CALL apoc.periodic.iterate( + 'CALL apoc.load.jdbc( + "{jdbc_url}", + "SELECT id, source_id, source_type, target_id, target_type, relation_type, formatted_relation_type, + key_sentence, resource, dataset, pmids, score, + COALESCE(source_id, \'\') || \'-\' || COALESCE(relation_type, \'\') || \'-\' || COALESCE(target_id, \'\') AS idx + FROM {score_table} 
 LIMIT {limit} OFFSET {offset}") + YIELD row RETURN row', + 'WITH row + MATCH ()-[r]->() WHERE r.idx = row.idx + SET r.{score_attr_name} = row.score + {{batchSize: {batch_size}, parallel: true, iterateList: true, retries: 0}} + ) + YIELD batches, total, errorMessages, failedOperations + RETURN batches, total, errorMessages, failedOperations + "#, + limit = batch_size * 5, + offset = offset, + jdbc_url = jdbc_url, + score_table = score_table_name, + score_attr_name = score_attr_name, + batch_size = batch_size, + ) + }; + + info!("query_str: {}", query_str); + + let err_msg = "if you encounter a connection error and you are using the docker container, please try to set the --db-host to the hostname:port of your database docker container."; match graphdb.execute(query(&query_str)).await { - Ok(_) => { - debug!( - "The score is set successfully for the relation: {}", - row.relation_type + Ok(mut result) => { + // Extract the batches, total and errorMessages from the result + while let Some(row) = result.next().await? { + info!("row: {:?}", row); + let batches = match row.get("batches") { + Some(b) => b, + None => 0, + }; + let total = match row.get("total") { + Some(t) => t, + None => 0, + }; + let failed_operations = match row.get::<i64>("failedOperations") { + Some(f) => f, + None => continue, + }; + let msg = match row.get::<String>("errorMessages") { + Some(e) => e, + None => continue, + }; + + if batches == 0 && total == 0 { + warn!("The score table is empty."); + return Ok(()); + } + + if failed_operations > 0 { + error!( + "The score table is not empty, but the number of failed operations is {} out of {}. The error message: {:?}", + failed_operations, total, msg + ); + + return Err( + anyhow!( + "The score table is not empty, but the number of failed operations is {} out of {}.
The error message: {:?}", + failed_operations, total, msg + ) + ); + } + } + info!( + "The score table {} is converted to the graph database successfully", + score_table_name ); } Err(e) => { - error!("Failed to set the score for the relation: {}", e); - return Err(ValidationError::new( - &format!("Failed to set the score for the relation: {}", e), - vec![], - )); + error!( + "Failed to set the score for the relation: {}, {}", + e, err_msg + ); + return Err(anyhow::Error::new(e)); } } - }; + } info!("The score table is converted to the graph database successfully");