#!/usr/local/bin/node
// cveutils - MCP server for CVE's suckless FreeBSD setup
// ISC License

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
import { spawnSync } from "child_process";
import { readdirSync, readFileSync, writeFileSync, existsSync, mkdtempSync, unlinkSync, rmSync } from "fs";
import { tmpdir } from "os";
import { join } from "path";
import * as cfg from "./config.js";

const server = new McpServer({ name: "cveutils", version: "1.0.0" });

// -- helpers --

/**
 * Wrap a string in the MCP tool-result shape used by every text tool.
 * @param {string} text
 * @returns {{content: Array<{type: "text", text: string}>}}
 */
function textResult(text) {
  return { content: [{ type: "text", text }] };
}

/**
 * Run a command synchronously and collect its output as utf8 strings.
 *
 * @param {string} cmd - executable to spawn
 * @param {string[]} args - argument vector (no shell is involved)
 * @param {string} [input] - data written to the child's stdin
 * @param {object} [opts] - extra spawnSync options (cwd, env, timeout, ...)
 * @returns {{rc: number, out: string, err: string, sig: string|null}}
 *   rc is the exit status, or 1 when the child did not exit normally;
 *   err is stderr, or the spawn error message when stderr is empty
 *   (e.g. the executable was not found); sig is the terminating signal.
 */
function run(cmd, args, input, opts = {}) {
  const r = spawnSync(cmd, args, { ...opts, encoding: "utf8", input });
  const stderr = r.stderr ?? "";
  return {
    rc: r.status ?? 1,
    out: r.stdout ?? "",
    // without this a missing binary would surface as an empty error string
    err: stderr || (r.error ? `${r.error.message}\n` : ""),
    sig: r.signal ?? null,
  };
}

/**
 * Run an X11 client with DISPLAY forced to ":0".
 * @param {string} cmd
 * @param {string[]} args
 * @returns {{rc: number, out: string, err: string, sig: string|null}}
 */
function xrun(cmd, args) {
  return run(cmd, args, undefined, { env: { ...process.env, DISPLAY: ":0" } });
}

/**
 * Quote a string for a POSIX shell so it is passed through literally.
 * @param {string} s
 * @returns {string} s wrapped in single quotes, embedded quotes escaped
 */
function shq(s) {
  return `'${String(s).replace(/'/g, "'\\''")}'`;
}

/**
 * Run a shell command line on cfg.SERVER_HOST via ssh.
 * @param {string} cmd - command line interpreted by the remote shell
 * @param {string} [input] - data written to the remote command's stdin
 */
function ssh(cmd, input) {
  return run("ssh", [cfg.SERVER_HOST, cmd], input);
}

/**
 * Resolve the source directory of a tool under cfg.SRC_DIR.
 * Tries the exact name first, then the first directory named "<tool>-*".
 *
 * @param {string} tool
 * @returns {string|null} directory path, or null when nothing matches or the
 *   name is not a plain directory name
 */
function findToolDir(tool) {
  // the result becomes the cwd of `doas make install`, so refuse anything
  // that could step outside SRC_DIR (separators, "..", empty name)
  if (!/^[\w.+-]+$/.test(tool) || tool === "." || tool === "..") return null;

  const exact = join(cfg.SRC_DIR, tool);
  if (existsSync(exact)) return exact;

  const entries = readdirSync(cfg.SRC_DIR, { withFileTypes: true });
  const match = entries.find((e) => e.isDirectory() && e.name.startsWith(`${tool}-`));
  return match ? join(cfg.SRC_DIR, match.name) : null;
}

/**
 * Run `doas make install` in a tool directory and describe the outcome.
 * @param {string} tool - tool name, used only in the message
 * @param {string} dir - directory to build in
 * @returns {string} human-readable result including the build output
 */
