Scala - 小巧高性能日志采集器

一个用 Scala 编写的精简版 Filebeat 风格日志采集器，功能丰富，可高度定制。

依赖包

compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.7'
compile group: 'commons-io', name: 'commons-io', version: '2.6'
compile group: 'com.google.guava', name: 'guava', version: '27.0.1-jre'

入口 App.scala

import java.io.File
import java.util.concurrent.{Executors, LinkedBlockingDeque, ScheduledExecutorService, TimeUnit}

import com.google.common.cache.LoadingCache
import org.apache.commons.io.filefilter.IOFileFilter

/**
 * Demo entry point: registers one hard-coded collector job, lets it run for ten seconds,
 * then shuts the shared scheduler down.
 */
object App {

  /** Scheduler shared by every job; each [[JobThread]] submits its scan, tail and consumer tasks here. */
  val scheduledThreadPool: ScheduledExecutorService = Executors.newScheduledThreadPool(5)

  /** Collector instances keyed by job name (identifier spelling kept as-is because it is public). */
  var instalnces: Map[String, JobThread] = Map.empty[String, JobThread]

  def main(args: Array[String]): Unit = {
    // Filter precedence: includeSuffix => ignoreOlder => includePaths => excludePaths
    val name = "test"
    val collector = JobUtil.createJob(
      "/Users/lake/dounine/git/sr-galaxy-serv-loghub/db",
      logPath = "/Users/huanghuanlai/dounine/git/sr-galaxy-serv-loghub/logdir2",
      jobName = name,
      includeSuffix = "log,txt",   // accepted log file suffixes
      ignoreOlder = "24h",         // skip files not modified within this window
      intervalFileStatus = "1s",   // how often tracked files are checked for new content
      intervalScanFile = "30s",    // how often the directory is rescanned for matching files
      includePaths = ".*",         // paths to include (regular expressions)
      excludePaths = "",           // paths to exclude (regular expressions)
      handlerFileClose = "24h"     // release tracked files that stay inactive this long
    )
    instalnces = instalnces + (name -> collector)

    // Start every registered collector; run() only schedules its tasks on the shared pool.
    for (instance <- instalnces.values) scheduledThreadPool.execute(instance)

    // Let the demo run briefly, then stop the pool.
    TimeUnit.SECONDS.sleep(10)
    scheduledThreadPool.shutdown()
  }
}

/**
 * Configuration and shared runtime state of one log-collection job.
 *
 * @param jobName            job identifier; also the name of the folder that holds `seek.txt`
 * @param workPath           base folder for per-job state (`<workPath>/<jobName>/seek.txt`)
 * @param logPath            root directory scanned for log files
 * @param intervalFileStatus duration alias (e.g. `"1s"`): how often tracked files are checked for new bytes
 * @param intervalScanFile   duration alias (e.g. `"30s"`): how often `logPath` is rescanned
 * @param dirFilter          directory filter passed to the recursive file listing
 * @param fileFilter         file filter passed to the recursive file listing
 * @param linesBlockQueue    queue receiving every newly read line
 * @param handlerFiles       files currently being tailed, keyed by absolute path
 * @param seekDB             byte offset already consumed per absolute path; `-1` when none is stored
 */
case class Job(jobName: String,
               workPath: String,
               logPath: String,
               intervalFileStatus: String,
               intervalScanFile: String,
               dirFilter: IOFileFilter,
               fileFilter: IOFileFilter,
               linesBlockQueue: LinkedBlockingDeque[String],
               handlerFiles: LoadingCache[String, File],
               seekDB: LoadingCache[String, java.lang.Long])

JobThread.scala

import java.io.{File, RandomAccessFile}
import java.util.concurrent.TimeUnit
import java.util.function.Consumer

import org.apache.commons.io.FileUtils

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer

