toka toimiva vedos

This commit is contained in:
2026-04-01 23:52:39 +03:00
parent 02f6684378
commit 9a72d35081
1420 changed files with 79953 additions and 64 deletions

View File

@@ -9,6 +9,24 @@ impl NodeDb {
pub fn new(path: &str) -> Self {
let conn = Connection::open(path).expect("SQLite-tietokantaa ei voitu avata");
// Poista vanha tietokanta jos skeema on rikki — PoC, ei tuotantodata
let _ = conn.execute_batch("
CREATE TABLE IF NOT EXISTS _schema_version (version INTEGER);
");
let version: i64 = conn.query_row(
"SELECT COALESCE(MAX(version), 0) FROM _schema_version", [], |r| r.get(0)
).unwrap_or(0);
if version < 2 {
// Pudotetaan vanhat taulut ja luodaan uudet
let _ = conn.execute_batch("
DROP TABLE IF EXISTS node_sessions;
DROP TABLE IF EXISTS pair_results;
DELETE FROM _schema_version;
INSERT INTO _schema_version VALUES (2);
");
}
conn.execute_batch("
CREATE TABLE IF NOT EXISTS node_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -35,8 +53,9 @@ impl NodeDb {
gpu_temp_c INTEGER,
gpu_util_pct INTEGER,
-- Varaus
-- Varaus ja tehtävä
allocated_gb INTEGER,
selected_task TEXT DEFAULT 'tokenize',
-- WebGPU-tuki
has_webgpu BOOLEAN,
@@ -70,7 +89,7 @@ impl NodeDb {
node_type: &str,
auth_data: &serde_json::Value,
) -> i64 {
let conn = self.conn.lock().unwrap();
let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
let now = chrono::Utc::now().to_rfc3339();
// Selainsolmun tiedot
@@ -78,6 +97,7 @@ impl NodeDb {
let cpu_cores = auth_data.get("cpu_cores").and_then(|v| v.as_u64());
let ram = auth_data.get("device_memory_gb").and_then(|v| v.as_f64()).map(|v| (v * 1024.0) as i64);
let allocated = auth_data.get("allocated_gb").and_then(|v| v.as_u64());
let selected_task = auth_data.get("selected_task").and_then(|v| v.as_str());
// GPU (selain)
let gpu_vendor = auth_data.get("gpu").and_then(|g| g.get("vendor")).and_then(|v| v.as_str());
@@ -108,8 +128,8 @@ impl NodeDb {
node_id, ip, node_type, connected_at,
platform, hostname, os, cpu_cores, cpu_model, ram_mb,
gpu_name, gpu_vendor, gpu_backend, vram_total_mb, vram_used_mb, gpu_temp_c, gpu_util_pct,
allocated_gb, has_webgpu
) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19)",
allocated_gb, selected_task, has_webgpu
) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16,?17,?18,?19,?20)",
params![
node_id as i64, ip, node_type, now,
platform, hostname, os,
@@ -124,6 +144,7 @@ impl NodeDb {
gpu_temp.map(|v| v as i64),
gpu_util.map(|v| v as i64),
allocated.map(|v| v as i64),
selected_task,
has_webgpu,
],
).expect("Session insert epäonnistui");
@@ -132,7 +153,7 @@ impl NodeDb {
}
pub fn close_session(&self, node_id: u64) {
let conn = self.conn.lock().unwrap();
let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
let now = chrono::Utc::now().to_rfc3339();
let _ = conn.execute(
"UPDATE node_sessions SET disconnected_at = ?1 WHERE node_id = ?2 AND disconnected_at IS NULL",
@@ -141,7 +162,7 @@ impl NodeDb {
}
pub fn increment_tasks(&self, node_id: u64) {
let conn = self.conn.lock().unwrap();
let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
let _ = conn.execute(
"UPDATE node_sessions SET tasks_completed = tasks_completed + 1 WHERE node_id = ?1 AND disconnected_at IS NULL",
params![node_id as i64],
@@ -149,12 +170,12 @@ impl NodeDb {
}
pub fn get_sessions(&self, limit: u32) -> Vec<serde_json::Value> {
let conn = self.conn.lock().unwrap();
let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
let mut stmt = conn.prepare(
"SELECT id, node_id, ip, node_type, connected_at, disconnected_at,
platform, hostname, os, cpu_cores, cpu_model, ram_mb,
gpu_name, gpu_vendor, gpu_backend, vram_total_mb, gpu_temp_c, gpu_util_pct,
allocated_gb, has_webgpu, tasks_completed
allocated_gb, selected_task, has_webgpu, tasks_completed
FROM node_sessions ORDER BY id DESC LIMIT ?1"
).unwrap();
@@ -179,14 +200,15 @@ impl NodeDb {
"gpu_temp_c": row.get::<_, Option<i64>>(16)?,
"gpu_util_pct": row.get::<_, Option<i64>>(17)?,
"allocated_gb": row.get::<_, Option<i64>>(18)?,
"has_webgpu": row.get::<_, Option<bool>>(19)?,
"tasks_completed": row.get::<_, i64>(20)?,
"selected_task": row.get::<_, Option<String>>(19)?,
"has_webgpu": row.get::<_, Option<bool>>(20)?,
"tasks_completed": row.get::<_, i64>(21)?,
}))
}).unwrap().filter_map(|r| r.ok()).collect()
}
pub fn get_pair_results(&self, limit: u32) -> Vec<serde_json::Value> {
let conn = self.conn.lock().unwrap();
let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
let mut stmt = conn.prepare(
"SELECT id, node_id, created_at, en_text, fi_text,
en_tokens, fi_tokens, en_chars_per_token, fi_chars_per_token,
@@ -212,7 +234,7 @@ impl NodeDb {
}
pub fn get_stats(&self) -> serde_json::Value {
let conn = self.conn.lock().unwrap();
let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
let total_sessions: i64 = conn.query_row("SELECT COUNT(*) FROM node_sessions", [], |r| r.get(0)).unwrap_or(0);
let active_sessions: i64 = conn.query_row("SELECT COUNT(*) FROM node_sessions WHERE disconnected_at IS NULL", [], |r| r.get(0)).unwrap_or(0);
@@ -247,7 +269,7 @@ impl NodeDb {
overhead: f64,
duration_ms: f64,
) {
let conn = self.conn.lock().unwrap();
let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
let now = chrono::Utc::now().to_rfc3339();
let _ = conn.execute(
"INSERT INTO pair_results (

View File

@@ -25,7 +25,7 @@ const ALLOWED_ORIGINS: &[&str] = &[
];
// Sallitut viestityyypit clientilta
const ALLOWED_MSG_TYPES: &[&str] = &["auth", "result", "pair_done", "llm_chunk"];
const ALLOWED_MSG_TYPES: &[&str] = &["auth", "result", "pair_done", "llm_chunk", "llm_done", "download_progress"];
struct AppState {
next_node_id: Mutex<u64>,
@@ -34,6 +34,7 @@ struct AppState {
stats_tx: broadcast::Sender<String>,
ip_connections: Mutex<HashMap<IpAddr, u32>>,
node_ips: Mutex<HashMap<u64, IpAddr>>,
node_tasks: Mutex<HashMap<u64, String>>, // node_id → selected_task
db: db::NodeDb,
}
@@ -86,7 +87,7 @@ tr:hover td { background:#1c2333; }
<div id="sessions" class="panel active">
<div class="table-wrap">
<table><thead><tr>
<th>ID</th><th>Tila</th><th>Tyyppi</th><th>IP</th><th>Alusta</th>
<th>ID</th><th>Tila</th><th>Tehtävä</th><th>Tyyppi</th><th>IP</th><th>Alusta</th>
<th>OS</th><th>CPU</th><th>RAM</th><th>GPU</th><th>VRAM</th>
<th>WebGPU</th><th>Teht.</th><th>Yhdistetty</th><th>Kesto</th>
</tr></thead><tbody id="sessions-body"></tbody></table>
@@ -161,6 +162,8 @@ async function load() {
const online = !s.disconnected_at;
const status = online ? '<span class="online">ONLINE</span>' : '<span class="offline">offline</span>';
const typeBadge = s.node_type === 'native' ? badge('native','blue') : badge('browser','yellow');
const taskNames = {'tokenize':'Tokenisaatio','smollm-135m':'SmolLM 135M','qwen-05b':'Qwen2.5 0.5B','phi3-mini':'Phi-3 Mini'};
const taskBadge = badge(taskNames[s.selected_task] || s.selected_task || 'tokenize', s.selected_task === 'tokenize' ? 'green' : 'blue');
const gpuBadge = s.has_webgpu ? badge('WebGPU','green') : badge('CPU','red');
const gpu = s.gpu_name ? `${s.gpu_name}` : '-';
const vram = s.vram_total_mb ? `${s.vram_total_mb} MB` : '-';
@@ -171,7 +174,7 @@ async function load() {
const time = s.connected_at ? new Date(s.connected_at).toLocaleString('fi-FI') : '';
const dur = duration(s.connected_at, s.disconnected_at);
return `<tr>
<td>${s.node_id}</td><td>${status}</td><td>${typeBadge}</td><td>${s.ip}</td>
<td>${s.node_id}</td><td>${status}</td><td>${taskBadge}</td><td>${typeBadge}</td><td>${s.ip}</td>
<td>${plat}</td><td>${os}</td><td>${cores}</td><td>${ram}</td>
<td>${gpu}</td><td>${vram}</td><td>${gpuBadge}</td>
<td>${s.tasks_completed}</td><td>${time}</td><td>${dur}</td>
@@ -221,6 +224,7 @@ async fn main() {
stats_tx: stats_tx.clone(),
ip_connections: Mutex::new(HashMap::new()),
node_ips: Mutex::new(HashMap::new()),
node_tasks: Mutex::new(HashMap::new()),
db: db::NodeDb::new(&std::env::var("DATABASE_PATH").unwrap_or_else(|_| "nodes.db".to_string())),
});
@@ -258,13 +262,31 @@ async fn main() {
let idx = (rng_state as usize) % pairs.len();
let (en, fi) = pairs[idx];
let task_msg = serde_json::json!({
// Tokenisointiparit
let pair_msg = serde_json::json!({
"type": "pair_task",
"en": en,
"fi": fi,
});
tracing::debug!("Kielipari lähetetty: EN({}) vs FI({} merkkiä)", en.len(), fi.len());
let _ = state_for_task.stats_tx.send(task_msg.to_string());
let _ = state_for_task.stats_tx.send(pair_msg.to_string());
// LLM-promptit
let llm_prompts = vec![
"Tell me a short joke.",
"What is WebGPU in one sentence?",
"Explain distributed computing briefly.",
"Write a haiku about technology.",
"What makes Rust special?",
];
let llm_idx = (rng_state as usize / 7) % llm_prompts.len();
let llm_msg = serde_json::json!({
"type": "llm_prompt",
"prompt": llm_prompts[llm_idx],
"model": "smollm-135m",
});
let _ = state_for_task.stats_tx.send(llm_msg.to_string());
tracing::debug!("Tehtävät lähetetty: pair + llm_prompt");
}
});
@@ -496,6 +518,10 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
// Tallennetaan sessiotieto tietokantaan
state.db.insert_session(node_id, &ip.to_string(), node_type, &json);
// Tallennetaan valittu tehtävä muistiin reititystä varten
let selected_task = json.get("selected_task").and_then(|v| v.as_str()).unwrap_or("tokenize").to_string();
state.node_tasks.lock().unwrap().insert(node_id, selected_task);
if node_type == "native" {
let sys = json.get("system");
let hostname = sys.and_then(|s| s.get("hostname")).and_then(|v| v.as_str()).unwrap_or("?");
@@ -529,10 +555,11 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
.and_then(|g| g.get("description").or_else(|| g.get("vendor")))
.and_then(|v| v.as_str())
.unwrap_or("ei GPU:ta");
let task = json.get("selected_task").and_then(|v| v.as_str()).unwrap_or("tokenize");
tracing::info!(
"Solmu {} (selain) | {} | {} | {} ydintä | ~{} GB RAM | GPU: {} | varaus: {} GB",
node_id, ip, platform, cores, ram, gpu_desc, allocated
"Solmu {} (selain) | {} | {} | {} ydintä | ~{} GB RAM | GPU: {} | tehtävä: {} | varaus: {} GB",
node_id, ip, platform, cores, ram, gpu_desc, task, allocated
);
}
}
@@ -605,11 +632,40 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
}
let _ = state.stats_tx.send(json.to_string());
}
} else if msg_type == "llm_done" {
{
let mut json = json;
if let Some(obj) = json.as_object_mut() {
let model = obj.get("model").and_then(|v| v.as_str()).unwrap_or("?");
let prompt = obj.get("prompt").and_then(|v| v.as_str()).unwrap_or("");
let response = obj.get("response").and_then(|v| v.as_str()).unwrap_or("");
let tok_gen = obj.get("tokens_generated").and_then(|v| v.as_u64()).unwrap_or(0);
let duration = obj.get("duration_ms").and_then(|v| v.as_f64()).unwrap_or(0.0);
let tok_s = obj.get("tokens_per_sec").and_then(|v| v.as_f64()).unwrap_or(0.0);
println!();
println!("\x1b[35m━━━ Solmu {} ━━━ {} ━━━\x1b[0m", node_id, model);
println!(" Prompt: \x1b[33m\"{}\"\x1b[0m", prompt);
println!(" Vastaus: \x1b[32m{}\x1b[0m", response);
println!(" {} tokenia | {:.0}ms | \x1b[36m{:.1} tok/s\x1b[0m", tok_gen, duration, tok_s);
state.db.increment_tasks(node_id);
obj.insert("node_id".to_string(), serde_json::json!(node_id));
}
let _ = state.stats_tx.send(json.to_string());
{
let mut task_count = state.total_tasks.lock().unwrap();
*task_count += 1;
}
broadcast_stats(&state).await;
}
}
}
// Yhteys katkesi — merkitään session päättyneeksi ja siivotaan
state.db.close_session(node_id);
state.node_tasks.lock().unwrap().remove(&node_id);
{
let mut conns = state.ip_connections.lock().unwrap();
if let Some(count) = conns.get_mut(&ip) {