Py.Cafe

AIPHeX/

solara-therapeutic-chatbot

Therapeutic Chatbot with OpenAI

DocsPricing
  • app.py
  • requirements.txt
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
# app.py — Solara therapeutic chatbot with advanced silence/typing handling

"""
A Solara web‑app that streams GPT‑4 responses and reacts compassionately to
silence or long typing pauses, following Dutch trauma‑informed guidelines.
"""

from __future__ import annotations

import asyncio
import os
import random
from datetime import datetime
from typing import List, cast

from openai import AsyncOpenAI
from openai.types.chat import ChatCompletionMessageParam
from typing_extensions import TypedDict

import solara
import solara.lab

# ------------------------------------------------------------------
# UI helpers
# ------------------------------------------------------------------

# Monkey‑patch Solara's default placeholder text to Dutch
_original_TextField = solara.v.TextField

def _custom_TextField(*args, **kwargs):
    if kwargs.get("label") == "Type a message...":
        kwargs["label"] = "Typ een bericht..."
    return _original_TextField(*args, **kwargs)

solara.v.TextField = _custom_TextField

# ------------------------------------------------------------------
# Reactive state
# ------------------------------------------------------------------

class MessageDict(TypedDict):
    role: str  # "user" | "assistant" | "system"
    content: str

messages: solara.Reactive[List[MessageDict]] = solara.reactive([])
input_text: solara.Reactive[str] = solara.reactive("")

# ------------------------------------------------------------------
# OpenAI setup
# ------------------------------------------------------------------
try:
    import pycafe  # type: ignore

    OPENAI_API_KEY = pycafe.get_secret(
        "OPENAI_API_KEY",
        """You need an OpenAI API key to run the chatbot.\n""",
    )
except ModuleNotFoundError:
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

openai = AsyncOpenAI(api_key=OPENAI_API_KEY) if OPENAI_API_KEY else None

# ------------------------------------------------------------------
# Constants & timer‑related globals
# ------------------------------------------------------------------
APP_START_TIME = datetime.now()

inactivity_task: asyncio.Task | None = None  # 30‑60 s silence
typing_task: asyncio.Task | None = None      # 2‑3 min typing pause
followup_task: asyncio.Task | None = None    # post‑invitation scenarios

# ------------------------------------------------------------------
# Helper functions
# ------------------------------------------------------------------

def next_silence_threshold() -> int:  # 30‑60 s
    return random.randint(30, 60)

def next_typing_threshold() -> int:  # 2‑3 min
    return random.randint(120, 180)

def nl_time_system_message() -> MessageDict:
    """Return current local timestamp as system message."""
    return {
        "role": "system",
        "content": f"Current NL time: {datetime.now():%Y-%m-%d %H:%M:%S}",
    }

def no_api_key_message():
    messages.value = [
        {
            "role": "assistant",
            "content": "No OpenAI API key found. Please set OPENAI_API_KEY.",
        }
    ]

# ------------------------------------------------------------------
# Streaming wrapper
# ------------------------------------------------------------------

def _append_system_and_stream(prompt: str):
    """Append a system prompt then stream GPT‑4 reply into messages."""
    messages.value = [*messages.value, {"role": "system", "content": prompt}]
    if openai is None:
        no_api_key_message()
        return

    async def _stream():
        response = await openai.chat.completions.create(
            model="gpt-4-1106-preview",
            messages=cast(List[ChatCompletionMessageParam], messages.value),
            stream=True,
            temperature=0.5,
        )
        messages.value = [*messages.value, {"role": "assistant", "content": ""}]
        async for chunk in response:
            if chunk.choices[0].finish_reason == "stop":
                break
            delta = chunk.choices[0].delta.content or ""
            messages.value[-1]["content"] += delta
    asyncio.create_task(_stream())

# ------------------------------------------------------------------
# Silence / typing invitation timers
# ------------------------------------------------------------------