function makeInstall(tool, dir) {
  const { rc, out, err } = run("doas", ["make", "install"], undefined, { cwd: dir });
  const log = out + err;
  return rc !== 0 ? `build failed (exit ${rc}):\n${log}` : `built ${tool} ok:\n${log}`;
}

/**
 * Call the Forgejo REST API (cfg.FORGEJO_URL + /api/v1 + path).
 *
 * @param {string} path - API path starting with "/"
 * @param {string} [method]
 * @param {object} [data] - JSON request body
 * @returns {Promise<any>} the parsed JSON response; when the body is empty or
 *   not JSON, an object of the form {status, message} so callers can still
 *   inspect/stringify it instead of getting a parse exception
 */
async function forgejo(path, method = "GET", data) {
  const url = `${cfg.FORGEJO_URL}/api/v1${path}`;
  const res = await fetch(url, {
    method,
    headers: {
      "Authorization": `token ${cfg.FORGEJO_TOKEN}`,
      "Content-Type": "application/json",
    },
    body: data ? JSON.stringify(data) : undefined,
  });
  const raw = await res.text();
  if (raw === "") return { status: res.status, message: "" };
  try {
    return JSON.parse(raw);
  } catch {
    return { status: res.status, message: raw };
  }
}

/**
 * Path of an ii file ("in" or "out") for a server/channel pair.
 * @param {string} host - ii server directory name
 * @param {string} channel - ii channel directory name
 * @param {string} leaf - file name inside the channel directory
 */
function ircFile(host, channel, leaf) {
  const home = process.env.HOME ?? "/home/realdaemon";
  return join(home, "irc", host, channel, leaf);
}

// -- suckless --

server.tool(
  "suckless_build",
  "build and install a suckless tool (dwm, st, dmenu, slock, surf, tabbed, beastie)",
  { tool: z.string() },
  ({ tool }) => {
    const d = findToolDir(tool);
    if (!d) return textResult(`error: no directory found for '${tool}'`);
    return textResult(makeInstall(tool, d));
  }
);

server.tool(
  "suckless_config_read",
  "read config.h for a suckless tool",
  { tool: z.string() },
  ({ tool }) => {
    const d = findToolDir(tool);
    if (!d) return textResult(`error: no directory found for '${tool}'`);
    // prefer the local config.h, fall back to the shipped defaults
    for (const name of ["config.h", "config.def.h"]) {
      const p = join(d, name);
      if (existsSync(p)) return textResult(readFileSync(p, "utf8"));
    }
    return textResult(`error: no config.h in ${d}`);
  }
);

server.tool(
  "suckless_config_write",
  "write config.h for a suckless tool and optionally rebuild",
  { tool: z.string(), content: z.string(), rebuild: z.boolean().default(true) },
  ({ tool, content, rebuild }) => {
    const d = findToolDir(tool);
    if (!d) return textResult(`error: no directory found for '${tool}'`);
    writeFileSync(join(d, "config.h"), content);
    if (!rebuild) return textResult(`wrote config.h for ${tool}`);
    return textResult(makeInstall(tool, d));
  }
);

// -- server --

server.tool(
  "server_logs",
  "get docker compose logs for a service on the debian server",
  { service: z.string(), lines: z.coerce.number().default(50) },
  ({ service, lines }) => {
    // service is quoted for the remote shell; ~/ stays unquoted so it expands
    const { rc, out, err } = ssh(`cd ~/${shq(service)} && docker compose logs --tail=${lines} 2>&1`);
    return textResult(rc !== 0 ? `error (exit ${rc}):\n${out}${err}` : out);
  }
);

server.tool(
  "server_restart",
  "restart a docker compose service on the debian server",
  { service: z.string() },
  ({ service }) => {
    const { rc, out, err } = ssh(`cd ~/${shq(service)} && docker compose restart 2>&1`);
    return textResult(rc !== 0 ? `error (exit ${rc}):\n${out}${err}` : (out || "restarted ok"));
  }
);

