简介
- 本文介绍一个基于 RxJava 封装的工具类,用于对集合中的元素做并发处理
- rxjava文档地址
依赖
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
支持四类function
- 传入数据List,对每个元素单独并发处理,返回合并后的List(不保证顺序)
- 传入数据List,对每个元素单独并发处理,返回合并后的Map
- 传入数据List和分组大小,按组并发处理,返回合并后的List(不保证顺序)
- 传入数据List和分组大小,按组并发处理,返回合并后的Map
function接口
/**
 * Callback that converts one batch of input elements into a list of results.
 *
 * @param <I> element type of the incoming batch
 * @param <R> element type of the produced list
 */
public interface List2List<I, R> {

    /**
     * Processes one batch.
     *
     * @param list the batch of input elements
     * @return the results produced for this batch
     */
    List<R> call(List<I> list);
}
/**
 * Callback that converts one batch of input elements into a map of results.
 *
 * @param <I> element type of the incoming batch
 * @param <K> key type of the produced map
 * @param <V> value type of the produced map
 */
public interface List2Map<I, K, V> {

    /**
     * Processes one batch.
     *
     * @param list the batch of input elements
     * @return the key/value pairs produced for this batch
     */
    Map<K, V> call(List<I> list);
}
/**
 * Callback that converts a single input element into a list of results.
 *
 * @param <I> type of the input element
 * @param <R> element type of the produced list
 */
public interface Object2List<I, R> {

    /**
     * Processes one element.
     *
     * @param i the input element
     * @return the results produced for this element
     */
    List<R> call(I i);
}
/**
 * Callback that converts a single input element into a map of results.
 *
 * @param <I> type of the input element
 * @param <K> key type of the produced map
 * @param <V> value type of the produced map
 */
public interface Object2Map<I, K, V> {

