本文最后更新于:2024年5月19日 下午
简介
使用Netty与硬件设备通讯。优点:支撑百万级连接没有任何难度。
使用
1.maven依赖
1 2 3 4 5 6
| <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.46.Final</version> </dependency>
|
2.配置启动服务
在application.yml添加配置
1 2 3 4 5
| netty:
|   enable: true
|   server:
|     port: 8543
|
创建启动服务类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
|
/**
 * Spring-managed bootstrap for the Netty TCP server.
 *
 * <p>When {@code netty.enable} is true, {@link #start()} binds a server socket on
 * {@code netty.server.port} after the bean is constructed. {@link #destroy()} shuts
 * both event-loop groups down when the bean is destroyed.
 */
@Component
public class NettyServer {

    private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    /** TCP port to listen on, read from {@code netty.server.port}. */
    @Value("${netty.server.port}")
    private Integer nettyPort;

    /** Whether the server should be started at all, read from {@code netty.enable}. */
    @Value("${netty.enable}")
    private boolean enable;

    /** Shared handler that is appended to every accepted channel's pipeline. */
    @Autowired
    private NettyServerEventHandle nettyEventHandle;

    /** Event-loop group passed as the parent (acceptor) group. */
    private final EventLoopGroup boss = new NioEventLoopGroup();

    /** Event-loop group passed as the child (per-connection) group. */
    private final EventLoopGroup work = new NioEventLoopGroup();

    /**
     * Binds the server socket if the server is enabled.
     *
     * <p>Startup failures are logged (with stack trace) and not rethrown, so the
     * surrounding application keeps starting even if the port cannot be bound.
     */
    @PostConstruct
    public void start() {
        if (!enable) {
            return;
        }
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(nettyPort))
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline()
                                    // reader idle 60s, writer idle 60s, all idle 20s
                                    .addLast(new IdleStateHandler(60, 60, 20, TimeUnit.SECONDS))
                                    .addLast(nettyEventHandle);
                        }
                    });
            ChannelFuture future = bootstrap.bind().sync();
            if (future.isSuccess()) {
                log.info("Start Netty Serve Success..");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            log.error("启动Netty服务异常,异常原因:{}", e.getMessage(), e);
        } catch (Exception e) {
            // Trailing throwable argument makes SLF4J print the full stack trace.
            log.error("启动Netty服务异常,异常原因:{}", e.getMessage(), e);
        }
    }

    /**
     * Shuts down both event-loop groups and waits for each to terminate.
     *
     * <p>The worker group is shut down in a {@code finally} block so that it is
     * still released if shutting down the boss group throws.
     */
    @PreDestroy
    public void destroy() {
        try {
            boss.shutdownGracefully().syncUninterruptibly();
        } finally {
            work.shutdownGracefully().syncUninterruptibly();
        }
        log.info("Close Netty Serve Success..");
    }
}
|
3.事件监听,业务处理
在channelRead方法中,接收消息,处理相关业务即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
|
@Component @ChannelHandler.Sharable public class NettyServerEventHandle extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(NettyServerEventHandle.class);
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { IdleStateEvent event = (IdleStateEvent) evt; String channelId = ctx.channel().id().toString(); switch (event.state()) { case READER_IDLE: break; case WRITER_IDLE: break; case ALL_IDLE: log.info("服务端向客户端ID:{}发送心跳数据..", channelId); ctx.writeAndFlush(MessageUtil.sendMessage(JSONObject.toJSONString(NettyConfig.SEND_HEAR_MESSAGE))); break; default: break; } super.userEventTriggered(ctx, evt); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { String info = MessageUtil.getMessage(msg); String channelId = ctx.channel().id().toString(); log.info("(处理前)收到ID:{}发来的消息:{}", channelId, info+" : "+info.length());
log.info("(处理后)收到客户端ID:{}发来的消息:{}", channelId, info+" : "+info.length());
} catch (Exception e) { log.error(e.getMessage(),e); ctx.writeAndFlush(MessageUtil.sendMessage(JSONObject.toJSONString(NettyConfig.RESPONSE_ERROR_MESSAGE))); } }
@Override public void channelActive(ChannelHandlerContext ctx) { String ipAddress = ctx.channel().remoteAddress().toString(); String channelId = ctx.channel().id().toString(); log.info("客户端ID:{},IP地址:{}已经成功连接....", channelId, ipAddress); }
@Override public void channelInactive(ChannelHandlerContext ctx) {
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { String channelId = ctx.channel().id().toString(); String ipAddress = ctx.channel().remoteAddress().toString(); log.error("客户端ID:{},IP地址:{}发生了异常....", channelId, ipAddress); super.exceptionCaught(ctx, cause); } }
|
消息编码工具
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
/**
 * Static helpers that convert between {@code String} messages and Netty
 * {@code ByteBuf}s using UTF-8.
 */
public class MessageUtil {

    /**
     * Encodes a string as UTF-8 bytes and hands them to
     * {@code Unpooled.wrappedBuffer}.
     *
     * @param message text to send
     * @return a buffer created from the UTF-8 bytes of {@code message}
     */
    public static ByteBuf sendMessage(String message) {
        byte[] payload = message.getBytes(CharsetUtil.UTF_8);
        return Unpooled.wrappedBuffer(payload);
    }

    /**
     * Converts a received message to text using UTF-8.
     *
     * @param message the inbound object; it is cast to {@code ByteBuf}, so any
     *                other type results in a {@code ClassCastException}
     * @return the result of {@code ByteBuf.toString(CharsetUtil.UTF_8)}
     */
    public static String getMessage(Object message) {
        ByteBuf buffer = (ByteBuf) message;
        return buffer.toString(CharsetUtil.UTF_8);
    }
}
|
常量
1 2 3 4 5 6 7 8 9 10 11
|
/**
 * Fixed message texts for the Netty server.
 */
public class NettyConfig {

    /** Heartbeat text sent from the server to a client. */
    public static final String SEND_HEAR_MESSAGE =
            "服务器发送心跳数据..";

    /** Text acknowledging that the server received a client's response. */
    public static final String RESPONSE_HEAR_MESSAGE =
            "服务器收到客户端响应数据..";

    /** Text returned to a client when the server fails to process a message. */
    public static final String RESPONSE_ERROR_MESSAGE =
            "服务器处理消息错误,请稍等重试..";
}
|
测试运行
启动项目后,使用任意TCP调试工具连接127.0.0.1:8543,即可模拟设备通讯。
源码
https://github.com/java-aodeng/demo-netty-20220812