From 148e145365fefaf19e324eac76815f4e729e6127 Mon Sep 17 00:00:00 2001
From: bigbrother666
Date: Mon, 29 Apr 2024 23:06:17 +0800
Subject: [PATCH] use new logger

---
 client/backend/__init__.py                 | 91 ++++++++++++----------
 client/backend/background_task.py          | 23 +++++-
 client/backend/general_utils.py            | 18 +++++
 client/backend/get_logger.py               | 65 ----------------
 client/backend/get_search.py               | 15 ++--
 client/backend/pb_api.py                   | 10 +--
 client/backend/requirements.txt            |  3 +-
 client/backend/scrapers/general_scraper.py | 17 ++--
 client/backend/scrapers/simple_crawler.py  |  9 +--
 client/backend/work_process.py             | 31 ++++----
 10 files changed, 122 insertions(+), 160 deletions(-)
 delete mode 100644 client/backend/get_logger.py

diff --git a/client/backend/__init__.py b/client/backend/__init__.py
index d0fb691..d582270 100644
--- a/client/backend/__init__.py
+++ b/client/backend/__init__.py
@@ -2,27 +2,40 @@ import os
 import time
 import json
 import uuid
-from get_logger import get_logger
-from pb_api import pb
+from pb_api import PbTalker
 from get_report import get_report
 from get_search import search_insight
 from tranlsation_volcengine import text_translate
+from general_utils import get_logger_level
+from loguru import logger
+
+project_dir = os.environ.get("PROJECT_DIR", "")
+os.makedirs(project_dir, exist_ok=True)
+logger_file = os.path.join(project_dir, 'backend_service.log')
+dsw_log = get_logger_level()
+
+logger.add(
+    logger_file,
+    level=dsw_log,
+    backtrace=True,
+    diagnose=True,
+    rotation="50 MB"
+)
+pb = PbTalker(logger)
 
 
 class BackendService:
-    def __init__(self, name: str = 'backend_server'):
-        self.name = name
+    def __init__(self):
         self.project_dir = os.environ.get("PROJECT_DIR", "")
         # 1. base initialization
-        self.cache_url = os.path.join(self.project_dir, name)
+        self.cache_url = os.path.join(self.project_dir, 'backend_service')
         os.makedirs(self.cache_url, exist_ok=True)
-        self.logger = get_logger(name=self.name, file=os.path.join(self.project_dir, f'{self.name}.log'))
         # 2. load the llm
         # self.llm = LocalLlmWrapper()
         self.memory = {}
         # self.scholar = Scholar(initial_file_dir=os.path.join(self.project_dir, "files"), use_gpu=use_gpu)
-        self.logger.info(f'{self.name} init success.')
+        logger.info('backend service init success.')
 
     def report(self, insight_id: str, topics: list[str], comment: str) -> dict:
         """
@@ -31,15 +44,15 @@ class BackendService:
         :param comment: 修改意见,可以传‘’
         :return: 成功的话返回更新后的insight_id(其实跟原id一样), 不成功返回空字符
         """
-        self.logger.debug(f'got new report request insight_id {insight_id}')
+        logger.debug(f'got new report request insight_id {insight_id}')
         insight = pb.read('insights', filter=f'id="{insight_id}"')
         if not insight:
-            self.logger.error(f'insight {insight_id} not found')
+            logger.error(f'insight {insight_id} not found')
             return self.build_out(-2, 'insight not found')
 
         article_ids = insight[0]['articles']
         if not article_ids:
-            self.logger.error(f'insight {insight_id} has no articles')
+            logger.error(f'insight {insight_id} has no articles')
             return self.build_out(-2, 'can not find articles for insight')
 
         article_list = [pb.read('articles', fields=['title', 'abstract', 'content', 'url', 'publish_time'], filter=f'id="{_id}"')
@@ -47,7 +60,7 @@ class BackendService:
         article_list = [_article[0] for _article in article_list if _article]
 
         if not article_list:
-            self.logger.debug(f'{insight_id} has no valid articles')
+            logger.debug(f'{insight_id} has no valid articles')
             return self.build_out(-2, f'{insight_id} has no valid articles')
 
         content = insight[0]['content']
@@ -59,7 +72,7 @@ class BackendService:
             memory = ''
 
         docx_file = os.path.join(self.cache_url, f'{insight_id}_{uuid.uuid4()}.docx')
-        flag, memory = get_report(content, article_list, memory, topics, comment, docx_file, logger=self.logger)
+        flag, memory = get_report(content, article_list, memory, topics, comment, docx_file, logger=logger)
         self.memory[insight_id] = memory
 
         if flag:
@@ -67,13 +80,13 @@ class BackendService:
             message = pb.upload('insights', insight_id, 'docx', f'{insight_id}.docx', file)
             file.close()
             if message:
-                self.logger.debug(f'report success finish and update to pb-{message}')
+                logger.debug(f'report success finish and update to pb-{message}')
                 return self.build_out(11, message)
             else:
-                self.logger.error(f'{insight_id} report generate successfully, however failed to update to pb.')
+                logger.error(f'{insight_id} report generate successfully, however failed to update to pb.')
                 return self.build_out(-2, 'report generate successfully, however failed to update to pb.')
         else:
-            self.logger.error(f'{insight_id} failed to generate report, finish.')
+            logger.error(f'{insight_id} failed to generate report, finish.')
             return self.build_out(-11, 'report generate failed.')
 
     def build_out(self, flag: int, answer: str = "") -> dict:
@@ -87,7 +100,7 @@ class BackendService:
         这个函数的作用是遍历列表中的id, 如果对应article——id中没有translation_result,则触发翻译,并更新article——id记录
         执行本函数后,如果收到flag 11,则可以再次从pb中请求article-id对应的translation_result
         """
-        self.logger.debug(f'got new translate task {article_ids}')
+        logger.debug(f'got new translate task {article_ids}')
         flag = 11
         msg = ''
         key_cache = []
@@ -96,12 +109,12 @@ class BackendService:
         for article_id in article_ids:
             raw_article = pb.read(collection_name='articles', fields=['abstract', 'title', 'translation_result'], filter=f'id="{article_id}"')
             if not raw_article or not raw_article[0]:
-                self.logger.warning(f'get article {article_id} failed, skipping')
+                logger.warning(f'get article {article_id} failed, skipping')
                 flag = -2
                 msg += f'get article {article_id} failed, skipping\n'
                 continue
 
             if raw_article[0]['translation_result']:
-                self.logger.debug(f'{article_id} translation_result already exist, skipping')
+                logger.debug(f'{article_id} translation_result already exist, skipping')
                 continue
 
             key_cache.append(article_id)
@@ -111,21 +124,21 @@ class BackendService:
             if len(en_texts) < 16:
                 continue
 
-            self.logger.debug(f'translate process - batch {k}')
-            translate_result = text_translate(en_texts, logger=self.logger)
+            logger.debug(f'translate process - batch {k}')
+            translate_result = text_translate(en_texts, logger=logger)
             if translate_result and len(translate_result) == 2*len(key_cache):
                 for i in range(0, len(translate_result), 2):
                     related_id = pb.add(collection_name='article_translation', body={'title': translate_result[i], 'abstract': translate_result[i+1], 'raw': key_cache[int(i/2)]})
                     if not related_id:
-                        self.logger.warning(f'write article_translation {key_cache[int(i/2)]} failed')
+                        logger.warning(f'write article_translation {key_cache[int(i/2)]} failed')
                     else:
                         _ = pb.update(collection_name='articles', id=key_cache[int(i/2)], body={'translation_result': related_id})
                         if not _:
-                            self.logger.warning(f'update article {key_cache[int(i/2)]} failed')
-                self.logger.debug('done')
+                            logger.warning(f'update article {key_cache[int(i/2)]} failed')
+                logger.debug('done')
             else:
                 flag = -6
-                self.logger.warning(f'translate process - api out of service, can not continue job, aborting batch {key_cache}')
+                logger.warning(f'translate process - api out of service, can not continue job, aborting batch {key_cache}')
                 msg += f'failed to batch {key_cache}'
 
             en_texts = []
@@ -134,27 +147,27 @@ class BackendService:
             # 10次停1s,避免qps超载
             k += 1
             if k % 10 == 0:
-                self.logger.debug('max token limited - sleep 1s')
+                logger.debug('max token limited - sleep 1s')
                 time.sleep(1)
 
         if en_texts:
-            self.logger.debug(f'translate process - batch {k}')
-            translate_result = text_translate(en_texts, logger=self.logger)
+            logger.debug(f'translate process - batch {k}')
+            translate_result = text_translate(en_texts, logger=logger)
             if translate_result and len(translate_result) == 2*len(key_cache):
                 for i in range(0, len(translate_result), 2):
                     related_id = pb.add(collection_name='article_translation', body={'title': translate_result[i], 'abstract': translate_result[i+1], 'raw': key_cache[int(i/2)]})
                     if not related_id:
-                        self.logger.warning(f'write article_translation {key_cache[int(i/2)]} failed')
+                        logger.warning(f'write article_translation {key_cache[int(i/2)]} failed')
                     else:
                         _ = pb.update(collection_name='articles', id=key_cache[int(i/2)], body={'translation_result': related_id})
                         if not _:
-                            self.logger.warning(f'update article {key_cache[int(i/2)]} failed')
-                self.logger.debug('done')
+                            logger.warning(f'update article {key_cache[int(i/2)]} failed')
+                logger.debug('done')
             else:
-                self.logger.warning(f'translate process - api out of service, can not continue job, aborting batch {key_cache}')
+                logger.warning(f'translate process - api out of service, can not continue job, aborting batch {key_cache}')
                 msg += f'failed to batch {key_cache}'
                 flag = -6
 
-        self.logger.debug('translation job done.')
+        logger.debug('translation job done.')
         return self.build_out(flag, msg)
 
     def more_search(self, insight_id: str) -> dict:
@@ -162,10 +175,10 @@ class BackendService:
         :param insight_id: insight在pb中的id
         :return: 成功的话返回更新后的insight_id(其实跟原id一样), 不成功返回空字符
         """
-        self.logger.debug(f'got search request for insight: {insight_id}')
+        logger.debug(f'got search request for insight: {insight_id}')
         insight = pb.read('insights', filter=f'id="{insight_id}"')
         if not insight:
-            self.logger.error(f'insight {insight_id} not found')
+            logger.error(f'insight {insight_id} not found')
             return self.build_out(-2, 'insight not found')
 
         article_ids = insight[0]['articles']
@@ -175,9 +188,9 @@ class BackendService:
         else:
             url_list = []
 
-        flag, search_result = search_insight(insight[0]['content'], url_list, logger=self.logger)
+        flag, search_result = search_insight(insight[0]['content'], logger, url_list)
         if flag <= 0:
-            self.logger.debug('no search result, nothing happen')
+            logger.debug('no search result, nothing happen')
             return self.build_out(flag, 'search engine error or no result')
 
         for item in search_result:
@@ -185,14 +198,14 @@ class BackendService:
             if new_article_id:
                 article_ids.append(new_article_id)
             else:
-                self.logger.warning(f'add article {item} failed, writing to cache_file')
+                logger.warning(f'add article {item} failed, writing to cache_file')
                 with open(os.path.join(self.cache_url, 'cache_articles.json'), 'a', encoding='utf-8') as f:
                     json.dump(item, f, ensure_ascii=False, indent=4)
 
         message = pb.update(collection_name='insights', id=insight_id, body={'articles': article_ids})
         if message:
-            self.logger.debug(f'insight search success finish and update to pb-{message}')
+            logger.debug(f'insight search success finish and update to pb-{message}')
             return self.build_out(11, insight_id)
         else:
-            self.logger.error(f'{insight_id} search success, however failed to update to pb.')
+            logger.error(f'{insight_id} search success, however failed to update to pb.')
             return self.build_out(-2, 'search success, however failed to update to pb.')
diff --git a/client/backend/background_task.py b/client/backend/background_task.py
index 48bfa2e..832328f 100644
--- a/client/backend/background_task.py
+++ b/client/backend/background_task.py
@@ -3,10 +3,27 @@ """
 import schedule
 import time
+import os
 from work_process import ServiceProcesser
-from pb_api import pb
+from general_utils import get_logger_level
+from loguru import logger
+from pb_api import PbTalker
 
-sp = ServiceProcesser()
+project_dir = os.environ.get("PROJECT_DIR", "")
+os.makedirs(project_dir, exist_ok=True)
+logger_file = os.path.join(project_dir, 'scanning_task.log')
+dsw_log = get_logger_level()
+
+logger.add(
+    logger_file,
+    level=dsw_log,
+    backtrace=True,
+    diagnose=True,
+    rotation="50 MB"
+)
+pb = PbTalker(logger)
+
+sp = ServiceProcesser(pb=pb, logger=logger)
 
 counter = 0
@@ -31,8 +48,10 @@ def task():
         print('\033[0;33mno work for this loop\033[0m')
     counter += 1
 
+
 schedule.every().hour.at(":38").do(task)
+task()
 
 while True:
     schedule.run_pending()
     time.sleep(60)
diff --git a/client/backend/general_utils.py b/client/backend/general_utils.py
index 51c1377..e5e03aa 100644
--- a/client/backend/general_utils.py
+++ b/client/backend/general_utils.py
@@ -144,6 +144,24 @@ def extract_and_convert_dates(input_string):
         return ''.join(matches[0])
     return None
 
+
+def get_logger_level() -> str:
+    level_map = {
+        'silly': 'CRITICAL',
+        'verbose': 'DEBUG',
+        'info': 'INFO',
+        'warn': 'WARNING',
+        'error': 'ERROR',
+    }
+    level: str = os.environ.get('WS_LOG', 'info').lower()
+    if level not in level_map:
+        raise ValueError(
+            'WiseFlow LOG should support the values of `silly`, '
+            '`verbose`, `info`, `warn`, `error`'
+        )
+    return level_map.get(level, 'info')
+
+
 """
 # from InternLM/huixiangdou
 # another awsome work
diff --git a/client/backend/get_logger.py b/client/backend/get_logger.py
deleted file mode 100644
index e5655c6..0000000
--- a/client/backend/get_logger.py
+++ /dev/null
@@ -1,65 +0,0 @@
-from __future__ import annotations
-import logging
-import os
-from typing import Optional
-
-
-def _get_logger_level() -> str:
-    """
-    refer to : https://docs.python.org/3/library/logging.html#logging-levels
-    """
-    level_map = {
-        'silly': 'CRITICAL',
-        'verbose': 'DEBUG',
-        'info': 'INFO',
-        'warn': 'WARNING',
-        'error': 'ERROR',
-        'silent': 'NOTSET'
-    }
-    level: str = os.environ.get('WS_LOG', 'info').lower()
-    if level not in level_map:
-        raise ValueError(
-            'WiseFlow LOG should support the values of `silly`, '
-            '`verbose`, `info`, `warn`, `error`, `silent`'
-        )
-    return level_map.get(level, 'info')
-
-
-def get_logger(name: Optional[str] = None, file: Optional[str] = None) -> logging.Logger:
-    """get the logger object
-    Args:
-        name (str): the logger name
-        file (Optional[str], optional): the log file name. Defaults to None.
-    Examples:
-        logger = get_logger("WechatyPuppet")
-        logger = get_logger("WechatyPuppet", file="wechaty-puppet.log")
-        logger.info('log info ...')
-    Returns:
-        logging.Logger: the logger object
-    """
-    dsw_log = _get_logger_level()
-
-    log_formatter = logging.Formatter(
-        fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-
-    # create logger and set level to debug
-    logger = logging.getLogger(name)
-    logger.handlers = []
-    logger.setLevel(dsw_log)
-    logger.propagate = False
-
-    # create file handler and set level to debug
-    # file = os.environ.get(DSW_LOG_KEY, file)
-    if file:
-        file_handler = logging.FileHandler(file, 'a', encoding='utf-8')
-        file_handler.setLevel(dsw_log)
-        file_handler.setFormatter(log_formatter)
-        logger.addHandler(file_handler)
-
-    # create console handler and set level to info
-    console_handler = logging.StreamHandler()
-    console_handler.setLevel(dsw_log)
-    console_handler.setFormatter(log_formatter)
-    logger.addHandler(console_handler)
-
-    return logger
diff --git a/client/backend/get_search.py b/client/backend/get_search.py
index 9f21507..7b2c19b 100644
--- a/client/backend/get_search.py
+++ b/client/backend/get_search.py
@@ -11,7 +11,7 @@ from bs4 import BeautifulSoup
 # 国内的应用场景,sogou搜索应该不错了,还支持weixin、百科搜索
 # 海外的应用场景可以考虑使用duckduckgo或者google_search的sdk
 # 尽量还是不要自己host一个搜索引擎吧,虽然有类似https://github.com/StractOrg/stract/tree/main的开源方案,但毕竟这是两套工程
-def search_insight(keyword: str, exist_urls: list[Union[str, Path]], knowledge: bool = False, logger=None) -> (int, list):
+def search_insight(keyword: str, logger, exist_urls: list[Union[str, Path]], knowledge: bool = False) -> (int, list):
     """
     搜索网页
     :param keyword: 要搜索的主题
@@ -48,10 +48,8 @@ def search_insight(keyword: str, exist_urls: list[Union[str, Path]], knowledge:
             if href_f not in exist_urls:
                 relist.append(href_f)
     except Exception as e:
-        if logger:
-            logger.error(f"search {url} error: {e}")
-        else:
-            print(f"search {url} error: {e}")
+        logger.error(f"search {url} error: {e}")
+
     if not knowledge:
         url = f"https://www.sogou.com/sogou?ie=utf8&p=40230447&interation=1728053249&interV=&pid=sogou-wsse-7050094b04fd9aa3&query={keyword}"
         try:
@@ -71,10 +69,7 @@ def search_insight(keyword: str, exist_urls: list[Union[str, Path]], knowledge:
                 if href_f not in exist_urls:
                     relist.append(href_f)
         except Exception as e:
-            if logger:
-                logger.error(f"search {url} error: {e}")
-            else:
-                print(f"search {url} error: {e}")
+            logger.error(f"search {url} error: {e}")
 
     if not relist:
         return -7, []
@@ -85,7 +80,7 @@ def search_insight(keyword: str, exist_urls: list[Union[str, Path]], knowledge:
         if url in exist_urls:
             continue
         exist_urls.append(url)
-        flag, value = simple_crawler(url)
+        flag, value = simple_crawler(url, logger)
         if flag != 11:
             continue
         from_site = urlparse(url).netloc
diff --git a/client/backend/pb_api.py b/client/backend/pb_api.py
index 981ddc3..2c6fac8 100644
--- a/client/backend/pb_api.py
+++ b/client/backend/pb_api.py
@@ -2,16 +2,13 @@ import os
 from pocketbase import PocketBase  # Client also works the same
 from pocketbase.client import FileUpload
 from typing import BinaryIO
-from get_logger import get_logger
 
 
 class PbTalker:
-    def __init__(self) -> None:
-        self.project_dir = os.environ.get("PROJECT_DIR", "")
+    def __init__(self, logger) -> None:
         # 1. base initialization
-        os.makedirs(self.project_dir, exist_ok=True)
-        self.logger = get_logger(name='pb_talker', file=os.path.join(self.project_dir, 'pb_talker.log'))
         url = f"http://{os.environ.get('PB_API_BASE', '127.0.0.1:8090')}"
+        self.logger = logger
         self.logger.debug(f"initializing pocketbase client: {url}")
         self.client = PocketBase(url)
         auth = os.environ.get('PB_API_AUTH', '')
@@ -82,6 +79,3 @@ class PbTalker:
                 self.logger.error(f"pocketbase update failed: {e}")
                 return ''
         return res.id
-
-
-pb = PbTalker()
diff --git a/client/backend/requirements.txt b/client/backend/requirements.txt
index da55fb9..6a3baf6 100644
--- a/client/backend/requirements.txt
+++ b/client/backend/requirements.txt
@@ -14,4 +14,5 @@ faiss-cpu # for cpu-only environment
 pocketbase==0.10.0
 gne
 chardet
-schedule
\ No newline at end of file
+schedule
+loguru
\ No newline at end of file
diff --git a/client/backend/scrapers/general_scraper.py b/client/backend/scrapers/general_scraper.py
index 9fc226d..e753929 100644
--- a/client/backend/scrapers/general_scraper.py
+++ b/client/backend/scrapers/general_scraper.py
@@ -10,15 +10,10 @@ from datetime import datetime, date
 from requests.compat import urljoin
 import chardet
 from general_utils import extract_and_convert_dates
-from get_logger import get_logger
-import os
 
 
 header = {
     'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/604.1 Edg/112.0.100.0'}
 
-project_dir = os.environ.get("PROJECT_DIR", "")
-os.makedirs(project_dir, exist_ok=True)
-logger = get_logger(name='general_scraper', file=os.path.join(project_dir, f'general_scraper.log'))
 
 
 def tag_visible(element: Comment) -> bool:
@@ -75,7 +70,7 @@ sys_info = '''你是一个html网页解析器,你将接收一段用户从网
 '''
 
 
-def llm_crawler(url: str | Path) -> (int, dict):
+def llm_crawler(url: str | Path, logger) -> (int, dict):
     """
     返回文章信息dict和flag,负数为报错,0为没有结果,11为成功
     参考:https://mp.weixin.qq.com/s/4J-kofsfFDiV1FxGlTJLfA
@@ -152,7 +147,7 @@ def llm_crawler(url: str | Path) -> (int, dict):
     return 11, info
 
 
-def general_scraper(site: str, expiration: date, existing: list[str]) -> list[dict]:
+def general_scraper(site: str, expiration: date, existing: list[str], logger) -> list[dict]:
     try:
         with httpx.Client() as client:
             response = client.get(site, headers=header, timeout=30)
@@ -171,9 +166,9 @@ def general_scraper(site: str, expiration: date, existing: list[str]) -> list[di
     if site in existing:
         logger.debug(f"{site} has been crawled before, skip it")
         return []
-    flag, result = simple_crawler(site)
+    flag, result = simple_crawler(site, logger)
     if flag != 11:
-        flag, result = llm_crawler(site)
+        flag, result = llm_crawler(site, logger)
         if flag != 11:
             return []
     publish_date = datetime.strptime(result['publish_time'], '%Y%m%d')
@@ -189,9 +184,9 @@ def general_scraper(site: str, expiration: date, existing: list[str]) -> list[di
            logger.debug(f"{url} has been crawled before, skip it")
            continue
        existing.append(url)
-        flag, result = simple_crawler(url)
+        flag, result = simple_crawler(url, logger)
         if flag != 11:
-            flag, result = llm_crawler(url)
+            flag, result = llm_crawler(url, logger)
             if flag != 11:
                 continue
         publish_date = datetime.strptime(result['publish_time'], '%Y%m%d')
diff --git a/client/backend/scrapers/simple_crawler.py b/client/backend/scrapers/simple_crawler.py
index 8baef82..3f6593d 100644
--- a/client/backend/scrapers/simple_crawler.py
+++ b/client/backend/scrapers/simple_crawler.py
@@ -5,19 +5,14 @@ from datetime import datetime
 from pathlib import Path
 from general_utils import extract_and_convert_dates
 import chardet
-from get_logger import get_logger
-import os
+
 
 extractor = GeneralNewsExtractor()
 header = {
     'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/604.1 Edg/112.0.100.0'}
 
-project_dir = os.environ.get("PROJECT_DIR", "")
-os.makedirs(project_dir, exist_ok=True)
-logger = get_logger(name='simple_crawler', file=os.path.join(project_dir, f'simple_crawler.log'))
-
-def simple_crawler(url: str | Path) -> (int, dict):
+def simple_crawler(url: str | Path, logger) -> (int, dict):
     """
     返回文章信息dict和flag,负数为报错,0为没有结果,11为成功
     """
diff --git a/client/backend/work_process.py b/client/backend/work_process.py
index f51dbc1..18bf939 100644
--- a/client/backend/work_process.py
+++ b/client/backend/work_process.py
@@ -1,11 +1,9 @@
 import os
 import json
 import requests
-from get_logger import get_logger
 from datetime import datetime, timedelta, date
 from scrapers import scraper_map
 from scrapers.general_scraper import general_scraper
-from pb_api import pb
 from urllib.parse import urlparse
 from get_insight import get_insight
 from general_utils import is_chinese
@@ -20,14 +18,13 @@ expiration_str = expiration_date.strftime("%Y%m%d")
 
 
 class ServiceProcesser:
-    def __init__(self, name: str = 'Work_Processor', record_snapshot: bool = False):
-        self.name = name
+    def __init__(self, pb, logger, record_snapshot: bool = False):
         self.project_dir = os.environ.get("PROJECT_DIR", "")
         # 1. base initialization
-        self.cache_url = os.path.join(self.project_dir, name)
+        self.cache_url = os.path.join(self.project_dir, 'scanning_task')
         os.makedirs(self.cache_url, exist_ok=True)
-        self.logger = get_logger(name=self.name, file=os.path.join(self.project_dir, f'{self.name}.log'))
-
+        self.pb = pb
+        self.logger = logger
         # 2. load the llm
         # self.llm = LocalLlmWrapper()  # if you use the local-llm
 
@@ -39,7 +36,7 @@ class ServiceProcesser:
         else:
             self.snap_short_server = None
 
-        self.logger.info(f'{self.name} init success.')
+        self.logger.info('scanning task init success.')
 
     def __call__(self, expiration: date = expiration_date, sites: list[str] = None):
         # 先清空一下cache
@@ -48,7 +45,7 @@ class ServiceProcesser:
            self.logger.debug(f'clear cache -- {cache}')
        # 从pb数据库中读取所有文章url
        # 这里publish_time用int格式,综合考虑下这个是最容易操作的模式,虽然糙了点
-        existing_articles = pb.read(collection_name='articles', fields=['id', 'title', 'url'], filter=f'publish_time>{expiration_str}')
+        existing_articles = self.pb.read(collection_name='articles', fields=['id', 'title', 'url'], filter=f'publish_time>{expiration_str}')
         all_title = {}
         existings = []
         for article in existing_articles:
@@ -64,7 +61,7 @@ class ServiceProcesser:
                 if site in scraper_map:
                     futures.append(executor.submit(scraper_map[site], expiration, existings))
                 else:
-                    futures.append(executor.submit(general_scraper, site, expiration, existings))
+                    futures.append(executor.submit(general_scraper, site, expiration, existings, self.logger))
             concurrent.futures.wait(futures)
             for future in futures:
                 try:
@@ -83,7 +80,7 @@ class ServiceProcesser:
                    value['content'] = f"({from_site} 报道){value['content']}"
                value['images'] = json.dumps(value['images'])
 
-                article_id = pb.add(collection_name='articles', body=value)
+                article_id = self.pb.add(collection_name='articles', body=value)
 
                if article_id:
                    cache[article_id] = value
@@ -103,13 +100,13 @@ class ServiceProcesser:
             for insight in new_insights:
                 if not insight['content']:
                     continue
-                insight_id = pb.add(collection_name='insights', body=insight)
+                insight_id = self.pb.add(collection_name='insights', body=insight)
                 if not insight_id:
                     self.logger.warning(f'write insight {insight} to pb failed, writing to cache_file')
                     with open(os.path.join(self.cache_url, 'cache_insights.json'), 'a', encoding='utf-8') as f:
                         json.dump(insight, f, ensure_ascii=False, indent=4)
                 for article_id in insight['articles']:
-                    raw_article = pb.read(collection_name='articles', fields=['abstract', 'title', 'translation_result'], filter=f'id="{article_id}"')
+                    raw_article = self.pb.read(collection_name='articles', fields=['abstract', 'title', 'translation_result'], filter=f'id="{article_id}"')
                     if not raw_article or not raw_article[0]:
                         self.logger.warning(f'get article {article_id} failed, skipping')
                         continue
@@ -119,11 +116,11 @@ class ServiceProcesser:
                         continue
                     translate_text = text_translate([raw_article[0]['title'], raw_article[0]['abstract']], target_language='zh', logger=self.logger)
                     if translate_text:
-                        related_id = pb.add(collection_name='article_translation', body={'title': translate_text[0], 'abstract': translate_text[1], 'raw': article_id})
+                        related_id = self.pb.add(collection_name='article_translation', body={'title': translate_text[0], 'abstract': translate_text[1], 'raw': article_id})
                         if not related_id:
                             self.logger.warning(f'write article_translation {article_id} failed')
                         else:
-                            _ = pb.update(collection_name='articles', id=article_id, body={'translation_result': related_id})
+                            _ = self.pb.update(collection_name='articles', id=article_id, body={'translation_result': related_id})
                             if not _:
                                 self.logger.warning(f'update article {article_id} failed')
                     else:
@@ -139,7 +136,7 @@ class ServiceProcesser:
                 else:
                     text_for_insight = text_translate([value['title']], logger=self.logger)
                     if text_for_insight:
-                        insight_id = pb.add(collection_name='insights', body={'content': text_for_insight[0], 'articles': [key]})
+                        insight_id = self.pb.add(collection_name='insights', body={'content': text_for_insight[0], 'articles': [key]})
                         if not insight_id:
                             self.logger.warning(f'write insight {text_for_insight[0]} to pb failed, writing to cache_file')
                             with open(os.path.join(self.cache_url, 'cache_insights.json'), 'a',
@@ -156,7 +153,7 @@ class ServiceProcesser:
                 try:
                     snapshot = requests.get(f"{self.snap_short_server}/zip", {'url': value['url']}, timeout=60)
                     file = open(snapshot.text, 'rb')
-                    _ = pb.upload('articles', key, 'snapshot', key, file)
+                    _ = self.pb.upload('articles', key, 'snapshot', key, file)
                     file.close()
                 except Exception as e:
                     self.logger.warning(f'error when snapshot {value["url"]}, {e}')
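
Note on the wiring this patch converges on (not part of the commit): each entry point configures a single loguru sink and then passes the resulting logger, together with the PbTalker built on it, into the objects that need them, instead of every module calling the deleted get_logger() and writing its own log file. Below is a minimal sketch of that pattern, assuming the repository modules touched by this patch and a configured PocketBase backend (PB_API_BASE / PB_API_AUTH); the log file name, Worker class and run() method are illustrative only.

# illustrative sketch only -- mirrors the setup in __init__.py and background_task.py
import os

from loguru import logger

from general_utils import get_logger_level   # helper added by this patch, reads WS_LOG
from pb_api import PbTalker                   # now receives the logger it should use

# the patch assumes PROJECT_DIR is set; "." is used here only so the sketch runs standalone
project_dir = os.environ.get("PROJECT_DIR", ".")
os.makedirs(project_dir, exist_ok=True)

# one rotating file sink per entry point, shared by everything downstream
logger.add(
    os.path.join(project_dir, 'my_task.log'),  # hypothetical log file name
    level=get_logger_level(),
    backtrace=True,
    diagnose=True,
    rotation="50 MB"
)

pb = PbTalker(logger)  # PbTalker logs through the shared logger instead of its own


class Worker:  # hypothetical consumer; ServiceProcesser and BackendService follow this shape
    def __init__(self, pb, logger):
        self.pb = pb
        self.logger = logger

    def run(self):
        self.logger.info('worker started')


Worker(pb=pb, logger=logger).run()

The same dependency-injection idea explains why background_task.py now constructs pb and sp itself, and why the module-level pb = PbTalker() singleton was dropped from pb_api.py.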