You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

140 lines
5.5 KiB

5 months ago
from docx import Document
from paddlenlp import Taskflow
from pprint import pprint
from qwen_agent.agents import Assistant
import re
import json_repair
import time
import math
tagTask = Taskflow("ner")
prompt='''
.上述文本判断地名是否正确你可以使用工具利用互联网查询你只能在[正确,错误,简称,未知]三种选项中选择答案,回答格式[{placeName:地名,"回答":"答案"},{placeName:地名,"回答":"答案"}]不做过多的解释,严格按回答格式作答;
不做过多的解释,严格按回答格式作答;
'''
# prompt='''
# .请回答以上问题,
# ,回答格式[{“placeName”:"原文","回答":"答案"},{“placeName”:"原文","回答":"答案"}],不做过多的解释,严格按回答格式作答;
# 不做过多的解释,严格按回答格式作答;
# '''
llm_cfg = {
#'model': 'qwen1.5-72b-chat',
'model':"qwen2-72b",
'model_server': 'http://127.0.0.1:1025/v1', # base_url, also known as api_base
# 'api_key': 'sk-ea89cf04431645b185990b8af8c9bb13',
}
bot = Assistant(llm=llm_cfg,
name='Assistant',
# description='使用RAG检索并回答,支持文件类型:PDF/Word/PPT/TXT/HTML。'
)
#获取全文内容
def getDocxToTextAll(name):
docxPath=name
document = Document(docxPath)
# 逐段读取docx文档的内容
levelList=[]
words=[]
addStart = False
levelText=""
i = 0
for paragraph in document.paragraphs:
# 判断该段落的标题级别
# 这里用isTitle()临时代表,具体见下文介绍的方法
text = paragraph.text
if text.strip():#非空判断
# print("非空")
words.append(text)
# 将所有段落文本拼接成一个字符串,并用换行符分隔
print("placeNameTask",len(words))
text = '\n'.join(words)
# 将文本写入txt文件
with open("checkPlaceName.txt", 'w', encoding='utf-8') as txt_file:
txt_file.write(text)
#得到全文和地名有关的内容
def placeNameTask(text):
batchNum=20
sentences = re.split(r'[。\n]', text)
# 去掉空字符
sentences = [sentence.strip() for sentence in sentences if sentence.strip()]
# 计算总字符数
total_chars = len(sentences)
# 计算有多少份
num_chunks = math.ceil(total_chars / batchNum)
# 按batchNum字为一份进行处理
chunks = [sentences[i:i + batchNum] for i in range(0, total_chars, batchNum)]
placeList = []
# 打印每一份的内容
for i, chunk in enumerate(chunks):
yield f"文档地名检查---文档解析进度:{i + 1}/{num_chunks}"
wenBen=".".join(chunk)
print(chunk)
res = tagTask(wenBen)
isplace = False
for zuhe in res:
# 上一个的地名,这一个还是地名,就和上一个相加代替这个
if isplace:
name = placeList[len(placeList) - 1]
if zuhe[1].find("组织机构类") >= 0 or zuhe[1].find("世界地区类") >= 0: # or zuhe[1] == "ns"
isplace = True
new_text = zuhe[0].replace("\n", "")
placeList[len(placeList) - 1] = name + new_text
continue
if zuhe[1].find("组织机构类") >= 0 or zuhe[1].find("世界地区类") >= 0:
isplace = True
new_text = zuhe[0].replace("\n", "")
placeList.append(new_text)
else:
isplace = False
print("-" * 40)
# 打印总份数
yield "文档地名检查---文档解析完成"
placeList=list(dict.fromkeys(placeList))
yield placeList
#主方法
def checkPlaceName(filename):
yield f"文档地名检查---开始处理文档..." # 每次生成一个数字就发送
getDocxToTextAll(filename)
with open("checkPlaceName.txt", "r",encoding='utf-8') as f:
gettext = f.read()
yield f"文档地名检查---开始解析文档..." # 每次生成一个数字就发送
# propnList=placeNameTask(gettext)
for item in placeNameTask(gettext):
if isinstance(item, str):
yield item
else:
final_list = item # 获取最终结果
propnStr = ",".join(final_list)
print("placeNameTask",propnStr)
messages = [{'role': 'user', 'content': [{'text': propnStr + prompt}]}]
runList = []
yield f"文档地名检查---结果生成中..." # 每次生成一个数字就发送
cishu=0
for rsp in bot.run(messages):
runList.append(rsp)
if cishu>3:
cishu=0
yield "文档地名检查---结果生成中"+'.'*cishu
cishu+=1
data = runList[len(runList) - 1][0]["content"]
print("placeNameTask",data)
parsed_data = json_repair.loads(data.replace('`', ''))
# 如果需要进一步操作,例如只关注“正确”的回答
error_places = [place for place in parsed_data if place['回答'] == '错误']
print("placeNameTask",error_places)
returnInfo = "发现异常地名<br />";
if len(error_places)>0:
for t in error_places:
keyword= t['placeName']
# 查找包含关键字的段落
paragraphs = re.findall(r'.*?' + re.escape(keyword) + r'.*?\n', gettext)
yuanwen= paragraphs[0].replace(keyword,f"**{keyword}**").replace("\n","")
returnInfo+="原文:" + yuanwen + "<br />出现异常地名:**" + keyword + "**!请注意" + "<br />";
yield returnInfo
print(returnInfo)
else:
yield "**未发现发现异常地名**"