Py.Cafe

Vizro Iris Dataset Visualizations

Docs | Pricing
  • app.py
  • requirements.txt
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""everything related to vizro visualization in one single file"""

import os
import psycopg2
import pandas as pd
import vizro.plotly.express as px
from vizro import Vizro
import vizro.models as vm

# /**********
#  * CONFIG *
#  **********/
# Deployment environment label, read from the process environment.
# Debug mode is enabled for every value other than "PRODUCTION" (the default).
SOFTWARE_ENVIRONMENT_VARIABLE = os.getenv("SOFTWARE_ENVIRONMENT_VARIABLE", "PRODUCTION")
DEBUG = SOFTWARE_ENVIRONMENT_VARIABLE != "PRODUCTION"


# Keyword arguments for the PostgreSQL connection. Host and credentials can be
# overridden through DB_HOST / DB_USER / DB_PASSWORD; database name and port
# are fixed here.
db_config = dict(
    host=os.getenv("DB_HOST", "localhost"),
    database="mark_db",
    user=os.getenv("DB_USER", "your_username"),
    password=os.getenv("DB_PASSWORD", "your_password"),
    port=5432,
)
# Only host and user are echoed; the password is deliberately left out.
print(f"Connecting to database at {db_config['host']} with user {db_config['user']}")

# /*************
#  * UTILITIES *
#  *************/


def get_equity_data(
    account_id: int,
    host: str = "localhost",
    database: str = "your_database",
    user: str = "your_username",
    password: str = "your_password",
    port: int = 5432,
) -> pd.DataFrame:
    """
    Fetch the equity records of one account from a PostgreSQL database.

    Runs a parameterised SELECT against the ``onlineequityrecord`` table,
    ordered by ``record_time``, and loads the rows into a DataFrame whose
    column names are taken from the cursor description.

    Args:
        account_id (int): Account ID used in the WHERE clause.
        host (str): Database host address.
        database (str): Database name.
        user (str): User name.
        password (str): Password.
        port (int): Port number, 5432 by default.

    Returns:
        pd.DataFrame: DataFrame with the columns ``equity_value`` and
        ``record_time``.

    Raises:
        psycopg2.Error: Database connection or query error (printed, then
            re-raised).
        Exception: Any other error (printed, then re-raised).
    """
    # Imported locally so this function does not rely on a new file-level import.
    from contextlib import closing

    # The account id is passed as a bound parameter, never interpolated.
    sql_query = """
        SELECT equity_value, record_time 
        FROM onlineequityrecord 
        WHERE account_id = %s
        ORDER BY record_time
        """

    try:
        # closing() guarantees close() on both handles, cursor first and then
        # the connection, even when the query or the cursor's own close() fails.
        # (psycopg2's native `with connection:` only ends the transaction; it
        # does not close the connection.)
        with closing(
            psycopg2.connect(
                host=host, database=database, user=user, password=password, port=port
            )
        ) as connection, closing(connection.cursor()) as cursor:
            cursor.execute(sql_query, (account_id,))
            results = cursor.fetchall()

            # The first field of each description entry is the column name.
            column_names = [desc[0] for desc in cursor.description]

        df = pd.DataFrame(results, columns=column_names)

        print(f"成功获取账户 {account_id} 的 {len(df)} 条记录")

        return df

    except psycopg2.Error as e:
        print(f"数据库错误: {e}")
        raise
    except Exception as e:
        print(f"其他错误: {e}")
        raise


# /**************
#  * MAIN LOGIC *
#  **************/

# Load the equity history of account 0 using the module-level connection settings.
account_0_data: pd.DataFrame = get_equity_data(account_id=0, **db_config)

# Report the in-memory size of the loaded frame in MiB.
total_memory = account_0_data.memory_usage(deep=True).sum()
print(f"总内存使用: {total_memory / (1024*1024):.2f} MB")

# Derive a day-resolution "date" column from record_time; the date-range
# filter below operates on this column.
account_0_data["date"] = pd.to_datetime(account_0_data["record_time"].dt.date)


# Components are created in the same order as a nested constructor call would
# create them: figure -> graph -> date picker -> filter -> page -> dashboard.
equity_curve_graph = vm.Graph(
    title="Equity Curve Account 0",
    figure=px.line(account_0_data, x="record_time", y="equity_value"),
)
date_range_filter = vm.Filter(
    column="date",
    selector=vm.DatePicker(title="选择日期范围 Account 0", range=True),
)

page = vm.Page(
    title="Graph without ModeBar",
    components=[equity_curve_graph],
    controls=[date_range_filter],
)

dashboard = vm.Dashboard(pages=[page])

# Build the app and start the server; debug mode follows the DEBUG flag.
Vizro().build(dashboard).run(debug=DEBUG)