def restart_inactivity_timer():
    """(Re)start the 30‑60 s silence detector."""
    global inactivity_task
    if inactivity_task and not inactivity_task.done():
        inactivity_task.cancel()

    T = next_silence_threshold()
    user_msgs_before = sum(1 for m in messages.value if m["role"] == "user")

    async def _watch():
        await asyncio.sleep(T)
        # if user spoke meanwhile, abort
        if any(m["role"] == "user" for m in messages.value[user_msgs_before:]):
            return
        typed_since = bool(input_text.value.strip())

        # ---------- build invitation prompt -----------------------
        if user_msgs_before == 0 and not typed_since:  # opening silence
            invite = (
                f"Note to the therapeutic assistant: about {T} seconds of silence "
                "have passed since the session start and the client hasn't "
                "typed. Craft a gentle Dutch opener: validate that starting can "
                "feel awkward, silence is welcome, you're here whenever they're "
                "ready, optionally add a light personal aside. Keep it warm." 
            )
        elif user_msgs_before == 0 and typed_since:  # opening typing pause
            invite = (
                f"Note to the therapeutic assistant: the client has been typing "
                f"for about {T} seconds without sending their first message. "
                "Write a Dutch reply acknowledging it takes courage to press "
                "send, reassure there's no rush, optionally a light personal "
                "aside."
            )
        elif user_msgs_before > 0 and not typed_since:  # mid‑conversation silence
            invite = (
                f"Note to the therapeutic assistant: about {T} seconds of silence "
                "since the client's last message, no typing activity. "
                "Reply in Dutch: briefly reference their last share, validate the "
                "pause, invite noticing body sensations and which system might "
                "be active; vary wording if similar pauses happened before." 
            )
        else:  # mid‑conversation typing pause
            invite = (
                f"Note to the therapeutic assistant: the client has been typing "
                f"for about {T} seconds without sending. "
                "Reply in Dutch: note it's fine to take time, reflect last topic, "
                "offer simple way to answer (bullet points, feeling word), keep "
                "tone supportive; vary wording if repeat event."
            )

        # send invitation
        _append_system_and_stream(invite)
        # schedule deeper follow‑ups
        schedule_no_reply_followups("silence", user_msgs_before)

    inactivity_task = asyncio.create_task(_watch())


def restart_typing_timer():
    """(Re)start the 2‑3 min long‑typing detector."""
    global typing_task
    if typing_task and not typing_task.done():
        typing_task.cancel()

    T = next_typing_threshold()
    user_msgs_before = sum(1 for m in messages.value if m["role"] == "user")

    async def _watch():
        await asyncio.sleep(T)
        if not input_text.value.strip():
            return  # user cleared field
        # no new sent message?
        if any(m["role"] == "user" for m in messages.value[user_msgs_before:]):
            return

        # This counts as a typing‑pause invitation, so reuse inactivity logic
        if user_msgs_before == 0:
            invite = (
                f"Note to the therapeutic assistant: the client has been typing "
                f"for about {T} seconds without sending the first message. "
                "Dutch reply: acknowledge courage, reassure unlimited time, "
                "optional light aside."
            )
        else:
            invite = (
                f"Note to the therapeutic assistant: about {T} seconds of typing "
                "without sending, mid‑conversation. Dutch reply: it's okay to "
                "take time, reflect last topic, suggest easier response path." 
            )

        _append_system_and_stream(invite)
        schedule_no_reply_followups("typing", user_msgs_before)

    typing_task = asyncio.create_task(_watch())

# ------------------------------------------------------------------
# Follow‑up chain after invitations
# ------------------------------------------------------------------

def schedule_no_reply_followups(invitation_kind: str, user_msgs_before: int):
    """Handle 3‑4 min silence after an invitation and subsequent wrap‑up."""
    global followup_task
    if followup_task and not followup_task.done():
        followup_task.cancel()

    T_first = random.randint(180, 240)  # 3‑4 min

    async def _follow():
        await asyncio.sleep(T_first)
        # Did user speak meanwhile?
        if any(m["role"] == "user" for m in messages.value[user_msgs_before:]):
            return
        typed_since_invite = bool(input_text.value.strip())

        # ---------------- choose scenario prompt -------------------
        if user_msgs_before == 0 and not typed_since_invite:  # 1A
            prompt = (
                "Therapeutic system note · Scenario 1A\n"
                "Context: ~3‑4 min of full silence since invitation; no typing.\n\n"
                "Respond in Dutch: acknowledge the quiet as okay and even "
                "beautiful; reassure you're here whenever needed; optionally "
                "share a brief casual aside (e.g. 'ik schenk nog wat thee'). "
                "After this stay dormant until client responds."
            )
            _append_system_and_stream(prompt)
            return

        if user_msgs_before == 0 and typed_since_invite:  # 1B
            prompt = (
                "Therapeutic system note · Scenario 1B\n"
                "Context: Client typed but never sent first message (~3‑4 min).\n\n"
                "Dutch reply: validate that finding words takes courage, there's "
                "all the time in the world; optional light aside; remain dormant "
                "afterwards until client responds."
            )
            _append_system_and_stream(prompt)
            return

        # mid‑conversation variants
        if user_msgs_before > 0 and not typed_since_invite:  # 2A
            prompt = (
                "Therapeutic system note · Scenario 2A\n"
                "Context: Previous messages exist; client silent (~3‑4 min), "
                "no typing.\n\n"
                "Dutch: briefly reference last topic, validate pause, invite body "
                "signals & system check, optional brief aside. If still no reply "
                "after 1‑2 min ask about closing with summary; if still silent "
                "produce summary & ask for edits."
            )
            _append_system_and_stream(prompt)
            await _wrap_up_sequence(user_msgs_before)
            return

        # 2B
        prompt = (
            "Therapeutic system note · Scenario 2B\n"
            "Context: Previous messages exist; client typing for ~3‑4 min "
            "without sending.\n\n"
            "Dutch: note it's fine to take time, reflect last topic, suggest an "
            "easier way to respond; optional small suggestion/change of topic. "
            "Same wrap‑up flow as 2A if silence continues."
        )
        _append_system_and_stream(prompt)
        await _wrap_up_sequence(user_msgs_before)

    followup_task = asyncio.create_task(_follow())


