Use psql copy to upsert all rows at once in case of truncate = true (#9060)

* Use psql copy to upsert all rows at once in case of truncate = true

* Review feedback + only use COPY if number of rows > 1024
Fraggle authored Dec 2, 2024
1 parent 41b30bc commit 8985b34
Showing 3 changed files with 102 additions and 31 deletions.
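Note on the approach: COPY ... FROM STDIN is a plain bulk load with no conflict handling, so it can only stand in for the INSERT ... ON CONFLICT upsert when truncate = true guarantees the target table has just been emptied and no row id can collide. A hypothetical helper (not code from this commit) capturing the dispatch rule and the 1024-row threshold:

// Hypothetical sketch, not part of the commit: when to prefer COPY.
fn use_copy(truncate: bool, nb_rows: usize) -> bool {
    // COPY cannot upsert; it is only safe right after a TRUNCATE, and only
    // worth the setup cost for large batches.
    truncate && nb_rows > 1024
}

fn main() {
    assert!(use_copy(true, 100_000)); // truncate-reload of a big table: COPY
    assert!(!use_copy(true, 10)); // small batch: chunked INSERT is fine
    assert!(!use_copy(false, 100_000)); // rows may collide: needs ON CONFLICT
}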
29 changes: 15 additions & 14 deletions .vscode/settings.json
@@ -8,18 +8,19 @@
   },
   "eslint.run": "onSave",
   "typescript.preferences.importModuleSpecifier": "non-relative",
-  "github-actions.workflows.pinned.workflows": []
-  ,
-  "editor.defaultFormatter": "esbenp.prettier-vscode",
-  "editor.formatOnSave": true,
-  "[javascript]": {
-    "editor.defaultFormatter": "esbenp.prettier-vscode"
-  },
-  "[typescript]": {
-    "editor.defaultFormatter": "esbenp.prettier-vscode"
-  },
-  "[typescriptreact]": {
-    "editor.defaultFormatter": "esbenp.prettier-vscode"
-  }
-
+  "github-actions.workflows.pinned.workflows": [],
+  "editor.defaultFormatter": "esbenp.prettier-vscode",
+  "editor.formatOnSave": true,
+  "[javascript]": {
+    "editor.defaultFormatter": "esbenp.prettier-vscode"
+  },
+  "[typescript]": {
+    "editor.defaultFormatter": "esbenp.prettier-vscode"
+  },
+  "[typescriptreact]": {
+    "editor.defaultFormatter": "esbenp.prettier-vscode"
+  },
+  "[rust]": {
+    "editor.defaultFormatter": "rust-lang.rust-analyzer"
+  }
 }
93 changes: 76 additions & 17 deletions core/src/databases_store/store.rs
@@ -2,7 +2,9 @@ use anyhow::Result;
 use async_trait::async_trait;
 use bb8::Pool;
 use bb8_postgres::PostgresConnectionManager;
+use futures::SinkExt;
 use serde_json::Value;
+use std::io::Cursor;
 use tokio_postgres::{types::ToSql, NoTls};
 use tracing::info;

@@ -182,33 +184,90 @@ impl DatabasesStore for PostgresDatabasesStore {
             }
         }
 
-        let stmt = c
-            .prepare(
-                "INSERT INTO tables_rows
-                (table_id, row_id, created, content)
-                SELECT * FROM UNNEST($1::text[], $2::text[], $3::bigint[], $4::text[])
-                ON CONFLICT (table_id, row_id) DO UPDATE
-                SET content = EXCLUDED.content",
-            )
-            .await?;
-
-        for chunk in rows.chunks(1024) {
-            let now = utils::now() as i64;
-
-            let table_ids: Vec<&str> = vec![table_id; chunk.len()];
-            let row_ids: Vec<&str> = chunk.iter().map(|r| r.row_id()).collect();
-            let createds: Vec<i64> = vec![now; chunk.len()];
-            let contents: Vec<String> = chunk.iter().map(|r| r.content().to_string()).collect();
-
-            c.execute(&stmt, &[&table_ids, &row_ids, &createds, &contents])
-                .await?;
-
-            info!(
-                duration = utils::now() - now as u64,
-                table_id,
-                inserted_rows = chunk.len(),
-                "DSSTRUCTSTAT [upsert_rows] insertion batch"
-            );
-        }
+        // For now, only do it if we are inserting more than 1024 rows
+        if truncate && rows.len() > 1024 {
+            // Start COPY operation directly into the target table
+            let mut sink = c
+                .copy_in("COPY tables_rows (table_id, row_id, created, content) FROM STDIN WITH (FORMAT text)")
+                .await?;
+
+            let now = utils::now() as i64;
+
+            // Create a single buffer for all the data
+            let mut buffer = Vec::new();
+
+            for row in rows {
+                // Escape special characters in content
+                let escaped_content = row
+                    .content()
+                    .to_string()
+                    // Postgresql [doc](https://www.postgresql.org/docs/current/sql-copy.html)
+                    // Backslash characters (\) can be used in the COPY data to quote data characters that might otherwise be taken as row or column delimiters.
+                    // In particular, the following characters must be preceded by a backslash if they appear as part of a column value:
+                    // the backslash itself, newline, carriage return, and the current delimiter character.
+                    .replace('\\', "\\\\")
+                    .replace('\n', "\\n")
+                    .replace('\r', "\\r")
+                    .replace('\t', "\\t");
+
+                // Format: table_id, row_id, created, content
+                let line = format!(
+                    "{}\t{}\t{}\t{}\n",
+                    table_id,
+                    row.row_id(),
+                    now,
+                    &escaped_content
+                );
+
+                buffer.extend_from_slice(line.as_bytes());
+            }
+
+            // Send all data at once
+            let mut pinned_sink = std::pin::pin!(sink);
+            pinned_sink.send(Cursor::new(buffer)).await?;
+
+            // Close the sink
+            let rows_count = pinned_sink.finish().await?;
+
+            if rows_count != rows.len() as u64 {
+                return Err(anyhow::anyhow!("Failed to insert all rows"));
+            }
+
+            info!(
+                duration = utils::now() - now as u64,
+                table_id,
+                inserted_rows = rows_count,
+                "DSSTRUCTSTAT [upsert_rows] insertion batch (COPY)"
+            );
+        } else {
+            let stmt = c
+                .prepare(
+                    "INSERT INTO tables_rows
+                    (table_id, row_id, created, content)
+                    SELECT * FROM UNNEST($1::text[], $2::text[], $3::bigint[], $4::text[])
+                    ON CONFLICT (table_id, row_id) DO UPDATE
+                    SET content = EXCLUDED.content",
+                )
+                .await?;
+
+            for chunk in rows.chunks(1024) {
+                let now = utils::now() as i64;
+
+                let table_ids: Vec<&str> = vec![table_id; chunk.len()];
+                let row_ids: Vec<&str> = chunk.iter().map(|r| r.row_id()).collect();
+                let createds: Vec<i64> = vec![now; chunk.len()];
+                let contents: Vec<String> = chunk.iter().map(|r| r.content().to_string()).collect();
+
+                c.execute(&stmt, &[&table_ids, &row_ids, &createds, &contents])
+                    .await?;
+
+                info!(
+                    duration = utils::now() - now as u64,
+                    table_id,
+                    inserted_rows = chunk.len(),
+                    "DSSTRUCTSTAT [upsert_rows] insertion batch (INSERT...ON CONFLICT)"
+                );
+            }
+        }
 
         Ok(())
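For reference, a standalone sketch of the tokio_postgres COPY flow used above: open the copy with copy_in, stream one buffer through the CopyInSink, then call finish, which completes the copy and returns the server-side row count (a sink dropped without finish aborts the copy). The client and table names here are illustrative assumptions, not code from the commit:

// Illustrative sketch only: assumes a connected tokio_postgres::Client and a
// toy table `t (id text, content text)`.
use std::io::Cursor;

use futures::SinkExt;
use tokio_postgres::Client;

// Escape the characters COPY's text format treats as structural: the
// backslash itself, newline, carriage return, and the tab delimiter.
fn escape_copy_text(s: &str) -> String {
    s.replace('\\', "\\\\")
        .replace('\n', "\\n")
        .replace('\r', "\\r")
        .replace('\t', "\\t")
}

async fn copy_all(client: &Client, rows: &[(String, String)]) -> anyhow::Result<()> {
    let sink = client
        .copy_in("COPY t (id, content) FROM STDIN WITH (FORMAT text)")
        .await?;
    let mut sink = std::pin::pin!(sink);

    // Text format: tab-delimited columns, one newline-terminated line per row.
    let mut buffer = Vec::new();
    for (id, content) in rows {
        let line = format!("{}\t{}\n", escape_copy_text(id), escape_copy_text(content));
        buffer.extend_from_slice(line.as_bytes());
    }
    sink.send(Cursor::new(buffer)).await?;

    // finish() completes the COPY and reports how many rows the server wrote.
    let copied = sink.finish().await?;
    anyhow::ensure!(copied == rows.len() as u64, "COPY wrote {copied} rows");
    Ok(())
}

The escaping matters because the text format uses tabs and newlines as delimiters; an unescaped tab inside a value would silently shift every following column of that row.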
11 changes: 11 additions & 0 deletions front/lib/api/tables.ts
@@ -349,6 +349,7 @@ export async function rowsFromCsv({
     CsvParsingError
   >
 > {
+  const now = performance.now();
   const delimiter = await guessDelimiter(csv);
   if (!delimiter) {
     return new Err({
@@ -497,6 +498,16 @@ export async function rowsFromCsv({
     rows.push({ row_id: rowId, value: record });
   }
 
+  logger.info(
+    {
+      durationMs: performance.now() - now,
+      nbRows,
+      nbCols: header.length,
+      workspaceId: auth.getNonNullableWorkspace().id,
+    },
+    "Parsing CSV"
+  );
+
   return new Ok({ detectedHeaders: { header, rowIndex }, rows });
 }

