Toipuminen yhteyskatkoksesta: hub ilmoittaa API:lle, node reconnectaa
- Hub: kun node katoaa kesken tehtävän, palauttaa virheen API-kutsulle - Hub: node_active_task seuraa mikä tehtävä on kesken - Hub: timeout 600s → 120s - Node: reconnect nollaa busy-tilan ja näyttää sen TUI:ssa
This commit is contained in:
@@ -42,6 +42,7 @@ struct AppState {
|
|||||||
node_types: Mutex<HashMap<u64, String>>, // node_id → "native" | "browser"
|
node_types: Mutex<HashMap<u64, String>>, // node_id → "native" | "browser"
|
||||||
node_paused: Mutex<std::collections::HashSet<u64>>, // node_id → onko tauolla
|
node_paused: Mutex<std::collections::HashSet<u64>>, // node_id → onko tauolla
|
||||||
node_busy: Mutex<std::collections::HashSet<u64>>, // Solmut joilla on aktiivinen tehtävä
|
node_busy: Mutex<std::collections::HashSet<u64>>, // Solmut joilla on aktiivinen tehtävä
|
||||||
|
node_active_task: Mutex<HashMap<u64, String>>, // node_id → task_id (mikä tehtävä on kesken)
|
||||||
pending_task_ids: Mutex<std::collections::HashSet<String>>, // Hubin jakamat task_id:t (gamification-validointi)
|
pending_task_ids: Mutex<std::collections::HashSet<String>>, // Hubin jakamat task_id:t (gamification-validointi)
|
||||||
pending_responses: Mutex<HashMap<String, tokio::sync::oneshot::Sender<serde_json::Value>>>, // task_id → oneshot API-vastaukselle
|
pending_responses: Mutex<HashMap<String, tokio::sync::oneshot::Sender<serde_json::Value>>>, // task_id → oneshot API-vastaukselle
|
||||||
api_rate_limits: Mutex<HashMap<IpAddr, (std::time::Instant, u32)>>, // IP → (ikkuna-alku, pyyntömäärä)
|
api_rate_limits: Mutex<HashMap<IpAddr, (std::time::Instant, u32)>>, // IP → (ikkuna-alku, pyyntömäärä)
|
||||||
@@ -329,6 +330,7 @@ async fn main() {
|
|||||||
node_types: Mutex::new(HashMap::new()),
|
node_types: Mutex::new(HashMap::new()),
|
||||||
node_paused: Mutex::new(std::collections::HashSet::new()),
|
node_paused: Mutex::new(std::collections::HashSet::new()),
|
||||||
node_busy: Mutex::new(std::collections::HashSet::new()),
|
node_busy: Mutex::new(std::collections::HashSet::new()),
|
||||||
|
node_active_task: Mutex::new(HashMap::new()),
|
||||||
pending_task_ids: Mutex::new(std::collections::HashSet::new()),
|
pending_task_ids: Mutex::new(std::collections::HashSet::new()),
|
||||||
pending_responses: Mutex::new(HashMap::new()),
|
pending_responses: Mutex::new(HashMap::new()),
|
||||||
api_rate_limits: Mutex::new(HashMap::new()),
|
api_rate_limits: Mutex::new(HashMap::new()),
|
||||||
@@ -908,6 +910,7 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
|
|||||||
broadcast_stats(&state).await;
|
broadcast_stats(&state).await;
|
||||||
} else if msg_type == "pair_done" {
|
} else if msg_type == "pair_done" {
|
||||||
state.node_busy.lock().unwrap().remove(&node_id);
|
state.node_busy.lock().unwrap().remove(&node_id);
|
||||||
|
state.node_active_task.lock().unwrap().remove(&node_id);
|
||||||
{
|
{
|
||||||
let mut json = json; // Siirretään omistajuus muokkausta varten
|
let mut json = json; // Siirretään omistajuus muokkausta varten
|
||||||
if let Some(obj) = json.as_object_mut() {
|
if let Some(obj) = json.as_object_mut() {
|
||||||
@@ -994,6 +997,7 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
|
|||||||
} else if msg_type == "llm_done" {
|
} else if msg_type == "llm_done" {
|
||||||
// Vapautetaan solmu ja tarkistetaan task_id:n aitous
|
// Vapautetaan solmu ja tarkistetaan task_id:n aitous
|
||||||
state.node_busy.lock().unwrap().remove(&node_id);
|
state.node_busy.lock().unwrap().remove(&node_id);
|
||||||
|
state.node_active_task.lock().unwrap().remove(&node_id);
|
||||||
let task_id = json.get("task_id").and_then(|v| v.as_str()).map(|s| s.to_string());
|
let task_id = json.get("task_id").and_then(|v| v.as_str()).map(|s| s.to_string());
|
||||||
let valid_task = if let Some(ref tid) = task_id {
|
let valid_task = if let Some(ref tid) = task_id {
|
||||||
state.pending_task_ids.lock().unwrap().remove(tid.as_str())
|
state.pending_task_ids.lock().unwrap().remove(tid.as_str())
|
||||||
@@ -1063,6 +1067,7 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
|
|||||||
}
|
}
|
||||||
} else if msg_type == "llm_error" {
|
} else if msg_type == "llm_error" {
|
||||||
state.node_busy.lock().unwrap().remove(&node_id);
|
state.node_busy.lock().unwrap().remove(&node_id);
|
||||||
|
state.node_active_task.lock().unwrap().remove(&node_id);
|
||||||
let task_id = json.get("task_id").and_then(|v| v.as_str()).map(|s| s.to_string());
|
let task_id = json.get("task_id").and_then(|v| v.as_str()).map(|s| s.to_string());
|
||||||
if let Some(ref tid) = task_id {
|
if let Some(ref tid) = task_id {
|
||||||
state.pending_task_ids.lock().unwrap().remove(tid.as_str());
|
state.pending_task_ids.lock().unwrap().remove(tid.as_str());
|
||||||
@@ -1109,6 +1114,22 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
|
|||||||
|
|
||||||
// Yhteys katkesi — merkitään session päättyneeksi ja siivotaan atomisesti
|
// Yhteys katkesi — merkitään session päättyneeksi ja siivotaan atomisesti
|
||||||
state.db.close_session(node_id);
|
state.db.close_session(node_id);
|
||||||
|
|
||||||
|
// Jos solmulla oli kesken tehtävä, ilmoitetaan odottavalle API-kutsulle
|
||||||
|
let lost_task_id = state.node_active_task.lock().unwrap().remove(&node_id);
|
||||||
|
if let Some(tid) = lost_task_id {
|
||||||
|
tracing::warn!("Solmu {} katosi kesken tehtävän {} — palautetaan virhe API:lle", node_id, tid);
|
||||||
|
state.pending_task_ids.lock().unwrap().remove(&tid);
|
||||||
|
if let Some(resp_tx) = state.pending_responses.lock().unwrap().remove(&tid) {
|
||||||
|
let err = serde_json::json!({
|
||||||
|
"type": "llm_error",
|
||||||
|
"error": format!("Solmu #{} katosi kesken laskennan (task {})", node_id, tid),
|
||||||
|
"task_id": tid
|
||||||
|
});
|
||||||
|
let _ = resp_tx.send(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
// Lukitaan kaikki kerralla, jotta solmu ei ole osittain siivottu
|
// Lukitaan kaikki kerralla, jotta solmu ei ole osittain siivottu
|
||||||
let mut tasks = state.node_tasks.lock().unwrap();
|
let mut tasks = state.node_tasks.lock().unwrap();
|
||||||
@@ -1308,6 +1329,7 @@ async fn api_chat_completions(
|
|||||||
|
|
||||||
// Merkitään solmu varatuksi ja task_id jaetuksi
|
// Merkitään solmu varatuksi ja task_id jaetuksi
|
||||||
state.node_busy.lock().unwrap().insert(target_node_id);
|
state.node_busy.lock().unwrap().insert(target_node_id);
|
||||||
|
state.node_active_task.lock().unwrap().insert(target_node_id, payload.task_id.clone());
|
||||||
state.pending_task_ids.lock().unwrap().insert(payload.task_id.clone());
|
state.pending_task_ids.lock().unwrap().insert(payload.task_id.clone());
|
||||||
|
|
||||||
let mut msg = serde_json::json!({
|
let mut msg = serde_json::json!({
|
||||||
@@ -1340,7 +1362,7 @@ async fn api_chat_completions(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let timeout = tokio::time::timeout(std::time::Duration::from_secs(600), resp_rx).await;
|
let timeout = tokio::time::timeout(std::time::Duration::from_secs(120), resp_rx).await;
|
||||||
|
|
||||||
match timeout {
|
match timeout {
|
||||||
Ok(Ok(v)) => {
|
Ok(Ok(v)) => {
|
||||||
@@ -1356,12 +1378,17 @@ async fn api_chat_completions(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(Err(_)) => {
|
Ok(Err(_)) => {
|
||||||
// Oneshot-kanava sulkeutui (solmu katosi)
|
// Oneshot-kanava sulkeutui (solmu katosi kesken laskennan)
|
||||||
state.pending_responses.lock().unwrap().remove(&payload.task_id);
|
state.pending_responses.lock().unwrap().remove(&payload.task_id);
|
||||||
(axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Verkkovirhe: yhteys katkesi").into_response()
|
state.node_busy.lock().unwrap().remove(&target_node_id);
|
||||||
|
state.node_active_task.lock().unwrap().remove(&target_node_id);
|
||||||
|
(axum::http::StatusCode::SERVICE_UNAVAILABLE, "Solmu katosi kesken laskennan — yritä uudelleen").into_response()
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
|
// Timeout — solmu ei vastannut ajoissa
|
||||||
state.pending_responses.lock().unwrap().remove(&payload.task_id);
|
state.pending_responses.lock().unwrap().remove(&payload.task_id);
|
||||||
|
state.node_busy.lock().unwrap().remove(&target_node_id);
|
||||||
|
state.node_active_task.lock().unwrap().remove(&target_node_id);
|
||||||
(axum::http::StatusCode::GATEWAY_TIMEOUT, "Aikakatkaisu: solmu ei saanut tehtävää ajoissa valmiiksi").into_response()
|
(axum::http::StatusCode::GATEWAY_TIMEOUT, "Aikakatkaisu: solmu ei saanut tehtävää ajoissa valmiiksi").into_response()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -634,9 +634,27 @@ async fn main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Yhteys katkesi — nollataan TUI:n busy-tila
|
||||||
|
{
|
||||||
|
let mut st = tui_state.write().await;
|
||||||
|
let lost_task = st.cur_task_id.clone();
|
||||||
|
if let Some(tid) = lost_task {
|
||||||
|
st.push_log("Network", format!("Tehtävä {} keskeytyi yhteyden katketessa", tid), None);
|
||||||
|
}
|
||||||
|
st.cur_task_id = None;
|
||||||
|
st.cur_prompt = None;
|
||||||
|
st.node_id = None;
|
||||||
|
st.status = "RECONNECTING".to_string();
|
||||||
|
st.push_log("Network", "Yhteys hubiin katkesi — yhdistetään uudelleen 5s...".to_string(), None);
|
||||||
|
}
|
||||||
tracing::warn!("Yhteys hubiin katkesi — yritetään uudelleen 5s...");
|
tracing::warn!("Yhteys hubiin katkesi — yritetään uudelleen 5s...");
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
{
|
||||||
|
let mut st = tui_state.write().await;
|
||||||
|
st.status = "RECONNECTING".to_string();
|
||||||
|
st.push_log("Network", format!("Yhdistäminen epäonnistui: {} — yritetään 5s...", e), None);
|
||||||
|
}
|
||||||
tracing::warn!("Hubiin yhdistäminen epäonnistui: {} — yritetään uudelleen 5s...", e);
|
tracing::warn!("Hubiin yhdistäminen epäonnistui: {} — yritetään uudelleen 5s...", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user