use rusqlite::{Connection, params}; use std::sync::Mutex; pub struct NodeDb { conn: Mutex, } impl NodeDb { pub fn new(path: &str) -> Self { let conn = Connection::open(path).expect("SQLite-tietokantaa ei voitu avata"); // Poista vanha tietokanta jos skeema on rikki — PoC, ei tuotantodata let _ = conn.execute_batch(" CREATE TABLE IF NOT EXISTS _schema_version (version INTEGER); "); let version: i64 = conn.query_row( "SELECT COALESCE(MAX(version), 0) FROM _schema_version", [], |r| r.get(0) ).unwrap_or(0); if version < 2 { // Pudotetaan vanhat taulut ja luodaan uudet let _ = conn.execute_batch(" DROP TABLE IF EXISTS node_sessions; DROP TABLE IF EXISTS pair_results; DELETE FROM _schema_version; INSERT INTO _schema_version VALUES (2); "); } if version < 3 { let _ = conn.execute_batch(" CREATE TABLE IF NOT EXISTS agents ( id TEXT PRIMARY KEY, name TEXT NOT NULL, avatar TEXT NOT NULL DEFAULT '/avatars/kipina_notext.png', role TEXT NOT NULL DEFAULT 'coder', model TEXT NOT NULL DEFAULT 'qwen2.5-coder:7b', color TEXT NOT NULL DEFAULT '#3fb950', docs TEXT, prompt TEXT NOT NULL DEFAULT '', temperature REAL DEFAULT 0.7, top_k INTEGER DEFAULT 40, max_tokens INTEGER DEFAULT 512, repetition_penalty REAL DEFAULT 1.15, is_default BOOLEAN DEFAULT 0, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); DELETE FROM _schema_version; INSERT INTO _schema_version VALUES (3); "); } conn.execute_batch(" CREATE TABLE IF NOT EXISTS node_sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, node_id INTEGER NOT NULL, ip TEXT NOT NULL, node_type TEXT NOT NULL DEFAULT 'browser', connected_at TEXT NOT NULL, disconnected_at TEXT, -- Järjestelmätiedot platform TEXT, hostname TEXT, os TEXT, cpu_cores INTEGER, cpu_model TEXT, ram_mb INTEGER, -- GPU gpu_name TEXT, gpu_vendor TEXT, gpu_backend TEXT, vram_total_mb INTEGER, vram_used_mb INTEGER, gpu_temp_c INTEGER, gpu_util_pct INTEGER, -- Varaus ja tehtävä allocated_gb INTEGER, selected_task TEXT DEFAULT 'tokenize', -- WebGPU-tuki has_webgpu BOOLEAN, -- Tehtävätilastot tasks_completed INTEGER DEFAULT 0 ); CREATE TABLE IF NOT EXISTS pair_results ( id INTEGER PRIMARY KEY AUTOINCREMENT, node_id INTEGER NOT NULL, created_at TEXT NOT NULL, en_text TEXT, fi_text TEXT, en_tokens INTEGER, fi_tokens INTEGER, en_chars_per_token REAL, fi_chars_per_token REAL, overhead_pct REAL, duration_ms INTEGER ); ").expect("Tietokantataulujen luonti epäonnistui"); NodeDb { conn: Mutex::new(conn) } } pub fn insert_session( &self, node_id: u64, ip: &str, node_type: &str, auth_data: &serde_json::Value, ) -> i64 { let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let now = chrono::Utc::now().to_rfc3339(); // Selainsolmun tiedot let platform = auth_data.get("platform").and_then(|v| v.as_str()); let cpu_cores = auth_data.get("cpu_cores").and_then(|v| v.as_u64()); let ram = auth_data.get("device_memory_gb").and_then(|v| v.as_f64()).map(|v| (v * 1024.0) as i64); let allocated = auth_data.get("allocated_gb").and_then(|v| v.as_u64()); let selected_task = auth_data.get("selected_task").and_then(|v| v.as_str()); // GPU (selain) let gpu_vendor = auth_data.get("gpu").and_then(|g| g.get("vendor")).and_then(|v| v.as_str()); let gpu_desc = auth_data.get("gpu").and_then(|g| g.get("description")).and_then(|v| v.as_str()); let gpu_backend = if gpu_vendor.is_some() { Some("WebGPU") } else { None }; let has_webgpu = gpu_vendor.is_some(); // Natiivi-solmun tiedot let sys = auth_data.get("system"); let hostname = sys.and_then(|s| s.get("hostname")).and_then(|v| v.as_str()); let os = sys.and_then(|s| s.get("os")).and_then(|v| v.as_str()); let native_cores = sys.and_then(|s| s.get("cpu_cores")).and_then(|v| v.as_u64()); let cpu_model = sys.and_then(|s| s.get("cpu_model")).and_then(|v| v.as_str()); let native_ram = sys.and_then(|s| s.get("ram_total_mb")).and_then(|v| v.as_u64()); // GPU (natiivi — ensimmäinen GPU) let gpu = auth_data.get("gpus").and_then(|v| v.as_array()).and_then(|a| a.first()); let native_gpu_name = gpu.and_then(|g| g.get("name")).and_then(|v| v.as_str()); let native_gpu_vendor = gpu.and_then(|g| g.get("vendor")).and_then(|v| v.as_str()); let native_gpu_backend = gpu.and_then(|g| g.get("backend")).and_then(|v| v.as_str()); let vram_total = gpu.and_then(|g| g.get("vram_total_mb")).and_then(|v| v.as_u64()); let vram_used = gpu.and_then(|g| g.get("vram_used_mb")).and_then(|v| v.as_u64()); let gpu_temp = gpu.and_then(|g| g.get("temperature_c")).and_then(|v| v.as_u64()); let gpu_util = gpu.and_then(|g| g.get("gpu_util_pct")).and_then(|v| v.as_u64()); conn.execute( "INSERT INTO node_sessions ( node_id, ip, node_type, connected_at, platform, hostname, os, cpu_cores, cpu_model, ram_mb, gpu_name, gpu_vendor, gpu_backend, vram_total_mb, vram_used_mb, gpu_temp_c, gpu_util_pct, allocated_gb, selected_task, has_webgpu ) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19,?20)", params![ node_id as i64, ip, node_type, now, platform, hostname, os, native_cores.or(cpu_cores).map(|v| v as i64), cpu_model, native_ram.map(|v| v as i64).or(ram), native_gpu_name.or(gpu_desc), native_gpu_vendor.or(gpu_vendor), native_gpu_backend.or(gpu_backend), vram_total.map(|v| v as i64), vram_used.map(|v| v as i64), gpu_temp.map(|v| v as i64), gpu_util.map(|v| v as i64), allocated.map(|v| v as i64), selected_task, has_webgpu, ], ).expect("Session insert epäonnistui"); conn.last_insert_rowid() } pub fn update_session_task(&self, node_id: u64, task: &str) { let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let _ = conn.execute( "UPDATE node_sessions SET selected_task = ?1 WHERE node_id = ?2 AND disconnected_at IS NULL", params![task, node_id as i64], ); } /// Sulkee saman IP:n viewer-sessiot kun aktiivinen node liittyy pub fn close_viewers_by_ip(&self, ip: &str) { let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let now = chrono::Utc::now().to_rfc3339(); let _ = conn.execute( "UPDATE node_sessions SET disconnected_at = ?1 WHERE ip = ?2 AND disconnected_at IS NULL AND (selected_task = 'viewer' OR selected_task = 'codelab-viewer')", params![now, ip], ); } pub fn close_session(&self, node_id: u64) { let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let now = chrono::Utc::now().to_rfc3339(); let _ = conn.execute( "UPDATE node_sessions SET disconnected_at = ?1 WHERE node_id = ?2 AND disconnected_at IS NULL", params![now, node_id as i64], ); } pub fn increment_tasks(&self, node_id: u64) { let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let _ = conn.execute( "UPDATE node_sessions SET tasks_completed = tasks_completed + 1 WHERE node_id = ?1 AND disconnected_at IS NULL", params![node_id as i64], ); } pub fn get_sessions(&self, limit: u32) -> Vec { let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let mut stmt = conn.prepare( "SELECT id, node_id, ip, node_type, connected_at, disconnected_at, platform, hostname, os, cpu_cores, cpu_model, ram_mb, gpu_name, gpu_vendor, gpu_backend, vram_total_mb, gpu_temp_c, gpu_util_pct, allocated_gb, selected_task, has_webgpu, tasks_completed FROM node_sessions ORDER BY id DESC LIMIT ?1" ).unwrap(); stmt.query_map(params![limit], |row| { Ok(serde_json::json!({ "id": row.get::<_, i64>(0)?, "node_id": row.get::<_, i64>(1)?, "ip": row.get::<_, String>(2)?, "node_type": row.get::<_, String>(3)?, "connected_at": row.get::<_, String>(4)?, "disconnected_at": row.get::<_, Option>(5)?, "platform": row.get::<_, Option>(6)?, "hostname": row.get::<_, Option>(7)?, "os": row.get::<_, Option>(8)?, "cpu_cores": row.get::<_, Option>(9)?, "cpu_model": row.get::<_, Option>(10)?, "ram_mb": row.get::<_, Option>(11)?, "gpu_name": row.get::<_, Option>(12)?, "gpu_vendor": row.get::<_, Option>(13)?, "gpu_backend": row.get::<_, Option>(14)?, "vram_total_mb": row.get::<_, Option>(15)?, "gpu_temp_c": row.get::<_, Option>(16)?, "gpu_util_pct": row.get::<_, Option>(17)?, "allocated_gb": row.get::<_, Option>(18)?, "selected_task": row.get::<_, Option>(19)?, "has_webgpu": row.get::<_, Option>(20)?, "tasks_completed": row.get::<_, i64>(21)?, })) }).unwrap().filter_map(|r| r.ok()).collect() } pub fn get_pair_results(&self, limit: u32) -> Vec { let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let mut stmt = conn.prepare( "SELECT id, node_id, created_at, en_text, fi_text, en_tokens, fi_tokens, en_chars_per_token, fi_chars_per_token, overhead_pct, duration_ms FROM pair_results ORDER BY id DESC LIMIT ?1" ).unwrap(); stmt.query_map(params![limit], |row| { Ok(serde_json::json!({ "id": row.get::<_, i64>(0)?, "node_id": row.get::<_, i64>(1)?, "created_at": row.get::<_, String>(2)?, "en_text": row.get::<_, Option>(3)?, "fi_text": row.get::<_, Option>(4)?, "en_tokens": row.get::<_, Option>(5)?, "fi_tokens": row.get::<_, Option>(6)?, "en_chars_per_token": row.get::<_, Option>(7)?, "fi_chars_per_token": row.get::<_, Option>(8)?, "overhead_pct": row.get::<_, Option>(9)?, "duration_ms": row.get::<_, Option>(10)?, })) }).unwrap().filter_map(|r| r.ok()).collect() } pub fn get_stats(&self) -> serde_json::Value { let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let total_sessions: i64 = conn.query_row("SELECT COUNT(*) FROM node_sessions", [], |r| r.get(0)).unwrap_or(0); let active_sessions: i64 = conn.query_row("SELECT COUNT(*) FROM node_sessions WHERE disconnected_at IS NULL", [], |r| r.get(0)).unwrap_or(0); let total_pairs: i64 = conn.query_row("SELECT COUNT(*) FROM pair_results", [], |r| r.get(0)).unwrap_or(0); let avg_overhead: f64 = conn.query_row("SELECT COALESCE(AVG(overhead_pct), 0) FROM pair_results", [], |r| r.get(0)).unwrap_or(0.0); let avg_en_cpt: f64 = conn.query_row("SELECT COALESCE(AVG(en_chars_per_token), 0) FROM pair_results", [], |r| r.get(0)).unwrap_or(0.0); let avg_fi_cpt: f64 = conn.query_row("SELECT COALESCE(AVG(fi_chars_per_token), 0) FROM pair_results", [], |r| r.get(0)).unwrap_or(0.0); let webgpu_count: i64 = conn.query_row("SELECT COUNT(*) FROM node_sessions WHERE has_webgpu = 1", [], |r| r.get(0)).unwrap_or(0); let cpu_fallback_count: i64 = conn.query_row("SELECT COUNT(*) FROM node_sessions WHERE has_webgpu = 0 OR has_webgpu IS NULL", [], |r| r.get(0)).unwrap_or(0); let unique_ips: i64 = conn.query_row("SELECT COUNT(DISTINCT ip) FROM node_sessions", [], |r| r.get(0)).unwrap_or(0); serde_json::json!({ "total_sessions": total_sessions, "active_sessions": active_sessions, "unique_ips": unique_ips, "total_pairs": total_pairs, "avg_overhead_pct": (avg_overhead * 10.0).round() / 10.0, "avg_en_chars_per_token": (avg_en_cpt * 100.0).round() / 100.0, "avg_fi_chars_per_token": (avg_fi_cpt * 100.0).round() / 100.0, "webgpu_sessions": webgpu_count, "cpu_fallback_sessions": cpu_fallback_count, }) } // ── Agents CRUD ── pub fn upsert_agent(&self, agent: &serde_json::Value) -> Result<(), String> { let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let now = chrono::Utc::now().to_rfc3339(); let id = agent.get("id").and_then(|v| v.as_str()).ok_or("id puuttuu")?; let name = agent.get("name").and_then(|v| v.as_str()).ok_or("name puuttuu")?; conn.execute( "INSERT INTO agents (id, name, avatar, role, model, color, docs, prompt, temperature, top_k, max_tokens, repetition_penalty, is_default, created_at, updated_at) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?14) ON CONFLICT(id) DO UPDATE SET name=?2, avatar=?3, role=?4, model=?5, color=?6, docs=?7, prompt=?8, temperature=?9, top_k=?10, max_tokens=?11, repetition_penalty=?12, updated_at=?14", params![ id, name, agent.get("avatar").and_then(|v| v.as_str()).unwrap_or("/avatars/kipina_notext.png"), agent.get("role").and_then(|v| v.as_str()).unwrap_or("coder"), agent.get("model").and_then(|v| v.as_str()).unwrap_or("qwen2.5-coder:7b"), agent.get("color").and_then(|v| v.as_str()).unwrap_or("#3fb950"), agent.get("docs").and_then(|v| v.as_str()), agent.get("prompt").and_then(|v| v.as_str()).unwrap_or(""), agent.get("temperature").and_then(|v| v.as_f64()).unwrap_or(0.7), agent.get("top_k").and_then(|v| v.as_u64()).unwrap_or(40) as i64, agent.get("max_tokens").and_then(|v| v.as_u64()).unwrap_or(512) as i64, agent.get("repetition_penalty").and_then(|v| v.as_f64()).unwrap_or(1.15), agent.get("is_default").and_then(|v| v.as_bool()).unwrap_or(false), now, ], ).map_err(|e| format!("Agent upsert: {}", e))?; Ok(()) } pub fn get_agents(&self) -> Vec { let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let mut stmt = conn.prepare( "SELECT id, name, avatar, role, model, color, docs, prompt, temperature, top_k, max_tokens, repetition_penalty, is_default, created_at, updated_at FROM agents ORDER BY is_default DESC, name" ).unwrap(); stmt.query_map([], |row| { Ok(serde_json::json!({ "id": row.get::<_, String>(0)?, "name": row.get::<_, String>(1)?, "avatar": row.get::<_, String>(2)?, "role": row.get::<_, String>(3)?, "model": row.get::<_, String>(4)?, "color": row.get::<_, String>(5)?, "docs": row.get::<_, Option>(6)?, "prompt": row.get::<_, String>(7)?, "temperature": row.get::<_, f64>(8)?, "top_k": row.get::<_, i64>(9)?, "max_tokens": row.get::<_, i64>(10)?, "repetition_penalty": row.get::<_, f64>(11)?, "is_default": row.get::<_, bool>(12)?, "created_at": row.get::<_, String>(13)?, "updated_at": row.get::<_, String>(14)?, })) }).unwrap().filter_map(|r| r.ok()).collect() } pub fn delete_agent(&self, id: &str) -> Result<(), String> { let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let deleted = conn.execute( "DELETE FROM agents WHERE id = ?1 AND is_default = 0", params![id], ).map_err(|e| format!("Agent delete: {}", e))?; if deleted == 0 { Err("Agenttia ei löydy tai se on oletusagentti".to_string()) } else { Ok(()) } } pub fn insert_pair_result( &self, node_id: u64, en: &serde_json::Value, fi: &serde_json::Value, overhead: f64, duration_ms: f64, ) { let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let now = chrono::Utc::now().to_rfc3339(); let _ = conn.execute( "INSERT INTO pair_results ( node_id, created_at, en_text, fi_text, en_tokens, fi_tokens, en_chars_per_token, fi_chars_per_token, overhead_pct, duration_ms ) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10)", params![ node_id as i64, now, en.get("text").and_then(|v| v.as_str()), fi.get("text").and_then(|v| v.as_str()), en.get("token_count").and_then(|v| v.as_u64()).map(|v| v as i64), fi.get("token_count").and_then(|v| v.as_u64()).map(|v| v as i64), en.get("chars_per_token").and_then(|v| v.as_f64()), fi.get("chars_per_token").and_then(|v| v.as_f64()), overhead, duration_ms, ], ); } }