博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Azkaban的线程系列 37:QueueProcessor线程的任务处理&executor存活监控
阅读量:6454 次
发布时间:2019-06-23

本文共 3773 字,大约阅读时间需要 12 分钟。

hot3.png

有个线程,AzkabanWebServer-QueueProcessor-Thread.

下面分析下这个线程到底干嘛的!!!

====================================================================================

stop in azkaban.executor.ExecutorManager$QueueProcessorThread.run

====================================================================================

public void run() {

// Loops till QueueProcessorThread is shutdown

while (!shutdown) {

// 一直循环

synchronized (this) {

//

try {

// start processing queue if active, other wait for

// sometime

if (isActive) {

// 开始处理任务

processQueuedFlows(activeExecutorRefreshWindowInMilisec,

activeExecutorRefreshWindowInFlows);

}

wait(QUEUE_PROCESSOR_WAIT_IN_MS);// 等待1秒

} catch (Exception e) {

logger.error("QueueProcessorThread Interrupted. Probably to shut down.", e);

}

}

}

}

所以接下来要看这个processQueuedFlows干嘛的?

====================================================================================

// process flow with current snapshot of activeExecutors

selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));

从这里来看,就是把任务分发到具体的executor上,

不过这个函数其实还做了其它的事情!那就是executor的存活监控

// if we have dispatched more than maxContinuousFlowProcessed or

// It has been more then activeExecutorsRefreshWindow millisec

// since we

// refreshed

// 满足上面的条件就探测executor的存活性

if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow

|| currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {

// Refresh executorInfo for all activeExecutors

refreshExecutors();

lastExecutorRefreshTime = currentTime;

currentContinuousFlowProcessed = 0;

}

所以接下来,我们来看看refreshExecutors的代码实现!

===================================================================================

jdb azkaban.webapp.AzkabanWebServer -conf  /root/azkb/azkaban_3.0.0_debug/conf

stop in  azkaban.executor.ExecutorManager.refreshExecutors

==============================================================================

其中刷新周期

azkProps.getLong(AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000),

private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS = "azkaban.activeexecutor.refresh.milisecinterval";

默认为50秒刷1次更新请求

其实是通过一个线程池来跑的,

Future<String> fetchExecutionInfo = executorInforRefresherService.submit(new Callable<String>() {

@Override

public String call() throws Exception {

return callExecutorForJsonString(executor.getHost(), executor.getPort(), "/serverStatistics",

null);

}

});

那么,这个线程池的个数呢?

executorInforRefresherService = Executors

.newFixedThreadPool(azkProps.getInt(AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));

private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS = "azkaban.executorinfo.refresh.maxThreads";

默认就是5个了,好,回来看怎么执行!

==============================================================================

发出HTTP请求 /serverStatistics

然后结果更新到executor,所以重点就是 /serverStatistics在executor中的执行过程!

==============================================================================

root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");

jdb azkaban.execapp.AzkabanExecutorServer  -conf  /root/azkb/azkaban_3.0.0_debug/conf

stop in azkaban.execapp.ServerStatisticsServlet.doGet

stop in azkaban.execapp.ServerStatisticsServlet.populateStatistics

run

其实就是生成本地的信息汇总

1)

fillRemainingMemoryPercent(stats);

azkaban.execapp.ServerStatisticsServlet.fillRemainingMemoryPercent

 

2)

fillRemainingFlowCapacityAndLastDispatchedTime(stats);

stop in azkaban.execapp.ServerStatisticsServlet.fillRemainingFlowCapacityAndLastDispatchedTime

3)

fillCpuUsage(stats);

==============================================================================

其实就是返回这3种信息

对于web server来说,5秒内拿到返回内容后,更新本地消息

  public void setExecutorInfo(ExecutorInfo info) {

    this.cachedExecutorStats = info;

    this.lastStatsUpdatedTime = new Date();

  }

以此来作为存活监控!

转载于:https://my.oschina.net/qiangzigege/blog/658020

你可能感兴趣的文章
POJ3304:Segments——题解
查看>>
48.EXt.Data.JsonReader()
查看>>
UML关系图
查看>>
一个action读取另一个action里的session
查看>>
leetcode 175. Combine Two Tables
查看>>
如何给一个数组对象去重
查看>>
Guava包学习-Cache
查看>>
2019-06-12 Java学习日记之JDBC
查看>>
灯箱效果(点击小图 弹出大图集 然后轮播)
查看>>
linux c 笔记 线程控制(二)
查看>>
samba服务器配置
查看>>
vue.js笔记
查看>>
【Unity3D入门教程】Unity3D之GUI浅析
查看>>
Hive 简单操作
查看>>
湘潭1247 Pair-Pair(树状数组)
查看>>
idea 不能粘贴复制问题
查看>>
IEnumerable<T>
查看>>
IntelliJ IDEA 注册码
查看>>
linux 上面配置apache2的虚拟目录
查看>>
Linux学习总结 (未完待续...)
查看>>