from shiny import App, Inputs, Outputs, Session, reactive, ui, render
import pandas as pd
import os
import json
import random
from collections import Counter
from pathlib import Path
import uuid
import datetime
import re
import shiny.experimental as x
# ============== Configuration and Global Variables ==============
# Video URL mapping
# Maps a video file name to its shareable URL. get_judgments() looks entries
# up with the key "<clip_name>.mp4".
VIDEO_NAME_TO_DRIVE_URL = {
    "0d86ddd51beff56fe35d01930cf345550f0f736e0104d18ff61ece6f310f0181.mp4": "https://drive.google.com/file/d/10QPW6eiYl1qRNCjCOTSLAJ8qMfol1_xO/view?usp=sharing",
    # ... rest of your video URL mappings ...
}
# Global variables to track assignments
# Counter keyed by (model_name, clip_name) tuples; incremented whenever a
# combination is handed out and seeded from the feedback CSV files.
model_clip_assignments = Counter()
# Sorted model names discovered by initialize_pools().
model_pool = []
# Sorted clip directory names discovered by initialize_pools().
clip_pool = []
# List of (model_name, clip_name) tuples for which a judgment file exists.
valid_combinations = []
# NOTE(review): not referenced anywhere in the visible part of this file.
user_responses = {}
# ============== Helper Functions ==============
def initialize_pools():
    """Discover the available models/clips and load completed-evaluation counts.

    Scans the judgment directory tree for ``<clip>/REF_human/TAR_<model>.json``
    files and rebuilds the module-level ``model_pool``, ``clip_pool`` and
    ``valid_combinations``. It then reads every CSV in ``feedback/`` and makes
    sure ``model_clip_assignments`` is at least as large as the number of
    evaluations recorded on disk for each (model, clip) pair.

    The function is safe to call repeatedly: on-disk counts are merged with
    ``max`` rather than added, so calling it once per session does not inflate
    the counters. One evaluation is counted per distinct
    (model, clip, user_id) rather than per CSV row, so the unit matches the
    "+1 per assignment" bookkeeping done by ``get_random_assignment``.

    Returns:
        tuple: ``(model_pool, clip_pool)`` - the sorted model names and the
        sorted clip names.
    """
    global model_pool, clip_pool, valid_combinations, model_clip_assignments

    root_dir = Path("assets/evals/model_caption_strategy_base_frames16/judge_model_gpt-4o")
    prefix, suffix = "TAR_", ".json"
    models = set()
    clips = set()
    combinations = []
    # Sorted traversal keeps valid_combinations in a stable order across runs.
    for clip_dir in sorted(root_dir.glob("*")):
        if not clip_dir.is_dir():
            continue
        clip_name = clip_dir.name
        clips.add(clip_name)
        for json_file in sorted(clip_dir.glob("REF_human/TAR_*.json")):
            # Strip only the leading "TAR_" and the trailing ".json" so that a
            # model name containing either substring survives the round trip
            # back to the file name built in get_judgments().
            model_name = json_file.name[len(prefix):-len(suffix)]
            models.add(model_name)
            combinations.append((model_name, clip_name))
    model_pool = sorted(models)
    clip_pool = sorted(clips)
    valid_combinations = combinations

    # Collect the distinct evaluations recorded in the feedback files.
    os.makedirs("feedback", exist_ok=True)
    evaluations = set()
    for file_path in sorted(Path("feedback").glob("*.csv")):
        try:
            # dtype=str: names must stay strings to match valid_combinations
            # (type inference could turn a numeric-looking name into a number).
            df = pd.read_csv(file_path, dtype=str)
            if "model_name" not in df.columns or "clip_name" not in df.columns:
                continue
            df = df.dropna(subset=["model_name", "clip_name"])
            if "user_id" in df.columns:
                users = df["user_id"].fillna("")
            else:
                # Without a user column the file counts once per pair.
                users = [""] * len(df)
            evaluations.update(zip(df["model_name"], df["clip_name"], users))
        except Exception as e:
            # Deliberately best-effort: one unreadable file must not prevent
            # the application from starting.
            print(f"Error loading feedback file {file_path}: {e}")

    completed = Counter((model, clip) for model, clip, _ in evaluations)
    for combo, count in completed.items():
        # Merge instead of add so repeated initialisation never double counts,
        # while assignments already handed out in memory are preserved.
        if count > model_clip_assignments[combo]:
            model_clip_assignments[combo] = count

    print(f"Initialized pools with {len(model_pool)} models and {len(clip_pool)} clips")
    print(f"Valid combinations: {len(valid_combinations)}")
    print(f"Loaded {sum(model_clip_assignments.values())} existing assignments")
    return model_pool, clip_pool
