import dash
from dash import dcc, html, Input, Output, State, ctx
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
import time
# ------------------------
# Dataset generators
# ------------------------
def dataset_blobs():
    """Return 300 two-dimensional points drawn as three Gaussian blobs.

    The global NumPy random state is seeded with 0 first, so the result is
    the same on every call. 100 standard-normal points are generated around
    each of the centres (0, 0), (5, 5) and (0, 5), in that order.

    Returns:
        numpy.ndarray of shape (300, 2).
    """
    np.random.seed(0)
    blob_centres = ([0, 0], [5, 5], [0, 5])
    blobs = [
        np.random.randn(100, 2) + np.array(centre)
        for centre in blob_centres
    ]
    return np.vstack(blobs)
def dataset_circles():
    """Return 300 points lying on two concentric circles.

    The global NumPy random state is seeded with 1 first, so the result is
    the same on every call. The first 150 points sit on the circle of
    radius 2 and the remaining 150 on the circle of radius 5, each at a
    uniformly random angle.

    Returns:
        numpy.ndarray of shape (300, 2) holding (x, y) rows.
    """
    np.random.seed(1)
    theta = np.random.rand(300) * 2 * np.pi
    radius = np.repeat([2.0, 5.0], 150)
    xs = radius * np.cos(theta)
    ys = radius * np.sin(theta)
    return np.column_stack((xs, ys))
def dataset_uniform():
    """Return 300 points spread uniformly over the square [0, 10) x [0, 10).

    The global NumPy random state is seeded with 2 first, so the result is
    the same on every call.

    Returns:
        numpy.ndarray of shape (300, 2).
    """
    np.random.seed(2)
    unit_square = np.random.rand(300, 2)
    return unit_square * 10
def load_dataset(name):
    """Return the fixed-seed dataset selected by `name`.

    Args:
        name: "blobs" or "circles"; any other value selects the uniform
            dataset.

    Returns:
        numpy.ndarray of shape (300, 2).
    """
    if name == "blobs":
        return dataset_blobs()
    if name == "circles":
        return dataset_circles()
    # Every other value falls back to the uniform dataset.
    return dataset_uniform()
# ------------------------
# K-means core
# ------------------------
def kmeans_step(points, centroids):
    """Perform one K-means iteration: assign points, then move centroids.

    Args:
        points: Array of shape (n_points, n_features).
        centroids: Array of shape (k, n_features) with the current centres.

    Returns:
        Tuple ``(new_centroids, labels)``. ``labels[i]`` is the index of the
        centroid closest (Euclidean distance) to ``points[i]`` *before* the
        update; ``new_centroids[j]`` is the mean of the points labelled
        ``j``, or the unchanged old centroid when no point was assigned
        to it.
    """
    offsets = points[:, None] - centroids
    labels = np.linalg.norm(offsets, axis=2).argmin(axis=1)
    updated = [
        # An empty cluster keeps its previous centre instead of becoming NaN.
        points[labels == idx].mean(axis=0) if np.any(labels == idx) else previous
        for idx, previous in enumerate(centroids)
    ]
    return np.array(updated), labels
def compute_sse(points, centroids, labels):
    """Return the sum of squared distances from each point to its centroid.

    Args:
        points: Array of shape (n_points, n_features).
        centroids: Array of shape (k, n_features).
        labels: Integer array of length n_points indexing into `centroids`.

    Returns:
        The scalar sum of squared errors.
    """
    residuals = points - centroids[labels]
    return np.square(residuals).sum()
# ------------------------
# Slide 2
# ------------------------
import numpy as np
def dataset_blobs_aleatori():
    """Return 300 points in three Gaussian blobs with randomised parameters.

    Unlike the fixed-seed variant, this draws from the current global NumPy
    random state without reseeding it, so each call gives a different set.

    Returns:
        numpy.ndarray of shape (300, 2).
    """
    # Nominal centres, each shifted by up to +/-0.5 on every axis.
    nominal_centres = ([0, 0], [5, 5], [0, 5])
    centres = np.array([
        base + np.random.uniform(-0.5, 0.5, 2)
        for base in nominal_centres
    ])
    # Each cluster gets its own standard deviation in [0.8, 1.2).
    spreads = np.random.uniform(0.8, 1.2, 3)
    clusters = [
        np.random.randn(100, 2) * spread + centre
        for spread, centre in zip(spreads, centres)
    ]
    return np.vstack(clusters)
