4.1.2.3. 分隔符和定长解码器
TCP
以流的方式进行数据传输,上层的应用协议为了对消息进行区分,往往采用如下4种方式。
- 消息长度固定,累加读取到长度总和为定长 Leng 的报文后,就认为读取到了一个完整的消息:将计数器重置,重新开心读取下一个数据报。
- 将会车换行符作为消息的结束标准,例如FTP协议,这种方式在文本协议中应用比较广泛
- 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
- 通过在消息头中定义长度字段来标识消息的总长度
Netty对上面4种应用做了统一的抽象。提供4中解码器来解决对应的问题。
4.1.2.3.1. DelimiterBasedFrameDecoder
自动完成分隔符做结束标准的消息解码
public class EchoServer {
Logger log = LoggerFactory.getLogger(getClass());
public static void main(String[] args) throws InterruptedException {
new EchoServer().bind(8086);
}
public void bind(int port) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap starp = new ServerBootstrap();
starp.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 定义分隔符
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
// 超过1024还没有找到分隔符就抛出异常
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture channelFuture = starp.bind(port).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public class EchoServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
log.info("=====: " + body);
// 再将原始消息返回客户端
// 由于使用了解码器,所以拿到消息的时候已经没有分隔符了
ByteBuf echo = Unpooled.copiedBuffer((body + "$_").getBytes());
ctx.writeAndFlush(echo);
}
// 发生异常关闭
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
}
public class EchoClient {
Logger log = LoggerFactory.getLogger(getClass());
public static void main(String[] args) throws InterruptedException {
new EchoClient().connect("127.0.0.1", 8086);
}
public void connect(String host, int port) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoClientHandler());
}
});
// 发起异步链接操作
ChannelFuture future = bootstrap.connect(host, port).sync();
// 同步阻塞,链路关闭才被唤醒
future.channel().closeFuture().sync();
} finally {
//优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
public class EchoClientHandler extends ChannelHandlerAdapter {
// 通道激活之后
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 100; i++) {
ctx.writeAndFlush(Unpooled.copiedBuffer(("这是我的请求:" + i + "$_").getBytes()));
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("==== : " + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
}
通过这一个骨架其实就能看出来了。服务端和客户端都是先配置 , 和给定一个业务处理器。 就搞定了
4.1.2.3.2. FixedLengthFrameDecoder
自动完成对定长消息的解码: 无论输入什么,服务端都将按固定长度分割。
public class EchoServer {
Logger log = LoggerFactory.getLogger(getClass());
public static void main(String[] args) throws InterruptedException {
new EchoServer().bind(8086);
}
public void bind(int port) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap starp = new ServerBootstrap();
starp.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(5));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture channelFuture = starp.bind(port).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public class EchoServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("===== : " + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
}
使用Windows自带的telnet 来测试该功能
4.1.2.3.3. telnet 测试定长应用
在cmd中运行:telnet 127.0.0.1 8086
如果提示(在windows7中):
'telnet' 不是内部或外部命令,也不是可运行的程序
解决方案:
控制面板\所有控制面板项\程序和功能\打开或关闭Windows功能 -> 勾选Telent客户端
链接之后,输入123456,你会发现,后台打印了12345,6并没有被打印出来。可见已经按照定长解码器的设置进行工作了。(我们程序中的定长解码器设置成了5,所以是按照5的长度进行分割)
注:可通过se localecho
命令打开本地回显,在输入的时候能看到自己输入的是什么