From 8985b345a6b004ab0228eab803686b8b721ba29a Mon Sep 17 00:00:00 2001 From: Sebastien Flory Date: Mon, 2 Dec 2024 18:32:53 +0100 Subject: [PATCH] Use psql copy to upsert all rows at once in case of truncate = true (#9060) * Use psql copy to upsert all rows at once in case of truncate = true * Review fdbk + only use copy if nb rows > 1024 --- .vscode/settings.json | 29 +++++----- core/src/databases_store/store.rs | 93 +++++++++++++++++++++++++------ front/lib/api/tables.ts | 11 ++++ 3 files changed, 102 insertions(+), 31 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 7032987cfe38..5151eeea366c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,18 +8,19 @@ }, "eslint.run": "onSave", "typescript.preferences.importModuleSpecifier": "non-relative", - "github-actions.workflows.pinned.workflows": [] - , - "editor.defaultFormatter": "esbenp.prettier-vscode", - "editor.formatOnSave": true, - "[javascript]": { - "editor.defaultFormatter": "esbenp.prettier-vscode" - }, - "[typescript]": { - "editor.defaultFormatter": "esbenp.prettier-vscode" - }, - "[typescriptreact]": { - "editor.defaultFormatter": "esbenp.prettier-vscode" - } - + "github-actions.workflows.pinned.workflows": [], + "editor.defaultFormatter": "esbenp.prettier-vscode", + "editor.formatOnSave": true, + "[javascript]": { + "editor.defaultFormatter": "esbenp.prettier-vscode" + }, + "[typescript]": { + "editor.defaultFormatter": "esbenp.prettier-vscode" + }, + "[typescriptreact]": { + "editor.defaultFormatter": "esbenp.prettier-vscode" + }, + "[rust]": { + "editor.defaultFormatter": "rust-lang.rust-analyzer" + } } diff --git a/core/src/databases_store/store.rs b/core/src/databases_store/store.rs index 66df339ec2d3..21879d7b6a94 100644 --- a/core/src/databases_store/store.rs +++ b/core/src/databases_store/store.rs @@ -2,7 +2,9 @@ use anyhow::Result; use async_trait::async_trait; use bb8::Pool; use bb8_postgres::PostgresConnectionManager; +use futures::SinkExt; use serde_json::Value; +use std::io::Cursor; use tokio_postgres::{types::ToSql, NoTls}; use tracing::info; @@ -182,33 +184,90 @@ impl DatabasesStore for PostgresDatabasesStore { } } - let stmt = c - .prepare( - "INSERT INTO tables_rows - (table_id, row_id, created, content) - SELECT * FROM UNNEST($1::text[], $2::text[], $3::bigint[], $4::text[]) - ON CONFLICT (table_id, row_id) DO UPDATE - SET content = EXCLUDED.content", - ) + // For now, only do it if we are inserting more than 1024 rows + if truncate && rows.len() > 1024 { + // Start COPY operation directly into the target table + let mut sink = c + .copy_in("COPY tables_rows (table_id, row_id, created, content) FROM STDIN WITH (FORMAT text)") .await?; - for chunk in rows.chunks(1024) { let now = utils::now() as i64; - let table_ids: Vec<&str> = vec![table_id; chunk.len()]; - let row_ids: Vec<&str> = chunk.iter().map(|r| r.row_id()).collect(); - let createds: Vec = vec![now; chunk.len()]; - let contents: Vec = chunk.iter().map(|r| r.content().to_string()).collect(); + // Create a single buffer for all the data + let mut buffer = Vec::new(); + + for row in rows { + // Escape special characters in content + let escaped_content = row + .content() + .to_string() + // Postgresql [doc](https://www.postgresql.org/docs/current/sql-copy.html) + // Backslash characters (\) can be used in the COPY data to quote data characters that might otherwise be taken as row or column delimiters. + // In particular, the following characters must be preceded by a backslash if they appear as part of a column value: + // the backslash itself, newline, carriage return, and the current delimiter character. + .replace('\\', "\\\\") + .replace('\n', "\\n") + .replace('\r', "\\r") + .replace('\t', "\\t"); + + // Format: table_id, row_id, created, content + let line = format!( + "{}\t{}\t{}\t{}\n", + table_id, + row.row_id(), + now, + &escaped_content + ); - c.execute(&stmt, &[&table_ids, &row_ids, &createds, &contents]) - .await?; + buffer.extend_from_slice(line.as_bytes()); + } + + // Send all data at once + let mut pinned_sink = std::pin::pin!(sink); + pinned_sink.send(Cursor::new(buffer)).await?; + + // Close the sink + let rows_count = pinned_sink.finish().await?; + + if rows_count != rows.len() as u64 { + return Err(anyhow::anyhow!("Failed to insert all rows")); + } info!( duration = utils::now() - now as u64, table_id, - inserted_rows = chunk.len(), - "DSSTRUCTSTAT [upsert_rows] insertion batch" + inserted_rows = rows_count, + "DSSTRUCTSTAT [upsert_rows] insertion batch (COPY)" ); + } else { + let stmt = c + .prepare( + "INSERT INTO tables_rows + (table_id, row_id, created, content) + SELECT * FROM UNNEST($1::text[], $2::text[], $3::bigint[], $4::text[]) + ON CONFLICT (table_id, row_id) DO UPDATE + SET content = EXCLUDED.content", + ) + .await?; + + for chunk in rows.chunks(1024) { + let now = utils::now() as i64; + + let table_ids: Vec<&str> = vec![table_id; chunk.len()]; + let row_ids: Vec<&str> = chunk.iter().map(|r| r.row_id()).collect(); + let createds: Vec = vec![now; chunk.len()]; + let contents: Vec = chunk.iter().map(|r| r.content().to_string()).collect(); + + c.execute(&stmt, &[&table_ids, &row_ids, &createds, &contents]) + .await?; + + info!( + duration = utils::now() - now as u64, + table_id, + inserted_rows = chunk.len(), + "DSSTRUCTSTAT [upsert_rows] insertion batch (INSERT...ON CONFLICT)" + ); + } } Ok(()) diff --git a/front/lib/api/tables.ts b/front/lib/api/tables.ts index 8b7063e58adf..b038df33ff0c 100644 --- a/front/lib/api/tables.ts +++ b/front/lib/api/tables.ts @@ -349,6 +349,7 @@ export async function rowsFromCsv({ CsvParsingError > > { + const now = performance.now(); const delimiter = await guessDelimiter(csv); if (!delimiter) { return new Err({ @@ -497,6 +498,16 @@ export async function rowsFromCsv({ rows.push({ row_id: rowId, value: record }); } + logger.info( + { + durationMs: performance.now() - now, + nbRows, + nbCols: header.length, + workspaceId: auth.getNonNullableWorkspace().id, + }, + "Parsing CSV" + ); + return new Ok({ detectedHeaders: { header, rowIndex }, rows }); }