    /**
     * Processes one element.
     *
     * @param i the input element
     * @return the key/value pairs produced for this element
     */
    Map<K, V> call(I i);
}
线程池及工厂方法
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func0;
import rx.schedulers.Schedulers;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
 * Singleton factory that wraps a blocking {@link Func0} in an {@link Observable} whose work is
 * executed on a shared, bounded worker pool.
 *
 * <p>Obtain the instance through {@link #INSTANCE()}.</p>
 *
 * @author : yuanchao.he
 * @version 1.0 2016-03-22
 * @since mobile-oppkit-server 1.0
 */
public class ObservableHelper {

    /**
     * Number of worker threads. The work queue below is unbounded, so a
     * {@link ThreadPoolExecutor} never grows past its core size; core and maximum size are
     * therefore deliberately identical. Twice the processor count is only a starting point and
     * should be tuned for the actual workload.
     */
    private static final int POOL_SIZE = Math.max(2, Runtime.getRuntime().availableProcessors() * 2);

    /** Idle time, in seconds, after which a worker thread is released. */
    private static final long KEEP_ALIVE_SECONDS = 5L;

    /** Pool shared by every observable created through this helper. */
    private static final ThreadPoolExecutor executor = newExecutor();

    private ObservableHelper() {
    }

    /** Initialization-on-demand holder: the instance is created on first use of INSTANCE(). */
    private static class ClassHolder {
        private static final ObservableHelper observableHelper = new ObservableHelper();
    }

    /**
     * Returns the shared helper instance.
     *
     * @return the singleton {@code ObservableHelper}
     */
    public static ObservableHelper INSTANCE() {
        return ClassHolder.observableHelper;
    }

    /**
     * Builds the shared worker pool.
     *
     * @return a pool of {@link #POOL_SIZE} threads backed by an unbounded queue
     */
    private static ThreadPoolExecutor newExecutor() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, KEEP_ALIVE_SECONDS, SECONDS,
                new LinkedBlockingQueue<Runnable>());
        // Let idle core threads terminate; otherwise the non-daemon workers keep the JVM alive
        // forever after the last task has finished.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    /**
     * Wraps {@code func} in an observable that runs it on the shared pool when subscribed.
     *
     * <p>The observable emits the single value returned by {@code func} and then completes, or
     * signals an error if {@code func} throws an {@link Exception}. The result is cached, so
     * additional subscribers receive the same outcome without {@code func} being invoked
     * again.</p>
     *
     * @param func the blocking computation to run
     * @param <T>  type of the computed value
     * @return a cached observable backed by the shared pool
     */
    public <T> Observable<T> createObservable(final Func0<T> func) {
        return Observable.create(new Observable.OnSubscribe<T>() {
            @Override
            public void call(Subscriber<? super T> subscriber) {
                final T value;
                try {
                    value = func.call();
                } catch (Exception e) {
                    subscriber.onError(e);
                    return;
                }
                // Emit outside the try block so that a failure while delivering the value can
                // never result in onError being signalled after onCompleted.
                subscriber.onNext(value);
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.from(executor)).cache();
    }
}
ObservableExecutor 并行处理器
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import rx.Observable;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
 * Runs a callback concurrently over the elements (or fixed-size groups of elements) of a list
 * and merges the partial results into a single {@link Map} or {@link List}.
 *
 * <p>Each element or group is wrapped through {@link ObservableHelper#createObservable} and the
 * calling thread blocks until every partial result has been merged. If the callback throws an
 * exception for an element or group, that element or group simply contributes nothing to the
 * result; the failure is not reported to the caller. A {@code null} partial result is treated
 * the same way.</p>
 *
 * <p>Obtain the instance through {@link #INSTANCE()}.</p>
 *
 * @author : yuanchao.he
 * @version 1.0 2016-03-22
 * @since mobile-oppkit-server 1.0
 */
public class ObservableExecutor {

    /** Group size used when the caller passes a non-positive partition size. */
    private static final int DEFAULT_PARTITION_SIZE = 10;

    private final ObservableHelper helper = ObservableHelper.INSTANCE();

    /** Initialization-on-demand holder: the instance is created on first use of INSTANCE(). */
    private static class ExecutorHolder {
        private static final ObservableExecutor executor = new ObservableExecutor();
    }

    private ObservableExecutor() {
    }

    /**
     * Returns the shared executor instance.
     *
     * @return the singleton {@code ObservableExecutor}
     */
    public static ObservableExecutor INSTANCE() {
        return ExecutorHolder.executor;
    }

    /**
     * Processes every element of {@code input} as a separate concurrent task and merges the
     * resulting maps. When two tasks produce the same key, whichever result is merged last wins.
     *
     * @param input elements to process; {@code null} or empty yields an empty map
     * @param func  callback applied to each element
     * @param <I>   input element type
     * @param <K>   result key type
     * @param <V>   result value type
     * @return the merged map (an immutable empty map when there is no input)
     */
    public <I, K, V> Map<K, V> executeObservable(List<I> input, final Object2Map<I, K, V> func) {
        if (input == null || input.isEmpty()) {
            return Collections.emptyMap();
        }
        return collectToMap(input, new Func1<I, Map<K, V>>() {
            @Override
            public Map<K, V> call(I item) {
                return func.call(item);
            }
        });
    }

    /**
     * Processes every element of {@code input} as a separate concurrent task and concatenates
     * the resulting lists. The order of the merged list is not guaranteed.
     *
     * @param input elements to process; {@code null} or empty yields an empty list
     * @param func  callback applied to each element
     * @param <I>   input element type
     * @param <R>   result element type
     * @return the merged list (an immutable empty list when there is no input)
     */
    public <I, R> List<R> executeObservable(List<I> input, final Object2List<I, R> func) {
        if (input == null || input.isEmpty()) {
            return Collections.emptyList();
        }
        return collectToList(input, new Func1<I, List<R>>() {
            @Override
            public List<R> call(I item) {
                return func.call(item);
            }
        });
    }

    /**
     * Splits {@code input} into consecutive groups of {@code partitionSize} elements, processes
     * each group as a separate concurrent task and merges the resulting maps. When two tasks
     * produce the same key, whichever result is merged last wins.
     *
     * @param input         elements to process; {@code null} or empty yields an empty map
     * @param partitionSize group size; values {@code <= 0} fall back to 10
     * @param functionRMap  callback applied to each group
     * @param <I>           input element type
     * @param <K>           result key type
     * @param <V>           result value type
     * @return the merged map (an immutable empty map when there is no input)
     */
    public <I, K, V> Map<K, V> executeObservable(List<I> input, int partitionSize,
            final List2Map<I, K, V> functionRMap) {
        if (input == null || input.isEmpty()) {
            return Collections.emptyMap();
        }
        List<List<I>> groups = Lists.partition(input, effectivePartitionSize(partitionSize));
        return collectToMap(groups, new Func1<List<I>, Map<K, V>>() {
            @Override
            public Map<K, V> call(List<I> group) {
                return functionRMap.call(group);
            }
        });
    }

    /**
     * Splits {@code input} into consecutive groups of {@code partitionSize} elements, processes
     * each group as a separate concurrent task and concatenates the resulting lists. The order
     * of the merged list is not guaranteed.
     *
     * @param input         elements to process; {@code null} or empty yields an empty list
     * @param partitionSize group size; values {@code <= 0} fall back to 10
     * @param func          callback applied to each group
     * @param <I>           input element type
     * @param <R>           result element type
     * @return the merged list (an immutable empty list when there is no input)
     */
    public <I, R> List<R> executeObservable(List<I> input, int partitionSize,
            final List2List<I, R> func) {
        if (input == null || input.isEmpty()) {
            return Collections.emptyList();
        }
        List<List<I>> groups = Lists.partition(input, effectivePartitionSize(partitionSize));
        return collectToList(groups, new Func1<List<I>, List<R>>() {
            @Override
            public List<R> call(List<I> group) {
                return func.call(group);
            }
        });
    }

    /** Replaces a non-positive partition size with the default group size. */
    private static int effectivePartitionSize(int requested) {
        return requested > 0 ? requested : DEFAULT_PARTITION_SIZE;
    }

    /** Runs {@code worker} on every task concurrently and merges the partial maps into one. */
    private <T, K, V> Map<K, V> collectToMap(List<T> tasks, final Func1<T, Map<K, V>> worker) {
        Map<K, V> merged = Maps.newHashMap();
        return Observable.from(tasks).flatMap(new Func1<T, Observable<Map<K, V>>>() {
            @Override
            public Observable<Map<K, V>> call(final T task) {
                return helper.createObservable(new Func0<Map<K, V>>() {
                    @Override
                    public Map<K, V> call() {
                        return worker.call(task);
                    }
                }).onErrorReturn(new Func1<Throwable, Map<K, V>>() {
                    @Override
                    public Map<K, V> call(Throwable error) {
                        // Best effort: a failed task contributes nothing to the result.
                        return Collections.emptyMap();
                    }
                });
            }
        }).reduce(merged, new Func2<Map<K, V>, Map<K, V>, Map<K, V>>() {
            @Override
            public Map<K, V> call(Map<K, V> acc, Map<K, V> part) {
                // A callback may return null; skip it instead of failing the whole batch.
                if (part != null) {
                    acc.putAll(part);
                }
                return acc;
            }
        }).toBlocking().first();
    }

    /** Runs {@code worker} on every task concurrently and concatenates the partial lists. */
    private <T, R> List<R> collectToList(List<T> tasks, final Func1<T, List<R>> worker) {
        List<R> merged = Lists.newArrayList();
        return Observable.from(tasks).flatMap(new Func1<T, Observable<List<R>>>() {
            @Override
            public Observable<List<R>> call(final T task) {
                return helper.createObservable(new Func0<List<R>>() {
                    @Override
                    public List<R> call() {
                        return worker.call(task);
                    }
                }).onErrorReturn(new Func1<Throwable, List<R>>() {
                    @Override
                    public List<R> call(Throwable error) {
                        // Best effort: a failed task contributes nothing to the result.
                        return Collections.emptyList();
                    }
                });
            }
        }).reduce(merged, new Func2<List<R>, List<R>, List<R>>() {
            @Override
            public List<R> call(List<R> acc, List<R> part) {
                // A callback may return null; skip it instead of failing the whole batch.
                if (part != null) {
                    acc.addAll(part);
                }
                return acc;
            }
        }).toBlocking().first();
    }
}
demos
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
/**
 * Usage examples for the four {@code executeObservable} overloads of {@link ObservableExecutor}.
 * Every example maps the numbers 500..1000 to the decimal string of {@code n * n % 1000} and
 * prints the merged result.
 *
 * @author : yuanchao.he
 * @version 1.0 2016-03-23
 * @since mobile-oppkit-server 1.0
 */
public class Demo {

    private static ObservableExecutor observableExecutor = ObservableExecutor.INSTANCE();

    public static void main(String[] args) {
        List<Integer> numbers = Lists.newLinkedList();
        for (int n = 500; n <= 1000; n++) {
            numbers.add(n);
        }

        // One task per number; results are merged into a map keyed by the number.
        Object2Map<Integer, Integer, String> singleToMap = new Object2Map<Integer, Integer, String>() {
            @Override
            public Map<Integer, String> call(Integer integer) {
                Map<Integer, String> entry = Maps.newHashMap();
                entry.put(integer, squareMod1000(integer));
                return entry;
            }
        };
        Map<Integer, String> result1 = observableExecutor.executeObservable(numbers, singleToMap);
        System.out.println(result1);

        // One task per number; results are merged into a list.
        Object2List<Integer, String> singleToList = new Object2List<Integer, String>() {
            @Override
            public List<String> call(Integer integer) {
                List<String> single = Lists.newLinkedList();
                single.add(squareMod1000(integer));
                return single;
            }
        };
        List<String> result2 = observableExecutor.executeObservable(numbers, singleToList);
        System.out.println(result2);

        // Numbers are split into groups of 2, one task per group; results are merged into a map.
        List2Map<Integer, Integer, String> groupToMap = new List2Map<Integer, Integer, String>() {
            @Override
            public Map<Integer, String> call(List<Integer> list) {
                Map<Integer, String> entries = Maps.newHashMap();
                for (Integer value : list) {
                    entries.put(value, squareMod1000(value));
                }
                return entries;
            }
        };
        Map<Integer, String> result3 = observableExecutor.executeObservable(numbers, 2, groupToMap);
        System.out.println(result3);

        // Numbers are split into groups of 2, one task per group; results are merged into a list.
        List2List<Integer, String> groupToList = new List2List<Integer, String>() {
            @Override
            public List<String> call(List<Integer> list) {
                List<String> converted = Lists.newLinkedList();
                for (Integer value : list) {
                    converted.add(squareMod1000(value));
                }
                return converted;
            }
        };
        List<String> result4 = observableExecutor.executeObservable(numbers, 2, groupToList);
        System.out.println(result4);
    }

    /** Formats {@code value * value % 1000} as a decimal string. */
    private static String squareMod1000(Integer value) {
        return String.valueOf(value * value % 1000);
    }
}