Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schema-conflicting node properties: only warn, don't error #195

Merged
merged 4 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ lazy val tests = project
.settings(
name := "flatgraph-tests",
publish / skip := true,
libraryDependencies += "com.github.pathikrit" %% "better-files" % "3.9.2" % Test,
libraryDependencies ++= Seq(
"com.github.pathikrit" %% "better-files" % "3.9.2" % Test,
"org.scalamock" %% "scalamock" % "6.0.0" % Test
),
Test/compile := (Test/compile).dependsOn(testSchemas/generateDomainClassesForTestSchemas).value,
)

Expand Down
131 changes: 64 additions & 67 deletions core/src/main/scala/flatgraph/DiffGraphApplier.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,25 @@ package flatgraph
import DiffGraphBuilder.*
import flatgraph.Edge.Direction
import flatgraph.Edge.Direction.{Incoming, Outgoing}
import flatgraph.misc.SchemaViolationReporter

import scala.collection.{Iterator, mutable}

object DiffGraphApplier {
def applyDiff(graph: Graph, diff: DiffGraphBuilder): Unit = {
def applyDiff(
graph: Graph,
diff: DiffGraphBuilder,
schemaViolationReporter: SchemaViolationReporter = new SchemaViolationReporter
): Unit = {
if (graph.isClosed) throw new GraphClosedException(s"graph cannot be modified any longer since it's closed")
new DiffGraphApplier(graph, diff).applyUpdate()
new DiffGraphApplier(graph, diff, schemaViolationReporter).applyUpdate()
diff.buffer = null
}
}

/** The class that is responsible for applying diffgraphs. This is not supposed to be public API, users should stick to applyDiff
*/
private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder) {
private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder, schemaViolationReporter: SchemaViolationReporter) {
val newNodes = new Array[mutable.ArrayBuffer[DNode]](graph.schema.getNumberOfNodeKinds)
// newEdges and delEdges are oversized, in order to permit usage of the same indexing function
val newEdges = new Array[mutable.ArrayBuffer[AddEdgeProcessed]](graph.neighbors.size)
Expand All @@ -37,16 +42,20 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder)

private def insertProperty0(node: GNode, propertyKind: Int, propertyValues: Iterator[Any]): Unit = {
val pos = graph.schema.propertyOffsetArrayIndex(node.nodeKind, propertyKind)
if (setNodeProperties(pos) == null)
setNodeProperties(pos) = mutable.ArrayBuffer.empty
val buf = setNodeProperties(pos)
val start = buf.size
propertyValues.iterator.foreach {
case dnode: DNode => buf.addOne(getGNode(dnode))
case other => buf.addOne(other)
if (0 > pos || pos >= setNodeProperties.length) {
schemaViolationReporter.illegalNodeProperty(node.nodeKind, propertyKind, graph.schema)
} else {
if (setNodeProperties(pos) == null)
setNodeProperties(pos) = mutable.ArrayBuffer.empty
val buf = setNodeProperties(pos)
val start = buf.size
propertyValues.iterator.foreach {
case dnode: DNode => buf.addOne(getGNode(dnode))
case other => buf.addOne(other)
}
val bound = new SetPropertyDesc(node, start, buf.size)
insert(setNodeProperties, bound, pos + 1)
}
val bound = new SetPropertyDesc(node, start, buf.size)
insert(setNodeProperties, bound, pos + 1)
}

private def insert[T](a: Array[mutable.ArrayBuffer[T]], item: T, pos: Int): Unit = {
Expand Down Expand Up @@ -176,7 +185,7 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder)
drainDeferred()
}

