Skip to content

Commit

Permalink
schema-conflicting node properties: only warn, don't error (#195)
Browse files Browse the repository at this point in the history
* schema-conflicting node properties: only warn, don't error

Illegal usage of node properties often roots in deserialising an old storage format, so we
don't want to error out but rather log a warning, and only do so once for each node/property combination.

* fmt

* add one more test case

* fixup, cleanup
  • Loading branch information
mpollmeier authored May 6, 2024
1 parent f17ba4c commit 08009ce
Show file tree
Hide file tree
Showing 12 changed files with 274 additions and 134 deletions.
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ lazy val tests = project
.settings(
name := "flatgraph-tests",
publish / skip := true,
libraryDependencies += "com.github.pathikrit" %% "better-files" % "3.9.2" % Test,
libraryDependencies ++= Seq(
"com.github.pathikrit" %% "better-files" % "3.9.2" % Test,
"org.scalamock" %% "scalamock" % "6.0.0" % Test
),
Test/compile := (Test/compile).dependsOn(testSchemas/generateDomainClassesForTestSchemas).value,
)

Expand Down
131 changes: 64 additions & 67 deletions core/src/main/scala/flatgraph/DiffGraphApplier.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,25 @@ package flatgraph
import DiffGraphBuilder.*
import flatgraph.Edge.Direction
import flatgraph.Edge.Direction.{Incoming, Outgoing}
import flatgraph.misc.SchemaViolationReporter

import scala.collection.{Iterator, mutable}

object DiffGraphApplier {
def applyDiff(graph: Graph, diff: DiffGraphBuilder): Unit = {
def applyDiff(
graph: Graph,
diff: DiffGraphBuilder,
schemaViolationReporter: SchemaViolationReporter = new SchemaViolationReporter
): Unit = {
if (graph.isClosed) throw new GraphClosedException(s"graph cannot be modified any longer since it's closed")
new DiffGraphApplier(graph, diff).applyUpdate()
new DiffGraphApplier(graph, diff, schemaViolationReporter).applyUpdate()
diff.buffer = null
}
}

/** The class that is responsible for applying diffgraphs. This is not supposed to be public API, users should stick to applyDiff
*/
private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder) {
private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder, schemaViolationReporter: SchemaViolationReporter) {
val newNodes = new Array[mutable.ArrayBuffer[DNode]](graph.schema.getNumberOfNodeKinds)
// newEdges and delEdges are oversized, in order to permit usage of the same indexing function
val newEdges = new Array[mutable.ArrayBuffer[AddEdgeProcessed]](graph.neighbors.size)
Expand All @@ -37,16 +42,20 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder)

private def insertProperty0(node: GNode, propertyKind: Int, propertyValues: Iterator[Any]): Unit = {
val pos = graph.schema.propertyOffsetArrayIndex(node.nodeKind, propertyKind)
if (setNodeProperties(pos) == null)
setNodeProperties(pos) = mutable.ArrayBuffer.empty
val buf = setNodeProperties(pos)
val start = buf.size
propertyValues.iterator.foreach {
case dnode: DNode => buf.addOne(getGNode(dnode))
case other => buf.addOne(other)
if (0 > pos || pos >= setNodeProperties.length) {
schemaViolationReporter.illegalNodeProperty(node.nodeKind, propertyKind, graph.schema)
} else {
if (setNodeProperties(pos) == null)
setNodeProperties(pos) = mutable.ArrayBuffer.empty
val buf = setNodeProperties(pos)
val start = buf.size
propertyValues.iterator.foreach {
case dnode: DNode => buf.addOne(getGNode(dnode))
case other => buf.addOne(other)
}
val bound = new SetPropertyDesc(node, start, buf.size)
insert(setNodeProperties, bound, pos + 1)
}
val bound = new SetPropertyDesc(node, start, buf.size)
insert(setNodeProperties, bound, pos + 1)
}

private def insert[T](a: Array[mutable.ArrayBuffer[T]], item: T, pos: Int): Unit = {
Expand Down Expand Up @@ -176,7 +185,7 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder)
drainDeferred()
}

