# Advanced Usage Guide

Advanced features and patterns for Sonora.

## Custom Node Configuration
from sonora import SonoraClient
# Multiple nodes with different regions
# One entry per Lavalink node; each entry carries its own endpoint,
# credentials, region label and TLS flag.
nodes = [
    {
        "host": "lavalink-us.example.com",
        "port": 2333,
        "password": "password1",
        "region": "us-east",
        "secure": True,
    },
    {
        "host": "lavalink-eu.example.com",
        "port": 2333,
        "password": "password2",
        "region": "eu-west",
        "secure": True,
    },
]
# Build the client on top of the node list above, with pooling enabled and
# an exponential-backoff reconnect policy.
client = SonoraClient(
    lavalink_nodes=nodes,
    node_pooling=True,
    reconnect_policy={
        "max_retries": 10,
        "backoff": "exponential",
        "base_delay": 1.0,  # presumably seconds -- confirm against the Sonora docs
        "max_delay": 60.0,  # presumably seconds -- confirm against the Sonora docs
    },
)
## Advanced Queue Management

### Queue Manipulation
player = await client.get_player(guild_id)
# Add tracks with position control
await player.queue.add(track, position=1) # Insert at position 1
# Bulk operations
tracks = [track1, track2, track3]
await player.queue.add_multiple(tracks, position=5)
# Move tracks
player.queue.move(from_pos=0, to_pos=2)
### Smart Features
# Enable adaptive reordering
player.queue.enable_adaptive_reorder = True
# Smart shuffle (preserves recent tracks)
await player.queue.smart_shuffle()
# Get queue statistics
heatmap = player.queue.get_heatmap()
print(f"Most played: {heatmap}")
## Autoplay Configuration

### Custom Strategies
from sonora.autoplay import AutoplayEngine
autoplay = player.autoplay
# Configure autoplay
autoplay.enabled = True
autoplay.strategy = "similar_genre"
autoplay.max_history = 20
autoplay.smart_shuffle = True
# Register custom strategy
from sonora.autoplay.strategies import RecommendationStrategy
class CustomStrategy(RecommendationStrategy):
    """Skeleton of a user-defined autoplay recommendation strategy."""

    async def recommend(self, context):
        """Return the list of tracks to recommend for ``context``.

        ``track1`` and ``track2`` are placeholders; replace them with the
        tracks your own logic selects.
        """
        # Your custom logic here
        return [track1, track2]
autoplay.register_strategy("custom", CustomStrategy(track_provider))
autoplay.strategy = "custom"
### Context-Aware Recommendations
# Manual recommendation fetch
context = {
"history": player.queue.history,
"current": player.queue.current,
"guild_id": guild_id,
"user_preferences": {...}
}
next_track = await autoplay.fetch_next_track(context)
if next_track:
await player.queue.add(next_track)
## Audio Filters

### Advanced Filter Configuration
# Bass boost with custom settings
await player.set_filter("bassboost", gain=0.5, frequency=100)
# Nightcore with speed control
await player.set_filter("nightcore", speed=1.2, pitch=1.1)
# Equalizer presets
await player.set_filter("equalizer", preset="rock")
await player.set_filter("equalizer", preset="pop")
# Custom equalizer
await player.set_filter("equalizer", bands=[
{"band": 0, "gain": 0.5}, # 25 Hz
{"band": 1, "gain": 0.3}, # 40 Hz
{"band": 2, "gain": 0.1}, # 63 Hz
# ... more bands
])
# Filter stacking
await player.set_filter("bassboost", gain=0.3)
await player.set_filter("reverb", room_size=0.8, damping=0.5)
### Filter Management
# Check active filters
filters = player.filters.active_filters
print(f"Active filters: {list(filters.keys())}")
# Temporarily disable filters
await player.set_filter("mute", duration=10) # Mute for 10 seconds
# Reset all filters
await player.clear_filters()
## Event Handling

### Advanced Event Patterns
from sonora import event_manager, EventType
# Middleware for logging all events
@event_manager.use
async def logging_middleware(event):
    """Print every event's type and data, then hand the event back unchanged."""
    print(f"Event: {event.type} - {event.data}")
    return event  # Pass through
# Conditional event handling
@event_manager.on(EventType.TRACK_START)
async def on_track_start(event):
    """Handle TRACK_START events, with a hook for unusually long tracks."""
    track = event.data['track']
    if track.length > 600000:  # 10 minutes
        # Special handling for long tracks
        pass
# Event-driven queue management
@event_manager.on(EventType.QUEUE_EMPTY)
async def on_queue_empty(event):
    """Refill the queue with one recommended track when it runs empty.

    Does nothing unless autoplay is enabled on the guild's player and the
    autoplay engine returns a track.
    """
    guild_id = event.data['guild_id']
    player = await client.get_player(guild_id)
    if player.autoplay.enabled:
        # Auto-add recommended tracks
        context = {"guild_id": guild_id}
        track = await player.autoplay.fetch_next_track(context)
        if track:
            await player.queue.add(track)
## Session Management

### Snapshot Operations
from sonora import snapshot_manager
# Create snapshot
snapshot = snapshot_manager.create_snapshot(player)
# Save to file
filepath = snapshot_manager.save_snapshot(snapshot, "my_session.json")
# Load and restore
loaded_snapshot = snapshot_manager.load_snapshot(filepath)
await snapshot_manager.restore_snapshot(loaded_snapshot, player)
### Auto-Snapshots
# Enable automatic snapshots
await snapshot_manager.start_auto_snapshot()
# Configure interval (default 5 minutes)
snapshot_manager.auto_snapshot_interval = 300
# Stop auto-snapshots
await snapshot_manager.stop_auto_snapshot()
## Performance Optimization

