import solara
from mesa import Model, Agent
from mesa.space import ContinuousSpace
from mesa.datacollection import DataCollector
from mesa.visualization import SolaraViz, make_space_component, make_plot_component
import numpy as np
import random
# ---- Ship Agent ----
class Ship(Agent):
    """A ship that shuttles between a base and the model's area of interest.

    The ship moves through four statuses:

    * ``"transiting"``      - sailing from a base towards the area of interest
    * ``"in_area"``         - on station; ``days_in_area`` grows by one per step
    * ``"transiting_back"`` - sailing to ``target_base`` with a reserved slot
    * ``"in_base"``         - at the base, waiting for a chance to redeploy

    All random draws use ``self.random`` (the model's seeded generator) so
    that a run is reproducible for a given model ``seed``.
    """

    def __init__(self, model, base, speed, maintenance_start):
        """Create the ship, place it at its base and aim it at the area.

        Args:
            model: Owning model; must expose ``bases``, ``space`` and
                ``area_of_interest``.
            base: Key into ``model.bases`` naming the starting base.
            speed: Distance covered per step.
            maintenance_start: Number of days in the area after which the
                maintenance probability becomes non-zero.
        """
        # Automatically assigns a unique_id and registers the agent with the model
        super().__init__(model)
        self.base = base
        self.status = "transiting"  # initial status: moving toward the area
        self.days_in_area = 0
        self.speed = speed
        self.maintenance_start = maintenance_start
        self.target_base = None
        # Place the ship at its base position
        base_pos = model.bases[base]["position"]
        model.space.place_agent(self, base_pos)
        # Set the destination as the area of interest
        self.destination = (
            model.area_of_interest["x"],
            model.area_of_interest["y"],
        )

    def step(self):
        """Advance the ship by one day according to its current status."""
        if self.status == "transiting":
            self.move_towards(self.destination)
            if self.reached_destination(self.destination):
                self.status = "in_area"
        elif self.status == "in_area":
            self.days_in_area += 1
            # Use the model's seeded RNG rather than the global `random`
            # module; otherwise the `seed` parameter has no effect here.
            if self.random.random() < self.maintenance_probability():
                # Look for the closest available base (by Euclidean distance)
                self.target_base = self.closest_base()
                # `is not None` rather than truthiness so a falsy key
                # (e.g. 0 or "") is still treated as a valid base.
                if self.target_base is not None:
                    self.status = "transiting_back"
                    target = self.model.bases[self.target_base]
                    self.destination = target["position"]
                    # Reserve a maintenance slot by reducing capacity
                    target["capacity"] -= 1
                # If no base is available the ship simply stays "in_area".
        elif self.status == "transiting_back":
            self.move_towards(self.destination)
            if self.reached_destination(self.destination):
                self.status = "in_base"
                # The slot is released on arrival, i.e. it is only held
                # while the ship is under way to the base.
                # NOTE(review): confirm this is intended; ships that are
                # "in_base" do not count against the base's capacity.
                self.model.bases[self.target_base]["capacity"] += 1
        elif self.status == "in_base":
            # After maintenance, there's a 10% chance per step to redeploy
            if self.random.random() < 0.1:
                self.status = "transiting"
                self.days_in_area = 0
                self.destination = (
                    self.model.area_of_interest["x"],
                    self.model.area_of_interest["y"],
                )

    def maintenance_probability(self):
        """Return the per-step probability of heading back for maintenance.

        The value is 0 until ``days_in_area`` exceeds ``maintenance_start``;
        after that it follows a logistic curve whose midpoint lies 50 days
        past ``maintenance_start``.
        """
        if self.days_in_area > self.maintenance_start:
            overdue = self.days_in_area - self.maintenance_start - 50
            return 1 / (1 + np.exp(-0.1 * overdue))
        return 0

    def closest_base(self):
        """Return the key of the nearest base with spare capacity.

        Distance is Euclidean, measured from the ship's current position.
        Returns ``None`` when no base has ``capacity > 0``.
        """
        candidates = [
            name
            for name, info in self.model.bases.items()
            if info["capacity"] > 0
        ]
        if not candidates:
            return None
        here = np.array(self.pos)
        return min(
            candidates,
            key=lambda name: np.linalg.norm(
                here - np.array(self.model.bases[name]["position"])
            ),
        )

    def move_towards(self, target):
        """Move at most ``speed`` units in a straight line towards ``target``.

        When the target is closer than one full step the ship lands exactly
        on it instead of overshooting; an overshoot could otherwise carry the
        ship past the target or outside the bounds of the space.
        """
        x, y = self.pos
        tx, ty = target
        dx, dy = tx - x, ty - y
        dist = np.sqrt(dx ** 2 + dy ** 2)
        if dist <= 0:
            return  # already on the target; nothing to do
        if dist <= self.speed:
            new_pos = (tx, ty)
        else:
            new_pos = (
                x + self.speed * (dx / dist),
                y + self.speed * (dy / dist),
            )
        self.model.space.move_agent(self, new_pos)

    def reached_destination(self, target):
        """Return True when the ship is less than one step from ``target``."""
        offset = np.array(self.pos) - np.array(target)
        return np.linalg.norm(offset) < self.speed
