使用Java-Netty实现长连接

本文最后更新于:2022年8月14日 上午

简介

使用Netty与硬件设备通讯,优点,达到百万连接没有任何难度

使用

1.maven依赖

1
2
3
4
5
6
<!--netty所需-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.46.Final</version>
</dependency>

2.配置启动服务

在application.yml添加配置

1
2
3
4
5
#netty配置
netty:
enable: true
server:
port: 8543

创建启动服务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* @author AoDeng
* @date 2022/8/12
*/
@Component
public class NettyServer {

private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

@Value("${netty.server.port}")
private Integer nettyPort;

@Value("${netty.enable}")
private boolean enable;

@Autowired
private NettyServerEventHandle nettyEventHandle;

private EventLoopGroup boss = new NioEventLoopGroup();

private EventLoopGroup work = new NioEventLoopGroup();

/**
* 启动 Netty
*/
@PostConstruct
public void start() {
if (enable) {
try {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(boss, work)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(nettyPort))
//服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝
.option(ChannelOption.SO_BACKLOG, 128)
//Socket参数,连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。
//可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new IdleStateHandler(60, 60, 20, TimeUnit.SECONDS))
.addLast(nettyEventHandle);
}
});
//绑定并开始接受传入的连接。
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
log.info("Start Netty Serve Success..");
}
} catch (Exception e) {
log.error("启动Netty服务异常,异常原因:{}", e.getMessage());
}
}
}

/**
* 销毁
*/
@PreDestroy
public void destroy() {
boss.shutdownGracefully().syncUninterruptibly();
work.shutdownGracefully().syncUninterruptibly();
log.info("Close Netty Serve Success..");
}
}

3.事件监听,业务处理

在channelRead方法中,接收消息,处理相关业务即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/**
* @author AoDeng
* @date 2022/8/12
*/
@Component
@ChannelHandler.Sharable
public class NettyServerEventHandle extends ChannelInboundHandlerAdapter {

private static final Logger log = LoggerFactory.getLogger(NettyServerEventHandle.class);

/**
* 事件监听
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//获取当前信道状态
IdleStateEvent event = (IdleStateEvent) evt;
//连接客户端ID
String channelId = ctx.channel().id().toString();
switch (event.state()) {
//读空闲
case READER_IDLE:
break;
//写空闲
case WRITER_IDLE:
break;
//读写空闲
case ALL_IDLE:
log.info("服务端向客户端ID:{}发送心跳数据..", channelId);
ctx.writeAndFlush(MessageUtil.sendMessage(JSONObject.toJSONString(NettyConfig.SEND_HEAR_MESSAGE)));
break;
default:
break;
}
super.userEventTriggered(ctx, evt);
}

/**
* 接收到消息的时候触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
//消息体
String info = MessageUtil.getMessage(msg);
//连接客户端ID
String channelId = ctx.channel().id().toString();
log.info("(处理前)收到ID:{}发来的消息:{}", channelId, info+" : "+info.length());

//TODO 消息体根据具体业务处理转换...

log.info("(处理后)收到客户端ID:{}发来的消息:{}", channelId, info+" : "+info.length());
//处理业务开始...

//TODO 收到消息后处理具体业务...

} catch (Exception e) {
log.error(e.getMessage(),e);
ctx.writeAndFlush(MessageUtil.sendMessage(JSONObject.toJSONString(NettyConfig.RESPONSE_ERROR_MESSAGE)));
}
}


/**
* 绑定
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
//获取客户端IP地址进行身份验证
String ipAddress = ctx.channel().remoteAddress().toString();
String channelId = ctx.channel().id().toString();
log.info("客户端ID:{},IP地址:{}已经成功连接....", channelId, ipAddress);
}

/**
* 取消绑定
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {

}

/**
* 抛出异常的时候触发
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
String channelId = ctx.channel().id().toString();
String ipAddress = ctx.channel().remoteAddress().toString();
log.error("客户端ID:{},IP地址:{}发生了异常....", channelId, ipAddress);
super.exceptionCaught(ctx, cause);
}
}

消息编码工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* @author AoDeng
* @date 2022/8/12
*/
public class MessageUtil {
/**
* 发送消息编码
*
* @param message 消息体
* @return 编码之后的消息
*/
public static ByteBuf sendMessage(String message) {
return Unpooled.wrappedBuffer((message).getBytes((CharsetUtil.UTF_8)));
}

/**
* 接收消息编码
*
* @param message 消息体
* @return 解码之后的消息
*/
public static String getMessage(Object message) {
return ((ByteBuf) message).toString(CharsetUtil.UTF_8);
}
}

常量

1
2
3
4
5
6
7
8
9
10
11
/**
* @author AoDeng
* @date 2022/8/12
*/
public class NettyConfig {
public static final String SEND_HEAR_MESSAGE = "服务器发送心跳数据..";

public static final String RESPONSE_HEAR_MESSAGE = "服务器收到客户端响应数据..";

public static final String RESPONSE_ERROR_MESSAGE = "服务器处理消息错误,请稍等重试..";
}

测试运行

启动项目,下载个tcp工具,访问127.0.0.1:8543 即可模拟通讯

源码

https://github.com/java-aodeng/demo-netty-20220812