### Connection Pooling
# Optimize for high-load scenarios
client = SonoraClient(
lavalink_nodes=nodes,
node_pooling=True,
performance_mode="overdrive" # Enable performance optimizations
)
### Memory Management
# Limit queue size to prevent memory issues
player.queue.max_size = 1000
# Clean up old history
player.queue.history_maxlen = 50
# Periodic cleanup
import asyncio
async def cleanup_task():
    """Force a garbage collection once an hour, forever.

    Meant to be scheduled as a background task: the loop has no exit
    condition, so the coroutine only ends when it is cancelled.
    """
    # Imported once up front instead of on every loop iteration.
    import gc

    while True:
        await asyncio.sleep(3600)  # Every hour
        # Force garbage collection
        gc.collect()
asyncio.create_task(cleanup_task())
## Error Handling

### Robust Error Recovery
async def safe_play(player, query):
    """Play ``query`` on ``player``, falling back instead of raising.

    Recovery steps:
      * ``TrackLoadError``: search for ``query`` and play the first hit.
      * ``NodeException``: switch the player to another node and retry.

    Any other failure, including a failure inside one of the recovery
    steps, is logged and reported as ``None``.

    Expects ``TrackLoadError``, ``NodeException`` and ``logger`` to be
    available in the enclosing module.

    Returns:
        Whatever ``player.play`` returns on success, otherwise ``None``.
    """
    try:
        try:
            return await player.play(query)
        except TrackLoadError:
            # Fallback to search
            tracks = await player.search(query, limit=1)
            if not tracks:
                return None
            return await player.play(tracks[0])
        except NodeException:
            # Try different node
            await player.switch_node()
            return await player.play(query)
    except Exception as e:
        # Also reached when a recovery step above fails, so the caller
        # never sees an exception from this helper.
        logger.error(f"Playback failed: {e}")
        return None
### Circuit Breaker Pattern
import time


class CircuitBreakerError(Exception):
    """Raised by ``CircuitBreaker.call`` while the circuit is open."""


class CircuitBreaker:
    """Stop calling a failing coroutine function until a cool-down elapses.

    States:
      * ``"closed"``: calls go through; consecutive failures are counted.
      * ``"open"``: calls are rejected with ``CircuitBreakerError``.
      * ``"half-open"``: the cool-down has elapsed and the next call is let
        through; success closes the circuit, failure re-opens it.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        """Create a closed breaker.

        Args:
            failure_threshold: Number of consecutive failures that opens
                the circuit.
            recovery_timeout: Seconds to wait after the last failure before
                a trial call is allowed.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = "closed"

    async def call(self, func, *args, **kwargs):
        """Await ``func(*args, **kwargs)`` through the breaker.

        Returns:
            The awaited result of ``func``.

        Raises:
            CircuitBreakerError: The circuit is open and the recovery
                timeout has not elapsed yet.
            Exception: Whatever ``func`` raised, re-raised after the
                failure has been recorded.
        """
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half-open"
            else:
                raise CircuitBreakerError("Circuit is open")
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise
        # Any success closes the circuit and clears the failure streak, so
        # isolated failures spread over a long run cannot add up and trip
        # the breaker.
        self.state = "closed"
        self.failure_count = 0
        return result
## Custom Plugins

### Plugin Development
from sonora.plugins import Plugin
class MyPlugin(Plugin):
    """Minimal example plugin that requests the ``track.read`` permission."""

    def __init__(self):
        # Name, version and requested permissions are passed to the base class.
        super().__init__(
            name="my_plugin",
            version="1.0.0",
            permissions=["track.read"]
        )

    async def on_load(self):
        """Announce that the plugin was loaded."""
        print("Plugin loaded!")

    async def on_enable(self):
        """Announce that the plugin was enabled."""
        print("Plugin enabled!")

    async def on_track_start(self, event):
        """Hook for track-start events; add your own handling here."""
        # Custom logic when track starts
        track = event.data['track']
        # Send to external service, log analytics, etc.
        pass
### Plugin Security
# Plugin code validation
from sonora.security import PluginFirewall
firewall = PluginFirewall()
# Sample plugin source to validate; the embedded comment marks the line the
# firewall is expected to reject.
code = """
import os # Blocked
print('Hello World')
"""
violations = firewall.validate_code(code)
# A truthy result means at least one violation was reported.
if violations:
    print(f"Security violations: {violations}")
else:
    print("Code is safe")
## Monitoring and Metrics

### Custom Metrics
from sonora import metrics
# Track custom events
metrics.track_event("custom_playback", {
"guild_id": guild_id,
"track_id": track.identifier,
"quality": "high"
})
# Performance monitoring
import time
start_time = time.time()
result = await player.play(query)
duration = time.time() - start_time
metrics.record_latency("track_load", duration)
### Health Checks
async def health_check():
    """Comprehensive health check for the bot.

    Expects ``client``, ``get_memory_usage`` and ``get_uptime`` to be
    available in the enclosing module.

    Returns:
        A dict with the client's connection flag, one entry per node keyed
        by host, one entry per player keyed by guild id, and the values
        reported by ``get_memory_usage()`` and ``get_uptime()``.
    """
    health = {
        "client_connected": client.connected,
        "nodes": {},
        "players": {},
        "memory_usage": get_memory_usage(),
        "uptime": get_uptime()
    }
    # Per-node connection state and statistics, keyed by host.
    for node in client.nodes:
        health["nodes"][node.host] = {
            "connected": node.connected,
            "stats": node.stats
        }
    # Per-guild player state.
    for guild_id, player in client.players.items():
        health["players"][guild_id] = {
            # NOTE(review): other examples in this guide read the current
            # track from player.queue.current -- confirm player.current exists.
            "playing": player.current is not None,
            "queue_length": player.queue.length,
            "connected": player.connected
        }
    return health