Mirror of https://github.com/TeamWiseFlow/wiseflow.git (synced 2025-02-02 18:28:46 +08:00)

Commit 4a2ace0e25 (parent c20c4a0a27): fix url-repeat and some img path miss base-url
@@ -7,7 +7,7 @@ services:
# environment:
# - TZ=Asia/Shanghai
# - LANG=zh_CN.UTF-8
# - LC_CTYPE=zh_CN.UTF-8
# - LC_ALL=zh_CN.UTF-8
tty: true
stdin_open: true
entrypoint: bash docker_entrypoint.sh
@@ -8,6 +8,7 @@ import json
 from datetime import datetime, timedelta
 import re
 import asyncio
+from typing import Dict


 # The XML parsing scheme is not used because there are abnormal characters in the XML code extracted from the weixin public_msg
@@ -16,28 +17,25 @@ url_pattern = re.compile(r'<url><!\[CDATA\[(.*?)]]></url>')
 summary_pattern = re.compile(r'<summary><!\[CDATA\[(.*?)]]></summary>', re.DOTALL)

 expiration_days = 3
-existing_urls = [url['url'] for url in pb.read(collection_name='articles', fields=['url']) if url['url']]
+existing_urls = {url['url'] for url in pb.read(collection_name='articles', fields=['url']) if url['url']}


-async def pipeline(url: str, cache: dict = {}):
-    url = url.rstrip('/')
-    working_list = [url]
+async def pipeline(url: str, cache: Dict[str, str] = {}):
+    working_list = {url}
     while working_list:
-        url = working_list[0]
-        working_list.pop(0)
+        url = working_list.pop()
+        existing_urls.add(url)
         logger.debug(f"start processing {url}")

         # get article process
         flag, result = await general_crawler(url, logger)
         if flag == 1:
             logger.info('get new url list, add to work list')
-            to_add = [u for u in result if u not in existing_urls and u not in working_list]
-            existing_urls.append(url)
-            working_list.extend(to_add)
+            new_urls = result - existing_urls
+            working_list.update(new_urls)
             continue
         elif flag <= 0:
             logger.error("got article failed, pipeline abort")
-            # existing_urls.append(url)
             continue

         expiration = datetime.now() - timedelta(days=expiration_days)
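The switch from lists to sets is the "url-repeat" part of the commit title: the working queue and the seen-URL record now both deduplicate by construction. A standalone sketch of that behaviour (the URLs are invented examples, not project data):

```python
# Toy reproduction of the set-based dedup used by the rewritten pipeline.
existing_urls = set()
working_list = {"https://example.com/list"}

while working_list:
    url = working_list.pop()
    existing_urls.add(url)
    # pretend every page links to the same three URLs, including the one we started from
    found = {"https://example.com/a", "https://example.com/b", "https://example.com/list"}
    new_urls = found - existing_urls      # set difference drops everything already seen
    working_list.update(new_urls)         # set union: nothing is queued twice
    print(f"processed {url}, queue now {sorted(working_list)}")
# each URL is processed exactly once, so the loop ends after three iterations
```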
@@ -45,7 +43,6 @@ async def pipeline(url: str, cache: dict = {}):
         article_date = int(result['publish_time'])
         if article_date < int(expiration_date.replace('-', '')):
             logger.info(f"publish date is {article_date}, too old, skip")
-            existing_urls.append(url)
             continue

         for k, v in cache.items():
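For reference, the age check compares two integer renderings of a date: `result['publish_time']` as `YYYYMMDD` and `expiration_date` as a `YYYY-MM-DD` string with the dashes stripped. A self-contained sketch, where the variable formats are assumptions and only the comparison itself appears in the hunk above:

```python
from datetime import datetime, timedelta

expiration_days = 3
expiration = datetime.now() - timedelta(days=expiration_days)
expiration_date = expiration.strftime('%Y-%m-%d')         # assumed format, e.g. '2024-06-01'

article_date = 20240101                                    # hypothetical int(result['publish_time'])
if article_date < int(expiration_date.replace('-', '')):   # e.g. '2024-06-01' -> 20240601
    print(f"publish date is {article_date}, too old, skip")
```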
@@ -56,32 +53,27 @@ async def pipeline(url: str, cache: dict = {}):
         logger.debug(f"article: {result['title']}")
         article_id = pb.add(collection_name='articles', body=result)
         if not article_id:
-            await asyncio.sleep(1)
-            # do again
-            article_id = pb.add(collection_name='articles', body=result)
-            if not article_id:
-                logger.error('add article failed, writing to cache_file')
-                with open(os.path.join(project_dir, 'cache_articles.json'), 'a', encoding='utf-8') as f:
-                    json.dump(result, f, ensure_ascii=False, indent=4)
-                continue
+            logger.error('add article failed, writing to cache_file')
+            with open(os.path.join(project_dir, 'cache_articles.json'), 'a', encoding='utf-8') as f:
+                json.dump(result, f, ensure_ascii=False, indent=4)
+            continue

         insights = get_info(f"title: {result['title']}\n\ncontent: {result['content']}")
         if not insights:
             continue

-        existing_urls.append(url)
         # post process
         article_tags = set()
-        old_insights = pb.read(collection_name='insights', filter=f"updated>'{expiration_date}'", fields=['id', 'tag', 'content', 'articles'])
+        old_insights = pb.read(collection_name='insights', filter=f"updated>'{expiration_date}'",
+                               fields=['id', 'tag', 'content', 'articles'])
         for insight in insights:
             article_tags.add(insight['tag'])
             insight['articles'] = [article_id]
             old_insight_dict = {i['content']: i for i in old_insights if i['tag'] == insight['tag']}

-            # Because what you want to compare is whether the extracted information phrases are talking about the same thing,
+            # the result wanted is whether the extracted information phrases are talking about the same thing,
             # it may not be suitable and too heavy to calculate the similarity with a vector model
-            # Therefore, a simplified solution is used here, directly using the jieba particifier, to calculate whether the overlap between the two phrases exceeds.
-
+            # Therefore, a simplified solution is used here, directly using jieba to calculate whether the overlap between the two phrases exceeds.
             similar_insights = compare_phrase_with_list(insight['content'], list(old_insight_dict.keys()), 0.65)
             if similar_insights:
                 to_rewrite = similar_insights + [insight['content']]
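`compare_phrase_with_list` itself is not part of this diff; the sketch below is only one plausible reading of the jieba token-overlap idea described in the comment above, with the `0.65` threshold passed in the same way. The function body and the overlap formula are assumptions, not wiseflow's actual implementation.

```python
import jieba  # the tokenizer the project imports elsewhere in this commit


def compare_phrase_with_list(phrase: str, candidates: list, threshold: float) -> list:
    """Return the candidates whose jieba token overlap with `phrase` exceeds `threshold`."""
    tokens = set(jieba.cut(phrase))
    similar = []
    for candidate in candidates:
        candidate_tokens = set(jieba.cut(candidate))
        if not tokens or not candidate_tokens:
            continue
        overlap = len(tokens & candidate_tokens) / min(len(tokens), len(candidate_tokens))
        if overlap > threshold:
            similar.append(candidate)
    return similar
```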
@@ -93,30 +85,21 @@ async def pipeline(url: str, cache: dict = {}):
                 for old_insight in similar_insights:
                     insight['articles'].extend(old_insight_dict[old_insight]['articles'])
                     if not pb.delete(collection_name='insights', id=old_insight_dict[old_insight]['id']):
-                        # do again
-                        if not pb.delete(collection_name='insights', id=old_insight_dict[old_insight]['id']):
-                            logger.error('delete insight failed')
+                        logger.error('delete insight failed')
                     old_insights.remove(old_insight_dict[old_insight])

             insight['id'] = pb.add(collection_name='insights', body=insight)
             if not insight['id']:
-                # do again
-                insight['id'] = pb.add(collection_name='insights', body=insight)
-                if not insight['id']:
-                    logger.error('add insight failed, writing to cache_file')
-                    with open(os.path.join(project_dir, 'cache_insights.json'), 'a', encoding='utf-8') as f:
-                        json.dump(insight, f, ensure_ascii=False, indent=4)
+                logger.error('add insight failed, writing to cache_file')
+                with open(os.path.join(project_dir, 'cache_insights.json'), 'a', encoding='utf-8') as f:
+                    json.dump(insight, f, ensure_ascii=False, indent=4)

         _ = pb.update(collection_name='articles', id=article_id, body={'tag': list(article_tags)})
         if not _:
-            # do again
-            await asyncio.sleep(1)
-            _ = pb.update(collection_name='articles', id=article_id, body={'tag': list(article_tags)})
-            if not _:
-                logger.error(f'update article failed - article_id: {article_id}')
-                result['tag'] = list(article_tags)
-                with open(os.path.join(project_dir, 'cache_articles.json'), 'a', encoding='utf-8') as f:
-                    json.dump(result, f, ensure_ascii=False, indent=4)
+            logger.error(f'update article failed - article_id: {article_id}')
+            result['tag'] = list(article_tags)
+            with open(os.path.join(project_dir, 'cache_articles.json'), 'a', encoding='utf-8') as f:
+                json.dump(result, f, ensure_ascii=False, indent=4)


 async def message_manager(_input: dict):
@@ -150,7 +133,7 @@ async def message_manager(_input: dict):
             logger.debug(f"can not find any url in\n{_input['content']}\npass...")
             # todo get info from text process
             return
-        await asyncio.gather(*[pipeline(url) for url in urls])
+        await asyncio.gather(*[pipeline(url) for url in urls if url not in existing_urls])

     elif _input['type'] == 'url':
         # this is remained for wechat shared mp_article_card
@@ -18,8 +18,8 @@ We also provide a parser specifically for WeChat public articles (mp.weixin.qq.c
 2. **The parser's input parameters should only include `url` and `logger`:**
    - `url` is the complete address of the source (type `str`).
    - `logger` is the logging object (please do not configure a separate logger for your custom source parser).
-3. **The parser's output should include `flag` and `result`, formatted as `tuple[int, Union[list, dict]]`:**
-   - If the `url` is an article list page, `flag` returns `1`, and `result` returns a list of all article page URLs (`list`).
+3. **The parser's output should include `flag` and `result`, formatted as `tuple[int, Union[set, dict]]`:**
+   - If the `url` is an article list page, `flag` returns `1`, and `result` returns a tuple of all article page URLs (`set`).
    - If the `url` is an article page, `flag` returns `11`, and `result` returns all article details (`dict`), in the following format:

 ```python
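As a bare-bones illustration of that contract (not project code: the fetch step, the list-page heuristic, and the abbreviated detail fields are all assumptions), a custom parser might look roughly like this:

```python
# Hypothetical custom source parser following the flag/result contract described above.
from typing import Union
import httpx


async def my_site_parser(url: str, logger) -> tuple[int, Union[set, dict]]:
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            response = await client.get(url)
    except Exception as e:
        logger.error(f"fetch failed for {url}: {e}")
        return -7, {}                                   # fetch problem, mirroring general_crawler's convention
    html = response.text
    # crude stand-in heuristic: treat pages that link to many posts as list pages
    links = {part for part in html.split('"') if part.startswith("https://my-site.example/post/")}
    if len(links) > 10:
        return 1, links                                 # article list page: flag 1, set of URLs
    # article page: flag 11, detail dict in the documented format (fields abbreviated here)
    return 11, {"url": url, "title": "...", "content": html}
```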
@@ -18,8 +18,8 @@
 2. **The parser's input parameters include only `url` and `logger`:**
    - `url` is the complete address of the source (type `str`)
    - `logger` is the logging object (please do not configure a separate `logger` for your custom source parser)
-3. **The parser's output includes `flag` and `result`, formatted as `tuple[int, Union[list, dict]]`:**
-   - If `url` is an article list page, `flag` returns `1` and `result` returns a list (`list`) of all parsed article page URLs.
+3. **The parser's output includes `flag` and `result`, formatted as `tuple[int, Union[set, dict]]`:**
+   - If `url` is an article list page, `flag` returns `1` and `result` returns a set (`set`) of all parsed article page URLs.
    - If `url` is an article page, `flag` returns `11` and `result` returns all parsed article details (`dict`), in the following format:

 ```python
@@ -18,8 +18,8 @@ We also provide a special parser for WeChat articles (mp.weixin.qq.com)
 2. **The parser's input parameters should only include `url` and `logger`:**
    - `url` is the complete address of the source (type `str`).
    - `logger` is the logging object (please do not configure a separate logger for your custom source parser).
-3. **The parser's output should include `flag` and `result`, in the format `tuple[int, Union[list, dict]]`:**
-   - If the `url` is an article list page, `flag` returns `1` and `result` returns a list of all article URLs (`list`).
+3. **The parser's output should include `flag` and `result`, in the format `tuple[int, Union[set, dict]]`:**
+   - If the `url` is an article list page, `flag` returns `1` and `result` returns a set of all article URLs (`set`).
    - If the `url` is an article page, `flag` returns `11` and `result` returns all article details (`dict`), in the following format:

 ```python
@@ -18,8 +18,8 @@ We also provide a parser designed specifically for articles
 2. **The parser's input parameters must only include `url` and `logger`:**
    - `url` is the complete address of the source (type `str`).
    - `logger` is the logging object (do not configure a separate logger for your specific parser).
-3. **The parser's output parameters must include `flag` and `result`, formatted as `tuple[int, Union[list, dict]]`:**
-   - If the URL is an article list page, `flag` returns `1` and `result` returns the list of all article page URLs (`list`).
+3. **The parser's output parameters must include `flag` and `result`, formatted as `tuple[int, Union[set, dict]]`:**
+   - If the URL is an article list page, `flag` returns `1` and `result` returns the set of all article page URLs (`set`).
    - If the URL is an article page, `flag` returns `11` and `result` returns all article details (`dict`), in the following format:

 ```python
@@ -18,8 +18,8 @@
 2. **The parser's input parameters should only include `url` and `logger`:**
    - `url` is the complete address of the source (type `str`)
    - `logger` is the logging object (do not configure a dedicated logger)
-3. **The parser's output includes `flag` and `result`, in the format `tuple[int, Union[list, dict]]`:**
-   - If the `url` is an article list page, `flag` returns `1` and `result` returns a list (`list`) of all article page URLs.
+3. **The parser's output includes `flag` and `result`, in the format `tuple[int, Union[set, dict]]`:**
+   - If the `url` is an article list page, `flag` returns `1` and `result` returns a collection (`set`) of all article page URLs.
    - If the `url` is an article page, `flag` returns `11` and `result` returns all article details (`dict`), in the following format:

 ```python
@@ -2,7 +2,8 @@
 # when you use this general crawler, remember followings
 # When you receive flag -7, it means that the problem occurs in the HTML fetch process.
 # When you receive flag 0, it means that the problem occurred during the content parsing process.
-# when you receive flag 1, the result would be a list, means that the input url is possible a article_list page and the list contains the url of the articles.
+# when you receive flag 1, the result would be a tuple, means that the input url is possible a article_list page
+# and the set contains the url of the articles.
 # when you receive flag 11, you will get the dict contains the title, content, url, date, and the source of the article.

 from gne import GeneralNewsExtractor
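A hedged sketch of how a caller can branch on those flags; `crawler` stands in for the general_crawler coroutine and `store` for whatever persistence the caller uses (both are parameters here precisely because they are not defined in this snippet):

```python
import logging
from typing import Callable


async def handle(url: str, crawler: Callable, store: Callable,
                 working_list: set, existing_urls: set, logger: logging.Logger) -> None:
    flag, result = await crawler(url, logger)
    if flag == 1:          # probable article-list page: result is a set of same-domain URLs
        working_list.update(result - existing_urls)
    elif flag == 11:       # article page: dict with title, content, url, date and source
        store(result)
    elif flag == -7:       # the HTML fetch itself failed
        logger.warning(f"fetch failed for {url}")
    else:                  # flag 0 (or another non-positive value): content parsing failed
        logger.warning(f"could not parse {url}")
```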
@@ -65,12 +66,13 @@ Ensure your response fits the following JSON structure, accurately reflecting th
 It is essential that your output adheres strictly to this format, with each field filled based on the untouched information extracted directly from the HTML source.'''


-async def general_crawler(url: str, logger) -> tuple[int, Union[list, dict]]:
+async def general_crawler(url: str, logger) -> tuple[int, Union[set, dict]]:
     """
-    Return article information dict and flag, negative number is error, 0 is no result, 1 is for article_list page, 11 is success
+    Return article information dict and flag, negative number is error, 0 is no result, 1 is for article_list page,
+    11 is success

     main work flow:
-    (for weixin public account artilces, which startswith mp.weixin.qq use mp_crawler)
+    (for weixin public account articles, which startswith mp.weixin.qq use mp_crawler)
     first get the content with httpx
     then judge is article list (return all article url and flag 1) or article detail page
     then try to use gne to extract the information
@@ -80,6 +82,7 @@ async def general_crawler(url: str, logger) -> tuple[int, Union[list, dict]]:
     # 0. if there's a scraper for this domain, use it (such as mp.weixin.qq.com)
     parsed_url = urlparse(url)
     domain = parsed_url.netloc
+    base_url = f"{parsed_url.scheme}://{domain}"
     if domain in scraper_map:
         return await scraper_map[domain](url, logger)

@@ -113,19 +116,25 @@ async def general_crawler(url: str, logger) -> tuple[int, Union[list, dict]]:
         return -7, {}

     soup = BeautifulSoup(text, "html.parser")
-    # Note: The scheme used here is very crude, and it is recommended to write a separate parser for specific business scenarios
+    # Note: The scheme used here is very crude,
+    # it is recommended to write a separate parser for specific business scenarios
     # Parse all URLs
     if len(url) < 50:
-        base_url = f"{parsed_url.scheme}://{domain}"
         urls = set()
         for link in soup.find_all("a", href=True):
-            absolute_url = urljoin(base_url, link["href"]).rstrip('/')
-            if urlparse(absolute_url).netloc == domain and absolute_url != url:
+            absolute_url = urljoin(base_url, link["href"])
+            format_url = urlparse(absolute_url)
+            # only record same domain links
+            if not format_url.netloc or format_url.netloc != domain:
+                continue
+            # remove hash fragment
+            absolute_url = f"{format_url.scheme}://{format_url.netloc}{format_url.path}{format_url.params}{format_url.query}"
+            if absolute_url != url:
                 urls.add(absolute_url)

-        if len(urls) > 30:
+        if len(urls) > 24:
             logger.info(f"{url} is more like an article list page, find {len(urls)} urls with the same netloc")
-            return 1, list(urls)
+            return 1, urls

     # 3. try to use gne to extract the information
     try:
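A quick standalone check of the normalisation above: the link is resolved against the base URL and then rebuilt from its `urlparse` parts without the fragment. The sample URL is made up; note that `urlunparse` gives the same result here and would also restore the `?` separator if a query string were present.

```python
from urllib.parse import urljoin, urlparse, urlunparse

base_url = "https://example.com"
absolute_url = urljoin(base_url, "/posts/42#comments")
parts = urlparse(absolute_url)

# same rebuild as above: scheme://netloc + path + params + query, fragment dropped
print(f"{parts.scheme}://{parts.netloc}{parts.path}{parts.params}{parts.query}")  # https://example.com/posts/42
# equivalent for this URL, and it keeps the '?' when a query string is present
print(urlunparse(parts._replace(fragment="")))                                    # https://example.com/posts/42
```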
@@ -183,7 +192,7 @@ async def general_crawler(url: str, logger) -> tuple[int, Union[list, dict]]:
     images = soup.find_all("img")
     for img in images:
         try:
-            image_links.append(img["src"])
+            image_links.append(urljoin(base_url, img["src"]))
         except KeyError:
             continue
     result["images"] = image_links
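That `urljoin` call is the "img path miss base-url" part of the commit title: relative `src` values now get the page's base URL prepended, while already-absolute ones pass through unchanged. A quick illustration with made-up paths:

```python
from urllib.parse import urljoin

base_url = "https://example.com"
print(urljoin(base_url, "/static/cover.png"))              # https://example.com/static/cover.png
print(urljoin(base_url, "images/thumb.jpg"))               # https://example.com/images/thumb.jpg
print(urljoin(base_url, "https://cdn.example.net/a.png"))  # absolute src is left untouched
```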
@@ -9,7 +9,7 @@ async def process_site(site, counter):
         return
     if counter % site['per_hours'] == 0:
         logger.info(f"applying {site['url']}")
-        await pipeline(site['url'])
+        await pipeline(site['url'].rstrip('/'))


 async def schedule_pipeline(interval):
@@ -5,17 +5,31 @@ import jieba


 def isURL(string):
     if string.startswith("www."):
         string = f"https://{string}"
     result = urlparse(string)
     return result.scheme != '' and result.netloc != ''


 def extract_urls(text):
-    url_pattern = re.compile(r'https?://[-A-Za-z0-9+&@#/%?=~_|!:.;]+[-A-Za-z0-9+&@#/%=~_|]')
+    # Regular expression to match http, https, and www URLs
+    url_pattern = re.compile(r'((?:https?://|www\.)[-A-Za-z0-9+&@#/%?=~_|!:,.;]*[-A-Za-z0-9+&@#/%=~_|])')
     urls = re.findall(url_pattern, text)

     # Filter out those cases that only match to'www. 'without subsequent content,
     # and try to add the default http protocol prefix to each URL for easy parsing
-    cleaned_urls = [url for url in urls if isURL(url)]
+    # urls = {quote(url.rstrip('/'), safe='/:?=&') for url in urls}
+    cleaned_urls = set()
+    for url in urls:
+        if url.startswith("www."):
+            url = f"https://{url}"
+        parsed_url = urlparse(url)
+        if not parsed_url.netloc:
+            continue
+        # remove hash fragment
+        if not parsed_url.scheme:
+            # just try https
+            cleaned_urls.add(f"https://{parsed_url.netloc}{parsed_url.path}{parsed_url.params}{parsed_url.query}")
+        else:
+            cleaned_urls.add(
+                f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}{parsed_url.params}{parsed_url.query}")
     return cleaned_urls
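A usage sketch of the rewritten `extract_urls`, assuming the function above is in scope (the sample text is invented): `www.` matches get an `https://` scheme and fragments are dropped, so both links come back in normalised form.

```python
text = "see https://example.com/post/1#intro and www.example.com/about for details"
print(extract_urls(text))
# expected (set order varies): {'https://example.com/post/1', 'https://www.example.com/about'}
```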