# Advanced Examples

This page collects complex usage patterns and real-world implementations built on Sonora.

# Multi-Guild Bot Architecture

# Sharded Bot Setup

import discord
from discord.ext import commands
from sonora import SonoraClient
import asyncio

class MusicBot(commands.Bot):
    """Sharded Discord bot that owns one Sonora client per shard."""

    def __init__(self):
        bot_intents = discord.Intents.default()
        bot_intents.message_content = True

        # Three shards: multi-shard setup for large bots.
        super().__init__(command_prefix='!', intents=bot_intents, shard_count=3)

        # Maps shard id -> SonoraClient; filled in by setup_hook().
        self.sonora_clients = {}

    async def setup_hook(self):
        """Create and start a dedicated Sonora client for every shard."""
        # Falls back to 0 when the bot user is not available yet.
        bot_user_id = self.user.id if self.user else 0

        for index in range(self.shard_count):
            node = {
                "host": f"lavalink-{index}.example.com",
                "port": 2333,
                "password": "password",
            }
            sonora = SonoraClient(lavalink_nodes=[node], user_id=bot_user_id)
            await sonora.start()
            self.sonora_clients[index] = sonora

    def get_sonora_client(self, guild_id):
        """Return the Sonora client of the shard responsible for ``guild_id``.

        The shard is computed as ``(guild_id >> 22) % shard_count``, the
        formula Discord documents for guild-to-shard assignment.

        Raises:
            KeyError: If no client was started for the computed shard.
        """
        return self.sonora_clients[(guild_id >> 22) % self.shard_count]

bot = MusicBot()


@bot.command()
async def play(ctx, *, query):
    """Play ``query`` in the voice channel of the member who ran the command.

    Replies with "Join a voice channel first!" when the author is not in a
    voice channel, with the track title once playback starts, or with
    "Failed to load track" when the player returns nothing.

    Args:
        ctx: Invocation context of the command.
        query: Everything typed after the command name.
    """
    # Validate the caller first so no player is created for a rejected request.
    if not ctx.author.voice:
        return await ctx.send("Join a voice channel first!")

    client = bot.get_sonora_client(ctx.guild.id)
    player = await client.get_player(ctx.guild.id)

    # connect() raises if the bot already has a voice connection in this
    # guild, so reuse the existing connection (moving it if needed).
    channel = ctx.author.voice.channel
    voice_client = ctx.voice_client
    if voice_client is None:
        await channel.connect()
    elif voice_client.channel != channel:
        await voice_client.move_to(channel)

    track = await player.play(query)

    if track:
        await ctx.send(f"Now playing: {track.title}")
    else:
        await ctx.send("Failed to load track")

# Advanced Autoplay System

# Collaborative Filtering

from collections import Counter


class CollaborativeAutoplay:
    """Recommend tracks from the listening habits of users with similar taste.

    A user's taste is summarised as their five most played genres and
    artists. Two users are compared with the Jaccard similarity of those
    sets, and recommendations are the tracks that similar users have but the
    requesting user does not.
    """

    def __init__(self, player):
        self.player = player
        # user_id -> {'genres': {name: count}, 'artists': {name: count}}
        self.user_preferences = {}
        self.track_similarity = {}

    def get_user_tracks(self, user_id):
        """Return the tracks associated with ``user_id``.

        This is the storage hook used by :meth:`recommend_collaborative`;
        the example keeps no track history itself, so it must be overridden.

        Raises:
            NotImplementedError: Always, until overridden.
        """
        raise NotImplementedError(
            "get_user_tracks() must be implemented to supply each user's tracks"
        )

    async def learn_user_preferences(self, user_id, played_tracks):
        """Build and store the preference profile of ``user_id``.

        Args:
            user_id: Key under which the profile is stored.
            played_tracks: Iterable of track objects; the optional ``genre``
                and ``author`` attributes are counted.
        """
        genres = Counter()
        artists = Counter()

        for track in played_tracks:
            # Skip missing/None values: a shared "None" genre would
            # otherwise count as common taste between two users.
            genre = getattr(track, 'genre', None)
            if genre is not None:
                genres[genre] += 1
            author = getattr(track, 'author', None)
            if author is not None:
                artists[author] += 1

        self.user_preferences[user_id] = {
            'genres': dict(genres.most_common(5)),
            'artists': dict(artists.most_common(5))
        }

    async def find_similar_users(self, user_id, threshold=0.7):
        """Return ``(other_id, similarity)`` pairs above ``threshold``.

        The result is sorted by similarity, highest first. A user without a
        stored profile has no similar users, so an empty list is returned
        instead of raising ``KeyError``.
        """
        own_prefs = self.user_preferences.get(user_id)
        if own_prefs is None:
            return []

        similar_users = []
        for other_id, prefs in self.user_preferences.items():
            if other_id == user_id:
                continue

            similarity = self.calculate_similarity(own_prefs, prefs)
            if similarity > threshold:
                similar_users.append((other_id, similarity))

        return sorted(similar_users, key=lambda x: x[1], reverse=True)

    @staticmethod
    def _jaccard(first, second):
        """Return |first & second| / |first | second|, or 0 if both are empty."""
        union = first | second
        return len(first & second) / len(union) if union else 0

    def calculate_similarity(self, prefs1, prefs2):
        """Return the mean of the genre and artist Jaccard similarities."""
        genre_sim = self._jaccard(
            set(prefs1.get('genres', {})), set(prefs2.get('genres', {}))
        )
        artist_sim = self._jaccard(
            set(prefs1.get('artists', {})), set(prefs2.get('artists', {}))
        )
        return (genre_sim + artist_sim) / 2

    async def recommend_collaborative(self, user_id):
        """Return up to 10 track identifiers popular among similar users.

        Only the five most similar users are consulted. Tracks the
        requesting user already has are excluded, and candidates are ranked
        by how many of the similar users have them.
        """
        similar_users = await self.find_similar_users(user_id)
        if not similar_users:
            return []

        # The requesting user's tracks do not change inside the loop.
        user_tracks = {t.identifier for t in self.get_user_tracks(user_id)}

        track_scores = Counter()
        for similar_user, _ in similar_users[:5]:  # Top 5 similar users
            similar_tracks = {
                t.identifier for t in self.get_user_tracks(similar_user)
            }
            # Each similar user contributes one vote per new track.
            track_scores.update(similar_tracks - user_tracks)

        return [t for t, _ in track_scores.most_common(10)]

# Real-time Analytics Dashboard

# Web Dashboard with WebSocket

from fastapi import FastAPI, WebSocket
from fastapi.staticfiles import StaticFiles
import json

app = FastAPI()

# WebSocket connections that should receive stats broadcasts.
active_connections = []


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Register a dashboard client and answer its keep-alive pings.

    The connection stays in ``active_connections`` until the client
    disconnects or the handler fails; every "ping" text message is answered
    with "pong".
    """
    # Local import so the snippet's existing import line stays untouched.
    from fastapi import WebSocketDisconnect

    await websocket.accept()
    active_connections.append(websocket)

    try:
        while True:
            # Keep connection alive
            data = await websocket.receive_text()
            if data == "ping":
                await websocket.send_text("pong")
    except WebSocketDisconnect:
        # Normal end of the session; nothing to report.
        pass
    finally:
        # Always unregister, including on cancellation or unexpected errors.
        # The membership check avoids ValueError if it was already removed.
        if websocket in active_connections:
            active_connections.remove(websocket)


# Mount the static dashboard last. Routes are matched in registration order
# and a mount at "/" matches every path, so mounting it before "/ws" would
# send WebSocket handshakes to StaticFiles instead of the endpoint above.
app.mount("/", StaticFiles(directory="dashboard", html=True), name="dashboard")

async def broadcast_stats(interval=5):
    """Push player and host statistics to every dashboard connection, forever.

    NOTE: ``client`` (providing ``players``) and ``psutil`` are used here but
    not defined in this snippet; both must be in scope where it runs.

    Args:
        interval: Seconds to wait between two broadcasts.
    """
    while True:
        players = list(client.players.values())
        stats = {
            "players": len(players),
            "total_tracks": sum(len(p.queue.upcoming) for p in players),
            "active_filters": sum(len(p.filters.active_filters) for p in players),
            "cpu_usage": psutil.cpu_percent(),
            "memory_usage": psutil.virtual_memory().percent
        }

        # Serialise once; every connection receives the same payload.
        payload = json.dumps(stats)

        # Iterate over a snapshot: each await yields to the event loop, where
        # connections may be added to or removed from the list.
        for connection in list(active_connections):
            try:
                await connection.send_text(payload)
            except Exception:
                # Best effort: one broken connection must not stop the
                # broadcast. Unlike a bare ``except:``, this lets
                # asyncio.CancelledError through so the task can be cancelled.
                continue

        await asyncio.sleep(interval)

# Start broadcasting once the server's event loop is running. Calling
# asyncio.create_task() at import time raises RuntimeError because no event
# loop is running yet. (Recent FastAPI versions prefer a `lifespan` handler
# passed to FastAPI(); the startup event is used here so the existing `app`
# object needs no changes.)
@app.on_event("startup")
async def start_broadcasting():
    # Keep a reference: the event loop only holds weak references to tasks,
    # so an unreferenced task may be garbage-collected before it finishes.
    app.state.broadcast_task = asyncio.create_task(broadcast_stats())

# HTML Dashboard

<!DOCTYPE html>
<html>
<head>
    <title>Sonora Dashboard</title>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
</head>
<body>
    <h1>Sonora Analytics Dashboard</h1>

    <!-- Live counters, overwritten by every stats message from the WebSocket. -->
    <div class="stats">
        <div id="players">Players: <span id="players-count">-</span></div>
        <div id="tracks">Total Tracks: <span id="tracks-count">-</span></div>
        <div id="cpu">CPU: <span id="cpu-percent">-</span>%</div>
        <div id="memory">Memory: <span id="memory-percent">-</span>%</div>
    </div>

    <canvas id="performanceChart"></canvas>

    <script>
        // Number of samples kept on the chart before the oldest is dropped.
        const MAX_POINTS = 20;

        // Connect back to the host that served this page so the dashboard
        // also works when it is not served from localhost:8000; fall back to
        // the original address when the page has no host (opened from disk).
        const wsScheme = window.location.protocol === 'https:' ? 'wss' : 'ws';
        const wsHost = window.location.host || 'localhost:8000';
        const ws = new WebSocket(`${wsScheme}://${wsHost}/ws`);
        const ctx = document.getElementById('performanceChart').getContext('2d');

        const chart = new Chart(ctx, {
            type: 'line',
            data: {
                labels: [],
                datasets: [{
                    label: 'CPU Usage',
                    data: [],
                    borderColor: 'rgb(75, 192, 192)',
                }, {
                    label: 'Memory Usage',
                    data: [],
                    borderColor: 'rgb(255, 99, 132)',
                }]
            }
        });

        ws.onmessage = function(event) {
            // Keep-alive pings are answered with the plain text "pong",
            // which is not JSON and would make JSON.parse throw.
            if (event.data === 'pong') {
                return;
            }

            const stats = JSON.parse(event.data);

            document.getElementById('players-count').textContent = stats.players;
            document.getElementById('tracks-count').textContent = stats.total_tracks;
            document.getElementById('cpu-percent').textContent = stats.cpu_usage.toFixed(1);
            document.getElementById('memory-percent').textContent = stats.memory_usage.toFixed(1);

            // Update chart
            const now = new Date().toLocaleTimeString();
            chart.data.labels.push(now);
            chart.data.datasets[0].data.push(stats.cpu_usage);
            chart.data.datasets[1].data.push(stats.memory_usage);

            if (chart.data.labels.length > MAX_POINTS) {
                chart.data.labels.shift();
                chart.data.datasets[0].data.shift();
                chart.data.datasets[1].data.shift();
            }

            chart.update();
        };

        // Keep connection alive. send() throws while the socket is still
        // connecting, so only ping an open socket.
        setInterval(() => {
            if (ws.readyState === WebSocket.OPEN) {
                ws.send('ping');
            }
        }, 30000);
    </script>
</body>
</html>

# Custom Plugin Ecosystem

# Plugin Marketplace

import time
import uuid


class PluginMarketplace:
    """In-memory plugin catalogue with downloads, reviews and search."""

    def __init__(self):
        # plugin_id -> {'info', 'code', 'downloads', 'rating', 'reviews'}
        self.plugins = {}
        self.reviews = {}

    async def publish_plugin(self, plugin_info, code):
        """Store a new plugin and return its generated id.

        Args:
            plugin_info: Mapping describing the plugin; ``search_plugins``
                reads its ``name``, ``description`` and ``category`` keys.
            code: The plugin payload handed back by ``download_plugin``.
        """
        plugin_id = str(uuid.uuid4())

        self.plugins[plugin_id] = {
            'info': plugin_info,
            'code': code,
            'downloads': 0,
            'rating': 0,
            'reviews': []
        }

        return plugin_id

    async def download_plugin(self, plugin_id):
        """Return the plugin's code and count the download.

        Raises:
            ValueError: If ``plugin_id`` is unknown.
        """
        if plugin_id not in self.plugins:
            raise ValueError("Plugin not found")

        plugin = self.plugins[plugin_id]
        plugin['downloads'] += 1
        return plugin['code']

    async def review_plugin(self, plugin_id, user_id, rating, comment):
        """Append a review and refresh the plugin's average rating.

        Raises:
            ValueError: If ``plugin_id`` is unknown.
        """
        if plugin_id not in self.plugins:
            raise ValueError("Plugin not found")

        plugin = self.plugins[plugin_id]
        reviews = plugin['reviews']
        reviews.append({
            'user_id': user_id,
            'rating': rating,
            'comment': comment,
            'timestamp': time.time()
        })

        # Update average rating; reviews is never empty at this point.
        plugin['rating'] = sum(r['rating'] for r in reviews) / len(reviews)

    def search_plugins(self, query, category=None):
        """Return plugins whose name or description contains ``query``.

        Matching is case-insensitive. When ``category`` is given, only
        plugins of that category are returned. Results are sorted by
        download count, most downloaded first. Plugins published without a
        name or description are searched as if that field were empty.
        """
        needle = query.lower()  # Lower-case once, not once per plugin.
        results = []

        for plugin_id, plugin in self.plugins.items():
            info = plugin['info']

            if category and info.get('category') != category:
                continue

            searchable = (info.get('name', ''), info.get('description', ''))
            if not any(needle in text.lower() for text in searchable):
                continue

            results.append({
                'id': plugin_id,
                'info': info,
                'downloads': plugin['downloads'],
                'rating': plugin['rating']
            })

        return sorted(results, key=lambda x: x['downloads'], reverse=True)

# Plugin Auto-Updater

import logging

logger = logging.getLogger(__name__)


class PluginAutoUpdater:
    """Check a marketplace for newer plugin versions and install them.

    The marketplace and installation operations are not defined here and
    must be supplied (for example by a subclass): ``fetch_plugin_info``,
    ``version_compare``, ``download_plugin``, ``backup_plugin`` and
    ``install_plugin_code``.
    """

    def __init__(self, marketplace_url):
        self.marketplace_url = marketplace_url
        # plugin name -> currently installed version
        self.installed_plugins = {}

    async def _latest_if_newer(self, plugin_name, current_version):
        """Return the marketplace info if it is newer than ``current_version``.

        Returns ``None`` when the plugin is up to date or the check fails;
        failures are logged instead of being silently discarded.
        """
        try:
            latest_info = await self.fetch_plugin_info(plugin_name)
            if self.version_compare(latest_info['version'], current_version) > 0:
                return latest_info
        except Exception as exc:
            logger.warning("Update check failed for %s: %s", plugin_name, exc)
        return None

    async def check_updates(self):
        """Return ``{plugin_name: latest_info}`` for every outdated plugin.

        Plugins whose check fails are skipped.
        """
        updates = {}

        for plugin_name, current_version in list(self.installed_plugins.items()):
            latest_info = await self._latest_if_newer(plugin_name, current_version)
            if latest_info is not None:
                updates[plugin_name] = latest_info

        return updates

    async def update_plugin(self, plugin_name, latest_info=None):
        """Install the newest version of ``plugin_name``.

        Args:
            plugin_name: Name of an installed plugin.
            latest_info: Marketplace info already obtained from
                ``check_updates``. When omitted, only this plugin is checked
                rather than every installed plugin.

        Returns:
            True if a newer version was installed, False if the plugin is
            not installed, is up to date, or could not be checked.
        """
        if latest_info is None:
            if plugin_name not in self.installed_plugins:
                return False
            latest_info = await self._latest_if_newer(
                plugin_name, self.installed_plugins[plugin_name]
            )
            if latest_info is None:
                return False

        # Download first so a failed download leaves the install untouched.
        new_code = await self.download_plugin(latest_info['id'])

        # Backup current version
        self.backup_plugin(plugin_name)

        # Install new version
        await self.install_plugin_code(plugin_name, new_code)

        # Record the version only after a successful install.
        self.installed_plugins[plugin_name] = latest_info['version']

        return True

    async def auto_update_all(self):
        """Update every outdated plugin, logging each success and failure.

        A failure for one plugin does not stop the remaining updates.
        """
        updates = await self.check_updates()

        for plugin_name, latest_info in updates.items():
            try:
                # Reuse the info just fetched instead of re-checking all plugins.
                if await self.update_plugin(plugin_name, latest_info):
                    logger.info("Updated plugin: %s", plugin_name)
            except Exception as e:
                logger.error("Failed to update %s: %s", plugin_name, e)

# Machine Learning Integration

# Track Feature Extraction

class TrackFeatureExtractor:
    """Produce (and memoise) a feature dictionary for each track.

    Every individual analyser below is a placeholder that returns a fixed
    value; replace them with real audio analysis.
    """

    def __init__(self):
        # track identifier -> feature dict returned by analyze_audio()
        self.feature_cache = {}

    async def extract_features(self, track):
        """Return the features of ``track``, analysing it only on first use."""
        try:
            return self.feature_cache[track.identifier]
        except KeyError:
            pass

        # Not cached yet: analyse the audio and remember the result.
        extracted = await self.analyze_audio(track.uri)
        self.feature_cache[track.identifier] = extracted
        return extracted

    async def analyze_audio(self, uri):
        """Run every analyser on ``uri`` and collect the results by name.

        This is a placeholder for actual ML feature extraction, e.g. an
        external service or local analysis.
        """
        analysers = (
            ('tempo', self.detect_tempo),
            ('key', self.detect_key),
            ('danceability', self.calculate_danceability),
            ('energy', self.calculate_energy),
            ('valence', self.calculate_valence),
            ('genres', self.predict_genres),
        )
        return {name: analyse(uri) for name, analyse in analysers}

    def detect_tempo(self, uri):
        """BPM detection (placeholder value)."""
        return 128.0

    def detect_key(self, uri):
        """Musical key detection (placeholder value)."""
        return "C Major"

    def calculate_danceability(self, uri):
        """Danceability score (placeholder value)."""
        return 0.75

    def calculate_energy(self, uri):
        """Energy score (placeholder value)."""
        return 0.82

    def calculate_valence(self, uri):
        """Valence, i.e. positivity, score (placeholder value)."""
        return 0.65

    def predict_genres(self, uri):
        """Genre prediction (placeholder value)."""
        return ["electronic", "dance"]

# ML-Based Recommendations

class MLRecommender:
    """Rank tracks by how closely their features match a user's profile."""

    def __init__(self, feature_extractor):
        # Object exposing ``async extract_features(track) -> dict``.
        self.feature_extractor = feature_extractor
        # user_id -> averaged feature profile
        self.user_profiles = {}
        # track identifier -> feature dict
        self.track_features = {}

    async def build_user_profile(self, user_id, played_tracks):
        """Average the features of ``played_tracks`` into a user profile.

        Numeric features are averaged, list features (genres) are merged
        without duplicates, and other feature types are left out. No profile
        is stored when ``played_tracks`` is empty.
        """
        features_list = []

        for track in played_tracks:
            features = await self.feature_extractor.extract_features(track)
            features_list.append(features)
            self.track_features[track.identifier] = features

        if not features_list:
            return

        profile = {}
        # The first track decides which keys exist and how they are combined.
        for key, sample in features_list[0].items():
            # Tolerate feature dicts that lack a key instead of raising.
            values = [f[key] for f in features_list if key in f]
            if isinstance(sample, (int, float)):
                profile[key] = sum(values) / len(values)
            elif isinstance(sample, list):
                # dict.fromkeys de-duplicates while keeping first-seen order,
                # so the profile is deterministic (set order is not).
                profile[key] = list(dict.fromkeys(g for v in values for g in v))

        self.user_profiles[user_id] = profile

    async def recommend_tracks(self, user_id, available_tracks, n=10):
        """Return the ``n`` tracks of ``available_tracks`` closest to the profile.

        Tracks without cached features are analysed on demand. Skipping them
        would restrict recommendations to tracks that already went through
        ``build_user_profile``. Without a profile for ``user_id``, the first
        ``n`` tracks are returned unchanged.

        Raises:
            Exception: Whatever ``feature_extractor.extract_features`` raises.
        """
        if user_id not in self.user_profiles:
            return available_tracks[:n]  # Fallback

        user_profile = self.user_profiles[user_id]
        scored_tracks = []

        for track in available_tracks:
            track_features = self.track_features.get(track.identifier)
            if track_features is None:
                track_features = await self.feature_extractor.extract_features(track)
                self.track_features[track.identifier] = track_features

            similarity = self.calculate_similarity(user_profile, track_features)
            scored_tracks.append((track, similarity))

        # Sort by similarity; the sort is stable, so ties keep input order.
        scored_tracks.sort(key=lambda x: x[1], reverse=True)

        return [track for track, _ in scored_tracks[:n]]

    def calculate_similarity(self, profile1, profile2):
        """Return the mean of the numeric cosine and genre Jaccard similarity.

        Returns 0 when either numeric vector is all zeros.

        NOTE(review): the features are not normalised, so a large-valued one
        such as tempo presumably dominates the cosine term. Consider scaling.
        """
        # Cosine similarity for numerical features
        numerical_keys = ['tempo', 'danceability', 'energy', 'valence']

        vec1 = [profile1.get(k, 0) for k in numerical_keys]
        vec2 = [profile2.get(k, 0) for k in numerical_keys]

        dot_product = sum(a * b for a, b in zip(vec1, vec2))
        magnitude1 = sum(a ** 2 for a in vec1) ** 0.5
        magnitude2 = sum(b ** 2 for b in vec2) ** 0.5

        if magnitude1 * magnitude2 == 0:
            return 0

        cosine_sim = dot_product / (magnitude1 * magnitude2)

        # Genre similarity (Jaccard); 0 when either side has no genres.
        genres1 = set(profile1.get('genres', []))
        genres2 = set(profile2.get('genres', []))

        if not genres1 or not genres2:
            genre_sim = 0
        else:
            genre_sim = len(genres1 & genres2) / len(genres1 | genres2)

        return (cosine_sim + genre_sim) / 2

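These advanced examples demonstrate complex, real-world implementations using Sonora's extensive feature set. Helpers that a snippet calls but does not define (for example `get_user_tracks` or `fetch_plugin_info`) are placeholders to implement for your own deployment.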