wiseflow/core/general_process.py

# -*- coding: utf-8 -*-
from utils.pb_api import PbTalker
from utils.general_utils import get_logger
from agents.get_info import GeneralInfoExtractor
from bs4 import BeautifulSoup
import os
import json
import asyncio
from scrapers import *
from urllib.parse import urlparse
from crawlee.playwright_crawler import PlaywrightCrawler, PlaywrightCrawlingContext, PlaywrightPreNavigationContext
from datetime import datetime, timedelta

project_dir = os.environ.get("PROJECT_DIR", "")
if project_dir:
    os.makedirs(project_dir, exist_ok=True)

os.environ['CRAWLEE_STORAGE_DIR'] = os.path.join(project_dir, 'crawlee_storage')
screenshot_dir = os.path.join(project_dir, 'crawlee_storage', 'screenshots')
wiseflow_logger = get_logger('general_process', project_dir)
pb = PbTalker(wiseflow_logger)
gie = GeneralInfoExtractor(pb, wiseflow_logger)
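
# URLs already saved to the 'infos' collection within the last 30 days count as seen,
# so reruns of the crawler will not re-queue or re-save them.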
one_month_ago = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
existing_urls = {url['url'] for url in pb.read(collection_name='infos', fields=['url'], filter=f"created>='{one_month_ago}'")}


async def save_to_pb(url: str, url_title: str, infos: list):
    # save each extracted info to the 'infos' collection; fall back to a local cache file if the write fails
    for info in infos:
        info['url'] = url
        info['url_title'] = url_title
        _ = pb.add(collection_name='infos', body=info)
        if not _:
            wiseflow_logger.error('add info failed, writing to cache_file')
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            with open(os.path.join(project_dir, f'{timestamp}_cache_infos.json'), 'w', encoding='utf-8') as f:
                json.dump(info, f, ensure_ascii=False, indent=4)


crawler = PlaywrightCrawler(
    # Limit the crawl to max requests. Remove or increase it for crawling all links.
    # max_requests_per_crawl=1,
    max_request_retries=1,
    request_handler_timeout=timedelta(minutes=5),
)


@crawler.pre_navigation_hook
async def log_navigation_url(context: PlaywrightPreNavigationContext) -> None:
    context.log.info(f'Navigating to {context.request.url} ...')


@crawler.router.default_handler
async def request_handler(context: PlaywrightCrawlingContext) -> None:
    await context.page.wait_for_load_state('networkidle')
    await context.page.wait_for_timeout(2000)

    # Handle dialogs (alerts, confirms, prompts)
    async def handle_dialog(dialog):
        context.log.info(f'Closing dialog: {dialog.message}')
        await dialog.accept()

    context.page.on('dialog', handle_dialog)

    context.log.info('successfully finished fetching')
    wiseflow_logger.info(context.request.url)

    html = await context.page.inner_html('head')
    soup = BeautifulSoup(html, 'html.parser')
    web_title = soup.find('title')
    if web_title:
        web_title = web_title.get_text().strip()
    else:
        web_title = ''

    parsed_url = urlparse(context.request.url)
    domain = parsed_url.netloc

    base_tag = soup.find('base', href=True)
    if base_tag and base_tag.get('href'):
        base_url = base_tag['href']
    else:
        # no <base> tag: fall back to the current page's URL path as the base url
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
        if not base_url.endswith('/'):
            # if the path does not end with '/', drop the last path segment
            base_url = base_url.rsplit('/', 1)[0] + '/'
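            # e.g. (hypothetical) a page at https://example.com/news/list.html yields base_url https://example.com/news/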

    html = await context.page.inner_html('body')
    if domain in custom_scrapers:
        action_dict, link_dict, text = custom_scrapers[domain](html, base_url)
    else:
        action_dict, link_dict, text = general_scraper(html, base_url)
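    # Note: `custom_scrapers` and `general_scraper` are brought in by `from scrapers import *`.
    # Assumed contract (not defined in this file): `custom_scrapers` maps a domain string to a
    # callable with the same (html, base_url) -> (action_dict, link_dict, text) signature as
    # `general_scraper`, so a site-specific scraper could hypothetically be registered as
    #     custom_scrapers['example.com'] = example_scraper
    # and picked up by the dispatch above.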

    is_list, results = await gie(link_dict, text, base_url)

    if is_list and results:
        # list page: queue extracted links that have not been seen yet, then stop here
        new_urls = [url for url in results if url != base_url and
                    url != context.request.url and
                    url not in existing_urls]
        if new_urls:
            await context.add_requests(new_urls)
            existing_urls.update(new_urls)
        return

    if results:
        # content page: persist the extracted infos
        await save_to_pb(context.request.url, web_title, results)

    # todo: use llm to determine next action
    # disabled screenshot snippet below; re-enabling it would also require `import hashlib`
    """
    screenshot_file_name = f"{hashlib.sha256(context.request.url.encode()).hexdigest()}.png"
    await context.page.screenshot(path=os.path.join(screenshot_dir, screenshot_file_name), full_page=True)
    wiseflow_logger.debug(f'screenshot saved to {screenshot_file_name}')
    """


if __name__ == '__main__':
    sites = pb.read('sites', filter='activated=True')
    wiseflow_logger.info('execute all sites one time')

    async def run_all_sites():
        await crawler.run([site['url'].rstrip('/') for site in sites])

    asyncio.run(run_all_sites())