diff --git a/network-poc/docker-compose.prod.yml b/network-poc/docker-compose.prod.yml index 67d6455..7b320c4 100644 --- a/network-poc/docker-compose.prod.yml +++ b/network-poc/docker-compose.prod.yml @@ -19,7 +19,12 @@ services: dockerfile: Dockerfile.prod container_name: kipina-agentic-hub restart: unless-stopped + environment: + - DATABASE_PATH=/data/nodes.db + volumes: + - hub_data:/data volumes: caddy_data: caddy_config: + hub_data: diff --git a/network-poc/hub/Cargo.toml b/network-poc/hub/Cargo.toml index 38d3398..182d6e8 100644 --- a/network-poc/hub/Cargo.toml +++ b/network-poc/hub/Cargo.toml @@ -13,3 +13,5 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } uuid = { version = "1.7.0", features = ["v4", "serde"] } futures = "0.3" +rusqlite = { version = "0.31", features = ["bundled"] } +chrono = "0.4" diff --git a/network-poc/hub/src/db.rs b/network-poc/hub/src/db.rs new file mode 100644 index 0000000..f16f14e --- /dev/null +++ b/network-poc/hub/src/db.rs @@ -0,0 +1,272 @@ +use rusqlite::{Connection, params}; +use std::sync::Mutex; + +pub struct NodeDb { + conn: Mutex, +} + +impl NodeDb { + pub fn new(path: &str) -> Self { + let conn = Connection::open(path).expect("SQLite-tietokantaa ei voitu avata"); + + conn.execute_batch(" + CREATE TABLE IF NOT EXISTS node_sessions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + node_id INTEGER NOT NULL, + ip TEXT NOT NULL, + node_type TEXT NOT NULL DEFAULT 'browser', + connected_at TEXT NOT NULL, + disconnected_at TEXT, + + -- Järjestelmätiedot + platform TEXT, + hostname TEXT, + os TEXT, + cpu_cores INTEGER, + cpu_model TEXT, + ram_mb INTEGER, + + -- GPU + gpu_name TEXT, + gpu_vendor TEXT, + gpu_backend TEXT, + vram_total_mb INTEGER, + vram_used_mb INTEGER, + gpu_temp_c INTEGER, + gpu_util_pct INTEGER, + + -- Varaus + allocated_gb INTEGER, + + -- WebGPU-tuki + has_webgpu BOOLEAN, + + -- Tehtävätilastot + tasks_completed INTEGER DEFAULT 0 + ); + + CREATE TABLE IF NOT EXISTS pair_results ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + node_id INTEGER NOT NULL, + created_at TEXT NOT NULL, + en_text TEXT, + fi_text TEXT, + en_tokens INTEGER, + fi_tokens INTEGER, + en_chars_per_token REAL, + fi_chars_per_token REAL, + overhead_pct REAL, + duration_ms INTEGER + ); + ").expect("Tietokantataulujen luonti epäonnistui"); + + NodeDb { conn: Mutex::new(conn) } + } + + pub fn insert_session( + &self, + node_id: u64, + ip: &str, + node_type: &str, + auth_data: &serde_json::Value, + ) -> i64 { + let conn = self.conn.lock().unwrap(); + let now = chrono::Utc::now().to_rfc3339(); + + // Selainsolmun tiedot + let platform = auth_data.get("platform").and_then(|v| v.as_str()); + let cpu_cores = auth_data.get("cpu_cores").and_then(|v| v.as_u64()); + let ram = auth_data.get("device_memory_gb").and_then(|v| v.as_f64()).map(|v| (v * 1024.0) as i64); + let allocated = auth_data.get("allocated_gb").and_then(|v| v.as_u64()); + + // GPU (selain) + let gpu_vendor = auth_data.get("gpu").and_then(|g| g.get("vendor")).and_then(|v| v.as_str()); + let gpu_desc = auth_data.get("gpu").and_then(|g| g.get("description")).and_then(|v| v.as_str()); + let gpu_backend = if gpu_vendor.is_some() { Some("WebGPU") } else { None }; + let has_webgpu = gpu_vendor.is_some(); + + // Natiivi-solmun tiedot + let sys = auth_data.get("system"); + let hostname = sys.and_then(|s| s.get("hostname")).and_then(|v| v.as_str()); + let os = sys.and_then(|s| s.get("os")).and_then(|v| v.as_str()); + let native_cores = sys.and_then(|s| s.get("cpu_cores")).and_then(|v| v.as_u64()); + let cpu_model = sys.and_then(|s| s.get("cpu_model")).and_then(|v| v.as_str()); + let native_ram = sys.and_then(|s| s.get("ram_total_mb")).and_then(|v| v.as_u64()); + + // GPU (natiivi — ensimmäinen GPU) + let gpu = auth_data.get("gpus").and_then(|v| v.as_array()).and_then(|a| a.first()); + let native_gpu_name = gpu.and_then(|g| g.get("name")).and_then(|v| v.as_str()); + let native_gpu_vendor = gpu.and_then(|g| g.get("vendor")).and_then(|v| v.as_str()); + let native_gpu_backend = gpu.and_then(|g| g.get("backend")).and_then(|v| v.as_str()); + let vram_total = gpu.and_then(|g| g.get("vram_total_mb")).and_then(|v| v.as_u64()); + let vram_used = gpu.and_then(|g| g.get("vram_used_mb")).and_then(|v| v.as_u64()); + let gpu_temp = gpu.and_then(|g| g.get("temperature_c")).and_then(|v| v.as_u64()); + let gpu_util = gpu.and_then(|g| g.get("gpu_util_pct")).and_then(|v| v.as_u64()); + + conn.execute( + "INSERT INTO node_sessions ( + node_id, ip, node_type, connected_at, + platform, hostname, os, cpu_cores, cpu_model, ram_mb, + gpu_name, gpu_vendor, gpu_backend, vram_total_mb, vram_used_mb, gpu_temp_c, gpu_util_pct, + allocated_gb, has_webgpu + ) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19)", + params![ + node_id as i64, ip, node_type, now, + platform, hostname, os, + native_cores.or(cpu_cores).map(|v| v as i64), + cpu_model, + native_ram.map(|v| v as i64).or(ram), + native_gpu_name.or(gpu_desc), + native_gpu_vendor.or(gpu_vendor), + native_gpu_backend.or(gpu_backend), + vram_total.map(|v| v as i64), + vram_used.map(|v| v as i64), + gpu_temp.map(|v| v as i64), + gpu_util.map(|v| v as i64), + allocated.map(|v| v as i64), + has_webgpu, + ], + ).expect("Session insert epäonnistui"); + + conn.last_insert_rowid() + } + + pub fn close_session(&self, node_id: u64) { + let conn = self.conn.lock().unwrap(); + let now = chrono::Utc::now().to_rfc3339(); + let _ = conn.execute( + "UPDATE node_sessions SET disconnected_at = ?1 WHERE node_id = ?2 AND disconnected_at IS NULL", + params![now, node_id as i64], + ); + } + + pub fn increment_tasks(&self, node_id: u64) { + let conn = self.conn.lock().unwrap(); + let _ = conn.execute( + "UPDATE node_sessions SET tasks_completed = tasks_completed + 1 WHERE node_id = ?1 AND disconnected_at IS NULL", + params![node_id as i64], + ); + } + + pub fn get_sessions(&self, limit: u32) -> Vec { + let conn = self.conn.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT id, node_id, ip, node_type, connected_at, disconnected_at, + platform, hostname, os, cpu_cores, cpu_model, ram_mb, + gpu_name, gpu_vendor, gpu_backend, vram_total_mb, gpu_temp_c, gpu_util_pct, + allocated_gb, has_webgpu, tasks_completed + FROM node_sessions ORDER BY id DESC LIMIT ?1" + ).unwrap(); + + stmt.query_map(params![limit], |row| { + Ok(serde_json::json!({ + "id": row.get::<_, i64>(0)?, + "node_id": row.get::<_, i64>(1)?, + "ip": row.get::<_, String>(2)?, + "node_type": row.get::<_, String>(3)?, + "connected_at": row.get::<_, String>(4)?, + "disconnected_at": row.get::<_, Option>(5)?, + "platform": row.get::<_, Option>(6)?, + "hostname": row.get::<_, Option>(7)?, + "os": row.get::<_, Option>(8)?, + "cpu_cores": row.get::<_, Option>(9)?, + "cpu_model": row.get::<_, Option>(10)?, + "ram_mb": row.get::<_, Option>(11)?, + "gpu_name": row.get::<_, Option>(12)?, + "gpu_vendor": row.get::<_, Option>(13)?, + "gpu_backend": row.get::<_, Option>(14)?, + "vram_total_mb": row.get::<_, Option>(15)?, + "gpu_temp_c": row.get::<_, Option>(16)?, + "gpu_util_pct": row.get::<_, Option>(17)?, + "allocated_gb": row.get::<_, Option>(18)?, + "has_webgpu": row.get::<_, Option>(19)?, + "tasks_completed": row.get::<_, i64>(20)?, + })) + }).unwrap().filter_map(|r| r.ok()).collect() + } + + pub fn get_pair_results(&self, limit: u32) -> Vec { + let conn = self.conn.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT id, node_id, created_at, en_text, fi_text, + en_tokens, fi_tokens, en_chars_per_token, fi_chars_per_token, + overhead_pct, duration_ms + FROM pair_results ORDER BY id DESC LIMIT ?1" + ).unwrap(); + + stmt.query_map(params![limit], |row| { + Ok(serde_json::json!({ + "id": row.get::<_, i64>(0)?, + "node_id": row.get::<_, i64>(1)?, + "created_at": row.get::<_, String>(2)?, + "en_text": row.get::<_, Option>(3)?, + "fi_text": row.get::<_, Option>(4)?, + "en_tokens": row.get::<_, Option>(5)?, + "fi_tokens": row.get::<_, Option>(6)?, + "en_chars_per_token": row.get::<_, Option>(7)?, + "fi_chars_per_token": row.get::<_, Option>(8)?, + "overhead_pct": row.get::<_, Option>(9)?, + "duration_ms": row.get::<_, Option>(10)?, + })) + }).unwrap().filter_map(|r| r.ok()).collect() + } + + pub fn get_stats(&self) -> serde_json::Value { + let conn = self.conn.lock().unwrap(); + + let total_sessions: i64 = conn.query_row("SELECT COUNT(*) FROM node_sessions", [], |r| r.get(0)).unwrap_or(0); + let active_sessions: i64 = conn.query_row("SELECT COUNT(*) FROM node_sessions WHERE disconnected_at IS NULL", [], |r| r.get(0)).unwrap_or(0); + let total_pairs: i64 = conn.query_row("SELECT COUNT(*) FROM pair_results", [], |r| r.get(0)).unwrap_or(0); + let avg_overhead: f64 = conn.query_row("SELECT COALESCE(AVG(overhead_pct), 0) FROM pair_results", [], |r| r.get(0)).unwrap_or(0.0); + let avg_en_cpt: f64 = conn.query_row("SELECT COALESCE(AVG(en_chars_per_token), 0) FROM pair_results", [], |r| r.get(0)).unwrap_or(0.0); + let avg_fi_cpt: f64 = conn.query_row("SELECT COALESCE(AVG(fi_chars_per_token), 0) FROM pair_results", [], |r| r.get(0)).unwrap_or(0.0); + + let webgpu_count: i64 = conn.query_row("SELECT COUNT(*) FROM node_sessions WHERE has_webgpu = 1", [], |r| r.get(0)).unwrap_or(0); + let cpu_fallback_count: i64 = conn.query_row("SELECT COUNT(*) FROM node_sessions WHERE has_webgpu = 0 OR has_webgpu IS NULL", [], |r| r.get(0)).unwrap_or(0); + + let unique_ips: i64 = conn.query_row("SELECT COUNT(DISTINCT ip) FROM node_sessions", [], |r| r.get(0)).unwrap_or(0); + + serde_json::json!({ + "total_sessions": total_sessions, + "active_sessions": active_sessions, + "unique_ips": unique_ips, + "total_pairs": total_pairs, + "avg_overhead_pct": (avg_overhead * 10.0).round() / 10.0, + "avg_en_chars_per_token": (avg_en_cpt * 100.0).round() / 100.0, + "avg_fi_chars_per_token": (avg_fi_cpt * 100.0).round() / 100.0, + "webgpu_sessions": webgpu_count, + "cpu_fallback_sessions": cpu_fallback_count, + }) + } + + pub fn insert_pair_result( + &self, + node_id: u64, + en: &serde_json::Value, + fi: &serde_json::Value, + overhead: f64, + duration_ms: u64, + ) { + let conn = self.conn.lock().unwrap(); + let now = chrono::Utc::now().to_rfc3339(); + let _ = conn.execute( + "INSERT INTO pair_results ( + node_id, created_at, en_text, fi_text, + en_tokens, fi_tokens, en_chars_per_token, fi_chars_per_token, + overhead_pct, duration_ms + ) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10)", + params![ + node_id as i64, + now, + en.get("text").and_then(|v| v.as_str()), + fi.get("text").and_then(|v| v.as_str()), + en.get("token_count").and_then(|v| v.as_u64()).map(|v| v as i64), + fi.get("token_count").and_then(|v| v.as_u64()).map(|v| v as i64), + en.get("chars_per_token").and_then(|v| v.as_f64()), + fi.get("chars_per_token").and_then(|v| v.as_f64()), + overhead, + duration_ms as i64, + ], + ); + } +} diff --git a/network-poc/hub/src/main.rs b/network-poc/hub/src/main.rs index 125461d..e11170d 100644 --- a/network-poc/hub/src/main.rs +++ b/network-poc/hub/src/main.rs @@ -13,6 +13,8 @@ use tokio::sync::broadcast; use tower_http::services::ServeDir; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +mod db; + const MAX_MESSAGE_SIZE: usize = 16 * 1024; // Sallitut originit — estää cross-site WebSocket hijackingin @@ -30,12 +32,173 @@ struct AppState { nodes_vram: Mutex>, total_tasks: Mutex, stats_tx: broadcast::Sender, - // IP-rajoitus: max 2 yhteyttä per IP (dashboard-UI + selainsolmu) ip_connections: Mutex>, - // Node ID → IP -mappaus (siivousta varten) node_ips: Mutex>, + db: db::NodeDb, } +const ADMIN_HTML: &str = r##" + + + + +Kipina Admin + + + +

Kipina Admin

+

Node-sessiot ja tokenisointivertailut

+ +
+ +
+
Sessiot
+
Tokenisointiparit
+
+ +
+
+ + + + +
IDTilaTyyppiIPAlustaOSCPURAMGPUVRAMWebGPUTeht.YhdistettyKesto
+
+
+ +
+
+ + + +
SolmuENEN tokEN m/tFIFI tokFI m/tYlikust.Aika
+
+
+ + + +"##; + #[tokio::main] async fn main() { tracing_subscriber::registry() @@ -55,8 +218,11 @@ async fn main() { stats_tx: stats_tx.clone(), ip_connections: Mutex::new(HashMap::new()), node_ips: Mutex::new(HashMap::new()), + db: db::NodeDb::new(&std::env::var("DATABASE_PATH").unwrap_or_else(|_| "nodes.db".to_string())), }); + tracing::info!("Tietokanta alustettu"); + let state_for_task = state.clone(); // Ajastin, joka jakaa satunnaisia tekoälytehtäviä eri pituuksilla @@ -100,18 +266,41 @@ async fn main() { }); let app = Router::new() - .nest_service("/", ServeDir::new(std::env::var("STATIC_DIR").unwrap_or_else(|_| "../static".to_string()))) .route("/ws", get(ws_handler)) + .route("/api/sessions", get(api_sessions)) + .route("/api/pairs", get(api_pairs)) + .route("/api/stats", get(api_stats)) + .route("/admin", get(admin_page)) + .nest_service("/", ServeDir::new(std::env::var("STATIC_DIR").unwrap_or_else(|_| "../static".to_string()))) .with_state(state); let addr = SocketAddr::from(([0, 0, 0, 0], 3000)); tracing::debug!("Kipinä Agent Hub käynnistyy osoitteessa http://localhost:3000"); let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); - axum::serve( - listener, - app.into_make_service_with_connect_info::(), - ).await.unwrap(); + axum::serve(listener, app.into_make_service_with_connect_info::()).await.unwrap(); +} + +async fn api_sessions( + axum::extract::State(state): axum::extract::State>, +) -> impl IntoResponse { + axum::Json(state.db.get_sessions(200)) +} + +async fn api_pairs( + axum::extract::State(state): axum::extract::State>, +) -> impl IntoResponse { + axum::Json(state.db.get_pair_results(500)) +} + +async fn api_stats( + axum::extract::State(state): axum::extract::State>, +) -> impl IntoResponse { + axum::Json(state.db.get_stats()) +} + +async fn admin_page() -> impl IntoResponse { + axum::response::Html(ADMIN_HTML) } async fn ws_handler( @@ -298,6 +487,9 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { map.insert(node_id, allocated); } + // Tallennetaan sessiotieto tietokantaan + state.db.insert_session(node_id, &ip.to_string(), node_type, &json); + if node_type == "native" { let sys = json.get("system"); let hostname = sys.and_then(|s| s.get("hostname")).and_then(|v| v.as_str()).unwrap_or("?"); @@ -383,6 +575,12 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { println!(" {} merkkiä → \x1b[35m{} tokenia\x1b[0m | \x1b[32m{:.2} merkkiä/token\x1b[0m", fi_chars, fi_tokens, fi_cpt); println!(" {}Suomen ylikustannus: {:+.1}%\x1b[0m", overhead_color, overhead); + // Tallennetaan parin tulos tietokantaan + let en_ref = obj.get("en").cloned().unwrap_or_default(); + let fi_ref = obj.get("fi").cloned().unwrap_or_default(); + state.db.insert_pair_result(node_id, &en_ref, &fi_ref, overhead, duration); + state.db.increment_tasks(node_id); + obj.insert("node_id".to_string(), serde_json::json!(node_id)); } let _ = state.stats_tx.send(json.to_string()); @@ -404,7 +602,8 @@ async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { } } - // Yhteys katkesi — siivotaan IP-laskuri ja node-tiedot + // Yhteys katkesi — merkitään session päättyneeksi ja siivotaan + state.db.close_session(node_id); { let mut conns = state.ip_connections.lock().unwrap(); if let Some(count) = conns.get_mut(&ip) {