mirror of
https://github.com/TeamWiseFlow/wiseflow.git
synced 2025-01-23 02:20:20 +08:00
145 lines
4.9 KiB
Python
145 lines
4.9 KiB
Python
import os
|
||
import sys
|
||
import re
|
||
|
||
# 将core目录添加到Python路径
|
||
core_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'core')
|
||
sys.path.append(core_path)
|
||
|
||
# 现在可以直接导入模块,因为core目录已经在Python路径中
|
||
from scrapers import *
|
||
from agents.get_info import pre_process
|
||
|
||
def check_url_text(text):
|
||
common_chars = ',.!;:,;:、一二三四五六七八九十#*@% \t\n\r|*-_…>#'
|
||
print(f"processing: {text}")
|
||
left_bracket = text.find('[')
|
||
right_paren = text.rfind(')')
|
||
|
||
if -1 in [left_bracket, right_paren] or left_bracket > right_paren:
|
||
print("not [] or () marker")
|
||
print(f"left_bracket: {left_bracket}, right_paren: {right_paren}")
|
||
return
|
||
|
||
# 检查左括号前的文本是否包含至少2个有效字符
|
||
prefix = text[:left_bracket]
|
||
pre_valid_chars = [c for c in prefix if not c.isdigit() and c not in common_chars]
|
||
if len(pre_valid_chars) >= 50:
|
||
print("prefix has at least 50 valid chars")
|
||
print(f"prefix: {prefix}, valid_chars: {pre_valid_chars}")
|
||
return
|
||
|
||
suffix = text[right_paren+1:]
|
||
suf_valid_chars = [c for c in suffix if c not in common_chars]
|
||
if len(pre_valid_chars) >= 2 and len(suf_valid_chars) >= 1:
|
||
print("prefix has at least 2 valid chars and suffix has at least 1 valid char")
|
||
print(f"prefix: {prefix}, valid_chars: {pre_valid_chars}, suffix: {suffix}, valid_chars: {suf_valid_chars}")
|
||
return
|
||
|
||
if len(suf_valid_chars) >= 36:
|
||
print("suffix has at least 36 valid chars")
|
||
print(f"suffix: {suffix}, valid_chars: {suf_valid_chars}")
|
||
return
|
||
|
||
print('is a isolated url')
|
||
|
||
print("处理图片标记 ![alt](src)")
|
||
img_pattern = r'!\[(.*?)\]\((.*?)\)'
|
||
matches = re.findall(img_pattern, text)
|
||
|
||
for alt, src in matches:
|
||
# 替换为新格式 <alt||src>
|
||
text = text.replace(f'![{alt}]({src})', f'§{alt}||{src}§')
|
||
print(text)
|
||
|
||
print("处理 [part0](part1) 格式的片段")
|
||
link_pattern = r'\[(.*?)\]\((.*?)\)'
|
||
matches = re.findall(link_pattern, text)
|
||
for match in matches:
|
||
print(match)
|
||
|
||
async def main(html_sample, record_file):
|
||
recognized_img_cache = {}
|
||
parsed_url = urlparse(html_sample['url'])
|
||
domain = parsed_url.netloc
|
||
if domain in custom_scrapers:
|
||
result = custom_scrapers[domain](html_sample)
|
||
raw_markdown = result.content
|
||
used_img = result.images
|
||
title = result.title
|
||
base_url = result.base
|
||
author = result.author
|
||
publish_date = result.publish_date
|
||
else:
|
||
raw_markdown = html_sample['markdown']
|
||
media_dict = html_sample['media'] if html_sample['media'] else {}
|
||
used_img = [d['src'] for d in media_dict.get('images', [])]
|
||
title = ''
|
||
base_url = ''
|
||
author = ''
|
||
publish_date = ''
|
||
|
||
if not raw_markdown:
|
||
print(f"no raw_markdown for {file}")
|
||
return
|
||
|
||
if not title:
|
||
title = html_sample.get('title', '')
|
||
if not base_url:
|
||
base_url = html_sample.get('base', '')
|
||
if not base_url:
|
||
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
||
|
||
print('base_url:', base_url)
|
||
|
||
if not author:
|
||
author = html_sample.get('author', '')
|
||
if not publish_date:
|
||
publish_date = html_sample.get('publish_date', '')
|
||
|
||
link_dict, links_parts, contents, recognized_img_cache = await pre_process(raw_markdown, base_url, used_img, recognized_img_cache, test_mode=True)
|
||
result = {
|
||
"link_dict": link_dict,
|
||
"links_part": links_parts,
|
||
"contents": contents,
|
||
}
|
||
|
||
with open(record_file, 'w', encoding='utf-8') as f:
|
||
json.dump(result, f, indent=4, ensure_ascii=False)
|
||
print(f"pre process done, saved to {record_file}")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
import argparse
|
||
import json
|
||
from urllib.parse import urlparse
|
||
import asyncio
|
||
|
||
parser = argparse.ArgumentParser()
|
||
parser.add_argument('--test_file', '-F', type=str, default='')
|
||
parser.add_argument('--sample_dir', '-D', type=str, default='')
|
||
parser.add_argument('--record_folder', '-R', type=str, default='')
|
||
args = parser.parse_args()
|
||
|
||
test_file = args.test_file
|
||
sample_dir = args.sample_dir
|
||
record_folder = args.record_folder
|
||
if record_folder:
|
||
os.makedirs(record_folder, exist_ok=True)
|
||
|
||
files = []
|
||
if test_file:
|
||
files.append(test_file)
|
||
|
||
if sample_dir:
|
||
files.extend([os.path.join(sample_dir, file) for file in os.listdir(sample_dir)])
|
||
|
||
for file in files:
|
||
if not file.endswith('.json'): continue
|
||
print(f"processing {file} ...")
|
||
with open(file, 'r') as f:
|
||
html_sample = json.load(f)
|
||
record_file = os.path.join(record_folder, f'{os.path.basename(file)}_processed.json')
|
||
|
||
asyncio.run(main(html_sample, record_file))
|