Skip to content

Commit

Permalink
all examples
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuzhongshu123 committed Nov 22, 2024
1 parent a1fbdb6 commit 9dccb06
Show file tree
Hide file tree
Showing 30 changed files with 804 additions and 389 deletions.
2 changes: 2 additions & 0 deletions kag/builder/component/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
MusiqueCorpusReader,
HotpotqaCorpusReader,
)
from kag.builder.component.reader.file_reader import FileReader
from kag.builder.component.reader.directory_reader import DirectoryReader


Expand Down Expand Up @@ -66,6 +67,7 @@
"JSONReader",
"HotpotqaCorpusReader",
"MusiqueCorpusReader",
"FileReader",
"DirectoryReader",
"YuqueReader",
"CSVReader",
Expand Down
1 change: 0 additions & 1 deletion kag/builder/component/extractor/.#kag_extractor.py

This file was deleted.

1 change: 0 additions & 1 deletion kag/builder/component/extractor/kag_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def __init__(
external_graph: ExternalGraphLoaderABC = None,
):
self.llm = llm
print(f"self.llm: {self.llm}")
self.schema = SchemaClient(project_id=KAG_PROJECT_CONF.project_id).load()
self.ner_prompt = ner_prompt
self.std_prompt = std_prompt
Expand Down
17 changes: 11 additions & 6 deletions kag/builder/component/mapping/relation_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.

from collections import defaultdict
from typing import Dict, List

from kag.builder.model.sub_graph import SubGraph
Expand All @@ -37,6 +36,9 @@ def __init__(
subject_name: str,
predicate_name: str,
object_name: str,
src_id_field: str = None,
dst_id_field: str = None,
property_mapping: dict = {},
**kwargs,
):
super().__init__(**kwargs)
Expand All @@ -51,10 +53,9 @@ def __init__(
), f"{predicate_name} is not a valid SPG property/relation name"
self.predicate_name = predicate_name

self.src_id_field = None
self.dst_id_field = None
self.property_mapping: Dict = defaultdict(list)
self.linking_strategies: Dict = dict()
self.src_id_field = src_id_field
self.dst_id_field = dst_id_field
self.property_mapping = property_mapping