def dataset_circles_aleatori():
    """Return 300 points on two noisy concentric circles with random radii.

    Draws from the current global NumPy random state without reseeding it.
    The first 150 points belong to the inner circle and the last 150 to the
    outer one.

    Returns:
        numpy.ndarray of shape (300, 2) holding (x, y) rows.
    """
    theta = np.random.rand(300) * 2 * np.pi
    # Base radii are perturbed once per call.
    inner_radius = 2 + np.random.uniform(-0.3, 0.3)
    outer_radius = 5 + np.random.uniform(-0.5, 0.5)
    radius = np.repeat([inner_radius, outer_radius], 150)
    # Per-point radial jitter (standard deviation 0.1).
    radius = radius + np.random.normal(0, 0.1, 300)
    xs = radius * np.cos(theta)
    ys = radius * np.sin(theta)
    return np.column_stack((xs, ys))
def dataset_uniform_aleatori():
    """Return 300 uniformly distributed points in a randomly sized square.

    Draws from the current global NumPy random state without reseeding it.
    The square's side length is drawn from [8, 12) and its origin is offset
    by up to +/-1 on each axis.

    Returns:
        numpy.ndarray of shape (300, 2).
    """
    side = np.random.uniform(8, 12)
    origin = np.random.uniform(-1, 1, 2)
    unit_square = np.random.rand(300, 2)
    return unit_square * side + origin
def load_dataset_aleatori(name):
    """Return a freshly randomised dataset selected by `name`.

    Args:
        name: "blobs" or "circles"; any other value selects the uniform
            dataset.

    Returns:
        numpy.ndarray of shape (300, 2).
    """
    if name == "blobs":
        return dataset_blobs_aleatori()
    if name == "circles":
        return dataset_circles_aleatori()
    # Every other value falls back to the uniform dataset.
    return dataset_uniform_aleatori()
def run_kmeans_multi(points, k, n_init=5, max_iter=100, rng=None):
    """Run K-means from several random starts and keep the lowest-SSE result.

    Args:
        points: Array-like of shape (n_points, n_features).
        k: Number of clusters; must satisfy ``1 <= k <= n_points``.
        n_init: Number of independent random restarts.
        max_iter: Maximum number of centroid updates per restart.
        rng: Optional seed or ``numpy.random.Generator`` used to pick the
            initial centroids. ``None`` (the default) uses fresh OS entropy,
            so results differ between calls.

    Returns:
        Tuple ``(best_centroids, best_labels, best_sse)`` of the restart
        with the smallest sum of squared errors. The labels are always the
        nearest-centroid assignment for the returned centroids. If
        ``n_init`` is less than 1 the tuple is ``(None, None, inf)``.

    Raises:
        ValueError: If ``k`` is outside ``1..n_points``.
    """
    points = np.asarray(points, dtype=float)
    n_points = len(points)
    if not 1 <= k <= n_points:
        raise ValueError(f"k must be between 1 and {n_points}, got {k}")
    # One generator for all restarts; passing a Generator reuses it as is.
    generator = np.random.default_rng(rng)

    def assign(centroids):
        # Index of the closest centroid (Euclidean distance) for every point.
        distances = np.linalg.norm(points[:, None] - centroids, axis=2)
        return np.argmin(distances, axis=1)

    best_sse = np.inf
    best_centroids = None
    best_labels = None
    for _ in range(n_init):
        # Initial centroids are k distinct data points.
        centroids = points[generator.choice(n_points, k, replace=False)]
        for _ in range(max_iter):
            labels = assign(centroids)
            new_centroids = np.array([
                # An empty cluster keeps its previous centre.
                points[labels == i].mean(axis=0) if np.any(labels == i)
                else centroids[i]
                for i in range(k)
            ])
            if np.allclose(new_centroids, centroids):
                break
            centroids = new_centroids
        # Re-assign against the final centroids so labels and SSE are
        # consistent even when max_iter is 0 or was exhausted.
        labels = assign(centroids)
        sse = np.sum((points - centroids[labels]) ** 2)
        if sse < best_sse:
            best_sse = sse
            best_centroids = centroids
            best_labels = labels
    return best_centroids, best_labels, best_sse
