eka vedos

This commit is contained in:
2026-04-01 17:54:08 +03:00
commit 46848ee027
20 changed files with 1962 additions and 0 deletions

View File

@@ -0,0 +1,15 @@
[package]
name = "hub"
version = "0.1.0"
edition = "2021"
[dependencies]
axum = { version = "0.7.4", features = ["ws", "macros"] }
tokio = { version = "1.36.0", features = ["full"] }
tower-http = { version = "0.5.2", features = ["fs", "cors", "trace"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.7.0", features = ["v4", "serde"] }
futures = "0.3"

426
network-poc/hub/src/main.rs Normal file
View File

@@ -0,0 +1,426 @@
use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade},
extract::ConnectInfo,
response::IntoResponse,
routing::get,
Router,
};
use futures::{sink::SinkExt, stream::StreamExt};
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use tower_http::services::ServeDir;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
const MAX_MESSAGE_SIZE: usize = 16 * 1024;
// Sallitut originit — estää cross-site WebSocket hijackingin
const ALLOWED_ORIGINS: &[&str] = &[
"https://kipina.studio",
"http://localhost:3000",
"http://127.0.0.1:3000",
];
// Sallitut viestityyypit clientilta
const ALLOWED_MSG_TYPES: &[&str] = &["auth", "result", "pair_done", "llm_chunk"];
struct AppState {
next_node_id: Mutex<u64>,
nodes_vram: Mutex<HashMap<u64, u32>>,
total_tasks: Mutex<u64>,
stats_tx: broadcast::Sender<String>,
// IP-rajoitus: max 2 yhteyttä per IP (dashboard-UI + selainsolmu)
ip_connections: Mutex<HashMap<IpAddr, u32>>,
// Node ID → IP -mappaus (siivousta varten)
node_ips: Mutex<HashMap<u64, IpAddr>>,
}
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "hub=debug,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let (stats_tx, _) = broadcast::channel(100);
let state = Arc::new(AppState {
next_node_id: Mutex::new(1),
nodes_vram: Mutex::new(HashMap::new()),
total_tasks: Mutex::new(0),
stats_tx: stats_tx.clone(),
ip_connections: Mutex::new(HashMap::new()),
node_ips: Mutex::new(HashMap::new()),
});
let state_for_task = state.clone();
// Ajastin, joka jakaa satunnaisia tekoälytehtäviä eri pituuksilla
tokio::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
// Kieliparit: sama semanttinen sisältö englanniksi ja suomeksi
let pairs: Vec<(&str, &str)> = vec![
("Tell me a joke.", "Kerro vitsi."),
("What is Rust?", "Mikä on Rust?"),
("Explain WebGPU briefly.", "Selitä WebGPU lyhyesti."),
("It was a dark and stormy night, and the old sea captain began his tale:", "Oli synkkä ja myrskyinen yö, ja vanha merikapteeni aloitti tarinansa:"),
("Artificial intelligence is transforming the world in many ways, but perhaps the most significant change is", "Tekoäly muuttaa maailmaa monella tavalla, mutta kenties merkittävin muutos on"),
("Distributed computing in the browser is a fascinating concept because", "Hajautettu laskenta selaimessa on kiehtova konsepti, koska"),
("By the year 2030, programmers will no longer write code by hand, instead they will", "Vuonna 2030 ohjelmoijat eivät enää kirjoita koodia käsin, vaan he"),
("Imagine a world where every computer, phone, and tablet combines its processing power into one vast AI network. This future is closer than you think, because", "Kuvittele maailma, jossa jokainen tietokone, puhelin ja tabletti yhdistää prosessointivoimansa yhdeksi valtavaksi tekoälyverkoksi. Tämä tulevaisuus on lähempänä kuin uskotkaan, sillä"),
("The open source movement has fundamentally changed how software is built. What started as a fringe philosophy has become the backbone of modern infrastructure, and the next frontier is", "Avoimen lähdekoodin liike on muuttanut perustavanlaatuisesti ohjelmistojen rakentamisen. Marginaalisesta filosofiasta on tullut modernin infrastruktuurin selkäranka, ja seuraava rajapyykki on"),
];
let mut rng_state: u64 = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64;
loop {
interval.tick().await;
rng_state ^= rng_state << 13;
rng_state ^= rng_state >> 7;
rng_state ^= rng_state << 17;
let idx = (rng_state as usize) % pairs.len();
let (en, fi) = pairs[idx];
let task_msg = serde_json::json!({
"type": "pair_task",
"en": en,
"fi": fi,
});
tracing::debug!("Kielipari lähetetty: EN({}) vs FI({} merkkiä)", en.len(), fi.len());
let _ = state_for_task.stats_tx.send(task_msg.to_string());
}
});
let app = Router::new()
.nest_service("/", ServeDir::new("../static"))
.route("/ws", get(ws_handler))
.with_state(state);
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
tracing::debug!("Kipinä Agent Hub käynnistyy osoitteessa http://localhost:3000");
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
).await.unwrap();
}
async fn ws_handler(
ws: WebSocketUpgrade,
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
headers: axum::http::HeaderMap,
) -> impl IntoResponse {
// Origin-tarkistus — estää cross-site WebSocket hijackingin
if let Some(origin) = headers.get("origin").and_then(|v| v.to_str().ok()) {
if !ALLOWED_ORIGINS.iter().any(|&allowed| origin == allowed) {
tracing::warn!("Estetty yhteys väärällä originilla: {}", origin);
return (
axum::http::StatusCode::FORBIDDEN,
"Origin not allowed",
).into_response();
}
}
// Origin puuttuu → natiivi-node (ei selainta), sallitaan
let ip = headers.get("x-forwarded-for")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.split(',').next())
.and_then(|s| s.trim().parse::<IpAddr>().ok())
.unwrap_or_else(|| addr.ip());
// Max 2 yhteyttä per IP (dashboard-UI + selainsolmu)
{
let conns = state.ip_connections.lock().unwrap();
let count = conns.get(&ip).copied().unwrap_or(0);
if count >= 2 {
tracing::warn!("IP {} ylitti yhteysrajan ({}/2) — estetty", ip, count);
return (
axum::http::StatusCode::TOO_MANY_REQUESTS,
"Max 2 yhteyttä per IP",
).into_response();
}
}
ws.max_message_size(MAX_MESSAGE_SIZE)
.on_upgrade(move |socket| handle_socket(socket, state, ip))
.into_response()
}
async fn broadcast_stats(state: &Arc<AppState>) {
let total_nodes;
let mut total_vram = 0;
{
let map = state.nodes_vram.lock().unwrap();
total_nodes = map.len();
for (_, vram) in map.iter() {
total_vram += vram;
}
}
let completed = *state.total_tasks.lock().unwrap();
let stats_msg = serde_json::json!({
"type": "stats",
"nodes": total_nodes,
"vram_gb": total_vram,
"tasks": completed
});
let _ = state.stats_tx.send(stats_msg.to_string());
}
/// Validoi client-viesti: pakollinen "type"-kenttä, sallittu tyyppi, validi JSON
fn validate_message(text: &str) -> Result<serde_json::Value, &'static str> {
let json: serde_json::Value = serde_json::from_str(text)
.map_err(|_| "Ei validi JSON")?;
let msg_type = json.get("type")
.and_then(|v| v.as_str())
.ok_or("Puuttuva 'type'-kenttä")?;
if !ALLOWED_MSG_TYPES.contains(&msg_type) {
return Err("Tuntematon viestityyppi");
}
// Tyyppikohtainen validointi
match msg_type {
"auth" => {
// allocated_gb pitää olla järkevä (0-128)
if let Some(gb) = json.get("allocated_gb").and_then(|v| v.as_u64()) {
if gb > 128 { return Err("allocated_gb liian suuri"); }
}
}
"pair_done" => {
// Pitää sisältää en ja fi -objektit
if json.get("en").is_none() || json.get("fi").is_none() {
return Err("pair_done: puuttuu en/fi");
}
// token_count pitää olla järkevä
for lang in &["en", "fi"] {
if let Some(tc) = json.get(lang).and_then(|l| l.get("token_count")).and_then(|v| v.as_u64()) {
if tc > 10000 { return Err("token_count liian suuri"); }
}
}
}
"result" => {
// data-kenttä pitää olla olemassa
if json.get("data").is_none() && json.get("status").is_none() {
return Err("result: puuttuu data/status");
}
}
_ => {}
}
Ok(json)
}
async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
// Rekisteröidään IP-yhteys
{
let mut conns = state.ip_connections.lock().unwrap();
*conns.entry(ip).or_insert(0) += 1;
}
let (mut sender, mut receiver) = socket.split();
let node_id = {
let mut next_id = state.next_node_id.lock().unwrap();
let id = *next_id;
*next_id += 1;
id
};
// Tallennetaan node_id → IP -mappaus
{
state.node_ips.lock().unwrap().insert(node_id, ip);
}
tracing::info!("Solmu {} yhdistyi osoitteesta {}", node_id, ip);
let mut rx = state.stats_tx.subscribe();
let sender_task = tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(msg) => {
if sender.send(Message::Text(msg)).await.is_err() {
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
continue;
}
Err(_) => {
break;
}
}
}
});
// Receiver loop
while let Some(Ok(msg)) = receiver.next().await {
let text = match msg {
Message::Text(t) => t,
Message::Close(_) => break,
_ => continue,
};
if text.len() > MAX_MESSAGE_SIZE {
tracing::warn!("Solmu {} ({}) lähetti liian suuren viestin ({} tavua)", node_id, ip, text.len());
continue;
}
// Validointi
let json = match validate_message(&text) {
Ok(j) => j,
Err(reason) => {
tracing::warn!("Solmu {} ({}) lähetti virheellisen viestin: {} — {:?}", node_id, ip, reason, &text[..text.len().min(100)]);
continue;
}
};
let msg_type = json.get("type").and_then(|v| v.as_str()).unwrap_or("");
if msg_type == "auth" {
{
let allocated = json.get("allocated_gb").and_then(|v| v.as_u64()).unwrap_or(4) as u32;
let node_type = json.get("node_type").and_then(|v| v.as_str()).unwrap_or("browser");
{
let mut map = state.nodes_vram.lock().unwrap();
map.insert(node_id, allocated);
}
if node_type == "native" {
let sys = json.get("system");
let hostname = sys.and_then(|s| s.get("hostname")).and_then(|v| v.as_str()).unwrap_or("?");
let os = sys.and_then(|s| s.get("os")).and_then(|v| v.as_str()).unwrap_or("?");
let cores = sys.and_then(|s| s.get("cpu_cores")).and_then(|v| v.as_u64()).unwrap_or(0);
let ram = sys.and_then(|s| s.get("ram_total_mb")).and_then(|v| v.as_u64()).unwrap_or(0);
tracing::info!(
"Solmu {} (natiivi) | {} | {} | {} | {} ydintä | {} MB RAM | varaus: {} GB",
node_id, ip, hostname, os, cores, ram, allocated
);
if let Some(gpus) = json.get("gpus").and_then(|v| v.as_array()) {
for gpu in gpus {
tracing::info!(
" GPU {}: {} | VRAM: {}/{} MB | {}°C | {}%",
gpu["index"].as_u64().unwrap_or(0),
gpu["name"].as_str().unwrap_or("?"),
gpu["vram_used_mb"].as_u64().unwrap_or(0),
gpu["vram_total_mb"].as_u64().unwrap_or(0),
gpu["temperature_c"].as_u64().unwrap_or(0),
gpu["gpu_util_pct"].as_u64().unwrap_or(0),
);
}
}
} else {
let cores = json.get("cpu_cores").and_then(|v| v.as_u64()).unwrap_or(0);
let ram = json.get("device_memory_gb").and_then(|v| v.as_f64()).unwrap_or(0.0);
let platform = json.get("platform").and_then(|v| v.as_str()).unwrap_or("?");
let gpu_desc = json.get("gpu")
.and_then(|g| g.get("description").or_else(|| g.get("vendor")))
.and_then(|v| v.as_str())
.unwrap_or("ei GPU:ta");
tracing::info!(
"Solmu {} (selain) | {} | {} | {} ydintä | ~{} GB RAM | GPU: {} | varaus: {} GB",
node_id, ip, platform, cores, ram, gpu_desc, allocated
);
}
}
broadcast_stats(&state).await;
let join_msg = serde_json::json!({
"type": "node_joined",
"node_id": node_id
});
let _ = state.stats_tx.send(join_msg.to_string());
} else if msg_type == "result" {
tracing::info!("Solmu {} sai tuloksen: {}", node_id, text);
{
let mut task_count = state.total_tasks.lock().unwrap();
*task_count += 1;
}
broadcast_stats(&state).await;
} else if msg_type == "pair_done" {
{
let mut json = json; // Siirretään omistajuus muokkausta varten
if let Some(obj) = json.as_object_mut() {
let empty = serde_json::json!({});
let en = obj.get("en").unwrap_or(&empty);
let fi = obj.get("fi").unwrap_or(&empty);
let overhead = obj.get("overhead_pct").and_then(|v| v.as_f64()).unwrap_or(0.0);
let duration = obj.get("duration_ms").and_then(|v| v.as_u64()).unwrap_or(0);
let en_text = en.get("text").and_then(|v| v.as_str()).unwrap_or("");
let en_tokens = en.get("token_count").and_then(|v| v.as_u64()).unwrap_or(0);
let en_chars = en.get("char_count").and_then(|v| v.as_u64()).unwrap_or(0);
let en_cpt = en.get("chars_per_token").and_then(|v| v.as_f64()).unwrap_or(0.0);
let fi_text = fi.get("text").and_then(|v| v.as_str()).unwrap_or("");
let fi_tokens = fi.get("token_count").and_then(|v| v.as_u64()).unwrap_or(0);
let fi_chars = fi.get("char_count").and_then(|v| v.as_u64()).unwrap_or(0);
let fi_cpt = fi.get("chars_per_token").and_then(|v| v.as_f64()).unwrap_or(0.0);
let overhead_color = if overhead > 10.0 { "\x1b[31m" } else if overhead < -10.0 { "\x1b[32m" } else { "\x1b[33m" };
println!();
println!("\x1b[36m━━━ Solmu {} ━━━ {}ms ━━━\x1b[0m", node_id, duration);
println!(" \x1b[34mEN\x1b[0m \"{}\"", en_text);
println!(" {} merkkiä → \x1b[35m{} tokenia\x1b[0m | \x1b[32m{:.2} merkkiä/token\x1b[0m", en_chars, en_tokens, en_cpt);
println!(" \x1b[33mFI\x1b[0m \"{}\"", fi_text);
println!(" {} merkkiä → \x1b[35m{} tokenia\x1b[0m | \x1b[32m{:.2} merkkiä/token\x1b[0m", fi_chars, fi_tokens, fi_cpt);
println!(" {}Suomen ylikustannus: {:+.1}%\x1b[0m", overhead_color, overhead);
obj.insert("node_id".to_string(), serde_json::json!(node_id));
}
let _ = state.stats_tx.send(json.to_string());
{
let mut task_count = state.total_tasks.lock().unwrap();
*task_count += 1;
}
broadcast_stats(&state).await;
}
} else if msg_type == "llm_chunk" {
{
let mut json = json;
if let Some(obj) = json.as_object_mut() {
obj.insert("node_id".to_string(), serde_json::json!(node_id));
}
let _ = state.stats_tx.send(json.to_string());
}
}
}
// Yhteys katkesi — siivotaan IP-laskuri ja node-tiedot
{
let mut conns = state.ip_connections.lock().unwrap();
if let Some(count) = conns.get_mut(&ip) {
*count = count.saturating_sub(1);
if *count == 0 {
conns.remove(&ip);
}
}
}
{
state.node_ips.lock().unwrap().remove(&node_id);
}
{
state.nodes_vram.lock().unwrap().remove(&node_id);
}
tracing::info!("Solmu {} ({}) poistui verkosta.", node_id, ip);
broadcast_stats(&state).await;
sender_task.abort();
}