import uuid from langchain_chroma import Chroma from langchain_community.embeddings import DashScopeEmbeddings from langchain_community.document_loaders import TextLoader from langchain_text_splitters import RecursiveCharacterTextSplitter from qwen_agent.agents import Assistant import json_repair import json embeddings = DashScopeEmbeddings(dashscope_api_key="sk-ea89cf04431645b185990b8af8c9bb13") # embeddings = HuggingFaceEmbeddings(model_name="shibing624/text2vec-base-chinese",model_kwargs={"device":"npu:5"}) device_id=0 import re import time from docx import Document import shutil from docx.opc.pkgreader import _SerializedRelationships, _SerializedRelationship from docx.opc.oxml import parse_xml import logging import logging.config import requests # from myLogger import outLog # outLog.logger = logging.getLogger("checkRepeatText") userLog=None def load_from_xml_v2(baseURI, rels_item_xml): """ Return |_SerializedRelationships| instance loaded with the relationships contained in *rels_item_xml*. Returns an empty collection if *rels_item_xml* is |None|. """ srels = _SerializedRelationships() if rels_item_xml is not None: rels_elm = parse_xml(rels_item_xml) for rel_elm in rels_elm.Relationship_lst: if rel_elm.target_ref in ('../NULL', 'NULL'): continue srels._srels.append(_SerializedRelationship(baseURI, rel_elm)) return srels _SerializedRelationships.load_from_xml = load_from_xml_v2 # 记录程序开始的时间戳 def getOutlineLevel(inputXml): """ 功能 从xml字段中提取出中的数字number 参数 inputXml 返回 number """ start_index = inputXml.find('', start_index) number = inputXml[start_index:end_index + 1] number = re.search("\d+", number).group() return number def isTitle(paragraph): """ 功能 判断该段落是否设置了大纲等级 参数 paragraph:段落 返回 None:普通正文,没有大纲级别 0:一级标题 1:二级标题 2:三级标题 """ # 如果是空行,直接返回None if paragraph.text.strip() == '': return None # 如果该段落是直接在段落里设置大纲级别的,根据xml判断大纲级别 paragraphXml = paragraph._p.xml if paragraphXml.find('= 0: return getOutlineLevel(paragraphXml) # 如果该段落是通过样式设置大纲级别的,逐级检索样式及其父样式,判断大纲级别 targetStyle = paragraph.style while targetStyle is not None: # 如果在该级style中找到了大纲级别,返回 if targetStyle.element.xml.find('= 0: return getOutlineLevel(targetStyle.element.xml) else: targetStyle = targetStyle.base_style # 如果在段落、样式里都没有找到大纲级别,返回None return None #寻找标题名称 def findTitleName(docxPath): loopCount = 0 while True: loopCount+=1 if(loopCount>=60): raise Exception("文档读取超时,或文档存在问题无法读取") break try: document = Document(docxPath) break except Exception as e: time.sleep(1) pass # 逐段读取docx文档的内容 titleWords=[] firstTitle = 0 firstTitleName="" secondTitle = 0 sanjiTitle = 0 levelText="" count = 0 numid =0 wordContent={} total = len(document.paragraphs) addStart = False#是否重新添加 yield "文档相似性检查----文档内容解析中",str(count),str(total) for paragraph in document.paragraphs: count+=1 yield "文档相似性检查----文档内容解析中",str(count),str(total) # 判断该段落的标题级别 # 这里用isTitle()临时代表,具体见下文介绍的方法 text = paragraph.text if text.strip():#非空判断 level = isTitle(paragraph) if level=="0": firstTitle+=1 secondTitle = 0 if(text.find("附件")>=0): continue titleWords.append("一级标题:".format(firstTitle)+text) addStart=True firstTitleName=text elif level=="1": secondTitle+=1 sanjiTitle=0 # words.append("\t"+"{}.{}".format(firstTitle,secondTitle)+text) # titleWords.append("第{}章的二级标题:".format(firstTitle,firstTitle,secondTitle)+text) elif level=="2": sanjiTitle += 1 # words.append("\t"+"{}.{}".format(firstTitle,secondTitle)+text) # titleWords.append("第{}章的三级标题".format(firstTitle, secondTitle,firstTitle, secondTitle,sanjiTitle) + text) ##先判断是不是一级标题 if addStart: wordContent[firstTitleName]=[] addStart=False if level: levelText=f"{int(level)+1}级标题-"+text else: if(text.startswith("图") or text.startswith("注:")): continue if (len(text)>30 and firstTitleName): numid+=1 wordContent[firstTitleName].append("{}:".format(levelText)+text) findTitleName_llm_cfg = { #'model': 'qwen1.5-72b-chat', 'model':"qwen2-72b", 'model_server': 'http://127.0.0.1:1025/v1', # base_url, also known as api_base # 'api_key': 'sk-ea89cf04431645b185990b8af8c9bb13', } yield '文档相似性检查----检查是否存在详细设计方案' findTitleName_bot = Assistant(llm=findTitleName_llm_cfg, name='Assistant', system_message='按照要求选择最合适的,是唯一的' ) prompt='''\n是文档的大纲,一级标题组成,哪一章存在与方案相关的内容 类似详细设计方案,详细服务方案,详细建设方案为最相关的,优先选择 类似设计方案,服务方案,建设方案为次相关,次级选择 类似方案是最后选择 按照这样的顺序选择最合适的 你只能从这两个答案中选择一个:{"name":"一级标题名称","answer":"存在"}或{"name":"","answer":"不存在"},不做过多的解释,严格按回答格式作答 ''' # print("\n".join(titleWords)+prompt) messages = [({'role': 'user', 'content': "\n".join(titleWords)+prompt})] runList=[] for rsp in findTitleName_bot.run(messages): runList.append(rsp) data = runList[len(runList) - 1][0]["content"] parsed_data = json_repair.loads(data.replace('`', '')) try: if(parsed_data["answer"]=="存在"): yield parsed_data["name"],wordContent else: yield "文档相似性检查----未找到与详细设计方案相关内容,无法进行相似性比较" except Exception as e: userLog.warning(e) userLog.warning(data) userLog.warning(parsed_data) yield "文档相似性检查----检查遇到问题,请联系管理员" #获取文档中 详细设计方案 章节的所有内容 # def getDocxToText(docxPath,titleName,vector_store_path): def getDocxToText(titleName,wordContent,vector_store_path): # loopCount = 0 # while True: # loopCount+=1 # if(loopCount>=15): # raise Exception("文档读取超时,或文档存在问题无法读取") # break # try: # document = Document(docxPath) # break # except Exception as e: # time.sleep(1) # pass # # 逐段读取docx文档的内容 # levelList=[] words=[] # addStart = False # levelText="" # i = 0 # count = 0 # total = len(document.paragraphs) # yield "文档相似性检查----文档内容解析中",count,total # for paragraph in document.paragraphs: # count+=1 # yield "文档相似性检查----文档内容解析中",count,total # # 判断该段落的标题级别 # # 这里用isTitle()临时代表,具体见下文介绍的方法 # text = paragraph.text # if text.strip():#非空判断 # if titleName: # level = isTitle(paragraph) # if(addStart and level=="0"): # addStart=False # if(level=="0" and (titleName.find(text)>=0 or text.find(titleName)>=0)): # addStart=True # if level: # levelList.append("{}:".format(level)+paragraph.text) # levelText=f"{int(level)+1}级标题-"+text # else: # if addStart: # if(text.startswith("图") or text.startswith("注:")): # continue # if(len(text)>30): # i=i+1 # words.append("{}:".format(levelText)+text) # 将所有段落文本拼接成一个字符串,并用换行符分隔 # 遍历字典,查找包含 "标题的" 的键 for key, value in wordContent.items(): if (titleName.find(key)>=0 or key.find(titleName)>=0): words.extend(value) # 将对应的值添加 if len(words)==0: raise Exception("checkRepeatText,获取长度为0") text = '\n'.join(words) userLog.info(f"文档相似性检查----需要处理的总数是{len(words)}") # 将文本写入txt文件 with open("checkRepeatText.txt", 'w', ) as txt_file: txt_file.write(text) time.sleep(1) yield "文档相似性检查----文档内容转换中",".","." loader = TextLoader(file_path='checkRepeatText.txt') docs = loader.load() # print(docs) text_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=10, add_start_index=True, separators=["\n\n", "\n"]) splits = text_splitter.split_documents(docs) uuids = [] yield "文档相似性检查----文档保存中",".","." global embeddings vectorstore = Chroma(persist_directory=vector_store_path, embedding_function=embeddings) for i in range(len(splits)): uuidStr=str(uuid.uuid4()) uuids.append(uuidStr) logging.info(f"checkRepeatTextuuidLen{len(uuids)}") vectorstore.add_documents(documents=splits, ids=uuids) yield "文档相似性检查----校验文档是否已经完成保存",".","." while True: time.sleep(0.3) ress = vectorstore.similarity_search(words[0]) if (len(ress) > 0): break yield words,uuids,vectorstore # @app.route('/checkRepeatText/', methods=['GET']) def checkRepeatText(filename,user_id,outLog): global userLog userLog=outLog.get_queue(user_id,"checkRepeatText") yield "文档相似性检查---启动中...." userLog.info("文档相似性检查---任务开始") vector_store_path="vector_store"+str(uuid.uuid4()) for titleName in findTitleName(filename): if(isinstance(titleName ,tuple)): if(len(titleName)==3): yield titleName[0]+titleName[1]+"/"+titleName[2] else: yield titleName if(isinstance(titleName ,tuple)): # try: yield "文档相似性检查----文档内容转换中" try: for words,uuids,vectorstore in getDocxToText(titleName[0],titleName[1],vector_store_path): if isinstance(words, str): yield words+uuids+vectorstore except Exception as e: yield f"文档相似性检查----文档内容获取失败,未找到**{titleName}**相关内容或文件无法正常打开。可以尝试用WORD或WPS打开文件,进行修复并另存,用另存的文件再做一次尝试。" userLog.warning(e) userLog.warning(f"文档相似性检查----文档内容获取失败,未找到**{titleName}**相关内容或文档打开失败") outLog.mark_done(user_id, "checkRepeatText") return # 记录程序开始的时间戳‘ reslist = [] count = 0 for i in words: count += 1 yield f"文档相似性检查--对{titleName[0]}章节,进行文档内容检查中{count}/{len(words)}" result = vectorstore.similarity_search(i) textTag = i.split(":")[0] for content in result: text = content.page_content tag = text.split(":")[0].replace('\n', '') if (textTag.find(tag) >= 0): continue try: url = "http://0.0.0.0:8192/taskflow/checkRepeatText" headers = {"Content-Type": "application/json"} data = { "data": { "text": [[i[i.find(':') + 1:], text[text.find(':') + 1:]]], } } r = requests.post(url=url, headers=headers, data=json.dumps(data)) res = json.loads(r.text) res=res["data"] # res = similarity([[i[i.find(':') + 1:], text[text.find(':') + 1:]]]) except Exception as e: userLog.warning("文档相似性检查--发生异常:") userLog.warning(e) userLog.warning(i) userLog.warning(text) continue if (res[0]["similarity"] >= 0.96): # 判断重复内容是否被放入 if (len(reslist) > 0): isExist = False for neirong in reslist: if i in neirong.values(): isExist = True break if not isExist: # reslist.append({"yuanwen1":i[i.find(':') + 1:],"yuanwen2":text[text.find(':') + 1:],"similarity":res[0]["similarity"]}) userLog.info("【在"+i[:i.find(':')].replace("\n","")+"下包含:"+i[i.find(':') + 1:].replace("\n","")+"
在"+text[:text.find(':')].replace("\n","")+"**下包含:"+text[text.find(':') + 1:].replace("\n","")+"
以上两段内容相似度:"+'{:.2f}'.format(res[0]["similarity"])+"】") reslist.append({"yuanwen1":i.replace("\n",""),"yuanwen2":text.replace("\n",""),"similarity":res[0]["similarity"]}) else: reslist.append({"yuanwen1":i.replace("\n",""),"yuanwen2":text.replace("\n",""),"similarity":res[0]["similarity"]}) # print(i.split(":")[1] + "\n" + text.split(":")[1]) userLog.info("【在"+i[:i.find(':')].replace("\n","")+"下包含:"+i[i.find(':') + 1:].replace("\n","")+"
在"+text[:text.find(':')].replace("\n","")+"**下包含:"+text[text.find(':') + 1:].replace("\n","")+"
以上两段内容相似度:"+'{:.2f}'.format(res[0]["similarity"])+"】") # vectorstore.delete(ids=uuids) shutil.rmtree(vector_store_path) resInfo=f"对{titleName[0]}章节,发现相似内容:
" if(len(reslist)>0): for res in reslist: resInfo+="【在**"+res["yuanwen1"][:res["yuanwen1"].find(':')]+"**下包含:"+res["yuanwen1"][res["yuanwen1"].find(':') + 1:]+"
在**"+res["yuanwen2"][:res["yuanwen2"].find(':')]+"**下包含:"+res["yuanwen2"][res["yuanwen2"].find(':') + 1:]+"
以上两段内容***相似度***:"+'{:.2f}'.format(res['similarity'])+"】
" yield resInfo else: yield "**未发现相似内容**" userLog.info("文档相似性检查----未发现相似内容**") outLog.mark_done(user_id, "checkRepeatText")