Skip to content

Commit

Permalink
Abstract PreCommitHook
Browse files Browse the repository at this point in the history
  • Loading branch information
JosepSampe committed Sep 16, 2024
1 parent 746a307 commit a384bb5
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.spark.delta.hook
package io.qbeast.core.model

import io.qbeast.spark.delta.hook.PreCommitHook.getHookArgName
import io.qbeast.spark.delta.hook.PreCommitHook.getHookName
import io.qbeast.spark.delta.hook.PreCommitHook.PreCommitHookOutput
import org.apache.spark.sql.delta.actions.Action
import PreCommitHook.getHookArgName
import PreCommitHook.getHookName
import PreCommitHook.PreCommitHookOutput

/**
* A trait representing a pre-commit hook.
*
* Pre-commit hooks are executed before a commit is made to the table. They can be used to perform
* actions such as validation, logging, or other custom logic.
*/
trait PreCommitHook {
trait PreCommitHook[T] {

/**
* The name of the hook.
Expand All @@ -46,7 +45,7 @@ trait PreCommitHook {
* @return
* The output of the hook as a `PreCommitHookOutput`.
*/
def run(actions: Seq[Action]): PreCommitHookOutput
def run(actions: Seq[T]): PreCommitHookOutput

}

Expand All @@ -73,33 +72,6 @@ object PreCommitHook {

}

/**
* A loader for PreCommitHooks
*/
object QbeastHookLoader {

/**
* Loads a pre-commit hook from a `HookInfo` object.
*
* This method takes a `HookInfo` object and returns a `PreCommitHook` instance.
*
* @param hookInfo
* The `HookInfo` object representing the hook to load.
* @return
* The loaded `PreCommitHook` instance.
*/
def loadHook(hookInfo: HookInfo): PreCommitHook = hookInfo match {
case HookInfo(_, clsFullName, argOpt) =>
val cls = Class.forName(clsFullName)
val instance =
if (argOpt.isDefined)
cls.getDeclaredConstructor(argOpt.get.getClass).newInstance(argOpt.get)
else cls.getDeclaredConstructor().newInstance()
instance.asInstanceOf[PreCommitHook]
}

}

/**
* A case class representing information about a hook.
*
Expand Down
14 changes: 7 additions & 7 deletions src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
*/
package io.qbeast.spark.delta

import io.qbeast.core.model.PreCommitHook.PreCommitHookOutput
import io.qbeast.core.model.QTableID
import io.qbeast.core.model.RevisionID
import io.qbeast.core.model.TableChanges
import io.qbeast.spark.delta.hook.PreCommitHook
import io.qbeast.spark.delta.hook.PreCommitHook.PreCommitHookOutput
import io.qbeast.spark.delta.hook.QbeastHookLoader
import io.qbeast.spark.delta.hook.DeltaHookLoader
import io.qbeast.spark.delta.hook.DeltaPreCommitHook
import io.qbeast.spark.delta.writer.StatsTracker.registerStatsTrackers
import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.spark.utils.QbeastExceptionMessages.partitionedTableExceptionMsg
Expand Down Expand Up @@ -100,7 +100,7 @@ private[delta] case class DeltaMetadataWriter(
statsTrackers
}

private val preCommitHooks = new ListBuffer[PreCommitHook]()
private val preCommitHooks = new ListBuffer[DeltaPreCommitHook]()

// Load the pre-commit hooks
loadPreCommitHooks().foreach(registerPreCommitHooks)
Expand All @@ -110,7 +110,7 @@ private[delta] case class DeltaMetadataWriter(
* @param preCommitHook
* the hook to register
*/
private def registerPreCommitHooks(preCommitHook: PreCommitHook): Unit = {
private def registerPreCommitHooks(preCommitHook: DeltaPreCommitHook): Unit = {
if (!preCommitHooks.contains(preCommitHook)) {
preCommitHooks.append(preCommitHook)
}
Expand All @@ -121,8 +121,8 @@ private[delta] case class DeltaMetadataWriter(
* @return
* the loaded hooks
*/
private def loadPreCommitHooks(): Seq[PreCommitHook] =
qbeastOptions.hookInfo.map(QbeastHookLoader.loadHook)
private def loadPreCommitHooks(): Seq[DeltaPreCommitHook] =
qbeastOptions.hookInfo.map(DeltaHookLoader.loadHook)

/**
* Executes all registered pre-commit hooks.
Expand Down
49 changes: 49 additions & 0 deletions src/main/scala/io/qbeast/spark/delta/hook/DeltaPreCommitHook.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2024 Qbeast Analytics, S.L.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.spark.delta.hook

import io.qbeast.core.model.HookInfo
import io.qbeast.core.model.PreCommitHook
import org.apache.spark.sql.delta.actions.Action

trait DeltaPreCommitHook extends PreCommitHook[Action]

/**
* A loader for PreCommitHooks
*/
object DeltaHookLoader {

/**
* Loads a pre-commit hook from a `HookInfo` object.
*
* This method takes a `HookInfo` object and returns a `PreCommitHook` instance.
*
* @param hookInfo
* The `HookInfo` object representing the hook to load.
* @return
* The loaded `PreCommitHook` instance.
*/
def loadHook(hookInfo: HookInfo): DeltaPreCommitHook = hookInfo match {
case HookInfo(_, clsFullName, argOpt) =>
val cls = Class.forName(clsFullName)
val instance =
if (argOpt.isDefined)
cls.getDeclaredConstructor(argOpt.get.getClass).newInstance(argOpt.get)
else cls.getDeclaredConstructor().newInstance()
instance.asInstanceOf[DeltaPreCommitHook]
}

}
8 changes: 3 additions & 5 deletions src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
package io.qbeast.spark.internal

import io.qbeast.core.model.ColumnToIndex
import io.qbeast.core.model.HookInfo
import io.qbeast.core.model.PreCommitHook.getHookArgName
import io.qbeast.core.model.PreCommitHook.PRE_COMMIT_HOOKS_PREFIX
import io.qbeast.core.model.QTableID
import io.qbeast.spark.delta.hook.HookInfo
import io.qbeast.spark.delta.hook.PreCommitHook.getHookArgName
import io.qbeast.spark.delta.hook.PreCommitHook.PRE_COMMIT_HOOKS_PREFIX
import io.qbeast.spark.index.ColumnsToIndex
import io.qbeast.spark.internal.QbeastOptions.COLUMNS_TO_INDEX
import io.qbeast.spark.internal.QbeastOptions.CUBE_SIZE
Expand All @@ -37,8 +37,6 @@ import scala.util.matching.Regex
*
* @param columnsToIndex
* A sequence of column names to index.
* @param columnsToIndexDecoded
* A sequence of ColumnToIndex objects representing the columns to index.
* @param cubeSize
* The number of desired elements per cube.
* @param stats
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package io.qbeast.spark.delta.hook

import io.qbeast.context.QbeastContext
import io.qbeast.core.model.PreCommitHook.PRE_COMMIT_HOOKS_PREFIX
import io.qbeast.core.model.PreCommitHook.PreCommitHookOutput
import io.qbeast.core.model.QTableID
import io.qbeast.spark.delta.hook.PreCommitHook.PRE_COMMIT_HOOKS_PREFIX
import io.qbeast.spark.delta.hook.PreCommitHook.PreCommitHookOutput
import io.qbeast.spark.QbeastIntegrationTestSpec
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.actions.CommitInfo
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.DeltaLog

private class SimpleHook(kv: String) extends PreCommitHook {
private class SimpleHook(kv: String) extends DeltaPreCommitHook {
override val name: String = "SimpleHook"

override def run(args: Seq[Action]): PreCommitHookOutput = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
*/
package io.qbeast.spark.delta.hook

import io.qbeast.spark.delta.hook.PreCommitHook.PreCommitHookOutput
import io.qbeast.core.model.HookInfo
import io.qbeast.core.model.PreCommitHook.PreCommitHookOutput
import io.qbeast.spark.delta.hook.StatefulTestHook.StatefulTestHookState
import io.qbeast.spark.internal.QbeastOptions
import org.apache.spark.sql.delta.actions.Action
Expand All @@ -25,7 +26,7 @@ import org.scalatest.matchers.should.Matchers

import java.util.UUID

private class SimpleTestHook extends PreCommitHook {
private class SimpleTestHook extends DeltaPreCommitHook {
override val name: String = "SimpleTestHook"

var args: Seq[Action] = Seq.empty
Expand All @@ -37,7 +38,7 @@ private class SimpleTestHook extends PreCommitHook {

}

private class StatefulTestHook(val stateId: String) extends PreCommitHook {
private class StatefulTestHook(val stateId: String) extends DeltaPreCommitHook {

val state: StatefulTestHookState = StatefulTestHook.stateMap(stateId)

Expand Down Expand Up @@ -73,9 +74,9 @@ class QbeastHookLoaderTest extends AnyFlatSpec with Matchers {
when(qbeastOptions.hookInfo).thenReturn(
HookInfo("", classOf[SimpleTestHook].getCanonicalName, None) :: Nil)

val hookOpts = qbeastOptions.hookInfo.map(QbeastHookLoader.loadHook)
val hookOpts = qbeastOptions.hookInfo.map(DeltaHookLoader.loadHook)
hookOpts.size shouldBe 1
hookOpts.head shouldBe a[PreCommitHook]
hookOpts.head shouldBe a[DeltaPreCommitHook]

val mockActions = mock(classOf[List[Action]])
hookOpts.head.run(mockActions)
Expand All @@ -90,9 +91,9 @@ class QbeastHookLoaderTest extends AnyFlatSpec with Matchers {
when(qbeastOptions.hookInfo).thenReturn(
HookInfo("", classOf[StatefulTestHook].getCanonicalName, Some(argument)) :: Nil)

val hooks = qbeastOptions.hookInfo.map(QbeastHookLoader.loadHook)
val hooks = qbeastOptions.hookInfo.map(DeltaHookLoader.loadHook)
hooks.size shouldBe 1
hooks.head shouldBe a[PreCommitHook]
hooks.head shouldBe a[DeltaPreCommitHook]

val mockActions = mock(classOf[List[Action]])
hooks.head.run(mockActions)
Expand All @@ -106,7 +107,7 @@ class QbeastHookLoaderTest extends AnyFlatSpec with Matchers {
val qbeastOptions = mock(classOf[QbeastOptions])
when(qbeastOptions.hookInfo).thenReturn(Nil)

val hookOpts = qbeastOptions.hookInfo.map(QbeastHookLoader.loadHook)
val hookOpts = qbeastOptions.hookInfo.map(DeltaHookLoader.loadHook)
hookOpts.size shouldBe 0
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package io.qbeast.spark.internal.sources

import io.qbeast.spark.delta.hook.HookInfo
import io.qbeast.spark.delta.hook.PreCommitHook.PRE_COMMIT_HOOKS_PREFIX
import io.qbeast.core.model.HookInfo
import io.qbeast.core.model.PreCommitHook.PRE_COMMIT_HOOKS_PREFIX
import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.spark.QbeastIntegrationTestSpec
import org.apache.spark.qbeast.config
Expand Down

0 comments on commit a384bb5

Please sign in to comment.