server.tool(
  "server_shell",
  "run an arbitrary command on the debian server via ssh",
  { cmd: z.string() },
  ({ cmd }) => {
    const { rc, out, err } = ssh(cmd);
    const result = out + err;
    return textResult(rc !== 0 ? `exit ${rc}:\n${result}` : (result || "(no output)"));
  }
);

server.tool(
  "caddy_config_read",
  "read the Caddyfile from the debian server",
  {},
  () => {
    const { rc, out, err } = ssh("cat ~/searxng-docker/Caddyfile");
    return textResult(rc !== 0 ? `error: ${err}` : out);
  }
);

server.tool(
  "caddy_config_write",
  "write the Caddyfile on the debian server and reload caddy",
  { content: z.string() },
  ({ content }) => {
    // the new file content travels over ssh's stdin
    const wr = ssh("cat > ~/searxng-docker/Caddyfile", content);
    if (wr.rc !== 0) return textResult(`write failed: ${wr.err}`);
    const { rc, out, err } = ssh("cd ~/searxng-docker && docker compose exec caddy caddy reload --config /etc/caddy/Caddyfile 2>&1");
    return textResult(rc !== 0 ? `written but reload failed: ${out}${err}` : "written and reloaded ok");
  }
);

// -- pkg --

server.tool(
  "pkg_search",
  "search pkg for a FreeBSD package",
  { name: z.string() },
  ({ name }) => {
    const { out, err } = run("pkg", ["search", name]);
    return textResult(out || err || "(no results)");
  }
);

server.tool(
  "pkg_install",
  "install a FreeBSD package with doas pkg install -y",
  { name: z.string() },
  ({ name }) => {
    const { rc, out, err } = run("doas", ["pkg", "install", "-y", name]);
    const result = out + err;
    return textResult(rc !== 0 ? `install failed (exit ${rc}):\n${result}` : (result || "installed ok"));
  }
);

// -- forgejo --

server.tool(
  "forgejo_repo_list",
  "list repos for a user or org. defaults to configured user if owner not specified",
  { owner: z.string().optional() },
  async ({ owner }) => {
    const target = owner ?? cfg.FORGEJO_USER;
    // try org endpoint first, fall back to user
    let result = await forgejo(`/orgs/${target}/repos?limit=50`);
    if (!Array.isArray(result)) result = await forgejo(`/users/${target}/repos?limit=50`);
    const text = Array.isArray(result)
      ? result.map((r) => r.full_name).join("\n")
      : JSON.stringify(result);
    return textResult(text);
  }
);

server.tool(
  "forgejo_create_issue",
  "create an issue on forgejo. repo format: owner/repo",
  { repo: z.string(), title: z.string(), body: z.string().default("") },
  async ({ repo, title, body }) => {
    const result = await forgejo(`/repos/${repo}/issues`, "POST", { title, body });
    const text = result.number
      ? `created issue #${result.number}: ${result.html_url}`
      : JSON.stringify(result);
    return textResult(text);
  }
);

server.tool(
  "forgejo_push_file",
  "create or update a file in a forgejo repo. repo: owner/repo",
  { repo: z.string(), path: z.string(), content: z.string(), message: z.string().default("update via cveutils") },
  async ({ repo, path, content, message }) => {
    // an existing file is updated with PUT + its sha; a new one is created with POST
    const existing = await forgejo(`/repos/${repo}/contents/${path}`);
    const sha = existing?.sha;
    const payload = { message, content: Buffer.from(content).toString("base64") };
    const method = sha ? "PUT" : "POST";
    if (sha) payload.sha = sha;
    const result = await forgejo(`/repos/${repo}/contents/${path}`, method, payload);
    return textResult(result.content ? `pushed ${path} to ${repo}` : JSON.stringify(result));
  }
);

// -- system --