# ---- Model ----
class ShipDeploymentModel(Model):
    """Ships deploy from bases to an area of interest and return for maintenance.

    The model owns a 100 x 100 non-toroidal ``ContinuousSpace``, a dictionary
    of bases (each with a ``position`` and a ``capacity`` that ships decrement
    and increment) and a ``DataCollector`` counting ships per status.
    """

    def __init__(self, num_ships=40, bases=None, area_of_interest=None, speed=2, maintenance_start=60, seed=None):
        """Build the space, the bases and the ship agents.

        Args:
            num_ships: Number of ``Ship`` agents to create.
            bases: Optional mapping ``name -> {"position", "capacity", ...}``.
                A falsy value selects the three built-in bases. The per-base
                records are copied so the caller's dictionaries are not
                modified when ships reserve or release capacity.
            area_of_interest: Optional ``{"x": ..., "y": ...}`` target point;
                a falsy value selects ``(50, 50)``.
            speed: Distance each ship covers per step.
            maintenance_start: Days in the area before maintenance can occur.
            seed: Seed forwarded to Mesa's ``Model`` for its random generator.
        """
        super().__init__(seed=seed)  # Mandatory in Mesa 3
        self.num_ships = num_ships
        if bases:
            # Copy each record: ships mutate "capacity" during the run and
            # that must not leak into the dictionary the caller passed in.
            self.bases = {name: dict(info) for name, info in bases.items()}
        else:
            # "distance" is stored for each base but is not read by any code
            # in this file.
            self.bases = {
                "Diego Garcia": {"position": (20, 20), "distance": 3500, "capacity": 10},
                "Guam": {"position": (80, 80), "distance": 7000, "capacity": 10},
                "Darwin": {"position": (40, 40), "distance": 5000, "capacity": 10},
            }
        self.area_of_interest = area_of_interest if area_of_interest else {"x": 50, "y": 50}
        self.space = ContinuousSpace(100, 100, torus=False)
        # Create Ship agents. Home bases are drawn from the model's seeded
        # RNG (not the global `random` module) so `seed` makes runs repeatable.
        base_names = list(self.bases)
        for _ in range(num_ships):
            home = self.random.choice(base_names)
            Ship(self, home, speed, maintenance_start)
        # Data collection: count ships in different statuses
        self.datacollector = DataCollector(
            model_reporters={
                "Ships in Area": lambda m: sum(1 for s in m.agents_by_type[Ship] if s.status == "in_area"),
                "Ships Returning": lambda m: sum(1 for s in m.agents_by_type[Ship] if s.status == "transiting_back"),
                "Ships in Base": lambda m: sum(1 for s in m.agents_by_type[Ship] if s.status == "in_base"),
            }
        )
        self.running = True

    def step(self):
        """Step every ship once in random order, then record the counts."""
        # agent activation
        self.agents_by_type[Ship].shuffle_do("step")
        self.datacollector.collect(self)
# ---- Visualization ----
def agent_portrayal(agent):
    """Return the drawing style for ``agent`` in the space component.

    Ships are coloured by status: blue when ``"in_area"``, red when
    ``"transiting_back"`` and green otherwise. Any non-``Ship`` agent gets an
    empty dict.

    The keys are the lowercase ones used by Mesa 3's ``make_space_component``
    (``color``, ``marker``, ``size``). The Mesa 2 style keys ``Shape``,
    ``Color``, ``Filled`` and ``r`` are not recognised there, so the status
    colours would not be applied.
    """
    if not isinstance(agent, Ship):
        return {}
    if agent.status == "in_area":
        color = "blue"
    elif agent.status == "transiting_back":
        color = "red"
    else:
        color = "green"
    # "size" is the marker area passed on to the scatter plot.
    return {"color": color, "marker": "o", "size": 25}
# Plot component for live charts
# One line per collected series, each mapped to its plot colour.
ShipPlot = make_plot_component(
    dict(
        [
            ("Ships in Area", "tab:blue"),
            ("Ships Returning", "tab:red"),
            ("Ships in Base", "tab:green"),
        ]
    )
)
# Define model parameters (for UI inputs)
# Each entry describes one UI control: the widget type, its initial value,
# its label and, for sliders, the allowed range and increment.
model_params = dict(
    seed=dict(
        type="InputText",
        value=42,
        label="Random Seed",
    ),
    num_ships=dict(
        type="SliderInt",
        value=40,
        label="Number of Ships",
        min=10,
        max=100,
        step=1,
    ),
    speed=dict(
        type="SliderFloat",
        value=2,
        label="Ship Speed",
        min=1,
        max=10,
        step=0.5,
    ),
    maintenance_start=dict(
        type="SliderInt",
        value=60,
        label="Maintenance Start (Days)",
        min=30,
        max=100,
        step=5,
    ),
)
# Instantiate the model using the default parameter values (ignoring 'seed' here)
model_instance = ShipDeploymentModel(
    **{
        param: spec["value"]
        for param, spec in model_params.items()
        if param != "seed"
    }
)
# SolaraViz is handed a reactive wrapper around the initial model
model = solara.reactive(model_instance)
# Dashboard: the space view and the status plot, driven by the UI parameters
page = SolaraViz(
    model,
    name="Blockade Model",
    model_params=model_params,
    components=[make_space_component(agent_portrayal), ShipPlot],
)
# Bare expression so the notebook / Solara renders the dashboard
page