# 💾 Session Management

Sonora v1.2.7 provides comprehensive session management capabilities, allowing you to save and restore complete bot states across restarts and deployments.

# Overview

Session management includes:

  • Complete state snapshots (player, queue, filters, autoplay)
  • Automatic background snapshots with configurable intervals
  • Crash recovery across application restarts
  • Deployment-safe persistence for zero-downtime updates

# Basic Snapshot Operations

# Creating Snapshots

from sonora import snapshot_manager

# Fetch the guild's player and capture its current state as a snapshot
player = await client.get_player(guild_id)
snapshot = snapshot_manager.create_snapshot(player)

# Persist the snapshot under the given file name; the value returned by
# save_snapshot is reported as the saved location
filepath = snapshot_manager.save_snapshot(snapshot, "guild_123_backup.json")
print(f"Snapshot saved: {filepath}")

# Restoring Sessions

# Load a previously saved snapshot by file name
snapshot = snapshot_manager.load_snapshot("guild_123_backup.json")

# Apply the snapshot to the guild's player (restore_snapshot is awaited)
player = await client.get_player(guild_id)
await snapshot_manager.restore_snapshot(snapshot, player)
print("Session restored successfully")

# Automatic Snapshots

# Background Snapshot Service

# Start the background snapshot service (every 5 minutes by default)
await snapshot_manager.start_auto_snapshot()

# Tune the service: the interval is given in seconds, retention is per guild.
# NOTE(review): these values are assigned after start_auto_snapshot() here;
# whether an already-running service picks them up is not shown — confirm,
# or assign them before starting.
snapshot_manager.auto_snapshot_interval = 300  # 5 minutes
snapshot_manager.max_snapshots_per_guild = 10  # Keep 10 snapshots per guild

# Stop the background snapshot service
await snapshot_manager.stop_auto_snapshot()

# Custom Snapshot Scheduling

import asyncio

class SnapshotScheduler:
    """Periodically snapshot every active player of a client.

    ``start()`` runs the snapshot loop until ``stop()`` is called, so run it
    as a background task when the caller has other work to do. ``stop()``
    wakes the loop immediately instead of letting it sleep out the rest of
    the current interval.
    """

    def __init__(self, client, interval_minutes=5):
        """Store the client and convert the interval to seconds.

        Args:
            client: Object exposing a ``players`` mapping of guild ID to player.
            interval_minutes: Minutes to wait between snapshot rounds.
        """
        self.client = client
        self.interval = interval_minutes * 60
        self.running = False
        # Created inside start() so the event belongs to the running loop.
        self._stop_event = None

    async def start(self):
        """Start scheduled snapshots and keep running until stop() is called."""
        self.running = True
        self._stop_event = asyncio.Event()
        while self.running:
            try:
                await self.create_all_snapshots()
            except Exception as e:
                # Best effort: one failed round must not end the schedule.
                print(f"Snapshot scheduling error: {e}")

            # Sleep for one interval, but return early when stop() fires.
            try:
                await asyncio.wait_for(
                    self._stop_event.wait(), timeout=self.interval
                )
            except asyncio.TimeoutError:
                pass  # Interval elapsed without a stop request.

    async def create_all_snapshots(self):
        """Create snapshots for all active players.

        A failure for one guild is reported and does not prevent the
        remaining guilds from being snapshotted.
        """
        for guild_id, player in self.client.players.items():
            try:
                snapshot = snapshot_manager.create_snapshot(player)
                snapshot_manager.save_snapshot(snapshot)
            except Exception as e:
                print(f"Failed to snapshot guild {guild_id}: {e}")

    def stop(self):
        """Stop scheduled snapshots and wake the loop if it is sleeping.

        Call this from the event loop that runs ``start()``.
        """
        self.running = False
        if self._stop_event is not None:
            self._stop_event.set()

# Usage
scheduler = SnapshotScheduler(client, interval_minutes=10)
# start() loops until stop() is called, so awaiting it directly would block
# the caller; run it as a background task instead. Call scheduler.stop()
# to end the loop.
scheduler_task = asyncio.create_task(scheduler.start())