/**
 * Tails the log files of one [[Job]] and pushes every newly appended line to `job.linesBlockQueue`,
 * persisting per-file read offsets in `<workPath>/<jobName>/seek.txt` as `<absolutePath>:<offset>` lines.
 *
 * [[run]] schedules three tasks on `App.scheduledThreadPool`:
 *  - a periodic directory scan (every `intervalScanFile`) that registers matching files in `job.handlerFiles`;
 *  - a periodic tail pass (every `intervalFileStatus`) that reads the bytes appended since the stored
 *    offset and then flushes all offsets to `seek.txt`;
 *  - a one-shot consumer loop that drains `job.linesBlockQueue` until the pool is shut down.
 *
 * The constructor creates the seek folder and file when they are missing.
 */
class JobThread(job: Job) extends Runnable {

  import scala.util.control.NonFatal

  /** Upper bound on bytes read from one file per tail pass, so a large backlog is drained in pieces. */
  private val MaxChunkBytes: Int = 8 * 1024 * 1024

  /** How long the consumer waits for a line before re-checking whether the pool was shut down. */
  private val ConsumerPollMillis: Long = 500L

  private val NewLine: Byte = '\n'.toByte

  initSeekDb(job.jobName)

  override def run(): Unit = {
    val pool = App.scheduledThreadPool
    pool.scheduleAtFixedRate(guardedTask("scan log files")(scanLogFiles()),
      0, JobUtil.getSecondsByAlias(job.intervalScanFile), TimeUnit.SECONDS)
    pool.scheduleAtFixedRate(guardedTask("tail log files")(tailHandledFiles()),
      1, JobUtil.getSecondsByAlias(job.intervalFileStatus), TimeUnit.SECONDS)
    pool.schedule(new Runnable {
      override def run(): Unit = consumeLines()
    }, 1, TimeUnit.MILLISECONDS)
  }

  /**
   * Wraps `body` in a Runnable that reports non-fatal failures instead of propagating them.
   * A periodic task that throws is never executed again by a ScheduledExecutorService,
   * so one bad file or a missing directory would otherwise stop collection for good.
   */
  private def guardedTask(name: String)(body: => Unit): Runnable = new Runnable {
    override def run(): Unit =
      try body
      catch { case NonFatal(e) => reportFailure(name, e) }
  }

  private def reportFailure(what: String, error: Throwable): Unit =
    System.err.println(s"[${job.jobName}] $what failed: $error")

  /** Registers every file under `logPath` that passes the job's filters. */
  private def scanLogFiles(): Unit = {
    val logFiles = FileUtils.listFiles(FileUtils.getFile(job.logPath), job.fileFilter, job.dirFilter)
    for (logFile <- logFiles) job.handlerFiles.put(logFile.getAbsolutePath, logFile)
  }

  /** Reads new content from every tracked file, isolating per-file failures, then persists offsets. */
  private def tailHandledFiles(): Unit = {
    for (file <- job.handlerFiles.asMap().values().toList) {
      try tailFile(file)
      catch { case NonFatal(e) => reportFailure(s"read ${file.getAbsolutePath}", e) }
    }
    flushCacheSeekToDb()
  }

  /**
   * Emits the complete lines appended to `file` since its stored offset and records the new offset.
   * Only bytes up to the last '\n' are consumed, so a line still being written is emitted once,
   * in full, on a later pass. Cutting at a '\n' byte also never splits a UTF-8 character.
   */
  private def tailFile(file: File): Unit = {
    val path = file.getAbsolutePath
    val fileLength = file.length()
    val storedSeek: Long = job.seekDB.get(path)
    // -1: never read before. An offset past the end means the file was truncated/rotated in place.
    val start = if (storedSeek < 0 || storedSeek > fileLength) 0L else storedSeek
    val available = fileLength - start
    val nextSeek =
      if (available <= 0) start
      else {
        val chunkSize = math.min(available, MaxChunkBytes.toLong).toInt
        val bytes = readBytes(file, start, chunkSize)
        val lastNewline = lastNewlineIndex(bytes)
        if (lastNewline >= 0) {
          emitLines(bytes, lastNewline)
          start + lastNewline + 1
        } else if (chunkSize == MaxChunkBytes) {
          // One line longer than a whole chunk: emit it as-is rather than stalling on it forever.
          emitLines(bytes, bytes.length)
          start + bytes.length
        } else {
          start // only an unfinished line so far; wait for its newline
        }
      }
    job.seekDB.put(path, nextSeek)
  }

