博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty入门篇(1)
阅读量:5956 次
发布时间:2019-06-19

本文共 30847 字,大约阅读时间需要 102 分钟。

上一篇    下一篇 

一、为什么选择Netty

Netty是最流行的框架之一、健壮性、功能、性能、可定制性和可扩展性在同类框架中首屈一指,因此被大规模使用,例如ROCKETMQ的NameSRV,例如Hadoop的Avro,例如Dubbo中的RPC通信等等。。

为什么选择Netty?

  • API简单;
  • 功能强大,预置了选多的编码功能,支持多种主流协议;
  • 定制能力强,通过ChannelHandler对通信框架进行灵活的扩展;
  • 性能强;
  • 成熟,修改已发现的JDK nio BUG
  • 社区活跃
  • 经过大规模的商业应用考验,质量得到验证。

二、使用Netty开发TimeServer

环境准备: pom.xml

4.0.0
demo
netty
1.0-SNAPSHOT
io.netty
netty-all
4.1.5.Final
org.apache.maven.plugins
maven-compiler-plugin
3.1
1.8
1.8

1. Netty TimeServer

 

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;/** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */public class TimeServer {    public void bind(int port) throws Exception {        // 配置服务端的NIO线程组        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            //Netty启动Nio服务端的辅助类            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .option(ChannelOption.SO_BACKLOG, 1024) //设置服务端tcp参数                    .childHandler(new ChildChannelHandler());            // 绑定端口,同步等待成功            ChannelFuture f = b.bind(port).sync();            // 进行阻塞,等待服务端监听端口关闭            f.channel().closeFuture().sync();        } finally {            // 优雅退出,释放线程池资源            System.out.println("服务器关闭...");            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }    private class ChildChannelHandler extends ChannelInitializer
{ @Override protected void initChannel(SocketChannel arg0) throws Exception { arg0.pipeline().addLast(new TimeServerHandler()); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new TimeServer().bind(port); }}
  • EventLoopGroup Reactor线程组 2个
  • ServerBootstrap:Server端辅助工具
  • 设置channel: NioServerSocketChannel
  • option: 服务端tcp option设置,这里以backlog 1024为例..
  • 增加childHandler
  • f.channel().closeFuture().sync()表示进行阻塞,等待服务器端链路关闭之后main函数才退出

2. TimeServerHandler

1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelHandlerAdapter; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.channel.ChannelInboundHandlerAdapter; 6  7 /** 8  * @author lilinfeng 9  * @version 1.010  * @date 2014年2月14日11  */12 public class TimeServerHandler extends ChannelInboundHandlerAdapter {13 14 15     @Override16     public void channelRead(ChannelHandlerContext ctx, Object msg)17             throws Exception {18         ByteBuf buf = (ByteBuf) msg;19         byte[] req = new byte[buf.readableBytes()];20         buf.readBytes(req);21         String body = new String(req, "UTF-8");22         System.out.println("The time server receive order : " + body);23         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(24                 System.currentTimeMillis()).toString() : "BAD ORDER";25         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());26         ctx.write(resp);27     }28 29     @Override30     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {31         ctx.flush();32     }33 34     @Override35     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {36         ctx.close();37     }38 }

(1) 18行做类型转换,将msg转换为Netty的ByteBuf对象,这个对象比ByteBuffer更加强大和灵活。

(2) 19行到20行通过ByteBuf的readableBytes获取缓冲区可读的字节数,根据可读的字节数创建byte数组。将缓冲区的内容读取到byte数组中。

(3) 31行发现了flush方法,其作用是将消息发送队列中的消息写入到SocketChannel中发送给对方。从性能上考虑,为了防止频繁唤醒Selector进行消息发送,Netty的write方法不直接写入到SocketChannel中,调用write方法只会写入到缓冲数组中,调用flush方法,才会写入到SocketChannel中。

(4) 36行的close()是在发生异常后释放资源

总结: 就是比NIO舒服太多了.

3. Time Client

1 import io.netty.bootstrap.Bootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioSocketChannel; 9 10 /**11  * @author lilinfeng12  * @version 1.013  * @date 2014年2月14日14  */15 public class TimeClient {16 17     public void connect(int port, String host) throws Exception {18         // 配置客户端NIO线程组19         EventLoopGroup group = new NioEventLoopGroup();20         try {21             Bootstrap b = new Bootstrap();22             b.group(group).channel(NioSocketChannel.class)23                     .option(ChannelOption.TCP_NODELAY, true)24                     .handler(new ChannelInitializer
() {25 @Override26 public void initChannel(SocketChannel ch)27 throws Exception {28 ch.pipeline().addLast(new TimeClientHandler());29 }30 });31 32 // 发起异步连接操作33 ChannelFuture f = b.connect(host, port).sync();34 35 // 当代客户端链路关闭36 f.channel().closeFuture().sync();37 } finally {38 // 优雅退出,释放NIO线程组39 group.shutdownGracefully();40 }41 }42 43 /**44 * @param args45 * @throws Exception46 */47 public static void main(String[] args) throws Exception {48 int port = 8080;49 if (args != null && args.length > 0) {50 try {51 port = Integer.valueOf(args[0]);52 } catch (NumberFormatException e) {53 // 采用默认值54 }55 }56 new TimeClient().connect(port, "127.0.0.1");57 }58 }

(1) 19行创建客户端处理I/O读写的NioEventLoopGroup线程组,然后继续创建辅助类Bootstrap,并且对其配置,此处配置为 NioSocketChannel,然后为其添加Handler。

(2) 这里Handler直接使用匿名内部类

(3) 33行的connect发送异步连接请求,然后阻塞直到关闭。

4. TimeClientHandler

1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelHandlerAdapter; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.channel.ChannelInboundHandlerAdapter; 6  7 import java.util.logging.Logger; 8  9 /**10  * @author lilinfeng11  * @version 1.012  * @date 2014年2月14日13  */14 public class TimeClientHandler extends ChannelInboundHandlerAdapter {15 16     private static final Logger logger = Logger17             .getLogger(TimeClientHandler.class.getName());18 19     private final ByteBuf firstMessage;20 21     /**22      * Creates a client-side handler.23      */24     public TimeClientHandler() {25         byte[] req = "QUERY TIME ORDER".getBytes();26         firstMessage = Unpooled.buffer(req.length);27         firstMessage.writeBytes(req);28 29     }30 31     @Override32     public void channelActive(ChannelHandlerContext ctx) {33         ctx.writeAndFlush(firstMessage);34     }35 36     @Override37     public void channelRead(ChannelHandlerContext ctx, Object msg)38             throws Exception {39         ByteBuf buf = (ByteBuf) msg;40         byte[] req = new byte[buf.readableBytes()];41         buf.readBytes(req);42         String body = new String(req, "UTF-8");43         System.out.println("Now is : " + body);44     }45 46     @Override47     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {48         // 释放资源49         logger.warning("Unexpected exception from downstream : "50                 + cause.getMessage());51         ctx.close();52     }53 }

 

这里重点关注3个方法: channelActive channelRead和exceptionCaught

(1) 当客户端和服务器端成功创建链路,调用channelActive方法,发送查询时间的指令给服务端,调用writeAndFlush方法发送数据。

(2) 39行开始调用channelRead,读取数据,49行处理异常时释放资源即可。

三、TCP 粘包/拆包问题的解决之道

1、TCP得粘包和拆包问题

  • TCP是一个流协议
  • TCP底层不了解业务数据含义,即不知道多少个字节算是业务上的一整体数据
  • 因此业务上认为,一个完整的包会被TCP拆分为多个包进行发送,也有可能将多个小的包封装成一个大包进行发送、

用下图进行描述,假设client发送了2个包,D1和D2,服务器端读到的数据是不确定的

存在4种可能:

server 分2次,分别读到D1,D2,完美巧合,没有粘包和拆包

server一次读到D1,D2,D1和D2粘在一起,称为粘包

server分2次,第一次读到D1和D2的部分内容,第二次读到了D2的剩余内容,拆包

server分2次,第一次读到D1的部分内容D1_1,第二次读到D1剩下的内容D1_2和完整的D2。

如果此时服务器端TCP接收的滑窗非常的小、而且数据包D1和D2都比较大,很有可能发生第5种可能性,服务器端多次才能将D1和D2接收完全,期间发生多次拆包...即上4种情况的多次组合...

下面我们来分析一下原因:

3个原因:

(1) 应用程序write写入的字节大于套接口(scoket)发送缓冲的大小;

(2) 进行MSS大小的TCP分段;

(3) 以太网帧的payload大于MTU进行IP分片

总结就是:不可避免...

解决思路:

(1) 定长数据,例如每个报文200bytes,不够空格补充...

(2) 在包围增加回车换行符或者其他的特殊字符进行分割,例如FTP协议

(3) 将消息分为消息头和消息体,消息头中包含消息总长度(或者消息体长度)content-length,通常的设计思路为消息头的第一个字段用int32来表示消息的总长度;

(4) 更复杂的应用层协议

2. 下面我们来模拟未考虑TCP粘包问题导致功能异常

修改上面的代码:

修改TimeServerHandler

1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelHandlerContext; 4 import io.netty.channel.ChannelInboundHandlerAdapter; 5  6 /** 7  * @author lilinfeng 8  * @version 1.0 9  * @date 2014年2月14日10  */11 public class TimeServerHandler extends ChannelInboundHandlerAdapter {12 13     private int counter;14 15     @Override16     public void channelRead(ChannelHandlerContext ctx, Object msg)17             throws Exception {18         ByteBuf buf = (ByteBuf) msg;19         byte[] req = new byte[buf.readableBytes()];20         buf.readBytes(req);21         String body = new String(req, "UTF-8").substring(0, req.length22                 - System.getProperty("line.separator").length());23         System.out.println("The time server receive order : " + body24                 + " ; the counter is : " + ++counter);25         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(26                 System.currentTimeMillis()).toString() : "BAD ORDER";27         currentTime = currentTime + System.getProperty("line.separator");28         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());29         ctx.writeAndFlush(resp);30     }31 32     @Override33     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {34         ctx.close();35     }36 }

主要是增加了一个counter进行计数..

修改TimeClientHandler

import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.logging.Logger;/** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */public class TimeClientHandler extends ChannelInboundHandlerAdapter {    private static final Logger logger = Logger            .getLogger(TimeClientHandler.class.getName());    private final ByteBuf firstMessage;    /**     * Creates a client-side handler.     */    public TimeClientHandler() {        byte[] req = "QUERY TIME ORDER".getBytes();        firstMessage = Unpooled.buffer(req.length);        firstMessage.writeBytes(req);    }    @Override    public void channelActive(ChannelHandlerContext ctx) {        ctx.writeAndFlush(firstMessage);    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg)            throws Exception {        ByteBuf buf = (ByteBuf) msg;        byte[] req = new byte[buf.readableBytes()];        buf.readBytes(req);        String body = new String(req, "UTF-8");        System.out.println("Now is : " + body);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        // 释放资源        logger.warning("Unexpected exception from downstream : "                + cause.getMessage());        ctx.close();    }}

主要是进行100次连续的发送数据...

由于tcp粘包拆包有一定的随机性,所以每次的结果可能不同,其中一次结果大致上是:

Server端打印:

QUERY TIME ORDER....the counter is : 2

Client端打印:

Now is : Thu Dec 15 15:11:22 CST 2016BAD ORDERBAD ORDER ; the counter is : 1

结果表明:client发送了100条消息,但是server是按照2次接收,只返回2条应答,但是client上的counter为1表明只client也接收了一次,说明这2条也进行了粘包。

3. 解决TCP粘包的TimeServer

 

TimeServer

1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioServerSocketChannel; 9 import io.netty.handler.codec.LineBasedFrameDecoder;10 import io.netty.handler.codec.string.StringDecoder;11 12 /**13  * @author lilinfeng14  * @version 1.015  * @date 2014年2月14日16  */17 public class TimeServer {18 19     public void bind(int port) throws Exception {20         // 配置服务端的NIO线程组21         EventLoopGroup bossGroup = new NioEventLoopGroup();22         EventLoopGroup workerGroup = new NioEventLoopGroup();23         try {24             ServerBootstrap b = new ServerBootstrap();25             b.group(bossGroup, workerGroup)26                     .channel(NioServerSocketChannel.class)27                     .option(ChannelOption.SO_BACKLOG, 1024)28                     .childHandler(new ChildChannelHandler());29             // 绑定端口,同步等待成功30             ChannelFuture f = b.bind(port).sync();31 32             // 等待服务端监听端口关闭33             f.channel().closeFuture().sync();34         } finally {35             // 优雅退出,释放线程池资源36             bossGroup.shutdownGracefully();37             workerGroup.shutdownGracefully();38         }39     }40 41     private class ChildChannelHandler extends ChannelInitializer
{42 @Override43 protected void initChannel(SocketChannel arg0) throws Exception {44 arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));45 arg0.pipeline().addLast(new StringDecoder());46 arg0.pipeline().addLast(new TimeServerHandler());47 }48 }49 50 /**51 * @param args52 * @throws Exception53 */54 public static void main(String[] args) throws Exception {55 int port = 8080;56 if (args != null && args.length > 0) {57 try {58 port = Integer.valueOf(args[0]);59 } catch (NumberFormatException e) {60 // 采用默认值61 }62 }63 new TimeServer().bind(port);64 }65 }

重点看44行,增加了2个解码器: LineBasedFrameDecoder和StringDecoder。

继续看TimeServerHandler

1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelHandlerContext; 4 import io.netty.channel.ChannelInboundHandlerAdapter; 5  6 /** 7  * @author lilinfeng 8  * @version 1.0 9  * @date 2014年2月14日10  */11 public class TimeServerHandler extends ChannelInboundHandlerAdapter {12 13     private int counter;14 15     @Override16     public void channelRead(ChannelHandlerContext ctx, Object msg)17             throws Exception {18         String body = (String) msg;19         System.out.println("The time server receive order : " + body20                 + " ; the counter is : " + ++counter);21         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(22                 System.currentTimeMillis()).toString() : "BAD ORDER";23         currentTime = currentTime + System.getProperty("line.separator");24         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());25         ctx.writeAndFlush(resp);26     }27 28     @Override29     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {30         ctx.close();31     }32 }

看18行,直接获取之后不是ByteBuf,而直接是一个String对象,代码非常简洁。

TimeClient

1 import io.netty.bootstrap.Bootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioSocketChannel; 9 import io.netty.handler.codec.LineBasedFrameDecoder;10 import io.netty.handler.codec.string.StringDecoder;11 12 /**13  * @author lilinfeng14  * @version 1.015  * @date 2014年2月14日16  */17 public class TimeClient {18 19     public void connect(int port, String host) throws Exception {20         // 配置客户端NIO线程组21         EventLoopGroup group = new NioEventLoopGroup();22         try {23             Bootstrap b = new Bootstrap();24             b.group(group).channel(NioSocketChannel.class)25                     .option(ChannelOption.TCP_NODELAY, true)26                     .handler(new ChannelInitializer
() {27 @Override28 public void initChannel(SocketChannel ch)29 throws Exception {30 ch.pipeline().addLast(31 new LineBasedFrameDecoder(1024));32 ch.pipeline().addLast(new StringDecoder());33 ch.pipeline().addLast(new TimeClientHandler());34 }35 });36 37 // 发起异步连接操作38 ChannelFuture f = b.connect(host, port).sync();39 40 // 当代客户端链路关闭41 f.channel().closeFuture().sync();42 } finally {43 // 优雅退出,释放NIO线程组44 group.shutdownGracefully();45 }46 }47 48 /**49 * @param args50 * @throws Exception51 */52 public static void main(String[] args) throws Exception {53 int port = 8080;54 if (args != null && args.length > 0) {55 try {56 port = Integer.valueOf(args[0]);57 } catch (NumberFormatException e) {58 // 采用默认值59 }60 }61 new TimeClient().connect(port, "127.0.0.1");62 }63 }

类似TimeServer增加了2个解码器

再看TimeClientHandler

import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.logging.Logger;/** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */public class TimeClientHandler extends ChannelInboundHandlerAdapter {    private static final Logger logger = Logger            .getLogger(TimeClientHandler.class.getName());    private int counter;    private byte[] req;    /**     * Creates a client-side handler.     */    public TimeClientHandler() {        req = ("QUERY TIME ORDER" + System.getProperty("line.separator"))                .getBytes();    }    @Override    public void channelActive(ChannelHandlerContext ctx) {        ByteBuf message = null;        for (int i = 0; i < 100; i++) {            message = Unpooled.buffer(req.length);            message.writeBytes(req);            ctx.writeAndFlush(message);        }    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg)            throws Exception {        String body = (String) msg;        System.out.println("Now is : " + body + " ; the counter is : "                + ++counter);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        // 释放资源        logger.warning("Unexpected exception from downstream : "                + cause.getMessage());        ctx.close();    }}

直接运行发现完全符合我们需求

4. 分析LineBaseFrameDecoder和StringDecoder

LineBasedFrameDecoder的工作原理非常简单:

(1) 遍历ByteBuf中的可读字节,判断看是否有"\n"或者"\r\n",如果有,就以此为结束位置,从可读索引到结束位置区间的字节就组成了一行

(2) 是一个以换行符为结束标志的解码器,支持携带结束符或者不携带结束符2种方式,同时支持配置单行的最大长度。

(3) 超过单行最大长度直接抛异常

StringDecoder的非常简单:

(1) 将接收的对象转换为字符串

(2) 继续调用后面的Handler

因此:

LineBasedFrameDecoder和StringDecoder组合在一起就是行切换文件解码器。

四、分割符解码器的应用

使用DelimiterBasedFrameDecoder即可...

1. EohoServer

1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.buffer.ByteBuf; 3 import io.netty.buffer.Unpooled; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel;10 import io.netty.channel.socket.nio.NioServerSocketChannel;11 import io.netty.handler.codec.DelimiterBasedFrameDecoder;12 import io.netty.handler.codec.string.StringDecoder;13 import io.netty.handler.logging.LogLevel;14 import io.netty.handler.logging.LoggingHandler;15 16 /**17  * @author lilinfeng18  * @version 1.019  * @date 2014年2月14日20  */21 public class EchoServer {22     public void bind(int port) throws Exception {23         // 配置服务端的NIO线程组24         EventLoopGroup bossGroup = new NioEventLoopGroup();25         EventLoopGroup workerGroup = new NioEventLoopGroup();26         try {27             ServerBootstrap b = new ServerBootstrap();28             b.group(bossGroup, workerGroup)29                     .channel(NioServerSocketChannel.class)30                     .option(ChannelOption.SO_BACKLOG, 100)31                     .handler(new LoggingHandler(LogLevel.INFO))32                     .childHandler(new ChannelInitializer
() {33 @Override34 public void initChannel(SocketChannel ch)35 throws Exception {36 ByteBuf delimiter = Unpooled.copiedBuffer("$_"37 .getBytes());38 ch.pipeline().addLast(39 new DelimiterBasedFrameDecoder(1024,40 delimiter));41 ch.pipeline().addLast(new StringDecoder());42 ch.pipeline().addLast(new EchoServerHandler());43 }44 });45 46 // 绑定端口,同步等待成功47 ChannelFuture f = b.bind(port).sync();48 49 // 等待服务端监听端口关闭50 f.channel().closeFuture().sync();51 } finally {52 // 优雅退出,释放线程池资源53 bossGroup.shutdownGracefully();54 workerGroup.shutdownGracefully();55 }56 }57 58 public static void main(String[] args) throws Exception {59 int port = 8080;60 if (args != null && args.length > 0) {61 try {62 port = Integer.valueOf(args[0]);63 } catch (NumberFormatException e) {64 // 采用默认值65 }66 }67 new EchoServer().bind(port);68 }69 }

(1) 重点在于38行的DelimiterBasedFrameDecoder, 与上面的换行分割符类似,只是可以自定义特殊符号

(2) DelimiterBasedFrameDecoder有2个参数,一个为单行最大长度,一个为自定义符号对象

(3) 如果到达长度仍然没有查询到,就抛出TooLongFrameException异常

2. EchoServerHandler

import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler.Sharable;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */@Sharablepublic class EchoServerHandler extends ChannelInboundHandlerAdapter {    int counter = 0;    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg)            throws Exception {        String body = (String) msg;        System.out.println("This is " + ++counter + " times receive client : ["                + body + "]");        body += "$_";        ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());        ctx.writeAndFlush(echo);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        cause.printStackTrace();        ctx.close();// 发生异常,关闭链路    }}

非常简单直接打印再write即可... 由此也可以看出netty框架比较干净的分离出来了业务逻辑代码。

3. Client端和ClientHandler基本类似

import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import io.netty.handler.codec.string.StringDecoder;/** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */public class EchoClient {    public void connect(int port, String host) throws Exception {        // 配置客户端NIO线程组        EventLoopGroup group = new NioEventLoopGroup();        try {            Bootstrap b = new Bootstrap();            b.group(group).channel(NioSocketChannel.class)                    .option(ChannelOption.TCP_NODELAY, true)                    .handler(new ChannelInitializer
() { @Override public void initChannel(SocketChannel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer("$_" .getBytes()); ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new EchoClientHandler()); } }); // 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); // 当代客户端链路关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new EchoClient().connect(port, "127.0.0.1"); }}
import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */public class EchoClientHandler extends ChannelInboundHandlerAdapter {    private int counter;    static final String ECHO_REQ = "Hi, Lilinfeng. Welcome to Netty.$_";    /**     * Creates a client-side handler.     */    public EchoClientHandler() {    }    @Override    public void channelActive(ChannelHandlerContext ctx) {        // ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.buffer(ECHO_REQ        // .getBytes().length);        // buf.writeBytes(ECHO_REQ.getBytes());        for (int i = 0; i < 10; i++) {            ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));        }    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg)            throws Exception {        System.out.println("This is " + ++counter + " times receive server : ["                + msg + "]");    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        ctx.flush();    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        cause.printStackTrace();        ctx.close();    }}

运行代码,符合预期..

五、定长解码器

1. 开发服务端

非常简单,直接上代码:

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.FixedLengthFrameDecoder;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;/** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */public class EchoServer {    public void bind(int port) throws Exception {        // 配置服务端的NIO线程组        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .option(ChannelOption.SO_BACKLOG, 100)                    .handler(new LoggingHandler(LogLevel.INFO))                    .childHandler(new ChannelInitializer
() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new FixedLengthFrameDecoder(20)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new EchoServerHandler()); } }); // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new EchoServer().bind(port); }}
import io.netty.channel.ChannelHandler.Sharable;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */@Sharablepublic class EchoServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg)            throws Exception {        System.out.println("Receive client : [" + msg + "]");    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        cause.printStackTrace();        ctx.close();// 发生异常,关闭链路    }}

2. 使用telnet进行访问

(1) 我使用的是Xshell,直接命令

(2) telnet 127.0.0.1 8080

(3) 再随便输入字符,发现每20个字符,服务端显示一次,符合预期

 

转载于:https://www.cnblogs.com/carl10086/p/6183030.html

你可能感兴趣的文章
Linux嵌入式GDB调试环境搭建
查看>>
java分析jvm常用指令
查看>>
【Linux】Linux 在线安装yum
查看>>
oracle 管理操作 (转)
查看>>
DEV 等待窗口
查看>>
VS2017发布微服务到docker
查看>>
lombok
查看>>
Dev-FAT-UAT-PRO
查看>>
Android开发学习总结(五)——Android应用目录结构分析(转)
查看>>
[PHP]PHP rpc框架hprose测试
查看>>
Atom 编辑器系列视频课程
查看>>
C#三种定时器
查看>>
范数 L1 L2
查看>>
协同过滤及大数据处理
查看>>
Java8 本地DateTime API
查看>>
jQuery 增加 删除 修改select option
查看>>
[原][osgearth]osgearthviewer读取earth文件,代码解析(earth文件读取的一帧)
查看>>
springboot 常用插件
查看>>
一个基于特征向量的近似网页去重算法——term用SVM人工提取训练,基于term的特征向量,倒排索引查询相似文档,同时利用cos计算相似度...
查看>>
[转]Newtonsoft.Json高级用法
查看>>