已比較的版本

索引鍵

  • 此行已新增。
  • 此行已移除。
  • 格式已變更。

目錄
style: none

MediaTek DaVinci Assistant API 介紹

...

Text & image as input (Streaming)

程式碼區塊
language: py
import asyncio
import json
import time
import traceback
import uuid

import httpx
from openai import AssistantEventHandler, AsyncOpenAI, OpenAI
from openai.types.beta import AssistantStreamEvent
from openai.types.beta.threads import Message, MessageDelta, Text, TextDelta
from openai.types.beta.threads.runs import RunStep, ToolCall, ToolCallDelta
from typing_extensions import override
 

# Base URL of the Assistant API and the key sent with every request.
ASSISTANT_API = 'https://prod.dvcbot.net/api/assts/v1'
API_KEY = 'PLACE YOUR API KEY HERE'

# Module-level synchronous client shared by the code below.
client = OpenAI(api_key=API_KEY, base_url=ASSISTANT_API)

class EventHandler(AssistantEventHandler):
    """Streaming event handler that logs run progress and services SEARCH tool calls.

    Every callback prints what it received. When a tool call finishes and the
    run is waiting in ``requires_action``, the handler posts each pending call
    to the ``/pluginapi`` endpoint, then submits the responses back to the run
    through a new stream driven by a fresh ``EventHandler``.

    Args:
        thread_id: ID of the thread the run belongs to.
        assistant_id: ID of the assistant executing the run; forwarded to the
            plugin endpoint as ``aid``.
    """

    # Pause between run-status polls so the API is not hit in a tight loop.
    POLL_INTERVAL_SECONDS = 0.5

    def __init__(self, thread_id, assistant_id):
        super().__init__()
        self.output = None          # tool outputs most recently submitted
        self.tool_id = None         # id of the most recently created tool call
        self.thread_id = thread_id
        self.assistant_id = assistant_id
        self.run_id = None          # filled in by on_run_step_created
        self.run_step = None        # most recent RunStep seen
        self.function_name = ""     # name of the function tool being called
        self.arguments = ""         # function arguments accumulated from deltas

    def _retrieve_run(self):
        """Fetch the current state of the run identified by ``self.run_id``."""
        return client.beta.threads.runs.retrieve(
            thread_id=self.thread_id,
            run_id=self.run_id,
        )

    @override
    def on_text_created(self, text) -> None:
        """Print a prefix when the assistant starts a new text block."""
        print("\nassistant on_text_created > ", end="", flush=True)

    @override
    def on_text_delta(self, delta, snapshot):
        """Print each streamed text fragment on its own line."""
        # print(f"\nassistant on_text_delta > {delta.value}", end="", flush=True)
        print(f"{delta.value}")

    @override
    def on_end(self):
        """Print the last run-step snapshot when the stream ends."""
        print("\n end assistant > ", self.current_run_step_snapshot, end="", flush=True)

    @override
    def on_exception(self, exception: Exception) -> None:
        """Fired whenever an exception happens during streaming"""
        print(f"\nassistant > {exception}\n", end="", flush=True)

    @override
    def on_message_created(self, message: Message) -> None:
        """Print the message object when it is created."""
        print(f"\nassistant on_message_created > {message}\n", end="", flush=True)

    @override
    def on_message_done(self, message: Message) -> None:
        """Print the message object when it is complete."""
        print(f"\nassistant on_message_done > {message}\n", end="", flush=True)

    @override
    def on_message_delta(self, delta: MessageDelta, snapshot: Message) -> None:
        """Ignore message deltas; the text callbacks already print the content."""
        # print(f"\nassistant on_message_delta > {delta}\n", end="", flush=True)
        pass

    @override
    def on_tool_call_created(self, tool_call):
        """Record the tool call, then block until the run leaves queued/in_progress."""
        print(f"\nassistant on_tool_call_created > {tool_call}")
        # Only function tool calls carry `.function`; other tool types
        # (e.g. code_interpreter, handled in on_tool_call_delta) do not.
        function = getattr(tool_call, "function", None)
        self.function_name = function.name if function is not None else ""
        self.tool_id = tool_call.id
        # The original literal started with "\o", an invalid escape sequence;
        # "\n" matches the neighbouring messages.
        print(f"\non_tool_call_created > run_step.status > {self.run_step.status}")

        print(f"\nassistant > {tool_call.type} {self.function_name}\n", flush=True)

        run = self._retrieve_run()
        while run.status in ("queued", "in_progress"):
            time.sleep(self.POLL_INTERVAL_SECONDS)
            run = self._retrieve_run()
            print(f"\nSTATUS: {run.status}")

    @override
    def on_tool_call_done(self, tool_call: ToolCall) -> None:
        """Print the final answer, or run the SEARCH plugin and submit its output.

        If the run is ``completed`` the newest thread message is printed. If
        it is in ``requires_action`` and the pending function is ``SEARCH``,
        every required tool call is posted to ``/pluginapi`` and the responses
        (first 8000 characters each) are submitted through a new stream.
        """
        run = self._retrieve_run()

        print(f"\nDONE STATUS: {run.status}")

        if run.status == "completed":
            # Use this handler's own thread id; the original referenced an
            # undefined `current_thread` here.
            all_messages = client.beta.threads.messages.list(
                thread_id=self.thread_id
            )
            print(all_messages.data[0].content[0].text.value, "", "")
            return

        if run.status != "requires_action":
            return

        print("here you would call your function")

        if self.function_name != "SEARCH":
            print("unknown function")
            return

        outputs = []
        for call in run.required_action.submit_tool_outputs.tool_calls:
            # `aid` comes from the handler rather than a module-level global,
            # so the class works regardless of how the caller names its
            # assistant id.
            resp = client._client.post(
                ASSISTANT_API + '/pluginapi',
                params={"tid": self.thread_id, "aid": self.assistant_id, "pid": call.function.name},
                headers={"Authorization": "Bearer " + API_KEY},
                json=json.loads(call.function.arguments),
            )
            outputs.append({"tool_call_id": call.id, "output": resp.text[:8000]})
        self.output = outputs

        # The follow-up stream gets its own handler so that its run/step
        # bookkeeping starts clean.
        with client.beta.threads.runs.submit_tool_outputs_stream(
            thread_id=self.thread_id,
            run_id=self.run_id,
            tool_outputs=self.output,
            event_handler=EventHandler(self.thread_id, self.assistant_id),
        ) as stream:
            stream.until_done()

    @override
    def on_run_step_created(self, run_step: RunStep) -> None:
        """Remember the run id and step so later callbacks can query the run."""
        print("on_run_step_created")
        self.run_id = run_step.run_id
        self.run_step = run_step
        print("The type ofrun_step run step is ", type(run_step), flush=True)
        print(f"\n run step created assistant > {run_step}\n", flush=True)

    @override
    def on_run_step_done(self, run_step: RunStep) -> None:
        """Print the finished run step."""
        print(f"\n run step done assistant > {run_step}\n", flush=True)

    @override
    def on_tool_call_delta(self, delta, snapshot):
        """Echo streamed tool-call fragments and accumulate function arguments."""
        if delta.type == 'function':
            # The arguments stream through here, and then the
            # requires_action event arrives.
            fragment = delta.function.arguments if delta.function else None
            if fragment:  # a delta may carry no argument text
                print(fragment, end="", flush=True)
                self.arguments += fragment
        elif delta.type == 'code_interpreter':
            print("on_tool_call_delta > code_interpreter")
            if delta.code_interpreter.input:
                print(delta.code_interpreter.input, end="", flush=True)
            if delta.code_interpreter.outputs:
                print("\n\noutput >", flush=True)
                for output in delta.code_interpreter.outputs:
                    if output.type == "logs":
                        print(f"\n{output.logs}", flush=True)
        else:
            print("ELSE")
            print(delta, end="", flush=True)

    @override
    def on_event(self, event: AssistantStreamEvent) -> None:
        """Print the accumulated arguments when the run requires action."""
        # print("In on_event of event is ", event.event, flush=True)

        if event.event == "thread.run.requires_action":
            print("\nthread.run.requires_action > submit tool call")
            print(f"ARGS: {self.arguments}")
 



