import solara
from mesa import Model, Agent
from mesa.space import ContinuousSpace
from mesa.datacollection import DataCollector
from mesa.visualization import SolaraViz, make_space_component, make_plot_component
import numpy as np
import random
import matplotlib.patches as patches
import matplotlib.pyplot as plt
from solara import FigureMatplotlib
import base64
import solara
from solara import HTML
import io
# the chokepoint reference point for all locations
chokepointx = 230
chokepointy = 200
from mesa.visualization.utils import update_counter
# ---- Model Space Creation ----
def render_custom_space_component(space_component):
    """Render ``space_component`` and wrap the resulting figure for Solara.

    Calls ``space_component.render()`` and hands whatever it returns to
    ``FigureMatplotlib``.
    """
    return FigureMatplotlib(space_component.render())
def draw_background(ax, bases):
    """Outline every base on ``ax`` and write its name below the outline.

    ``bases`` maps a base name to a dict whose ``"position"`` entry is an
    ``(x, y)`` pair; each base gets a 60x30 lime-green rectangle centred on
    that point.
    """
    half_w, half_h = 30, 15
    for base_name in bases:
        cx, cy = bases[base_name]["position"]
        outline = patches.Rectangle(
            (cx - half_w, cy - half_h),
            2 * half_w,
            2 * half_h,
            linewidth=1,
            edgecolor='limegreen',
            facecolor='none',
        )
        ax.add_patch(outline)
        ax.text(
            cx, cy - 30, base_name,
            fontsize=8, ha="center", va="center", color="limegreen",
        )
def draw_strait(ax, chokepoint, label="Strait of Malacca", width=60, height=60):
    """Draw the chokepoint as a blue box centred on it, with ``label`` below.

    ``chokepoint`` is a mapping with ``"x"`` and ``"y"`` entries.
    """
    cx, cy = chokepoint["x"], chokepoint["y"]
    half_w = width / 2
    half_h = height / 2
    ax.add_patch(
        patches.Rectangle(
            (cx - half_w, cy - half_h),
            width,
            height,
            linewidth=3,
            edgecolor='blue',
            facecolor='none',
        )
    )
    # label hangs 10 units under the bottom edge of the box
    ax.text(cx, cy - half_h - 10, label, fontsize=9, ha="center", va="top", color="blue")
def draw_china(ax, chokepoint, label="China"):
    """Draw a red horizontal line standing in for China, plus its label.

    The line starts 10 units left of and 85 units above the chokepoint and
    runs 120 units to the right.
    """
    left = chokepoint["x"] - 10
    line_y = chokepoint["y"] + 85
    right = left + 120
    ax.hlines(line_y, left, right, colors="red", linewidth=2)
    # label sits 40 units left of the line's midpoint, just under the line
    label_x = (left + right) / 2 - 40
    ax.text(label_x, line_y - 5, label, fontsize=9, ha="center", va="top", color="red")
def draw_exporter(ax, label="Exporter", width=20, height=20):
    """Draw the exporter marker box and its label on ``ax``.

    The box is centred 100 units left of and 80 units above the module-level
    chokepoint (``chokepointx``, ``chokepointy``), which is the same offset
    ``TradeShip`` uses for its starting position.

    Args:
        ax: Matplotlib axes to draw on.
        label: Text written under the box.
        width: Box width in plot units.
        height: Box height in plot units.
    """
    x = chokepointx - 100
    # The y coordinate used to be derived from ``chokepointx`` (+50). With the
    # current constants that equals ``chokepointy + 80``, so the picture is
    # unchanged, but the marker now follows the chokepoint's y coordinate if
    # the reference point is moved.
    y = chokepointy + 80
    rect = patches.Rectangle(
        (x - width / 2, y - height / 2),
        width,
        height,
        linewidth=1,
        edgecolor='black',
        facecolor='none',
    )
    ax.add_patch(rect)
    ax.text(x, y - height / 2 - 10, label, fontsize=9, ha="center", va="top", color="black")
def get_ships_on_station(model):
    """Count SSC/LSC agents lying within 30 units of the chokepoint on both axes."""
    cx = model.chokepoint["x"]
    cy = model.chokepoint["y"]
    on_station = 0
    for agent in model.space.agents:
        if not isinstance(agent, (SSC, LSC)):
            continue
        if abs(agent.pos[0] - cx) < 30 and abs(agent.pos[1] - cy) < 30:
            on_station += 1
    return on_station
