import asyncio
import json
import time
import traceback
import uuid

import httpx
from openai import AssistantEventHandler, AsyncOpenAI, OpenAI
from openai.types.beta import AssistantStreamEvent
from openai.types.beta.threads import Message, MessageDelta, Text, TextDelta
from openai.types.beta.threads.runs import RunStep, ToolCall, ToolCallDelta
from typing_extensions import override
# Base URL of the Assistants endpoint; also used to build the "/pluginapi" URL
# when tool calls are executed.
ASSISTANT_API = 'https://prod.dvcbot.net/api/assts/v1'

# Placeholder credential: passed to the client and sent as the Bearer token on
# plugin calls.
API_KEY = 'PLACE YOUR API KEY HERE'

# Module-level synchronous client shared by EventHandler and the demo script.
client = OpenAI(api_key=API_KEY, base_url=ASSISTANT_API)
class EventHandler(AssistantEventHandler):
    """Print streamed run events and service ``SEARCH`` tool calls.

    The handler relies on the module-level ``client``, ``ASSISTANT_API`` and
    ``API_KEY``. When the run reaches ``requires_action`` it calls the
    ``/pluginapi`` endpoint once per requested tool call, submits the results,
    and streams the continuation through a fresh ``EventHandler``.

    Args:
        thread_id: ID of the thread the run belongs to.
        assistant_id: ID of the assistant, forwarded as ``aid`` to the plugin
            endpoint and to the handler created for the continuation stream.
    """

    # Pause between run-status polls so the loop does not issue back-to-back
    # requests while the run is still queued or in progress.
    _POLL_INTERVAL_SECONDS = 0.5

    def __init__(self, thread_id, assistant_id):
        super().__init__()
        self.output = None          # tool outputs most recently submitted
        self.tool_id = None         # id of the most recently created tool call
        self.thread_id = thread_id
        self.assistant_id = assistant_id
        self.run_id = None          # set in on_run_step_created
        self.run_step = None        # last run step seen in on_run_step_created
        self.function_name = ""     # name of the most recent function tool call
        self.arguments = ""         # accumulated streamed function arguments

    @override
    def on_text_created(self, text) -> None:
        print("\nassistant on_text_created > ", end="", flush=True)

    @override
    def on_text_delta(self, delta, snapshot):
        print(f"{delta.value}")

    @override
    def on_end(self):
        print("\n end assistant > ", self.current_run_step_snapshot, end="", flush=True)

    @override
    def on_exception(self, exception: Exception) -> None:
        """Fired whenever an exception happens during streaming."""
        print(f"\nassistant > {exception}\n", end="", flush=True)

    @override
    def on_message_created(self, message: Message) -> None:
        print(f"\nassistant on_message_created > {message}\n", end="", flush=True)

    @override
    def on_message_done(self, message: Message) -> None:
        print(f"\nassistant on_message_done > {message}\n", end="", flush=True)

    @override
    def on_message_delta(self, delta: MessageDelta, snapshot: Message) -> None:
        # Message deltas are intentionally not printed.
        pass

    @override
    def on_tool_call_created(self, tool_call):
        """Record the tool call, then wait until the run leaves queued/in_progress."""
        print(f"\nassistant on_tool_call_created > {tool_call}")
        # Only function tool calls carry ``.function``; other tool types
        # (handled in on_tool_call_delta) must not crash here.
        function = getattr(tool_call, "function", None)
        self.function_name = function.name if function is not None else ""
        self.tool_id = tool_call.id
        run_step_status = self.run_step.status if self.run_step is not None else None
        print(f"\non_tool_call_created > run_step.status > {run_step_status}")
        print(f"\nassistant > {tool_call.type} {self.function_name}\n", flush=True)

        run = client.beta.threads.runs.retrieve(
            thread_id=self.thread_id,
            run_id=self.run_id,
        )
        while run.status in ("queued", "in_progress"):
            time.sleep(self._POLL_INTERVAL_SECONDS)
            run = client.beta.threads.runs.retrieve(
                thread_id=self.thread_id,
                run_id=self.run_id,
            )
            print(f"\nSTATUS: {run.status}")

    @override
    def on_tool_call_done(self, tool_call: ToolCall) -> None:
        """Print the final message, or execute and submit the requested tool calls."""
        run = client.beta.threads.runs.retrieve(
            thread_id=self.thread_id,
            run_id=self.run_id,
        )
        print(f"\nDONE STATUS: {run.status}")

        if run.status == "completed":
            # Use the handler's own thread id rather than an undefined global.
            all_messages = client.beta.threads.messages.list(
                thread_id=self.thread_id,
            )
            print(all_messages.data[0].content[0].text.value, "", "")
            return

        if run.status != "requires_action":
            return

        print("here you would call your function")
        if self.function_name != "SEARCH":
            print("unknown function")
            return

        outputs = []
        for call in run.required_action.submit_tool_outputs.tool_calls:
            # The request goes through the SDK's underlying HTTP client
            # (``client._client``), a private attribute of the SDK client.
            resp = client._client.post(
                ASSISTANT_API + '/pluginapi',
                params={
                    "tid": self.thread_id,
                    "aid": self.assistant_id,
                    "pid": call.function.name,
                },
                headers={"Authorization": "Bearer " + API_KEY},
                json=json.loads(call.function.arguments),
            )
            # Only the first 8000 characters of the response are submitted.
            outputs.append({"tool_call_id": call.id, "output": resp.text[:8000]})
        self.output = outputs

        with client.beta.threads.runs.submit_tool_outputs_stream(
            thread_id=self.thread_id,
            run_id=self.run_id,
            tool_outputs=self.output,
            event_handler=EventHandler(self.thread_id, self.assistant_id),
        ) as stream:
            stream.until_done()

    @override
    def on_run_step_created(self, run_step: RunStep) -> None:
        """Remember the run id and run step for later status lookups."""
        print("on_run_step_created")
        self.run_id = run_step.run_id
        self.run_step = run_step
        print("The type ofrun_step run step is ", type(run_step), flush=True)
        print(f"\n run step created assistant > {run_step}\n", flush=True)

    @override
    def on_run_step_done(self, run_step: RunStep) -> None:
        print(f"\n run step done assistant > {run_step}\n", flush=True)

    @override
    def on_tool_call_delta(self, delta, snapshot):
        """Echo streamed tool-call fragments and accumulate function arguments."""
        if delta.type == 'function':
            # Arguments stream through here before the requires_action event;
            # a delta may carry no arguments, so skip empty fragments.
            fragment = delta.function.arguments if delta.function else None
            if fragment:
                print(fragment, end="", flush=True)
                self.arguments += fragment
        elif delta.type == 'code_interpreter':
            print("on_tool_call_delta > code_interpreter")
            interpreter = delta.code_interpreter
            if interpreter is None:
                return
            if interpreter.input:
                print(interpreter.input, end="", flush=True)
            if interpreter.outputs:
                print("\n\noutput >", flush=True)
                for output in interpreter.outputs:
                    if output.type == "logs":
                        print(f"\n{output.logs}", flush=True)
        else:
            print("ELSE")
            print(delta, end="", flush=True)

    @override
    def on_event(self, event: AssistantStreamEvent) -> None:
        """Print the accumulated arguments when the run requests tool output."""
        if event.event == "thread.run.requires_action":
            print("\nthread.run.requires_action > submit tool call")
            print(f"ARGS: {self.arguments}")
