commit 46848ee027b2bc007b1154617a8f130b013b62c1 Author: jaakko Date: Wed Apr 1 17:54:08 2026 +0300 eka vedos diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..333bf79 --- /dev/null +++ b/.gitignore @@ -0,0 +1,35 @@ +# Created by https://www.toptal.com/developers/gitignore/api/rust,linux +# Edit at https://www.toptal.com/developers/gitignore?templates=rust,linux + +### Linux ### +*~ + +# temporary files which can be created if a process still has a handle open of a deleted file +.fuse_hidden* + +# KDE directory preferences +.directory + +# Linux trash folder which might appear on any partition or disk +.Trash-* + +# .nfs files are created when an open file is removed but is still being accessed +.nfs* + +### Rust ### +# Generated by Cargo +# will have compiled files and executables +debug/ +target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb + +# End of https://www.toptal.com/developers/gitignore/api/rust,linux diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..3e1dfc8 --- /dev/null +++ b/TODO.md @@ -0,0 +1 @@ +Lisää viesteihin tietoturvallinen kryptaus - mitään selkokielistä ei ole hyvä lähettää. \ No newline at end of file diff --git a/network-poc/Cargo.toml b/network-poc/Cargo.toml new file mode 100644 index 0000000..5809e4a --- /dev/null +++ b/network-poc/Cargo.toml @@ -0,0 +1,7 @@ +[workspace] +resolver = "2" +members = [ + "hub", + "node", + "native-node" +] diff --git a/network-poc/Dockerfile.dev b/network-poc/Dockerfile.dev new file mode 100644 index 0000000..c70fe2b --- /dev/null +++ b/network-poc/Dockerfile.dev @@ -0,0 +1,14 @@ +FROM rust:slim + +# Asenna Wasm-packia ja WebGPU:ta varten tarvittavat kirjastot +RUN apt-get update && apt-get install -y \ + curl \ + pkg-config \ + libssl-dev \ + g++ \ + && rm -rf /var/lib/apt/lists/* + +# Lataa nopeasti suoraan valmis wasm-pack -binääri +RUN curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh + +WORKDIR /app diff --git a/network-poc/Dockerfile.native-node b/network-poc/Dockerfile.native-node new file mode 100644 index 0000000..2329389 --- /dev/null +++ b/network-poc/Dockerfile.native-node @@ -0,0 +1,30 @@ +FROM rust:slim AS builder + +RUN apt-get update && apt-get install -y \ + pkg-config libssl-dev g++ \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app +COPY Cargo.toml Cargo.lock ./ +COPY hub/Cargo.toml hub/Cargo.toml +COPY node/Cargo.toml node/Cargo.toml +COPY native-node/Cargo.toml native-node/Cargo.toml + +# Tyhjät src-tiedostot riippuvuuksien esikääntämistä varten +RUN mkdir -p hub/src node/src native-node/src \ + && echo "fn main(){}" > hub/src/main.rs \ + && echo "" > node/src/lib.rs \ + && echo "fn main(){}" > native-node/src/main.rs \ + && cargo build --release -p native-node 2>/dev/null || true + +COPY native-node/src native-node/src +RUN cargo build --release -p native-node + +FROM debian:bookworm-slim +RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* +COPY --from=builder /app/target/release/native-node /usr/local/bin/native-node + +ENV HUB_URL=ws://hub:3000/ws +ENV ALLOCATED_GB=4 + +CMD ["native-node"] diff --git a/network-poc/Dockerfile.prod b/network-poc/Dockerfile.prod new file mode 100644 index 0000000..f833593 --- /dev/null +++ b/network-poc/Dockerfile.prod @@ -0,0 +1,26 @@ +FROM rust:slim AS builder + +RUN apt-get update && apt-get install -y \ + curl pkg-config libssl-dev g++ \ + && rm -rf /var/lib/apt/lists/* + +RUN curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh + +WORKDIR /app +COPY . . + +# Rakenna Wasm-paketti +RUN cd node && wasm-pack build --target web --out-dir ../static/pkg + +# Rakenna Hub release-binääri +RUN cargo build --release -p hub + +FROM debian:bookworm-slim +RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* + +COPY --from=builder /app/target/release/hub /usr/local/bin/hub +COPY --from=builder /app/static /app/static + +WORKDIR /app +EXPOSE 3000 +CMD ["hub"] diff --git a/network-poc/README.md b/network-poc/README.md new file mode 100644 index 0000000..5577736 --- /dev/null +++ b/network-poc/README.md @@ -0,0 +1,75 @@ +# Kipinä Agentic Network PoC (WebGPU Edition) + +Tämä on hajautetun tekoälylaskennan (Agentic Compute) kokeilulaboratorio. Projekti koostuu Rust-pohjaisesta keskuksesta (Hub) ja selainpohjaisista työntekijöistä (Nodet), jotka suorittavat tekoälytensoreiden matriisilaskentaa **WebGPU**-rajapintaa ja **Burn AI** -koneoppimiskirjastoa hyödyntäen. + +Normaalin keskitetyn palvelimen sijaan tämä kokeilu hyödyntää selaimeen kytkettyjen lukemattomien laitteiden vapaana olevaa tehokapasiteettia hajautetusti P2P-tyylillä. + +## Kuinka käynnistää projekti paikallisesti + +1. **Rakenna solmun WebAssembly-binääri** +Paketoi Rust WebAssemblyksi (vaatii `wasm-pack`-työkalun): +```bash +cd node +wasm-pack build --target web --out-dir ../static/pkg +``` + +2. **Käynnistä Hub-Keskuspalvelin** +```bash +cd hub +cargo run +``` +Palvelin lähtee pyörimään ja tarjoamaan sekä WebSocket-reititintä että staattista Dashboard-sivustoa lokaalisti portissa `3000`. + +--- + +## ⚠️ WebGPU Ota-Käyttöön -ohjeet (Linux / Mac / Win) + +Selainvalmistajat rajoittavat tällä hetkellä uuden WebGPU-rajapinnan hardware-yhteyttä (fyysiseen näytönohjaimeen) turvallisuus- ja vakaussyistä, erityisesti Linuxin Wayland-ympäristöissä (kuten Pop!_OS, Ubuntu). + +Päästäksesi hyödyntämään solmun laskentatehoa selaimesi ja tietokoneesi näytönohjaimen läpi, joudut todennäköisesti pakottamaan sen käyntiin. + +### Chromium-pohjaiset selaimet (Google Chrome, Brave, Chromium) + +**Vaihtoehto 1: Käynnistys lipuilla (Suositeltu Linuxille ja Waylandille)** +Jos Chromesi tuottaa Wasm-kaatumisia tai väittää ettei adapteria löydy, laitteesi Wayland-palvelin estää Vulkan-rajapinnan oletuksena. Käynnistä selaimesi komentoriviltä pakottamalla vanha X11-ikkunointi ja Vulkan: + +```bash +# Google Chrome +google-chrome --enable-unsafe-webgpu --enable-features=Vulkan --ignore-gpu-blocklist --use-angle=vulkan --ozone-platform=x11 + +# Brave Browser +brave-browser --enable-unsafe-webgpu --enable-features=Vulkan --ignore-gpu-blocklist --use-angle=vulkan --ozone-platform=x11 + +# Chromium +chromium-browser --enable-unsafe-webgpu --enable-features=Vulkan --ignore-gpu-blocklist --use-angle=vulkan --ozone-platform=x11 +``` + +*(Voit halutessasi testata puhdasta testi-ikkunaa erillisen profiilin kera, lisäämällä perään `--user-data-dir=/tmp/kipin-webgpu-test` jottei asetus sotke tai ohjaudu vanhaan auki olevaan sessioosi).* + +**Vaihtoehto 2: Sisäänrakennetun Flagin kääntö (Windows / Mac / Osittain Linux)** +1. Kirjoita selaimen osoiteriville `chrome://flags` (tai `brave://flags`) +2. Etsi hakusanalla **WebGPU** (Unsafe WebGPU / WebGPU Developer Features) ja vaihda tilaksi `Enabled` +3. Etsi hakusanalla **Vulkan** ja vaihda tilaan `Enabled` +4. Uudelleenkäynnistä selain pienen napin kautta. + +--- + +### Mozilla Firefox + +Firefox tukee WebGPU:ta toistaiseksi vahvasti vain Nightly-versioissa, mutta sitä voi yrittää aktivoida Config-asetuksista. +1. Kirjoita osoiteriville `about:config` ja ymmärrä riskit. +2. Etsi `dom.webgpu.enabled` ja tuplaklikkaa arvoksi `true`. +3. Etsi `gfx.webrender.all` ja aseta se `true`. +4. Uudelleenkäynnistä Firefox. + +*(Huomio Linux-käyttäjille: Firefox saattaa edellyttää MOZ_ENABLE_WAYLAND ympäristömuuttujaa).* + +--- + +### Apple Safari (Mac) + +Apple käyttää konepellin alla vahvaa omaa Metal-rajapintaansa ja tukee WebGPU:ta uudemmissa Safari-versioissa kehittäjäasetusten takaa: +1. Varmista ensin Safarin asetuksista (Preferences -> Advanced) , että ruutu on ruksittu kohdasta `"Show Develop menu in menu bar"`. +2. Valitse yläpalkista avautuva **Develop**-valikko -> **Feature Flags**. +3. Etsi listalta **WebGPU** ja laita siihen täppä pelastamaan tilanne. +4. Päivitä Dashboard-sivu. diff --git a/network-poc/REQUIREMENTS.md b/network-poc/REQUIREMENTS.md new file mode 100644 index 0000000..48d3ba1 --- /dev/null +++ b/network-poc/REQUIREMENTS.md @@ -0,0 +1,59 @@ +# Agentic Office - Kipinä Hajautettu Verkkoprojekti + +Tässä on kooste projektin vaatimuksista, työtehtävistä ja niiden nykytilanteesta. Tämä dokumentti on jatkuvasti päivittyvä kuvaus siitä, mitä tavoitellaan ja mitä on jo tehty. + +## 🚀 Vaihe 1: Rust + Wasm Selain-Nodet (Selainpohjainen P2P) + +### Tavoitteet +- Madaltaa käyttäjän osallistumiskynnys "yhteen klikkaukseen". +- Selainkäyttäjien verkkolaitteen WebGPU:ta hyödyntävä asynkroninen Rust-solmu (Wasm). +- WebSocket-yhteys julkiseen Hubiin, joka jakelee matematiikka/AI-laskentaa eteenpäin Nodeille. + +### Tehtävät +- [x] Rakenna axum-pohjainen Rust-reititin ja Hub Server (portti 3000) +- [x] Luo Wasm-käännettävä Rust-kirjasto selaimen Node-agentteja varten +- [x] Ota onnistuneesti käyttöön task-jono WebSocketin yli nodejen kanssa +- [x] Pakkaa solmu yhteen helposti levitettävään muotoon (wasm-pack -> static/index.html) +- [x] Tee yksinkertainen kuormalaskenta-algoritmi (Fibonacci) konseptin todentamiseksi +- [x] Tallenna tulokset asynkronisesti käyttöjärjestelmään ja takaisin weppiin + +--- + +## 🚀 Vaihe 2: Kipinä.studio taustajärjestelmä + +### Tavoitteet +- Luoda kunnollinen työjono (Job Queue). +- Sijoittaa Hub-palvelin julkisesti saatavuusosoitteeseen `kipina.studio`. + +### Tehtävät +- [ ] Tuotantopalvelimen käyttöönotto Nginxin tai Docker-compose kautta ehtojen täytyttyä +- [ ] Turvamekanismin lisäys: Varmistetaan, ettei kukaan lähetä "falskeja" vastauksia nodeilta +- [ ] Solmuille rekisteröitymismekanismi tai tulostaulukko + +--- + +## 🚀 Vaihe 3 & 4: WebGPU ja Klusterin Statistiikka (VALMIS!) + +### Tavoitteet +- Korvata simppeli Fibonacci-luuppi aidoimmilla AI-tensoreilla ja laitekiihdytyksellä Burn-kirjastoa (WebGPU) apuna käyttäen. +- Valvoa ja suojella solmujen tehoa reaaliajassa. + +### Tehtävät +- [x] Integroi `burn-wgpu` ja `burn-core` (v0.14.0) kääntymään Wasm-pakettiin +- [x] Valmistele laskettavien tensoreiden välitys Hubilta laitteiston Metal/WebGPU -muistiin +- [x] Koodaa Hubiin logiikka (Broadcast), joka yhdistää jokaisen solmun "4 GB" (oletuksena Mac-koneille) VRAM:in ja julkaisee summan Dashboardiin. +- [x] Teollisuustason GPU "Duty Cycle" Throttling: JS Slider (25%-100%) jarruttaa raskaan WebGPU-ajon välitöntä syöttöä tauottaen laitteistoa ja suojellen käyttöjärjestelmää ylikuormittumukselta. + +--- + +## 🚀 Vaihe 5: Aito Agentic Compute (Micro-LLM Tekstigeneraatio Verkossa) + +### Tavoitteet +- Korvata kokeellinen kymmenien tuhansien alkioiden pelkkä satunnais-matriisilaskenta (C=A*B) aidolla asynkronisella LLM-mallilla (esim. Llama-3 1B kvantisoituna / vastaava Transformer). +- Kyetä lataamaan selaimen IndexedDB:hen satojen megatavujen painot massivisena fetch-hakuna, kääntää ne WebGPU-puskureihin (Buffers) ja suorittaa tekstigeneraatiota etänä ohjattuna verkosta käsin WebSocketia myöden. + +### Tehtävät +- [ ] Refaktoroi Wasm-Noden (Burn.rs) paketti tuomaan Text-Tokenizerit (esim. BPE) ja kielimallin arkkitehtuuri käyttöön +- [ ] Koodaa Nodeen logiikka hakea / kasata mallin painot välimuistista "Chunk"-lohkoina valmiiksi +- [ ] Hub uudistetaan generoimaan pelkkien matikkavaikeuksien sijasta Text Prompts (esim. "Kirjoita haiku Suomesta") ja reitittämään työkuorman vapaalle solmulle +- [ ] Kipinän käyttöliittymään Chat-ikkuna Hubin striimaamien tulossanojen tarkkailuun reaaliajassa diff --git a/network-poc/USER-README.md b/network-poc/USER-README.md new file mode 100644 index 0000000..57dc901 --- /dev/null +++ b/network-poc/USER-README.md @@ -0,0 +1,117 @@ +# Kipinä Agentic Network — Hajautettu AI-laskentaverkko + +Kipinä Agentic Network on hajautettu tekoälylaskentaverkko, jossa selaimet ja natiivit solmut tarjoavat GPU-laskentatehoa yhteiseen käyttöön. Hub-palvelin koordinoi tehtäviä ja solmut suorittavat ne WebGPU:lla tai CPU:lla. + +## Miten se toimii + +``` +┌──────────┐ WebSocket ┌──────────┐ WebSocket ┌──────────────┐ +│ Selain │◄─────────────────►│ Hub │◄─────────────────►│ Natiivi-node │ +│ (Wasm) │ tehtävät/tulokset│ (Axum) │ tehtävät/tulokset│ (Rust) │ +│ WebGPU │ │ :3000 │ │ NVML/sysinfo │ +└──────────┘ └──────────┘ └──────────────┘ + │ │ + └── CPU fallback (NdArray) └── Dashboard + statistiikat + jos WebGPU ei tuettu +``` + +**Hub** jakaa tokenisointitehtäviä satunnaisesti 10 sekunnin välein. Solmut tokenisoivat syötteen Qwen2.5-Coder-tokenizerin avulla ja palauttavat tuloksen. Hub näyttää tulokset terminaalissa ja välittää ne dashboardiin. + +## Kaksi tapaa osallistua verkkoon + +### 1. Selainsolmu (Wasm + WebGPU) +- Avaa `http://localhost:3000` selaimessa ja klikkaa "Liity laskentaverkkoon" +- Selain tunnistaa automaattisesti WebGPU-tuen — jos ei löydy, käytetään CPU-fallbackia +- Tokenizer ladataan HuggingFacesta ensimmäisellä kerralla ja tallennetaan IndexedDB:hen +- GPU-kuormitusta voi säätää sliderilla (0–75 %) + +### 2. Natiivi-node (Rust + NVML) +- Kerää nvidia-smi-tason laitteistotiedot: GPU-nimi, VRAM, lämpötila, kuormitus +- Raportoi järjestelmätiedot: CPU-malli, ytimet, RAM, OS +- Yhdistää hubiin ja vastaanottaa tehtäviä + +## Käynnistys + +### Docker Compose (suositeltu) + +```bash +# Hub + selainsolmu +docker compose up + +# Hub + selainsolmu + natiivi-node (vaatii nvidia-container-toolkit) +docker compose --profile native up +``` + +Dashboard avautuu osoitteessa http://localhost:3000 + +### Ilman Dockeria + +```bash +# 1. Rakenna Wasm-paketti (vaatii wasm-pack) +cd node && wasm-pack build --target web --out-dir ../static/pkg && cd .. + +# 2. Käynnistä hub (terminaali 1) +cargo run -p hub + +# 3. Avaa selain: http://localhost:3000 + +# 4. Valinnainen: natiivi-node (terminaali 2) +HUB_URL=ws://localhost:3000/ws ALLOCATED_GB=4 cargo run -p native-node +``` + +## WebGPU-asetukset selaimessa + +WebGPU ei ole oletuksena päällä kaikissa selaimissa. Jos "Liity laskentaverkkoon" -nappi käynnistää CPU-fallbackin vaikka koneessa on näytönohjain: + +**Chrome / Brave (Linux + Wayland):** +```bash +google-chrome --enable-unsafe-webgpu --enable-features=Vulkan --ignore-gpu-blocklist --use-angle=vulkan --ozone-platform=x11 +``` + +**Chrome / Brave (Windows / Mac):** +1. Avaa `chrome://flags` +2. Ota käyttöön "WebGPU" ja "Vulkan" +3. Käynnistä selain uudelleen + +**Firefox:** `about:config` → `dom.webgpu.enabled` = `true` + +**Safari:** Develop → Feature Flags → WebGPU + +## Projektin rakenne + +``` +network-poc/ +├── hub/ # Keskuspalvelin (Rust + Axum) +│ └── src/main.rs # WebSocket-reititin, tehtävien jakelu, statistiikat +├── node/ # Selainsolmu (Rust → Wasm) +│ └── src/ +│ ├── lib.rs # WebGPU/NdArray-laskenta, tokenisaatio, WS-yhteys +│ └── storage.rs # IndexedDB-välimuisti (tokenizer) +├── native-node/ # Natiivi-solmu (Rust) +│ └── src/main.rs # NVML GPU-tunnistus, sysinfo, WS-yhteys +├── static/ +│ ├── index.html # Dashboard-käyttöliittymä +│ └── pkg/ # Wasm-build (generoidaan) +├── docker-compose.yml +├── Dockerfile.dev # Hub + Wasm-build +└── Dockerfile.native-node +``` + +## Ympäristömuuttujat + +| Muuttuja | Oletus | Kuvaus | +|---|---|---| +| `HUB_URL` | `ws://hub:3000/ws` | Hub-palvelimen WebSocket-osoite (native-node) | +| `ALLOCATED_GB` | `4` | Solmun varaama muisti verkosta (GB) | + +## Kehitysvaihe + +Tämä on proof-of-concept. Toimivat osat: +- Hub-palvelin, WebSocket-viestintä, dashboard +- WebGPU-tensorilaskenta selaimessa (Burn + Wgpu) +- CPU-fallback selaimissa ilman WebGPU-tukea (Burn + NdArray) +- Natiivi-node nvidia-smi-tason laitteistotiedoilla +- Qwen2.5-Coder-tokenizer + IndexedDB-välimuisti +- GPU-kuormituksen säätö (duty cycle throttling) + +Seuraavaksi: oikea LLM-inferenssi hajautetusti (mallin painojen lataus, transformer-arkkitehtuuri Wasm/WebGPU:lla). diff --git a/network-poc/docker-compose.yml b/network-poc/docker-compose.yml new file mode 100644 index 0000000..46d4c4b --- /dev/null +++ b/network-poc/docker-compose.yml @@ -0,0 +1,34 @@ +services: + agentic-poc: + build: + context: . + dockerfile: Dockerfile.dev + container_name: agentic_poc_dev + ports: + - "3000:3000" + volumes: + - .:/app + # Käännetään aina käynnistyksen yhteydessä varmuuden vuoksi Wasm uusimmista koodeista, ja päälle pyöräytetään Hub! + command: bash -c "cd node && wasm-pack build --target web --out-dir ../static/pkg && cd ../hub && cargo run" + + # Valinnainen natiivi-solmu — kerää oikeat laitteistotiedot (nvidia-smi-taso) + native-node: + build: + context: . + dockerfile: Dockerfile.native-node + container_name: kipina_native_node + environment: + - HUB_URL=ws://agentic-poc:3000/ws + - ALLOCATED_GB=4 + depends_on: + - agentic-poc + # GPU passthrough (valinnainen — toimii myös ilman) + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: all + capabilities: [gpu] + profiles: + - native diff --git a/network-poc/hub/Cargo.toml b/network-poc/hub/Cargo.toml new file mode 100644 index 0000000..38d3398 --- /dev/null +++ b/network-poc/hub/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "hub" +version = "0.1.0" +edition = "2021" + +[dependencies] +axum = { version = "0.7.4", features = ["ws", "macros"] } +tokio = { version = "1.36.0", features = ["full"] } +tower-http = { version = "0.5.2", features = ["fs", "cors", "trace"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +uuid = { version = "1.7.0", features = ["v4", "serde"] } +futures = "0.3" diff --git a/network-poc/hub/src/main.rs b/network-poc/hub/src/main.rs new file mode 100644 index 0000000..a59d724 --- /dev/null +++ b/network-poc/hub/src/main.rs @@ -0,0 +1,426 @@ +use axum::{ + extract::ws::{Message, WebSocket, WebSocketUpgrade}, + extract::ConnectInfo, + response::IntoResponse, + routing::get, + Router, +}; +use futures::{sink::SinkExt, stream::StreamExt}; +use std::collections::HashMap; +use std::net::{IpAddr, SocketAddr}; +use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast; +use tower_http::services::ServeDir; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +const MAX_MESSAGE_SIZE: usize = 16 * 1024; + +// Sallitut originit — estää cross-site WebSocket hijackingin +const ALLOWED_ORIGINS: &[&str] = &[ + "https://kipina.studio", + "http://localhost:3000", + "http://127.0.0.1:3000", +]; + +// Sallitut viestityyypit clientilta +const ALLOWED_MSG_TYPES: &[&str] = &["auth", "result", "pair_done", "llm_chunk"]; + +struct AppState { + next_node_id: Mutex, + nodes_vram: Mutex>, + total_tasks: Mutex, + stats_tx: broadcast::Sender, + // IP-rajoitus: max 2 yhteyttä per IP (dashboard-UI + selainsolmu) + ip_connections: Mutex>, + // Node ID → IP -mappaus (siivousta varten) + node_ips: Mutex>, +} + +#[tokio::main] +async fn main() { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "hub=debug,tower_http=debug".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + + let (stats_tx, _) = broadcast::channel(100); + + let state = Arc::new(AppState { + next_node_id: Mutex::new(1), + nodes_vram: Mutex::new(HashMap::new()), + total_tasks: Mutex::new(0), + stats_tx: stats_tx.clone(), + ip_connections: Mutex::new(HashMap::new()), + node_ips: Mutex::new(HashMap::new()), + }); + + let state_for_task = state.clone(); + + // Ajastin, joka jakaa satunnaisia tekoälytehtäviä eri pituuksilla + tokio::spawn(async move { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10)); + + // Kieliparit: sama semanttinen sisältö englanniksi ja suomeksi + let pairs: Vec<(&str, &str)> = vec![ + ("Tell me a joke.", "Kerro vitsi."), + ("What is Rust?", "Mikä on Rust?"), + ("Explain WebGPU briefly.", "Selitä WebGPU lyhyesti."), + ("It was a dark and stormy night, and the old sea captain began his tale:", "Oli synkkä ja myrskyinen yö, ja vanha merikapteeni aloitti tarinansa:"), + ("Artificial intelligence is transforming the world in many ways, but perhaps the most significant change is", "Tekoäly muuttaa maailmaa monella tavalla, mutta kenties merkittävin muutos on"), + ("Distributed computing in the browser is a fascinating concept because", "Hajautettu laskenta selaimessa on kiehtova konsepti, koska"), + ("By the year 2030, programmers will no longer write code by hand, instead they will", "Vuonna 2030 ohjelmoijat eivät enää kirjoita koodia käsin, vaan he"), + ("Imagine a world where every computer, phone, and tablet combines its processing power into one vast AI network. This future is closer than you think, because", "Kuvittele maailma, jossa jokainen tietokone, puhelin ja tabletti yhdistää prosessointivoimansa yhdeksi valtavaksi tekoälyverkoksi. Tämä tulevaisuus on lähempänä kuin uskotkaan, sillä"), + ("The open source movement has fundamentally changed how software is built. What started as a fringe philosophy has become the backbone of modern infrastructure, and the next frontier is", "Avoimen lähdekoodin liike on muuttanut perustavanlaatuisesti ohjelmistojen rakentamisen. Marginaalisesta filosofiasta on tullut modernin infrastruktuurin selkäranka, ja seuraava rajapyykki on"), + ]; + + let mut rng_state: u64 = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() as u64; + + loop { + interval.tick().await; + rng_state ^= rng_state << 13; + rng_state ^= rng_state >> 7; + rng_state ^= rng_state << 17; + let idx = (rng_state as usize) % pairs.len(); + let (en, fi) = pairs[idx]; + + let task_msg = serde_json::json!({ + "type": "pair_task", + "en": en, + "fi": fi, + }); + tracing::debug!("Kielipari lähetetty: EN({}) vs FI({} merkkiä)", en.len(), fi.len()); + let _ = state_for_task.stats_tx.send(task_msg.to_string()); + } + }); + + let app = Router::new() + .nest_service("/", ServeDir::new("../static")) + .route("/ws", get(ws_handler)) + .with_state(state); + + let addr = SocketAddr::from(([0, 0, 0, 0], 3000)); + tracing::debug!("Kipinä Agent Hub käynnistyy osoitteessa http://localhost:3000"); + + let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); + axum::serve( + listener, + app.into_make_service_with_connect_info::(), + ).await.unwrap(); +} + +async fn ws_handler( + ws: WebSocketUpgrade, + axum::extract::State(state): axum::extract::State>, + ConnectInfo(addr): ConnectInfo, + headers: axum::http::HeaderMap, +) -> impl IntoResponse { + // Origin-tarkistus — estää cross-site WebSocket hijackingin + if let Some(origin) = headers.get("origin").and_then(|v| v.to_str().ok()) { + if !ALLOWED_ORIGINS.iter().any(|&allowed| origin == allowed) { + tracing::warn!("Estetty yhteys väärällä originilla: {}", origin); + return ( + axum::http::StatusCode::FORBIDDEN, + "Origin not allowed", + ).into_response(); + } + } + // Origin puuttuu → natiivi-node (ei selainta), sallitaan + + let ip = headers.get("x-forwarded-for") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.split(',').next()) + .and_then(|s| s.trim().parse::().ok()) + .unwrap_or_else(|| addr.ip()); + + // Max 2 yhteyttä per IP (dashboard-UI + selainsolmu) + { + let conns = state.ip_connections.lock().unwrap(); + let count = conns.get(&ip).copied().unwrap_or(0); + if count >= 2 { + tracing::warn!("IP {} ylitti yhteysrajan ({}/2) — estetty", ip, count); + return ( + axum::http::StatusCode::TOO_MANY_REQUESTS, + "Max 2 yhteyttä per IP", + ).into_response(); + } + } + + ws.max_message_size(MAX_MESSAGE_SIZE) + .on_upgrade(move |socket| handle_socket(socket, state, ip)) + .into_response() +} + +async fn broadcast_stats(state: &Arc) { + let total_nodes; + let mut total_vram = 0; + { + let map = state.nodes_vram.lock().unwrap(); + total_nodes = map.len(); + for (_, vram) in map.iter() { + total_vram += vram; + } + } + let completed = *state.total_tasks.lock().unwrap(); + let stats_msg = serde_json::json!({ + "type": "stats", + "nodes": total_nodes, + "vram_gb": total_vram, + "tasks": completed + }); + let _ = state.stats_tx.send(stats_msg.to_string()); +} + +/// Validoi client-viesti: pakollinen "type"-kenttä, sallittu tyyppi, validi JSON +fn validate_message(text: &str) -> Result { + let json: serde_json::Value = serde_json::from_str(text) + .map_err(|_| "Ei validi JSON")?; + + let msg_type = json.get("type") + .and_then(|v| v.as_str()) + .ok_or("Puuttuva 'type'-kenttä")?; + + if !ALLOWED_MSG_TYPES.contains(&msg_type) { + return Err("Tuntematon viestityyppi"); + } + + // Tyyppikohtainen validointi + match msg_type { + "auth" => { + // allocated_gb pitää olla järkevä (0-128) + if let Some(gb) = json.get("allocated_gb").and_then(|v| v.as_u64()) { + if gb > 128 { return Err("allocated_gb liian suuri"); } + } + } + "pair_done" => { + // Pitää sisältää en ja fi -objektit + if json.get("en").is_none() || json.get("fi").is_none() { + return Err("pair_done: puuttuu en/fi"); + } + // token_count pitää olla järkevä + for lang in &["en", "fi"] { + if let Some(tc) = json.get(lang).and_then(|l| l.get("token_count")).and_then(|v| v.as_u64()) { + if tc > 10000 { return Err("token_count liian suuri"); } + } + } + } + "result" => { + // data-kenttä pitää olla olemassa + if json.get("data").is_none() && json.get("status").is_none() { + return Err("result: puuttuu data/status"); + } + } + _ => {} + } + + Ok(json) +} + +async fn handle_socket(socket: WebSocket, state: Arc, ip: IpAddr) { + // Rekisteröidään IP-yhteys + { + let mut conns = state.ip_connections.lock().unwrap(); + *conns.entry(ip).or_insert(0) += 1; + } + + let (mut sender, mut receiver) = socket.split(); + + let node_id = { + let mut next_id = state.next_node_id.lock().unwrap(); + let id = *next_id; + *next_id += 1; + id + }; + + // Tallennetaan node_id → IP -mappaus + { + state.node_ips.lock().unwrap().insert(node_id, ip); + } + + tracing::info!("Solmu {} yhdistyi osoitteesta {}", node_id, ip); + + let mut rx = state.stats_tx.subscribe(); + + let sender_task = tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(msg) => { + if sender.send(Message::Text(msg)).await.is_err() { + break; + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { + continue; + } + Err(_) => { + break; + } + } + } + }); + + // Receiver loop + while let Some(Ok(msg)) = receiver.next().await { + let text = match msg { + Message::Text(t) => t, + Message::Close(_) => break, + _ => continue, + }; + + if text.len() > MAX_MESSAGE_SIZE { + tracing::warn!("Solmu {} ({}) lähetti liian suuren viestin ({} tavua)", node_id, ip, text.len()); + continue; + } + + // Validointi + let json = match validate_message(&text) { + Ok(j) => j, + Err(reason) => { + tracing::warn!("Solmu {} ({}) lähetti virheellisen viestin: {} — {:?}", node_id, ip, reason, &text[..text.len().min(100)]); + continue; + } + }; + + let msg_type = json.get("type").and_then(|v| v.as_str()).unwrap_or(""); + + if msg_type == "auth" { + { + let allocated = json.get("allocated_gb").and_then(|v| v.as_u64()).unwrap_or(4) as u32; + let node_type = json.get("node_type").and_then(|v| v.as_str()).unwrap_or("browser"); + + { + let mut map = state.nodes_vram.lock().unwrap(); + map.insert(node_id, allocated); + } + + if node_type == "native" { + let sys = json.get("system"); + let hostname = sys.and_then(|s| s.get("hostname")).and_then(|v| v.as_str()).unwrap_or("?"); + let os = sys.and_then(|s| s.get("os")).and_then(|v| v.as_str()).unwrap_or("?"); + let cores = sys.and_then(|s| s.get("cpu_cores")).and_then(|v| v.as_u64()).unwrap_or(0); + let ram = sys.and_then(|s| s.get("ram_total_mb")).and_then(|v| v.as_u64()).unwrap_or(0); + + tracing::info!( + "Solmu {} (natiivi) | {} | {} | {} | {} ydintä | {} MB RAM | varaus: {} GB", + node_id, ip, hostname, os, cores, ram, allocated + ); + + if let Some(gpus) = json.get("gpus").and_then(|v| v.as_array()) { + for gpu in gpus { + tracing::info!( + " GPU {}: {} | VRAM: {}/{} MB | {}°C | {}%", + gpu["index"].as_u64().unwrap_or(0), + gpu["name"].as_str().unwrap_or("?"), + gpu["vram_used_mb"].as_u64().unwrap_or(0), + gpu["vram_total_mb"].as_u64().unwrap_or(0), + gpu["temperature_c"].as_u64().unwrap_or(0), + gpu["gpu_util_pct"].as_u64().unwrap_or(0), + ); + } + } + } else { + let cores = json.get("cpu_cores").and_then(|v| v.as_u64()).unwrap_or(0); + let ram = json.get("device_memory_gb").and_then(|v| v.as_f64()).unwrap_or(0.0); + let platform = json.get("platform").and_then(|v| v.as_str()).unwrap_or("?"); + let gpu_desc = json.get("gpu") + .and_then(|g| g.get("description").or_else(|| g.get("vendor"))) + .and_then(|v| v.as_str()) + .unwrap_or("ei GPU:ta"); + + tracing::info!( + "Solmu {} (selain) | {} | {} | {} ydintä | ~{} GB RAM | GPU: {} | varaus: {} GB", + node_id, ip, platform, cores, ram, gpu_desc, allocated + ); + } + } + broadcast_stats(&state).await; + + let join_msg = serde_json::json!({ + "type": "node_joined", + "node_id": node_id + }); + let _ = state.stats_tx.send(join_msg.to_string()); + + } else if msg_type == "result" { + tracing::info!("Solmu {} sai tuloksen: {}", node_id, text); + { + let mut task_count = state.total_tasks.lock().unwrap(); + *task_count += 1; + } + broadcast_stats(&state).await; + } else if msg_type == "pair_done" { + { + let mut json = json; // Siirretään omistajuus muokkausta varten + if let Some(obj) = json.as_object_mut() { + let empty = serde_json::json!({}); + let en = obj.get("en").unwrap_or(&empty); + let fi = obj.get("fi").unwrap_or(&empty); + let overhead = obj.get("overhead_pct").and_then(|v| v.as_f64()).unwrap_or(0.0); + let duration = obj.get("duration_ms").and_then(|v| v.as_u64()).unwrap_or(0); + + let en_text = en.get("text").and_then(|v| v.as_str()).unwrap_or(""); + let en_tokens = en.get("token_count").and_then(|v| v.as_u64()).unwrap_or(0); + let en_chars = en.get("char_count").and_then(|v| v.as_u64()).unwrap_or(0); + let en_cpt = en.get("chars_per_token").and_then(|v| v.as_f64()).unwrap_or(0.0); + + let fi_text = fi.get("text").and_then(|v| v.as_str()).unwrap_or(""); + let fi_tokens = fi.get("token_count").and_then(|v| v.as_u64()).unwrap_or(0); + let fi_chars = fi.get("char_count").and_then(|v| v.as_u64()).unwrap_or(0); + let fi_cpt = fi.get("chars_per_token").and_then(|v| v.as_f64()).unwrap_or(0.0); + + let overhead_color = if overhead > 10.0 { "\x1b[31m" } else if overhead < -10.0 { "\x1b[32m" } else { "\x1b[33m" }; + + println!(); + println!("\x1b[36m━━━ Solmu {} ━━━ {}ms ━━━\x1b[0m", node_id, duration); + println!(" \x1b[34mEN\x1b[0m \"{}\"", en_text); + println!(" {} merkkiä → \x1b[35m{} tokenia\x1b[0m | \x1b[32m{:.2} merkkiä/token\x1b[0m", en_chars, en_tokens, en_cpt); + println!(" \x1b[33mFI\x1b[0m \"{}\"", fi_text); + println!(" {} merkkiä → \x1b[35m{} tokenia\x1b[0m | \x1b[32m{:.2} merkkiä/token\x1b[0m", fi_chars, fi_tokens, fi_cpt); + println!(" {}Suomen ylikustannus: {:+.1}%\x1b[0m", overhead_color, overhead); + + obj.insert("node_id".to_string(), serde_json::json!(node_id)); + } + let _ = state.stats_tx.send(json.to_string()); + + { + let mut task_count = state.total_tasks.lock().unwrap(); + *task_count += 1; + } + broadcast_stats(&state).await; + } + } else if msg_type == "llm_chunk" { + { + let mut json = json; + if let Some(obj) = json.as_object_mut() { + obj.insert("node_id".to_string(), serde_json::json!(node_id)); + } + let _ = state.stats_tx.send(json.to_string()); + } + } + } + + // Yhteys katkesi — siivotaan IP-laskuri ja node-tiedot + { + let mut conns = state.ip_connections.lock().unwrap(); + if let Some(count) = conns.get_mut(&ip) { + *count = count.saturating_sub(1); + if *count == 0 { + conns.remove(&ip); + } + } + } + { + state.node_ips.lock().unwrap().remove(&node_id); + } + { + state.nodes_vram.lock().unwrap().remove(&node_id); + } + tracing::info!("Solmu {} ({}) poistui verkosta.", node_id, ip); + broadcast_stats(&state).await; + sender_task.abort(); +} diff --git a/network-poc/native-node/Cargo.toml b/network-poc/native-node/Cargo.toml new file mode 100644 index 0000000..7db596c --- /dev/null +++ b/network-poc/native-node/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "native-node" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1.36", features = ["full"] } +tokio-tungstenite = { version = "0.21", features = ["native-tls"] } +futures-util = "0.3" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +sysinfo = "0.30" +nvml-wrapper = "0.10" +wgpu = "24" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/network-poc/native-node/src/main.rs b/network-poc/native-node/src/main.rs new file mode 100644 index 0000000..411082b --- /dev/null +++ b/network-poc/native-node/src/main.rs @@ -0,0 +1,319 @@ +use futures_util::{SinkExt, StreamExt}; +use serde_json::json; +use sysinfo::System; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; + +/// GPU-tietorakenne — yhtenäinen kaikille valmistajille +struct GpuInfo { + name: String, + vendor: String, + backend: String, // "Vulkan", "Metal", "Dx12" + vram_total_mb: Option, + vram_used_mb: Option, + vram_free_mb: Option, + temperature_c: Option, + gpu_util_pct: Option, +} + +impl GpuInfo { + fn to_json(&self) -> serde_json::Value { + json!({ + "name": self.name, + "vendor": self.vendor, + "backend": self.backend, + "vram_total_mb": self.vram_total_mb, + "vram_used_mb": self.vram_used_mb, + "vram_free_mb": self.vram_free_mb, + "temperature_c": self.temperature_c, + "gpu_util_pct": self.gpu_util_pct, + }) + } +} + +/// Tunnistaa kaikki GPU:t wgpu:lla (NVIDIA/AMD/Apple/Intel) +fn collect_gpus_wgpu() -> Vec { + let instance = wgpu::Instance::new(&wgpu::InstanceDescriptor { + backends: wgpu::Backends::all(), + ..Default::default() + }); + + let mut gpus = Vec::new(); + + for adapter in instance.enumerate_adapters(wgpu::Backends::all()) { + let info = adapter.get_info(); + + // Ohitetaan CPU/software-adapterit ja OpenGL (duplikaatti) + if info.device_type == wgpu::DeviceType::Cpu { + continue; + } + if info.backend == wgpu::Backend::Gl { + continue; + } + + let vendor = match info.vendor { + 0x10DE => "NVIDIA", + 0x1002 => "AMD", + 0x8086 => "Intel", + _ => "Unknown", + }; + + let backend = match info.backend { + wgpu::Backend::Vulkan => "Vulkan", + wgpu::Backend::Metal => "Metal", + wgpu::Backend::Dx12 => "Dx12", + wgpu::Backend::Gl => "OpenGL", + _ => "?", + }; + + gpus.push(GpuInfo { + name: info.name.clone(), + vendor: vendor.to_string(), + backend: backend.to_string(), + // wgpu ei anna tarkkaa VRAM-dataa — täydennetään NVML:llä jos NVIDIA + vram_total_mb: None, + vram_used_mb: None, + vram_free_mb: None, + temperature_c: None, + gpu_util_pct: None, + }); + } + + gpus +} + +/// Täydentää NVIDIA-GPU:iden tiedot NVML:llä (VRAM, lämpötila, kuormitus) +fn enrich_nvidia_gpus(gpus: &mut [GpuInfo]) { + let Ok(nvml) = nvml_wrapper::Nvml::init() else { return }; + let Ok(count) = nvml.device_count() else { return }; + + // Yhdistetään NVML-laitteet wgpu-tunnisteisiin nimen perusteella + for i in 0..count { + let Ok(device) = nvml.device_by_index(i) else { continue }; + let nvml_name = device.name().unwrap_or_default(); + + // Etsitään vastaava GPU wgpu-listasta + if let Some(gpu) = gpus.iter_mut().find(|g| g.vendor == "NVIDIA" && g.name.contains(&nvml_name) || nvml_name.contains(&g.name)) { + if let Ok(mem) = device.memory_info() { + gpu.vram_total_mb = Some(mem.total / 1024 / 1024); + gpu.vram_used_mb = Some(mem.used / 1024 / 1024); + gpu.vram_free_mb = Some(mem.free / 1024 / 1024); + } + gpu.temperature_c = device.temperature(nvml_wrapper::enum_wrappers::device::TemperatureSensor::Gpu).ok(); + if let Ok(util) = device.utilization_rates() { + gpu.gpu_util_pct = Some(util.gpu); + } + } + } +} + +/// AMD GPU-tiedot Linuxin sysfs:stä (/sys/class/drm/) +fn enrich_amd_gpus(gpus: &mut [GpuInfo]) { + let Ok(entries) = std::fs::read_dir("/sys/class/drm") else { return }; + + for entry in entries.flatten() { + let path = entry.path(); + let name = path.file_name().unwrap_or_default().to_string_lossy().to_string(); + + // Vain renderD* tai card*-kansiot joissa on device/vendor + if !name.starts_with("card") || name.contains('-') { continue } + + let device_path = path.join("device"); + + // Tarkistetaan onko AMD (vendor 0x1002) + let vendor = std::fs::read_to_string(device_path.join("vendor")).unwrap_or_default(); + if !vendor.trim().contains("0x1002") { continue } + + // VRAM (mem_info_vram_total on tavuissa) + let vram_total = read_sysfs_u64(&device_path.join("mem_info_vram_total")); + let vram_used = read_sysfs_u64(&device_path.join("mem_info_vram_used")); + + // Lämpötila (hwmon) + let temp = find_hwmon_temp(&device_path); + + // GPU-kuormitus + let busy = read_sysfs_u64(&device_path.join("gpu_busy_percent")); + + // Etsitään vastaava GPU wgpu-listasta + if let Some(gpu) = gpus.iter_mut().find(|g| g.vendor == "AMD" && g.vram_total_mb.is_none()) { + gpu.vram_total_mb = vram_total.map(|v| v / 1024 / 1024); + gpu.vram_used_mb = vram_used.map(|v| v / 1024 / 1024); + gpu.vram_free_mb = match (vram_total, vram_used) { + (Some(t), Some(u)) => Some((t - u) / 1024 / 1024), + _ => None, + }; + gpu.temperature_c = temp.map(|t| t as u32); + gpu.gpu_util_pct = busy.map(|b| b as u32); + } + } +} + +fn read_sysfs_u64(path: &std::path::Path) -> Option { + std::fs::read_to_string(path).ok()?.trim().parse().ok() +} + +fn find_hwmon_temp(device_path: &std::path::Path) -> Option { + let hwmon_dir = device_path.join("hwmon"); + let entries = std::fs::read_dir(&hwmon_dir).ok()?; + for entry in entries.flatten() { + let temp_path = entry.path().join("temp1_input"); + if let Some(millideg) = read_sysfs_u64(&temp_path) { + return Some(millideg / 1000); // millidegrees → degrees + } + } + None +} + +/// Apple GPU-tiedot — wgpu/Metal antaa nimen, tarkempaa dataa ei saa ilman IOKit:ia +/// mutta Metal adapter_info sisältää jo olennaiset tiedot +fn enrich_apple_gpus(gpus: &mut [GpuInfo]) { + // Apple Silicon -koneiden unified memory: koko RAM on GPU:n käytettävissä + // Arvioidaan system RAM:sta + if gpus.iter().any(|g| g.vendor == "Unknown" && g.name.contains("Apple")) { + let mut sys = System::new(); + sys.refresh_memory(); + let total_ram_mb = sys.total_memory() / 1024 / 1024; + + for gpu in gpus.iter_mut().filter(|g| g.name.contains("Apple")) { + gpu.vendor = "Apple".to_string(); + // Apple Silicon: unified memory, GPU voi käyttää ~75% kokonaismuistista + gpu.vram_total_mb = Some(total_ram_mb * 3 / 4); + // Tarkkaa käyttö- ja lämpötiladataa ei saa ilman IOKit:ia + } + } +} + +/// Kerää kaikki GPU:t ja täydentää valmistajakohtaiset tiedot +fn collect_all_gpus() -> Vec { + let mut gpus = collect_gpus_wgpu(); + + enrich_nvidia_gpus(&mut gpus); + enrich_amd_gpus(&mut gpus); + enrich_apple_gpus(&mut gpus); + + gpus +} + +/// Kerää järjestelmätiedot (CPU, RAM, OS) +fn collect_system_info() -> serde_json::Value { + let mut sys = System::new_all(); + sys.refresh_all(); + + json!({ + "hostname": System::host_name().unwrap_or_default(), + "os": format!("{} {}", System::name().unwrap_or_default(), System::os_version().unwrap_or_default()), + "cpu_cores": sys.cpus().len(), + "cpu_model": sys.cpus().first().map(|c| c.brand().to_string()).unwrap_or_default(), + "ram_total_mb": sys.total_memory() / 1024 / 1024, + "ram_used_mb": sys.used_memory() / 1024 / 1024, + }) +} + +/// Koko auth-viesti hubille +fn build_auth_message(allocated_gb: u32) -> String { + let sys = collect_system_info(); + let gpus = collect_all_gpus(); + + let gpu_json: Vec = gpus.iter().enumerate().map(|(i, g)| { + let mut v = g.to_json(); + v.as_object_mut().unwrap().insert("index".to_string(), json!(i)); + v + }).collect(); + + let mut msg = json!({ + "type": "auth", + "status": "agent_ready", + "node_type": "native", + "allocated_gb": allocated_gb, + "system": sys, + }); + + if !gpu_json.is_empty() { + msg.as_object_mut().unwrap().insert("gpus".to_string(), json!(gpu_json)); + } + + msg.to_string() +} + +fn format_optional(val: Option, suffix: &str) -> String { + match val { + Some(v) => format!("{}{}", v, suffix), + None => "?".to_string(), + } +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt() + .with_env_filter("native_node=debug") + .init(); + + let hub_url = std::env::var("HUB_URL").unwrap_or_else(|_| "ws://hub:3000/ws".to_string()); + let allocated_gb: u32 = std::env::var("ALLOCATED_GB") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(4); + + tracing::info!("Kipinä Native Node käynnistyy — hub: {}, varaus: {} GB", hub_url, allocated_gb); + + let sys = collect_system_info(); + tracing::info!("Järjestelmä: {} | {} | {} ydintä | {} MB RAM", + sys["hostname"].as_str().unwrap_or("?"), + sys["os"].as_str().unwrap_or("?"), + sys["cpu_cores"], + sys["ram_total_mb"] + ); + + let gpus = collect_all_gpus(); + if gpus.is_empty() { + tracing::info!("GPU:ta ei havaittu — toimitaan CPU-moodissa"); + } else { + for (i, gpu) in gpus.iter().enumerate() { + tracing::info!("GPU {}: {} ({}) [{}] | VRAM: {}/{} MB | {} | kuormitus: {}", + i, + gpu.name, + gpu.vendor, + gpu.backend, + format_optional(gpu.vram_used_mb, ""), + format_optional(gpu.vram_total_mb, ""), + format_optional(gpu.temperature_c, "°C"), + format_optional(gpu.gpu_util_pct, "%"), + ); + } + } + + // Yhdistetään hubiin — yritetään uudelleen katkon sattuessa + loop { + match connect_async(&hub_url).await { + Ok((ws_stream, _)) => { + tracing::info!("Yhdistetty hubiin!"); + let (mut write, mut read) = ws_stream.split(); + + let auth = build_auth_message(allocated_gb); + if write.send(Message::Text(auth)).await.is_err() { + tracing::error!("Auth-viestin lähetys epäonnistui"); + continue; + } + + while let Some(Ok(msg)) = read.next().await { + if let Message::Text(text) = msg { + if text.contains("pair_task") || text.contains("ai_task") { + tracing::debug!("Tehtävä vastaanotettu: {}", &text[..text.len().min(80)]); + let reply = json!({ + "type": "result", + "status": "success", + "data": "native-node: ei vielä laskentaa" + }); + let _ = write.send(Message::Text(reply.to_string())).await; + } + } + } + tracing::warn!("Yhteys hubiin katkesi — yritetään uudelleen 5s..."); + } + Err(e) => { + tracing::warn!("Hubiin yhdistäminen epäonnistui: {} — yritetään uudelleen 5s...", e); + } + } + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + } +} diff --git a/network-poc/node/Cargo.toml b/network-poc/node/Cargo.toml new file mode 100644 index 0000000..c52ea08 --- /dev/null +++ b/network-poc/node/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "node" +version = "0.1.0" +edition = "2021" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +wasm-bindgen = "0.2.91" +js-sys = "0.3.68" +web-sys = { version = "0.3.68", features = [ + "Window", + "Document", + "HtmlElement", + "WebSocket", + "MessageEvent", + "console", +] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +burn = { version = "0.14.0", features = ["wgpu", "ndarray"] } +burn-wgpu = "0.14.0" +burn-ndarray = "0.14.0" +wasm-bindgen-futures = "0.4" +console_error_panic_hook = "0.1.7" +reqwest = { version = "0.12", default-features = false, features = ["json"] } +tokenizers = { version = "0.19.1", default-features = false, features = ["unstable_wasm"] } +rexie = "0.6" +log = "0.4" + diff --git a/network-poc/node/src/lib.rs b/network-poc/node/src/lib.rs new file mode 100644 index 0000000..a138e53 --- /dev/null +++ b/network-poc/node/src/lib.rs @@ -0,0 +1,235 @@ +use wasm_bindgen::prelude::*; +use web_sys::{console, WebSocket, MessageEvent}; +use std::cell::RefCell; +use std::rc::Rc; +use std::sync::atomic::{AtomicU32, AtomicBool, Ordering}; +use burn::tensor::Tensor; +use burn::backend::{Wgpu, NdArray}; + +pub mod storage; + +macro_rules! console_log { + ($($t:tt)*) => (console::log_1(&format_args!($($t)*).to_string().into())) +} + +// Globaali muuttuja GPU Load Sliderille (25-100%) +static GPU_LOAD_PERCENT: AtomicU32 = AtomicU32::new(50); +// Onko WebGPU käytettävissä — asetetaan JS-puolelta käynnistyksessä +static HAS_WEBGPU: AtomicBool = AtomicBool::new(true); + +#[wasm_bindgen] +pub fn set_gpu_load(load: u32) { + GPU_LOAD_PERCENT.store(load, Ordering::SeqCst); + console_log!("[Wasm] GPU Kuormitusraja vaihdettu -> {}%", load); +} + +// Asynkroninen odotus WebAssemblylle +async fn sleep_ms(ms: i32) { + let promise = js_sys::Promise::new(&mut |resolve, _| { + web_sys::window() + .unwrap() + .set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, ms) + .unwrap(); + }); + let _ = wasm_bindgen_futures::JsFuture::from(promise).await; +} + +// Geneerinen tensorilaskenta — toimii millä tahansa Burn-backendillä +fn run_matmul(size: usize) -> String { + let device = Default::default(); + let dist = burn::tensor::Distribution::Default; + let t1: Tensor = Tensor::random([size, size], dist, &device); + let t2: Tensor = Tensor::random([size, size], dist, &device); + let sum = t1.matmul(t2).sum(); + format!("{:?}", sum) +} + +// Päättelyfunktio — valitsee backendin automaattisesti +async fn run_ai_tensor_inference(difficulty: usize) -> String { + let load_pct = GPU_LOAD_PERCENT.load(Ordering::SeqCst); + + if load_pct == 0 { + sleep_ms(2000).await; + return format!("Paused (0%). Lepäillään zZz.."); + } + + let active_workload_size = (difficulty as f32 * (load_pct as f32 / 100.0)) as usize; + + let sleep_delay = (100 - load_pct) * 10; + if sleep_delay > 0 { + sleep_ms(sleep_delay as i32).await; + } + + let use_gpu = HAS_WEBGPU.load(Ordering::SeqCst); + let (backend_name, result) = if use_gpu { + ("WebGPU", run_matmul::(active_workload_size)) + } else { + ("CPU/NdArray", run_matmul::(active_workload_size)) + }; + + format!("PoC {} Matmul ({}x{}) >> {}", backend_name, active_workload_size, active_workload_size, result) +} + +/// Tokenisoi yhden tekstin ja palauttaa metriikat +fn tokenize_text(tokenizer: &tokenizers::Tokenizer, text: &str) -> serde_json::Value { + let char_count = text.chars().count(); + let word_count = text.split_whitespace().count(); + + if let Ok(encoding) = tokenizer.encode(text, true) { + let token_count = encoding.get_ids().len(); + let cpt = if token_count > 0 { char_count as f32 / token_count as f32 } else { 0.0 }; + let tokens: Vec = encoding.get_ids().iter().filter_map(|&id| { + tokenizer.decode(&[id], true).ok() + }).collect(); + + serde_json::json!({ + "text": text, + "char_count": char_count, + "word_count": word_count, + "token_count": token_count, + "chars_per_token": (cpt * 100.0).round() / 100.0, + "tokens": tokens, + }) + } else { + serde_json::json!({ + "text": text, + "char_count": char_count, + "word_count": word_count, + "token_count": word_count, + "chars_per_token": 0, + "tokens": [], + }) + } +} + +/// Tokenisoi en/fi-parin, vertaa tehokkuutta ja lähettää tuloksen hubille +async fn run_pair_comparison(en_text: String, fi_text: String, ws: Rc>) { + let load_pct = GPU_LOAD_PERCENT.load(Ordering::SeqCst); + if load_pct == 0 { return; } + + let cached_tok = storage::load_from_idb("tokenizer.json").await.unwrap_or(None); + let Some(bytes) = cached_tok else { + console_log!("[Tokenizer] Ei vielä ladattu — ohitetaan pari"); + return; + }; + let Ok(tokenizer) = tokenizers::Tokenizer::from_bytes(&bytes) else { + console_log!("[Tokenizer] Parsinta epäonnistui"); + return; + }; + + let start_time = js_sys::Date::now(); + let en_result = tokenize_text(&tokenizer, &en_text); + let fi_result = tokenize_text(&tokenizer, &fi_text); + let duration = (js_sys::Date::now() - start_time) as u64; + + let en_cpt = en_result["chars_per_token"].as_f64().unwrap_or(0.0); + let fi_cpt = fi_result["chars_per_token"].as_f64().unwrap_or(0.0); + let en_tokens = en_result["token_count"].as_u64().unwrap_or(0); + let fi_tokens = fi_result["token_count"].as_u64().unwrap_or(0); + + // Token-ylikustannus: kuinka monta % enemmän tokeneita suomi tarvitsee + let overhead_pct = if en_tokens > 0 { + ((fi_tokens as f64 / en_tokens as f64) - 1.0) * 100.0 + } else { 0.0 }; + + console_log!("EN: {} tokenia ({:.2} m/t) vs FI: {} tokenia ({:.2} m/t) | ylikustannus: {:.0}%", + en_tokens, en_cpt, fi_tokens, fi_cpt, overhead_pct); + + let pair_done = serde_json::json!({ + "type": "pair_done", + "en": en_result, + "fi": fi_result, + "overhead_pct": (overhead_pct * 10.0).round() / 10.0, + "duration_ms": duration, + "tokenizer": "Qwen2.5-Coder-0.5B", + }); + let _ = ws.borrow().send_with_str(&pair_done.to_string()); +} + +#[wasm_bindgen] +pub async fn start_agent_node(hub_url: String, has_webgpu: bool, device_info_json: String) -> Result<(), JsValue> { + console_error_panic_hook::set_once(); + + HAS_WEBGPU.store(has_webgpu, Ordering::SeqCst); + let backend_name = if has_webgpu { "WebGPU" } else { "CPU (NdArray)" }; + console_log!("Kipinä Agent Node käynnistyy — backend: {}", backend_name); + + let device_info = device_info_json.clone(); + + wasm_bindgen_futures::spawn_local(async move { + console_log!("[Storage] Tarkistetaan IndexedDB Qwen2.5-Coder Tokenizeria..."); + let cached_tokenizer = storage::load_from_idb("tokenizer.json").await.unwrap_or(None); + if let Some(tok_bytes) = cached_tokenizer { + console_log!("[Storage] Tokenizer löytyi välimuistista! Koko: {} tavua", tok_bytes.len()); + } else { + console_log!("[Storage] Ei välimuistia. Ladataan HF:stä... Odota selaimen Network-välilehdellä."); + if let Ok(resp) = reqwest::get("https://huggingface.co/Qwen/Qwen2.5-Coder-0.5B/resolve/main/tokenizer.json").await { + if let Ok(bytes) = resp.bytes().await { + console_log!("[Storage] Tallennetaan {}-tavuinen tiedosto IndexedDB:hen pysyvästi...", bytes.len()); + let _ = storage::save_to_idb("tokenizer.json", &bytes).await; + console_log!("[Storage] Tallennettu!"); + } + } + } + }); + + let ws = WebSocket::new(&hub_url)?; + let ws_clone = Rc::new(RefCell::new(ws)); + let ws_clone_2 = ws_clone.clone(); + + let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| { + if let Ok(txt) = e.data().dyn_into::() { + let msg: String = txt.into(); + + if msg.contains("pair_task") { + if let Ok(task) = serde_json::from_str::(&msg) { + let en = task.get("en").and_then(|v| v.as_str()).unwrap_or("").to_string(); + let fi = task.get("fi").and_then(|v| v.as_str()).unwrap_or("").to_string(); + if !en.is_empty() && !fi.is_empty() { + let ws_for_async = ws_clone.clone(); + wasm_bindgen_futures::spawn_local(async move { + run_pair_comparison(en, fi, ws_for_async).await; + }); + } + } + } else if msg.contains("ai_task") { + console_log!("Hub task vastaanotettu, ajetaan GPU:lla..."); + let ws_for_async = ws_clone.clone(); + let diff = if msg.contains(r#""difficulty":1024"#) { 1024 } else { 512 }; + + // Suoritetaan inference asynkronisesti erillisessä taaskissa välttääksemme UI-jäätymisen kokonaan + wasm_bindgen_futures::spawn_local(async move { + let result = run_ai_tensor_inference(diff).await; + let reply = format!("{{\"type\":\"result\", \"status\":\"success\", \"data\":\"{}\"}}", result); + let _ = ws_for_async.borrow().send_with_str(&reply); + }); + } else if msg.contains("stats") { + // Sivuutetaan statsit täällä, UI hallitsee ne aivan itse HTML:n puolella + } + } + }) as Box); + + ws_clone_2.borrow().set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); + onmessage_callback.forget(); + + let ws_clone_3 = ws_clone_2.clone(); + let onopen_callback = Closure::wrap(Box::new(move |_| { + console_log!("Yhteys Hubiin avattu!"); + // Parsitaan device_info ja lisätään auth-kenttiin + let auth_msg = if let Ok(mut info) = serde_json::from_str::(&device_info) { + if let Some(obj) = info.as_object_mut() { + obj.insert("type".to_string(), serde_json::json!("auth")); + obj.insert("status".to_string(), serde_json::json!("agent_ready")); + } + info.to_string() + } else { + r#"{"type":"auth","status":"agent_ready","allocated_gb":4}"#.to_string() + }; + let _ = ws_clone_3.borrow().send_with_str(&auth_msg); + }) as Box); + + ws_clone_2.borrow().set_onopen(Some(onopen_callback.as_ref().unchecked_ref())); + onopen_callback.forget(); + + Ok(()) +} diff --git a/network-poc/node/src/storage.rs b/network-poc/node/src/storage.rs new file mode 100644 index 0000000..c3af718 --- /dev/null +++ b/network-poc/node/src/storage.rs @@ -0,0 +1,62 @@ +use rexie::{ObjectStore, Rexie, TransactionMode}; +use js_sys::Uint8Array; +use wasm_bindgen::JsValue; + +const DB_NAME: &str = "kipina_qwen_db"; +const STORE_NAME: &str = "weights_store"; + +/// Kytketään yhteys IndexedDB:hen (tai luodaan store jos sitä ei ole) +pub async fn get_db() -> Result { + Rexie::builder(DB_NAME) + .version(1) + .add_object_store(ObjectStore::new(STORE_NAME)) + .build() + .await +} + +/// Tallennetaan binääridata (esim. .safetensors lohko tai tokenizer.json string) IndexedDB-välimuistiin +pub async fn save_to_idb(key: &str, data: &[u8]) -> Result<(), String> { + let db = get_db().await.map_err(|e| format!("DB Error: {}", e))?; + let transaction = db + .transaction(&[STORE_NAME], TransactionMode::ReadWrite) + .map_err(|e| format!("Tx Error: {}", e))?; + + let store = transaction.store(STORE_NAME).map_err(|e| format!("Store Error: {}", e))?; + + // Konvertoidaan Rust u8-taulukko JS Uint8Array:ksi, joka on turvallisin blob IDB:lle + let js_data = Uint8Array::from(data); + + store.put(&js_data, Some(&JsValue::from_str(key))) + .await + .map_err(|e| format!("Put Error: {:?}", e))?; + + transaction.done().await.map_err(|e| format!("Done Error: {}", e))?; + Ok(()) +} + +/// Haetaan tallennettu data IndexedDB:stä key-arvon perusteella +pub async fn load_from_idb(key: &str) -> Result>, String> { + let db = get_db().await.map_err(|e| format!("DB Error: {}", e))?; + let transaction = db + .transaction(&[STORE_NAME], TransactionMode::ReadOnly) + .map_err(|e| format!("Tx Error: {}", e))?; + + let store = transaction.store(STORE_NAME).map_err(|e| format!("Store Error: {}", e))?; + let js_val_req = store.get(JsValue::from_str(key)).await.map_err(|e| format!("Get Error: {:?}", e))?; + + let js_val = match js_val_req { + Some(val) => val, + None => return Ok(None), + }; + + if js_val.is_undefined() || js_val.is_null() { + return Ok(None); + } + + // Ladataan JS muisti-blockista suoraan Rustin Veg:giksi + let arr = Uint8Array::new(&js_val); + let mut vec = vec![0; arr.length() as usize]; + arr.copy_to(&mut vec); + + Ok(Some(vec)) +} diff --git a/network-poc/static/index.html b/network-poc/static/index.html new file mode 100644 index 0000000..9c0f5fb --- /dev/null +++ b/network-poc/static/index.html @@ -0,0 +1,456 @@ + + + + + + Kipinä Agent Dashboard + + + +
+

