Skip to content

Commit

Permalink
Fixed an issue where secondary index cursor lookups could fail if the…
Browse files Browse the repository at this point in the history
… contents (the ID) ended up split across a page boundary
  • Loading branch information
dimitribouniol committed Aug 23, 2024
1 parent 44162be commit 01fb4cc
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,14 @@ extension DiskPersistence.Datastore.Index {
/// - Parameters:
/// - proposedEntry: The entry to use in comparison with other persisted entries.
/// - pages: A collection of pages to check against.
/// - requiresCompleteEntries: Set to `true` if the comparator requires a complete entry to operate with.
/// - pageBuilder: A closure that provides a cached Page object for the loaded page.
/// - comparator: A comparator to determine order and equality between the proposed entry and a persisted one.
/// - Returns: The index within the pages collection where the entry would reside.
func pageIndex<T>(
for proposedEntry: T,
in pages: [DatastoreIndexManifest.PageInfo],
requiresCompleteEntries: Bool,
pageBuilder: @Sendable (_ pageID: DatastorePageIdentifier) async -> DiskPersistence.Datastore.Page,
comparator: @Sendable (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder
) async throws -> Int? {
Expand Down Expand Up @@ -348,11 +350,13 @@ extension DiskPersistence.Datastore.Index {

/// If we have some bytes, attempt to decode them into an entry.
if let bytesForFirstEntry {
firstEntryOfPage = try? DatastorePageEntry(bytes: bytesForFirstEntry, isPartial: false)
firstEntryOfPage = try? DatastorePageEntry(bytes: bytesForFirstEntry, isPartial: true)
}

/// If we have an entry, stop scanning as we can go ahead and operate on it.
if firstEntryOfPage != nil { break pageIterator }
/// If we have an entry, stop scanning as we can go ahead and operate on it. Also make sure that we have a complete entry if one is required by rejecting partial entries when the flag is set.
if let firstEntryOfPage, !(requiresCompleteEntries && firstEntryOfPage.isPartial) {
break pageIterator
}
}
}

Expand Down Expand Up @@ -388,6 +392,7 @@ extension DiskPersistence.Datastore.Index {

func entry<T>(
for proposedEntry: T,
requiresCompleteEntries: Bool,
comparator: @Sendable (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder
) async throws -> (
cursor: DiskPersistence.InstanceCursor,
Expand All @@ -396,6 +401,7 @@ extension DiskPersistence.Datastore.Index {
try await entry(
for: proposedEntry,
in: try await manifest.orderedPages,
requiresCompleteEntries: requiresCompleteEntries,
pageBuilder: { await datastore.page(for: .init(index: self.id, page: $0)) },
comparator: comparator
)
Expand All @@ -404,6 +410,7 @@ extension DiskPersistence.Datastore.Index {
func entry<T>(
for proposedEntry: T,
in pages: [DatastoreIndexManifest.PageInfo],
requiresCompleteEntries: Bool,
pageBuilder: @Sendable (_ pageID: DatastorePageIdentifier) async -> DiskPersistence.Datastore.Page,
comparator: @Sendable (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder
) async throws -> (
Expand All @@ -415,6 +422,7 @@ extension DiskPersistence.Datastore.Index {
let startingPageIndex = try await pageIndex(
for: proposedEntry,
in: pages,
requiresCompleteEntries: requiresCompleteEntries,
pageBuilder: pageBuilder,
comparator: comparator
)
Expand Down Expand Up @@ -549,11 +557,13 @@ extension DiskPersistence.Datastore.Index {

func insertionCursor<T>(
for proposedEntry: T,
requiresCompleteEntries: Bool,
comparator: @Sendable (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder
) async throws -> DiskPersistence.InsertionCursor {
try await insertionCursor(
for: proposedEntry,
in: try await manifest.orderedPages,
requiresCompleteEntries: requiresCompleteEntries,
pageBuilder: { await datastore.page(for: .init(index: self.id, page: $0)) },
comparator: comparator
)
Expand All @@ -562,6 +572,7 @@ extension DiskPersistence.Datastore.Index {
func insertionCursor<T>(
for proposedEntry: T,
in pages: [DatastoreIndexManifest.PageInfo],
requiresCompleteEntries: Bool,
pageBuilder: @Sendable (_ pageID: DatastorePageIdentifier) async -> DiskPersistence.Datastore.Page,
comparator: @Sendable (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder
) async throws -> DiskPersistence.InsertionCursor {
Expand All @@ -570,6 +581,7 @@ extension DiskPersistence.Datastore.Index {
let startingPageIndex = try await pageIndex(
for: proposedEntry,
in: pages,
requiresCompleteEntries: requiresCompleteEntries,
pageBuilder: pageBuilder,
comparator: comparator
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import Bytes
/// A single entry decoded from (or destined for) a datastore page, made up of a list of
/// header fields followed by a content payload.
struct DatastorePageEntry: Hashable {
/// The header fields of the entry, in order.
/// NOTE(review): callers in this change validate `headers.count` (2 or 3) and compare against
/// `headers[0]`, so the leading header presumably carries the sort key — confirm per index type.
var headers: [Bytes]
/// The content payload of the entry. May be incomplete when `isPartial` is `true`.
var content: Bytes

/// Whether the entry contains a complete header, but a partial content.
/// Defaults to `false`. Page searches can reject partial entries when the comparator
/// needs the complete content to operate.
var isPartial: Bool = false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,11 @@ extension DiskPersistence.Transaction {

let index = try await rootObject.primaryIndex

let (cursor, entry) = try await index.entry(for: identifier, comparator: primaryIndexComparator)
let (cursor, entry) = try await index.entry(
for: identifier,
requiresCompleteEntries: false,
comparator: primaryIndexComparator
)
guard entry.headers.count == 2
else { throw DiskPersistenceError.invalidEntryFormat }

Expand All @@ -551,7 +555,11 @@ extension DiskPersistence.Transaction {

let index = try await rootObject.primaryIndex

return try await index.insertionCursor(for: identifier, comparator: primaryIndexComparator)
return try await index.insertionCursor(
for: identifier,
requiresCompleteEntries: false,
comparator: primaryIndexComparator
)
}

func directIndexCursor<IndexType: Indexable, IdentifierType: Indexable>(
Expand All @@ -572,7 +580,11 @@ extension DiskPersistence.Transaction {
guard let index = try await rootObject.directIndexes[indexName]
else { throw DatastoreInterfaceError.indexNotFound }

let (cursor, entry) = try await index.entry(for: (indexValue, identifier), comparator: directIndexComparator)
let (cursor, entry) = try await index.entry(
for: (indexValue, identifier),
requiresCompleteEntries: false,
comparator: directIndexComparator
)
guard entry.headers.count == 3
else { throw DiskPersistenceError.invalidEntryFormat }

Expand All @@ -597,7 +609,11 @@ extension DiskPersistence.Transaction {
guard let index = try await rootObject.directIndexes[indexName]
else { throw DatastoreInterfaceError.indexNotFound }

return try await index.insertionCursor(for: (indexValue, identifier), comparator: directIndexComparator)
return try await index.insertionCursor(
for: (indexValue, identifier),
requiresCompleteEntries: false,
comparator: directIndexComparator
)
}

func secondaryIndexCursor<IndexType: Indexable, IdentifierType: Indexable>(
Expand All @@ -614,7 +630,11 @@ extension DiskPersistence.Transaction {
guard let index = try await rootObject.secondaryIndexes[indexName]
else { throw DatastoreInterfaceError.indexNotFound }

let (cursor, _) = try await index.entry(for: (indexValue, identifier), comparator: secondaryIndexComparator)
let (cursor, _) = try await index.entry(
for: (indexValue, identifier),
requiresCompleteEntries: true,
comparator: secondaryIndexComparator
)

return cursor
}
Expand All @@ -633,7 +653,11 @@ extension DiskPersistence.Transaction {
guard let index = try await rootObject.secondaryIndexes[indexName]
else { throw DatastoreInterfaceError.indexNotFound }

return try await index.insertionCursor(for: (indexValue, identifier), comparator: secondaryIndexComparator)
return try await index.insertionCursor(
for: (indexValue, identifier),
requiresCompleteEntries: true,
comparator: secondaryIndexComparator
)
}
}

Expand Down Expand Up @@ -698,6 +722,7 @@ extension DiskPersistence.Transaction {
} else {
try await index.insertionCursor(
for: (range.lowerBoundExpression, .ascending),
requiresCompleteEntries: false,
comparator: primaryIndexBoundComparator
)
}
Expand All @@ -723,6 +748,7 @@ extension DiskPersistence.Transaction {
} else {
try await index.insertionCursor(
for: (range.upperBoundExpression, .descending),
requiresCompleteEntries: false,
comparator: primaryIndexBoundComparator
)
}
Expand Down Expand Up @@ -766,6 +792,7 @@ extension DiskPersistence.Transaction {
} else {
try await index.insertionCursor(
for: (range.lowerBoundExpression, .ascending),
requiresCompleteEntries: false,
comparator: directIndexBoundComparator
)
}
Expand All @@ -791,6 +818,7 @@ extension DiskPersistence.Transaction {
} else {
try await index.insertionCursor(
for: (range.upperBoundExpression, .descending),
requiresCompleteEntries: false,
comparator: directIndexBoundComparator
)
}
Expand Down Expand Up @@ -834,6 +862,7 @@ extension DiskPersistence.Transaction {
} else {
try await index.insertionCursor(
for: (range.lowerBoundExpression, .ascending),
requiresCompleteEntries: false,
comparator: secondaryIndexBoundComparator
)
}
Expand All @@ -857,6 +886,7 @@ extension DiskPersistence.Transaction {
} else {
try await index.insertionCursor(
for: (range.upperBoundExpression, .descending),
requiresCompleteEntries: false,
comparator: secondaryIndexBoundComparator
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import XCTest
@testable import CodableDatastore

fileprivate struct SortError: Error, Equatable {}

final class DiskPersistenceDatastoreIndexTests: XCTestCase {
var temporaryStoreURL: URL = FileManager.default.temporaryDirectory

Expand All @@ -26,7 +28,10 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase {
func assertPageSearch(
proposedEntry: UInt8,
pages: [[DatastorePageEntryBlock]],
requiredContentLength: Int? = nil,
requiresCompleteEntries: Bool = false,
expectedIndex: Int?,
expectedSearchFailure: Bool = false,
file: StaticString = #filePath,
line: UInt = #line
) async throws {
Expand Down Expand Up @@ -67,13 +72,25 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase {
pageInfos.append(.existing(pageID))
}

let result = try await index.pageIndex(for: proposedEntry, in: pageInfos) { [pageLookup] pageID in
pageLookup[pageID]!
} comparator: { lhs, rhs in
lhs.sortOrder(comparedTo: rhs.headers[0][0])
do {
let result = try await index.pageIndex(
for: proposedEntry,
in: pageInfos,
requiresCompleteEntries: requiresCompleteEntries
) { [pageLookup] pageID in
pageLookup[pageID]!
} comparator: { lhs, rhs in
if let requiredContentLength, rhs.content.count != requiredContentLength {
throw SortError()
}
return lhs.sortOrder(comparedTo: rhs.headers[0][0])
}

XCTAssertEqual(result, expectedIndex, file: file, line: line)
XCTAssertFalse(expectedSearchFailure, file: file, line: line)
} catch is SortError {
XCTAssertTrue(expectedSearchFailure, "Encountered unexpected error", file: file, line: line)
}

XCTAssertEqual(result, expectedIndex, file: file, line: line)
}

func assertInsertionCursor(
Expand Down Expand Up @@ -121,7 +138,7 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase {
pageInfos.append(.existing(pageID))
}

let result = try await index.insertionCursor(for: RangeBoundExpression.including(proposedEntry), in: pageInfos) { [pageLookup] pageID in
let result = try await index.insertionCursor(for: RangeBoundExpression.including(proposedEntry), in: pageInfos, requiresCompleteEntries: false) { [pageLookup] pageID in
pageLookup[pageID]!
} comparator: { lhs, rhs in
lhs.sortOrder(comparedTo: rhs.headers[0][0], order: .ascending)
Expand Down Expand Up @@ -208,6 +225,29 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase {
try await assertPageSearch(proposedEntry: 4, pages: pages, expectedIndex: 1)
}

/// Exercises page search when each entry's content is split across a page boundary.
///
/// Two entries with 100 bytes of content are laid out over three pages so that every page
/// after the first begins with the tail of the previous entry. The comparator used by
/// `assertPageSearch` throws when the content length differs from `requiredContentLength`,
/// so searches that accept partial entries are expected to fail, while searches that
/// require complete entries are expected to succeed and land on the expected page.
func testSplitContentBlockSearch() async throws {
    let firstEntryBlocks = DatastorePageEntry(
        headers: [[1]],
        content: Array(repeating: 1, count: 100)
    ).blocks(remainingPageSpace: 20, maxPageSpace: 1024)
    let secondEntryBlocks = DatastorePageEntry(
        headers: [[3]],
        content: Array(repeating: 3, count: 100)
    ).blocks(remainingPageSpace: 20, maxPageSpace: 1024)

    let pages = [
        [firstEntryBlocks[0]],
        [firstEntryBlocks[1], secondEntryBlocks[0]],
        [secondEntryBlocks[1]]
    ]

    /// The proposed entries to search for, paired with the page index each should resolve to.
    let searches: [(proposedEntry: UInt8, expectedIndex: Int)] = [
        (proposedEntry: 0, expectedIndex: 0),
        (proposedEntry: 1, expectedIndex: 0),
        (proposedEntry: 2, expectedIndex: 0),
        (proposedEntry: 3, expectedIndex: 1),
        (proposedEntry: 4, expectedIndex: 1)
    ]

    /// First run every search while tolerating partial entries (expected to fail in the comparator), then again while requiring complete entries (expected to succeed).
    for requiresCompleteEntries in [false, true] {
        for search in searches {
            try await assertPageSearch(
                proposedEntry: search.proposedEntry,
                pages: pages,
                requiredContentLength: 100,
                requiresCompleteEntries: requiresCompleteEntries,
                expectedIndex: search.expectedIndex,
                expectedSearchFailure: !requiresCompleteEntries
            )
        }
    }
}

func testTwoPageBackwardsBleedingBlockSearch() async throws {
let entry1 = DatastorePageEntry(headers: [[1]], content: [1]).blocks(remainingPageSpace: 1024, maxPageSpace: 1024)
let entry3 = DatastorePageEntry(headers: [[3]], content: [3]).blocks(remainingPageSpace: 7, maxPageSpace: 1024)
Expand Down Expand Up @@ -381,7 +421,7 @@ final class DiskPersistenceDatastoreIndexTests: XCTestCase {
let exp = expectation(description: "Finished")
Task { [pageInfos, pageLookup] in
for _ in 0..<1000 {
_ = try await index.pageIndex(for: UInt64.random(in: 0..<1000000), in: pageInfos) { pageID in
_ = try await index.pageIndex(for: UInt64.random(in: 0..<1000000), in: pageInfos, requiresCompleteEntries: false) { pageID in
pageLookup[pageID]!
} comparator: { lhs, rhs in
lhs.sortOrder(comparedTo: try UInt64(bigEndianBytes: rhs.headers[0]))
Expand Down

0 comments on commit 01fb4cc

Please sign in to comment.