From 280a98f3225c1f1011bd9e7c829019a9135ddc9b Mon Sep 17 00:00:00 2001 From: yanqiangmiffy <1185918903@qq.com> Date: Thu, 28 Nov 2024 15:35:09 +0800 Subject: [PATCH 1/6] =?UTF-8?q?demo@update:=E4=BF=AE=E6=94=B9demo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README_en.md | 2 +- examples/parsers/demo.py | 4 +- trustrag/modules/clusters/singlepass.py | 2 +- trustrag/modules/clusters/utils.py | 80 ++++++++++++++++--------- 4 files changed, 55 insertions(+), 33 deletions(-) diff --git a/README_en.md b/README_en.md index 380aad5..74a3938 100644 --- a/README_en.md +++ b/README_en.md @@ -20,4 +20,4 @@ git install ... ``` ## Demo -![demo](resources/demo.png) \ No newline at end of file +![app_logging3.png](resources%2Fapp_logging3.png) \ No newline at end of file diff --git a/examples/parsers/demo.py b/examples/parsers/demo.py index 12c5060..ac894c4 100644 --- a/examples/parsers/demo.py +++ b/examples/parsers/demo.py @@ -8,7 +8,7 @@ # files = {'file': ('testfile.txt', data)} # # # 发送POST请求 -# response = requests.post("http://10.208.62.156:10000/gomate_tool/parse/", files=files) +# response = requests.post("http://127.0.0.1:10000/gomate_tool/parse/", files=files) # # # 打印响应内容 # print(response.json()) @@ -21,7 +21,7 @@ files = {'file': open(file_path, 'rb')} # 发送文件 -response = requests.post("http://10.208.62.156:10000/gomate_tool/parse/", files=files) +response = requests.post("http://127.0.0.1:10000/gomate_tool/parse/", files=files) print(response.json()) files['file'].close() diff --git a/trustrag/modules/clusters/singlepass.py b/trustrag/modules/clusters/singlepass.py index 1054316..10376c8 100644 --- a/trustrag/modules/clusters/singlepass.py +++ b/trustrag/modules/clusters/singlepass.py @@ -192,7 +192,7 @@ def classify(self, data): data['cluster_count'] = data.groupby(by='cluster_index')['id'].transform('count') usual_print(self.output_file, "正在保存到") # print(data) - save_cols = ['id', 'title', 'content', 'cluster_level1_index', 'cluster_level2_index', 'cluster_label', + save_cols = ['id', 'title', 'content', 'url','cluster_level1_index', 'cluster_level2_index', 'cluster_label', 'cluster_count'] if self.output_file.endswith('xlsx'): data[save_cols].to_excel(self.output_file, index=None) diff --git a/trustrag/modules/clusters/utils.py b/trustrag/modules/clusters/utils.py index 02da339..4ccc731 100644 --- a/trustrag/modules/clusters/utils.py +++ b/trustrag/modules/clusters/utils.py @@ -1,6 +1,6 @@ import sys -sys.path.append("/data/users/searchgpt/yq/GoMate_dev") +sys.path.append("/data/users/searchgpt/yq/TrustRAG") import json import os import time @@ -19,12 +19,10 @@ from datetime import datetime import uuid - - keywords = [ "美国", "中美贸易", - "俄乌冲突", + "俄罗斯", "中东", ] @@ -135,24 +133,31 @@ def __init__(self, type="title"): 注意: 1. 生成标题首尾不要带有引号,中间可以带有引号 2. 如果输入标题内容是英文,请用中文编写 - 新闻标题: + + ##新闻标题列表: {titles} - 主题标题: + + ##主题标题: + """ else: self.prompt_template = """ - 请根据以下提供的新闻素材,编写一份主题报告,内容贴切主题内容,如果输入标题内容是英文,请用中文编写,不少于50字。 - - 新闻素材: - {contexts} - - 主题报告: - """ + 请根据以下提供的新闻素材,编写一份主题报告,内容贴切主题内容,如果输入标题内容是英文,请用中文编写,不少于50字。 + + 注意: + 1. 文章开头或者结尾不要生成额外修饰词 + 2. 主题内容越多多好,尽量全面详细 + + ##新闻素材: + {contexts} + + ##主题内容: + """ self.api_url = "http://10.208.63.29:8888" def compress(self, titles, contents): if self.type == 'title': - titles = "\n".join(titles) + titles = "\n".join([str(title) for title in titles]) prompt = self.prompt_template.format(titles=titles) else: contexts = '' @@ -187,13 +192,15 @@ def get_es_data(): with open(f"data/{word}_data.json", "r", encoding="utf-8") as f: data = json.load(f) sources = [hit["_source"] for hit in data["hits"]["hits"]] + ids = [hit["_id"] for hit in data["hits"]["hits"]] source_df = pd.DataFrame(sources) + # print(source_df) + source_df["id"] = ids + # source_df["id"] = "source_" + source_df["id"].astype(str) - source_df["id"] = source_df.index - source_df["id"] = "source_" + source_df["id"].astype(str) - - source_df[["id", "title", "content"]].to_excel(f"data/{word}_data.xlsx") + # source_df.to_excel(f"data/{word}_data.xlsx") + source_df[["id", "title", "content","url"]].to_excel(f"data/{word}_data.xlsx") def run_cluster_data(): @@ -260,12 +267,13 @@ def generate_report(): titles = group["title"][:30].tolist() contents = group["title"][:5].tolist() response1 = llm_api.compress(titles, contents) + titles = group["title"][:5].tolist() response2 = llm_report.compress(titles, contents) - + urls=group["url"][:5].tolist() f.write( json.dumps({"cluster_level1_index": index, "level1_title": response1["response"].strip(), - "level1_content": response2["response"].strip()}, ensure_ascii=False) + "\n") + "level1_content": response2["response"].strip(),"level1_urls":urls}, ensure_ascii=False) + "\n") with open(f"result/{keyword}_cluster_level2_index.jsonl", "w", encoding="utf-8") as f: for index, group in tqdm(df.groupby(by=["cluster_level2_index"])): @@ -275,14 +283,18 @@ def generate_report(): response1 = llm_api.compress(titles, contents) titles = group["title"][:5].tolist() response2 = llm_report.compress(titles, contents) + urls=group["url"][:5].tolist() + f.write( json.dumps({"cluster_level2_index": index, "level2_title": response1["response"].strip(), - "level2_content": response2["response"].strip()}, ensure_ascii=False) + "\n") + "level2_content": response2["response"].strip(),"level2_urls":urls}, ensure_ascii=False) + "\n") def insert_mongo_report(): mc = MongoCursor() for idx, keyword in enumerate(keywords): + with open(f"data/{keyword}_data.json", "r", encoding="utf-8") as f: + sources = json.load(f) try: loguru.logger.info("正在插入MongoDB成功:" + keyword) df = pd.read_excel(f"result/{keyword}_cluster_double.xlsx") @@ -293,6 +305,7 @@ def insert_mongo_report(): level1_mapping[data['cluster_level1_index']] = { 'level1_title': data['level1_title'], 'level1_content': data['level1_content'], + 'level1_urls': data['level1_urls'], } level2_mapping = {} @@ -302,17 +315,22 @@ def insert_mongo_report(): level2_mapping[data['cluster_level2_index']] = { 'level2_title': data['level2_title'], 'level2_content': data['level2_content'], + 'level2_urls': data['level2_urls'], } df['level1_title'] = df['cluster_level1_index'].apply( lambda x: level1_mapping.get(x, {}).get('level1_title', '')) df['level1_content'] = df['cluster_level1_index'].apply( lambda x: level1_mapping.get(x, {}).get('level1_content', '')) + df['level1_urls'] = df['cluster_level1_index'].apply( + lambda x: level1_mapping.get(x, {}).get('level1_urls', [])) + df['level2_title'] = df['cluster_level2_index'].apply( lambda x: level2_mapping.get(x, {}).get('level2_title', '')) df['level2_content'] = df['cluster_level2_index'].apply( lambda x: level2_mapping.get(x, {}).get('level2_content', '')) - + df['level2_urls'] = df['cluster_level2_index'].apply( + lambda x: level1_mapping.get(x, {}).get('level2_urls', [])) # 查看结果 # 获取当前日期并格式化为 YYYYMMDD 格式 current_date = datetime.now().strftime("%Y%m%d") @@ -329,7 +347,8 @@ def insert_mongo_report(): 'source': 'admin', 'owner': 'system', 'created_time': int(time.time() * 1000), - 'modified_time': int(time.time() * 1000) + 'modified_time': int(time.time() * 1000), + "articles": sources } contents = [] for level1_index, group1 in df.groupby(by=["cluster_level1_index"]): @@ -338,12 +357,14 @@ def insert_mongo_report(): nodes.append( { 'title': group2['level2_title'].unique()[0], - 'content': group2['level2_content'].unique()[0] + 'content': group2['level2_content'].unique()[0], + 'level2_urls': group2['level2_urls'] } ) contents.append({ 'title': group1['level1_title'].unique()[0], 'content': group1['level1_content'].unique()[0], + 'level1_urls': group1['level1_urls'], 'nodes': nodes }) template['content'] = contents @@ -389,12 +410,13 @@ def main(): loguru.logger.info("调度器已关闭") -# def sing_run(): +def sing_run(): + get_es_data() + run_cluster_data() + generate_report() + insert_mongo_report() - # get_es_data() - # run_cluster_data() - # generate_report() - # insert_mongo_report() if __name__ == '__main__': + sing_run() main() From eb2e7d0f4b55d2bb39e27d48d88e548b5f14cfb2 Mon Sep 17 00:00:00 2001 From: yanqiangmiffy <1185918903@qq.com> Date: Fri, 29 Nov 2024 11:17:39 +0800 Subject: [PATCH 2/6] update@add url --- trustrag/modules/clusters/singlepass.py | 2 +- trustrag/modules/clusters/utils.py | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/trustrag/modules/clusters/singlepass.py b/trustrag/modules/clusters/singlepass.py index 10376c8..439a101 100644 --- a/trustrag/modules/clusters/singlepass.py +++ b/trustrag/modules/clusters/singlepass.py @@ -181,7 +181,7 @@ def classify(self, data): data['cluster_count'] = data.groupby(by='cluster_index')['id'].transform('count') usual_print(self.output_file, "正在保存到") # print(data) - save_cols = ['id', 'title', 'content', 'cluster_index', 'cluster_label', 'cluster_count'] + save_cols = ['id', 'title', 'content','url', 'cluster_index', 'cluster_label', 'cluster_count'] else: print("len(artilceid_clusterid)", len(artilceid_clusterid)) print("len(clusterid_keywords)", len(clusterid_keywords)) diff --git a/trustrag/modules/clusters/utils.py b/trustrag/modules/clusters/utils.py index 4ccc731..9d00f8f 100644 --- a/trustrag/modules/clusters/utils.py +++ b/trustrag/modules/clusters/utils.py @@ -244,7 +244,7 @@ def run_cluster_data(): ) sc.classify(group) except: - pass + loguru.logger.info("=========二级聚类报错==========") def generate_report(): @@ -358,19 +358,19 @@ def insert_mongo_report(): { 'title': group2['level2_title'].unique()[0], 'content': group2['level2_content'].unique()[0], - 'level2_urls': group2['level2_urls'] + 'level2_urls': group2['level2_urls'].values.tolist() } ) contents.append({ 'title': group1['level1_title'].unique()[0], 'content': group1['level1_content'].unique()[0], - 'level1_urls': group1['level1_urls'], + 'level1_urls': group1['level1_urls'].values.tolist(), 'nodes': nodes }) template['content'] = contents mc.insert_one(template, 'report') except Exception as e: - print(e) + loguru.logger.error(e) loguru.logger.error("插入MongoDB失败:" + keyword) @@ -411,9 +411,9 @@ def main(): def sing_run(): - get_es_data() - run_cluster_data() - generate_report() + # get_es_data() + # run_cluster_data() + # generate_report() insert_mongo_report() From 701444a889f087375f471340e61c13ea72907d4c Mon Sep 17 00:00:00 2001 From: yanqiangmiffy <1185918903@qq.com> Date: Thu, 5 Dec 2024 11:31:57 +0800 Subject: [PATCH 3/6] =?UTF-8?q?cluster@=E6=9B=B4=E6=96=B0=E8=81=9A?= =?UTF-8?q?=E7=B1=BB=E7=AE=97=E6=B3=95=20singglepass?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- trustrag/modules/clusters/utils.py | 422 ----------------------------- 1 file changed, 422 deletions(-) delete mode 100644 trustrag/modules/clusters/utils.py diff --git a/trustrag/modules/clusters/utils.py b/trustrag/modules/clusters/utils.py deleted file mode 100644 index 9d00f8f..0000000 --- a/trustrag/modules/clusters/utils.py +++ /dev/null @@ -1,422 +0,0 @@ -import sys - -sys.path.append("/data/users/searchgpt/yq/TrustRAG") -import json -import os -import time -from datetime import date -from datetime import datetime -from apscheduler.schedulers.blocking import BlockingScheduler -from apscheduler.triggers.cron import CronTrigger -import numpy as np -import pandas as pd -import pymongo -import requests -from bson import ObjectId -from tqdm import tqdm -import loguru -from singlepass import SGCluster -from datetime import datetime -import uuid - -keywords = [ - "美国", - "中美贸易", - "俄罗斯", - "中东", -] - - -class NpEncoder(json.JSONEncoder): - def default(self, obj): - if isinstance(obj, np.integer): - return int(obj) - elif isinstance(obj, np.floating): - return float(obj) - elif isinstance(obj, np.ndarray): - return obj.tolist() - elif isinstance(obj, ObjectId): - return str(obj) - elif isinstance(obj, datetime): - return obj.strftime('%Y-%m-%d %H:%M:%S') - elif isinstance(obj, date): - return obj.strftime('%Y-%m-%d') - elif isinstance(obj, ObjectId): - return str(obj) - else: - return super(NpEncoder, self).default(obj) - - -class MongoCursor(object): - def __init__(self): - self.db = self.get_conn() - - def get_conn(self): - # client = pymongo.MongoClient("mongodb://root:golaxyintelligence@10.208.61.115:20000/") - client = pymongo.MongoClient("mongodb://root:golaxyintelligence@10.60.1.145:27017/") - db = client['goinv3_2409'] - return db - - def get_reports(self): - """ - 获取任务标签 - """ - collection = self.db['report'] - data = [json.loads(json.dumps(task, cls=NpEncoder, ensure_ascii=False)) for task in collection.find()][0] - # print(data) - print(data['_id']) - del data['_id'] - return data - - def insert_one(self, document, collection_name): - # print(document) - collection = self.db[collection_name] - result = collection.insert_one(document) - # print(result.inserted_id) - result = {'id': result.inserted_id} - result = json.loads(json.dumps(result, cls=NpEncoder, ensure_ascii=False)) - return result - - def find_one(self, id, collection_name): - collection = self.db[collection_name] - result = collection.find_one({'_id': ObjectId(id)}) - # print(type(result)) - result = json.loads(json.dumps(result, cls=NpEncoder, ensure_ascii=False)) - result = self.rename(result, '_id', 'id') - return result - - def delete_one(self, id, collection_name): - collection = self.db[collection_name] - result = collection.delete_one({'_id': ObjectId(id)}) - return result - - def find_many(self, query, collection_name): - collection = self.db[collection_name] - results = collection.find(query, sort=[("update_time", pymongo.DESCENDING)]) - results = [json.loads(json.dumps(task, cls=NpEncoder, ensure_ascii=False)) for task in results] - results = [self.rename(result, 'title', 'name') for result in results] - results = [self.rename(result, 'update_time', 'update_date') for result in results] - return results - - def update_one(self, id, collection_name, **kwargs): - """ - - :param id: - :param collection_name: - :param update: {'$set': kwargs} - :return: - """ - collection = self.db[collection_name] - result = collection.update_one(filter={'_id': ObjectId(id)}, update={'$set': kwargs}) - # print(type(result.raw_result)) - message = result.raw_result - if message['updatedExisting']: - message['message'] = "保存成功" - else: - message['message'] = "保存失败,请检查id是否存在" - return message - - def rename(self, old_dict, old_name, new_name): - new_dict = {} - for key, value in zip(old_dict.keys(), old_dict.values()): - new_key = key if key != old_name else new_name - new_dict[new_key] = old_dict[key] - return new_dict - - -class LLMCompressApi(): - def __init__(self, type="title"): - self.type = type - if self.type == "title": - self.prompt_template = """ - 分析以下新闻标题列表,提取它们的共同主题。生成一个简洁、准确且不超过10个字的主题标题。 - 注意: - 1. 生成标题首尾不要带有引号,中间可以带有引号 - 2. 如果输入标题内容是英文,请用中文编写 - - ##新闻标题列表: - {titles} - - ##主题标题: - - """ - else: - self.prompt_template = """ - 请根据以下提供的新闻素材,编写一份主题报告,内容贴切主题内容,如果输入标题内容是英文,请用中文编写,不少于50字。 - - 注意: - 1. 文章开头或者结尾不要生成额外修饰词 - 2. 主题内容越多多好,尽量全面详细 - - ##新闻素材: - {contexts} - - ##主题内容: - """ - self.api_url = "http://10.208.63.29:8888" - - def compress(self, titles, contents): - if self.type == 'title': - titles = "\n".join([str(title) for title in titles]) - prompt = self.prompt_template.format(titles=titles) - else: - contexts = '' - for title, content in zip(titles, contents): - contexts += f'标题:{title},"新闻内容:{content}\n' - prompt = self.prompt_template.format(contexts=contexts)[:4096] - # ====根据自己的api接口传入和输出修改==== - - data = { - "prompt": prompt, - } - # loguru.logger.info(data) - post_json = json.dumps(data) - response = requests.post(self.api_url, data=post_json, timeout=600) # v100-2 - response = response.json() - # =====根据自己的api接口传入和输出修改=== - - return response - - -def get_es_data(): - os.makedirs("data/", exist_ok=True) - - for word in keywords: - loguru.logger.info("正在获取es数据:" + word) - # url = f"http://10.208.61.117:9200/document_share_data_30_news/_search?q={word}&size=6000&sort=publish_time:desc" - url = f"http://10.208.61.117:9200/goinv3_document_news/_search?q={word}&sort=publish_time:desc&size=2000" - response = requests.get(url) - with open(f"data/{word}_data.json", "w", encoding="utf-8") as f: - json.dump(response.json(), f, ensure_ascii=False, indent=4) - - with open(f"data/{word}_data.json", "r", encoding="utf-8") as f: - data = json.load(f) - sources = [hit["_source"] for hit in data["hits"]["hits"]] - ids = [hit["_id"] for hit in data["hits"]["hits"]] - - source_df = pd.DataFrame(sources) - # print(source_df) - source_df["id"] = ids - # source_df["id"] = "source_" + source_df["id"].astype(str) - - # source_df.to_excel(f"data/{word}_data.xlsx") - source_df[["id", "title", "content","url"]].to_excel(f"data/{word}_data.xlsx") - - -def run_cluster_data(): - print("=========一级聚类==========") - for keyword in keywords: - loguru.logger.info("一级聚类:" + keyword) - data = pd.read_excel(f"data/{keyword}_data.xlsx", dtype={"id": str}) - data = data.drop_duplicates(subset=["title"]).reset_index(drop=True) - data["id"] = data["id"].astype(str) - if not os.path.exists(f"result/level1_{keyword}_result.xlsx"): - sc = SGCluster( - vector_path=f"result/level1_{keyword}_vector.npy", - result_txt_file=f"result/level1_{keyword}_result.txt", - output_file=f"result/level1_{keyword}_result.xlsx", - threshold=0.4, - max_features=8888, - n_components=1024, - ngrams=2, - level=1 - ) - sc.classify(data) - print("=========二级聚类==========") - for keyword in keywords: - loguru.logger.info("二级聚类:" + keyword) - data = pd.read_excel(f"result/level1_{keyword}_result.xlsx", dtype={"id": str}) - data = data.drop_duplicates(subset=["title"]).reset_index(drop=True) - data["id"] = data["id"].astype(str) - for cluster_index, group in data.groupby(by="cluster_index"): - try: - if len(group) > 4: - group = group.reset_index(drop=True) - sc = SGCluster( - vector_path=f"result/level2_{keyword}_vector_{cluster_index}.npy", - result_txt_file=f"result/level2_{keyword}_result_{cluster_index}.txt", - output_file=f"result/level2_{keyword}_result_{cluster_index}.xlsx", - threshold=0.5, - max_features=8888, - n_components=64, - ngrams=2, - level=2 - ) - sc.classify(group) - except: - loguru.logger.info("=========二级聚类报错==========") - - -def generate_report(): - for keyword in keywords: - loguru.logger.info("正在生成报告:" + keyword) - dfs = [] - for file in os.listdir("result"): - if file.endswith(".xlsx") and keyword in file and 'level2_' in file: - df = pd.read_excel(f"result/{file}") - dfs.append(df) - df = pd.concat(dfs, axis=0).reset_index(drop=True) - - df.to_excel(f"result/{keyword}_cluster_double.xlsx", index=False) - llm_api = LLMCompressApi(type="title") - llm_report = LLMCompressApi(type="report") - if not os.path.exists(f"result/{keyword}_cluster_level1_index.jsonl"): - with open(f"result/{keyword}_cluster_level1_index.jsonl", "w", encoding="utf-8") as f: - for index, group in tqdm(df.groupby(by=["cluster_level1_index"])): - if len(group) >= 3: - titles = group["title"][:30].tolist() - contents = group["title"][:5].tolist() - response1 = llm_api.compress(titles, contents) - - titles = group["title"][:5].tolist() - response2 = llm_report.compress(titles, contents) - urls=group["url"][:5].tolist() - f.write( - json.dumps({"cluster_level1_index": index, "level1_title": response1["response"].strip(), - "level1_content": response2["response"].strip(),"level1_urls":urls}, ensure_ascii=False) + "\n") - - with open(f"result/{keyword}_cluster_level2_index.jsonl", "w", encoding="utf-8") as f: - for index, group in tqdm(df.groupby(by=["cluster_level2_index"])): - if len(group) >= 3: - titles = group["title"][:30].tolist() - contents = group["title"][:5].tolist() - response1 = llm_api.compress(titles, contents) - titles = group["title"][:5].tolist() - response2 = llm_report.compress(titles, contents) - urls=group["url"][:5].tolist() - - f.write( - json.dumps({"cluster_level2_index": index, "level2_title": response1["response"].strip(), - "level2_content": response2["response"].strip(),"level2_urls":urls}, ensure_ascii=False) + "\n") - - -def insert_mongo_report(): - mc = MongoCursor() - for idx, keyword in enumerate(keywords): - with open(f"data/{keyword}_data.json", "r", encoding="utf-8") as f: - sources = json.load(f) - try: - loguru.logger.info("正在插入MongoDB成功:" + keyword) - df = pd.read_excel(f"result/{keyword}_cluster_double.xlsx") - level1_mapping = {} - with open(f"result/{keyword}_cluster_level1_index.jsonl", 'r', encoding='utf-8') as f: - for line in f.readlines(): - data = json.loads(line.strip()) - level1_mapping[data['cluster_level1_index']] = { - 'level1_title': data['level1_title'], - 'level1_content': data['level1_content'], - 'level1_urls': data['level1_urls'], - } - - level2_mapping = {} - with open(f"result/{keyword}_cluster_level2_index.jsonl", 'r', encoding='utf-8') as f: - for line in f.readlines(): - data = json.loads(line.strip()) - level2_mapping[data['cluster_level2_index']] = { - 'level2_title': data['level2_title'], - 'level2_content': data['level2_content'], - 'level2_urls': data['level2_urls'], - } - - df['level1_title'] = df['cluster_level1_index'].apply( - lambda x: level1_mapping.get(x, {}).get('level1_title', '')) - df['level1_content'] = df['cluster_level1_index'].apply( - lambda x: level1_mapping.get(x, {}).get('level1_content', '')) - df['level1_urls'] = df['cluster_level1_index'].apply( - lambda x: level1_mapping.get(x, {}).get('level1_urls', [])) - - df['level2_title'] = df['cluster_level2_index'].apply( - lambda x: level2_mapping.get(x, {}).get('level2_title', '')) - df['level2_content'] = df['cluster_level2_index'].apply( - lambda x: level2_mapping.get(x, {}).get('level2_content', '')) - df['level2_urls'] = df['cluster_level2_index'].apply( - lambda x: level1_mapping.get(x, {}).get('level2_urls', [])) - # 查看结果 - # 获取当前日期并格式化为 YYYYMMDD 格式 - current_date = datetime.now().strftime("%Y%m%d") - # 生成一个唯一的ID - unique_id = f"{current_date}_{uuid.uuid4().hex[:6]}" # 只取uuid的前6位 - template = { - '_id': f'{current_date}_00{idx + 1}_{unique_id}', - 'name': f'开源情报每日简报-{current_date}-{keyword}', - 'description': '', - 'tags': ['开源', '新闻', keyword], - 'content': [], - 'version': '1.0', - 'comment': '', - 'source': 'admin', - 'owner': 'system', - 'created_time': int(time.time() * 1000), - 'modified_time': int(time.time() * 1000), - "articles": sources - } - contents = [] - for level1_index, group1 in df.groupby(by=["cluster_level1_index"]): - nodes = [] - for level2_index, group2 in group1.groupby(by=["cluster_level2_index"]): - nodes.append( - { - 'title': group2['level2_title'].unique()[0], - 'content': group2['level2_content'].unique()[0], - 'level2_urls': group2['level2_urls'].values.tolist() - } - ) - contents.append({ - 'title': group1['level1_title'].unique()[0], - 'content': group1['level1_content'].unique()[0], - 'level1_urls': group1['level1_urls'].values.tolist(), - 'nodes': nodes - }) - template['content'] = contents - mc.insert_one(template, 'report') - except Exception as e: - loguru.logger.error(e) - loguru.logger.error("插入MongoDB失败:" + keyword) - - -def run(): - try: - loguru.logger.info("开始执行任务") - get_es_data() - run_cluster_data() - generate_report() - insert_mongo_report() - loguru.logger.info("任务执行完成") - except Exception as e: - loguru.logger.error(f"任务执行出错: {str(e)}") - - -def main(): - scheduler = BlockingScheduler() - - # 设置每天06:00执行任务 - trigger = CronTrigger( - hour=6, - minute=0 - ) - - scheduler.add_job( - run, - trigger=trigger, - id='daily_job', - name='每日数据处理任务', - misfire_grace_time=3600 # 错过执行时间1小时内仍会执行 - ) - - loguru.logger.info("调度器已启动,等待执行...") - try: - scheduler.start() - except (KeyboardInterrupt, SystemExit): - loguru.logger.info("调度器已关闭") - - -def sing_run(): - # get_es_data() - # run_cluster_data() - # generate_report() - insert_mongo_report() - - -if __name__ == '__main__': - sing_run() - main() From 0a13b83040b6e1fcad73dc2160acc83406aee6ce Mon Sep 17 00:00:00 2001 From: yanqiangmiffy <1185918903@qq.com> Date: Thu, 5 Dec 2024 13:38:01 +0800 Subject: [PATCH 4/6] =?UTF-8?q?update@chunk=E5=88=87=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/chunks.md | 82 +++++++++++++++++++++++++++++ trustrag/modules/chunks/__init__.py | 0 2 files changed, 82 insertions(+) create mode 100644 docs/chunks.md create mode 100644 trustrag/modules/chunks/__init__.py diff --git a/docs/chunks.md b/docs/chunks.md new file mode 100644 index 0000000..3f7668b --- /dev/null +++ b/docs/chunks.md @@ -0,0 +1,82 @@ +## 文本分块 + +### 切块定义 +分块(Chunking)是将整篇文本分成小段的过程。当我们使用LLM embedding内容时,分块可以帮助优化从向量数据库被召回的内容的准确性,因此文本段的质量也是RAG中比较重要的一环。 + +### 切块作用 + +切块的主要目的是对上下文中的一个片段进行嵌入的过程中尽可能少的噪声,而还要保证内容仍然与语义相关。 + + + +### 常见的chunk切分方法 + +1、 固定长度切分 +● 操作:按照文本的字数或者词数将文本切分为多块。比如可以将文档按照500字切分,切分之后的每个文本块字数为500。 + +● 优点:简单易实现,可快速处理。 + +● 缺点:可能会导致上下文断裂,影响重要的语义信息。 + +2、 基于句子的切分 +● 操作:按照句子粒度进行切分,比如以句号、点号等标点符号进行切分 + +● 优点:该方法能保证每个句子的完整性、上下文连贯性 + +● 缺点:如果句子过长,可能丢失一些细节。可能切分的不准确,影响检索效果。 + +3、 滑动窗口切分 +● 操作:创建一个重叠的滑动窗口,比如设置窗口大小为500,步长为100。 + +● 优点:可以减少因固定长度或句子边界切分可能引入的信息丢失问题。 + +● 缺点:上下文重叠导致信息重复,增加计算量。窗口的开始和结束可能会在句子或短语中间,导致语义不连贯。 + +4、 基于主题切分 +● 操作:通过识别文章主题的变换点进行切分。 + +● 优点:保持高度的语义连贯性,适用于结构化比较好的文本。 + +● 缺点:无法处理结构化不足的文本。 + +5、 基于语义相似度的切分 +● 操作:使用模型来评估文本间的语义相似度,并在相似度降低到某个阈值以下时进行切分 + +● 优点:保持高度语义相似性,优化检索效果 + +● 缺点:模型准确率要求高 + +6、按文档结构切分 +● 操作:典型的是markdown切分工具,按照文档结构切分 + +● 优点:语义连贯 + +● 缺点:有的问题涉及多个部分的内容,可能无法覆盖;生成模型的token数有限制,该切分方式可能不满足token限制; + +7、文档块摘要切分 +● 操作:切分文档后,使用摘要生成技术来提取每个块的关键信息 + +● 优点:可以将关键信息精简并保留 + +● 缺点:摘要生成方法的精度直接影响整体效果 + +### 分块需要考虑因素: +1、 被索引内容的性质是什么? + +是处理较长的文本(书籍或文章),还是处理较短的内容。不同场景需要的分块策略不同。 + +2、 不同的embedding模型在不同大小块上的效果不同 + +3、 查询query的长度和复杂度与块的切分有很大关系 + +用户输入的查询文件时简短而具体的还是冗长而复杂的。 + +4、 如何在特定的程序中使用检索结果 + +比如在LLM中,token长度会限制切块的大小。 + +**注意:文档切的多,向量就多,导致查询效率变差,语义内聚性也降低。因此,没必要切的时候,尽量别切。但是切的时候也要顶着最大长度切,能有效降低文档切块的数量。** + + +### 参考资料 +[RAG中常见的chunk切分方法](https://www.ctyun.cn/developer/article/551915360890949) \ No newline at end of file diff --git a/trustrag/modules/chunks/__init__.py b/trustrag/modules/chunks/__init__.py new file mode 100644 index 0000000..e69de29 From e874bf6e9014156eb2957c8b38d926666cf3f670 Mon Sep 17 00:00:00 2001 From: yanqiangmiffy <1185918903@qq.com> Date: Thu, 19 Dec 2024 15:31:52 +0800 Subject: [PATCH 5/6] engine@update qdrant --- ...7\276\216\345\233\275_cluster_double.xlsx" | Bin 0 -> 165 bytes trustrag/modules/engine/qdrant_service.py | 91 ++++++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 "trustrag/modules/clusters/result/.~\347\276\216\345\233\275_cluster_double.xlsx" create mode 100644 trustrag/modules/engine/qdrant_service.py diff --git "a/trustrag/modules/clusters/result/.~\347\276\216\345\233\275_cluster_double.xlsx" "b/trustrag/modules/clusters/result/.~\347\276\216\345\233\275_cluster_double.xlsx" new file mode 100644 index 0000000000000000000000000000000000000000..237cc0c693115349767f2b9a8df4da2746019baa GIT binary patch literal 165 kcmd<6Ow23HOw3DHAPI0VR5Bzoz>% literal 0 HcmV?d00001 diff --git a/trustrag/modules/engine/qdrant_service.py b/trustrag/modules/engine/qdrant_service.py new file mode 100644 index 0000000..38c3157 --- /dev/null +++ b/trustrag/modules/engine/qdrant_service.py @@ -0,0 +1,91 @@ +import openai +from fastapi import FastAPI +from qdrant_client import QdrantClient +from sentence_transformers import SentenceTransformer + +import os +from pathlib import Path + +from dotenv import load_dotenv + +load_dotenv() + +# Paths +ROOT = Path(__file__).parent +DATA = ROOT / "data" +MAX_SENTENCE_LENGTH = 100 + +# Qdrant +QDRANT_HOST = os.getenv("QDRANT_HOST") +QDRANT_PORT = os.getenv("QDRANT_PORT") +QDRANT_API_KEY = os.getenv("QDRANT_API_KEY") +COLLECTION_NAME = "meditations-collection" + +# OpenAI +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") +openai.api_key = OPENAI_API_KEY + +qdrant_client = QdrantClient( + host=QDRANT_HOST, + port=QDRANT_PORT, + api_key=QDRANT_API_KEY, +) + +retrieval_model = SentenceTransformer("msmarco-MiniLM-L-6-v3") + +app = FastAPI() + + +def build_prompt(question: str, references: list) -> tuple[str, str]: + prompt = f""" + You're Marcus Aurelius, emperor of Rome. You're giving advice to a friend who has asked you the following question: '{question}' + + You've selected the most relevant passages from your writings to use as source for your answer. Cite them in your answer. + + References: + """.strip() + + references_text = "" + + for i, reference in enumerate(references, start=1): + text = reference.payload["text"].strip() + references_text += f"\n[{i}]: {text}" + + prompt += ( + references_text + + "\nHow to cite a reference: This is a citation [1]. This one too [3]. And this is sentence with many citations [2][3].\nAnswer:" + ) + return prompt, references_text + + +@app.get("/") +def read_root(): + return { + "message": "Make a post request to /ask to ask a question about Meditations by Marcus Aurelius" + } + + +@app.post("/ask") +def ask(question: str): + similar_docs = qdrant_client.search( + collection_name=COLLECTION_NAME, + query_vector=retrieval_model.encode(question), + limit=3, + append_payload=True, + ) + + prompt, references = build_prompt(question, similar_docs) + + response = openai.ChatCompletion.create( + model="gpt-3.5-turbo", + messages=[ + {"role": "user", "content": prompt}, + ], + max_tokens=250, + temperature=0.2, + ) + + return { + "response": response["choices"][0]["message"]["content"], + "references": references, + } \ No newline at end of file From f6d07c73eaf5d69ceb060a1af5389baf037ad48c Mon Sep 17 00:00:00 2001 From: yanqiangmiffy <1185918903@qq.com> Date: Thu, 26 Dec 2024 20:57:28 +0800 Subject: [PATCH 6/6] bug@fixed faiss float64 to float32 --- trustrag/modules/retrieval/dense_retriever.py | 93 +++++++++++++++---- 1 file changed, 76 insertions(+), 17 deletions(-) diff --git a/trustrag/modules/retrieval/dense_retriever.py b/trustrag/modules/retrieval/dense_retriever.py index 54abcc9..b36ec92 100644 --- a/trustrag/modules/retrieval/dense_retriever.py +++ b/trustrag/modules/retrieval/dense_retriever.py @@ -9,7 +9,7 @@ """ import gc import os -from typing import List +from typing import List,Dict,Union import faiss import numpy as np @@ -82,57 +82,116 @@ def __init__(self, config): self.batch_size = config.batch_size def load_index(self, index_path: str = None): - """Load the FAISS index from the specified path.""" + """ + Load the FAISS index from the specified path. + + Args: + index_path (str, optional): The path to load the index from. Defaults to self.index_path. + """ if index_path is None: index_path = self.index_path + # Load the document embeddings and texts from the saved file data = np.load(os.path.join(index_path, 'document.vecstore.npz'), allow_pickle=True) self.documents, self.embeddings = data['documents'].tolist(), data['embeddings'].tolist() + # Load the FAISS index self.index = faiss.read_index(os.path.join(index_path, 'fassis.index')) print("Index loaded successfully from", index_path) - del data - gc.collect() + del data # Free up memory + gc.collect() # Perform garbage collection def save_index(self, index_path: str = None): - """Save the FAISS index to the specified path.""" + """ + Save the FAISS index to the specified path. + + Args: + index_path (str, optional): The path to save the index to. Defaults to self.index_path. + """ if self.index and self.embeddings and self.documents: if index_path is None: index_path = self.index_path + # Create the directory if it doesn't exist if not os.path.exists(index_path): os.makedirs(index_path, exist_ok=True) print(f"Index saving to:{index_path}") + # Save the document embeddings and texts np.savez( os.path.join(index_path, 'document.vecstore'), embeddings=self.embeddings, documents=self.documents ) + # Save the FAISS index faiss.write_index(self.index, os.path.join(index_path, 'fassis.index')) print("Index saved successfully to", index_path) def get_embedding(self, sentences: List[str]) -> np.ndarray: - """Generate embeddings for a list of sentences.""" - return self.model.encode(sentences=sentences, batch_size=self.batch_size) # Using configured batch_size + """ + Generate embeddings for a list of sentences. + Args: + sentences (List[str]): List of sentences to generate embeddings for. + + Returns: + np.ndarray: A numpy array of embeddings. + """ + # Using configured batch_size + return self.model.encode(sentences=sentences, batch_size=self.batch_size) def add_texts(self, texts: List[str]): - """Add multiple texts to the index.""" + """ + Add multiple texts to the index. + + Args: + texts (List[str]): List of texts to add to the index. + """ embeddings = self.get_embedding(texts) - self.index.add(embeddings) - self.documents.extend(texts) - self.embeddings.extend(embeddings) - self.num_documents += len(texts) + # Convert embeddings to float32 (required by FAISS) + # faiss issue:https://github.com/facebookresearch/faiss/issues/1732 + self.index.add(embeddings.astype("float32")) + self.documents.extend(texts) # Add texts to the documents list + self.embeddings.extend(embeddings) # Add embeddings to the embeddings list + self.num_documents += len(texts) # Update the document count + def add_text(self, text: str): - """Add a single text to the index.""" + """ + Add a single text to the index. + + Args: + text (str): The text to add to the index. + """ self.add_texts([text]) def build_from_texts(self, corpus: List[str]): - """Process and index a list of texts in batches.""" + """ + Process and index a list of texts in batches. + + Args: + corpus (List[str]): List of texts to index. + """ if not corpus: return + # Process texts in batches for i in tqdm(range(0, len(corpus), self.batch_size), desc="Building index"): batch = corpus[i:i + self.batch_size] self.add_texts(batch) - def retrieve(self, query: str = None, top_k: int = 5): - D, I = self.index.search(self.get_embedding([query]), top_k) - return [{'text': self.documents[idx], 'score': score} for idx, score in zip(I[0], D[0])] + + def retrieve(self, query: str = None, top_k: int = 5) -> List[Dict[str, Union[str, float]]]: + """ + Retrieve the top_k documents relevant to the query. + + Args: + query (str, optional): The query string. Defaults to None. + top_k (int, optional): The number of top documents to retrieve. Defaults to 5. + + Returns: + List[Dict[str, Union[str, float]]]: A list of dictionaries containing the retrieved documents and their scores. + """ + # generate query embedding + query_embedding = self.get_embedding([query]).astype("float32") + # search the index + D, I = self.index.search(query_embedding, top_k) + # free up memory + del query_embedding + # Return the retrieved documents with their scores + return [{'text': self.documents[idx], 'score': score} for idx, score in zip(I[0], D[0])] \ No newline at end of file