Агрегация и агрегаты

Самый частый кейс для RealTime-приложений - это возможность выводить на табло обновляемые в фоновом режиме некоторые агрегаты: скользящие средние, нарастающие итоги (суммы, количества), максимальный и минимальный элементы временного ряда.

Для этого существует специальный режим агрегации CompleteMode.

В этом режиме мы и запустим вывод на консоль.

    val writer = result.writeStream
      .trigger(ProcessingTime(3000))
      .outputMode(OutputMode.Complete()) // можно передать и строку "complete"
      .format("console")
      .start()

На основе примера из главы "JOIN для бедных" строим агрегацию вдоль каждой страны, выводя нарастающую от раза к разу сумму ряда value:

    val join = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .selectExpr("CAST(key as STRING)", "CAST(value AS INT)")
      .as[(String, String)]
      .join(dictionary, "key")

    val result = join.select($"country", $"value")
      .groupBy($"country")
      .sum("value")

или можно посчитать количество записей, приходящихся на каждую страну:

    val result = join.select($"country", $"value")
      .groupBy($"country")
      .count()

Весьма обширным получается DAG - первая часть нам знакома по предыдущей главе.

Кроме того, что нам нужно провести предагрегацию на своей стороне, нам необходимо

  • восстановить предыдущее накопленное состояние
  • объединить новые результаты с ним
  • сохранить полученные результаты для последущих инкрементальных джоб

Все это можно увидеть ниже.

Ну и на закуску как обычно планы, в которых можно раглядеть и временные директории, куда сохранялось состояние между запусками инкрементальных джоб, и подробную информацию о функциях объединения (полезно, если будете пытаться отладить).

P.S. Если же очень сильно хочется получать сырые данные, из которых получаются агрегаты в режиме AppendMode, то без магии Watermark и оконных функций не обойтись.

results matching ""

    No results matching ""