def get_random_assignment():
    """Pick the next (model_name, clip_name) pair to evaluate.

    The choice is load-balanced rather than purely uniform: only the
    combinations with the fewest recorded assignments are candidates, and one
    of those is chosen at random. The chosen pair's counter in
    ``model_clip_assignments`` is incremented before returning.

    The pools are initialised lazily if they are still empty.

    Returns:
        tuple: the chosen ``(model_name, clip_name)`` pair.

    Raises:
        ValueError: if no valid combination exists even after initialisation.
    """
    if not valid_combinations:
        initialize_pools()
    if not valid_combinations:
        # Same exception type min() would raise on an empty pool, but with a
        # message that says what is actually wrong.
        raise ValueError("No valid model/clip combinations are available to assign")

    # Look every count up once, then keep the least-assigned combinations.
    counts = [model_clip_assignments.get(combo, 0) for combo in valid_combinations]
    min_count = min(counts)
    least_assigned = [
        combo for combo, count in zip(valid_combinations, counts) if count == min_count
    ]
    chosen_combo = random.choice(least_assigned)
    model_clip_assignments[chosen_combo] += 1
    return chosen_combo
def get_judgments(model_name, clip_name):
    """Load the judge output for one model caption of one clip.

    Reads ``<clip_name>/REF_human/TAR_<model_name>.json`` below the judgment
    root directory and converts each ``result_tuple`` entry with at least six
    fields into a dictionary.

    Args:
        model_name: name of the captioning model (the part of the file name
            between ``TAR_`` and ``.json``).
        clip_name: name of the clip directory.

    Returns:
        dict | None: ``None`` when the judgment file does not exist, otherwise
        a dictionary with the keys ``human_caption``, ``model_caption``,
        ``result_raw``, ``judgments`` (list of dicts with ``line_number``,
        ``line_text``, ``evidence``, ``sentence_type``, ``reasoning`` and
        ``verdict``) and ``video_url`` (``None`` when the clip has no entry
        in ``VIDEO_NAME_TO_DRIVE_URL``).
    """
    file_path = f"assets/evals/model_caption_strategy_base_frames16/judge_model_gpt-4o/{clip_name}/REF_human/TAR_{model_name}.json"
    try:
        # JSON is UTF-8; do not depend on the platform's default encoding.
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return None

    judgments = []
    for item in data.get("result_tuple") or []:
        if len(item) < 6:
            # Incomplete entries are skipped rather than shown half-filled.
            continue
        line_number, line_text, evidence, sentence_type, reasoning, verdict = item[:6]
        # An evidence field containing "- Reasoning" is replaced by "N/A"
        # (presumably reasoning text that leaked into the evidence slot -
        # TODO confirm with the judge output format).
        if isinstance(evidence, str) and "- Reasoning" in evidence:
            evidence = "N/A"
        judgments.append({
            "line_number": line_number,
            "line_text": line_text,
            "evidence": evidence,
            "sentence_type": sentence_type,
            "reasoning": reasoning,
            "verdict": verdict,
        })

    return {
        "human_caption": data.get("human_caption", ""),
        "model_caption": data.get("model_caption", ""),
        "result_raw": data.get("Result_raw", ""),
        "judgments": judgments,
        "video_url": VIDEO_NAME_TO_DRIVE_URL.get(f"{clip_name}.mp4"),
    }
