You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

176 lines
6.3 KiB

5 months ago
from docx import Document
from pprint import pprint
from qwen_agent.agents import Assistant
import re
import json_repair
import math
from docx.opc.pkgreader import _SerializedRelationships, _SerializedRelationship
from docx.opc.oxml import parse_xml
def load_from_xml_v2(baseURI, rels_item_xml):
"""
Return |_SerializedRelationships| instance loaded with the
relationships contained in *rels_item_xml*. Returns an empty
collection if *rels_item_xml* is |None|.
"""
srels = _SerializedRelationships()
if rels_item_xml is not None:
rels_elm = parse_xml(rels_item_xml)
for rel_elm in rels_elm.Relationship_lst:
if rel_elm.target_ref in ('../NULL', 'NULL'):
continue
srels._srels.append(_SerializedRelationship(baseURI, rel_elm))
return srels
_SerializedRelationships.load_from_xml = load_from_xml_v2
llm_cfg = {
#'model': 'qwen1.5-72b-chat',
'model':"qwen2-72b-instruct",
'model_server': 'DashScope', # base_url, also known as api_base
'api_key': 'sk-ea89cf04431645b185990b8af8c9bb13',
}
bot = Assistant(llm=llm_cfg,
name='Assistant',
)
# 记录程序开始的时间戳
def getOutlineLevel(inputXml):
"""
功能 从xml字段中提取出<w:outlineLvl w:val="number"/>中的数字number
参数 inputXml
返回 number
"""
start_index = inputXml.find('<w:outlineLvl')
end_index = inputXml.find('>', start_index)
number = inputXml[start_index:end_index + 1]
number = re.search("\d+", number).group()
return number
def isTitle(paragraph):
"""
功能 判断该段落是否设置了大纲等级
参数 paragraph:段落
返回 None:普通正文没有大纲级别 0:一级标题 1:二级标题 2:三级标题
"""
# 如果是空行,直接返回None
if paragraph.text.strip() == '':
return None
# 如果该段落是直接在段落里设置大纲级别的,根据xml判断大纲级别
paragraphXml = paragraph._p.xml
if paragraphXml.find('<w:outlineLvl') >= 0:
return getOutlineLevel(paragraphXml)
# 如果该段落是通过样式设置大纲级别的,逐级检索样式及其父样式,判断大纲级别
targetStyle = paragraph.style
while targetStyle is not None:
# 如果在该级style中找到了大纲级别,返回
if targetStyle.element.xml.find('<w:outlineLvl') >= 0:
return getOutlineLevel(targetStyle.element.xml)
else:
targetStyle = targetStyle.base_style
# 如果在段落、样式里都没有找到大纲级别,返回None
return None
#获取文档中 详细设计方案 章节的所有内容
def getDocxToTitleName(docxPath):
document = Document(docxPath)
# 逐段读取docx文档的内容
levelList=[]
words=[]
addStart = False
levelText=""
i = 0
for paragraph in document.paragraphs:
# 判断该段落的标题级别
# 这里用isTitle()临时代表,具体见下文介绍的方法
text = paragraph.text
if text.strip():#非空判断
level = isTitle(paragraph)
if level=="0":
words.append(text)
return words
def checkTitleName(filename):
prompt = f'''
\n 这些是文章的标题请问{text}在标题中是否可以配对的若有请指出是哪个标题若没有请回到不存在
'''
xushang = "回答格式{‘name’:‘名称’,'answer':‘回答’,“标题”:“标题”}请严格按照格式回答问题,不要做过多我解释"
yield f"文档结构检查----结构分析中{count}/{len(gettext)}"
strword = "\n".join(word) + prompt + xushang
# print(strword)
messages = [{'role': 'user', 'content': [{'text': strword}]}]
runList = []
cishu = 0
for rsp in bot.run(messages):
runList.append(rsp)
# print(rsp)
data = runList[len(runList) - 1][0]["content"]
parsed_data = json_repair.loads(data.replace('`', ''))
print(parsed_data)
# yield '文档结构检查----启动中'
# with open("ce模板.txt", "r",encoding='utf-8') as f:
# gettext = f.readlines()
# count=0
# reserr = []
# try:
# word = getDocxToTitleName(filename)
# except Exception as e:
# print(e)
# yield "文档无法打开,请检查文档内容"
# return
# for text in gettext:
# count+=1
# prompt = f'''
# \n 这些是文章的标题,请问【{text}】在标题中是否可以配对的,若有请指出是哪个标题,若没有请回到不存在
# '''
# xushang="回答格式{‘name’:‘名称’,'answer':‘回答’,“标题”:“标题”}请严格按照格式回答问题,不要做过多我解释"
# yield f"文档结构检查----结构分析中{count}/{len(gettext)}"
# strword = "\n".join(word)+prompt+xushang
# # print(strword)
# messages = [{'role': 'user', 'content': [{'text':strword}]}]
# runList = []
# cishu = 0
# for rsp in bot.run(messages):
# runList.append(rsp)
# # print(rsp)
# data = runList[len(runList) - 1][0]["content"]
# parsed_data = json_repair.loads(data.replace('`', ''))
# print(parsed_data)
# if(parsed_data["answer"]=="不存在"):
# reserr.append(text)
# resInfo="文档结构存在异常:<br>"
# if(len(reserr)>0):
# for i in reserr:
# resInfo+=f"**{i}**<br>"
# yield resInfo
# else:
# yield "文档结构未发现异常"
import logging
# 创建一个记录器
logger = logging.getLogger('my_logger')
logger.setLevel(logging.DEBUG)
# 创建一个处理器
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# 创建一个格式化器并将其添加到处理器中
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
# 将处理器添加到记录器中
logger.addHandler(ch)
try:
# 记录一些日志消息
logger.debug('这是一个调试消息')
logger.info('这是一个信息消息')
logger.warning('这是一个警告消息')
logger.error('这是一个错误消息')
logger.critical('这是一个致命错误消息')
except Exception as e:
logger.warning(e)