private def applyUpdate(): Unit = {
private[flatgraph] def applyUpdate(): Unit = {
splitUpdate()

// set edge properties
Expand Down Expand Up @@ -581,64 +590,52 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder)
dedupBy(setPropertyPositions, (setProp: SetPropertyDesc) => setProp.node.seq())
val nodeCount = graph.nodesArray(nodeKind).length

def throwSchemaViolationException() = {
val contextBuilder = Seq.newBuilder[String]
contextBuilder += s"nodeKind=$nodeKind,propertyKind=$propertyKind"
schema.getNodeLabelMaybe(nodeKind).foreach { nodeLabel =>
contextBuilder += s"nodeLabel=$nodeLabel"
val allowedPropertyNames = schema.getNodePropertyNames(nodeLabel).toSeq.sorted.mkString(",")
contextBuilder += s"allowedPropertyNames=[$allowedPropertyNames]"
}
schema.getPropertyLabelMaybe(nodeKind, propertyKind).foreach { propertyLabel =>
contextBuilder += s"propertyLabel=$propertyLabel"
}
val context = contextBuilder.result().mkString(",")
throw new SchemaViolationException(s"""Unsupported property on node. Context: $context""")
}

val setPropertyValues = schema.getNodePropertyFormalType(nodeKind, propertyKind).allocate(propertyBuf.size)
if (setPropertyValues == null) throwSchemaViolationException()
copyToArray(propertyBuf, setPropertyValues)

val oldQty = Option(graph.properties(pos).asInstanceOf[Array[Int]]).getOrElse(new Array[Int](1))
val oldProperty = Option(graph.properties(pos + 1))
.getOrElse(schema.getNodePropertyFormalType(nodeKind, propertyKind).allocate(0))
.asInstanceOf[Array[?]]
if (oldProperty == null) throwSchemaViolationException()

val newQty = new Array[Int](nodeCount + 1)
val newProperty = schema.getNodePropertyFormalType(nodeKind, propertyKind).allocate(get(oldQty, nodeCount) + propertyBuf.size)

val insertionIter = setPropertyPositions.iterator
var copyStartSeq = 0
var outIndex = 0
while (copyStartSeq < nodeCount) {
val insertion = insertionIter.nextOption()
val insertionSeq = insertion.map(_.node.seq()).getOrElse(nodeCount)
val copyStartIndex = get(oldQty, copyStartSeq)
val copyEndIndex = get(oldQty, insertionSeq)
val offset = outIndex - copyStartIndex
System.arraycopy(oldProperty, copyStartIndex, newProperty, outIndex, copyEndIndex - copyStartIndex)
outIndex += copyEndIndex - copyStartIndex
assert(
newQty(copyStartSeq) == get(oldQty, copyStartSeq) + offset,
s"something went wrong while copying properties: newQty(copyStartSeq) was supposed to be ${get(oldQty, copyStartSeq) + offset} but instead was ${newQty(copyStartSeq)}"
)
for (idx <- Range(copyStartSeq + 1, insertionSeq + 1))
newQty(idx) = get(oldQty, idx) + offset
if (setPropertyValues == null) {
schemaViolationReporter.illegalNodeProperty(nodeKind, propertyKind, schema)
} else {
copyToArray(propertyBuf, setPropertyValues)

val oldQty = Option(graph.properties(pos).asInstanceOf[Array[Int]]).getOrElse(new Array[Int](1))
val oldProperty = Option(graph.properties(pos + 1))
.getOrElse(schema.getNodePropertyFormalType(nodeKind, propertyKind).allocate(0))
.asInstanceOf[Array[?]]
if (oldProperty == null) schemaViolationReporter.illegalNodeProperty(nodeKind, propertyKind, schema)

val newQty = new Array[Int](nodeCount + 1)
val newProperty = schema.getNodePropertyFormalType(nodeKind, propertyKind).allocate(get(oldQty, nodeCount) + propertyBuf.size)

val insertionIter = setPropertyPositions.iterator
var copyStartSeq = 0
var outIndex = 0
while (copyStartSeq < nodeCount) {
val insertion = insertionIter.nextOption()
val insertionSeq = insertion.map(_.node.seq()).getOrElse(nodeCount)
val copyStartIndex = get(oldQty, copyStartSeq)
val copyEndIndex = get(oldQty, insertionSeq)
val offset = outIndex - copyStartIndex
System.arraycopy(oldProperty, copyStartIndex, newProperty, outIndex, copyEndIndex - copyStartIndex)
outIndex += copyEndIndex - copyStartIndex
assert(
newQty(copyStartSeq) == get(oldQty, copyStartSeq) + offset,
s"something went wrong while copying properties: newQty(copyStartSeq) was supposed to be ${get(oldQty, copyStartSeq) + offset} but instead was ${newQty(copyStartSeq)}"
)
for (idx <- Range(copyStartSeq + 1, insertionSeq + 1))
newQty(idx) = get(oldQty, idx) + offset

insertion.foreach { insertion =>
System.arraycopy(setPropertyValues, insertion.start, newProperty, outIndex, insertion.length)
outIndex += insertion.length
newQty(insertionSeq + 1) = outIndex
insertion.foreach { insertion =>
System.arraycopy(setPropertyValues, insertion.start, newProperty, outIndex, insertion.length)
outIndex += insertion.length
newQty(insertionSeq + 1) = outIndex
}
copyStartSeq = insertionSeq + 1
}
copyStartSeq = insertionSeq + 1
}
newQty(nodeCount) = outIndex
newQty(nodeCount) = outIndex

graph.properties(pos) = newQty
// fixme: need to support graphs with unknown schema. Then we need to homogenize the array here.
graph.properties(pos + 1) = newProperty
graph.properties(pos) = newQty
// fixme: need to support graphs with unknown schema. Then we need to homogenize the array here.
graph.properties(pos + 1) = newProperty
}
}
}