server.tool(
  "dmesg_tail",
  "get the last N lines of dmesg",
  { lines: z.coerce.number().default(30) },
  ({ lines }) => {
    const { rc, out, err } = run("doas", ["dmesg"]);
    if (rc !== 0) return textResult(`error: ${err}`);
    // slice(-0) would return the whole buffer, so handle N <= 0 explicitly
    const text = lines > 0 ? out.trimEnd().split("\n").slice(-lines).join("\n") : "";
    return textResult(text);
  }
);

server.tool(
  "proc_list",
  "list running processes (ps aux)",
  {},
  () => {
    const { out, err } = run("ps", ["aux"]);
    return textResult(out || err);
  }
);

// -- cmus --

server.tool(
  "cmus_status",
  "get current cmus playback status and track info",
  {},
  () => {
    const { rc, out, err } = run("cmus-remote", ["-Q"]);
    if (rc !== 0) return textResult(`cmus not running or error: ${err}`);

    // parse the status output into something readable
    const lines = out.split("\n");
    // value of a "<key> <value>" or "tag <key> <value>" line, or null
    const get = (key) => {
      const line = lines.find((l) => l.startsWith(`${key} `) || l.startsWith(`tag ${key} `));
      if (!line) return null;
      return line.split(" ").slice(line.startsWith("tag") ? 2 : 1).join(" ");
    };
    // seconds -> m:ss; 0 is a valid value, only unparsable/negative input gives "?"
    const fmt = (s) => (Number.isFinite(s) && s >= 0
      ? `${Math.floor(s / 60)}:${String(s % 60).padStart(2, "0")}`
      : "?");

    const status = get("status");
    const file = get("file");
    const artist = get("artist");
    const title = get("title");
    const album = get("album");
    const pos = get("position");
    const dur = get("duration");

    const track = (artist && title) ? `${artist} - ${title}` : (file ?? "nothing");
    const time = (pos && dur)
      ? `${fmt(Number.parseInt(pos, 10))} / ${fmt(Number.parseInt(dur, 10))}`
      : "";
    const shuffle = lines.find((l) => l.includes("shuffle")) ?? "";
    const repeat = lines.find((l) => l.includes("repeat ")) ?? "";

    let summary = `status: ${status}\ntrack: ${track}`;
    if (album) summary += `\nalbum: ${album}`;
    if (time) summary += `\ntime: ${time}`;
    summary += `\n${shuffle.trim()} ${repeat.trim()}`;
    return textResult(summary);
  }
);

// Ordered phrase -> cmus-remote argument mapping; the first matching pattern wins.
// cmus-remote's -S/-R flags only toggle and take no value, so explicit on/off
// is sent as a raw `set` command through -C.
const CMUS_COMMANDS = [
  [/^play$/, () => ["-p"]],
  [/^pause$/, () => ["-u"]],
  [/^(stop)$/, () => ["-s"]],
  [/^(next|skip)$/, () => ["-n"]],
  [/^(prev|previous|back)$/, () => ["-r"]],
  [/shuffle on/, () => ["-C", "set shuffle=true"]],
  [/shuffle off/, () => ["-C", "set shuffle=false"]],
  [/shuffle/, () => ["-S"]],
  [/repeat (on|all)/, () => ["-C", "set repeat=true"]],
  [/repeat off/, () => ["-C", "set repeat=false"]],
  [/repeat/, () => ["-R"]],
  [/vol(ume)?\s+up/, () => ["-v", "+5%"]],
  [/vol(ume)?\s+down/, () => ["-v", "-5%"]],
  [/vol(?:ume)?\s+(\d+)/, (m) => ["-v", `${m[1]}%`]],
  [/seek\s+(forward|ahead|\+)/, () => ["--seek", "+10"]],
  [/seek\s+(back(ward)?|behind|-)/, () => ["--seek", "-10"]],
];

