Агрегация и агрегаты
Самый частый кейс для RealTime-приложений - это возможность выводить на табло обновляемые в фоновом режиме некоторые агрегаты: скользящие средние, нарастающие итоги (суммы, количества), максимальный и минимальный элементы временного ряда.
Для этого существует специальный режим агрегации CompleteMode.
В этом режиме мы и запустим вывод на консоль.
val writer = result.writeStream
.trigger(ProcessingTime(3000))
.outputMode(OutputMode.Complete()) // можно передать и строку "complete"
.format("console")
.start()
На основе примера из главы "JOIN для бедных" строим агрегацию вдоль каждой страны, выводя нарастающую от раза к разу сумму ряда value:
val join = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.selectExpr("CAST(key as STRING)", "CAST(value AS INT)")
.as[(String, String)]
.join(dictionary, "key")
val result = join.select($"country", $"value")
.groupBy($"country")
.sum("value")
или можно посчитать количество записей, приходящихся на каждую страну:
val result = join.select($"country", $"value")
.groupBy($"country")
.count()
Весьма обширным получается DAG - первая часть нам знакома по предыдущей главе.
Кроме того, что нам нужно провести предагрегацию на своей стороне, нам необходимо
- восстановить предыдущее накопленное состояние
- объединить новые результаты с ним
- сохранить полученные результаты для последущих инкрементальных джоб
Все это можно увидеть ниже.
Ну и на закуску как обычно планы, в которых можно раглядеть и временные директории, куда сохранялось состояние между запусками инкрементальных джоб, и подробную информацию о функциях объединения (полезно, если будете пытаться отладить).
P.S. Если же очень сильно хочется получать сырые данные, из которых получаются агрегаты в режиме AppendMode, то без магии Watermark и оконных функций не обойтись.