4.1.2.1. 入门应用

还是之前的TimeSrver使用Netty来实现

4.1.2.1. 服务端

/**
 * @author zhuqiang
 * @version 1.0.1 2017/2/16 16:02
 * @date 2017/2/16 16:02
 * @since 1.0
 */
public class TimeSrver {
    private Logger log = LoggerFactory.getLogger(getClass());

    public static void main(String[] args) throws InterruptedException {
        int port = 8086;
        new TimeSrver().bind(port);
    }

    public void bind(int port) throws InterruptedException {
        // 配置服务端的NIO线程组
        // 包含一组NIO线程,专门用于网络事件的处理
        // 实际上他们就是Reactor线程组。

        //用于接收客户端的链接
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        // 用于SocketChannel的网络读写
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        // 引导配置
        ServerBootstrap starp = new ServerBootstrap();
        try {
            starp.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // 指定通道类型
                    .option(ChannelOption.SO_BACKLOG, 1024) // 缓存大小?
                    .childHandler(new ChildChannelHandler()); // 绑定处理器
            // 绑定端口,同步等待成功。 ChannelFuture: 类似Jdk.Future, 用于异步操作的通知回调
            ChannelFuture channelFuture = starp.bind(port).sync();
            // 等待服务器监听端口关闭。该方法会阻塞,链路关闭后,会被唤醒
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new TimeServerHanler());
        }
    }


    private class TimeServerHanler extends ChannelHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            // 返回缓冲区可读字节数并创建容器
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req); //读取可读字节数的内容
            String body = new String(req, "UTF-8");
            log.info("== 请求消息:{}", body);

            ByteBuf resp = Unpooled.copiedBuffer(new Date().toString().getBytes());
            ctx.write(resp); // 异步发送应答
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // 将消息发送队列中的消息写入到 SocketChannel中发送给对方
            // 性能考虑,放置频繁唤醒Selector进行消息发送。
            // Netty的方法并不直接将消息写入 SocketChannel中
            // 调用write只是把消息放到了发送缓冲数组中。
            // 通过flush方法将缓冲区中的消息全部写入到SocketChannel中
            ctx.flush();
            // 但是通过实际测试,在请求先进来的时候,会先执行该方法是什么原因呢?
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // 发生异常,释放相关句柄资源
            ctx.close();
            cause.printStackTrace();
        }
    }
}

4.1.2.1. 客户端

/**
 * @author zhuqiang
 * @version 1.0.1 2017/2/16 16:53
 * @date 2017/2/16 16:53
 * @since 1.0
 */
public class TimeClient {

    public static void main(String[] args) throws InterruptedException {
        int port = 8086;
        new TimeClient().connect("127.0.0.1", port);
    }

    public void connect(String host, int port) throws InterruptedException {
        // 配置客户端NIO线程组
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {

            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 通道建立后,绑定我们的处理类
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            // 发起异步链接操作
            ChannelFuture future = bootstrap.connect(host, port).sync();
            // 同步阻塞,链路关闭才被唤醒
            future.channel().closeFuture().sync();

        } finally {
            //优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }
    }

    private class TimeClientHandler extends ChannelHandlerAdapter {
        private Logger log = LoggerFactory.getLogger(getClass());

        private final ByteBuf firstMessage;

        public TimeClientHandler() {
            byte[] req = {};
            try {
                req = (Thread.currentThread().getName() + " 发起请求").getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            firstMessage = Unpooled.buffer(req.length);
            firstMessage.writeBytes(req);
        }

        /**
         * 客户端和服务端TCP链路建立成功之后,该方法被调用
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(firstMessage);
        }

        /**
         * 服务端返回应答消息时,该方法被调用
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            log.info("== 接收到消息:{}", body);
            ctx.close();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

这里的示例没有解决读半包的问题。后面会利用Netty体on个的默认编解码功能解决该问题。

4.1.2.1. 打包和部署

使用netty开发的都不是web应用。直接打成jar包部署。

© All Rights Reserved            updated 2017-02-16 17:27:21

results matching ""

    No results matching ""