Data types supported by Flink

  1. Java Tuples and Scala case classes
  2. Java POJOs
  3. Primitive types (int, long, String, char, short, boolean, etc.)
  4. Regular classes (non-POJO)
  5. Values
  6. Hadoop Writables
  7. Special types (Scala: Either, Option, Try; Java: List, Map)

Serializers supported by Flink

  1. Tuple
  2. Row
  3. POJO
  4. Avro
  5. Protobuf (via Kryo)
  6. Thrift (via Kryo)
  7. Kryo

Flink serialization performance results

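The results show that Flink's built-in Tuple and Row serializers perform best, with POJO next.
In practice Tuple and POJO are used most often, but only POJO and Avro support schema evolution.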

A POJO can easily fall back to Kryo by accident

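The first requirement for a POJO is that the class follows the Java Bean conventions. Even then, as of Flink 1.12.2, fields of container types such as List and Map are not handled by the POJO serializer automatically.
To catch an unintended fallback to Kryo during development, disable generic types so that Flink throws an exception instead of silently using Kryo: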

env.getConfig().disableGenericTypes();
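
The Java Bean requirement above can also be checked before a job is submitted. The following is a minimal sketch that uses only java.lang.reflect to test a simplified version of the structural POJO rules: a public class, a public no-argument constructor, and every non-static, non-transient field either public or reachable through a getter/setter pair. It does not call Flink and is only an approximation of Flink's real type extraction. The names PojoRuleCheck, violations, hasAccessors, Good, and Bad are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: a simplified, Flink-independent approximation of the POJO rules.
final class PojoRuleCheck {

    /** Returns the violations found; an empty list means the class passes this simplified check. */
    static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        int classMods = clazz.getModifiers();
        if (!Modifier.isPublic(classMods)) {
            problems.add("class is not public");
        }
        if (clazz.isMemberClass() && !Modifier.isStatic(classMods)) {
            problems.add("class is a non-static inner class");
        }
        try {
            clazz.getConstructor(); // getConstructor only finds public constructors
        } catch (NoSuchMethodException e) {
            problems.add("no public no-argument constructor");
        }
        // Walk the class hierarchy so that inherited fields are checked as well.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                int mods = field.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                    continue; // skipped: not part of the per-instance serialized state
                }
                if (!Modifier.isPublic(mods) && !hasAccessors(clazz, field)) {
                    problems.add("field '" + field.getName()
                            + "' is neither public nor exposed via getter and setter");
                }
            }
        }
        return problems;
    }

    /** True if public getX()/isX() and setX(value) methods exist for the field. */
    private static boolean hasAccessors(Class<?> clazz, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : clazz.getMethods()) { // public methods, including inherited ones
            String methodName = m.getName();
            if (m.getParameterCount() == 0
                    && (methodName.equals("get" + suffix) || methodName.equals("is" + suffix))) {
                getter = true;
            }
            if (m.getParameterCount() == 1 && methodName.equals("set" + suffix)) {
                setter = true;
            }
        }
        return getter && setter;
    }

    /** Passes: public static class, implicit public constructor, accessors for the private field. */
    public static class Good {
        public int id;
        private String name;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    /** Fails twice: no no-argument constructor, and a private field without accessors. */
    public static class Bad {
        public int id;
        private String secret;
        public Bad(int id) { this.id = id; }
    }

    public static void main(String[] args) {
        System.out.println("Good: " + violations(Good.class));
        System.out.println("Bad:  " + violations(Bad.class));
    }
}
```

A check like this only covers the shape of the class. Whether each field type has a dedicated serializer is a separate question, which is what disableGenericTypes() surfaces at runtime.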

Handling a POJO that contains a List or Map

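  // Flink classes used by the examples in this section:
  //   org.apache.flink.api.common.ExecutionConfig
  //   org.apache.flink.api.common.typeinfo.TypeInformation
  //   org.apache.flink.api.common.typeinfo.Types
  //   org.apache.flink.api.common.typeutils.TypeSerializer
  public static class Pojo1 {
    public int id;
    public List<String> names;
  }

  public static void main(String[] args) {
    ExecutionConfig config = new ExecutionConfig();
    config.disableGenericTypes();

    TypeInformation<Pojo1> information = Types.POJO(Pojo1.class);
//    TypeInformation<Pojo1> information = TypeInformation.of(Pojo1.class);

    // Throws UnsupportedOperationException:
    // Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
    TypeSerializer<Pojo1> serializer = information.createSerializer(config);

  }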

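Because the POJO contains a List field, which the POJO serializer does not support out of the box, the names field would normally fall back to Kryo serialization. With generic types disabled, createSerializer fails with the exception message quoted in the code comment above instead.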
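
To find the fields that are likely to trigger this fallback, one option is to scan a class for container-typed fields before running the job. The sketch below is a heuristic that uses only the JDK and does not use Flink's own type extraction. It flags fields whose declared type is a java.util.Collection or java.util.Map, and it will miss other causes of fallback such as fields of arbitrary non-POJO classes. The names ContainerFieldScan, containerFields, and Pojo1WithArray are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical helper: flags fields that need explicit type information or a different type.
final class ContainerFieldScan {

    /** Returns the names of instance fields declared as a Collection or Map type. */
    static List<String> containerFields(Class<?> clazz) {
        List<String> names = new ArrayList<>();
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue; // skipped: not part of the per-instance serialized state
            }
            Class<?> type = field.getType();
            if (Collection.class.isAssignableFrom(type) || Map.class.isAssignableFrom(type)) {
                names.add(field.getName());
            }
        }
        return names;
    }

    /** Same shape as the Pojo1 in the text: the List field is flagged. */
    public static class Pojo1 {
        public int id;
        public List<String> names;
    }

    /** Array variant: arrays are not collections, so nothing is flagged. */
    public static class Pojo1WithArray {
        public int id;
        public String[] names;
    }

    public static void main(String[] args) {
        System.out.println("Pojo1:          " + containerFields(Pojo1.class));
        System.out.println("Pojo1WithArray: " + containerFields(Pojo1WithArray.class));
    }
}
```

Each flagged field is a candidate for one of the fixes that follow: change its type, or give Flink explicit type information for it.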

Solutions

1: Replace the List with an array

  public static class Pojo1 {
    public int id;
//    public List<String> names;
    public String[] names;
  }

  public static void main(String[] args) {
    ExecutionConfig config = new ExecutionConfig();
    config.disableGenericTypes();

    TypeInformation<Pojo1> information = Types.POJO(Pojo1.class);

    TypeSerializer<Pojo1> serializer = information.createSerializer(config);

  }

2: Specify the TypeInformation of the POJO fields explicitly

  public static void main(String[] args) {
    ExecutionConfig config = new ExecutionConfig();
    config.disableGenericTypes();

    Map<String, TypeInformation<?>> map = new HashMap<>();
    map.put("id", Types.INT);
    map.put("names", Types.LIST(Types.STRING));
    TypeInformation<Pojo1> information = Types.POJO(Pojo1.class, map);

    TypeSerializer<Pojo1> serializer = information.createSerializer(config);
  }

3: Define a custom TypeInfoFactory


  @TypeInfo(MyPojo1Factory.class)
  public static class Pojo1 {
    public int id;
    public List<String> names;
  }

  public static class MyPojo1Factory extends TypeInfoFactory<Pojo1> {

    @Override
    public TypeInformation<Pojo1> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
      Map<String, TypeInformation<?>> map = new HashMap<>();
      map.put("id", Types.INT);
      map.put("names", Types.LIST(Types.STRING));
      TypeInformation<Pojo1> information = Types.POJO(Pojo1.class, map);
      return information;
    }
  }

  public static void main(String[] args) {
    ExecutionConfig config = new ExecutionConfig();
    config.disableGenericTypes();

    TypeInformation<Pojo1> information = Types.POJO(Pojo1.class);
//    TypeInformation<Pojo1> information = TypeInformation.of(Pojo1.class);
    TypeSerializer<Pojo1> serializer = information.createSerializer(config);

  }

References

  • https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/types_serialization.html
  • https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html