This commit is contained in:
Jaakko Vanhala
2026-04-12 06:22:52 +03:00
parent ce0ccbddd3
commit 403f35efdc
9 changed files with 597 additions and 15 deletions

View File

@@ -21,3 +21,6 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
dialoguer = "0.12.0"
console = "0.16.3"
ratatui = "0.29.0"
crossterm = { version = "0.28.1", features = ["event-stream"] }
tracing-appender = "0.2.4"

View File

@@ -47,10 +47,9 @@ impl LlmEngine {
};
// Kysytään malli TUI:lla jos ei pakotettu ympäristöstä
let model = if let Ok(m) = std::env::var("OLLAMA_MODEL") {
m
} else {
crate::tui::select_model(&ollama_url, &client).await?
let model = match std::env::var("OLLAMA_MODEL") {
Ok(m) if !m.is_empty() => m,
_ => crate::tui::select_model(&ollama_url, &client).await?
};
tracing::info!("Ollama backend: {} | malli: {}", ollama_url, model);

View File

@@ -6,6 +6,7 @@ use tokio_tungstenite::tungstenite::Message;
mod inference;
mod tui;
mod tui_dashboard;
/// GPU-tietorakenne — yhtenäinen kaikille valmistajille
struct GpuInfo {
@@ -268,10 +269,24 @@ fn format_optional<T: std::fmt::Display>(val: Option<T>, suffix: &str) -> String
#[tokio::main]
async fn main() {
let file_appender = tracing_appender::rolling::never(".", "native-node.log");
let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender);
tracing_subscriber::fmt()
.with_env_filter("native_node=debug")
.with_writer(non_blocking)
.init();
// Hookataan paniikkitilanteet palauttamaan terminaalin raw-moodista
let original_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic_info| {
tui_dashboard::restore_terminal();
original_hook(panic_info);
}));
let tui_state = std::sync::Arc::new(tokio::sync::RwLock::new(tui_dashboard::DashboardState::new()));
let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
let hub_url = std::env::var("HUB_URL").unwrap_or_else(|_| "ws://hub:3000/ws".to_string());
let allocated_gb: u32 = std::env::var("ALLOCATED_GB")
.ok()
@@ -287,6 +302,18 @@ async fn main() {
sys["cpu_cores"],
sys["ram_total_mb"]
);
{
let mut st = tui_state.write().await;
st.sys_info = format!("{} | {} | {} ydintä | {} MB RAM",
sys["hostname"].as_str().unwrap_or("?"),
sys["os"].as_str().unwrap_or("?"),
sys["cpu_cores"],
sys["ram_total_mb"]
);
let i = st.sys_info.clone();
st.push_log("System", format!("Järjestelmä: {}", i), None);
}
let gpus = collect_all_gpus();
if gpus.is_empty() {
@@ -328,6 +355,20 @@ async fn main() {
let active_model = llm.as_ref().map(|e| e.model_name()).unwrap_or_else(|| "unknown".to_string());
tracing::info!("Käytettävä kielimalli konfiguroitu (selected_task): {}", active_model);
{
let mut st = tui_state.write().await;
st.model_name = active_model.clone();
st.push_log("System", format!("Malli valmis: {}", active_model), None);
}
// Käynnistetään graafinen TUI vasta kun TUI:n Prompt (LlmEngine::load) on ohitettu!
let ui_state = tui_state.clone();
tokio::spawn(async move {
if let Err(e) = tui_dashboard::run_dashboard(ui_state, cmd_tx).await {
tracing::error!("Pääluupin TUI kaatui: {}", e);
}
});
// Haetaan paikalliset mallit hubille lähetettäväksi
let mut available_models = None;
@@ -355,22 +396,28 @@ async fn main() {
continue;
}
use tokio::io::AsyncBufReadExt;
let mut stdin_lines = tokio::io::BufReader::new(tokio::io::stdin()).lines();
loop {
tokio::select! {
line = stdin_lines.next_line() => {
if let Ok(Some(text)) = line {
let t = text.trim();
if t == "p" || t == "pause" {
cmd = cmd_rx.recv() => {
if let Some(cmd_str) = cmd {
if cmd_str == "pause" {
tracing::info!("Tauotetaan solmun suoritus (Hub ei lähetä tehtäviä)...");
let req = json!({"type": "status_update", "status": "paused"});
let _ = write.send(Message::Text(req.to_string())).await;
} else if t == "r" || t == "resume" || t == "s" {
{
let mut st = tui_state.write().await;
st.status = "PAUSED".to_string();
st.push_log("Network", "Solmu siirretty taukotilaan".to_string(), None);
}
} else if cmd_str == "resume" {
tracing::info!("Jatketaan solmun suoritusta...");
let req = json!({"type": "status_update", "status": "active"});
let _ = write.send(Message::Text(req.to_string())).await;
{
let mut st = tui_state.write().await;
st.status = "ACTIVE".to_string();
st.push_log("System", "Suoritus jatkuu...".to_string(), None);
}
}
}
}
@@ -385,10 +432,32 @@ async fn main() {
tracing::info!("Hub pakotti solmun tauolle (Pause)");
let req = json!({"type": "status_update", "status": "paused"});
let _ = write.send(Message::Text(req.to_string())).await;
{
let mut st = tui_state.write().await;
st.status = "PAUSED".to_string();
st.push_log("Network", "Hub kytki solmun tauolle".to_string(), None);
}
} else if action == "resume" {
tracing::info!("Hub aktivoi solmun suorituksen (Resume)");
let req = json!({"type": "status_update", "status": "active"});
let _ = write.send(Message::Text(req.to_string())).await;
{
let mut st = tui_state.write().await;
st.status = "ACTIVE".to_string();
st.push_log("Network", "Hub palautti solmun töihin".to_string(), None);
}
}
}
}
}
// Verkon globaali tila
if text.contains(r#""type":"network_status""#) {
if let Ok(status) = serde_json::from_str::<serde_json::Value>(&text) {
if let Some(nodes) = status.get("active_nodes").and_then(|v| v.as_u64()) {
if let Some(tasks) = status.get("tasks").and_then(|v| v.as_u64()) {
let mut st = tui_state.write().await;
st.network_active_nodes = nodes as usize;
st.network_total_tasks = tasks;
}
}
}
@@ -407,17 +476,35 @@ async fn main() {
let prompt_lines = prompt.lines().count();
let prompt_last: String = prompt.lines().last().unwrap_or("").chars().take(60).collect();
tracing::info!("→ task_id:{} | {}r prompti | \"{}...\"", task_id, prompt_lines, prompt_last);
{
let mut st = tui_state.write().await;
st.cur_task_id = Some(task_id.to_string());
st.cur_prompt = Some(format!("{} riviä | \"{}...\"", prompt_lines, prompt_last));
// Ei login puskemista vielä tässä! Yhdistetään se valmiin lohkoon yhdelle riville.
}
let model_name = engine.model_name();
match engine.generate(prompt, max_tokens).await {
Ok(result) => {
let tokens_sec = (result.tokens_per_sec * 10.0).round() / 10.0;
tracing::info!(
"✓ {} | {} tok | {:.0}ms | {:.1} tok/s",
model_name,
result.tokens_generated,
result.duration_ms,
result.tokens_per_sec,
tokens_sec,
);
{
let mut st = tui_state.write().await;
st.tasks_completed += 1;
st.last_tokens_sec = tokens_sec as f64;
st.cur_task_id = None;
st.cur_prompt = None;
let msg_type = if task_id == "status-check" { "Ping" } else { "Task" };
let msg_text = format!("{} ({} tok)", task_id, result.tokens_generated);
st.push_log(msg_type, msg_text, Some(tokens_sec as f64));
}
let prompt_short: String = prompt.lines().last().unwrap_or("").chars().take(100).collect();
let done = json!({
"type": "llm_done",
@@ -426,7 +513,7 @@ async fn main() {
"response": result.text,
"tokens_generated": result.tokens_generated,
"duration_ms": result.duration_ms,
"tokens_per_sec": (result.tokens_per_sec * 10.0).round() / 10.0,
"tokens_per_sec": tokens_sec,
"load_time_ms": 0,
"task_id": task_id,
});
@@ -434,6 +521,12 @@ async fn main() {
}
Err(e) => {
tracing::error!("Inferenssivirhe: {}", e);
{
let mut st = tui_state.write().await;
st.cur_task_id = None;
st.cur_prompt = None;
st.push_log("System", format!("Virhe inferenssissä: {}", e), None);
}
}
}
}
@@ -449,7 +542,12 @@ async fn main() {
tracing::info!("Vaihdetaan malli: {}", new_model);
engine.set_model(new_model.to_string());
match engine.ensure_model().await {
Ok(()) => tracing::info!("Malli {} valmis!", new_model),
Ok(()) => {
tracing::info!("Malli {} valmis!", new_model);
let mut st = tui_state.write().await;
st.model_name = new_model.to_string();
st.push_log("System", format!("Malli {} ladattu & valmis!", new_model), None);
}
Err(e) => tracing::error!("Mallin lataus epäonnistui: {}", e),
}
}

View File

@@ -0,0 +1,223 @@
use crossterm::{
event::{self, Event, EventStream, KeyCode},
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use ratatui::{
backend::CrosstermBackend,
layout::{Constraint, Direction, Layout, Alignment},
style::{Color, Modifier, Style},
widgets::{Block, Borders, Paragraph, Wrap},
Terminal,
};
use std::io;
use tokio::sync::RwLock;
use std::sync::Arc;
use futures_util::StreamExt;
use std::time::Duration;
#[derive(Clone)]
pub struct LogEntry {
pub ty: String,
pub msg: String,
pub speed: Option<f64>,
}
pub struct DashboardState {
pub logs: Vec<LogEntry>,
pub status: String,
pub node_id: Option<u64>,
pub sys_info: String,
pub model_name: String,
pub cur_task_id: Option<String>,
pub cur_prompt: Option<String>,
pub tasks_completed: u32,
pub last_tokens_sec: f64,
pub network_active_nodes: usize,
pub network_total_tasks: u64,
}
impl DashboardState {
pub fn new() -> Self {
Self {
logs: Vec::new(),
status: "ACTIVE".to_string(),
node_id: None,
sys_info: "".to_string(),
model_name: "Yhdistetään...".to_string(),
cur_task_id: None,
cur_prompt: None,
tasks_completed: 0,
last_tokens_sec: 0.0,
network_active_nodes: 1, // oletetaan itsemme
network_total_tasks: 0,
}
}
pub fn push_log(&mut self, ty: &str, msg: String, speed: Option<f64>) {
self.logs.push(LogEntry {
ty: ty.to_string(),
msg,
speed,
});
if self.logs.len() > 100 {
self.logs.remove(0);
}
}
}
pub async fn run_dashboard(
state: Arc<RwLock<DashboardState>>,
cmd_tx: tokio::sync::mpsc::UnboundedSender<String>,
) -> Result<(), io::Error> {
enable_raw_mode()?;
let mut stdout = io::stdout();
execute!(stdout, EnterAlternateScreen)?;
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
terminal.clear()?;
let mut reader = EventStream::new();
let mut interval = tokio::time::interval(Duration::from_millis(100));
loop {
tokio::select! {
_ = interval.tick() => {
let st = state.read().await;
terminal.draw(|f| ui(f, &st))?;
}
ev = reader.next() => {
if let Some(Ok(Event::Key(key))) = ev {
match key.code {
KeyCode::Char('q') | KeyCode::Esc => {
// Palautetaan näyttö ja suljetaan ohjelma
disable_raw_mode()?;
execute!(terminal.backend_mut(), LeaveAlternateScreen)?;
std::process::exit(0);
}
KeyCode::Char('p') | KeyCode::Char('P') => {
let _ = cmd_tx.send("pause".to_string());
}
KeyCode::Char('r') | KeyCode::Char('R') | KeyCode::Char('s') => {
let _ = cmd_tx.send("resume".to_string());
}
_ => {}
}
}
}
}
}
}
pub fn restore_terminal() {
let _ = disable_raw_mode();
let _ = execute!(io::stdout(), LeaveAlternateScreen);
}
fn ui(f: &mut ratatui::Frame, st: &DashboardState) {
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints([
Constraint::Length(3), // Header
Constraint::Min(0), // Body
Constraint::Length(3), // Footer / Status
].as_ref())
.split(f.area());
// --- Header ---
let header_text = match st.node_id {
Some(id) => format!(" Kipinä Agentic Node #{} ", id),
None => " Kipinä Agentic Node (Yhdistää...) ".to_string(),
};
let header = Paragraph::new(header_text)
.style(Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD))
.alignment(Alignment::Center)
.block(Block::default().borders(Borders::ALL).style(Style::default().fg(Color::DarkGray)));
f.render_widget(header, chunks[0]);
// --- Body ---
let body_chunks = Layout::default()
.direction(Direction::Vertical)
.constraints([
Constraint::Length(7), // Yläosan info ja tehtävä
Constraint::Min(0), // Lokit / Chat alas
].as_ref())
.split(chunks[1]);
let top_panels = Layout::default()
.direction(Direction::Horizontal)
.constraints([
Constraint::Percentage(40), // Vasen paneeli (Info)
Constraint::Percentage(60), // Oikea paneeli (Tehtävä)
].as_ref())
.split(body_chunks[0]);
// Vasen paneeli: Laitteisto, Malli & Verkosto
let info_text = format!(
"🚀 Malli: {}\n💻 Järjestelmä: {}\n📊 Tehdyt: {} | Nopeus: {} t/s\n🌐 Verkosto: {} solmua | {} tehtävää",
st.model_name, st.sys_info, st.tasks_completed, st.last_tokens_sec, st.network_active_nodes, st.network_total_tasks
);
let left_panel = Paragraph::new(info_text)
.block(Block::default().title(" Laitteisto ja AI ").borders(Borders::ALL))
.style(Style::default().fg(Color::White))
.wrap(Wrap { trim: true });
f.render_widget(left_panel, top_panels[0]);
// Oikea paneeli: Käynnissä oleva tehtävä
let task_title = match &st.cur_task_id {
Some(id) => format!(" Työn alla: {} ", id),
None => " Vapaana ".to_string(),
};
let task_content = st.cur_prompt.clone().unwrap_or_else(|| "Odotetaan tehtäviä Hubilta...".to_string());
let task_style = if st.cur_task_id.is_some() {
Style::default().fg(Color::Magenta)
} else {
Style::default().fg(Color::DarkGray)
};
let task_panel = Paragraph::new(task_content)
.wrap(Wrap { trim: true })
.block(Block::default().title(task_title).borders(Borders::ALL).style(task_style));
f.render_widget(task_panel, top_panels[1]);
// Alaosan paneeli: Tapahtumaloki koko leveydeltä
let area_height = body_chunks[1].height.saturating_sub(2) as usize;
let skip_count = if st.logs.len() > area_height { st.logs.len() - area_height } else { 0 };
let visible_logs: Vec<ratatui::text::Line> = st.logs.iter().skip(skip_count).map(|log| {
let ty_color = match log.ty.as_str() {
"System" => Color::Yellow,
"Network" => Color::Blue,
"Task" => Color::Magenta,
"Ping" => Color::DarkGray,
_ => Color::White,
};
let speed_str = if let Some(s) = log.speed {
format!(" | {:.1} tok/s", s)
} else {
"".to_string()
};
ratatui::text::Line::from(vec![
ratatui::text::Span::styled(format!("{: <8}", log.ty), Style::default().fg(ty_color).add_modifier(Modifier::BOLD)),
ratatui::text::Span::raw(" | "),
ratatui::text::Span::styled(log.msg.clone(), Style::default().fg(Color::White)),
ratatui::text::Span::styled(speed_str, Style::default().fg(ty_color)),
])
}).collect();
let logs_panel = Paragraph::new(visible_logs)
.block(Block::default().title(" Tapahtumaloki ").borders(Borders::ALL).style(Style::default().fg(Color::Cyan)));
f.render_widget(logs_panel, body_chunks[1]);
// --- Footer / Status ---
let status_color = if st.status == "ACTIVE" { Color::Green } else { Color::Yellow };
let status_text = format!(" Tila: {} | Komennot: [P] Pause / [R] Työhön / [Q] Sulje ", st.status);
let footer = Paragraph::new(status_text)
.style(Style::default().fg(status_color).add_modifier(Modifier::BOLD))
.alignment(Alignment::Center)
.block(Block::default().borders(Borders::ALL));
f.render_widget(footer, chunks[2]);
}