From aa49216acb8c8d2267affb78dd0f6fc6b9344a7e Mon Sep 17 00:00:00 2001
From: bigbrother666sh
Date: Wed, 15 Jan 2025 00:33:41 +0800
Subject: [PATCH] new deep scraper

---
 core/scrapers/deep_scraper.py | 24 +++++++++++++++-----
 core/scrapers/mp_scraper.py   | 42 +++++++++++++++++++----------------
 core/scrapers/scraper_data.py | 22 +++++++++---------
 3 files changed, 53 insertions(+), 35 deletions(-)

diff --git a/core/scrapers/deep_scraper.py b/core/scrapers/deep_scraper.py
index 388a4c5..b9135b6 100644
--- a/core/scrapers/deep_scraper.py
+++ b/core/scrapers/deep_scraper.py
@@ -6,7 +6,7 @@
 # action_dict needs to be extracted from raw html, which is not covered by this script

 import re
-from urllib.parse import urlparse, urljoin
+from urllib.parse import urljoin


 common_file_exts = [
@@ -128,7 +128,7 @@ def deep_scraper(raw_markdown: str, base_url: str, used_img: list[str]) -> tuple
             text = text.replace(_sec, link_text + _key, 1)

         # check whether the link is a common file type or a top-level domain
-        # todo: handle this later when calling get_more_url
+        # todo: handle this at final extraction when deciding whether to add to more_link, or in the main process
         """
         has_common_ext = any(url.endswith(ext) for ext in common_file_exts)
         has_common_tld = any(url.endswith(tld) or url.endswith(tld + '/') for tld in common_tlds)
@@ -138,14 +138,16 @@ def deep_scraper(raw_markdown: str, base_url: str, used_img: list[str]) -> tuple
         # process the other image markers in the text
         img_pattern = r'(§(.*?)\|\|(.*?)§)'
         matches = re.findall(img_pattern, text)
+        remained_text = re.sub(img_pattern, '', text).strip()
+        remained_text_len = len(remained_text)
         for _sec, alt, src in matches:
-            if not src or src.startswith('#') or src not in used_img:
+            if not src or src.startswith('#'):
                 text = text.replace(_sec, alt, 1)
                 continue
             img_src = normalize_url(src, base_url)
             if not img_src:
                 text = text.replace(_sec, alt, 1)
-            elif len(alt) > 2:
+            elif src not in used_img or remained_text_len > 5 or len(alt) > 2:
                 _key = f"[img{len(link_dict)+1}]"
                 link_dict[_key] = img_src
                 text = text.replace(_sec, alt + _key, 1)
@@ -176,8 +178,18 @@ def deep_scraper(raw_markdown: str, base_url: str, used_img: list[str]) -> tuple
         return text

     sections = raw_markdown.split('# ') # use '# ' to avoid # in url

-    texts = [check_url_text(text) for text in sections]
-    texts = [text for text in texts if text.strip()]
+    texts = []
+    for i, section in enumerate(sections):
+        # filter the possible navigation section and footer section
+        section_remain = re.sub(r'\[.*?]\(.*?\)', '', section).strip()
+        section_remain_len = len(section_remain)
+        total_links = len(re.findall(r'\[.*?]\(.*?\)', section))
+        print(f"section {i}")
+        print(f"ratio: {total_links / max(section_remain_len, 1)}")
+
+        processed_p = [check_url_text(p) for p in section.split('\n\n')]
+        processed_p = [p for p in processed_p if p.strip()]
+        texts.append('\n\n'.join(processed_p))

     return link_dict, texts, to_be_recognized_by_visual_llm
\ No newline at end of file
diff --git a/core/scrapers/mp_scraper.py b/core/scrapers/mp_scraper.py
index af6ba88..8957a39 100644
--- a/core/scrapers/mp_scraper.py
+++ b/core/scrapers/mp_scraper.py
@@ -1,5 +1,6 @@
 from bs4 import BeautifulSoup
 import re
+from crawl4ai import CrawlResult
 from .scraper_data import ScraperResultData

 # define all block-level and inline elements that may contain text
@@ -11,13 +12,12 @@ text_elements = {
 }


-def mp_scraper(fetch_result: dict) -> ScraperResultData:
-    url = fetch_result['url']
-    raw_html = fetch_result['html']
-    cleaned_html = fetch_result['cleaned_html']
-
+def mp_scraper(fetch_result: CrawlResult) -> ScraperResultData:
+    url = fetch_result.url
+    raw_html = fetch_result.html
+    cleaned_html = fetch_result.cleaned_html
+
     content = ''
-    links = {}
     images = []

     if url.startswith('https://mp.weixin.qq.com/mp/appmsgalbum'):
@@ -38,17 +38,13 @@ def mp_scraper(fetch_result: dict) -> ScraperResultData:
                 description = u_text
             else:
                 description = f'{u_title}-{u_text}'
-            if _url and description:
-                if _url not in links:
-                    links[_url] = description
-                else:
-                    links[_url] = f'{links[_url]}|{description}'
-        return ScraperResultData(url=url, content=content, links=links, images=images)
+            content += f'[{description}]({_url})\n'
+        return ScraperResultData(content=content, images=images)

     def process_content(content_div):
         # 3.1 process all <img> elements
         for img in content_div.find_all('img', attrs={'data-src': True}, recursive=True):
-            data_type = img.get('data-type')
+            data_type = img.get('data-type', '')
             if data_type in ['gif', 'svg']:
                 continue
             src = img.get('data-src')
@@ -200,15 +196,18 @@ def mp_scraper(fetch_result: dict) -> ScraperResultData:
             # replace http with https
             data_url = data_url.replace('http://', 'https://', 1)
             if not data_url or not data_url.startswith('https://mp.weixin.qq.com'):
-                return ScraperResultData(url=url, content=content, links=links, images=images)
+                # maybe a new_type_article
+                return ScraperResultData(title='maybe a new_type_article')
             # get the description text from js_content
             content_div = soup.find('div', id='js_content')
             if not content_div:
-                return ScraperResultData(url=url, content=content, links=links, images=images)
+                # maybe a new_type_article
+                return ScraperResultData(title='maybe a new_type_article')
             des = content_div.get_text(strip=True)
-            return ScraperResultData(url=url, content=content, links={data_url: des}, images=images)
+            return ScraperResultData(content=f'[{des}]({data_url})')
         else:
-            return ScraperResultData(url=url, content=content, links=links, images=images)
+            # a deleted page
+            return ScraperResultData()

         # 2. check how many non-empty div sub-blocks are under this block
         sub_divs = [div for div in h1_div.find_all('div', recursive=False) if len(div.contents) > 0]
@@ -226,6 +225,7 @@ def mp_scraper(fetch_result: dict) -> ScraperResultData:
                 publish_date = date_span.get_text(strip=True).split()[0]  # keep only the date part
             else:
                 publish_date = None
+                title = 'maybe a new_type_article'
             # take the next sibling div of the div containing the <h1> element as content
             content_div = h1_div.find_next_sibling('div')
             content = title + '\n\n' + process_content(content_div)
@@ -246,9 +246,11 @@ def mp_scraper(fetch_result: dict) -> ScraperResultData:
                 publish_date = date_em.get_text(strip=True).split()[0]
             else:
                 publish_date = None
+                title = 'maybe a new_type_article'
         else:
             author = None
             publish_date = None
+            title = 'maybe a new_type_article'
         # join the remaining div sub-blocks as the content
         content_divs = sub_divs[1:]
         content = '# '.join([process_content(div) for div in content_divs])
@@ -256,6 +258,8 @@ def mp_scraper(fetch_result: dict) -> ScraperResultData:
     else:
         author = None
         publish_date = None
-        content = title
+        content = 'maybe a new_type_article'

-    return ScraperResultData(url=url, content=content, links=links, images=images, author=author, publish_date=publish_date, title=title)
+    if len(images) > 2:
+        images = images[1:-1]
+    return ScraperResultData(title=title, content=content, images=images, author=author, publish_date=publish_date)
diff --git a/core/scrapers/scraper_data.py b/core/scrapers/scraper_data.py
index 0284b24..cc696a8 100644
--- a/core/scrapers/scraper_data.py
+++ b/core/scrapers/scraper_data.py
@@ -1,13 +1,13 @@
 from dataclasses import dataclass
-from typing import List, Dict, Optional
+from typing import List, Optional
 from datetime import datetime

 @dataclass
 class ScraperResultData:
     """Data class for storing scraped web page data"""
-    url: str
+    # url: str
     content: Optional[str] = None
-    links: Optional[Dict[str, str]] = None
+    # links: Optional[Dict[str, str]] = None
     images: Optional[List[str]] = None
     author: Optional[str] = None
     publish_date: Optional[str] = None
@@ -15,16 +15,18 @@ class ScraperResultData:
     base: Optional[str] = None

     def __post_init__(self):
-        # validate that url exists and is a string
-        if not isinstance(self.url, str) or not self.url.strip():
-            raise ValueError("URL must be a non-empty string")
-
         # initialize optional fields
         if self.images is None:
            self.images = []
-
-        if self.links is None:
-            self.links = {}
+
+        if self.title is None:
+            self.title = ""
+
+        if self.author is None:
+            self.author = ""
+
+        if self.content is None:
+            self.content = ""

         # make sure publish_date is a string
         if self.publish_date is not None:
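
A minimal usage sketch of the calling convention after this patch; it is not part of the patch itself. mp_scraper now receives a crawl4ai CrawlResult and reads url/html/cleaned_html as attributes instead of dict keys, and ScraperResultData no longer carries url or links. The WeChat article URL and the driver script below are illustrative placeholders, assuming the repository is on PYTHONPATH and crawl4ai is installed.

import asyncio

from crawl4ai import AsyncWebCrawler

from core.scrapers.mp_scraper import mp_scraper


async def main():
    # placeholder article URL, for illustration only
    test_url = 'https://mp.weixin.qq.com/s/xxxxxxxx'

    # crawl4ai returns a CrawlResult, which mp_scraper now consumes directly
    async with AsyncWebCrawler() as crawler:
        fetch_result = await crawler.arun(url=test_url)

    if not fetch_result.success:
        return

    data = mp_scraper(fetch_result)
    # ScraperResultData fields after this patch: title, content, images, author, publish_date
    print(data.title, data.author, data.publish_date)
    print(data.content[:200])
    print(data.images)


if __name__ == '__main__':
    asyncio.run(main())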