4.1.2.1. 入门应用
还是之前的TimeSrver
使用Netty来实现
4.1.2.1. 服务端
/**
* @author zhuqiang
* @version 1.0.1 2017/2/16 16:02
* @date 2017/2/16 16:02
* @since 1.0
*/
public class TimeSrver {
private Logger log = LoggerFactory.getLogger(getClass());
public static void main(String[] args) throws InterruptedException {
int port = 8086;
new TimeSrver().bind(port);
}
public void bind(int port) throws InterruptedException {
// 配置服务端的NIO线程组
// 包含一组NIO线程,专门用于网络事件的处理
// 实际上他们就是Reactor线程组。
//用于接收客户端的链接
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
// 用于SocketChannel的网络读写
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
// 引导配置
ServerBootstrap starp = new ServerBootstrap();
try {
starp.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 指定通道类型
.option(ChannelOption.SO_BACKLOG, 1024) // 缓存大小?
.childHandler(new ChildChannelHandler()); // 绑定处理器
// 绑定端口,同步等待成功。 ChannelFuture: 类似Jdk.Future, 用于异步操作的通知回调
ChannelFuture channelFuture = starp.bind(port).sync();
// 等待服务器监听端口关闭。该方法会阻塞,链路关闭后,会被唤醒
channelFuture.channel().closeFuture().sync();
} finally {
//优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHanler());
}
}
private class TimeServerHanler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
// 返回缓冲区可读字节数并创建容器
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req); //读取可读字节数的内容
String body = new String(req, "UTF-8");
log.info("== 请求消息:{}", body);
ByteBuf resp = Unpooled.copiedBuffer(new Date().toString().getBytes());
ctx.write(resp); // 异步发送应答
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 将消息发送队列中的消息写入到 SocketChannel中发送给对方
// 性能考虑,放置频繁唤醒Selector进行消息发送。
// Netty的方法并不直接将消息写入 SocketChannel中
// 调用write只是把消息放到了发送缓冲数组中。
// 通过flush方法将缓冲区中的消息全部写入到SocketChannel中
ctx.flush();
// 但是通过实际测试,在请求先进来的时候,会先执行该方法是什么原因呢?
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 发生异常,释放相关句柄资源
ctx.close();
cause.printStackTrace();
}
}
}
4.1.2.1. 客户端
/**
* @author zhuqiang
* @version 1.0.1 2017/2/16 16:53
* @date 2017/2/16 16:53
* @since 1.0
*/
public class TimeClient {
public static void main(String[] args) throws InterruptedException {
int port = 8086;
new TimeClient().connect("127.0.0.1", port);
}
public void connect(String host, int port) throws InterruptedException {
// 配置客户端NIO线程组
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 通道建立后,绑定我们的处理类
ch.pipeline().addLast(new TimeClientHandler());
}
});
// 发起异步链接操作
ChannelFuture future = bootstrap.connect(host, port).sync();
// 同步阻塞,链路关闭才被唤醒
future.channel().closeFuture().sync();
} finally {
//优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
private class TimeClientHandler extends ChannelHandlerAdapter {
private Logger log = LoggerFactory.getLogger(getClass());
private final ByteBuf firstMessage;
public TimeClientHandler() {
byte[] req = {};
try {
req = (Thread.currentThread().getName() + " 发起请求").getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
}
/**
* 客户端和服务端TCP链路建立成功之后,该方法被调用
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(firstMessage);
}
/**
* 服务端返回应答消息时,该方法被调用
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
log.info("== 接收到消息:{}", body);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
}
这里的示例没有解决读半包的问题。后面会利用Netty体on个的默认编解码功能解决该问题。
4.1.2.1. 打包和部署
使用netty开发的都不是web应用。直接打成jar包部署。