# Queue Management

Sonora v1.2.8 provides advanced queue management with intelligent features for optimal music playback.

# Table of Contents

  • Overview
  • Basic Operations
  • Advanced Features
  • Queue Views
  • Persistence
  • Performance
  • Examples

# Overview

The queue system manages track ordering and playback flow with features like:

  • Multiple queue types (standard, priority, history)
  • Smart reordering based on user preferences
  • Session persistence across restarts
  • Advanced filtering and search
  • Real-time updates and synchronization

# Queue Components

  • Current Track: Currently playing track
  • Upcoming Queue: Tracks waiting to play
  • History: Recently played tracks
  • Priority Queue: High-priority tracks
  • Shuffle State: Randomization settings

# Basic Operations

# Adding Tracks

# Add single track
await player.queue.add(track)

# Add at specific position
await player.queue.add(track, position=1)

# Add multiple tracks
tracks = [track1, track2, track3]
await player.queue.add_multiple(tracks)

# Add to priority queue
await player.queue.add_priority(track)

# Removing Tracks

# Remove by position
removed_track = await player.queue.pop(0)

# Remove current track
await player.queue.remove_current()

# Clear entire queue
await player.queue.clear()

# Remove range
await player.queue.remove_range(start=5, end=10)

# Reordering

# Move track to new position
await player.queue.move(from_pos=0, to_pos=5)

# Swap two tracks
await player.queue.swap(pos1=0, pos2=1)

# Reverse queue order
await player.queue.reverse()

# Advanced Features

# Smart Shuffle

Intelligent shuffling that preserves context:

# Enable smart shuffle
player.queue.smart_shuffle = True

# Shuffle with context preservation
await player.queue.shuffle()

# Smart shuffle considers:
# - Recently played tracks
# - Artist clustering
# - Genre grouping
# - User preferences

# Adaptive Reorder

Automatically reorder queue based on metrics:

# Enable adaptive reordering
player.queue.enable_adaptive_reorder = True

# Trigger reordering
await player.queue.perform_adaptive_reorder()

# Reordering factors:
# - Play frequency
# - Skip rate
# - User ratings
# - Time-based preferences

# Queue Modes

# Normal playback
player.queue.loop_mode = "none"

# Loop current track
player.queue.loop_mode = "track"

# Loop entire queue
player.queue.loop_mode = "queue"

# Autoplay mode
player.queue.loop_mode = "autoplay"
player.autoplay.enabled = True

# Priority System

# Add high-priority tracks
await player.queue.add_priority(urgent_track)

# Priority tracks play before regular queue
# Useful for announcements, requests, etc.

# Check priority queue length
priority_count = player.queue.priority_length

# Queue Views

# Different Perspectives

# Get upcoming tracks
upcoming = player.queue.get_view("upcoming", limit=10)

# Get recent history
history = player.queue.get_view("history", limit=5)

# Get all tracks
all_tracks = player.queue.get_view("all")

# Get tracks by artist
artist_tracks = player.queue.get_view("artist", artist="Artist Name")

# Get tracks by genre
genre_tracks = player.queue.get_view("genre", genre="Rock")

# Filtered Views

# Filter by duration
short_tracks = player.queue.filter(lambda t: t.length < 300000)  # < 5 minutes

# Filter by source
youtube_tracks = player.queue.filter(lambda t: "youtube" in t.uri)

# Filter by custom criteria
popular_tracks = player.queue.filter(lambda t: t.play_count > 100)

# Search Functionality

# Search by title
results = player.queue.search("song title", field="title")

# Search by artist
results = player.queue.search("artist name", field="author")

# Search across all fields
results = player.queue.search("query", field="all")

# Fuzzy search
results = player.queue.search("song", fuzzy=True)

# Persistence

# Session Snapshots

# Save current session
snapshot = await player.queue.save_snapshot()
await snapshot.save_to_file("my_session.json")

# Load saved session
loaded_snapshot = await Snapshot.load_from_file("my_session.json")
await player.queue.load_snapshot(loaded_snapshot)

# Auto-Save

# Enable automatic snapshots
player.queue.auto_save = True
player.queue.auto_save_interval = 300  # Every 5 minutes

# Configure snapshot location
player.queue.snapshot_path = "./snapshots/"

# Backup and Restore

# Create backup
backup_data = await player.queue.create_backup()

# Restore from backup
await player.queue.restore_from_backup(backup_data)

# Export to different formats
await player.queue.export_to_json("queue_backup.json")
await player.queue.export_to_csv("queue_tracks.csv")

# Performance

# Memory Optimization

# Limit queue size to prevent memory issues
player.queue.max_size = 1000

# Automatic cleanup of old history
player.queue.history_max_age = 3600  # 1 hour

# Compress stored data
player.queue.enable_compression = True

