falcon agent模块源码解析

导读

falcon是小米开源的监控平台,广泛用于许多互联网公司。agent模块是metric采集服务,它就像搬运工,将各种metric输送到transfer服务。如果说这些是它的主要工作,那么插件安装,同步信任ip名单等便是它的副业。本着抓主要矛盾的原则,本文不包含副业解读。

main函数

main函数主要做一些初始化工作,并启动http服务。

 func main() {
      //入参cfg := flag.String("c","cfg.json","configuration file")
        version := flag.Bool("v", false,"show version")
        check := flag.Bool("check", false,"check collector")
        //入参解析
      flag.Parse()
      if *version {
          fmt.Println(g.VERSION)
         os.Exit(0)
}
//check collect功能是否正常
if *check {
    funcs.CheckCollector()
    os.Exit(0)
}
//解析配置
g.ParseConfig(*cfg)
if g.Config().Debug {
g.InitLog("debug")
}else {
g.InitLog("info")
}

//初始化工作目录
g.InitRootDir()
//初始化本地ip,通过向heartbeat服务发起一次连接获取本地ip
g.InitLocalIp()
//初始化rpc client,new一个结构体出来
g.InitRpcClients()
//初始化一系列Mapper,内含采集metric的各种函数
funcs.BuildMappers()
//初始化更新cpu 硬盘统计协程
go cron.InitDataHistory()
//启动同步agent状态定时器
cron.ReportAgentStatus()
//启动同步插件定时器,插件安装异步实现
cron.SyncMinePlugins()
//启动配置的buildin  metric采集项定时器
cron.SyncBuiltinMetrics()
//启动同步信任ip定时器
cron.SyncTrustableIps()
//启动采集metric定时器
cron.Collect()
//启动http服务
go http.Start()
//阻塞
select {}
}

长连接

为了回避创建连接销毁连接的开销,agent与transfer通过长连接通信。调用下面的call函数上报metric到transfer。

func (this *SingleConnRpcClient) Call(method string, argsinterface{}, replyinterface{}) error {
    //上锁,协程安全
    this.Lock()
    defer this.Unlock()
    / /长连接生成函数
      err := this.serverConn()
      if err != nil {
        return err
}
timeout := time.Duration(10 * time.Second)
done := make(chan error,1)
go func() {
//发起调用
err := this.rpcClient.Call(method, args, reply)
done <- err
}()
select {
case <-time.After(timeout):
//超时,连接可能已经不可用,需要关闭连接
log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
this.close()
return errors.New(this.RpcServer +" rpc call timeout")
case err := <-done:
if err != nil {
//如果出错也关闭连接,不管什么错误,有点粗暴
this.close()
return err
  }
}
return nil
}

生成连接。

func (this *SingleConnRpcClient) serverConn() error {
if this.rpcClient != nil {
return nil
}
var err error
var retry int =1
      for {
//出错的时候,调用close函数会把rpcClient 置为nil,从而触发重建连接
if this.rpcClient != nil {
     return nil
}
//发起连接
this.rpcClient, err = net.JsonRpcClient("tcp", this.RpcServer, this.Timeout)
if err != nil {
    log.Printf("dial %s fail: %v", this.RpcServer, err)
    if retry >3 {
        return err
}
//重试3次,每次休眠2的retry幂次方
time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second)
retry++
continue
      }
return err
}
}

metric采集上传

采集metric主要入口

func Collect() {

//开关检测

if !g.Config().Transfer.Enabled {
      return  }
//没有配置transfer地址
if len(g.Config().Transfer.Addrs) ==0 {
return  
 }

//开始执行各个采集函数,funcs.Mappers是在main函数里初始化好的

for _, v :=range funcs.Mappers {
go collect(int64(v.Interval), v.Fs)
}
}

具体调用采集函数的入口

