import websocket
import json
import threading
import ssl
import certifi
import time
import pandas as pd
import numpy as np
import plotly.graph_objs as go
import dash
from dash import dcc, html
from dash.dependencies import Output, Input
from collections import defaultdict, deque
class MarketDataProcessor:
    """Accumulate order-flow statistics from a stream of trade events.

    The processor keeps:

    * a bounded window of the most recent ``max_length`` trades,
    * cumulative buy/sell/total volume and the resulting order imbalance,
    * a VWAP computed over the trades currently in the window,
    * the cumulative volume delta (CVD) and its last 1000 samples,
    * per-price buy/sell volume, bucketed to two decimals.

    ``process_trade`` and ``get_market_summary`` are serialized by an
    internal lock so that a reader running on another thread receives a
    consistent snapshot while trades are still being ingested.
    """

    _TRADE_COLUMNS = ['timestamp', 'price', 'quantity', 'trade_type']

    def __init__(self, symbol='ethusdt', max_length=50000):
        """Create an empty processor.

        Args:
            symbol: Label for the instrument being tracked.
            max_length: Maximum number of recent trades retained (and the
                window over which VWAP is computed). Must be >= 1.

        Raises:
            ValueError: If ``max_length`` is smaller than 1.
        """
        if max_length < 1:
            raise ValueError("max_length must be at least 1")
        self.symbol = symbol
        self.max_length = max_length
        # Trades are stored as (timestamp, price, quantity, trade_type)
        # tuples; the deque drops the oldest entry once it is full.
        self._trades = deque(maxlen=max_length)
        # Running sums over the trades currently in the window, so VWAP
        # can be refreshed in O(1) per trade instead of re-scanning them.
        self._window_notional = 0.0
        self._window_quantity = 0.0
        self._lock = threading.Lock()
        self.order_flow_stats = {
            'buy_volume': 0,
            'sell_volume': 0,
            'total_volume': 0,
            'order_imbalance': 0,
            'vwap': 0,
        }
        self.cumulative_volume_delta = 0
        self.cvd_history = deque(maxlen=1000)
        self.order_book = {'bids': defaultdict(float), 'asks': defaultdict(float)}
        self.price_buckets = defaultdict(lambda: {'buy_volume': 0, 'sell_volume': 0})

    @property
    def trades(self):
        """Return the retained trades as a DataFrame (read-only snapshot).

        Columns are ``timestamp``, ``price``, ``quantity`` and
        ``trade_type``, oldest trade first.
        """
        with self._lock:
            rows = list(self._trades)
        return pd.DataFrame(rows, columns=self._TRADE_COLUMNS)

    def process_trade(self, trade_data):
        """Ingest one trade event and refresh every derived statistic.

        Args:
            trade_data: Mapping with keys ``'T'`` (divided by 1000 before
                conversion, so presumably epoch milliseconds), ``'p'``
                (price), ``'q'`` (quantity) and ``'m'`` (truthy marks the
                trade as a sell, falsy as a buy).

        Raises:
            KeyError: If a required key is missing.
            ValueError: If price or quantity cannot be parsed as a float.
        """
        # Parse everything before touching shared state so that a malformed
        # payload cannot leave the statistics half-updated.
        timestamp = pd.Timestamp.fromtimestamp(trade_data['T'] / 1000)
        price = float(trade_data['p'])
        quantity = float(trade_data['q'])
        trade_type = 'sell' if trade_data['m'] else 'buy'

        with self._lock:
            self._append_trade(timestamp, price, quantity, trade_type)
            self._update_order_flow_stats(price, quantity, trade_type)
            self._update_cumulative_volume_delta(quantity, trade_type)
            self._update_price_buckets(price, quantity, trade_type)

    def _append_trade(self, timestamp, price, quantity, trade_type):
        """Store a trade in the window and keep the running sums in step."""
        if len(self._trades) == self._trades.maxlen:
            # The append below evicts the oldest trade; remove its
            # contribution from the window sums first.
            _, old_price, old_quantity, _ = self._trades[0]
            self._window_notional -= old_price * old_quantity
            self._window_quantity -= old_quantity
        self._trades.append((timestamp, price, quantity, trade_type))
        self._window_notional += price * quantity
        self._window_quantity += quantity

    def _update_order_flow_stats(self, price, quantity, trade_type):
        """Update cumulative volumes, order imbalance and windowed VWAP."""
        stats = self.order_flow_stats
        if trade_type == 'buy':
            stats['buy_volume'] += quantity
        else:
            stats['sell_volume'] += quantity
        stats['total_volume'] += quantity

        if stats['total_volume'] > 0:
            stats['order_imbalance'] = (
                stats['buy_volume'] - stats['sell_volume']
            ) / stats['total_volume']
            # Leave the previous VWAP in place rather than dividing by zero
            # when the window holds no volume.
            if self._window_quantity > 0:
                stats['vwap'] = self._window_notional / self._window_quantity

    def _update_cumulative_volume_delta(self, quantity, trade_type):
        """Add (buy) or subtract (sell) the quantity and record the sample."""
        self.cumulative_volume_delta += quantity if trade_type == 'buy' else -quantity
        latest_timestamp = self._trades[-1][0]
        self.cvd_history.append((latest_timestamp, self.cumulative_volume_delta))

    def _update_price_buckets(self, price, quantity, trade_type):
        """Accumulate the quantity under the price rounded to 2 decimals."""
        price_bucket = round(price, 2)
        if trade_type == 'buy':
            self.price_buckets[price_bucket]['buy_volume'] += quantity
        else:
            self.price_buckets[price_bucket]['sell_volume'] += quantity

    def get_market_summary(self):
        """Return a snapshot of the current statistics.

        Returns:
            dict with ``total_volume``, ``buy_volume``, ``sell_volume``,
            ``order_imbalance``, ``vwap``, ``cvd_history`` (list of
            ``(timestamp, cvd)`` tuples) and ``price_buckets`` (dict mapping
            price to ``{'buy_volume', 'sell_volume'}``). The containers are
            copies, so mutating them does not affect the processor.
        """
        with self._lock:
            stats = self.order_flow_stats
            return {
                'total_volume': stats['total_volume'],
                'buy_volume': stats['buy_volume'],
                'sell_volume': stats['sell_volume'],
                'order_imbalance': stats['order_imbalance'],
                'vwap': stats['vwap'],
                'cvd_history': list(self.cvd_history),
                # Copy the inner dicts too; a shallow copy would hand the
                # caller the live per-bucket counters.
                'price_buckets': {
                    price: dict(volumes)
                    for price, volumes in self.price_buckets.items()
                },
            }