async def _wrap_up_sequence(user_msgs_before: int):
    """Runs the 1‑2 min wrap‑up → summary chain for 2A/2B."""
    T_wrap = random.randint(60, 120)

    # ask about wrap‑up
    await asyncio.sleep(T_wrap)
    if any(m["role"] == "user" for m in messages.value[user_msgs_before:]):
        return
    wrap_prompt = (
        "Therapeutic system note · wrap‑up request\n"
        "No reply after gentle check‑in. Ask in Dutch, softly, if they'd like "
        "to finish with a short summary for their therapist (no extra details)."
    )
    _append_system_and_stream(wrap_prompt)

    # if still silent, send summary
    await asyncio.sleep(T_wrap)
    if any(m["role"] == "user" for m in messages.value[user_msgs_before:]):
        return
    summary_prompt = (
        "Therapeutic system note · produce summary\n"
        "Still no reply. Provide a concise Dutch summary (key feelings, "
        "interventions discussed, next steps), then ask if they want to add or "
        "change anything."
    )
    _append_system_and_stream(summary_prompt)

# ------------------------------------------------------------------
# System instructions & initialisation
# ------------------------------------------------------------------

SYSTEM_INSTRUCTIONS = """<the long metaprompt from the original code>"""
# ensure first system message is instructions
if not messages.value or messages.value[0]["role"] != "system":
    messages.value.insert(0, {"role": "system", "content": SYSTEM_INSTRUCTIONS})
elif messages.value[0]["content"] != SYSTEM_INSTRUCTIONS:
    messages.value[0] = {"role": "system", "content": SYSTEM_INSTRUCTIONS}

# ------------------------------------------------------------------
# Main send callback
# ------------------------------------------------------------------

@solara.lab.task
async def promt_ai(message: str):
    if openai is None:
        no_api_key_message()
        return

    # cancel ongoing timers because user spoke
    for t in (inactivity_task, typing_task, followup_task):
        if t and not t.done():
            t.cancel()

    # clear typing field
    input_text.set("")

    # add user message plus timestamp
    messages.value = [
        *messages.value,
        nl_time_system_message(),
        {"role": "user", "content": message},
    ]

    # stream assistant reply
    response = await openai.chat.completions.create(
        model="gpt-4-1106-preview",
        messages=cast(List[ChatCompletionMessageParam], messages.value),
        stream=True,
        temperature=0.5,
    )
    messages.value = [*messages.value, {"role": "assistant", "content": ""}]

    async for chunk in response:
        if chunk.choices[0].finish_reason == "stop":
            break
        delta = chunk.choices[0].delta.content or ""
        messages.value[-1]["content"] += delta

    # restart silence detector after assistant finishes
    restart_inactivity_timer()

# ------------------------------------------------------------------
# Solara page
# ------------------------------------------------------------------

@solara.component
def Page():
    with solara.Column(style={"width": "100%", "height": "50vh"}):
        with solara.lab.ChatBox():
            for item in messages.value:
                if item["role"] == "system":
                    continue
                with solara.lab.ChatMessage(
                    user=item["role"] == "user",
                    avatar=False,
                    name="Kompassie" if item["role"] == "assistant" else "Gebruiker",
                    color="rgba(0,0,0,0.06)" if item["role"] == "assistant" else "#ff991f",
                    avatar_background_color="primary" if item["role"] == "assistant" else None,
                    border_radius="20px",
                ):
                    solara.Markdown(item["content"])
        if promt_ai.pending:
            solara.Text("Ik denk erover na...", style={"font-size": "1rem", "padding-left": "20px"})
            solara.ProgressLinear()

        # ChatInput bound to reactive input_text
        solara.lab.ChatInput(
            value=input_text,
            on_value=lambda v: (
                input_text.set(v),
                restart_typing_timer() if v.strip() else None
            ),
            send_callback=promt_ai,
            disabled_send=promt_ai.pending,
            autofocus=True,
        ).key("input")

# schedule first inactivity timer when app starts
restart_inactivity_timer()