Публикация данных в Kafka

Написать простой поставщик данных в Kafka несложно, но давайте определимся с тем, куда он будет писать, что он будет писать и какими порциями. Все эти параметры могут быть настроены программно при создании экземпляра KafkaProducer

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("batch.size", 65536);
props.put("buffer.memory", 10000000);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Так как во всех примерах Kafka поднята локально, на порту по умолчанию, то первый параметр очевиден. Далее мы конфигурируем размер батча и буфера в памяти, на который Producer может рассчитывать.

Кроме того, для пары ключ-значения нужно указать классы из пакета Kafka ответственных за сериализацию значений.

Самое время объявить и инициализировать нашу черную дыру, попутно отправив туда чумодан с пропертями:

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Пришло время, используя producer, отправить наши данные (используя очередное значение счетчика в качестве ключа и значения)

 String topicName = "messages";

 for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i)));

 }

В качестве входного параметра нужно передать имя топика (хотя данные будут писаться в конкретную партицию), в нашем случае это имя "messages".

Расширенный контроль над происходящим

Когда необходимо контролировать, что и куда было отправлено, какой offset был сгенерирован, стоит создать и передать в качестве параметра метода .send() свой собственный Callback с методом .onCompletion().

А уже через геттеры параметра metadata можно достать много интерсной информации, среди которых есть и номер партиции, в которую реально были записаны данные, а также timestamp, который нам еще пригодится в будущем.

 producer.send(new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i)), 
                    (metadata, exception) -> System.out.println("Topic: " + metadata.topic() +
                            " offset: " + metadata.offset() +
                            " partition #: " + metadata.partition() +
                            " timestamp: " + metadata.timestamp()));

Можно обработать и исключительную ситуацию, но это уже на любителя.

Тотальный контроль над записью

Под контрольным тоталем предлагается понимать возможность жестко указывать в какую партицию топика нужно писать (и тут мы подбираемся к использованию наших знаний о внутреннем устройстве Kafka).

Для этого нам нужно добавить специальный класс, воплощающий интерфейс Partitioner и передать метаинформацию о нем в качестве параметра.

Причем этот класс может содержать в себе весьма сложную условную логику маршрутизации данных в зависимости от топика и самих данных, которые приходят в метод .partition().

public class UnfairPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if(topic.equals("messages")){
            if(Integer.valueOf((String) key) % 10 == 2) {
                return 0;
            } else {
                return 1;
            }
        }
        return 1;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

В этом примере 1/10 часть данных отправляется в первую партицию, 9/10 - во вторую, а в третью и последующие не отправляется ничего.

Параметр, в которым мы переопределим отвественного за маршрутизацию выглядит следующим образом:

props.put("partitioner.class", UnfairPartitioner.class);

Запустив пример с выводом на экран метаданных, можно убедиться, что данные теперь иначе расбегаются по партициям.

results matching ""

    No results matching ""