new deep scraper

bigbrother666sh 2025-01-15 00:33:41 +08:00
parent 3523b126c7
commit aa49216acb
3 changed files with 53 additions and 35 deletions

View File

@@ -6,7 +6,7 @@
# action_dict needs to be extracted from raw html, which is not covered by this script
import re
from urllib.parse import urlparse, urljoin
from urllib.parse import urljoin
common_file_exts = [
@@ -128,7 +128,7 @@ def deep_scraper(raw_markdown: str, base_url: str, used_img: list[str]) -> tuple
text = text.replace(_sec, link_text + _key, 1)
# check whether the link is a common file type or a top-level domain
# todo: handle this later, at get_more_url time
# todo: decide later whether to add it to more_link at final extraction time, or handle it in the main process
"""
has_common_ext = any(url.endswith(ext) for ext in common_file_exts)
has_common_tld = any(url.endswith(tld) or url.endswith(tld + '/') for tld in common_tlds)
@@ -138,14 +138,16 @@ def deep_scraper(raw_markdown: str, base_url: str, used_img: list[str]) -> tuple
# process the remaining image markers in the text
img_pattern = r'(§(.*?)\|\|(.*?)§)'
matches = re.findall(img_pattern, text)
remained_text = re.sub(img_pattern, '', text).strip()
remained_text_len = len(remained_text)
for _sec, alt, src in matches:
if not src or src.startswith('#') or src not in used_img:
if not src or src.startswith('#'):
text = text.replace(_sec, alt, 1)
continue
img_src = normalize_url(src, base_url)
if not img_src:
text = text.replace(_sec, alt, 1)
elif len(alt) > 2:
elif src not in used_img or remained_text_len > 5 or len(alt) > 2:
_key = f"[img{len(link_dict)+1}]"
link_dict[_key] = img_src
text = text.replace(_sec, alt + _key, 1)
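For context, the §alt||src§ image markers that this hunk rewrites can be exercised in isolation. The snippet below is a minimal sketch using only the regex shown above; the sample text and variable names are illustrative, not from the commit:

import re
# marker format handled by deep_scraper: §alt||src§ (img_pattern above)
img_pattern = r'(§(.*?)\|\|(.*?)§)'
text = 'intro text §logo||/static/logo.png§ and a caption'
matches = re.findall(img_pattern, text)
# -> [('§logo||/static/logo.png§', 'logo', '/static/logo.png')]
remained_text = re.sub(img_pattern, '', text).strip()
remained_text_len = len(remained_text)  # short remaining text means the image likely carries the content
print(matches, remained_text_len)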
@@ -176,8 +178,18 @@ def deep_scraper(raw_markdown: str, base_url: str, used_img: list[str]) -> tuple
return text
sections = raw_markdown.split('# ') # use '# ' to avoid # in url
texts = [check_url_text(text) for text in sections]
texts = [text for text in texts if text.strip()]
texts = []
for i, section in enumerate(sections):
# filter out likely navigation and footer sections
section_remain = re.sub(r'\[.*?]\(.*?\)', '', section).strip()
section_remain_len = len(section_remain)
total_links = len(re.findall(r'\[.*?]\(.*?\)', section))
print(f"section {i}")
print(f"ratio: {total_links/section_remain_len}")
processed_p = [check_url_text(p) for p in section.split('\n\n')]
processed_p = [p for p in processed_p if p.strip()]
texts.append('\n\n'.join(processed_p))
return link_dict, texts, to_be_recognized_by_visual_llm
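The new loop over sections filters likely navigation and footer blocks by comparing the number of markdown links to the length of the text that remains once the links are stripped. A minimal sketch of that heuristic, factored into a helper for illustration (the name link_density and the empty-section guard are not part of the commit):

import re

def link_density(section: str) -> float:
    # text left after removing markdown links of the form [text](url)
    remain = re.sub(r'\[.*?]\(.*?\)', '', section).strip()
    total_links = len(re.findall(r'\[.*?]\(.*?\)', section))
    # a section that is almost nothing but links (navigation, footer) yields a high ratio
    return total_links / len(remain) if remain else float('inf')

print(link_density('[home](/) [about](/about) [contact](/contact)'))  # inf: pure link list
print(link_density('A paragraph of article text with one [source](https://example.com).'))  # small ratio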

View File

@@ -1,5 +1,6 @@
from bs4 import BeautifulSoup
import re
from crawl4ai import CrawlResult
from .scraper_data import ScraperResultData
# define all block-level and inline elements that may contain text
@@ -11,13 +12,12 @@ text_elements = {
}
def mp_scraper(fetch_result: dict) -> ScraperResultData:
url = fetch_result['url']
raw_html = fetch_result['html']
cleaned_html = fetch_result['cleaned_html']
def mp_scraper(fetch_result: CrawlResult) -> ScraperResultData:
url = fetch_result.url
raw_html = fetch_result.html
cleaned_html = fetch_result.cleaned_html
content = ''
links = {}
images = []
if url.startswith('https://mp.weixin.qq.com/mp/appmsgalbum'):
@@ -38,17 +38,13 @@ def mp_scraper(fetch_result: dict) -> ScraperResultData:
description = u_text
else:
description = f'{u_title}-{u_text}'
if _url and description:
if _url not in links:
links[_url] = description
else:
links[_url] = f'{links[_url]}|{description}'
return ScraperResultData(url=url, content=content, links=links, images=images)
content += f'[{description}]({_url})\n'
return ScraperResultData(content=content, images=images)
def process_content(content_div):
# 3.1 process all <img> elements
for img in content_div.find_all('img', attrs={'data-src': True}, recursive=True):
data_type = img.get('data-type')
data_type = img.get('data-type', '')
if data_type in ['gif', 'svg']:
continue
src = img.get('data-src')
@@ -200,15 +196,18 @@ def mp_scraper(fetch_result: dict) -> ScraperResultData:
# replace http with https
data_url = data_url.replace('http://', 'https://', 1)
if not data_url or not data_url.startswith('https://mp.weixin.qq.com'):
return ScraperResultData(url=url, content=content, links=links, images=images)
# maybe a new_type_article
return ScraperResultData(title='maybe a new_type_article')
# get the description text from js_content
content_div = soup.find('div', id='js_content')
if not content_div:
return ScraperResultData(url=url, content=content, links=links, images=images)
# maybe a new_type_article
return ScraperResultData(title='maybe a new_type_article')
des = content_div.get_text(strip=True)
return ScraperResultData(url=url, content=content, links={data_url: des}, images=images)
return ScraperResultData(content=f'[{des}]({data_url})')
else:
return ScraperResultData(url=url, content=content, links=links, images=images)
# a deleted page
return ScraperResultData()
# 2. count how many non-empty div sub-blocks this block contains
sub_divs = [div for div in h1_div.find_all('div', recursive=False) if len(div.contents) > 0]
@@ -226,6 +225,7 @@ def mp_scraper(fetch_result: dict) -> ScraperResultData:
publish_date = date_span.get_text(strip=True).split()[0] # keep only the date part
else:
publish_date = None
title = 'maybe a new_type_article'
# use the immediately following sibling div of the div containing the <h1> element as the content
content_div = h1_div.find_next_sibling('div')
content = title + '\n\n' + process_content(content_div)
@@ -246,9 +246,11 @@ def mp_scraper(fetch_result: dict) -> ScraperResultData:
publish_date = date_em.get_text(strip=True).split()[0]
else:
publish_date = None
title = 'maybe a new_type_article'
else:
author = None
publish_date = None
title = 'maybe a new_type_article'
# join the remaining div sub-blocks as the content
content_divs = sub_divs[1:]
content = '# '.join([process_content(div) for div in content_divs])
@@ -256,6 +258,8 @@ def mp_scraper(fetch_result: dict) -> ScraperResultData:
else:
author = None
publish_date = None
content = title
content = 'maybe a new_type_article'
return ScraperResultData(url=url, content=content, links=links, images=images, author=author, publish_date=publish_date, title=title)
if len(images) > 2:
images = images[1:-1]
return ScraperResultData(title=title, content=content, images=images, author=author, publish_date=publish_date)
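With the signature change from dict to CrawlResult, callers now pass the crawl4ai result object straight through. A rough usage sketch, assuming mp_scraper is importable (the import path and URL are placeholders, not from the repo):

import asyncio
from crawl4ai import AsyncWebCrawler
from mp_scraper import mp_scraper  # placeholder import path

async def main():
    async with AsyncWebCrawler() as crawler:
        result = await crawler.arun(url='https://mp.weixin.qq.com/s/placeholder')
        data = mp_scraper(result)  # CrawlResult exposes .url, .html, .cleaned_html
        print(data.title, data.author, data.publish_date, len(data.images))

asyncio.run(main())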

View File

@@ -1,13 +1,13 @@
from dataclasses import dataclass
from typing import List, Dict, Optional
from typing import List, Optional
from datetime import datetime
@dataclass
class ScraperResultData:
"""用于存储网页抓取数据的数据类"""
url: str
# url: str
content: Optional[str] = None
links: Optional[Dict[str, str]] = None
# links: Optional[Dict[str, str]] = None
images: Optional[List[str]] = None
author: Optional[str] = None
publish_date: Optional[str] = None
@@ -15,16 +15,18 @@ class ScraperResultData:
base: Optional[str] = None
def __post_init__(self):
# validate that url exists and is a string
if not isinstance(self.url, str) or not self.url.strip():
raise ValueError("URL must be a non-empty string")
# initialize optional fields
if self.images is None:
self.images = []
if self.links is None:
self.links = {}
if self.title is None:
self.title = ""
if self.author is None:
self.author = ""
if self.content is None:
self.content = ""
# make sure publish_date is in string format
if self.publish_date is not None:
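After dropping url and links, the dataclass can be constructed with few or no arguments and __post_init__ backfills the optional fields. A small sketch of the resulting behavior (field names come from the diff; the import path and values are illustrative):

from scraper_data import ScraperResultData  # placeholder import path

data = ScraperResultData(title='maybe a new_type_article')
assert data.content == ''   # backfilled by __post_init__
assert data.author == ''
assert data.images == []
# url/links are gone: callers now embed links in content as markdown, e.g. f'[{description}]({_url})'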