Producers
Producers accept arbitrary byte entries from callers, buffer them in memory, and periodically flush them as binary data batches to object storage. Each producer exposes a minimal API.

The WriteHandle returned by produce() contains a DurabilityWatcher.
Callers can await_durable() to block until the entries have been
persisted to object storage and enqueued in the manifest. This avoids data
loss during failover, but it does not provide read-your-own-writes
consistency — durability refers to the data batch, not to the database
write.
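
As a rough illustration of the produce / flush / await_durable flow, here is a minimal blocking sketch using only the standard library. It is not the library's implementation: the real API may well be asynchronous, and the Producer fields, the flush() method, mark_durable(), is_durable(), and the in-memory object_store vector are all assumptions made for the example. Only the names produce(), WriteHandle, DurabilityWatcher, and await_durable() come from the text above.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Sketch of a watcher: a shared flag plus a condition variable.
#[derive(Clone, Default)]
pub struct DurabilityWatcher {
    state: Arc<(Mutex<bool>, Condvar)>,
}

impl DurabilityWatcher {
    /// Hypothetical internal hook, called once the batch has been flushed.
    fn mark_durable(&self) {
        let (flag, signal) = &*self.state;
        *flag.lock().unwrap() = true;
        signal.notify_all();
    }

    /// Block the calling thread until the entry has been marked durable.
    pub fn await_durable(&self) {
        let (flag, signal) = &*self.state;
        let mut durable = flag.lock().unwrap();
        while !*durable {
            durable = signal.wait(durable).unwrap();
        }
    }

    /// Non-blocking check (hypothetical helper for the example).
    pub fn is_durable(&self) -> bool {
        *self.state.0.lock().unwrap()
    }
}

/// Handle returned by `produce()`; it carries the watcher.
pub struct WriteHandle {
    pub watcher: DurabilityWatcher,
}

/// Assumed producer shape: an in-memory buffer and a fake object store.
#[derive(Default)]
pub struct Producer {
    buffer: Vec<Vec<u8>>,
    waiting: Vec<DurabilityWatcher>,
    object_store: Vec<Vec<Vec<u8>>>,
}

impl Producer {
    /// Buffer one entry and hand back a handle the caller can wait on.
    pub fn produce(&mut self, entry: Vec<u8>) -> WriteHandle {
        let watcher = DurabilityWatcher::default();
        self.buffer.push(entry);
        self.waiting.push(watcher.clone());
        WriteHandle { watcher }
    }

    /// Write the buffered entries as one batch, then wake all waiters.
    /// A real producer would call this periodically and would also
    /// enqueue the batch location in the manifest before notifying.
    pub fn flush(&mut self) {
        if self.buffer.is_empty() {
            return;
        }
        let batch = std::mem::take(&mut self.buffer);
        self.object_store.push(batch);
        for watcher in self.waiting.drain(..) {
            watcher.mark_durable();
        }
    }

    /// Number of batches written so far (hypothetical helper).
    pub fn flushed_batches(&self) -> usize {
        self.object_store.len()
    }
}
```

In this sketch a caller that needs durability calls handle.watcher.await_durable() after produce(); a caller that does not can simply drop the handle.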
Consumer
The consumer is the read-side counterpart to the producers. It iterates over data batches in object storage and delivers them to the database in the order recorded in the queue.

next_batch() reads the next location in the queue manifest,
fetches the data batch from object storage, and returns the data,
metadata, and a unique sequence number to the caller. Sequence numbers
are assigned by the queue producer at ingestion time in increasing order
without gaps, and they are used both for acknowledgement and for tracking
progress. Acknowledgements must be in order: acknowledging a sequence
number that is not immediately after the last one is rejected, ensuring
that no batch is silently skipped.
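
The in-order acknowledgement rule can be sketched as a small state machine. This is an illustration only, not the consumer's actual code: AckTracker, AckError, and the convention that sequence numbers are u64 values are assumptions made for the example.

```rust
/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum AckError {
    /// The sequence is not the one immediately after the last acknowledged one.
    OutOfOrder { expected: u64, got: u64 },
}

/// Hypothetical tracker that enforces strictly in-order acknowledgements.
pub struct AckTracker {
    last_acked: u64,
}

impl AckTracker {
    /// Start with `last_acked` as the most recently acknowledged sequence.
    pub fn new(last_acked: u64) -> Self {
        Self { last_acked }
    }

    /// Accept `sequence` only if it directly follows the last acknowledged one.
    /// Anything else (a gap or a repeat) is rejected and leaves state unchanged.
    pub fn ack(&mut self, sequence: u64) -> Result<(), AckError> {
        let expected = self.last_acked + 1;
        if sequence != expected {
            return Err(AckError::OutOfOrder { expected, got: sequence });
        }
        self.last_acked = sequence;
        Ok(())
    }

    /// The most recently acknowledged sequence.
    pub fn last_acked(&self) -> u64 {
        self.last_acked
    }
}
```

Because a rejected acknowledgement does not change the tracker, a caller that skipped a batch by mistake finds out at the first out-of-order ack rather than after the batch has disappeared.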
While multiple producers can write to a queue manifest, only one consumer
can read from it. This matches OpenData’s single-writer paradigm and
prevents zombie consumers (instances that survive an infrastructure
failure) from acknowledging batches and making them invisible to the
active consumer. Single-consumer exclusivity is enforced by an epoch stored in
the queue manifest: when a consumer starts, it reads the epoch,
increments it, and writes it back. Reads with a stale epoch are rejected,
so any zombie consumer is fenced as soon as a new one takes over.
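
The epoch mechanism can be modelled in a few lines. The sketch below uses an in-memory struct in place of the real manifest; Manifest, ManifestError, claim(), and read() are hypothetical names, and a real implementation would need the read-increment-write step to be a conditional write against object storage rather than a plain field update.

```rust
/// Hypothetical in-memory stand-in for the queue manifest.
#[derive(Default)]
pub struct Manifest {
    epoch: u64,
    locations: Vec<String>,
}

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum ManifestError {
    /// The caller's epoch is older than the one stored in the manifest.
    Fenced { current: u64, caller: u64 },
}

impl Manifest {
    /// Called when a consumer starts: read the epoch, increment it, and
    /// write it back. The returned value is the epoch this consumer must
    /// present on every later call.
    pub fn claim(&mut self) -> u64 {
        self.epoch += 1;
        self.epoch
    }

    /// Read the queued locations, rejecting callers with a stale epoch.
    pub fn read(&self, caller_epoch: u64) -> Result<&[String], ManifestError> {
        if caller_epoch != self.epoch {
            return Err(ManifestError::Fenced {
                current: self.epoch,
                caller: caller_epoch,
            });
        }
        Ok(&self.locations)
    }
}
```

Once a second consumer calls claim(), every read by the first consumer fails with Fenced, which is the behavior the paragraph above describes.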
The consumer is also responsible for cleaning up processed data batches.
At startup and every 100 acknowledged batches, all entries for
acknowledged batches are dequeued from the manifest. A dedicated garbage
collector task then deletes the corresponding data batches from object
storage.
The consumer API supports exactly-once delivery. To achieve it, the
writer atomically persists the batch data and the sequence number from
next_batch() to the database. If the atomic write succeeds but a failure
occurs before ack() returns, a new writer can construct a new consumer
with that last persisted sequence number so it resumes right after it. The
new consumer also bumps the epoch, fencing the old one and preventing any
duplicate writes.
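
The exactly-once recipe can be illustrated with a toy writer. Everything here is an assumption made for the example: Database, apply(), run_writer(), and the stop_after parameter (which simulates a crash after the atomic write but before the acknowledgement) are hypothetical, and the queue is a plain slice rather than a real consumer.

```rust
/// Hypothetical database that stores rows and the last applied sequence
/// together, so both become visible in one atomic step.
#[derive(Default)]
pub struct Database {
    rows: Vec<Vec<u8>>,
    last_applied: Option<u64>,
}

impl Database {
    /// Stand-in for one atomic transaction: batch data plus its sequence.
    pub fn apply(&mut self, sequence: u64, batch: &[u8]) {
        self.rows.push(batch.to_vec());
        self.last_applied = Some(sequence);
    }

    /// Sequence number persisted by the most recent `apply`, if any.
    pub fn last_applied(&self) -> Option<u64> {
        self.last_applied
    }

    /// Number of rows written so far.
    pub fn row_count(&self) -> usize {
        self.rows.len()
    }
}

/// Run one writer instance over the queue. It resumes right after the
/// sequence already persisted in the database, so a restart never applies
/// a batch twice. `stop_after` simulates a crash after that many writes.
/// Returns how many batches this run applied.
pub fn run_writer(
    db: &mut Database,
    queue: &[(u64, Vec<u8>)],
    stop_after: Option<usize>,
) -> usize {
    let resume_after = db.last_applied();
    let mut applied = 0;
    for (sequence, batch) in queue {
        // Skip everything the database already holds.
        if resume_after.map_or(false, |last| *sequence <= last) {
            continue;
        }
        // Simulated crash: the previous write is persisted but never acked.
        if stop_after == Some(applied) {
            break;
        }
        db.apply(*sequence, batch);
        applied += 1;
    }
    applied
}
```

The key design point is that the resume position comes from the database, not from the queue's own ack state, so the two can never disagree about what has been written.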
Read-ahead and batched acks
The serial next_batch / ack API caps a consumer at one manifest read
and one ack write per batch. Fetching a batch is an I/O wait; decoding it
is CPU work; and writing it to another system is yet more I/O work. Doing
everything on one task in a tight loop wastes both the
object-store bandwidth and the machine’s cores. For high-throughput
consumers, Consumer exposes a parallel-fetch path built from three
primitives: next_descriptors, fetch_handle, and ack_through.
The next_batch / ack API still works without changes:
next_batch is a thin wrapper over next_descriptors(1) plus
fetch_descriptor, and ack keeps its amortize-every-100-calls behavior.
How the split works
next_descriptors(max) reads the manifest once and returns up to max
contiguous descriptors past an internal read-ahead cursor. It does no
object-store fetch and does not mutate the durable ack frontier.

ConsumerFetchHandle is a cheap, cloneable handle obtained from
fetch_handle(). It holds an Arc to the object store and the manifest
path. Its single method, fetch(&self, descriptor) -> Result<ConsumedBatch>,
fetches and decodes one batch, touches no Consumer cursor, and is safe
to call concurrently from many tasks. Cloning the handle is O(1).

ack_through(sequence) advances the durable ack frontier through
sequence in one dequeue write, no matter how far the frontier moves.
The dequeue runs first; in-memory state and the buffer.acks counter
advance only on success, so a fence or storage error leaves the
consumer’s state untouched and the call is safe to retry.
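
The "dequeue first, update state only on success" ordering of ack_through can be sketched as follows. This is a simplified model, not the real Consumer: ManifestStore, AckState, FlakyStore, and the AlreadyAcked and Storage error variants are hypothetical names introduced for the example, and how the real acks counter is incremented is an assumption.

```rust
/// Hypothetical error type; only `Fenced` is named in the text above.
#[derive(Debug, PartialEq)]
pub enum AckError {
    AlreadyAcked,
    Fenced,
    Storage,
}

/// Hypothetical abstraction over the manifest's dequeue write.
pub trait ManifestStore {
    fn dequeue_through(&mut self, sequence: u64) -> Result<(), AckError>;
}

/// In-memory ack state that is only touched after a successful dequeue.
pub struct AckState<S: ManifestStore> {
    store: S,
    last_acked_sequence: u64,
    acks: u64,
}

impl<S: ManifestStore> AckState<S> {
    pub fn new(store: S, last_acked_sequence: u64) -> Self {
        Self { store, last_acked_sequence, acks: 0 }
    }

    /// Advance the frontier through `sequence` with a single dequeue write.
    pub fn ack_through(&mut self, sequence: u64) -> Result<(), AckError> {
        if sequence <= self.last_acked_sequence {
            return Err(AckError::AlreadyAcked);
        }
        // Durable write first. On error, `?` returns before any field below
        // is modified, so the caller can retry with the same sequence.
        self.store.dequeue_through(sequence)?;
        self.acks += 1;
        self.last_acked_sequence = sequence;
        Ok(())
    }

    pub fn last_acked_sequence(&self) -> u64 {
        self.last_acked_sequence
    }

    pub fn acks(&self) -> u64 {
        self.acks
    }

    pub fn store(&self) -> &S {
        &self.store
    }
}

/// Test double: returns the queued error once, then succeeds.
pub struct FlakyStore {
    pub fail_next: Option<AckError>,
    pub dequeued_through: u64,
}

impl ManifestStore for FlakyStore {
    fn dequeue_through(&mut self, sequence: u64) -> Result<(), AckError> {
        if let Some(err) = self.fail_next.take() {
            return Err(err);
        }
        self.dequeued_through = sequence;
        Ok(())
    }
}
```

With a FlakyStore that fails once, the first ack_through(5) returns the storage error and leaves the frontier where it was; the retry succeeds and moves the frontier to 5 in a single write.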
Pipelined consumer pattern
One task owns the Consumer and the manifest; N fetch workers each hold
a clone of the handle and pull descriptors off a bounded channel:
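
A minimal sketch of that shape, using standard-library threads and channels in place of whatever async runtime the real consumer uses. Descriptor, FetchHandle, Fetched, and run_pipeline are hypothetical stand-ins, the object store is an in-memory map, and the descriptors are passed in directly instead of coming from next_descriptors.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, sync_channel};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a batch descriptor.
#[derive(Clone, Debug)]
pub struct Descriptor {
    pub sequence: u64,
    pub location: String,
}

/// Hypothetical stand-in for ConsumerFetchHandle: cloning copies an Arc.
#[derive(Clone)]
pub struct FetchHandle {
    objects: Arc<HashMap<String, Vec<u8>>>,
}

impl FetchHandle {
    pub fn new(objects: HashMap<String, Vec<u8>>) -> Self {
        Self { objects: Arc::new(objects) }
    }

    /// Fetch one batch; touches no shared cursor, so it is safe to call
    /// from many threads at once.
    pub fn fetch(&self, descriptor: &Descriptor) -> Result<Vec<u8>, String> {
        self.objects
            .get(&descriptor.location)
            .cloned()
            .ok_or_else(|| format!("missing object {}", descriptor.location))
    }
}

/// A sequence number paired with the outcome of fetching it.
pub type Fetched = (u64, Result<Vec<u8>, String>);

/// The calling thread plays the owner; `workers` threads fetch in parallel
/// from a channel bounded at `capacity` descriptors.
pub fn run_pipeline(
    descriptors: Vec<Descriptor>,
    handle: FetchHandle,
    workers: usize,
    capacity: usize,
) -> Vec<Fetched> {
    assert!(workers > 0, "at least one fetch worker is required");
    let (desc_tx, desc_rx) = sync_channel::<Descriptor>(capacity);
    let desc_rx = Arc::new(Mutex::new(desc_rx));
    let (done_tx, done_rx) = channel::<Fetched>();

    let mut joins = Vec::with_capacity(workers);
    for _ in 0..workers {
        let (rx, tx, handle) = (Arc::clone(&desc_rx), done_tx.clone(), handle.clone());
        joins.push(thread::spawn(move || loop {
            // The lock is held only for the duration of this statement.
            let next = rx.lock().unwrap().recv();
            // A closed descriptor channel is the shutdown signal.
            let Ok(descriptor) = next else { break };
            let result = handle.fetch(&descriptor);
            if tx.send((descriptor.sequence, result)).is_err() {
                break;
            }
        }));
    }
    drop(done_tx); // only the workers hold result senders now

    // Owner side: hand out descriptors; send blocks when the channel is full.
    for descriptor in descriptors {
        desc_tx.send(descriptor).expect("workers exited early");
    }
    drop(desc_tx); // closing the channel lets the workers exit

    let mut fetched: Vec<Fetched> = done_rx.iter().collect();
    for join in joins {
        join.join().expect("worker panicked");
    }
    // Fetches may finish out of order; restore sequence order.
    fetched.sort_by_key(|(sequence, _)| *sequence);
    fetched
}
```

In a real consumer the owner would interleave handing out descriptors with reading completions, and would call ack_through with the highest contiguous completed sequence rather than waiting for everything to finish. The bounded channel is what provides back-pressure when the workers fall behind.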
Descriptor handout contract
next_descriptors advances an internal last_handed_out_sequence
before the caller has fetched the objects; that is the point of read-ahead.
It places one obligation on the caller:
ack_through(n) does not verify that anything below n was processed.
It unconditionally advances the durable frontier to n, rejecting only
n <= last_acked_sequence. The caller must track which sequences have
actually been processed and only call ack_through with the highest
fully-processed contiguous sequence. Fetches may complete out of order;
ack the contiguous-complete watermark, never past a still-pending
sequence.
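
One way to track the contiguous-complete watermark is an ordered set of finished sequences above the current watermark. The sketch below is an assumption about how a caller might do this, not part of the Consumer API; Watermark, complete(), and contiguous() are hypothetical names.

```rust
use std::collections::BTreeSet;

/// Tracks the highest sequence such that it and every sequence below it
/// has been fully processed.
pub struct Watermark {
    contiguous: u64,
    /// Processed sequences above `contiguous`, waiting for a gap to fill.
    pending: BTreeSet<u64>,
}

impl Watermark {
    /// Start from the durable frontier (the last acknowledged sequence).
    pub fn new(last_acked_sequence: u64) -> Self {
        Self { contiguous: last_acked_sequence, pending: BTreeSet::new() }
    }

    /// Record that `sequence` has been fully processed. Returns the new
    /// watermark if it advanced, which is the value that is safe to pass
    /// to ack_through; returns None if a lower sequence is still pending.
    pub fn complete(&mut self, sequence: u64) -> Option<u64> {
        if sequence <= self.contiguous {
            return None; // duplicate completion, nothing to do
        }
        self.pending.insert(sequence);
        let before = self.contiguous;
        // Consume the run of completed sequences directly above the watermark.
        while self.pending.remove(&(self.contiguous + 1)) {
            self.contiguous += 1;
        }
        (self.contiguous > before).then_some(self.contiguous)
    }

    /// Current contiguous-complete watermark.
    pub fn contiguous(&self) -> u64 {
        self.contiguous
    }
}
```

For example, starting from a frontier of 10, completing 12 and 13 returns None both times because 11 is still pending; completing 11 then returns Some(13), and only at that point would the caller ack through 13.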
If a descriptor is permanently lost (a worker panics, a channel send is
dropped), the frontier stalls at that sequence. Recovery is process
restart: Consumer::new(config, last_acked_sequence) re-emits everything
the durable frontier has not acked.
Fencing
next_descriptors returns Error::Fenced on the next manifest read
after fencing. A worker holding a stale handle keeps fetching
successfully against object storage; the manifest owner learns of the
fence and propagates it, typically by closing the descriptor channel.
ack_through against a fenced manifest returns Error::Fenced and
leaves state untouched, so a fenced consumer cannot advance the durable
frontier.
When to use which API
The serial next_batch / ack API stays the right choice for
low-throughput single-task consumers (the timeseries and vector buffer
consumers use it) and for backfill tooling where a tight per-batch ack
loop is simpler than the pipeline.
Reach for the read-ahead path when consumer throughput is bounded by
manifest round-trips (one next_batch cycle per batch caps throughput
at 1 / manifest_RTT), when fetch I/O and decode CPU need to overlap
across batches, or when the workload is bursty enough that an in-flight
bytes ceiling fits better than per-batch flow control.
The opendata-contrib generic ingest runtime is one pipelined consumer
built on these primitives. Full design rationale, the descriptor handout
contract in detail, and the failure-mode catalog live in RFC 0003:
Consumer Read-Ahead and ack_through.