Skip to content

Commit

Permalink
feat(reasoner): add lube-api.
Browse files Browse the repository at this point in the history
  • Loading branch information
fishjoy committed Dec 12, 2023
1 parent 22acb2c commit 7bcbcbc
Show file tree
Hide file tree
Showing 22 changed files with 152 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ package com.antgroup.openspg.reasoner.lube.block
import scala.collection.mutable.ListBuffer

import com.antgroup.openspg.reasoner.lube.common.expr.Aggregator
import com.antgroup.openspg.reasoner.lube.common.graph.{IRField, IRGraph}
import com.antgroup.openspg.reasoner.lube.common.graph.IRField

final case class AggregationBlock(
dependencies: List[Block],
aggregations: Aggregations,
group: List[String],
graph: IRGraph)
group: List[String])
extends BasicBlock[Fields](BlockType("aggregation")) {

override def binds: Fields = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

package com.antgroup.openspg.reasoner.lube.block

import com.antgroup.openspg.reasoner.lube.common.graph.IRGraph

abstract class BasicBlock[B <: Binds](override val blockType: BlockType) extends Block {
override def binds: B

override def graph: IRGraph = dependencies.head.graph
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

package com.antgroup.openspg.reasoner.lube.block

import com.antgroup.openspg.reasoner.lube.common.graph.IRGraph
import com.antgroup.openspg.reasoner.lube.common.rule.Rule

/**
Expand All @@ -23,7 +22,7 @@ import com.antgroup.openspg.reasoner.lube.common.rule.Rule
* @param rules
* @param graph
*/
final case class FilterBlock(dependencies: List[Block], rules: Rule, graph: IRGraph)
final case class FilterBlock(dependencies: List[Block], rules: Rule)
extends BasicBlock[Binds](BlockType("filter")) {

override def binds: Binds = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,8 @@

package com.antgroup.openspg.reasoner.lube.block

import scala.collection.mutable

import com.antgroup.openspg.reasoner.lube.common.graph.{IRArray, IREdge, IRGraph, IRNode, IRPath}
import com.antgroup.openspg.reasoner.lube.common.pattern.{
Connection,
GraphPath,
VariablePatternConnection
}
import com.antgroup.openspg.reasoner.lube.common.graph.{IREdge, IRNode, IRPath, IRRepeatPath}
import com.antgroup.openspg.reasoner.lube.common.pattern.{Connection, GraphPath, VariablePatternConnection}

/**
* parse from "GraphStructure" block to match path
Expand All @@ -31,34 +25,34 @@ import com.antgroup.openspg.reasoner.lube.common.pattern.{
*/
final case class MatchBlock(
dependencies: List[Block],
patterns: Map[String, GraphPath],
graph: IRGraph)
patterns: Map[String, GraphPath])
extends BasicBlock[Binds](BlockType("match")) {

override def binds: Binds = {
val props = patterns.values.head.graphPattern.properties
val nodes = patterns.values
.flatMap(path =>
path.graphPattern.nodes.map(node => IRNode(node._1, new mutable.HashSet[String]())))
path.graphPattern.nodes.map(node => IRNode(node._1, props(node._1))))
.toList
val edges = patterns.values
.flatMap(path =>
path.graphPattern.edges
.map(pair => pair._2.map(rel => edgeToIRField(rel)).toList)
.map(pair => pair._2.map(rel => edgeToIRField(rel, props)).toList)
.flatten)
.toList
Fields(nodes.++(edges))
}

private def edgeToIRField(edge: Connection) = {
private def edgeToIRField(edge: Connection, props: Map[String, Set[String]]) = {
edge match {
case connection: VariablePatternConnection =>
val start = IRNode(connection.source, new mutable.HashSet[String]())
val end = IRNode(connection.target, new mutable.HashSet[String]())
val irEdge = IREdge(connection.alias, new mutable.HashSet[String]())
val start = IRNode(connection.source, props(connection.source))
val end = IRNode(connection.target, props(connection.target))
val irEdge = IREdge(connection.alias, props(connection.alias))
val path = IRPath(connection.alias, List.apply(start, irEdge, end))
IRArray(path)
IRRepeatPath(path, connection.lower, connection.upper)
case _ =>
IREdge(edge.alias, new mutable.HashSet[String]())
IREdge(edge.alias, props(edge.alias))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ final case class OrderAndSliceBlock(
dependencies: List[Block],
orderBy: Seq[SortItem],
limit: Option[Int],
group: List[String],
graph: IRGraph)
group: List[String])
extends BasicBlock[Binds](BlockType("order-and-slice")) {
override def binds: Binds = dependencies.head.binds
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ package com.antgroup.openspg.reasoner.lube.block

import scala.collection.mutable.ListBuffer

import com.antgroup.openspg.reasoner.lube.common.graph.{IRField, IRGraph, IRVariable}
import com.antgroup.openspg.reasoner.lube.common.graph.IRField
import com.antgroup.openspg.reasoner.lube.common.rule.Rule

/**
Expand All @@ -26,8 +26,7 @@ import com.antgroup.openspg.reasoner.lube.common.rule.Rule
*/
final case class ProjectBlock(
dependencies: List[Block],
projects: ProjectFields = ProjectFields(),
graph: IRGraph)
projects: ProjectFields = ProjectFields())
extends BasicBlock[Fields](BlockType("project")) {

override def binds: Fields = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@

package com.antgroup.openspg.reasoner.lube.block

import scala.collection.mutable.ListBuffer

import com.antgroup.openspg.reasoner.common.exception.UnsupportedOperationException
import com.antgroup.openspg.reasoner.common.types.KgType
import com.antgroup.openspg.reasoner.lube.common.expr.Expr
import com.antgroup.openspg.reasoner.lube.common.graph._
Expand All @@ -24,7 +21,7 @@ import com.antgroup.openspg.reasoner.lube.common.pattern.{Element, PatternElemen
/**
* every operator block tree of root is result block
*/
sealed trait ResultBlock extends Block {}
abstract class ResultBlock[B <: Binds] extends BasicBlock[B](BlockType("result"))

/**
* output as table
Expand All @@ -35,16 +32,15 @@ sealed trait ResultBlock extends Block {}
final case class TableResultBlock(
dependencies: List[Block],
selectList: OrderedFields,
asList: List[String],
graph: IRGraph)
extends ResultBlock {
asList: List[String])
extends ResultBlock[OrderedFields] {

/**
* The metadata output by the current block
*
* @return
*/
override def binds: Binds = selectList
override def binds: OrderedFields = selectList
}

/**
Expand All @@ -53,11 +49,8 @@ final case class TableResultBlock(
* @param outputGraphPath the path name array for output
* @param graph
*/
final case class GraphResultBlock(
dependencies: List[Block],
outputGraphPath: List[String],
graph: IRGraph)
extends ResultBlock {
final case class GraphResultBlock(dependencies: List[Block], outputGraphPath: List[String])
extends ResultBlock[Binds] {
override val binds: Binds = dependencies.head.binds
}

Expand Down Expand Up @@ -94,34 +87,14 @@ case class AddPredicate(predicate: PredicateElement) extends DDLOp
* @param dependencies
* @param graph
*/
case class DDLBlock(ddlOp: Set[DDLOp], dependencies: List[Block], graph: IRGraph)
extends ResultBlock {
case class DDLBlock(ddlOp: Set[DDLOp], dependencies: List[Block]) extends ResultBlock[Fields] {

/**
* The metadata output by the current block
*
* @return
*/
override def binds: Binds = {
val fields = dependencies.head.binds.fields
ddlOp.head match {
case AddProperty(s, propertyType, _) =>
val field = fields.find(f => f.name.equals(s.alias)).get
if (field.isInstanceOf[IRNode]) {
field.asInstanceOf[IRNode].fields.add(propertyType)
} else if (field.isInstanceOf[IREdge]) {
field.asInstanceOf[IREdge].fields.add(propertyType)
}
Fields(fields)
case AddPredicate(predicate) =>
val newFields = new ListBuffer[IRField]
newFields.++=(dependencies.head.binds.fields)
newFields.+=(IREdge(predicate.alias, null))
Fields(newFields.toList)
case other =>
throw UnsupportedOperationException(s"$other ddlop unsupported")
}
}
override def binds: Fields = Fields.empty

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ package com.antgroup.openspg.reasoner.lube.block

import com.antgroup.openspg.reasoner.lube.common.graph.IRGraph

case class SourceBlock(graph: IRGraph) extends BasicBlock[Binds](BlockType("source")) {
case class SourceBlock(override val graph: IRGraph)
extends BasicBlock[Binds](BlockType("source")) {
override val dependencies: List[Block] = List.empty
override val binds: Binds = Binds.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ import org.json4s.ext.EnumNameSerializer
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write

class PropertyGraphSchema(
val nodes: mutable.Map[String, Node],
val edges: mutable.Map[SPO, Edge]) extends Serializable {
class PropertyGraphSchema(val nodes: mutable.Map[String, Node], val edges: mutable.Map[SPO, Edge])
extends Serializable {

def getNodeField(nodeTypes: Set[String], fieldName: String): Field = {
for (nodeType <- nodeTypes) {
Expand All @@ -52,6 +51,14 @@ class PropertyGraphSchema(
}
}

def addNode(
typeName: String,
nodeType: NodeType.Value,
properties: Set[Field],
resolved: Boolean): Unit = {
nodes.put(typeName, Node(typeName, nodeType, properties, resolved))
}

def addVertexField(nodeType: String, field: Field): Unit = {
val node = nodes.get(nodeType)
if (!node.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import scala.collection.mutable

import com.antgroup.openspg.reasoner.common.exception.NotDefineException
import com.antgroup.openspg.reasoner.common.graph.edge.{Direction, SPO}
import com.antgroup.openspg.reasoner.common.types.KgType
import com.antgroup.openspg.reasoner.lube.catalog.struct.{Edge, Field, Node}
import com.antgroup.openspg.reasoner.common.types.{KgType, KTString}
import com.antgroup.openspg.reasoner.lube.catalog.struct.{Edge, Field, Node, NodeType}

/**
* A graph defined by Property Graph Model with enhanced semantics.
Expand Down Expand Up @@ -49,14 +49,24 @@ class SemanticPropertyGraph(
graphSchema.addVertexField(nodeLabel, property)
}

def getNode(nodeLabel: String): Node = {
if (nodeLabel.contains("/")) {
graphSchema.nodes(nodeLabel.split("/")(0))
def addNode(nodeLabel: String, nodeType: NodeType.Value, properties: Set[Field]): Unit = {
val node = graphSchema.nodes.get(nodeLabel)
if (node.isDefined) {
return
}
val finalProperties =
properties ++ Set.apply(new Field("id", KTString, true), new Field("name", KTString, true))
if (nodeType == NodeType.CONCEPT) {
graphSchema.addNode(nodeLabel, nodeType, finalProperties, true)
} else {
graphSchema.nodes(nodeLabel)
graphSchema.addNode(nodeLabel, nodeType, finalProperties, false)
}
}

def getNode(nodeLabel: String): Node = {
graphSchema.nodes(nodeLabel)
}

def getEdge(spoStr: String): Edge = {
var spo = new SPO(spoStr)
if (spo.getP.equals("belongTo") && !graphSchema.edges.contains(spo)) {
Expand All @@ -66,11 +76,7 @@ class SemanticPropertyGraph(
}

def containsNode(nodeLabel: String): Boolean = {
if (nodeLabel.contains("/")) {
graphSchema.nodes.contains(nodeLabel.split("/")(0))
} else {
graphSchema.nodes.contains(nodeLabel)
}
graphSchema.nodes.contains(nodeLabel)
}

def containsEdge(spoStr: String): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class JSONGraphCatalog(val propertyGraph: String) extends Catalog {
: Set[Field] = {
Set.apply(
new Field(Constants.NODE_ID_KEY, KTString, true),
new Field(Constants.VERTEX_INTERNAL_ID_KEY, KTString, true),
new Field(Constants.CONTEXT_LABEL, KTString, true))
}

Expand All @@ -70,8 +71,13 @@ class JSONGraphCatalog(val propertyGraph: String) extends Catalog {
override def getDefaultEdgeProperties()
: Set[Field] = {
Set.apply(
new Field(Constants.CONTEXT_LABEL, KTString, true),
new Field(Constants.EDGE_FROM_ID_KEY, KTString, true),
new Field(Constants.EDGE_TO_ID_KEY, KTString, true)
new Field(Constants.EDGE_TO_ID_KEY, KTString, true),
new Field(Constants.EDGE_FROM_INTERNAL_ID_KEY, KTString, true),
new Field(Constants.EDGE_TO_INTERNAL_ID_KEY, KTString, true),
new Field(Constants.EDGE_FROM_ID_TYPE_KEY, KTString, true),
new Field(Constants.EDGE_TO_ID_TYPE_KEY, KTString, true)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@
* or implied.
*/

/**
* Alipay.com Inc.
* Copyright (c) 2004-2023 All Rights Reserved.
*/
package com.antgroup.openspg.reasoner.lube.catalog.impl

import scala.collection.mutable
Expand Down Expand Up @@ -77,6 +73,7 @@ class PropertyGraphCatalog(val propertyGraphSchema: Map[String, Set[String]]) ex
: Set[Field] = {
Set.apply(
new Field(Constants.NODE_ID_KEY, KTString, true),
new Field(Constants.VERTEX_INTERNAL_ID_KEY, KTString, true),
new Field(Constants.CONTEXT_LABEL, KTString, true))
}

Expand All @@ -86,8 +83,13 @@ class PropertyGraphCatalog(val propertyGraphSchema: Map[String, Set[String]]) ex
override def getDefaultEdgeProperties()
: Set[Field] = {
Set.apply(
new Field(Constants.CONTEXT_LABEL, KTString, true),
new Field(Constants.EDGE_FROM_ID_KEY, KTString, true),
new Field(Constants.EDGE_TO_ID_KEY, KTString, true)
new Field(Constants.EDGE_TO_ID_KEY, KTString, true),
new Field(Constants.EDGE_FROM_INTERNAL_ID_KEY, KTString, true),
new Field(Constants.EDGE_TO_INTERNAL_ID_KEY, KTString, true),
new Field(Constants.EDGE_FROM_ID_TYPE_KEY, KTString, true),
new Field(Constants.EDGE_TO_ID_TYPE_KEY, KTString, true)
)
}
}
Loading

0 comments on commit 7bcbcbc

Please sign in to comment.