// bge_m3_embedding_server/state.rs
// Copyright (c) 2026 J. Patrick Fulton
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Shared application state threaded through Axum handlers via [`Arc<AppState>`].
//!
//! [`AppState`] holds the worker pool, readiness flag, concurrency semaphore,
//! and the live cost-model handle. [`TuningInfo`] captures the static
//! memory-detection snapshot written once before the background probe starts.

use crate::binpack::CostModel;
use crate::embedder::EmbedPool;
use crate::sysinfo::{MemoryReading, MemorySource};
use arc_swap::ArcSwap;
use std::sync::atomic::{AtomicBool, AtomicU8};
use std::sync::{Arc, OnceLock};
use tokio::sync::Semaphore;

/// Lifecycle state of the background memory probe.
///
/// Persisted in `AppState.probe_status` as an `AtomicU8`, which lets the
/// background probe task publish updates without taking any lock.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProbeStatus {
    /// Probe never ran — a cost-model override (`BGE_M3_DISABLE_AUTO_BUDGET`,
    /// `BGE_M3_TOKEN_BUDGET`, or explicit A/B env vars) was in effect.
    Disabled = 0,
    /// Probe is running in the background; workers are using conservative defaults.
    Running = 1,
    /// Probe finished successfully; fitted `(a, b)` are now active.
    Complete = 2,
    /// Probe failed or produced invalid coefficients; conservative defaults remain.
    Failed = 3,
    /// Probe was skipped — valid coefficients loaded from the EFS cache file.
    CacheHit = 4,
}

impl ProbeStatus {
    /// String form used when serializing this status into the `/health` JSON.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            Self::CacheHit => "cache_hit",
            Self::Complete => "complete",
            Self::Disabled => "disabled",
            Self::Failed => "failed",
            Self::Running => "running",
        }
    }

    /// Reconstructs a `ProbeStatus` from a raw `u8` read via `AtomicU8::load`.
    ///
    /// Any value outside the known discriminants collapses to [`Self::Failed`].
    #[must_use]
    pub fn from_u8(raw: u8) -> Self {
        match raw {
            0 => Self::Disabled,
            1 => Self::Running,
            2 => Self::Complete,
            4 => Self::CacheHit,
            _ => Self::Failed,
        }
    }
}

75/// Shared application state injected into every request handler via [`axum::extract::State`].
76pub struct AppState {
77    /// The embedding worker pool. Handles dense and sparse embedding requests.
78    pub pool: EmbedPool,
79    /// Atomic flag set to `true` once model warm-up and readiness probes complete.
80    ///
81    /// Handlers check this before dispatching to the pool to return `503`
82    /// while models are still loading.
83    pub ready: AtomicBool,
84    /// Maximum batch size enforced by the handler layer.
85    pub max_batch: usize,
86    /// Total number of workers configured at startup.
87    ///
88    /// Used by the `/health` endpoint to report degraded state when
89    /// `live_workers < total_workers`.
90    pub total_workers: usize,
91    /// Maximum tokenized sequence length in use.
92    pub max_seq_length: usize,
93    /// Static memory-detection info written once before the probe starts.
94    ///
95    /// Written to `OnceLock` as soon as memory detection completes (before the
96    /// background probe finishes), so `/health` can show `memory_source`,
97    /// `available_bytes`, and `model_rss_bytes_per_worker` even while the probe
98    /// is still running.
99    pub tuning: OnceLock<TuningInfo>,
100    /// Live cost-model coefficients.
101    ///
102    /// Initialized to conservative defaults at startup. Updated atomically by
103    /// the background probe (or cache-hit path) once fitted coefficients are
104    /// available. All workers share this same handle and observe the update
105    /// lock-free on their next `session.run()` call.
106    pub cost_model: Arc<ArcSwap<CostModel>>,
107    /// Current state of the background memory probe.
108    ///
109    /// Updated atomically from the background probe task. Read by `/health`
110    /// to expose `probe_status` in the `tuning` block.
111    pub probe_status: AtomicU8,
112    /// Concurrency gate for in-flight embedding requests.
113    ///
114    /// Initialized to `max(cfg_workers - 1, 1)` permits, reserving one worker
115    /// slot for the background auto-budget probe.  Raised to `cfg_workers`
116    /// atomically on every terminal probe-status transition (`Disabled`,
117    /// `CacheHit`, `Complete`, `Failed`) so full concurrency is available once
118    /// the probe no longer needs a reserved worker.
119    ///
120    /// Test helpers set this to `usize::MAX` (effectively uncapped) so that
121    /// existing tests do not need to acquire a permit.
122    pub request_permits: Arc<Semaphore>,
123}
124
125/// Static workspace memory info surfaced by the `/health` endpoint.
126///
127/// Written once to [`AppState::tuning`] immediately after memory detection.
128/// The cost-model fields (`a`, `b`, `max_workspace_bytes`) are served
129/// dynamically from [`AppState::cost_model`] so they reflect the live
130/// probe result rather than the initial conservative defaults.
131#[derive(Debug, Clone, serde::Serialize)]
132pub struct TuningInfo {
133    /// Where the available-memory reading came from.
134    pub memory_source: String,
135    /// Total available bytes detected at startup.
136    pub available_bytes: usize,
137    /// Measured model session RSS delta (bytes) — max across all workers.
138    ///
139    /// Accurate on Linux via `/proc/self/status`; `0` on other platforms.
140    pub model_rss_bytes_per_worker: usize,
141    /// Worst-case total peak memory (bytes) when all workers run simultaneously
142    /// at their per-worker workspace ceiling.
143    ///
144    /// Formula: `cfg_workers × per_worker_workspace + cfg_workers × model_rss + OS_HEADROOM`.
145    pub worst_case_peak_bytes: usize,
146    /// Worst-case peak as a percentage of detected available memory.
147    ///
148    /// A value above 90% triggers a startup `WARN` log.
149    pub utilization_pct: f64,
150}
151
152impl TuningInfo {
153    /// Creates a [`TuningInfo`] from a memory reading and probe measurements.
154    #[must_use]
155    #[allow(clippy::too_many_arguments)]
156    pub fn new(
157        mem: &MemoryReading,
158        model_rss_per_worker: usize,
159        worst_case_peak_bytes: usize,
160        utilization_pct: f64,
161    ) -> Self {
162        Self {
163            memory_source: mem.source.to_string(),
164            available_bytes: mem.available_bytes,
165            model_rss_bytes_per_worker: model_rss_per_worker,
166            worst_case_peak_bytes,
167            utilization_pct,
168        }
169    }
170
171    /// Convenience builder for the case where memory detection was not possible
172    /// (macOS without cgroup support, or probe disabled).
173    #[must_use]
174    #[allow(dead_code)]
175    pub fn unknown(model_rss_per_worker: usize) -> Self {
176        Self {
177            memory_source: MemorySource::HostRam.to_string(),
178            available_bytes: 0,
179            model_rss_bytes_per_worker: model_rss_per_worker,
180            worst_case_peak_bytes: 0,
181            utilization_pct: 0.0,
182        }
183    }
184}