  /** Reads exactly `length` bytes of `file` starting at `from`, always closing the file. */
  private def readBytes(file: File, from: Long, length: Int): Array[Byte] = {
    val randomFile = new RandomAccessFile(file, "r")
    try {
      val buffer = new Array[Byte](length)
      randomFile.seek(from)
      randomFile.readFully(buffer)
      buffer
    } finally randomFile.close()
  }

  /** Index of the last '\n' byte in `bytes`, or -1 when there is none. */
  private def lastNewlineIndex(bytes: Array[Byte]): Int = {
    var i = bytes.length - 1
    while (i >= 0 && bytes(i) != NewLine) i -= 1
    i
  }

  /** Decodes `bytes[0, length)` as UTF-8 and queues each line, tolerating CRLF line endings. */
  private def emitLines(bytes: Array[Byte], length: Int): Unit =
    new String(bytes, 0, length, "utf-8")
      .split("\n", -1)
      .foreach(line => job.linesBlockQueue.add(if (line.endsWith("\r")) line.dropRight(1) else line))

  /** Drains the line queue until the shared pool is shut down, blocking instead of busy-spinning. */
  private def consumeLines(): Unit =
    try {
      while (!App.scheduledThreadPool.isShutdown) {
        Option(job.linesBlockQueue.poll(ConsumerPollMillis, TimeUnit.MILLISECONDS)).foreach { line =>
          try handleLine(line)
          catch { case NonFatal(e) => reportFailure("handle line", e) }
        }
      }
    } catch {
      case _: InterruptedException => Thread.currentThread().interrupt()
    }

  private def handleLine(line: String): Unit =
    println(s"line = ${line}") // TODO: apply the business logic to every newly appended line

  private def seekDbFile: File = FileUtils.getFile(s"${job.workPath}/${job.jobName}/seek.txt")

  /** Parses `<absolutePath>:<offset>`, splitting on the last ':' so paths containing ':' still work. */
  private def parseSeekLine(line: String): Option[(String, Long)] = {
    val separator = line.lastIndexOf(':')
    if (separator <= 0) None
    else scala.util.Try(line.substring(separator + 1).trim.toLong).toOption
      .map(seek => (line.substring(0, separator), seek))
  }

  /**
   * Writes the cached offsets to `seek.txt`: changed entries are updated in place, new paths
   * are appended, and lines that cannot be parsed are kept untouched. The file is rewritten
   * only when something changed.
   */
  def flushCacheSeekToDb(): Unit = {
    val dbFile = seekDbFile
    val storedLines = FileUtils.readLines(dbFile, "utf-8").toList
    val seenPaths = scala.collection.mutable.HashSet.empty[String]
    var changed = false
    val rewritten = storedLines.map { line =>
      parseSeekLine(line) match {
        case Some((path, storedSeek)) =>
          seenPaths += path
          // getIfPresent: a path that is not cached keeps its stored value without re-reading the file.
          val currentSeek: Long = Option(job.seekDB.getIfPresent(path)).map(_.longValue).getOrElse(storedSeek)
          if (currentSeek != storedSeek) {
            changed = true
            s"$path:$currentSeek"
          } else line
        case None => line
      }
    }
    val newEntries = job.seekDB.asMap().toList.collect {
      case (path, seek) if !seenPaths.contains(path) => s"$path:$seek"
    }
    if (changed || newEntries.nonEmpty) {
      FileUtils.writeLines(dbFile, rewritten ++ newEntries, false)
    }
  }