# NOTE(review): this section was reconstructed from a revision comparison in
# which two versions of the example were interleaved token by token. The
# first part appears to be the earlier, synchronous EventHandler-based
# script; the second part appears to be the later asyncio-based example.
# Confirm against the page history.

# --- Synchronous example: create an assistant with a SEARCH tool and stream a run ---
assistant = client.beta.assistants.create(
    name='test example',
    model='aide-gpt-4-turbo',
    instructions="you are an assistant that will answer my question",
    tools=[
        {
            "type": "function",
            "function": {
                "name": "SEARCH",
                "description": "Search more knowledge or realtime information from the Internet to answer the user.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "object",
                            "properties": {
                                "q": {
                                    "type": "string",
                                    "description": "Query string to be searched for on the search engine. This should be infered from the user's question and the conversation. Please split the original user query completely into more than 5 closely related important keywords, which are devided by `space key` for searching. If searching site is specified by me, please gnerate it followed by a site:XXX.com"
                                },
                                "mkt": {
                                    "type": "string",
                                    "enum": [
                                        "zh-TW",
                                        "en-US"
                                    ],
                                    "description": "The market that should be searched for. It should be aligned with the languages the role `user` adopts. E.g. the language #zh-TW maps with the `zh-TW` mkt option."
                                }
                            },
                            "required": [
                                "q",
                                "mkt"
                            ]
                        },
                        "topk": {
                            "type": "string",
                            "description": "The number of search results, it should be set as a single integer number between 1~5."
                        }
                    },
                    "required": [
                        "query",
                        "topk"
                    ]
                }
            }
        }
    ],
    metadata={
        'backend_id': 'default'
    }
)
asst_id = assistant.id
print(f"\nassistant created, id: ", asst_id, flush=True)