# ------------------------
# Slide 1 initialisation
# ------------------------
# Colour palette shared by the slide-1 figures.
colors = px.colors.qualitative.Plotly
# Slide 1 starts on the blobs dataset with three clusters.
initial_k = 3
initial_points = dataset_blobs()
# Starting centroids are distinct data points picked with the global RNG.
initial_centroids = initial_points[
    np.random.choice(initial_points.shape[0], initial_k, replace=False)
]
# Contents of the 'state' store before any user interaction.
initial_state = dict(
    points=initial_points.tolist(),
    centroids=initial_centroids.tolist(),
    labels=None,
    iter=0,
    sse_history=[],
    centroid_history=[initial_centroids.tolist()],
    running=False,
)
# ------------------------
# Dash App Layout
# ------------------------
external_stylesheets = [
    'https://codepen.io/chriddyp/pen/bWLwgP.css'
]
app = dash.Dash(__name__, external_stylesheets=external_stylesheets)


def _label_cell(text, width):
    """Return a right-aligned caption cell; `width` is the grid class name."""
    # Dash style dictionaries use camelCased keys (textAlign, not text-align).
    return html.Div(text, className=width, style={'textAlign': 'right'})


def _dataset_dropdown(component_id):
    """Return the dataset selector shared by both slides."""
    return dcc.Dropdown(
        id=component_id,
        options=[
            {'label': 'Blobs', 'value': 'blobs'},
            {'label': 'Cercles', 'value': 'circles'},
            {'label': 'Uniforme', 'value': 'uniform'}
        ],
        value='blobs'
    )


def _int_slider(component_id, low, high, value, mark_values=None):
    """Return an integer slider; marks default to every value in range."""
    if mark_values is None:
        mark_values = range(low, high + 1)
    return dcc.Slider(
        id=component_id,
        min=low,
        max=high,
        step=1,
        value=value,
        marks={i: str(i) for i in mark_values},
        tooltip={"always_visible": True}
    )