private def applyUpdate(): Unit = {
private[flatgraph] def applyUpdate(): Unit = {
splitUpdate()

// set edge properties
Expand Down Expand Up @@ -581,64 +590,52 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder)
dedupBy(setPropertyPositions, (setProp: SetPropertyDesc) => setProp.node.seq())
val nodeCount = graph.nodesArray(nodeKind).length

def throwSchemaViolationException() = {
val contextBuilder = Seq.newBuilder[String]
contextBuilder += s"nodeKind=$nodeKind,propertyKind=$propertyKind"
schema.getNodeLabelMaybe(nodeKind).foreach { nodeLabel =>
contextBuilder += s"nodeLabel=$nodeLabel"
val allowedPropertyNames = schema.getNodePropertyNames(nodeLabel).toSeq.sorted.mkString(",")
contextBuilder += s"allowedPropertyNames=[$allowedPropertyNames]"
}
schema.getPropertyLabelMaybe(nodeKind, propertyKind).foreach { propertyLabel =>
contextBuilder += s"propertyLabel=$propertyLabel"
}
val context = contextBuilder.result().mkString(",")
throw new SchemaViolationException(s"""Unsupported property on node. Context: $context""")
}

val setPropertyValues = schema.getNodePropertyFormalType(nodeKind, propertyKind).allocate(propertyBuf.size)
if (setPropertyValues == null) throwSchemaViolationException()
copyToArray(propertyBuf, setPropertyValues)

val oldQty = Option(graph.properties(pos).asInstanceOf[Array[Int]]).getOrElse(new Array[Int](1))
val oldProperty = Option(graph.properties(pos + 1))
.getOrElse(schema.getNodePropertyFormalType(nodeKind, propertyKind).allocate(0))
.asInstanceOf[Array[?]]
if (oldProperty == null) throwSchemaViolationException()

val newQty = new Array[Int](nodeCount + 1)
val newProperty = schema.getNodePropertyFormalType(nodeKind, propertyKind).allocate(get(oldQty, nodeCount) + propertyBuf.size)

val insertionIter = setPropertyPositions.iterator
var copyStartSeq = 0
var outIndex = 0
while (copyStartSeq < nodeCount) {
val insertion = insertionIter.nextOption()
val insertionSeq = insertion.map(_.node.seq()).getOrElse(nodeCount)
val copyStartIndex = get(oldQty, copyStartSeq)
val copyEndIndex = get(oldQty, insertionSeq)
val offset = outIndex - copyStartIndex
System.arraycopy(oldProperty, copyStartIndex, newProperty, outIndex, copyEndIndex - copyStartIndex)
outIndex += copyEndIndex - copyStartIndex
assert(
newQty(copyStartSeq) == get(oldQty, copyStartSeq) + offset,
s"something went wrong while copying properties: newQty(copyStartSeq) was supposed to be ${get(oldQty, copyStartSeq) + offset} but instead was ${newQty(copyStartSeq)}"
)
for (idx <- Range(copyStartSeq + 1, insertionSeq + 1))
newQty(idx) = get(oldQty, idx) + offset
if (setPropertyValues == null) {
schemaViolationReporter.illegalNodeProperty(nodeKind, propertyKind, schema)
} else {
copyToArray(propertyBuf, setPropertyValues)

val oldQty = Option(graph.properties(pos).asInstanceOf[Array[Int]]).getOrElse(new Array[Int](1))
val oldProperty = Option(graph.properties(pos + 1))
.getOrElse(schema.getNodePropertyFormalType(nodeKind, propertyKind).allocate(0))
.asInstanceOf[Array[?]]
if (oldProperty == null) schemaViolationReporter.illegalNodeProperty(nodeKind, propertyKind, schema)

val newQty = new Array[Int](nodeCount + 1)
val newProperty = schema.getNodePropertyFormalType(nodeKind, propertyKind).allocate(get(oldQty, nodeCount) + propertyBuf.size)

val insertionIter = setPropertyPositions.iterator
var copyStartSeq = 0
var outIndex = 0
while (copyStartSeq < nodeCount) {
val insertion = insertionIter.nextOption()
val insertionSeq = insertion.map(_.node.seq()).getOrElse(nodeCount)
val copyStartIndex = get(oldQty, copyStartSeq)
val copyEndIndex = get(oldQty, insertionSeq)
val offset = outIndex - copyStartIndex
System.arraycopy(oldProperty, copyStartIndex, newProperty, outIndex, copyEndIndex - copyStartIndex)
outIndex += copyEndIndex - copyStartIndex
assert(
newQty(copyStartSeq) == get(oldQty, copyStartSeq) + offset,
s"something went wrong while copying properties: newQty(copyStartSeq) was supposed to be ${get(oldQty, copyStartSeq) + offset} but instead was ${newQty(copyStartSeq)}"
)
for (idx <- Range(copyStartSeq + 1, insertionSeq + 1))
newQty(idx) = get(oldQty, idx) + offset

insertion.foreach { insertion =>
System.arraycopy(setPropertyValues, insertion.start, newProperty, outIndex, insertion.length)
outIndex += insertion.length
newQty(insertionSeq + 1) = outIndex
insertion.foreach { insertion =>
System.arraycopy(setPropertyValues, insertion.start, newProperty, outIndex, insertion.length)
outIndex += insertion.length
newQty(insertionSeq + 1) = outIndex
}
copyStartSeq = insertionSeq + 1
}
copyStartSeq = insertionSeq + 1
}
newQty(nodeCount) = outIndex
newQty(nodeCount) = outIndex

graph.properties(pos) = newQty
// fixme: need to support graphs with unknown schema. Then we need to homogenize the array here.
graph.properties(pos + 1) = newProperty
graph.properties(pos) = newQty
// fixme: need to support graphs with unknown schema. Then we need to homogenize the array here.
graph.properties(pos + 1) = newProperty
}
}
}

