Natiivisolmun auth-viestistä tallennetaan mallilistaus node_models-mappiin. /api/tags priorisoi verkon solmujen malleja lokaalin Ollaman edelle. api_hardware käyttää tietokannan litteää rakennetta. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1279 lines
57 KiB
Rust
1279 lines
57 KiB
Rust
use axum::{
|
|
extract::ws::{Message, WebSocket, WebSocketUpgrade},
|
|
extract::ConnectInfo,
|
|
response::IntoResponse,
|
|
routing::get,
|
|
Router,
|
|
};
|
|
use futures::{sink::SinkExt, stream::StreamExt};
|
|
use std::collections::HashMap;
|
|
use std::net::{IpAddr, SocketAddr};
|
|
use std::sync::{Arc, Mutex};
|
|
use tokio::sync::broadcast;
|
|
use tower_http::services::{ServeDir, ServeFile};
|
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
|
|
|
mod db;
|
|
|
|
/// Upper bound, in bytes (16 KiB), for a single WebSocket message accepted from a client.
const MAX_MESSAGE_SIZE: usize = 16 * 1024;
|
|
|
|
/// Origins that may open a WebSocket connection; guards against cross-site
/// WebSocket hijacking.
const ALLOWED_ORIGINS: &[&str] = &[
    "https://kipina.studio",
    "http://localhost:3000",
    "http://127.0.0.1:3000",
];
|
|
|
|
/// Message types (the JSON `"type"` field) that clients are allowed to send.
const ALLOWED_MSG_TYPES: &[&str] = &[
    "auth",
    "result",
    "pair_done",
    "llm_chunk",
    "llm_done",
    "llm_error",
    "download_progress",
    "user_text",
    "single_tokenize_done",
];
|
|
|
|
struct AppState {
|
|
next_node_id: Mutex<u64>,
|
|
nodes_vram: Mutex<HashMap<u64, u32>>,
|
|
nodes_tokens: Mutex<HashMap<u64, u32>>, // Gamification: Kipinä Tokens
|
|
total_tasks: Mutex<u64>,
|
|
stats_tx: broadcast::Sender<String>,
|
|
node_channels: tokio::sync::RwLock<HashMap<u64, tokio::sync::mpsc::UnboundedSender<String>>>, // Kohdennettu reititys
|
|
_pending_consensus: tokio::sync::RwLock<HashMap<String, Vec<serde_json::Value>>>, // Proof of Compute -konsensus
|
|
feature_flags: tokio::sync::RwLock<HashMap<String, bool>>, // Tuntee TODO.md:n ruksit lennosta
|
|
ip_connections: Mutex<HashMap<IpAddr, u32>>,
|
|
node_ips: Mutex<HashMap<u64, IpAddr>>,
|
|
node_tasks: Mutex<HashMap<u64, String>>, // node_id → selected_task
|
|
node_types: Mutex<HashMap<u64, String>>, // node_id → "native" | "browser"
|
|
node_busy: Mutex<std::collections::HashSet<u64>>, // Solmut joilla on aktiivinen tehtävä
|
|
pending_task_ids: Mutex<std::collections::HashSet<String>>, // Hubin jakamat task_id:t (gamification-validointi)
|
|
pending_responses: Mutex<HashMap<String, tokio::sync::oneshot::Sender<serde_json::Value>>>, // task_id → oneshot API-vastaukselle
|
|
api_rate_limits: Mutex<HashMap<IpAddr, (std::time::Instant, u32)>>, // IP → (ikkuna-alku, pyyntömäärä)
|
|
node_models: tokio::sync::RwLock<HashMap<u64, serde_json::Value>>, // node_id → ollama tags JSON
|
|
db: db::NodeDb,
|
|
}
|
|
|
|
/// Self-contained admin dashboard page (HTML + CSS + JS) served by `admin_page`.
///
/// The embedded script polls `/api/stats`, `/api/sessions`, `/api/pairs`,
/// `/api/v1/hardware` and `/api/v1/ollama/tags` every five seconds and renders
/// the results into tables. Every value that originates from a node or from
/// the database is passed through the script's `esc()` helper before it is
/// written into `innerHTML`, so a node cannot inject markup into the page.
const ADMIN_HTML: &str = r##"<!DOCTYPE html>
<html lang="fi">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Kipina Admin</title>
<style>
  :root { --bg:#0d1117; --panel:#161b22; --text:#c9d1d9; --accent:#58a6ff; --green:#3fb950; --yellow:#d29922; --red:#f85149; --border:#30363d; }
  * { box-sizing:border-box; margin:0; padding:0; }
  body { font-family:-apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,sans-serif; background:var(--bg); color:var(--text); padding:20px; }
  h1 { color:var(--accent); margin-bottom:5px; }
  .sub { color:#8b949e; margin-bottom:20px; }
  .stats-grid { display:grid; grid-template-columns:repeat(auto-fit,minmax(150px,1fr)); gap:12px; margin-bottom:24px; }
  .stat-card { background:var(--panel); border:1px solid var(--border); border-radius:8px; padding:16px; text-align:center; }
  .stat-card .val { font-size:28px; font-weight:700; color:var(--accent); }
  .stat-card .label { font-size:12px; color:#8b949e; margin-top:4px; }
  table { width:100%; border-collapse:collapse; margin-bottom:24px; font-size:13px; table-layout:fixed; }
  th { background:var(--panel); color:var(--accent); text-align:left; padding:10px 8px; border-bottom:2px solid var(--border); position:sticky; top:0; z-index:1; white-space:nowrap; overflow:hidden; }
  td { padding:8px; border-bottom:1px solid var(--border); height:36px; white-space:nowrap; overflow:hidden; text-overflow:ellipsis; }
  tr:hover td { background:#1c2333; }
  .table-wrap { max-height:60vh; overflow-y:auto; border:1px solid var(--border); border-radius:6px; }
  .badge { display:inline-block; padding:2px 8px; border-radius:10px; font-size:11px; font-weight:600; }
  .badge-green { background:#23392050; color:var(--green); border:1px solid #23392080; }
  .badge-yellow { background:#d2992220; color:var(--yellow); border:1px solid #d2992240; }
  .badge-red { background:#f8514920; color:var(--red); border:1px solid #f8514940; }
  .badge-blue { background:#58a6ff20; color:var(--accent); border:1px solid #58a6ff40; }
  .tabs { display:flex; gap:8px; margin-bottom:16px; }
  .tab { padding:8px 16px; border-radius:6px; border:1px solid var(--border); background:var(--panel); color:var(--text); cursor:pointer; font-size:14px; }
  .tab.active { background:var(--accent); color:#0d1117; border-color:var(--accent); }
  .panel { display:none; }
  .panel.active { display:block; }
  .table-wrap { overflow-x:auto; max-height:70vh; overflow-y:auto; }
  .online { color:var(--green); }
  .offline { color:#8b949e; }
</style>
</head>
<body>
<h1>Kipina Admin</h1>
<p class="sub">Node-sessiot ja tokenisointivertailut · <span id="admin-version" style="color:var(--accent)">-</span></p>

<div id="stats" class="stats-grid"></div>

<div class="tabs">
  <div class="tab active" onclick="showTab('sessions')">Sessiot</div>
  <div class="tab" onclick="showTab('pairs')">Tokenisointiparit</div>
  <div class="tab" onclick="showTab('hardware')">Laitteisto & Mallit</div>
</div>

<div id="sessions" class="panel active">
  <div class="table-wrap">
    <table>
      <colgroup>
        <col style="width:35px"><col style="width:85px"><col style="width:95px"><col style="width:65px"><col style="width:110px"><col style="width:80px">
        <col style="width:65px"><col style="width:40px"><col style="width:70px"><col style="width:90px"><col style="width:60px">
        <col style="width:65px"><col style="width:40px"><col style="width:130px"><col style="width:60px">
      </colgroup>
      <thead><tr>
        <th>ID</th><th>Tila</th><th>Tehtävä</th><th>Tyyppi</th><th>IP</th><th>Alusta</th>
        <th>OS</th><th>CPU</th><th>RAM</th><th>GPU</th><th>VRAM</th>
        <th>WebGPU</th><th>Teht.</th><th>Yhdistetty</th><th>Kesto</th>
      </tr></thead><tbody id="sessions-body"></tbody></table>
  </div>
</div>

<div id="pairs" class="panel">
  <div class="table-wrap">
    <table><thead><tr>
      <th>Solmu</th><th>EN</th><th>EN tok</th><th>EN m/t</th>
      <th>FI</th><th>FI tok</th><th>FI m/t</th><th>Ylikust.</th><th>Aika</th>
    </tr></thead><tbody id="pairs-body"></tbody></table>
  </div>
</div>

<div id="hardware" class="panel">
  <div class="stats-grid" id="hardware-stats"></div>
  <h2 style="margin-top: 10px; margin-bottom: 10px; color: var(--accent); font-size: 16px;">Käytettävissä olevat paikalliset kielimallit</h2>
  <div class="table-wrap">
    <table>
      <thead><tr>
        <th>Nimi</th><th>Koko</th><th>Parametrit</th>
      </tr></thead>
      <tbody id="models-body"></tbody>
    </table>
  </div>
</div>

<script>
  function showTab(name) {
    document.querySelectorAll('.panel').forEach(p => p.classList.remove('active'));
    document.querySelectorAll('.tab').forEach(t => t.classList.remove('active'));
    document.getElementById(name).classList.add('active');
    event.target.classList.add('active');
  }

  // HTML-escape a value before it is placed into innerHTML. Session, pair and
  // model fields are supplied by remote nodes and must be treated as untrusted.
  function esc(v) {
    return String(v).replace(/[&<>"']/g, c => ({'&':'&amp;','<':'&lt;','>':'&gt;','"':'&quot;',"'":'&#39;'}[c]));
  }

  // `text` is escaped here; `cls` is always one of the fixed colour names used below.
  function badge(text, cls) { return `<span class="badge badge-${cls}">${esc(text)}</span>`; }

  function timeSince(iso) {
    if (!iso) return '';
    const d = new Date(iso);
    const now = new Date();
    const s = Math.floor((now - d) / 1000);
    if (s < 60) return s + 's';
    if (s < 3600) return Math.floor(s/60) + 'min';
    if (s < 86400) return Math.floor(s/3600) + 'h';
    return Math.floor(s/86400) + 'pv';
  }

  function duration(start, end) {
    if (!start) return '';
    const s = end ? new Date(end) : new Date();
    const d = Math.floor((s - new Date(start)) / 1000);
    if (d < 60) return d + 's';
    if (d < 3600) return Math.floor(d/60) + 'min';
    return Math.floor(d/3600) + 'h ' + (Math.floor(d/60)%60) + 'min';
  }

  async function load() {
    const [statsRes, sessionsRes, pairsRes, hwRes, modelsRes] = await Promise.all([
      fetch('/api/stats'), fetch('/api/sessions'), fetch('/api/pairs'),
      fetch('/api/v1/hardware').catch(() => ({json: async()=>({gpu_name:'', vram_mb:0, ram_mb:0})})),
      fetch('/api/v1/ollama/tags').catch(() => ({json: async()=>({models:[]})}))
    ]);
    const stats = await statsRes.json();
    const sessions = await sessionsRes.json();
    const pairs = await pairsRes.json();
    const hw = await hwRes.json().catch(() => ({gpu_name:'', vram_mb:0, ram_mb:0}));
    const modelsData = await modelsRes.json().catch(() => ({models:[]}));

    // Version
    if (stats.version) document.getElementById('admin-version').textContent = 'v' + stats.version;

    // Stats
    document.getElementById('stats').innerHTML = [
      {v: stats.total_sessions, l: 'Sessioita'},
      {v: stats.active_sessions, l: 'Aktiivisia'},
      {v: stats.unique_ips, l: 'Uniikkeja IP'},
      {v: stats.webgpu_sessions, l: 'WebGPU'},
      {v: stats.cpu_fallback_sessions, l: 'CPU fallback'},
      {v: stats.total_pairs, l: 'Pareja'},
      {v: stats.avg_en_chars_per_token, l: 'EN m/t (ka.)'},
      {v: stats.avg_fi_chars_per_token, l: 'FI m/t (ka.)'},
      {v: stats.avg_overhead_pct + '%', l: 'FI ylikust. (ka.)'},
    ].map(s => `<div class="stat-card"><div class="val">${esc(s.v)}</div><div class="label">${s.l}</div></div>`).join('');

    // Sessions — ordering: 1) active nodes (online, not viewer), 2) viewers (online + viewer), 3) offline
    const taskNames = {'tokenize':'Tokenisaatio','smollm-135m':'SmolLM 135M','qwen-05b':'Qwen2.5 0.5B','phi3-mini':'Phi-3 Mini','qwen-coder-05b':'Coder 0.5B','qwen-coder-3b':'Coder 3B','viewer':'Katsoja','codelab-viewer':'Koodilabra'};
    sessions.sort((a, b) => {
      const aOnline = !a.disconnected_at;
      const bOnline = !b.disconnected_at;
      const aViewer = a.selected_task === 'viewer';
      const bViewer = b.selected_task === 'viewer';
      // Online before offline
      if (aOnline !== bOnline) return aOnline ? -1 : 1;
      // Online: active nodes before viewers
      if (aOnline && bOnline && aViewer !== bViewer) return aViewer ? 1 : -1;
      // Within the same group: newest first
      return new Date(b.connected_at) - new Date(a.connected_at);
    });

    document.getElementById('sessions-body').innerHTML = sessions.map(s => {
      const online = !s.disconnected_at;
      const isViewer = s.selected_task === 'viewer';
      const status = online
        ? (isViewer ? '<span style="color:#d29922">CONNECTED</span>' : '<span class="online">ACTIVE</span>')
        : '<span class="offline">offline</span>';
      const typeBadge = s.node_type === 'native' ? badge('native','blue') : badge('browser','yellow');
      const taskColor = isViewer ? 'yellow' : s.selected_task === 'tokenize' ? 'green' : 'blue';
      const taskBadge = badge(taskNames[s.selected_task] || s.selected_task || '?', taskColor);
      const gpuBadge = s.has_webgpu ? badge('WebGPU','green') : badge('CPU','red');
      const gpu = s.gpu_name ? `${s.gpu_name}` : '-';
      const vram = s.vram_total_mb ? `${s.vram_total_mb} MB` : '-';
      const ram = s.ram_mb ? `${s.ram_mb} MB` : '-';
      const cores = s.cpu_cores || '-';
      const plat = s.platform || s.hostname || '-';
      const os = s.os || '-';
      const time = s.connected_at ? new Date(s.connected_at).toLocaleString('fi-FI') : '';
      const dur = duration(s.connected_at, s.disconnected_at);
      return `<tr>
        <td>${esc(s.node_id)}</td><td>${status}</td><td>${taskBadge}</td><td>${typeBadge}</td><td>${esc(s.ip)}</td>
        <td>${esc(plat)}</td><td>${esc(os)}</td><td>${esc(cores)}</td><td>${esc(ram)}</td>
        <td>${esc(gpu)}</td><td>${esc(vram)}</td><td>${gpuBadge}</td>
        <td>${esc(s.tasks_completed)}</td><td>${esc(time)}</td><td>${esc(dur)}</td>
      </tr>`;
    }).join('');

    // Pairs
    document.getElementById('pairs-body').innerHTML = pairs.map(p => {
      const ovColor = p.overhead_pct > 50 ? 'red' : p.overhead_pct > 20 ? 'yellow' : 'green';
      const enCpt = p.en_chars_per_token?.toFixed(2) || '-';
      const fiCpt = p.fi_chars_per_token?.toFixed(2) || '-';
      const ov = p.overhead_pct?.toFixed(1) || '-';
      return `<tr>
        <td>#${esc(p.node_id)}</td>
        <td style="max-width:250px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap">${esc(p.en_text||'')}</td>
        <td>${esc(p.en_tokens||'-')}</td><td>${esc(enCpt)}</td>
        <td style="max-width:250px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap">${esc(p.fi_text||'')}</td>
        <td>${esc(p.fi_tokens||'-')}</td><td>${esc(fiCpt)}</td>
        <td>${badge(ov+'%', ovColor)}</td>
        <td>${esc(p.duration_ms||0)}ms</td>
      </tr>`;
    }).join('');

    // Hardware
    document.getElementById('hardware-stats').innerHTML = [
      {v: hw.gpu_name || '-', l: 'Paikallinen GPU tila'},
      {v: hw.vram_mb ? hw.vram_mb + ' MB' : '-', l: 'GPU Muisti (VRAM)'},
      {v: hw.ram_mb ? hw.ram_mb + ' MB' : '-', l: 'RAM'},
    ].map(s => `<div class="stat-card"><div class="val">${esc(s.v)}</div><div class="label">${s.l}</div></div>`).join('');

    // Models
    document.getElementById('models-body').innerHTML = (modelsData.models || []).map(m => {
      const sizeGb = (m.size / (1024*1024*1024)).toFixed(2) + ' GB';
      const params = m.details?.parameter_size || '-';
      return `<tr>
        <td><strong>${esc(m.name)}</strong></td>
        <td>${esc(sizeGb)}</td>
        <td>${esc(params)}</td>
      </tr>`;
    }).join('');
  }

  load();
  setInterval(load, 5000);
</script>
</body>
</html>"##;
|
|
|
|
#[tokio::main]
|
|
async fn main() {
|
|
tracing_subscriber::registry()
|
|
.with(
|
|
tracing_subscriber::EnvFilter::try_from_default_env()
|
|
.unwrap_or_else(|_| "hub=debug,tower_http=debug".into()),
|
|
)
|
|
.with(tracing_subscriber::fmt::layer())
|
|
.init();
|
|
|
|
let (stats_tx, _) = broadcast::channel(100);
|
|
|
|
let state = Arc::new(AppState {
|
|
next_node_id: Mutex::new(1),
|
|
nodes_vram: Mutex::new(HashMap::new()),
|
|
nodes_tokens: Mutex::new(HashMap::new()),
|
|
total_tasks: Mutex::new(0),
|
|
stats_tx: stats_tx.clone(),
|
|
node_channels: tokio::sync::RwLock::new(HashMap::new()),
|
|
_pending_consensus: tokio::sync::RwLock::new(HashMap::new()),
|
|
feature_flags: tokio::sync::RwLock::new(HashMap::new()),
|
|
ip_connections: Mutex::new(HashMap::new()),
|
|
node_ips: Mutex::new(HashMap::new()),
|
|
node_tasks: Mutex::new(HashMap::new()),
|
|
node_types: Mutex::new(HashMap::new()),
|
|
node_busy: Mutex::new(std::collections::HashSet::new()),
|
|
pending_task_ids: Mutex::new(std::collections::HashSet::new()),
|
|
pending_responses: Mutex::new(HashMap::new()),
|
|
api_rate_limits: Mutex::new(HashMap::new()),
|
|
node_models: tokio::sync::RwLock::new(HashMap::new()),
|
|
db: db::NodeDb::new(&std::env::var("DATABASE_PATH").unwrap_or_else(|_| "nodes.db".to_string())),
|
|
});
|
|
|
|
tracing::info!("Tietokanta alustettu");
|
|
|
|
let state_for_watcher = state.clone();
|
|
tokio::spawn(async move {
|
|
// Ensimmäinen luku heti, sitten 3s välein
|
|
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(3));
|
|
let file_path = std::env::var("FEATURE_FLAGS_FILE").unwrap_or_else(|_| "../TODO.md".to_string());
|
|
|
|
loop {
|
|
interval.tick().await;
|
|
if let Ok(content) = tokio::fs::read_to_string(&file_path).await {
|
|
let mut flags = HashMap::new();
|
|
for line in content.lines() {
|
|
if line.starts_with("- [ ] **") || line.starts_with("- [x] **") {
|
|
let is_active = line.starts_with("- [x]");
|
|
if let Some(start_idx) = line.find("**") {
|
|
let start = start_idx + 2;
|
|
if let Some(end_idx) = line[start..].find("**") {
|
|
let end = end_idx + start;
|
|
let feature_name = line[start..end].trim_end_matches(':').trim().to_string();
|
|
flags.insert(feature_name, is_active);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
*state_for_watcher.feature_flags.write().await = flags;
|
|
}
|
|
}
|
|
});
|
|
|
|
let state_for_task = state.clone();
|
|
|
|
// Ajastin, joka jakaa satunnaisia tekoälytehtäviä eri pituuksilla
|
|
tokio::spawn(async move {
|
|
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
|
|
|
|
// Kieliparit: sama semanttinen sisältö englanniksi ja suomeksi
|
|
let pairs: Vec<(&str, &str)> = vec![
|
|
("Tell me a joke.", "Kerro vitsi."),
|
|
("What is Rust?", "Mikä on Rust?"),
|
|
("Explain WebGPU briefly.", "Selitä WebGPU lyhyesti."),
|
|
("It was a dark and stormy night, and the old sea captain began his tale:", "Oli synkkä ja myrskyinen yö, ja vanha merikapteeni aloitti tarinansa:"),
|
|
("Artificial intelligence is transforming the world in many ways, but perhaps the most significant change is", "Tekoäly muuttaa maailmaa monella tavalla, mutta kenties merkittävin muutos on"),
|
|
("Distributed computing in the browser is a fascinating concept because", "Hajautettu laskenta selaimessa on kiehtova konsepti, koska"),
|
|
("By the year 2030, programmers will no longer write code by hand, instead they will", "Vuonna 2030 ohjelmoijat eivät enää kirjoita koodia käsin, vaan he"),
|
|
("Imagine a world where every computer, phone, and tablet combines its processing power into one vast AI network. This future is closer than you think, because", "Kuvittele maailma, jossa jokainen tietokone, puhelin ja tabletti yhdistää prosessointivoimansa yhdeksi valtavaksi tekoälyverkoksi. Tämä tulevaisuus on lähempänä kuin uskotkaan, sillä"),
|
|
("The open source movement has fundamentally changed how software is built. What started as a fringe philosophy has become the backbone of modern infrastructure, and the next frontier is", "Avoimen lähdekoodin liike on muuttanut perustavanlaatuisesti ohjelmistojen rakentamisen. Marginaalisesta filosofiasta on tullut modernin infrastruktuurin selkäranka, ja seuraava rajapyykki on"),
|
|
];
|
|
|
|
let mut rng_state: u64 = std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.unwrap()
|
|
.as_nanos() as u64;
|
|
|
|
loop {
|
|
interval.tick().await;
|
|
rng_state ^= rng_state << 13;
|
|
rng_state ^= rng_state >> 7;
|
|
rng_state ^= rng_state << 17;
|
|
let idx = (rng_state as usize) % pairs.len();
|
|
let (en, fi) = pairs[idx];
|
|
|
|
let llm_prompts = vec![
|
|
"Tell me a short joke.",
|
|
"What is WebGPU in one sentence?",
|
|
"Explain distributed computing briefly.",
|
|
"Write a haiku about technology.",
|
|
"What makes Rust special?",
|
|
];
|
|
let llm_idx = (rng_state as usize / 7) % llm_prompts.len();
|
|
|
|
// Smart Routing: Lähetetään vain niille, jotka valittuna ja idle
|
|
let mut sends = Vec::new();
|
|
{
|
|
let channels = state_for_task.node_channels.read().await;
|
|
let tasks = state_for_task.node_tasks.lock().unwrap();
|
|
let mut busy = state_for_task.node_busy.lock().unwrap();
|
|
|
|
for (node_id, task) in tasks.iter() {
|
|
if !busy.contains(node_id) {
|
|
// Vapaa node -> lähetetään oikea tehtävä
|
|
let msg = match task.as_str() {
|
|
"tokenize" => Some(serde_json::json!({ "type": "pair_task", "en": en, "fi": fi })),
|
|
"smollm-135m" => Some(serde_json::json!({ "type": "llm_prompt", "prompt": llm_prompts[llm_idx], "model": "smollm-135m" })),
|
|
"qwen-05b" => Some(serde_json::json!({ "type": "llm_prompt", "prompt": llm_prompts[llm_idx], "model": "qwen-05b" })),
|
|
"phi3-mini" => Some(serde_json::json!({ "type": "llm_prompt", "prompt": llm_prompts[llm_idx], "model": "phi3-mini" })),
|
|
_ => None, // Coder ja viewer ei saa auto-tehtäviä
|
|
};
|
|
|
|
if let Some(payload) = msg {
|
|
if let Some(ch) = channels.get(node_id) {
|
|
sends.push((ch.clone(), payload.to_string()));
|
|
busy.insert(*node_id);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for (ch, msg_str) in sends {
|
|
let _ = ch.send(msg_str);
|
|
}
|
|
|
|
// tracing::debug!("Tehtävät lähetetty reititetysti idle-nodeille");
|
|
}
|
|
});
|
|
|
|
let app = Router::new()
|
|
.route("/ws", get(ws_handler))
|
|
.route("/api/sessions", get(api_sessions))
|
|
.route("/api/pairs", get(api_pairs))
|
|
.route("/api/stats", get(api_stats))
|
|
.route("/api/v1/chat/completions", axum::routing::post(api_chat_completions))
|
|
.route("/api/v1/model", axum::routing::post(api_change_model))
|
|
.route("/api/v1/hardware", get(api_hardware))
|
|
.route("/api/v1/ollama/tags", get(api_ollama_tags))
|
|
.route("/api/v1/agents", get(api_get_agents).post(api_upsert_agent))
|
|
.route("/api/v1/agents/:id", axum::routing::delete(api_delete_agent))
|
|
.route("/admin", get(admin_page))
|
|
.nest_service("/", {
|
|
let static_dir = std::env::var("STATIC_DIR").unwrap_or_else(|_| "../frontend/dist".to_string());
|
|
ServeDir::new(&static_dir).fallback(ServeFile::new(format!("{}/index.html", static_dir)))
|
|
})
|
|
.with_state(state);
|
|
|
|
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
|
|
tracing::info!("Kipinä Agent Hub v{} käynnistyy osoitteessa http://localhost:3000", env!("CARGO_PKG_VERSION"));
|
|
|
|
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
|
|
axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>()).await.unwrap();
|
|
}
|
|
|
|
async fn api_sessions(
|
|
headers: axum::http::HeaderMap,
|
|
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
|
|
) -> axum::response::Response {
|
|
if !check_admin_auth(&headers) { return admin_unauthorized(); }
|
|
axum::Json(state.db.get_sessions(200)).into_response()
|
|
}
|
|
|
|
async fn api_pairs(
|
|
headers: axum::http::HeaderMap,
|
|
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
|
|
) -> axum::response::Response {
|
|
if !check_admin_auth(&headers) { return admin_unauthorized(); }
|
|
axum::Json(state.db.get_pair_results(500)).into_response()
|
|
}
|
|
|
|
async fn api_stats(
|
|
headers: axum::http::HeaderMap,
|
|
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
|
|
) -> axum::response::Response {
|
|
if !check_admin_auth(&headers) { return admin_unauthorized(); }
|
|
let mut stats = state.db.get_stats();
|
|
if let Some(obj) = stats.as_object_mut() {
|
|
obj.insert("version".to_string(), serde_json::json!(env!("CARGO_PKG_VERSION")));
|
|
}
|
|
axum::Json(stats).into_response()
|
|
}
|
|
|
|
fn check_admin_auth(headers: &axum::http::HeaderMap) -> bool {
|
|
let password = match std::env::var("ADMIN_PASSWORD") {
|
|
Ok(p) if !p.is_empty() => p,
|
|
_ => {
|
|
tracing::warn!("ADMIN_PASSWORD ei ole asetettu — käytetään oletusta 'kipina' (ÄLÄ käytä tuotannossa!)");
|
|
"kipina".to_string()
|
|
}
|
|
};
|
|
if let Some(auth) = headers.get("authorization").and_then(|v| v.to_str().ok()) {
|
|
if auth.starts_with("Basic ") {
|
|
use base64::Engine;
|
|
if let Ok(decoded_bytes) = base64::engine::general_purpose::STANDARD
|
|
.decode(auth.trim_start_matches("Basic ").trim())
|
|
{
|
|
if let Ok(decoded) = String::from_utf8(decoded_bytes) {
|
|
if let Some(pass) = decoded.split(':').nth(1) {
|
|
return pass == password;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
false
|
|
}
|
|
|
|
/// Builds the `401 Unauthorized` response that asks the client for HTTP Basic
/// credentials for the "Kipinä Admin" realm.
fn admin_unauthorized() -> axum::response::Response {
    let body = axum::body::Body::from("Unauthorized");
    axum::response::Response::builder()
        .status(axum::http::StatusCode::UNAUTHORIZED)
        .header(
            axum::http::header::WWW_AUTHENTICATE,
            "Basic realm=\"Kipinä Admin\"",
        )
        .body(body)
        .unwrap()
}
|
|
|
|
// ── Agents API ──
|
|
|
|
async fn api_get_agents(
|
|
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
|
|
) -> axum::response::Response {
|
|
axum::Json(state.db.get_agents()).into_response()
|
|
}
|
|
|
|
async fn api_upsert_agent(
|
|
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
|
|
axum::Json(payload): axum::Json<serde_json::Value>,
|
|
) -> axum::response::Response {
|
|
match state.db.upsert_agent(&payload) {
|
|
Ok(()) => axum::Json(serde_json::json!({"ok": true})).into_response(),
|
|
Err(e) => (axum::http::StatusCode::BAD_REQUEST, e).into_response(),
|
|
}
|
|
}
|
|
|
|
async fn api_delete_agent(
|
|
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
|
|
axum::extract::Path(id): axum::extract::Path<String>,
|
|
) -> axum::response::Response {
|
|
match state.db.delete_agent(&id) {
|
|
Ok(()) => axum::Json(serde_json::json!({"ok": true})).into_response(),
|
|
Err(e) => (axum::http::StatusCode::BAD_REQUEST, e).into_response(),
|
|
}
|
|
}
|
|
|
|
async fn admin_page(headers: axum::http::HeaderMap) -> axum::response::Response {
|
|
if !check_admin_auth(&headers) { return admin_unauthorized(); }
|
|
axum::response::Html(ADMIN_HTML).into_response()
|
|
}
|
|
|
|
async fn ws_handler(
|
|
ws: WebSocketUpgrade,
|
|
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
|
|
ConnectInfo(addr): ConnectInfo<SocketAddr>,
|
|
headers: axum::http::HeaderMap,
|
|
) -> impl IntoResponse {
|
|
// Origin-tarkistus — estää cross-site WebSocket hijackingin
|
|
if let Some(origin) = headers.get("origin").and_then(|v| v.to_str().ok()) {
|
|
let is_allowed = ALLOWED_ORIGINS.iter().any(|&allowed| origin == allowed)
|
|
|| origin.starts_with("http://192.168.")
|
|
|| origin.starts_with("http://10.")
|
|
|| origin.starts_with("http://172."); // LAN-avaruudet
|
|
|
|
if !is_allowed {
|
|
tracing::warn!("Estetty yhteys väärällä originilla: {}", origin);
|
|
return (
|
|
axum::http::StatusCode::FORBIDDEN,
|
|
"Origin not allowed",
|
|
).into_response();
|
|
}
|
|
}
|
|
// Origin puuttuu → natiivi-node (ei selainta), sallitaan
|
|
|
|
let ip = headers.get("x-forwarded-for")
|
|
.and_then(|v| v.to_str().ok())
|
|
.and_then(|s| s.split(',').next())
|
|
.and_then(|s| s.trim().parse::<IpAddr>().ok())
|
|
.unwrap_or_else(|| addr.ip());
|
|
|
|
// Max yhteyttä per IP (ei rajoiteta localhost/127.0.0.1)
|
|
{
|
|
let is_local = ip.is_loopback();
|
|
if !is_local {
|
|
let conns = state.ip_connections.lock().unwrap();
|
|
let count = conns.get(&ip).copied().unwrap_or(0);
|
|
if count >= 20 {
|
|
tracing::warn!("IP {} ylitti yhteysrajan ({}/20) — estetty", ip, count);
|
|
return (
|
|
axum::http::StatusCode::TOO_MANY_REQUESTS,
|
|
"Max 20 yhteyttä per IP",
|
|
).into_response();
|
|
}
|
|
}
|
|
}
|
|
|
|
ws.max_message_size(MAX_MESSAGE_SIZE)
|
|
.on_upgrade(move |socket| handle_socket(socket, state, ip))
|
|
.into_response()
|
|
}
|
|
|
|
async fn broadcast_stats(state: &Arc<AppState>) {
|
|
let total_nodes;
|
|
let mut total_vram = 0;
|
|
{
|
|
let map = state.nodes_vram.lock().unwrap();
|
|
total_nodes = map.len();
|
|
for (_, vram) in map.iter() {
|
|
total_vram += vram;
|
|
}
|
|
}
|
|
let completed = *state.total_tasks.lock().unwrap();
|
|
let stats_msg = serde_json::json!({
|
|
"type": "stats",
|
|
"version": env!("CARGO_PKG_VERSION"),
|
|
"nodes": total_nodes,
|
|
"vram_gb": total_vram,
|
|
"tasks": completed
|
|
});
|
|
let _ = state.stats_tx.send(stats_msg.to_string());
|
|
}
|
|
|
|
/// Validoi client-viesti: pakollinen "type"-kenttä, sallittu tyyppi, validi JSON
|
|
fn validate_message(text: &str) -> Result<serde_json::Value, &'static str> {
|
|
let json: serde_json::Value = serde_json::from_str(text)
|
|
.map_err(|_| "Ei validi JSON")?;
|
|
|
|
let msg_type = json.get("type")
|
|
.and_then(|v| v.as_str())
|
|
.ok_or("Puuttuva 'type'-kenttä")?;
|
|
|
|
if !ALLOWED_MSG_TYPES.contains(&msg_type) {
|
|
return Err("Tuntematon viestityyppi");
|
|
}
|
|
|
|
// Tyyppikohtainen validointi
|
|
match msg_type {
|
|
"auth" => {
|
|
// allocated_gb pitää olla järkevä (0-128)
|
|
if let Some(gb) = json.get("allocated_gb").and_then(|v| v.as_u64()) {
|
|
if gb > 128 { return Err("allocated_gb liian suuri"); }
|
|
}
|
|
}
|
|
"pair_done" => {
|
|
// Pitää sisältää en ja fi -objektit
|
|
if json.get("en").is_none() || json.get("fi").is_none() {
|
|
return Err("pair_done: puuttuu en/fi");
|
|
}
|
|
// token_count pitää olla järkevä
|
|
for lang in &["en", "fi"] {
|
|
if let Some(tc) = json.get(lang).and_then(|l| l.get("token_count")).and_then(|v| v.as_u64()) {
|
|
if tc > 10000 { return Err("token_count liian suuri"); }
|
|
}
|
|
}
|
|
}
|
|
"result" => {
|
|
// data-kenttä pitää olla olemassa
|
|
if json.get("data").is_none() && json.get("status").is_none() {
|
|
return Err("result: puuttuu data/status");
|
|
}
|
|
}
|
|
_ => {}
|
|
}
|
|
|
|
Ok(json)
|
|
}
|
|
|
|
async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
|
|
// Rekisteröidään IP-yhteys
|
|
{
|
|
let mut conns = state.ip_connections.lock().unwrap();
|
|
*conns.entry(ip).or_insert(0) += 1;
|
|
}
|
|
|
|
let (mut sender, mut receiver) = socket.split();
|
|
|
|
let node_id = {
|
|
let mut next_id = state.next_node_id.lock().unwrap();
|
|
let id = *next_id;
|
|
*next_id += 1;
|
|
id
|
|
};
|
|
|
|
// Tallennetaan node_id → IP -mappaus
|
|
{
|
|
state.node_ips.lock().unwrap().insert(node_id, ip);
|
|
}
|
|
|
|
tracing::info!("Solmu {} yhdistyi osoitteesta {}", node_id, ip);
|
|
|
|
let (node_tx, mut node_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
|
|
// Tallennetaan node channel reititystä varten
|
|
{
|
|
state.node_channels.write().await.insert(node_id, node_tx);
|
|
}
|
|
|
|
// Yksinkertaistettu broadcast tx vastaanotto
|
|
let mut rx = state.stats_tx.subscribe();
|
|
|
|
let sender_task = tokio::spawn(async move {
|
|
loop {
|
|
tokio::select! {
|
|
result = rx.recv() => {
|
|
match result {
|
|
Ok(msg) => {
|
|
if sender.send(Message::Text(msg)).await.is_err() { break; }
|
|
}
|
|
Err(broadcast::error::RecvError::Lagged(n)) => {
|
|
tracing::debug!("Broadcast lagged {} viestiä — ohitetaan", n);
|
|
continue;
|
|
}
|
|
Err(_) => break, // Kanava suljettu
|
|
}
|
|
}
|
|
Some(direct_msg) = node_rx.recv() => {
|
|
if sender.send(Message::Text(direct_msg)).await.is_err() { break; }
|
|
}
|
|
else => break,
|
|
}
|
|
}
|
|
});
|
|
|
|
// Receiver loop
|
|
while let Some(Ok(msg)) = receiver.next().await {
|
|
let text = match msg {
|
|
Message::Text(t) => t,
|
|
Message::Close(_) => break,
|
|
_ => continue,
|
|
};
|
|
|
|
if text.len() > MAX_MESSAGE_SIZE {
|
|
tracing::warn!("Solmu {} ({}) lähetti liian suuren viestin ({} tavua)", node_id, ip, text.len());
|
|
continue;
|
|
}
|
|
|
|
// Validointi
|
|
let json = match validate_message(&text) {
|
|
Ok(j) => j,
|
|
Err(reason) => {
|
|
let preview: String = text.chars().take(100).collect();
|
|
tracing::warn!("Solmu {} ({}) lähetti virheellisen viestin: {} — {:?}", node_id, ip, reason, preview);
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let msg_type = json.get("type").and_then(|v| v.as_str()).unwrap_or("");
|
|
|
|
if msg_type == "auth" {
|
|
{
|
|
let allocated = json.get("allocated_gb").and_then(|v| v.as_u64()).unwrap_or(4) as u32;
|
|
let node_type = json.get("node_type").and_then(|v| v.as_str()).unwrap_or("browser");
|
|
|
|
// API-avain vaaditaan natiivisolmuilta (ei selaimilta)
|
|
if node_type == "native" {
|
|
let required_key = std::env::var("NODE_API_KEY").unwrap_or_default();
|
|
if !required_key.is_empty() {
|
|
let provided_key = json.get("api_key").and_then(|v| v.as_str()).unwrap_or("");
|
|
if provided_key != required_key {
|
|
tracing::warn!("Solmu {} ({}) hylätty: virheellinen API-avain", node_id, ip);
|
|
break; // Suljetaan WebSocket
|
|
}
|
|
}
|
|
}
|
|
|
|
{
|
|
let mut map = state.nodes_vram.lock().unwrap();
|
|
map.insert(node_id, allocated);
|
|
}
|
|
|
|
let selected_task = json.get("selected_task").and_then(|v| v.as_str()).unwrap_or("tokenize").to_string();
|
|
let is_viewer = selected_task == "viewer" || selected_task == "codelab-viewer";
|
|
let existing = state.node_tasks.lock().unwrap().contains_key(&node_id);
|
|
|
|
if existing {
|
|
// Sama yhteys, eri tehtävä → päivitetään
|
|
state.db.update_session_task(node_id, &selected_task);
|
|
tracing::info!("Solmu {} päivitti tehtävän → {}", node_id, selected_task);
|
|
} else {
|
|
// Uusi yhteys — suljetaan saman IP:n viewer-sessiot jos tämä on aktiivinen node
|
|
if !is_viewer {
|
|
state.db.close_viewers_by_ip(&ip.to_string());
|
|
}
|
|
state.db.insert_session(node_id, &ip.to_string(), node_type, &json);
|
|
}
|
|
state.node_tasks.lock().unwrap().insert(node_id, selected_task);
|
|
state.node_types.lock().unwrap().insert(node_id, node_type.to_string());
|
|
|
|
if node_type == "native" {
|
|
let sys = json.get("system");
|
|
let hostname = sys.and_then(|s| s.get("hostname")).and_then(|v| v.as_str()).unwrap_or("?");
|
|
let os = sys.and_then(|s| s.get("os")).and_then(|v| v.as_str()).unwrap_or("?");
|
|
let cores = sys.and_then(|s| s.get("cpu_cores")).and_then(|v| v.as_u64()).unwrap_or(0);
|
|
let ram = sys.and_then(|s| s.get("ram_total_mb")).and_then(|v| v.as_u64()).unwrap_or(0);
|
|
|
|
tracing::info!(
|
|
"Solmu {} (natiivi) | {} | {} | {} | {} ydintä | {} MB RAM | varaus: {} GB",
|
|
node_id, ip, hostname, os, cores, ram, allocated
|
|
);
|
|
|
|
// Tallennetaan välitetyt mallit muistiin
|
|
if let Some(models) = json.get("models") {
|
|
let mut nm = state.node_models.write().await;
|
|
nm.insert(node_id, models.clone());
|
|
}
|
|
|
|
if let Some(gpus) = json.get("gpus").and_then(|v| v.as_array()) {
|
|
for gpu in gpus {
|
|
tracing::info!(
|
|
" GPU {}: {} | VRAM: {}/{} MB | {}°C | {}%",
|
|
gpu["index"].as_u64().unwrap_or(0),
|
|
gpu["name"].as_str().unwrap_or("?"),
|
|
gpu["vram_used_mb"].as_u64().unwrap_or(0),
|
|
gpu["vram_total_mb"].as_u64().unwrap_or(0),
|
|
gpu["temperature_c"].as_u64().unwrap_or(0),
|
|
gpu["gpu_util_pct"].as_u64().unwrap_or(0),
|
|
);
|
|
}
|
|
}
|
|
} else {
|
|
let cores = json.get("cpu_cores").and_then(|v| v.as_u64()).unwrap_or(0);
|
|
let ram = json.get("device_memory_gb").and_then(|v| v.as_f64()).unwrap_or(0.0);
|
|
let platform = json.get("platform").and_then(|v| v.as_str()).unwrap_or("?");
|
|
let gpu_desc = json.get("gpu")
|
|
.and_then(|g| g.get("description").or_else(|| g.get("vendor")))
|
|
.and_then(|v| v.as_str())
|
|
.unwrap_or("ei GPU:ta");
|
|
let task = json.get("selected_task").and_then(|v| v.as_str()).unwrap_or("tokenize");
|
|
|
|
tracing::info!(
|
|
"Solmu {} (selain) | {} | {} | {} ydintä | ~{} GB RAM | GPU: {} | tehtävä: {} | varaus: {} GB",
|
|
node_id, ip, platform, cores, ram, gpu_desc, task, allocated
|
|
);
|
|
}
|
|
}
|
|
broadcast_stats(&state).await;
|
|
|
|
let join_msg = serde_json::json!({
|
|
"type": "node_joined",
|
|
"node_id": node_id
|
|
});
|
|
let _ = state.stats_tx.send(join_msg.to_string());
|
|
|
|
} else if msg_type == "result" {
|
|
tracing::info!("Solmu {} sai tuloksen: {}", node_id, text);
|
|
{
|
|
let mut task_count = state.total_tasks.lock().unwrap();
|
|
*task_count += 1;
|
|
}
|
|
broadcast_stats(&state).await;
|
|
} else if msg_type == "pair_done" {
|
|
state.node_busy.lock().unwrap().remove(&node_id);
|
|
{
|
|
let mut json = json; // Siirretään omistajuus muokkausta varten
|
|
if let Some(obj) = json.as_object_mut() {
|
|
let empty = serde_json::json!({});
|
|
let en = obj.get("en").unwrap_or(&empty);
|
|
let fi = obj.get("fi").unwrap_or(&empty);
|
|
let overhead = obj.get("overhead_pct").and_then(|v| v.as_f64()).unwrap_or(0.0);
|
|
let duration = obj.get("duration_ms").and_then(|v| v.as_f64()).unwrap_or(0.0);
|
|
|
|
let en_text = en.get("text").and_then(|v| v.as_str()).unwrap_or("");
|
|
let en_tokens = en.get("token_count").and_then(|v| v.as_u64()).unwrap_or(0);
|
|
let en_chars = en.get("char_count").and_then(|v| v.as_u64()).unwrap_or(0);
|
|
let en_cpt = en.get("chars_per_token").and_then(|v| v.as_f64()).unwrap_or(0.0);
|
|
|
|
let fi_text = fi.get("text").and_then(|v| v.as_str()).unwrap_or("");
|
|
let fi_tokens = fi.get("token_count").and_then(|v| v.as_u64()).unwrap_or(0);
|
|
let fi_chars = fi.get("char_count").and_then(|v| v.as_u64()).unwrap_or(0);
|
|
let fi_cpt = fi.get("chars_per_token").and_then(|v| v.as_f64()).unwrap_or(0.0);
|
|
|
|
let overhead_color = if overhead > 10.0 { "\x1b[31m" } else if overhead < -10.0 { "\x1b[32m" } else { "\x1b[33m" };
|
|
|
|
println!();
|
|
println!("\x1b[36m━━━ Solmu {} ━━━ {:.2}ms ━━━\x1b[0m", node_id, duration);
|
|
println!(" \x1b[34mEN\x1b[0m \"{}\"", en_text);
|
|
println!(" {} merkkiä → \x1b[35m{} tokenia\x1b[0m | \x1b[32m{:.2} merkkiä/token\x1b[0m", en_chars, en_tokens, en_cpt);
|
|
println!(" \x1b[33mFI\x1b[0m \"{}\"", fi_text);
|
|
println!(" {} merkkiä → \x1b[35m{} tokenia\x1b[0m | \x1b[32m{:.2} merkkiä/token\x1b[0m", fi_chars, fi_tokens, fi_cpt);
|
|
println!(" {}Suomen ylikustannus: {:+.1}%\x1b[0m", overhead_color, overhead);
|
|
|
|
// Tallennetaan parin tulos tietokantaan
|
|
let en_ref = obj.get("en").cloned().unwrap_or_default();
|
|
let fi_ref = obj.get("fi").cloned().unwrap_or_default();
|
|
state.db.insert_pair_result(node_id, &en_ref, &fi_ref, overhead, duration);
|
|
state.db.increment_tasks(node_id);
|
|
|
|
obj.insert("node_id".to_string(), serde_json::json!(node_id));
|
|
}
|
|
let _ = state.stats_tx.send(json.to_string());
|
|
|
|
let active_incentives = state.feature_flags.read().await.get("Insentiivit").copied().unwrap_or(false);
|
|
let ui_sync = state.feature_flags.read().await.get("Pelimerkkien UI-synkkaus").copied().unwrap_or(false);
|
|
let mut current_balance = 0;
|
|
|
|
{
|
|
let mut task_count = state.total_tasks.lock().unwrap();
|
|
*task_count += 1;
|
|
|
|
if active_incentives {
|
|
let mut tokens = state.nodes_tokens.lock().unwrap();
|
|
let balance = tokens.entry(node_id).or_insert(0);
|
|
*balance += 5; // Palkkio: 5 Kipinä-merkkiä
|
|
current_balance = *balance;
|
|
}
|
|
}
|
|
|
|
if active_incentives && ui_sync {
|
|
if let Some(tx) = state.node_channels.read().await.get(&node_id) {
|
|
let msg = serde_json::json!({
|
|
"type": "token_balance",
|
|
"balance": current_balance
|
|
});
|
|
let _ = tx.send(msg.to_string());
|
|
}
|
|
}
|
|
|
|
broadcast_stats(&state).await;
|
|
}
|
|
} else if msg_type == "single_tokenize_done" {
|
|
{
|
|
let mut json = json.clone();
|
|
if let Some(obj) = json.as_object_mut() {
|
|
obj.insert("node_id".to_string(), serde_json::json!(node_id));
|
|
}
|
|
let _ = state.stats_tx.send(json.to_string());
|
|
}
|
|
} else if msg_type == "llm_chunk" {
|
|
{
|
|
let mut json = json;
|
|
if let Some(obj) = json.as_object_mut() {
|
|
obj.insert("node_id".to_string(), serde_json::json!(node_id));
|
|
}
|
|
let _ = state.stats_tx.send(json.to_string());
|
|
}
|
|
} else if msg_type == "llm_done" {
|
|
// Vapautetaan solmu ja tarkistetaan task_id:n aitous
|
|
state.node_busy.lock().unwrap().remove(&node_id);
|
|
let task_id = json.get("task_id").and_then(|v| v.as_str()).map(|s| s.to_string());
|
|
let valid_task = if let Some(ref tid) = task_id {
|
|
state.pending_task_ids.lock().unwrap().remove(tid.as_str())
|
|
} else {
|
|
false
|
|
};
|
|
|
|
// Jos API-pyyntö odottaa tätä vastausta, reititetään suoraan oneshot-kanavaan
|
|
let api_sender = task_id.as_ref().and_then(|tid| {
|
|
state.pending_responses.lock().unwrap().remove(tid)
|
|
});
|
|
|
|
{
|
|
let mut json = json;
|
|
if let Some(obj) = json.as_object_mut() {
|
|
let model = obj.get("model").and_then(|v| v.as_str()).unwrap_or("?");
|
|
let prompt = obj.get("prompt").and_then(|v| v.as_str()).unwrap_or("");
|
|
let response = obj.get("response").and_then(|v| v.as_str()).unwrap_or("");
|
|
let tok_gen = obj.get("tokens_generated").and_then(|v| v.as_u64()).unwrap_or(0);
|
|
let duration = obj.get("duration_ms").and_then(|v| v.as_f64()).unwrap_or(0.0);
|
|
let tok_s = obj.get("tokens_per_sec").and_then(|v| v.as_f64()).unwrap_or(0.0);
|
|
|
|
println!();
|
|
println!("\x1b[35m━━━ Solmu {} ━━━ {} ━━━\x1b[0m", node_id, model);
|
|
println!(" Prompt: \x1b[33m\"{}\"\x1b[0m", prompt);
|
|
println!(" Vastaus: \x1b[32m{}\x1b[0m", response);
|
|
println!(" {} tokenia | {:.0}ms | \x1b[36m{:.1} tok/s\x1b[0m", tok_gen, duration, tok_s);
|
|
|
|
state.db.increment_tasks(node_id);
|
|
obj.insert("node_id".to_string(), serde_json::json!(node_id));
|
|
}
|
|
|
|
if let Some(sender) = api_sender {
|
|
// API-pyyntö: reititetään vastaus suoraan odottajalle
|
|
let _ = sender.send(json.clone());
|
|
}
|
|
// UI-broadcast jatkuu normaalisti
|
|
let _ = state.stats_tx.send(json.to_string());
|
|
|
|
let active_incentives = state.feature_flags.read().await.get("Insentiivit").copied().unwrap_or(false);
|
|
let ui_sync = state.feature_flags.read().await.get("Pelimerkkien UI-synkkaus").copied().unwrap_or(false);
|
|
let mut current_balance = 0;
|
|
|
|
{
|
|
let mut task_count = state.total_tasks.lock().unwrap();
|
|
*task_count += 1;
|
|
|
|
if active_incentives && valid_task {
|
|
let mut tokens = state.nodes_tokens.lock().unwrap();
|
|
let balance = tokens.entry(node_id).or_insert(0);
|
|
*balance += 20; // Palkkio: 20 Kipinä-merkkiä
|
|
current_balance = *balance;
|
|
}
|
|
}
|
|
|
|
if active_incentives && ui_sync {
|
|
if let Some(tx) = state.node_channels.read().await.get(&node_id) {
|
|
let msg = serde_json::json!({
|
|
"type": "token_balance",
|
|
"balance": current_balance
|
|
});
|
|
let _ = tx.send(msg.to_string());
|
|
}
|
|
}
|
|
|
|
broadcast_stats(&state).await;
|
|
}
|
|
} else if msg_type == "llm_error" {
|
|
state.node_busy.lock().unwrap().remove(&node_id);
|
|
let task_id = json.get("task_id").and_then(|v| v.as_str()).map(|s| s.to_string());
|
|
if let Some(ref tid) = task_id {
|
|
state.pending_task_ids.lock().unwrap().remove(tid.as_str());
|
|
}
|
|
// Jos API-pyyntö odottaa, reititetään virhe oneshot-kanavaan
|
|
let api_sender = task_id.as_ref().and_then(|tid| {
|
|
state.pending_responses.lock().unwrap().remove(tid)
|
|
});
|
|
{
|
|
let mut json = json;
|
|
if let Some(obj) = json.as_object_mut() {
|
|
obj.insert("node_id".to_string(), serde_json::json!(node_id));
|
|
}
|
|
if let Some(sender) = api_sender {
|
|
let _ = sender.send(json.clone());
|
|
}
|
|
let _ = state.stats_tx.send(json.to_string());
|
|
}
|
|
} else if msg_type == "user_text" {
|
|
// Käyttäjän lähettämä teksti — kohdennettu reititys lähettäjäsolmulle
|
|
let text = json.get("text").and_then(|v| v.as_str()).unwrap_or("").to_string();
|
|
let task_type = json.get("task_type").and_then(|v| v.as_str()).unwrap_or("tokenize");
|
|
if !text.is_empty() {
|
|
let preview: String = text.chars().take(80).collect();
|
|
tracing::info!("Solmu {} lähetti oman tekstin ({}): \"{}\"", node_id, task_type, preview);
|
|
let msg = match task_type {
|
|
"tokenize" => serde_json::json!({
|
|
"type": "single_tokenize",
|
|
"text": text,
|
|
}),
|
|
_ => serde_json::json!({
|
|
"type": "llm_prompt",
|
|
"prompt": text,
|
|
"model": task_type,
|
|
}),
|
|
};
|
|
// Lähetetään takaisin lähettäjäsolmulle (käyttäjä haluaa oman tekstinsä tuloksen)
|
|
if let Some(tx) = state.node_channels.read().await.get(&node_id) {
|
|
let _ = tx.send(msg.to_string());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Yhteys katkesi — merkitään session päättyneeksi ja siivotaan atomisesti
|
|
state.db.close_session(node_id);
|
|
{
|
|
// Lukitaan kaikki kerralla, jotta solmu ei ole osittain siivottu
|
|
let mut tasks = state.node_tasks.lock().unwrap();
|
|
let mut conns = state.ip_connections.lock().unwrap();
|
|
let mut ips = state.node_ips.lock().unwrap();
|
|
let mut vram = state.nodes_vram.lock().unwrap();
|
|
let mut busy = state.node_busy.lock().unwrap();
|
|
tasks.remove(&node_id);
|
|
busy.remove(&node_id);
|
|
if let Some(count) = conns.get_mut(&ip) {
|
|
*count = count.saturating_sub(1);
|
|
if *count == 0 { conns.remove(&ip); }
|
|
}
|
|
ips.remove(&node_id);
|
|
vram.remove(&node_id);
|
|
}
|
|
state.node_types.lock().unwrap().remove(&node_id);
|
|
state.node_models.write().await.remove(&node_id);
|
|
tracing::info!("Solmu {} ({}) poistui verkosta.", node_id, ip);
|
|
broadcast_stats(&state).await;
|
|
sender_task.abort();
|
|
}
|
|
#[derive(serde::Deserialize)]
|
|
struct ChatCompletionRequest {
|
|
model: String,
|
|
prompt: String,
|
|
task_id: String,
|
|
#[serde(default)]
|
|
max_tokens: Option<u64>,
|
|
}
|
|
|
|
#[derive(serde::Serialize)]
|
|
struct ChatCompletionResponse {
|
|
response: String,
|
|
model: String,
|
|
tokens_generated: u64,
|
|
}
|
|
|
|
async fn api_ollama_tags(
|
|
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
|
|
) -> axum::response::Response {
|
|
// Haetaan natiivisolmun tila muistista — priorisoidaan aito verkko-solmu
|
|
let node_models = state.node_models.read().await;
|
|
if let Some((_, models_json)) = node_models.iter().next() {
|
|
return axum::Json(models_json.clone()).into_response();
|
|
}
|
|
|
|
// Fallback: Haetaan lokaalista infra-Ollamasta ohjaimesta käsin (esim dev ympäristö)
|
|
let ollama_url = std::env::var("OLLAMA_URL").unwrap_or_else(|_| "http://ollama:11434".to_string());
|
|
match reqwest::get(format!("{}/api/tags", ollama_url)).await {
|
|
Ok(resp) => {
|
|
if let Ok(body) = resp.json::<serde_json::Value>().await {
|
|
axum::Json(body).into_response()
|
|
} else {
|
|
axum::Json(serde_json::json!({ "models": [] })).into_response()
|
|
}
|
|
}
|
|
Err(_) => axum::Json(serde_json::json!({ "models": [] })).into_response(),
|
|
}
|
|
}
|
|
|
|
async fn api_hardware(
|
|
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
|
|
) -> axum::response::Response {
|
|
// Etsitään natiivisolmun GPU-tiedot sessiosta
|
|
let sessions = state.db.get_sessions(50);
|
|
let native = sessions.iter().find(|s| {
|
|
s.get("node_type").and_then(|v| v.as_str()) == Some("native")
|
|
});
|
|
|
|
let (mut vram_mb, mut gpu_name, ram_mb) = if let Some(s) = native {
|
|
// Tieto on tietokannassa litteänä
|
|
let vram = s.get("vram_total_mb").and_then(|v| v.as_u64()).unwrap_or(0);
|
|
let name = s.get("gpu_name").and_then(|v| v.as_str()).unwrap_or("").to_string();
|
|
let ram = s.get("ram_mb").and_then(|v| v.as_u64()).unwrap_or(0);
|
|
(vram, name, ram)
|
|
} else {
|
|
(0, String::new(), 0)
|
|
};
|
|
|
|
// Fallback: kysytään Ollamalta onko malleja ladattu (= Ollama on käynnissä)
|
|
if vram_mb == 0 {
|
|
let ollama_url = std::env::var("OLLAMA_URL").unwrap_or_else(|_| "http://ollama:11434".to_string());
|
|
if let Ok(resp) = reqwest::get(format!("{}/api/tags", ollama_url)).await {
|
|
if let Ok(body) = resp.json::<serde_json::Value>().await {
|
|
let models = body["models"].as_array().map(|a| a.len()).unwrap_or(0);
|
|
if models > 0 {
|
|
gpu_name = "Ollama (GPU/CPU)".to_string();
|
|
// Natiivisolmun RAM fallbackina
|
|
vram_mb = if ram_mb > 0 { ram_mb } else { 0 };
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if gpu_name.is_empty() { gpu_name = "ei natiivisolmua".to_string(); }
|
|
|
|
axum::Json(serde_json::json!({
|
|
"gpu_name": gpu_name,
|
|
"vram_mb": vram_mb,
|
|
"ram_mb": ram_mb,
|
|
})).into_response()
|
|
}
|
|
|
|
async fn api_change_model(
|
|
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
|
|
axum::Json(payload): axum::Json<serde_json::Value>,
|
|
) -> axum::response::Response {
|
|
let model = payload.get("model").and_then(|v| v.as_str()).unwrap_or("");
|
|
if model.is_empty() {
|
|
return (axum::http::StatusCode::BAD_REQUEST, "model puuttuu").into_response();
|
|
}
|
|
tracing::info!("Mallin vaihto: {}", model);
|
|
let msg = serde_json::json!({ "type": "change_model", "model": model });
|
|
let _ = state.stats_tx.send(msg.to_string());
|
|
axum::Json(serde_json::json!({ "status": "ok", "model": model })).into_response()
|
|
}
|
|
|
|
async fn api_chat_completions(
|
|
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
|
|
ConnectInfo(addr): ConnectInfo<SocketAddr>,
|
|
axum::Json(payload): axum::Json<ChatCompletionRequest>,
|
|
) -> axum::response::Response {
|
|
// Rate limiting: max 10 pyyntöä per IP per minuutti
|
|
{
|
|
let mut limits = state.api_rate_limits.lock().unwrap();
|
|
let now = std::time::Instant::now();
|
|
let entry = limits.entry(addr.ip()).or_insert((now, 0));
|
|
if now.duration_since(entry.0).as_secs() >= 60 {
|
|
*entry = (now, 1); // Uusi ikkuna
|
|
} else {
|
|
entry.1 += 1;
|
|
if entry.1 > 30 {
|
|
return (axum::http::StatusCode::TOO_MANY_REQUESTS, "Liian monta pyyntöä — yritä minuutin kuluttua").into_response();
|
|
}
|
|
}
|
|
}
|
|
|
|
// Etsitään vapaa solmu — priorisoidaan natiivisolmut (GPU) selaimen edelle
|
|
let (target_node, _total_matching) = {
|
|
let tasks = state.node_tasks.lock().unwrap();
|
|
let _busy = state.node_busy.lock().unwrap();
|
|
let node_types = state.node_types.lock().unwrap();
|
|
let matching: Vec<u64> = tasks.iter().filter(|(_, task)| {
|
|
// Eksakti match tai qwen-perheen yhteensopivuus (selain: qwen-coder-05b, natiivi: qwen2.5-coder:7b)
|
|
let req_model = payload.model.to_lowercase();
|
|
let node_task = task.to_lowercase();
|
|
if req_model.starts_with("qwen") {
|
|
node_task.starts_with("qwen")
|
|
} else if req_model.starts_with("phi") {
|
|
node_task.starts_with("phi")
|
|
} else {
|
|
**task == payload.model
|
|
}
|
|
}).map(|(k, _)| *k).collect();
|
|
// Etsitään mikä tahansa matchaava solmu (natiivi priorisoidaan)
|
|
let native = matching.iter().find(|id| {
|
|
node_types.get(id).map(|t| t == "native").unwrap_or(false)
|
|
}).copied();
|
|
let any = native.or_else(|| matching.first().copied());
|
|
(any, matching.len())
|
|
};
|
|
|
|
let task_id = payload.task_id.clone();
|
|
|
|
let target_node_id = match target_node {
|
|
Some(id) => id,
|
|
None => {
|
|
return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Ei solmua tälle mallille (käynnistä malli selaimessa)").into_response();
|
|
}
|
|
};
|
|
|
|
// Reititystila UI:lle
|
|
{
|
|
let routing_msg = serde_json::json!({
|
|
"type": "task_routed",
|
|
"task_id": task_id,
|
|
"node_id": target_node_id,
|
|
"status": "routed",
|
|
"message": format!("Reititetty solmulle #{}", target_node_id),
|
|
});
|
|
let _ = state.stats_tx.send(routing_msg.to_string());
|
|
}
|
|
|
|
// Merkitään solmu varatuksi ja task_id jaetuksi
|
|
state.node_busy.lock().unwrap().insert(target_node_id);
|
|
state.pending_task_ids.lock().unwrap().insert(payload.task_id.clone());
|
|
|
|
let mut msg = serde_json::json!({
|
|
"type": "llm_prompt",
|
|
"prompt": payload.prompt,
|
|
"model": payload.model,
|
|
"task_id": payload.task_id,
|
|
});
|
|
if let Some(mt) = payload.max_tokens {
|
|
msg.as_object_mut().unwrap().insert("max_tokens".to_string(), serde_json::json!(mt));
|
|
}
|
|
|
|
// Oneshot-kanava: solmu palauttaa tuloksen suoraan tälle pyynnölle
|
|
let (resp_tx, resp_rx) = tokio::sync::oneshot::channel::<serde_json::Value>();
|
|
state.pending_responses.lock().unwrap().insert(payload.task_id.clone(), resp_tx);
|
|
|
|
// Kohdennettu reititys: lähetetään AI-tehtävä suoraan VAIN valitulle solmulle
|
|
{
|
|
let channels = state.node_channels.read().await;
|
|
if let Some(tx) = channels.get(&target_node_id) {
|
|
let _ = tx.send(msg.to_string());
|
|
tracing::info!("Reititettiin API-pyyntö solmulle {} (Malli: {})", target_node_id, payload.model);
|
|
} else {
|
|
state.pending_responses.lock().unwrap().remove(&payload.task_id);
|
|
return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Verkkovirhe: solmun yhteys katkesi reitityksen aikana").into_response();
|
|
}
|
|
}
|
|
|
|
let timeout = tokio::time::timeout(std::time::Duration::from_secs(600), resp_rx).await;
|
|
|
|
match timeout {
|
|
Ok(Ok(v)) => {
|
|
if v["type"].as_str() == Some("llm_error") {
|
|
let err = v["error"].as_str().unwrap_or("Määrittelemätön virhe solmussa").to_string();
|
|
(axum::http::StatusCode::CONFLICT, err).into_response()
|
|
} else {
|
|
axum::Json(ChatCompletionResponse {
|
|
response: v["response"].as_str().unwrap_or("").to_string(),
|
|
model: v["model"].as_str().unwrap_or("").to_string(),
|
|
tokens_generated: v["tokens_generated"].as_u64().unwrap_or(0),
|
|
}).into_response()
|
|
}
|
|
}
|
|
Ok(Err(_)) => {
|
|
// Oneshot-kanava sulkeutui (solmu katosi)
|
|
state.pending_responses.lock().unwrap().remove(&payload.task_id);
|
|
(axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Verkkovirhe: yhteys katkesi").into_response()
|
|
}
|
|
Err(_) => {
|
|
state.pending_responses.lock().unwrap().remove(&payload.task_id);
|
|
(axum::http::StatusCode::GATEWAY_TIMEOUT, "Aikakatkaisu: solmu ei saanut tehtävää ajoissa valmiiksi").into_response()
|
|
}
|
|
}
|
|
}
|