A Look at Storm's messageTimeout
Published: 2019-06-28


This article takes a look at Storm's messageTimeout.

TOPOLOGY_MESSAGE_TIMEOUT_SECS

storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java

    /**
     * True if Storm should timeout messages or not. Defaults to true. This is meant to be used in unit tests to prevent tuples from being
     * accidentally timed out during the test.
     */
    @isBoolean
    public static final String TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS = "topology.enable.message.timeouts";

    /**
     * The maximum amount of time given to the topology to fully process a message emitted by a spout. If the message is not acked within
     * this time frame, Storm will fail the message on the spout. Some spouts implementations will then replay the message at a later time.
     */
    @isInteger
    @isPositiveNumber
    @NotNull
    public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";

    /**
     * How often a tick tuple from the "__system" component and "__tick" stream should be sent to tasks. Meant to be used as a
     * component-specific configuration.
     */
    @isInteger
    public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS = "topology.tick.tuple.freq.secs";
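  • In defaults.yaml, topology.enable.message.timeouts defaults to true
  • In defaults.yaml, topology.message.timeout.secs defaults to 30
  • In defaults.yaml, topology.tick.tuple.freq.secs defaults to null. For the acker and for spouts, however, StormCommon.addAcker overrides it with the value of topology.message.timeout.secs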
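The three defaults above can be modeled with a plain map. This is a minimal sketch. It assumes the merged topology config behaves like a `Map<String, Object>` in which user settings override the defaults.yaml values. `TimeoutDefaultsSketch`, `merge` and `getInt` are hypothetical helpers written for illustration, not Storm APIs. `getInt` only loosely imitates how the source uses `ObjectReader.getInt`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the defaults; not part of Storm.
class TimeoutDefaultsSketch {
    // Values as listed for defaults.yaml above.
    static Map<String, Object> defaults() {
        Map<String, Object> d = new HashMap<>();
        d.put("topology.enable.message.timeouts", true);
        d.put("topology.message.timeout.secs", 30);
        d.put("topology.tick.tuple.freq.secs", null); // no tick tuples unless something sets it
        return d;
    }

    // User-supplied topology settings override the defaults.
    static Map<String, Object> merge(Map<String, Object> userConf) {
        Map<String, Object> merged = defaults();
        merged.putAll(userConf);
        return merged;
    }

    // Reads a value as an Integer, returning the fallback when it is missing or null.
    static Integer getInt(Object value, Integer fallback) {
        if (value == null) {
            return fallback;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        return Integer.valueOf(value.toString());
    }
}
```

With an empty user config the tick frequency reads back as null. It only takes effect for the components where addAcker fills it in.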

StormCommon.addAcker

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java

    public static void addAcker(Map<String, Object> conf, StormTopology topology) {
        int ackerNum =
            ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
        Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);

        Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
        outputStreams.put(Acker.ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
        outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
        outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));

        Map<String, Object> ackerConf = new HashMap<>();
        ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
        ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
        Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);

        for (Bolt bolt : topology.get_bolts().values()) {
            ComponentCommon common = bolt.get_common();
            common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
            common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
            common.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
        }

        for (SpoutSpec spout : topology.get_spouts().values()) {
            ComponentCommon common = spout.get_common();
            Map<String, Object> spoutConf = componentConf(spout);
            spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
            common.set_json_conf(JSONValue.toJSONString(spoutConf));
            common.put_to_streams(Acker.ACKER_INIT_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID), Thrift.prepareDirectGrouping());
            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID), Thrift.prepareDirectGrouping());
            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
                                 Thrift.prepareDirectGrouping());
        }

        topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
    }
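  • When adding the acker, Storm uses the value of Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS as Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, both in the acker's component config and in every spout's component config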
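Without the Thrift plumbing, the timeout-related part of addAcker just copies one value into the acker's config and into each spout's config. This minimal sketch assumes component configs are plain `Map<String, Object>`s. `AddAckerConfSketch`, `ackerConf` and `injectIntoSpouts` are hypothetical names. The real method also converts the value with `ObjectReader.getInt` and serializes the spout config to JSON.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reduction of the config handling in addAcker; not Storm code.
class AddAckerConfSketch {
    static final String MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";
    static final String TICK_TUPLE_FREQ_SECS = "topology.tick.tuple.freq.secs";
    static final String TASKS = "topology.tasks";

    // Acker component config: task count plus a tick frequency equal to the message timeout.
    static Map<String, Object> ackerConf(Map<String, Object> topoConf, int ackerNum) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(TASKS, ackerNum);
        conf.put(TICK_TUPLE_FREQ_SECS, topoConf.get(MESSAGE_TIMEOUT_SECS));
        return conf;
    }

    // Writes the message timeout into every spout's tick frequency, replacing any existing value.
    static void injectIntoSpouts(Map<String, Object> topoConf, List<Map<String, Object>> spoutConfs) {
        Object timeout = topoConf.get(MESSAGE_TIMEOUT_SECS);
        for (Map<String, Object> spoutConf : spoutConfs) {
            spoutConf.put(TICK_TUPLE_FREQ_SECS, timeout);
        }
    }
}
```

The spout branch uses put, so any tick frequency a spout configured for itself is overwritten by the message timeout.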

Executor.setupTicks

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    protected void setupTicks(boolean isSpout) {
        final Integer tickTimeSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
        if (tickTimeSecs != null) {
            boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
            if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && Utils.isSystemId(componentId))
                || (!enableMessageTimeout && isSpout)) {
                LOG.info("Timeouts disabled for executor {}:{}", componentId, executorId);
            } else {
                StormTimer timerTask = workerData.getUserTimer();
                timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs,
                    () -> {
                        TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
                                                        Constants.SYSTEM_COMPONENT_ID,
                                                        (int) Constants.SYSTEM_TASK_ID,
                                                        Constants.SYSTEM_TICK_STREAM_ID);
                        AddressedTuple tickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
                        try {
                            receiveQueue.publish(tickTuple);
                            receiveQueue.flush(); // avoid buffering
                        } catch (InterruptedException e) {
                            LOG.warn("Thread interrupted when emitting tick tuple. Setting interrupt flag.");
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                );
            }
        }
    }
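  • In setupTicks, Executor uses Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS as tickTimeSecs, the interval at which tick tuples are scheduled. If the value is null, no tick is scheduled at all
  • System components other than the acker never get tick tuples. For spouts, Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS must also be enabled
  • The timer task emits a tick tuple every tickTimeSecs seconds. The tuple's srcComponent is Constants.SYSTEM_COMPONENT_ID (__system), its taskId is Constants.SYSTEM_TASK_ID (-1), and its streamId is Constants.SYSTEM_TICK_STREAM_ID (__tick). It is published to the executor's own receiveQueue and flushed immediately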
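The guard and the recurring timer in setupTicks can be approximated with the JDK alone. This sketch rests on several assumptions:

- A `ScheduledExecutorService` stands in for Storm's `StormTimer`.
- A `BlockingQueue<String>` stands in for the executor's receive queue.
- System component ids start with `__`, and the acker's id is `__acker`. Both are assumed to match `Utils.isSystemId` and `Acker.ACKER_COMPONENT_ID`.

`TickSchedulerSketch`, `shouldSchedule` and `start` are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical approximation of Executor.setupTicks; not Storm code.
class TickSchedulerSketch {
    static final String ACKER_COMPONENT_ID = "__acker"; // assumed value of Acker.ACKER_COMPONENT_ID
    static final String TICK_STREAM_ID = "__tick";      // Constants.SYSTEM_TICK_STREAM_ID

    // Same decision as setupTicks: no frequency means no ticks; system components other
    // than the acker are skipped; spouts are skipped when message timeouts are disabled.
    static boolean shouldSchedule(Integer tickTimeSecs, String componentId,
                                  boolean isSpout, boolean enableMessageTimeout) {
        if (tickTimeSecs == null) {
            return false;
        }
        boolean isSystem = componentId.startsWith("__"); // assumed isSystemId rule
        if (isSystem && !ACKER_COMPONENT_ID.equals(componentId)) {
            return false;
        }
        return enableMessageTimeout || !isSpout;
    }

    // Publishes a tick marker into the receive queue every `period` units, first one after one period.
    static ScheduledExecutorService start(BlockingQueue<String> receiveQueue, long period, TimeUnit unit) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(() -> receiveQueue.offer(TICK_STREAM_ID), period, period, unit);
        return timer;
    }
}
```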

SpoutExecutor.tupleActionFn

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

    public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
        String streamId = tuple.getSourceStreamId();
        if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            spoutOutputCollector.flush();
        } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            pending.rotate();
        } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
            metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
        } else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
            Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
            if (spoutObj instanceof ICredentialsListener) {
                ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
            }
        } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
            Long id = (Long) tuple.getValue(0);
            TupleInfo pendingForId = pending.get(id);
            if (pendingForId != null) {
                pending.put(id, pendingForId);
            }
        } else {
            Long id = (Long) tuple.getValue(0);
            Long timeDeltaMs = (Long) tuple.getValue(1);
            TupleInfo tupleInfo = pending.remove(id);
            if (tupleInfo != null && tupleInfo.getMessageId() != null) {
                if (taskId != tupleInfo.getTaskId()) {
                    throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
                }
                Long timeDelta = null;
                if (hasAckers) {
                    long startTimeMs = tupleInfo.getTimestamp();
                    if (startTimeMs != 0) {
                        timeDelta = timeDeltaMs;
                    }
                }
                if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
                    ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
                } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
                    failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
                }
            }
        }
    }
  • When SpoutExecutor.tupleActionFn receives a tick tuple on Constants.SYSTEM_TICK_STREAM_ID, it calls pending.rotate()
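Here is a reduced model of the branches that matter for timeouts. A tick rotates pending. Ack and fail tuples first remove the entry from pending, so an ack that arrives after the tuple has already expired finds nothing and is silently dropped. This is a hypothetical sketch:

- `SpoutTupleRouterSketch` is not a Storm class.
- A plain `Map` replaces the RotatingMap, and a `Runnable` stands in for `pending.rotate()`.
- The `__ack_ack` and `__ack_fail` strings are assumed values of the Acker stream id constants.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of tupleActionFn's tick and ack/fail branches; not Storm code.
class SpoutTupleRouterSketch {
    static final String TICK_STREAM = "__tick";     // Constants.SYSTEM_TICK_STREAM_ID
    static final String ACK_STREAM = "__ack_ack";   // assumed value of Acker.ACKER_ACK_STREAM_ID
    static final String FAIL_STREAM = "__ack_fail"; // assumed value of Acker.ACKER_FAIL_STREAM_ID

    private final Map<Long, Object> pending;  // root id -> spout messageId
    private final Runnable rotatePending;     // stands in for pending.rotate()

    SpoutTupleRouterSketch(Map<Long, Object> pending, Runnable rotatePending) {
        this.pending = pending;
        this.rotatePending = rotatePending;
    }

    // Returns a short label describing what was done with the tuple.
    String handle(String streamId, Long rootId) {
        if (TICK_STREAM.equals(streamId)) {
            rotatePending.run();
            return "rotate";
        }
        // The entry is removed before deciding ack vs. fail, so a late ack for an
        // already-expired tuple finds nothing and is ignored.
        Object messageId = pending.remove(rootId);
        if (messageId == null) {
            return "ignored";
        }
        if (ACK_STREAM.equals(streamId)) {
            return "ack " + messageId;
        }
        if (FAIL_STREAM.equals(streamId)) {
            return "fail " + messageId;
        }
        return "ignored";
    }
}
```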

RotatingMap.rotate

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java

    public Map<K, V> rotate() {
        Map<K, V> dead = _buckets.removeLast();
        _buckets.addFirst(new HashMap<K, V>());
        if (_callback != null) {
            for (Entry<K, V> entry : dead.entrySet()) {
                _callback.expire(entry.getKey(), entry.getValue());
            }
        }
        return dead;
    }
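  • rotate removes the oldest bucket and pushes a new empty bucket to the front. It then calls the callback's expire for every entry in the removed bucket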
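Below is a minimal, self-contained version of the rotating map. It illustrates the bucket mechanics and does not reproduce Storm's class. It assumes, as in Storm's RotatingMap, that put places an entry in the newest bucket and removes it from the older ones. That is what lets the ACKER_RESET_TIMEOUT_STREAM_ID branch of tupleActionFn restart a tuple's timeout simply by re-putting it. `RotatingMapSketch` and its `BiConsumer` callback are hypothetical stand-ins for RotatingMap and ExpiredCallback.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical simplified rotating map; not Storm's RotatingMap.
class RotatingMapSketch<K, V> {
    private final Deque<Map<K, V>> buckets = new ArrayDeque<>(); // first = newest, last = oldest
    private final BiConsumer<K, V> onExpire;

    RotatingMapSketch(int numBuckets, BiConsumer<K, V> onExpire) {
        if (numBuckets < 2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        for (int i = 0; i < numBuckets; i++) {
            buckets.addLast(new HashMap<>());
        }
        this.onExpire = onExpire;
    }

    // New or refreshed entries always land in the newest bucket.
    void put(K key, V value) {
        for (Map<K, V> bucket : buckets) {
            bucket.remove(key);
        }
        buckets.peekFirst().put(key, value);
    }

    V get(K key) {
        for (Map<K, V> bucket : buckets) {
            V v = bucket.get(key);
            if (v != null) {
                return v;
            }
        }
        return null;
    }

    V remove(K key) {
        for (Map<K, V> bucket : buckets) {
            if (bucket.containsKey(key)) {
                return bucket.remove(key);
            }
        }
        return null;
    }

    // Drops the oldest bucket, adds an empty newest one, and reports every dropped entry.
    Map<K, V> rotate() {
        Map<K, V> dead = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        if (onExpire != null) {
            dead.forEach(onExpire);
        }
        return dead;
    }
}
```

With two buckets, an entry survives the first rotate and is expired on the second, unless it is removed or re-put in between.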

SpoutExecutor.RotatingMap.ExpiredCallback

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

    public void init(final ArrayList<Task> idToTask, int idToTaskBase) {
        this.threadId = Thread.currentThread().getId();
        executorTransfer.initLocalRecvQueues();
        while (!stormActive.get()) {
            Utils.sleep(100);
        }

        LOG.info("Opening spout {}:{}", componentId, taskIds);
        this.idToTask = idToTask;
        this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();

        this.spouts = new ArrayList<>();
        for (Task task : idToTask) {
            if (task != null) {
                this.spouts.add((ISpout) task.getTaskObject());
            }
        }

        this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
            @Override
            public void expire(Long key, TupleInfo tupleInfo) {
                Long timeDelta = null;
                if (tupleInfo.getTimestamp() != 0) {
                    timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
                }
                failSpoutMsg(SpoutExecutor.this, idToTask.get(tupleInfo.getTaskId() - idToTaskBase), timeDelta, tupleInfo, "TIMEOUT");
            }
        });

        //......
    }
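  • In init, SpoutExecutor creates pending as a two-bucket RotatingMap and registers a RotatingMap.ExpiredCallback. The callback calls failSpoutMsg, with reason "TIMEOUT", on every expired tuple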
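Pending has two buckets and is rotated once per tickTimeSecs (call it T). An unacked tuple is therefore failed on the second rotation after it was emitted, so the effective timeout lands in (T, 2T] rather than exactly at T. This sketch of the arithmetic rests on several assumptions:

- Ticks fire exactly at T, 2T, 3T, ... after the executor starts, since scheduleRecurring is called with tickTimeSecs as both delay and period.
- Queueing delays are ignored.
- Nothing acks or resets the tuple.

`TimeoutWindowSketch` is hypothetical.

```java
// Hypothetical timing model for a two-bucket pending map; not Storm code.
class TimeoutWindowSketch {
    // Time (ms since executor start) at which a tuple emitted at emitMs is failed,
    // assuming rotations at periodMs, 2 * periodMs, ... and two buckets.
    static long expireAtMs(long emitMs, long periodMs) {
        long firstTickAfterEmit = (emitMs / periodMs + 1) * periodMs;
        // The first rotation moves the tuple into the oldest bucket; the second drops it.
        return firstTickAfterEmit + periodMs;
    }
}
```

With the default T of 30 seconds, a tuple emitted at 29 s is failed at 60 s, after 31 s. A tuple emitted at 0 s waits the full 60 s.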

Summary

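  • The spout messageTimeout settings are Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS (topology.enable.message.timeouts, default true) and Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (topology.message.timeout.secs, default 30)
  • In addAcker, StormCommon uses Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS as the value of Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS for the acker and the spouts. This becomes tickTimeSecs, the interval at which their tick tuples are scheduled
  • When SpoutExecutor receives that tick tuple, it calls rotate on its RotatingMap, which triggers the expire callback. The RotatingMap.ExpiredCallback registered in SpoutExecutor.init calls failSpoutMsg on each expired tuple, which in turn invokes the spout's fail method. This completes the messageTimeout mechanism. For a spout that does not emit reliable messages (i.e. with no msgId), the pending map stays empty, so rotate and the ExpiredCallback have no effect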
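The Config javadoc notes that some spout implementations replay a failed message later. As an illustration only, here is one hypothetical way a spout could keep that bookkeeping. `ReplayBufferSketch` is not a Storm class. In a real spout, `emitted`, `ack` and `fail` would be driven from `nextTuple`, `ack(Object msgId)` and `fail(Object msgId)`. Timeout failures arrive through the same fail path as explicit ones, and only messages emitted with a msgId are tracked at all.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical replay bookkeeping for a reliable spout; not a Storm class.
class ReplayBufferSketch<T> {
    private final Map<Object, T> inFlight = new HashMap<>();
    private final Queue<Object> retryIds = new ArrayDeque<>();

    // Records a message emitted with this msgId.
    void emitted(Object msgId, T payload) {
        inFlight.put(msgId, payload);
    }

    // Fully processed: forget it.
    void ack(Object msgId) {
        inFlight.remove(msgId);
    }

    // Failed explicitly or by timeout: queue it once for replay if still in flight.
    void fail(Object msgId) {
        if (inFlight.containsKey(msgId) && !retryIds.contains(msgId)) {
            retryIds.add(msgId);
        }
    }

    // Next msgId to re-emit, or null when nothing is waiting.
    Object nextReplay() {
        return retryIds.poll();
    }

    T payload(Object msgId) {
        return inFlight.get(msgId);
    }
}
```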


Reprint address: http://jdwel.baihongyu.com/
