diff --git a/core/custom_process/README.md b/core/custom_process/README.md new file mode 100644 index 0000000..c5952ca --- /dev/null +++ b/core/custom_process/README.md @@ -0,0 +1,30 @@ +wiseflow 致力于通过一套通用流程(使用视觉大模型驱动的可以自主使用爬虫工具的智能体)处理所有页面。 + +不过我们也为客户保留自定义处理的灵活性。 + +为添加自定义处理逻辑单元请遵照如下规范: + +1、逻辑处理单元应该是一个函数(而不是类); + +2、入参只接受两个:url(要处理的 url,请只提供一个 url,而不是列表,因为主函数会处理队列逻辑) 和 logger 对象(这意味着不要为你的自定义处理单元添加日志对象); + +3、出参需要返回解析后的文章详情(dict),和信息列表(list),以及解析出来的需要添加到工作队列的 url 结合(元组)。出参必须同时返回这三个结果,没有的话,可以分别传出 {} [] set() + +article 的字典必须包括 'url'(str), 'title'(str), 'author'(str), 'publish_date'(date 日期对象,注意是日期)四个键值,额外可以添加一个 'sceenshort' 值是一个 png 文件的路径。 + +4、在 core/custom_crawlers/__init__.py 中注册,参考: + +```pyhton +from .mp_crawler import mp_crawler + +customer_crawler_map = {'mp.weixin.qq.com': mp_crawler} +``` + +注意键使用域名,可以使用 urllib.parse获取: + +```pyhton +from urllib.parse import urlparse + +parsed_url = urlparse("site's url") +domain = parsed_url.netloc +``` \ No newline at end of file diff --git a/core/custom_process/__init__.py b/core/custom_process/__init__.py new file mode 100644 index 0000000..48289f3 --- /dev/null +++ b/core/custom_process/__init__.py @@ -0,0 +1,4 @@ +from .mp_process import mp_crawler + +customer_crawler_map = {} +# customer_crawler_map = {'mp.weixin.qq.com': mp_crawler} diff --git a/core/custom_process/mp_process.py b/core/custom_process/mp_process.py new file mode 100644 index 0000000..e31085b --- /dev/null +++ b/core/custom_process/mp_process.py @@ -0,0 +1,109 @@ +# -*- coding: utf-8 -*- +# warining: the mp_crawler will be deprecated in future version, we try to use general_process handle mp articles + +from core.agents import get_info +import httpx +from bs4 import BeautifulSoup +from datetime import datetime, date +import re +import asyncio + + +header = { + 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/604.1 Edg/112.0.100.0'} + + +async def mp_crawler(url: str, logger) -> tuple[dict, list, set]: + if not url.startswith('https://mp.weixin.qq.com') and not url.startswith('http://mp.weixin.qq.com'): + logger.warning(f'{url} is not a mp url, you should not use this function') + return {}, [], set() + + url = url.replace("http://", "https://", 1) + + async with httpx.AsyncClient() as client: + for retry in range(2): + try: + response = await client.get(url, headers=header, timeout=30) + response.raise_for_status() + break + except Exception as e: + if retry < 1: + logger.info(f"{e}\nwaiting 1min") + await asyncio.sleep(60) + else: + logger.warning(e) + return {}, [], set() + + soup = BeautifulSoup(response.text, 'html.parser') + + if url.startswith('https://mp.weixin.qq.com/mp/appmsgalbum'): + # 文章目录 + urls = {li.attrs['data-link'].replace("http://", "https://", 1) for li in soup.find_all('li', class_='album__list-item')} + simple_urls = set() + for url in urls: + cut_off_point = url.find('chksm=') + if cut_off_point != -1: + url = url[:cut_off_point - 1] + simple_urls.add(url) + return {}, [], simple_urls + + # Get the original release date first + pattern = r"var createTime = '(\d{4}-\d{2}-\d{2}) \d{2}:\d{2}'" + match = re.search(pattern, response.text) + + if match: + date_only = match.group(1) + publish_time = datetime.strptime(date_only, "%Y-%m-%d") + else: + publish_time = date.today() + + # Get description content from < meta > tag + try: + meta_description = soup.find('meta', attrs={'name': 'description'}) + summary = meta_description['content'].strip() if meta_description else '' + # card_info = soup.find('div', id='img-content') + # Parse the required content from the < div > tag + rich_media_title = soup.find('h1', id='activity-name').text.strip() \ + if soup.find('h1', id='activity-name') \ + else soup.find('h1', class_='rich_media_title').text.strip() + profile_nickname = soup.find('div', class_='wx_follow_nickname').text.strip() + except Exception as e: + logger.warning(f"not mp format: {url}\n{e}") + # For mp.weixin.qq.com types, mp_crawler won't work, and most likely neither will the other two + return {}, [], set() + + if not rich_media_title or not profile_nickname: + logger.warning(f"failed to analysis {url}, no title or profile_nickname") + return {}, [], set() + + # Parse text and image links within the content interval + # Todo This scheme is compatible with picture sharing MP articles, but the pictures of the content cannot be obtained, + # because the structure of this part is completely different, and a separate analysis scheme needs to be written + # (but the proportion of this type of article is not high). + texts = [] + content_area = soup.find('div', id='js_content') + if content_area: + # 提取文本 + for section in content_area.find_all(['section', 'p'], recursive=False): # 遍历顶级section + text = section.get_text(separator=' ', strip=True) + if text and text not in texts: + texts.append(text) + cleaned_texts = [t for t in texts if t.strip()] + content = '\n'.join(cleaned_texts) + else: + logger.warning(f"failed to analysis contents {url}") + return {}, [], set() + if content: + content = f"[from {profile_nickname}]{content}" + else: + # If the content does not have it, but the summary has it, it means that it is an mp of the picture sharing type. + # At this time, you can use the summary as the content. + content = f"[from {profile_nickname}]{summary}" + + infos = get_info(content, logger) + article = {'url': url, + 'title': rich_media_title, + 'author': profile_nickname, + 'publish_date': publish_time} + + return article, infos, set() diff --git a/core/general_process.py b/core/general_process.py new file mode 100644 index 0000000..c1b7a54 --- /dev/null +++ b/core/general_process.py @@ -0,0 +1,155 @@ +# -*- coding: utf-8 -*- +from utils.pb_api import PbTalker +from utils.general_utils import get_logger +from agents.get_info import GeneralInfoExtractor +from bs4 import BeautifulSoup +import os +import json +import asyncio +from custom_process import customer_crawler_map +from urllib.parse import urlparse, urljoin +import hashlib +from crawlee.playwright_crawler import PlaywrightCrawler, PlaywrightCrawlingContext + + +project_dir = os.environ.get("PROJECT_DIR", "") +if project_dir: + os.makedirs(project_dir, exist_ok=True) +os.environ['CRAWLEE_STORAGE_DIR'] = os.path.join(project_dir, 'crawlee_storage') +screenshot_dir = os.path.join(project_dir, 'crawlee_storage', 'screenshots') + +wiseflow_logger = get_logger('general_process', f'{project_dir}/general_process.log') +pb = PbTalker(wiseflow_logger) +ie = GeneralInfoExtractor(pb, wiseflow_logger) + +# Global variables +working_list = set() +existing_urls = {url['url'] for url in pb.read(collection_name='articles', fields=['url']) if url['url']} +lock = asyncio.Lock() + +async def save_to_pb(article: dict, infos: list): + # saving to pb process + screenshot = article.pop('screenshot') if 'screenshot' in article else None + article_id = pb.add(collection_name='articles', body=article) + if not article_id: + wiseflow_logger.error('add article failed, writing to cache_file') + with open(os.path.join(project_dir, 'cache_articles.json'), 'a', encoding='utf-8') as f: + json.dump(article, f, ensure_ascii=False, indent=4) + return + if screenshot: + file = open(screenshot, 'rb') + file_name = os.path.basename(screenshot) + message = pb.upload('articles', article_id, 'screenshot', file_name, file) + file.close() + if not message: + wiseflow_logger.warning(f'{article_id} upload screenshot failed, file location: {screenshot}') + + for info in infos: + info['articles'] = [article_id] + _ = pb.add(collection_name='agents', body=info) + if not _: + wiseflow_logger.error('add insight failed, writing to cache_file') + with open(os.path.join(project_dir, 'cache_insights.json'), 'a', encoding='utf-8') as f: + json.dump(info, f, ensure_ascii=False, indent=4) + + +async def pipeline(url: str): + global working_list, existing_urls + working_list.add(url) + crawler = PlaywrightCrawler( + # Limit the crawl to max requests. Remove or increase it for crawling all links. + max_requests_per_crawl=100, + ) + # Define the default request handler, which will be called for every request. + @crawler.router.default_handler + async def request_handler(context: PlaywrightCrawlingContext) -> None: + context.log.info(f'Processing {context.request.url} ...') + # Handle dialogs (alerts, confirms, prompts) + async def handle_dialog(dialog): + context.log.info(f'Closing dialog: {dialog.message}') + await dialog.accept() + + context.page.on('dialog', handle_dialog) + + # Extract data from the page. + # future work: try to use a visual-llm do all the job... + text = await context.page.inner_text('body') + wiseflow_logger.debug(f"got text: {text}") + + html = await context.page.inner_html('body') + soup = BeautifulSoup(html, 'html.parser') + links = soup.find_all('a', href=True) + base_url = context.request.url + link_dict = {} + for a in links: + new_url = a.get('href') + text = a.text.strip() + if new_url and text: + absolute_url = urljoin(base_url, new_url) + link_dict[text] = absolute_url + wiseflow_logger.debug(f'found {len(link_dict)} more links') + + screenshot_file_name = f"{hashlib.sha256(context.request.url.encode()).hexdigest()}.png" + await context.page.screenshot(path=os.path.join(screenshot_dir, screenshot_file_name), full_page=True) + wiseflow_logger.debug(f'screenshot saved to {screenshot_file_name}') + + # get infos by llm + infos, author, publish_date, related_urls = await ie(text, link_dict, base_url, wiseflow_logger) + if infos: + # get author and publish date by llm + wiseflow_logger.debug(f'LLM result -- author: {author}, publish_date: {publish_date}') + article = { + 'url': context.request.url, + 'title': await context.page.title(), + 'author': author, + 'publish_date': publish_date, + 'screenshot': os.path.join(screenshot_dir, screenshot_file_name), + 'tags': [info['name'] for info in infos] + } + await save_to_pb(article, infos) + + # find any related urls + related_urls = await get_more_related_urls(html, wiseflow_logger) + wiseflow_logger.debug(f'got {len(related_urls)} more urls') + if related_urls: + async with lock: + new_urls = related_urls - existing_urls + working_list.update(new_urls) + + # todo: use llm to determine next action + + while working_list: + async with lock: + if not working_list: + break + url = working_list.pop() + existing_urls.add(url) + parsed_url = urlparse(url) + domain = parsed_url.netloc + if domain in customer_crawler_map: + wiseflow_logger.debug(f'routed to customer process for {domain}') + try: + article, infos, related_urls = await customer_crawler_map[domain](url, wiseflow_logger) + except Exception as e: + wiseflow_logger.error(f'error occurred in crawling {url}: {e}') + continue + + if infos and article: + wiseflow_logger.debug("receiving new infos from customer crawler, saving to pb") + article['tags'] = [info['name'] for info in infos] + await save_to_pb(article, infos) + if related_urls: + wiseflow_logger.debug('receiving new related_urls from customer crawler, adding to working_list') + new_urls = related_urls - existing_urls + working_list.update(new_urls) + continue + try: + await crawler.run([url]) + except Exception as e: + wiseflow_logger.error(f'error occurred in crawling {url}: {e}') + + +if __name__ == '__main__': + import asyncio + + asyncio.run(pipeline()) diff --git a/core/pb/pb_migrations/1733204939_created_articles.js b/core/pb/pb_migrations/1733204939_created_articles.js new file mode 100644 index 0000000..a2816a1 --- /dev/null +++ b/core/pb/pb_migrations/1733204939_created_articles.js @@ -0,0 +1,121 @@ +/// +migrate((app) => { + const collection = new Collection({ + "createRule": null, + "deleteRule": null, + "fields": [ + { + "autogeneratePattern": "[a-z0-9]{15}", + "hidden": false, + "id": "text3208210256", + "max": 15, + "min": 15, + "name": "id", + "pattern": "^[a-z0-9]+$", + "presentable": false, + "primaryKey": true, + "required": true, + "system": true, + "type": "text" + }, + { + "exceptDomains": null, + "hidden": false, + "id": "url4101391790", + "name": "url", + "onlyDomains": null, + "presentable": false, + "required": false, + "system": false, + "type": "url" + }, + { + "autogeneratePattern": "", + "hidden": false, + "id": "text724990059", + "max": 0, + "min": 0, + "name": "title", + "pattern": "", + "presentable": false, + "primaryKey": false, + "required": false, + "system": false, + "type": "text" + }, + { + "autogeneratePattern": "", + "hidden": false, + "id": "text3182418120", + "max": 0, + "min": 0, + "name": "author", + "pattern": "", + "presentable": false, + "primaryKey": false, + "required": false, + "system": false, + "type": "text" + }, + { + "hidden": false, + "id": "date2025149370", + "max": "", + "min": "", + "name": "publish_date", + "presentable": false, + "required": false, + "system": false, + "type": "date" + }, + { + "hidden": false, + "id": "file1486429761", + "maxSelect": 1, + "maxSize": 0, + "mimeTypes": [], + "name": "screenshot", + "presentable": false, + "protected": false, + "required": false, + "system": false, + "thumbs": [], + "type": "file" + }, + { + "hidden": false, + "id": "autodate2990389176", + "name": "created", + "onCreate": true, + "onUpdate": false, + "presentable": false, + "system": false, + "type": "autodate" + }, + { + "hidden": false, + "id": "autodate3332085495", + "name": "updated", + "onCreate": true, + "onUpdate": true, + "presentable": false, + "system": false, + "type": "autodate" + } + ], + "id": "pbc_4287850865", + "indexes": [], + "listRule": null, + "name": "articles", + "system": false, + "type": "base", + "updateRule": null, + "viewRule": null + }); + + return app.save(collection); +}, (app) => { + const collection = app.findCollectionByNameOrId("pbc_4287850865"); + + return app.delete(collection); +}) diff --git a/core/pb/pb_migrations/1733234529_created_focus_points.js b/core/pb/pb_migrations/1733234529_created_focus_points.js new file mode 100644 index 0000000..eb6b8c2 --- /dev/null +++ b/core/pb/pb_migrations/1733234529_created_focus_points.js @@ -0,0 +1,94 @@ +/// +migrate((app) => { + const collection = new Collection({ + "createRule": null, + "deleteRule": null, + "fields": [ + { + "autogeneratePattern": "[a-z0-9]{15}", + "hidden": false, + "id": "text3208210256", + "max": 15, + "min": 15, + "name": "id", + "pattern": "^[a-z0-9]+$", + "presentable": false, + "primaryKey": true, + "required": true, + "system": true, + "type": "text" + }, + { + "autogeneratePattern": "", + "hidden": false, + "id": "text2695655862", + "max": 0, + "min": 0, + "name": "focuspoint", + "pattern": "", + "presentable": false, + "primaryKey": false, + "required": false, + "system": false, + "type": "text" + }, + { + "autogeneratePattern": "", + "hidden": false, + "id": "text2284106510", + "max": 0, + "min": 0, + "name": "explanation", + "pattern": "", + "presentable": false, + "primaryKey": false, + "required": false, + "system": false, + "type": "text" + }, + { + "hidden": false, + "id": "bool806155165", + "name": "activated", + "presentable": false, + "required": false, + "system": false, + "type": "bool" + }, + { + "hidden": false, + "id": "autodate2990389176", + "name": "created", + "onCreate": true, + "onUpdate": false, + "presentable": false, + "system": false, + "type": "autodate" + }, + { + "hidden": false, + "id": "autodate3332085495", + "name": "updated", + "onCreate": true, + "onUpdate": true, + "presentable": false, + "system": false, + "type": "autodate" + } + ], + "id": "pbc_3385864241", + "indexes": [], + "listRule": null, + "name": "focus_points", + "system": false, + "type": "base", + "updateRule": null, + "viewRule": null + }); + + return app.save(collection); +}, (app) => { + const collection = app.findCollectionByNameOrId("pbc_3385864241"); + + return app.delete(collection); +}) diff --git a/core/pb/pb_migrations/1733234644_updated_focus_points.js b/core/pb/pb_migrations/1733234644_updated_focus_points.js new file mode 100644 index 0000000..1fce9b3 --- /dev/null +++ b/core/pb/pb_migrations/1733234644_updated_focus_points.js @@ -0,0 +1,42 @@ +/// +migrate((app) => { + const collection = app.findCollectionByNameOrId("pbc_3385864241") + + // update field + collection.fields.addAt(1, new Field({ + "autogeneratePattern": "", + "hidden": false, + "id": "text2695655862", + "max": 0, + "min": 0, + "name": "focuspoint", + "pattern": "", + "presentable": false, + "primaryKey": false, + "required": true, + "system": false, + "type": "text" + })) + + return app.save(collection) +}, (app) => { + const collection = app.findCollectionByNameOrId("pbc_3385864241") + + // update field + collection.fields.addAt(1, new Field({ + "autogeneratePattern": "", + "hidden": false, + "id": "text2695655862", + "max": 0, + "min": 0, + "name": "focuspoint", + "pattern": "", + "presentable": false, + "primaryKey": false, + "required": false, + "system": false, + "type": "text" + })) + + return app.save(collection) +}) diff --git a/core/run_all.sh b/core/run_all.sh new file mode 100755 index 0000000..a90f212 --- /dev/null +++ b/core/run_all.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +# 从 .env 文件中加载环境变量 +if [ -f .env ]; then + export $(grep -v '^#' .env | xargs) +fi + +# 启动 PocketBase +pb/pocketbase serve --http=0.0.0.0:8090 & +pocketbase_pid=$! + +# 启动 Python 任务 +python tasks.py & +python_pid=$! + +# 启动 Uvicorn +# uvicorn backend:app --reload --host 0.0.0.0 --port 8077 & +# uvicorn_pid=$! + +# 定义信号处理函数 +trap 'kill $pocketbase_pid $python_pid' SIGINT SIGTERM + +# 等待所有进程结束 +wait \ No newline at end of file diff --git a/dashboard/backend.py b/dashboard/backend.py new file mode 100644 index 0000000..8eb3105 --- /dev/null +++ b/dashboard/backend.py @@ -0,0 +1,48 @@ +from fastapi import FastAPI, BackgroundTasks +from pydantic import BaseModel +from typing import Literal, Optional +from fastapi.middleware.cors import CORSMiddleware + + +# backend的操作也应该是针对 pb 操作的,即添加信源、兴趣点等都应该存入 pb,而不是另起一个进程实例 +# 当然也可以放弃 pb,但那是另一个问题,数据和设置的管理应该是一套 +# 简单说用户侧(api dashboard等)和 core侧 不应该直接对接,都应该通过底层的data infrastructure 进行 + +class Request(BaseModel): + """ + Input model + input = {'user_id': str, 'type': str, 'content':str, 'addition': Optional[str]} + Type is one of "text", "publicMsg", "site" and "url"; + """ + user_id: str + type: Literal["text", "publicMsg", "file", "image", "video", "location", "chathistory", "site", "attachment", "url"] + content: str + addition: Optional[str] = None + + +app = FastAPI( + title="WiseFlow Union Backend", + description="From Wiseflow Team.", + version="0.3.1", + openapi_url="/openapi.json" +) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + + +@app.get("/") +def read_root(): + msg = "Hello, this is Wise Union Backend, version 0.3.1" + return {"msg": msg} + + +@app.post("/feed") +async def call_to_feed(background_tasks: BackgroundTasks, request: Request): + background_tasks.add_task(message_manager, _input=request.model_dump()) + return {"msg": "received well"} diff --git a/general_crawler_test.py b/general_crawler_test.py new file mode 100644 index 0000000..b818596 --- /dev/null +++ b/general_crawler_test.py @@ -0,0 +1,13 @@ +from core.custom_process import crawler as general_crawler +from pprint import pprint +import asyncio + +test_list = ['http://society.people.com.cn/n1/2024/1202/c1008-40373268.html'] + +async def crawler_test(): + for test in test_list: + data = await general_crawler.run([test]) + print(type(data)) + +if __name__ == '__main__': + asyncio.run(crawler_test())