4. Encoding and Evolution
One of the core requirements for data systems is evolvability. Features are added over time, requirements are better understood, or business needs change.
Data models and schema flexibility: Changes to the application can require a change to the underlying data.
- Relational databases follow a “one schema rule”. Although the schema can be changed, only one schema is enforced at a given time.
- Schema-on-read databases do not enforce a fixed schema. This allows the database to contain a mix of old and new data formats written at different times.
In large systems, it is typical to roll out features in stages. This leads to situations where different versions of code must talk to each other.
- Server-side rolling upgrades: The new version is rolled out to a few nodes at a time. After checking whether the new version is running smoothly, it is gradually rolled out to all the nodes.
- Client-side updates: The client may not update their apps for weeks or months after the schema change.
Because multiple versions of the code and data coexist, compatibility must be maintained in two directions:
- Backwards compatibility: Newer code can read data written by older code. This is simpler since new code is written with knowledge of the old format.
- Forwards compatibility: Older code can read data written by newer code. This is more difficult because it requires the old code to gracefully ignore additions made by a version of the code that didn’t exist when it was written.
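The two directions can be illustrated with a minimal sketch. The record layout (a version 1 with only name, a version 2 that adds email) and the function names are hypothetical, and JSON is used only as a convenient stand-in for any encoding.

```python
import json

# Hypothetical layouts: version 1 has "name"; version 2 adds "email".

def decode_v1(raw: bytes) -> dict:
    """Old reader: keeps the field it knows and ignores everything else."""
    data = json.loads(raw)
    return {"name": data["name"]}

def decode_v2(raw: bytes) -> dict:
    """New reader: fills in a default when old data lacks the new field."""
    data = json.loads(raw)
    return {"name": data["name"], "email": data.get("email", "")}

old_bytes = json.dumps({"name": "Ada"}).encode()
new_bytes = json.dumps({"name": "Ada", "email": "ada@example.com"}).encode()

backward = decode_v2(old_bytes)  # backwards compatibility: new code, old data
forward = decode_v1(new_bytes)   # forwards compatibility: old code, new data
print(backward, forward)
```

Note that the forwards-compatible path only works because the old reader was written to tolerate fields it does not recognize.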
Encoding Data
Programs typically work with data in two distinct forms:
- In-memory: Data is kept in objects, structs, lists, arrays, etc. These are optimized for efficient access by the CPU using pointers.
- Byte sequences: When writing to a file or sending data over a network, data must be encoded as a self-contained sequence of bytes. The original pointers only make sense within a specific process and are not useful to other processes.
The translation from the in-memory to the byte sequence representation is called encoding, and the reverse is decoding.
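As a rough illustration of the two forms, the sketch below uses Python's standard struct module to turn an in-memory tuple into a fixed-layout byte sequence and back. The record contents and the chosen layout are assumptions made for the example.

```python
import struct

# In-memory form: a tuple holding (user id, score).
record = (42, 3.5)

# Layout: "<" little-endian without padding, "I" unsigned 32-bit int,
# "d" 64-bit float.
LAYOUT = "<Id"

encoded = struct.pack(LAYOUT, *record)     # encoding: objects -> bytes
decoded = struct.unpack(LAYOUT, encoded)   # decoding: bytes -> objects

print(len(encoded), decoded)
```

The byte sequence is self-contained: it can be written to a file or sent to another process, as long as the receiver knows the same layout.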
Language-Specific Formats
Most modern languages have built-in encoding libraries (java.io.Serializable, pickle in Python, Marshal in Ruby). These are convenient, but it is a bad idea to use them in large applications:
1. Language lock-in: The encoding is often tied to a specific language, making it difficult to integrate with systems written in other languages.
2. Security: To restore data into objects, the decoder must be able to instantiate arbitrary classes. This is a classic security problem because attackers can potentially instantiate dangerous classes and achieve remote code execution.
3. Neglected Versioning: Forwards and backwards compatibility is typically not considered in these libraries.
4. Efficiency: These formats are often slow and produce bloated, large byte sequences (Java’s built-in serialization is a classic example).
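The security point can be demonstrated harmlessly with pickle. In this sketch the class name is hypothetical and the "attack" only calls the built-in len, but the same hook lets a crafted byte sequence call any importable function during decoding.

```python
import pickle

class Innocent:
    """Looks like a plain data object, but controls how it is unpickled."""

    def __reduce__(self):
        # Tells pickle: "to rebuild me, call len('attacker-chosen')".
        # A real attacker would name a dangerous callable instead of len.
        return (len, ("attacker-chosen",))

payload = pickle.dumps(Innocent())

# Decoding runs the callable chosen by whoever produced the bytes.
restored = pickle.loads(payload)
print(type(restored), restored)
```

The decoder never rebuilds an Innocent object at all; it simply executes whatever call the byte sequence describes, which is why untrusted pickle data should not be loaded.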
Textual Formats: JSON, XML, CSV
JSON, XML, and CSV are standardized, human-readable encodings that are widely supported. However, they have subtle flaws:
1. Numerical Ambiguity: XML and CSV cannot distinguish between a number and a string of digits without an external schema.
- JSON distinguishes numbers from strings but does not distinguish integers from floating-point numbers and lacks a specified precision.
2. Binary Strings: JSON and XML support Unicode text well but do not natively support binary strings (sequences of bytes).
- Developers work around this by using Base64 encoding, which increases the data size by about 33% (every 3 bytes become 4 characters).
3. Schema Complexity: While JSON and XML have optional schema support, they are very complicated. Most applications hardcode the logic for interpreting data types correctly instead.
- CSV doesn’t have any schema, leaving it up to the application to define row and column meanings. This can lead to some inconsistent parsing; e.g. what if the values contain commas or newlines?
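The three flaws above can be reproduced with the Python standard library. This is only a sketch: the sample values are made up, and the float conversion stands in for what a parser that stores all numbers as IEEE 754 doubles would do.

```python
import base64
import csv
import io
import json

# 1. Numbers: 2**53 + 1 cannot be represented exactly as a double, so a
#    parser that treats every JSON number as a double silently changes it.
big = 2**53 + 1
as_double = float(big)
precision_lost = int(as_double) != big

# 2. Binary strings: Base64 turns every 3 bytes into 4 ASCII characters.
blob = bytes(range(256)) + bytes(44)          # 300 arbitrary bytes
text = base64.b64encode(blob).decode("ascii")
doc = json.dumps({"payload": text})           # now safe to embed in JSON

# 3. CSV: values containing commas or newlines need quoting, and a naive
#    split on "," miscounts the columns.
buf = io.StringIO()
csv.writer(buf).writerow(["Smith, Jane", "line1\nline2"])
naive_cells = buf.getvalue().split(",")
parsed = next(csv.reader(io.StringIO(buf.getvalue())))

print(precision_lost, len(blob), len(text), len(naive_cells), parsed)
```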
For cases where human readability is less important than performance, there are binary encoding variants for the textual formats:
- JSON: MessagePack, BSON, BJSON, UBJSON, BISON, etc.
- XML: WBXML, Fast Infoset
However, most of these formats are schemaless, which forces them to store every field name alongside the data values in the encoded byte sequence. This overhead significantly restricts how much you can actually compress the data, often resulting in only a small space reduction compared to the original text.
Thrift & Protocol Buffers
Apache Thrift and Protocol Buffers (protobuf) are binary encoding libraries that use a schema to define the data structure before any encoding happens.
message Person {
  required string user_name = 1;
  optional int64 favorite_number = 2;
  repeated string interests = 3;
}
After defining the schema, the protobuf compiler will use the schema to generate new source files in the target language. These files contain methods that know how to turn an object into the correct binary sequence.
One of the major benefits of having a schema is that it doesn’t require storing the field name alongside the data values. Instead, the schema contains field tags that act as a compact alias for the fields.
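To see how field tags replace field names, here is a hand-written sketch of the Protocol Buffers wire format for the Person schema above. It is not the generated code: the helper names are made up, only non-negative integers are handled, and the sample values are illustrative.

```python
def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer 7 bits at a time, low bits first."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)

VARINT, LENGTH_DELIMITED = 0, 2  # protobuf wire types used here

def key(field_tag: int, wire_type: int) -> bytes:
    """Each value is prefixed with its field tag and wire type, not its name."""
    return encode_varint((field_tag << 3) | wire_type)

def encode_string(field_tag: int, value: str) -> bytes:
    data = value.encode("utf-8")
    return key(field_tag, LENGTH_DELIMITED) + encode_varint(len(data)) + data

def encode_int64(field_tag: int, value: int) -> bytes:
    return key(field_tag, VARINT) + encode_varint(value)

def encode_person(user_name: str, favorite_number: int, interests: list) -> bytes:
    out = encode_string(1, user_name) + encode_int64(2, favorite_number)
    for interest in interests:       # repeated field: same tag, once per item
        out += encode_string(3, interest)
    return out

encoded = encode_person("Martin", 1337, ["daydreaming", "hacking"])
print(len(encoded), encoded.hex())
```

The strings user_name, favorite_number, and interests never appear in the output; only the tag numbers 1, 2, and 3 do.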
The downside of relying on field tags is that there are specific rules for how the data model can change over time:
- You can add new fields as long as they have new tag numbers. Old code will ignore tags it doesn’t recognize (forwards compatibility).
- New code can read old data as long as the existing tag numbers do not change. However, any new field you add must be optional or have a default value.
- If not, then new code would fail when trying to read old data that lacks the field.
- You can only remove optional fields, and you must never use that specific tag number again to avoid confusing newer code.
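The forwards-compatibility rule (old code ignores tags it does not recognize) can be sketched with a hand-written decoder. This is a simplified illustration, not a real protobuf library: it handles only the varint and length-delimited wire types, and field 4 is a hypothetical field added by newer code.

```python
def read_varint(buf: bytes, pos: int) -> tuple:
    """Return (value, next position) for the varint starting at pos."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7

# Field tags that the *old* code was compiled with.
KNOWN_FIELDS = {1: "user_name", 2: "favorite_number"}

def decode_old(buf: bytes) -> dict:
    record, pos = {}, 0
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        tag, wire_type = key >> 3, key & 0x07
        if wire_type == 0:                        # varint
            value, pos = read_varint(buf, pos)
        elif wire_type == 2:                      # length-delimited
            length, pos = read_varint(buf, pos)
            value = buf[pos:pos + length].decode("utf-8")
            pos += length
        else:
            raise ValueError(f"wire type {wire_type} not handled in this sketch")
        if tag in KNOWN_FIELDS:
            record[KNOWN_FIELDS[tag]] = value
        # Unknown tag: the wire type told us how many bytes to skip,
        # so the value is simply dropped and parsing continues.
    return record

# Bytes written by newer code: fields 1 and 2, plus a new string field 4.
new_data = bytes.fromhex("0a064d617274696e" "10b90a" "2203612e62")
print(decode_old(new_data))
```

The wire type stored next to each tag is what makes skipping possible: the old code does not know what field 4 means, but it knows how long it is.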
Avro
Apache Avro is another binary encoding format that is significantly different from protobuf and Thrift. It also uses a schema to specify the structure of the data being encoded, but it has no tag numbers.
- Without tags, the Avro binary encoding is the most compact of the encodings discussed.
- It is also much friendlier to dynamically generated schemas, since field tags do not have to be assigned and maintained by hand (as in protobuf).
- The trade-off is that the encoded data contains nothing to identify fields or their types, so the reader must go through the fields in the exact order they appear in the writer’s schema.
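A hand-rolled sketch of the Avro binary encoding shows what "no tag numbers" means in practice: values are simply concatenated in schema order. The schema assumed here (a string userName, a favoriteNumber that is a union of null and long, and an array of strings called interests) and the sample values are illustrative, and the helpers are not part of any Avro library.

```python
def zigzag_varint(n: int) -> bytes:
    """Avro int/long: zigzag-map to a non-negative number, then varint-encode."""
    n = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

def avro_string(s: str) -> bytes:
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data     # length prefix, then UTF-8 bytes

def encode_person(user_name, favorite_number, interests) -> bytes:
    out = avro_string(user_name)               # field 1: no tag, no type marker
    # Union ["null", "long"]: branch index first, then the value (if any).
    if favorite_number is None:
        out += zigzag_varint(0)
    else:
        out += zigzag_varint(1) + zigzag_varint(favorite_number)
    # Array: a block with the item count, the items, then a zero terminator.
    if interests:
        out += zigzag_varint(len(interests))
        for item in interests:
            out += avro_string(item)
    out += zigzag_varint(0)
    return out

encoded = encode_person("Martin", 1337, ["daydreaming", "hacking"])
print(len(encoded), encoded.hex())
```

Nothing in these bytes says where one field ends and the next begins in terms of names or types, which is why decoding is only possible with the exact schema the writer used.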
Because the encoded data carries no field identifiers, schema evolution in Avro depends on two schemas:
- The writer’s schema is the schema the application used when it encoded the data.
- The reader’s schema is the one the application expects when it decodes the data.
The schemas don’t have to be the same: they only need to be compatible. When decoding, the Avro library compares the writer’s schema and the reader’s schema side by side and translates the data from the former into the latter.
- Writer and reader can have schemas in different orders, since schema resolution will resolve the fields by field names.
- If a field appears in the writer’s schema but not the reader’s schema, it is ignored. If the code reading the data expects some field, but the writer’s schema does not contain a field of that name, it is filled in with a default value (declared in the reader’s schema).
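The resolution rules above can be sketched in a few lines. This is a simplified model, not Avro's implementation: schemas are reduced to lists of dictionaries with a name and an optional default, and the field names and values are hypothetical.

```python
def resolve(values, writer_schema, reader_schema):
    """Translate values decoded in writer order into the reader's view."""
    # Decoded values arrive in the writer's field order, so pair by position.
    written = {field["name"]: value for field, value in zip(writer_schema, values)}
    record = {}
    for field in reader_schema:
        name = field["name"]
        if name in written:
            record[name] = written[name]       # matched by name; order is irrelevant
        elif "default" in field:
            record[name] = field["default"]    # writer never had this field
        else:
            raise ValueError(f"no value or default for field {name!r}")
    return record                              # writer-only fields are ignored

writer_schema = [{"name": "userName"}, {"name": "favoriteNumber"}, {"name": "photoURL"}]
reader_schema = [
    {"name": "favoriteNumber"},
    {"name": "userName"},
    {"name": "userID", "default": 0},
]

resolved = resolve(["Martin", 1337, "http://example.com/p.png"], writer_schema, reader_schema)
print(resolved)
```

Here photoURL exists only in the writer's schema and is dropped, while userID exists only in the reader's schema and takes its default.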
There are a few rules to maintain forwards and backwards compatibility:
- You can only add or remove a field that has a default value.
- Changing a datatype is possible if the reader can resolve the writer’s type into its own. This is governed by type promotion; e.g. widening from int to long.
- This is backwards compatible (int in the old writer version can be converted to long in the new reader version), but not forwards compatible (long in the new writer version can’t be safely cast to int in the old reader version).
- Changing a field name is possible but requires an alias.
- For example, if we rename a field from loginID to username, we can redefine the field as username but add an alias of loginID.
- This is backwards compatible: the reader schema looks for username, sees that it is not present in the writer schema, then looks at loginID to decode the data.
- This is not forwards compatible: the reader has no knowledge of username or the alias. The writer schema will contain username, so the old reader will not know what to do and will fail (unless there is a default).
- Adding a branch to a union type is also backwards compatible but not forwards compatible. This is required if you want to allow a field to be null since that is not an acceptable default in Avro.
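The asymmetry of the type-promotion and alias rules can be checked with a small sketch. The promotion table covers only the numeric widenings, the schema dictionaries are a simplification, and the helper names are hypothetical rather than part of an Avro library.

```python
# Numeric promotions a reader may apply to a writer's type (writer -> readers).
PROMOTIONS = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
}

def can_read(writer_type: str, reader_type: str) -> bool:
    """True if data written as writer_type can be resolved to reader_type."""
    return writer_type == reader_type or reader_type in PROMOTIONS.get(writer_type, set())

def find_writer_field(reader_field: dict, writer_field_names: set):
    """Look up the reader's field in the writer's schema by name, then by alias."""
    for name in [reader_field["name"], *reader_field.get("aliases", [])]:
        if name in writer_field_names:
            return name
    return None

# Type promotion: widening works, narrowing does not.
widening_ok = can_read("int", "long")       # old writer, new reader
narrowing_ok = can_read("long", "int")      # new writer, old reader

# Rename loginID -> username, with an alias declared in the new schema.
new_reader_field = {"name": "username", "aliases": ["loginID"]}
old_reader_field = {"name": "loginID"}

backward = find_writer_field(new_reader_field, {"loginID"})    # new reads old
forward = find_writer_field(old_reader_field, {"username"})    # old reads new

print(widening_ok, narrowing_ok, backward, forward)
```

Aliases live in the reader's schema, so only a reader that already knows about the rename can follow it.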
Practically, we need an approach to send the writer’s schema to the reader. It is not an option to send the full schema with every record since this would ruin the space efficiency of the binary encoding.
Depending on how the data is being moved, there are a few approaches:
- Large files (object container files): The writer includes the full writer’s schema once in a header at the beginning of a file.
- The file becomes self-describing and the overhead is negligible when spread across millions of records.
- Databases with individual records: Because the records are being written at different times by different versions, they might use entirely different schemas.
- The writer prepends a small version number or schema hash to every record.
- Using the version number, the reader looks up the corresponding schema in a central Schema Registry to decode the record.
- Network connections (RPC): During the initial connection handshake, the two processes negotiate which schema they will use.
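The per-record approach for databases can be sketched as follows. The registry here is a hypothetical in-memory dictionary standing in for a central schema registry, a single byte is assumed to be enough for the version number, and JSON is only a placeholder for the compact binary payload.

```python
import json

# Hypothetical stand-in for a schema registry: version -> ordered field names.
SCHEMA_REGISTRY = {
    1: ["name"],
    2: ["name", "email"],
}

def encode_record(version: int, values: list) -> bytes:
    """Prefix one version byte, then the values in schema order (no field names)."""
    return bytes([version]) + json.dumps(values).encode("utf-8")

def decode_record(raw: bytes) -> dict:
    version = raw[0]                         # read the prefix
    fields = SCHEMA_REGISTRY[version]        # fetch the writer's schema
    values = json.loads(raw[1:])
    return dict(zip(fields, values))

old_record = encode_record(1, ["Ada"])
new_record = encode_record(2, ["Ada", "ada@example.com"])

print(decode_record(old_record), decode_record(new_record))
```

Each record costs one extra byte, yet records written under different schema versions can sit side by side and still be decoded correctly.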
Dataflow Models
We have covered ways to encode and decode data, but not how the data actually flows from one process to another. The way it flows is called a dataflow model.
Dataflow through Databases
A sender writes to the database, and the receiver reads from the database later.
During a rolling upgrade, or in an environment where multiple versions of the service access the same database, newer code and older code run in parallel. This can lead to the following read-modify-write trap:
- A newer version of the code adds a field and writes it to the database.
- An older version of the code reads that record.
- The older code decodes the record into an in-memory object, modifies a different field, and writes the object back to the database.
- Because the older code’s model didn’t include the new field, it is often lost during the re-encoding process.
To avoid this, databases require both directions of compatibility:
- Backwards compatibility: Current code must be able to read data written by previous versions of the code.
- Forwards compatibility: Older code still running in the cluster must be able to read, and may go on to update, records written by a newer version of the code.
- Encoding formats (like Avro, Protobuf) support preserving unknown fields, but the application code must be carefully designed to keep those unknown fields intact during a read-modify-write cycle.
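The trap and one way around it can be sketched with plain dictionaries. The function names and the email field are hypothetical, and JSON stands in for whatever encoding the database records actually use.

```python
import json

def old_code_update_lossy(raw: str, new_name: str) -> str:
    """Old code that maps the record onto a model with only the fields it knows."""
    data = json.loads(raw)
    user = {"name": data["name"]}      # unknown fields are dropped here
    user["name"] = new_name
    return json.dumps(user)

def old_code_update_preserving(raw: str, new_name: str) -> str:
    """Old code that changes its own field and passes everything else through."""
    data = json.loads(raw)
    data["name"] = new_name            # unknown fields stay untouched
    return json.dumps(data)

# Record written by newer code, which added "email".
stored = json.dumps({"name": "Ada", "email": "ada@example.com"})

lossy = json.loads(old_code_update_lossy(stored, "Ada L."))
preserved = json.loads(old_code_update_preserving(stored, "Ada L."))

print(lossy, preserved)
```

In the lossy version the email written by the newer code disappears without any error being raised, which is what makes this failure mode easy to miss.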
Dataflow through Services: REST and RPC
When processes need to communicate over a network, the most common arrangement is the client-server model. In this setup, servers expose an API (the service), and clients connect to perform requests.
Web Services
When HTTP is the underlying protocol, it is called a web service. There are two styles:
- REST (Representational State Transfer): A design philosophy, not a protocol, that uses standard HTTP features like URLs, cache control, and authentication. It uses simple formats like JSON.
- SOAP: An XML-based protocol that aims to be independent of HTTP. It uses WSDL (Web Services Description Language) for code generation, which is useful in statically typed languages but complex to manage.
Flaws with the RPC model
RPC frameworks (gRPC, Thrift, Avro) attempt to make a remote network request look like a local function call within the same process (location transparency). However, this is fundamentally flawed, because network requests differ drastically from local calls:
- Unpredictability: A local call either succeeds or fails based on your parameters. Network requests can fail or be lost due to external factors outside your control.
- Timeouts: A network request may return without a result due to a timeout, leaving you unsure if the request actually reached the server or not.
- Retries: If you retry a failed request, you might perform the action twice (unless you build in idempotence).
- Latency: Local calls have predictable execution times, whereas network latency is wildly variable depending on congestion and server load.
- Parameters: You can pass references to local objects efficiently. For RPC, all data must be encoded into a sequence of bytes, which becomes problematic for large objects.
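The retry problem is usually handled by making requests idempotent. The sketch below shows one common approach, deduplicating on a client-generated request ID; the PaymentServer class and its fields are hypothetical, and the "network" is just a direct method call.

```python
import uuid

class PaymentServer:
    """Hypothetical server that remembers the response for each request ID."""

    def __init__(self):
        self.balance = 100
        self._responses = {}           # request id -> cached response

    def charge(self, request_id: str, amount: int) -> dict:
        if request_id in self._responses:
            # Duplicate delivery: return the earlier result, do not charge again.
            return self._responses[request_id]
        self.balance -= amount
        response = {"status": "ok", "balance": self.balance}
        self._responses[request_id] = response
        return response

server = PaymentServer()
request_id = str(uuid.uuid4())         # chosen once by the client, reused on retry

first = server.charge(request_id, 30)  # imagine this response is lost to a timeout
retry = server.charge(request_id, 30)  # the client cannot tell, so it retries

print(first, retry, server.balance)
```

Without the request ID, the server could not distinguish a retry from a second, intentional charge.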
Evolution
For the system to evolve, clients and servers must be deployable independently.
- We usually assume that all servers will be updated first, and then all the clients.
- This means that we only need backwards compatibility on requests, and forward compatibility on responses.
- The provider of a service cannot force the users to upgrade, so compatibility needs to be maintained for a long time.
- If a breaking change is unavoidable, service providers must maintain multiple versions of the API.
Message-Passing Dataflow
Asynchronous message-passing systems are a middle ground between RPC and databases. Instead of sending a message over a direct network connection, the sender transmits it to a message broker.
Message Brokers
Using a broker (Kafka, ActiveMQ, NATS) offers several benefits over direct RPC:
- Reliability through buffering: The broker acts as a buffer if the recipient is temporarily unavailable or overloaded, which prevents the sender from crashing or losing data.
- Automatic redelivery: If a consumer process crashes, the broker can redeliver the message once the process restarts, preventing data loss.
- Network decoupling: The sender doesn’t need to know the specific IP address or port of the recipient; it only needs to know the topic or queue name.
- Fan-out capability: A single message can be published to a topic and consumed by multiple different services simultaneously.
- Logical decoupling: The publisher simply sends the message and doesn’t need to care who is consuming it.
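Buffering, fan-out, and decoupling by topic name can be shown with a toy in-memory broker. This is a sketch only: the Broker class is hypothetical, it has no persistence, acknowledgements, or redelivery, and it does not reflect the API of Kafka, ActiveMQ, or NATS.

```python
from collections import defaultdict, deque

class Broker:
    """Toy broker: one buffered queue per (topic, subscriber) pair."""

    def __init__(self):
        self._queues = defaultdict(dict)   # topic -> {subscriber name: deque}

    def subscribe(self, topic: str, subscriber: str) -> None:
        self._queues[topic][subscriber] = deque()

    def publish(self, topic: str, message: dict) -> None:
        # The publisher only names the topic; it never sees the subscribers.
        for queue in self._queues[topic].values():
            queue.append(message)          # fan-out: every subscriber gets a copy

    def poll(self, topic: str, subscriber: str):
        queue = self._queues[topic][subscriber]
        return queue.popleft() if queue else None

broker = Broker()
broker.subscribe("orders", "billing")
broker.subscribe("orders", "shipping")

# Both consumers are "offline" while this is published; the broker buffers it.
broker.publish("orders", {"order_id": 1})

billing_msg = broker.poll("orders", "billing")
shipping_msg = broker.poll("orders", "shipping")
nothing_left = broker.poll("orders", "billing")
print(billing_msg, shipping_msg, nothing_left)
```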
Distributed Actor Frameworks
The actor model is a programming model for concurrency in a single process. Instead of managing threads, locks, or shared memory, logic is divided into actors:
- State isolation: each actor maintains its own local state that is not shared with any other actor.
- Single-threaded processing: An actor processes only one message at a time, eliminating race conditions within the actor’s logic.
- Asynchronous messaging: Actors communicate exclusively by sending and receiving asynchronous messages.
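These three properties can be sketched with a single actor and a mailbox. The CounterActor class and its message shapes are hypothetical, and the explicit run call stands in for the scheduler that a real actor framework would provide.

```python
from collections import deque

class CounterActor:
    """Toy actor: private state, a mailbox, and one message handled at a time."""

    def __init__(self):
        self._count = 0                    # state isolation: never shared directly
        self._mailbox = deque()

    def send(self, message: dict) -> None:
        # Asynchronous messaging: the sender enqueues and returns immediately.
        self._mailbox.append(message)

    def run(self) -> None:
        # Single-threaded processing: messages are handled strictly one by one.
        while self._mailbox:
            self._handle(self._mailbox.popleft())

    def _handle(self, message: dict) -> None:
        if message["type"] == "increment":
            self._count += message.get("by", 1)
        elif message["type"] == "report":
            message["reply_to"].append(self._count)

actor = CounterActor()
replies = []                               # stands in for the sender's own mailbox

actor.send({"type": "increment"})
actor.send({"type": "increment", "by": 4})
actor.send({"type": "report", "reply_to": replies})
pending_before_run = list(replies)         # nothing has been processed yet

actor.run()
print(pending_before_run, replies)
```

Other code never reads the counter directly; the only way to observe it is to send a message and wait for the reply.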
A distributed actor framework is a system that takes the actor model and extends it so that it works across multiple nodes in a network. In particular, a framework becomes distributed when it provides location transparency.
- The framework handles network complexity. When Actor A sends a message to Actor B, it uses the same code regardless of the location of Actor B.
- If the actor is remote, the framework transparently encodes the message into bytes, sends it over the wire, and decodes it on the other side.
- These models usually assume messages can be lost (even locally), and have strategies to handle resilience.
A distributed actor framework essentially integrates a message broker and the actor programming model into one package. When it comes to evolution, however, it faces the same compatibility hurdles:
- If you perform a rolling upgrade, a node running new code might send a message to a node running old code.
- You must choose an encoding format that supports backward and forward compatibility, or your actors will crash when they receive a message from a different version.