modify scrapers

This commit is contained in:
bigbrother666sh 2025-01-12 16:22:37 +08:00
parent d99cb4fef9
commit 26bf9a573a
15 changed files with 580 additions and 27 deletions

View File

@ -1,19 +0,0 @@
## Registering a custom scraper
Register it in `core/scrapers/__init__.py`, for example:
```python
from .mp import mp_scraper
custom_scrapers = {'mp.weixin.qq.com': mp_scraper}
```
Note that the key must be the site's domain name, which you can obtain with `urllib.parse`:
```python
from urllib.parse import urlparse
parsed_url = urlparse("site's url")
domain = parsed_url.netloc
```

View File

@ -1,5 +0,0 @@
# from .xxx import xx_scraper
# from .xxx import xx_config
custom_scrapers = {}
crawl4ai_custom_configs = {}

View File

@ -5,4 +5,4 @@ pydantic
#json_repair==0.*
beautifulsoup4
requests
crawl4ai==0.4.245
crawl4ai==0.4.247

76
core/scrapers/README.md Normal file
View File

@ -0,0 +1,76 @@
## Configuring a custom Crawl4ai fetching config
If a source requires a special fetching configuration, you can define the corresponding crawler_config in `core/scrapers/__init__.py` and register it in `custom_fetching_configs`, for example as sketched below.
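A minimal sketch of such a registration (the domain `example.com` and the config values here are illustrative assumptions, not part of wiseflow):
```python
from crawl4ai import CrawlerRunConfig

# hypothetical config for a slow, lazy-loading site: wait for network idle and scroll the full page
example_config = CrawlerRunConfig(delay_before_return_html=5.0,
                                  wait_until='networkidle',
                                  scan_full_page=True)

# keyed by domain, looked up before fetching a source from that domain
custom_fetching_configs = {'example.com': example_config}
```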
## Custom scrapers
### Scrapers
For the task of extracting information of interest from web content, feeding the raw HTML straight to the LLM is not a good idea. In this kind of task we want the LLM to behave like a human reader, with the emphasis on understanding the content rather than parsing HTML; besides, sending raw HTML also incurs a large amount of extra token consumption and lowers processing efficiency.
Converting HTML into markdown, which is much easier to understand semantically, is the common practice in this field, and Crawl4ai offers a fairly mature solution for it.
However, that only covers the general case; there is no universal trick. For some specific sources, such as WeChat official account (mp.weixin.qq.com) articles, Crawl4ai's default parser does not work well, and in those cases we need a custom scraper for the source.
Simply put, a scraper converts the HTML into markdown text, filtering out as much unnecessary information as possible along the way (the bar here is low, since the next step refines the result with an LLM) while preserving as much of the HTML layout information as possible (this is important).
### deep_scraper
We further found that sending the full markdown text straight to the LLM also has drawbacks.
Here is just one example:
*Many websites like to add a recommended-reading section at the bottom or in the sidebar of an article page. If these recommendations were plain link lists that would be fine, but in practice they often include content summaries, and these summaries are not short; they can even be comparable in length to the main body of the page. If we hand the whole markdown to the LLM in that situation, it becomes very hard to give the LLM a clear working strategy: if we simply discard the recommended-reading content (leaving aside how hard it is to specify a clean discard policy), we cannot guarantee that it contains nothing relevant to the focus points; if we keep it, the LLM may well fail to concentrate on the core content of the page. The LLM may also extract information from those summaries, but the summaries correspond to extra links that will themselves be crawled later, which can lead to a large amount of duplicated extraction.*
What we actually need to do here is chunking. It is somewhat similar to chunking in a RAG system, except that we do not have to think about chunk granularity; we have to think about page-layout granularity, because what we face are HTML pages rather than PDF or Word files.
This point matters: we need to chunk by the HTML page layout, not by semantic logic, because it determines how we later decide which extraction strategy to apply to each different kind of block. This is also why wiseflow does not use an existing document-intelligence tool but implements its own deep_scraper.
Of course, another option is to use a vision LLM for layout recognition directly, but in practice we found that this requires clean, undisturbed page screenshots, which greatly increases system complexity and slows processing down, and the results are not stable (for example when handling page pop-ups...).
Another reason for not using document intelligence or vision LLMs is that, unlike completely unstructured data such as PDF or Word, the HTML itself already contains all the layout information, and converting it to markdown actually preserves that information (through symbols such as `\n` and `#`), so chunking the markdown with a set of rules and handling each chunk separately is feasible.
This is the main functionality of wiseflow's deep_scraper; in summary: 1) chunk the markdown by layout information; 2) analyze the type of each chunk and preprocess it with a strategy suited to that type, so that the final LLM extraction works well. A conceptual sketch of step 1 follows.
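The sketch below illustrates layout-based chunking only conceptually (it mirrors the blank-line heuristic used in `test/read_markdown.py` further down in this commit; it is not the actual deep_scraper implementation):
```python
import re

def split_markdown_by_layout(markdown: str) -> list[str]:
    # treat the longest run of consecutive newlines as the block separator:
    # larger vertical gaps in the markdown usually mirror layout boundaries in the HTML
    runs = re.findall(r'\n+', markdown)
    max_newlines = max((len(r) for r in runs), default=1)
    if max_newlines < 2:
        return [markdown]
    return [block for block in markdown.split('\n' * max_newlines) if block.strip()]
```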
### Registering a custom scraper
wiseflow's default workflow is:
*crawl4ai fetches the HTML and converts it to raw_markdown (using the default config) --> deep_scraper chunks the content --> the chunked content is sent to the LLM for information extraction.*
As mentioned above, if a particular source needs a special crawl4ai fetching strategy (including the raw_markdown conversion strategy), you can register a custom crawler_config in `core/scrapers/__init__.py`.
You can also configure a custom scraper for a particular source. A custom scraper takes crawl4ai's fetching result as input and outputs the link dictionary and the text blocks that will be sent to the LLM for analysis. When a custom scraper is used, wiseflow's processing flow becomes:
*crawl4ai fetches the HTML and converts it to raw_markdown (using the default config or the specified config) --> the custom scraper chunks the content --> the chunked content is sent to the LLM for information extraction.*
A custom scraper may call deep_scraper internally as a post-processing step (as mp_scraper does), or it may implement the entire pipeline itself.
The scraper's input, fetch_result, is a dict; its format is shown in the sketch below.
The output is a ScraperResultData, which contains the four fields url, content, links, and images.
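A sketch of the shapes involved, inferred from how `mp_scraper` and the test script in this commit use them (the example values and the `my_scraper` name are illustrative; crawl4ai's fetching result may contain more keys than listed here):
```python
from .scraper_data import ScraperResultData  # as mp_scraper does

# keys of fetch_result that the scrapers in this commit actually read
fetch_result = {
    'url': 'https://example.com/article',                 # final URL of the fetched page
    'html': '<html>...</html>',                           # raw HTML
    'cleaned_html': '<div>...</div>',                     # HTML after crawl4ai's cleaning
    'markdown': '...',                                    # raw_markdown produced by crawl4ai
    'media': {'images': [{'src': '...', 'alt': '...'}]},
}

def my_scraper(fetch_result: dict) -> ScraperResultData:
    # map the fetching result to the fields the LLM step needs
    return ScraperResultData(url=fetch_result['url'],
                             content=fetch_result['markdown'],
                             links={},    # {url: description}
                             images=[])   # list of image src urls
```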
Register the scraper in `core/scrapers/__init__.py`, for example:
```python
from .mp_scraper import mp_scraper
custom_scrapers = {'mp.weixin.qq.com': mp_scraper}
```
Note that the key must be the site's domain name, which you can obtain with `urllib.parse`:
```python
from urllib.parse import urlparse
parsed_url = urlparse("site's url")
domain = parsed_url.netloc
```

View File

@ -0,0 +1,9 @@
from crawl4ai import CrawlerRunConfig
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
# from .xxx import xx_scraper
# from .xxx import xx_config
custom_scrapers = {}
custom_fetching_configs = {}
md_generator = DefaultMarkdownGenerator()  # assumed default generator; crawler_config below requires md_generator to be defined
crawler_config = CrawlerRunConfig(delay_before_return_html=2.0, markdown_generator=md_generator,
                                  wait_until='commit', magic=True, scan_full_page=True)

258
core/scrapers/mp_scraper.py Normal file
View File

@ -0,0 +1,258 @@
from bs4 import BeautifulSoup
import re
from .scraper_data import ScraperResultData
# block-level and inline elements that may contain text
text_elements = {
# block-level elements
'div', 'section', 'p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
# inline text elements
'span', 'em', 'strong'
}
def mp_scraper(fetch_result: dict) -> ScraperResultData:
url = fetch_result['url']
raw_html = fetch_result['html']
cleaned_html = fetch_result['cleaned_html']
content = ''
links = {}
images = []
if url.startswith('https://mp.weixin.qq.com/mp/appmsgalbum'):
# album page type
soup = BeautifulSoup(raw_html, 'html.parser')
for li in soup.find_all('li', class_='album__list-item'):
u_text = li.get_text(strip=True)
u_title = li.attrs['data-title'].strip()
_url = li.attrs['data-link'].replace("http://", "https://", 1)
if not _url or _url.startswith('javas') or _url.startswith('#') or _url.startswith('mailto:') or _url.startswith('data:') or _url.startswith('about:blank'):
continue
cut_off_point = _url.find('chksm=')
if cut_off_point != -1:
_url = _url[:cut_off_point - 1]
if u_title in u_text:
description = u_text
else:
description = f'{u_title}-{u_text}'
if _url and description:
if _url not in links:
links[_url] = description
else:
links[_url] = f'{links[_url]}|{description}'
return ScraperResultData(url=url, content=content, links=links, images=images)
def process_content(content_div):
# 3.1 process all <img> elements
for img in content_div.find_all('img', attrs={'data-src': True}, recursive=True):
src = img.get('data-src')
if not src or src.startswith('#') or src.startswith('about:blank'):
src = None
text = img.get('alt', '').strip()
if not src:
img.replace_with(text)
continue
images.append(src)
# find all area urls related to this img
area_urls = set()
if img.get('usemap'):
# remove the # at the beginning of the map name
map_name = img.get('usemap').lstrip('#')
# find the map tag
map_tag = content_div.find('map', {'name': map_name})
if map_tag:
# get all area tags under the map
for area in map_tag.find_all('area', href=True):
area_href = area.get('href')
area_urls.add(area_href)
area.decompose()
# delete the whole map tag
map_tag.decompose()
area_urls = ')[]('.join(area_urls)
replacement_text = f'![{text}]({src})[]({area_urls})' if area_urls else f'![{text}]({src})'
img.replace_with(replacement_text)
for media in content_div.find_all(['video', 'audio', 'source', 'embed', 'iframe', 'figure'], src=True, recursive=True):
src = media.get('src')
if not src or src.startswith('javascript:') or src.startswith('#') or src.startswith('mailto:') or src.startswith('data:') or src.startswith('about:blank'):
src = None
text = media.get_text().strip() or media.get('alt', '').strip()
if src:
media.replace_with(f"[{text}]({src})")
else:
media.decompose()
for obj in content_div.find_all('object', data=True, recursive=True):
data = obj.get('data')
if not data or data.startswith('javascript:') or data.startswith('#') or data.startswith('mailto:') or data.startswith('data:') or data.startswith('about:blank'):
data = None
text = obj.get_text().strip() or obj.get('alt', '').strip()
if data:
obj.replace_with(f"[{text}]({data})")
else:
obj.decompose()
# process links at last, so that we can keep the image and media info in the link
for a in content_div.find_all('a', href=True, recursive=True):
href = a.get('href')
if not href or href.startswith('javascript:') or href.startswith('#') or href.startswith('about:blank'):
href = None
text = a.get_text().strip()
if href:
a.replace_with(f"[{text}]({href})")
else:
a.decompose()
# handle lists
for list_tag in content_div.find_all(['ul', 'ol'], recursive=True):
list_text = []
for idx, item in enumerate(list_tag.find_all('li')):
list_text.append(f"{idx + 1}. {item.get_text().strip()}")
list_text = '\t'.join(list_text)
list_tag.replace_with(f"{list_text}\n")
# handle strikethrough text
for del_tag in content_div.find_all(['del', 's'], recursive=True):
del_text = del_tag.get_text().strip()
if del_text:
del_tag.replace_with(f"{del_text}(maybe_outdated)")
else:
del_tag.decompose()
# handle tables
for table in content_div.find_all('table', recursive=True):
table_text = []
# handle caption
caption = table.find('caption')
if caption:
table_text.append(caption.get_text().strip())
# get headers
headers = []
for th in table.find_all('th'):
headers.append(th.get_text().strip())
# handle all rows (including tbody and tfoot)
for row in table.find_all('tr'):
# get the first cell value
# try to find th as first_val
first_cell = row.find(['th', 'td'])
if not first_cell:
continue
first_val = first_cell.get_text().strip()
cells = row.find_all('td')
if not cells:
continue
# handle remaining cells
for idx, cell in enumerate(cells):
cell_text = cell.get_text().strip()
if not cell_text or cell_text == first_val:
continue
header_text = headers[idx] if idx < len(headers) else ''
cell_str = f"{first_val}-{header_text}-{cell_text}"
table_text.append(cell_str)
# replace the table with the processed text
table_text = '\n'.join(table_text)
table.replace_with(f"\n{table_text}\n")
# 3.3 collect text content from the child elements, normalizing line breaks
content_parts = []
for element in content_div.children:
if element.name in ['br', 'br/', 'br /', 'hr', 'hr/', 'hr /', 'wbr']:
content_parts.append('\n')
elif element.name in text_elements:
text = element.get_text(strip=True)
if text:
content_parts.append(text)
# only add a newline after block-level elements
if element.name in {'div', 'section', 'p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6'}:
content_parts.append('\n')
elif isinstance(element, str):
text = element.strip()
if text:
content_parts.append(text)
return '\n'.join(content_parts).strip()
soup = BeautifulSoup(cleaned_html, 'html.parser')
# 1. find the first div block that contains an <h1> element and extract the title
h1_tag = soup.find('h1')
if h1_tag:
h1_div = h1_tag.parent
title = h1_tag.get_text(strip=True)
else:
# if there is no <h1>, the article has been deleted or this is a share page
soup = BeautifulSoup(raw_html, 'html.parser')
# find the data-url in original_panel_tool
share_source = soup.find('span', id='js_share_source')
if share_source and share_source.get('data-url'):
data_url = share_source['data-url']
# replace http with https
data_url = data_url.replace('http://', 'https://', 1)
if not data_url or not data_url.startswith('https://mp.weixin.qq.com'):
return ScraperResultData(url=url, content=content, links=links, images=images)
# get the description text from js_content
content_div = soup.find('div', id='js_content')
if not content_div:
return ScraperResultData(url=url, content=content, links=links, images=images)
des = content_div.get_text(strip=True)
return ScraperResultData(url=url, content=content, links={data_url: des}, images=images)
else:
return ScraperResultData(url=url, content=content, links=links, images=images)
# 2. count how many non-empty div sub-blocks this block contains
sub_divs = [div for div in h1_div.find_all('div', recursive=False) if len(div.contents) > 0]
num_sub_divs = len(sub_divs)
if num_sub_divs == 1:
# 2.1 if there is only one sub-block
strong_tag = sub_divs[0].find('strong')
if strong_tag:
author = strong_tag.get_text(strip=True)
# find the span tag containing the date and time
date_span = sub_divs[0].find('span', string=re.compile(r'\d{4}年\d{2}月\d{2}日\s+\d{2}:\d{2}'))
# if the date is found, extract only the date part
if date_span:
publish_date = date_span.get_text(strip=True).split()[0]  # keep only the date part
else:
publish_date = None
# take the next sibling div of the div block that contains the <h1> as the content
content_div = h1_div.find_next_sibling('div')
content = title + '\n\n' + process_content(content_div)
else:
author = None
publish_date = None
content = fetch_result['markdown']
elif num_sub_divs >= 2:
# 2.2 if there are two or more sub-blocks
a_tag = sub_divs[0].find('a', href="javascript:void(0);")
if a_tag:
author = a_tag.get_text(strip=True)
# find the em tag containing the date and time
date_em = sub_divs[0].find('em', string=re.compile(r'\d{4}年\d{2}月\d{2}日\s+\d{2}:\d{2}'))
if date_em:
# extract only the date part
publish_date = date_em.get_text(strip=True).split()[0]
else:
publish_date = None
else:
author = None
publish_date = None
# combine the remaining div sub-blocks as the content
content_divs = sub_divs[1:]
content = '\n\n'.join([process_content(div) for div in content_divs])
content = title + '\n\n' + content
else:
author = None
publish_date = None
content = title
return ScraperResultData(url=url, content=content, links=links, images=images, author=author, publish_date=publish_date, title=title)

View File

@ -0,0 +1,39 @@
from dataclasses import dataclass
from typing import List, Dict, Optional
from datetime import datetime
@dataclass
class ScraperResultData:
"""用于存储网页抓取数据的数据类"""
url: str
content: Optional[str] = None
links: Optional[Dict[str, str]] = None
images: Optional[List[str]] = None
author: Optional[str] = None
publish_date: Optional[str] = None
title: Optional[str] = None
base: Optional[str] = None
def __post_init__(self):
# validate that url exists and is a non-empty string
if not isinstance(self.url, str) or not self.url.strip():
raise ValueError("url must be a non-empty string")
# initialize optional fields
if self.images is None:
self.images = []
if self.links is None:
self.links = {}
# ensure publish_date is stored as a string
if self.publish_date is not None:
if isinstance(self.publish_date, datetime):
self.publish_date = self.publish_date.isoformat()
elif not isinstance(self.publish_date, str):
self.publish_date = str(self.publish_date)
# ensure images is a list
if self.images is not None:
if not isinstance(self.images, list):
raise ValueError("images must be a list")

View File

@ -7,6 +7,7 @@ project_root = os.path.dirname(current_dir) # get the parent directory
sys.path.append(project_root)
from core.utils.deep_scraper import deep_scraper, common_chars
from core.scrapers.mp_scraper import mp_scraper
def check_url_text(text):
print(f"processing: {text}")
@ -89,8 +90,21 @@ if __name__ == '__main__':
with open(file, 'r') as f:
html_sample = json.load(f)
_url = html_sample['url']
raw_markdown = html_sample['markdown']
used_img = {d['src']: d['alt'] for d in html_sample['media']['images']}
if _url.startswith('https://mp.weixin.qq.com'):
result = mp_scraper(html_sample)
print(f'url: {result.url}')
print(f'content: {result.content}')
print(f'links: {result.links}')
print(f'author: {result.author}')
print(f'publish_date: {result.publish_date}')
print(f'images: {len(result.images)}')
for img in result.images:
print(img)
raw_markdown = result.content
used_img = result.images
else:
raw_markdown = html_sample['markdown']
used_img = {d['src']: d['alt'] for d in html_sample['media']['images']}
except Exception as e:
print('sample format error, try to use craw4ai_fetching.py to get sample')
print(f"error: {e}")

59
test/read_markdown.py Normal file
View File

@ -0,0 +1,59 @@
import os
import json
import re
def read_markdown_from_json_files(directory_path):
# Get all JSON files in the directory
json_files = [f for f in os.listdir(directory_path) if f.endswith('.json')]
img_pattern = r'!\[(.*?)\]\((.*?)\)'
link_pattern = r'\[(.*?)\]\((.*?)\)'
# Process each JSON file
for json_file in sorted(json_files):
file_path = os.path.join(directory_path, json_file)
print(f"\nProcessing file: {json_file}")
print("-" * 50)
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
markdown = data.get('markdown')
# Find the longest consecutive newlines in the markdown text
if markdown:
# Find all sequences of newlines and get their lengths
max_newlines = max(len(match) for match in re.findall(r'\n+', markdown)) if re.search(r'\n+', markdown) else 0
print(f"Longest consecutive newlines: {max_newlines}")
if max_newlines < 2:
sections = [markdown]
else:
sections = markdown.split('\n' * max_newlines)
for i, section in enumerate(sections):
print(f"Section {i + 1}:")
print(section)
print('\n\n')
newline_count = section.count('\n')
# process image markers ![alt](src)
img_pattern = r'!\[(.*?)\]\((.*?)\)'
matches = re.findall(img_pattern, section)
for alt, src in matches:
# replace with the new format §alt||src§
section = section.replace(f'![{alt}]({src})', f'§{alt}||{src}§')
# process link markers [text](url)
matches = re.findall(link_pattern, section)
# remove all matched links from the section text
for link_text, link_url in matches:
section = section.replace(f'[{link_text}]({link_url})', '')
if len(section) == 0:
print("no text in section")
continue
print(f"newline/text ratio: {newline_count/len(section)*100}")
print(f"links/section ratio: {len(matches)/len(section)*100}")
print("-" * 50)
if __name__ == "__main__":
# Path to the webpage_samples directory
samples_dir = os.path.dirname(os.path.abspath(__file__)) + "/webpage_samples"
read_markdown_from_json_files(samples_dir)

0
weixin_mp/__init__.py Normal file
View File

View File

@ -0,0 +1 @@
websockets

8
weixin_mp/run_weixin.sh Normal file
View File

@ -0,0 +1,8 @@
#!/bin/bash
# export CONFIGS='avatars'
# export WX_BOT_ENDPOINT='127.0.0.1:8066'
# export MAIN_SERVICE_ENDPOINT='http://127.0.0.1:7777/'
# export VERBOSE=True
python weixin.py

113
weixin_mp/weixin.py Normal file
View File

@ -0,0 +1,113 @@
import websockets
import json
import re
import httpx
import asyncio
import os, sys
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir) # get the parent directory
sys.path.append(project_root)
from core.general_process import main_process, wiseflow_logger
from typing import Optional
import logging
logging.getLogger("httpx").setLevel(logging.WARNING)
# IMPORTANT: when logging in by scanning the QR code, do NOT select "sync history messages",
# otherwise the bot will start replying to every historical message one by one
# first check the WeChat login status and get the wxid of the logged-in account
WX_BOT_ENDPOINT = os.environ.get('WX_BOT_ENDPOINT', '127.0.0.1:8066')
wx_url = f"http://{WX_BOT_ENDPOINT}/api/"
try:
# send a GET request
response = httpx.get(f"{wx_url}checklogin")
response.raise_for_status() # raise if the HTTP response status is not 200
# parse the JSON response
data = response.json()
# check the status field
if data['data']['status'] == 1:
# record the wxid
self_wxid = data['data']['wxid']
wiseflow_logger.info(f"Logged-in WeChat account: {self_wxid}")
else:
# raise an exception
wiseflow_logger.error("No login information detected, exiting")
raise ValueError("Login failed: status is not 1")
except Exception as e:
wiseflow_logger.error(f"无法链接微信端点:{wx_url}, 错误:\n{e}")
raise ValueError("登录失败,无法连接")
# get the nickname of the logged-in account, used later to check whether a message @-mentions this bot
response = httpx.get(f"{wx_url}userinfo")
response.raise_for_status() # raise if the HTTP response status is not 200
# parse the JSON response
data = response.json()
self_nickname = data['data'].get('nickname', " ")
wiseflow_logger.info(f"self_nickname: {self_nickname}")
# to monitor only selected official accounts, create a config.json file in the same folder
# containing the list of accounts to monitor
# note: use each account's original id (the one starting with gh_), which can be found in earlier logger output
config_file = 'config.json'
if not os.path.exists(config_file):
config = None
else:
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
# the patterns below only apply to parsing public msg; official-account articles shared inside group chats are not covered
# The XML parsing scheme is not used because there are abnormal characters in the XML code extracted from the weixin public_msg
item_pattern = re.compile(r'<item>(.*?)</item>', re.DOTALL)
url_pattern = re.compile(r'<url><!\[CDATA\[(.*?)]]></url>')
async def get_public_msg(websocket_uri):
reconnect_attempts = 0
max_reconnect_attempts = 3
while True:
try:
async with websockets.connect(websocket_uri, max_size=10 * 1024 * 1024) as websocket:
while True:
response = await websocket.recv()
datas = json.loads(response)
for data in datas["data"]:
if "StrTalker" not in data or "Content" not in data:
wiseflow_logger.warning(f"invalid data:\n{data}")
continue
user_id = data["StrTalker"]
items = item_pattern.findall(data["Content"])
# iterate through all <item> contents, extracting <url> and <summary>
todo_urls = set()
for item in items:
url_match = url_pattern.search(item)
url = url_match.group(1) if url_match else None
if not url:
wiseflow_logger.warning(f"can not find url in \n{item}")
continue
# URL processing, http is replaced by https, and the part after chksm is removed.
url = url.replace('http://', 'https://')
cut_off_point = url.find('chksm=')
if cut_off_point != -1:
url = url[:cut_off_point - 1]
# summary_match = summary_pattern.search(item)
# addition = summary_match.group(1) if summary_match else None
todo_urls.add(url)
await main_process(todo_urls)
except websockets.exceptions.ConnectionClosedError as e:
wiseflow_logger.error(f"Connection closed with exception: {e}")
reconnect_attempts += 1
if reconnect_attempts <= max_reconnect_attempts:
wiseflow_logger.info(f"Reconnecting attempt {reconnect_attempts}...")
await asyncio.sleep(1)
else:
wiseflow_logger.error("Max reconnect attempts reached. Exiting.")
break
except Exception as e:
wiseflow_logger.error(f"PublicMsgHandler error: {e}")
uri_public = f"ws://{WX_BOT_ENDPOINT}/ws/publicMsg"
asyncio.run(get_public_msg(uri_public))