scrapers updated

This commit is contained in:
bigbrother666 2024-06-19 10:05:10 +08:00
parent 3859d8974a
commit 82f0041469
20 changed files with 457 additions and 429 deletions

View File

@ -138,7 +138,7 @@ For commercial use and customization cooperation, please contact **Email: 352529
 - Commercial customers, please register with us. The product promises to be free forever.
 - For customized customers, we provide the following services according to your sources and business needs:
-- Custom proprietary parsers
+- Dedicated crawler and parser for customer business scenario sources
 - Customized information extraction and classification strategies
 - Targeted LLM recommendations or even fine-tuning services
 - Private deployment services

View File

@ -141,7 +141,7 @@ SiliconFlow 在线推理服务兼容openai SDK并同时提供上述三个模
 - 商用客户请联系我们报备登记,产品承诺永远免费。)
 - 对于定制客户,我们会针对您的信源和业务需求提供如下服务:
-- 定制专有解析器
+- 针对客户业务场景信源的专用爬虫和解析器
 - 定制信息提取和分类策略
 - 针对性llm推荐甚至微调服务
 - 私有化部署服务

View File

@ -136,7 +136,7 @@ Für kommerzielle Nutzung und maßgeschneiderte Kooperationen kontaktieren Sie u
 - Kommerzielle Kunden, bitte registrieren Sie sich bei uns. Das Produkt verspricht für immer kostenlos zu sein.
 - Für maßgeschneiderte Kunden bieten wir folgende Dienstleistungen basierend auf Ihren Quellen und geschäftlichen Anforderungen:
-- Benutzerdefinierte proprietäre Parser
+- Dedizierter Crawler und Parser für Kunden-Geschäftsszenario-Quellen
 - Angepasste Strategien zur Informationsextraktion und -klassifizierung
 - Zielgerichtete LLM-Empfehlungen oder sogar Feinabstimmungsdienste
 - Dienstleistungen für private Bereitstellungen

View File

@ -139,7 +139,7 @@ Pour une utilisation commerciale et des coopérations de personnalisation, veuil
 - Clients commerciaux, veuillez vous inscrire auprès de nous. Le produit promet d'être gratuit pour toujours.
 - Pour les clients ayant des besoins spécifiques, nous offrons les services suivants en fonction de vos sources et besoins commerciaux :
-- Parseurs propriétaires personnalisés
+- Crawler et analyseur dédiés pour les sources de scénarios commerciaux des clients
 - Stratégies d'extraction et de classification de l'information sur mesure
 - Recommandations LLM ciblées ou même services de fine-tuning
 - Services de déploiement privé

View File

@ -136,7 +136,7 @@ SiliconFlow のオンライン推論サービスはOpenAI SDKと互換性があ
 - 商用顧客の方は、登録をお願いします。この製品は永久に無料であることをお約束します。
 - カスタマイズが必要な顧客のために、ソースとビジネスニーズに応じて以下のサービスを提供します:
-- カスタム専用パーサー
+- お客様のビジネスシーンソース用の専用クローラーとパーサー
 - カスタマイズされた情報抽出と分類戦略
 - 特定の LLM 推奨または微調整サービス
 - プライベートデプロイメントサービス

View File

@ -32,9 +32,10 @@ see more (when backend started) http://127.0.0.1:7777/docs
 ```
 wiseflow
 |- dockerfiles
-|- tasks.py
-|- backend.py
+|- ...
 |- core
+    |- tasks.py
+    |- backend.py
     |- insights
         |- __init__.py  # main process
         |- get_info.py  # module use llm to get a summary of information and match tags
@ -48,6 +49,6 @@ wiseflow
     |- utils  # tools
 ```
-Although the two general-purpose page parsers included in wiseflow can be applied to the parsing of most static pages, for actual business, we still recommend that customers subscribe to our professional information service (supporting designated sources), or write their own proprietary crawlers.
+Although the general_scraper included in wiseflow can be applied to the parsing of most static pages, for actual business, we still recommend that customers write their own crawlers targeting their actual information sources.
 
 See core/scrapers/README.md for integration instructions for proprietary crawlers

View File

@ -2,7 +2,7 @@ from fastapi import FastAPI, BackgroundTasks
 from pydantic import BaseModel
 from typing import Literal, Optional
 from fastapi.middleware.cors import CORSMiddleware
-from insights import pipeline
+from insights import message_manager
 
 class Request(BaseModel):
@ -41,5 +41,5 @@ def read_root():
 @app.post("/feed")
 async def call_to_feed(background_tasks: BackgroundTasks, request: Request):
-    background_tasks.add_task(pipeline, _input=request.model_dump())
+    background_tasks.add_task(message_manager, _input=request.model_dump())
     return {"msg": "received well"}

View File

@ -1,13 +1,13 @@
 # -*- coding: utf-8 -*-
-from scrapers import *
+from scrapers.general_crawler import general_crawler
 from utils.general_utils import extract_urls, compare_phrase_with_list
 from .get_info import get_info, pb, project_dir, logger, info_rewrite
 import os
 import json
 from datetime import datetime, timedelta
-from urllib.parse import urlparse
 import re
+import asyncio
 
 # The XML parsing scheme is not used because there are abnormal characters in the XML code extracted from the weixin public_msg
@ -19,119 +19,57 @@ expiration_days = 3
existing_urls = [url['url'] for url in pb.read(collection_name='articles', fields=['url']) if url['url']] existing_urls = [url['url'] for url in pb.read(collection_name='articles', fields=['url']) if url['url']]
async def get_articles(urls: list[str], expiration: datetime, cache: dict = {}) -> list[dict]: async def pipeline(url: str, cache: dict = {}):
articles = [] working_list = [url]
for url in urls: while working_list:
logger.debug(f"fetching {url}") url = working_list[0]
if url.startswith('https://mp.weixin.qq.com') or url.startswith('http://mp.weixin.qq.com'): working_list.pop(0)
flag, result = await mp_crawler(url, logger) logger.debug(f"start processing {url}")
else:
flag, result = await general_crawler(url, logger)
if flag != 11: # get article process
flag, result = await general_crawler(url, logger)
if flag == 1:
logger.info('get new url list, add to work list')
to_add = [u for u in result if u not in existing_urls and u not in working_list]
working_list.extend(to_add)
continue
elif flag <= 0:
logger.error("got article failed, pipeline abort")
# existing_urls.append(url)
continue continue
existing_urls.append(url) expiration = datetime.now() - timedelta(days=expiration_days)
expiration_date = expiration.strftime('%Y-%m-%d') expiration_date = expiration.strftime('%Y-%m-%d')
article_date = int(result['publish_time']) article_date = int(result['publish_time'])
if article_date < int(expiration_date.replace('-', '')): if article_date < int(expiration_date.replace('-', '')):
logger.info(f"publish date is {article_date}, too old, skip") logger.info(f"publish date is {article_date}, too old, skip")
existing_urls.append(url)
continue continue
if url in cache: for k, v in cache.items():
for k, v in cache[url].items(): if v:
if v: result[k] = v
result[k] = v
articles.append(result)
return articles # get info process
logger.debug(f"article: {result['title']}")
insights = get_info(f"title: {result['title']}\n\ncontent: {result['content']}")
article_id = pb.add(collection_name='articles', body=result)
async def pipeline(_input: dict):
cache = {}
source = _input['user_id'].split('@')[-1]
logger.debug(f"received new task, user: {source}, Addition info: {_input['addition']}")
global existing_urls
expiration_date = datetime.now() - timedelta(days=expiration_days)
# If you can get the url list of the articles from the input content, then use the get_articles function here directly;
# otherwise, you should use a proprietary site scaper (here we provide a general scraper to ensure the basic effect)
if _input['type'] == 'publicMsg':
items = item_pattern.findall(_input["content"])
# Iterate through all < item > content, extracting < url > and < summary >
for item in items:
url_match = url_pattern.search(item)
url = url_match.group(1) if url_match else None
if not url:
logger.warning(f"can not find url in \n{item}")
continue
# URL processing, http is replaced by https, and the part after chksm is removed.
url = url.replace('http://', 'https://')
cut_off_point = url.find('chksm=')
if cut_off_point != -1:
url = url[:cut_off_point-1]
if url in existing_urls:
logger.debug(f"{url} has been crawled, skip")
continue
if url in cache:
logger.debug(f"{url} already find in item")
continue
summary_match = summary_pattern.search(item)
summary = summary_match.group(1) if summary_match else None
cache[url] = {'source': source, 'abstract': summary}
articles = await get_articles(list(cache.keys()), expiration_date, cache)
elif _input['type'] == 'site':
# for the site url, Usually an article list page or a website homepage
# need to get the article list page
# You can use a general scraper, or you can customize a site-specific crawler, see scrapers/README_CN.md
urls = extract_urls(_input['content'])
if not urls:
logger.debug(f"can not find any url in\n{_input['content']}")
return
articles = []
for url in urls:
parsed_url = urlparse(url)
domain = parsed_url.netloc
if domain in scraper_map:
result = scraper_map[domain](url, expiration_date.date(), existing_urls, logger)
else:
result = await general_scraper(url, expiration_date.date(), existing_urls, logger)
articles.extend(result)
elif _input['type'] == 'text':
urls = extract_urls(_input['content'])
if not urls:
logger.debug(f"can not find any url in\n{_input['content']}\npass...")
return
articles = await get_articles(urls, expiration_date)
elif _input['type'] == 'url':
# this is remained for wechat shared mp_article_card
# todo will do it in project awada (need finish the generalMsg api first)
articles = []
else:
return
for article in articles:
logger.debug(f"article: {article['title']}")
insights = get_info(f"title: {article['title']}\n\ncontent: {article['content']}")
article_id = pb.add(collection_name='articles', body=article)
if not article_id: if not article_id:
await asyncio.sleep(1)
# do again # do again
article_id = pb.add(collection_name='articles', body=article) article_id = pb.add(collection_name='articles', body=result)
if not article_id: if not article_id:
logger.error('add article failed, writing to cache_file') logger.error('add article failed, writing to cache_file')
with open(os.path.join(project_dir, 'cache_articles.json'), 'a', encoding='utf-8') as f: with open(os.path.join(project_dir, 'cache_articles.json'), 'a', encoding='utf-8') as f:
json.dump(article, f, ensure_ascii=False, indent=4) json.dump(result, f, ensure_ascii=False, indent=4)
continue continue
if not insights: if not insights:
continue continue
existing_urls.append(url)
# post process
article_tags = set() article_tags = set()
old_insights = pb.read(collection_name='insights', filter=f"updated>'{expiration_date}'", fields=['id', 'tag', 'content', 'articles']) old_insights = pb.read(collection_name='insights', filter=f"updated>'{expiration_date}'", fields=['id', 'tag', 'content', 'articles'])
for insight in insights: for insight in insights:
@ -171,9 +109,51 @@ async def pipeline(_input: dict):
_ = pb.update(collection_name='articles', id=article_id, body={'tag': list(article_tags)}) _ = pb.update(collection_name='articles', id=article_id, body={'tag': list(article_tags)})
if not _: if not _:
# do again # do again
await asyncio.sleep(1)
_ = pb.update(collection_name='articles', id=article_id, body={'tag': list(article_tags)}) _ = pb.update(collection_name='articles', id=article_id, body={'tag': list(article_tags)})
if not _: if not _:
logger.error(f'update article failed - article_id: {article_id}') logger.error(f'update article failed - article_id: {article_id}')
article['tag'] = list(article_tags) result['tag'] = list(article_tags)
with open(os.path.join(project_dir, 'cache_articles.json'), 'a', encoding='utf-8') as f: with open(os.path.join(project_dir, 'cache_articles.json'), 'a', encoding='utf-8') as f:
json.dump(article, f, ensure_ascii=False, indent=4) json.dump(result, f, ensure_ascii=False, indent=4)
async def message_manager(_input: dict):
source = _input['user_id'].split('@')[-1]
logger.debug(f"received new task, user: {source}, Addition info: {_input['addition']}")
if _input['type'] == 'publicMsg':
items = item_pattern.findall(_input["content"])
# Iterate through all < item > content, extracting < url > and < summary >
for item in items:
url_match = url_pattern.search(item)
url = url_match.group(1) if url_match else None
if not url:
logger.warning(f"can not find url in \n{item}")
continue
# URL processing, http is replaced by https, and the part after chksm is removed.
url = url.replace('http://', 'https://')
cut_off_point = url.find('chksm=')
if cut_off_point != -1:
url = url[:cut_off_point-1]
if url in existing_urls:
logger.debug(f"{url} has been crawled, skip")
continue
summary_match = summary_pattern.search(item)
summary = summary_match.group(1) if summary_match else None
cache = {'source': source, 'abstract': summary}
await pipeline(url, cache)
elif _input['type'] == 'text':
urls = extract_urls(_input['content'])
if not urls:
logger.debug(f"can not find any url in\n{_input['content']}\npass...")
# todo get info from text process
return
await asyncio.gather(*[pipeline(url) for url in urls])
elif _input['type'] == 'url':
# this is remained for wechat shared mp_article_card
# todo will do it in project awada (need finish the generalMsg api first)
return
else:
return

View File

@ -62,7 +62,7 @@ Important guidelines to follow: 1) Adhere strictly to the original news content,
 def get_info(article_content: str) -> list[dict]:
     # logger.debug(f'receive new article_content:\n{article_content}')
     result = openai_llm([{'role': 'system', 'content': system_prompt}, {'role': 'user', 'content': article_content}],
-                        model=get_info_model, logger=logger)
+                        model=get_info_model, logger=logger, temperature=0.1)
 
     # results = pattern.findall(result)
     texts = result.split('<tag>')

View File

@ -4,7 +4,6 @@ urllib
 gne
 jieba
 httpx
-chardet
 pocketbase
 pydantic
 uvicorn

View File

@ -1,33 +1,56 @@
**This folder is intended for placing crawlers specific to particular sources. Note that the crawlers here should be able to parse the article list URL of the source and return a dictionary of article details.** We provide a general page parser that can intelligently retrieve article lists from sources. For each article URL, it first attempts to use `gne` for parsing, and if that fails, it will try using `llm`.
>
> # Custom Crawler Configuration This solution allows scanning and extracting information from most general news and portal sources.
>
> After writing the crawler, place the crawler program in this folder and register it in the scraper_map in `__init__.py`, similar to: **However, we strongly recommend that users develop custom parsers for specific sources tailored to their actual business scenarios for more ideal and efficient scanning.**
>
> ```python We also provide a parser specifically for WeChat public articles (mp.weixin.qq.com).
> {'www.securityaffairs.com': securityaffairs_scraper}
> ``` **If you are willing to contribute your custom source-specific parsers to this repository, we would greatly appreciate it!**
>
> Here, the key is the source URL, and the value is the function name. ## Custom Source Parser Development Specifications
>
> The crawler should be written in the form of a function with the following input and output specifications: ### Specifications
>
> Input: **Remember: it should be an asynchronous function**
> - expiration: A `datetime.date` object, the crawler should only fetch articles on or after this date.
> - existings: [str], a list of URLs of articles already in the database. The crawler should ignore the URLs in this list. 1. **The parser should be able to intelligently distinguish between article list pages and article detail pages.**
> 2. **The parser's input parameters should only include `url` and `logger`:**
> Output: - `url` is the complete address of the source (type `str`).
> - [dict], a list of result dictionaries, each representing an article, formatted as follows: - `logger` is the logging object (please do not configure a separate logger for your custom source parser).
> `[{'url': str, 'title': str, 'author': str, 'publish_time': str, 'content': str, 'abstract': str, 'images': [Path]}, {...}, ...]` 3. **The parser's output should include `flag` and `result`, formatted as `tuple[int, Union[list, dict]]`:**
> - If the `url` is an article list page, `flag` returns `1`, and `result` returns a list of all article page URLs (`list`).
> Note: The format of `publish_time` should be `"%Y%m%d"`. If the crawler cannot fetch it, the current date can be used. - If the `url` is an article page, `flag` returns `11`, and `result` returns all article details (`dict`), in the following format:
>
> Additionally, `title` and `content` are mandatory fields. ```python
> {'url': str, 'title': str, 'author': str, 'publish_time': str, 'content': str, 'abstract': str, 'images': [str]}
> # Generic Page Parser ```
>
> We provide a generic page parser here, which can intelligently fetch article lists from the source. For each article URL, it will first attempt to parse using gne. If it fails, it will then attempt to parse using llm. _Note: `title` and `content` cannot be empty._
>
> Through this solution, it is possible to scan and extract information from most general news and portal sources. **Note: `publish_time` should be in the format `"%Y%m%d"` (date only, no `-`). If the scraper cannot fetch it, use the current date.**
>
> **However, we still strongly recommend that users write custom crawlers themselves or directly subscribe to our data service for more ideal and efficient scanning.** - If parsing fails, `flag` returns `0`, and `result` returns an empty dictionary `{}`.
_`pipeline` will try other parsing solutions (if any) upon receiving `flag` 0._
- If page retrieval fails (e.g., network issues), `flag` returns `-7`, and `result` returns an empty dictionary `{}`.
_`pipeline` will not attempt to parse again in the same process upon receiving `flag` -7._
### Registration
After writing your scraper, place the scraper program in this folder and register the scraper in `scraper_map` under `__init__.py`, similar to:
```python
{'domain': 'crawler def name'}
```
It is recommended to use urllib.parse to get the domain:
```python
from urllib.parse import urlparse
parsed_url = urlparse("site's url")
domain = parsed_url.netloc
```
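
To make the specification above concrete, a minimal skeleton of such a parser might look like the sketch below. `example.com`, the CSS selector, and the fallback logic are illustrative placeholders, not part of wiseflow; adapt them to your source.

```python
# Minimal skeleton of a custom source parser following the specification above.
# "example.com" and the selectors are hypothetical; replace them with logic for your source.
from datetime import datetime
from typing import Union

import httpx
from bs4 import BeautifulSoup


async def example_com_scraper(url: str, logger) -> tuple[int, Union[list, dict]]:
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, timeout=30)
            response.raise_for_status()
        except Exception as e:
            logger.warning(f"fetch failed for {url}: {e}")
            return -7, {}  # page retrieval failed

    soup = BeautifulSoup(response.text, "html.parser")

    # article list page: return every article link found on it
    links = [a["href"] for a in soup.select("a.article-link[href]")]
    if links:
        return 1, links

    # article detail page: return the fields the pipeline expects
    title = soup.title.string.strip() if soup.title and soup.title.string else ""
    content = soup.get_text(separator="\n", strip=True)
    if not title or not content:
        return 0, {}  # parsing failed; other parsing solutions (if any) may still be tried

    return 11, {
        "url": url,
        "title": title,
        "author": "",
        "publish_time": datetime.now().strftime("%Y%m%d"),  # fallback when no date is found
        "content": content,
        "abstract": "",
        "images": [img["src"] for img in soup.find_all("img", src=True)],
    }
```

Registered as `{'example.com': example_com_scraper}` in `scraper_map`, the general crawler would then hand matching domains to this parser.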

View File

@ -1,33 +1,56 @@
**这个文件夹下可以放置对应特定信源的爬虫注意这里的爬虫应该是可以解析信源文章列表url并返回文章详情dict的** 我们提供了一个通用页面解析器,该解析器可以智能获取信源文章列表。对于每个文章 URL会先尝试使用 `gne` 进行解析,如果失败,再尝试使用 `llm` 进行解析。
# 专有爬虫配置
写好爬虫后将爬虫程序放在这个文件夹并在__init__.py下的scraper_map中注册爬虫类似
```python
{'www.securityaffairs.com': securityaffairs_scraper}
```
其中key就是信源地址value是函数名
爬虫应该写为函数形式,出入参约定为:
输入:
- expiration datetime的date.date()对象,爬虫应该只抓取这之后(含这一天)的文章
- existings[str], 数据库已有文章的url列表爬虫应该忽略这个列表里面的url
输出:
- [dict]返回结果列表每个dict代表一个文章格式如下
`[{'url': str, 'title': str, 'author': str, 'publish_time': str, 'content': str, 'abstract': str, 'images': [Path]}, {...}, ...]`
注意publish_time格式为`"%Y%m%d"` 如果爬虫抓不到可以用当天日期
另外title和content是必须要有的
# 通用页面解析器
我们这里提供了一个通用页面解析器该解析器可以智能获取信源文章列表接下来对于每一个文章url会先尝试使用 gne 进行解析如果失败的话再尝试使用llm进行解析。
通过这个方案,可以实现对大多数普通新闻类、门户类信源的扫描和信息提取。 通过这个方案,可以实现对大多数普通新闻类、门户类信源的扫描和信息提取。
**然而我们依然强烈建议用户自行写专有爬虫或者直接订阅我们的数据服务,以实现更加理想且更加高效的扫描。** **然而,我们依然强烈建议用户根据实际业务场景编写针对特定信源的专有解析器,以实现更理想且高效的扫描。**
此外我们提供了一个专门针对微信公众号文章mp.weixin.qq.com的解析器。
**如果您愿意将您撰写的特定信源专有解析器贡献至本代码仓库,我们将不胜感激!**
## 专有信源解析器开发规范
### 规范
**记住:这应该是一个异步函数**
1. **解析器应能智能区分文章列表页面和文章详情页面。**
2. **解析器入参只包括 `url``logger` 两项:**
- `url` 是信源完整地址(`str` 类型)
- `logger` 是日志对象(请勿为您的专有信源解析器单独配置 `logger`
3. **解析器出参包括 `flag``result` 两项,格式为 `tuple[int, Union[list, dict]]`**
- 如果 `url` 是文章列表页面,`flag` 返回 `1``result` 返回解析出的全部文章页面 URL 列表(`list`)。
- 如果 `url` 是文章页面,`flag` 返回 `11``result` 返回解析出的全部文章详情(`dict`),格式如下:
```python
{'url': str, 'title': str, 'author': str, 'publish_time': str, 'content': str, 'abstract': str, 'images': [str]}
```
_注意`title` 和 `content` 两项不能为空。_
**注意:`publish_time` 格式为 `"%Y%m%d"`(仅日期,没有 `-`),如果爬虫抓不到可以用当天日期。**
- 如果解析失败,`flag` 返回 `0``result` 返回空字典 `{}`
_`pipeline` 收到 `flag` 0 会尝试其他解析方案如有。_
- 如果页面获取失败(如网络问题),`flag` 返回 `-7``result` 返回空字典 `{}`
_`pipeline` 收到 `flag` -7 同一进程内不会再次尝试解析。_
### 注册
写好爬虫后,将爬虫程序放在该文件夹,并在 `__init__.py` 下的 `scraper_map` 中注册爬虫,类似:
```python
{'domain': 'crawler def name'}
```
建议使用 urllib.parse 获取 domain
```python
from urllib.parse import urlparse
parsed_url = urlparse("site's url")
domain = parsed_url.netloc
```

View File

@ -1,33 +1,56 @@
**In diesem Ordner können Crawlers für spezifische Quellen abgelegt werden. Beachten Sie, dass die Crawlers hier in der Lage sein sollten, die URL der Artikelliste der Quelle zu analysieren und ein Wörterbuch mit Artikeldetails zurückzugeben.** Wir bieten einen allgemeinen Seitenparser an, der intelligent Artikellisten von Quellen abrufen kann. Für jede Artikel-URL wird zuerst versucht, `gne` zur Analyse zu verwenden. Falls dies fehlschlägt, wird `llm` als Alternative genutzt.
>
> # Konfiguration des benutzerdefinierten Crawlers Diese Lösung ermöglicht das Scannen und Extrahieren von Informationen aus den meisten allgemeinen Nachrichtenquellen und Portalen.
>
> Nachdem Sie den Crawler geschrieben haben, platzieren Sie das Crawler-Programm in diesem Ordner und registrieren Sie es in scraper_map in `__init__.py`, ähnlich wie: **Wir empfehlen jedoch dringend, benutzerdefinierte Parser für spezifische Quellen zu entwickeln, die auf Ihre tatsächlichen Geschäftsszenarien abgestimmt sind, um eine idealere und effizientere Erfassung zu erreichen.**
>
> ```python Wir stellen auch einen speziellen Parser für WeChat-Artikel (mp.weixin.qq.com) bereit.
> {'www.securityaffairs.com': securityaffairs_scraper}
> ``` **Falls Sie bereit sind, Ihre speziell entwickelten Parser für bestimmte Quellen zu diesem Code-Repository beizutragen, wären wir Ihnen sehr dankbar!**
>
> Hier ist der Schlüssel die URL der Quelle und der Wert der Funktionsname. ## Entwicklungsspezifikationen für benutzerdefinierte Quellparser
>
> Der Crawler sollte in Form einer Funktion geschrieben werden, mit den folgenden Eingabe- und Ausgabeparametern: ### Spezifikationen
>
> Eingabe: **Denken Sie daran: Es sollte eine asynchrone Funktion sein**
> - expiration: Ein `datetime.date` Objekt, der Crawler sollte nur Artikel ab diesem Datum (einschließlich) abrufen.
> - existings: [str], eine Liste von URLs von Artikeln, die bereits in der Datenbank vorhanden sind. Der Crawler sollte die URLs in dieser Liste ignorieren. 1. **Der Parser sollte in der Lage sein, intelligent zwischen Artikel-Listen-Seiten und Artikel-Detailseiten zu unterscheiden.**
> 2. **Die Eingabeparameter des Parsers sollten nur `url` und `logger` umfassen:**
> Ausgabe: - `url` ist die vollständige Adresse der Quelle (Typ `str`).
> - [dict], eine Liste von Ergebnis-Wörterbüchern, wobei jedes Wörterbuch einen Artikel darstellt, formatiert wie folgt: - `logger` ist das Protokollierungsobjekt (bitte konfigurieren Sie keinen separaten Logger für Ihren benutzerdefinierten Quellparser).
> `[{'url': str, 'title': str, 'author': str, 'publish_time': str, 'content': str, 'abstract': str, 'images': [Path]}, {...}, ...]` 3. **Die Ausgabe des Parsers sollte `flag` und `result` umfassen, im Format `tuple[int, Union[list, dict]]`:**
> - Wenn die `url` eine Artikellisten-Seite ist, gibt `flag` `1` zurück, und `result` gibt eine Liste aller Artikel-URLs (`list`) zurück.
> Hinweis: Das Format von `publish_time` sollte `"%Y%m%d"` sein. Wenn der Crawler es nicht abrufen kann, kann das aktuelle Datum verwendet werden. - Wenn die `url` eine Artikelseite ist, gibt `flag` `11` zurück, und `result` gibt alle Artikeldetails (`dict`) zurück, im folgenden Format:
>
> Darüber hinaus sind `title` und `content` Pflichtfelder. ```python
> {'url': str, 'title': str, 'author': str, 'publish_time': str, 'content': str, 'abstract': str, 'images': [str]}
> # Generischer Seitenparser ```
>
> Wir bieten hier einen generischen Seitenparser an, der intelligent Artikellisten von der Quelle abrufen kann. Für jede Artikel-URL wird zunächst versucht, mit gne zu parsen. Scheitert dies, wird versucht, mit llm zu parsen. _Hinweis: `title` und `content` dürfen nicht leer sein._
>
> Durch diese Lösung ist es möglich, die meisten allgemeinen Nachrichtenquellen und Portale zu scannen und Informationen zu extrahieren. **Hinweis: Das `publish_time`-Format muss `"%Y%m%d"` (nur Datum, ohne `-`) sein. Wenn der Scraper es nicht erfassen kann, verwenden Sie das aktuelle Datum.**
>
> **Wir empfehlen jedoch dringend, dass Benutzer eigene benutzerdefinierte Crawlers schreiben oder direkt unseren Datenservice abonnieren, um eine idealere und effizientere Erfassung zu erreichen.** - Wenn die Analyse fehlschlägt, gibt `flag` `0` zurück, und `result` gibt ein leeres Wörterbuch `{}` zurück.
_Der `pipeline` versucht andere Analysemethoden (falls vorhanden), wenn `flag` 0 zurückgegeben wird._
- Wenn das Abrufen der Seite fehlschlägt (z. B. aufgrund von Netzwerkproblemen), gibt `flag` `-7` zurück, und `result` gibt ein leeres Wörterbuch `{}` zurück.
_Der `pipeline` wird im gleichen Prozess keine weiteren Versuche zur Analyse unternehmen, wenn `flag` -7 zurückgegeben wird._
### Registrierung
Nach dem Schreiben Ihres Scrapers platzieren Sie das Scraper-Programm in diesem Ordner und registrieren den Scraper in `scraper_map` in `__init__.py`, wie folgt:
```python
{'domain': 'Crawler-Funktionsname'}
```
Es wird empfohlen, urllib.parse zur Ermittlung der domain zu verwenden:
```python
from urllib.parse import urlparse
parsed_url = urlparse("die URL der Website")
domain = parsed_url.netloc
```

View File

@ -1,33 +1,56 @@
**Ce dossier est destiné à accueillir des crawlers spécifiques à des sources particulières. Notez que les crawlers ici doivent être capables de parser l'URL de la liste des articles de la source et de retourner un dictionnaire de détails des articles.** Nous proposons un analyseur de pages général capable de récupérer intelligemment les listes d'articles de sources d'information. Pour chaque URL d'article, il tente d'abord d'utiliser `gne` pour l'analyse, et en cas d'échec, il essaie d'utiliser `llm`.
>
> # Configuration du Crawler Personnalisé Cette solution permet de scanner et d'extraire des informations de la plupart des sources de nouvelles générales et des portails d'information.
>
> Après avoir écrit le crawler, placez le programme du crawler dans ce dossier et enregistrez-le dans scraper_map dans `__init__.py`, comme suit : **Cependant, nous recommandons vivement aux utilisateurs de développer des analyseurs personnalisés pour des sources spécifiques en fonction de leurs scénarios d'affaires réels afin d'obtenir une analyse plus idéale et plus efficace.**
>
> ```python Nous fournissons également un analyseur spécialement conçu pour les articles publics WeChat (mp.weixin.qq.com).
> {'www.securityaffairs.com': securityaffairs_scraper}
> ``` **Si vous êtes disposé à contribuer vos analyseurs spécifiques à certaines sources à ce dépôt de code, nous vous en serions très reconnaissants !**
>
> Ici, la clé est l'URL de la source, et la valeur est le nom de la fonction. ## Spécifications pour le Développement d'Analyseurs Spécifiques
>
> Le crawler doit être écrit sous forme de fonction avec les spécifications suivantes pour les entrées et sorties : ### Spécifications
>
> Entrée : **N'oubliez pas : il devrait s'agir d'une fonction asynchrone**
> - expiration : Un objet `datetime.date`, le crawler ne doit récupérer que les articles à partir de cette date (incluse).
> - existings : [str], une liste d'URLs d'articles déjà présents dans la base de données. Le crawler doit ignorer les URLs de cette liste. 1. **L'analyseur doit être capable de distinguer intelligemment entre les pages de liste d'articles et les pages de détail des articles.**
> 2. **Les paramètres d'entrée de l'analyseur doivent uniquement inclure `url` et `logger` :**
> Sortie : - `url` est l'adresse complète de la source (type `str`).
> - [dict], une liste de dictionnaires de résultats, chaque dictionnaire représentant un article, formaté comme suit : - `logger` est l'objet de journalisation (ne configurez pas de logger séparé pour votre analyseur spécifique).
> `[{'url': str, 'title': str, 'author': str, 'publish_time': str, 'content': str, 'abstract': str, 'images': [Path]}, {...}, ...]` 3. **Les paramètres de sortie de l'analyseur doivent inclure `flag` et `result`, formatés comme `tuple[int, Union[list, dict]]` :**
> - Si l'URL est une page de liste d'articles, `flag` renvoie `1` et `result` renvoie la liste de toutes les URL des pages d'articles (`list`).
> Remarque : Le format de `publish_time` doit être `"%Y%m%d"`. Si le crawler ne peut pas le récupérer, la date du jour peut être utilisée. - Si l'URL est une page d'article, `flag` renvoie `11` et `result` renvoie tous les détails de l'article (`dict`), au format suivant :
>
> De plus, `title` et `content` sont des champs obligatoires. ```python
> {'url': str, 'title': str, 'author': str, 'publish_time': str, 'content': str, 'abstract': str, 'images': [str]}
> # Analyseur de Page Générique ```
>
> Nous fournissons ici un analyseur de page générique, qui peut récupérer intelligemment les listes d'articles de la source. Pour chaque URL d'article, il tentera d'abord de parser avec gne. En cas d'échec, il tentera de parser avec llm. _Remarque : `title` et `content` ne peuvent pas être vides._
>
> Grâce à cette solution, il est possible de scanner et d'extraire des informations à partir de la plupart des sources de type actualités générales et portails. **Remarque : `publish_time` doit être au format `"%Y%m%d"` (date uniquement, sans `-`). Si le scraper ne peut pas le récupérer, utilisez la date du jour.**
>
> **Cependant, nous recommandons vivement aux utilisateurs de rédiger eux-mêmes des crawlers personnalisés ou de s'abonner directement à notre service de données pour un scan plus idéal et plus efficace.** - En cas d'échec de l'analyse, `flag` renvoie `0` et `result` renvoie un dictionnaire vide `{}`.
_Le `pipeline` essaiera d'autres solutions d'analyse (si disponibles) après avoir reçu `flag` 0._
- En cas d'échec de la récupération de la page (par exemple, problème réseau), `flag` renvoie `-7` et `result` renvoie un dictionnaire vide `{}`.
_Le `pipeline` n'essaiera pas de réanalyser dans le même processus après avoir reçu `flag` -7._
### Enregistrement
Après avoir écrit votre scraper, placez le programme du scraper dans ce dossier et enregistrez le scraper dans `scraper_map` sous `__init__.py`, de manière similaire :
```python
{'domain': 'nom de la fonction de crawler'}
```
Il est recommandé d'utiliser urllib.parse pour obtenir le domain :
```python
from urllib.parse import urlparse
parsed_url = urlparse("l'URL du site")
domain = parsed_url.netloc
```

View File

@ -1,33 +1,56 @@
**このフォルダには特定のソースに対応したクローラーを配置できます。ここでのクローラーはソースの記事リストURLを解析し、記事の詳細情報を辞書形式で返す必要があります。** 汎用ページパーサーを提供しており、このパーサーは信頼できるソースから記事リストをインテリジェントに取得します。各記事URLに対して、まず `gne` を使用して解析を試み、失敗した場合は `llm` を使用して解析します。
>
> # カスタムクローラーの設定 このソリューションにより、ほとんどの一般的なニュースサイトやポータルサイトからの情報をスキャンして抽出することができます。
>
> クローラーを作成した後、そのプログラムをこのフォルダに配置し、`__init__.py` の scraper_map に次のように登録します: **しかし、より理想的かつ効率的なスキャンを実現するために、ユーザー自身のビジネスシナリオに応じた特定のソース専用のパーサーを開発することを強くお勧めします。**
>
> ```python また、WeChat 公共アカウントの記事mp.weixin.qq.comに特化したパーサーも提供しています。
> {'www.securityaffairs.com': securityaffairs_scraper}
> ``` **特定のソース専用に開発したパーサーをこのリポジトリに貢献していただける場合は、大変感謝いたします!**
>
> ここで、キーはソースのURLで、値は関数名です。 ## 特定ソースパーサー開発規範
>
> クローラーは関数形式で記述し、以下の入力および出力仕様を満たす必要があります: ### 規範
>
> 入力: **覚えておいてください:それは非同期関数でなければなりません**
> - expiration `datetime.date` オブジェクト、クローラーはこの日付以降(この日を含む)の記事のみを取得する必要があります。
> - existings[str]、データベースに既存する記事のURLリスト、クローラーはこのリスト内のURLを無視する必要があります。 1. **パーサーは、記事リストページと記事詳細ページをインテリジェントに区別できる必要があります。**
> 2. **パーサーの入力パラメーターは `url``logger` のみを含むべきです:**
> 出力: - `url` はソースの完全なアドレス(`str` タイプ)
> - [dict]、結果の辞書リスト、各辞書は以下の形式で1つの記事を表します - `logger` はロギングオブジェクト(専用のロガーを構成しないでください)
> `[{'url': str, 'title': str, 'author': str, 'publish_time': str, 'content': str, 'abstract': str, 'images': [Path]}, {...}, ...]` 3. **パーサーの出力は `flag``result` を含み、形式は `tuple[int, Union[list, dict]]`**
> - `url` が記事リストページの場合、`flag` は `1` を返し、`result` はすべての記事ページURLのリスト`list`)を返します。
> 注意:`publish_time`の形式は`"%Y%m%d"`である必要があります。クローラーで取得できない場合は、当日の日付を使用できます。 - `url` が記事ページの場合、`flag` は `11` を返し、`result` はすべての記事詳細(`dict`)を返します。形式は以下の通りです:
>
> さらに、`title`と`content`は必須フィールドです。 ```python
> {'url': str, 'title': str, 'author': str, 'publish_time': str, 'content': str, 'abstract': str, 'images': [str]}
> # 一般ページパーサー ```
>
> ここでは一般的なページパーサーを提供しており、ソースから記事リストをインテリジェントに取得できます。各記事URLに対して、最初に gne を使用して解析を試みます。失敗した場合は、llm を使用して解析を試みます。 _注意`title` と `content` は空であってはなりません。_
>
> このソリューションにより、ほとんどの一般的なニュースおよびポータルソースのスキャンと情報抽出が可能になります。 **注意:`publish_time` の形式は `"%Y%m%d"`(日付のみ、`-` はなし)である必要があります。スクレイパーが取得できない場合は、当日の日付を使用してください。**
>
> **しかし、より理想的かつ効率的なスキャンを実現するために、ユーザー自身でカスタムクローラーを作成するか、直接弊社のデータサービスを購読することを強くお勧めします。** - 解析に失敗した場合、`flag` は `0` を返し、`result` は空の辞書 `{}` を返します。
_`pipeline` は `flag` 0 を受け取ると他の解析ソリューション存在する場合を試みます。_
- ページの取得に失敗した場合(例えば、ネットワークの問題)、`flag` は `-7` を返し、`result` は空の辞書 `{}` を返します。
_`pipeline` は `flag` -7 を受け取ると、同一プロセス内では再解析を試みません。_
### 登録
スクレイパーを作成したら、このフォルダにプログラムを配置し、`__init__.py` の `scraper_map` にスクレイパーを次のように登録してください:
```python
{'domain': 'スクレイパー関数名'}
```
domain の取得には urllib.parse を使用することをお勧めします:
```python
from urllib.parse import urlparse
parsed_url = urlparse("サイトのURL")
domain = parsed_url.netloc
```

View File

@ -1,6 +1,4 @@
 from .mp_crawler import mp_crawler
-from .general_crawler import general_crawler
-from .general_scraper import general_scraper
 
-scraper_map = {}
+scraper_map = {'mp.weixin.qq.com': mp_crawler}

View File

@ -2,6 +2,8 @@
 # when you use this general crawler, remember followings
 # When you receive flag -7, it means that the problem occurs in the HTML fetch process.
 # When you receive flag 0, it means that the problem occurred during the content parsing process.
+# When you receive flag 1, the result will be a list, meaning the input url is likely an article_list page and the list contains the urls of the articles.
+# When you receive flag 11, you will get a dict containing the title, content, url, date, and source of the article.
 
 from gne import GeneralNewsExtractor
 import httpx
@ -11,11 +13,13 @@ from urllib.parse import urlparse
 from llms.openai_wrapper import openai_llm
 # from llms.siliconflow_wrapper import sfa_llm
 from bs4.element import Comment
-import chardet
 from utils.general_utils import extract_and_convert_dates
 import asyncio
 import json_repair
 import os
+from typing import Union
+from requests.compat import urljoin
+from scrapers import scraper_map
 
 model = os.environ.get('HTML_PARSE_MODEL', 'gpt-3.5-turbo')
@ -42,30 +46,44 @@ def text_from_soup(soup: BeautifulSoup) -> str:
return text.strip() return text.strip()
sys_info = '''Your role is to function as an HTML parser, tasked with analyzing a segment of HTML code. Extract the following metadata from the given HTML snippet: the document's title, summary or abstract, main content, and the publication date. Ensure that your response adheres to the JSON format outlined below, encapsulating the extracted information accurately: sys_info = '''Your task is to operate as an HTML content extractor, focusing on parsing a provided HTML segment. Your objective is to retrieve the following details directly from the raw text within the HTML, without summarizing or altering the content:
- The document's title
- The complete main content, as it appears in the HTML, comprising all textual elements considered part of the core article body
- The publication time in its original format found within the HTML
Ensure your response fits the following JSON structure, accurately reflecting the extracted data without modification:
```json ```json
{ {
"title": "The Document's Title", "title": "The Document's Exact Title",
"abstract": "A concise overview or summary of the content", "content": "All the unaltered primary text content from the article",
"content": "The primary textual content of the article", "publish_time": "Original Publication Time as per HTML"
"publish_date": "The publication date in YYYY-MM-DD format"
} }
``` ```
Please structure your output precisely as demonstrated, with each field populated correspondingly to the details found within the HTML code. It is essential that your output adheres strictly to this format, with each field filled based on the untouched information extracted directly from the HTML source.'''
'''
async def general_crawler(url: str, logger) -> (int, dict): async def general_crawler(url: str, logger) -> tuple[int, Union[list, dict]]:
""" """
Return article information dict and flag, negative number is error, 0 is no result, 11 is success Return article information dict and flag, negative number is error, 0 is no result, 1 is for article_list page, 11 is success
main work flow: main work flow:
(for weixin public account articles, which start with mp.weixin.qq, use mp_crawler)
first get the content with httpx first get the content with httpx
then judge whether it is an article list page (return all article urls and flag 1) or an article detail page
then try to use gne to extract the information then try to use gne to extract the information
when fail, try to use a llm to analysis the html when fail, try to use a llm to analysis the html
""" """
# 0. if there's a scraper for this domain, use it (such as mp.weixin.qq.com)
parsed_url = urlparse(url)
domain = parsed_url.netloc
if domain in scraper_map:
return await scraper_map[domain](url, logger)
# 1. get the content with httpx
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
for retry in range(2): for retry in range(2):
try: try:
@ -74,37 +92,58 @@ async def general_crawler(url: str, logger) -> (int, dict):
break break
except Exception as e: except Exception as e:
if retry < 1: if retry < 1:
logger.info(f"request {url} got error {e}\nwaiting 1min") logger.info(f"can not reach\n{e}\nwaiting 1min")
await asyncio.sleep(60) await asyncio.sleep(60)
else: else:
logger.warning(f"request {url} got error {e}") logger.error(e)
return -7, {} return -7, {}
rawdata = response.content # 2. judge is article list (return all article url and flag 1) or article detail page
encoding = chardet.detect(rawdata)['encoding'] page_source = response.text
text = rawdata.decode(encoding, errors='replace') if page_source:
soup = BeautifulSoup(text, "html.parser") text = page_source
else:
try:
text = response.content.decode('utf-8')
except UnicodeDecodeError:
try:
text = response.content.decode('gbk')
except Exception as e:
logger.error(f"can not decode html {e}")
return -7, {}
soup = BeautifulSoup(text, "html.parser")
# Parse all URLs
base_url = f"{parsed_url.scheme}://{domain}"
urls = set()
for link in soup.find_all("a", href=True):
absolute_url = urljoin(base_url, link["href"])
if urlparse(absolute_url).netloc == domain and absolute_url != url:
urls.add(absolute_url)
if len(urls) > 21:
logger.info(f"{url} is more like an article list page, find {len(urls)} urls with the same netloc")
return 1, list(urls)
# 3. try to use gne to extract the information
try: try:
result = extractor.extract(text) result = extractor.extract(text)
if result['title'].startswith('服务器错误') or result['title'].startswith('您访问的页面') or result[
'title'].startswith('403') \
or result['content'].startswith('This website uses cookies') or result['title'].startswith('出错了'):
logger.warning(f"can not get {url} from the Internet")
return -7, {}
if len(result['title']) < 4 or len(result['content']) < 24:
logger.info(f"gne extract not good: {result}")
result = None
except Exception as e: except Exception as e:
logger.info(f"gne extract error: {e}") logger.info(f"gne extract error: {e}")
result = None result = None
if result['title'].startswith('服务器错误') or result['title'].startswith('您访问的页面') or result[ # 4. try to use a llm to analysis the html
'title'].startswith('403') \ if not result:
or result['content'].startswith('This website uses cookies') or result['title'].startswith('出错了'):
logger.warning(f"can not get {url} from the Internet")
return -7, {}
if len(result['title']) < 4 or len(result['content']) < 24:
logger.info(f"gne extract not good: {result}")
result = None
if result:
info = result
abstract = ''
else:
html_text = text_from_soup(soup) html_text = text_from_soup(soup)
html_lines = html_text.split('\n') html_lines = html_text.split('\n')
html_lines = [line.strip() for line in html_lines if line.strip()] html_lines = [line.strip() for line in html_lines if line.strip()]
@ -123,65 +162,54 @@ async def general_crawler(url: str, logger) -> (int, dict):
{"role": "system", "content": sys_info}, {"role": "system", "content": sys_info},
{"role": "user", "content": html_text} {"role": "user", "content": html_text}
] ]
llm_output = openai_llm(messages, model=model, logger=logger) llm_output = openai_llm(messages, model=model, logger=logger, temperature=0.01)
decoded_object = json_repair.repair_json(llm_output, return_objects=True) result = json_repair.repair_json(llm_output, return_objects=True)
logger.debug(f"decoded_object: {decoded_object}") logger.debug(f"decoded_object: {result}")
if not isinstance(decoded_object, dict): if not isinstance(result, dict):
logger.debug("failed to parse from llm output") logger.debug("failed to parse from llm output")
return 0, {} return 0, {}
if 'title' not in decoded_object or 'content' not in decoded_object: if 'title' not in result or 'content' not in result:
logger.debug("llm parsed result not good") logger.debug("llm parsed result not good")
return 0, {} return 0, {}
info = {'title': decoded_object['title'], 'content': decoded_object['content']}
abstract = decoded_object.get('abstract', '')
info['publish_time'] = decoded_object.get('publish_date', '')
# Extract the picture link, it will be empty if it cannot be extracted. # Extract the picture link, it will be empty if it cannot be extracted.
image_links = [] image_links = []
images = soup.find_all("img") images = soup.find_all("img")
for img in images: for img in images:
try: try:
image_links.append(img["src"]) image_links.append(img["src"])
except KeyError: except KeyError:
continue continue
info["images"] = image_links result["images"] = image_links
# Extract the author information, if it cannot be extracted, it will be empty. # Extract the author information, if it cannot be extracted, it will be empty.
author_element = soup.find("meta", {"name": "author"}) author_element = soup.find("meta", {"name": "author"})
if author_element: if author_element:
info["author"] = author_element["content"] result["author"] = author_element["content"]
else: else:
info["author"] = "" result["author"] = ""
date_str = extract_and_convert_dates(info['publish_time']) # 5. post process
date_str = extract_and_convert_dates(result['publish_time'])
if date_str: if date_str:
info['publish_time'] = date_str result['publish_time'] = date_str
else: else:
info['publish_time'] = datetime.strftime(datetime.today(), "%Y%m%d") result['publish_time'] = datetime.strftime(datetime.today(), "%Y%m%d")
from_site = urlparse(url).netloc from_site = domain.replace('www.', '')
from_site = from_site.replace('www.', '')
from_site = from_site.split('.')[0] from_site = from_site.split('.')[0]
info['content'] = f"[from {from_site}] {info['content']}" result['content'] = f"[from {from_site}] {result['content']}"
try: try:
meta_description = soup.find("meta", {"name": "description"}) meta_description = soup.find("meta", {"name": "description"})
if meta_description: if meta_description:
info['abstract'] = f"[from {from_site}] {meta_description['content'].strip()}" result['abstract'] = f"[from {from_site}] {meta_description['content'].strip()}"
else: else:
if abstract: result['abstract'] = ''
info['abstract'] = f"[from {from_site}] {abstract.strip()}"
else:
info['abstract'] = ''
except Exception: except Exception:
if abstract: result['abstract'] = ''
info['abstract'] = f"[from {from_site}] {abstract.strip()}"
else:
info['abstract'] = ''
info['url'] = url result['url'] = url
return 11, info return 11, result
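
For orientation, a minimal way to exercise the updated `general_crawler` and its flag convention could look like the sketch below; it assumes the `core` directory is on the import path and that the LLM-related environment variables are configured.

```python
# Hedged sketch: calling general_crawler directly and branching on its flag values
# (1 = article list page, 11 = parsed article, 0 = parse failure, -7 = fetch failure).
import asyncio
import logging

from scrapers.general_crawler import general_crawler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("wiseflow-test")


async def main():
    flag, result = await general_crawler("https://example.com/news/", logger)
    if flag == 1:
        print(f"looks like an article list page, {len(result)} candidate urls")
    elif flag == 11:
        print("parsed article:", result["title"], result["publish_time"])
    elif flag == 0:
        print("page fetched, but no article could be extracted")
    else:  # -7
        print("page could not be fetched")


asyncio.run(main())
```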

View File

@ -1,87 +0,0 @@
# -*- coding: utf-8 -*-
from urllib.parse import urlparse
from .general_crawler import general_crawler
from .mp_crawler import mp_crawler
import httpx
from bs4 import BeautifulSoup
import asyncio
from requests.compat import urljoin
from datetime import datetime, date
header = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/604.1 Edg/112.0.100.0'}
async def general_scraper(site: str, expiration: date, existing: list[str], logger) -> list[dict]:
logger.debug(f"start processing {site}")
async with httpx.AsyncClient() as client:
for retry in range(2):
try:
response = await client.get(site, headers=header, timeout=30)
response.raise_for_status()
break
except Exception as e:
if retry < 1:
logger.info(f"request {site} got error {e}\nwaiting 1min")
await asyncio.sleep(60)
else:
logger.warning(f"request {site} got error {e}")
return []
page_source = response.text
soup = BeautifulSoup(page_source, "html.parser")
# Parse all URLs
parsed_url = urlparse(site)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
urls = set()
for link in soup.find_all("a", href=True):
absolute_url = urljoin(base_url, link["href"])
if urlparse(absolute_url).netloc == parsed_url.netloc and absolute_url != site:
urls.add(absolute_url)
if not urls:
# maybe it's an article site
logger.info(f"can not find any link from {site}, maybe it's an article site...")
if site in existing:
logger.debug(f"{site} has been crawled before, skip it")
return []
if site.startswith('https://mp.weixin.qq.com') or site.startswith('http://mp.weixin.qq.com'):
flag, result = await mp_crawler(site, logger)
else:
flag, result = await general_crawler(site, logger)
if flag != 11:
return []
publish_date = datetime.strptime(result['publish_time'], '%Y%m%d')
if publish_date.date() < expiration:
logger.debug(f"{site} is too old, skip it")
return []
else:
return [result]
articles = []
for url in urls:
logger.debug(f"start scraping {url}")
if url in existing:
logger.debug(f"{url} has been crawled before, skip it")
continue
existing.append(url)
if url.startswith('https://mp.weixin.qq.com') or url.startswith('http://mp.weixin.qq.com'):
flag, result = await mp_crawler(url, logger)
else:
flag, result = await general_crawler(url, logger)
if flag != 11:
continue
publish_date = datetime.strptime(result['publish_time'], '%Y%m%d')
if publish_date.date() < expiration:
logger.debug(f"{url} is too old, skip it")
else:
articles.append(result)
return articles

View File

@ -26,10 +26,10 @@ async def mp_crawler(url: str, logger) -> (int, dict):
                 break
             except Exception as e:
                 if retry < 1:
-                    logger.info(f"request {url} got error {e}\nwaiting 1min")
+                    logger.info(f"{e}\nwaiting 1min")
                     await asyncio.sleep(60)
                 else:
-                    logger.warning(f"request {url} got error {e}")
+                    logger.warning(e)
                     return -7, {}
 
     soup = BeautifulSoup(response.text, 'html.parser')

View File

@ -9,13 +9,7 @@ async def process_site(site, counter):
         return
     if counter % site['per_hours'] == 0:
         logger.info(f"applying {site['url']}")
-        request_input = {
-            "user_id": "schedule_tasks",
-            "type": "site",
-            "content": site['url'],
-            "addition": f"task execute loop {counter + 1}"
-        }
-        await pipeline(request_input)
+        await pipeline(site['url'])
 
 
 async def schedule_pipeline(interval):