app.layout = html.Div([
    # Slide 1: step-by-step K-means
    html.Div([
        html.H1("K-means"),
        html.P("Explora el funcionament de l'algoritme K-means"),
        # Plots
        html.Div([
            html.Div([dcc.Graph(id='plot-kmeans')], className="eight columns"),
            html.Div([dcc.Graph(id='sse-plot')], className="four columns"),
        ], className="row"),
        # Controls
        html.Div([
            _label_cell('Dataset', "two columns"),
            html.Div([_dataset_dropdown('dataset-dropdown')],
                     className="four columns"),
            _label_cell('k', "two columns"),
            html.Div([_int_slider('k-slider', 2, 6, 3)],
                     className="four columns"),
        ], className="row"),
        html.Br(),
        # Buttons
        html.Div([
            html.Div([
                html.Button("Step", id="step-btn", n_clicks=0),
                html.Button("Reset", id="reset-btn", n_clicks=0),
                html.Button("Run / Stop", id="run-btn", n_clicks=0),
            ], className="eight columns"),
            html.Div([
                html.Div(id="iteration-label"),
            ], className="four columns"),
        ], className="row"),
        # Timer driving the automatic run; disabled until Run is pressed.
        dcc.Interval(id="interval", interval=800, n_intervals=0, disabled=True),
        dcc.Store(id='state', data=initial_state)
    ], className="slide"),
    # Slide 2: elbow plot
    html.Div([
        html.H1("Visualització del colze (K-means)"),
        html.P("Observa com varia l'error (SSE) en funció de k."),
        html.Div([
            html.Div([dcc.Graph(id='elbow-scatter')], className="eight columns"),
            html.Div([dcc.Graph(id='elbow-plot')], className="four columns"),
        ], className="row"),
        html.Div([
            _label_cell('Dataset', "one column"),
            html.Div([_dataset_dropdown('elbow-dataset')],
                     className="three columns"),
            _label_cell('k', "one column"),
            html.Div([_int_slider('elbow-k', 1, 10, 3)],
                     className="three columns"),
            _label_cell('N.reinicis', "one column"),
            html.Div([_int_slider('elbow-ninit', 1, 20, 5,
                                  mark_values=[1, 5, 10, 15, 20])],
                     className="three columns"),
        ], className="row"),
        dcc.Store(id='elbow-dataset-store'),
        dcc.Store(id='elbow-sse-store')
    ], className="slide")
])
# ------------------------
# Reset
# ------------------------
@app.callback(
    Output("state", "data"),
    Input("reset-btn", "n_clicks"),
    State("dataset-dropdown", "value"),
    State("k-slider", "value"),
    prevent_initial_call=True
)
def reset(n, dataset, k):
    """Rebuild the slide-1 state for the selected dataset and k.

    Args:
        n: Click count of the Reset button (only used as the trigger).
        dataset: Value of the dataset dropdown.
        k: Number of clusters chosen on the slider.

    Returns:
        A fresh state dict at iteration 0 with k randomly chosen data
        points as centroids and the automatic run switched off.
    """
    data = load_dataset(dataset)
    # The dataset generators fix the global seed, so reseed from the clock
    # to get different starting centroids on every reset.
    clock_seed = int(time.time() * 1000) % (2**32)
    np.random.seed(clock_seed)
    chosen = np.random.choice(len(data), k, replace=False)
    start = data[chosen]
    fresh_state = {
        "points": data.tolist(),
        "centroids": start.tolist(),
        "labels": None,
        "iter": 0,
        "sse_history": [],
        "centroid_history": [start.tolist()],
        "running": False
    }
    return fresh_state
# ------------------------
# Toggle Run
# ------------------------
@app.callback(
    Output("interval", "disabled"),
    Output("state", "data", allow_duplicate=True),
    Input("run-btn", "n_clicks"),
    State("state", "data"),
    prevent_initial_call=True
)
def toggle_run(n, state):
    """Flip the automatic-run flag and enable or disable the timer to match.

    Args:
        n: Click count of the Run / Stop button (only used as the trigger).
        state: Current contents of the 'state' store.

    Returns:
        Tuple ``(interval_disabled, state)`` where the state carries the
        inverted "running" flag and the interval is disabled exactly when
        the run is stopped.
    """
    now_running = not state["running"]
    updated_state = {**state, "running": now_running}
    interval_disabled = not now_running
    return interval_disabled, updated_state
# ------------------------
# Step (manual or automatic)
# ------------------------
@app.callback(
    Output("state", "data", allow_duplicate=True),
    Input("step-btn", "n_clicks"),
    Input("interval", "n_intervals"),
    State("state", "data"),
    prevent_initial_call=True
)
def step(step_clicks, interval_ticks, state):
    """Advance K-means by one iteration, on a button click or a timer tick.

    Args:
        step_clicks: Click count of the Step button (trigger only).
        interval_ticks: Tick count of the interval timer (trigger only).
        state: Current contents of the 'state' store.

    Returns:
        The updated state dict, or ``dash.no_update`` when a timer tick
        arrives while the automatic run is stopped. When a timer tick finds
        that the centroids no longer move, the run flag is cleared and no
        iteration is recorded, so an automatic run does not keep growing
        the stored histories after convergence. Manual steps are always
        recorded.
    """
    auto_tick = ctx.triggered_id == "interval"
    if auto_tick and not state["running"]:
        return dash.no_update
    points = np.array(state["points"])
    centroids = np.array(state["centroids"])
    new_centroids, labels = kmeans_step(points, centroids)
    converged = (
        state["labels"] is not None
        and np.allclose(new_centroids, centroids)
    )
    if auto_tick and converged:
        # Later ticks hit the guard above and leave the store untouched.
        return {**state, "running": False}
    # Plain float keeps the stored history free of numpy scalar types.
    sse = float(compute_sse(points, new_centroids, labels))
    return {
        "points": state["points"],
        "centroids": new_centroids.tolist(),
        "labels": labels.tolist(),
        "iter": state["iter"] + 1,
        "sse_history": state["sse_history"] + [sse],
        "centroid_history": state["centroid_history"] + [new_centroids.tolist()],
        "running": state["running"]
    }