# Snapshot Contents

# Player State

{
  "player": {
    "volume": 75,
    "paused": false,
    "position": 45000,
    "connected": true,
    "session_id": "session_123"
  }
}

# Queue State

{
  "queue": {
    "current": {
      "title": "Never Gonna Give You Up",
      "author": "Rick Astley",
      "uri": "https://youtube.com/watch?v=dQw4w9WgXcQ",
      "length": 213000,
      "position": 45000
    },
    "upcoming": [
      {
        "title": "Track 2",
        "author": "Artist 2",
        "length": 180000
      }
    ],
    "history": [
      {
        "title": "Previous Track",
        "author": "Previous Artist",
        "length": 240000
      }
    ],
    "loop_mode": "none",
    "shuffle_enabled": false
  }
}

# Filter State

{
  "filters": {
    "active_filters": ["bassboost", "nightcore"],
    "filter_config": {
      "equalizer": [
        {"band": 0, "gain": 0.5},
        {"band": 14, "gain": -0.2}
      ],
      "timescale": {
        "speed": 1.2,
        "pitch": 1.1
      }
    }
  }
}

# Autoplay State

{
  "autoplay": {
    "enabled": true,
    "strategy": "similar_artist",
    "fallback_playlist": "global_fallback",
    "max_history": 50,
    "smart_shuffle": true
  }
}

# Advanced Features

# Selective Restoration

class SelectiveRestorer:
    """Restore individual sections of a snapshot instead of the whole state."""

    def __init__(self, snapshot):
        """Keep the snapshot whose ``data`` mapping will be read on restore."""
        self.snapshot = snapshot

    async def restore_player_only(self, player):
        """Restore only player state (currently the volume).

        Queue, filters and autoplay are left untouched. A snapshot without a
        stored volume resets the player to 100.
        """
        player_data = self.snapshot.data.get("player", {})
        player.volume = player_data.get("volume", 100)

    async def restore_queue_only(self, player):
        """Restore only queue state: upcoming tracks and loop mode.

        The player's upcoming list and history are cleared first; history is
        not repopulated.
        """
        queue_data = self.snapshot.data.get("queue", {})

        # Clear current queue
        player.queue._upcoming.clear()
        player.queue._history.clear()

        # Rebuild the upcoming list from the serialized track dictionaries
        for track_data in queue_data.get("upcoming", []):
            track = self._deserialize_track(track_data)
            await player.queue.add(track)

        player.queue.loop_mode = queue_data.get("loop_mode", "none")

    @staticmethod
    def _track_identifier(track_data):
        """Pick a value that identifies a serialized track.

        Serialized tracks are not guaranteed to carry an ``identifier`` key,
        so fall back to ``uri``, then ``title``, then ``"unknown"`` instead of
        raising ``KeyError``.
        """
        for key in ("identifier", "uri", "title"):
            value = track_data.get(key)
            if value is not None:
                return value
        return "unknown"

    def _deserialize_track(self, track_data):
        """Convert track data back to Track object"""
        from sonora import Track
        return Track(
            track=f"restored_{self._track_identifier(track_data)}",
            info=track_data
        )

# Snapshot Validation

class SnapshotValidator:
    """Structural checks for a loaded snapshot."""

    @staticmethod
    def validate_snapshot(snapshot):
        """Validate snapshot integrity.

        Args:
            snapshot: Object whose ``data`` attribute holds the snapshot
                sections.

        Returns:
            True when every check passes.

        Raises:
            ValueError: If a required section is missing, a checked section
                is not a mapping, the player volume is not an integer, or
                the upcoming tracks are not a list.
        """
        from collections.abc import Mapping

        required_fields = ("player", "queue", "filters", "autoplay")

        data = snapshot.data
        if not isinstance(data, Mapping):
            raise ValueError("Snapshot data must be a mapping")

        for field in required_fields:
            if field not in data:
                raise ValueError(f"Missing required field: {field}")

        # Validate player data
        player_data = data["player"]
        if not isinstance(player_data, Mapping):
            raise ValueError("Invalid player data")
        volume = player_data.get("volume")
        # bool is a subclass of int, so a JSON true/false must be rejected
        # explicitly.
        if isinstance(volume, bool) or not isinstance(volume, int):
            raise ValueError("Invalid volume in player data")

        # Validate queue data
        queue_data = data["queue"]
        if not isinstance(queue_data, Mapping):
            raise ValueError("Invalid queue data")
        if not isinstance(queue_data.get("upcoming", []), list):
            raise ValueError("Invalid upcoming tracks in queue data")

        return True