Expand Down
9 changes: 6 additions & 3 deletions core/src/main/scala/flatgraph/DiffGraphBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package flatgraph

import DiffGraphBuilder.*
import flatgraph.misc.Conversions.toShortSafely
import flatgraph.misc.SchemaViolationReporter

import scala.collection.mutable

class DiffGraphBuilder(schema: Schema) {
var buffer = mutable.ArrayDeque[RawUpdate]()
class DiffGraphBuilder(schema: Schema, schemaViolationReporter: SchemaViolationReporter = new SchemaViolationReporter) {
private[flatgraph] var buffer = mutable.ArrayDeque[RawUpdate]()

def addNode(newNode: DNode): this.type = {
this.buffer.append(newNode)
Expand Down Expand Up @@ -38,7 +40,8 @@ class DiffGraphBuilder(schema: Schema) {
def setNodeProperty(node: GNode, propertyName: String, property: Any): this.type = {
schema.getPropertyKindByName(propertyName) match {
case Schema.UndefinedKind =>
throw new SchemaViolationException(s"unknown property: `$propertyName`")
schemaViolationReporter.illegalNodeProperty(node.nodeKind, propertyName, schema)
this
case propertyKind =>
this._setNodeProperty(node, propertyKind, property)
}
Expand Down
38 changes: 38 additions & 0 deletions core/src/main/scala/flatgraph/misc/SchemaViolationReporter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package flatgraph.misc

import flatgraph.Schema
import org.slf4j.LoggerFactory

import scala.collection.mutable

/** Illegal usage of node properties often roots in deserialising an old storage format, so we don't want to error out but rather log a
* warning, and only do so once for each node/property combination.
*/
class SchemaViolationReporter {
private val logger = LoggerFactory.getLogger(getClass)
private val loggedSchemaViolations = mutable.Set.empty[(Int, String | Int)] // (NodeKind, PropertyLabel|PropertyKind)

def illegalNodeProperty(nodeKind: Int, propertyIdentifier: String | Int, schema: Schema): Unit = {
if (loggedSchemaViolations.contains((nodeKind, propertyIdentifier))) {
val contextBuilder = Seq.newBuilder[String]

contextBuilder += s"nodeKind=$nodeKind"
schema.getNodeLabelMaybe(nodeKind).foreach { nodeLabel =>
contextBuilder += s",nodeLabel=$nodeLabel"
}
propertyIdentifier match {
case name: String =>
contextBuilder += s",propertyName=$name"
case kind: Int =>
contextBuilder += s",propertyKind=$kind"
schema.getPropertyLabelMaybe(nodeKind, kind).foreach { name =>
contextBuilder += s",propertyName=$name"
}
}
val context = contextBuilder.result().mkString(",")
logger.warn(s"""Unsupported (deprecated?) property on node: $context""")
loggedSchemaViolations.addOne((nodeKind, propertyIdentifier))
}
}

}
3 changes: 3 additions & 0 deletions core/src/main/scala/flatgraph/traversal/Language.scala
Original file line number Diff line number Diff line change
Expand Up @@ -600,4 +600,7 @@ class NodeSteps[A <: GNode](traversal: Iterator[A]) extends AnyVal {

def property[@specialized ValueType](propertyKey: MultiPropertyKey[ValueType]): Iterator[ValueType] =
traversal.flatMap(_.property(propertyKey))

def propertiesMap: Iterator[java.util.Map[String, AnyRef]] =
traversal.map(_.propertiesMap)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package flatgraph.misc
package flatgraph

import flatgraph.*
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,53 +485,53 @@ class DomainClassesGenerator(schema: Schema) {
)
val nodeSource = {
s"""package $basePackage.nodes
|
|import $basePackage.Language.*
|import scala.collection.immutable.{IndexedSeq, ArraySeq}
|
|$erasedMarkerType
|
|$baseTrait {
| ${baseNodeProps.mkString("\n")}
| $propDictItemsSource
|}
|
|object ${nodeType.className} {
| val Label = "${nodeType.name}"
| object PropertyNames {
| $propertyNames
| }
| object PropertyKeys {
| $propertyKeys
| }
| object PropertyDefaults {
| $propertyDefaults
| }
|}
|
|$storedNode {
| ${storedNodeProps.mkString("\n")}
|
| override def productElementName(n: Int): String =
| n match {
| $productElementNames
| case _ => ""
| }
|
| override def productElement(n: Int): Any =
| n match {
| $productElementAccessors
| case _ => null
| }
|
| override def productPrefix = "${nodeType.className}"
| override def productArity = ${productElements.size}
|
| override def canEqual(that: Any): Boolean = that != null && that.isInstanceOf[${nodeType.className}]
|}
|
|$newNode
|""".stripMargin
|
|import $basePackage.Language.*
|import scala.collection.immutable.{IndexedSeq, ArraySeq}
|
|$erasedMarkerType
|
|$baseTrait {
| ${baseNodeProps.mkString("\n")}
| $propDictItemsSource
|}
|
|object ${nodeType.className} {
| val Label = "${nodeType.name}"
| object PropertyNames {
| $propertyNames
| }
| object PropertyKeys {
| $propertyKeys
| }
| object PropertyDefaults {
| $propertyDefaults
| }
|}
|
|$storedNode {
| ${storedNodeProps.mkString("\n")}
|
| override def productElementName(n: Int): String =
| n match {
| $productElementNames
| case _ => ""
| }
|
| override def productElement(n: Int): Any =
| n match {
| $productElementAccessors
| case _ => null
| }
|
| override def productPrefix = "${nodeType.className}"
| override def productArity = ${productElements.size}
|
| override def canEqual(that: Any): Boolean = that != null && that.isInstanceOf[${nodeType.className}]
|}
|
|$newNode
|""".stripMargin
}
os.write(nodesRootDir / s"${nodeType.className}.scala", nodeSource)
}
Expand Down Expand Up @@ -691,6 +691,24 @@ class DomainClassesGenerator(schema: Schema) {
}
os.write(outputDir0 / "GraphSchema.scala", schemaFile)

os.write(
outputDir0 / "PropertyErrorRegister.scala",
s"""package $basePackage
|
|object PropertyErrorRegister {
| private var errorMap = Set.empty[(Class[?], String)]
| private val logger = org.slf4j.LoggerFactory.getLogger(getClass)
|
| def logPropertyErrorIfFirst(clazz: Class[?], propertyName: String): Unit = {
| if (!errorMap.contains((clazz, propertyName))) {
| logger.warn("Property " + propertyName + " is deprecated for " + clazz.getName + ".")
| errorMap += ((clazz, propertyName))
| }
| }
|}
|""".stripMargin
)

// Accessors and traversals: start
// TODO extract into separate method
val accessorsForConcreteStoredNodes = mutable.ArrayBuffer.empty[String]
Expand Down
Loading
Loading