原文连接 https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/state_processor_api.html

Apache Flink 的状态处理器 API 为使用 Flink DataSet API 读取, 写入和修改 SavepointCheckpoint 提供了强大的功能.
这有利于分析相关的状态模式, 检查状态差异进行故障排查或审计以及新作业状态的初始化.

概要

为了更好的理解在一个批处理的上下文中 Savepoint 是怎么运行的, 可以把 Flink 状态当成传统的关系数据库来理解.
一个 database 有一到多个 namespace, 每个 namespace 包含一组table, table 又包含相应的 columnvalue, 好比在同一作用域下的 key
Savepoint 表示一个 Flink 作业在特定时间点的状态,该时间点由许多操作组成, 这些操作包含各种状态, 包括 KeyedStateOperatorState.

MapStateDescriptor<Integer, Double> CURRENCY_RATES = new MapStateDescriptor<>("rates", Types.INT, Types.DOUBLE);
 
class CurrencyConverter extends BroadcastProcessFunction<Transaction, CurrencyRate, Transaction> {
 
  public void processElement(
        Transaction value,
        ReadOnlyContext ctx,
        Collector<Transaction> out) throws Exception {
 
     Double rate = ctx.getBroadcastState(CURRENCY_RATES).get(value.currencyId);
     if (rate != null) {
        value.amount *= rate;
     }
     out.collect(value);
  }
 
  public void processBroadcastElement(
        CurrencyRate value,
        Context ctx,
        Collector<Transaction> out) throws Exception {
        ctx.getBroadcastState(CURRENCY_RATES).put(value.currencyId, value.rate);
  }
}
  
class Summarize extends RichFlatMapFunction<Transaction, Summary> {
  transient ValueState<Double> totalState;
  transient ValueState<Integer> countState;
 
  public void open(Configuration configuration) throws Exception {
     totalState = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Types.DOUBLE));
     countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.INT));
  }
 
  public void flatMap(Transaction value, Collector<Summary> out) throws Exception {
     Summary summary = new Summary();
     summary.total = value.amount;
     summary.count = 1;
 
     Double currentTotal = totalState.value();
     if (currentTotal != null) {
        summary.total += currentTotal;
     }
 
     Integer currentCount = countState.value();
     if (currentCount != null) {
        summary.count += currentCount;
     }
     countState.update(summary.count);
 
     out.collect(summary);
  }
}
 
DataStream<Transaction> transactions = . . .
BroadcastStream<CurrencyRate> rates = . . .
transactions
  .connect(rates)
  .process(new CurrencyConverter())
  .uid("currency_converter")
  .keyBy(transaction -> transaction.accountId)
  .flatMap(new Summarize())
  .uid("summarize")

该作业包含多个的操作, 跟各种状态. 在分析该状态时,我们可以通过指定操作的 uid 来确定数据的范围, 然后就可以查看该操作的状态了
CurrencyConverter 有个广播状态, 这是一种非分区的操作状态, 通常操作状态中的任何两个元素之间没有关系,因此我们可以将每个值视为自己的行
将此与 Summarize 进行对比, Summarize 包含两个 Keyed 状态, 因为两个状态都在相同的键下, 我们可以安全地假设这两个值之间存在某种关系
因此 Keyed 状态可以被理解为每个操作包含一个键列和 n 个值列的单个表, 这些意味着可以使用以下伪 sql 命令描述此作业的状态

CREATE NAMESPACE currency_converter;
 
CREATE TABLE currency_converter.rates (
   value Tuple2<Integer, Double>
);
 
CREATE NAMESPACE summarize;
 
CREATE TABLE summarize.keyed_state (
   key   INTEGER PRIMARY KEY,
   total DOUBLE,
   count INTEGER
);

通常 Savepointdatabase 关系可以概括为

* savepoint 是一个 database
* 指定 uid 的操作是一个 namespace
* 每个操作状态代表一个 table
   * 操作状态中的每个元素表示该表中的单个行
* 每个包含 Keyed 状态的运算符都有一个 "keyed_state"* 每个 keyed_state 表都有一个键列映射运算符的键值
   * 每个注册状态代表表中的一列
   * 表中的每一行都映射到一个键

读取状态

读取状态首先指定有效 savepointcheckpoint 的路径,以及使用的 StateBackend. 恢复的兼容性保证跟 DataStream 程序中定义的一致

ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new RocksDBStateBackend());

