From bb0404445b9278a8769d1ba8e79a5bcbaa55d8eb Mon Sep 17 00:00:00 2001 From: Pilli Vamshi Date: Thu, 23 May 2024 18:19:15 +0530 Subject: [PATCH 1/3] Added Async await for openAI --- callautomation-openai-sample/main.py | 36 ++++++++++++++-------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/callautomation-openai-sample/main.py b/callautomation-openai-sample/main.py index 2d6c37a..630d2bb 100644 --- a/callautomation-openai-sample/main.py +++ b/callautomation-openai-sample/main.py @@ -21,28 +21,28 @@ ) # Your ACS resource connection string -ACS_CONNECTION_STRING = "" +ACS_CONNECTION_STRING = "endpoint=https://dacsrecordingtest.unitedstates.communication.azure.com/;accesskey=P03pgjuvw8Yo9nlRkdgjrm/TmT0yIkYt3fXowddBQ0QbOYZ6GbDaAir6om8N8sHOt7ifhJqT20aOsy4EDulO+A==" # Cognitive service endpoint -COGNITIVE_SERVICE_ENDPOINT="" +COGNITIVE_SERVICE_ENDPOINT="https://cognitive-service-waferwire.cognitiveservices.azure.com/" # Cognitive service endpoint -AZURE_OPENAI_SERVICE_KEY = "" +AZURE_OPENAI_SERVICE_KEY = "6d67cd33c89849019687cf69dfc80149" # Open AI service endpoint -AZURE_OPENAI_SERVICE_ENDPOINT="" +AZURE_OPENAI_SERVICE_ENDPOINT="https://waferwireopenai.openai.azure.com/" # Azure Open AI Deployment Model Name -AZURE_OPENAI_DEPLOYMENT_MODEL_NAME="" +AZURE_OPENAI_DEPLOYMENT_MODEL_NAME="call-automation-deployment" # Azure Open AI Deployment Model AZURE_OPENAI_DEPLOYMENT_MODEL="gpt-3.5-turbo" # Agent Phone Number -AGENT_PHONE_NUMBER="" +AGENT_PHONE_NUMBER="+918688023395" # Callback events URI to handle callback events. -CALLBACK_URI_HOST = "" +CALLBACK_URI_HOST = "https://2cp8hpr0.inc1.devtunnels.ms:8080" CALLBACK_EVENTS_URI = CALLBACK_URI_HOST + "/api/callbacks" ANSWER_PROMPT_SYSTEM_TEMPLATE = """ @@ -83,7 +83,7 @@ app = Flask(__name__) -def get_chat_completions_async(system_prompt,user_prompt): +async def get_chat_completions_async(system_prompt,user_prompt): openai.api_key = AZURE_OPENAI_SERVICE_KEY openai.api_base = AZURE_OPENAI_SERVICE_ENDPOINT # your endpoint should look like the following https://YOUR_RESOURCE_NAME.openai.azure.com/ openai.api_type = 'azure' @@ -97,7 +97,7 @@ def get_chat_completions_async(system_prompt,user_prompt): global response_content try: - response = ChatCompletion.create(model=AZURE_OPENAI_DEPLOYMENT_MODEL,deployment_id=AZURE_OPENAI_DEPLOYMENT_MODEL_NAME, messages=chat_request,max_tokens = 1000) + response = await ChatCompletion.create(model=AZURE_OPENAI_DEPLOYMENT_MODEL,deployment_id=AZURE_OPENAI_DEPLOYMENT_MODEL_NAME, messages=chat_request,max_tokens = 1000) except ex: app.logger.info("error in openai api call : %s",ex) @@ -108,12 +108,12 @@ def get_chat_completions_async(system_prompt,user_prompt): response_content="" return response_content -def get_chat_gpt_response(speech_input): - return get_chat_completions_async(ANSWER_PROMPT_SYSTEM_TEMPLATE,speech_input) +async def get_chat_gpt_response(speech_input): + return await get_chat_completions_async(ANSWER_PROMPT_SYSTEM_TEMPLATE,speech_input) def handle_recognize(replyText,callerId,call_connection_id,context=""): play_source = TextSource(text=replyText, voice_name="en-US-NancyNeural") - recognize_result=call_automation_client.get_call_connection(call_connection_id).start_recognizing_media( + recognize_result= call_automation_client.get_call_connection(call_connection_id).start_recognizing_media( input_type=RecognizeInputType.SPEECH, target_participant=PhoneNumberIdentifier(callerId), end_silence_timeout=10, @@ -132,12 +132,12 @@ def handle_hangup(call_connection_id): def detect_escalate_to_agent_intent(speech_text, logger): return has_intent_async(user_query=speech_text, intent_description="talk to agent", logger=logger) -def has_intent_async(user_query, intent_description, logger): +async def has_intent_async(user_query, intent_description, logger): is_match=False system_prompt = "You are a helpful assistant" combined_prompt = f"In 1 word: does {user_query} have a similar meaning as {intent_description}?" #combined_prompt = base_user_prompt.format(user_query, intent_description) - response = get_chat_completions_async(system_prompt, combined_prompt) + response = await get_chat_completions_async(system_prompt, combined_prompt) if "yes" in response.lower(): is_match =True logger.info(f"OpenAI results: is_match={is_match}, customer_query='{user_query}', intent_description='{intent_description}'") @@ -183,7 +183,7 @@ def incoming_call_handler(): return Response(status=200) @app.route("/api/callbacks/", methods=["POST"]) -def handle_callback(contextId): +async def handle_callback(contextId): try: global caller_id , call_connection_id app.logger.info("Request Json: %s", request.json) @@ -198,7 +198,7 @@ def handle_callback(contextId): app.logger.info("call connected : data=%s", event.data) if event.type == "Microsoft.Communication.CallConnected": - handle_recognize(HELLO_PROMPT, + handle_recognize(HELLO_PROMPT, caller_id,call_connection_id, context="GetFreeFormText") @@ -211,7 +211,7 @@ def handle_callback(contextId): if detect_escalate_to_agent_intent(speech_text=speech_text,logger=app.logger): handle_play(call_connection_id=call_connection_id,text_to_play=END_CALL_PHRASE_TO_CONNECT_AGENT,context=CONNECT_AGENT_CONTEXT) else: - chat_gpt_response= get_chat_gpt_response(speech_text) + chat_gpt_response= await get_chat_gpt_response(speech_text) app.logger.info(f"Chat GPT response:{chat_gpt_response}") regex = re.compile(CHAT_RESPONSE_EXTRACT_PATTERN) match = regex.search(chat_gpt_response) @@ -255,7 +255,7 @@ def handle_callback(contextId): else: app.logger.info(f"Initializing the Call transfer...") transfer_destination=PhoneNumberIdentifier(AGENT_PHONE_NUMBER) - call_connection_client =call_automation_client.get_call_connection(call_connection_id=call_connection_id) + call_connection_client = call_automation_client.get_call_connection(call_connection_id=call_connection_id) call_connection_client.transfer_call_to_participant(target_participant=transfer_destination) app.logger.info(f"Transfer call initiated: {context}") From 62b49b1ac771eb01133b17186906fde5cc0a7c0f Mon Sep 17 00:00:00 2001 From: Pilli Vamshi Date: Thu, 23 May 2024 18:23:12 +0530 Subject: [PATCH 2/3] removed endpoints --- callautomation-openai-sample/main.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/callautomation-openai-sample/main.py b/callautomation-openai-sample/main.py index 630d2bb..b84ac44 100644 --- a/callautomation-openai-sample/main.py +++ b/callautomation-openai-sample/main.py @@ -21,16 +21,16 @@ ) # Your ACS resource connection string -ACS_CONNECTION_STRING = "endpoint=https://dacsrecordingtest.unitedstates.communication.azure.com/;accesskey=P03pgjuvw8Yo9nlRkdgjrm/TmT0yIkYt3fXowddBQ0QbOYZ6GbDaAir6om8N8sHOt7ifhJqT20aOsy4EDulO+A==" +ACS_CONNECTION_STRING = "" # Cognitive service endpoint -COGNITIVE_SERVICE_ENDPOINT="https://cognitive-service-waferwire.cognitiveservices.azure.com/" +COGNITIVE_SERVICE_ENDPOINT="" # Cognitive service endpoint -AZURE_OPENAI_SERVICE_KEY = "6d67cd33c89849019687cf69dfc80149" +AZURE_OPENAI_SERVICE_KEY = "" # Open AI service endpoint -AZURE_OPENAI_SERVICE_ENDPOINT="https://waferwireopenai.openai.azure.com/" +AZURE_OPENAI_SERVICE_ENDPOINT="" # Azure Open AI Deployment Model Name AZURE_OPENAI_DEPLOYMENT_MODEL_NAME="call-automation-deployment" @@ -39,10 +39,10 @@ AZURE_OPENAI_DEPLOYMENT_MODEL="gpt-3.5-turbo" # Agent Phone Number -AGENT_PHONE_NUMBER="+918688023395" +AGENT_PHONE_NUMBER="" # Callback events URI to handle callback events. -CALLBACK_URI_HOST = "https://2cp8hpr0.inc1.devtunnels.ms:8080" +CALLBACK_URI_HOST = "" CALLBACK_EVENTS_URI = CALLBACK_URI_HOST + "/api/callbacks" ANSWER_PROMPT_SYSTEM_TEMPLATE = """ From 3d9dcf3f2d074dfb1e3b30144a0d80f0d223715a Mon Sep 17 00:00:00 2001 From: Vinothini Dharmaraj Date: Thu, 23 May 2024 23:59:01 -0700 Subject: [PATCH 3/3] adding the async --- callautomation-openai-sample/main.py | 189 ++++++++++++------ callautomation-openai-sample/requirements.txt | 2 +- 2 files changed, 125 insertions(+), 66 deletions(-) diff --git a/callautomation-openai-sample/main.py b/callautomation-openai-sample/main.py index b84ac44..3f430f8 100644 --- a/callautomation-openai-sample/main.py +++ b/callautomation-openai-sample/main.py @@ -5,20 +5,21 @@ from azure.eventgrid import EventGridEvent, SystemEventNames import requests from flask import Flask, Response, request, json +import functools from logging import INFO import re +from aiohttp import web from azure.communication.callautomation import ( - CallAutomationClient, PhoneNumberIdentifier, RecognizeInputType, TextSource ) +from azure.communication.callautomation.aio import ( + CallAutomationClient + ) from azure.core.messaging import CloudEvent -import openai - -from openai.api_resources import ( - ChatCompletion -) +import asyncio +from openai import AsyncAzureOpenAI # Your ACS resource connection string ACS_CONNECTION_STRING = "" @@ -76,18 +77,36 @@ recording_chunks_location = [] max_retry = 2 -openai.api_key = AZURE_OPENAI_SERVICE_KEY -openai.api_base = AZURE_OPENAI_SERVICE_ENDPOINT # your endpoint should look like the following https://YOUR_RESOURCE_NAME.openai.azure.com/ -openai.api_type = 'azure' -openai.api_version = '2023-05-15' # this may change in the future - app = Flask(__name__) +""" def async_action(): + def wrapper(func): + @functools.wraps(func) + async def wrapped(*args, **kwargs): + # Some fancy foo stuff + return await func(*args, **kwargs) + return wrapped + return wrapper """ + +def async_action(f): + @functools.wraps(f) + async def wrapped(*args, **kwargs): + return await f(*args, **kwargs) + return wrapped + +openai_client = AsyncAzureOpenAI( + # This is the default and can be omitted + api_key=AZURE_OPENAI_SERVICE_KEY, + api_version="2024-02-01", + azure_endpoint=AZURE_OPENAI_SERVICE_ENDPOINT +) + + async def get_chat_completions_async(system_prompt,user_prompt): - openai.api_key = AZURE_OPENAI_SERVICE_KEY + """ openai.api_key = AZURE_OPENAI_SERVICE_KEY openai.api_base = AZURE_OPENAI_SERVICE_ENDPOINT # your endpoint should look like the following https://YOUR_RESOURCE_NAME.openai.azure.com/ openai.api_type = 'azure' - openai.api_version = '2023-05-15' # this may change in the future + openai.api_version = '2023-05-15' # this may change in the future """ # Define your chat completions request chat_request = [ @@ -95,25 +114,41 @@ async def get_chat_completions_async(system_prompt,user_prompt): {"role": "user", "content": f"In less than 200 characters: respond to this question: {user_prompt}?"} ] + app.logger.info("get_chat_completions_async") + + + """ models = openai_client.models + for model in models: + app.logger.info(model) """ global response_content + global response try: - response = await ChatCompletion.create(model=AZURE_OPENAI_DEPLOYMENT_MODEL,deployment_id=AZURE_OPENAI_DEPLOYMENT_MODEL_NAME, messages=chat_request,max_tokens = 1000) - except ex: - app.logger.info("error in openai api call : %s",ex) - + response = await openai_client.chat.completions.create( + model=AZURE_OPENAI_DEPLOYMENT_MODEL_NAME, + messages=chat_request, + max_tokens = 1000) + except Exception as ex: + app.logger.info("error in openai api call : %s", ex) + # Extract the response content if response is not None : - response_content = response['choices'][0]['message']['content'] + response_content = response.choices[0].message.content else : - response_content="" - return response_content + response_content = "" + + app.logger.info("chat gpt resonse content : %s", response_content) + return response_content + async def get_chat_gpt_response(speech_input): - return await get_chat_completions_async(ANSWER_PROMPT_SYSTEM_TEMPLATE,speech_input) + app.logger.info("get_chat_gpt_response, speech_text =%s", + speech_input) + return await get_chat_completions_async(ANSWER_PROMPT_SYSTEM_TEMPLATE,speech_input) -def handle_recognize(replyText,callerId,call_connection_id,context=""): + +async def handle_recognize(replyText,callerId,call_connection_id,context=""): play_source = TextSource(text=replyText, voice_name="en-US-NancyNeural") - recognize_result= call_automation_client.get_call_connection(call_connection_id).start_recognizing_media( + recognize_result= await call_automation_client.get_call_connection(call_connection_id).start_recognizing_media( input_type=RecognizeInputType.SPEECH, target_participant=PhoneNumberIdentifier(callerId), end_silence_timeout=10, @@ -121,21 +156,28 @@ def handle_recognize(replyText,callerId,call_connection_id,context=""): operation_context=context) app.logger.info("handle_recognize : data=%s",recognize_result) -def handle_play(call_connection_id, text_to_play, context): + +async def handle_play(call_connection_id, text_to_play, context): play_source = TextSource(text=text_to_play, voice_name= "en-US-NancyNeural") - call_automation_client.get_call_connection(call_connection_id).play_media_to_all(play_source, + await call_automation_client.get_call_connection(call_connection_id).play_media_to_all(play_source, operation_context=context) - -def handle_hangup(call_connection_id): - call_automation_client.get_call_connection(call_connection_id).hang_up(is_for_everyone=True) -def detect_escalate_to_agent_intent(speech_text, logger): - return has_intent_async(user_query=speech_text, intent_description="talk to agent", logger=logger) + +async def handle_hangup(call_connection_id): + await call_automation_client.get_call_connection(call_connection_id).hang_up(is_for_everyone=True) + + +async def detect_escalate_to_agent_intent(speech_text, logger): + return await has_intent_async(user_query=speech_text, intent_description="talk to agent", logger=logger) + async def has_intent_async(user_query, intent_description, logger): is_match=False system_prompt = "You are a helpful assistant" combined_prompt = f"In 1 word: does {user_query} have a similar meaning as {intent_description}?" + + logger.info("has_intent_async method executing") + #combined_prompt = base_user_prompt.format(user_query, intent_description) response = await get_chat_completions_async(system_prompt, combined_prompt) if "yes" in response.lower(): @@ -150,7 +192,9 @@ def get_sentiment_score(sentiment_score): return int(match.group()) if match else -1 @app.route("/api/incomingCall", methods=['POST']) -def incoming_call_handler(): +#@async_action +async def incoming_call_handler(): + #data = await request.get_json() for event_dict in request.json: event = EventGridEvent.from_dict(event_dict) app.logger.info("incoming event data --> %s", event.data) @@ -175,7 +219,7 @@ def incoming_call_handler(): app.logger.info("callback url: %s", callback_uri) - answer_call_result = call_automation_client.answer_call(incoming_call_context=incoming_call_context, + answer_call_result = await call_automation_client.answer_call(incoming_call_context=incoming_call_context, cognitive_services_endpoint=COGNITIVE_SERVICE_ENDPOINT, callback_url=callback_uri) app.logger.info("Answered call for connection id: %s", @@ -183,9 +227,11 @@ def incoming_call_handler(): return Response(status=200) @app.route("/api/callbacks/", methods=["POST"]) +#@async_action async def handle_callback(contextId): try: global caller_id , call_connection_id + #request_json = await request.get_json() app.logger.info("Request Json: %s", request.json) for event_dict in request.json: event = CloudEvent.from_dict(event_dict) @@ -198,7 +244,7 @@ async def handle_callback(contextId): app.logger.info("call connected : data=%s", event.data) if event.type == "Microsoft.Communication.CallConnected": - handle_recognize(HELLO_PROMPT, + await handle_recognize(HELLO_PROMPT, caller_id,call_connection_id, context="GetFreeFormText") @@ -208,30 +254,32 @@ async def handle_callback(contextId): app.logger.info("Recognition completed, speech_text =%s", speech_text); if speech_text is not None and len(speech_text) > 0: - if detect_escalate_to_agent_intent(speech_text=speech_text,logger=app.logger): + """ if await detect_escalate_to_agent_intent(speech_text=speech_text,logger=app.logger): handle_play(call_connection_id=call_connection_id,text_to_play=END_CALL_PHRASE_TO_CONNECT_AGENT,context=CONNECT_AGENT_CONTEXT) + else: """ + app.logger.info("sending text to opanai, speech_text =%s", + speech_text); + chat_gpt_response= await get_chat_gpt_response(speech_text) + app.logger.info(f"Chat GPT response:{chat_gpt_response}") + regex = re.compile(CHAT_RESPONSE_EXTRACT_PATTERN) + match = regex.search(chat_gpt_response) + if match: + answer = match.group(1) + sentiment_score = match.group(2).strip() + intent = match.group(3) + category = match.group(4) + app.logger.info(f"Chat GPT Answer={answer}, Sentiment Rating={sentiment_score}, Intent={intent}, Category={category}") + score=get_sentiment_score(sentiment_score) + app.logger.info(f"Score={score}") + if -1 < score < 5: + app.logger.info(f"Score is less than 5") + await handle_play(call_connection_id=call_connection_id,text_to_play=CONNECT_AGENT_PROMPT,context=CONNECT_AGENT_CONTEXT) + else: + app.logger.info(f"Score is more than 5") + await handle_recognize(answer,caller_id,call_connection_id,context="OpenAISample") else: - chat_gpt_response= await get_chat_gpt_response(speech_text) - app.logger.info(f"Chat GPT response:{chat_gpt_response}") - regex = re.compile(CHAT_RESPONSE_EXTRACT_PATTERN) - match = regex.search(chat_gpt_response) - if match: - answer = match.group(1) - sentiment_score = match.group(2).strip() - intent = match.group(3) - category = match.group(4) - app.logger.info(f"Chat GPT Answer={answer}, Sentiment Rating={sentiment_score}, Intent={intent}, Category={category}") - score=get_sentiment_score(sentiment_score) - app.logger.info(f"Score={score}") - if -1 < score < 5: - app.logger.info(f"Score is less than 5") - handle_play(call_connection_id=call_connection_id,text_to_play=CONNECT_AGENT_PROMPT,context=CONNECT_AGENT_CONTEXT) - else: - app.logger.info(f"Score is more than 5") - handle_recognize(answer,caller_id,call_connection_id,context="OpenAISample") - else: - app.logger.info("No match found") - handle_recognize(chat_gpt_response,caller_id,call_connection_id,context="OpenAISample") + app.logger.info("No match found") + await handle_recognize(chat_gpt_response,caller_id,call_connection_id,context="OpenAISample") elif event.type == "Microsoft.Communication.RecognizeFailed": resultInformation = event.data['resultInformation'] @@ -239,24 +287,24 @@ async def handle_callback(contextId): context=event.data['operationContext'] global max_retry if reasonCode == 8510 and 0 < max_retry: - handle_recognize(TIMEOUT_SILENCE_PROMPT,caller_id,call_connection_id) + await handle_recognize(TIMEOUT_SILENCE_PROMPT,caller_id,call_connection_id) max_retry -= 1 else: - handle_play(call_connection_id,GOODBYE_PROMPT, GOODBYE_CONTEXT) + await handle_play(call_connection_id,GOODBYE_PROMPT, GOODBYE_CONTEXT) elif event.type == "Microsoft.Communication.PlayCompleted": - context=event.data['operationContext'] + context=event.data['operationContext'] + app.logger.info(f"Context: " + context) if context.lower() == TRANSFER_FAILED_CONTEXT.lower() or context.lower() == GOODBYE_CONTEXT.lower() : - handle_hangup(call_connection_id) + await handle_hangup(call_connection_id) elif context.lower() == CONNECT_AGENT_CONTEXT.lower(): if not AGENT_PHONE_NUMBER or AGENT_PHONE_NUMBER.isspace(): app.logger.info(f"Agent phone number is empty") - handle_play(call_connection_id=call_connection_id,text_to_play=AGENT_PHONE_NUMBER_EMPTY_PROMPT) + await handle_play(call_connection_id=call_connection_id,text_to_play=AGENT_PHONE_NUMBER_EMPTY_PROMPT) else: app.logger.info(f"Initializing the Call transfer...") transfer_destination=PhoneNumberIdentifier(AGENT_PHONE_NUMBER) - call_connection_client = call_automation_client.get_call_connection(call_connection_id=call_connection_id) - call_connection_client.transfer_call_to_participant(target_participant=transfer_destination) + await call_automation_client.get_call_connection(call_connection_id=call_connection_id).transfer_call_to_participant(target_participant=transfer_destination) app.logger.info(f"Transfer call initiated: {context}") elif event.type == "Microsoft.Communication.CallTransferAccepted": @@ -268,15 +316,26 @@ async def handle_callback(contextId): sub_code = resultInformation['subCode'] # check for message extraction and code app.logger.info(f"Encountered error during call transfer, message=, code=, subCode={sub_code}") - handle_play(call_connection_id=call_connection_id,text_to_play=CALLTRANSFER_FAILURE_PROMPT, context=TRANSFER_FAILED_CONTEXT) + await handle_play(call_connection_id=call_connection_id,text_to_play=CALLTRANSFER_FAILURE_PROMPT, context=TRANSFER_FAILED_CONTEXT) return Response(status=200) - except Exception as ex: - app.logger.info("error in event handling") + except ex: + app.logger.info(f"error in event handling exception = {ex}") @app.route("/") def hello(): return "Hello ACS CallAutomation!..test" +async def init_app(): + """Initialize the aiohttp web application.""" + app = web.Application() + app.router.add_post('/api/incomingCall', incoming_call_handler) + app.router.add_post('/api/callbacks/', handle_callback) + return app + if __name__ == '__main__': app.logger.setLevel(INFO) app.run(port=8080) + + #loop = asyncio.get_event_loop() + #web.run_app(init_app(), host='0.0.0.0', port=5000, loop=loop) + #asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) diff --git a/callautomation-openai-sample/requirements.txt b/callautomation-openai-sample/requirements.txt index 94dc155..d7cfa1d 100644 --- a/callautomation-openai-sample/requirements.txt +++ b/callautomation-openai-sample/requirements.txt @@ -1,4 +1,4 @@ Flask>=2.3.2 azure-eventgrid==4.11.0 azure-communication-callautomation==1.1.0 -openai==0.28.1 \ No newline at end of file +openai==1.30.2 \ No newline at end of file