diff --git a/kag/builder/component/__init__.py b/kag/builder/component/__init__.py index c411ca9b..a530b1e1 100644 --- a/kag/builder/component/__init__.py +++ b/kag/builder/component/__init__.py @@ -29,6 +29,7 @@ MusiqueCorpusReader, HotpotqaCorpusReader, ) +from kag.builder.component.reader.file_reader import FileReader from kag.builder.component.reader.directory_reader import DirectoryReader @@ -66,6 +67,7 @@ "JSONReader", "HotpotqaCorpusReader", "MusiqueCorpusReader", + "FileReader", "DirectoryReader", "YuqueReader", "CSVReader", diff --git a/kag/builder/component/extractor/.#kag_extractor.py b/kag/builder/component/extractor/.#kag_extractor.py deleted file mode 120000 index 4ee9f497..00000000 --- a/kag/builder/component/extractor/.#kag_extractor.py +++ /dev/null @@ -1 +0,0 @@ -simplex@MacBook-Pro.local.69088 \ No newline at end of file diff --git a/kag/builder/component/extractor/kag_extractor.py b/kag/builder/component/extractor/kag_extractor.py index effd3d48..cbfdec6e 100644 --- a/kag/builder/component/extractor/kag_extractor.py +++ b/kag/builder/component/extractor/kag_extractor.py @@ -45,7 +45,6 @@ def __init__( external_graph: ExternalGraphLoaderABC = None, ): self.llm = llm - print(f"self.llm: {self.llm}") self.schema = SchemaClient(project_id=KAG_PROJECT_CONF.project_id).load() self.ner_prompt = ner_prompt self.std_prompt = std_prompt diff --git a/kag/builder/component/mapping/relation_mapping.py b/kag/builder/component/mapping/relation_mapping.py index 6ec8f424..9e3bbb73 100644 --- a/kag/builder/component/mapping/relation_mapping.py +++ b/kag/builder/component/mapping/relation_mapping.py @@ -10,7 +10,6 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. -from collections import defaultdict from typing import Dict, List from kag.builder.model.sub_graph import SubGraph @@ -37,6 +36,9 @@ def __init__( subject_name: str, predicate_name: str, object_name: str, + src_id_field: str = None, + dst_id_field: str = None, + property_mapping: dict = {}, **kwargs, ): super().__init__(**kwargs) @@ -51,10 +53,9 @@ def __init__( ), f"{predicate_name} is not a valid SPG property/relation name" self.predicate_name = predicate_name - self.src_id_field = None - self.dst_id_field = None - self.property_mapping: Dict = defaultdict(list) - self.linking_strategies: Dict = dict() + self.src_id_field = src_id_field + self.dst_id_field = dst_id_field + self.property_mapping = property_mapping def add_src_id_mapping(self, source_name: str): """ @@ -93,7 +94,11 @@ def add_sub_property_mapping(self, source_name: str, target_name: str): Returns: self """ - self.property_mapping[target_name].append(source_name) + + if target_name in self.property_mapping: + self.property_mapping[target_name].append(source_name) + else: + self.property_mapping[target_name] = [source_name] return self @property diff --git a/kag/builder/component/mapping/spo_mapping.py b/kag/builder/component/mapping/spo_mapping.py index 48f94308..d2288006 100644 --- a/kag/builder/component/mapping/spo_mapping.py +++ b/kag/builder/component/mapping/spo_mapping.py @@ -10,7 +10,6 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. import json -from collections import defaultdict from typing import List, Type, Dict from kag.interface.builder.mapping_abc import MappingABC @@ -21,15 +20,24 @@ @MappingABC.register("spo") class SPOMapping(MappingABC): - def __init__(self): + def __init__( + self, + s_type_col: str = None, + s_id_col: str = None, + p_type_col: str = None, + o_type_col: str = None, + o_id_col: str = None, + sub_property_col: str = None, + sub_property_mapping: dict = {}, + ): super().__init__() - self.s_type_col = None - self.s_id_col = None - self.p_type_col = None - self.o_type_col = None - self.o_id_col = None - self.sub_property_mapping = defaultdict(list) - self.sub_property_col = None + self.s_type_col = s_type_col + self.s_id_col = s_id_col + self.p_type_col = p_type_col + self.o_type_col = o_type_col + self.o_id_col = o_id_col + self.sub_property_col = sub_property_col + self.sub_property_mapping = sub_property_mapping @property def input_types(self) -> Type[Input]: @@ -70,7 +78,10 @@ def add_sub_property_mapping(self, source_name: str, target_name: str = None): if not target_name: self.sub_property_col = source_name else: - self.sub_property_mapping[target_name].append(source_name) + if target_name in self.sub_property_mapping: + self.sub_property_mapping[target_name].append(source_name) + else: + self.sub_property_mapping[target_name] = [source_name] return self def assemble_sub_graph(self, record: Dict[str, str]): diff --git a/kag/builder/component/reader/csv_reader.py b/kag/builder/component/reader/csv_reader.py index 6dbeb162..c487ee71 100644 --- a/kag/builder/component/reader/csv_reader.py +++ b/kag/builder/component/reader/csv_reader.py @@ -36,5 +36,5 @@ def output_types(self) -> Output: return Dict def load_data(self, input: Input, **kwargs) -> List[Output]: - data = pd.read_csv(input) + data = pd.read_csv(input, dtype=str) return data.to_dict(orient="records") diff --git a/kag/builder/component/reader/file_reader.py b/kag/builder/component/reader/file_reader.py new file mode 100644 index 00000000..2d610248 --- /dev/null +++ b/kag/builder/component/reader/file_reader.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2023 OpenSPG Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. +from typing import List + +from kag.interface import SourceReaderABC + +from knext.common.base.runnable import Input, Output + + +@SourceReaderABC.register("file") +class FileReader(SourceReaderABC): + @property + def input_types(self) -> Input: + return str + + @property + def output_types(self) -> Output: + return str + + def load_data(self, input: Input, **kwargs) -> List[Output]: + return [input] diff --git a/kag/builder/component/vectorizer/batch_vectorizer.py b/kag/builder/component/vectorizer/batch_vectorizer.py index b364f5e4..d907a559 100644 --- a/kag/builder/component/vectorizer/batch_vectorizer.py +++ b/kag/builder/component/vectorizer/batch_vectorizer.py @@ -51,7 +51,7 @@ def get_placeholder(self, properties, vector_field): if not property_value: return None if not isinstance(property_value, str): - message = f"property {property_key!r} must be string to generate embedding vector" + message = f"property {property_key!r} must be string to generate embedding vector, got {property_value} with type {type(property_value)}" raise RuntimeError(message) num = len(self._placeholders) placeholder = EmbeddingVectorPlaceholder( diff --git a/kag/builder/default_chain.py b/kag/builder/default_chain.py index 37cdacd2..8dfe3f2b 100644 --- a/kag/builder/default_chain.py +++ b/kag/builder/default_chain.py @@ -11,20 +11,14 @@ # or implied. import logging -import os - -from kag.builder.component import ( - SPGTypeMapping, - KGWriter, -) from kag.interface import ( - SourceReaderABC, + RecordParserABC, + MappingABC, ExtractorABC, SplitterABC, VectorizerABC, - RecordParserABC, PostProcessorABC, SinkWriterABC, KAGBuilderChain, @@ -50,9 +44,28 @@ class DefaultStructuredBuilderChain(KAGBuilderChain): spg_type_name (str): The name of the SPG type. """ - def __init__(self, spg_type_name: str, **kwargs): - super().__init__(**kwargs) - self.spg_type_name = spg_type_name + def __init__( + self, + mapping: MappingABC, + writer: SinkWriterABC, + vectorizer: VectorizerABC = None, + ): + self.mapping = mapping + self.writer = writer + self.vectorizer = vectorizer + # self.spg_type_name = spg_type_name + # self.parser = RecordParserABC.from_config( + # { + # "type": "dict", + # "id_col": id_col, + # "name_col": name_col, + # "content_col": content_col, + # } + # ) + # self.mapping = MappingABC.from_config( + # {"type": "spg", "spg_type_name": self.spg_type_name} + # ) + # self.writer = SinkWriterABC.from_config({"type": "kg"}) def build(self, **kwargs): """ @@ -64,34 +77,100 @@ def build(self, **kwargs): Returns: chain: The constructed processing chain. """ - file_path = kwargs.get("file_path") - # source = get_reader(file_path)(output_type="Dict") - suffix = os.path.basename(file_path).split(".")[-1] - source_config = {"type": suffix} - if suffix in ["json", "csv"]: - source_config["output_type"] = "Dict" - source = SourceReaderABC.from_config(source_config) - - mapping = SPGTypeMapping(spg_type_name=self.spg_type_name) - sink = KGWriter() - - chain = source >> mapping >> sink + # file_path = kwargs.get("file_path") + # # source = get_reader(file_path)(output_type="Dict") + # suffix = os.path.basename(file_path).split(".")[-1] + # source_config = {"type": suffix} + # if suffix in ["json", "csv"]: + # source_config["output_type"] = "Dict" + # source = SourceReaderABC.from_config(source_config) + + # mapping = SPGTypeMapping(spg_type_name=self.spg_type_name) + # sink = KGWriter() + + # chain = source >> mapping >> sink + if self.vectorizer: + chain = self.mapping >> self.vectorizer >> self.writer + else: + chain = self.mapping >> self.writer + return chain - def invoke(self, file_path, max_workers=10, **kwargs): - logger.info(f"begin processing file_path:{file_path}") - """ - Invokes the processing chain with the given file path and optional parameters. + # def invoke(self, file_path, max_workers=10, **kwargs): + # logger.info(f"begin processing file_path:{file_path}") + # """ + # Invokes the processing chain with the given file path and optional parameters. - Args: - file_path (str): The path to the input file. - max_workers (int, optional): The maximum number of workers. Defaults to 10. - **kwargs: Additional keyword arguments. + # Args: + # file_path (str): The path to the input file. + # max_workers (int, optional): The maximum number of workers. Defaults to 10. + # **kwargs: Additional keyword arguments. - Returns: - The result of invoking the processing chain. - """ - return super().invoke(file_path=file_path, max_workers=max_workers, **kwargs) + # Returns: + # The result of invoking the processing chain. + # """ + # return super().invoke(file_path=file_path, max_workers=max_workers, **kwargs) + + +# @KAGBuilderChain.register("structured") +# class DefaultStructuredBuilderChain(KAGBuilderChain): + +# """ +# A class representing a default SPG builder chain, used to import structured data based on schema definitions + +# Steps: +# 0. Initializing by a give SpgType name, which indicates the target of import. +# 1. SourceReader: Reading structured dicts from a given file. +# 2. SPGTypeMapping: Mapping source fields to the properties of target type, and assemble a sub graph. +# By default, the same name mapping is used, which means importing the source field into a property with the same name. +# 3. KGWriter: Writing sub graph into KG storage. + +# Attributes: +# spg_type_name (str): The name of the SPG type. +# """ + +# def __init__(self, spg_type_name: str, **kwargs): +# super().__init__(**kwargs) +# self.spg_type_name = spg_type_name + +# def build(self, **kwargs): +# """ +# Builds the processing chain for the SPG. + +# Args: +# **kwargs: Additional keyword arguments. + +# Returns: +# chain: The constructed processing chain. +# """ +# file_path = kwargs.get("file_path") +# # source = get_reader(file_path)(output_type="Dict") +# suffix = os.path.basename(file_path).split(".")[-1] +# source_config = {"type": suffix} +# if suffix in ["json", "csv"]: +# source_config["output_type"] = "Dict" +# source = SourceReaderABC.from_config(source_config) + +# mapping = SPGTypeMapping(spg_type_name=self.spg_type_name) +# sink = KGWriter() + +# chain = source >> mapping >> sink +# return chain + +# def invoke(self, file_path, max_workers=10, **kwargs): +# logger.info(f"begin processing file_path:{file_path}") +# """ +# Invokes the processing chain with the given file path and optional parameters. + +# Args: +# file_path (str): The path to the input file. +# max_workers (int, optional): The maximum number of workers. Defaults to 10. +# **kwargs: Additional keyword arguments. + +# Returns: +# The result of invoking the processing chain. +# """ +# return super().invoke(file_path=file_path, max_workers=max_workers, **kwargs) @KAGBuilderChain.register("unstructured") @@ -102,8 +181,8 @@ def __init__( splitter: SplitterABC, extractor: ExtractorABC, vectorizer: VectorizerABC, - post_processor: PostProcessorABC, writer: SinkWriterABC, + post_processor: PostProcessorABC = None, ): self.parser = parser self.splitter = splitter @@ -113,11 +192,19 @@ def __init__( self.writer = writer def build(self, **kwargs): + if self.post_processor: + return ( + self.parser + >> self.splitter + >> self.extractor + >> self.vectorizer + >> self.post_processor + >> self.writer + ) return ( self.parser >> self.splitter >> self.extractor >> self.vectorizer - >> self.post_processor >> self.writer ) diff --git a/kag/builder/runner.py b/kag/builder/runner.py index 9ac15451..943984d6 100644 --- a/kag/builder/runner.py +++ b/kag/builder/runner.py @@ -25,11 +25,14 @@ def generate_hash_id(value): if isinstance(value, dict): sorted_items = sorted(value.items()) - value = str(sorted_items) - if isinstance(value, str): - value = value.encode("utf-8") + key = str(sorted_items) + else: + key = value + if isinstance(key, str): + key = key.encode("utf-8") hasher = hashlib.sha256() - hasher.update(value) + hasher.update(key) + return hasher.hexdigest() @@ -73,12 +76,14 @@ def __init__( self, reader: SourceReaderABC, chain: KAGBuilderChain, - num_parallel: int = 4, + num_parallel: int = 2, + chain_level_num_paralle: int = 8, ckpt_dir: str = None, ): self.reader = reader self.chain = chain self.num_parallel = num_parallel + self.chain_level_num_paralle = chain_level_num_paralle if ckpt_dir is None: ckpt_dir = "./ckpt" self.ckpt_dir = ckpt_dir @@ -86,12 +91,12 @@ def __init__( os.makedirs(self.ckpt_dir, exist_ok=True) self.ckpt = CKPT(self.ckpt_dir) - print(self.ckpt._ckpt) def invoke(self, input): def process(chain, data, data_id): try: - result = chain.invoke(data) + + result = chain.invoke(data, max_workers=self.chain_level_num_paralle) return result, data_id except Exception: traceback.print_exc() @@ -99,6 +104,7 @@ def process(chain, data, data_id): self.ckpt.open() futures = [] + print(f"Processing {input}") with ThreadPoolExecutor(self.num_parallel) as executor: for item in self.reader.invoke(input): item_id = generate_hash_id(item) @@ -107,7 +113,10 @@ def process(chain, data, data_id): fut = executor.submit(process, self.chain, item, item_id) futures.append(fut) for future in tqdm( - as_completed(futures), total=len(futures), desc="Processing" + as_completed(futures), + total=len(futures), + desc="Progress", + position=0, ): result = future.result() if result is not None: diff --git a/kag/common/env.py b/kag/common/env.py index 9957d106..916726de 100644 --- a/kag/common/env.py +++ b/kag/common/env.py @@ -30,7 +30,6 @@ def get_rank(default=None): tf_config = parse_tf_config() if tf_config is None: - print(f"no RANK info in env/tf_config, use default value:{default}") return default num_master = get_role_number(tf_config, "master") diff --git a/kag/common/llm/mock_llm.py b/kag/common/llm/mock_llm.py index b4559026..1036e886 100644 --- a/kag/common/llm/mock_llm.py +++ b/kag/common/llm/mock_llm.py @@ -10,7 +10,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. - +import time import json from kag.interface import LLMClient @@ -21,6 +21,7 @@ def __init__(self): pass def match_input(self, prompt): + time.sleep(3) # mimic llm call if "You're a very effective entity extraction system" in prompt: return [ { diff --git a/kag/common/llm/openai_client.py b/kag/common/llm/openai_client.py index 6e01bbf0..a47acdf5 100644 --- a/kag/common/llm/openai_client.py +++ b/kag/common/llm/openai_client.py @@ -17,7 +17,8 @@ from kag.interface import LLMClient -# logging.basicConfig(level=logging.DEBUG) +logging.getLogger("openai").setLevel(logging.ERROR) +logging.getLogger("httpx").setLevel(logging.ERROR) logger = logging.getLogger(__name__) diff --git a/kag/examples/2wiki/builder/indexer.py b/kag/examples/2wiki/builder/indexer.py index 0220708f..5194e987 100644 --- a/kag/examples/2wiki/builder/indexer.py +++ b/kag/examples/2wiki/builder/indexer.py @@ -11,7 +11,7 @@ import logging from kag.common.registry import import_modules_from_path -from kag.builder.default_chain import DefaultUnstructuredBuilderChain +from kag.builder.runner import BuilderChainRunner logger = logging.getLogger(__name__) @@ -19,9 +19,8 @@ def buildKB(file_path): from kag.common.conf import KAG_CONFIG - chain_config = KAG_CONFIG.all_config["chain"] - chain = DefaultUnstructuredBuilderChain.from_config(chain_config) - chain.invoke(file_path=file_path, max_workers=10) + runner = BuilderChainRunner.from_config(KAG_CONFIG.all_config["runner"]) + runner.invoke(file_path) logger.info(f"\n\nbuildKB successfully for {file_path}\n\n") diff --git a/kag/examples/2wiki/kag_config.yaml b/kag/examples/2wiki/kag_config.yaml index 3b3240f5..a3f5673b 100644 --- a/kag/examples/2wiki/kag_config.yaml +++ b/kag/examples/2wiki/kag_config.yaml @@ -3,7 +3,7 @@ project: host_addr: http://127.0.0.1:8887 language: en namespace: TwoWiki - id: 2 + id: 1 vectorize_model: &vectorize_model_config @@ -11,6 +11,8 @@ vectorize_model: &vectorize_model_config type: bge vector_dimensions: 768 +vectorizer: *vectorize_model_config + llm: &llm_conf api_key: key @@ -21,31 +23,37 @@ llm: &llm_conf log: level: INFO - -chain: - extractor: - llm: *llm_conf - ner_prompt: - type: 2wiki_ner - std_prompt: - type: 2wiki_std - triple_prompt: - type: 2wiki_triple - type: kag +runner: + num_parallel: 10 + chain_level_num_paralle: 1 reader: - type: 2wiki - splitter: - split_length: 100000 - type: length - window_length: 0 - vectorizer: - type: batch - vectorize_model: *vectorize_model_config - post_processor: - type: base - similarity_threshold: 0.9 - writer: - type: kg + type: + 2wiki + chain: + type: unstructured + extractor: + llm: *llm_conf + ner_prompt: + type: 2wiki_ner + std_prompt: + type: 2wiki_std + triple_prompt: + type: 2wiki_triple + type: kag + parser: + type: dict + splitter: + split_length: 100000 + type: length + window_length: 0 + vectorizer: + type: batch + vectorize_model: *vectorize_model_config + post_processor: + type: base + similarity_threshold: 0.9 + writer: + type: kg lf_solver_pipeline: generator: diff --git a/kag/examples/hotpotqa/builder/indexer.py b/kag/examples/hotpotqa/builder/indexer.py index 32e06e4a..1e132a67 100644 --- a/kag/examples/hotpotqa/builder/indexer.py +++ b/kag/examples/hotpotqa/builder/indexer.py @@ -10,7 +10,8 @@ # or implied. import logging from kag.common.registry import import_modules_from_path -from kag.builder.default_chain import DefaultUnstructuredBuilderChain + +from kag.builder.runner import BuilderChainRunner logger = logging.getLogger(__name__) @@ -18,9 +19,8 @@ def buildKB(file_path): from kag.common.conf import KAG_CONFIG - chain_config = KAG_CONFIG.all_config["chain"] - chain = DefaultUnstructuredBuilderChain.from_config(chain_config) - chain.invoke(file_path=file_path, max_workers=10) + runner = BuilderChainRunner.from_config(KAG_CONFIG.all_config["runner"]) + runner.invoke(file_path) logger.info(f"\n\nbuildKB successfully for {file_path}\n\n") diff --git a/kag/examples/hotpotqa/kag_config.yaml b/kag/examples/hotpotqa/kag_config.yaml index 6987ff18..2b521935 100644 --- a/kag/examples/hotpotqa/kag_config.yaml +++ b/kag/examples/hotpotqa/kag_config.yaml @@ -21,31 +21,36 @@ llm: &llm_conf log: level: INFO - -chain: - extractor: - llm: *llm_conf - ner_prompt: - type: hotpotqa_ner - std_prompt: - type: hotpotqa_std - triple_prompt: - type: hotpotqa_triple - type: kag +runner: + num_parallel: 10 reader: - type: hotpotqa - splitter: - split_length: 100000 - type: length - window_length: 0 - vectorizer: - type: batch - vectorize_model: *vectorize_model_config - post_processor: - type: base - similarity_threshold: 0.9 - writer: - type: kg + type: + hotpotqa + chain: + type: unstructured + extractor: + llm: *llm_conf + ner_prompt: + type: hotpotqa_ner + std_prompt: + type: hotpotqa_std + triple_prompt: + type: hotpotqa_triple + type: kag + parser: + type: dict + splitter: + split_length: 100000 + type: length + window_length: 0 + vectorizer: + type: batch + vectorize_model: *vectorize_model_config + post_processor: + type: base + similarity_threshold: 0.9 + writer: + type: kg lf_solver_pipeline: generator: diff --git a/kag/examples/medicine/builder/indexer.py b/kag/examples/medicine/builder/indexer.py index b9356e1e..9f817b28 100644 --- a/kag/examples/medicine/builder/indexer.py +++ b/kag/examples/medicine/builder/indexer.py @@ -1,85 +1,27 @@ import os -from kag.interface import ( - MappingABC, - SourceReaderABC, - SinkWriterABC, - SplitterABC, - ExtractorABC, - VectorizerABC, -) +import copy from kag.common.conf import KAG_CONFIG -from kag.builder.default_chain import ( - DefaultStructuredBuilderChain, -) -from kag.common.registry import Registrable, import_modules_from_path -from kag.interface import KAGBuilderChain as BuilderChainABC - - -class SPOBuilderChain(BuilderChainABC): - def __init__( - self, - reader: SourceReaderABC, - mapping: MappingABC, - vectorizer: VectorizerABC, - writer: SinkWriterABC, - ): - self.reader = reader - self.mapping = mapping - self.vectorizer = vectorizer - self.writer = writer - - def build(self, **kwargs): - - self.mapping.add_field_mappings( - s_id_col="S", - p_type_col="P", - o_id_col="O", - ).add_sub_property_mapping("properties") - - return self.reader >> self.mapping >> self.vectorizer >> self.writer - - -class DiseaseBuilderChain(BuilderChainABC): - def __init__( - self, - reader: SourceReaderABC, - splitter: SplitterABC, - extractor: ExtractorABC, - vectorizer: VectorizerABC, - writer: SinkWriterABC, - ): - self.reader = reader - self.splitter = splitter - self.extractor = extractor - self.vectorizer = vectorizer - self.writer = writer - - def build(self, **kwargs): - return ( - self.reader - >> self.splitter - >> self.extractor - >> self.vectorizer - >> self.writer - ) +from kag.common.registry import import_modules_from_path +from kag.builder.runner import BuilderChainRunner def import_data(): pwd = os.path.dirname(__file__) - DefaultStructuredBuilderChain("HumanBodyPart").invoke( - file_path=os.path.join(pwd, "data/HumanBodyPart.csv") - ) - DefaultStructuredBuilderChain("HospitalDepartment").invoke( - file_path=os.path.join(pwd, "data/HospitalDepartment.csv") - ) - - extractor_chain = DiseaseBuilderChain.from_config( - KAG_CONFIG.all_config["extract_chain"] - ) - extractor_chain.invoke(file_path=os.path.join(pwd, "data/Disease.csv")) - - spo_chain = SPOBuilderChain.from_config(KAG_CONFIG.all_config["spo_chain"]) - spo_chain.invoke(file_path=os.path.join(pwd, "data/SPO.csv")) + spo_runner_config = KAG_CONFIG.all_config["spg_runner"] + for spg_type_name in ["HumanBodyPart", "HospitalDepartment"]: + runner_config = copy.deepcopy(spo_runner_config) + runner_config["chain"]["mapping"]["spg_type_name"] = spg_type_name + file_path = os.path.join(pwd, f"data/{spg_type_name}.csv") + runner = BuilderChainRunner.from_config(runner_config) + runner.invoke(file_path) + + extract_runner_config = KAG_CONFIG.all_config["extract_runner"] + extract_runner = BuilderChainRunner.from_config(extract_runner_config) + extract_runner.invoke(os.path.join(pwd, "data/Disease.csv")) + + spo_runner_config = KAG_CONFIG.all_config["spo_runner"] + spo_runner = BuilderChainRunner.from_config(spo_runner_config) + spo_runner.invoke(os.path.join(pwd, "data/SPO.csv")) if __name__ == "__main__": diff --git a/kag/examples/medicine/kag_config.yaml b/kag/examples/medicine/kag_config.yaml index 84c00ebd..9307beaa 100644 --- a/kag/examples/medicine/kag_config.yaml +++ b/kag/examples/medicine/kag_config.yaml @@ -25,44 +25,68 @@ log: level: INFO -spo_chain: +spg_runner: reader: - type: csv - output_type: Dict - mapping: - type: spo - vectorizer: - type: batch - vectorize_model: *vectorize_model_config - writer: - type: kg + type: + csv + chain: + type: structured + mapping: + type: spg + writer: + type: kg + ckpt_dir: ./spg-runner-ckpt + +spo_runner: + reader: + type: + csv + chain: + type: structured + mapping: + type: spo + s_id_col: S + p_type_col: P + o_id_col: O + sub_property_col: properties + + writer: + type: kg + ckpt_dir: ./spo-runner-ckpt -extract_chain: +extract_runner: + num_parallel: 10 reader: - type: csv - output_type: Chunk - id_col: idx - name_col: title - content_col: text - splitter: - split_length: 100000 - type: length - window_length: 0 - extractor: - llm: *llm_conf - ner_prompt: - type: example_medical_ner - std_prompt: - type: example_medical_std - triple_prompt: - type: example_medical_triple - type: kag - vectorizer: - type: batch - vectorize_model: *vectorize_model_config - writer: - type: kg - + type: + csv + chain: + type: unstructured + parser: + type: dict + id_col: idx + name_col: title + content_col: text + splitter: + split_length: 100000 + type: length + window_length: 0 + extractor: + llm: *llm_conf + ner_prompt: + type: example_medical_ner + std_prompt: + type: example_medical_std + triple_prompt: + type: example_medical_triple + type: kag + vectorizer: + type: batch + vectorize_model: *vectorize_model_config + writer: + type: kg + ckpt_dir: ./extract-runner-ckpt + + lf_solver_pipeline: generator: generate_prompt: diff --git a/kag/examples/musique/builder/indexer.py b/kag/examples/musique/builder/indexer.py index 3dd9bb6d..5dea072d 100644 --- a/kag/examples/musique/builder/indexer.py +++ b/kag/examples/musique/builder/indexer.py @@ -9,8 +9,9 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. import logging -from kag.common.registry import Registrable, import_modules_from_path -from kag.builder.default_chain import DefaultUnstructuredBuilderChain +from kag.common.registry import import_modules_from_path + +from kag.builder.runner import BuilderChainRunner logger = logging.getLogger(__name__) @@ -18,9 +19,8 @@ def buildKB(file_path): from kag.common.conf import KAG_CONFIG - chain_config = KAG_CONFIG.all_config["chain"] - chain = DefaultUnstructuredBuilderChain.from_config(chain_config) - chain.invoke(file_path=file_path, max_workers=10) + runner = BuilderChainRunner.from_config(KAG_CONFIG.all_config["runner"]) + runner.invoke(file_path) logger.info(f"\n\nbuildKB successfully for {file_path}\n\n") diff --git a/kag/examples/musique/kag_config.yaml b/kag/examples/musique/kag_config.yaml index 7c3eab37..dcfadc28 100644 --- a/kag/examples/musique/kag_config.yaml +++ b/kag/examples/musique/kag_config.yaml @@ -2,7 +2,7 @@ project: biz_scene: default host_addr: http://127.0.0.1:8887 language: en - namespace: MuSiQue + namespace: Musique id: 1 @@ -21,31 +21,37 @@ llm: &llm_conf log: level: INFO - -chain: - extractor: - llm: *llm_conf - ner_prompt: - type: musique_ner - std_prompt: - type: musique_std - triple_prompt: - type: musique_triple - type: kag +runner: + num_parallel: 10 + chain_level_num_paralle: 1 reader: - type: musique - splitter: - split_length: 100000 - type: length - window_length: 0 - vectorizer: - type: batch - vectorize_model: *vectorize_model_config - post_processor: - type: base - similarity_threshold: 0.9 - writer: - type: kg + type: + musique + chain: + type: unstructured + extractor: + llm: *llm_conf + ner_prompt: + type: musique_ner + std_prompt: + type: musique_std + triple_prompt: + type: musique_triple + type: kag + parser: + type: dict + splitter: + split_length: 100000 + type: length + window_length: 0 + vectorizer: + type: batch + vectorize_model: *vectorize_model_config + post_processor: + type: base + similarity_threshold: 0.9 + writer: + type: kg lf_solver_pipeline: generator: diff --git a/kag/examples/riskmining/builder/indexer.py b/kag/examples/riskmining/builder/indexer.py index 8c57ea79..40e99812 100644 --- a/kag/examples/riskmining/builder/indexer.py +++ b/kag/examples/riskmining/builder/indexer.py @@ -17,6 +17,7 @@ from kag.builder.component.reader.csv_reader import CSVReader from kag.common.conf import KAG_CONFIG from kag.interface import KAGBuilderChain as BuilderChainABC +from kag.builder.runner import BuilderChainRunner class RiskMiningEntityChain(BuilderChainABC): @@ -25,12 +26,11 @@ def __init__(self, spg_type_name: str): self.spg_type_name = spg_type_name def build(self, **kwargs): - source = CSVReader(output_type="Dict") mapping = SPGTypeMapping(spg_type_name=self.spg_type_name) vectorizer = BatchVectorizer.from_config(KAG_CONFIG.all_config["vectorizer"]) sink = KGWriter() - chain = source >> mapping >> vectorizer >> sink + chain = mapping >> vectorizer >> sink return chain @@ -40,7 +40,6 @@ def __init__(self, spg_type_name: str): self.spg_type_name = spg_type_name def build(self, **kwargs): - source = CSVReader(output_type="Dict") subject_name, relation, object_name = self.spg_type_name.split("_") mapping = ( RelationMapping(subject_name, relation, object_name) @@ -48,7 +47,7 @@ def build(self, **kwargs): .add_dst_id_mapping("dst") ) sink = KGWriter() - return source >> mapping >> sink + return mapping >> sink class RiskMiningPersonFundTransPersonChain(RiskMiningRelationChain): @@ -56,7 +55,6 @@ def __init__(self, spg_type_name: str): super().__init__(spg_type_name) def build(self, **kwargs): - source = CSVReader(output_type="Dict") subject_name, relation, object_name = self.spg_type_name.split("_") mapping = ( RelationMapping(subject_name, relation, object_name) @@ -66,47 +64,82 @@ def build(self, **kwargs): .add_sub_property_mapping("transAmt", "transAmt") ) sink = KGWriter() - return source >> mapping >> sink + return mapping >> sink def import_data(): file_path = os.path.dirname(__file__) - RiskMiningEntityChain(spg_type_name="Cert").invoke( - os.path.join(file_path, "data/Cert.csv") - ) - RiskMiningEntityChain(spg_type_name="App").invoke( - os.path.join(file_path, "data/App.csv") - ) - RiskMiningEntityChain(spg_type_name="Company").invoke( - os.path.join(file_path, "data/Company.csv") - ) - RiskMiningRelationChain(spg_type_name="Company_hasCert_Cert").invoke( - os.path.join(file_path, "data/Company_hasCert_Cert.csv") - ) - RiskMiningEntityChain(spg_type_name="Device").invoke( - os.path.join(file_path, "data/Device.csv") - ) - RiskMiningPersonFundTransPersonChain( - spg_type_name="Person_fundTrans_Person" - ).invoke(os.path.join(file_path, "data/Person_fundTrans_Person.csv")) - RiskMiningRelationChain(spg_type_name="Person_hasCert_Cert").invoke( - os.path.join(file_path, "data/Person_hasCert_Cert.csv") - ) - RiskMiningRelationChain(spg_type_name="Person_hasDevice_Device").invoke( - os.path.join(file_path, "data/Person_hasDevice_Device.csv") - ) - RiskMiningRelationChain(spg_type_name="Person_holdShare_Company").invoke( - os.path.join(file_path, "data/Person_holdShare_Company.csv") - ) - RiskMiningEntityChain(spg_type_name="Person").invoke( - os.path.join(file_path, "data/Person.csv") - ) - RiskMiningEntityChain(spg_type_name="TaxOfRiskApp").invoke( - os.path.join(file_path, "data/TaxOfRiskApp.csv") - ) - RiskMiningEntityChain(spg_type_name="TaxOfRiskUser").invoke( - os.path.join(file_path, "data/TaxOfRiskUser.csv") - ) + for spg_type_name in [ + "App", + "Cert", + "Company", + "Device", + "Person", + "TaxOfRiskApp", + "TaxOfRiskUser", + ]: + file_name = os.path.join(file_path, f"data/{spg_type_name}.csv") + chain = RiskMiningEntityChain(spg_type_name=spg_type_name) + runner = BuilderChainRunner( + reader=CSVReader(), + chain=chain, + ) + runner.invoke(file_name) + + for spg_type_name in [ + "Company_hasCert_Cert", + "Person_fundTrans_Person", + "Person_hasCert_Cert", + "Person_hasDevice_Device", + "Person_holdShare_Company", + ]: + file_name = os.path.join(file_path, f"data/{spg_type_name}.csv") + if spg_type_name == "Person_fundTrans_Person": + chain = RiskMiningPersonFundTransPersonChain(spg_type_name=spg_type_name) + else: + chain = RiskMiningRelationChain(spg_type_name=spg_type_name) + runner = BuilderChainRunner( + reader=CSVReader(), + chain=chain, + ) + runner.invoke(file_name) + + # RiskMiningEntityChain(spg_type_name="Cert").invoke( + # os.path.join(file_path, "data/Cert.csv") + # ) + # RiskMiningEntityChain(spg_type_name="App").invoke( + # os.path.join(file_path, "data/App.csv") + # ) + # RiskMiningEntityChain(spg_type_name="Company").invoke( + # os.path.join(file_path, "data/Company.csv") + # ) + # RiskMiningRelationChain(spg_type_name="Company_hasCert_Cert").invoke( + # os.path.join(file_path, "data/Company_hasCert_Cert.csv") + # ) + # RiskMiningEntityChain(spg_type_name="Device").invoke( + # os.path.join(file_path, "data/Device.csv") + # ) + # RiskMiningPersonFundTransPersonChain( + # spg_type_name="Person_fundTrans_Person" + # ).invoke(os.path.join(file_path, "data/Person_fundTrans_Person.csv")) + # RiskMiningRelationChain(spg_type_name="Person_hasCert_Cert").invoke( + # os.path.join(file_path, "data/Person_hasCert_Cert.csv") + # ) + # RiskMiningRelationChain(spg_type_name="Person_hasDevice_Device").invoke( + # os.path.join(file_path, "data/Person_hasDevice_Device.csv") + # ) + # RiskMiningRelationChain(spg_type_name="Person_holdShare_Company").invoke( + # os.path.join(file_path, "data/Person_holdShare_Company.csv") + # ) + # RiskMiningEntityChain(spg_type_name="Person").invoke( + # os.path.join(file_path, "data/Person.csv") + # ) + # RiskMiningEntityChain(spg_type_name="TaxOfRiskApp").invoke( + # os.path.join(file_path, "data/TaxOfRiskApp.csv") + # ) + # RiskMiningEntityChain(spg_type_name="TaxOfRiskUser").invoke( + # os.path.join(file_path, "data/TaxOfRiskUser.csv") + # ) if __name__ == "__main__": diff --git a/kag/examples/riskmining/kag_config.yaml b/kag/examples/riskmining/kag_config.yaml index 73138bad..950f756d 100644 --- a/kag/examples/riskmining/kag_config.yaml +++ b/kag/examples/riskmining/kag_config.yaml @@ -1,24 +1,10 @@ -llm: - api_key: - base_url: https://api.deepseek.com - model: deepseek-chat - type: maas -log: - level: INFO project: biz_scene: default host_addr: http://127.0.0.1:8887 id: '5' language: zh namespace: RiskMining - -lf_solver_pipeline: - reasoner: - lf_planner: - type: base - logic_form_plan_prompt: - type: riskmining_lf_plan - + vectorize_model: &vectorize_model_config type: mock vector_dimensions: 768 @@ -26,3 +12,19 @@ vectorize_model: &vectorize_model_config vectorizer: type: batch vectorize_model: *vectorize_model_config +log: + level: INFO + +llm: + api_key: key + base_url: https://api.deepseek.com + model: deepseek-chat + type: maas + +lf_solver_pipeline: + reasoner: + lf_planner: + type: base + logic_form_plan_prompt: + type: riskmining_lf_plan + diff --git a/kag/examples/supplychain/builder/data/Product.csv b/kag/examples/supplychain/builder/data/Product.csv index 302f3861..ec34cadc 100644 --- a/kag/examples/supplychain/builder/data/Product.csv +++ b/kag/examples/supplychain/builder/data/Product.csv @@ -16,11 +16,6 @@ id,belongToIndustry,hasSupplyChain 轮胎与橡胶-轮胎-斜交轮胎,非日常生活消费品-汽车与汽车零部件-汽车零配件-轮胎与橡胶,"建筑、农用机械与重型卡车-港口机械,汽车-摩托车制造-三轮摩托车,汽车-摩托车制造-二轮摩托车,机动车贸易-机动车零配件零售,建筑、农用机械与重型卡车-农业机械-农机具及其零部件-收割机,商业服务-综合支持服务-加工劳务-轮胎分装,建筑、农用机械与重型卡车-工程机械-筑养路机械,建筑、农用机械与重型卡车-工程机械-起重装卸机械,建筑、农用机械与重型卡车-农业机械-农机具及其零部件-农用车辆,建筑、农用机械与重型卡车-农业机械-农机具及其零部件-农用车辆-拖拉机,建筑、农用机械与重型卡车-工程机械-混凝土机械" 建筑、农用机械与重型卡车,工业-资本品-机械制造-建筑、农用机械与重型卡车,"商业服务-综合支持服务-安装劳务,消费信贷-租赁服务,消费信贷-租赁服务-融资租赁,消费信贷-租赁服务-融资租赁-机械产品融资租赁,消费信贷-租赁服务-经营性租赁,消费信贷-租赁服务-经营性租赁-机械产品经营租赁,工业设备和产品贸易-其他工业机械和产品经销商,公路与铁路运输,交通基本设施,工业设备和产品贸易,商业服务-综合支持服务-安装劳务-其他安装劳务,煤与消费用燃料,建筑与工程,建筑与工程-其他建筑与工程承包" 建筑、农用机械与重型卡车-港口机械,工业-资本品-机械制造-建筑、农用机械与重型卡车,"交通基本设施-海港与服务-港口服务-港口物流,交通基本设施-海港与服务,交通基本设施-海港与服务-港口服务,交通基本设施-海港与服务-港口服务-其他港口服务,交通基本设施-海港与服务-港口服务-港口物流-装卸业务,交通基本设施-海港与服务-港口服务-港口物流-堆存业务,消费信贷-租赁服务,消费信贷-租赁服务-融资租赁,消费信贷-租赁服务-融资租赁-机械产品融资租赁,消费信贷-租赁服务-经营性租赁,消费信贷-租赁服务-经营性租赁-机械产品经营租赁" -化工商品贸易,商贸-资本品商贸-工业资本品贸易-化工商品贸易, -化工商品贸易-化工产品贸易,商贸-资本品商贸-工业资本品贸易-化工商品贸易, -化工商品贸易-化工产品贸易-橡塑制品贸易,商贸-资本品商贸-工业资本品贸易-化工商品贸易, -机动车贸易,商贸-消费品商贸-非日常消费品商贸-机动车贸易, -机动车贸易-汽车贸易,商贸-消费品商贸-非日常消费品商贸-机动车贸易, 汽车,非日常生活消费品-汽车与汽车零部件-汽车-汽车,"机动车贸易-汽车贸易,公路与铁路运输-陆运,公路与铁路运输-陆运-汽车租赁,特殊消费者服务-商品预订,商业服务-综合支持服务-维护服务-交通工具维护服务-机动车维修服务,保险-财产与意外伤害保险-机动车辆险,石油与天然气-石油与天然气的炼制和营销-石油炼制产品-成品油-汽油,商业服务-综合支持服务-维护服务-交通工具维护服务,机动车贸易-其他机动车贸易,调查和咨询服务-专业技术服务-其他技术服务,商业服务,机动车贸易,机动车贸易-汽车贸易-二手车零售,综合金融服务-特殊金融服务-汽车金融服务,调查和咨询服务-专业技术服务,商业服务-综合支持服务-汽车美容服务-洗车服务,商业服务-综合支持服务-汽车试驾测试服务,航空货运与物流-物流服务-汽车物流服务,互联网软件与服务-电商平台提供商-汽车类信息服务,商业服务-综合支持服务-汽车美容服务" 轮胎与橡胶,非日常生活消费品-汽车与汽车零部件-汽车零配件-轮胎与橡胶,"汽车-汽车制造,建筑、农用机械与重型卡车-环卫机械-环卫车,机动车贸易-机动车零配件零售,商业服务-综合支持服务-维护服务-交通工具维护服务-机动车维修服务,建筑、农用机械与重型卡车-重卡及专用车,汽车,建筑、农用机械与重型卡车-工程机械-工业车辆,建筑、农用机械与重型卡车-农业机械-农机具及其零部件-农用车辆" 轮胎与橡胶-轮胎,非日常生活消费品-汽车与汽车零部件-汽车零配件-轮胎与橡胶,"汽车-汽车制造,建筑、农用机械与重型卡车-环卫机械-环卫车,消闲用品-自行车,机动车贸易-机动车零配件零售,建筑、农用机械与重型卡车-重卡及专用车,汽车,建筑、农用机械与重型卡车-工程机械-工业车辆,商业服务-综合支持服务-加工劳务-轮胎分装,建筑、农用机械与重型卡车-农业机械-农机具及其零部件-农用车辆,建筑、农用机械与重型卡车-机场服务设备-机场专用车辆" @@ -41,13 +36,7 @@ id,belongToIndustry,hasSupplyChain 建筑、农用机械与重型卡车-环卫机械-环卫车,工业-资本品-机械制造-建筑、农用机械与重型卡车,"消费信贷-租赁服务,消费信贷-租赁服务-融资租赁,消费信贷-租赁服务-融资租赁-机械产品融资租赁,消费信贷-租赁服务-经营性租赁,消费信贷-租赁服务-经营性租赁-机械产品经营租赁,机动车贸易-其他机动车贸易,机动车贸易,商业服务-综合支持服务-清洁服务-道路清洁服务" 消闲用品,非日常生活消费品-耐用消费品与服装-休闲设备与用品-消闲用品,"赌场与赌博,消闲设施,消闲设施-其他消闲设施" 消闲用品-自行车,非日常生活消费品-耐用消费品与服装-休闲设备与用品-消闲用品,"保险-财产与意外伤害保险-非机动车辆险,商业服务-综合支持服务-维护服务-交通工具维护服务,生活消费品贸易" -机动车贸易,商贸-消费品商贸-非日常消费品商贸-机动车贸易, -机动车贸易-机动车零配件零售,商贸-消费品商贸-非日常消费品商贸-机动车贸易, -商业服务,工业-商业和专业服务-商业服务与商业用品-商业服务, 商业服务-综合支持服务,工业-商业和专业服务-商业服务与商业用品-商业服务,"商业服务-环境与设施服务,商业服务-环境与设施服务-其他环境工程服务,信息技术服务-信息科技咨询与其他服务,信息技术服务-信息科技咨询与其他服务-其他信息科技服务,互联网软件与服务-电商平台提供商-其他电子商务服务,互联网软件与服务-电商平台提供商" -商业服务-综合支持服务-维护服务,工业-商业和专业服务-商业服务与商业用品-商业服务, -商业服务-综合支持服务-维护服务-交通工具维护服务,工业-商业和专业服务-商业服务与商业用品-商业服务, -商业服务-综合支持服务-维护服务-交通工具维护服务-机动车维修服务,工业-商业和专业服务-商业服务与商业用品-商业服务, 保险,金融-保险-保险-保险,调查和咨询服务-咨询服务-金融中介服务-保险公估服务 保险-财产与意外伤害保险,金融-保险-保险-保险,"保险-保险经纪服务,保险-再保险,调查和咨询服务-咨询服务-金融中介服务-保险公估服务" 保险-财产与意外伤害保险-机动车辆险,金融-保险-保险-保险,保险-保险经纪服务 @@ -66,10 +55,7 @@ id,belongToIndustry,hasSupplyChain 建筑、农用机械与重型卡车,工业-资本品-机械制造-建筑、农用机械与重型卡车,"商业服务-综合支持服务-安装劳务,消费信贷-租赁服务,消费信贷-租赁服务-融资租赁,消费信贷-租赁服务-融资租赁-机械产品融资租赁,消费信贷-租赁服务-经营性租赁,消费信贷-租赁服务-经营性租赁-机械产品经营租赁,工业设备和产品贸易-其他工业机械和产品经销商,公路与铁路运输,交通基本设施,工业设备和产品贸易,商业服务-综合支持服务-安装劳务-其他安装劳务,煤与消费用燃料,建筑与工程,建筑与工程-其他建筑与工程承包" 建筑、农用机械与重型卡车-工程机械,工业-资本品-机械制造-建筑、农用机械与重型卡车,"建筑与工程-水利工程-疏浚,工业设备和产品贸易-工业机械贸易-农业与工程机械贸易,消费信贷-租赁服务,消费信贷-租赁服务-融资租赁,消费信贷-租赁服务-融资租赁-机械产品融资租赁,消费信贷-租赁服务-经营性租赁,消费信贷-租赁服务-经营性租赁-机械产品经营租赁,石油与天然气-石油与天然气的炼制和营销-石油炼制产品-成品油-柴油,建筑与工程-燃气管道工程-城市燃气管道建设-燃气管道安装服务,建筑与工程-专项建设工程-爆破工程,建筑与工程-专项建设工程,建筑与工程-房屋建筑工程,建筑与工程-专项建设工程-其他专项建设工程,建筑与工程-工业建筑工程-化工工程,建筑与工程-交通工程-铁路工程,建筑与工程-交通工程-公路工程,建筑与工程-交通工程-桥梁工程,建筑与工程-交通工程-隧道工程,调查和咨询服务-专业技术服务-矿山开发服务-矿石开采及冶炼,建筑与工程-水利工程-节水灌溉工程,建筑与工程-工业建筑工程-冶金工程,建筑与工程-医疗建筑工程,建筑与工程-医疗建筑工程-医疗专业工程,建筑与工程-照明工程,建筑与工程-专项建设工程-供暖工程,建筑与工程-工业建筑工程,建筑与工程-专项建设工程-体育设施工程,建筑与工程-专项建设工程-混凝土拆卸工程,建筑与工程-专项建设工程-防腐涂装工程,建筑与工程-专项建设工程-建筑保温工程,建筑与工程-专项建设工程-供暖工程-地暖工程,建筑与工程-水利工程-给排水工程,建筑与工程-水利工程-给排水工程-供水管道安装工程,建筑与工程-燃气管道工程-长距离管道建设,建筑与工程-燃气管道工程-城市燃气管道建设,建筑与工程-专项建设工程-古建筑修复工程,建筑与工程-专项建设工程-气膜建筑工程,建筑与工程-专项建设工程-基坑支护工程,建筑与工程-专项建设工程-斜坡防护工程,建筑与工程,建筑与工程-交通工程,建筑与工程-水利工程,建筑与工程-电力工程,建筑与工程-燃气管道工程,建筑与工程-工业建筑工程-煤炭工程,建筑与工程-其他建筑与工程承包,建筑与工程-专项建设工程-强夯地基工程,建筑与工程-专项建设工程-建筑防水工程" 建筑、农用机械与重型卡车-工程机械-工业车辆,工业-资本品-机械制造-建筑、农用机械与重型卡车,"交通基本设施-海港与服务-港口服务-港口物流,工业设备和产品贸易-工业机械贸易-农业与工程机械贸易,消费信贷-租赁服务,消费信贷-租赁服务-融资租赁,消费信贷-租赁服务-融资租赁-机械产品融资租赁,消费信贷-租赁服务-经营性租赁,消费信贷-租赁服务-经营性租赁-机械产品经营租赁,房地产开发与经营-多样化房地产业务,石油与天然气-石油与天然气的炼制和营销-石油炼制产品-成品油-柴油,房地产开发与经营-土地开发,建筑材料-基础材料-集料-砂石,建筑与工程-工业建筑工程-化工工程,房地产开发与经营,建筑与工程-交通工程-公路工程,建筑与工程-交通工程-桥梁工程,消费信贷-租赁服务-经营性租赁-机械产品经营租赁-叉车租赁,房地产开发与经营-土地开发-土地一级开发,建筑与工程-交通工程" -商业服务,工业-商业和专业服务-商业服务与商业用品-商业服务, 商业服务-综合支持服务,工业-商业和专业服务-商业服务与商业用品-商业服务,"商业服务-环境与设施服务,商业服务-环境与设施服务-其他环境工程服务,信息技术服务-信息科技咨询与其他服务,信息技术服务-信息科技咨询与其他服务-其他信息科技服务,互联网软件与服务-电商平台提供商-其他电子商务服务,互联网软件与服务-电商平台提供商" -商业服务-综合支持服务-加工劳务,工业-商业和专业服务-商业服务与商业用品-商业服务, -商业服务-综合支持服务-加工劳务-轮胎分装,工业-商业和专业服务-商业服务与商业用品-商业服务, 建筑、农用机械与重型卡车,工业-资本品-机械制造-建筑、农用机械与重型卡车,"商业服务-综合支持服务-安装劳务,消费信贷-租赁服务,消费信贷-租赁服务-融资租赁,消费信贷-租赁服务-融资租赁-机械产品融资租赁,消费信贷-租赁服务-经营性租赁,消费信贷-租赁服务-经营性租赁-机械产品经营租赁,工业设备和产品贸易-其他工业机械和产品经销商,公路与铁路运输,交通基本设施,工业设备和产品贸易,商业服务-综合支持服务-安装劳务-其他安装劳务,煤与消费用燃料,建筑与工程,建筑与工程-其他建筑与工程承包" 建筑、农用机械与重型卡车-工程机械,工业-资本品-机械制造-建筑、农用机械与重型卡车,"建筑与工程-水利工程-疏浚,工业设备和产品贸易-工业机械贸易-农业与工程机械贸易,消费信贷-租赁服务,消费信贷-租赁服务-融资租赁,消费信贷-租赁服务-融资租赁-机械产品融资租赁,消费信贷-租赁服务-经营性租赁,消费信贷-租赁服务-经营性租赁-机械产品经营租赁,石油与天然气-石油与天然气的炼制和营销-石油炼制产品-成品油-柴油,建筑与工程-燃气管道工程-城市燃气管道建设-燃气管道安装服务,建筑与工程-专项建设工程-爆破工程,建筑与工程-专项建设工程,建筑与工程-房屋建筑工程,建筑与工程-专项建设工程-其他专项建设工程,建筑与工程-工业建筑工程-化工工程,建筑与工程-交通工程-铁路工程,建筑与工程-交通工程-公路工程,建筑与工程-交通工程-桥梁工程,建筑与工程-交通工程-隧道工程,调查和咨询服务-专业技术服务-矿山开发服务-矿石开采及冶炼,建筑与工程-水利工程-节水灌溉工程,建筑与工程-工业建筑工程-冶金工程,建筑与工程-医疗建筑工程,建筑与工程-医疗建筑工程-医疗专业工程,建筑与工程-照明工程,建筑与工程-专项建设工程-供暖工程,建筑与工程-工业建筑工程,建筑与工程-专项建设工程-体育设施工程,建筑与工程-专项建设工程-混凝土拆卸工程,建筑与工程-专项建设工程-防腐涂装工程,建筑与工程-专项建设工程-建筑保温工程,建筑与工程-专项建设工程-供暖工程-地暖工程,建筑与工程-水利工程-给排水工程,建筑与工程-水利工程-给排水工程-供水管道安装工程,建筑与工程-燃气管道工程-长距离管道建设,建筑与工程-燃气管道工程-城市燃气管道建设,建筑与工程-专项建设工程-古建筑修复工程,建筑与工程-专项建设工程-气膜建筑工程,建筑与工程-专项建设工程-基坑支护工程,建筑与工程-专项建设工程-斜坡防护工程,建筑与工程,建筑与工程-交通工程,建筑与工程-水利工程,建筑与工程-电力工程,建筑与工程-燃气管道工程,建筑与工程-工业建筑工程-煤炭工程,建筑与工程-其他建筑与工程承包,建筑与工程-专项建设工程-强夯地基工程,建筑与工程-专项建设工程-建筑防水工程" 建筑、农用机械与重型卡车-工程机械-筑养路机械,工业-资本品-机械制造-建筑、农用机械与重型卡车,"工业设备和产品贸易-工业机械贸易-农业与工程机械贸易,消费信贷-租赁服务,消费信贷-租赁服务-融资租赁,消费信贷-租赁服务-融资租赁-机械产品融资租赁,消费信贷-租赁服务-经营性租赁,消费信贷-租赁服务-经营性租赁-机械产品经营租赁,石油与天然气-石油与天然气的炼制和营销-石油炼制产品-成品油-柴油,建筑与工程-交通工程-公路工程,建筑与工程-交通工程-桥梁工程,建筑与工程-交通工程" diff --git a/kag/examples/supplychain/builder/indexer.py b/kag/examples/supplychain/builder/indexer.py index 0d7bc4b1..93385d80 100644 --- a/kag/examples/supplychain/builder/indexer.py +++ b/kag/examples/supplychain/builder/indexer.py @@ -23,6 +23,7 @@ from knext.search.client import SearchClient from kag.interface import KAGBuilderChain as BuilderChainABC from knext.search.client import SearchClient +from kag.builder.runner import BuilderChainRunner def company_link_func(prop_value, node): @@ -38,11 +39,10 @@ def company_link_func(prop_value, node): class SupplyChainPersonChain(BuilderChainABC): def __init__(self, spg_type_name: str): - super().__init__() + # super().__init__() self.spg_type_name = spg_type_name def build(self, **kwargs): - source = CSVReader(output_type="Dict") mapping = ( SPGTypeMapping(spg_type_name=self.spg_type_name) .add_property_mapping("name", "name") @@ -56,7 +56,7 @@ def build(self, **kwargs): ) vectorizer = BatchVectorizer.from_config(KAG_CONFIG.all_config["vectorizer"]) sink = KGWriter() - return source >> mapping >> vectorizer >> sink + return mapping >> vectorizer >> sink class SupplyChainCompanyFundTransCompanyChain(BuilderChainABC): @@ -65,7 +65,6 @@ def __init__(self, spg_type_name: str): self.spg_type_name = spg_type_name def build(self, **kwargs): - source = CSVReader(output_type="Dict") subject_name, relation, object_name = self.spg_type_name.split("_") date_process_op = FundDateProcessComponent() mapping = ( @@ -77,12 +76,13 @@ def build(self, **kwargs): ) vectorizer = BatchVectorizer.from_config(KAG_CONFIG.all_config["vectorizer"]) sink = KGWriter() - return source >> date_process_op >> mapping >> vectorizer >> sink + return date_process_op >> mapping >> vectorizer >> sink -class SupplyChainDefaulStructuredBuilderChain(DefaultStructuredBuilderChain): - def __init__(self, spg_type_name: str, **kwargs): - super().__init__(spg_type_name, **kwargs) +class SupplyChainDefaulStructuredBuilderChain(BuilderChainABC): + def __init__(self, spg_type_name: str): + super().__init__() + self.spg_type_name = spg_type_name def build(self, **kwargs): """ @@ -94,17 +94,16 @@ def build(self, **kwargs): Returns: chain: The constructed processing chain. """ - source = CSVReader(output_type="Dict") mapping = SPGTypeMapping(spg_type_name=self.spg_type_name) sink = KGWriter() vectorizer = BatchVectorizer.from_config(KAG_CONFIG.all_config["vectorizer"]) - chain = source >> mapping >> vectorizer >> sink + chain = mapping >> vectorizer >> sink return chain class SupplyChainEventBuilderChain(DefaultStructuredBuilderChain): def __init__(self, spg_type_name: str, **kwargs): - super().__init__(spg_type_name, **kwargs) + self.spg_type_name = spg_type_name def build(self, **kwargs): """ @@ -116,47 +115,53 @@ def build(self, **kwargs): Returns: chain: The constructed processing chain. """ - source = CSVReader(output_type="Dict") + mapping = SPGTypeMapping(spg_type_name=self.spg_type_name) sink = EventKGWriter() vectorizer = BatchVectorizer.from_config(KAG_CONFIG.all_config["vectorizer"]) - chain = source >> mapping >> vectorizer >> sink + chain = mapping >> vectorizer >> sink return chain def import_data(): file_path = os.path.dirname(__file__) - SupplyChainDefaulStructuredBuilderChain(spg_type_name="TaxOfCompanyEvent").invoke( - file_path=os.path.join(file_path, "data/TaxOfCompanyEvent.csv") - ) - SupplyChainDefaulStructuredBuilderChain(spg_type_name="TaxOfProdEvent").invoke( - file_path=os.path.join(file_path, "data/TaxOfProdEvent.csv") - ) - SupplyChainDefaulStructuredBuilderChain(spg_type_name="Trend").invoke( - file_path=os.path.join(file_path, "data/Trend.csv") - ) - SupplyChainDefaulStructuredBuilderChain(spg_type_name="Industry").invoke( - file_path=os.path.join(file_path, "data/Industry.csv") - ) - SupplyChainDefaulStructuredBuilderChain(spg_type_name="Product").invoke( - file_path=os.path.join(file_path, "data/Product.csv") - ) - SupplyChainDefaulStructuredBuilderChain(spg_type_name="Company").invoke( - file_path=os.path.join(file_path, "data/Company.csv") - ) - SupplyChainDefaulStructuredBuilderChain(spg_type_name="Index").invoke( - file_path=os.path.join(file_path, "data/Index.csv") + for spg_type_name in [ + "TaxOfCompanyEvent", + "TaxOfProdEvent", + "Trend", + "Industry", + "Product", + "Company", + "Index", + "Person", + ]: + file_name = os.path.join(file_path, f"data/{spg_type_name}.csv") + if spg_type_name == "Person": + chain = SupplyChainPersonChain(spg_type_name=spg_type_name) + else: + chain = SupplyChainDefaulStructuredBuilderChain(spg_type_name=spg_type_name) + runner = BuilderChainRunner( + num_parallel=4, + reader=CSVReader(), + chain=chain, + ) + runner.invoke(file_name) + + chain = SupplyChainCompanyFundTransCompanyChain( + spg_type_name="Company_fundTrans_Company" ) - SupplyChainPersonChain(spg_type_name="Person").invoke( - file_path=os.path.join(file_path, "data/Person.csv") + runner = BuilderChainRunner( + reader=CSVReader(), + chain=chain, ) + runner.invoke(os.path.join(file_path, "data/Company_fundTrans_Company.csv")) - SupplyChainCompanyFundTransCompanyChain( - spg_type_name="Company_fundTrans_Company" - ).invoke(file_path=os.path.join(file_path, "data/Company_fundTrans_Company.csv")) - SupplyChainEventBuilderChain(spg_type_name="ProductChainEvent").invoke( - file_path=os.path.join(file_path, "data/ProductChainEvent.csv") + chain = SupplyChainEventBuilderChain(spg_type_name="ProductChainEvent") + runner = BuilderChainRunner( + reader=CSVReader(), + chain=chain, ) + runner.invoke(os.path.join(file_path, "data/ProductChainEvent.csv")) if __name__ == "__main__": diff --git a/kag/examples/supplychain/kag_config.yaml b/kag/examples/supplychain/kag_config.yaml index 9193ccab..54fa7ec5 100644 --- a/kag/examples/supplychain/kag_config.yaml +++ b/kag/examples/supplychain/kag_config.yaml @@ -14,7 +14,7 @@ log: project: biz_scene: default host_addr: http://127.0.0.1:8887 - id: '6' + id: '2' language: zh namespace: SupplyChain vectorize_model: &id001 diff --git a/kag/interface/builder/builder_chain_abc.py b/kag/interface/builder/builder_chain_abc.py index b5879353..e3973ac4 100644 --- a/kag/interface/builder/builder_chain_abc.py +++ b/kag/interface/builder/builder_chain_abc.py @@ -9,21 +9,36 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. - from typing import List + +from concurrent.futures import ThreadPoolExecutor, as_completed from kag.common.registry import Registrable from knext.builder.builder_chain_abc import BuilderChainABC class KAGBuilderChain(BuilderChainABC, Registrable): - def invoke(self, file_path, **kwargs): + def invoke(self, file_path, max_workers=10, **kwargs): def execute_node(node, inputs: List[str]): - result = [] - for item in inputs: - res = node.invoke(item) - result.extend(res) - return result + node_name = type(node).__name__.split(".")[-1] + with ThreadPoolExecutor(max_workers) as inner_executor: + inner_futures = [ + inner_executor.submit(node.invoke, inp) for inp in inputs + ] + result = [] + from tqdm import tqdm + + for inner_future in tqdm( + as_completed(inner_futures), + total=len(inner_futures), + desc=f"[{node_name}]", + position=1, + leave=False, + ): + # for inner_future in as_completed(inner_futures): + ret = inner_future.result() + result.extend(ret) + return result chain = self.build(file_path=file_path, **kwargs) dag = chain.dag @@ -31,10 +46,10 @@ def execute_node(node, inputs: List[str]): nodes = list(nx.topological_sort(dag)) node_outputs = {} - processed_node_names = [] + # processed_node_names = [] for node in nodes: - node_name = type(node).__name__.split(".")[-1] - processed_node_names.append(node_name) + # node_name = type(node).__name__.split(".")[-1] + # processed_node_names.append(node_name) predecessors = list(dag.predecessors(node)) if len(predecessors) == 0: node_input = [file_path] diff --git a/tests/unit/builder/data/long_text_1.txt b/tests/unit/builder/data/long_text_1.txt new file mode 100644 index 00000000..a353fb71 --- /dev/null +++ b/tests/unit/builder/data/long_text_1.txt @@ -0,0 +1,123 @@ +第一回 灵根育孕源流出 心性修持大道生 + +诗曰: + +混沌未分天地乱,茫茫渺渺无人见。 +自从盘古破鸿蒙,开辟从兹清浊辨。 +覆载群生仰至仁,发明万物皆成善。 +欲知造化会元功,须看《西游释厄传》。 + +盖闻天地之数,有十二万九千六百岁为一元。将一元分为十二会,乃子、丑、寅、卯、辰、巳(sì)、午、未、申、酉、戌、亥之十二支也。每会该一万八百岁。且就一日而论:子时得阳气,而丑则鸡鸣;寅不通光,而卯则日出;辰时食后,而巳则挨排;日午天中,而未则西蹉;申时晡而日落酉;戌黄昏而人定亥。譬于大数,若到戌会之终,则天地昏蒙而万物否矣。再去五千四百岁,交亥会之初,则当黑暗,而两间人物俱无矣,故曰混沌。又五千四百岁,亥会将终,贞下起元,近子之会,而复逐渐开明。邵康节曰:“冬至子之半,天心无改移。一阳初动处,万物未生时。”到此,天始有根。再五千四百岁,正当子会,轻清上腾,有日,有月,有星,有辰。日、月、星、辰,谓之四象。故曰,天开于子。又经五千四百岁,子会将终,近丑之会,而逐渐坚实。易曰:“大哉乾元!至哉坤元!万物资生,乃顺承天。”至此,地始凝结。再五千四百岁,正当丑会,重浊下凝,有水,有火,有山,有石,有土。水、火、山、石、土谓之五形。故曰,地辟于丑。又经五千四百岁,丑会终而寅会之初,发生万物。历曰:“天气下降,地气上升;天地交合,群物皆生。”至此,天清地爽,阴阳交合。再五千四百岁,正当寅会,生人,生兽,生禽,正谓天地人,三才定位。故曰,人生于寅。 + +感盘古开辟,三皇治世,五帝定伦,世界之间,遂分为四大部洲:曰东胜神洲,曰西牛贺洲,曰南赡部洲,曰北俱芦洲。这部书单表东胜神洲。海外有一国土,名曰傲来国。国近大海,海中有一座山,唤为花果山。此山乃十洲之祖脉,三岛之来龙,自开清浊而立,鸿蒙判后而成。真个好山!有词赋为证。赋曰: + +势镇汪洋,威宁瑶海。势镇汪洋,潮涌银山鱼入穴;威宁瑶海,波翻雪浪蜃(shèn)离渊。木火方隅高积上,东海之处耸崇巅。丹崖怪石,削壁奇峰。丹崖上,彩凤双鸣;削壁前,麒麟独卧。峰头时听锦鸡鸣,石窟每观龙出入。林中有寿鹿仙狐,树上有灵禽玄鹤。瑶草奇花不谢,青松翠柏长春。仙桃常结果,修竹每留云。一条涧壑藤萝密,四面原堤草色新。正是百川会处擎天柱,万劫无移大地根。 + +那座山,正当顶上,有一块仙石。其石有三丈六尺五寸高,有二丈四尺围圆。三丈六尺五寸高,按周天三百六十五度;二丈四尺围圆,按政历二十四气。上有九窍八孔,按九宫八卦。四面更无树木遮阴,左右倒有芝兰相衬。盖自开辟以来,每受天真地秀,日精月华,感之既久,遂有灵通之意。内育仙胞,一日迸裂,产一石卵,似圆球样大。因见风,化作一个石猴,五官俱备,四肢皆全。便就学爬学走,拜了四方。目运两道金光,射冲斗府。惊动高天上圣大慈仁者玉皇大天尊玄穹高上帝,驾座金阙云宫灵霄宝殿,聚集仙卿,见有金光焰焰,即命千里眼、顺风耳开南天门观看。二将果奉旨出门外,看的真,听的明。须臾回报道:“臣奉旨观听金光之处,乃东胜神洲海东傲来小国之界,有一座花果山,山上有一仙石,石产一卵,见风化一石猴,在那里拜四方,眼运金光,射冲斗府。如今服饵水食,金光将潜息矣。”玉帝垂赐恩慈曰:“下方之物,乃天地精华所生,不足为异。” + +那猴在山中,却会行走跳跃,食草木,饮涧泉,采山花,觅树果;与狼虫为伴,虎豹为群,獐鹿为友,猕猿为亲;夜宿石崖之下,朝游峰洞之中。真是“山中无甲子,寒尽不知年。” + +一朝天气炎热,与群猴避暑,都在松阴之下顽耍。你看他一个个: + +跳树攀枝,采花觅果;抛弹子,邷么儿(以磨光的碎瓦片或小石子为玩具的儿童游戏。有些地方称为“抓子儿”。邷,wá) +;跑沙窝,砌宝塔;赶蜻蜓,扑  蜡;参老天,拜菩萨;扯葛藤,编草帓;捉虱子,咬又掐;理毛衣,剔指甲;挨的挨,擦的擦;推的推,压的压;扯的扯,拉的拉,青松林下任他顽,绿水涧边随洗濯(zhuó)。 + +一群猴子耍了一会,却去那山涧中洗澡。见那股涧水奔流,真个似滚瓜涌溅。古云:“禽有禽言,兽有兽语。”众猴都道:“这股水不知是那里的水。我们今日赶闲无事,顺涧边往上溜头寻看源流,耍子去耶!”喊一声,都拖男挈女,呼弟呼兄,一齐跑来,顺涧爬山,直至源流之处,乃是一股瀑布飞泉。但见那: + +一派白虹起,千寻雪浪飞;海风吹不断,江月照还依。 +冷气分青嶂,馀流润翠微;潺湲(chán yuán,水慢慢流动的样子)名瀑布,真似挂帘帷。 + +众猴拍手称扬道:“好水!好水!原来此处远通山脚之下,直接大海之波。”又道:“那一个有本事的,钻进去寻个源头出来,不伤身体者,我等即拜他为王。”连呼了三声,忽见丛杂中跳出一名石猴,应声高叫道:“我进去!我进去!”好猴!也是他: + +今日芳名显,时来大运通;有缘居此地,王遣入仙宫。 + +你看他瞑(míng)目蹲身,将身一纵,径跳入瀑布泉中,忽睁睛抬头观看,那里边却无水无波,明明朗朗的一架桥梁。他住了身,定了神,仔细再看,原来是座铁板桥。桥下之水,冲贯于石窍之间,倒挂流出去,遮闭了桥门。却又欠身上桥头,再走再看,却似有人家住处一般,真个好所在。但见那: + +翠藓堆蓝,白云浮玉,光摇片片烟霞。虚窗静室,滑凳板生花。乳窟龙珠倚挂,萦回满地奇葩。锅灶傍崖存火迹,樽罍(zūn +léi)靠案见肴渣。石座石床真可爱,石盆石碗更堪夸。又见那一竿两竿修竹,三点五点梅花。几树青松常带雨,浑然像个人家。 + +看罢多时,跳过桥中间,左右观看,只见正当中有一石碣。碣上有一行楷书大字,镌着“花果山福地,水帘洞洞天。”石猴喜不自胜,急抽身往外便走,复瞑目蹲身,跳出水外,打了两个呵呵道:“大造化!大造化!”众猴把他围住,问道:“里面怎么样?水有多深?”石猴道:“没水!没水!原来是一座铁板桥。桥那边是一座天造地设的家当。”众猴道:“怎见得是个家当?”石猴笑道:“这股水乃是桥下冲贯石桥,倒挂下来遮闭门户的。桥边有花有树,乃是一座石房。房内有石窝、石灶、石碗、石盆、石床、石凳。中间一块石碣上,镌着‘花果山福地,水帘洞洞天。’真个是我们安身之处。里面且是宽阔,容得千百口老小。我们都进去住也,省得受老天之气。这里边: + +刮风有处躲,下雨好存身。霜雪全无惧,雷声永不闻。 +烟霞常照耀,祥瑞每蒸熏。松竹年年秀,奇花日日新。” + +众猴听得,个个欢喜。都道:“你还先走,带我们进去,进去!”石猴却又瞑目蹲身,往里一跳,叫道:“都随我进来!进来!”那些猴有胆大的,都跳进去了;胆小的,一个个伸头缩颈,抓耳挠腮,大声叫喊,缠一会,也都进去了。跳过桥头,一个个抢盆夺碗,占灶争床,搬过来,移过去,正是猴性顽劣,再无一个宁时,只搬得力倦神疲方止。石猴端坐上面道:“列位呵,‘人而无信,不知其可’。你们才说有本事进得来,出得去,不伤身体者,就拜他为王。我如今进来又出去,出去又进来,寻了这一个洞天与列位安眠稳睡,各享成家之福,何不拜我为王?”众猴听说,即拱伏无违。一个个序齿(以年龄为顺序)排班,朝上礼拜,都称“千岁大王”。自此,石猴高登王位,将“石”字儿隐了,遂称美猴王。有诗为证,诗曰: + +三阳交泰产群生,仙石胞含日月精。 +借卵化猴完大道,假他名姓配丹成。 +内观不识因无相,外合明知作有形。 +历代人人皆属此,称王称圣任纵横。 + +美猴王领一群猿猴、猕猴、马猴等,分派了君臣佐使,朝游花果山,暮宿水帘洞,合契同情,不入飞鸟之丛,不从走兽之类,独自为王,不胜欢乐。是以: + +春采百花为饮食,夏寻诸果作生涯。 +秋收芋栗延时节,冬觅黄精度岁华。 + +美猴王享乐天真,何期有三五百载。一日,与群猴喜宴之间,忽然忧恼,堕下泪来。众猴慌忙罗拜道:“大王何为烦恼?”猴王道:“我虽在欢喜之时,却有一点儿远虑,故此烦恼。”众猴又笑道:“大王好不知足!我等日日欢会,在仙山福地,古洞神州,不伏麒麟辖,不伏凤凰管,又不伏人间王位所拘束,自由自在,乃无量之福,为何远虑而忧也?”猴王道:“今日虽不归人王法律,不惧禽兽威服,将来年老血衰,暗中有阎王老子管着,一旦身亡,可不枉生世界之中,不得久住天人之内?”众猴闻此言,一个个掩面悲啼,俱以无常为虑。 + +只见那班部中,忽跳出一个通背猿猴,厉声高叫道:“大王若是这般远虑,真所谓道心开发也!如今五虫之内,惟有三等名色,不伏阎王老子所管。”猴王道:“你知那三等人?”猿猴道:“乃是佛与仙与神圣三者,躲过轮回,不生不灭,与天地山川齐寿。”猴王道:“此三者居于何所?”猿猴道:“他只在阎浮世界之中,古洞仙山之内。”猴王闻之,满心欢喜,道:“我明日就辞汝等下山,云游海角,远涉天涯,务必访此三者,学一个不老长生,常躲过阎君之难。”噫!这句话,顿教跳出轮回网,致使齐天大圣成。众猴鼓掌称扬,都道:“善哉!善哉!我等明日越岭登山,广寻些果品,大设筵宴送大王也。” + +次日,众猴果去采仙桃,摘异果,刨山药,劚黄精,芝兰香蕙,瑶草奇花,般般件件,整整齐齐,摆开石凳石桌,排列仙酒仙肴。但见那: + +金丸珠弹,红绽黄肥。金丸珠弹腊樱桃,色真甘美;红绽黄肥熟梅子,味果香酸。鲜龙眼,肉甜皮薄;火荔枝,核小囊红。林檎碧实连枝献,枇杷缃苞带叶擎。兔头梨子鸡心枣,消渴除烦更解酲(chéng)。香桃烂杏,美甘甘似玉液琼浆;脆李杨梅,酸荫荫如脂酥膏酪。红囊黑子熟西瓜,四瓣黄皮大柿子。石榴裂破,丹砂粒现火晶珠;芋栗剖开,坚硬肉团金玛瑙。胡桃银杏可传茶,椰子葡萄能做酒。榛松榧奈满盘盛,橘蔗柑橙盈案摆。熟煨山药,烂煮黄精。捣碎茯苓并薏苡(yì yǐ),石锅微火漫炊羹。人间纵有珍馐味,怎比山猴乐更宁? + +群猴尊美猴王上坐,各依齿肩排于下边,一个个轮流上前,奉酒,奉花,奉果,痛饮了一日。 + +次日,美猴王早起,教:“小的们,替我折些枯松,编作筏子,取个竹竿作篙,收拾些果品之类,我将去也。”果独自登筏,尽力撑开,飘飘荡荡,径向大海波中,趁天风,来渡南赡部洲地界。这一去,正是那: + +天产仙猴道行隆,离山驾筏趁天风。 +飘洋过海寻仙道,立志潜心建大功。 +有分有缘休俗愿,无忧无虑会元龙。 +料应必遇知音者,说破源流万法通。 + +也是他运至时来,自登木筏之后,连日东南风紧,将他送到西北岸前,乃是南赡部洲地界。持篙试水,偶得浅水,弃了筏子,跳上岸来,只见海边有人捕鱼、打雁、挖蛤、淘盐。他走近前,弄个把戏,妆个【上左“齿”右“可”,下“女”】虎(做出一种吓人的怪样子) +,吓得那些人丢筐弃网,四散奔跑。将那跑不动的拿住一个,剥了他衣裳,也学人穿在身上,摇摇摆摆,穿州过府,在市廛(chán)中,学人礼,学人话。朝餐夜宿,一心里访问佛仙神圣之道,觅个长生不老之方。见世人都是为名为利之徒,更无一个为身命者。正是那: + +争名夺利几时休?早起迟眠不自由! +骑着驴骡思骏马,官居宰相望王侯。 +只愁衣食耽劳碌,何怕阎君就取勾? +继子荫孙图富贵,更无一个肯回头! + +猴王参访仙道,无缘得遇。在于南赡部洲,串长城,游小县,不觉八九年余。忽行至西洋大海,他想着海外必有神仙。独自个依前作筏,又飘过西海,直至西牛贺洲地界。登岸遍访多时,忽见一座高山秀丽,林麓(lù)幽深。他也不怕狼虫,不惧虎豹,登山顶上观看。果是好山: + +千峰排戟,万仞开屏。日映岚光轻锁翠,雨收黛色冷含青。枯藤缠老树,古渡界幽程。奇花瑞草,修竹乔松。修竹乔松,万载常青欺福地;奇花瑞草,四时不谢赛蓬瀛(péng yíng,蓬莱山和瀛洲,相传为仙人所居之处。亦泛指仙境)。幽鸟啼声近,源泉响溜清。重重谷壑芝兰绕,处处巉(chán)崖苔藓生。起伏峦头龙脉好,必有高人隐姓名。 + +正观看间,忽闻得林深之处,有人言语,急忙趋步,穿入林中,侧耳而听,原来是歌唱之声。歌曰: + +“观棋柯烂,伐木丁丁,云边谷口徐行,卖薪沽酒,狂笑自陶情。苍迳秋高,对月枕松根,一觉天明。认旧林,登崖过岭,持斧断枯藤。 + +“观棋柯烂,伐木丁丁,云边谷口徐行。卖薪沽酒(买酒。沽,gū),狂笑自陶情。苍径秋高对月,枕松根,一觉天明。认旧林,登崖过岭,持斧断枯藤。收来成一担,行歌市上,易米三升。更无些子争竞,时价平平。不会机谋巧算,没荣辱,恬淡延生。相逢处,非仙即道,静坐讲《黄庭》。” + +美猴王听得此言,满心欢喜道:“神仙原来藏在这里!”急忙跳入里面,仔细再看,乃是一个樵子,在那里举斧砍柴。但看他打扮非常: + +头上戴箬笠(ruò lì,用箬竹叶及篾编成的宽边帽),乃是新笋初脱之箨(tuò)。身上穿布衣,乃是木绵拈就之纱。腰间系环绦,乃是老蚕口吐之丝。足下踏草履(lǚ),乃是枯莎槎就之爽。手执衠钢斧,担挽火麻绳。扳松劈枯树,争似此樵能 + +猴王近前叫道:“老神仙!弟子起手。”那樵汉慌忙丢了斧,转身答礼道:“不当人!不当人!我拙汉衣食不全,怎敢当‘神仙’二字?”猴王道:“你不是神仙,如何说出神仙的话来?”樵夫道:“我说什么神仙话?”猴王道:“我才来至林边,只听的你说:‘相逢处,非仙即道,静坐讲《黄庭》。’《黄庭》乃道德真言,非神仙而何?”樵夫笑道:“实不瞒你说,这个词名做《满庭芳》,乃一神仙教我的。那神仙与我舍下相邻。他见我家事劳苦,日常烦恼,教我遇烦恼时,即把这词儿念念,一则散心,二则解困。我才有些不足处思虑,故此念念。不期被你听了。”猴王道:“你家既与神仙相邻,何不从他修行?学得个不老之方,却不是好?”樵夫道:“我一生命苦:自幼蒙父母养育至八九岁,才知人事,不幸父丧,母亲居孀(shuāng)。再无兄弟姊妹,只我一人,没奈何,早晚侍奉。如今母老,一发不敢抛离。却又田园荒芜,衣食不足,只得斫(zhuó)两束柴薪,挑向市廛之间,货几文钱,籴几升米,自炊自造,安排些茶饭,供养老母,所以不能修行。”猴王道:“据你说起来,乃是一个行孝的君子,向后必有好处。但望你指与我那神仙住处,却好拜访去也。”樵夫道:“不远,不远。此山叫做灵台方寸山。山中有座斜月三星洞。那洞中有一个神仙,称名须菩提祖师。那祖师出去的徒弟,也不计其数,见今还有三四十人从他修行。你顺那条小路儿,向南行七八里远近,即是他家了。”猴王用手扯住樵夫道:“老兄,你便同我去去。若还得了好处,决不忘你指引之恩。”樵夫道:“你这汉子,甚不通变。我方才这般与你说了,你还不省?假若我与你去了,却不误了我的生意?老母何人奉养?我要斫柴,你自去,自去!” + +猴王听说,只得相辞。出深林,找上路径,过一山坡,约有七八里远,果然望见一座洞府。挺身观看,真好去处!但见: + +烟霞散彩,日月摇光。千株老柏,万节修篁(修竹,长竹子。篁,huáng)。千株老柏,带雨半空青冉冉;万节修篁,含烟一壑色苍苍。门外奇花布锦,桥边瑶草喷香。石崖突兀青苔润,悬壁高张翠藓长。时闻仙鹤唳(lì),每见凤凰翔。仙鹤唳时,声振九皋霄汉远;凤凰翔起,翎毛五色彩云光。玄猿白鹿随隐见,金狮玉象任行藏。细观灵福地,真个赛天堂! + +又见那洞门紧闭,静悄悄杳无人迹。忽回头,见崖头立一石碑,约有三丈余高,八尺余阔,上有一行十个大字,乃是“灵台方寸山,斜月三星洞”。美猴王十分欢喜道:“此间人果是朴实。果有此山此洞。”看勾多时,不敢敲门。且去跳上松枝梢头,摘松子吃了顽耍。 + +少顷间,只听得呀的一声,洞门开处,里面走出一个仙童,真个丰姿英伟,像貌清奇,比寻常俗子不同。但见他: + +髽髻(即抓髻)双丝绾,宽袍两袖风。貌和身自别,心与相俱空。 +物外长年客,山中永寿童。一尘全不染,甲子任翻腾。 + +那童子出得门来,高叫道:“甚么人在此搔扰?”猴王扑的跳下树来,上前躬身道:“仙童,我是个访道学仙之弟子,更不敢在此搔扰。”仙童笑道:“你是个访道的么?”猴王道:“是。”童子道:“我家师父,正才下榻,登坛讲道。还未说出原由,就教我出来开门。说:‘外面有个修行的来了,可去接待接待。’想必就是你了?”猴王笑道:“是我,是我。”童子道:“你跟我进来。” + +这猴王整衣端肃,随童子径入洞天深处观看:一层层深阁琼楼,一进进珠宫贝阙,说不尽那静室幽居,直至瑶台之下。见那菩提祖师端坐在台上,两边有三十个小仙侍立台下。果然是: + +大觉金仙没垢姿,西方妙相祖菩提; +不生不灭三三行,全气全神万万慈。 +空寂自然随变化,真如本性任为之; +与天同寿庄严体,历劫明心大法师。 + +美猴王一见,倒身下拜,磕头不计其数,口中只道:“师父!师父!我弟子志心朝礼!志心朝礼!”祖师道:“你是那方人氏?且说个乡贯姓名明白,再拜。”猴王道:“弟子东胜神洲傲来国花果山水帘洞人氏。”祖师喝令:“赶出去!他本是个撒诈捣虚之徒,那里修甚么道果!”猴王慌忙磕头不住道:“弟子是老实之言,决无虚诈。”祖师道:“你既老实,怎么说东胜神洲?那去处到我这里,隔两重大海,一座南赡部洲,如何就得到此?”猴王叩头道:“弟子飘洋过海,登界游方,有十数个年头,方才访到此处。” + +祖师道:“既是逐渐行来的也罢。你姓甚么?”猴王又道:“我无性。人若骂我,我也不恼;若打我,我也不嗔,只是陪个礼儿就罢了。一生无性。”祖师道:“不是这个性。你父母原来姓甚么?”猴王道:“我也无父母。”祖师道:“既无父母,想是树上生的?”猴王道:“我虽不是树生,却是石里长的。我只记得花果山上有一块仙石,其年石破,我便生也。”祖师闻言,暗喜道:“这等说,却是天地生成的。你起来走走我看。”猴王纵身跳起,拐呀拐的走了两遍。祖师笑道:“你身躯虽是鄙陋,却像个食松果的猢狲。我与你就身上取个姓氏,意思教你姓‘猢’。猢字去了个兽傍,乃是古月。古者,老也;月者,阴也。老阴不能化育,教你姓‘狲’倒好。狲字去了兽傍,乃是个子系。子者,儿男也;系者,婴细也。正合婴儿之本论。教你姓‘孙’罢。”猴王听说,满心欢喜,朝上叩头道:“好!好!好!今日方知姓也。万望师父慈悲!既然有姓,再乞赐个名字,却好呼唤。”祖师道:“我门中有十二个字,分派起名到你乃第十辈之小徒矣。”猴王道:“那十二个字?”祖师道:“乃广、大、智、慧、真、如、性、海、颖、悟、圆、觉十二字。排到你,正当‘悟’字。与你起个法名叫做‘孙悟空’好么?”猴王笑道:“好!好!好!自今就叫做孙悟空也!”正是: + +鸿蒙初辟原无姓,打破顽空须悟空。 + +毕竟不之向后修些甚么道果,且听下回分解。 diff --git a/tests/unit/builder/data/long_text_2.txt b/tests/unit/builder/data/long_text_2.txt new file mode 100644 index 00000000..397834c3 --- /dev/null +++ b/tests/unit/builder/data/long_text_2.txt @@ -0,0 +1,93 @@ +第二回 悟彻菩提真妙理 断魔归本合元神 +当前位置: +主页 +西游记 +神话表美猴王得了姓名,怡然踊跃,对菩提前作礼启谢。那祖师即命大众引孙悟空出二门外,教他洒扫应对,进退周旋之节。众仙奉行而出。悟空到门外,又拜了大众师兄,就于廊庑(láng wǔ)之间,安排寝处。次早,与众师兄学言语礼貌,讲经论道,习字焚香,每日如此。闲时即扫地锄园,养花修树,寻柴燃火,挑水运浆。凡所用之物,无一不备。在洞中不觉倏六七年。一日,祖师登坛高坐,唤集诸仙,开讲大道。真个是: + +天花乱坠,地涌金莲。妙演三乘(佛教术语。三乘指小乘、中乘和大乘)教,精微万法全。 +慢摇麈尾(古人闲谈时执以驱虫、掸尘的一种工具。麈,zhǔ)喷珠玉,响振雷霆动九天。 +说一会道,讲一会禅,三家配合本如然。 +开明一字皈诚理,指引无生了性玄。 + +孙悟空在旁闻讲,喜得他抓耳挠腮,眉花眼笑。忍不住手之舞之,足之蹈之。忽被祖师看见,叫孙悟空道:“你在班中,怎么颠狂跃舞,不听我讲?”悟空道:“弟子诚心听讲,听到老师父妙音处,喜不自胜,故不觉作此踊跃之状。望师父恕罪!”祖师道:“你既识妙音,我且问你,你到洞中多少时了?”悟空道:“弟子本来懵懂(头脑不清楚或不能明辨事物。懵,měng),不知多少时节。只记得灶下无火,常去山后打柴,见一山好桃树,我在那里吃了七次饱桃矣。”祖师道:“那山唤名烂桃山。你既吃七次,想是七年了。你今要从我学些什么道?”悟空道:“但凭尊师教诲,只是有些道气儿,弟子便就学了。” + +祖师道:“‘道’字门中有三百六十傍门(bàng mén,道教术语。道教以修炼金丹、全身保真为正道,余皆为“傍门”,不能得正果),傍门皆有正果。不知你学那一门哩?”悟空道:“凭尊师意思。弟子倾心听从。”祖师道:“我教你个‘术’字门中之道,如何?”悟空道:“术门之道怎么说?”祖师道:“术字门中,乃是些请仙扶鸾(两人执丁字木笔在沙盘上写字叫扶鸾,是一种迷信求神以问吉凶的方法。鸾,luán),问卜揲蓍(shéshī,古代问卜的一种方式),能知趋吉避凶之理。”悟空道:“似这般可得长生么?”祖师道:“不能!不能!”悟空道:“不学!不学!” + +祖师道:“教你‘静’字门中之道,如何?”悟空道:“静字门中,是甚正果?”祖师道:“此是休粮(即“避谷”,停食谷物。道家的修炼方法之一)守谷,清静无为,参禅打坐,戒语持斋,或睡功,或立功,并入定坐关之类。”悟空道:“这般也能长生么?”祖师道:“也似‘窑头土坯’。”悟空笑道:“师父果有些滴。一行说我不会打市语。怎么谓之‘窑头土坯’?”祖师道:“就如那窑头上,造成砖瓦之坯,虽已成形,尚未经水火煅炼,一朝大雨滂沱,他必滥矣。”悟空道:“也不长远。不学!不学!” + +祖师道:“教你‘动’字门中之道,如何?”悟空道:“动门之道,却又怎样?”祖师道:“此是有为有作,采阴补阳,攀弓踏弩,摩脐过气,用方炮制,烧茅打鼎,进红铅,炼秋石,并服妇乳之类。”悟空道:“似这等也得长生么?”祖师道:“此欲长生,亦如‘水中捞月’。”悟空道:“师父又来了!怎么叫做‘水中捞月’?”祖师道:“月在长空,水中有影,虽然看见,只是无捞摸处,到底只成空耳。”悟空道:“也不学!不学!” + +祖师闻言,咄的一声,跳下高台,手持戒尺,指定悟空道:“你这猢狲,这般不学,那般不学,却待怎么?”走上前,将悟空头上打了三下,倒背着手,走入里面,将中门关了,撇下大众而去。唬得那一班听讲的,人人惊惧,皆怨悟空道:“你这泼猴,十分无状(没有礼貌)!师父传你道法,如何不学,却与师父顶嘴?这番冲撞了他,不知几时才出来呵!”此时俱甚报怨他,又鄙贱嫌恶他。悟空一些儿也不恼,只是满脸陪笑。原来那猴王,已打破盘中之谜,暗暗在心,所以不与众人争竞,只是忍耐无言。祖师打他三下者,教他三更时分存心;倒背着手,走入里面,将中门关上者,教他从后门进步,秘处传他道也。 + +当日悟空与众等,喜喜欢欢,在三星仙洞之前,盼望天色,急不能到晚。及黄昏时,却与众就寝,假合眼,定息存神。山中又没支更传箭(报告时间。古用铜壶滴漏计时,看水平面箭上的刻度,即知时刻),不知时分,只自家将鼻孔中出入之气调定。约到子时前后,轻轻的起来,穿了衣服,偷开前门,躲离大众,走出外,抬头观看。正是那: + +月明清露冷,八极迥无尘。深树幽禽宿,源头水溜汾。 +飞萤光散影,过雁字排云。正直三更候,应该访道真。 + +你看他从旧路径至后门外,只见那门儿半开半掩。悟空喜道:“老师父果然注意与我传道,故此开着门也。”即曳步近前,侧身进得门里,只走到祖师寝榻之下。见祖师蜷局身躯,朝里睡着了。悟空不敢惊动,即跪在榻前。那祖师不多时觉来,舒开两足,口中自吟道: + +“难!难!难!道最玄,莫把金丹作等闲。 +不遇至人传妙诀,空言口困舌头干!” + +悟空应声叫道:“师父,弟子在此跪候多时。”祖师闻得声音是悟空,即起披衣,盘坐喝道:“这猢狲!你不在前边去睡,却来我这后边作甚?”悟空道:“师父昨日坛前对众相允,教弟子三更时候,从后门里传我道理,故此大胆径拜老爷榻下。”祖师听说,十分欢喜,暗自寻思道:“这厮(古时对男子的称呼)果然是个天地生成的!不然,何就打破我盘中之暗谜也?”悟空道:“此间更无六耳,止只弟子一人,望师父大舍慈悲,传与我长生之道罢,永不忘恩!”祖师道:“你今有缘,我亦喜说。既识得盘中暗谜,你近前来,仔细听之,当传与你长生之妙道也。”悟空叩头谢了,洗耳用心,跪于榻下。祖师云: + +“显密圆通真妙诀,惜修生命无他说。 +都来总是精气神,谨固牢藏休漏泄。 +休漏泄,体中藏,汝受吾传道自昌。 +口诀记来多有益,屏除邪欲得清凉。 +得清凉,光皎洁,好向丹台赏明月。 +月藏玉兔日藏乌,自有龟蛇相盘结。 +相盘结,性命坚,却能火里种金莲。 +攒簇五行颠倒用,功完随作佛和仙。” + +此时说破根源,悟空心灵福至,切切记了口诀,对祖师拜谢深恩,即出后门观看。但见东方天色微舒白,西路金光大显明。依旧路,转到前门,轻轻的推开进去,坐在原寝之处,故将床铺摇响道:“天光了!天光了!起耶!”那大众还正睡哩,不知悟空已得了好事。当日起来打混,暗暗维持,子前午后,自己调息。 + +却早过了三年,祖师复登宝座,与众说法。谈的是公案比语,论的是外像(佛教术语。指显露、表现在外表上的善恶美丑和言语行动)包皮。忽问:“悟空何在?”悟空近前跪下:“弟子有。”祖师道:“你这一向修些什么道来?”悟空道:“弟子近来法性颇通,根源亦渐坚固矣。”祖师道:“你既通法性,会得根源,已注神体,却只是防备着‘三灾利害’。”悟空听说,沉吟良久道:“师父之言谬矣。我尝闻道高德隆,与天同寿;水火既济,百病不生,却怎么有个‘三灾利害’?”祖师道:“此乃非常之道:夺天地之造化,侵日月之玄机;丹成之后,鬼神难容。虽驻颜益寿,但到了五百年后,天降雷灾打你,须要见性明心,预先躲避。躲得过,寿与天齐;躲不过,就此绝命。再五百年后,天降火灾烧你。这火不是天火,亦不是凡火,唤做‘阴火’。自本身涌泉穴(指足心)下烧起,直透泥垣宫,五脏成灰,四肢皆朽,把千年苦行,俱为虚幻。再五百年,又降风灾吹你。这风不是东南西北风,不是和熏金朔风,亦不是花柳松竹风,唤做‘赑风’。自囟门中吹入六腑,过丹田,穿九窍,骨肉消疏,其身自解。所以都要躲过。” + +悟空闻说,毛骨悚然(毛发竖起,脊梁骨发冷。形容恐惧惊骇的样子。悚,sǒng),叩头礼拜道:“万望老爷垂悯,传与躲避三灾之法,到底不敢忘恩。”祖师道:“此亦无难,只是你比他人不同,故传不得。”悟空道:“我也头圆顶天,足方履地,一般有九窍四肢,五脏六腑,何以比人不同?”祖师道:“你虽然像人,却比人少腮。”原来那猴子孤拐面,凹脸尖嘴。悟空伸手一摸,笑道:“师父没成算!我虽少腮,却比人多这个嗉袋(指猿猴类、啮齿类的嗉囊。嗉,sù),亦可准折过也。”祖师说:“也罢,你要学那一般?有一般天罡(gāng)数,该三十六般变化;有一般地煞数,该七十二般变化。”悟空道:“弟子愿多里捞摸,学一个地煞变化罢。”祖师道:“既如此,上前来,传与你口诀。”遂附耳低言,不知说了些什么妙法。这猴王也是他一窍通时百窍通,当时习了口诀,自修自炼,将七十二般变化,都学成了。 + +忽一日,祖师与众门人在三星洞前戏玩晚景。祖师道:“悟空,事成了未曾?”悟空道:“多蒙师父海恩,弟子功果完备,已能霞举飞升也。”祖师道:“你试飞举我看。” + +悟空弄本事,将身一耸,打了个连扯跟头,跳离地有五六丈,踏云霞去勾有顿饭功夫,返复不上三里远近,落在面前,叉手道:“师父,这就是飞举腾云了。”祖师笑道:“这个算不得腾云,只算得爬云而已。自古道:‘神仙朝游北海暮苍梧。’似你这半日,去不上三里,即爬云也还算不得哩!”悟空道:“怎么为‘朝游北海暮苍梧’?”祖师道:“凡腾云之辈,早辰起自北海,游过东海、西海、南海、复转苍梧,苍梧者却是北海零陵之语话也。将四海之外,一日都游遍,方算得腾云。”悟空道:“这个却难!却难!”祖师道:“世上无难事,只怕有心人。”悟空闻得此言,叩头礼拜,启道:“师父,‘为人须为彻’,索性舍个大慈悲,将此腾云之法,一发传与我罢,决不敢忘恩。”祖师道:“凡诸仙腾云,皆跌足而起,你却不是这般。我才见你去,连扯方才跳上。我今只就你这个势,传你个‘筋斗云’罢。”悟空又礼拜恳求,祖师却又传个口诀道:“这朵云,捻着诀,念动真言,攒紧了拳,对身一抖,跳将起来,一筋斗就有十万八千里路哩!”大众听说,一个个嘻嘻笑道:“悟空造化!若会这个法儿,与人家当铺兵,送文书,递报单,不管那里都寻了饭吃!”师徒们天昏各归洞府。这一夜,悟空即运神炼法,会了筋斗云。逐日家无拘无束,自在逍遥此一长生之美。 + +一日,春归夏至,大众都在松树下会讲多时。大众曰:“悟空,你是那世修来的缘法?前日师父拊耳低言,传与你的躲三灾变化之法,可都会么?”悟空笑道:“不瞒诸兄长说,一则是师父传授,二来也是我昼夜殷勤,那几般儿都会了。”大众道:“趁此良时,你试演演,让我等看看。”悟空闻说,抖搜精神,卖弄手段道:“众师兄请出个题目。要我变化甚么?”大众道:“就变棵松树罢。”悟空捻着诀,念动咒语,摇身一变,就变做一棵松树。真个是: + +郁郁含烟贯四时,凌云直上秀贞姿。 +全无一点妖猴像,尽是经霜耐雪枝。 + +大众见了,鼓掌呀呀大笑。都道:“好猴儿!好猴儿!”不觉的嚷闹,惊动了祖师。 + +祖师急拽杖出门来问道:“是何人在此喧哗?”大众闻呼,慌忙检束,整衣向前。悟空也现了本相,杂在丛中道:“启上尊师,我等在此会讲,更无外姓喧哗。”祖师怒喝道:“你等大呼小叫,全不像个修行的体段!修行的人,口开神气散,舌动是非生。如何在此嚷笑?”大众道:“不敢瞒师父,适才孙悟空演变化耍子。教他变棵松树,果然是棵松树,弟子们俱称扬喝采,故高声惊冒尊师,望乞恕罪。”祖师道:“你等起去。”叫:“悟空,过来!我问你弄甚么精神,变甚么松树?这个工夫,可好在人前卖弄?假如你见别人有,不要求他?别人见你有,必然求你。你若畏祸,却要传他;若不传他,必然加害:你之性命又不可保。”悟空叩道:“只望师父恕罪!”祖师道:“我也不罪你,但只是你去吧。”悟空闻此言,满眼堕泪道:“师父教我往那里去?”祖师道:“你从那里来,便从那里去就是了。”悟空顿然醒悟道:“我自东胜神洲傲来国花果山水帘洞来的。”祖师道:“你快回去,全你性命,若在此间,断然不可!”悟空领罪,“上告尊师,我也离家有二十年矣,虽是回顾旧日儿孙,但念师父厚恩未报,不敢去。”祖师道:“那里甚么恩义?你只是不惹祸不牵带我就罢了!” + +悟空见没奈何,只得拜辞,与众相别。祖师道:“你这去,定生不良。凭你怎么惹祸行凶,却不许说是我的徒弟。你说出半个字来,我就知之,把你这猢狲剥皮锉骨,将神魂贬在九幽之处,教你万劫不得翻身!”悟空道:“决不敢提起师父一字,只说是我自家会的便罢。” + +悟空谢了。即抽身,捻着诀,丢个连扯,纵起筋斗云,径回东海。那里消一个时辰,早看见花果山水帘洞。美猴王自知快乐,暗暗的自称道: + +“去时凡骨凡胎重,得道身轻体亦轻。 +举世无人肯立志,立志修玄玄自明。 +当时过海波难进,今日来回甚易行。 +别语叮咛还在耳,何期顷刻见东溟。” + +悟空按下云头,直至花果山。找路而走,忽听得鹤唳猿啼,鹤唳声冲霄汉外,猿啼悲切甚伤情。即开口叫道:“孩儿们,我来了也!”那崖下石坎边,花草中,树木里,若大若小之猴,跳出千千万万,把个美猴王围在当中,叩头叫道:“大王,你好宽心!怎么一去许久?把我们俱闪在这里,望你诚如饥渴!近来被一妖魔在此欺虐,强要占我们水帘洞府,是我等舍死忘生,与他争斗。这些时,被那厮抢了我们家火,捉了许多子侄,教我们昼夜无眠,看守家业。幸得大王来了!大王若再年载不来,我等连山洞尽属他人矣!” + +悟空闻说,心中大怒道:“是什么妖魔,辄(zhé)敢无状!你且细细说来,待我寻他报仇。”众猴叩头:“告上大王,那厮自称混世魔王,住居在直北下。”悟空道:“此间到他那里,有多少路程?”众猴道:“他来时云,去时雾,或风或雨,或电或雷,我等不知有多少路。”悟空道:“既如此,你们休怕,且自顽耍,等我寻他去来!” + +好猴王,将身一纵,跳起去,一路筋斗,直至北下观看,见一座高山,真是十分险峻。好山: + +笔峰挺立,曲涧深沉。笔峰挺立透空霄,曲涧深沉通地户。两崖花木争奇,几处松篁斗翠。左边龙,熟熟驯驯;右边虎,平平伏伏。每见铁牛耕,常有金钱种。幽禽睍睆(xiàn huǎn,婉转的鸟鸣声)声,丹凤朝阳立。石磷磷,波净净,古怪跷蹊真恶狞。世上名山无数多,花开花谢蘩还众。争如此景永长存,八节四时浑不动。诚为三界(佛教术语。指众生轮回的欲界、色界和无色界)坎源山,滋养五行水脏洞! + +美猴王正默看景致,只听得有人言语。径自下山寻觅,原来那陡崖之前,乃是那水脏洞。洞门外有几个小妖跳舞,见了悟空就走。悟空道:“休走!借你口中言,传我心内事。我乃正南方花果山水帘洞洞主。你家甚么混世鸟魔,屡次欺我儿孙,我特寻来,要与他见个上下!” + +那小妖听说,疾忙跑入洞里,报道:“大王!祸事了!”魔王道:“有甚祸事?”小妖道:“洞外有猴头称为花果山水帘洞洞主。他说你屡次欺他儿孙,特来寻你,见个上下哩。”魔王笑道:“我常闻得那些猴精说他有个大王,出家修行去,想是今番来了。你们见他怎生打扮,有甚器械?”小妖道:“他也没甚么器械,光着个头,穿一领红色衣,勒一条黄绦,足下踏一对乌靴,不僧不俗,又不像道士神仙,赤手空拳,在门外叫哩。”魔王闻说:“取我批挂兵器来!”那小妖即时取出。那魔王穿了甲胄,绰刀在手,与众妖出得门来,即高声叫道:“那个是水帘洞洞主?”悟空急睁睛观看,只见那魔王: + +头戴乌金盔,映日光明;身挂皂罗袍,迎风飘荡。下穿着黑铁甲,紧勒皮条;足踏着花褶靴,雄如上将。腰广十围,身高三丈,手执一口刀,锋刃多明亮。称为混世魔,磊落凶模样。 + +猴王喝道:“这泼魔这般眼大,看不见老孙!”魔王见了,笑道:“你身不满四尺,年不过三旬,手内又无兵器,怎么大胆猖狂,要寻我见甚么上下?”悟空骂道:“你这泼魔,原来没眼!你量我小,要大却也不难。你量我无兵器,我两只手勾着天边月哩!你不要怕,只吃老孙一拳!”纵一纵,跳上去,劈脸就打。那魔王伸手架住道:“你这般矬矮,我这般高长,你要使拳,我要使刀,使刀就杀了你,也吃人笑,待我放下刀,与你使路拳看。”悟空道:“说得是。好汉子!走来!”那魔王丢开架子便打,这悟空钻进去相撞相迎。他两个拳捶脚踢,一冲一撞。原来长拳空大,短簇坚牢。那魔王被悟空掏短肋,撞了裆,几下筋节,把他打重了。他闪过,拿起那板大的钢刀,望悟空劈头就砍。悟空急撤身,他砍了一个空。悟空见他凶猛,即使身外身法,拔一把毫毛,丢在口中嚼碎,望空中喷去,叫一声“变!”,即变做三二百个小猴,周围攒簇。 + +原来人得仙体,出神变化,无方不知。这猴王自从了道之后,身上有八万四千毛羽,根根能变,应物随心。那些小猴,眼乖会跳,刀来砍不着,枪去不能伤。你看他前踊后跃,钻上去,把魔王围绕,抱的抱,扯的扯,钻裆的钻裆,扳脚的扳脚,踢打挦毛,抠眼睛,捻鼻子,抬鼓弄,直打做一个攒盘。这悟空才去夺得他的刀来,分开小猴,照顶门一下,砍为两段。领众杀进洞中,将那大小妖精,尽皆剿灭。却把毫毛一抖,收上身来。又见那收不上身者,却是那魔王在水帘洞中擒去的小猴,悟空道:“汝等何为到此?”约有三五十个,都含泪道:“我等因大王修仙去后,这两年被他争吵,把我们都摄将来,那不是我们洞中的家火?石盆、石碗都被这厮拿来也。”悟空道:“既是我们的家火,你们都搬出外去。”随即洞里放起火来,把那水脏洞烧得枯干,尽归了一体。对众道:“汝等跟我回去。”众猴道:“大王,我们来时,只听得耳边风声,虚飘飘到于此地,更不识路径,今怎得回乡?”悟空道:“这是他弄的个术法儿,有何难也!我如今一窍通,百窍通,我也会弄。你们都合了眼,休怕!” + +好猴王,念声咒语,驾阵狂风,云头落下。叫:“孩儿们,睁眼。”众猴脚屣实地,认得是家乡,个个欢喜,都奔洞门旧路。那在洞众猴,都一齐簇拥同入,分班齿序,礼拜猴王。安排酒果,接风贺喜,启问降魔救子之事。悟空备细言了一遍,众猴称扬不尽道:“大王去到那方,不意学得这般手段!”悟空又道:“我当年别汝等,随波逐流,飘过东洋大海,径至南赡部洲,学成人像,着此衣,穿此履,摆摆摇摇,云游八九年馀,更不曾有道;又渡西洋大海,到西牛贺洲地界,访问多时,幸遇一老祖,传了我与天同寿的真功果,不死长生的大法门。”众猴称贺。都道:“万劫难逢也!”悟空又笑道:“小的们,又喜我这一门皆有姓氏。”众猴道:“大王何姓?”悟空道:“我今姓孙,法名悟空。”众猴闻说,鼓掌忻然道:“大王是老孙,我们都是二孙、三孙、细孙、小孙、——一家孙、一国孙、一窝孙矣!”都来奉承老孙,大盆小碗的,椰子酒、葡萄酒、仙花、仙果,真个是合家欢乐!咦! + +贯通一姓身归本,只待荣迁仙录箓名。 + +毕竟不知怎生结果,居此界终始如何,且听下回分解。 diff --git a/tests/unit/builder/test_runner.py b/tests/unit/builder/test_runner.py index 5c19d262..9254d46c 100644 --- a/tests/unit/builder/test_runner.py +++ b/tests/unit/builder/test_runner.py @@ -1,5 +1,10 @@ # -*- coding: utf-8 -*- -from kag.builder.runner import CKPT, BuilderRunner +import os +from kag.common.conf import KAG_CONFIG +from kag.builder.runner import CKPT, BuilderChainRunner + +# pwd = os.path.dirname(__file__) +pwd = "./" def test_ckpt(): @@ -14,3 +19,29 @@ def test_ckpt(): assert ckpt.is_processed("aaaa") assert ckpt.is_processed("bbbb") assert ckpt.is_processed("cccc") + + +def test_chain_runner(): + runner_config = { + "reader": {"type": "dir", "file_pattern": ".*long_text.*"}, + "chain": { + "type": "unstructured", + "extractor": { + "type": "kag", + "llm": KAG_CONFIG.all_config["llm"], + }, + "parser": {"type": "txt"}, + "splitter": { + "type": "length", + "split_length": 200, + "window_length": 0, + }, + "vectorizer": KAG_CONFIG.all_config["vectorizer"], + "post_processor": {"type": "base"}, + "writer": {"type": "kg"}, + }, + "num_parallel": 2, + "chain_level_num_paralle": 8, + } + runner = BuilderChainRunner.from_config(runner_config) + runner.invoke(os.path.join(pwd, "data/"))