Skip to content

Commit

Permalink
chore: Update vendored sources to duckdb/duckdb@74c9f4d (#343)
Browse files Browse the repository at this point in the history
Merge pull request duckdb/duckdb#13712 from Mytherin/checkpointdeadlock
Merge pull request duckdb/duckdb#13710 from lnkuiper/tsan_suppress_jemalloc

Co-authored-by: krlmlr <krlmlr@users.noreply.github.com>
  • Loading branch information
github-actions[bot] and krlmlr authored Sep 9, 2024
1 parent de6a361 commit 36c26f7
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 9 deletions.
1 change: 0 additions & 1 deletion src/duckdb/src/function/table/table_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ static unique_ptr<LocalTableFunctionState> TableScanInitLocal(ExecutionContext &
}

unique_ptr<GlobalTableFunctionState> TableScanInitGlobal(ClientContext &context, TableFunctionInitInput &input) {

D_ASSERT(input.bind_data);
auto &bind_data = input.bind_data->Cast<TableScanBindData>();
auto result = make_uniq<TableScanGlobalState>(context, input.bind_data.get());
Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "1-dev5127"
#define DUCKDB_PATCH_VERSION "1-dev5133"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 0
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.0.1-dev5127"
#define DUCKDB_VERSION "v1.0.1-dev5133"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "e906114007"
#define DUCKDB_SOURCE_ID "74c9f4df1f"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ struct DataTableInfo {
const vector<IndexStorageInfo> &GetIndexStorageInfo() const {
return index_storage_infos;
}
unique_ptr<StorageLockKey> GetSharedLock() {
return checkpoint_lock.GetSharedLock();
}

string GetSchemaName();
string GetTableName();
Expand Down
13 changes: 11 additions & 2 deletions src/duckdb/src/include/duckdb/storage/table/scan_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,15 @@ struct TableScanOptions {
bool force_fetch_row = false;
};

class CheckpointLock {
public:
explicit CheckpointLock(unique_ptr<StorageLockKey> lock_p) : lock(std::move(lock_p)) {
}

private:
unique_ptr<StorageLockKey> lock;
};

class TableScanState {
public:
TableScanState();
Expand All @@ -218,7 +227,7 @@ class TableScanState {
//! Options for scanning
TableScanOptions options;
//! Shared lock over the checkpoint to prevent checkpoints while reading
unique_ptr<StorageLockKey> checkpoint_lock;
shared_ptr<CheckpointLock> checkpoint_lock;
//! Filter info
ScanFilterInfo filters;

Expand Down Expand Up @@ -253,7 +262,7 @@ struct ParallelTableScanState {
//! Parallel scan state for the transaction-local state
ParallelCollectionScanState local_state;
//! Shared lock over the checkpoint to prevent checkpoints while reading
unique_ptr<StorageLockKey> checkpoint_lock;
shared_ptr<CheckpointLock> checkpoint_lock;
};

struct PrefetchState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
#include "duckdb/common/reference_map.hpp"

namespace duckdb {
class CheckpointLock;
class RowGroupCollection;
class RowVersionManager;
class DuckTransactionManager;
class StorageLockKey;
class StorageCommitState;
struct DataTableInfo;
struct UndoBufferProperties;

class DuckTransaction : public Transaction {
Expand Down Expand Up @@ -79,6 +81,9 @@ class DuckTransaction : public Transaction {

void UpdateCollection(shared_ptr<RowGroupCollection> &collection);

//! Get a shared lock on a table
shared_ptr<CheckpointLock> SharedLockTable(DataTableInfo &info);

private:
DuckTransactionManager &transaction_manager;
//! The undo buffer is used to store old versions of rows that are updated
Expand All @@ -94,6 +99,10 @@ class DuckTransaction : public Transaction {
reference_map_t<SequenceCatalogEntry, reference<SequenceValue>> sequence_usage;
//! Collections that are updated by this transaction
reference_map_t<RowGroupCollection, shared_ptr<RowGroupCollection>> updated_collections;
//! Lock for the active_locks map
mutex active_locks_lock;
//! Active locks on tables
reference_map_t<DataTableInfo, weak_ptr<CheckpointLock>> active_locks;
};

} // namespace duckdb
12 changes: 9 additions & 3 deletions src/duckdb/src/storage/data_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,21 +225,26 @@ TableIOManager &TableIOManager::Get(DataTable &table) {
//===--------------------------------------------------------------------===//
void DataTable::InitializeScan(TableScanState &state, const vector<column_t> &column_ids,
TableFilterSet *table_filters) {
state.checkpoint_lock = info->checkpoint_lock.GetSharedLock();
if (!state.checkpoint_lock) {
state.checkpoint_lock = make_shared_ptr<CheckpointLock>(info->checkpoint_lock.GetSharedLock());
}
state.Initialize(column_ids, table_filters);
row_groups->InitializeScan(state.table_state, column_ids, table_filters);
}

void DataTable::InitializeScan(DuckTransaction &transaction, TableScanState &state, const vector<column_t> &column_ids,
TableFilterSet *table_filters) {
state.checkpoint_lock = transaction.SharedLockTable(*info);
auto &local_storage = LocalStorage::Get(transaction);
InitializeScan(state, column_ids, table_filters);
local_storage.InitializeScan(*this, state.local_state, table_filters);
}

void DataTable::InitializeScanWithOffset(TableScanState &state, const vector<column_t> &column_ids, idx_t start_row,
idx_t end_row) {
state.checkpoint_lock = info->checkpoint_lock.GetSharedLock();
if (!state.checkpoint_lock) {
state.checkpoint_lock = make_shared_ptr<CheckpointLock>(info->checkpoint_lock.GetSharedLock());
}
state.Initialize(column_ids);
row_groups->InitializeScanWithOffset(state.table_state, column_ids, start_row, end_row);
}
Expand All @@ -255,7 +260,8 @@ idx_t DataTable::MaxThreads(ClientContext &context) {

void DataTable::InitializeParallelScan(ClientContext &context, ParallelTableScanState &state) {
auto &local_storage = LocalStorage::Get(context, db);
state.checkpoint_lock = info->checkpoint_lock.GetSharedLock();
auto &transaction = DuckTransaction::Get(context, db);
state.checkpoint_lock = transaction.SharedLockTable(*info);
row_groups->InitializeParallelScan(state.scan_state);

local_storage.InitializeParallelScan(*this, state.local_state);
Expand Down
23 changes: 23 additions & 0 deletions src/duckdb/src/transaction/duck_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "duckdb/main/client_data.hpp"
#include "duckdb/main/attached_database.hpp"
#include "duckdb/storage/storage_lock.hpp"
#include "duckdb/storage/table/data_table_info.hpp"
#include "duckdb/storage/table/scan_state.hpp"

namespace duckdb {

Expand Down Expand Up @@ -268,4 +270,25 @@ unique_ptr<StorageLockKey> DuckTransaction::TryGetCheckpointLock() {
return transaction_manager.TryUpgradeCheckpointLock(*write_lock);
}

shared_ptr<CheckpointLock> DuckTransaction::SharedLockTable(DataTableInfo &info) {
lock_guard<mutex> l(active_locks_lock);
auto entry = active_locks.find(info);
if (entry != active_locks.end()) {
// found an existing lock
auto lock_weak_ptr = entry->second;
// check if it is expired
auto lock = lock_weak_ptr.lock();
if (lock) {
// not expired - return it
return lock;
}
}
// no existing lock - obtain it
auto table_lock = info.GetSharedLock();
auto checkpoint_lock = make_shared_ptr<CheckpointLock>(std::move(table_lock));
// insert it into the active locks and return it
active_locks.insert(make_pair(std::ref(info), checkpoint_lock));
return checkpoint_lock;
}

} // namespace duckdb

0 comments on commit 36c26f7

Please sign in to comment.