Plugin Development Guide
Introduction
This guide walks through developing device driver plugins for the system. It uses the SocketCAN driver as a reference implementation to demonstrate best practices and required components.
Plugin Architecture Overview
The plugin system is built on a hierarchical structure:
- BasePlugin: The root class providing basic plugin functionality
- BaseDeviceDriver: Extends BasePlugin with device-specific operations
- Your custom driver class: Implements the specific device functionality
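The three layers above can be pictured with a minimal, self-contained sketch. The classes below are simplified stand-ins, not the real ones from sat_toolkit.core.base_plugin. The idea that a public method such as scan() delegates to an _impl hook is an assumption inferred from the _impl naming used later in this guide.

```python
from abc import ABC, abstractmethod
from typing import List


class BasePlugin(ABC):
    """Stand-in for the root plugin class (illustrative only)."""

    def __init__(self) -> None:
        self.name = type(self).__name__


class BaseDeviceDriver(BasePlugin):
    """Stand-in for the driver layer: public methods delegate to _impl hooks."""

    def scan(self) -> List[str]:
        # Assumed pattern: the base class wraps the driver-specific hook.
        return self._scan_impl()

    @abstractmethod
    def _scan_impl(self) -> List[str]:
        ...


class DemoDriver(BaseDeviceDriver):
    """A custom driver supplies only the device-specific behaviour."""

    def _scan_impl(self) -> List[str]:
        return ["demo0"]  # hypothetical device identifier


driver = DemoDriver()
print(driver.name, driver.scan())
```

Keeping the device-specific logic in the hooks lets the shared layers handle bookkeeping in one place.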
Key Components
Device State Management
The system defines several device states that represent the lifecycle of a device:
| State | Description |
| --- | --- |
| UNKNOWN | Initial state |
| DISCOVERED | Device found during scanning |
| INITIALIZED | Device initialized but not connected |
| CONNECTED | Device connected and ready |
| ACTIVE | Device actively performing operations |
| ERROR | Device encountered an error |
| DISCONNECTED | Device disconnected |
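As a rough illustration, the states listed above map naturally onto a Python Enum. This is a sketch only: the real DeviceState is imported from sat_toolkit.core.device_spec, and the string values and the is_usable helper below are assumptions made for the example.

```python
from enum import Enum


class DeviceState(Enum):
    """Illustrative stand-in; member values are assumed, not taken from sat_toolkit."""

    UNKNOWN = "unknown"
    DISCOVERED = "discovered"
    INITIALIZED = "initialized"
    CONNECTED = "connected"
    ACTIVE = "active"
    ERROR = "error"
    DISCONNECTED = "disconnected"


def is_usable(state: DeviceState) -> bool:
    """Hypothetical helper: True when the device can accept commands."""
    return state in (DeviceState.CONNECTED, DeviceState.ACTIVE)


print([state.name for state in DeviceState])
print(is_usable(DeviceState.CONNECTED), is_usable(DeviceState.ERROR))
```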
Stream Management
- Handles real-time data streaming
- Manages WebSocket connections
- Processes data acquisition
Creating a Device Driver Plugin
Here’s a step-by-step guide to creating a device driver plugin:
Create Plugin File Structure
Create a new Python file in the plugins/devices/ directory with the prefix drv_:
plugins/
└── devices/
    └── your_device/
        └── drv_your_device.py
Basic Plugin Structure
Here’s a template based on the SocketCAN implementation:
import logging
import time  # Used by the acquisition loop for timestamps and back-off
from typing import Optional, Dict, List

from sat_toolkit.core.base_plugin import BaseDeviceDriver
from sat_toolkit.models.Device_Model import Device
from sat_toolkit.core.stream_manager import StreamData, StreamType, StreamSource, StreamAction

logger = logging.getLogger(__name__)


class YourDeviceDriver(BaseDeviceDriver):
    def __init__(self):
        super().__init__()
        self.device = None
        # Define supported commands
        self.supported_commands = {
            "start": "Start device operation",
            "stop": "Stop device operation",
            "status": "Get device status"
        }
Implement Required Methods
Your plugin must implement these core methods:
def _scan_impl(self) -> List[Device]:
    """Scan for available devices"""
    try:
        # Implement device discovery logic
        devices = []
        # Example: scan for devices
        # devices.append(YourDevice(...))
        return devices
    except Exception as e:
        logger.error(f"Scan failed: {str(e)}")
        raise

def _initialize_impl(self, device: Device) -> bool:
    """Initialize the device"""
    try:
        # Implement device initialization
        # Example: setup device parameters
        return True
    except Exception as e:
        logger.error(f"Initialization failed: {str(e)}")
        raise

def _connect_impl(self, device: Device) -> bool:
    """Connect to the device"""
    try:
        # Implement device connection logic
        return True
    except Exception as e:
        logger.error(f"Connection failed: {str(e)}")
        raise

def _command_impl(self, device: Device, command: str, args: Optional[Dict] = None) -> Optional[str]:
    """Execute device commands"""
    try:
        if command == "start":
            self.start_streaming(device)
            return "Started streaming"
        elif command == "stop":
            self.stop_streaming(device)
            return "Stopped streaming"
        else:
            raise ValueError(f"Unknown command: {command}")
    except Exception as e:
        logger.error(f"Command execution failed: {str(e)}")
        raise

def _reset_impl(self, device: Device) -> bool:
    """Reset the device"""
    try:
        # Implement device reset logic
        return True
    except Exception as e:
        logger.error(f"Reset failed: {str(e)}")
        raise

def _close_impl(self, device: Device) -> bool:
    """Close the device connection"""
    try:
        # Implement cleanup logic
        return True
    except Exception as e:
        logger.error(f"Close failed: {str(e)}")
        raise
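Note that the template declares a "status" command in supported_commands, while the _command_impl shown above handles only "start" and "stop". One way to keep the two in sync is a dispatch table. The sketch below is self-contained and hypothetical: CommandRouter and its handlers are illustrative names, not part of sat_toolkit.

```python
from typing import Callable, Dict, Optional

Handler = Callable[[Optional[dict]], str]


class CommandRouter:
    """Maps command names to handlers so the supported set has one source of truth."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def register(self, name: str, handler: Handler) -> None:
        self._handlers[name] = handler

    @property
    def supported(self) -> list:
        return sorted(self._handlers)

    def execute(self, command: str, args: Optional[dict] = None) -> str:
        handler = self._handlers.get(command)
        if handler is None:
            raise ValueError(f"Unknown command: {command}")
        return handler(args)


router = CommandRouter()
router.register("start", lambda args: "Started streaming")
router.register("stop", lambda args: "Stopped streaming")
router.register("status", lambda args: "idle")  # placeholder status text

print(router.supported)
print(router.execute("status"))
```

With this shape, a driver's _command_impl could delegate to router.execute and derive its command list from router.supported.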
Implement Data Streaming
If your device supports data streaming, implement these methods:
def _setup_acquisition(self, device: Device):
    """Setup for data acquisition"""
    try:
        # Initialize data acquisition resources
        pass
    except Exception as e:
        logger.error(f"Setup acquisition failed: {str(e)}")
        raise

def _cleanup_acquisition(self, device: Device):
    """Cleanup after data acquisition"""
    try:
        # Clean up resources
        pass
    except Exception as e:
        logger.error(f"Cleanup acquisition failed: {str(e)}")
        raise

def _acquisition_loop(self):
    """Main data acquisition loop (requires `import time` at module level)"""
    while self.is_acquiring.is_set():
        try:
            # Read data from device; _read_device_data is a placeholder
            # for your own read routine
            data = self._read_device_data()
            # Create stream data; replace YOUR_TYPE with a real StreamType member
            stream_data = StreamData(
                stream_type=StreamType.YOUR_TYPE,
                channel=self.device.device_id,
                timestamp=time.time(),
                source=StreamSource.SERVER,
                action=StreamAction.DATA,
                data=data
            )
            # Broadcast data
            self.stream_wrapper.broadcast_data(stream_data)
        except Exception as e:
            logger.error(f"Error in acquisition loop: {str(e)}")
            # Back off briefly so a persistent failure does not spin the CPU
            time.sleep(0.1)
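The acquisition loop above checks self.is_acquiring.is_set(), which suggests a threading.Event controlling a background thread. The following self-contained sketch shows that pattern in isolation. AcquisitionWorker, read_sample, and publish are hypothetical names, and how BaseDeviceDriver actually starts and stops its thread is not shown in this guide.

```python
import threading
import time
from typing import Callable, List, Optional


class AcquisitionWorker:
    """Runs a read/publish loop on a background thread until stopped."""

    def __init__(self, read_sample: Callable[[], int],
                 publish: Callable[[int], None]) -> None:
        self.is_acquiring = threading.Event()
        self._read_sample = read_sample
        self._publish = publish
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        if self.is_acquiring.is_set():
            return  # already running
        self.is_acquiring.set()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def stop(self, timeout: float = 1.0) -> None:
        self.is_acquiring.clear()  # signals the loop to exit
        if self._thread is not None:
            self._thread.join(timeout)
            self._thread = None

    def _loop(self) -> None:
        while self.is_acquiring.is_set():
            try:
                self._publish(self._read_sample())
            except Exception:
                time.sleep(0.1)  # back off after a failed read
            time.sleep(0.001)  # yield between samples


samples: List[int] = []
worker = AcquisitionWorker(read_sample=lambda: 42, publish=samples.append)
worker.start()
deadline = time.monotonic() + 1.0
while not samples and time.monotonic() < deadline:
    time.sleep(0.005)  # wait until at least one sample arrives
worker.stop()
print(len(samples) > 0)
```

Clearing the event and then joining the thread gives the cleanup step a well-defined point at which no more data is being read.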
Best Practices
Error Handling
- Use try-except blocks in all methods
- Log errors with appropriate detail
- Clean up resources in case of failures
Resource Management
- Initialize resources in _setup_acquisition
- Clean up resources in _cleanup_acquisition
- Implement proper cleanup in _close_impl
Logging
- Use the logger for debugging and error tracking
- Include relevant context in log messages
- Use appropriate log levels (debug, info, error)
State Management
- Track device state transitions
- Validate state before operations
- Handle error states appropriately
Thread Safety
- Use thread-safe mechanisms for shared resources
- Properly handle thread lifecycle in streaming
- Use appropriate synchronization primitives
Integration with Device Manager
The Device Manager automatically discovers and loads plugins:
- Place your plugin in the correct directory
- Ensure the filename starts with drv_
- Implement all required methods
The Device Manager will handle:
- Plugin loading
- State management
- Command routing
- Error handling
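To make the filename convention concrete, here is a minimal sketch of how prefix-based discovery could work. It is not the Device Manager's actual loader: discover_driver_modules is a hypothetical function, and the example builds a temporary directory tree rather than touching a real plugins/ folder.

```python
import importlib.util
import tempfile
from pathlib import Path
from types import ModuleType
from typing import Dict


def discover_driver_modules(root: Path) -> Dict[str, ModuleType]:
    """Import every drv_*.py file found under root, keyed by module name."""
    modules: Dict[str, ModuleType] = {}
    for path in sorted(root.rglob("drv_*.py")):
        spec = importlib.util.spec_from_file_location(path.stem, path)
        if spec is None or spec.loader is None:
            continue  # not importable; skip it
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        modules[path.stem] = module
    return modules


with tempfile.TemporaryDirectory() as tmp:
    device_dir = Path(tmp) / "devices" / "your_device"
    device_dir.mkdir(parents=True)
    # One file follows the convention, one does not.
    (device_dir / "drv_your_device.py").write_text("PLUGIN_NAME = 'your_device'\n")
    (device_dir / "helper.py").write_text("PLUGIN_NAME = 'ignored'\n")
    found = discover_driver_modules(Path(tmp))

print(sorted(found))
```

Only the file with the drv_ prefix is picked up, which is why a misnamed file is the first thing to check when a plugin does not load.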
Device Lifecycle Management
The Device Manager tracks each device through a state machine and only allows operations that are valid for the device's current state. Understanding this system is crucial for plugin development.
State Machine
The device lifecycle follows a state machine pattern:
UNKNOWN ──► DISCOVERED ──► INITIALIZED ──► CONNECTED ◄──► ACTIVE
                               ▲ │             │            │
                               │ ▼             │            ▼
                           DISCONNECTED ◄──────┴───────── ERROR
State Transitions
Valid state transitions are strictly controlled:
_state_transitions = {
    DeviceState.UNKNOWN: [DeviceState.DISCOVERED],
    DeviceState.DISCOVERED: [DeviceState.INITIALIZED],
    DeviceState.INITIALIZED: [DeviceState.CONNECTED, DeviceState.DISCONNECTED],
    DeviceState.CONNECTED: [DeviceState.ACTIVE, DeviceState.DISCONNECTED],
    DeviceState.ACTIVE: [DeviceState.CONNECTED, DeviceState.ERROR],
    DeviceState.ERROR: [DeviceState.DISCONNECTED],
    DeviceState.DISCONNECTED: [DeviceState.INITIALIZED]
}
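A transition table like the one above is typically consulted before every state change. The sketch below reproduces the table with a stand-in DeviceState enum and adds two hypothetical helpers, can_transition and transition. The Device Manager's own validation code is not shown in this guide, so treat this as an illustration of the idea only.

```python
from enum import Enum, auto
from typing import Dict, List


class DeviceState(Enum):
    """Stand-in for sat_toolkit's DeviceState (values are arbitrary)."""
    UNKNOWN = auto()
    DISCOVERED = auto()
    INITIALIZED = auto()
    CONNECTED = auto()
    ACTIVE = auto()
    ERROR = auto()
    DISCONNECTED = auto()


STATE_TRANSITIONS: Dict[DeviceState, List[DeviceState]] = {
    DeviceState.UNKNOWN: [DeviceState.DISCOVERED],
    DeviceState.DISCOVERED: [DeviceState.INITIALIZED],
    DeviceState.INITIALIZED: [DeviceState.CONNECTED, DeviceState.DISCONNECTED],
    DeviceState.CONNECTED: [DeviceState.ACTIVE, DeviceState.DISCONNECTED],
    DeviceState.ACTIVE: [DeviceState.CONNECTED, DeviceState.ERROR],
    DeviceState.ERROR: [DeviceState.DISCONNECTED],
    DeviceState.DISCONNECTED: [DeviceState.INITIALIZED],
}


def can_transition(current: DeviceState, target: DeviceState) -> bool:
    """Return True if the table allows moving from current to target."""
    return target in STATE_TRANSITIONS.get(current, [])


def transition(current: DeviceState, target: DeviceState) -> DeviceState:
    """Return the new state, or raise if the move is not allowed."""
    if not can_transition(current, target):
        raise ValueError(f"Invalid transition: {current.name} -> {target.name}")
    return target


state = DeviceState.UNKNOWN
for step in (DeviceState.DISCOVERED, DeviceState.INITIALIZED,
             DeviceState.CONNECTED, DeviceState.ACTIVE):
    state = transition(state, step)
print(state.name)
```

Under this table, only ACTIVE has a direct path to ERROR. How the Device Manager handles a failure in another state, such as CONNECTED, is not stated here.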
Lifecycle Operations
Each operation in the device lifecycle is managed through the _manage_device_lifecycle method:
# Scanning devices
result = manager.scan_devices("drv_your_device")
# State: UNKNOWN -> DISCOVERED

# Initializing device
init_result = manager.initialize_device("drv_your_device", device)
# State: DISCOVERED -> INITIALIZED

# Connecting device
connect_result = manager.connect_device("drv_your_device", device)
# State: INITIALIZED -> CONNECTED

# Executing commands
command_result = manager.execute_command("drv_your_device", "start", device_id=device.device_id)
# State: CONNECTED -> ACTIVE -> CONNECTED
Automatic State Management
The Device Manager provides automatic state management features:
- Auto-Discovery: When connecting to an unknown device, it automatically triggers device scanning
- Auto-Initialization: When connecting to a discovered device, it automatically performs initialization
- State Validation: Ensures operations are only performed in valid states
- Error Handling: Automatically transitions to ERROR state on failures
Thread Safety
The Device Manager implements thread-safe operations:
# Device-specific locks
with self._get_device_lock(device_key):
    current_state = self.device_states.get(device_key, DeviceState.UNKNOWN)
    # Perform state transition and operation
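The internals of _get_device_lock are not shown in this guide. A common way to provide per-device locks is a registry that lazily creates one lock per device key, guarded by its own lock. The sketch below assumes that design. DeviceLockRegistry and the "can0" key are hypothetical.

```python
import threading
from typing import Dict


class DeviceLockRegistry:
    """Hands out one reentrant lock per device key, creating it on first use."""

    def __init__(self) -> None:
        self._registry_lock = threading.Lock()
        self._locks: Dict[str, object] = {}

    def get(self, device_key: str):
        # The registry lock makes lock creation itself thread-safe.
        with self._registry_lock:
            if device_key not in self._locks:
                self._locks[device_key] = threading.RLock()
            return self._locks[device_key]


registry = DeviceLockRegistry()
counter = {"can0": 0}


def bump() -> None:
    for _ in range(1000):
        with registry.get("can0"):
            counter["can0"] += 1  # protected read-modify-write


threads = [threading.Thread(target=bump) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter["can0"])
```

Because each device has its own lock, a slow operation on one device does not block operations on another.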
Error Recovery
The system provides mechanisms for error recovery:
try:
    # Attempt device operation
    result = manager.execute_command("drv_your_device", "start", device_id=device.device_id)
except Exception:
    # Reset device on error
    manager.reset_device("drv_your_device", device)
    # Reinitialize if needed
    manager.initialize_device("drv_your_device", device)
Cleanup and Resource Management
The Device Manager provides methods for proper cleanup:
# Cleanup single device
manager.close_device("drv_your_device", device)

# Cleanup all devices
cleanup_results = manager.cleanup_all_devices()
State Hierarchy
The system maintains a state hierarchy for proper state management:
state_hierarchy = {
    DeviceState.UNKNOWN: 0,
    DeviceState.DISCOVERED: 1,
    DeviceState.INITIALIZED: 2,
    DeviceState.CONNECTED: 3,
    DeviceState.ACTIVE: 4
}
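This guide does not show how the Device Manager uses the hierarchy. One plausible use, in line with the auto-discovery and auto-initialization behaviour described earlier, is to work out which lifecycle steps are still missing before a requested operation. The sketch below illustrates that idea. steps_to_reach and the step names are hypothetical.

```python
from enum import Enum, auto
from typing import Dict, List


class DeviceState(Enum):
    """Stand-in enum containing only the states ranked by the hierarchy."""
    UNKNOWN = auto()
    DISCOVERED = auto()
    INITIALIZED = auto()
    CONNECTED = auto()
    ACTIVE = auto()


STATE_HIERARCHY: Dict[DeviceState, int] = {
    DeviceState.UNKNOWN: 0,
    DeviceState.DISCOVERED: 1,
    DeviceState.INITIALIZED: 2,
    DeviceState.CONNECTED: 3,
    DeviceState.ACTIVE: 4,
}

# Hypothetical name of the operation that raises a device to each level.
STEP_FOR_LEVEL: Dict[int, str] = {
    1: "scan",
    2: "initialize",
    3: "connect",
    4: "activate",
}


def steps_to_reach(current: DeviceState, target: DeviceState) -> List[str]:
    """List the operations needed to climb from current to target."""
    low, high = STATE_HIERARCHY[current], STATE_HIERARCHY[target]
    return [STEP_FOR_LEVEL[level] for level in range(low + 1, high + 1)]


print(steps_to_reach(DeviceState.UNKNOWN, DeviceState.CONNECTED))
print(steps_to_reach(DeviceState.CONNECTED, DeviceState.CONNECTED))
```

A connect request on an UNKNOWN device would then expand to scan, initialize, and connect, while a device already at or above the target level needs no extra steps.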
Best Practices for Lifecycle Management
State Tracking
- Always check device state before operations
- Use appropriate state transitions
- Handle state changes atomically
Resource Management
- Initialize resources in correct state
- Clean up resources when transitioning to DISCONNECTED
- Use context managers for resource handling
Error Handling
- Implement proper error recovery
- Use appropriate state transitions on errors
- Clean up resources on errors
Thread Safety
- Use provided locking mechanisms
- Avoid long operations while holding locks
- Handle concurrent access properly
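The advice to use context managers can be sketched with contextlib. The connected_device helper below is hypothetical, and FakeManager is a stand-in that only mimics the connect_device and close_device calls and the {"status": ...} result shape used elsewhere in this guide.

```python
from contextlib import contextmanager
from typing import Iterator, List, Tuple


class FakeManager:
    """Stand-in for the device manager that records the calls it receives."""

    def __init__(self) -> None:
        self.log: List[Tuple[str, str]] = []

    def connect_device(self, plugin: str, device: str) -> dict:
        self.log.append(("connect", device))
        return {"status": "success"}

    def close_device(self, plugin: str, device: str) -> dict:
        self.log.append(("close", device))
        return {"status": "success"}


@contextmanager
def connected_device(manager, plugin: str, device: str) -> Iterator[str]:
    """Connect on entry and always close on exit, even if the body raises."""
    result = manager.connect_device(plugin, device)
    if result["status"] != "success":
        raise RuntimeError("Device connection failed")
    try:
        yield device
    finally:
        manager.close_device(plugin, device)


manager = FakeManager()
try:
    with connected_device(manager, "drv_your_device", "can0"):
        raise RuntimeError("simulated failure during operation")
except RuntimeError:
    pass  # the device was still closed by the context manager

print(manager.log)
```

The close call runs whether the body succeeds or fails, so cleanup does not have to be repeated at each call site.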
Example Lifecycle Management
Here’s a complete example of device lifecycle management:
import logging

from sat_toolkit.core.device_manager import DeviceDriverManager
from sat_toolkit.core.device_spec import DeviceState

logger = logging.getLogger(__name__)


def manage_device_lifecycle():
    manager = DeviceDriverManager()
    device = None  # Stays None if scanning fails, so cleanup can skip close_device
    try:
        # Scan for devices
        scan_result = manager.scan_devices("drv_your_device")
        if scan_result["status"] != "success":
            raise RuntimeError("Device scanning failed")
        devices = scan_result["devices"]
        if not devices:
            raise RuntimeError("No devices found")
        device = devices[0]

        # Initialize device
        init_result = manager.initialize_device("drv_your_device", device)
        if init_result["status"] != "success":
            raise RuntimeError("Device initialization failed")

        # Connect to device
        connect_result = manager.connect_device("drv_your_device", device)
        if connect_result["status"] != "success":
            raise RuntimeError("Device connection failed")

        # Execute commands
        command_result = manager.execute_command(
            "drv_your_device",
            "start",
            device_id=device.device_id
        )
        if command_result["status"] != "success":
            raise RuntimeError("Command execution failed")

        # Normal operation...
    except Exception as e:
        # Error handling
        logger.error(f"Device operation failed: {e}")
        if device is not None:
            try:
                # Attempt cleanup
                manager.close_device("drv_your_device", device)
            except Exception as cleanup_error:
                logger.error(f"Cleanup failed: {cleanup_error}")
    finally:
        # Final cleanup
        manager.cleanup_all_devices()
Example Usage
Here’s how to use your plugin once it’s integrated:
from sat_toolkit.core.device_manager import DeviceDriverManager

# Get device manager instance
manager = DeviceDriverManager()

# Scan for devices
result = manager.scan_devices("drv_your_device")
if result["status"] == "success":
    devices = result["devices"]
    # Initialize first device
    if devices:
        device = devices[0]
        init_result = manager.initialize_device("drv_your_device", device)
        if init_result["status"] == "success":
            # Connect to device
            connect_result = manager.connect_device("drv_your_device", device)
            if connect_result["status"] == "success":
                # Execute command
                command_result = manager.execute_command(
                    "drv_your_device",
                    "start",
                    device_id=device.device_id
                )
Troubleshooting
Plugin Not Loading
- Check file naming (must start with drv_)
- Verify class inheritance
- Check for syntax errors
State Transition Errors
- Verify correct state progression
- Check initialization sequence
- Ensure proper cleanup
Streaming Issues
- Check thread management
- Verify data format
- Monitor resource usage
Resource Leaks
- Implement proper cleanup
- Use context managers where appropriate
- Monitor system resources