Skip to main content

Architecture Overview

OrcBot is software-first but makes an excellent brain for hardware systems. The recommended pattern keeps real-world control in a dedicated hardware bridge while OrcBot handles planning, reasoning, and high-level decisions.

Why Use a Hardware Bridge?

Bridge enforces safety regardless of AI decisions:
  • Speed limits (max PWM duty cycle)
  • Duration limits (max time per command)
  • Obstacle detection (auto-stop)
  • Emergency stop (overrides everything)
  • Watchdog timer (auto-stop if no commands)

Reference Architecture

Based on the robotics blog example:

Component Layers

  1. OrcBot Core - Planning, memory, autonomy, decision pipeline
  2. Hardware Bridge Service - Translates intents into robot-specific commands
  3. Message Bus - ROS2 topics, MQTT, REST, or serial gateway
  4. Safety Layer - Rate limits, e-stop, command validation
  5. Physical Hardware - Motors, sensors, actuators

Integration Patterns

Pattern 1: REST API Bridge

Best for: Simple robots, prototyping, web-based control
# hardware-bridge.py
from flask import Flask, request, jsonify
import RPi.GPIO as GPIO
import time

app = Flask(__name__)

# GPIO setup
MOTOR_LEFT = {'IN1': 17, 'IN2': 27, 'ENA': 18}
MOTOR_RIGHT = {'IN3': 22, 'IN4': 10, 'ENB': 25}
MAX_SPEED = 80  # Safety limit

@app.route('/move', methods=['POST'])
def move():
    data = request.json
    direction = data.get('direction', 'forward')
    speed = min(int(data.get('speed', 40)), MAX_SPEED)
    duration = min(float(data.get('duration', 1.0)), 5.0)
    
    # Set motor directions
    if direction == 'forward':
        GPIO.output(MOTOR_LEFT['IN1'], GPIO.HIGH)
        GPIO.output(MOTOR_RIGHT['IN3'], GPIO.HIGH)
    elif direction == 'backward':
        GPIO.output(MOTOR_LEFT['IN2'], GPIO.HIGH)
        GPIO.output(MOTOR_RIGHT['IN4'], GPIO.HIGH)
    
    # Set speed via PWM
    pwm_left.ChangeDutyCycle(speed)
    pwm_right.ChangeDutyCycle(speed)
    
    # Auto-stop after duration
    time.sleep(duration)
    stop_motors()
    
    return jsonify({'status': 'success', 'direction': direction})

@app.route('/sensor/distance', methods=['GET'])
def get_distance():
    distance = measure_ultrasonic_distance()
    return jsonify({'distance_cm': distance})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5050)
OrcBot Plugin:
// ~/.orcbot/plugins/robot-control/index.js

const BRIDGE_URL = process.env.ROBOT_BRIDGE_URL || 'http://localhost:5050';

module.exports = [
  {
    name: 'robot_move',
    description: 'Move the robot in a direction',
    usage: 'robot_move(direction, speed?, duration?)',
    handler: async (args) => {
      const response = await fetch(`${BRIDGE_URL}/move`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          direction: args.direction,
          speed: args.speed || 40,
          duration: args.duration || 1.0
        })
      });
      return await response.json();
    }
  },
  {
    name: 'robot_distance',
    description: 'Measure distance to nearest obstacle',
    usage: 'robot_distance()',
    handler: async () => {
      const response = await fetch(`${BRIDGE_URL}/sensor/distance`);
      return await response.json();
    }
  }
];

Pattern 2: ROS2 Integration

Best for: Complex robots, SLAM, navigation, multi-sensor fusion
# ros2_bridge.py
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
from flask import Flask, request, jsonify
import threading

class OrcBotBridge(Node):
    def __init__(self):
        super().__init__('orcbot_bridge')
        self.cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
        self.scan_sub = self.create_subscription(
            LaserScan, '/scan', self.scan_callback, 10
        )
        self.latest_scan = None
    
    def scan_callback(self, msg):
        self.latest_scan = msg
    
    def move(self, linear, angular, duration):
        twist = Twist()
        twist.linear.x = linear
        twist.angular.z = angular
        
        # Publish for duration
        rate = self.create_rate(10)
        start = self.get_clock().now()
        while (self.get_clock().now() - start).nanoseconds < duration * 1e9:
            self.cmd_vel_pub.publish(twist)
            rate.sleep()
        
        # Stop
        self.cmd_vel_pub.publish(Twist())

app = Flask(__name__)
bridge = None