server.tool(
  "cmus_control",
  "control cmus in natural language. e.g. 'play', 'pause', 'next', 'previous', 'shuffle on', 'repeat off', 'volume up', 'volume down', 'volume 50', 'seek forward', 'seek back'",
  { cmd: z.string() },
  ({ cmd }) => {
    const c = cmd.toLowerCase().trim();
    let args;
    for (const [pattern, build] of CMUS_COMMANDS) {
      const m = pattern.exec(c);
      if (m) {
        args = build(m);
        break;
      }
    }
    if (!args) {
      return textResult(`unknown command: '${cmd}'. try: play, pause, stop, next, previous, shuffle on/off, repeat on/off, volume up/down/50, seek forward/back`);
    }
    const { rc, err } = run("cmus-remote", args);
    if (rc !== 0) return textResult(`error: ${err}`);
    return textResult(`ok: ${cmd}`);
  }
);

// -- irc (ii) --

server.tool(
  "irc_read",
  "read recent messages from an ii IRC channel log. server: 127.0.0.1, irc.libera.chat, irc.oftc.net. channel: #freebsd, #linux, etc",
  { server: z.string().default("127.0.0.1"), channel: z.string(), lines: z.coerce.number().default(30) },
  ({ server, channel, lines }) => {
    const path = ircFile(server, channel, "out");
    if (!existsSync(path)) return textResult(`no log found at ${path}`);
    const { rc, out, err } = run("tail", ["-n", String(lines), path]);
    return textResult(rc !== 0 ? `error: ${err}` : out);
  }
);

server.tool(
  "irc_send",
  "send a message to an ii IRC channel. server: 127.0.0.1, irc.libera.chat, etc. channel: #freebsd, etc",
  { server: z.string().default("127.0.0.1"), channel: z.string(), msg: z.string() },
  ({ server, channel, msg }) => {
    const path = ircFile(server, channel, "in");
    if (!existsSync(path)) return textResult(`no input fifo found at ${path}`);
    // msg and path are handed over as positional parameters ($1, $2) so the
    // shell never parses them: $(...), backticks and $VAR stay literal
    const { rc, err } = run("sh", ["-c", 'printf "%s\\n" "$1" > "$2"', "sh", msg, path]);
    return textResult(rc !== 0 ? `error: ${err}` : `sent: ${msg}`);
  }
);

// -- system info --

server.tool(
  "battery",
  "get battery status and charge level",
  {},
  () => {
    const { rc, out, err } = run("apm", []);
    return textResult(rc !== 0 ? `error: ${err}` : out);
  }
);

server.tool(
  "wifi_status",
  "get wifi interface status",
  {},
  () => {
    const { rc, out, err } = run("ifconfig", ["wlan0"]);
    return textResult(rc !== 0 ? `error: ${err}` : out);
  }
);

server.tool(
  "disk_usage",
  "get disk usage (df -h)",
  {},
  () => {
    const { rc, out, err } = run("df", ["-h"]);
    return textResult(rc !== 0 ? `error: ${err}` : out);
  }
);

server.tool(
  "sysctl_get",
  "read a sysctl value, e.g. hw.physmem, kern.osrelease, dev.cpu.0.temperature",
  { key: z.string() },
  ({ key }) => {
    const { rc, out, err } = run("sysctl", [key]);
    return textResult(rc !== 0 ? `error: ${err}` : out.trim());
  }
);

// -- x / desktop --

server.tool(
  "clipboard_read",
  "read the current X clipboard contents",
  {},
  () => {
    const { rc, out, err } = run("xclip", ["-selection", "clipboard", "-o"]);
    return textResult(rc !== 0 ? `error: ${err}` : out);
  }
);

server.tool(
  "clipboard_write",
  "write text to the X clipboard",
  { text: z.string() },
  ({ text }) => {
    const { rc, err, sig } = run("xclip", ["-selection", "clipboard"], text, { timeout: 2000 });
    // being killed by the 2s timeout (SIGTERM) is also counted as success
    const ok = rc === 0 || sig === "SIGTERM";
    return textResult(ok ? "copied to clipboard" : `error: ${err}`);
  }
);

