Remote start stop control

This commit is contained in:
Jaakko Vanhala
2026-04-11 19:14:20 +03:00
parent 660e80c2bc
commit 80806498e0
7 changed files with 279 additions and 78 deletions

View File

@@ -49,6 +49,13 @@ impl NodeDb {
INSERT INTO _schema_version VALUES (3);
");
}
if version < 4 {
let _ = conn.execute_batch("
ALTER TABLE node_sessions ADD COLUMN is_paused BOOLEAN DEFAULT 0;
DELETE FROM _schema_version;
INSERT INTO _schema_version VALUES (4);
");
}
conn.execute_batch("
CREATE TABLE IF NOT EXISTS node_sessions (
@@ -84,7 +91,10 @@ impl NodeDb {
has_webgpu BOOLEAN,
-- Tehtävätilastot
tasks_completed INTEGER DEFAULT 0
tasks_completed INTEGER DEFAULT 0,
-- Ohjaustilat
is_paused BOOLEAN DEFAULT 0
);
CREATE TABLE IF NOT EXISTS pair_results (
@@ -183,6 +193,14 @@ impl NodeDb {
);
}
pub fn update_session_status(&self, node_id: u64, is_paused: bool) {
let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
let _ = conn.execute(
"UPDATE node_sessions SET is_paused = ?1 WHERE node_id = ?2 AND disconnected_at IS NULL",
params![is_paused as i64, node_id as i64],
);
}
/// Sulkee saman IP:n viewer-sessiot kun aktiivinen node liittyy
pub fn close_viewers_by_ip(&self, ip: &str) {
let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner());
@@ -216,7 +234,7 @@ impl NodeDb {
"SELECT id, node_id, ip, node_type, connected_at, disconnected_at,
platform, hostname, os, cpu_cores, cpu_model, ram_mb,
gpu_name, gpu_vendor, gpu_backend, vram_total_mb, gpu_temp_c, gpu_util_pct,
allocated_gb, selected_task, has_webgpu, tasks_completed
allocated_gb, selected_task, has_webgpu, tasks_completed, is_paused
FROM node_sessions ORDER BY id DESC LIMIT ?1"
).unwrap();
@@ -244,6 +262,7 @@ impl NodeDb {
"selected_task": row.get::<_, Option<String>>(19)?,
"has_webgpu": row.get::<_, Option<bool>>(20)?,
"tasks_completed": row.get::<_, i64>(21)?,
"is_paused": row.get::<_, Option<bool>>(22)?.unwrap_or(false),
}))
}).unwrap().filter_map(|r| r.ok()).collect()
}

View File

