From 62d42395ba903b2f43be5a5f1d16ce18a5054fbb Mon Sep 17 00:00:00 2001 From: zhouhaibin Date: Mon, 23 Sep 2024 10:35:01 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E5=A4=A7=E6=A8=A1=E5=9E=8B?= =?UTF-8?q?=E5=BA=94=E7=94=A8=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/sse/listener/SSEListener.java | 122 +++++++++ .../controller/Qwen72bController.java | 249 ++++++++++++++++-- .../job/Qwen72bJobExecutor.java | 27 ++ 3 files changed, 370 insertions(+), 28 deletions(-) create mode 100644 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SSEListener.java create mode 100644 zaojiaManagement/zaojia-productManagement/src/main/java/org/dromara/productManagement/job/Qwen72bJobExecutor.java diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SSEListener.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SSEListener.java new file mode 100644 index 0000000..7fc623a --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SSEListener.java @@ -0,0 +1,122 @@ +package org.dromara.common.sse.listener; + +import jakarta.servlet.http.HttpServletResponse; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import okhttp3.Response; +import okhttp3.sse.EventSource; +import okhttp3.sse.EventSourceListener; +import com.alibaba.fastjson.JSON; +import org.dromara.common.sse.dto.ChatGlmDto; + +import java.util.concurrent.CountDownLatch; + +@Slf4j +@Data +public class SSEListener extends EventSourceListener { + + private CountDownLatch countDownLatch = new CountDownLatch(1); + + private ChatGlmDto chatGlmDto; + + private HttpServletResponse rp; + + private StringBuffer output = new StringBuffer(); + + public SSEListener(ChatGlmDto chatGlmDto, HttpServletResponse response) { + this.chatGlmDto = chatGlmDto; + this.rp = response; + } + + /** + * {@inheritDoc} + * 建立sse连接 + */ +// @Override + public void onOpen(final EventSource eventSource, final Response + response) { + if (rp != null) { + rp.setContentType("text/event-stream"); + rp.setCharacterEncoding("UTF-8"); + rp.setStatus(200); + log.info("建立sse连接..." + JSON.toJSONString(chatGlmDto)); + } else { + log.info("客户端非sse推送" + JSON.toJSONString(chatGlmDto)); + } + } + + /** + * 事件 + * + * @param eventSource + * @param id + * @param type + * @param data + */ + @Override + public void onEvent(EventSource eventSource, String id, String type, String data) { + try { + output.append(data); + if ("finish".equals(type)) { + log.info("请求结束{} {}", chatGlmDto.getMessageId(), output.toString()); + } + if ("error".equals(type)) { + log.info("{}: {}source {}", chatGlmDto.getMessageId(), data, JSON.toJSONString(chatGlmDto)); + } + if (rp != null) { + if ("\n".equals(data)) { + rp.getWriter().write("event:" + type + "\n"); + rp.getWriter().write("id:" + chatGlmDto.getMessageId() + "\n"); + rp.getWriter().write("data:\n\n"); + rp.getWriter().flush(); + } else { + String[] dataArr = data.split("\\n"); + for (int i = 0; i < dataArr.length; i++) { + if (i == 0) { + rp.getWriter().write("event:" + type + "\n"); + rp.getWriter().write("id:" + chatGlmDto.getMessageId() + "\n"); + } + if (i == dataArr.length - 1) { + rp.getWriter().write("data:" + dataArr[i] + "\n\n"); + rp.getWriter().flush(); + } else { + rp.getWriter().write("data:" + dataArr[i] + "\n"); + rp.getWriter().flush(); + } + } + } + + } + } catch (Exception e) { + log.error("消息错误[" + JSON.toJSONString(chatGlmDto) + "]", e); + countDownLatch.countDown(); + throw new RuntimeException(e); + } + + } + + /** + * {@inheritDoc} + */ + @Override + public void onClosed(final EventSource eventSource) { + log.info("sse连接关闭:{}", chatGlmDto.getMessageId()); + log.info("结果输出:{}" + output.toString()); + countDownLatch.countDown(); + } + + /** + * {@inheritDoc} + */ + @Override + public void onFailure(final EventSource eventSource, final Throwable t, final Response response) { + log.error("使用事件源时出现异常... [响应:{}]...", chatGlmDto.getMessageId()); + countDownLatch.countDown(); + } + + public CountDownLatch getCountDownLatch() { + return this.countDownLatch; + } + +} + diff --git a/zaojiaManagement/zaojia-productManagement/src/main/java/org/dromara/productManagement/controller/Qwen72bController.java b/zaojiaManagement/zaojia-productManagement/src/main/java/org/dromara/productManagement/controller/Qwen72bController.java index 9d1e604..060ec75 100644 --- a/zaojiaManagement/zaojia-productManagement/src/main/java/org/dromara/productManagement/controller/Qwen72bController.java +++ b/zaojiaManagement/zaojia-productManagement/src/main/java/org/dromara/productManagement/controller/Qwen72bController.java @@ -10,15 +10,25 @@ import java.util.concurrent.TimeUnit; import cn.hutool.core.io.FileUtil; import cn.hutool.core.util.IdUtil; +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.dto.JobArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.servlet.http.HttpServletResponse; import lombok.RequiredArgsConstructor; import cn.dev33.satoken.annotation.SaCheckPermission; import okhttp3.*; import okhttp3.RequestBody; import org.dromara.common.core.utils.StringUtils; +import org.dromara.common.sse.dto.ChatGlmDto; +import org.dromara.common.sse.listener.SSEListener; +import org.dromara.common.sse.utils.ExecuteSSEUtil; +import org.dromara.system.domain.vo.SysOssVo; +import org.dromara.system.service.ISysOssService; +import org.springframework.beans.factory.annotation.Value; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.*; import org.springframework.validation.annotation.Validated; @@ -26,7 +36,7 @@ import org.springframework.validation.annotation.Validated; import org.dromara.common.web.core.BaseController; import org.dromara.common.core.domain.R; -import org.dromara.productManagement.service.IPmgSupplierInformationService; +import com.alibaba.fastjson.JSON; import org.springframework.web.multipart.MultipartFile; @@ -41,6 +51,11 @@ import org.springframework.web.multipart.MultipartFile; @RestController @RequestMapping("/productManagement/supplierInformation") public class Qwen72bController extends BaseController { + @Value("${chat.filePath}") + private String chatFilePath; + @Value("${chat.chatUrl}") + private String chatUrl; + private final ISysOssService ossService; private final OkHttpClient client = new OkHttpClient.Builder() .connectTimeout(300, TimeUnit.SECONDS)//连接超时(单位:秒) @@ -52,30 +67,8 @@ public class Qwen72bController extends BaseController { @SaCheckPermission("productManagement:supplierInformation:list") @PostMapping(value = "/importData", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) public R list111(@RequestPart("file") MultipartFile file,String content) throws IOException, ExecutionException, InterruptedException { - String destDir = "D:\\python项目39"; - System.out.println(content); - String originalFilename = file.getOriginalFilename(); - if (originalFilename == null) { - throw new IllegalArgumentException("文件名不能为空"); - } - - // 获取文件的后缀 - String suffix = FileUtil.getSuffix(originalFilename); - - // 生成唯一的文件名 - String name = IdUtil.fastSimpleUUID() + "." + suffix; - - // 构建目标文件路径 - String destPath = destDir + File.separator + name; - - // 创建目标文件 - File destFile = new File(destPath); - FileUtil.writeFromStream(file.getInputStream(), destFile); - if(!sendRequest("http://183.136.156.2:50000/upload", destPath)){ - return R.fail("上传失败"); - } - String fileName="uploads/"+name; // 使用Hutool的FileUtil将MultipartFile保存到目标文件 + String fileName= uploadFile(file); // // String result = ""; // String infozong = ""; @@ -224,7 +217,7 @@ public class Qwen72bController extends BaseController { // } // }); // String strings = completableFuture.get(); - CompletableFuture checkPlaceNameData = fetchDataAsync("http://183.136.156.2:50000/checkPlaceName?filename=" + fileName) + CompletableFuture checkPlaceNameData = fetchDataAsync(chatUrl+"/checkPlaceName?filename=" + fileName) .thenApply(data -> { List entity = null; if (StringUtils.isNotBlank(data)) { @@ -248,7 +241,7 @@ public class Qwen72bController extends BaseController { }); CompletableFuture checkCompanyNameData = checkPlaceNameData - .thenCompose(result -> fetchDataAsync("http://183.136.156.2:50000/checkCompanyName?filename=" + fileName)) + .thenCompose(result -> fetchDataAsync(chatUrl+"/checkCompanyName?filename=" + fileName)) .thenApply(data -> { List entity = null; if (StringUtils.isNotBlank(data)) { @@ -272,7 +265,7 @@ public class Qwen72bController extends BaseController { }); CompletableFuture getDocumentErrorData = checkCompanyNameData - .thenCompose(result -> fetchDataAsync("http://183.136.156.2:50000/getDocumentError?filename=" + fileName)) + .thenCompose(result -> fetchDataAsync(chatUrl+"/getDocumentError?filename=" + fileName)) .thenApply(data -> { List entity = null; if (StringUtils.isNotBlank(data)) { @@ -295,7 +288,7 @@ public class Qwen72bController extends BaseController { return info; }); CompletableFuture checkRepeatTextData = getDocumentErrorData - .thenCompose(result -> fetchDataAsync("http://183.136.156.2:50000/getDocumentError?filename=" + fileName+"§ionName="+content)) + .thenCompose(result -> fetchDataAsync(chatUrl+"/getDocumentError?filename=" + fileName+"§ionName="+content)) .thenApply(data -> { // 对第二个接口的数据进行处理 List entity =null; @@ -363,6 +356,201 @@ public class Qwen72bController extends BaseController { return future; } + + @PostMapping(value = "/sse-invoke", consumes = MediaType.MULTIPART_FORM_DATA_VALUE,produces = "text/event-stream;charset=UTF-8") + public void sse(@RequestPart(name = "fileInfo", required = false) MultipartFile fileInfo, @RequestPart("content") String context, HttpServletResponse rp) { + try { +// String token = ApiTokenUtil.generateClientToken(API_KEY); + ChatGlmDto chatGlmDto = new ChatGlmDto(); + SSEListener sseListener = new SSEListener(chatGlmDto, rp); + ExecuteSSEUtil.executeSSE(chatUrl+"/stream?context="+context, sseListener, JSON.toJSONString(chatGlmDto)); + } catch (Exception e) { + e.printStackTrace(); + } + } + @PostMapping(value = "/sse/checkPlaceName", consumes = MediaType.MULTIPART_FORM_DATA_VALUE,produces = "text/event-stream;charset=UTF-8") + public void checkPlaceName(@RequestPart(name = "fileInfo", required = false) MultipartFile fileInfo, @RequestPart("content") String context, HttpServletResponse rp) { + try { + String fileName = uploadFile(fileInfo); +// String token = ApiTokenUtil.generateClientToken(API_KEY); + ChatGlmDto chatGlmDto = new ChatGlmDto(); + SSEListener sseListener = new SSEListener(chatGlmDto, rp); + //http://183.136.156.2:50000/see/checkPlaceName?filename="+fileName + ExecuteSSEUtil.executeSSE(chatUrl+"/sse/checkPlaceName?filename="+fileName, sseListener, JSON.toJSONString(chatGlmDto)); + } catch (Exception e) { + e.printStackTrace(); + } + } + @PostMapping(value = "/sse/checkRepeatText", consumes = MediaType.MULTIPART_FORM_DATA_VALUE,produces = "text/event-stream;charset=UTF-8") + public void checkRepeatText(@RequestPart(name = "fileInfo", required = false) MultipartFile fileInfo, @RequestPart("content") String context, HttpServletResponse rp) { + try { + String fileName = uploadFile(fileInfo); +// String token = ApiTokenUtil.generateClientToken(API_KEY); + ChatGlmDto chatGlmDto = new ChatGlmDto(); + SSEListener sseListener = new SSEListener(chatGlmDto, rp); + ExecuteSSEUtil.executeSSE(chatUrl+"/sse/checkRepeatText?filename="+fileName, sseListener, JSON.toJSONString(chatGlmDto)); + } catch (Exception e) { + e.printStackTrace(); + } + } + @PostMapping(value = "/sse/checkCompanyName", consumes = MediaType.MULTIPART_FORM_DATA_VALUE,produces = "text/event-stream;charset=UTF-8") + public void checkCompanyName(@RequestPart(name = "fileInfo", required = false) MultipartFile fileInfo, @RequestPart("content") String context, HttpServletResponse rp) { + try { + String fileName = uploadFile(fileInfo); +// String token = ApiTokenUtil.generateClientToken(API_KEY); + ChatGlmDto chatGlmDto = new ChatGlmDto(); + SSEListener sseListener = new SSEListener(chatGlmDto, rp); + ExecuteSSEUtil.executeSSE(chatUrl+"/sse/checkCompanyName?filename="+fileName, sseListener, JSON.toJSONString(chatGlmDto)); + } catch (Exception e) { + e.printStackTrace(); + } + } + @PostMapping(value = "/sse/checkDocumentError", consumes = MediaType.MULTIPART_FORM_DATA_VALUE,produces = "text/event-stream;charset=UTF-8") + public void checkDocumentError(@RequestPart(name = "fileInfo", required = false) MultipartFile fileInfo, @RequestPart("content") String context, HttpServletResponse rp) { + try { + String fileName = uploadFile(fileInfo); +// String token = ApiTokenUtil.generateClientToken(API_KEY); + ChatGlmDto chatGlmDto = new ChatGlmDto(); + SSEListener sseListener = new SSEListener(chatGlmDto, rp); + ExecuteSSEUtil.executeSSE(chatUrl+"/sse/checkDocumentErrorWeb?filename="+fileName, sseListener, JSON.toJSONString(chatGlmDto)); + } catch (Exception e) { + e.printStackTrace(); + } + } + @PostMapping(value = "/sse/checkTitleName", consumes = MediaType.MULTIPART_FORM_DATA_VALUE,produces = "text/event-stream;charset=UTF-8") + public void checkTitleName(@RequestPart(name = "fileInfo", required = false) MultipartFile fileInfo, @RequestPart("content") String context, HttpServletResponse rp) { + try { + String fileName = uploadFile(fileInfo); +// String token = ApiTokenUtil.generateClientToken(API_KEY); + ChatGlmDto chatGlmDto = new ChatGlmDto(); + SSEListener sseListener = new SSEListener(chatGlmDto, rp); + ExecuteSSEUtil.executeSSE(chatUrl+"/sse/checkTitleName?filename="+fileName, sseListener, JSON.toJSONString(chatGlmDto)); + } catch (Exception e) { + e.printStackTrace(); + } + } + @PostMapping(value = "/sse/checkDocumentAll/checkPlaceName", consumes = MediaType.MULTIPART_FORM_DATA_VALUE,produces = "text/event-stream;charset=UTF-8") + public void checkDocumentAllCheckPlaceName(@RequestPart(name = "fileInfo", required = false) MultipartFile fileInfo, @RequestPart("uuid") String uuid, HttpServletResponse rp) { + try { + String fileName = uploadFile(fileInfo,uuid); +// String token = ApiTokenUtil.generateClientToken(API_KEY); + ChatGlmDto chatGlmDto = new ChatGlmDto(); + SSEListener sseListener = new SSEListener(chatGlmDto, rp); + //http://183.136.156.2:50000/see/checkPlaceName?filename="+fileName + ExecuteSSEUtil.executeSSE(chatUrl+"/sse/checkPlaceName?filename="+fileName, sseListener, JSON.toJSONString(chatGlmDto)); + } catch (Exception e) { + e.printStackTrace(); + } + } + @PostMapping(value = "/sse/checkDocumentAll/checkRepeatText", consumes = MediaType.MULTIPART_FORM_DATA_VALUE,produces = "text/event-stream;charset=UTF-8") + public void checkDocumentAllCheckRepeatText(@RequestPart(name = "fileInfo", required = false) MultipartFile fileInfo, @RequestPart("uuid") String uuid, HttpServletResponse rp) { + try { + String fileName = "uploads/"+uuid; +// String token = ApiTokenUtil.generateClientToken(API_KEY); + ChatGlmDto chatGlmDto = new ChatGlmDto(); + SSEListener sseListener = new SSEListener(chatGlmDto, rp); + ExecuteSSEUtil.executeSSE(chatUrl+"/sse/checkRepeatText?filename="+fileName, sseListener, JSON.toJSONString(chatGlmDto)); + } catch (Exception e) { + e.printStackTrace(); + } + } + @PostMapping(value = "/sse/checkDocumentAll/checkCompanyName", consumes = MediaType.MULTIPART_FORM_DATA_VALUE,produces = "text/event-stream;charset=UTF-8") + public void checkDocumentAllCheckCompanyName(@RequestPart(name = "fileInfo", required = false) MultipartFile fileInfo, @RequestPart("uuid") String uuid, HttpServletResponse rp) { + try { + String fileName = "uploads/"+uuid; +// String token = ApiTokenUtil.generateClientToken(API_KEY); + ChatGlmDto chatGlmDto = new ChatGlmDto(); + SSEListener sseListener = new SSEListener(chatGlmDto, rp); + ExecuteSSEUtil.executeSSE(chatUrl+"/sse/checkCompanyName?filename="+fileName, sseListener, JSON.toJSONString(chatGlmDto)); + } catch (Exception e) { + e.printStackTrace(); + } + } + @PostMapping(value = "/sse/checkDocumentAll/checkDocumentError", consumes = MediaType.MULTIPART_FORM_DATA_VALUE,produces = "text/event-stream;charset=UTF-8") + public void checkDocumentAllCcheckDocumentError(@RequestPart(name = "fileInfo", required = false) MultipartFile fileInfo, @RequestPart("uuid") String uuid, HttpServletResponse rp) { + try { + String fileName = "uploads/"+uuid; +// String token = ApiTokenUtil.generateClientToken(API_KEY); + ChatGlmDto chatGlmDto = new ChatGlmDto(); + SSEListener sseListener = new SSEListener(chatGlmDto, rp); + ExecuteSSEUtil.executeSSE(chatUrl+"/sse/checkDocumentErrorWeb?filename="+fileName, sseListener, JSON.toJSONString(chatGlmDto)); + } catch (Exception e) { + e.printStackTrace(); + } + } + @PostMapping(value = "/sse/checkDocumentAll/checkTitleName", consumes = MediaType.MULTIPART_FORM_DATA_VALUE,produces = "text/event-stream;charset=UTF-8") + public void checkDocumentAllCheckTitleName(@RequestPart(name = "fileInfo", required = false) MultipartFile fileInfo, @RequestPart("uuid") String uuid, HttpServletResponse rp) { + try { + String fileName = "uploads/"+uuid; +// String token = ApiTokenUtil.generateClientToken(API_KEY); + ChatGlmDto chatGlmDto = new ChatGlmDto(); + SSEListener sseListener = new SSEListener(chatGlmDto, rp); + ExecuteSSEUtil.executeSSE(chatUrl+"/sse/checkTitleName?filename="+fileName, sseListener, JSON.toJSONString(chatGlmDto)); + } catch (Exception e) { + e.printStackTrace(); + } + } + public String uploadFile(MultipartFile file) throws IOException { +// String destDir = chatFilePath; +// String originalFilename = file.getOriginalFilename(); +// if (originalFilename == null) { +// throw new IllegalArgumentException("文件名不能为空"); +// } +// +// // 获取文件的后缀 +// String suffix = FileUtil.getSuffix(originalFilename); +// +// // 生成唯一的文件名 +// String name = IdUtil.fastSimpleUUID() + "." + suffix; +// +// // 构建目标文件路径 +// String destPath = destDir + File.separator + name; +// +// // 创建目标文件 +// File destFile = new File(destPath); +// FileUtil.writeFromStream(file.getInputStream(), destFile); +// if(!sendRequest(chatUrl+"/upload", destPath)){ +// throw new RuntimeException("上传文件失败"); +// } + SysOssVo oss = ossService.upload(file); + String fileName = oss.getOriginalName(); + + RequestBody requestBody = new MultipartBody.Builder() + .setType(MultipartBody.FORM) + .addFormDataPart("file", fileName, + RequestBody.create(okhttp3.MediaType.parse(MediaType.APPLICATION_OCTET_STREAM_VALUE), file.getBytes())) + .build(); + Request request = new Request.Builder() + .url(chatUrl+"/upload") // 替换为你的Flask服务器地址 + .post(requestBody) + .build(); + Response response = client.newCall(request).execute(); + return "uploads/"+fileName; + } + public String uploadFile(MultipartFile file,String uuid) throws IOException { + String destDir = chatFilePath; + String originalFilename = file.getOriginalFilename(); + if (originalFilename == null) { + throw new IllegalArgumentException("文件名不能为空"); + } + + // 获取文件的后缀 + String suffix = FileUtil.getSuffix(originalFilename); + + // 生成唯一的文件名 + String name = uuid; + + // 构建目标文件路径 + String destPath = destDir + File.separator + name; + + // 创建目标文件 + File destFile = new File(destPath); + FileUtil.writeFromStream(file.getInputStream(), destFile); + if(!sendRequest(chatUrl+"/upload", destPath)){ + throw new RuntimeException("上传文件失败"); + } + return "uploads/"+name; + } public boolean sendRequest(String url,String filePath) { // 替换为你要上传的文件路径 File file = new File(filePath); @@ -388,4 +576,9 @@ public class Qwen72bController extends BaseController { } return false; } + @JobExecutor(name="ces") + public ExecuteResult jobExecute(JobArgs jobArgs) { + System.out.println(jobArgs.getJobParams()); + return ExecuteResult.success("测试成功aaaaaaa"); + } } diff --git a/zaojiaManagement/zaojia-productManagement/src/main/java/org/dromara/productManagement/job/Qwen72bJobExecutor.java b/zaojiaManagement/zaojia-productManagement/src/main/java/org/dromara/productManagement/job/Qwen72bJobExecutor.java new file mode 100644 index 0000000..12f3839 --- /dev/null +++ b/zaojiaManagement/zaojia-productManagement/src/main/java/org/dromara/productManagement/job/Qwen72bJobExecutor.java @@ -0,0 +1,27 @@ +package org.dromara.productManagement.job; + +import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.dto.JobArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; +import org.springframework.stereotype.Component; + +@Component +public class Qwen72bJobExecutor { + @JobExecutor(name = "checkRepeatTextJobExecute") + + public ExecuteResult checkRepeatTextJobExecute(JobArgs jobArgs) { + SnailJobLog.LOCAL.info("testJobExecutor. jobArgs:{}", JsonUtil.toJsonString(jobArgs)); + SnailJobLog.REMOTE.info("testJobExecutor. jobArgs:{}", JsonUtil.toJsonString(jobArgs)); + + return ExecuteResult.success("测试成功"); + } +} + + +/** + * @author opensnail + * @date 2024-05-17 + */ +