def save_feedback(user_id, user_data):
    """Append one user's responses for a (model, clip) pair to its CSV file.

    One row is written per judged line to
    ``feedback/<model_name>_<clip_name>_feedback.csv``. Rows whose
    (user_id, line_number) pair is already present in the file are skipped,
    so saving the same session twice does not create duplicates.

    Args:
        user_id: identifier of the evaluating user/session.
        user_data: dictionary with the keys ``model_name``, ``clip_name``,
            ``username``, ``start_time`` and ``responses``; ``responses`` maps
            a line number to a dict with ``feedback`` and ``timestamp``.

    Returns:
        bool: ``False`` when there is nothing to save (no data or no
        responses), ``True`` otherwise.
    """
    if not user_data or not user_data.get("responses"):
        return False

    model_name = user_data["model_name"]
    clip_name = user_data["clip_name"]
    username = user_data["username"]
    start_time = user_data["start_time"]
    end_time = datetime.datetime.now().isoformat()

    rows = [
        {
            "user_id": user_id,
            "username": username,
            "model_name": model_name,
            "clip_name": clip_name,
            "line_number": line_number,
            "feedback": response["feedback"],
            "response_time": response["timestamp"],
            "session_start": start_time,
            "session_end": end_time,
        }
        for line_number, response in user_data["responses"].items()
    ]

    os.makedirs("feedback", exist_ok=True)
    feedback_file = f"feedback/{model_name}_{clip_name}_feedback.csv"
    new_df = pd.DataFrame(rows)

    if not os.path.exists(feedback_file):
        new_df.to_csv(feedback_file, index=False)
        return True

    # Read everything as text: with type inference the stored line numbers
    # come back as integers and never compare equal to the string line
    # numbers used as response keys, which silently disabled the duplicate
    # check.
    existing_df = pd.read_csv(feedback_file, dtype=str)
    if "user_id" in existing_df.columns and "line_number" in existing_df.columns:
        seen = set(zip(existing_df["user_id"], existing_df["line_number"]))
        is_new = [
            (str(uid), str(line)) not in seen
            for uid, line in zip(new_df["user_id"], new_df["line_number"])
        ]
        new_df = new_df[is_new]

    if not new_df.empty:
        # Single concat instead of one per row.
        combined = pd.concat([existing_df, new_df], ignore_index=True)
        combined.to_csv(feedback_file, index=False)
    return True
# ============== UI Components ==============
# Login page
def login_page():
    """Build the login view: title, prompt, name field, start button and an
    empty ``login_error`` container for validation messages."""
    heading = ui.h1("Caption Judgment Evaluator")
    prompt = ui.p("Please enter your name to begin evaluating captions.")
    name_field = ui.input_text("username", "Your Name", placeholder="Enter your name")
    start_button = ui.input_action_button(
        "start_evaluation", "Start Evaluation", class_="btn-primary"
    )
    # Empty placeholder; it has no content of its own.
    error_slot = ui.div(id="login_error", class_="text-danger mt-2")
    return ui.div(heading, prompt, name_field, start_button, error_slot)
