导读
falcon是小米开源的监控平台,广泛用于许多互联网公司。agent模块是metric采集服务,它就像搬运工,将各种metric输送到transfer服务。如果说这些是它的主要工作,那么插件安装,同步信任ip名单等便是它的副业。本着抓主要矛盾的原则,本文不包含副业解读。
main函数
main函数主要做一些初始化工作,并启动http服务。
func main() {
//入参cfg := flag.String("c","cfg.json","configuration file")
version := flag.Bool("v", false,"show version")
check := flag.Bool("check", false,"check collector")
//入参解析
flag.Parse()
if *version {
fmt.Println(g.VERSION)
os.Exit(0)
}
//check collect功能是否正常
if *check {
funcs.CheckCollector()
os.Exit(0)
}
//解析配置
g.ParseConfig(*cfg)
if g.Config().Debug {
g.InitLog("debug")
}else {
g.InitLog("info")
}
//初始化工作目录
g.InitRootDir()
//初始化本地ip,通过向heartbeat服务发起一次连接获取本地ip
g.InitLocalIp()
//初始化rpc client,new一个结构体出来
g.InitRpcClients()
//初始化一系列Mapper,内含采集metric的各种函数
funcs.BuildMappers()
//初始化更新cpu 硬盘统计协程
go cron.InitDataHistory()
//启动同步agent状态定时器
cron.ReportAgentStatus()
//启动同步插件定时器,插件安装异步实现
cron.SyncMinePlugins()
//启动配置的buildin metric采集项定时器
cron.SyncBuiltinMetrics()
//启动同步信任ip定时器
cron.SyncTrustableIps()
//启动采集metric定时器
cron.Collect()
//启动http服务
go http.Start()
//阻塞
select {}
}
长连接
为了回避创建连接销毁连接的开销,agent与transfer通过长连接通信。调用下面的call函数上报metric到transfer。
func (this *SingleConnRpcClient) Call(method string, argsinterface{}, replyinterface{}) error {
//上锁,协程安全
this.Lock()
defer this.Unlock()
/ /长连接生成函数
err := this.serverConn()
if err != nil {
return err
}
timeout := time.Duration(10 * time.Second)
done := make(chan error,1)
go func() {
//发起调用
err := this.rpcClient.Call(method, args, reply)
done <- err
}()
select {
case <-time.After(timeout):
//超时,连接可能已经不可用,需要关闭连接
log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
this.close()
return errors.New(this.RpcServer +" rpc call timeout")
case err := <-done:
if err != nil {
//如果出错也关闭连接,不管什么错误,有点粗暴
this.close()
return err
}
}
return nil
}
生成连接。
func (this *SingleConnRpcClient) serverConn() error {
if this.rpcClient != nil {
return nil
}
var err error
var retry int =1
for {
//出错的时候,调用close函数会把rpcClient 置为nil,从而触发重建连接
if this.rpcClient != nil {
return nil
}
//发起连接
this.rpcClient, err = net.JsonRpcClient("tcp", this.RpcServer, this.Timeout)
if err != nil {
log.Printf("dial %s fail: %v", this.RpcServer, err)
if retry >3 {
return err
}
//重试3次,每次休眠2的retry幂次方
time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second)
retry++
continue
}
return err
}
}
metric采集上传
采集metric主要入口
func Collect() {
//开关检测
if !g.Config().Transfer.Enabled {
return }
//没有配置transfer地址
if len(g.Config().Transfer.Addrs) ==0 {
return
}
//开始执行各个采集函数,funcs.Mappers是在main函数里初始化好的
for _, v :=range funcs.Mappers {
go collect(int64(v.Interval), v.Fs)
}
}
具体调用采集函数的入口
func collect(sec int64, fns []func() []*model.MetricValue) {
t := time.NewTicker(time.Second * time.Duration(sec))
defer t.Stop()
for {
<-t.C
hostname, err := g.Hostname()
if err != nil {
continue
}
mvs := []*model.MetricValue{}
ignoreMetrics := g.Config().IgnoreMetrics
for _, fn :=range fns {
//调用采集函数
items := fn()
if items == nil {
continue
}
if len(items) ==0 {
continue
}
for _, mv :=range items {
if b, ok := ignoreMetrics[mv.Metric]; ok && b {
continue
}else {
mvs = append(mvs, mv)
}
}
}
now := time.Now().Unix()
for j :=0; j < len(mvs); j++ {
mvs[j].Step = sec
mvs[j].Endpoint = hostname
mvs[j].Timestamp = now
}
//通过长连接将metrics send到transfer
g.SendToTransfer(mvs)
}
}
以上是agent采集metric的主要流程。各个采集功能函数都在funcs.Mappers里面,如果需要crud采集功能,只需修改funcs.Mappers相关逻辑即可。用一个采集内存的功能函数做例,其他的异曲同工。
//返回值连同其他采集结果调用SendToTransfer发送到transfer
func MemMetrics() []*model.MetricValue {
//获取内存信息
m, err := nux.MemInfo()
if err != nil {
log.Println(err)
return nil
}
memFree := m.MemFree + m.Buffers + m.Cached
if m.MemAvailable >0 {
memFree = m.MemAvailable
}
memUsed := m.MemTotal - memFree
pmemFree :=0.0
pmemUsed :=0.0
if m.MemTotal !=0 {
pmemFree = float64(memFree) *100.0 / float64(m.MemTotal)
pmemUsed = float64(memUsed) *100.0 / float64(m.MemTotal)
}
pswapFree :=0.0
pswapUsed :=0.0
if m.SwapTotal !=0 {
pswapFree = float64(m.SwapFree) *100.0 / float64(m.SwapTotal)
pswapUsed = float64(m.SwapUsed) *100.0 / float64(m.SwapTotal)
}
return []*model.MetricValue{
//总内存
GaugeValue("mem.memtotal", m.MemTotal),
//used的内存
GaugeValue("mem.memused", memUsed),
//free的内存
GaugeValue("mem.memfree", memFree),
//swap总内存
GaugeValue("mem.swaptotal", m.SwapTotal),
//swap used内存
GaugeValue("mem.swapused", m.SwapUsed),
//swap free内存
GaugeValue("mem.swapfree", m.SwapFree),
//内存free比例
GaugeValue("mem.memfree.percent", pmemFree),
//内存使用比例
GaugeValue("mem.memused.percent", pmemUsed),
//swap free比例
GaugeValue("mem.swapfree.percent", pswapFree),
//swap used比例
GaugeValue("mem.swapused.percent", pswapUsed),
}
}
SendToTransfer函数核心还是调用SendMetrics函数
func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
rand.Seed(time.Now().UnixNano())
//随机选取transfer节点,负载均衡
for _, i :=range rand.Perm(len(Config().Transfer.Addrs)) {
addr := Config().Transfer.Addrs[i]
c := getTransferClient(addr)
if c == nil {
c = initTransferClient(addr)
}
//在这个函数里面会调用长连接那一节的call函数,来初始化或者复用连接
if updateMetrics(c, metrics, resp) {
break
}
}
}
上面说的是agent自己采集的metric,用户也可以调用push接口主动上传。比如mysql,mongdb等这些外部的进程的采集。这是push接口的主要实现,
func configPushRoutes() {
http.HandleFunc("/v1/push",func(w http.ResponseWriter, req *http.Request) {
if req.ContentLength ==0 {
http.Error(w,"body is blank", http.StatusBadRequest)
return
}
decoder := json.NewDecoder(req.Body)
var metrics []*model.MetricValue
err := decoder.Decode(&metrics)
if err != nil {
http.Error(w,"connot decode body", http.StatusBadRequest)
return
}
//同样调用的SendToTransfer
g.SendToTransfer(metrics)
w.Write([]byte("success"))
})
}
结束
纸上得来终觉浅,绝知此事要躬行。