Avro 的基本使用 | Avro 篇

Posted by Aiden on July 4, 2022

Apache Avro 是一个数据序列化系统。

Avro 提供:

  • 丰富的数据结构。
  • 一种紧凑、快速的二进制数据格式。
  • 一个容器文件,用于存储持久数据。
  • 远程过程调用 (RPC)。
  • 与动态语言的简单集成。代码生成不需要读取或写入数据文件,也不需要使用或实现 RPC 协议。代码生成作为一种可选的优化,只值得为静态类型语言实现。

Avro 依赖于Schema。在读取或者写入avro数据时都需要携带Schema信息。这允许在没有每个值开销的情况下写入每个数据,从而使序列化既快速又小。这也便于使用动态脚本语言,因为数据及其模式是完全自描述的。

当 Avro 数据存储在文件中时,它的schema也随之存储,以便以后任何程序都可以处理文件。

Schema

Avro 的 schema 可以用 Json 形式表示, 形如:

{
  "namespace": "com.ehaier.bigdata.avro.schema",
  "type": "record",
  "name": "User",
  "doc": "demo for test",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favoriteNumber",  "type": ["int", "null"]},
    {"name": "favoriteColor", "type": ["string", "null"]}
  ]
}     

数据支持简单类型与复杂类型.

简单类型
  • null
  • boolean
  • int
  • long
  • float
  • double
  • bytes
  • string

Records

相当于 object 类型, 内部支持一下属性。

  • name 表示这个记录的名字(相当于类名)
  • namespace 名称空间(相当于包名)
  • doc record 说明文档
  • aliases 别名
  • fields Record 内部所包含的字段,每个字段可以包含以下部分内容:
    • name 字段名称
    • doc 字段说明
    • type 字段类型
    • default 默认值
{
    "type" : "record",
    "name" : "testRecord",
    "namespace" : "test.schema"
    "aliases" : ["test_record"],
    "fields" : [
        {"name": "value", "type": "long"}
    ]
}

Enums

枚举类型

  • name 枚举名称
  • namespace 名称空间
  • aliases 枚举别名
  • doc 说明文档
  • symbols 枚举值(数组表示)
  • default 枚举默认值
{
    "type": "enum",
    "name": "suit",
    "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
}

Arrays

数组类型

  • items 单个类型的数据类型
{
    "type" : "array",
    "items" : "string",
    "default" : []
}

Maps

映射类型

  • values value 的数据类型
{
    "type" : "map",
    "values" : "string"
}

Union

联合数据类型,它使用数组表示, 例如 ["string", "null"] 他表示可以是null,也可以是字符串。

Avro 的常见操作

引入 avro 的依赖

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>${avro.version}</version>
</dependency>

schema 的构建

方法一:通过Schema描述文件构建

{
  "namespace": "com.ehaier.bigdata.avro.schema",
  "type": "record",
  "name": "User",
  "doc": "demo for test",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favoriteNumber",  "type": ["int", "null"]},
    {"name": "favoriteColor", "type": ["string", "null"]}
  ]
}

通过schema描述文件的构建方式需要maven编译支持, 添加maven依赖.

avro-maven-plugin中引入需要导入的 schema 描述文件

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>${avro.version}</version>
  <executions>
   <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <imports>
          <import>${basedir}/src/main/avro/user.avsc</import>
        </imports>
      </configuration>
    </execution>
  </executions>
</plugin>

mvn 会在编译的时候将目标的描述文件编译成class文件,namespace为对应的包名。name 为对应的类名。

方法二: 直接加载schema描述性文件

private Schema getSchemaByFile() throws IOException {
  InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream("user.avsc");
  return new Schema.Parser().parse(inputStream);
}

方法三:代码构建Schema

使用 SchemaBuilder.builder() 来构建 Avro Schema.

builder.recod() 方法开启一个 record. fields() 开启填充所需要的字段开启填充所有字段。endRecord()结束填充字段.

注意在填充字段时,模式是非空的, 如果是需要空的字段,需要明示 nullable()

private Schema getSchemaByCode(){
  SchemaBuilder.TypeBuilder<Schema> builder = SchemaBuilder.builder();       // Schema 构造器

  return builder.record("User")
      .namespace("com.ehaier.bigdata.avro.schema")
      .fields()
      .name("name").type(builder.stringType()).noDefault()
      .name("favoriteNumber").type(builder.nullable().intType()).noDefault()
      .name("favoriteColor").type(builder.nullable().stringType()).noDefault()
      .endRecord();
}

基于Schema填充数据

Avro 在程序中使用 GenericRecord 来维护数据实体

对于Schema的数据填充生成

GenericRecord record = new GenericData.Record(schema);
record.put("name", "aiden");
record.put("favoriteNumber", 1000);
record.put("favoriteColor", "red");

基于描述文件直接生成的Schema对象

对于直接从描述文件直接生成的目标对象, 则可以直接调用。Record name 字段及为目标类名.

User user = User.newBuilder()
    .setName("aiden")
    .setFavoriteNumber(1000)
    .setFavoriteColor("red")
    .build();

这里需要说明的是, 生成的目标对象(User) 为 GenericRecord 的子类。

其中 setter 分别对应Record的字段名

GenericRecord 数据序列化到磁盘

public void serializeRecord() throws IOException {
  Schema schema = getSchemaByCode();

  GenericRecord record = new GenericData.Record(schema);
  record.put("name", "aiden");
  record.put("favoriteNumber", 1000);
  record.put("favoriteColor", "red");

  File file = new File("user1.avro");

  DatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<GenericRecord>(schema);
  DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);

  dataFileWriter.create(schema, file);
  dataFileWriter.append(record);
  dataFileWriter.flush();
  dataFileWriter.close();
}
public void serializeRecord2() throws IOException {

  User user = User.newBuilder()
    .setName("aiden")
    .setFavoriteNumber(1000)
    .setFavoriteColor("red")
    .build();

  File file = new File("user2.avro");
  DatumWriter datumWriter = new SpecificDatumWriter<>(User.class);
  DataFileWriter fileWriter = new DataFileWriter<>(datumWriter);

  fileWriter.create(user.getSchema(), file);
  fileWriter.append(user);
  fileWriter.flush();
  fileWriter.close();
}
GenericRecord 数据反序列化
public void deserializeRecord1() throws IOException {

  Schema schema = getSchemaByCode();

  File file = new File("user2.avro");
  DatumReader specificDatumReader = new SpecificDatumReader<>(schema);
  DataFileReader dataFileReader = new DataFileReader(file, specificDatumReader);

  while(dataFileReader.hasNext()){
    GenericRecord data = (GenericRecord) dataFileReader.next(schema);
    System.out.println(data);
  }

  dataFileReader.close();
}
public void deserializeRecord2() throws IOException {

  File file = new File("user1.avro");
  DatumReader datumReader = new SpecificDatumReader(User.class);
  DataFileReader dataFileReader = new DataFileReader(file, datumReader);

  while (dataFileReader.hasNext()){
    User user = (User)dataFileReader.next();
    System.out.println(user);
  }
}

参考文档