Hub: broadcast-viestittely korvattu kohdennetulla reitityksellä
API-vastaukset käyttävät nyt oneshot-kanavaa broadcast-suodatuksen sijaan, ja user_text lähetetään vain lähettäjäsolmulle. Stats-broadcast säilyy UI:lle ja adminille. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -42,6 +42,7 @@ struct AppState {
|
|||||||
node_types: Mutex<HashMap<u64, String>>, // node_id → "native" | "browser"
|
node_types: Mutex<HashMap<u64, String>>, // node_id → "native" | "browser"
|
||||||
node_busy: Mutex<std::collections::HashSet<u64>>, // Solmut joilla on aktiivinen tehtävä
|
node_busy: Mutex<std::collections::HashSet<u64>>, // Solmut joilla on aktiivinen tehtävä
|
||||||
pending_task_ids: Mutex<std::collections::HashSet<String>>, // Hubin jakamat task_id:t (gamification-validointi)
|
pending_task_ids: Mutex<std::collections::HashSet<String>>, // Hubin jakamat task_id:t (gamification-validointi)
|
||||||
|
pending_responses: Mutex<HashMap<String, tokio::sync::oneshot::Sender<serde_json::Value>>>, // task_id → oneshot API-vastaukselle
|
||||||
api_rate_limits: Mutex<HashMap<IpAddr, (std::time::Instant, u32)>>, // IP → (ikkuna-alku, pyyntömäärä)
|
api_rate_limits: Mutex<HashMap<IpAddr, (std::time::Instant, u32)>>, // IP → (ikkuna-alku, pyyntömäärä)
|
||||||
db: db::NodeDb,
|
db: db::NodeDb,
|
||||||
}
|
}
|
||||||
@@ -264,6 +265,7 @@ async fn main() {
|
|||||||
node_types: Mutex::new(HashMap::new()),
|
node_types: Mutex::new(HashMap::new()),
|
||||||
node_busy: Mutex::new(std::collections::HashSet::new()),
|
node_busy: Mutex::new(std::collections::HashSet::new()),
|
||||||
pending_task_ids: Mutex::new(std::collections::HashSet::new()),
|
pending_task_ids: Mutex::new(std::collections::HashSet::new()),
|
||||||
|
pending_responses: Mutex::new(HashMap::new()),
|
||||||
api_rate_limits: Mutex::new(HashMap::new()),
|
api_rate_limits: Mutex::new(HashMap::new()),
|
||||||
db: db::NodeDb::new(&std::env::var("DATABASE_PATH").unwrap_or_else(|_| "nodes.db".to_string())),
|
db: db::NodeDb::new(&std::env::var("DATABASE_PATH").unwrap_or_else(|_| "nodes.db".to_string())),
|
||||||
});
|
});
|
||||||
@@ -875,11 +877,18 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
|
|||||||
} else if msg_type == "llm_done" {
|
} else if msg_type == "llm_done" {
|
||||||
// Vapautetaan solmu ja tarkistetaan task_id:n aitous
|
// Vapautetaan solmu ja tarkistetaan task_id:n aitous
|
||||||
state.node_busy.lock().unwrap().remove(&node_id);
|
state.node_busy.lock().unwrap().remove(&node_id);
|
||||||
let valid_task = if let Some(tid) = json.get("task_id").and_then(|v| v.as_str()) {
|
let task_id = json.get("task_id").and_then(|v| v.as_str()).map(|s| s.to_string());
|
||||||
state.pending_task_ids.lock().unwrap().remove(tid)
|
let valid_task = if let Some(ref tid) = task_id {
|
||||||
|
state.pending_task_ids.lock().unwrap().remove(tid.as_str())
|
||||||
} else {
|
} else {
|
||||||
false
|
false
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Jos API-pyyntö odottaa tätä vastausta, reititetään suoraan oneshot-kanavaan
|
||||||
|
let api_sender = task_id.as_ref().and_then(|tid| {
|
||||||
|
state.pending_responses.lock().unwrap().remove(tid)
|
||||||
|
});
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut json = json;
|
let mut json = json;
|
||||||
if let Some(obj) = json.as_object_mut() {
|
if let Some(obj) = json.as_object_mut() {
|
||||||
@@ -899,6 +908,12 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
|
|||||||
state.db.increment_tasks(node_id);
|
state.db.increment_tasks(node_id);
|
||||||
obj.insert("node_id".to_string(), serde_json::json!(node_id));
|
obj.insert("node_id".to_string(), serde_json::json!(node_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(sender) = api_sender {
|
||||||
|
// API-pyyntö: reititetään vastaus suoraan odottajalle
|
||||||
|
let _ = sender.send(json.clone());
|
||||||
|
}
|
||||||
|
// UI-broadcast jatkuu normaalisti
|
||||||
let _ = state.stats_tx.send(json.to_string());
|
let _ = state.stats_tx.send(json.to_string());
|
||||||
|
|
||||||
let active_incentives = state.feature_flags.read().await.get("Insentiivit").copied().unwrap_or(false);
|
let active_incentives = state.feature_flags.read().await.get("Insentiivit").copied().unwrap_or(false);
|
||||||
@@ -908,7 +923,7 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
|
|||||||
{
|
{
|
||||||
let mut task_count = state.total_tasks.lock().unwrap();
|
let mut task_count = state.total_tasks.lock().unwrap();
|
||||||
*task_count += 1;
|
*task_count += 1;
|
||||||
|
|
||||||
if active_incentives && valid_task {
|
if active_incentives && valid_task {
|
||||||
let mut tokens = state.nodes_tokens.lock().unwrap();
|
let mut tokens = state.nodes_tokens.lock().unwrap();
|
||||||
let balance = tokens.entry(node_id).or_insert(0);
|
let balance = tokens.entry(node_id).or_insert(0);
|
||||||
@@ -916,7 +931,7 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
|
|||||||
current_balance = *balance;
|
current_balance = *balance;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if active_incentives && ui_sync {
|
if active_incentives && ui_sync {
|
||||||
if let Some(tx) = state.node_channels.read().await.get(&node_id) {
|
if let Some(tx) = state.node_channels.read().await.get(&node_id) {
|
||||||
let msg = serde_json::json!({
|
let msg = serde_json::json!({
|
||||||
@@ -926,45 +941,50 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
|
|||||||
let _ = tx.send(msg.to_string());
|
let _ = tx.send(msg.to_string());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
broadcast_stats(&state).await;
|
broadcast_stats(&state).await;
|
||||||
}
|
}
|
||||||
} else if msg_type == "llm_error" {
|
} else if msg_type == "llm_error" {
|
||||||
state.node_busy.lock().unwrap().remove(&node_id);
|
state.node_busy.lock().unwrap().remove(&node_id);
|
||||||
if let Some(tid) = json.get("task_id").and_then(|v| v.as_str()) {
|
let task_id = json.get("task_id").and_then(|v| v.as_str()).map(|s| s.to_string());
|
||||||
state.pending_task_ids.lock().unwrap().remove(tid);
|
if let Some(ref tid) = task_id {
|
||||||
|
state.pending_task_ids.lock().unwrap().remove(tid.as_str());
|
||||||
}
|
}
|
||||||
|
// Jos API-pyyntö odottaa, reititetään virhe oneshot-kanavaan
|
||||||
|
let api_sender = task_id.as_ref().and_then(|tid| {
|
||||||
|
state.pending_responses.lock().unwrap().remove(tid)
|
||||||
|
});
|
||||||
{
|
{
|
||||||
let mut json = json;
|
let mut json = json;
|
||||||
if let Some(obj) = json.as_object_mut() {
|
if let Some(obj) = json.as_object_mut() {
|
||||||
obj.insert("node_id".to_string(), serde_json::json!(node_id));
|
obj.insert("node_id".to_string(), serde_json::json!(node_id));
|
||||||
}
|
}
|
||||||
|
if let Some(sender) = api_sender {
|
||||||
|
let _ = sender.send(json.clone());
|
||||||
|
}
|
||||||
let _ = state.stats_tx.send(json.to_string());
|
let _ = state.stats_tx.send(json.to_string());
|
||||||
}
|
}
|
||||||
} else if msg_type == "user_text" {
|
} else if msg_type == "user_text" {
|
||||||
// Käyttäjän lähettämä teksti — broadcastataan pair_taskina ja llm_promptina
|
// Käyttäjän lähettämä teksti — kohdennettu reititys lähettäjäsolmulle
|
||||||
let text = json.get("text").and_then(|v| v.as_str()).unwrap_or("").to_string();
|
let text = json.get("text").and_then(|v| v.as_str()).unwrap_or("").to_string();
|
||||||
let task_type = json.get("task_type").and_then(|v| v.as_str()).unwrap_or("tokenize");
|
let task_type = json.get("task_type").and_then(|v| v.as_str()).unwrap_or("tokenize");
|
||||||
if !text.is_empty() {
|
if !text.is_empty() {
|
||||||
let preview: String = text.chars().take(80).collect();
|
let preview: String = text.chars().take(80).collect();
|
||||||
tracing::info!("Solmu {} lähetti oman tekstin ({}): \"{}\"", node_id, task_type, preview);
|
tracing::info!("Solmu {} lähetti oman tekstin ({}): \"{}\"", node_id, task_type, preview);
|
||||||
match task_type {
|
let msg = match task_type {
|
||||||
"tokenize" => {
|
"tokenize" => serde_json::json!({
|
||||||
let msg = serde_json::json!({
|
"type": "single_tokenize",
|
||||||
"type": "single_tokenize",
|
"text": text,
|
||||||
"text": text,
|
}),
|
||||||
});
|
_ => serde_json::json!({
|
||||||
let _ = state.stats_tx.send(msg.to_string());
|
"type": "llm_prompt",
|
||||||
}
|
"prompt": text,
|
||||||
_ => {
|
"model": task_type,
|
||||||
// LLM-prompti: lähetetään VAIN valitulle mallille, ei kaikille (välttää turhaa ruuhkaa ja busy-tiloja)
|
}),
|
||||||
let prompt = serde_json::json!({
|
};
|
||||||
"type": "llm_prompt",
|
// Lähetetään takaisin lähettäjäsolmulle (käyttäjä haluaa oman tekstinsä tuloksen)
|
||||||
"prompt": text,
|
if let Some(tx) = state.node_channels.read().await.get(&node_id) {
|
||||||
"model": task_type,
|
let _ = tx.send(msg.to_string());
|
||||||
});
|
|
||||||
let _ = state.stats_tx.send(prompt.to_string());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1161,8 +1181,9 @@ async fn api_chat_completions(
|
|||||||
msg.as_object_mut().unwrap().insert("max_tokens".to_string(), serde_json::json!(mt));
|
msg.as_object_mut().unwrap().insert("max_tokens".to_string(), serde_json::json!(mt));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Odotuskanava valmiiksi (solmu palauttaa tuloksen stats_tx kautta)
|
// Oneshot-kanava: solmu palauttaa tuloksen suoraan tälle pyynnölle
|
||||||
let mut rx = state.stats_tx.subscribe();
|
let (resp_tx, resp_rx) = tokio::sync::oneshot::channel::<serde_json::Value>();
|
||||||
|
state.pending_responses.lock().unwrap().insert(payload.task_id.clone(), resp_tx);
|
||||||
|
|
||||||
// Kohdennettu reititys: lähetetään AI-tehtävä suoraan VAIN valitulle solmulle
|
// Kohdennettu reititys: lähetetään AI-tehtävä suoraan VAIN valitulle solmulle
|
||||||
{
|
{
|
||||||
@@ -1171,48 +1192,34 @@ async fn api_chat_completions(
|
|||||||
let _ = tx.send(msg.to_string());
|
let _ = tx.send(msg.to_string());
|
||||||
tracing::info!("Reititettiin API-pyyntö solmulle {} (Malli: {})", target_node_id, payload.model);
|
tracing::info!("Reititettiin API-pyyntö solmulle {} (Malli: {})", target_node_id, payload.model);
|
||||||
} else {
|
} else {
|
||||||
|
state.pending_responses.lock().unwrap().remove(&payload.task_id);
|
||||||
return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Verkkovirhe: solmun yhteys katkesi reitityksen aikana").into_response();
|
return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Verkkovirhe: solmun yhteys katkesi reitityksen aikana").into_response();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let timeout = tokio::time::timeout(std::time::Duration::from_secs(600), async move {
|
let timeout = tokio::time::timeout(std::time::Duration::from_secs(600), resp_rx).await;
|
||||||
loop {
|
|
||||||
let msg_str = match rx.recv().await {
|
|
||||||
Ok(msg) => msg,
|
|
||||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
|
||||||
tracing::debug!("API-kanava lagged {} viestiä", n);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Err(_) => return Ok(None), // Kanava suljettu
|
|
||||||
};
|
|
||||||
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&msg_str) {
|
|
||||||
if v["type"].as_str() == Some("llm_done") {
|
|
||||||
if let Some(tid) = v["task_id"].as_str() {
|
|
||||||
if tid == payload.task_id {
|
|
||||||
return Ok(Some(ChatCompletionResponse {
|
|
||||||
response: v["response"].as_str().unwrap_or("").to_string(),
|
|
||||||
model: v["model"].as_str().unwrap_or("").to_string(),
|
|
||||||
tokens_generated: v["tokens_generated"].as_u64().unwrap_or(0),
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if v["type"].as_str() == Some("llm_error") {
|
|
||||||
if let Some(tid) = v["task_id"].as_str() {
|
|
||||||
if tid == payload.task_id {
|
|
||||||
return Err(v["error"].as_str().unwrap_or("Määrittelemätön virhe solmussa").to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#[allow(unreachable_code)]
|
|
||||||
Ok(None)
|
|
||||||
}).await;
|
|
||||||
|
|
||||||
match timeout {
|
match timeout {
|
||||||
Ok(Ok(Some(res))) => axum::Json(res).into_response(),
|
Ok(Ok(v)) => {
|
||||||
Ok(Ok(None)) => (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Verkkovirhe: yhteys katkesi").into_response(),
|
if v["type"].as_str() == Some("llm_error") {
|
||||||
Ok(Err(err)) => (axum::http::StatusCode::CONFLICT, err).into_response(),
|
let err = v["error"].as_str().unwrap_or("Määrittelemätön virhe solmussa").to_string();
|
||||||
Err(_) => (axum::http::StatusCode::GATEWAY_TIMEOUT, "Aikakatkaisu: solmu ei saanut tehtävää ajoissa valmiiksi").into_response(),
|
(axum::http::StatusCode::CONFLICT, err).into_response()
|
||||||
|
} else {
|
||||||
|
axum::Json(ChatCompletionResponse {
|
||||||
|
response: v["response"].as_str().unwrap_or("").to_string(),
|
||||||
|
model: v["model"].as_str().unwrap_or("").to_string(),
|
||||||
|
tokens_generated: v["tokens_generated"].as_u64().unwrap_or(0),
|
||||||
|
}).into_response()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Err(_)) => {
|
||||||
|
// Oneshot-kanava sulkeutui (solmu katosi)
|
||||||
|
state.pending_responses.lock().unwrap().remove(&payload.task_id);
|
||||||
|
(axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Verkkovirhe: yhteys katkesi").into_response()
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
state.pending_responses.lock().unwrap().remove(&payload.task_id);
|
||||||
|
(axum::http::StatusCode::GATEWAY_TIMEOUT, "Aikakatkaisu: solmu ei saanut tehtävää ajoissa valmiiksi").into_response()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user