#
Intelligent Queue Management
Advanced queue operations and smart features for optimal playback flow.
#
Queue Types
#
Standard Queue
Basic FIFO (First In, First Out) queue:
# Add tracks
await player.queue.add(track1)
await player.queue.add(track2)
# Play next
next_track = await player.queue.advance() # Returns track1
#
Smart Queue
Intelligent queue with advanced features:
# Enable smart features
player.queue.enable_adaptive_reorder = True
# NOTE(review): elsewhere in this guide smart_shuffle is awaited as a method
# (`await player.queue.smart_shuffle()`); assigning True here would presumably
# shadow that method on this queue object. Confirm the intended flag name.
player.queue.smart_shuffle = True
#
Queue Operations
#
Adding Tracks
# Add to end
await player.queue.add(track)
# Add at position
await player.queue.add(track, position=1)
# Add multiple tracks
await player.queue.add_multiple([track1, track2, track3])
# Add with position
await player.queue.add_multiple([track1, track2], position=2)
#
Removing Tracks
# Remove by position
removed_track = await player.queue.pop(1)
# Remove first track
first_track = await player.queue.pop()
# Clear entire queue
await player.queue.clear()
#
Reordering
# Move track
# NOTE(review): move() is called without `await` here, but the batch-processing
# example in this guide uses `await player.queue.move(0, 1)`. Confirm whether
# move() is a coroutine and make the two examples agree.
player.queue.move(from_pos=0, to_pos=2)
# Shuffle
await player.queue.smart_shuffle()
# Adaptive reorder
await player.queue.perform_adaptive_reorder()
#
Smart Features
#
Adaptive Reorder
Automatically reorders the queue based on playback metrics:
player.queue.enable_adaptive_reorder = True
# Triggers on track end, skip, etc.
await player.queue.perform_adaptive_reorder()
#
Smart Shuffle
Shuffles the queue while keeping recently played tracks from coming up again immediately:
await player.queue.smart_shuffle()
# Keeps last 10 tracks from being shuffled immediately
recent_titles = {t.title for t in player.queue.history[-10:]}
#
Skip Fatigue Detection
Flags tracks that have been skipped repeatedly so they can be avoided:
# A track skipped more often than this threshold counts as "skip fatigued"
player.queue.metrics.skip_fatigue_threshold = 3
if player.queue.metrics.is_skip_fatigued(track):
    # Placeholder: leave this track out of recommendations
    pass
#
Queue Modes
#
Normal Mode
Standard playback:
player.queue.loop_mode = "none"
#
Track Loop
Repeat current track:
player.queue.loop_mode = "track"
#
Queue Loop
Repeat entire queue:
player.queue.loop_mode = "queue"
#
Autoplay Mode
Automatic recommendations:
player.queue.loop_mode = "autoplay"
player.autoplay.enabled = True
#
Queue Views
#
Different Perspectives
# Upcoming tracks
upcoming = player.queue.upcoming
# Playback history
history = player.queue.history
# Current + upcoming + history
all_tracks = player.queue.get_view("all")
# Limited view
recent_history = player.queue.get_view("history", limit=5)
#
Metrics and Analytics
#
Popularity Tracking
# Get most played tracks
heatmap = player.queue.get_heatmap()
print(f"Top tracks: {list(heatmap.keys())}")
# Track play/skip counts
plays = player.queue.metrics.track_plays
skips = player.queue.metrics.track_skips
#
Session Memory
# Track similarity scores
player.queue.session_memory[track.identifier] = 0.85
# Use in recommendations
similar_tracks = [
t for t in player.queue.upcoming
if player.queue.session_memory.get(t.identifier, 0) > 0.7
]
#
Advanced Operations
#
Bulk Operations
# Import large playlist
tracks = await load_playlist("large_playlist.json")
await player.queue.add_multiple(tracks)
# Export queue
queue_data = {
"current": player.queue.current,
"upcoming": player.queue.upcoming,
"history": player.queue.history
}
#
Queue Cloning
# Create backup
backup_queue = player.queue.upcoming.copy()
# Restore
await player.queue.clear()
await player.queue.add_multiple(backup_queue)
#
Queue Merging
# Merge two queues
other_queue = await load_saved_queue("party_playlist")
await player.queue.add_multiple(other_queue.upcoming)
#
Performance Optimization
#
Memory Management
# Cap the number of upcoming tracks
player.queue.max_size = 1000
# Trim the queue by hand once it has grown past the cap
if len(player.queue.upcoming) > player.queue.max_size:
    # Number of tracks over the limit
    excess = len(player.queue.upcoming) - player.queue.max_size
    # Drop that many tracks from the front (the oldest entries).
    # NOTE(review): this reaches into the private `_upcoming` container —
    # confirm there is no public API for trimming.
    for _ in range(excess):
        player.queue._upcoming.popleft()
