# -*- coding:utf-8 -*- from docx import Document from qwen_agent.agents import Assistant import re import json_repair import json import math from docx.opc.pkgreader import _SerializedRelationships, _SerializedRelationship from docx.opc.oxml import parse_xml import requests # from myLogger import outLog import time def load_from_xml_v2(baseURI, rels_item_xml): """ Return |_SerializedRelationships| instance loaded with the relationships contained in *rels_item_xml*. Returns an empty collection if *rels_item_xml* is |None|. """ srels = _SerializedRelationships() if rels_item_xml is not None: rels_elm = parse_xml(rels_item_xml) for rel_elm in rels_elm.Relationship_lst: if rel_elm.target_ref in ('../NULL', 'NULL'): continue srels._srels.append(_SerializedRelationship(baseURI, rel_elm)) return srels _SerializedRelationships.load_from_xml = load_from_xml_v2 import logging # outLog.logger = logging.getLogger("checkCompanyName") userLog = None prompt = ''' .根据上述文本判断,是否为具体的公司或组织名称,你可以使用工具利用互联网查询, 你只能在[具体的公司或组织名称,公益组织,简称,统称,泛化组织,政府单位,机关单位,学校,行业类型,其他]选项中选择答案, 回答格式[{“companyName”:“名称”,"回答":"答案"},{“companyName”:“名称”,"回答":"答案"}],不做过多的解释,严格按回答格式作答; ''' llm_cfg = { # 'model': 'qwen1.5-72b-chat', 'model': "qwen2-72b", 'model_server': 'http://127.0.0.1:1025/v1', # base_url, also known as api_base # 'api_key': 'sk-ea89cf04431645b185990b8af8c9bb13', } bot = Assistant(llm=llm_cfg, name='Assistant', # system_message="你是一个地理专家,可以准确的判断地理位置,如果你不确定,可以使用工具" ) def getDocxToTextAll(name): docxPath = name loopCount = 0 while True: loopCount += 1 if (loopCount >= 60): raise Exception("文档读取超时,或文档存在问题无法读取") break try: document = Document(docxPath) break except Exception as e: time.sleep(1) pass # 逐段读取docx文档的内容 words = [] i = 0 for paragraph in document.paragraphs: # 判断该段落的标题级别 # 这里用isTitle()临时代表,具体见下文介绍的方法 text = paragraph.text if text.strip(): # 非空判断 # print("非空") words.append(text) # 将所有段落文本拼接成一个字符串,并用换行符分隔 text = '\n'.join(words) # 将文本写入txt文件 with open("checkCompanyName.txt", 'w', encoding='utf-8') as txt_file: txt_file.write(text) def companyNameTask(text): yield "文档公司或组织名称检查---文档解析中...." userLog.info("文档公司或组织名称检查---任务开始") batchNum = 5 sentences = re.split(r'[、,。\n]', text) # 去掉空字符 sentences = [sentence.strip() for sentence in sentences if sentence.strip()] # 计算总字符数 total_chars = len(sentences) # 计算有多少份 num_chunks = math.ceil(total_chars / batchNum) # 按batchNum字为一份进行处理 chunks = [sentences[i:i + batchNum] for i in range(0, total_chars, batchNum)] placeList = [] # 打印每一份的内容 for i, chunk in enumerate(chunks): yield f"文档公司或组织名称检查---文档解析进度:{i + 1}/{num_chunks}" try: # wenBen = ".".join(chunk) url = "http://0.0.0.0:8191/taskflow/checkPlaceNameServer" headers = {"Content-Type": "application/json"} data = { "data": { "text": chunk, # "text":wenBen } } r = requests.post(url=url, headers=headers, data=json.dumps(data)) res = json.loads(r.text) res = res["data"] # print(res) except Exception as e: userLog.warning(chunk) userLog.warning("文档公司或组织名称检查--错别字识别出错\n") userLog.warning(e) return isplace = False # for zuhe in res: # # 上一个的地名,这一个还是地名,就和上一个相加代替这个 # if isplace: # name = placeList[len(placeList) - 1] # if zuhe[1].find("组织机构类") >= 0: # or zuhe[1] == "ns" # isplace = True # new_text = zuhe[0].replace("\n", "") # placeList[len(placeList) - 1] = name + new_text # continue # if zuhe[1].find("组织机构类") >= 0: # isplace = True # new_text = zuhe[0].replace("\n", "") # placeList.append(new_text) # else: # isplace = False ##案例[[('目前', 'TIME'), ('江北区历史文化档案馆', 'ORG')], [('宁波国研简直,并且在东软', 'ORG'), ('宁波市北仑区教育局', 'ORG'), ('国研信息', 'ORG'), ('浙江省', 'LOC'), ('宁波市金凤区', 'LOC'), ('金凤区', 'LOC')]] for zuhe in res: # 上一个的地名,这一个还是地名,就和上一个相加代替这个 for chid in zuhe: if (chid[1] == "ORG"): new_text = chid[0].replace("\n", "") placeList.append(new_text) # 打印总份数 yield "文档公司或组织名称检查---文档解析完成" placeList = list(dict.fromkeys(placeList)) userLog.debug(placeList) yield placeList def checkCompanyName(filename, user_id, outLog): yield f"文档公司或组织名称检查---开始处理文档..." global userLog userLog = outLog.get_queue(user_id, "checkCompanyName") try: getDocxToTextAll(filename) except Exception as e: userLog.warning(e) userLog.warning("文档公司或组织名称检查---文档无法打开,请检查文档内容") yield "文档公司或组织名称检查---文件无法正常打开。可以尝试用WORD或WPS打开文件,进行修复并另存,用另存的文件再做一次尝试。" outLog.mark_done(user_id, "checkCompanyName") return with open("checkCompanyName.txt", "r", encoding='utf-8') as f: gettext = f.read() yield f"文档公司或组织名称检查---开始解析文档..." # 每次生成一个数字就发送 final_list = "" for item in companyNameTask(gettext): if isinstance(item, str): yield item else: final_list = item # 获取最终结果 propnStr = ",".join(final_list) messages = [{'role': 'user', 'content': [{'text': propnStr + prompt}]}] runList = [] yield f"文档公司或组织名称检查---结果生成中..." # 每次生成一个数字就发送 cishu = 0 for rsp in bot.run(messages): runList.append(rsp) if cishu > 3: cishu = 0 yield "文档公司或组织名称检查---结果生成中" + '.' * cishu cishu += 1 data = runList[len(runList) - 1][0]["content"] parsed_data = json_repair.loads(data.replace('`', '')) error_places = [] for place in parsed_data: try: if place['回答'] == '具体的公司或组织名称': if (place["companyName"] == "北京国研科技咨询有限公司浙江分公司"): continue error_places.append(place) except Exception as e: userLog.warning(place) userLog.warning(e) userLog.warning("文档公司或组织名称检查---组织提出出错") continue returnInfo = "发现异常公司或组织名称
" if len(error_places) > 0: for t in error_places: keyword = t['companyName'].replace("\n", "") # 查找包含关键字的段落 paragraphs = re.findall(r'.*?' + re.escape(keyword) + r'.*?\n', gettext) t["yuanwen"] = paragraphs[0] yuanwen = paragraphs[0].replace(keyword, f"**{keyword}**").replace("\n", "") returnInfo += "原文:" + yuanwen + "
异常公司或组织名称:**" + keyword + "**!请注意" + "
" userLog.info("文档公司或组织名称检查---原文:" + yuanwen + "异常公司或组织名称:" + keyword + "!请注意") yield returnInfo else: yield "**未发现异常公司或组织名称**
" userLog.info("文档公司或组织名称检查---未发现异常公司或组织名称") outLog.mark_done(user_id, "checkCompanyName")