Eigene Spring Boot Kafka Anwendungen

“Walked out this morning, I don’t believe what I saw
Hundred billion bottles washed up on the shore
Seems I’m not alone at being alone
Hundred billion castaways, looking for a home”

The Police

Im ersten Kafka Beitrag ging es um die generelle Arbeitsweise und Architektur des Systems. In diesem Beitrag um die Integration in eine eigene Spring Boot Anwendung. Dieser Beitrag wird recht kurz, weil die Integration tatsächlich sehr einfach ist.

In diesem Beispiel soll der Stammbaum Verwaltung aus früheren Beiträgen eine Hauch von Event-Driven Architecture verliehen werden. Dazu er hält der Stammbaum-Importer die Fähigkeit, für die Personen aus einem Stammbaum, Kafka-Events zu erzeugen. Außerdem soll ein DSGVO Service diese Events empfangen und die Personendaten auf etwaige DSGVO Anforderungen prüfen können.

Für die Kafka Unterstützung im eigenen Projekt sorgt spring-kafka, dass über eine Dependency in das eigene Maven Projekt gelangt.

 <dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

Damit der Staumbaum-Importer eigene Kafka-Events erzeugen kann, benötigt er zunächst eine passende Konfiguration.

@EnableKafka
@Configuration
public class KafkaProducerConfig {
  @Bean
  public KafkaTemplate<String, AncestorEvent> ancestorTemplate(ProducerFactory<String, AncestorEvent> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
  }

  @Bean
  public ProducerFactory<String, AncestorEvent> producerFactory(KafkaProperties properties, ObjectMapper objectMapper) {
    JsonSerializer<AncestorEvent> jsonSerializer = new JsonSerializer<>(objectMapper);
    jsonSerializer.configure(properties.buildProducerProperties(), true);
    return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(), new StringSerializer(), jsonSerializer);
  }
}

Diese Konfiguration stellt eine ProducerFactory und ein KafkaTemplate bereit. Wer schon Erfahrungen mit Spring Boot sammeln konnte, ahnt vermutlich schon, dass mit dem KafkaTemplate die Events verschickt werden. Da verhält es sich ähnlich wie seine Cousinen JdbcTemplate und RestTemplate. Die generischen Typen des KafkaTemplate sind dabei die Typen für den Schlüssel und den Inhalt des Kafka-Events. Die ProducerFactory arbeitet im Hintergrund und erzeugt aus den Datentypen die tatsächliche Darstellung des Kafka-Events, die an den Kafka-Server gesendet wird.

Hier wird als Schlüssel ein String verwendet und mit einem StringSerializer umgewandelt. Der Inhalt der Nachricht ist eine Instanz vom Typ AncestorEvent, ein eher langweiliges POJO als Wrapper um die eigentliche Person. Für dieses POJO wird der JsonSerializer verwendet, der aus der Instanz eine JSON Darstellung produziert. Im Hintergrund wird dafür standardmäßig Jackson verwendet und so können alle kleinen Tricks verwendet werden, die Dr. REST kennt.

In den Application Properties muss dann nur noch die Adresse des Kafka-Servers eingetragen werden und dann kann das Versenden von Kafka-Events beginnen.

spring.kafka.producer.bootstrap-servers=localhost:9092

In diesem Beispiel versendet der Service KafkaEventSender für jeden Person im Stammbaum einen Kafka-Event. Dazu wird die Methode send verwendet mit dem Namen des Topics ancestor, der ID des Stammbaums als Schlüssel und einer Instanz von AncestorEvent als Inhalt.

@Service
public class KafkaEventSender {
  private final KafkaTemplate<String, AncestorEvent> ancestorTemplate;

  public KafkaEventSender(KafkaTemplate<String, AncestorEvent> ancestorTemplate) {
    this.ancestorTemplate= ancestorTemplate;
  }

  public void sendAncestorMessage(AncestorTree tree) {
    tree.getAncestors().stream().map(AncestorEvent::new).forEach(a -> ancestorTemplate.send("ancestors", tree.getId(), a);
  }
}

Dies alles geschieht hier in der Zeile 10 der Klasse KafkaEventSender und damit ist auch schon das Versenden von Kafka-Events demonstriert.

Damit ein anderer Spring Boot Service diese Kafka-Events empfangen kann, wird eine zweite Konfiguration benötigt. Hier wird symmetrisch zum Sender das Deserialisieren von Schlüssel und Inhalt des Kafka-Events für den Empfänger konfiguriert.

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

  @Bean
  public LoggingErrorHandler errorHandler() {
    return new LoggingErrorHandler();
  }

  @Bean
  KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, AncestorEvent>>
  kafkaListenerContainerFactory(ConsumerFactory<String, AncestorEvent> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, AncestorEvent> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactory);
    return factory;
  }

  @Bean
  public ConsumerFactory<String, AncestorEvent> consumerFactory(KafkaProperties properties, ObjectMapper objectMapper) {
    return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(),
        new ErrorHandlingDeserializer<>(new StringDeserializer()),
        new ErrorHandlingDeserializer<>(new JsonDeserializer<>(AncestorEvent.class, objectMapper, false)));
  }
}

Zusätzlich ist hier der ErrorHandlingDeserializer zu finden, der sich um Fehler bei dem Empfang der Nachrichten kümmert. Beide Seiten sind ja darauf angewiesen, dass die Nachrichten in einem Topic ein gemeinsames Format ausweisen. Es kann aber passieren, dass Nachrichten in einem Topic landen, die von den Empfängern nicht interpretiert werden können. Das kann durch eigene Implementierungsfehler oder durch boshafte Absender geschehen. Die hier dargestellte Konfiguration liest die fehlerhaften Nachrichten und loggt die aufgetretenen Fehler.

Damit ein Spring Boot Service einen Kafka-Event lesen kann, reicht dann die Annotation @KafkaListener an einer entsprechenden Methode. Diese benötigt minimal einen @Payload Parameter mit dem Typ des Inhalt des Kafka-Events.

@Service
@Slf4j
public class DsgvoAncestorListener {
  private DsgvoService service;
  
  public DsgvoAncestorListener(DsgvoService service) {
    this.service = service;
  }

  @KafkaListener(topics = "ancestor", containerFactory = "kafkaListenerContainerFactory")
  public void listPartner(@Payload AncestorEvent  event) {
      log.info("received ancestor event: " + event);
      service.check(event.getPerson());
    }
  }
}

In diesem Beispiel ist der Service zuständig für die DSGVO Prüfung der Personendaten. Die Person aus der empfangenen Nachricht wird dazu an den DsgvoService übergeben, der die tatsächliche Überprüfung übernimmt.

Auch für den Empfang eines Kafka-Events mit einer Spring Boot Anwendung ist also nicht wirklich viel zu implementieren.

Schreibe einen Kommentar