# reactive var version of https://py.cafe/maartenbreddels/solara-shared-state
import asyncio
from typing import Union, cast
import logging
import solara
import solara._stores
from anyio import to_thread
import httpx
logger = logging.getLogger("app")

# Latest BTC price; starts out as None until a value is assigned to it.
# NOTE: SharedStore is a private, undocumented and unstable Solara API.
# NOTE(review): presumably the store makes this value shared between all
# sessions rather than per-user — confirm against the Solara sources.
btc = solara.Reactive(
    solara._stores.SharedStore(
        cast(Union[float, None], None),
    )
)
async def fetch_price():
    """Poll the Binance ticker endpoint forever and publish the price to ``btc``.

    Sleeps 10 seconds once up front, then on every iteration sleeps 1 second,
    requests the BTCUSDT ticker and stores the ``"price"`` field, converted to
    ``float``, in ``btc.value``.

    Any ``Exception`` raised during an iteration (transport error, non-2xx
    status, malformed payload, ...) is printed and logged, after which the
    loop continues. The coroutine never returns on its own.
    """
    url = "https://api.binance.com/api/v1/ticker/price?symbol=BTCUSDT"

    await asyncio.sleep(10)
    # One client for the whole lifetime of the task, so the connection pool is
    # reused instead of opening a fresh connection for every poll.
    async with httpx.AsyncClient() as client:
        while True:
            try:
                # The sleep lives inside the try/loop body so that a failed
                # iteration is also followed by a pause before the next retry.
                await asyncio.sleep(1)
                response = await client.get(url)
                # Fail with an explicit HTTP error on 4xx/5xx rather than a
                # misleading KeyError when the error body has no "price" key.
                response.raise_for_status()
                btc.value = float(response.json()["price"])
                print("btc.value", btc.value)
            except Exception as exc:
                print(exc, type(exc))
                logger.exception("error in fetch_price")
# NOTE: on reload, this will not be stopped, requires https://github.com/widgetti/solara/pull/843
# Keep a strong reference to the task: the event loop only holds weak
# references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
# asyncio.create_task() needs an already-running event loop, so this module
# must be imported from within one.
_fetch_price_task = asyncio.create_task(fetch_price())
@solara.component
def Page():
    """Render a "BTC price" card.

    Shows a progress bar while ``btc.value`` is still ``None``; otherwise
    shows the value formatted with thousands separators after a ``$`` sign.
    """
    price = btc.value
    print("render btc.value", price)
    with solara.Card("BTC price"):
        if price is None:
            # No price available yet.
            solara.ProgressLinear()
        else:
            solara.Preformatted(f"$ {price:,}")