# Usage: load a snapshot and validate it before relying on it
try:
    snapshot = snapshot_manager.load_snapshot("backup.json")
    SnapshotValidator.validate_snapshot(snapshot)
    print("Snapshot is valid")
except ValueError as e:
    # validate_snapshot raises ValueError when a check fails.
    # NOTE(review): errors raised by load_snapshot itself are only handled
    # here if they are ValueError subclasses — confirm what it raises.
    print(f"Invalid snapshot: {e}")

# Encrypted Snapshots

from sonora import credential_manager

class EncryptedSnapshotManager:
    """Save and load snapshots as encrypted text files.

    Encryption and decryption are delegated to the credential manager;
    parsing is delegated to the snapshot manager.
    """

    def __init__(self, snapshot_manager, credential_manager):
        """Keep the two collaborators used for parsing and for encryption."""
        self.snapshot_manager = snapshot_manager
        self.credential_manager = credential_manager

    def save_encrypted_snapshot(self, snapshot, filename, key_name):
        """Save snapshot encrypted.

        Args:
            snapshot: Object providing ``to_json()``.
            filename: Path of the file to write (overwritten if present).
            key_name: Accepted for interface compatibility; not used.
                NOTE(review): it is never passed to the credential manager —
                confirm whether the encryption call should receive it.
        """
        # Convert to JSON
        json_data = snapshot.to_json()

        # Encrypt with credential vault
        encrypted_data = self.credential_manager.encrypt_credential(json_data)

        # Explicit UTF-8 keeps the file readable on hosts whose default
        # text encoding differs from the one that wrote it.
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(encrypted_data)

    def load_encrypted_snapshot(self, filename, key_name):
        """Load and decrypt snapshot.

        Args:
            filename: Path of a file written by ``save_encrypted_snapshot``.
            key_name: Accepted for interface compatibility; not used
                (see ``save_encrypted_snapshot``).

        Returns:
            Whatever the snapshot manager's ``from_json`` returns for the
            decrypted text.
        """
        # Read encrypted data with the same encoding used when saving
        with open(filename, 'r', encoding='utf-8') as f:
            encrypted_data = f.read()

        # Decrypt with credential vault
        json_data = self.credential_manager.decrypt_credential(encrypted_data)

        # Parse back to snapshot
        return self.snapshot_manager.from_json(json_data)

# Usage: wrap the existing snapshot and credential managers
encrypted_manager = EncryptedSnapshotManager(snapshot_manager, credential_manager)

# Snapshot the player and write it to disk in encrypted form
snapshot = snapshot_manager.create_snapshot(player)
encrypted_manager.save_encrypted_snapshot(snapshot, "secure_backup.json", "backup_key")

# Read the file back and decrypt it into a snapshot
restored_snapshot = encrypted_manager.load_encrypted_snapshot("secure_backup.json", "backup_key")

# Deployment Strategies

# Blue-Green Deployment

class BlueGreenDeployment:
    """Snapshot all players before a deployment and roll back on failure."""

    def __init__(self, client):
        """Store the client; ``snapshots`` maps guild ID to its snapshot."""
        self.client = client
        self.snapshots = {}

    async def prepare_deployment(self):
        """Take snapshots before deployment.

        The snapshot set is rebuilt from scratch on every call, so guilds
        captured by an earlier deployment but no longer active are not
        rolled back later. The previous set is replaced only after every
        snapshot has been created.
        """
        self.snapshots = {
            guild_id: snapshot_manager.create_snapshot(player)
            for guild_id, player in self.client.players.items()
        }

        print(f"Prepared snapshots for {len(self.snapshots)} guilds")

    async def rollback_deployment(self):
        """Rollback to previous state if deployment fails.

        Each guild is restored independently; a failure is reported and the
        remaining guilds are still processed.
        """
        for guild_id, snapshot in self.snapshots.items():
            try:
                player = await self.client.get_player(guild_id)
                await snapshot_manager.restore_snapshot(snapshot, player)
                print(f"Rolled back guild {guild_id}")
            except Exception as e:
                print(f"Failed to rollback guild {guild_id}: {e}")

