#
Queue Management
Sonora v1.2.8 provides advanced queue management with intelligent features for optimal music playback.
#
Table of Contents
- Overview
- Basic Operations
- Advanced Features
- Queue Views
- Persistence
- Performance
- Examples
#
Overview
The queue system manages track ordering and playback flow with features like:
- Multiple queue types (standard, priority, history)
- Smart reordering based on user preferences
- Session persistence across restarts
- Advanced filtering and search
- Real-time updates and synchronization
#
Queue Components
- Current Track: Currently playing track
- Upcoming Queue: Tracks waiting to play
- History: Recently played tracks
- Priority Queue: High-priority tracks
- Shuffle State: Randomization settings
#
Basic Operations
#
Adding Tracks
# Add single track
await player.queue.add(track)
# Add at specific position
await player.queue.add(track, position=1)
# Add multiple tracks
tracks = [track1, track2, track3]
await player.queue.add_multiple(tracks)
# Add to priority queue
await player.queue.add_priority(track)
#
Removing Tracks
# Remove by position
removed_track = await player.queue.pop(0)
# Remove current track
await player.queue.remove_current()
# Clear entire queue
await player.queue.clear()
# Remove range
await player.queue.remove_range(start=5, end=10)
#
Reordering
# Move track to new position
await player.queue.move(from_pos=0, to_pos=5)
# Swap two tracks
await player.queue.swap(pos1=0, pos2=1)
# Reverse queue order
await player.queue.reverse()
#
Advanced Features
#
Smart Shuffle
Intelligent shuffling that preserves context:
# Enable smart shuffle
player.queue.smart_shuffle = True
# Shuffle with context preservation
await player.queue.shuffle()
# Smart shuffle considers:
# - Recently played tracks
# - Artist clustering
# - Genre grouping
# - User preferences
#
Adaptive Reorder
Automatically reorder queue based on metrics:
# Enable adaptive reordering
player.queue.enable_adaptive_reorder = True
# Trigger reordering
await player.queue.perform_adaptive_reorder()
# Reordering factors:
# - Play frequency
# - Skip rate
# - User ratings
# - Time-based preferences
#
Queue Modes
# Normal playback
player.queue.loop_mode = "none"
# Loop current track
player.queue.loop_mode = "track"
# Loop entire queue
player.queue.loop_mode = "queue"
# Autoplay mode
player.queue.loop_mode = "autoplay"
player.autoplay.enabled = True
#
Priority System
# Add high-priority tracks
await player.queue.add_priority(urgent_track)
# Priority tracks play before regular queue
# Useful for announcements, requests, etc.
# Check priority queue length
priority_count = player.queue.priority_length
#
Queue Views
#
Different Perspectives
# Get upcoming tracks
upcoming = player.queue.get_view("upcoming", limit=10)
# Get recent history
history = player.queue.get_view("history", limit=5)
# Get all tracks
all_tracks = player.queue.get_view("all")
# Get tracks by artist
artist_tracks = player.queue.get_view("artist", artist="Artist Name")
# Get tracks by genre
genre_tracks = player.queue.get_view("genre", genre="Rock")
#
Filtered Views
# Filter by duration
short_tracks = player.queue.filter(lambda t: t.length < 300000) # < 5 minutes
# Filter by source
youtube_tracks = player.queue.filter(lambda t: "youtube" in t.uri)
# Filter by custom criteria
popular_tracks = player.queue.filter(lambda t: t.play_count > 100)
#
Search Functionality
# Search by title
results = player.queue.search("song title", field="title")
# Search by artist
results = player.queue.search("artist name", field="author")
# Search across all fields
results = player.queue.search("query", field="all")
# Fuzzy search
results = player.queue.search("song", fuzzy=True)
#
Persistence
#
Session Snapshots
# Save current session
snapshot = await player.queue.save_snapshot()
await snapshot.save_to_file("my_session.json")
# Load saved session
loaded_snapshot = await Snapshot.load_from_file("my_session.json")
await player.queue.load_snapshot(loaded_snapshot)
#
Auto-Save
# Enable automatic snapshots
player.queue.auto_save = True
player.queue.auto_save_interval = 300 # Every 5 minutes
# Configure snapshot location
player.queue.snapshot_path = "./snapshots/"
#
Backup and Restore
# Create backup
backup_data = await player.queue.create_backup()
# Restore from backup
await player.queue.restore_from_backup(backup_data)
# Export to different formats
await player.queue.export_to_json("queue_backup.json")
await player.queue.export_to_csv("queue_tracks.csv")
#
Performance
#
Memory Optimization
# Limit queue size to prevent memory issues
player.queue.max_size = 1000
# Automatic cleanup of old history
player.queue.history_max_age = 3600 # 1 hour
# Compress stored data
player.queue.enable_compression = True
#
Batch Operations
# Add multiple tracks efficiently
await player.queue.add_batch(tracks, batch_size=50)
# Bulk operations
await player.queue.bulk_update(updates)
await player.queue.bulk_remove(indices)
#
Caching
# Enable queue caching
player.queue.enable_cache = True
player.queue.cache_size = 1000
# Cache frequently accessed data
player.queue.cache_metadata = True
player.queue.cache_search_results = True
#
Examples
#
Discord Bot Integration
import discord
from discord.ext import commands
from sonora import SonoraClient
# discord.py 2.x requires `intents` to be passed explicitly, and prefix
# commands can only see message text when the message-content intent is on
# (it must also be enabled for the bot in the Discord developer portal).
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix='!', intents=intents)
sonora = SonoraClient(lavalink_nodes=[{"host": "localhost", "port": 2333, "password": "pass"}])
@bot.command()
async def queue(ctx):
    """Display the current track and up to ten upcoming tracks in an embed."""
    player = await sonora.get_player(ctx.guild.id)
    embed = discord.Embed(title="🎵 Music Queue", color=0x00ff00)

    # Current track
    if player.current:
        embed.add_field(
            name="Now Playing",
            value=f"**{player.current.title}**\nby {player.current.author}",
            inline=False
        )

    # Upcoming tracks: materialise once so slicing and len() both work
    # regardless of the container type behind `upcoming`.
    pending = list(player.queue.upcoming)
    if pending:
        lines = []
        for i, track in enumerate(pending[:10], 1):
            # track.length is divided by 1000 to get whole seconds.
            minutes, seconds = divmod(track.length // 1000, 60)
            lines.append(f"`{i}.` {track.title} - {minutes}:{seconds:02d}")
        queue_text = "\n".join(lines)
        # Discord rejects embed field values longer than 1024 characters.
        if len(queue_text) > 1024:
            queue_text = queue_text[:1021] + "..."
        embed.add_field(name="Up Next", value=queue_text, inline=False)

    # Queue stats
    embed.set_footer(text=f"Total tracks: {len(pending)} | Loop: {player.queue.loop_mode}")
    await ctx.send(embed=embed)
@bot.command()
async def shuffle(ctx):
    """Shuffle the queue of the invoking guild's player and confirm in chat."""
    player = await sonora.get_player(ctx.guild.id)
    await player.queue.shuffle()
    await ctx.send("🔀 Queue shuffled!")
@bot.command()
async def move(ctx, from_pos: int, to_pos: int):
    """Move a track in the queue.

    Both positions are 1-based and are converted to 0-based indices before
    being handed to ``player.queue.move``.
    """
    # Reject 0 and negatives up front: after the 1-based -> 0-based shift they
    # would become negative indices, which may not raise IndexError at all
    # (NOTE(review): confirm how queue.move treats negative indices).
    if from_pos < 1 or to_pos < 1:
        await ctx.send("❌ Invalid position")
        return

    player = await sonora.get_player(ctx.guild.id)
    try:
        await player.queue.move(from_pos - 1, to_pos - 1)  # Convert to 0-based
    except IndexError:
        await ctx.send("❌ Invalid position")
    else:
        await ctx.send(f"✅ Moved track from position {from_pos} to {to_pos}")
@bot.command()
async def remove(ctx, position: int):
    """Remove the track at a 1-based position from the queue."""
    # Position 0 or below would turn into a negative index after the 0-based
    # conversion and could silently remove a track counted from the end
    # (NOTE(review): confirm how queue.pop treats negative indices).
    if position < 1:
        await ctx.send("❌ Invalid position")
        return

    player = await sonora.get_player(ctx.guild.id)
    try:
        removed = await player.queue.pop(position - 1)
    except IndexError:
        await ctx.send("❌ Invalid position")
    else:
        await ctx.send(f"🗑️ Removed: **{removed.title}**")
@bot.command()
async def clear(ctx):
    """Clear the entire queue and report how many tracks were dropped."""
    player = await sonora.get_player(ctx.guild.id)
    # Count before clearing; afterwards the number is gone.
    track_count = len(player.queue.upcoming)
    await player.queue.clear()
    await ctx.send(f"🧹 Cleared {track_count} tracks from queue")
@bot.command()
async def loop(ctx, mode: str = None):
    """Show the loop mode (no argument) or set it to one of the valid modes."""
    player = await sonora.get_player(ctx.guild.id)
    if mode is None:
        current = player.queue.loop_mode
        await ctx.send(f"🔁 Current loop mode: {current}")
        return

    # Same set of values the Queue Modes section assigns to loop_mode.
    valid_modes = ["none", "track", "queue", "autoplay"]
    mode = mode.lower()  # accept "Track", "QUEUE", ...
    if mode not in valid_modes:
        await ctx.send(f"❌ Invalid mode. Use: {', '.join(valid_modes)}")
        return

    player.queue.loop_mode = mode
    if mode == "autoplay":
        # The Queue Modes section pairs this mode with enabling autoplay.
        player.autoplay.enabled = True
    await ctx.send(f"🔁 Loop mode set to: {mode}")
#
Advanced Queue Management
import asyncio
import json
import time
from itertools import islice
from pathlib import Path


class AdvancedQueueManager:
    """Helpers layered on a player's queue: backups, genre-aware inserts, export.

    Args:
        player: Object exposing ``queue`` and ``current``.
        backup_dir: Directory that receives (and is pruned of) backup files.
        max_backups: Number of most recent backup files to keep.
    """

    def __init__(self, player, backup_dir="./backups", max_backups=10):
        self.player = player
        self.backup_interval = 300  # 5 minutes
        self.backup_dir = Path(backup_dir)
        self.max_backups = max_backups

    async def start_auto_backup(self):
        """Back up the queue every ``backup_interval`` seconds until cancelled."""
        while True:
            await asyncio.sleep(self.backup_interval)
            try:
                await self._backup_once()
            except Exception as e:
                # Best effort: one failed backup must not end the loop.
                print(f"Backup failed: {e}")

    async def _backup_once(self):
        """Write one snapshot into ``backup_dir``, prune old ones, return its path."""
        # Save into the same directory cleanup_old_backups() scans; otherwise
        # old files are never found and never removed.
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        target = self.backup_dir / f"backup_{int(time.time())}.json"
        snapshot = await self.player.queue.save_snapshot()
        await snapshot.save_to_file(str(target))
        # Clean old backups (keep the newest ``max_backups``)
        await self.cleanup_old_backups()
        return target

    async def cleanup_old_backups(self):
        """Remove all but the newest ``max_backups`` files from ``backup_dir``."""
        if not self.backup_dir.exists():
            return
        # Names embed epoch seconds of equal width, so sorting by name is
        # sorting by age (oldest first).
        backups = sorted(self.backup_dir.glob("backup_*.json"))
        excess = len(backups) - self.max_backups
        for old_backup in backups[:max(excess, 0)]:
            old_backup.unlink()

    async def smart_enqueue(self, tracks, priority=False):
        """Add tracks, placing each next to a same-genre track when possible.

        With ``priority=True`` every track goes to the priority queue instead.
        """
        queue = self.player.queue
        if priority:
            # Add to priority queue
            for track in tracks:
                await queue.add_priority(track)
            return

        # Genres of the next five tracks; islice avoids assuming that
        # ``upcoming`` supports slice syntax.
        current_genres = [t.genre for t in islice(queue.upcoming, 5) if hasattr(t, 'genre')]
        for track in tracks:
            if hasattr(track, 'genre') and track.genre in current_genres:
                # Insert after a similar track
                position = self.find_similar_position(track)
                await queue.add(track, position)
            else:
                # Add to end
                await queue.add(track)

    def find_similar_position(self, track):
        """Return the index just after the first queued track sharing ``track``'s genre.

        Falls back to the queue length (i.e. append) when nothing matches.
        """
        for i, queue_track in enumerate(self.player.queue.upcoming):
            if (hasattr(queue_track, 'genre') and hasattr(track, 'genre') and
                    queue_track.genre == track.genre):
                return i + 1  # Insert after similar track
        return len(self.player.queue.upcoming)  # Add to end

    async def create_playlist_from_queue(self, name):
        """Save the current track plus the upcoming queue as a JSON playlist.

        Returns:
            The name of the file that was written.
        """
        # Third-party dependency, imported here so the rest of the class is
        # usable without it.
        import aiofiles

        tracks = [self.player.current] + list(self.player.queue.upcoming)
        playlist = {
            'name': name,
            'created': time.time(),
            # ``if t`` drops the empty slot left when nothing is playing.
            'tracks': [self.track_to_dict(t) for t in tracks if t]
        }
        filename = f"playlist_{name.lower().replace(' ', '_')}.json"
        async with aiofiles.open(filename, 'w') as f:
            await f.write(json.dumps(playlist, indent=2))
        return filename

    def track_to_dict(self, track):
        """Convert a track to a JSON-serialisable dictionary."""
        return {
            'title': track.title,
            'author': track.author,
            'uri': track.uri,
            'identifier': track.identifier,
            'length': track.length,
            'genre': getattr(track, 'genre', None)
        }
# Usage
queue_manager = AdvancedQueueManager(player)
# Start auto-backup
asyncio.create_task(queue_manager.start_auto_backup())
# Smart enqueue
await queue_manager.smart_enqueue(new_tracks)
# Save playlist
filename = await queue_manager.create_playlist_from_queue("My Favorites")
print(f"Playlist saved to: {filename}")
#
Queue Analytics
class QueueAnalytics:
    """Accumulate usage counters for a player's queue.

    All counters live in ``self.stats``. The ``track_*`` coroutines update
    them; the ``get_*`` methods only read them.
    """

    def __init__(self, player):
        self.player = player
        self.stats = {
            'total_tracks_added': 0,
            'total_tracks_played': 0,
            'total_skips': 0,
            'average_track_length': 0,
            'popular_genres': {},
            'popular_artists': {},
            'skip_rate': 0.0,
            'queue_completion_rate': 0.0
        }

    def _bump(self, bucket, key):
        """Increment the count for ``key`` in ``stats[bucket]``; ignore missing values."""
        if key is None:
            return
        counts = self.stats[bucket]
        counts[key] = counts.get(key, 0) + 1

    def _top(self, bucket, limit):
        """Return the ``limit`` highest ``(key, count)`` pairs of ``stats[bucket]``."""
        return sorted(
            self.stats[bucket].items(),
            key=lambda x: x[1],
            reverse=True
        )[:limit]

    async def track_added(self, track):
        """Record that a track was added, including its genre and author if set."""
        self.stats['total_tracks_added'] += 1
        self._bump('popular_genres', getattr(track, 'genre', None))
        self._bump('popular_artists', getattr(track, 'author', None))

    async def track_played(self, track):
        """Record that a track was played and fold its length into the running mean."""
        self.stats['total_tracks_played'] += 1
        played = self.stats['total_tracks_played']
        # Rebuild the previous sum from the old mean, add the new length,
        # then divide by the new count.
        total_length = self.stats['average_track_length'] * (played - 1)
        total_length += track.length
        self.stats['average_track_length'] = total_length / played

    async def track_skipped(self, track):
        """Record a skip and refresh ``skip_rate``."""
        # Without this increment the skip count stays 0 and so does the rate.
        self.stats['total_skips'] = self.stats.get('total_skips', 0) + 1
        skips = self.stats['total_skips']
        total_actions = self.stats['total_tracks_played'] + skips
        self.stats['skip_rate'] = skips / total_actions

    def get_popular_genres(self, limit=5):
        """Get the most frequently added genres as ``(genre, count)`` pairs."""
        return self._top('popular_genres', limit)

    def get_popular_artists(self, limit=5):
        """Get the most frequently added artists as ``(artist, count)`` pairs."""
        return self._top('popular_artists', limit)

    def get_queue_health(self):
        """Get overall queue health metrics as a dictionary."""
        return {
            'total_tracks': self.stats['total_tracks_added'],
            'played_tracks': self.stats['total_tracks_played'],
            'average_length': self.stats['average_track_length'] / 1000,  # Convert to seconds
            'skip_rate': self.stats['skip_rate'],
            'completion_rate': self.stats.get('queue_completion_rate', 0.0)
        }
# Integration
analytics = QueueAnalytics(player)
# Hook into events
from sonora import event_manager, EventType
@event_manager.on(EventType.TRACK_START)
async def on_track_start(event):
    """Forward the started track to the analytics 'played' counter."""
    started = event.data['track']
    await analytics.track_played(started)
@event_manager.on(EventType.TRACK_END)
async def on_track_end(event):
    """Forward the track to the analytics 'skipped' counter when it ended by a skip."""
    payload = event.data
    # Only an end reason of 'skipped' counts; anything else is ignored.
    if payload.get('reason') != 'skipped':
        return
    await analytics.track_skipped(payload['track'])
# Custom add function that tracks additions
original_add = player.queue.add


async def tracked_add(track, *args, **kwargs):
    """Call the original ``queue.add`` and then record the addition.

    Extra arguments (such as ``position``) are passed through untouched, so
    the wrapped method keeps its own defaults.
    """
    result = await original_add(track, *args, **kwargs)
    # Count only after the add went through, so failed adds are not recorded.
    await analytics.track_added(track)
    return result


player.queue.add = tracked_add
This comprehensive queue management system provides powerful features for organizing and optimizing music playback in Sonora v1.2.8.