@app.route('/move', methods=['POST'])
def move():
    data = request.json
    direction = data.get('direction')
    speed = data.get('speed', 0.3)
    duration = data.get('duration', 1.0)
    
    # Convert direction to linear/angular
    if direction == 'forward':
        bridge.move(speed, 0.0, duration)
    elif direction == 'left':
        bridge.move(0.0, 0.5, duration)
    
    return jsonify({'status': 'success'})

@app.route('/scan', methods=['GET'])
def get_scan():
    if bridge.latest_scan:
        ranges = bridge.latest_scan.ranges
        min_distance = min(ranges)
        return jsonify({'min_distance': min_distance})
    return jsonify({'error': 'No scan data'})

def run_ros():
    rclpy.init()
    global bridge
    bridge = OrcBotBridge()
    rclpy.spin(bridge)

if __name__ == '__main__':
    # Run ROS2 in separate thread
    ros_thread = threading.Thread(target=run_ros, daemon=True)
    ros_thread.start()
    
    # Run Flask
    app.run(host='0.0.0.0', port=5050)

Pattern 3: MQTT for Multi-Robot Fleets

Best for: Multiple robots, distributed systems, cloud coordination
# mqtt_bridge.py
import paho.mqtt.client as mqtt
import json
import RPi.GPIO as GPIO

MQTT_BROKER = 'localhost'
MQTT_PORT = 1883
ROBOT_ID = 'robot-001'

def on_connect(client, userdata, flags, rc):
    print(f"Connected to MQTT broker")
    client.subscribe(f'orcbot/{ROBOT_ID}/command')

def on_message(client, userdata, msg):
    command = json.loads(msg.payload)
    
    if command['action'] == 'move':
        move_robot(command['direction'], command['speed'])
        
        # Publish status
        client.publish(f'orcbot/{ROBOT_ID}/status', json.dumps({
            'status': 'moving',
            'direction': command['direction']
        }))
    
    elif command['action'] == 'stop':
        stop_motors()
        client.publish(f'orcbot/{ROBOT_ID}/status', json.dumps({
            'status': 'stopped'
        }))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_forever()
OrcBot Plugin:
// ~/.orcbot/plugins/mqtt-robot/index.js

const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://localhost:1883');

module.exports = [
  {
    name: 'fleet_move',
    description: 'Send move command to robot fleet',
    usage: 'fleet_move(robotId, direction, speed)',
    handler: async (args) => {
      return new Promise((resolve) => {
        const topic = `orcbot/${args.robotId}/command`;
        const payload = JSON.stringify({
          action: 'move',
          direction: args.direction,
          speed: args.speed || 40
        });
        
        client.publish(topic, payload);
        
        // Wait for status update
        const statusTopic = `orcbot/${args.robotId}/status`;
        client.once('message', (topic, message) => {
          if (topic === statusTopic) {
            resolve({
              success: true,
              status: JSON.parse(message.toString())
            });
          }
        });
        
        client.subscribe(statusTopic);
      });
    }
  }
];

Safety Patterns

Layer 1: Hardware Bridge Safety

Built into bridge service:
# Safety constants
MAX_SPEED = 80          # Max PWM duty cycle (0-100)
MIN_SPEED = 20          # Below this, motors stall
MAX_DURATION = 5.0      # Max seconds per command
WATCHDOG_TIMEOUT = 10   # Auto-stop if no command in N seconds
OBSTACLE_MIN_CM = 15    # Stop if obstacle closer than this

def validate_command(speed, duration):
    """Clamp values to safe ranges"""
    speed = max(MIN_SPEED, min(speed, MAX_SPEED))
    duration = max(0.1, min(duration, MAX_DURATION))
    return speed, duration

def check_obstacle():
    """Block forward movement if obstacle detected"""
    distance = measure_distance()
    if distance != -1 and distance < OBSTACLE_MIN_CM:
        return False  # Blocked
    return True  # Safe to proceed

def watchdog_loop():
    """Auto-stop if no commands received"""
    while True:
        time.sleep(1)
        if state['moving'] and time.time() - state['last_command'] > WATCHDOG_TIMEOUT:
            stop_motors()
            logger.warning("Watchdog triggered: auto-stop")

Layer 2: OrcBot Guard Rails

Built-in safety features:
# ~/.orcbot/orcbot.config.yaml

# Limit how many times same skill can be called
messageDedupWindow: 15  # Prevent spamming robot_move 20 times

# Pattern loop detection
# Detects and breaks repetitive action cycles

