diff --git a/network-poc/hub/src/main.rs b/network-poc/hub/src/main.rs index 260567a..ed0a6e8 100644 --- a/network-poc/hub/src/main.rs +++ b/network-poc/hub/src/main.rs @@ -1094,7 +1094,7 @@ async fn api_chat_completions( } // Etsitään vapaa solmu — priorisoidaan natiivisolmut (GPU) selaimen edelle - let (target_node_free, target_node_any, total_matching) = { + let (target_node, _total_matching) = { let tasks = state.node_tasks.lock().unwrap(); let busy = state.node_busy.lock().unwrap(); let node_types = state.node_types.lock().unwrap(); @@ -1110,82 +1110,34 @@ async fn api_chat_completions( **task == payload.model } }).map(|(k, _)| *k).collect(); - // Vapaat solmut: natiivi ensin, sitten selain - let free_native = matching.iter().find(|id| { - !busy.contains(id) && node_types.get(id).map(|t| t == "native").unwrap_or(false) + // Etsitään mikä tahansa matchaava solmu (natiivi priorisoidaan) + let native = matching.iter().find(|id| { + node_types.get(id).map(|t| t == "native").unwrap_or(false) }).copied(); - let free_any = matching.iter().find(|id| !busy.contains(id)).copied(); - let free = free_native.or(free_any); - let any = matching.first().copied(); - (free, any, matching.len()) + let any = native.or_else(|| matching.first().copied()); + (any, matching.len()) }; - // Broadcastataan reititystila UI:lle let task_id = payload.task_id.clone(); - if target_node_any.is_none() { - // Ei yhtään solmua tälle mallille - return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Ei solmua tälle mallille (käynnistä malli selaimessa)").into_response(); - } + let target_node_id = match target_node { + Some(id) => id, + None => { + return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Ei solmua tälle mallille (käynnistä malli selaimessa)").into_response(); + } + }; - let target_node_id; - if let Some(free_id) = target_node_free { - // Vapaa solmu löytyi — reititetään suoraan - target_node_id = free_id; - let node_type = if state.node_tasks.lock().unwrap().get(&free_id).map(|t| t.contains("native")).unwrap_or(false) { "natiivi" } else { "selain" }; + // Reititystila UI:lle + { let routing_msg = serde_json::json!({ "type": "task_routed", "task_id": task_id, - "node_id": free_id, - "node_type": node_type, + "node_id": target_node_id, "status": "routed", - "message": format!("Reititetty solmulle #{}", free_id), + "message": format!("Reititetty solmulle #{}", target_node_id), }); let _ = state.stats_tx.send(routing_msg.to_string()); - } else { - // Kaikki solmut varattuja — odotetaan vapautumista (max 30s) - let queue_msg = serde_json::json!({ - "type": "task_routed", - "task_id": task_id, - "status": "queued", - "message": format!("Kaikki {} solmua varattuja — odotetaan vapautumista...", total_matching), - }); - let _ = state.stats_tx.send(queue_msg.to_string()); - - // Pollaa busy-tilaa 500ms välein, max 30s - let mut waited = 0u32; - loop { - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - waited += 500; - let free = { - let tasks = state.node_tasks.lock().unwrap(); - let busy = state.node_busy.lock().unwrap(); - tasks.iter().find(|(node_id, task)| { - let model_match = if payload.model == "qwen-coder" { - *task == "qwen-coder-05b" || *task == "qwen-coder" - } else { - **task == payload.model - }; - model_match && !busy.contains(node_id) - }).map(|(k, _)| *k) - }; - if let Some(id) = free { - target_node_id = id; - let routing_msg = serde_json::json!({ - "type": "task_routed", - "task_id": task_id, - "node_id": id, - "status": "routed", - "message": format!("Solmu #{} vapautui — reititetään ({:.1}s jonossa)", id, waited as f64 / 1000.0), - }); - let _ = state.stats_tx.send(routing_msg.to_string()); - break; - } - if waited >= 30000 { - return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Aikakatkaisu: kaikki solmut varattuja 30s ajan").into_response(); - } - } - }; + } // Merkitään solmu varatuksi ja task_id jaetuksi state.node_busy.lock().unwrap().insert(target_node_id);