""" bridge.py — Arduino Serial → DataNet bridge (Python) Reads newline-delimited JSON from a USB serial port and publishes each message to a DataNet channel. Works with the SerialSensor.ino sketch and any other Arduino sketch that emits JSON lines over USB serial. Requirements: Python 3.11+ pip install pyserial aiohttp websockets Or, if you have the DataNet Python SDK checked out: pip install -e packages/sdk-python Usage: python bridge.py # auto-detect port SERIAL_PORT=/dev/ttyUSB0 python bridge.py # Linux explicit port SERIAL_PORT=/dev/tty.usbmodem* python bridge.py # macOS explicit port SERIAL_PORT=COM3 python bridge.py # Windows explicit port Environment variables: DATANET_API_KEY Your DataNet API key (required) SERIAL_PORT Serial device path (auto-detected if not set) BAUD_RATE Baud rate (default: 115200) CHANNEL DataNet channel (default: demo.serial.arduino) API_URL DataNet REST base URL (default: https://api.datanet.art) WS_URL DataNet WebSocket URL (default: wss://ws.datanet.art/ws) ROUTE_BY_SENSOR Set to "1" to publish to . sub-channels DEBUG Set to "1" to log every message """ from __future__ import annotations import asyncio import json import logging import os import sys import threading from typing import Optional import serial import serial.tools.list_ports # ── Config ──────────────────────────────────────────────────────────────── API_KEY = os.environ.get("DATANET_API_KEY", "") SERIAL_PORT = os.environ.get("SERIAL_PORT", "") BAUD_RATE = int(os.environ.get("BAUD_RATE", "115200")) CHANNEL = os.environ.get("CHANNEL", "demo.serial.arduino") API_URL = os.environ.get("API_URL", "https://api.datanet.art").rstrip("/") WS_URL = os.environ.get("WS_URL", "wss://ws.datanet.art/ws") ROUTE_BY_SENSOR = os.environ.get("ROUTE_BY_SENSOR", "") == "1" DEBUG = os.environ.get("DEBUG", "") == "1" logging.basicConfig( level=logging.DEBUG if DEBUG else logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) log = logging.getLogger("datanet.bridge") if not API_KEY: log.error("DATANET_API_KEY is required. Set it as an environment variable.") sys.exit(1) # ── Serial port auto-detection ──────────────────────────────────────────── ARDUINO_KEYWORDS = ["arduino", "ch340", "ch341", "ftdi", "cp210", "usbmodem", "usbserial"] def find_arduino_port() -> Optional[str]: ports = serial.tools.list_ports.comports() if DEBUG: for p in ports: log.debug("port: %s mfr: %s hwid: %s", p.device, p.manufacturer, p.hwid) # Prefer ports matching known Arduino/USB-serial chip signatures for p in ports: desc = " ".join([ p.device or "", p.manufacturer or "", p.description or "", p.hwid or "", ]).lower() if any(kw in desc for kw in ARDUINO_KEYWORDS): return p.device # Fall back to first port that looks like USB serial for p in ports: if "usb" in (p.device or "").lower() or "acm" in (p.device or "").lower(): return p.device return None # ── DataNet bridge using the Python SDK ─────────────────────────────────── async def run_bridge(port_path: str) -> None: """Open serial port and forward JSON lines to DataNet via the Python SDK.""" # Import the DataNet SDK. If it isn't installed as a package, try to import # from the monorepo source directory (packages/sdk-python). try: from datanet import DataNet # installed package except ImportError: # Resolve path relative to this file: demos/arduino-serial-bridge/bridge-python/ repo_root = os.path.abspath( os.path.join(os.path.dirname(__file__), "..", "..", "..", "packages", "sdk-python") ) sys.path.insert(0, repo_root) try: from datanet import DataNet except ImportError: log.error( "DataNet SDK not found. Install it with:\n" " pip install aiohttp websockets\n" " pip install -e packages/sdk-python\n" "Or run from the repo root." ) sys.exit(1) msg_count = 0 dn = DataNet(api_key=API_KEY, api_url=API_URL, ws_url=WS_URL) @dn.on("connect") async def on_connect() -> None: log.info("DataNet connected ✓") @dn.on("disconnect") async def on_disconnect() -> None: log.warning("DataNet disconnected — SDK will reconnect automatically") @dn.on("error") async def on_error(exc: Exception) -> None: log.error("DataNet error: %s", exc) # ── Open serial port (blocking I/O in a thread pool) ────────────────── loop = asyncio.get_running_loop() def open_serial() -> serial.Serial: log.info("Opening serial port %s @ %d baud…", port_path, BAUD_RATE) return serial.Serial(port_path, BAUD_RATE, timeout=1) ser = await loop.run_in_executor(None, open_serial) log.info("Serial port open ✓") log.info("Publishing to channel: %s%s\n", CHANNEL, "." if ROUTE_BY_SENSOR else "") # ── Queue for passing serial lines to the async coroutine ───────────── queue: asyncio.Queue[str] = asyncio.Queue() def read_serial_thread() -> None: """Blocking serial read in a background thread; puts lines into queue.""" while True: try: raw = ser.readline() if raw: line = raw.decode("utf-8", errors="replace").strip() if line: asyncio.run_coroutine_threadsafe(queue.put(line), loop) except serial.SerialException as e: log.error("Serial read error: %s", e) break thread = threading.Thread(target=read_serial_thread, daemon=True) thread.start() # ── Main bridge loop ───────────────────────────────────────────────── async with dn: while True: line = await queue.get() nonlocal_count = await handle_line(dn, line) msg_count += nonlocal_count if msg_count % 10 == 0 and msg_count > 0: log.info("msgs published: %d", msg_count) async def handle_line(dn, line: str) -> int: """Parse one JSON line and publish to DataNet. Returns 1 if published, 0 otherwise.""" if not line.startswith("{"): return 0 try: data = json.loads(line) except json.JSONDecodeError: log.debug("parse error, skipping: %s", line) return 0 # Status messages from the sketch — log, don't publish if "status" in data: log.info("Sketch says: %s", data.get("status")) return 0 # Route to sub-channel by sensor name, or publish everything to one channel sensor = data.get("sensor") target = f"{CHANNEL}.{sensor}" if (ROUTE_BY_SENSOR and sensor) else CHANNEL await dn.publish(target, data) log.debug("published → %s: %s", target, data) return 1 # ── Main ─────────────────────────────────────────────────────────────────── def main() -> None: print("DataNet Arduino Serial Bridge (Python)") print("───────────────────────────────────────") print(f"Channel : {CHANNEL}{'.' if ROUTE_BY_SENSOR else ''}") print(f"API URL : {API_URL}") print(f"WS URL : {WS_URL}") print() port_path = SERIAL_PORT if not port_path: port_path = find_arduino_port() if not port_path: log.error( "No serial port detected automatically. " "Set SERIAL_PORT explicitly, e.g.:\n" " SERIAL_PORT=/dev/ttyUSB0 python bridge.py" ) ports = serial.tools.list_ports.comports() if ports: log.error("Available ports: %s", ", ".join(p.device for p in ports)) sys.exit(1) log.info("Auto-detected port: %s", port_path) try: asyncio.run(run_bridge(port_path)) except KeyboardInterrupt: print("\n[bridge] Ctrl-C — shutting down") if __name__ == "__main__": main()