# Caption evaluation page
def evaluation_page():
    """Build the evaluation view.

    The page consists of a heading, the clip name output and a
    ``caption-box`` container holding, in order: the video section, the human
    and model captions, the judgments heading with instructions, the
    ``judgments_ui`` output and the submit controls.
    """
    # NOTE(review): the "video_link" anchor is created without an href here.
    fallback_notice = ui.div(
        ui.p(ui.tags.strong("Having trouble viewing the video?")),
        ui.p(ui.a("Open the video in a new tab", id="video_link", target="_blank", class_="btn btn-primary btn-sm")),
        ui.p("Please make sure to watch the video before evaluating the captions."),
        class_="alert alert-info", id="video_fallback_message"
    )
    video_section = ui.div(
        ui.h4("Video Clip:"),
        ui.output_ui("video_player"),
        fallback_notice
    )
    captions_section = ui.div(
        ui.h4("Human Caption:"),
        ui.output_text("human_caption"),
        ui.h4("Model Caption:"),
        ui.output_text("model_caption"),
    )
    instructions = ui.div(
        "Please review each judgment below and indicate whether you agree or disagree with it. "
        "You must provide feedback for all judgments before submitting.",
        class_="alert alert-info"
    )
    submit_section = ui.div(
        ui.input_action_button("submit_all", "Submit All Responses", class_="btn btn-lg btn-primary"),
        ui.output_ui("submit_message"),
        class_="mt-5 mb-5 text-center"
    )
    content_box = ui.div(
        video_section,
        captions_section,
        ui.h3("Judgments"),
        instructions,
        ui.output_ui("judgments_ui"),
        submit_section,
        class_="caption-box"
    )
    page_title = ui.h1("Evaluating Caption Judgments")
    clip_header = ui.p(ui.output_text("clip_name_header"), class_="lead")
    return ui.div(page_title, clip_header, content_box)
# Thank you page
def thank_you_page():
    """Build the confirmation view shown after a submission, with a button
    (``start_new_evaluation``) to request another clip."""
    again_button = ui.input_action_button(
        "start_new_evaluation", "Evaluate Another Clip", class_="btn btn-primary"
    )
    parts = [
        ui.h1("Thank You!"),
        ui.p("Your feedback has been successfully recorded."),
        ui.p("Would you like to evaluate another clip?"),
        again_button,
    ]
    return ui.div(*parts)
