LimaCharlie Python SDK Documentation¶
Table of Contents¶
- Overview
- Installation
- Authentication
- Core Classes
- Manager Class
- Sensor Management
- Detection and Response Rules
- Real-time Data Streaming
- Artifacts and Payloads
- Event Ingestion
- Advanced Features
- Error Handling
- Complete Examples
Overview¶
The LimaCharlie Python SDK provides a comprehensive interface for interacting with the LimaCharlie SecOps Cloud Platform. This SDK enables programmatic access to all platform features including sensor management, detection and response rules, real-time event streaming, and artifact collection.
Key Features¶
- Organization Management: Create, configure, and manage LimaCharlie organizations
- Sensor Operations: Deploy, monitor, and control endpoint sensors
- Detection & Response: Create and manage detection rules with automated response actions
- Real-time Streaming: Receive events, detections, and audit logs in real-time
- Artifact Management: Collect and manage forensic artifacts
- Event Processing: Ingest and query historical events
- API Integration: Full access to LimaCharlie REST API endpoints
SDK Version¶
Current version: 4.9.24
Installation¶
Requirements¶
- Python 3.6 or higher
- pip package manager
Install via pip¶
Install from source¶
git clone https://github.com/refractionPOINT/python-limacharlie.git
cd python-limacharlie
pip install -r requirements.txt
python setup.py install
Dependencies¶
Authentication¶
Authentication Methods¶
The LimaCharlie SDK supports multiple authentication methods:
- API Key Authentication (Recommended for automation)
- OAuth Authentication (For user-based access)
- JWT Token Authentication (For temporary access)
- Environment-based Authentication (Using configuration files)
API Key Authentication¶
import limacharlie
# Direct API key authentication
manager = limacharlie.Manager(
oid='YOUR_ORGANIZATION_ID', # UUID format
secret_api_key='YOUR_API_KEY' # UUID format
)
Environment-based Authentication¶
The SDK can read credentials from environment variables or configuration files:
# Using environment variables
# Set: LC_OID="your-org-id"
# Set: LC_API_KEY="your-api-key"
manager = limacharlie.Manager()
# Using a specific environment from config file
manager = limacharlie.Manager(environment='production')
Configuration File Format¶
Create a file at ~/.limacharlie or specify with LC_CREDS_FILE environment variable:
# Default credentials
oid: "12345678-1234-1234-1234-123456789012"
api_key: "87654321-4321-4321-4321-210987654321"
# Named environments
env:
production:
oid: "12345678-1234-1234-1234-123456789012"
api_key: "87654321-4321-4321-4321-210987654321"
staging:
oid: "87654321-4321-4321-4321-210987654321"
api_key: "12345678-1234-1234-1234-123456789012"
dev:
oid: "11111111-1111-1111-1111-111111111111"
api_key: "22222222-2222-2222-2222-222222222222"
OAuth Authentication¶
# OAuth configuration in ~/.limacharlie
oauth:
client_id: "your-client-id"
client_secret: "your-client-secret"
refresh_token: "your-refresh-token"
id_token: "your-id-token"
User-based Authentication¶
# Authenticate as a user instead of organization
manager = limacharlie.Manager(
uid='USER_ID',
secret_api_key='USER_API_KEY'
)
Core Classes¶
Class Hierarchy¶
limacharlie
├── Manager # Main entry point for API operations
├── Sensor # Individual sensor management
├── Firehose # Real-time data streaming (push)
├── Spout # Real-time data streaming (pull)
├── Configs # Configuration management
├── Payloads # Artifact and payload management
├── Logs/Artifacts # Log and artifact retrieval
├── Hive # Hive data storage
├── Extensions # Add-on extensions
├── Billing # Billing and usage information
├── User # User management
├── UserPreferences # User preferences
├── Webhook # Webhook management
├── WebhookSender # Send data to webhooks
├── Jobs # Background job management
├── Query # Query builder for LCQL
├── Replay # Event replay functionality
├── Search # Search operations
├── SpotCheck # Spot check operations
└── Replicants # Service replicants (external services)
Manager Class¶
The Manager class is the primary entry point for interacting with LimaCharlie.
Initialization¶
import limacharlie
# Basic initialization
manager = limacharlie.Manager(
oid='ORG_ID',
secret_api_key='API_KEY'
)
# Advanced initialization with all parameters
manager = limacharlie.Manager(
oid='ORG_ID', # Organization ID (UUID)
secret_api_key='API_KEY', # API Key (UUID)
environment='production', # Named environment from config
inv_id='investigation-123', # Investigation ID for tracking
print_debug_fn=print, # Debug output function
is_interactive=True, # Enable interactive mode
extra_params={'key': 'value'}, # Additional parameters
jwt='existing-jwt-token', # Use existing JWT
uid='USER_ID', # User ID for user-based auth
onRefreshAuth=callback_func, # Callback on auth refresh
isRetryQuotaErrors=True # Auto-retry on quota errors
)
Core Manager Methods¶
Organization Management¶
# Get organization information
org_info = manager.getOrgInfo()
# Returns: {'oid': '...', 'name': '...', 'tier': '...', ...}
# Get organization URLs
urls = manager.getOrgURLs()
# Returns: {'webapp': 'https://...', 'api': 'https://...', ...}
# Get organization configuration
config = manager.getOrgConfig()
# Returns: {'sensor_quota': 100, 'retention': 30, ...}
# Update organization configuration
manager.setOrgConfig({
'sensor_quota': 200,
'retention': 60
})
# Get billing information
billing = limacharlie.Billing(manager)
billing_info = billing.getInfo()
Sensor Listing and Search¶
# List all sensors
sensors = manager.sensors()
# Returns: {'sensor-id-1': {...}, 'sensor-id-2': {...}, ...}
# List sensors with details
detailed_sensors = manager.sensors(with_details=True)
# Get sensor count
online_count = manager.getOnlineHosts()
# Returns: 42
# Search sensors by hostname
matching_sensors = manager.getHostnames(['web-server-*', 'db-*'])
# Returns: {'web-server-01': 'sensor-id-1', ...}
# Get sensors by tag
tagged_sensors = manager.getSensorsWithTag('production')
# Returns: ['sensor-id-1', 'sensor-id-2', ...]
# Get sensor information
sensor_info = manager.getSensorInfo('SENSOR_ID')
# Returns: {'hostname': '...', 'platform': '...', 'architecture': '...', ...}
Installation Keys¶
# List installation keys
keys = manager.getInstallationKeys()
# Returns: [{'key': '...', 'tags': [...], 'desc': '...', ...}, ...]
# Create new installation key
new_key = manager.setInstallationKey(
key_name='prod-servers',
tags=['production', 'server'],
desc='Production server deployment key'
)
# Returns: 'INSTALLATION_KEY_STRING'
# Delete installation key
manager.delInstallationKey('INSTALLATION_KEY_ID')
Outputs Management¶
# List all outputs
outputs = manager.outputs()
# Returns: {'output1': {...}, 'output2': {...}, ...}
# Add new output
manager.add_output(
name='siem_output',
module='syslog',
type='detect',
config={
'dest_host': 'siem.example.com',
'dest_port': 514,
'protocol': 'tcp'
}
)
# Delete output
manager.del_output('siem_output')
Artifact Collection¶
# List available artifacts
artifacts = manager.getArtifacts(
start_time=1234567890, # Unix timestamp
end_time=1234567999
)
# Get specific artifact
artifact_data = manager.getArtifact('ARTIFACT_ID')
# Get artifact as JSON
artifact_json = manager.getArtifactAsJson('ARTIFACT_ID')
Service Management¶
# List services
services = manager.getServices()
# Subscribe to a service
manager.subscribeToService('virustotal')
# Unsubscribe from service
manager.unsubscribeFromService('virustotal')
# Get service configuration
service_config = manager.getServiceConfig('virustotal')
# Update service configuration
manager.setServiceConfig('virustotal', {
'api_key': 'YOUR_VT_API_KEY'
})
Sensor Management¶
The Sensor class provides detailed control over individual sensors.
Creating a Sensor Object¶
# Create from Manager
sensor = manager.sensor('SENSOR_ID')
# Direct instantiation
from limacharlie import Sensor
sensor = Sensor(manager, 'SENSOR_ID')
# With detailed info pre-loaded
sensor = Sensor(manager, 'SENSOR_ID', detailedInfo={...})
Sensor Properties and Methods¶
Getting Sensor Information¶
# Check if sensor is online
is_online = sensor.isOnline()
# Returns: True/False
# Get sensor hostname
hostname = sensor.getHostname()
# Returns: 'web-server-01'
# Get platform information
platform = sensor.getPlatform()
# Returns: 'windows', 'linux', 'macos', etc.
# Get architecture
architecture = sensor.getArchitecture()
# Returns: 'x64', 'x86', 'arm64', etc.
# Get full sensor information
info = sensor.getInfo()
# Returns: {'hostname': '...', 'platform': '...', 'last_seen': ..., ...}
# Get sensor tags
tags = sensor.getTags()
# Returns: ['production', 'web-server', ...]
Sending Tasks to Sensors¶
# Send a single task
response = sensor.task('os_info')
# Send multiple tasks
responses = sensor.task([
'os_info',
'os_packages',
'os_services'
])
# Task with investigation ID
sensor.setInvId('investigation-123')
response = sensor.task('os_processes')
# Direct task with inv_id
response = sensor.task('os_processes', inv_id='investigation-456')
Interactive Task Execution¶
# Manager must be in interactive mode
manager = limacharlie.Manager(
oid='ORG_ID',
secret_api_key='API_KEY',
is_interactive=True,
inv_id='investigation-123'
)
sensor = manager.sensor('SENSOR_ID')
# Request with future results
future = sensor.request('os_info')
# Wait for results
result = future.getNewResponses(timeout=30)
# Simple request (blocking)
result = sensor.simpleRequest('os_info', timeout=30)
# Request with completion tracking
results = sensor.simpleRequest(
['os_info', 'os_processes'],
timeout=60,
until_completion=True # Wait for all tasks to complete
)
# Request with callback
def process_response(event):
print(f"Received: {event}")
sensor.simpleRequest(
'os_processes',
timeout=30,
until_completion=process_response # Called for each response
)
Sensor Isolation¶
# Isolate sensor from network
sensor.isolate()
# Re-join sensor to network
sensor.rejoin()
# Check isolation status
is_isolated = sensor.isIsolated()
Sensor Management¶
# Tag management
sensor.addTag('critical')
sensor.removeTag('test')
# Wait for sensor to come online
came_online = sensor.waitToComeOnline(timeout=300) # 5 minutes
# Returns: True if online, False if timeout
# Delete sensor
sensor.delete()
# Drain sensor (mark for deletion after offline)
sensor.drain()
Available Sensor Tasks¶
Common tasks that can be sent to sensors:
# System Information
'os_info' # Get OS information
'os_packages' # List installed packages
'os_services' # List running services
'os_processes' # List running processes
'os_autoruns' # List autorun entries
'os_drivers' # List loaded drivers
# File Operations
'file_info <path>' # Get file information
'file_get <path>' # Retrieve file content
'file_del <path>' # Delete file
'file_mov <src> <dst>' # Move file
'file_hash <path>' # Get file hash
# Process Operations
'mem_map <pid>' # Get process memory map
'mem_read <pid> <address> <size>' # Read process memory
'mem_strings <pid>' # Get process strings
'kill <pid>' # Kill process
# Network Operations
'netstat' # Get network connections
'dns_resolve <domain>' # Resolve DNS
# Registry Operations (Windows)
'reg_list <path>' # List registry keys
'reg_get <path>' # Get registry value
# History and Forensics
'history_dump' # Dump sensor history
'hidden_module_scan' # Scan for hidden modules
'exec_oob_scan' # Out-of-band executable scan
Detection and Response Rules¶
Note: The direct D&R rule methods (rules(), add_rule(), del_rule(), add_fp(), del_fp()) are deprecated. Please use the Hive accessors instead for managing Detection & Response rules. See the Hive Operations section for details on using the key-value storage system for rule management.
Real-time Data Streaming¶
Firehose (Push-based Streaming)¶
The Firehose class receives data pushed from LimaCharlie:
import limacharlie
manager = limacharlie.Manager('ORG_ID', 'API_KEY')
# Create a Firehose for events
firehose = limacharlie.Firehose(
manager,
listen_on='0.0.0.0:4443', # Listen interface and port
data_type='event', # event, detect, or audit
name='my_firehose', # Output name in LC
public_dest='1.2.3.4:4443', # Public IP:port for LC to connect
ssl_cert='/path/to/cert.pem', # Optional SSL cert
ssl_key='/path/to/key.pem', # Optional SSL key
is_parse=True, # Parse JSON automatically
max_buffer=1024, # Max messages to buffer
inv_id='investigation-123', # Filter by investigation ID
tag='production', # Filter by sensor tag
sid='SENSOR_ID' # Filter by sensor ID
)
# Start receiving data
firehose.start()
# Process events
while True:
event = firehose.queue.get()
print(f"Received event: {event}")
# Process event
if event.get('event_type') == 'PROCESS_START':
process_path = event.get('event', {}).get('FILE_PATH')
print(f"Process started: {process_path}")
# Shutdown
firehose.shutdown()
Spout (Pull-based Streaming)¶
The Spout class pulls data from LimaCharlie:
# Create a Spout for detections
spout = limacharlie.Spout(
manager,
data_type='detect', # event, detect, audit, or tailored
is_parse=True, # Parse JSON
inv_id='investigation-123',
tag='critical',
cat='malware', # Detection category filter
sid='SENSOR_ID',
is_compressed=True, # Use compression
rel_parent_events=True, # Include parent events
rel_child_events=True # Include child events
)
# Process detections
for detection in spout:
print(f"Detection: {detection['detect_name']}")
print(f"Sensor: {detection['sid']}")
print(f"Event: {detection['event']}")
# Take action based on detection
if detection['detect_name'] == 'ransomware_behavior':
sensor = manager.sensor(detection['sid'])
sensor.isolate()
# Shutdown
spout.shutdown()
Tailored Event Streaming¶
# Stream specific event types
spout = limacharlie.Spout(
manager,
data_type='tailored',
event_type=['PROCESS_START', 'NETWORK_CONNECT'],
is_parse=True
)
for event in spout:
if event['event_type'] == 'PROCESS_START':
print(f"Process: {event['event']['FILE_PATH']}")
elif event['event_type'] == 'NETWORK_CONNECT':
print(f"Connection: {event['event']['DESTINATION']}")
Artifacts and Payloads¶
Managing Artifacts¶
# Initialize Artifacts/Logs manager
artifacts = limacharlie.Artifacts(manager)
# List artifacts in time range
artifact_list = artifacts.listArtifacts(
start_time=1234567890,
end_time=1234567999,
sid='SENSOR_ID', # Optional: filter by sensor
type='log' # Optional: filter by type
)
# Get specific artifact
artifact_data = artifacts.getArtifact('ARTIFACT_ID')
# Get artifact metadata
metadata = artifacts.getArtifactMetadata('ARTIFACT_ID')
# Delete artifact
artifacts.deleteArtifact('ARTIFACT_ID')
Payloads Management¶
# Initialize Payloads manager
payloads = limacharlie.Payloads(manager)
# List available payloads
payload_list = payloads.list()
# Create a new payload
payload_id = payloads.create(
name='custom_script',
data=b'#!/bin/bash\necho "Hello World"',
description='Custom bash script'
)
# Get payload
payload_data = payloads.get('PAYLOAD_ID')
# Delete payload
payloads.delete('PAYLOAD_ID')
# Use payload in a task
sensor.task(f'run_payload {payload_id}')
Event Ingestion¶
Ingesting Custom Events¶
# Ingest a single event
manager.ingestEvent({
'event_type': 'CUSTOM_EVENT',
'timestamp': 1234567890,
'data': {
'key': 'value',
'severity': 'high'
}
})
# Batch ingestion
events = [
{
'event_type': 'CUSTOM_LOGIN',
'timestamp': 1234567890,
'user': 'admin',
'source_ip': '192.168.1.100'
},
{
'event_type': 'CUSTOM_FILE_ACCESS',
'timestamp': 1234567891,
'file': '/etc/passwd',
'user': 'admin'
}
]
for event in events:
manager.ingestEvent(event)
Ingesting Third-party Logs¶
# Ingest syslog events
def ingest_syslog(syslog_line):
# Parse syslog format
parsed = parse_syslog(syslog_line)
manager.ingestEvent({
'event_type': 'SYSLOG',
'timestamp': parsed['timestamp'],
'hostname': parsed['hostname'],
'facility': parsed['facility'],
'severity': parsed['severity'],
'message': parsed['message']
})
# Ingest Windows Event Logs
def ingest_windows_event(event_xml):
# Parse Windows Event XML
parsed = parse_windows_event(event_xml)
manager.ingestEvent({
'event_type': 'WINDOWS_EVENT',
'timestamp': parsed['TimeCreated'],
'event_id': parsed['EventID'],
'provider': parsed['Provider'],
'level': parsed['Level'],
'data': parsed['EventData']
})
Advanced Features¶
Hive Operations¶
The Hive is LimaCharlie's key-value storage system:
# Initialize Hive
hive = limacharlie.Hive(manager)
# Store data
hive_record = limacharlie.HiveRecord(
hive_name='threat_intel',
key='malware_hash_123',
data={
'hash': 'abc123...',
'type': 'ransomware',
'first_seen': 1234567890
},
ttl=86400 # TTL in seconds
)
hive.set(hive_record)
# Get data
record = hive.get('threat_intel', 'malware_hash_123')
print(record.data)
# List keys
keys = hive.list('threat_intel')
# Delete data
hive.delete('threat_intel', 'malware_hash_123')
# Bulk operations
records = [
limacharlie.HiveRecord('intel', f'hash_{i}', {'value': i})
for i in range(100)
]
hive.setBulk(records)
Query Language (LCQL)¶
from limacharlie import Query
# Build queries programmatically
query = Query()
# Simple query
results = manager.query(
query='event_type == "PROCESS_START" AND event.FILE_PATH contains "powershell"',
limit=100
)
# Complex query with time range
results = manager.query(
query='''
event_type == "NETWORK_CONNECT"
AND event.DESTINATION contains "suspicious.com"
AND timestamp > now() - 86400
''',
start_time=time.time() - 86400,
end_time=time.time(),
limit=1000
)
# Query with aggregation
results = manager.query(
query='event_type == "PROCESS_START"',
select=['event.FILE_PATH', 'count()'],
group_by=['event.FILE_PATH'],
order_by='count() DESC'
)
Replay Functionality¶
# Initialize Replay
replay = limacharlie.Replay(manager)
# Start replay of historical events
replay.start(
start_time=time.time() - 3600, # 1 hour ago
end_time=time.time(),
filters={
'event_type': ['PROCESS_START', 'FILE_CREATE'],
'sid': 'SENSOR_ID'
},
rules=['suspicious_process', 'ransomware_behavior'] # Test specific rules
)
# Get replay results
results = replay.getResults()
for result in results:
print(f"Rule triggered: {result['rule_name']}")
print(f"Event: {result['event']}")
Jobs Management¶
# Create a job
job = limacharlie.Job(manager)
job_id = job.create(
name='daily_scan',
schedule='0 2 * * *', # Cron format
task='full_scan',
sensors=['SENSOR_ID_1', 'SENSOR_ID_2']
)
# List jobs
jobs = job.list()
# Get job status
status = job.getStatus('JOB_ID')
# Cancel job
job.cancel('JOB_ID')
Extensions Management¶
# Initialize Extensions
extensions = limacharlie.Extension(manager)
# List available extensions
available = extensions.list()
# Install extension
extensions.install('velociraptor')
# Configure extension
extensions.configure('velociraptor', {
'api_key': 'YOUR_API_KEY',
'server_url': 'https://velociraptor.example.com'
})
# Uninstall extension
extensions.uninstall('velociraptor')
Error Handling¶
Exception Types¶
from limacharlie.utils import LcApiException
try:
manager = limacharlie.Manager('INVALID_OID', 'INVALID_KEY')
except LcApiException as e:
print(f"API Error: {e}")
# Handle authentication error
try:
sensor = manager.sensor('INVALID_SENSOR_ID')
sensor.task('os_info')
except LcApiException as e:
if 'not found' in str(e):
print("Sensor not found")
elif 'offline' in str(e):
print("Sensor is offline")
else:
print(f"Unexpected error: {e}")
Retry Logic¶
import time
from limacharlie.utils import LcApiException
def retry_operation(func, max_retries=3, delay=1):
"""Retry an operation with exponential backoff"""
for attempt in range(max_retries):
try:
return func()
except LcApiException as e:
if 'rate limit' in str(e).lower():
wait_time = delay * (2 ** attempt)
print(f"Rate limited, waiting {wait_time}s...")
time.sleep(wait_time)
else:
raise
raise Exception(f"Failed after {max_retries} attempts")
# Usage
result = retry_operation(lambda: manager.sensors())
Handling Quota Errors¶
# Enable automatic retry on quota errors
manager = limacharlie.Manager(
oid='ORG_ID',
secret_api_key='API_KEY',
isRetryQuotaErrors=True # Auto-retry on HTTP 429
)
# Manual handling
try:
sensors = manager.sensors()
except LcApiException as e:
if e.http_code == 429: # Too Many Requests
retry_after = e.headers.get('Retry-After', 60)
print(f"Quota exceeded, retry after {retry_after}s")
time.sleep(int(retry_after))
sensors = manager.sensors() # Retry
Complete Examples¶
Example 1: Automated Threat Hunting¶
import limacharlie
import time
from datetime import datetime, timedelta
class ThreatHunter:
def __init__(self, org_id, api_key):
self.manager = limacharlie.Manager(org_id, api_key)
self.suspicious_processes = [
'mimikatz.exe',
'lazagne.exe',
'pwdump.exe',
'procdump.exe'
]
def hunt_suspicious_processes(self):
"""Hunt for suspicious processes across all sensors"""
print("Starting threat hunt...")
# Get all online sensors
sensors = self.manager.sensors()
online_sensors = [
sid for sid, info in sensors.items()
if info.get('online', False)
]
print(f"Found {len(online_sensors)} online sensors")
findings = []
for sid in online_sensors:
sensor = self.manager.sensor(sid)
hostname = sensor.getHostname()
print(f"Scanning {hostname}...")
# Get running processes
try:
result = sensor.simpleRequest('os_processes', timeout=30)
if result and 'processes' in result:
for process in result['processes']:
process_name = process.get('name', '').lower()
for suspicious in self.suspicious_processes:
if suspicious.lower() in process_name:
findings.append({
'sensor': sid,
'hostname': hostname,
'process': process_name,
'pid': process.get('pid'),
'path': process.get('path')
})
print(f" [!] Found suspicious process: {process_name}")
except Exception as e:
print(f" Error scanning {hostname}: {e}")
return findings
def monitor_detections(self, duration=3600):
"""Monitor for detections in real-time"""
print(f"Monitoring detections for {duration} seconds...")
spout = limacharlie.Spout(
self.manager,
data_type='detect',
is_parse=True
)
start_time = time.time()
try:
for detection in spout:
if time.time() - start_time > duration:
break
print(f"\n[DETECTION] {detection['detect_name']}")
print(f" Sensor: {detection['hostname']} ({detection['sid']})")
print(f" Time: {datetime.fromtimestamp(detection['ts'])}")
# Auto-respond to critical detections
if detection.get('priority', 0) >= 4:
sensor = self.manager.sensor(detection['sid'])
print(f" [!] High priority detection - isolating sensor")
sensor.isolate()
# Collect forensics
sensor.task([
'history_dump',
'os_processes',
'netstat'
])
finally:
spout.shutdown()
def generate_hunt_report(self, findings):
"""Generate a hunting report"""
report = {
'timestamp': datetime.now().isoformat(),
'total_sensors': len(self.manager.sensors()),
'findings_count': len(findings),
'findings': findings,
'recommendations': []
}
if findings:
report['recommendations'].append(
"Suspicious processes detected - investigate immediately"
)
report['recommendations'].append(
"Consider isolating affected systems"
)
report['recommendations'].append(
"Review process execution history on affected systems"
)
return report
# Usage
if __name__ == "__main__":
hunter = ThreatHunter('ORG_ID', 'API_KEY')
# Hunt for suspicious processes
findings = hunter.hunt_suspicious_processes()
# Generate report
report = hunter.generate_hunt_report(findings)
print(f"\nHunt Report: {report}")
# Monitor for new detections
hunter.monitor_detections(duration=3600)
Example 2: Incident Response Automation¶
import limacharlie
import json
from datetime import datetime
import hashlib
class IncidentResponder:
def __init__(self, org_id, api_key):
self.manager = limacharlie.Manager(
org_id,
api_key,
is_interactive=True,
inv_id=f"incident_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
)
self.artifacts = limacharlie.Artifacts(self.manager)
self.hive = limacharlie.Hive(self.manager)
def respond_to_incident(self, sensor_id, incident_type):
"""Orchestrate incident response"""
print(f"Responding to {incident_type} on sensor {sensor_id}")
sensor = self.manager.sensor(sensor_id)
hostname = sensor.getHostname()
# Create incident record
incident = {
'id': hashlib.md5(f"{sensor_id}{datetime.now()}".encode()).hexdigest(),
'type': incident_type,
'sensor': sensor_id,
'hostname': hostname,
'timestamp': datetime.now().isoformat(),
'status': 'investigating',
'artifacts': [],
'actions': []
}
# Step 1: Isolate if critical
if incident_type in ['ransomware', 'data_theft', 'backdoor']:
print(f" Isolating {hostname}...")
sensor.isolate()
incident['actions'].append({
'action': 'isolate',
'timestamp': datetime.now().isoformat()
})
# Step 2: Collect forensics
print(" Collecting forensic data...")
forensics_tasks = [
'os_processes',
'netstat',
'os_autoruns',
'history_dump'
]
for task in forensics_tasks:
try:
result = sensor.simpleRequest(task, timeout=60)
if result:
# Store in Hive for analysis
self.hive.set(limacharlie.HiveRecord(
hive_name='incidents',
key=f"{incident['id']}_{task}",
data=result,
ttl=2592000 # 30 days
))
incident['artifacts'].append(task)
except Exception as e:
print(f" Failed to collect {task}: {e}")
# Step 3: Collect memory dump for critical incidents
if incident_type in ['ransomware', 'backdoor']:
print(" Collecting memory dump...")
try:
sensor.task('os_memory_dump')
incident['actions'].append({
'action': 'memory_dump',
'timestamp': datetime.now().isoformat()
})
except Exception as e:
print(f" Memory dump failed: {e}")
# Step 4: Kill malicious processes
if incident_type == 'malware':
self.kill_malicious_processes(sensor)
incident['actions'].append({
'action': 'kill_processes',
'timestamp': datetime.now().isoformat()
})
# Step 5: Update incident status
incident['status'] = 'contained'
self.hive.set(limacharlie.HiveRecord(
hive_name='incidents',
key=incident['id'],
data=incident,
ttl=7776000 # 90 days
))
return incident
def kill_malicious_processes(self, sensor):
"""Kill known malicious processes"""
malicious_patterns = [
'mimikatz',
'cobalt',
'empire',
'meterpreter'
]
try:
result = sensor.simpleRequest('os_processes', timeout=30)
if result and 'processes' in result:
for process in result['processes']:
process_name = process.get('name', '').lower()
for pattern in malicious_patterns:
if pattern in process_name:
print(f" Killing {process_name} (PID: {process['pid']})")
sensor.task(f"kill {process['pid']}")
except Exception as e:
print(f" Error killing processes: {e}")
def analyze_network_connections(self, sensor_id):
"""Analyze network connections for IOCs"""
sensor = self.manager.sensor(sensor_id)
try:
result = sensor.simpleRequest('netstat', timeout=30)
suspicious_connections = []
if result and 'connections' in result:
for conn in result['connections']:
# Check for suspicious ports
if conn.get('dst_port') in [4444, 5555, 31337, 1337]:
suspicious_connections.append(conn)
# Check for known C2 IPs (would come from threat intel)
# This is a placeholder - real implementation would check against threat intel
if conn.get('dst_ip') in ['1.2.3.4', '5.6.7.8']:
suspicious_connections.append(conn)
return suspicious_connections
except Exception as e:
print(f"Error analyzing network: {e}")
return []
def generate_incident_timeline(self, incident_id):
"""Generate timeline of incident events"""
timeline = []
# Get incident data from Hive
incident = self.hive.get('incidents', incident_id)
if incident:
# Query events around incident time
results = self.manager.query(
query=f"sid == '{incident.data['sensor']}'",
start_time=datetime.fromisoformat(incident.data['timestamp']).timestamp() - 3600,
end_time=datetime.fromisoformat(incident.data['timestamp']).timestamp() + 3600,
limit=1000
)
for event in results:
timeline.append({
'timestamp': event['ts'],
'event_type': event['event_type'],
'summary': self.summarize_event(event)
})
return sorted(timeline, key=lambda x: x['timestamp'])
def summarize_event(self, event):
"""Create human-readable event summary"""
event_type = event.get('event_type')
if event_type == 'PROCESS_START':
return f"Process started: {event.get('event', {}).get('FILE_PATH', 'unknown')}"
elif event_type == 'NETWORK_CONNECT':
return f"Network connection to {event.get('event', {}).get('DESTINATION', 'unknown')}"
elif event_type == 'FILE_CREATE':
return f"File created: {event.get('event', {}).get('FILE_PATH', 'unknown')}"
else:
return f"Event: {event_type}"
# Usage
if __name__ == "__main__":
responder = IncidentResponder('ORG_ID', 'API_KEY')
# Respond to a ransomware incident
incident = responder.respond_to_incident(
sensor_id='SENSOR_ID',
incident_type='ransomware'
)
print(f"\nIncident Response Complete:")
print(json.dumps(incident, indent=2))
# Generate timeline
timeline = responder.generate_incident_timeline(incident['id'])
print(f"\nIncident Timeline: {len(timeline)} events")
Example 3: Compliance and Audit Automation¶
import limacharlie
import csv
from datetime import datetime, timedelta
import json
class ComplianceAuditor:
def __init__(self, org_id, api_key):
self.manager = limacharlie.Manager(org_id, api_key)
self.required_software = {
'windows': ['Windows Defender', 'BitLocker'],
'linux': ['auditd', 'aide'],
'macos': ['XProtect', 'Gatekeeper']
}
def audit_sensor_compliance(self, sensor_id):
"""Audit individual sensor for compliance"""
sensor = self.manager.sensor(sensor_id)
compliance_report = {
'sensor_id': sensor_id,
'hostname': sensor.getHostname(),
'platform': sensor.getPlatform(),
'timestamp': datetime.now().isoformat(),
'checks': {},
'compliant': True
}
# Check 1: Sensor online status
compliance_report['checks']['sensor_online'] = {
'status': sensor.isOnline(),
'required': True
}
if not sensor.isOnline():
compliance_report['compliant'] = False
# Check 2: Required software installed
if sensor.isOnline():
platform = sensor.getPlatform()
required = self.required_software.get(platform, [])
try:
result = sensor.simpleRequest('os_packages', timeout=60)
installed_packages = []
if result and 'packages' in result:
installed_packages = [p.get('name', '') for p in result['packages']]
for software in required:
found = any(software.lower() in pkg.lower() for pkg in installed_packages)
compliance_report['checks'][f'software_{software}'] = {
'status': found,
'required': True
}
if not found:
compliance_report['compliant'] = False
except Exception as e:
compliance_report['checks']['software_audit'] = {
'status': False,
'error': str(e)
}
compliance_report['compliant'] = False
# Check 3: Security patches
if sensor.isOnline():
try:
# Check for critical patches (platform-specific)
if platform == 'windows':
result = sensor.simpleRequest('os_patches', timeout=60)
if result:
missing_critical = [
p for p in result.get('missing', [])
if p.get('severity') == 'Critical'
]
compliance_report['checks']['critical_patches'] = {
'status': len(missing_critical) == 0,
'missing_count': len(missing_critical),
'required': True
}
if missing_critical:
compliance_report['compliant'] = False
except Exception as e:
pass # Not all platforms support patch checking
return compliance_report
def audit_organization_compliance(self):
"""Audit entire organization for compliance"""
print("Starting organization-wide compliance audit...")
sensors = self.manager.sensors()
audit_results = []
for sid in sensors:
print(f"Auditing sensor {sid}...")
report = self.audit_sensor_compliance(sid)
audit_results.append(report)
# Generate summary
summary = {
'timestamp': datetime.now().isoformat(),
'total_sensors': len(sensors),
'compliant_sensors': sum(1 for r in audit_results if r['compliant']),
'non_compliant_sensors': sum(1 for r in audit_results if not r['compliant']),
'compliance_rate': 0,
'details': audit_results
}
if summary['total_sensors'] > 0:
summary['compliance_rate'] = (
summary['compliant_sensors'] / summary['total_sensors'] * 100
)
return summary
def generate_audit_log(self, days=30):
"""Generate audit log for compliance reporting"""
end_time = datetime.now()
start_time = end_time - timedelta(days=days)
# Query audit events
audit_events = self.manager.query(
query='event_type == "AUDIT"',
start_time=start_time.timestamp(),
end_time=end_time.timestamp(),
limit=10000
)
# Process and categorize audit events
audit_log = {
'period': {
'start': start_time.isoformat(),
'end': end_time.isoformat()
},
'events': {
'authentication': [],
'authorization': [],
'configuration_changes': [],
'data_access': []
},
'summary': {}
}
for event in audit_events:
event_data = event.get('event', {})
audit_type = event_data.get('type', 'unknown')
if 'auth' in audit_type.lower():
audit_log['events']['authentication'].append(event)
elif 'config' in audit_type.lower():
audit_log['events']['configuration_changes'].append(event)
elif 'access' in audit_type.lower():
audit_log['events']['data_access'].append(event)
else:
audit_log['events']['authorization'].append(event)
# Generate summary statistics
audit_log['summary'] = {
'total_events': len(audit_events),
'authentication_events': len(audit_log['events']['authentication']),
'configuration_changes': len(audit_log['events']['configuration_changes']),
'data_access_events': len(audit_log['events']['data_access'])
}
return audit_log
def export_compliance_report(self, audit_results, filename='compliance_report.csv'):
"""Export compliance audit results to CSV"""
with open(filename, 'w', newline='') as csvfile:
fieldnames = [
'sensor_id', 'hostname', 'platform',
'compliant', 'timestamp', 'issues'
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for result in audit_results['details']:
issues = []
for check, status in result['checks'].items():
if not status.get('status', False):
issues.append(check)
writer.writerow({
'sensor_id': result['sensor_id'],
'hostname': result['hostname'],
'platform': result['platform'],
'compliant': result['compliant'],
'timestamp': result['timestamp'],
'issues': ', '.join(issues)
})
print(f"Compliance report exported to {filename}")
# Usage
if __name__ == "__main__":
auditor = ComplianceAuditor('ORG_ID', 'API_KEY')
# Perform organization-wide audit
audit_results = auditor.audit_organization_compliance()
print(f"\nCompliance Audit Summary:")
print(f" Total Sensors: {audit_results['total_sensors']}")
print(f" Compliant: {audit_results['compliant_sensors']}")
print(f" Non-Compliant: {audit_results['non_compliant_sensors']}")
print(f" Compliance Rate: {audit_results['compliance_rate']:.1f}%")
# Export results
auditor.export_compliance_report(audit_results)
# Generate audit log
audit_log = auditor.generate_audit_log(days=30)
print(f"\nAudit Log Summary (Last 30 days):")
print(json.dumps(audit_log['summary'], indent=2))
Best Practices¶
Performance Optimization¶
-
Batch Operations
-
Use Streaming for Large Datasets
-
Connection Pooling
Security Best Practices¶
-
Credential Management
-
Least Privilege
-
Audit Logging
Error Handling Best Practices¶
-
Graceful Degradation
-
Retry with Backoff
-
Timeout Handling
Troubleshooting¶
Common Issues and Solutions¶
-
Authentication Failures
-
Sensor Offline
-
Rate Limiting
-
Network Issues
API Reference Links¶
- LimaCharlie REST API Documentation
- LimaCharlie Python SDK GitHub
- LimaCharlie Documentation
- LimaCharlie Web Console
Support¶
For SDK issues or questions: - GitHub Issues: https://github.com/refractionPOINT/python-limacharlie/issues - LimaCharlie Support: support@limacharlie.io - Community Slack: https://slack.limacharlie.io