Bugikorjaukset ja tietoturvaparannukset: broadcast lag, busy-reititys, rate limiting, gamification-validointi, XSS, base64, DOM-vuoto

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jaakko Vanhala
2026-04-05 08:25:13 +03:00
parent 12fd3c4eae
commit 57c6506f91
4 changed files with 140 additions and 50 deletions

View File

@@ -15,3 +15,4 @@ uuid = { version = "1.7.0", features = ["v4", "serde"] }
futures = "0.3"
rusqlite = { version = "0.31", features = ["bundled"] }
chrono = "0.4"
base64 = "0.22"

View File

@@ -39,6 +39,9 @@ struct AppState {
ip_connections: Mutex<HashMap<IpAddr, u32>>,
node_ips: Mutex<HashMap<u64, IpAddr>>,
node_tasks: Mutex<HashMap<u64, String>>, // node_id → selected_task
node_busy: Mutex<std::collections::HashSet<u64>>, // Solmut joilla on aktiivinen tehtävä
pending_task_ids: Mutex<std::collections::HashSet<String>>, // Hubin jakamat task_id:t (gamification-validointi)
api_rate_limits: Mutex<HashMap<IpAddr, (std::time::Instant, u32)>>, // IP → (ikkuna-alku, pyyntömäärä)
db: db::NodeDb,
}
@@ -257,6 +260,9 @@ async fn main() {
ip_connections: Mutex::new(HashMap::new()),
node_ips: Mutex::new(HashMap::new()),
node_tasks: Mutex::new(HashMap::new()),
node_busy: Mutex::new(std::collections::HashSet::new()),
pending_task_ids: Mutex::new(std::collections::HashSet::new()),
api_rate_limits: Mutex::new(HashMap::new()),
db: db::NodeDb::new(&std::env::var("DATABASE_PATH").unwrap_or_else(|_| "nodes.db".to_string())),
});
@@ -419,15 +425,23 @@ async fn api_stats(
}
fn check_admin_auth(headers: &axum::http::HeaderMap) -> bool {
let password = std::env::var("ADMIN_PASSWORD").unwrap_or_else(|_| "kipina".to_string());
let password = match std::env::var("ADMIN_PASSWORD") {
Ok(p) if !p.is_empty() => p,
_ => {
tracing::warn!("ADMIN_PASSWORD ei ole asetettu — käytetään oletusta 'kipina' (ÄLÄ käytä tuotannossa!)");
"kipina".to_string()
}
};
if let Some(auth) = headers.get("authorization").and_then(|v| v.to_str().ok()) {
if auth.starts_with("Basic ") {
if let Ok(decoded) = String::from_utf8(
base64_decode(auth.trim_start_matches("Basic ").trim())
) {
// Tarkistetaan "user:password" — käyttäjänimi ei väliä
if let Some(pass) = decoded.split(':').nth(1) {
return pass == password;
use base64::Engine;
if let Ok(decoded_bytes) = base64::engine::general_purpose::STANDARD
.decode(auth.trim_start_matches("Basic ").trim())
{
if let Ok(decoded) = String::from_utf8(decoded_bytes) {
if let Some(pass) = decoded.split(':').nth(1) {
return pass == password;
}
}
}
}
@@ -435,20 +449,6 @@ fn check_admin_auth(headers: &axum::http::HeaderMap) -> bool {
false
}
fn base64_decode(input: &str) -> Vec<u8> {
// Yksinkertainen base64-dekooderi
const TABLE: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
let mut out = Vec::new();
let bytes: Vec<u8> = input.bytes().filter(|&b| b != b'=').collect();
for chunk in bytes.chunks(4) {
let vals: Vec<u8> = chunk.iter().filter_map(|&b| TABLE.iter().position(|&t| t == b).map(|p| p as u8)).collect();
if vals.len() >= 2 { out.push((vals[0] << 2) | (vals[1] >> 4)); }
if vals.len() >= 3 { out.push((vals[1] << 4) | (vals[2] >> 2)); }
if vals.len() >= 4 { out.push((vals[2] << 6) | vals[3]); }
}
out
}
fn admin_unauthorized() -> axum::response::Response {
axum::response::Response::builder()
.status(401)
@@ -606,12 +606,19 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
let sender_task = tokio::spawn(async move {
loop {
tokio::select! {
Ok(msg) = rx.recv() => {
if sender.send(Message::Text(msg)).await.is_err() { break; }
result = rx.recv() => {
match result {
Ok(msg) => {
if sender.send(Message::Text(msg)).await.is_err() { break; }
}
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::debug!("Broadcast lagged {} viestiä — ohitetaan", n);
continue;
}
Err(_) => break, // Kanava suljettu
}
}
Some(direct_msg) = node_rx.recv() => {
// E2E Encrypt placeholder - tähän tulisi kyseisen the_node_id:n asymmetrisen avaimen salaus
// let encrypted_msg = encrypt_e2e(direct_msg, node_public_key);
if sender.send(Message::Text(direct_msg)).await.is_err() { break; }
}
else => break,
@@ -812,6 +819,13 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
let _ = state.stats_tx.send(json.to_string());
}
} else if msg_type == "llm_done" {
// Vapautetaan solmu ja tarkistetaan task_id:n aitous
state.node_busy.lock().unwrap().remove(&node_id);
let valid_task = if let Some(tid) = json.get("task_id").and_then(|v| v.as_str()) {
state.pending_task_ids.lock().unwrap().remove(tid)
} else {
false
};
{
let mut json = json;
if let Some(obj) = json.as_object_mut() {
@@ -841,7 +855,7 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
let mut task_count = state.total_tasks.lock().unwrap();
*task_count += 1;
if active_incentives {
if active_incentives && valid_task {
let mut tokens = state.nodes_tokens.lock().unwrap();
let balance = tokens.entry(node_id).or_insert(0);
*balance += 20; // Palkkio: 20 Kipinä-merkkiä
@@ -862,6 +876,10 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
broadcast_stats(&state).await;
}
} else if msg_type == "llm_error" {
state.node_busy.lock().unwrap().remove(&node_id);
if let Some(tid) = json.get("task_id").and_then(|v| v.as_str()) {
state.pending_task_ids.lock().unwrap().remove(tid);
}
{
let mut json = json;
if let Some(obj) = json.as_object_mut() {
@@ -898,23 +916,23 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
}
}
// Yhteys katkesi — merkitään session päättyneeksi ja siivotaan
// Yhteys katkesi — merkitään session päättyneeksi ja siivotaan atomisesti
state.db.close_session(node_id);
state.node_tasks.lock().unwrap().remove(&node_id);
{
// Lukitaan kaikki kerralla, jotta solmu ei ole osittain siivottu
let mut tasks = state.node_tasks.lock().unwrap();
let mut conns = state.ip_connections.lock().unwrap();
let mut ips = state.node_ips.lock().unwrap();
let mut vram = state.nodes_vram.lock().unwrap();
let mut busy = state.node_busy.lock().unwrap();
tasks.remove(&node_id);
busy.remove(&node_id);
if let Some(count) = conns.get_mut(&ip) {
*count = count.saturating_sub(1);
if *count == 0 {
conns.remove(&ip);
}
if *count == 0 { conns.remove(&ip); }
}
}
{
state.node_ips.lock().unwrap().remove(&node_id);
}
{
state.nodes_vram.lock().unwrap().remove(&node_id);
ips.remove(&node_id);
vram.remove(&node_id);
}
tracing::info!("Solmu {} ({}) poistui verkosta.", node_id, ip);
broadcast_stats(&state).await;
@@ -936,28 +954,49 @@ struct ChatCompletionResponse {
async fn api_chat_completions(
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
axum::Json(payload): axum::Json<ChatCompletionRequest>,
) -> axum::response::Response {
// Etsitään ensimmäinen vapaa solmu, joka vastaa pyydettyä mallia
// Rate limiting: max 10 pyyntöä per IP per minuutti
{
let mut limits = state.api_rate_limits.lock().unwrap();
let now = std::time::Instant::now();
let entry = limits.entry(addr.ip()).or_insert((now, 0));
if now.duration_since(entry.0).as_secs() >= 60 {
*entry = (now, 1); // Uusi ikkuna
} else {
entry.1 += 1;
if entry.1 > 10 {
return (axum::http::StatusCode::TOO_MANY_REQUESTS, "Liian monta pyyntöä — yritä minuutin kuluttua").into_response();
}
}
}
// Etsitään ensimmäinen VAPAA solmu, joka vastaa pyydettyä mallia
let target_node = {
let tasks = state.node_tasks.lock().unwrap();
tasks.iter().find(|(_, task)| {
if payload.model == "qwen-coder" {
let busy = state.node_busy.lock().unwrap();
tasks.iter().find(|(node_id, task)| {
let model_match = if payload.model == "qwen-coder" {
*task == "qwen-coder-05b" || *task == "qwen-coder"
} else {
**task == payload.model
}
};
model_match && !busy.contains(node_id)
}).map(|(k, _)| *k)
};
let target_node_id = match target_node {
Some(id) => id,
None => {
return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Ei vapaata solmua tälle mallille (Käynnistä malli selaimessa)").into_response();
return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Ei vapaata solmua tälle mallille (kaikki varattuja tai ei käynnissä)").into_response();
}
};
// Merkitään solmu varatuksi ja task_id jaetuksi
state.node_busy.lock().unwrap().insert(target_node_id);
state.pending_task_ids.lock().unwrap().insert(payload.task_id.clone());
let msg = serde_json::json!({
"type": "llm_prompt",
"prompt": payload.prompt,
@@ -980,7 +1019,15 @@ async fn api_chat_completions(
}
let timeout = tokio::time::timeout(std::time::Duration::from_secs(120), async move {
while let Ok(msg_str) = rx.recv().await {
loop {
let msg_str = match rx.recv().await {
Ok(msg) => msg,
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::debug!("API-kanava lagged {} viestiä", n);
continue;
}
Err(_) => return Ok(None), // Kanava suljettu
};
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&msg_str) {
if v["type"].as_str() == Some("llm_done") {
if let Some(tid) = v["task_id"].as_str() {
@@ -1001,6 +1048,7 @@ async fn api_chat_completions(
}
}
}
#[allow(unreachable_code)]
Ok(None)
}).await;

View File

@@ -2066,7 +2066,7 @@
div.style.color = '#a5d6ff';
div.innerHTML = `${model} <span style="color:#8b949e">${tokGen} tok | ${durMs}ms | ${tokS} tok/s</span>`;
term.appendChild(div);
while (term.children.length > 50) term.removeChild(term.firstChild);
while (term.children.length > 50 && !term.firstChild.querySelector('.stream-content')) term.removeChild(term.firstChild);
term.scrollTop = term.scrollHeight;
document.querySelectorAll('.avatar-card').forEach(c => c.classList.remove('active'));
@@ -2192,7 +2192,7 @@
div.className = 'terminal-line';
div.innerHTML = `<span class="terminal-prompt">$</span> kpn run ${model} <span style="color:#8b949e">"${promptShort}"</span>`;
term.appendChild(div);
while (term.children.length > 50) term.removeChild(term.firstChild);
while (term.children.length > 50 && !term.firstChild.querySelector('.stream-content')) term.removeChild(term.firstChild);
term.scrollTop = term.scrollHeight;
}
}
@@ -2453,10 +2453,17 @@
if (msg.includes('[Coder]') && msg.includes('model') && msg.includes('tallennettu')) { setStep('step-model', 'done', '100%'); }
if (msg.includes('[Coder]') && msg.includes('Rakennetaan')) { setStep('step-build', 'active'); }
if (msg.includes('[Coder]') && msg.includes('Malli ladattu')) {
// Malli on valmis — merkataan kaikki vaiheet valmiiksi (myös cache-hitillä)
// Malli on valmis — merkataan kaikki vaiheet valmiiksi
setStep('step-wasm', 'done');
setStep('step-tokenizer', 'done');
setStep('step-model', 'done', 'cache');
const pctSpan = document.getElementById('step-model-pct');
if (pctSpan && pctSpan.textContent.includes('100%')) {
setStep('step-model', 'done', '100%');
} else {
setStep('step-model', 'done', 'cache');
}
setStep('step-build', 'done');
setStep('step-ready', 'done');
}
@@ -2519,7 +2526,9 @@
coderWsReady = true;
if (pendingCodePrompt) {
sendCodeToHub(pendingCodePrompt);
setTimeout(() => {
sendCodeToHub(pendingCodePrompt);
}, 800);
pendingCodePrompt = null;
}
} catch(e) {