From 26bf9a573a65a8882eee93331cd5f6083effa5ec Mon Sep 17 00:00:00 2001
From: bigbrother666sh
Date: Sun, 12 Jan 2025 16:22:37 +0800
Subject: [PATCH] modify scrapers

---
 core/custom_fetchings/README.md          |  19 --
 core/custom_fetchings/__init__.py        |   5 -
 core/requirements.txt                    |   2 +-
 core/scrapers/README.md                  |  76 ++++++
 .../README_EN.md                         |   0
 core/scrapers/__init__.py                |   9 +
 core/{utils => scrapers}/deep_scraper.py |   0
 core/scrapers/mp_scraper.py              | 258 ++++++++++++++++++
 core/scrapers/scraper_data.py            |  39 +++
 test/deep_scraper_test.py                |  18 +-
 test/read_markdown.py                    |  59 ++++
 weixin_mp/__init__.py                    |   0
 weixin_mp/requirements.txt               |   1 +
 weixin_mp/run_weixin.sh                  |   8 +
 weixin_mp/weixin.py                      | 113 ++++++++
 15 files changed, 580 insertions(+), 27 deletions(-)
 delete mode 100644 core/custom_fetchings/README.md
 delete mode 100644 core/custom_fetchings/__init__.py
 create mode 100644 core/scrapers/README.md
 rename core/{custom_fetchings => scrapers}/README_EN.md (100%)
 create mode 100644 core/scrapers/__init__.py
 rename core/{utils => scrapers}/deep_scraper.py (100%)
 create mode 100644 core/scrapers/mp_scraper.py
 create mode 100644 core/scrapers/scraper_data.py
 create mode 100644 test/read_markdown.py
 create mode 100644 weixin_mp/__init__.py
 create mode 100644 weixin_mp/requirements.txt
 create mode 100644 weixin_mp/run_weixin.sh
 create mode 100644 weixin_mp/weixin.py

diff --git a/core/custom_fetchings/README.md b/core/custom_fetchings/README.md
deleted file mode 100644
index 610c340..0000000
--- a/core/custom_fetchings/README.md
+++ /dev/null
@@ -1,19 +0,0 @@
-## Registering a custom scraper
-
-Register it in `core/scrapers/__init__.py`, for example:
-
-```python
-from .mp import mp_scarper
-
-customer_scrapers = {'mp.weixin.qq.com': mp_scarper}
-```
-
-Note that the key is the domain name, which can be obtained with `urllib.parse`:
-
-
-```python
-from urllib.parse import urlparse
-
-parsed_url = urlparse("site's url")
-domain = parsed_url.netloc
-```
\ No newline at end of file
diff --git a/core/custom_fetchings/__init__.py b/core/custom_fetchings/__init__.py
deleted file mode 100644
index 038c723..0000000
--- a/core/custom_fetchings/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-# from .xxx import xx_scraper
-# from .xxx import xx_config
-
-custom_scrapers = {}
-crawl4ai_custom_configs = {}
\ No newline at end of file
diff --git a/core/requirements.txt b/core/requirements.txt
index 4419a8a..9523478 100644
--- a/core/requirements.txt
+++ b/core/requirements.txt
@@ -5,4 +5,4 @@ pydantic
 #json_repair==0.*
 beautifulsoup4
 requests
-crawl4ai==0.4.245
\ No newline at end of file
+crawl4ai==0.4.247
\ No newline at end of file
diff --git a/core/scrapers/README.md b/core/scrapers/README.md
new file mode 100644
index 0000000..ac5dd32
--- /dev/null
+++ b/core/scrapers/README.md
@@ -0,0 +1,76 @@
+## Configuring a custom Crawl4ai fetching config
+
+If a source needs a special fetching configuration, edit the corresponding crawler_config in `core/scrapers/__init__.py` and register it in `custom_fetching_configs` (an illustrative registration sketch is given further down, next to the scraper registration example).
+
+## Custom scrapers
+
+### Scrapers
+
+For the task of extracting the information we care about from web content, feeding the raw html to the llm is not a good idea. In this kind of task we want the llm to behave like a human reader and focus on understanding the content, not on parsing html — not to mention that feeding raw html also brings a very large amount of extra token consumption and lowers processing efficiency.
+
+Converting the html into markdown that is easy to read for meaning is the common practice in this field, and Crawl4ai provides a fairly mature solution for it.
+
+That only covers the general case, however; there is no one-size-fits-all trick. For some specific sources Crawl4ai's default parser does not work well — WeChat official account (mp.weixin.qq.com) articles are one example — and in those cases we need a custom scraper for the source.
+
+Put simply, a scraper converts the html into markdown text, filtering out as much irrelevant information as possible along the way (the next step is llm-based extraction, so this filtering does not have to be perfect), while preserving as much of the html layout information as possible (this is important).
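+To make "convert html to markdown while keeping the layout hints" concrete, here is a deliberately minimal sketch (illustration only — in wiseflow this job is done by Crawl4ai's markdown generation or by a custom scraper, not by this function):
+
+```python
+from bs4 import BeautifulSoup
+
+def toy_html_to_markdown(html: str) -> str:
+    """Toy illustration: keep headings and paragraphs, drop everything else."""
+    soup = BeautifulSoup(html, 'html.parser')
+    parts = []
+    for el in soup.find_all(['h1', 'h2', 'h3', 'p']):
+        text = el.get_text(' ', strip=True)
+        if not text:
+            continue
+        # '#' markers keep the heading level -- this is the layout information we preserve
+        parts.append(text if el.name == 'p' else '#' * int(el.name[1]) + ' ' + text)
+    return '\n\n'.join(parts)
+
+print(toy_html_to_markdown('<h1>Title</h1><p>body <a href="x">link</a></p>'))
+```
+
+Headings keep their level through `#` markers and blocks stay separated by blank lines — exactly the layout information that deep_scraper relies on later.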
+### deep_scraper
+
+We further found that feeding the full markdown text to the llm also has drawbacks.
+
+Just one example:
+
+*Many sites like to put a "recommended reading" section at the bottom of the article page or in a sidebar. If those recommendations were just a list of links that would be fine, but in practice they often come with content summaries, and those summaries are not short — they can be about as long as the main body of the page. If we hand the whole markdown to the llm, it becomes hard to give the llm a clear working strategy: if we simply discard the recommended-reading part (leaving aside how hard it is to specify a clean discard rule), we cannot guarantee that it contains nothing related to our focus points; and if we keep it, the llm may well fail to focus on the core content of the page. The llm may also extract information from those summaries, but the summaries correspond to extra links that will be crawled later anyway, which can produce a large amount of duplicated extraction.*
+
+What we actually need to do here is chunking. It is somewhat like the chunking in a RAG system, except that here we do not care about chunk granularity but about page-layout granularity, because what we are dealing with is an html page, not a pdf or word document.
+
+This point matters: we need to chunk by the html page layout, not by semantic logic, because it determines how we later decide which extraction strategy to apply to each kind of block. This is also why wiseflow does not use an existing document-intelligence tool but wrote its own deep_scraper.
+
+Of course, another option is to use a vision LLM for layout detection. In practice, though, that requires clean, undisturbed page screenshots, which greatly increases system complexity and slows processing down, and the results are not stable (think of page pop-ups, for example).
+
+The other reason for not using document intelligence or a vision LLM is that, unlike completely unstructured data such as pdf or word, the html code itself already contains all the layout information, and converting it to markdown preserves that information (through markers such as \n and #). So chunking the markdown with a set of rules and processing each chunk separately is feasible.
+
+That is the main job of wiseflow's deep_scraper. In short: 1. chunk the markdown by layout information; 2. analyze the type of each chunk and pre-process it with a type-specific strategy, so that the final llm extraction works better.
+
+### Registering a custom scraper
+
+wiseflow's default workflow is:
+
+*crawl4ai fetches the html and converts it to raw_markdown (using the default config) --> deep_scraper chunks it --> the chunked content is sent to the llm for information extraction.*
+
+As mentioned above, if a specific source needs a special crawl4ai fetching strategy (including the raw_markdown conversion strategy), you can register a custom crawler_config in `core/scrapers/__init__.py`.
+
+You can also register a custom scraper for a specific source. A custom scraper takes crawl4ai's fetching result as input and outputs the link dict and the list of text blocks that will be sent to the llm for analysis. With a custom scraper, wiseflow's workflow becomes:
+
+*crawl4ai fetches the html and converts it to raw_markdown (using the default config or the registered one) --> the custom scraper chunks it --> the chunked content is sent to the llm for information extraction.*
+
+A custom scraper may call deep_scraper internally as a post-processing step (as mp_scraper does), or it may implement the whole pipeline itself.
+
+The fetch_result passed to a scraper is a dict (crawl4ai's fetching result converted to a dict); at minimum, the fields used by the built-in scrapers are:
+
+- `url`: the page url
+- `html`: the raw html
+- `cleaned_html`: the cleaned html
+- `markdown`: the raw_markdown produced by crawl4ai
+- `media`: media info, e.g. `media['images']` is a list of dicts with `src` and `alt`
+
+The output is a ScraperResultData; its main fields are url, content, links and images (author, publish_date and title are optional).
+
+Register the scraper in `core/scrapers/__init__.py`, for example:
+
+```python
+from .mp_scraper import mp_scraper
+
+custom_scrapers = {'mp.weixin.qq.com': mp_scraper}
+```
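+A fuller sketch of what such a registration might look like (illustration only — `my_site_scraper` and `news.example.com` are placeholder names, and keying `custom_fetching_configs` by domain is an assumption based on how `custom_scrapers` is keyed):
+
+```python
+# hypothetical core/scrapers/my_site_scraper.py
+from .scraper_data import ScraperResultData
+
+def my_site_scraper(fetch_result: dict) -> ScraperResultData:
+    # trivially keep crawl4ai's raw markdown as the content;
+    # a real scraper would clean it here and fill links / images
+    return ScraperResultData(url=fetch_result['url'],
+                             content=fetch_result['markdown'],
+                             links={}, images=[])
+
+# in core/scrapers/__init__.py
+from crawl4ai import CrawlerRunConfig
+from .my_site_scraper import my_site_scraper
+
+custom_scrapers['news.example.com'] = my_site_scraper
+custom_fetching_configs['news.example.com'] = CrawlerRunConfig(delay_before_return_html=2.0)
+```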
+Note that the key is the domain name, which can be obtained with `urllib.parse`:
+
+
+```python
+from urllib.parse import urlparse
+
+parsed_url = urlparse("site's url")
+domain = parsed_url.netloc
+```
\ No newline at end of file
diff --git a/core/custom_fetchings/README_EN.md b/core/scrapers/README_EN.md
similarity index 100%
rename from core/custom_fetchings/README_EN.md
rename to core/scrapers/README_EN.md
diff --git a/core/scrapers/__init__.py b/core/scrapers/__init__.py
new file mode 100644
index 0000000..307c463
--- /dev/null
+++ b/core/scrapers/__init__.py
@@ -0,0 +1,9 @@
+from crawl4ai import CrawlerRunConfig
+# the markdown generator passed to CrawlerRunConfig below must be defined;
+# crawl4ai's DefaultMarkdownGenerator is assumed here
+from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
+# from .xxx import xx_config
+
+custom_scrapers = {}
+custom_fetching_configs = {}
+
+md_generator = DefaultMarkdownGenerator()
+
+crawler_config = CrawlerRunConfig(delay_before_return_html=2.0, markdown_generator=md_generator,
+                                  wait_until='commit', magic=True, scan_full_page=True)
diff --git a/core/utils/deep_scraper.py b/core/scrapers/deep_scraper.py
similarity index 100%
rename from core/utils/deep_scraper.py
rename to core/scrapers/deep_scraper.py
diff --git a/core/scrapers/mp_scraper.py b/core/scrapers/mp_scraper.py
new file mode 100644
index 0000000..6d696d0
--- /dev/null
+++ b/core/scrapers/mp_scraper.py
@@ -0,0 +1,258 @@
+from bs4 import BeautifulSoup
+import re
+from .scraper_data import ScraperResultData
+
+# all block-level and inline elements that may contain text
+text_elements = {
+    # block-level elements
+    'div', 'section', 'p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
+    # inline text elements
+    'span', 'em', 'strong'
+}
+
+
+def mp_scraper(fetch_result: dict) -> ScraperResultData:
+    url = fetch_result['url']
+    raw_html = fetch_result['html']
+    cleaned_html = fetch_result['cleaned_html']
+
+    content = ''
+    links = {}
+    images = []
+
+    if url.startswith('https://mp.weixin.qq.com/mp/appmsgalbum'):
+        # album page type
+        soup = BeautifulSoup(raw_html, 'html.parser')
+        for li in soup.find_all('li', class_='album__list-item'):
+            u_text = li.get_text(strip=True)
+            u_title = li.attrs['data-title'].strip()
+            _url = li.attrs['data-link'].replace("http://", "https://", 1)
+            if not _url or _url.startswith('javas') or _url.startswith('#') or _url.startswith('mailto:') or _url.startswith('data:') or _url.startswith('about:blank'):
+                continue
+
+            cut_off_point = _url.find('chksm=')
+            if cut_off_point != -1:
+                _url = _url[:cut_off_point - 1]
+
+            if u_title in u_text:
+                description = u_text
+            else:
+                description = f'{u_title}-{u_text}'
+            if _url and description:
+                if _url not in links:
+                    links[_url] = description
+                else:
+                    links[_url] = f'{links[_url]}|{description}'
+        return ScraperResultData(url=url, content=content, links=links, images=images)
+
+    def process_content(content_div):
+        # 3.1 process all <img> elements
+        for img in content_div.find_all('img', attrs={'data-src': True}, recursive=True):
+            src = img.get('data-src')
+            if not src or src.startswith('#') or src.startswith('about:blank'):
+                src = None
+            text = img.get('alt', '').strip()
+            if not src:
+                img.replace_with(text)
+                continue
+            images.append(src)
+            # find all area urls related to this img
+            area_urls = set()
+            if img.get('usemap'):
+                # remove the # at the beginning of the map name
+                map_name = img.get('usemap').lstrip('#')
+                # find the map tag
+                map_tag = content_div.find('map', {'name': map_name})
+                if map_tag:
+                    # get all area tags under the map
+                    for area in map_tag.find_all('area', href=True):
+                        area_href = area.get('href')
+                        area_urls.add(area_href)
+                        area.decompose()
+                    # delete the whole map tag
+                    map_tag.decompose()
+            area_urls = ')[]('.join(area_urls)
+            replacement_text = f'![{text}]({src})[]({area_urls})' if area_urls else f'![{text}]({src})'
+            img.replace_with(replacement_text)
+
+        for media in content_div.find_all(['video', 'audio', 'source', 'embed', 'iframe', 'figure'], src=True, recursive=True):
+            src = media.get('src')
+            if not src or src.startswith('javascript:') or src.startswith('#') or src.startswith('mailto:') or src.startswith('data:') or src.startswith('about:blank'):
+                src = None
+            text = media.get_text().strip() or media.get('alt', '').strip()
+            if src:
+                media.replace_with(f"[{text}]({src})")
+            else:
+                media.decompose()
+
+        for obj in content_div.find_all('object', data=True, recursive=True):
+            data = obj.get('data')
+            if not data or data.startswith('javascript:') or data.startswith('#') or data.startswith('mailto:') or data.startswith('data:') or data.startswith('about:blank'):
+                data = None
+            text = obj.get_text().strip() or obj.get('alt', '').strip()
+            if data:
+                obj.replace_with(f"[{text}]({data})")
+            else:
+                obj.decompose()
+
+        # process links at last, so that we can keep the image and media info in the link
+        for a in content_div.find_all('a', href=True, recursive=True):
+            href = a.get('href')
+            if not href or href.startswith('javascript:') or href.startswith('#') or href.startswith('about:blank'):
+                href = None
+            text = a.get_text().strip()
+            if href:
+                a.replace_with(f"[{text}]({href})")
+            else:
+                a.decompose()
+
+        # handle lists
+        for list_tag in content_div.find_all(['ul', 'ol'], recursive=True):
+            list_text = []
+            for idx, item in enumerate(list_tag.find_all('li')):
+                list_text.append(f"{idx + 1}. {item.get_text().strip()}")
+            list_text = '\t'.join(list_text)
+            list_tag.replace_with(f"{list_text}\n")
+
+        # handle strikethrough text
+        for del_tag in content_div.find_all(['del', 's'], recursive=True):
+            del_text = del_tag.get_text().strip()
+            if del_text:
+                del_tag.replace_with(f"{del_text}(maybe_outdated)")
+            else:
+                del_tag.decompose()
+
+        # handle tables
+        for table in content_div.find_all('table', recursive=True):
+            table_text = []
+
+            # handle caption
+            caption = table.find('caption')
+            if caption:
+                table_text.append(caption.get_text().strip())
+
+            # get headers
+            headers = []
+            for th in table.find_all('th'):
+                headers.append(th.get_text().strip())
+
+            # handle all rows (including tbody and tfoot)
+            for row in table.find_all('tr'):
+                # get the first cell value
+                # try to find th as first_val
+                first_cell = row.find(['th', 'td'])
+                if not first_cell:
+                    continue
+                first_val = first_cell.get_text().strip()
+                cells = row.find_all('td')
+                if not cells:
+                    continue
+                # handle remaining cells
+                for idx, cell in enumerate(cells):
+                    cell_text = cell.get_text().strip()
+                    if not cell_text or cell_text == first_val:
+                        continue
+
+                    header_text = headers[idx] if idx < len(headers) else ''
+                    cell_str = f"{first_val}-{header_text}-{cell_text}"
+                    table_text.append(cell_str)
+
+            # replace the table with the processed text
+            table_text = '\n'.join(table_text)
+            table.replace_with(f"\n{table_text}\n")
+
+        # 3.3 collect the text of the child elements and normalize line breaks
+        content_parts = []
+        for element in content_div.children:
+            if element.name in ['br', 'br/', 'br /', 'hr', 'hr/', 'hr /', 'wbr']:
+                content_parts.append('\n')
+            elif element.name in text_elements:
+                text = element.get_text(strip=True)
+                if text:
+                    content_parts.append(text)
+                    # only add a line break after block-level elements
+                    if element.name in {'div', 'section', 'p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6'}:
+                        content_parts.append('\n')
+            elif isinstance(element, str):
+                text = element.strip()
+                if text:
+                    content_parts.append(text)
+
+        return '\n'.join(content_parts).strip()
+
+    soup = BeautifulSoup(cleaned_html, 'html.parser')
+
+    # 1. find the first div block that contains an <h1> element and extract the title
+    h1_tag = soup.find('h1')
+    if h1_tag:
+        h1_div = h1_tag.parent
+        title = h1_tag.get_text(strip=True)
+    else:
+        # no <h1> found: the article has been deleted or this is a share page
+        soup = BeautifulSoup(raw_html, 'html.parser')
+        # find the data-url in original_panel_tool
+        share_source = soup.find('span', id='js_share_source')
+        if share_source and share_source.get('data-url'):
+            data_url = share_source['data-url']
+            # replace http with https
+            data_url = data_url.replace('http://', 'https://', 1)
+            if not data_url or not data_url.startswith('https://mp.weixin.qq.com'):
+                return ScraperResultData(url=url, content=content, links=links, images=images)
+            # use the text of js_content as the description
+            content_div = soup.find('div', id='js_content')
+            if not content_div:
+                return ScraperResultData(url=url, content=content, links=links, images=images)
+            des = content_div.get_text(strip=True)
+            return ScraperResultData(url=url, content=content, links={data_url: des}, images=images)
+        else:
+            return ScraperResultData(url=url, content=content, links=links, images=images)
+
+    # 2. count how many non-empty div sub-blocks this block contains
+    sub_divs = [div for div in h1_div.find_all('div', recursive=False) if len(div.contents) > 0]
+    num_sub_divs = len(sub_divs)
+
+    if num_sub_divs == 1:
+        # 2.1 only one sub-block
+        strong_tag = sub_divs[0].find('strong')
+        if strong_tag:
+            author = strong_tag.get_text(strip=True)
+            # find the span tag that contains the date and time
+            date_span = sub_divs[0].find('span', string=re.compile(r'\d{4}年\d{2}月\d{2}日\s+\d{2}:\d{2}'))
+            # if a date is found, keep only the date part
+            if date_span:
+                publish_date = date_span.get_text(strip=True).split()[0]  # date part only
+            else:
+                publish_date = None
+            # use the next sibling of the div block that contains the <h1> element as the content
+            content_div = h1_div.find_next_sibling('div')
+            content = title + '\n\n' + process_content(content_div)
+        else:
+            author = None
+            publish_date = None
+            content = fetch_result['markdown']
+
+    elif num_sub_divs >= 2:
+        # 2.2 two or more sub-blocks
+        a_tag = sub_divs[0].find('a', href="javascript:void(0);")
+        if a_tag:
+            author = a_tag.get_text(strip=True)
+            # find the next em tag that contains the date and time
+            date_em = sub_divs[0].find('em', string=re.compile(r'\d{4}年\d{2}月\d{2}日\s+\d{2}:\d{2}'))
+            if date_em:
+                # keep only the date part
+                publish_date = date_em.get_text(strip=True).split()[0]
+            else:
+                publish_date = None
+        else:
+            author = None
+            publish_date = None
+        # the remaining div sub-blocks together form the content
+        content_divs = sub_divs[1:]
+        content = '\n\n'.join([process_content(div) for div in content_divs])
+        content = title + '\n\n' + content
+    else:
+        author = None
+        publish_date = None
+        content = title
+
+    return ScraperResultData(url=url, content=content, links=links, images=images, author=author, publish_date=publish_date, title=title)
diff --git a/core/scrapers/scraper_data.py b/core/scrapers/scraper_data.py
new file mode 100644
index 0000000..0284b24
--- /dev/null
+++ b/core/scrapers/scraper_data.py
@@ -0,0 +1,39 @@
+from dataclasses import dataclass
+from typing import List, Dict, Optional
+from datetime import datetime
+
+@dataclass
+class ScraperResultData:
+    """Data class for storing the result of scraping a web page."""
+    url: str
+    content: Optional[str] = None
+    links: Optional[Dict[str, str]] = None
+    images: Optional[List[str]] = None
+    author: Optional[str] = None
+    publish_date: Optional[str] = None
+    title: Optional[str] = None
+    base: Optional[str] = None
+
+    def __post_init__(self):
+        # validate that url exists and is a string
+        if not isinstance(self.url, str) or not self.url.strip():
+            raise ValueError("URL must be a non-empty string")
+
+        # initialize the optional fields
+        if self.images is None:
+            self.images = []
+
+        if self.links is None:
+            self.links = {}
+
+        # make sure publish_date is stored as a string
+        if self.publish_date is not None:
+            if isinstance(self.publish_date, datetime):
+                self.publish_date = self.publish_date.isoformat()
+            elif not isinstance(self.publish_date, str):
+                self.publish_date = str(self.publish_date)
+
+        # make sure images is a list
+        if self.images is not None:
+            if not isinstance(self.images, list):
+                raise ValueError("images must be a list")
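+
+
+if __name__ == '__main__':
+    # Illustrative usage sketch (not part of the original module): shows the
+    # normalization done in __post_init__ -- None links/images become empty
+    # containers and a datetime publish_date is stored as an ISO string.
+    demo = ScraperResultData(url='https://example.com/article',
+                             content='article text',
+                             publish_date=datetime(2025, 1, 12))
+    assert demo.links == {} and demo.images == []
+    assert demo.publish_date == '2025-01-12T00:00:00'
+    print(demo)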
diff --git a/test/deep_scraper_test.py b/test/deep_scraper_test.py
index 4ffa0ce..8d892b0 100644
--- a/test/deep_scraper_test.py
+++ b/test/deep_scraper_test.py
@@ -7,6 +7,7 @@ project_root = os.path.dirname(current_dir)  # get the parent directory
 sys.path.append(project_root)
 
 from core.utils.deep_scraper import deep_scraper, common_chars
+from core.scrapers.mp_scraper import mp_scraper
 
 def check_url_text(text):
     print(f"processing: {text}")
@@ -89,8 +90,21 @@ if __name__ == '__main__':
         with open(file, 'r') as f:
             html_sample = json.load(f)
         _url = html_sample['url']
-        raw_markdown = html_sample['markdown']
-        used_img = {d['src']: d['alt'] for d in html_sample['media']['images']}
+        if _url.startswith('https://mp.weixin.qq.com'):
+            result = mp_scraper(html_sample)
+            print(f'url: {result.url}')
+            print(f'content: {result.content}')
+            print(f'links: {result.links}')
+            print(f'author: {result.author}')
+            print(f'publish_date: {result.publish_date}')
+            print(f'images: {len(result.images)}')
+            for img in result.images:
+                print(img)
+            raw_markdown = result.content
+            used_img = result.images
+        else:
+            raw_markdown = html_sample['markdown']
+            used_img = {d['src']: d['alt'] for d in html_sample['media']['images']}
     except Exception as e:
         print('sample format error, try to use craw4ai_fething.py to get sample')
         print(f"error: {e}")
diff --git a/test/read_markdown.py b/test/read_markdown.py
new file mode 100644
index 0000000..09821de
--- /dev/null
+++ b/test/read_markdown.py
@@ -0,0 +1,59 @@
+import os
+import json
+import re
+
+def read_markdown_from_json_files(directory_path):
+    # Get all JSON files in the directory
+    json_files = [f for f in os.listdir(directory_path) if f.endswith('.json')]
+    img_pattern = r'!\[(.*?)\]\((.*?)\)'
+    link_pattern = r'\[(.*?)\]\((.*?)\)'
+
+    # Process each JSON file
+    for json_file in sorted(json_files):
+        file_path = os.path.join(directory_path, json_file)
+        print(f"\nProcessing file: {json_file}")
+        print("-" * 50)
+
+        with open(file_path, 'r', encoding='utf-8') as f:
+            data = json.load(f)
+
+        markdown = data.get('markdown')
+        # Find the longest consecutive newlines in the markdown text
+        if markdown:
+            # Find all sequences of newlines and get their lengths
+            max_newlines = max(len(match) for match in re.findall(r'\n+', markdown)) if re.search(r'\n+', markdown) else 0
+            print(f"Longest consecutive newlines: {max_newlines}")
+            if max_newlines < 2:
+                sections = [markdown]
+            else:
+                sections = markdown.split('\n' * max_newlines)
+
+            for i, section in enumerate(sections):
+                print(f"Section {i + 1}:")
+                print(section)
+                print('\n\n')
+                newline_count = section.count('\n')
+                # handle image markers ![alt](src)
+                img_pattern = r'!\[(.*?)\]\((.*?)\)'
+                matches = re.findall(img_pattern, section)
+                for alt, src in matches:
+                    # replace them with the new format §alt||src§
+                    section = section.replace(f'![{alt}]({src})', f'§{alt}||{src}§')
+                # handle link markers [text](url)
+                matches = re.findall(link_pattern, section)
+                # remove all matched link markers from the text
+                for link_text, link_url in matches:
+                    section = section.replace(f'[{link_text}]({link_url})', '')
+
+                if len(section) == 0:
+                    print("no text in section")
+                    continue
+                print(f"newline/text ratio: {newline_count/len(section)*100}")
+                print(f"links/section ratio: {len(matches)/len(section)*100}")
+                print("-" * 50)
+
+
+if __name__ == "__main__":
+    # Path to the webpage_samples directory
+    samples_dir = os.path.dirname(os.path.abspath(__file__)) + "/webpage_samples"
+    read_markdown_from_json_files(samples_dir)
diff --git a/weixin_mp/__init__.py b/weixin_mp/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/weixin_mp/requirements.txt b/weixin_mp/requirements.txt
new file mode 100644
index 0000000..7a38911
--- /dev/null
+++ b/weixin_mp/requirements.txt
@@ -0,0 +1 @@
+websockets
\ No newline at end of file
diff --git a/weixin_mp/run_weixin.sh b/weixin_mp/run_weixin.sh
new file mode 100644
index 0000000..70495ea
--- /dev/null
+++ b/weixin_mp/run_weixin.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+# export CONFIGS='avatars'
+# export WX_BOT_ENDPOINT='127.0.0.1:8066'
+# export MAIN_SERVICE_ENDPOINT='http://127.0.0.1:7777/'
+# export VERBOSE=True
+
+python weixin.py
\ No newline at end of file
diff --git a/weixin_mp/weixin.py b/weixin_mp/weixin.py
new file mode 100644
index 0000000..a6df3d7
--- /dev/null
+++ b/weixin_mp/weixin.py
@@ -0,0 +1,113 @@
+import websockets
+import json
+import re
+import httpx
+import asyncio
+import os, sys
+
+current_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = os.path.dirname(current_dir)  # get the parent directory
+sys.path.append(project_root)
+
+from core.general_process import main_process, wiseflow_logger
+from typing import Optional
+import logging
+
+logging.getLogger("httpx").setLevel(logging.WARNING)
+
+# Be very careful NOT to select "sync history messages" when scanning the QR code to log in,
+# otherwise the bot will start replying to every historical message one by one.
+# First check the wx login status and get the wxid of the logged-in WeChat account.
+WX_BOT_ENDPOINT = os.environ.get('WX_BOT_ENDPOINT', '127.0.0.1:8066')
+wx_url = f"http://{WX_BOT_ENDPOINT}/api/"
+try:
+    # send a GET request
+    response = httpx.get(f"{wx_url}checklogin")
+    response.raise_for_status()  # make sure the HTTP status code is 200
+
+    # parse the JSON response
+    data = response.json()
+
+    # check the status field
+    if data['data']['status'] == 1:
+        # record the wxid
+        self_wxid = data['data']['wxid']
+        wiseflow_logger.info(f"logged-in WeChat account: {self_wxid}")
+    else:
+        # raise an exception
+        wiseflow_logger.error("no login information detected, exiting")
+        raise ValueError("login failed, status is not 1")
+except Exception as e:
+    wiseflow_logger.error(f"cannot connect to the WeChat endpoint: {wx_url}, error:\n{e}")
+    raise ValueError("login failed, cannot connect")
+
+# get the nickname of the logged-in account, used later to check whether a message @-mentions the bot
+response = httpx.get(f"{wx_url}userinfo")
+response.raise_for_status()  # make sure the HTTP status code is 200
+# parse the JSON response
+data = response.json()
+self_nickname = data['data'].get('nickname', " ")
+wiseflow_logger.info(f"self_nickname: {self_nickname}")
+
+# To monitor only a subset of official accounts, create a config.json file in the same folder
+# containing the list of official accounts to monitor.
+# Note: use the original id of the official account (the id starting with gh_),
+# which can be found in the historical logger output.
+config_file = 'config.json'
+if not os.path.exists(config_file):
+    config = None
+else:
+    with open(config_file, 'r', encoding='utf-8') as f:
+        config = json.load(f)
+
+
+# The following patterns only apply to parsing public_msg; official-account articles shared in group chats are not covered.
+# The XML parsing scheme is not used because there are abnormal characters in the XML code extracted from the weixin public_msg
+item_pattern = re.compile(r'<item>(.*?)</item>', re.DOTALL)
+url_pattern = re.compile(r'<url><!\[CDATA\[(.*?)\]\]></url>')
+
+async def get_public_msg(websocket_uri):
+    reconnect_attempts = 0
+    max_reconnect_attempts = 3
+    while True:
+        try:
+            async with websockets.connect(websocket_uri, max_size=10 * 1024 * 1024) as websocket:
+                while True:
+                    response = await websocket.recv()
+                    datas = json.loads(response)
+
+                    for data in datas["data"]:
+                        if "StrTalker" not in data or "Content" not in data:
+                            wiseflow_logger.warning(f"invalid data:\n{data}")
+                            continue
+                        user_id = data["StrTalker"]
+
+                        items = item_pattern.findall(data["Content"])
+                        # Iterate through all <item> content, extracting <url> and <summary>
+                        todo_urls = set()
+                        for item in items:
+                            url_match = url_pattern.search(item)
+                            url = url_match.group(1) if url_match else None
+                            if not url:
+                                wiseflow_logger.warning(f"can not find url in \n{item}")
+                                continue
+                            # URL processing, http is replaced by https, and the part after chksm is removed.
+                            url = url.replace('http://', 'https://')
+                            cut_off_point = url.find('chksm=')
+                            if cut_off_point != -1:
+                                url = url[:cut_off_point - 1]
+                            # summary_match = summary_pattern.search(item)
+                            # addition = summary_match.group(1) if summary_match else None
+                            todo_urls.add(url)
+                        await main_process(todo_urls)
+        except websockets.exceptions.ConnectionClosedError as e:
+            wiseflow_logger.error(f"Connection closed with exception: {e}")
+            reconnect_attempts += 1
+            if reconnect_attempts <= max_reconnect_attempts:
+                wiseflow_logger.info(f"Reconnecting attempt {reconnect_attempts}...")
+                await asyncio.sleep(1)
+            else:
+                wiseflow_logger.error("Max reconnect attempts reached. Exiting.")
+                break
+        except Exception as e:
+            wiseflow_logger.error(f"PublicMsgHandler error: {e}")
+
+uri_public = f"ws://{WX_BOT_ENDPOINT}/ws/publicMsg"
+asyncio.run(get_public_msg(uri_public))