Batch - stream compatibility

All functions in this library use the same logic for batch and stream mode, so a state generated in batch mode is fully compatible with a state generated in streaming mode. This allows various options for saving and loading a model; for example, you can pre-train in batch mode and continue training in stream mode with a stream-static join.

To start a streaming Kalman filter from a batch state, a snapshot of the state as a dataframe is needed. You can typically use the state with the largest stateIndex as the snapshot.

// Batch dataframe of measurements
val batchMeasurements: DataFrame = ...

val batchFilter = new LinearKalmanFilter(2, 1)
  .setStateKeyCol("stateKey")
  .setMeasurementCol("measurement")

// Create a state snapshot from the latest output. batchMeasurementCount is the
// number of measurements per key, so stateIndex = batchMeasurementCount marks
// the final state of each key.
val batchState = batchFilter.transform(batchMeasurements)
  .filter(s"stateIndex = $batchMeasurementCount")
  .select("stateKey", "state").cache()
batchState.show()
/*
+--------+--------------------+
|stateKey|               state|
+--------+--------------------+
|       0|[[19.515662167212...|
|       1|[[19.468300376746...|
+--------+--------------------+
*/
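
If the number of measurements per key is not known in advance, the latest state per key can also be selected with a window function. A minimal sketch of this alternative (not part of the original example):

// Pick the row with the largest stateIndex per key
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max}

val latestState = batchFilter.transform(batchMeasurements)
  .withColumn("maxIndex", max("stateIndex").over(Window.partitionBy("stateKey")))
  .filter(col("stateIndex") === col("maxIndex"))
  .select("stateKey", "state").cache()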

This state snapshot will be used as the initial state of the second filter. With a stream-static join on the stateKey column, the state snapshot can be added to the streaming 'measurements' dataframe. By specifying the initial state with the setInitialStateDistributionCol setting, stream training can be resumed.

// Reuse the batch filter, but read the initial state from the 'state' column
val streamFilter = batchFilter
  .setInitialStateDistributionCol("state")

// Stream-static join to add the initial state column.
val streamMeasurements = streamDF
  .join(batchState, "stateKey")

val query = streamFilter.transform(streamMeasurements)
  .writeStream
  .queryName("LKFStreamBatchInit")
  .outputMode("append")
  .format("console")
  .start()

/**
 *
 * -------------------------------------------
 * Batch: 0
 * -------------------------------------------
 * +--------+----------+-----+
 * |stateKey|stateIndex|state|
 * +--------+----------+-----+
 * +--------+----------+-----+
 *
 * -------------------------------------------
 * Batch: 1
 * -------------------------------------------
 * +--------+----------+--------------------+
 * |stateKey|stateIndex|               state|
 * +--------+----------+--------------------+
 * |       0|         1|[[20.633594973582...|
 * |       0|         2|[[21.789119165418...|
 * |       0|         3|[[22.863165204359...|
 * |       1|         1|[[20.455445849573...|
 * |       1|         2|[[21.844582344505...|
 * |       1|         3|[[22.451603629903...|
 * +--------+----------+--------------------+
 *
 * -------------------------------------------
 * Batch: 2
 * -------------------------------------------
 * +--------+----------+--------------------+
 * |stateKey|stateIndex|               state|
 * +--------+----------+--------------------+
 * |       0|         4|[[23.906030752251...|
 * |       0|         5|[[25.025250483006...|
 * |       1|         4|[[23.195987426484...|
 * |       1|         5|[[24.199653312063...|
 * +--------+----------+--------------------+
 */

See examples for the complete code.

Restarts

In case of a failure or intentional shutdown in streaming mode, Spark's checkpointing mechanism can be used as usual.

// Set a checkpoint location so the query can resume from its saved state
df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .format("memory")
  .start()

The internal state of this library is maintained with Avro, so the state will be restored from checkpoints successfully most of the time. If you make a change that's not allowed by Spark (i.e., the changes listed here) and need to migrate the state, you can use the pattern in the previous section to recover from a separate data store.
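
One way to keep such a separate store is to back up the filter output while the query runs. A minimal sketch with foreachBatch and Parquet, assuming a configured filter and a streaming measurements dataframe as in the previous section (the paths and column selection are illustrative):

// Back up filter output to a durable store alongside the main sink
import org.apache.spark.sql.DataFrame

val backupQuery = filter.transform(streamMeasurements)
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "path/to/backup/checkpoint/dir")
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.select("stateKey", "stateIndex", "state")
      .write.mode("append").parquet("path/to/state/store")
  }
  .start()

On restart, the rows with the largest stateIndex per key can be read from this store and joined back as the initial state, exactly as in the previous section.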

Event time and ordering

If measurements are associated with a timestamp and ordered processing of measurements is desired, an event time column can be set from the input dataframe. This will cause measurements to be processed in ascending order of the event time column.

Note that setting an event time column will not guarantee end-to-end ordered processing in stream mode; ordering is only guaranteed per minibatch. The filters use append output mode, so if strict ordering in streaming mode is desired, measurements aggregated with a specific time window and watermark should be used as input to the filters (see the windowing sketch below). All filters also support setting a watermark duration along with the event time column to help propagate watermarks.

val filter = new LinearKalmanFilter(2, 1)
  .setMeasurementCol("measurement")
  .setEventTimeCol("eventTime")
  .setWatermarkDuration("10 seconds")
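
For strict ordering, measurements can be pre-aggregated into event time windows before the filter. A sketch of this, assuming rawMeasurements is a streaming dataframe with stateKey, eventTime and a vector-valued measurement column, and that Summarizer.mean is an acceptable way to average the measurement vectors:

// Aggregate measurements into 10 second windows before filtering, so the
// filter sees at most one measurement per key per window in event time order
import org.apache.spark.ml.stat.Summarizer
import org.apache.spark.sql.functions.{col, window}

val windowedMeasurements = rawMeasurements
  .withWatermark("eventTime", "10 seconds")
  .groupBy(col("stateKey"), window(col("eventTime"), "10 seconds"))
  .agg(Summarizer.mean(col("measurement")).alias("measurement"))
  .withColumn("eventTime", col("window.start"))
  .drop("window")

val ordered = filter.transform(windowedMeasurements)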

Expiring State

To clean up unused state, a state timeout can be enabled. Enabling state timeout will clear the state after the specified timeout duration passes. If a state receives measurements after it times out, it will be re-initialized as if it had received no measurements. Supported timeout modes are none, process and event:

  • none: No state timeout, state is kept indefinitely.

  • process: Processing time based state timeout; the state will be cleared if no measurements are received for a duration based on processing time. Affects all states. The timeout duration must be set with setStateTimeoutDuration.

  • event: Event time based state timeout; the state will be cleared if no measurements are received for a duration based on the event time determined by the watermark. Affects all states. The timeout duration must be set with setStateTimeoutDuration. Additionally, the event time column and its watermark duration must be set with setEventTimeCol and setWatermarkDuration. Note that this will result in dropping measurements occurring later than the watermark.

    // Event time based state timeout. States receiving no measurements for 12 hours will be cleared.
    // Timeout duration is measured with event time, so the event time column and its
    // watermark duration must be set.
    val eventTimeoutFilter = new LinearKalmanFilter(2, 1)
      .setStateKeyCol("modelId")
      .setMeasurementCol("measurement")
      .setEventTimeCol("eventTime")
      .setWatermarkDuration("10 minutes")
      .setStateTimeoutDuration("12 hours")
      .setStateTimeoutMode("event")

    // Process time based state timeout. States receiving no measurements for 12 hours will be cleared.
    // Timeout duration is measured with processing time, so it's not necessary to set the event time column.
    val processTimeoutFilter = new LinearKalmanFilter(2, 1)
      .setStateKeyCol("modelId")
      .setMeasurementCol("measurement")
      .setStateTimeoutDuration("12 hours")
      .setStateTimeoutMode("process")
    

Version upgrades

Semantic versioning is used. In principle, in streaming mode you can upgrade this library without any state incompatibilities with previously checkpointed state; if a release causes a state incompatibility, it will only happen in a major release. However, Spark version upgrades might render checkpointed state unusable (due to other stateful transformations in the code, etc.), so it's always advised to save the state variables in a separate data store and resume the streaming pipeline using the pattern in the previous section.