  /**
   * Creates `<workPath>/<jobName>/` and an empty `seek.txt` when they are missing.
   *
   * @param dbName unused; the folder name always comes from `job.jobName`
   */
  def initSeekDb(dbName: String): Unit = {
    val dbFolder = FileUtils.getFile(s"${job.workPath}/${job.jobName}")
    if (!dbFolder.exists()) dbFolder.mkdirs()
    val dbFile = seekDbFile
    if (!dbFile.exists()) dbFile.createNewFile()
  }

  /**
   * Returns the text of `file` from byte `seek` to its current end, split on '\n'.
   * The tail pass no longer uses this; it is kept for callers. It returns an empty array
   * when there is nothing to read, and always closes the file.
   */
  def readLinesForSeek(seek: Long, file: File): Array[String] = {
    val available = file.length() - seek
    if (available <= 0) Array.empty[String]
    else new String(readBytes(file, seek, math.min(available, Int.MaxValue.toLong).toInt), "utf-8").split("\n")
  }

}

JobUtil.scala

import java.io.File
import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}

import com.google.common.cache.{CacheBuilder, CacheLoader}
import org.apache.commons.io.FileUtils
import org.apache.commons.io.filefilter.{FileFilterUtils, IOFileFilter}

object JobUtil {

  /**
   * Converts a duration alias such as `"30s"`, `"5m"`, `"24h"` or `"1d"` into seconds.
   * A bare number (e.g. `"45"`) is taken as seconds; surrounding whitespace is ignored.
   *
   * @param alias an integer amount optionally followed by one unit: `s`, `m`, `h` or `d`
   * @return the duration in seconds
   * @throws IllegalArgumentException if the alias is empty or has an unknown unit, or a
   *                                  NumberFormatException (a subclass) if the amount is not an integer
   */
  def getSecondsByAlias(alias: String): Long = {
    val text = Option(alias).map(_.trim).getOrElse("")
    require(text.nonEmpty, "duration alias must not be empty")
    val (amount, secondsPerUnit) = text.last match {
      case 's' => (text.init, 1L)
      case 'm' => (text.init, 60L)
      case 'h' => (text.init, 60L * 60)
      case 'd' => (text.init, 60L * 60 * 24)
      case digit if digit.isDigit => (text, 1L) // no unit: already seconds
      case unit => throw new IllegalArgumentException(s"unknown duration unit '$unit' in alias '$alias'")
    }
    amount.trim.toLong * secondsPerUnit
  }

