If you’ve found that Apache Kafka and Kafka Streams are the right tools to solve your problem, you’ll need to read the documentation, experiment with the code, and then find the right moment to run and deploy.

Kafka Streams comes with handy data processing operators that are neatly wrapped into its DSL. You can filter data, map one representation to another, route messages according to various criteria, and even aggregate and reduce the flow of information into time slices. Kafka Streams differentiates between two types of operators: stateless and stateful. The latter require some kind of store in which the state is saved, and this is where the data dualism of Kafka Streams comes in: the state is kept in a local database as well as in an internal changelog topic on the Kafka cluster.
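For illustration, here is a minimal sketch of both kinds of operators (the events and event-counts topics are made up for this example): filter and mapValues are stateless, while count is stateful and silently creates a local store plus a backing changelog topic.

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> events = builder
    .stream("events", Consumed.with(Serdes.String(), Serdes.String()));

events
    .filter((key, value) -> value != null && !value.isEmpty())  // stateless
    .mapValues(String::toUpperCase)                             // stateless
    .groupByKey()
    .count()                                                    // stateful: needs a state store
    .toStream()
    .to("event-counts", Produced.with(Serdes.String(), Serdes.Long()));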

These things usually work out of the box. When you design your processing topology, Kafka Streams can infer the configuration of the required resources. The inference seems to work at first; however, as time passes, changes in your logic may collide with that default configuration in ways that surprise you.

I wanted to explain here how this can be avoided and how to take care of some technical aspects of Kafka Streams.

Lessons learned

We have already encountered such a situation in our team. We were quite happy with a simple data flow expressed in the Streams DSL, and we configured only the strictly necessary parts of the intermediate topics and stores. Later we had to change the business logic by adding an intermediate step to our existing processing chain. This caused Kafka Streams to build a new processing topology which no longer matched the internal topics we had already filled with data. You can imagine the problems this brought us.

Our initial stream processing looked like this:

(Some code omitted for brevity.)

Topology buildTopology() {
    // Local key-value store used by the transformer below; its changelog topic
    // name is derived from the store name: <application.id>-processedEntityID-changelog.
    this.streamsBuilder
        .addStateStore(Stores
            .keyValueStoreBuilder(Stores.persistentKeyValueStore("processedEntityID"),
                Serdes.String(), Serdes.serdeFrom(
                    new ProcessedEntitySerdes.ProcessedEntitySerializer(this.objectMapper),
                    new ProcessedEntitySerdes.ProcessedEntityDeserializer(this.objectMapper)))
            .withCachingEnabled());

    KStream<String, String> input = this.streamsBuilder
        .stream(topics.getInput(), Consumed.with(Serdes.String(), Serdes.String()));

    // Stateful transformation backed by the "processedEntityID" store.
    KStream<String, Tuple2<String, ProcessedEntity>> jointStream = input
        .transformValues(() -> new ProcessedEntityIdTransformer(this.clock),
            "processedEntityID");

    // streams[0]: records matching the dead-letter predicate, streams[1]: all the rest.
    KStream<String, Tuple2<String, ProcessedEntity>>[] streams = jointStream.branch(
        this::deadLetterPredicate,
        (id, value) -> true);

    streams[0]
        .to(topics.getDeadletter(), Produced.with(
            Serdes.String(), Serdes.String()));

    // Stateful aggregation; without an explicit name, Streams generates one
    // for both the state store and its changelog topic.
    KTable<String, Entity> entityTable = streams[1]
        .groupByKey()
        .aggregate(
            Entity::empty,
            this.aggregation(),
            Materialized.with(
                Serdes.String(),
                Serdes.serdeFrom(
                    new EntitySerdes.EntitySerializer(this.objectMapper),
                    new EntitySerdes.EntityDeserializer(this.objectMapper))));

    entityTable
        .toStream()
        .to(topics.getOutput(), Produced.with(
            Serdes.String(),
            Serdes.serdeFrom(
                new EntitySerdes.EntitySerializer(this.objectMapper),
                new EntitySerdes.EntityDeserializer(this.objectMapper))));

    return this.streamsBuilder.build();
}
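
For completeness, this is roughly how such a topology gets started (a minimal sketch; the broker address is made up). The application.id is what gives the internal topics the application- prefix you will see below:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application");     // yields the "application-..." topic prefix
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // made-up broker address

KafkaStreams kafkaStreams = new KafkaStreams(buildTopology(), props);
kafkaStreams.start();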

We can see the resulting topology graph:

The API explicitly required us to name only one store, the one connected to the transform operator: processedEntityID. Kafka Streams used this name for the local store and derived the name of its backing Kafka changelog topic from it: application-processedEntityID-changelog.

In our topology, there was also a stateful operation _aggregate_ which needed a local store to act upon. By default, Streams gave it the name: KSTREAM-AGGREGATE-STATE-STORE-0000000007

It then assigned it a changelog topic: application-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog

Finally, it created these internal topics in a cluster with default parameters:

Topic:application-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog	PartitionCount:16	ReplicationFactor:3
Topic:application-processedEntityID-changelog	PartitionCount:16	ReplicationFactor:3

As you can see, the internal topics share their partition count and replication factor, derived from the input and output topics. Any other topic settings, like retention time, segment configuration or failover behaviour, were not explicitly specified, which means they fall back to the global cluster defaults. Moreover, the aggregate operation created its store and changelog topic with an auto-generated name.
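
A simple way to check which names Streams generated, before anything is deployed, is to print the topology description, as in this small sketch:

// Prints all sub-topologies with their processor and state store names,
// including auto-generated ones like KSTREAM-AGGREGATE-STATE-STORE-0000000007.
Topology topology = buildTopology();
System.out.println(topology.describe());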

Some time later, we had to introduce additional processing in our stream by adding intermediate steps to the processing chain.

However, Kafka Streams calculated a new topology and shifted the auto-generated processor numbers a bit further:

Previously, the local store and internal topic used for the aggregation were based on the name: KSTREAM-AGGREGATE-STATE-STORE-0000000007

It received a new name at this point: KSTREAM-AGGREGATE-STATE-STORE-0000000009.

The result was that our local store could not be properly initialized from the already processed data after rolling out the new version of the application.

Fortunately, Kafka Streams does give you a way to explicitly name and configure the state store behind each stateful operation.

We also configured the internal Kafka topics behind the local stores, which lets us handle partial cluster failures (min.insync.replicas), set a proper data cleanup policy, and control the log rolling behaviour on the cluster.

We finally fixed our code by applying topic configuration and naming the stores properly.

As we didn’t do this properly at the very beginning, we still notice a discrepancy between the processor step KSTREAM-AGGREGATE-0000000009 and the previous store name KSTREAM-AGGREGATE-STATE-STORE-0000000007. In future projects, we need to make sure to avoid this problem from the start.

Always pass a fully configured `Materialized` instance to your stateful operation:

Materialized
    // reuse the previously generated store name so the existing changelog topic is kept
    .<String, Entity, KeyValueStore<Bytes, byte[]>>as("KSTREAM-AGGREGATE-STATE-STORE-0000000007")
    // changelog topic configuration, built as an immutable map and converted to java.util.Map
    .withLoggingEnabled(HashMap.<String, String>empty()
        .put(TopicConfig.SEGMENT_BYTES_CONFIG, "104857600")
        .put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
        .put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
        .put(TopicConfig.RETENTION_MS_CONFIG, "604800000")
        .toJavaMap())
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.serdeFrom(
        new EntitySerdes.EntitySerializer(this.objectMapper),
        new EntitySerdes.EntityDeserializer(this.objectMapper)));
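
In our case, this instance is then passed to the aggregation instead of the bare Materialized.with(...) call. In the sketch below, materializedEntityStore is just a placeholder name for the instance built above:

KTable<String, Entity> entityTable = streams[1]
    .groupByKey()
    .aggregate(
        Entity::empty,
        this.aggregation(),
        materializedEntityStore);  // the fully configured Materialized instance from above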

In a similar way, you should also fully configure your local store by calling the `StoreBuilder` methods:

Stores
    .keyValueStoreBuilder(Stores.persistentKeyValueStore("processedEntityID"),
        Serdes.String(), Serdes.serdeFrom(
            new ProcessedEntitySerializer(this.objectMapper),
            new ProcessedEntityDeserializer(this.objectMapper)))
    .withCachingEnabled()
    // changelog topic configuration for the store's backing topic
    .withLoggingEnabled(HashMap.<String, String>empty()
        .put(TopicConfig.SEGMENT_BYTES_CONFIG, "10485760")
        .put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
        .put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
        .toJavaMap());

The final code looks like this:
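
In rough outline, it is the original buildTopology() with the fully configured store builder and the named Materialized instance from above swapped in (a sketch; the newly added intermediate steps are omitted, just like the other details elided earlier):

Topology buildTopology() {
    // Fully configured local store with an explicit name and changelog settings.
    this.streamsBuilder
        .addStateStore(Stores
            .keyValueStoreBuilder(Stores.persistentKeyValueStore("processedEntityID"),
                Serdes.String(), Serdes.serdeFrom(
                    new ProcessedEntitySerdes.ProcessedEntitySerializer(this.objectMapper),
                    new ProcessedEntitySerdes.ProcessedEntityDeserializer(this.objectMapper)))
            .withCachingEnabled()
            .withLoggingEnabled(HashMap.<String, String>empty()
                .put(TopicConfig.SEGMENT_BYTES_CONFIG, "10485760")
                .put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
                .put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
                .toJavaMap()));

    KStream<String, String> input = this.streamsBuilder
        .stream(topics.getInput(), Consumed.with(Serdes.String(), Serdes.String()));

    KStream<String, Tuple2<String, ProcessedEntity>> jointStream = input
        .transformValues(() -> new ProcessedEntityIdTransformer(this.clock),
            "processedEntityID");

    KStream<String, Tuple2<String, ProcessedEntity>>[] streams = jointStream.branch(
        this::deadLetterPredicate,
        (id, value) -> true);

    streams[0]
        .to(topics.getDeadletter(), Produced.with(
            Serdes.String(), Serdes.String()));

    // Aggregation with an explicitly named, explicitly configured state store.
    KTable<String, Entity> entityTable = streams[1]
        .groupByKey()
        .aggregate(
            Entity::empty,
            this.aggregation(),
            Materialized
                .<String, Entity, KeyValueStore<Bytes, byte[]>>as("KSTREAM-AGGREGATE-STATE-STORE-0000000007")
                .withLoggingEnabled(HashMap.<String, String>empty()
                    .put(TopicConfig.SEGMENT_BYTES_CONFIG, "104857600")
                    .put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
                    .put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
                    .put(TopicConfig.RETENTION_MS_CONFIG, "604800000")
                    .toJavaMap())
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.serdeFrom(
                    new EntitySerdes.EntitySerializer(this.objectMapper),
                    new EntitySerdes.EntityDeserializer(this.objectMapper))));

    entityTable
        .toStream()
        .to(topics.getOutput(), Produced.with(
            Serdes.String(),
            Serdes.serdeFrom(
                new EntitySerdes.EntitySerializer(this.objectMapper),
                new EntitySerdes.EntityDeserializer(this.objectMapper))));

    return this.streamsBuilder.build();
}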

And the final topology takes this form:

Inspecting the internal Kafka topics shows us the applied changes:

Topic:application-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog	PartitionCount:16	ReplicationFactor:3 Configs:segment.bytes=104857600,retention.ms=604800000,cleanup.policy=delete,min.insync.replicas=2
Topic:application-processedEntityID-changelog	PartitionCount:16	ReplicationFactor:3	Configs:cleanup.policy=compact,min.insync.replicas=2,segment.bytes=10485760

We can certainly say that we achieved a better design and learned some lessons along the way.

Summary

The topology of a Kafka Streams application does not change often.

If it does, it is good to take countermeasures upfront as the consequences could be hard to overcome later on.

If you use stateful operations in Kafka Streams, always consult the Javadoc for the overloaded method that accepts a `Materialized` instance, and remember to configure it fully.

If you need local stores, fully configure your `StoreBuilder`.

Happy coding!

Disclaimer

Code snippets were built against Apache Kafka Streams version 1.1.1.