package xin.merlin.myplayerbackend.utils.websocket;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import xin.merlin.myplayerbackend.config.RabbitMQConfig;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Set;

/**
 * WebSocket handler for playroom connections.
 *
 * <p>Registers each session with the {@link PlayroomWebSocketSessionManager}, records room
 * membership in a Redis set keyed by {@code "room:" + r_id}, and relays video commands
 * ({@code VIDEO_SYNC}, {@code VIDEO_PLAY}, {@code VIDEO_PAUSE}) to every member of the
 * sender's room.
 *
 * <p>The user id and room id are read from the session attributes {@code "id"} and
 * {@code "r_id"}; they are presumably populated by a handshake interceptor that is not part
 * of this file — verify against the WebSocket configuration.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class PlayroomWebSocketHandler extends TextWebSocketHandler {

    /** Prefix of the Redis set holding the user ids currently present in a room. */
    private static final String IN_ROOM_BASE_KEY = "room:";

    /** Session attribute holding the connected user's id (cast to {@link Integer}). */
    private static final String ATTR_USER_ID = "id";

    /** Session attribute holding the playroom id. */
    private static final String ATTR_ROOM_ID = "r_id";

    private final PlayroomWebSocketSessionManager sessionManager;
    private final RabbitTemplate rabbitTemplate;
    private final StringRedisTemplate redis;

    /**
     * Registers the new session and publishes a {@code PING} command addressed to the user.
     *
     * <p>The message is sent through {@link RabbitTemplate#convertAndSend(String, Object)}
     * with {@link RabbitMQConfig#WS_VIDEO_QUEUE} as the routing key. Presumably a consumer
     * delivers it to the client, which answers with {@code PONG} — confirm against the
     * queue listener.
     *
     * @param session the freshly opened session
     * @throws Exception if registration or publishing fails
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        Integer userId = userIdOf(session);
        // Built only from session attributes (no client-supplied text), so plain
        // concatenation is kept to preserve the exact wire format.
        String payload = "{\"cmd\": \"PING\", \"to\": " + userId
                + ", \"time\":\"" + LocalDateTime.now()
                + "\", \"playroom\":" + roomIdOf(session) + "}";
        sessionManager.addSession(userId, session);
        rabbitTemplate.convertAndSend(RabbitMQConfig.WS_VIDEO_QUEUE, payload);
    }

    /**
     * Dispatches an incoming text frame according to its {@code cmd} field.
     *
     * <ul>
     *   <li>{@code PONG} — adds the user to the room's Redis member set.</li>
     *   <li>{@code VIDEO_SYNC} — builds a sync message from {@code url} and
     *       {@code timestamp} and sends it to every room member.</li>
     *   <li>{@code VIDEO_PLAY} / {@code VIDEO_PAUSE} — forwards the raw frame to every room
     *       member.</li>
     * </ul>
     *
     * <p>Frames that are not a JSON object, or that carry no {@code cmd}, are logged and
     * ignored instead of failing the handler.
     *
     * @param session the sending session
     * @param message the received frame
     * @throws Exception if a Redis operation fails
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        JSONObject payload;
        try {
            payload = JSONObject.parseObject(message.getPayload());
        } catch (JSONException e) {
            log.warn("Ignoring malformed WebSocket frame from session {}", session.getId(), e);
            return;
        }
        // parseObject yields null for an empty payload; a missing cmd would NPE below.
        String cmd = payload == null ? null : payload.getString("cmd");
        if (cmd == null) {
            log.warn("Ignoring WebSocket frame without cmd from session {}", session.getId());
            return;
        }

        Integer userId = userIdOf(session);
        switch (cmd) {
            case "PONG":
                // Adding to a Redis set is idempotent, so no prior membership check is needed.
                redis.opsForSet().add(roomKey(session), String.valueOf(userId));
                break;
            case "VIDEO_SYNC":
                broadcastToRoom(session, buildVideoSync(session, userId, payload));
                break;
            case "VIDEO_PLAY":
            case "VIDEO_PAUSE":
                broadcastToRoom(session, message.getPayload());
                break;
            default:
                // Unknown commands are ignored.
                break;
        }
    }

    /**
     * Removes the user from the room's Redis member set and drops the session registration.
     *
     * @param session the closed session
     * @param status  the close status reported by the container
     * @throws Exception if cleanup fails
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        Integer userId = userIdOf(session);
        if (userId == null) {
            // Without a user id there is nothing registered under this session to clean up.
            log.warn("Closed session {} carried no user id; skipping cleanup", session.getId());
            return;
        }
        redis.opsForSet().remove(roomKey(session), userId.toString());
        sessionManager.removeSession(userId);
    }

    /**
     * Closes all managed WebSocket sessions when the application context is closed.
     *
     * @param event the context-closed event
     */
    @EventListener
    public void onShutdown(ContextClosedEvent event) {
        log.info("Shutting down... closing websocket sessions");
        sessionManager.closeAll();
    }

    /** Builds the {@code VIDEO_SYNC} message, JSON-escaping the client-supplied fields. */
    private String buildVideoSync(WebSocketSession session, Integer userId, JSONObject payload) {
        String url = payload.getString("url");
        String timestamp = payload.getString("timestamp");
        return "{\"cmd\": \"VIDEO_SYNC\", \"from\": " + userId
                + ", \"time\":\"" + LocalDateTime.now()
                + "\", \"url\":" + jsonString(url)
                + ", \"timestamp\":" + jsonString(timestamp)
                + ", \"playroom\":" + roomIdOf(session) + "}";
    }

    /** Sends {@code text} to every member of the session's room, isolating per-member failures. */
    private void broadcastToRoom(WebSocketSession session, String text) {
        Set<String> members = redis.opsForSet().members(roomKey(session));
        if (members == null || members.isEmpty()) {
            return;
        }
        for (String member : members) {
            try {
                sessionManager.sendToUser(Integer.valueOf(member), text);
            } catch (IOException | NumberFormatException e) {
                // One unreachable or malformed member must not stop delivery to the others.
                log.error("Failed to deliver message to room member {}", member, e);
            }
        }
    }

    /**
     * Serializes a value as a quoted, escaped JSON string. A {@code null} value becomes the
     * string {@code "null"}, matching what plain concatenation produced before.
     */
    private static String jsonString(String value) {
        return JSON.toJSONString(String.valueOf(value));
    }

    /** Returns the Redis key of the member set for the session's room. */
    private static String roomKey(WebSocketSession session) {
        return IN_ROOM_BASE_KEY + roomIdOf(session);
    }

    /** Reads the room id attribute from the session (may be {@code null}). */
    private static Object roomIdOf(WebSocketSession session) {
        return session.getAttributes().get(ATTR_ROOM_ID);
    }

    /** Reads the user id attribute from the session (may be {@code null}). */
    private static Integer userIdOf(WebSocketSession session) {
        return (Integer) session.getAttributes().get(ATTR_USER_ID);
    }
}