Skip to content

Commit

Permalink
fix(tables): col names must be case-insensitive + fix truncate merge …
Browse files Browse the repository at this point in the history
…schema (#3388)

Co-authored-by: Henry Fontanier <henry@dust.tt>
  • Loading branch information
fontanierh and Henry Fontanier authored Jan 23, 2024
1 parent 23b68e2 commit 31d7540
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 9 deletions.
38 changes: 31 additions & 7 deletions core/src/databases/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,39 @@ impl Table {
rows: &Vec<Row>,
truncate: bool,
) -> Result<()> {
// Validate the tables schema, merge it if necessary and store it in the schema cache.
let table_schema = match self.schema() {
// If there is no existing schema cache, simply use the new schema.
None => TableSchema::from_rows(&rows)?,
Some(existing_table_schema) => {
// If there is an existing schema cache, merge it with the new schema.
existing_table_schema.merge(&TableSchema::from_rows(&rows)?)?
// Validate that all rows keys are lowercase.
for (row_index, row) in rows.iter().enumerate() {
let object = match row.value().as_object() {
Some(object) => object,
None => Err(anyhow!("Row {} is not an object", row_index,))?,
};
match object.keys().find(|key| match key.chars().next() {
Some(c) => !c.is_ascii_lowercase(),
None => false,
}) {
Some(key) => Err(anyhow!(
"Row {} has a key '{}' that is not lowercase",
row_index,
key
))?,
None => (),
}
}

// Validate the tables schema, merge it if necessary and store it in the schema cache.
let table_schema = match truncate {
// If the new rows replace existing ones, we need to clear the schema cache.
true => TableSchema::from_rows(&rows)?,
false => match self.schema() {
// If there is no existing schema cache, simply use the new schema.
None => TableSchema::from_rows(&rows)?,
Some(existing_table_schema) => {
// If there is an existing schema cache, merge it with the new schema.
existing_table_schema.merge(&TableSchema::from_rows(&rows)?)?
}
},
};

store
.update_table_schema(
&self.project,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,31 @@ async function handler(
status_code: 400,
});
}
const { rows: rowsToUpsert, truncate } = bodyValidation.right;

const { truncate } = bodyValidation.right;
let { rows: rowsToUpsert } = bodyValidation.right;

// Make every key in the rows lowercase and ensure there are no duplicates.
const allKeys = rowsToUpsert.map((row) => Object.keys(row.value)).flat();
const keysSet = new Set<string>();
for (const key of allKeys) {
if (keysSet.has(key)) {
return apiError(req, res, {
status_code: 400,
api_error: {
type: "invalid_request_error",
message: `Duplicate key: ${key}`,
},
});
}
keysSet.add(key);
}
rowsToUpsert = rowsToUpsert.map((row) => {
const value: Record<string, string | number | boolean | null> = {};
for (const [key, val] of Object.entries(row.value)) {
value[key.toLowerCase()] = val;
}
return { row_id: row.row_id, value };
});
const upsertRes = await coreAPI.upsertTableRows({
projectId: dataSource.dustAPIProjectId,
dataSourceName: dataSource.name,
Expand Down
12 changes: 12 additions & 0 deletions front/pages/api/w/[wId]/data_sources/[name]/tables/csv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,18 @@ async function rowsFromCsv(
continue;
}

header = header.map((h) => h.trim().toLowerCase());
const headerSet = new Set<string>();
for (const h of header) {
if (headerSet.has(h)) {
return new Err({
type: "invalid_request_error",
message: `Duplicate header: ${h}.`,
});
}
headerSet.add(h);
}

for (const [i, h] of header.entries()) {
const col = record[i];
if (col === undefined) {
Expand Down

0 comments on commit 31d7540

Please sign in to comment.