# 基于 document 锁实现悲观锁并发控制
全局锁一次性就锁整个 index,对这个 index 的所有增删改操作都会被 block 住,如果上锁不频繁,还可以,比较简单
细粒度的一个锁:document 锁,每次只对你要操作的 doc 进行上锁。不影响其他 doc
document 锁的思路,是用 脚本 进行上锁,如下
POST /fs/lock/1/_update
{
"upsert": { "process_id": 123 },
"script": {
"lang": "groovy",
"inline": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';", "params": {
"process_id": 123
}
}
}
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
upsert:指定的 doc 不存在的时候使用该内容初始化 doc
script:指定的 doc 存在的时候使用该脚本
params:script 中可以使用
process_id:
充当一个唯一标识,比如:某个服务实例 ID + 当前操作的线程 ID 组成一个唯一标识
assert false,不是当前进程加锁的话,则抛出异常
ctx.op='noop',不做任何修改
TIP
这里需要传入参数,会导致 es 识别脚本语言有误,所以这里需要明确协商 groovy; 同时,inline 默认是禁止的,之前讲过用文件方式,文件方式是默认支持的
开启 inline 需要在 elasticsearch.yml 配置文件中增加属性 script.inline: on
完成之后需要重启 es
当 process_id 相等的时候返回的数据如下
{
"_index": "fs",
"_type": "lock",
"_id": "1",
"_version": 3,
"result": "noop",
"_shards": {
"total": 0,
"successful": 0,
"failed": 0
}
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
当 process_id 不相等的时候返回的数据如下
{
"error": {
"root_cause": [
{
"type": "remote_transport_exception",
"reason": "[KHsngUp][127.0.0.1:9300][indices:data/write/update[s]]"
}
],
"type": "illegal_argument_exception",
"reason": "failed to execute script",
"caused_by": {
"type": "script_exception",
"reason": "error evaluating if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';",
"caused_by": {
"type": "power_assertion_error",
"reason": "assert false\n"
},
"script_stack": [],
"script": "",
"lang": "groovy"
}
},
"status": 400
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
还可以执行下 refresh 加速生效
POST /fs/_refresh
1
加锁成之后就可以处理自己的业务操作了。处理完成之后释放锁
DELETE /fs/lock/1
1
或者使用批量语法解锁
PUT /fs/lock/_bulk
{ "delete": { "_id": 1}}
1
2
2