对于ambari-collector 部分源码流程的简单理解

工作中遇到了ambari指标的搜集和展示问题,因需要增添部分python脚本对平台的脚本数据进行汇聚和整理,所以需要理解ambari-collector的部分流程,所以进行了简要的阅读,故做此分析,以防止后续遗忘。以下代码都存在删减,仅供参考。

  • 首先来看一下main.py
from core.controller import Controller
def main(argv=None):
  # Allow Ctrl-C
  stop_handler = bind_signal_handlers()
  server_process_main(stop_handler)
def server_process_main(stop_handler, scmStatus=None):
  if scmStatus is not None:
    scmStatus.reportStartPending()
  config = Configuration()
  _init_logging(config)
  controller = Controller(config, stop_handler)
  logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv))
  controller.start()
  print "Server out at: " + SERVER_OUT_FILE
  print "Server log at: " + SERVER_LOG_FILE
  save_pid(os.getpid(), PID_OUT_FILE)
  if scmStatus is not None:
    scmStatus.reportStarted()
  #The controller thread finishes when the stop event is signaled
  controller.join()
  remove_file(PID_OUT_FILE)
  pass

由上述代码可以看出来,server_process_main 里面实例化了Controller类,并controller.start()开启了一个线程,那么我们看一下Controller里面的的代码(有删略)

from emitter import Emitter
class Controller(threading.Thread):
  def __init__(self, config, stop_handler):
    # Process initialization code
    threading.Thread.__init__(self)
    self.emitter = Emitter(self.config, self.application_metric_map, stop_handler)
  def run(self):
    self.start_emitter()
    Timer(1, self.addsecond).start()
    while True:
      if (self.event_queue.full()):
        logger.warn('Event Queue full!! Suspending further collections.')
      else:
        self.enqueque_events()
      pass
      if 0 == self._stop_handler.wait(self.sleep_interval):
        logger.info('Shutting down Controller thread')
        break
      if not self._t is None:
        self._t.cancel()
        self._t.join(5)
     self.emitter.join(5)
     pass
  def start_emitter(self):
    self.emitter.start()

run函数里面执行了start_emitter(),然后对Emitter进行实例化,执行了emmitter.start(),接下来我们看一下Emitter的代码

class Emitter(threading.Thread):
  COLLECTOR_URL = "xxxxx"
  RETRY_SLEEP_INTERVAL = 5
  MAX_RETRY_COUNT = 3
  def __init__(self, config, application_metric_map, stop_handler):
    threading.Thread.__init__(self)
    self.lock = threading.Lock()
    self.collector_address = config.get_server_address()
    self.send_interval = config.get_send_interval()
    self._stop_handler = stop_handler
    self.application_metric_map = application_metric_map
  def run(self):
    while True:
      try:
        self.submit_metrics()
      except Exception, e:
        logger.warn('Unable to emit events. %s' % str(e))
      pass
      if 0 == self._stop_handler.wait(self.send_interval):
        logger.info('Shutting down Emitter thread')
        return
    pass
def submit_metrics(self):
    retry_count = 0
    # This call will acquire lock on the map and clear contents before returning
    # After configured number of retries the data will not be sent to the
    # collector
    json_data = self.application_metric_map.flatten(None, True)
    if json_data is None:
      logger.info("Nothing to emit, resume waiting.")
      return
    pass
    response = None
    while retry_count < self.MAX_RETRY_COUNT:
      try:
        response = self.push_metrics(json_data)
      except Exception, e:
        logger.warn('Error sending metrics to server. %s' % str(e))
      pass
      if response and response.getcode() == 200:
        retry_count = self.MAX_RETRY_COUNT
      else:
        logger.warn("Retrying after {0} ...".format(self.RETRY_SLEEP_INTERVAL))
        retry_count += 1
        #Wait for the service stop event instead of sleeping blindly
        if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL):
          return
      pass
    pass
  def push_metrics(self, data):
    headers = {"Content-Type" : "application/json", "Accept" : "*/*"}
    server = self.COLLECTOR_URL.format(self.collector_address.strip())
    logger.info("server: %s" % server)
    logger.debug("message to sent: %s" % data)
    req = urllib2.Request(server, data, headers)
    response = urllib2.urlopen(req, timeout=int(self.send_interval - 10))
    if response:
      logger.debug("POST response from server: retcode = {0}".format(response.getcode()))
      logger.debug(str(response.read()))
    pass
    return response

