import zarr
import s3fs
import pandas as pd
import altair as alt
import solara
from tqdm import tqdm
def get_forecast_data(river_number, date):
    """Fetch the ensemble forecast for one river from the public S3 zarr store.

    Opens the zarr group at ``s3://geoglows-v2-forecasts/{date}.zarr/`` with
    anonymous credentials, locates ``river_number`` in the ``rivid`` array and
    slices ``Qout`` at that position along its last axis.

    Args:
        river_number: River identifier to look up in the ``rivid`` array.
        date: Forecast identifier used to build the zarr path
            (e.g. ``'2024040100'``).

    Returns:
        A DataFrame indexed by the values of the ``time`` array, with one
        ``ensemble_<id>`` column per entry of the ``ensemble`` array. An empty
        DataFrame is returned when the river is not in ``rivid`` or when any
        error occurs while reading the store (the error is printed, not
        raised).
    """
    # Function-scope import: numpy is only needed for the id lookup below.
    import numpy as np

    s3_bucket_url = f's3://geoglows-v2-forecasts/{date}.zarr/'
    s3 = s3fs.S3FileSystem(anon=True)
    try:
        mapper = s3fs.S3Map(root=s3_bucket_url, s3=s3, check=False)
        zarr_group = zarr.open_group(mapper, mode='r')
        rivid_array = zarr_group['rivid'][:]

        # Single vectorised scan instead of a membership test followed by
        # list(...).index(...), which walked the ids twice and copied the
        # whole array into a Python list.
        matches = np.flatnonzero(np.asarray(rivid_array) == river_number)
        if matches.size == 0:
            return pd.DataFrame()
        river_index = int(matches[0])  # first occurrence, like list.index

        # NOTE(review): assumes Qout is laid out as (ensemble, time, river);
        # the DataFrame construction below relies on rows being ensembles and
        # columns being time steps -- confirm against the store's metadata.
        qout_array = zarr_group['Qout'][:, :, river_index]
        # Time values are used exactly as stored; no decoding is applied here.
        time_array = zarr_group['time'][:]
        ensemble_array = zarr_group['ensemble'][:]

        forecast_df = pd.DataFrame(qout_array, columns=time_array)
        forecast_df.index = [f"ensemble_{i}" for i in ensemble_array]
        # Transpose so that time runs down the index and ensembles are columns.
        return forecast_df.transpose()
    except Exception as e:
        # Deliberate best-effort: any failure (missing store, network, bad
        # layout) is reported and turned into an empty result for the caller.
        print(f"Error accessing data for RiverNumber {river_number} on {date}: {e}")
        return pd.DataFrame()
def list_s3_contents(bucket_url):
    """List the entries found under ``bucket_url`` using anonymous S3 access.

    Args:
        bucket_url: S3 path passed straight to ``S3FileSystem.ls``.

    Returns:
        Whatever ``ls`` returns for the path, or an empty list if the listing
        raises (the error is printed rather than propagated).
    """
    anonymous_fs = s3fs.S3FileSystem(anon=True)
    try:
        return anonymous_fs.ls(bucket_url)
    except Exception as err:
        # Best-effort listing: report the problem and fall through to [].
        print(f"Error accessing S3 bucket: {err}")
    return []
def visualize_forecast_data(forecast_df, river_number, date):
    """Build an Altair line chart with one line per ensemble member.

    Args:
        forecast_df: Wide DataFrame whose index holds the time values and
            whose columns are the ensemble members. The caller's frame is not
            modified.
        river_number: River identifier, used only in the chart title.
        date: Forecast identifier, used only in the chart title.

    Returns:
        An ``alt.Chart`` (800x400) plotting ``Flow`` against ``Time``,
        coloured by ``Ensemble``.
    """
    # Name the index axis explicitly before resetting it. Relying on the
    # default 'index' column name raised KeyError in melt() whenever the
    # incoming index already carried a name.
    long_df = (
        forecast_df.rename_axis(index='Time')
        .reset_index()
        .melt(id_vars=['Time'], var_name='Ensemble', value_name='Flow')
    )

    chart = alt.Chart(long_df).mark_line().encode(
        x='Time:T',
        y='Flow:Q',
        color='Ensemble:N',
    ).properties(
        title=f'Forecast for RiverNumber {river_number} on {date}',
        width=800,
        height=400,
    )
    return chart
# Example usage: choose the river and forecast run to plot.
river_number = 110123714  # Replace with actual river number
selected_date = '2024040100'  # Replace with actual date

# Fetch the forecast; get_forecast_data returns an empty frame when the river
# is missing or the read fails.
forecast_df = get_forecast_data(river_number, selected_date)

if forecast_df.empty:
    print(f"No data available for RiverNumber {river_number} on {selected_date}")
else:
    chart = visualize_forecast_data(forecast_df, river_number, selected_date)
    # NOTE(review): the value returned by solara.FigureAltair is neither
    # assigned nor returned here -- confirm it is actually rendered in the
    # runtime this script targets.
    solara.FigureAltair(chart)