import uuid from langchain_chroma import Chroma from langchain_community.embeddings import DashScopeEmbeddings from langchain_community.document_loaders import TextLoader from langchain_text_splitters import RecursiveCharacterTextSplitter from paddlenlp import Taskflow similarity = Taskflow("text_similarity" , truncation=True,max_length=102400) embeddings = DashScopeEmbeddings(dashscope_api_key="sk-ea89cf04431645b185990b8af8c9bb13") vector_store_path="vector_store" vectorstore = Chroma(persist_directory=vector_store_path, embedding_function=embeddings) import re import time from docx import Document # 记录程序开始的时间戳 def getOutlineLevel(inputXml): """ 功能 从xml字段中提取出中的数字number 参数 inputXml 返回 number """ start_index = inputXml.find('', start_index) number = inputXml[start_index:end_index + 1] number = re.search("\d+", number).group() return number def isTitle(paragraph): """ 功能 判断该段落是否设置了大纲等级 参数 paragraph:段落 返回 None:普通正文,没有大纲级别 0:一级标题 1:二级标题 2:三级标题 """ # 如果是空行,直接返回None if paragraph.text.strip() == '': return None # 如果该段落是直接在段落里设置大纲级别的,根据xml判断大纲级别 paragraphXml = paragraph._p.xml if paragraphXml.find('= 0: return getOutlineLevel(paragraphXml) # 如果该段落是通过样式设置大纲级别的,逐级检索样式及其父样式,判断大纲级别 targetStyle = paragraph.style while targetStyle is not None: # 如果在该级style中找到了大纲级别,返回 if targetStyle.element.xml.find('= 0: return getOutlineLevel(targetStyle.element.xml) else: targetStyle = targetStyle.base_style # 如果在段落、样式里都没有找到大纲级别,返回None return None #获取文档中 详细设计方案 章节的所有内容 def getDocxToText(docxPath,titleName): document = Document(docxPath) # 逐段读取docx文档的内容 levelList=[] words=[] addStart = False levelText="" i = 0 for paragraph in document.paragraphs: # 判断该段落的标题级别 # 这里用isTitle()临时代表,具体见下文介绍的方法 text = paragraph.text if text.strip():#非空判断 print("非空") if titleName: level = isTitle(paragraph) if(addStart and level=="0"): addStart=False if(level=="0" and text.find(titleName)>=0): addStart=True if level: levelList.append("{}:".format(level)+paragraph.text) levelText=text else: if addStart: if(text.startswith("图") or text.startswith("注:")): continue i=i+1 words.append("第{}个段落:".format(i)+text) else: words.append(text) # 将所有段落文本拼接成一个字符串,并用换行符分隔 print("checkRepeatText",len(words)) if len(words)==0: raise Exception("I know python!") text = '\n'.join(words) # 将文本写入txt文件 with open("checkRepeatText.txt", 'w', ) as txt_file: txt_file.write(text) time.sleep(3) loader = TextLoader(file_path='checkRepeatText.txt') docs = loader.load() # print(docs) text_splitter = RecursiveCharacterTextSplitter(chunk_size=50, chunk_overlap=10, add_start_index=True, separators=["\n\n", "\n"]) splits = text_splitter.split_documents(docs) uuids = [] print(len(splits)) for i in range(len(splits)): uuids.append(str(uuid.uuid4())) print(len(uuids)) vectorstore = Chroma(persist_directory=vector_store_path, embedding_function=embeddings) vectorstore.add_documents(documents=splits, ids=uuids) while True: time.sleep(0.3) ress = vectorstore.similarity_search(words[0]) if (len(ress) > 0): break return words,uuids # @app.route('/checkRepeatText/', methods=['GET']) def checkRepeatText(filename,titleName): words,uuids=getDocxToText(filename,titleName) try: # 记录程序开始的时间戳‘ reslist = [] count = 0 for i in words: count += 1 result = vectorstore.similarity_search(i) textTag = i.split(":")[0] print(i) for content in result: text = content.page_content tag = text.split(":")[0].replace('\n', '') if (textTag.find(tag) >= 0): continue res = similarity([[i[i.find(':') + 1:], text[text.find(':') + 1:]]]) print(res[0]["similarity"]) if (res[0]["similarity"] > 0.95): # 判断重复内容是否被放入 if (len(reslist) > 0): isExist = False for neirong in reslist: if i[i.find(':') + 1:] in neirong.values(): isExist = True break if not isExist: reslist.append({"yuanwen1":i[i.find(':') + 1:],"yuanwen2":text[text.find(':') + 1:]}) print(reslist) else: reslist.append({"yuanwen1":i[i.find(':') + 1:],"yuanwen2":text[text.find(':') + 1:]}) print(i.split(":")[1] + "\n" + text.split(":")[1]) except Exception as e: print("发生异常:",e) finally: # if(count>=300): # break vectorstore.delete(ids=uuids) print("已删除") print(reslist) return reslist