在 java 中我倒是较少使用 switch case ,大部分都是if else 搞定
但是在 scala 中 模式匹配 可以说是登峰造极了,你可以用它完成之前很多繁琐机械的问题,最主要是灵活多变,总有一款适合你
先讲讲最简单的
我这里有一个需求
把 hdfs 上的原始日志数据进行压缩,其中压缩方式 有 snappy gzip lzo defalte 大致四种,原先我把每一种压缩方式都写了一个独立的方法,验证都可以正常使用,写代码要时刻有重构的想法,我一看大部分 流程都一致,可以把这些合四为一,把单独不一致的内容拿出来,做case 判断识别
比如
单个的 snappy 压缩
/**
* hdfs 文件 snappy 压缩
*
* @param fs
* @param conf
* @param inpath
* @param outPath
*/
def hdfsFileCompressBySnappyCodec(fs: FileSystem, conf: Configuration, inpath: String, outPath: String): Unit = {
//压缩时 读取fsdata流 写入 compress流【fs-buff-compress]
val inputPath: Path = new Path(inpath)
val inFsData: FSDataInputStream = fs.open(inputPath)
val snappyCC: SnappyCodec = new SnappyCodec()
//val snappyComp:SnappyCompressor=new SnappyCompressor()
snappyCC.setConf(conf)
// val snappyFile :String= getFileOriginName(inpath) +snappyCC.getDefaultExtension
val inSubPath: String = getOutFileSubPath(inpath)
var nOutPath = ""
if (outPath.endsWith("/")) {
nOutPath = outPath.substring(0, outPath.length - 1)
} else {
nOutPath = outPath
}
val snappyFile: String = nOutPath + inSubPath + snappyCC.getDefaultExtension
val outdir: Path = new Path(snappyFile)
val fsDataOutStream: FSDataOutputStream = fs.create(outdir)
val fsBufferOutStream: BufferedOutputStream = new BufferedOutputStream(fsDataOutStream)
val compressOutStream: CompressionOutputStream = snappyCC.createOutputStream(fsBufferOutStream)
val bufInpStream: BufferedInputStream = new BufferedInputStream(inFsData)
val ioBuffer: Array[Byte] = new Array[Byte](64 * 1024)
var readLen: Int = 0
val start = System.currentTimeMillis()
println("snappy codec begining || " + snappyFile)
try {
while ( {
readLen = bufInpStream.read(ioBuffer)
readLen != -1
}) {
compressOutStream.write(ioBuffer, 0, readLen)
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
compressOutStream.flush()
compressOutStream.finish()
compressOutStream.close()
IOUtils.closeStream(inFsData)
IOUtils.closeStream(fsDataOutStream)
//fs.close()
}
val end = System.currentTimeMillis()
val timeCause = end - start
println("snappy codec finish || " + snappyFile + " || 时间消耗 " + timeCause + " ms")
}
其实就是 不同压缩格式 创建不同的 codec
所以我做了 case就是创建不同codec ,通过 查询 发现 这些 codec
都 是 CompressionCodec的子类,
所以就这样办了
方法增加一个 输入的参数 string codecMethod作为制定 压缩格式
val codec:CompressionCodec =codecMethod match {
case "SNAPPY" => new SnappyCodec()
// { codec.asInstanceOf[SnappyCodec].setConf(conf)
// }
case "GZIP" => new GzipCodec()
case "LZO" => new Lz4Codec()
case "DEFALTE" => new DefaultCodec()
case _ => new DefaultCodec()
}
这样一来 剩下的流程都一致,可以做一些 一致的处理
,
除了上边的,其实我还有另一种case的使用 就是在一个方法中 通过
case 调用不同的方法,
我前面讲过 我在最开始前分别写了 四个独立的压缩方法,这四个方法相当于压缩算子,只能对单独文件进行 压缩,可是我们 在hdfs上 往往都是 目录嵌套好几层,直接从最外层 开始压缩,一口气可能就是成千上万个文件了,所以我写了一个 目录压缩方法,来对目录嵌套下的文件进行压缩,并且我在 目录压缩方法中使用了 递归,简直 不能再好了
,当然在目录压缩方法中也是可以指定 压缩格式的,我在这里 是通过调用独立的压缩算子方法实现 的
如
codec match {
case "SNAPPY" => hdfsFileCompressBySnappyCodec(fs, conf, inpath, outPath)
case "GZIP" => hdfsFileCompressByGzipCodec(fs, conf, inpath, outPath)
case "LZO" => LZOCodecHdfsFileCompressBy(fs, conf, inpath, outPath)
case "DEFALTE" =>deflateCompressForHdfsFile(fs,conf,inpath,outPath)
case _ => deflateCompressForHdfsFile(fs,conf,inpath,outPath)
}
具体的目录压缩方法
/**
* 按目录对文件 进行 snappy gzip lzo 压缩
* @param fs
* @param conf
* @param inpath 输入目录
* @param outPath 输出目录
* @param codec 压缩格式 缩写 SNAPPY GZIP LZO DEFALTE 默认为defalte
* @param propertiesPath 压缩文件类型刷选 属性文件路径
*/
def DirCompressBySnappyGzipLzoCodec(fs: FileSystem, conf: Configuration, inpath: String, outPath: String, codec: String="GZIP")(propertiesPath: String="/usr/local/info.properties"): Unit = {
val inputPa: Path = new Path(inpath)
val fsStatus: FileStatus = fs.getFileStatus(inputPa)
var flag = false
try{
if (fsStatus.isFile) {
flag = CompressUtils.boolFilePrefixContains(inpath, propertiesPath)
if (flag) {
codec match {
case "SNAPPY" => hdfsFileCompressBySnappyCodec(fs, conf, inpath, outPath)
case "GZIP" => hdfsFileCompressByGzipCodec(fs, conf, inpath, outPath)
case "LZO" => LZOCodecHdfsFileCompressBy(fs, conf, inpath, outPath)
case "DEFALTE" =>deflateCompressForHdfsFile(fs,conf,inpath,outPath)
case _ => deflateCompressForHdfsFile(fs,conf,inpath,outPath)
}
}
} else if (fsStatus.isDirectory) {
val listFs: Array[FileStatus] = fs.listStatus(inputPa)
listFs.foreach(fil => {
val fsiN = fil.getPath.getName
println("path dir name " + fsiN)
println("path parent " + fil.getPath.getParent)
val uriPath = fil.getPath.getParent.toString
var newInp = ""
if (uriPath.contains(":9000/")) {
val uriIndex = uriPath.indexOf(":9000/")
newInp = uriPath.substring(uriIndex + 5) + "/" + fsiN
} else {
newInp = uriPath + "/" + fsiN
}
if (fil.isFile) {
flag = CompressUtils.boolFilePrefixContains(newInp, propertiesPath)
if (flag) {
codec match {
case "SNAPPY" => hdfsFileCompressBySnappyCodec(fs, conf, newInp, outPath)
case "GZIP" => hdfsFileCompressByGzipCodec(fs, conf, newInp, outPath)
case "LZO" => LZOCodecHdfsFileCompressBy(fs, conf, newInp, outPath)
case "DEFALTE" =>deflateCompressForHdfsFile(fs,conf,inpath,outPath)
case _ => deflateCompressForHdfsFile(fs,conf,inpath,outPath)
}
}
} else {
DirCompressBySnappyGzipLzoCodec(fs, conf, newInp, outPath, codec)(propertiesPath)
}
})
}