Операции над входящим потоком

Конечно, просто вывод информации на консоль, это не слишком интересно. Но давайте, сохраняя возможность быстрого вывода и проверки результатов, применим набор операций к входящему потоку. Начнем с простейщей фильтрации.

Эй, стрим, фильтруй ивенты!

Т.к. мы смотрим на все это, как на бесконечные таблицы, то и использовать нам пристало операции из DataFrame, а не RDD-подобные. Посему выбираем .where() и оставляем дишь те ивенты, где значение кратно 10.

   val result = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .where("value % 10 == 0")

Да и если заглянуть в веб-мордочку нашего Spark, а точнее - на закладку SQL, можно найти DAG, генерируемый для очередной джобы. Из 3000 строк получается 300, а затем выполняется проекция для вывода на экран. Сгенерированные планы прилагаются.

Но достаточно неудобно заходить в план, чтобы узнать количество отпроцессеных ивентов.

Кажется, на DataFrame была какая-то функция вроде .count()

Подсчет количества элементов во входящем потоке

Лихо пишем код и пытаемся его скомпилировать.

    val result = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .where("value % 10 == 0")
      .count()

Однако, компилятор обязательно расскажет, что стрим из такого кода уже не получится.

Structured Streaming - штука пока экспериментальная и не все сочетания операций тут возможны и имплементированы. Спасибо Scala - мы узнаем об этом на этапе компиляции.

При помощи лишнего оператора groupBy можно добиться взаимоности от стрима и таки получить необходимый результат.

    val result = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .where("value % 10 == 0")
      .groupBy()
      .count()

Все скомпилируется, но в рантайме упадет с Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;

Перекрашиваем стрим

Что за outputMode, что за watermark? Никто нас об этом не предупреждал.

Про "режимы вывода" и "водяные знаки" будет говорено далее, а пока просто попробуем переключиться в CompleteMode - эта настройка задается как параметр writeStream

    val writer = result.writeStream
      .trigger(ProcessingTime(3000))
      .outputMode("complete") <<== вот тут мы задаем OutputMode (по умолчанию стоит append)
      .format("console")
      .start()

Однако, после запуска выясняется, что результат монотонно возрастает. Да-да, CompleteMode выдает нам число ивентов, прошедших через него со старта джобы. Эдакий нарастающий итог. Иногда - это то, что нужно. Но сейчас мы хотели решить немного другую задачу (и на DStreams она решалась очень просто).

Проблемы решена частично, но еще любопытнее, как работает подсчет нарастающего итога.

А работает он через сохранение состояния и агрегации вновь вычисленного значения с предыдущим.

Ну и результат снова сохраняется для последующих объединений.

При этом любопытно, как простой логический план превращается в достаточно сложный физический план исполнения.

Count в AppendMode, пожалуйста!

Да, мы тут замечтались с этими картинками и ациклическими направленными графами вычислений, пора и честь знать.

Тут придется поработать глубоко и подробно с временными интервалами. Необходимо задать величину окна, за которое будут агрегироваться ивенты (выбрать равным triggerTime), кроме того для всех агргационных действий надо задать watermark, который можно установить равным 0 (чтобы упростить расчеты).

Но откуда взять timestamp? Благодаря чтению из Kafka, у нас он имеется и называется точно также.

Итоговый код выглядит так:

import org.apache.spark.sql.functions.window

val result = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
      .as[(String, String, Timestamp)]
      .where("value % 10 == 0")
      .withWatermark("timestamp", "0 seconds")
      .groupBy(window($"timestamp", "3 seconds") as 'date)
      .count()


    val writer = result.writeStream
      .trigger(ProcessingTime(3000))
      .outputMode("append")
      .format("console")
      .start()

    writer.awaitTermination()

Чтобы вся эта шарманка крутилась Spark-у требуется построить весьма серьезное дерево вычислений

Мы видим странный оператор Expand, удваивающий количество строк (которые впрочем снова фильтруются и возвращаются к норме). Затем данные подгатавливаются для агрегации и передаются на вывод. Т.к. это appendMode, то и логика работы с состоянием здесь иная.

Итоговый набор планов велик и страшен, но пытливый спарковод, помня предыдущие стадии, сможет прочесть происходящее.

Положа руку на сердце, хочется закутаться в теплый пледик и убежать в мир Storm или DStreams.

Но самые отважные смогут продолжить путешествие дальше сквозь пока еще не зрелый, экспериментальный API.

results matching ""

    No results matching ""