由上述代码可以看出来,run函数执行的时候,执行了submit_metrics()函数,重点来了,该函数的核心就是 json_data = self.application_metric_map.flatten(None, True),当前类继承自ApplicationMetricsMap,让我们去查看一下ApplicationMetricsMap的代码

def flatten(self, application_id = None, clear_once_flattened = False):
    with self.lock:
      timeline_metrics = { "metrics" : [] }
      local_metric_map = {}
      if application_id:
        if self.app_metric_map.has_key(application_id):
          local_metric_map = { application_id : self.app_metric_map[application_id] }
        else:
          logger.info("application_id: {0}, not present in the map.".format(application_id))
      else:
        local_metric_map = self.app_metric_map.copy()
      pass
      for appId, metrics in local_metric_map.iteritems():
        for metricId, metricData in dict(metrics).iteritems():
          # Create a timeline metric object
          timeline_metric = {
            "hostname" : self.hostname if appId == "HOST" else "",
            "metricname" : metricId,
            #"appid" : "HOST",
            "appid" : appId,
            "instanceid" : "",
            "starttime" : self.get_start_time(appId, metricId),
            "metrics" : metricData
          }
          timeline_metrics[ "metrics" ].append( timeline_metric )
        pass
      pass
      json_data = json.dumps(timeline_metrics) if len(timeline_metrics[ "metrics" ]) > 0 else None
      if clear_once_flattened:
        self.app_metric_map.clear()
      pass
      return json_data
  pass

由此函数可以看得出来,该函数主要就是对数据进行一些合并,汇聚形成新的数据结构,但是当第一次在Controller里面执行start_emmiter()时候,该函数并未执行,因为self.app_metric_map的数据结构并未生成,让我们往前看,在Controller的run函数里面有这么一行代码,self.enqueue_events(),从字面意思看出来是事件入队列,让我们找到该函数,最终进行相互调用后是执行了process_service_collection_event

 def process_service_collection_event(self, event):
    startTime = int(round(time() * 1000))
    metrics = None
    path = os.path.abspath('.')
    for root, dirs, files in os.walk("%s/libs/" % path):
      appid = event.get_group_name().split('_')[0]
      metricgroup = event.get_group_name().split('_')[1]
      if ("%s_metrics.sh" % appid) in filter(lambda x: ".sh" in x, files):
        metrics = {appid: self.service_info.get_service_metrics(appid, metricgroup)}
      else:
        logger.warn('have no %s modules' % appid)
    if metrics:
      for item in metrics:
        self.application_metric_map.put_metric(item, metrics[item], startTime)
    pass

这段代码就是执行各个服务的脚本,然后汇聚数据,最终生成metrics变量,然后执行了self.application_metric_map.put_metric(item, metrics[item], startTime),这个application_metric_map其实就是ApplicationMetricMap类的实例,其中有一个函数如下所示:

 def put_metric(self, application_id, metric_id_to_value_map, timestamp):
    with self.lock:
      for metric_name, value in metric_id_to_value_map.iteritems():
        metric_map = self.app_metric_map.get(application_id)
        if not metric_map:
          metric_map = { metric_name : { timestamp : value } }
          self.app_metric_map[ application_id ] = metric_map
        else:
          metric_id_map = metric_map.get(metric_name)
          if not metric_id_map:
            metric_id_map = { timestamp : value }
            metric_map[ metric_name ] = metric_id_map
          else:
            metric_map[ metric_name ].update( { timestamp : value } )
          pass
        pass
  pass

其实这段代码主要是从脚本中搜集的数据,形成最终的app_metric_map数据,在Controller中一直被无限调用,只是我们第一次执行start_emitter()时候并未执行而已,当从脚本中搜集到数据,才会执行真正的调用,然后通过requests模块,上报到 metrics collector的6188端口中,最终数据落于hbase中。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,302评论 5 470
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,232评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,337评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,977评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,920评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,194评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,638评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,319评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,455评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,379评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,426评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,106评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,696评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,786评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,996评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,467评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,043评论 2 341