# ACRE - Analytics & Insights Control Tower
import dash_bootstrap_components as dbc
import vizro.models as vm
from vizro import Vizro
import geopandas as gpd
import numpy as np
import pandas as pd
from vizro.figures import kpi_card_reference
import os
import sys
from pages.home import home_page
# from inseason_page import inseason_page
"""Contains custom components and charts used inside the dashboard."""
import base64
import io
from typing import List, Literal, Optional
import dash_bootstrap_components as dbc
import geojson
import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import vizro.models as vm
import vizro.plotly.express as px
from dash import html
from plotly.colors import n_colors
from vizro.models.types import capture
# Constants
default_colorscale = px.colors.sample_colorscale(px.colors.diverging.RdBu, 20)
colors = n_colors(
lowcolor=default_colorscale[0],
highcolor=default_colorscale[8],
n_colors=8,
colortype="rgb",
) + n_colors(
lowcolor="rgb(255, 255, 255)",
highcolor=default_colorscale[-1],
n_colors=5,
colortype="rgb",
)
### Sourcing constants
REGION_COL = "Region"
CROP_COL = "Crop"
YIELD_2024 = "2024 Yield"
AREA = "Area (ha)"
YIELD_DIFF_2C = "2050 Yield Difference in 2C Scenario"
YIELD_DIFF_23C = "2050 Yield Difference in 2.3C Scenario"
YIELD_DIFF_EXT = "2050 Yield Difference in Extreme Years (10th Percentile)"
SOURCING_MAP_LAYER_COLS = [YIELD_2024, YIELD_DIFF_2C, YIELD_DIFF_23C, YIELD_DIFF_EXT]
PROD_2024 = "2024 Production"
PROD_2050_2C = "2050 Production in 2C Scenario"
PROD_2050_23C = "2050 Production in 2.3C Scenario"
PROD_2050_EXT = "2050 Production in Extreme Years (10th Percentile)"
PROD_DIFF_2C = "2050 Production Difference in 2C Scenario"
PROD_DIFF_23C = "2050 Production Difference in 2.3C Scenario"
PROD_DIFF_EXT = "2050 Production Difference in Extreme Years (10th Percentile)"
LIMITS = (-50, 50)
COLUMN_MAPPING = {
YIELD_2024: {
"yield": YIELD_2024,
"prod": PROD_2024,
"prod_diff": None,
"title": "Current yield (t/ha)",
"unit": "t/ha",
"limits": None,
},
YIELD_DIFF_2C: {
"yield": YIELD_DIFF_2C,
"prod": PROD_2050_2C,
"prod_diff": PROD_DIFF_2C,
"title": "Expected yield change in a 2°C warming scenario",
"unit": "%",
"limits": LIMITS,
},
YIELD_DIFF_23C: {
"yield": YIELD_DIFF_23C,
"prod": PROD_2050_23C,
"prod_diff": PROD_DIFF_23C,
"title": "Expected yield change in a 2.3°C warming scenario",
"unit": "%",
"limits": LIMITS,
},
YIELD_DIFF_EXT: {
"yield": YIELD_DIFF_EXT,
"prod": PROD_2050_EXT,
"prod_diff": PROD_DIFF_EXT,
"title": "Expected yield change in extreme years",
"unit": "%",
"limits": LIMITS,
},
}
GEOGRAPHIC_CRS_CODE = 4326
# CUSTOM COMPONENTS -------------------------------------------------------------
class FlexContainer(vm.Container):
"""Custom flex `Container`."""
type: Literal["flex_container"] = "flex_container"
title: str = "" # Title exists in vm.Container but we don't want to use it here.
classname: str = "d-flex w-100 h-100" # Ensure full width and height
def build(self):
"""Returns a flex container."""
return html.Div(
id=self.id,
children=[component.build() for component in self.components],
className=self.classname,
style={
"width": "100%",
"height": "100%",
"margin": "0", # Remove any margin
"padding": "0", # Remove any padding
"overflow": "hidden", # Prevent scrollbars or extra space
},
)
# CUSTOM CHARTS ----------------------------------------------------------------
@capture("graph")
def plot_line_with_uncertainty(
data_frame: pd.DataFrame,
x: str,
y: str,
lower_bound: str,
upper_bound: str,
forecast_month: str,
):
"""
Create a line plot with uncertainty (shaded area and error bars) using Plotly Express.
Parameters:
- data_frame: pd.DataFrame containing the data
- x: str, column name for the x-axis
- y: str, column name for the y-axis
- lower_bound: str, column name for the lower bound of the uncertainty
- upper_bound: str, column name for the upper bound of the uncertainty
- forecast_month: str, the current month to highlight
"""
# Calculate the error bars
data_frame.loc[:, "error_upper"] = data_frame[upper_bound] - data_frame[y]
data_frame.loc[:, "error_lower"] = data_frame[y] - data_frame[lower_bound]
# Get index loc for current_month
if forecast_month in data_frame[x].values:
current_month_loc = data_frame.index.get_loc(
data_frame[data_frame[x] == forecast_month].index.to_list()[0]
)
else:
current_month_loc = len(data_frame) - 3
# Create a line plot using Plotly Express
fig = px.line(
data_frame.iloc[: current_month_loc + 1, :],
x=x,
y=y,
error_y="error_upper",
error_y_minus="error_lower",
)
fig.update_traces(line=dict(color="orange"))
# Customize the error bars color to white
fig.update_traces(error_y=dict(color="white"))
# Add another dashed line for current_month onwards
fig.add_trace(
go.Scatter(
x=data_frame.loc[current_month_loc:, x],
y=data_frame.loc[current_month_loc:, y],
mode="lines",
line=dict(color="orange", dash="dash"),
showlegend=False,
error_y=dict(
type="data",
array=data_frame.loc[current_month_loc:, "error_upper"],
arrayminus=data_frame.loc[current_month_loc:, "error_lower"],
color="white",
),
)
)
# Add the uncertainty (shaded area)
fig.add_traces(
[
go.Scatter(
x=data_frame[x].tolist() + data_frame[x][::-1].tolist(),
y=data_frame[upper_bound].tolist()
+ data_frame[lower_bound][::-1].tolist(),
fill="toself",
fillcolor="rgba(65,105,225,0.2)", # Transparent royal blue
line=dict(color="rgba(255,255,255,0)"),
hoverinfo="skip",
showlegend=False,
name="Uncertainty",
)
]
)
# Add a red dashed vertical line in April if it exists
fig.add_vline(
name="Forecast Date",
x=current_month_loc,
line=dict(color="red", dash="dash"),
showlegend=True,
)
# Update the layout
fig.update_layout(
template="plotly_dark", # Use the dark template
xaxis_title=x,
yaxis_title=y,
yaxis=dict(
range=[min(data_frame[lower_bound]) - 1, max(data_frame[upper_bound]) + 1]
), # Start y-axis at 0
paper_bgcolor="rgba(0,0,0,0)", # Transparent background
plot_bgcolor="rgba(0,0,0,0)", # Transparent plot area
font=dict(color="white"), # White font color
legend=dict(
x=1, # Position on the x-axis (1 = far right)
y=1.2, # Position on the y-axis (1 = top)
xanchor="right", # Anchor the legend to the right
yanchor="top", # Anchor the legend to the top
),
)
return fig
@capture("graph")
def bar_charts(
x: str,
y: str,
data_frame: pd.DataFrame,
top_n: int = 10,
xaxis_title: str = "",
custom_data: Optional[List[str]] = None,
):
"""Custom bar chart implementation.
Based on [px.bar](https://plotly.com/python-api-reference/generated/plotly.express.bar).
"""
df_sorted_top = data_frame.sort_values(by=x, ascending=False).head(top_n)
fig = px.bar(
data_frame=df_sorted_top,
x=y,
y=x,
orientation="v",
text=x,
color_discrete_sequence=["#1A85FF"],
custom_data=custom_data,
)
fig.update_layout(
xaxis_title=xaxis_title,
yaxis={
"title": "",
},
)
return fig
@capture("graph")
def bar_charts_with_uncertainty(
data_frame: pd.DataFrame,
x: str,
y: str,
lower_bound: str,
upper_bound: str,
forecast_month: str,
):
# Calculate the error bars
data_frame.loc[:, "error_upper"] = data_frame[upper_bound] - data_frame[y]
data_frame.loc[:, "error_lower"] = data_frame[y] - data_frame[lower_bound]
# Get index loc for current_month
if forecast_month in data_frame[x].values:
current_month_loc = data_frame.index.get_loc(
data_frame[data_frame[x] == forecast_month].index.to_list()[0]
)
else:
current_month_loc = len(data_frame) - 3
fig = go.Figure(
data=go.Bar(
x=data_frame.iloc[:current_month_loc, :][x],
y=data_frame.iloc[:current_month_loc, :][y],
error_y=dict(
type="data",
symmetric=False,
array=data_frame.iloc[:current_month_loc, :]["error_upper"],
arrayminus=data_frame.iloc[:current_month_loc, :]["error_lower"],
),
marker=dict(color="#05843c"),
showlegend=False,
)
)
fig.add_trace(
go.Bar(
x=data_frame.iloc[current_month_loc:, :][x],
y=data_frame.iloc[current_month_loc:, :][y],
error_y=dict(
type="data",
symmetric=False,
array=data_frame.iloc[current_month_loc:, :]["error_upper"],
arrayminus=data_frame.iloc[current_month_loc:, :]["error_lower"],
),
marker=dict(pattern=dict(shape="/"), color="#05843c"),
showlegend=False,
)
)
# Customize the error bars color to white
fig.update_traces(error_y=dict(color="white"))
# Add a red dashed vertical line in April if it exists
fig.add_vline(
name="Forecast Date",
x=current_month_loc,
line=dict(color="red", dash="dash"),
showlegend=False,
)
return fig
@capture("graph")
def custom_choropleth_map(
locations: str,
color: str,
data_frame: gpd.GeoDataFrame = None,
title: Optional[str] = None,
custom_data: Optional[List[str]] = None,
):
"""Custom choropleth implementation."""
# Ensure the geometry column is in GeoJSON format
if isinstance(data_frame["geometry"].iloc[0], dict):
geojson = data_frame["geometry"].tolist()
else:
geojson = data_frame["geometry"].apply(lambda x: x.__geo_interface__).tolist()
# Create a GeoJSON dictionary
geojson_dict = {
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"geometry": geom,
"properties": {locations: row[locations]},
}
for geom, (_, row) in zip(geojson, data_frame.iterrows())
],
}
# Calculate the bounds of the GeoDataFrame
bounds = data_frame.total_bounds # [minx, miny, maxx, maxy]
map_center = {
"lat": (bounds[1] + bounds[3]) / 2,
"lon": (bounds[0] + bounds[2]) / 2,
}
# Calculate zoom level based on bounds (approximation)
lat_diff = bounds[3] - bounds[1]
lon_diff = bounds[2] - bounds[0]
zoom = np.clip(9.5 - np.log2(max(lat_diff, lon_diff)), 0, 20)
# Create the choropleth map
fig = px.choropleth_map(
data_frame=data_frame,
geojson=geojson_dict,
locations=locations,
featureidkey=f"properties.{locations}",
color=color,
color_continuous_scale="greens",
title=title,
custom_data=custom_data,
center=map_center,
zoom=zoom,
opacity=0.7,
map_style="satellite",
)
# Update layout
fig.update_layout(
margin={"r": 0, "t": 0, "l": 0, "b": 0},
coloraxis_colorbar={
"thickness": 10,
"title": {"side": "bottom"},
"orientation": "h", # Horizontal orientation
"x": 0.5, # Center the legend horizontally
"y": 0, # Position the legend below the map
"xanchor": "center", # Anchor the legend at the center
"yanchor": "top", # Anchor the legend at the top
},
)
return fig
def get_card(
value,
id: str = None,
value_format: str = "{value}",
title: Optional[str] = None,
icon: Optional[str] = None,
title_font_size: str = "24px",
):
header = dbc.CardHeader(
[
html.P(icon, className="material-symbols-outlined") if icon else None,
html.H4(
title,
className="card-kpi-title",
style={"font-size": f"{title_font_size}"},
),
]
)
body = dbc.CardBody(
value_format.format(value=value), id=id, style={"font-size": "54px"}
)
return dbc.Card([header, body], class_name="card-kpi")
@capture("graph")
def create_waterfall(
data_frame: pd.DataFrame,
field_id: str,
seed_density: int,
fertilizer: int,
sowing_date: int = 0,
harvest_date: int = 0,
):
df_filter = (
data_frame[
(data_frame["Seed density increase (%)"] == seed_density / 100)
& (data_frame["Fertilizer application increase (%)"] == fertilizer / 100)
& (data_frame["Shift in sowing date"] == f"{sowing_date} days")
& (data_frame["Shift in harvest date"] == f"{harvest_date} days")
& (data_frame["Field ID"] == field_id)
]
.copy()
.reset_index(drop=True)
)
fig = go.Figure(
go.Waterfall(
# name = "20",
orientation="v",
measure=["initial", "relative", "relative", "relative", "total"],
x=[
"Current EBITDA",
"Seeds Cost Increase",
" Fertilizers Costs Increase",
"Change in Revenue after implementation",
"Optimized EBITDA",
],
textposition="outside",
# text = ["+60", "+80", "", "-40", "-20", "Total"],
y=[
df_filter.loc[0, "Value of production (dolars)"],
-df_filter.loc[0, "Seed costs increase"],
-df_filter.loc[0, "Fertilizer costs increase"],
df_filter.loc[0, "Final value of production (dolars)"],
df_filter.loc[0, "Value of production increase"],
df_filter.loc[0, "Value of production (dolars)"]
+ df_filter.loc[0, "Value of production increase"],
],
connector={"line": {"color": "rgb(63, 63, 63)"}},
# decreasing = {"marker":{"color":"Orange"}},
# increasing = {"marker":{"color":"Teal"}},
# totals = {"marker":{"color":"deep sky blue"}}
)
)
fig.update_layout(title="Profit and loss <sub>(US$)</sub>", showlegend=False)
return fig
@capture("graph")
def create_choropleth(
data_frame: pd.DataFrame,
field_id: str,
seed_density: int,
fertilizer: int,
sowing_date: int = 0,
harvest_date: int = 0,
polygons_pth: str = None,
):
data_frame["Yield variation"] = data_frame["Yield variation"] * 100
df_filter = data_frame[
(data_frame["Seed density increase (%)"] == seed_density / 100)
& (data_frame["Fertilizer application increase (%)"] == fertilizer / 100)
& (data_frame["Shift in sowing date"] == f"{sowing_date} days")
& (data_frame["Shift in harvest date"] == f"{harvest_date} days")
& (data_frame["Field ID"] == field_id)
].copy()
with open(polygons_pth) as f:
gj = geojson.load(f)
gdf = gpd.read_file(polygons_pth)
# Get geometry centroid in EPSG:3857
centroid_xy = gdf.to_crs("EPSG:3857").geometry.centroid
# Convert to EPSG:4326
gdf["geometry"] = centroid_xy.to_crs("EPSG:4326")
fig = go.Figure(
go.Choroplethmap(
locations=df_filter["Field ID"],
z=df_filter["Yield variation"],
geojson=gj,
featureidkey="properties.Field_ID",
zmin=-40,
zmax=20,
colorscale="Greens",
marker=dict(opacity=0.7),
name="Field ID",
text=["Current yield", "Modeled yield", "Yield variation"],
)
)
fig.update_geos(fitbounds="locations", visible=False)
fig.update_layout(
map=dict(
style="satellite",
center=dict(lat=gdf.loc[0, "geometry"].y, lon=gdf.loc[0, "geometry"].x),
zoom=15,
),
margin={"r": 0, "t": 0, "l": 0, "b": 0},
title=None,
)
return fig
@capture("figure")
def custom_kpi_card_0(
data_frame: pd.DataFrame,
field_id: str,
column,
seed_density: int,
fertilizer: int,
sowing_date: int = 0,
harvest_date: int = 0,
icon: str = None,
) -> dbc.Card:
"""Creates a custom KPI card."""
df_filter = (
data_frame[
(data_frame["Seed density increase (%)"] == seed_density / 100)
& (data_frame["Fertilizer application increase (%)"] == fertilizer / 100)
& (data_frame["Shift in sowing date"] == f"{sowing_date} days")
& (data_frame["Shift in harvest date"] == f"{harvest_date} days")
& (data_frame["Field ID"] == field_id)
]
.copy()
.reset_index()
)
yield_value = df_filter.loc[0, column]
fig = get_card(
title=column,
value=yield_value,
value_format=(
"{value:.1f} %" if (column == "Yield variation") else "{value:.1f} t/ha"
),
id=f"{field_id}_{column}",
icon=icon,
)
return fig
####### Suitability
def convert_numpy_to_png(arr, cmap):
# Convert the raster array to a PNG image (using matplotlib)
buf = io.BytesIO()
vmin = np.nanmin(arr)
vmax = np.nanmax(arr)
plt.imsave(buf, np.flipud(arr), cmap=cmap, vmin=vmin, vmax=vmax)
buf.seek(0)
img_png = buf.read()
img_base64 = base64.b64encode(img_png).decode("utf-8")
img_data_uri = f"data:image/png;base64,{img_base64}"
return img_data_uri
def _filter_data_frame_(
data_frame: pd.DataFrame,
t_per_ha: (0, 40),
percentage: (0, 1),
):
min_yield, max_yield = t_per_ha
min_suitability, max_suitability = percentage
# Filter the data based on the selected ranges
data_frame = data_frame[
(data_frame.lower_2 >= min_yield)
& (data_frame.upper_2 <= max_yield)
& (data_frame.lower_1 >= min_suitability)
& (data_frame.upper_1 <= max_suitability)
]
return data_frame
@capture("graph")
def create_image(
data_frame: pd.DataFrame,
template: gpd.GeoDataFrame,
images: dict,
cmap: str = "gray",
min_value=0,
layer="yield",
t_per_ha=(20, 30),
percentage=(0, 1),
):
# Remove any extra dimensions
if layer == "yield" or layer == "suitability":
min_value, max_value = t_per_ha if layer == "yield" else percentage
da = images[layer].squeeze()
da = da.where((da > min_value) * (da < max_value), np.nan)
arr = da.values # The raster data as a numpy array
# Convert the raster array to a PNG image (using matplotlib)
img_data_uri = convert_numpy_to_png(arr, cmap, min_value, max_value)
elif layer == "available land":
da = images[layer].squeeze()
da_yield = images["yield"].squeeze()
da_suitability = images["suitability"].squeeze()
min_yield, max_yield = t_per_ha
min_suitability, max_suitability = percentage
da = da.where((da_yield >= min_yield) * (da_yield <= max_yield), 0)
da = da.where(
(da_suitability >= min_suitability) * (da_suitability <= max_suitability), 0
)
arr = da.values # The raster data as a numpy array
# get custom cmap where value 0 is transparent, and value 1 is green
cmap = plt.get_cmap(cmap)
cmap = cmap(np.linspace(0, 1, 256))
cmap[0, -1] = 0 # set alpha to 0 for value 0
cmap[1:, -1] = 1
cmap = plt.cm.colors.ListedColormap(cmap)
# Convert the raster array to a PNG image (using matplotlib)
img_data_uri = convert_numpy_to_png(arr, cmap, 0, 1)
else:
raise ValueError("layer must be either yield, suitability or available land")
# Extract georeferenced bounds: left, bottom, right, top, but set them in WGS84
left_1, top_1, right_1, bottom_1 = template.to_crs("EPSG:4326").total_bounds
# create map
fig = go.Figure(
go.Scattermap(
# data_frame = data_frame,
# geojson=json.loads(data_frame.to_json()),
# geojson = data_frame.__geo_interface__,
# featureidkey = "properties.ADM1_ES",
# locations=data_frame.ADM1_ES.to_list(),
# coloraxis="ADM1_ES",
# opacity=0.7
)
)
fig.update_layout(
map=dict(
style="satellite",
layers=[
{
"sourcetype": "image",
"source": img_data_uri,
"coordinates": [
[left_1, top_1],
[right_1, top_1],
[right_1, bottom_1],
[left_1, bottom_1],
],
"opacity": 0.7,
}
],
center=dict(lat=(bottom_1 + top_1) / 2, lon=(left_1 + right_1) / 2),
zoom=9,
),
margin={"r": 0, "t": 0, "l": 0, "b": 0},
)
return fig
@capture("graph")
def create_barchart(
data_frame: pd.DataFrame,
t_per_ha: (0, 40),
percentage: (0, 1),
):
data_frame = _filter_data_frame_(data_frame, t_per_ha, percentage)
if len(data_frame) == 0:
pivot = pd.DataFrame(data={"name": ["NA"], "area": [0]})
else:
pivot = (
pd.pivot_table(
data_frame,
index=["name"],
values=["area"],
aggfunc="sum",
)
.reset_index(drop=False)
.sort_values(by="area", ascending=False)
)
fig = px.bar(pivot, x="name", y="area", title="Total Area Available per Region")
fig.update_layout(
xaxis_title="Region", yaxis_title="Total Area (ha)", xaxis_tickangle=45
)
return fig
# https://vizro.readthedocs.io/en/stable/pages/user-guides/custom-figures/#custom-kpi-card
# icons: https://fonts.google.com/icons?icon.query=map
@capture("figure")
def custom_kpi_card_1(
data_frame: pd.DataFrame,
title: Optional[str],
icon: Optional[str] = None,
t_per_ha=(20, 40),
percentage=(0, 1),
) -> dbc.Card:
"""Creates a custom KPI card."""
# filter and format data
data_frame = _filter_data_frame_(data_frame, t_per_ha, percentage)
if len(data_frame) == 0:
data_frame = pd.DataFrame(data={"name": ["NA"], "area": [0]})
else:
data_frame = pd.DataFrame(
data={
"area": [np.round(data_frame["area"].sum(), 1)],
}
)
# create card
title = title
value = data_frame["area"].sum()
header = dbc.CardHeader(
[
html.H2(title),
html.P(icon, className="material-symbols-outlined") if icon else None,
]
)
body_1 = dbc.CardBody([f"{value / 1e3:.1f}"])
body_2 = dbc.CardBody([html.H4("kha")])
return dbc.Card([header, body_1, body_2], class_name="card-kpi")
@capture("figure")
def custom_kpi_card_2(
data_frame: pd.DataFrame,
title: Optional[str],
icon: Optional[str] = None,
t_per_ha=(20, 40),
percentage=(0, 1),
) -> dbc.Card:
"""Creates a custom KPI card."""
# filter and format data
data_frame = _filter_data_frame_(data_frame, t_per_ha, percentage)
if len(data_frame) == 0:
average_yield = 0
else:
data_frame["yield"] = (data_frame["upper_2"] + data_frame["lower_2"]) / 2
average_yield = np.sum(data_frame["yield"] * data_frame["area"]) / np.sum(
data_frame["area"]
)
# create card
title = title
header = dbc.CardHeader(
[
html.H2(title),
html.P(icon, className="material-symbols-outlined") if icon else None,
]
)
body_1 = dbc.CardBody([f"{average_yield:.1f}"])
body_2 = dbc.CardBody([html.H4("t/ha")])
return dbc.Card([header, body_1, body_2], class_name="card-kpi")
@capture("figure")
def custom_kpi_card_3(
data_frame: pd.DataFrame,
title: Optional[str],
icon: Optional[str] = None,
t_per_ha=(20, 40),
percentage=(0, 1),
) -> dbc.Card:
"""Creates a custom KPI card."""
# filter and format data
data_frame = _filter_data_frame_(data_frame, t_per_ha, percentage)
if len(data_frame) == 0:
total_yield = 0
else:
data_frame["yield"] = (data_frame["upper_2"] + data_frame["lower_2"]) / 2
total_yield = np.sum(data_frame["yield"] * data_frame["area"])
# create card
title = title
header = dbc.CardHeader(
[
html.H2(title),
html.P(icon, className="material-symbols-outlined") if icon else None,
]
)
body_1 = dbc.CardBody(
[
html.P(f"{total_yield / 1e6:.1f}"),
]
)
body_2 = dbc.CardBody([html.H4("million t")])
return dbc.Card([header, body_1, body_2], class_name="card-kpi")
def get_card_body(
data_frame: pd.DataFrame,
yield_column: str = "",
region: str = None,
):
if region is not None:
data_frame = data_frame[data_frame[REGION_COL] == region]
"""Creates a custom KPI card."""
yield_2024 = np.sum(data_frame[YIELD_2024] * data_frame[AREA]) / np.sum(
data_frame[AREA]
)
yield_2050 = np.sum(
data_frame[YIELD_2024] * (1 + data_frame[yield_column] / 100) * data_frame[AREA]
) / np.sum(data_frame[AREA])
difference = 100 * (yield_2050 - yield_2024) / yield_2024
return yield_2024, yield_2050, difference
@capture("figure")
def sourcing_risk_custom_kpi_card(
data_frame: pd.DataFrame,
yield_column: str = "",
to_show: Literal["yield_2024", "yield_2050", "difference"] = "yield_2024",
region: str = None,
title: str = None,
icon: str = None,
) -> dbc.Card:
yield_2024, yield_2050, difference = get_card_body(
data_frame,
yield_column=yield_column,
region=region,
)
if to_show == "yield_2024":
fig = get_card(
value=yield_2024,
id="current_yield_card_text",
value_format="{value:.1f} t/ha",
title=title,
icon=icon,
)
elif to_show == "yield_2050" and yield_column != YIELD_2024:
fig = get_card(
value=yield_2050,
id="future_yield_card_text",
value_format="{value:.1f} t/ha",
title=title,
icon=icon,
)
elif to_show == "difference" and yield_column != YIELD_2024:
fig = get_card(
value=difference,
id="yield_difference_card_text",
value_format="{value:.1f} %",
title=title,
icon=icon,
)
else:
fig = get_card(
value="",
id="empty_card_text",
value_format="",
title="Select a future climate scenario to see how yield will change",
icon=None,
title_font_size="20px",
)
return fig
@capture("graph")
def create_map(
data_frame: pd.DataFrame,
yield_column: str = YIELD_2024,
custom_data: Optional[List[str]] = None,
):
data_to_scale = data_frame[yield_column].dropna()
# if limits are specified
if COLUMN_MAPPING[yield_column]["limits"] is not None:
min_val, max_val = COLUMN_MAPPING[yield_column]["limits"]
else:
min_val, max_val = data_to_scale.min(), data_to_scale.max()
map_zoom = 7
bounds = data_frame.total_bounds
map_center = dict(lat=(bounds[1] + bounds[3]) / 2, lon=(bounds[0] + bounds[2]) / 2)
is_diff_map = "Difference" in yield_column
color_scale = "RdBu" if is_diff_map else "YlGn"
col_val = "Yield (t/ha)" if yield_column == YIELD_2024 else "Change (%)"
figure = px.choropleth_map(
data_frame=data_frame,
geojson=data_frame.geometry,
locations=data_frame.index,
color=yield_column,
map_style="satellite",
center=map_center,
zoom=map_zoom,
opacity=0.7,
color_continuous_scale=color_scale,
range_color=(min_val, max_val),
labels={col: col_val for col in SOURCING_MAP_LAYER_COLS + [REGION_COL]},
custom_data=custom_data,
)
figure.update_layout(
margin={"l": 0, "r": 0, "t": 0, "b": 0},
)
# update colorbar title
figure.update_coloraxes(colorbar_title_text=COLUMN_MAPPING[yield_column]["unit"])
return figure
@capture("graph") # Decorator ADDED as requested by error
def create_bar_chart(
data_frame: pd.DataFrame,
yield_column: str = YIELD_2024,
custom_data: Optional[List[str]] = None,
):
data_frame["absolute_difference"] = data_frame[yield_column].abs()
is_diff_chart = "Difference" in yield_column
x_axis_label = "Relative change in yield (%)" if is_diff_chart else "Yield (t/ha)"
df_top10 = (
data_frame.sort_values(
by="absolute_difference",
ascending=False,
)
.reset_index()
.iloc[:10]
)
df_top10 = df_top10.sort_values(
by=yield_column,
ascending=False,
).reset_index()
if COLUMN_MAPPING[yield_column]["limits"] is not None:
min_val, max_val = COLUMN_MAPPING[yield_column]["limits"]
else:
min_val, max_val = df_top10[yield_column].min(), df_top10[yield_column].max()
figure = px.bar(
df_top10,
x=yield_column,
y=REGION_COL,
orientation="h",
title="",
labels={yield_column: x_axis_label, REGION_COL: "Region"},
color=yield_column,
color_continuous_scale="RdBu" if is_diff_chart else "YlGn",
range_color=(min_val, max_val),
text=yield_column,
custom_data=custom_data,
)
figure.update_traces(texttemplate="%{text:.2f}", textposition="outside")
max_val_bar, min_val_bar = (
df_top10[yield_column].max(),
df_top10[yield_column].min(),
)
xaxis_max = max_val_bar + (abs(max_val_bar) * 0.15)
xaxis_min = min_val_bar - (abs(min_val_bar) * 0.1)
figure.update_layout(
title_x=0.5,
title_xanchor="center",
xaxis_title=x_axis_label,
yaxis_title=None,
margin={"r": 50, "t": 40, "l": 10, "b": 10},
xaxis_range=[xaxis_min, xaxis_max],
)
# # set 1 decimal place
figure.update_traces(texttemplate="%{text:.1f}", textposition="outside")
# # don't show legend
figure.update_layout(showlegend=False)
figure.update_coloraxes(showscale=False)
return figure
# Load and preprocess data for Azerbaijan - Wheat
testcwd = os.getcwd()
inseaon_data_path = os.path.join(testcwd, "data/in_season_forecasting/")
in_season_data_az = (
gpd.read_file(inseaon_data_path + "Aze_adm1.gpkg")
.query(
"ADM1_EN in ['Beylagan', 'Ujar', 'Saatly', 'Barda', 'Agdash', 'Zardab', 'Sabirabad', 'Agdjabadi', 'Hajigabul', 'Imishly', 'Kurdamir', 'Goychay', 'Ismayilly', 'Aghsu', 'Shamakhy', 'Gabala']"
)
.assign(
Region=lambda df: [f"Region {i + 1}" for i in range(len(df))],
Area=lambda df: df.to_crs(epsg=3395).geometry.area / 1e6,
Crop="Wheat",
)[["Region", "geometry", "Area", "Crop"]]
.to_crs(epsg=4326)
)
# Load and preprocess data for Sumatra - Oil Palm and Sugarcane
inseason_data_sumatra = (
gpd.read_file(inseaon_data_path + "Sumatra_focus_regions.geojson")
.assign(
Region=lambda df: [f"Region {i + 1}" for i in range(len(df))],
Area=lambda df: df.to_crs(epsg=3395).geometry.area / 1e6,
Crop="Oil Palm",
)[["Region", "geometry", "Area", "Crop"]]
.to_crs(epsg=4326)
)
inseason_data_sugarcane = inseason_data_sumatra.assign(Crop="Sugarcane")
inseason_data_sumatra = pd.concat(
[inseason_data_sumatra, inseason_data_sugarcane], ignore_index=True
)
# Load and preprocess data for France - Sugar Beet
inseason_data_france = (
gpd.read_file(inseaon_data_path + "french_departments.gpkg")
.cx[0:3, 46:48]
.assign(
Region=lambda df: [f"Region {i + 1}" for i in range(len(df))],
Area=lambda df: df.to_crs(epsg=3395).geometry.area / 1e6,
Crop="Sugar Beet",
)[["Region", "geometry", "Area", "Crop"]]
.to_crs(epsg=4326)
)
# Area km2 is already present in the NL geopackage
inseason_data_netherlands = (
gpd.read_file(inseaon_data_path + "nl_carrots_supply.gpkg")
.assign(
# Region=lambda df: [f'Region {i + 1}' for i in range(len(df))],
Region=lambda df: df["name"].str.replace(" ", "_"),
Crop="Carrot",
)[["Region", "geometry", "Area", "Crop", "estimated_supply_t"]]
.to_crs(epsg=4326)
)
# Combine all data
mydf = pd.concat(
[
in_season_data_az,
inseason_data_sumatra,
inseason_data_france,
inseason_data_netherlands,
],
ignore_index=True,
)
# Yield estimate uncertainties
df_uncertainty_all = pd.concat(
[
pd.DataFrame(
{
"Crop": ["Wheat"] * 9,
"Month": [
"November",
"December",
"January",
"February",
"March",
"April",
"May",
"June",
"July",
],
"Lower_Bound": [4.4, 5.5, 5.6, 5.6, 5.7, 5.7, 5.9, 6.0, 6.0],
"Yield (t/ha)": [6.6, 6.4, 6.3, 6.0, 6.1, 6.2, 6.1, 6.2, 6.2],
"Upper_Bound": [8.7, 8.6, 8.2, 7.0, 6.9, 6.6, 6.5, 6.3, 6.3],
}
),
pd.DataFrame(
{
"Crop": ["Sugarcane"] * 11,
"Month": [
"September",
"October",
"November",
"December",
"January",
"February",
"March",
"April",
"May",
"June",
"July",
],
"Lower_Bound": [
50.0,
52.0,
52.5,
55.0,
55.0,
56.5,
58.0,
62.0,
62.5,
64.0,
64.7,
],
"Yield (t/ha)": [
60.0,
62.0,
61.5,
63.0,
63.5,
63.8,
64.0,
64.2,
64.5,
64.7,
65.0,
],
"Upper_Bound": [
72.0,
71.0,
70.5,
67.0,
68.0,
68.0,
67.0,
65.5,
65.5,
65.3,
65.3,
],
}
),
pd.DataFrame(
{
"Crop": ["Oil Palm"] * 9,
"Month": [
"August 10th",
"August 20th",
"August 30th",
"September 10th",
"September 20th",
"September 30th",
"October 10th",
"October 20th",
"October 30th",
],
"Lower_Bound": [40.0, 42.0, 42.0, 44.0, 45.0, 44.5, 48.0, 50.5, 52.0],
"Yield (t/ha)": [50.0, 51.5, 51.0, 52.5, 53.0, 53.2, 53.5, 53.8, 54.0],
"Upper_Bound": [65.0, 63.0, 64.0, 61.0, 60.0, 59.0, 57.0, 55.5, 55.0],
}
),
pd.DataFrame(
{
"Crop": ["Sugar Beet"] * 10,
"Month": [
"October",
"November",
"December",
"January",
"February",
"March",
"April",
"May",
"June",
"July",
],
"Lower_Bound": [
60.0,
61.0,
65.0,
64.0,
67.5,
69.0,
70.0,
72.0,
72.5,
73.0,
],
"Yield (t/ha)": [
70.0,
71.5,
71.0,
72.5,
72.8,
73.0,
73.2,
73.5,
73.8,
74.0,
],
"Upper_Bound": [
82.0,
80.0,
79.0,
77.0,
76.0,
75.0,
74.5,
74.0,
73.9,
74.2,
],
}
),
pd.DataFrame(
{
"Crop": ["Carrot"] * 10,
"Month": [
"March 15th",
"March 30th",
"April 15th",
"April 30th",
"May 15th",
"May 30th",
"June 15th",
"June 30th",
"July 15th",
"July 30th",
],
"Lower_Bound": [
38.0,
40.0,
42.0,
44.0,
46.0,
48.0,
49.0,
50.3,
51.0,
50.0,
],
"Yield (t/ha)": [
53.0,
58.0,
56.0,
61.0,
58.0,
57.0,
56.0,
57.0,
56.0,
57.0,
],
"Upper_Bound": [
82.0,
80.0,
78.0,
73.0,
70.0,
68.0,
65.0,
64.0,
63.0,
61.0,
],
}
),
],
ignore_index=True,
)
# Generate dummy data for KPIs
inseason_kpi_df = pd.read_excel(inseaon_data_path + "KPIs.xlsx")
def generate_dummy_data(df, kpi_df, crop):
crop_row_index = kpi_df.index[kpi_df["Crop"] == crop].tolist()[0]
kpi_df_crop = kpi_df.loc[crop_row_index]
mydf_crop = df[df["Crop"] == crop].copy()
# Create random data for production
total_area = mydf_crop["Area"].sum()
total_production = kpi_df_crop["2025 Potential Supply (Mt)"] * 1000
crop_production = [x / total_area * total_production for x in mydf_crop["Area"]]
# Randomly assign production values to regions
if crop.lower() == "carrot" and "estimated_supply_t" in mydf_crop.columns:
mydf_crop["Supply"] = mydf_crop["estimated_supply_t"]
else:
mydf_crop["Supply"] = np.random.choice(
crop_production, size=len(mydf_crop), replace=True
)
# Ensure production is non-negative
mydf_crop["Supply"] = round(mydf_crop["Supply"].clip(lower=0), 2)
# Create random data for yield with mean equal to kpi_df.loc[crop_row_index, '2025 Avg Yield (t/ha)'
avg_yield = kpi_df_crop["2025 Avg Yield (t/ha)"]
mydf_crop["Yield"] = np.random.normal(
loc=avg_yield, scale=avg_yield / 10, size=len(mydf_crop)
)
# Ensure yield is non-negative
mydf_crop["Yield"] = round(mydf_crop["Yield"].clip(lower=0), 2)
return mydf_crop
myinseason_df = pd.concat(
[
generate_dummy_data(mydf, inseason_kpi_df, crop)
for crop in inseason_kpi_df["Crop"].unique()
],
ignore_index=True,
)
inseason_available_crops = myinseason_df["Crop"].unique().tolist()
inseason_default_crop = (
inseason_available_crops[0] if inseason_available_crops else None
)
total_area = myinseason_df.groupby("Crop")["Area"].sum().reset_index()
df_uncertainty_all = df_uncertainty_all.merge(total_area, on="Crop", how="left")
df_uncertainty_all["Area"] *= 10
df_uncertainty_all["Supply (t)"] = (
df_uncertainty_all["Area"] * df_uncertainty_all["Yield (t/ha)"]
)
df_uncertainty_all["Lower_Bound_Sup"] = (
df_uncertainty_all["Area"] * df_uncertainty_all["Lower_Bound"]
)
df_uncertainty_all["Upper_Bound_Sup"] = (
df_uncertainty_all["Area"] * df_uncertainty_all["Upper_Bound"]
)
vm.Page.add_type(field_name="components", new_type=FlexContainer)
inseason_kpi_banner = FlexContainer(
components=[
vm.Figure(
figure=kpi_card_reference(
inseason_kpi_df,
value_column=inseason_kpi_df.columns[1],
reference_column=inseason_kpi_df.columns[0],
title=inseason_kpi_df.columns[1],
value_format="{value:.2f}",
reference_format="{delta_relative:+.1%} vs. 5 years avg ({reference:.1f})",
icon="Grain",
)
),
vm.Figure(
figure=kpi_card_reference(
inseason_kpi_df,
value_column=inseason_kpi_df.columns[3],
reference_column=inseason_kpi_df.columns[2],
title=inseason_kpi_df.columns[3],
value_format="{value:.1f}",
reference_format="{delta_relative:+.1%} vs. 5 years avg ({reference:.1f})",
icon="Eco",
)
),
vm.Figure(
figure=kpi_card_reference(
inseason_kpi_df,
value_column=inseason_kpi_df.columns[5],
reference_column=inseason_kpi_df.columns[4],
title=inseason_kpi_df.columns[5],
value_format="{value:.3f}",
reference_format="{delta_relative:+.1%} vs. 5 years avg ({reference:.1f})",
icon="Agriculture",
),
),
vm.Figure(
figure=kpi_card_reference(
inseason_kpi_df,
value_column=inseason_kpi_df.columns[7],
reference_column=inseason_kpi_df.columns[6],
title=inseason_kpi_df.columns[7],
value_format="{value:.1f}",
reference_format="{delta:.1f}pp vs. 5 years avg ({reference:.1f}%)",
icon="Water",
reverse_color=True,
),
),
vm.Figure(
figure=kpi_card_reference(
inseason_kpi_df,
value_column=inseason_kpi_df.columns[9],
reference_column=inseason_kpi_df.columns[8],
title=inseason_kpi_df.columns[9],
value_format="{value:.1f}",
reference_format="{delta:.1f}pp vs. 5 years avg ({reference:.1f}%)",
icon="Grass",
reverse_color=True,
),
),
],
classname="kpi-banner",
)
## IN-SEASON ------------------------------------------------------------------------
inseason_page = vm.Page(
title="In Season Supply Forecasting",
layout=vm.Layout(grid=[[0, 0], [1, 2], [1, 2], [1, 2], [1, 3], [1, 3], [1, 3]]),
components=[
inseason_kpi_banner, # KPI cards (0)
vm.Graph( # Supply by Region Map (1)
title="Supply by Region (Mt)",
id="supply-map",
figure=custom_choropleth_map(
locations="Region",
color="Supply",
data_frame=myinseason_df,
custom_data=None,
),
),
vm.Graph( # Yield Forecast Uncertainty Chart (2)
title="In-Season Yield Forecast (t/ha)",
id="yield-chart",
figure=plot_line_with_uncertainty(
data_frame=df_uncertainty_all,
x="Month",
y="Yield (t/ha)",
lower_bound="Lower_Bound",
upper_bound="Upper_Bound",
forecast_month="May 30th",
),
),
vm.Graph( # Supply by Region Chart (3)
title="In-Season Supply Forecast (t)",
id="supply-chart",
figure=bar_charts_with_uncertainty(
data_frame=df_uncertainty_all,
x="Month",
y="Supply (t)",
lower_bound="Lower_Bound_Sup",
upper_bound="Upper_Bound_Sup",
forecast_month="May 30th",
),
),
],
controls=[
vm.Filter(
column="Crop",
selector=vm.Dropdown(
title="Select Crop",
options=inseason_available_crops,
value=inseason_default_crop,
multi=False,
),
),
],
)
dashboard = vm.Dashboard(
title="Agriculture Analytics & Insights Control Tower",
pages=[home_page, inseason_page],
navigation=vm.Navigation(
nav_selector=vm.NavBar(
items=[
vm.NavLink(label="Homepage", pages=["Homepage"], icon="Home"),
vm.NavLink(label="In-Season-Supply-Forecast", pages=["In Season Supply Forecasting"],
icon="Event Upcoming"),
]
)
),
)
app = Vizro().build(dashboard)
# Add footer in the bottom right corner
app.dash.layout.children.append(
dbc.NavLink(
["From McKinsey - Powered by ACRE"],
href="https://www.mckinsey.com/industries/agriculture/how-we-help-clients/acre",
target="_blank",
className="anchor-container",
)
)
server = app.dash.server
if __name__ == "__main__":
app.run()