# Generated by Copilot
import solara
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import MultipleLocator
from dataclasses import dataclass
from typing import Tuple, Optional
# --- Constants ---
# Tiny slack added to np.arange stop values when building bin edges
# (presumably so an edge that lands exactly on the stop is not dropped).
EPSILON = 1e-15
# Initial bin width (hours) for the bin-width control and for StageConfig.
DEFAULT_BIN_WIDTH = 1.0
# Lower clamp applied to the bin width before any binning is done.
MIN_BIN_WIDTH = 0.05
# Hour range over which bin edges are laid out.
DAY_START_HOUR = 0.0
DAY_END_HOUR = 24.0
# Spacing of the major x-axis ticks, which also positions the vertical grid lines.
GRID_LINE_INTERVAL = 1.0
# Fraction of the data range added as padding when computing axis limits.
AXIS_PADDING_RATIO = 0.05
# Fraction of |y| used as padding when every plotted value is identical.
Y_PADDING_RATIO = 0.1
# Number of points used to sample the noise-free curve.
TRUE_CURVE_RESOLUTION = 1000
# Marker size and line width used for binned series.
DEFAULT_MARKER_SIZE = 6
DEFAULT_LINE_WIDTH = 1.5
# Marker size used for the raw data scatter.
SMALL_MARKER_SIZE = 3
# --- Data generation ---
def generate_uneven_times_for_day(start_time: float, end_time: float,
                                  mean_interval: float, interval_noise: float) -> np.ndarray:
    """Build one day's worth of irregularly spaced sample times.

    Args:
        start_time: Nominal first sample time, in hours.
        end_time: Nominal last sample time, in hours.
        mean_interval: Average gap between samples, in minutes.
        interval_noise: Half-width (minutes) of the uniform jitter applied to
            the first sample and to every gap.

    Returns:
        1-D array of sample times in hours. Sampling continues until the
        running time reaches ``end_time`` plus three jitter half-widths.
    """
    minutes_per_hour = 60.0
    # Samples may spill slightly past end_time; this is the hard stop.
    cutoff = end_time + 3 * (interval_noise / minutes_per_hour)

    samples = []
    t = start_time + (np.random.uniform(-interval_noise, interval_noise) / minutes_per_hour)
    while t < cutoff:
        samples.append(t)
        gap_minutes = mean_interval + np.random.uniform(-interval_noise, interval_noise)
        t += gap_minutes / minutes_per_hour
    return np.array(samples)
def double_gaussian_function(x: np.ndarray, mu1: float, sigma1: float, amp1: float,
                             mu2: float, sigma2: float, amp2: float) -> np.ndarray:
    """Evaluate the sum of two Gaussian peaks at ``x``.

    Each peak is ``amp * exp(-(x - mu)^2 / (2 * sigma^2))``.

    Args:
        x: Points at which to evaluate the curve.
        mu1, sigma1, amp1: Centre, width and height of the first peak.
        mu2, sigma2, amp2: Centre, width and height of the second peak.

    Returns:
        Array with the same shape as ``x`` holding the summed peaks.
    """
    def _peak(center: float, spread: float, height: float) -> np.ndarray:
        return height * np.exp(-((x - center) ** 2) / (2 * spread ** 2))

    return _peak(mu1, sigma1, amp1) + _peak(mu2, sigma2, amp2)
# --- Caching ---
class DatasetCache:
    """Single-slot cache: remembers only the most recently stored key/value pair."""

    def __init__(self):
        self._cache_key: Optional[Tuple] = None
        self._cache_value: Optional[Tuple] = None

    def get(self, key: Tuple) -> Optional[Tuple]:
        """Return the stored value when ``key`` equals the stored key, else ``None``."""
        key_matches = self._cache_key == key
        if key_matches and self._cache_value is not None:
            return self._cache_value
        return None

    def set(self, key: Tuple, value: Tuple) -> None:
        """Replace whatever is stored with ``key`` -> ``value``."""
        self._cache_key, self._cache_value = key, value

    def clear(self) -> None:
        """Forget the stored entry."""
        self._cache_key, self._cache_value = None, None


