“Walked out this morning, I don’t believe what I saw
The Police
Hundred billion bottles washed up on the shore
Seems I’m not alone at being alone
Hundred billion castaways, looking for a home”
Im ersten Kafka Beitrag ging es um die generelle Arbeitsweise und Architektur des Systems. In diesem Beitrag um die Integration in eine eigene Spring Boot Anwendung. Dieser Beitrag wird recht kurz, weil die Integration tatsächlich sehr einfach ist.
In diesem Beispiel soll der Stammbaum Verwaltung aus früheren Beiträgen eine Hauch von Event-Driven Architecture verliehen werden. Dazu er hält der Stammbaum-Importer die Fähigkeit, für die Personen aus einem Stammbaum, Kafka-Events zu erzeugen. Außerdem soll ein DSGVO Service diese Events empfangen und die Personendaten auf etwaige DSGVO Anforderungen prüfen können.
Für die Kafka Unterstützung im eigenen Projekt sorgt spring-kafka
, dass über eine Dependency in das eigene Maven Projekt gelangt.
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
Damit der Staumbaum-Importer eigene Kafka-Events erzeugen kann, benötigt er zunächst eine passende Konfiguration.
@EnableKafka @Configuration public class KafkaProducerConfig { @Bean public KafkaTemplate<String, AncestorEvent> ancestorTemplate(ProducerFactory<String, AncestorEvent> producerFactory) { return new KafkaTemplate<>(producerFactory); } @Bean public ProducerFactory<String, AncestorEvent> producerFactory(KafkaProperties properties, ObjectMapper objectMapper) { JsonSerializer<AncestorEvent> jsonSerializer = new JsonSerializer<>(objectMapper); jsonSerializer.configure(properties.buildProducerProperties(), true); return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(), new StringSerializer(), jsonSerializer); } }
Diese Konfiguration stellt eine ProducerFactory
und ein KafkaTemplate
bereit. Wer schon Erfahrungen mit Spring Boot sammeln konnte, ahnt vermutlich schon, dass mit dem KafkaTemplate
die Events verschickt werden. Da verhält es sich ähnlich wie seine Cousinen JdbcTemplate
und RestTemplate
. Die generischen Typen des KafkaTemplate
sind dabei die Typen für den Schlüssel und den Inhalt des Kafka-Events. Die ProducerFactory
arbeitet im Hintergrund und erzeugt aus den Datentypen die tatsächliche Darstellung des Kafka-Events, die an den Kafka-Server gesendet wird.
Hier wird als Schlüssel ein String verwendet und mit einem StringSerializer
umgewandelt. Der Inhalt der Nachricht ist eine Instanz vom Typ AncestorEvent
, ein eher langweiliges POJO als Wrapper um die eigentliche Person
. Für dieses POJO wird der JsonSerializer
verwendet, der aus der Instanz eine JSON Darstellung produziert. Im Hintergrund wird dafür standardmäßig Jackson verwendet und so können alle kleinen Tricks verwendet werden, die Dr. REST kennt.
In den Application Properties muss dann nur noch die Adresse des Kafka-Servers eingetragen werden und dann kann das Versenden von Kafka-Events beginnen.
spring.kafka.producer.bootstrap-servers=localhost:9092
In diesem Beispiel versendet der Service KafkaEventSender
für jeden Person im Stammbaum einen Kafka-Event. Dazu wird die Methode send
verwendet mit dem Namen des Topics ancestor
, der ID des Stammbaums als Schlüssel und einer Instanz von AncestorEvent
als Inhalt.
@Service public class KafkaEventSender { private final KafkaTemplate<String, AncestorEvent> ancestorTemplate; public KafkaEventSender(KafkaTemplate<String, AncestorEvent> ancestorTemplate) { this.ancestorTemplate= ancestorTemplate; } public void sendAncestorMessage(AncestorTree tree) { tree.getAncestors().stream().map(AncestorEvent::new).forEach(a -> ancestorTemplate.send("ancestors", tree.getId(), a); } }
Dies alles geschieht hier in der Zeile 10 der Klasse KafkaEventSender
und damit ist auch schon das Versenden von Kafka-Events demonstriert.
Damit ein anderer Spring Boot Service diese Kafka-Events empfangen kann, wird eine zweite Konfiguration benötigt. Hier wird symmetrisch zum Sender das Deserialisieren von Schlüssel und Inhalt des Kafka-Events für den Empfänger konfiguriert.
@EnableKafka @Configuration public class KafkaConsumerConfig { @Bean public LoggingErrorHandler errorHandler() { return new LoggingErrorHandler(); } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, AncestorEvent>> kafkaListenerContainerFactory(ConsumerFactory<String, AncestorEvent> kafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory<String, AncestorEvent> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(kafkaConsumerFactory); return factory; } @Bean public ConsumerFactory<String, AncestorEvent> consumerFactory(KafkaProperties properties, ObjectMapper objectMapper) { return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(), new ErrorHandlingDeserializer<>(new StringDeserializer()), new ErrorHandlingDeserializer<>(new JsonDeserializer<>(AncestorEvent.class, objectMapper, false))); } }
Zusätzlich ist hier der ErrorHandlingDeserializer
zu finden, der sich um Fehler bei dem Empfang der Nachrichten kümmert. Beide Seiten sind ja darauf angewiesen, dass die Nachrichten in einem Topic ein gemeinsames Format ausweisen. Es kann aber passieren, dass Nachrichten in einem Topic landen, die von den Empfängern nicht interpretiert werden können. Das kann durch eigene Implementierungsfehler oder durch boshafte Absender geschehen. Die hier dargestellte Konfiguration liest die fehlerhaften Nachrichten und loggt die aufgetretenen Fehler.
Damit ein Spring Boot Service einen Kafka-Event lesen kann, reicht dann die Annotation @KafkaListener
an einer entsprechenden Methode. Diese benötigt minimal einen @Payload
Parameter mit dem Typ des Inhalt des Kafka-Events.
@Service @Slf4j public class DsgvoAncestorListener { private DsgvoService service; public DsgvoAncestorListener(DsgvoService service) { this.service = service; } @KafkaListener(topics = "ancestor", containerFactory = "kafkaListenerContainerFactory") public void listPartner(@Payload AncestorEvent event) { log.info("received ancestor event: " + event); service.check(event.getPerson()); } } }
In diesem Beispiel ist der Service zuständig für die DSGVO Prüfung der Personendaten. Die Person
aus der empfangenen Nachricht wird dazu an den DsgvoService
übergeben, der die tatsächliche Überprüfung übernimmt.
Auch für den Empfang eines Kafka-Events mit einer Spring Boot Anwendung ist also nicht wirklich viel zu implementieren.