Päivitetty juttuja

This commit is contained in:
Jaakko Vanhala
2026-04-04 21:13:20 +03:00
parent 2e7ddf6f1e
commit 3ada8949d0
11 changed files with 457 additions and 105 deletions

View File

@@ -25,13 +25,17 @@ const ALLOWED_ORIGINS: &[&str] = &[
];
// Sallitut viestityyypit clientilta
const ALLOWED_MSG_TYPES: &[&str] = &["auth", "result", "pair_done", "llm_chunk", "llm_done", "download_progress", "user_text", "single_tokenize_done"];
const ALLOWED_MSG_TYPES: &[&str] = &["auth", "result", "pair_done", "llm_chunk", "llm_done", "llm_error", "download_progress", "user_text", "single_tokenize_done"];
struct AppState {
next_node_id: Mutex<u64>,
nodes_vram: Mutex<HashMap<u64, u32>>,
nodes_tokens: Mutex<HashMap<u64, u32>>, // Gamification: Kipinä Tokens
total_tasks: Mutex<u64>,
stats_tx: broadcast::Sender<String>,
node_channels: tokio::sync::RwLock<HashMap<u64, tokio::sync::mpsc::UnboundedSender<String>>>, // Kohdennettu reititys
pending_consensus: tokio::sync::RwLock<HashMap<String, Vec<serde_json::Value>>>, // Proof of Compute -konsensus
feature_flags: tokio::sync::RwLock<HashMap<String, bool>>, // Tuntee TODO.md:n ruksit lennosta
ip_connections: Mutex<HashMap<IpAddr, u32>>,
node_ips: Mutex<HashMap<u64, IpAddr>>,
node_tasks: Mutex<HashMap<u64, String>>, // node_id → selected_task
@@ -244,8 +248,12 @@ async fn main() {
let state = Arc::new(AppState {
next_node_id: Mutex::new(1),
nodes_vram: Mutex::new(HashMap::new()),
nodes_tokens: Mutex::new(HashMap::new()),
total_tasks: Mutex::new(0),
stats_tx: stats_tx.clone(),
node_channels: tokio::sync::RwLock::new(HashMap::new()),
pending_consensus: tokio::sync::RwLock::new(HashMap::new()),
feature_flags: tokio::sync::RwLock::new(HashMap::new()),
ip_connections: Mutex::new(HashMap::new()),
node_ips: Mutex::new(HashMap::new()),
node_tasks: Mutex::new(HashMap::new()),
@@ -254,6 +262,34 @@ async fn main() {
tracing::info!("Tietokanta alustettu");
let state_for_watcher = state.clone();
tokio::spawn(async move {
// Ensimmäinen luku heti, sitten 3s välein
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(3));
let file_path = std::env::var("FEATURE_FLAGS_FILE").unwrap_or_else(|_| "../TODO.md".to_string());
loop {
interval.tick().await;
if let Ok(content) = tokio::fs::read_to_string(&file_path).await {
let mut flags = HashMap::new();
for line in content.lines() {
if line.starts_with("- [ ] **") || line.starts_with("- [x] **") {
let is_active = line.starts_with("- [x]");
if let Some(start_idx) = line.find("**") {
let start = start_idx + 2;
if let Some(end_idx) = line[start..].find("**") {
let end = end_idx + start;
let feature_name = line[start..end].trim_end_matches(':').trim().to_string();
flags.insert(feature_name, is_active);
}
}
}
}
*state_for_watcher.feature_flags.write().await = flags;
}
}
});
let state_for_task = state.clone();
// Ajastin, joka jakaa satunnaisia tekoälytehtäviä eri pituuksilla
@@ -376,7 +412,9 @@ async fn api_stats(
) -> axum::response::Response {
if !check_admin_auth(&headers) { return admin_unauthorized(); }
let mut stats = state.db.get_stats();
stats.as_object_mut().unwrap().insert("version".to_string(), serde_json::json!(env!("CARGO_PKG_VERSION")));
if let Some(obj) = stats.as_object_mut() {
obj.insert("version".to_string(), serde_json::json!(env!("CARGO_PKG_VERSION")));
}
axum::Json(stats).into_response()
}
@@ -555,22 +593,28 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
tracing::info!("Solmu {} yhdistyi osoitteesta {}", node_id, ip);
let (node_tx, mut node_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
// Tallennetaan node channel reititystä varten
{
state.node_channels.write().await.insert(node_id, node_tx);
}
// Yksinkertaistettu broadcast tx vastaanotto
let mut rx = state.stats_tx.subscribe();
let sender_task = tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(msg) => {
if sender.send(Message::Text(msg)).await.is_err() {
break;
}
tokio::select! {
Ok(msg) = rx.recv() => {
if sender.send(Message::Text(msg)).await.is_err() { break; }
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
continue;
}
Err(_) => {
break;
Some(direct_msg) = node_rx.recv() => {
// E2E Encrypt placeholder - tähän tulisi kyseisen the_node_id:n asymmetrisen avaimen salaus
// let encrypted_msg = encrypt_e2e(direct_msg, node_public_key);
if sender.send(Message::Text(direct_msg)).await.is_err() { break; }
}
else => break,
}
}
});
@@ -592,7 +636,8 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
let json = match validate_message(&text) {
Ok(j) => j,
Err(reason) => {
tracing::warn!("Solmu {} ({}) lähetti virheellisen viestin: {} — {:?}", node_id, ip, reason, &text[..text.len().min(100)]);
let preview: String = text.chars().take(100).collect();
tracing::warn!("Solmu {} ({}) lähetti virheellisen viestin: {} — {:?}", node_id, ip, reason, preview);
continue;
}
};
@@ -722,10 +767,32 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
}
let _ = state.stats_tx.send(json.to_string());
let active_incentives = state.feature_flags.read().await.get("Insentiivit").copied().unwrap_or(false);
let ui_sync = state.feature_flags.read().await.get("Pelimerkkien UI-synkkaus").copied().unwrap_or(false);
let mut current_balance = 0;
{
let mut task_count = state.total_tasks.lock().unwrap();
*task_count += 1;
if active_incentives {
let mut tokens = state.nodes_tokens.lock().unwrap();
let balance = tokens.entry(node_id).or_insert(0);
*balance += 5; // Palkkio: 5 Kipinä-merkkiä
current_balance = *balance;
}
}
if active_incentives && ui_sync {
if let Some(tx) = state.node_channels.read().await.get(&node_id) {
let msg = serde_json::json!({
"type": "token_balance",
"balance": current_balance
});
let _ = tx.send(msg.to_string());
}
}
broadcast_stats(&state).await;
}
} else if msg_type == "single_tokenize_done" {
@@ -766,18 +833,49 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
}
let _ = state.stats_tx.send(json.to_string());
let active_incentives = state.feature_flags.read().await.get("Insentiivit").copied().unwrap_or(false);
let ui_sync = state.feature_flags.read().await.get("Pelimerkkien UI-synkkaus").copied().unwrap_or(false);
let mut current_balance = 0;
{
let mut task_count = state.total_tasks.lock().unwrap();
*task_count += 1;
if active_incentives {
let mut tokens = state.nodes_tokens.lock().unwrap();
let balance = tokens.entry(node_id).or_insert(0);
*balance += 20; // Palkkio: 20 Kipinä-merkkiä
current_balance = *balance;
}
}
if active_incentives && ui_sync {
if let Some(tx) = state.node_channels.read().await.get(&node_id) {
let msg = serde_json::json!({
"type": "token_balance",
"balance": current_balance
});
let _ = tx.send(msg.to_string());
}
}
broadcast_stats(&state).await;
}
} else if msg_type == "llm_error" {
{
let mut json = json;
if let Some(obj) = json.as_object_mut() {
obj.insert("node_id".to_string(), serde_json::json!(node_id));
}
let _ = state.stats_tx.send(json.to_string());
}
} else if msg_type == "user_text" {
// Käyttäjän lähettämä teksti — broadcastataan pair_taskina ja llm_promptina
let text = json.get("text").and_then(|v| v.as_str()).unwrap_or("").to_string();
let task_type = json.get("task_type").and_then(|v| v.as_str()).unwrap_or("tokenize");
if !text.is_empty() {
tracing::info!("Solmu {} lähetti oman tekstin ({}): \"{}\"", node_id, task_type, &text[..text.len().min(80)]);
let preview: String = text.chars().take(80).collect();
tracing::info!("Solmu {} lähetti oman tekstin ({}): \"{}\"", node_id, task_type, preview);
match task_type {
"tokenize" => {
let msg = serde_json::json!({
@@ -787,15 +885,13 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
let _ = state.stats_tx.send(msg.to_string());
}
_ => {
// LLM-prompti
for model in &["smollm-135m", "qwen-05b", "phi3-mini", "qwen-coder"] {
let prompt = serde_json::json!({
"type": "llm_prompt",
"prompt": text,
"model": model,
});
let _ = state.stats_tx.send(prompt.to_string());
}
// LLM-prompti: lähetetään VAIN valitulle mallille, ei kaikille (välttää turhaa ruuhkaa ja busy-tiloja)
let prompt = serde_json::json!({
"type": "llm_prompt",
"prompt": text,
"model": task_type,
});
let _ = state.stats_tx.send(prompt.to_string());
}
}
}
@@ -842,6 +938,26 @@ async fn api_chat_completions(
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
axum::Json(payload): axum::Json<ChatCompletionRequest>,
) -> axum::response::Response {
// Etsitään ensimmäinen vapaa solmu, joka vastaa pyydettyä mallia
let target_node = {
let tasks = state.node_tasks.lock().unwrap();
tasks.iter().find(|(_, task)| {
if payload.model == "qwen-coder" {
*task == "qwen-coder-05b" || *task == "qwen-coder"
} else {
**task == payload.model
}
}).map(|(k, _)| *k)
};
let target_node_id = match target_node {
Some(id) => id,
None => {
return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Ei vapaata solmua tälle mallille (Käynnistä malli selaimessa)").into_response();
}
};
let msg = serde_json::json!({
"type": "llm_prompt",
"prompt": payload.prompt,
@@ -849,8 +965,19 @@ async fn api_chat_completions(
"task_id": payload.task_id,
});
// Odotuskanava valmiiksi (solmu palauttaa tuloksen stats_tx kautta)
let mut rx = state.stats_tx.subscribe();
let _ = state.stats_tx.send(msg.to_string());
// Kohdennettu reititys: lähetetään AI-tehtävä suoraan VAIN valitulle solmulle (Reititysarkkitehtuuri)
{
let channels = state.node_channels.read().await;
if let Some(tx) = channels.get(&target_node_id) {
let _ = tx.send(msg.to_string());
tracing::info!("Reititettiin API-pyyntö solmulle {} (Malli: {})", target_node_id, payload.model);
} else {
return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Verkkovirhe: solmun yhteys katkesi pyynnön aikana").into_response();
}
}
let timeout = tokio::time::timeout(std::time::Duration::from_secs(120), async move {
while let Ok(msg_str) = rx.recv().await {
@@ -858,22 +985,29 @@ async fn api_chat_completions(
if v["type"].as_str() == Some("llm_done") {
if let Some(tid) = v["task_id"].as_str() {
if tid == payload.task_id {
return Some(ChatCompletionResponse {
return Ok(Some(ChatCompletionResponse {
response: v["response"].as_str().unwrap_or("").to_string(),
model: v["model"].as_str().unwrap_or("").to_string(),
tokens_generated: v["tokens_generated"].as_u64().unwrap_or(0),
});
}));
}
}
} else if v["type"].as_str() == Some("llm_error") {
if let Some(tid) = v["task_id"].as_str() {
if tid == payload.task_id {
return Err(v["error"].as_str().unwrap_or("Määrittelemätön virhe solmussa").to_string());
}
}
}
}
}
None
Ok(None)
}).await;
match timeout {
Ok(Some(res)) => axum::Json(res).into_response(),
Ok(None) => (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Verkkovirhe: yhteys katkesi").into_response(),
Err(_) => (axum::http::StatusCode::GATEWAY_TIMEOUT, "Aikakatkaisu: yksikään solmu ei vastannut 120s sisällä").into_response(),
Ok(Ok(Some(res))) => axum::Json(res).into_response(),
Ok(Ok(None)) => (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Verkkovirhe: yhteys katkesi").into_response(),
Ok(Err(err)) => (axum::http::StatusCode::CONFLICT, err).into_response(),
Err(_) => (axum::http::StatusCode::GATEWAY_TIMEOUT, "Aikakatkaisu: solmu ei saanut tehtävää ajoissa valmiiksi").into_response(),
}
}