# ------------------------
# Plot
# ------------------------
@app.callback(
    Output("plot-kmeans", "figure"),
    Output("sse-plot", "figure"),
    Output("iteration-label", "children"),
    Input("state", "data")
)
def update_plot(state):
    """Render the cluster scatter, the SSE curve and the iteration label.

    Args:
        state: Contents of the 'state' store.

    Returns:
        Tuple ``(scatter_figure, sse_figure, label_text)``. Before the first
        step (labels are ``None``) all points are drawn as a single trace;
        afterwards each cluster gets its own colour and the path followed
        by each centroid is drawn as a line.
    """
    points = np.array(state["points"])
    centroids = np.array(state["centroids"])
    history = state["centroid_history"]
    sse_history = state["sse_history"]
    k = len(history[0])
    # Convert once instead of on every loop pass.
    labels = None if state["labels"] is None else np.array(state["labels"])

    def cluster_color(i):
        # Wrap around so a k larger than the palette cannot raise IndexError.
        return colors[i % len(colors)]

    # Scatter of the data points
    fig = go.Figure()
    if labels is None:
        fig.add_scatter(
            x=points[:, 0],
            y=points[:, 1],
            mode='markers',
            name='Punts'
        )
    else:
        for i in range(k):
            members = points[labels == i]
            fig.add_scatter(
                x=members[:, 0],
                y=members[:, 1],
                mode='markers',
                marker=dict(color=cluster_color(i)),
                name=f'Punts C{i}'
            )
    # Current centroids
    fig.add_scatter(
        x=centroids[:, 0],
        y=centroids[:, 1],
        mode='markers',
        marker=dict(size=15, symbol='x'),
        name='Centroides'
    )
    # Centroid trajectories
    if labels is not None:
        for i in range(k):
            traj = np.array([snapshot[i] for snapshot in history])
            fig.add_scatter(
                x=traj[:, 0],
                y=traj[:, 1],
                mode='lines',
                line=dict(color=cluster_color(i)),
                name=f'Camí m{i}'
            )

    # SSE plot
    sse_fig = go.Figure()
    if sse_history:
        sse_fig.add_scatter(
            # Number from 1 so the axis matches the iteration counter.
            x=list(range(1, len(sse_history) + 1)),
            y=sse_history,
            mode='lines+markers',
            name='SSE'
        )
    sse_fig.update_layout(
        title="Convergència (SSE)",
        xaxis_title="Iteració",
        yaxis_title="SSE"
    )
    return fig, sse_fig, f"Iteració: {state['iter']}"
#######################################
############# Slide 2 #############
#######################################
@app.callback(
    Output("elbow-dataset-store", "data"),
    Input("elbow-dataset", "value")
)
def update_dataset(dataset_name):
    """Generate a new randomised dataset for slide 2 and store it.

    Args:
        dataset_name: Value of the slide-2 dataset dropdown.

    Returns:
        The generated points as a nested list, ready for the store.
    """
    return load_dataset_aleatori(dataset_name).tolist()
