from docx import Document from paddlenlp import Taskflow from pprint import pprint from qwen_agent.agents import Assistant import re import json_repair import time import math tagTask = Taskflow("ner") prompt=''' .上述文本判断地名是否正确,你可以使用工具利用互联网查询,你只能在[正确,错误,简称,未知]三种选项中选择答案,回答格式[{“placeName”:“地名”,"回答":"答案"},{“placeName”:“地名”,"回答":"答案"}],不做过多的解释,严格按回答格式作答; 不做过多的解释,严格按回答格式作答; ''' # prompt=''' # .请回答以上问题, # ,回答格式[{“placeName”:"原文","回答":"答案"},{“placeName”:"原文","回答":"答案"}],不做过多的解释,严格按回答格式作答; # 不做过多的解释,严格按回答格式作答; # ''' llm_cfg = { #'model': 'qwen1.5-72b-chat', 'model':"qwen2-72b", 'model_server': 'http://127.0.0.1:1025/v1', # base_url, also known as api_base # 'api_key': 'sk-ea89cf04431645b185990b8af8c9bb13', } bot = Assistant(llm=llm_cfg, name='Assistant', # description='使用RAG检索并回答,支持文件类型:PDF/Word/PPT/TXT/HTML。' ) #获取全文内容 def getDocxToTextAll(name): docxPath=name document = Document(docxPath) # 逐段读取docx文档的内容 levelList=[] words=[] addStart = False levelText="" i = 0 for paragraph in document.paragraphs: # 判断该段落的标题级别 # 这里用isTitle()临时代表,具体见下文介绍的方法 text = paragraph.text if text.strip():#非空判断 # print("非空") words.append(text) # 将所有段落文本拼接成一个字符串,并用换行符分隔 print("placeNameTask",len(words)) text = '\n'.join(words) # 将文本写入txt文件 with open("checkPlaceName.txt", 'w', encoding='utf-8') as txt_file: txt_file.write(text) #得到全文和地名有关的内容 def placeNameTask(text): batchNum=20 sentences = re.split(r'[。\n]', text) # 去掉空字符 sentences = [sentence.strip() for sentence in sentences if sentence.strip()] # 计算总字符数 total_chars = len(sentences) # 计算有多少份 num_chunks = math.ceil(total_chars / batchNum) # 按batchNum字为一份进行处理 chunks = [sentences[i:i + batchNum] for i in range(0, total_chars, batchNum)] placeList = [] # 打印每一份的内容 for i, chunk in enumerate(chunks): yield f"文档地名检查---文档解析进度:{i + 1}/{num_chunks}" wenBen=".".join(chunk) print(chunk) res = tagTask(wenBen) isplace = False for zuhe in res: # 上一个的地名,这一个还是地名,就和上一个相加代替这个 if isplace: name = placeList[len(placeList) - 1] if zuhe[1].find("组织机构类") >= 0 or zuhe[1].find("世界地区类") >= 0: # or zuhe[1] == "ns" isplace = True new_text = zuhe[0].replace("\n", "") placeList[len(placeList) - 1] = name + new_text continue if zuhe[1].find("组织机构类") >= 0 or zuhe[1].find("世界地区类") >= 0: isplace = True new_text = zuhe[0].replace("\n", "") placeList.append(new_text) else: isplace = False print("-" * 40) # 打印总份数 yield "文档地名检查---文档解析完成" placeList=list(dict.fromkeys(placeList)) yield placeList #主方法 def checkPlaceName(filename): yield f"文档地名检查---开始处理文档..." # 每次生成一个数字就发送 getDocxToTextAll(filename) with open("checkPlaceName.txt", "r",encoding='utf-8') as f: gettext = f.read() yield f"文档地名检查---开始解析文档..." # 每次生成一个数字就发送 # propnList=placeNameTask(gettext) for item in placeNameTask(gettext): if isinstance(item, str): yield item else: final_list = item # 获取最终结果 propnStr = ",".join(final_list) print("placeNameTask",propnStr) messages = [{'role': 'user', 'content': [{'text': propnStr + prompt}]}] runList = [] yield f"文档地名检查---结果生成中..." # 每次生成一个数字就发送 cishu=0 for rsp in bot.run(messages): runList.append(rsp) if cishu>3: cishu=0 yield "文档地名检查---结果生成中"+'.'*cishu cishu+=1 data = runList[len(runList) - 1][0]["content"] print("placeNameTask",data) parsed_data = json_repair.loads(data.replace('`', '')) # 如果需要进一步操作,例如只关注“正确”的回答 error_places = [place for place in parsed_data if place['回答'] == '错误'] print("placeNameTask",error_places) returnInfo = "发现异常地名
"; if len(error_places)>0: for t in error_places: keyword= t['placeName'] # 查找包含关键字的段落 paragraphs = re.findall(r'.*?' + re.escape(keyword) + r'.*?\n', gettext) yuanwen= paragraphs[0].replace(keyword,f"**{keyword}**").replace("\n","") returnInfo+="原文:" + yuanwen + "
出现异常地名:**" + keyword + "**!请注意" + "
"; yield returnInfo print(returnInfo) else: yield "**未发现发现异常地名**"