server.tool(
  "xwindow_list",
  "list all open X windows with their IDs and titles",
  {},
  () => {
    const { rc, out, err } = run("wmctrl", ["-l"]);
    return textResult(rc !== 0 ? `error: ${err}` : out);
  }
);

server.tool(
  "xwindow_focus",
  "focus a window by name or partial title",
  { name: z.string() },
  ({ name }) => {
    const { rc, err } = run("wmctrl", ["-a", name]);
    return textResult(rc !== 0 ? `error: ${err}` : `focused: ${name}`);
  }
);

server.tool(
  "xprop_focused",
  "get X properties of the currently focused window (title, class, etc)",
  {},
  () => {
    const active = xrun("xdotool", ["getactivewindow"]);
    if (active.rc !== 0) return textResult(`error: ${active.err}`);
    // query the same display the window id came from
    const { rc, out, err } = xrun("xprop", ["-id", active.out.trim()]);
    return textResult(rc !== 0 ? `error: ${err}` : out);
  }
);

server.tool(
  "xdotool_key",
  "send a key or key combo to the focused window, e.g. 'ctrl+c', 'super+j', 'Return', 'Escape'",
  { key: z.string(), window: z.string().optional() },
  ({ key, window }) => {
    const args = window ? ["key", "--window", window, key] : ["key", key];
    const { rc, err } = xrun("xdotool", args);
    return textResult(rc !== 0 ? `error: ${err}` : `sent key: ${key}`);
  }
);

server.tool(
  "xdotool_type",
  "type text into the focused window",
  { text: z.string(), window: z.string().optional() },
  ({ text, window }) => {
    // "--" keeps text that starts with "-" from being parsed as an option
    const args = window ? ["type", "--window", window, "--", text] : ["type", "--", text];
    const { rc, err } = xrun("xdotool", args);
    return textResult(rc !== 0 ? `error: ${err}` : `typed: ${text}`);
  }
);

server.tool(
  "xdotool_click",
  "click the mouse. button: 1=left, 2=middle, 3=right. optionally move to x,y first",
  { button: z.coerce.number().default(1), x: z.coerce.number().optional(), y: z.coerce.number().optional() },
  ({ button, x, y }) => {
    // the pointer is only moved when both coordinates are given
    if (x !== undefined && y !== undefined) {
      const moved = xrun("xdotool", ["mousemove", String(x), String(y)]);
      // do not click at the old pointer position if the move failed
      if (moved.rc !== 0) return textResult(`error: ${moved.err}`);
    }
    const { rc, err } = xrun("xdotool", ["click", String(button)]);
    return textResult(rc !== 0 ? `error: ${err}` : `clicked button ${button}`);
  }
);

server.tool(
  "surf_open",
  "open a URL in the running surf instance",
  { url: z.string() },
  ({ url }) => {
    const search = xrun("xdotool", ["search", "--class", "surf"]);
    if (search.rc !== 0 || !search.out.trim()) return textResult("no surf window found");

    // first matching window: focus it, send ctrl+l, type the url, press Return
    const wid = search.out.trim().split("\n")[0];
    xrun("xdotool", ["windowfocus", wid]);
    xrun("xdotool", ["key", "--window", wid, "ctrl+l"]);
    xrun("xdotool", ["type", "--clearmodifiers", "--", url]);
    xrun("xdotool", ["key", "Return"]);
    return textResult(`opened: ${url}`);
  }
);

server.tool(
  "dwm_status_read",
  "read the current dwm status bar text",
  {},
  () => {
    const { rc, out, err } = xrun("xprop", ["-root", "WM_NAME"]);
    if (rc !== 0) return textResult(`error: ${err}`);
    // unwrap the quoted value; fall back to the raw xprop line
    const match = out.match(/WM_NAME\(\w+\) = "(.*)"/);
    return textResult(match ? match[1] : out.trim());
  }
);

