Skip to content

Commit

Permalink
Snapshot livetime control (#12301)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 5, 2024
1 parent 7d9ab3e commit ffa5093
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 56 deletions.
10 changes: 5 additions & 5 deletions ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
Controller->SetCompactionControl(NYDBTest::EOptimizerCompactionWeightControl::Disable);
Controller->SetExpectedShardsCount(ShardsCount);
Controller->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
Controller->SetOverrideReadTimeoutClean(TDuration::Seconds(1));
Controller->SetOverrideMaxReadStaleness(TDuration::Seconds(1));

Tests::NCommon::TLoggerInit(Kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").Initialize();

Expand All @@ -117,7 +117,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
}

void WaitNormalization() {
Controller->SetOverrideReadTimeoutClean(TDuration::Seconds(1));
Controller->SetOverrideMaxReadStaleness(TDuration::Seconds(1));
Controller->SetCompactionControl(NYDBTest::EOptimizerCompactionWeightControl::Force);
const auto start = TInstant::Now();
while (!Controller->IsTrivialLinks() && TInstant::Now() - start < TDuration::Seconds(30)) {
Expand All @@ -126,11 +126,11 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
}
AFL_VERIFY(Controller->IsTrivialLinks());
Controller->CheckInvariants();
Controller->SetOverrideReadTimeoutClean(TDuration::Minutes(5));
Controller->SetOverrideMaxReadStaleness(TDuration::Minutes(5));
}

void Execute(const ui64 destinationIdx, const std::vector<ui64>& sourceIdxs, const bool move, const NOlap::TSnapshot& snapshot, const std::set<ui64>& pathIdxs) {
Controller->SetOverrideReadTimeoutClean(TDuration::Seconds(1));
Controller->SetOverrideMaxReadStaleness(TDuration::Seconds(1));
AFL_VERIFY(destinationIdx < ShardIds.size());
const ui64 destination = ShardIds[destinationIdx];
std::vector<ui64> sources;
Expand Down Expand Up @@ -198,7 +198,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
CSTransferStatus->Reset();
AFL_VERIFY(!Controller->IsTrivialLinks());
Controller->CheckInvariants();
Controller->SetOverrideReadTimeoutClean(TDuration::Minutes(5));
Controller->SetOverrideMaxReadStaleness(TDuration::Minutes(5));
}
};
Y_UNIT_TEST(BlobsSharingSplit1_1) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/olap/write_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ Y_UNIT_TEST_SUITE(KqpOlapWrite) {
.ExtractValueSync();
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
}
csController->SetOverrideReadTimeoutClean(TDuration::Zero());
csController->SetOverrideMaxReadStaleness(TDuration::Zero());
csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::GC);
{
const TInstant start = TInstant::Now();
Expand Down
11 changes: 8 additions & 3 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,16 @@ void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorCon
}

void TColumnShard::Handle(TEvPrivate::TEvPingSnapshotsUsage::TPtr& /*ev*/, const TActorContext& ctx) {
if (auto writeTx =
InFlightReadsTracker.Ping(this, NYDBTest::TControllers::GetColumnShardController()->GetPingCheckPeriod(), TInstant::Now())) {
const TDuration stalenessLivetime = NYDBTest::TControllers::GetColumnShardController()->GetMaxReadStaleness();
const TDuration stalenessInMem = NYDBTest::TControllers::GetColumnShardController()->GetMaxReadStalenessInMem();
const TDuration usedLivetime = NYDBTest::TControllers::GetColumnShardController()->GetUsedSnapshotLivetime();
AFL_VERIFY(usedLivetime < stalenessInMem || (stalenessInMem == usedLivetime && usedLivetime == TDuration::Zero()))("used", usedLivetime)(
"staleness", stalenessInMem);
const TDuration ping = 0.3 * std::min(stalenessInMem - usedLivetime, stalenessLivetime - stalenessInMem);
if (auto writeTx = InFlightReadsTracker.Ping(this, stalenessInMem, usedLivetime, TInstant::Now())) {
Execute(writeTx.release(), ctx);
}
ctx.Schedule(0.3 * GetMaxReadStaleness(), new TEvPrivate::TEvPingSnapshotsUsage());
ctx.Schedule(NYDBTest::TControllers::GetColumnShardController()->GetStalenessLivetimePing(ping), new TEvPrivate::TEvPingSnapshotsUsage());
}

void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx) {
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ ui64 TColumnShard::GetOutdatedStep() const {
}

NOlap::TSnapshot TColumnShard::GetMinReadSnapshot() const {
ui64 delayMillisec = GetMaxReadStaleness().MilliSeconds();
ui64 delayMillisec = NYDBTest::TControllers::GetColumnShardController()->GetMaxReadStaleness().MilliSeconds();
ui64 passedStep = GetOutdatedStep();
ui64 minReadStep = (passedStep > delayMillisec ? passedStep - delayMillisec : 0);

Expand Down Expand Up @@ -1600,8 +1600,4 @@ const NKikimr::NColumnShard::NTiers::TManager* TColumnShard::GetTierManagerPoint
return Tiers->GetManagerOptional(tierId);
}

TDuration TColumnShard::GetMaxReadStaleness() {
return NYDBTest::TControllers::GetColumnShardController()->GetReadTimeoutClean();
}

} // namespace NKikimr::NColumnShard
1 change: 0 additions & 1 deletion ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,6 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
TLimits Limits;
NOlap::TNormalizationController NormalizerController;
NDataShard::TSysLocks SysLocks;
static TDuration GetMaxReadStaleness();

