diff --git a/network-poc/Dockerfile.prod b/network-poc/Dockerfile.prod index 189c47c..f3e4bbe 100644 --- a/network-poc/Dockerfile.prod +++ b/network-poc/Dockerfile.prod @@ -1,3 +1,4 @@ +# syntax=docker/dockerfile:1 FROM rust:slim AS builder RUN apt-get update && apt-get install -y \ @@ -8,40 +9,34 @@ RUN curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh WORKDIR /app -# 1. Kopioi vain Cargo-tiedostot → riippuvuudet cacheen +# Kopioi kaikki Cargo-tiedostot COPY Cargo.toml ./ COPY Cargo.lock* ./ COPY hub/Cargo.toml hub/Cargo.toml COPY node/Cargo.toml node/Cargo.toml COPY native-node/Cargo.toml native-node/Cargo.toml -# Tyhjät lähteet riippuvuuksien esikääntämistä varten -RUN mkdir -p hub/src node/src native-node/src \ - && echo "fn main(){}" > hub/src/main.rs \ - && echo "" > node/src/lib.rs \ - && mkdir -p node/src && touch node/src/storage.rs \ - && echo "fn main(){}" > native-node/src/main.rs \ - && cargo build --release -p hub 2>/dev/null || true \ - && wasm-pack build node --target web --out-dir ../static/pkg 2>/dev/null || true - -# 2. Kopioi oikea lähdekoodi → vain src käännetään uudelleen +# Kopioi lähdekoodi COPY hub/src hub/src COPY node/src node/src +COPY native-node/src native-node/src COPY static static -# Pakota uudelleenkäännös -RUN touch hub/src/main.rs node/src/lib.rs +# Rakenna Wasm — cache mount pitää Cargo-rekisterin ja target-kansion buildien välillä +RUN --mount=type=cache,target=/usr/local/cargo/registry \ + --mount=type=cache,target=/app/target \ + cd node && wasm-pack build --target web --out-dir ../static/pkg -# Rakenna Wasm-paketti -RUN cd node && wasm-pack build --target web --out-dir ../static/pkg - -# Rakenna Hub release-binääri -RUN cargo build --release -p hub +# Rakenna Hub +RUN --mount=type=cache,target=/usr/local/cargo/registry \ + --mount=type=cache,target=/app/target \ + cargo build --release -p hub \ + && cp /app/target/release/hub /usr/local/bin/hub FROM debian:bookworm-slim RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* -COPY --from=builder /app/target/release/hub /usr/local/bin/hub +COPY --from=builder /usr/local/bin/hub /usr/local/bin/hub COPY --from=builder /app/static /app/static WORKDIR /app diff --git a/network-poc/deploy.sh b/network-poc/deploy.sh index 63900fb..0f6216b 100755 --- a/network-poc/deploy.sh +++ b/network-poc/deploy.sh @@ -3,7 +3,14 @@ set -e SERVER="ubuntu@86.50.252.98" REMOTE_DIR="~/code/agentic-studio/network-poc" -SSH_OPTS="-o StrictHostKeyChecking=no" +KEY="$HOME/.ssh/id_rsa" +SSH_OPTS="-o StrictHostKeyChecking=no -i $KEY" + +# Varmistetaan, että SSH-avain on agentissa +if ! ssh-add -l 2>/dev/null | grep -q id_rsa; then + echo "SSH-avain ei ole agentissa. Lisätään..." + ssh-add "$KEY" +fi echo "=== Kipinä Studio Deploy ===" @@ -11,16 +18,21 @@ echo "=== Kipinä Studio Deploy ===" echo "[1/4] Rakennetaan image lokaalisti..." docker build -f Dockerfile.prod -t kipina-agentic:latest . -# 2. Tallennetaan ja siirretään -echo "[2/4] Siirretään image palvelimelle..." -docker save kipina-agentic:latest | gzip | ssh $SSH_OPTS $SERVER "gunzip | docker load" +# 2. Tallennetaan tiedostoon +echo "[2/5] Pakataan image..." +docker save kipina-agentic:latest | gzip > /tmp/kipina-agentic.tar.gz +echo " Koko: $(du -h /tmp/kipina-agentic.tar.gz | cut -f1)" -# 3. Päivitetään konfiguraatiot -echo "[3/4] Päivitetään konfiguraatiot..." +# 3. Siirretään palvelimelle +echo "[3/5] Siirretään palvelimelle..." +scp $SSH_OPTS /tmp/kipina-agentic.tar.gz $SERVER:/tmp/ scp $SSH_OPTS docker-compose.prod.yml Caddyfile.prod $SERVER:$REMOTE_DIR/ -# 4. Käynnistetään uudelleen -echo "[4/4] Käynnistetään palvelut..." -ssh $SSH_OPTS $SERVER "cd $REMOTE_DIR && docker compose -f docker-compose.prod.yml up -d" +# 4. Ladataan image ja käynnistetään +echo "[4/5] Ladataan image palvelimella..." +ssh $SSH_OPTS $SERVER "gunzip -c /tmp/kipina-agentic.tar.gz | docker load && rm /tmp/kipina-agentic.tar.gz" + +echo "[5/5] Käynnistetään palvelut uudelleen..." +ssh $SSH_OPTS $SERVER "cd $REMOTE_DIR && docker compose -f docker-compose.prod.yml down && docker compose -f docker-compose.prod.yml up -d" echo "=== Valmis! https://kipina.studio ===" diff --git a/network-poc/docker-compose.yml b/network-poc/docker-compose.yml index 46d4c4b..f85ae0b 100644 --- a/network-poc/docker-compose.yml +++ b/network-poc/docker-compose.yml @@ -9,7 +9,7 @@ services: volumes: - .:/app # Käännetään aina käynnistyksen yhteydessä varmuuden vuoksi Wasm uusimmista koodeista, ja päälle pyöräytetään Hub! - command: bash -c "cd node && wasm-pack build --target web --out-dir ../static/pkg && cd ../hub && cargo run" + command: bash -c "cd node && wasm-pack build --dev --target web --out-dir ../static/pkg && cd ../hub && cargo run" # Valinnainen natiivi-solmu — kerää oikeat laitteistotiedot (nvidia-smi-taso) native-node: diff --git a/network-poc/hub/Cargo.toml b/network-poc/hub/Cargo.toml index 5820065..a2943f8 100644 --- a/network-poc/hub/Cargo.toml +++ b/network-poc/hub/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "hub" version = "0.2.0" -edition = "2021" +edition = "2024" [dependencies] axum = { version = "0.7.4", features = ["ws", "macros"] } diff --git a/network-poc/hub/nodes.db b/network-poc/hub/nodes.db new file mode 100644 index 0000000..1314271 Binary files /dev/null and b/network-poc/hub/nodes.db differ diff --git a/network-poc/hub/src/db.rs b/network-poc/hub/src/db.rs index 3af9538..1de9ed6 100644 --- a/network-poc/hub/src/db.rs +++ b/network-poc/hub/src/db.rs @@ -9,6 +9,24 @@ impl NodeDb { pub fn new(path: &str) -> Self { let conn = Connection::open(path).expect("SQLite-tietokantaa ei voitu avata"); + // Poista vanha tietokanta jos skeema on rikki — PoC, ei tuotantodata + let _ = conn.execute_batch(" + CREATE TABLE IF NOT EXISTS _schema_version (version INTEGER); + "); + let version: i64 = conn.query_row( + "SELECT COALESCE(MAX(version), 0) FROM _schema_version", [], |r| r.get(0) + ).unwrap_or(0); + + if version < 2 { + // Pudotetaan vanhat taulut ja luodaan uudet + let _ = conn.execute_batch(" + DROP TABLE IF EXISTS node_sessions; + DROP TABLE IF EXISTS pair_results; + DELETE FROM _schema_version; + INSERT INTO _schema_version VALUES (2); + "); + } + conn.execute_batch(" CREATE TABLE IF NOT EXISTS node_sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -35,8 +53,9 @@ impl NodeDb { gpu_temp_c INTEGER, gpu_util_pct INTEGER, - -- Varaus + -- Varaus ja tehtävä allocated_gb INTEGER, + selected_task TEXT DEFAULT 'tokenize', -- WebGPU-tuki has_webgpu BOOLEAN, @@ -70,7 +89,7 @@ impl NodeDb { node_type: &str, auth_data: &serde_json::Value, ) -> i64 { - let conn = self.conn.lock().unwrap(); + let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let now = chrono::Utc::now().to_rfc3339(); // Selainsolmun tiedot @@ -78,6 +97,7 @@ impl NodeDb { let cpu_cores = auth_data.get("cpu_cores").and_then(|v| v.as_u64()); let ram = auth_data.get("device_memory_gb").and_then(|v| v.as_f64()).map(|v| (v * 1024.0) as i64); let allocated = auth_data.get("allocated_gb").and_then(|v| v.as_u64()); + let selected_task = auth_data.get("selected_task").and_then(|v| v.as_str()); // GPU (selain) let gpu_vendor = auth_data.get("gpu").and_then(|g| g.get("vendor")).and_then(|v| v.as_str()); @@ -108,8 +128,8 @@ impl NodeDb { node_id, ip, node_type, connected_at, platform, hostname, os, cpu_cores, cpu_model, ram_mb, gpu_name, gpu_vendor, gpu_backend, vram_total_mb, vram_used_mb, gpu_temp_c, gpu_util_pct, - allocated_gb, has_webgpu - ) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19)", + allocated_gb, selected_task, has_webgpu + ) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19,?20)", params![ node_id as i64, ip, node_type, now, platform, hostname, os, @@ -124,6 +144,7 @@ impl NodeDb { gpu_temp.map(|v| v as i64), gpu_util.map(|v| v as i64), allocated.map(|v| v as i64), + selected_task, has_webgpu, ], ).expect("Session insert epäonnistui"); @@ -132,7 +153,7 @@ impl NodeDb { } pub fn close_session(&self, node_id: u64) { - let conn = self.conn.lock().unwrap(); + let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let now = chrono::Utc::now().to_rfc3339(); let _ = conn.execute( "UPDATE node_sessions SET disconnected_at = ?1 WHERE node_id = ?2 AND disconnected_at IS NULL", @@ -141,7 +162,7 @@ impl NodeDb { } pub fn increment_tasks(&self, node_id: u64) { - let conn = self.conn.lock().unwrap(); + let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let _ = conn.execute( "UPDATE node_sessions SET tasks_completed = tasks_completed + 1 WHERE node_id = ?1 AND disconnected_at IS NULL", params![node_id as i64], @@ -149,12 +170,12 @@ impl NodeDb { } pub fn get_sessions(&self, limit: u32) -> Vec { - let conn = self.conn.lock().unwrap(); + let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let mut stmt = conn.prepare( "SELECT id, node_id, ip, node_type, connected_at, disconnected_at, platform, hostname, os, cpu_cores, cpu_model, ram_mb, gpu_name, gpu_vendor, gpu_backend, vram_total_mb, gpu_temp_c, gpu_util_pct, - allocated_gb, has_webgpu, tasks_completed + allocated_gb, selected_task, has_webgpu, tasks_completed FROM node_sessions ORDER BY id DESC LIMIT ?1" ).unwrap(); @@ -179,14 +200,15 @@ impl NodeDb { "gpu_temp_c": row.get::<_, Option>(16)?, "gpu_util_pct": row.get::<_, Option>(17)?, "allocated_gb": row.get::<_, Option>(18)?, - "has_webgpu": row.get::<_, Option>(19)?, - "tasks_completed": row.get::<_, i64>(20)?, + "selected_task": row.get::<_, Option>(19)?, + "has_webgpu": row.get::<_, Option>(20)?, + "tasks_completed": row.get::<_, i64>(21)?, })) }).unwrap().filter_map(|r| r.ok()).collect() } pub fn get_pair_results(&self, limit: u32) -> Vec { - let conn = self.conn.lock().unwrap(); + let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let mut stmt = conn.prepare( "SELECT id, node_id, created_at, en_text, fi_text, en_tokens, fi_tokens, en_chars_per_token, fi_chars_per_token, @@ -212,7 +234,7 @@ impl NodeDb { } pub fn get_stats(&self) -> serde_json::Value { - let conn = self.conn.lock().unwrap(); + let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let total_sessions: i64 = conn.query_row("SELECT COUNT(*) FROM node_sessions", [], |r| r.get(0)).unwrap_or(0); let active_sessions: i64 = conn.query_row("SELECT COUNT(*) FROM node_sessions WHERE disconnected_at IS NULL", [], |r| r.get(0)).unwrap_or(0); @@ -247,7 +269,7 @@ impl NodeDb { overhead: f64, duration_ms: f64, ) { - let conn = self.conn.lock().unwrap(); + let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let now = chrono::Utc::now().to_rfc3339(); let _ = conn.execute( "INSERT INTO pair_results ( diff --git a/network-poc/hub/src/main.rs b/network-poc/hub/src/main.rs index e6da32d..ce7f2bc 100644 --- a/network-poc/hub/src/main.rs +++ b/network-poc/hub/src/main.rs @@ -25,7 +25,7 @@ const ALLOWED_ORIGINS: &[&str] = &[ ]; // Sallitut viestityyypit clientilta -const ALLOWED_MSG_TYPES: &[&str] = &["auth", "result", "pair_done", "llm_chunk"]; +const ALLOWED_MSG_TYPES: &[&str] = &["auth", "result", "pair_done", "llm_chunk", "llm_done", "download_progress"]; struct AppState { next_node_id: Mutex, @@ -34,6 +34,7 @@ struct AppState { stats_tx: broadcast::Sender, ip_connections: Mutex>, node_ips: Mutex>, + node_tasks: Mutex>, // node_id → selected_task db: db::NodeDb, } @@ -86,7 +87,7 @@ tr:hover td { background:#1c2333; }
- +
IDTilaTyyppiIPAlustaIDTilaTehtäväTyyppiIPAlusta OSCPURAMGPUVRAM WebGPUTeht.YhdistettyKesto
@@ -161,6 +162,8 @@ async function load() { const online = !s.disconnected_at; const status = online ? 'ONLINE' : 'offline'; const typeBadge = s.node_type === 'native' ? badge('native','blue') : badge('browser','yellow'); + const taskNames = {'tokenize':'Tokenisaatio','smollm-135m':'SmolLM 135M','qwen-05b':'Qwen2.5 0.5B','phi3-mini':'Phi-3 Mini'}; + const taskBadge = badge(taskNames[s.selected_task] || s.selected_task || 'tokenize', s.selected_task === 'tokenize' ? 'green' : 'blue'); const gpuBadge = s.has_webgpu ? badge('WebGPU','green') : badge('CPU','red'); const gpu = s.gpu_name ? `${s.gpu_name}` : '-'; const vram = s.vram_total_mb ? `${s.vram_total_mb} MB` : '-'; @@ -171,7 +174,7 @@ async function load() { const time = s.connected_at ? new Date(s.connected_at).toLocaleString('fi-FI') : ''; const dur = duration(s.connected_at, s.disconnected_at); return ` - ${s.node_id}${status}${typeBadge}${s.ip} + ${s.node_id}${status}${taskBadge}${typeBadge}${s.ip} ${plat}${os}${cores}${ram} ${gpu}${vram}${gpuBadge} ${s.tasks_completed}${time}${dur} @@ -221,6 +224,7 @@ async fn main() { stats_tx: stats_tx.clone(), ip_connections: Mutex::new(HashMap::new()), node_ips: Mutex::new(HashMap::new()), + node_tasks: Mutex::new(HashMap::new()), db: db::NodeDb::new(&std::env::var("DATABASE_PATH").unwrap_or_else(|_| "nodes.db".to_string())), }); @@ -258,13 +262,31 @@ async fn main() { let idx = (rng_state as usize) % pairs.len(); let (en, fi) = pairs[idx]; - let task_msg = serde_json::json!({ + // Tokenisointiparit + let pair_msg = serde_json::json!({ "type": "pair_task", "en": en, "fi": fi, }); - tracing::debug!("Kielipari lähetetty: EN({}) vs FI({} merkkiä)", en.len(), fi.len()); - let _ = state_for_task.stats_tx.send(task_msg.to_string()); + let _ = state_for_task.stats_tx.send(pair_msg.to_string()); + + // LLM-promptit + let llm_prompts = vec![ + "Tell me a short joke.", + "What is WebGPU in one sentence?", + "Explain distributed computing briefly.", + "Write a haiku about technology.", + "What makes Rust special?", + ]; + let llm_idx = (rng_state as usize / 7) % llm_prompts.len(); + let llm_msg = serde_json::json!({ + "type": "llm_prompt", + "prompt": llm_prompts[llm_idx], + "model": "smollm-135m", + }); + let _ = state_for_task.stats_tx.send(llm_msg.to_string()); + + tracing::debug!("Tehtävät lähetetty: pair + llm_prompt"); } }); @@ -496,6 +518,10 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { // Tallennetaan sessiotieto tietokantaan state.db.insert_session(node_id, &ip.to_string(), node_type, &json); + // Tallennetaan valittu tehtävä muistiin reititystä varten + let selected_task = json.get("selected_task").and_then(|v| v.as_str()).unwrap_or("tokenize").to_string(); + state.node_tasks.lock().unwrap().insert(node_id, selected_task); + if node_type == "native" { let sys = json.get("system"); let hostname = sys.and_then(|s| s.get("hostname")).and_then(|v| v.as_str()).unwrap_or("?"); @@ -529,10 +555,11 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { .and_then(|g| g.get("description").or_else(|| g.get("vendor"))) .and_then(|v| v.as_str()) .unwrap_or("ei GPU:ta"); + let task = json.get("selected_task").and_then(|v| v.as_str()).unwrap_or("tokenize"); tracing::info!( - "Solmu {} (selain) | {} | {} | {} ydintä | ~{} GB RAM | GPU: {} | varaus: {} GB", - node_id, ip, platform, cores, ram, gpu_desc, allocated + "Solmu {} (selain) | {} | {} | {} ydintä | ~{} GB RAM | GPU: {} | tehtävä: {} | varaus: {} GB", + node_id, ip, platform, cores, ram, gpu_desc, task, allocated ); } } @@ -605,11 +632,40 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { } let _ = state.stats_tx.send(json.to_string()); } + } else if msg_type == "llm_done" { + { + let mut json = json; + if let Some(obj) = json.as_object_mut() { + let model = obj.get("model").and_then(|v| v.as_str()).unwrap_or("?"); + let prompt = obj.get("prompt").and_then(|v| v.as_str()).unwrap_or(""); + let response = obj.get("response").and_then(|v| v.as_str()).unwrap_or(""); + let tok_gen = obj.get("tokens_generated").and_then(|v| v.as_u64()).unwrap_or(0); + let duration = obj.get("duration_ms").and_then(|v| v.as_f64()).unwrap_or(0.0); + let tok_s = obj.get("tokens_per_sec").and_then(|v| v.as_f64()).unwrap_or(0.0); + + println!(); + println!("\x1b[35m━━━ Solmu {} ━━━ {} ━━━\x1b[0m", node_id, model); + println!(" Prompt: \x1b[33m\"{}\"\x1b[0m", prompt); + println!(" Vastaus: \x1b[32m{}\x1b[0m", response); + println!(" {} tokenia | {:.0}ms | \x1b[36m{:.1} tok/s\x1b[0m", tok_gen, duration, tok_s); + + state.db.increment_tasks(node_id); + obj.insert("node_id".to_string(), serde_json::json!(node_id)); + } + let _ = state.stats_tx.send(json.to_string()); + + { + let mut task_count = state.total_tasks.lock().unwrap(); + *task_count += 1; + } + broadcast_stats(&state).await; + } } } // Yhteys katkesi — merkitään session päättyneeksi ja siivotaan state.db.close_session(node_id); + state.node_tasks.lock().unwrap().remove(&node_id); { let mut conns = state.ip_connections.lock().unwrap(); if let Some(count) = conns.get_mut(&ip) { diff --git a/network-poc/native-node/Cargo.toml b/network-poc/native-node/Cargo.toml index 7db596c..ec9736b 100644 --- a/network-poc/native-node/Cargo.toml +++ b/network-poc/native-node/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "native-node" version = "0.1.0" -edition = "2021" +edition = "2024" [dependencies] tokio = { version = "1.36", features = ["full"] } diff --git a/network-poc/node/Cargo.toml b/network-poc/node/Cargo.toml index 1c6d81f..e7201ae 100644 --- a/network-poc/node/Cargo.toml +++ b/network-poc/node/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "node" version = "0.1.0" -edition = "2021" +edition = "2024" [lib] crate-type = ["cdylib"] @@ -17,6 +17,12 @@ web-sys = { version = "0.3.68", features = [ "MessageEvent", "Performance", "console", + "Request", + "RequestInit", + "Response", + "Headers", + "ReadableStream", + "ReadableStreamDefaultReader", ] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -29,4 +35,8 @@ reqwest = { version = "0.12", default-features = false, features = ["json"] } tokenizers = { version = "0.19.1", default-features = false, features = ["unstable_wasm"] } rexie = "0.6" log = "0.4" +candle-core = { version = "0.8" } +candle-nn = "0.8" +candle-transformers = "0.8" +getrandom = { version = "0.3", features = ["wasm_js"] } diff --git a/network-poc/node/src/lib.rs b/network-poc/node/src/lib.rs index 3542d13..c0d532c 100644 --- a/network-poc/node/src/lib.rs +++ b/network-poc/node/src/lib.rs @@ -1,5 +1,5 @@ use wasm_bindgen::prelude::*; -use web_sys::{console, WebSocket, MessageEvent}; +use web_sys::{WebSocket, MessageEvent}; use std::cell::RefCell; use std::rc::Rc; use std::sync::atomic::{AtomicU32, AtomicBool, Ordering}; @@ -7,15 +7,17 @@ use burn::tensor::Tensor; use burn::backend::{Wgpu, NdArray}; pub mod storage; +pub mod smollm; +#[macro_export] macro_rules! console_log { - ($($t:tt)*) => (console::log_1(&format_args!($($t)*).to_string().into())) + ($($t:tt)*) => (web_sys::console::log_1(&format_args!($($t)*).to_string().into())) } -// Globaali muuttuja GPU Load Sliderille (25-100%) static GPU_LOAD_PERCENT: AtomicU32 = AtomicU32::new(50); -// Onko WebGPU käytettävissä — asetetaan JS-puolelta käynnistyksessä static HAS_WEBGPU: AtomicBool = AtomicBool::new(true); +// Valittu tehtävä: 0=tokenize, 1=smollm-135m, 2=qwen-05b, 3=phi3-mini +static SELECTED_TASK: AtomicU32 = AtomicU32::new(0); #[wasm_bindgen] pub fn set_gpu_load(load: u32) { @@ -148,12 +150,15 @@ async fn run_pair_comparison(en_text: String, fi_text: String, ws: Rc Result<(), JsValue> { +pub async fn start_agent_node(hub_url: String, has_webgpu: bool, device_info_json: String, task_id: u32) -> Result<(), JsValue> { console_error_panic_hook::set_once(); HAS_WEBGPU.store(has_webgpu, Ordering::SeqCst); + SELECTED_TASK.store(task_id, Ordering::SeqCst); let backend_name = if has_webgpu { "WebGPU" } else { "CPU (NdArray)" }; - console_log!("Kipinä Agent Node käynnistyy — backend: {}", backend_name); + let task_names = ["tokenize", "smollm-135m", "qwen-05b", "phi3-mini"]; + let task_name = task_names.get(task_id as usize).unwrap_or(&"tokenize"); + console_log!("Kipinä Agent Node käynnistyy — backend: {} | tehtävä: {}", backend_name, task_name); let device_info = device_info_json.clone(); @@ -182,7 +187,10 @@ pub async fn start_agent_node(hub_url: String, has_webgpu: bool, device_info_jso if let Ok(txt) = e.data().dyn_into::() { let msg: String = txt.into(); - if msg.contains("pair_task") { + let current_task = SELECTED_TASK.load(Ordering::SeqCst); + + if msg.contains("pair_task") && current_task == 0 { + // Vain tokenisaatiosolmut käsittelevät pair_task-viestejä if let Ok(task) = serde_json::from_str::(&msg) { let en = task.get("en").and_then(|v| v.as_str()).unwrap_or("").to_string(); let fi = task.get("fi").and_then(|v| v.as_str()).unwrap_or("").to_string(); @@ -193,6 +201,18 @@ pub async fn start_agent_node(hub_url: String, has_webgpu: bool, device_info_jso }); } } + } else if msg.contains("llm_prompt") && current_task == 1 { + // Vain SmolLM-solmut käsittelevät llm_prompt-viestejä + if let Ok(task) = serde_json::from_str::(&msg) { + let prompt = task.get("prompt").and_then(|v| v.as_str()).unwrap_or("").to_string(); + let model = task.get("model").and_then(|v| v.as_str()).unwrap_or("").to_string(); + if !prompt.is_empty() && model == "smollm-135m" { + let ws_for_async = ws_clone.clone(); + wasm_bindgen_futures::spawn_local(async move { + smollm::run_smollm_inference(prompt, ws_for_async).await; + }); + } + } } else if msg.contains("ai_task") { console_log!("Hub task vastaanotettu, ajetaan GPU:lla..."); let ws_for_async = ws_clone.clone(); diff --git a/network-poc/node/src/smollm.rs b/network-poc/node/src/smollm.rs new file mode 100644 index 0000000..5b70304 --- /dev/null +++ b/network-poc/node/src/smollm.rs @@ -0,0 +1,246 @@ +use candle_core::{Device, Tensor, DType}; +use candle_nn::VarBuilder; +use candle_transformers::models::llama::{Llama, LlamaConfig, LlamaEosToks, Cache}; +use candle_transformers::generation::LogitsProcessor; +use wasm_bindgen::JsCast; +use std::cell::RefCell; +use std::rc::Rc; +use web_sys::WebSocket; + +use crate::storage; + +macro_rules! console_log { + ($($t:tt)*) => (web_sys::console::log_1(&format_args!($($t)*).to_string().into())) +} + +const MODEL_URL: &str = "https://huggingface.co/HuggingFaceTB/SmolLM-135M-Instruct/resolve/main/model.safetensors"; +const TOKENIZER_URL: &str = "https://huggingface.co/HuggingFaceTB/SmolLM-135M-Instruct/resolve/main/tokenizer.json"; + +/// Lataa tiedosto HuggingFacesta streaming-latauksella (progress-ilmoitukset) ja tallentaa IndexedDB:hen +async fn ensure_cached(key: &str, url: &str, ws: &Rc>) -> Result, String> { + if let Ok(Some(bytes)) = storage::load_from_idb(key).await { + console_log!("[SmolLM] {} löytyi välimuistista ({} MB)", key, bytes.len() / 1024 / 1024); + send_progress(ws, key, 100, bytes.len(), bytes.len()); + return Ok(bytes); + } + + console_log!("[SmolLM] Ladataan {}...", key); + send_progress(ws, key, 0, 0, 0); + + // Fetch API:lla saadaan Content-Length ja streaming-luku + let window = web_sys::window().unwrap(); + let resp_val = wasm_bindgen_futures::JsFuture::from(window.fetch_with_str(url)) + .await.map_err(|e| format!("Fetch epäonnistui: {:?}", e))?; + let resp: web_sys::Response = resp_val.dyn_into().map_err(|_| "Ei Response-objekti".to_string())?; + + if !resp.ok() { + return Err(format!("HTTP {}", resp.status())); + } + + // Kokonaiskoko Content-Length-headerista + let total_size: usize = resp.headers() + .get("content-length").ok().flatten() + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + + let body = resp.body().ok_or("Ei bodyä")?; + let reader = body.get_reader(); + let reader: web_sys::ReadableStreamDefaultReader = reader.dyn_into().map_err(|_| "Ei ReadableStreamDefaultReader".to_string())?; + + let mut data: Vec = Vec::with_capacity(total_size); + let mut last_pct: u32 = 0; + + loop { + let chunk = wasm_bindgen_futures::JsFuture::from(reader.read()) + .await.map_err(|e| format!("Luku epäonnistui: {:?}", e))?; + + let done = js_sys::Reflect::get(&chunk, &"done".into()) + .map_err(|_| "done-kenttä puuttuu".to_string())? + .as_bool().unwrap_or(true); + + if done { break; } + + let value = js_sys::Reflect::get(&chunk, &"value".into()) + .map_err(|_| "value-kenttä puuttuu".to_string())?; + let array = js_sys::Uint8Array::new(&value); + let mut buf = vec![0u8; array.length() as usize]; + array.copy_to(&mut buf); + data.extend_from_slice(&buf); + + // Progress-päivitys (joka 5%) + if total_size > 0 { + let pct = ((data.len() as f64 / total_size as f64) * 100.0) as u32; + if pct >= last_pct + 5 || pct == 100 { + last_pct = pct; + console_log!("[SmolLM] {} lataus: {}% ({}/{} MB)", key, pct, data.len() / 1024 / 1024, total_size / 1024 / 1024); + send_progress(ws, key, pct, data.len(), total_size); + } + } + } + + console_log!("[SmolLM] Tallennetaan {} ({} MB) IndexedDB:hen...", key, data.len() / 1024 / 1024); + let _ = storage::save_to_idb(key, &data).await; + console_log!("[SmolLM] {} tallennettu!", key); + send_progress(ws, key, 100, data.len(), data.len()); + + Ok(data) +} + +fn send_progress(ws: &Rc>, file: &str, pct: u32, loaded: usize, total: usize) { + let msg = serde_json::json!({ + "type": "download_progress", + "file": file, + "pct": pct, + "loaded_mb": loaded / 1024 / 1024, + "total_mb": total / 1024 / 1024, + }); + let _ = ws.borrow().send_with_str(&msg.to_string()); +} + +/// Lataa malli ja tokenizer, suorita inferenssi ja streamaa tokenit hubille +pub async fn run_smollm_inference(prompt: String, ws: Rc>) { + let perf = web_sys::window().unwrap().performance().unwrap(); + + // 1. Lataa tokenizer + let tok_bytes = match ensure_cached("smollm-tokenizer.json", TOKENIZER_URL, &ws).await { + Ok(b) => b, + Err(e) => { console_log!("[SmolLM] Tokenizer-virhe: {}", e); return; } + }; + + let tokenizer = match tokenizers::Tokenizer::from_bytes(&tok_bytes) { + Ok(t) => t, + Err(e) => { console_log!("[SmolLM] Tokenizer-parsinta epäonnistui: {}", e); return; } + }; + + // 2. Lataa mallin painot + let model_bytes = match ensure_cached("smollm-model.safetensors", MODEL_URL, &ws).await { + Ok(b) => b, + Err(e) => { console_log!("[SmolLM] Malli-virhe: {}", e); return; } + }; + + console_log!("[SmolLM] Rakennetaan mallia..."); + let start_load = perf.now(); + + let device = Device::Cpu; + let dtype = DType::F32; + + // Parsitaan safetensors + let tensors = match candle_core::safetensors::load_buffer(&model_bytes, &device) { + Ok(t) => t, + Err(e) => { console_log!("[SmolLM] Safetensors-parsinta epäonnistui: {}", e); return; } + }; + + let vb = VarBuilder::from_tensors(tensors, dtype, &device); + + // SmolLM-135M config (Llama-arkkitehtuuri) + let config = LlamaConfig { + hidden_size: 576, + intermediate_size: 1536, + vocab_size: 49152, + num_hidden_layers: 30, + num_attention_heads: 9, + num_key_value_heads: Some(3), + rms_norm_eps: 1e-5, + rope_theta: 10000.0, + max_position_embeddings: 2048, + tie_word_embeddings: Some(true), + bos_token_id: Some(1u32), + eos_token_id: Some(LlamaEosToks::Single(2)), + rope_scaling: None, + }; + + let llama_config = config.into_config(false); // false = ei flash attention + let mut cache = Cache::new(true, dtype, &llama_config, &device).unwrap(); + + let model = match Llama::load(vb, &llama_config) { + Ok(m) => m, + Err(e) => { console_log!("[SmolLM] Mallin lataus epäonnistui: {}", e); return; } + }; + + let load_time = perf.now() - start_load; + console_log!("[SmolLM] Malli ladattu ({:.0}ms). Generoidaan...", load_time); + + // 3. Tokenisoi syöte + let encoding = match tokenizer.encode(prompt.as_str(), true) { + Ok(e) => e, + Err(e) => { console_log!("[SmolLM] Tokenisointivirhe: {}", e); return; } + }; + + let input_ids: Vec = encoding.get_ids().to_vec(); + let input_len = input_ids.len(); + console_log!("[SmolLM] Syöte: {} tokenia", input_len); + + // 4. Generoi tokeneita + let start_gen = perf.now(); + let mut logits_processor = LogitsProcessor::new(42, Some(0.8), Some(0.95)); + let mut all_tokens = input_ids.clone(); + let max_new_tokens = 64; + let mut generated_text = String::new(); + + for i in 0..max_new_tokens { + let context_tokens = if i == 0 { + all_tokens.as_slice() + } else { + std::slice::from_ref(all_tokens.last().unwrap()) + }; + + let input = Tensor::new(context_tokens, &device).unwrap().unsqueeze(0).unwrap(); + let seq_len = input.dim(1).unwrap(); + + let logits = match model.forward(&input, input_len + i - seq_len, &mut cache) { + Ok(l) => l, + Err(e) => { console_log!("[SmolLM] Forward-virhe stepissä {}: {}", i, e); break; } + }; + + // Viimeisen tokenin logitit + let logits = logits.squeeze(0).unwrap(); + let last_dim = logits.dim(0).unwrap(); + let logits = if last_dim > 1 { + logits.get(last_dim - 1).unwrap() + } else { + logits.get(0).unwrap() + }; + + let next_token = logits_processor.sample(&logits).unwrap(); + + // EOS-tarkistus + if next_token == 2 { + break; + } + + all_tokens.push(next_token); + + // Dekoodaa token tekstiksi + if let Ok(text) = tokenizer.decode(&[next_token], true) { + generated_text.push_str(&text); + + // Streamaa token hubille + let chunk = serde_json::json!({ + "type": "llm_chunk", + "token": text, + "is_last": false, + "prompt": prompt, + "model": "SmolLM-135M" + }); + let _ = ws.borrow().send_with_str(&chunk.to_string()); + } + } + + let gen_time = perf.now() - start_gen; + let tokens_generated = all_tokens.len() - input_len; + let tokens_per_sec = if gen_time > 0.0 { (tokens_generated as f64 / gen_time) * 1000.0 } else { 0.0 }; + + console_log!("[SmolLM] Generoitu {} tokenia | {:.0}ms | {:.1} tok/s", tokens_generated, gen_time, tokens_per_sec); + + let done = serde_json::json!({ + "type": "llm_done", + "prompt": prompt, + "model": "SmolLM-135M-Instruct", + "response": generated_text, + "tokens_generated": tokens_generated, + "duration_ms": (gen_time * 100.0).round() / 100.0, + "tokens_per_sec": (tokens_per_sec * 10.0).round() / 10.0, + "load_time_ms": (load_time * 100.0).round() / 100.0, + }); + let _ = ws.borrow().send_with_str(&done.to_string()); +} diff --git a/network-poc/static/index.html b/network-poc/static/index.html index 10f477f..1a15fd7 100644 --- a/network-poc/static/index.html +++ b/network-poc/static/index.html @@ -226,6 +226,54 @@ } .toggle-tokens:hover { color: var(--text-color); border-color: #8b949e; } + .task-option { + background: var(--panel-bg); + border: 2px solid var(--border-color); + border-radius: 8px; + padding: 14px; + cursor: pointer; + transition: border-color 0.2s; + position: relative; + } + .task-option:hover { border-color: #8b949e; } + .task-option.selected { border-color: var(--accent-color); background: #58a6ff10; } + .task-title { font-weight: 600; font-size: 15px; color: var(--text-color); margin-bottom: 4px; } + .task-desc { font-size: 12px; color: #8b949e; line-height: 1.4; margin-bottom: 8px; } + .task-size { font-size: 11px; color: #6e7681; } + .task-badge { + position: absolute; + top: 10px; + right: 10px; + font-size: 10px; + font-weight: 600; + padding: 2px 8px; + border-radius: 10px; + } + .task-ready { background: #23392050; color: var(--success-color); border: 1px solid #23392080; } + .task-soon { background: #d2992215; color: #d29922; border: 1px solid #d2992240; } + + .download-bar { + background: #0d1117; + border: 1px solid var(--border-color); + border-radius: 6px; + padding: 12px 16px; + margin-bottom: 16px; + display: none; + } + .download-bar .bar-track { + background: #21262d; + border-radius: 4px; + height: 8px; + margin-top: 8px; + overflow: hidden; + } + .download-bar .bar-fill { + background: var(--accent-color); + height: 100%; + border-radius: 4px; + transition: width 0.3s ease; + } + .metric-card { background: var(--panel-bg); border: 1px solid var(--border-color); @@ -270,10 +318,53 @@
+ +
+
Valitse tehtävä
+
+ + + + +
+
+