本地文件队列-异步隔离设计

常见的异步方式:

创建异步线程

每个新创建一个线程来执行异步任务,任务结束线程也终止。
线程的创建成本比较大,不建议使用。

使用Queue,producer/consumer方式

在内部创建一个Queue,worker线程直接将异步处理的任务放入queue,一个或多个异步线程从queue中消费并执行任务。

线程池

用线程池来替换每次创建线程,减少线程创建的成本,线程被复用,一次创建多处使用。

和使用Queue类似,也是通过BlockingQueue实现,但策略上更复杂,向线程池提交Callable&Runnable任务,由线程池调度执行。

参考:java.util.concurrent.ThreadPoolExecutor#execute

spring @Async注解

通过注解来来简化了异步编程,只需要在需要异步的方法上使用@Async注解即可。
其本质也是在线程池功能上扩展的,将异步执行方法封装为一个Callable,然后提交给线程池。

org.springframework.aop.interceptor.AsyncExecutionInterceptor:

public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }

        Callable<Object> task = new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    Object result = invocation.proceed();
                    if (result instanceof Future) {
                        return ((Future<?>) result).get();
                    }
                }
                catch (ExecutionException ex) {
                    handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
                }
                catch (Throwable ex) {
                    handleError(ex, userDeclaredMethod, invocation.getArguments());
                }
                return null;
            }
        };

        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

详细参考:http://docs.spring.io/spring/docs/current/spring-framework-reference/htmlsingle/#scheduling-annotation-support-async

背景和场景

产生的背景

在项目中使用了@Async来执行异步任务,但在线上运行时出现了一次OOM的故障。通过分析发现是,线程池队列设置的比较大,当时的JVM内存给的也比较少(2048M),异步任务方法参数中传了大量的数据,任务执行被后端数据库阻塞(后端数据库变慢),最后导致缓存了大量的数据被放到线程池队列。其实JVM内存配置合适,线程池队列数合适,并配置合适的RejectedExecutionHandler策略。

产生这个组件,1) 旨在替换内存队列的异步方式 2) 用来方便扩展集成分布式MQ

异步隔离

除了上面背景和场景,开发这个组件的另一个初衷就是有效异步隔离和作为一个降级备份方案。
也是主要实现了文件队列方式的一个原因。

当我们使用分布式MQ时,难免分布式MQ宕机或者其他网络等原因导致不能生产消息,或者阻塞影响到本身的业务,出现这种情况时可以降级到本地文件队列。

本地文件队列的优点是速度快,只要文件系统不出问题可以认为不会被阻塞。缺点是本地文件队列生产的消息必须自己来消费,出现故障时消息消费会延迟,文件系统的损坏也会导致消息丢失。主要看使用的姿势,更看重哪一方面了。

基本架构设计思路

采用producer/consumer生产消费设计模式。

参考了@Async思路,定义一个注解@AsyncExecutable, 使用Spring拦截器拦截注解了@AsyncExecutable的方法,可以使用AOP或者BeanPostProcessor来应用拦截器。

producer

拦截器拦截到@AsyncExecutable方法后,将该方法所有的参数和方法信息作为Message,并序列化Message,序列化采用Kryo或者Json,将序列化后的信息放入队列。

class Message {

     String beanName;
     String klassName;
     String methodName;
     Class<?>[] argTypes;
     Object[] args;
     boolean hasTransactional = true;
     
}

consumer

有1个调度主线程和worker线程组成,主线程负责从队列中拉取消息,并分发到worker线程,worker线程采用线程池,使用了spring提供的TaskExecutor。

worker线程反序列化消息为Message对象,并根据Message中的方法信息在spring ApplicationContext中查找到spring 管理的bean,并通过反射来调用。

队列

队列抽象了一个BlockableQueue, 通过BlockableQueue具体实现来扩展,可以是内存,文件,或分布式MQ。

 public interface BlockableQueue<T> {

    String DefaultQueueName = "fileQueue";

    /**
     * push一个消息到队列
     *
     * @param t
     * @return
     */
    boolean offer(T t);

    /**
     * 从队列pop一个消息,如果队列中无可用消息,则阻塞
     *
     * @return
     * @throws InterruptedException
     */
    T take() throws InterruptedException;

    /**
     * 从队列pop一个消息,如果队列中无可用消息,则返回null
     *
     * @return
     */
    T poll();

    /**
     * 队列中消息数量
     *
     * @return
     */
    int size();
}

通用的默认实现:

 
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class DefaultBlockableQueue implements BlockableQueue<byte[]> {
    final Lock lock = new ReentrantLock();
    final Condition notEmpty = lock.newCondition();
    private Queue<byte[]> queue = null;


    public FileBlockableQueue(Queue<byte[]> queue) {
        this.queue = queue;
    }

    @Override
    public boolean offer(byte[] bytes) {
        lock.lock();
        try {
            boolean v = queue.offer(bytes);
            notEmpty.signal();
            return v;
        } finally {
            lock.unlock();
        }
    }


    @Override
    public byte[] take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == 0) {
                notEmpty.await();
            }
            byte[] bytes = queue.poll();
            return bytes;
        } finally {
            lock.unlock();
        }

    }

    @Override
    public byte[] poll() {
        return queue.poll();
    }

    @Override
    public int size() {
        return queue.size();
    }
}

本文实现了一个文件队列,采用去哪儿文件队列实现,这是一个fork:https://github.com/tietang/fqueue

对编程模型来说不用关心异步细节,只需要在需要异步的方法上注解@AsyncExecutable即可。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容