-
方法一:Netty实现 —— 需要另外占用一个与springboot项目中tomcat不通的端口;
-
方法二:spring-boot-starter-websocket实现 —— 可共用springboot项目中tomcat使用的端口;
一、Netty实现
1、引入netty的依赖:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.42.Final</version> </dependency>
2、为了方便打包编译,我就使用springboot项目启动nettyServer吧:
可以借助ApplicationRunner,也可以借助CommandLineRunner完成,在Springboot项目启动后,即可自动执行Runner中的run()方法:
@Component
public class NettyChatRunner implements ApplicationRunner {
@Value("${websocket.port}")
private Integer port;
@Value("${websocket.netty}")
private String path;
@Override
public void run(ApplicationArguments args) throws Exception {
new NettyChatServer(port, path).start();
}
}
3、如果有Netty基础,这个NettyServer服务,就很容易读懂了:
public class NettyChatServer {
private int port;
private String path;
public NettyChatServer(int port, String path){
this.port = port;
this.path = path;
}
public void start(){
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(4);
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new HttpServerCodec()) //将请求和响应进行http编码或解码
.addLast(new HttpObjectAggregator(64*1024)) //解码成FullHttpRequest,因为HttpServerCodec无法解码Post请求体中的参数
.addLast(new ChunkedWriteHandler()) //大数据流的支持
// .addLast(new HttpRequestHandler("/chat")) //专门用来处理普通的http请求的handler
.addLast(new WebSocketServerProtocolHandler(path)) //专门用来处理websocket请求的handler
.addLast(new MyWebsocketHandler()); // 自定义的websocket处理handler处理类
}
});
ChannelFuture future = bootstrap.bind(port).sync();
if (future.isSuccess()){
System.out.println("websocket服务启动成功,端口为:" + port);
}
ChannelFuture closeFuture = future.channel().closeFuture().sync();// 等待连接关闭
if (closeFuture.isSuccess()){
System.out.println("websocket服务被关闭!");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
最重要的socketChannel.pipeline中的每一个handler都做了注释;
4、然后就是所有netty开发的项目中最重要的环节——自定义Handler:
@Slf4j
public class MyWebsocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 维护后期所有链接上来的客户端的channel信息,相当于连接池
private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
// 当有客户端连接时,执行的逻辑 ——> 需要将连上的客户端记录下来,并通知其它用户
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel newConnect = ctx.channel();
//通知其它所有的用户,有新用户上线
log.info("[欢迎]" + newConnect.remoteAddress() + "进入聊天室");
channels.forEach(ch -> {
if (ch != newConnect){
ch.writeAndFlush("[欢迎]" + newConnect.remoteAddress() + "进入聊天室");
}
});
channels.add(newConnect);
}
// 当有客户端断开连接时,执行的逻辑 ——> 将连接移除,并通知其他用户
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel leaveConnect = ctx.channel();
log.info("[再见]" + leaveConnect.remoteAddress() + "离开聊天室");
//通知其它所有的用户,有用户下线
channels.forEach(ch -> {
if (ch != leaveConnect){
ch.writeAndFlush("[再见]" + leaveConnect.remoteAddress() + "离开聊天室");
}
});
channels.remove(leaveConnect);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel causeChannel = ctx.channel();
log.error(causeChannel.remoteAddress() + ":异常退出:" + cause.getCause());
causeChannel.close();
channels.remove(causeChannel);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
Channel incoming = ctx.channel();
log.info("用户<" + incoming.remoteAddress() + ">说:" + msg.text());
//通知其它用户
channels.forEach(ch -> {
if (ch != incoming){
ch.writeAndFlush(new TextWebSocketFrame("用户" + incoming.remoteAddress() + "说:" + msg.text()));
} else {
ch.writeAndFlush(new TextWebSocketFrame("我说:" + msg.text()));
}
});
}
}
二、spring-boot-starter-websocket实现
1、引入依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> <version>2.5.7</version> </dependency>
2、通过实现 WebsocketConfigurer ,完成自定义handler和拦截器的配置:
@Configuration
@EnableWebSocket //重要,容易忘
public class WebSocketConfig implements WebSocketConfigurer {
@Value("${websocket.http}")
private String path;
@Resource
private ChatHandler chatHandler;
@Resource
private ChatInterceptor chatInterceptor;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(chatHandler, path) // 配置主处理逻辑
.addInterceptors(chatInterceptor) // 握手拦截器,可以做一些连接校验等
.setAllowedOrigins("*"); // 解决跨域
}
}
3、自定义的拦截器ChatHandler(字节重写对应的方法即可):
@Slf4j
@Component
public class ChatHandler extends TextWebSocketHandler {
public static AtomicInteger onlineNum = new AtomicInteger(0);
public static CopyOnWriteArraySet<WebSocketSession> sessions = new CopyOnWriteArraySet<>();
/**
* 连接建立时
* @param session
* @throws Exception
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String sessionId = session.getId();
log.info("【欢迎】用户【" + sessionId + "】上线了!");
sessions.forEach(s -> {
if (s.getId() != sessionId){
try {
s.sendMessage(new TextMessage("【欢迎】用户【" + sessionId + "】上线了!"));
} catch (IOException e) {
e.printStackTrace();
}
}
});
sessions.add(session);
onlineNum.getAndIncrement();
}
/**
* 收到消息是时
* @param session
* @param message
* @throws Exception
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
String sessionId = session.getId();
sessions.forEach(s -> {
try {
if (s.getId() != sessionId){
s.sendMessage(new TextMessage("用户【" + sessionId + "】说:" + message.getPayload()));
}else {
s.sendMessage(new TextMessage("【我】说:" + message.getPayload()));
}
}catch (IOException e) {
e.printStackTrace();
}
});
}
/**
* 发生错误时
* @param session
* @param exception
* @throws Exception
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
}
/**
* 连接关闭后
* @param session
* @param closeStatus
* @throws Exception
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
String sessionId = session.getId();
log.info("【再见】用户【" + sessionId + "】下线了");
sessions.forEach(s -> {
if (s.getId() != sessionId){
try {
s.sendMessage(new TextMessage("【再见】用户【" + sessionId + "】上线了!"));
} catch (IOException e) {
e.printStackTrace();
}
}
});
sessions.remove(session);
onlineNum.getAndDecrement();
}
/**
*
* @return
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
}
4、自定义握手拦截,完成一些校验之类的工作:
@Component
public class ChatInterceptor extends HttpSessionHandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
// return false;
return super.beforeHandshake(request, response, wsHandler, attributes);
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
}
其实,除了通过实现WebsocketHandler,并重写其中对应的方法外,还提供了一种注解方式的实现,只需要在对应的方法上添加以下注解,皆可实现与上面handler相同的功能,更加灵活!
@OnOpen
@OnClose
@OnError
@OnMessage
以上功能的自测截图,略!还可以通过一个websocket给另外一个websocket推送外部消息,很灵活!