# Usage
deployment = BlueGreenDeployment(client)

# Before deployment: capture a snapshot of every active player
await deployment.prepare_deployment()

# If the deployment misbehaves: restore every captured snapshot
await deployment.rollback_deployment()

# Rolling Deployment

class RollingDeployment:
    """Update guilds in batches, rolling back any guild whose update fails."""

    def __init__(self, client, batch_size=5, batch_delay=10):
        """Configure the rollout.

        Args:
            client: Object exposing ``players`` and ``get_player``.
            batch_size: Number of guilds updated per batch; must be >= 1.
            batch_delay: Seconds to wait between two batches.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            # A non-positive step would skip every guild or fail later.
            raise ValueError("batch_size must be at least 1")
        self.client = client
        self.batch_size = batch_size
        self.batch_delay = batch_delay

    async def rolling_update(self, new_version_callback):
        """Perform rolling update with snapshots.

        For each batch, every guild is snapshotted first and then passed to
        ``new_version_callback``. If the callback raises, that guild alone is
        restored from its snapshot and the rollout continues.

        Args:
            new_version_callback: Awaitable callable taking a guild ID.
        """
        guild_ids = list(self.client.players.keys())
        total = len(guild_ids)
        snapshots = {}

        # Process in batches
        for start in range(0, total, self.batch_size):
            batch = guild_ids[start:start + self.batch_size]

            # Take snapshots for this batch before touching any of its guilds
            for guild_id in batch:
                player = self.client.players[guild_id]
                snapshots[guild_id] = snapshot_manager.create_snapshot(player)

            # Update this batch
            for guild_id in batch:
                try:
                    await new_version_callback(guild_id)
                except Exception as e:
                    # Rollback this guild only
                    player = await self.client.get_player(guild_id)
                    await snapshot_manager.restore_snapshot(
                        snapshots[guild_id], player
                    )
                    print(f"Rolled back guild {guild_id}: {e}")
                else:
                    print(f"Updated guild {guild_id}")

            # Wait between batches; nothing follows the last one, so skip it
            if start + self.batch_size < total:
                await asyncio.sleep(self.batch_delay)

# Monitoring and Analytics

# Snapshot Analytics

class SnapshotAnalytics:
    """Usage statistics and age-based cleanup for stored snapshots."""

    def __init__(self, snapshot_manager):
        self.snapshot_manager = snapshot_manager

    def analyze_snapshot_usage(self):
        """Count stored snapshots overall and per guild.

        The guild ID is taken from the second underscore-separated part of
        each name; names without an underscore count only towards the total.
        The size and oldest/newest entries are returned with their initial
        placeholder values.
        """
        names = self.snapshot_manager.list_snapshots()
        total = len(names)

        per_guild = {}
        for name in names:
            pieces = name.split('_')
            if len(pieces) < 2:
                continue
            guild = pieces[1]
            per_guild[guild] = per_guild.get(guild, 0) + 1

        return {
            "total_snapshots": total,
            "snapshots_by_guild": per_guild,
            "average_snapshot_size": 0,
            "oldest_snapshot": None,
            "newest_snapshot": None,
        }

    def cleanup_old_snapshots(self, max_age_days=7):
        """Delete ``.json`` files in the snapshot directory older than the limit.

        Age is measured from each file's modification time. Returns the
        number of files removed.
        """
        import os
        import time

        age_limit = max_age_days * 24 * 60 * 60
        now = time.time()
        directory = self.snapshot_manager.snapshot_dir

        removed = 0
        for entry in os.listdir(directory):
            if not entry.endswith('.json'):
                continue
            path = os.path.join(directory, entry)
            if now - os.path.getmtime(path) > age_limit:
                os.remove(path)
                removed += 1

        return removed

# Usage: report how many snapshots are stored
analytics = SnapshotAnalytics(snapshot_manager)
stats = analytics.analyze_snapshot_usage()
print(f"Total snapshots: {stats['total_snapshots']}")

# Remove snapshot files not modified in the last 30 days
cleaned = analytics.cleanup_old_snapshots(max_age_days=30)
print(f"Cleaned {cleaned} old snapshots")

# Best Practices

# 1. Snapshot Frequency

  • High-traffic guilds: Snapshot every 5-10 minutes
  • Low-traffic guilds: Snapshot every 30-60 minutes
  • Critical sessions: Snapshot before major operations

# 2. Storage Management

  • Retention policy: Keep 5-10 snapshots per guild
  • Cleanup automation: Remove snapshots older than 7-30 days
  • Compression: Consider compressing large snapshot files

# 3. Security Considerations

  • Encryption: Use encrypted snapshots for sensitive data
  • Access control: Restrict snapshot file access
  • Validation: Always validate snapshots before restoration

# 4. Performance Optimization

  • Background processing: Don't block main operations for snapshots
  • Incremental snapshots: Only save changed state
  • Compression: Use compression for large snapshots

# 5. Error Handling

async def safe_snapshot_restore(snapshot_file, player):
    """Safely restore snapshot with error handling.

    The current player state is backed up first. If the restored state does
    not pass verification, the backup is applied again and the attempt is
    reported as failed.

    Args:
        snapshot_file: Path of the snapshot file to restore.
        player: Player to restore the snapshot into.

    Returns:
        True when the snapshot was restored and verified, False on any
        failure (the reason is printed).
    """
    # Imported here so the function does not depend on module-level imports;
    # without them every call fell into the except branch with a NameError.
    import os
    import time

    try:
        # Validate file exists
        if not os.path.exists(snapshot_file):
            raise FileNotFoundError(f"Snapshot file not found: {snapshot_file}")

        # Load and validate snapshot
        snapshot = snapshot_manager.load_snapshot(snapshot_file)
        SnapshotValidator.validate_snapshot(snapshot)

        # Create backup of current state
        backup_snapshot = snapshot_manager.create_snapshot(player)
        backup_file = f"backup_{int(time.time())}.json"
        # Presumably save_snapshot returns the location it wrote to, which
        # may differ from the bare file name; fall back to the requested
        # name if it returns nothing.
        backup_path = (
            snapshot_manager.save_snapshot(backup_snapshot, backup_file)
            or backup_file
        )

        # Restore snapshot
        await snapshot_manager.restore_snapshot(snapshot, player)

        # Verify restoration
        if not await verify_restoration(player, snapshot):
            # Put the previous state back from the in-memory backup; the
            # backup file is kept on disk for inspection.
            await snapshot_manager.restore_snapshot(backup_snapshot, player)
            raise RuntimeError("Snapshot restoration verification failed")

        # Cleanup backup
        os.remove(backup_path)

        return True

    except Exception as e:
        # Boundary of the "safe" wrapper: report the failure and signal it
        # through the return value instead of propagating.
        print(f"Snapshot restoration failed: {e}")
        return False

async def verify_restoration(player, original_snapshot):
    """Check that the player matches the snapshot it was restored from.

    Compares the player's volume (a snapshot without one is treated as 100)
    and the number of upcoming tracks. Returns True only when both match.
    """
    # Volume must equal the stored value
    player_section = original_snapshot.data.get("player", {})
    if player.volume != player_section.get("volume", 100):
        return False

    # Queue must hold as many upcoming tracks as the snapshot recorded
    queue_section = original_snapshot.data.get("queue", {})
    return len(player.queue.upcoming) == len(queue_section.get("upcoming", []))

Session management in Sonora v1.2.7 lets your music bot keep its player, queue, filter, and autoplay state across restarts and deployments.