# Step limits
maxStepsPerAction: 20  # Prevent runaway actions

# Termination review
# Second LLM pass confirms task completion

Layer 3: Physical Safety

Highly recommended:
  1. Physical E-Stop Button
    Battery + ── FUSE ── [E-STOP] ── Motor Driver
    
    Normally closed button in series with motor power
    Press = instant power cut (no software involved)
    
  2. Battery Inline Fuse
    • 5A fuse between battery and motor driver
    • Prevents fires from shorts
  3. Bumper Switches
    • Microswitch on front of robot
    • Triggers software stop on physical contact
    • Backup to ultrasonic sensor

Layer 4: Testing Discipline

Follow this order - no exceptions:
1

Test Bridge API

# No motors connected
curl http://robot.local:5050/health
curl -X POST http://robot.local:5050/move \
  -d '{"direction":"forward","speed":0}'
2

Test Motors Individually

# Robot lifted off ground
python test_motors.py
3

Test OrcBot→Bridge

# Robot still lifted
"Move the robot forward at speed 30 for 1 second"
4

Test on Ground (Confined)

Place robot in cardboard box arenaTest obstacle avoidance
5

Normal Operation

Only after all tests passAlways supervised initially

Hardware Examples

Raspberry Pi Robot

See the complete guide in robotics blog including:
  • Bill of materials ($120-160)
  • Wiring diagrams
  • Complete Python bridge code
  • OrcBot plugin implementation
  • Safety systems
  • Testing procedures

Arduino Integration

// arduino_bridge.ino

void setup() {
  Serial.begin(9600);
  pinMode(MOTOR_A_PIN, OUTPUT);
  pinMode(MOTOR_B_PIN, OUTPUT);
}

void loop() {
  if (Serial.available()) {
    String command = Serial.readStringUntil('\n');
    
    if (command.startsWith("MOVE")) {
      int speed = command.substring(5).toInt();
      analogWrite(MOTOR_A_PIN, speed);
      analogWrite(MOTOR_B_PIN, speed);
      Serial.println("OK");
    }
    else if (command == "STOP") {
      analogWrite(MOTOR_A_PIN, 0);
      analogWrite(MOTOR_B_PIN, 0);
      Serial.println("STOPPED");
    }
  }
}
Bridge:
# serial_bridge.py
import serial
from flask import Flask, request, jsonify

app = Flask(__name__)
ser = serial.Serial('/dev/ttyUSB0', 9600)

@app.route('/move', methods=['POST'])
def move():
    speed = request.json.get('speed', 128)
    ser.write(f"MOVE {speed}\n".encode())
    response = ser.readline().decode().strip()
    return jsonify({'status': response})

app.run(host='0.0.0.0', port=5050)

ESP32 WiFi Robot

// esp32_robot.ino
#include <WiFi.h>
#include <WebServer.h>

WebServer server(80);

void handleMove() {
  if (server.hasArg("direction")) {
    String dir = server.arg("direction");
    if (dir == "forward") moveForward();
    else if (dir == "left") turnLeft();
    server.send(200, "application/json", "{\"status\":\"ok\"}");
  }
}

void setup() {
  WiFi.begin("YourSSID", "password");
  while (WiFi.status() != WL_CONNECTED) delay(500);
  
  server.on("/move", handleMove);
  server.begin();
}

void loop() {
  server.handleClient();
}
OrcBot connects directly (no bridge needed):
// Plugin
const ESP32_IP = '192.168.1.100';

module.exports = [{
  name: 'esp32_move',
  handler: async (args) => {
    const response = await fetch(`http://${ESP32_IP}/move?direction=${args.direction}`);
    return await response.json();
  }
}];

IoT Device Integration

Smart Home Devices

// ~/.orcbot/plugins/homeassistant/index.js

const HA_URL = process.env.HOMEASSISTANT_URL;
const HA_TOKEN = process.env.HOMEASSISTANT_TOKEN;

async function callService(domain, service, entity_id, data = {}) {
  const response = await fetch(`${HA_URL}/api/services/${domain}/${service}`, {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${HA_TOKEN}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({ entity_id, ...data })
  });
  return await response.json();
}

module.exports = [
  {
    name: 'turn_on_lights',
    description: 'Turn on smart lights',
    usage: 'turn_on_lights(room)',
    handler: async (args) => {
      const entity = `light.${args.room}`;
      await callService('light', 'turn_on', entity);
      return { success: true, message: `Turned on lights in ${args.room}` };
    }
  },
  {
    name: 'set_thermostat',
    description: 'Set thermostat temperature',
    usage: 'set_thermostat(temperature)',
    handler: async (args) => {
      await callService('climate', 'set_temperature', 'climate.thermostat', {
        temperature: args.temperature
      });
      return { success: true, temperature: args.temperature };
    }
  }
];

