-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Expand file tree
/
Copy pathopenai_conversations_session.py
More file actions
127 lines (104 loc) · 4.52 KB
/
openai_conversations_session.py
File metadata and controls
127 lines (104 loc) · 4.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from __future__ import annotations
from openai import AsyncOpenAI
from agents.models._openai_shared import get_default_openai_client
from ..items import TResponseInputItem
from .session import SessionABC
from .session_settings import SessionSettings, resolve_session_limit
async def start_openai_conversations_session(openai_client: AsyncOpenAI | None = None) -> str:
    """Create a new, empty conversation and return its ID.

    Args:
        openai_client: Client used to issue the request. When ``None``, the
            client returned by ``get_default_openai_client()`` is used, and if
            that is falsy a newly constructed ``AsyncOpenAI()`` is used instead.

    Returns:
        The ``id`` attribute of the response from ``conversations.create``.
    """
    if openai_client is None:
        # Resolve the client the same way OpenAIConversationsSession.__init__
        # does. Rebinding the parameter lets the type checker narrow it to
        # AsyncOpenAI without a temporary variable or a type-ignore.
        openai_client = get_default_openai_client() or AsyncOpenAI()

    # The conversation is created with no initial items.
    response = await openai_client.conversations.create(items=[])
    return response.id
class OpenAIConversationsSession(SessionABC):
    """Session whose items live in an OpenAI Conversations API conversation.

    Items are read, appended and deleted through
    ``openai_client.conversations.items``. When no ``conversation_id`` is
    supplied, the conversation is created lazily the first time a method needs
    its ID (see ``_get_session_id``).
    """

    # Largest page size requested from ``conversations.items.list``. The API
    # reference documents ``limit`` as ranging from 1 to 100.
    _MAX_LIST_PAGE_SIZE = 100
    # Largest number of items sent in one ``conversations.items.create`` call.
    # The API reference documents "up to 20 items at a time".
    _MAX_ITEMS_PER_CREATE = 20

    # NOTE(review): marker consumed outside this file; presumably signals that
    # history is stored server-side -- confirm against its readers.
    _server_managed_conversation_session = True
    # Class-level default; every instance overwrites it in __init__.
    session_settings: SessionSettings | None = None

    def __init__(
        self,
        *,
        conversation_id: str | None = None,
        openai_client: AsyncOpenAI | None = None,
        session_settings: SessionSettings | None = None,
    ):
        """Initialize the session without performing any network I/O.

        Args:
            conversation_id: ID of an existing conversation to use. When
                ``None``, a conversation is created on first use.
            openai_client: Client used for all requests. When ``None``, the
                result of ``get_default_openai_client()`` is used, falling
                back to a newly constructed ``AsyncOpenAI()``.
            session_settings: Settings passed to ``resolve_session_limit`` in
                ``get_items``. Defaults to ``SessionSettings()``.
        """
        self._session_id: str | None = conversation_id
        self.session_settings = session_settings or SessionSettings()

        if openai_client is None:
            openai_client = get_default_openai_client() or AsyncOpenAI()
        self._openai_client: AsyncOpenAI = openai_client

    @property
    def session_id(self) -> str:
        """Get the session ID (conversation ID).

        Returns:
            The conversation ID for this session.

        Raises:
            ValueError: If the session has not been initialized yet.
                Call any session method (get_items, add_items, etc.) first
                to trigger lazy initialization.
        """
        if self._session_id is None:
            raise ValueError(
                "Session ID not yet available. The session is lazily initialized "
                "on first API call. Call get_items(), add_items(), or similar first."
            )
        return self._session_id

    @session_id.setter
    def session_id(self, value: str) -> None:
        """Set the session ID (conversation ID)."""
        self._session_id = value

    async def _get_session_id(self) -> str:
        """Return the conversation ID, creating a conversation if none is set."""
        if self._session_id is None:
            self._session_id = await start_openai_conversations_session(self._openai_client)
        return self._session_id

    async def _clear_session_id(self) -> None:
        """Forget the current conversation ID; the next use creates a new one."""
        self._session_id = None

    async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
        """Return the conversation's items in ascending (oldest-first) order.

        Args:
            limit: Requested maximum number of items. The effective limit is
                whatever ``resolve_session_limit(limit, self.session_settings)``
                returns; ``None`` means every item is returned.

        Returns:
            Items converted with ``model_dump(exclude_unset=True)``. When a
            limit applies, these are the most recent items, still ordered
            oldest-first.
        """
        session_id = await self._get_session_id()
        session_limit = resolve_session_limit(limit, self.session_settings)

        all_items = []
        if session_limit is None:
            async for item in self._openai_client.conversations.items.list(
                conversation_id=session_id,
                order="asc",
            ):
                # calling model_dump() to make this serializable
                all_items.append(item.model_dump(exclude_unset=True))
        else:
            # Fetch newest-first so the iteration can stop after the most
            # recent `session_limit` items. `limit` here is only the page size,
            # so it is capped at the documented maximum; the `break` below
            # enforces the real limit across auto-paginated pages.
            async for item in self._openai_client.conversations.items.list(
                conversation_id=session_id,
                limit=min(session_limit, self._MAX_LIST_PAGE_SIZE),
                order="desc",
            ):
                # calling model_dump() to make this serializable
                all_items.append(item.model_dump(exclude_unset=True))
                if len(all_items) >= session_limit:
                    break
            # Restore chronological order after the newest-first fetch.
            all_items.reverse()

        return all_items  # type: ignore

    async def add_items(self, items: list[TResponseInputItem]) -> None:
        """Append ``items`` to the conversation, preserving their order.

        The conversation ID is resolved (creating the conversation if needed)
        even when ``items`` is empty; no items request is sent in that case.

        Args:
            items: Items to append. Lists longer than the per-request maximum
                are sent as several consecutive requests.
        """
        session_id = await self._get_session_id()
        if not items:
            return

        batch_size = self._MAX_ITEMS_PER_CREATE
        for start in range(0, len(items), batch_size):
            await self._openai_client.conversations.items.create(
                conversation_id=session_id,
                items=items[start : start + batch_size],
            )

    async def pop_item(self) -> TResponseInputItem | None:
        """Delete the most recent item from the conversation and return it.

        Returns:
            The removed item, or ``None`` when ``get_items(limit=1)`` returns
            nothing.
        """
        session_id = await self._get_session_id()
        items = await self.get_items(limit=1)
        if not items:
            return None

        latest = items[0]
        item_id: str = str(latest["id"])  # type: ignore [typeddict-item]
        await self._openai_client.conversations.items.delete(
            conversation_id=session_id, item_id=item_id
        )
        return latest

    async def clear_session(self) -> None:
        """Delete the backing conversation and forget its ID.

        When no conversation has been created or assigned yet there is nothing
        to delete, so no request is made; going through ``_get_session_id()``
        here would create a conversation only to delete it immediately.
        """
        if self._session_id is None:
            return

        await self._openai_client.conversations.delete(
            conversation_id=self._session_id,
        )
        await self._clear_session_id()