diff --git a/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp b/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp index cfbdd117d3e9..70d3a20fd7e2 100644 --- a/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp +++ b/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp @@ -100,7 +100,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { Controller->SetCompactionControl(NYDBTest::EOptimizerCompactionWeightControl::Disable); Controller->SetExpectedShardsCount(ShardsCount); Controller->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1)); - Controller->SetOverrideReadTimeoutClean(TDuration::Seconds(1)); + Controller->SetOverrideMaxReadStaleness(TDuration::Seconds(1)); Tests::NCommon::TLoggerInit(Kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").Initialize(); @@ -117,7 +117,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { } void WaitNormalization() { - Controller->SetOverrideReadTimeoutClean(TDuration::Seconds(1)); + Controller->SetOverrideMaxReadStaleness(TDuration::Seconds(1)); Controller->SetCompactionControl(NYDBTest::EOptimizerCompactionWeightControl::Force); const auto start = TInstant::Now(); while (!Controller->IsTrivialLinks() && TInstant::Now() - start < TDuration::Seconds(30)) { @@ -126,11 +126,11 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { } AFL_VERIFY(Controller->IsTrivialLinks()); Controller->CheckInvariants(); - Controller->SetOverrideReadTimeoutClean(TDuration::Minutes(5)); + Controller->SetOverrideMaxReadStaleness(TDuration::Minutes(5)); } void Execute(const ui64 destinationIdx, const std::vector& sourceIdxs, const bool move, const NOlap::TSnapshot& snapshot, const std::set& pathIdxs) { - Controller->SetOverrideReadTimeoutClean(TDuration::Seconds(1)); + Controller->SetOverrideMaxReadStaleness(TDuration::Seconds(1)); AFL_VERIFY(destinationIdx < ShardIds.size()); const ui64 destination = ShardIds[destinationIdx]; std::vector sources; @@ -198,7 +198,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) { CSTransferStatus->Reset(); AFL_VERIFY(!Controller->IsTrivialLinks()); Controller->CheckInvariants(); - Controller->SetOverrideReadTimeoutClean(TDuration::Minutes(5)); + Controller->SetOverrideMaxReadStaleness(TDuration::Minutes(5)); } }; Y_UNIT_TEST(BlobsSharingSplit1_1) { diff --git a/ydb/core/kqp/ut/olap/write_ut.cpp b/ydb/core/kqp/ut/olap/write_ut.cpp index 0784cfa235af..c339990668bd 100644 --- a/ydb/core/kqp/ut/olap/write_ut.cpp +++ b/ydb/core/kqp/ut/olap/write_ut.cpp @@ -251,7 +251,7 @@ Y_UNIT_TEST_SUITE(KqpOlapWrite) { .ExtractValueSync(); UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); } - csController->SetOverrideReadTimeoutClean(TDuration::Zero()); + csController->SetOverrideMaxReadStaleness(TDuration::Zero()); csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::GC); { const TInstant start = TInstant::Now(); diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 0d5c5766c270..117efd69d2e7 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -200,11 +200,16 @@ void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorCon } void TColumnShard::Handle(TEvPrivate::TEvPingSnapshotsUsage::TPtr& /*ev*/, const TActorContext& ctx) { - if (auto writeTx = - InFlightReadsTracker.Ping(this, NYDBTest::TControllers::GetColumnShardController()->GetPingCheckPeriod(), TInstant::Now())) { + const TDuration stalenessLivetime = NYDBTest::TControllers::GetColumnShardController()->GetMaxReadStaleness(); + const TDuration stalenessInMem = NYDBTest::TControllers::GetColumnShardController()->GetMaxReadStalenessInMem(); + const TDuration usedLivetime = NYDBTest::TControllers::GetColumnShardController()->GetUsedSnapshotLivetime(); + AFL_VERIFY(usedLivetime < stalenessInMem || (stalenessInMem == usedLivetime && usedLivetime == TDuration::Zero()))("used", usedLivetime)( + "staleness", stalenessInMem); + const TDuration ping = 0.3 * std::min(stalenessInMem - usedLivetime, stalenessLivetime - stalenessInMem); + if (auto writeTx = InFlightReadsTracker.Ping(this, stalenessInMem, usedLivetime, TInstant::Now())) { Execute(writeTx.release(), ctx); } - ctx.Schedule(0.3 * GetMaxReadStaleness(), new TEvPrivate::TEvPingSnapshotsUsage()); + ctx.Schedule(NYDBTest::TControllers::GetColumnShardController()->GetStalenessLivetimePing(ping), new TEvPrivate::TEvPingSnapshotsUsage()); } void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 3821f0229993..5e1244cf1bf1 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -194,7 +194,7 @@ ui64 TColumnShard::GetOutdatedStep() const { } NOlap::TSnapshot TColumnShard::GetMinReadSnapshot() const { - ui64 delayMillisec = GetMaxReadStaleness().MilliSeconds(); + ui64 delayMillisec = NYDBTest::TControllers::GetColumnShardController()->GetMaxReadStaleness().MilliSeconds(); ui64 passedStep = GetOutdatedStep(); ui64 minReadStep = (passedStep > delayMillisec ? passedStep - delayMillisec : 0); @@ -1600,8 +1600,4 @@ const NKikimr::NColumnShard::NTiers::TManager* TColumnShard::GetTierManagerPoint return Tiers->GetManagerOptional(tierId); } -TDuration TColumnShard::GetMaxReadStaleness() { - return NYDBTest::TControllers::GetColumnShardController()->GetReadTimeoutClean(); -} - } // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 844a3e20b80c..1c3f6343ecfc 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -536,7 +536,6 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa TLimits Limits; NOlap::TNormalizationController NormalizerController; NDataShard::TSysLocks SysLocks; - static TDuration GetMaxReadStaleness(); void TryRegisterMediatorTimeCast(); void UnregisterMediatorTimeCast(); diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h index 28470aca8bda..a747c5a5837a 100644 --- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h +++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h @@ -61,6 +61,9 @@ class ICSController { }; protected: + virtual std::optional DoGetStalenessLivetimePing() const { + return {}; + } virtual void DoOnTabletInitCompleted(const ::NKikimr::NColumnShard::TColumnShard& /*shard*/) { return; } @@ -85,7 +88,7 @@ class ICSController { virtual void DoOnDataSharingStarted(const ui64 /*tabletId*/, const TString& /*sessionId*/) { } - virtual TDuration DoGetPingCheckPeriod(const TDuration defaultValue) const { + virtual TDuration DoGetUsedSnapshotLivetime(const TDuration defaultValue) const { return defaultValue; } virtual TDuration DoGetOverridenGCPeriod(const TDuration defaultValue) const { @@ -109,7 +112,7 @@ class ICSController { virtual ui64 DoGetSmallPortionSizeDetector(const ui64 defaultValue) const { return defaultValue; } - virtual TDuration DoGetReadTimeoutClean(const TDuration defaultValue) const { + virtual TDuration DoGetMaxReadStaleness(const TDuration defaultValue) const { return defaultValue; } virtual TDuration DoGetGuaranteeIndexationInterval(const TDuration defaultValue) const { @@ -158,9 +161,13 @@ class ICSController { } virtual bool CheckPortionForEvict(const NOlap::TPortionInfo& portion) const; - TDuration GetPingCheckPeriod() const { - const TDuration defaultValue = 0.6 * GetReadTimeoutClean(); - return DoGetPingCheckPeriod(defaultValue); + TDuration GetStalenessLivetimePing(const TDuration defValue) const { + const auto val = DoGetStalenessLivetimePing(); + if (!val || defValue < *val) { + return defValue; + } else { + return *val; + } } virtual bool IsBackgroundEnabled(const EBackground /*id*/) const { @@ -261,9 +268,16 @@ class ICSController { } virtual void OnIndexSelectProcessed(const std::optional /*result*/) { } - TDuration GetReadTimeoutClean() const { + TDuration GetMaxReadStaleness() const { const TDuration defaultValue = TDuration::MilliSeconds(GetConfig().GetMaxReadStaleness_ms()); - return DoGetReadTimeoutClean(defaultValue); + return DoGetMaxReadStaleness(defaultValue); + } + TDuration GetMaxReadStalenessInMem() const { + return 0.9 * GetMaxReadStaleness(); + } + TDuration GetUsedSnapshotLivetime() const { + const TDuration defaultValue = 0.6 * GetMaxReadStaleness(); + return DoGetUsedSnapshotLivetime(defaultValue); } virtual EOptimizerCompactionWeightControl GetCompactionControl() const { return EOptimizerCompactionWeightControl::Force; diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index d57470a0a93b..af375a612caf 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -12,7 +12,8 @@ namespace NKikimr::NYDBTest::NColumnShard { class TController: public TReadOnlyController { private: using TBase = TReadOnlyController; - YDB_ACCESSOR_DEF(std::optional, OverrideRequestsTracePingCheckPeriod); + YDB_ACCESSOR_DEF(std::optional, OverrideUsedSnapshotLivetime); + YDB_ACCESSOR_DEF(std::optional, OverrideStalenessLivetimePing); YDB_ACCESSOR_DEF(std::optional, OverrideLagForCompactionBeforeTierings); YDB_ACCESSOR(std::optional, OverrideGuaranteeIndexationInterval, TDuration::Zero()); YDB_ACCESSOR(std::optional, OverridePeriodicWakeupActivationPeriod, std::nullopt); @@ -21,7 +22,7 @@ class TController: public TReadOnlyController { YDB_ACCESSOR(std::optional, OverrideOptimizerFreshnessCheckDuration, TDuration::Zero()); YDB_ACCESSOR_DEF(std::optional, OverrideCompactionActualizationLag); YDB_ACCESSOR_DEF(std::optional, OverrideTasksActualizationLag); - YDB_ACCESSOR_DEF(std::optional, OverrideReadTimeoutClean); + YDB_ACCESSOR_DEF(std::optional, OverrideMaxReadStaleness); YDB_ACCESSOR(std::optional, OverrideMemoryLimitForPortionReading, 100); YDB_ACCESSOR_DEF(std::optional, OverrideBlobPutResultOnWriteValue); @@ -142,10 +143,12 @@ class TController: public TReadOnlyController { return OverrideLagForCompactionBeforeTierings.value_or(def); } - virtual TDuration DoGetPingCheckPeriod(const TDuration def) const override { - return OverrideRequestsTracePingCheckPeriod.value_or(def); + virtual TDuration DoGetUsedSnapshotLivetime(const TDuration def) const override { + return OverrideUsedSnapshotLivetime.value_or(def); + } + virtual std::optional DoGetStalenessLivetimePing() const override { + return OverrideStalenessLivetimePing; } - virtual TDuration DoGetCompactionActualizationLag(const TDuration def) const override { return OverrideCompactionActualizationLag.value_or(def); } @@ -180,8 +183,8 @@ class TController: public TReadOnlyController { virtual TDuration DoGetOptimizerFreshnessCheckDuration(const TDuration defaultValue) const override { return OverrideOptimizerFreshnessCheckDuration.value_or(defaultValue); } - virtual TDuration DoGetReadTimeoutClean(const TDuration def) const override { - return OverrideReadTimeoutClean.value_or(def); + virtual TDuration DoGetMaxReadStaleness(const TDuration def) const override { + return OverrideMaxReadStaleness.value_or(def); } virtual ui64 DoGetReduceMemoryIntervalLimit(const ui64 def) const override { return OverrideReduceMemoryIntervalLimit.value_or(def); diff --git a/ydb/core/tx/columnshard/inflight_request_tracker.cpp b/ydb/core/tx/columnshard/inflight_request_tracker.cpp index 6b7830b26cb0..ea5a7bc68e17 100644 --- a/ydb/core/tx/columnshard/inflight_request_tracker.cpp +++ b/ydb/core/tx/columnshard/inflight_request_tracker.cpp @@ -19,9 +19,7 @@ NOlap::NReader::TReadMetadataBase::TConstPtr TInFlightReadsTracker::ExtractInFli { auto it = SnapshotsLive.find(readMetaBase->GetRequestSnapshot()); AFL_VERIFY(it != SnapshotsLive.end()); - if (it->second.DelRequest(cookie, now)) { - SnapshotsLive.erase(it); - } + Y_UNUSED(it->second.DelRequest(cookie, now)); } if (NOlap::NReader::NPlain::TReadMetadata::TConstPtr readMeta = @@ -93,27 +91,29 @@ class TTransactionSavePersistentSnapshots: public NOlap::NDataSharing::TExtended } // namespace std::unique_ptr TInFlightReadsTracker::Ping( - TColumnShard* self, const TDuration critDuration, const TInstant now) { + TColumnShard* self, const TDuration stalenessInMem, const TDuration usedSnapshotLivetime, const TInstant now) { std::set snapshotsToSave; - std::set snapshotsToFree; + std::set snapshotsToFreeInDB; + std::set snapshotsToFreeInMem; for (auto&& i : SnapshotsLive) { - if (i.second.Ping(critDuration, now)) { + if (i.second.IsExpired(usedSnapshotLivetime, now)) { if (i.second.GetIsLock()) { - Counters->OnSnapshotLocked(); - snapshotsToSave.emplace(i.first); - } else { Counters->OnSnapshotUnlocked(); - snapshotsToFree.emplace(i.first); + snapshotsToFreeInDB.emplace(i.first); } + snapshotsToFreeInMem.emplace(i.first); + } else if (i.second.CheckToLock(stalenessInMem, usedSnapshotLivetime, now)) { + Counters->OnSnapshotLocked(); + snapshotsToSave.emplace(i.first); } } - for (auto&& i : snapshotsToFree) { + for (auto&& i : snapshotsToFreeInMem) { SnapshotsLive.erase(i); } Counters->OnSnapshotsInfo(SnapshotsLive.size(), GetSnapshotToClean()); - if (snapshotsToFree.size() || snapshotsToSave.size()) { - NYDBTest::TControllers::GetColumnShardController()->OnRequestTracingChanges(snapshotsToSave, snapshotsToFree); - return std::make_unique(self, std::move(snapshotsToSave), std::move(snapshotsToFree)); + if (snapshotsToFreeInDB.size() || snapshotsToSave.size()) { + NYDBTest::TControllers::GetColumnShardController()->OnRequestTracingChanges(snapshotsToSave, snapshotsToFreeInMem); + return std::make_unique(self, std::move(snapshotsToSave), std::move(snapshotsToFreeInDB)); } else { return nullptr; } diff --git a/ydb/core/tx/columnshard/inflight_request_tracker.h b/ydb/core/tx/columnshard/inflight_request_tracker.h index 0aeec5acddbe..51de0a26795f 100644 --- a/ydb/core/tx/columnshard/inflight_request_tracker.h +++ b/ydb/core/tx/columnshard/inflight_request_tracker.h @@ -17,7 +17,6 @@ using NOlap::IBlobInUseTracker; class TSnapshotLiveInfo { private: const NOlap::TSnapshot Snapshot; - std::optional LastPingInstant; std::optional LastRequestFinishedInstant; THashSet Requests; YDB_READONLY(bool, IsLock, false); @@ -48,22 +47,32 @@ class TSnapshotLiveInfo { static TSnapshotLiveInfo BuildFromDatabase(const NOlap::TSnapshot& reqSnapshot) { TSnapshotLiveInfo result(reqSnapshot); - result.LastPingInstant = TInstant::Now(); - result.LastRequestFinishedInstant = result.LastPingInstant; + result.LastRequestFinishedInstant = TInstant::Now(); result.IsLock = true; return result; } - bool Ping(const TDuration critDuration, const TInstant now) { - LastPingInstant = now; - if (Requests.empty()) { - AFL_VERIFY(LastRequestFinishedInstant); - if (critDuration < *LastPingInstant - *LastRequestFinishedInstant && IsLock) { - IsLock = false; + bool IsExpired(const TDuration critDuration, const TInstant now) const { + if (Requests.size()) { + return false; + } + AFL_VERIFY(LastRequestFinishedInstant); + return critDuration < now - *LastRequestFinishedInstant; + } + + bool CheckToLock(const TDuration snapshotLivetime, const TDuration usedSnapshotGuaranteeLivetime, const TInstant now) { + if (IsLock) { + return false; + } + + if (Requests.size()) { + if (now + usedSnapshotGuaranteeLivetime > Snapshot.GetPlanInstant() + snapshotLivetime) { + IsLock = true; return true; } } else { - if (critDuration < *LastPingInstant - Snapshot.GetPlanInstant() && !IsLock) { + AFL_VERIFY(LastRequestFinishedInstant); + if (*LastRequestFinishedInstant + usedSnapshotGuaranteeLivetime > Snapshot.GetPlanInstant() + snapshotLivetime) { IsLock = true; return true; } @@ -88,7 +97,8 @@ class TInFlightReadsTracker { bool LoadFromDatabase(NTable::TDatabase& db); - [[nodiscard]] std::unique_ptr Ping(TColumnShard* self, const TDuration critDuration, const TInstant now); + [[nodiscard]] std::unique_ptr Ping( + TColumnShard* self, const TDuration stalenessInMem, const TDuration usedSnapshotLivetime, const TInstant now); // Returns a unique cookie associated with this request [[nodiscard]] ui64 AddInFlightRequest( diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 25e1de8038e7..262fd7be633d 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -552,7 +552,7 @@ void TestWriteReadDup(const TestTableDescription& table = {}) { void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString codec = "") { auto csControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard(); csControllerGuard->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction); - csControllerGuard->SetOverrideReadTimeoutClean(TDuration::Max()); + csControllerGuard->SetOverrideMaxReadStaleness(TDuration::Max()); TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -2654,7 +2654,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { PlanCommit(runtime, sender, planStep, txId); } UNIT_ASSERT_EQUAL(cleanupsHappened, 0); - csDefaultControllerGuard->SetOverrideRequestsTracePingCheckPeriod(TDuration::Zero()); + csDefaultControllerGuard->SetOverrideStalenessLivetimePing(TDuration::Zero()); + csDefaultControllerGuard->SetOverrideUsedSnapshotLivetime(TDuration::Zero()); { auto read = std::make_unique(); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());