4.1.2.3. 分隔符和定长解码器

TCP以流的方式进行数据传输,上层的应用协议为了对消息进行区分,往往采用如下4种方式。

  1. 消息长度固定,累加读取到长度总和为定长 Leng 的报文后,就认为读取到了一个完整的消息:将计数器重置,重新开心读取下一个数据报。
  2. 将会车换行符作为消息的结束标准,例如FTP协议,这种方式在文本协议中应用比较广泛
  3. 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
  4. 通过在消息头中定义长度字段来标识消息的总长度

Netty对上面4种应用做了统一的抽象。提供4中解码器来解决对应的问题。

4.1.2.3.1. DelimiterBasedFrameDecoder

自动完成分隔符做结束标准的消息解码

public class EchoServer {
    Logger log = LoggerFactory.getLogger(getClass());

    public static void main(String[] args) throws InterruptedException {
        new EchoServer().bind(8086);
    }

    public void bind(int port) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap starp = new ServerBootstrap();
            starp.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 定义分隔符
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            // 超过1024还没有找到分隔符就抛出异常
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            ChannelFuture channelFuture = starp.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public class EchoServerHandler extends ChannelHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String) msg;
            log.info("=====: " + body);
            // 再将原始消息返回客户端
            // 由于使用了解码器,所以拿到消息的时候已经没有分隔符了
            ByteBuf echo = Unpooled.copiedBuffer((body + "$_").getBytes());
            ctx.writeAndFlush(echo);
        }

        // 发生异常关闭
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

public class EchoClient {
    Logger log = LoggerFactory.getLogger(getClass());

    public static void main(String[] args) throws InterruptedException {
        new EchoClient().connect("127.0.0.1", 8086);
    }

    public void connect(String host, int port) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            // 发起异步链接操作
            ChannelFuture future = bootstrap.connect(host, port).sync();
            // 同步阻塞,链路关闭才被唤醒
            future.channel().closeFuture().sync();

        } finally {
            //优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }
    }

    public class EchoClientHandler extends ChannelHandlerAdapter {
        // 通道激活之后
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            for (int i = 0; i < 100; i++) {
                ctx.writeAndFlush(Unpooled.copiedBuffer(("这是我的请求:" + i + "$_").getBytes()));
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//            ctx.flush();
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            log.info("==== : " + msg);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

通过这一个骨架其实就能看出来了。服务端和客户端都是先配置 , 和给定一个业务处理器。 就搞定了

4.1.2.3.2. FixedLengthFrameDecoder

自动完成对定长消息的解码: 无论输入什么,服务端都将按固定长度分割。

public class EchoServer {
    Logger log = LoggerFactory.getLogger(getClass());

    public static void main(String[] args) throws InterruptedException {
        new EchoServer().bind(8086);
    }

    public void bind(int port) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap starp = new ServerBootstrap();
            starp.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(5));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            ChannelFuture channelFuture = starp.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public class EchoServerHandler extends ChannelHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            log.info("===== : " + msg);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

使用Windows自带的telnet 来测试该功能

4.1.2.3.3. telnet 测试定长应用

在cmd中运行:telnet 127.0.0.1 8086

如果提示(在windows7中)'telnet' 不是内部或外部命令,也不是可运行的程序

解决方案:
控制面板\所有控制面板项\程序和功能\打开或关闭Windows功能 -> 勾选Telent客户端

链接之后,输入123456,你会发现,后台打印了12345,6并没有被打印出来。可见已经按照定长解码器的设置进行工作了。(我们程序中的定长解码器设置成了5,所以是按照5的长度进行分割)

注:可通过se localecho 命令打开本地回显,在输入的时候能看到自己输入的是什么

© All Rights Reserved            updated 2017-02-27 17:18:00

results matching ""

    No results matching ""