Skip to content

Commit

Permalink
Control ordering of glob-based sources
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed May 26, 2020
1 parent 784c4c3 commit 9eb6fe5
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 6 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.16.0] - 2020-05-25
### Changed
- Improve sort order of glob-based file sources
- Spark engine will persist events ordered by event time

## [0.15.0] - 2020-05-09
### Added
- `purge` command now supports `--recursive` flag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import java.time.Instant

import dev.kamu.core.utils.Clock

// TODO: Refactor this to allow using modification time from source metadata
trait EventTimeSource {
def getEventTime(source: FileSystemSource): Option[Instant] =
getEventTimeDefault()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@

package dev.kamu.cli.ingest.fetch

import dev.kamu.core.manifests.{CachingKind, EventTimeKind, FetchSourceKind}
import dev.kamu.core.manifests.{
CachingKind,
EventTimeKind,
FetchSourceKind,
OrderingKind
}
import dev.kamu.core.utils.Clock
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.log4j.LogManager
Expand All @@ -18,7 +23,7 @@ class SourceFactory(fileSystem: FileSystem, systemClock: Clock) {

def getSource(kind: FetchSourceKind): Seq[CacheableSource] = {
val eventTimeSource = kind.eventTime match {
case None | Some(_: EventTimeKind.FromSystemTime) =>
case None =>
new EventTimeSource.FromSystemTime(systemClock)
case Some(e: EventTimeKind.FromPath) =>
new EventTimeSource.FromPath(e.pattern, e.timestampFormat)
Expand Down Expand Up @@ -91,13 +96,27 @@ class SourceFactory(fileSystem: FileSystem, systemClock: Clock) {
eventTimeSource
)
)
.sortBy(eventTimeSource.getEventTime)

val orderBy = kind.orderBy.getOrElse(
if (kind.eventTime.isDefined) OrderingKind.ByMetadataEventTime
else OrderingKind.ByName
)

val sorted = orderBy match {
case OrderingKind.ByName =>
globbed.sortWith(
(lhs, rhs) =>
lhs.path.getName.compareToIgnoreCase(rhs.path.getName) < 0
)
case OrderingKind.ByMetadataEventTime =>
globbed.sortBy(eventTimeSource.getEventTime)
}

logger.debug(
s"Glob pattern resolved to: ${globbed.map(_.path.getName).mkString(", ")}"
s"Glob pattern resolved to: ${sorted.map(_.path.getName).mkString(", ")}"
)

globbed
sorted
}

def getCachingBehavior(kind: FetchSourceKind): CachingBehavior = {
Expand Down
2 changes: 1 addition & 1 deletion core.manifests

0 comments on commit 9eb6fe5

Please sign in to comment.