feat: add video websocket handler etc...
All checks were successful
dev middleware install / deploy (push) Successful in 12s

This commit is contained in:
2026-03-23 18:10:01 +08:00
parent 692f6d76c6
commit 9f22f33a9a
22 changed files with 258 additions and 27 deletions

View File

@@ -17,9 +17,9 @@ import xin.merlin.myplayerbackend.service.OnlineStatusService;
@Slf4j
@Component
@RequiredArgsConstructor
public class CustomWebSocketHandler extends TextWebSocketHandler {
public class OnlineWebSocketHandler extends TextWebSocketHandler {
private final WebSocketSessionManager webSocketSessionManager;
private final OnlineWebSocketSessionManager sessionManager;
private final RabbitTemplate rabbitTemplate;
@@ -38,13 +38,14 @@ public class CustomWebSocketHandler extends TextWebSocketHandler {
onlineStatusService.online(userId);
webSocketSessionManager.addSession(userId, session);
sessionManager.addSession(userId, session);
log.info("用户 {} 已连接", userId);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String payload = message.getPayload();
System.out.println(payload);
rabbitTemplate.convertAndSend(RabbitMQConfig.WS_MESSAGE_QUEUE, payload);
// JSONObject msg = JSON.parseObject(payload);
@@ -72,14 +73,14 @@ public class CustomWebSocketHandler extends TextWebSocketHandler {
onlineStatusService.offline(userId);
webSocketSessionManager.removeSession(userId);
sessionManager.removeSession(userId);
log.info("用户 {} 已断开", userId);
}
@EventListener
public void onShutdown(ContextClosedEvent event) {
log.info("Shutting down... closing websocket sessions");
webSocketSessionManager.closeAll();
sessionManager.closeAll();
}

View File

@@ -11,7 +11,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class WebSocketSessionManager {
public class OnlineWebSocketSessionManager {
private static final Map<Integer, WebSocketSession> websocketSessions = new ConcurrentHashMap<>();

View File

@@ -0,0 +1,65 @@
package xin.merlin.myplayerbackend.utils.websocket;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import xin.merlin.myplayerbackend.config.RabbitMQConfig;
import java.time.LocalDateTime;
@Slf4j
@Component
@RequiredArgsConstructor
public class PlayroomWebSocketHandler extends TextWebSocketHandler {
private final PlayroomWebSocketSessionManager sessionManager;
private final RabbitTemplate rabbitTemplate;
private final StringRedisTemplate redis;
private final String IN_ROOM_BASE_KEY = "room:";
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
Integer userId = (Integer) session.getAttributes().get("id");
String payload = "{\"cmd\": \"PING\", \"to\": "+ userId +", \"time\":\""+ LocalDateTime.now() +"\"}";
sessionManager.addSession(userId, session);
rabbitTemplate.convertAndSend(RabbitMQConfig.WS_VIDEO_QUEUE,payload);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
JSONObject payload = JSONObject.parseObject(message.getPayload());
Integer userId = (Integer) session.getAttributes().get("id");
if(payload.get("cmd").equals("PONG")) {
if(Boolean.TRUE.equals(redis.opsForSet().isMember(IN_ROOM_BASE_KEY + session.getAttributes().get("r_id") + ":", String.valueOf(userId)))){
return;
}
redis.opsForSet().add(IN_ROOM_BASE_KEY+session.getAttributes().get("r_id")+":", String.valueOf(userId));
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
Integer userId = (Integer) session.getAttributes().get("id");
redis.opsForSet().remove(IN_ROOM_BASE_KEY+session.getAttributes().get("r_id")+":", userId.toString());
sessionManager.removeSession(userId);
}
@EventListener
public void onShutdown(ContextClosedEvent event) {
log.info("Shutting down... closing websocket sessions");
sessionManager.closeAll();
}
}

View File

@@ -0,0 +1,47 @@
package xin.merlin.myplayerbackend.utils.websocket;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class PlayroomWebSocketSessionManager {
private static final Map<Integer, WebSocketSession> websocketSessions = new ConcurrentHashMap<>();
public void addSession(Integer id, WebSocketSession session) {websocketSessions.put(id, session);}
public WebSocketSession getSession(Integer id) {return websocketSessions.get(id);}
public void removeSession(Integer id) {websocketSessions.remove(id);}
public Map<Integer, WebSocketSession> getSessions() {return websocketSessions;}
public void sendToUser(Integer userId, String message) throws IOException {
WebSocketSession session = websocketSessions.get(userId);
if (session != null && session.isOpen()) {
session.sendMessage(new TextMessage(message));
}
}
public void broadcast(String message) throws IOException {
for (WebSocketSession session : websocketSessions.values()) {
if (session.isOpen()) {
session.sendMessage(new TextMessage(message));
}
}
}
public void closeAll() {
websocketSessions.forEach((uid, session) -> {
try {
session.close(CloseStatus.GOING_AWAY);
} catch (Exception ignored) {}
});
}
}

View File

@@ -8,16 +8,19 @@ import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.config.RabbitMQConfig;
import xin.merlin.myplayerbackend.utils.websocket.command.CommandDispatcher;
import xin.merlin.myplayerbackend.utils.websocket.command.CommandDispatcherVideo;
@Slf4j
@Component
@RequiredArgsConstructor
public class WebSocketMessageConsumer {
private final WebSocketSessionManager sessionManager;
private final OnlineWebSocketSessionManager sessionManager;
private final CommandDispatcher commandDispatcher;
private final CommandDispatcherVideo commandDispatcherVideo;
@RabbitListener(queues = RabbitMQConfig.WS_MESSAGE_QUEUE)
public void onMessage(String json) {
try {
@@ -33,8 +36,8 @@ public class WebSocketMessageConsumer {
public void onVideoMessage(String json) {
try {
JSONObject msg = JSON.parseObject(json);
commandDispatcher.dispatch(msg);
commandDispatcherVideo.dispatch(msg);
System.out.println("sending video message");
} catch (Exception e) {
log.info(e.getMessage());
}

View File

@@ -6,5 +6,7 @@ import java.io.IOException;
public interface BaseCommandHandler {
void handle(JSONObject msg) throws IOException;
void handle(String msg, Integer to) throws IOException;
}

View File

@@ -3,6 +3,7 @@ package xin.merlin.myplayerbackend.utils.websocket.command;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.utils.websocket.OnlineWebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.command.impl.*;
import java.io.IOException;
@@ -10,6 +11,8 @@ import java.io.IOException;
@Component
@RequiredArgsConstructor
public class CommandDispatcher {
private final OnlineWebSocketSessionManager sessionManager;
private final Ping pingCommand;
private final Message messageCommand;
private final Typing typingCommand;
@@ -31,7 +34,15 @@ public class CommandDispatcher {
case "GROUP_MESSAGE" -> groupMessageCommand.handle(msg);
default -> {
System.err.println("Unknown command: " + cmd);
// System.err.println("Unknown command: " + cmd);
// TODO: 这里应该使用default的一个类统一封装
try {
String to = msg.getString("to");
sessionManager.sendToUser(Integer.valueOf(to), msg.toJSONString());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@@ -0,0 +1,34 @@
package xin.merlin.myplayerbackend.utils.websocket.command;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.utils.websocket.PlayroomWebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.command.impl.Ping;
import java.io.IOException;
@Component
@RequiredArgsConstructor
public class CommandDispatcherVideo {
private final Ping pingCommand;
public void dispatch(JSONObject msg) throws IOException {
String cmd = msg.getString("cmd");
Integer to = Integer.valueOf(msg.getString("to"));
String jsonString = msg.toJSONString();
switch (cmd){
case "PING" -> pingCommand.handle(jsonString,to);
default -> {
// System.err.println("Unknown command: " + cmd);
// TODO: 这里应该使用default的一个类统一封装
}
}
}
}

View File

@@ -6,7 +6,7 @@ import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.entity.Groups;
import xin.merlin.myplayerbackend.mapper.GroupsMapper;
import xin.merlin.myplayerbackend.utils.websocket.WebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.OnlineWebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.command.BaseCommandHandler;
import java.io.IOException;
@@ -18,7 +18,7 @@ public class GroupMessage implements BaseCommandHandler {
private final GroupsMapper groupsMapper;
private final WebSocketSessionManager sessionManager;
private final OnlineWebSocketSessionManager sessionManager;
@Override
public void handle(JSONObject msg) throws IOException {
@@ -44,4 +44,9 @@ public class GroupMessage implements BaseCommandHandler {
}
}
@Override
public void handle(String msg, Integer to) throws IOException {
}
}

View File

@@ -6,6 +6,7 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.utils.websocket.command.BaseCommandHandler;
import java.io.IOException;
import java.time.Duration;
@Component
@@ -19,4 +20,9 @@ public class Heartbeat implements BaseCommandHandler {
String userId = msg.getString("from");
redis.opsForValue().set("online:" + userId, "1", Duration.ofMinutes(5));
}
@Override
public void handle(String msg, Integer to) throws IOException {
}
}

View File

@@ -3,7 +3,7 @@ package xin.merlin.myplayerbackend.utils.websocket.command.impl;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.utils.websocket.WebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.OnlineWebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.command.BaseCommandHandler;
import java.io.IOException;
@@ -12,7 +12,7 @@ import java.io.IOException;
@RequiredArgsConstructor
public class Message implements BaseCommandHandler {
private final WebSocketSessionManager sessionManager;
private final OnlineWebSocketSessionManager sessionManager;
@Override
public void handle(JSONObject msg) throws IOException {
@@ -22,5 +22,10 @@ TODO: 这里需要处理如果目标用户不在线,如何去完成消息的
String to = msg.getString("to");
sessionManager.sendToUser(Integer.valueOf(to), msg.toJSONString());
}
@Override
public void handle(String msg, Integer to) throws IOException {
}
}

View File

@@ -3,7 +3,7 @@ package xin.merlin.myplayerbackend.utils.websocket.command.impl;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.utils.websocket.WebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.OnlineWebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.command.BaseCommandHandler;
import java.io.IOException;
@@ -12,11 +12,16 @@ import java.io.IOException;
@RequiredArgsConstructor
public class PersonalNotify implements BaseCommandHandler {
private final WebSocketSessionManager sessionManager;
private final OnlineWebSocketSessionManager sessionManager;
@Override
public void handle(JSONObject msg) throws IOException {
String to = msg.getString("to");
sessionManager.sendToUser(Integer.valueOf(to), msg.toJSONString());
}
@Override
public void handle(String msg, Integer to) throws IOException {
}
}

View File

@@ -3,7 +3,8 @@ package xin.merlin.myplayerbackend.utils.websocket.command.impl;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.utils.websocket.WebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.OnlineWebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.PlayroomWebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.command.BaseCommandHandler;
import java.io.IOException;
@@ -12,13 +13,21 @@ import java.io.IOException;
@RequiredArgsConstructor
public class Ping implements BaseCommandHandler {
private final WebSocketSessionManager sessionManager;
private final OnlineWebSocketSessionManager onlineWebSocketSessionManager;
private final PlayroomWebSocketSessionManager playroomWebSocketSessionManager;
@Override
public void handle(JSONObject msg) throws IOException {
String from = msg.getString("from");
msg.put("cmd", "PONG");
sessionManager.sendToUser(Integer.valueOf(from), msg.toJSONString());
onlineWebSocketSessionManager.sendToUser(Integer.valueOf(from), msg.toJSONString());
}
@Override
public void handle(String msg, Integer to) throws IOException {
System.out.println(to+":"+msg);
playroomWebSocketSessionManager.sendToUser(to, msg);
}
}

View File

@@ -3,7 +3,7 @@ package xin.merlin.myplayerbackend.utils.websocket.command.impl;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.utils.websocket.WebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.OnlineWebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.command.BaseCommandHandler;
import java.io.IOException;
@@ -12,11 +12,16 @@ import java.io.IOException;
@RequiredArgsConstructor
public class SystemNotify implements BaseCommandHandler {
private final WebSocketSessionManager sessionManager;
private final OnlineWebSocketSessionManager sessionManager;
@Override
public void handle(JSONObject msg) throws IOException {
sessionManager.broadcast(msg.toJSONString());
}
@Override
public void handle(String msg, Integer to) throws IOException {
}
}

View File

@@ -3,7 +3,7 @@ package xin.merlin.myplayerbackend.utils.websocket.command.impl;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import xin.merlin.myplayerbackend.utils.websocket.WebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.OnlineWebSocketSessionManager;
import xin.merlin.myplayerbackend.utils.websocket.command.BaseCommandHandler;
import java.io.IOException;
@@ -12,11 +12,16 @@ import java.io.IOException;
@RequiredArgsConstructor
public class Typing implements BaseCommandHandler {
private final WebSocketSessionManager sessionManager;
private final OnlineWebSocketSessionManager sessionManager;
@Override
public void handle(JSONObject msg) throws IOException {
String to = msg.getString("to");
sessionManager.sendToUser(Integer.valueOf(to), msg.toJSONString());
}
@Override
public void handle(String msg, Integer to) throws IOException {
}
}