部署平台分布式锁设计


[TOC]


摘要

该文档针对平台部署服务过程中遇到的临界资源互斥同步问题所设计的解决方案进行详细阐述。

背景

平台支持对服务的安装,更新,启停,备份,卸载,查询等操作,由平台发起命令,agent执行任务,由于agent是多线程并发处理任务,试想一下,多用户对主机上同一服务的进行操作,例如同时卸载和备份,会出现什么可怕的情况。老版本的所有操作,都是人工通过简单if else状态判断,没有从根本上发现问题的本质,并且耦合了业务属性,没有对其进行足够的抽象。

主要名词解释

英文名称或缩写 名词解释
curator 操作zookeeper客户端
zookeeper 一个分布式的,应用程序协调服务
平台 部署平台
agent 前置执行脚本工具

方案设计

平台锁提供了两种实现方式,一种是基于jdk原生的ReentrantLock,另一种是基于zookeeper的分布式锁,两种方式更有优劣,原生方式的好处是不需要引入第三方组件,由自己在内存中维护锁,zookeeper好处是如果未来平台支持集群,那么锁服务可以无缝衔接。

zookeeper分布式锁

zookeeper分布式锁实现可以参考《从Paxos到Zookeeper分布式一致性原理与实现》里面的分布式锁,也可以直接用curator,封装的很好,里面有实现。

排它锁(截图来自从Paxos到zookeeper)

共享锁 (截图来自从Paxos到zookeeper)


锁的设计是针对一个项目下单台机器上的单台服务,这样既能保证锁的细度,又能保证agent并发操作多服务。锁的路径标记形如“/projectId/sericeId/serviceName”的形式(模仿zookeeper的树形结构), 通常情况下 projectId和serviceId就能确定一个唯一服务,加上serviceName可以作为调试信息。锁路径的生成由统一的构造器LockPathBuilder生成,保证形式统一。

ReentrantServiceLock由自己编写一个key:value 的map,来维护锁的信息,zookeeper则不需要为此考虑。

类设计

锁的设计采用工厂方法设计模式,通过工厂接口和锁接口面向接口开发,可方便扩展其他锁的实现方式,利用统一的LockPathBuilder来生成锁的路径,保证路径的规范统一,借助于javaSPI机制,可以在不修改代码的情况下替换应用默认的锁实现方式。

流程图

  • 构建当前操作服务的锁的路径
  • 获取锁
  • 上锁
  • 对agent发命令进行操作
  • 解锁

使用实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
String lockPath = LockPathBuilder.newLock().withProjectId(service.getProjectId())
.withServiceName(service.getAbbreviation())
.withServiceId(serviceId)
.build(); //lockPath创建

//删除备份所需要的参数
param.put("serviceName", service.getAbbreviation());
param.put("backupPath", path);
ServiceLock serviceLock = SERVICE_LOCK_FACTORY.getLock(lockPath);//获取此服务锁
serviceLock.lock(); //上锁
//对此agent发送命令
Future future = agentService.synPushTask(ip, ScriptConstant.remove_service_backup, param, UUID.randomUUID().toString(), AgentResultMsg.class);
AgentResultMsg result = null;
try {
result = (AgentResultMsg) future.get(20, TimeUnit.SECONDS);//等待结果时间
} catch (TimeoutException e) {
restResult.setSuccess(false);
restResult.setMessage("删除备份超时");
} catch (Exception e) {
logger.error(e);
} finally {
//释放锁
serviceLock.unlock();
}

上述过程有个隐含的步骤,那就是一定要有超时机制,不然agent挂掉,一直不返回数据,会造成死锁,平台在这一版本中支持了同步调用agent,保证能在有限的时间内完成调用,并在同一个线程中释放锁。

关于同步调用如何实现会在以后的案例中分析。

测试用例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
* Created by liuroy on 2017/9/25.
*/
public class ReentrantServiceLockTest {

@Test
public void testServiceLock() {
ServiceLockFactory lockFactory = SpiServiceFactory.getService(ServiceLockFactory.class);
Runnable task1 = new Runnable() {
public void run() {
ServiceLock lock = null;
try {
lock = lockFactory.getLock("test1");
lock.lock();
System.out.println("Thread " + Thread.currentThread().getId() + " getted lock");
Thread.sleep(3000);
System.out.println("Thread " + Thread.currentThread().getId() + " running");
System.out.println("Thread " + Thread.currentThread().getId() + " run oven");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock != null)
lock.unlock();
}
}
};

new Thread(task1).start();
Runnable task2 = new Runnable() {
public void run() {
ServiceLock lock = null;
try {
lock = lockFactory.getLock("test1");
if (!lock.tryLock(2000, TimeUnit.MILLISECONDS)){
System.out.println("Thread " + Thread.currentThread().getId() + " unget lock");
}
lock.lock();
System.out.println("Thread " + Thread.currentThread().getId() + " getted lock");
System.out.println("Thread " + Thread.currentThread().getId() + " running");
Thread.sleep(3000);
System.out.println("Thread " + Thread.currentThread().getId() + " run oven");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock != null)
lock.unlock();
}
}
};

Runnable task3 = new Runnable() {
public void run() {
ServiceLock lock = null;
try {
lock = lockFactory.getLock("test1");
lock.lock();
System.out.println("Thread " + Thread.currentThread().getId() + " getted lock");
System.out.println("Thread " + Thread.currentThread().getId() + " running");
Thread.sleep(4000);
System.out.println("Thread " + Thread.currentThread().getId() + " run oven");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock != null)
lock.unlock();
}
}
};

new Thread(task2).start();
new Thread(task3).start();


try {
Thread.sleep(20000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* Created by liuroy on 2017/9/25.
*/
public class ZookeeperServiceLockTest {

private ServiceLockFactory lockFactory;

@Before
public void init() {
lockFactory = new ZookeeperServiceLockFactory();
}

@Test
public void testServiceLock() {
Runnable task1 = new Runnable() {
public void run() {
ServiceLock lock = null;
try {
lock = lockFactory.getLock("testmmm/test");
lock.lock();
System.out.println("Thread1 getted lock");
Thread.sleep(3000);
System.out.println("Thread1 running");
System.out.println("Thread1 run oven");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock != null)
lock.unlock();
}
}
};

new Thread(task1).start();
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
Runnable task2 = new Runnable() {
public void run() {
ServiceLock lock = null;
try {
lock = lockFactory.getLock("testmmm/test");
if (!lock.tryLock(2000, TimeUnit.MILLISECONDS)){
System.out.println("Thread2 unget lock");
}
lock.lock();
System.out.println("Thread2 getted lock");
System.out.println("Thread2 running");
Thread.sleep(3000);
System.out.println("Thread2 run oven");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock != null)
lock.unlock();
}
}
};

Runnable task3 = new Runnable() {
public void run() {
ServiceLock lock = null;
try {
lock = lockFactory.getLock("testmmm/test");
lock.lock();
System.out.println("Thread3 getted lock");
System.out.println("Thread3 running");
Thread.sleep(4000);
System.out.println("Thread3 run oven");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lock != null)
lock.unlock();
}
}
};
new Thread(task3).start();
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(task2).start();



try {
Thread.sleep(20000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}

@After
public void fini() {
}
}