diff --git a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/rdg/common/LinkEdgeImpl.java b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/rdg/common/LinkEdgeImpl.java index d9a3948cc..4f2f2c9ae 100644 --- a/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/rdg/common/LinkEdgeImpl.java +++ b/reasoner/runner/runner-common/src/main/java/com/antgroup/openspg/reasoner/rdg/common/LinkEdgeImpl.java @@ -14,6 +14,7 @@ package com.antgroup.openspg.reasoner.rdg.common; import com.antgroup.openspg.reasoner.common.constants.Constants; +import com.antgroup.openspg.reasoner.common.graph.edge.Direction; import com.antgroup.openspg.reasoner.common.graph.edge.IEdge; import com.antgroup.openspg.reasoner.common.graph.edge.impl.Edge; import com.antgroup.openspg.reasoner.common.graph.property.IProperty; @@ -151,6 +152,10 @@ public List> link(KgGraph kgGraph) { genTargetVertexTypes = Lists.newArrayList(linkedUdtfResult.getTargetVertexTypeList().get(i)); } + String direction = Direction.OUT.name(); + if (CollectionUtils.isNotEmpty(linkedUdtfResult.getDirection()) && i < linkedUdtfResult.getDirection().size()) { + direction = linkedUdtfResult.getDirection().get(i); + } if (genTargetVertexTypes.size() == 0) { throw new RuntimeException( "Linked edge target vertex type must contains at least one type"); @@ -170,21 +175,32 @@ public List> link(KgGraph kgGraph) { newAliasVertexMap.computeIfAbsent(targetAlias, k -> new HashSet<>()); newVertexSet.add(new Vertex<>(targetId, vertexProperty)); + Map props = new HashMap<>(linkedUdtfResult.getEdgePropertyMap()); - props.put(Constants.EDGE_TO_ID_KEY, targetIdStr); + Object from_id = sourceVertex.getValue().get(Constants.NODE_ID_KEY); + Object to_id = targetIdStr; + if (Objects.equals(direction, Direction.IN.name())) { + to_id = from_id; + from_id = targetId; + } + props.put(Constants.EDGE_TO_ID_KEY, to_id); if (sourceVertex.getValue().isKeyExist(Constants.NODE_ID_KEY)) { props.put( - Constants.EDGE_FROM_ID_KEY, sourceVertex.getValue().get(Constants.NODE_ID_KEY)); + Constants.EDGE_FROM_ID_KEY, from_id); } IProperty property = new EdgeProperty(props); // construct new edge IEdge linkedEdge = new Edge<>(sourceId, targetId, property); + String edgeType = StringUtils.isNotEmpty(linkedUdtfResult.getEdgeType()) ? linkedUdtfResult.getEdgeType() : linkedEdgePattern.edge().funcName(); linkedEdge.setType(sourceId.getType() + "_" + edgeType + "_" + targetVertexType); + if (Objects.equals(direction, Direction.IN.name())) { + linkedEdge.setType(targetVertexType + "_" + edgeType + "_" + sourceId.getType()); + } String edgeAlias = pc.alias(); Set> newEdgeSet = diff --git a/reasoner/udf/src/main/java/com/antgroup/openspg/reasoner/udf/model/LinkedUdtfResult.java b/reasoner/udf/src/main/java/com/antgroup/openspg/reasoner/udf/model/LinkedUdtfResult.java index 742800c6c..4f463047a 100644 --- a/reasoner/udf/src/main/java/com/antgroup/openspg/reasoner/udf/model/LinkedUdtfResult.java +++ b/reasoner/udf/src/main/java/com/antgroup/openspg/reasoner/udf/model/LinkedUdtfResult.java @@ -28,6 +28,9 @@ public class LinkedUdtfResult { /** The target vertex id of linked edge */ private List targetVertexIdList = new ArrayList<>(); + /** The linked direction */ + private List direction = new ArrayList<>(); + /** The target vertex type of linked edge */ private List targetVertexTypeList = new ArrayList<>(); diff --git a/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/impl/Utils.java b/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/impl/Utils.java index 954d4a243..948080141 100644 --- a/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/impl/Utils.java +++ b/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/impl/Utils.java @@ -34,13 +34,13 @@ public class Utils { public static List getAllRdfEntity( - GraphState graphState, IVertexId id) { + GraphState graphState, IVertexId id, String rdfType) { List result = new ArrayList<>(); // find vertex prop IVertex vertex = graphState.getVertex(id, null, null); - if (null != vertex && null != vertex.getValue()) { + if (null != vertex && null != vertex.getValue() && !"relation".equals(rdfType)) { // 提取属性 log.info("vertex_property,{}", vertex); for (String propertyName : vertex.getValue().getKeySet()) { @@ -56,6 +56,7 @@ public static List getAllRdfEntity( "name", String.valueOf(pValue))); graphState.addVertex(propVertex); LinkedUdtfResult udtfRes = new LinkedUdtfResult(); + udtfRes.getDirection().add(Direction.OUT.name()); udtfRes.setEdgeType(propertyName); udtfRes.getTargetVertexIdList().add(String.valueOf(pValue)); if (pValue instanceof Integer) { @@ -76,6 +77,14 @@ public static List getAllRdfEntity( if (CollectionUtils.isNotEmpty(edgeList)) { for (IEdge edge : edgeList) { Object toIdObj = edge.getValue().get(Constants.EDGE_TO_ID_KEY); + String dir = Direction.OUT.name(); + Object nodeIdObj = vertex.getValue().get(Constants.NODE_ID_KEY); + String targetType = edge.getTargetId().getType(); + if (nodeIdObj.equals(toIdObj)) { + toIdObj = edge.getValue().get(Constants.EDGE_FROM_ID_KEY); + dir = Direction.IN.name(); + targetType = String.valueOf(edge.getValue().get(Constants.EDGE_FROM_ID_TYPE_KEY)); + } if (null == toIdObj) { continue; } @@ -84,7 +93,8 @@ public static List getAllRdfEntity( LinkedUdtfResult udtfRes = new LinkedUdtfResult(); udtfRes.setEdgeType(spo.getP()); udtfRes.getTargetVertexIdList().add(String.valueOf(toIdObj)); - udtfRes.getTargetVertexTypeList().add(edge.getTargetId().getType()); + udtfRes.getTargetVertexTypeList().add(targetType); + udtfRes.getDirection().add(dir); for (String propKey : edge.getValue().getKeySet()) { if (propKey.startsWith("_")) { continue; diff --git a/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/udtf/RdfExpand.java b/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/udtf/RdfExpand.java index d7d4b8e3c..413ed5647 100644 --- a/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/udtf/RdfExpand.java +++ b/server/core/reasoner/service/src/main/java/com/antgroup/openspg/server/core/reasoner/service/udtf/RdfExpand.java @@ -68,6 +68,10 @@ public void process(List args) { String vertexType = null; String bizId = null; Object s = context.get(srcAlias); + String rdfType = null; + if (args.size() > 0) { + rdfType = (String) args.get(0); + } if (s instanceof Map) { Map sMap = (Map) s; bizId = (String) sMap.get(Constants.NODE_ID_KEY); @@ -75,7 +79,7 @@ public void process(List args) { } IVertexId id = new VertexBizId(bizId, vertexType); // 结果 - List validBizIds = Utils.getAllRdfEntity(graphState, id); + List validBizIds = Utils.getAllRdfEntity(graphState, id, rdfType); for (LinkedUdtfResult udtfResult : validBizIds) { forward(Lists.newArrayList(udtfResult)); }