Skip to content

Commit

Permalink
split reader and parser
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuzhongshu123 committed Nov 19, 2024
1 parent cb7c8b1 commit a1fbdb6
Show file tree
Hide file tree
Showing 25 changed files with 840 additions and 424 deletions.
3 changes: 3 additions & 0 deletions kag/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,10 @@
init_env()

import kag.interface

import kag.builder.component
import kag.builder.default_chain
import kag.builder.runner
import kag.builder.prompt
import kag.solver.prompt
import kag.common.vectorize_model
Expand Down
30 changes: 18 additions & 12 deletions kag/builder/component/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,22 @@
from kag.builder.component.mapping.relation_mapping import RelationMapping
from kag.builder.component.mapping.spo_mapping import SPOMapping
from kag.builder.component.reader.csv_reader import CSVReader
from kag.builder.component.reader.pdf_reader import PDFReader
from kag.builder.component.reader.json_reader import JSONReader
from kag.builder.component.reader.markdown_reader import MarkDownReader
from kag.builder.component.reader.docx_reader import DocxReader
from kag.builder.component.reader.txt_reader import TXTReader
from kag.builder.component.reader.yuque_reader import YuqueReader
from kag.builder.component.reader.dataset_reader import (
HotpotqaCorpusReader,
TwowikiCorpusReader,
MusiqueCorpusReader,
HotpotqaCorpusReader,
)
from kag.builder.component.reader.yuque_reader import YuqueReader
from kag.builder.component.reader.directory_reader import DirectoryReader


from kag.builder.component.record_parser.pdf_parser import PDFParser
from kag.builder.component.record_parser.markdown_parser import MarkDownParser
from kag.builder.component.record_parser.docx_parser import DocxParser
from kag.builder.component.record_parser.txt_parser import TXTParser
from kag.builder.component.record_parser.dict_parser import DictParser


from kag.builder.component.splitter.length_splitter import LengthSplitter
from kag.builder.component.splitter.pattern_splitter import PatternSplitter
from kag.builder.component.splitter.outline_splitter import OutlineSplitter
Expand All @@ -53,16 +58,17 @@
"SPGTypeMapping",
"RelationMapping",
"SPOMapping",
"TXTReader",
"PDFReader",
"MarkDownReader",
"TXTParser",
"PDFParser",
"MarkDownParser",
"DocxParser",
"DictParser",
"JSONReader",
"HotpotqaCorpusReader",
"MusiqueCorpusReader",
"TwowikiCorpusReader",
"DirectoryReader",
"YuqueReader",
"CSVReader",
"DocxReader",
"LengthSplitter",
"PatternSplitter",
"OutlineSplitter",
Expand Down
1 change: 1 addition & 0 deletions kag/builder/component/extractor/.#kag_extractor.py
73 changes: 7 additions & 66 deletions kag/builder/component/reader/csv_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,9 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
import os
from typing import List, Type, Dict
from typing import Dict, List

import pandas as pd

from kag.builder.model.chunk import Chunk
from kag.interface import SourceReaderABC
from knext.common.base.runnable import Input, Output

