Move meta chain from manifests
sergiimk committed May 10, 2020
1 parent 78dcd71 commit a9f1032
Showing 10 changed files with 252 additions and 49 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.15.0] - 2020-05-09
### Added
- `purge` command now supports `--recursive` flag
- Internal improvements and refactoring

## [0.14.0] - 2020-05-03
### Changed
- Consolidating more logic into `engine.spark`
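A hypothetical invocation of the new flag (binary name assumed from the project's CLI; dataset ID invented):

    kamu purge --recursive my.dataset

With `--recursive`, the purge also covers all known dependent datasets downstream of those listed.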
5 changes: 5 additions & 0 deletions core.coordinator/src/main/scala/dev/kamu/cli/CliParser.scala
@@ -265,6 +265,11 @@ class CliArgs(arguments: Seq[String]) extends ScallopConf(arguments) {
descr = "Purge all datasets"
)

val recursive = opt[Boolean](
"recursive",
descr = "Also purge all known dependent datasets"
)

val ids = trailArg[List[String]](
"ids",
required = false,
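A sketch of how the new option surfaces at call sites, mirroring the accessors used in Kamu.scala below; the Scallop subcommand wiring is assumed, as it is not shown in this diff:

    // Hypothetical parse; assumes `purge` is registered as a subcommand of CliArgs
    val c = new CliArgs(Seq("purge", "--recursive", "my.dataset"))
    c.purge.recursive() // => true
    c.purge.ids()       // => List("my.dataset")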
3 changes: 2 additions & 1 deletion core.coordinator/src/main/scala/dev/kamu/cli/Kamu.scala
@@ -87,7 +87,8 @@ class Kamu(
new PurgeCommand(
metadataRepository,
c.purge.ids(),
- c.purge.all()
+ c.purge.all(),
+ c.purge.recursive()
)
case List(c.delete) =>
new DeleteCommand(
core.coordinator/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala
@@ -8,38 +8,50 @@

package dev.kamu.cli.commands

- import dev.kamu.cli.metadata.{DoesNotExistException, MetadataRepository}
+ import dev.kamu.cli.UsageException
+ import dev.kamu.cli.metadata._
import dev.kamu.core.manifests.DatasetID
import org.apache.log4j.LogManager

class PurgeCommand(
metadataRepository: MetadataRepository,
ids: Seq[String],
- all: Boolean
+ all: Boolean,
+ recursive: Boolean
) extends Command {
private val logger = LogManager.getLogger(getClass.getName)

override def run(): Unit = {
- val toPurge =
- if (all)
- metadataRepository.getAllDatasets()
- else
- ids.map(DatasetID)
-
- val numPurged = toPurge
- .map(id => {
- try {
- logger.info(s"Purging dataset: ${id.toString}")
- metadataRepository.purgeDataset(id)
- 1
- } catch {
- case e: DoesNotExistException =>
- logger.error(e.getMessage)
- 0
- }
- })
- .sum
-
- logger.info(s"Purged $numPurged datasets")
+ val datasetIDs = {
+ if (all) metadataRepository.getAllDatasets()
+ else ids.map(DatasetID)
+ }
+
+ val plan = try {
+ metadataRepository
+ .getDatasetsInReverseDependencyOrder(
+ datasetIDs,
+ recursive || all // All implies recursive, which is more efficient
+ )
+ } catch {
+ case e: DoesNotExistException =>
+ throw new UsageException(e.getMessage)
+ }
+
+ val ghostSnapshots = plan.map(id => {
+ logger.info(s"Purging dataset: ${id.toString}")
+ val snapshot = metadataRepository.getMetadataChain(id).getSnapshot()
+ try {
+ metadataRepository.deleteDataset(id)
+ } catch {
+ case e: DanglingReferenceException =>
+ throw new UsageException(e.getMessage)
+ }
+ snapshot
+ })
+
+ ghostSnapshots.reverse.foreach(metadataRepository.addDataset)
+
+ logger.info(s"Purged ${ghostSnapshots.size} datasets")
}
}
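The rewritten command treats purge as delete-and-recreate: datasets are resolved in reverse dependency order (dependents first), so every deleteDataset call can succeed without a DanglingReferenceException; the snapshot captured before each deletion is kept as a "ghost", and re-adding the ghosts in reverse order (dependencies first) recreates each purged dataset with a fresh, empty metadata chain.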
core.coordinator/src/main/scala/dev/kamu/cli/external/DockerImages.scala
@@ -9,7 +9,7 @@
package dev.kamu.cli.external

object DockerImages {
val SPARK = "kamudata/engine-spark:0.1.0"
val SPARK = "kamudata/engine-spark:0.2.0"
val LIVY = SPARK
val JUPYTER = "kamudata/jupyter-uber:0.0.1"

core.coordinator/src/main/scala/dev/kamu/cli/ingest/IngestService.scala
@@ -19,9 +19,9 @@ import dev.kamu.cli.ingest.fetch.{
SourceFactory
}
import dev.kamu.cli.ingest.prep.{PrepCheckpoint, PrepStepFactory}
- import dev.kamu.cli.metadata.MetadataRepository
+ import dev.kamu.cli.metadata.{MetadataChain, MetadataRepository}
import dev.kamu.cli.transform.EngineFactory
- import dev.kamu.core.manifests.infra.{IngestConfig, IngestTask, MetadataChainFS}
+ import dev.kamu.core.manifests.infra.{IngestConfig, IngestTask}
import dev.kamu.core.manifests.{
DatasetID,
DatasetVocabulary,
@@ -355,7 +355,7 @@ class IngestService(

def commitMetadata(
datasetID: DatasetID,
- metaChain: MetadataChainFS,
+ metaChain: MetadataChain,
ingestResult: ExecutionResult[IngestCheckpoint]
): Unit = {
// TODO: Avoid loading blocks again
core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataChain.scala
@@ -0,0 +1,157 @@
/*
* Copyright (c) 2018 kamu.dev
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package dev.kamu.cli.metadata

import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.time.Instant

import dev.kamu.core.manifests._
import dev.kamu.core.manifests.parsing.pureconfig.yaml
import dev.kamu.core.manifests.parsing.pureconfig.yaml.defaults._
import dev.kamu.core.utils.fs._
import org.apache.hadoop.fs.{FileSystem, Path}
import pureconfig.{ConfigReader, ConfigWriter, Derivation}
import pureconfig.generic.auto._

import scala.reflect.ClassTag

class MetadataChain(fileSystem: FileSystem, datasetDir: Path) {

def init(ds: DatasetSnapshot, systemTime: Instant): Unit = {
val initialBlock = MetadataBlock(
prevBlockHash = "",
systemTime = systemTime,
source = Some(ds.source)
).hashed()

val initialSummary = DatasetSummary(
id = ds.id,
kind = ds.kind,
datasetDependencies = ds.dependsOn.toSet,
vocabulary = ds.vocabulary,
lastPulled = None,
numRecords = 0,
dataSize = 0
)

try {
fileSystem.mkdirs(blocksDir)
saveResource(initialSummary, summaryPath)
saveResource(initialBlock, blocksDir.resolve(initialBlock.blockHash))
} catch {
case e: Exception =>
fileSystem.delete(datasetDir, true)
throw e
}
}

// TODO: add invariant validation
def append(_block: MetadataBlock): MetadataBlock = {
val block = _block.hashed()
saveResource(block, blocksDir.resolve(block.blockHash))
block
}

def getSummary(): DatasetSummary = {
loadResource[DatasetSummary](summaryPath)
}

def updateSummary(
update: DatasetSummary => DatasetSummary
): DatasetSummary = {
val newSummary = update(getSummary())
saveResource(newSummary, summaryPath)
newSummary
}

def getSnapshot(): DatasetSnapshot = {
val summary = getSummary()

val source = getBlocks().reverse
.flatMap(_.source)
.head

DatasetSnapshot(
id = summary.id,
source = source,
vocabulary = summary.vocabulary
)
}

/** Returns metadata blocks in historical order */
def getBlocks(): Vector[MetadataBlock] = {
val blocks = fileSystem
.listStatus(blocksDir)
.map(_.getPath)
.map(loadResource[MetadataBlock])
.map(b => (b.blockHash, b))
.toMap

val nextBlocks = blocks.values
.map(b => (b.prevBlockHash, b.blockHash))
.toMap

val blocksOrdered =
new scala.collection.immutable.VectorBuilder[MetadataBlock]()

var parentBlockHash = ""
while (nextBlocks.contains(parentBlockHash)) {
parentBlockHash = nextBlocks(parentBlockHash)
blocksOrdered += blocks(parentBlockHash)
}

blocksOrdered.result()
}

protected def summaryPath: Path = datasetDir.resolve("summary")

protected def blocksDir: Path = datasetDir.resolve("blocks")

/////////////////////////////////////////////////////////////////////////////
// Helpers
/////////////////////////////////////////////////////////////////////////////

protected def saveResource[T <: Resource: ClassTag](obj: T, path: Path)(
implicit derivation: Derivation[ConfigWriter[Manifest[T]]]
): Unit = {
val outputStream = fileSystem.create(path)
try {
yaml.save(Manifest(obj), outputStream)
} finally {
outputStream.close()
}
}

protected def loadResource[T <: Resource: ClassTag](path: Path)(
implicit derivation: Derivation[ConfigReader[Manifest[T]]]
): T = {
val inputStream = fileSystem.open(path)
try {
yaml.load[Manifest[T]](inputStream).content
} finally {
inputStream.close()
}
}

protected implicit class MetadataBlockEx(b: MetadataBlock) {
def hashed(): MetadataBlock = {
val digest = MessageDigest.getInstance("sha-256")
val repr = yaml.saveStr(b)

val blockHash = digest
.digest(repr.getBytes(StandardCharsets.UTF_8))
.map("%02x".format(_))
.mkString

b.copy(blockHash = blockHash)
}
}

}
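The chain persists each block as a YAML manifest named by its SHA-256 hash and rebuilds history by following prevBlockHash links from the empty genesis hash. A minimal usage sketch follows, assuming a local Hadoop FileSystem; the dataset directory is invented, the snapshot is taken as a parameter since constructing one is outside this diff, and unlisted MetadataBlock fields are assumed to have defaults:

    import java.time.Instant
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import dev.kamu.core.manifests.{DatasetSnapshot, MetadataBlock}

    // Hypothetical helper exercising the new MetadataChain
    object MetadataChainDemo {
      def demo(snapshot: DatasetSnapshot): Unit = {
        val fileSystem = FileSystem.getLocal(new Configuration())
        val chain = new MetadataChain(fileSystem, new Path("/tmp/my.dataset/meta"))

        // Writes the genesis block (prevBlockHash = "") and the initial summary
        chain.init(snapshot, Instant.now())

        // Link a new block to the current head; append() computes its blockHash
        val head = chain.getBlocks().last
        chain.append(
          MetadataBlock(
            prevBlockHash = head.blockHash,
            systemTime = Instant.now(),
            source = None // assumed optional, as in the genesis block above
          )
        )
      }
    }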
core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataRepository.scala
@@ -13,7 +13,6 @@ import java.net.URI

import dev.kamu.cli.utility.DependencyGraph
import dev.kamu.cli._
- import dev.kamu.core.manifests.infra.MetadataChainFS
import dev.kamu.core.manifests._
import dev.kamu.core.utils.Clock
import dev.kamu.core.manifests.parsing.pureconfig.yaml
@@ -83,8 +82,8 @@ class MetadataRepository(
)
}

- def getMetadataChain(id: DatasetID): MetadataChainFS = {
- new MetadataChainFS(fileSystem, getDatasetLayout(id).metadataDir)
+ def getMetadataChain(id: DatasetID): MetadataChain = {
+ new MetadataChain(fileSystem, getDatasetLayout(id).metadataDir)
}

def getDatasetKind(id: DatasetID): DatasetKind = {
@@ -98,10 +97,16 @@
def getDatasetSummary(id: DatasetID): DatasetSummary = {
ensureDatasetExistsAndPulled(id)

- val chain = new MetadataChainFS(fileSystem, datasetMetadataDir(id))
+ val chain = new MetadataChain(fileSystem, datasetMetadataDir(id))
chain.getSummary()
}

def getDatasetVocabulary(id: DatasetID): DatasetVocabulary = {
getDatasetSummary(id).vocabulary
.getOrElse(DatasetVocabularyOverrides())
.asDatasetVocabulary()
}

def getDatasetRef(id: DatasetID): DatasetRef = {
if (!isRemote(id))
throw new RuntimeException(s"Dataset $id is not remote")
@@ -134,6 +139,30 @@
depGraph.resolve(ids.toList)
}

def getDatasetsInReverseDependencyOrder(
ids: Seq[DatasetID],
recursive: Boolean
): Seq[DatasetID] = {
val inverseDependencies =
getAllDatasets()
.flatMap(id => {
getDatasetDependencies(id).map(depID => (depID, id))
})
.groupBy(_._1)
.map { case (id, seq) => (id, seq.map(_._2).toList) }

def dependencyOf(id: DatasetID): List[DatasetID] = {
inverseDependencies.getOrElse(id, List.empty)
}

val depGraph = new DependencyGraph[DatasetID](dependencyOf)
val deps = depGraph.resolve(ids.toList)
if (recursive)
deps
else
deps.filter(ids.contains)
}
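// DependencyGraph is not part of this diff. Assuming its resolve() performs a
// post-order traversal (each node emitted after everything it points to),
// feeding it the inverted edge map above yields dependents before their
// dependencies, exactly the order PurgeCommand needs to delete safely.
// A hedged sketch of such a resolver (hypothetical name, not in the commit):
import scala.collection.mutable

class DependencyGraphSketch[T](neighbors: T => List[T]) {
  // Assumes an acyclic graph; each node is emitted after its neighbors
  def resolve(roots: List[T]): List[T] = {
    val ordered = mutable.LinkedHashSet.empty[T]

    def visit(node: T): Unit =
      if (!ordered.contains(node)) {
        neighbors(node).foreach(visit) // emit what `node` points to first
        ordered += node
      }

    roots.foreach(visit)
    ordered.toList
  }
}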

def getAllDatasets(): Seq[DatasetID] = {
fileSystem
.listStatus(workspaceLayout.metadataDir)
@@ -163,7 +192,7 @@
)
}

- val chain = new MetadataChainFS(fileSystem, datasetDir)
+ val chain = new MetadataChain(fileSystem, datasetDir)
chain.init(ds, systemClock.instant())
}

@@ -201,13 +230,6 @@
fileSystem.delete(workspaceLayout.metadataDir.resolve(id.toString), true)
}

- def purgeDataset(id: DatasetID): Unit = {
- // TODO: Purging a dataset that is used by non-empty derivatives should raise an error
- val snapshot = getMetadataChain(id).getSnapshot()
- deleteDataset(id)
- addDataset(snapshot)
- }

////////////////////////////////////////////////////////////////////////////
// Volumes
////////////////////////////////////////////////////////////////////////////