Issue #417: Abstract the Qbeast Snapshot Module #411

Merged · 7 commits · Sep 16, 2024
1 change: 1 addition & 0 deletions build.sbt
@@ -45,6 +45,7 @@ ThisBuild / libraryDependencies ++= Seq(
mockito % Test)

Test / javaOptions ++= Seq("-Xmx10G", "-XX:+UseG1GC")
Test / testOptions += Tests.Argument("-oD")
Test / fork := true

// Scala compiler settings
4 changes: 2 additions & 2 deletions src/main/scala/io/qbeast/context/QbeastContext.scala
@@ -19,7 +19,7 @@ import io.qbeast.core.keeper.Keeper
import io.qbeast.core.keeper.LocalKeeper
import io.qbeast.core.model._
import io.qbeast.spark.delta.writer.RollupDataWriter
import io.qbeast.spark.delta.SparkDeltaMetadataManager
import io.qbeast.spark.delta.DeltaMetadataManager
import io.qbeast.spark.index.SparkColumnsToIndexSelector
import io.qbeast.spark.index.SparkOTreeManager
import io.qbeast.spark.index.SparkRevisionFactory
@@ -92,7 +92,7 @@ object QbeastContext
override def indexManager: IndexManager[DataFrame] = SparkOTreeManager

override def metadataManager: MetadataManager[StructType, FileAction, QbeastOptions] =
SparkDeltaMetadataManager
DeltaMetadataManager

override def dataWriter: DataWriter[DataFrame, StructType, FileAction] =
RollupDataWriter
35 changes: 31 additions & 4 deletions src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala
@@ -16,6 +16,7 @@
package io.qbeast.core.model

import io.qbeast.IISeq
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset

@@ -26,10 +27,19 @@ trait QbeastSnapshot {

/**
* The current state of the snapshot.
* @return
*/
def isInitial: Boolean

/**
* Returns the total number of data files in the snapshot.
*/
def allFilesCount: Long

/**
* Provides the schema of the dataset for this snapshot.
*/
def schema: StructType

/**
* The current table description.
* @return
@@ -71,14 +81,14 @@ trait QbeastSnapshot {
def loadLatestIndexFiles: Dataset[IndexFile]

/**
* Loads the index files of the specified revision.
* Loads the index files of the specified revision (revision files).
*
* @param revisionId
* @param revisionID
* the revision identifier
* @return
* the index files of the specified revision
*/
def loadIndexFiles(revisionId: RevisionID): Dataset[IndexFile]
def loadIndexFiles(revisionID: RevisionID): Dataset[IndexFile]

/**
* Obtains all Revisions
@@ -87,6 +97,16 @@ trait QbeastSnapshot {
*/
def loadAllRevisions: IISeq[Revision]

/**
* Returns true if a revision with a specific revision identifier exists
*
* @param revisionID
* the identifier of the revision
* @return
* boolean
*/
def existsRevision(revisionID: RevisionID): Boolean

/**
* Obtains the last Revision available
* @return
@@ -112,6 +132,13 @@
*/
def loadRevisionAt(timestamp: Long): Revision

/**
* Loads the dataset of qbeast blocks from index files
* @param indexFile
* A dataset of index files
* @return
* the DataFrame
*/
def loadDataframeFromIndexFiles(indexFile: Dataset[IndexFile]): DataFrame

}
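
For orientation, here is a minimal usage sketch of the abstracted trait (not part of this diff). It uses only the methods declared above; `readRevision` is a hypothetical helper, and the `QbeastSnapshot` instance is assumed to come from elsewhere (e.g. the metadata manager).

```scala
import io.qbeast.core.model.{QbeastSnapshot, RevisionID}
import org.apache.spark.sql.DataFrame

// Hypothetical helper (illustration only): reads the data of one revision
// through the storage-agnostic QbeastSnapshot API, with no Delta classes.
def readRevision(snapshot: QbeastSnapshot, revisionID: RevisionID): DataFrame = {
  require(!snapshot.isInitial, "the table has no committed data yet")
  require(snapshot.existsRevision(revisionID), s"unknown revision: $revisionID")
  // Resolve the revision's index files, then materialize the underlying data.
  val indexFiles = snapshot.loadIndexFiles(revisionID)
  snapshot.loadDataframeFromIndexFiles(indexFiles)
}
```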
6 changes: 1 addition & 5 deletions src/main/scala/io/qbeast/spark/QbeastTable.scala
@@ -26,7 +26,6 @@ import io.qbeast.spark.internal.commands.AnalyzeTableCommand
import io.qbeast.spark.internal.commands.OptimizeTableCommand
import io.qbeast.spark.table._
import io.qbeast.spark.utils.IndexMetrics
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.AnalysisExceptionFactory
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SparkSession
@@ -48,10 +47,7 @@ class QbeastTable private (
extends Serializable
with StagingUtils {

private def deltaLog: DeltaLog = DeltaLog.forTable(sparkSession, tableID.id)

private def qbeastSnapshot: DeltaQbeastSnapshot =
delta.DeltaQbeastSnapshot(deltaLog.update())
private def qbeastSnapshot: DeltaQbeastSnapshot = DeltaQbeastSnapshot(tableID)

private def indexedTable: IndexedTable = indexedTableFactory.getIndexedTable(tableID)

@@ -27,15 +27,15 @@ import org.apache.spark.sql.SparkSession
/**
* Spark+Delta implementation of the MetadataManager interface
*/
object SparkDeltaMetadataManager extends MetadataManager[StructType, FileAction, QbeastOptions] {
object DeltaMetadataManager extends MetadataManager[StructType, FileAction, QbeastOptions] {

override def updateWithTransaction(
tableID: QTableID,
schema: StructType,
options: QbeastOptions,
append: Boolean)(writer: => (TableChanges, IISeq[FileAction])): Unit = {

val deltaLog = loadDeltaQbeastLog(tableID).deltaLog
val deltaLog = loadDeltaLog(tableID)
val mode = if (append) SaveMode.Append else SaveMode.Overwrite

val metadataWriter =
@@ -45,7 +45,7 @@ object SparkDeltaMetadataManager extends MetadataManager[StructType, FileAction,

override def updateMetadataWithTransaction(tableID: QTableID, schema: StructType)(
update: => Configuration): Unit = {
val deltaLog = loadDeltaQbeastLog(tableID).deltaLog
val deltaLog = loadDeltaLog(tableID)
val metadataWriter =
DeltaMetadataWriter(tableID, mode = SaveMode.Append, deltaLog, QbeastOptions.empty, schema)

@@ -54,11 +54,11 @@ object SparkDeltaMetadataManager extends MetadataManager[StructType, FileAction,
}

override def loadSnapshot(tableID: QTableID): DeltaQbeastSnapshot = {
DeltaQbeastSnapshot(loadDeltaQbeastLog(tableID).deltaLog.update())
DeltaQbeastSnapshot(tableID)
}

override def loadCurrentSchema(tableID: QTableID): StructType = {
loadDeltaQbeastLog(tableID).deltaLog.update().schema
loadDeltaLog(tableID).update().schema
}

override def updateRevision(tableID: QTableID, revisionChange: RevisionChange): Unit = {}
@@ -71,8 +71,8 @@ object SparkDeltaMetadataManager extends MetadataManager[StructType, FileAction,
* the table ID
* @return
*/
def loadDeltaQbeastLog(tableID: QTableID): DeltaQbeastLog = {
DeltaQbeastLog(DeltaLog.forTable(SparkSession.active, tableID.id))
def loadDeltaLog(tableID: QTableID): DeltaLog = {
DeltaLog.forTable(SparkSession.active, tableID.id)
}

override def hasConflicts(
@@ -98,16 +98,17 @@ object SparkDeltaMetadataManager extends MetadataManager[StructType, FileAction,
* @return
*/
override def existsLog(tableID: QTableID): Boolean = {
loadDeltaQbeastLog(tableID).deltaLog.tableExists
loadDeltaLog(tableID).tableExists
}

/**
* Creates an initial log directory
*
* @param tableID
* Table ID
*/
override def createLog(tableID: QTableID): Unit = {
loadDeltaQbeastLog(tableID).deltaLog.createLogDirectory()
loadDeltaLog(tableID).createLogDirectory()
}

}
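
A rough usage sketch of the renamed manager (not from this diff), assuming `QTableID` is constructed from the table path as a plain string and a SparkSession is active; the path below is made up.

```scala
import io.qbeast.core.model.QTableID
import io.qbeast.spark.delta.DeltaMetadataManager

// Hypothetical table location, for illustration only.
val tableID = new QTableID("/tmp/qbeast/example_table")

// loadDeltaLog now returns the DeltaLog directly, so callers no longer
// unwrap an intermediate DeltaQbeastLog value.
val deltaLog = DeltaMetadataManager.loadDeltaLog(tableID)
val tableExists = DeltaMetadataManager.existsLog(tableID)

// loadSnapshot builds the snapshot from the table identifier alone.
val qbeastSnapshot = DeltaMetadataManager.loadSnapshot(tableID)
```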
31 changes: 0 additions & 31 deletions src/main/scala/io/qbeast/spark/delta/DeltaQbeastLog.scala

This file was deleted.

52 changes: 27 additions & 25 deletions src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
@@ -21,21 +21,25 @@ import io.qbeast.spark.utils.TagColumns
import io.qbeast.IISeq
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.AnalysisExceptionFactory
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SparkSession

/**
* Qbeast Snapshot that provides information about the current index state.
*
* @param snapshot
* the internal Delta Lakes log snapshot
* @param tableID
* the table ID
*/
case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
extends QbeastSnapshot
with DeltaStagingUtils {
case class DeltaQbeastSnapshot(tableID: QTableID) extends QbeastSnapshot with DeltaStagingUtils {

override val snapshot: Snapshot =
DeltaLog.forTable(SparkSession.active, tableID.id).update()

/**
* The current state of the snapshot.
@@ -44,6 +48,10 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
*/
override def isInitial: Boolean = snapshot.version == -1

override val schema: StructType = snapshot.metadata.schema

override val allFilesCount: Long = snapshot.allFiles.count

private val metadataMap: Map[String, String] = snapshot.metadata.configuration

/**
@@ -110,7 +118,7 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
* @return
* boolean
*/
def existsRevision(revisionID: RevisionID): Boolean = {
override def existsRevision(revisionID: RevisionID): Boolean = {
revisionsMap.contains(revisionID)
}

@@ -138,13 +146,11 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)

override def loadLatestIndexFiles: Dataset[IndexFile] = loadIndexFiles(lastRevisionID)

override def loadIndexFiles(revisionId: RevisionID): Dataset[IndexFile] = {
val revision = loadRevision(revisionId)
val dimensionCount = revision.transformations.size
override def loadIndexFiles(revisionID: RevisionID): Dataset[IndexFile] = {
val dimensionCount = loadRevision(revisionID).transformations.size
val addFiles =
if (isStaging(revision)) loadStagingFiles()
else loadRevisionFiles(revisionId)

if (isStaging(revisionID)) loadStagingFiles()
else snapshot.allFiles.where(TagColumns.revision === lit(revisionID.toString))
import addFiles.sparkSession.implicits._
addFiles.map(IndexFiles.fromAddFile(dimensionCount))
}
@@ -198,21 +204,12 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
}

/**
* Loads the dataset of qbeast blocks for a given revision
* @param revisionID
* the revision identifier
* Loads the dataset of qbeast blocks from index files
* @param indexFile
* A dataset of index files
* @return
* the Dataset of QbeastBlocks
*/
def loadRevisionFiles(revisionID: RevisionID): Dataset[AddFile] = {
if (isStaging(revisionID)) loadStagingFiles()
else snapshot.allFiles.where(TagColumns.revision === lit(revisionID.toString))
}

/**
* Loads Staging AddFiles
* the DataFrame
*/
def loadStagingFiles(): Dataset[AddFile] = stagingFiles()

override def loadDataframeFromIndexFiles(indexFile: Dataset[IndexFile]): DataFrame = {
if (snapshot.deletionVectorsSupported) {
@@ -231,4 +228,9 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
}
}

/**
* Loads Staging AddFiles
*/
private def loadStagingFiles(): Dataset[AddFile] = stagingFiles()

}
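
For comparison, a small before/after sketch of how the snapshot is now obtained (assuming an active SparkSession; the `QTableID` construction and path are illustrative, not taken from this diff):

```scala
import io.qbeast.core.model.QTableID
import io.qbeast.spark.delta.DeltaQbeastSnapshot

// Before: callers resolved a Delta Snapshot themselves, e.g.
//   DeltaQbeastSnapshot(DeltaLog.forTable(spark, path).update())
// After: the case class resolves the latest Delta snapshot internally
// from the table identifier.
val snapshot = DeltaQbeastSnapshot(new QTableID("/tmp/qbeast/example_table"))
val revisions = snapshot.loadAllRevisions
val totalFiles = snapshot.allFilesCount
```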
18 changes: 4 additions & 14 deletions src/main/scala/io/qbeast/spark/delta/IndexStatusBuilder.scala
@@ -16,7 +16,6 @@
package io.qbeast.spark.delta

import io.qbeast.core.model._
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Dataset

@@ -33,21 +32,12 @@ import scala.collection.immutable.SortedMap
* the announced set available for the revision
*/
private[delta] class IndexStatusBuilder(
qbeastSnapshot: DeltaQbeastSnapshot,
qbeastSnapshot: QbeastSnapshot,
revision: Revision,
announcedSet: Set[CubeId] = Set.empty)
extends Serializable
with StagingUtils {

/**
* Dataset of files belonging to the specific revision
* @return
* the dataset of AddFile actions
*/
def revisionFiles: Dataset[AddFile] =
// this must be external to the lambda, to avoid SerializationErrors
qbeastSnapshot.loadRevisionFiles(revision.revisionID)

def build(): IndexStatus = {
val cubeStatus =
if (isStaging(revision)) stagingCubeStatuses
@@ -78,13 +68,13 @@ private[delta] class IndexStatusBuilder(
*/
def indexCubeStatuses: SortedMap[CubeId, CubeStatus] = {
val builder = SortedMap.newBuilder[CubeId, CubeStatus]
val dimensionCount = revision.transformations.size
val desiredCubeSize = revision.desiredCubeSize
val revisionAddFiles: Dataset[AddFile] = revisionFiles
val revisionAddFiles: Dataset[IndexFile] =
qbeastSnapshot.loadIndexFiles(revision.revisionID)

import revisionAddFiles.sparkSession.implicits._
val cubeStatuses = revisionAddFiles
.flatMap(IndexFiles.fromAddFile(dimensionCount)(_).blocks)
.flatMap(_.blocks)
.groupBy($"cubeId")
.agg(
min($"maxWeight.value").as("maxWeightInt"),