from shapely.geometry import Point
import mesa
import mesa_geo as mg
import mesa_geo.visualization as mgv # the warning that appears from Solara is fixed in Mesa 3.0 you can install the pre-release with pip install -U --pre-mesa
import warnings
warnings.filterwarnings("ignore") # suppress solara warning; #TODO remove when Mesa-Geo updates for Mesa 3.0
# Functions needed for datacollector
def get_infected_count(model):
return model.counts["infected"]
def get_susceptible_count(model):
return model.counts["susceptible"]
def get_recovered_count(model):
return model.counts["recovered"]
def get_dead_count(model):
return model.counts["dead"]
def get_hotspot_count(model):
return model.counts["hotspot"]
def get_safe_count(model):
return model.counts["safe"]
class PersonAgent(mg.GeoAgent):
"""Person Agent."""
def __init__(
self,
model,
geometry,
crs,
agent_type,
mobility_range,
infection_risk,
recovery_rate,
death_risk
):
super().__init__(model, geometry, crs)
# Agent attributes
self.atype = agent_type
self.mobility_range = mobility_range
self.infection_risk=infection_risk,
self.recovery_rate = recovery_rate
self.death_risk = death_risk
self.steps_infected=0
self.steps_recovered = 0
def __repr__(self):
return f"Person {self.unique_id}"
#Helper function for moving agent
def move_point(self, dx, dy):
"""
Move a point by creating a new one
:param dx: Distance to move in x-axis
:param dy: Distance to move in y-axis
"""
return Point(self.geometry.x + dx, self.geometry.y + dy)
def step(self):
# Part 1 - find neighbors based on infection distance
if self.atype == "susceptible":
neighbors = self.model.space.get_neighbors_within_distance(
self, self.model.exposure_distance
)
for neighbor in neighbors:
if (
neighbor.atype == "infected"
and self.random.random() < self.model.infection_risk
):
self.atype = "infected"
break # stop process if agent becomes infected
# Part -2 If infected, check if agent recovers or agent dies
elif self.atype == "infected":
if self.steps_infected >= self.recovery_rate:
self.atype = "recovered"
self.steps_infected = 0
elif self.random.random() < self.death_risk:
self.atype = "dead"
else:
self.steps_infected += 1
elif self.atype == "recovered":
self.steps_recovered += 1
if self.steps_recovered >= 2:
self.atype = "susceptible"
self.steps_recovered = 0
# Part 3 - If not dead, move
if self.atype != "dead":
move_x = self.random.randint(-self.mobility_range, self.mobility_range)
move_y = self.random.randint(-self.mobility_range, self.mobility_range)
self.geometry = self.move_point(move_x, move_y) # Reassign geometry
self.model.counts[self.atype] += 1 # Count agent type
class NeighbourhoodAgent(mg.GeoAgent):
"""Neighbourhood agent. Changes color according to number of infected inside it."""
def __init__(
self, model, geometry, crs, agent_type="safe", hotspot_threshold=1
):
super().__init__(model, geometry, crs)
self.atype = agent_type
self.hotspot_threshold = (
hotspot_threshold # When a neighborhood is considered a hot-spot
)
def __repr__(self):
return f"Neighbourhood {self.unique_id}"
def color_hotspot(self):
# Decide if this region agent is a hot-spot
# (if more than threshold person agents are infected)
neighbors = self.model.space.get_intersecting_agents(self)
infected_neighbors = [
neighbor for neighbor in neighbors if neighbor.atype == "infected"
]
if len(infected_neighbors) > self.hotspot_threshold:
self.atype = "hotspot"
else:
self.atype = "safe"
def step(self):
"""Advance agent one step."""
self.color_hotspot()
self.model.counts[self.atype] += 1 # Count agent type
class GeoSIR(mesa.Model):
"""Model class for a simplistic infection model."""
# Geographical parameters for desired map
geojson_regions = "data/TorontoNeighbourhoods.geojson"
def __init__(
self, pop_size=30, mobility_range=500, init_infection=0.2, exposure_dist=500, max_infection_risk=0.2,
max_recovery_time=5
):
super().__init__()
# Scheduler
self.schedule = mesa.time.RandomActivationByType(self)
# Space
self.space = mg.GeoSpace(warn_crs_conversion=False)
# Data Collection
self.counts = None # added
self.reset_counts() # added
# SIR model parameters
self.pop_size = pop_size
self.mobility_range = mobility_range
self.initial_infection = init_infection
self.exposure_distance = exposure_dist
self.infection_risk = max_infection_risk
self.recovery_rate = max_recovery_time
self.running = True # added
# added
self.datacollector = mesa.DataCollector(
{
"infected": get_infected_count,
"susceptible": get_susceptible_count,
"recovered": get_recovered_count,
"dead": get_dead_count,
"safe": get_safe_count,
"hotspot": get_hotspot_count
}
)
# Set up the Neighbourhood patches for every region in file
ac = mg.AgentCreator(NeighbourhoodAgent, model=self)
neighbourhood_agents = ac.from_file(self.geojson_regions)
# Add neighbourhood agents to space
self.space.add_agents(neighbourhood_agents)
# Add neighbourhood agents to scheduler
for agent in neighbourhood_agents:
self.schedule.add(agent)
# Generate random location, add agent to grid and scheduler
for i in range(pop_size):
# assess if they are infected
if self.random.random() < self.initial_infection:
agent_type = "infected"
else:
agent_type = "susceptible"
# determine movement range
mobility_range = self.random.randint(0, self.mobility_range)
# determine agent recovery rate
recover = self.random.randint(1, self.recovery_rate)
# determine agents infection risk
infection_risk = self.random.uniform(0, self.infection_risk)
# determine agent death probability
death_risk = self.random.uniform(0, 0.05)
# Generate PersonAgent population
unique_person = mg.AgentCreator(
PersonAgent,
model=self,
crs=self.space.crs,
agent_kwargs={"agent_type": agent_type,
"mobility_range": mobility_range,
"recovery_rate": recover,
"infection_risk": infection_risk,
"death_risk": death_risk
}
)
x_home, y_home = self.find_home(neighbourhood_agents)
this_person = unique_person.create_agent(Point(x_home, y_home))
self.space.add_agents(this_person)
self.schedule.add(this_person)
def find_home(self, neighbourhood_agents):
""" Find start location of agent """
# identify location
this_neighbourhood = self.random.randint(
0, len(neighbourhood_agents) - 1
) # Region where agent starts
center_x, center_y = neighbourhood_agents[
this_neighbourhood
].geometry.centroid.coords.xy
this_bounds = neighbourhood_agents[this_neighbourhood].geometry.bounds
spread_x = int(
this_bounds[2] - this_bounds[0]
) # Heuristic for agent spread in region
spread_y = int(this_bounds[3] - this_bounds[1])
this_x = center_x[0] + self.random.randint(0, spread_x) - spread_x / 2
this_y = center_y[0] + self.random.randint(0, spread_y) - spread_y / 2
return this_x, this_y
# added
def reset_counts(self):
self.counts = {
"susceptible": 0,
"infected": 0,
"recovered": 0,
"dead": 0,
"safe": 0,
"hotspot": 0,
}
def step(self):
"""Run one step of the model."""
self.reset_counts() # added
self.schedule.step()
self.datacollector.collect(self) # added
# Run until no one is infected
if self.counts["infected"] == 0:
self.running = False
def SIR_draw(agent):
"""
Portrayal Method for canvas
"""
portrayal = {}
if isinstance(agent, PersonAgent):
if agent.atype == "susceptible":
portrayal["color"] = "Green"
elif agent.atype == "infected":
portrayal["color"] = "Red"
elif agent.atype == "recovered":
portrayal["color"] = "Blue"
else:
portrayal["marker_type"] = "AwesomeIcon"
portrayal["name"] = "times"
portrayal["icon_properties"] = {
"marker_color": 'black',
"icon_color": 'white'}
if isinstance(agent, NeighbourhoodAgent):
if agent.atype == "hotspot":
portrayal["color"] = "Red"
else:
portrayal["color"] = "Green"
return portrayal
model_params = {
"pop_size": {
"type": "SliderInt",
"value": 80,
"label": "Population Size",
"min": 0,
"max": 100,
"step": 1,
},
"mobility_range": {
"type": "SliderInt",
"value": 500,
"label": "Max Possible Agent Movement",
"min": 100,
"max": 1000,
"step": 50,
},
"init_infection": {
"type": "SliderFloat",
"value": 0.4,
"label": "Initial Infection",
"min": 0.0,
"max": 1.0,
"step": 0.1,
},
"exposure_dist": {
"type": "SliderInt",
"value": 800,
"label": "Exposure Distance",
"min": 100,
"max": 1000,
"step": 50,
},
"max_infection_risk": {
"type": "SliderFloat",
"value": 0.7,
"label": "Maximum Infection Risk",
"min": 0.0,
"max": 1.0,
"step": 0.1
},
"max_recovery_time": {
"type": "SliderInt",
"value": 7,
"label": "Maximum Number of Steps to Recover",
"min": 1,
"max": 10,
"step": 1,
}}
page = mgv.GeoJupyterViz(
GeoSIR,
model_params,
measures= [["infected", "susceptible", "recovered", "dead"], ["safe", "hotspot"]],
name="GeoSIR",
agent_portrayal=SIR_draw,
zoom=12,
scroll_wheel_zoom=False
)
# This is required to render the visualization in the Jupyter notebook
page