# Source: Py.Cafe project "Diego Blockade" (tvb.../diegoblockade)
# Project files: app.py, requirements.txt
# ---- app.py ----
import solara
from mesa import Model, Agent
from mesa.space import ContinuousSpace
from mesa.datacollection import DataCollector
from mesa.visualization import SolaraViz, make_space_component, make_plot_component
import numpy as np
import random


# ---- Ship Agent ----
class Ship(Agent):
    """A ship that cycles between a base, the area of interest and maintenance.

    ``status`` takes one of four values:

    * ``"transiting"`` - heading for the area of interest.
    * ``"in_area"`` - on station; ``days_in_area`` grows by one per step.
    * ``"transiting_back"`` - heading for ``target_base`` with a slot reserved.
    * ``"in_base"`` - at the base, holding its slot until it redeploys.

    All random draws go through ``self.random`` (the model's seeded generator)
    so that a run is reproducible for a given model seed.
    """

    def __init__(self, model, base, speed, maintenance_start):
        """Create a ship at ``base`` and send it toward the area of interest.

        Args:
            model: The owning model; must expose ``bases``, ``space`` and
                ``area_of_interest``.
            base: Key into ``model.bases`` naming the starting base.
            speed: Distance covered per step.
            maintenance_start: Number of days in the area after which the
                maintenance probability becomes non-zero.
        """
        # Automatically assigns a unique_id and registers the agent with the model
        super().__init__(model)
        self.base = base
        self.status = "transiting"  # initial status: moving toward the area
        self.days_in_area = 0
        self.speed = speed
        self.maintenance_start = maintenance_start
        # Base whose maintenance slot this ship currently holds (None if none)
        self.target_base = None

        # Place the ship at its base position
        model.space.place_agent(self, model.bases[base]["position"])
        # Set the destination as the area of interest
        self.destination = (model.area_of_interest["x"], model.area_of_interest["y"])

    def step(self):
        """Advance the ship by one step according to its current status."""
        if self.status == "transiting":
            self.move_towards(self.destination)
            if self.reached_destination(self.destination):
                self.status = "in_area"

        elif self.status == "in_area":
            self.days_in_area += 1
            if self.random.random() < self.maintenance_probability():
                # Look for the closest available base (by Euclidean distance)
                self.target_base = self.closest_base()
                if self.target_base is not None:
                    target = self.model.bases[self.target_base]
                    self.status = "transiting_back"
                    self.destination = target["position"]
                    # Reserve a maintenance slot by reducing capacity
                    target["capacity"] -= 1
                # Otherwise no base has a free slot: stay in the area and
                # try again on a later step.

        elif self.status == "transiting_back":
            self.move_towards(self.destination)
            if self.reached_destination(self.destination):
                # The reserved slot stays occupied while the ship is in base;
                # it is released only when the ship redeploys.
                self.status = "in_base"

        elif self.status == "in_base":
            # After maintenance, there's a chance to redeploy
            if self.random.random() < 0.1:
                if self.target_base is not None:
                    # Maintenance complete; free the slot for another ship
                    self.model.bases[self.target_base]["capacity"] += 1
                    self.target_base = None
                self.status = "transiting"
                self.days_in_area = 0
                self.destination = (
                    self.model.area_of_interest["x"],
                    self.model.area_of_interest["y"],
                )

    def maintenance_probability(self):
        """Return the per-step probability of leaving the area for maintenance.

        The probability is 0 until ``days_in_area`` exceeds
        ``maintenance_start``. After that it follows a logistic curve that
        reaches 0.5 at 50 days past ``maintenance_start``.
        """
        if self.days_in_area > self.maintenance_start:
            overdue = self.days_in_area - self.maintenance_start - 50
            return 1 / (1 + np.exp(-0.1 * overdue))
        return 0

    def closest_base(self):
        """Return the key of the nearest base with free capacity, or None.

        Distance is the Euclidean distance from the ship's current position
        to each base's ``"position"``; only bases whose ``"capacity"`` is
        greater than zero are considered.
        """
        here = np.array(self.pos)
        candidates = [
            name
            for name, info in self.model.bases.items()
            if info["capacity"] > 0
        ]
        if not candidates:
            return None
        return min(
            candidates,
            key=lambda name: np.linalg.norm(
                here - np.array(self.model.bases[name]["position"])
            ),
        )

    def move_towards(self, target):
        """Move up to ``speed`` units in a straight line toward ``target``.

        If the target is no farther than one step away the ship lands exactly
        on it instead of overshooting, which also keeps the ship from being
        pushed past a target that lies near the edge of the space.
        """
        x, y = self.pos
        tx, ty = target
        dx, dy = tx - x, ty - y
        dist = np.sqrt(dx ** 2 + dy ** 2)
        if dist <= 0:
            return  # already at the target
        if dist <= self.speed:
            new_pos = (tx, ty)
        else:
            scale = self.speed / dist
            new_pos = (x + dx * scale, y + dy * scale)
        self.model.space.move_agent(self, new_pos)

    def reached_destination(self, target):
        """Return True when the ship is less than one step (``speed``) from ``target``."""
        return np.linalg.norm(np.array(self.pos) - np.array(target)) < self.speed


