use new logger

bigbrother666 2024-04-29 23:06:17 +08:00
parent 350e4d6426
commit 148e145365
10 changed files with 122 additions and 160 deletions

View File

@@ -2,27 +2,40 @@ import os
 import time
 import json
 import uuid
-from get_logger import get_logger
-from pb_api import pb
+from pb_api import PbTalker
 from get_report import get_report
 from get_search import search_insight
 from tranlsation_volcengine import text_translate
+from general_utils import get_logger_level
+from loguru import logger
+
+project_dir = os.environ.get("PROJECT_DIR", "")
+os.makedirs(project_dir, exist_ok=True)
+logger_file = os.path.join(project_dir, 'backend_service.log')
+dsw_log = get_logger_level()
+
+logger.add(
+    logger_file,
+    level=dsw_log,
+    backtrace=True,
+    diagnose=True,
+    rotation="50 MB"
+)
+pb = PbTalker(logger)


 class BackendService:
-    def __init__(self, name: str = 'backend_server'):
-        self.name = name
+    def __init__(self):
         self.project_dir = os.environ.get("PROJECT_DIR", "")
         # 1. base initialization
-        self.cache_url = os.path.join(self.project_dir, name)
+        self.cache_url = os.path.join(self.project_dir, 'backend_service')
         os.makedirs(self.cache_url, exist_ok=True)
-        self.logger = get_logger(name=self.name, file=os.path.join(self.project_dir, f'{self.name}.log'))

         # 2. load the llm
         # self.llm = LocalLlmWrapper()
         self.memory = {}
         # self.scholar = Scholar(initial_file_dir=os.path.join(self.project_dir, "files"), use_gpu=use_gpu)
-        self.logger.info(f'{self.name} init success.')
+        logger.info('backend service init success.')

     def report(self, insight_id: str, topics: list[str], comment: str) -> dict:
         """
@@ -31,15 +44,15 @@ class BackendService:
         :param comment: revision comments (optional)
         :return: on success, returns the updated insight_id (same as the original id); on failure, returns an empty string
         """
-        self.logger.debug(f'got new report request insight_id {insight_id}')
+        logger.debug(f'got new report request insight_id {insight_id}')
         insight = pb.read('insights', filter=f'id="{insight_id}"')
         if not insight:
-            self.logger.error(f'insight {insight_id} not found')
+            logger.error(f'insight {insight_id} not found')
             return self.build_out(-2, 'insight not found')

         article_ids = insight[0]['articles']
         if not article_ids:
-            self.logger.error(f'insight {insight_id} has no articles')
+            logger.error(f'insight {insight_id} has no articles')
             return self.build_out(-2, 'can not find articles for insight')

         article_list = [pb.read('articles', fields=['title', 'abstract', 'content', 'url', 'publish_time'], filter=f'id="{_id}"')
@@ -47,7 +60,7 @@ class BackendService:
         article_list = [_article[0] for _article in article_list if _article]
         if not article_list:
-            self.logger.debug(f'{insight_id} has no valid articles')
+            logger.debug(f'{insight_id} has no valid articles')
             return self.build_out(-2, f'{insight_id} has no valid articles')

         content = insight[0]['content']
@@ -59,7 +72,7 @@ class BackendService:
             memory = ''

         docx_file = os.path.join(self.cache_url, f'{insight_id}_{uuid.uuid4()}.docx')
-        flag, memory = get_report(content, article_list, memory, topics, comment, docx_file, logger=self.logger)
+        flag, memory = get_report(content, article_list, memory, topics, comment, docx_file, logger=logger)
         self.memory[insight_id] = memory

         if flag:
@@ -67,13 +80,13 @@ class BackendService:
             message = pb.upload('insights', insight_id, 'docx', f'{insight_id}.docx', file)
             file.close()
             if message:
-                self.logger.debug(f'report success finish and update to pb-{message}')
+                logger.debug(f'report success finish and update to pb-{message}')
                 return self.build_out(11, message)
             else:
-                self.logger.error(f'{insight_id} report generate successfully, however failed to update to pb.')
+                logger.error(f'{insight_id} report generate successfully, however failed to update to pb.')
                 return self.build_out(-2, 'report generate successfully, however failed to update to pb.')
         else:
-            self.logger.error(f'{insight_id} failed to generate report, finish.')
+            logger.error(f'{insight_id} failed to generate report, finish.')
             return self.build_out(-11, 'report generate failed.')

     def build_out(self, flag: int, answer: str = "") -> dict:
@@ -87,7 +100,7 @@ class BackendService:
         This function iterates over the ids in the list; for any article id without a translation_result, it triggers translation and updates that article record.
         After this function runs, if flag 11 is returned, the translation_result for each article id can be requested from pb again.
         """
-        self.logger.debug(f'got new translate task {article_ids}')
+        logger.debug(f'got new translate task {article_ids}')
         flag = 11
         msg = ''
         key_cache = []
@@ -96,12 +109,12 @@ class BackendService:
         for article_id in article_ids:
             raw_article = pb.read(collection_name='articles', fields=['abstract', 'title', 'translation_result'], filter=f'id="{article_id}"')
             if not raw_article or not raw_article[0]:
-                self.logger.warning(f'get article {article_id} failed, skipping')
+                logger.warning(f'get article {article_id} failed, skipping')
                 flag = -2
                 msg += f'get article {article_id} failed, skipping\n'
                 continue
             if raw_article[0]['translation_result']:
-                self.logger.debug(f'{article_id} translation_result already exist, skipping')
+                logger.debug(f'{article_id} translation_result already exist, skipping')
                 continue

             key_cache.append(article_id)
@@ -111,21 +124,21 @@ class BackendService:
             if len(en_texts) < 16:
                 continue

-            self.logger.debug(f'translate process - batch {k}')
-            translate_result = text_translate(en_texts, logger=self.logger)
+            logger.debug(f'translate process - batch {k}')
+            translate_result = text_translate(en_texts, logger=logger)
             if translate_result and len(translate_result) == 2*len(key_cache):
                 for i in range(0, len(translate_result), 2):
                     related_id = pb.add(collection_name='article_translation', body={'title': translate_result[i], 'abstract': translate_result[i+1], 'raw': key_cache[int(i/2)]})
                     if not related_id:
-                        self.logger.warning(f'write article_translation {key_cache[int(i/2)]} failed')
+                        logger.warning(f'write article_translation {key_cache[int(i/2)]} failed')
                     else:
                         _ = pb.update(collection_name='articles', id=key_cache[int(i/2)], body={'translation_result': related_id})
                         if not _:
-                            self.logger.warning(f'update article {key_cache[int(i/2)]} failed')
-                self.logger.debug('done')
+                            logger.warning(f'update article {key_cache[int(i/2)]} failed')
+                logger.debug('done')
             else:
                 flag = -6
-                self.logger.warning(f'translate process - api out of service, can not continue job, aborting batch {key_cache}')
+                logger.warning(f'translate process - api out of service, can not continue job, aborting batch {key_cache}')
                 msg += f'failed to batch {key_cache}'

             en_texts = []
@@ -134,27 +147,27 @@ class BackendService:
             # pause 1s every 10 batches to avoid QPS overload
             k += 1
             if k % 10 == 0:
-                self.logger.debug('max token limited - sleep 1s')
+                logger.debug('max token limited - sleep 1s')
                 time.sleep(1)

         if en_texts:
-            self.logger.debug(f'translate process - batch {k}')
-            translate_result = text_translate(en_texts, logger=self.logger)
+            logger.debug(f'translate process - batch {k}')
+            translate_result = text_translate(en_texts, logger=logger)
             if translate_result and len(translate_result) == 2*len(key_cache):
                 for i in range(0, len(translate_result), 2):
                     related_id = pb.add(collection_name='article_translation', body={'title': translate_result[i], 'abstract': translate_result[i+1], 'raw': key_cache[int(i/2)]})
                     if not related_id:
-                        self.logger.warning(f'write article_translation {key_cache[int(i/2)]} failed')
+                        logger.warning(f'write article_translation {key_cache[int(i/2)]} failed')
                     else:
                         _ = pb.update(collection_name='articles', id=key_cache[int(i/2)], body={'translation_result': related_id})
                         if not _:
-                            self.logger.warning(f'update article {key_cache[int(i/2)]} failed')
-                self.logger.debug('done')
+                            logger.warning(f'update article {key_cache[int(i/2)]} failed')
+                logger.debug('done')
             else:
-                self.logger.warning(f'translate process - api out of service, can not continue job, aborting batch {key_cache}')
+                logger.warning(f'translate process - api out of service, can not continue job, aborting batch {key_cache}')
                 msg += f'failed to batch {key_cache}'
                 flag = -6

-        self.logger.debug('translation job done.')
+        logger.debug('translation job done.')
         return self.build_out(flag, msg)

     def more_search(self, insight_id: str) -> dict:
@@ -162,10 +175,10 @@ class BackendService:
         :param insight_id: the insight's id in pb
         :return: on success, returns the updated insight_id (same as the original id); on failure, returns an empty string
         """
-        self.logger.debug(f'got search request for insight {insight_id}')
+        logger.debug(f'got search request for insight {insight_id}')
         insight = pb.read('insights', filter=f'id="{insight_id}"')
         if not insight:
-            self.logger.error(f'insight {insight_id} not found')
+            logger.error(f'insight {insight_id} not found')
             return self.build_out(-2, 'insight not found')

         article_ids = insight[0]['articles']
@@ -175,9 +188,9 @@ class BackendService:
         else:
             url_list = []

-        flag, search_result = search_insight(insight[0]['content'], url_list, logger=self.logger)
+        flag, search_result = search_insight(insight[0]['content'], logger, url_list)
         if flag <= 0:
-            self.logger.debug('no search result, nothing happen')
+            logger.debug('no search result, nothing happen')
             return self.build_out(flag, 'search engine error or no result')

         for item in search_result:
@@ -185,14 +198,14 @@ class BackendService:
             if new_article_id:
                 article_ids.append(new_article_id)
             else:
-                self.logger.warning(f'add article {item} failed, writing to cache_file')
+                logger.warning(f'add article {item} failed, writing to cache_file')
                 with open(os.path.join(self.cache_url, 'cache_articles.json'), 'a', encoding='utf-8') as f:
                     json.dump(item, f, ensure_ascii=False, indent=4)

         message = pb.update(collection_name='insights', id=insight_id, body={'articles': article_ids})
         if message:
-            self.logger.debug(f'insight search success finish and update to pb-{message}')
+            logger.debug(f'insight search success finish and update to pb-{message}')
             return self.build_out(11, insight_id)
         else:
-            self.logger.error(f'{insight_id} search success, however failed to update to pb.')
+            logger.error(f'{insight_id} search success, however failed to update to pb.')
             return self.build_out(-2, 'search success, however failed to update to pb.')
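
For reference, here is a minimal, self-contained sketch of the logging pattern shown above: loguru is configured once at module level and the same logger object is injected into collaborators, instead of each class building its own logging.Logger. DemoTalker and the "." fallback for PROJECT_DIR are hypothetical stand-ins for this sketch; the logger.add options mirror the diff.

import os
from loguru import logger

# In the diff the level comes from get_logger_level(); it is hard-coded here to keep the sketch standalone.
project_dir = os.environ.get("PROJECT_DIR", ".")  # "." is a sketch-only default
os.makedirs(project_dir, exist_ok=True)

logger.add(
    os.path.join(project_dir, 'backend_service.log'),
    level='DEBUG',
    backtrace=True,     # keep the full stack trace when an exception is logged
    diagnose=True,      # annotate traceback frames with variable values
    rotation="50 MB"    # roll over to a new log file at 50 MB
)


class DemoTalker:
    """Hypothetical collaborator that receives the shared logger, like PbTalker above."""

    def __init__(self, shared_logger):
        self.logger = shared_logger

    def ping(self) -> None:
        self.logger.debug('DemoTalker is using the injected logger')


pb = DemoTalker(logger)
pb.ping()
logger.info('backend service init success.')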

View File

@@ -3,10 +3,27 @@
 """
 import schedule
 import time
+import os
 from work_process import ServiceProcesser
-from pb_api import pb
+from general_utils import get_logger_level
+from loguru import logger
+from pb_api import PbTalker

-sp = ServiceProcesser()
+project_dir = os.environ.get("PROJECT_DIR", "")
+os.makedirs(project_dir, exist_ok=True)
+logger_file = os.path.join(project_dir, 'scanning_task.log')
+dsw_log = get_logger_level()
+
+logger.add(
+    logger_file,
+    level=dsw_log,
+    backtrace=True,
+    diagnose=True,
+    rotation="50 MB"
+)
+
+pb = PbTalker(logger)
+sp = ServiceProcesser(pb=pb, logger=logger)

 counter = 0
@@ -31,8 +48,10 @@ def task():
         print('\033[0;33mno work for this loop\033[0m')
     counter += 1


 schedule.every().hour.at(":38").do(task)
+task()

 while True:
     schedule.run_pending()
     time.sleep(60)

View File

@@ -144,6 +144,24 @@ def extract_and_convert_dates(input_string):
         return ''.join(matches[0])
     return None

+
+def get_logger_level() -> str:
+    level_map = {
+        'silly': 'CRITICAL',
+        'verbose': 'DEBUG',
+        'info': 'INFO',
+        'warn': 'WARNING',
+        'error': 'ERROR',
+    }
+    level: str = os.environ.get('WS_LOG', 'info').lower()
+    if level not in level_map:
+        raise ValueError(
+            'WiseFlow LOG should support the values of `silly`, '
+            '`verbose`, `info`, `warn`, `error`'
+        )
+    return level_map.get(level, 'info')
+
+
 """
 # from InternLM/huixiangdou
 # another awesome work
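
A short usage sketch for the helper added above, assuming general_utils is importable and loguru is installed; the demo.log file name and the forced WS_LOG value are illustration only.

import os
from loguru import logger
from general_utils import get_logger_level

os.environ['WS_LOG'] = 'verbose'   # 'verbose' maps to DEBUG in level_map
logger.add('demo.log', level=get_logger_level(), rotation="50 MB")
logger.debug('recorded in demo.log because WS_LOG=verbose selects DEBUG')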

View File

@ -1,65 +0,0 @@
from __future__ import annotations
import logging
import os
from typing import Optional
def _get_logger_level() -> str:
"""
refer to : https://docs.python.org/3/library/logging.html#logging-levels
"""
level_map = {
'silly': 'CRITICAL',
'verbose': 'DEBUG',
'info': 'INFO',
'warn': 'WARNING',
'error': 'ERROR',
'silent': 'NOTSET'
}
level: str = os.environ.get('WS_LOG', 'info').lower()
if level not in level_map:
raise ValueError(
'WiseFlow LOG should support the values of `silly`, '
'`verbose`, `info`, `warn`, `error`, `silent`'
)
return level_map.get(level, 'info')
def get_logger(name: Optional[str] = None, file: Optional[str] = None) -> logging.Logger:
"""get the logger object
Args:
name (str): the logger name
file (Optional[str], optional): the log file name. Defaults to None.
Examples:
logger = get_logger("WechatyPuppet")
logger = get_logger("WechatyPuppet", file="wechaty-puppet.log")
logger.info('log info ...')
Returns:
logging.Logger: the logger object
"""
dsw_log = _get_logger_level()
log_formatter = logging.Formatter(
fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# create logger and set level to debug
logger = logging.getLogger(name)
logger.handlers = []
logger.setLevel(dsw_log)
logger.propagate = False
# create file handler and set level to debug
# file = os.environ.get(DSW_LOG_KEY, file)
if file:
file_handler = logging.FileHandler(file, 'a', encoding='utf-8')
file_handler.setLevel(dsw_log)
file_handler.setFormatter(log_formatter)
logger.addHandler(file_handler)
# create console handler and set level to info
console_handler = logging.StreamHandler()
console_handler.setLevel(dsw_log)
console_handler.setFormatter(log_formatter)
logger.addHandler(console_handler)
return logger

View File

@@ -11,7 +11,7 @@ from bs4 import BeautifulSoup
 # For use inside China, Sogou search should be good enough; it also supports Weixin and Baike search
 # For overseas scenarios, consider DuckDuckGo or the google_search SDK
 # Better not to self-host a search engine; there are open-source options such as https://github.com/StractOrg/stract/tree/main, but that is a separate project in its own right
-def search_insight(keyword: str, exist_urls: list[Union[str, Path]], knowledge: bool = False, logger=None) -> (int, list):
+def search_insight(keyword: str, logger, exist_urls: list[Union[str, Path]], knowledge: bool = False) -> (int, list):
     """
     Search web pages
     :param keyword: the topic to search for
@@ -48,10 +48,8 @@ def search_insight(keyword: str, exist_urls: list[Union[str, Path]], knowledge:
                 if href_f not in exist_urls:
                     relist.append(href_f)
     except Exception as e:
-        if logger:
-            logger.error(f"search {url} error: {e}")
-        else:
-            print(f"search {url} error: {e}")
+        logger.error(f"search {url} error: {e}")

     if not knowledge:
         url = f"https://www.sogou.com/sogou?ie=utf8&p=40230447&interation=1728053249&interV=&pid=sogou-wsse-7050094b04fd9aa3&query={keyword}"
         try:
@@ -71,10 +69,7 @@ def search_insight(keyword: str, exist_urls: list[Union[str, Path]], knowledge:
                     if href_f not in exist_urls:
                         relist.append(href_f)
         except Exception as e:
-            if logger:
-                logger.error(f"search {url} error: {e}")
-            else:
-                print(f"search {url} error: {e}")
+            logger.error(f"search {url} error: {e}")

     if not relist:
         return -7, []
@@ -85,7 +80,7 @@ def search_insight(keyword: str, exist_urls: list[Union[str, Path]], knowledge:
         if url in exist_urls:
             continue
         exist_urls.append(url)
-        flag, value = simple_crawler(url)
+        flag, value = simple_crawler(url, logger)
         if flag != 11:
             continue
         from_site = urlparse(url).netloc

View File

@@ -2,16 +2,13 @@ import os
 from pocketbase import PocketBase  # Client also works the same
 from pocketbase.client import FileUpload
 from typing import BinaryIO
-from get_logger import get_logger


 class PbTalker:
-    def __init__(self) -> None:
-        self.project_dir = os.environ.get("PROJECT_DIR", "")
+    def __init__(self, logger) -> None:
         # 1. base initialization
-        os.makedirs(self.project_dir, exist_ok=True)
-        self.logger = get_logger(name='pb_talker', file=os.path.join(self.project_dir, 'pb_talker.log'))
         url = f"http://{os.environ.get('PB_API_BASE', '127.0.0.1:8090')}"
+        self.logger = logger
         self.logger.debug(f"initializing pocketbase client: {url}")
         self.client = PocketBase(url)
         auth = os.environ.get('PB_API_AUTH', '')
@@ -82,6 +79,3 @@ class PbTalker:
             self.logger.error(f"pocketbase update failed: {e}")
             return ''
         return res.id
-
-
-pb = PbTalker()

View File

@@ -14,4 +14,5 @@ faiss-cpu # for cpu-only environment
 pocketbase==0.10.0
 gne
 chardet
 schedule
+loguru

View File

@@ -10,15 +10,10 @@ from datetime import datetime, date
 from requests.compat import urljoin
 import chardet
 from general_utils import extract_and_convert_dates
-from get_logger import get_logger
-import os

 header = {
     'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/604.1 Edg/112.0.100.0'}

-project_dir = os.environ.get("PROJECT_DIR", "")
-os.makedirs(project_dir, exist_ok=True)
-logger = get_logger(name='general_scraper', file=os.path.join(project_dir, f'general_scraper.log'))


 def tag_visible(element: Comment) -> bool:
@@ -75,7 +70,7 @@ sys_info = '''You are an HTML web-page parser. You will receive a piece of text the user copied from a web
 '''


-def llm_crawler(url: str | Path) -> (int, dict):
+def llm_crawler(url: str | Path, logger) -> (int, dict):
     """
     Returns the article info dict and a flag: negative means error, 0 means no result, 11 means success
     Reference: https://mp.weixin.qq.com/s/4J-kofsfFDiV1FxGlTJLfA
@@ -152,7 +147,7 @@ def llm_crawler(url: str | Path) -> (int, dict):
     return 11, info


-def general_scraper(site: str, expiration: date, existing: list[str]) -> list[dict]:
+def general_scraper(site: str, expiration: date, existing: list[str], logger) -> list[dict]:
     try:
         with httpx.Client() as client:
             response = client.get(site, headers=header, timeout=30)
@@ -171,9 +166,9 @@ def general_scraper(site: str, expiration: date, existing: list[str]) -> list[di
         if site in existing:
             logger.debug(f"{site} has been crawled before, skip it")
             return []
-        flag, result = simple_crawler(site)
+        flag, result = simple_crawler(site, logger)
         if flag != 11:
-            flag, result = llm_crawler(site)
+            flag, result = llm_crawler(site, logger)
             if flag != 11:
                 return []
         publish_date = datetime.strptime(result['publish_time'], '%Y%m%d')
@@ -189,9 +184,9 @@ def general_scraper(site: str, expiration: date, existing: list[str]) -> list[di
             logger.debug(f"{url} has been crawled before, skip it")
             continue
         existing.append(url)
-        flag, result = simple_crawler(url)
+        flag, result = simple_crawler(url, logger)
         if flag != 11:
-            flag, result = llm_crawler(url)
+            flag, result = llm_crawler(url, logger)
             if flag != 11:
                 continue
         publish_date = datetime.strptime(result['publish_time'], '%Y%m%d')

View File

@@ -5,19 +5,14 @@ from datetime import datetime
 from pathlib import Path
 from general_utils import extract_and_convert_dates
 import chardet
-from get_logger import get_logger
-import os

 extractor = GeneralNewsExtractor()
 header = {
     'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/604.1 Edg/112.0.100.0'}

-project_dir = os.environ.get("PROJECT_DIR", "")
-os.makedirs(project_dir, exist_ok=True)
-logger = get_logger(name='simple_crawler', file=os.path.join(project_dir, f'simple_crawler.log'))

-def simple_crawler(url: str | Path) -> (int, dict):
+
+def simple_crawler(url: str | Path, logger) -> (int, dict):
     """
     Returns the article info dict and a flag: negative means error, 0 means no result, 11 means success
     """

View File

@@ -1,11 +1,9 @@
 import os
 import json
 import requests
-from get_logger import get_logger
 from datetime import datetime, timedelta, date
 from scrapers import scraper_map
 from scrapers.general_scraper import general_scraper
-from pb_api import pb
 from urllib.parse import urlparse
 from get_insight import get_insight
 from general_utils import is_chinese
@@ -20,14 +18,13 @@ expiration_str = expiration_date.strftime("%Y%m%d")
 class ServiceProcesser:
-    def __init__(self, name: str = 'Work_Processor', record_snapshot: bool = False):
-        self.name = name
+    def __init__(self, pb, logger, record_snapshot: bool = False):
         self.project_dir = os.environ.get("PROJECT_DIR", "")
         # 1. base initialization
-        self.cache_url = os.path.join(self.project_dir, name)
+        self.cache_url = os.path.join(self.project_dir, 'scanning_task')
         os.makedirs(self.cache_url, exist_ok=True)
-        self.logger = get_logger(name=self.name, file=os.path.join(self.project_dir, f'{self.name}.log'))
+        self.pb = pb
+        self.logger = logger

         # 2. load the llm
         # self.llm = LocalLlmWrapper()  # if you use the local-llm
@@ -39,7 +36,7 @@ class ServiceProcesser:
         else:
             self.snap_short_server = None

-        self.logger.info(f'{self.name} init success.')
+        self.logger.info('scanning task init success.')

     def __call__(self, expiration: date = expiration_date, sites: list[str] = None):
         # clear the cache first
@@ -48,7 +45,7 @@ class ServiceProcesser:
             self.logger.debug(f'clear cache -- {cache}')
         # read all article urls from the pb database
         # publish_time is stored as an int here; all things considered this is the easiest format to work with, even if a bit crude
-        existing_articles = pb.read(collection_name='articles', fields=['id', 'title', 'url'], filter=f'publish_time>{expiration_str}')
+        existing_articles = self.pb.read(collection_name='articles', fields=['id', 'title', 'url'], filter=f'publish_time>{expiration_str}')
         all_title = {}
         existings = []
         for article in existing_articles:
@@ -64,7 +61,7 @@ class ServiceProcesser:
                 if site in scraper_map:
                     futures.append(executor.submit(scraper_map[site], expiration, existings))
                 else:
-                    futures.append(executor.submit(general_scraper, site, expiration, existings))
+                    futures.append(executor.submit(general_scraper, site, expiration, existings, self.logger))
             concurrent.futures.wait(futures)
             for future in futures:
                 try:
@@ -83,7 +80,7 @@ class ServiceProcesser:
                     value['content'] = f"({from_site} 报道){value['content']}"
                     value['images'] = json.dumps(value['images'])
-                    article_id = pb.add(collection_name='articles', body=value)
+                    article_id = self.pb.add(collection_name='articles', body=value)

                     if article_id:
                         cache[article_id] = value
@@ -103,13 +100,13 @@ class ServiceProcesser:
             for insight in new_insights:
                 if not insight['content']:
                     continue
-                insight_id = pb.add(collection_name='insights', body=insight)
+                insight_id = self.pb.add(collection_name='insights', body=insight)
                 if not insight_id:
                     self.logger.warning(f'write insight {insight} to pb failed, writing to cache_file')
                     with open(os.path.join(self.cache_url, 'cache_insights.json'), 'a', encoding='utf-8') as f:
                         json.dump(insight, f, ensure_ascii=False, indent=4)
                 for article_id in insight['articles']:
-                    raw_article = pb.read(collection_name='articles', fields=['abstract', 'title', 'translation_result'], filter=f'id="{article_id}"')
+                    raw_article = self.pb.read(collection_name='articles', fields=['abstract', 'title', 'translation_result'], filter=f'id="{article_id}"')
                     if not raw_article or not raw_article[0]:
                         self.logger.warning(f'get article {article_id} failed, skipping')
                         continue
@@ -119,11 +116,11 @@ class ServiceProcesser:
                         continue
                     translate_text = text_translate([raw_article[0]['title'], raw_article[0]['abstract']], target_language='zh', logger=self.logger)
                     if translate_text:
-                        related_id = pb.add(collection_name='article_translation', body={'title': translate_text[0], 'abstract': translate_text[1], 'raw': article_id})
+                        related_id = self.pb.add(collection_name='article_translation', body={'title': translate_text[0], 'abstract': translate_text[1], 'raw': article_id})
                         if not related_id:
                             self.logger.warning(f'write article_translation {article_id} failed')
                         else:
-                            _ = pb.update(collection_name='articles', id=article_id, body={'translation_result': related_id})
+                            _ = self.pb.update(collection_name='articles', id=article_id, body={'translation_result': related_id})
                             if not _:
                                 self.logger.warning(f'update article {article_id} failed')
                     else:
@@ -139,7 +136,7 @@ class ServiceProcesser:
                     else:
                         text_for_insight = text_translate([value['title']], logger=self.logger)
                         if text_for_insight:
-                            insight_id = pb.add(collection_name='insights', body={'content': text_for_insight[0], 'articles': [key]})
+                            insight_id = self.pb.add(collection_name='insights', body={'content': text_for_insight[0], 'articles': [key]})
                             if not insight_id:
                                 self.logger.warning(f'write insight {text_for_insight[0]} to pb failed, writing to cache_file')
                                 with open(os.path.join(self.cache_url, 'cache_insights.json'), 'a',
@@ -156,7 +153,7 @@ class ServiceProcesser:
                 try:
                     snapshot = requests.get(f"{self.snap_short_server}/zip", {'url': value['url']}, timeout=60)
                     file = open(snapshot.text, 'rb')
-                    _ = pb.upload('articles', key, 'snapshot', key, file)
+                    _ = self.pb.upload('articles', key, 'snapshot', key, file)
                     file.close()
                 except Exception as e:
                     self.logger.warning(f'error when snapshot {value["url"]}, {e}')