多线程设计模式5-single threaded Execution模式(附分布式环境下的操作)
single threaded Execution模式主要是用于确保同一时间内只能让一个线程执行处理,说通俗点就是对synchronized的标准化使用方式,这是比较基础的,所以我们前面重点介绍下如何保证同一个Jvm进程内的多线程同步,后面扩展开来,保证多个Jvm进程间多线程同步(分布式环境)。两者有很大的相似性。
单jvm进程下:
先看下一个简单的例子:
public class Ticket {
private int counter = 100;
public void dec() {
if(counter>0) {
System.out.println(Thread.currentThread().getName() + "号窗口卖出:" + this.counter-- + "号票");
}else{
System.out.println("票已售完");
}
}
}
public class StationThread extends Thread{
Ticket ticket;
public StationThread(Ticket ticket) {
this.ticket = ticket;
}
@Override
public void run() {
while (true) {
try{
Thread.sleep(500);
}catch(InterruptedException e){
e.printStackTrace();
}
ticket.dec();
}
}
}
public class Main {
public static void main(String[] args) {
System.out.println("Testing...");
Ticket ticket = new Ticket();
new StationThread(ticket).start();
new StationThread(ticket).start();
new StationThread(ticket).start();
}
}
这里打印结果可知,执行明显是错误的:
Testing...
Thread-0号窗口卖出:100号票
Thread-1号窗口卖出:99号票
Thread-2号窗口卖出:98号票
Thread-1号窗口卖出:97号票
Thread-2号窗口卖出:97号票
Thread-0号窗口卖出:97号票
Thread-2号窗口卖出:96号票
Thread-1号窗口卖出:96号票
Thread-0号窗口卖出:96号票
......
为什么会出错呢?因为Ticket不是线程安全的,this.counter—并不是一个原子性的操作,其中包含了读取,修改,写入,多个线程执行的时候,这些命令会交错执行,导致执行结果与预期不一致。
接下来,我们改下Ticket的方法:
public synchronized void dec() {
if(counter>0) {
System.out.println(Thread.currentThread().getName() + "号窗口卖出:" + this.counter-- + "号票");
}else{
System.out.println("票已售完");
}
}
添加了synchronized后,执行结果正常:
Testing...
Thread-0号窗口卖出:100号票
Thread-1号窗口卖出:99号票
Thread-2号窗口卖出:98号票
Thread-0号窗口卖出:97号票
Thread-1号窗口卖出:96号票
Thread-2号窗口卖出:95号票
Thread-0号窗口卖出:94号票
Thread-1号窗口卖出:93号票
Thread-2号窗口卖出:92号票
Thread-1号窗口卖出:91号票
Thread-2号窗口卖出:90号票
Thread-0号窗口卖出:89号票
Thread-1号窗口卖出:88号票
Thread-0号窗口卖出:87号票
Thread-2号窗口卖出:86号票
Thread-1号窗口卖出:85号票
Thread-2号窗口卖出:84号票
Thread-0号窗口卖出:83号票
Thread-1号窗口卖出:82号票
Thread-2号窗口卖出:81号票
Thread-0号窗口卖出:80号票
Thread-1号窗口卖出:79号票
Thread-2号窗口卖出:78号票
Thread-0号窗口卖出:77号票
......
synchronized保证了方法只能由一个线程,防止了由多个线程交错执行的情况。我们知道,编写线程安全的代码,核心在于对状态访问操作进行管理,特别是共享和可变的状态,这里Ticket就是一个共享资源(SharedResource),通过single thread execution模式,将非安全的方法声明为synchronized方法,确保同一时间只被一个线程访问。
使用时注意事项:
1、死锁问题:
死锁指多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止。就如同一座桥,只能容纳一辆车,有两辆车,相向而行,分到桥的中途,需要占据对方的空间才能通过。因此,就一直处于阻塞状态。
死锁主要是由于多个线程加锁顺序不一致导致的,可以从这里角度分析,防止死锁。
2、性能
获取锁花费时间,线程冲突会引起等待,为了提高性能,需要管理好锁的临界区,确定同步代码块的合理大小。
多jvm进程下:
分布式锁的方式:
一、通过如下命令在redis中实现分布式锁的功能
SET key value NX EX max-lock-time
其中NX是操作模式,表示只在键不存在时,才对键进行设置操作。
EX max-lock-time用于设置键的过期时间为max-lock-time秒。
这个命令连续两次执行结果如下:
test-redis:0>SET not-exists-key "value" NX EX 60
OK
test-redis:0>SET not-exists-key "value" NX EX 60
NULL
过六十秒后再执行:
test-redis:0>SET not-exists-key "value" NX EX 60
OK
这样,当一个线程执行命令成功,说明key原本不存在,该线程成功得到了锁;当设置失败时,说明key已经存在,该线程抢锁失败。
当到达过期时间,或者key被删除(del),说明锁被释放,其他线程可以继续执行这个命令来获取锁。
通过redis实现分布式锁有许多实现细节需要注意的:
1、很多人会通过setnx代替SET key value NX命令,但前者没有设置过期时间的参数,因此设置key值和设置过期时间便成为一个复合操作,不具备原子性,当一个线程设置了key值,但未设置过期时间,这时相关的节点挂了,但key一直存在,那其他线程就永远无法获取这个锁了(死锁)。
2、过期时间很难设置,如果设置短了,假设获得锁的A线程的任务还没执行完成,这时候锁就被释放了,其他线程就会获得锁,导致难以预料的一系列后果。如A线程执行完后误删了后一个线程的锁,共享数据被破坏等等。对此,我们可以通过开一个守护线程,当线程任务未执行完成,给锁续期。
二、zookeeper实现分布式锁
zookeeper分布式锁,实现更加完善,封装更好一点,因此,使用更加方便。
首先介绍下zookeeper分布式锁的实现原理。
为了构建这个锁,zookeeper创建一个持久的znode,它将作为父节点。试图获得锁的客户端将在父节点下面创建顺序的、临时的子节点。锁是由子节点具有最低的序列号的客户端进程拥有的。在图1中,锁节点有三个子节点,而节点1在这个时间点拥有锁,因为它的序列号是最低的。如果客户端创建的节点不是最小节点,就获得该节点的上一顺序节点,并给它注册watcher,同时在这里阻塞,等待监听事件的发生。当完成之后,关闭ZooKeeper连接,进而可以引发监听事件,释放该锁(在删除节点1之后,锁被释放), 然后拥有节点2的客户端拥有这个锁,以此类推。
zookeeper实现类似等待队列的机制,大大提升了抢锁的效率。
另外我们来看下,zookeeper有没有类似redis分布式锁那样的问题。我们发现它是不需要设置过期时间的,当任务完成时,客户端会删除节点,进而释放锁;当客户端挂掉,相应的临时会自动删除,锁被释放,其下一个序列的节点会收到通知,获取锁;当连接中断时则根据配置的重试机制重新连接。
Apache Curator,包含了对zookeeper分布式锁的实现,下面是使用代码,有兴趣可以研究下源码:
1、创建一个重试策略,然后使用CuratorFrameworkFactory.newClient()来获得CuratorFramework的实例
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMills, maxRetries);
CuratorFramework client = CuratorFrameworkFactory.newClient(hosts, retryPolicy);
client.start();
2、为特定的锁路径(lockPath)创建一个进程互斥锁,获取锁,执行一些操作,然后释放锁。
InterProcessLock lock = new InterProcessMutex(client, lockPath);
if (lock.acquire(waitTimeSeconds, TimeUnit.SECONDS)) {
try {
// do work while we hold the lock
} catch (Exception ex) {
// handle exceptions as appropriate
} finally {
lock.release();
}
} else {
// we timed out waiting for lock, handle appropriately
}
3、不要忘记关闭client
client.close();