void TryRegisterMediatorTimeCast();
void UnregisterMediatorTimeCast();
Expand Down
28 changes: 21 additions & 7 deletions ydb/core/tx/columnshard/hooks/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class ICSController {
};

protected:
virtual std::optional<TDuration> DoGetStalenessLivetimePing() const {
return {};
}
virtual void DoOnTabletInitCompleted(const ::NKikimr::NColumnShard::TColumnShard& /*shard*/) {
return;
}
Expand All @@ -85,7 +88,7 @@ class ICSController {
virtual void DoOnDataSharingStarted(const ui64 /*tabletId*/, const TString& /*sessionId*/) {
}

virtual TDuration DoGetPingCheckPeriod(const TDuration defaultValue) const {
virtual TDuration DoGetUsedSnapshotLivetime(const TDuration defaultValue) const {
return defaultValue;
}
virtual TDuration DoGetOverridenGCPeriod(const TDuration defaultValue) const {
Expand All @@ -109,7 +112,7 @@ class ICSController {
virtual ui64 DoGetSmallPortionSizeDetector(const ui64 defaultValue) const {
return defaultValue;
}
virtual TDuration DoGetReadTimeoutClean(const TDuration defaultValue) const {
virtual TDuration DoGetMaxReadStaleness(const TDuration defaultValue) const {
return defaultValue;
}
virtual TDuration DoGetGuaranteeIndexationInterval(const TDuration defaultValue) const {
Expand Down Expand Up @@ -158,9 +161,13 @@ class ICSController {
}
virtual bool CheckPortionForEvict(const NOlap::TPortionInfo& portion) const;

TDuration GetPingCheckPeriod() const {
const TDuration defaultValue = 0.6 * GetReadTimeoutClean();
return DoGetPingCheckPeriod(defaultValue);
TDuration GetStalenessLivetimePing(const TDuration defValue) const {
const auto val = DoGetStalenessLivetimePing();
if (!val || defValue < *val) {
return defValue;
} else {
return *val;
}
}

virtual bool IsBackgroundEnabled(const EBackground /*id*/) const {
Expand Down Expand Up @@ -261,9 +268,16 @@ class ICSController {
}
virtual void OnIndexSelectProcessed(const std::optional<bool> /*result*/) {
}
TDuration GetReadTimeoutClean() const {
TDuration GetMaxReadStaleness() const {
const TDuration defaultValue = TDuration::MilliSeconds(GetConfig().GetMaxReadStaleness_ms());
return DoGetReadTimeoutClean(defaultValue);
return DoGetMaxReadStaleness(defaultValue);
}
TDuration GetMaxReadStalenessInMem() const {
return 0.9 * GetMaxReadStaleness();
}
TDuration GetUsedSnapshotLivetime() const {
const TDuration defaultValue = 0.6 * GetMaxReadStaleness();
return DoGetUsedSnapshotLivetime(defaultValue);
}
virtual EOptimizerCompactionWeightControl GetCompactionControl() const {
return EOptimizerCompactionWeightControl::Force;
Expand Down
17 changes: 10 additions & 7 deletions ydb/core/tx/columnshard/hooks/testing/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ namespace NKikimr::NYDBTest::NColumnShard {
class TController: public TReadOnlyController {
private:
using TBase = TReadOnlyController;
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideRequestsTracePingCheckPeriod);
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideUsedSnapshotLivetime);
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideStalenessLivetimePing);
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideLagForCompactionBeforeTierings);
YDB_ACCESSOR(std::optional<TDuration>, OverrideGuaranteeIndexationInterval, TDuration::Zero());
YDB_ACCESSOR(std::optional<TDuration>, OverridePeriodicWakeupActivationPeriod, std::nullopt);
Expand All @@ -21,7 +22,7 @@ class TController: public TReadOnlyController {
YDB_ACCESSOR(std::optional<TDuration>, OverrideOptimizerFreshnessCheckDuration, TDuration::Zero());
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideCompactionActualizationLag);
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideTasksActualizationLag);
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideReadTimeoutClean);
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideMaxReadStaleness);
YDB_ACCESSOR(std::optional<ui64>, OverrideMemoryLimitForPortionReading, 100);
YDB_ACCESSOR_DEF(std::optional<NKikimrProto::EReplyStatus>, OverrideBlobPutResultOnWriteValue);

Expand Down Expand Up @@ -142,10 +143,12 @@ class TController: public TReadOnlyController {
return OverrideLagForCompactionBeforeTierings.value_or(def);
}

virtual TDuration DoGetPingCheckPeriod(const TDuration def) const override {
return OverrideRequestsTracePingCheckPeriod.value_or(def);
virtual TDuration DoGetUsedSnapshotLivetime(const TDuration def) const override {
return OverrideUsedSnapshotLivetime.value_or(def);
}
virtual std::optional<TDuration> DoGetStalenessLivetimePing() const override {
return OverrideStalenessLivetimePing;
}

virtual TDuration DoGetCompactionActualizationLag(const TDuration def) const override {
return OverrideCompactionActualizationLag.value_or(def);
}
Expand Down Expand Up @@ -180,8 +183,8 @@ class TController: public TReadOnlyController {
virtual TDuration DoGetOptimizerFreshnessCheckDuration(const TDuration defaultValue) const override {
return OverrideOptimizerFreshnessCheckDuration.value_or(defaultValue);
}
virtual TDuration DoGetReadTimeoutClean(const TDuration def) const override {
return OverrideReadTimeoutClean.value_or(def);
virtual TDuration DoGetMaxReadStaleness(const TDuration def) const override {
return OverrideMaxReadStaleness.value_or(def);
}
virtual ui64 DoGetReduceMemoryIntervalLimit(const ui64 def) const override {
return OverrideReduceMemoryIntervalLimit.value_or(def);
Expand Down
28 changes: 14 additions & 14 deletions ydb/core/tx/columnshard/inflight_request_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ NOlap::NReader::TReadMetadataBase::TConstPtr TInFlightReadsTracker::ExtractInFli
{
auto it = SnapshotsLive.find(readMetaBase->GetRequestSnapshot());
AFL_VERIFY(it != SnapshotsLive.end());
if (it->second.DelRequest(cookie, now)) {
SnapshotsLive.erase(it);
}
Y_UNUSED(it->second.DelRequest(cookie, now));
}

if (NOlap::NReader::NPlain::TReadMetadata::TConstPtr readMeta =
Expand Down Expand Up @@ -93,27 +91,29 @@ class TTransactionSavePersistentSnapshots: public NOlap::NDataSharing::TExtended
} // namespace

std::unique_ptr<NTabletFlatExecutor::ITransaction> TInFlightReadsTracker::Ping(
TColumnShard* self, const TDuration critDuration, const TInstant now) {
TColumnShard* self, const TDuration stalenessInMem, const TDuration usedSnapshotLivetime, const TInstant now) {
std::set<NOlap::TSnapshot> snapshotsToSave;
std::set<NOlap::TSnapshot> snapshotsToFree;
std::set<NOlap::TSnapshot> snapshotsToFreeInDB;
std::set<NOlap::TSnapshot> snapshotsToFreeInMem;
for (auto&& i : SnapshotsLive) {
if (i.second.Ping(critDuration, now)) {
if (i.second.IsExpired(usedSnapshotLivetime, now)) {
if (i.second.GetIsLock()) {
Counters->OnSnapshotLocked();
snapshotsToSave.emplace(i.first);
} else {
Counters->OnSnapshotUnlocked();
snapshotsToFree.emplace(i.first);
snapshotsToFreeInDB.emplace(i.first);
}
snapshotsToFreeInMem.emplace(i.first);
} else if (i.second.CheckToLock(stalenessInMem, usedSnapshotLivetime, now)) {
Counters->OnSnapshotLocked();
snapshotsToSave.emplace(i.first);
}
}
for (auto&& i : snapshotsToFree) {
for (auto&& i : snapshotsToFreeInMem) {
SnapshotsLive.erase(i);
}
Counters->OnSnapshotsInfo(SnapshotsLive.size(), GetSnapshotToClean());
if (snapshotsToFree.size() || snapshotsToSave.size()) {
NYDBTest::TControllers::GetColumnShardController()->OnRequestTracingChanges(snapshotsToSave, snapshotsToFree);
return std::make_unique<TTransactionSavePersistentSnapshots>(self, std::move(snapshotsToSave), std::move(snapshotsToFree));
if (snapshotsToFreeInDB.size() || snapshotsToSave.size()) {
NYDBTest::TControllers::GetColumnShardController()->OnRequestTracingChanges(snapshotsToSave, snapshotsToFreeInMem);
return std::make_unique<TTransactionSavePersistentSnapshots>(self, std::move(snapshotsToSave), std::move(snapshotsToFreeInDB));
} else {
return nullptr;
}
Expand Down
32 changes: 21 additions & 11 deletions ydb/core/tx/columnshard/inflight_request_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ using NOlap::IBlobInUseTracker;
class TSnapshotLiveInfo {
private:
const NOlap::TSnapshot Snapshot;
std::optional<TInstant> LastPingInstant;
std::optional<TInstant> LastRequestFinishedInstant;
THashSet<ui32> Requests;
YDB_READONLY(bool, IsLock, false);
Expand Down Expand Up @@ -48,22 +47,32 @@ class TSnapshotLiveInfo {

static TSnapshotLiveInfo BuildFromDatabase(const NOlap::TSnapshot& reqSnapshot) {
TSnapshotLiveInfo result(reqSnapshot);
result.LastPingInstant = TInstant::Now();
result.LastRequestFinishedInstant = result.LastPingInstant;
result.LastRequestFinishedInstant = TInstant::Now();
result.IsLock = true;
return result;
}

bool Ping(const TDuration critDuration, const TInstant now) {
LastPingInstant = now;
if (Requests.empty()) {
AFL_VERIFY(LastRequestFinishedInstant);
if (critDuration < *LastPingInstant - *LastRequestFinishedInstant && IsLock) {
IsLock = false;
bool IsExpired(const TDuration critDuration, const TInstant now) const {
if (Requests.size()) {
return false;
}
AFL_VERIFY(LastRequestFinishedInstant);
return critDuration < now - *LastRequestFinishedInstant;
}

bool CheckToLock(const TDuration snapshotLivetime, const TDuration usedSnapshotGuaranteeLivetime, const TInstant now) {
if (IsLock) {
return false;
}

if (Requests.size()) {
if (now + usedSnapshotGuaranteeLivetime > Snapshot.GetPlanInstant() + snapshotLivetime) {
IsLock = true;
return true;
}
} else {
if (critDuration < *LastPingInstant - Snapshot.GetPlanInstant() && !IsLock) {
AFL_VERIFY(LastRequestFinishedInstant);
if (*LastRequestFinishedInstant + usedSnapshotGuaranteeLivetime > Snapshot.GetPlanInstant() + snapshotLivetime) {
IsLock = true;
return true;
}
Expand All @@ -88,7 +97,8 @@ class TInFlightReadsTracker {

bool LoadFromDatabase(NTable::TDatabase& db);

[[nodiscard]] std::unique_ptr<NTabletFlatExecutor::ITransaction> Ping(TColumnShard* self, const TDuration critDuration, const TInstant now);
[[nodiscard]] std::unique_ptr<NTabletFlatExecutor::ITransaction> Ping(
TColumnShard* self, const TDuration stalenessInMem, const TDuration usedSnapshotLivetime, const TInstant now);

// Returns a unique cookie associated with this request
[[nodiscard]] ui64 AddInFlightRequest(
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ void TestWriteReadDup(const TestTableDescription& table = {}) {
void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString codec = "") {
auto csControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
csControllerGuard->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction);
csControllerGuard->SetOverrideReadTimeoutClean(TDuration::Max());
csControllerGuard->SetOverrideMaxReadStaleness(TDuration::Max());
TTestBasicRuntime runtime;
TTester::Setup(runtime);

Expand Down Expand Up @@ -2654,7 +2654,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
PlanCommit(runtime, sender, planStep, txId);
}
UNIT_ASSERT_EQUAL(cleanupsHappened, 0);
csDefaultControllerGuard->SetOverrideRequestsTracePingCheckPeriod(TDuration::Zero());
csDefaultControllerGuard->SetOverrideStalenessLivetimePing(TDuration::Zero());
csDefaultControllerGuard->SetOverrideUsedSnapshotLivetime(TDuration::Zero());
{
auto read = std::make_unique<NColumnShard::TEvPrivate::TEvPingSnapshotsUsage>();
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
Expand Down

0 comments on commit ffa5093

Please sign in to comment.