读取操作状态时, 只需指定操作 uid, 状态名称和类型信息

DataSet<Integer> listState  = savepoint.readListState(
    "my-uid",
    "list-state",
    Types.INT);

DataSet<Integer> unionState = savepoint.readUnionState(
    "my-uid",
    "union-state",
    Types.INT);
 
DataSet<Tuple2<Integer, Integer>> broadcastState = savepoint.readBroadcastState(
    "my-uid",
    "broadcast-state",
    Types.INT,
    Types.INT);

自定义的 TypeSerializer 的状态仍然可以使用, 只要在 StateDescriptor 指定即可

DataSet<Integer> listState = savepoint.readListState(
    "uid",
    "list-state", 
    Types.INT,
    new MyCustomIntSerializer());

当读取 Keyed 状态时, 用户指定 KeyedStateReaderFunction 以允许读取任意列和复杂状态类型, 例如 ListState, MapStateAggregatingState
这意味着操作包含有状态过程函数,例如:

public class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, Integer, Void> {
 
   ValueState<Integer> state;
 
   @Override
   public void open(Configuration parameters) {
      state = getRuntimeContext().getState(stateDescriptor);
   }
 
   @Override
   public void processElement(Integer value, Context ctx, Collector<Void> out) throws Exception {
      state.update(value + 1);
   }
}

然后它可以通过定义输出类型和相应的 KeyedStateReaderFunction 来读取

class KeyedState {
  Integer key;
  Integer value;
}
 
class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState> {
  ValueState<Integer> state;
 
  @Override
  public void open(Configuration parameters) {
     state = getRuntimeContext().getState(stateDescriptor);
  }
 
  @Override
  public void processKey(
    Integer key,
    Context ctx,
    Collector<KeyedState> out) throws Exception {
 
     KeyedState data = new KeyedState();
     data.key    = key;
     data.value  = state.value();
     out.collect(data);
  }
}
 
DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());

Note: 使用 KeyedStateReaderFunction 时,必须在 open 内部注册所有状态描述符, 否则任何调用 RuntimeContext#getState,RuntimeContext#getListStateRuntimeContext#getMapState 都将导致 RuntimeException

写新 Savepoint

Savepoint 状态, 其中一个 Savepoint 可能有许多操作, 并且任何特定操作的状态都是使用 BootstrapTransformation 创建的
BootstrapTransformation 以包含要写入状态的值的 DataSet 开始. 根据您是否正在写入键控或操作状态, 可以选择键入转换. 最后, 根据转换应用引导函数
Flink 提供用于写入 Keyed 状态的 KeyedStateBootstrapFunction, 用于写入非 Keyed 状态的 StateBootstrapFunction 和用于写入广播状态的 BroadcastStateBootstrapFunction

public  class Account {
    public int id;

    public double amount;	

    public long timestamp;
}
 
public class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Account> {
    ValueState<Double> state;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("total",Types.DOUBLE);
        state = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Account value, Context ctx) throws Exception {
        state.update(value.amount);
    }
}
 
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Account> accountDataSet = bEnv.fromCollection(accounts);

BootstrapTransformation<Account> transformation = OperatorTransformation
    .bootstrapWith(accountDataSet)
    .keyBy(acc -> acc.id)
    .transform(new AccountBootstrapper());

KeyedStateBootstrapFunction 支持设置事件时间和处理时间计时器, 定时器不会触发计算, 只有在 DataStream 应用程序中恢复后才会激活. 如果设置了处理时间计时器, 但状态在该时间过去之后才恢复, 则计时器将在启动时立即触发
一旦创建了一个或多个转换,它们就可以组合成一个 Savepoint, Savepoint 是使用状态后端和最大并行度创建的, 它们可以包含任意数量的操作

Savepoint
    .create(backend, 128)
    .withOperator("uid1", transformation1)
    .withOperator("uid2", transformation2)
    .write(savepointPath);

除了从头开始创建 Savepoint 之外, 还可以基于现有 Savepoint, 例如在为现有作业引导单个新操作时

Savepoint
    .load(backend, oldPath)
    .withOperator("uid", transformation)
    .write(newPath);

Note: 当基于现有状态建立新的 Savepoint 时, 状态处理器 API 会对指向现有运算符的指针进行浅复制. 这意味着两个保存点共享状态, 并且一个保存点不能在不损坏另一个保存点的情况下被删除!