#
Batch Processing
# Process multiple operations atomically
async with player.queue._lock:
await player.queue.add(track1)
await player.queue.add(track2)
await player.queue.move(0, 1)
#
Error Handling
#
Robust Operations
async def safe_queue_operation(operation):
    """Run a queue operation, recovering from overflow and bad positions.

    Args:
        operation: Zero-argument callable returning an awaitable. It may be
            called a second time if the first attempt overflows the queue.

    Returns:
        The value produced by ``operation``, or ``None`` when an invalid
        position was logged instead.

    Raises:
        asyncio.QueueFull: If the retry after evicting one track still
            overflows.
    """
    try:
        try:
            return await operation()
        except asyncio.QueueFull:
            # Assumes the queue signals overflow with asyncio.QueueFull — TODO confirm.
            # Handle queue overflow: drop the oldest entry, then retry once.
            await player.queue.pop(0)  # Remove oldest
            return await operation()
    except IndexError:
        # The outer handler also covers the eviction and the retry above, so
        # an invalid position there is logged rather than escaping.
        logger.warning("Invalid queue position")
        return None
#
Integration Examples
#
Discord Commands
@bot.command()
async def queue(ctx):
    """Show current queue"""
    embed = discord.Embed(title="Queue")
    # `current` is assumed to be None when nothing is playing (confirm against
    # the queue API); guard it so the command still replies instead of raising
    # AttributeError on `.title`.
    current = player.queue.current
    embed.add_field(
        name="Current",
        value=current.title if current is not None else "Nothing playing",
        inline=False,
    )
    # Only the first ten upcoming tracks are listed.
    upcoming = player.queue.upcoming[:10]
    if upcoming:
        queue_list = "\n".join(
            f"{i}. {t.title}" for i, t in enumerate(upcoming, start=1)
        )
        embed.add_field(name="Upcoming", value=queue_list, inline=False)
    await ctx.send(embed=embed)
@bot.command()
async def shuffle(ctx):
    """Shuffle the queue"""
    # Reorder first, then confirm in the invoking channel.
    active_queue = player.queue
    await active_queue.smart_shuffle()
    await ctx.send("Queue shuffled! 🎵")
#
Web Dashboard
@app.get("/api/queue")
async def get_queue():
    # Build a snapshot of the queue state for the dashboard; only the last
    # five history entries are included.
    q = player.queue
    snapshot = {
        "current": q.current,
        "upcoming": q.upcoming,
        "history": q.history[-5:],
        "length": q.length,
        "loop_mode": q.loop_mode,
    }
    return snapshot
@app.post("/api/queue/add")
async def add_to_queue(track_data: dict):
    # The request payload is expanded directly into Track(...); presumably its
    # keys must match the Track constructor's parameters — verify against Track.
    new_track = Track(**track_data)
    await player.queue.add(new_track)
    return {"status": "added"}
#
Best Practices
- Use Smart Features: Enable adaptive reorder and smart shuffle for better UX
- Monitor Performance: Watch memory usage with large queues
- Handle Concurrency: Use locks for complex operations
- Provide Feedback: Show queue changes to users
- Implement Limits: Prevent queue abuse with size limits
- Save State: Persist queue across restarts when possible
- Error Recovery: Handle queue corruption gracefully
- User Controls: Allow users to manage their queue
#
Troubleshooting
#
Common Issues
Queue Not Advancing
- Check if player is connected
- Verify tracks are valid
- Check for stuck operations
Memory Issues
- Implement size limits
- Clear old history
- Monitor queue growth
Concurrency Problems
- Use proper locking
- Avoid long-running operations while holding a lock
- Implement timeouts