improve the crawler

This commit is contained in:
bigbrother666 2024-04-09 11:38:51 +08:00
parent 25abb316b3
commit 01e2aaf3b1
8 changed files with 114 additions and 190 deletions

View File

@ -16,9 +16,8 @@ else:
urls = []
sp = ServiceProcesser()
sp(sites=urls)
'''
def task():
sp(sites=urls)
@ -29,6 +28,3 @@ schedule.every().day.at("01:17").do(task)
while True:
schedule.run_pending()
time.sleep(60)
site1 = https://www.welivesecurity.com/en/
site2 = https://www.scmagazine.com/
'''

View File

@ -125,6 +125,25 @@ def is_chinese(string):
return (non_chinese_count/len(string)) < 0.68
# Date layouts recognised by extract_and_convert_dates, in priority order.
# Each pattern captures (year, month, day) as three digit groups.
_DATE_PATTERNS = tuple(re.compile(p) for p in (
    r'(\d{4})-(\d{2})-(\d{2})',    # YYYY-MM-DD
    r'(\d{4})/(\d{2})/(\d{2})',    # YYYY/MM/DD
    r'(\d{4})\.(\d{2})\.(\d{2})',  # YYYY.MM.DD
    r'(\d{4})\\(\d{2})\\(\d{2})',  # YYYY\MM\DD
    r'(\d{4})(\d{2})(\d{2})',      # YYYYMMDD
))


def extract_and_convert_dates(input_string):
    """Find the first date in *input_string* and return it as 'YYYYMMDD'.

    The layouts in ``_DATE_PATTERNS`` are tried in order; an earlier layout
    wins over a later one regardless of where it occurs in the text. Within
    one layout the left-most match that is a real calendar date is used.

    Args:
        input_string: Text that may contain a date. ``None``, non-string
            values and empty strings are accepted and yield ``None``.

    Returns:
        The date as an 8-character 'YYYYMMDD' string, or ``None`` when no
        valid date is found.
    """
    from datetime import date

    if not isinstance(input_string, str) or not input_string:
        return None

    for pattern in _DATE_PATTERNS:
        for match in pattern.finditer(input_string):
            year, month, day = match.groups()
            try:
                # Reject digit runs that merely look like a date (month 13,
                # Feb 30, ...): callers feed the result to
                # datetime.strptime(..., '%Y%m%d'), which would raise.
                date(int(year), int(month), int(day))
            except ValueError:
                continue
            return year + month + day
    return None
