Abstract StagingDataManager
JosepSampe committed Sep 17, 2024
1 parent a384bb5 commit efbd61d
Showing 19 changed files with 257 additions and 134 deletions.
16 changes: 10 additions & 6 deletions src/main/scala/io/qbeast/context/QbeastContext.scala
@@ -18,8 +18,9 @@ package io.qbeast.context
import io.qbeast.core.keeper.Keeper
import io.qbeast.core.keeper.LocalKeeper
import io.qbeast.core.model._
import io.qbeast.spark.delta.writer.RollupDataWriter
import io.qbeast.spark.delta.writer.DeltaRollupDataWriter
import io.qbeast.spark.delta.DeltaMetadataManager
import io.qbeast.spark.delta.DeltaStagingDataManagerFactory
import io.qbeast.spark.index.SparkColumnsToIndexSelector
import io.qbeast.spark.index.SparkOTreeManager
import io.qbeast.spark.index.SparkRevisionFactory
@@ -28,7 +29,6 @@ import io.qbeast.spark.table.IndexedTableFactory
import io.qbeast.spark.table.IndexedTableFactoryImpl
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.scheduler.SparkListenerApplicationEnd
import org.apache.spark.sql.delta.actions.FileAction
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
@@ -75,7 +75,7 @@ trait QbeastContext
*/
object QbeastContext
extends QbeastContext
with QbeastCoreContext[DataFrame, StructType, QbeastOptions, FileAction] {
with QbeastCoreContext[DataFrame, StructType, QbeastOptions, IndexFile] {
private var managedOption: Option[QbeastContext] = None
private var unmanagedOption: Option[QbeastContext] = None

@@ -91,11 +91,14 @@ object QbeastContext

override def indexManager: IndexManager[DataFrame] = SparkOTreeManager

override def metadataManager: MetadataManager[StructType, FileAction, QbeastOptions] =
override def metadataManager: MetadataManager[StructType, IndexFile, QbeastOptions] =
DeltaMetadataManager

override def dataWriter: DataWriter[DataFrame, StructType, FileAction] =
RollupDataWriter
override def dataWriter: DataWriter[DataFrame, StructType, IndexFile] =
DeltaRollupDataWriter

override def stagingDataManagerBuilder: StagingDataManagerFactory[DataFrame, QbeastOptions] =
DeltaStagingDataManagerFactory

override def revisionBuilder: RevisionFactory[StructType, QbeastOptions] =
SparkRevisionFactory
@@ -156,6 +159,7 @@ object QbeastContext
indexManager,
metadataManager,
dataWriter,
stagingDataManagerBuilder,
revisionBuilder,
columnSelector)

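Note: a minimal sketch of consuming the new factory slot, assuming QTableID wraps the table path string; stagingDataManagerBuilder and getManager are both introduced by this commit (the factory trait itself appears further below).

import io.qbeast.context.QbeastContext
import io.qbeast.core.model.QTableID

// Resolve the Delta-backed staging manager through the backend-agnostic factory.
val tableId = QTableID("/tmp/qbeast/table") // hypothetical path; assumes QTableID(String)
val stagingManager = QbeastContext.stagingDataManagerBuilder.getManager(tableId)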
7 changes: 4 additions & 3 deletions src/main/scala/io/qbeast/core/model/IndexFile.scala
@@ -34,7 +34,8 @@ case class IndexFile(
size: Long,
modificationTime: Long,
revisionId: RevisionID,
blocks: IISeq[Block])
blocks: IISeq[Block],
remove: Boolean = false)
extends Serializable {

/**
@@ -71,11 +72,11 @@ case class IndexFile(
val newBlocks = blocks.map { block =>
if (cubeIds.contains(block.cubeId)) block.replicate() else block
}
Some(IndexFile(path, size, newModificationTime, revisionId, newBlocks))
Some(IndexFile(path, size, newModificationTime, revisionId, newBlocks, remove))
}

override def toString: String = {
s"IndexFile($path, $size, $modificationTime, $revisionId, $blocks)"
s"IndexFile($path, $size, $modificationTime, $revisionId, $blocks, $remove)"
}

}
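Note: `remove` carries a default value, so every existing IndexFile construction site keeps compiling unchanged; marking a file for removal is an ordinary case-class copy. A minimal sketch, assuming an already-built indexFile:

// Tombstone an existing file; tryReplicateBlocks and toString carry the flag through.
val tombstone = indexFile.copy(remove = true)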
9 changes: 8 additions & 1 deletion src/main/scala/io/qbeast/core/model/IndexFileBuilder.scala
@@ -26,6 +26,7 @@ final class IndexFileBuilder {
private var modificationTime: Long = 0L
private var revisionId: RevisionID = 0L
private val blocks = immutable.Seq.newBuilder[VolatileBlock]
private var remove: Boolean = false

/**
* Sets the path.
@@ -92,6 +93,11 @@ final class IndexFileBuilder {
this
}

def setRemove(): IndexFileBuilder = {
this.remove = true
this
}

/**
* Builds the result.
*
@@ -105,7 +111,8 @@
size,
modificationTime,
revisionId,
blocks.result().map(_.toBlock(filePath)))
blocks.result().map(_.toBlock(filePath)),
remove)
}

}
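Note: a minimal usage sketch of the new flag, assuming the builder's other setters (setPath, setSize, setRevisionId) match the fields shown above; only setRemove() is introduced by this commit.

val removal: IndexFile = new IndexFileBuilder()
  .setPath("part-00000-abc.parquet") // hypothetical file name
  .setSize(1024L)
  .setRevisionId(1L)
  .setRemove() // the Delta layer will turn this entry into a RemoveFile
  .result()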
31 changes: 29 additions & 2 deletions src/main/scala/io/qbeast/core/model/PreCommitHook.scala
@@ -25,7 +25,7 @@ import PreCommitHook.PreCommitHookOutput
* Pre-commit hooks are executed before a commit is made to the table. They can be used to perform
* actions such as validation, logging, or other custom logic.
*/
trait PreCommitHook[T] {
trait PreCommitHook {

/**
* The name of the hook.
@@ -45,7 +45,7 @@ trait PreCommitHook[T] {
* @return
* The output of the hook as a `PreCommitHookOutput`.
*/
def run(actions: Seq[T]): PreCommitHookOutput
def run(actions: Seq[IndexFile]): PreCommitHookOutput

}

@@ -72,6 +72,33 @@ object PreCommitHook {

}

/**
* A loader for PreCommitHooks
*/
object QbeastHookLoader {

/**
* Loads a pre-commit hook from a `HookInfo` object.
*
* This method takes a `HookInfo` object and returns a `PreCommitHook` instance.
*
* @param hookInfo
* The `HookInfo` object representing the hook to load.
* @return
* The loaded `PreCommitHook` instance.
*/
def loadHook(hookInfo: HookInfo): PreCommitHook = hookInfo match {
case HookInfo(_, clsFullName, argOpt) =>
val cls = Class.forName(clsFullName)
val instance =
if (argOpt.isDefined)
cls.getDeclaredConstructor(argOpt.get.getClass).newInstance(argOpt.get)
else cls.getDeclaredConstructor().newInstance()
instance.asInstanceOf[PreCommitHook]
}

}

/**
* A case class representing information about a hook.
*
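Note: with the type parameter gone, a hook is written once against IndexFile and works for any backend. A minimal sketch of a custom hook and its reflective loading, assuming HookInfo's fields are (name, className, arg) and that the trait's name member is overridable as shown:

import io.qbeast.core.model.{HookInfo, IndexFile, PreCommitHook, QbeastHookLoader}
import io.qbeast.core.model.PreCommitHook.PreCommitHookOutput

// Hypothetical hook: tags each commit with the number of files it adds and removes.
class FileCountHook extends PreCommitHook {
  override val name: String = "FileCountHook"

  override def run(actions: Seq[IndexFile]): PreCommitHookOutput = {
    val (removed, added) = actions.partition(_.remove)
    Map(s"$name.added" -> added.size.toString, s"$name.removed" -> removed.size.toString)
  }
}

// No-argument hooks go through the parameterless constructor branch of loadHook.
val hook = QbeastHookLoader.loadHook(HookInfo("fileCount", classOf[FileCountHook].getName, None))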
src/main/scala/io/qbeast/core/model/QbeastCoreContext.scala
@@ -32,6 +32,7 @@ trait QbeastCoreContext[DATA, DataSchema, QbeastOptions, FileDescriptor] {
def metadataManager: MetadataManager[DataSchema, FileDescriptor, QbeastOptions]
def dataWriter: DataWriter[DATA, DataSchema, FileDescriptor]
def indexManager: IndexManager[DATA]
def stagingDataManagerBuilder: StagingDataManagerFactory[DATA, QbeastOptions]
def revisionBuilder: RevisionFactory[DataSchema, QbeastOptions]
def columnSelector: ColumnsToIndexSelector[DATA]
def keeper: Keeper
80 changes: 80 additions & 0 deletions src/main/scala/io/qbeast/core/model/StagingDataManager.scala
@@ -0,0 +1,80 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.core.model

import org.apache.spark.sql.DataFrame

/**
* Staging Data Manager template
*
* @tparam DATA
* type of the data
* @tparam QbeastOptions
* type of the Qbeast options
*/
trait StagingDataManager[DATA, QbeastOptions] {

/**
* Resolve write policy according to the current staging size and its desired value.
*
* @param data
* DataFrame to write.
* @return
* A StagingResolution instance containing the data to write, the staged files to remove, and a
* boolean denoting whether the data to write is to be staged or indexed.
*/
def updateWithStagedData(data: DATA): StagingResolution

/**
* Stage the data without indexing by writing it in the desired format.
*
* @param data
* The data to stage.
* @param indexStatus
* The index status.
* @param options
* The options for staging.
* @param append
* Whether the operation appends data or overwrites.
*/
def stageData(
data: DATA,
indexStatus: IndexStatus,
options: QbeastOptions,
append: Boolean): Unit

}

trait StagingDataManagerFactory[DATA, QbeastOptions] {

/**
* Returns a StagingDataManager for the given table identifier. It is not guaranteed that the
* underlying table physically exists.
*
* @param tableId
* the table identifier
* @return
* the staging data manager
*/
def getManager(tableId: QTableID): StagingDataManager[DATA, QbeastOptions]
}

case class StagingResolution(
dataToWrite: DataFrame,
removeFiles: Seq[IndexFile],
sendToStaging: Boolean)
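Note: a minimal sketch of how a write path might consult the staging manager, assuming a stagingManager obtained from the factory and hypothetical index/commit helpers; only updateWithStagedData, stageData, and the StagingResolution fields are defined by this commit.

val resolution = stagingManager.updateWithStagedData(data)
if (resolution.sendToStaging) {
  // Still below the staging threshold: write the data unindexed.
  stagingManager.stageData(data, indexStatus, options, append = true)
} else {
  // Index the merged data and commit it together with the removal of the
  // files previously sitting in the staging area.
  val written = index(resolution.dataToWrite)
  commit(written ++ resolution.removeFiles)
}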
19 changes: 16 additions & 3 deletions src/main/scala/io/qbeast/spark/delta/DeltaMetadataManager.scala
@@ -27,20 +27,33 @@ import org.apache.spark.sql.SparkSession
/**
* Spark+Delta implementation of the MetadataManager interface
*/
object DeltaMetadataManager extends MetadataManager[StructType, FileAction, QbeastOptions] {
object DeltaMetadataManager extends MetadataManager[StructType, IndexFile, QbeastOptions] {

override def updateWithTransaction(
tableID: QTableID,
schema: StructType,
options: QbeastOptions,
append: Boolean)(writer: => (TableChanges, IISeq[FileAction])): Unit = {
append: Boolean)(writer: => (TableChanges, IISeq[IndexFile])): Unit = {

val deltaLog = loadDeltaLog(tableID)
val mode = if (append) SaveMode.Append else SaveMode.Overwrite

val metadataWriter =
DeltaMetadataWriter(tableID, mode, deltaLog, options, schema)
metadataWriter.writeWithTransaction(writer)

val deltaWriter: (TableChanges, Seq[FileAction]) = {
val (tableChanges, indexFiles) = writer
val fileActions = indexFiles.map { indexFile =>
if (indexFile.remove) {
IndexFiles.toRemoveFile(dataChange = true)(indexFile)
} else {
IndexFiles.toAddFile(dataChange = false)(indexFile)
}
}
(tableChanges, fileActions)
}

metadataWriter.writeWithTransaction(deltaWriter)
}

override def updateMetadataWithTransaction(tableID: QTableID, schema: StructType)(
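Note: callers now hand the metadata manager backend-agnostic IndexFiles, and the translation to Delta AddFile/RemoveFile actions stays private to the Delta layer. A minimal sketch of the call shape, where writeIndexedData is a hypothetical helper producing the table changes and the written files:

DeltaMetadataManager.updateWithTransaction(tableID, schema, options, append = true) {
  // Hypothetical helper: returns (TableChanges, IISeq[IndexFile]).
  writeIndexedData(data)
}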
17 changes: 10 additions & 7 deletions src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
@@ -15,12 +15,12 @@
*/
package io.qbeast.spark.delta

import io.qbeast.core.model.PreCommitHook
import io.qbeast.core.model.PreCommitHook.PreCommitHookOutput
import io.qbeast.core.model.QTableID
import io.qbeast.core.model.QbeastHookLoader
import io.qbeast.core.model.RevisionID
import io.qbeast.core.model.TableChanges
import io.qbeast.spark.delta.hook.DeltaHookLoader
import io.qbeast.spark.delta.hook.DeltaPreCommitHook
import io.qbeast.spark.delta.writer.StatsTracker.registerStatsTrackers
import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.spark.utils.QbeastExceptionMessages.partitionedTableExceptionMsg
@@ -100,7 +100,7 @@ private[delta] case class DeltaMetadataWriter(
statsTrackers
}

private val preCommitHooks = new ListBuffer[DeltaPreCommitHook]()
private val preCommitHooks = new ListBuffer[PreCommitHook]()

// Load the pre-commit hooks
loadPreCommitHooks().foreach(registerPreCommitHooks)
@@ -110,7 +110,7 @@ private[delta] case class DeltaMetadataWriter(
* @param preCommitHook
* the hook to register
*/
private def registerPreCommitHooks(preCommitHook: DeltaPreCommitHook): Unit = {
private def registerPreCommitHooks(preCommitHook: PreCommitHook): Unit = {
if (!preCommitHooks.contains(preCommitHook)) {
preCommitHooks.append(preCommitHook)
}
@@ -121,8 +121,8 @@
* @return
* the loaded hooks
*/
private def loadPreCommitHooks(): Seq[DeltaPreCommitHook] =
qbeastOptions.hookInfo.map(DeltaHookLoader.loadHook)
private def loadPreCommitHooks(): Seq[PreCommitHook] =
qbeastOptions.hookInfo.map(QbeastHookLoader.loadHook)

/**
* Executes all registered pre-commit hooks.
@@ -150,7 +150,7 @@
* A Map[String, String] representing the combined outputs of all hooks.
*/
private def runPreCommitHooks(actions: Seq[Action]): PreCommitHookOutput = {
preCommitHooks.foldLeft(Map.empty[String, String]) { (acc, hook) => acc ++ hook.run(actions) }
val qbeastActions = actions.map(IndexFiles.fromAction)
preCommitHooks.foldLeft(Map.empty[String, String]) { (acc, hook) =>
acc ++ hook.run(qbeastActions)
}
}

def writeWithTransaction(writer: => (TableChanges, Seq[FileAction])): Unit = {