server.tool(
  "dwm_status",
  "set the dwm status bar text (shown on the right of the bar)",
  { text: z.string() },
  ({ text }) => {
    const { rc, err } = xrun("xsetroot", ["-name", text]);
    return textResult(rc !== 0 ? `error: ${err}` : `status set: ${text}`);
  }
);

// -- forgejo extra --

server.tool(
  "forgejo_create_repo",
  "create a new repo on forgejo. owner defaults to configured user",
  {
    name: z.string(),
    description: z.string().default(""),
    private: z.boolean().default(false),
    owner: z.string().optional(),
  },
  async ({ name, description, private: isPrivate, owner }) => {
    const target = owner ?? cfg.FORGEJO_USER;
    // check if owner is an org
    const orgCheck = await forgejo(`/orgs/${target}`);
    const isOrg = orgCheck?.id !== undefined && !orgCheck?.login?.includes("404");
    const endpoint = isOrg ? `/orgs/${target}/repos` : `/user/repos`;
    const result = await forgejo(endpoint, "POST", {
      name,
      description,
      private: isPrivate,
      auto_init: false,
    });
    return textResult(result.full_name ? `created repo: ${result.html_url}` : JSON.stringify(result));
  }
);

server.tool(
  "forgejo_list_issues",
  "list open issues for a repo. repo: owner/repo",
  { repo: z.string(), state: z.enum(["open", "closed", "all"]).default("open") },
  async ({ repo, state }) => {
    const result = await forgejo(`/repos/${repo}/issues?type=issues&state=${state}&limit=50`);
    if (!Array.isArray(result)) return textResult(JSON.stringify(result));
    if (result.length === 0) return textResult("no issues");
    return textResult(result.map((i) => `#${i.number} [${i.state}] ${i.title}`).join("\n"));
  }
);

server.tool(
  "forgejo_close_issue",
  "close an issue on forgejo. repo: owner/repo",
  { repo: z.string(), number: z.coerce.number() },
  async ({ repo, number }) => {
    const result = await forgejo(`/repos/${repo}/issues/${number}`, "PATCH", { state: "closed" });
    return textResult(result.number ? `closed issue #${result.number}` : JSON.stringify(result));
  }
);

server.tool(
  "forgejo_delete_file",
  "delete a file from a forgejo repo. repo: owner/repo",
  { repo: z.string(), path: z.string(), message: z.string().default("delete via cveutils") },
  async ({ repo, path, message }) => {
    // the delete request carries the sha of the existing file
    const existing = await forgejo(`/repos/${repo}/contents/${path}`);
    const sha = existing?.sha;
    if (!sha) return textResult(`file not found: ${path}`);
    const result = await forgejo(`/repos/${repo}/contents/${path}`, "DELETE", { message, sha });
    return textResult(result.commit ? `deleted ${path} from ${repo}` : JSON.stringify(result));
  }
);

// -- screenshot --

server.tool(
  "screenshot",
  "take a screenshot of the current screen and return it as a base64 PNG",
  {},
  () => {
    const shot = join(tmpdir(), `cveutils-shot-${Date.now()}.png`);
    const { rc, err } = xrun("scrot", [shot]);
    if (rc !== 0) return textResult(`scrot failed: ${err || "unknown error"}`);
    try {
      const data = readFileSync(shot).toString("base64");
      return { content: [{ type: "image", data, mimeType: "image/png" }] };
    } finally {
      // always remove the temp file, even when reading it throws
      rmSync(shot, { force: true });
    }
  }
);

// -- util --

server.tool(
  "wait",
  "wait for N seconds before returning. useful for set/wait/read loops on the dwm bar",
  { seconds: z.coerce.number().default(5) },
  ({ seconds }) =>
    new Promise((resolve) => {
      setTimeout(() => resolve(textResult(`waited ${seconds}s`)), seconds * 1000);
    })
);

// -- start --

const transport = new StdioServerTransport();
await server.connect(transport);