"""
# from InternLM/huixiangdou
# another awesome work

View File

@ -85,7 +85,7 @@ def search_insight(keyword: str, exist_urls: list[Union[str, Path]], knowledge:
if url in exist_urls:
continue
exist_urls.append(url)
flag, value = simple_crawler(url, logger)
flag, value = simple_crawler(url)
if flag != 11:
continue
from_site = urlparse(url).netloc

View File

@ -15,7 +15,6 @@
输入:
- expiration datetime的date.date()对象,爬虫应该只抓取这之后(含这一天)的文章
- existings[str], 数据库已有文章的url列表,爬虫应该忽略这个列表里面的url
- logger主流程的logger对象如果爬虫需要单独logger这个logger接收了可以不用
输出:
- [dict]返回结果列表每个dict代表一个文章格式如下

View File

@ -2,18 +2,22 @@ from pathlib import Path
from urllib.parse import urlparse
import re
from .simple_crawler import simple_crawler
import json
import requests
import httpx
from bs4 import BeautifulSoup
from bs4.element import Comment
from llms.dashscope_wrapper import dashscope_llm
from datetime import datetime, date
from requests.compat import urljoin
import chardet
from general_utils import extract_and_convert_dates
from get_logger import get_logger
import os
header = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/604.1 Edg/112.0.100.0'}
project_dir = os.environ.get("PROJECT_DIR", "")
logger = get_logger(name='general_scraper', file=os.path.join(project_dir, f'general_scraper.log'))
def tag_visible(element: Comment) -> bool:
@ -35,83 +39,56 @@ def text_from_soup(soup: BeautifulSoup) -> str:
def parse_html_content(out: str) -> dict:
# 发现llm出来的结果有时会在键值或者内容的引号外面出现\n \t安全起见全部去除反正后续分析时llm也不看内容的换行这些
dct = {'title': '', 'abstract': '', 'content': '', 'publish_time': ''}
pattern = re.compile(r'\"\"\"(.*?)\"\"\"', re.DOTALL)
result = pattern.findall(out)
out = result[0]
dict_str = out.strip("```").strip("python").strip("json").strip()
dict_str = dict_str.replace("\n", "").replace("\t", "")
# 先正则解析出{}中的内容
dict_str = re.findall(r'{(.*?)}', dict_str)
# dict_str = dict_str[0].replace("'", '"') #会误伤
# json loads 要求双引号, 且需要把\n等转译
dct = json.loads('{' + dict_str[0] + '}')
date_str = re.findall(r"\d{4}-\d{2}-\d{2}", dct['publish_time'])
if date_str:
dct['publish_time'] = date_str[0].replace("-", "")
result = result[0].strip()
dict_strs = result.split('||')
if not dict_strs:
dict_strs = result.split('|||')
if not dict_strs:
return dct
if len(dict_strs) == 3:
dct['title'] = dict_strs[0].strip()
dct['content'] = dict_strs[1].strip()
elif len(dict_strs) == 4:
dct['title'] = dict_strs[0].strip()
dct['content'] = dict_strs[2].strip()
dct['abstract'] = dict_strs[1].strip()
else:
date_str = re.findall(r"\d{4}\.\d{2}\.\d{2}", dct['publish_time'])
if date_str:
dct['publish_time'] = date_str[0].replace(".", "")
else:
date_str = re.findall(r"\d{4}\d{2}\d{2}", dct['publish_time'])
if date_str:
dct['publish_time'] = date_str[0]
else:
dct['publish_time'] = datetime.strftime(datetime.today(), "%Y%m%d")
return dct
date_str = extract_and_convert_dates(dict_strs[-1])
if date_str:
dct['publish_time'] = date_str
else:
dct['publish_time'] = datetime.strftime(datetime.today(), "%Y%m%d")
return dct
sys_info = '''你是一个html网页解析器你将接收一段用户从网页html文件中提取的文本请解析出其标题、摘要、内容和发布日期。
发布日期的格式为XXXX-XX-XX如果找不到则为空内容不要包含标题作者和发布日期
请务必按照Python字典的格式输出key和value使用双引号包裹key分别为titleabstractcontent和publish_time输出结果请整体用三引号包裹如下所示
# qwen1.5-72b解析json格式太容易出错网页上的情况太多比如经常直接使用英文的"这样后面json.loads就容易出错……
sys_info = '''你是一个html网页解析器你将接收一段用户从网页html文件中提取的文本请解析出其标题、摘要、内容和发布日期发布日期格式为YYYY-MM-DD
结果请按照以下格式返回整体用三引号包裹
"""
{"title": "解析出的标题", "abstract": "解析出的摘要", "content": "解析出的内容", "publish_time": "解析出的发布日期XXXX-XX-XX"}
"""'''
标题||摘要||内容||发布日期XXXX-XX-XX
"""
'''
def llm_crawler(url: str | Path, logger=None) -> (int, dict):
def llm_crawler(url: str | Path) -> (int, dict):
"""
返回文章信息dict和flag负数为报错0为没有结果11为成功
参考https://mp.weixin.qq.com/s/4J-kofsfFDiV1FxGlTJLfA
测试URL
url = "https://so.html5.qq.com/page/real/search_news?docid=70000021_40665eb6afe80152"
url = "https://mp.weixin.qq.com/s?src=11&timestamp=1709999167&ver=5128&signature=e0Tssc4COc*p-RkKaPwUMrGePUxko8N621VxARnI8uKDg*l5C7Z8gBC6RDUAnyGqvmzJ5WEzvaO-T7GvMRw9LwNaJS3Hh2tyaITdmsaVtY9JsSmsidX6u4SqxirGsRdo&new=1"
"""
# 发送 HTTP 请求获取网页内容
try:
response = requests.get(url, timeout=60)
except:
if logger:
logger.error(f"cannot connect {url}")
else:
print(f"cannot connect {url}")
return -7, {}
if response.status_code != 200:
if logger:
logger.error(f"cannot connect {url}")
else:
print(f"cannot connect {url}")
return -7, {}
rawdata = response.content
encoding = chardet.detect(rawdata)['encoding']
if encoding is not None and encoding.lower() == 'utf-8':
try:
with httpx.Client() as client:
response = client.get(url, headers=header, timeout=30)
rawdata = response.content
encoding = chardet.detect(rawdata)['encoding']
text = rawdata.decode(encoding)
except:
if logger:
logger.error(f"{url} decode error, aborting")
else:
print(f"{url} decode error, aborting")
return 0, {}
else:
if logger:
logger.error(f"{url} undetected coding, aborting")
else:
print(f"{url} undetected coding, aborting")
return 0, {}
except Exception as e:
logger.error(e)
return -7, {}
# 使用 BeautifulSoup 解析 HTML 内容
soup = BeautifulSoup(text, "html.parser")
@ -120,17 +97,12 @@ def llm_crawler(url: str | Path, logger=None) -> (int, dict):
html_lines = [line.strip() for line in html_lines if line.strip()]
html_text = "\n".join(html_lines)
if len(html_text) > 29999:
if logger:
logger.warning(f"{url} content too long for llm parsing")
else:
print(f"{url} content too long for llm parsing")
logger.warning(f"{url} content too long for llm parsing")
return 0, {}
if not html_text or html_text.startswith('服务器错误') or html_text.startswith('您访问的页面') or html_text.startswith('403'):
if logger:
logger.warning(f"can not get {url} from the Internet")
else:
print(f"can not get {url} from the Internet")
if not html_text or html_text.startswith('服务器错误') or html_text.startswith('您访问的页面') or html_text.startswith('403')\
or html_text.startswith('出错了'):
logger.warning(f"can not get {url} from the Internet")
return -7, {}
messages = [
@ -140,19 +112,13 @@ def llm_crawler(url: str | Path, logger=None) -> (int, dict):
llm_output = dashscope_llm(messages, "qwen1.5-72b-chat", logger=logger)
try:
info = parse_html_content(llm_output)
except Exception as e:
except Exception:
msg = f"can not parse {llm_output}"
if logger:
logger.warning(msg)
else:
print(msg)
logger.debug(msg)
return 0, {}
if len(info['title']) < 5 or len(info['content']) < 24:
if logger:
logger.warning(f"{info} not valid")
else:
print(f"{info} not valid")
if len(info['title']) < 4 or len(info['content']) < 24:
logger.debug(f"{info} not valid")
return 0, {}
info["url"] = str(url)
@ -185,21 +151,12 @@ def llm_crawler(url: str | Path, logger=None) -> (int, dict):
return 11, info
def general_scraper(site: str, expiration: date, existing: list[str], logger=None) -> list[dict]:
def general_scraper(site: str, expiration: date, existing: list[str]) -> list[dict]:
try:
response = requests.get(site, header, timeout=60)
except:
if logger:
logger.error(f"cannot connect {site}")
else:
print(f"cannot connect {site}")
return []
if response.status_code != 200:
if logger:
logger.error(f"cannot connect {site}")
else:
print(f"cannot connect {site}")
with httpx.Client() as client:
response = client.get(site, headers=header, timeout=30)
except Exception as e:
logger.error(e)
return []
page_source = response.text
@ -209,25 +166,18 @@ def general_scraper(site: str, expiration: date, existing: list[str], logger=Non
base_url = parsed_url.scheme + '://' + parsed_url.netloc
urls = [urljoin(base_url, link["href"]) for link in soup.find_all("a", href=True)]
if not urls:
if logger:
logger.warning(f"can not find any link from {site}, maybe it's an article site...")
logger.warning(f"can not find any link from {site}, maybe it's an article site...")
if site in existing:
if logger:
logger.warning(f"{site} has been crawled before, skip it")
else:
print(f"{site} has been crawled before, skip it")
logger.debug(f"{site} has been crawled before, skip it")
return []
flag, result = simple_crawler(site, logger=logger)
flag, result = simple_crawler(site)
if flag != 11:
flag, result = llm_crawler(site, logger=logger)
flag, result = llm_crawler(site)
if flag != 11:
return []
publish_date = datetime.strptime(result['publish_time'], '%Y%m%d')
if publish_date.date() < expiration:
if logger:
logger.warning(f"{site} is too old, skip it")
else:
print(f"{site} is too old, skip it")
logger.debug(f"{site} is too old, skip it")
return []
else:
return [result]
@ -235,23 +185,17 @@ def general_scraper(site: str, expiration: date, existing: list[str], logger=Non
articles = []
for url in urls:
if url in existing:
if logger:
logger.warning(f"{url} has been crawled before, skip it")
else:
print(f"{url} has been crawled before, skip it")
logger.debug(f"{url} has been crawled before, skip it")
continue
existing.append(url)
flag, result = simple_crawler(url, logger=logger)
flag, result = simple_crawler(url)
if flag != 11:
flag, result = llm_crawler(url, logger=logger)
flag, result = llm_crawler(url)
if flag != 11:
continue
publish_date = datetime.strptime(result['publish_time'], '%Y%m%d')
if publish_date.date() < expiration:
if logger:
logger.warning(f"{url} is too old, skip it")
else:
print(f"{url} is too old, skip it")
logger.debug(f"{url} is too old, skip it")
else:
articles.append(result)

View File

@ -1,90 +1,54 @@
from gne import GeneralNewsExtractor
import requests
import httpx
from bs4 import BeautifulSoup
from datetime import datetime
from pathlib import Path
import re
from general_utils import extract_and_convert_dates
import chardet
from get_logger import get_logger
import os
extractor = GeneralNewsExtractor()
header = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/604.1 Edg/112.0.100.0'}
project_dir = os.environ.get("PROJECT_DIR", "")
logger = get_logger(name='simple_crawler', file=os.path.join(project_dir, f'simple_crawler.log'))
def simple_crawler(url: str | Path, logger=None) -> (int, dict):
def simple_crawler(url: str | Path) -> (int, dict):
"""
返回文章信息dict和flag负数为报错0为没有结果11为成功
"""
try:
response = requests.get(url, header, timeout=60)
except:
if logger:
logger.error(f"cannot connect {url}")
else:
print(f"cannot connect {url}")
return -7, {}
if response.status_code != 200:
if logger:
logger.error(f"cannot connect {url}")
else:
print(f"cannot connect {url}")
return -7, {}
rawdata = response.content
encoding = chardet.detect(rawdata)['encoding']
if encoding is not None and encoding.lower() == 'utf-8':
try:
with httpx.Client() as client:
response = client.get(url, headers=header, timeout=30)
rawdata = response.content
encoding = chardet.detect(rawdata)['encoding']
text = rawdata.decode(encoding)
except:
if logger:
logger.error(f"{url} decode error, aborting")
else:
print(f"{url} decode error, aborting")
return 0, {}
else:
if logger:
logger.error(f"{url} undetected coding, aborting")
else:
print(f"{url} undetected coding, aborting")
return 0, {}
except Exception as e:
logger.error(e)
return -7, {}
result = extractor.extract(text)
if not result:
if logger:
logger.error(f"gne cannot extract {url}")
else:
print(f"gne cannot extract {url}")
logger.error(f"gne cannot extract {url}")
return 0, {}
if len(result['title']) < 5 or len(result['content']) < 24:
if logger:
logger.warning(f"{result} not valid")
else:
print(f"{result} not valid")
if len(result['title']) < 4 or len(result['content']) < 24:
logger.info(f"{result} not valid")
return 0, {}
if result['title'].startswith('服务器错误') or result['title'].startswith('您访问的页面') or result['title'].startswith('403'):
if logger:
logger.warning(f"can not get {url} from the Internet")
else:
print(f"can not get {url} from the Internet")
if result['title'].startswith('服务器错误') or result['title'].startswith('您访问的页面') or result['title'].startswith('403')\
or result['content'].startswith('This website uses cookies') or result['title'].startswith('出错了'):
logger.warning(f"can not get {url} from the Internet")
return -7, {}
date_str = re.findall(r"\d{4}-\d{2}-\d{2}", result['publish_time'])
date_str = extract_and_convert_dates(result['publish_time'])
if date_str:
result['publish_time'] = date_str[0].replace("-", "")
result['publish_time'] = date_str
else:
date_str = re.findall(r"\d{4}\.\d{2}\.\d{2}", result['publish_time'])
if date_str:
result['publish_time'] = date_str[0].replace(".", "")
else:
date_str = re.findall(r"\d{4}\d{2}\d{2}", result['publish_time'])
if date_str:
result['publish_time'] = date_str[0]
else:
result['publish_time'] = datetime.strftime(datetime.today(), "%Y%m%d")
result['publish_time'] = datetime.strftime(datetime.today(), "%Y%m%d")
soup = BeautifulSoup(text, "html.parser")
try:
@ -93,7 +57,7 @@ def simple_crawler(url: str | Path, logger=None) -> (int, dict):
result['abstract'] = meta_description["content"]
else:
result['abstract'] = ''
except:
except Exception:
result['abstract'] = ''
result['url'] = str(url)

View File

@ -59,13 +59,13 @@ class ServiceProcesser:
# 定义扫描源列表如果不指定就默认遍历scraper_map, 另外这里还要考虑指定的source不在scraper_map的情况这时应该使用通用爬虫
sources = sites if sites else list(scraper_map.keys())
new_articles = []
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = []
for site in sources:
if site in scraper_map:
futures.append(executor.submit(scraper_map[site], expiration, existings, self.logger))
futures.append(executor.submit(scraper_map[site], expiration, existings))
else:
futures.append(executor.submit(general_scraper, site, expiration, existings, self.logger))
futures.append(executor.submit(general_scraper, site, expiration, existings))
concurrent.futures.wait(futures)
for future in futures:
try:

View File

@ -11,4 +11,6 @@ report_type = 网络安全情报
[sites]
site3 = https://www.hackread.com/
site2 = http://sh.people.com.cn/
site1 = https://www.xuexi.cn/
site1 = https://www.xuexi.cn/
site4 = https://www.defensenews.com/
site5 = https://www.meritalk.com