class CryptoDashboard:
    """Live Dash dashboard fed by a trade websocket stream.

    Constructing an instance starts a daemon thread that connects to
    ``{base_url}/{symbol}@trade`` and forwards every ``'trade'`` event to a
    ``MarketDataProcessor``. The Dash app in ``self.app`` polls that
    processor every two seconds and redraws its charts.
    """

    # Pause between websocket connection attempts, in seconds.
    RECONNECT_DELAY_SECONDS = 5

    def __init__(self, symbol='ethusdt'):
        """Start streaming trades for ``symbol`` and build the Dash app."""
        self.symbol = symbol
        self.base_url = "wss://stream.binance.com:9443/ws"
        self.market_processor = MarketDataProcessor(symbol)
        self._connect_websocket()
        self.app = self._create_dashboard()

    def _connect_websocket(self):
        """Launch the background thread that keeps the trade stream open."""
        stream_url = f"{self.base_url}/{self.symbol.lower()}@trade"

        def on_message(ws, message):
            trade_data = json.loads(message)
            # Only trade events are processed; any other event is ignored.
            if trade_data.get('e') == 'trade':
                self.market_processor.process_trade(trade_data)

        def on_error(ws, error):
            print(f"WebSocket Error: {error}")

        def on_close(ws, close_status_code, close_msg):
            print(f"WebSocket closed: {close_msg}")

        def run_websocket():
            while True:
                try:
                    ws = websocket.WebSocketApp(
                        stream_url,
                        on_message=on_message,
                        on_error=on_error,
                        on_close=on_close,
                    )
                    ssl_context = ssl.create_default_context(cafile=certifi.where())
                    ws.run_forever(sslopt={"context": ssl_context})
                except Exception as e:
                    print(f"WebSocket connection error: {e}")
                # Pause before reconnecting no matter how the connection
                # ended, so a clean close cannot turn into a tight
                # reconnect loop.
                time.sleep(self.RECONNECT_DELAY_SECONDS)

        threading.Thread(target=run_websocket, daemon=True).start()

    @staticmethod
    def _split_cvd_history(cvd_history):
        """Split ``(timestamp, cvd)`` pairs into two parallel lists."""
        timestamps = [sample[0] for sample in cvd_history]
        cvd_values = [sample[1] for sample in cvd_history]
        return timestamps, cvd_values

    @staticmethod
    def _bucket_series(price_buckets):
        """Return ``(prices, buy_volumes, sell_volumes)`` sorted by price."""
        prices = sorted(price_buckets)
        buy_volumes = [price_buckets[price]['buy_volume'] for price in prices]
        sell_volumes = [price_buckets[price]['sell_volume'] for price in prices]
        return prices, buy_volumes, sell_volumes

    @staticmethod
    def _build_summary_text(market_summary):
        """Render the headline statistics as a block of paragraphs."""
        return html.Div([
            html.P(f"Total Volume: {market_summary['total_volume']}"),
            html.P(f"Buy Volume: {market_summary['buy_volume']}"),
            html.P(f"Sell Volume: {market_summary['sell_volume']}"),
            html.P(f"Order Imbalance: {market_summary['order_imbalance']:.2f}"),
            html.P(f"VWAP: {market_summary['vwap']:.2f}"),
        ])

    @staticmethod
    def _build_volume_pie(market_summary):
        """Build the buy-versus-sell volume donut chart."""
        figure = go.Figure(
            data=[go.Pie(
                labels=['Buy Volume', 'Sell Volume'],
                values=[market_summary['buy_volume'], market_summary['sell_volume']],
                hole=0.4,
            )]
        )
        figure.update_layout(title="Volume Distribution")
        return figure

    @classmethod
    def _build_cvd_chart(cls, cvd_history):
        """Build the cumulative-volume-delta line chart."""
        timestamps, cvd_values = cls._split_cvd_history(cvd_history)
        figure = go.Figure(
            data=[go.Scatter(x=timestamps, y=cvd_values, mode='lines', name='CVD')]
        )
        figure.update_layout(
            title="Cumulative Volume Delta (CVD)",
            xaxis_title="Time",
            yaxis_title="CVD",
        )
        return figure

    @staticmethod
    def _build_dom_heatmap(prices, buy_volumes, sell_volumes):
        """Build the per-price buy/sell volume heatmap."""
        figure = go.Figure(
            data=go.Heatmap(
                z=[[buy, sell] for buy, sell in zip(buy_volumes, sell_volumes)],
                x=['Buy Volume', 'Sell Volume'],
                y=[f"{price}" for price in prices],
                colorscale='Viridis',
            )
        )
        figure.update_layout(
            title="Depth of Market (DOM) Heatmap",
            xaxis_title="Order Type",
            yaxis_title="Price Level",
        )
        return figure

    @staticmethod
    def _build_volume_profile(prices, buy_volumes, sell_volumes):
        """Build the stacked buy/sell volume-by-price bar chart."""
        # One trace per side (not one per price level) keeps the figure
        # small and gives the legend exactly two entries.
        figure = go.Figure(
            data=[
                go.Bar(x=prices, y=buy_volumes, name='Buy Volume', orientation='v'),
                go.Bar(x=prices, y=sell_volumes, name='Sell Volume', orientation='v'),
            ]
        )
        figure.update_layout(
            barmode='stack',
            title="Volume Profile",
            xaxis_title="Price Level",
            yaxis_title="Volume",
        )
        return figure

    def _create_dashboard(self):
        """Build the Dash app: layout plus the periodic refresh callback."""
        app = dash.Dash(__name__)
        app.layout = html.Div([
            html.H1(f'{self.symbol.upper()} Market Dashboard'),
            html.Div(id='market-summary'),
            dcc.Graph(id='volume-pie-chart'),
            dcc.Graph(id='cvd-chart'),
            dcc.Graph(id='dom-heatmap'),
            dcc.Graph(id='volume-profile'),
            dcc.Interval(id='interval-component', interval=2*1000, n_intervals=0)
        ])

        @app.callback(
            [
                Output('market-summary', 'children'),
                Output('volume-pie-chart', 'figure'),
                Output('cvd-chart', 'figure'),
                Output('dom-heatmap', 'figure'),
                Output('volume-profile', 'figure')
            ],
            [Input('interval-component', 'n_intervals')]
        )
        def update_dashboard(n):
            market_summary = self.market_processor.get_market_summary()
            # Sort the price levels once and share them between the two
            # per-price charts.
            prices, buy_volumes, sell_volumes = self._bucket_series(
                market_summary['price_buckets']
            )
            return (
                self._build_summary_text(market_summary),
                self._build_volume_pie(market_summary),
                self._build_cvd_chart(market_summary['cvd_history']),
                self._build_dom_heatmap(prices, buy_volumes, sell_volumes),
                self._build_volume_profile(prices, buy_volumes, sell_volumes),
            )

        return app
if __name__ == "__main__":
    # Script entry point: stream ETH/USDT trades and serve the dashboard.
    dashboard = CryptoDashboard(symbol="ethusdt")
    # Newer Dash releases expose ``app.run`` and have retired
    # ``app.run_server``; prefer the former and fall back for older installs.
    run_app = getattr(dashboard.app, "run", None) or dashboard.app.run_server
    # NOTE(review): debug mode's reloader may execute this module in a second
    # process, which would start a second websocket thread - confirm, and
    # consider passing use_reloader=False if that matters.
    run_app(debug=True)