@app.callback(
    Output("elbow-sse-store", "data"),
    Input("elbow-dataset-store", "data"),
    Input("elbow-ninit", "value")
)
def compute_elbow_sse(dataset_points, n_init):
    """Compute SSE statistics for k = 1..10 over `n_init` independent runs.

    Args:
        dataset_points: Nested list of points from the dataset store.
        n_init: Number of single-start K-means runs per value of k.

    Returns:
        Dict with the lists "best_sse", "mean_sse", "min_sse" and "max_sse",
        each holding one entry per k. "best_sse" and "min_sse" both contain
        the minimum SSE over the runs.
    """
    points = np.array(dataset_points)
    max_k = 10
    summary = {
        "best_sse": [],
        "mean_sse": [],
        "min_sse": [],
        "max_sse": []
    }
    for k in range(1, max_k + 1):
        # Each run is one random start; its SSE is the third return value.
        sse_runs = np.array([
            run_kmeans_multi(points, k, n_init=1)[2]
            for _ in range(n_init)
        ])
        lowest = np.min(sse_runs)
        summary["best_sse"].append(lowest)
        summary["mean_sse"].append(np.mean(sse_runs))
        summary["min_sse"].append(lowest)
        summary["max_sse"].append(np.max(sse_runs))
    return summary
@app.callback(
    Output("elbow-scatter", "figure"),
    Output("elbow-plot", "figure"),
    Input("elbow-dataset-store", "data"),
    Input("elbow-sse-store", "data"),
    Input("elbow-k", "value"),
    Input("elbow-ninit", "value")
)
def update_elbow(dataset_points, sse_data, k_selected, n_init):
    """Draw the slide-2 clustering scatter and the elbow (SSE versus k) plot.

    Args:
        dataset_points: Nested list of points from the dataset store; the
            dataset is read from the store, not regenerated here.
        sse_data: Dict with the "best_sse", "mean_sse", "min_sse" and
            "max_sse" lists from the SSE store.
        k_selected: Value of k chosen on the slider.
        n_init: Number of restarts chosen on the slider.

    Returns:
        Tuple ``(scatter_figure, elbow_figure)``.
    """
    points = np.array(dataset_points)

    # -------- Scatter (best run) --------
    # NOTE: this re-runs K-means on every redraw, so the clustering shown
    # here is not the run that produced the stored SSE values.
    centroids, labels, _ = run_kmeans_multi(points, k_selected, n_init=n_init)
    scatter_fig = px.scatter(
        x=points[:, 0],
        y=points[:, 1],
        color=[str(label) for label in labels],
        color_discrete_sequence=px.colors.qualitative.Plotly
    )
    scatter_fig.add_scatter(
        x=centroids[:, 0],
        y=centroids[:, 1],
        mode='markers',
        marker=dict(size=15, symbol='x'),
        name='centroides'
    )
    scatter_fig.update_layout(
        title=f"K-means (millor de {n_init} runs) amb k = {k_selected}"
    )

    # -------- Elbow over multiple runs --------
    best_sse = sse_data['best_sse']
    # Take the k axis from the stored curve so x and y always have the
    # same length.
    k_values = list(range(1, len(best_sse) + 1))
    elbow_fig = go.Figure()
    # --- Uncertainty band: invisible upper bound, lower bound filled up to it ---
    elbow_fig.add_scatter(
        x=k_values,
        y=sse_data['max_sse'],
        mode='lines',
        line=dict(width=0),
        showlegend=False
    )
    elbow_fig.add_scatter(
        x=k_values,
        y=sse_data['min_sse'],
        mode='lines',
        fill='tonexty',
        name='Min–Max SSE'
    )
    # --- Mean ---
    elbow_fig.add_scatter(
        x=k_values,
        y=sse_data['mean_sse'],
        mode='lines+markers',
        name='SSE mitjà',
        line=dict(dash='dash')
    )
    # --- Best ---
    elbow_fig.add_scatter(
        x=k_values,
        y=best_sse,
        mode='lines+markers',
        name='Millor SSE'
    )
    # --- Selected point ---
    elbow_fig.add_scatter(
        x=[k_selected],
        y=[best_sse[k_selected - 1]],
        mode='markers',
        marker=dict(size=12),
        name='k actual'
    )
    elbow_fig.update_layout(
        title="Colze",
        xaxis_title="k",
        yaxis_title="SSE"
    )
    return scatter_fig, elbow_fig
# ------------------------
# Run
# ------------------------
# Start the Dash server (debug mode on) only when this file is run as a script.
if __name__ == "__main__":
    app.run(debug=True)