#
Advanced Examples
Complex usage patterns and real-world implementations.
#
Multi-Guild Bot Architecture
#
Sharded Bot Setup
import discord
from discord.ext import commands
from sonora import SonoraClient
import asyncio
class MusicBot(commands.Bot):
def __init__(self):
intents = discord.Intents.default()
intents.message_content = True
super().__init__(
command_prefix='!',
intents=intents,
shard_count=3 # Multi-shard for large bots
)
# Separate Sonora clients per shard
self.sonora_clients = {}
async def setup_hook(self):
# Create client per shard
for shard_id in range(self.shard_count):
client = SonoraClient(
lavalink_nodes=[
{
"host": f"lavalink-{shard_id}.example.com",
"port": 2333,
"password": "password"
}
],
user_id=self.user.id if self.user else 0
)
await client.start()
self.sonora_clients[shard_id] = client
def get_sonora_client(self, guild_id):
# Route to appropriate shard
shard_id = (guild_id >> 22) % self.shard_count
return self.sonora_clients[shard_id]
bot = MusicBot()
@bot.command()
async def play(ctx, *, query):
client = bot.get_sonora_client(ctx.guild.id)
player = await client.get_player(ctx.guild.id)
if not ctx.author.voice:
return await ctx.send("Join a voice channel first!")
await ctx.author.voice.channel.connect()
track = await player.play(query)
if track:
await ctx.send(f"Now playing: {track.title}")
else:
await ctx.send("Failed to load track")
#
Advanced Autoplay System
#
Collaborative Filtering
class CollaborativeAutoplay:
def __init__(self, player):
self.player = player
self.user_preferences = {}
self.track_similarity = {}
async def learn_user_preferences(self, user_id, played_tracks):
# Build user profile
genres = Counter()
artists = Counter()
for track in played_tracks:
if hasattr(track, 'genre'):
genres[track.genre] += 1
if hasattr(track, 'author'):
artists[track.author] += 1
self.user_preferences[user_id] = {
'genres': dict(genres.most_common(5)),
'artists': dict(artists.most_common(5))
}
async def find_similar_users(self, user_id, threshold=0.7):
similar_users = []
for other_id, prefs in self.user_preferences.items():
if other_id == user_id:
continue
similarity = self.calculate_similarity(
self.user_preferences[user_id],
prefs
)
if similarity > threshold:
similar_users.append((other_id, similarity))
return sorted(similar_users, key=lambda x: x[1], reverse=True)
def calculate_similarity(self, prefs1, prefs2):
# Jaccard similarity for genres and artists
genres1 = set(prefs1.get('genres', {}).keys())
genres2 = set(prefs2.get('genres', {}).keys())
artists1 = set(prefs1.get('artists', {}).keys())
artists2 = set(prefs2.get('artists', {}).keys())
genre_sim = len(genres1 & genres2) / len(genres1 | genres2) if genres1 | genres2 else 0
artist_sim = len(artists1 & artists2) / len(artists1 | artists2) if artists1 | artists2 else 0
return (genre_sim + artist_sim) / 2
async def recommend_collaborative(self, user_id):
similar_users = await self.find_similar_users(user_id)
candidate_tracks = []
for similar_user, _ in similar_users[:5]: # Top 5 similar users
# Get tracks liked by similar users but not by current user
user_tracks = set(t.identifier for t in self.get_user_tracks(user_id))
similar_tracks = set(t.identifier for t in self.get_user_tracks(similar_user))
new_tracks = similar_tracks - user_tracks
candidate_tracks.extend(new_tracks)
# Score candidates by popularity among similar users
track_scores = Counter(candidate_tracks)
return [t for t, _ in track_scores.most_common(10)]
#
Real-time Analytics Dashboard
#
Web Dashboard with WebSocket
from fastapi import FastAPI, WebSocket
from fastapi.staticfiles import StaticFiles
import json
app = FastAPI()
app.mount("/", StaticFiles(directory="dashboard", html=True), name="dashboard")
active_connections = []
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
active_connections.append(websocket)
try:
while True:
# Keep connection alive
data = await websocket.receive_text()
if data == "ping":
await websocket.send_text("pong")
except:
active_connections.remove(websocket)
async def broadcast_stats():
while True:
stats = {
"players": len(client.players),
"total_tracks": sum(len(p.queue.upcoming) for p in client.players.values()),
"active_filters": sum(len(p.filters.active_filters) for p in client.players.values()),
"cpu_usage": psutil.cpu_percent(),
"memory_usage": psutil.virtual_memory().percent
}
for connection in active_connections:
try:
await connection.send_text(json.dumps(stats))
except:
pass
await asyncio.sleep(5)
# Start broadcasting
asyncio.create_task(broadcast_stats())
#
HTML Dashboard
<!DOCTYPE html>
<html>
<head>
<title>Sonora Dashboard</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
</head>
<body>
<h1>Sonora Analytics Dashboard</h1>
<div class="stats">
<div id="players">Players: <span id="players-count">-</span></div>
<div id="tracks">Total Tracks: <span id="tracks-count">-</span></div>
<div id="cpu">CPU: <span id="cpu-percent">-</span>%</div>
<div id="memory">Memory: <span id="memory-percent">-</span>%</div>
</div>
<canvas id="performanceChart"></canvas>
<script>
const ws = new WebSocket('ws://localhost:8000/ws');
const ctx = document.getElementById('performanceChart').getContext('2d');
const chart = new Chart(ctx, {
type: 'line',
data: {
labels: [],
datasets: [{
label: 'CPU Usage',
data: [],
borderColor: 'rgb(75, 192, 192)',
}, {
label: 'Memory Usage',
data: [],
borderColor: 'rgb(255, 99, 132)',
}]
}
});
ws.onmessage = function(event) {
const stats = JSON.parse(event.data);
document.getElementById('players-count').textContent = stats.players;
document.getElementById('tracks-count').textContent = stats.total_tracks;
document.getElementById('cpu-percent').textContent = stats.cpu_usage.toFixed(1);
document.getElementById('memory-percent').textContent = stats.memory_usage.toFixed(1);
// Update chart
const now = new Date().toLocaleTimeString();
chart.data.labels.push(now);
chart.data.datasets[0].data.push(stats.cpu_usage);
chart.data.datasets[1].data.push(stats.memory_usage);
if (chart.data.labels.length > 20) {
chart.data.labels.shift();
chart.data.datasets[0].data.shift();
chart.data.datasets[1].data.shift();
}
chart.update();
};
// Keep connection alive
setInterval(() => {
ws.send('ping');
}, 30000);
</script>
</body>
</html>
#
Custom Plugin Ecosystem
#
Plugin Marketplace
class PluginMarketplace:
def __init__(self):
self.plugins = {}
self.reviews = {}
async def publish_plugin(self, plugin_info, code):
plugin_id = str(uuid.uuid4())
self.plugins[plugin_id] = {
'info': plugin_info,
'code': code,
'downloads': 0,
'rating': 0,
'reviews': []
}
return plugin_id
async def download_plugin(self, plugin_id):
if plugin_id not in self.plugins:
raise ValueError("Plugin not found")
self.plugins[plugin_id]['downloads'] += 1
return self.plugins[plugin_id]['code']
async def review_plugin(self, plugin_id, user_id, rating, comment):
if plugin_id not in self.plugins:
raise ValueError("Plugin not found")
review = {
'user_id': user_id,
'rating': rating,
'comment': comment,
'timestamp': time.time()
}
self.plugins[plugin_id]['reviews'].append(review)
# Update average rating
reviews = self.plugins[plugin_id]['reviews']
avg_rating = sum(r['rating'] for r in reviews) / len(reviews)
self.plugins[plugin_id]['rating'] = avg_rating
def search_plugins(self, query, category=None):
results = []
for plugin_id, plugin in self.plugins.items():
if query.lower() in plugin['info']['name'].lower() or \
query.lower() in plugin['info']['description'].lower():
if category and plugin['info'].get('category') != category:
continue
results.append({
'id': plugin_id,
'info': plugin['info'],
'downloads': plugin['downloads'],
'rating': plugin['rating']
})
return sorted(results, key=lambda x: x['downloads'], reverse=True)
#
Plugin Auto-Updater
class PluginAutoUpdater:
def __init__(self, marketplace_url):
self.marketplace_url = marketplace_url
self.installed_plugins = {}
async def check_updates(self):
updates = {}
for plugin_name, current_version in self.installed_plugins.items():
try:
latest_info = await self.fetch_plugin_info(plugin_name)
if self.version_compare(latest_info['version'], current_version) > 0:
updates[plugin_name] = latest_info
except:
continue
return updates
async def update_plugin(self, plugin_name):
updates = await self.check_updates()
if plugin_name not in updates:
return False
# Download new version
new_code = await self.download_plugin(updates[plugin_name]['id'])
# Backup current version
self.backup_plugin(plugin_name)
# Install new version
await self.install_plugin_code(plugin_name, new_code)
# Update version
self.installed_plugins[plugin_name] = updates[plugin_name]['version']
return True
async def auto_update_all(self):
updates = await self.check_updates()
for plugin_name in updates:
try:
await self.update_plugin(plugin_name)
logger.info(f"Updated plugin: {plugin_name}")
except Exception as e:
logger.error(f"Failed to update {plugin_name}: {e}")
#
Machine Learning Integration
#
Track Feature Extraction
class TrackFeatureExtractor:
def __init__(self):
self.feature_cache = {}
async def extract_features(self, track):
if track.identifier in self.feature_cache:
return self.feature_cache[track.identifier]
# Extract audio features
features = await self.analyze_audio(track.uri)
# Cache features
self.feature_cache[track.identifier] = features
return features
async def analyze_audio(self, uri):
# Use external service or local analysis
# This is a placeholder for actual ML feature extraction
features = {
'tempo': self.detect_tempo(uri),
'key': self.detect_key(uri),
'danceability': self.calculate_danceability(uri),
'energy': self.calculate_energy(uri),
'valence': self.calculate_valence(uri),
'genres': self.predict_genres(uri)
}
return features
def detect_tempo(self, uri):
# BPM detection algorithm
return 128.0 # Placeholder
def detect_key(self, uri):
# Key detection algorithm
return "C Major" # Placeholder
def calculate_danceability(self, uri):
# Danceability calculation
return 0.75 # Placeholder
def calculate_energy(self, uri):
# Energy calculation
return 0.82 # Placeholder
def calculate_valence(self, uri):
# Valence (positivity) calculation
return 0.65 # Placeholder
def predict_genres(self, uri):
# Genre prediction
return ["electronic", "dance"] # Placeholder
#
ML-Based Recommendations
class MLRecommender:
def __init__(self, feature_extractor):
self.feature_extractor = feature_extractor
self.user_profiles = {}
self.track_features = {}
async def build_user_profile(self, user_id, played_tracks):
features_list = []
for track in played_tracks:
features = await self.feature_extractor.extract_features(track)
features_list.append(features)
self.track_features[track.identifier] = features
if features_list:
# Average features for user profile
profile = {}
for key in features_list[0].keys():
if isinstance(features_list[0][key], (int, float)):
profile[key] = sum(f[key] for f in features_list) / len(features_list)
elif isinstance(features_list[0][key], list):
# Combine genre lists
all_genres = [g for f in features_list for g in f[key]]
profile[key] = list(set(all_genres))
self.user_profiles[user_id] = profile
async def recommend_tracks(self, user_id, available_tracks, n=10):
if user_id not in self.user_profiles:
return available_tracks[:n] # Fallback
user_profile = self.user_profiles[user_id]
scored_tracks = []
for track in available_tracks:
if track.identifier in self.track_features:
track_features = self.track_features[track.identifier]
similarity = self.calculate_similarity(user_profile, track_features)
scored_tracks.append((track, similarity))
# Sort by similarity
scored_tracks.sort(key=lambda x: x[1], reverse=True)
return [track for track, _ in scored_tracks[:n]]
def calculate_similarity(self, profile1, profile2):
# Cosine similarity for numerical features
numerical_keys = ['tempo', 'danceability', 'energy', 'valence']
vec1 = [profile1.get(k, 0) for k in numerical_keys]
vec2 = [profile2.get(k, 0) for k in numerical_keys]
dot_product = sum(a * b for a, b in zip(vec1, vec2))
magnitude1 = sum(a ** 2 for a in vec1) ** 0.5
magnitude2 = sum(b ** 2 for b in vec2) ** 0.5
if magnitude1 * magnitude2 == 0:
return 0
cosine_sim = dot_product / (magnitude1 * magnitude2)
# Genre similarity
genres1 = set(profile1.get('genres', []))
genres2 = set(profile2.get('genres', []))
if not genres1 or not genres2:
genre_sim = 0
else:
genre_sim = len(genres1 & genres2) / len(genres1 | genres2)
return (cosine_sim + genre_sim) / 2
These advanced examples demonstrate complex, real-world implementations using Sonora's extensive feature set.