mirror of
https://github.com/TeamWiseFlow/wiseflow.git
synced 2025-01-23 02:20:20 +08:00
282 lines
12 KiB
Python
282 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
from bs4 import BeautifulSoup
|
|
import re
|
|
from crawl4ai import CrawlResult
|
|
from .scraper_data import ScraperResultData
|
|
|
|
# 定义所有可能包含文本的块级和内联元素
|
|
text_elements = {
|
|
# 块级元素
|
|
'div', 'section', 'p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
|
|
# 内联文本元素
|
|
'span', 'em', 'strong'
|
|
}
|
|
|
|
|
|
def mp_scraper(fetch_result: CrawlResult | dict) -> ScraperResultData:
|
|
if isinstance(fetch_result, dict):
|
|
url = fetch_result['url']
|
|
raw_html = fetch_result['html']
|
|
cleaned_html = fetch_result['cleaned_html']
|
|
raw_markdown = fetch_result['markdown']
|
|
media = fetch_result['media']['images']
|
|
elif isinstance(fetch_result, CrawlResult):
|
|
url = fetch_result.url
|
|
raw_html = fetch_result.html
|
|
cleaned_html = fetch_result.cleaned_html
|
|
raw_markdown = fetch_result.markdown
|
|
media = fetch_result.media['images']
|
|
else:
|
|
raise TypeError('fetch_result must be a CrawlResult or a dict')
|
|
|
|
content = ''
|
|
images = []
|
|
|
|
if url.startswith('https://mp.weixin.qq.com/mp/appmsgalbum'):
|
|
# album page type
|
|
soup = BeautifulSoup(raw_html, 'html.parser')
|
|
for li in soup.find_all('li', class_='album__list-item'):
|
|
u_text = li.get_text(strip=True)
|
|
u_title = li.attrs['data-title'].strip()
|
|
_url = li.attrs['data-link'].replace("http://", "https://", 1)
|
|
if not _url or _url.startswith('javas') or _url.startswith('#') or _url.startswith('mailto:') or _url.startswith('data:') or _url.startswith('about:blank'):
|
|
continue
|
|
|
|
cut_off_point = _url.find('chksm=')
|
|
if cut_off_point != -1:
|
|
_url = _url[:cut_off_point - 1]
|
|
|
|
if u_title in u_text:
|
|
description = u_text
|
|
else:
|
|
description = f'{u_title}-{u_text}'
|
|
content += f'[{description}]({_url})\n'
|
|
return ScraperResultData(content=content, images=images)
|
|
|
|
def process_content(content_div):
|
|
# 3.1 处理所有 <img> 元素
|
|
for img in content_div.find_all('img', attrs={'data-src': True}, recursive=True):
|
|
data_type = img.get('data-type', '')
|
|
if data_type in ['gif', 'svg']:
|
|
continue
|
|
src = img.get('data-src')
|
|
if not src or src.startswith('#') or src.startswith('about:blank'):
|
|
src = None
|
|
text = img.get('alt', '').strip()
|
|
if not src:
|
|
img.replace_with(text)
|
|
continue
|
|
images.append(src)
|
|
# find all area urls related to this img
|
|
area_urls = set()
|
|
if img.get('usemap'):
|
|
# remove the # at the beginning of the map name
|
|
map_name = img.get('usemap').lstrip('#')
|
|
# find the map tag
|
|
map_tag = content_div.find('map', {'name': map_name})
|
|
if map_tag:
|
|
# get all area tags under the map
|
|
for area in map_tag.find_all('area', href=True):
|
|
area_href = area.get('href')
|
|
area_urls.add(area_href)
|
|
area.decompose()
|
|
# delete the whole map tag
|
|
map_tag.decompose()
|
|
area_urls = ')[]('.join(area_urls)
|
|
replacement_text = f'![{text}]({src})[]({area_urls})' if area_urls else f'![{text}]({src})'
|
|
img.replace_with(replacement_text)
|
|
|
|
for media in content_div.find_all(['video', 'audio', 'source', 'embed', 'iframe', 'figure'], src=True, recursive=True):
|
|
src = media.get('src')
|
|
if not src or src.startswith('javascript:') or src.startswith('#') or src.startswith('mailto:') or src.startswith('data:') or src.startswith('about:blank'):
|
|
src = None
|
|
text = media.get_text().strip() or media.get('alt', '').strip()
|
|
if src:
|
|
media.replace_with(f"[{text}]({src})")
|
|
else:
|
|
media.decompose()
|
|
|
|
for obj in content_div.find_all('object', data=True, recursive=True):
|
|
data = obj.get('data')
|
|
if not data or data.startswith('javascript:') or data.startswith('#') or data.startswith('mailto:') or data.startswith('data:') or data.startswith('about:blank'):
|
|
data = None
|
|
text = obj.get_text().strip() or obj.get('alt', '').strip()
|
|
if data:
|
|
obj.replace_with(f"[{text}]({data})")
|
|
else:
|
|
obj.decompose()
|
|
|
|
# process links at last, so that we can keep the image and media info in the link
|
|
for a in content_div.find_all('a', href=True, recursive=True):
|
|
href = a.get('href')
|
|
if not href or href.startswith('javascript:') or href.startswith('#') or href.startswith('about:blank'):
|
|
href = None
|
|
text = a.get_text().strip()
|
|
if href:
|
|
a.replace_with(f"[{text}]({href})")
|
|
else:
|
|
a.decompose()
|
|
|
|
# handle lists
|
|
for list_tag in content_div.find_all(['ul', 'ol'], recursive=True):
|
|
list_text = []
|
|
for idx, item in enumerate(list_tag.find_all('li')):
|
|
list_text.append(f"{idx + 1}. {item.get_text().strip()}")
|
|
list_text = '\t'.join(list_text)
|
|
list_tag.replace_with(f"{list_text}\n")
|
|
|
|
# handle strikethrough text
|
|
for del_tag in content_div.find_all(['del', 's'], recursive=True):
|
|
del_text = del_tag.get_text().strip()
|
|
if del_text:
|
|
del_tag.replace_with(f"{del_text}(maybe_outdated)")
|
|
else:
|
|
del_tag.decompose()
|
|
|
|
# handle tables
|
|
for table in content_div.find_all('table', recursive=True):
|
|
table_text = []
|
|
|
|
# handle caption
|
|
caption = table.find('caption')
|
|
if caption:
|
|
table_text.append(caption.get_text().strip())
|
|
|
|
# get headers
|
|
headers = []
|
|
for th in table.find_all('th'):
|
|
headers.append(th.get_text().strip())
|
|
|
|
# handle all rows (including tbody and tfoot)
|
|
for row in table.find_all('tr'):
|
|
# get the first cell value
|
|
# try to find th as first_val
|
|
first_cell = row.find(['th', 'td'])
|
|
if not first_cell:
|
|
continue
|
|
first_val = first_cell.get_text().strip()
|
|
cells = row.find_all('td')
|
|
if not cells:
|
|
continue
|
|
# handle remaining cells
|
|
for idx, cell in enumerate(cells):
|
|
cell_text = cell.get_text().strip()
|
|
if not cell_text or cell_text == first_val:
|
|
continue
|
|
|
|
header_text = headers[idx] if idx < len(headers) else ''
|
|
cell_str = f"{first_val}-{header_text}-{cell_text}"
|
|
table_text.append(cell_str)
|
|
|
|
# replace the table with the processed text
|
|
table_text = '\n'.join(table_text)
|
|
table.replace_with(f"\n{table_text}\n")
|
|
|
|
# 3.3 按照子元素获取文本内容,统一换行
|
|
content_parts = []
|
|
for element in content_div.children:
|
|
if element.name in ['br', 'br/', 'br /', 'hr', 'hr/', 'hr /', 'wbr']:
|
|
content_parts.append('\n')
|
|
elif element.name in text_elements:
|
|
text = element.get_text(strip=True)
|
|
if text:
|
|
content_parts.append(text)
|
|
# 只在块级元素后添加换行符
|
|
if element.name in {'p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6'}:
|
|
content_parts.append('\n')
|
|
if element.name in {'div', 'section'}:
|
|
content_parts.append('# ')
|
|
elif isinstance(element, str):
|
|
text = element.strip()
|
|
if text:
|
|
content_parts.append(text)
|
|
|
|
return ''.join(content_parts).strip()
|
|
|
|
soup = BeautifulSoup(cleaned_html, 'html.parser')
|
|
|
|
# 1. 查找第一个包含 <h1> 元素的 div 块,提取 title
|
|
h1_tag = soup.find('h1')
|
|
if h1_tag:
|
|
h1_div = h1_tag.parent
|
|
title = h1_tag.get_text(strip=True)
|
|
else:
|
|
# 如果找不到的话 说明是已删除或者分享页
|
|
soup = BeautifulSoup(raw_html, 'html.parser')
|
|
# 从 original_panel_tool 中找到 data-url
|
|
share_source = soup.find('span', id='js_share_source')
|
|
if share_source and share_source.get('data-url'):
|
|
data_url = share_source['data-url']
|
|
# 替换 http 为 https
|
|
data_url = data_url.replace('http://', 'https://', 1)
|
|
if not data_url or not data_url.startswith('https://mp.weixin.qq.com'):
|
|
# maybe a new_type_article
|
|
return ScraperResultData(title='maybe a new_type_article')
|
|
# 从 js_content 中获取描述文本
|
|
content_div = soup.find('div', id='js_content')
|
|
if not content_div:
|
|
# maybe a new_type_article
|
|
return ScraperResultData(title='maybe a new_type_article')
|
|
des = content_div.get_text(strip=True)
|
|
return ScraperResultData(content=f'[{des}]({data_url})')
|
|
else:
|
|
# a deleted page
|
|
return ScraperResultData()
|
|
|
|
# 2. 判断这个子块下面包含几个非空 div 子块
|
|
sub_divs = [div for div in h1_div.find_all('div', recursive=False) if len(div.contents) > 0]
|
|
num_sub_divs = len(sub_divs)
|
|
|
|
if num_sub_divs == 1:
|
|
# 2.1 如果只包含一个子块
|
|
strong_tag = sub_divs[0].find('strong')
|
|
if strong_tag:
|
|
author = strong_tag.get_text(strip=True)
|
|
# 查找包含日期和时间的span标签
|
|
date_span = sub_divs[0].find('span', string=re.compile(r'\d{4}年\d{2}月\d{2}日\s+\d{2}:\d{2}'))
|
|
# 如果找到日期,只提取日期部分
|
|
if date_span:
|
|
publish_date = date_span.get_text(strip=True).split()[0] # 只取日期部分
|
|
else:
|
|
publish_date = None
|
|
title = 'maybe a new_type_article'
|
|
# 提取与包含 <h1> 元素的 div 块平级的紧挨着的下一个 div 块作为 content
|
|
content_div = h1_div.find_next_sibling('div')
|
|
content = title + '\n\n' + process_content(content_div)
|
|
else:
|
|
author = None
|
|
publish_date = None
|
|
content = raw_markdown
|
|
images = [d['src'] for d in media]
|
|
|
|
elif num_sub_divs >= 2:
|
|
# 2.2 如果包含两个及以上子块
|
|
a_tag = sub_divs[0].find('a', href="javascript:void(0);")
|
|
if a_tag:
|
|
author = a_tag.get_text(strip=True)
|
|
# 查找下一个包含日期时间的em标签
|
|
date_em = sub_divs[0].find('em', string=re.compile(r'\d{4}年\d{2}月\d{2}日\s+\d{2}:\d{2}'))
|
|
if date_em:
|
|
# 只提取日期部分
|
|
publish_date = date_em.get_text(strip=True).split()[0]
|
|
else:
|
|
publish_date = None
|
|
title = 'maybe a new_type_article'
|
|
else:
|
|
author = None
|
|
publish_date = None
|
|
title = 'maybe a new_type_article'
|
|
# 剩下的 div 子块合起来作为 content
|
|
content_divs = sub_divs[1:]
|
|
content = '# '.join([process_content(div) for div in content_divs])
|
|
content = title + '\n\n' + content
|
|
else:
|
|
author = None
|
|
publish_date = None
|
|
content = 'maybe a new_type_article'
|
|
|
|
if len(images) > 2:
|
|
images = images[1:-1]
|
|
return ScraperResultData(title=title, content=content, images=images, author=author, publish_date=publish_date)
|