Älykäs reititys: capability=heavy priorisoi isoimman mallin solmun

Hub:
- Parsii node_models:sta suurimman mallin parametrimäärän (B)
  per solmu (esim. qwen3:32b → 32, qwen2.5-coder:7b → 7)
- Tallentaa node_max_param_b: HashMap<u64, u32>
- ChatCompletionRequest: uusi capability-kenttä ("heavy"/"light")
- Reitityslogiikka: capability=heavy → valitsee solmun jolla on
  suurin malli; oletus → natiivi ensin kuten ennenkin

Frontend (pipeline):
- JSON-speksin generointi: capability=heavy
- QA-korjaussilmukan koodikorjaus: capability=heavy
- Observer/README-arviointi: capability=heavy
- Vaatimukset (Client): oletus (kevyt, kelpaa pieni malli)

Tämä mahdollistaa sen, että A40-koneella pyörivä Qwen3:32B
saa raskaat tehtävät ja selaimen 0.5B-malli hoitaa kevyet.
This commit is contained in:
2026-04-13 16:30:47 +03:00
parent 58d93613f0
commit fa85dcc5b3
2 changed files with 57 additions and 10 deletions

View File

@@ -846,6 +846,7 @@ OUTPUT FORMAT:
top_k: opts.topK ?? settings.topK ?? undefined, top_k: opts.topK ?? settings.topK ?? undefined,
max_tokens: opts.maxTokens ?? settings.maxTokens ?? undefined, max_tokens: opts.maxTokens ?? settings.maxTokens ?? undefined,
repeat_penalty: opts.repeatPenalty ?? settings.repeatPenalty ?? undefined, repeat_penalty: opts.repeatPenalty ?? settings.repeatPenalty ?? undefined,
capability: opts.capability || undefined, // "heavy" → isoin malli
}; };
const res = await fetch('/api/v1/chat/completions', { const res = await fetch('/api/v1/chat/completions', {
@@ -1422,7 +1423,7 @@ Blog → Author: name,email,bio(Text|None) / Post: title, content(Text), author_
highlightAgent('manager'); highlightAgent('manager');
explainStep('Arkkitehtuuri', `${mgr.name} analysoi vaatimukset ja tuottaa JSON-speksin: entiteetit, kentät, tyypit.`); explainStep('Arkkitehtuuri', `${mgr.name} analysoi vaatimukset ja tuottaa JSON-speksin: entiteetit, kentät, tyypit.`);
const specRaw = await kpnRun(mgr.model, `${brief}\n\nOutput a JSON spec for this project.`, false, { ...mgr, prompt: SPEC_SYSTEM }); const specRaw = await kpnRun(mgr.model, `${brief}\n\nOutput a JSON spec for this project.`, false, { ...mgr, prompt: SPEC_SYSTEM, capability: 'heavy' });
const spec = specRaw ? extractJson(specRaw) : null; const spec = specRaw ? extractJson(specRaw) : null;
promptLog.push({ step: 1, agentKey: 'manager', agentName: mgr.name, model: mgr.model, label: 'JSON-speksi', systemPrompt: SPEC_SYSTEM, userPrompt: brief, response: specRaw || '' }); promptLog.push({ step: 1, agentKey: 'manager', agentName: mgr.name, model: mgr.model, label: 'JSON-speksi', systemPrompt: SPEC_SYSTEM, userPrompt: brief, response: specRaw || '' });
@@ -1493,7 +1494,7 @@ Blog → Author: name,email,bio(Text|None) / Post: title, content(Text), author_
explainStep(`Korjaus: ${fname}`, `${fixAgent.name} korjaa validoinnin löytämät ongelmat.`); explainStep(`Korjaus: ${fname}`, `${fixAgent.name} korjaa validoinnin löytämät ongelmat.`);
const fixPrompt = `Fix the following issues in this Python file. Return ONLY the complete corrected file, no explanations.\n\nISSUES:\n${fIssues.join('\n')}\n\nCURRENT FILE (${fname}):\n\`\`\`python\n${files[fname]}\`\`\``; const fixPrompt = `Fix the following issues in this Python file. Return ONLY the complete corrected file, no explanations.\n\nISSUES:\n${fIssues.join('\n')}\n\nCURRENT FILE (${fname}):\n\`\`\`python\n${files[fname]}\`\`\``;
const fixResult = await kpnRun(fixAgent.model, fixPrompt, false, { ...fixAgent, prompt: 'You are a Python code fixer. Return ONLY the corrected Python file. No markdown fences, no explanations — just valid Python code.' }); const fixResult = await kpnRun(fixAgent.model, fixPrompt, false, { ...fixAgent, prompt: 'You are a Python code fixer. Return ONLY the corrected Python file. No markdown fences, no explanations — just valid Python code.', capability: 'heavy' });
if (fixResult) { if (fixResult) {
// Poistetaan markdown-koodiblokit jos LLM palauttaa ne // Poistetaan markdown-koodiblokit jos LLM palauttaa ne
@@ -1557,7 +1558,7 @@ Blog → Author: name,email,bio(Text|None) / Post: title, content(Text), author_
`## Architecture\nDescribe the project structure and design decisions.\n\n` + `## Architecture\nDescribe the project structure and design decisions.\n\n` +
`## Risk Assessment\n| Severity | Issue |\n|----------|-------|\n| ... | ... |\n\n` + `## Risk Assessment\n| Severity | Issue |\n|----------|-------|\n| ... | ... |\n\n` +
`Project code:\n${finalCode}`; `Project code:\n${finalCode}`;
const readme = await kpnRun(obs.model, obsPrompt, false, obs); const readme = await kpnRun(obs.model, obsPrompt, false, { ...obs, capability: 'heavy' });
if (readme) { if (readme) {
files['README.md'] = readme; files['README.md'] = readme;
// Tallennetaan raportti globaalisti jotta tarkkailija-klikkaus avaa sen // Tallennetaan raportti globaalisti jotta tarkkailija-klikkaus avaa sen

View File

@@ -47,6 +47,7 @@ struct AppState {
pending_responses: Mutex<HashMap<String, tokio::sync::oneshot::Sender<serde_json::Value>>>, // task_id → oneshot API-vastaukselle pending_responses: Mutex<HashMap<String, tokio::sync::oneshot::Sender<serde_json::Value>>>, // task_id → oneshot API-vastaukselle
api_rate_limits: Mutex<HashMap<IpAddr, (std::time::Instant, u32)>>, // IP → (ikkuna-alku, pyyntömäärä) api_rate_limits: Mutex<HashMap<IpAddr, (std::time::Instant, u32)>>, // IP → (ikkuna-alku, pyyntömäärä)
node_models: tokio::sync::RwLock<HashMap<u64, serde_json::Value>>, // node_id → ollama tags JSON node_models: tokio::sync::RwLock<HashMap<u64, serde_json::Value>>, // node_id → ollama tags JSON
node_max_param_b: tokio::sync::RwLock<HashMap<u64, u32>>, // node_id → suurimman mallin parametrit (B)
db: db::NodeDb, db: db::NodeDb,
} }
@@ -335,6 +336,7 @@ async fn main() {
pending_responses: Mutex::new(HashMap::new()), pending_responses: Mutex::new(HashMap::new()),
api_rate_limits: Mutex::new(HashMap::new()), api_rate_limits: Mutex::new(HashMap::new()),
node_models: tokio::sync::RwLock::new(HashMap::new()), node_models: tokio::sync::RwLock::new(HashMap::new()),
node_max_param_b: tokio::sync::RwLock::new(HashMap::new()),
db: db::NodeDb::new(&std::env::var("DATABASE_PATH").unwrap_or_else(|_| "nodes.db".to_string())), db: db::NodeDb::new(&std::env::var("DATABASE_PATH").unwrap_or_else(|_| "nodes.db".to_string())),
}); });
@@ -846,10 +848,34 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
node_id, ip, hostname, os, cores, ram, allocated node_id, ip, hostname, os, cores, ram, allocated
); );
// Tallennetaan välitetyt mallit muistiin // Tallennetaan välitetyt mallit muistiin + parsitaan suurin malli
if let Some(models) = json.get("models") { if let Some(models) = json.get("models") {
let mut nm = state.node_models.write().await; let mut nm = state.node_models.write().await;
nm.insert(node_id, models.clone()); nm.insert(node_id, models.clone());
// Parsitaan suurin mallikoko (B) nimestä: "qwen3:32b" → 32, "qwen2.5-coder:7b" → 7
let max_b = models.get("models").and_then(|v| v.as_array()).map(|arr| {
arr.iter().filter_map(|m| {
let name = m.get("name")?.as_str()?;
// Etsitään :N tai :Nb tai -Nb muoto
let lower = name.to_lowercase();
for part in lower.split(&[':', '-'][..]) {
if let Some(num_str) = part.strip_suffix('b') {
if let Ok(n) = num_str.parse::<f32>() { return Some(n as u32); }
} else if let Ok(n) = part.parse::<f32>() {
if n >= 0.5 && n <= 500.0 { return Some(n as u32); }
}
}
// Fallback: koko tiedostosta (size / ~0.5GB per B param Q4)
let size = m.get("size")?.as_u64()?;
Some((size / 500_000_000) as u32) // karkea arvio
}).max().unwrap_or(0)
}).unwrap_or(0);
if max_b > 0 {
state.node_max_param_b.write().await.insert(node_id, max_b);
tracing::info!("Solmu {} — suurin malli: ~{}B parametria", node_id, max_b);
}
} }
if let Some(gpus) = json.get("gpus").and_then(|v| v.as_array()) { if let Some(gpus) = json.get("gpus").and_then(|v| v.as_array()) {
@@ -1149,6 +1175,7 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
state.node_types.lock().unwrap().remove(&node_id); state.node_types.lock().unwrap().remove(&node_id);
state.node_paused.lock().unwrap().remove(&node_id); state.node_paused.lock().unwrap().remove(&node_id);
state.node_models.write().await.remove(&node_id); state.node_models.write().await.remove(&node_id);
state.node_max_param_b.write().await.remove(&node_id);
tracing::info!("Solmu {} ({}) poistui verkosta.", node_id, ip); tracing::info!("Solmu {} ({}) poistui verkosta.", node_id, ip);
broadcast_stats(&state).await; broadcast_stats(&state).await;
sender_task.abort(); sender_task.abort();
@@ -1170,6 +1197,8 @@ struct ChatCompletionRequest {
repeat_penalty: Option<f64>, repeat_penalty: Option<f64>,
#[serde(default)] #[serde(default)]
stop: Option<Vec<String>>, stop: Option<Vec<String>>,
#[serde(default)]
capability: Option<String>, // "heavy" → priorisoi isoin malli, "light" → mikä tahansa
} }
#[derive(serde::Serialize)] #[derive(serde::Serialize)]
@@ -1279,12 +1308,14 @@ async fn api_chat_completions(
} }
} }
// Etsitään vapaa solmu — priorisoidaan natiivisolmut (GPU) selaimen edelle // Etsitään vapaa solmu — älykäs reititys kyvykkyyden mukaan
let want_heavy = payload.capability.as_deref() == Some("heavy");
let (target_node, _total_matching) = { let (target_node, _total_matching) = {
let tasks = state.node_tasks.lock().unwrap(); let tasks = state.node_tasks.lock().unwrap();
let _busy = state.node_busy.lock().unwrap(); let _busy = state.node_busy.lock().unwrap();
let node_types = state.node_types.lock().unwrap(); let node_types = state.node_types.lock().unwrap();
let paused = state.node_paused.lock().unwrap(); let paused = state.node_paused.lock().unwrap();
let param_b = state.node_max_param_b.read().await;
let matching: Vec<u64> = tasks.iter().filter(|(k, task)| { let matching: Vec<u64> = tasks.iter().filter(|(k, task)| {
if paused.contains(k) { return false; } // Ei sallita tauotettuja if paused.contains(k) { return false; } // Ei sallita tauotettuja
// Eksakti match tai qwen-perheen yhteensopivuus (selain: qwen-coder-05b, natiivi: qwen2.5-coder:7b) // Eksakti match tai qwen-perheen yhteensopivuus (selain: qwen-coder-05b, natiivi: qwen2.5-coder:7b)
@@ -1298,11 +1329,26 @@ async fn api_chat_completions(
**task == payload.model **task == payload.model
} }
}).map(|(k, _)| *k).collect(); }).map(|(k, _)| *k).collect();
// Etsitään mikä tahansa matchaava solmu (natiivi priorisoidaan)
let any = if want_heavy {
// Heavy: priorisoi solmu jolla on suurin malli (B-parametrit)
let mut ranked: Vec<(u64, u32)> = matching.iter().map(|id| {
(*id, param_b.get(id).copied().unwrap_or(0))
}).collect();
ranked.sort_by(|a, b| b.1.cmp(&a.1)); // suurin ensin
if let Some((best_id, best_b)) = ranked.first() {
tracing::info!("Heavy-reititys: solmu {} valittu ({}B parametria)", best_id, best_b);
Some(*best_id)
} else {
None
}
} else {
// Oletus: natiivi ensin, sitten mikä tahansa
let native = matching.iter().find(|id| { let native = matching.iter().find(|id| {
node_types.get(id).map(|t| t == "native").unwrap_or(false) node_types.get(id).map(|t| t == "native").unwrap_or(false)
}).copied(); }).copied();
let any = native.or_else(|| matching.first().copied()); native.or_else(|| matching.first().copied())
};
(any, matching.len()) (any, matching.len())
}; };