Files
agentic-studio/network-poc/native-node/src/main.rs
Jaakko Vanhala 2f140c8a15 uusi projekti
2026-04-12 10:28:57 +03:00

627 lines
31 KiB
Rust

use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use std::io::IsTerminal;
use sysinfo::System;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
mod inference;
mod tui;
mod tui_dashboard;
/// Description of one GPU, in a vendor-neutral shape shared by all detection paths.
///
/// Values that could not be determined for a given GPU are left as `None`.
#[derive(Debug, Clone)]
struct GpuInfo {
    /// Adapter name as reported by the graphics backend.
    name: String,
    /// Vendor label: "NVIDIA", "AMD", "Intel", "Apple" or "Unknown".
    vendor: String,
    /// Graphics backend label, e.g. "Vulkan", "Metal", "Dx12".
    backend: String,
    /// Total video memory in MB.
    vram_total_mb: Option<u64>,
    /// Used video memory in MB.
    vram_used_mb: Option<u64>,
    /// Free video memory in MB.
    vram_free_mb: Option<u64>,
    /// GPU temperature in degrees Celsius.
    temperature_c: Option<u32>,
    /// GPU utilisation in percent.
    gpu_util_pct: Option<u32>,
}
impl GpuInfo {
    /// Serializes this GPU description into a JSON object for the hub.
    ///
    /// Every field is emitted under its own name; `None` values become JSON `null`.
    fn to_json(&self) -> serde_json::Value {
        let mut fields = serde_json::Map::new();
        fields.insert("name".to_string(), json!(self.name));
        fields.insert("vendor".to_string(), json!(self.vendor));
        fields.insert("backend".to_string(), json!(self.backend));
        fields.insert("vram_total_mb".to_string(), json!(self.vram_total_mb));
        fields.insert("vram_used_mb".to_string(), json!(self.vram_used_mb));
        fields.insert("vram_free_mb".to_string(), json!(self.vram_free_mb));
        fields.insert("temperature_c".to_string(), json!(self.temperature_c));
        fields.insert("gpu_util_pct".to_string(), json!(self.gpu_util_pct));
        serde_json::Value::Object(fields)
    }
}
#[cfg(feature = "gpu-detect")]
/// Enumerates the machine's GPUs through wgpu (NVIDIA/AMD/Apple/Intel).
///
/// wgpu reports one adapter per (physical device, backend) pair, so on e.g. Windows the
/// same card shows up once for Vulkan and once for Dx12. For each PCI (vendor id, device id)
/// pair only the adapters of the first backend that reported it are kept. Each physical GPU
/// is therefore listed once, while several identical cards on that backend are all listed.
///
/// CPU/software adapters and OpenGL adapters are skipped. VRAM, temperature and utilisation
/// are left as `None` here; the vendor-specific `enrich_*` functions fill them in if possible.
fn collect_gpus_wgpu() -> Vec<GpuInfo> {
    let instance = wgpu::Instance::new(&wgpu::InstanceDescriptor {
        backends: wgpu::Backends::all(),
        ..Default::default()
    });
    // (vendor id, device id) -> backend whose adapters are reported for that device.
    let mut backend_for_device = std::collections::HashMap::new();
    let mut gpus = Vec::new();
    for adapter in instance.enumerate_adapters(wgpu::Backends::all()) {
        let info = adapter.get_info();
        // Skip CPU/software adapters and OpenGL (it duplicates a native backend).
        if info.device_type == wgpu::DeviceType::Cpu || info.backend == wgpu::Backend::Gl {
            continue;
        }
        // Skip the same physical device when it is seen again through another backend.
        let kept_backend = *backend_for_device
            .entry((info.vendor, info.device))
            .or_insert(info.backend);
        if kept_backend != info.backend {
            continue;
        }
        let vendor = match info.vendor {
            0x10DE => "NVIDIA",
            0x1002 => "AMD",
            0x8086 => "Intel",
            // Anything else (including Apple adapters) is relabelled later where possible.
            _ => "Unknown",
        };
        let backend = match info.backend {
            wgpu::Backend::Vulkan => "Vulkan",
            wgpu::Backend::Metal => "Metal",
            wgpu::Backend::Dx12 => "Dx12",
            wgpu::Backend::Gl => "OpenGL",
            _ => "?",
        };
        gpus.push(GpuInfo {
            name: info.name.clone(),
            vendor: vendor.to_string(),
            backend: backend.to_string(),
            // wgpu does not expose VRAM/temperature/load; the enrich_* passes fill these in.
            vram_total_mb: None,
            vram_used_mb: None,
            vram_free_mb: None,
            temperature_c: None,
            gpu_util_pct: None,
        });
    }
    gpus
}
#[cfg(feature = "gpu-detect")]
/// Fills in VRAM, temperature and utilisation for NVIDIA GPUs using NVML.
///
/// Each NVML device is paired with a not-yet-enriched entry whose vendor is "NVIDIA" and
/// whose name contains, or is contained in, the NVML device name. Matched entries are
/// claimed, so several identical cards map one-to-one instead of all overwriting the
/// first match. Does nothing if NVML cannot be initialised or enumerated.
fn enrich_nvidia_gpus(gpus: &mut [GpuInfo]) {
    let Ok(nvml) = nvml_wrapper::Nvml::init() else { return };
    let Ok(count) = nvml.device_count() else { return };
    // claimed[i] == true once gpus[i] has been paired with an NVML device.
    let mut claimed = vec![false; gpus.len()];
    for i in 0..count {
        let Ok(device) = nvml.device_by_index(i) else { continue };
        let nvml_name = device.name().unwrap_or_default();
        // An empty name would "contain" / be contained in every name, so it cannot be matched.
        if nvml_name.is_empty() {
            continue;
        }
        // The vendor check must guard BOTH name comparisons (explicit parentheses).
        let Some(idx) = (0..gpus.len()).find(|&idx| {
            let g = &gpus[idx];
            !claimed[idx]
                && g.vendor == "NVIDIA"
                && !g.name.is_empty()
                && (g.name.contains(&nvml_name) || nvml_name.contains(&g.name))
        }) else {
            continue;
        };
        claimed[idx] = true;
        let gpu = &mut gpus[idx];
        if let Ok(mem) = device.memory_info() {
            // NVML reports bytes.
            gpu.vram_total_mb = Some(mem.total / 1024 / 1024);
            gpu.vram_used_mb = Some(mem.used / 1024 / 1024);
            gpu.vram_free_mb = Some(mem.free / 1024 / 1024);
        }
        gpu.temperature_c = device
            .temperature(nvml_wrapper::enum_wrappers::device::TemperatureSensor::Gpu)
            .ok();
        if let Ok(util) = device.utilization_rates() {
            gpu.gpu_util_pct = Some(util.gpu);
        }
    }
}
#[cfg(feature = "gpu-detect")]
/// Fills in AMD GPU details (VRAM, temperature, load) from Linux sysfs under `/sys/class/drm`.
///
/// Each `cardN/device` whose PCI vendor is 0x1002 is paired with the next AMD entry in
/// `gpus` that has not been paired yet. Cards are visited in sorted (lexicographic) path
/// order so the pairing is deterministic. Does nothing when `/sys/class/drm` cannot be read
/// (e.g. on non-Linux hosts).
fn enrich_amd_gpus(gpus: &mut [GpuInfo]) {
    let Ok(entries) = std::fs::read_dir("/sys/class/drm") else { return };
    // `read_dir` order is unspecified; sort for a stable card -> GPU pairing.
    let mut card_paths: Vec<std::path::PathBuf> = entries.flatten().map(|e| e.path()).collect();
    card_paths.sort();
    // claimed[i] == true once gpus[i] has received sysfs data, even if some reads failed.
    let mut claimed = vec![false; gpus.len()];
    for path in card_paths {
        let name = path.file_name().unwrap_or_default().to_string_lossy().to_string();
        // Only top-level `cardN` nodes; connector entries such as `card0-DP-1` and
        // `renderD*` nodes are skipped.
        if !name.starts_with("card") || name.contains('-') {
            continue;
        }
        let device_path = path.join("device");
        // AMD's PCI vendor id is 0x1002.
        let vendor = std::fs::read_to_string(device_path.join("vendor")).unwrap_or_default();
        if !vendor.trim().contains("0x1002") {
            continue;
        }
        let Some(idx) = (0..gpus.len()).find(|&i| !claimed[i] && gpus[i].vendor == "AMD") else {
            continue;
        };
        claimed[idx] = true;
        // mem_info_vram_* are in bytes.
        let vram_total = read_sysfs_u64(&device_path.join("mem_info_vram_total"));
        let vram_used = read_sysfs_u64(&device_path.join("mem_info_vram_used"));
        let temp = find_hwmon_temp(&device_path);
        let busy = read_sysfs_u64(&device_path.join("gpu_busy_percent"));
        let gpu = &mut gpus[idx];
        gpu.vram_total_mb = vram_total.map(|v| v / 1024 / 1024);
        gpu.vram_used_mb = vram_used.map(|v| v / 1024 / 1024);
        // The two counters are read separately; saturate so that `used > total` can never
        // underflow (which would panic in debug builds).
        gpu.vram_free_mb = vram_total
            .zip(vram_used)
            .map(|(t, u)| t.saturating_sub(u) / 1024 / 1024);
        gpu.temperature_c = temp.and_then(|t| u32::try_from(t).ok());
        gpu.gpu_util_pct = busy.and_then(|b| u32::try_from(b).ok());
    }
}
#[cfg(feature = "gpu-detect")]
/// Reads a sysfs-style file holding a single unsigned integer.
///
/// Surrounding whitespace (such as the trailing newline) is ignored. Returns `None` if the
/// file cannot be read or its contents do not parse as `u64`.
fn read_sysfs_u64(path: &std::path::Path) -> Option<u64> {
    match std::fs::read_to_string(path) {
        Ok(contents) => contents.trim().parse::<u64>().ok(),
        Err(_) => None,
    }
}
#[cfg(feature = "gpu-detect")]
/// Returns the first `temp1_input` reading found under `<device_path>/hwmon/*`,
/// converted from millidegrees to whole degrees (integer division).
///
/// Returns `None` if there is no readable `hwmon` directory or no entry with a
/// readable, numeric `temp1_input`.
fn find_hwmon_temp(device_path: &std::path::Path) -> Option<u64> {
    std::fs::read_dir(device_path.join("hwmon"))
        .ok()?
        .flatten()
        .find_map(|hwmon| read_sysfs_u64(&hwmon.path().join("temp1_input")))
        .map(|millidegrees| millidegrees / 1000)
}
#[cfg(feature = "gpu-detect")]
/// Relabels Apple GPUs and estimates their usable memory.
///
/// Only runs if some entry has vendor "Unknown" and a name containing "Apple". Every entry
/// whose name contains "Apple" then gets vendor "Apple" and `vram_total_mb` set to 3/4 of
/// total system RAM (Apple Silicon uses unified memory). Usage and temperature are not
/// available without IOKit and stay unchanged.
fn enrich_apple_gpus(gpus: &mut [GpuInfo]) {
    let has_unlabelled_apple = gpus
        .iter()
        .any(|g| g.vendor == "Unknown" && g.name.contains("Apple"));
    if !has_unlabelled_apple {
        return;
    }
    let mut system = System::new();
    system.refresh_memory();
    let total_ram_mb = system.total_memory() / 1024 / 1024;
    // Assumed share of unified memory the GPU can use.
    let gpu_share_mb = total_ram_mb * 3 / 4;
    for gpu in gpus.iter_mut() {
        if !gpu.name.contains("Apple") {
            continue;
        }
        gpu.vendor = "Apple".to_string();
        gpu.vram_total_mb = Some(gpu_share_mb);
    }
}
/// Collects all GPUs and fills in vendor-specific details.
///
/// With the `gpu-detect` feature this runs wgpu enumeration, then the NVIDIA (NVML), AMD
/// (sysfs) and Apple enrichment passes in that order. Without the feature it returns an
/// empty list.
fn collect_all_gpus() -> Vec<GpuInfo> {
    #[cfg(feature = "gpu-detect")]
    let gpus = {
        let mut found = collect_gpus_wgpu();
        enrich_nvidia_gpus(&mut found);
        enrich_amd_gpus(&mut found);
        enrich_apple_gpus(&mut found);
        found
    };
    #[cfg(not(feature = "gpu-detect"))]
    let gpus = Vec::new();
    gpus
}
/// Collects a snapshot of host system information (CPU, RAM, OS).
///
/// Returns a JSON object with `hostname`, `os`, `cpu_cores`, `cpu_model`, `ram_total_mb`
/// and `ram_used_mb`. Missing string values fall back to empty strings.
fn collect_system_info() -> serde_json::Value {
    // `System::new_all()` already loads everything. A second `refresh_all()` would enumerate
    // every process again, and this runs on every (re)connect and model switch.
    let sys = System::new_all();
    json!({
        "hostname": System::host_name().unwrap_or_default(),
        "os": format!("{} {}", System::name().unwrap_or_default(), System::os_version().unwrap_or_default()),
        "cpu_cores": sys.cpus().len(),
        "cpu_model": sys.cpus().first().map(|c| c.brand().to_string()).unwrap_or_default(),
        "ram_total_mb": sys.total_memory() / 1024 / 1024,
        "ram_used_mb": sys.used_memory() / 1024 / 1024,
    })
}
/// Builds the complete `auth` message for the hub as a serialized JSON string.
///
/// Always includes the node type, `allocated_gb`, the selected model (`selected_task`) and
/// a fresh system snapshot. It also adds:
/// - `api_key`, when `NODE_API_KEY` is set and non-empty;
/// - `gpus`, when any GPUs were detected (each tagged with its `index`);
/// - `models`, when `models_data` is provided.
fn build_auth_message(allocated_gb: u32, model_name: &str, models_data: Option<serde_json::Value>) -> String {
    let system = collect_system_info();
    let gpu_entries: Vec<serde_json::Value> = collect_all_gpus()
        .iter()
        .enumerate()
        .map(|(index, gpu)| {
            let mut entry = gpu.to_json();
            entry
                .as_object_mut()
                .expect("GpuInfo::to_json always yields a JSON object")
                .insert("index".to_string(), json!(index));
            entry
        })
        .collect();
    let api_key = std::env::var("NODE_API_KEY").unwrap_or_default();

    let mut msg = json!({
        "type": "auth",
        "status": "agent_ready",
        "node_type": "native",
        "allocated_gb": allocated_gb,
        "selected_task": model_name,
        "system": system,
    });
    let fields = msg
        .as_object_mut()
        .expect("auth message is built as a JSON object");
    if !api_key.is_empty() {
        fields.insert("api_key".to_string(), json!(api_key));
    }
    if !gpu_entries.is_empty() {
        fields.insert("gpus".to_string(), json!(gpu_entries));
    }
    if let Some(models) = models_data {
        fields.insert("models".to_string(), models);
    }
    msg.to_string()
}
/// Formats `Some(v)` as `v` followed by `suffix`, and `None` as `"?"`.
fn format_optional<T: std::fmt::Display>(val: Option<T>, suffix: &str) -> String {
    val.map_or_else(|| "?".to_string(), |v| format!("{v}{suffix}"))
}
#[tokio::main]
/// Native node entry point.
///
/// 1. Sends `tracing` output to `./native-node.log` and installs a panic hook that restores
///    the terminal before the previous hook runs.
/// 2. Reads `HUB_URL` (default `ws://hub:3000/ws`) and `ALLOCATED_GB` (default 4).
/// 3. Logs system and GPU information and initialises the Ollama backend through
///    `inference::LlmEngine`, continuing without inference if that fails.
/// 4. Starts the TUI dashboard only when stdin is a terminal.
/// 5. Connects to the hub over WebSocket, sends the `auth` message, then services TUI
///    commands and hub messages until the connection drops. Every failure path waits
///    5 seconds before reconnecting.
async fn main() {
    // Log to ./native-node.log through a non-blocking writer. `_guard` must stay alive for
    // the whole of `main` so buffered log lines get flushed.
    let file_appender = tracing_appender::rolling::never(".", "native-node.log");
    let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender);
    tracing_subscriber::fmt()
        .with_env_filter("native_node=debug")
        .with_writer(non_blocking)
        .init();
    // On panic, take the terminal out of raw mode before the original hook prints the panic.
    let original_hook = std::panic::take_hook();
    std::panic::set_hook(Box::new(move |panic_info| {
        tui_dashboard::restore_terminal();
        original_hook(panic_info);
    }));
    let tui_state = std::sync::Arc::new(tokio::sync::RwLock::new(tui_dashboard::DashboardState::new()));
    // Commands from the TUI to the network loop: "pause", "resume", "fetch_models",
    // "change_model:<name>".
    let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
    let hub_url = std::env::var("HUB_URL").unwrap_or_else(|_| "ws://hub:3000/ws".to_string());
    let allocated_gb: u32 = std::env::var("ALLOCATED_GB")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(4);
    tracing::info!("Kipinä Native Node käynnistyy — hub: {}, varaus: {} GB", hub_url, allocated_gb);
    let sys = collect_system_info();
    tracing::info!("Järjestelmä: {} | {} | {} ydintä | {} MB RAM",
        sys["hostname"].as_str().unwrap_or("?"),
        sys["os"].as_str().unwrap_or("?"),
        sys["cpu_cores"],
        sys["ram_total_mb"]
    );
    {
        let mut st = tui_state.write().await;
        st.sys_info = format!("{} | {} | {} ydintä | {} MB RAM",
            sys["hostname"].as_str().unwrap_or("?"),
            sys["os"].as_str().unwrap_or("?"),
            sys["cpu_cores"],
            sys["ram_total_mb"]
        );
        let i = st.sys_info.clone();
        st.push_log("System", format!("Järjestelmä: {}", i), None);
    }
    let gpus = collect_all_gpus();
    if gpus.is_empty() {
        #[cfg(not(feature = "gpu-detect"))]
        tracing::info!("GPU-tunnistus ei käytössä (--no-default-features). Ollama käyttää GPU:ta automaattisesti jos saatavilla.");
        #[cfg(feature = "gpu-detect")]
        tracing::info!("GPU:ta ei havaittu — toimitaan CPU-moodissa");
    } else {
        for (i, gpu) in gpus.iter().enumerate() {
            tracing::info!("GPU {}: {} ({}) [{}] | VRAM: {}/{} MB | {} | kuormitus: {}",
                i,
                gpu.name,
                gpu.vendor,
                gpu.backend,
                format_optional(gpu.vram_used_mb, ""),
                format_optional(gpu.vram_total_mb, ""),
                format_optional(gpu.temperature_c, "°C"),
                format_optional(gpu.gpu_util_pct, "%"),
            );
        }
    }
    // Ollama backend.
    tracing::info!("Alustetaan Ollama-yhteyttä...");
    let llm = match inference::LlmEngine::load().await {
        Ok(engine) => {
            // Make sure the model is present (ollama pull); wait until that finishes.
            match engine.ensure_model().await {
                Ok(()) => tracing::info!("Ollama valmis inferenssiin!"),
                Err(e) => tracing::warn!("Mallin lataus: {} — yritetään silti", e),
            }
            Some(engine)
        }
        Err(e) => {
            tracing::warn!("Ollama-alustus epäonnistui: {} — toimitaan ilman inferenssiä", e);
            None
        }
    };
    let active_model = llm.as_ref().map(|e| e.model_name()).unwrap_or_else(|| "unknown".to_string());
    tracing::info!("Käytettävä kielimalli konfiguroitu (selected_task): {}", active_model);
    {
        let mut st = tui_state.write().await;
        st.model_name = active_model.clone();
        st.push_log("System", format!("Malli valmis: {}", active_model), None);
    }
    // Start the TUI only when stdin is a terminal (not when running as a background process).
    // When it is not started, `cmd_tx` stays alive in `main`, so `cmd_rx` simply never yields.
    let ui_state = tui_state.clone();
    if std::io::stdin().is_terminal() {
        tokio::spawn(async move {
            if let Err(e) = tui_dashboard::run_dashboard(ui_state, cmd_tx).await {
                tracing::error!("Pääluupin TUI kaatui: {}", e);
            }
        });
    } else {
        // NOTE(review): this message says "logging to stdout", but the subscriber above
        // writes to native-node.log — confirm the intended wording.
        tracing::info!("Ei terminaalia — TUI ohitettu, lokitetaan stdoutiin");
    }
    // Fetch the locally available models to report to the hub.
    let mut available_models = None;
    if let Some(ref engine) = llm {
        match engine.fetch_models().await {
            Ok(models) => {
                available_models = Some(models);
            }
            Err(e) => {
                tracing::warn!("Mallilistauksen haku epäonnistui: {}", e);
            }
        }
    }
    // Becomes false once every command sender is gone (the TUI task has finished). From then
    // on `cmd_rx.recv()` would resolve to `None` immediately on every poll and spin the
    // select loop, so that branch is disabled. Kept outside the reconnect loop because the
    // TUI never comes back.
    let mut cmd_channel_open = true;
    // Connect to the hub, reconnecting forever.
    loop {
        match connect_async(&hub_url).await {
            Ok((ws_stream, _)) => {
                tracing::info!("Yhdistetty hubiin!");
                let (mut write, mut read) = ws_stream.split();
                let auth = build_auth_message(allocated_gb, &active_model, available_models.clone());
                if write.send(Message::Text(auth)).await.is_err() {
                    tracing::error!("Auth-viestin lähetys epäonnistui");
                    // Back off like every other failure path. A bare `continue` would skip
                    // the sleep at the bottom of the loop and reconnect in a tight loop.
                    tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
                    continue;
                }
                loop {
                    tokio::select! {
                        cmd = cmd_rx.recv(), if cmd_channel_open => {
                            if let Some(cmd_str) = cmd {
                                if cmd_str == "pause" {
                                    tracing::info!("Tauotetaan solmun suoritus (Hub ei lähetä tehtäviä)...");
                                    let req = json!({"type": "status_update", "status": "paused"});
                                    let _ = write.send(Message::Text(req.to_string())).await;
                                    {
                                        let mut st = tui_state.write().await;
                                        st.status = "PAUSED".to_string();
                                        st.push_log("Network", "Solmu siirretty taukotilaan".to_string(), None);
                                    }
                                } else if cmd_str == "resume" {
                                    tracing::info!("Jatketaan solmun suoritusta...");
                                    let req = json!({"type": "status_update", "status": "active"});
                                    let _ = write.send(Message::Text(req.to_string())).await;
                                    {
                                        let mut st = tui_state.write().await;
                                        st.status = "ACTIVE".to_string();
                                        st.push_log("System", "Suoritus jatkuu...".to_string(), None);
                                    }
                                } else if cmd_str == "fetch_models" {
                                    // Fetch the model list from Ollama and open the model picker.
                                    if let Some(ref engine) = llm {
                                        match engine.fetch_models().await {
                                            Ok(tags) => {
                                                let models: Vec<String> = tags.get("models")
                                                    .and_then(|v| v.as_array())
                                                    .map(|arr| arr.iter()
                                                        .filter_map(|m| m.get("name").and_then(|n| n.as_str()).map(|s| s.to_string()))
                                                        .collect())
                                                    .unwrap_or_default();
                                                let mut st = tui_state.write().await;
                                                st.model_picker_items = models;
                                                st.model_picker_idx = 0;
                                                st.model_picker_open = true;
                                            }
                                            Err(e) => {
                                                let mut st = tui_state.write().await;
                                                st.push_log("System", format!("Mallilistan haku epäonnistui: {}", e), None);
                                            }
                                        }
                                    }
                                } else if let Some(model) = cmd_str.strip_prefix("change_model:") {
                                    // Model chosen in the TUI: switch to it.
                                    if let Some(ref engine) = llm {
                                        engine.set_model(model.to_string());
                                        match engine.ensure_model().await {
                                            Ok(()) => {
                                                tracing::info!("Malli vaihdettu: {}", model);
                                                let mut st = tui_state.write().await;
                                                st.model_name = model.to_string();
                                                st.push_log("System", format!("Malli vaihdettu: {}", model), None);
                                                // Re-send the auth message so the hub sees the new model.
                                                let auth = build_auth_message(allocated_gb, model, available_models.clone());
                                                let _ = write.send(Message::Text(auth)).await;
                                            }
                                            Err(e) => {
                                                let mut st = tui_state.write().await;
                                                st.push_log("System", format!("Mallin vaihto epäonnistui: {}", e), None);
                                            }
                                        }
                                    }
                                }
                            } else {
                                // All senders dropped: stop polling the command channel.
                                cmd_channel_open = false;
                            }
                        }
                        ws_msg = read.next() => {
                            match ws_msg {
                                Some(Ok(Message::Text(text))) => {
                                    // Hub control messages (pause / resume).
                                    if text.contains(r#""type":"control""#) {
                                        if let Ok(task) = serde_json::from_str::<serde_json::Value>(&text) {
                                            if let Some(action) = task.get("action").and_then(|v| v.as_str()) {
                                                if action == "pause" {
                                                    tracing::info!("Hub pakotti solmun tauolle (Pause)");
                                                    let req = json!({"type": "status_update", "status": "paused"});
                                                    let _ = write.send(Message::Text(req.to_string())).await;
                                                    {
                                                        let mut st = tui_state.write().await;
                                                        st.status = "PAUSED".to_string();
                                                        st.push_log("Network", "Hub kytki solmun tauolle".to_string(), None);
                                                    }
                                                } else if action == "resume" {
                                                    tracing::info!("Hub aktivoi solmun suorituksen (Resume)");
                                                    let req = json!({"type": "status_update", "status": "active"});
                                                    let _ = write.send(Message::Text(req.to_string())).await;
                                                    {
                                                        let mut st = tui_state.write().await;
                                                        st.status = "ACTIVE".to_string();
                                                        st.push_log("Network", "Hub palautti solmun töihin".to_string(), None);
                                                    }
                                                }
                                            }
                                        }
                                    }
                                    // Global network status (active nodes / task count) for the dashboard.
                                    if text.contains(r#""type":"network_status""#) {
                                        if let Ok(status) = serde_json::from_str::<serde_json::Value>(&text) {
                                            if let Some(nodes) = status.get("active_nodes").and_then(|v| v.as_u64()) {
                                                if let Some(tasks) = status.get("tasks").and_then(|v| v.as_u64()) {
                                                    let mut st = tui_state.write().await;
                                                    st.network_active_nodes = nodes as usize;
                                                    st.network_total_tasks = tasks;
                                                }
                                            }
                                        }
                                    }
                                    // LLM prompts: only handled for qwen-coder / qwen2.5-coder / phi models.
                                    if text.contains("llm_prompt") {
                                        if let Ok(task) = serde_json::from_str::<serde_json::Value>(&text) {
                                            let prompt = task.get("prompt").and_then(|v| v.as_str()).unwrap_or("");
                                            let task_id = task.get("task_id").and_then(|v| v.as_str()).unwrap_or("?");
                                            let msg_model = task.get("model").and_then(|v| v.as_str()).unwrap_or("");
                                            if !prompt.is_empty() && (msg_model.starts_with("qwen-coder") || msg_model.starts_with("qwen2.5-coder") || msg_model.starts_with("phi")) {
                                                if let Some(ref engine) = llm {
                                                    let gen_opts = inference::GenerateOptions {
                                                        max_tokens: task.get("max_tokens").and_then(|v| v.as_u64()).unwrap_or(1024) as usize,
                                                        system_prompt: task.get("system_prompt").and_then(|v| v.as_str()).map(|s| s.to_string()),
                                                        temperature: task.get("temperature").and_then(|v| v.as_f64()),
                                                        top_k: task.get("top_k").and_then(|v| v.as_u64()),
                                                        repeat_penalty: task.get("repeat_penalty").and_then(|v| v.as_f64()),
                                                        stop: task.get("stop").and_then(|v| v.as_array()).map(|a| a.iter().filter_map(|s| s.as_str().map(|s| s.to_string())).collect()),
                                                    };
                                                    let prompt_lines = prompt.lines().count();
                                                    let prompt_last: String = prompt.lines().last().unwrap_or("").chars().take(60).collect();
                                                    tracing::info!("→ task_id:{} | {}r prompti | \"{}...\"", task_id, prompt_lines, prompt_last);
                                                    {
                                                        let mut st = tui_state.write().await;
                                                        st.cur_task_id = Some(task_id.to_string());
                                                        st.cur_prompt = Some(format!("{} riviä | \"{}...\"", prompt_lines, prompt_last));
                                                    }
                                                    let model_name = engine.model_name();
                                                    // NOTE(review): generation is awaited inline, so no other hub
                                                    // message or TUI command is processed until it finishes.
                                                    match engine.generate(prompt, &gen_opts).await {
                                                        Ok(result) => {
                                                            let tokens_sec = (result.tokens_per_sec * 10.0).round() / 10.0;
                                                            tracing::info!(
                                                                "✓ {} | {} tok | {:.0}ms | {:.1} tok/s",
                                                                model_name,
                                                                result.tokens_generated,
                                                                result.duration_ms,
                                                                tokens_sec,
                                                            );
                                                            {
                                                                let mut st = tui_state.write().await;
                                                                st.tasks_completed += 1;
                                                                st.last_tokens_sec = tokens_sec as f64;
                                                                st.cur_task_id = None;
                                                                st.cur_prompt = None;
                                                                let msg_type = if task_id == "status-check" { "Ping" } else { "Task" };
                                                                let msg_text = format!("{} ({} tok)", task_id, result.tokens_generated);
                                                                st.push_log(msg_type, msg_text, Some(tokens_sec as f64));
                                                            }
                                                            let prompt_short: String = prompt.lines().last().unwrap_or("").chars().take(100).collect();
                                                            let done = json!({
                                                                "type": "llm_done",
                                                                "prompt": prompt_short,
                                                                "model": format!("{} (Ollama)", model_name),
                                                                "response": result.text,
                                                                "tokens_generated": result.tokens_generated,
                                                                "duration_ms": result.duration_ms,
                                                                "tokens_per_sec": tokens_sec,
                                                                "load_time_ms": 0,
                                                                "task_id": task_id,
                                                            });
                                                            let _ = write.send(Message::Text(done.to_string())).await;
                                                        }
                                                        Err(e) => {
                                                            tracing::error!("Inferenssivirhe: {}", e);
                                                            {
                                                                let mut st = tui_state.write().await;
                                                                st.cur_task_id = None;
                                                                st.cur_prompt = None;
                                                                st.push_log("System", format!("Virhe inferenssissä: {}", e), None);
                                                            }
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                    // On-the-fly model switch requested by the hub.
                                    // NOTE(review): dispatch is by substring search on the raw text.
                                    // An `llm_prompt` whose text contains "change_model" would also hit
                                    // this branch and switch to that message's `model`. Consider
                                    // dispatching on the parsed `type` field once the hub schema is confirmed.
                                    if text.contains("change_model") {
                                        if let Ok(task) = serde_json::from_str::<serde_json::Value>(&text) {
                                            if let Some(new_model) = task.get("model").and_then(|v| v.as_str()) {
                                                if let Some(ref engine) = llm {
                                                    tracing::info!("Vaihdetaan malli: {}", new_model);
                                                    engine.set_model(new_model.to_string());
                                                    match engine.ensure_model().await {
                                                        Ok(()) => {
                                                            tracing::info!("Malli {} valmis!", new_model);
                                                            let mut st = tui_state.write().await;
                                                            st.model_name = new_model.to_string();
                                                            st.push_log("System", format!("Malli {} ladattu & valmis!", new_model), None);
                                                        }
                                                        Err(e) => tracing::error!("Mallin lataus epäonnistui: {}", e),
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                                Some(Ok(_)) => {} // Other frame types (binary/ping).
                                Some(Err(_)) | None => break, // Connection lost.
                            }
                        }
                    }
                }
                tracing::warn!("Yhteys hubiin katkesi — yritetään uudelleen 5s...");
            }
            Err(e) => {
                tracing::warn!("Hubiin yhdistäminen epäonnistui: {} — yritetään uudelleen 5s...", e);
            }
        }
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
}