From ce84d7f98221901307baf01544a5c970422a2467 Mon Sep 17 00:00:00 2001 From: Michael Pollmeier Date: Thu, 25 Apr 2024 12:24:36 +0200 Subject: [PATCH] bring back and fix algorithms from odb incl. tests --- .../algorithm/DependencySequencer.scala | 36 +++++++ .../algorithm/LowestCommonAncestors.scala | 34 +++++++ .../flatgraph/algorithm/PathFinder.scala | 68 +++++++++++++ .../scala/flatgraph/algorithm/package.scala | 9 ++ .../algorithm/DependencySequencerTests.scala | 99 +++++++++++++++++++ .../LowestCommonAncestorsTests.scala | 81 +++++++++++++++ .../flatgraph/algorithm/PathFinderTests.scala | 76 ++++++++++++++ .../formats/neo4jcsv/Neo4jCsvTests.scala | 2 +- 8 files changed, 404 insertions(+), 1 deletion(-) create mode 100644 core/src/main/scala/flatgraph/algorithm/DependencySequencer.scala create mode 100644 core/src/main/scala/flatgraph/algorithm/LowestCommonAncestors.scala create mode 100644 core/src/main/scala/flatgraph/algorithm/PathFinder.scala create mode 100644 core/src/main/scala/flatgraph/algorithm/package.scala create mode 100644 core/src/test/scala/flatgraph/algorithm/DependencySequencerTests.scala create mode 100644 core/src/test/scala/flatgraph/algorithm/LowestCommonAncestorsTests.scala create mode 100644 core/src/test/scala/flatgraph/algorithm/PathFinderTests.scala diff --git a/core/src/main/scala/flatgraph/algorithm/DependencySequencer.scala b/core/src/main/scala/flatgraph/algorithm/DependencySequencer.scala new file mode 100644 index 00000000..c6238e9a --- /dev/null +++ b/core/src/main/scala/flatgraph/algorithm/DependencySequencer.scala @@ -0,0 +1,36 @@ +package flatgraph.algorithm + +import scala.annotation.tailrec + +object DependencySequencer { + + /** Find the sequence of dependencies for a set of nodes in a directed acyclic graph (DAG). Sample use case: concurrent task processing: given + * a set of tasks, determine which ones can be executed in parallel, and which ones need to run in sequence. 
+ * + * @throws java.lang.AssertionError + * if given nodes have cyclic dependencies + * + * Algorithm: variant of Kahn's algorithm for topological sort 1) for given nodes, find all leaves, i.e. those without parents (e.g. + * task dependencies) 2) disregard all that have already been visited and add to the results sequence 3) repeat for the remainder of + * nodes + * + * see https://en.wikipedia.org/wiki/Topological_sorting#Kahn%27s_algorithm + */ + def apply[A: GetParents](nodes: Set[A]): Seq[Set[A]] = { + apply0(nodes, Seq.empty, Set.empty) + } + + @tailrec + private def apply0[A: GetParents](nodes: Set[A], accumulator: Seq[Set[A]], visited: Set[A]): Seq[Set[A]] = { + if (nodes.size == 0) { + accumulator + } else { + val getParents = implicitly[GetParents[A]] + val leaves = nodes.filter(getParents(_).diff(visited).isEmpty) + val remainder = nodes.diff(leaves) + assert(remainder.size < nodes.size, s"given set of nodes is not a directed acyclic graph (DAG): ${nodes ++ accumulator.flatten}") + apply0(remainder, accumulator :+ leaves, visited ++ leaves) + } + } + +} diff --git a/core/src/main/scala/flatgraph/algorithm/LowestCommonAncestors.scala b/core/src/main/scala/flatgraph/algorithm/LowestCommonAncestors.scala new file mode 100644 index 00000000..82a0c773 --- /dev/null +++ b/core/src/main/scala/flatgraph/algorithm/LowestCommonAncestors.scala @@ -0,0 +1,34 @@ +package flatgraph.algorithm + +/** Find the lowest common ancestor(s) + * + * 1) for each relevant node, find their recursive parents 2) create the intersection of all of those sets 3) the LCAs are those nodes that + * do not have any children in that set + * + * based on https://www.baeldung.com/cs/lowest-common-ancestor-acyclic-graph + */ +object LowestCommonAncestors { + + def apply[A](nodes: Set[A])(parents: A => Set[A]): Set[A] = { + + def parentsRecursive(node: A, seen: Set[A] = Set.empty): Set[A] = { + val nodeParents = parents(node) -- seen + nodeParents ++ nodeParents.flatMap(node => 
parentsRecursive(node, seen ++ nodeParents)) + } + + if (nodes.size <= 1) { + nodes + } else { + val (head, tail) = (nodes.head, nodes.tail) + val parentsIntersection = tail.foldLeft(parentsRecursive(head)) { case (res, next) => + res.intersect(parentsRecursive(next)) + } + + parentsIntersection.filter { node => + val childCount = parentsIntersection.count(parentsRecursive(_).contains(node)) + childCount == 0 + } + } + } + +} diff --git a/core/src/main/scala/flatgraph/algorithm/PathFinder.scala b/core/src/main/scala/flatgraph/algorithm/PathFinder.scala new file mode 100644 index 00000000..ab7c2974 --- /dev/null +++ b/core/src/main/scala/flatgraph/algorithm/PathFinder.scala @@ -0,0 +1,68 @@ +package flatgraph.algorithm + +import flatgraph.Edge.Direction +import flatgraph.GNode +import flatgraph.traversal.Language.* + +object PathFinder { + def apply(nodeA: GNode, nodeB: GNode, maxDepth: Int = -1): Seq[Path] = { + if (nodeA == nodeB) Seq(Path(Seq(nodeA))) + else { + Iterator + .single(nodeA) + .enablePathTracking + .repeat(_.both) { initialBehaviour => + val behaviour = initialBehaviour.dedup // no cycles + .until(_.is(nodeB)) // don't continue on a given path if we've reached our destination + if (maxDepth > -1) behaviour.maxDepth(maxDepth) + else behaviour + } + .is(nodeB) // we only care about the paths that lead to our destination + .path + .cast[Seq[GNode]] + .map(Path.apply) + .toSeq + } + } + + case class Path(nodes: Seq[GNode]) { + def withEdges: PathWithEdges = { + val elements = Seq.newBuilder[PathEntry] + nodes.headOption.foreach { firstElement => + elements.addOne(NodeEntry(firstElement)) + } + + for { + case Seq(nodeA, nodeB) <- nodes.sliding(2) + edgesBetweenAsPathEntry: PathEntry = + edgesBetween(nodeA, nodeB) match { + case Nil => + throw new AssertionError(s"no edges between nodes $nodeA and $nodeB - this looks like a bug in PathFinder") + case Seq(edgeEntry) => edgeEntry + case multipleEdges => EdgeEntries(multipleEdges) + } + } { + 
elements.addOne(edgesBetweenAsPathEntry) + elements.addOne(NodeEntry(nodeB)) + } + + PathWithEdges(elements.result()) + } + } + + private def edgesBetween(nodeA: GNode, nodeB: GNode): Seq[EdgeEntry] = { + val outEdges = nodeA.outE.filter(_.dst == nodeB).map(edge => EdgeEntry(Direction.Outgoing, edge.label)) + val inEdges = nodeA.inE.filter(_.src == nodeB).map(edge => EdgeEntry(Direction.Incoming, edge.label)) + outEdges.to(Seq) ++ inEdges.to(Seq) + } + + case class PathWithEdges(elements: Seq[PathEntry]) + sealed trait PathEntry + case class NodeEntry(node: GNode) extends PathEntry { + def label: String = node.label() + def id: Long = node.id() + } + case class EdgeEntries(edgeEntries: Seq[EdgeEntry]) extends PathEntry + case class EdgeEntry(direction: Direction, label: String) extends PathEntry + +} diff --git a/core/src/main/scala/flatgraph/algorithm/package.scala b/core/src/main/scala/flatgraph/algorithm/package.scala new file mode 100644 index 00000000..f5e00cfa --- /dev/null +++ b/core/src/main/scala/flatgraph/algorithm/package.scala @@ -0,0 +1,9 @@ +package flatgraph + +package object algorithm { + + trait GetParents[A] { + def apply(a: A): Set[A] + } + +} diff --git a/core/src/test/scala/flatgraph/algorithm/DependencySequencerTests.scala b/core/src/test/scala/flatgraph/algorithm/DependencySequencerTests.scala new file mode 100644 index 00000000..03854a01 --- /dev/null +++ b/core/src/test/scala/flatgraph/algorithm/DependencySequencerTests.scala @@ -0,0 +1,99 @@ +package flatgraph.algorithm + +import org.scalatest.matchers.should.Matchers.* +import org.scalatest.wordspec.AnyWordSpec + +class DependencySequencerTests extends AnyWordSpec { + + "empty graph" in { + DependencySequencer(Set.empty[Node]) shouldBe Seq.empty + } + + "one node" in { + val A = new Node("A") + DependencySequencer(Set(A)) shouldBe Seq(Set(A)) + } + + "two independent nodes" in { + val A = new Node("A") + val B = new Node("B") + DependencySequencer(Set(A, B)) shouldBe Seq(Set(A, B)) + } + 
+ "two nodes in sequence" in { + val A = new Node("A") + val B = new Node("B", Set(A)) + DependencySequencer(Set(A, B)) shouldBe Seq(Set(A), Set(B)) + } + + "sequence and parallelism - simple 1" in { + val A = new Node("A") + val B = new Node("B") + val C = new Node("C", Set(A, B)) + DependencySequencer(Set(A, B, C)) shouldBe Seq(Set(A, B), Set(C)) + } + + "sequence and parallelism - simple 2" in { + val A = new Node("A") + val B = new Node("B", Set(A)) + val C = new Node("C", Set(A)) + DependencySequencer(Set(A, B, C)) shouldBe Seq(Set(A), Set(B, C)) + } + + "throw error if it's not a DAG" in { + val A = new Node("A") + val B = new Node("B", Set(A)) + A.parents = Set(B) // cycle in dependencies, not a DAG any longer + assertThrows[AssertionError](DependencySequencer(Set(A, B))) + } + + "larger graph 1" in { + // format: off + /** \+-------------------+ + * \| v + * \+---+ +---+ +---+ +---+ + * \| A | --> | B | --> | C | --> | E | + * \+---+ +---+ +---+ +---+ + * \| ^ v | + * \+---+ | + * \| D | ----------------+ + * \+---+ + */ + // format: on + val A = new Node("A") + val B = new Node("B", Set(A)) + val C = new Node("C", Set(B)) + val D = new Node("D", Set(B)) + val E = new Node("E", Set(B, C, D)) + DependencySequencer(Set(A, B, C, D, E)) shouldBe Seq(Set(A), Set(B), Set(C, D), Set(E)) + } + + "larger graph 2" in { + // format: off + /** \+-----------------------------+ + * \| v + * \+---+ +---+ +---+ +---+ +---+ + * \| A | --> | B | --> | D | --> | E | --> | F | + * \+---+ +---+ +---+ +---+ +---+ + * \| ^ v | + * \+---+ | + * \| C | --------------------------+ + * \+---+ + */ + // format: on + val A = new Node("A") + val B = new Node("B", Set(A)) + val C = new Node("C", Set(B)) + val D = new Node("D", Set(B)) + val E = new Node("E", Set(D)) + val F = new Node("F", Set(B, C, E)) + DependencySequencer(Set(A, B, C, D, E, F)) shouldBe Seq(Set(A), Set(B), Set(C, D), Set(E), Set(F)) + // note: for task processing this isn't actually the optimal solution, + // because 
E will only start after [C|D] are finished... it wouldn't need to wait for C though... + } + + class Node(val name: String, var parents: Set[Node] = Set.empty) { + override def toString = name + } + implicit def getParents: GetParents[Node] = (node: Node) => node.parents +} diff --git a/core/src/test/scala/flatgraph/algorithm/LowestCommonAncestorsTests.scala b/core/src/test/scala/flatgraph/algorithm/LowestCommonAncestorsTests.scala new file mode 100644 index 00000000..db8f52a4 --- /dev/null +++ b/core/src/test/scala/flatgraph/algorithm/LowestCommonAncestorsTests.scala @@ -0,0 +1,81 @@ +package flatgraph.algorithm + +import org.scalatest.matchers.should.Matchers.* +import org.scalatest.wordspec.AnyWordSpec + +class LowestCommonAncestorsTests extends AnyWordSpec { + + /** +--------------+ + * | | + * | +---+ +---+ +---+ +---+ +---+ +---+ + * | | A | --> | C | --> | D | --> | | --> | H | --> | I | + * | +---+ +---+ +---+ | | +---+ +---+ + * | | | | | + * | | +---------------> | G | + * | v | | + * | +---+ | | +---+ + * | | B | ----------------------> | | --> | F | + * | +---+ +---+ +---+ + * | | + * | | + * | v + * | +---+ + * +> | E | + * +---+ + * + * created by `graph-easy --input=lca.eg`, where lca.eg: + * [A] --> [B],[C] + * [B] --> [E],[G] + * [C] --> [D],[E],[G] + * [D] --> [G] + * [G] --> [F],[H] + * [H] --> [I] + */ + + val A = new Node("A", Set.empty) + val B = new Node("B", Set(A)) + val C = new Node("C", Set(A)) + val D = new Node("D", Set(C)) + val E = new Node("E", Set(B, C)) + val G = new Node("G", Set(B, C, D)) + val F = new Node("F", Set(G)) + val H = new Node("H", Set(G)) + val I = new Node("I", Set(H)) + + "empty set" in { + val relevantNodes = Set.empty[Node] + LowestCommonAncestors(relevantNodes)(_.parents) shouldBe Set.empty + } + + "one node" in { + val relevantNodes = Set(D) + LowestCommonAncestors(relevantNodes)(_.parents) shouldBe relevantNodes + } + + "node E and H" in { + val relevantNodes = Set(E, H) + 
LowestCommonAncestors(relevantNodes)(_.parents) shouldBe Set(B, C) + } + + "node B,E,H" in { + val relevantNodes = Set(B, E, H) + LowestCommonAncestors(relevantNodes)(_.parents) shouldBe Set(A) + } + + "node A,B,E,H" in { + val relevantNodes = Set(A, B, E, H) + LowestCommonAncestors(relevantNodes)(_.parents) shouldBe Set.empty + } + + "cyclic dependencies" in { + val A = new Node("A", Set.empty) + val B = new Node("B", Set(A)) + A.parents = Set(B) // cycle in dependencies, not a DAG any longer + LowestCommonAncestors(Set(A, B))(_.parents) shouldBe Set.empty + } + + class Node(val name: String, var parents: Set[Node]) { + override def toString = name + } + implicit def getParents: GetParents[Node] = (node: Node) => node.parents +} diff --git a/core/src/test/scala/flatgraph/algorithm/PathFinderTests.scala b/core/src/test/scala/flatgraph/algorithm/PathFinderTests.scala new file mode 100644 index 00000000..0ed74be6 --- /dev/null +++ b/core/src/test/scala/flatgraph/algorithm/PathFinderTests.scala @@ -0,0 +1,76 @@ +package flatgraph.algorithm + +import flatgraph.traversal.testdomains.simple.ExampleGraphSetup +import PathFinder.* +import flatgraph.Edge.Direction +import flatgraph.traversal.testdomains.simple.SimpleDomain.Connection +import org.scalatest.wordspec.AnyWordSpec +import org.scalatest.matchers.should.Matchers.* + +class PathFinderTests extends AnyWordSpec with ExampleGraphSetup { + /* sample graph: + * L3 <- L2 <- L1 <- Center -> R1 -> R2 -> R3 -> R4 -> R5 + */ + "identity" in { + val path = PathFinder(center, center) + path shouldBe Seq(Path(Seq(center))) + + path.head.withEdges shouldBe PathWithEdges(Seq(NodeEntry(center))) + } + + "direct neighbors" in { + val path = PathFinder(center, r1) + path shouldBe Seq(Path(Seq(center, r1))) + + path.head.withEdges shouldBe PathWithEdges(Seq(NodeEntry(center), EdgeEntry(Direction.Outgoing, Connection.Label), NodeEntry(r1))) + } + + "longer path" in { + val path = PathFinder(l1, r1) + path shouldBe Seq(Path(Seq(l1, 
center, r1))) + + path.head.withEdges shouldBe PathWithEdges( + Seq( + NodeEntry(l1), + EdgeEntry(Direction.Incoming, Connection.Label), + NodeEntry(center), + EdgeEntry(Direction.Outgoing, Connection.Label), + NodeEntry(r1) + ) + ) + } + + "longest path" in { + val path = PathFinder(l3, r5) + path shouldBe Seq(Path(Seq(l3, l2, l1, center, r1, r2, r3, r4, r5))) + + path.head.withEdges shouldBe PathWithEdges( + Seq( + NodeEntry(l3), + EdgeEntry(Direction.Incoming, Connection.Label), + NodeEntry(l2), + EdgeEntry(Direction.Incoming, Connection.Label), + NodeEntry(l1), + EdgeEntry(Direction.Incoming, Connection.Label), + NodeEntry(center), + EdgeEntry(Direction.Outgoing, Connection.Label), + NodeEntry(r1), + EdgeEntry(Direction.Outgoing, Connection.Label), + NodeEntry(r2), + EdgeEntry(Direction.Outgoing, Connection.Label), + NodeEntry(r3), + EdgeEntry(Direction.Outgoing, Connection.Label), + NodeEntry(r4), + EdgeEntry(Direction.Outgoing, Connection.Label), + NodeEntry(r5) + ) + ) + } + + "max depth" in { + PathFinder(center, r3, maxDepth = 3) shouldBe Seq(Path(Vector(center, r1, r2, r3))) + + PathFinder(center, r3, maxDepth = 2) shouldBe Nil + } + +} diff --git a/formats/src/test/scala/flatgraph/formats/neo4jcsv/Neo4jCsvTests.scala b/formats/src/test/scala/flatgraph/formats/neo4jcsv/Neo4jCsvTests.scala index e6ebb62a..22d770e1 100644 --- a/formats/src/test/scala/flatgraph/formats/neo4jcsv/Neo4jCsvTests.scala +++ b/formats/src/test/scala/flatgraph/formats/neo4jcsv/Neo4jCsvTests.scala @@ -16,7 +16,7 @@ class Neo4jCsvTests extends AnyWordSpec { val neo4jcsvRoot = Paths.get(subprojectRoot, "src/test/resources/neo4jcsv") "foo" in { - pending // TODO + pending // TODO bring back: generate domain specific code with codegen from a schema } // "Importer" should {