Preface
This article takes a look at the _maxTransactionActive setting of the storm trident spout.
MasterBatchCoordinator
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java
    TreeMap<Long, TransactionStatus> _activeTx = new TreeMap<Long, TransactionStatus>();

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
        for(String spoutId: _managedSpoutIds) {
            _states.add(TransactionalState.newCoordinatorState(conf, spoutId));
        }
        _currTransaction = getStoredCurrTransaction();

        _collector = collector;
        Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
        if(active==null) {
            _maxTransactionActive = 1;
        } else {
            _maxTransactionActive = active.intValue();
        }
        _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);

        for(int i=0; i<_spouts.size(); i++) {
            String txId = _managedSpoutIds.get(i);
            _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
        }
        LOG.debug("Opened {}", this);
    }

    public void nextTuple() {
        sync();
    }

    private void sync() {
        // note that sometimes the tuples active may be less than max_spout_pending, e.g.
        // max_spout_pending = 3
        // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
        // and there won't be a batch for tx 4 because there's max_spout_pending tx active
        TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
        if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
            LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
        }

        if(_active) {
            if(_activeTx.size() < _maxTransactionActive) {
                Long curr = _currTransaction;
                for(int i=0; i<_maxTransactionActive; i++) {
                    if(!_activeTx.containsKey(curr) && isReady(curr)) {
                        // by using a monotonically increasing attempt id, downstream tasks
                        // can be memory efficient by clearing out state for old attempts
                        // as soon as they see a higher attempt id for a transaction
                        Integer attemptId = _attemptIds.get(curr);
                        if(attemptId==null) {
                            attemptId = 0;
                        } else {
                            attemptId++;
                        }
                        _attemptIds.put(curr, attemptId);
                        for(TransactionalState state: _states) {
                            state.setData(CURRENT_ATTEMPTS, _attemptIds);
                        }
                        TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                        final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                        _activeTx.put(curr, newTransactionStatus);
                        _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                        LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);
                        _throttler.markEvent();
                    }
                    curr = nextTransactionId(curr);
                }
            }
        }
    }

    private static class TransactionStatus {
        TransactionAttempt attempt;
        AttemptStatus status;

        public TransactionStatus(TransactionAttempt attempt) {
            this.attempt = attempt;
            this.status = AttemptStatus.PROCESSING;
        }

        @Override
        public String toString() {
            return attempt.toString() + " <" + status.toString() + ">";
        }
    }

    private static enum AttemptStatus {
        PROCESSING,
        PROCESSED,
        COMMITTING
    }
- MasterBatchCoordinator sets _maxTransactionActive in its open method from Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending); the configuration defaults this to null, in which case _maxTransactionActive is set to 1 (see the sketch below)
- nextTuple, which delegates to sync, caps the number of batches processed concurrently; the next batch can only start once a batch in _activeTx has succeeded or failed
- _activeTx is a TreeMap keyed by transactionId whose values are TransactionStatus objects; a TransactionStatus holds a TransactionAttempt and an AttemptStatus, and AttemptStatus has three states: PROCESSING, PROCESSED, and COMMITTING
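As a minimal sketch of how a topology would cap the number of concurrently active transactions: the value 3 below is an arbitrary example, and Config.setMaxSpoutPending is the standard storm-core setter for topology.max.spout.pending.

    import org.apache.storm.Config;

    public class MaxPendingExample {
        public static void main(String[] args) {
            Config conf = new Config();
            // sets topology.max.spout.pending; MasterBatchCoordinator reads this
            // as _maxTransactionActive, i.e. at most 3 transactions (batches) in flight
            conf.setMaxSpoutPending(3);
            // if left unset, the config value is null and _maxTransactionActive falls back to 1
        }
    }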
TridentSpoutCoordinator
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutCoordinator.java
    RotatingTransactionalState _state;

    public void prepare(Map conf, TopologyContext context) {
        _coord = _spout.getCoordinator(_id, conf, context);
        _underlyingState = TransactionalState.newCoordinatorState(conf, _id);
        _state = new RotatingTransactionalState(_underlyingState, META_DIR);
    }

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);

        if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            _state.cleanupBefore(attempt.getTransactionId());
            _coord.success(attempt.getTransactionId());
        } else {
            long txid = attempt.getTransactionId();
            Object prevMeta = _state.getPreviousState(txid);
            Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
            _state.overrideState(txid, meta);
            collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
        }
    }
- TridentSpoutCoordinator's execute method stores and retrieves batch meta keyed by txid, then emits Values(attempt, meta) to the downstream TridentBoltExecutor (a coordinator sketch follows)
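To make initializeTransaction concrete, here is a hedged sketch of a custom ITridentSpout.BatchCoordinator; RangeCoordinator and OffsetRange are hypothetical names, but the interface and callbacks are the ones _coord implements above.

    import org.apache.storm.trident.spout.ITridentSpout;

    // hypothetical metadata type: the half-open range of offsets a batch covers
    class OffsetRange implements java.io.Serializable {
        final long start, end;
        OffsetRange(long start, long end) { this.start = start; this.end = end; }
    }

    public class RangeCoordinator implements ITridentSpout.BatchCoordinator<OffsetRange> {
        private static final long BATCH_SIZE = 100;

        @Override
        public OffsetRange initializeTransaction(long txid, OffsetRange prevMeta, OffsetRange currMeta) {
            // prevMeta is what _state.getPreviousState(txid) returned above,
            // so each batch's meta is derived from its predecessor's
            long start = (prevMeta == null) ? 0 : prevMeta.end;
            return new OffsetRange(start, start + BATCH_SIZE);
        }

        @Override
        public boolean isReady(long txid) { return true; }

        @Override
        public void success(long txid) { }

        @Override
        public void close() { }
    }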
TridentBoltExecutor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java
    RotatingMap<Object, TrackedBatch> _batches;

    public void execute(Tuple tuple) {
        if(TupleUtils.isTick(tuple)) {
            long now = System.currentTimeMillis();
            if(now - _lastRotate > _messageTimeoutMs) {
                _batches.rotate();
                _lastRotate = now;
            }
            return;
        }
        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
        if(batchGroup==null) {
            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
            _coordCollector.setCurrBatch(null);
            _bolt.execute(null, tuple);
            _collector.ack(tuple);
            return;
        }
        IBatchID id = (IBatchID) tuple.getValue(0);
        //get transaction id
        //if it already exists and attempt id is greater than the attempt there
        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
        //        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
        //            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
        //                    + " (" + _batches.size() + ")" +
        //                    "\ntuple: " + tuple +
        //                    "\nwith tracked " + tracked +
        //                    "\nwith id " + id +
        //                    "\nwith group " + batchGroup
        //                    + "\n");
        //
        //        }
        //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());

        // this code here ensures that only one attempt is ever tracked for a batch, so when
        // failures happen you don't get an explosion in memory usage in the tasks
        if(tracked!=null) {
            if(id.getAttemptId() > tracked.attemptId) {
                _batches.remove(id.getId());
                tracked = null;
            } else if(id.getAttemptId() < tracked.attemptId) {
                // no reason to try to execute a previous attempt than we've already seen
                return;
            }
        }

        if(tracked==null) {
            tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
        _coordCollector.setCurrBatch(tracked);

        //System.out.println("TRACKED: " + tracked + " " + tuple);

        TupleType t = getTupleType(tuple, tracked);
        if(t==TupleType.COMMIT) {
            tracked.receivedCommit = true;
            checkFinish(tracked, tuple, t);
        } else if(t==TupleType.COORD) {
            int count = tuple.getInteger(1);
            tracked.reportedTasks++;
            tracked.expectedTupleCount+=count;
            checkFinish(tracked, tuple, t);
        } else {
            tracked.receivedTuples++;
            boolean success = true;
            try {
                _bolt.execute(tracked.info, tuple);
                if(tracked.condition.expectedTaskReports==0) {
                    success = finishBatch(tracked, tuple);
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
            }
            if(success) {
                _collector.ack(tuple);
            } else {
                _collector.fail(tuple);
            }
        }
        _coordCollector.setCurrBatch(null);
    }
    public static class TrackedBatch {
        int attemptId;
        BatchInfo info;
        CoordCondition condition;
        int reportedTasks = 0;
        int expectedTupleCount = 0;
        int receivedTuples = 0;
        Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
        boolean failed = false;
        boolean receivedCommit;
        Tuple delayedAck = null;

        public TrackedBatch(BatchInfo info, CoordCondition condition, int attemptId) {
            this.info = info;
            this.condition = condition;
            this.attemptId = attemptId;
            receivedCommit = condition.commitStream == null;
        }

        @Override
        public String toString() {
            return ToStringBuilder.reflectionToString(this);
        }
    }
    private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
        if(tracked.failed) {
            failBatch(tracked);
            _collector.fail(tuple);
            return;
        }
        CoordCondition cond = tracked.condition;
        boolean delayed = tracked.delayedAck==null &&
                (cond.commitStream!=null && type==TupleType.COMMIT
                        || cond.commitStream==null);
        if(delayed) {
            tracked.delayedAck = tuple;
        }
        boolean failed = false;
        if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
            if(tracked.receivedTuples == tracked.expectedTupleCount) {
                finishBatch(tracked, tuple);
            } else {
                //TODO: add logging that not all tuples were received
                failBatch(tracked);
                _collector.fail(tuple);
                failed = true;
            }
        }
        if(!delayed && !failed) {
            _collector.ack(tuple);
        }
    }

    private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
        boolean success = true;
        try {
            _bolt.finishBatch(tracked.info);
            String stream = COORD_STREAM(tracked.info.batchGroup);
            for(Integer task: tracked.condition.targetTasks) {
                _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
            }
            if(tracked.delayedAck!=null) {
                _collector.ack(tracked.delayedAck);
                tracked.delayedAck = null;
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
            success = false;
        }
        _batches.remove(tracked.info.batchId.getId());
        return success;
    }

    private void failBatch(TrackedBatch tracked, FailedException e) {
        if(e!=null && e instanceof ReportedFailedException) {
            _collector.reportError(e);
        }
        tracked.failed = true;
        if(tracked.delayedAck!=null) {
            _collector.fail(tracked.delayedAck);
            tracked.delayedAck = null;
        }
    }
- TridentBoltExecutor uses a RotatingMap (_batches) to store batch information, keyed by txid with TrackedBatch values
- when _bolt.execute(tracked.info, tuple) is called, a BatchInfo is passed along; its state field is the result of _bolt.initBatchState(batchGroup, id), which is invoked only when _batches has no entry for that txid yet, i.e. when the TrackedBatch is first created
- checkFinish likewise decides based on the batch's TrackedBatch; finishBatch calls _bolt.finishBatch(tracked.info), passing the BatchInfo along, and failBatch also operates on the batch's TrackedBatch (the attempt bookkeeping is condensed in the sketch below)
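The attempt handling can be condensed into the following simplified model (plain Java, not Storm code): at most one attempt per transaction is tracked, a newer attempt evicts the stale entry, and older attempts are dropped.

    import java.util.HashMap;
    import java.util.Map;

    // simplified model of TridentBoltExecutor's attempt bookkeeping
    class AttemptTracker {
        private final Map<Long, Integer> trackedAttempts = new HashMap<>();

        // returns true if a tuple of (txid, attemptId) should be processed
        boolean shouldProcess(long txid, int attemptId) {
            Integer tracked = trackedAttempts.get(txid);
            if (tracked != null) {
                if (attemptId > tracked) {
                    trackedAttempts.remove(txid);  // newer attempt: discard stale state
                } else if (attemptId < tracked) {
                    return false;                  // older attempt: ignore it
                }
            }
            trackedAttempts.put(txid, attemptId);
            return true;
        }
    }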
BatchInfo
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/BatchInfo.java
    public class BatchInfo {
        public IBatchID batchId;
        public Object state;
        public String batchGroup;

        public BatchInfo(String batchGroup, IBatchID batchId, Object state) {
            this.batchGroup = batchGroup;
            this.batchId = batchId;
            this.state = state;
        }
    }
- BatchInfo carries the batchId, state, and batchGroup of a batch
TridentSpoutExecutor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutExecutor.java
    public Object initBatchState(String batchGroup, Object batchId) {
        return null;
    }

    public void execute(BatchInfo info, Tuple input) {
        // there won't be a BatchInfo for the success stream
        TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
        if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
            if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
                ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
                _activeBatches.remove(attempt.getTransactionId());
            } else {
                 throw new FailedException("Received commit for different transaction attempt");
            }
        } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            // valid to delete before what's been committed since
            // those batches will never be accessed again
            _activeBatches.headMap(attempt.getTransactionId()).clear();
            _emitter.success(attempt);
        } else {
            _collector.setBatch(info.batchId);
            _emitter.emitBatch(attempt, input.getValue(1), _collector);
            _activeBatches.put(attempt.getTransactionId(), attempt);
        }
    }

    public void finishBatch(BatchInfo batchInfo) {
    }
- TridentSpoutExecutor's execute method also keeps each batch's information separated by txid (an emitter sketch follows)
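For a concrete picture of emitBatch, here is a hedged sketch of an ITridentSpout.Emitter paired with the hypothetical OffsetRange meta from the coordinator sketch above; given the same txid and meta it always reproduces the same batch, which is what makes replays safe.

    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.spout.ITridentSpout;
    import org.apache.storm.trident.topology.TransactionAttempt;
    import org.apache.storm.tuple.Values;

    public class RangeEmitter implements ITridentSpout.Emitter<OffsetRange> {
        @Override
        public void emitBatch(TransactionAttempt tx, OffsetRange meta, TridentCollector collector) {
            // emit exactly the offsets this batch's meta covers
            for (long offset = meta.start; offset < meta.end; offset++) {
                collector.emit(new Values(offset));
            }
        }

        @Override
        public void success(TransactionAttempt tx) {
            // batches up to tx are fully processed; per-batch state may be discarded
        }

        @Override
        public void close() { }
    }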
SubtopologyBolt
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java
    public Object initBatchState(String batchGroup, Object batchId) {
        ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
            p.startBatch(ret);
        }
        return ret;
    }

    public void execute(BatchInfo batchInfo, Tuple tuple) {
        String sourceStream = tuple.getSourceStreamId();
        InitialReceiver ir = _roots.get(sourceStream);
        if(ir==null) {
            throw new RuntimeException("Received unexpected tuple " + tuple.toString());
        }
        ir.receive((ProcessorContext) batchInfo.state, tuple);
    }

    public void finishBatch(BatchInfo batchInfo) {
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
            p.finishBatch((ProcessorContext) batchInfo.state);
        }
    }

    protected static class InitialReceiver {
        List<TridentProcessor> _receivers = new ArrayList<>();
        RootFactory _factory;
        ProjectionFactory _project;
        String _stream;

        public InitialReceiver(String stream, Fields allFields) {
            // TODO: don't want to project for non-batch bolts...???
            // how to distinguish "batch" streams from non-batch streams?
            _stream = stream;
            _factory = new RootFactory(allFields);
            List<String> projected = new ArrayList<>(allFields.toList());
            projected.remove(0);
            _project = new ProjectionFactory(_factory, new Fields(projected));
        }

        public void receive(ProcessorContext context, Tuple tuple) {
            TridentTuple t = _project.create(_factory.create(tuple));
            for(TridentProcessor r: _receivers) {
                r.execute(context, _stream, t);
            }
        }

        public void addReceiver(TridentProcessor p) {
            _receivers.add(p);
        }

        public Factory getOutputFactory() {
            return _project;
        }
    }
- when SubtopologyBolt creates a ProcessorContext in initBatchState, the context is tagged with the batchId, so when different batches run in parallel their ProcessorContexts stay separate (illustrated below)
- execute uses each batch's own ProcessorContext (batchInfo.state) when it calls TridentProcessor.execute
- finishBatch works the same way, passing (ProcessorContext) batchInfo.state to TridentProcessor.finishBatch
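The isolation is easy to see in a toy snippet; ProcessorContext is the real Storm class, while the batch ids and state values below are made up.

    import org.apache.storm.trident.planner.ProcessorContext;

    public class BatchIsolationSketch {
        public static void main(String[] args) {
            // one context per in-flight batch, each with its own state array
            // (one slot per processor, as in new Object[_nodes.size()])
            ProcessorContext batch1 = new ProcessorContext(1L, new Object[2]);
            ProcessorContext batch2 = new ProcessorContext(2L, new Object[2]);

            int stateIndex = 0;               // what _context.getStateIndex() would return
            batch1.state[stateIndex] = 100L;  // e.g. batch 1's aggregation result
            batch2.state[stateIndex] = 7L;    // batch 2's result lives in its own array

            System.out.println(batch1.state[stateIndex] + " vs " + batch2.state[stateIndex]);
        }
    }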
AggregateProcessor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AggregateProcessor.java
    public void startBatch(ProcessorContext processorContext) {
        _collector.setContext(processorContext);
        processorContext.state[_context.getStateIndex()] = _agg.init(processorContext.batchId, _collector);
    }

    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        _collector.setContext(processorContext);
        _agg.aggregate(processorContext.state[_context.getStateIndex()], _projection.create(tuple), _collector);
    }

    public void finishBatch(ProcessorContext processorContext) {
        _collector.setContext(processorContext);
        _agg.complete(processorContext.state[_context.getStateIndex()], _collector);
    }
- AggregateProcessor's startBatch, execute, and finishBatch methods all work on the ProcessorContext's state, and the ProcessorContext handed down from SubtopologyBolt is already per-batch (see the aggregator example below)
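A minimal custom aggregator shows this per-batch lifecycle; CountAggregator is a hypothetical example, but BaseAggregator and its init/aggregate/complete callbacks are the standard Trident API that _agg implements above.

    import org.apache.storm.trident.operation.BaseAggregator;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;

    public class CountAggregator extends BaseAggregator<CountAggregator.CountState> {
        public static class CountState {
            long count = 0;
        }

        @Override
        public CountState init(Object batchId, TridentCollector collector) {
            // called from startBatch; the returned state is what AggregateProcessor
            // stores in processorContext.state[stateIndex], one per batch
            return new CountState();
        }

        @Override
        public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
            state.count++;  // mutates only this batch's state
        }

        @Override
        public void complete(CountState state, TridentCollector collector) {
            collector.emit(new Values(state.count));  // emitted at finishBatch
        }
    }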
EachProcessor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/EachProcessor.java
    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
        List<Factory> parents = tridentContext.getParentTupleFactories();
        if(parents.size()!=1) {
            throw new RuntimeException("Each operation can only have one parent");
        }
        _context = tridentContext;
        _collector = new AppendCollector(tridentContext);
        _projection = new ProjectionFactory(parents.get(0), _inputFields);
        _function.prepare(conf, new TridentOperationContext(context, _projection));
    }

    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        _collector.setContext(processorContext, tuple);
        _function.execute(_projection.create(tuple), _collector);
    }

    public void startBatch(ProcessorContext processorContext) {
    }

    public void finishBatch(ProcessorContext processorContext) {
    }
- EachProcessor sets the ProcessorContext on its _collector and then hands the _collector to _function.execute; the _collector here is an AppendCollector (a function example follows)
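A typical function run by EachProcessor looks like this; Split is the classic word-splitting example built on the standard BaseFunction API, and every collector.emit below goes through the AppendCollector.

    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;

    public class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // the AppendCollector appends the emitted field to the input tuple
            // and forwards the result to the downstream TupleReceivers
            for (String word : tuple.getString(0).split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }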
AppendCollector
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AppendCollector.java
    public class AppendCollector implements TridentCollector {
        OperationOutputFactory _factory;
        TridentContext _triContext;
        TridentTuple tuple;
        ProcessorContext context;

        public AppendCollector(TridentContext context) {
            _triContext = context;
            _factory = new OperationOutputFactory(context.getParentTupleFactories().get(0), context.getSelfOutputFields());
        }

        public void setContext(ProcessorContext pc, TridentTuple t) {
            this.context = pc;
            this.tuple = t;
        }

        @Override
        public void emit(List<Object> values) {
            TridentTuple toEmit = _factory.create((TridentTupleView) tuple, values);
            for(TupleReceiver r: _triContext.getReceivers()) {
                r.execute(context, _triContext.getOutStreamId(), toEmit);
            }
        }

        @Override
        public void reportError(Throwable t) {
            _triContext.getDelegateCollector().reportError(t);
        }

        public Factory getOutputFactory() {
            return _factory;
        }
    }
- when _function.execute emits through the AppendCollector, the AppendCollector hands the resulting tuples to the TupleReceivers, passing along the ProcessorContext that EachProcessor set, i.e. each batch's own ProcessorContext; a TupleReceiver's execute method may read from or write to that ProcessorContext, again at batch granularity: AggregateProcessor, for instance, stores its aggregation result in its own batch's processorContext.state (the wiring sketch below ties these pieces together)
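Tying the pieces together, here is a hedged wiring sketch; "sentences" and the field names are arbitrary, and spout stands in for any ITridentSpout implementation (such as one built from the coordinator and emitter sketches above).

    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.spout.ITridentSpout;
    import org.apache.storm.tuple.Fields;

    public class TopologySketch {
        public static StormTopology build(ITridentSpout<?> spout) {
            TridentTopology topology = new TridentTopology();
            // Split runs inside an EachProcessor and CountAggregator inside an
            // AggregateProcessor; within one SubtopologyBolt they share the
            // per-batch ProcessorContext described above
            topology.newStream("sentences", spout)
                    .each(new Fields("sentence"), new Split(), new Fields("word"))
                    .aggregate(new Fields("word"), new CountAggregator(), new Fields("count"));
            return topology.build();
        }
    }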
Summary
- storm trident uses [id, count] tuples to tell the downstream TridentBoltExecutor to finish a batch; on receiving an [id, count] tuple, the TridentBoltExecutor first checks whether tracked.reportedTasks equals cond.expectedTaskReports (this aggregates the per-task reports when the upstream TridentBoltExecutor's parallelism is greater than 1), and only then checks whether tracked.receivedTuples equals tracked.expectedTupleCount; finishBatch runs only when both match (modeled in the sketch below)
- the trident spout's _maxTransactionActive is set from Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending), which the configuration defaults to null; when the value is null, _maxTransactionActive is 1
- MasterBatchCoordinator caps the number of batches processed concurrently; the next batch can only start once a batch in _activeTx has succeeded or failed; when several transactions are active in parallel, the downstream TridentBoltExecutor can still tell the batches apart and process them without interference: SubtopologyBolt's initBatchState, for example, creates a ProcessorContext tagged with the batchId, so parallel batches keep separate ProcessorContexts, and the TridentProcessors invoked by SubtopologyBolt that store results do so in that context, as AggregateProcessor does when it keeps its aggregation result in its own batch's processorContext.state
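The finish conditions from the first point can be modeled in a few lines (a simplified model, not Storm code):

    // simplified model of the checks TridentBoltExecutor performs in checkFinish
    class CoordCheckSketch {
        int expectedTaskReports;  // how many upstream tasks must send an [id, count] tuple
        int reportedTasks;        // [id, count] tuples received so far
        int expectedTupleCount;   // sum of the counts announced by those tuples
        int receivedTuples;       // regular batch tuples actually received

        boolean canFinishBatch() {
            // every upstream task has reported, and every announced tuple arrived
            return reportedTasks == expectedTaskReports
                    && receivedTuples == expectedTupleCount;
        }
    }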