Как залить данные в Kafka?
Эта возможность появилась только в Spark 2.2, который на момент написания статьи доступен через ночные билды.
Поправим немного build.sbt
name := "Spark Streaming"
version := "1.0"
scalaVersion := "2.11.8"
resolvers += "ASF repository" at "http://repository.apache.org/snapshots"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0-SNAPSHOT"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.2.0-SNAPSHOT"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0-SNAPSHOT"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.2.0-SNAPSHOT"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.2.0-SNAPSHOT"
Наша цель построить цепочку:
- запись в топик Messages =>
- чтение из этого топика =>
- обработка данных {join со статическим датасетом, группировка данных, сортировка,} =>
- загрузка данных в топик secondaryMessages =>
- чтение из этого топика в другом задании для Spark Cluster =>
- вывод на экран
Начнем с чтения из топика
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StringType
object Kafka_to_Kafka {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.master("local[2]")
.appName("SparkKafka")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "messages")
.option("failOnDataLoss", "false") // полезная опция при работе с Kafka
.load()
Обработаем данные
import spark.implicits._
val dictionary = Seq(Country("1", "Russia"), Country("2", "Germany"), Country("3", "USA")).toDS()
val join = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.selectExpr("CAST(key as STRING)", "CAST(value AS INT)")
.join(dictionary, "key")
val result = join.select($"country", $"value")
.groupBy($"country")
.sum("value")
.select($"country".alias("key"), $"sum(value)".alias("value").cast(StringType))
.orderBy($"key".desc)
Настроим запись в Kafka
val writer = result.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "secondaryMessages")
.option("checkpointLocation", "/home/zaleslaw/checkpoints")
.outputMode(OutputMode.Complete())
.queryName("kafkaStream")
.start()
writer.awaitTermination()
Нам также понадобится указать checkpointLocation (выбирайте всегда отдельные директории для отдельных задач), что позволит нам приблизиться к заветной Fault Tolerance
Ну и запустим параллельную таску, которая будет черпать уже агрегированные данные из Kafka. Это мы уже делали в главе про быстрый старт.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime
object Kafka_to_Console {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.master("local[2]")
.appName("SparkKafka")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "secondaryMessages")
.load()
import spark.implicits._
val result = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
val writer = result.writeStream
.trigger(ProcessingTime(3000))
.format("console")
.start()
writer.awaitTermination()
}
}
Понятно, что теперь ваши возможности по конструированию цепочек [pipelines] из кубиков Kafka и Spark - практически ничем не ограничены.