Читаем из Kafka

Для того, чтобы прочитать из Kafka и направить эти данные в Spark, надо сконфигурировать чтеца:

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: Boolean)
    )

Здесь присутствуют уже знакомые читателю классы для десериализации данных, адрес кластера Kafka, а также настройки, отвечающие за правила потребления из топика.

Затем задается конфигурация и поднимается контекст (справедливо для Spark 1.*).

    val conf = new SparkConf().setAppName("DStream counter").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))

После задается сам стриминг, которые будет запускать свой микробатчинг каждые 3 секунды. Стриминг будет запущен локально (ресивер и драйвер на одной машине).

    ssc.start()
    ssc.awaitTermination()

Дело за малым: создать прямой стрим прямо в недра Kafka.

    val topicName = "messages"
    val topics = Array(topicName)

    val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))

И описать логику обработки каждой RDD, получаемой после каждого вздоха нашего стриминга.

    stream.foreachRDD {
      rdd => println("Amount of elems " + rdd.count)
    }

В примере на консоль выводится количество элементов в каждой RDD.

results matching ""

    No results matching ""