# ---- Model ----
class ShipDeploymentModel(Model):
    """Model of ships rotating between bases and an area of interest.

    Ships are created at randomly chosen bases, sail to the area of interest,
    and later return to a base with free capacity for maintenance. The data
    collector records, after every step, how many ships are in the area,
    returning, or in base.
    """

    def __init__(self, num_ships=40, bases=None, area_of_interest=None, speed=2, maintenance_start=60, seed=None):
        """Build the space, the bases and the ships.

        Args:
            num_ships: Number of Ship agents to create.
            bases: Optional mapping of base name to a dict with at least
                ``"position"`` and ``"capacity"``; three default bases are
                used when omitted or empty.
            area_of_interest: Optional dict with ``"x"`` and ``"y"``;
                defaults to ``{"x": 50, "y": 50}``.
            speed: Distance each ship covers per step.
            maintenance_start: Days in the area after which ships may start
                leaving for maintenance.
            seed: Seed forwarded to the Mesa ``Model`` constructor.
        """
        super().__init__(seed=seed)  # Mandatory in Mesa 3
        self.num_ships = num_ships
        if bases:
            # Copy every base record: ships mutate "capacity" during the run,
            # and that bookkeeping must not leak into the caller's dict.
            self.bases = {name: dict(info) for name, info in bases.items()}
        else:
            self.bases = {
                "Diego Garcia": {"position": (20, 20), "distance": 3500, "capacity": 10},
                "Guam": {"position": (80, 80), "distance": 7000, "capacity": 10},
                "Darwin": {"position": (40, 40), "distance": 5000, "capacity": 10},
            }
        self.area_of_interest = area_of_interest if area_of_interest else {"x": 50, "y": 50}
        self.space = ContinuousSpace(100, 100, torus=False)

        # Create Ship agents; use the model's seeded RNG (not the global
        # `random` module) so base assignment is reproducible for a given seed.
        base_names = list(self.bases)
        for _ in range(num_ships):
            Ship(self, self.random.choice(base_names), speed, maintenance_start)

        # Data collection: count ships in different statuses
        self.datacollector = DataCollector(
            model_reporters={
                "Ships in Area": self._status_counter("in_area"),
                "Ships Returning": self._status_counter("transiting_back"),
                "Ships in Base": self._status_counter("in_base"),
            }
        )
        self.running = True

    @staticmethod
    def _status_counter(status):
        """Return a model reporter that counts ships whose status equals ``status``."""

        def count(model):
            return sum(1 for ship in model.agents_by_type[Ship] if ship.status == status)

        return count

    def step(self):
        """Step every ship once in random order, then record the status counts."""
        # agent activation
        self.agents_by_type[Ship].shuffle_do("step")
        self.datacollector.collect(self)


# ---- Visualization ----
def agent_portrayal(agent):
    """Return the drawing style for ``agent`` in the space component.

    Ships are coloured by status: blue when in the area, red when returning
    to a base, green otherwise (transiting out or in base). Any other agent
    type gets an empty portrayal.

    The keys are lowercase ("color", "marker") because that is what the
    Mesa 3 matplotlib space drawer reads; the capitalised Mesa 2 keys
    ("Shape", "Color", "Filled", "r") are not picked up, which left every
    ship drawn in the default colour.
    """
    if not isinstance(agent, Ship):
        return {}

    if agent.status == "in_area":
        color = "blue"
    elif agent.status == "transiting_back":
        color = "red"
    else:
        color = "green"
    # "o" is the matplotlib circle marker (the former "Shape": "circle")
    return {"color": color, "marker": "o"}


# Live chart component: one line per status count. Each key is the name of a
# model reporter registered in ShipDeploymentModel's DataCollector; each value
# is the colour passed to make_plot_component for that line.
_STATUS_LINE_COLORS = {
    "Ships in Area": "tab:blue",
    "Ships Returning": "tab:red",
    "Ships in Base": "tab:green",
}
ShipPlot = make_plot_component(_STATUS_LINE_COLORS)

def _slider(kind, label, value, low, high, step):
    """Return a slider input spec of type ``kind`` with the given label, default and range."""
    return {
        "type": kind,
        "value": value,
        "label": label,
        "min": low,
        "max": high,
        "step": step,
    }


# UI input definitions handed to SolaraViz. Every key is also a keyword
# argument of ShipDeploymentModel.__init__, and each "value" is its default.
model_params = {
    "seed": {"type": "InputText", "value": 42, "label": "Random Seed"},
    "num_ships": _slider("SliderInt", "Number of Ships", 40, 10, 100, 1),
    "speed": _slider("SliderFloat", "Ship Speed", 2, 1, 10, 0.5),
    "maintenance_start": _slider("SliderInt", "Maintenance Start (Days)", 60, 30, 100, 5),
}

# Build the initial model from the same defaults the UI shows -- including the
# seed -- so the very first run is reproducible and matches the "Random Seed"
# field instead of starting from an unseeded model.
model_instance = ShipDeploymentModel(
    **{name: spec["value"] for name, spec in model_params.items()}
)
# Wrap the model in a reactive container so that SolaraViz updates automatically
model = solara.reactive(model_instance)

# Initialize the visualization dashboard with space and plot components
page = SolaraViz(
    model,
    components=[
        make_space_component(agent_portrayal),
        ShipPlot,
    ],
    model_params=model_params,
    name="Blockade Model",
)

# Bare expression kept as the last statement; presumably the hosting
# environment renders it as the app's root component -- confirm before removing.
page