diff --git a/client/backend/__init__.py b/client/backend/__init__.py
index d582270..0f5f128 100644
--- a/client/backend/__init__.py
+++ b/client/backend/__init__.py
@@ -3,25 +3,9 @@ import time
 import json
 import uuid
 from pb_api import PbTalker
-from get_report import get_report
+from get_report import get_report, logger, pb
 from get_search import search_insight
 from tranlsation_volcengine import text_translate
-from general_utils import get_logger_level
-from loguru import logger
-
-project_dir = os.environ.get("PROJECT_DIR", "")
-os.makedirs(project_dir, exist_ok=True)
-logger_file = os.path.join(project_dir, 'backend_service.log')
-dsw_log = get_logger_level()
-
-logger.add(
-    logger_file,
-    level=dsw_log,
-    backtrace=True,
-    diagnose=True,
-    rotation="50 MB"
-)
-pb = PbTalker(logger)
 
 
 class BackendService:
@@ -72,7 +56,7 @@ class BackendService:
         memory = ''
 
         docx_file = os.path.join(self.cache_url, f'{insight_id}_{uuid.uuid4()}.docx')
-        flag, memory = get_report(content, article_list, memory, topics, comment, docx_file, logger=logger)
+        flag, memory = get_report(content, article_list, memory, topics, comment, docx_file)
         self.memory[insight_id] = memory
 
         if flag:
@@ -80,7 +64,7 @@ class BackendService:
             message = pb.upload('insights', insight_id, 'docx', f'{insight_id}.docx', file)
             file.close()
             if message:
-                logger.debug(f'report success finish and update to pb-{message}')
+                logger.debug(f'report success finish and update to: {message}')
                 return self.build_out(11, message)
             else:
                 logger.error(f'{insight_id} report generate successfully, however failed to update to pb.')
@@ -204,7 +188,7 @@ class BackendService:
 
         message = pb.update(collection_name='insights', id=insight_id, body={'articles': article_ids})
         if message:
-            logger.debug(f'insight search success finish and update to pb-{message}')
+            logger.debug(f'insight search success finish and update to: {message}')
             return self.build_out(11, insight_id)
         else:
             logger.error(f'{insight_id} search success, however failed to update to pb.')
