The main difficulty lies in implementing a KafkaSpout, the component through which Storm receives messages sent from Kafka.
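For context, the snippets below form the core of the spout's emit path and operate on the following per-instance state. This is a minimal sketch: the field names follow the code, but the exact declarations are assumptions inferred from how the fields are used.

// Sketch of the KafkaSpout<K, V> state used below (declarations are assumptions):
private transient KafkaConsumer<K, V> kafkaConsumer;                              // the underlying Kafka consumer
private transient Map<TopicPartition, List<ConsumerRecord<K, V>>> waitingToEmit;  // polled records not yet emitted, per partition
private transient Map<TopicPartition, OffsetManager> offsetManagers;              // acked offsets awaiting commit, per partition
private transient Set<KafkaSpoutMessageId> emitted;                               // emitted but not yet acked/failed message ids
private transient KafkaSpoutRetryService retryService;                            // schedules failed tuples for retry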

// Emit step 1: add the polled records to the waiting-to-emit list

protected void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
    for (TopicPartition tp : consumerRecords.partitions()) {
        waitingToEmit.put(tp, new ArrayList<>(consumerRecords.records(tp)));
    }
}
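setWaitingToEmit is fed from the spout's poll loop. A simplified sketch of how it fits into nextTuple(), assuming a pollTimeoutMs value; the real method also handles commit timers, rebalancing, and partition pause/resume, which are omitted here:

// Simplified nextTuple() flow (commit timers and rebalance handling omitted):
ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(pollTimeoutMs);
setWaitingToEmit(consumerRecords);  // step 1: queue everything that was polled
emitIfWaitingNotEmitted();          // step 2: emit at most one tuple downstream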

protected void emitIfWaitingNotEmitted() {
    // Take the pending records from the waitingToEmit map and start emitting
    Iterator<List<ConsumerRecord<K, V>>> waitingToEmitIter = waitingToEmit.values().iterator();
    LOG.info("real event emit begin");
    outerLoop:
    while (waitingToEmitIter.hasNext()) {
        List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmitIter.next();
        while (!waitingToEmitForTp.isEmpty()) {
            final boolean emittedTuple = emitOrRetryTuple(waitingToEmitForTp.remove(0));
            if (emittedTuple) {
                // A tuple was emitted, so stop: the spout emits at most one tuple per call
                break outerLoop;
            }
        }
        // Nothing in this partition's list was emittable; discard the drained entry
        waitingToEmitIter.remove();
    }
}
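The emitOrRetryTuple method below consults retryService to decide whether a record is a first-time emit or a retry of a previously failed tuple. For reference, here is a sketch of configuring the stock exponential-backoff retry service that ships with storm-kafka-client; the interval values are purely illustrative:

// Sketch: stock retry service with exponential backoff (values are illustrative).
// Failed tuples are retried after an initial 500 us delay, backing off to at
// most 10 s between attempts, with no cap on the retry count.
KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(
    KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
    KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
    Integer.MAX_VALUE,
    KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));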

// The actual emit process

private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
    final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
    final KafkaSpoutMessageId msgId = retryService.getMessageId(record);

    if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) { // has been acked
        LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
    } else if (emitted.contains(msgId)) { // has been emitted and it is pending ack or fail
        LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
    } else {
        final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
        if (isAtLeastOnceProcessing()
            && committedOffset != null
            && committedOffset.offset() > record.offset()
            && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset,
                Collections.unmodifiableMap(offsetManagers))) {
            // Ensures that after a topology with this id is started, the consumer fetch
            // position never falls behind the committed offset (STORM-2844)
            throw new IllegalStateException("Attempting to emit a message that has already been committed."
                + " This should never occur when using the at-least-once processing guarantee.");
        }

        final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
        if (isEmitTuple(tuple)) {
            final boolean isScheduled = retryService.isScheduled(msgId);
            // not scheduled <=> never failed (i.e. never emitted), or scheduled and ready to be retried
            if (!isScheduled || retryService.isReady(msgId)) {
                final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;

                if (!isAtLeastOnceProcessing()) {
                    if (kafkaSpoutConfig.isTupleTrackingEnforced()) {
                        collector.emit(stream, tuple, msgId);
                        LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                    } else {
                        collector.emit(stream, tuple);
                        LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
                    }
                } else {
                    emitted.add(msgId);
                    offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
                    if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from schedule.
                        retryService.remove(msgId);
                    }
                    collector.emit(stream, tuple, msgId);
                    tupleListener.onEmit(tuple, msgId);
                    LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                }
                return true;
            }
        } else {
            /* If a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately
             * to allow its offset to be committed to Kafka */
            LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
            if (isAtLeastOnceProcessing()) {
                msgId.setNullTuple(true);
                offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
                ack(msgId);
            }
        }
    }
    return false;
}
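The tuple emitted above is built by kafkaSpoutConfig.getTranslator().apply(record), and the target stream is taken from KafkaTuple.getStream() when the translator returns a KafkaTuple. Below is a minimal sketch of a custom RecordTranslator; the stream name "values" and the single "value" field are assumptions for illustration.

import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.KafkaTuple;
import org.apache.storm.kafka.spout.RecordTranslator;
import org.apache.storm.tuple.Fields;

public class ValueOnlyTranslator implements RecordTranslator<String, String> {
    @Override
    public List<Object> apply(ConsumerRecord<String, String> record) {
        // Returning a KafkaTuple lets emitOrRetryTuple route via getStream()
        return new KafkaTuple(record.value()).routedTo("values");
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return new Fields("value");
    }

    @Override
    public List<String> streams() {
        return Collections.singletonList("values");
    }
}

Such a translator is registered on the spout's configuration builder, e.g. KafkaSpoutConfig.builder("localhost:9092", "my-topic").setRecordTranslator(new ValueOnlyTranslator()), where the broker address and topic name are placeholders.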
