refactor(core): the new general crawler

bigbrother666sh 2024-12-08 18:03:34 +08:00
parent 8c64749ba7
commit ec514b49dd
17 changed files with 200 additions and 358 deletions

View File

@@ -1,20 +0,0 @@
.git
.vscode
.dockerignore
.gitignore
.env
config
build
web/dist
web/node_modules
docker-compose.yaml
Dockerfile
README.md
core/__pycache__
core/work_dir
env_sample
core/pb/pb_data
core/pb/CHANGELOG.md
core/pb/LICENSE.md
core/pb/pocketbase
work_dir

View File

@@ -1,7 +1,7 @@
# V0.3.2
- 引入 Crawlee(playwright模块),大幅提升通用爬取能力,适配实际项目场景;
Introduce Crawlee (playwright module), significantly enhancing general crawling capabilities and adapting to practical project scenarios;
Introduce Crawlee (playwright module), significantly enhancing general crawling capabilities and adapting to real-world tasks;
- Completely rewrote the information extraction module, introducing an integrated "crawl-and-filter" strategy: what you focus on is what you get;

View File

@@ -1,54 +0,0 @@
# For Developer Only
```bash
conda create -n wiseflow python=3.10
conda activate wiseflow
cd core
pip install -r requirements.txt
```
- tasks.py: background task loop
- backend.py: main processing pipeline service (based on FastAPI)
### WiseFlow FastAPI details
- API address: http://127.0.0.1:8077/feed
- request method: POST
- request body:
```python
{'user_id': str, 'type': str, 'content': str, 'addition': Optional[str]}
# type is one of "text", "publicMsg", "site" and "url"
# the full request model:
user_id: str
type: Literal["text", "publicMsg", "file", "image", "video", "location", "chathistory", "site", "attachment", "url"]
content: str
addition: Optional[str] = None
```
See the full API docs (once the backend is started) at http://127.0.0.1:8077/docs
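For illustration, a request to this endpoint could look like the following sketch (the payload values, and the choice of httpx as the client, are assumptions for the example, not requirements of the backend):
```python
import httpx

# illustrative payload; values are examples only
payload = {
    "user_id": "user_123",
    "type": "url",
    "content": "https://example.com/some-article",
    "addition": None,
}

resp = httpx.post("http://127.0.0.1:8077/feed", json=payload, timeout=30)
print(resp.status_code, resp.text)
```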
### WiseFlow Repo File Structure
```
wiseflow
|- dockerfiles
|- ...
|- core
   |- tasks.py
   |- backend.py
   |- insights
      |- __init__.py  # main process
      |- get_info.py  # module that uses an LLM to summarize information and match tags
   |- llms  # llm service wrapper
   |- pb  # pocketbase folder
   |- scrapers
      |- __init__.py  # you can register a proprietary site scraper here
      |- general_scraper.py  # module to get all possible article urls for a general site
      |- general_crawler.py  # module for general article sites
      |- mp_crawler.py  # module for mp article (weixin public account) sites
   |- utils  # tools
```
Although the general_scraper included in wiseflow can parse most static pages, for actual business use we still recommend that customers write their own crawlers targeting their specific info sources.
See core/scrapers/README.md for instructions on integrating proprietary crawlers.

View File

@@ -1,8 +1,8 @@
from core.llms.openai_wrapper import openai_llm as llm
from llms.openai_wrapper import openai_llm as llm
# from core.llms.siliconflow_wrapper import sfa_llm
from core.utils.general_utils import is_chinese, extract_and_convert_dates, extract_urls
from utils.general_utils import is_chinese, extract_and_convert_dates, extract_urls
from loguru import logger
from core.utils.pb_api import PbTalker
from utils.pb_api import PbTalker
import os
from datetime import datetime
from urllib.parse import urlparse
@@ -30,8 +30,8 @@ class GeneralInfoExtractor:
self.focus_dict = {item["focuspoint"]: item["id"] for item in focus_data}
focus_statement = ''
for item in focus_data:
tag = item["name"]
expl = item["explaination"]
tag = item["focuspoint"]
expl = item["explanation"]
focus_statement = f"{focus_statement}#{tag}\n"
if expl:
focus_statement = f"{focus_statement}解释:{expl}\n"
@@ -56,7 +56,7 @@
如果网页文本中不包含任何与兴趣点相关的信息请仅输出[]'''
self.get_more_link_prompt = f"作为一位高效的信息筛选助手你的任务是根据给定的兴趣点从给定的文本及其对应的URL中挑选出最值得关注的URL。兴趣点及其解释如下\n\n{focus_statement}"
self.get_more_link_suffix = "请逐条分析上述 文本url 对。首先输出你的分析依据,然后给出是否挑选它的结论,如果决定挑选该条,在结论后复制输出该条的 url否则的话直接进入下一条的分析。请一条一条的分析不要漏掉任何一条。"
self.get_more_link_suffix = "请逐条分析:对于每一条,首先复制文本,然后给出分析依据,最后给出结论。如果决定挑选该条,在结论后复制对应的url否则的话直接进入下一条的分析。请一条一条的分析不要漏掉任何一条。"
else:
self.get_info_prompt = f'''As an information extraction assistant, your task is to extract content related to the following user focus points from the given web page text. The list of focus points and their explanations is as follows:
@@ -76,7 +76,7 @@ Example:
If the webpage text does not contain any information related to points of interest, please output only: []'''
self.get_more_link_prompt = f"As an efficient information filtering assistant, your task is to select the most noteworthy URLs from a set of texts and their corresponding URLs based on the given focus points. The focus points and their explanations are as follows:\n\n{focus_statement}"
self.get_more_link_suffix = "Please analyze the above text: URL pairs. First, output your analysis basis, and then give the conclusion on whether to select it. If you decide to select this item, then copy and output the URL of this item following the conclusion; otherwise, proceed directly to the analysis of the next item. Analyze one by one, do not miss any one."
self.get_more_link_suffix = "Please analyze each item one by one: For each item, first copy the text, then provide the analysis basis, and finally give the conclusion. If the decision is to select the item, copy the corresponding URL after the conclusion; otherwise, proceed directly to the analysis of the next item. Analyze each item one by one, without missing any."
async def get_author_and_publish_date(self, text: str) -> tuple[str, str]:
if not text:
@@ -218,9 +218,11 @@ If the webpage text does not contain any information related to points of intere
for line in lines:
text = f'{text}{line}'
if len(text) > 2048:
infos.extend(await self.get_info(text, info_prefix, link_dict))
cache = await self.get_info(text, info_prefix, link_dict)
infos.extend(cache)
text = ''
if text:
infos.extend(await self.get_info(text, info_prefix, link_dict))
cache = await self.get_info(text, info_prefix, link_dict)
infos.extend(cache)
return infos, related_urls, author, publish_date

View File

@@ -1,45 +0,0 @@
wiseflow is dedicated to processing all pages through a set of general workflows (an LLM agent that can autonomously use crawling tools).
However, we also keep the flexibility for customers to plug in custom processing. You can add and register custom crawlers for specific domains here.
Please follow the guidelines below (a minimal end-to-end sketch follows the registration example at the end of this list):
1. The crawler should be a function (not a class);
2. It accepts only two parameters: **url** (the URL to process; pass a single URL, not a list, since the main function handles the queue logic) and a **logger** object (which means you should not add a separate logging object to your custom crawler; wiseflow manages logging uniformly);
3. It must return two items: the parsed article details `article` (a dict object) and a dictionary of external links `link_dict` (also a dict object).
The format for `article` is as follows (note that 'content' is required and cannot be empty, other fields can be omitted, and any additional key-value information will be ignored):
`{'url': ..., 'title': ..., 'author': ..., 'publish_date': ..., 'screenshot': ..., 'content': ...(not empty)}`
All values in the above dictionary should be of type **str**, the date format should be **YYYY-MM-DD**, and the screenshot should be a **file path** (either relative to the core directory or absolute), with the file type being **png**.
**Note:**
- 'content' must exist and not be empty, otherwise it will not trigger subsequent extraction, and the article will be discarded. This is the only required non-empty item;
- 'author' and 'publish_date' are strongly recommended; otherwise wiseflow will automatically substitute the URL's domain name and the current date.
The format for `link_dict` is as follows (keys are the link texts, values are the corresponding URLs):
`{'text of the external link': 'URL of the external link', ...}`
wiseflow will use this as input and use the LLM to determine which links are worth crawling further.
4. Register it in `core/custom_crawlers/__init__.py`, for example:
```python
from .mp import mp_crawler
customer_crawler_map = {'mp.weixin.qq.com': mp_crawler}
```
Note that the key should be the domain name, which can be obtained using `urllib.parse`:
```python
from urllib.parse import urlparse
parsed_url = urlparse("site's url")
domain = parsed_url.netloc
```

View File

@@ -1,4 +0,0 @@
from .mp import mp_crawler
customer_crawler_map = {}
# customer_crawler_map = {'mp.weixin.qq.com': mp_crawler}

View File

@@ -1,108 +0,0 @@
# -*- coding: utf-8 -*-
# warning: mp_crawler will be deprecated in a future version; we will try to use general_process to handle mp articles
import httpx
from bs4 import BeautifulSoup
from datetime import datetime
import re
import asyncio
header = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/604.1 Edg/112.0.100.0'}
async def mp_crawler(url: str, logger) -> tuple[dict, dict]:
if not url.startswith('https://mp.weixin.qq.com') and not url.startswith('http://mp.weixin.qq.com'):
logger.warning(f'{url} is not a mp url, you should not use this function')
return {}, {}
url = url.replace("http://", "https://", 1)
async with httpx.AsyncClient() as client:
for retry in range(2):
try:
response = await client.get(url, headers=header, timeout=30)
response.raise_for_status()
break
except Exception as e:
if retry < 1:
logger.info(f"{e}\nwaiting 1min")
await asyncio.sleep(60)
else:
logger.warning(e)
return {}, {}
soup = BeautifulSoup(response.text, 'html.parser')
if url.startswith('https://mp.weixin.qq.com/mp/appmsgalbum'):
# article directory page
links = soup.find_all('li', class_='album__list-item')
link_dict = {}
for li in links:
u = li.attrs['data-link'].replace("http://", "https://", 1)
t = li.text.strip()
cut_off_point = u.find('chksm=')
if cut_off_point != -1:
u = u[:cut_off_point - 1]
if t and u:
link_dict[t] = u
return {}, link_dict
# Get the original release date first
pattern = r"var createTime = '(\d{4}-\d{2}-\d{2}) \d{2}:\d{2}'"
match = re.search(pattern, response.text)
if match:
publish_time = match.group(1)
else:
publish_time = datetime.strftime(datetime.today(), "%Y-%m-%d")
# Get description content from the <meta> tag
try:
meta_description = soup.find('meta', attrs={'name': 'description'})
summary = meta_description['content'].strip() if meta_description else ''
# card_info = soup.find('div', id='img-content')
# Parse the required content from the <div> tag
rich_media_title = soup.find('h1', id='activity-name').text.strip() \
if soup.find('h1', id='activity-name') \
else soup.find('h1', class_='rich_media_title').text.strip()
profile_nickname = soup.find('div', class_='wx_follow_nickname').text.strip()
except Exception as e:
logger.warning(f"not mp format: {url}\n{e}")
# For mp.weixin.qq.com types, mp_crawler won't work, and most likely neither will the other two
return {}, {}
if not rich_media_title or not profile_nickname:
logger.warning(f"failed to analyze {url}, no title or profile_nickname")
return {}, {}
# Parse text and image links within the content interval
# because the structure of this part is completely different, and a separate analysis scheme needs to be written
# (but the proportion of this type of article is not high).
texts = []
content_area = soup.find('div', id='js_content')
if content_area:
# extract text
for section in content_area.find_all(['section', 'p'], recursive=False):  # iterate over top-level sections
text = section.get_text(separator=' ', strip=True)
if text and text not in texts:
texts.append(text)
cleaned_texts = [t for t in texts if t.strip()]
content = '\n'.join(cleaned_texts)
else:
logger.warning(f"failed to analyze contents {url}")
return {}, {}
if content:
content = f"[from {profile_nickname}]{content}"
else:
# If there is no content but there is a summary, this is an image-sharing type of mp article;
# in that case the summary can be used as the content.
content = f"[from {profile_nickname}]{summary}"
article = {'url': url,
'title': rich_media_title,
'author': profile_nickname,
'publish_date': publish_time,
'content': content}
return article, {}

View File

@@ -0,0 +1,3 @@
from .mp import mp_scraper
custom_scraper_map = {'mp.weixin.qq.com': mp_scraper}
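For orientation, the new scraper interface, as implemented by `mp_scraper` below and consumed in `general_process.py`, takes the already-fetched HTML plus the URL and returns a tuple of (article dict, set of related URLs, list of pre-extracted infos); per the routing code in general_process.py, the LLM extraction step runs only when both the URL set and the infos list are empty. A minimal additional scraper might look like this sketch; the domain `news.example.com`, the name `example_scraper`, and the trivial parsing are hypothetical:
```python
from bs4 import BeautifulSoup

async def example_scraper(html: str, url: str) -> tuple[dict, set, list]:
    # hypothetical scraper for a hypothetical 'news.example.com' domain
    soup = BeautifulSoup(html, 'html.parser')
    content = soup.get_text(separator='\n', strip=True)
    if not content:
        return {}, set(), []
    article = {
        'url': url,
        'title': soup.title.get_text(strip=True) if soup.title else '',
        'content': content,
    }
    # returning an empty URL set and an empty infos list lets the pipeline
    # hand the article content to the LLM extractor in general_process.py
    return article, set(), []

# registration would then become:
# custom_scraper_map = {'mp.weixin.qq.com': mp_scraper, 'news.example.com': example_scraper}
```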

core/custom_scraper/mp.py (new file, 90 lines)
View File

@@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
from bs4 import BeautifulSoup
from datetime import datetime
import os, re
from utils.general_utils import get_logger
project_dir = os.environ.get("PROJECT_DIR", "")
if project_dir:
os.makedirs(project_dir, exist_ok=True)
mp_logger = get_logger('mp_scraper', project_dir)
async def mp_scraper(html: str, url: str) -> tuple[dict, set, list]:
if not url.startswith('https://mp.weixin.qq.com') and not url.startswith('http://mp.weixin.qq.com'):
mp_logger.warning(f'{url} is not a mp url, you should not use this function')
return {}, set(), []
url = url.replace("http://", "https://", 1)
soup = BeautifulSoup(html, 'html.parser')
if url.startswith('https://mp.weixin.qq.com/mp/appmsgalbum'):
# article directory page
urls = {li.attrs['data-link'].replace("http://", "https://", 1) for li in soup.find_all('li', class_='album__list-item')}
simple_urls = set()
for url in urls:
cut_off_point = url.find('chksm=')
if cut_off_point != -1:
url = url[:cut_off_point - 1]
simple_urls.add(url)
return {}, simple_urls, []
# Get the original release date first
pattern = r"var createTime = '(\d{4}-\d{2}-\d{2}) \d{2}:\d{2}'"
match = re.search(pattern, html)
if match:
publish_time = match.group(1)
else:
publish_time = datetime.strftime(datetime.today(), "%Y-%m-%d")
# Get description content from the <meta> tag
try:
meta_description = soup.find('meta', attrs={'name': 'description'})
summary = meta_description['content'].strip() if meta_description else ''
# card_info = soup.find('div', id='img-content')
# Parse the required content from the <div> tag
rich_media_title = soup.find('h1', id='activity-name').text.strip() \
if soup.find('h1', id='activity-name') \
else soup.find('h1', class_='rich_media_title').text.strip()
profile_nickname = soup.find('div', class_='wx_follow_nickname').text.strip()
except Exception as e:
mp_logger.warning(f"not mp format: {url}\n{e}")
# For mp.weixin.qq.com types, mp_scraper won't work here, and most likely neither will the other parsers
return {}, set(), []
if not rich_media_title or not profile_nickname:
mp_logger.warning(f"failed to analyze {url}, no title or profile_nickname")
return {}, set(), []
# Parse text and image links within the content interval
# because the structure of this part is completely different, and a separate analysis scheme needs to be written
# (but the proportion of this type of article is not high).
texts = []
content_area = soup.find('div', id='js_content')
if content_area:
# extract text
for section in content_area.find_all(['section', 'p'], recursive=False):  # iterate over top-level sections
text = section.get_text(separator=' ', strip=True)
if text and text not in texts:
texts.append(text)
cleaned_texts = [t for t in texts if t.strip()]
content = '\n'.join(cleaned_texts)
else:
mp_logger.warning(f"failed to analyze contents {url}")
return {}, set(), []
if content:
content = f"[from {profile_nickname}]{content}"
else:
# If there is no content but there is a summary, this is an image-sharing type of mp article;
# in that case the summary can be used as the content.
content = f"[from {profile_nickname}]{summary}"
article = {'url': url,
'title': rich_media_title,
'author': profile_nickname,
'publish_date': publish_time,
'content': content}
return article, set(), []

View File

@@ -6,7 +6,7 @@ from bs4 import BeautifulSoup
import os
import json
import asyncio
from custom_crawlers import customer_crawler_map
from custom_scraper import custom_scraper_map
from urllib.parse import urlparse, urljoin
import hashlib
from crawlee.playwright_crawler import PlaywrightCrawler, PlaywrightCrawlingContext
@@ -19,7 +19,7 @@ if project_dir:
os.environ['CRAWLEE_STORAGE_DIR'] = os.path.join(project_dir, 'crawlee_storage')
screenshot_dir = os.path.join(project_dir, 'crawlee_storage', 'screenshots')
wiseflow_logger = get_logger('general_process', f'{project_dir}/general_process.log')
wiseflow_logger = get_logger('general_process', project_dir)
pb = PbTalker(wiseflow_logger)
gie = GeneralInfoExtractor(pb, wiseflow_logger)
@@ -52,31 +52,64 @@ async def save_to_pb(article: dict, infos: list):
json.dump(info, f, ensure_ascii=False, indent=4)
async def pipeline(url: str):
working_list = set()
existing_urls = {url['url'] for url in pb.read(collection_name='articles', fields=['url']) if url['url']}
lock = asyncio.Lock()
working_list.add(url)
crawler = PlaywrightCrawler(
# Limit the crawl to max requests. Remove or increase it for crawling all links.
max_requests_per_crawl=100,
)
# Define the default request handler, which will be called for every request.
@crawler.router.default_handler
async def request_handler(context: PlaywrightCrawlingContext) -> None:
context.log.info(f'Processing {context.request.url} ...')
# Handle dialogs (alerts, confirms, prompts)
async def handle_dialog(dialog):
context.log.info(f'Closing dialog: {dialog.message}')
await dialog.accept()
crawler = PlaywrightCrawler(
# Limit the crawl to max requests. Remove or increase it for crawling all links.
# max_requests_per_crawl=1,
headless=False if os.environ.get("VERBOSE", "").lower() in ["true", "1"] else True
)
@crawler.router.default_handler
async def request_handler(context: PlaywrightCrawlingContext) -> None:
context.log.info(f'Processing {context.request.url} ...')
# Handle dialogs (alerts, confirms, prompts)
async def handle_dialog(dialog):
context.log.info(f'Closing dialog: {dialog.message}')
await dialog.accept()
context.page.on('dialog', handle_dialog)
context.page.on('dialog', handle_dialog)
await context.page.wait_for_load_state('networkidle')
html = await context.page.inner_html('body')
context.log.info('successfully finished fetching')
parsed_url = urlparse(context.request.url)
domain = parsed_url.netloc
if domain in custom_scraper_map:
context.log.info(f'routed to custom scraper for {domain}')
try:
article, more_urls, infos = await custom_scraper_map[domain](html, context.request.url)
except Exception as e:
context.log.error(f'error occurred: {e}')
wiseflow_logger.warning(f'failed to handle {parsed_url} with the custom scraper, this url will be skipped')
return
if not article and not infos and not more_urls:
wiseflow_logger.warning(f'{parsed_url} was handled by the custom scraper but got nothing')
return
title = article.get('title', "")
link_dict = more_urls if isinstance(more_urls, dict) else None
related_urls = more_urls if isinstance(more_urls, set) else set()
if not infos and not related_urls:
text = article.pop('content') if 'content' in article else None
if not text:
wiseflow_logger.warning(f'no content found in {parsed_url} by the custom scraper, cannot use the LLM GIE')
author, publish_date = '', ''
else:
author = article.get('author', '')
publish_date = article.get('publish_date', '')
# get infos by llm
try:
infos, related_urls, author, publish_date = await gie(text, link_dict, context.request.url, author, publish_date)
except Exception as e:
wiseflow_logger.error(f'gie error occurred in processing: {e}')
infos = []
author, publish_date = '', ''
else:
author = article.get('author', '')
publish_date = article.get('publish_date', '')
else:
# Extract data from the page.
# future work: try to use a visual LLM to do the whole job...
text = await context.page.inner_text('body')
wiseflow_logger.debug(f"got text: {text}")
html = await context.page.inner_html('body')
soup = BeautifulSoup(html, 'html.parser')
links = soup.find_all('a', href=True)
base_url = context.request.url
@@ -86,100 +119,40 @@ async def pipeline(url: str):
t = a.text.strip()
if new_url and t:
link_dict[t] = urljoin(base_url, new_url)
wiseflow_logger.debug(f'found {len(link_dict)} more links')
publish_date = soup.find('div', class_='date').get_text(strip=True) if soup.find('div', class_='date') else None
if publish_date:
publish_date = extract_and_convert_dates(publish_date)
author = soup.find('div', class_='author').get_text(strip=True) if soup.find('div', class_='author') else None
if not author:
author = soup.find('div', class_='source').get_text(strip=True) if soup.find('div',
class_='source') else None
screenshot_file_name = f"{hashlib.sha256(context.request.url.encode()).hexdigest()}.png"
await context.page.screenshot(path=os.path.join(screenshot_dir, screenshot_file_name), full_page=True)
wiseflow_logger.debug(f'screenshot saved to {screenshot_file_name}')
author = soup.find('div', class_='source').get_text(strip=True) if soup.find('div', class_='source') else None
# get infos by llm
infos, related_urls, author, publish_date = await gie(text, link_dict, base_url, author, publish_date)
if infos:
article = {
'url': context.request.url,
'title': await context.page.title(),
'author': author,
'publish_date': publish_date,
'screenshot': os.path.join(screenshot_dir, screenshot_file_name),
'tags': [info['tag'] for info in infos]
}
await save_to_pb(article, infos)
title = await context.page.title()
if related_urls:
async with lock:
new_urls = related_urls - existing_urls
working_list.update(new_urls)
screenshot_file_name = f"{hashlib.sha256(context.request.url.encode()).hexdigest()}.png"
await context.page.screenshot(path=os.path.join(screenshot_dir, screenshot_file_name), full_page=True)
wiseflow_logger.debug(f'screenshot saved to {screenshot_file_name}')
# todo: use llm to determine next action
if infos:
article = {
'url': context.request.url,
'title': title,
'author': author,
'publish_date': publish_date,
'screenshot': os.path.join(screenshot_dir, screenshot_file_name),
'tags': [info['tag'] for info in infos]
}
await save_to_pb(article, infos)
while working_list:
async with lock:
if not working_list:
break
url = working_list.pop()
existing_urls.add(url)
parsed_url = urlparse(url)
domain = parsed_url.netloc
if domain in customer_crawler_map:
wiseflow_logger.info(f'routed to customer crawler for {domain}')
try:
article, more_urls = await customer_crawler_map[domain](url, wiseflow_logger)
except Exception as e:
wiseflow_logger.error(f'error occurred in crawling {url}: {e}')
continue
if not article and not more_urls:
wiseflow_logger.info(f'no content found in {url} by customer crawler')
continue
text = article.pop('content') if 'content' in article else None
author = article.get('author', None)
publish_date = article.get('publish_date', None)
title = article.get('title', "")
screenshot = article.get('screenshot', '')
# get infos by llm
try:
infos, related_urls, author, publish_date = await gie(text, more_urls, url, author, publish_date)
except Exception as e:
wiseflow_logger.error(f'gie error occurred in processing {article}: {e}')
continue
if infos:
article = {
'url': url,
'title': title,
'author': author,
'publish_date': publish_date,
'screenshot': screenshot,
'tags': [info['tag'] for info in infos]
}
await save_to_pb(article, infos)
if related_urls:
async with lock:
new_urls = related_urls - existing_urls
working_list.update(new_urls)
continue
try:
await crawler.run([url])
except Exception as e:
wiseflow_logger.error(f'error occurred in crawling {url}: {e}')
if related_urls:
await context.add_requests(list(related_urls))
# todo: use llm to determine next action
if __name__ == '__main__':
sites = pb.read('sites', filter='activated=True')
wiseflow_logger.info('execute all sites one time')
async def run_all_sites():
await asyncio.gather(*[pipeline(site['url'].rstrip('/')) for site in sites])
await crawler.run([site['url'].rstrip('/') for site in sites])
asyncio.run(run_all_sites())

View File

@@ -1,6 +1,5 @@
openai
loguru
httpx
pocketbase
pydantic
json_repair==0.*

core/run.sh (new executable file, 18 lines)
View File

@@ -0,0 +1,18 @@
#!/bin/bash
set -o allexport
source .env
set +o allexport
if ! pgrep -x "pocketbase" > /dev/null; then
if ! netstat -tuln | grep ":8090" > /dev/null && ! lsof -i :8090 > /dev/null; then
echo "Starting PocketBase..."
pb/pocketbase serve --http=0.0.0.0:8090 &
else
echo "Port 8090 is already in use."
fi
else
echo "PocketBase is already running."
fi
python general_process.py

View File

@@ -1,9 +1,8 @@
#!/bin/bash
# Load environment variables from the .env file
if [ -f .env ]; then
export $(grep -v '^#' .env | xargs)
fi
set -o allexport
source .env
set +o allexport
# Start PocketBase
pb/pocketbase serve --http=0.0.0.0:8090 &

View File

@@ -1,5 +0,0 @@
#!/bin/bash
set -o allexport
source .env
set +o allexport
exec uvicorn backend:app --reload --host localhost --port 8077

View File

@@ -1 +0,0 @@
pb/pocketbase serve

View File

@@ -1,5 +0,0 @@
#!/bin/bash
set -o allexport
source .env
set +o allexport
exec python tasks.py