JOIN для бедных

Spark предоставляет широкий выбор для соединения вместе двух датасетов, предназначенных друг другу, и правым и левым и внутренним и декартовым и многими другими способами. По сравнению с мукой MapReduce по воплощению соединений, сделать join в Spark - 1 строчка кода. Но это все касается "статических", иммутабельных датасетов.

Во вселенной Structured Streaming предполагается иметь ту же широкую поддержку, но всему свое время. Пока мы имеем на руках INNER JOIN (и немного RIGHT OUTER JOIN).

Предположим, что мы хотим обогатить наши key-value пары, прибывающие из Kafka, небольшим географическим датасетов (я страсть как люблю географические данные, только и дай мне ими что-нибудь обогатить).

  case class Country(key: String, country: String) // наш маленький бин

  import spark.implicits._ // данный импорт необходим для возможности вызова функции toDS() на коллекции 

  val dictionary = Seq(Country("1", "Russia"), Country("2", "Germany"), Country("3", "USA")).toDS()

Ключа всего три, а в потоке из Kafka, как помнится, их десять. Т.е. не всем записям улыбнется удача в случае INNER JOIN.

Соединяем по полю key, присутствующем в стриме и статическом датасете.

    val result = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .join(dictionary, "key")

Ситуация довольно типичная: небольшой (сравнительно с данными, текущими через стрим) статический датасет нужно соединить с много большей по размерам таблицей. Одной из простейших техник оптимизаций в MapReduce была возможность разослать "небольшой" датасет по всем машинам кластера и произвести на каждой из них соединение локально, до стадии Shuffle. Как мы увидим в DAG для данного примера, похожая техника используется и в Spark, оставаясь скрытой для глаз.

3 строки были посланы сквозь сеть и поучаствовали в соединении на стороне стрима.

В физическом плане можно разглядеть оффсеты, топики и партиции, откуда производится чтение данных в рамках этого маленького батча в течение трех секунд.

Правое соединение

Вопреки документации в версии 2.2 не поддерживается. См. класс UnsupportedOperationChecker

Объединение двух стримов

На момент написания статьи и версии 2.2 - не поддерживается.

results matching ""

    No results matching ""