
bge_m3_embedding_server/handler/dense.rs

// Copyright (c) 2026 J. Patrick Fulton
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! `POST /v1/embeddings` handler — OpenAI-compatible dense embeddings.

use std::sync::Arc;
use std::time::Instant;

use axum::{extract::State, Json};

use super::common::{check_ready, validate_input};
use crate::error::AppError;
use crate::models::{DenseEmbeddingData, DenseRequest, DenseResponse, Usage};
use crate::state::AppState;

/// Handles `POST /v1/embeddings` — returns dense (float32) embeddings.
///
/// # Errors
///
/// - [`AppError::ServiceUnavailable`] if the model is not ready or no workers are live.
/// - [`AppError::InvalidRequest`] if the batch is empty, exceeds `max_batch`, or any
///   text exceeds the per-string character limit.
/// - [`AppError::Internal`] if the embedding pool returns an inference error.
///
/// # Panics
///
/// Panics if the request semaphore has been closed — should not occur in normal operation.
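///
/// # Example
///
/// Illustrative request and response shapes, assuming the serde field names match
/// the struct fields used below. The embedding values are placeholders, and the
/// token counts reflect this handler's `chars / 4 + 1` estimate rather than a real
/// tokenizer count.
///
/// Request:
///
/// ```json
/// { "model": "bge-m3", "input": ["hello world"] }
/// ```
///
/// Response:
///
/// ```json
/// {
///   "object": "list",
///   "model": "bge-m3",
///   "data": [
///     { "object": "embedding", "index": 0, "embedding": [0.012, -0.034, 0.056] }
///   ],
///   "usage": { "prompt_tokens": 3, "total_tokens": 3 }
/// }
/// ```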
#[tracing::instrument(
    skip(state, req),
    fields(
        batch_size,
        prompt_tokens,
        chunks,
        max_chunk_seq,
        tokenize_ms,
        inference_ms,
        total_ms
    )
)]
pub async fn dense_embeddings(
    State(state): State<Arc<AppState>>,
    Json(req): Json<DenseRequest>,
) -> Result<Json<DenseResponse>, AppError> {
    check_ready(&state)?;
    let texts = req.input.0;
    // `model` is accepted for OpenAI compatibility but otherwise unused; the
    // response below always reports `bge-m3`.
    drop(req.model);
    validate_input(&texts, state.max_batch)?;
    let batch_size = texts.len();
    tracing::Span::current().record("batch_size", batch_size);

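    // Rough token estimate for the usage block and tracing fields: roughly four
    // characters per token, plus one per string so short inputs count as at
    // least one token (e.g. "hello world" is 11 chars, so 11 / 4 + 1 = 3).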
    let prompt_tokens: usize = texts.iter().map(|t| t.chars().count() / 4 + 1).sum();
    tracing::Span::current().record("prompt_tokens", prompt_tokens);

    let t0 = Instant::now();

    // Acquire a concurrency permit before dispatching to the worker pool.
    // This is released on drop when the handler returns (success or error).
    let _permit = Arc::clone(&state.request_permits)
        .acquire_owned()
        .await
        .expect("request semaphore is never closed");

    let (embeddings, embed_stats) = state.pool.dense(texts).await?;

    let total_ms = u64::try_from(t0.elapsed().as_millis()).unwrap_or(u64::MAX);
    tracing::Span::current()
        .record("chunks", embed_stats.chunks)
        .record("max_chunk_seq", embed_stats.max_chunk_seq)
        .record("tokenize_ms", embed_stats.tokenize_ms)
        .record("inference_ms", embed_stats.inference_ms)
        .record("total_ms", total_ms);
    tracing::info!(
        route = "dense",
        batch_size,
        prompt_tokens,
        chunks = embed_stats.chunks,
        max_chunk_seq = embed_stats.max_chunk_seq,
        total_token_positions = embed_stats.total_token_positions,
        tokenize_ms = embed_stats.tokenize_ms,
        inference_ms = embed_stats.inference_ms,
        total_ms,
        "embedding request complete"
    );

    let data = embeddings
        .into_iter()
        .enumerate()
        .map(|(index, embedding)| DenseEmbeddingData {
            object: "embedding",
            index,
            embedding,
        })
        .collect();

    Ok(Json(DenseResponse {
        object: "list",
        model: "bge-m3",
        data,
        usage: Usage {
            prompt_tokens,
            total_tokens: prompt_tokens,
        },
    }))
}
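
/// Illustrative wiring sketch, inferred from this handler's extractors: the
/// route path comes from the doc above, while the function name `dense_router`
/// is hypothetical and the crate's real router construction presumably lives
/// elsewhere.
#[allow(dead_code)]
fn dense_router(state: Arc<AppState>) -> axum::Router {
    use axum::routing::post;

    axum::Router::new()
        .route("/v1/embeddings", post(dense_embeddings))
        .with_state(state)
}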