class CustomSpaceComponent:
    """Matplotlib renderer for the blockade map.

    Draws the static scenery (bases, strait, China line, exporter), the
    missile brigades with their range rings and stock labels, ship-count
    labels at each base and at the chokepoint, and finally every remaining
    agent as a scatter point styled by ``agent_portrayal``.
    """

    def __init__(self, agent_portrayal, bases, space, chokepoint):
        """Store the collaborators needed by :meth:`render`.

        Args:
            agent_portrayal: Callable mapping an agent to a portrayal dict;
                the ``"color"`` and ``"r"`` entries are used.
            bases: Mapping of base name to a dict with a ``"position"`` entry.
            space: Space object exposing ``agents``, ``width`` and ``height``.
            chokepoint: Mapping with ``"x"`` and ``"y"`` entries.
        """
        self.agent_portrayal = agent_portrayal
        self.bases = bases
        self.space = space
        self.chokepoint = chokepoint

    def render(self):
        """Build and return a fresh figure showing the current model state.

        The figure is created directly via ``matplotlib.figure.Figure`` rather
        than ``plt.subplots``: pyplot keeps every figure it creates alive in a
        global registry until it is closed, and this method runs on every
        redraw, so the pyplot version accumulated open figures.
        """
        # Imported at point of use, like the other plotting helper this
        # method previously imported locally.
        from matplotlib.figure import Figure

        fig = Figure(figsize=(8, 8))
        ax = fig.subplots()

        draw_background(ax, self.bases)
        draw_strait(ax, self.chokepoint)
        draw_china(ax, self.chokepoint)
        draw_exporter(ax)

        # Snapshot once so every drawing pass sees the same agents.
        agents = list(self.space.agents)
        self._draw_brigades(ax, agents)
        self._draw_base_counts(ax, agents)
        self._draw_chokepoint_counts(ax, agents)
        self._draw_dynamic_agents(ax, agents)

        ax.set_xlim(0, self.space.width)
        ax.set_ylim(0, self.space.height)
        ax.set_aspect("equal")
        ax.set_xticks([])
        ax.set_yticks([])
        for spine in ax.spines.values():
            spine.set_visible(False)
        # No legend is drawn; the handles that used to be built here were
        # never attached to the axes.
        return fig

    @staticmethod
    def _brigade_color(brig):
        """Return the display colour for a brigade's missile type."""
        return "orange" if brig.missile_type in ("DF-21", "YJ-83") else "purple"

    def _draw_brigades(self, ax, agents):
        """Draw range rings, a site marker and stock labels per launch site."""
        # Brigades sharing a position are drawn as one site.
        brigade_groups = {}
        for agent in agents:
            if isinstance(agent, MissileBrigade):
                brigade_groups.setdefault(agent.pos, []).append(agent)

        for (bx, by), brigs in brigade_groups.items():
            # one dashed range ring per brigade at the site
            for brig in brigs:
                ax.add_patch(
                    patches.Circle(
                        (bx, by),
                        brig.range,
                        linewidth=0.5,
                        edgecolor=self._brigade_color(brig),
                        facecolor="none",
                        linestyle="--",
                        alpha=0.3,
                    )
                )
            ax.add_patch(
                patches.Circle((bx, by), radius=7, facecolor='red', edgecolor='red', linewidth=0)
            )

            if len(brigs) == 2:
                # Two brigades: stack the labels above and below the marker,
                # ordered by missile type.
                ordered = sorted(brigs, key=lambda b: b.missile_type)
                placements = ((by + 10, "bottom"), (by - 10, "top"))
                for brig, (y, va) in zip(ordered, placements):
                    ax.text(
                        bx, y,
                        f"{brig.missile_type}:{brig.current_missiles}",
                        fontsize=6, ha="center", va=va, color=self._brigade_color(brig)
                    )
            else:
                # Any other count: spread the labels in a row under the marker.
                n = len(brigs)
                spacing = 15
                for i, brig in enumerate(brigs):
                    offset_x = (i - (n - 1) / 2) * spacing
                    ax.text(
                        bx + offset_x, by - 15,
                        f"{brig.missile_type}:{brig.current_missiles}",
                        fontsize=6, ha="center", va="top", color="red"
                    )

    def _draw_base_counts(self, ax, agents):
        """Label each base with how many SSCs and LSCs are in port there."""
        for name, info in self.bases.items():
            bx, by = info["position"]
            ssc_count = sum(
                1 for a in agents
                if isinstance(a, SSC) and a.status == "in_base" and a.target_base == name
            )
            lsc_count = sum(
                1 for a in agents
                if isinstance(a, LSC) and a.status == "in_base" and a.target_base == name
            )
            ax.text(
                bx, by - 45,
                f"SSC:{ssc_count} LSC:{lsc_count}",
                fontsize=6, ha="center", va="top", color="black"
            )

    def _draw_chokepoint_counts(self, ax, agents):
        """Label the chokepoint with the number of ships whose status is in_area."""
        cx, cy = self.chokepoint["x"], self.chokepoint["y"]
        ssc_cp = sum(1 for a in agents if isinstance(a, SSC) and a.status == "in_area")
        lsc_cp = sum(1 for a in agents if isinstance(a, LSC) and a.status == "in_area")
        ax.text(
            cx, cy + 40,
            f"SSC:{ssc_cp} LSC:{lsc_cp}",
            fontsize=6, ha="center", va="bottom", color="blue"
        )

    def _draw_dynamic_agents(self, ax, agents):
        """Scatter-plot every agent that is not a base or a missile brigade."""
        xs, ys, colors, sizes = [], [], [], []
        for agent in agents:
            # bases and brigades are already drawn by the static passes
            if isinstance(agent, (BaseAgent, MissileBrigade)):
                continue
            portrayal = self.agent_portrayal(agent)
            x, y = agent.pos
            xs.append(x)
            ys.append(y)
            colors.append(portrayal.get("color", "black"))
            sizes.append(portrayal.get("r", 20))
        ax.scatter(xs, ys, c=colors, s=sizes, edgecolors="k")
