...
要先拿到 Assistant ID & User API key
expot API KEY,ASSISTANT_ID 以及你的 input
程式碼區塊 language bash export ASSISTANT_ID="YOUR ASSISTANT ID" export API_KEY="YOUR API KEY" export IMAGE_URL="YOUR IMAGE URL HEHE"
IMAGE_URL 格式參考上方 Gradio image 範例
執行以下腳本 (請先確保環境有安裝 jq 套件)
mac: brew install jq
ubuntu: apt-get install jq
cent os: yum install jq
程式碼區塊 BASE_URL="https://prod.dvcbot.net/api/assts/v1" # create thread AUTH_HEADER="Authorization: Bearer ${API_KEY}" THREAD_URL="${BASE_URL}/threads" THREAD_ID=`curl -s --location "${THREAD_URL}" \ --header 'OpenAI-Beta: assistants=v2' \ --header 'Content-Type: application/json' \ --header "${AUTH_HEADER}" \ --data '{}' | jq .id | tr -d '"'` # add msg to thread CREATE_MSG_DATA=$(< <(cat <<EOF { "role": "user", "content": [ { "type": "image_url", "image_url": { "url": "$IMAGE_URL" } } ] } EOF )) MSG_URL="${BASE_URL}/threads/${THREAD_ID}/messages" curl -s --location "${MSG_URL}" \ --header 'OpenAI-Beta: assistants=v2' \ --header 'Content-Type: application/json' \ --header "${AUTH_HEADER}" \ --data "${CREATE_MSG_DATA}" > /dev/null # run the assistant within thread CREATE_RUN_DATA=$(< <(cat <<EOF { "assistant_id": "$ASSISTANT_ID", "additional_instructions": "The current time is: `date '+%Y-%m-%d %H:%M:%S'`" } EOF )) RUN_URL="${BASE_URL}/threads/${THREAD_ID}/runs" RUN_ID=`curl -s --location "${RUN_URL}" \ --header 'OpenAI-Beta: assistants=v2' \ --header 'Content-Type: application/json' \ --header "${AUTH_HEADER}" \ --data "${CREATE_RUN_DATA}" | jq .id | tr -d '"'` # get run result RUN_STAUS="" while [[ $RUN_STAUS != "completed" ]] do RESP=`curl -s --location --request GET "${RUN_URL}/$RUN_ID" \ --header 'OpenAI-Beta: assistants=v2' \ --header 'Content-Type: application/json' \ --header "${AUTH_HEADER}"` RUN_STAUS=`echo $RESP| jq .status | tr -d '"'`; REQUIRED_ACTION=`echo $RESP| jq .required_action` while [[ $RUN_STAUS = "requires_action" ]] && [[ ! -z "$REQUIRED_ACTION" ]] do TOOL_OUTPUTS='[]' LEN=$( echo "$REQUIRED_ACTION" | jq '.submit_tool_outputs.tool_calls | length' ) for (( i=0; i<$LEN; i++ )) do FUNC_NAME=`echo "$REQUIRED_ACTION" | jq ".submit_tool_outputs.tool_calls[$i].function.name" | tr -d '"'` ARGS=`echo "$REQUIRED_ACTION" | jq ".submit_tool_outputs.tool_calls[$i].function.arguments"` ARGS=${ARGS//\\\"/\"} ARGS=${ARGS#"\""} ARGS=${ARGS%"\""} PLUGINAPI_URL="${BASE_URL}/pluginapi?tid=${THREAD_ID}&aid=${ASSISTANT_ID}&pid=${FUNC_NAME}" OUTPUT=`curl -s --location "${PLUGINAPI_URL}" \ --header 'OpenAI-Beta: assistants=v2' \ --header 'Content-Type: application/json' \ --header "${AUTH_HEADER}" \ --data "${ARGS}"` OUTPUT="${OUTPUT:0:8000}" OUTPUT=${OUTPUT//\"/\\\"} CALL_ID=`echo "$REQUIRED_ACTION" | jq ".submit_tool_outputs.tool_calls[$i].id" | tr -d '"'` TOOL_OUTPUT=$(< <(cat <<EOF { "tool_call_id": "$CALL_ID", "output": "$OUTPUT" } EOF )) TOOL_OUTPUTS=$(jq --argjson obj "$TOOL_OUTPUT" '. += [$obj]' <<< "$TOOL_OUTPUTS") done SUBMIT_TOOL_OUTPUT_RUN_RUL="${BASE_URL}/threads/${THREAD_ID}/runs/${RUN_ID}/submit_tool_outputs" TOOL_OUTPUTS_DATA=$(< <(cat <<EOF { "tool_outputs": $TOOL_OUTPUTS } EOF )) curl -s --location "${SUBMIT_TOOL_OUTPUT_RUN_RUL}" \ --header 'OpenAI-Beta: assistants=v2' \ --header 'Content-Type: application/json' \ --header "${AUTH_HEADER}" \ --data "${TOOL_OUTPUTS_DATA}" > /dev/null RESP=`curl -s --location --request GET "${RUN_URL}/$RUN_ID" \ --header 'OpenAI-Beta: assistants=v2' \ --header 'Content-Type: application/json' \ --header "${AUTH_HEADER}"` RUN_STAUS=`echo $RESP| jq .status | tr -d '"'`; sleep 1 done sleep 1 done #list msg RESPONSE_MSG=`curl -s --location --request GET "${MSG_URL}" \ --header 'OpenAI-Beta: assistants=v2' \ --header 'Content-Type: application/json' \ --header "${AUTH_HEADER}" | jq .data[0].content[].text.value` echo "" echo "davinci bot: "$RESPONSE_MSG
即可看到結果如下
程式碼區塊 davinci bot: "response from assistant"
Python
Text or
...
image as Input
程式碼區塊 | ||
---|---|---|
| ||
import json
from openai import OpenAI
from datetime import datetime
ASSISTANT_API = 'https://prod.dvcbot.net/api/assts/v1'
API_KEY = ''
client = OpenAI(
base_url=ASSISTANT_API,
api_key=API_KEY,
)
ASSISTANT_ID = ''
# 定義多個訊息
messages = [
{"type": "text", "text": "tell me about the image"},
{"type": "image_url", "image_url": {"url": "https://xxx.xxx.xxx.jpg"}},
{"type": "text", "text": "What do you think about this image?"}
]
# 建立 thread
thread = client.beta.threads.create(messages=[])
# 連續發送訊息
for message in messages:
client.beta.threads.messages.create(thread_id=thread.id, role='user', content=[message])
# 執行 assistant
run = client.beta.threads.runs.create_and_poll(thread_id=thread.id, assistant_id=ASSISTANT_ID, additional_instructions=f"\nThe current time is: {datetime.now()}")
while run.status == 'requires_action' and run.required_action:
outputs = []
for call in run.required_action.submit_tool_outputs.tool_calls:
resp = client._client.post(ASSISTANT_API + '/pluginapi', params={"tid": thread.id, "aid": ASSISTANT_ID, "pid": call.function.name}, headers={"Authorization": "Bearer " + API_KEY}, json=json.loads(call.function.arguments))
outputs.append({"tool_call_id": call.id, "output": resp.text[:8000]})
run = client.beta.threads.runs.submit_tool_outputs_and_poll(run_id=run.id, thread_id=thread.id, tool_outputs=outputs)
if run.status == 'failed' and run.last_error:
print(run.last_error.model_dump_json())
msgs = client.beta.threads.messages.list(thread_id=thread.id, order='desc')
client.beta.threads.delete(thread_id=thread.id)
print(msgs.data[0].content[0].text.value) |
Text & image as input (Streaming)
程式碼區塊 | ||
---|---|---|
| ||
import json
import uuid
import traceback
from openai import OpenAI
from typing_extensions import override
from openai import AssistantEventHandler, OpenAI
from openai.types.beta.threads import Text, TextDelta
from openai.types.beta.threads.runs import ToolCall, ToolCallDelta
from openai.types.beta.threads import Message, MessageDelta
from openai.types.beta.threads.runs import ToolCall, RunStep
from openai.types.beta import AssistantStreamEvent
ASSISTANT_API='https://prod.dvcbot.net/api/assts/v1'
API_KEY='PLACE YOUR API KEY HERE'
client = OpenAI(
base_url=ASSISTANT_API,
api_key=API_KEY,
)
class EventHandler(AssistantEventHandler):
def __init__(self, thread_id, assistant_id):
super().__init__()
self.output = None
self.tool_id = None
self.thread_id = thread_id
self.assistant_id = assistant_id
self.run_id = None
self.run_step = None
self.function_name = ""
self.arguments = ""
@override
def on_text_created(self, text) -> None:
print(f"\nassistant on_text_created > ", end="", flush=True)
@override
def on_text_delta(self, delta, snapshot):
# print(f"\nassistant on_text_delta > {delta.value}", end="", flush=True)
print(f"{delta.value}")
@override
def on_end(self, ):
print(f"\n end assistant > ",self.current_run_step_snapshot, end="", flush=True)
@override
def on_exception(self, exception: Exception) -> None:
"""Fired whenever an exception happens during streaming"""
print(f"\nassistant > {exception}\n", end="", flush=True)
@override
def on_message_created(self, message: Message) -> None:
print(f"\nassistant on_message_created > {message}\n", end="", flush=True)
@override
def on_message_done(self, message: Message) -> None:
print(f"\nassistant on_message_done > {message}\n", end="", flush=True)
@override
def on_message_delta(self, delta: MessageDelta, snapshot: Message) -> None:
# print(f"\nassistant on_message_delta > {delta}\n", end="", flush=True)
pass
def on_tool_call_created(self, tool_call):
# 4
print(f"\nassistant on_tool_call_created > {tool_call}")
self.function_name = tool_call.function.name
self.tool_id = tool_call.id
print(f"\on_tool_call_created > run_step.status > {self.run_step.status}")
print(f"\nassistant > {tool_call.type} {self.function_name}\n", flush=True)
keep_retrieving_run = client.beta.threads.runs.retrieve(
thread_id=self.thread_id,
run_id=self.run_id
)
while keep_retrieving_run.status in ["queued", "in_progress"]:
keep_retrieving_run = client.beta.threads.runs.retrieve(
thread_id=self.thread_id,
run_id=self.run_id
)
print(f"\nSTATUS: {keep_retrieving_run.status}")
@override
def on_tool_call_done(self, tool_call: ToolCall) -> None:
keep_retrieving_run = client.beta.threads.runs.retrieve(
thread_id=self.thread_id,
run_id=self.run_id
)
print(f"\nDONE STATUS: {keep_retrieving_run.status}")
if keep_retrieving_run.status == "completed":
all_messages = client.beta.threads.messages.list(
thread_id=current_thread.id
)
print(all_messages.data[0].content[0].text.value, "", "")
return
elif keep_retrieving_run.status == "requires_action":
print("here you would call your function")
if self.function_name == "SEARCH":
# ====
outputs = []
for call in keep_retrieving_run.required_action.submit_tool_outputs.tool_calls:
resp = client._client.post(ASSISTANT_API+'/pluginapi', params={"tid": self.thread_id, "aid": asst_id, "pid": call.function.name}, headers={"Authorization": "Bearer " + API_KEY}, json=json.loads(call.function.arguments))
outputs.append({"tool_call_id": call.id, "output": resp.text[:8000]})
self.output=outputs
with client.beta.threads.runs.submit_tool_outputs_stream(
thread_id=self.thread_id,
run_id=self.run_id,
tool_outputs=self.output,
event_handler=EventHandler(self.thread_id, self.assistant_id)
) as stream:
stream.until_done()
else:
print("unknown function")
return
@override
def on_run_step_created(self, run_step: RunStep) -> None:
# 2
print(f"on_run_step_created")
self.run_id = run_step.run_id
self.run_step = run_step
print("The type ofrun_step run step is ", type(run_step), flush=True)
print(f"\n run step created assistant > {run_step}\n", flush=True)
@override
def on_run_step_done(self, run_step: RunStep) -> None:
print(f"\n run step done assistant > {run_step}\n", flush=True)
def on_tool_call_delta(self, delta, snapshot):
if delta.type == 'function':
# the arguments stream thorugh here and then you get the requires action event
print(delta.function.arguments, end="", flush=True)
self.arguments += delta.function.arguments
elif delta.type == 'code_interpreter':
print(f"on_tool_call_delta > code_interpreter")
if delta.code_interpreter.input:
print(delta.code_interpreter.input, end="", flush=True)
if delta.code_interpreter.outputs:
print(f"\n\noutput >", flush=True)
for output in delta.code_interpreter.outputs:
if output.type == "logs":
print(f"\n{output.logs}", flush=True)
else:
print("ELSE")
print(delta, end="", flush=True)
@override
def on_event(self, event: AssistantStreamEvent) -> None:
# print("In on_event of event is ", event.event, flush=True)
if event.event == "thread.run.requires_action":
print("\nthread.run.requires_action > submit tool call")
print(f"ARGS: {self.arguments}")
assistant = client.beta.assistants.create(
name='test example',
model='aide-gpt-4-turbo',
instructions="you are an assistant that will answer my question",
tools=[
{
"type": "function",
"function": {
"name": "SEARCH",
"description": "Search more knowledge or realtime information from the Internet to answer the user.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "object",
"properties": {
"q": {
"type": "string",
"description": "Query string to be searched for on the search engine. This should be infered from the user's question and the conversation. Please split the original user query completely into more than 5 closely related important keywords, which are devided by `space key` for searching. If searching site is specified by me, please gnerate it followed by a site:XXX.com"
},
"mkt": {
"type": "string",
"enum": [
"zh-TW",
"en-US"
],
"description": "The market that should be searched for. It should be aligned with the languages the role `user` adopts. E.g. the language #zh-TW maps with the `zh-TW` mkt option."
}
},
"required": [
"q",
"mkt"
]
},
"topk": {
"type": "string",
"description": "The number of search results, it should be set as a single integer number between 1~5."
}
},
"required": [
"query",
"topk"
]
}
}
}
],
metadata={
'backend_id': 'default'
}
)
asst_id=assistant.id
print(f"\nassistant created, id: ", asst_id, flush=True)
new_thread = client.beta.threads.create()
prompt = "中國在 2024 巴黎奧運的表現如何?"
client.beta.threads.messages.create(thread_id=new_thread.id, role="user", content=prompt)
with client.beta.threads.runs.create_and_stream(
thread_id=new_thread.id,
assistant_id=asst_id,
instructions=prompt,
event_handler=EventHandler(new_thread.id, asst_id),
) as stream:
stream.until_done() |
Voice mode
(Coming Soon)