Spring-cloud kafka stream schema registry

I am trying to use the functional programming model of Spring Cloud Stream to transform an Avro message read from an input topic and publish a new message to an output topic.
Here is my transform function:

@Bean
public Function<KStream<String, Data>, KStream<String, Double>> evenNumberSquareProcessor() {
    return kStream -> kStream.transform(() -> new CustomProcessor(STORE_NAME), STORE_NAME);
}

The CustomProcessor is a class that implements the "Transformer" interface.

I have tried the transformation with non-Avro input and it works fine.

My difficulty is how to declare the schema registry, either in the application.yml file or in the Spring application itself.

I have tried many different configurations (the right documentation is hard to find), and each time the application fails to find the schema.registry.url setting. I get the following error:

Error creating bean with name 'kafkaStreamsFunctionProcessorInvoker':
Invocation of init method failed; nested exception is
java.lang.IllegalStateException:
org.apache.kafka.common.config.ConfigException: Missing required
configuration "schema.registry.url" which has no default value.

Here is my application.yml file:

spring:
  cloud:
    stream:
      function:
        definition: evenNumberSquareProcessor
      bindings:
        evenNumberSquareProcessor-in-0:
          destination: input
          content-type: application/*+avro
          group: group-1
        evenNumberSquareProcessor-out-0:
          destination: output
      kafka:
        binder:
          brokers: my-cluster-kafka-bootstrap.kafka:9092
          consumer-properties:
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            schema.registry.url: http://localhost:8081
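
One thing that may be worth checking: the properties above sit under spring.cloud.stream.kafka.binder, which configures the message-channel Kafka binder, whereas a function over KStream is handled by the Kafka Streams binder, which reads its settings from spring.cloud.stream.kafka.streams.binder. The fragment below is a minimal sketch of that alternative layout, not a verified fix. It assumes that Data is an Avro-generated SpecificRecord class and that the Confluent kafka-streams-avro-serde dependency is on the classpath; the broker and registry URLs are copied from the file above.

```yaml
spring:
  cloud:
    stream:
      function:
        definition: evenNumberSquareProcessor
      bindings:
        evenNumberSquareProcessor-in-0:
          destination: input
        evenNumberSquareProcessor-out-0:
          destination: output
      kafka:
        streams:
          binder:
            brokers: my-cluster-kafka-bootstrap.kafka:9092
            configuration:
              # Passed through to the Kafka Streams configuration,
              # where the Confluent Avro serde looks it up
              schema.registry.url: http://localhost:8081
          bindings:
            evenNumberSquareProcessor-in-0:
              consumer:
                # Assumption: Data is an Avro-generated SpecificRecord class
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            evenNumberSquareProcessor-out-0:
              producer:
                # The function emits Double values, so the output uses a plain Double serde
                valueSerde: org.apache.kafka.common.serialization.Serdes$DoubleSerde
```

With this layout the value (de)serialization is done by Kafka Streams serdes rather than by a value.deserializer entry, so the consumer-properties block would no longer be needed for this function.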

My Spring Boot application is declared as follows, with the schema registry client enabled:

@EnableSchemaRegistryClient
@SpringBootApplication
public class TransformApplication {
    public static void main(String[] args) {
        SpringApplication.run(TransformApplication.class, args);
    }
}

Thanks for any help you can offer.

Regards
CG