# ---- Ship Agent ----
class BaseShip(Agent):
    """Warship that cycles between the chokepoint and a maintenance base.

    ``status`` is one of ``"transiting"``, ``"in_area"``, ``"transiting_back"``
    or ``"in_base"`` while the ship is active (other code may set ``"Dead"``).
    Each :meth:`step` advances that cycle by one day and updates the per-status
    day counters.
    """

    def __init__(self, model, base, speed, maintenance_start):
        """Create the ship and place it in ``model.space``.

        Args:
            model: Owning model; must expose ``space``, ``bases`` and
                ``chokepoint``.
            base: Name of the ship's home base (a key of ``model.bases``).
            speed: Speed in knots.
            maintenance_start: Nominal number of days out before the ship
                heads to a base; the per-ship target is drawn within +/-10%.
        """
        super().__init__(model)
        self.base = base
        # knots to mph to graphical units per day
        self.speed = speed * 1.151 * 2.4
        self.maintenance_start = maintenance_start
        # how long this individual ship will stay out before heading home
        self.maintenance_target = random.uniform(
            0.9 * self.maintenance_start,
            1.1 * self.maintenance_start
        )
        self.target_base = None
        self.released_early = False
        # track time since the ship was last serviced
        if random.random() < 0.8:
            # start in area with some wear already
            self.status = "in_area"
            # randint(5, n) raises ValueError for n < 5, and the UI sliders
            # allow maintenance_start as low as 1, so cap the lower bound.
            self.days_since_maintenance = random.randint(
                min(5, maintenance_start), maintenance_start
            )
            model.space.place_agent(self, (model.chokepoint["x"], model.chokepoint["y"]))
        else:
            # otherwise the ship is fresh (to change later)
            self.status = "transiting"
            self.days_since_maintenance = 0
            base_pos = model.bases[base]["position"]
            model.space.place_agent(self, base_pos)
        # counter for maintenance duration
        self.maintenance_remaining = 0
        # deployed ships head for the chokepoint
        self.destination = (model.chokepoint["x"], model.chokepoint["y"])
        # time tracking
        self.total_days_in_area = 0
        self.total_days_in_base = 0
        self.total_days_in_transit = 0

    def step(self):
        """Advance the ship by one day according to its current status."""
        # increment daily counters for analysis
        if self.status == "in_area":
            self.total_days_in_area += 1
        elif self.status == "in_base":
            self.total_days_in_base += 1
        if self.status in ["transiting", "transiting_back"]:
            self.total_days_in_transit += 1
        # add wear to endurance for each day the ship is out
        if self.status not in ["in_base"]:
            self.days_since_maintenance += 1

        if self.status == "transiting":
            self.move_towards(self.destination)
            if self.reached_destination(self.destination):
                self.status = "in_area"
        elif self.status == "in_area":
            # once this ship's individual maintenance target is exceeded, head back
            if self.days_since_maintenance >= self.maintenance_target:
                self.target_base = self.closest_base()
                if self.target_base:
                    # reserve maintenance slot
                    self.model.bases[self.target_base]["capacity"] -= 1
                    self.status = "transiting_back"
                    self.destination = self.model.bases[self.target_base]["position"]
                else:
                    # no base has a free slot: stay on station and retry next day
                    self.status = "in_area"
        elif self.status == "transiting_back":
            self.move_towards(self.destination)
            if self.reached_destination(self.destination):
                self.released_early = False
                self.status = "in_base"
                self.maintenance_remaining = self.calculate_maintenance_time()
        elif self.status == "in_base":
            travel_time = self.base_to_chokepoint_travel_time()
            cushion = 2
            if self.maintenance_remaining > (travel_time + cushion):
                self.maintenance_remaining -= 1
            else:
                # Close enough to departure: hand the slot back once so a
                # relief ship can start heading in.
                if not self.released_early:
                    self.model.bases[self.target_base]["capacity"] += 1
                    self.released_early = True
                self.maintenance_remaining -= 1
                if self.maintenance_remaining <= 0:
                    self.days_since_maintenance = 0
                    self.status = "transiting"
                    self.destination = (self.model.chokepoint["x"], self.model.chokepoint["y"])

    def calculate_maintenance_time(self):
        """Return days needed in port: one per 15 days out, minimum one."""
        return max(1, self.days_since_maintenance // 15)

    def closest_base(self):
        """Return the name of the nearest base with spare capacity, or None."""
        available_bases = {
            b: self.model.bases[b]
            for b in self.model.bases
            if self.model.bases[b]["capacity"] > 0
        }
        if available_bases:
            return min(
                available_bases,
                key=lambda b: np.linalg.norm(
                    np.array(self.pos) - np.array(self.model.bases[b]["position"])
                ),
            )
        return None

    def move_towards(self, target):
        """Move one day's travel toward ``target``.

        When the target is closer than one full step the ship covers a random
        70-80% of the remaining distance instead of jumping onto it.
        """
        x, y = self.pos
        tx, ty = target
        dx, dy = tx - x, ty - y
        dist = np.sqrt(dx**2 + dy**2)
        if dist > 0:
            if self.speed > dist:
                factor = random.uniform(0.7, 0.8)
                step = factor * dist
            else:
                step = self.speed
            step_x = step * (dx / dist)
            step_y = step * (dy / dist)
            new_pos = (x + step_x, y + step_y)
            self.model.space.move_agent(self, new_pos)
            self.pos = new_pos

    def reached_destination(self, target):
        """Return True when the ship is within 0.3 of a day's travel of ``target``."""
        return np.linalg.norm(np.array(self.pos) - np.array(target)) < 0.3 * self.speed

    def base_to_chokepoint_travel_time(self):
        """Return travel time in days from ``target_base`` to the chokepoint."""
        base_pos = np.array(self.model.bases[self.target_base]["position"])
        cp_pos = np.array((self.model.chokepoint["x"], self.model.chokepoint["y"]))
        distance = np.linalg.norm(base_pos - cp_pos)
        return distance / self.speed
class SSC(BaseShip):
    """BaseShip subclass that additionally carries an ``interdiction_rate``."""

    def __init__(self, model, base, speed, maintenance_start, interdiction_rate):
        super().__init__(
            model=model,
            base=base,
            speed=speed,
            maintenance_start=maintenance_start,
        )
        self.interdiction_rate = interdiction_rate
class LSC(BaseShip):
    """BaseShip subclass that additionally carries an ``interdiction_rate``."""

    def __init__(self, model, base, speed, maintenance_start, interdiction_rate):
        super().__init__(
            model=model,
            base=base,
            speed=speed,
            maintenance_start=maintenance_start,
        )
        self.interdiction_rate = interdiction_rate
class BaseAgent(Agent):
    """Passive agent recording a base's name, position and capacity."""

    def __init__(self, model, name, position, capacity):
        super().__init__(model)
        self.name, self.pos, self.capacity = name, position, capacity

    def step(self):
        """Bases take no action during a model step."""
        return None
class TradeShip(Agent):
    """One day's trade shipment sailing from the exporter to China.

    The ship heads for the chokepoint, has its ``trade_size`` reduced there
    according to the interdiction capacity of nearby warships, then continues
    to a point just above the China line, where its interception rate is
    appended to ``model.trade_interception_history``.
    """

    def __init__(self, model, speed):
        """Create the ship at its starting point, bound for the chokepoint.

        Args:
            model: Owning model; must expose ``chokepoint`` and ``space``.
            speed: Distance covered per step, in plot units.
        """
        super().__init__(model)
        self.speed = speed
        self.status = "en_route"
        self.pos = (model.chokepoint["x"] - 100, model.chokepoint["y"] + 80)
        self.original_trade_amount = 125
        self.trade_size = self.original_trade_amount
        self.has_updated = False
        self.recorded = False
        # initial destination is the chokepoint
        self.destination = (model.chokepoint["x"], model.chokepoint["y"])

    def step(self):
        """Move one step and apply the chokepoint and arrival checks."""
        # stop once you get to china
        if self.status == "in_china":
            return

        # otherwise move
        x, y = self.pos
        dest_x, dest_y = self.destination
        dx = dest_x - x
        dy = dest_y - y
        dist = np.sqrt(dx**2 + dy**2)
        if dist > 0:
            # Never travel farther than the remaining distance; a full-speed
            # step from closer than ``speed`` would overshoot the destination.
            stride = min(self.speed, dist)
            new_x = x + stride * (dx / dist)
            new_y = y + stride * (dy / dist)
            # keep the ship inside the space
            new_x = min(max(new_x, 0), self.model.space.width)
            new_y = min(max(new_y, 0), self.model.space.height)
            new_pos = (new_x, new_y)
            self.model.space.move_agent(self, new_pos)
            self.pos = new_pos

        # check if the trade is by the strait, and change the ship
        cx, cy = self.model.chokepoint["x"], self.model.chokepoint["y"]
        in_chokepoint = abs(self.pos[0] - cx) < 10 and abs(self.pos[1] - cy) < 10
        if in_chokepoint and not self.has_updated:
            # total interdiction capacity of warships within 30 units of the chokepoint
            interdiction_total = sum(
                a.interdiction_rate for a in self.model.space.agents
                if isinstance(a, (SSC, LSC)) and abs(a.pos[0] - cx) < 30 and abs(a.pos[1] - cy) < 30
            )
            interception_rate = min(1.0, interdiction_total / self.original_trade_amount)
            self.trade_size = self.original_trade_amount * (1 - interception_rate)
            self.has_updated = True
            # head to just above the China line
            self.destination = (cx + 75 + 25, cy + 103 + 5)

        target_x, target_y = self.destination
        if abs(self.pos[0] - target_x) < 10 and abs(self.pos[1] - target_y) < 10:
            # calculate the interception rate
            self.interception_rate = 1 - (self.trade_size / self.original_trade_amount)
            self.status = "in_china"
            self.speed = 0
            if not self.recorded:
                self.model.trade_interception_history.append(self.interception_rate)
                self.recorded = True
class MissileBrigade(Agent):
# Missile launch site agent: tracks a missile stock with reload counter and
# engages SSC/LSC ships that are in transit.
# NOTE(review): indentation is not visible in this view of the file, so the
# comments below describe statements in order and do not assert how the
# kill / bookkeeping statements are nested.
#most of this code was replaced with salvo logic, but kept here for dependencies.
# bad coding on my part
def __init__(self, model, position, missiles, accuracy, range_, missile_type):
# Stores the constructor arguments; the stock starts full (current == max).
super().__init__(model)
self.pos = position
self.missiles = missiles
self.accuracy = accuracy
self.range = range_
self.missile_type = missile_type
# salvo stock & reload tracking
self.max_missiles = missiles
self.current_missiles = missiles
self.reload_counter = 0
def step(self):
# Per-day update: reload, then (when fully stocked) engage candidates.
m = self.model
# nothing happens while missiles are disabled or the delay has not elapsed
if not self.model.enable_missiles:
return
if m.day <= m.missile_delay:
return
# reload: count days below full stock; add one missile when the counter
# reaches m.reload_rate
if self.current_missiles < self.max_missiles:
self.reload_counter += 1
if self.reload_counter >= m.reload_rate:
self.current_missiles += 1
self.reload_counter = 0
# still below full stock: no engagement on this call
if self.current_missiles < self.max_missiles:
return
# engagement band: DF-21 covers 0..range, other types start at m.df21_range
if self.missile_type == "DF-21":
min_r, max_r = 0, self.range
else:
min_r = m.df21_range
max_r = self.range
cp = np.array((m.chokepoint["x"], m.chokepoint["y"]))
candidates = []
for ship in m.space.agents:
# only living SSC/LSC agents are considered
if not isinstance(ship, (SSC, LSC)) or ship.status == "Dead":
continue
# skip ships already recorded in m.fired_ships
if ship in m.fired_ships:
continue
if ship.status not in ("transiting", "transiting_back"):
continue
# NOTE(review): the status test inside the DF-26 check looks redundant,
# since non-transiting ships appear to be skipped just above - confirm
if self.missile_type == "DF-26":
if not (isinstance(ship, LSC) and ship.status in ("in_base", "transiting", "transiting_back")):
continue
# ships within 30 units of the chokepoint are not targeted
if np.linalg.norm(np.array(ship.pos) - cp) <= 30:
continue
d = np.linalg.norm(np.array(ship.pos) - np.array(self.pos))
if d < min_r or d > max_r:
continue
candidates.append((ship, d))
# separate and sort by distance
sscs = sorted([(s, d) for s, d in candidates if isinstance(s, SSC)], key=lambda x: x[1])
lscs = sorted([(s, d) for s, d in candidates if isinstance(s, LSC)], key=lambda x: x[1])
# fire on 2 SSCs or 1 LSC
fired = False
if len(sscs) >= 2:
for ship, d in sscs[:2]:
if random.random() < self.compute_dynamic_kill_prob(ship, d):
# release reserved slot if any
if ship.target_base is not None:
m.bases[ship.target_base]["capacity"] += 1
ship.target_base = None
ship.status = "Dead"
m.fired_ships.add(ship)
fired = True
elif len(lscs) >= 1:
ship, d = lscs[0]
if random.random() < self.compute_dynamic_kill_prob(ship, d):
# release reserved slot if any
if ship.target_base is not None:
m.bases[ship.target_base]["capacity"] += 1
ship.target_base = None
ship.status = "Dead"
m.fired_ships.add(ship)
fired = True
# if fired, lose missiles
if fired:
self.current_missiles = 0
def compute_dynamic_kill_prob(self, ship, distance):
# Kill probability: the model's per-salvo base probability for the ship's
# class plus a bonus of up to 0.1 that shrinks linearly with
# distance / self.range, capped at 0.9.
max_range = self.range
base_prob = self.model.salvo_kill_prob_lsc if isinstance(ship, LSC) else self.model.salvo_kill_prob_ssc
return min(0.9, base_prob + 0.1 * (1 - distance / max_range))
def fire_salvo(self):
"""Launch one timed salvo against ships outside the chokepoint."""
# Called from ShipDeploymentModel.step; returns True when missiles_left
# ended below the salvo size, and None for an unrecognised missile type.
m = self.model
# select salvo parameters
if self.missile_type in ("DF-21", "YJ-83"):
salvo_count = m.df21_salvo
mpssc = m.df21_mpssc
mplsc = m.df21_mplsc
min_r, max_r = 0, self.range
elif self.missile_type in ("DF-26", "YJ-12"):
salvo_count = m.df26_salvo
mpssc = m.df26_mpssc
mplsc = m.df26_mplsc
# inner edge is the midpoint between the model's DF-21 and DF-26 ranges
min_r = (m.df21_range + m.df26_range) / 2
max_r = self.range
else:
return
# chokepoint center
cp_x, cp_y = m.chokepoint["x"], m.chokepoint["y"]
# gather valid targets: SSC/LSC alive
candidates = []
for ship in m.space.agents:
if not isinstance(ship, (SSC, LSC)) or ship.status == "Dead":
continue
# only target transiting ships
if ship.status not in ("transiting", "transiting_back"):
continue
# DF-26 only targets LSCs
if self.missile_type == "DF-26":
if not (isinstance(ship, LSC)):
continue
# skip ships too close to chokepoint
if np.linalg.norm(np.array(ship.pos) - np.array((cp_x, cp_y))) <= 50:
continue
# distance to this brigade
d = np.linalg.norm(np.array(ship.pos) - np.array(self.pos))
if d < min_r or d > max_r:
continue
candidates.append((ship, d))
# separate and sort by distance
sscs = sorted([(s, d) for s, d in candidates if isinstance(s, SSC)], key=lambda x: x[1])
lscs = sorted([(s, d) for s, d in candidates if isinstance(s, LSC)], key=lambda x: x[1])
missiles_left = salvo_count
# engage SSCs first
# mpssc / mplsc are the missiles spent per SSC / LSC engagement
max_ssc = missiles_left // mpssc
for ship, d in sscs[:max_ssc]:
if random.random() < self.compute_dynamic_kill_prob(ship, d):
# release maintenance slot if reserved
if ship.target_base is not None:
m.bases[ship.target_base]["capacity"] += 1
ship.target_base = None
ship.status = "Dead"
missiles_left -= mpssc
# then engage LSCs
max_lsc = missiles_left // mplsc
for ship, d in lscs[:max_lsc]:
if random.random() < self.compute_dynamic_kill_prob(ship, d):
# release maintenance slot if reserved
if ship.target_base is not None:
m.bases[ship.target_base]["capacity"] += 1
ship.target_base = None
ship.status = "Dead"
missiles_left -= mplsc
return missiles_left < salvo_count
# ---- Model ----
class ShipDeploymentModel(Model):
    """Blockade simulation: warships, bases, missile brigades and daily trade.

    Each step is one day. Missile brigades may fire a salvo, every agent
    steps in random order, a new ``TradeShip`` is launched, and the day's
    statistics are collected.
    """

    # lots of these parameters are no longer used (especially on missiles)
    def __init__(
        self,
        num_ssc=23,
        num_lsc=23,
        ssc_speed=30,
        lsc_speed=20,
        ssc_maintenance_start=30,
        lsc_maintenance_start=50,
        ssc_interdictions=1.5,
        lsc_interdictions=1.5,
        bases=None,
        chokepoint=None,
        diego_capacity=20,
        guam_capacity=10,
        yokosuka_capacity=10,
        num_df21_missiles=8,
        df21_accuracy=0.7,
        df21_range=95,
        num_df26_missiles=8,
        df26_accuracy=0.6,
        df26_range=217,
        status_hit_multipliers=None,
        lsc_defense_coeff=0.8,
        missile_delay=20,
        reload_rate=5,
        enable_missiles=True,
        seed=None,
    ):
        """Build the space, bases, missile brigades and both ship fleets.

        ``bases`` and ``chokepoint`` are accepted for interface compatibility
        but are not read: both are derived from the module-level
        ``chokepointx`` / ``chokepointy`` reference point.
        """
        super().__init__(seed=seed)
        # seed the global generators too, since agents draw from them directly
        if seed is not None:
            random.seed(seed)
            np.random.seed(seed)

        # store parameters for output
        self.input_params = {
            "seed": seed,
            "num_ssc": num_ssc,
            "ssc_speed": ssc_speed,
            "ssc_maintenance_start": ssc_maintenance_start,
            "ssc_interdictions": ssc_interdictions,
            "num_lsc": num_lsc,
            "lsc_speed": lsc_speed,
            "lsc_maintenance_start": lsc_maintenance_start,
            "lsc_interdictions": lsc_interdictions,
            "diego_capacity": diego_capacity,
            "guam_capacity": guam_capacity,
            "yokosuka_capacity": yokosuka_capacity,
            "enable_missiles": enable_missiles,
            "missile_delay": missile_delay,
            "reload_rate": reload_rate
        }

        self.space = ContinuousSpace(600, 500, torus=False)
        # set chokepoint from master variable and bases relative to it
        self.chokepoint = {"x": chokepointx, "y": chokepointy}
        self.bases = {
            "Diego Garcia": {
                "position": (chokepointx - 197, chokepointy - 75),
                "distance": 2200,
                "capacity": diego_capacity
            },
            "Guam": {
                "position": (chokepointx + 300, chokepointy + 74),
                "distance": 3200,
                "capacity": guam_capacity
            },
            "Yokosuka": {
                "position": (chokepointx + 262, chokepointy + 226),
                "distance": 3600,
                "capacity": yokosuka_capacity
            },
        }

        self.trade_interception_history = []
        # daily interception tracking
        self.daily_interception_rates = []
        self._last_trade_history_len = 0
        # track days for missile delay
        self.missile_delay = missile_delay
        self.day = 0
        self.reload_rate = reload_rate
        self.enable_missiles = enable_missiles
        # Ships fired on during the current step. Created here as well as in
        # step() so brigade code can read it before the first step.
        self.fired_ships = set()

        # missile types (no longer used)
        self.num_df21_missiles = num_df21_missiles
        self.df21_accuracy = df21_accuracy
        self.df21_range = df21_range
        self.num_df26_missiles = num_df26_missiles
        self.df26_accuracy = df26_accuracy
        self.df26_range = df26_range
        # salvo settings
        self.df21_salvo = 8
        self.df21_mpssc = 4
        self.df21_mplsc = 8
        self.df26_salvo = 8
        self.df26_mpssc = 4
        self.df26_mplsc = 8
        # per-salvo kill probabilities
        self.salvo_kill_prob_ssc = 0.8
        self.salvo_kill_prob_lsc = 0.8
        # shared modifiers (no longer used)
        self.status_hit_multipliers = status_hit_multipliers or {
            "transiting": 0.5,
            "in_area": 0.8,
            "in_base": 1.0,
        }
        self.shiptype_hit_multipliers = {
            "SSC": 1.0,
            "LSC": lsc_defense_coeff,
        }

        # launch locations
        launch_coords = [
            (chokepointx + 54, chokepointy + 110),
            (chokepointx + 118, chokepointy + 153),
            (chokepointx + 149, chokepointy + 235),
        ]
        self.brigade_agents = []
        for pos in launch_coords:
            # stacking both missile capabilities at every launch site
            b21 = MissileBrigade(
                self, pos, self.num_df21_missiles, self.df21_accuracy, self.df21_range, "DF-21"
            )
            self.brigade_agents.append(b21)
            self.space.place_agent(b21, pos)
            b26 = MissileBrigade(
                self, pos, self.num_df26_missiles, self.df26_accuracy, self.df26_range, "DF-26"
            )
            self.brigade_agents.append(b26)
            self.space.place_agent(b26, pos)

        # make the bases as agents
        self.base_agents = []
        for name, info in self.bases.items():
            base_agent = BaseAgent(self, name, info["position"], info["capacity"])
            self.base_agents.append(base_agent)
            self.space.place_agent(base_agent, info["position"])

        # make our ship agents (overrides the placement done in BaseShip.__init__)
        self._deploy_fleet(SSC, num_ssc, ssc_speed, ssc_maintenance_start, ssc_interdictions)
        self._deploy_fleet(LSC, num_lsc, lsc_speed, lsc_maintenance_start, lsc_interdictions)

        # count ships by status
        self.datacollector = DataCollector(
            model_reporters={
                "Ships on Station": lambda m: m._count_ships("in_area"),
                "Ships Transiting": lambda m: m._count_ships("transiting", "transiting_back"),
                "Ships in Base": lambda m: m._count_ships("in_base"),
                "Dead Ships": lambda m: m._count_ships("Dead"),
                "Trade Interception Rate": lambda m: np.mean(m.trade_interception_history) if m.trade_interception_history else 0,
                "Daily Interception Rate": lambda m: m.daily_interception_rates[-1] if m.daily_interception_rates else 0,
                "Avg SSC days on station": lambda m: m._mean_days(SSC, "total_days_in_area"),
                "Avg SSC days in base": lambda m: m._mean_days(SSC, "total_days_in_base"),
                "Avg LSC days on station": lambda m: m._mean_days(LSC, "total_days_in_area"),
                "Avg LSC days in base": lambda m: m._mean_days(LSC, "total_days_in_base"),
                "Avg SSC days in transit": lambda m: m._mean_days(SSC, "total_days_in_transit"),
                "Avg LSC days in transit": lambda m: m._mean_days(LSC, "total_days_in_transit"),
            }
        )
        self.running = True

    def _relocate(self, ship, pos):
        """Move ``ship`` to ``pos`` in the space and mirror it on the agent."""
        self.space.move_agent(ship, pos)
        ship.pos = pos

    def _deploy_fleet(self, ship_cls, count, speed, maintenance_start, interdictions):
        """Create ``count`` ships of ``ship_cls``, split evenly across three states.

        Each ship gets a random home base and is then, with equal
        probability, started in base under maintenance, on station at the
        chokepoint with some wear, or fresh and transiting from its base.
        """
        chokepoint_pos = (self.chokepoint["x"], self.chokepoint["y"])
        for _ in range(count):
            base = random.choice(list(self.bases.keys()))
            ship = ship_cls(self, base, speed, maintenance_start, interdictions)
            base_pos = self.bases[base]["position"]
            r = random.random()
            if r < 1/3:
                # needs maintenance in base
                ship.status = "in_base"
                ship.target_base = base
                ship.maintenance_remaining = random.randint(1, 3)
                # reserve slot
                # NOTE(review): the reservation is not checked against the
                # base's remaining capacity, so capacity can go negative here.
                self.bases[base]["capacity"] -= 1
                self._relocate(ship, base_pos)
            elif r < 2/3:
                # starts on station at chokepoint with wear
                ship.status = "in_area"
                ship.days_since_maintenance = random.randint(0, maintenance_start)
                self._relocate(ship, chokepoint_pos)
            else:
                # starts transiting from base fresh
                ship.status = "transiting"
                ship.days_since_maintenance = 0
                ship.destination = chokepoint_pos
                # BaseShip.__init__ usually places the ship at the chokepoint;
                # without this move a "transiting" ship would arrive at once.
                self._relocate(ship, base_pos)

    def _count_ships(self, *statuses):
        """Return how many SSC/LSC agents currently have one of ``statuses``."""
        return sum(
            1 for s in self.space.agents
            if isinstance(s, (SSC, LSC)) and s.status in statuses
        )

    def _mean_days(self, ship_cls, attr):
        """Return the mean of ``attr`` over ``ship_cls`` agents, or 0 if none exist.

        The empty case avoids ``np.mean([])``, which yields NaN and a warning.
        """
        values = [getattr(s, attr) for s in self.space.agents if isinstance(s, ship_cls)]
        return np.mean(values) if values else 0

    def step(self):
        """Advance the simulation by one day."""
        # reset per-step fire tracking
        self.fired_ships = set()
        self.day += 1

        # brigades fire once fully reloaded and after the delay
        if self.enable_missiles and self.day > self.missile_delay:
            for brig in self.brigade_agents:
                if brig.current_missiles >= brig.max_missiles:
                    fired = brig.fire_salvo()
                    if fired:
                        brig.current_missiles = 0

        self.agents.shuffle_do("step")

        # launch today's trade shipment
        trade_speed = 20
        trade_ship = TradeShip(self, trade_speed)
        self.space.place_agent(trade_ship, trade_ship.pos)
        self.agents.add(trade_ship)

        # calculate today's interceptions since last step
        new_rates = self.trade_interception_history[self._last_trade_history_len:]
        if new_rates:
            self.daily_interception_rates.append(np.mean(new_rates))
        else:
            self.daily_interception_rates.append(0)
        self._last_trade_history_len = len(self.trade_interception_history)

        self.datacollector.collect(self)
# ---- Visualization ----
def agent_portrayal(agent):
    """Return the portrayal dict for ``agent``.

    Bases are green rectangles carrying their name, warships are circles
    sized by class and coloured by status, trade ships are yellow circles
    sized by their current trade amount. Any other agent gets an empty dict.
    """
    if isinstance(agent, BaseAgent):
        # drawing the bases
        return {
            "shape": "rect",
            "w": 50,
            "h": 50,
            "color": "green",
            "layer": 0,
            "text": agent.name,
            "text_color": "black"
        }

    if isinstance(agent, (SSC, LSC)):
        # radius by ship type
        radius = 10 if isinstance(agent, SSC) else 30
        # colour by status, gray if mission killed, black for anything unknown
        status_colors = {
            "Dead": "gray",
            "in_area": "tab:blue",
            "transiting": "tab:purple",
            "transiting_back": "tab:purple",
            "in_base": "tab:green",
        }
        shade = status_colors.get(agent.status, "black")
        return {"shape": "circle", "r": radius, "color": shade, "layer": 1}

    if isinstance(agent, TradeShip):
        # radius is function of trade amount
        return {"shape": "circle", "r": agent.trade_size, "color": "yellow", "layer": 1}

    return {}
# Each chart maps a DataCollector series name to the colour of its line.
_SHIP_STATUS_SERIES = {
    "Ships on Station": "tab:blue",
    "Ships Transiting": "tab:purple",
    "Ships in Base": "tab:green",
    "Dead Ships": "tab:gray",
}
_TRADE_INTERCEPTION_SERIES = {
    "Trade Interception Rate": "tab:red",
    "Daily Interception Rate": "tab:orange"
}
_AVERAGE_TIME_SERIES = {
    "Avg SSC days on station": "tab:blue",
    "Avg SSC days in base": "tab:green",
    "Avg LSC days on station": "tab:purple",
    "Avg LSC days in base": "tab:orange",
    "Avg SSC days in transit": "tab:brown",
    "Avg LSC days in transit": "tab:pink"
}

ShipPlot = make_plot_component(_SHIP_STATUS_SERIES)
TradeInterceptionPlot = make_plot_component(_TRADE_INTERCEPTION_SERIES)
AvgTimePlot = make_plot_component(_AVERAGE_TIME_SERIES)
# our params and sliders, code from Mesa/Solara
def _int_slider(label, value, low, high, step=1):
    """Build the spec dict for an integer slider."""
    return {
        "type": "SliderInt",
        "value": value, "min": low,
        "max": high, "step": step,
        "label": label,
    }


def _float_slider(label, value, low, high, step):
    """Build the spec dict for a floating-point slider."""
    return {
        "type": "SliderFloat",
        "value": value, "min": low,
        "max": high, "step": step,
        "label": label,
    }


model_params = {
    "seed": _int_slider("Random Seed", 30, 1, 100),
    "num_ssc": _int_slider("# Small Surface Combatants", 23, 1, 100),
    "ssc_speed": _float_slider("SSC Speed (knots)", 25.0, 5.0, 60.0, 1.0),
    "ssc_maintenance_start": _int_slider("SSC Port Call (Days)", 30, 1, 100),
    "ssc_interdictions": _float_slider("SSC Interdictions/Day", 1.5, 0.0, 10.0, 0.25),
    "num_lsc": _int_slider("# Large Surface Combatants", 23, 1, 100),
    "lsc_speed": _float_slider("LSC Speed (knots)", 15.0, 5.0, 60.0, 1.0),
    "lsc_maintenance_start": _int_slider("LSC Port Call (Days)", 60, 1, 100),
    "lsc_interdictions": _float_slider("LSC Interdictions/Day", 1.5, 0.0, 10.0, 0.25),
    "diego_capacity": _int_slider("Diego Garcia Capacity", 3, 0, 20),
    "guam_capacity": _int_slider("Guam Capacity", 10, 0, 20),
    "yokosuka_capacity": _int_slider("Yokosuka Capacity", 12, 0, 20),
    "enable_missiles": {
        "type": "Checkbox",
        "value": False,
        "label": "Enable Missile Attacks",
    },
    "missile_delay": _int_slider("A2/AD Delay (days)", 30, 0, 50),
    "reload_rate": _int_slider("Missile Reload Rate (days)", 4, 1, 50),
}
# Start the model from the default value of every UI parameter (seed included).
_initial_kwargs = {name: spec["value"] for name, spec in model_params.items()}
model_instance = ShipDeploymentModel(**_initial_kwargs)
model = solara.reactive(model_instance)
space_component = CustomSpaceComponent(
    agent_portrayal,
    model_instance.bases,
    model_instance.space,
    model_instance.chokepoint,
)
# ---- Page UI ----
def MyLayout(model):
    """Build the page body for ``model``: the map plus the three chart cards.

    Also prepares CSV exports of the model's input parameters and collected
    data. The parameters are read from ``model`` itself, so they follow the
    model currently displayed rather than the module-level initial instance.

    Args:
        model: The model instance being visualised.

    Returns:
        A ``solara.Div`` wrapping the title, map and charts.
    """
    import pandas as pd

    new_space_component = CustomSpaceComponent(agent_portrayal, model.bases, model.space, model.chokepoint)
    model_vis = render_custom_space_component(new_space_component)
    us_chart = ShipPlot(model)
    trade_chart = TradeInterceptionPlot(model)
    avg_time_chart = AvgTimePlot(model)

    centered_vis = solara.Div(children=[model_vis], style={"margin": "0 auto", "width": "95%"})
    charts_box1 = solara.HBox(children=[
        solara.Card(children=[solara.Markdown("**Daily US Ship Status**"), us_chart]),
        solara.Card(children=[solara.Markdown("**Average Ship Time in Status**"), avg_time_chart])
    ])
    charts_box2 = solara.HBox(children=[
        solara.Card(children=[solara.Markdown("**Overall Trade Interception Rate**"), trade_chart])
    ])

    # Extract the parameters of the model being shown. This previously read
    # the module-level ``model_instance``, which always holds the initial
    # defaults and so never reflected later parameter changes.
    params = model.input_params
    tracked_keys = [
        "num_ssc", "ssc_speed", "ssc_maintenance_start", "ssc_interdictions",
        "num_lsc", "lsc_speed", "lsc_maintenance_start", "lsc_interdictions",
        "diego_capacity", "guam_capacity", "yokosuka_capacity",
        "enable_missiles", "missile_delay", "reload_rate", "seed"
    ]
    param_subset = {k: params[k] for k in tracked_keys if k in params}
    param_df = pd.DataFrame([param_subset])
    # get our data for download
    model_df = model.datacollector.get_model_vars_dataframe()
    # convert to csv
    # NOTE(review): these byte strings are not yet attached to any download
    # widget in this function.
    param_csv_bytes = param_df.to_csv(index=False).encode("utf-8")
    model_csv_bytes = model_df.to_csv(index=True).encode("utf-8")

    # layout components
    inner_layout = solara.VBox(children=[
        solara.Markdown("# Blockade Model Visualization"),
        centered_vis,
        charts_box1,
        charts_box2
    ])
    return solara.Div(
        children=[inner_layout],
        style={"width": "100vw", "display": "block", "margin": "0 auto"}
    )
# Assemble the Solara page from the reactive model, the custom layout and the
# parameter widgets.
page = SolaraViz(
    model,
    name="Diego Blockade",
    model_params=model_params,
    components=[MyLayout],
)
# voila: bare expression, presumably so a notebook/Voila front end displays the page
page