use futures_util::{SinkExt, StreamExt}; use serde_json::json; use sysinfo::System; use tokio_tungstenite::connect_async; use tokio_tungstenite::tungstenite::Message; mod inference; /// GPU-tietorakenne — yhtenäinen kaikille valmistajille struct GpuInfo { name: String, vendor: String, backend: String, // "Vulkan", "Metal", "Dx12" vram_total_mb: Option, vram_used_mb: Option, vram_free_mb: Option, temperature_c: Option, gpu_util_pct: Option, } impl GpuInfo { fn to_json(&self) -> serde_json::Value { json!({ "name": self.name, "vendor": self.vendor, "backend": self.backend, "vram_total_mb": self.vram_total_mb, "vram_used_mb": self.vram_used_mb, "vram_free_mb": self.vram_free_mb, "temperature_c": self.temperature_c, "gpu_util_pct": self.gpu_util_pct, }) } } /// Tunnistaa kaikki GPU:t wgpu:lla (NVIDIA/AMD/Apple/Intel) fn collect_gpus_wgpu() -> Vec { let instance = wgpu::Instance::new(&wgpu::InstanceDescriptor { backends: wgpu::Backends::all(), ..Default::default() }); let mut gpus = Vec::new(); for adapter in instance.enumerate_adapters(wgpu::Backends::all()) { let info = adapter.get_info(); // Ohitetaan CPU/software-adapterit ja OpenGL (duplikaatti) if info.device_type == wgpu::DeviceType::Cpu { continue; } if info.backend == wgpu::Backend::Gl { continue; } let vendor = match info.vendor { 0x10DE => "NVIDIA", 0x1002 => "AMD", 0x8086 => "Intel", _ => "Unknown", }; let backend = match info.backend { wgpu::Backend::Vulkan => "Vulkan", wgpu::Backend::Metal => "Metal", wgpu::Backend::Dx12 => "Dx12", wgpu::Backend::Gl => "OpenGL", _ => "?", }; gpus.push(GpuInfo { name: info.name.clone(), vendor: vendor.to_string(), backend: backend.to_string(), // wgpu ei anna tarkkaa VRAM-dataa — täydennetään NVML:llä jos NVIDIA vram_total_mb: None, vram_used_mb: None, vram_free_mb: None, temperature_c: None, gpu_util_pct: None, }); } gpus } /// Täydentää NVIDIA-GPU:iden tiedot NVML:llä (VRAM, lämpötila, kuormitus) fn enrich_nvidia_gpus(gpus: &mut [GpuInfo]) { let Ok(nvml) = nvml_wrapper::Nvml::init() else { return }; let Ok(count) = nvml.device_count() else { return }; // Yhdistetään NVML-laitteet wgpu-tunnisteisiin nimen perusteella for i in 0..count { let Ok(device) = nvml.device_by_index(i) else { continue }; let nvml_name = device.name().unwrap_or_default(); // Etsitään vastaava GPU wgpu-listasta if let Some(gpu) = gpus.iter_mut().find(|g| g.vendor == "NVIDIA" && g.name.contains(&nvml_name) || nvml_name.contains(&g.name)) { if let Ok(mem) = device.memory_info() { gpu.vram_total_mb = Some(mem.total / 1024 / 1024); gpu.vram_used_mb = Some(mem.used / 1024 / 1024); gpu.vram_free_mb = Some(mem.free / 1024 / 1024); } gpu.temperature_c = device.temperature(nvml_wrapper::enum_wrappers::device::TemperatureSensor::Gpu).ok(); if let Ok(util) = device.utilization_rates() { gpu.gpu_util_pct = Some(util.gpu); } } } } /// AMD GPU-tiedot Linuxin sysfs:stä (/sys/class/drm/) fn enrich_amd_gpus(gpus: &mut [GpuInfo]) { let Ok(entries) = std::fs::read_dir("/sys/class/drm") else { return }; for entry in entries.flatten() { let path = entry.path(); let name = path.file_name().unwrap_or_default().to_string_lossy().to_string(); // Vain renderD* tai card*-kansiot joissa on device/vendor if !name.starts_with("card") || name.contains('-') { continue } let device_path = path.join("device"); // Tarkistetaan onko AMD (vendor 0x1002) let vendor = std::fs::read_to_string(device_path.join("vendor")).unwrap_or_default(); if !vendor.trim().contains("0x1002") { continue } // VRAM (mem_info_vram_total on tavuissa) let vram_total = read_sysfs_u64(&device_path.join("mem_info_vram_total")); let vram_used = read_sysfs_u64(&device_path.join("mem_info_vram_used")); // Lämpötila (hwmon) let temp = find_hwmon_temp(&device_path); // GPU-kuormitus let busy = read_sysfs_u64(&device_path.join("gpu_busy_percent")); // Etsitään vastaava GPU wgpu-listasta if let Some(gpu) = gpus.iter_mut().find(|g| g.vendor == "AMD" && g.vram_total_mb.is_none()) { gpu.vram_total_mb = vram_total.map(|v| v / 1024 / 1024); gpu.vram_used_mb = vram_used.map(|v| v / 1024 / 1024); gpu.vram_free_mb = match (vram_total, vram_used) { (Some(t), Some(u)) => Some((t - u) / 1024 / 1024), _ => None, }; gpu.temperature_c = temp.map(|t| t as u32); gpu.gpu_util_pct = busy.map(|b| b as u32); } } } fn read_sysfs_u64(path: &std::path::Path) -> Option { std::fs::read_to_string(path).ok()?.trim().parse().ok() } fn find_hwmon_temp(device_path: &std::path::Path) -> Option { let hwmon_dir = device_path.join("hwmon"); let entries = std::fs::read_dir(&hwmon_dir).ok()?; for entry in entries.flatten() { let temp_path = entry.path().join("temp1_input"); if let Some(millideg) = read_sysfs_u64(&temp_path) { return Some(millideg / 1000); // millidegrees → degrees } } None } /// Apple GPU-tiedot — wgpu/Metal antaa nimen, tarkempaa dataa ei saa ilman IOKit:ia /// mutta Metal adapter_info sisältää jo olennaiset tiedot fn enrich_apple_gpus(gpus: &mut [GpuInfo]) { // Apple Silicon -koneiden unified memory: koko RAM on GPU:n käytettävissä // Arvioidaan system RAM:sta if gpus.iter().any(|g| g.vendor == "Unknown" && g.name.contains("Apple")) { let mut sys = System::new(); sys.refresh_memory(); let total_ram_mb = sys.total_memory() / 1024 / 1024; for gpu in gpus.iter_mut().filter(|g| g.name.contains("Apple")) { gpu.vendor = "Apple".to_string(); // Apple Silicon: unified memory, GPU voi käyttää ~75% kokonaismuistista gpu.vram_total_mb = Some(total_ram_mb * 3 / 4); // Tarkkaa käyttö- ja lämpötiladataa ei saa ilman IOKit:ia } } } /// Kerää kaikki GPU:t ja täydentää valmistajakohtaiset tiedot fn collect_all_gpus() -> Vec { let mut gpus = collect_gpus_wgpu(); enrich_nvidia_gpus(&mut gpus); enrich_amd_gpus(&mut gpus); enrich_apple_gpus(&mut gpus); gpus } /// Kerää järjestelmätiedot (CPU, RAM, OS) fn collect_system_info() -> serde_json::Value { let mut sys = System::new_all(); sys.refresh_all(); json!({ "hostname": System::host_name().unwrap_or_default(), "os": format!("{} {}", System::name().unwrap_or_default(), System::os_version().unwrap_or_default()), "cpu_cores": sys.cpus().len(), "cpu_model": sys.cpus().first().map(|c| c.brand().to_string()).unwrap_or_default(), "ram_total_mb": sys.total_memory() / 1024 / 1024, "ram_used_mb": sys.used_memory() / 1024 / 1024, }) } /// Koko auth-viesti hubille fn build_auth_message(allocated_gb: u32) -> String { let sys = collect_system_info(); let gpus = collect_all_gpus(); let gpu_json: Vec = gpus.iter().enumerate().map(|(i, g)| { let mut v = g.to_json(); v.as_object_mut().unwrap().insert("index".to_string(), json!(i)); v }).collect(); let mut msg = json!({ "type": "auth", "status": "agent_ready", "node_type": "native", "allocated_gb": allocated_gb, "selected_task": "qwen-coder-05b", "system": sys, }); if !gpu_json.is_empty() { msg.as_object_mut().unwrap().insert("gpus".to_string(), json!(gpu_json)); } msg.to_string() } fn format_optional(val: Option, suffix: &str) -> String { match val { Some(v) => format!("{}{}", v, suffix), None => "?".to_string(), } } #[tokio::main] async fn main() { tracing_subscriber::fmt() .with_env_filter("native_node=debug") .init(); let hub_url = std::env::var("HUB_URL").unwrap_or_else(|_| "ws://hub:3000/ws".to_string()); let allocated_gb: u32 = std::env::var("ALLOCATED_GB") .ok() .and_then(|v| v.parse().ok()) .unwrap_or(4); tracing::info!("Kipinä Native Node käynnistyy — hub: {}, varaus: {} GB", hub_url, allocated_gb); let sys = collect_system_info(); tracing::info!("Järjestelmä: {} | {} | {} ydintä | {} MB RAM", sys["hostname"].as_str().unwrap_or("?"), sys["os"].as_str().unwrap_or("?"), sys["cpu_cores"], sys["ram_total_mb"] ); let gpus = collect_all_gpus(); if gpus.is_empty() { tracing::info!("GPU:ta ei havaittu — toimitaan CPU-moodissa"); } else { for (i, gpu) in gpus.iter().enumerate() { tracing::info!("GPU {}: {} ({}) [{}] | VRAM: {}/{} MB | {} | kuormitus: {}", i, gpu.name, gpu.vendor, gpu.backend, format_optional(gpu.vram_used_mb, ""), format_optional(gpu.vram_total_mb, ""), format_optional(gpu.temperature_c, "°C"), format_optional(gpu.gpu_util_pct, "%"), ); } } // Ollama-backend tracing::info!("Alustetaan Ollama-yhteyttä..."); let llm = match inference::LlmEngine::load() { Ok(engine) => { // Varmistetaan malli (ollama pull) — odotetaan kunnes valmis match engine.ensure_model().await { Ok(()) => tracing::info!("Ollama valmis inferenssiin!"), Err(e) => tracing::warn!("Mallin lataus: {} — yritetään silti", e), } Some(engine) } Err(e) => { tracing::warn!("Ollama-alustus epäonnistui: {} — toimitaan ilman inferenssiä", e); None } }; // Yhdistetään hubiin loop { match connect_async(&hub_url).await { Ok((ws_stream, _)) => { tracing::info!("Yhdistetty hubiin!"); let (mut write, mut read) = ws_stream.split(); let auth = build_auth_message(allocated_gb); if write.send(Message::Text(auth)).await.is_err() { tracing::error!("Auth-viestin lähetys epäonnistui"); continue; } let mut busy = false; while let Some(Ok(msg)) = read.next().await { if let Message::Text(text) = msg { // LLM-promptit if text.contains("llm_prompt") && !busy { if let Ok(task) = serde_json::from_str::(&text) { let prompt = task.get("prompt").and_then(|v| v.as_str()).unwrap_or(""); let task_id = task.get("task_id").and_then(|v| v.as_str()).unwrap_or("?"); let msg_model = task.get("model").and_then(|v| v.as_str()).unwrap_or(""); if !prompt.is_empty() && msg_model.starts_with("qwen-coder") { if let Some(ref engine) = llm { busy = true; let max_tokens = task.get("max_tokens").and_then(|v| v.as_u64()).unwrap_or(512) as usize; tracing::info!("Generoidaan (task_id: {}, max_tokens: {}): \"{}\"", task_id, max_tokens, &prompt[..prompt.len().min(100)]); let model_name = engine.model_name(); match engine.generate(prompt, max_tokens).await { Ok(result) => { tracing::info!( "Tulos: {} tokenia | {:.0}ms | {:.1} tok/s | \"{}\"", result.tokens_generated, result.duration_ms, result.tokens_per_sec, &result.text[..result.text.len().min(80)] ); let done = json!({ "type": "llm_done", "prompt": prompt, "model": format!("{} (Ollama)", model_name), "response": result.text, "tokens_generated": result.tokens_generated, "duration_ms": result.duration_ms, "tokens_per_sec": (result.tokens_per_sec * 10.0).round() / 10.0, "load_time_ms": 0, "task_id": task_id, }); let _ = write.send(Message::Text(done.to_string())).await; } Err(e) => { tracing::error!("Inferenssivirhe: {}", e); } } busy = false; } } } } // Ohitetaan pair_task, stats jne. } } tracing::warn!("Yhteys hubiin katkesi — yritetään uudelleen 5s..."); } Err(e) => { tracing::warn!("Hubiin yhdistäminen epäonnistui: {} — yritetään uudelleen 5s...", e); } } tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } }