#!/usr/bin/env python3 """Kipinä Template Pipeline — rakennuspalaset + LLM-täydennys. LLM generoi vain JSON-speksin (entiteetit, kentät, tyypit). Koodi kootaan mekaanisesti valmiista pohjista. """ import json, re, ast, sys, time, textwrap, subprocess from pathlib import Path from urllib.request import urlopen, Request OLLAMA = "http://localhost:11434" MODEL = "qwen2.5-coder:7b-instruct-q4_K_M" OUTDIR = Path(__file__).parent / "template_runs" MAX_RETRIES = 3 # ── Ollama-kutsu ────────────────────────────────────────────── def llm(system: str, user: str, temperature: float = 0.1) -> str: body = json.dumps({ "model": MODEL, "messages": [ {"role": "system", "content": system}, {"role": "user", "content": user}, ], "stream": False, "options": {"temperature": temperature, "num_predict": 4096}, }).encode() req = Request(f"{OLLAMA}/api/chat", data=body, headers={"Content-Type": "application/json"}) resp = json.loads(urlopen(req, timeout=120).read()) return resp["message"]["content"] def extract_json(text: str) -> dict | None: """Poimi JSON LLM-vastauksesta.""" # Etsi JSON-blokki fenceistä m = re.search(r"```(?:json)?\s*\n(.*?)```", text, re.DOTALL) if m: text = m.group(1).strip() # Etsi ensimmäinen { ... } blokki depth = 0 start = None for i, c in enumerate(text): if c == '{': if depth == 0: start = i depth += 1 elif c == '}': depth -= 1 if depth == 0 and start is not None: try: return json.loads(text[start:i+1]) except json.JSONDecodeError: continue return None # ══════════════════════════════════════════════════════════════ # VAIHE 1: LLM generoi JSON-speksin # ══════════════════════════════════════════════════════════════ SPEC_SYSTEM = textwrap.dedent("""\ You are a software architect. Given a project description, output a JSON specification. Output ONLY valid JSON, no explanations. Follow this exact schema: ```json { "project_name": "short-name", "description": "One sentence about the project", "entities": [ { "name": "Todo", "table_name": "todos", "fields": [ {"name": "title", "sa_type": "String(255)", "py_type": "str", "nullable": false, "default": null}, {"name": "description", "sa_type": "Text", "py_type": "str | None", "nullable": true, "default": null}, {"name": "due_date", "sa_type": "Date", "py_type": "date | None", "nullable": true, "default": null}, {"name": "status", "sa_type": "String(20)", "py_type": "str", "nullable": false, "default": "pending"} ] } ], "extra_imports": ["from datetime import date"] } ``` RULES: - sa_type: SQLAlchemy column type (String(N), Text, Integer, Date, DateTime, Boolean, Float) - py_type: Python type hint for Pydantic (str, int, float, bool, date, datetime, str | None, etc.) - Do NOT use Enum — use String(20) with a default value for status fields - nullable: true means the field is optional - default: null means no default, otherwise a string/number value - extra_imports: stdlib imports needed in schemas.py (e.g. "from datetime import date" if Date fields exist) - entity name: PascalCase singular (Todo, User, Product) - table_name: snake_case plural (todos, users, products) - Keep it simple: 1-2 entities max, 3-7 fields each""") def generate_spec(project_description: str) -> dict | None: """Pyydä LLM:ltä JSON-speksi.""" for attempt in range(1, MAX_RETRIES + 1): print(f" Speksi yritys {attempt}/{MAX_RETRIES}...", end=" ", flush=True) t0 = time.time() raw = llm(SPEC_SYSTEM, project_description) elapsed = time.time() - t0 print(f"({elapsed:.1f}s)", end=" ") spec = extract_json(raw) if spec and "entities" in spec and len(spec["entities"]) > 0: # Validoi rakenne valid = True for entity in spec["entities"]: if not entity.get("name") or not entity.get("fields"): valid = False for field in entity.get("fields", []): if not field.get("name") or not field.get("sa_type") or not field.get("py_type"): valid = False if valid: print("✅") return spec print("❌ virheellinen JSON") return None # ══════════════════════════════════════════════════════════════ # VAIHE 2: Koodigenerointi templateista # ══════════════════════════════════════════════════════════════ def generate_models(spec: dict) -> str: """Generoi models.py templatesta.""" # Kerää tarvittavat SQLAlchemy-tyypit sa_types = set() for entity in spec["entities"]: for field in entity["fields"]: base_type = re.match(r"(\w+)", field["sa_type"]).group(1) sa_types.add(base_type) sa_types.add("Integer") sa_imports = ", ".join(sorted(sa_types)) lines = [ f"from sqlalchemy import create_engine, Column, Integer, {sa_imports}", "from sqlalchemy.ext.declarative import declarative_base", "from sqlalchemy.orm import sessionmaker", "", 'DATABASE_URL = "sqlite:///./app.db"', 'engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})', "SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)", "Base = declarative_base()", "", ] for entity in spec["entities"]: lines.append(f"class {entity['name']}(Base):") lines.append(f' __tablename__ = "{entity["table_name"]}"') lines.append(f" id = Column(Integer, primary_key=True, index=True)") for field in entity["fields"]: parts = [f"Column({field['sa_type']}"] if not field.get("nullable", True): parts.append("nullable=False") if field.get("default") is not None: default = field["default"] if isinstance(default, str): parts.append(f'default="{default}"') else: parts.append(f"default={default}") col_def = ", ".join(parts) + ")" lines.append(f" {field['name']} = {col_def}") lines.append("") lines.append("Base.metadata.create_all(bind=engine)") return "\n".join(lines) def generate_schemas(spec: dict) -> str: """Generoi schemas.py templatesta.""" lines = ["from pydantic import BaseModel"] # Extra imports (esim. date, datetime) for imp in spec.get("extra_imports", []): lines.append(imp) lines.append("") for entity in spec["entities"]: name = entity["name"] # Create schema lines.append(f"class {name}Create(BaseModel):") for field in entity["fields"]: py_type = field["py_type"] default = field.get("default") if default is not None: if isinstance(default, str): lines.append(f' {field["name"]}: {py_type} = "{default}"') else: lines.append(f' {field["name"]}: {py_type} = {default}') elif field.get("nullable", True) and "None" in py_type: lines.append(f' {field["name"]}: {py_type} = None') else: lines.append(f' {field["name"]}: {py_type}') lines.append("") # Response schema lines.append(f"class {name}Response({name}Create):") lines.append(" id: int") lines.append("") lines.append(" class Config:") lines.append(" from_attributes = True") lines.append("") return "\n".join(lines) def generate_main(spec: dict) -> str: """Generoi main.py templatesta.""" entities = spec["entities"] # Imports model_names = ", ".join(e["name"] for e in entities) create_names = ", ".join(f'{e["name"]}Create' for e in entities) response_names = ", ".join(f'{e["name"]}Response' for e in entities) lines = [ "from fastapi import FastAPI, Depends, HTTPException", "from sqlalchemy.orm import Session", f"from models import Base, engine, SessionLocal, {model_names}", f"from schemas import {create_names}, {response_names}", "", "app = FastAPI()", "", "def get_db():", " db = SessionLocal()", " try:", " yield db", " finally:", " db.close()", "", ] for entity in entities: name = entity["name"] lower = name.lower() table = entity["table_name"] lines.extend([ f'@app.post("/{table}/", response_model={name}Response, status_code=201)', f"def create_{lower}(item: {name}Create, db: Session = Depends(get_db)):", f" db_item = {name}(**item.model_dump())", f" db.add(db_item)", f" db.commit()", f" db.refresh(db_item)", f" return db_item", "", f'@app.get("/{table}/", response_model=list[{name}Response])', f"def list_{lower}s(db: Session = Depends(get_db)):", f" return db.query({name}).all()", "", f'@app.get("/{table}/{{item_id}}", response_model={name}Response)', f"def get_{lower}(item_id: int, db: Session = Depends(get_db)):", f" item = db.query({name}).filter({name}.id == item_id).first()", f" if not item:", f' raise HTTPException(status_code=404, detail="{name} not found")', f" return item", "", f'@app.put("/{table}/{{item_id}}", response_model={name}Response)', f"def update_{lower}(item_id: int, item: {name}Create, db: Session = Depends(get_db)):", f" db_item = db.query({name}).filter({name}.id == item_id).first()", f" if not db_item:", f' raise HTTPException(status_code=404, detail="{name} not found")', f" for key, value in item.model_dump().items():", f" setattr(db_item, key, value)", f" db.commit()", f" db.refresh(db_item)", f" return db_item", "", f'@app.delete("/{table}/{{item_id}}", status_code=204)', f"def delete_{lower}(item_id: int, db: Session = Depends(get_db)):", f" db_item = db.query({name}).filter({name}.id == item_id).first()", f" if not db_item:", f' raise HTTPException(status_code=404, detail="{name} not found")', f" db.delete(db_item)", f" db.commit()", "", ]) return "\n".join(lines) def generate_tests(spec: dict) -> str: """Generoi test_main.py templatesta.""" entities = spec["entities"] lines = [ "import pytest", "from fastapi.testclient import TestClient", "from sqlalchemy import create_engine", "from sqlalchemy.orm import sessionmaker", "from main import app, get_db", "from models import Base", "", 'TEST_DB = "sqlite:///./test.db"', 'test_engine = create_engine(TEST_DB, connect_args={"check_same_thread": False})', "TestSession = sessionmaker(autocommit=False, autoflush=False, bind=test_engine)", "Base.metadata.create_all(bind=test_engine)", "", "def override_get_db():", " db = TestSession()", " try:", " yield db", " finally:", " db.close()", "", "app.dependency_overrides[get_db] = override_get_db", "client = TestClient(app)", "", ] for entity in entities: name = entity["name"] lower = name.lower() table = entity["table_name"] # Rakenna esimerkki-JSON testidatasta test_data = {} for field in entity["fields"]: if "str" in field["py_type"]: test_data[field["name"]] = f"Test {field['name']}" elif "int" in field["py_type"]: test_data[field["name"]] = 1 elif "float" in field["py_type"]: test_data[field["name"]] = 1.0 elif "bool" in field["py_type"]: test_data[field["name"]] = True elif "date" in field["py_type"].lower(): test_data[field["name"]] = "2024-01-15" if field.get("default") is not None: test_data[field["name"]] = field["default"] # Poista None-arvoiset optional-kentät testidatasta test_data = {k: v for k, v in test_data.items() if v is not None} test_json = json.dumps(test_data) update_data = dict(test_data) first_str_field = next((f["name"] for f in entity["fields"] if "str" in f["py_type"] and f["name"] != "status"), None) if first_str_field: update_data[first_str_field] = f"Updated {first_str_field}" update_json = json.dumps(update_data) lines.extend([ f"def test_create_{lower}():", f" response = client.post('/{table}/', json={test_json})", f" assert response.status_code == 201", f" data = response.json()", f' assert "id" in data', "", f"def test_list_{lower}s():", f" client.post('/{table}/', json={test_json})", f" response = client.get('/{table}/')", f" assert response.status_code == 200", f" assert len(response.json()) >= 1", "", f"def test_get_{lower}_by_id():", f" created = client.post('/{table}/', json={test_json}).json()", f" item_id = created['id']", f" response = client.get(f'/{table}/{{item_id}}')", f" assert response.status_code == 200", f" assert response.json()['id'] == item_id", "", f"def test_get_{lower}_not_found():", f" response = client.get('/{table}/99999')", f" assert response.status_code == 404", "", f"def test_update_{lower}():", f" created = client.post('/{table}/', json={test_json}).json()", f" item_id = created['id']", f" response = client.put(f'/{table}/{{item_id}}', json={update_json})", f" assert response.status_code == 200", "", f"def test_delete_{lower}():", f" created = client.post('/{table}/', json={test_json}).json()", f" item_id = created['id']", f" response = client.delete(f'/{table}/{{item_id}}')", f" assert response.status_code == 204", f" response = client.get(f'/{table}/{{item_id}}')", f" assert response.status_code == 404", "", ]) return "\n".join(lines) def generate_pyproject(spec: dict) -> str: """Generoi pyproject.toml templatesta.""" name = spec.get("project_name", "app").lower().replace(" ", "-") return textwrap.dedent(f"""\ [project] name = "{name}" version = "0.1.0" requires-python = ">=3.11" dependencies = [ "fastapi", "uvicorn[standard]", "sqlalchemy", "pytest", "httpx", ] """) def generate_dockerfile() -> str: """Generoi Dockerfile templatesta — aina sama.""" return textwrap.dedent("""\ FROM python:3.12-slim COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv ENV UV_CACHE_DIR=/tmp/uv-cache WORKDIR /app COPY pyproject.toml . RUN uv sync COPY *.py . RUN useradd -m appuser && chown -R appuser:appuser /app /tmp/uv-cache USER appuser EXPOSE 8000 CMD ["uv", "run", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] """) # ══════════════════════════════════════════════════════════════ # VAIHE 3: Validointi # ══════════════════════════════════════════════════════════════ def validate_python(code: str, filename: str) -> list[str]: """Syntaksitarkistus.""" try: ast.parse(code, filename=filename) return [] except SyntaxError as e: lines = code.splitlines() bad = lines[e.lineno - 1] if e.lineno and e.lineno <= len(lines) else "?" return [f"SyntaxError rivi {e.lineno}: {e.msg}. Koodi: `{bad.strip()}`"] # ══════════════════════════════════════════════════════════════ # VAIHE 4: Docker-testaus # ══════════════════════════════════════════════════════════════ def docker_test(run_dir: Path, spec: dict) -> dict: """Rakenna ja testaa generoitu projekti Dockerilla.""" results = {"build": False, "pytest": False, "api": False, "errors": []} compose = textwrap.dedent("""\ services: app: build: . ports: - "18765:8000" """) (run_dir / "docker-compose.yml").write_text(compose) # Cleanup subprocess.run(["docker", "compose", "down", "--remove-orphans", "-t", "2"], cwd=run_dir, capture_output=True, timeout=15) # 1. Build print(" 🔨 Docker build...", end=" ", flush=True) r = subprocess.run(["docker", "compose", "build", "--no-cache"], cwd=run_dir, capture_output=True, text=True, timeout=120) if r.returncode != 0: results["errors"].append(f"Build failed:\n{r.stderr[-500:]}") print("❌") print(f" {r.stderr[-200:]}") return results results["build"] = True print("✅") # 2. Pytest print(" 🧪 pytest...", end=" ", flush=True) r = subprocess.run( ["docker", "compose", "run", "--rm", "--no-deps", "app", "uv", "run", "pytest", "test_main.py", "-v", "--tb=short"], cwd=run_dir, capture_output=True, text=True, timeout=90) if r.returncode == 0: results["pytest"] = True # Laske testit passed = len(re.findall(r" PASSED", r.stdout)) print(f"✅ ({passed} tests passed)") else: output = r.stdout + r.stderr results["errors"].append(f"pytest exit {r.returncode}:\n{output[-800:]}") failed = re.findall(r"FAILED (.+?)(?:\s|$)", output) print(f"❌ ({len(failed)} failed)") for f in failed[:5]: print(f" FAILED: {f}") # 3. API smoke test print(" 🌐 API smoke test...", end=" ", flush=True) subprocess.run(["docker", "compose", "up", "-d"], cwd=run_dir, capture_output=True, timeout=30) api_ok = False for _ in range(15): time.sleep(1) try: resp = urlopen("http://localhost:18765/docs", timeout=3) if resp.status == 200: api_ok = True break except Exception: continue if api_ok: entity = spec["entities"][0] table = entity["table_name"] test_data = {} for field in entity["fields"]: if "str" in field["py_type"]: test_data[field["name"]] = f"API test" elif "int" in field["py_type"]: test_data[field["name"]] = 1 elif "date" in field["py_type"].lower(): test_data[field["name"]] = "2024-01-15" if field.get("default") is not None: test_data[field["name"]] = field["default"] test_data = {k: v for k, v in test_data.items() if v is not None} try: req = Request( f"http://localhost:18765/{table}/", data=json.dumps(test_data).encode(), headers={"Content-Type": "application/json"}, method="POST" ) resp = urlopen(req, timeout=5) if resp.status == 201: body = json.loads(resp.read()) if "id" in body: results["api"] = True print(f"✅ POST /{table}/ → 201, id={body['id']}") else: print("❌ POST 201 mutta ei id:tä") else: print(f"❌ POST → {resp.status}") except Exception as e: results["errors"].append(f"API: {e}") print(f"❌ {e}") else: results["errors"].append("Kontti ei käynnistynyt 15s:ssa") print("❌ timeout") # Cleanup subprocess.run(["docker", "compose", "down", "-t", "2"], cwd=run_dir, capture_output=True, timeout=15) return results # ══════════════════════════════════════════════════════════════ # PIPELINE # ══════════════════════════════════════════════════════════════ def run_template_pipeline(run_id: str, project_description: str) -> dict: run_dir = OUTDIR / run_id run_dir.mkdir(parents=True, exist_ok=True) results = {"run_id": run_id, "model": MODEL, "description": project_description} print(f"\n{'='*60}") print(f" Template Pipeline: {project_description}") print(f"{'='*60}") # 1. JSON-speksi LLM:ltä print(f"\n── Vaihe 1: JSON-speksi ──") spec = generate_spec(project_description) if not spec: print(" ❌ Speksin generointi epäonnistui") results["error"] = "spec generation failed" return results (run_dir / "spec.json").write_text(json.dumps(spec, indent=2, ensure_ascii=False)) results["spec"] = spec for entity in spec["entities"]: print(f" 📦 {entity['name']} ({entity['table_name']}): {len(entity['fields'])} kenttää") # 2. Koodigenerointi templateista print(f"\n── Vaihe 2: Koodigenerointi ──") files = { "models.py": generate_models(spec), "schemas.py": generate_schemas(spec), "main.py": generate_main(spec), "test_main.py": generate_tests(spec), "pyproject.toml": generate_pyproject(spec), "Dockerfile": generate_dockerfile(), } all_valid = True for filename, code in files.items(): (run_dir / filename).write_text(code) if filename.endswith(".py"): errors = validate_python(code, filename) if errors: print(f" ❌ {filename}: {errors}") all_valid = False else: print(f" ✅ {filename} ({len(code.splitlines())} riviä)") else: print(f" ✅ {filename} ({len(code.splitlines())} riviä)") results["files"] = {k: len(v) for k, v in files.items()} results["valid"] = all_valid if not all_valid: print("\n ⏭️ Docker-testi ohitettu — validointivirheitä") return results # 3. Docker-testaus print(f"\n── Vaihe 3: Docker ──") docker_results = docker_test(run_dir, spec) results["docker"] = docker_results # 4. Yhteenveto print(f"\n{'='*60}") print(f" YHTEENVETO: {run_id}") print(f"{'='*60}") print(f" Speksi: ✅") print(f" Syntaksi: {'✅' if all_valid else '❌'}") print(f" Build: {'✅' if docker_results['build'] else '❌'}") print(f" Pytest: {'✅' if docker_results['pytest'] else '❌'}") print(f" API: {'✅' if docker_results['api'] else '❌'}") score = sum([all_valid, docker_results["build"], docker_results["pytest"], docker_results["api"]]) print(f" Score: {score}/4") (run_dir / "report.json").write_text(json.dumps(results, indent=2, ensure_ascii=False)) return results # ── Main ────────────────────────────────────────────────────── if __name__ == "__main__": description = "Todo-sovellus FastAPI + SQLite, CRUD-endpointit ja testit" run_id = f"tmpl_{int(time.time())}" args = sys.argv[1:] if args: run_id = args[0] if len(args) > 1: description = " ".join(args[1:]) run_template_pipeline(run_id, description)