Industrial Equipment

# modbus_bridge.py
from pymodbus.client import ModbusTcpClient
from flask import Flask, jsonify

app = Flask(__name__)
client = ModbusTcpClient('192.168.1.50', port=502)

@app.route('/conveyor/start', methods=['POST'])
def start_conveyor():
    # Write to holding register
    client.write_register(0x1000, 1)  # Start
    return jsonify({'status': 'started'})

@app.route('/sensor/temperature', methods=['GET'])
def get_temperature():
    # Read input register
    result = client.read_input_registers(0x2000, 1)
    temp = result.registers[0] / 10.0  # Scale
    return jsonify({'temperature_c': temp})

app.run(host='0.0.0.0', port=5050)

Best Practices

Safety First

  • Always implement hardware e-stop
  • Test in simulation before real hardware
  • Start with low speeds and short durations
  • Monitor all operations initially
  • Implement watchdog timers

Separation of Concerns

  • Keep OrcBot for high-level planning
  • Keep bridge for hardware specifics
  • Use message bus for decoupling
  • Test components independently

Error Handling

  • Bridge should validate all commands
  • Return detailed error messages
  • Log all hardware operations
  • Implement graceful degradation

Monitoring

  • Track command success/failure rates
  • Monitor sensor readings
  • Log all movements
  • Alert on anomalies

Troubleshooting

Bridge Connection Issues

# Test bridge directly
curl http://robot.local:5050/health

# Check if bridge is running
sudo systemctl status robot-bridge

# View bridge logs
sudo journalctl -u robot-bridge -f

Motors Not Responding

1

Check Power

  • Battery voltage sufficient?
  • Connections tight?
  • Fuse intact?
2

Check Wiring

  • GPIO pins correct?
  • Motor driver connections?
  • PWM signals working?
3

Test Manually

import RPi.GPIO as GPIO
GPIO.setmode(GPIO.BCM)
GPIO.setup(18, GPIO.OUT)
pwm = GPIO.PWM(18, 1000)
pwm.start(50)
# Motor should spin

Sensors Not Reading

# Test ultrasonic sensor
import RPi.GPIO as GPIO
import time

TRIG = 23
ECHO = 24

GPIO.setmode(GPIO.BCM)
GPIO.setup(TRIG, GPIO.OUT)
GPIO.setup(ECHO, GPIO.IN)

GPIO.output(TRIG, True)
time.sleep(0.00001)
GPIO.output(TRIG, False)

while GPIO.input(ECHO) == 0:
    start = time.time()

while GPIO.input(ECHO) == 1:
    stop = time.time()

distance = (stop - start) * 34300 / 2
print(f"Distance: {distance}cm")

Example Workflows

Autonomous Patrol

User: "Patrol the hallway and report any obstacles"

OrcBot Planning:
1. robot_move(direction="forward", duration=3)
2. robot_distance()
3. If obstacle < 30cm: robot_rotate(angle=90)
4. Repeat patrol pattern
5. Report findings via Telegram

Execution:
[Moving forward...]
[Distance: 45cm]
[Moving forward...]
[Distance: 12cm - OBSTACLE!]
[Rotating 90 degrees...]
[Obstacle detected at 2.3m from start point]

Bot (Telegram): ⚠️ Patrol Report:
- Patrolled 2.3 meters
- Obstacle detected (12cm away)
- Rotated to avoid
- Patrol continuing

Scheduled Inspections

# Config
autonomyEnabled: true

# Schedule
heartbeat_schedule(
  schedule: "0 */2 * * *",  # Every 2 hours
  task: "Robot: patrol perimeter and check all sensors"
)

Multi-Robot Coordination

User: "Deploy all robots to search the warehouse"

OrcBot:
1. fleet_move(robotId="robot-001", direction="north")
2. fleet_move(robotId="robot-002", direction="south")
3. fleet_move(robotId="robot-003", direction="east")
4. Monitor all robot statuses
5. Aggregate findings
6. Report results

Bot: 🤖 Search Results:

Robot-001 (North sector):
- Clear, no anomalies

Robot-002 (South sector):
- Detected movement near door

Robot-003 (East sector):  
- Temperature sensor: 28°C (elevated)
- Investigating further

Resources