bigbrother666 2024-04-30 12:05:44 +08:00
parent 148e145365
commit cb32415d28
5 changed files with 83 additions and 83 deletions

View File

@@ -3,25 +3,9 @@ import time
import json
import uuid
from pb_api import PbTalker
from get_report import get_report
from get_report import get_report, logger, pb
from get_search import search_insight
from tranlsation_volcengine import text_translate
from general_utils import get_logger_level
from loguru import logger
project_dir = os.environ.get("PROJECT_DIR", "")
os.makedirs(project_dir, exist_ok=True)
logger_file = os.path.join(project_dir, 'backend_service.log')
dsw_log = get_logger_level()
logger.add(
logger_file,
level=dsw_log,
backtrace=True,
diagnose=True,
rotation="50 MB"
)
pb = PbTalker(logger)
class BackendService:
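The hunk above removes the per-file logging setup and instead imports the already configured logger and pb from get_report. This works because loguru exposes a single process-wide logger object, so a sink added where the logger is first configured is visible to every module that imports it. A minimal sketch of the pattern (paths and rotation are taken from the removed lines, the rest is assumed):

# sketch only -- not part of the commit
# get_report.py: configure the shared objects once, at import time
from loguru import logger
from pb_api import PbTalker   # project module, as shown in the diff

logger.add('backend_service.log', rotation='50 MB')
pb = PbTalker(logger)

# backend_service.py: reuse them
from get_report import get_report, logger, pb   # same logger object, same sink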
@@ -72,7 +56,7 @@ class BackendService:
memory = ''
docx_file = os.path.join(self.cache_url, f'{insight_id}_{uuid.uuid4()}.docx')
flag, memory = get_report(content, article_list, memory, topics, comment, docx_file, logger=logger)
flag, memory = get_report(content, article_list, memory, topics, comment, docx_file)
self.memory[insight_id] = memory
if flag:
@@ -80,7 +64,7 @@ class BackendService:
message = pb.upload('insights', insight_id, 'docx', f'{insight_id}.docx', file)
file.close()
if message:
logger.debug(f'report success finish and update to pb-{message}')
logger.debug(f'report success finish and update to: {message}')
return self.build_out(11, message)
else:
logger.error(f'{insight_id} report generate successfully, however failed to update to pb.')
@@ -204,7 +188,7 @@ class BackendService:
message = pb.update(collection_name='insights', id=insight_id, body={'articles': article_ids})
if message:
logger.debug(f'insight search success finish and update to pb-{message}')
logger.debug(f'insight search success finish and update to: {message}')
return self.build_out(11, insight_id)
else:
logger.error(f'{insight_id} search success, however failed to update to pb.')

View File

@@ -3,27 +3,11 @@
"""
import schedule
import time
import os
from get_insight import pb, logger
from work_process import ServiceProcesser
from general_utils import get_logger_level
from loguru import logger
from pb_api import PbTalker
project_dir = os.environ.get("PROJECT_DIR", "")
os.makedirs(project_dir, exist_ok=True)
logger_file = os.path.join(project_dir, 'scanning_task.log')
dsw_log = get_logger_level()
logger.add(
logger_file,
level=dsw_log,
backtrace=True,
diagnose=True,
rotation="50 MB"
)
pb = PbTalker(logger)
sp = ServiceProcesser(pb=pb, logger=logger)
sp = ServiceProcesser()
counter = 0
@@ -37,8 +21,8 @@ def task():
continue
if counter % site['per_hours'] == 0:
urls.append(site['url'])
print(f'\033[0;32m task execute loop {counter}\033[0m')
print(urls)
logger.info(f'\033[0;32m task execute loop {counter}\033[0m')
logger.info(urls)
if urls:
sp(sites=urls)
else:
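The code that actually drives task() sits outside this hunk; a minimal sketch of how the schedule import above is typically wired up, with the interval being an assumption rather than something taken from the commit:

# sketch only -- assumed scheduling loop around task()
import time
import schedule

schedule.every().hour.do(task)   # interval is an assumption

while True:
    schedule.run_pending()
    time.sleep(60)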

View File

@@ -8,7 +8,24 @@ from general_utils import isChinesePunctuation, is_chinese
from tranlsation_volcengine import text_translate
import time
import re
from pb_api import pb
import os
from general_utils import get_logger_level
from loguru import logger
from pb_api import PbTalker
project_dir = os.environ.get("PROJECT_DIR", "")
os.makedirs(project_dir, exist_ok=True)
logger_file = os.path.join(project_dir, 'scanning_task.log')
dsw_log = get_logger_level()
logger.add(
logger_file,
level=dsw_log,
backtrace=True,
diagnose=True,
rotation="50 MB"
)
pb = PbTalker(logger)
max_tokens = 4000
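Note that get_report.py and get_insight.py each call logger.add() on the same global loguru logger at import time, so a process that imports both modules logs to both files. A minimal sketch of that behaviour, with the sink paths taken from the diff:

# sketch only -- loguru fans each record out to every registered sink
from loguru import logger

logger.add('backend_service.log', rotation='50 MB')   # added when get_report.py is imported
logger.add('scanning_task.log', rotation='50 MB')     # added when get_insight.py is imported
logger.info('one record')   # written to stderr and to both files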
@@ -69,7 +86,7 @@ _rewrite_insight_prompt = f'''你是一名{character},你将被给到一个新闻列表
不管新闻列表是何种语言请仅用中文输出分析结果'''
def _parse_insight(article_text: str, cache: dict, logger=None) -> (bool, dict):
def _parse_insight(article_text: str, cache: dict) -> (bool, dict):
input_length = len(cache)
result = dashscope_llm([{'role': 'system', 'content': _first_stage_prompt}, {'role': 'user', 'content': article_text}],
'qwen1.5-72b-chat', logger=logger)
@@ -116,7 +133,7 @@ def _parse_insight(article_text: str, cache: dict, logger=None) -> (bool, dict):
return False, cache
def _rewrite_insight(context: str, logger=None) -> (bool, str):
def _rewrite_insight(context: str) -> (bool, str):
result = dashscope_llm([{'role': 'system', 'content': _rewrite_insight_prompt}, {'role': 'user', 'content': context}],
'qwen1.5-72b-chat', logger=logger)
if result:
@@ -142,7 +159,7 @@ def _rewrite_insight(context: str, logger=None) -> (bool, str):
return False, text
def get_insight(articles: dict, titles: dict, logger=None) -> list:
def get_insight(articles: dict, titles: dict) -> list:
context = ''
cache = {}
for value in articles.values():
@@ -162,7 +179,7 @@ def get_insight(articles: dict, titles: dict, logger=None) -> list:
if len(context) < max_tokens:
continue
flag, cache = _parse_insight(context, cache, logger)
flag, cache = _parse_insight(context, cache)
if flag:
logger.warning(f'following articles may not be completely analyzed: \n{context}')
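For context, the loop above accumulates article text into context and only calls _parse_insight once the buffer reaches roughly max_tokens characters. A minimal sketch of that batching pattern; the field access and the buffer reset are assumptions, not lines from the commit:

# sketch only -- accumulate-then-parse batching
context = ''
cache = {}
for value in articles.values():
    context += value['title'] + '\n'   # hypothetical field access
    if len(context) < max_tokens:
        continue
    flag, cache = _parse_insight(context, cache)
    if flag:
        logger.warning(f'following articles may not be completely analyzed: \n{context}')
    context = ''   # assumed reset after each batch
if context:   # flush whatever is left at the end
    flag, cache = _parse_insight(context, cache)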
@@ -170,7 +187,7 @@ def get_insight(articles: dict, titles: dict, logger=None) -> list:
# reportedly, frequent calls cause performance degradation, so rest 1s after each call; now that qwen-72b and max are called in rotation, this is no longer necessary
time.sleep(1)
if context:
flag, cache = _parse_insight(context, cache, logger)
flag, cache = _parse_insight(context, cache)
if flag:
logger.warning(f'following articles may not be completely analyzed: \n{context}')
@@ -230,7 +247,7 @@ def get_insight(articles: dict, titles: dict, logger=None) -> list:
if not context:
continue
flag, new_insight = _rewrite_insight(context, logger)
flag, new_insight = _rewrite_insight(context)
if flag:
logger.warning(f'insight {key} may contain wrong')
cache[key] = value

View File

@@ -1,5 +1,6 @@
import random
import re
import os
from llms.dashscope_wrapper import dashscope_llm
from docx import Document
from docx.oxml.ns import qn
@@ -7,7 +8,23 @@ from docx.shared import Pt, RGBColor
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
from datetime import datetime
from general_utils import isChinesePunctuation
from pb_api import pb
from general_utils import get_logger_level
from loguru import logger
from pb_api import PbTalker
project_dir = os.environ.get("PROJECT_DIR", "")
os.makedirs(project_dir, exist_ok=True)
logger_file = os.path.join(project_dir, 'backend_service.log')
dsw_log = get_logger_level()
logger.add(
logger_file,
level=dsw_log,
backtrace=True,
diagnose=True,
rotation="50 MB"
)
pb = PbTalker(logger)
# qwen-72b-chat supports at most 30k input; allowing for the rest of the prompt, content should not exceed 30000 characters
# if switching to qwen-max (max input 6k), this would have to become 6000, but then many articles could not be analyzed
@@ -34,7 +51,7 @@ if not report_type:
_ = pb.update(collection_name='roleplays', id=_role_config_id, body={'report_type': report_type})
def get_report(insigt: str, articles: list[dict], memory: str, topics: list[str], comment: str, docx_file: str, logger=None) -> (bool, str):
def get_report(insigt: str, articles: list[dict], memory: str, topics: list[str], comment: str, docx_file: str) -> (bool, str):
zh_index = ['一', '二', '三', '四', '五', '六', '七', '八', '九', '十', '十一', '十二']
if isChinesePunctuation(insigt[-1]):
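get_report ultimately renders the report into docx_file; a minimal sketch of the python-docx calls that the imports above suggest, with the content, font, and styling being assumptions rather than the project's actual template:

# sketch only -- writing a simple styled document with python-docx
from docx import Document
from docx.oxml.ns import qn
from docx.shared import Pt, RGBColor
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT

doc = Document()
title = doc.add_paragraph()
title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
run = title.add_run('hypothetical report title')
run.font.size = Pt(16)
run.font.color.rgb = RGBColor(0, 0, 0)
run.font.name = 'SimSun'
run._element.rPr.rFonts.set(qn('w:eastAsia'), 'SimSun')   # qn() is what sets the East Asian font name
doc.add_paragraph('hypothetical report body')
doc.save('report.docx')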

View File

@@ -5,7 +5,7 @@ from datetime import datetime, timedelta, date
from scrapers import scraper_map
from scrapers.general_scraper import general_scraper
from urllib.parse import urlparse
from get_insight import get_insight
from get_insight import get_insight, pb, logger
from general_utils import is_chinese
from tranlsation_volcengine import text_translate
import concurrent.futures
@@ -18,13 +18,11 @@ expiration_str = expiration_date.strftime("%Y%m%d")
class ServiceProcesser:
def __init__(self, pb, logger, record_snapshot: bool = False):
def __init__(self, record_snapshot: bool = False):
self.project_dir = os.environ.get("PROJECT_DIR", "")
# 1. base initialization
self.cache_url = os.path.join(self.project_dir, 'scanning_task')
os.makedirs(self.cache_url, exist_ok=True)
self.pb = pb
self.logger = logger
# 2. load the llm
# self.llm = LocalLlmWrapper() # if you use the local-llm
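With pb and logger now resolved at module scope via the get_insight import above, the processor is constructed without arguments. A minimal usage sketch; the site URL is a placeholder:

# sketch only -- constructing and invoking the processor after this change
sp = ServiceProcesser()                    # no pb/logger arguments any more
sp(sites=['https://example.com/news'])     # placeholder URL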
@@ -36,16 +34,16 @@ class ServiceProcesser:
else:
self.snap_short_server = None
self.logger.info('scanning task init success.')
logger.info('scanning task init success.')
def __call__(self, expiration: date = expiration_date, sites: list[str] = None):
# clear the cache first
self.logger.info(f'wake, prepare to work, now is {datetime.now()}')
logger.info(f'wake, prepare to work, now is {datetime.now()}')
cache = {}
self.logger.debug(f'clear cache -- {cache}')
logger.debug(f'clear cache -- {cache}')
# read all article urls from the pb database
# publish_time is stored as an int here; all things considered this is the easiest format to work with, though a bit crude
existing_articles = self.pb.read(collection_name='articles', fields=['id', 'title', 'url'], filter=f'publish_time>{expiration_str}')
existing_articles = pb.read(collection_name='articles', fields=['id', 'title', 'url'], filter=f'publish_time>{expiration_str}')
all_title = {}
existings = []
for article in existing_articles:
@@ -61,13 +59,13 @@
if site in scraper_map:
futures.append(executor.submit(scraper_map[site], expiration, existings))
else:
futures.append(executor.submit(general_scraper, site, expiration, existings, self.logger))
futures.append(executor.submit(general_scraper, site, expiration, existings, logger))
concurrent.futures.wait(futures)
for future in futures:
try:
new_articles.extend(future.result())
except Exception as e:
self.logger.error(f'error when scraping-- {e}')
logger.error(f'error when scraping-- {e}')
for value in new_articles:
if not value:
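The executor construction itself sits outside this hunk; a minimal sketch of the parallel scraping step the shown lines belong to, where the with-block is an assumption:

# sketch only -- one scraper job per site, results gathered afterwards
import concurrent.futures

new_articles = []
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for site in sites:
        if site in scraper_map:
            futures.append(executor.submit(scraper_map[site], expiration, existings))
        else:
            futures.append(executor.submit(general_scraper, site, expiration, existings, logger))
    concurrent.futures.wait(futures)
    for future in futures:
        try:
            new_articles.extend(future.result())
        except Exception as e:
            logger.error(f'error when scraping-- {e}')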
@@ -80,81 +78,81 @@
value['content'] = f"({from_site} 报道){value['content']}"
value['images'] = json.dumps(value['images'])
article_id = self.pb.add(collection_name='articles', body=value)
article_id = pb.add(collection_name='articles', body=value)
if article_id:
cache[article_id] = value
all_title[value['title']] = article_id
else:
self.logger.warning(f'add article {value} failed, writing to cache_file')
logger.warning(f'add article {value} failed, writing to cache_file')
with open(os.path.join(self.cache_url, 'cache_articles.json'), 'a', encoding='utf-8') as f:
json.dump(value, f, ensure_ascii=False, indent=4)
if not cache:
self.logger.warning(f'no new articles. now is {datetime.now()}')
logger.warning(f'no new articles. now is {datetime.now()}')
return
# insight pipeline
new_insights = get_insight(cache, all_title, logger=self.logger)
new_insights = get_insight(cache, all_title)
if new_insights:
for insight in new_insights:
if not insight['content']:
continue
insight_id = self.pb.add(collection_name='insights', body=insight)
insight_id = pb.add(collection_name='insights', body=insight)
if not insight_id:
self.logger.warning(f'write insight {insight} to pb failed, writing to cache_file')
logger.warning(f'write insight {insight} to pb failed, writing to cache_file')
with open(os.path.join(self.cache_url, 'cache_insights.json'), 'a', encoding='utf-8') as f:
json.dump(insight, f, ensure_ascii=False, indent=4)
for article_id in insight['articles']:
raw_article = self.pb.read(collection_name='articles', fields=['abstract', 'title', 'translation_result'], filter=f'id="{article_id}"')
raw_article = pb.read(collection_name='articles', fields=['abstract', 'title', 'translation_result'], filter=f'id="{article_id}"')
if not raw_article or not raw_article[0]:
self.logger.warning(f'get article {article_id} failed, skipping')
logger.warning(f'get article {article_id} failed, skipping')
continue
if raw_article[0]['translation_result']:
continue
if is_chinese(raw_article[0]['title']):
continue
translate_text = text_translate([raw_article[0]['title'], raw_article[0]['abstract']], target_language='zh', logger=self.logger)
translate_text = text_translate([raw_article[0]['title'], raw_article[0]['abstract']], target_language='zh', logger=logger)
if translate_text:
related_id = self.pb.add(collection_name='article_translation', body={'title': translate_text[0], 'abstract': translate_text[1], 'raw': article_id})
related_id = pb.add(collection_name='article_translation', body={'title': translate_text[0], 'abstract': translate_text[1], 'raw': article_id})
if not related_id:
self.logger.warning(f'write article_translation {article_id} failed')
logger.warning(f'write article_translation {article_id} failed')
else:
_ = self.pb.update(collection_name='articles', id=article_id, body={'translation_result': related_id})
_ = pb.update(collection_name='articles', id=article_id, body={'translation_result': related_id})
if not _:
self.logger.warning(f'update article {article_id} failed')
logger.warning(f'update article {article_id} failed')
else:
self.logger.warning(f'translate article {article_id} failed')
logger.warning(f'translate article {article_id} failed')
else:
# fallback: try using the title of every article as insights
if len(cache) < 25:
self.logger.info('generate_insights-warning: no insights and no more than 25 articles so use article title as insights')
logger.info('generate_insights-warning: no insights and no more than 25 articles so use article title as insights')
for key, value in cache.items():
if value['title']:
if is_chinese(value['title']):
text_for_insight = value['title']
else:
text_for_insight = text_translate([value['title']], logger=self.logger)
text_for_insight = text_translate([value['title']], logger=logger)
if text_for_insight:
insight_id = self.pb.add(collection_name='insights', body={'content': text_for_insight[0], 'articles': [key]})
insight_id = pb.add(collection_name='insights', body={'content': text_for_insight[0], 'articles': [key]})
if not insight_id:
self.logger.warning(f'write insight {text_for_insight[0]} to pb failed, writing to cache_file')
logger.warning(f'write insight {text_for_insight[0]} to pb failed, writing to cache_file')
with open(os.path.join(self.cache_url, 'cache_insights.json'), 'a',
encoding='utf-8') as f:
json.dump({'content': text_for_insight[0], 'articles': [key]}, f, ensure_ascii=False, indent=4)
else:
self.logger.warning('generate_insights-error: can not generate insights, pls re-try')
self.logger.info(f'work done, now is {datetime.now()}')
logger.warning('generate_insights-error: can not generate insights, pls re-try')
logger.info(f'work done, now is {datetime.now()}')
if self.snap_short_server:
self.logger.info(f'now starting article snapshot with {self.snap_short_server}')
logger.info(f'now starting article snapshot with {self.snap_short_server}')
for key, value in cache.items():
if value['url']:
try:
snapshot = requests.get(f"{self.snap_short_server}/zip", {'url': value['url']}, timeout=60)
file = open(snapshot.text, 'rb')
_ = self.pb.upload('articles', key, 'snapshot', key, file)
_ = pb.upload('articles', key, 'snapshot', key, file)
file.close()
except Exception as e:
self.logger.warning(f'error when snapshot {value["url"]}, {e}')
self.logger.info(f'now snapshot done, now is {datetime.now()}')
logger.warning(f'error when snapshot {value["url"]}, {e}')
logger.info(f'now snapshot done, now is {datetime.now()}')