def add_src_id_mapping(self, source_name: str):
"""
def add_sub_property_mapping(self, source_name: str, target_name: str):
    """Map a source column onto a (sub) property of the relation.

    Several source columns may be mapped to the same target property; each
    call appends to the list of sources for that target.

    Args:
        source_name: Name of the source field/column.
        target_name: Name of the target property.

    Returns:
        self, to allow chained calls.
    """
    # setdefault creates the list on first use, then appends — equivalent to
    # the explicit membership check but in one idiomatic step.
    self.property_mapping.setdefault(target_name, []).append(source_name)
    return self

@property
Expand Down
31 changes: 21 additions & 10 deletions kag/builder/component/mapping/spo_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
import json
from collections import defaultdict
from typing import List, Type, Dict

from kag.interface.builder.mapping_abc import MappingABC
Expand All @@ -21,15 +20,24 @@

@MappingABC.register("spo")
class SPOMapping(MappingABC):
def __init__(self):
def __init__(
    self,
    s_type_col: str = None,
    s_id_col: str = None,
    p_type_col: str = None,
    o_type_col: str = None,
    o_id_col: str = None,
    sub_property_col: str = None,
    sub_property_mapping: dict = None,
):
    """Initialize the SPO mapping with the source columns of each triple part.

    Args:
        s_type_col: Column holding the subject type.
        s_id_col: Column holding the subject id.
        p_type_col: Column holding the predicate type.
        o_type_col: Column holding the object type.
        o_id_col: Column holding the object id.
        sub_property_col: Column holding packed sub-property data.
        sub_property_mapping: Optional mapping of target sub-property name to
            its list of source columns. Defaults to an empty mapping.
    """
    super().__init__()
    self.s_type_col = s_type_col
    self.s_id_col = s_id_col
    self.p_type_col = p_type_col
    self.o_type_col = o_type_col
    self.o_id_col = o_id_col
    self.sub_property_col = sub_property_col
    # A `dict = {}` default would be shared across all instances (classic
    # mutable-default bug); use None as sentinel and build a fresh dict.
    self.sub_property_mapping = (
        {} if sub_property_mapping is None else sub_property_mapping
    )

@property
def input_types(self) -> Type[Input]:
def add_sub_property_mapping(self, source_name: str, target_name: str = None):
    """Register a source column as a sub property of the SPO triple.

    Args:
        source_name: Name of the source field/column.
        target_name: Target sub-property name. When omitted, source_name is
            recorded as the column that holds the whole sub-property payload.

    Returns:
        self, to allow chained calls.
    """
    if not target_name:
        self.sub_property_col = source_name
    else:
        # setdefault creates the list on first use, then appends — same
        # behavior as the explicit membership check, in one step.
        self.sub_property_mapping.setdefault(target_name, []).append(source_name)
    return self

def assemble_sub_graph(self, record: Dict[str, str]):
Expand Down
2 changes: 1 addition & 1 deletion kag/builder/component/reader/csv_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ def output_types(self) -> Output:
return Dict

def load_data(self, input: Input, **kwargs) -> List[Output]:
    """Load a CSV file and return its rows as a list of dicts.

    Args:
        input: Path (or readable buffer) of the CSV file.
        **kwargs: Ignored; accepted for interface compatibility.

    Returns:
        One dict per CSV row, keyed by column name. dtype=str keeps every
        cell as a string so id-like values such as "007" are not coerced
        to numbers before being used as graph keys.
    """
    data = pd.read_csv(input, dtype=str)
    return data.to_dict(orient="records")
30 changes: 30 additions & 0 deletions kag/builder/component/reader/file_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
from typing import List

from kag.interface import SourceReaderABC

from knext.common.base.runnable import Input, Output


@SourceReaderABC.register("file")
class FileReader(SourceReaderABC):
    """Pass-through reader: yields the given input string as the only record."""

    @property
    def input_types(self) -> Input:
        # Consumes a single string (presumably a file path — TODO confirm
        # against callers).
        return str

    @property
    def output_types(self) -> Output:
        # Emits the string unchanged.
        return str

    def load_data(self, input: Input, **kwargs) -> List[Output]:
        """Wrap the input in a one-element list without reading or transforming it."""
        return [input]
2 changes: 1 addition & 1 deletion kag/builder/component/vectorizer/batch_vectorizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def get_placeholder(self, properties, vector_field):
if not property_value:
return None
if not isinstance(property_value, str):
message = f"property {property_key!r} must be string to generate embedding vector"
message = f"property {property_key!r} must be string to generate embedding vector, got {property_value} with type {type(property_value)}"
raise RuntimeError(message)
num = len(self._placeholders)
placeholder = EmbeddingVectorPlaceholder(
Expand Down
161 changes: 124 additions & 37 deletions kag/builder/default_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,14 @@
# or implied.

import logging
import os

from kag.builder.component import (
SPGTypeMapping,
KGWriter,
)


from kag.interface import (
SourceReaderABC,
RecordParserABC,
MappingABC,
ExtractorABC,
SplitterABC,
VectorizerABC,
RecordParserABC,
PostProcessorABC,
SinkWriterABC,
KAGBuilderChain,
Expand All @@ -50,9 +44,28 @@ class DefaultStructuredBuilderChain(KAGBuilderChain):
spg_type_name (str): The name of the SPG type.
"""

def __init__(self, spg_type_name: str, **kwargs):
super().__init__(**kwargs)
self.spg_type_name = spg_type_name
def __init__(
    self,
    mapping: MappingABC,
    writer: SinkWriterABC,
    vectorizer: VectorizerABC = None,
):
    """Build a structured-data import chain from pre-constructed components.

    Args:
        mapping: Component that maps source records onto the target schema.
        writer: Sink component that writes the assembled sub graphs.
        vectorizer: Optional component that embeds properties before writing.
    """
    # NOTE(review): the commented-out spg_type_name/parser construction code
    # from the previous implementation has been removed as dead code.
    self.mapping = mapping
    self.writer = writer
    self.vectorizer = vectorizer

def build(self, **kwargs):
    """Assemble the structured-import processing chain.

    Args:
        **kwargs: Additional keyword arguments (unused here, accepted for
            interface compatibility).

    Returns:
        The runnable chain: mapping >> vectorizer >> writer when a
        vectorizer is configured, otherwise mapping >> writer.
    """
    # The old file_path/suffix-driven reader construction was removed in
    # favor of injecting fully-built components via __init__.
    if self.vectorizer:
        chain = self.mapping >> self.vectorizer >> self.writer
    else:
        chain = self.mapping >> self.writer
    return chain

