并发编程中,经常会遇到资源竞争问题。要保证竞争资源被正确使用,可以通过加锁的方式,但 synchronized block 对性能的影响很大。本文要说的**无锁环形队列**利用 CAS 特性,可以使锁的粒度降到最低。
下面的示例代码分为三部分:
- RingBuffer 环形队列
- ProductThread 生产者线程
- ConsumeThread 消费者线程
**RingBuffer.java**
package com.nextbang.queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Bounded, lock-free ring buffer of strings shared through static state.
 *
 * <p>Each slot carries its own sequence number (the scheme used by bounded
 * multi-producer/multi-consumer queues): a producer may only claim a slot whose
 * sequence equals the producer index, and a consumer may only claim a slot whose
 * sequence equals the consumer index plus one. The sequence is advanced only
 * after the slot's data has been written (or taken), so a slot is never read
 * before it is filled nor overwritten before it is drained.
 *
 * <p>Created by nextbang on 17/1/23.
 */
public class RingBuffer {
    /** Number of slots; must be a power of two because slots are selected with {@code index & MASK}. */
    private static final int MAX_LENGTH = 4;
    private static final int MASK = MAX_LENGTH - 1;
    /** Upper bound on failed claim attempts in {@link #write(String)} before giving up. */
    private static final int MAX_SPINS = 1000;

    /** Index of the next slot to be written; increases monotonically and may wrap around. */
    private static final AtomicInteger productIndex = new AtomicInteger(0);
    /** Index of the next slot to be read; increases monotonically and may wrap around. */
    private static final AtomicInteger consumeIndex = new AtomicInteger(0);
    private static final String[] dataArr = new String[MAX_LENGTH];
    /** Per-slot sequence numbers; the volatile accesses also publish the plain writes to {@code dataArr}. */
    private static final AtomicInteger[] sequences = new AtomicInteger[MAX_LENGTH];

    static {
        // Slot i is initially writable by the producer that observes productIndex == i.
        for (int i = 0; i < MAX_LENGTH; i++) {
            sequences[i] = new AtomicInteger(i);
        }
    }

    /**
     * Appends {@code content} to the buffer without blocking.
     *
     * <p>Note that a {@code null} element cannot be told apart from "empty" by
     * callers of {@link #read()}.
     *
     * @param content the value to enqueue
     * @return {@code true} if the value was stored; {@code false} if the buffer is
     *         full or the slot could not be claimed within {@code MAX_SPINS} attempts
     */
    public static boolean write(String content) {
        int spins = 0;
        for (;;) {
            // Re-read the index on every attempt; a stale snapshot can never win the CAS.
            int pos = productIndex.get();
            int slot = pos & MASK;
            // Subtraction keeps the comparison correct when the int indices wrap around.
            int diff = sequences[slot].get() - pos;
            if (diff == 0) {
                if (productIndex.compareAndSet(pos, pos + 1)) {
                    dataArr[slot] = content;
                    // Publish the slot only after the data is in place.
                    sequences[slot].set(pos + 1);
                    return true;
                }
            } else if (diff < 0) {
                // The slot one lap behind has not been drained yet: buffer is full.
                return false;
            }
            // Lost a race with another producer; retry a bounded number of times.
            if (++spins >= MAX_SPINS) {
                return false;
            }
        }
    }

    /**
     * Removes and returns the oldest element without blocking.
     *
     * @return the dequeued value, or {@code null} if no published element is available
     */
    public static String read() {
        for (;;) {
            // Re-read the index on every attempt so a lost CAS cannot spin forever.
            int pos = consumeIndex.get();
            int slot = pos & MASK;
            int diff = sequences[slot].get() - (pos + 1);
            if (diff == 0) {
                if (consumeIndex.compareAndSet(pos, pos + 1)) {
                    String value = dataArr[slot];
                    dataArr[slot] = null;
                    // Hand the slot back to the producer that will be one lap ahead.
                    sequences[slot].set(pos + MAX_LENGTH);
                    return value;
                }
            } else if (diff < 0) {
                // Nothing published at this position yet: buffer is empty.
                return null;
            }
            // Otherwise another consumer took this position; reload and retry.
        }
    }

    /** Small smoke test: writes one element and reads it back. */
    public static void main(String[] args) {
        System.out.println(RingBuffer.write("a"));
        System.out.println(RingBuffer.read());
    }
}
**ProductThread.java**
package com.nextbang.queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Producer task: writes one string into {@link RingBuffer} and prints whether
 * the write succeeded.
 *
 * <p>Created by nextbang on 17/1/23.
 */
public class ProductThread extends Thread {
    /** The value this producer writes when it runs. */
    private final String content;

    /**
     * @param content the value to write into the ring buffer
     */
    public ProductThread(String content) {
        this.content = content;
    }

    /** Writes the content into the ring buffer and prints the boolean result. */
    @Override
    public void run() {
        System.out.println(RingBuffer.write(content));
    }

    /** Submits ten producers ("0".."9") to a pool of three worker threads. */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        try {
            for (int i = 0; i < 10; i++) {
                // The instance is used as a Runnable; the pool's own threads execute run().
                executorService.execute(new ProductThread("" + i));
            }
        } finally {
            // Without shutdown the pool's non-daemon workers keep the JVM alive forever;
            // already-submitted tasks still run to completion.
            executorService.shutdown();
        }
    }
}
**ConsumeThread.java**
package com.nextbang.queue;
import com.sun.tools.internal.xjc.reader.Ring;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Consumer task: reads one element from {@link RingBuffer} and prints it.
 *
 * <p>Created by nextbang on 17/1/23.
 */
public class ConsumeThread extends Thread {
    /** Reads one element from the ring buffer and prints whatever {@code read()} returns. */
    @Override
    public void run() {
        System.out.println(RingBuffer.read());
    }

    /** Submits a single consumer to a pool of three worker threads. */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        try {
            // The instance is used as a Runnable; the pool's own threads execute run().
            executorService.execute(new ConsumeThread());
        } finally {
            // Without shutdown the pool's non-daemon workers keep the JVM alive forever;
            // the already-submitted task still runs to completion.
            executorService.shutdown();
        }
    }
}
代码github地址
https://github.com/nextbang/unlock_queue
参考文章
http://coolshell.cn/articles/8239.html
http://hellosure.iteye.com/blog/1126541
http://shanshanpt.github.io/2016/11/01/disruptor.html