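Spark's metrics system consists of the following parts; the class-level comment on MetricsSystem describes them as well.
- Instance: the component that owns the metrics. Examples include Master, Worker, ApplicationInfo, and StreamingContext. An instance supplies the data that Sources read, and it starts and stops its MetricsSystem. In the configuration, instances are addressed by role name, such as master, worker, driver, executor, or applications
- Source: where metric data comes from. A Source reads its values from properties of the instance
- Sink: where metric data is sent. Spark uses MetricsServlet as the default Sink
- MetricsConfig: the configuration the metrics system needs. Its initialize() method populates properties
- MetricsSystem: the per-instance control center for Sources and Sinks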
Source
Spark abstracts metric data sources as the Source trait and ships implementations such as ApplicationSource, MasterSource, WorkerSource, DAGSchedulerSource, StreamingSource, and JvmSource.
private[spark] trait Source {
def sourceName: String
def metricRegistry: MetricRegistry
}
- sourceName: the name of the source
- metricRegistry: the MetricRegistry that holds the source's metrics
The listings below look at MasterSource, WorkerSource, and JvmSource in detail.
private[spark] class MasterSource(val master: Master) extends Source {
override val metricRegistry = new MetricRegistry()
override val sourceName = "master"
// Gauge for worker numbers in cluster
metricRegistry.register(MetricRegistry.name("workers"), new Gauge[Int] {
override def getValue: Int = master.workers.size
})
// Gauge for alive worker numbers in cluster
metricRegistry.register(MetricRegistry.name("aliveWorkers"), new Gauge[Int]{
override def getValue: Int = master.workers.count(_.state == WorkerState.ALIVE)
})
// Gauge for application numbers in cluster
metricRegistry.register(MetricRegistry.name("apps"), new Gauge[Int] {
override def getValue: Int = master.apps.size
})
// Gauge for waiting application numbers in cluster
metricRegistry.register(MetricRegistry.name("waitingApps"), new Gauge[Int] {
override def getValue: Int = master.apps.count(_.state == ApplicationState.WAITING)
})
}
private[worker] class WorkerSource(val worker: Worker) extends Source {
override val sourceName = "worker"
override val metricRegistry = new MetricRegistry()
metricRegistry.register(MetricRegistry.name("executors"), new Gauge[Int] {
override def getValue: Int = worker.executors.size
})
// Gauge for cores used of this worker
metricRegistry.register(MetricRegistry.name("coresUsed"), new Gauge[Int] {
override def getValue: Int = worker.coresUsed
})
// Gauge for memory used of this worker
metricRegistry.register(MetricRegistry.name("memUsed_MB"), new Gauge[Int] {
override def getValue: Int = worker.memoryUsed
})
// Gauge for cores free of this worker
metricRegistry.register(MetricRegistry.name("coresFree"), new Gauge[Int] {
override def getValue: Int = worker.coresFree
})
// Gauge for memory free of this worker
metricRegistry.register(MetricRegistry.name("memFree_MB"), new Gauge[Int] {
override def getValue: Int = worker.memoryFree
})
}
The Gauges registered in each MetricRegistry read their values from fields of the Master and Worker objects.
JvmSource takes its MetricSets from the metrics-jvm package.
private[spark] class JvmSource extends Source {
override val sourceName = "jvm"
override val metricRegistry = new MetricRegistry()
metricRegistry.registerAll(new GarbageCollectorMetricSet)
metricRegistry.registerAll(new MemoryUsageGaugeSet)
metricRegistry.registerAll(
new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer))
}
A Source creates a MetricRegistry and registers the metrics it wants to track; the values come from properties of the instance object.
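To make the gauge idea concrete, here is a minimal sketch in plain Java that uses only the standard library instead of the Dropwizard MetricRegistry. The names GaugeSketch, FakeWorker, and workerGauges are hypothetical stand-ins, not Spark classes. The point it tries to show is that a gauge stores a function, so every read reflects the instance's current field values rather than the values at registration time.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a Worker: only the fields a source would read.
class FakeWorker {
    int cores = 8;
    int coresUsed = 0;
}

class GaugeSketch {
    // Registers gauges as suppliers, so values are computed at read time.
    // The metric names mirror two of the gauges in WorkerSource.
    static Map<String, Supplier<Integer>> workerGauges(FakeWorker w) {
        Map<String, Supplier<Integer>> registry = new LinkedHashMap<>();
        registry.put("coresUsed", () -> w.coresUsed);
        registry.put("coresFree", () -> w.cores - w.coresUsed);
        return registry;
    }
}
```

Reading coresFree before and after changing coresUsed on the same FakeWorker returns different values, which is the behavior the Gauge[Int] registrations above rely on.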
Sink
Spark abstracts metric output destinations as the Sink trait and ships implementations such as ConsoleSink, CsvSink, MetricsServlet, GraphiteSink, JmxSink, and Slf4jSink.
private[spark] trait Sink {
def start(): Unit
def stop(): Unit
def report(): Unit
}
- MetricsServlet: creates a ServletContextHandler in the Spark UI's Jetty server so that the metrics can be viewed in a browser
The Slf4jSink implementation is examined below.
private[spark] class Slf4jSink(
val property: Properties,
val registry: MetricRegistry,
securityMgr: SecurityManager)
extends Sink {
val SLF4J_DEFAULT_PERIOD = 10
val SLF4J_DEFAULT_UNIT = "SECONDS"
val SLF4J_KEY_PERIOD = "period"
val SLF4J_KEY_UNIT = "unit"
val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match {
case Some(s) => s.toInt
case None => SLF4J_DEFAULT_PERIOD
}
val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT)
}
// Verify that the polling period passed to scheduleAtFixedRate is at least 1 second
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build()
override def start() {
reporter.start(pollPeriod, pollUnit)
}
override def stop() {
reporter.stop()
}
override def report() {
reporter.report()
}
}
The key method is start(). It needs a reporter object plus the polling period, given by pollPeriod and pollUnit. start() in turn calls ScheduledReporter.start():
public void start(long period, TimeUnit unit) {
// executor is created with Executors.newSingleThreadScheduledExecutor
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
report();
} catch (Exception ex) {
LOG.error("Exception thrown from {}#report. Exception was suppressed.", ScheduledReporter.this.getClass().getSimpleName(), ex);
}
}
}, period, period, unit);
}
A Sink needs a reporter, a pollPeriod, and a pollUnit. It then reads the Source data periodically and reports it.
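The polling pattern can be sketched without Dropwizard. The class below is a hypothetical stand-in (PollingSinkSketch is not a Spark or Dropwizard class) that keeps the same three operations as the Sink trait and schedules report() the way ScheduledReporter.start() does. It assumes gauges are plain suppliers and collects output lines in memory instead of writing to a logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sink: polls gauges on a fixed period and records one line per gauge.
class PollingSinkSketch {
    private final Map<String, Supplier<Integer>> gauges;
    private final List<String> out = new ArrayList<>();
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor();

    PollingSinkSketch(Map<String, Supplier<Integer>> gauges) {
        this.gauges = gauges;
    }

    // Same shape as ScheduledReporter.start: the first run happens after one period.
    void start(long period, TimeUnit unit) {
        executor.scheduleAtFixedRate(() -> {
            try {
                report();
            } catch (RuntimeException e) {
                // Swallow the error: an uncaught exception would cancel all later runs.
            }
        }, period, period, unit);
    }

    void stop() {
        executor.shutdown();
    }

    // One-off report; this is also what the scheduled task calls.
    synchronized void report() {
        gauges.forEach((name, gauge) -> out.add(name + "=" + gauge.get()));
    }

    // Returns a copy of the lines reported so far.
    synchronized List<String> lines() {
        return new ArrayList<>(out);
    }
}
```

The try/catch inside the scheduled task matters for the same reason it appears in ScheduledReporter: scheduleAtFixedRate suppresses all subsequent executions once a run throws.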
MetricsConfig
MetricsConfig reads the metrics-related configuration.
private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
private val DEFAULT_PREFIX = "*"
private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties"
private[metrics] val properties = new Properties()
private[metrics] var perInstanceSubProperties: mutable.HashMap[String, Properties] = null
// Set the default properties
private def setDefaultProperties(prop: Properties) {
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
prop.setProperty("*.sink.servlet.path", "/metrics/json")
prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
}
/**
* Load properties from various places, based on precedence
* If the same property is set again later on in the method, it overwrites the previous value
*/
// Entry point: loads the configuration
def initialize() {
// Add default properties in case there's no properties file
setDefaultProperties(properties)
loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))
// Also look for the properties in provided Spark configuration
val prefix = "spark.metrics.conf."
conf.getAll.foreach {
case (k, v) if k.startsWith(prefix) =>
properties.setProperty(k.substring(prefix.length()), v)
case _ =>
}
// Now, let's populate a list of sub-properties per instance, instance being the prefix that
// appears before the first dot in the property name.
// Add to the sub-properties per instance, the default properties (those with prefix "*"), if
// they don't have that exact same sub-property already defined.
//
// For example, if properties has ("*.class"->"default_class", "*.path"->"default_path",
// "driver.path"->"driver_path"), for driver specific sub-properties, we'd like the output to be
// ("driver"->Map("path"->"driver_path", "class"->"default_class")
// Note how class got added to based on the default property, but path remained the same
// since "driver.path" already existed and took precedence over "*.path"
perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX)
if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) {
val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala
for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX);
(k, v) <- defaultSubProperties if (prop.get(k) == null)) {
prop.put(k, v)
}
}
}
/**
* Take a simple set of properties and a regex that the instance names (part before the first dot)
* have to conform to. And, return a map of the first order prefix (before the first dot) to the
* sub-properties under that prefix.
*
* For example, if the properties sent were Properties("*.sink.servlet.class"->"class1",
* "*.sink.servlet.path"->"path1"), the returned map would be
* Map("*" -> Properties("sink.servlet.class" -> "class1", "sink.servlet.path" -> "path1"))
* Note in the subProperties (value of the returned Map), only the suffixes are used as property
* keys.
* If, in the passed properties, there is only one property with a given prefix, it is still
* "unflattened". For example, if the input was Properties("*.sink.servlet.class" -> "class1"
* the returned Map would contain one key-value pair
* Map("*" -> Properties("sink.servlet.class" -> "class1"))
* Any passed in properties, not complying with the regex are ignored.
*
* @param prop the flat list of properties to "unflatten" based on prefixes
* @param regex the regex that the prefix has to comply with
* @return an unflattened map, mapping prefix with sub-properties under that prefix
*/
// Groups flat keys by their first-level prefix; see the examples in the Scaladoc above
def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
val subProperties = new mutable.HashMap[String, Properties]
prop.asScala.foreach { kv =>
if (regex.findPrefixOf(kv._1.toString).isDefined) {
val regex(prefix, suffix) = kv._1.toString
subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2.toString)
}
}
subProperties
}
// If the instance has no entry, fall back to the properties under "*"
def getInstance(inst: String): Properties = {
perInstanceSubProperties.get(inst) match {
case Some(s) => s
case None => perInstanceSubProperties.getOrElse(DEFAULT_PREFIX, new Properties)
}
}
/**
* Loads configuration from a config file. If no config file is provided, try to get file
* in class path.
*/
private[this] def loadPropertiesFromFile(path: Option[String]): Unit = {
var is: InputStream = null
try {
is = path match {
// Standard pattern: if a path is given, read it with FileInputStream; otherwise load metrics.properties from the classpath via Utils.getSparkClassLoader.getResourceAsStream
case Some(f) => new FileInputStream(f)
case None => Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME)
}
if (is != null) {
// Load the stream into properties
properties.load(is)
}
} catch {
case e: Exception =>
val file = path.getOrElse(DEFAULT_METRICS_CONF_FILENAME)
logError(s"Error loading configuration file $file", e)
} finally {
if (is != null) {
// Always close the stream
is.close()
}
}
}
}
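The grouping and default-merging logic in initialize() and subProperties() is easier to follow in isolation. The following is a rough Java sketch of the same idea, using only java.util; MetricsConfigSketch and withDefaults are hypothetical names, and the pattern is copied from INSTANCE_REGEX above. It is meant as an illustration of the technique, not a port of the Spark class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class MetricsConfigSketch {
    // Same pattern as INSTANCE_REGEX: "<instance or *>.<rest of key>".
    static final Pattern INSTANCE = Pattern.compile("^(\\*|[a-zA-Z]+)\\.(.+)");

    // Groups flat keys by their first segment; keys that do not match are ignored.
    static Map<String, Properties> subProperties(Properties flat, Pattern regex) {
        Map<String, Properties> grouped = new HashMap<>();
        for (String key : flat.stringPropertyNames()) {
            Matcher m = regex.matcher(key);
            if (m.matches()) {
                grouped.computeIfAbsent(m.group(1), k -> new Properties())
                       .setProperty(m.group(2), flat.getProperty(key));
            }
        }
        return grouped;
    }

    // Copies "*" defaults into every named instance unless the instance
    // already defines that exact key.
    static Map<String, Properties> withDefaults(Properties flat) {
        Map<String, Properties> grouped = subProperties(flat, INSTANCE);
        Properties defaults = grouped.get("*");
        if (defaults != null) {
            grouped.forEach((instance, props) -> {
                if (!instance.equals("*")) {
                    for (String k : defaults.stringPropertyNames()) {
                        if (props.getProperty(k) == null) {
                            props.setProperty(k, defaults.getProperty(k));
                        }
                    }
                }
            });
        }
        return grouped;
    }
}
```

With the example from the comment in initialize(), the driver entry keeps its own path and inherits class from the "*" defaults.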
MetricsSystem
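MetricsSystem registers Sources and Sinks and starts the Sinks. It is not a system-wide control center: each instance has its own MetricsSystem object, which controls metrics at instance granularity.
Its start() method performs three core steps: registerSources(), registerSinks(), and sinks.foreach(_.start).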
private[spark] class MetricsSystem private (
val instance: String,
conf: SparkConf,
securityMgr: SecurityManager)
extends Logging {
// Build the MetricsConfig object used to read the configuration
private[this] val metricsConfig = new MetricsConfig(conf)
private val sinks = new mutable.ArrayBuffer[Sink]
private val sources = new mutable.ArrayBuffer[Source]
private val registry = new MetricRegistry()
private var running: Boolean = false
// Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
private var metricsServlet: Option[MetricsServlet] = None
/**
* Get any UI handlers used by this metrics system; can only be called after start().
*/
def getServletHandlers: Array[ServletContextHandler] = {
require(running, "Can only call getServletHandlers on a running MetricsSystem")
metricsServlet.map(_.getHandlers(conf)).getOrElse(Array())
}
// Initialize the MetricsConfig object
metricsConfig.initialize()
def start() {
require(!running, "Attempting to start a MetricsSystem that is already running")
running = true
// Register the StaticSources, namely CodegenMetrics and HiveCatalogMetrics
StaticSources.allSources.foreach(registerSource)
// Register the configured Sources
registerSources()
// Create and register the Sinks
registerSinks()
// Start the Sinks
sinks.foreach(_.start)
}
def stop() {
if (running) {
// Call stop on every Sink
sinks.foreach(_.stop)
} else {
logWarning("Stopping a MetricsSystem that is not running")
}
running = false
}
def report() {
// Call report on every Sink
sinks.foreach(_.report())
}
/**
* Build a name that uniquely identifies each metric source.
* The name is structured as follows: <app ID>.<executor ID (or "driver")>.<source name>.
* If either ID is not available, this defaults to just using <source name>.
*
* @param source Metric source to be named by this method.
* @return An unique metric name for each combination of
* application, executor/driver and metric source.
*/
// Build the registry name
private[spark] def buildRegistryName(source: Source): String = {
val metricsNamespace = conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id"))
val executorId = conf.getOption("spark.executor.id")
val defaultName = MetricRegistry.name(source.sourceName)
if (instance == "driver" || instance == "executor") {
if (metricsNamespace.isDefined && executorId.isDefined) {
// For a driver or executor instance, the name is composed as
// {{METRICS_NAMESPACE, else conf.getOption("spark.app.id")}}.{{conf.getOption("spark.executor.id")}}.{{source.sourceName}}
MetricRegistry.name(metricsNamespace.get, executorId.get, source.sourceName)
} else {
// Only Driver and Executor set spark.app.id and spark.executor.id.
// Other instance types, e.g. Master and Worker, are not related to a specific application.
if (metricsNamespace.isEmpty) {
logWarning(s"Using default name $defaultName for source because neither " +
s"${METRICS_NAMESPACE.key} nor spark.app.id is set.")
}
if (executorId.isEmpty) {
logWarning(s"Using default name $defaultName for source because spark.executor.id is " +
s"not set.")
}
defaultName
}
} else { defaultName }
}
def getSourcesByName(sourceName: String): Seq[Source] =
sources.filter(_.sourceName == sourceName)
// Register a single source
def registerSource(source: Source) {
sources += source
try {
val regName = buildRegistryName(source)
registry.register(regName, source.metricRegistry)
} catch {
case e: IllegalArgumentException => logInfo("Metrics already registered", e)
}
}
// Remove a source
def removeSource(source: Source) {
sources -= source
val regName = buildRegistryName(source)
registry.removeMatching(new MetricFilter {
def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
})
}
// Register every source configured under keys that start with "source"
private def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
// MetricsSystem.SOURCE_REGEX: "^source\\.(.+)\\.(.+)".r
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
// Register all the sources related to instance
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
// Instantiate by reflection. Only Sources with a no-arg constructor can be created this way, e.g. JvmSource
val source = Utils.classForName(classPath).newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
}
}
}
// Create the Sinks configured under keys that start with "sink"
private def registerSinks() {
val instConfig = metricsConfig.getInstance(instance)
// Keys that start with "sink" match MetricsSystem.SINK_REGEX: "^sink\\.(.+)\\.(.+)".r
val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
sinkConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
if (null != classPath) {
try {
// Create the sink by passing the constructor arguments kv._2, registry, securityMgr
val sink = Utils.classForName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
.newInstance(kv._2, registry, securityMgr)
if (kv._1 == "servlet") {
// When the key is "servlet", keep the sink separately as the MetricsServlet instead of adding it to sinks
metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
} else {
// Otherwise add it to the sinks list
sinks += sink.asInstanceOf[Sink]
}
} catch {
case e: Exception =>
logError("Sink class " + classPath + " cannot be instantiated")
throw e
}
}
}
}
}
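The branching in buildRegistryName() can be condensed into a small pure function. The sketch below is a simplified Java rendering under stated assumptions: RegistryNameSketch is a hypothetical name, Optional.empty() models an unset configuration key, and String.join stands in for MetricRegistry.name when all parts are present. The warning logs are left out.

```java
import java.util.Optional;

class RegistryNameSketch {
    // Returns "<namespace>.<executorId>.<sourceName>" for driver and executor
    // instances when both IDs are known; otherwise just the source name.
    static String build(String instance, Optional<String> namespace,
                        Optional<String> executorId, String sourceName) {
        boolean appScoped = instance.equals("driver") || instance.equals("executor");
        if (appScoped && namespace.isPresent() && executorId.isPresent()) {
            return String.join(".", namespace.get(), executorId.get(), sourceName);
        }
        return sourceName;
    }
}
```

A master or worker instance always gets the bare source name, even if an application ID happens to be set, because those roles are not tied to a specific application.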
Summary
First, look at the metrics.properties.template file:
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
*.sink.statsd.prefix=spark
*.sink.console.period=10
*.sink.console.unit=seconds
master.sink.console.period=15
master.sink.console.unit=seconds
*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
*.sink.csv.period=1
*.sink.csv.unit=minutes
*.sink.csv.directory=/tmp/
worker.sink.csv.period=10
worker.sink.csv.unit=minutes
*.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink
*.sink.slf4j.period=1
*.sink.slf4j.unit=minutes
master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
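- The properties are read first
- Using the instance name, the properties under that name are selected. The "*" defaults fill in any keys the instance does not set, and an instance with no entries of its own falls back to "*" entirely. The result is instConfig
- From instConfig, the keys that start with source or sink are grouped into sourceConfigs and sinkConfigs
- sourceConfigs and sinkConfigs supply the source class and sink class to instantiate by reflection. Sources are created through the default no-arg constructor, so only classes such as JvmSource can be created this way; MasterSource has to be constructed with new inside the Master class. Sinks are created with constructor arguments, and the properties argument also comes from the configuration
- The sources are registered and the sinks are started
- MetricsSystem starts and stops the sources and sinks, and each instance starts and stops its own metrics independently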
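The difference between the two reflection paths in the list above can be shown with a short Java sketch. All class names here (SketchJvmSource, SketchConsoleSink, ReflectionSketch) are hypothetical stand-ins, and the sink constructor takes only a Properties argument for brevity, whereas the Spark sinks shown earlier also take a MetricRegistry and a SecurityManager.

```java
import java.util.Properties;

// Hypothetical plug-in classes standing in for a Source and a Sink.
class SketchJvmSource {
    SketchJvmSource() {}
    String sourceName() { return "jvm"; }
}

class SketchConsoleSink {
    final Properties props;
    SketchConsoleSink(Properties props) { this.props = props; }
}

class ReflectionSketch {
    // Source path: only a no-arg constructor is looked up, so a class that
    // needs constructor arguments cannot be created here.
    static Object newSource(String className) {
        try {
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                "Source class " + className + " cannot be instantiated", e);
        }
    }

    // Sink path: a constructor with a known parameter list is looked up and
    // the per-sink configuration is passed in.
    static Object newSink(String className, Properties props) {
        try {
            return Class.forName(className)
                .getDeclaredConstructor(Properties.class)
                .newInstance(props);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                "Sink class " + className + " cannot be instantiated", e);
        }
    }
}
```

Calling newSource with the name of a class that has no no-arg constructor fails, which is the same limitation that keeps MasterSource out of the configuration-driven path.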