def invoke(self, file_path, max_workers=10, **kwargs):
logger.info(f"begin processing file_path:{file_path}")
"""
Invokes the processing chain with the given file path and optional parameters.
# def invoke(self, file_path, max_workers=10, **kwargs):
# logger.info(f"begin processing file_path:{file_path}")
# """
# Invokes the processing chain with the given file path and optional parameters.

Args:
file_path (str): The path to the input file.
max_workers (int, optional): The maximum number of workers. Defaults to 10.
**kwargs: Additional keyword arguments.
# Args:
# file_path (str): The path to the input file.
# max_workers (int, optional): The maximum number of workers. Defaults to 10.
# **kwargs: Additional keyword arguments.

Returns:
The result of invoking the processing chain.
"""
return super().invoke(file_path=file_path, max_workers=max_workers, **kwargs)
# Returns:
# The result of invoking the processing chain.
# """
# return super().invoke(file_path=file_path, max_workers=max_workers, **kwargs)


# @KAGBuilderChain.register("structured")
# class DefaultStructuredBuilderChain(KAGBuilderChain):

# """
# A class representing a default SPG builder chain, used to import structured data based on schema definitions

# Steps:
# 0. Initializing by a give SpgType name, which indicates the target of import.
# 1. SourceReader: Reading structured dicts from a given file.
# 2. SPGTypeMapping: Mapping source fields to the properties of target type, and assemble a sub graph.
# By default, the same name mapping is used, which means importing the source field into a property with the same name.
# 3. KGWriter: Writing sub graph into KG storage.

# Attributes:
# spg_type_name (str): The name of the SPG type.
# """

# def __init__(self, spg_type_name: str, **kwargs):
# super().__init__(**kwargs)
# self.spg_type_name = spg_type_name

# def build(self, **kwargs):
# """
# Builds the processing chain for the SPG.

# Args:
# **kwargs: Additional keyword arguments.

# Returns:
# chain: The constructed processing chain.
# """
# file_path = kwargs.get("file_path")
# # source = get_reader(file_path)(output_type="Dict")
# suffix = os.path.basename(file_path).split(".")[-1]
# source_config = {"type": suffix}
# if suffix in ["json", "csv"]:
# source_config["output_type"] = "Dict"
# source = SourceReaderABC.from_config(source_config)

# mapping = SPGTypeMapping(spg_type_name=self.spg_type_name)
# sink = KGWriter()

# chain = source >> mapping >> sink
# return chain

# def invoke(self, file_path, max_workers=10, **kwargs):
# logger.info(f"begin processing file_path:{file_path}")
# """
# Invokes the processing chain with the given file path and optional parameters.

# Args:
# file_path (str): The path to the input file.
# max_workers (int, optional): The maximum number of workers. Defaults to 10.
# **kwargs: Additional keyword arguments.

# Returns:
# The result of invoking the processing chain.
# """
# return super().invoke(file_path=file_path, max_workers=max_workers, **kwargs)


@KAGBuilderChain.register("unstructured")
Expand All @@ -102,8 +181,8 @@ def __init__(
splitter: SplitterABC,
extractor: ExtractorABC,
vectorizer: VectorizerABC,
post_processor: PostProcessorABC,
writer: SinkWriterABC,
post_processor: PostProcessorABC = None,
):
self.parser = parser
self.splitter = splitter
Expand All @@ -113,11 +192,19 @@ def __init__(
self.writer = writer

def build(self, **kwargs):
    """Assemble the unstructured-extraction processing chain.

    Args:
        **kwargs: Additional keyword arguments (unused, kept for interface
            compatibility).

    Returns:
        The runnable chain parser >> splitter >> extractor >> vectorizer
        (>> post_processor, only when one is configured) >> writer.
    """
    if self.post_processor:
        return (
            self.parser
            >> self.splitter
            >> self.extractor
            >> self.vectorizer
            >> self.post_processor
            >> self.writer
        )
    return (
        self.parser
        >> self.splitter
        >> self.extractor
        >> self.vectorizer
        >> self.writer
    )
Loading

0 comments on commit 9dccb06

Please sign in to comment.