Yksinkertaistettu reititys: poistettu busy-tila ja jonotus

Pipelinen peräkkäiset kpnRun-kutsut saivat 503 koska hub merkitsi
solmun busyksi eikä vapauttanut sitä ajoissa. Reititetään aina
ensimmäiselle matchaavalle solmulle. LLM_BUSY suojaa Wasm-puolella.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jaakko Vanhala
2026-04-09 20:22:53 +03:00
parent 6c7c2d6dd3
commit f8ea5ed76e

View File

@@ -1094,7 +1094,7 @@ async fn api_chat_completions(
} }
// Etsitään vapaa solmu — priorisoidaan natiivisolmut (GPU) selaimen edelle // Etsitään vapaa solmu — priorisoidaan natiivisolmut (GPU) selaimen edelle
let (target_node_free, target_node_any, total_matching) = { let (target_node, _total_matching) = {
let tasks = state.node_tasks.lock().unwrap(); let tasks = state.node_tasks.lock().unwrap();
let busy = state.node_busy.lock().unwrap(); let busy = state.node_busy.lock().unwrap();
let node_types = state.node_types.lock().unwrap(); let node_types = state.node_types.lock().unwrap();
@@ -1110,82 +1110,34 @@ async fn api_chat_completions(
**task == payload.model **task == payload.model
} }
}).map(|(k, _)| *k).collect(); }).map(|(k, _)| *k).collect();
// Vapaat solmut: natiivi ensin, sitten selain // Etsitään mikä tahansa matchaava solmu (natiivi priorisoidaan)
let free_native = matching.iter().find(|id| { let native = matching.iter().find(|id| {
!busy.contains(id) && node_types.get(id).map(|t| t == "native").unwrap_or(false) node_types.get(id).map(|t| t == "native").unwrap_or(false)
}).copied(); }).copied();
let free_any = matching.iter().find(|id| !busy.contains(id)).copied(); let any = native.or_else(|| matching.first().copied());
let free = free_native.or(free_any); (any, matching.len())
let any = matching.first().copied();
(free, any, matching.len())
}; };
// Broadcastataan reititystila UI:lle
let task_id = payload.task_id.clone(); let task_id = payload.task_id.clone();
if target_node_any.is_none() { let target_node_id = match target_node {
// Ei yhtään solmua tälle mallille Some(id) => id,
return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Ei solmua tälle mallille (käynnistä malli selaimessa)").into_response(); None => {
} return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Ei solmua tälle mallille (käynnistä malli selaimessa)").into_response();
}
};
let target_node_id; // Reititystila UI:lle
if let Some(free_id) = target_node_free { {
// Vapaa solmu löytyi — reititetään suoraan
target_node_id = free_id;
let node_type = if state.node_tasks.lock().unwrap().get(&free_id).map(|t| t.contains("native")).unwrap_or(false) { "natiivi" } else { "selain" };
let routing_msg = serde_json::json!({ let routing_msg = serde_json::json!({
"type": "task_routed", "type": "task_routed",
"task_id": task_id, "task_id": task_id,
"node_id": free_id, "node_id": target_node_id,
"node_type": node_type,
"status": "routed", "status": "routed",
"message": format!("Reititetty solmulle #{}", free_id), "message": format!("Reititetty solmulle #{}", target_node_id),
}); });
let _ = state.stats_tx.send(routing_msg.to_string()); let _ = state.stats_tx.send(routing_msg.to_string());
} else { }
// Kaikki solmut varattuja — odotetaan vapautumista (max 30s)
let queue_msg = serde_json::json!({
"type": "task_routed",
"task_id": task_id,
"status": "queued",
"message": format!("Kaikki {} solmua varattuja — odotetaan vapautumista...", total_matching),
});
let _ = state.stats_tx.send(queue_msg.to_string());
// Pollaa busy-tilaa 500ms välein, max 30s
let mut waited = 0u32;
loop {
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
waited += 500;
let free = {
let tasks = state.node_tasks.lock().unwrap();
let busy = state.node_busy.lock().unwrap();
tasks.iter().find(|(node_id, task)| {
let model_match = if payload.model == "qwen-coder" {
*task == "qwen-coder-05b" || *task == "qwen-coder"
} else {
**task == payload.model
};
model_match && !busy.contains(node_id)
}).map(|(k, _)| *k)
};
if let Some(id) = free {
target_node_id = id;
let routing_msg = serde_json::json!({
"type": "task_routed",
"task_id": task_id,
"node_id": id,
"status": "routed",
"message": format!("Solmu #{} vapautui — reititetään ({:.1}s jonossa)", id, waited as f64 / 1000.0),
});
let _ = state.stats_tx.send(routing_msg.to_string());
break;
}
if waited >= 30000 {
return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Aikakatkaisu: kaikki solmut varattuja 30s ajan").into_response();
}
}
};
// Merkitään solmu varatuksi ja task_id jaetuksi // Merkitään solmu varatuksi ja task_id jaetuksi
state.node_busy.lock().unwrap().insert(target_node_id); state.node_busy.lock().unwrap().insert(target_node_id);