diff --git a/core/custom_fetchings/README.md b/core/custom_fetchings/README.md
deleted file mode 100644
index 610c340..0000000
--- a/core/custom_fetchings/README.md
+++ /dev/null
@@ -1,19 +0,0 @@
-## 自定义解析器的注册
-
-在 `core/scrapers/__init__.py` 中注册,参考:
-
-```python
-from .mp import mp_scarper
-
-customer_scrapers = {'mp.weixin.qq.com': mp_scarper}
-```
-
-注意键使用域名,可以使用 `urllib.parse` 获取:
-
-
-```python
-from urllib.parse import urlparse
-
-parsed_url = urlparse("site's url")
-domain = parsed_url.netloc
-```
\ No newline at end of file
diff --git a/core/custom_fetchings/__init__.py b/core/custom_fetchings/__init__.py
deleted file mode 100644
index 038c723..0000000
--- a/core/custom_fetchings/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-# from .xxx import xx_scraper
-# from .xxx import xx_config
-
-custom_scrapers = {}
-crawl4ai_custom_configs = {}
\ No newline at end of file
diff --git a/core/requirements.txt b/core/requirements.txt
index 4419a8a..9523478 100644
--- a/core/requirements.txt
+++ b/core/requirements.txt
@@ -5,4 +5,4 @@ pydantic
#json_repair==0.*
beautifulsoup4
requests
-crawl4ai==0.4.245
\ No newline at end of file
+crawl4ai==0.4.247
\ No newline at end of file
diff --git a/core/scrapers/README.md b/core/scrapers/README.md
new file mode 100644
index 0000000..ac5dd32
--- /dev/null
+++ b/core/scrapers/README.md
@@ -0,0 +1,76 @@
+## Configuring a custom Crawl4ai fetching config
+
+If a source requires a special fetching configuration, edit the corresponding crawler_config in `core/scrapers/__init__.py` and register it in `custom_fetching_configs`.
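+
+For illustration, a minimal sketch of such a registration (the domain key and the `CrawlerRunConfig` arguments are placeholders, not a recommended setup):
+
+```python
+from crawl4ai import CrawlerRunConfig
+
+# hypothetical config for a slow-loading source; tune the arguments for your site
+example_config = CrawlerRunConfig(delay_before_return_html=5.0, wait_until='commit', scan_full_page=True)
+
+custom_fetching_configs = {'example.com': example_config}
+```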
+
+## Custom scrapers
+
+### The scraper
+
+For the task of extracting information of interest from web content, feeding raw HTML straight to an LLM is not a good idea. In this kind of task we want the LLM to behave like a human reader, focusing on understanding the content rather than parsing HTML. Feeding in raw HTML also incurs a very large amount of extra token consumption and reduces processing efficiency.
+
+Converting the HTML into markdown that is easy to understand semantically is the common practice in this field, and Crawl4ai provides a fairly mature solution for it.
+
+That covers the general case, though; no single trick works everywhere. For some specific sources, such as WeChat official account (mp.weixin.qq.com) articles, Crawl4ai's default parser does not work well, and we then need a custom scraper for that source.
+
+Simply put, a scraper converts HTML into markdown text, filtering out as much irrelevant information as possible along the way (the bar here is low, since the next step distills the content with an LLM), while preserving as much of the HTML layout information as possible (this is important).
+
+### deep_scraper
+
+We further found that sending the full markdown text straight to the LLM also has drawbacks.
+
+To give just one example:
+
+*Many sites like to add a "recommended reading" section at the bottom or in the sidebar of an article page. If these recommendations were just a list of links that would be fine, but in practice they often come with summaries, and the summaries are not short; they can be as long as the main body of the page. If we hand the whole markdown to the LLM, it becomes hard to give it a clear working strategy: discarding the recommended-reading content outright (setting aside how hard a clean discard rule is to specify) gives no guarantee that it contains nothing related to our focus points, while keeping it may prevent the LLM from focusing on the page's core content. The LLM may also extract information from those summaries, yet the summaries correspond to additional links that will themselves be crawled later, which can lead to a large amount of duplicated extraction.*
+
+What we actually need here is chunking, somewhat like the chunks in a RAG system. The difference is that we do not need to think about chunk granularity but about page-layout granularity, because we are dealing with HTML pages rather than PDF or Word documents.
+
+This point matters: we need to chunk by HTML page layout, not by semantic logic, because it determines how we later decide which extraction strategy suits each block. This is also why wiseflow does not use existing document-intelligence tools and instead wrote its own deep_scraper.
+
+Of course, another option is to use a vision LLM for layout recognition directly, but in practice we found this requires clean, interference-free page screenshots, which greatly increases system complexity, slows processing, and is not reliably effective (consider page pop-ups, for instance).
+
+Another reason not to use document intelligence or vision LLMs is that, unlike fully unstructured data such as PDF or Word, the HTML itself already contains all the layout information, and the conversion to markdown actually preserves it (through symbols such as `\n` and `#`), so chunking the markdown with a set of rules and processing each chunk separately is feasible.
+
+This is the main job of wiseflow's deep_scraper. In short: 1. chunk the markdown by layout information; 2. analyze each chunk's type and preprocess it with a type-specific strategy, so the final LLM extraction works well. A toy sketch of the chunking idea follows below.
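+
+As a toy illustration only (this is not the actual deep_scraper implementation), the sketch below splits markdown on its longest run of blank lines and reports a simple link density per chunk, mirroring the heuristics used in `test/read_markdown.py`; the helper names are made up:
+
+```python
+import re
+
+def layout_chunks(markdown: str) -> list[str]:
+    # use the longest run of consecutive newlines as a crude layout boundary
+    runs = re.findall(r'\n+', markdown)
+    max_newlines = max((len(r) for r in runs), default=0)
+    if max_newlines < 2:
+        return [markdown]
+    return [c for c in markdown.split('\n' * max_newlines) if c.strip()]
+
+def link_ratio(chunk: str) -> float:
+    # share of [text](url) links per character; high values hint at nav or recommendation blocks
+    links = re.findall(r'\[(.*?)\]\((.*?)\)', chunk)
+    return len(links) / len(chunk) if chunk else 0.0
+```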
+
+### Registering a custom scraper
+
+wiseflow's default workflow is:
+
+*crawl4ai fetches the HTML and performs an initial conversion to raw_markdown (using the default config) --> deep_scraper chunks the result --> the chunks are sent to the LLM for information extraction.*
+
+As noted above, if a particular source needs a special crawl4ai fetching strategy (including the raw_markdown conversion strategy), you can register a custom crawler_config in `core/scrapers/__init__.py`.
+
+You can also configure a custom scraper for a particular source. A custom scraper takes crawl4ai's fetching result as input and outputs the link dictionary and the text blocks that will be sent to the LLM for analysis. With a custom scraper, wiseflow's pipeline becomes:
+
+*crawl4ai fetches the HTML and performs an initial conversion to raw_markdown (using the default config or the specified one) --> the custom scraper chunks the result --> the chunks are sent to the LLM for information extraction.*
+
+A custom scraper may call deep_scraper internally as a post-processing step (as mp_scraper does), or it may implement the whole pipeline itself.
+
+The fetch_result passed to a scraper is a dict, with the following format:
+
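+At minimum it carries the keys below (a sketch based on what `mp_scraper` and the test harness actually read; the object produced by crawl4ai may include additional fields):
+
+```python
+# illustrative only: keys consumed in this repo
+fetch_result = {
+    'url': 'https://example.com/some-article',   # final URL of the fetched page
+    'html': '<html>...</html>',                  # raw page HTML
+    'cleaned_html': '<div>...</div>',            # crawl4ai's cleaned HTML
+    'markdown': '# ...',                         # raw_markdown produced by crawl4ai
+    'media': {'images': [{'src': '...', 'alt': '...'}]},  # media info from crawl4ai
+}
+```
+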
+The output is a ScraperResultData; its core fields are url, content, links, and images (author, publish_date, title, and base are optional). A minimal skeleton follows.
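+
+A minimal skeleton of a custom scraper, shown only to illustrate the expected signature (`my_scraper` is a placeholder name and the body is not a real strategy; see `mp_scraper.py` in `core/scrapers` for an actual implementation):
+
+```python
+from .scraper_data import ScraperResultData
+
+def my_scraper(fetch_result: dict) -> ScraperResultData:
+    url = fetch_result['url']
+    # derive the text and the link dict for the LLM step however suits the source
+    content = fetch_result['markdown']
+    links = {}   # {url: description}
+    images = []  # list of image urls
+    return ScraperResultData(url=url, content=content, links=links, images=images)
+```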
+
+Register it in `core/scrapers/__init__.py`, for example:
+
+```python
+from .mp_scraper import mp_scraper
+
+custom_scrapers = {'mp.weixin.qq.com': mp_scraper}
+```
+
+Note that the key must be the domain, which you can obtain with `urllib.parse`:
+
+```python
+from urllib.parse import urlparse
+
+parsed_url = urlparse("site's url")
+domain = parsed_url.netloc
+```
\ No newline at end of file
diff --git a/core/custom_fetchings/README_EN.md b/core/scrapers/README_EN.md
similarity index 100%
rename from core/custom_fetchings/README_EN.md
rename to core/scrapers/README_EN.md
diff --git a/core/scrapers/__init__.py b/core/scrapers/__init__.py
new file mode 100644
index 0000000..307c463
--- /dev/null
+++ b/core/scrapers/__init__.py
@@ -0,0 +1,9 @@
+from crawl4ai import CrawlerRunConfig
+from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
+# from .xxx import xx_config
+
+custom_scrapers = {}
+custom_fetching_configs = {}
+
+# markdown generator used when converting fetched HTML to raw_markdown
+md_generator = DefaultMarkdownGenerator()
+
+crawler_config = CrawlerRunConfig(delay_before_return_html=2.0, markdown_generator=md_generator,
+                                  wait_until='commit', magic=True, scan_full_page=True)
diff --git a/core/utils/deep_scraper.py b/core/scrapers/deep_scraper.py
similarity index 100%
rename from core/utils/deep_scraper.py
rename to core/scrapers/deep_scraper.py
diff --git a/core/scrapers/mp_scraper.py b/core/scrapers/mp_scraper.py
new file mode 100644
index 0000000..6d696d0
--- /dev/null
+++ b/core/scrapers/mp_scraper.py
@@ -0,0 +1,258 @@
+from bs4 import BeautifulSoup
+import re
+from .scraper_data import ScraperResultData
+
+# block-level and inline elements that may contain text
+text_elements = {
+    # block-level elements
+    'div', 'section', 'p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
+    # inline text elements
+    'span', 'em', 'strong'
+}
+
+
+def mp_scraper(fetch_result: dict) -> ScraperResultData:
+ url = fetch_result['url']
+ raw_html = fetch_result['html']
+ cleaned_html = fetch_result['cleaned_html']
+
+ content = ''
+ links = {}
+ images = []
+
+ if url.startswith('https://mp.weixin.qq.com/mp/appmsgalbum'):
+ # album page type
+ soup = BeautifulSoup(raw_html, 'html.parser')
+ for li in soup.find_all('li', class_='album__list-item'):
+ u_text = li.get_text(strip=True)
+ u_title = li.attrs['data-title'].strip()
+ _url = li.attrs['data-link'].replace("http://", "https://", 1)
+ if not _url or _url.startswith('javas') or _url.startswith('#') or _url.startswith('mailto:') or _url.startswith('data:') or _url.startswith('about:blank'):
+ continue
+
+ cut_off_point = _url.find('chksm=')
+ if cut_off_point != -1:
+ _url = _url[:cut_off_point - 1]
+
+ if u_title in u_text:
+ description = u_text
+ else:
+ description = f'{u_title}-{u_text}'
+ if _url and description:
+ if _url not in links:
+ links[_url] = description
+ else:
+ links[_url] = f'{links[_url]}|{description}'
+ return ScraperResultData(url=url, content=content, links=links, images=images)
+
+ def process_content(content_div):
+        # 3.1 process all <img> elements
+ for img in content_div.find_all('img', attrs={'data-src': True}, recursive=True):
+ src = img.get('data-src')
+ if not src or src.startswith('#') or src.startswith('about:blank'):
+ src = None
+ text = img.get('alt', '').strip()
+ if not src:
+ img.replace_with(text)
+ continue
+ images.append(src)
+ # find all area urls related to this img
+ area_urls = set()
+ if img.get('usemap'):
+ # remove the # at the beginning of the map name
+ map_name = img.get('usemap').lstrip('#')
+ # find the map tag
+ map_tag = content_div.find('map', {'name': map_name})
+ if map_tag:
+ # get all area tags under the map
+ for area in map_tag.find_all('area', href=True):
+ area_href = area.get('href')
+ area_urls.add(area_href)
+ area.decompose()
+ # delete the whole map tag
+ map_tag.decompose()
+ area_urls = ')[]('.join(area_urls)
+ replacement_text = f'![{text}]({src})[]({area_urls})' if area_urls else f'![{text}]({src})'
+ img.replace_with(replacement_text)
+
+ for media in content_div.find_all(['video', 'audio', 'source', 'embed', 'iframe', 'figure'], src=True, recursive=True):
+ src = media.get('src')
+ if not src or src.startswith('javascript:') or src.startswith('#') or src.startswith('mailto:') or src.startswith('data:') or src.startswith('about:blank'):
+ src = None
+ text = media.get_text().strip() or media.get('alt', '').strip()
+ if src:
+ media.replace_with(f"[{text}]({src})")
+ else:
+ media.decompose()
+
+ for obj in content_div.find_all('object', data=True, recursive=True):
+ data = obj.get('data')
+ if not data or data.startswith('javascript:') or data.startswith('#') or data.startswith('mailto:') or data.startswith('data:') or data.startswith('about:blank'):
+ data = None
+ text = obj.get_text().strip() or obj.get('alt', '').strip()
+ if data:
+ obj.replace_with(f"[{text}]({data})")
+ else:
+ obj.decompose()
+
+ # process links at last, so that we can keep the image and media info in the link
+ for a in content_div.find_all('a', href=True, recursive=True):
+ href = a.get('href')
+ if not href or href.startswith('javascript:') or href.startswith('#') or href.startswith('about:blank'):
+ href = None
+ text = a.get_text().strip()
+ if href:
+ a.replace_with(f"[{text}]({href})")
+ else:
+ a.decompose()
+
+ # handle lists
+ for list_tag in content_div.find_all(['ul', 'ol'], recursive=True):
+ list_text = []
+ for idx, item in enumerate(list_tag.find_all('li')):
+ list_text.append(f"{idx + 1}. {item.get_text().strip()}")
+ list_text = '\t'.join(list_text)
+ list_tag.replace_with(f"{list_text}\n")
+
+ # handle strikethrough text
+ for del_tag in content_div.find_all(['del', 's'], recursive=True):
+ del_text = del_tag.get_text().strip()
+ if del_text:
+ del_tag.replace_with(f"{del_text}(maybe_outdated)")
+ else:
+ del_tag.decompose()
+
+ # handle tables
+ for table in content_div.find_all('table', recursive=True):
+ table_text = []
+
+ # handle caption
+ caption = table.find('caption')
+ if caption:
+ table_text.append(caption.get_text().strip())
+
+ # get headers
+ headers = []
+ for th in table.find_all('th'):
+ headers.append(th.get_text().strip())
+
+ # handle all rows (including tbody and tfoot)
+ for row in table.find_all('tr'):
+ # get the first cell value
+ # try to find th as first_val
+ first_cell = row.find(['th', 'td'])
+ if not first_cell:
+ continue
+ first_val = first_cell.get_text().strip()
+ cells = row.find_all('td')
+ if not cells:
+ continue
+ # handle remaining cells
+ for idx, cell in enumerate(cells):
+ cell_text = cell.get_text().strip()
+ if not cell_text or cell_text == first_val:
+ continue
+
+ header_text = headers[idx] if idx < len(headers) else ''
+ cell_str = f"{first_val}-{header_text}-{cell_text}"
+ table_text.append(cell_str)
+
+ # replace the table with the processed text
+ table_text = '\n'.join(table_text)
+ table.replace_with(f"\n{table_text}\n")
+
+        # 3.3 collect the text content child by child, normalizing line breaks
+ content_parts = []
+ for element in content_div.children:
+ if element.name in ['br', 'br/', 'br /', 'hr', 'hr/', 'hr /', 'wbr']:
+ content_parts.append('\n')
+ elif element.name in text_elements:
+ text = element.get_text(strip=True)
+ if text:
+ content_parts.append(text)
+                    # only add a newline after block-level elements
+ if element.name in {'div', 'section', 'p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6'}:
+ content_parts.append('\n')
+ elif isinstance(element, str):
+ text = element.strip()
+ if text:
+ content_parts.append(text)
+
+ return '\n'.join(content_parts).strip()
+
+ soup = BeautifulSoup(cleaned_html, 'html.parser')
+
+    # 1. find the first div block containing an <h1> element and extract the title
+ h1_tag = soup.find('h1')
+ if h1_tag:
+ h1_div = h1_tag.parent
+ title = h1_tag.get_text(strip=True)
+ else:
+        # if no <h1> is found, the article has been deleted or this is a share page
+        soup = BeautifulSoup(raw_html, 'html.parser')
+        # find the data-url in the original_panel_tool span
+        share_source = soup.find('span', id='js_share_source')
+        if share_source and share_source.get('data-url'):
+            data_url = share_source['data-url']
+            # replace http with https
+            data_url = data_url.replace('http://', 'https://', 1)
+            if not data_url or not data_url.startswith('https://mp.weixin.qq.com'):
+                return ScraperResultData(url=url, content=content, links=links, images=images)
+            # get the description text from js_content
+ content_div = soup.find('div', id='js_content')
+ if not content_div:
+ return ScraperResultData(url=url, content=content, links=links, images=images)
+ des = content_div.get_text(strip=True)
+ return ScraperResultData(url=url, content=content, links={data_url: des}, images=images)
+ else:
+ return ScraperResultData(url=url, content=content, links=links, images=images)
+
+    # 2. count how many non-empty div child blocks this block contains
+ sub_divs = [div for div in h1_div.find_all('div', recursive=False) if len(div.contents) > 0]
+ num_sub_divs = len(sub_divs)
+
+ if num_sub_divs == 1:
+        # 2.1 only one child div
+ strong_tag = sub_divs[0].find('strong')
+ if strong_tag:
+ author = strong_tag.get_text(strip=True)
+            # find the span tag containing the date and time
+            date_span = sub_divs[0].find('span', string=re.compile(r'\d{4}年\d{2}月\d{2}日\s+\d{2}:\d{2}'))
+            # if a date is found, keep only the date part
+            if date_span:
+                publish_date = date_span.get_text(strip=True).split()[0] # date part only
+            else:
+                publish_date = None
+            # use the div block immediately following the one containing the <h1> element as the content
+ content_div = h1_div.find_next_sibling('div')
+ content = title + '\n\n' + process_content(content_div)
+ else:
+ author = None
+ publish_date = None
+ content = fetch_result['markdown']
+
+ elif num_sub_divs >= 2:
+        # 2.2 two or more child divs
+ a_tag = sub_divs[0].find('a', href="javascript:void(0);")
+ if a_tag:
+ author = a_tag.get_text(strip=True)
+            # find the following em tag containing the date and time
+            date_em = sub_divs[0].find('em', string=re.compile(r'\d{4}年\d{2}月\d{2}日\s+\d{2}:\d{2}'))
+            if date_em:
+                # keep only the date part
+ publish_date = date_em.get_text(strip=True).split()[0]
+ else:
+ publish_date = None
+ else:
+ author = None
+ publish_date = None
+        # combine the remaining div children as the content
+ content_divs = sub_divs[1:]
+ content = '\n\n'.join([process_content(div) for div in content_divs])
+ content = title + '\n\n' + content
+ else:
+ author = None
+ publish_date = None
+ content = title
+
+ return ScraperResultData(url=url, content=content, links=links, images=images, author=author, publish_date=publish_date, title=title)
diff --git a/core/scrapers/scraper_data.py b/core/scrapers/scraper_data.py
new file mode 100644
index 0000000..0284b24
--- /dev/null
+++ b/core/scrapers/scraper_data.py
@@ -0,0 +1,39 @@
+from dataclasses import dataclass
+from typing import List, Dict, Optional
+from datetime import datetime
+
+@dataclass
+class ScraperResultData:
+    """Dataclass for holding scraped web page data."""
+ url: str
+ content: Optional[str] = None
+ links: Optional[Dict[str, str]] = None
+ images: Optional[List[str]] = None
+ author: Optional[str] = None
+ publish_date: Optional[str] = None
+ title: Optional[str] = None
+ base: Optional[str] = None
+
+ def __post_init__(self):
+        # validate that url exists and is a non-empty string
+        if not isinstance(self.url, str) or not self.url.strip():
+            raise ValueError("url must be a non-empty string")
+
+        # initialize optional fields
+ if self.images is None:
+ self.images = []
+
+ if self.links is None:
+ self.links = {}
+
+        # ensure publish_date is stored as a string
+ if self.publish_date is not None:
+ if isinstance(self.publish_date, datetime):
+ self.publish_date = self.publish_date.isoformat()
+ elif not isinstance(self.publish_date, str):
+ self.publish_date = str(self.publish_date)
+
+        # ensure images is a list (it has already been defaulted to [] above)
+        if not isinstance(self.images, list):
+            raise ValueError("images must be a list")
diff --git a/test/deep_scraper_test.py b/test/deep_scraper_test.py
index 4ffa0ce..8d892b0 100644
--- a/test/deep_scraper_test.py
+++ b/test/deep_scraper_test.py
@@ -7,6 +7,7 @@ project_root = os.path.dirname(current_dir) # 获取父目录
sys.path.append(project_root)
-from core.utils.deep_scraper import deep_scraper, common_chars
+from core.scrapers.deep_scraper import deep_scraper, common_chars
+from core.scrapers.mp_scraper import mp_scraper
def check_url_text(text):
print(f"processing: {text}")
@@ -89,8 +90,21 @@ if __name__ == '__main__':
with open(file, 'r') as f:
html_sample = json.load(f)
_url = html_sample['url']
- raw_markdown = html_sample['markdown']
- used_img = {d['src']: d['alt'] for d in html_sample['media']['images']}
+ if _url.startswith('https://mp.weixin.qq.com'):
+ result = mp_scraper(html_sample)
+ print(f'url: {result.url}')
+ print(f'content: {result.content}')
+ print(f'links: {result.links}')
+ print(f'author: {result.author}')
+ print(f'publish_date: {result.publish_date}')
+ print(f'images: {len(result.images)}')
+ for img in result.images:
+ print(img)
+ raw_markdown = result.content
+ used_img = result.images
+ else:
+ raw_markdown = html_sample['markdown']
+ used_img = {d['src']: d['alt'] for d in html_sample['media']['images']}
except Exception as e:
print('sample format error, try to use craw4ai_fething.py to get sample')
print(f"error: {e}")
diff --git a/test/read_markdown.py b/test/read_markdown.py
new file mode 100644
index 0000000..09821de
--- /dev/null
+++ b/test/read_markdown.py
@@ -0,0 +1,59 @@
+import os
+import json
+import re
+
+def read_markdown_from_json_files(directory_path):
+ # Get all JSON files in the directory
+ json_files = [f for f in os.listdir(directory_path) if f.endswith('.json')]
+ img_pattern = r'!\[(.*?)\]\((.*?)\)'
+ link_pattern = r'\[(.*?)\]\((.*?)\)'
+
+ # Process each JSON file
+ for json_file in sorted(json_files):
+ file_path = os.path.join(directory_path, json_file)
+ print(f"\nProcessing file: {json_file}")
+ print("-" * 50)
+
+ with open(file_path, 'r', encoding='utf-8') as f:
+ data = json.load(f)
+
+ markdown = data.get('markdown')
+ # Find the longest consecutive newlines in the markdown text
+ if markdown:
+ # Find all sequences of newlines and get their lengths
+ max_newlines = max(len(match) for match in re.findall(r'\n+', markdown)) if re.search(r'\n+', markdown) else 0
+ print(f"Longest consecutive newlines: {max_newlines}")
+ if max_newlines < 2:
+ sections = [markdown]
+ else:
+ sections = markdown.split('\n' * max_newlines)
+
+ for i, section in enumerate(sections):
+ print(f"Section {i + 1}:")
+ print(section)
+ print('\n\n')
+ newline_count = section.count('\n')
+            # convert image markup ![alt](src)
+ img_pattern = r'!\[(.*?)\]\((.*?)\)'
+ matches = re.findall(img_pattern, section)
+ for alt, src in matches:
+                # replace with the new format §alt||src§
+ section = section.replace(f'![{alt}]({src})', f'§{alt}||{src}§')
+            # process link markup [text](url)
+            matches = re.findall(link_pattern, section)
+            # strip every matched link from the section text
+ for link_text, link_url in matches:
+ section = section.replace(f'[{link_text}]({link_url})', '')
+
+ if len(section) == 0:
+ print("no text in section")
+ continue
+ print(f"newline/text ratio: {newline_count/len(section)*100}")
+ print(f"links/section ratio: {len(matches)/len(section)*100}")
+ print("-" * 50)
+
+
+if __name__ == "__main__":
+ # Path to the webpage_samples directory
+ samples_dir = os.path.dirname(os.path.abspath(__file__)) + "/webpage_samples"
+ read_markdown_from_json_files(samples_dir)
diff --git a/weixin_mp/__init__.py b/weixin_mp/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/weixin_mp/requirements.txt b/weixin_mp/requirements.txt
new file mode 100644
index 0000000..7a38911
--- /dev/null
+++ b/weixin_mp/requirements.txt
@@ -0,0 +1 @@
+websockets
\ No newline at end of file
diff --git a/weixin_mp/run_weixin.sh b/weixin_mp/run_weixin.sh
new file mode 100644
index 0000000..70495ea
--- /dev/null
+++ b/weixin_mp/run_weixin.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+# export CONFIGS='avatars'
+# export WX_BOT_ENDPOINT='127.0.0.1:8066'
+# export MAIN_SERVICE_ENDPOINT='http://127.0.0.1:7777/'
+# export VERBOSE=True
+
+python weixin.py
\ No newline at end of file
diff --git a/weixin_mp/weixin.py b/weixin_mp/weixin.py
new file mode 100644
index 0000000..a6df3d7
--- /dev/null
+++ b/weixin_mp/weixin.py
@@ -0,0 +1,113 @@
+import websockets
+import json
+import re
+import httpx
+import asyncio
+import os, sys
+
+current_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = os.path.dirname(current_dir) # parent directory
+sys.path.append(project_root)
+
+from core.general_process import main_process, wiseflow_logger
+from typing import Optional
+import logging
+
+logging.getLogger("httpx").setLevel(logging.WARNING)
+
+# IMPORTANT: when logging in by scanning the QR code, do NOT choose "sync history messages", otherwise the bot will start replying to every historical message one by one
+# first check the WeChat login status and get the wxid of the logged-in account
+WX_BOT_ENDPOINT = os.environ.get('WX_BOT_ENDPOINT', '127.0.0.1:8066')
+wx_url = f"http://{WX_BOT_ENDPOINT}/api/"
+try:
+    # send a GET request
+    response = httpx.get(f"{wx_url}checklogin")
+    response.raise_for_status() # raise for non-2xx HTTP status codes
+
+    # parse the JSON response
+    data = response.json()
+
+    # check the status field
+    if data['data']['status'] == 1:
+        # record the wxid
+        self_wxid = data['data']['wxid']
+        wiseflow_logger.info(f"logged-in WeChat account: {self_wxid}")
+    else:
+        # abort
+        wiseflow_logger.error("no login information detected, exiting")
+        raise ValueError("login failed, status is not 1")
+except Exception as e:
+    wiseflow_logger.error(f"cannot connect to the WeChat endpoint: {wx_url}, error:\n{e}")
+    raise ValueError("login failed, cannot connect")
+
+# get the nickname of the logged-in account, used later to check whether a message @-mentions the bot
+response = httpx.get(f"{wx_url}userinfo")
+response.raise_for_status() # raise for non-2xx HTTP status codes
+# parse the JSON response
+data = response.json()
+self_nickname = data['data'].get('nickname', " ")
+wiseflow_logger.info(f"self_nickname: {self_nickname}")
+
+# to monitor only selected official accounts, create a config.json file in the same folder containing the list of accounts to monitor
+# note: use each account's original id (the one starting with gh_), which can be found in earlier logger output
+config_file = 'config.json'
+if not os.path.exists(config_file):
+ config = None
+else:
+ with open(config_file, 'r', encoding='utf-8') as f:
+ config = json.load(f)
+
+
+# the patterns below only apply to parsing public msg; official-account articles shared inside group chats are not covered
+# The XML parsing scheme is not used because there are abnormal characters in the XML code extracted from the weixin public_msg
+item_pattern = re.compile(r'<item>(.*?)</item>', re.DOTALL)
+url_pattern = re.compile(r'<url><!\[CDATA\[(.*?)\]\]></url>')
+
+async def get_public_msg(websocket_uri):
+ reconnect_attempts = 0
+ max_reconnect_attempts = 3
+ while True:
+ try:
+ async with websockets.connect(websocket_uri, max_size=10 * 1024 * 1024) as websocket:
+ while True:
+ response = await websocket.recv()
+ datas = json.loads(response)
+
+ for data in datas["data"]:
+ if "StrTalker" not in data or "Content" not in data:
+ wiseflow_logger.warning(f"invalid data:\n{data}")
+ continue
+ user_id = data["StrTalker"]
+
+ items = item_pattern.findall(data["Content"])
+                    # iterate through all <item> content, extracting <url> and <summary>
+ todo_urls = set()
+ for item in items:
+ url_match = url_pattern.search(item)
+ url = url_match.group(1) if url_match else None
+ if not url:
+ wiseflow_logger.warning(f"can not find url in \n{item}")
+ continue
+ # URL processing, http is replaced by https, and the part after chksm is removed.
+ url = url.replace('http://', 'https://')
+ cut_off_point = url.find('chksm=')
+ if cut_off_point != -1:
+ url = url[:cut_off_point - 1]
+ # summary_match = summary_pattern.search(item)
+ # addition = summary_match.group(1) if summary_match else None
+ todo_urls.add(url)
+ await main_process(todo_urls)
+ except websockets.exceptions.ConnectionClosedError as e:
+ wiseflow_logger.error(f"Connection closed with exception: {e}")
+ reconnect_attempts += 1
+ if reconnect_attempts <= max_reconnect_attempts:
+ wiseflow_logger.info(f"Reconnecting attempt {reconnect_attempts}...")
+ await asyncio.sleep(1)
+ else:
+ wiseflow_logger.error("Max reconnect attempts reached. Exiting.")
+ break
+ except Exception as e:
+ wiseflow_logger.error(f"PublicMsgHandler error: {e}")
+
+uri_public = f"ws://{WX_BOT_ENDPOINT}/ws/publicMsg"
+asyncio.run(get_public_msg(uri_public))