From 57c6506f91aee964ee208aaf54199de9be5cab09 Mon Sep 17 00:00:00 2001 From: Jaakko Vanhala Date: Sun, 5 Apr 2026 08:25:13 +0300 Subject: [PATCH] Bugikorjaukset ja tietoturvaparannukset: broadcast lag, busy-reititys, rate limiting, gamification-validointi, XSS, base64, DOM-vuoto Co-Authored-By: Claude Opus 4.6 (1M context) --- TODO.md | 34 ++++++++- network-poc/hub/Cargo.toml | 1 + network-poc/hub/src/main.rs | 136 +++++++++++++++++++++++----------- network-poc/static/index.html | 19 +++-- 4 files changed, 140 insertions(+), 50 deletions(-) diff --git a/TODO.md b/TODO.md index 43afc3a..6293074 100644 --- a/TODO.md +++ b/TODO.md @@ -6,4 +6,36 @@ - [x] **Tulosten varmentaminen:** Proof of Compute / Konsensus-mekanismi, jossa sama tehtävä annetaan kahdelle solmulle, ja tila hyväksytään vasta kun ristiintarkastus täsmää. - [x] **Optimaalinen laitekiihdytys:** Selainpuolen laajennus tulevaa WebNN-standardia (NPU API) varten WebGPU:n rinnalle. - [x] **Insentiivit:** Gamifikaatio, pistetaulukko tai token-talous (esim. Kipinä Tokens), joka motivoi käyttäjiä tarjoamaan laitteensa laskentatehoa verkoston käyttöön pidemmäksi aikaa. -- [ ] **Pelimerkkien UI-synkkaus:** Pelimerkkien saldon synkronointi reaaliajassa Hubista takaisin valikossa olevalle selainsolmulle ja luvun visuaalinen näyttäminen. \ No newline at end of file +- [x] **Pelimerkkien UI-synkkaus:** Pelimerkkien saldon synkronointi reaaliajassa Hubista takaisin valikossa olevalle selainsolmulle ja luvun visuaalinen näyttäminen. +- [x] **XSS-suojaus:** HTML-escape kaikelle backend-datalle joka renderöidään DOM:iin (prompt, response, tokenisaatiotekstit). +- [x] **System prompt -vuoto:** Agents-pipelinen system prompt ei enää näy käyttäjälle vastauksissa. +- [x] **Token-saldon data race:** Korjattu atomiseksi operaatioksi. +- [x] **UTF-8 slicing panic:** Korjattu kaikki `&text[..n]` → `text.chars().take(n)`. +- [x] **Tensor dim unwrap:** Lisätty virheenkäsittely tyhjälle tensorille natiivisolmussa. +- [x] **llm_error-viestien tuki:** Lisätty hubiin ja frontendiin, streaming-kortti siivoutuu virhetilanteessa. +- [x] **Malli-cache (selain):** QwenModel pidetään muistissa `thread_local! MODEL_CACHE`:ssa, `clear_kv_cache()` promptien välillä. +- [x] **Malli-cache (natiivi):** `LlmEngine` pitää mallin muistissa, `fresh_model()` poistettu. +- [x] **Sampling:** Greedy argmax korvattu temperature + top-k + repetition penalty -samplingillä (sekä selain että natiivi). +- [x] **Stop-sekvenssit:** Generointi katkaistaan kun malli alkaa tuottaa selityksiä. +- [x] **Codelab/Agents-reititys:** `llm_done` ja `llm_chunk` reitittyy `task_id`:n perusteella oikeaan näkymään. +- [x] **Broadcast Lag:** `RecvError::Lagged` käsitellään gracefully sekä sender-taskissa että API-endpointissa — solmu ei enää tipu verkosta. +- [x] **Busy-tila reititys:** Hub seuraa solmujen busy-tilaa (`node_busy`). Tehtäviä ei enää reititetä varatuille solmuille. +- [x] **Rate limiting:** `/api/v1/chat/completions` rajoittaa max 10 pyyntöä/minuutti per IP. +- [x] **Gamification-validointi:** Kipinä-merkkejä jaetaan vain tehtävistä joiden `task_id` on hubin jakama (`pending_task_ids`). +- [x] **Base64:** Oma base64-dekooderi korvattu `base64`-cratella. +- [x] **Atominen siivous:** Solmun disconnect-siivouksessa kaikki lukot otetaan kerralla. +- [x] **DOM-vuoto:** Terminaalin trim ei enää poista aktiivista streaming-riviä. + +## Havaitut Bugaavat Ominaisuudet ja Arkkitehtuuriongelmat + +### Keskitaso (eivät estä käyttöä) + +- [ ] **Origin-headerin validoinnin ohitus:** Natiivisolmut eivät lähetä Origin-headeria, joten tarkistus ohitetaan. Hyökkääjä voi esiintyä natiivisolmuna. Korjaus: vaadi autentikaatio natiivisolmuilta (API-avain tai token). +- [ ] **Kovakoodattu oletussalasana:** Admin-paneelin oletussalasana on `"kipina"` jos `ADMIN_PASSWORD`-ympäristömuuttujaa ei aseta. Tuotannossa pitää asettaa pakollisesti. Varoitus logitetaan. + +### Arkkitehtuuriparannukset (tulevaisuus) + +- [ ] **E2E-salaus:** Promptit ja vastaukset kulkevat selkokielisinä WebSocketin yli. Placeholder-kommentti koodissa, mutta ei toteutusta. +- [ ] **Proof of Work / konsensus:** Solmu voi lähettää väärennettyjä tuloksia. Merkitty TODO:ksi, mutta ei toteutusta. +- [ ] **WebGPU-inferenssi Candle-mallille:** Selainsolmu käyttää aina CPU:ta Candle-inferenssiin. Candle ei vielä tue WebGPU:ta. +- [ ] **Streaming yield -optimointi:** Pitkillä generoinneilla (>128 tok) selaimen event loop voi jäätyä hetkeksi koska generointilooppi ajetaan synkronisessa closuressa. Korjaus: pilko generointilooppi eriin ja yield joka N:s token. diff --git a/network-poc/hub/Cargo.toml b/network-poc/hub/Cargo.toml index 24715d2..86d6585 100644 --- a/network-poc/hub/Cargo.toml +++ b/network-poc/hub/Cargo.toml @@ -15,3 +15,4 @@ uuid = { version = "1.7.0", features = ["v4", "serde"] } futures = "0.3" rusqlite = { version = "0.31", features = ["bundled"] } chrono = "0.4" +base64 = "0.22" diff --git a/network-poc/hub/src/main.rs b/network-poc/hub/src/main.rs index d486cab..cf8c2ff 100644 --- a/network-poc/hub/src/main.rs +++ b/network-poc/hub/src/main.rs @@ -39,6 +39,9 @@ struct AppState { ip_connections: Mutex>, node_ips: Mutex>, node_tasks: Mutex>, // node_id → selected_task + node_busy: Mutex>, // Solmut joilla on aktiivinen tehtävä + pending_task_ids: Mutex>, // Hubin jakamat task_id:t (gamification-validointi) + api_rate_limits: Mutex>, // IP → (ikkuna-alku, pyyntömäärä) db: db::NodeDb, } @@ -257,6 +260,9 @@ async fn main() { ip_connections: Mutex::new(HashMap::new()), node_ips: Mutex::new(HashMap::new()), node_tasks: Mutex::new(HashMap::new()), + node_busy: Mutex::new(std::collections::HashSet::new()), + pending_task_ids: Mutex::new(std::collections::HashSet::new()), + api_rate_limits: Mutex::new(HashMap::new()), db: db::NodeDb::new(&std::env::var("DATABASE_PATH").unwrap_or_else(|_| "nodes.db".to_string())), }); @@ -419,15 +425,23 @@ async fn api_stats( } fn check_admin_auth(headers: &axum::http::HeaderMap) -> bool { - let password = std::env::var("ADMIN_PASSWORD").unwrap_or_else(|_| "kipina".to_string()); + let password = match std::env::var("ADMIN_PASSWORD") { + Ok(p) if !p.is_empty() => p, + _ => { + tracing::warn!("ADMIN_PASSWORD ei ole asetettu — käytetään oletusta 'kipina' (ÄLÄ käytä tuotannossa!)"); + "kipina".to_string() + } + }; if let Some(auth) = headers.get("authorization").and_then(|v| v.to_str().ok()) { if auth.starts_with("Basic ") { - if let Ok(decoded) = String::from_utf8( - base64_decode(auth.trim_start_matches("Basic ").trim()) - ) { - // Tarkistetaan "user:password" — käyttäjänimi ei väliä - if let Some(pass) = decoded.split(':').nth(1) { - return pass == password; + use base64::Engine; + if let Ok(decoded_bytes) = base64::engine::general_purpose::STANDARD + .decode(auth.trim_start_matches("Basic ").trim()) + { + if let Ok(decoded) = String::from_utf8(decoded_bytes) { + if let Some(pass) = decoded.split(':').nth(1) { + return pass == password; + } } } } @@ -435,20 +449,6 @@ fn check_admin_auth(headers: &axum::http::HeaderMap) -> bool { false } -fn base64_decode(input: &str) -> Vec { - // Yksinkertainen base64-dekooderi - const TABLE: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; - let mut out = Vec::new(); - let bytes: Vec = input.bytes().filter(|&b| b != b'=').collect(); - for chunk in bytes.chunks(4) { - let vals: Vec = chunk.iter().filter_map(|&b| TABLE.iter().position(|&t| t == b).map(|p| p as u8)).collect(); - if vals.len() >= 2 { out.push((vals[0] << 2) | (vals[1] >> 4)); } - if vals.len() >= 3 { out.push((vals[1] << 4) | (vals[2] >> 2)); } - if vals.len() >= 4 { out.push((vals[2] << 6) | vals[3]); } - } - out -} - fn admin_unauthorized() -> axum::response::Response { axum::response::Response::builder() .status(401) @@ -606,12 +606,19 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { let sender_task = tokio::spawn(async move { loop { tokio::select! { - Ok(msg) = rx.recv() => { - if sender.send(Message::Text(msg)).await.is_err() { break; } + result = rx.recv() => { + match result { + Ok(msg) => { + if sender.send(Message::Text(msg)).await.is_err() { break; } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + tracing::debug!("Broadcast lagged {} viestiä — ohitetaan", n); + continue; + } + Err(_) => break, // Kanava suljettu + } } Some(direct_msg) = node_rx.recv() => { - // E2E Encrypt placeholder - tähän tulisi kyseisen the_node_id:n asymmetrisen avaimen salaus - // let encrypted_msg = encrypt_e2e(direct_msg, node_public_key); if sender.send(Message::Text(direct_msg)).await.is_err() { break; } } else => break, @@ -812,6 +819,13 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { let _ = state.stats_tx.send(json.to_string()); } } else if msg_type == "llm_done" { + // Vapautetaan solmu ja tarkistetaan task_id:n aitous + state.node_busy.lock().unwrap().remove(&node_id); + let valid_task = if let Some(tid) = json.get("task_id").and_then(|v| v.as_str()) { + state.pending_task_ids.lock().unwrap().remove(tid) + } else { + false + }; { let mut json = json; if let Some(obj) = json.as_object_mut() { @@ -841,7 +855,7 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { let mut task_count = state.total_tasks.lock().unwrap(); *task_count += 1; - if active_incentives { + if active_incentives && valid_task { let mut tokens = state.nodes_tokens.lock().unwrap(); let balance = tokens.entry(node_id).or_insert(0); *balance += 20; // Palkkio: 20 Kipinä-merkkiä @@ -862,6 +876,10 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { broadcast_stats(&state).await; } } else if msg_type == "llm_error" { + state.node_busy.lock().unwrap().remove(&node_id); + if let Some(tid) = json.get("task_id").and_then(|v| v.as_str()) { + state.pending_task_ids.lock().unwrap().remove(tid); + } { let mut json = json; if let Some(obj) = json.as_object_mut() { @@ -898,23 +916,23 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { } } - // Yhteys katkesi — merkitään session päättyneeksi ja siivotaan + // Yhteys katkesi — merkitään session päättyneeksi ja siivotaan atomisesti state.db.close_session(node_id); - state.node_tasks.lock().unwrap().remove(&node_id); { + // Lukitaan kaikki kerralla, jotta solmu ei ole osittain siivottu + let mut tasks = state.node_tasks.lock().unwrap(); let mut conns = state.ip_connections.lock().unwrap(); + let mut ips = state.node_ips.lock().unwrap(); + let mut vram = state.nodes_vram.lock().unwrap(); + let mut busy = state.node_busy.lock().unwrap(); + tasks.remove(&node_id); + busy.remove(&node_id); if let Some(count) = conns.get_mut(&ip) { *count = count.saturating_sub(1); - if *count == 0 { - conns.remove(&ip); - } + if *count == 0 { conns.remove(&ip); } } - } - { - state.node_ips.lock().unwrap().remove(&node_id); - } - { - state.nodes_vram.lock().unwrap().remove(&node_id); + ips.remove(&node_id); + vram.remove(&node_id); } tracing::info!("Solmu {} ({}) poistui verkosta.", node_id, ip); broadcast_stats(&state).await; @@ -936,28 +954,49 @@ struct ChatCompletionResponse { async fn api_chat_completions( axum::extract::State(state): axum::extract::State>, + ConnectInfo(addr): ConnectInfo, axum::Json(payload): axum::Json, ) -> axum::response::Response { - - // Etsitään ensimmäinen vapaa solmu, joka vastaa pyydettyä mallia + // Rate limiting: max 10 pyyntöä per IP per minuutti + { + let mut limits = state.api_rate_limits.lock().unwrap(); + let now = std::time::Instant::now(); + let entry = limits.entry(addr.ip()).or_insert((now, 0)); + if now.duration_since(entry.0).as_secs() >= 60 { + *entry = (now, 1); // Uusi ikkuna + } else { + entry.1 += 1; + if entry.1 > 10 { + return (axum::http::StatusCode::TOO_MANY_REQUESTS, "Liian monta pyyntöä — yritä minuutin kuluttua").into_response(); + } + } + } + + // Etsitään ensimmäinen VAPAA solmu, joka vastaa pyydettyä mallia let target_node = { let tasks = state.node_tasks.lock().unwrap(); - tasks.iter().find(|(_, task)| { - if payload.model == "qwen-coder" { + let busy = state.node_busy.lock().unwrap(); + tasks.iter().find(|(node_id, task)| { + let model_match = if payload.model == "qwen-coder" { *task == "qwen-coder-05b" || *task == "qwen-coder" } else { **task == payload.model - } + }; + model_match && !busy.contains(node_id) }).map(|(k, _)| *k) }; let target_node_id = match target_node { Some(id) => id, None => { - return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Ei vapaata solmua tälle mallille (Käynnistä malli selaimessa)").into_response(); + return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Ei vapaata solmua tälle mallille (kaikki varattuja tai ei käynnissä)").into_response(); } }; + // Merkitään solmu varatuksi ja task_id jaetuksi + state.node_busy.lock().unwrap().insert(target_node_id); + state.pending_task_ids.lock().unwrap().insert(payload.task_id.clone()); + let msg = serde_json::json!({ "type": "llm_prompt", "prompt": payload.prompt, @@ -980,7 +1019,15 @@ async fn api_chat_completions( } let timeout = tokio::time::timeout(std::time::Duration::from_secs(120), async move { - while let Ok(msg_str) = rx.recv().await { + loop { + let msg_str = match rx.recv().await { + Ok(msg) => msg, + Err(broadcast::error::RecvError::Lagged(n)) => { + tracing::debug!("API-kanava lagged {} viestiä", n); + continue; + } + Err(_) => return Ok(None), // Kanava suljettu + }; if let Ok(v) = serde_json::from_str::(&msg_str) { if v["type"].as_str() == Some("llm_done") { if let Some(tid) = v["task_id"].as_str() { @@ -1001,6 +1048,7 @@ async fn api_chat_completions( } } } + #[allow(unreachable_code)] Ok(None) }).await; diff --git a/network-poc/static/index.html b/network-poc/static/index.html index 2d441d3..98f571e 100644 --- a/network-poc/static/index.html +++ b/network-poc/static/index.html @@ -2066,7 +2066,7 @@ div.style.color = '#a5d6ff'; div.innerHTML = ` ✓ ${model} ${tokGen} tok | ${durMs}ms | ${tokS} tok/s`; term.appendChild(div); - while (term.children.length > 50) term.removeChild(term.firstChild); + while (term.children.length > 50 && !term.firstChild.querySelector('.stream-content')) term.removeChild(term.firstChild); term.scrollTop = term.scrollHeight; document.querySelectorAll('.avatar-card').forEach(c => c.classList.remove('active')); @@ -2192,7 +2192,7 @@ div.className = 'terminal-line'; div.innerHTML = `$ kpn run ${model} "${promptShort}"`; term.appendChild(div); - while (term.children.length > 50) term.removeChild(term.firstChild); + while (term.children.length > 50 && !term.firstChild.querySelector('.stream-content')) term.removeChild(term.firstChild); term.scrollTop = term.scrollHeight; } } @@ -2453,10 +2453,17 @@ if (msg.includes('[Coder]') && msg.includes('model') && msg.includes('tallennettu')) { setStep('step-model', 'done', '100%'); } if (msg.includes('[Coder]') && msg.includes('Rakennetaan')) { setStep('step-build', 'active'); } if (msg.includes('[Coder]') && msg.includes('Malli ladattu')) { - // Malli on valmis — merkataan kaikki vaiheet valmiiksi (myös cache-hitillä) + // Malli on valmis — merkataan kaikki vaiheet valmiiksi setStep('step-wasm', 'done'); setStep('step-tokenizer', 'done'); - setStep('step-model', 'done', 'cache'); + + const pctSpan = document.getElementById('step-model-pct'); + if (pctSpan && pctSpan.textContent.includes('100%')) { + setStep('step-model', 'done', '100%'); + } else { + setStep('step-model', 'done', 'cache'); + } + setStep('step-build', 'done'); setStep('step-ready', 'done'); } @@ -2519,7 +2526,9 @@ coderWsReady = true; if (pendingCodePrompt) { - sendCodeToHub(pendingCodePrompt); + setTimeout(() => { + sendCodeToHub(pendingCodePrompt); + }, 800); pendingCodePrompt = null; } } catch(e) {