import os
import logging
from flask import Flask, render_template, jsonify, request, Response
from flask_cors import CORS
from siriusxm import SiriusXM
import configparser
import json
# Configure logging
logging.basicConfig(level=logging.DEBUG)
# Create Flask app
app = Flask(__name__)
app.secret_key = os.environ.get("SESSION_SECRET", "your-secret-key-here")
# Enable CORS for all routes
CORS(app)
# Global SiriusXM instance
sxm = None
def initialize_siriusxm():
"""Initialize SiriusXM instance with credentials from config or environment"""
global sxm
# Try to get credentials from environment variables first
username = os.environ.get("SIRIUSXM_USERNAME")
password = os.environ.get("SIRIUSXM_PASSWORD")
# If not in environment, try config file
if not username or not password:
try:
config = configparser.ConfigParser()
if config.read('config.ini'): # Check if file was actually read
if 'siriusxm' in config:
username = config['siriusxm']['username']
password = config['siriusxm']['password']
else:
app.logger.error("Config file exists but missing [siriusxm] section")
return False
else:
app.logger.error("No config.ini file found")
return False
except Exception as e:
app.logger.error(f"Failed to load config: {e}")
return False
if username and password:
sxm = SiriusXM(username, password)
return True
else:
app.logger.error("No SiriusXM credentials found")
return False
@app.route('/')
def index():
"""Serve the main web interface"""
return render_template('index.html')
@app.route('/api/channels')
def get_channels():
"""Get list of all available channels"""
global sxm
if not sxm:
if not initialize_siriusxm():
return jsonify({'error': 'SiriusXM not initialized'}), 500
try:
channels = sxm.get_channels()
if channels:
return jsonify({'channels': channels})
else:
return jsonify({'error': 'Failed to fetch channels'}), 500
except Exception as e:
app.logger.error(f"Error fetching channels: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/metadata/<channel_id>')
def get_metadata(channel_id):
"""Get current metadata for a specific channel"""
global sxm
if not sxm:
if not initialize_siriusxm():
return jsonify({'error': 'SiriusXM not initialized'}), 500
try:
metadata = sxm.get_metadata(channel_id)
if metadata:
return jsonify(metadata)
else:
# Return empty metadata instead of error to prevent UI issues
return jsonify({
'song': {
'name': 'Loading...',
'artistName': '',
'images': {}
},
'channel': {
'name': 'Unknown Channel',
'genre': ''
}
})
except Exception as e:
app.logger.error(f"Error fetching metadata for channel {channel_id}: {e}")
# Return empty metadata instead of error
return jsonify({
'song': {
'name': 'Loading...',
'artistName': '',
'images': {}
},
'channel': {
'name': 'Unknown Channel',
'genre': ''
}
})
@app.route('/channels.m3u8')
def get_playlist():
"""Generate M3U8 playlist for all channels"""
global sxm
if not sxm:
if not initialize_siriusxm():
return Response("Error: SiriusXM not initialized", status=500)
try:
playlist = sxm.get_playlist()
if playlist:
response = Response(playlist, mimetype='application/x-mpegURL')
response.headers['Access-Control-Allow-Origin'] = '*'
return response
else:
return Response("Error: Failed to generate playlist", status=500)
except Exception as e:
app.logger.error(f"Error generating playlist: {e}")
return Response(f"Error: {str(e)}", status=500)
@app.route('/listen/<channel_id>')
def get_channel_stream(channel_id):
"""Get M3U8 stream for a specific channel"""
global sxm
if not sxm:
if not initialize_siriusxm():
return Response("Error: SiriusXM not initialized", status=500)
try:
stream_data = sxm.get_channel(channel_id)
if stream_data:
response = Response(stream_data, mimetype='application/x-mpegURL')
response.headers['Access-Control-Allow-Origin'] = '*'
return response
else:
return Response("Error: Failed to get channel stream", status=500)
except Exception as e:
app.logger.error(f"Error getting stream for channel {channel_id}: {e}")
return Response(f"Error: {str(e)}", status=500)
@app.route('/listen/<channel_id>/<path:segment_path>')
def get_segment_for_listen(channel_id, segment_path):
"""Get audio segment for a channel via /listen route"""
global sxm
if not sxm:
if not initialize_siriusxm():
return Response("Error: SiriusXM not initialized", status=500)
try:
# Extract just the filename from the full segment path
segment_filename = segment_path.split('/')[-1]
app.logger.debug(f"Listen route - Requesting segment: {segment_path} -> {segment_filename} for channel: {channel_id}")
segment_data = sxm.get_segment(channel_id, segment_filename)
if segment_data:
response = Response(segment_data, mimetype='audio/aac')
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Cache-Control'] = 'no-cache'
return response
else:
return Response("Error: Failed to get segment", status=404)
except Exception as e:
app.logger.error(f"Error getting segment {segment_path} for channel {channel_id}: {e}")
return Response(f"Error: {str(e)}", status=500)
@app.route('/<channel_id>/<path:segment_path>')
def get_segment(channel_id, segment_path):
"""Get audio segment for a channel (legacy route)"""
global sxm
if not sxm:
if not initialize_siriusxm():
return Response("Error: SiriusXM not initialized", status=500)
try:
# Extract just the filename from the full segment path
segment_filename = segment_path.split('/')[-1]
app.logger.debug(f"Legacy route - Requesting segment: {segment_path} -> {segment_filename} for channel: {channel_id}")
segment_data = sxm.get_segment(channel_id, segment_filename)
if segment_data:
response = Response(segment_data, mimetype='audio/aac')
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Cache-Control'] = 'no-cache'
return response
else:
return Response("Error: Failed to get segment", status=404)
except Exception as e:
app.logger.error(f"Error getting segment {segment_path} for channel {channel_id}: {e}")
return Response(f"Error: {str(e)}", status=500)
@app.route('/key/<key_id>')
def get_aes_key(key_id):
"""Get AES decryption key"""
global sxm
if not sxm:
if not initialize_siriusxm():
return Response("Error: SiriusXM not initialized", status=500)
try:
key_data = sxm.getAESkey(key_id)
if key_data:
response = Response(key_data, mimetype='application/octet-stream')
response.headers['Access-Control-Allow-Origin'] = '*'
return response
else:
return Response("Error: Failed to get AES key", status=500)
except Exception as e:
app.logger.error(f"Error getting AES key {key_id}: {e}")
return Response(f"Error: {str(e)}", status=500)
@app.errorhandler(404)
def not_found(error):
return jsonify({'error': 'Not found'}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({'error': 'Internal server error'}), 500
if __name__ == '__main__':
# Initialize SiriusXM on startup
if initialize_siriusxm():
app.logger.info("SiriusXM initialized successfully")
else:
app.logger.warning("Failed to initialize SiriusXM - will retry on first request")
# Run the Flask app
app.run(host='0.0.0.0', port=5000, debug=True)