Kipinä Agent Dashboard

+

Hajautettu WebGPU Laskentaverkko

+ + +
+
+

0

+

Aktiivisia Nodeja

+
+
+

0

+

Verkossa Suoritettua Tehtävää (Globaali)

+
+
+

0 GB

+

Verkon yhteis-VRAM

+
+
+ +
+ +
+ +
+ + +
+ + + + diff --git a/network-poc/target-native/.rustc_info.json b/network-poc/target-native/.rustc_info.json new file mode 100644 index 0000000..d622a1c --- /dev/null +++ b/network-poc/target-native/.rustc_info.json @@ -0,0 +1 @@ +{"rustc_fingerprint":15841952146704291179,"outputs":{"17747080675513052775":{"success":true,"status":"","code":0,"stdout":"rustc 1.94.1 (e408947bf 2026-03-25)\nbinary: rustc\ncommit-hash: e408947bfd200af42db322daf0fadfe7e26d3bd1\ncommit-date: 2026-03-25\nhost: x86_64-unknown-linux-gnu\nrelease: 1.94.1\nLLVM version: 21.1.8\n","stderr":""},"7971740275564407648":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.so\nlib___.so\nlib___.a\nlib___.so\n/home/jaakko/.rustup/toolchains/stable-x86_64-unknown-linux-gnu\noff\npacked\nunpacked\n___\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_abi=\"\"\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"gnu\"\ntarget_family=\"unix\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_os=\"linux\"\ntarget_pointer_width=\"64\"\ntarget_vendor=\"unknown\"\nunix\n","stderr":""}},"successes":{}} \ No newline at end of file diff --git a/network-poc/target-native/CACHEDIR.TAG b/network-poc/target-native/CACHEDIR.TAG new file mode 100644 index 0000000..20d7c31 --- /dev/null +++ b/network-poc/target-native/CACHEDIR.TAG @@ -0,0 +1,3 @@ +Signature: 8a477f597d28d172789f06886806bc55 +# This file is a cache directory tag created by cargo. +# For information about cache directory tags see https://bford.info/cachedir/