Expand Down
9 changes: 6 additions & 3 deletions core/src/main/scala/flatgraph/DiffGraphBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package flatgraph

import DiffGraphBuilder.*
import flatgraph.misc.Conversions.toShortSafely
import flatgraph.misc.SchemaViolationReporter

import scala.collection.mutable

class DiffGraphBuilder(schema: Schema) {
var buffer = mutable.ArrayDeque[RawUpdate]()
class DiffGraphBuilder(schema: Schema, schemaViolationReporter: SchemaViolationReporter = new SchemaViolationReporter) {
private[flatgraph] var buffer = mutable.ArrayDeque[RawUpdate]()

def addNode(newNode: DNode): this.type = {
this.buffer.append(newNode)
Expand Down Expand Up @@ -38,7 +40,8 @@ class DiffGraphBuilder(schema: Schema) {
def setNodeProperty(node: GNode, propertyName: String, property: Any): this.type = {
schema.getPropertyKindByName(propertyName) match {
case Schema.UndefinedKind =>
throw new SchemaViolationException(s"unknown property: `$propertyName`")
schemaViolationReporter.illegalNodeProperty(node.nodeKind, propertyName, schema)
this
case propertyKind =>
this._setNodeProperty(node, propertyKind, property)
}
Expand Down
38 changes: 38 additions & 0 deletions core/src/main/scala/flatgraph/misc/SchemaViolationReporter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package flatgraph.misc

import flatgraph.Schema
import org.slf4j.LoggerFactory

import scala.collection.mutable

/** Illegal usage of node properties often roots in deserialising an old storage format, so we don't want to error out but rather log a
* warning, and only do so once for each node/property combination.
*/
class SchemaViolationReporter {
private val logger = LoggerFactory.getLogger(getClass)
private val loggedSchemaViolations = mutable.Set.empty[(Int, String | Int)] // (NodeKind, PropertyLabel|PropertyKind)

def illegalNodeProperty(nodeKind: Int, propertyIdentifier: String | Int, schema: Schema): Unit = {
if (loggedSchemaViolations.contains((nodeKind, propertyIdentifier))) {
val contextBuilder = Seq.newBuilder[String]

contextBuilder += s"nodeKind=$nodeKind"
schema.getNodeLabelMaybe(nodeKind).foreach { nodeLabel =>
contextBuilder += s",nodeLabel=$nodeLabel"
}
propertyIdentifier match {
case name: String =>
contextBuilder += s",propertyName=$name"
case kind: Int =>
contextBuilder += s",propertyKind=$kind"
schema.getPropertyLabelMaybe(nodeKind, kind).foreach { name =>
contextBuilder += s",propertyName=$name"
}
}
val context = contextBuilder.result().mkString(",")
logger.warn(s"""Unsupported (deprecated?) property on node: $context""")
loggedSchemaViolations.addOne((nodeKind, propertyIdentifier))
}
}

}
3 changes: 3 additions & 0 deletions core/src/main/scala/flatgraph/traversal/Language.scala
Original file line number Diff line number Diff line change
Expand Up @@ -600,4 +600,7 @@ class NodeSteps[A <: GNode](traversal: Iterator[A]) extends AnyVal {

def property[@specialized ValueType](propertyKey: MultiPropertyKey[ValueType]): Iterator[ValueType] =
traversal.flatMap(_.property(propertyKey))

def propertiesMap: Iterator[java.util.Map[String, AnyRef]] =
traversal.map(_.propertiesMap)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package flatgraph.misc
package flatgraph

import flatgraph.*
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,53 +485,53 @@ class DomainClassesGenerator(schema: Schema) {
)
val nodeSource = {
s"""package $basePackage.nodes
|
|import $basePackage.Language.*
|import scala.collection.immutable.{IndexedSeq, ArraySeq}
|
|$erasedMarkerType
|
|$baseTrait {
| ${baseNodeProps.mkString("\n")}
| $propDictItemsSource
|}
|
|object ${nodeType.className} {
| val Label = "${nodeType.name}"
| object PropertyNames {
| $propertyNames
| }
| object PropertyKeys {
| $propertyKeys
| }
| object PropertyDefaults {
| $propertyDefaults
| }
|}
|
|$storedNode {
| ${storedNodeProps.mkString("\n")}
|
| override def productElementName(n: Int): String =
| n match {
| $productElementNames
| case _ => ""
| }
|
| override def productElement(n: Int): Any =
| n match {
| $productElementAccessors
| case _ => null
| }
|
| override def productPrefix = "${nodeType.className}"
| override def productArity = ${productElements.size}
|
| override def canEqual(that: Any): Boolean = that != null && that.isInstanceOf[${nodeType.className}]
|}
|
|$newNode
|""".stripMargin
|
|import $basePackage.Language.*
|import scala.collection.immutable.{IndexedSeq, ArraySeq}
|
|$erasedMarkerType
|
|$baseTrait {
| ${baseNodeProps.mkString("\n")}
| $propDictItemsSource
|}
|
|object ${nodeType.className} {
| val Label = "${nodeType.name}"
| object PropertyNames {
| $propertyNames
| }
| object PropertyKeys {
| $propertyKeys
| }
| object PropertyDefaults {
| $propertyDefaults
| }
|}
|
|$storedNode {
| ${storedNodeProps.mkString("\n")}
|
| override def productElementName(n: Int): String =
| n match {
| $productElementNames
| case _ => ""
| }
|
| override def productElement(n: Int): Any =
| n match {
| $productElementAccessors
| case _ => null
| }
|
| override def productPrefix = "${nodeType.className}"
| override def productArity = ${productElements.size}
|
| override def canEqual(that: Any): Boolean = that != null && that.isInstanceOf[${nodeType.className}]
|}
|
|$newNode
|""".stripMargin
}
os.write(nodesRootDir / s"${nodeType.className}.scala", nodeSource)
}
Expand Down Expand Up @@ -691,6 +691,24 @@ class DomainClassesGenerator(schema: Schema) {
}
os.write(outputDir0 / "GraphSchema.scala", schemaFile)

os.write(
outputDir0 / "PropertyErrorRegister.scala",
s"""package $basePackage
|
|object PropertyErrorRegister {
| private var errorMap = Set.empty[(Class[?], String)]
| private val logger = org.slf4j.LoggerFactory.getLogger(getClass)
|
| def logPropertyErrorIfFirst(clazz: Class[?], propertyName: String): Unit = {
| if (!errorMap.contains((clazz, propertyName))) {
| logger.warn("Property " + propertyName + " is deprecated for " + clazz.getName + ".")
| errorMap += ((clazz, propertyName))
| }
| }
|}
|""".stripMargin
)

// Accessors and traversals: start
// TODO extract into separate method
val accessorsForConcreteStoredNodes = mutable.ArrayBuffer.empty[String]
Expand Down
Loading

0 comments on commit 08009ce

Please sign in to comment.