diff --git a/client/backend/background_task.py b/client/backend/background_task.py
index 832328f..b944fdb 100644
--- a/client/backend/background_task.py
+++ b/client/backend/background_task.py
@@ -3,27 +3,11 @@
 """
 import schedule
 import time
-import os
+from get_insight import pb, logger
 from work_process import ServiceProcesser
-from general_utils import get_logger_level
-from loguru import logger
-from pb_api import PbTalker
-project_dir = os.environ.get("PROJECT_DIR", "")
-os.makedirs(project_dir, exist_ok=True)
-logger_file = os.path.join(project_dir, 'scanning_task.log')
-dsw_log = get_logger_level()
-logger.add(
-    logger_file,
-    level=dsw_log,
-    backtrace=True,
-    diagnose=True,
-    rotation="50 MB"
-)
-pb = PbTalker(logger)
-
-sp = ServiceProcesser(pb=pb, logger=logger)
+sp = ServiceProcesser()
 counter = 0
@@ -37,8 +21,8 @@ def task():
             continue
         if counter % site['per_hours'] == 0:
             urls.append(site['url'])
-    print(f'\033[0;32m task execute loop {counter}\033[0m')
-    print(urls)
+    logger.info(f'\033[0;32m task execute loop {counter}\033[0m')
+    logger.info(urls)
     if urls:
         sp(sites=urls)
     else:
diff --git a/client/backend/get_insight.py b/client/backend/get_insight.py
index 9a7f372..61e7136 100644
--- a/client/backend/get_insight.py
+++ b/client/backend/get_insight.py
@@ -8,7 +8,24 @@ from general_utils import isChinesePunctuation, is_chinese
 from tranlsation_volcengine import text_translate
 import time
 import re
-from pb_api import pb
+import os
+from general_utils import get_logger_level
+from loguru import logger
+from pb_api import PbTalker
+
+project_dir = os.environ.get("PROJECT_DIR", "")
+os.makedirs(project_dir, exist_ok=True)
+logger_file = os.path.join(project_dir, 'scanning_task.log')
+dsw_log = get_logger_level()
+
+logger.add(
+    logger_file,
+    level=dsw_log,
+    backtrace=True,
+    diagnose=True,
+    rotation="50 MB"
+)
+pb = PbTalker(logger)
 
 max_tokens = 4000
@@ -69,7 +86,7 @@ _rewrite_insight_prompt = f'''你是一名{character},你将被给到一个新闻列表,
 不管新闻列表是何种语言,请仅用中文输出分析结果。'''
 
 
-def _parse_insight(article_text: str, cache: dict, logger=None) -> (bool, dict):
+def _parse_insight(article_text: str, cache: dict) -> (bool, dict):
     input_length = len(cache)
     result = dashscope_llm([{'role': 'system', 'content': _first_stage_prompt}, {'role': 'user', 'content': article_text}],
                            'qwen1.5-72b-chat', logger=logger)
@@ -116,7 +133,7 @@ def _parse_insight(article_text: str, cache: dict, logger=None) -> (bool, dict):
     return False, cache
 
 
-def _rewrite_insight(context: str, logger=None) -> (bool, str):
+def _rewrite_insight(context: str) -> (bool, str):
     result = dashscope_llm([{'role': 'system', 'content': _rewrite_insight_prompt}, {'role': 'user', 'content': context}],
                            'qwen1.5-72b-chat', logger=logger)
     if result:
@@ -142,7 +159,7 @@ def _rewrite_insight(context: str, logger=None) -> (bool, str):
     return False, text
 
 
-def get_insight(articles: dict, titles: dict, logger=None) -> list:
+def get_insight(articles: dict, titles: dict) -> list:
     context = ''
     cache = {}
     for value in articles.values():
@@ -162,7 +179,7 @@ def get_insight(articles: dict, titles: dict, logger=None) -> list:
         if len(context) < max_tokens:
             continue
-        flag, cache = _parse_insight(context, cache, logger)
+        flag, cache = _parse_insight(context, cache)
         if flag:
             logger.warning(f'following articles may not be completely analyzed: \n{context}')
@@ -170,7 +187,7 @@ def get_insight(articles: dict, titles: dict, logger=None) -> list:
         # 据说频繁调用会引发性能下降,每次调用后休息1s。现在轮替调用qwen-72b和max,所以不必了。
         time.sleep(1)
     if context:
-        flag, cache = _parse_insight(context, cache, logger)
+        flag, cache = _parse_insight(context, cache)
         if flag:
             logger.warning(f'following articles may not be completely analyzed: \n{context}')
@@ -230,7 +247,7 @@ def get_insight(articles: dict, titles: dict, logger=None) -> list:
         if not context:
             continue
-        flag, new_insight = _rewrite_insight(context, logger)
+        flag, new_insight = _rewrite_insight(context)
         if flag:
             logger.warning(f'insight {key} may contain wrong')
             cache[key] = value
diff --git a/client/backend/get_report.py b/client/backend/get_report.py
index 3b3e1b3..d3e6e17 100644
--- a/client/backend/get_report.py
+++ b/client/backend/get_report.py
@@ -1,5 +1,6 @@
 import random
 import re
+import os
 from llms.dashscope_wrapper import dashscope_llm
 from docx import Document
 from docx.oxml.ns import qn
@@ -7,7 +8,23 @@ from docx.shared import Pt, RGBColor
 from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
 from datetime import datetime
 from general_utils import isChinesePunctuation
-from pb_api import pb
+from general_utils import get_logger_level
+from loguru import logger
+from pb_api import PbTalker
+
+project_dir = os.environ.get("PROJECT_DIR", "")
+os.makedirs(project_dir, exist_ok=True)
+logger_file = os.path.join(project_dir, 'backend_service.log')
+dsw_log = get_logger_level()
+
+logger.add(
+    logger_file,
+    level=dsw_log,
+    backtrace=True,
+    diagnose=True,
+    rotation="50 MB"
+)
+pb = PbTalker(logger)
 
 # qwen-72b-chat支持最大30k输入,考虑prompt其他部分,content不应超过30000字符长度
 # 如果换qwen-max(最大输入6k),这里就要换成6000,但这样很多文章不能分析了
@@ -34,7 +51,7 @@ if not report_type:
     _ = pb.update(collection_name='roleplays', id=_role_config_id, body={'report_type': report_type})
 
 
-def get_report(insigt: str, articles: list[dict], memory: str, topics: list[str], comment: str, docx_file: str, logger=None) -> (bool, str):
+def get_report(insigt: str, articles: list[dict], memory: str, topics: list[str], comment: str, docx_file: str) -> (bool, str):
     zh_index = ['一', '二', '三', '四', '五', '六', '七', '八', '九', '十', '十一', '十二']
 
     if isChinesePunctuation(insigt[-1]):
diff --git a/client/backend/work_process.py b/client/backend/work_process.py
index 18bf939..6b8f0eb 100644
--- a/client/backend/work_process.py
+++ b/client/backend/work_process.py
@@ -5,7 +5,7 @@ from datetime import datetime, timedelta, date
 from scrapers import scraper_map
 from scrapers.general_scraper import general_scraper
 from urllib.parse import urlparse
-from get_insight import get_insight
+from get_insight import get_insight, pb, logger
 from general_utils import is_chinese
 from tranlsation_volcengine import text_translate
 import concurrent.futures
@@ -18,13 +18,11 @@ expiration_str = expiration_date.strftime("%Y%m%d")
 
 
 class ServiceProcesser:
-    def __init__(self, pb, logger, record_snapshot: bool = False):
+    def __init__(self, record_snapshot: bool = False):
         self.project_dir = os.environ.get("PROJECT_DIR", "")
         # 1. base initialization
         self.cache_url = os.path.join(self.project_dir, 'scanning_task')
         os.makedirs(self.cache_url, exist_ok=True)
-        self.pb = pb
-        self.logger = logger
         # 2. load the llm
         # self.llm = LocalLlmWrapper()  # if you use the local-llm
@@ -36,16 +34,16 @@ class ServiceProcesser:
         else:
             self.snap_short_server = None
 
-        self.logger.info('scanning task init success.')
+        logger.info('scanning task init success.')
 
     def __call__(self, expiration: date = expiration_date, sites: list[str] = None):
         # 先清空一下cache
-        self.logger.info(f'wake, prepare to work, now is {datetime.now()}')
+        logger.info(f'wake, prepare to work, now is {datetime.now()}')
         cache = {}
-        self.logger.debug(f'clear cache -- {cache}')
+        logger.debug(f'clear cache -- {cache}')
         # 从pb数据库中读取所有文章url
         # 这里publish_time用int格式,综合考虑下这个是最容易操作的模式,虽然糙了点
-        existing_articles = self.pb.read(collection_name='articles', fields=['id', 'title', 'url'], filter=f'publish_time>{expiration_str}')
+        existing_articles = pb.read(collection_name='articles', fields=['id', 'title', 'url'], filter=f'publish_time>{expiration_str}')
         all_title = {}
         existings = []
         for article in existing_articles:
@@ -61,13 +59,13 @@ class ServiceProcesser:
                 if site in scraper_map:
                     futures.append(executor.submit(scraper_map[site], expiration, existings))
                 else:
-                    futures.append(executor.submit(general_scraper, site, expiration, existings, self.logger))
+                    futures.append(executor.submit(general_scraper, site, expiration, existings, logger))
             concurrent.futures.wait(futures)
             for future in futures:
                 try:
                     new_articles.extend(future.result())
                 except Exception as e:
-                    self.logger.error(f'error when scraping-- {e}')
+                    logger.error(f'error when scraping-- {e}')
 
         for value in new_articles:
             if not value:
@@ -80,81 +78,81 @@ class ServiceProcesser:
             value['content'] = f"({from_site} 报道){value['content']}"
             value['images'] = json.dumps(value['images'])
 
-            article_id = self.pb.add(collection_name='articles', body=value)
+            article_id = pb.add(collection_name='articles', body=value)
             if article_id:
                 cache[article_id] = value
                 all_title[value['title']] = article_id
             else:
-                self.logger.warning(f'add article {value} failed, writing to cache_file')
+                logger.warning(f'add article {value} failed, writing to cache_file')
                 with open(os.path.join(self.cache_url, 'cache_articles.json'), 'a', encoding='utf-8') as f:
                     json.dump(value, f, ensure_ascii=False, indent=4)
 
         if not cache:
-            self.logger.warning(f'no new articles. now is {datetime.now()}')
+            logger.warning(f'no new articles. now is {datetime.now()}')
             return
 
         # insight 流程
-        new_insights = get_insight(cache, all_title, logger=self.logger)
+        new_insights = get_insight(cache, all_title)
         if new_insights:
             for insight in new_insights:
                 if not insight['content']:
                     continue
-                insight_id = self.pb.add(collection_name='insights', body=insight)
+                insight_id = pb.add(collection_name='insights', body=insight)
                 if not insight_id:
-                    self.logger.warning(f'write insight {insight} to pb failed, writing to cache_file')
+                    logger.warning(f'write insight {insight} to pb failed, writing to cache_file')
                     with open(os.path.join(self.cache_url, 'cache_insights.json'), 'a', encoding='utf-8') as f:
                         json.dump(insight, f, ensure_ascii=False, indent=4)
                 for article_id in insight['articles']:
-                    raw_article = self.pb.read(collection_name='articles', fields=['abstract', 'title', 'translation_result'], filter=f'id="{article_id}"')
+                    raw_article = pb.read(collection_name='articles', fields=['abstract', 'title', 'translation_result'], filter=f'id="{article_id}"')
                     if not raw_article or not raw_article[0]:
-                        self.logger.warning(f'get article {article_id} failed, skipping')
+                        logger.warning(f'get article {article_id} failed, skipping')
                         continue
                     if raw_article[0]['translation_result']:
                         continue
                     if is_chinese(raw_article[0]['title']):
                         continue
-                    translate_text = text_translate([raw_article[0]['title'], raw_article[0]['abstract']], target_language='zh', logger=self.logger)
+                    translate_text = text_translate([raw_article[0]['title'], raw_article[0]['abstract']], target_language='zh', logger=logger)
                     if translate_text:
-                        related_id = self.pb.add(collection_name='article_translation', body={'title': translate_text[0], 'abstract': translate_text[1], 'raw': article_id})
+                        related_id = pb.add(collection_name='article_translation', body={'title': translate_text[0], 'abstract': translate_text[1], 'raw': article_id})
                         if not related_id:
-                            self.logger.warning(f'write article_translation {article_id} failed')
+                            logger.warning(f'write article_translation {article_id} failed')
                         else:
-                            _ = self.pb.update(collection_name='articles', id=article_id, body={'translation_result': related_id})
+                            _ = pb.update(collection_name='articles', id=article_id, body={'translation_result': related_id})
                             if not _:
-                                self.logger.warning(f'update article {article_id} failed')
+                                logger.warning(f'update article {article_id} failed')
                     else:
-                        self.logger.warning(f'translate article {article_id} failed')
+                        logger.warning(f'translate article {article_id} failed')
         else:
             # 尝试把所有文章的title作为insigts,这是备选方案
             if len(cache) < 25:
-                self.logger.info('generate_insights-warning: no insights and no more than 25 articles so use article title as insights')
+                logger.info('generate_insights-warning: no insights and no more than 25 articles so use article title as insights')
                 for key, value in cache.items():
                     if value['title']:
                         if is_chinese(value['title']):
                             text_for_insight = value['title']
                         else:
-                            text_for_insight = text_translate([value['title']], logger=self.logger)
+                            text_for_insight = text_translate([value['title']], logger=logger)
                         if text_for_insight:
-                            insight_id = self.pb.add(collection_name='insights', body={'content': text_for_insight[0], 'articles': [key]})
+                            insight_id = pb.add(collection_name='insights', body={'content': text_for_insight[0], 'articles': [key]})
                             if not insight_id:
-                                self.logger.warning(f'write insight {text_for_insight[0]} to pb failed, writing to cache_file')
+                                logger.warning(f'write insight {text_for_insight[0]} to pb failed, writing to cache_file')
                                 with open(os.path.join(self.cache_url, 'cache_insights.json'), 'a', encoding='utf-8') as f:
                                     json.dump({'content': text_for_insight[0], 'articles': [key]}, f, ensure_ascii=False, indent=4)
             else:
-                self.logger.warning('generate_insights-error: can not generate insights, pls re-try')
-        self.logger.info(f'work done, now is {datetime.now()}')
+                logger.warning('generate_insights-error: can not generate insights, pls re-try')
+        logger.info(f'work done, now is {datetime.now()}')
 
         if self.snap_short_server:
-            self.logger.info(f'now starting article snapshot with {self.snap_short_server}')
+            logger.info(f'now starting article snapshot with {self.snap_short_server}')
             for key, value in cache.items():
                 if value['url']:
                     try:
                         snapshot = requests.get(f"{self.snap_short_server}/zip", {'url': value['url']}, timeout=60)
                         file = open(snapshot.text, 'rb')
-                        _ = self.pb.upload('articles', key, 'snapshot', key, file)
+                        _ = pb.upload('articles', key, 'snapshot', key, file)
                         file.close()
                     except Exception as e:
-                        self.logger.warning(f'error when snapshot {value["url"]}, {e}')
-        self.logger.info(f'now snapshot done, now is {datetime.now()}')
+                        logger.warning(f'error when snapshot {value["url"]}, {e}')
+        logger.info(f'now snapshot done, now is {datetime.now()}')