# mysql select 流程 - 响应
上一张讲解到:
- 语句执行 :io.mycat.backend.mysql.nio.MySQLConnection#execute
- 底层 NIO 写操作:io.mycat.net.NIOSocketWR#doNextWriteCheck
一个 MySQLConnection 是有一个 NIOSocketWR 的(负责读写),是在父类中声明的。
那么入口就很有可能会在这里,而真正的源头大家都知道,是 Reactor 模型中的 NIOReactor.
目前大概涉及了这些主要类
接着继续
# NIOSocketWR.asynRead
debug 打点的地方有两处(因为一次 sql 发送,navicat 会发送多条语句),
这样就能清楚的找到下一次确实是查询 sql 的响应包
io.mycat.net.FrontendConnection#query(byte[])
io.mycat.net.NIOSocketWR#asynRead
1
2
3
4
5
2
3
4
5
io.mycat.net.AbstractConnection#handle
@Override
public void handle(byte[] data) {
if (isSupportCompress()) {
List<byte[]> packs = CompressUtil.decompressMysqlPacket(data, decompressUnfinishedDataQueue);
for (byte[] pack : packs) {
if (pack.length != 0) {
handler.handle(pack);
}
}
} else {
// 这里的实例是 MySQLConnectionHandler
handler.handle(data);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
io.mycat.backend.mysql.nio.MySQLConnectionHandler#handle
@Override
public void handle(byte[] data) {
// 该方法是父类的 BackendAsyncHandler
offerData(data, source.getProcessor().getExecutor());
}
public abstract class BackendAsyncHandler implements NIOHandler {
protected void offerData(byte[] data, Executor executor) {
handleData(data);
}
protected abstract void offerDataError();
// 这里 BackendAsyncHandler 的实例还是 MySQLConnectionHandler
protected abstract void handleData(byte[] data);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 包解析
这里的包解析,需要先了解协议,这里简约的了解下,因为对于响应来说,最后形成一个结果, 在 handleData 中是有状态的。
该知识点可以参考 demo (opens new window)
响应包的结构大致如下:
- 第一个包:列个数;从sql中select xxx,xxx from 中的列个数
- 第二部分:列定义,每个列定义为一个包
- 第三部分:列定义后面跟随一个eof包
- 第四部分:返回值,一行一个包;行内字段顺序和前面的顺序一致
- 第五部分:结果后面跟随一个eof包
TIP
这里需要更改下策略:不要使用 navicat 来,因为在 debug 响应结果的时候,很麻烦。 nio 里面就算一个线程也能处理多个 socket 的交互,有一些 show status 的命令老是乱入。
这里更换为:mysql 自带的命令行来查询:
$ mysql -uroot -p123456 -P8066
$ use TESTDB
$ select * from employee;
io.mycat.backend.mysql.nio.MySQLConnectionHandler#handleData
private volatile List<byte[]> fields;
@Override
protected void handleData(byte[] data) {
switch (resultStatus) {
// 第一阶段
case RESULT_STATUS_INIT:
switch (data[4]) {
case OkPacket.FIELD_COUNT:
handleOkPacket(data);
break;
case ErrorPacket.FIELD_COUNT:
handleErrorPacket(data);
break;
case RequestFilePacket.FIELD_COUNT:
handleRequestPacket(data);
break;
default:
// 获取列个数
resultStatus = RESULT_STATUS_HEADER;
header = data;
// 并初始化列数组
fields = new ArrayList<byte[]>((int) ByteUtil.readLength(data,
4));
}
break;
// 第二阶段,获取列定义数据包
case RESULT_STATUS_HEADER:
switch (data[4]) {
case ErrorPacket.FIELD_COUNT:
resultStatus = RESULT_STATUS_INIT;
handleErrorPacket(data);
break;
case EOFPacket.FIELD_COUNT:
// 根据协议,列定义后面有一个 eof 包
resultStatus = RESULT_STATUS_FIELD_EOF;
handleFieldEofPacket(data);
break;
default:
// 会走三次,因为这次测试的表中只有三个字段
fields.add(data);
}
break;
// 第三阶段,行数据包解析
case RESULT_STATUS_FIELD_EOF:
switch (data[4]) {
case ErrorPacket.FIELD_COUNT:
resultStatus = RESULT_STATUS_INIT;
handleErrorPacket(data);
break;
case EOFPacket.FIELD_COUNT:
resultStatus = RESULT_STATUS_INIT;
handleRowEofPacket(data);
break;
default:
handleRowPacket(data);
}
break;
default:
throw new RuntimeException("unknown status!");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
数据包结束处理
io.mycat.backend.mysql.nio.MySQLConnectionHandler#handleFieldEofPacket
/**
* 字段数据包结束处理
*/
private void handleFieldEofPacket(byte[] data) {
ResponseHandler respHand = responseHandler;
if (respHand != null) {
respHand.fieldEofResponse(header, fields, data, source);
} else {
closeNoHandler();
}
}
这里的实例是
io.mycat.backend.mysql.nio.handler.MultiNodeQueryHandler#fieldEofResponse
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
具体包解析的时候,看上去非常的复杂,中间涉及到,结果集合并,和排序等操作。本章节就不继续深入了。 最后会走该方法,把结果真正写出到 mycat 的客户端; 后面再单独的去跟中这个处理流程
io.mycat.backend.mysql.nio.handler.MultiNodeQueryHandler#outputMergeResult(io.mycat.server.ServerConnection, byte[], java.util.Iterator<io.mycat.memory.unsafe.row.UnsafeRow>, java.util.concurrent.atomic.AtomicBoolean)
}else{
ByteBuffer byteBuffer = source.writeToBuffer(eof, buffer);
/**
* 真正的开始把Writer Buffer的数据写入到channel 中
*/
session.getSource().write(byteBuffer);
}
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
这是剩余部分的包,一次性没有发完,发了两个包过来; 行的结果值,一行一个,再带一个 eof 包
# 小结
- NIOReactor 监听 mysql 发送回来的响应信息
- 具体 MySQLConnection.NIOSocketWR、MySQLConnectionHandler 解析包信息
- MultiNodeQueryHandler 最终进行结果集合并等操作,等待接收到最后一个 eof 包。 开始进行业务处理,并且把处理完的结果集构造成 mysql 协议包,返回到 mycat 的客户端
在之前看到过,一个前端连接会绑定一个 mysql 连接,当路由计算的时候 会创建 MultiNodeQueryHandler 出来,相当于 一个前端连接会绑定一个 处理器,当有结果的时候,就能回调具体的前端连接,进行 nio 的写出