diff --git a/network-poc/hub/src/main.rs b/network-poc/hub/src/main.rs index cf8c2ff..d733306 100644 --- a/network-poc/hub/src/main.rs +++ b/network-poc/hub/src/main.rs @@ -972,24 +972,86 @@ async fn api_chat_completions( } } - // Etsitään ensimmäinen VAPAA solmu, joka vastaa pyydettyä mallia - let target_node = { + // Etsitään vapaa tai varattu solmu, joka vastaa pyydettyä mallia + let (target_node_free, target_node_any, total_matching) = { let tasks = state.node_tasks.lock().unwrap(); let busy = state.node_busy.lock().unwrap(); - tasks.iter().find(|(node_id, task)| { - let model_match = if payload.model == "qwen-coder" { + let matching: Vec = tasks.iter().filter(|(_, task)| { + if payload.model == "qwen-coder" { *task == "qwen-coder-05b" || *task == "qwen-coder" } else { **task == payload.model - }; - model_match && !busy.contains(node_id) - }).map(|(k, _)| *k) + } + }).map(|(k, _)| *k).collect(); + let free = matching.iter().find(|id| !busy.contains(id)).copied(); + let any = matching.first().copied(); + (free, any, matching.len()) }; - let target_node_id = match target_node { - Some(id) => id, - None => { - return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Ei vapaata solmua tälle mallille (kaikki varattuja tai ei käynnissä)").into_response(); + // Broadcastataan reititystila UI:lle + let task_id = payload.task_id.clone(); + + if target_node_any.is_none() { + // Ei yhtään solmua tälle mallille + return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Ei solmua tälle mallille (käynnistä malli selaimessa)").into_response(); + } + + let target_node_id; + if let Some(free_id) = target_node_free { + // Vapaa solmu löytyi — reititetään suoraan + target_node_id = free_id; + let node_type = if state.node_tasks.lock().unwrap().get(&free_id).map(|t| t.contains("native")).unwrap_or(false) { "natiivi" } else { "selain" }; + let routing_msg = serde_json::json!({ + "type": "task_routed", + "task_id": task_id, + "node_id": free_id, + "node_type": node_type, + "status": "routed", + "message": format!("Reititetty solmulle #{}", free_id), + }); + let _ = state.stats_tx.send(routing_msg.to_string()); + } else { + // Kaikki solmut varattuja — odotetaan vapautumista (max 30s) + let queue_msg = serde_json::json!({ + "type": "task_routed", + "task_id": task_id, + "status": "queued", + "message": format!("Kaikki {} solmua varattuja — odotetaan vapautumista...", total_matching), + }); + let _ = state.stats_tx.send(queue_msg.to_string()); + + // Pollaa busy-tilaa 500ms välein, max 30s + let mut waited = 0u32; + loop { + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + waited += 500; + let free = { + let tasks = state.node_tasks.lock().unwrap(); + let busy = state.node_busy.lock().unwrap(); + tasks.iter().find(|(node_id, task)| { + let model_match = if payload.model == "qwen-coder" { + *task == "qwen-coder-05b" || *task == "qwen-coder" + } else { + **task == payload.model + }; + model_match && !busy.contains(node_id) + }).map(|(k, _)| *k) + }; + if let Some(id) = free { + target_node_id = id; + let routing_msg = serde_json::json!({ + "type": "task_routed", + "task_id": task_id, + "node_id": id, + "status": "routed", + "message": format!("Solmu #{} vapautui — reititetään ({:.1}s jonossa)", id, waited as f64 / 1000.0), + }); + let _ = state.stats_tx.send(routing_msg.to_string()); + break; + } + if waited >= 30000 { + return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Aikakatkaisu: kaikki solmut varattuja 30s ajan").into_response(); + } } }; @@ -1003,18 +1065,18 @@ async fn api_chat_completions( "model": payload.model, "task_id": payload.task_id, }); - + // Odotuskanava valmiiksi (solmu palauttaa tuloksen stats_tx kautta) let mut rx = state.stats_tx.subscribe(); - - // Kohdennettu reititys: lähetetään AI-tehtävä suoraan VAIN valitulle solmulle (Reititysarkkitehtuuri) + + // Kohdennettu reititys: lähetetään AI-tehtävä suoraan VAIN valitulle solmulle { let channels = state.node_channels.read().await; if let Some(tx) = channels.get(&target_node_id) { let _ = tx.send(msg.to_string()); tracing::info!("Reititettiin API-pyyntö solmulle {} (Malli: {})", target_node_id, payload.model); } else { - return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Verkkovirhe: solmun yhteys katkesi pyynnön aikana").into_response(); + return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Verkkovirhe: solmun yhteys katkesi reitityksen aikana").into_response(); } }