new_thread = client.beta.threads.create()
prompt = "中國在 2024 巴黎奧運的表現如何?"
client.beta.threads.messages.create(thread_id=new_thread.id, role="user", content=prompt)

# Stream the run; EventHandler prints progress and services SEARCH tool calls.
with client.beta.threads.runs.create_and_stream(
    thread_id=new_thread.id,
    assistant_id=asst_id,
    instructions=prompt,
    event_handler=EventHandler(new_thread.id, asst_id),
) as stream:
    stream.until_done()


# --- Asyncio example: stream a run and service tool calls without an event handler ---
# These assignments rebind the ASSISTANT_API / API_KEY names defined near the
# top of the file; fill in API_KEY and ASSISTANT_ID before running.
ASSISTANT_API = "https://prod.dvcbot.net/api/assts/v1"

API_KEY = ""
ASSISTANT_ID = ""

USER_PROMPT = "從一數到一千"


async def main():
    """Send USER_PROMPT to the assistant, print streamed events, then delete the thread.

    Message-delta events are printed as they arrive. If the run stops in
    ``requires_action``, each pending tool call is posted to ``/pluginapi``,
    the responses (first 8000 characters each) are submitted back with
    streaming enabled, and the resulting events are printed.
    """
    # NOTE(review): verify=False turns off TLS certificate verification for
    # every request made through this client -- confirm this is intended.
    http_client = httpx.AsyncClient(verify=False)
    client = AsyncOpenAI(base_url=ASSISTANT_API, api_key=API_KEY, http_client=http_client)

    thread = await client.beta.threads.create()

    user_prompt = USER_PROMPT

    await client.beta.threads.messages.create(
        thread_id=thread.id,
        role="user",
        content=user_prompt,
    )

    stream = await client.beta.threads.runs.create(
        assistant_id=ASSISTANT_ID,
        thread_id=thread.id,
        stream=True,
    )
    # Stays empty unless the run pauses to wait for tool outputs.
    requires_action_run_id = ""
    async for event in stream:
        if event.event == "thread.message.delta":
            print(event)
        elif event.event == "thread.run.requires_action":
            requires_action_run_id = event.data.id

    if requires_action_run_id != "":
        run = await client.beta.threads.runs.retrieve(requires_action_run_id, thread_id=thread.id)
        outputs = []
        for call in run.required_action.submit_tool_outputs.tool_calls:
            print(f"call plugin {call.function.name} with args: {call.function.arguments}")
            resp = await client._client.post(
                ASSISTANT_API + "/pluginapi",
                params={"tid": thread.id, "aid": ASSISTANT_ID, "pid": call.function.name},
                headers={"Authorization": "Bearer " + API_KEY},
                json=json.loads(call.function.arguments),
                timeout=30,
            )
            result = resp.text[:8000]
            print(f"plugin {call.function.name} result {result}")
            outputs.append({"tool_call_id": call.id, "output": result})
        stream = await client.beta.threads.runs.submit_tool_outputs(
            run_id=run.id,
            stream=True,
            thread_id=thread.id,
            tool_outputs=outputs,
        )
        async for event in stream:
            print(event)

    # NOTE(review): the argument of this call was garbled in the source;
    # deleting the thread created above is the presumed intent -- confirm.
    await client.beta.threads.delete(thread_id=thread.id)


if __name__ == "__main__":
    asyncio.run(main())

Voice mode

(Coming Soon)