feat: introduce middleware rabbitmq and redis

This commit is contained in:
merlin
2025-12-11 10:37:03 +08:00
parent a1a23bec7a
commit 44133d3667
22 changed files with 404 additions and 42 deletions

View File

@@ -2,13 +2,19 @@ package xin.merlin.myplayerbackend.utils.websocket;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import xin.merlin.myplayerbackend.config.RabbitMQConfig;
import xin.merlin.myplayerbackend.service.OnlineStatusService;
@Slf4j
@@ -18,9 +24,16 @@ public class CustomWebSocketHandler extends TextWebSocketHandler {
private final WebSocketSessionManager webSocketSessionManager;
private final RabbitTemplate rabbitTemplate;
private final OnlineStatusService onlineStatusService;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
Integer userId = (Integer) session.getAttributes().get("userId");
onlineStatusService.online(userId);
webSocketSessionManager.addSession(userId, session);
log.info("用户 {} 已连接", userId);
}
@@ -28,29 +41,42 @@ public class CustomWebSocketHandler extends TextWebSocketHandler {
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String payload = message.getPayload();
JSONObject msg = JSON.parseObject(payload);
String type = msg.getString("type");
switch (type) {
case "chat":
Integer toUser = msg.getInteger("toUser");
webSocketSessionManager.sendToUser(toUser, payload);
break;
rabbitTemplate.convertAndSend(RabbitMQConfig.WS_MESSAGE_QUEUE, payload);
case "broadcast":
webSocketSessionManager.broadcast(payload);
break;
case "signal":
webSocketSessionManager.sendToUser(msg.getInteger("toUser"), payload);
break;
}
// JSONObject msg = JSON.parseObject(payload);
// String type = msg.getString("type");
// switch (type) {
// case "chat":
// Integer toUser = msg.getInteger("toUser");
// webSocketSessionManager.sendToUser(toUser, payload);
// break;
//
// case "broadcast":
// webSocketSessionManager.broadcast(payload);
// break;
//
// case "signal":
// webSocketSessionManager.sendToUser(msg.getInteger("toUser"), payload);
// break;
//
// }
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
Integer userId = (Integer) session.getAttributes().get("userId");
onlineStatusService.offline(userId);
webSocketSessionManager.removeSession(userId);
log.info("用户 {} 已断开", userId);
}
@EventListener
public void onShutdown(ContextClosedEvent event) {
log.info("Shutting down... closing websocket sessions");
webSocketSessionManager.closeAll();
}
}

View File

@@ -0,0 +1,42 @@
package xin.merlin.myplayerbackend.utils.websocket;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.config.RabbitMQConfig;
import xin.merlin.myplayerbackend.utils.websocket.command.CommandDispatcher;
@Slf4j
@Component
@RequiredArgsConstructor
public class WebSocketMessageConsumer {
private final WebSocketSessionManager sessionManager;
private final CommandDispatcher commandDispatcher;
@RabbitListener(queues = RabbitMQConfig.WS_MESSAGE_QUEUE)
public void onMessage(String json) {
try {
JSONObject msg = JSON.parseObject(json);
String to = msg.getString("to");
if (to == null || to.isEmpty()) {
// 这是广播 / 系统消息
sessionManager.broadcast(json);
} else {
// 单发消息
// sessionManager.sendToUser(Integer.valueOf(to), json);
commandDispatcher.dispatch(msg);
}
} catch (Exception e) {
log.info(e.getMessage());
}
}
}

View File

@@ -2,6 +2,7 @@ package xin.merlin.myplayerbackend.utils.websocket;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
@@ -37,5 +38,12 @@ public class WebSocketSessionManager {
}
}
public void closeAll() {
websocketSessions.forEach((uid, session) -> {
try {
session.close(CloseStatus.GOING_AWAY);
} catch (Exception ignored) {}
});
}
}

View File

@@ -0,0 +1,10 @@
package xin.merlin.myplayerbackend.utils.websocket.command;
import com.alibaba.fastjson2.JSONObject;
import java.io.IOException;
public interface BaseCommandHandler {
void handle(JSONObject msg) throws IOException;
}

View File

@@ -0,0 +1,34 @@
package xin.merlin.myplayerbackend.utils.websocket.command;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.utils.websocket.command.impl.*;
import java.io.IOException;
@Component
@RequiredArgsConstructor
public class CommandDispatcher {
private final Ping pingCommand;
private final Message messageCommand;
private final Typing typingCommand;
private final Heartbeat heartbeatCommand;
private final SystemNotify systemNotifyCommand;
public void dispatch(JSONObject msg) throws IOException {
String cmd = msg.getString("cmd");
switch (cmd) {
case "PING" -> pingCommand.handle(msg);
case "MESSAGE" -> messageCommand.handle(msg);
case "TYPING" -> typingCommand.handle(msg);
case "HEARTBEAT" -> heartbeatCommand.handle(msg);
case "SYSTEM_NOTIFY" -> systemNotifyCommand.handle(msg);
default -> {
System.err.println("Unknown command: " + cmd);
}
}
}
}

View File

@@ -0,0 +1,22 @@
package xin.merlin.myplayerbackend.utils.websocket.command.impl;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.utils.websocket.command.BaseCommandHandler;
import java.time.Duration;
@Component
@RequiredArgsConstructor
public class Heartbeat implements BaseCommandHandler {
private final RedisTemplate<String, String> redis;
@Override
public void handle(JSONObject msg) {
String userId = msg.getString("from");
redis.opsForValue().set("online:" + userId, "1", Duration.ofMinutes(5));
}
}

View File

@@ -0,0 +1,23 @@
package xin.merlin.myplayerbackend.utils.websocket.command.impl;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.utils.websocket.WebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.command.BaseCommandHandler;
import java.io.IOException;
@Component
@RequiredArgsConstructor
public class Message implements BaseCommandHandler {
private final WebSocketSessionManager sessionManager;
@Override
public void handle(JSONObject msg) throws IOException {
String to = msg.getString("to");
sessionManager.sendToUser(Integer.valueOf(to), msg.toJSONString());
}
}

View File

@@ -0,0 +1,24 @@
package xin.merlin.myplayerbackend.utils.websocket.command.impl;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.utils.websocket.WebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.command.BaseCommandHandler;
import java.io.IOException;
@Component
@RequiredArgsConstructor
public class Ping implements BaseCommandHandler {
private final WebSocketSessionManager sessionManager;
@Override
public void handle(JSONObject msg) throws IOException {
String from = msg.getString("from");
msg.put("cmd", "PONG");
sessionManager.sendToUser(Integer.valueOf(from), msg.toJSONString());
}
}

View File

@@ -0,0 +1,22 @@
package xin.merlin.myplayerbackend.utils.websocket.command.impl;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.utils.websocket.WebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.command.BaseCommandHandler;
import java.io.IOException;
@Component
@RequiredArgsConstructor
public class SystemNotify implements BaseCommandHandler {
private final WebSocketSessionManager sessionManager;
@Override
public void handle(JSONObject msg) throws IOException {
sessionManager.broadcast(msg.toJSONString());
}
}

View File

@@ -0,0 +1,22 @@
package xin.merlin.myplayerbackend.utils.websocket.command.impl;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.utils.websocket.WebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.command.BaseCommandHandler;
import java.io.IOException;
@Component
@RequiredArgsConstructor
public class Typing implements BaseCommandHandler {
private final WebSocketSessionManager sessionManager;
@Override
public void handle(JSONObject msg) throws IOException {
String to = msg.getString("to");
sessionManager.sendToUser(Integer.valueOf(to), msg.toJSONString());
}
}