diff --git a/network-poc/hub/src/db.rs b/network-poc/hub/src/db.rs index 1de9ed6..e3f1b77 100644 --- a/network-poc/hub/src/db.rs +++ b/network-poc/hub/src/db.rs @@ -152,6 +152,24 @@ impl NodeDb { conn.last_insert_rowid() } + pub fn update_session_task(&self, node_id: u64, task: &str) { + let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); + let _ = conn.execute( + "UPDATE node_sessions SET selected_task = ?1 WHERE node_id = ?2 AND disconnected_at IS NULL", + params![task, node_id as i64], + ); + } + + /// Sulkee saman IP:n viewer-sessiot kun aktiivinen node liittyy + pub fn close_viewers_by_ip(&self, ip: &str) { + let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); + let now = chrono::Utc::now().to_rfc3339(); + let _ = conn.execute( + "UPDATE node_sessions SET disconnected_at = ?1 WHERE ip = ?2 AND disconnected_at IS NULL AND (selected_task = 'viewer' OR selected_task = 'codelab-viewer')", + params![now, ip], + ); + } + pub fn close_session(&self, node_id: u64) { let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let now = chrono::Utc::now().to_rfc3339(); diff --git a/network-poc/hub/src/main.rs b/network-poc/hub/src/main.rs index f20fa21..8aef7a9 100644 --- a/network-poc/hub/src/main.rs +++ b/network-poc/hub/src/main.rs @@ -25,7 +25,7 @@ const ALLOWED_ORIGINS: &[&str] = &[ ]; // Sallitut viestityyypit clientilta -const ALLOWED_MSG_TYPES: &[&str] = &["auth", "result", "pair_done", "llm_chunk", "llm_done", "download_progress", "user_text"]; +const ALLOWED_MSG_TYPES: &[&str] = &["auth", "result", "pair_done", "llm_chunk", "llm_done", "download_progress", "user_text", "single_tokenize_done"]; struct AppState { next_node_id: Mutex, @@ -158,7 +158,7 @@ async function load() { ].map(s => `
${s.v}
${s.l}
`).join(''); // Sessions — lajittelu: 1) aktiiviset nodet (online + ei viewer), 2) katsojat (online + viewer), 3) offline - const taskNames = {'tokenize':'Tokenisaatio','smollm-135m':'SmolLM 135M','qwen-05b':'Qwen2.5 0.5B','phi3-mini':'Phi-3 Mini','qwen-coder-05b':'Coder 0.5B','qwen-coder-3b':'Coder 3B','viewer':'Katsoja'}; + const taskNames = {'tokenize':'Tokenisaatio','smollm-135m':'SmolLM 135M','qwen-05b':'Qwen2.5 0.5B','phi3-mini':'Phi-3 Mini','qwen-coder-05b':'Coder 0.5B','qwen-coder-3b':'Coder 3B','viewer':'Katsoja','codelab-viewer':'Koodilabra'}; sessions.sort((a, b) => { const aOnline = !a.disconnected_at; const bOnline = !b.disconnected_at; @@ -321,28 +321,9 @@ async fn main() { }); let _ = state_for_task.stats_tx.send(phi3_msg.to_string()); - // Coder-promptit — pieniä Python-tehtäviä - let code_prompts = vec![ - "Write a Python function that checks if a number is prime.", - "Write a Python function that reverses a string without using slicing.", - "Write a Python function to find the factorial of a number using recursion.", - "Write a Python function that returns the Fibonacci sequence up to n numbers.", - "Write a Python function to check if a string is a palindrome.", - "Write a Python function that sorts a list using bubble sort.", - "Write a Python function to count the occurrences of each character in a string.", - "Write a Python function that flattens a nested list.", - "Write a Python function to find the greatest common divisor of two numbers.", - "Write a Python function that converts Celsius to Fahrenheit.", - ]; - let code_idx = (rng_state as usize / 13) % code_prompts.len(); - let coder_msg = serde_json::json!({ - "type": "llm_prompt", - "prompt": code_prompts[code_idx], - "model": "qwen-coder", - }); - let _ = state_for_task.stats_tx.send(coder_msg.to_string()); + // Coder ei saa automaattisia tehtäviä — vain käyttäjän user_text - tracing::debug!("Tehtävät lähetetty: pair + smollm + qwen + phi3 + coder"); + tracing::debug!("Tehtävät lähetetty: pair + smollm + qwen + phi3"); } }); @@ -617,11 +598,21 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { map.insert(node_id, allocated); } - // Tallennetaan sessiotieto tietokantaan - state.db.insert_session(node_id, &ip.to_string(), node_type, &json); - - // Tallennetaan valittu tehtävä muistiin reititystä varten let selected_task = json.get("selected_task").and_then(|v| v.as_str()).unwrap_or("tokenize").to_string(); + let is_viewer = selected_task == "viewer" || selected_task == "codelab-viewer"; + let existing = state.node_tasks.lock().unwrap().contains_key(&node_id); + + if existing { + // Sama yhteys, eri tehtävä → päivitetään + state.db.update_session_task(node_id, &selected_task); + tracing::info!("Solmu {} päivitti tehtävän → {}", node_id, selected_task); + } else { + // Uusi yhteys — suljetaan saman IP:n viewer-sessiot jos tämä on aktiivinen node + if !is_viewer { + state.db.close_viewers_by_ip(&ip.to_string()); + } + state.db.insert_session(node_id, &ip.to_string(), node_type, &json); + } state.node_tasks.lock().unwrap().insert(node_id, selected_task); if node_type == "native" { @@ -726,6 +717,14 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { } broadcast_stats(&state).await; } + } else if msg_type == "single_tokenize_done" { + { + let mut json = json.clone(); + if let Some(obj) = json.as_object_mut() { + obj.insert("node_id".to_string(), serde_json::json!(node_id)); + } + let _ = state.stats_tx.send(json.to_string()); + } } else if msg_type == "llm_chunk" { { let mut json = json; @@ -770,14 +769,11 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { tracing::info!("Solmu {} lähetti oman tekstin ({}): \"{}\"", node_id, task_type, &text[..text.len().min(80)]); match task_type { "tokenize" => { - // Tokenisoidaan käyttäjän teksti EN-puolella, FI jätetään tyhjäksi - let pair = serde_json::json!({ - "type": "pair_task", - "en": text, - "fi": text, - "user_submitted": true, + let msg = serde_json::json!({ + "type": "single_tokenize", + "text": text, }); - let _ = state.stats_tx.send(pair.to_string()); + let _ = state.stats_tx.send(msg.to_string()); } _ => { // LLM-prompti diff --git a/network-poc/node/Cargo.toml b/network-poc/node/Cargo.toml index 26cd341..e7201ae 100644 --- a/network-poc/node/Cargo.toml +++ b/network-poc/node/Cargo.toml @@ -26,9 +26,9 @@ web-sys = { version = "0.3.68", features = [ ] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -burn = { version = "0.21.0-pre.2", default-features = false, features = ["wgpu", "ndarray"] } -burn-wgpu = "0.21.0-pre.2" -burn-ndarray = "0.21.0-pre.2" +burn = { version = "0.14.0", features = ["wgpu", "ndarray"] } +burn-wgpu = "0.14.0" +burn-ndarray = "0.14.0" wasm-bindgen-futures = "0.4" console_error_panic_hook = "0.1.7" reqwest = { version = "0.12", default-features = false, features = ["json"] } diff --git a/network-poc/node/src/lib.rs b/network-poc/node/src/lib.rs index 7df3a50..542f180 100644 --- a/network-poc/node/src/lib.rs +++ b/network-poc/node/src/lib.rs @@ -22,8 +22,15 @@ macro_rules! console_log { static GPU_LOAD_PERCENT: AtomicU32 = AtomicU32::new(50); static HAS_WEBGPU: AtomicBool = AtomicBool::new(true); static SELECTED_TASK: AtomicU32 = AtomicU32::new(0); -// Estää rinnakkaiset LLM-inferenssit (vain yksi kerrallaan) static LLM_BUSY: AtomicBool = AtomicBool::new(false); +// Käsitelläänkö hubin automaattisia tehtäviä +static AUTO_TASKS: AtomicBool = AtomicBool::new(true); + +#[wasm_bindgen] +pub fn set_auto_tasks(enabled: bool) { + AUTO_TASKS.store(enabled, Ordering::SeqCst); + console_log!("[Wasm] Automaattiset tehtävät: {}", if enabled { "päällä" } else { "pois" }); +} #[wasm_bindgen] pub fn set_gpu_load(load: u32) { @@ -110,6 +117,30 @@ fn tokenize_text(tokenizer: &tokenizers::Tokenizer, text: &str) -> serde_json::V } } +/// Tokenisoi yksittäisen tekstin ja lähettää tuloksen hubille +async fn run_single_tokenize(text: String, ws: Rc>) { + let cached_tok = storage::load_from_idb("tokenizer.json").await.unwrap_or(None); + let Some(bytes) = cached_tok else { return; }; + let Ok(tokenizer) = tokenizers::Tokenizer::from_bytes(&bytes) else { return; }; + + let perf = web_sys::window().unwrap().performance().unwrap(); + let start = perf.now(); + let result = tokenize_text(&tokenizer, &text); + let duration_ms = perf.now() - start; + + let token_count = result["token_count"].as_u64().unwrap_or(0); + let cpt = result["chars_per_token"].as_f64().unwrap_or(0.0); + console_log!("Tokenisaatio: \"{}\" → {} tokenia | {:.2} m/t | {:.2}ms", + &text[..text.len().min(50)], token_count, cpt, duration_ms); + + let msg = serde_json::json!({ + "type": "single_tokenize_done", + "result": result, + "duration_ms": (duration_ms * 100.0).round() / 100.0, + }); + let _ = ws.borrow().send_with_str(&msg.to_string()); +} + /// Tokenisoi en/fi-parin, vertaa tehokkuutta ja lähettää tuloksen hubille async fn run_pair_comparison(en_text: String, fi_text: String, ws: Rc>) { let load_pct = GPU_LOAD_PERCENT.load(Ordering::SeqCst); @@ -194,8 +225,9 @@ pub async fn start_agent_node(hub_url: String, has_webgpu: bool, device_info_jso let msg: String = txt.into(); let current_task = SELECTED_TASK.load(Ordering::SeqCst); + let auto_on = AUTO_TASKS.load(Ordering::SeqCst); - if msg.contains("pair_task") && current_task == 0 { + if msg.contains("pair_task") && current_task == 0 && auto_on { // Vain tokenisaatiosolmut käsittelevät pair_task-viestejä if let Ok(task) = serde_json::from_str::(&msg) { let en = task.get("en").and_then(|v| v.as_str()).unwrap_or("").to_string(); @@ -207,7 +239,17 @@ pub async fn start_agent_node(hub_url: String, has_webgpu: bool, device_info_jso }); } } - } else if msg.contains("llm_prompt") && current_task == 1 { + } else if msg.contains("single_tokenize") && current_task == 0 { + if let Ok(task) = serde_json::from_str::(&msg) { + let text = task.get("text").and_then(|v| v.as_str()).unwrap_or("").to_string(); + if !text.is_empty() { + let ws_for_async = ws_clone.clone(); + wasm_bindgen_futures::spawn_local(async move { + run_single_tokenize(text, ws_for_async).await; + }); + } + } + } else if msg.contains("llm_prompt") && current_task == 1 && auto_on { // Vain SmolLM-solmut, ja vain yksi inferenssi kerrallaan if LLM_BUSY.load(Ordering::SeqCst) { // Ohitetaan — edellinen inferenssi vielä käynnissä @@ -223,7 +265,7 @@ pub async fn start_agent_node(hub_url: String, has_webgpu: bool, device_info_jso }); } } - } else if msg.contains("llm_prompt") && current_task == 2 { + } else if msg.contains("llm_prompt") && current_task == 2 && auto_on { // Qwen2.5-0.5B if LLM_BUSY.load(Ordering::SeqCst) { } else if let Ok(task) = serde_json::from_str::(&msg) { @@ -237,7 +279,7 @@ pub async fn start_agent_node(hub_url: String, has_webgpu: bool, device_info_jso }); } } - } else if msg.contains("llm_prompt") && current_task == 3 { + } else if msg.contains("llm_prompt") && current_task == 3 && auto_on { // Phi-3 Mini if LLM_BUSY.load(Ordering::SeqCst) { } else if let Ok(task) = serde_json::from_str::(&msg) { diff --git a/network-poc/node/src/qwen.rs b/network-poc/node/src/qwen.rs index 42be9ec..bf33a44 100644 --- a/network-poc/node/src/qwen.rs +++ b/network-poc/node/src/qwen.rs @@ -199,6 +199,7 @@ pub async fn run_qwen_inference(prompt: String, ws: Rc>) { let _ = ws.borrow().send_with_str(&chunk.to_string()); } tokens_generated += 1; + crate::sleep_ms(0).await; } let gen_time = perf.now() - start_gen; diff --git a/network-poc/node/src/qwen_coder.rs b/network-poc/node/src/qwen_coder.rs index d22d40c..5f6c0f5 100644 --- a/network-poc/node/src/qwen_coder.rs +++ b/network-poc/node/src/qwen_coder.rs @@ -262,6 +262,9 @@ pub async fn run_coder_inference(prompt: String, ws: Rc>, use let _ = ws.borrow().send_with_str(&chunk.to_string()); } tokens_generated += 1; + + // Yield — vapautetaan selaimen event loop joka tokenin jälkeen + crate::sleep_ms(0).await; } let gen_time = perf.now() - start_gen; diff --git a/network-poc/node/src/smollm.rs b/network-poc/node/src/smollm.rs index feb80a2..ada64a2 100644 --- a/network-poc/node/src/smollm.rs +++ b/network-poc/node/src/smollm.rs @@ -118,14 +118,11 @@ pub async fn run_smollm_inference(prompt: String, ws: Rc>) { Err(e) => { console_log!("[SmolLM] Malli-virhe: {}", e); return; } }; - let use_gpu = crate::HAS_WEBGPU.load(std::sync::atomic::Ordering::SeqCst); - if use_gpu { - console_log!("[SmolLM] Burn WebGPU inferenssi..."); - run_burn_inference::(prompt, model_bytes, tokenizer, ws, perf.clone()).await; - } else { - console_log!("[SmolLM] Burn NdArray (CPU) inferenssi..."); - run_burn_inference::(prompt, model_bytes, tokenizer, ws, perf.clone()).await; - } + // Burn 0.14 wgpu ei yhteensopiva nykyisten selainten kanssa (maxInterStageShaderComponents) + // Burn 0.21-pre.2 cubecl-runtime ei käänny Wasmille (println! puuttuu) + // → NdArray kunnes Burn 0.21 stable + Wasm-tuki + console_log!("[SmolLM] Burn NdArray (CPU) inferenssi..."); + run_burn_inference::(prompt, model_bytes, tokenizer, ws, perf.clone()).await; } async fn run_burn_inference( diff --git a/network-poc/static/index.html b/network-poc/static/index.html index 99f97f0..bc080a7 100644 --- a/network-poc/static/index.html +++ b/network-poc/static/index.html @@ -119,6 +119,8 @@ .main-panel { display: none; } .main-panel.active { display: block; } + @keyframes spin { to { transform: rotate(360deg); } } + .code-output { font-family: 'Courier New', Courier, monospace; background: #010409; @@ -500,6 +502,15 @@ + +
+ + (10s välein) +
+
@@ -655,7 +666,7 @@