# -*- coding:utf-8 -*- # from pycorrector import MacBertCorrector # m = MacBertCorrector("shibing624/macbert4csc-base-chinese") from qwen_agent.agents import Assistant from docx import Document from pprint import pprint import re from paddlenlp import Taskflow import json import time import json_repair import math from docx.opc.pkgreader import _SerializedRelationships, _SerializedRelationship from docx.opc.oxml import parse_xml import asyncio def load_from_xml_v2(baseURI, rels_item_xml): """ Return |_SerializedRelationships| instance loaded with the relationships contained in *rels_item_xml*. Returns an empty collection if *rels_item_xml* is |None|. """ srels = _SerializedRelationships() if rels_item_xml is not None: rels_elm = parse_xml(rels_item_xml) for rel_elm in rels_elm.Relationship_lst: if rel_elm.target_ref in ('../NULL', 'NULL'): continue srels._srels.append(_SerializedRelationship(baseURI, rel_elm)) return srels _SerializedRelationships.load_from_xml = load_from_xml_v2 import logging import logging.config log_config = { 'version': 1, 'disable_existing_loggers': False, 'formatters': { 'standard': { 'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s', }, }, 'handlers': { 'console': { 'class': 'logging.StreamHandler', 'formatter': 'standard', 'level': logging.INFO, }, 'file': { 'class': 'logging.FileHandler', 'filename': 'Logger.log', 'formatter': 'standard', 'level': logging.INFO, }, }, 'loggers': { '': { 'handlers': ['console', 'file'], 'level': logging.INFO, 'propagate': True, }, } } logging.config.dictConfig(log_config) logger = logging.getLogger("checkDocumentError") llm_cfg = { # 'model': 'qwen1.5-72b-chat', 'model': "qwen2-72b", 'model_server': 'http://127.0.0.1:1025/v1', # base_url, also known as api_base # 'api_key': 'sk-ea89cf04431645b185990b8af8c9bb13', } bot = Assistant(llm=llm_cfg, name='Assistant', # description='使用RAG检索并回答,支持文件类型:PDF/Word/PPT/TXT/HTML。' ) # prompt=''' # 是否存在错别字,若存在请指出,不做其他方面的校验,你只能在[存在,不存在,未知]选项中选择答案, # 回答格式[{“placeName”:“原文”,"改正后":"改正的内容","回答":"答案"},{“placeName”:“原文”,"改正后":"改正的内容","回答":"答案"}],不做过多的解释,严格按回答格式作答; # ''' prompt = ''' 请回答以上问题,[是,否]选项中选择答案,原文内容,标点符号保持不变,如果有错请给出解析,没有错则不用给解析 回答格式请按照以下json格式[{"placeName":"序号","回答":"答案","解析","解析内容"},{"placeName":"序号","回答":"答案","解析","解析内容"}],不做过多的解释,严格按回答格式作答; ''' def getDocxToTextAll(name): docxPath = name document = Document(docxPath) # 逐段读取docx文档的内容 levelList = [] words = [] addStart = False levelText = "" i = 0 for paragraph in document.paragraphs: # 判断该段落的标题级别 # 这里用isTitle()临时代表,具体见下文介绍的方法 text = paragraph.text if text.strip(): # 非空判断 # print("非空") words.append(text) # 将所有段落文本拼接成一个字符串,并用换行符分隔 text = '\n'.join(words) # 将文本写入txt文件 with open("checkDocumentError.txt", 'w', encoding='utf-8') as txt_file: txt_file.write(text) def getDocumentError(filename): yield f"文档纠错---开始处理文档..." try: getDocxToTextAll(filename) except Exception as e: logger.warning(e) yield "文档无法打开,请检查文档内容" return with open("checkDocumentError.txt", "r", encoding='utf-8') as f: gettext = f.read() yield f"文档纠错---开始解析文档..." # 每次生成一个数字就发送 final_list = [] for item in documentErrorTask(gettext): if isinstance(item, str): yield item else: final_list = item # 获取最终结果 resInfo = "发现错别字
" if (len(final_list) > 0): for i in final_list: yuanwen = i["placeName"].replace("\n", "") jianyi = i["jianyi"].replace("\n", "") resInfo += "原文:" + yuanwen + "
建议:**" + jianyi + "**
" yield resInfo logger.info(resInfo) else: yield "**未发现错别字**" def documentErrorTask(text): """ 分批读取文本文件 :param file_path: 文件路径 :param batch_size: 每批处理的字符数 :return: 生成器,每次返回一批文本 """ yield "文档纠错---启动中...." corrector = Taskflow("text_correction", device_id=1) batchNum = 20 sentences = re.split(r'[。\n]', text) # 去掉空字符 sentences = [sentence.strip() for sentence in sentences if sentence.strip()] # 计算总字符数 total_chars = len(sentences) # 计算有多少份 num_chunks = math.ceil(total_chars / batchNum) # 按batchNum字为一份进行处理 chunks = [sentences[i:i + batchNum] for i in range(0, total_chars, batchNum)] placeList = [] # 打印每一份的内容 err = [] for i, chunk in enumerate(chunks): yield f"文档纠错---文档解析进度:{i + 1}/{num_chunks}" try: res = corrector(chunk) except Exception as e: logger.warning(chunk) logger.warning("文档纠错--错别字识别出错\n", e) continue lines_with_greeting = [place for place in res if len(place['errors']) > 0] if len(lines_with_greeting) > 0: num = 0 wenti = [] # 记录问题的数组 keyword_list = [] # 记录问题 for t in lines_with_greeting: temp_errorWords = [] keyword = t['source'] keyword_list.append(keyword) for item in t["errors"]: for key, value in item['correction'].items(): temp_errorWords.append(key) wenti.append( "{}、原文:{}。问题:【{}】这些字是否为当前原文的错别字".format(num, keyword, ",".join(temp_errorWords))) num += 1 words = "\n".join(wenti) messages = [{'role': 'user', 'content': [{'text': words + prompt}]}] runList = [] yield f"文档纠错---内容解析中..." # 每次生成一个数字就发送 cishu = 0 for rsp in bot.run(messages): runList.append(rsp) if cishu > 3: cishu = 0 yield "文档纠错---内容解析中" + '.' * cishu cishu += 1 data = runList[len(runList) - 1][0]["content"] parsed_data = json_repair.loads(data.replace("\\", "").replace('`', '')) resListerr = [] for place in parsed_data: try: if place['回答'] == '是': place["placeName"] = keyword_list[int(place["placeName"])] place["jianyi"] = place["解析"] resListerr.append(place) except Exception as e: logger.warning(parsed_data) logger.warning(place) logger.warning("文档纠错--错别字提取出错\n", e) continue if (len(resListerr) > 0): err.extend(resListerr) # 打印总份数 yield "文档地名检查---文档解析完成" yield err