You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
153 lines
5.6 KiB
153 lines
5.6 KiB
from docx import Document
|
|
from paddlenlp import Taskflow
|
|
from pprint import pprint
|
|
from qwen_agent.agents import Assistant
|
|
import re
|
|
import json_repair
|
|
import time
|
|
tagTask = Taskflow("ner")
|
|
prompt='''
|
|
.上述文本判断地名是否正确,你可以使用工具利用互联网查询,你只能在[正确,错误,简称,未知]三种选项中选择答案,回答格式[{“placeName”:“地名”,"回答":"答案"},{“placeName”:“地名”,"回答":"答案"}],不做过多的解释,严格按回答格式作答;
|
|
不做过多的解释,严格按回答格式作答;
|
|
'''
|
|
# prompt='''
|
|
# .请回答以上问题,
|
|
# ,回答格式[{“placeName”:"原文","回答":"答案"},{“placeName”:"原文","回答":"答案"}],不做过多的解释,严格按回答格式作答;
|
|
# 不做过多的解释,严格按回答格式作答;
|
|
# '''
|
|
llm_cfg = {
|
|
#'model': 'qwen1.5-72b-chat',
|
|
'model':"qwen2-72b",
|
|
'model_server': 'http://127.0.0.1:1025/v1', # base_url, also known as api_base
|
|
# 'api_key': 'sk-ea89cf04431645b185990b8af8c9bb13',
|
|
}
|
|
bot = Assistant(llm=llm_cfg,
|
|
name='Assistant',
|
|
# description='使用RAG检索并回答,支持文件类型:PDF/Word/PPT/TXT/HTML。'
|
|
)
|
|
#获取全文内容
|
|
def getDocxToTextAll(name):
|
|
docxPath=name
|
|
document = Document(docxPath)
|
|
# 逐段读取docx文档的内容
|
|
levelList=[]
|
|
words=[]
|
|
addStart = False
|
|
levelText=""
|
|
i = 0
|
|
for paragraph in document.paragraphs:
|
|
# 判断该段落的标题级别
|
|
# 这里用isTitle()临时代表,具体见下文介绍的方法
|
|
text = paragraph.text
|
|
if text.strip():#非空判断
|
|
# print("非空")
|
|
words.append(text)
|
|
# 将所有段落文本拼接成一个字符串,并用换行符分隔
|
|
print("placeNameTask",len(words))
|
|
text = '\n'.join(words)
|
|
|
|
# 将文本写入txt文件
|
|
with open("checkPlaceName.txt", 'w', encoding='utf-8') as txt_file:
|
|
txt_file.write(text)
|
|
|
|
#得到全文和地名有关的内容
|
|
def placeNameTask(text):
|
|
res = tagTask(text)
|
|
print(res)
|
|
placeList = []
|
|
isplace = False
|
|
for zuhe in res:
|
|
# 上一个的地名,这一个还是地名,就和上一个相加代替这个
|
|
|
|
if isplace:
|
|
name = placeList[len(placeList) - 1]
|
|
if zuhe[1].find("组织机构类")>=0 or zuhe[1].find("世界地区类")>=0:# or zuhe[1] == "ns"
|
|
isplace = True
|
|
new_text = zuhe[0].replace("\n", "")
|
|
placeList[len(placeList) - 1] = name + new_text
|
|
continue
|
|
if zuhe[1].find("组织机构类")>=0 or zuhe[1].find("世界地区类")>=0:
|
|
isplace = True
|
|
new_text = zuhe[0].replace("\n", "")
|
|
placeList.append(new_text)
|
|
else:
|
|
isplace = False
|
|
placeList=list(dict.fromkeys(placeList))
|
|
return placeList
|
|
#主方法
|
|
def checkPlaceName(filename):
|
|
getDocxToTextAll(filename)
|
|
start_time=time.time()
|
|
error_places = []
|
|
for batch in read_file_in_batches('checkPlaceName.txt'):
|
|
res=process_batch(batch)
|
|
if(len(res)>0):
|
|
error_places.extend(res)
|
|
|
|
pprint(error_places)
|
|
end_time = time.time()
|
|
# 计算执行时间
|
|
elapsed_time = end_time - start_time
|
|
print(f"checkPlaceName程序执行时间: {elapsed_time} 秒")
|
|
return error_places
|
|
|
|
def read_file_in_batches(file_path, batch_size=5000):
|
|
"""
|
|
分批读取文本文件
|
|
:param file_path: 文件路径
|
|
:param batch_size: 每批处理的字符数
|
|
:return: 生成器,每次返回一批文本
|
|
"""
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|
batch = []
|
|
char_count = 0
|
|
for line in file:
|
|
batch.append(line)
|
|
char_count += len(line)
|
|
if char_count >= batch_size:
|
|
yield ''.join(batch)
|
|
batch = []
|
|
char_count = 0
|
|
if batch:
|
|
yield ''.join(batch)
|
|
|
|
def process_batch(batch):
|
|
"""
|
|
处理一批文本
|
|
:param batch: 一批文本
|
|
"""
|
|
# 在这里添加你的处理逻辑
|
|
|
|
# sentences = re.split(r'[。\n]', batch)
|
|
# sentences = [sentence.strip() for sentence in sentences if sentence.strip()]
|
|
propnList=placeNameTask(batch)
|
|
# words=[]
|
|
# for placeName in propnList:
|
|
# word="原文:{},先从分析原文是否含有错误地名,若含有错误地名,请回答包含错误地名,若不包含错误地名,请从【具体的公司或组织名称,非具体的公司或组织名称,与政府有关的公司或组织名称,其他组织名称,地名】中选择最合适的一个作为答案".format(placeName)
|
|
# words.append(word)
|
|
propnStr = ",".join(propnList)
|
|
print("placeNameTask",propnStr)
|
|
messages = [{'role': 'user', 'content': [{'text': propnStr + prompt}]}]
|
|
runList = []
|
|
for rsp in bot.run(messages):
|
|
runList.append(rsp)
|
|
data = runList[len(runList) - 1][0]["content"]
|
|
print("placeNameTask",data)
|
|
parsed_data = json_repair.loads(data.replace('`', ''))
|
|
|
|
# 遍历列表
|
|
for item in parsed_data:
|
|
print(f"地名: {item['placeName']}, 回答: {item['回答']}")
|
|
|
|
# 如果需要进一步操作,例如只关注“正确”的回答
|
|
error_places = [place for place in parsed_data if place['回答'] == '错误']
|
|
print("placeNameTask",error_places)
|
|
if len(error_places)>0:
|
|
for t in error_places:
|
|
keyword= t['placeName']
|
|
# 查找包含关键字的段落
|
|
paragraphs = re.findall(r'.*?' + re.escape(keyword) + r'.*?\n', batch)
|
|
t["yuanwen"]=paragraphs[0]
|
|
return error_places
|
|
else:
|
|
return error_places
|