From 4a2ace0e25cfb26d7abbb53b9e57c00268616e47 Mon Sep 17 00:00:00 2001 From: bigbrother666 Date: Sat, 22 Jun 2024 16:47:13 +0800 Subject: [PATCH] fix url-repeat and some img path miss base-url --- compose.yaml | 2 +- core/insights/__init__.py | 67 ++++++++++++-------------------- core/scrapers/README.md | 4 +- core/scrapers/README_CN.md | 4 +- core/scrapers/README_de.md | 4 +- core/scrapers/README_fr.md | 4 +- core/scrapers/README_jp.md | 4 +- core/scrapers/general_crawler.py | 31 +++++++++------ core/tasks.py | 2 +- core/utils/general_utils.py | 24 +++++++++--- 10 files changed, 76 insertions(+), 70 deletions(-) diff --git a/compose.yaml b/compose.yaml index a6aba06..2bb755d 100755 --- a/compose.yaml +++ b/compose.yaml @@ -7,7 +7,7 @@ services: # environment: # - TZ=Asia/Shanghai # - LANG=zh_CN.UTF-8 - # - LC_CTYPE=zh_CN.UTF-8 + # - LC_ALL=zh_CN.UTF-8 tty: true stdin_open: true entrypoint: bash docker_entrypoint.sh diff --git a/core/insights/__init__.py b/core/insights/__init__.py index d7dcc80..09f0930 100644 --- a/core/insights/__init__.py +++ b/core/insights/__init__.py @@ -8,6 +8,7 @@ import json from datetime import datetime, timedelta import re import asyncio +from typing import Dict # The XML parsing scheme is not used because there are abnormal characters in the XML code extracted from the weixin public_msg @@ -16,28 +17,25 @@ url_pattern = re.compile(r'') summary_pattern = re.compile(r'', re.DOTALL) expiration_days = 3 -existing_urls = [url['url'] for url in pb.read(collection_name='articles', fields=['url']) if url['url']] +existing_urls = {url['url'] for url in pb.read(collection_name='articles', fields=['url']) if url['url']} -async def pipeline(url: str, cache: dict = {}): - url = url.rstrip('/') - working_list = [url] +async def pipeline(url: str, cache: Dict[str, str] = {}): + working_list = {url} while working_list: - url = working_list[0] - working_list.pop(0) + url = working_list.pop() + existing_urls.add(url) logger.debug(f"start processing {url}") # get article process flag, result = await general_crawler(url, logger) if flag == 1: logger.info('get new url list, add to work list') - to_add = [u for u in result if u not in existing_urls and u not in working_list] - existing_urls.append(url) - working_list.extend(to_add) + new_urls = result - existing_urls + working_list.update(new_urls) continue elif flag <= 0: logger.error("got article failed, pipeline abort") - # existing_urls.append(url) continue expiration = datetime.now() - timedelta(days=expiration_days) @@ -45,7 +43,6 @@ async def pipeline(url: str, cache: dict = {}): article_date = int(result['publish_time']) if article_date < int(expiration_date.replace('-', '')): logger.info(f"publish date is {article_date}, too old, skip") - existing_urls.append(url) continue for k, v in cache.items(): @@ -56,32 +53,27 @@ async def pipeline(url: str, cache: dict = {}): logger.debug(f"article: {result['title']}") article_id = pb.add(collection_name='articles', body=result) if not article_id: - await asyncio.sleep(1) - # do again - article_id = pb.add(collection_name='articles', body=result) - if not article_id: - logger.error('add article failed, writing to cache_file') - with open(os.path.join(project_dir, 'cache_articles.json'), 'a', encoding='utf-8') as f: - json.dump(result, f, ensure_ascii=False, indent=4) - continue + logger.error('add article failed, writing to cache_file') + with open(os.path.join(project_dir, 'cache_articles.json'), 'a', encoding='utf-8') as f: + json.dump(result, f, ensure_ascii=False, indent=4) + continue insights = get_info(f"title: {result['title']}\n\ncontent: {result['content']}") if not insights: continue - existing_urls.append(url) # post process article_tags = set() - old_insights = pb.read(collection_name='insights', filter=f"updated>'{expiration_date}'", fields=['id', 'tag', 'content', 'articles']) + old_insights = pb.read(collection_name='insights', filter=f"updated>'{expiration_date}'", + fields=['id', 'tag', 'content', 'articles']) for insight in insights: article_tags.add(insight['tag']) insight['articles'] = [article_id] old_insight_dict = {i['content']: i for i in old_insights if i['tag'] == insight['tag']} - # Because what you want to compare is whether the extracted information phrases are talking about the same thing, + # the result wanted is whether the extracted information phrases are talking about the same thing, # it may not be suitable and too heavy to calculate the similarity with a vector model - # Therefore, a simplified solution is used here, directly using the jieba particifier, to calculate whether the overlap between the two phrases exceeds. - + # Therefore, a simplified solution is used here, directly using jieba to calculate whether the overlap between the two phrases exceeds. similar_insights = compare_phrase_with_list(insight['content'], list(old_insight_dict.keys()), 0.65) if similar_insights: to_rewrite = similar_insights + [insight['content']] @@ -93,30 +85,21 @@ async def pipeline(url: str, cache: dict = {}): for old_insight in similar_insights: insight['articles'].extend(old_insight_dict[old_insight]['articles']) if not pb.delete(collection_name='insights', id=old_insight_dict[old_insight]['id']): - # do again - if not pb.delete(collection_name='insights', id=old_insight_dict[old_insight]['id']): - logger.error('delete insight failed') + logger.error('delete insight failed') old_insights.remove(old_insight_dict[old_insight]) insight['id'] = pb.add(collection_name='insights', body=insight) if not insight['id']: - # do again - insight['id'] = pb.add(collection_name='insights', body=insight) - if not insight['id']: - logger.error('add insight failed, writing to cache_file') - with open(os.path.join(project_dir, 'cache_insights.json'), 'a', encoding='utf-8') as f: - json.dump(insight, f, ensure_ascii=False, indent=4) + logger.error('add insight failed, writing to cache_file') + with open(os.path.join(project_dir, 'cache_insights.json'), 'a', encoding='utf-8') as f: + json.dump(insight, f, ensure_ascii=False, indent=4) _ = pb.update(collection_name='articles', id=article_id, body={'tag': list(article_tags)}) if not _: - # do again - await asyncio.sleep(1) - _ = pb.update(collection_name='articles', id=article_id, body={'tag': list(article_tags)}) - if not _: - logger.error(f'update article failed - article_id: {article_id}') - result['tag'] = list(article_tags) - with open(os.path.join(project_dir, 'cache_articles.json'), 'a', encoding='utf-8') as f: - json.dump(result, f, ensure_ascii=False, indent=4) + logger.error(f'update article failed - article_id: {article_id}') + result['tag'] = list(article_tags) + with open(os.path.join(project_dir, 'cache_articles.json'), 'a', encoding='utf-8') as f: + json.dump(result, f, ensure_ascii=False, indent=4) async def message_manager(_input: dict): @@ -150,7 +133,7 @@ async def message_manager(_input: dict): logger.debug(f"can not find any url in\n{_input['content']}\npass...") # todo get info from text process return - await asyncio.gather(*[pipeline(url) for url in urls]) + await asyncio.gather(*[pipeline(url) for url in urls if url not in existing_urls]) elif _input['type'] == 'url': # this is remained for wechat shared mp_article_card diff --git a/core/scrapers/README.md b/core/scrapers/README.md index 9aba8d8..3e04db0 100644 --- a/core/scrapers/README.md +++ b/core/scrapers/README.md @@ -18,8 +18,8 @@ We also provide a parser specifically for WeChat public articles (mp.weixin.qq.c 2. **The parser's input parameters should only include `url` and `logger`:** - `url` is the complete address of the source (type `str`). - `logger` is the logging object (please do not configure a separate logger for your custom source parser). -3. **The parser's output should include `flag` and `result`, formatted as `tuple[int, Union[list, dict]]`:** - - If the `url` is an article list page, `flag` returns `1`, and `result` returns a list of all article page URLs (`list`). +3. **The parser's output should include `flag` and `result`, formatted as `tuple[int, Union[set, dict]]`:** + - If the `url` is an article list page, `flag` returns `1`, and `result` returns a tuple of all article page URLs (`set`). - If the `url` is an article page, `flag` returns `11`, and `result` returns all article details (`dict`), in the following format: ```python diff --git a/core/scrapers/README_CN.md b/core/scrapers/README_CN.md index 205bd00..3e39510 100644 --- a/core/scrapers/README_CN.md +++ b/core/scrapers/README_CN.md @@ -18,8 +18,8 @@ 2. **解析器入参只包括 `url` 和 `logger` 两项:** - `url` 是信源完整地址(`str` 类型) - `logger` 是日志对象(请勿为您的专有信源解析器单独配置 `logger`) -3. **解析器出参包括 `flag` 和 `result` 两项,格式为 `tuple[int, Union[list, dict]]`:** - - 如果 `url` 是文章列表页面,`flag` 返回 `1`,`result` 返回解析出的全部文章页面 URL 列表(`list`)。 +3. **解析器出参包括 `flag` 和 `result` 两项,格式为 `tuple[int, Union[set, dict]]`:** + - 如果 `url` 是文章列表页面,`flag` 返回 `1`,`result` 返回解析出的全部文章页面 URL 集合(`set`)。 - 如果 `url` 是文章页面,`flag` 返回 `11`,`result` 返回解析出的全部文章详情(`dict`),格式如下: ```python diff --git a/core/scrapers/README_de.md b/core/scrapers/README_de.md index 22d67e0..4200859 100644 --- a/core/scrapers/README_de.md +++ b/core/scrapers/README_de.md @@ -18,8 +18,8 @@ Wir stellen auch einen speziellen Parser für WeChat-Artikel (mp.weixin.qq.com) 2. **Die Eingabeparameter des Parsers sollten nur `url` und `logger` umfassen:** - `url` ist die vollständige Adresse der Quelle (Typ `str`). - `logger` ist das Protokollierungsobjekt (bitte konfigurieren Sie keinen separaten Logger für Ihren benutzerdefinierten Quellparser). -3. **Die Ausgabe des Parsers sollte `flag` und `result` umfassen, im Format `tuple[int, Union[list, dict]]`:** - - Wenn die `url` eine Artikellisten-Seite ist, gibt `flag` `1` zurück, und `result` gibt eine Liste aller Artikel-URLs (`list`) zurück. +3. **Die Ausgabe des Parsers sollte `flag` und `result` umfassen, im Format `tuple[int, Union[set, dict]]`:** + - Wenn die `url` eine Artikellisten-Seite ist, gibt `flag` `1` zurück, und `result` gibt eine satz aller Artikel-URLs (`set`) zurück. - Wenn die `url` eine Artikelseite ist, gibt `flag` `11` zurück, und `result` gibt alle Artikeldetails (`dict`) zurück, im folgenden Format: ```python diff --git a/core/scrapers/README_fr.md b/core/scrapers/README_fr.md index 2822dfc..d96583f 100644 --- a/core/scrapers/README_fr.md +++ b/core/scrapers/README_fr.md @@ -18,8 +18,8 @@ Nous fournissons également un analyseur spécialement conçu pour les articles 2. **Les paramètres d'entrée de l'analyseur doivent uniquement inclure `url` et `logger` :** - `url` est l'adresse complète de la source (type `str`). - `logger` est l'objet de journalisation (ne configurez pas de logger séparé pour votre analyseur spécifique). -3. **Les paramètres de sortie de l'analyseur doivent inclure `flag` et `result`, formatés comme `tuple[int, Union[list, dict]]` :** - - Si l'URL est une page de liste d'articles, `flag` renvoie `1` et `result` renvoie la liste de toutes les URL des pages d'articles (`list`). +3. **Les paramètres de sortie de l'analyseur doivent inclure `flag` et `result`, formatés comme `tuple[int, Union[set, dict]]` :** + - Si l'URL est une page de liste d'articles, `flag` renvoie `1` et `result` renvoie la set de toutes les URL des pages d'articles (`set`). - Si l'URL est une page d'article, `flag` renvoie `11` et `result` renvoie tous les détails de l'article (`dict`), au format suivant : ```python diff --git a/core/scrapers/README_jp.md b/core/scrapers/README_jp.md index b3d8685..d5c6bc0 100644 --- a/core/scrapers/README_jp.md +++ b/core/scrapers/README_jp.md @@ -18,8 +18,8 @@ 2. **パーサーの入力パラメーターは `url` と `logger` のみを含むべきです:** - `url` はソースの完全なアドレス(`str` タイプ) - `logger` はロギングオブジェクト(専用のロガーを構成しないでください) -3. **パーサーの出力は `flag` と `result` を含み、形式は `tuple[int, Union[list, dict]]`:** - - `url` が記事リストページの場合、`flag` は `1` を返し、`result` はすべての記事ページURLのリスト(`list`)を返します。 +3. **パーサーの出力は `flag` と `result` を含み、形式は `tuple[int, Union[set, dict]]`:** + - `url` が記事リストページの場合、`flag` は `1` を返し、`result` はすべての記事ページURLのコレクション(`set`)を返します。 - `url` が記事ページの場合、`flag` は `11` を返し、`result` はすべての記事詳細(`dict`)を返します。形式は以下の通りです: ```python diff --git a/core/scrapers/general_crawler.py b/core/scrapers/general_crawler.py index 0efcf60..b662db7 100644 --- a/core/scrapers/general_crawler.py +++ b/core/scrapers/general_crawler.py @@ -2,7 +2,8 @@ # when you use this general crawler, remember followings # When you receive flag -7, it means that the problem occurs in the HTML fetch process. # When you receive flag 0, it means that the problem occurred during the content parsing process. -# when you receive flag 1, the result would be a list, means that the input url is possible a article_list page and the list contains the url of the articles. +# when you receive flag 1, the result would be a tuple, means that the input url is possible a article_list page +# and the set contains the url of the articles. # when you receive flag 11, you will get the dict contains the title, content, url, date, and the source of the article. from gne import GeneralNewsExtractor @@ -65,12 +66,13 @@ Ensure your response fits the following JSON structure, accurately reflecting th It is essential that your output adheres strictly to this format, with each field filled based on the untouched information extracted directly from the HTML source.''' -async def general_crawler(url: str, logger) -> tuple[int, Union[list, dict]]: +async def general_crawler(url: str, logger) -> tuple[int, Union[set, dict]]: """ - Return article information dict and flag, negative number is error, 0 is no result, 1 is for article_list page, 11 is success + Return article information dict and flag, negative number is error, 0 is no result, 1 is for article_list page, + 11 is success main work flow: - (for weixin public account artilces, which startswith mp.weixin.qq use mp_crawler) + (for weixin public account articles, which startswith mp.weixin.qq use mp_crawler) first get the content with httpx then judge is article list (return all article url and flag 1) or article detail page then try to use gne to extract the information @@ -80,6 +82,7 @@ async def general_crawler(url: str, logger) -> tuple[int, Union[list, dict]]: # 0. if there's a scraper for this domain, use it (such as mp.weixin.qq.com) parsed_url = urlparse(url) domain = parsed_url.netloc + base_url = f"{parsed_url.scheme}://{domain}" if domain in scraper_map: return await scraper_map[domain](url, logger) @@ -113,19 +116,25 @@ async def general_crawler(url: str, logger) -> tuple[int, Union[list, dict]]: return -7, {} soup = BeautifulSoup(text, "html.parser") - # Note: The scheme used here is very crude, and it is recommended to write a separate parser for specific business scenarios + # Note: The scheme used here is very crude, + # it is recommended to write a separate parser for specific business scenarios # Parse all URLs if len(url) < 50: - base_url = f"{parsed_url.scheme}://{domain}" urls = set() for link in soup.find_all("a", href=True): - absolute_url = urljoin(base_url, link["href"]).rstrip('/') - if urlparse(absolute_url).netloc == domain and absolute_url != url: + absolute_url = urljoin(base_url, link["href"]) + format_url = urlparse(absolute_url) + # only record same domain links + if not format_url.netloc or format_url.netloc != domain: + continue + # remove hash fragment + absolute_url = f"{format_url.scheme}://{format_url.netloc}{format_url.path}{format_url.params}{format_url.query}" + if absolute_url != url: urls.add(absolute_url) - if len(urls) > 30: + if len(urls) > 24: logger.info(f"{url} is more like an article list page, find {len(urls)} urls with the same netloc") - return 1, list(urls) + return 1, urls # 3. try to use gne to extract the information try: @@ -183,7 +192,7 @@ async def general_crawler(url: str, logger) -> tuple[int, Union[list, dict]]: images = soup.find_all("img") for img in images: try: - image_links.append(img["src"]) + image_links.append(urljoin(base_url, img["src"])) except KeyError: continue result["images"] = image_links diff --git a/core/tasks.py b/core/tasks.py index 3c8a83a..f8248db 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -9,7 +9,7 @@ async def process_site(site, counter): return if counter % site['per_hours'] == 0: logger.info(f"applying {site['url']}") - await pipeline(site['url']) + await pipeline(site['url'].rstrip('/')) async def schedule_pipeline(interval): diff --git a/core/utils/general_utils.py b/core/utils/general_utils.py index 6562a9b..4840ed1 100644 --- a/core/utils/general_utils.py +++ b/core/utils/general_utils.py @@ -5,17 +5,31 @@ import jieba def isURL(string): + if string.startswith("www."): + string = f"https://{string}" result = urlparse(string) return result.scheme != '' and result.netloc != '' def extract_urls(text): - url_pattern = re.compile(r'https?://[-A-Za-z0-9+&@#/%?=~_|!:.;]+[-A-Za-z0-9+&@#/%=~_|]') + # Regular expression to match http, https, and www URLs + url_pattern = re.compile(r'((?:https?://|www\.)[-A-Za-z0-9+&@#/%?=~_|!:,.;]*[-A-Za-z0-9+&@#/%=~_|])') urls = re.findall(url_pattern, text) - - # Filter out those cases that only match to'www. 'without subsequent content, - # and try to add the default http protocol prefix to each URL for easy parsing - cleaned_urls = [url for url in urls if isURL(url)] + # urls = {quote(url.rstrip('/'), safe='/:?=&') for url in urls} + cleaned_urls = set() + for url in urls: + if url.startswith("www."): + url = f"https://{url}" + parsed_url = urlparse(url) + if not parsed_url.netloc: + continue + # remove hash fragment + if not parsed_url.scheme: + # just try https + cleaned_urls.add(f"https://{parsed_url.netloc}{parsed_url.path}{parsed_url.params}{parsed_url.query}") + else: + cleaned_urls.add( + f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}{parsed_url.params}{parsed_url.query}") return cleaned_urls