Apache Avro序列化/反序列化数据及Spark读取avro数据

导语

本篇文章主要讲如何使用Apache Avro序列化数据以及如何通过spark将序列化数据转换成DataSet和DataFrame进行操作。

Apache Arvo是什么?


Apache Avro 是一个数据序列化系统。

  1. 支持丰富的数据结构
  1. 快速可压缩的二进制数据格式
  2. 存储持久数据的文件容器
  3. 远程过程调用(RPC)
  4. 动态语言的简单集成

Avro提供Java、Python、C、C++、C#等语言API接口,下面我们通过java的一个实例来说明Avro序列化和反序列化数据。


Avro官网:http://avro.apache.org/
Avro版本:1.8.1
下载Avro相关jar包:avro-tools-1.8.1.jar 该jar包主要用户将定义好的schema文件生成对应的java文件

定义一个schema文件,命名为CustomerAdress.avsc,格式如下:

{
  "namespace":"com.peach.arvo",
  "type": "record",
  "name": "CustomerAddress",
  "fields": [
    {"name":"ca_address_sk","type":"long"},
    {"name":"ca_address_id","type":"string"},
    {"name":"ca_street_number","type":"string"},
    {"name":"ca_street_name","type":"string"},
    {"name":"ca_street_type","type":"string"},
    {"name":"ca_suite_number","type":"string"},
    {"name":"ca_city","type":"string"},
    {"name":"ca_county","type":"string"},
    {"name":"ca_state","type":"string"},
    {"name":"ca_zip","type":"string"},
    {"name":"ca_country","type":"string"},
    {"name":"ca_gmt_offset","type":"double"},
    {"name":"ca_location_type","type":"string"}
  ]
}``` 
* namespace:在生成java文件时import包路径
* type:omplex types(record, enum, array, map, union, and fixed)
* name:生成java文件时的类名
* fileds:schema中定义的字段及类型

在这里schema文件定义完成后,通过上面下载的avro-tools-1.8.1.jar包,来生成java code,命令如下:
```java -jar avro-tools-1.8.1.jar compile schema CustomerAddress.avsc .```
末尾的"."代表java code 生成在当前目录,命令执行成功后显示:
![生成javacode](http://upload-images.jianshu.io/upload_images/4861551-84cc805ecf1dfa48.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
在当前目录的com/peach/avro/目录下有生成相应的CustomerAddress.java文件,待工程创建后使用。
####使用maven创建一个java工程,下面为工程的目录结构
<p>添加maven依赖:</p>

    <dependency>
        <groupId>org.apache.avro</groupId>  
        <artifactId>avro</artifactId>  
        <version>1.8.1</version>  
    </dependency>  

![maven工程目录结构](http://upload-images.jianshu.io/upload_images/4861551-45804c10c2d94871.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

编写代码生成avro数据文件,代码片段

package com.peach;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.StringTokenizer;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import com.peach.arvo.CustomerAddress;

/**

  • @author peach

  • 2017-03-02

  • 主要用于生成avro数据文件
    */
    public class GenerateDataApp {
    // private static String customerAddress_avsc_path;
    //
    // static {
    // customerAddress_avsc_path = GenerateDataApp.class.getClass().getResource("/CustomerAddress.avsc").getPath();
    // }
    private static String source_data_path = "F:\data\customer_address.dat"; //源数据文件路 径
    private static String dest_avro_data_path = "F:\data\customeraddress.avro"; //生成的avro数据文件路径

    public static void main(String[] args) {

     try {  
    

// if(customerAddress_avsc_path != null) {
// File file = new File(customerAddress_avsc_path);
// Schema schema = new Schema.Parser().parse(file);
// }
DatumWriter<CustomerAddress> caDatumwriter = new SpecificDatumWriter<>(CustomerAddress.class);
DataFileWriter<CustomerAddress> dataFileWriter = new DataFileWriter<>(caDatumwriter);
dataFileWriter.create(new CustomerAddress().getSchema(), new File(dest_avro_data_path));
loadData(dataFileWriter);
dataFileWriter.close();
} catch (Exception e) {
e.printStackTrace();
}
}

/**  
 * 加载源数据文件  
 * @param dataFileWriter  
 */  
private static void loadData(DataFileWriter<CustomerAddress> dataFileWriter) {  
    File file = new File(source_data_path);  
    if(!file.isFile()) {  
        return;  
    }  
    try {  
        InputStreamReader isr = new InputStreamReader(new FileInputStream(file));  
        BufferedReader reader = new BufferedReader(isr);  
        String line;  
        CustomerAddress address;  
        while ((line = reader.readLine()) != null) {  
            address = getCustomerAddress(line);  
            if (address != null) {  
                dataFileWriter.append(address);  
            }  
        }  
        isr.close();  
        reader.close();  
    } catch (Exception e) {  
        e.printStackTrace();  
    }  
}  

/**  
 * 通过记录封装CustomerAddress对象  
 * @param line  
 * @return  
 */  
private static CustomerAddress getCustomerAddress(String line) {  
    CustomerAddress ca = null;  
    try {  
        if (line != null && line != "") {  
            StringTokenizer token = new StringTokenizer(line, "|"); //使用stringtokenizer拆分字符串时,会去自动除""类型  
            if(token.countTokens() >= 13) {  
                ca = new CustomerAddress();  
                ca.setCaAddressSk(Long.parseLong(token.nextToken()));  
                ca.setCaAddressId(token.nextToken());  
                ca.setCaStreetNumber(token.nextToken());  
                ca.setCaStreetName(token.nextToken());  
                ca.setCaStreetType(token.nextToken());  
                ca.setCaSuiteNumber(token.nextToken());  
                ca.setCaCity(token.nextToken());  
                ca.setCaCounty(token.nextToken());  
                ca.setCaState(token.nextToken());  
                ca.setCaZip(token.nextToken());  
                ca.setCaCountry(token.nextToken());  
                ca.setCaGmtOffset(Double.parseDouble(token.nextToken()));  
                ca.setCaLocationType(token.nextToken());  
            } else {  
                System.err.println(line);  
            }  
        }  
    } catch (NumberFormatException e) {  
        System.err.println(line);  
    }  

    return ca;  
}  

}


动态生成avro文件,通过将数据封装为GenericRecord对象,动态的写入avro文件,以下代码片段

private static void loadData(DataFileWriter<GenericRecord> dataFileWriter, Schema schema) {
File file = new File(sourcePath);
if(file == null) {
logger.error("[peach], source data not found");
return ;
}

    InputStreamReader inputStreamReader = null;  
    BufferedReader bufferedReader = null;  
    try {  
        inputStreamReader = new InputStreamReader(new FileInputStream(file));  
        bufferedReader = new BufferedReader(inputStreamReader);  
        String line;  
        GenericRecord genericRecord;  
        while((line = bufferedReader.readLine()) != null) {  
            if(line != "") {  
                String[] values = line.split("\\|");  
                genericRecord = SchemaUtil.convertRecord(values, schema);  
                if(genericRecord != null) {  
                    dataFileWriter.append(genericRecord);  
                }  
            }  
        }  

    } catch (Exception e) {  
        e.printStackTrace();  
    } finally {  
        try {  
            if(bufferedReader != null) {  
                bufferedReader.close();  
            }  
            if(inputStreamReader != null) {  
                inputStreamReader.close();  
            }  
        } catch (IOException e) {  
        }  
    }  

}  

avro文件生成完成后,创建scala工程,使用sparkapi读取avro文件,添加spark maven 依赖
    <dependency>  
        <groupId>com.peach</groupId>  
        <artifactId>generatedata</artifactId>  
        <version>1.0-SNAPSHOT</version>  
    </dependency>  
    <dependency>  
        <groupId>com.databricks</groupId>  
        <artifactId>spark-avro_2.10</artifactId>  
        <version>2.1.0</version>  
    </dependency>  
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
    <dependency>  
        <groupId>org.apache.spark</groupId>  
        <artifactId>spark-sql_2.10</artifactId>  
        <version>2.1.0</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.spark</groupId>  
        <artifactId>spark-core_2.10</artifactId>  
        <version>2.1.0</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.avro</groupId>  
        <artifactId>avro</artifactId>  
        <version>1.8.1</version>  
    </dependency>  
![maven scala 工程](http://upload-images.jianshu.io/upload_images/4861551-4c3e36494e8b6f36.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

编写scala读取代码,以下代码片段

case class CustomerAddressData(ca_address_sk: Long,
ca_address_id: String,
ca_street_number: String,
ca_street_name: String,
ca_street_type: String,
ca_suite_number: String,
ca_city: String,
ca_county: String,
ca_state: String,
ca_zip: String,
ca_country: String,
ca_gmt_offset: Double,
ca_location_type: String
)
// org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

def main(args: Array[String]): Unit = {
val path = "/Users/zoulihan/Desktop/customeraddress.avro"
val conf = new SparkConf().setAppName("test").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ //为什么要加此段代码?

val _rdd = sc.hadoopFile[AvroWrapper[CustomerAddress], NullWritable, AvroInputFormat[CustomerAddress]](path)  
val ddd = _rdd.map(line => new CustomerAddressData(  
  line._1.datum().getCaAddressSk,  
  line._1.datum().getCaAddressId.toString,  
  line._1.datum().getCaStreetNumber.toString,  
  line._1.datum().getCaStreetName.toString,  
  line._1.datum().getCaStreetType.toString,  
  line._1.datum().getCaSuiteNumber.toString,  
  line._1.datum().getCaCity.toString,  
  line._1.datum().getCaCounty.toString,  
  line._1.datum().getCaState.toString,  
  line._1.datum().getCaZip.toString,  
  line._1.datum().getCaCountry.toString,  
  line._1.datum().getCaGmtOffset,  
  line._1.datum().getCaLocationType.toString  
))  
val ds = sqlContext.createDataset(ddd)  
ds.show()  
val df = ds.toDF();  
df.createTempView("customer_address");

// sqlContext.sql("select count(*) from customer_address").show()
sqlContext.sql("select * from customer_address limit 10").show()
}


<p>spark运行结果</p>

![Paste_Image.png](http://upload-images.jianshu.io/upload_images/4861551-b539947108706374.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

源代码:
https://github.com/javaxsky/avrotospark
扩展:
1.如何将avro数据文件load到hive中
2.通过sparksql将统计后的数据加载到hive中
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容