

Buffer is a Rust library. Configuration is expressed as plain Rust structs that the host service constructs and hands to the Producer and Consumer APIs. Two top-level structs govern the write and read sides: ProducerConfig and ConsumerConfig. Both share the same ObjectStoreConfig from the common crate. The structs derive Serialize/Deserialize so host services may embed them in their own config formats, but the canonical surface documented here is the Rust API.

ProducerConfig

Controls where data batches and the queue manifest are written, how often batches are flushed, and when backpressure is applied.
pub struct ProducerConfig {
    /// Object store provider. Determines where data batches and the
    /// queue manifest are persisted.
    pub object_store: ObjectStoreConfig,

    /// Path prefix for data batch objects in object storage.
    /// Default: "ingest"
    pub data_path_prefix: String,

    /// Path to the queue manifest in object storage.
    /// Default: "ingest/manifest"
    pub manifest_path: String,

    /// Flush the in-memory batch when this much time has elapsed since
    /// the last flush.
    /// Default: 100 ms
    pub flush_interval: Duration,

    /// Flush the in-memory batch when its total size in bytes (entries
    /// plus metadata) exceeds this threshold.
    /// Default: 64 MiB (67_108_864)
    pub flush_size_bytes: usize,

    /// Maximum number of input entry vectors that may be buffered for
    /// the background batch writer before backpressure is applied to
    /// callers.
    /// Default: 1000
    pub max_buffered_inputs: usize,

    /// Compression algorithm applied to the record block of each data
    /// batch.
    /// Default: CompressionType::None
    pub batch_compression: CompressionType,
}
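Read literally, either documented condition is enough to trigger a flush: the interval elapsing or the batch growing past the size threshold, whichever happens first. The following sketch illustrates that rule using hypothetical `FlushPolicy` and `PendingBatch` types that mirror the `flush_interval` and `flush_size_bytes` fields. It is an assumption about the documented semantics, not the library's batch writer.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the two flush-related fields of `ProducerConfig`.
struct FlushPolicy {
    flush_interval: Duration,
    flush_size_bytes: usize,
}

/// Hypothetical in-memory batch: accumulated size (entries plus metadata)
/// and the time of the previous flush.
struct PendingBatch {
    size_bytes: usize,
    last_flush: Instant,
}

impl FlushPolicy {
    /// True when either trigger has fired: `flush_interval` has elapsed since
    /// the last flush, or the batch size exceeds `flush_size_bytes`.
    fn should_flush(&self, batch: &PendingBatch, now: Instant) -> bool {
        let elapsed = now.saturating_duration_since(batch.last_flush);
        elapsed >= self.flush_interval || batch.size_bytes > self.flush_size_bytes
    }
}

fn main() {
    // Documented defaults: 100 ms and 64 MiB.
    let policy = FlushPolicy {
        flush_interval: Duration::from_millis(100),
        flush_size_bytes: 64 * 1024 * 1024,
    };
    let start = Instant::now();

    // Small batch, 10 ms after the last flush: neither trigger has fired.
    let batch = PendingBatch { size_bytes: 1024, last_flush: start };
    assert!(!policy.should_flush(&batch, start + Duration::from_millis(10)));

    // Same batch once 100 ms have passed: the time trigger fires.
    assert!(policy.should_flush(&batch, start + Duration::from_millis(100)));

    // Batch just over 64 MiB: the size trigger fires without waiting.
    let big = PendingBatch { size_bytes: 64 * 1024 * 1024 + 1, last_flush: start };
    assert!(policy.should_flush(&big, start));
}
```

Lowering `flush_interval` trades more, smaller objects for lower latency; raising `flush_size_bytes` only matters when traffic is heavy enough to fill a batch before the interval expires.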

ConsumerConfig

Controls where the queue manifest and data batches are read from, and how stale batches are garbage collected.
pub struct ConsumerConfig {
    /// Object store provider. Must point at the same bucket or directory
    /// as the paired producer.
    pub object_store: ObjectStoreConfig,

    /// Path to the queue manifest in object storage. Must match the
    /// producer's `manifest_path`.
    /// Default: "ingest/manifest"
    pub manifest_path: String,

    /// Path prefix for data batch objects. Must match the producer's
    /// `data_path_prefix`.
    /// Default: "ingest"
    pub data_path_prefix: String,

    /// How often the garbage collector runs.
    /// Default: 5 minutes
    pub gc_interval: Duration,

    /// Minimum age of an unreferenced batch file before it is eligible
    /// for deletion by the garbage collector.
    /// Default: 10 minutes
    pub gc_grace_period: Duration,
}
The grace period prevents the consumer from deleting batches that a producer has just written but not yet enqueued in the manifest.
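The garbage-collection rule can be sketched as a filter over a listing of batch objects. A batch is a deletion candidate only if the manifest does not reference it and it is at least `gc_grace_period` old. `BatchObject`, `gc_candidates`, and the `ingest/...` object names are hypothetical. This page does not document how the real consumer lists objects or reads references from the manifest.

```rust
use std::collections::HashSet;
use std::time::{Duration, SystemTime};

/// Hypothetical listing entry for a data batch object.
struct BatchObject {
    path: String,
    last_modified: SystemTime,
}

/// Hypothetical helper: returns the paths of batches that are unreferenced
/// by the manifest and at least `grace_period` old.
fn gc_candidates(
    listed: &[BatchObject],
    referenced: &HashSet<String>,
    grace_period: Duration,
    now: SystemTime,
) -> Vec<String> {
    listed
        .iter()
        .filter(|obj| !referenced.contains(&obj.path))
        .filter(|obj| {
            // A timestamp in the future (clock skew) counts as age zero, so it is kept.
            let age = now.duration_since(obj.last_modified).unwrap_or(Duration::ZERO);
            age >= grace_period
        })
        .map(|obj| obj.path.clone())
        .collect()
}

fn main() {
    let now = SystemTime::now();
    let grace = Duration::from_secs(10 * 60); // documented default

    let listed = vec![
        // Still referenced by the manifest: never a candidate.
        BatchObject { path: "ingest/a".to_string(), last_modified: now - Duration::from_secs(3600) },
        // Unreferenced but one minute old: may not be enqueued yet, so it is kept.
        BatchObject { path: "ingest/b".to_string(), last_modified: now - Duration::from_secs(60) },
        // Unreferenced and twenty minutes old: eligible for deletion.
        BatchObject { path: "ingest/c".to_string(), last_modified: now - Duration::from_secs(20 * 60) },
    ];
    let mut referenced = HashSet::new();
    referenced.insert("ingest/a".to_string());

    assert_eq!(gc_candidates(&listed, &referenced, grace, now), vec!["ingest/c".to_string()]);
}
```

The `ingest/b` case is the one the grace period exists for: a producer may have written the object but not yet recorded it in the manifest.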

ObjectStoreConfig

A tagged enum re-exported from the common crate. Three variants are available. For example, the local filesystem variant is constructed like this:
ObjectStoreConfig::Local(LocalObjectStoreConfig {
    path: "./data".to_string(),
})

CompressionType

pub enum CompressionType {
    /// No compression (default).
    None,
    /// Zstandard compression of the record block.
    Zstd,
}

Pairing producers and consumers

A producer and the consumer that drains it must agree on three values:
  • object_store must point at the same bucket or directory.
  • manifest_path must be identical, since it identifies the queue.
  • data_path_prefix must be identical, since the manifest stores batch locations relative to it.
A mismatch on any of these silently routes traffic to a different queue or causes the consumer to fail to fetch batches it sees in the manifest.
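Because a mismatch fails silently, a host service may want to check the three shared values itself before starting a producer and consumer. The sketch below is self-contained. It uses a hypothetical `QueueLocation` type in place of the real config structs, with `object_store` simplified to a string. This page does not say whether `ObjectStoreConfig` implements `PartialEq`.

```rust
/// Hypothetical mirror of the three values a producer and consumer must share.
/// `object_store` is simplified to a string naming the bucket or directory.
#[derive(Debug, Clone)]
struct QueueLocation {
    object_store: String,
    manifest_path: String,
    data_path_prefix: String,
}

/// Hypothetical helper: returns the names of the fields that differ.
fn pairing_mismatches(producer: &QueueLocation, consumer: &QueueLocation) -> Vec<&'static str> {
    let mut mismatches = Vec::new();
    if producer.object_store != consumer.object_store {
        mismatches.push("object_store");
    }
    if producer.manifest_path != consumer.manifest_path {
        mismatches.push("manifest_path");
    }
    if producer.data_path_prefix != consumer.data_path_prefix {
        mismatches.push("data_path_prefix");
    }
    mismatches
}

fn main() {
    let producer = QueueLocation {
        object_store: "local:./data".to_string(),
        manifest_path: "ingest/manifest".to_string(),
        data_path_prefix: "ingest".to_string(),
    };

    // Identical settings: the pair is consistent.
    let matching = producer.clone();
    assert!(pairing_mismatches(&producer, &matching).is_empty());

    // A typo in the consumer's manifest path would point it at a different queue.
    let typo = QueueLocation { manifest_path: "ingest/manfest".to_string(), ..producer.clone() };
    assert_eq!(pairing_mismatches(&producer, &typo), vec!["manifest_path"]);
}
```

Since `manifest_path` and `data_path_prefix` are public `String` fields on both real structs, the same comparisons can be written directly against `ProducerConfig` and `ConsumerConfig`.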

Examples

use std::time::Duration;
use common::{ObjectStoreConfig, LocalObjectStoreConfig};
use buffer::{ProducerConfig, ConsumerConfig, CompressionType};

fn main() {
    // Producer and consumer share one local directory as their object store.
    let object_store = ObjectStoreConfig::Local(LocalObjectStoreConfig {
        path: "./data".to_string(),
    });

    // Producer side: every field set to its documented default.
    let producer_config = ProducerConfig {
        object_store: object_store.clone(),
        data_path_prefix: "ingest".to_string(),
        manifest_path: "ingest/manifest".to_string(),
        flush_interval: Duration::from_millis(100),
        flush_size_bytes: 64 * 1024 * 1024, // 64 MiB
        max_buffered_inputs: 1000,
        batch_compression: CompressionType::None,
    };

    // Consumer side: manifest_path and data_path_prefix match the producer's.
    let consumer_config = ConsumerConfig {
        object_store,
        manifest_path: "ingest/manifest".to_string(),
        data_path_prefix: "ingest".to_string(),
        gc_interval: Duration::from_secs(5 * 60),      // 5 minutes
        gc_grace_period: Duration::from_secs(10 * 60), // 10 minutes
    };

    // producer_config and consumer_config are now ready to hand to the
    // Producer and Consumer APIs.
}