Flink serialization: performance tuning and pitfalls
Flink types and the serialization mechanism
Data types supported by Flink
- Java Tuples and Scala case classes
- Java POJOs
- Primitive types (int, long, String, char, short, boolean, etc.)
- General class types (non-POJO)
- Values
- Hadoop Writables
- Special types (Scala: Either, Option, Try; Java: List, Map)
Serializers supported by Flink
- Tuple
- Row
- POJO
- Avro
- Protobuf (via Kryo)
- Thrift (via Kryo)
- Kryo
Flink serialization performance
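Benchmark results (see the Flink serialization tuning post in the references) show that Flink's built-in Tuple and Row serializers are the fastest, followed by POJO. Tuple and POJO are the types used most often in practice, but of the two only POJO supports schema evolution; among all the serializers above, only POJO and Avro do.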
POJOs can easily fall back to Kryo
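The first requirement for a POJO is that it follows the Java Bean conventions. However, as of Flink 1.12.2 the POJO serializer has no built-in support for special field types such as List and Map; such fields are treated as generic types and serialized with Kryo.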
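The Java Bean requirement corresponds to the POJO rules in the Flink 1.12 documentation: the class is public, it has a public no-argument constructor, and every field is either public or reachable through getFoo()/setFoo() accessors. The sketch below approximates those rules with plain Java reflection, so a class can be checked without a Flink dependency. It is a simplification, not Flink's TypeExtractor: superclass fields, getter return types and field-type support are not checked, and the class name PojoRuleCheck is hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

/**
 * Hypothetical helper: a simplified approximation of the POJO rules from the
 * Flink 1.12 docs. NOT Flink's TypeExtractor; superclass fields, boolean
 * "is" getters and field-type checks are deliberately left out.
 */
class PojoRuleCheck {

    public static boolean looksLikePojo(Class<?> clazz) {
        // Rule 1: the class must be public.
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        // Rule 2: a public no-arg constructor must exist. A non-static nested
        // class fails here, because its constructor takes the outer instance.
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: each instance field is public or has public getFoo()/setFoo(..).
        for (Field field : clazz.getDeclaredFields()) {
            int mod = field.getModifiers();
            if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || field.isSynthetic()) {
                continue; // static and transient fields are not part of the POJO
            }
            if (Modifier.isPublic(mod)) {
                continue;
            }
            String name = field.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                clazz.getMethod("get" + suffix);
                clazz.getMethod("set" + suffix, field.getType());
            } catch (NoSuchMethodException e) {
                return false;
            }
        }
        return true;
    }

    // Passes: a public field plus a private field with getter and setter.
    public static class GoodPojo {
        public int id;
        private String name;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Fails rule 2: no public no-arg constructor.
    public static class NoDefaultCtor {
        public int id;
        public NoDefaultCtor(int id) { this.id = id; }
    }

    // Fails rule 3: private field without accessors.
    public static class HiddenField {
        private int id;
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(GoodPojo.class));      // true
        System.out.println(looksLikePojo(NoDefaultCtor.class)); // false
        System.out.println(looksLikePojo(HiddenField.class));   // false
    }
}
```

Note that a class passing this check can still contain Kryo-serialized fields, because field types are ignored; that is exactly the List/Map problem described in this section.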
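To catch such fallbacks early, disable generic types during development. Flink then throws an UnsupportedOperationException whenever a type would be serialized with Kryo, instead of silently falling back:
```java
env.getConfig().disableGenericTypes();
```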
Handling a POJO that contains List/Map fields
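```java
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;

public class PojoListExample {

    // Must be static: a non-static nested class has no no-arg constructor
    public static class Pojo1 {
        public int id;
        public List<String> names;
    }

    public static void main(String[] args) {
        ExecutionConfig config = new ExecutionConfig();
        config.disableGenericTypes();
        TypeInformation<Pojo1> information = Types.POJO(Pojo1.class);
        // TypeInformation<Pojo1> information = TypeInformation.of(Pojo1.class);
        // Throws UnsupportedOperationException: Generic types have been disabled in the
        // ExecutionConfig and type java.util.List is treated as a generic type.
        TypeSerializer<Pojo1> serializer = information.createSerializer(config);
    }
}
```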
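Because Pojo1 contains a List field, which the POJO serializer does not support, the names field falls back to Kryo serialization. With generic types disabled, createSerializer throws the exception quoted in the comment instead of silently using Kryo.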
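A quick way to spot this kind of fallback before running a job is to scan a POJO's fields. The hypothetical helper below flags only fields whose declared type is a java.util.Collection or java.util.Map, the case described here. Flink's real type extraction also sends other unsupported types (for example interface types or non-POJO classes) to Kryo, so treat this as a rough lint rather than a replacement for disableGenericTypes().

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical lint helper: lists POJO fields that would be treated as
 * generic (Kryo) types. Simplified: only Collection and Map fields are flagged.
 */
class GenericFieldScan {

    public static List<String> genericFields(Class<?> pojoClass) {
        List<String> flagged = new ArrayList<>();
        for (Field field : pojoClass.getDeclaredFields()) {
            int mod = field.getModifiers();
            // Static and transient fields are not part of a Flink POJO
            if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || field.isSynthetic()) {
                continue;
            }
            Class<?> type = field.getType();
            if (Collection.class.isAssignableFrom(type) || Map.class.isAssignableFrom(type)) {
                flagged.add(field.getName());
            }
        }
        return flagged;
    }

    /** Fail-fast variant, in the spirit of disableGenericTypes(). */
    public static void requireNoGenericFields(Class<?> pojoClass) {
        List<String> flagged = genericFields(pojoClass);
        if (!flagged.isEmpty()) {
            throw new UnsupportedOperationException(
                    pojoClass.getName() + " has fields that would fall back to Kryo: " + flagged);
        }
    }

    public static class Pojo1 {          // same shape as the article's Pojo1
        public int id;
        public List<String> names;
    }

    public static class Pojo1WithArray { // the array variant from solution 1
        public int id;
        public String[] names;
    }

    public static void main(String[] args) {
        System.out.println(genericFields(Pojo1.class));          // [names]
        System.out.println(genericFields(Pojo1WithArray.class)); // []
        try {
            requireNoGenericFields(Pojo1.class);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Such a check could run in a unit test over all event classes, while disableGenericTypes() remains the authoritative check at job construction time.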
Solutions
1: Replace the List with an array
```java
public static class Pojo1 {
    public int id;
    // public List<String> names;
    public String[] names;
}

public static void main(String[] args) {
    ExecutionConfig config = new ExecutionConfig();
    config.disableGenericTypes();
    TypeInformation<Pojo1> information = Types.POJO(Pojo1.class);
    // String[] is a basic array type, so there is no Kryo fallback and no exception
    TypeSerializer<Pojo1> serializer = information.createSerializer(config);
}
```
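If the rest of the job still works with List<String>, one option (an assumption here, not part of the original advice) is to convert at the POJO boundary. The helper names below are hypothetical; they copy between a List and the String[] field, which Flink handles with its built-in array type information instead of Kryo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helpers for solution 1: keep String[] in the POJO, use List elsewhere. */
class ArrayFieldConversion {

    public static class Pojo1 {
        public int id;
        public String[] names; // array field: no Kryo fallback, unlike List<String>
    }

    /** Copies a List into the POJO's array field (null stays null). */
    public static void setNames(Pojo1 pojo, List<String> names) {
        pojo.names = names == null ? null : names.toArray(new String[0]);
    }

    /** Returns a mutable copy of the array field as a List (null stays null). */
    public static List<String> getNames(Pojo1 pojo) {
        return pojo.names == null ? null : new ArrayList<>(Arrays.asList(pojo.names));
    }

    public static void main(String[] args) {
        Pojo1 pojo = new Pojo1();
        setNames(pojo, Arrays.asList("a", "b"));
        System.out.println(pojo.names.length); // 2
        System.out.println(getNames(pojo));    // [a, b]
    }
}
```

Each conversion copies the elements, so on hot paths it may be cheaper to use arrays end to end.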
2: Specify the TypeInformation of the POJO fields explicitly
```java
// Pojo1 is the List-based class from the first example
public static void main(String[] args) {
    ExecutionConfig config = new ExecutionConfig();
    config.disableGenericTypes();
    // Declare every field's type; names becomes a ListTypeInfo instead of a generic type
    Map<String, TypeInformation<?>> map = new HashMap<>();
    map.put("id", Types.INT);
    map.put("names", Types.LIST(Types.STRING));
    TypeInformation<Pojo1> information = Types.POJO(Pojo1.class, map);
    TypeSerializer<Pojo1> serializer = information.createSerializer(config);
}
```
3: Define a custom TypeInfoFactory
```java
// @TypeInfo makes Flink's type extractor use MyPojo1Factory whenever it analyzes Pojo1
@TypeInfo(MyPojo1Factory.class)
public static class Pojo1 {
    public int id;
    public List<String> names;
}

public static class MyPojo1Factory extends TypeInfoFactory<Pojo1> {
    @Override
    public TypeInformation<Pojo1> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
        Map<String, TypeInformation<?>> map = new HashMap<>();
        map.put("id", Types.INT);
        map.put("names", Types.LIST(Types.STRING));
        return Types.POJO(Pojo1.class, map);
    }
}

public static void main(String[] args) {
    ExecutionConfig config = new ExecutionConfig();
    config.disableGenericTypes();
    // Either line goes through the factory, so names is no longer a generic type
    TypeInformation<Pojo1> information = Types.POJO(Pojo1.class);
    // TypeInformation<Pojo1> information = TypeInformation.of(Pojo1.class);
    TypeSerializer<Pojo1> serializer = information.createSerializer(config);
}
```
References
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/types_serialization.html
- https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html