Buffer is a Rust library. Configuration is expressed as plain Rust structs
that the host service constructs and hands to the Producer and Consumer
APIs. Two top-level structs govern the write and read sides:
ProducerConfig and ConsumerConfig. Both take the same ObjectStoreConfig
type from the common crate.
The structs derive Serialize/Deserialize so host services may embed
them in their own config formats, but the canonical surface documented
here is the Rust API.
ProducerConfig
Controls where data batches and the queue manifest are written, how often
batches are flushed, and when backpressure is applied.
pub struct ProducerConfig {
/// Object store provider. Determines where data batches and the
/// queue manifest are persisted.
pub object_store: ObjectStoreConfig,
/// Path prefix for data batch objects in object storage.
/// Default: "ingest"
pub data_path_prefix: String,
/// Path to the queue manifest in object storage.
/// Default: "ingest/manifest"
pub manifest_path: String,
/// Flush the in-memory batch when this much time has elapsed since
/// the last flush.
/// Default: 100 ms
pub flush_interval: Duration,
/// Flush the in-memory batch when its total size in bytes (entries
/// plus metadata) exceeds this threshold.
/// Default: 64 MiB (67_108_864)
pub flush_size_bytes: usize,
/// Maximum number of input entry vectors that may be buffered for
/// the background batch writer before backpressure is applied to
/// callers.
/// Default: 1000
pub max_buffered_inputs: usize,
/// Compression algorithm applied to the record block of each data
/// batch.
/// Default: CompressionType::None
pub batch_compression: CompressionType,
}
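The following sketch shows how the two flush triggers and the buffering bound interact. It is a minimal std-only model, not the buffer crate's implementation. FlushPolicy, Entries, should_flush, input_channel and try_enqueue are hypothetical names, and a bounded std::sync::mpsc::sync_channel stands in for whatever queue the background batch writer actually uses. The size check uses a strict greater-than because the threshold is documented as "exceeds". Treating the interval as "elapsed or longer" is an assumption.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::time::Duration;

/// Hypothetical stand-in for three ProducerConfig fields; not the buffer crate's type.
pub struct FlushPolicy {
    pub flush_interval: Duration,
    pub flush_size_bytes: usize,
    pub max_buffered_inputs: usize,
}

/// One input entry vector, with each entry modeled as raw bytes (an assumption).
pub type Entries = Vec<Vec<u8>>;

impl FlushPolicy {
    /// True when either trigger fires: the interval has elapsed since the
    /// last flush, or the in-memory batch size exceeds the byte threshold.
    pub fn should_flush(&self, since_last_flush: Duration, batch_bytes: usize) -> bool {
        since_last_flush >= self.flush_interval || batch_bytes > self.flush_size_bytes
    }

    /// Bounded queue between callers and the background writer: once
    /// max_buffered_inputs vectors are waiting, further sends cannot proceed.
    pub fn input_channel(&self) -> (SyncSender<Entries>, Receiver<Entries>) {
        sync_channel(self.max_buffered_inputs)
    }
}

/// Non-blocking enqueue. Returns false when the queue is full (the point at
/// which a blocking send would apply backpressure) or the writer has gone away.
pub fn try_enqueue(tx: &SyncSender<Entries>, entries: Entries) -> bool {
    tx.try_send(entries).is_ok()
}
```

A real producer would more likely block the caller (SyncSender::send) than reject the input. The non-blocking variant is used here only because it makes the moment backpressure kicks in easy to observe.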
ConsumerConfig
Controls where the queue manifest and data batches are read from, and
how stale batches are garbage collected.
pub struct ConsumerConfig {
/// Object store provider. Must point at the same bucket or directory
/// as the paired producer.
pub object_store: ObjectStoreConfig,
/// Path to the queue manifest in object storage. Must match the
/// producer's `manifest_path`.
/// Default: "ingest/manifest"
pub manifest_path: String,
/// Path prefix for data batch objects. Must match the producer's
/// `data_path_prefix`.
/// Default: "ingest"
pub data_path_prefix: String,
/// How often the garbage collector runs.
/// Default: 5 minutes
pub gc_interval: Duration,
/// Minimum age of an unreferenced batch file before it is eligible
/// for deletion by the garbage collector.
/// Default: 10 minutes
pub gc_grace_period: Duration,
}
The grace period prevents the consumer from deleting batches that a
producer has just written but not yet enqueued in the manifest.
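The eligibility rule above can be sketched as a filter over listed batch objects. This is a minimal model under stated assumptions. BatchObject and gc_candidates are hypothetical. The manifest's references are modeled as a set of object paths. Age is measured from each object's last-modified time, which this page does not specify. Objects whose timestamp lies in the future, for example because of clock skew, are kept rather than deleted.

```rust
use std::collections::HashSet;
use std::time::{Duration, SystemTime};

/// Hypothetical view of a batch object as listed from object storage.
pub struct BatchObject {
    pub path: String,
    pub last_modified: SystemTime,
}

/// Returns the paths one GC pass would delete: objects the manifest does not
/// reference whose age is at least gc_grace_period.
pub fn gc_candidates<'a>(
    objects: &'a [BatchObject],
    referenced: &HashSet<String>,
    now: SystemTime,
    gc_grace_period: Duration,
) -> Vec<&'a str> {
    objects
        .iter()
        // Still referenced by the manifest: never eligible.
        .filter(|o| !referenced.contains(&o.path))
        .filter(|o| {
            // duration_since errs if last_modified is later than now;
            // treat such objects as brand new and keep them.
            now.duration_since(o.last_modified)
                .map(|age| age >= gc_grace_period)
                .unwrap_or(false)
        })
        .map(|o| o.path.as_str())
        .collect()
}
```

With the default 10-minute grace period, an orphaned batch written one minute ago survives the pass. This covers the window in which a producer has uploaded a batch but not yet added it to the manifest.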
ObjectStoreConfig
A tagged enum re-exported from the common crate. Three variants are
available. The snippets below construct them in this order: local
filesystem (Local), AWS S3 (Aws), and in-memory (InMemory).
ObjectStoreConfig::Local(LocalObjectStoreConfig {
path: "./data".to_string(),
})
ObjectStoreConfig::Aws(AwsObjectStoreConfig {
region: "us-west-2".to_string(),
bucket: "my-ingest-bucket".to_string(),
})
ObjectStoreConfig::InMemory
CompressionType
Selects the algorithm used for ProducerConfig's batch_compression field.
pub enum CompressionType {
/// No compression (default).
None,
/// Zstandard compression of the record block.
Zstd,
}
Pairing producers and consumers
A producer and the consumer that drains it must agree on three values:
object_store must point at the same bucket or directory.
manifest_path must be identical, since it identifies the queue.
data_path_prefix must be identical, since the manifest stores
batch locations relative to it.
A mismatch on any of these silently routes traffic to a different queue
or causes the consumer to fail to fetch batches it sees in the manifest.
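A host service could check these three values before starting a producer and consumer. The sketch below is hypothetical. QueueLocation, pairing_mismatches and resolve_batch_key are illustrative names. The store field is a plain string standing in for ObjectStoreConfig. Joining the prefix and the manifest's relative location with "/" is an assumed key layout, not one documented here.

```rust
/// Hypothetical subset of the settings a producer and consumer must share.
/// `store` stands in for ObjectStoreConfig (e.g. a bucket name or directory).
#[derive(Debug, Clone, PartialEq)]
pub struct QueueLocation {
    pub store: String,
    pub manifest_path: String,
    pub data_path_prefix: String,
}

/// Lists every field on which the producer and consumer disagree.
pub fn pairing_mismatches(producer: &QueueLocation, consumer: &QueueLocation) -> Vec<&'static str> {
    let mut out = Vec::new();
    if producer.store != consumer.store {
        out.push("object_store");
    }
    if producer.manifest_path != consumer.manifest_path {
        out.push("manifest_path");
    }
    if producer.data_path_prefix != consumer.data_path_prefix {
        out.push("data_path_prefix");
    }
    out
}

/// Resolves a manifest entry, stored relative to the prefix, to a full
/// object key. The "/" separator is an assumption about the key layout.
pub fn resolve_batch_key(data_path_prefix: &str, relative: &str) -> String {
    format!("{}/{}", data_path_prefix.trim_end_matches('/'), relative)
}
```

With a mismatched data_path_prefix, resolve_batch_key maps the same manifest entry to a different object key than the one the producer wrote. As a result, the consumer fails to fetch batches it can see in the manifest.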
Examples
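The three examples below each configure a matching producer and consumer:
the first for local development, the second for production on AWS S3,
and the third for in-memory testing.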
use std::time::Duration;
use common::{ObjectStoreConfig, LocalObjectStoreConfig};
use buffer::{ProducerConfig, ConsumerConfig, CompressionType};
let object_store = ObjectStoreConfig::Local(LocalObjectStoreConfig {
path: "./data".to_string(),
});
let producer_config = ProducerConfig {
object_store: object_store.clone(),
data_path_prefix: "ingest".to_string(),
manifest_path: "ingest/manifest".to_string(),
flush_interval: Duration::from_millis(100),
flush_size_bytes: 64 * 1024 * 1024,
max_buffered_inputs: 1000,
batch_compression: CompressionType::None,
};
let consumer_config = ConsumerConfig {
object_store,
manifest_path: "ingest/manifest".to_string(),
data_path_prefix: "ingest".to_string(),
gc_interval: Duration::from_secs(5 * 60),
gc_grace_period: Duration::from_secs(10 * 60),
};
use std::time::Duration;
use common::{ObjectStoreConfig, AwsObjectStoreConfig};
use buffer::{ProducerConfig, ConsumerConfig, CompressionType};
let object_store = ObjectStoreConfig::Aws(AwsObjectStoreConfig {
region: "us-west-2".to_string(),
bucket: "my-ingest-bucket".to_string(),
});
let producer_config = ProducerConfig {
object_store: object_store.clone(),
data_path_prefix: "ingest".to_string(),
manifest_path: "ingest/manifest".to_string(),
flush_interval: Duration::from_millis(200),
flush_size_bytes: 128 * 1024 * 1024,
max_buffered_inputs: 1000,
batch_compression: CompressionType::Zstd,
};
let consumer_config = ConsumerConfig {
object_store,
manifest_path: "ingest/manifest".to_string(),
data_path_prefix: "ingest".to_string(),
gc_interval: Duration::from_secs(5 * 60),
gc_grace_period: Duration::from_secs(10 * 60),
};
use std::time::Duration;
use common::ObjectStoreConfig;
use buffer::{ProducerConfig, ConsumerConfig, CompressionType};
let producer_config = ProducerConfig {
object_store: ObjectStoreConfig::InMemory,
data_path_prefix: "ingest".to_string(),
manifest_path: "ingest/manifest".to_string(),
flush_interval: Duration::from_millis(100),
flush_size_bytes: 64 * 1024 * 1024,
max_buffered_inputs: 1000,
batch_compression: CompressionType::None,
};
let consumer_config = ConsumerConfig {
object_store: ObjectStoreConfig::InMemory,
manifest_path: "ingest/manifest".to_string(),
data_path_prefix: "ingest".to_string(),
gc_interval: Duration::from_secs(5 * 60),
gc_grace_period: Duration::from_secs(10 * 60),
};
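Each example above spells out every field, including those left at their documented defaults. If ProducerConfig implemented Default with those values (this page does not say it does), the production example could override only what differs by using struct update syntax. The sketch below uses ProducerTuning, a hypothetical std-only mirror with object_store omitted, and a stand-in CompressionType enum. Neither is the buffer crate's type.

```rust
use std::time::Duration;

/// Stand-in for the documented CompressionType, with None as the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompressionType {
    #[default]
    None,
    Zstd,
}

/// Hypothetical mirror of ProducerConfig without object_store.
#[derive(Debug, Clone, PartialEq)]
pub struct ProducerTuning {
    pub data_path_prefix: String,
    pub manifest_path: String,
    pub flush_interval: Duration,
    pub flush_size_bytes: usize,
    pub max_buffered_inputs: usize,
    pub batch_compression: CompressionType,
}

impl Default for ProducerTuning {
    /// Values taken from the defaults listed in the ProducerConfig docs.
    fn default() -> Self {
        Self {
            data_path_prefix: "ingest".to_string(),
            manifest_path: "ingest/manifest".to_string(),
            flush_interval: Duration::from_millis(100),
            flush_size_bytes: 64 * 1024 * 1024, // 67_108_864
            max_buffered_inputs: 1000,
            batch_compression: CompressionType::default(),
        }
    }
}

/// The production (S3) example expressed as overrides on the defaults.
pub fn production_tuning() -> ProducerTuning {
    ProducerTuning {
        flush_interval: Duration::from_millis(200),
        flush_size_bytes: 128 * 1024 * 1024, // 134_217_728
        batch_compression: CompressionType::Zstd,
        ..Default::default()
    }
}
```

Listing only the overridden fields makes it obvious that the production profile differs from local development in just three settings: a longer flush interval, a larger flush threshold, and Zstd compression.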