# NOTE(review): this region previously held two scripts interleaved token by
# token and was not valid Python. They are separated below using only the
# tokens that were present. They look like two revisions of the same demo
# (synchronous and async) -- confirm which one should remain in this file.

# --- Synchronous demo: create an assistant with a SEARCH tool and run it. ---
assistant = client.beta.assistants.create(
    name='test example',
    model='aide-gpt-4-turbo',
    instructions="you are an assistant that will answer my question",
    tools=[
        {
            "type": "function",
            "function": {
                "name": "SEARCH",
                "description": "Search more knowledge or realtime information from the Internet to answer the user.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "object",
                            "properties": {
                                "q": {
                                    "type": "string",
                                    "description": "Query string to be searched for on the search engine. This should be infered from the user's question and the conversation. Please split the original user query completely into more than 5 closely related important keywords, which are devided by `space key` for searching. If searching site is specified by me, please gnerate it followed by a site:XXX.com"
                                },
                                "mkt": {
                                    "type": "string",
                                    "enum": [
                                        "zh-TW",
                                        "en-US"
                                    ],
                                    "description": "The market that should be searched for. It should be aligned with the languages the role `user` adopts. E.g. the language #zh-TW maps with the `zh-TW` mkt option."
                                }
                            },
                            "required": [
                                "q",
                                "mkt"
                            ]
                        },
                        "topk": {
                            "type": "string",
                            "description": "The number of search results, it should be set as a single integer number between 1~5."
                        }
                    },
                    "required": [
                        "query",
                        "topk"
                    ]
                }
            }
        }
    ],
    metadata={
        'backend_id': 'default'
    }
)
asst_id = assistant.id
print(f"\nassistant created, id: ", asst_id, flush=True)

