// bge_m3_embedding_server/state.rs
// Copyright (c) 2026 J. Patrick Fulton
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Shared application state threaded through Axum handlers via [`Arc<AppState>`].
//!
//! [`AppState`] holds the worker pool, readiness flag, concurrency semaphore,
//! and the live cost-model handle. [`TuningInfo`] captures the static
//! memory-detection snapshot written once before the background probe starts.

use arc_swap::ArcSwap;
use crate::binpack::CostModel;
use crate::embedder::EmbedPool;
use crate::sysinfo::{MemoryReading, MemorySource};
use std::sync::atomic::{AtomicBool, AtomicU8};
use std::sync::{Arc, OnceLock};
use tokio::sync::Semaphore;

/// Status of the background memory probe.
///
/// Stored in `AppState.probe_status` as an `AtomicU8` so it can be updated
/// from the background probe task without holding a lock.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProbeStatus {
    /// Probe not run — a cost-model override (`BGE_M3_DISABLE_AUTO_BUDGET`,
    /// `BGE_M3_TOKEN_BUDGET`, or explicit A/B env vars) was in effect.
    Disabled = 0,
    /// Probe is running in the background; workers are using conservative defaults.
    Running = 1,
    /// Probe completed successfully; fitted `(a, b)` are now active.
    Complete = 2,
    /// Probe failed or produced invalid coefficients; conservative defaults remain.
    Failed = 3,
    /// Probe was skipped — valid coefficients loaded from the EFS cache file.
    CacheHit = 4,
}

impl ProbeStatus {
    /// Returns the JSON-serializable string representation used in `/health`.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Disabled => "disabled",
            Self::Running => "running",
            Self::Complete => "complete",
            Self::Failed => "failed",
            Self::CacheHit => "cache_hit",
        }
    }

    /// Converts a raw `u8` (from `AtomicU8::load`) back to `ProbeStatus`.
    ///
    /// Any unrecognized discriminant — including a raw `3` — maps to
    /// `Failed`, so a corrupted or future value degrades to the conservative
    /// state instead of panicking.
    #[must_use]
    pub fn from_u8(v: u8) -> Self {
        match v {
            0 => Self::Disabled,
            1 => Self::Running,
            2 => Self::Complete,
            4 => Self::CacheHit,
            _ => Self::Failed,
        }
    }
}
74
75/// Shared application state injected into every request handler via [`axum::extract::State`].
76pub struct AppState {
77 /// The embedding worker pool. Handles dense and sparse embedding requests.
78 pub pool: EmbedPool,
79 /// Atomic flag set to `true` once model warm-up and readiness probes complete.
80 ///
81 /// Handlers check this before dispatching to the pool to return `503`
82 /// while models are still loading.
83 pub ready: AtomicBool,
84 /// Maximum batch size enforced by the handler layer.
85 pub max_batch: usize,
86 /// Total number of workers configured at startup.
87 ///
88 /// Used by the `/health` endpoint to report degraded state when
89 /// `live_workers < total_workers`.
90 pub total_workers: usize,
91 /// Maximum tokenized sequence length in use.
92 pub max_seq_length: usize,
93 /// Static memory-detection info written once before the probe starts.
94 ///
95 /// Written to `OnceLock` as soon as memory detection completes (before the
96 /// background probe finishes), so `/health` can show `memory_source`,
97 /// `available_bytes`, and `model_rss_bytes_per_worker` even while the probe
98 /// is still running.
99 pub tuning: OnceLock<TuningInfo>,
100 /// Live cost-model coefficients.
101 ///
102 /// Initialized to conservative defaults at startup. Updated atomically by
103 /// the background probe (or cache-hit path) once fitted coefficients are
104 /// available. All workers share this same handle and observe the update
105 /// lock-free on their next `session.run()` call.
106 pub cost_model: Arc<ArcSwap<CostModel>>,
107 /// Current state of the background memory probe.
108 ///
109 /// Updated atomically from the background probe task. Read by `/health`
110 /// to expose `probe_status` in the `tuning` block.
111 pub probe_status: AtomicU8,
112 /// Concurrency gate for in-flight embedding requests.
113 ///
114 /// Initialized to `max(cfg_workers - 1, 1)` permits, reserving one worker
115 /// slot for the background auto-budget probe. Raised to `cfg_workers`
116 /// atomically on every terminal probe-status transition (`Disabled`,
117 /// `CacheHit`, `Complete`, `Failed`) so full concurrency is available once
118 /// the probe no longer needs a reserved worker.
119 ///
120 /// Test helpers set this to `usize::MAX` (effectively uncapped) so that
121 /// existing tests do not need to acquire a permit.
122 pub request_permits: Arc<Semaphore>,
123}
124
125/// Static workspace memory info surfaced by the `/health` endpoint.
126///
127/// Written once to [`AppState::tuning`] immediately after memory detection.
128/// The cost-model fields (`a`, `b`, `max_workspace_bytes`) are served
129/// dynamically from [`AppState::cost_model`] so they reflect the live
130/// probe result rather than the initial conservative defaults.
131#[derive(Debug, Clone, serde::Serialize)]
132pub struct TuningInfo {
133 /// Where the available-memory reading came from.
134 pub memory_source: String,
135 /// Total available bytes detected at startup.
136 pub available_bytes: usize,
137 /// Measured model session RSS delta (bytes) — max across all workers.
138 ///
139 /// Accurate on Linux via `/proc/self/status`; `0` on other platforms.
140 pub model_rss_bytes_per_worker: usize,
141 /// Worst-case total peak memory (bytes) when all workers run simultaneously
142 /// at their per-worker workspace ceiling.
143 ///
144 /// Formula: `cfg_workers × per_worker_workspace + cfg_workers × model_rss + OS_HEADROOM`.
145 pub worst_case_peak_bytes: usize,
146 /// Worst-case peak as a percentage of detected available memory.
147 ///
148 /// A value above 90% triggers a startup `WARN` log.
149 pub utilization_pct: f64,
150}
151
152impl TuningInfo {
153 /// Creates a [`TuningInfo`] from a memory reading and probe measurements.
154 #[must_use]
155 #[allow(clippy::too_many_arguments)]
156 pub fn new(
157 mem: &MemoryReading,
158 model_rss_per_worker: usize,
159 worst_case_peak_bytes: usize,
160 utilization_pct: f64,
161 ) -> Self {
162 Self {
163 memory_source: mem.source.to_string(),
164 available_bytes: mem.available_bytes,
165 model_rss_bytes_per_worker: model_rss_per_worker,
166 worst_case_peak_bytes,
167 utilization_pct,
168 }
169 }
170
171 /// Convenience builder for the case where memory detection was not possible
172 /// (macOS without cgroup support, or probe disabled).
173 #[must_use]
174 #[allow(dead_code)]
175 pub fn unknown(model_rss_per_worker: usize) -> Self {
176 Self {
177 memory_source: MemorySource::HostRam.to_string(),
178 available_bytes: 0,
179 model_rss_bytes_per_worker: model_rss_per_worker,
180 worst_case_peak_bytes: 0,
181 utilization_pct: 0.0,
182 }
183 }
184}