Q: Как отключить надоедливые сообщения в логах?
A: Иногда Spark очень много чего выводит в логи, особенно это касается стриминга, где мы получаем однообразную информацию на каждом такте работы системы.
Уменьшить количество информации можно изменив уровень логгирования.
spark.sparkContext.setLogLevel("ERROR")
Опять же, всегда можно вернуться и на более детальный уровень, впрочем Spark предоставляет много интересных методов на своих датасетах, чтобы получить дополнительную отладочную информацию.
Q: Стриминг работает неравномерно, когда данные текут неравномерно, как-то можно это починить?
A: Когда работаешь через DStreams, то время такта задается на уровне конфигурации всего стриминга.
val ssc = new StreamingContext(conf, Seconds(3))
Что же происходит в StructuredStreaming?
По умолчанию ивенты/записи накапливаются во временных структурах данных на каждом такте и выталкиваются при достижении определенного порога.
Однако, при нерановмерном потоке данных использование ресурсов кластера будет тоже неравномерным, что не всегда требуется. Иногда нужно конфигурировать продолжительность временного диапазона, в рамках которого должно происходить "микроагрегирование".
Это можно сделать, конфигурируя конкретный стрим, например на запись, при помощи вызова метода .trigger()
val writer = result.writeStream
.trigger(ProcessingTime(3000))
.format("console")
.start()
Q: Примеры со стримингом тормозят на локальной машине, хотя все ядра, кроме одного не заняты, как их задействовать?
A: В отличие от классического Spark Core, где 1 ядра хватает для примитивной разработки на девелоперской машинке, в случае тестирования и отладки Spark Streaming нам нужно минимум 2 ядра - для драйвера и для ресивера, если же ресиверов планируется поднять больше, то и ядер следует выделить дольше.
val spark = SparkSession.builder
.master("local[2]")
или иной вариант для задействования всех логических ядер
val spark = SparkSession.builder
.master("local[*]")
Стало лучше? То-то же!