new_thread = client.beta.threads.create()
prompt = "中國在 2024 巴黎奧運的表現如何?"
client.beta.threads.messages.create(thread_id=new_thread.id, role="user", content=prompt)

# Stream the run; EventHandler prints events and services SEARCH tool calls.
with client.beta.threads.runs.create_and_stream(
    thread_id=new_thread.id,
    assistant_id=asst_id,
    instructions=prompt,
    event_handler=EventHandler(new_thread.id, asst_id),
) as stream:
    stream.until_done()


# --- Async demo: run an existing assistant and execute its plugin calls. ---
# NOTE(review): these assignments rebind the module-level ASSISTANT_API and
# API_KEY defined near the top of the file; both IDs below are empty and must
# be filled in before main() can succeed.
ASSISTANT_API = "https://prod.dvcbot.net/api/assts/v1"
API_KEY = ""
ASSISTANT_ID = ""
USER_PROMPT = "從一數到一千"


async def main():
    """Send USER_PROMPT to ASSISTANT_ID, run any requested plugin calls, and print events.

    The run is streamed once. If it stops at ``requires_action``, each
    requested tool call is posted to ``/pluginapi``, the results are
    submitted, and the continuation stream is printed. The thread is deleted
    at the end.
    """
    # NOTE(review): verify=False disables TLS certificate verification --
    # confirm this is intended outside of local testing.
    http_client = httpx.AsyncClient(verify=False)
    client = AsyncOpenAI(base_url=ASSISTANT_API, api_key=API_KEY, http_client=http_client)

    thread = await client.beta.threads.create()
    user_prompt = USER_PROMPT
    await client.beta.threads.messages.create(
        thread_id=thread.id,
        role="user",
        content=user_prompt,
    )

    stream = await client.beta.threads.runs.create(
        assistant_id=ASSISTANT_ID,
        thread_id=thread.id,
        stream=True,
    )

    # Remember the run that paused for tool output, if any.
    requires_action_run_id = ""
    async for event in stream:
        if event.event == "thread.message.delta":
            print(event)
        elif event.event == "thread.run.requires_action":
            requires_action_run_id = event.data.id

    if requires_action_run_id != "":
        run = await client.beta.threads.runs.retrieve(requires_action_run_id, thread_id=thread.id)
        outputs = []
        for call in run.required_action.submit_tool_outputs.tool_calls:
            print(f"call plugin {call.function.name} with args: {call.function.arguments}")
            resp = await client._client.post(
                ASSISTANT_API + "/pluginapi",
                params={"tid": thread.id, "aid": ASSISTANT_ID, "pid": call.function.name},
                headers={"Authorization": "Bearer " + API_KEY},
                json=json.loads(call.function.arguments),
                timeout=30,
            )
            # Only the first 8000 characters of the response are submitted.
            result = resp.text[:8000]
            print(f"plugin {call.function.name} result {result}")
            outputs.append({"tool_call_id": call.id, "output": result})

        stream = await client.beta.threads.runs.submit_tool_outputs(
            run_id=run.id,
            stream=True,
            thread_id=thread.id,
            tool_outputs=outputs,
        )
        async for event in stream:
            print(event)

    await client.beta.threads.delete(thread.id)


if __name__ == "__main__":
    asyncio.run(main())