Kafka Key Compaction Alternative on GCP

Daniel Collins
Google Cloud - Community
7 min read · Jun 1, 2021


Martin Fowler described the “event sourcing” pattern more than 15 years ago. The idea is that software systems scale better when all changes to the system state are captured in a log and any service that needs a particular materialization of the state can compute it on the fly from the log. For example, if you have a series of updates to documents in your content management system (CMS), you can initialize the search indexes for the content using the history of these updates, rather than the latest state of the documents. This decouples the indexing system from the production database used by the CMS and keeps the load on the database down. This also lets you search through past versions, if the search service designers deem that useful.

This idea found a perfect match with Apache Kafka, which provides horizontally scalable log storage. However, even with horizontal scaling, reprocessing the entire history of a system is often impractical. This led to a critical feature in the design of Kafka: log compaction, which discards older entries for a given key. In this article, we consider another practical way to implement event sourcing: the Firestore Listener API. To get into the details, it is worth stepping back to frame the use case for event sourcing more precisely, as a state replication problem.

State replication use case

A way to think about event sourcing is as asynchronous state replication. In this model, as in event sourcing, a set of distributed clients build eventually consistent local copies of a central source of truth by tracking any updates. They may rely on a complete log, but this is an implementation detail. The purpose of this pattern is to ensure in-memory access to a mostly up-to-date copy of the source of truth, while efficiently being notified of any changes. An example of an application where this is critical is distribution of configuration to multiple instances of a distributed service.

Kafka key compaction

The state replication pattern can be implemented by utilizing Apache Kafka as an ordered log, retaining the history of values for each key stored. Apache Kafka is an open source distributed storage system that combines the real-time nature of messaging middleware with several features typical of databases. Many data transfer use cases can be implemented on GCP with low impedance using either Pub/Sub or Pub/Sub Lite, but database-like use cases can often be better solved using other fully managed GCP services.

When a new client needs to initialize its replica of the state, it starts reading at the beginning of the log, applies every change that has ever happened to its local state, and then continues listening for future updates as they arrive. Key compaction makes this access pattern more efficient by periodically purging older, redundant events from the backlog so new clients do not need to process them. Even with compaction, though, every newly connecting client must still read every remaining event in the backlog to determine the value for any given key.

Key compaction reduces the overhead of this access pattern when there are many messages. When key compaction is enabled, older messages with the same key (a user-supplied string) will be deleted at some point after a newer message with the same key is published. This allows clients to read the full backlog of messages to build up the state for every existing key, then see new messages as they are published to update their internal state.
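The replay described above can be sketched in a few lines. This is a self-contained illustration, not a Kafka client: the Record type and the null-value "tombstone" deletion convention mirror Kafka's compaction model, but all names here are made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: rebuild per-key state by applying a (possibly compacted) log in order.
class LogReplay {
  record Record(String key, String value) {}  // value == null means "delete key"

  static Map<String, String> replay(Iterable<Record> log) {
    Map<String, String> state = new LinkedHashMap<>();
    for (Record r : log) {
      if (r.value() == null) {
        state.remove(r.key());           // tombstone: drop the key
      } else {
        state.put(r.key(), r.value());   // newest value for the key wins
      }
    }
    return state;
  }

  public static void main(String[] args) {
    Map<String, String> state = replay(List.of(
        new Record("alice", "v1"),
        new Record("bob", "v1"),
        new Record("alice", "v2"),       // supersedes alice's v1
        new Record("bob", null)));       // tombstone deletes bob
    System.out.println(state);           // prints {alice=v2}
  }
}
```

Because only the latest record per key matters to the final map, compaction can delete the superseded records without changing the state any client computes.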

Firestore listeners

Cloud Firestore is a scalable, fully managed NoSQL database. While it is quite different from Kafka, Firestore listeners expose exactly the tailing semantics provided by a Kafka key-compacted topic. And while Firestore is primarily used for mobile applications, its listeners also solve the asynchronous state replication problem between services, with a higher-level API than Kafka's.

Firestore listeners enable clients to receive the initial values of a set of documents, followed by continuous updates as those documents change. Unlike the Kafka key compaction model, where every client sees every document update, Firestore also allows filtering so that clients receive updates only for the entries they are interested in.

The state replication pattern is straightforward to implement using Firestore. When clients connect, they can create a listener for the set of keys they are interested in. The listener will begin delivering the current value for all those keys to the client, and will then send any updates to the set of keys that happen while the client is connected. When clients connect for the first time in a session, they will receive the full set of source data.
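The contract described above — current values first, then live updates — can be modeled in memory. This toy class only illustrates the semantics; it is not the Firestore API, and all names are invented for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy model of the listener contract: a subscriber first receives the current
// value of every tracked key, then every later update.
class ListenerModel {
  private final Map<String, String> state = new LinkedHashMap<>();
  private final List<BiConsumer<String, String>> listeners = new ArrayList<>();

  void subscribe(BiConsumer<String, String> listener) {
    state.forEach(listener);  // initial snapshot: deliver current values
    listeners.add(listener);  // then keep delivering live updates
  }

  void update(String key, String value) {
    state.put(key, value);
    for (BiConsumer<String, String> l : listeners) l.accept(key, value);
  }

  public static void main(String[] args) {
    ListenerModel model = new ListenerModel();
    model.update("theme", "dark");    // written before the client connects
    List<String> seen = new ArrayList<>();
    model.subscribe((k, v) -> seen.add(k + "=" + v));
    model.update("theme", "light");   // delivered live, after the snapshot
    System.out.println(seen);         // prints [theme=dark, theme=light]
  }
}
```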

Firestore also does not require all clients to use the listener pattern. You can mix standard document queries with listeners without issue, if a particular use case would be better served by one or the other.

Implementation comparison

What follows compares using Kafka key compaction and Firestore listeners to disseminate some kind of configuration to all front ends of an application. Suppose we need to keep track of a set of attributes defining the configuration of an object, such as a set of preferences expressed by a user. This could also be a set of revisions for a document in a content management system, or counts and locations for items in a warehouse inventory. We model this as a map from user identifiers to a UserConfig protocol buffer (protobuf) message. For efficiency, we want to keep a recent copy of this map loaded in the front-end application's memory. The following interface will be implemented using both systems:

interface ConfigCache extends AutoCloseable {
  // Returns the configuration if it exists, or null if it does not.
  // Does not block on I/O.
  @Nullable UserConfig get(String user_id);
}
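With either backend, the cache itself can be a concurrent map that a listener thread keeps up to date. In this sketch, UserConfig is a placeholder record standing in for the protobuf message, the onUpdate/onDelete hooks are where a Kafka consumer loop or a Firestore listener callback would feed changes in, and @Nullable is dropped to keep the example dependency-free.

```java
import java.util.concurrent.ConcurrentHashMap;

// In-memory cache kept current by a background listener (sketch).
class InMemoryConfigCache implements AutoCloseable {
  record UserConfig(String payload) {}  // stand-in for the real protobuf message

  private final ConcurrentHashMap<String, UserConfig> configs = new ConcurrentHashMap<>();

  // Called from the listener callback when a config is created or updated.
  void onUpdate(String userId, UserConfig config) { configs.put(userId, config); }

  // Called from the listener callback when a config is deleted (tombstone).
  void onDelete(String userId) { configs.remove(userId); }

  // Returns the configuration if it exists, or null. Does not block on I/O.
  UserConfig get(String userId) { return configs.get(userId); }

  @Override public void close() { configs.clear(); }  // detach the listener here
}
```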

Kafka

In Kafka, this pattern can be implemented using a key-compacted user_configs topic representing the source of truth for all user configurations, with the user_id as the message key. When a configuration is updated, a new message is written to the topic with the UserConfig protobuf message serialized as the payload. To delete a key, a message with that key and a null payload (a "tombstone") must be published, since a compacted topic otherwise retains at least one record for every key.

Firestore

When using Firestore, a listener can be used to implement this pattern. User configurations are inserted as documents into the user_configs collection, with the user_id as the document id. When a configuration is updated, a document is written to the collection with the UserConfig protobuf message serialized as a payload Blob field.

Scalability

Kafka is well known for its horizontal scalability and configurability. Cloud Firestore scales with read traffic automatically, and offers both regional and multi-regional high availability instances. However, it has some limits and quotas where it may not be a fit on its own for all existing users of Kafka key compaction:

  • 10,000 writes per second and 10 MiB/s of write traffic per database: This can be worked around by staging large payloads out of band (in Cloud Storage, for example) and only putting the URL in the database.
  • 1 write per second per document (sustained over time): This should only affect users expecting to rapidly change individual keys. It should be noted that a high write rate per-key with asynchronous replication means the local cache will rarely be consistent with the source of truth, regardless of the system used to implement this pattern.
  • 1 million concurrent connections: This is far more than would be expected for most backend applications.
  • 1 MiB maximum document size: This is similar to Kafka's default maximum message size, but cannot be increased for Firestore. Smaller documents result in lower latency when being sent to listeners.
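The out-of-band staging workaround from the first bullet can be sketched as follows. The StoredValue shape, the 1 MiB threshold, and the staging function are all illustrative assumptions, not part of either system's API.

```java
import java.util.function.Function;

// Sketch: payloads over the document size limit are uploaded elsewhere
// (e.g. a Cloud Storage object) and only a URL is stored in the document.
class PayloadStager {
  // Holds either the payload itself (small) or a URL to the staged bytes (large).
  record StoredValue(byte[] inline, String url) {}

  static final int MAX_INLINE = 1 << 20;  // Firestore's 1 MiB document limit

  // stageOutOfBand is a hypothetical uploader that returns the object's URL.
  static StoredValue store(byte[] payload, Function<byte[], String> stageOutOfBand) {
    if (payload.length <= MAX_INLINE) {
      return new StoredValue(payload, null);               // small: store inline
    }
    return new StoredValue(null, stageOutOfBand.apply(payload));  // large: store a pointer
  }
}
```

Readers that encounter a URL instead of inline bytes then fetch the payload from the staging location, trading one extra read for staying under the write-rate and document-size limits.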

Cost

This cost analysis for the config system described above assumes 10,000 records, each changing once per day. Each configuration is assumed to be 1 KiB in size, for a total configuration size of approximately 10 MiB. It also assumes that there are 1,000 servers listening for changes to this configuration, each restarting once a day to pick up new code. The total number of document reads would be:

  • 1,000 * 10,000 once per day on each server’s restart
  • 1,000 * 10,000 on average spread throughout the day

This amounts to 20,000,000 documents read per day. Using the Northern Virginia regional pricing, this would cost $6.60 per day for reads. The storage cost would be negligible.
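The arithmetic above can be checked directly. The $0.033 per 100,000 document reads rate is inferred from the stated $6.60/day total for the Northern Virginia region; current Firestore prices may differ.

```java
import java.util.Locale;

// Sanity check of the daily read count and cost from the example workload.
class CostEstimate {
  public static void main(String[] args) {
    long servers = 1_000;
    long documents = 10_000;
    long restartReads = servers * documents;  // full snapshot on each daily restart
    long updateReads = servers * documents;   // one update per document per day, fanned out
    long totalReads = restartReads + updateReads;
    double usdPerDay = (totalReads / 100_000.0) * 0.033;  // assumed $ per 100k reads
    System.out.printf(Locale.ROOT, "%d reads/day, $%.2f/day%n", totalReads, usdPerDay);
    // prints "20000000 reads/day, $6.60/day"
  }
}
```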

It should be noted that Firestore Listeners allow filtering of queries to the interesting subset, which reduces the cost of processing and access correspondingly. If, for example, a certain server only needs to have configurations loaded for each currently connected user, you can instead listen for changes to individual user configurations with:

db.collection("user_configs").document("some_user_id")
    .addSnapshotListener(…);

Or you can filter to a subset of configurations, such as those for users who should receive lower-latency updates:

db.collection("user_configs")
    .whereEqualTo("is_vip_user", true)
    .addSnapshotListener(…);

Conclusion

Google Cloud Firestore can be used to implement event sourcing patterns, similarly to Kafka. Rather than providing an efficient way to consume a log, Firestore offers a higher-level API for effectively the same operation: it allows a client to retrieve initial values for, and listen to changes in, a set of documents. Added advantages are the flexibility to retrieve an arbitrary subset of keys and to access the current state of a key directly. Firestore's primary drawback, compared to Kafka, is some of its scalability limits. See the Cloud Firestore documentation for more details on how to create a Firestore database.
