Streaming Data at MediaMath

September 30, 2020 — by MediaMath    

Welcome to MediaMath’s Tech Series, which features voices from our Engineering team, responsible for developing and maintaining our industry-leading tech stack. Check back every few weeks for technical deep dives from these expert engineers.

At MediaMath, an entity is defined as an object that can be uniquely identified and has a state that can change over time. Changes in entity state drive business decisions for downstream consumers. Entity state is shared across multiple services, and a consistent view is required across all consuming services.

MediaMath is a leading omnichannel demand-side platform that offers a rich set of capabilities for marketers to manage and execute their campaigns. Our omnichannel platform supports execution, management, targeting and reporting all in a centralized location. Under the hood, multiple services are at play that share entity state to achieve that uniform experience. Any change in one part of the flow can affect other aspects of the workflow handled by a different service.

To ensure that state is consistent across all services, we use an ACID-compliant database to store entities. The database's ACID guarantees avoid the problem of inconsistent views. All services read and write to a database that provides transactional guarantees, so the records in the database can safely be considered the source of truth.
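
To make that concrete, here is a minimal sketch of the kind of transactional write such a database supports, using Go's standard database/sql package. The table, columns and driver are hypothetical stand-ins, not our actual schema.

```go
package entitystore

import (
	"context"
	"database/sql"

	_ "github.com/lib/pq" // hypothetical driver choice, for illustration only
)

// updateCampaignBudget applies a state change inside a single transaction,
// so readers never observe a half-applied update.
func updateCampaignBudget(ctx context.Context, db *sql.DB, id int64, budget float64) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit succeeds

	// The new state becomes visible to readers only after the commit.
	if _, err := tx.ExecContext(ctx,
		`UPDATE campaigns SET budget = $1, version = version + 1 WHERE id = $2`,
		budget, id,
	); err != nil {
		return err
	}
	return tx.Commit()
}
```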

As MediaMath scaled, this source of truth potentially became a single point of failure. During heavy load, the database would experience delays serving requests to multiple clients. Adding read replicas and vertically scaling the database mitigated some bottlenecks, but we realized it was not the ideal solution.

CQRS: Command Query Responsibility Segregation

CQRS is a well-known pattern first described by Greg Young and well explained in this article by Martin Fowler. It means having separate models for reading information and for updating information, building on the idea of CommandQuerySeparation.

The CQRS pattern tends towards a microservices architecture with separate services for reading and writing data. It decouples the responsibilities of the read and write services so that each can be built independently to maximize efficiency. It is well suited to event-based architectures, where decisions are taken only when an event occurs. Each service can be scaled independently, which is a major benefit when there is a large disparity between read and write volumes. Along with the benefits, there are some caveats, especially around data consistency: most systems built on CQRS guarantee eventual consistency within some accepted latency.
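
To make the split concrete, here is a minimal Go sketch of separate command and query models for a single entity type. The interface and type names are illustrative assumptions, not part of our codebase.

```go
package cqrs

import "context"

// Campaign is the read model's view of the entity (illustrative fields only).
type Campaign struct {
	ID     int64
	Name   string
	Budget float64
	Active bool
}

// Command side: accepts state changes and nothing else.
type CampaignCommands interface {
	CreateCampaign(ctx context.Context, c Campaign) error
	UpdateBudget(ctx context.Context, id int64, budget float64) error
}

// Query side: serves reads only, typically from a separately maintained,
// eventually consistent read model.
type CampaignQueries interface {
	GetCampaign(ctx context.Context, id int64) (Campaign, error)
	ListActiveCampaigns(ctx context.Context) ([]Campaign, error)
}
```

Because the two sides share no code path, the read model can be scaled out or rebuilt without touching the write path.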

After assessing our use case, we decided a CQRS-based service would meet our needs for building the entity platform.

Enter `Changes`

Changes is MediaMath’s distributed entity platform. Based on the read model of CQRS, Changes facilitates the exchange of entity data across internal teams. All entity updates on the database are served to consumers through the platform within an accepted latency. All updates are emitted as events that are available for consumption by any services subscribing to them.

Changes provides several guarantees:

  1. Durability: Messages are stored to non-volatile storage and can be accessed repeatedly.
  2. At least once delivery: If an event has been acknowledged, it will be delivered to all consumers who ask for it.
  3. Sequential consistency: Event order is maintained for all consumers.
  4. Globally distributed: Events will be available across the globe.

Changes was developed to solve multiple problems:

  1. Reducing the load on our legacy database deployment: As consumers of entity data move away from it, significant load is taken off the database, reducing the risk of read clients choking our single point of failure.
  2. Supporting multiple subscribers: Changes is a centralized, highly available platform that can support many subscribers reading concurrently to consume entity data.
  3. Scalability: Thanks to its distributed nature, as the number of subscribers or the volume of entity data grows, Changes scales along simply by adding more nodes to the system.

Architecture

Changes is built on Kafka, a distributed log service optimized for low-latency, high-throughput systems that provides scalability and fault tolerance out of the box. The set of services that makes up Changes is written in Go and heavily optimized to reduce the latency of delivering entity data.
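
As a rough sketch of the publishing side, the snippet below writes a captured entity update to a Kafka topic using the open-source segmentio/kafka-go client. The broker address, topic name and event shape are placeholders, not Changes' internal implementation.

```go
package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

// EntityEvent is a placeholder shape for a captured update.
type EntityEvent struct {
	EntityType  string    `json:"entity_type"`
	EntityID    int64     `json:"entity_id"`
	Action      string    `json:"action"`
	CommittedAt time.Time `json:"committed_at"`
}

func main() {
	w := &kafka.Writer{
		Addr:     kafka.TCP("localhost:9092"), // placeholder broker
		Topic:    "entity-history",            // placeholder topic name
		Balancer: &kafka.LeastBytes{},
	}
	defer w.Close()

	ev := EntityEvent{EntityType: "campaign", EntityID: 42, Action: "update", CommittedAt: time.Now()}
	value, err := json.Marshal(ev)
	if err != nil {
		log.Fatal(err)
	}

	// Keying by entity ID keeps updates for one entity on one partition
	// when a topic has more than one.
	if err := w.WriteMessages(context.Background(), kafka.Message{
		Key:   []byte("42"),
		Value: value,
	}); err != nil {
		log.Fatal(err)
	}
}
```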

Changes offers two kinds of data:

  1. Event data or history: All events in the order they were committed to the database.
  2. Entity data or snapshot: The current state of the entity as stored in the database.

Both history and snapshot data are available as messages in Kafka topics. Consumers of entities are expected to run their own consumer clients to read data from those topics. We leverage the Confluent Schema Registry to manage schemas for all messages published to the topics.
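
A subscriber's consumer client could look roughly like the sketch below, again using segmentio/kafka-go. The broker, group and topic names are placeholders, and Schema Registry decoding is left out to keep the example short.

```go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // placeholder brokers
		GroupID: "my-entity-consumer",       // each subscriber runs its own group
		Topic:   "campaign-snapshot",        // placeholder topic name
	})
	defer r.Close()

	for {
		msg, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		// The key is the entity ID; the value would normally be decoded
		// using the schema registered for this topic.
		log.Printf("entity %s at offset %d (%d bytes)", msg.Key, msg.Offset, len(msg.Value))
	}
}
```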

There is also a separate daemon service that constantly monitors events on both topics to ensure consistency and accepted latency.

Event sourcing: history

The event data is stored in what we call history topics. A history topic contains an ordered stream of events. Kafka guarantees ordering only within a single partition, so to ensure ordering across all entity types, we publish all updates to a single-partition history topic. The history topic has a retention period of 30 days; hence, the ordered events go back as far as 30 days. Any update that occurred more than 30 days ago is removed from the history topic.
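
For illustration, the sketch below creates a topic with one partition and a 30-day retention using kafka-go's connection API. The topic name and broker address are assumptions; it is only meant to show the relevant settings.

```go
package main

import (
	"log"
	"net"
	"strconv"

	"github.com/segmentio/kafka-go"
)

func main() {
	conn, err := kafka.Dial("tcp", "localhost:9092") // placeholder broker
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Topic creation has to be sent to the cluster controller.
	controller, err := conn.Controller()
	if err != nil {
		log.Fatal(err)
	}
	ctrl, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		log.Fatal(err)
	}
	defer ctrl.Close()

	if err := ctrl.CreateTopics(kafka.TopicConfig{
		Topic:             "entity-history", // placeholder name
		NumPartitions:     1,                // one partition gives a total order of events
		ReplicationFactor: 3,
		ConfigEntries: []kafka.ConfigEntry{
			{ConfigName: "retention.ms", ConfigValue: "2592000000"}, // 30 days
		},
	}); err != nil {
		log.Fatal(err)
	}
}
```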

To get the state of an entity at a certain point in time (within the last 30 days, in our case), we replay all messages from the beginning of the topic up to that point in time. Messages replayed from the beginning of a history topic cover everything since a 30-day-old snapshot and should give you the current state of any entity. The notion of time in Kafka here is the record/message offset. This is useful for auditing events that happened in the past.
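
One way to do such a replay is sketched below: read the history topic from the earliest retained offset, stop at the chosen point in time and keep the last payload seen per entity. Topic and broker names are placeholders, and decoding is reduced to raw bytes.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Reconstruct state as of 24 hours ago (must be within the 30-day retention).
	cutoff := time.Now().Add(-24 * time.Hour)

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"localhost:9092"}, // placeholder broker
		Topic:     "entity-history",           // placeholder history topic
		Partition: 0,                          // the history topic has a single partition
	})
	defer r.Close()
	r.SetOffset(kafka.FirstOffset) // start from the oldest retained event

	state := map[string][]byte{} // last known payload per entity ID
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		msg, err := r.ReadMessage(ctx)
		cancel()
		if err != nil {
			break // ran out of retained messages (or timed out)
		}
		if msg.Time.After(cutoff) {
			break // reached the chosen point in time
		}
		state[string(msg.Key)] = msg.Value
	}
	log.Printf("reconstructed %d entities as of %s", len(state), cutoff)
}
```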

Entity state: snapshot

An entity snapshot is the current state of the entity. Every entity is uniquely identified by a 64-bit integer entity ID; think of it as the primary key of a record in a database. Snapshots are stored in snapshot topics, which eventually keep only the last record for every entity ID.

The snapshot topics are defined per entity type, have multiple partitions and are log compacted. Every Kafka record has a key and a value; the key is the entity ID of the record. Log compaction deletes all stale entries for a particular key and keeps only the latest one. However, multiple records with the same key can exist that have not been compacted yet, so any cache sourced from snapshot topics should consider only the last record seen for each key, overwriting earlier entries for that key.

At any point, a snapshot topic contains the entire state of an entity type. This helps bootstrap new services or old ones that need a cache refresh. Simply consume all messages in a snapshot topic and build the cache.
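
Such a bootstrap might look like the sketch below: read every partition of a snapshot topic from the beginning and keep only the most recent record per key, since older records for a key may not have been compacted away yet. The partition count, topic and broker names are assumptions for illustration.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	cache := map[string][]byte{} // entity ID -> latest serialized state

	// Assume eight partitions for this snapshot topic; read each from the start.
	for partition := 0; partition < 8; partition++ {
		r := kafka.NewReader(kafka.ReaderConfig{
			Brokers:   []string{"localhost:9092"}, // placeholder broker
			Topic:     "campaign-snapshot",        // placeholder snapshot topic
			Partition: partition,
		})
		r.SetOffset(kafka.FirstOffset)

		for {
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			msg, err := r.ReadMessage(ctx)
			cancel()
			if err != nil {
				break // caught up on this partition (or timed out)
			}
			// Later records win: overwrite any earlier entry for the same key.
			cache[string(msg.Key)] = msg.Value
		}
		r.Close()
	}
	log.Printf("bootstrapped cache with %d entities", len(cache))
}
```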

Changes in production

Changes has been running in production at MediaMath since 2016 and has evolved to support large-scale updates. Changes observes 3.5 million updates per day on our database alone, and 2.1 million updates per day pass through as captured events published downstream. These figures are seven-day rolling averages.

Changes provides strong latency guarantees on when data becomes available for consumption after it is committed to the database. Over the previous week, 100% of updates were delivered within one hour, 89% within one minute and 81.8% within five seconds. These numbers account for all factors, including change data capture, network latency and synchronization in Kafka.