import dash
from dash import dcc, html, Input, Output
import dash_bootstrap_components as dbc
import plotly.express as px
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import LabelEncoder, StandardScaler
# App Configuration
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.MATERIA, dbc.icons.FONT_AWESOME])
app.title = "Intelligent Construction Anomaly Detector"
# Initial Global Variables
df = pd.DataFrame()
df_features = pd.DataFrame()
X_scaled = np.array([])
scaler = None
label_encoders = {}
# Default map coordinates (downtown Raleigh, NC - fallback until data loads;
# the permits come from Raleigh Open Data, per the footer attribution)
map_center_lat = 35.7796
map_center_lon = -78.6382
# --- Advanced Anomaly Metric Calculation ---
def calculate_anomaly_metrics(df_row, full_df):
"""Calculates specific metrics that explain why something is anomalous."""
metrics = {}
cost_val = df_row.get('estprojectcost', 0)
time_val = df_row.get('construction_time_days', 0)
if time_val > 0:
metrics['cost_per_day'] = cost_val / time_val
else:
metrics['cost_per_day'] = cost_val
workclass_name = df_row.get('workclass')
if 'workclass' in full_df.columns and workclass_name in full_df['workclass'].values:
workclass_data = full_df[full_df['workclass'] == workclass_name]
workclass_avg_cost = workclass_data['estprojectcost'].median() if not workclass_data.empty else 0
metrics['cost_deviation_percent'] = ((cost_val - workclass_avg_cost) / workclass_avg_cost * 100) if workclass_avg_cost > 0 else 0
metrics['workclass_avg_cost'] = workclass_avg_cost
workclass_avg_time = workclass_data['construction_time_days'].median() if not workclass_data.empty else 0
metrics['time_deviation_percent'] = ((time_val - workclass_avg_time) / workclass_avg_time * 100) if workclass_avg_time > 0 else 0
metrics['workclass_avg_time'] = workclass_avg_time
else:
overall_avg_cost = full_df['estprojectcost'].median() if not full_df.empty else 0
metrics['cost_deviation_percent'] = ((cost_val - overall_avg_cost) / overall_avg_cost * 100) if overall_avg_cost > 0 else 0
metrics['workclass_avg_cost'] = overall_avg_cost
overall_avg_time = full_df['construction_time_days'].median() if not full_df.empty else 0
metrics['time_deviation_percent'] = ((time_val - overall_avg_time) / overall_avg_time * 100) if overall_avg_time > 0 else 0
metrics['workclass_avg_time'] = overall_avg_time
return metrics
# --- Enhanced Anomaly Categorization ---
def categorize_anomaly(row, df_stats):
"""Categorizes the type of anomaly with detailed analysis."""
if not row['is_anomaly']:
return 'Normal'
categories = []
cost_dev = row.get('cost_deviation_percent', 0)
time_dev = row.get('time_deviation_percent', 0)
cons_time = row.get('construction_time_days', 0)
est_cost = row.get('estprojectcost', 0)
work_class = row.get('workclass', '')
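    # Thresholds adapt to the dataset: 1.5 standard deviations of each
    # deviation-percent distribution, with fixed fallbacks when stats are unavailable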
cost_dev_threshold = df_stats.get('cost_dev_std', 200) * 1.5
time_dev_threshold = df_stats.get('time_dev_std', 150) * 1.5
if abs(cost_dev) > cost_dev_threshold:
categories.append('Extreme Cost')
if abs(time_dev) > time_dev_threshold:
categories.append('Extreme Time')
    # cons_time is guaranteed non-negative by the preprocessing filters
    if cons_time < 5 and est_cost > 100000:
        categories.append('Suspiciously Rapid Approval')
    if cons_time > 365:
        categories.append('Excessive Delay')
if est_cost > 500000 and work_class not in ['Commercial', 'Industrial', 'Multi-Family']:
categories.append('Unusual Cost for Type')
if 'anomaly_score' in row and 'anomaly_score_5th_percentile' in df_stats:
if row['anomaly_score'] < df_stats['anomaly_score_5th_percentile']:
categories.append('Critical Anomaly Score')
return ' | '.join(categories) if categories else 'General Anomaly'
# --- Load and Preprocess Dataset ---
def load_and_preprocess_data():
global df, df_features, X_scaled, scaler, map_center_lat, map_center_lon, label_encoders
try:
temp_df = pd.read_csv("Building_Permits_Issued_Past_180_Days.csv")
print(f"Dataset loaded: {len(temp_df)} records.")
# Convert dates
temp_df['applieddate'] = pd.to_datetime(temp_df['applieddate'], errors='coerce')
temp_df['issueddate'] = pd.to_datetime(temp_df['issueddate'], errors='coerce')
temp_df['expiresdate'] = pd.to_datetime(temp_df['expiresdate'], errors='coerce')
# Calculate temporal metrics
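        # 'construction_time_days' measures application-to-issuance lead time
        # (approval time), not on-site build duration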
temp_df['construction_time_days'] = (temp_df['issueddate'] - temp_df['applieddate']).dt.days.fillna(0).astype(int)
temp_df['days_to_expire'] = (temp_df['expiresdate'] - temp_df['issueddate']).dt.days.fillna(365).astype(int)
temp_df['application_month'] = temp_df['applieddate'].dt.month
temp_df['application_weekday'] = temp_df['applieddate'].dt.dayofweek
# Clean data
temp_df.dropna(subset=['estprojectcost', 'construction_time_days', 'latitude_perm', 'longitude_perm'], inplace=True)
# Fill categorical columns
categorical_cols_to_fill = ['workclass', 'permitclassmapped', 'censuslanduse',
'contractorcompanyname', 'contractorcity', 'statuscurrentmapped',
'proposedworkdescription', 'originaladdress']
for col in categorical_cols_to_fill:
if col in temp_df.columns:
temp_df[col] = temp_df[col].fillna('Unknown')
else:
temp_df[col] = 'Unknown'
# Quality filters
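        # Keep positive costs and approval times within 0-730 days (two years);
        # values outside this window are likely data-entry artifacts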
temp_df = temp_df[temp_df['estprojectcost'] > 0]
temp_df = temp_df[temp_df['construction_time_days'] >= 0]
temp_df = temp_df[temp_df['construction_time_days'] <= 730]
# Create derived features
if 'totalactualhpfsf' in temp_df.columns and not temp_df['totalactualhpfsf'].isnull().all():
temp_df['cost_per_sqft_estimate'] = temp_df['estprojectcost'] / np.maximum(temp_df['totalactualhpfsf'].fillna(1), 1)
elif 'housingunitstotal' in temp_df.columns and not temp_df['housingunitstotal'].isnull().all():
temp_df['cost_per_sqft_estimate'] = temp_df['estprojectcost'] / np.maximum(temp_df['housingunitstotal'].fillna(1), 1)
else:
temp_df['cost_per_sqft_estimate'] = temp_df['estprojectcost']
if 'applieddate' in temp_df.columns:
temp_df['is_weekend_application'] = temp_df['application_weekday'].isin([5, 6])
else:
temp_df['is_weekend_application'] = False
temp_df['high_value_project'] = temp_df['estprojectcost'] > temp_df['estprojectcost'].quantile(0.9)
print(f"Records after initial preprocessing: {len(temp_df)}")
if temp_df.empty:
raise ValueError("DataFrame is empty after initial preprocessing.")
# Prepare features for ML
ml_features_candidates = [
'estprojectcost', 'construction_time_days', 'latitude_perm', 'longitude_perm',
'fee', 'application_month', 'application_weekday', 'days_to_expire',
'workclass', 'permitclassmapped', 'censuslanduse', 'originalzip',
'cost_per_sqft_estimate', 'is_weekend_application', 'high_value_project'
]
ml_features = [col for col in ml_features_candidates if col in temp_df.columns]
df_features = temp_df[ml_features].copy()
# Encode categorical features
categorical_cols_for_ml = ['workclass', 'permitclassmapped', 'censuslanduse']
label_encoders = {}
for col in categorical_cols_for_ml:
if col in df_features.columns:
le = LabelEncoder()
                df_features[col] = df_features[col].fillna('Unknown').astype(str)
df_features[col] = le.fit_transform(df_features[col])
label_encoders[col] = le
        # Clean numerical data: coerce leftover object columns to numeric
        # (with errors='coerce', pd.to_numeric never raises; unparseable
        # values become NaN), then impute NaN/inf values
        for col in df_features.columns:
            if df_features[col].dtype == 'object':
                df_features[col] = pd.to_numeric(df_features[col], errors='coerce')
            if df_features[col].dtype in ['float64', 'int64', 'bool']:
                df_features[col] = df_features[col].replace([np.inf, -np.inf], np.nan)
                df_features[col] = df_features[col].fillna(df_features[col].median() if not df_features[col].empty else 0)
            else:
                df_features[col] = df_features[col].fillna(df_features[col].mode()[0] if not df_features[col].mode().empty else 'Missing')
        print("NaN counts after final cleanup for ML:")
        print(df_features.isnull().sum())
# Scale features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df_features)
if np.isnan(X_scaled).any():
print("Warning: NaNs still present in scaled data after processing. Imputing to 0.")
X_scaled = np.nan_to_num(X_scaled)
# Apply initial Isolation Forest
initial_contamination_rate = 0.02
model = IsolationForest(contamination=initial_contamination_rate, random_state=42, n_estimators=100)
model.fit(X_scaled)
temp_df['anomaly_score'] = model.decision_function(X_scaled)
temp_df['is_anomaly'] = model.predict(X_scaled) == -1
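        # sklearn convention: decision_function is higher for normal points
        # (more negative = more anomalous); predict() returns -1 for outliers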
# Calculate anomaly metrics
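        # Note: this per-row pass re-filters the full DataFrame for each permit
        # (roughly O(n^2)); fine for a 180-day extract, but precomputing
        # per-workclass medians would scale better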
anomaly_metrics_list = []
for idx, row in temp_df.iterrows():
metrics = calculate_anomaly_metrics(row, temp_df)
anomaly_metrics_list.append(metrics)
metrics_df = pd.DataFrame(anomaly_metrics_list, index=temp_df.index)
for col in metrics_df.columns:
temp_df[col] = metrics_df[col]
# Calculate dynamic thresholds
df_stats = {
'cost_dev_std': temp_df['cost_deviation_percent'].std() if not temp_df['cost_deviation_percent'].empty else 200,
'time_dev_std': temp_df['time_deviation_percent'].std() if not temp_df['time_deviation_percent'].empty else 150,
'anomaly_score_5th_percentile': temp_df[temp_df['is_anomaly']]['anomaly_score'].quantile(0.05) if not temp_df[temp_df['is_anomaly']].empty else -np.inf
}
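        # The 5th percentile of scores among flagged rows defines the
        # 'Critical Anomaly Score' tier used by categorize_anomaly()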
# Categorize anomaly types
temp_df['anomaly_type'] = temp_df.apply(lambda row: categorize_anomaly(row, df_stats), axis=1)
# Map Center
if not temp_df.empty and 'latitude_perm' in temp_df.columns and 'longitude_perm' in temp_df.columns:
map_center_lat = temp_df['latitude_perm'].mean()
map_center_lon = temp_df['longitude_perm'].mean()
else:
            map_center_lat = 35.7796
            map_center_lon = -78.6382
df = temp_df
print(f"Anomalies initially detected: {df['is_anomaly'].sum()}")
except FileNotFoundError:
print("Error: CSV file 'Building_Permits_Issued_Past_180_Days.csv' not found.")
df = pd.DataFrame()
except Exception as e:
print(f"Error during data processing: {e}")
df = pd.DataFrame()
# Load data at startup
load_and_preprocess_data()
# --- Enhanced Layout with Improved Cards ---
app.layout = dbc.Container([
# Header with improved styling
dbc.Row([
dbc.Col([
dbc.Card([
dbc.CardBody([
html.H1([
html.I(className="fas fa-search-location me-3"),
"Intelligent Construction Anomaly Detector"
], className="text-center mb-3 text-primary fw-bold"),
html.P("Predictive ML: Uncovering Construction Irregularities.",
className="text-center text-muted mb-0 lead")
])
], className="shadow-sm border-0", style={'background': 'linear-gradient(135deg, #FAD7A1 0%, #E96D71 100%)',
#'linear-gradient(135deg, #D4FC79 0%, #96E6A1 100%)',
# 'linear-gradient(135deg, #667eea 0%, #764ba2 100%)',
'color': 'white'})
])
], className="mb-4"),
# Enhanced Executive Summary
html.Div(id='executive-summary', className="mb-4"),
# Improved Controls Card
dbc.Row([
dbc.Col([
dbc.Card([
dbc.CardHeader([
html.I(className="fas fa-filter me-2"),
html.Strong("Visualization Controls")
]),
dbc.CardBody([
html.Label("Filter & Display Options:", className="fw-bold mb-2"),
dcc.Dropdown(
id='consolidated-filter-dropdown',
                        options=[
                            {'label': 'All Projects', 'value': 'all_projects'},
                            {'label': 'Only Anomalies - All Types', 'value': 'anomalies_all'},
                            {'label': 'Only Anomalies - Extreme Cost', 'value': 'anomalies_cost'},
                            {'label': 'Only Anomalies - Extreme Time', 'value': 'anomalies_time'},
                            {'label': 'Only Anomalies - Rapid Approval', 'value': 'anomalies_rapid'},
                            {'label': 'Only Anomalies - Excessive Delay', 'value': 'anomalies_delay'},
                            {'label': 'Critical Anomalies - All Types', 'value': 'critical_all'},
                            {'label': 'Critical - Extreme Cost', 'value': 'critical_cost'},
                            {'label': 'Critical - Extreme Time', 'value': 'critical_time'}
                        ],
value='anomalies_all',
clearable=False
)
])
], className="shadow-sm")
], md=7),
dbc.Col([
dbc.Card([
dbc.CardHeader([
html.I(className="fas fa-sliders-h me-2"),
html.Strong("Detection Sensitivity")
]),
dbc.CardBody([
html.Label("Anomaly Detection Rate:", className="fw-bold mb-2"),
dcc.Slider(
id='contamination-slider',
min=0.005, max=0.05, step=0.005, value=0.02,
marks={i/100: f'{i:.1f}%' for i in [0.5, 1, 2, 3, 4, 5]},
tooltip={"placement": "bottom", "always_visible": True}
)
])
], className="shadow-sm")
], md=5)
], className="mb-4"),
# Enhanced Map Card
dbc.Row([
dbc.Col([
dbc.Card([
dbc.CardHeader([
html.I(className="fas fa-map-marked-alt me-2"),
html.H4("Geospatial Anomaly Distribution", className="mb-0"),
html.Small("Interactive map showing project locations colored by anomaly status", className="text-muted")
]),
dbc.CardBody(dcc.Graph(id='anomaly-map', style={'height': '700px'}))
], className="shadow")
])
], className="mb-4"),
# Enhanced Critical Anomaly Panel
dbc.Row([
dbc.Col([
dbc.Card([
dbc.CardHeader([
html.I(className="fas fa-exclamation-triangle me-2"),
html.H4("Investigation Panel", className="mb-1"),
html.Small("Detailed analysis of flagged projects requiring attention", className="text-muted")
]),
dbc.CardBody([
# Enhanced Anomaly Selector
dbc.Row([
dbc.Col([
html.Label([
html.I(className="fas fa-search me-2"),
"Select Flagged Project for Investigation:"
], className="fw-bold mb-2"),
dcc.Dropdown(
id='anomaly-selector',
placeholder="Choose a project to investigate...",
style={'marginBottom': '20px'}
)
])
]),
# Enhanced Detail Panel
html.Div(id='anomaly-detail-panel', children=[
dbc.Alert([
html.I(className="fas fa-info-circle me-2"),
"Select a flagged project from the dropdown to begin investigation."
], color="info", className="text-center")
])
])
], className="shadow")
])
], className="mb-4"),
dbc.Row([
dbc.Col([
dbc.Card([
dbc.CardBody([
html.Hr(className="mb-3"),
html.P([
"Web Aplication developed with ",
html.A("Python/Plotly/Dash", href="https://plotly.com/dash/", target="_blank", className="text-decoration-none"),
" | Thank you to the ",
html.A("Raleigh Open Data 2025", href="https://data-ral.opendata.arcgis.com/", target="_blank", className="text-decoration-none")
], className="text-center text-muted mb-2"),
])
], className="border-0", style={'backgroundColor': 'transparent'})
])
], className="mt-4")
], fluid=True, className="py-4", style={'backgroundColor': '#e3f2fd'})
# --- Enhanced Callbacks ---
@app.callback(
[Output('executive-summary', 'children'),
Output('anomaly-map', 'figure'),
Output('anomaly-selector', 'options')],
[Input('consolidated-filter-dropdown', 'value'),
Input('contamination-slider', 'value')]
)
def update_dashboard(consolidated_filter, contamination_val):
global df, X_scaled, scaler, map_center_lat, map_center_lon
if df.empty or X_scaled.size == 0 or scaler is None:
        empty_fig = px.scatter_mapbox(lat=[map_center_lat], lon=[map_center_lon], zoom=8,
                                      mapbox_style="carto-positron",  # token-free basemap style
                                      title="No data available to visualize. Please ensure a valid CSV file is loaded.")
empty_fig.update_layout(margin={"r":0,"t":50,"l":0,"b":0})
empty_executive_summary = dbc.Alert([
html.I(className="fas fa-exclamation-triangle me-2"),
"No data available or processing failed. Check console for errors."
], color="danger", className="text-center")
return empty_executive_summary, empty_fig, []
# Re-run model with new contamination
model = IsolationForest(contamination=contamination_val, random_state=42, n_estimators=100)
model.fit(X_scaled)
df['anomaly_score'] = model.decision_function(X_scaled)
df['is_anomaly'] = model.predict(X_scaled) == -1
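    # Refitting on every callback keeps the sensitivity slider interactive;
    # caching one fitted model per contamination value would avoid repeated fits on large data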
# Recalculate anomaly types
df_stats = {
'cost_dev_std': df['cost_deviation_percent'].std() if not df['cost_deviation_percent'].empty else 200,
'time_dev_std': df['time_deviation_percent'].std() if not df['time_deviation_percent'].empty else 150,
'anomaly_score_5th_percentile': df[df['is_anomaly']]['anomaly_score'].quantile(0.05) if not df[df['is_anomaly']].empty else -np.inf
}
df['anomaly_type'] = df.apply(lambda row: categorize_anomaly(row, df_stats), axis=1)
total_projects = len(df)
total_anomalies = df['is_anomaly'].sum()
anomaly_percentage = (total_anomalies / total_projects * 100) if total_projects > 0 else 0
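    # Medians serve as robust baselines; they are less distorted by the outliers being flagged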
avg_cost_all = df['estprojectcost'].median()
avg_time_all = df['construction_time_days'].median()
# Filter data
filtered_df = df.copy()
if consolidated_filter == 'all_projects':
filtered_df = df.copy()
elif consolidated_filter == 'anomalies_all':
filtered_df = df[df['is_anomaly']]
    elif consolidated_filter == 'critical_all':
        if df['is_anomaly'].any():  # Series.empty only checks length, not whether anomalies exist
            filtered_df = df[df['is_anomaly']].nsmallest(max(1, int(total_anomalies * 0.1)), 'anomaly_score')
        else:
            filtered_df = pd.DataFrame()
elif consolidated_filter == 'anomalies_cost':
filtered_df = df[df['is_anomaly'] & df['anomaly_type'].str.contains('Extreme Cost', na=False)]
elif consolidated_filter == 'anomalies_time':
filtered_df = df[df['is_anomaly'] & df['anomaly_type'].str.contains('Extreme Time', na=False)]
elif consolidated_filter == 'anomalies_rapid':
filtered_df = df[df['is_anomaly'] & df['anomaly_type'].str.contains('Suspiciously Rapid Approval', na=False)]
elif consolidated_filter == 'anomalies_delay':
filtered_df = df[df['is_anomaly'] & df['anomaly_type'].str.contains('Excessive Delay', na=False)]
    elif consolidated_filter == 'critical_cost':
        if df['is_anomaly'].any():
            critical_cost_anomalies = df[df['is_anomaly'] & df['anomaly_type'].str.contains('Extreme Cost', na=False)]
            filtered_df = critical_cost_anomalies.nsmallest(max(1, int(len(critical_cost_anomalies) * 0.5)), 'anomaly_score')
        else:
            filtered_df = pd.DataFrame()
    elif consolidated_filter == 'critical_time':
        if df['is_anomaly'].any():
            critical_time_anomalies = df[df['is_anomaly'] & df['anomaly_type'].str.contains('Extreme Time', na=False)]
            filtered_df = critical_time_anomalies.nsmallest(max(1, int(len(critical_time_anomalies) * 0.5)), 'anomaly_score')
        else:
            filtered_df = pd.DataFrame()
# Enhanced Map
map_zoom = 10
if not filtered_df.empty:
min_lat, max_lat = filtered_df['latitude_perm'].min(), filtered_df['latitude_perm'].max()
min_lon, max_lon = filtered_df['longitude_perm'].min(), filtered_df['longitude_perm'].max()
lat_range = max_lat - min_lat
lon_range = max_lon - min_lon
if lat_range == 0: lat_range = 0.01
if lon_range == 0: lon_range = 0.01
if max(lat_range, abs(lon_range)) < 0.01:
map_zoom = 15
elif max(lat_range, abs(lon_range)) < 0.1:
map_zoom = 12
elif max(lat_range, abs(lon_range)) < 0.5:
map_zoom = 10
else:
map_zoom = 8
if consolidated_filter == 'all_projects':
color_column = 'is_anomaly'
color_map = {True: '#dc3545', False: '#007bff'}
title_suffix = f"All Projects: {len(filtered_df):,} records"
else:
color_column = 'anomaly_score'
color_map = None
title_suffix = f"Filtered View: {len(filtered_df):,} projects"
        map_fig = px.scatter_mapbox(
            filtered_df,
            lat="latitude_perm", lon="longitude_perm",
            color=color_column,
            color_discrete_map=color_map,
            color_continuous_scale='Turbo',
            size="estprojectcost",
            size_max=40,
            zoom=map_zoom,
            center={"lat": map_center_lat, "lon": map_center_lon},
            mapbox_style="carto-positron",
            title=title_suffix,
            hover_name="proposedworkdescription",
            hover_data={
                'anomaly_type': True,
                'estprojectcost': ':$,.0f',  # d3-format (e.g., $1,234)
                'construction_time_days': ':.0f',
                'cost_deviation_percent': ':.1f',
                'time_deviation_percent': ':.1f',
                'anomaly_score': ':.2f',
                'workclass': True
            }
        )
map_fig.update_layout(margin={"r":0,"t":50,"l":0,"b":0}, coloraxis_showscale=False)
        map_fig.update_traces(
            hovertemplate=(
                "<b>%{hovertext}</b><br>"
                "<b>Type:</b> %{customdata[0]}<br>"
                "<b>Cost:</b> %{customdata[1]:$,.0f}<br>"
                "<b>Duration:</b> %{customdata[2]:.0f} days<br>"
                "<b>Cost Dev:</b> %{customdata[3]:.1f}%<br>"
                "<b>Time Dev:</b> %{customdata[4]:.1f}%<br>"
                "<b>Score:</b> %{customdata[5]:.2f}<br>"
                "<b>Class:</b> %{customdata[6]}<br>"
                "<extra></extra>"
            )
        )
else:
        map_fig = px.scatter_mapbox(lat=[map_center_lat], lon=[map_center_lon], zoom=8,
                                    mapbox_style="carto-positron",  # token-free basemap style
                                    title="No data found with current filters")
map_fig.update_layout(margin={"r":0,"t":50,"l":0,"b":0})
# Enhanced Anomaly Options with better labels
anomaly_options = []
if not df.empty and df['is_anomaly'].sum() > 0:
top_anomalies = df[df['is_anomaly']].nsmallest(20, 'anomaly_score')
for idx, row in top_anomalies.iterrows():
description = row.get('proposedworkdescription', 'No description')
if pd.isna(description):
description = 'No description'
anomaly_type = row.get('anomaly_type', 'General')
score = row.get('anomaly_score', 0)
cost = row.get('estprojectcost', 0)
applied_date = row.get('applieddate', pd.NaT)
date_str = applied_date.strftime('%Y-%m-%d') if pd.notna(applied_date) else 'Unknown'
label = f"π¨ Score: {score:.2f} | {anomaly_type} | ${cost:,.0f} | {date_str} | {str(description)[:40]}"
if len(str(description)) > 40:
label += "..."
anomaly_options.append({
'label': label,
'value': idx
})
else:
anomaly_options = [{'label': 'No anomalies detected with current sensitivity.', 'value': 'no_anomalies', 'disabled': True}]
# Enhanced Executive Summary
    if df['is_anomaly'].any():
        critical_anomalies_count = df[df['is_anomaly']].nsmallest(max(1, int(total_anomalies * 0.1)), 'anomaly_score').shape[0]
        # Medians keep the anomaly-vs-normal comparison consistent with the baselines above
        avg_cost_anomaly = df[df['is_anomaly']]['estprojectcost'].median()
        avg_time_anomaly = df[df['is_anomaly']]['construction_time_days'].median()
anomaly_type_counts = df[df['is_anomaly']]['anomaly_type'].value_counts()
most_common_anomaly = anomaly_type_counts.index[0] if not anomaly_type_counts.empty else 'N/A'
cost_increase_percentage = ((avg_cost_anomaly - avg_cost_all) / avg_cost_all * 100) if avg_cost_all > 0 else 0
time_increase_percentage = ((avg_time_anomaly - avg_time_all) / avg_time_all * 100) if avg_time_all > 0 else 0
executive_summary = dbc.Row([
dbc.Col([
dbc.Card([
dbc.CardBody([
html.H5([html.I(className="fas fa-chart-pie me-2"), "Total Projects"], className="text-primary"),
html.H2(f"{total_projects:,}", className="text-dark fw-bold"),
html.P("Projects analyzed", className="text-muted mb-0")
])
], className="shadow-sm h-100")
], md=2),
dbc.Col([
dbc.Card([
dbc.CardBody([
html.H5([html.I(className="fas fa-exclamation-triangle me-2"), "Anomalies Detected"], className="text-warning"),
html.H2(f"{total_anomalies:,}", className="text-warning fw-bold"),
html.P(f"{anomaly_percentage:.1f}% of total", className="text-muted mb-0")
])
], className="shadow-sm h-100")
], md=2),
dbc.Col([
dbc.Card([
dbc.CardBody([
html.H5([html.I(className="fas fa-fire me-2"), "Critical Cases"], className="text-danger"),
html.H2(f"{critical_anomalies_count:,}", className="text-danger fw-bold"),
html.P("Requiring immediate attention", className="text-muted mb-0")
])
], className="shadow-sm h-100")
], md=2),
dbc.Col([
dbc.Card([
dbc.CardBody([
html.H5([html.I(className="fas fa-dollar-sign me-2"), "Avg Anomaly Cost"], className="text-success"),
html.H2(f"${avg_cost_anomaly:,.0f}", className="text-success fw-bold"),
html.P(f"{cost_increase_percentage:+.0f}% vs normal", className="text-muted mb-0")
])
], className="shadow-sm h-100")
], md=3),
dbc.Col([
dbc.Card([
dbc.CardBody([
html.H5([html.I(className="fas fa-clock me-2"), "Common Type"], className="text-info"),
html.H6(f"{most_common_anomaly}", className="text-info fw-bold"),
html.P(f"{time_increase_percentage:+.0f}% time vs normal", className="text-muted mb-0")
])
], className="shadow-sm h-100")
], md=3)
], className="mb-3")
else:
executive_summary = dbc.Alert([
html.I(className="fas fa-info-circle me-2"),
"No anomalies detected with current sensitivity settings. Try adjusting the detection rate."
], color="info", className="text-center")
return executive_summary, map_fig, anomaly_options
@app.callback(
Output('anomaly-detail-panel', 'children'),
[Input('anomaly-selector', 'value')]
)
def update_anomaly_detail(selected_idx):
    # Test for None explicitly: index 0 is a valid selection but falsy
    if selected_idx is None or selected_idx == 'no_anomalies' or df.empty:
return dbc.Alert([
html.I(className="fas fa-info-circle me-2"),
"Select a flagged project from the dropdown to begin investigation."
], color="info", className="text-center")
try:
row = df.loc[selected_idx]
# Basic project information
project_info = dbc.Card([
dbc.CardHeader([
html.I(className="fas fa-building me-2"),
html.Strong("Project Information")
]),
dbc.CardBody([
dbc.Row([
dbc.Col([
html.P([html.Strong("Description: "), row.get('proposedworkdescription', 'N/A')]),
html.P([html.Strong("Address: "), row.get('originaladdress', 'N/A')]),
html.P([html.Strong("Work Class: "), row.get('workclass', 'N/A')]),
html.P([html.Strong("Permit Class: "), row.get('permitclassmapped', 'N/A')])
], md=6),
dbc.Col([
html.P([html.Strong("Contractor: "), row.get('contractorcompanyname', 'N/A')]),
html.P([html.Strong("Status: "), row.get('statuscurrentmapped', 'N/A')]),
html.P([html.Strong("Applied Date: "),
row.get('applieddate').strftime('%Y-%m-%d') if pd.notna(row.get('applieddate')) else 'N/A']),
html.P([html.Strong("Issued Date: "),
row.get('issueddate').strftime('%Y-%m-%d') if pd.notna(row.get('issueddate')) else 'N/A'])
], md=6)
])
])
], className="mb-3")
# Anomaly metrics
anomaly_metrics = dbc.Card([
dbc.CardHeader([
html.I(className="fas fa-exclamation-triangle me-2"),
html.Strong("Anomaly Analysis")
]),
dbc.CardBody([
dbc.Row([
dbc.Col([
html.H6("Financial Metrics", className="text-primary"),
html.P([html.Strong("Project Cost: "), f"${row.get('estprojectcost', 0):,.2f}"]),
html.P([html.Strong("Cost Deviation: "), f"{row.get('cost_deviation_percent', 0):.1f}%"]),
html.P([html.Strong("Workclass Avg Cost: "), f"${row.get('workclass_avg_cost', 0):,.2f}"]),
html.P([html.Strong("Cost per Day: "), f"${row.get('cost_per_day', 0):,.2f}"])
], md=6),
dbc.Col([
html.H6("Temporal Metrics", className="text-info"),
html.P([html.Strong("Construction Time: "), f"{row.get('construction_time_days', 0):.0f} days"]),
html.P([html.Strong("Time Deviation: "), f"{row.get('time_deviation_percent', 0):.1f}%"]),
html.P([html.Strong("Workclass Avg Time: "), f"{row.get('workclass_avg_time', 0):.0f} days"]),
html.P([html.Strong("Days to Expire: "), f"{row.get('days_to_expire', 0):.0f} days"])
], md=6)
])
])
], className="mb-3")
# Anomaly scoring and classification
scoring_card = dbc.Card([
dbc.CardHeader([
html.I(className="fas fa-calculator me-2"),
html.Strong("Risk Assessment")
]),
dbc.CardBody([
dbc.Row([
dbc.Col([
html.H6("Anomaly Scoring", className="text-warning"),
html.P([html.Strong("Anomaly Score: "), f"{row.get('anomaly_score', 0):.3f}"]),
html.P([html.Strong("Risk Level: "),
dbc.Badge("HIGH RISK", color="danger" if row.get('anomaly_score', 0) < -0.3 else "warning")])
], md=6),
dbc.Col([
html.H6("Classification", className="text-danger"),
html.P([html.Strong("Anomaly Type: "), row.get('anomaly_type', 'N/A')]),
html.P([html.Strong("Requires Review: "),
dbc.Badge("YES", color="danger" if row.get('is_anomaly', False) else "success")])
], md=6)
])
])
])
# Investigation recommendations
recommendations = []
anomaly_type = row.get('anomaly_type', '')
        if 'Extreme Cost' in anomaly_type:
            recommendations.append("Verify project cost estimates and compare with similar projects")
            recommendations.append("Review contractor pricing and potential cost inflation")
        if 'Extreme Time' in anomaly_type:
            recommendations.append("Investigate processing delays and approval bottlenecks")
            recommendations.append("Contact relevant departments for timeline clarification")
        if 'Suspiciously Rapid Approval' in anomaly_type:
            recommendations.append("Review approval process for potential irregularities")
            recommendations.append("Verify all required inspections and approvals were completed")
        if 'Excessive Delay' in anomaly_type:
            recommendations.append("Investigate reasons for prolonged processing time")
            recommendations.append("Review departmental workload and resource allocation")
        if not recommendations:
            recommendations.append("Conduct general review of project parameters")
            recommendations.append("Cross-reference with similar projects for patterns")
recommendations_card = dbc.Card([
dbc.CardHeader([
html.I(className="fas fa-clipboard-list me-2"),
html.Strong("Investigation Recommendations")
]),
dbc.CardBody([
html.Ul([html.Li(rec) for rec in recommendations])
])
], className="mt-3")
return [project_info, anomaly_metrics, scoring_card, recommendations_card]
except Exception as e:
return dbc.Alert([
html.I(className="fas fa-exclamation-triangle me-2"),
f"Error loading project details: {str(e)}"
], color="danger")