general_process.py
# -*- coding: utf-8 -*-
from utils.pb_api import PbTalker
from utils.general_utils import get_logger, extract_and_convert_dates, is_chinese
from agents.get_info import *
import os
import re
import json
from scrapers import *
from utils.zhipu_search import run_v4_async
from urllib.parse import urlparse
from crawl4ai import AsyncWebCrawler, CacheMode
from datetime import datetime
import feedparser
project_dir = os.environ.get("PROJECT_DIR", "")
if project_dir:
    os.makedirs(project_dir, exist_ok=True)

wiseflow_logger = get_logger('wiseflow', project_dir)
pb = PbTalker(wiseflow_logger)
crawler = AsyncWebCrawler(verbose=False)

model = os.environ.get("PRIMARY_MODEL", "")
if not model:
    raise ValueError("PRIMARY_MODEL not set, please set it in environment variables or edit core/.env")
secondary_model = os.environ.get("SECONDARY_MODEL", model)
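
# The environment variables read above (PROJECT_DIR, PRIMARY_MODEL, SECONDARY_MODEL) are
# typically set in core/.env. An illustrative sketch with placeholder values (assumptions,
# not shipped defaults):
#   PROJECT_DIR=work_dir
#   PRIMARY_MODEL=your-primary-model-name
#   SECONDARY_MODEL=your-secondary-model-name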


async def info_process(url: str,
                       url_title: str,
                       author: str,
                       publish_date: str,
                       contents: list[str],
                       link_dict: dict,
                       focus_id: str,
                       get_info_prompts: list[str]):
    infos = await get_info(contents, link_dict, get_info_prompts, author, publish_date, _logger=wiseflow_logger)
    if infos:
        wiseflow_logger.debug(f'get {len(infos)} infos, will save to pb')
    for info in infos:
        info['url'] = url
        info['url_title'] = url_title
        info['tag'] = focus_id
        _ = pb.add(collection_name='infos', body=info)
        if not _:
            wiseflow_logger.error('add info failed, writing to cache_file')
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            with open(os.path.join(project_dir, f'{timestamp}_cache_infos.json'), 'w', encoding='utf-8') as f:
                json.dump(info, f, ensure_ascii=False, indent=4)
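

# main_process pipeline, per focus point:
#   1. load the urls already collected for this focus from the 'infos' collection
#   2. build language-matched (Chinese or English) link- and info-extraction prompts
#   3. optionally seed the working list from the Zhipu web-search API
#   4. seed the working list from the configured sites (RSS feeds are expanded)
#   5. crawl the working list, follow related links, and save every extracted info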
async def main_process(focus: dict, sites: list):
    wiseflow_logger.debug('new task initializing...')
    focus_id = focus["id"]
    focus_point = focus["focuspoint"].strip()
    explanation = focus["explanation"].strip()
    wiseflow_logger.debug(f'focus_id: {focus_id}, focus_point: {focus_point}, explanation: {explanation}, search_engine: {focus["search_engine"]}')
    existing_urls = {url['url'] for url in pb.read(collection_name='infos', fields=['url'], filter=f"tag='{focus_id}'")}

    focus_statement = f"//{focus_point}//"
    if explanation:
        if is_chinese(explanation):
            focus_statement = f"{focus_statement}\n解释:{explanation}"
        else:
            focus_statement = f"{focus_statement}\nExplanation: {explanation}"

    date_stamp = datetime.now().strftime('%Y-%m-%d')
    if is_chinese(focus_statement):
        get_link_sys_prompt = get_link_system.replace('{focus_statement}', focus_statement)
        get_link_sys_prompt = f"今天的日期是{date_stamp},{get_link_sys_prompt}"
        get_link_suffix_prompt = get_link_suffix
        get_info_sys_prompt = get_info_system.replace('{focus_statement}', focus_statement)
        get_info_sys_prompt = f"今天的日期是{date_stamp},{get_info_sys_prompt}"
        get_info_suffix_prompt = get_info_suffix
    else:
        get_link_sys_prompt = get_link_system_en.replace('{focus_statement}', focus_statement)
        get_link_sys_prompt = f"today is {date_stamp}, {get_link_sys_prompt}"
        get_link_suffix_prompt = get_link_suffix_en
        get_info_sys_prompt = get_info_system_en.replace('{focus_statement}', focus_statement)
        get_info_sys_prompt = f"today is {date_stamp}, {get_info_sys_prompt}"
        get_info_suffix_prompt = get_info_suffix_en

    get_link_prompts = [get_link_sys_prompt, get_link_suffix_prompt, secondary_model]
    get_info_prompts = [get_info_sys_prompt, get_info_suffix_prompt, model]
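
    # stage: optionally seed the working list from Zhipu web search; results whose titles
    # carry a parseable publish date are processed immediately, the rest are queued for crawling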
    working_list = set()
    if focus.get('search_engine', False):
        query = focus_point if not explanation else f"{focus_point}({explanation})"
        search_intent, search_content = await run_v4_async(query, _logger=wiseflow_logger)
        _intent = search_intent['search_intent'][0]['intent']
        _keywords = search_intent['search_intent'][0]['keywords']
        wiseflow_logger.info(f'query: {query}\nsearch intent: {_intent}\nkeywords: {_keywords}')
        search_results = search_content['search_result']
        for result in search_results:
            url = result['link']
            if url in existing_urls:
                continue
            if '(发布时间' not in result['title']:
                wiseflow_logger.debug(f'cannot find publish time in the search result {url}, adding to working list')
                working_list.add(url)
                continue
            title, publish_date = result['title'].split('(发布时间')
            title = title.strip() + '(from search engine)'
            publish_date = publish_date.strip(')')
            # strictly match the YYYY-MM-DD format
            date_match = re.search(r'\d{4}-\d{2}-\d{2}', publish_date)
            if date_match:
                publish_date = date_match.group()
                publish_date = extract_and_convert_dates(publish_date)
            else:
                wiseflow_logger.warning(f'cannot find publish time in the search result {url}, adding to working list')
                working_list.add(url)
                continue
            author = result['media']
            texts = [result['content']]
            await info_process(url, title, author, publish_date, texts, {}, focus_id, get_info_prompts)
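
    # stage: seed the working list from the configured sites; RSS sources are expanded into
    # their entry links, plain web sources are queued directly. recognized_img_cache is
    # threaded through pre_process to reuse image-recognition results across pages.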
    recognized_img_cache = {}
    for site in sites:
        if site.get('type', 'web') == 'rss':
            try:
                feed = feedparser.parse(site['url'])
            except Exception as e:
                wiseflow_logger.warning(f"{site['url']} RSS feed is not valid: {e}")
                continue
            rss_urls = {entry.link for entry in feed.entries if entry.link}
            wiseflow_logger.debug(f'get {len(rss_urls)} urls from rss source {site["url"]}')
            working_list.update(rss_urls - existing_urls)
        else:
            working_list.add(site['url'])
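
    # stage: crawl and scrape every url in the working list; related urls discovered on each
    # page are appended, so the loop runs until the frontier is exhausted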
    site_urls = {site['url'] for site in sites}
    await crawler.start()
    while working_list:
        url = working_list.pop()
        existing_urls.add(url)
        wiseflow_logger.debug(f'process new url, still {len(working_list)} urls in working list')
        has_common_ext = any(url.lower().endswith(ext) for ext in common_file_exts)
        if has_common_ext:
            wiseflow_logger.debug(f'{url} is a common file, skip')
            continue

        parsed_url = urlparse(url)
        existing_urls.add(f"{parsed_url.scheme}://{parsed_url.netloc}")
        existing_urls.add(f"{parsed_url.scheme}://{parsed_url.netloc}/")
        domain = parsed_url.netloc
        if domain in custom_fetching_configs:
            wiseflow_logger.debug(f'{url} will use a custom crawl4ai run config')
            run_config = custom_fetching_configs[domain]
        else:
            run_config = crawler_config
        # seed site urls are always refetched (write-only cache); discovered urls may be served from cache
        run_config.cache_mode = CacheMode.WRITE_ONLY if url in site_urls else CacheMode.ENABLED
        result = await crawler.arun(url=url, config=run_config)
        if not result.success:
            wiseflow_logger.warning(f'{url} failed to crawl, destination site cannot be reached, skip')
            continue
        metadata_dict = result.metadata if result.metadata else {}

        if domain in custom_scrapers:
            result = custom_scrapers[domain](result)
            raw_markdown = result.content
            used_img = result.images
            title = result.title
            if title == 'maybe a new_type_article':
                wiseflow_logger.warning(f'we found a new type here, {url}')
            base_url = result.base
            author = result.author
            publish_date = result.publish_date
        else:
            raw_markdown = result.markdown
            media_dict = result.media if result.media else {}
            used_img = [d['src'] for d in media_dict.get('images', [])]
            title = ''
            base_url = ''
            author = ''
            publish_date = ''

        if not raw_markdown:
            wiseflow_logger.warning(f'{url} no content, something during fetching failed, skip')
            continue

        if not title:
            title = metadata_dict.get('title', '')
        if not base_url:
            base_url = metadata_dict.get('base', '')
        if not base_url:
            base_url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
        if not author:
            author = metadata_dict.get('author', '')
        if not publish_date:
            publish_date = metadata_dict.get('publish_date', '')

        link_dict, links_parts, contents, recognized_img_cache = await pre_process(raw_markdown, base_url, used_img, recognized_img_cache, existing_urls)
        if link_dict and links_parts:
            links_texts = []
            for _parts in links_parts:
                links_texts.extend(_parts.split('\n\n'))
            more_url = await get_more_related_urls(links_texts, link_dict, get_link_prompts, _logger=wiseflow_logger)
            if more_url:
                wiseflow_logger.debug(f'get {len(more_url)} more related urls, will add to working list')
                working_list.update(more_url - existing_urls)

        if not contents:
            continue

        if not author or author.lower() == 'na' or not publish_date or publish_date.lower() == 'na':
            author, publish_date = await get_author_and_publish_date(raw_markdown, model, _logger=wiseflow_logger)
        if not author or author.lower() == 'na':
            author = parsed_url.netloc
        if publish_date:
            publish_date = extract_and_convert_dates(publish_date)
        else:
            publish_date = date_stamp

        await info_process(url, title, author, publish_date, contents, link_dict, focus_id, get_info_prompts)

    await crawler.close()
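

# Illustrative usage sketch (an assumption, not the upstream entry point): it shows the
# shape of the `focus` dict and `sites` list that main_process expects, based on the keys
# read above. All concrete values are placeholders.
if __name__ == '__main__':
    import asyncio

    demo_focus = {
        'id': 'demo_focus_id',            # record id; saved infos are tagged with it
        'focuspoint': 'open source LLM',  # placeholder focus point text
        'explanation': '',                # optional clarification, Chinese or English
        'search_engine': False,           # True also seeds urls from the Zhipu search API
    }
    demo_sites = [
        {'url': 'https://example.com/feed.xml', 'type': 'rss'},
        {'url': 'https://example.com/news', 'type': 'web'},
    ]
    asyncio.run(main_process(demo_focus, demo_sites))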