@@ -25,7 +25,7 @@ const ALLOWED_ORIGINS: &[&str] = &[
];
// Sallitut viestityyypit clientilta
const ALLOWED_MSG_TYPES: &[&str] = &["auth", "result", "pair_done", "llm_chunk", "llm_done", "llm_error", "download_progress", "user_text", "single_tokenize_done"];
const ALLOWED_MSG_TYPES: &[&str] = &["auth", "result", "pair_done", "llm_chunk", "llm_done", "llm_error", "download_progress", "user_text", "single_tokenize_done", "status_update"];
struct AppState {
next_node_id: Mutex<u64>,
@@ -40,6 +40,7 @@ struct AppState {
node_ips: Mutex<HashMap<u64, IpAddr>>,
node_tasks: Mutex<HashMap<u64, String>>, // node_id → selected_task
node_types: Mutex<HashMap<u64, String>>, // node_id → "native" | "browser"
node_paused: Mutex<std::collections::HashSet<u64>>, // node_id → onko tauolla
node_busy: Mutex<std::collections::HashSet<u64>>, // Solmut joilla on aktiivinen tehtävä
pending_task_ids: Mutex<std::collections::HashSet<String>>, // Hubin jakamat task_id:t (gamification-validointi)
pending_responses: Mutex<HashMap<String, tokio::sync::oneshot::Sender<serde_json::Value>>>, // task_id → oneshot API-vastaukselle
@@ -82,6 +83,8 @@ tr:hover td { background:#1c2333; }
.table-wrap { overflow-x:auto; max-height:70vh; overflow-y:auto; }
.online { color:var(--green); }
.offline { color:#8b949e; }
.pause-btn { background:var(--panel); border:1px solid var(--border); color:var(--text); padding:4px 8px; border-radius:4px; cursor:pointer; font-size:12px; }
.pause-btn:hover { border-color:var(--yellow); }
</style>
</head>
<body>
@@ -102,12 +105,12 @@ tr:hover td { background:#1c2333; }
<colgroup>
<col style="width:35px"><col style="width:85px"><col style="width:95px"><col style="width:65px"><col style="width:110px"><col style="width:80px">
<col style="width:65px"><col style="width:40px"><col style="width:70px"><col style="width:90px"><col style="width:60px">
<col style="width:65px"><col style="width:40px"><col style="width:130px"><col style="width:60px">
<col style="width:65px"><col style="width:40px"><col style="width:130px"><col style="width:60px"><col style="width:80px">
</colgroup>
<thead><tr>
<th>ID</th><th>Tila</th><th>Tehtävä</th><th>Tyyppi</th><th>IP</th><th>Alusta</th>
<th>OS</th><th>CPU</th><th>RAM</th><th>GPU</th><th>VRAM</th>
<th>WebGPU</th><th>Teht.</th><th>Yhdistetty</th><th>Kesto</th>
<th>WebGPU</th><th>Teht.</th><th>Yhdistetty</th><th>Kesto</th><th>Toiminnot</th>
</tr></thead><tbody id="sessions-body"></tbody></table>
</div>
</div>
@@ -210,9 +213,17 @@ async function load() {
document.getElementById('sessions-body').innerHTML = sessions.map(s => {
const online = !s.disconnected_at;
const isViewer = s.selected_task === 'viewer';
const status = online
? (isViewer ? '<span style="color:#d29922">CONNECTED</span>' : '<span class="online">ACTIVE</span>')
: '<span class="offline">offline</span>';
let status;
if (!online) {
status = '<span class="offline">offline</span>';
} else if (isViewer) {
status = '<span style="color:#d29922">CONNECTED</span>';
} else if (s.is_paused) {
status = '<span style="color:#8b949e">PAUSED</span>';
} else {
status = '<span class="online">ACTIVE</span>';
}
const typeBadge = s.node_type === 'native' ? badge('native','blue') : badge('browser','yellow');
const taskColor = isViewer ? 'yellow' : s.selected_task === 'tokenize' ? 'green' : 'blue';
const taskBadge = badge(taskNames[s.selected_task] || s.selected_task || '?', taskColor);
@@ -225,11 +236,16 @@ async function load() {
const os = s.os || '-';
const time = s.connected_at ? new Date(s.connected_at).toLocaleString('fi-FI') : '';
const dur = duration(s.connected_at, s.disconnected_at);
const actionBtn = online && !isViewer
? `<button class="pause-btn" onclick="togglePause(${s.node_id}, ${s.is_paused})">${s.is_paused ? '▶ Työhön' : '⏸ Tauolle'}</button>`
: '';
return `<tr>
<td>${s.node_id}</td><td>${status}</td><td>${taskBadge}</td><td>${typeBadge}</td><td>${s.ip}</td>
<td>${plat}</td><td>${os}</td><td>${cores}</td><td>${ram}</td>
<td>${gpu}</td><td>${vram}</td><td>${gpuBadge}</td>
<td>${s.tasks_completed}</td><td>${time}</td><td>${dur}</td>
<td>${actionBtn}</td>
</tr>`;
}).join('');
@@ -269,6 +285,17 @@ async function load() {
}).join('');
}
async function togglePause(nodeId, isPaused) {
try {
await fetch('/api/v1/control/' + nodeId, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ action: isPaused ? 'resume' : 'pause' })
});
load(); // virkistetään
} catch(e) { console.error(e); }
}
load();
setInterval(load, 5000);
</script>
@@ -300,6 +327,7 @@ async fn main() {
node_ips: Mutex::new(HashMap::new()),
node_tasks: Mutex::new(HashMap::new()),
node_types: Mutex::new(HashMap::new()),
node_paused: Mutex::new(std::collections::HashSet::new()),
node_busy: Mutex::new(std::collections::HashSet::new()),
pending_task_ids: Mutex::new(std::collections::HashSet::new()),
pending_responses: Mutex::new(HashMap::new()),
@@ -421,6 +449,7 @@ async fn main() {
.route("/api/pairs", get(api_pairs))
.route("/api/stats", get(api_stats))
.route("/api/v1/chat/completions", axum::routing::post(api_chat_completions))
.route("/api/v1/control/:id", axum::routing::post(api_control_node))
.route("/api/v1/model", axum::routing::post(api_change_model))
.route("/api/v1/hardware", get(api_hardware))
.route("/api/v1/ollama/tags", get(api_ollama_tags))
@@ -440,6 +469,26 @@ async fn main() {
axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>()).await.unwrap();
}
async fn api_control_node(
headers: axum::http::HeaderMap,
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
axum::extract::Path(id): axum::extract::Path<u64>,
axum::Json(payload): axum::Json<serde_json::Value>,
) -> axum::response::Response {
if !check_admin_auth(&headers) { return admin_unauthorized(); }
let action = payload.get("action").and_then(|v| v.as_str()).unwrap_or("");
if action == "pause" || action == "resume" {
let msg = serde_json::json!({ "type": "control", "action": action });
let channels = state.node_channels.read().await;
if let Some(tx) = channels.get(&id) {
let _ = tx.send(msg.to_string());
tracing::info!("Lähetetty control: {} solmulle {}", action, id);
return axum::Json(serde_json::json!({"status": "ok"})).into_response();
}
}
(axum::http::StatusCode::BAD_REQUEST, "Invalid action or node offline").into_response()
}
async fn api_sessions(
headers: axum::http::HeaderMap,
axum::extract::State(state): axum::extract::State<Arc<AppState>>,
@@ -770,6 +819,9 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
}
state.node_tasks.lock().unwrap().insert(node_id, selected_task);
state.node_types.lock().unwrap().insert(node_id, node_type.to_string());
// Uudelleen-kirjautuessa nollataan tauko
state.node_paused.lock().unwrap().remove(&node_id);
state.db.update_session_status(node_id, false);
if node_type == "native" {
let sys = json.get("system");
@@ -826,6 +878,18 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
});
let _ = state.stats_tx.send(join_msg.to_string());
} else if msg_type == "status_update" {
let status = json.get("status").and_then(|v| v.as_str()).unwrap_or("active");
if status == "paused" {
state.node_paused.lock().unwrap().insert(node_id);
state.db.update_session_status(node_id, true);
tracing::info!("Solmu {} ({}) asettui tauolle.", node_id, ip);
} else {
state.node_paused.lock().unwrap().remove(&node_id);
state.db.update_session_status(node_id, false);
tracing::info!("Solmu {} ({}) on taas aktiivinen.", node_id, ip);
}
broadcast_stats(&state).await;
} else if msg_type == "result" {
tracing::info!("Solmu {} sai tuloksen: {}", node_id, text);
{
@@ -1053,6 +1117,7 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, ip: IpAddr) {
vram.remove(&node_id);
}
state.node_types.lock().unwrap().remove(&node_id);
state.node_paused.lock().unwrap().remove(&node_id);
state.node_models.write().await.remove(&node_id);
tracing::info!("Solmu {} ({}) poistui verkosta.", node_id, ip);
broadcast_stats(&state).await;
@@ -1179,7 +1244,9 @@ async fn api_chat_completions(
let tasks = state.node_tasks.lock().unwrap();
let _busy = state.node_busy.lock().unwrap();
let node_types = state.node_types.lock().unwrap();
let matching: Vec<u64> = tasks.iter().filter(|(_, task)| {
let paused = state.node_paused.lock().unwrap();
let matching: Vec<u64> = tasks.iter().filter(|(k, task)| {
if paused.contains(k) { return false; } // Ei sallita tauotettuja
// Eksakti match tai qwen-perheen yhteensopivuus (selain: qwen-coder-05b, natiivi: qwen2.5-coder:7b)
let req_model = payload.model.to_lowercase();
let node_task = task.to_lowercase();