# Batch Operations

# Add multiple tracks efficiently
await player.queue.add_batch(tracks, batch_size=50)

# Bulk operations
await player.queue.bulk_update(updates)
await player.queue.bulk_remove(indices)

# Caching

# Enable queue caching
player.queue.enable_cache = True
player.queue.cache_size = 1000

# Cache frequently accessed data
player.queue.cache_metadata = True
player.queue.cache_search_results = True

# Examples

# Discord Bot Integration

import discord
from discord.ext import commands
from sonora import SonoraClient

bot = commands.Bot(command_prefix='!')
sonora = SonoraClient(lavalink_nodes=[{"host": "localhost", "port": 2333, "password": "pass"}])

@bot.command()
async def queue(ctx):
    """Display current queue"""
    player = await sonora.get_player(ctx.guild.id)

    embed = discord.Embed(title="๐ŸŽต Music Queue", color=0x00ff00)

    # Current track
    if player.current:
        embed.add_field(
            name="Now Playing",
            value=f"**{player.current.title}**\nby {player.current.author}",
            inline=False
        )

    # Upcoming tracks
    upcoming = list(player.queue.upcoming)[:10]
    if upcoming:
        queue_text = ""
        for i, track in enumerate(upcoming, 1):
            duration = track.length // 1000
            queue_text += f"`{i}.` {track.title} - {duration//60}:{duration%60:02d}\n"

        embed.add_field(name="Up Next", value=queue_text, inline=False)

    # Queue stats
    embed.set_footer(text=f"Total tracks: {len(player.queue.upcoming)} | Loop: {player.queue.loop_mode}")

    await ctx.send(embed=embed)

@bot.command()
async def shuffle(ctx):
    """Shuffle the queue"""
    player = await sonora.get_player(ctx.guild.id)
    await player.queue.shuffle()
    await ctx.send("๐Ÿ”€ Queue shuffled!")

@bot.command()
async def move(ctx, from_pos: int, to_pos: int):
    """Move a track in the queue"""
    player = await sonora.get_player(ctx.guild.id)

    try:
        await player.queue.move(from_pos - 1, to_pos - 1)  # Convert to 0-based
        await ctx.send(f"โœ… Moved track from position {from_pos} to {to_pos}")
    except IndexError:
        await ctx.send("โŒ Invalid position")

@bot.command()
async def remove(ctx, position: int):
    """Remove a track from the queue"""
    player = await sonora.get_player(ctx.guild.id)

    try:
        removed = await player.queue.pop(position - 1)
        await ctx.send(f"๐Ÿ—‘๏ธ Removed: **{removed.title}**")
    except IndexError:
        await ctx.send("โŒ Invalid position")

@bot.command()
async def clear(ctx):
    """Clear the entire queue"""
    player = await sonora.get_player(ctx.guild.id)
    track_count = len(player.queue.upcoming)
    await player.queue.clear()
    await ctx.send(f"๐Ÿงน Cleared {track_count} tracks from queue")

@bot.command()
async def loop(ctx, mode: str = None):
    """Control loop mode"""
    player = await sonora.get_player(ctx.guild.id)

    if mode is None:
        current = player.queue.loop_mode
        await ctx.send(f"๐Ÿ” Current loop mode: {current}")
        return

    valid_modes = ["none", "track", "queue"]
    if mode not in valid_modes:
        await ctx.send(f"โŒ Invalid mode. Use: {', '.join(valid_modes)}")
        return

    player.queue.loop_mode = mode
    await ctx.send(f"๐Ÿ” Loop mode set to: {mode}")

# Advanced Queue Management

