Skip to content

Commit

Permalink
feat(openspg): version 0.6 (#451)
Browse files Browse the repository at this point in the history
  • Loading branch information
andylau-55 authored Jan 8, 2025
1 parent 65b4042 commit e53b20a
Show file tree
Hide file tree
Showing 260 changed files with 17,115 additions and 867 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ private static BaseLogicalNode<?> parse(Node node) {
case BUILDER_INDEX:
return new BuilderIndexNode(
node.getId(), node.getName(), (BuilderIndexNodeConfig) node.getNodeConfig());
case PYTHON:
return new PythonNode(
node.getId(), node.getName(), (PythonNodeConfig) node.getNodeConfig());
case PARAGRAPH_SPLIT:
return new ParagraphSplitNode(
node.getId(), node.getName(), (ParagraphSplitNodeConfig) node.getNodeConfig());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/

package com.antgroup.openspg.builder.core.logical;

import com.antgroup.openspg.builder.model.pipeline.config.PythonNodeConfig;
import com.antgroup.openspg.builder.model.pipeline.enums.NodeTypeEnum;

/**
* Logical pipeline node whose configuration is a {@link PythonNodeConfig}.
*
* <p>The class adds no state or behavior of its own: it only fixes the node type to {@link
* NodeTypeEnum#PYTHON} and forwards everything else to {@link BaseLogicalNode}.
*/
public class PythonNode extends BaseLogicalNode<PythonNodeConfig> {

/**
* Creates a Python logical node.
*
* @param id node identifier, passed through to the superclass unchanged
* @param name node name, passed through to the superclass unchanged
* @param nodeConfig configuration for this node, passed through to the superclass unchanged
*/
public PythonNode(String id, String name, PythonNodeConfig nodeConfig) {
// The node type is always PYTHON for this class; callers cannot override it.
super(id, name, NodeTypeEnum.PYTHON, nodeConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ private static BaseProcessor<?> parse(BaseLogicalNode<?> node) {
case BUILDER_INDEX:
return new BuilderIndexProcessor(
node.getId(), node.getName(), (BuilderIndexNodeConfig) node.getNodeConfig());
case PYTHON:
return new PythonProcessor(
node.getId(), node.getName(), (PythonNodeConfig) node.getNodeConfig());
case LLM_NL_EXTRACT:
return new LLMNlExtractProcessor(
node.getId(), node.getName(), (LLMNlExtractNodeConfig) node.getNodeConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,99 +15,49 @@

import com.antgroup.openspg.builder.core.runtime.BuilderContext;
import com.antgroup.openspg.builder.model.pipeline.config.OperatorConfig;
import java.util.*;
import java.util.stream.Collectors;
import com.antgroup.openspg.common.util.pemja.PemjaUtils;
import com.antgroup.openspg.common.util.pemja.model.PemjaConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import pemja.core.PythonInterpreter;
import pemja.core.PythonInterpreterConfig;

@Slf4j
public class PythonOperatorFactory implements OperatorFactory {

private String pythonExec;
private String[] pythonPaths;
private String pythonKnextPath;
private String pythonPaths;
private String hostAddr;
private Long projectId;

/** Private so that instances are only created through {@link #getInstance()}. */
private PythonOperatorFactory() {}

/**
* Creates a factory for Python operators.
*
* <p>Despite the name, every call constructs a fresh {@code PythonOperatorFactory}; nothing is
* cached or shared by this method. The returned factory has not been initialized yet: its fields
* are assigned in {@code init(BuilderContext)}.
*
* @return a newly constructed factory, exposed through the {@link OperatorFactory} interface
*/
public static OperatorFactory getInstance() {
return new PythonOperatorFactory();
}

private PythonInterpreter newPythonInterpreter() {

PythonInterpreterConfig.PythonInterpreterConfigBuilder builder =
PythonInterpreterConfig.newBuilder();
if (pythonExec != null) {
builder.setPythonExec(pythonExec);
}
if (pythonPaths != null) {
builder.addPythonPaths(pythonPaths);
}
return new PythonInterpreter(builder.build());
}

@Override
public void init(BuilderContext context) {
pythonExec = context.getPythonExec();
pythonPaths = (context.getPythonPaths() != null ? context.getPythonPaths().split(";") : null);
pythonKnextPath = context.getPythonKnextPath();
log.info("pythonExec={}, pythonPaths={}", pythonExec, Arrays.toString(pythonPaths));
}

public PythonInterpreter getPythonInterpreter(OperatorConfig config) {
PythonInterpreter interpreter = newPythonInterpreter();
loadOperatorObject(config, interpreter);
return interpreter;
pythonPaths = context.getPythonPaths();
hostAddr = context.getSchemaUrl();
projectId = context.getProjectId();
log.info("pythonExec={}, pythonPaths={}", pythonExec, pythonPaths);
}

/**
* No-op in this implementation: the body is empty and {@code config} is not read.
*
* @param config operator configuration; ignored here
*/
@Override
public void loadOperator(OperatorConfig config) {}

@Override
public Object invoke(OperatorConfig config, Object... input) {
PythonInterpreter interpreter = getPythonInterpreter(config);
String pythonObject = getPythonOperatorObject(config);
try {
return interpreter.invokeMethod(pythonObject, config.getMethod(), input);
} finally {
interpreter.close();
}
}

private void loadOperatorObject(OperatorConfig config, PythonInterpreter interpreter) {
if (StringUtils.isNotBlank(pythonKnextPath)) {
interpreter.exec(String.format("import sys; sys.path.append(\"%s\")", pythonKnextPath));
}
String pythonOperatorObject = getPythonOperatorObject(config);
interpreter.exec(
String.format("from %s import %s", config.getModulePath(), config.getClassName()));
interpreter.exec(
String.format(
"%s=%s(%s)",
pythonOperatorObject,
PemjaConfig pemjaConfig =
new PemjaConfig(
pythonExec,
pythonPaths,
hostAddr,
projectId,
config.getModulePath(),
config.getClassName(),
paramToPythonString(config.getParams(), config.getParamsPrefix())));
}

private String getPythonOperatorObject(OperatorConfig config) {
String pythonOperatorObject = config.getClassName() + "_" + config.getUniqueKey();
return pythonOperatorObject;
}

private String paramToPythonString(Map<String, String> params, String paramsPrefix) {
if (MapUtils.isEmpty(params)) {
return "";
}
if (StringUtils.isBlank(paramsPrefix)) {
paramsPrefix = "";
}
String keyValue =
params.entrySet().stream()
.map(entry -> String.format("'%s': '%s'", entry.getKey(), entry.getValue()))
.collect(Collectors.joining(","));
return String.format("%s{%s}", paramsPrefix, keyValue);
config.getMethod(),
config.getParams(),
config.getParamsPrefix());
return PemjaUtils.invoke(pemjaConfig, input);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

public class BuilderIndexProcessor extends BaseProcessor<BuilderIndexNodeConfig> {

private ExecuteNode node;
private ExecuteNode node = new ExecuteNode();
private SearchEngineClient searchEngineClient;
private CacheClient cacheClient;

Expand All @@ -48,7 +48,9 @@ public void doInit(BuilderContext context) throws BuilderException {
super.doInit(context);
searchEngineClient = SearchEngineClientDriverManager.getClient(context.getSearchEngineUrl());
cacheClient = CacheClientDriverManager.getClient(context.getCacheUrl());
this.node = context.getExecuteNodes().get(getId());
if (context.getExecuteNodes() != null) {
this.node = context.getExecuteNodes().get(getId());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,23 @@
package com.antgroup.openspg.builder.core.physical.process;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
import com.antgroup.openspg.builder.model.exception.BuilderException;
import com.antgroup.openspg.builder.model.pipeline.ExecuteNode;
import com.antgroup.openspg.builder.model.pipeline.config.ExtractPostProcessorNodeConfig;
import com.antgroup.openspg.builder.model.pipeline.enums.StatusEnum;
import com.antgroup.openspg.builder.model.record.BaseRecord;
import com.antgroup.openspg.builder.model.record.SubGraphRecord;
import com.google.common.collect.Lists;
import com.antgroup.openspg.common.constants.BuilderConstant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ExtractPostProcessor extends BasePythonProcessor<ExtractPostProcessorNodeConfig> {

private ExecuteNode node;
private ExecuteNode node = new ExecuteNode();

public ExtractPostProcessor(String id, String name, ExtractPostProcessorNodeConfig config) {
super(id, name, config);
Expand All @@ -37,39 +39,42 @@ public ExtractPostProcessor(String id, String name, ExtractPostProcessorNodeConf
@Override
public void doInit(BuilderContext context) throws BuilderException {
super.doInit(context);
this.node = context.getExecuteNodes().get(getId());
if (context.getExecuteNodes() != null) {
this.node = context.getExecuteNodes().get(getId());
}
}

@Override
public List<BaseRecord> process(List<BaseRecord> inputs) {
node.setStatus(StatusEnum.RUNNING);
node.addTraceLog("Start post processor...");
JSONObject pyConfig = new JSONObject();
pyConfig.put(BuilderConstant.TYPE, BuilderConstant.BASE);
node.addTraceLog("Start alignment...");
List<BaseRecord> results = new ArrayList<>();

List<Map> lists = Lists.newArrayList();
for (BaseRecord record : inputs) {
SubGraphRecord spgRecord = (SubGraphRecord) record;
lists.add(mapper.convertValue(spgRecord, Map.class));
Map map = mapper.convertValue(spgRecord, Map.class);
node.addTraceLog("invoke alignment operator:%s", config.getOperatorConfig().getClassName());
List<Object> result =
(List<Object>)
operatorFactory.invoke(
config.getOperatorConfig(),
BuilderConstant.POSTPROCESSOR_ABC,
pyConfig.toJSONString(),
map);
node.addTraceLog(
"invoke alignment operator:%s succeed", config.getOperatorConfig().getClassName());
List<SubGraphRecord> records =
JSON.parseObject(JSON.toJSONString(result), new TypeReference<List<SubGraphRecord>>() {});
for (SubGraphRecord subGraph : records) {
node.addTraceLog(
"alignment succeed node:%s edge%s",
subGraph.getResultNodes().size(), subGraph.getResultEdges().size());
results.add(subGraph);
}
}

node.addTraceLog(
"invoke post processor operator:%s", config.getOperatorConfig().getClassName());
Object result = operatorFactory.invoke(config.getOperatorConfig(), lists);
node.addTraceLog(
"invoke post processor operator:%s succeed", config.getOperatorConfig().getClassName());
SubGraphRecord subGraph = JSON.parseObject(JSON.toJSONString(result), SubGraphRecord.class);
node.addTraceLog(
"post processor succeed node:%s edge%s",
subGraph.getResultNodes().size(), subGraph.getResultEdges().size());

/*ProjectSchema projectSchema = CommonUtils.getProjectSchema(context);
List<BaseSPGRecord> nodes = CommonUtils.convertNodes(subGraph, projectSchema);
List<BaseSPGRecord> edges = CommonUtils.convertEdges(subGraph, projectSchema);
results.addAll(nodes);
results.addAll(edges);*/
results.add(subGraph);
node.addTraceLog("post processor complete...");
node.setOutputs(subGraph);
node.addTraceLog("alignment complete...");
node.setStatus(StatusEnum.FINISH);
return results;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.antgroup.openspg.builder.core.physical.process;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.antgroup.openspg.builder.core.physical.operator.OperatorFactory;
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
Expand All @@ -24,6 +25,9 @@
import com.antgroup.openspg.builder.model.record.BaseRecord;
import com.antgroup.openspg.builder.model.record.ChunkRecord;
import com.antgroup.openspg.builder.model.record.SubGraphRecord;
import com.antgroup.openspg.common.constants.BuilderConstant;
import com.antgroup.openspg.server.common.model.CommonConstants;
import com.antgroup.openspg.server.common.model.project.Project;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -40,7 +44,8 @@
@Slf4j
public class LLMNlExtractProcessor extends BasePythonProcessor<LLMNlExtractNodeConfig> {

private ExecuteNode node;
private ExecuteNode node = new ExecuteNode();
private Project project;

private static final RejectedExecutionHandler handler =
(r, executor) -> {
Expand All @@ -60,7 +65,10 @@ public LLMNlExtractProcessor(String id, String name, LLMNlExtractNodeConfig conf
@Override
public void doInit(BuilderContext context) throws BuilderException {
super.doInit(context);
this.node = context.getExecuteNodes().get(getId());
if (context.getExecuteNodes() != null) {
this.node = context.getExecuteNodes().get(getId());
}
project = JSON.parseObject(context.getProject(), Project.class);
if (executor == null) {
executor =
new ThreadPoolExecutor(
Expand All @@ -84,7 +92,8 @@ public List<BaseRecord> process(List<BaseRecord> inputs) {
for (BaseRecord record : inputs) {
ChunkRecord chunkRecord = (ChunkRecord) record;
Future<List<SubGraphRecord>> future =
executor.submit(new ExtractTaskCallable(node, chunkRecord, operatorFactory, config));
executor.submit(
new ExtractTaskCallable(node, chunkRecord, operatorFactory, config, project));
futures.add(future);
}

Expand All @@ -106,32 +115,44 @@ static class ExtractTaskCallable implements Callable<List<SubGraphRecord>> {
private final ChunkRecord chunkRecord;
private final OperatorFactory operatorFactory;
private final LLMNlExtractNodeConfig config;
private final Project project;

public ExtractTaskCallable(
ExecuteNode node,
ChunkRecord chunkRecord,
OperatorFactory operatorFactory,
LLMNlExtractNodeConfig config) {
LLMNlExtractNodeConfig config,
Project project) {
this.chunkRecord = chunkRecord;
this.node = node;
this.operatorFactory = operatorFactory;
this.config = config;
this.project = project;
}

@Override
public List<SubGraphRecord> call() throws Exception {
ChunkRecord.Chunk chunk = chunkRecord.getChunk();
String names = chunk.getName();
String projectConfig = project.getConfig();
JSONObject llm = JSONObject.parseObject(projectConfig).getJSONObject(CommonConstants.LLM);
JSONObject pyConfig = new JSONObject();
pyConfig.put(BuilderConstant.TYPE, BuilderConstant.SCHEMA_FREE);
pyConfig.put(BuilderConstant.LLM, llm);
node.addTraceLog(
"invoke extract operator:%s chunk:%s", config.getOperatorConfig().getClassName(), names);

Map record = new ObjectMapper().convertValue(chunk, Map.class);

log.info("LLMNlExtractProcessor invoke Chunks: {}", names);
List<Object> result =
(List<Object>) operatorFactory.invoke(config.getOperatorConfig(), record);
(List<Object>)
operatorFactory.invoke(
config.getOperatorConfig(),
BuilderConstant.EXTRACTOR_ABC,
pyConfig.toJSONString(),
record);
List<SubGraphRecord> records =
JSON.parseObject(JSON.toJSONString(result), new TypeReference<List<SubGraphRecord>>() {});
// CommonUtils.addLabelPrefix(project.getNamespace(), records);
node.addTraceLog(
"invoke extract operator:%s chunk:%s succeed",
config.getOperatorConfig().getClassName(), names);
Expand Down
Loading

0 comments on commit e53b20a

Please sign in to comment.