一、高性能生产者-消费者:无锁实现
BlockigQueue
使用锁和阻塞等待来实现线程间的同步,而ConcureentLinkedQueue
使用大量的无锁CAS操作,相比BlockigQueue
的性能要好的多。但是使用CAS进行编程是非常困难的,不过现成的Disruptor
框架帮我们实现了这一功能。
1.1 无锁的缓冲框架:Disruptor
Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列,它使用无锁的方式实现了一个环形队列,非常适合生产者-消费者模式。在Disruptor中,使用了环形队列来代替普通的线性队列,这个环形队列内部实现是一个普通的数组。对于一般的队列,势必要提供队列头部head和尾部tail两个指针用于出队和入队,这样无疑就增加了线程协作的复杂度。但如果队列的环形的,则只需要提供一个当前队列的位置cursor,利用这个cursor既可以出队也可以入队。由于是环形队列的缘故,队列的总大小必须事先指定,不能动态扩展。为了能够快速从一个序列sequence对应数组的实际位置(每次有元素入队,序列就加1),Disruptor要求我们必须将数组的大小设置为2的整数次方。这样通过sequence&(queueSize-1)就能立即定位到实际的元素位置index。这个要比取余(%)操作快得多。
如图所示,显示了RingBuffer的结构,生产者向缓冲区中写入数据,而消费者从中读取数据,生产者写入数据使用CAS操作,消费者读取数据时,为了防止多个消费者处理同一个数据,也使用CAS操作进行保护。这种固定大小的环形队列的另一个好处就是可以做到完全内存复用。在系统运行过程中,不会有新的空间需要分配或者老的空间需要回收。因此,可以大大减少系统分配空间以及回收空间的额外开销。
1.2 生产者-消费者案例
这里使用的Disruptor版本是disruptor-3.3.2。这里生产者不断产生证书,消费者读取生产者的数据,并计算其平方。
代表数据的PCData:
public class PCData {
private long value;
public void set(long value) {
this.value = value;
}
public long get() {
return value;
}
}
消费者实现为WorkHandler接口,它来自Disruptor框架:
public class Consumer implements WorkHandler<PCData> {
@Override
public void onEvent(PCData event) throws Exception {
System.out.println(Thread.currentThread().getId() + ":Event: --" +
event.get() * event.get() + "--");
}
}
消费者的作用是读取数据进行处理。这里,数据的读取已经由Disruptor进行封装,onEvent()方法为框架的回调方法。因此,这个只需要简单地进行数据处理即可。
PCData的工厂类。它会在Disruptor系统初始化时,构造所有的缓冲区中的对象实例:
public class PCDataFactory implements EventFactory<PCData>{
@Override
public PCData newInstance() {
return new PCData();
}
}
生产者:
public class Producer {
private final RingBuffer<PCData> ringBuffer;
public Producer(RingBuffer<PCData> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void pushData(ByteBuffer byteBuffer){
long sequence = ringBuffer.next();
try {
PCData event = ringBuffer.get(sequence);
event.set(byteBuffer.getLong(0));
} finally {
ringBuffer.publish(sequence);
}
}
}
生产者需要一个RingBuffer的引用,也就是环形缓冲区。它有一个重要的方法pushData()将产生的数据推入缓冲区。方法pushData()接收一个ByteBuffer对象。在ByteBuffer中可以用来包装任何数据类型。pushData()的功能就是将传入的ByteBuffer中的数据提取出来,并装载到环形缓冲区中。
上述第12行代码,通过next()方法得到下一个可用的序列号。通过序列号,取得下一个空闲可用的PCData,并且将PCData的数据设为期望值,这个值最终会传递给消费者。最后,在第21行,进行数据发布。只有发布后的数据才会真正被消费者看见。
至此,我们的生产者、消费者和数据都已经准备就绪。只差一个统筹规划的主函数将所有内容整合起来:
public static void main(String[] args) throws InterruptedException {
Executor executor = Executors.newCachedThreadPool();
PCDataFactory factory = new PCDataFactory();
EventFactory<PCData> factory = new EventFactory<PCData>() {
@Override
public PCData newInstance() {
return new PCData();
}
};
//设置缓冲区大小,一定要是2的整数次幂
int bufferSize = 1024;
WaitStrategy startegy = new BlockingWaitStrategy();
//创建disruptor,它封装了整个Disruptor的使用,提供了一些便捷的API.
Disruptor<PCData> disruptor = new Disruptor<PCData>(factory, bufferSize, executor, ProducerType.MULTI, startegy);
//设置消费者,系统会将每一个消费者实例映射到一个系统中,也就是提供4个消费者线程.
disruptor.handleEventsWithWorkerPool(new Consumer(),
new Consumer(),
new Consumer(),
new Consumer());
//启动并初始化disruptor系统.
disruptor.start();
RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
//创建生产者
Producer productor = new Producer(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
//生产者不断向缓冲区中存入数据.
for (long l=0;true;l++){
byteBuffer.putLong(0,l);
productor.pushData(byteBuffer);
Thread.sleep(new Random().nextInt(500));
System.out.println("add data "+l);
}
}
Disruptor的性能要比BlocakingQueue至少高一个数量级以上。
1.3 提高消费者的响应时间:选择合适的策略
Disruptor为我们提供了几个策略,这些策略由WaitStrategy接口进行封装。
- BlockingWaitStrategy:默认策略。和BlockingQueue是非常类似的,他们都使用了Lock(锁)和Condition(条件)进行数据监控和线程唤醒。因为涉及到线程的切换,BlockingWaitStrategy策略是最省CPU的,但在高并发下性能表现是最差的一种等待策略。
- SleepingWaitStrategy:这个策略也是对CPU非常保守的。它会在循环中不断等待数据。它会先进行自旋等待,如果不成功,则使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos(1)进行线程休眠,以确保不占用太多的CPU数据。因此,这个策略对于数据处理可能产生比较高的平均延时。适用于对延时要求不是特别高的场合,好处是他对生产者线程的影响最小。典型的场景是异步日志。
- YieldWaitStrategy:用于低延时场合。消费者线程会不断循环监控缓冲区变化,在循环内部,它会使用Thread.yield()让出CPU给别的线程执行时间。如果需要高性能系统,并且对延迟有较高要求,则可以考虑这种策略。这种策略相当于消费者线程变成了一个内部执行Thread.yield()的死循环,因此最好有多于消费者线程的逻辑CPU(“双核四线程”中的四线程),否则整个应用会受到影响。
- BusySpinWaitStrategy:疯狂等待策略。它就是一个死循环,消费者线程会尽最大努力监控缓冲区的变化。它会吃掉CPU所有资源。所以只在非常苛刻的场合使用它。因为这个策略等同于开一个死循环监控。因此,物理CPU数量必须大于消费者线程数。因为如果是逻辑核,那么另外一个逻辑核必然会受到这种超密集计算的影响而不能正常工作。
1.4 CPU Cache的优化:解决伪共享问题
我们知道,为了提高CPU的速度,CPU有一个高速缓存Cache。在高速缓存中,读写数据的最小单位是缓存行(Cache Line),它是主内存(memory)复制到 缓存(Cache)的最小单位,一般为32~128byte(字节)。
假如两个变量存放在同一个缓存行中,在多线程访问中,可能互相影响彼此的性能。如图,运行在CPU1上的线程更新了X,那么CPU2伤的缓存行就会失效,同一行的Y即使没有修改也会变成无效,导致Cache无法命中。接着,如果在CPU2上的线程更新了Y,则导致CPU1上的缓存行又失效(此时,同一行的X)。这无疑是一个潜在的性能杀手,如果CPU经常不能命中缓存,那么系统的吞吐量会急剧下降。
为了使这种情况不发生,一种可行的做法就是在X变量前后空间都占据一定的位置(暂叫padding,用来填充Cache Line)。这样,当内存被读入缓存中时,这个缓存行中,只有X一个变量实际是有效的,因此就不会发生多个线程同时修改缓存行中不同变量而导致变量全体失效的情况。
具体实现如下:
public class FalseSharing implements Runnable {
public final static int NUM_THREADS = 4;
public final static long ITERATIONS = 500L * 1000L * 1000L;
private final int arrayIndex;
private static VolatileLong[] longs = new VolatileLong[NUM_THREADS];
static {
for(int i=0; i<longs.length; i++) {
longs[i] = new VolatileLong();
}
}
public FalseSharing(final int arrayIndex) {
this.arrayIndex = arrayIndex;
}
public static void main(String[] args) throws Exception {
final long start = System.currentTimeMillis();
runTest();
System.out.println("duration = " + (System.currentTimeMillis() - start));
}
private static void runTest() throws InterruptedException {
Thread[] threads = new Thread[NUM_THREADS];
for(int i=0; i<threads.length; i++) {
threads[i] = new Thread(new FalseSharing(i));
}
for(Thread t : threads) {
t.start();
}
for(Thread t : threads) {
t.join();
}
}
@Override
public void run() {
long i = ITERATIONS + 1;
while(0 != --i) {
longs[arrayIndex].value = i;
}
}
public final static class VolatileLong {
public volatile long value = 0L;
public long p1, p2, p3, p4, p5, p6, p7;
}
}
在VolatileLong中,准备了7个long型变量用来填充缓存。实际上,只有VolatileLong.value是会被使用的。而那些p1、p2等仅仅用于将数组第一个VolatileLong.value和第二个VolatileLong.value分开,防止它们进入同一个缓存行。
Disruptor框架充分考虑了这个问题,它的核心组件Sequence会被非常频繁的访问(每次入队,它都会被加1),其基本结构如下:
class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{
protected volatile long value;
}
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
public class Sequence extends RhsPadding {
//省略具体实现
}
虽然在Sequence中,主要使用的只有value。但是,通过LhsPadding和RhsPadding,在这个value的前后安置了一些占位空间,使得value可以无冲突的存在于缓存中。此外,对于Disruptor的环形缓冲区RingBuffer,它内部的数组是通过以下语句构造的:
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
实际产生的数组大小是缓冲区实际大小再加上两倍的BUFFER_PAD。这就相当于在这个数组的头部和尾部两段各增加了BUFFER_PAD个填充,使得整个数组被载入Cache时不会受到其他变量的影响而失效。
二、Furture模式
Furture模式的核心思想是异步调用。当我们需要调用一个函数方法时,可能不急着要结果,让它在后台慢慢处理这个请求,此时调用者可以先处理其它任务,在真正需要数据的时候再去尝试获取需要的数据。
2.1 Furture模式的主要角色
Furture模式的主要参与者如下:
参与者 | 作用 |
---|---|
Main | 系统启动,调用Client发出请求 |
Client | 返回Data对象,立即返回FurtureData,并开启ClientThread线程装配RealData |
Data | 返回数据的接口 |
FurtureData | Future数据,构造很快但是是一个虚拟的数据,需要装配RealData |
RealData | 真实数据,其构造比较慢 |
它的核心结构如图所示:
2.2 Future模式的简单实现
在这个实现中,有一个核心接口Data,这就是客户端希望获取的数据。在Future模式中,这个Data接口有两个重要的实现,分别是RealData,也就是真是数据,这就是我们最终需要获得的,有价值的信息。另外一个就是FutureData,它就是用来提取RealData的一个“订单”。因此FutureData是可以立即返回得到。
下面是Data接口:
public interface Data {
public String getResult();
}
FutureData实现了一个快速返回的RealData包装。它只是一个包装,或者说是一个RealData的虚拟实现。因此,它可以很快被构造并返回。当使用FutrueData的getResult()方法的时候,程序阻塞,直到RealData准备好并注入到FutureData中,才最终返回数据。FutureData是Future模式的关键,它实际上是真实数据RealData的代理,封装了获取RealData的等待过程。
public class FutureData implements Data {
RealData realData = null; //FutureData是RealData的封装
boolean isReady = false; //是否已经准备好
public synchronized void setRealData(RealData realData) {
if(isReady)
return;
this.realData = realData;
isReady = true;
notifyAll(); //RealData已经被注入到FutureData中了,通知getResult()方法
}
@Override
public synchronized String getResult() throws InterruptedException {
if(!isReady) {
wait(); //一直等到RealData注入到FutureData中
}
return realData.getResult();
}
}
RealData是最终需要使用的数据模型。它的构造很慢。在这里,使用sleep()函数模拟这个过程,简单地模拟一个字符串的构造。
public class RealData implements Data {
protected String data;
public RealData(String data) {
//利用sleep方法来表示RealData构造过程是非常缓慢的
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.data = data;
}
@Override
public String getResult() {
return data;
}
}
Client主要实现了获取FutureData,并开启构造RealData的线程。并在接受请求后,很快的返回FutureData。注意,它不会等待将数据真的构造完毕再返回,而是立即返回FutureData,即使这个时候FutureData并没有真实数据。
public class Client {
public Data request(final String string) {
final FutureData futureData = new FutureData();
new Thread(new Runnable() {
@Override
public void run() {
//RealData的构建很慢,所以放在单独的线程中运行
RealData realData = new RealData(string);
futureData.setRealData(realData);
}
}).start();
return futureData; //先直接返回FutureData
}
}
最后,就是主函数Main,它主要负责调用Client发起请求,并消费返回的数据。
public static void main(String[] args) {
Client client = new Client();
Data data = client.request("name");
System.out.println("请求完毕");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("数据 = " + data.getResult());
}
2.3 JDK中的Future模式
下面是JDK内置Future模式的基本结构。
[图片上传失败...(image-283e98-1536761892098)]
其中Future接口就类似于订单或者说是契约。通过它,可以得到真实的数据。RunnableFuture继承了Future和Runnable两个接口,其中run()方法用于构造真实的数据。它有一个具体的实现FutureTask类。FutureTask有一个内部的Sync,一些实质性工作,会委托Sync类实现。而Sync类最终会调用Callable接口,完成实际数据的组装工作。
Callable()接口只有一个方法call(),它会返回需要构造的实际数据。这个Callable接口也是这个Future框架和应用程序之间的重要接口。如果我们要实现自己的业务系统,通常需要实现自己的Callable对象。此外,FutureTask类也与应用程序密切相关,通常,我们会使用Callable实例构造一个FutureTask实例,并将它提交给线程池。下面将展示内置的Future模式的使用:
public class RealData implements Callable<String> {
private String para;
public RealData(String para) {
this.para = para;
}
@Override
public String call() throws Exception {
StringBuffer sb = new StringBuffer();
for(int i=0; i<10; i++) {
sb.append(para);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
return sb.toString();
}
}
上述代码实现了Callable接口,它的call()方法会构造我们需要的真实数据并返回。当然这个过程可以是缓慢的,这里使用Thread.sleep()模拟它:
public class FutureMain {
public static void main(String[] args) throws InterruptedException, ExecutionException {
FutureTask<String> future = new FutureTask<>(new RealData("a"));
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(future);
System.out.println("请求完毕");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("数据 = " + future.get());
}
}
除了基本的功能外,JDK还为Future接口提供了一些简单的控制功能:
boolean cancel(boolean mayInterruptIfRunning); //取消任务
boolean isCancelled(); //是否已经取消
boolean isDone(); //是否已经完成
V get() throws InterruptedException,ExecutionException //取得返回对象
V get(long timeout, TimeUnit unit); //取得返回对象,可以设置超时时间
三、并行流水线
虽然并发算法可以充分发挥多核CPU的性能,但并非所有的计算都可以改造成并发形式。执行过程中有数据相关性的运算都是无法完美并行化的。假如现在有两个数,B和C。如果要计算(B+C)*B/2
,那么这个运算过程就是无法并行的。原因是,如果B+C
没有执行完成,则永远算不出(B+C)*B
,这就是数据相关性。
遇到这种情况,可以将日常生产中的流水线思想应用到程序开发中。虽然(B+C)*B/2
无法并行,但是如果需要计算一大堆B和C,可以将它流水化。首先将计算过程拆分为三个步骤:
P1:A=B+C
P2:D=AxB
P3:D=D/2
上述步骤中P1、P2和P3均在单独的线程中计算,并且每个线程只负责自己的工作。此时,P3的计算结果就是最终需要的答案。P1接收B和C的值,并求和,将结果输入P2。P2求乘积后输入给P3。P3将D除以2得到最终值。一旦这条流水线建立,只需要一个计算步骤就可以得到(B+C)*B/2
的结果。为了实现这个功能,需要定义一个在线程间携带结果进行信息交换的载体:
public class Msg {
public double i;
public double j;
public String orgStr = null;
}
P1计算的是加法:
public class Plus implements Runnable {
public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>();
@Override
public void run() {
while(true) {
try {
Msg msg = bq.take();
msg.j = msg.i + msg.j;
Multiply.bq.add(msg);
} catch (InterruptedException e) {
}
}
}
}
上述代码中,P1取得封装了两个操作数的Msg,并进行求和,将结果传递给乘法线程P2。当没有数据需要处理时,P1进行等待。
P2计算乘法:
public class Multiply implements Runnable {
public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>();
@Override
public void run() {
while(true) {
try {
Msg msg = bq.take();
msg.i = msg.i * msg.j;
Div.bq.add(msg);
} catch (InterruptedException e) {
}
}
}
}
P2计算相乘结果后,将中间结果传递给除法线程P3。
P3计算除法:
public class Div implements Runnable {
public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>();
@Override
public void run() {
while(true) {
try {
Msg msg = bq.take();
msg.i = msg.i / 2;
System.out.println(msg.orgStr + "=" + msg.i);
} catch (InterruptedException e) {
}
}
}
}
最后是提交任务的主线程,这里,提交100万个请求,让线程组进行计算:
public class PStreamMain {
public static void main(String[] args) {
new Thread(new Plus()).start();
new Thread(new Multiply()).start();
new Thread(new Div()).start();
long s1 = System.currentTimeMillis();
for(int i=1; i<=1000; i++) {
for(int j=1; j<=1000; j++) {
Msg msg = new Msg();
msg.i = i;
msg.j = j;
msg.orgStr = "((" + i + "+" + j + ")*" + i + ")/2";
Plus.bq.add(msg);
}
}
}
}
上述代码中,将数据提交给P1加法线程,开启流水线的计算。在多核或者分布式场景中,这种设计思路可以有效地将有依赖关系的操作分配在不同的线程中进行计算,尽可能利用多核优势。
四、并行搜索
给定一个数组,我们要查找满足条件的元素。对于串行程序来说,只要遍历一下数组就可以得到结果。但如果要使用并行方式,则需要额外增加一些线程间的通信机制,使各个线程可以有效地运行。一种简单的策略就是将原始数据集合按照期望的线程数进行分割。每个线程各自独立搜索,当其中一个线程找到数据后,立即返回结果即可。
现在假设有一个整数数组,我们需要查找数组内的元素:
static int[] arr;
定义线程池、线程数量以及存放结果的变量result。在result中,我们会保存符合条件的元素在arr数组中的下标。默认为-1,表示没有找到给定元素。
public static final int THREADNUM = 2;
static ExecutorService pool = Executors.newCachedThreadPool();
static AtomicInteger result = new AtomicInteger(-1);
并发搜索会要求每个线程查找arr中的一段,因此,搜索函数必须指定线程需要搜索的起始和结束位置
public static int search(int searchValue,int beginPos,int endPos) {
int i = 0;
for(i=beginPos; i<endPos; i++) {
if(result.get() >= 0) {
return result.get();
}
if(arr[i] == searchValue) {
if(!result.compareAndSet(-1, i)) {
return result.get();
}
return i;
}
}
return -1;
}
上述代码中,首先通过result判断是否已经有其他线程找到了需要的结果。如果已经找到,则立即返回不再进行查找。如果没有找到,则进行下一步搜索。第7行代码成立则表示当前线程找到了需要的数据,那么就会将结果保存到result变量中。这里使用CAS操作,如果设置失败,则表示其他线程已经先我一步找到了结果。因此,可以无视失败的情况,找到结果后,进行返回。
定义一个线程进行查找,它会调用前面的search()方法:
public static class SearchTask implements Callable<Integer> {
int begin,end,searchValue;
public SearchTask(int searchValue,int begin,int end) {
this.begin = begin;
this.end = end;
this.searchValue = searchValue;
}
@Override
public Integer call() throws Exception {
int re = search(searchValue,begin,end);
return re;
}
}
最后是pSearch()并行查找函数,它会根据线程数量对arr数组进行划分,并建立对应的任务提交给线程池处理:
public static int pSearch(int searchValue) throws InterruptedException,ExecutionException {
int subArrSize = arr.length/THREADNUM+1;
List<Future<Integer>> re = new ArrayList<Future<Integer>>();
for(int i=0; i<arr.length; i+=subArrSize) {
int end = i + subArrSize;
if(end>=arr.length) end = arr.length;
re.add(pool.submit(new SearchTask(searchValue, i, end)));
}
for(Future<Integer> fu : re) {
if(fu.get() >= 0) return fu.get();
}
return -1;
}
上述代码中使用了JDK内置的Future模式,其中4~8行将原始数组arr划分为若干段,并根据划分结果建立子任务。每一个子任务都会返回一个Future对象,通过Future对象可以获得线程组得到的最终结果。在这里,由于线程之间通过result共享彼此的信息,因此只要当一个线程成功返回后,其他线程都会立即返回。因此,不会出现由于排在前面的任务长时间无法结束而导致整个搜索结果无法立即获取的情况。
五、并行排序
对于大部分排序操作都是串行执行的,但是当数据量很大时,就需要使用并行排序,但是并行排序的难度很大。下面介绍几种相对简单的并行排序算法。
5.1 分离数据相关性:奇偶交换排序
对于奇偶交换排序来说,它将排序过程分为两个阶段,奇交换和偶交换。对于奇交换来说,它总是比较奇数索引以及其相邻的后续元素。而偶交换总是比较偶数索引和其相邻的后续元素。并且,奇交换和偶交换会成对出现,这样才能保证比较和交换涉及到数组中的每一个元素。
下面是奇偶交换排序的串行实现:
public static void oddEvenSort(int[] arr) {
int exchFlag = 1, start = 0;
while(exchFlag == 1 || start == 1) {
exchFlag = 0;
for(int i=start; i<arr.length-1; i+=2) {
if(arr[i] > arr[i+1]) {
int temp = arr[i];
arr[i] = arr[i+1];
arr[i+1] = temp;
exchFlag = 1;
}
}
if(start == 0)
start = 1;
else
start = 0;
}
}
其中,exchFlag用来记录当前迭代是否发生了数据交换,而start变量用来表示是奇交换还是偶交换。初始时,start为0,表示进行偶交换,每次迭代结束后,切换start的状态。如果上一次比较交换发生了数据交换,或者当前正在进行的是奇交换,循环就不会停止,直到程序不再发生交换,并且当前进行的是偶交换为止(表示奇偶交换已经成对出现)。
并行模式代码如下:
static int arr[];
static int exchFlag = 1;
static final int NUM_ARR = 10000;
static {
arr = new int[NUM_ARR];
for(int i=0; i<NUM_ARR; i++) {
arr[i] = new Random().nextInt(10000);
}
}
static synchronized void setExchFlag(int v) {
exchFlag = v;
}
static synchronized int getExchFlag() {
return exchFlag;
}
public static class OddEvenSortTask implements Runnable {
int i;
CountDownLatch latch;
public OddEvenSortTask(int i, CountDownLatch latch) {
this.i = i;
this.latch = latch;
}
@Override
public void run() {
if(arr[i] > arr[i+1]) {
int temp = arr[i];
arr[i] = arr[i+1];
arr[i+1] = temp;
setExchFlag(1);
}
latch.countDown();
}
}
public static void pOddEventSort() throws InterruptedException {
int start = 0;
ExecutorService pool = Executors.newCachedThreadPool();
while(getExchFlag() == 1 || start == 1) {
setExchFlag(0);
//偶数的数组长度,当start为1时,只有len/2-1个线程
CountDownLatch latch = new CountDownLatch(arr.length/2 - (arr.length%2==0?start:0));
for(int i=start; i<arr.length-1; i+=2) {
pool.submit(new OddEvenSortTask(i, latch));
}
latch.await();
if(start == 0)
start = 1;
else
start = 0;
}
}
上述代码定义了奇偶排序的任务类。该任务的主要工作是进行数据比较和必要交换。并行排序的 主体是pOddEventSort()方法,它使用CountDownLatch记录线程数量,对于每一次迭代,使用单独的线程对每一次元素比较和交换进行操作。在下一次迭代前,必须等待上一次迭代所有线程的完成。
5.2 改进的插入排序:希尔排序
插入排序的基本思想是:一个未排序的数组(或链表)可以分为两个部分,前半部分是已经排序的,后半部分是未排序的。在进行排序时,只需要在未排序的部分选择一个元素,将其插入到前面有序的数组中即可。最终,未排序的部分会越来越少,直到为0,那么排序就完成了。
插入排序的实现如下:
public static void insertSort(int[] arr) {
int length = arr.length;
int j, i, key;
for(int i=1; i<length; i++) {
//key为要准备插入的元素
key = arr[i];
j = i - 1;
while(j>=0 && arr[j]>key) {
arr[j+1] = arr[j];
j--;
}
//找到合适的位置插入key
arr[j+1] = key;
}
}
上述代码第6行,提取要准备插入的元素(也就是未排序序列中的第一个元素)。接着,在已排序队列中找到这个元素的插入位置(第8~10行),并进行插入(第13行)即可。
简单的插入排序是很难并行化的。因为这一次的 数据插入依赖于上一次得到的有序序列,因此多个步骤之间无法并行。为此,可以对插入排序进行扩展,这就是希尔排序。
希尔排序将整个数组根据间隔h分隔为若干个子数组。子数组互相穿插在一起,每一次排序时,分别对每一个子数组进行排序。在每一次排序完成后,可以递减h的值,进行下轮更加精细的排序。直到h为1,此时等价于一次插入排序。
希尔排序的一个主要优点是,即使一个较小的元素在数组的末尾,由于每次元素移动都以h为间隔进行,因此数组末尾的小元素可以在很少的交换次数下,就被换到最接近元素最终位置的地方。
希尔排序的串行实现:
public static void shellSort(int[] arr) {
//计算出最大的h
int h = 1;
while(h<=arr.length/3) {
h = h*3+1;
}
while(h>0) {
for(int i=h; i<arr.length; i++) {
if(arr[i]<arr[i-h]) {
int tmp = arr[i];
int j = i - h;
while(j>=0 && arr[j]>tmp) {
arr[j+h] = arr[j];
j-=h;
}
arr[j+h] = tmp;
}
}
h = (h-1)+3;
}
}
上述代码4~6行,计算一个合适的h值,接着正式进行希尔排序。第8行的for循环进行间隔为h的插入排序,每次排序结束后,递减h的值。直到h为1,退化为插入排序。
希尔排序每次都针对不同的子数组进行排序,各个子数组之间是完全独立的。因此,改写成并行程序:
public class ParallelShellSort {
static int arr[];
static final int ARRNUM = 1000;
static {
arr = new int[ARRNUM];
for (int i = 0; i < ARRNUM; i++) {
arr[i] = new Random().nextInt(1000);
}
}
public static class ShellSortTask implements Runnable {
int i = 0;
int h = 0;
CountDownLatch l;
public ShellSortTask(int i,int h,CountDownLatch latch) {
this.i = i;
this.h = h;
this.l = latch;
}
@Override
public void run() {
if(arr[i] < arr[i-h]) {
int tmp = arr[i];
int j = i - h;
while(j>=0 && arr[j] > tmp) {
arr[j+h] = arr[j];
j -= h;
}
arr[j+h] = tmp;
}
l.countDown();
}
}
public static void pShellSort() throws InterruptedException {
int h = 1;
CountDownLatch latch = null;
ExecutorService pool = Executors.newCachedThreadPool();
while(h<=arr.length/3) {
h = h*3 + 1;
}
while(h>0) {
System.out.println("h=" + h);
if(h>=4)
latch = new CountDownLatch(arr.length-h);
for(int i=h; i<arr.length; i++) {
if(h>=4) {
pool.execute(new ShellSortTask(i, h, latch));
} else {
if(arr[i] < arr[i-h]) {
int tmp = arr[i];
int j = i -h;
while(j>=0 && arr[j]>tmp) {
arr[j+h] = arr[j];
j -= h;
}
arr[j+h] = tmp;
}
}
}
latch.await();
h = (h-1)/3;
}
}
public static void main(String[] args) throws InterruptedException {
pShellSort();
for(int i=0; i<ARRNUM; i++) {
System.out.println(arr[i]);
}
}
}
上述代码中定义ShellSortTask作为并行任务。一个ShellSortTask的作用是根据给定的起始位置和h,对子数组进行排序,因此可以完全并行化。为控制线程数量,这里定义并行主函数pShellSort()在h大于或等于4时使用并行线程,否则则退化为传统的插入排序。每次计算后,递减h的值。