# Module-wide cache instance shared by the dataset generator.
_dataset_cache = DatasetCache()
# --- Binning utilities ---
def clean_binned_average(x: np.ndarray, y: np.ndarray,
                         bin_edges: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Average ``y`` within each bin of ``x`` and drop bins that hold no points.

    Args:
        x: Coordinates used to assign points to bins.
        y: Values to average (must support ``y ** 2``).
        bin_edges: Monotonic bin edges passed straight to ``np.histogram``.

    Returns:
        ``(valid_bins, binned_averages, binned_errors)`` where ``valid_bins``
        is a boolean mask over all bins and the other two arrays contain one
        entry per occupied bin.
    """
    def _per_bin(weights=None) -> np.ndarray:
        totals, _ = np.histogram(x, bins=bin_edges, weights=weights)
        return totals

    counts = _per_bin()
    value_sums = _per_bin(y)
    square_sums = _per_bin(y ** 2)

    occupied = counts > 0
    n_points = counts[occupied]
    means = value_sums[occupied] / n_points
    # NOTE(review): this is sqrt(sum(y^2)) / n, not a standard error of the
    # mean; confirm that this is the intended "error" definition.
    errors = np.sqrt(square_sums[occupied]) / n_points
    return occupied, means, errors
def forward_binned_average(times: np.ndarray, values: np.ndarray,
                           start_hour: float, end_hour: float,
                           bin_width: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Bin-average ``values`` and label each occupied bin with its left edge.

    Edges start at ``start_hour`` and extend one bin width past ``end_hour``.

    Returns:
        ``(binned_times, binned_averages, binned_errors)`` for occupied bins.
    """
    edges = np.arange(start_hour, end_hour + bin_width + EPSILON, bin_width)
    occupied, means, errors = clean_binned_average(times, values, edges)
    left_edges = edges[:-1]
    return left_edges[occupied], means, errors
def back_binned_average(times: np.ndarray, values: np.ndarray,
                        start_hour: float, end_hour: float,
                        bin_width: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Bin-average ``values`` and label each occupied bin with its right edge.

    Edges start one bin width before ``start_hour`` and stop at ``end_hour``.

    Returns:
        ``(binned_times, binned_averages, binned_errors)`` for occupied bins.
    """
    edges = np.arange(start_hour - bin_width, end_hour + EPSILON, bin_width)
    occupied, means, errors = clean_binned_average(times, values, edges)
    right_edges = edges[1:]
    return right_edges[occupied], means, errors
def center_binned_average(times: np.ndarray, values: np.ndarray,
                          start_hour: float, end_hour: float,
                          bin_width: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Bin-average ``values`` and label each occupied bin with its midpoint.

    Edges run from ``start_hour`` to ``end_hour`` in steps of ``bin_width``.

    Returns:
        ``(binned_times, binned_averages, binned_errors)`` for occupied bins.
    """
    edges = np.arange(start_hour, end_hour + EPSILON, bin_width)
    occupied, means, errors = clean_binned_average(times, values, edges)
    midpoints = (edges[:-1] + edges[1:]) / 2
    return midpoints[occupied], means, errors
def offset_center_binned_average(times: np.ndarray, values: np.ndarray,
                                 start_hour: float, end_hour: float,
                                 bin_width: float,
                                 offset: Optional[float] = None) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Bin-average ``values`` on shifted edges, labelling bins by their midpoint.

    The edge grid is moved back by ``offset`` so that, with the default
    half-bin shift, bin midpoints fall on ``start_hour + k * bin_width``.

    Args:
        times: Array of time values.
        values: Array of measured values.
        start_hour: Start of the time range.
        end_hour: End of the time range.
        bin_width: Width of each bin.
        offset: Shift applied to the bin edges; ``None`` means ``bin_width / 2``.

    Returns:
        Tuple of (binned_times, binned_averages, binned_errors) for occupied bins.
    """
    shift = bin_width / 2.0 if offset is None else offset
    edges = np.arange(start_hour - shift, end_hour + shift + EPSILON, bin_width)
    occupied, means, errors = clean_binned_average(times, values, edges)
    midpoints = (edges[:-1] + edges[1:]) / 2
    return midpoints[occupied], means, errors
def moving_average(x: np.ndarray, y: np.ndarray, window_size: float) -> Tuple[np.ndarray, np.ndarray]:
    """Average ``y`` over a window of ``window_size`` centred on every sample.

    For each sample position the mean of all ``y`` whose ``x`` lies within
    half a window on either side (inclusive) is taken.

    Returns:
        ``(positions, averages)`` sorted by position.
    """
    reach = window_size / 2.0
    centers = []
    averages = []
    for center in x:
        lower, upper = center - reach, center + reach
        in_window = (x >= lower) & (x <= upper)
        if not np.any(in_window):
            continue
        centers.append(center)
        averages.append(np.mean(y[in_window]))
    order = np.argsort(centers)
    return np.array(centers)[order], np.array(averages)[order]
# --- Plotting utilities ---
def show_bins(ax, bin_edges: np.ndarray, **kwargs):
    """Shade consecutive bins with two alternating background colours.

    Args:
        ax: Object providing ``axvspan`` (a Matplotlib axes in this file).
        bin_edges: Sequence of bin edges; one span is drawn per adjacent pair.
        **kwargs: ``color`` (even bins, default ``"lightgray"``) and ``color2``
            (odd bins, default ``"white"``) are consumed here; everything else
            is forwarded to ``ax.axvspan``.
    """
    shades = (kwargs.pop("color", "lightgray"), kwargs.pop("color2", "white"))
    for index, (left, right) in enumerate(zip(bin_edges[:-1], bin_edges[1:])):
        ax.axvspan(left, right, color=shades[index % 2], **kwargs)
def extend_left(x: np.ndarray, y: np.ndarray, width: float) -> Tuple[np.ndarray, np.ndarray]:
    """Prepend one extra point so a step plot covers the first bin.

    The new point sits ``width`` to the left of the first ``x`` and repeats
    the first ``y``. When ``width`` is not positive the spacing of the first
    two points is used instead (``1.0`` for a single point).

    Returns:
        ``(x_ext, y_ext)``; two empty arrays when ``x`` is empty.
    """
    xs = np.asarray(x)
    ys = np.asarray(y)
    if len(xs) == 0:
        return np.array([]), np.array([])

    if width > 0:
        step = width
    elif len(xs) == 1:
        step = 1.0
    else:
        step = max(xs[1] - xs[0], 1e-6)

    return np.insert(xs, 0, xs[0] - step), np.insert(ys, 0, ys[0])
def extend_right(x: np.ndarray, y: np.ndarray, width: float) -> Tuple[np.ndarray, np.ndarray]:
    """Append one extra point so a step plot covers the last bin.

    The new point sits ``width`` to the right of the last ``x`` and repeats
    the last ``y``. When ``width`` is not positive the spacing of the last
    two points is used instead (``1.0`` for a single point).

    Returns:
        ``(x_ext, y_ext)``; two empty arrays when ``x`` is empty.
    """
    xs = np.asarray(x)
    ys = np.asarray(y)
    if len(xs) == 0:
        return np.array([]), np.array([])

    if width > 0:
        step = width
    elif len(xs) == 1:
        step = 1.0
    else:
        step = max(xs[-1] - xs[-2], 1e-6)

    return np.append(xs, xs[-1] + step), np.append(ys, ys[-1])
def extend_both(x: np.ndarray, y: np.ndarray, width: float) -> Tuple[np.ndarray, np.ndarray]:
    """Pad one point on each side so a mid-aligned step plot covers the end bins.

    Each new point sits half of ``width`` beyond the nearest end and repeats
    that end's ``y``. When ``width`` is not positive, half the spacing of the
    first two points is used instead (``0.5`` for a single point).

    Returns:
        ``(x_ext, y_ext)``; two empty arrays when ``x`` is empty.
    """
    xs = np.asarray(x)
    ys = np.asarray(y)
    if len(xs) == 0:
        return np.array([]), np.array([])

    if width > 0:
        half_step = width / 2.0
    elif len(xs) == 1:
        half_step = 0.5
    else:
        half_step = max((xs[1] - xs[0]) / 2.0, 1e-6)

    padded_x = np.append(np.insert(xs, 0, xs[0] - half_step), xs[-1] + half_step)
    padded_y = np.append(np.insert(ys, 0, ys[0]), ys[-1])
    return padded_x, padded_y
def get_step_data(mode: str, times: np.ndarray, values: np.ndarray,
                  width: float) -> Tuple[np.ndarray, np.ndarray, str]:
    """Pick the padding helper and step alignment for a binning mode.

    ``"forward"`` pads on the right and uses ``where="post"``; ``"back"`` pads
    on the left and uses ``where="pre"``; any other mode pads both sides and
    uses ``where="mid"``.

    Returns:
        ``(x_ext, y_ext, where)`` ready to hand to a step-plot call.
    """
    by_mode = {
        "forward": (extend_right, "post"),
        "back": (extend_left, "pre"),
    }
    extender, where = by_mode.get(mode, (extend_both, "mid"))
    x_ext, y_ext = extender(times, values, width)
    return x_ext, y_ext, where
def calculate_axis_limits(all_times: np.ndarray, all_values: np.ndarray,
                          true_curve: np.ndarray, ma_y: np.ndarray,
                          fb_y: np.ndarray, bb_y: np.ndarray,
                          cb_y: np.ndarray, ob_y: np.ndarray,
                          bin_width: float, start_hour_val: float,
                          end_hour_val: float) -> Tuple[Tuple[float, float], Optional[Tuple[float, float]]]:
    """Derive padded x/y limits that keep every plotted series in view.

    Args:
        all_times: Sample times; their extent drives the x limits.
        all_values, true_curve, ma_y, fb_y, bb_y, cb_y, ob_y: Series whose
            combined extent drives the y limits (empty or non-array entries
            are ignored).
        bin_width: Used as a lower bound for the x padding.
        start_hour_val, end_hour_val: Fallback x range when there are no samples.

    Returns:
        ``(xlim, ylim)``. ``ylim`` is ``None`` when no series has data or the
        extremes are not finite.
    """
    # --- x axis ---
    if all_times.size:
        x_lo = float(all_times.min())
        x_hi = float(all_times.max())
        if x_lo == x_hi:
            x_pad = max(bin_width, 0.25)
        else:
            x_pad = max(AXIS_PADDING_RATIO * (x_hi - x_lo), bin_width * 0.5)
    else:
        # No samples: fall back to the requested hour range, in either order.
        x_lo = min(start_hour_val, end_hour_val)
        x_hi = max(start_hour_val, end_hour_val)
        x_pad = 2 * bin_width
    xlim = (x_lo - x_pad, x_hi + x_pad)

    # --- y axis ---
    candidates = [all_values, true_curve, ma_y, fb_y, bb_y, cb_y, ob_y]
    populated = [np.asarray(series) for series in candidates
                 if isinstance(series, np.ndarray) and series.size]
    if not populated:
        return xlim, None

    y_lo = min(series.min() for series in populated)
    y_hi = max(series.max() for series in populated)
    if not (np.isfinite(y_lo) and np.isfinite(y_hi)):
        return xlim, None

    if y_hi == y_lo:
        y_pad = max(abs(y_hi) * Y_PADDING_RATIO, 1.0)
    else:
        y_pad = AXIS_PADDING_RATIO * (y_hi - y_lo)
    return xlim, (y_lo - y_pad, y_hi + y_pad)
# --- Stage Configuration ---
@dataclass
class StageConfig:
    """Configuration for a learning stage.

    Each field mirrors a reactive value of the same name (``stage_number``
    maps to ``learning_stage``); ``apply_stage_config`` copies the fields
    into those reactive values.
    """
    # Stage identifier written to the ``learning_stage`` reactive.
    stage_number: int
    # Bin width applied when the stage is selected.
    bin_width: float = DEFAULT_BIN_WIDTH
    # Which series are visible.
    show_raw_data: bool = True
    show_true_curve: bool = False
    show_forward: bool = False
    show_back: bool = False
    show_center: bool = False
    show_offset_center: bool = False
    show_moving_average: bool = False
    # Background shading of bins (regular / offset grid).
    show_bins_background: bool = False
    show_offset_bins: bool = False
    # Line rendering of binned series: step lines vs. straight segments, or none.
    use_steps: bool = False
    show_lines: bool = True
    # Layout: True = single overlaid plot, False = stacked panels.
    view_overlaid: bool = True
    # Signal parameters
    mu1: float = 9.25
    sigma1: float = 0.4
    amp1: float = 2.1
    mu2: float = 17.0
    sigma2: float = 1.0
    amp2: float = 1.65
    noise_level: float = 0.3
# Stage configurations as data
# Signal and noise settings that every guided stage resets to.
_STAGE_SIGNAL_BASELINE = {
    "mu1": 9.25, "sigma1": 0.4, "amp1": 2.1,
    "mu2": 17.0, "sigma2": 1.0, "amp2": 1.65,
    "noise_level": 0.3,
}

# Display settings that differ from stage to stage, keyed by stage number.
_STAGE_DISPLAY_SETTINGS = {
    1: dict(
        show_raw_data=True,
        show_true_curve=False,
        show_bins_background=False,
        use_steps=False,
    ),
    2: dict(
        show_raw_data=True,
        show_true_curve=True,
        show_bins_background=False,
    ),
    3: dict(
        show_raw_data=True,
        show_forward=True,
        show_back=True,
        show_bins_background=True,
        use_steps=False,
        show_lines=False,
        view_overlaid=False,
    ),
    4: dict(
        show_raw_data=True,
        show_true_curve=True,
        show_bins_background=True,
        view_overlaid=True,
    ),
    5: dict(
        show_raw_data=True,
        show_true_curve=True,
        show_bins_background=True,
        view_overlaid=True,
    ),
    6: dict(
        show_raw_data=True,
        show_true_curve=True,
        show_bins_background=False,
        view_overlaid=True,
    ),
}

# Guided-stage presets: stage number -> fully populated StageConfig.
STAGE_CONFIGS = {
    number: StageConfig(stage_number=number, **display, **_STAGE_SIGNAL_BASELINE)
    for number, display in _STAGE_DISPLAY_SETTINGS.items()
}
def apply_stage_config(config: StageConfig, state_dict: dict) -> None:
    """Push every setting of ``config`` into the app's reactive state.

    Args:
        config: Stage preset to apply.
        state_dict: Mapping from setting name to the reactive holding it; must
            contain ``'learning_stage'`` plus one entry per display setting.

    The signal parameters are not looked up in ``state_dict``; they are written
    to the module-level reactives directly.
    """
    state_dict['learning_stage'].set(config.stage_number)

    display_fields = (
        'bin_width',
        'show_raw_data',
        'show_true_curve',
        'show_forward',
        'show_back',
        'show_center',
        'show_offset_center',
        'show_moving_average',
        'show_bins_background',
        'show_offset_bins',
        'use_steps',
        'show_lines',
        'view_overlaid',
    )
    for field_name in display_fields:
        state_dict[field_name].set(getattr(config, field_name))

    # Reset signal parameters
    signal_updates = (
        (mu1, config.mu1),
        (sigma1, config.sigma1),
        (amp1, config.amp1),
        (mu2, config.mu2),
        (sigma2, config.sigma2),
        (amp2, config.amp2),
        (noise_level, config.noise_level),
    )
    for reactive, new_value in signal_updates:
        reactive.set(new_value)
# --- Solara reactive state ---
# Double Gaussian parameters (centre, width, amplitude of each peak) and the
# standard deviation of the noise added to the samples.
mu1 = solara.reactive(9.25)
sigma1 = solara.reactive(0.4)
amp1 = solara.reactive(2.1)
mu2 = solara.reactive(17.0)
sigma2 = solara.reactive(1.0)
amp2 = solara.reactive(1.65)
noise_level = solara.reactive(0.3)
# Data/time parameters: number of simulated days, sampling window in hours,
# and the mean gap / jitter between samples in minutes.
num_days = solara.reactive(30)
start_hour = solara.reactive(8.0)
end_hour = solara.reactive(17.0)
mean_interval_minutes = solara.reactive(42)
interval_noise_minutes = solara.reactive(10)
# Binning: width shared by every binning method and by the moving average.
bin_width = solara.reactive(DEFAULT_BIN_WIDTH)
# Display toggles read by the plotting functions.
show_raw_data = solara.reactive(True)
show_true_curve = solara.reactive(True)
show_moving_average = solara.reactive(False)
show_forward = solara.reactive(False)
show_back = solara.reactive(False)
show_center = solara.reactive(False)
show_offset_center = solara.reactive(False)
show_bins_background = solara.reactive(True)
show_offset_bins = solara.reactive(False)
use_steps = solara.reactive(False)
show_lines = solara.reactive(True)
# Layout mode: True = overlaid, False = stacked
view_overlaid = solara.reactive(True)
# Learning mode system: whether the guided presets are shown, and which
# stage is currently selected.
learning_mode = solara.reactive(True)
learning_stage = solara.reactive(1)
# State dictionary for easy access: maps setting names to the reactives above
# so apply_stage_config can update them by name. The signal parameters are
# deliberately absent; apply_stage_config sets those directly.
STATE_DICT = {
    'learning_stage': learning_stage,
    'bin_width': bin_width,
    'show_raw_data': show_raw_data,
    'show_true_curve': show_true_curve,
    'show_forward': show_forward,
    'show_back': show_back,
    'show_center': show_center,
    'show_offset_center': show_offset_center,
    'show_moving_average': show_moving_average,
    'show_bins_background': show_bins_background,
    'show_offset_bins': show_offset_bins,
    'use_steps': use_steps,
    'show_lines': show_lines,
    'view_overlaid': view_overlaid,
}
def create_dataset_key() -> Tuple:
    """Snapshot every reactive input of the dataset as a hashable tuple.

    The tuple holds the seven signal/noise parameters as floats, followed by
    the day count (int), the start and end hours (floats) and the sampling
    interval and jitter (ints).
    """
    signal_reactives = (mu1, sigma1, amp1, mu2, sigma2, amp2, noise_level)
    signal_part = tuple(float(reactive.value) for reactive in signal_reactives)
    sampling_part = (
        int(num_days.value),
        float(start_hour.value),
        float(end_hour.value),
        int(mean_interval_minutes.value),
        int(interval_noise_minutes.value),
    )
    return signal_part + sampling_part
def generate_dataset() -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Return the simulated multi-day samples together with the noise-free curve.

    The result is cached against the current reactive inputs, so repeated
    calls with unchanged settings return the same arrays instead of drawing
    new random samples.

    Returns:
        ``(all_times, all_values, true_times, true_curve)``: the pooled sample
        times and noisy values of every day, followed by a dense time grid and
        the double-Gaussian curve evaluated on it.
    """
    key = create_dataset_key()
    hit = _dataset_cache.get(key)
    if hit is not None:
        return hit

    def _signal(t: np.ndarray) -> np.ndarray:
        return double_gaussian_function(
            t, mu1.value, sigma1.value, amp1.value,
            mu2.value, sigma2.value, amp2.value,
        )

    time_chunks = []
    value_chunks = []
    for _ in range(num_days.value):
        day_times = generate_uneven_times_for_day(
            start_hour.value,
            end_hour.value,
            mean_interval_minutes.value,
            interval_noise_minutes.value,
        )
        clean_values = _signal(day_times)
        noise = np.random.normal(0, noise_level.value, len(clean_values))
        time_chunks.append(day_times)
        value_chunks.append(clean_values + noise)

    # np.concatenate rejects an empty list, so zero days yields empty arrays.
    all_times = np.concatenate(time_chunks) if time_chunks else np.array([])
    all_values = np.concatenate(value_chunks) if value_chunks else np.array([])

    # True underlying curve on a dense grid
    true_times = np.linspace(start_hour.value, end_hour.value, TRUE_CURVE_RESOLUTION)
    true_curve = _signal(true_times)

    dataset = (all_times, all_values, true_times, true_curve)
    _dataset_cache.set(key, dataset)
    return dataset
@dataclass
class BinnedData:
    """Container for all binned data calculations.

    Each binning method contributes a (times, values, errors) triple holding
    one entry per occupied bin; the moving average contributes (times, values).
    """
    # Forward-aligned bins (labelled by the bin's left edge).
    forward_times: np.ndarray
    forward_values: np.ndarray
    forward_errors: np.ndarray
    # Back-aligned bins (labelled by the bin's right edge).
    back_times: np.ndarray
    back_values: np.ndarray
    back_errors: np.ndarray
    # Center-aligned bins (labelled by the bin midpoint).
    center_times: np.ndarray
    center_values: np.ndarray
    center_errors: np.ndarray
    # Offset-center bins (edges shifted by half a bin, labelled by midpoint).
    offset_times: np.ndarray
    offset_values: np.ndarray
    offset_errors: np.ndarray
    # Moving average evaluated at every sample time.
    ma_times: np.ndarray
    ma_values: np.ndarray
def calculate_all_binning(all_times: np.ndarray, all_values: np.ndarray,
                          bin_width: float) -> BinnedData:
    """Run every binning method, plus the moving average, over the full day.

    Args:
        all_times: Sample times in hours.
        all_values: Sample values.
        bin_width: Bin width, also used as the moving-average window.

    Returns:
        A ``BinnedData`` holding the output of each method.
    """
    binners = (
        ("forward", forward_binned_average),
        ("back", back_binned_average),
        ("center", center_binned_average),
        ("offset", offset_center_binned_average),
    )
    fields = {}
    for prefix, binner in binners:
        bin_times, bin_means, bin_errors = binner(
            all_times, all_values, DAY_START_HOUR, DAY_END_HOUR, bin_width
        )
        fields[f"{prefix}_times"] = bin_times
        fields[f"{prefix}_values"] = bin_means
        fields[f"{prefix}_errors"] = bin_errors

    fields["ma_times"], fields["ma_values"] = moving_average(
        all_times, all_values, window_size=bin_width
    )
    return BinnedData(**fields)
def plot_binned_series(ax, times: np.ndarray, values: np.ndarray,
                       label: str, color: str, marker: str,
                       mode: str, width: float) -> None:
    """Draw one binned series as markers, optionally joined by lines or steps.

    Args:
        ax: Axes to draw on.
        times, values: Bin positions and bin averages; nothing is drawn when
            either is empty.
        label: Legend label for the series.
        color, marker: Matplotlib colour and marker code.
        mode: Binning mode handed to ``get_step_data`` for step alignment.
        width: Bin width used to pad the step outline.

    The ``show_lines`` and ``use_steps`` reactives decide whether the markers
    are joined and whether the join is a step outline or straight segments.
    """
    xs = np.asarray(times)
    ys = np.asarray(values)
    if xs.size == 0 or ys.size == 0:
        return

    lines_enabled = show_lines.value

    def _draw_markers(legend_label: str) -> None:
        ax.plot(xs, ys, marker, c=color, ms=DEFAULT_MARKER_SIZE,
                linestyle="None", label=legend_label)

    # With lines on, the line artist carries the legend entry instead.
    _draw_markers("_nolegend_" if lines_enabled else label)
    if not lines_enabled:
        return

    if not (use_steps.value and xs.size >= 1):
        ax.plot(xs, ys, f"{marker}-", label=label,
                c=color, ms=DEFAULT_MARKER_SIZE, lw=DEFAULT_LINE_WIDTH)
        return

    x_ext, y_ext, where = get_step_data(mode, xs, ys, width)
    if x_ext.size == 0:
        return
    ax.step(x_ext, y_ext, where=where, label=label,
            c=color, lw=DEFAULT_LINE_WIDTH)
    # Re-plot markers on top
    _draw_markers("_nolegend_")
def add_grid_to_axes(axes: list) -> None:
    """Give each axes faint vertical grid lines at a fixed major-tick spacing."""
    grid_style = dict(which='major', axis='x', color='#e0e0e0',
                      linestyle='-', linewidth=0.5)
    for axis in axes:
        axis.grid(True, **grid_style)
        axis.xaxis.set_major_locator(MultipleLocator(GRID_LINE_INTERVAL))
def plot_common_data(ax, all_times: np.ndarray, all_values: np.ndarray,
                     true_times: np.ndarray, true_curve: np.ndarray,
                     ma_x: np.ndarray, ma_y: np.ndarray,
                     regular_bin_edges: np.ndarray, offset_bin_edges: np.ndarray,
                     highlight_offset_bins: bool = False) -> None:
    """Draw the layers shared by every panel.

    In order: bin background shading, raw samples, the noise-free curve and
    the moving average. Each layer is drawn only if its reactive toggle is on.

    Args:
        ax: Axes to draw on.
        all_times, all_values: Raw samples.
        true_times, true_curve: Noise-free curve.
        ma_x, ma_y: Moving average.
        regular_bin_edges: Edges shaded when bin backgrounds are enabled.
        offset_bin_edges: Edges shaded instead when ``highlight_offset_bins``
            is set and the offset-center series is visible.
        highlight_offset_bins: Replace the regular shading with the offset grid.
    """
    regular_shading_on = show_bins_background.value
    if highlight_offset_bins:
        if show_offset_center.value:
            show_bins(ax, offset_bin_edges, color="#f3e5f5", color2="#ffffff", alpha=0.6)
    elif regular_shading_on:
        show_bins(ax, regular_bin_edges, color="lightgray", alpha=0.4)

    if show_raw_data.value:
        ax.plot(all_times, all_values, "o", label="Noisy data",
                markersize=SMALL_MARKER_SIZE, c="0.7", mec="none")
    if show_true_curve.value:
        ax.plot(true_times, true_curve, label="True curve", c="k", lw=1)
    if show_moving_average.value:
        ax.plot(ma_x, ma_y, label="Moving average", c="0.3", lw=2)
def create_stacked_plots(binned: BinnedData, all_times: np.ndarray,
                         all_values: np.ndarray, true_times: np.ndarray,
                         true_curve: np.ndarray, regular_bin_edges: np.ndarray,
                         offset_bin_edges: np.ndarray, xlim: Tuple, ylim: Optional[Tuple],
                         bw: float):
    """Build the four-panel layout with one binning method per panel.

    Panels, top to bottom: forward, back, center, offset-center. Every panel
    gets the shared layers; its binned series is added only when the matching
    toggle is on. All panels share the supplied axis limits.

    Returns:
        The Matplotlib figure.
    """
    # (title, toggle, bin times, bin values, (label, colour, marker, step mode))
    panels = (
        ("Forward binned average", show_forward,
         binned.forward_times, binned.forward_values,
         ("Forward", "tab:blue", "s", "forward")),
        ("Back binned average", show_back,
         binned.back_times, binned.back_values,
         ("Back", "tab:green", "^", "back")),
        ("Center binned average", show_center,
         binned.center_times, binned.center_values,
         ("Center", "tab:red", "d", "center")),
        ("Offset center binned average", show_offset_center,
         binned.offset_times, binned.offset_values,
         ("Offset center", "tab:purple", "P", "center")),
    )

    fig, axs = plt.subplots(4, 1, figsize=(8, 6.4), sharex=True)
    axes = list(axs)
    add_grid_to_axes(axes)

    for ax, (title, toggle, bin_times, bin_values, series_style) in zip(axes, panels):
        # Only the offset panel swaps the regular shading for the offset grid.
        highlight_offset = bool(title.startswith("Offset") and show_offset_center.value)
        plot_common_data(ax, all_times, all_values, true_times, true_curve,
                         binned.ma_times, binned.ma_values, regular_bin_edges,
                         offset_bin_edges, highlight_offset)
        if toggle.value:
            plot_binned_series(ax, bin_times, bin_values, *series_style, bw)
        ax.set_ylabel("Measured value")
        ax.set_title(title)
        ax.legend(fontsize=8)

    # Apply shared limits
    for ax in axes:
        ax.set_xlim(*xlim)
        if ylim is not None:
            ax.set_ylim(*ylim)

    axes[-1].set_xlabel("Time of day (hours)")
    fig.tight_layout()
    return fig
def create_overlaid_plot(binned: BinnedData, all_times: np.ndarray,
                         all_values: np.ndarray, true_times: np.ndarray,
                         true_curve: np.ndarray, regular_bin_edges: np.ndarray,
                         offset_bin_edges: np.ndarray, xlim: Tuple, ylim: Optional[Tuple],
                         bw: float):
    """Build the single-axes layout with every enabled series drawn together.

    The offset bin grid replaces the regular shading only when both the
    "offset bins" and "offset-center" toggles are on.

    Returns:
        The Matplotlib figure.
    """
    fig, ax = plt.subplots(1, 1, figsize=(8, 4), sharex=True)
    add_grid_to_axes([ax])

    highlight_offset = show_offset_bins.value and show_offset_center.value
    plot_common_data(ax, all_times, all_values, true_times, true_curve,
                     binned.ma_times, binned.ma_values, regular_bin_edges,
                     offset_bin_edges, highlight_offset)

    # (toggle, bin times, bin values, (label, colour, marker, step mode))
    series = (
        (show_forward, binned.forward_times, binned.forward_values,
         ("Forward", "tab:blue", "s", "forward")),
        (show_back, binned.back_times, binned.back_values,
         ("Back", "tab:green", "^", "back")),
        (show_center, binned.center_times, binned.center_values,
         ("Center", "tab:red", "d", "center")),
        (show_offset_center, binned.offset_times, binned.offset_values,
         ("Offset center", "tab:purple", "P", "center")),
    )
    for toggle, bin_times, bin_values, series_style in series:
        if toggle.value:
            plot_binned_series(ax, bin_times, bin_values, *series_style, bw)

    ax.set_xlim(*xlim)
    if ylim is not None:
        ax.set_ylim(*ylim)
    ax.set_xlabel("Time of day (hours)")
    ax.set_ylabel("Measured value")
    ax.legend(fontsize=8)
    fig.tight_layout()
    return fig
def make_figure():
    """Assemble the figure for the current control settings.

    Fetches the dataset, bins it with the clamped bin width, derives the
    shading grids and axis limits, and delegates to the overlaid or stacked
    layout depending on ``view_overlaid``.

    Returns:
        The Matplotlib figure produced by the chosen layout builder.
    """
    all_times, all_values, true_times, true_curve = generate_dataset()

    # Guard against a zero or tiny width, which would make np.arange explode.
    bw = max(bin_width.value, MIN_BIN_WIDTH)
    binned = calculate_all_binning(all_times, all_values, bw)

    half_bin = bw / 2.0
    regular_bin_edges = np.arange(DAY_START_HOUR, DAY_END_HOUR + bw + EPSILON, bw)
    offset_bin_edges = np.arange(
        DAY_START_HOUR - half_bin,
        DAY_END_HOUR + half_bin + EPSILON,
        bw,
    )

    xlim, ylim = calculate_axis_limits(
        all_times, all_values, true_curve, binned.ma_values,
        binned.forward_values, binned.back_values,
        binned.center_values, binned.offset_values,
        bw, start_hour.value, end_hour.value,
    )

    build_layout = create_overlaid_plot if view_overlaid.value else create_stacked_plots
    return build_layout(binned, all_times, all_values, true_times,
                        true_curve, regular_bin_edges, offset_bin_edges,
                        xlim, ylim, bw)
# --- UI Components ---
@solara.component
def ControlsPanel():
    """Render the layout, signal, binning, and sampling controls.

    Every widget is bound directly to a module-level reactive value, so a
    change here updates the shared state read by the dataset and plot code.
    """
    with solara.Column():
        solara.Markdown("""### Controls""")
        with solara.Column():
            # The stacked layout draws four panels (forward, back, center,
            # offset-center), so the label must say 4, not 3.
            solara.Checkbox(label="Use single overlaid plot (uncheck for 4 stacked panels)",
                            value=view_overlaid)
        # NOTE(review): the labels below were mis-encoded in the source. The
        # Greek letters and subscripts are recovered from context; the two
        # emoji prefixes are a best guess. Confirm the intended glyphs.
        with solara.Details("🎛️ Signal parameters (peak 1 & 2)"):
            solara.SliderFloat("Peak 1 time (μ₁)", value=mu1, min=8.0, max=17.0, step=0.1)
            solara.SliderFloat("Peak 1 width (σ₁)", value=sigma1, min=0.1, max=2.0, step=0.1)
            solara.SliderFloat("Peak 1 amplitude (A₁)", value=amp1, min=0.0, max=3.0, step=0.1)
            solara.SliderFloat("Peak 2 time (μ₂)", value=mu2, min=8.0, max=24.0, step=0.1)
            solara.SliderFloat("Peak 2 width (σ₂)", value=sigma2, min=0.1, max=3.0, step=0.1)
            solara.SliderFloat("Peak 2 amplitude (A₂)", value=amp2, min=0.0, max=3.0, step=0.1)
            solara.SliderFloat("Noise level", value=noise_level, min=0.0, max=2.0, step=0.1)
        solara.Markdown("""**Binning and averaging**""")
        solara.SliderFloat("Bin width (hours)", value=bin_width, min=0.25, max=4.0, step=0.25)
        with solara.Details("📊 Sampling details"):
            solara.SliderFloat("Start hour", value=start_hour, min=0.0, max=12.0, step=0.5)
            solara.SliderFloat("End hour", value=end_hour, min=12.0, max=24.0, step=0.5)
            solara.SliderInt("Mean interval (min)", value=mean_interval_minutes,
                             min=10, max=120, step=5)
            solara.SliderInt("Interval noise (min)", value=interval_noise_minutes,
                             min=0, max=60, step=5)
def hex_is_dark(hex_color: str) -> bool:
    """Report whether a hex colour is dark enough to need light text.

    Args:
        hex_color: Colour as ``"#rgb"``, ``"#rrggbb"``, or the same without
            the leading ``#``.

    Returns:
        ``True`` when the weighted luminance (0.299 R + 0.587 G + 0.114 B) is
        below 128. Empty, wrongly sized, or non-hexadecimal input returns
        ``False`` instead of raising.
    """
    if not hex_color:
        return False
    digits = hex_color.lstrip("#")
    if len(digits) == 3:
        # Expand shorthand "abc" to "aabbcc".
        digits = "".join(ch * 2 for ch in digits)
    if len(digits) != 6:
        return False
    try:
        r, g, b = (int(digits[i:i + 2], 16) for i in (0, 2, 4))
    except ValueError:
        # Treat unparsable colours like any other malformed input.
        return False
    luminance = 0.299 * r + 0.587 * g + 0.114 * b
    return luminance < 128
@solara.component
def ToggleChip(label: str, reactive_value: solara.Reactive[bool],
               color: str, text_color: str = "#000000"):
    """Render a button that flips a boolean reactive and shows its state.

    Args:
        label: Button caption.
        reactive_value: Boolean reactive toggled on click.
        color: Fill colour used while the value is ``True``.
        text_color: Caption colour used while the value is ``True``.

    While the value is ``False`` the button is outlined on a grey base and the
    caption uses ``color`` itself, or white when ``color`` is dark.
    """
    is_on = reactive_value.value

    def flip():
        reactive_value.set(not reactive_value.value)

    off_caption_color = "#ffffff" if hex_is_dark(color) else color
    if is_on:
        fill = color
        caption_color = text_color
        border = "1px solid #ffffff"
    else:
        fill = "#555555"
        caption_color = off_caption_color
        border = f"1px solid {caption_color}"

    return solara.Button(
        label=label,
        color=fill,
        outlined=not is_on,
        class_="ma-1",
        style={
            "font-weight": "bold",
            "color": caption_color,
            "min-width": "150px",
            "border": border,
        },
        on_click=flip,
    )
@solara.component
def PresetButtons():
    """Render the guided-stage preset buttons followed by the contextual controls.

    The preset rows are shown only while ``learning_mode`` is on. The button
    for the currently selected stage is drawn filled; the others are outlined.
    """
    def apply_stage(stage_num: int):
        """Apply the preset for ``stage_num``; stage 7 leaves learning mode."""
        if stage_num in STAGE_CONFIGS:
            apply_stage_config(STAGE_CONFIGS[stage_num], STATE_DICT)
        elif stage_num == 7:
            learning_stage.set(7)
            learning_mode.set(False)

    # NOTE(review): the emoji, star, and arrow glyphs below were mis-encoded
    # in the source and are restored here; confirm they match the intended
    # labels.
    with solara.Column():
        if learning_mode.value:
            solara.Markdown("""**🎯 Quick Presets**""")
            with solara.Row():
                solara.Button("1. The Problem", on_click=lambda: apply_stage(1),
                              outlined=learning_stage.value != 1)
                solara.Button("2. The Solution", on_click=lambda: apply_stage(2),
                              outlined=learning_stage.value != 2)
                solara.Button("3. Forward vs Back", on_click=lambda: apply_stage(3),
                              outlined=learning_stage.value != 3)
            with solara.Row():
                solara.Button("4. Center Problem", on_click=lambda: apply_stage(4),
                              outlined=learning_stage.value != 4)
                solara.Button("5. Offset-Center ⭐", on_click=lambda: apply_stage(5),
                              outlined=learning_stage.value != 5)
                solara.Button("6. Moving Average", on_click=lambda: apply_stage(6),
                              outlined=learning_stage.value != 6)
            with solara.Row():
                solara.Button("Skip to Free Exploration →", on_click=lambda: apply_stage(7),
                              color="primary")
        # NOTE(review): source indentation was lost; this call is assumed to
        # sit in the column but outside the learning-mode branch. Confirm.
        ContextualControls()
@solara.component
def PlotControlsPanel():
    """Render the toggle chips that choose which series and overlays are drawn."""
    # Each entry: (caption, reactive toggle, active fill colour, active text colour).
    chip_rows = (
        (
            ("Noisy data", show_raw_data, "#e0e0e0", "#000000"),
            ("True curve", show_true_curve, "#000000", "#ffffff"),
            ("Moving average", show_moving_average, "#555555", "#ffffff"),
        ),
        (
            ("Forward binned", show_forward, "#bbdefb", "#000000"),
            ("Back binned", show_back, "#c8e6c9", "#000000"),
            ("Center binned", show_center, "#ffcdd2", "#000000"),
            ("Offset-center binned", show_offset_center, "#d1c4e9", "#000000"),
        ),
        (
            ("Bin backgrounds", show_bins_background, "#eeeeee", "#000000"),
            ("Offset bins", show_offset_bins, "#d9c4f0", "#000000"),
            ("Step lines", use_steps, "#90a4ae", "#000000"),
            ("Show lines", show_lines, "#78909c", "#ffffff"),
        ),
    )
    with solara.Column():
        solara.Markdown("""### View & series""")
        for chips in chip_rows:
            with solara.Row():
                for caption, toggle, fill, ink in chips:
                    ToggleChip(caption, toggle, color=fill, text_color=ink)
@solara.component
def CurrentLessonPanel():
    """Show the title, explanation, and call to action for the current stage.

    Content is looked up by ``learning_stage``; an unknown stage falls back to
    the stage 1 content.
    """
    stage = learning_stage.value
    # Each "text" is Markdown; paragraphs are separated by a blank line.
    stage_content = {
        1: {
            "title": "Stage 1: The Problem (Noise)",
            "text": (
                "The plot shows scattered gray points representing raw, noisy measurements.\n"
                "\n**Goal**: Recognize that raw data is noisy and the true signal is often hidden."
            ),
            "action": "Follow the steps in 'Try This!' to explore the noise."
        },
        2: {
            "title": "Stage 2: The Solution (Binning)",
            "text": (
                "We can reduce noise by grouping points into time windows ('bins') and averaging them.\n"
                "\n**Goal**: Understand how binning reduces noise to reveal the trend."
            ),
            "action": "Follow the steps to see how binning works."
        },
        3: {
            "title": "Stage 3: The Alignment Problem",
            "text": (
                "You're looking at **Forward** (blue squares) and **Back** (green triangles).\n"
                "\nThey both calculate the **exact same averages**, but plot them at different times.\n"
                "\n**Goal**: Discover that the *same* data can look different depending on where we plot the average."
            ),
            "action": "Follow the steps in 'Try This!' to see the connection."
        },
        4: {
            "title": "Stage 4: Symmetry & Lag",
            "text": (
                "Asymmetric bins (Forward/Back) shift features in time. Symmetric bins (Center) align correctly.\n"
                "\n**Goal**: Understand **Time Lag** and **Labeling**. The problem with Forward/Back binning is often just a labeling issue (where we put the timestamp)."
            ),
            "action": "Follow the steps to investigate the lag effect."
        },
        5: {
            # The star was mis-encoded in the source; restored here.
            "title": "Stage 5: Best Practice - Offset-Center ⭐",
            "text": (
                "**Offset-Center** combines the accuracy of symmetric binning with the convenience of clean timestamps.\n"
                "\n**Goal**: Motivate why Offset-Center is the standard for time-series data."
            ),
            "action": "Follow the steps to see how timestamps are aligned."
        },
        6: {
            "title": "Stage 6: Binning vs. Moving Average",
            "text": (
                "Compare **discrete** binning (one point per window) vs. **continuous** moving average (smooth sliding window).\n"
                "\n**Goal**: Understand the difference between discrete and continuous smoothing methods."
            ),
            "action": "Follow the steps to compare both methods."
        },
        7: {
            "title": "Stage 7: Free Exploration",
            "text": (
                "You've completed the guided tour! All controls are now unlocked for you to experiment.\n"
                "\nTry different scenarios, adjust the underlying signal, compare methods, and see if you can find situations where one method is clearly better than another."
            ),
            "action": "Experiment freely! Try extreme bin widths, sparse data, or a signal with very sharp peaks."
        }
    }
    content = stage_content.get(stage, stage_content[1])
    with solara.Card(title=content["title"],
                     style="background-color: #2d2d2d; margin-bottom: 20px; padding: 20px;"):
        solara.Markdown(f"{content['text']}")
        solara.Info(content["action"], text=True)
# Markdown bodies for ExplanationsPanel.
#
# They are raw strings so LaTeX commands such as ``\le`` keep their backslash.
# Because they are raw, quotes must NOT be backslash-escaped: the backslash
# would stay in the text and be displayed. Block elements (paragraphs, lists,
# tables, rules, headings) are separated by blank lines and nested bullets are
# indented four spaces so the Markdown renderer recognises them as such.
_EXPLAIN_BINNING_BASICS_MD = r"""
### What is Binning?

**Binning** is a powerful technique for reducing noise and revealing underlying trends in data. The process involves three steps:

1. **Divide Time**: The total time range is divided into discrete, equal-sized windows called "bins".
2. **Group Data**: Each raw data point is assigned to the bin it falls into.
3. **Average**: The values of all points within a single bin are averaged to produce one representative value for that bin.

The **Bin Width** is a critical parameter. It controls the trade-off between noise reduction and signal detail.

- **Large Bins**: Produce a very smooth line, but may blur or hide narrow features in the data.
- **Small Bins**: Preserve more detail, but the resulting line will be noisier because fewer points are averaged in each bin.
"""

_EXPLAIN_ALIGNMENT_MD = r"""
### Bin Alignment: Where does the point go?

After calculating a bin's average, we must decide at what time to plot that point. This is the "alignment" of the bin.

- **Forward Alignment**: Plots the average at the **start** of the bin interval.
    - *Interval*: $[t, t+w)$
    - *Plots at*: $t$
    - *Use Case*: Good for forecasting, as the point at time $t$ represents what happens *after* $t$.
- **Back Alignment**: Plots the average at the **end** of the bin interval.
    - *Interval*: $(t-w, t]$
    - *Plots at*: $t$
    - *Use Case*: Good for summarizing past performance, as the point at time $t$ represents what happened *before* $t$.
- **Center Alignment**: Plots the average at the **midpoint** of the bin interval.
    - *Interval*: $[t, t+w)$
    - *Plots at*: $t + w/2$
    - *Use Case*: Excellent for trend analysis, as the window of data is **symmetric** around the plotted point.
- **Offset-Center Alignment (Recommended)**: Shifts the entire bin definition to ensure the symmetric point lands on a convenient time (like the top of the hour).
    - *Interval*: $[t - w/2, t + w/2)$
    - *Plots at*: $t$
    - *Use Case*: The most common and often most intuitive method for time-of-day analysis.
"""

_EXPLAIN_MOVING_AVERAGE_MD = r"""
### Binning vs. Moving Average

Binning and Moving Average are both methods for smoothing data, but they are fundamentally different.

#### Binning

- **Discrete**: Divides time into a fixed, global grid of bins. Each data point belongs to exactly one bin.
- **Efficient**: The calculation is very fast (linear time, $O(N)$).
- **Output**: A set of discrete points. There will be gaps if a bin contains no data.

#### Moving Average

- **Continuous**: Creates a unique sliding window centered on *every single data point*.
- **Slower**: The calculation is much slower (quadratic time, $O(N^2)$) because each point requires its own calculation.
- **Output**: A continuous, smooth curve that follows the density of the input data.

**Choose binning** when you need statistically robust summaries for fixed time intervals (e.g., "average sales per hour").

**Choose moving average** when you want a smooth visualization of a trend without being tied to a fixed clock schedule.
"""

_EXPLAIN_FULL_REFERENCE_MD = r"""
### Complete Binning Reference

A bin of width $w$ can be represented by the interval $[t_{start}, t_{end})$. The alignment determines where the average value is plotted.

| Method | Interval | Plots At | Symmetry |
|---|---|---|---|
| **Forward** | $[t, t+w)$ | $t$ | Asymmetric (Future) |
| **Back** | $(t-w, t]$ | $t$ | Asymmetric (Past) |
| **Center** | $[t, t+w)$ | $t + w/2$ | Symmetric |
| **Offset-Center** | $[t-w/2, t+w/2)$ | $t$ | Symmetric |

---

#### Moving Average

- For each point $p_i$ at time $t_i$, it averages all points $p_j$ where $t_j$ is in the window $(t_i - w/2, t_i + w/2)$.

---

#### Choosing a Bin Width ($w$)

- **Nyquist-Shannon Principle**: Your bin width should be, at most, half the duration of the smallest feature you want to resolve. If a peak lasts 2 hours, your bin width should be $\le$ 1 hour.
- **Noise Level**: The higher the noise, the larger the bin width needed to average it out.
- **Experimentation**: There is no single perfect answer. Start wide to find the general trend, then make the bins narrower to resolve more detail.
"""


@solara.component
def ExplanationsPanel():
    """Render the detailed explanation text for the current learning stage.

    The text is chosen from ``learning_stage.value``:

    * stage <= 2      -> what binning is and the bin-width trade-off
    * stage 3, 4 or 5 -> the four bin alignments
    * stage 6         -> binning compared with a moving average
    * anything else   -> the complete reference table
    """
    stage = learning_stage.value

    if stage <= 2:
        text = _EXPLAIN_BINNING_BASICS_MD
    elif stage in (3, 4, 5):
        text = _EXPLAIN_ALIGNMENT_MD
    elif stage == 6:
        text = _EXPLAIN_MOVING_AVERAGE_MD
    else:
        text = _EXPLAIN_FULL_REFERENCE_MD

    solara.Markdown(text)
@solara.component
def ContextualControls():
    """Render the "Try This!" card holding the controls for the active stage.

    Each of stages 1-6 gets three guided steps: short Markdown prompts
    interleaved with the checkbox/slider the step asks the learner to use.
    For any other stage value the card is rendered with no content.
    """

    def say(*paragraphs):
        # One Markdown element per paragraph, emitted in the order given.
        for paragraph in paragraphs:
            solara.Markdown(paragraph)

    def sync_step_plot(checked):
        # The "Show lines (Step Plot)" checkbox drives both flags together.
        use_steps.set(checked)
        show_lines.set(checked)

    current = learning_stage.value

    with solara.Card("Try This!", style="padding: 15px; margin-top: 20px;"):
        if current == 1:
            say(
                "**Step 1: Observation**",
                "Look at the gray data points. Can you see a pattern, or does it just look like a cloud?",
                "**Step 2: Reveal the Truth**",
            )
            solara.Checkbox(label="Show true curve", value=show_true_curve)
            say(
                "**Step 3: Noise Level**",
                "Increase the noise. At what point does the pattern disappear entirely?",
            )
            solara.SliderFloat("Noise level", value=noise_level, min=0.0, max=2.0, step=0.1)

        elif current == 2:
            say(
                "**Step 1: The Mechanism**",
                "Turn on the bin boundaries to see how we group points into 'buckets'.",
            )
            solara.Checkbox(label="Show bin boundaries", value=show_bins_background)
            say(
                "**Step 2: The Result**",
                "Calculate the average for each bucket.",
            )
            solara.Checkbox(label="Show binned data (Center)", value=show_center)
            say(
                "**Step 3: Trade-offs**",
                "Adjust the Bin Width. What happens to detail vs. smoothness?",
            )
            solara.SliderFloat("Bin width (hours)", value=bin_width, min=0.25, max=4.0, step=0.25)
            solara.Checkbox(label="Show raw data", value=show_raw_data)

        elif current == 3:
            say(
                "**Step 1: Ambiguity**",
                "Here are two ways to bin the *exact same data*. Why don't they line up?",
                "**Step 2: The Connection**",
                "Turn on the step plot. Notice that they are actually the same 'steps', just labeled differently.",
            )
            solara.Checkbox(label="Show lines (Step Plot)", value=show_lines, on_value=sync_step_plot)
            say(
                "**Step 3: Reveal the Truth**",
                "Compare with true line.",
            )
            solara.Checkbox(label="Show true curve", value=show_true_curve)

        elif current == 4:
            say(
                "**Step 1: Forward Lag**",
                "Show Forward binning. Look at the peak. Does the blue square appear *before* (left) or *after* (right) the true peak?",
            )
            solara.Checkbox(label="Show Forward", value=show_forward)
            say(
                "**Step 2: Center Alignment**",
                "Now show Center binning. Does the red diamond align better with the peak?",
            )
            solara.Checkbox(label="Show Center", value=show_center)
            say(
                "**Step 3: The Deception**",
                "Turn on the Step Plot. Notice how the 'Forward' bin is just the 'Center' bin shifted? The problem is just where we put the dot (the timestamp).",
            )
            solara.Checkbox(label="Show lines (Step Plot)", value=show_lines, on_value=sync_step_plot)

        elif current == 5:
            say(
                "**Step 1: The Timestamp Problem**",
                "Show Center binning. Do the red diamonds land on the hour (9:00) or half-hour (9:30)?",
            )
            solara.Checkbox(label="Show Center", value=show_center)
            say(
                "**Step 2: The Solution**",
                "Switch to Offset-Center. This shifts the bin window so the average is plotted exactly on the hour.",
            )
            solara.Checkbox(label="Show Offset-Center", value=show_offset_center)
            say(
                "**Step 3: Visualizing the Shift**",
                "Turn on 'Offset Bins'. See how the purple shaded boxes are shifted?",
            )
            solara.Checkbox(label="Show offset bins", value=show_offset_bins)

        elif current == 6:
            say(
                "**Step 1: Discrete Steps**",
                "Binning gives one point per window. It creates a 'blocky' approximation.",
            )
            solara.Checkbox(label="Show Offset-Center", value=show_offset_center)
            say(
                "**Step 2: Continuous Flow**",
                "A Moving Average slides the window smoothly, calculating an average at every point.",
            )
            solara.Checkbox(label="Show Moving Average", value=show_moving_average)
            say(
                "**Step 3: Compare**",
                "Adjust width. Does the Moving Average show the same trends as the binned data?",
            )
            solara.SliderFloat("Bin/Window width (hours)", value=bin_width, min=0.25, max=4.0, step=0.25)
@solara.component
def Page():
    """Main application page.

    The layout depends on ``learning_mode.value``:

    * Learning mode: the current lesson panel, then preset buttons beside the
      plot, then the full controls beside the plot controls.
    * Free exploration: the full controls on the left; the plot and the plot
      controls on the right. A "Return to Learning Mode" button is shown in
      the header, which re-enables learning mode and resets the stage to 1.

    Both modes end with a collapsible explanations section and a footer.
    """
    with solara.Column():
        # Header with learning mode toggle
        with solara.Row(style="align-items: center; margin-bottom: 10px;"):
            solara.Markdown("## Interactive Binning and Averaging Demo")
            with solara.Row(style="margin-left: auto;"):
                if not learning_mode.value:

                    def return_to_learning():
                        learning_mode.set(True)
                        learning_stage.set(1)

                    solara.Button(
                        "Return to Learning Mode",
                        on_click=return_to_learning,
                        outlined=True,
                    )

        if learning_mode.value:
            # Show current lesson panel if in learning mode
            CurrentLessonPanel()

            # Two-column: Preset buttons (left) | Plot (right)
            with solara.Row(margin=4):
                with solara.Column():
                    PresetButtons()
                # NOTE(review): the sibling Row passes an integer margin step;
                # confirm that the string "1rem" has the intended effect here.
                with solara.Column(margin="1rem"):
                    fig = make_figure()
                    solara.FigureMatplotlib(fig)

            # Two-column: Full controls (left) | Plot controls (right)
            with solara.Row():
                with solara.Column(style={"width": "300px", "flex-shrink": "0"}):
                    ControlsPanel()
                with solara.Column(style={"flex": "1", "min-width": "0"}):
                    PlotControlsPanel()
        else:
            # Free exploration mode: Two-column layout
            with solara.Row():
                with solara.Column(style={"width": "300px", "flex-shrink": "0"}):
                    ControlsPanel()
                with solara.Column(style={"flex": "1", "min-width": "0"}):
                    fig = make_figure()
                    solara.FigureMatplotlib(fig)
                    PlotControlsPanel()

        # Stage-aware explanations (collapsible, full width)
        # NOTE(review): the emoji in this label was mis-encoded in the source
        # and could not be recovered; an open-book glyph is used instead.
        # Confirm the intended character.
        with solara.Details("📖 Detailed Explanations"):
            ExplanationsPanel()

        # Footer
        solara.Markdown(
            "---\n"
            "Designed by John Lewis with AI assistance (Claude/Gemini/GPT), "
            "based on notebooks he made.",
            style="text-align: center; color: #888; margin-top: 2rem;",
        )
# Script entry-point guard: call the Page component when the file is executed
# directly rather than imported.
# NOTE(review): the return value of Page() is discarded here and no server is
# started in this file; presumably the app is normally launched through the
# Solara CLI (e.g. `solara run <this file>`) — confirm this guard is needed.
if __name__ == "__main__":
    Page()