# Main UI layout
# Hidden navset holding the three views; the server switches between them by
# calling ui.update_navs("main_page", <value>) with "login", "evaluation" or
# "thank_you".
app_ui = ui.navset_hidden(
    ui.nav("login", login_page()),
    ui.nav("evaluation", evaluation_page()),
    ui.nav("thank_you", thank_you_page()),
    id="main_page"
)
# ============== Server Logic ==============
def server(input: Inputs, output: Outputs, session: Session):
    """Reactive server logic for one browser session.

    Flow: login -> a (model, clip) pair is assigned -> the user agrees or
    disagrees with every judgment -> the responses are saved -> thank-you
    page, from which another clip can be requested.

    State is split across two reactive values on purpose:

    * ``user_data`` holds the current assignment and only changes when a new
      clip is assigned. The caption, video and judgment outputs depend on it.
    * ``feedback_state`` holds the collected responses. Keeping it separate
      means recording a response does not re-render ``judgments_ui`` (which
      would recreate the radio buttons and discard the user's selections).

    Both values are always replaced with *new* dictionaries: setting a
    reactive value to the very object it already holds does not notify
    dependents, so in-place mutation would leave the UI stale.

    Args:
        input: Shiny inputs of the session.
        output: Shiny outputs of the session.
        session: the Shiny session object.
    """
    # The pools and counters are module-level state shared by all sessions;
    # populate them once instead of re-reading the feedback files per session.
    if not valid_combinations:
        initialize_pools()

    user_data = reactive.Value({
        "user_id": None,
        "username": None,
        "model_name": None,
        "clip_name": None,
        "start_time": None,
        "judgment_data": None,
    })
    feedback_state = reactive.Value({"responses": {}, "all_answered": False})

    def show_login_error(message):
        """Replace the content of the #login_error container with *message*."""
        ui.remove_ui(selector="#login_error > *", multiple=True)
        if message:
            ui.insert_ui(ui.span(message), selector="#login_error", where="beforeEnd")

    def begin_evaluation(user_id, username, model_name, clip_name):
        """Store a fresh assignment, reset the responses and show the page."""
        user_data.set({
            "user_id": user_id,
            "username": username,
            "model_name": model_name,
            "clip_name": clip_name,
            "start_time": datetime.datetime.now().isoformat(),
            "judgment_data": get_judgments(model_name, clip_name),
        })
        feedback_state.set({"responses": {}, "all_answered": False})
        ui.update_navs("main_page", "evaluation")

    def verdict_class(verdict):
        """Return the CSS class used to colour a verdict label."""
        if verdict == "entailment":
            return "verdict-correct"
        if verdict == "contradiction":
            return "verdict-incorrect"
        return "verdict-neutral"

    @reactive.Effect
    @reactive.event(input.start_evaluation)
    def handle_login():
        username = input.username().strip()
        if not username:
            # "login_error" is a plain div, not a text input, so the message
            # has to be inserted as UI rather than sent via update_text().
            show_login_error("Please enter your name to continue")
            return
        try:
            model_name, clip_name = get_random_assignment()
        except ValueError:
            # Raised when there is no model/clip combination to hand out.
            show_login_error("No clips are currently available for evaluation")
            return
        show_login_error(None)
        begin_evaluation(str(uuid.uuid4()), username, model_name, clip_name)

    @output
    @render.text
    def clip_name_header():
        clip_name = user_data()["clip_name"]
        if clip_name:
            return f"Clip: {clip_name}"
        return ""

    @output
    @render.ui
    def video_player():
        judgment_data = user_data()["judgment_data"]
        if not (judgment_data and judgment_data["video_url"]):
            return ui.div(
                ui.h4("Video Not Available"),
                ui.p("No video was found for this clip. Please evaluate based on the captions provided."),
                class_="mb-4 alert alert-warning"
            )
        video_url = judgment_data["video_url"]
        embed_url = video_url
        if "drive.google.com" in video_url:
            # Google Drive share links must be rewritten to the /preview form
            # to be embeddable in an iframe.
            file_id = re.search(r"/d/([^/]+)/", video_url)
            if file_id:
                embed_url = f"https://drive.google.com/file/d/{file_id.group(1)}/preview"
        # The direct link is rendered as part of this output: update_navs()
        # cannot set an href on an anchor (the previous call raised a
        # TypeError for non-Drive URLs and was never reached for Drive URLs).
        return ui.div(
            ui.tags.iframe(
                src=embed_url,
                width="100%",
                height="400px",
                allowfullscreen=True,
                frameborder="0"
            ),
            ui.p(
                ui.a(
                    "Open this video in a new tab",
                    href=video_url,
                    target="_blank",
                    class_="btn btn-link btn-sm"
                )
            )
        )

    @output
    @render.text
    def human_caption():
        judgment_data = user_data()["judgment_data"]
        if judgment_data:
            return judgment_data["human_caption"]
        return ""

    @output
    @render.text
    def model_caption():
        judgment_data = user_data()["judgment_data"]
        if judgment_data:
            return judgment_data["model_caption"]
        return ""

    @output
    @render.ui
    def judgments_ui():
        judgment_data = user_data()["judgment_data"]
        if not judgment_data:
            return ui.div()
        judgment_divs = []
        for judgment in judgment_data["judgments"]:
            line_number = judgment["line_number"]
            judgment_box = ui.div(
                ui.div(
                    ui.tags.strong(f"Line {line_number}:"),
                    f" {judgment['line_text']}",
                    class_="section"
                ),
                ui.div(
                    ui.tags.strong("Verdict:"),
                    ui.span(
                        judgment["verdict"],
                        class_=verdict_class(judgment["verdict"])
                    ),
                    class_="section"
                ),
                ui.div(
                    ui.tags.strong("Reasoning:"),
                    f" {judgment['reasoning']}",
                    class_="section"
                ),
                ui.div(
                    ui.tags.strong("Evidence from Human Caption:"),
                    f" {judgment['evidence']}",
                    class_="section"
                ),
                ui.div(
                    # NOTE(review): confirm whether this radio group starts
                    # with no option selected in the Shiny version in use; a
                    # pre-selected default would count as an answer.
                    ui.input_radio_buttons(
                        f"feedback_{line_number}",
                        None,
                        {
                            "agree": "I Agree with this Judgment",
                            "disagree": "I Disagree with this Judgment"
                        },
                        inline=True
                    ),
                    # Hidden anchor; the visible confirmation is inserted
                    # right after it once a response has been recorded.
                    ui.span(
                        "Feedback recorded",
                        id=f"feedback_received_{line_number}",
                        class_="feedback-received",
                        style="display: none;"
                    ),
                    class_="section mt-3"
                ),
                id=f"judgment-{line_number}",
                class_="judgment-box"
            )
            judgment_divs.append(judgment_box)
        return ui.div(*judgment_divs)

    @reactive.Effect
    def handle_feedback_updates():
        judgment_data = user_data()["judgment_data"]
        if not judgment_data:
            return
        # Read the previous state without taking a dependency on it, so that
        # updating it below does not re-trigger this effect.
        with reactive.isolate():
            previous = feedback_state()
        responses = dict(previous["responses"])
        all_answered = True
        for judgment in judgment_data["judgments"]:
            line_number = str(judgment["line_number"])
            feedback_id = f"feedback_{line_number}"
            # Skip if the input doesn't exist yet
            if feedback_id not in input:
                all_answered = False
                continue
            feedback = input[feedback_id]()
            if not feedback:
                all_answered = False
                continue
            recorded = responses.get(line_number)
            if recorded is None:
                # First answer for this line: show the confirmation once.
                # (update_text() has no "style" argument; calling it with one
                # raised a TypeError inside this effect.)
                ui.insert_ui(
                    ui.span("Feedback recorded", class_="feedback-received"),
                    selector=f"#feedback_received_{line_number}",
                    where="afterEnd"
                )
            if recorded is None or recorded["feedback"] != feedback:
                responses[line_number] = {
                    "feedback": feedback,
                    "timestamp": datetime.datetime.now().isoformat()
                }
        if responses != previous["responses"] or all_answered != previous["all_answered"]:
            feedback_state.set({"responses": responses, "all_answered": all_answered})

    @output
    @render.ui
    def submit_message():
        if not feedback_state()["all_answered"]:
            return ui.p("Please provide feedback for all judgments before submitting.", class_="text-danger mt-2")
        return ui.div()

    @reactive.Effect
    @reactive.event(input.submit_all)
    def handle_submission():
        state = feedback_state()
        if not state["all_answered"]:
            return
        assignment = user_data()
        # save_feedback() expects the responses alongside the assignment data.
        save_feedback(assignment["user_id"], {**assignment, "responses": state["responses"]})
        # Navigate to thank you page
        ui.update_navs("main_page", "thank_you")

    @reactive.Effect
    @reactive.event(input.start_new_evaluation)
    def start_new_evaluation():
        # Same user, new clip: keep the identity, replace the assignment.
        current = user_data()
        model_name, clip_name = get_random_assignment()
        begin_evaluation(current["user_id"], current["username"], model_name, clip_name)
# ============== Main App ==============
# Apply custom CSS
# Stylesheet injected into the page through ui.tags.style() below. The class
# names match the class_ values used by evaluation_page() and by the
# judgments rendered in server().
app_css = """
/* Custom styling for the application */
.caption-box {
background-color: #f9f9f9;
padding: 20px;
border-radius: 5px;
margin-bottom: 20px;
}
.judgment-box {
background-color: white;
border: 1px solid #ddd;
padding: 15px;
margin-bottom: 15px;
border-radius: 5px;
}
.section {
margin-bottom: 10px;
}
.verdict-correct {
color: green;
font-weight: bold;
}
.verdict-incorrect {
color: red;
font-weight: bold;
}
.verdict-neutral {
color: orange;
font-weight: bold;
}
.feedback-received {
margin-left: 10px;
color: green;
font-style: italic;
}
"""
# Application object: a fluid page containing the stylesheet, the title and
# the hidden navset, served by server().
app = App(
    ui=ui.page_fluid(
        ui.tags.style(app_css),
        ui.panel_title("Caption Judgment Evaluation"),
        app_ui
    ),
    server=server
)