-
方法一:Netty实现 —— 需要另外占用一个与springboot项目中tomcat不通的端口;
-
方法二:spring-boot-starter-websocket实现 —— 可共用springboot项目中tomcat使用的端口;
一、Netty实现
1、引入netty的依赖:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.42.Final</version> </dependency>
2、为了方便打包编译,我就使用springboot项目启动nettyServer吧:
可以借助ApplicationRunner,也可以借助CommandLineRunner完成,在Springboot项目启动后,即可自动执行Runner中的run()方法:
@Component public class NettyChatRunner implements ApplicationRunner { @Value("${websocket.port}") private Integer port; @Value("${websocket.netty}") private String path; @Override public void run(ApplicationArguments args) throws Exception { new NettyChatServer(port, path).start(); } }
3、如果有Netty基础,这个NettyServer服务,就很容易读懂了:
public class NettyChatServer { private int port; private String path; public NettyChatServer(int port, String path){ this.port = port; this.path = path; } public void start(){ NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(4); ServerBootstrap bootstrap = new ServerBootstrap(); try { bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new HttpServerCodec()) //将请求和响应进行http编码或解码 .addLast(new HttpObjectAggregator(64*1024)) //解码成FullHttpRequest,因为HttpServerCodec无法解码Post请求体中的参数 .addLast(new ChunkedWriteHandler()) //大数据流的支持 // .addLast(new HttpRequestHandler("/chat")) //专门用来处理普通的http请求的handler .addLast(new WebSocketServerProtocolHandler(path)) //专门用来处理websocket请求的handler .addLast(new MyWebsocketHandler()); // 自定义的websocket处理handler处理类 } }); ChannelFuture future = bootstrap.bind(port).sync(); if (future.isSuccess()){ System.out.println("websocket服务启动成功,端口为:" + port); } ChannelFuture closeFuture = future.channel().closeFuture().sync();// 等待连接关闭 if (closeFuture.isSuccess()){ System.out.println("websocket服务被关闭!"); } } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
最重要的socketChannel.pipeline中的每一个handler都做了注释;
4、然后就是所有netty开发的项目中最重要的环节——自定义Handler:
@Slf4j public class MyWebsocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { // 维护后期所有链接上来的客户端的channel信息,相当于连接池 private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); // 当有客户端连接时,执行的逻辑 ——> 需要将连上的客户端记录下来,并通知其它用户 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel newConnect = ctx.channel(); //通知其它所有的用户,有新用户上线 log.info("[欢迎]" + newConnect.remoteAddress() + "进入聊天室"); channels.forEach(ch -> { if (ch != newConnect){ ch.writeAndFlush("[欢迎]" + newConnect.remoteAddress() + "进入聊天室"); } }); channels.add(newConnect); } // 当有客户端断开连接时,执行的逻辑 ——> 将连接移除,并通知其他用户 @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel leaveConnect = ctx.channel(); log.info("[再见]" + leaveConnect.remoteAddress() + "离开聊天室"); //通知其它所有的用户,有用户下线 channels.forEach(ch -> { if (ch != leaveConnect){ ch.writeAndFlush("[再见]" + leaveConnect.remoteAddress() + "离开聊天室"); } }); channels.remove(leaveConnect); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel causeChannel = ctx.channel(); log.error(causeChannel.remoteAddress() + ":异常退出:" + cause.getCause()); causeChannel.close(); channels.remove(causeChannel); } @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { Channel incoming = ctx.channel(); log.info("用户<" + incoming.remoteAddress() + ">说:" + msg.text()); //通知其它用户 channels.forEach(ch -> { if (ch != incoming){ ch.writeAndFlush(new TextWebSocketFrame("用户" + incoming.remoteAddress() + "说:" + msg.text())); } else { ch.writeAndFlush(new TextWebSocketFrame("我说:" + msg.text())); } }); } }
二、spring-boot-starter-websocket实现
1、引入依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> <version>2.5.7</version> </dependency>
2、通过实现 WebsocketConfigurer ,完成自定义handler和拦截器的配置:
@Configuration @EnableWebSocket //重要,容易忘 public class WebSocketConfig implements WebSocketConfigurer { @Value("${websocket.http}") private String path; @Resource private ChatHandler chatHandler; @Resource private ChatInterceptor chatInterceptor; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(chatHandler, path) // 配置主处理逻辑 .addInterceptors(chatInterceptor) // 握手拦截器,可以做一些连接校验等 .setAllowedOrigins("*"); // 解决跨域 } }
3、自定义的拦截器ChatHandler(字节重写对应的方法即可):
@Slf4j @Component public class ChatHandler extends TextWebSocketHandler { public static AtomicInteger onlineNum = new AtomicInteger(0); public static CopyOnWriteArraySet<WebSocketSession> sessions = new CopyOnWriteArraySet<>(); /** * 连接建立时 * @param session * @throws Exception */ @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String sessionId = session.getId(); log.info("【欢迎】用户【" + sessionId + "】上线了!"); sessions.forEach(s -> { if (s.getId() != sessionId){ try { s.sendMessage(new TextMessage("【欢迎】用户【" + sessionId + "】上线了!")); } catch (IOException e) { e.printStackTrace(); } } }); sessions.add(session); onlineNum.getAndIncrement(); } /** * 收到消息是时 * @param session * @param message * @throws Exception */ @Override public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { String sessionId = session.getId(); sessions.forEach(s -> { try { if (s.getId() != sessionId){ s.sendMessage(new TextMessage("用户【" + sessionId + "】说:" + message.getPayload())); }else { s.sendMessage(new TextMessage("【我】说:" + message.getPayload())); } }catch (IOException e) { e.printStackTrace(); } }); } /** * 发生错误时 * @param session * @param exception * @throws Exception */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { } /** * 连接关闭后 * @param session * @param closeStatus * @throws Exception */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { String sessionId = session.getId(); log.info("【再见】用户【" + sessionId + "】下线了"); sessions.forEach(s -> { if (s.getId() != sessionId){ try { s.sendMessage(new TextMessage("【再见】用户【" + sessionId + "】上线了!")); } catch (IOException e) { e.printStackTrace(); } } }); sessions.remove(session); onlineNum.getAndDecrement(); } /** * * @return */ @Override public boolean supportsPartialMessages() { return false; } }
4、自定义握手拦截,完成一些校验之类的工作:
@Component public class ChatInterceptor extends HttpSessionHandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { // return false; return super.beforeHandshake(request, response, wsHandler, attributes); } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { } }
其实,除了通过实现WebsocketHandler,并重写其中对应的方法外,还提供了一种注解方式的实现,只需要在对应的方法上添加以下注解,皆可实现与上面handler相同的功能,更加灵活!
@OnOpen
@OnClose
@OnError
@OnMessage
以上功能的自测截图,略!还可以通过一个websocket给另外一个websocket推送外部消息,很灵活!