import streamlit as st
import time
import os
import json
import re
import threading
import numpy as np
import sounddevice as sd
from scipy.io.wavfile import write
import soundfile as sf
import pyaudio
import aubio
import requests
import psycopg2
import pytz
from datetime import datetime
from audio_analyzer import AudioAnalyzer
# Page config (the icon literal was mis-encoded; restored to the music-note emoji)
st.set_page_config(
    page_title="Shazam Sound",
    page_icon="🎵",
    layout="wide",
    initial_sidebar_state="expanded",
)
# --------------------------
# THEME SYSTEM (IMPROVED)
# --------------------------
# Seed theme / recording state on the first run only; values already stored in
# the session are left untouched on reruns.
for _state_key, _state_default in (
    ("theme", "light"),  # DEFAULT LIGHT THEME
    ("is_recording", False),
    ("continuous", False),
    ("status_message", ""),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
# -----------------------------------------------------------------------------------------
def apply_theme():
    """Inject the stylesheet for the theme stored in ``st.session_state.theme``.

    ``"dark"`` selects the dark palette; any other value selects the light one.
    Button colours and the warning/error/success box colours are the same in
    both themes. The CSS is emitted through ``st.markdown`` with
    ``unsafe_allow_html=True``.
    """
    palettes = {
        "dark": {
            "bg": "#000000",
            "text": "#ffffff",
            "input_bg": "#1a1a1a",
            "input_text": "#ffffff",
            "sidebar_bg": "#1a1a1a",
            "sidebar_text": "#ffffff",
            "info_bg": "#1a3a52",  # Darker blue for dark mode
            "info_text": "#ffffff",
        },
        "light": {
            "bg": "#ffffff",
            "text": "#000000",
            "input_bg": "#f1f1f1",
            "input_text": "#000000",
            "sidebar_bg": "#f0f2f6",
            "sidebar_text": "#000000",
            "info_bg": "#e8f4fd",  # Light blue for light mode
            "info_text": "#000000",
        },
    }
    colors = palettes["dark" if st.session_state.theme == "dark" else "light"]
    bg, text = colors["bg"], colors["text"]
    input_bg, input_text = colors["input_bg"], colors["input_text"]
    sidebar_bg, sidebar_text = colors["sidebar_bg"], colors["sidebar_text"]
    info_bg, info_text = colors["info_bg"], colors["info_text"]
    purple, purple_hover = "#4b0082", "#5c0aa6"

    css = f"""
<style>
/* App background */
.stApp {{
background: {bg} !important;
color: {text} !important;
}}
/* ALL general text */
h1, h2, h3, h4, h5, h6,
p, span, label, div, input, textarea {{
color: {text} !important;
}}
/* Sidebar styling */
section[data-testid="stSidebar"] {{
background-color: {sidebar_bg} !important;
}}
section[data-testid="stSidebar"] *,
section[data-testid="stSidebar"] p,
section[data-testid="stSidebar"] span,
section[data-testid="stSidebar"] h1,
section[data-testid="stSidebar"] h2,
section[data-testid="stSidebar"] h3 {{
color: {sidebar_text} !important;
}}
/* Fix streamlit inputs */
input[type="text"],
input[type="password"],
textarea {{
background-color: {input_bg} !important;
color: {input_text} !important;
border-radius: 6px !important;
}}
/* Purple buttons - FIXED TEXT COLOR */
div.stButton > button {{
background-color: {purple} !important;
color: white !important;
border-radius: 8px !important;
border: none !important;
font-weight: 500 !important;
}}
/* --- ADD THIS NEW RULE to force the inner text white --- */
div.stButton > button * {{
color: white !important;
}}
/* -------------------------------------------------------- */
div.stButton > button:hover {{
background-color: {purple_hover} !important;
}}
/* Disabled button styling */
div.stButton > button:disabled {{
background-color: #cccccc !important;
color: #666666 !important;
cursor: not-allowed !important;
opacity: 0.6 !important;
}}
/* === FIX INFO/WARNING/ERROR BOXES === */
/* Info - themed based on light/dark mode */
div[data-testid="stAlert"][style*="rgb(232, 244, 253)"],
div[data-testid="stAlert"]:has( svg[data-testid="stInfoIcon"] ) {{
background-color: {info_bg} !important;
color: {info_text} !important;
border-radius: 8px !important;
}}
div[data-testid="stAlert"]:has( svg[data-testid="stInfoIcon"] ) * {{
color: {info_text} !important;
}}
/* Warning */
div[data-testid="stAlert"]:has( svg[data-testid="stWarningIcon"] ) {{
background-color: #fff4c2 !important;
color: black !important;
border-radius: 8px !important;
}}
div[data-testid="stAlert"]:has( svg[data-testid="stWarningIcon"] ) * {{
color: black !important;
}}
/* Error */
div[data-testid="stAlert"]:has( svg[data-testid="stErrorIcon"] ) {{
background-color: #ffd4d4 !important;
color: black !important;
border-radius: 8px !important;
}}
div[data-testid="stAlert"]:has( svg[data-testid="stErrorIcon"] ) * {{
color: black !important;
}}
/* Success */
div[data-testid="stAlert"]:has( svg[data-testid="stSuccessIcon"] ) {{
background-color: #d9f7d6 !important;
color: black !important;
border-radius: 8px !important;
}}
div[data-testid="stAlert"]:has( svg[data-testid="stSuccessIcon"] ) * {{
color: black !important;
}}
/* Tabs text color */
.stTabs [data-baseweb="tab"] {{
color: {text} !important;
}}
</style>
"""
    st.markdown(css, unsafe_allow_html=True)
apply_theme()
# --------------------------
# END THEME SYSTEM
# --------------------------

# Status slot used by continuous-recording mode.
# Streamlit elements have to be created again on every script run; a placeholder
# cached from the first run refers to a position in that run's layout and does
# not line up with the elements of later runs. Re-create it on each run and
# keep publishing it under the same session-state key.
st.session_state.continuous_status_placeholder = st.empty()

# Show loading indicator at the top during initialization (first run only)
if 'initialized' not in st.session_state:
    with st.spinner('🎵 Initializing Music Recognition Service...'):
        time.sleep(0.5)
    st.session_state.initialized = True

# Initialize authentication / counter state on the first run only
for _state_key, _state_default in (
    ('authenticated', False),
    ('current_user', None),
    ('recording', False),
    ('count', 0),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
# Load configuration
@st.cache_resource
def load_config():
    """Load ``config.json`` from the directory that contains this script.

    The function is wrapped in ``st.cache_resource``, so callers share the
    cached result instead of re-reading the file on every call.

    Returns:
        The parsed JSON document.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    with st.spinner('Loading configuration...'):
        config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json')
        # JSON text is UTF-8; do not depend on the platform's default encoding.
        with open(config_path, 'r', encoding='utf-8') as f:
            return json.load(f)
# Read the settings once at start-up; without them the app cannot continue,
# so the error is shown and the script run is halted.
try:
    settings = load_config()
except Exception as config_error:
    st.error(f"Failed to load config.json: {str(config_error)}")
    st.stop()
# Utility Functions
def validate_email(email):
    """Return True when ``email`` has the basic ``local@domain.tld`` shape.

    The local part may contain letters, digits and ``._%+-``; the domain may
    contain letters, digits, dots and hyphens and must end in a dot followed by
    at least two letters.

    The whole string has to match: with ``re.match`` the trailing ``$`` also
    matches just before a final newline, so ``"a@b.com\\n"`` used to be accepted.
    """
    pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    return re.fullmatch(pattern, email) is not None
def get_db_connection():
    """Open a new PostgreSQL connection from the ``postgres`` section of ``settings``.

    The connection is returned open; this function never closes it.
    """
    pg = settings["postgres"]
    return psycopg2.connect(
        host=pg["host"],
        database=pg["database"],
        user=pg["user"],
        password=pg["password"],
        port=pg["port"],
    )
def check_user_exists(conn, email):
    """Tell whether the ``users`` table contains a row with this ``email``.

    Args:
        conn: open database connection whose ``cursor()`` works as a context manager.
        email: address to look up (passed as a bound query parameter).

    Returns:
        True if a matching row was fetched, False otherwise.
    """
    query = "SELECT id FROM users WHERE email = %s"
    with conn.cursor() as cur:
        cur.execute(query, (email,))
        row = cur.fetchone()
    return row is not None
def login_user(conn, email, password):
    """Look up the user matching ``email`` and ``password``.

    The password comparison is done in SQL by re-hashing the candidate with
    ``crypt()`` against the stored hash.

    Returns:
        The ``(id, name, last_sequence_number)`` row, or None when no row matches.
    """
    query = (
        "SELECT id, name, last_sequence_number FROM users WHERE email = %s AND password = crypt(%s, password)"
    )
    with conn.cursor() as cur:
        cur.execute(query, (email, password))
        user_row = cur.fetchone()
    return user_row
def register_user(conn, name, email, password):
    """Insert a new ``users`` row and commit it.

    The password is hashed in SQL with ``crypt()`` and a ``gen_salt('bf')`` salt.

    Returns:
        The ``id`` of the inserted row.
    """
    insert_sql = (
        "INSERT INTO users (name, email, password) VALUES (%s, %s, crypt(%s, gen_salt('bf'))) RETURNING id"
    )
    with conn.cursor() as cur:
        cur.execute(insert_sql, (name, email, password))
        new_user_id = cur.fetchone()[0]
    conn.commit()
    return new_user_id
def extract_audio_features(file_path, output_dir=None, sr=16000, model_type='htdemucs'):
    """Run ``AudioAnalyzer.analyze_all()`` on ``file_path``.

    Args:
        file_path: audio file to analyse.
        output_dir: directory handed to the analyzer; ``"results"`` when None.
            It is created if missing.
        sr: sample rate forwarded to the analyzer.
        model_type: model name forwarded to the analyzer.

    Returns:
        Whatever ``analyze_all()`` returns, or None if anything raised (the
        error is reported with ``st.error``).
    """
    try:
        target_dir = "results" if output_dir is None else output_dir
        os.makedirs(target_dir, exist_ok=True)
        return AudioAnalyzer(
            model_type=model_type,
            file_path=file_path,
            output_dir=target_dir,
            sr=sr,
        ).analyze_all()
    except Exception as exc:
        st.error(f"Error during audio analysis: {exc}")
        return None
def trim_audio(input_file, output_file, duration=12):
    """Write the first ``duration`` seconds of ``input_file`` to ``output_file``.

    Returns:
        True on success; False if reading or writing raised (the error is
        reported with ``st.error``).
    """
    try:
        samples, rate = sf.read(input_file)
        frame_limit = int(duration * rate)
        # Slicing the first axis keeps every channel, so mono and
        # multi-channel data take the same path.
        sf.write(output_file, samples[:frame_limit], rate)
    except Exception as exc:
        st.error(f"Audio trimming error: {exc}")
        return False
    return True
def convert_numpy_to_python(value):
    """Recursively replace numpy values with built-in Python equivalents.

    - numpy integer / floating / bool scalars become ``int`` / ``float`` / ``bool``;
    - ``np.ndarray`` becomes a (nested) list of Python scalars. Arrays used to
      be returned unchanged, so an array nested in the feature dict reached the
      caller unconverted;
    - lists and tuples become lists with converted items;
    - dicts keep their keys and get converted values;
    - anything else is returned as is.
    """
    if isinstance(value, (np.integer, np.floating, np.bool_)):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, (list, tuple)):
        return [convert_numpy_to_python(item) for item in value]
    if isinstance(value, dict):
        return {k: convert_numpy_to_python(v) for k, v in value.items()}
    return value
def write_to_database(song_data, count, audio_features, current_user, session_date, session_time):
    """Store one recognized song and link it to the current user.

    In a single transaction this inserts a ``sound`` row, inserts or refreshes
    the ``user_sounds`` link (pointing at the lowest-id ``sound`` row whose
    title matches), appends a ``user_sound_sequence`` row and updates
    ``users.last_sequence_number``.

    Args:
        song_data: sequence ``[title, album, artist, label, release_date, duration]``.
        count: sequence number stored with this detection and written to
            ``users.last_sequence_number``.
        audio_features: nested dict of analysis results; the sections read here
            are ``low_level_features``, ``vocals``, ``bass``, ``drums``,
            ``harmonic`` and ``mix``.
        current_user: user row whose first element is the user id.
        session_date: value for the ``date`` column.
        session_time: value for the ``time`` column.

    Returns:
        True when everything was committed; False when Postgres is disabled in
        ``settings`` or any step raised (the error is shown with ``st.error``).
    """
    if not settings["postgres"]["enabled"]:
        return False

    low_level_keys = (
        'bpm', 'beat_strength', 'rhythm_stability', 'onset_rate', 'pulse_clarity', 'danceability',
        'rms_energy', 'dynamic_range', 'loudness', 'spectral_centroid', 'spectral_flatness',
        'spectral_rolloff', 'spectral_flux', 'spectral_spread', 'spectral_contrast',
        'harmonic_ratio', 'inharmonicity', 'pitch_mean', 'key', 'mode', 'brightness', 'warmth',
        'roughness', 'zero_crossing_rate', 'crest_factor', 'transient_density', 'dissonance',
    )
    vocal_keys = (
        'vocal_presence_percentage', 'vocal_energy_ratio', 'min_pitch_hz', 'max_pitch_hz',
        'avg_pitch_hz', 'vocal_range_semitones', 'pitch_variability',
    )
    bass_keys = (
        'bass_prominence_percentage', 'sub_bass_presence_percentage', 'bass_timbre_type',
        'note_density', 'pattern_variation',
    )
    mix_keys = ('frequency_balance_score', 'stereo_correlation', 'stereo_width', 'dynamic_range_db')

    def as_csv(values):
        # List-valued features are stored as comma-separated text.
        return ",".join(map(str, values))

    conn = None
    try:
        user_id = current_user[0]
        title = song_data[0]

        # Keep only the leading YYYY-MM-DD part; empty or shorter values become NULL.
        release_date = song_data[4]
        release_date = release_date[:10] if release_date and len(release_date) >= 10 else None

        # One conversion pass covers every section, including low_level_features.
        audio_features = convert_numpy_to_python(audio_features)
        low_level_features = audio_features.get('low_level_features', {})
        vocals = audio_features.get('vocals', {})
        bass = audio_features.get('bass', {})
        drums = audio_features.get('drums', {})
        harmonic = audio_features.get('harmonic', {})
        mix = audio_features.get('mix', {})

        insert_query = """
            INSERT INTO sound (
                date, time, title, album, artist, label, release_date, duration,
                bpm, beat_strength, rhythm_stability, onset_rate, pulse_clarity, danceability,
                rms_energy, dynamic_range, loudness, spectral_centroid, spectral_flatness,
                spectral_rolloff, spectral_flux, spectral_spread, spectral_contrast,
                harmonic_ratio, inharmonicity, pitch_mean, key, mode, brightness, warmth,
                roughness, zero_crossing_rate, crest_factor, transient_density, dissonance,
                mfcc, segment_boundaries, structural_novelty, vocal_presence_percentage,
                vocal_energy_ratio, min_pitch_hz, max_pitch_hz, avg_pitch_hz, vocal_range_semitones,
                pitch_variability, bass_prominence_percentage, sub_bass_presence_percentage,
                bass_timbre_type, note_density, pattern_variation, drum_pattern_analysis,
                percussion_diversity, harmonic_rhythm, hook_repetition_offsets, frequency_balance_score,
                stereo_correlation, stereo_width, dynamic_range_db
            ) VALUES (
                %s, %s, %s, %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s,
                %s, %s, %s, %s,
                %s, %s, %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s,
                %s, %s, %s
            ) RETURNING id
        """
        # Values in the same order as the column list above (58 in total).
        data_to_insert = (
            session_date, session_time, song_data[0], song_data[1], song_data[2], song_data[3],
            release_date, song_data[5],
            *[low_level_features.get(key) for key in low_level_keys],
            as_csv(low_level_features.get('mfcc', [])),
            as_csv(low_level_features.get('segment_boundaries', [])),
            as_csv(low_level_features.get('structural_novelty', [])),
            *[vocals.get(key) for key in vocal_keys],
            *[bass.get(key) for key in bass_keys],
            drums.get('tempo'),                         # -> drum_pattern_analysis
            drums.get('spectral_variation'),            # -> percussion_diversity
            harmonic.get('chord_changes_per_minute'),   # -> harmonic_rhythm
            as_csv(harmonic.get('hook_repetition_offsets', [])),
            *[mix.get(key) for key in mix_keys],
        )

        conn = get_db_connection()
        with conn.cursor() as cursor:
            cursor.execute(insert_query, data_to_insert)
            new_id = cursor.fetchone()[0]

            # Link the user to the earliest sound row carrying this title.
            # NOTE(review): ILIKE treats % and _ in the title as wildcards -
            # confirm whether a plain case-insensitive equality is intended.
            cursor.execute("SELECT id FROM sound WHERE title ILIKE %s ORDER BY id ASC LIMIT 1", (title,))
            first_song = cursor.fetchone()
            if first_song:
                first_song_id = first_song[0]
                cursor.execute("SELECT 1 FROM user_sounds WHERE user_id = %s AND sound_id = %s",
                               (user_id, first_song_id))
                if cursor.fetchone() is None:
                    cursor.execute("INSERT INTO user_sounds (user_id, sound_id) VALUES (%s, %s)",
                                   (user_id, first_song_id))
                else:
                    cursor.execute("""UPDATE user_sounds SET interaction_time = now() AT TIME ZONE 'US/Central'
                                      WHERE user_id = %s AND sound_id = %s""", (user_id, first_song_id))

            # Record this detection in the user's listening sequence.
            cursor.execute("""INSERT INTO user_sound_sequence (user_id, sound_id, sequence_number, title)
                              VALUES (%s, %s, %s, %s)""", (user_id, new_id, count, title))
            cursor.execute("UPDATE users SET last_sequence_number = %s WHERE id = %s", (count, user_id))

        conn.commit()
        return True
    except Exception as e:
        # Nothing was committed; the connection is closed below.
        st.error(f"Database error: {str(e)}")
        return False
    finally:
        if conn is not None:
            conn.close()
def _central_timestamp():
    """Return the current US/Central time as ``("YYYY-MM-DD", "HH:MM:SS")`` strings."""
    now = datetime.now(pytz.UTC).astimezone(pytz.timezone("US/Central"))
    return now.strftime("%Y-%m-%d"), now.strftime("%H:%M:%S")


def _track_duration_seconds(track):
    """Return the track length in seconds from the Apple Music, else Spotify, metadata (0 if absent)."""
    # The sections can be present but null in the API response, hence ``or {}``.
    apple = track.get("apple_music") or {}
    spotify = track.get("spotify") or {}
    return apple.get("durationInMillis", spotify.get("duration_ms", 0)) / 1000


def record_and_recognize(num, current_user, count):
    """Record one clip, identify it with Audd.io and optionally store the result.

    Steps: open the microphone, record ``settings["TPR"]`` seconds while a
    helper thread watches the input level, save the clip, and - if the clip is
    louder than ``settings["Sensitivity"]`` - trim it, send it to Audd.io,
    extract audio features and write everything to the database.

    Args:
        num: suffix for the ``output{num}.wav`` / ``trimmed_output{num}.wav`` files.
        current_user: user row handed to ``write_to_database``.
        count: sequence number to store for this detection.

    Returns:
        ``count + 1`` when the song was recognized and saved to the database,
        otherwise ``count`` unchanged. Errors are shown in the UI, not raised.
    """
    st.session_state.is_recording = True
    output_dir = "output"
    os.makedirs(output_dir, exist_ok=True)
    output_file = os.path.join(output_dir, f"output{num}.wav")

    # Placeholders for status updates
    status_placeholder = st.empty()
    progress_bar = st.progress(0)
    pA = pyaudio.PyAudio()
    mic = None
    detected_flag = threading.Event()
    session_date = None
    session_time = None

    try:
        sample_rate = settings["Microphone HZ"]
        sensitivity = settings["Sensitivity"]

        status_placeholder.info("🎤 Opening microphone...")
        progress_bar.progress(10)
        mic = pA.open(
            format=pyaudio.paFloat32,
            channels=1,
            rate=sample_rate,
            input=True,
            frames_per_buffer=1024,
        )
        status_placeholder.info("🎧 Listening for song start... (Make sure audio is playing)")
        progress_bar.progress(20)

        # Start recording (non-blocking; completed by sd.wait() below)
        recording = sd.rec(
            int(settings["TPR"] * sample_rate),
            samplerate=sample_rate,
            channels=2,
        )
        # Used when the level detector never fires, so the stored date/time is never NULL.
        fallback_date, fallback_time = _central_timestamp()

        def detect_volume():
            """Poll the microphone until the level exceeds the threshold or the flag is set."""
            nonlocal session_date, session_time
            check_count = 0
            warned = False
            while not detected_flag.is_set():
                try:
                    buffer = mic.read(1024, exception_on_overflow=False)
                    sample = np.frombuffer(buffer, dtype=aubio.float_type)
                    if len(sample) == 0:
                        continue
                    volume = np.sum(sample ** 2) / len(sample)
                    scaled_volume = float("{:6f}".format(volume)) * 1_000_000
                    check_count += 1
                    if check_count % 50 == 0:  # Update every 50 checks
                        status_placeholder.info(
                            f"🎧 Listening... Current volume: {int(scaled_volume)} (threshold: {sensitivity})")
                    if scaled_volume > sensitivity:
                        session_date, session_time = _central_timestamp()
                        status_placeholder.success(
                            f"✅ Detected music at {session_date} {session_time}! (Volume: {int(scaled_volume)})")
                        detected_flag.set()
                except Exception as e:
                    if detected_flag.is_set():
                        break  # recording is over; the stream may already be closing
                    if not warned:
                        st.warning(f"Detection error: {str(e)}")
                        warned = True
                    detected_flag.wait(0.1)  # back off instead of spinning on a failing stream

        detect_thread = threading.Thread(target=detect_volume, daemon=True)
        # A plain thread has no Streamlit script-run context, so its st.* calls
        # are dropped; attach the current context so the live status renders.
        try:
            from streamlit.runtime.scriptrunner import add_script_run_ctx
            add_script_run_ctx(detect_thread)
        except ImportError:
            pass  # detection still works; only the live status text is lost
        detect_thread.start()

        progress_bar.progress(30)
        status_placeholder.info("🔊 Recording audio...")
        sd.wait()
        detected_flag.set()
        detect_thread.join(timeout=2)
        if session_date is None:
            session_date, session_time = fallback_date, fallback_time

        progress_bar.progress(50)
        status_placeholder.info("💾 Saving recording...")
        write(output_file, sample_rate, recording)

        # Check recorded volume (same x1,000,000 scaling as the live detector)
        recorded_volume = np.sum(recording.astype(np.float32) ** 2) / len(recording)
        avg_volume = round(recorded_volume * 1_000_000)
        progress_bar.progress(60)
        status_placeholder.info(f"📊 Recorded average volume: {avg_volume} (threshold: {sensitivity})")

        if avg_volume > sensitivity:
            # Trim audio
            progress_bar.progress(70)
            status_placeholder.info("✂️ Trimming audio...")
            trimmed_dir = "trimmed_output"
            os.makedirs(trimmed_dir, exist_ok=True)
            trimmed_file = os.path.join(trimmed_dir, f"trimmed_output{num}.wav")
            if not trim_audio(output_file, trimmed_file, settings["RecognitionDuration"]):
                status_placeholder.error("❌ Failed to trim audio")
                return count  # cleanup happens in ``finally``

            # Recognize song
            progress_bar.progress(80)
            status_placeholder.info("🔍 Recognizing song with Audd.io API...")
            with open(trimmed_file, "rb") as f:
                response = requests.post(
                    "https://api.audd.io/",
                    data={
                        "api_token": settings["audd"]["api_token"],
                        "return": "apple_music,spotify",
                    },
                    files={"file": f},
                    timeout=30,
                )
            result = response.json()

            if result.get("status") == "success" and result.get("result"):
                progress_bar.progress(90)
                status_placeholder.info("🎵 Extracting audio features...")
                track = result["result"]
                song_data = [
                    track["title"],
                    track.get("album", ""),
                    track.get("artist", ""),
                    track.get("label", ""),
                    track.get("release_date", ""),
                    _track_duration_seconds(track),
                ]
                # Features come from the full recording, not the trimmed clip.
                audio_features = extract_audio_features(output_file)
                if settings["postgres"]["enabled"] and audio_features:
                    status_placeholder.info("💾 Saving to database...")
                    if write_to_database(song_data, count, audio_features, current_user,
                                         session_date, session_time):
                        progress_bar.progress(100)
                        # ``count`` is the sequence number just stored for this song.
                        status_placeholder.success(f"✅ **Song #{count}**: {song_data[0]} by {song_data[2]}")
                        st.balloons()
                        count += 1
                        time.sleep(3)
                    else:
                        progress_bar.empty()
                        status_placeholder.warning("⚠️ Song already exists or couldn't be saved")
                else:
                    progress_bar.progress(100)
                    status_placeholder.success(f"✅ Recognized: {song_data[0]} by {song_data[2]}")
            else:
                progress_bar.empty()
                error_msg = result.get("error", {}).get("error_message", "Unknown error")
                status_placeholder.error(f"❌ No music detected or API error: {error_msg}")
                st.info(
                    "**Troubleshooting tips:**\n- Ensure music is playing loudly\n- Check microphone permissions")
        else:
            progress_bar.empty()
            status_placeholder.warning(
                f"⚠️ Audio not loud enough. Volume: {avg_volume}, Required: {sensitivity}")
            st.info("**Tips:** Increase volume or adjust sensitivity setting in config.json")
    except Exception as e:
        progress_bar.empty()
        status_placeholder.error(f"❌ Error during recording: {str(e)}")
        st.exception(e)
    finally:
        # Stop the detector, clear the per-cycle widgets and release audio resources.
        detected_flag.set()
        progress_bar.empty()
        status_placeholder.empty()
        if mic:
            mic.close()
        pA.terminate()
        st.session_state.is_recording = False

    time.sleep(0.5)
    return count
# UI Components
def login_page():
    """Render the Login / Register tabs and sign the user in on success.

    On a successful login or registration the session state receives
    ``authenticated``, ``current_user`` (``(id, name, last_sequence_number)``)
    and ``count`` (the next sequence number), then the script is rerun.

    ``st.rerun()`` interrupts the script by raising, so each database
    connection is closed in a ``finally`` block; closing it after the rerun
    call would never execute.
    """
    st.title("🎵 Music Recognition Service")
    st.markdown("---")
    tab1, tab2 = st.tabs(["Login", "Register"])

    with tab1:
        st.subheader("Login")
        email = st.text_input("Email", key="login_email")
        password = st.text_input("Password", type="password", key="login_password")
        if st.button("Login", type="primary"):
            if not email or not password:
                st.error("Please enter both email and password")
            else:
                with st.spinner("🔐 Authenticating..."):
                    conn = None
                    try:
                        conn = get_db_connection()
                        if not check_user_exists(conn, email):
                            st.error("No account exists with this email")
                        else:
                            user = login_user(conn, email, password)
                            if user:
                                st.session_state.authenticated = True
                                st.session_state.current_user = user
                                # A NULL last_sequence_number counts as 0.
                                st.session_state.count = (user[2] or 0) + 1
                                st.success(f"Welcome back, {user[1]}!")
                                time.sleep(0.5)
                                st.rerun()
                            else:
                                st.error("Incorrect password")
                    except Exception as e:
                        st.error(f"Database connection error: {str(e)}")
                    finally:
                        if conn is not None:
                            conn.close()

    with tab2:
        st.subheader("Register")
        name = st.text_input("Name", key="register_name")
        email = st.text_input("Email", key="register_email")
        password = st.text_input("Password (min 8 characters)", type="password", key="register_password")
        if st.button("Register", type="primary"):
            if not name or not email or not password:
                st.error("Please fill all fields")
            elif not validate_email(email):
                st.error("Invalid email format")
            elif len(password) < 8:
                st.error("Password must be at least 8 characters")
            else:
                with st.spinner("📝 Creating your account..."):
                    conn = None
                    try:
                        conn = get_db_connection()
                        if check_user_exists(conn, email):
                            st.error("An account with this email already exists")
                        else:
                            user_id = register_user(conn, name, email, password)
                            if user_id:
                                st.success(f"Registration successful! Your account ID is: {user_id}")
                                st.session_state.authenticated = True
                                st.session_state.current_user = (user_id, name, 0)
                                st.session_state.count = 1
                                time.sleep(0.5)
                                st.rerun()
                            else:
                                st.error("Registration failed")
                    except Exception as e:
                        st.error(f"Database connection error: {str(e)}")
                    finally:
                        if conn is not None:
                            conn.close()
def _fetch_music_history(user_id):
    """Return the user's history rows ``(id, title, artist, album, date, time, liked)``, newest first."""
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                SELECT s.id, s.title, s.artist, s.album, s.date, s.time, us.liked
                FROM user_sounds us
                JOIN sound s ON us.sound_id = s.id
                WHERE us.user_id = %s
                ORDER BY s.id DESC
            """, (user_id,))
            return cursor.fetchall()
    finally:
        conn.close()


def _save_song_feedback(user_id, sound_id, liked):
    """Store the like/dislike flag for one ``user_sounds`` row and commit it."""
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                UPDATE user_sounds
                SET liked = %s, interaction_time = now() AT TIME ZONE 'US/Central'
                WHERE user_id = %s AND sound_id = %s
            """, (liked, user_id, sound_id))
        conn.commit()
    finally:
        conn.close()


def _render_record_tab():
    """Render the start/stop toggle and run one recognition cycle per script run while active."""
    listening_msg = "🔴 **CONTINUOUS RECORDING ACTIVE** — Now listening for music..."
    status = st.session_state.continuous_status_placeholder

    st.header("Record New Music")
    if st.session_state.continuous:
        button_text, button_type = "⏹️ Stop Recording", "secondary"
    else:
        button_text, button_type = "🎤 Start Recording", "primary"

    # Disable the button if a recording is actively running to prevent issues
    if st.button(button_text, type=button_type, use_container_width=True,
                 disabled=st.session_state.is_recording):
        st.session_state.continuous = not st.session_state.continuous
        if not st.session_state.continuous:
            # When stopping, clear any existing persistent message.
            status.empty()
            st.session_state.is_recording = False
        # Rerun to update the button and kick off the next cycle (if starting)
        st.rerun()
    st.markdown("---")

    # Show instructions only when idle and not in continuous mode
    if not st.session_state.is_recording and not st.session_state.continuous:
        st.info("""
        **How to use:**
        1. Click the 'Start Recording' button above.
        2. Play music from any source.
        3. The system will detect, recognize, and save the song.
        4. **The process will automatically restart after each successful detection.**
        """)
        st.markdown("---")

    if not st.session_state.continuous:
        # Clear the placeholder if continuous mode is stopped
        status.empty()
        return

    status.info(listening_msg)
    if st.session_state.is_recording:
        # The script was rerun while a cycle is still marked as running.
        status.info("🔴 **CONTINUOUS RECORDING ACTIVE** — Recognition in progress...")
        return

    try:
        # Blocks for one full cycle; shows and clears its own progress widgets.
        st.session_state.count = record_and_recognize(
            st.session_state.count,
            st.session_state.current_user,
            st.session_state.count,
        )
        if st.session_state.continuous:
            status.success("✅ Recognition cycle complete. **Restarting recording** in a moment...")
            time.sleep(1)  # leave the message visible briefly
            status.info(listening_msg)
            st.rerun()  # the rerun is what starts the next cycle
    except Exception as e:
        # Do not rerun after an error; the user stops/restarts manually.
        status.error(f"An error occurred: {e}")
        st.exception(e)


def _render_history_tab():
    """Render the signed-in user's recognized songs with per-song like/dislike controls."""
    st.header("Your Music History")
    with st.spinner("📚 Loading your music history..."):
        try:
            user_id = st.session_state.current_user[0]
            rows = _fetch_music_history(user_id)
            if not rows:
                st.info("No music recorded yet. Start recording to build your music history!")
                return
            st.success(f"Found {len(rows)} song(s) in your history")
            for sid, title, artist, album, date, recorded_time, liked in rows:
                with st.container():
                    col1, col2, col3 = st.columns([3, 1, 1])
                    with col1:
                        st.write(f"**{title}**")
                        st.caption(f"{artist} - {album}")
                        st.caption(f"Recorded: {date} at {recorded_time}")
                    with col2:
                        # None means the song has not been rated yet.
                        liked_display = "❓" if liked is None else ("👍" if liked else "👎")
                        st.write(liked_display)
                    with col3:
                        if st.button("Rate", key=f"rate_{sid}"):
                            st.session_state[f"rating_{sid}"] = True
                    if st.session_state.get(f"rating_{sid}", False):
                        feedback = st.radio(
                            "Do you like this song?",
                            ["👍 Like", "👎 Dislike"],
                            key=f"feedback_{sid}",
                            horizontal=True,
                        )
                        if st.button("Submit", key=f"submit_{sid}"):
                            with st.spinner("💾 Updating feedback..."):
                                _save_song_feedback(user_id, sid, feedback == "👍 Like")
                                st.success("Feedback updated!")
                                st.session_state[f"rating_{sid}"] = False
                                time.sleep(0.5)
                                st.rerun()
                    st.markdown("---")
        except Exception as e:
            st.error(f"Error loading music history: {str(e)}")
            st.exception(e)


def main_page():
    """Main application page: sidebar, Record / History tabs and the theme toggle."""
    st.title("🎵 Music Recognition Service")

    # Sidebar
    with st.sidebar:
        st.header(f"👤 {st.session_state.current_user[1]}")
        # ``count`` holds the next sequence number, hence the -1.
        st.write(f"Songs recorded: {st.session_state.count - 1}")
        if st.button("Logout", type="secondary"):
            st.session_state.authenticated = False
            st.session_state.current_user = None
            st.session_state.count = 0
            st.rerun()

    # Main tabs
    tab1, tab2 = st.tabs(["🎙️ Record Music", "📚 Music History"])
    with tab1:
        _render_record_tab()
    with tab2:
        _render_history_tab()

    # ---------------------------------------
    # FINAL THEME TOGGLE BUTTON
    # ---------------------------------------
    st.markdown("---")
    if st.button("🌓 Toggle Light / Dark Mode", use_container_width=True):
        st.session_state.theme = "dark" if st.session_state.theme == "light" else "light"
        st.rerun()
# Main App Logic: signed-in users get the main page, everyone else the login page.
if st.session_state.authenticated:
    main_page()
else:
    login_page()