《通义千问AI落地—上》:后端接口
/**
* 历史对话记录; sessionId---> 历史记录
*/
private static final ConcurrentHashMap<String, List<Message>> history = new ConcurrentHashMap<>();
@Override
public void chat(ChatMessageRequest msg, Principal principal) throws NoApiKeyException, InputRequiredException {
String sessionId = msg.getSessionId();
//用户发送的消息入库
CompletableFuture.runAsync(() -> {
saveMsg(msg.getContent(), sessionId, Role.USER, getLocalDate());
});
Message message = Message.builder().role(Role.USER.getValue()).content(msg.getContent()).build();
// 创建QwenParam对象,设置参数
GenerationParam param = GenerationParam.builder()
.model(module) // 模型版本 qwen-max
.messages(getHistory(sessionId)) // 消息内容,如果需要启用多伦连续对话的话,就把用户历史消息以及GPT回复的消息一起放进去
.resultFormat(GenerationParam.ResultFormat.MESSAGE)
.topP(0.8)
.enableSearch(true)
.apiKey(apiKey) // 你的apiKey,需要到阿里云百炼官网申请
.incrementalOutput(true)
.build();
// 调用生成接口,获取Flowable对象
Flux<GenerationResult> result = Flux.from(gen.streamCall(param));
StringBuffer builder = new StringBuffer();
DateTime finalLocalTime = getLocalDate();
Flux.from(result)
// 控制发送频率
.delayElements(Duration.ofMillis(200)).doOnNext(res -> {
String output = res.getOutput().getChoices().get(0).getMessage().getContent();
if (output == null || "".equals(output)) {
return;
}
// 将生成的消息通过websocket发送给前端,websocket内容将在下篇文章介绍
sendMsg(output, sessionId, principal);
builder.append(output);
}).doFinally(signalType -> {
//消息发送结束,告诉前端
sendMsg("!$$---END---$$!", sessionId, principal);
//消息入库
CompletableFuture.runAsync(() -> {
saveMsg(builder.toString(), sessionId, Role.ASSISTANT, finalLocalTime);
buildHistory(sessionId,
Message.builder().role(Role.ASSISTANT.getValue()).content(builder.toString()));
});
}).onErrorResume(str -> {
if (str instanceof ApiException) {
ApiException exception = (ApiException) str;
log.error("接口调用出现错误:{}", exception.getMessage());
}
sendMsg("GPT接口调用出现错误,该功能暂时无法使用,敬请期待.", sessionId, principal);
return Mono.empty();
}).subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池中执行
.subscribe();
}
/**
* 每日凌晨自动清理历史对话缓存,防止缓存过大
*/
@Scheduled(cron = "0 59 23 * * ?")
private void autoCleanHistory() {
history.clear();
}
/**
* 构建历史消息
*/
private void buildHistory(String sessionId, MessageBuilder<?, ?> message) {
List<Message> historyMessages = history.computeIfAbsent(sessionId, k -> {
List<ChatMessageVO> list = sessionService.getById(sessionId).getMessages();
List<Message> getMsgList = new ArrayList<>();
if (list.isEmpty()) return getMsgList;
MessageBuilder<?, ?> msg = Message.builder();
//只取后面60条,历史消息太多,一是过快消耗token,二是压力太大
list.subList(Math.max(0, list.size() - 60), list.size()).forEach(item -> {
if (!"".equals(item.getContent())) {
msg.content(item.getContent()).role(item.getRole()).build();
getMsgList.add(msg.build());
}
});
return getMsgList;
});
// 添加消息到列表
historyMessages.add(message.build());
history.remove(sessionId);
history.put(sessionId, historyMessages);
}
private List<Message> getHistory(String sessionId) {
List<Message> list = history.get(sessionId);
if (list == null || list.isEmpty()) {
return new ArrayList<>();
}
list.removeIf(item -> ("".equals(item.getContent())));
List<Message> hist = list.subList(Math.max(0, list.size() - 80), list.size());
history.remove(sessionId);
history.put(sessionId, hist);
return hist;
}