func collect(sec int64, fns []func() []*model.MetricValue) {
t := time.NewTicker(time.Second * time.Duration(sec))
defer t.Stop()
for {
<-t.C
hostname, err := g.Hostname()
if err != nil {
continue
              }
mvs := []*model.MetricValue{}
ignoreMetrics := g.Config().IgnoreMetrics
for _, fn :=range fns {
//调用采集函数
items := fn()
if items == nil {
continue
 }
if len(items) ==0 {
        continue
}

for _, mv :=range items {
  if b, ok := ignoreMetrics[mv.Metric]; ok && b {
      continue
 }else {
mvs = append(mvs, mv)
}
}
}
now := time.Now().Unix()
for j :=0; j < len(mvs); j++ {
mvs[j].Step = sec
mvs[j].Endpoint = hostname
mvs[j].Timestamp = now
}
//通过长连接将metrics send到transfer
g.SendToTransfer(mvs)
}
}

以上是agent采集metric的主要流程。各个采集功能函数都在funcs.Mappers里面,如果需要crud采集功能,只需修改funcs.Mappers相关逻辑即可。用一个采集内存的功能函数做例,其他的异曲同工。

//返回值连同其他采集结果调用SendToTransfer发送到transfer
func MemMetrics() []*model.MetricValue {
//获取内存信息
m, err := nux.MemInfo()
if err != nil {
log.Println(err)
return nil
}
memFree := m.MemFree + m.Buffers + m.Cached
if m.MemAvailable >0 {
memFree = m.MemAvailable
}
memUsed := m.MemTotal - memFree
pmemFree :=0.0
      pmemUsed :=0.0
      if m.MemTotal !=0 {
pmemFree = float64(memFree) *100.0 / float64(m.MemTotal)
pmemUsed = float64(memUsed) *100.0 / float64(m.MemTotal)
}
pswapFree :=0.0
      pswapUsed :=0.0
      if m.SwapTotal !=0 {
pswapFree = float64(m.SwapFree) *100.0 / float64(m.SwapTotal)
pswapUsed = float64(m.SwapUsed) *100.0 / float64(m.SwapTotal)
}
return []*model.MetricValue{
//总内存
              GaugeValue("mem.memtotal", m.MemTotal),
//used的内存
              GaugeValue("mem.memused", memUsed),
//free的内存
              GaugeValue("mem.memfree", memFree),
//swap总内存
              GaugeValue("mem.swaptotal", m.SwapTotal),
//swap used内存
              GaugeValue("mem.swapused", m.SwapUsed),
//swap free内存
              GaugeValue("mem.swapfree", m.SwapFree),
//内存free比例
              GaugeValue("mem.memfree.percent", pmemFree),
//内存使用比例
              GaugeValue("mem.memused.percent", pmemUsed),
//swap free比例
              GaugeValue("mem.swapfree.percent", pswapFree),
//swap used比例
              GaugeValue("mem.swapused.percent", pswapUsed),
      }
}

SendToTransfer函数核心还是调用SendMetrics函数

func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
rand.Seed(time.Now().UnixNano())
//随机选取transfer节点,负载均衡
for _, i :=range rand.Perm(len(Config().Transfer.Addrs)) {
addr := Config().Transfer.Addrs[i]
c := getTransferClient(addr)
if c == nil {
c = initTransferClient(addr)
}

//在这个函数里面会调用长连接那一节的call函数,来初始化或者复用连接

if updateMetrics(c, metrics, resp) {
break
              }
}
}

上面说的是agent自己采集的metric,用户也可以调用push接口主动上传。比如mysql,mongdb等这些外部的进程的采集。这是push接口的主要实现,

func configPushRoutes() {
http.HandleFunc("/v1/push",func(w http.ResponseWriter, req *http.Request) {
    if req.ContentLength ==0 {
        http.Error(w,"body is blank", http.StatusBadRequest)
        return
       }
decoder := json.NewDecoder(req.Body)
var metrics []*model.MetricValue
err := decoder.Decode(&metrics)
if err != nil {
          http.Error(w,"connot decode body", http.StatusBadRequest)
return
         }

//同样调用的SendToTransfer
g.SendToTransfer(metrics)
w.Write([]byte("success"))
})
}

结束

纸上得来终觉浅,绝知此事要躬行。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容