前言
在开发中如果碰到需要执行一些耗时比较长的任务,但是又要保证任务不能丢失,比如执行过程中由于某种原因app发生了crash,需要在下次app启动后的适当时机重新执行任务的情况,该如何解决?
解决方案
- 假设我们需要在后台执行的任务类型有上传图片的任务A,发送聊天消息的任务B,视频编辑处理的任务C,所有耗时比较久并且重要性很高,需要保证不能丢失的任务都可以抽象为一个TaskModel。这个TaskModel保留了任务执行需要的基本信息,以及用于队列管理的必要属性。方案的主要思路就是维护一个用于管理TaskModel的串行任务队列TaskQueueService(后面简称为TaskQueue),这个TaskQueue结合数据库对外提供管理TaskModel的各类方法,比如AddTask等,在一个任务需要被执行时,就会通过TaskQueue的add方法添加到队列中,并且同时将TaskModel持久化到数据库中,保证任务不会丢失。然后TaskQueue根据自己的某种规则从数据库中取出TaskModel,对任务进行调度执行。
类结构说明
-
我们抽象出任务的基类为TaskModel,需要执行的具体任务类型A、B、C为继承TaskModel的类TaskModelA、TaskModelB、TaskModelC,然后在各自的类中添加业务需要的基本信息,以及根据需要重写基类的方法。TaskModel结构如下类图所示:
taskID为自增Id,当task被添加到队列,写入数据库中时会自动加1。
status表示任务执行的状态,是一个枚举值,枚举类型分为init、running、suspend、finish、fail、remove六种状态,在taskmodel刚被写入数据中时是init的状态,在taskModel执行失败后会将状态置为fail。
priority表示Task的优先级,默认值为low,在taskQueue中可以根据task的优先级对高优先级的任务优先调度执行。
customID用于对task做某种标记,方便从数据库查询。
className存储具体任务的类型如TaskModelA。
data内存储TaskModel归档后的data数据
runCount用于存储task执行的次数,可以用于控制重试次数
run()方法是子类继承TaskModel后必须实现的方法,里面写任务执行的逻辑
prepareForAddToQueue()方法是在run之前会执行的方法,如果重写了该方法,那么在taskQueue执行task之前会先调用prepare方法,返回为yes才会继续执行task的run方法,prepare方法可以用于对task的一些校验。
retryNextTime()方法通过调用taskQueue的retryTask方法将自身的status重置为init状态,那么在taskQueue执行接下来的任务时,就会重新执行到该task。
suspend()方法是将自身的status改为suspend挂起状态,那么taskQueue在执行接下来的task时,由于只会取status为init的task执行,就不会执行到suspend状态的task。当需要重新执行已挂起的task时,调用retry方法就可以重新将该task添加到执行队列中。
-
TaskQueueService是我们维护的任务队列,它是一个单例对象。TaskQueueService的结构如下所示:
taskSignal用于在task执行改变状态时对外发送信号,在对应的controller中可以通过taskSignal传出的task做一些ui或者业务逻辑。
runningTask表示当前队列正在执行的taskModel
suspendTasks保存了所有被挂起的taskModel
- runNextTask()方法为TaskQueue最核心的方法,该方法中用异步执行通过某种规则从数据库中取出的最合适的taskModel。
架构图
- 如上图所示,假设所有的task都为相同优先级,TaskQueue的runNextTask方法中取从数据库筛选出来status为init状态并且根据taskId排序,拿到最先进入的一个task去执行。在task执行完成后,又会触发runNextTask方法,从而继续从数据库读取下一个最合适的taskModel去执行任务。
实现
TaskModel
- 首先TaskModel是需要存储在数据库中的,结合上一章WCDB的使用,我们将TaskModel继承自RSModel类,以支持数据库写入。TaskModel的.h文件十分简洁,主要是一些关键属性的暴露和task操作方法的抽象:
#import "RSModel.h"
typedef NS_ENUM(NSInteger ,RSTaskQueueTaskModelStatus) {
RSTaskQueueTaskModelStatusInit,
RSTaskQueueTaskModelStatusRunning,
RSTaskQueueTaskModelStatusSuspend,
RSTaskQueueTaskModelStatusFinish,
RSTaskQueueTaskModelStatusFail,
RSTaskQueueTaskModelStatusRemove,
};
typedef NS_ENUM(NSInteger ,RSTaskQueueTaskModelPriority) {
RSTaskQueueTaskModelPriorityLow,
RSTaskQueueTaskModelPriorityMiddle,
RSTaskQueueTaskModelPriorityHigh,
};
@interface RSTaskQueueTaskModel : RSModel
@property (nonatomic, assign) NSInteger taskId;
@property (nonatomic, assign) RSTaskQueueTaskModelStatus status;
@property (nonatomic, assign) RSTaskQueueTaskModelPriority priority;
@property (nonatomic, strong) NSString *customId;
@property (nonatomic, strong) NSString *customType;
@property (nonatomic, strong) NSString *className;
@property (nonatomic, strong) NSData *data;
@property (nonatomic, assign) NSInteger runCount;
-(void)run;//任务运行主入口,子类需要实现它
-(void)suspend;//执行异步操作时调用,可以挂起当前任务,防止任务队列被阻塞
-(void)pop;//任务执行成功需要显式调用,将任务从队列中移除
-(void)fail;//任务执行失败需要显式调用, 修改任务的状态
-(void)retryNextTime;//重新入队,等待重试
-(BOOL)prepareForAddToQueue;//任务被插入任务队列前,自动调用,子类可以重载它。返回失败则任务不用被加入任务队列
-(BOOL)remove;//从任务队列中移除任务
@end
TaskModel的.m文件需要定义类文件中绑定到数据库表的字段以及主键的设置、默认值的设置以及约束等。
#import "RSTaskQueueTaskModel+WCTTableCoding.h"
#import "RSTaskQueueTaskModel.h"
#import <WCDB/WCDB.h>
#import "RSDBService.h"
#import "RSTaskQueueService.h"
#import <YYModel.h>
@implementation RSTaskQueueTaskModel
WCDB_IMPLEMENTATION(RSTaskQueueTaskModel)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, taskId)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, customId)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, customType)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, className)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, data)
WCDB_SYNTHESIZE(RSTaskQueueTaskModel, runCount)
WCDB_SYNTHESIZE_DEFAULT(RSTaskQueueTaskModel, status, RSTaskQueueTaskModelStatusInit)
WCDB_SYNTHESIZE_DEFAULT(RSTaskQueueTaskModel, priority, RSTaskQueueTaskModelPriorityLow)
WCDB_PRIMARY_ASC_AUTO_INCREMENT(RSTaskQueueTaskModel, taskId)
//设置status和priority的默认值,以及taskId自增
-(instancetype)init {
self = [super init];
if (self) {
static dispatch_once_t token;
dispatch_once(&token, ^{
[RSTaskQueueTaskModel createDBTable];
});
self.isAutoIncrement = YES;
self.runCount = 0;
}
return self;
}
-(instancetype)initWithCoder:(NSCoder *)aDecoder {
self = [super initWithCoder:aDecoder];
if (self) {
}
return self;
}
+(void)createDBTable {
if ([[RSDBService db] createTableAndIndexesOfName:NSStringFromClass([RSTaskQueueTaskModel class]) withClass:[RSTaskQueueTaskModel class]]) {
NSLog(@"creat table RSTaskQueueTaskModel success");
} else {
NSLog(@"creat table RSTaskQueueTaskModel fail");
}
}
-(BOOL)prepareForAddToQueue {
self.className = NSStringFromClass([self class]);
self.data = [self yy_modelToJSONData];
//这里使用YYModel将model转换为data存入数据库
return YES;
}
-(void)run {
}
-(void)suspend {
[[RSTaskQueueService shareInstance] suspendTask:self];
}
-(void)pop {
[[RSTaskQueueService shareInstance] popTask:self];
}
-(void)retryNextTime {
[[RSTaskQueueService shareInstance] retryTask:self];
}
-(void)fail {
[[RSTaskQueueService shareInstance] failTask:self];
}
-(BOOL)remove {
return [[RSTaskQueueService shareInstance] removeTask:self];
}
//以上的操作方法实现实际上就是调用TaskQueue的方法,传入参数为本身,所以重点还是在TaskQueue对任务的调度。TaskModel只是一个支持写入数据库的对Task抽象的Model
@end
TaskQueueService
TaskQueueService的.h基本和类图中的属性方法一致,按照实际需求多写了几个从数据库查询taskModel的接口。
#import <Foundation/Foundation.h>
#import "RSTaskQueueTaskModel.h"
@interface RSTaskQueueService : NSObject
@property (nonatomic, strong) RACSubject *taskSignal;
@property (nonatomic, strong) RSTaskQueueTaskModel *runingTask;
@property (nonatomic, strong) NSMutableArray<RSTaskQueueTaskModel *> *suspendTasks;
+(RSTaskQueueService *)shareInstance;
-(BOOL)addTask:(RSTaskQueueTaskModel *)task;
-(BOOL)retryTask:(RSTaskQueueTaskModel *)task;
-(void)runNextTask;
-(void)popTask:(RSTaskQueueTaskModel *)task;
-(BOOL)failTask:(RSTaskQueueTaskModel *)task;
-(BOOL)removeTask:(RSTaskQueueTaskModel *)task;
//-(void)finishTaskWaitForNextTime:(RSTaskQueueTaskModel *)task;
-(BOOL)suspendTask:(RSTaskQueueTaskModel *)task;
-(void)resetAllTasks;
-(void)retryAllFailTasks;
- (BOOL)deleteTaskWithClassName:(NSString *)className customId:(NSString*)customId;
-(NSArray<RSTaskQueueTaskModel *>*)getTasksClassName:(NSString *)className customId:(NSString*)customId;
-(NSArray<RSTaskQueueTaskModel *>*)getTasksClassName:(NSString *)className status:(RSTaskQueueTaskModelStatus)status;
-(NSArray<RSTaskQueueTaskModel *>*)getTasksClassName:(NSString *)className;
-(BOOL)isAllTaskFinish;
@end
在.m中,需要实现taskQueue的单例初始化方法,通过调用[RSTaskQueueService shareInstance]来获取taskQueue。
+(RSTaskQueueService *)shareInstance {
static dispatch_once_t once;
static id sharedInstance;
dispatch_once(&once, ^{
sharedInstance = [[RSTaskQueueService alloc] init];
});
return sharedInstance;
}
-(instancetype)init {
self = [super init];
if (self) {
self.runingTask = nil;
self.taskSignal = [RACSubject subject];
self.suspendTasks = [[NSMutableArray alloc] init];
}
return self;
}
在一个任务TaskModel需要被执行是,会调用TaskQueue的addTask方法,如果该任务是符合执行规定的,那么就会将任务写入TaskModel数据库,同时对外发送一个taskSignal信号,以同步当前task的状态,然后调用runNextTask方法去做任务的调度执行。
-(BOOL)addTask:(RSTaskQueueTaskModel *)task {
if ([task prepareForAddToQueue]) {
@synchronized(self) {
BOOL result = [[RSDBService db] insertObject:task into:NSStringFromClass([RSTaskQueueTaskModel class])];
if (result) {
[self.taskSignal sendNext:task];
[self runNextTask];
}
return result;
}
}
return NO;
}
在runNextTask方法中,首先是开辟了一个异步线程,然后判断是否有task正在执行,如果有的话就返回,没有的话从数据库中读取最合适的taskModel来跑。如何判断是否是最合适的筛选条件,也就是task调度的方法了。在这里选取的是status为init并且taskId最小,也就是最先加入的任务来执行。
-(void)runNextTask {
[RSUtils dispatch_async_background:^{
@synchronized(self) {
if (self.runingTask) {
//有任务在执行中
return;
}
NSArray *tmp = [[RSDBService db] getObjectsOfClass:[RSTaskQueueTaskModel class] fromTable:NSStringFromClass([RSTaskQueueTaskModel class]) where:RSTaskQueueTaskModel.status==RSTaskQueueTaskModelStatusInit orderBy:RSTaskQueueTaskModel.taskId.order(WCTOrderedAscending)];
if ([tmp count] > 0) {
RSLogInfo(@"RSTaskQueueService task count:%ld", tmp.count);
RSTaskQueueTaskModel *task = [tmp firstObject];
Class clazz = NSClassFromString(task.className);
RSTaskQueueTaskModel * object = [[clazz alloc] init];
[object yy_modelSetWithJSON:task.data];
[object setTaskId:task.taskId];
[object setStatus:RSTaskQueueTaskModelStatusRunning];
object.runCount = object.runCount + 1;
BOOL dbResult = [[RSDBService db] updateRowsInTable:DBTableName(RSTaskQueueTaskModel) onProperties:{RSTaskQueueTaskModel.status, RSTaskQueueTaskModel.runCount} withObject:object where:RSTaskQueueTaskModel.taskId==object.taskId];
if (dbResult) {
self.runingTask = object;
[self.taskSignal sendNext:object];
RSLogInfo(@"RSTaskQueueService run taskId:%d", object.taskId);
[object run];
} else {
RSLogError(@"RSTaskQueueService update task status fail taskId:%d", object.taskId);
}
} else {
RSLogInfo(@"RSTaskQueueService is empty");
}
}
}];
}
resetAllTasks是将所有未完成的任务重置的方法,主要用于app初始化的时候调用,以重置之前未执行成功的tasks。
-(void)resetAllTasks {
@synchronized(self) {
self.runingTask = nil;
[self.suspendTasks removeAllObjects];
RSTaskQueueTaskModel *task = [RSTaskQueueTaskModel new];
task.status = RSTaskQueueTaskModelStatusInit;
[[RSDBService db] updateRowsInTable:NSStringFromClass([RSTaskQueueTaskModel class]) onProperty:RSTaskQueueTaskModel.status withObject:task where:RSTaskQueueTaskModel.status==RSTaskQueueTaskModelStatusRunning||RSTaskQueueTaskModel.status==RSTaskQueueTaskModelStatusSuspend];
}
}
retryAllFailTasks方法将所有执行失败的task重新执行,原理还是将status状态由fail改为init状态写入数据库,下一次taskQueue开始run的时候就会考虑执行到这些task了。
-(void)retryAllFailTasks {
NSArray *tmp = [[RSDBService db] getObjectsOfClass:[RSTaskQueueTaskModel class] fromTable:NSStringFromClass([RSTaskQueueTaskModel class]) where:RSTaskQueueTaskModel.status==RSTaskQueueTaskModelStatusFail orderBy:RSTaskQueueTaskModel.taskId.order(WCTOrderedAscending)];
for (RSTaskQueueTaskModel *task in tmp) {
Class clazz = NSClassFromString(task.className);
RSTaskQueueTaskModel * object = [[clazz alloc] init];
[object yy_modelSetWithJSON:task.data];
[object setTaskId:task.taskId];
[object setStatus:task.status];
[object retryNextTime];
}
}
其他的方法就不列出了,代码都差不多,核心点还是在于结合数据库修改taskModel的状态,然后taskQueue在执行runNextTask的时候就会自动执行最合适的task。
结束
其实整个方案的架构并不复杂,通俗易懂,主要是在于实现以及解决应用中的问题。方案是可以根据需求随时修改的,没有最好的方案,只有最合适的方案。当然文中的taskQueueService还有更多优化的空间,有不合理的地方大家可以提出,一起交流,共同进步。