class AdvancedQueueManager:
    def __init__(self, player):
        self.player = player
        self.backup_interval = 300  # 5 minutes

    async def start_auto_backup(self):
        """Automatically backup queue periodically"""
        while True:
            await asyncio.sleep(self.backup_interval)
            try:
                snapshot = await self.player.queue.save_snapshot()
                await snapshot.save_to_file(f"backup_{int(time.time())}.json")

                # Clean old backups (keep last 10)
                await self.cleanup_old_backups()
            except Exception as e:
                print(f"Backup failed: {e}")

    async def cleanup_old_backups(self):
        """Remove old backup files"""
        backup_dir = Path("./backups")
        if not backup_dir.exists():
            return

        backups = sorted(backup_dir.glob("backup_*.json"))
        if len(backups) > 10:
            for old_backup in backups[:-10]:
                old_backup.unlink()

    async def smart_enqueue(self, tracks, priority=False):
        """Add tracks with smart positioning"""
        if priority:
            # Add to priority queue
            for track in tracks:
                await self.player.queue.add_priority(track)
        else:
            # Analyze current queue and insert optimally
            current_genres = [t.genre for t in self.player.queue.upcoming[:5] if hasattr(t, 'genre')]

            for track in tracks:
                if hasattr(track, 'genre') and track.genre in current_genres:
                    # Insert after similar tracks
                    position = self.find_similar_position(track)
                    await self.player.queue.add(track, position)
                else:
                    # Add to end
                    await self.player.queue.add(track)

    def find_similar_position(self, track):
        """Find optimal position for similar track"""
        for i, queue_track in enumerate(self.player.queue.upcoming):
            if (hasattr(queue_track, 'genre') and hasattr(track, 'genre') and
                queue_track.genre == track.genre):
                return i + 1  # Insert after similar track

        return len(self.player.queue.upcoming)  # Add to end

    async def create_playlist_from_queue(self, name):
        """Save current queue as playlist"""
        tracks = [self.player.current] + list(self.player.queue.upcoming)
        playlist = {
            'name': name,
            'created': time.time(),
            'tracks': [self.track_to_dict(t) for t in tracks if t]
        }

        filename = f"playlist_{name.lower().replace(' ', '_')}.json"
        async with aiofiles.open(filename, 'w') as f:
            await f.write(json.dumps(playlist, indent=2))

        return filename

    def track_to_dict(self, track):
        """Convert track to dictionary"""
        return {
            'title': track.title,
            'author': track.author,
            'uri': track.uri,
            'identifier': track.identifier,
            'length': track.length,
            'genre': getattr(track, 'genre', None)
        }

# Usage
queue_manager = AdvancedQueueManager(player)

# Start auto-backup
asyncio.create_task(queue_manager.start_auto_backup())

# Smart enqueue
await queue_manager.smart_enqueue(new_tracks)

# Save playlist
filename = await queue_manager.create_playlist_from_queue("My Favorites")
print(f"Playlist saved to: {filename}")

# Queue Analytics

class QueueAnalytics:
    def __init__(self, player):
        self.player = player
        self.stats = {
            'total_tracks_added': 0,
            'total_tracks_played': 0,
            'average_track_length': 0,
            'popular_genres': {},
            'popular_artists': {},
            'skip_rate': 0.0,
            'queue_completion_rate': 0.0
        }

    async def track_added(self, track):
        """Track when tracks are added"""
        self.stats['total_tracks_added'] += 1

        if hasattr(track, 'genre'):
            self.stats['popular_genres'][track.genre] = \
                self.stats['popular_genres'].get(track.genre, 0) + 1

        if hasattr(track, 'author'):
            self.stats['popular_artists'][track.author] = \
                self.stats['popular_artists'].get(track.author, 0) + 1

    async def track_played(self, track):
        """Track when tracks are played"""
        self.stats['total_tracks_played'] += 1

        # Update average length
        total_length = self.stats['average_track_length'] * (self.stats['total_tracks_played'] - 1)
        total_length += track.length
        self.stats['average_track_length'] = total_length / self.stats['total_tracks_played']

    async def track_skipped(self, track):
        """Track when tracks are skipped"""
        # Calculate skip rate
        total_actions = self.stats['total_tracks_played'] + self.stats.get('total_skips', 0)
        if total_actions > 0:
            self.stats['skip_rate'] = self.stats.get('total_skips', 0) / total_actions

    def get_popular_genres(self, limit=5):
        """Get most popular genres"""
        return sorted(
            self.stats['popular_genres'].items(),
            key=lambda x: x[1],
            reverse=True
        )[:limit]

    def get_popular_artists(self, limit=5):
        """Get most popular artists"""
        return sorted(
            self.stats['popular_artists'].items(),
            key=lambda x: x[1],
            reverse=True
        )[:limit]

    def get_queue_health(self):
        """Get overall queue health metrics"""
        return {
            'total_tracks': self.stats['total_tracks_added'],
            'played_tracks': self.stats['total_tracks_played'],
            'average_length': self.stats['average_track_length'] / 1000,  # Convert to seconds
            'skip_rate': self.stats['skip_rate'],
            'completion_rate': self.stats.get('queue_completion_rate', 0.0)
        }

# Integration
analytics = QueueAnalytics(player)

# Hook into events
from sonora import event_manager, EventType

@event_manager.on(EventType.TRACK_START)
async def on_track_start(event):
    await analytics.track_played(event.data['track'])

@event_manager.on(EventType.TRACK_END)
async def on_track_end(event):
    if event.data.get('reason') == 'skipped':
        await analytics.track_skipped(event.data['track'])

# Custom add function that tracks additions
original_add = player.queue.add
async def tracked_add(track, position=None):
    await analytics.track_added(track)
    return await original_add(track, position)

player.queue.add = tracked_add

This comprehensive queue management system provides powerful features for organizing and optimizing music playback in Sonora v1.2.8.