from docx import Document from paddlenlp import Taskflow from pprint import pprint from qwen_agent.agents import Assistant import re import json_repair import time tagTask = Taskflow("ner") prompt=''' .上述文本判断地名是否正确,你可以使用工具利用互联网查询,你只能在[正确,错误,简称,未知]三种选项中选择答案,回答格式[{“placeName”:“地名”,"回答":"答案"},{“placeName”:“地名”,"回答":"答案"}],不做过多的解释,严格按回答格式作答; 不做过多的解释,严格按回答格式作答; ''' # prompt=''' # .请回答以上问题, # ,回答格式[{“placeName”:"原文","回答":"答案"},{“placeName”:"原文","回答":"答案"}],不做过多的解释,严格按回答格式作答; # 不做过多的解释,严格按回答格式作答; # ''' llm_cfg = { #'model': 'qwen1.5-72b-chat', 'model':"qwen2-72b", 'model_server': 'http://127.0.0.1:1025/v1', # base_url, also known as api_base # 'api_key': 'sk-ea89cf04431645b185990b8af8c9bb13', } bot = Assistant(llm=llm_cfg, name='Assistant', # description='使用RAG检索并回答,支持文件类型:PDF/Word/PPT/TXT/HTML。' ) #获取全文内容 def getDocxToTextAll(name): docxPath=name document = Document(docxPath) # 逐段读取docx文档的内容 levelList=[] words=[] addStart = False levelText="" i = 0 for paragraph in document.paragraphs: # 判断该段落的标题级别 # 这里用isTitle()临时代表,具体见下文介绍的方法 text = paragraph.text if text.strip():#非空判断 # print("非空") words.append(text) # 将所有段落文本拼接成一个字符串,并用换行符分隔 print("placeNameTask",len(words)) text = '\n'.join(words) # 将文本写入txt文件 with open("checkPlaceName.txt", 'w', encoding='utf-8') as txt_file: txt_file.write(text) #得到全文和地名有关的内容 def placeNameTask(text): res = tagTask(text) print(res) placeList = [] isplace = False for zuhe in res: # 上一个的地名,这一个还是地名,就和上一个相加代替这个 if isplace: name = placeList[len(placeList) - 1] if zuhe[1].find("组织机构类")>=0 or zuhe[1].find("世界地区类")>=0:# or zuhe[1] == "ns" isplace = True new_text = zuhe[0].replace("\n", "") placeList[len(placeList) - 1] = name + new_text continue if zuhe[1].find("组织机构类")>=0 or zuhe[1].find("世界地区类")>=0: isplace = True new_text = zuhe[0].replace("\n", "") placeList.append(new_text) else: isplace = False placeList=list(dict.fromkeys(placeList)) return placeList #主方法 def checkPlaceName(filename): getDocxToTextAll(filename) start_time=time.time() error_places = [] for batch in read_file_in_batches('checkPlaceName.txt'): res=process_batch(batch) if(len(res)>0): error_places.extend(res) pprint(error_places) end_time = time.time() # 计算执行时间 elapsed_time = end_time - start_time print(f"checkPlaceName程序执行时间: {elapsed_time} 秒") return error_places def read_file_in_batches(file_path, batch_size=5000): """ 分批读取文本文件 :param file_path: 文件路径 :param batch_size: 每批处理的字符数 :return: 生成器,每次返回一批文本 """ with open(file_path, 'r', encoding='utf-8') as file: batch = [] char_count = 0 for line in file: batch.append(line) char_count += len(line) if char_count >= batch_size: yield ''.join(batch) batch = [] char_count = 0 if batch: yield ''.join(batch) def process_batch(batch): """ 处理一批文本 :param batch: 一批文本 """ # 在这里添加你的处理逻辑 # sentences = re.split(r'[。\n]', batch) # sentences = [sentence.strip() for sentence in sentences if sentence.strip()] propnList=placeNameTask(batch) # words=[] # for placeName in propnList: # word="原文:{},先从分析原文是否含有错误地名,若含有错误地名,请回答包含错误地名,若不包含错误地名,请从【具体的公司或组织名称,非具体的公司或组织名称,与政府有关的公司或组织名称,其他组织名称,地名】中选择最合适的一个作为答案".format(placeName) # words.append(word) propnStr = ",".join(propnList) print("placeNameTask",propnStr) messages = [{'role': 'user', 'content': [{'text': propnStr + prompt}]}] runList = [] for rsp in bot.run(messages): runList.append(rsp) data = runList[len(runList) - 1][0]["content"] print("placeNameTask",data) parsed_data = json_repair.loads(data.replace('`', '')) # 遍历列表 for item in parsed_data: print(f"地名: {item['placeName']}, 回答: {item['回答']}") # 如果需要进一步操作,例如只关注“正确”的回答 error_places = [place for place in parsed_data if place['回答'] == '错误'] print("placeNameTask",error_places) if len(error_places)>0: for t in error_places: keyword= t['placeName'] # 查找包含关键字的段落 paragraphs = re.findall(r'.*?' + re.escape(keyword) + r'.*?\n', batch) t["yuanwen"]=paragraphs[0] return error_places else: return error_places