diff --git a/network-poc/hub/src/main.rs b/network-poc/hub/src/main.rs index a03d54d..4fe9573 100644 --- a/network-poc/hub/src/main.rs +++ b/network-poc/hub/src/main.rs @@ -42,6 +42,7 @@ struct AppState { node_types: Mutex>, // node_id → "native" | "browser" node_busy: Mutex>, // Solmut joilla on aktiivinen tehtävä pending_task_ids: Mutex>, // Hubin jakamat task_id:t (gamification-validointi) + pending_responses: Mutex>>, // task_id → oneshot API-vastaukselle api_rate_limits: Mutex>, // IP → (ikkuna-alku, pyyntömäärä) db: db::NodeDb, } @@ -264,6 +265,7 @@ async fn main() { node_types: Mutex::new(HashMap::new()), node_busy: Mutex::new(std::collections::HashSet::new()), pending_task_ids: Mutex::new(std::collections::HashSet::new()), + pending_responses: Mutex::new(HashMap::new()), api_rate_limits: Mutex::new(HashMap::new()), db: db::NodeDb::new(&std::env::var("DATABASE_PATH").unwrap_or_else(|_| "nodes.db".to_string())), }); @@ -875,11 +877,18 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { } else if msg_type == "llm_done" { // Vapautetaan solmu ja tarkistetaan task_id:n aitous state.node_busy.lock().unwrap().remove(&node_id); - let valid_task = if let Some(tid) = json.get("task_id").and_then(|v| v.as_str()) { - state.pending_task_ids.lock().unwrap().remove(tid) + let task_id = json.get("task_id").and_then(|v| v.as_str()).map(|s| s.to_string()); + let valid_task = if let Some(ref tid) = task_id { + state.pending_task_ids.lock().unwrap().remove(tid.as_str()) } else { false }; + + // Jos API-pyyntö odottaa tätä vastausta, reititetään suoraan oneshot-kanavaan + let api_sender = task_id.as_ref().and_then(|tid| { + state.pending_responses.lock().unwrap().remove(tid) + }); + { let mut json = json; if let Some(obj) = json.as_object_mut() { @@ -899,6 +908,12 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { state.db.increment_tasks(node_id); obj.insert("node_id".to_string(), serde_json::json!(node_id)); } + + if let Some(sender) = api_sender { + // API-pyyntö: reititetään vastaus suoraan odottajalle + let _ = sender.send(json.clone()); + } + // UI-broadcast jatkuu normaalisti let _ = state.stats_tx.send(json.to_string()); let active_incentives = state.feature_flags.read().await.get("Insentiivit").copied().unwrap_or(false); @@ -908,7 +923,7 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { { let mut task_count = state.total_tasks.lock().unwrap(); *task_count += 1; - + if active_incentives && valid_task { let mut tokens = state.nodes_tokens.lock().unwrap(); let balance = tokens.entry(node_id).or_insert(0); @@ -916,7 +931,7 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { current_balance = *balance; } } - + if active_incentives && ui_sync { if let Some(tx) = state.node_channels.read().await.get(&node_id) { let msg = serde_json::json!({ @@ -926,45 +941,50 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { let _ = tx.send(msg.to_string()); } } - + broadcast_stats(&state).await; } } else if msg_type == "llm_error" { state.node_busy.lock().unwrap().remove(&node_id); - if let Some(tid) = json.get("task_id").and_then(|v| v.as_str()) { - state.pending_task_ids.lock().unwrap().remove(tid); + let task_id = json.get("task_id").and_then(|v| v.as_str()).map(|s| s.to_string()); + if let Some(ref tid) = task_id { + state.pending_task_ids.lock().unwrap().remove(tid.as_str()); } + // Jos API-pyyntö odottaa, reititetään virhe oneshot-kanavaan + let api_sender = task_id.as_ref().and_then(|tid| { + state.pending_responses.lock().unwrap().remove(tid) + }); { let mut json = json; if let Some(obj) = json.as_object_mut() { obj.insert("node_id".to_string(), serde_json::json!(node_id)); } + if let Some(sender) = api_sender { + let _ = sender.send(json.clone()); + } let _ = state.stats_tx.send(json.to_string()); } } else if msg_type == "user_text" { - // Käyttäjän lähettämä teksti — broadcastataan pair_taskina ja llm_promptina + // Käyttäjän lähettämä teksti — kohdennettu reititys lähettäjäsolmulle let text = json.get("text").and_then(|v| v.as_str()).unwrap_or("").to_string(); let task_type = json.get("task_type").and_then(|v| v.as_str()).unwrap_or("tokenize"); if !text.is_empty() { let preview: String = text.chars().take(80).collect(); tracing::info!("Solmu {} lähetti oman tekstin ({}): \"{}\"", node_id, task_type, preview); - match task_type { - "tokenize" => { - let msg = serde_json::json!({ - "type": "single_tokenize", - "text": text, - }); - let _ = state.stats_tx.send(msg.to_string()); - } - _ => { - // LLM-prompti: lähetetään VAIN valitulle mallille, ei kaikille (välttää turhaa ruuhkaa ja busy-tiloja) - let prompt = serde_json::json!({ - "type": "llm_prompt", - "prompt": text, - "model": task_type, - }); - let _ = state.stats_tx.send(prompt.to_string()); - } + let msg = match task_type { + "tokenize" => serde_json::json!({ + "type": "single_tokenize", + "text": text, + }), + _ => serde_json::json!({ + "type": "llm_prompt", + "prompt": text, + "model": task_type, + }), + }; + // Lähetetään takaisin lähettäjäsolmulle (käyttäjä haluaa oman tekstinsä tuloksen) + if let Some(tx) = state.node_channels.read().await.get(&node_id) { + let _ = tx.send(msg.to_string()); } } } @@ -1161,8 +1181,9 @@ async fn api_chat_completions( msg.as_object_mut().unwrap().insert("max_tokens".to_string(), serde_json::json!(mt)); } - // Odotuskanava valmiiksi (solmu palauttaa tuloksen stats_tx kautta) - let mut rx = state.stats_tx.subscribe(); + // Oneshot-kanava: solmu palauttaa tuloksen suoraan tälle pyynnölle + let (resp_tx, resp_rx) = tokio::sync::oneshot::channel::(); + state.pending_responses.lock().unwrap().insert(payload.task_id.clone(), resp_tx); // Kohdennettu reititys: lähetetään AI-tehtävä suoraan VAIN valitulle solmulle { @@ -1171,48 +1192,34 @@ async fn api_chat_completions( let _ = tx.send(msg.to_string()); tracing::info!("Reititettiin API-pyyntö solmulle {} (Malli: {})", target_node_id, payload.model); } else { + state.pending_responses.lock().unwrap().remove(&payload.task_id); return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Verkkovirhe: solmun yhteys katkesi reitityksen aikana").into_response(); } } - let timeout = tokio::time::timeout(std::time::Duration::from_secs(600), async move { - loop { - let msg_str = match rx.recv().await { - Ok(msg) => msg, - Err(broadcast::error::RecvError::Lagged(n)) => { - tracing::debug!("API-kanava lagged {} viestiä", n); - continue; - } - Err(_) => return Ok(None), // Kanava suljettu - }; - if let Ok(v) = serde_json::from_str::(&msg_str) { - if v["type"].as_str() == Some("llm_done") { - if let Some(tid) = v["task_id"].as_str() { - if tid == payload.task_id { - return Ok(Some(ChatCompletionResponse { - response: v["response"].as_str().unwrap_or("").to_string(), - model: v["model"].as_str().unwrap_or("").to_string(), - tokens_generated: v["tokens_generated"].as_u64().unwrap_or(0), - })); - } - } - } else if v["type"].as_str() == Some("llm_error") { - if let Some(tid) = v["task_id"].as_str() { - if tid == payload.task_id { - return Err(v["error"].as_str().unwrap_or("Määrittelemätön virhe solmussa").to_string()); - } - } - } - } - } - #[allow(unreachable_code)] - Ok(None) - }).await; + let timeout = tokio::time::timeout(std::time::Duration::from_secs(600), resp_rx).await; match timeout { - Ok(Ok(Some(res))) => axum::Json(res).into_response(), - Ok(Ok(None)) => (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Verkkovirhe: yhteys katkesi").into_response(), - Ok(Err(err)) => (axum::http::StatusCode::CONFLICT, err).into_response(), - Err(_) => (axum::http::StatusCode::GATEWAY_TIMEOUT, "Aikakatkaisu: solmu ei saanut tehtävää ajoissa valmiiksi").into_response(), + Ok(Ok(v)) => { + if v["type"].as_str() == Some("llm_error") { + let err = v["error"].as_str().unwrap_or("Määrittelemätön virhe solmussa").to_string(); + (axum::http::StatusCode::CONFLICT, err).into_response() + } else { + axum::Json(ChatCompletionResponse { + response: v["response"].as_str().unwrap_or("").to_string(), + model: v["model"].as_str().unwrap_or("").to_string(), + tokens_generated: v["tokens_generated"].as_u64().unwrap_or(0), + }).into_response() + } + } + Ok(Err(_)) => { + // Oneshot-kanava sulkeutui (solmu katosi) + state.pending_responses.lock().unwrap().remove(&payload.task_id); + (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Verkkovirhe: yhteys katkesi").into_response() + } + Err(_) => { + state.pending_responses.lock().unwrap().remove(&payload.task_id); + (axum::http::StatusCode::GATEWAY_TIMEOUT, "Aikakatkaisu: solmu ei saanut tehtävää ajoissa valmiiksi").into_response() + } } }