# proxy reactor 模型
代理的 reactor 模型,研究该模式的一个大致写法、握手登录认证流程
io.mycat.mycat2.ProxyStarter#start 开始构造的地方
NIOAcceptor acceptor = new NIOAcceptor(new DirectByteBufferPool(proxybean.getBufferPoolPageSize(),
proxybean.getBufferPoolChunkSize(),
proxybean.getBufferPoolPageNumber()));
acceptor.start();
构造里面比较简单。
```java
public NIOAcceptor(BufferPool bufferPool) throws IOException {
super(bufferPool);
this.setName("NIO-Acceptor");
}
@SuppressWarnings("unchecked")
public ProxyReactorThread(BufferPool bufPool) throws IOException {
this.bufPool = bufPool;
// 一个 ProxyReactorThread 线程一个独立的 selector 提高性能
this.selector = Selector.open();
sessionMan = (SessionManager<T>) ProxyRuntime.INSTANCE.getSessionManager();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
主要的选择逻辑
io.mycat.proxy.ProxyReactorThread#run
public void run() {
long ioTimes = 0;
// 构造了一个 当前处理的环境,里面只有两个属性
// counter 计数器和 curSession 当前 session
ReactorEnv reactorEnv = new ReactorEnv();
while (true) {
try {
// 选择了超时的方法,默认是 100 毫秒
selector.select(SELECTOR_TIMEOUT);
final Set<SelectionKey> keys = selector.selectedKeys();
// logger.info("handler keys ,total " + selected);
// 当没有事件的时候处理 pendingJobs ,
// 这里高效的利用了 acceptor 线程的空闲时间
if (keys.isEmpty()) {
if (!pendingJobs.isEmpty()) {
ioTimes = 0;
this.processNIOJob();
}
continue;
}
// 当 acceptor 繁忙的时候,处理 5 次则必须处理一次 pendingJobs
// 防止 acceptor 一直繁忙,pendingJobs 被饿死的情况
else if ((ioTimes > 5) & !pendingJobs.isEmpty()) {
ioTimes = 0;
this.processNIOJob();
}
ioTimes++;
for (final SelectionKey key : keys) {
// 判定了 链接、读、写 事件,委托不同的方法进行处理
try {
int readdyOps = key.readyOps();
reactorEnv.curSession = null;
// 如果当前收到连接请求
if ((readdyOps & SelectionKey.OP_ACCEPT) != 0) {
processAcceptKey(reactorEnv, key);
}
// 如果当前连接事件
else if ((readdyOps & SelectionKey.OP_CONNECT) != 0) {
this.processConnectKey(reactorEnv, key);
} else if ((readdyOps & SelectionKey.OP_READ) != 0) {
this.processReadKey(reactorEnv, key);
} else if ((readdyOps & SelectionKey.OP_WRITE) != 0) {
this.processWriteKey(reactorEnv, key);
}
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn("Socket IO err :", e);
}
// 当异常的时候,取消掉
// 该 api 会将当期的 key 添加到 待取消的 集合中
// 下一次 选择的时候 select() 会进行清理,对 channel 与 key 进行解绑
key.cancel();
// 且会把当前的 session 进行关闭
// 在关闭的时候还做了大量的操作
if (reactorEnv.curSession != null) {
reactorEnv.curSession.close(false, "Socket IO err:" + e);
this.allSessions.remove(reactorEnv.curSession);
reactorEnv.curSession = null;
}
}
}
keys.clear();
} catch (IOException e) {
logger.warn("caugh error ", e);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
acceptor 的主要选择逻辑是比较简单的,是很经典的一个选择处理; 这里有两个值的学习的地方:
使用了超时的 select 方法
这将会在编程的时减少大量的 wakeup() 操作。降低了编程难度,提高了性能,缺点是造成了 cpu 空转,但是第二个有点就弥补了这个缺点
当选择不繁忙的时候主动进行其他的处理
处理连接的线程,肯定是需要高性能的,所以可以断定 这个其他的处理必然不会很耗时。
主要逻辑完成了,但是这个 acceptor 还不能正常接收连接的,服务端的端口还没有。
# ServerSocketChannel 的初始化
io.mycat.mycat2.ProxyStarter#start
与 构造 acceptor 同一个方法里面,默认是非集群。也是当前所阅读的源码
if (clusterBean.isEnable()) {
// 启动集群
startCluster(runtime, clusterBean, acceptor);
} else {
// 未配置集群,直接启动
startProxy(true);
}
2
3
4
5
6
7
8
/**
* 启动代理
* @param isLeader true 主节点,false 从节点
* @throws IOException
*/
public void startProxy(boolean isLeader) throws IOException {
ProxyRuntime runtime = ProxyRuntime.INSTANCE;
MycatConfig conf = runtime.getConfig();
NIOAcceptor acceptor = runtime.getAcceptor();
ProxyConfig proxyConfig = conf.getConfig(ConfigEnum.PROXY);
ProxyBean proxyBean = proxyConfig.getProxy();
// 主要是这一句 是对 ServerSocketChannel 的初始化
if (acceptor.startServerChannel(proxyBean.getIp(), proxyBean.getPort(), ServerType.MYCAT)){
.....
});
}
....
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
io.mycat.proxy.NIOAcceptor#startServerChannel
public boolean startServerChannel(String ip, int port, ServerType serverType) throws IOException {
final ServerSocketChannel serverChannel = getServerSocketChannel(serverType);
if (serverChannel != null && serverChannel.isOpen()) {
return false;
}
....
// 非集群下的初始化
openServerChannel(selector, ip, port, serverType);
return true;
}
private void openServerChannel(Selector selector, String bindIp, int bindPort, ServerType serverType)
throws IOException {
// 经典式的模板代码
final ServerSocketChannel serverChannel = ServerSocketChannel.open();
final InetSocketAddress isa = new InetSocketAddress(bindIp, bindPort);
serverChannel.bind(isa);
serverChannel.configureBlocking(false);
serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
// 这里注册时候需要注意下,注册了一个 attachment
// 一般的写法是在 连接到此 serverChannel 的 key 进行设置 attachment
// 这里是在注册 serverChannel 的时候设置了一个
// 它的作用是:所有连接到此 serverChannel 的 channel 绑定的 key 中的 attachment 的初始值是 serverChannel 的值
serverChannel.register(selector, SelectionKey.OP_ACCEPT, serverType);
if (serverType == ServerType.CLUSTER) {
} else if (serverType == ServerType.LOAD_BALANCER) {
} else {
logger.info("open proxy server port on {}:{}", bindIp, bindPort);
proxyServerSocketChannel = serverChannel;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
到此为止 acceptor 的工作能正常接受连接了
# 连接处理
io.mycat.proxy.NIOAcceptor#processAcceptKey
protected void processAcceptKey(ReactorEnv reactorEnv, SelectionKey curKey) throws IOException {
ServerSocketChannel serverSocket = (ServerSocketChannel) curKey.channel();
// 接收通道,设置为非阻塞模式
final SocketChannel socketChannel = serverSocket.accept();
socketChannel.configureBlocking(false);
logger.info("new Client connected: " + socketChannel);
ServerType serverType = (ServerType) curKey.attachment();
ProxyRuntime proxyRuntime = ProxyRuntime.INSTANCE;
// 获取附着的标识,即得到当前是否为集群通信端口
if (serverType == ServerType.CLUSTER) {
....
} else if (serverType == ServerType.LOAD_BALANCER &&
proxyRuntime.getMyCLuster() != null &&
proxyRuntime.getMyCLuster().getClusterState() == ClusterState.Clustered) {
....
} else {
// 最后委托了该方法进行处理
accept(reactorEnv,socketChannel,serverType);
}
}
private void accept(ReactorEnv reactorEnv,SocketChannel socketChannel,ServerType serverType) throws IOException {
// 找到一个可用的NIO Reactor Thread,交付托管
ProxyReactorThread<?> nioReactor = getProxyReactor(reactorEnv);
// 将通道注册到reactor对象上
// ProxyReactorThread 实际上是一个 io.mycat.proxy.MycatReactorThread
// 这里实际上已经委托了,这里只是把 这个通道添加到了 reactor的队列中了
nioReactor.acceptNewSocketChannel(serverType, socketChannel);
}
// 从池中获取可用 reactor 线程,做了一个简单的取模,相当于均衡负载
private ProxyReactorThread<?> getProxyReactor(ReactorEnv reactorEnv){
if (reactorEnv.counter++ == Integer.MAX_VALUE) {
reactorEnv.counter = 1;
}
int index = reactorEnv.counter % ProxyRuntime.INSTANCE.getNioReactorThreads();
// 获取一个reactor对象
return ProxyRuntime.INSTANCE.getReactorThreads()[index];
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# reactor 接受并触发 session 绑定
io.mycat.proxy.ProxyReactorThread#acceptNewSocketChannel
public void acceptNewSocketChannel(Object keyAttachement, final SocketChannel socketChannel) {
// 只是把创建session的逻辑放到了队列里面
// 可见这个是一个稍微耗时的方法, acceptor 线程当前阶段的工作就完成了,可以返回处理其他请求了,很高效
pendingJobs.offer(() -> {
try {
T session = sessionMan.createSession(keyAttachement, this.bufPool, selector, socketChannel, true);
allSessions.add(session);
} catch (Exception e) {
e.printStackTrace();
logger.warn("regist new connection err " + e);
}
});
}
2
3
4
5
6
7
8
9
10
11
12
13
那么这里放入队列之后,什么时候处理呢?在 nio 开发中,前面也看到了,利用空闲时间做一点事情,那么这里也一样
io.mycat.proxy.ProxyReactorThread#run
while (true) {
try {
selector.select(SELECTOR_TIMEOUT);
final Set<SelectionKey> keys = selector.selectedKeys();
// logger.info("handler keys ,total " + selected);
if (keys.isEmpty()) {
// 这里很快就能处理刚请求连接到 mycat 的连接
if (!pendingJobs.isEmpty()) {
ioTimes = 0;
this.processNIOJob();
}
continue;
// 从队列中取出,注意的是这里的取出后的运行逻辑
private void processNIOJob() {
Runnable nioJob = null;
while ((nioJob = pendingJobs.poll()) != null) {
try {
// 直接用的 run 而不是 start。同步执行之前用 拉姆达表达式放入队列中的逻辑
nioJob.run();
} catch (Exception e) {
logger.warn("run nio job err ", e);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
重点逻辑来了;session 的绑定
# session 绑定
io.mycat.mycat2.MycatSessionManager#createSession
@Override
public MycatSession createSession(Object keyAttachment, BufferPool bufPool, Selector nioSelector,
SocketChannel frontChannel, boolean isAcceptCon) throws IOException {
if (logger.isInfoEnabled()) {
logger.info("MySQL client connected ." + frontChannel);
}
MycatSession session = new MycatSession(bufPool, nioSelector, frontChannel);
// 第一个IO处理器为Client Authorware
// session 构造出来的时候,给定了一个 io.mycat.mycat2.net.MySQLClientAuthHandler
session.setCurNIOHandler(MySQLClientAuthHandler.INSTANCE);
// 默认为透传命令模式
//session.curSQLCommand = DirectPassthrouhCmd.INSTANCE;
// 向MySQL Client发送认证报文(握手包)
session.sendAuthPackge();
session.setSessionManager(this);
allSessions.add(session);
return session;
}
在构造 MycatSession 的时候,把 channel 与 selecter 相关联的
public AbstractSession(BufferPool bufferPool, Selector selector, SocketChannel channel, int socketOpt)
throws IOException {
this.bufPool = bufferPool;
this.nioSelector = selector;
this.channel = channel;
InetSocketAddress clientAddr = (InetSocketAddress) channel.getRemoteAddress();
this.addr = clientAddr.getHostString() + ":" + clientAddr.getPort();
this.host = clientAddr.getHostString();
SelectionKey socketKey = channel.register(nioSelector, socketOpt, this);
this.channelKey = socketKey;
this.proxyBuffer = new ProxyBuffer(this.bufPool.allocate());
this.sessionId = ProxyRuntime.INSTANCE.genSessionId();
this.startTime = System.currentTimeMillis();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 发送认证包(握手包)
io.mycat.mycat2.MycatSession#sendAuthPackge
/**
* 给客户端(front)发送认证报文
*
* @throws IOException
*/
public void sendAuthPackge() throws IOException {
// 生成认证数据
byte[] rand1 = RandomUtil.randomBytes(8);
byte[] rand2 = RandomUtil.randomBytes(12);
// 保存认证数据
byte[] seed = new byte[rand1.length + rand2.length];
System.arraycopy(rand1, 0, seed, 0, rand1.length);
System.arraycopy(rand2, 0, seed, rand1.length, rand2.length);
this.seed = seed;
// 发送握手数据包
HandshakePacket hs = new HandshakePacket();
hs.packetId = 0;
hs.protocolVersion = Version.PROTOCOL_VERSION;
hs.serverVersion = Version.SERVER_VERSION;
hs.threadId = this.getSessionId();
hs.seed = rand1;
hs.serverCapabilities = getServerCapabilities();
// hs.serverCharsetIndex = (byte) (charsetIndex & 0xff);
hs.serverStatus = 2;
hs.restOfScrambleBuff = rand2;
hs.write(proxyBuffer);
// 设置frontBuffer 为读取状态
proxyBuffer.flip();
proxyBuffer.readIndex = proxyBuffer.writeIndex;
// 构造握手包后,发送到 channel
this.writeToChannel();
}
io.mycat.proxy.AbstractSession#writeToChannel
/**
* 从内部Buffer数据写入到SocketChannel中发送出去,readState里记录了写到Socket中的数据指针位置 方法,
*/
public void writeToChannel() throws IOException {
checkBufferOwner(true);
ByteBuffer buffer = proxyBuffer.getBuffer();
buffer.limit(proxyBuffer.readIndex);
buffer.position(proxyBuffer.readMark);
// 往通道写入,完成 发送握手数据包 阶段
int writed = channel.write(buffer);
proxyBuffer.readMark += writed; // 记录本次磁轭如到 Channel 中的数据
if (!buffer.hasRemaining()) {
if (proxyBuffer.readMark != proxyBuffer.readIndex) {
logger.error("writeToChannel has finished but readIndex != readLimit, please fix it !!!");
}
if (proxyBuffer.readIndex > buffer.capacity() * 2 / 3) {
proxyBuffer.compact();
} else {
buffer.limit(buffer.capacity());
}
} else {
/**
* 1. writed==0 或者 buffer 中数据没有写完时,注册可写事件 通常发生在网络阻塞或者 客户端
* COM_STMT_FETCH 命令可能会 出现没有写完或者 writed == 0 的情况
*/
logger.debug("register OP_WRITE selectkey .write {} bytes. current channel is {}", writed, channel);
}
checkWriteFinished();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# 接收认证数据包入口
发送完握手数据包,客户端会解析,并构造认证包(登录)到 mycat;而 mycat 所做的则是 读取并解析认证包,如果用户密码等策略能通过,则完成登录
接收到数据的入口还是在 reactor 中;
io.mycat.proxy.ProxyReactorThread#processReadKey
protected void processReadKey(ReactorEnv reactorEnv, SelectionKey curKey) throws IOException {
// only from cluster server socket
// 拿到之前绑定的 session
T session = (T) curKey.attachment();
reactorEnv.curSession = session;
// 在 session 构造的时候放入的是一个 MySQLClientAuthHandler
session.getCurNIOHandler().onSocketRead(session);
}
2
3
4
5
6
7
8
io.mycat.mycat2.net.MySQLClientAuthHandler#onSocketRead
# 读取解析认证数据包
@Override
public void onSocketRead(MycatSession session) throws IOException {
ProxyBuffer frontBuffer = session.getProxyBuffer();
// resolveMySQLPackage 包大概的功能是对兑取到的数据进行是否是半包的判定
// 判定的原理大致是:mysql 协议前面几个字节就能得到该包的长度,
// 通过包长度和当前 buffer 读取到的数据长度对比就能得到是否是完整的包
// 注意:只是判定这些,而不是真正的解析成 对象;
// 具体的解析是由其他需要完整解析的时候再解析出来,
// 比如下面的代码
// CurrPacketType.Full 标识读取的数据包是完整的
if (session.readFromChannel() == false
|| CurrPacketType.Full != session.resolveMySQLPackage(frontBuffer, session.curMSQLPackgInf, false)) {
return;
}
// 从这里开始处理用户登录相关的逻辑
// 处理用户认证报文
try {
AuthPacket auth = new AuthPacket();
auth.read(frontBuffer);
MycatConfig config = ProxyRuntime.INSTANCE.getConfig();
UserConfig userConfig = config.getConfig(ConfigEnum.USER);
UserBean userBean = null;
for (UserBean user : userConfig.getUsers()) {
if (user.getName().equals(auth.user)) {
userBean = user;
break;
}
}
// check user
if (!checkUser(session, userConfig, userBean)) {
failure(session, ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "' with addr '" + session.addr + "'");
return;
}
// check password
if (!checkPassword(session, userBean, auth.password)) {
failure(session, ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "', because password is error ");
return;
}
// check mycatSchema
switch (checkSchema(userBean, auth.database)) {
case ErrorCode.ER_BAD_DB_ERROR:
failure(session, ErrorCode.ER_BAD_DB_ERROR, "Unknown database '" + auth.database + "'");
break;
case ErrorCode.ER_DBACCESS_DENIED_ERROR:
String s = "Access denied for user '" + auth.user + "' to database '" + auth.database + "'";
failure(session, ErrorCode.ER_DBACCESS_DENIED_ERROR, s);
break;
default:
// set mycatSchema
if (auth.database == null) {
session.mycatSchema = (userBean.getSchemas() == null) ?
config.getDefaultSchemaBean() : config.getSchemaBean(userBean.getSchemas().get(0));
} else {
session.mycatSchema = config.getSchemaBean(auth.database);
}
if (Objects.isNull(session.mycatSchema)) {
logger.error(" mycatSchema:{} can not match user: {}", session.mycatSchema, auth.user);
}
logger.debug("set mycatSchema: {} for user: {}", session.mycatSchema, auth.user);
if (success(session, auth)) {
session.clientUser=auth.user;//设置session用户
session.proxyBuffer.reset();
session.answerFront(AUTH_OK);
// 认证通过,设置当前SQL Handler为默认Handler
// 在这里切换了 session 的处理器。只要登录认证通过之后,就由该处理器来
// 来进行处理后续的数据交互等。也就是可以进行发送查询语句了
session.setCurNIOHandler(DefaultMycatSessionHandler.INSTANCE);
}
}
} catch (Throwable e) {
logger.warn("Frontend FrontendAuthenticatingState error:", e);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# 总结
到这里 mycat 怎么初始化 reactor 模式的,怎么接收 mycat 客户端的登录认证的流程已经 ok 了;
主要的几个概念主角类:
- NIOAcceptor :处理客户端连接
- MycatSession :绑定客户端的 channel 和保存当前会话的一些状态等
- MycatReactorThread :处理读写数据的调度
- MySQLClientAuthHandler :处理握手认证功能
大致步骤:
- 客户端请求连接到 mycat
- NIOAcceptor 进行初始化操作构造绑定 session
- 委托 MySQLClientAuthHandler 处理登录认证
- MySQLClientAuthHandler 发送握手包
- 客户端接到 mycat 的握手包,进行响应发送 验证包(登录)
- MySQLClientAuthHandler 读取认证包解析并验证用户名密码等相关权限是否ok
- 通过之后更换处理器为 DefaultMycatSessionHandler。登录认证完成,可以进行命令的交互了