From 8b8ba01af3e9514a91f05d769cb34d1faa0fbd33 Mon Sep 17 00:00:00 2001 From: Jaakko Vanhala Date: Mon, 13 Apr 2026 06:50:45 +0300 Subject: [PATCH] Toipuminen yhteyskatkoksesta: hub ilmoittaa API:lle, node reconnectaa MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Hub: kun node katoaa kesken tehtävän, palauttaa virheen API-kutsulle - Hub: node_active_task seuraa mikä tehtävä on kesken - Hub: timeout 600s → 120s - Node: reconnect nollaa busy-tilan ja näyttää sen TUI:ssa --- network-poc/hub/src/main.rs | 33 ++++++++++++++++++++++++++--- network-poc/native-node/src/main.rs | 18 ++++++++++++++++ 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/network-poc/hub/src/main.rs b/network-poc/hub/src/main.rs index f8bff3b..28b06b7 100644 --- a/network-poc/hub/src/main.rs +++ b/network-poc/hub/src/main.rs @@ -42,6 +42,7 @@ struct AppState { node_types: Mutex>, // node_id → "native" | "browser" node_paused: Mutex>, // node_id → onko tauolla node_busy: Mutex>, // Solmut joilla on aktiivinen tehtävä + node_active_task: Mutex>, // node_id → task_id (mikä tehtävä on kesken) pending_task_ids: Mutex>, // Hubin jakamat task_id:t (gamification-validointi) pending_responses: Mutex>>, // task_id → oneshot API-vastaukselle api_rate_limits: Mutex>, // IP → (ikkuna-alku, pyyntömäärä) @@ -329,6 +330,7 @@ async fn main() { node_types: Mutex::new(HashMap::new()), node_paused: Mutex::new(std::collections::HashSet::new()), node_busy: Mutex::new(std::collections::HashSet::new()), + node_active_task: Mutex::new(HashMap::new()), pending_task_ids: Mutex::new(std::collections::HashSet::new()), pending_responses: Mutex::new(HashMap::new()), api_rate_limits: Mutex::new(HashMap::new()), @@ -908,6 +910,7 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { broadcast_stats(&state).await; } else if msg_type == "pair_done" { state.node_busy.lock().unwrap().remove(&node_id); + state.node_active_task.lock().unwrap().remove(&node_id); { let mut json = json; // Siirretään omistajuus muokkausta varten if let Some(obj) = json.as_object_mut() { @@ -994,6 +997,7 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { } else if msg_type == "llm_done" { // Vapautetaan solmu ja tarkistetaan task_id:n aitous state.node_busy.lock().unwrap().remove(&node_id); + state.node_active_task.lock().unwrap().remove(&node_id); let task_id = json.get("task_id").and_then(|v| v.as_str()).map(|s| s.to_string()); let valid_task = if let Some(ref tid) = task_id { state.pending_task_ids.lock().unwrap().remove(tid.as_str()) @@ -1063,6 +1067,7 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { } } else if msg_type == "llm_error" { state.node_busy.lock().unwrap().remove(&node_id); + state.node_active_task.lock().unwrap().remove(&node_id); let task_id = json.get("task_id").and_then(|v| v.as_str()).map(|s| s.to_string()); if let Some(ref tid) = task_id { state.pending_task_ids.lock().unwrap().remove(tid.as_str()); @@ -1109,6 +1114,22 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { // Yhteys katkesi — merkitään session päättyneeksi ja siivotaan atomisesti state.db.close_session(node_id); + + // Jos solmulla oli kesken tehtävä, ilmoitetaan odottavalle API-kutsulle + let lost_task_id = state.node_active_task.lock().unwrap().remove(&node_id); + if let Some(tid) = lost_task_id { + tracing::warn!("Solmu {} katosi kesken tehtävän {} — palautetaan virhe API:lle", node_id, tid); + state.pending_task_ids.lock().unwrap().remove(&tid); + if let Some(resp_tx) = state.pending_responses.lock().unwrap().remove(&tid) { + let err = serde_json::json!({ + "type": "llm_error", + "error": format!("Solmu #{} katosi kesken laskennan (task {})", node_id, tid), + "task_id": tid + }); + let _ = resp_tx.send(err); + } + } + { // Lukitaan kaikki kerralla, jotta solmu ei ole osittain siivottu let mut tasks = state.node_tasks.lock().unwrap(); @@ -1308,6 +1329,7 @@ async fn api_chat_completions( // Merkitään solmu varatuksi ja task_id jaetuksi state.node_busy.lock().unwrap().insert(target_node_id); + state.node_active_task.lock().unwrap().insert(target_node_id, payload.task_id.clone()); state.pending_task_ids.lock().unwrap().insert(payload.task_id.clone()); let mut msg = serde_json::json!({ @@ -1340,7 +1362,7 @@ async fn api_chat_completions( } } - let timeout = tokio::time::timeout(std::time::Duration::from_secs(600), resp_rx).await; + let timeout = tokio::time::timeout(std::time::Duration::from_secs(120), resp_rx).await; match timeout { Ok(Ok(v)) => { @@ -1356,12 +1378,17 @@ async fn api_chat_completions( } } Ok(Err(_)) => { - // Oneshot-kanava sulkeutui (solmu katosi) + // Oneshot-kanava sulkeutui (solmu katosi kesken laskennan) state.pending_responses.lock().unwrap().remove(&payload.task_id); - (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Verkkovirhe: yhteys katkesi").into_response() + state.node_busy.lock().unwrap().remove(&target_node_id); + state.node_active_task.lock().unwrap().remove(&target_node_id); + (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Solmu katosi kesken laskennan — yritä uudelleen").into_response() } Err(_) => { + // Timeout — solmu ei vastannut ajoissa state.pending_responses.lock().unwrap().remove(&payload.task_id); + state.node_busy.lock().unwrap().remove(&target_node_id); + state.node_active_task.lock().unwrap().remove(&target_node_id); (axum::http::StatusCode::GATEWAY_TIMEOUT, "Aikakatkaisu: solmu ei saanut tehtävää ajoissa valmiiksi").into_response() } } diff --git a/network-poc/native-node/src/main.rs b/network-poc/native-node/src/main.rs index 02d27b4..3a8cee9 100644 --- a/network-poc/native-node/src/main.rs +++ b/network-poc/native-node/src/main.rs @@ -634,9 +634,27 @@ async fn main() { } } + // Yhteys katkesi — nollataan TUI:n busy-tila + { + let mut st = tui_state.write().await; + let lost_task = st.cur_task_id.clone(); + if let Some(tid) = lost_task { + st.push_log("Network", format!("Tehtävä {} keskeytyi yhteyden katketessa", tid), None); + } + st.cur_task_id = None; + st.cur_prompt = None; + st.node_id = None; + st.status = "RECONNECTING".to_string(); + st.push_log("Network", "Yhteys hubiin katkesi — yhdistetään uudelleen 5s...".to_string(), None); + } tracing::warn!("Yhteys hubiin katkesi — yritetään uudelleen 5s..."); } Err(e) => { + { + let mut st = tui_state.write().await; + st.status = "RECONNECTING".to_string(); + st.push_log("Network", format!("Yhdistäminen epäonnistui: {} — yritetään 5s...", e), None); + } tracing::warn!("Hubiin yhdistäminen epäonnistui: {} — yritetään uudelleen 5s...", e); } }