Быстрый старт со Structured Streaming

В Spark 2.0 появился экспериментальный API под названием Structured Streaming. В версии 2.1 появились новые фичи, которые требуют пристального внимания. Несмотря на это, данный API до сих пор находится в версии alpha.

Давайте вместе попробуем понять, насколько удобно с ним работать.

Проект SparkKafka и его зависимости

Создадим sbt проект SparkKafka, где и попробуем в режиме песочницы возможности экспериментального API.

Начнем с зависимостей.

name := "SparkKafka"

version := "1.0"

scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" % "spark-core_2.11"  % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11"  % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11"  % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.1.0"

Да, на момент написания статьи можно использовать и scala 2.12, но автор предпочел надежную(sic!) 2.11 и библиотечки, скомпилированные именно под эту версию.

Обратите внимание на именование зависимостей в соотвествии с версией Scala. Если вы решите это изменить, то вам может понадобиться другая зависимость spark-core_2.10, например. Если же вы захотите перейти на другую версию Spark-а, то нужно будет поменять только конец строки: "2.1.0" -> "2.2.0".

Потребитель данных (Consumer) из Kafka

Предположим, что кто-то сейчас заливает данными наш топик messages. Нам остается только подписаться на данный топик и наслаждаться приемом данных.

Начнем с импорта SparkSession - важного объекта, основной точки входа при работе со Spark-ом.

import org.apache.spark.sql.SparkSession

object KafkaConsumerWithStructuredStreaming {
  def main(args: Array[String]): Unit = {

Затем необходимо создать и сконфигурировать экземпляр SparkSession.

  val spark  = SparkSession.builder
      .master("local")
      .appName("SparkKafka")
      .getOrCreate()

Теперь можно приступать к строительству трубопровода из Kafka, например, на консоль. Сконфигурируем для начала источник.

  val stream = spark
      .readStream
      .format("kafka")   // тип источника
      .option("kafka.bootstrap.servers", "localhost:9092, <your_other_host>:9092") // URL до Kafka
      .option("subscribe", "messages") // ну и наш топик
      .load()

Предположим, мы просто хотим извлечь пары key-value для всех записей, которые падают в нашу потенциально-бесконечную таблицу (как бы состоящую из двух колонок key, value).

Для этого нам надо применить к этой таблице оператор select и преобразовать наши ints в строки, для чего нам явно пригодятся имплиситы.

   import spark.implicits._

    val result = stream
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

Теперь самое время "записать" куда-то этот стрим, например на консоль.

За "запись" у нас отвечает оператор writeStream

    val finalization = result
      .writeStream
      .format("console")
      .start() // старт нашего стриминга, без этого ничего не поедет

    finalization.awaitTermination() // а без этого ничего не будет длится

Вот и готов у нас первый трубопровод для данных. Если данных будет течь по нему слишком много, оператор selectExpr будет выбирать только top-20 записей, дабы не перегружать консоль.

results matching ""

    No results matching ""