Tehtävien reitityksen tilatieto ja työjono: task_routed-viesti UI:lle, 30s jono kun solmut varattuja
Hub broadcastaa task_routed-viestin joka kertoo reitityksen tilan: - "routed": vapaa solmu löytyi, tehtävä reititetty suoraan - "queued": kaikki solmut varattuja, odotetaan vapautumista (max 30s poll) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -972,24 +972,86 @@ async fn api_chat_completions(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Etsitään ensimmäinen VAPAA solmu, joka vastaa pyydettyä mallia
|
// Etsitään vapaa tai varattu solmu, joka vastaa pyydettyä mallia
|
||||||
let target_node = {
|
let (target_node_free, target_node_any, total_matching) = {
|
||||||
let tasks = state.node_tasks.lock().unwrap();
|
let tasks = state.node_tasks.lock().unwrap();
|
||||||
let busy = state.node_busy.lock().unwrap();
|
let busy = state.node_busy.lock().unwrap();
|
||||||
tasks.iter().find(|(node_id, task)| {
|
let matching: Vec<u64> = tasks.iter().filter(|(_, task)| {
|
||||||
let model_match = if payload.model == "qwen-coder" {
|
if payload.model == "qwen-coder" {
|
||||||
*task == "qwen-coder-05b" || *task == "qwen-coder"
|
*task == "qwen-coder-05b" || *task == "qwen-coder"
|
||||||
} else {
|
} else {
|
||||||
**task == payload.model
|
**task == payload.model
|
||||||
};
|
}
|
||||||
model_match && !busy.contains(node_id)
|
}).map(|(k, _)| *k).collect();
|
||||||
}).map(|(k, _)| *k)
|
let free = matching.iter().find(|id| !busy.contains(id)).copied();
|
||||||
|
let any = matching.first().copied();
|
||||||
|
(free, any, matching.len())
|
||||||
};
|
};
|
||||||
|
|
||||||
let target_node_id = match target_node {
|
// Broadcastataan reititystila UI:lle
|
||||||
Some(id) => id,
|
let task_id = payload.task_id.clone();
|
||||||
None => {
|
|
||||||
return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Ei vapaata solmua tälle mallille (kaikki varattuja tai ei käynnissä)").into_response();
|
if target_node_any.is_none() {
|
||||||
|
// Ei yhtään solmua tälle mallille
|
||||||
|
return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Ei solmua tälle mallille (käynnistä malli selaimessa)").into_response();
|
||||||
|
}
|
||||||
|
|
||||||
|
let target_node_id;
|
||||||
|
if let Some(free_id) = target_node_free {
|
||||||
|
// Vapaa solmu löytyi — reititetään suoraan
|
||||||
|
target_node_id = free_id;
|
||||||
|
let node_type = if state.node_tasks.lock().unwrap().get(&free_id).map(|t| t.contains("native")).unwrap_or(false) { "natiivi" } else { "selain" };
|
||||||
|
let routing_msg = serde_json::json!({
|
||||||
|
"type": "task_routed",
|
||||||
|
"task_id": task_id,
|
||||||
|
"node_id": free_id,
|
||||||
|
"node_type": node_type,
|
||||||
|
"status": "routed",
|
||||||
|
"message": format!("Reititetty solmulle #{}", free_id),
|
||||||
|
});
|
||||||
|
let _ = state.stats_tx.send(routing_msg.to_string());
|
||||||
|
} else {
|
||||||
|
// Kaikki solmut varattuja — odotetaan vapautumista (max 30s)
|
||||||
|
let queue_msg = serde_json::json!({
|
||||||
|
"type": "task_routed",
|
||||||
|
"task_id": task_id,
|
||||||
|
"status": "queued",
|
||||||
|
"message": format!("Kaikki {} solmua varattuja — odotetaan vapautumista...", total_matching),
|
||||||
|
});
|
||||||
|
let _ = state.stats_tx.send(queue_msg.to_string());
|
||||||
|
|
||||||
|
// Pollaa busy-tilaa 500ms välein, max 30s
|
||||||
|
let mut waited = 0u32;
|
||||||
|
loop {
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
|
||||||
|
waited += 500;
|
||||||
|
let free = {
|
||||||
|
let tasks = state.node_tasks.lock().unwrap();
|
||||||
|
let busy = state.node_busy.lock().unwrap();
|
||||||
|
tasks.iter().find(|(node_id, task)| {
|
||||||
|
let model_match = if payload.model == "qwen-coder" {
|
||||||
|
*task == "qwen-coder-05b" || *task == "qwen-coder"
|
||||||
|
} else {
|
||||||
|
**task == payload.model
|
||||||
|
};
|
||||||
|
model_match && !busy.contains(node_id)
|
||||||
|
}).map(|(k, _)| *k)
|
||||||
|
};
|
||||||
|
if let Some(id) = free {
|
||||||
|
target_node_id = id;
|
||||||
|
let routing_msg = serde_json::json!({
|
||||||
|
"type": "task_routed",
|
||||||
|
"task_id": task_id,
|
||||||
|
"node_id": id,
|
||||||
|
"status": "routed",
|
||||||
|
"message": format!("Solmu #{} vapautui — reititetään ({:.1}s jonossa)", id, waited as f64 / 1000.0),
|
||||||
|
});
|
||||||
|
let _ = state.stats_tx.send(routing_msg.to_string());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if waited >= 30000 {
|
||||||
|
return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Aikakatkaisu: kaikki solmut varattuja 30s ajan").into_response();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -1007,14 +1069,14 @@ async fn api_chat_completions(
|
|||||||
// Odotuskanava valmiiksi (solmu palauttaa tuloksen stats_tx kautta)
|
// Odotuskanava valmiiksi (solmu palauttaa tuloksen stats_tx kautta)
|
||||||
let mut rx = state.stats_tx.subscribe();
|
let mut rx = state.stats_tx.subscribe();
|
||||||
|
|
||||||
// Kohdennettu reititys: lähetetään AI-tehtävä suoraan VAIN valitulle solmulle (Reititysarkkitehtuuri)
|
// Kohdennettu reititys: lähetetään AI-tehtävä suoraan VAIN valitulle solmulle
|
||||||
{
|
{
|
||||||
let channels = state.node_channels.read().await;
|
let channels = state.node_channels.read().await;
|
||||||
if let Some(tx) = channels.get(&target_node_id) {
|
if let Some(tx) = channels.get(&target_node_id) {
|
||||||
let _ = tx.send(msg.to_string());
|
let _ = tx.send(msg.to_string());
|
||||||
tracing::info!("Reititettiin API-pyyntö solmulle {} (Malli: {})", target_node_id, payload.model);
|
tracing::info!("Reititettiin API-pyyntö solmulle {} (Malli: {})", target_node_id, payload.model);
|
||||||
} else {
|
} else {
|
||||||
return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Verkkovirhe: solmun yhteys katkesi pyynnön aikana").into_response();
|
return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Verkkovirhe: solmun yhteys katkesi reitityksen aikana").into_response();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user