  /**
   * Builds a [[JobThread]] that collects logs from `logPath`.
   *
   * A file is collected only when it passes every filter, in this order: its name ends with one of
   * `includeSuffix`; it was modified within `ignoreOlder`; its absolute path fully matches one of
   * `includePaths` (an empty list includes every path); and it matches none of `excludePaths`.
   *
   * @param workPath           base folder for per-job state; offsets live in `<workPath>/<jobName>/seek.txt`
   * @param logPath            root directory scanned (recursively) for log files
   * @param jobName            job identifier
   * @param includeSuffix      comma-separated file-name suffixes, e.g. `"log,txt"`; empty accepts any name
   * @param ignoreOlder        duration alias (see [[getSecondsByAlias]]); older files are skipped
   * @param intervalFileStatus duration alias: how often tracked files are checked for new bytes
   * @param intervalScanFile   duration alias: how often `logPath` is rescanned
   * @param includePaths       comma-separated regexes matched against the whole absolute path
   * @param excludePaths       comma-separated regexes matched against the whole absolute path
   * @param handlerFileClose   duration alias: a tracked file is dropped this long after it was last registered
   * @return the job runner (its constructor prepares the seek file; collection starts when it is run)
   */
  def createJob(workPath: String, logPath: String, jobName: String, includeSuffix: String, ignoreOlder: String, intervalFileStatus: String, intervalScanFile: String, includePaths: String, excludePaths: String, handlerFileClose: String): JobThread = {
    // Splits a comma-separated option into trimmed, non-empty items.
    def items(csv: String): Seq[String] =
      Option(csv).toSeq.flatMap(_.split(",").toSeq).map(_.trim).filter(_.nonEmpty)

    // Whole-string match (same semantics as String.matches) against pre-compiled patterns.
    def matchesAny(patterns: Seq[java.util.regex.Pattern], path: String): Boolean =
      patterns.exists(_.matcher(path).matches())

    val ignoreOlderSeconds = getSecondsByAlias(ignoreOlder)

    val suffixes = items(includeSuffix)
    val suffixFilter: IOFileFilter =
      if (suffixes.isEmpty) FileFilterUtils.trueFileFilter()
      else FileFilterUtils.or(suffixes.map(suffix => FileFilterUtils.suffixFileFilter(suffix)): _*)

    val ignoreOlderFilter = new IOFileFilter {
      override def accept(file: File): Boolean =
        (System.currentTimeMillis() - file.lastModified()) / 1000 <= ignoreOlderSeconds

      // Delegate so the filter also applies when it is used as a FilenameFilter.
      override def accept(dir: File, name: String): Boolean = accept(new File(dir, name))
    }

    // Compiled once here instead of on every accept() call.
    val includePatterns = items(includePaths).map(p => java.util.regex.Pattern.compile(p))
    val excludePatterns = items(excludePaths).map(p => java.util.regex.Pattern.compile(p))

    val includeExcludeFilter = new IOFileFilter {
      override def accept(file: File): Boolean = {
        val path = file.getAbsolutePath
        val included = includePatterns.isEmpty || matchesAny(includePatterns, path)
        // includePaths selects candidates first; excludePaths then removes from that selection.
        included && !matchesAny(excludePatterns, path)
      }

      override def accept(dir: File, name: String): Boolean = accept(new File(dir, name))
    }

    val fileFilter = FileFilterUtils.and(
      suffixFilter,         // file-name suffix
      ignoreOlderFilter,    // skip files not modified within the ignoreOlder window
      includeExcludeFilter  // include/exclude path regexes
    )
    val handlerFiles = CacheBuilder.newBuilder()
      .expireAfterWrite(getSecondsByAlias(handlerFileClose), TimeUnit.SECONDS)
      .build[String, File](new CacheLoader[String, File] {
        // Entries are only ever put explicitly; Guava rejects a null load, so get() on a missing key fails.
        override def load(k: String): File = {
          null
        }
      })
    val seekDB = CacheBuilder.newBuilder()
      .expireAfterWrite(24, TimeUnit.HOURS)
      .build[String, java.lang.Long](new CacheLoader[String, java.lang.Long] {
        override def load(path: String): java.lang.Long = {
          import scala.collection.JavaConversions._
          val dbFile = FileUtils.getFile(s"${workPath}/${jobName}/seek.txt")
          val seekLines = FileUtils.readLines(dbFile, "utf-8")
          // Lines are "<absolutePath>:<offset>"; split on the LAST ':' so paths containing ':' still parse.
          val storedSeek = seekLines.collectFirst {
            case line if line.lastIndexOf(':') > 0 && line.substring(0, line.lastIndexOf(':')) == path =>
              line.substring(line.lastIndexOf(':') + 1).trim
          }
          // -1 means "no usable stored offset": the file is then read from its beginning.
          storedSeek.flatMap(seek => scala.util.Try(seek.toLong).toOption).getOrElse(-1L)
        }
      })
    new JobThread(Job(
      jobName,
      workPath,
      logPath,
      intervalFileStatus,
      intervalScanFile,
      FileFilterUtils.directoryFileFilter(),
      fileFilter,
      new LinkedBlockingDeque[String](),
      handlerFiles,
      seekDB
    ))
  }

性能到底有多强，亲自运行一下就知道了。


©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容