Buffer is a library with two main components: multiple producers and a single consumer. Producers and the consumer communicate exclusively through a queue manifest in object storage — there is no direct network path between them and no stateful broker to operate.

Producers

Producers accept arbitrary byte entries from callers, buffer them in memory, and periodically flush them as binary data batches to object storage. Each producer exposes a minimal API:
pub async fn produce(
    &self,
    entries: Vec<Bytes>,
    metadata: Bytes,
) -> Result<WriteHandle>
Flushes are triggered either by elapsed time (default: 100 ms) or by batch size (default: 64 MiB). Data batches are binary and optionally compressed. Each batch is named with a ULID, which encodes a millisecond-precision timestamp.

Once a batch is flushed, the producer appends its location and metadata to the queue. The queue is a single binary manifest in object storage that tracks the locations and metadata of data batches in append order. Producers interact with it through a queue producer. The queue producer uses compare-and-swap (CAS) writes so that concurrent producers do not overwrite each other’s entries: an append succeeds only if the manifest has not been modified since it was read. On conflict, the queue producer re-reads the manifest and retries. The queue producer also assigns consecutive sequence numbers to queue entries.

The manifest format is designed for append efficiency. Existing entries are never deserialized during an append, so appending a new entry is O(1) in the length of the queue.

The WriteHandle returned by produce() contains a DurabilityWatcher. Callers can call await_durable() on it to block until the entries have been persisted to object storage and enqueued in the manifest. This guards against data loss during failover. It does not provide read-your-own-writes consistency, because durability refers to the data batch, not to the database write.
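The CAS append loop can be sketched with an in-memory stand-in for object storage. This is a minimal sketch, not Buffer's implementation. `Manifest`, `Store`, `put_if_version`, and `append` are hypothetical names. A `Mutex` plus a version counter plays the role of a conditional object-store write, such as an ETag precondition. The manifest is also cloned on every append, so the sketch does not model the O(1) binary encoding described above.

```rust
use std::sync::Mutex;

/// Hypothetical in-memory manifest: the next sequence number to assign plus
/// (sequence, batch location) entries in append order.
#[derive(Clone, Default)]
struct Manifest {
    next_sequence: u64,
    entries: Vec<(u64, String)>,
}

/// Hypothetical stand-in for an object store with conditional writes.
/// The u64 version plays the role of an ETag.
struct Store {
    inner: Mutex<(u64, Manifest)>,
}

impl Store {
    fn new() -> Self {
        Store { inner: Mutex::new((0, Manifest::default())) }
    }

    /// Returns the current version and a copy of the manifest.
    fn read(&self) -> (u64, Manifest) {
        self.inner.lock().unwrap().clone()
    }

    /// Compare-and-swap: writes only if nobody wrote since `expected` was read.
    fn put_if_version(&self, expected: u64, manifest: Manifest) -> bool {
        let mut guard = self.inner.lock().unwrap();
        if guard.0 != expected {
            return false;
        }
        *guard = (expected + 1, manifest);
        true
    }
}

/// Appends one batch location and returns the sequence number it received.
fn append(store: &Store, location: &str) -> u64 {
    loop {
        let (version, mut manifest) = store.read();
        let sequence = manifest.next_sequence;
        manifest.entries.push((sequence, location.to_string()));
        manifest.next_sequence += 1;
        if store.put_if_version(version, manifest) {
            return sequence;
        }
        // Another producer appended first: re-read and retry.
    }
}
```

When several threads call `append` at once, every call eventually succeeds. The entries end up numbered 0, 1, 2, ... with no gaps or duplicates, because a sequence number is only claimed by the write that wins the CAS.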

Consumer

The consumer is the read-side counterpart to the producers. It iterates over data batches in object storage and delivers them to the database in the order recorded in the queue:
pub async fn next_batch(&mut self) -> Result<Option<ConsumedBatch>>
pub async fn ack(&mut self, sequence: u64) -> Result<()>
A call to next_batch() reads the next location in the queue manifest and fetches the data batch from object storage. It then returns the data, the metadata, and a unique sequence number to the caller. Sequence numbers are assigned by the queue producer at ingestion time, in increasing order and without gaps. They are used both for acknowledgement and for tracking progress. Acknowledgements must be in order: acknowledging any sequence number other than the one immediately after the last acknowledged one is rejected, so no batch can be silently skipped.

While multiple producers can write to a queue manifest, only one consumer can read from it. This matches OpenData’s single-writer paradigm. It also prevents zombie consumers (consumers that survive an infrastructure failure) from acknowledging batches and thereby hiding them from the active consumer. Single-consumer exclusivity is enforced by an epoch stored in the queue manifest: when a consumer starts, it reads the epoch, increments it, and writes it back. Reads with a stale epoch are rejected, so any zombie consumer is fenced as soon as a new one takes over.

The consumer is also responsible for cleaning up processed data batches. At startup and after every 100 acknowledged batches, the entries for all acknowledged batches are dequeued from the manifest. A dedicated garbage collector task then deletes the corresponding data batches from object storage.

The consumer API supports exactly-once delivery. To achieve it, the database writer atomically persists the batch data together with the sequence number returned by next_batch(). Suppose that atomic write succeeds but a failure occurs before ack() returns. A new writer can then construct a new consumer from the last persisted sequence number, so it resumes right after that batch. The new consumer also bumps the epoch, which fences the old one and prevents duplicate writes.
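Epoch fencing and the in-order ack rule can be modeled together. This is a minimal sketch under stated assumptions:

- The manifest is an in-memory `Arc<Mutex<...>>` rather than an object in storage.
- `ManifestState`, `AckError`, and this simplified `Consumer` are hypothetical.
- Sequence numbers are assumed to start at 0.
- The epoch check is applied on `ack`, where a zombie would do damage, even though the text above describes rejection on reads.

```rust
use std::sync::{Arc, Mutex};

#[derive(Debug, PartialEq)]
enum AckError {
    Fenced,
    OutOfOrder { expected: u64, got: u64 },
}

/// Hypothetical slice of manifest state relevant to the consumer.
#[derive(Default)]
struct ManifestState {
    epoch: u64,
    last_acked: Option<u64>, // None until the first ack (assumption)
}

struct Consumer {
    manifest: Arc<Mutex<ManifestState>>,
    epoch: u64, // the epoch this consumer claimed at startup
}

impl Consumer {
    /// Claims exclusivity by reading, incrementing, and writing back the epoch.
    fn new(manifest: Arc<Mutex<ManifestState>>) -> Self {
        let epoch = {
            let mut m = manifest.lock().unwrap();
            m.epoch += 1;
            m.epoch
        };
        Consumer { manifest, epoch }
    }

    /// Accepts only the sequence immediately after the last acknowledged one.
    fn ack(&self, sequence: u64) -> Result<(), AckError> {
        let mut m = self.manifest.lock().unwrap();
        if m.epoch != self.epoch {
            return Err(AckError::Fenced); // a newer consumer has taken over
        }
        let expected = m.last_acked.map_or(0, |s| s + 1);
        if sequence != expected {
            return Err(AckError::OutOfOrder { expected, got: sequence });
        }
        m.last_acked = Some(sequence);
        Ok(())
    }
}
```

Once a second `Consumer::new` runs against the same manifest, every `ack` from the first consumer returns `AckError::Fenced`, so a zombie can no longer advance the queue.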

Read-ahead and batched acks

The serial next_batch / ack API caps a consumer at one manifest read and one ack call per batch. Fetching a batch is an I/O wait, decoding it is CPU work, and writing it to another system is more I/O. Doing all of this on one task in a tight loop leaves both object-store bandwidth and the machine’s cores underused. For high-throughput consumers, Consumer exposes a parallel-fetch path:
pub async fn next_descriptors(&mut self, max: usize)
    -> Result<Vec<BatchDescriptor>>;
pub async fn fetch_descriptor(&mut self, d: BatchDescriptor)
    -> Result<ConsumedBatch>;
pub fn fetch_handle(&self) -> ConsumerFetchHandle;
pub async fn ack_through(&mut self, sequence: u64) -> Result<()>;
The design batches the expensive low-volume operations (manifest read, ack) and parallelizes the high-volume operations (fetch, decode). The serial next_batch / ack API still works without changes: next_batch is a thin wrapper over next_descriptors(1) plus fetch_descriptor, and ack keeps its amortize-every-100-calls behavior.

How the split works

next_descriptors(max) reads the manifest once and returns up to max contiguous descriptors past an internal read-ahead cursor. It performs no object-store fetch and does not mutate the durable ack frontier.

ConsumerFetchHandle is a cheap, cloneable handle obtained from fetch_handle(). It holds an Arc to the object store and the manifest path. Its single method, fetch(&self, descriptor) -> Result<ConsumedBatch>, fetches and decodes one batch. It touches no Consumer cursor and is safe to call concurrently from many tasks. Cloning the handle is O(1).

ack_through(sequence) advances the durable ack frontier through sequence in one dequeue write, no matter how far the frontier moves. The dequeue runs first, and in-memory state and the buffer.acks counter advance only on success. A fence or storage error therefore leaves the consumer’s state untouched, and the call is safe to retry.
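The ack_through contract has three parts: one dequeue write, state advanced only after that write succeeds, and rejection of n <= last_acked_sequence. It can be sketched without a real object store. `FakeManifest`, `AckFrontier`, and `Error` are hypothetical. `acked_batches` is an assumed stand-in for the `buffer.acks` counter; whether the real metric counts calls or batches is not stated here. Sequences are assumed to start at 0.

```rust
#[derive(Debug, PartialEq)]
enum Error {
    AlreadyAcked { last_acked: u64, requested: u64 },
    Storage(String),
}

/// Hypothetical manifest that counts dequeue writes and can fail once on demand.
struct FakeManifest {
    dequeued_through: Option<u64>,
    writes: u32,
    fail_next: bool,
}

impl FakeManifest {
    fn dequeue_through(&mut self, sequence: u64) -> Result<(), Error> {
        if self.fail_next {
            self.fail_next = false;
            return Err(Error::Storage("transient failure".into()));
        }
        self.writes += 1;
        self.dequeued_through = Some(sequence);
        Ok(())
    }
}

struct AckFrontier {
    last_acked: Option<u64>,
    acked_batches: u64, // assumed stand-in for the buffer.acks counter
}

impl AckFrontier {
    fn ack_through(&mut self, manifest: &mut FakeManifest, n: u64) -> Result<(), Error> {
        if let Some(last) = self.last_acked {
            if n <= last {
                return Err(Error::AlreadyAcked { last_acked: last, requested: n });
            }
        }
        // One dequeue write, however far the frontier moves. On error we
        // return here, before touching any in-memory state.
        manifest.dequeue_through(n)?;
        let moved = match self.last_acked {
            Some(last) => n - last,
            None => n + 1, // assumes sequences start at 0
        };
        self.acked_batches += moved;
        self.last_acked = Some(n);
        Ok(())
    }
}
```

The in-memory fields change only after `dequeue_through` returns `Ok`. A failed call can therefore be retried with the same argument.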

Pipelined consumer pattern

One task owns the Consumer and the manifest; N fetch workers each hold a clone of the handle and pull descriptors off a bounded channel:
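let consumer = Consumer::new(config, last_acked).await?;
let fetcher  = consumer.fetch_handle();

// Bounded multi-consumer channel. tokio::sync::mpsc::Receiver is not
// Clone, so this sketch uses the async_channel crate, whose Receiver is.
// The capacity is illustrative and must be non-zero.
let (descriptor_tx, descriptor_rx) = async_channel::bounded(fetch_concurrency);

// N fetch workers, each holding a clone of the handle:
for _ in 0..fetch_concurrency {
    let f = fetcher.clone();
    let rx = descriptor_rx.clone();
    tokio::spawn(async move {
        // recv() returns Err once all senders are dropped and the
        // channel is drained, which ends the worker.
        while let Ok(d) = rx.recv().await {
            let batch = f.fetch(d).await?;
            // ...hand off to a decode / sink stage, which reports the
            // batch's sequence number back to the owner when done...
        }
        Ok::<(), Error>(())
    });
}

// Owner task: pull a run of up to K descriptors, fan them out, and ack
// the contiguous high watermark.
loop {
    let descriptors = consumer.next_descriptors(K).await?;
    for d in descriptors { descriptor_tx.send(d).await?; }
    // ...wait for the contiguous-completed high watermark `high`, then:
    consumer.ack_through(high).await?;
}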
Owner and workers run in parallel because their state is disjoint: two concurrent fetches against distinct descriptors do not contend on the manifest.

Descriptor handout contract

next_descriptors advances an internal last_handed_out_sequence before the caller has fetched the objects. Read-ahead is the point. That places one obligation on the caller: ack_through(n) does not verify that anything below n was processed. It unconditionally advances the durable frontier to n, rejecting only n <= last_acked_sequence. The caller must track which sequences have actually been processed and only call ack_through with the highest fully-processed contiguous sequence. Fetches may complete out of order; ack the contiguous-complete watermark, never past a still-pending sequence. If a descriptor is permanently lost (a worker panics, a channel send is dropped), the frontier stalls at that sequence. Recovery is process restart: Consumer::new(config, last_acked_sequence) re-emits everything the durable frontier has not acked.
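Tracking the contiguous-complete watermark is left to the caller. One way to do it is to keep a set of completed sequences above the first gap. The sketch below uses a hypothetical `Watermark` type that is not part of Buffer's API. Here, `first_unacked` would be the sequence immediately after the durable frontier the consumer was constructed with.

```rust
use std::collections::BTreeSet;

/// Hypothetical helper that turns out-of-order completions into the highest
/// sequence that is safe to pass to ack_through.
struct Watermark {
    start: u64,                // first sequence not yet durably acked
    next_expected: u64,        // lowest sequence not yet completed
    done_ahead: BTreeSet<u64>, // completed sequences sitting above a gap
}

impl Watermark {
    fn new(first_unacked: u64) -> Self {
        Watermark {
            start: first_unacked,
            next_expected: first_unacked,
            done_ahead: BTreeSet::new(),
        }
    }

    /// Records a completed sequence and returns the current watermark.
    fn complete(&mut self, sequence: u64) -> Option<u64> {
        if sequence >= self.next_expected {
            self.done_ahead.insert(sequence);
        }
        // Slide forward over every completion that is now contiguous.
        while self.done_ahead.remove(&self.next_expected) {
            self.next_expected += 1;
        }
        self.high()
    }

    /// Highest n such that every sequence in start..=n has completed,
    /// or None if nothing past `start` is contiguous yet.
    fn high(&self) -> Option<u64> {
        (self.next_expected > self.start).then(|| self.next_expected - 1)
    }
}
```

In the pipelined pattern, the sink stage would report each finished sequence to the owner task. The owner feeds it to `complete` and calls `ack_through` whenever the returned watermark moves past the last value it acked. A sequence that never completes keeps the watermark pinned below it, which matches the stall behavior described above.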

Fencing

next_descriptors returns Error::Fenced on the next manifest read after fencing. A worker holding a stale handle keeps fetching successfully against object storage; the manifest owner learns of the fence and propagates it, typically by closing the descriptor channel. ack_through against a fenced manifest returns Error::Fenced and leaves state untouched, so a fenced consumer cannot advance the durable frontier.

When to use which API

The serial next_batch / ack API stays the right choice for low-throughput single-task consumers (the timeseries and vector buffer consumers use it) and for backfill tooling where a tight per-batch ack loop is simpler than the pipeline. Reach for the read-ahead path when consumer throughput is bounded by manifest round-trips (one next_batch cycle per batch caps throughput at 1 / manifest_RTT), when fetch I/O and decode CPU need to overlap across batches, or when the workload is bursty enough that an in-flight bytes ceiling fits better than per-batch flow control. The opendata-contrib generic ingest runtime is one pipelined consumer built on these primitives. Full design rationale, the descriptor handout contract in detail, and the failure-mode catalog live in RFC 0003: Consumer Read-Ahead and ack_through.
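The manifest round-trip ceiling can be made concrete with a back-of-envelope bound; it is an illustration, not a measurement. The serial API pays one manifest read per batch. Now suppose the owner loop in the pipelined pattern issues one next_descriptors(K) read and one ack_through write per iteration, serially. Then K batches share those two round trips:

```latex
\begin{aligned}
T_{\text{serial}} &\lesssim \frac{1}{\mathrm{RTT}_{\text{manifest}}} \\
T_{\text{read-ahead}} &\lesssim \frac{K}{\mathrm{RTT}_{\text{read}} + \mathrm{RTT}_{\text{ack}}}
\end{aligned}
```

For example, with 20 ms for each manifest round trip and K = 64, the serial path tops out near 1 / 0.020 s = 50 batches/s. The read-ahead owner loop tops out near 64 / 0.040 s = 1600 batches/s. These numbers assume that each call actually returns K descriptors (enough backlog) and that fetch and decode keep up. Past that point, object-store bandwidth and decode CPU become the limit rather than the manifest.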

Usage example

For an example of how Buffer can be integrated into an ingestion pipeline, see ingest into Timeseries.