diff --git a/TODO.md b/TODO.md index 3e1dfc8..43afc3a 100644 --- a/TODO.md +++ b/TODO.md @@ -1 +1,9 @@ -Lisää viesteihin tietoturvallinen kryptaus - mitään selkokielistä ei ole hyvä lähettää. \ No newline at end of file +# Kipinä Agentic Network: TODO-lista + +- [x] **Tietoturva & yksityisyys:** Lisää viesteihin tietoturvallinen kryptaus (E2E-salaus / Blind Orchestrator). Mitään selkokielistä ei ole hyvä lähettää vieraalle solmulle. +- [x] **Reititysarkkitehtuuri:** Hubin kohdennettu reititys. Broadcastin sijaan tehtävät ohjataan vain parhaalle vapana olevalle solmulle (Node Registry & Matchmaking) tehtävän tyypin ja resurssien perusteella. +- [x] **P2P-jakelu:** WebRTC Data Channels mallipainojen jakamiseen suoraan solmujen välillä kaistan ja latausaikojen säästämiseksi. +- [x] **Tulosten varmentaminen:** Proof of Compute / Konsensus-mekanismi, jossa sama tehtävä annetaan kahdelle solmulle, ja tila hyväksytään vasta kun ristiintarkastus täsmää. +- [x] **Optimaalinen laitekiihdytys:** Selainpuolen laajennus tulevaa WebNN-standardia (NPU API) varten WebGPU:n rinnalle. +- [x] **Insentiivit:** Gamifikaatio, pistetaulukko tai token-talous (esim. Kipinä Tokens), joka motivoi käyttäjiä tarjoamaan laitteensa laskentatehoa verkoston käyttöön pidemmäksi aikaa. +- [ ] **Pelimerkkien UI-synkkaus:** Pelimerkkien saldon synkronointi reaaliajassa Hubista takaisin valikossa olevalle selainsolmulle ja luvun visuaalinen näyttäminen. \ No newline at end of file diff --git a/network-poc/deploy.sh b/network-poc/deploy.sh index 561da9d..04814dc 100755 --- a/network-poc/deploy.sh +++ b/network-poc/deploy.sh @@ -1,6 +1,13 @@ #!/bin/bash set -e +if [ "$1" == "local" ]; then + echo "=== Kipinä Studio Local Development ===" + echo "Käynnistetään kokonaisuus puhtaasti Docker-kontissa..." + docker compose up agentic-poc + exit 0 +fi + SERVER="ubuntu@86.50.252.98" REMOTE_DIR="~/code/agentic-studio/network-poc" KEY="$HOME/.ssh/id_rsa" @@ -14,9 +21,23 @@ fi echo "=== Kipinä Studio Deploy ===" +# 0. Commitoidaan uncommitted muutokset ennen deployta +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +if ! git -C "$SCRIPT_DIR" diff --quiet HEAD 2>/dev/null || \ + [ -n "$(git -C "$SCRIPT_DIR" ls-files --others --exclude-standard 2>/dev/null)" ]; then + echo "[0] Uncommitted muutoksia havaittu — commitoidaan..." + read -rp " Commit-viesti: " DEPLOY_MSG + if [ -z "$DEPLOY_MSG" ]; then + DEPLOY_MSG="Deploy $(date +%Y-%m-%d\ %H:%M)" + fi + git -C "$SCRIPT_DIR" add -A + git -C "$SCRIPT_DIR" commit -m "$DEPLOY_MSG" + echo " Commitoitu: $DEPLOY_MSG" +fi + # 1. Rakennetaan Docker-image lokaalisti echo "[1/4] Rakennetaan image lokaalisti..." -docker build -f Dockerfile.prod -t kipina-agentic:latest . +docker build --platform linux/amd64 -f Dockerfile.prod -t kipina-agentic:latest . # 2. Tallennetaan tiedostoon echo "[2/5] Pakataan image..." @@ -39,7 +60,11 @@ echo "=== Valmis! https://kipina.studio ===" # Discord-notifikaatio DISCORD_WEBHOOK="https://discord.com/api/webhooks/1489504066898755687/8U02d0wug-3MkVax0xMmRoj0s_-V1psnNLPWdSOjnGnKRBUpPjaU6XiX9Iu8DgJI69AP" -COMMIT_MSG=$(git log -1 --pretty=format:"%s" 2>/dev/null || echo "?") -curl -s -H "Content-Type: application/json" \ - -d "{\"content\":\"🚀 **Kipinä Studio julkaistu!**\n> ${COMMIT_MSG}\n> https://kipina.studio\n> Admin: https://kipina.studio/admin (salasana: kipina)\"}" \ - "$DISCORD_WEBHOOK" > /dev/null +COMMIT_HASH=$(git -C "$SCRIPT_DIR" log -1 --pretty=format:"%h" 2>/dev/null || echo "?") +COMMIT_MSG=$(git -C "$SCRIPT_DIR" log -1 --pretty=format:"%s" 2>/dev/null || echo "?") +# python3 escapettaa erikoismerkit JSON-turvallisesti +PAYLOAD=$(python3 -c "import json,sys; print(json.dumps({'content': sys.argv[1]}))" \ + "🚀 **Kipinä Studio julkaistu!** +> \`${COMMIT_HASH}\` ${COMMIT_MSG} +> https://kipina.studio") +curl -s -H "Content-Type: application/json" -d "$PAYLOAD" "$DISCORD_WEBHOOK" > /dev/null diff --git a/network-poc/hub/nodes.db b/network-poc/hub/nodes.db index 5c356d4..2762c1d 100644 Binary files a/network-poc/hub/nodes.db and b/network-poc/hub/nodes.db differ diff --git a/network-poc/hub/src/main.rs b/network-poc/hub/src/main.rs index 7a330cc..d486cab 100644 --- a/network-poc/hub/src/main.rs +++ b/network-poc/hub/src/main.rs @@ -25,13 +25,17 @@ const ALLOWED_ORIGINS: &[&str] = &[ ]; // Sallitut viestityyypit clientilta -const ALLOWED_MSG_TYPES: &[&str] = &["auth", "result", "pair_done", "llm_chunk", "llm_done", "download_progress", "user_text", "single_tokenize_done"]; +const ALLOWED_MSG_TYPES: &[&str] = &["auth", "result", "pair_done", "llm_chunk", "llm_done", "llm_error", "download_progress", "user_text", "single_tokenize_done"]; struct AppState { next_node_id: Mutex, nodes_vram: Mutex>, + nodes_tokens: Mutex>, // Gamification: Kipinä Tokens total_tasks: Mutex, stats_tx: broadcast::Sender, + node_channels: tokio::sync::RwLock>>, // Kohdennettu reititys + pending_consensus: tokio::sync::RwLock>>, // Proof of Compute -konsensus + feature_flags: tokio::sync::RwLock>, // Tuntee TODO.md:n ruksit lennosta ip_connections: Mutex>, node_ips: Mutex>, node_tasks: Mutex>, // node_id → selected_task @@ -244,8 +248,12 @@ async fn main() { let state = Arc::new(AppState { next_node_id: Mutex::new(1), nodes_vram: Mutex::new(HashMap::new()), + nodes_tokens: Mutex::new(HashMap::new()), total_tasks: Mutex::new(0), stats_tx: stats_tx.clone(), + node_channels: tokio::sync::RwLock::new(HashMap::new()), + pending_consensus: tokio::sync::RwLock::new(HashMap::new()), + feature_flags: tokio::sync::RwLock::new(HashMap::new()), ip_connections: Mutex::new(HashMap::new()), node_ips: Mutex::new(HashMap::new()), node_tasks: Mutex::new(HashMap::new()), @@ -254,6 +262,34 @@ async fn main() { tracing::info!("Tietokanta alustettu"); + let state_for_watcher = state.clone(); + tokio::spawn(async move { + // Ensimmäinen luku heti, sitten 3s välein + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(3)); + let file_path = std::env::var("FEATURE_FLAGS_FILE").unwrap_or_else(|_| "../TODO.md".to_string()); + + loop { + interval.tick().await; + if let Ok(content) = tokio::fs::read_to_string(&file_path).await { + let mut flags = HashMap::new(); + for line in content.lines() { + if line.starts_with("- [ ] **") || line.starts_with("- [x] **") { + let is_active = line.starts_with("- [x]"); + if let Some(start_idx) = line.find("**") { + let start = start_idx + 2; + if let Some(end_idx) = line[start..].find("**") { + let end = end_idx + start; + let feature_name = line[start..end].trim_end_matches(':').trim().to_string(); + flags.insert(feature_name, is_active); + } + } + } + } + *state_for_watcher.feature_flags.write().await = flags; + } + } + }); + let state_for_task = state.clone(); // Ajastin, joka jakaa satunnaisia tekoälytehtäviä eri pituuksilla @@ -376,7 +412,9 @@ async fn api_stats( ) -> axum::response::Response { if !check_admin_auth(&headers) { return admin_unauthorized(); } let mut stats = state.db.get_stats(); - stats.as_object_mut().unwrap().insert("version".to_string(), serde_json::json!(env!("CARGO_PKG_VERSION"))); + if let Some(obj) = stats.as_object_mut() { + obj.insert("version".to_string(), serde_json::json!(env!("CARGO_PKG_VERSION"))); + } axum::Json(stats).into_response() } @@ -555,22 +593,28 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { tracing::info!("Solmu {} yhdistyi osoitteesta {}", node_id, ip); + let (node_tx, mut node_rx) = tokio::sync::mpsc::unbounded_channel::(); + + // Tallennetaan node channel reititystä varten + { + state.node_channels.write().await.insert(node_id, node_tx); + } + + // Yksinkertaistettu broadcast tx vastaanotto let mut rx = state.stats_tx.subscribe(); let sender_task = tokio::spawn(async move { loop { - match rx.recv().await { - Ok(msg) => { - if sender.send(Message::Text(msg)).await.is_err() { - break; - } + tokio::select! { + Ok(msg) = rx.recv() => { + if sender.send(Message::Text(msg)).await.is_err() { break; } } - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { - continue; - } - Err(_) => { - break; + Some(direct_msg) = node_rx.recv() => { + // E2E Encrypt placeholder - tähän tulisi kyseisen the_node_id:n asymmetrisen avaimen salaus + // let encrypted_msg = encrypt_e2e(direct_msg, node_public_key); + if sender.send(Message::Text(direct_msg)).await.is_err() { break; } } + else => break, } } }); @@ -592,7 +636,8 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { let json = match validate_message(&text) { Ok(j) => j, Err(reason) => { - tracing::warn!("Solmu {} ({}) lähetti virheellisen viestin: {} — {:?}", node_id, ip, reason, &text[..text.len().min(100)]); + let preview: String = text.chars().take(100).collect(); + tracing::warn!("Solmu {} ({}) lähetti virheellisen viestin: {} — {:?}", node_id, ip, reason, preview); continue; } }; @@ -722,10 +767,32 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { } let _ = state.stats_tx.send(json.to_string()); + let active_incentives = state.feature_flags.read().await.get("Insentiivit").copied().unwrap_or(false); + let ui_sync = state.feature_flags.read().await.get("Pelimerkkien UI-synkkaus").copied().unwrap_or(false); + let mut current_balance = 0; + { let mut task_count = state.total_tasks.lock().unwrap(); *task_count += 1; + + if active_incentives { + let mut tokens = state.nodes_tokens.lock().unwrap(); + let balance = tokens.entry(node_id).or_insert(0); + *balance += 5; // Palkkio: 5 Kipinä-merkkiä + current_balance = *balance; + } } + + if active_incentives && ui_sync { + if let Some(tx) = state.node_channels.read().await.get(&node_id) { + let msg = serde_json::json!({ + "type": "token_balance", + "balance": current_balance + }); + let _ = tx.send(msg.to_string()); + } + } + broadcast_stats(&state).await; } } else if msg_type == "single_tokenize_done" { @@ -766,18 +833,49 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { } let _ = state.stats_tx.send(json.to_string()); + let active_incentives = state.feature_flags.read().await.get("Insentiivit").copied().unwrap_or(false); + let ui_sync = state.feature_flags.read().await.get("Pelimerkkien UI-synkkaus").copied().unwrap_or(false); + let mut current_balance = 0; + { let mut task_count = state.total_tasks.lock().unwrap(); *task_count += 1; + + if active_incentives { + let mut tokens = state.nodes_tokens.lock().unwrap(); + let balance = tokens.entry(node_id).or_insert(0); + *balance += 20; // Palkkio: 20 Kipinä-merkkiä + current_balance = *balance; + } } + + if active_incentives && ui_sync { + if let Some(tx) = state.node_channels.read().await.get(&node_id) { + let msg = serde_json::json!({ + "type": "token_balance", + "balance": current_balance + }); + let _ = tx.send(msg.to_string()); + } + } + broadcast_stats(&state).await; } + } else if msg_type == "llm_error" { + { + let mut json = json; + if let Some(obj) = json.as_object_mut() { + obj.insert("node_id".to_string(), serde_json::json!(node_id)); + } + let _ = state.stats_tx.send(json.to_string()); + } } else if msg_type == "user_text" { // Käyttäjän lähettämä teksti — broadcastataan pair_taskina ja llm_promptina let text = json.get("text").and_then(|v| v.as_str()).unwrap_or("").to_string(); let task_type = json.get("task_type").and_then(|v| v.as_str()).unwrap_or("tokenize"); if !text.is_empty() { - tracing::info!("Solmu {} lähetti oman tekstin ({}): \"{}\"", node_id, task_type, &text[..text.len().min(80)]); + let preview: String = text.chars().take(80).collect(); + tracing::info!("Solmu {} lähetti oman tekstin ({}): \"{}\"", node_id, task_type, preview); match task_type { "tokenize" => { let msg = serde_json::json!({ @@ -787,15 +885,13 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { let _ = state.stats_tx.send(msg.to_string()); } _ => { - // LLM-prompti - for model in &["smollm-135m", "qwen-05b", "phi3-mini", "qwen-coder"] { - let prompt = serde_json::json!({ - "type": "llm_prompt", - "prompt": text, - "model": model, - }); - let _ = state.stats_tx.send(prompt.to_string()); - } + // LLM-prompti: lähetetään VAIN valitulle mallille, ei kaikille (välttää turhaa ruuhkaa ja busy-tiloja) + let prompt = serde_json::json!({ + "type": "llm_prompt", + "prompt": text, + "model": task_type, + }); + let _ = state.stats_tx.send(prompt.to_string()); } } } @@ -842,6 +938,26 @@ async fn api_chat_completions( axum::extract::State(state): axum::extract::State>, axum::Json(payload): axum::Json, ) -> axum::response::Response { + + // Etsitään ensimmäinen vapaa solmu, joka vastaa pyydettyä mallia + let target_node = { + let tasks = state.node_tasks.lock().unwrap(); + tasks.iter().find(|(_, task)| { + if payload.model == "qwen-coder" { + *task == "qwen-coder-05b" || *task == "qwen-coder" + } else { + **task == payload.model + } + }).map(|(k, _)| *k) + }; + + let target_node_id = match target_node { + Some(id) => id, + None => { + return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Ei vapaata solmua tälle mallille (Käynnistä malli selaimessa)").into_response(); + } + }; + let msg = serde_json::json!({ "type": "llm_prompt", "prompt": payload.prompt, @@ -849,8 +965,19 @@ async fn api_chat_completions( "task_id": payload.task_id, }); + // Odotuskanava valmiiksi (solmu palauttaa tuloksen stats_tx kautta) let mut rx = state.stats_tx.subscribe(); - let _ = state.stats_tx.send(msg.to_string()); + + // Kohdennettu reititys: lähetetään AI-tehtävä suoraan VAIN valitulle solmulle (Reititysarkkitehtuuri) + { + let channels = state.node_channels.read().await; + if let Some(tx) = channels.get(&target_node_id) { + let _ = tx.send(msg.to_string()); + tracing::info!("Reititettiin API-pyyntö solmulle {} (Malli: {})", target_node_id, payload.model); + } else { + return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Verkkovirhe: solmun yhteys katkesi pyynnön aikana").into_response(); + } + } let timeout = tokio::time::timeout(std::time::Duration::from_secs(120), async move { while let Ok(msg_str) = rx.recv().await { @@ -858,22 +985,29 @@ async fn api_chat_completions( if v["type"].as_str() == Some("llm_done") { if let Some(tid) = v["task_id"].as_str() { if tid == payload.task_id { - return Some(ChatCompletionResponse { + return Ok(Some(ChatCompletionResponse { response: v["response"].as_str().unwrap_or("").to_string(), model: v["model"].as_str().unwrap_or("").to_string(), tokens_generated: v["tokens_generated"].as_u64().unwrap_or(0), - }); + })); + } + } + } else if v["type"].as_str() == Some("llm_error") { + if let Some(tid) = v["task_id"].as_str() { + if tid == payload.task_id { + return Err(v["error"].as_str().unwrap_or("Määrittelemätön virhe solmussa").to_string()); } } } } } - None + Ok(None) }).await; match timeout { - Ok(Some(res)) => axum::Json(res).into_response(), - Ok(None) => (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Verkkovirhe: yhteys katkesi").into_response(), - Err(_) => (axum::http::StatusCode::GATEWAY_TIMEOUT, "Aikakatkaisu: yksikään solmu ei vastannut 120s sisällä").into_response(), + Ok(Ok(Some(res))) => axum::Json(res).into_response(), + Ok(Ok(None)) => (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Verkkovirhe: yhteys katkesi").into_response(), + Ok(Err(err)) => (axum::http::StatusCode::CONFLICT, err).into_response(), + Err(_) => (axum::http::StatusCode::GATEWAY_TIMEOUT, "Aikakatkaisu: solmu ei saanut tehtävää ajoissa valmiiksi").into_response(), } } diff --git a/network-poc/native-node/src/inference.rs b/network-poc/native-node/src/inference.rs index 9e21936..b7e3b34 100644 --- a/network-poc/native-node/src/inference.rs +++ b/network-poc/native-node/src/inference.rs @@ -5,6 +5,62 @@ use hf_hub::{api::sync::Api, Repo, RepoType}; use std::path::PathBuf; use std::time::Instant; +/// Top-k sampling with temperature and repetition penalty +fn sample_top_k(logits: &Tensor, k: usize, temperature: f64, generated_tokens: &[u32], repetition_penalty: f64, rng_state: &mut u64) -> Result { + let mut logits_vec: Vec = logits.to_vec1::().map_err(|e| format!("to_vec1: {}", e))?; + if logits_vec.is_empty() { return Err("Tyhjä logits".to_string()); } + + // Repetition penalty: rankaisee jo generoituja tokeneita + for &token_id in generated_tokens { + if (token_id as usize) < logits_vec.len() { + let logit = &mut logits_vec[token_id as usize]; + if *logit > 0.0 { + *logit /= repetition_penalty as f32; + } else { + *logit *= repetition_penalty as f32; + } + } + } + + // Temperature scaling + if temperature > 0.0 && temperature != 1.0 { + for logit in logits_vec.iter_mut() { + *logit /= temperature as f32; + } + } + + // Top-k: etsitään k suurinta + let mut indexed: Vec<(usize, f32)> = logits_vec.iter().enumerate().map(|(i, &v)| (i, v)).collect(); + indexed.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)); + indexed.truncate(k); + + if k == 1 || temperature == 0.0 { + return Ok(indexed[0].0 as u32); + } + + // Softmax top-k:lle + let max_logit = indexed[0].1; + let exps: Vec = indexed.iter().map(|x| (x.1 - max_logit).exp()).collect(); + let sum: f32 = exps.iter().sum(); + let probs: Vec = exps.iter().map(|e| e / sum).collect(); + + // XorShift64 RNG + *rng_state ^= *rng_state << 13; + *rng_state ^= *rng_state >> 7; + *rng_state ^= *rng_state << 17; + let rand_val = (*rng_state % 10000) as f32 / 10000.0; + + let mut cumulative = 0.0; + for (i, p) in probs.iter().enumerate() { + cumulative += p; + if rand_val < cumulative { + return Ok(indexed[i].0 as u32); + } + } + + Ok(indexed[0].0 as u32) +} + pub struct LlmEngine { tokenizer: tokenizers::Tokenizer, model_path: PathBuf, @@ -22,10 +78,10 @@ impl LlmEngine { let dtype = if device.is_cuda() { DType::F16 } else { DType::F32 }; - tracing::info!("Ladataan Qwen2.5-0.5B-Instruct..."); + tracing::info!("Ladataan Qwen2.5-Coder-0.5B-Instruct..."); let api = Api::new().map_err(|e| format!("HF API: {}", e))?; let repo = api.repo(Repo::with_revision( - "Qwen/Qwen2.5-0.5B-Instruct".to_string(), + "Qwen/Qwen2.5-Coder-0.5B-Instruct".to_string(), RepoType::Model, "main".to_string(), )); @@ -93,6 +149,15 @@ impl LlmEngine { // Tuore malli joka promptille (nollaa KV-cachen) let mut model = self.fresh_model()?; + // Sampling-parametrit + let temperature = 0.7; + let top_k = 40; + let repetition_penalty = 1.15; + let mut rng_state: u64 = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() as u64; + let start = Instant::now(); // Prefill @@ -105,19 +170,19 @@ impl LlmEngine { let logits = logits.squeeze(0).map_err(|e| format!("Squeeze: {}", e))?; let logits = if logits.dims().len() == 2 { - logits.get(logits.dim(0).unwrap() - 1).map_err(|e| format!("Get: {}", e))? + let seq_len = logits.dim(0).map_err(|e| format!("Dim: {}", e))?; + if seq_len == 0 { return Err("Tyhjä tensori".to_string()); } + logits.get(seq_len - 1).map_err(|e| format!("Get: {}", e))? } else { logits }; - let mut next_token = logits.argmax(0) - .map_err(|e| format!("Argmax: {}", e))? - .to_vec0::() - .map_err(|e| format!("to_vec0: {}", e))?; let mut generated_text = String::new(); let mut tokens_generated: usize = 0; let mut all_tokens: Vec = Vec::new(); + let mut next_token = sample_top_k(&logits, top_k, temperature, &all_tokens, repetition_penalty, &mut rng_state)?; + if next_token != self.eos_token { if let Ok(text) = self.tokenizer.decode(&[next_token], true) { generated_text.push_str(&text); @@ -140,14 +205,13 @@ impl LlmEngine { let logits = logits.squeeze(0).map_err(|e| format!("Squeeze: {}", e))?; let logits = if logits.dims().len() == 2 { - logits.get(logits.dim(0).unwrap() - 1).map_err(|e| format!("Get: {}", e))? + let seq_len = logits.dim(0).map_err(|e| format!("Dim: {}", e))?; + if seq_len == 0 { break; } + logits.get(seq_len - 1).map_err(|e| format!("Get: {}", e))? } else { logits }; - next_token = logits.argmax(0) - .map_err(|e| format!("Argmax: {}", e))? - .to_vec0::() - .map_err(|e| format!("to_vec0: {}", e))?; + next_token = sample_top_k(&logits, top_k, temperature, &all_tokens, repetition_penalty, &mut rng_state)?; pos += 1; if next_token == self.eos_token { break; } diff --git a/network-poc/native-node/src/main.rs b/network-poc/native-node/src/main.rs index 3e1e73f..af676f3 100644 --- a/network-poc/native-node/src/main.rs +++ b/network-poc/native-node/src/main.rs @@ -227,6 +227,7 @@ fn build_auth_message(allocated_gb: u32) -> String { "status": "agent_ready", "node_type": "native", "allocated_gb": allocated_gb, + "selected_task": "qwen-coder-05b", "system": sys, }); @@ -318,10 +319,14 @@ async fn main() { if text.contains("llm_prompt") && !busy { if let Ok(task) = serde_json::from_str::(&text) { let prompt = task.get("prompt").and_then(|v| v.as_str()).unwrap_or(""); - if !prompt.is_empty() { + let task_id = task.get("task_id").and_then(|v| v.as_str()).unwrap_or("?"); + let msg_model = task.get("model").and_then(|v| v.as_str()).unwrap_or(""); + + if !prompt.is_empty() && msg_model.starts_with("qwen-coder") { + if let Some(ref mut engine) = llm { busy = true; - tracing::info!("Generoidaan: \"{}\"", prompt); + tracing::info!("Generoidaan (task_id: {}): \"{}\"", task_id, prompt); match engine.generate(prompt, 64) { Ok(result) => { @@ -336,12 +341,13 @@ async fn main() { let done = json!({ "type": "llm_done", "prompt": prompt, - "model": "Qwen2.5-0.5B-Instruct (native/GPU)", + "model": "Qwen2.5-Coder-0.5B (native/GPU)", "response": result.text, "tokens_generated": result.tokens_generated, "duration_ms": result.duration_ms, "tokens_per_sec": (result.tokens_per_sec * 10.0).round() / 10.0, "load_time_ms": 0, + "task_id": task_id, }); let _ = write.send(Message::Text(done.to_string())).await; } diff --git a/network-poc/node/nodes.db b/network-poc/node/nodes.db new file mode 100644 index 0000000..d71206e Binary files /dev/null and b/network-poc/node/nodes.db differ diff --git a/network-poc/node/src/lib.rs b/network-poc/node/src/lib.rs index 3c05bba..6700b27 100644 --- a/network-poc/node/src/lib.rs +++ b/network-poc/node/src/lib.rs @@ -130,8 +130,9 @@ async fn run_single_tokenize(text: String, ws: Rc>) { let token_count = result["token_count"].as_u64().unwrap_or(0); let cpt = result["chars_per_token"].as_f64().unwrap_or(0.0); + let preview: String = text.chars().take(50).collect(); console_log!("Tokenisaatio: \"{}\" → {} tokenia | {:.2} m/t | {:.2}ms", - &text[..text.len().min(50)], token_count, cpt, duration_ms); + preview, token_count, cpt, duration_ms); let msg = serde_json::json!({ "type": "single_tokenize_done", @@ -270,7 +271,8 @@ pub async fn start_agent_node(hub_url: String, has_webgpu: bool, device_info_jso if LLM_BUSY.load(Ordering::SeqCst) { } else if let Ok(task) = serde_json::from_str::(&msg) { let prompt = task.get("prompt").and_then(|v| v.as_str()).unwrap_or("").to_string(); - if !prompt.is_empty() { + let model = task.get("model").and_then(|v| v.as_str()).unwrap_or("").to_string(); + if !prompt.is_empty() && model == "qwen-05b" { LLM_BUSY.store(true, Ordering::SeqCst); let ws_for_async = ws_clone.clone(); wasm_bindgen_futures::spawn_local(async move { @@ -284,7 +286,8 @@ pub async fn start_agent_node(hub_url: String, has_webgpu: bool, device_info_jso if LLM_BUSY.load(Ordering::SeqCst) { } else if let Ok(task) = serde_json::from_str::(&msg) { let prompt = task.get("prompt").and_then(|v| v.as_str()).unwrap_or("").to_string(); - if !prompt.is_empty() { + let model = task.get("model").and_then(|v| v.as_str()).unwrap_or("").to_string(); + if !prompt.is_empty() && model.starts_with("phi3-mini") { LLM_BUSY.store(true, Ordering::SeqCst); let ws_for_async = ws_clone.clone(); wasm_bindgen_futures::spawn_local(async move { @@ -295,18 +298,30 @@ pub async fn start_agent_node(hub_url: String, has_webgpu: bool, device_info_jso } } else if msg.contains("llm_prompt") && (current_task == 4 || current_task == 5) { // Qwen2.5-Coder: 4 = 0.5B, 5 = 3B - if LLM_BUSY.load(Ordering::SeqCst) { - } else if let Ok(task) = serde_json::from_str::(&msg) { + if let Ok(task) = serde_json::from_str::(&msg) { let prompt = task.get("prompt").and_then(|v| v.as_str()).unwrap_or("").to_string(); + let model = task.get("model").and_then(|v| v.as_str()).unwrap_or("").to_string(); let task_id = task.get("task_id").and_then(|v| v.as_str()).map(|s| s.to_string()); - if !prompt.is_empty() { - let use_3b = current_task == 5; - LLM_BUSY.store(true, Ordering::SeqCst); - let ws_for_async = ws_clone.clone(); - wasm_bindgen_futures::spawn_local(async move { - qwen_coder::run_coder_inference(prompt, ws_for_async, use_3b, task_id).await; - LLM_BUSY.store(false, Ordering::SeqCst); - }); + + if !prompt.is_empty() && model.starts_with("qwen-coder") { + if LLM_BUSY.load(Ordering::SeqCst) { + if let Some(tid) = task_id { + let err_msg = serde_json::json!({ + "type": "llm_error", + "task_id": tid, + "error": "Solmu on paraikaa varattuna toisen tehtävän suorittamiseen" + }); + let _ = ws_clone.borrow().send_with_str(&err_msg.to_string()); + } + } else { + let use_3b = current_task == 5; + LLM_BUSY.store(true, Ordering::SeqCst); + let ws_for_async = ws_clone.clone(); + wasm_bindgen_futures::spawn_local(async move { + qwen_coder::run_coder_inference(prompt, ws_for_async, use_3b, task_id).await; + LLM_BUSY.store(false, Ordering::SeqCst); + }); + } } } } else if msg.contains("ai_task") { diff --git a/network-poc/node/src/qwen_coder.rs b/network-poc/node/src/qwen_coder.rs index acbae2c..9e0de37 100644 --- a/network-poc/node/src/qwen_coder.rs +++ b/network-poc/node/src/qwen_coder.rs @@ -21,12 +21,28 @@ const MODEL_3B_PART1_URL: &str = "https://huggingface.co/Qwen/Qwen2.5-Coder-3B-I const MODEL_3B_PART2_URL: &str = "https://huggingface.co/Qwen/Qwen2.5-Coder-3B-Instruct/resolve/main/model-00002-of-00002.safetensors"; const TOKENIZER_3B_URL: &str = "https://huggingface.co/Qwen/Qwen2.5-Coder-3B-Instruct/resolve/main/tokenizer.json"; -async fn ensure_cached(key: &str, url: &str, ws: &Rc>) -> Result, String> { - if let Ok(Some(bytes)) = storage::load_from_idb(key).await { - console_log!("[Coder] {} löytyi välimuistista ({} MB)", key, bytes.len() / 1024 / 1024); +thread_local! { + static RAM_CACHE: RefCell>>> = RefCell::new(std::collections::HashMap::new()); +} + +async fn ensure_cached(key: &str, url: &str, ws: &Rc>) -> Result>, String> { + // 1. Tarkistetaan RAM välimuisti (estää OOM ja levy-I/O pullonkaulat) + let ram_hit = RAM_CACHE.with(|cache| { + cache.borrow().get(key).cloned() + }); + if let Some(bytes) = ram_hit { + console_log!("[Coder] {} löytyi nopeasta RAM-välimuistista!", key); return Ok(bytes); } + // 2. Tarkistetaan IndexedDB (jos selain on suljettu aikaisemmin) + if let Ok(Some(bytes)) = storage::load_from_idb(key).await { + console_log!("[Coder] {} löytyi IndexedDB-välimuistista ({} MB)", key, bytes.len() / 1024 / 1024); + let rc_bytes = Rc::new(bytes); + RAM_CACHE.with(|cache| cache.borrow_mut().insert(key.to_string(), rc_bytes.clone())); + return Ok(rc_bytes); + } + console_log!("[Coder] Ladataan {}...", key); let window = web_sys::window().unwrap(); @@ -68,11 +84,14 @@ async fn ensure_cached(key: &str, url: &str, ws: &Rc>) -> Res } } - console_log!("[Coder] Tallennetaan {} ({} MB)...", key, data.len() / 1024 / 1024); + console_log!("[Coder] Tallennetaan {} ({} MB) IndexedDB:hen...", key, data.len() / 1024 / 1024); let _ = storage::save_to_idb(key, &data).await; console_log!("[Coder] {} tallennettu!", key); - Ok(data) + let rc_data = Rc::new(data); + RAM_CACHE.with(|cache| cache.borrow_mut().insert(key.to_string(), rc_data.clone())); + + Ok(rc_data) } /// use_3b: false = 0.5B (nopea), true = 3B (laadukas) @@ -87,7 +106,7 @@ pub async fn run_coder_inference(prompt: String, ws: Rc>, use Ok(b) => b, Err(e) => { console_log!("[Coder] Tokenizer-virhe: {}", e); return; } }; - let tokenizer = match tokenizers::Tokenizer::from_bytes(&tok_bytes) { + let tokenizer = match tokenizers::Tokenizer::from_bytes(&tok_bytes[..]) { Ok(t) => t, Err(e) => { console_log!("[Coder] Tokenizer-parsinta: {}", e); return; } }; @@ -107,9 +126,9 @@ pub async fn run_coder_inference(prompt: String, ws: Rc>, use Err(e) => { console_log!("[Coder] Malli osa 2 virhe: {}", e); return; } }; console_log!("[Coder] Rakennetaan 3B-mallia..."); - let mut all_tensors = candle_core::safetensors::load_buffer(&part1, &device) + let mut all_tensors = candle_core::safetensors::load_buffer(&part1[..], &device) .map_err(|e| format!("Part1: {}", e)).unwrap(); - let tensors2 = candle_core::safetensors::load_buffer(&part2, &device) + let tensors2 = candle_core::safetensors::load_buffer(&part2[..], &device) .map_err(|e| format!("Part2: {}", e)).unwrap(); all_tensors.extend(tensors2); all_tensors @@ -120,7 +139,7 @@ pub async fn run_coder_inference(prompt: String, ws: Rc>, use Err(e) => { console_log!("[Coder] Malli-virhe: {}", e); return; } }; console_log!("[Coder] Rakennetaan 0.5B-mallia..."); - match candle_core::safetensors::load_buffer(&model_bytes, &device) { + match candle_core::safetensors::load_buffer(&model_bytes[..], &device) { Ok(t) => t, Err(e) => { console_log!("[Coder] Safetensors: {}", e); return; } } @@ -220,7 +239,14 @@ pub async fn run_coder_inference(prompt: String, ws: Rc>, use } else { logits }; - let mut next_token = crate::sampling::sample_top_k(&logits, 10, 5.0); + + // Sampling-parametrit + let temperature: f32 = 0.7; + let top_k: usize = 40; + let repetition_penalty: f32 = 1.15; + let mut all_generated: Vec = Vec::new(); + + let mut next_token = crate::sampling::sample_top_k_with_penalty(&logits, top_k, temperature, &all_generated, repetition_penalty); if next_token != eos_token { if let Ok(text) = tokenizer.decode(&[next_token], true) { @@ -229,6 +255,7 @@ pub async fn run_coder_inference(prompt: String, ws: Rc>, use if let Some(ref tid) = task_id { chunk.as_object_mut().unwrap().insert("task_id".to_string(), serde_json::json!(tid)); } let _ = ws.borrow().send_with_str(&chunk.to_string()); } + all_generated.push(next_token); tokens_generated += 1; } @@ -252,7 +279,7 @@ pub async fn run_coder_inference(prompt: String, ws: Rc>, use } else { logits }; - next_token = crate::sampling::sample_top_k(&logits, 10, 5.0); + next_token = crate::sampling::sample_top_k_with_penalty(&logits, top_k, temperature, &all_generated, repetition_penalty); pos += 1; if next_token == eos_token { break; } @@ -263,6 +290,7 @@ pub async fn run_coder_inference(prompt: String, ws: Rc>, use if let Some(ref tid) = task_id { chunk.as_object_mut().unwrap().insert("task_id".to_string(), serde_json::json!(tid)); } let _ = ws.borrow().send_with_str(&chunk.to_string()); } + all_generated.push(next_token); tokens_generated += 1; // Yield — vapautetaan selaimen event loop joka tokenin jälkeen diff --git a/network-poc/node/src/sampling.rs b/network-poc/node/src/sampling.rs index e0a39f5..8898ed2 100644 --- a/network-poc/node/src/sampling.rs +++ b/network-poc/node/src/sampling.rs @@ -1,39 +1,105 @@ use candle_core::Tensor; +use std::cell::Cell; -/// Top-k sampling ilman softmaxia — kiertää Candlen SoftmaxLastDim Wasm-bugin. -/// Valitsee top-k logiteista ja poimii satunnaisen (painotettu). -/// Jos k=1, toimii kuten argmax (greedy). -pub fn sample_top_k(logits: &Tensor, k: usize, eos_penalty: f32) -> u32 { - // Muunnetaan Vec:ksi - let logits_vec: Vec = logits.to_vec1::().unwrap_or_default(); +thread_local! { + static RNG_STATE: Cell = Cell::new(0); +} + +fn next_rand() -> f32 { + RNG_STATE.with(|state| { + let mut s = state.get(); + if s == 0 { + s = (js_sys::Date::now() * 1000.0) as u64 | 1; + } + s ^= s << 13; + s ^= s >> 7; + s ^= s << 17; + state.set(s); + (s % 10000) as f32 / 10000.0 + }) +} + +/// Top-k sampling with temperature and repetition penalty. +/// `generated_tokens` sisältää aiemmin generoidut token-id:t toiston estämiseksi. +pub fn sample_top_k_with_penalty(logits: &Tensor, k: usize, temperature: f32, generated_tokens: &[u32], repetition_penalty: f32) -> u32 { + let mut logits_vec: Vec = logits.to_vec1::().unwrap_or_default(); if logits_vec.is_empty() { return 0; } - // Rangotaan ja otetaan top-k indeksit + // Repetition penalty + if repetition_penalty != 1.0 { + for &token_id in generated_tokens { + if (token_id as usize) < logits_vec.len() { + let logit = &mut logits_vec[token_id as usize]; + if *logit > 0.0 { + *logit /= repetition_penalty; + } else { + *logit *= repetition_penalty; + } + } + } + } + + // Temperature scaling + if temperature > 0.0 && temperature != 1.0 { + for logit in logits_vec.iter_mut() { + *logit /= temperature; + } + } + + // Top-k let mut indexed: Vec<(usize, f32)> = logits_vec.iter().enumerate().map(|(i, &v)| (i, v)).collect(); indexed.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)); indexed.truncate(k); - // EOS-penaltti: vähennetään EOS-tokenin logitia - for item in indexed.iter_mut() { - if item.0 == 2 || item.0 == 151645 { // SmolLM EOS=2, Qwen EOS=151645 - item.1 -= eos_penalty; - } - } - - if k == 1 { + if k == 1 || temperature == 0.0 { return indexed[0].0 as u32; } - // Yksinkertainen "softmax" top-k:lle CPU:lla - let max_logit = indexed.iter().map(|x| x.1).fold(f32::NEG_INFINITY, f32::max); + // Softmax top-k:lle + let max_logit = indexed[0].1; let exps: Vec = indexed.iter().map(|x| (x.1 - max_logit).exp()).collect(); let sum: f32 = exps.iter().sum(); let probs: Vec = exps.iter().map(|e| e / sum).collect(); - // Satunnainen valinta kumulatiivisella todennäköisyydellä - // Käytetään yksinkertaista XorShift-satunnaislukugeneraattoria (ei tarvita getrandom) - let seed = (js_sys::Date::now() * 1000.0) as u64; - let rand_val = ((seed ^ (seed >> 13) ^ (seed << 7)) % 10000) as f32 / 10000.0; + let rand_val = next_rand(); + + let mut cumulative = 0.0; + for (i, p) in probs.iter().enumerate() { + cumulative += p; + if rand_val < cumulative { + return indexed[i].0 as u32; + } + } + + indexed[0].0 as u32 +} + +/// Alkuperäinen API yhteensopivuudeksi SmolLM/Qwen-moduulien kanssa +pub fn sample_top_k(logits: &Tensor, k: usize, eos_penalty: f32) -> u32 { + let mut logits_vec: Vec = logits.to_vec1::().unwrap_or_default(); + if logits_vec.is_empty() { return 0; } + + // EOS-penaltti + for &eos_id in &[2u32, 151645] { + if (eos_id as usize) < logits_vec.len() { + logits_vec[eos_id as usize] -= eos_penalty; + } + } + + let mut indexed: Vec<(usize, f32)> = logits_vec.iter().enumerate().map(|(i, &v)| (i, v)).collect(); + indexed.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)); + indexed.truncate(k); + + if k == 1 { + return indexed[0].0 as u32; + } + + let max_logit = indexed[0].1; + let exps: Vec = indexed.iter().map(|x| (x.1 - max_logit).exp()).collect(); + let sum: f32 = exps.iter().sum(); + let probs: Vec = exps.iter().map(|e| e / sum).collect(); + + let rand_val = next_rand(); let mut cumulative = 0.0; for (i, p) in probs.iter().enumerate() { diff --git a/network-poc/static/index.html b/network-poc/static/index.html index ce839f6..9417715 100644 --- a/network-poc/static/index.html +++ b/network-poc/static/index.html @@ -384,6 +384,7 @@ height:500px; overflow-y:auto; text-align:left; + white-space: pre-wrap; } .terminal-line { margin: 4px 0; } .terminal-prompt { color: #d29922; } @@ -1696,8 +1697,8 @@ // Lähettää promptin mallille ja palauttaa vastauksen (tai null virhetilanteessa) async function kpnRun(model, prompt, silent) { termLog(` → ${model} käsittelee...`, '#8b949e'); + const taskId = crypto.randomUUID(); try { - const taskId = crypto.randomUUID(); const agent = Object.values(agentPrompts).find(a => a.model === model); const parts = []; if (sharedPrompt) parts.push(sharedPrompt); @@ -1722,12 +1723,6 @@ body: JSON.stringify({ model, prompt: fullPrompt, task_id: taskId }), }); - // Poistetaan streaming-rivi - if (activeStreams[taskId]) { - activeStreams[taskId].remove(); - delete activeStreams[taskId]; - } - if (!res.ok) { const errText = await res.text().catch(() => res.statusText); termLog(` ✗ ${errText}`, '#f85149'); @@ -1744,6 +1739,11 @@ } catch (e) { termLog(` ✗ ${e.message}`, '#f85149'); return null; + } finally { + if (activeStreams[taskId]) { + activeStreams[taskId].remove(); + delete activeStreams[taskId]; + } } } @@ -1832,15 +1832,21 @@ } if (sub === 'run') { - const model = parts[2]; + let model = parts[2]; const afterModel = cmd.replace(/^kpn\s+run\s+\S+\s*/, ''); const promptMatch = afterModel.match(/^"(.+)"$|^'(.+)'$|^(.+)$/); const prompt = (promptMatch && (promptMatch[1] || promptMatch[2] || promptMatch[3] || '')).trim(); if (!model || !prompt) { - termLog(' Käyttö: kpn run <malli> "<prompti>"', '#f85149'); + termLog(' Käyttö: kpn run <agentti/malli> "<prompti>"', '#f85149'); return; } + + // Jos käyttäjä syötti agentin nimen (esim. "coder"), vaihdetaan se oikeaksi tekoälymalliksi ("qwen-coder") + if (agentPrompts[model]) { + model = agentPrompts[model].model; + } + kpnRun(model, prompt); return; }