mirror of
https://github.com/TeamWiseFlow/wiseflow.git
synced 2025-01-23 02:20:20 +08:00
second commit for V0.3.22
This commit is contained in:
parent
f18d9ba084
commit
b83ca2369a
20
CHANGELOG.md
Normal file
20
CHANGELOG.md
Normal file
@ -0,0 +1,20 @@
|
||||
# V0.3.2
|
||||
- 引入 Crawlee(playwrigt模块),大幅提升通用爬取能力,适配实际项目场景;
|
||||
|
||||
Introduce Crawlee (playwright module), significantly enhancing general crawling capabilities and adapting to practical project scenarios;
|
||||
|
||||
- 完全重写了信息提取模块,引入“爬-查一体”策略,你关注的才是你想要的;
|
||||
|
||||
Completely rewrote the information extraction module, introducing an "integrated crawl-search" strategy, focusing on what you care about;
|
||||
|
||||
- 新策略下放弃了 gne、jieba 等模块,去除了安装包;
|
||||
|
||||
Under the new strategy, modules such as gne and jieba have been abandoned, reducing the installation package size;
|
||||
|
||||
- 重写了 pocketbase 的表单结构;
|
||||
|
||||
Rewrote the PocketBase form structure;
|
||||
|
||||
- 进一步简化部署操作步骤。
|
||||
|
||||
Further simplified deployment steps.
|
@ -1,11 +1,10 @@
|
||||
from core.llms.openai_wrapper import openai_llm as llm
|
||||
# from core.llms.siliconflow_wrapper import sfa_llm
|
||||
import re
|
||||
from core.utils.general_utils import is_chinese, extract_and_convert_dates, extract_urls
|
||||
from loguru import logger
|
||||
from core.utils.pb_api import PbTalker
|
||||
import os
|
||||
from datetime import datetime, date
|
||||
from datetime import datetime
|
||||
from urllib.parse import urlparse
|
||||
import json_repair
|
||||
|
||||
@ -27,7 +26,7 @@ class GeneralInfoExtractor:
|
||||
focus_data.append({"name": focus, "explaination": explanation,
|
||||
"id": pb.add('focus_points', {"focuspoint": focus, "explanation": explanation})})
|
||||
|
||||
self.focus_list = [item["focuspoint"] for item in focus_data]
|
||||
# self.focus_list = [item["focuspoint"] for item in focus_data]
|
||||
self.focus_dict = {item["focuspoint"]: item["id"] for item in focus_data}
|
||||
focus_statement = ''
|
||||
for item in focus_data:
|
||||
@ -80,6 +79,8 @@ If the webpage text does not contain any information related to points of intere
|
||||
self.get_more_link_suffix = "Please analyze the above text: URL pairs. First, output your analysis basis, and then give the conclusion on whether to select it. If you decide to select this item, then copy and output the URL of this item following the conclusion; otherwise, proceed directly to the analysis of the next item. Analyze one by one, do not miss any one."
|
||||
|
||||
async def get_author_and_publish_date(self, text: str) -> tuple[str, str]:
|
||||
if not text:
|
||||
return "NA", "NA"
|
||||
system_prompt = "As an information extraction assistant, your task is to accurately extract the source (or author) and publication date from the given webpage text. It is important to adhere to extracting the information directly from the original text. If the original text does not contain a particular piece of information, please replace it with NA"
|
||||
suffix = '''Please output the extracted information in the following JSON format:
|
||||
{"source": source or article author (use "NA" if this information cannot be extracted), "publish_date": extracted publication date (keep only the year, month, and day; use "NA" if this information cannot be extracted)}'''
|
||||
@ -94,13 +95,13 @@ If the webpage text does not contain any information related to points of intere
|
||||
result = json_repair.repair_json(llm_output, return_objects=True)
|
||||
self.logger.debug(f"decoded_object: {result}")
|
||||
if not isinstance(result, dict):
|
||||
self.logger.debug("failed to parse from llm output")
|
||||
self.logger.warning("failed to parse from llm output")
|
||||
return '', ''
|
||||
if 'source' not in result or 'publish_date' not in result:
|
||||
self.logger.debug("failed to parse from llm output")
|
||||
self.logger.warning("failed to parse from llm output")
|
||||
return '', ''
|
||||
|
||||
return result['source'], result['publish_date']
|
||||
return result['source'], extract_and_convert_dates(result['publish_date'])
|
||||
|
||||
async def get_more_related_urls(self, link_dict: dict) -> set[str]:
|
||||
if not link_dict:
|
||||
@ -116,80 +117,79 @@ If the webpage text does not contain any information related to points of intere
|
||||
raw_urls = list(link_dict.values())
|
||||
for url in urls:
|
||||
if url not in raw_urls:
|
||||
self.logger.debug(f"{url} not in link_dict, it's model's Hallucination")
|
||||
self.logger.warning(f"{url} not in link_dict, it's model's Hallucination")
|
||||
urls.remove(url)
|
||||
return urls
|
||||
|
||||
async def get_info(self, text: str, domain: str) -> list[dict]:
|
||||
# logger.debug(f'receive new article_content:\n{article_content}')
|
||||
async def get_info(self, text: str, info_pre_fix: str) -> list[dict]:
|
||||
if not text:
|
||||
return []
|
||||
content = f'<text>\n{text}\n</text>\n\n{self.get_info_suffix}'
|
||||
result = await llm([{'role': 'system', 'content': self.get_info_prompt}, {'role': 'user', 'content': content}],
|
||||
model=self.model, temperature=0.1, response_format={"type": "json_object"})
|
||||
self.logger.debug(f'get_info llm output:\n{result}')
|
||||
if not result:
|
||||
return []
|
||||
|
||||
result = json_repair.repair_json(result, return_objects=True)
|
||||
if not isinstance(result, list):
|
||||
self.logger.warning("failed to parse from llm output")
|
||||
return []
|
||||
if not result:
|
||||
self.logger.info("no info found")
|
||||
return []
|
||||
|
||||
system = '''判断给定的信息是否与网页文本相符。信息将用标签<info></info>包裹,网页文本则用<text></text>包裹。请遵循如下工作流程:
|
||||
1、尝试找出网页文本中所有与信息对应的文本片段(可能有多处);
|
||||
2、基于这些片段给出是否相符的最终结论,最终结论仅为“是”或“否”'''
|
||||
suffix = '先输出找到的所有文本片段,再输出最终结论(仅为是或否)'
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
domain = urlparse(url).netloc
|
||||
|
||||
def get_info(article_content: str) -> list[dict]:
|
||||
# logger.debug(f'receive new article_content:\n{article_content}')
|
||||
result = openai_llm([{'role': 'system', 'content': system_prompt}, {'role': 'user', 'content': article_content}],
|
||||
model=get_info_model, logger=logger, temperature=0.1)
|
||||
|
||||
# results = pattern.findall(result)
|
||||
texts = result.split('<tag>')
|
||||
texts = [_.strip() for _ in texts if '</tag>' in _.strip()]
|
||||
if not texts:
|
||||
logger.debug(f'can not find info, llm result:\n{result}')
|
||||
return []
|
||||
|
||||
cache = []
|
||||
for text in texts:
|
||||
try:
|
||||
strings = text.split('</tag>')
|
||||
tag = strings[0]
|
||||
tag = tag.strip()
|
||||
if tag not in focus_list:
|
||||
logger.info(f'tag not in focus_list: {tag}, aborting')
|
||||
final = []
|
||||
for item in result:
|
||||
if 'focus' not in item or 'content' not in item:
|
||||
self.logger.warning(f"not quality item: {item}, it's model's Hallucination")
|
||||
continue
|
||||
if item['focus'] not in self.focus_dict:
|
||||
self.logger.warning(f"{item['focus']} not in focus_list, it's model's Hallucination")
|
||||
continue
|
||||
judge = await llm([{'role': 'system', 'content': system},
|
||||
{'role': 'user', 'content': f'<info>\n{item["content"]}\n</info>\n\n<text>\n{text}\n</text>\n\n{suffix}'}],
|
||||
model=self.secondary_model, temperature=0.1)
|
||||
self.logger.debug(f'judge llm output:\n{judge}')
|
||||
if not judge:
|
||||
self.logger.warning("failed to parse from llm output, skip checking")
|
||||
final.append({'tag': self.focus_dict[item['focus']], 'content': f"{info_pre_fix}{item['content']}"})
|
||||
continue
|
||||
info = strings[1]
|
||||
info = info.split('\n\n')
|
||||
info = info[0].strip()
|
||||
except Exception as e:
|
||||
logger.info(f'parse error: {e}')
|
||||
tag = ''
|
||||
info = ''
|
||||
|
||||
if not info or not tag:
|
||||
logger.info(f'parse failed-{text}')
|
||||
continue
|
||||
to_save = False
|
||||
for i in range(min(7, len(judge))):
|
||||
char = judge[-1 - i]
|
||||
if char == '是':
|
||||
to_save = True
|
||||
break
|
||||
elif char == '否':
|
||||
break
|
||||
if not to_save:
|
||||
self.logger.info(f"secondary model judge {item} not faithful to article text, aborting")
|
||||
continue
|
||||
final.append({'tag': self.focus_dict[item['focus']], 'content': f"{info_pre_fix}{item['content']}"})
|
||||
|
||||
if len(info) < 7:
|
||||
logger.info(f'info too short, possible invalid: {info}')
|
||||
continue
|
||||
if not final:
|
||||
self.logger.info("no quality result from llm output")
|
||||
return final
|
||||
|
||||
if info.startswith('无相关信息') or info.startswith('该新闻未提及') or info.startswith('未提及'):
|
||||
logger.info(f'no relevant info: {text}')
|
||||
continue
|
||||
async def __call__(self, text: str, link_dict: dict, base_url: str, author: str = None, publish_date: str = None) -> tuple[list, set, str, str]:
|
||||
if not author and not publish_date and text:
|
||||
author, publish_date = await self.get_author_and_publish_date(text)
|
||||
|
||||
while info.endswith('"'):
|
||||
info = info[:-1]
|
||||
info = info.strip()
|
||||
if not author or author.lower() == 'na':
|
||||
author = urlparse(base_url).netloc
|
||||
|
||||
# 拼接下来源信息
|
||||
sources = re.findall(r'\[from (.*?)]', article_content)
|
||||
if sources and sources[0]:
|
||||
info = f"[from {sources[0]}] {info}"
|
||||
if not publish_date or publish_date.lower() == 'na':
|
||||
publish_date = datetime.now().strftime('%Y-%m-%d')
|
||||
|
||||
cache.append({'content': info, 'tag': focus_dict[tag]})
|
||||
related_urls = await self.get_more_related_urls(link_dict)
|
||||
info_prefix = f"//{author} {publish_date}//"
|
||||
infos = await self.get_info(text, info_prefix)
|
||||
|
||||
return cache
|
||||
return infos, related_urls, author, publish_date
|
||||
|
43
core/custom_crawlers/README.md
Normal file
43
core/custom_crawlers/README.md
Normal file
@ -0,0 +1,43 @@
|
||||
wiseflow 致力于通过一套通用流程(使用大模型驱动的可以自主使用爬虫工具的智能体)处理所有页面。
|
||||
|
||||
不过我们也为客户保留自定义处理的灵活性。您可以在这里添加并注册针对特定域名的自定义爬虫。
|
||||
|
||||
请遵照如下规范:
|
||||
|
||||
1、爬虫应该是一个函数(而不是类);
|
||||
|
||||
2、入参只接受两个:**url**(要处理的 url,请只提供一个 url,而不是列表,因为主函数会处理队列逻辑) 和 **logger** 对象(这意味着不要为你的自定义爬虫添加日志对象,wiseflow 会统一管理);
|
||||
|
||||
3、出参必须为两个,一个是解析后的文章详情 article,类型为 dict,另一个是从页面解析出的外链字典 link_dict,类型也是 dict。
|
||||
|
||||
article 的字典格式如下(注意,'content' 是必须的,其他也可以没有,另外额外的键值信息会被忽略):
|
||||
|
||||
`{'url': ..., 'title': ..., 'author': ..., 'publish_date': ..., 'screenshot': ..., 'content': ...(not empty)}`
|
||||
|
||||
上述值的类型都要求为 **str**, 日期格式为 **YYYY-MM-DD**,screenshot 为**文件路径**,可以是相对于 core目录的相对路径也可以是绝对路径,文件类型为 **png**。
|
||||
|
||||
**注意:**
|
||||
- 'content' 要有且不为空,不然无法触发后续的提取,文章也会被舍弃。这是唯一要求不为空的项;
|
||||
- 'author' 和 'publish_date' 尽量有,不然 wiseflow 会自动用域名对应 demain 和 当日日期代替。
|
||||
|
||||
link_dict 的格式如下:
|
||||
`{'text': 外链对应的文字信息, 'url': 外链对应的 url}`
|
||||
|
||||
wiseflow 会以这个为输入,使用 llm 判断值得继续爬取的链接。
|
||||
|
||||
4、在 core/custom_crawlers/__init__.py 中注册,参考:
|
||||
|
||||
```pyhton
|
||||
from .mp import mp_crawler
|
||||
|
||||
customer_crawler_map = {'mp.weixin.qq.com': mp_crawler}
|
||||
```
|
||||
|
||||
注意键使用域名,可以使用 urllib.parse获取:
|
||||
|
||||
```pyhton
|
||||
from urllib.parse import urlparse
|
||||
|
||||
parsed_url = urlparse("site's url")
|
||||
domain = parsed_url.netloc
|
||||
```
|
45
core/custom_crawlers/README_EN.md
Normal file
45
core/custom_crawlers/README_EN.md
Normal file
@ -0,0 +1,45 @@
|
||||
wiseflow is dedicated to processing all pages through a set of general workflows (llm-agent that can autonomously use crawling tools).
|
||||
|
||||
However, we also reserve the flexibility for customers to handle custom processing. You can add and register custom crawlers for specific domains here.
|
||||
|
||||
Please follow the guidelines below:
|
||||
|
||||
1. The crawler should be a function (not a class);
|
||||
|
||||
2. It only accepts two parameters: **url** (the URL to process, please provide only one URL, not a list, as the main function will handle queue logic) and **logger** object (which means you should not add a logging object to your custom crawler, wiseflow will manage it uniformly);
|
||||
|
||||
3. The output must be two items: one is the parsed article details `article`, which is a dict object, and the other is a dictionary of external links `link_dict`, which is also a dict object.
|
||||
|
||||
The format for `article` is as follows (note that 'content' is required and cannot be empty, other fields can be omitted, and any additional key-value information will be ignored):
|
||||
|
||||
`{'url': ..., 'title': ..., 'author': ..., 'publish_date': ..., 'screenshot': ..., 'content': ...(not empty)}`
|
||||
|
||||
All values in the above dictionary should be of type **str**, the date format should be **YYYY-MM-DD**, and the screenshot should be a **file path**, which can be a relative path to the core directory or an absolute path, with the file type being **png**.
|
||||
|
||||
**Note:**
|
||||
- 'content' must exist and not be empty, otherwise it will not trigger subsequent extraction, and the article will be discarded. This is the only required non-empty item;
|
||||
- 'author' and 'publish_date' are strongly recommended, otherwise wiseflow will automatically use the domain name corresponding to the domain and the current date as substitutes.
|
||||
|
||||
The format for `link_dict` is as follows:
|
||||
|
||||
`{'text': text information corresponding to the external link, 'url': URL corresponding to the external link}`
|
||||
|
||||
wiseflow will use this as input and use LLM to determine which links are worth further crawling.
|
||||
|
||||
4. Register in `core/custom_crawlers/__init__.py`, refer to:
|
||||
|
||||
|
||||
```pyhton
|
||||
from .mp import mp_crawler
|
||||
|
||||
customer_crawler_map = {'mp.weixin.qq.com': mp_crawler}
|
||||
```
|
||||
|
||||
Note that the key should be the domain name, which can be obtained using `urllib.parse`:
|
||||
|
||||
```pyhton
|
||||
from urllib.parse import urlparse
|
||||
|
||||
parsed_url = urlparse("site's url")
|
||||
domain = parsed_url.netloc
|
||||
```
|
@ -1,4 +1,4 @@
|
||||
from .mp_process import mp_crawler
|
||||
from .mp import mp_crawler
|
||||
|
||||
customer_crawler_map = {}
|
||||
# customer_crawler_map = {'mp.weixin.qq.com': mp_crawler}
|
@ -1,10 +1,9 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# warining: the mp_crawler will be deprecated in future version, we try to use general_process handle mp articles
|
||||
|
||||
from core.agents import get_info
|
||||
import httpx
|
||||
from bs4 import BeautifulSoup
|
||||
from datetime import datetime, date
|
||||
from datetime import datetime
|
||||
import re
|
||||
import asyncio
|
||||
|
||||
@ -13,10 +12,10 @@ header = {
|
||||
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/604.1 Edg/112.0.100.0'}
|
||||
|
||||
|
||||
async def mp_crawler(url: str, logger) -> tuple[dict, list, set]:
|
||||
async def mp_crawler(url: str, logger) -> tuple[dict, dict]:
|
||||
if not url.startswith('https://mp.weixin.qq.com') and not url.startswith('http://mp.weixin.qq.com'):
|
||||
logger.warning(f'{url} is not a mp url, you should not use this function')
|
||||
return {}, [], set()
|
||||
return {}, {}
|
||||
|
||||
url = url.replace("http://", "https://", 1)
|
||||
|
||||
@ -32,30 +31,31 @@ async def mp_crawler(url: str, logger) -> tuple[dict, list, set]:
|
||||
await asyncio.sleep(60)
|
||||
else:
|
||||
logger.warning(e)
|
||||
return {}, [], set()
|
||||
return {}, {}
|
||||
|
||||
soup = BeautifulSoup(response.text, 'html.parser')
|
||||
|
||||
if url.startswith('https://mp.weixin.qq.com/mp/appmsgalbum'):
|
||||
# 文章目录
|
||||
urls = {li.attrs['data-link'].replace("http://", "https://", 1) for li in soup.find_all('li', class_='album__list-item')}
|
||||
simple_urls = set()
|
||||
for url in urls:
|
||||
cut_off_point = url.find('chksm=')
|
||||
links = soup.find_all('li', class_='album__list-item')
|
||||
link_dict = {}
|
||||
for li in links:
|
||||
u = li.attrs['data-link'].replace("http://", "https://", 1)
|
||||
t = li.text.strip()
|
||||
cut_off_point = u.find('chksm=')
|
||||
if cut_off_point != -1:
|
||||
url = url[:cut_off_point - 1]
|
||||
simple_urls.add(url)
|
||||
return {}, [], simple_urls
|
||||
u = u[:cut_off_point - 1]
|
||||
if t and u:
|
||||
link_dict[t] = u
|
||||
return {}, link_dict
|
||||
|
||||
# Get the original release date first
|
||||
pattern = r"var createTime = '(\d{4}-\d{2}-\d{2}) \d{2}:\d{2}'"
|
||||
match = re.search(pattern, response.text)
|
||||
|
||||
if match:
|
||||
date_only = match.group(1)
|
||||
publish_time = datetime.strptime(date_only, "%Y-%m-%d")
|
||||
publish_time = match.group(1)
|
||||
else:
|
||||
publish_time = date.today()
|
||||
publish_time = datetime.strftime(datetime.today(), "%Y-%m-%d")
|
||||
|
||||
# Get description content from < meta > tag
|
||||
try:
|
||||
@ -70,14 +70,13 @@ async def mp_crawler(url: str, logger) -> tuple[dict, list, set]:
|
||||
except Exception as e:
|
||||
logger.warning(f"not mp format: {url}\n{e}")
|
||||
# For mp.weixin.qq.com types, mp_crawler won't work, and most likely neither will the other two
|
||||
return {}, [], set()
|
||||
return {}, {}
|
||||
|
||||
if not rich_media_title or not profile_nickname:
|
||||
logger.warning(f"failed to analysis {url}, no title or profile_nickname")
|
||||
return {}, [], set()
|
||||
return {}, {}
|
||||
|
||||
# Parse text and image links within the content interval
|
||||
# Todo This scheme is compatible with picture sharing MP articles, but the pictures of the content cannot be obtained,
|
||||
# because the structure of this part is completely different, and a separate analysis scheme needs to be written
|
||||
# (but the proportion of this type of article is not high).
|
||||
texts = []
|
||||
@ -92,18 +91,18 @@ async def mp_crawler(url: str, logger) -> tuple[dict, list, set]:
|
||||
content = '\n'.join(cleaned_texts)
|
||||
else:
|
||||
logger.warning(f"failed to analysis contents {url}")
|
||||
return {}, [], set()
|
||||
return {}, {}
|
||||
if content:
|
||||
content = f"[from {profile_nickname}]{content}"
|
||||
else:
|
||||
# If the content does not have it, but the summary has it, it means that it is an mp of the picture sharing type.
|
||||
# If the content does not have it, but the summary has it, it means that it is a mp of the picture sharing type.
|
||||
# At this time, you can use the summary as the content.
|
||||
content = f"[from {profile_nickname}]{summary}"
|
||||
|
||||
infos = get_info(content, logger)
|
||||
article = {'url': url,
|
||||
'title': rich_media_title,
|
||||
'author': profile_nickname,
|
||||
'publish_date': publish_time}
|
||||
'publish_date': publish_time,
|
||||
'content': content}
|
||||
|
||||
return article, infos, set()
|
||||
return article, {}
|
@ -1,30 +0,0 @@
|
||||
wiseflow 致力于通过一套通用流程(使用视觉大模型驱动的可以自主使用爬虫工具的智能体)处理所有页面。
|
||||
|
||||
不过我们也为客户保留自定义处理的灵活性。
|
||||
|
||||
为添加自定义处理逻辑单元请遵照如下规范:
|
||||
|
||||
1、逻辑处理单元应该是一个函数(而不是类);
|
||||
|
||||
2、入参只接受两个:url(要处理的 url,请只提供一个 url,而不是列表,因为主函数会处理队列逻辑) 和 logger 对象(这意味着不要为你的自定义处理单元添加日志对象);
|
||||
|
||||
3、出参需要返回解析后的文章详情(dict),和信息列表(list),以及解析出来的需要添加到工作队列的 url 结合(元组)。出参必须同时返回这三个结果,没有的话,可以分别传出 {} [] set()
|
||||
|
||||
article 的字典必须包括 'url'(str), 'title'(str), 'author'(str), 'publish_date'(date 日期对象,注意是日期)四个键值,额外可以添加一个 'sceenshort' 值是一个 png 文件的路径。
|
||||
|
||||
4、在 core/custom_crawlers/__init__.py 中注册,参考:
|
||||
|
||||
```pyhton
|
||||
from .mp_crawler import mp_crawler
|
||||
|
||||
customer_crawler_map = {'mp.weixin.qq.com': mp_crawler}
|
||||
```
|
||||
|
||||
注意键使用域名,可以使用 urllib.parse获取:
|
||||
|
||||
```pyhton
|
||||
from urllib.parse import urlparse
|
||||
|
||||
parsed_url = urlparse("site's url")
|
||||
domain = parsed_url.netloc
|
||||
```
|
@ -1,26 +1,27 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from utils.pb_api import PbTalker
|
||||
from utils.general_utils import get_logger
|
||||
from utils.general_utils import get_logger, extract_and_convert_dates
|
||||
from agents.get_info import GeneralInfoExtractor
|
||||
from bs4 import BeautifulSoup
|
||||
import os
|
||||
import json
|
||||
import asyncio
|
||||
from custom_process import customer_crawler_map
|
||||
from custom_crawlers import customer_crawler_map
|
||||
from urllib.parse import urlparse, urljoin
|
||||
import hashlib
|
||||
from crawlee.playwright_crawler import PlaywrightCrawler, PlaywrightCrawlingContext
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
project_dir = os.environ.get("PROJECT_DIR", "")
|
||||
if project_dir:
|
||||
os.makedirs(project_dir, exist_ok=True)
|
||||
|
||||
os.environ['CRAWLEE_STORAGE_DIR'] = os.path.join(project_dir, 'crawlee_storage')
|
||||
screenshot_dir = os.path.join(project_dir, 'crawlee_storage', 'screenshots')
|
||||
|
||||
wiseflow_logger = get_logger('general_process', f'{project_dir}/general_process.log')
|
||||
pb = PbTalker(wiseflow_logger)
|
||||
ie = GeneralInfoExtractor(pb, wiseflow_logger)
|
||||
gie = GeneralInfoExtractor(pb, wiseflow_logger)
|
||||
|
||||
# Global variables
|
||||
working_list = set()
|
||||
@ -33,7 +34,8 @@ async def save_to_pb(article: dict, infos: list):
|
||||
article_id = pb.add(collection_name='articles', body=article)
|
||||
if not article_id:
|
||||
wiseflow_logger.error('add article failed, writing to cache_file')
|
||||
with open(os.path.join(project_dir, 'cache_articles.json'), 'a', encoding='utf-8') as f:
|
||||
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
|
||||
with open(os.path.join(project_dir, f'{timestamp}_cache_article.json'), 'w', encoding='utf-8') as f:
|
||||
json.dump(article, f, ensure_ascii=False, indent=4)
|
||||
return
|
||||
if screenshot:
|
||||
@ -46,10 +48,11 @@ async def save_to_pb(article: dict, infos: list):
|
||||
|
||||
for info in infos:
|
||||
info['articles'] = [article_id]
|
||||
_ = pb.add(collection_name='agents', body=info)
|
||||
_ = pb.add(collection_name='infos', body=info)
|
||||
if not _:
|
||||
wiseflow_logger.error('add insight failed, writing to cache_file')
|
||||
with open(os.path.join(project_dir, 'cache_insights.json'), 'a', encoding='utf-8') as f:
|
||||
wiseflow_logger.error('add info failed, writing to cache_file')
|
||||
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
|
||||
with open(os.path.join(project_dir, f'{timestamp}_cache_infos.json'), 'w', encoding='utf-8') as f:
|
||||
json.dump(info, f, ensure_ascii=False, indent=4)
|
||||
|
||||
|
||||
@ -75,7 +78,6 @@ async def pipeline(url: str):
|
||||
# future work: try to use a visual-llm do all the job...
|
||||
text = await context.page.inner_text('body')
|
||||
wiseflow_logger.debug(f"got text: {text}")
|
||||
|
||||
html = await context.page.inner_html('body')
|
||||
soup = BeautifulSoup(html, 'html.parser')
|
||||
links = soup.find_all('a', href=True)
|
||||
@ -83,34 +85,36 @@ async def pipeline(url: str):
|
||||
link_dict = {}
|
||||
for a in links:
|
||||
new_url = a.get('href')
|
||||
text = a.text.strip()
|
||||
if new_url and text:
|
||||
absolute_url = urljoin(base_url, new_url)
|
||||
link_dict[text] = absolute_url
|
||||
t = a.text.strip()
|
||||
if new_url and t:
|
||||
link_dict[t] = urljoin(base_url, new_url)
|
||||
wiseflow_logger.debug(f'found {len(link_dict)} more links')
|
||||
|
||||
publish_date = soup.find('div', class_='date').get_text(strip=True) if soup.find('div', class_='date') else None
|
||||
if publish_date:
|
||||
publish_date = extract_and_convert_dates(publish_date)
|
||||
author = soup.find('div', class_='author').get_text(strip=True) if soup.find('div', class_='author') else None
|
||||
if not author:
|
||||
author = soup.find('div', class_='source').get_text(strip=True) if soup.find('div',
|
||||
class_='source') else None
|
||||
|
||||
screenshot_file_name = f"{hashlib.sha256(context.request.url.encode()).hexdigest()}.png"
|
||||
await context.page.screenshot(path=os.path.join(screenshot_dir, screenshot_file_name), full_page=True)
|
||||
wiseflow_logger.debug(f'screenshot saved to {screenshot_file_name}')
|
||||
|
||||
# get infos by llm
|
||||
infos, author, publish_date, related_urls = await ie(text, link_dict, base_url, wiseflow_logger)
|
||||
infos, related_urls, author, publish_date = await gie(text, link_dict, base_url, author, publish_date)
|
||||
if infos:
|
||||
# get author and publish date by llm
|
||||
wiseflow_logger.debug(f'LLM result -- author: {author}, publish_date: {publish_date}')
|
||||
article = {
|
||||
'url': context.request.url,
|
||||
'title': await context.page.title(),
|
||||
'author': author,
|
||||
'publish_date': publish_date,
|
||||
'screenshot': os.path.join(screenshot_dir, screenshot_file_name),
|
||||
'tags': [info['name'] for info in infos]
|
||||
'tags': [info['tag'] for info in infos]
|
||||
}
|
||||
await save_to_pb(article, infos)
|
||||
|
||||
# find any related urls
|
||||
related_urls = await get_more_related_urls(html, wiseflow_logger)
|
||||
wiseflow_logger.debug(f'got {len(related_urls)} more urls')
|
||||
if related_urls:
|
||||
async with lock:
|
||||
new_urls = related_urls - existing_urls
|
||||
@ -127,22 +131,47 @@ async def pipeline(url: str):
|
||||
parsed_url = urlparse(url)
|
||||
domain = parsed_url.netloc
|
||||
if domain in customer_crawler_map:
|
||||
wiseflow_logger.debug(f'routed to customer process for {domain}')
|
||||
wiseflow_logger.info(f'routed to customer crawler for {domain}')
|
||||
try:
|
||||
article, infos, related_urls = await customer_crawler_map[domain](url, wiseflow_logger)
|
||||
article, more_urls = await customer_crawler_map[domain](url, wiseflow_logger)
|
||||
except Exception as e:
|
||||
wiseflow_logger.error(f'error occurred in crawling {url}: {e}')
|
||||
continue
|
||||
|
||||
if infos and article:
|
||||
wiseflow_logger.debug("receiving new infos from customer crawler, saving to pb")
|
||||
article['tags'] = [info['name'] for info in infos]
|
||||
if not article and not more_urls:
|
||||
wiseflow_logger.info(f'no content found in {url} by customer crawler')
|
||||
continue
|
||||
|
||||
text = article.pop('content') if 'content' in article else None
|
||||
author = article.get('author', None)
|
||||
publish_date = article.get('publish_date', None)
|
||||
title = article.get('title', "")
|
||||
screenshot = article.get('screenshot', '')
|
||||
|
||||
# get infos by llm
|
||||
try:
|
||||
infos, related_urls, author, publish_date = await gie(text, more_urls, url, author, publish_date)
|
||||
except Exception as e:
|
||||
wiseflow_logger.error(f'gie error occurred in processing {article}: {e}')
|
||||
continue
|
||||
|
||||
if infos:
|
||||
article = {
|
||||
'url': url,
|
||||
'title': title,
|
||||
'author': author,
|
||||
'publish_date': publish_date,
|
||||
'screenshot': screenshot,
|
||||
'tags': [info['tag'] for info in infos]
|
||||
}
|
||||
await save_to_pb(article, infos)
|
||||
|
||||
if related_urls:
|
||||
wiseflow_logger.debug('receiving new related_urls from customer crawler, adding to working_list')
|
||||
new_urls = related_urls - existing_urls
|
||||
working_list.update(new_urls)
|
||||
async with lock:
|
||||
new_urls = related_urls - existing_urls
|
||||
working_list.update(new_urls)
|
||||
continue
|
||||
|
||||
try:
|
||||
await crawler.run([url])
|
||||
except Exception as e:
|
||||
|
@ -1,12 +1,9 @@
|
||||
openai
|
||||
loguru
|
||||
gne
|
||||
jieba
|
||||
httpx
|
||||
pocketbase
|
||||
pydantic
|
||||
uvicorn
|
||||
json_repair==0.*
|
||||
beautifulsoup4
|
||||
fastapi
|
||||
requests
|
||||
requests
|
||||
crawlee[playwright]
|
@ -72,8 +72,8 @@ def extract_and_convert_dates(input_string):
|
||||
if matches:
|
||||
break
|
||||
if matches:
|
||||
return ''.join(matches[0])
|
||||
return ''
|
||||
return '-'.join(matches[0])
|
||||
return None
|
||||
|
||||
|
||||
def get_logger(logger_name: str, logger_file_path: str):
|
||||
|
@ -1,13 +0,0 @@
|
||||
from core.custom_process import crawler as general_crawler
|
||||
from pprint import pprint
|
||||
import asyncio
|
||||
|
||||
test_list = ['http://society.people.com.cn/n1/2024/1202/c1008-40373268.html']
|
||||
|
||||
async def crawler_test():
|
||||
for test in test_list:
|
||||
data = await general_crawler.run([test])
|
||||
print(type(data))
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(crawler_test())
|
Loading…
Reference in New Issue
Block a user