Expand All @@ -30,70 +27,14 @@ class CSVReader(SourceReaderABC):
**kwargs: Additional keyword arguments passed to the parent class constructor.
"""

def __init__(
self,
output_type: str = "Chunk",
id_col: str = "id",
name_col: str = "name",
content_col: str = "content",
):
if output_type.lower().strip() == "dict":
self.output_types = Dict[str, str]
else:
self.output_types = Chunk
self.id_col = id_col
self.name_col = name_col
self.content_col = content_col

@property
def input_types(self) -> Type[Input]:
def input_types(self) -> Input:
return str

@property
def output_types(self) -> Type[Output]:
return self._output_types

@output_types.setter
def output_types(self, output_types):
self._output_types = output_types

def invoke(self, input: Input, **kwargs) -> List[Output]:
"""
Reads a CSV file and converts the data format based on the output type.
Args:
input (Input): Input parameter, expected to be a string representing the path to the CSV file.
**kwargs: Additional keyword arguments, currently unused but kept for potential future expansion.
Returns:
List[Output]:
- If `output_types` is `Chunk`, returns a list of Chunk objects.
- If `output_types` is `Dict`, returns a list of dictionaries.
"""

try:
data = pd.read_csv(input)
data = data.astype(str)
except Exception as e:
raise IOError(f"Failed to read the file: {e}")
def output_types(self) -> Output:
return Dict

if self.output_types == Chunk:
chunks = []
basename, _ = os.path.splitext(os.path.basename(input))
for idx, row in enumerate(data.to_dict(orient="records")):
kwargs = {
k: v
for k, v in row.items()
if k not in [self.id_col, self.name_col, self.content_col]
}
chunks.append(
Chunk(
id=row.get(self.id_col)
or Chunk.generate_hash_id(f"{input}#{idx}"),
name=row.get(self.name_col) or f"{basename}#{idx}",
content=row[self.content_col],
**kwargs,
)
)
return chunks
else:
return data.to_dict(orient="records")
def load_data(self, input: Input, **kwargs) -> List[Output]:
data = pd.read_csv(input)
return data.to_dict(orient="records")
72 changes: 27 additions & 45 deletions kag/builder/component/reader/dataset_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@

import json
import os
from typing import List, Type
from typing import List, Type, Dict


from kag.builder.model.chunk import Chunk
from kag.interface import SourceReaderABC
from knext.common.base.runnable import Input, Output

Expand All @@ -29,27 +29,25 @@ def input_types(self) -> Type[Input]:
@property
def output_types(self) -> Type[Output]:
"""The type of output this Runnable object produces specified as a type annotation."""
return Chunk
return Dict

def invoke(self, input: str, **kwargs) -> List[Output]:
def load_data(self, input: Input, **kwargs) -> List[Output]:
if os.path.exists(str(input)):
with open(input, "r") as f:
corpus = json.load(f)
else:
corpus = json.loads(input)
chunks = []

data = []
for item_key, item_value in corpus.items():
chunk = Chunk(
id=item_key,
name=item_key,
content="\n".join(item_value),
data.append(
{"id": item_key, "name": item_key, "content": "\n".join(item_value)}
)
chunks.append(chunk)
return chunks
return data


@SourceReaderABC.register("musique")
@SourceReaderABC.register("2wiki")
class MusiqueCorpusReader(SourceReaderABC):
@property
def input_types(self) -> Type[Input]:
Expand All @@ -59,42 +57,26 @@ def input_types(self) -> Type[Input]:
@property
def output_types(self) -> Type[Output]:
"""The type of output this Runnable object produces specified as a type annotation."""
return Chunk
return Dict

def get_basename(self, file_name: str):
base, ext = os.path.splitext(os.path.basename(file_name))
base, _ = os.path.splitext(os.path.basename(file_name))
return base

def invoke(self, input: str, **kwargs) -> List[Output]:
id_column = kwargs.get("id_column", "title")
name_column = kwargs.get("name_column", "title")
content_column = kwargs.get("content_column", "text")

if os.path.exists(str(input)):
with open(input, "r") as f:
corpusList = json.load(f)
else:
corpusList = input
chunks = []

for idx, item in enumerate(corpusList):
chunk = Chunk(
id=f"{item[id_column]}#{idx}",
name=item[name_column],
content=item[content_column],
def load_data(self, input: Input, **kwargs) -> List[Output]:

with open(input, "r") as f:
corpus = json.load(f)
data = []

for idx, item in enumerate(corpus):
title = item["title"]
content = item["text"]
data.append(
{
"id": f"{title}#{idx}",
"name": title,
"content": content,
}
)
chunks.append(chunk)
return chunks


@SourceReaderABC.register("2wiki")
class TwowikiCorpusReader(MusiqueCorpusReader):
@property
def input_types(self) -> Type[Input]:
"""The type of input this Runnable object accepts specified as a type annotation."""
return str

@property
def output_types(self) -> Type[Output]:
"""The type of output this Runnable object produces specified as a type annotation."""
return Chunk
return data
57 changes: 57 additions & 0 deletions kag/builder/component/reader/directory_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
import os
import re
from typing import List

from kag.interface import SourceReaderABC

from knext.common.base.runnable import Input, Output


@SourceReaderABC.register("dir")
class DirectoryReader(SourceReaderABC):
    """Reader that lists the files under a directory whose names match a regex.

    The pattern is applied with ``re.match`` to each file's base name (not its
    full path), so it is anchored at the start of the name.

    Args:
        file_pattern: Regular expression a file name must match. When given,
            it takes precedence over ``file_suffix``.
        file_suffix: Literal file-name ending (e.g. ``".md"``). Only used when
            ``file_pattern`` is None. It is escaped before being embedded in
            the regex, so characters such as ``.`` match themselves.
        rank: Forwarded unchanged to ``SourceReaderABC.__init__``.
            NOTE(review): presumably the index of this worker when the input
            is sharded -- confirm against the base class.
        world_size: Forwarded unchanged to ``SourceReaderABC.__init__``.
            NOTE(review): presumably the total number of workers -- confirm
            against the base class.

    When neither ``file_pattern`` nor ``file_suffix`` is supplied, the pattern
    defaults to ``.*txt$`` (any name ending in ``txt``).
    """

    def __init__(
        self,
        file_pattern: str = None,
        file_suffix: str = None,
        rank: int = 0,
        world_size: int = 1,
    ):
        super().__init__(rank, world_size)

        if file_pattern is None:
            if file_suffix:
                # The suffix is literal text, not a regex: escape it so that
                # ".txt" does not treat "." as "any character" and suffixes
                # containing metacharacters (e.g. "c++") still compile.
                file_pattern = f".*{re.escape(file_suffix)}$"
            else:
                file_pattern = r".*txt$"
        self.file_pattern = re.compile(file_pattern)

    @property
    def input_types(self) -> Input:
        """Type of input accepted by this reader: a directory path string."""
        return str

    @property
    def output_types(self) -> Output:
        """Type of each output item: a file path string."""
        return str

    def find_files_by_regex(self, directory):
        """Recursively collect the paths of files whose names match the pattern.

        Args:
            directory: Root directory handed to ``os.walk``.

        Returns:
            A list of paths (each ``os.path.join(root, file_name)``) in the
            order ``os.walk`` yields them. The list is empty when nothing
            matches.
        """
        matched_files = []
        for root, _dirs, files in os.walk(directory):
            matched_files.extend(
                os.path.join(root, file_name)
                for file_name in files
                if self.file_pattern.match(file_name)
            )
        return matched_files

    def load_data(self, input: Input, **kwargs) -> List[Output]:
        """Return the matching file paths found under the directory ``input``.

        Args:
            input: Path of the directory to scan.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The list produced by ``find_files_by_regex(input)``.
        """
        return self.find_files_by_regex(input)
Loading

0 comments on commit a1fbdb6

Please sign in to comment.