diff --git a/core/scrapers/README.md b/core/scrapers/README.md index ac5dd32..090aa76 100644 --- a/core/scrapers/README.md +++ b/core/scrapers/README.md @@ -2,9 +2,7 @@ 如果信源需要对应特殊的抓取配置,可以在 `core/scrapers/__init__.py` 中编辑对应的 crawler_config,并在 `custom_fetching_configs` 中注册。 -## 自定义解析器 - -### 解析器 +## 解析器(Scraper) 对于从网页内容中提取关注信息这一任务而言,直接把 html 编码送给 llm 并不是一个好主意。在该类型任务中,我们期待 llm 表现的类似人类,侧重点在于内容的理解,而不是 html 的解析。且不说直接送入 html 编码还会造成额外(非常大量)的 token 消耗和处理效率的降低。 diff --git a/core/scrapers/__init__.py b/core/scrapers/__init__.py index 307c463..d0a2e99 100644 --- a/core/scrapers/__init__.py +++ b/core/scrapers/__init__.py @@ -1,9 +1,15 @@ from crawl4ai import CrawlerRunConfig -# from .xxx import xx_config +from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator custom_scrapers = {} custom_fetching_configs = {} +md_generator = DefaultMarkdownGenerator( + options={ + "skip_internal_links": True, + "escape_html": True + } + ) crawler_config = CrawlerRunConfig(delay_before_return_html=2.0, markdown_generator=md_generator, wait_until='commit', magic=True, scan_full_page=True) diff --git a/core/utils/action_dict_scraper.py b/core/scrapers/action_dict_scraper.py similarity index 100% rename from core/utils/action_dict_scraper.py rename to core/scrapers/action_dict_scraper.py diff --git a/core/scrapers/deep_scraper.py b/core/scrapers/deep_scraper.py index e7c636c..388a4c5 100644 --- a/core/scrapers/deep_scraper.py +++ b/core/scrapers/deep_scraper.py @@ -26,232 +26,158 @@ common_tlds = [ common_chars = ',.!;:,;:、一二三四五六七八九十#*@% \t\n\r|*-_…>#' def normalize_url(url: str, base_url: str) -> str: - url = url.strip().lower() - if url.startswith(("javascript:", "mailto:", "javacript:", "tel:", "sms:", "data:", "file:", "ftp:", "about:", "chrome:", "blob:", "ws:", "wss:", "view-source:")): - return '' - """ - if "<" in url and url.endswith(">"): - # 暂时应对 crawl4ai 的特殊情况 - part1, part2 = url.split("<") - if part2.startswith("http"): - url = part2[:-1] - else: - parsed_base = urlparse(part1) - url = f"{parsed_base.scheme}://{parsed_base.netloc}/{part2[:-1]}" - """ - if url.startswith("www."): + url = url.strip() + if url.startswith(('www.', 'WWW.')): _url = f"https://{url}" + elif url.startswith('/www.'): + _url = f"https:/{url}" elif url.startswith("//"): _url = f"https:{url}" - elif url.startswith(('http:/', 'https:/')): + elif url.startswith(('http://', 'https://')): _url = url + elif url.startswith('http:/'): + _url = f"http://{url[6:]}" + elif url.startswith('https:/'): + _url = f"https://{url[7:]}" else: _url = urljoin(base_url, url) - # 处理url中path部分的多余斜杠 - parsed = urlparse(_url) - path = parsed.path - # 将连续的多个/替换为单个/ - normalized_path = re.sub(r'/+', '/', path) - # 重新组装url - _url = f"{parsed.scheme}://{parsed.netloc}{normalized_path}" - if parsed.query: - _url = f"{_url}?{parsed.query}" - if parsed.fragment: - _url = f"{_url}#{parsed.fragment}" - return _url + + _ss = _url.split('//') + if len(_ss) == 2: + return '//'.join(_ss) + else: + return _ss[0] + '//' + '/'.join(_ss[1:]) -def deep_scraper(raw_markdown: str, base_url: str, used_img: list[str]) -> tuple[dict, tuple[str, dict]]: + +def deep_scraper(raw_markdown: str, base_url: str, used_img: list[str]) -> tuple[dict, list[str], dict]: link_dict = {} + to_be_recognized_by_visual_llm = {} def check_url_text(text): - text = text.strip() - left_bracket = text.find('[') - right_paren = text.rfind(')') - - if -1 in [left_bracket, right_paren] or left_bracket > right_paren: - return text - - # 检查左括号前的文本是否包含至少2个有效字符 - prefix = text[:left_bracket] - 
pre_valid_chars = [c for c in prefix if not c.isdigit() and c not in common_chars] - if len(pre_valid_chars) >= 50: - return text - - suffix = text[right_paren+1:] - suf_valid_chars = [c for c in suffix if c not in common_chars] - if len(pre_valid_chars) >= 2 and len(suf_valid_chars) >= 1: - return text - - if len(suf_valid_chars) >= 36: - return text + # text = text.strip() + # for special url formate from crawl4ai 0.4.247 + text = re.sub(r'', '', text).strip() # 处理图片标记 ![alt](src) - img_pattern = r'!\[(.*?)\]\((.*?)\)' + img_pattern = r'(!\[(.*?)\]\((.*?)\))' matches = re.findall(img_pattern, text) - - for alt, src in matches: + for _sec,alt, src in matches: # 替换为新格式 §alt||src§ - text = text.replace(f'![{alt}]({src})', f'§{alt}||{src}§') + text = text.replace(_sec, f'§{alt}||{src}§', 1) # 找到所有[part0](part1)格式的片段 - link_pattern = r'\[(.*?)\]\((.*?)\)' + link_pattern = r'(\[(.*?)\]\((.*?)\))' matches = re.findall(link_pattern, text) - # 从text中去掉所有matches部分 - for link_text, link_url in matches: - text = text.replace(f'[{link_text}]({link_url})', '') - - img_marker_pattern = r'§(.*?)\|\|(.*?)§' - img_marker_matches = re.findall(img_marker_pattern, text) - alt_img_alt = "" - alt_img_src = "" - if img_marker_matches: - alt_img_alt = img_marker_matches[0][0] - alt_img_src = img_marker_matches[0][1] - for alt, src in img_marker_matches: - text = text.replace(f'§{alt}||{src}§', '') - - text = text.strip() - - for link_text, link_url in matches: + for _sec, link_text, link_url in matches: + print("found link sec:", _sec) # 处理 \"***\" 格式的片段 quote_pattern = r'\"(.*?)\"' # 提取所有引号包裹的内容 - link_alt = ''.join(re.findall(quote_pattern, link_url)) - if link_alt not in link_text: - link_text = f"{link_text} {link_alt}" - # 去掉所有引号包裹的内容 - _url = re.sub(quote_pattern, '', link_url).strip() - if not _url or _url.startswith('#'): - continue - url = normalize_url(_url, base_url) - if not url: - continue - # 检查链接是否是常见文件类型或顶级域名 - has_common_ext = any(url.endswith(ext) for ext in common_file_exts) - has_common_tld = any(url.endswith(tld) or url.endswith(tld + '/') for tld in common_tlds) - if has_common_ext or has_common_tld: - continue + _title = ''.join(re.findall(quote_pattern, link_url)) # 分离§§内的内容和后面的内容 - link_text = link_text.strip() + img_marker_pattern = r'§(.*?)\|\|(.*?)§' inner_matches = re.findall(img_marker_pattern, link_text) for alt, src in inner_matches: link_text = link_text.replace(f'§{alt}||{src}§', '') link_text = link_text.strip() - - if text not in link_text: - link_text = f"{link_text} {text}" - - # 去除首尾的common_chars和数字 - link_text = link_text.strip() - if link_text: - if url not in link_dict: - link_dict[url] = link_text - else: - if link_dict[url].startswith("§to_be_recognized_by_visual_llm_"): - link_dict[url] = link_text - else: - link_dict[url] = f"{link_dict[url]} {link_text}" - - if url in link_dict: - continue + if _title not in link_text: + link_text = f"{_title} - {link_text}" - img_alt = "" - img_src = "" - if inner_matches: + link_text = link_text.strip() + if not link_text and inner_matches: img_alt = inner_matches[0][0].strip() img_src = inner_matches[0][1].strip() + if img_src and not img_src.startswith('#'): + img_src = normalize_url(img_src, base_url) + if not img_src: + link_text = img_alt + elif len(img_alt) > 2: + _key = f"[img{len(link_dict)+1}]" + link_dict[_key] = img_src + link_text = img_alt + _key + elif any(img_src.endswith(tld) or img_src.endswith(tld + '/') for tld in common_tlds): + _key = f"[img{len(link_dict)+1}]" + link_dict[_key] = img_src + link_text = img_alt 
+ _key + elif any(img_src.endswith(ext) for ext in common_file_exts if ext not in ['jpg', 'jpeg', 'png']): + _key = f"[img{len(link_dict)+1}]" + link_dict[_key] = img_src + link_text = img_alt + _key + else: + if img_src not in to_be_recognized_by_visual_llm: + to_be_recognized_by_visual_llm[img_src] = f"§{len(to_be_recognized_by_visual_llm)+1}§" + _key = f"[img{len(link_dict)+1}]" + link_dict[_key] = img_src + link_text = to_be_recognized_by_visual_llm[img_src] + _key + else: + link_text = img_alt - if not img_src and alt_img_src: - img_src = alt_img_src - img_alt = alt_img_alt + real_url_pattern = r'<(.*?)>' + real_url = re.search(real_url_pattern, link_url) + if real_url: + _url = real_url.group(1).strip() + else: + _url = re.sub(quote_pattern, '', link_url).strip() - if not img_src or img_src.startswith('#'): + if not _url or _url.startswith(('#', 'javascript:')): + text = text.replace(_sec, link_text, 1) continue + url = normalize_url(_url, base_url) + _key = f"[{len(link_dict)+1}]" + link_dict[_key] = url + text = text.replace(_sec, link_text + _key, 1) - img_src = normalize_url(img_src, base_url) + # 检查链接是否是常见文件类型或顶级域名 + # todo: get_more_url 时再处理 + """ + has_common_ext = any(url.endswith(ext) for ext in common_file_exts) + has_common_tld = any(url.endswith(tld) or url.endswith(tld + '/') for tld in common_tlds) + if has_common_ext or has_common_tld: + continue + """ + # 处理文本中的其他图片标记 + img_pattern = r'(§(.*?)\|\|(.*?)§)' + matches = re.findall(img_pattern, text) + for _sec, alt, src in matches: + if not src or src.startswith('#') or src not in used_img: + text = text.replace(_sec, alt, 1) + continue + img_src = normalize_url(src, base_url) if not img_src: - continue - if any(img_src.endswith(tld) or img_src.endswith(tld + '/') for tld in common_tlds): - continue - if any(img_src.endswith(ext) for ext in common_file_exts if ext not in ['jpg', 'jpeg', 'png']): - continue - link_dict[url] = f"{img_alt}§to_be_recognized_by_visual_llm_{img_src}§" - - return '' + text = text.replace(_sec, alt, 1) + elif len(alt) > 2: + _key = f"[img{len(link_dict)+1}]" + link_dict[_key] = img_src + text = text.replace(_sec, alt + _key, 1) + elif any(img_src.endswith(tld) or img_src.endswith(tld + '/') for tld in common_tlds): + _key = f"[img{len(link_dict)+1}]" + link_dict[_key] = img_src + text = text.replace(_sec, alt + _key, 1) + elif any(img_src.endswith(ext) for ext in common_file_exts if ext not in ['jpg', 'jpeg', 'png']): + _key = f"[img{len(link_dict)+1}]" + link_dict[_key] = img_src + text = text.replace(_sec, alt + _key, 1) + else: + if img_src not in to_be_recognized_by_visual_llm: + to_be_recognized_by_visual_llm[img_src] = f"§{len(to_be_recognized_by_visual_llm)+1}§" + _key = f"[img{len(link_dict)+1}]" + link_dict[_key] = img_src + text = text.replace(_sec, to_be_recognized_by_visual_llm[img_src] + _key, 1) - texts = raw_markdown.split('\n\n') - texts = [check_url_text(text) for text in texts] + # 处理文本中的"野 url" + url_pattern = r'((?:https?://|www\.)[-A-Za-z0-9+&@#/%?=~_|!:,.;]*[-A-Za-z0-9+&@#/%=~_|])' + matches = re.findall(url_pattern, text) + for url in matches: + url = normalize_url(url, base_url) + _key = f"[{len(link_dict)+1}]" + link_dict[_key] = url + text = text.replace(url, _key, 1) + + return text + + sections = raw_markdown.split('# ') # use '# ' to avoid # in url + texts = [check_url_text(text) for text in sections] texts = [text for text in texts if text.strip()] - html_text = '\n\n'.join(texts) - # 处理图片标记 ![alt](src) - img_pattern = r'!\[(.*?)\]\((.*?)\)' - matches = 
re.findall(img_pattern, html_text) - text_link_map = {} - for alt, src in matches: - if src not in used_img: - html_text = html_text.replace(f'![{alt}]({src})', '') - continue - - src = src.strip().lower() - if not src or src.startswith('#'): - html_text = html_text.replace(f'![{alt}]({src})', alt) - continue - - key = f"Ref_{len(text_link_map)+1}" - text_link_map[key] = src - src = normalize_url(src, base_url) - - if not src: - html_text = html_text.replace(f'![{alt}]({src})', f"{alt}[{key}]") - continue - - if any(src.endswith(tld) or src.endswith(tld + '/') for tld in common_tlds): - html_text = html_text.replace(f'![{alt}]({src})', f"{alt}[{key}]") - continue - if any(src.endswith(ext) for ext in common_file_exts if ext not in ['jpg', 'jpeg', 'png']): - html_text = html_text.replace(f'![{alt}]({src})', f"{alt}[{key}]") - continue - html_text = html_text.replace(f'![{alt}]({src})', f" {alt}[{key}]§to_be_recognized_by_visual_llm_{src[1:]}§") # to avoid conflict with the url pattern - - # 接下来要处理所有的[]()文本了 - link_pattern = r'\[(.*?)\]\((.*?)\)' - matches = re.findall(link_pattern, html_text) - for match in matches: - link_text, link_url = match - original_markdown = f'[{link_text}]({link_url})' # 重建原始的 markdown 链接格式 - # 处理 \"***\" 格式的片段 - quote_pattern = r'\"(.*?)\"' - # 提取所有引号包裹的内容 - link_alt = ''.join(re.findall(quote_pattern, link_url)) - if link_alt not in link_text: - link_text = f"{link_text} {link_alt}" - # 去掉所有引号包裹的内容 - _url = re.sub(quote_pattern, '', link_url).strip() - if not _url or _url.startswith('#'): - continue - url = normalize_url(_url, base_url) - if not url: - continue - key = f"Ref_{len(text_link_map)+1}" - text_link_map[key] = url - - html_text = html_text.replace(original_markdown, f'{link_text}[{key}]') - - # 处理文本中的"野 url" - url_pattern = r'((?:https?://|www\.)[-A-Za-z0-9+&@#/%?=~_|!:,.;]*[-A-Za-z0-9+&@#/%=~_|])' - matches = re.findall(url_pattern, html_text) - for url in matches: - url = normalize_url(url, base_url) - if not url: - continue - key = f"Ref_{len(text_link_map)+1}" - text_link_map[key] = url - html_text = html_text.replace(url, f'[{key}]') - - # 去掉文本中所有残存的[]和![] - html_text = html_text.replace('![]', '') # 去掉![] - html_text = html_text.replace('[]', '') # 去掉[] - - return link_dict, (html_text, text_link_map) + return link_dict, texts, to_be_recognized_by_visual_llm \ No newline at end of file diff --git a/core/scrapers/mp_scraper.py b/core/scrapers/mp_scraper.py index 6d696d0..af6ba88 100644 --- a/core/scrapers/mp_scraper.py +++ b/core/scrapers/mp_scraper.py @@ -48,6 +48,9 @@ def mp_scraper(fetch_result: dict) -> ScraperResultData: def process_content(content_div): # 3.1 处理所有 元素 for img in content_div.find_all('img', attrs={'data-src': True}, recursive=True): + data_type = img.get('data-type') + if data_type in ['gif', 'svg']: + continue src = img.get('data-src') if not src or src.startswith('#') or src.startswith('about:blank'): src = None @@ -178,7 +181,7 @@ def mp_scraper(fetch_result: dict) -> ScraperResultData: if text: content_parts.append(text) - return '\n'.join(content_parts).strip() + return ' '.join(content_parts).strip() soup = BeautifulSoup(cleaned_html, 'html.parser') @@ -248,7 +251,7 @@ def mp_scraper(fetch_result: dict) -> ScraperResultData: publish_date = None # 剩下的 div 子块合起来作为 content content_divs = sub_divs[1:] - content = '\n\n'.join([process_content(div) for div in content_divs]) + content = '# '.join([process_content(div) for div in content_divs]) content = title + '\n\n' + content else: author = None diff --git 
a/test/deep_scraper_test.py b/test/deep_scraper_test.py index 8d892b0..8a4435e 100644 --- a/test/deep_scraper_test.py +++ b/test/deep_scraper_test.py @@ -6,8 +6,8 @@ current_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.dirname(current_dir) # 获取父目录 sys.path.append(project_root) -from core.utils.deep_scraper import deep_scraper, common_chars -from core.custom_scrapers.mp_scraper import mp_scraper +from core.scrapers.deep_scraper import deep_scraper, common_chars +from core.scrapers.mp_scraper import mp_scraper def check_url_text(text): print(f"processing: {text}") @@ -85,21 +85,21 @@ if __name__ == '__main__': for file in files: if not file.endswith('.json'): continue - print(f"processing {file} ...") + #print(f"processing {file} ...") try: with open(file, 'r') as f: html_sample = json.load(f) _url = html_sample['url'] if _url.startswith('https://mp.weixin.qq.com'): result = mp_scraper(html_sample) - print(f'url: {result.url}') - print(f'content: {result.content}') - print(f'links: {result.links}') - print(f'author: {result.author}') - print(f'publish_date: {result.publish_date}') - print(f'images: {len(result.images)}') - for img in result.images: - print(img) + #print(f'url: {result.url}') + #print(f'content: {result.content}') + #print(f'links: {result.links}') + #print(f'author: {result.author}') + #print(f'publish_date: {result.publish_date}') + #print(f'images: {len(result.images)}') + #for img in result.images: + # print(img) raw_markdown = result.content used_img = result.images else: @@ -117,18 +117,18 @@ if __name__ == '__main__': base_url = base_url.rsplit('/', 1)[0] + '/' time_start = time.time() - from_html_link_dict, (from_html_text, from_html_text_link_map) = deep_scraper(raw_markdown, base_url, used_img) + link_dict, texts, to_be_recognized_by_visual_llm = deep_scraper(raw_markdown, base_url, used_img) time_end = time.time() - print(f"time cost for html: {time_end - time_start}s") + #print(f"time cost for html: {time_end - time_start}s") result = { - "link_dict": from_html_link_dict, - "text": from_html_text, - "text_link_map": from_html_text_link_map, + "link_dict": link_dict, + "texts": texts, + "to_be_recognized_by_visual_llm": to_be_recognized_by_visual_llm, } record_folder = file.replace('.json', '') os.makedirs(record_folder, exist_ok=True) with open(os.path.join(record_folder, 'sample.json'), 'w', encoding='utf-8') as f: json.dump(result, f, indent=4, ensure_ascii=False) - print("done") - print("*" * 12) + #print("done") + #print("*" * 12) diff --git a/test/get_info_test.py b/test/get_info_test.py index 8bd1988..f82d073 100644 --- a/test/get_info_test.py +++ b/test/get_info_test.py @@ -4,7 +4,7 @@ import json import asyncio import time from prompts import * - +# prompt 要加上今天是………… current_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.dirname(current_dir) # get parent dir sys.path.append(project_root) diff --git a/test/read_markdown.py b/test/read_markdown.py index 09821de..9825bda 100644 --- a/test/read_markdown.py +++ b/test/read_markdown.py @@ -1,6 +1,13 @@ import os import json import re +import sys + +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.dirname(current_dir) # 获取父目录 +sys.path.append(project_root) + +from core.scrapers.mp_scraper import mp_scraper def read_markdown_from_json_files(directory_path): # Get all JSON files in the directory @@ -16,42 +23,30 @@ def read_markdown_from_json_files(directory_path): with open(file_path, 'r', encoding='utf-8') as f: data = 
json.load(f) - - markdown = data.get('markdown') + + url = data.get('url') + if url.startswith('https://mp.weixin.qq.com'): + result = mp_scraper(data) + markdown = result.content + else: + markdown = data.get('markdown') + # Find the longest consecutive newlines in the markdown text if markdown: - # Find all sequences of newlines and get their lengths - max_newlines = max(len(match) for match in re.findall(r'\n+', markdown)) if re.search(r'\n+', markdown) else 0 - print(f"Longest consecutive newlines: {max_newlines}") - if max_newlines < 2: - sections = [markdown] - else: - sections = markdown.split('\n' * max_newlines) - - for i, section in enumerate(sections): - print(f"Section {i + 1}:") - print(section) - print('\n\n') - newline_count = section.count('\n') - # 处理图片标记 ![alt](src) - img_pattern = r'!\[(.*?)\]\((.*?)\)' - matches = re.findall(img_pattern, section) - for alt, src in matches: - # 替换为新格式 §alt||src§ - section = section.replace(f'![{alt}]({src})', f'§{alt}||{src}§') - # 处理链接标记 [text](url) - matches = re.findall(link_pattern, section) - # 从text中去掉所有matches部分 - for link_text, link_url in matches: - section = section.replace(f'[{link_text}]({link_url})', '') - - if len(section) == 0: - print("no text in section") - continue - print(f"newline/text ratio: {newline_count/len(section)*100}") - print(f"links/section ratio: {len(matches)/len(section)*100}") - print("-" * 50) + # 处理图片标记 ![alt](src) + matches = re.findall(img_pattern, markdown) + for alt, src in matches: + # 替换为新格式 §alt||img_12§ + markdown = markdown.replace(f'![{alt}]({src})', f'') + matches = re.findall(link_pattern, markdown) + for link_text, link_url in matches: + markdown = markdown.replace(f'[{link_text}]({link_url})', '[url]') + markdown = [m.strip() for m in markdown.split('# ') if m.strip()] + markdown = '\n----------------------------------\n'.join(markdown) + record_file = open(f'{json_file}.txt', 'w', encoding='utf-8') + record_file.write(markdown) + record_file.close() if __name__ == "__main__": # Path to the webpage_samples directory diff --git a/weixin_mp/__init__.py b/weixin_mp/__init__.py index e69de29..a6df3d7 100644 --- a/weixin_mp/__init__.py +++ b/weixin_mp/__init__.py @@ -0,0 +1,113 @@ +import websockets +import json +import re +import httpx +import asyncio +import os, sys + +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.dirname(current_dir) # 获取父目录 +sys.path.append(project_root) + +from core.general_process import main_process, wiseflow_logger +from typing import Optional +import logging + +logging.getLogger("httpx").setLevel(logging.WARNING) + +# 千万注意扫码登录时不要选择“同步历史消息”,否则会造成 bot 上来挨个回复历史消息 +# 先检查下 wx 的登录状态,同时获取已登录微信的 wxid +WX_BOT_ENDPOINT = os.environ.get('WX_BOT_ENDPOINT', '127.0.0.1:8066') +wx_url = f"http://{WX_BOT_ENDPOINT}/api/" +try: + # 发送GET请求 + response = httpx.get(f"{wx_url}checklogin") + response.raise_for_status() # 检查HTTP响应状态码是否为200 + + # 解析JSON响应 + data = response.json() + + # 检查status字段 + if data['data']['status'] == 1: + # 记录wxid + self_wxid = data['data']['wxid'] + wiseflow_logger.info(f"已登录微信账号: {self_wxid}") + else: + # 抛出异常 + wiseflow_logger.error("未检测到任何登录信息,将退出") + raise ValueError("登录失败,status不为1") +except Exception as e: + wiseflow_logger.error(f"无法链接微信端点:{wx_url}, 错误:\n{e}") + raise ValueError("登录失败,无法连接") + +# 获取登录微信昵称,用于后面判断是否@自己的消息 +response = httpx.get(f"{wx_url}userinfo") +response.raise_for_status() # 检查HTTP响应状态码是否为200 +# 解析JSON响应 +data = response.json() +self_nickname = data['data'].get('nickname', " ") 
+wiseflow_logger.info(f"self_nickname: {self_nickname}") + +# 如果要选定只监控部分公众号,请在同一文件夹内创建 config.json 文件,内容为要监控的公众号列表 +# 注意这里要写公众号的原始id,即 gh_ 开头的id, 可以通过历史 logger 获取 +config_file = 'config.json' +if not os.path.exists(config_file): + config = None +else: + with open(config_file, 'r', encoding='utf-8') as f: + config = json.load(f) + + +#如下 pattern 仅适用于public msg的解析,群内分享的公号文章不在此列 +# The XML parsing scheme is not used because there are abnormal characters in the XML code extracted from the weixin public_msg +item_pattern = re.compile(r'(.*?)', re.DOTALL) +url_pattern = re.compile(r'') + +async def get_public_msg(websocket_uri): + reconnect_attempts = 0 + max_reconnect_attempts = 3 + while True: + try: + async with websockets.connect(websocket_uri, max_size=10 * 1024 * 1024) as websocket: + while True: + response = await websocket.recv() + datas = json.loads(response) + + for data in datas["data"]: + if "StrTalker" not in data or "Content" not in data: + wiseflow_logger.warning(f"invalid data:\n{data}") + continue + user_id = data["StrTalker"] + + items = item_pattern.findall(data["Content"]) + # Iterate through all < item > content, extracting < url > and < summary > + todo_urls = set() + for item in items: + url_match = url_pattern.search(item) + url = url_match.group(1) if url_match else None + if not url: + wiseflow_logger.warning(f"can not find url in \n{item}") + continue + # URL processing, http is replaced by https, and the part after chksm is removed. + url = url.replace('http://', 'https://') + cut_off_point = url.find('chksm=') + if cut_off_point != -1: + url = url[:cut_off_point - 1] + # summary_match = summary_pattern.search(item) + # addition = summary_match.group(1) if summary_match else None + todo_urls.add(url) + await main_process(todo_urls) + except websockets.exceptions.ConnectionClosedError as e: + wiseflow_logger.error(f"Connection closed with exception: {e}") + reconnect_attempts += 1 + if reconnect_attempts <= max_reconnect_attempts: + wiseflow_logger.info(f"Reconnecting attempt {reconnect_attempts}...") + await asyncio.sleep(1) + else: + wiseflow_logger.error("Max reconnect attempts reached. Exiting.") + break + except Exception as e: + wiseflow_logger.error(f"PublicMsgHandler error: {e}") + +uri_public = f"ws://{WX_BOT_ENDPOINT}/ws/publicMsg" +asyncio.run(get_public_msg(uri_public)) diff --git a/weixin_mp/run_weixin.sh b/weixin_mp/run_weixin.sh index 70495ea..dcfb2a0 100644 --- a/weixin_mp/run_weixin.sh +++ b/weixin_mp/run_weixin.sh @@ -1,8 +1,18 @@ #!/bin/bash -# export CONFIGS='avatars' -# export WX_BOT_ENDPOINT='127.0.0.1:8066' -# export MAIN_SERVICE_ENDPOINT='http://127.0.0.1:7777/' -# export VERBOSE=True +set -o allexport +source ../core/.env +set +o allexport -python weixin.py \ No newline at end of file +if ! pgrep -x "pocketbase" > /dev/null; then + if ! netstat -tuln | grep ":8090" > /dev/null && ! lsof -i :8090 > /dev/null; then + echo "Starting PocketBase..." + ../pb/pocketbase serve --http=127.0.0.1:8090 & + else + echo "Port 8090 is already in use." + fi +else + echo "PocketBase is already running." 
+fi + +python __init__.py \ No newline at end of file diff --git a/weixin_mp/weixin.py b/weixin_mp/weixin.py deleted file mode 100644 index a6df3d7..0000000 --- a/weixin_mp/weixin.py +++ /dev/null @@ -1,113 +0,0 @@ -import websockets -import json -import re -import httpx -import asyncio -import os, sys - -current_dir = os.path.dirname(os.path.abspath(__file__)) -project_root = os.path.dirname(current_dir) # 获取父目录 -sys.path.append(project_root) - -from core.general_process import main_process, wiseflow_logger -from typing import Optional -import logging - -logging.getLogger("httpx").setLevel(logging.WARNING) - -# 千万注意扫码登录时不要选择“同步历史消息”,否则会造成 bot 上来挨个回复历史消息 -# 先检查下 wx 的登录状态,同时获取已登录微信的 wxid -WX_BOT_ENDPOINT = os.environ.get('WX_BOT_ENDPOINT', '127.0.0.1:8066') -wx_url = f"http://{WX_BOT_ENDPOINT}/api/" -try: - # 发送GET请求 - response = httpx.get(f"{wx_url}checklogin") - response.raise_for_status() # 检查HTTP响应状态码是否为200 - - # 解析JSON响应 - data = response.json() - - # 检查status字段 - if data['data']['status'] == 1: - # 记录wxid - self_wxid = data['data']['wxid'] - wiseflow_logger.info(f"已登录微信账号: {self_wxid}") - else: - # 抛出异常 - wiseflow_logger.error("未检测到任何登录信息,将退出") - raise ValueError("登录失败,status不为1") -except Exception as e: - wiseflow_logger.error(f"无法链接微信端点:{wx_url}, 错误:\n{e}") - raise ValueError("登录失败,无法连接") - -# 获取登录微信昵称,用于后面判断是否@自己的消息 -response = httpx.get(f"{wx_url}userinfo") -response.raise_for_status() # 检查HTTP响应状态码是否为200 -# 解析JSON响应 -data = response.json() -self_nickname = data['data'].get('nickname', " ") -wiseflow_logger.info(f"self_nickname: {self_nickname}") - -# 如果要选定只监控部分公众号,请在同一文件夹内创建 config.json 文件,内容为要监控的公众号列表 -# 注意这里要写公众号的原始id,即 gh_ 开头的id, 可以通过历史 logger 获取 -config_file = 'config.json' -if not os.path.exists(config_file): - config = None -else: - with open(config_file, 'r', encoding='utf-8') as f: - config = json.load(f) - - -#如下 pattern 仅适用于public msg的解析,群内分享的公号文章不在此列 -# The XML parsing scheme is not used because there are abnormal characters in the XML code extracted from the weixin public_msg -item_pattern = re.compile(r'(.*?)', re.DOTALL) -url_pattern = re.compile(r'') - -async def get_public_msg(websocket_uri): - reconnect_attempts = 0 - max_reconnect_attempts = 3 - while True: - try: - async with websockets.connect(websocket_uri, max_size=10 * 1024 * 1024) as websocket: - while True: - response = await websocket.recv() - datas = json.loads(response) - - for data in datas["data"]: - if "StrTalker" not in data or "Content" not in data: - wiseflow_logger.warning(f"invalid data:\n{data}") - continue - user_id = data["StrTalker"] - - items = item_pattern.findall(data["Content"]) - # Iterate through all < item > content, extracting < url > and < summary > - todo_urls = set() - for item in items: - url_match = url_pattern.search(item) - url = url_match.group(1) if url_match else None - if not url: - wiseflow_logger.warning(f"can not find url in \n{item}") - continue - # URL processing, http is replaced by https, and the part after chksm is removed. 
- url = url.replace('http://', 'https://') - cut_off_point = url.find('chksm=') - if cut_off_point != -1: - url = url[:cut_off_point - 1] - # summary_match = summary_pattern.search(item) - # addition = summary_match.group(1) if summary_match else None - todo_urls.add(url) - await main_process(todo_urls) - except websockets.exceptions.ConnectionClosedError as e: - wiseflow_logger.error(f"Connection closed with exception: {e}") - reconnect_attempts += 1 - if reconnect_attempts <= max_reconnect_attempts: - wiseflow_logger.info(f"Reconnecting attempt {reconnect_attempts}...") - await asyncio.sleep(1) - else: - wiseflow_logger.error("Max reconnect attempts reached. Exiting.") - break - except Exception as e: - wiseflow_logger.error(f"PublicMsgHandler error: {e}") - -uri_public = f"ws://{WX_BOT_ENDPOINT}/ws/publicMsg" -asyncio.run(get_public_msg(uri_public))
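
---

Note on the `core/scrapers/__init__.py` hunk: the README says source-specific crawl settings are edited there and registered in `custom_fetching_configs`. Below is a minimal sketch of what such a registration might look like. Only the `CrawlerRunConfig` / `DefaultMarkdownGenerator` usage mirrors the patch; the example domain, the slower delay value, and the domain-keyed dict layout are assumptions for illustration.

```python
# Hypothetical per-source registration sketch (not part of the patch).
from crawl4ai import CrawlerRunConfig
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator

md_generator = DefaultMarkdownGenerator(
    options={
        "skip_internal_links": True,
        "escape_html": True,
    }
)

# A slower profile for a hypothetical JS-heavy source; values are illustrative.
example_config = CrawlerRunConfig(
    delay_before_return_html=5.0,
    markdown_generator=md_generator,
    wait_until='commit',
    magic=True,
    scan_full_page=True,
)

custom_fetching_configs = {}
custom_fetching_configs['example.com'] = example_config  # keyed by source domain (assumed convention)
```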
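Note on the rewritten `normalize_url`: it now repairs scheme typos (`http:/`, `/www.`), prefixes protocol-relative and `www.` URLs, resolves relative paths against `base_url`, and collapses duplicate slashes via the final `split('//')` pass. The expectations below are traced from the patched function; the sample URLs themselves are made up.

```python
# Illustrative expectations for the patched normalize_url
# (assumes the project root is on sys.path).
from core.scrapers.deep_scraper import normalize_url

base = "https://site.com/list/"

assert normalize_url("www.example.com/a", base) == "https://www.example.com/a"
assert normalize_url("/www.example.com", base) == "https://www.example.com"
assert normalize_url("//cdn.example.com/pic.png", base) == "https://cdn.example.com/pic.png"
assert normalize_url("http:/example.com/page", base) == "http://example.com/page"      # single-slash scheme repaired
assert normalize_url("news/item.html", base) == "https://site.com/list/news/item.html"  # relative path resolved
assert normalize_url("https://site.com/a//b", base) == "https://site.com/a/b"           # duplicate slash collapsed
```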
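Note on the new `deep_scraper` contract: it now returns `(link_dict, texts, to_be_recognized_by_visual_llm)`. Links are replaced inline with `[n]` keys and images with `[imgN]` keys, both resolvable through `link_dict`; images whose alt text is too short to be useful get a `§N§` placeholder and are queued in `to_be_recognized_by_visual_llm`. `texts` is a list of sections because the markdown is split on `'# '`, which is also why `mp_scraper` now joins content blocks with `'# '`. The consumer sketch below is speculative downstream usage, not code from the patch; the sample markdown and variable names are illustrative.

```python
# Hedged consumer sketch for the three-part deep_scraper result.
from core.scrapers.deep_scraper import deep_scraper

raw_markdown = "Latest [report](/reports/2024.html) and ![logo](/logo.png)"
base_url = "https://site.com/"
used_img = ["/logo.png"]

link_dict, texts, to_be_recognized = deep_scraper(raw_markdown, base_url, used_img)

# link_dict maps reference keys back to absolute URLs,
# e.g. {"[1]": "https://site.com/reports/2024.html", ...} (shape only, not asserted).
for section in texts:
    for key, url in link_dict.items():
        if key in section:
            print(f"{key} -> {url}")

# to_be_recognized maps image URLs to the §N§ placeholder embedded in the text,
# so a visual LLM's caption can later be substituted back in.
for img_url, placeholder in to_be_recognized.items():
    print(f"send {img_url} to the visual model, then replace {placeholder}")
```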
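Note on the WeChat URL clean-up in `get_public_msg` (unchanged logic, moved from `weixin.py` to `weixin_mp/__init__.py`): article links are upgraded to https and the `chksm` token is trimmed together with the separator character before it. The helper name and sample link below are illustrative; the logic mirrors the patch.

```python
# Minimal sketch of the URL clean-up performed inside get_public_msg.
def clean_mp_url(url: str) -> str:
    url = url.replace('http://', 'https://')
    cut_off_point = url.find('chksm=')
    if cut_off_point != -1:
        # drop the chksm token together with the '&'/'?' right before it
        url = url[:cut_off_point - 1]
    return url

sample = "http://mp.weixin.qq.com/s?__biz=MzA4&mid=2651&idx=1&sn=abc&chksm=8f3e#rd"
assert clean_mp_url(sample) == "https://mp.weixin.qq.com/s?__biz=MzA4&mid=2651&idx=1&sn=abc"
```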