diff --git a/network-poc/frontend/public/kipina-node b/network-poc/frontend/public/kipina-node index b7e755d..743d34b 100644 --- a/network-poc/frontend/public/kipina-node +++ b/network-poc/frontend/public/kipina-node @@ -4,7 +4,6 @@ set -e BASE_URL="https://kipina.studio/download" HUB_URL="${KIPINA_HUB:-wss://kipina.studio/ws}" -MODEL="${KIPINA_MODEL:-qwen2.5-coder:3b}" OLLAMA_URL="${OLLAMA_URL:-http://localhost:11434}" # Tunnista OS ja arkkitehtuuri @@ -96,14 +95,9 @@ fi echo "" echo " Hub: $HUB_URL" echo " Ollama: $OLLAMA_URL" -echo " Malli: $MODEL" - -# Lataa malli (toimii sekä lokaalilla binäärillä että API:n kautta) -if ! curl -s "$OLLAMA_URL/api/tags" | grep -q "$MODEL"; then - echo " Ladataan $MODEL..." - curl -s "$OLLAMA_URL/api/pull" -d "{\"name\":\"$MODEL\"}" > /dev/null +if [ -n "$KIPINA_MODEL" ]; then + echo " Malli: $KIPINA_MODEL (Ympäristömuuttujasta)" fi -echo " ✓ Malli $MODEL valmis" # Lataa binääri BIN_PATH="./kipina-node-bin" @@ -123,8 +117,13 @@ if [ ! -f "$BIN_PATH" ]; then fi echo "" -echo " ✓ Yhdistetään laskentaverkkoon..." +echo " ✓ Siirrytään Kipinä Noden hallintaan..." echo " Ctrl+C pysäyttää" echo "" -HUB_URL="$HUB_URL" OLLAMA_URL="$OLLAMA_URL" OLLAMA_MODEL="$MODEL" exec "$BIN_PATH" +if [ -n "$KIPINA_MODEL" ]; then + export OLLAMA_MODEL="$KIPINA_MODEL" +fi +export HUB_URL="$HUB_URL" +export OLLAMA_URL="$OLLAMA_URL" +exec "$BIN_PATH" diff --git a/network-poc/hub/src/db.rs b/network-poc/hub/src/db.rs index fd4058b..bd68abe 100644 --- a/network-poc/hub/src/db.rs +++ b/network-poc/hub/src/db.rs @@ -49,6 +49,13 @@ impl NodeDb { INSERT INTO _schema_version VALUES (3); "); } + if version < 4 { + let _ = conn.execute_batch(" + ALTER TABLE node_sessions ADD COLUMN is_paused BOOLEAN DEFAULT 0; + DELETE FROM _schema_version; + INSERT INTO _schema_version VALUES (4); + "); + } conn.execute_batch(" CREATE TABLE IF NOT EXISTS node_sessions ( @@ -84,7 +91,10 @@ impl NodeDb { has_webgpu BOOLEAN, -- Tehtävätilastot - tasks_completed INTEGER DEFAULT 0 + tasks_completed INTEGER DEFAULT 0, + + -- Ohjaustilat + is_paused BOOLEAN DEFAULT 0 ); CREATE TABLE IF NOT EXISTS pair_results ( @@ -183,6 +193,14 @@ impl NodeDb { ); } + pub fn update_session_status(&self, node_id: u64, is_paused: bool) { + let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); + let _ = conn.execute( + "UPDATE node_sessions SET is_paused = ?1 WHERE node_id = ?2 AND disconnected_at IS NULL", + params![is_paused as i64, node_id as i64], + ); + } + /// Sulkee saman IP:n viewer-sessiot kun aktiivinen node liittyy pub fn close_viewers_by_ip(&self, ip: &str) { let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); @@ -216,7 +234,7 @@ impl NodeDb { "SELECT id, node_id, ip, node_type, connected_at, disconnected_at, platform, hostname, os, cpu_cores, cpu_model, ram_mb, gpu_name, gpu_vendor, gpu_backend, vram_total_mb, gpu_temp_c, gpu_util_pct, - allocated_gb, selected_task, has_webgpu, tasks_completed + allocated_gb, selected_task, has_webgpu, tasks_completed, is_paused FROM node_sessions ORDER BY id DESC LIMIT ?1" ).unwrap(); @@ -244,6 +262,7 @@ impl NodeDb { "selected_task": row.get::<_, Option>(19)?, "has_webgpu": row.get::<_, Option>(20)?, "tasks_completed": row.get::<_, i64>(21)?, + "is_paused": row.get::<_, Option>(22)?.unwrap_or(false), })) }).unwrap().filter_map(|r| r.ok()).collect() } diff --git a/network-poc/hub/src/main.rs b/network-poc/hub/src/main.rs index edd955c..a7bf9ab 100644 --- a/network-poc/hub/src/main.rs +++ b/network-poc/hub/src/main.rs @@ -25,7 +25,7 @@ const ALLOWED_ORIGINS: &[&str] = &[ ]; // Sallitut viestityyypit clientilta -const ALLOWED_MSG_TYPES: &[&str] = &["auth", "result", "pair_done", "llm_chunk", "llm_done", "llm_error", "download_progress", "user_text", "single_tokenize_done"]; +const ALLOWED_MSG_TYPES: &[&str] = &["auth", "result", "pair_done", "llm_chunk", "llm_done", "llm_error", "download_progress", "user_text", "single_tokenize_done", "status_update"]; struct AppState { next_node_id: Mutex, @@ -40,6 +40,7 @@ struct AppState { node_ips: Mutex>, node_tasks: Mutex>, // node_id → selected_task node_types: Mutex>, // node_id → "native" | "browser" + node_paused: Mutex>, // node_id → onko tauolla node_busy: Mutex>, // Solmut joilla on aktiivinen tehtävä pending_task_ids: Mutex>, // Hubin jakamat task_id:t (gamification-validointi) pending_responses: Mutex>>, // task_id → oneshot API-vastaukselle @@ -82,6 +83,8 @@ tr:hover td { background:#1c2333; } .table-wrap { overflow-x:auto; max-height:70vh; overflow-y:auto; } .online { color:var(--green); } .offline { color:#8b949e; } +.pause-btn { background:var(--panel); border:1px solid var(--border); color:var(--text); padding:4px 8px; border-radius:4px; cursor:pointer; font-size:12px; } +.pause-btn:hover { border-color:var(--yellow); } @@ -102,12 +105,12 @@ tr:hover td { background:#1c2333; } - + IDTilaTehtäväTyyppiIPAlusta OSCPURAMGPUVRAM - WebGPUTeht.YhdistettyKesto + WebGPUTeht.YhdistettyKestoToiminnot @@ -210,9 +213,17 @@ async function load() { document.getElementById('sessions-body').innerHTML = sessions.map(s => { const online = !s.disconnected_at; const isViewer = s.selected_task === 'viewer'; - const status = online - ? (isViewer ? 'CONNECTED' : 'ACTIVE') - : 'offline'; + let status; + if (!online) { + status = 'offline'; + } else if (isViewer) { + status = 'CONNECTED'; + } else if (s.is_paused) { + status = 'PAUSED'; + } else { + status = 'ACTIVE'; + } + const typeBadge = s.node_type === 'native' ? badge('native','blue') : badge('browser','yellow'); const taskColor = isViewer ? 'yellow' : s.selected_task === 'tokenize' ? 'green' : 'blue'; const taskBadge = badge(taskNames[s.selected_task] || s.selected_task || '?', taskColor); @@ -225,11 +236,16 @@ async function load() { const os = s.os || '-'; const time = s.connected_at ? new Date(s.connected_at).toLocaleString('fi-FI') : ''; const dur = duration(s.connected_at, s.disconnected_at); + const actionBtn = online && !isViewer + ? `` + : ''; + return ` ${s.node_id}${status}${taskBadge}${typeBadge}${s.ip} ${plat}${os}${cores}${ram} ${gpu}${vram}${gpuBadge} ${s.tasks_completed}${time}${dur} + ${actionBtn} `; }).join(''); @@ -269,6 +285,17 @@ async function load() { }).join(''); } +async function togglePause(nodeId, isPaused) { + try { + await fetch('/api/v1/control/' + nodeId, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ action: isPaused ? 'resume' : 'pause' }) + }); + load(); // virkistetään + } catch(e) { console.error(e); } +} + load(); setInterval(load, 5000); @@ -300,6 +327,7 @@ async fn main() { node_ips: Mutex::new(HashMap::new()), node_tasks: Mutex::new(HashMap::new()), node_types: Mutex::new(HashMap::new()), + node_paused: Mutex::new(std::collections::HashSet::new()), node_busy: Mutex::new(std::collections::HashSet::new()), pending_task_ids: Mutex::new(std::collections::HashSet::new()), pending_responses: Mutex::new(HashMap::new()), @@ -421,6 +449,7 @@ async fn main() { .route("/api/pairs", get(api_pairs)) .route("/api/stats", get(api_stats)) .route("/api/v1/chat/completions", axum::routing::post(api_chat_completions)) + .route("/api/v1/control/:id", axum::routing::post(api_control_node)) .route("/api/v1/model", axum::routing::post(api_change_model)) .route("/api/v1/hardware", get(api_hardware)) .route("/api/v1/ollama/tags", get(api_ollama_tags)) @@ -440,6 +469,26 @@ async fn main() { axum::serve(listener, app.into_make_service_with_connect_info::()).await.unwrap(); } +async fn api_control_node( + headers: axum::http::HeaderMap, + axum::extract::State(state): axum::extract::State>, + axum::extract::Path(id): axum::extract::Path, + axum::Json(payload): axum::Json, +) -> axum::response::Response { + if !check_admin_auth(&headers) { return admin_unauthorized(); } + let action = payload.get("action").and_then(|v| v.as_str()).unwrap_or(""); + if action == "pause" || action == "resume" { + let msg = serde_json::json!({ "type": "control", "action": action }); + let channels = state.node_channels.read().await; + if let Some(tx) = channels.get(&id) { + let _ = tx.send(msg.to_string()); + tracing::info!("Lähetetty control: {} solmulle {}", action, id); + return axum::Json(serde_json::json!({"status": "ok"})).into_response(); + } + } + (axum::http::StatusCode::BAD_REQUEST, "Invalid action or node offline").into_response() +} + async fn api_sessions( headers: axum::http::HeaderMap, axum::extract::State(state): axum::extract::State>, @@ -770,6 +819,9 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { } state.node_tasks.lock().unwrap().insert(node_id, selected_task); state.node_types.lock().unwrap().insert(node_id, node_type.to_string()); + // Uudelleen-kirjautuessa nollataan tauko + state.node_paused.lock().unwrap().remove(&node_id); + state.db.update_session_status(node_id, false); if node_type == "native" { let sys = json.get("system"); @@ -826,6 +878,18 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { }); let _ = state.stats_tx.send(join_msg.to_string()); + } else if msg_type == "status_update" { + let status = json.get("status").and_then(|v| v.as_str()).unwrap_or("active"); + if status == "paused" { + state.node_paused.lock().unwrap().insert(node_id); + state.db.update_session_status(node_id, true); + tracing::info!("Solmu {} ({}) asettui tauolle.", node_id, ip); + } else { + state.node_paused.lock().unwrap().remove(&node_id); + state.db.update_session_status(node_id, false); + tracing::info!("Solmu {} ({}) on taas aktiivinen.", node_id, ip); + } + broadcast_stats(&state).await; } else if msg_type == "result" { tracing::info!("Solmu {} sai tuloksen: {}", node_id, text); { @@ -1053,6 +1117,7 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { vram.remove(&node_id); } state.node_types.lock().unwrap().remove(&node_id); + state.node_paused.lock().unwrap().remove(&node_id); state.node_models.write().await.remove(&node_id); tracing::info!("Solmu {} ({}) poistui verkosta.", node_id, ip); broadcast_stats(&state).await; @@ -1179,7 +1244,9 @@ async fn api_chat_completions( let tasks = state.node_tasks.lock().unwrap(); let _busy = state.node_busy.lock().unwrap(); let node_types = state.node_types.lock().unwrap(); - let matching: Vec = tasks.iter().filter(|(_, task)| { + let paused = state.node_paused.lock().unwrap(); + let matching: Vec = tasks.iter().filter(|(k, task)| { + if paused.contains(k) { return false; } // Ei sallita tauotettuja // Eksakti match tai qwen-perheen yhteensopivuus (selain: qwen-coder-05b, natiivi: qwen2.5-coder:7b) let req_model = payload.model.to_lowercase(); let node_task = task.to_lowercase(); diff --git a/network-poc/native-node/Cargo.toml b/network-poc/native-node/Cargo.toml index 452a33f..8f2b44d 100644 --- a/network-poc/native-node/Cargo.toml +++ b/network-poc/native-node/Cargo.toml @@ -19,3 +19,5 @@ wgpu = { version = "24", optional = true } reqwest = { version = "0.12", features = ["json"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +dialoguer = "0.12.0" +console = "0.16.3" diff --git a/network-poc/native-node/src/inference.rs b/network-poc/native-node/src/inference.rs index 2d748d3..2815682 100644 --- a/network-poc/native-node/src/inference.rs +++ b/network-poc/native-node/src/inference.rs @@ -9,8 +9,6 @@ pub struct LlmEngine { impl LlmEngine { pub async fn load() -> Result { - let model = std::env::var("OLLAMA_MODEL").unwrap_or_else(|_| "qwen2.5-coder:3b".to_string()); - let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(600)) .connect_timeout(std::time::Duration::from_secs(3)) @@ -48,6 +46,13 @@ impl LlmEngine { }) }; + // Kysytään malli TUI:lla jos ei pakotettu ympäristöstä + let model = if let Ok(m) = std::env::var("OLLAMA_MODEL") { + m + } else { + crate::tui::select_model(&ollama_url, &client).await? + }; + tracing::info!("Ollama backend: {} | malli: {}", ollama_url, model); Ok(LlmEngine { ollama_url, model: RefCell::new(model), client }) } diff --git a/network-poc/native-node/src/main.rs b/network-poc/native-node/src/main.rs index deb3db7..8008e10 100644 --- a/network-poc/native-node/src/main.rs +++ b/network-poc/native-node/src/main.rs @@ -5,6 +5,7 @@ use tokio_tungstenite::connect_async; use tokio_tungstenite::tungstenite::Message; mod inference; +mod tui; /// GPU-tietorakenne — yhtenäinen kaikille valmistajille struct GpuInfo { @@ -354,74 +355,115 @@ async fn main() { continue; } - while let Some(Ok(msg)) = read.next().await { - if let Message::Text(text) = msg { - // LLM-promptit - if text.contains("llm_prompt") { - if let Ok(task) = serde_json::from_str::(&text) { - let prompt = task.get("prompt").and_then(|v| v.as_str()).unwrap_or(""); - let task_id = task.get("task_id").and_then(|v| v.as_str()).unwrap_or("?"); - let msg_model = task.get("model").and_then(|v| v.as_str()).unwrap_or(""); - - if !prompt.is_empty() && (msg_model.starts_with("qwen-coder") || msg_model.starts_with("qwen2.5-coder")) { + use tokio::io::AsyncBufReadExt; + let mut stdin_lines = tokio::io::BufReader::new(tokio::io::stdin()).lines(); - if let Some(ref engine) = llm { - let max_tokens = task.get("max_tokens").and_then(|v| v.as_u64()).unwrap_or(1024) as usize; - let prompt_lines = prompt.lines().count(); - let prompt_last: String = prompt.lines().last().unwrap_or("").chars().take(60).collect(); - tracing::info!("→ task_id:{} | {}r prompti | \"{}...\"", task_id, prompt_lines, prompt_last); - - let model_name = engine.model_name(); - match engine.generate(prompt, max_tokens).await { - Ok(result) => { - tracing::info!( - "✓ {} | {} tok | {:.0}ms | {:.1} tok/s", - model_name, - result.tokens_generated, - result.duration_ms, - result.tokens_per_sec, - ); - - // Lähetetään vain lyhyt prompti-esikatselu (ei koko kontekstia) - let prompt_short: String = prompt.lines().last().unwrap_or("").chars().take(100).collect(); - let done = json!({ - "type": "llm_done", - "prompt": prompt_short, - "model": format!("{} (Ollama)", model_name), - "response": result.text, - "tokens_generated": result.tokens_generated, - "duration_ms": result.duration_ms, - "tokens_per_sec": (result.tokens_per_sec * 10.0).round() / 10.0, - "load_time_ms": 0, - "task_id": task_id, - }); - let _ = write.send(Message::Text(done.to_string())).await; - } - Err(e) => { - tracing::error!("Inferenssivirhe: {}", e); - } - } - } + loop { + tokio::select! { + line = stdin_lines.next_line() => { + if let Ok(Some(text)) = line { + let t = text.trim(); + if t == "p" || t == "pause" { + tracing::info!("Tauotetaan solmun suoritus (Hub ei lähetä tehtäviä)..."); + let req = json!({"type": "status_update", "status": "paused"}); + let _ = write.send(Message::Text(req.to_string())).await; + } else if t == "r" || t == "resume" || t == "s" { + tracing::info!("Jatketaan solmun suoritusta..."); + let req = json!({"type": "status_update", "status": "active"}); + let _ = write.send(Message::Text(req.to_string())).await; } } } - // Mallin vaihto lennossa - if text.contains("change_model") { - if let Ok(task) = serde_json::from_str::(&text) { - if let Some(new_model) = task.get("model").and_then(|v| v.as_str()) { - if let Some(ref engine) = llm { - tracing::info!("Vaihdetaan malli: {}", new_model); - engine.set_model(new_model.to_string()); - match engine.ensure_model().await { - Ok(()) => tracing::info!("Malli {} valmis!", new_model), - Err(e) => tracing::error!("Mallin lataus epäonnistui: {}", e), + ws_msg = read.next() => { + match ws_msg { + Some(Ok(Message::Text(text))) => { + // Hubin control-viestit + if text.contains(r#""type":"control""#) { + if let Ok(task) = serde_json::from_str::(&text) { + if let Some(action) = task.get("action").and_then(|v| v.as_str()) { + if action == "pause" { + tracing::info!("Hub pakotti solmun tauolle (Pause)"); + let req = json!({"type": "status_update", "status": "paused"}); + let _ = write.send(Message::Text(req.to_string())).await; + } else if action == "resume" { + tracing::info!("Hub aktivoi solmun suorituksen (Resume)"); + let req = json!({"type": "status_update", "status": "active"}); + let _ = write.send(Message::Text(req.to_string())).await; + } + } + } + } + + // LLM-promptit + if text.contains("llm_prompt") { + if let Ok(task) = serde_json::from_str::(&text) { + let prompt = task.get("prompt").and_then(|v| v.as_str()).unwrap_or(""); + let task_id = task.get("task_id").and_then(|v| v.as_str()).unwrap_or("?"); + let msg_model = task.get("model").and_then(|v| v.as_str()).unwrap_or(""); + + if !prompt.is_empty() && (msg_model.starts_with("qwen-coder") || msg_model.starts_with("qwen2.5-coder") || msg_model.starts_with("phi")) { + if let Some(ref engine) = llm { + let max_tokens = task.get("max_tokens").and_then(|v| v.as_u64()).unwrap_or(1024) as usize; + let prompt_lines = prompt.lines().count(); + let prompt_last: String = prompt.lines().last().unwrap_or("").chars().take(60).collect(); + tracing::info!("→ task_id:{} | {}r prompti | \"{}...\"", task_id, prompt_lines, prompt_last); + + let model_name = engine.model_name(); + match engine.generate(prompt, max_tokens).await { + Ok(result) => { + tracing::info!( + "✓ {} | {} tok | {:.0}ms | {:.1} tok/s", + model_name, + result.tokens_generated, + result.duration_ms, + result.tokens_per_sec, + ); + let prompt_short: String = prompt.lines().last().unwrap_or("").chars().take(100).collect(); + let done = json!({ + "type": "llm_done", + "prompt": prompt_short, + "model": format!("{} (Ollama)", model_name), + "response": result.text, + "tokens_generated": result.tokens_generated, + "duration_ms": result.duration_ms, + "tokens_per_sec": (result.tokens_per_sec * 10.0).round() / 10.0, + "load_time_ms": 0, + "task_id": task_id, + }); + let _ = write.send(Message::Text(done.to_string())).await; + } + Err(e) => { + tracing::error!("Inferenssivirhe: {}", e); + } + } + } + } + } + } + + // Mallin vaihto lennossa + if text.contains("change_model") { + if let Ok(task) = serde_json::from_str::(&text) { + if let Some(new_model) = task.get("model").and_then(|v| v.as_str()) { + if let Some(ref engine) = llm { + tracing::info!("Vaihdetaan malli: {}", new_model); + engine.set_model(new_model.to_string()); + match engine.ensure_model().await { + Ok(()) => tracing::info!("Malli {} valmis!", new_model), + Err(e) => tracing::error!("Mallin lataus epäonnistui: {}", e), + } + } + } } } } + Some(Ok(_)) => {} // Muut viestityypit (binary/ping) + Some(Err(_)) | None => break, // Yhteys poikki } } } } + tracing::warn!("Yhteys hubiin katkesi — yritetään uudelleen 5s..."); } Err(e) => { diff --git a/network-poc/native-node/src/tui.rs b/network-poc/native-node/src/tui.rs new file mode 100644 index 0000000..b0c1df8 --- /dev/null +++ b/network-poc/native-node/src/tui.rs @@ -0,0 +1,67 @@ +use dialoguer::{Select, Input, theme::ColorfulTheme}; +use reqwest::Client; + +pub async fn select_model(ollama_url: &str, client: &Client) -> Result { + // 1. Hae tagit + let mut models = vec![]; + println!(" Haetaan asennettuja malleja osoitteesta {}...", ollama_url); + if let Ok(resp) = client.get(&format!("{}/api/tags", ollama_url)).send().await { + if resp.status().is_success() { + if let Ok(json) = resp.json::().await { + if let Some(arr) = json.get("models").and_then(|v| v.as_array()) { + for m in arr { + if let Some(name) = m.get("name").and_then(|v| v.as_str()) { + models.push(name.to_string()); + } + } + } + } + } + } + + let download_opt = "[➕ Lataa uusi malli internetistä]"; + let mut options = vec![download_opt.to_string()]; + options.extend(models); + + // 2. Kysy käyttäjältä Selectillä + let theme = ColorfulTheme::default(); + let selection = Select::with_theme(&theme) + .with_prompt("Valitse Ollama-malli Kipinä-verkkoa varten:") + .default(if options.len() > 1 { 1 } else { 0 }) + .items(&options) + .interact() + .map_err(|e| format!("TUI virhe: {}", e))?; + + let selected = &options[selection]; + + // 3. Jos käyttäjä haluaa uuden, kysy nimeä + if selected == download_opt { + let new_model: String = Input::with_theme(&theme) + .with_prompt("Syötä ladattavan mallin nimi (esim. llama3 tai qwen2.5-coder:3b)") + .interact_text() + .map_err(|e| format!("TUI virhe: {}", e))?; + + let new_model = new_model.trim().to_string(); + if new_model.is_empty() { + return Err("Mallin nimi ei voi olla tyhjä".to_string()); + } + + println!(" Ladataan malleja taustalla... Tämä voi kestää hetken ({})", new_model); + // Odotetaan että pull on valmis + let pull_body = serde_json::json!({ "name": &new_model }); + let resp = client.post(&format!("{}/api/pull", ollama_url)) + .json(&pull_body) + .send() + .await + .map_err(|e| format!("Pull req virhe: {}", e))?; + + if resp.status().is_success() { + println!(" ✓ Malli {} ladattu onnistuneesti!", new_model); + return Ok(new_model); + } else { + return Err(format!("Ollama pull epäonnistui: {}", resp.status())); + } + } + + Ok(selected.clone()) +}