From 4790e4c9a8472dd9388cb9c5fbfc56c9ab9146cb Mon Sep 17 00:00:00 2001 From: Yamini Pradhan Date: Tue, 15 Apr 2025 02:10:11 -0400 Subject: [PATCH 1/7] adds callautomation mcs sample --- callautomation-live-transcription/main.py | 477 +++++------------- .../requirements.txt | 8 +- callautomation-mcs-sample/botActivity.py | 6 + callautomation-mcs-sample/callContext.py | 6 + callautomation-mcs-sample/conversation.py | 9 + callautomation-mcs-sample/main.py | 246 +++++++++ callautomation-mcs-sample/requirements.txt | 4 + 7 files changed, 412 insertions(+), 344 deletions(-) create mode 100644 callautomation-mcs-sample/botActivity.py create mode 100644 callautomation-mcs-sample/callContext.py create mode 100644 callautomation-mcs-sample/conversation.py create mode 100644 callautomation-mcs-sample/main.py create mode 100644 callautomation-mcs-sample/requirements.txt diff --git a/callautomation-live-transcription/main.py b/callautomation-live-transcription/main.py index 1fd7c5b..743e953 100644 --- a/callautomation-live-transcription/main.py +++ b/callautomation-live-transcription/main.py @@ -1,372 +1,167 @@ - -import ast -import uuid -import os -from pathlib import Path -from urllib.parse import urlencode, urljoin, urlparse, urlunparse -from azure.eventgrid import EventGridEvent, SystemEventNames -import requests -from quart import Quart, Response, request, json, redirect, websocket, render_template -import json -from logging import INFO -import re +from quart import Quart, request, Response, websocket from azure.communication.callautomation import ( - PhoneNumberIdentifier, - RecognizeInputType, - TextSource, - # TranscriptionConfiguration, - TranscriptionTransportType, - ServerCallLocator, - TranscriptionOptions, - RecordingContent, - RecordingChannel, - RecordingFormat - ) -from azure.communication.callautomation.aio import ( - CallAutomationClient - ) + CallAutomationClient, SsmlSource, PlayToAllOptions, TranscriptionOptions, TranscriptionTransportType +) from azure.core.messaging import CloudEvent -import time +from azure.eventgrid import EventGridEvent, SystemEventNames import asyncio import json -from azure.communication.callautomation._shared.models import identifier_from_raw_id -from transcriptionDataHandler import process_websocket_message_async -# import openai +import uuid +import re +from collections import defaultdict +from httpx import AsyncClient -# from openai.api_resources import ( -# ChatCompletion -# ) +# Initialize Quart app +app = Quart(__name__) -# Your ACS resource connection string +# Configuration ACS_CONNECTION_STRING = "" +COGNITIVE_SERVICE_ENDPOINT = "" +AGENT_PHONE_NUMBER = "" +DIRECT_LINE_SECRET = "" +BASE_URI = "".rstrip("/") +BASE_WSS_URI = BASE_URI.split("https://")[1] -# Cognitive service endpoint -COGNITIVE_SERVICE_ENDPOINT="" - -# Acs Phone Number -ACS_PHONE_NUMBER="" - -# Transcription Locale -LOCALE="" - -# Agent Phone Number -AGENT_PHONE_NUMBER="" - -# Callback events URI to handle callback events. -CALLBACK_URI_HOST = "" -CALLBACK_EVENTS_URI = CALLBACK_URI_HOST + "/api/callbacks" - -HELP_IVR_PROMPT = "Welcome to the Contoso Utilities. To access your account, we need to verify your identity. Please enter your date of birth in the format DDMMYYYY using the keypad on your phone. Once we’ve validated your identity we will connect you to the next available agent. Please note this call will be recorded!" -ADD_AGENT_PROMPT = "Thank you for verifying your identity. We are now connecting you to the next available agent. Please hold the line and we will be with you shortly. Thank you for your patience." -INCORRECT_DOB_PROMPT = "Sorry, we were unable to verify your identity based on the date of birth you entered. Please try again. Remember to enter your date of birth in the format DDMMYYYY using the keypad on your phone. Once you've entered your date of birth, press the pound key. Thank you!" -ADD_PARTICIPANT_FAILURE_PROMPT = "We're sorry, we were unable to connect you to an agent at this time, we will get the next available agent to call you back as soon as possible." -GOODBYE_PROMPT = "Thank you for calling Contoso Utilities. We hope we were able to assist you today. Goodbye" -TIMEOUT_SILENCE_PROMPT = "I’m sorry, I didn’t receive any input. Please type your date of birth in the format of DDMMYYYY." -GOODBYE_CONTEXT = "Goodbye" -ADD_AGENT_CONTEXT = "AddAgent" -INCORRECT_DOB_CONTEXT = "IncorrectDob" -ADD_PARTICIPANT_FAILURE_CONTEXT = "FailedToAddParticipant" -INCOMING_CALL_CONTEXT = "incomingCallContext" - -DOB_REGEX = r"^(0[1-9]|[12][0-9]|3[01])(0[1-9]|1[012])[12][0-9]{3}$" - +# Initialize Call Automation Client call_automation_client = CallAutomationClient.from_connection_string(ACS_CONNECTION_STRING) -recording_id = None -recording_chunks_location = [] -recording_callback_url = None -max_retry = 2 -words_to_numbers = { - 'one': 1, - 'two': 2, - 'three': 3, - 'four': 4, - 'five': 5, - 'six': 6, - 'seven': 7, - 'eight': 8, - 'nine': 9, - 'zero': 0 - } +# HTTP client for Direct Line +http_client = AsyncClient(headers={"Authorization": f"Bearer {DIRECT_LINE_SECRET}"}) -TEMPLATE_FILES_PATH = "template" -app = Quart(__name__, - template_folder=TEMPLATE_FILES_PATH) +# Store call contexts +call_store = defaultdict(dict) -async def handle_recognize(text_to_play,caller_id,call_connection_id,context=""): - play_source = TextSource(text=text_to_play, voice_name="en-US-NancyNeural") - connection_client = call_automation_client.get_call_connection(call_connection_id) - try: - recognize_result = await connection_client.start_recognizing_media( - dtmf_max_tones_to_collect=8, - input_type=RecognizeInputType.DTMF, - target_participant=PhoneNumberIdentifier(caller_id), - end_silence_timeout=15, - dtmf_inter_tone_timeout=5, - play_prompt=play_source, - operation_context=context) - app.logger.info("handle_recognize : data=%s",recognize_result) - except Exception as ex: - app.logger.info("Error in recognize: %s", ex) - -async def handle_play(call_connection_id, text_to_play, context): - play_source = TextSource(text=text_to_play, voice_name= "en-US-NancyNeural") - await call_automation_client.get_call_connection(call_connection_id).play_media_to_all(play_source,operation_context=context) +@app.route("/", methods=["GET"]) +async def home(): + return "Hello ACS CallAutomation - MCS Sample!" -async def handle_hangup(call_connection_id): - await call_automation_client.get_call_connection(call_connection_id).hang_up(is_for_everyone=True) +@app.route("/api/incomingCall", methods=["POST"]) +async def incoming_call(): + event_grid_events = await request.json + for event_grid_event in event_grid_events: + event = EventGridEvent.from_dict(event_grid_event) + if event.event_type == SystemEventNames.EventGridSubscriptionValidationEventName: + validation_code = event.data["validationCode"] + return Response(response=json.dumps({"validationResponse": validation_code}), status=200) -@app.route("/api/incomingCall", methods=['POST']) -async def incoming_call_handler(): - app.logger.info("Received incoming call event.") - try: - for event_dict in await request.json: - event = EventGridEvent.from_dict(event_dict) - app.logger.info("incoming event data --> %s", event.data) - if event.event_type == SystemEventNames.EventGridSubscriptionValidationEventName: - app.logger.info("Validating subscription") - validation_code = event.data['validationCode'] - return Response(response=json.dumps({"validationResponse": validation_code}), status=200) + incoming_call_context = event.data["incomingCallContext"] + callback_uri = f"{BASE_URI}/api/calls/{uuid.uuid4()}" - if event.event_type == "Microsoft.Communication.IncomingCall": - app.logger.info("Incoming call received: data=%s", event.data) - if event.data['from']['kind'] =="phoneNumber": - caller_id = event.data['from']["phoneNumber"]["value"] - else : - caller_id = event.data['from']['rawId'] - app.logger.info("incoming call handler caller id: %s", - caller_id) - incoming_call_context = event.data['incomingCallContext'] - guid = uuid.uuid4() - callback_uri = f"{CALLBACK_EVENTS_URI}/{guid}?callerId={caller_id}" - websocket_url = urlunparse(("wss", urlparse(CALLBACK_URI_HOST).netloc, "/ws", "", "", "")) - global recording_callback_url - recording_callback_url = callback_uri - transcription_config = TranscriptionOptions( - transport_url=websocket_url, - transport_type=TranscriptionTransportType.WEBSOCKET, - locale=LOCALE, - start_transcription=True - ) + transcription_options = TranscriptionOptions( + transport_url=f"wss://{BASE_WSS_URI}/ws", + locale="en-US", + start_transcription=True, + transport_type=TranscriptionTransportType.WEBSOCKET + ) - try: - answer_call_result = await call_automation_client.answer_call( - incoming_call_context=incoming_call_context, - transcription=transcription_config, - cognitive_services_endpoint=COGNITIVE_SERVICE_ENDPOINT, - callback_url=callback_uri - ) - app.logger.info(f"Call answered, connection ID: {answer_call_result.call_connection_id}") - except Exception as e: - app.logger.error(f"Failed to answer call: {e}") - return Response(status=500) - return Response(status=200) - except Exception as ex: - app.logger.error(f"Error handling incoming call: {ex}") - return Response(status=500) - -@app.route("/api/callbacks/", methods=["POST"]) -async def handle_callback(contextId): - try: - global caller_id , call_connection_id - # app.logger.info("Request Json: %s", request.json) - for event_dict in await request.json: - event = CloudEvent.from_dict(event_dict) - call_connection_id = event.data['callConnectionId'] + try: + answer_call_result = await call_automation_client.answer_call( + incoming_call_context=incoming_call_context, + callback_url=callback_uri, + cognitive_services_endpoint=COGNITIVE_SERVICE_ENDPOINT, + transcription=transcription_options + ) + correlation_id = answer_call_result.call_connection_properties.correlation_id + if correlation_id: + call_store[correlation_id] = {"correlation_id": correlation_id} + except Exception as ex: + app.logger.error(f"Error answering call: {ex}") + return Response(status=200) - app.logger.info("%s event received for call connection id: %s, correlation id: %s", - event.type, call_connection_id, event.data["correlationId"]) - caller_id = request.args.get("callerId").strip() - if "+" not in caller_id: - caller_id="+".strip()+caller_id.strip() +@app.route("/api/calls/", methods=["POST"]) +async def call_events(context_id): + cloud_events = await request.json + for cloud_event in cloud_events: + event = CloudEvent.from_dict(cloud_event) + call_connection = call_automation_client.get_call_connection(event.data["callConnectionId"]) + call_media = call_connection.get_call_media() + correlation_id = event.data["correlationId"] - app.logger.info("call connected : data=%s", event.data) - if event.type == "Microsoft.Communication.CallConnected": - recording_result = await call_automation_client.start_recording( - server_call_id=event.data["serverCallId"], - recording_content_type=RecordingContent.AUDIO_VIDEO, - recording_channel_type=RecordingChannel.MIXED, - recording_format_type=RecordingFormat.MP4, - recording_state_callback_url=recording_callback_url, - pause_on_start=True - ) - global recording_id - recording_id=recording_result.recording_id - global call_properties - call_properties = await call_automation_client.get_call_connection(call_connection_id).get_call_properties() - app.logger.info("Transcription subscription--->=%s", call_properties.transcription_subscription) - - elif event.type == "Microsoft.Communication.PlayStarted": - app.logger.info("Received PlayStarted event.") - elif event.type == "Microsoft.Communication.PlayCompleted": - context=event.data['operationContext'] - app.logger.info("Play completed: context=%s", event.data['operationContext']) - if context == ADD_AGENT_CONTEXT: - app.logger.info("Add agent to the call: %s", ADD_AGENT_CONTEXT) - #Add agent - target = PhoneNumberIdentifier(AGENT_PHONE_NUMBER) - source_caller_id_number = PhoneNumberIdentifier(ACS_PHONE_NUMBER) - app.logger.info("source_caller_id_number: %s", source_caller_id_number) - call_connection = call_automation_client.get_call_connection(call_connection_id) - add_participant_result= await call_connection.add_participant(target_participant=target, - source_caller_id_number=source_caller_id_number, - operation_context=None, - invitation_timeout=15) - app.logger.info("Add agent to the call: %s", add_participant_result.invitation_id) - elif context == GOODBYE_CONTEXT or context == ADD_PARTICIPANT_FAILURE_CONTEXT: - await stop_transcription_and_recording(call_connection_id, recording_id=recording_id) - await handle_hangup(call_connection_id=call_connection_id) - elif event.type == "Microsoft.Communication.RecognizeCompleted": - app.logger.info("Recognize completed: data=%s", event.data) - if event.data['recognitionType'] == "dtmf": - dtmf_tones = event.data['dtmfResult']['tones']; - app.logger.info("Recognition completed, dtmf tones =%s", dtmf_tones) - global words_to_numbers - numbers = "".join(str(words_to_numbers [x]) for x in dtmf_tones) - regex = re.compile(DOB_REGEX) - match = regex.search(numbers) - if match: - await resume_transcription_and_recording(call_connection_id, recording_id) - else: - await handle_recognize(INCORRECT_DOB_PROMPT, caller_id, call_connection_id, INCORRECT_DOB_CONTEXT) - elif event.type == "Microsoft.Communication.RecognizeFailed": - resultInformation = event.data['resultInformation'] - app.logger.info("Recognize failed event received: message=%s, sub code=%s", resultInformation['message'],resultInformation['subCode']) - reasonCode = resultInformation['subCode'] - context=event.data['operationContext'] - global max_retry - if reasonCode == 8510 and 0 < max_retry: - await handle_recognize(TIMEOUT_SILENCE_PROMPT,caller_id,call_connection_id, context="retryContext") - max_retry -= 1 - else: - await handle_play(call_connection_id=call_connection_id,text_to_play=GOODBYE_PROMPT, context=GOODBYE_CONTEXT) - elif event.type == "Microsoft.Communication.AddParticipantFailed": - resultInformation = event.data['resultInformation'] - app.logger.info("Received Add Participants Failed message=%s, sub code=%s",resultInformation['message'],resultInformation['subCode']) - await handle_play(call_connection_id=call_connection_id,text_to_play=ADD_PARTICIPANT_FAILURE_PROMPT, context=ADD_PARTICIPANT_FAILURE_CONTEXT) - elif event.type == "Microsoft.Communication.RecordingStateChanged": - app.logger.info("Received RecordingStateChanged event.") - app.logger.info(event.data['state']) - elif event.type == "Microsoft.Communication.TranscriptionStarted": - app.logger.info("Received TranscriptionStarted event.") - operation_context = None - if 'operationContext' in event.data: - operation_context = event.data['operationContext'] - - if operation_context is None: - await call_automation_client.get_call_connection(event.data['callConnectionId']).stop_transcription(operation_context="nextRecognizeContext") - elif operation_context is not None and operation_context == 'StartTranscriptionContext': - await handle_play(event.data['callConnectionId'], ADD_AGENT_PROMPT, ADD_AGENT_CONTEXT) - - elif event.type == "Microsoft.Communication.TranscriptionStopped": - app.logger.info("Received TranscriptionStopped event.") - operation_context = None - if 'operationContext' in event.data: - operation_context = event.data['operationContext'] + if event.type == "Microsoft.Communication.CallConnected": + conversation = await start_conversation() + conversation_id = conversation["conversationId"] + call_store[correlation_id]["conversation_id"] = conversation_id - if operation_context is not None and operation_context == 'nextRecognizeContext': - await handle_recognize(HELP_IVR_PROMPT,caller_id,call_connection_id,context="hellocontext") - - elif event.type == "Microsoft.Communication.TranscriptionUpdated": - app.logger.info("Received TranscriptionUpdated event.") - transcriptionUpdate = event.data['transcriptionUpdate'] - app.logger.info(transcriptionUpdate["transcriptionStatus"]) - app.logger.info(transcriptionUpdate["transcriptionStatusDetails"]) - elif event.type == "Microsoft.Communication.TranscriptionFailed": - app.logger.info("Received TranscriptionFailed event.") - resultInformation = event.data['resultInformation'] - app.logger.info("Encountered error during Transcription, message=%s, code=%s, subCode=%s", - resultInformation['message'], - resultInformation['code'], - resultInformation['subCode']) - return Response(status=200) - except Exception as ex: - app.logger.info("error in event handling") + asyncio.create_task(listen_to_bot_websocket(conversation["streamUrl"], call_connection)) + await send_message(conversation_id, "Hi") -@app.route('/api/recordingFileStatus', methods=['POST']) -async def recording_file_status(): - try: - for event_dict in await request.json: - event = EventGridEvent.from_dict(event_dict) - if event.event_type == SystemEventNames.EventGridSubscriptionValidationEventName: - code = event.data['validationCode'] - if code: - data = {"validationResponse": code} - app.logger.info("Successfully Subscribed EventGrid.ValidationEvent --> " + str(data)) - return Response(response=str(data), status=200) + elif event.type == "Microsoft.Communication.PlayFailed": + app.logger.info("Play Failed") - if event.event_type == SystemEventNames.AcsRecordingFileStatusUpdatedEventName: - acs_recording_file_status_updated_event_data = event.data - acs_recording_chunk_info_properties = acs_recording_file_status_updated_event_data['recordingStorageInfo']['recordingChunks'][0] - app.logger.info("acsRecordingChunkInfoProperties response data --> " + str(acs_recording_chunk_info_properties)) - global content_location - content_location = acs_recording_chunk_info_properties['contentLocation'] - return Response(response="Ok") - - except Exception as ex: - app.logger.error( "Failed to get recording file") - return Response(response='Failed to get recording file', status=400) + elif event.type == "Microsoft.Communication.PlayCompleted": + app.logger.info("Play Completed") -@app.route('/download') -async def download_recording(): - try: - app.logger.info("Content location : %s", content_location) - downloads_folder = str(Path.home() / "Downloads") - file_path = os.path.join(downloads_folder, "Recording_File.mp4") + elif event.type == "Microsoft.Communication.TranscriptionStarted": + app.logger.info(f"Transcription started: {event.data['operationContext']}") - recording_data = await call_automation_client.download_recording(content_location) - with open(file_path, "wb") as binary_file: - binary_file.write(await recording_data.read()) - return redirect("/") - except Exception as ex: - app.logger.info("Failed to download recording --> " + str(ex)) - return Response(text=str(ex), status=500) - -async def initiate_transcription(call_connection_id): - app.logger.info("initiate_transcription is called %s", call_connection_id) - call_connection = call_automation_client.get_call_connection(call_connection_id) - await call_connection.start_transcription(locale=LOCALE, operation_context="StartTranscriptionContext") - app.logger.info("Starting the transcription") - -async def stop_transcription_and_recording(call_connection_id, recording_id): - app.logger.info("stop_transcription_and_recording method triggered.") - call_properties = await call_automation_client.get_call_connection(call_connection_id).get_call_properties() - recording_properties = await call_automation_client.get_recording_properties(recording_id) - if call_properties.transcription_subscription.state == 'active': - await call_automation_client.get_call_connection(call_connection_id).stop_transcription() - if recording_properties.recording_state == "active": - await call_automation_client.stop_recording(recording_id=recording_id) + elif event.type == "Microsoft.Communication.TranscriptionStopped": + app.logger.info(f"Transcription stopped: {event.data['operationContext']}") -async def resume_transcription_and_recording(call_connection_id, recording_id): - await initiate_transcription(call_connection_id) - app.logger.info("Transcription re initiated.") + elif event.type == "Microsoft.Communication.CallDisconnected": + call_store.pop(correlation_id, None) + return Response(status=200) - await call_automation_client.resume_recording(recording_id) - app.logger.info(f"Recording resumed. RecordingId: {recording_id}") - - # WebSocket. @app.websocket('/ws') async def ws(): - print("Client connected to WebSocket") + correlation_id = websocket.headers.get("x-ms-call-correlation-id") + call_connection_id = websocket.headers.get("x-ms-call-connection-id") + call_media = call_automation_client.get_call_connection(call_connection_id).get_call_media() + conversation_id = call_store[correlation_id]["conversation_id"] + try: while True: - try: - # Receive data from the client - message = await websocket.receive() - await process_websocket_message_async(message) - except Exception as e: - print(f"Error while receiving message: {e}") - break # Close connection on error - except Exception as e: - print(f"WebSocket connection closed: {e}") - finally: - # Any cleanup or final logs can go here - print("WebSocket connection closed") + message = await websocket.receive() + if "Intermediate" in message: + await call_media.cancel_all_media_operations() + else: + transcription_data = json.loads(message) + if transcription_data.get("resultState") == "Final": + await send_message(conversation_id, transcription_data["text"]) + except Exception as ex: + app.logger.error(f"WebSocket error: {ex}") + +async def start_conversation(): + response = await http_client.post("https://directline.botframework.com/v3/directline/conversations") + response.raise_for_status() + return response.json() + +async def listen_to_bot_websocket(stream_url, call_connection): + async with AsyncClient() as ws_client: + async with ws_client.stream("GET", stream_url) as websocket: + async for message in websocket.aiter_text(): + bot_activity = extract_latest_bot_activity(message) + if bot_activity["type"] == "message": + await play_to_all(call_connection.get_call_media(), bot_activity["text"]) + elif bot_activity["type"] == "endOfConversation": + await call_connection.hang_up(is_for_everyone=True) + +async def send_message(conversation_id, message): + payload = { + "type": "message", + "from": {"id": "user1"}, + "text": message + } + await http_client.post( + f"https://directline.botframework.com/v3/directline/conversations/{conversation_id}/activities", + json=payload + ) + +def extract_latest_bot_activity(raw_message): + try: + activities = json.loads(raw_message).get("activities", []) + for activity in reversed(activities): + if activity["type"] == "message" and activity["from"]["id"] != "user1": + return {"type": "message", "text": activity.get("text", "")} + elif activity["type"] == "endOfConversation": + return {"type": "endOfConversation"} + except Exception as ex: + app.logger.error(f"Error parsing bot activity: {ex}") + return {"type": "error", "text": "Something went wrong"} -@app.route('/') -async def index_handler(): - return await render_template("index.html") +async def play_to_all(call_media, message): + ssml_source = SsmlSource(f"{message}") + play_options = PlayToAllOptions(ssml_source) + await call_media.play_to_all(play_options) -if __name__ == '__main__': - app.logger.setLevel(INFO) - app.run(port=8080) +if __name__ == "__main__": + app.run(port=8080) \ No newline at end of file diff --git a/callautomation-live-transcription/requirements.txt b/callautomation-live-transcription/requirements.txt index 0f35282..5ed774f 100644 --- a/callautomation-live-transcription/requirements.txt +++ b/callautomation-live-transcription/requirements.txt @@ -1,4 +1,6 @@ -Quart>=0.19.6 -azure-eventgrid==4.11.0 -aiohttp>= 3.11.9 +quart==0.19.6 azure-communication-callautomation==1.3.0b1 +azure-eventgrid==4.11.0 +azure-core==1.28.0 +requests==2.31.0 +websockets==11.0.3 \ No newline at end of file diff --git a/callautomation-mcs-sample/botActivity.py b/callautomation-mcs-sample/botActivity.py new file mode 100644 index 0000000..a96e248 --- /dev/null +++ b/callautomation-mcs-sample/botActivity.py @@ -0,0 +1,6 @@ +from typing import Optional + +class BotActivity: + def __init__(self, type: Optional[str] = None, text: Optional[str] = None): + self.type = type + self.text = text diff --git a/callautomation-mcs-sample/callContext.py b/callautomation-mcs-sample/callContext.py new file mode 100644 index 0000000..76005a0 --- /dev/null +++ b/callautomation-mcs-sample/callContext.py @@ -0,0 +1,6 @@ +from typing import Optional + +class CallContext: + def __init__(self, correlation_id: Optional[str] = None, conversation_id: Optional[str] = None): + self.correlation_id = correlation_id + self.conversation_id = conversation_id diff --git a/callautomation-mcs-sample/conversation.py b/callautomation-mcs-sample/conversation.py new file mode 100644 index 0000000..a06a781 --- /dev/null +++ b/callautomation-mcs-sample/conversation.py @@ -0,0 +1,9 @@ +from typing import Optional + +class Conversation: + def __init__(self, conversation_id: Optional[str] = None, token: Optional[str] = None, + stream_url: Optional[str] = None, reference_grammar_id: Optional[str] = None): + self.conversation_id = conversation_id + self.token = token + self.stream_url = stream_url + self.reference_grammar_id = reference_grammar_id diff --git a/callautomation-mcs-sample/main.py b/callautomation-mcs-sample/main.py new file mode 100644 index 0000000..9fac800 --- /dev/null +++ b/callautomation-mcs-sample/main.py @@ -0,0 +1,246 @@ +from quart import Quart, request, Response, websocket +from azure.communication.callautomation import CallAutomationClient, SsmlSource +import os +import asyncio +import requests +from collections import defaultdict +from azure.eventgrid import EventGridEvent, SystemEventNames +from azure.core.messaging import CloudEvent +import websockets + +# Flask app setup +app = Quart(__name__) + +# Load configuration from environment variables +ACS_CONNECTION_STRING = os.getenv("ACS_CONNECTION_STRING") +COGNITIVE_SERVICE_ENDPOINT = os.getenv("COGNITIVE_SERVICE_ENDPOINT") +DIRECT_LINE_SECRET = os.getenv("DIRECT_LINE_SECRET") +BASE_URI = os.getenv("BASE_URI", "").rstrip("/") +BASE_WSS_URI = BASE_URI.split("https://")[1] if BASE_URI else None + +# Initialize Call Automation Client +call_automation_client = CallAutomationClient.from_connection_string(ACS_CONNECTION_STRING) + +# Store call contexts +call_store = defaultdict(dict) + +# HTTP client for Direct Line +headers = {"Authorization": f"Bearer {DIRECT_LINE_SECRET}"} +http_client = requests.Session() +http_client.headers.update(headers) + + +@app.route("/", methods=["GET"]) +async def home(): + app.logger.info(f"Received events") + return "Hello ACS CallAutomation - MCS Sample!" + + +@app.route("/api/incomingCall", methods=["POST"]) +async def incoming_call(): + event_grid_events = await request.json + app.logger.info(f"Received events: {event_grid_events}") + for event_grid_event in event_grid_events: + event = EventGridEvent.from_dict(event_grid_event) + if event.event_type == SystemEventNames.EventGridSubscriptionValidationEventName: + validation_code = event.data["validationCode"] + # Respond to validation event + return Response(response=json.dumps({"validationResponse": validation_code}), status=200) + + incoming_call_context = event.data["incomingCallContext"] + callback_uri = f"{BASE_URI}/api/calls/{uuid.uuid4()}" + + try: + answer_call_result = await call_automation_client.answer_call( + incoming_call_context=incoming_call_context, + callback_url=callback_uri, + cognitive_services_endpoint=COGNITIVE_SERVICE_ENDPOINT + ) + correlation_id = answer_call_result.call_connection_properties.correlation_id + if correlation_id: + call_store[correlation_id] = {"correlation_id": correlation_id} + except Exception as ex: + app.logger.error(f"Error answering call: {ex}") + return Response(status=200) + + +@app.route("/api/calls/", methods=["POST"]) +async def call_events(context_id): + cloud_events = await request.json + for cloud_event in cloud_events: + event = CloudEvent.from_dict(cloud_event) + call_connection = call_automation_client.get_call_connection(event.data["callConnectionId"]) + call_media = call_connection.get_call_media() + correlation_id = event.data["correlationId"] + + if event.type == "Microsoft.Communication.CallConnected": + conversation = await start_conversation() + conversation_id = conversation["conversationId"] + call_store[correlation_id]["conversation_id"] = conversation_id + + asyncio.create_task(listen_to_bot_websocket(conversation["streamUrl"], call_connection)) + await send_message(conversation_id, "Hi") + + elif event.type == "Microsoft.Communication.PlayFailed": + app.logger.info("Play Failed") + + elif event.type == "Microsoft.Communication.PlayCompleted": + app.logger.info("Play Completed") + + elif event.type == "Microsoft.Communication.CallDisconnected": + call_store.pop(correlation_id, None) + return Response(status=200) + +@app.websocket('/ws') +async def websocket_handler(): + # Extract headers + correlation_id = websocket.headers.get("x-ms-call-correlation-id") + call_connection_id = websocket.headers.get("x-ms-call-connection-id") + + # Get call media + call_media = None + if call_connection_id: + call_connection = call_automation_client.get_call_connection(call_connection_id) + call_media = call_connection.get_call_media() + + app.logger.info(f"****************************** Correlation ID: {correlation_id}") + app.logger.info(f"****************************** Call Connection ID: {call_connection_id}") + + conversation_id = call_store.get(correlation_id, {}).get("conversation_id") + + try: + partial_data = "" + + while True: + # Receive data from WebSocket + data = await websocket.receive() + if not data: + break + + try: + # Handle partial data + partial_data += data + if data.endswith("\n"): # Assuming messages are newline-delimited + message = partial_data.strip() + partial_data = "" + + app.logger.info(f"\n[{asyncio.get_event_loop().time()}] {message}") + + if "Intermediate" in message: + app.logger.info("\nCanceling prompt") + if call_media: + await call_media.cancel_all_media_operations() + else: + # Parse transcription data + transcription_data = json.loads(message) + if transcription_data.get("type") == "TranscriptionData": + text = transcription_data.get("text", "") + app.logger.info(f"\n[{asyncio.get_event_loop().time()}] {text}") + + if transcription_data.get("resultState") == "Final": + if not conversation_id: + conversation_id = call_store.get(correlation_id, {}).get("conversation_id") + + if conversation_id: + await send_message_to_bot(conversation_id, text) + else: + app.logger.info("\nConversation ID is null") + except Exception as ex: + app.logger.info(f"Exception while processing WebSocket message: {ex}") + except Exception as ex: + app.logger.info(f"WebSocket error: {ex}") + finally: + app.logger.info("WebSocket connection closed") + +async def start_conversation(): + response = http_client.post("https://directline.botframework.com/v3/directline/conversations") + response.raise_for_status() + return response.json() + + +async def send_message_to_bot(conversation_id, message): + payload = { + "type": "message", + "from": {"id": "user1"}, + "text": message, + } + response = http_client.post( + f"https://directline.botframework.com/v3/directline/conversations/{conversation_id}/activities", + json=payload, + ) + response.raise_for_status() + +def extract_latest_bot_activity(raw_message): + try: + activities = json.loads(raw_message).get("activities", []) + for activity in reversed(activities): + if activity["type"] == "message" and activity["from"]["id"] != "user1": + return {"type": "message", "text": remove_references(activity.get("text", ""))} + elif activity["type"] == "endOfConversation": + return {"type": "endOfConversation"} + except Exception as ex: + app.logger.info(f"Error parsing bot activity: {ex}") + return {"type": "error", "text": "Something went wrong"} + + +def remove_references(input_text): + # Remove inline references like [1], [2], etc. + without_inline_refs = re.sub(r"\[\d+\]", "", input_text) + + # Remove reference list at the end (lines starting with [number]:) + without_ref_list = re.sub(r"\n\[\d+\]:.*(\n|$)", "", without_inline_refs) + + return without_ref_list.strip() + +async def listen_to_bot_websocket(stream_url, call_connection): + if not stream_url: + app.logger.info("WebSocket streaming is not enabled for this MCS bot.") + return + + async with websockets.connect(stream_url) as ws: + try: + while True: + message = await ws.recv() + bot_activity = extract_latest_bot_activity(message) + + if bot_activity["type"] == "message": + app.logger.info(f"\nPlaying Bot Response: {bot_activity['text']}\n") + await play_to_all(call_connection.get_call_media(), bot_activity["text"]) + elif bot_activity["type"] == "endOfConversation": + app.logger.info("\nEnd of Conversation\n") + await call_connection.hang_up() + break + except Exception as ex: + app.logger.info(f"WebSocket error: {ex}") + +def play_to_all(correlation_id, message): + ssml = f""" + + {message} + + """ + play_source = SsmlSource(ssml) + call_media = call_automation_client.get_call_connection(correlation_id).get_call_media() + + # Use the play_to_all method directly + call_media.play_to_all( + play_source=play_source, + operation_context="Testing" # Optional: Add context for tracking + ) + + +def extract_latest_bot_activity(raw_message): + try: + activities = json.loads(raw_message).get("activities", []) + for activity in reversed(activities): + if activity["type"] == "message" and activity["from"]["id"] != "user1": + return {"type": "message", "text": activity.get("text", "")} + elif activity["type"] == "endOfConversation": + return {"type": "endOfConversation"} + except Exception as ex: + app.logger.info(f"Error parsing bot activity: {ex}") + return {"type": "error", "text": "Something went wrong"} + + +if __name__ == "__main__": + app.run(host="0.0.0.0", port=49412, debug=True) diff --git a/callautomation-mcs-sample/requirements.txt b/callautomation-mcs-sample/requirements.txt new file mode 100644 index 0000000..3ff0d72 --- /dev/null +++ b/callautomation-mcs-sample/requirements.txt @@ -0,0 +1,4 @@ +Quart>=0.19.6 +azure-eventgrid==4.11.0 +aiohttp>= 3.11.9 +azure-communication-callautomation==1.3.0b1 \ No newline at end of file From 5b818a54b6f5daf88a27e6c361f395e1fbb5ba56 Mon Sep 17 00:00:00 2001 From: Yamini Pradhan Date: Tue, 15 Apr 2025 11:36:48 -0400 Subject: [PATCH 2/7] Adds websocket logic --- callautomation-mcs-sample/main.py | 206 +++++++++++---------- callautomation-mcs-sample/requirements.txt | 3 +- 2 files changed, 114 insertions(+), 95 deletions(-) diff --git a/callautomation-mcs-sample/main.py b/callautomation-mcs-sample/main.py index 9fac800..c6a648c 100644 --- a/callautomation-mcs-sample/main.py +++ b/callautomation-mcs-sample/main.py @@ -1,14 +1,19 @@ from quart import Quart, request, Response, websocket -from azure.communication.callautomation import CallAutomationClient, SsmlSource -import os -import asyncio -import requests -from collections import defaultdict +from azure.communication.callautomation import TextSource +from azure.communication.callautomation.aio import CallAutomationClient from azure.eventgrid import EventGridEvent, SystemEventNames from azure.core.messaging import CloudEvent +import asyncio +import requests import websockets +import json +import re +import uuid +from collections import defaultdict +from urllib.parse import urlparse, urlunparse +import os -# Flask app setup +# Initialize Quart app app = Quart(__name__) # Load configuration from environment variables @@ -29,48 +34,71 @@ http_client = requests.Session() http_client.headers.update(headers) - @app.route("/", methods=["GET"]) async def home(): - app.logger.info(f"Received events") + """Home route to verify the service is running.""" + app.logger.info("Received events") return "Hello ACS CallAutomation - MCS Sample!" - @app.route("/api/incomingCall", methods=["POST"]) async def incoming_call(): - event_grid_events = await request.json - app.logger.info(f"Received events: {event_grid_events}") - for event_grid_event in event_grid_events: - event = EventGridEvent.from_dict(event_grid_event) - if event.event_type == SystemEventNames.EventGridSubscriptionValidationEventName: - validation_code = event.data["validationCode"] - # Respond to validation event - return Response(response=json.dumps({"validationResponse": validation_code}), status=200) - - incoming_call_context = event.data["incomingCallContext"] - callback_uri = f"{BASE_URI}/api/calls/{uuid.uuid4()}" - - try: - answer_call_result = await call_automation_client.answer_call( - incoming_call_context=incoming_call_context, - callback_url=callback_uri, - cognitive_services_endpoint=COGNITIVE_SERVICE_ENDPOINT - ) - correlation_id = answer_call_result.call_connection_properties.correlation_id - if correlation_id: - call_store[correlation_id] = {"correlation_id": correlation_id} - except Exception as ex: - app.logger.error(f"Error answering call: {ex}") - return Response(status=200) - + """ + Handles incoming call events from Azure Event Grid. + Validates subscription and answers incoming calls. + """ + app.logger.info("Received incoming call event.") + try: + for event_dict in await request.json: + event = EventGridEvent.from_dict(event_dict) + app.logger.info("Incoming event data: %s", event.data) + + # Handle subscription validation + if event.event_type == SystemEventNames.EventGridSubscriptionValidationEventName: + app.logger.info("Validating subscription") + validation_code = event.data['validationCode'] + return Response(response=json.dumps({"validationResponse": validation_code}), status=200) + + # Handle incoming call + if event.event_type == "Microsoft.Communication.IncomingCall": + app.logger.info("Incoming call received: data=%s", event.data) + caller_id = event.data['from'].get("phoneNumber", {}).get("value", event.data['from']['rawId']) + app.logger.info("Incoming call handler caller ID: %s", caller_id) + + incoming_call_context = event.data['incomingCallContext'] + guid = uuid.uuid4() + callback_uri = f"{BASE_URI}/api/calls/{guid}?callerId={caller_id}" + app.logger.info(f"Callback URI: {callback_uri}") + + try: + answer_call_result = await call_automation_client.answer_call( + incoming_call_context=incoming_call_context, + cognitive_services_endpoint=COGNITIVE_SERVICE_ENDPOINT, + callback_url=callback_uri + ) + app.logger.info(f"Call answered, connection ID: {answer_call_result.call_connection_id}") + except Exception as e: + app.logger.error(f"Failed to answer call: {e}") + return Response(status=500) + return Response(status=200) + except Exception as ex: + app.logger.error(f"Error handling incoming call: {ex}") + return Response(status=500) @app.route("/api/calls/", methods=["POST"]) async def call_events(context_id): + """ + Handles call events such as CallConnected, PlayCompleted, and CallDisconnected. + """ + app.logger.info("Received call event.") + app.logger.info(f"Context ID: {context_id}") + app.logger.info(f"Request data: {await request.data}") cloud_events = await request.json + for cloud_event in cloud_events: event = CloudEvent.from_dict(cloud_event) - call_connection = call_automation_client.get_call_connection(event.data["callConnectionId"]) - call_media = call_connection.get_call_media() + call_connection_id = event.data["callConnectionId"] + app.logger.info(f"Call connection ID: {call_connection_id}") + call_connection = call_automation_client.get_call_connection(call_connection_id) correlation_id = event.data["correlationId"] if event.type == "Microsoft.Communication.CallConnected": @@ -78,8 +106,8 @@ async def call_events(context_id): conversation_id = conversation["conversationId"] call_store[correlation_id]["conversation_id"] = conversation_id - asyncio.create_task(listen_to_bot_websocket(conversation["streamUrl"], call_connection)) - await send_message(conversation_id, "Hi") + asyncio.create_task(listen_to_bot_websocket(conversation["streamUrl"], call_connection, call_connection_id)) + await send_message_to_bot(conversation_id, "Hi") elif event.type == "Microsoft.Communication.PlayFailed": app.logger.info("Play Failed") @@ -93,18 +121,14 @@ async def call_events(context_id): @app.websocket('/ws') async def websocket_handler(): - # Extract headers + """ + WebSocket handler for processing transcription data and bot responses. + """ correlation_id = websocket.headers.get("x-ms-call-correlation-id") call_connection_id = websocket.headers.get("x-ms-call-connection-id") - # Get call media - call_media = None - if call_connection_id: - call_connection = call_automation_client.get_call_connection(call_connection_id) - call_media = call_connection.get_call_media() - - app.logger.info(f"****************************** Correlation ID: {correlation_id}") - app.logger.info(f"****************************** Call Connection ID: {call_connection_id}") + app.logger.info(f"Correlation ID: {correlation_id}") + app.logger.info(f"Call Connection ID: {call_connection_id}") conversation_id = call_store.get(correlation_id, {}).get("conversation_id") @@ -112,30 +136,28 @@ async def websocket_handler(): partial_data = "" while True: - # Receive data from WebSocket data = await websocket.receive() if not data: break try: - # Handle partial data partial_data += data - if data.endswith("\n"): # Assuming messages are newline-delimited + if data.endswith("\n"): message = partial_data.strip() partial_data = "" - app.logger.info(f"\n[{asyncio.get_event_loop().time()}] {message}") + app.logger.info(f"Received message: {message}") if "Intermediate" in message: - app.logger.info("\nCanceling prompt") - if call_media: - await call_media.cancel_all_media_operations() + app.logger.info("Canceling prompt") + if call_connection_id: + call_connection = call_automation_client.get_call_connection(call_connection_id) + await call_connection.cancel_all_media_operations() else: - # Parse transcription data transcription_data = json.loads(message) if transcription_data.get("type") == "TranscriptionData": text = transcription_data.get("text", "") - app.logger.info(f"\n[{asyncio.get_event_loop().time()}] {text}") + app.logger.info(f"Transcription text: {text}") if transcription_data.get("resultState") == "Final": if not conversation_id: @@ -144,7 +166,7 @@ async def websocket_handler(): if conversation_id: await send_message_to_bot(conversation_id, text) else: - app.logger.info("\nConversation ID is null") + app.logger.info("Conversation ID is null") except Exception as ex: app.logger.info(f"Exception while processing WebSocket message: {ex}") except Exception as ex: @@ -153,12 +175,17 @@ async def websocket_handler(): app.logger.info("WebSocket connection closed") async def start_conversation(): + """ + Starts a new conversation with the bot using Direct Line API. + """ response = http_client.post("https://directline.botframework.com/v3/directline/conversations") response.raise_for_status() return response.json() - async def send_message_to_bot(conversation_id, message): + """ + Sends a message to the bot using Direct Line API. + """ payload = { "type": "message", "from": {"id": "user1"}, @@ -171,6 +198,9 @@ async def send_message_to_bot(conversation_id, message): response.raise_for_status() def extract_latest_bot_activity(raw_message): + """ + Extracts the latest bot activity from a WebSocket message. + """ try: activities = json.loads(raw_message).get("activities", []) for activity in reversed(activities): @@ -182,21 +212,24 @@ def extract_latest_bot_activity(raw_message): app.logger.info(f"Error parsing bot activity: {ex}") return {"type": "error", "text": "Something went wrong"} - def remove_references(input_text): - # Remove inline references like [1], [2], etc. + """ + Removes inline references and reference lists from the input text. + """ without_inline_refs = re.sub(r"\[\d+\]", "", input_text) - - # Remove reference list at the end (lines starting with [number]:) without_ref_list = re.sub(r"\n\[\d+\]:.*(\n|$)", "", without_inline_refs) - return without_ref_list.strip() -async def listen_to_bot_websocket(stream_url, call_connection): +async def listen_to_bot_websocket(stream_url, call_connection, call_connection_id): + """ + Listens to the bot's WebSocket stream and processes bot responses. + """ if not stream_url: app.logger.info("WebSocket streaming is not enabled for this MCS bot.") return + app.logger.info(f"Connecting to WebSocket: {stream_url}") + app.logger.info(f"Call Connection ID: {call_connection_id}") async with websockets.connect(stream_url) as ws: try: while True: @@ -204,43 +237,28 @@ async def listen_to_bot_websocket(stream_url, call_connection): bot_activity = extract_latest_bot_activity(message) if bot_activity["type"] == "message": - app.logger.info(f"\nPlaying Bot Response: {bot_activity['text']}\n") - await play_to_all(call_connection.get_call_media(), bot_activity["text"]) + app.logger.info(f"Playing Bot Response: {bot_activity['text']}") + await play_to_all(call_connection_id, bot_activity["text"]) elif bot_activity["type"] == "endOfConversation": - app.logger.info("\nEnd of Conversation\n") + app.logger.info("End of Conversation") await call_connection.hang_up() break except Exception as ex: app.logger.info(f"WebSocket error: {ex}") -def play_to_all(correlation_id, message): - ssml = f""" - - {message} - - """ - play_source = SsmlSource(ssml) - call_media = call_automation_client.get_call_connection(correlation_id).get_call_media() - - # Use the play_to_all method directly - call_media.play_to_all( +async def play_to_all(correlation_id, message): + """ + Plays a message to all participants in the call. + """ + app.logger.info(f"Playing message: {message}") + play_source = TextSource(text=message, voice_name="en-US-NancyNeural") + app.logger.info(f"Play source: {play_source}") + call_media = call_automation_client.get_call_connection(correlation_id) + + await call_media.play_media_to_all( play_source=play_source, - operation_context="Testing" # Optional: Add context for tracking + operation_context="Testing" ) - -def extract_latest_bot_activity(raw_message): - try: - activities = json.loads(raw_message).get("activities", []) - for activity in reversed(activities): - if activity["type"] == "message" and activity["from"]["id"] != "user1": - return {"type": "message", "text": activity.get("text", "")} - elif activity["type"] == "endOfConversation": - return {"type": "endOfConversation"} - except Exception as ex: - app.logger.info(f"Error parsing bot activity: {ex}") - return {"type": "error", "text": "Something went wrong"} - - if __name__ == "__main__": - app.run(host="0.0.0.0", port=49412, debug=True) + app.run(host="0.0.0.0", port=49412, debug=True) \ No newline at end of file diff --git a/callautomation-mcs-sample/requirements.txt b/callautomation-mcs-sample/requirements.txt index 3ff0d72..dab1829 100644 --- a/callautomation-mcs-sample/requirements.txt +++ b/callautomation-mcs-sample/requirements.txt @@ -1,4 +1,5 @@ Quart>=0.19.6 azure-eventgrid==4.11.0 aiohttp>= 3.11.9 -azure-communication-callautomation==1.3.0b1 \ No newline at end of file +azure-communication-callautomation==1.3.0b1 +websockets==11.0.3 \ No newline at end of file From 52c9a535a01f92c37c83ddfe2a288eb2d3c77774 Mon Sep 17 00:00:00 2001 From: Yamini Pradhan Date: Wed, 16 Apr 2025 15:20:51 -0400 Subject: [PATCH 3/7] Adds readme.md file --- callautomation-mcs-sample/main.py | 178 ++++++++++++++------- callautomation-mcs-sample/readme.md | 63 ++++++++ callautomation-mcs-sample/requirements.txt | 6 +- 3 files changed, 182 insertions(+), 65 deletions(-) create mode 100644 callautomation-mcs-sample/readme.md diff --git a/callautomation-mcs-sample/main.py b/callautomation-mcs-sample/main.py index c6a648c..327611d 100644 --- a/callautomation-mcs-sample/main.py +++ b/callautomation-mcs-sample/main.py @@ -1,5 +1,5 @@ from quart import Quart, request, Response, websocket -from azure.communication.callautomation import TextSource +from azure.communication.callautomation import TranscriptionOptions, TranscriptionTransportType, SsmlSource from azure.communication.callautomation.aio import CallAutomationClient from azure.eventgrid import EventGridEvent, SystemEventNames from azure.core.messaging import CloudEvent @@ -12,6 +12,8 @@ from collections import defaultdict from urllib.parse import urlparse, urlunparse import os +from logging import INFO +import html # Initialize Quart app app = Quart(__name__) @@ -22,6 +24,7 @@ DIRECT_LINE_SECRET = os.getenv("DIRECT_LINE_SECRET") BASE_URI = os.getenv("BASE_URI", "").rstrip("/") BASE_WSS_URI = BASE_URI.split("https://")[1] if BASE_URI else None +LOCALE="en-US" # Initialize Call Automation Client call_automation_client = CallAutomationClient.from_connection_string(ACS_CONNECTION_STRING) @@ -37,7 +40,6 @@ @app.route("/", methods=["GET"]) async def home(): """Home route to verify the service is running.""" - app.logger.info("Received events") return "Hello ACS CallAutomation - MCS Sample!" @app.route("/api/incomingCall", methods=["POST"]) @@ -50,7 +52,6 @@ async def incoming_call(): try: for event_dict in await request.json: event = EventGridEvent.from_dict(event_dict) - app.logger.info("Incoming event data: %s", event.data) # Handle subscription validation if event.event_type == SystemEventNames.EventGridSubscriptionValidationEventName: @@ -60,28 +61,34 @@ async def incoming_call(): # Handle incoming call if event.event_type == "Microsoft.Communication.IncomingCall": - app.logger.info("Incoming call received: data=%s", event.data) caller_id = event.data['from'].get("phoneNumber", {}).get("value", event.data['from']['rawId']) app.logger.info("Incoming call handler caller ID: %s", caller_id) incoming_call_context = event.data['incomingCallContext'] guid = uuid.uuid4() callback_uri = f"{BASE_URI}/api/calls/{guid}?callerId={caller_id}" - app.logger.info(f"Callback URI: {callback_uri}") + websocket_url = urlunparse(("wss", urlparse(BASE_URI).netloc, "/ws", "", "", "")) + transcription_config = TranscriptionOptions( + transport_url=websocket_url, + transport_type=TranscriptionTransportType.WEBSOCKET, + locale=LOCALE, + start_transcription=True + ) try: answer_call_result = await call_automation_client.answer_call( incoming_call_context=incoming_call_context, cognitive_services_endpoint=COGNITIVE_SERVICE_ENDPOINT, - callback_url=callback_uri + callback_url=callback_uri, + transcription=transcription_config ) - app.logger.info(f"Call answered, connection ID: {answer_call_result.call_connection_id}") + app.logger.info("Call answered, connection ID: %s", answer_call_result.call_connection_id) except Exception as e: - app.logger.error(f"Failed to answer call: {e}") + app.logger.error("Failed to answer call: %s", e) return Response(status=500) return Response(status=200) except Exception as ex: - app.logger.error(f"Error handling incoming call: {ex}") + app.logger.error("Error handling incoming call: %s", ex) return Response(status=500) @app.route("/api/calls/", methods=["POST"]) @@ -90,32 +97,50 @@ async def call_events(context_id): Handles call events such as CallConnected, PlayCompleted, and CallDisconnected. """ app.logger.info("Received call event.") - app.logger.info(f"Context ID: {context_id}") - app.logger.info(f"Request data: {await request.data}") cloud_events = await request.json for cloud_event in cloud_events: event = CloudEvent.from_dict(cloud_event) call_connection_id = event.data["callConnectionId"] - app.logger.info(f"Call connection ID: {call_connection_id}") - call_connection = call_automation_client.get_call_connection(call_connection_id) + app.logger.info("Call connection ID: %s", call_connection_id) correlation_id = event.data["correlationId"] if event.type == "Microsoft.Communication.CallConnected": + app.logger.info("Call connected event received.") conversation = await start_conversation() conversation_id = conversation["conversationId"] call_store[correlation_id]["conversation_id"] = conversation_id - - asyncio.create_task(listen_to_bot_websocket(conversation["streamUrl"], call_connection, call_connection_id)) + call_properties = await call_automation_client.get_call_connection(call_connection_id).get_call_properties() + app.logger.info("Transcription subscription: %s", call_properties.transcription_subscription) + + asyncio.create_task(listen_to_bot_websocket(conversation["streamUrl"], call_connection_id)) await send_message_to_bot(conversation_id, "Hi") + elif event.type == "Microsoft.Communication.RecognizeCompleted": + app.logger.info("Recognize Completed event received.") + + elif event.type == "Microsoft.Communication.TranscriptionStarted": + app.logger.info("Transcription Started event received.") + + elif event.type == "Microsoft.Communication.TranscriptionCompleted": + app.logger.info("Transcription Completed event received.") + elif event.type == "Microsoft.Communication.PlayFailed": - app.logger.info("Play Failed") + app.logger.info("Play Failed event received.") + result_info = event.data['resultInformation'] + app.logger.info("Code: %s, Subcode: %s", result_info['code'], result_info['subCode']) + app.logger.info("Message: %s", result_info['message']) elif event.type == "Microsoft.Communication.PlayCompleted": - app.logger.info("Play Completed") + app.logger.info("Play Completed event received.") + context=event.data['operationContext'] + app.logger.info("Context: %s", context) + + elif event.type == "Microsoft.Communication.PlayStarted": + app.logger.info("Play Started event received.") elif event.type == "Microsoft.Communication.CallDisconnected": + app.logger.info("Call Disconnected event received.") call_store.pop(correlation_id, None) return Response(status=200) @@ -127,50 +152,55 @@ async def websocket_handler(): correlation_id = websocket.headers.get("x-ms-call-correlation-id") call_connection_id = websocket.headers.get("x-ms-call-connection-id") - app.logger.info(f"Correlation ID: {correlation_id}") - app.logger.info(f"Call Connection ID: {call_connection_id}") + app.logger.info("Correlation ID: %s", correlation_id) + app.logger.info("Call Connection ID: %s", call_connection_id) conversation_id = call_store.get(correlation_id, {}).get("conversation_id") - try: - partial_data = "" + # Get call connection + call_connection = None + if call_connection_id: + call_connection = call_automation_client.get_call_connection(call_connection_id) + try: while True: data = await websocket.receive() if not data: break + app.logger.info("Received raw WebSocket message: %s", data) + try: - partial_data += data - if data.endswith("\n"): - message = partial_data.strip() - partial_data = "" - - app.logger.info(f"Received message: {message}") - - if "Intermediate" in message: - app.logger.info("Canceling prompt") - if call_connection_id: - call_connection = call_automation_client.get_call_connection(call_connection_id) - await call_connection.cancel_all_media_operations() - else: - transcription_data = json.loads(message) - if transcription_data.get("type") == "TranscriptionData": - text = transcription_data.get("text", "") - app.logger.info(f"Transcription text: {text}") - - if transcription_data.get("resultState") == "Final": - if not conversation_id: - conversation_id = call_store.get(correlation_id, {}).get("conversation_id") - - if conversation_id: - await send_message_to_bot(conversation_id, text) - else: - app.logger.info("Conversation ID is null") + message = data.strip() + + app.logger.info("Processed WebSocket message: %s", message) + if "Intermediate" in message: + print("\nCanceling prompt") + if call_connection: + await call_connection.cancel_all_media_operations() + else: + transcription_data = json.loads(message) + if transcription_data.get("kind") == "TranscriptionData": + transcription = transcription_data.get("transcriptionData", {}) + text = transcription.get("text", "") + result_status = transcription.get("resultStatus", "") + + app.logger.info("Transcription text: %s", text) + app.logger.info("Transcription result status: %s", result_status) + + if result_status == "Final": + if not conversation_id: + conversation_id = call_store.get(correlation_id, {}).get("conversation_id") + + if conversation_id: + await send_message_to_bot(conversation_id, text) + app.logger.info("Sent transcription to bot: %s", text) + else: + app.logger.info("Conversation ID is null") except Exception as ex: - app.logger.info(f"Exception while processing WebSocket message: {ex}") + app.logger.info("Exception while processing WebSocket message: %s", ex) except Exception as ex: - app.logger.info(f"WebSocket error: {ex}") + app.logger.info("WebSocket error: %s", ex) finally: app.logger.info("WebSocket connection closed") @@ -186,6 +216,7 @@ async def send_message_to_bot(conversation_id, message): """ Sends a message to the bot using Direct Line API. """ + app.logger.info("Sending message to bot: %s", message) payload = { "type": "message", "from": {"id": "user1"}, @@ -196,11 +227,16 @@ async def send_message_to_bot(conversation_id, message): json=payload, ) response.raise_for_status() + app.logger.info("Message sent successfully.") def extract_latest_bot_activity(raw_message): """ Extracts the latest bot activity from a WebSocket message. """ + if not raw_message.strip(): + app.logger.info("Received an empty message.") + return {"type": "error", "text": "Empty message received"} + try: activities = json.loads(raw_message).get("activities", []) for activity in reversed(activities): @@ -208,9 +244,12 @@ def extract_latest_bot_activity(raw_message): return {"type": "message", "text": remove_references(activity.get("text", ""))} elif activity["type"] == "endOfConversation": return {"type": "endOfConversation"} + except json.JSONDecodeError as ex: + app.logger.info(f"JSON decoding error: {ex}") except Exception as ex: - app.logger.info(f"Error parsing bot activity: {ex}") - return {"type": "error", "text": "Something went wrong"} + app.logger.info(f"Unexpected error: {ex}") + + return {"type": "error", "text": "Invalid or unrecognized message format"} def remove_references(input_text): """ @@ -220,7 +259,7 @@ def remove_references(input_text): without_ref_list = re.sub(r"\n\[\d+\]:.*(\n|$)", "", without_inline_refs) return without_ref_list.strip() -async def listen_to_bot_websocket(stream_url, call_connection, call_connection_id): +async def listen_to_bot_websocket(stream_url, call_connection_id): """ Listens to the bot's WebSocket stream and processes bot responses. """ @@ -241,24 +280,39 @@ async def listen_to_bot_websocket(stream_url, call_connection, call_connection_i await play_to_all(call_connection_id, bot_activity["text"]) elif bot_activity["type"] == "endOfConversation": app.logger.info("End of Conversation") - await call_connection.hang_up() + await call_automation_client.get_call_connection(call_connection_id).hang_up(is_for_everyone=True) break except Exception as ex: app.logger.info(f"WebSocket error: {ex}") async def play_to_all(correlation_id, message): """ - Plays a message to all participants in the call. + Plays a message to all participants in the call using SSML. """ + if not message.strip(): + app.logger.warning("Cannot play an empty message. Skipping playback.") + return + app.logger.info(f"Playing message: {message}") - play_source = TextSource(text=message, voice_name="en-US-NancyNeural") - app.logger.info(f"Play source: {play_source}") - call_media = call_automation_client.get_call_connection(correlation_id) - await call_media.play_media_to_all( - play_source=play_source, - operation_context="Testing" - ) + # Escape special characters in the message + escaped_message = html.escape(message) + + # Create an SSML source + ssml_template = f"""{escaped_message}""" + app.logger.info(f"Generated SSML: {ssml_template}") + + try: + play_source = SsmlSource(ssml_text=ssml_template) + call_media = call_automation_client.get_call_connection(correlation_id) + + await call_media.play_media_to_all( + play_source=play_source, + operation_context="Testing" + ) + except Exception as ex: + app.logger.error(f"Error playing message: {ex}") if __name__ == "__main__": - app.run(host="0.0.0.0", port=49412, debug=True) \ No newline at end of file + app.logger.setLevel(INFO) + app.run(port=8080) \ No newline at end of file diff --git a/callautomation-mcs-sample/readme.md b/callautomation-mcs-sample/readme.md new file mode 100644 index 0000000..4b80d61 --- /dev/null +++ b/callautomation-mcs-sample/readme.md @@ -0,0 +1,63 @@ +|page_type| languages |products +|---|-----------------------------------------|---| +|sample|
Python
|
azureazure-communication-services
| + +# Call Automation - Quick Start Sample + +This is a sample application that demonstrates the integration of **Azure Communication Services (ACS)** with **Azure Cognitive Services** and a bot using the **Direct Line API**. It enables real-time transcription of calls and interaction with a bot, with responses played back to the caller using SSML (Speech Synthesis Markup Language). + +## Prerequisites + +- **Azure Account**: Create an Azure account with an active subscription. For details, see [Create an account for free](https://azure.microsoft.com/free/). +- **Azure Communication Services Resource**: Create an ACS resource. For details, see [Create an Azure Communication Resource](https://docs.microsoft.com/azure/communication-services/quickstarts/create-communication-resource). Record your resource **connection string** for this sample. +- **Calling-Enabled Phone Number**: Obtain a phone number. For details, see [Get a phone number](https://learn.microsoft.com/en-us/azure/communication-services/quickstarts/telephony/get-phone-number?tabs=windows&pivots=platform-azp). +- **Azure Cognitive Services Resource**: Set up a Cognitive Services resource. For details, see [Create a Cognitive Services resource](https://learn.microsoft.com/en-us/azure/cognitive-services/cognitive-services-apis-create-account). +- **Bot Framework**: Create a bot and enable the **Direct Line channel**. Obtain the **Direct Line secret**. +- **Azure Dev Tunnels CLI**: Install and configure Azure Dev Tunnels. For details, see [Enable dev tunnel](https://learn.microsoft.com/en-us/azure/developer/dev-tunnels/get-started?tabs=windows). + +## Before running the sample for the first time + +1. Open an instance of PowerShell, Windows Terminal, Command Prompt or equivalent and navigate to the directory that you would like to clone the sample to. +2. git clone `https://github.com/Azure-Samples/communication-services-python-quickstarts.git`. +3. Navigate to `callautomation-mcs-sample` folder and open `main.py` file. + +### Setup the Python environment + +[Optional] Create and activate python virtual environment and install required packages using following command +``` +python -m venv venv +venv\Scripts\activate +``` +Install the required packages using the following command +``` +pip install -r requirements.txt +``` + +### Setup and host your Azure DevTunnel + +[Azure DevTunnels](https://learn.microsoft.com/en-us/azure/developer/dev-tunnels/overview) is an Azure service that enables you to share local web services hosted on the internet. Use the commands below to connect your local development environment to the public internet. This creates a tunnel with a persistent endpoint URL and which allows anonymous access. We will then use this endpoint to notify your application of calling events from the ACS Call Automation service. + +```bash +devtunnel create --allow-anonymous +devtunnel port create -p 8080 +devtunnel host +``` + +### Configuring application + +Create a `.env` file in the project root and add the following settings: + +```plaintext +ACS_CONNECTION_STRING= +COGNITIVE_SERVICE_ENDPOINT= +DIRECT_LINE_SECRET= +BASE_URI= # e.g., https://your-dev-tunnel-url +``` + +## Run app locally + +1. Navigate to `callautomation-msc-sample` folder and run `main.py` in debug mode or use command `python ./main.py` to run it from PowerShell, Command Prompt or Unix Terminal +2. Browser should pop up with the below page. If not navigate it to `http://localhost:8080/` or your ngrok url which points to 8080 port. +4. Register an EventGrid Webhook for the IncomingCall(`https:///api/incomingCall`). Instructions [here](https://learn.microsoft.com/en-us/azure/communication-services/concepts/call-automation/incoming-call-notification). + +Once these steps are completed, you should have a running application. To test the application, place a call to your ACS phone number and interact with your intelligent agent. \ No newline at end of file diff --git a/callautomation-mcs-sample/requirements.txt b/callautomation-mcs-sample/requirements.txt index dab1829..3c83cd3 100644 --- a/callautomation-mcs-sample/requirements.txt +++ b/callautomation-mcs-sample/requirements.txt @@ -1,5 +1,5 @@ -Quart>=0.19.6 -azure-eventgrid==4.11.0 -aiohttp>= 3.11.9 +quart==0.20.0 azure-communication-callautomation==1.3.0b1 +azure-eventgrid==4.11.0 +requests==2.31.0 websockets==11.0.3 \ No newline at end of file From 55b0dbbf601eb4b463a9912d7813048dfdde593b Mon Sep 17 00:00:00 2001 From: Yamini Pradhan Date: Wed, 16 Apr 2025 17:18:16 -0400 Subject: [PATCH 4/7] updates mcs bot description in readme --- callautomation-mcs-sample/readme.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/callautomation-mcs-sample/readme.md b/callautomation-mcs-sample/readme.md index 4b80d61..949eb0c 100644 --- a/callautomation-mcs-sample/readme.md +++ b/callautomation-mcs-sample/readme.md @@ -4,7 +4,7 @@ # Call Automation - Quick Start Sample -This is a sample application that demonstrates the integration of **Azure Communication Services (ACS)** with **Azure Cognitive Services** and a bot using the **Direct Line API**. It enables real-time transcription of calls and interaction with a bot, with responses played back to the caller using SSML (Speech Synthesis Markup Language). +This is a sample application that demonstrates the integration of **Azure Communication Services (ACS)** with **Microsoft Copilot Studio (MCS)** bot using the **Direct Line API**. It enables real-time transcription of calls and interaction with a MCS bot, with responses played back to the caller using SSML (Speech Synthesis Markup Language). ## Prerequisites @@ -12,7 +12,7 @@ This is a sample application that demonstrates the integration of **Azure Commun - **Azure Communication Services Resource**: Create an ACS resource. For details, see [Create an Azure Communication Resource](https://docs.microsoft.com/azure/communication-services/quickstarts/create-communication-resource). Record your resource **connection string** for this sample. - **Calling-Enabled Phone Number**: Obtain a phone number. For details, see [Get a phone number](https://learn.microsoft.com/en-us/azure/communication-services/quickstarts/telephony/get-phone-number?tabs=windows&pivots=platform-azp). - **Azure Cognitive Services Resource**: Set up a Cognitive Services resource. For details, see [Create a Cognitive Services resource](https://learn.microsoft.com/en-us/azure/cognitive-services/cognitive-services-apis-create-account). -- **Bot Framework**: Create a bot and enable the **Direct Line channel**. Obtain the **Direct Line secret**. +- **MCS Bot Framework**: Create a MCS bot and enable the **Direct Line channel**. Obtain the **Direct Line secret**. - **Azure Dev Tunnels CLI**: Install and configure Azure Dev Tunnels. For details, see [Enable dev tunnel](https://learn.microsoft.com/en-us/azure/developer/dev-tunnels/get-started?tabs=windows). ## Before running the sample for the first time From b8cfe4544ae5af29561334cc2c3ace165c0c412e Mon Sep 17 00:00:00 2001 From: Yamini Pradhan Date: Wed, 16 Apr 2025 18:36:55 -0400 Subject: [PATCH 5/7] revert changes to live transcription sample --- callautomation-live-transcription/main.py | 475 +++++++++++++----- .../requirements.txt | 8 +- 2 files changed, 343 insertions(+), 140 deletions(-) diff --git a/callautomation-live-transcription/main.py b/callautomation-live-transcription/main.py index 743e953..14c0e95 100644 --- a/callautomation-live-transcription/main.py +++ b/callautomation-live-transcription/main.py @@ -1,167 +1,372 @@ -from quart import Quart, request, Response, websocket + +import ast +import uuid +import os +from pathlib import Path +from urllib.parse import urlencode, urljoin, urlparse, urlunparse +from azure.eventgrid import EventGridEvent, SystemEventNames +import requests +from quart import Quart, Response, request, json, redirect, websocket, render_template +import json +from logging import INFO +import re from azure.communication.callautomation import ( - CallAutomationClient, SsmlSource, PlayToAllOptions, TranscriptionOptions, TranscriptionTransportType -) + PhoneNumberIdentifier, + RecognizeInputType, + TextSource, + # TranscriptionConfiguration, + TranscriptionTransportType, + ServerCallLocator, + TranscriptionOptions, + RecordingContent, + RecordingChannel, + RecordingFormat + ) +from azure.communication.callautomation.aio import ( + CallAutomationClient + ) from azure.core.messaging import CloudEvent -from azure.eventgrid import EventGridEvent, SystemEventNames +import time import asyncio import json -import uuid -import re -from collections import defaultdict -from httpx import AsyncClient +from azure.communication.callautomation._shared.models import identifier_from_raw_id +from transcriptionDataHandler import process_websocket_message_async +# import openai -# Initialize Quart app -app = Quart(__name__) +# from openai.api_resources import ( +# ChatCompletion +# ) -# Configuration +# Your ACS resource connection string ACS_CONNECTION_STRING = "" -COGNITIVE_SERVICE_ENDPOINT = "" -AGENT_PHONE_NUMBER = "" -DIRECT_LINE_SECRET = "" -BASE_URI = "".rstrip("/") -BASE_WSS_URI = BASE_URI.split("https://")[1] -# Initialize Call Automation Client -call_automation_client = CallAutomationClient.from_connection_string(ACS_CONNECTION_STRING) +# Cognitive service endpoint +COGNITIVE_SERVICE_ENDPOINT="" -# HTTP client for Direct Line -http_client = AsyncClient(headers={"Authorization": f"Bearer {DIRECT_LINE_SECRET}"}) +# Acs Phone Number +ACS_PHONE_NUMBER="" -# Store call contexts -call_store = defaultdict(dict) +# Transcription Locale +LOCALE="" -@app.route("/", methods=["GET"]) -async def home(): - return "Hello ACS CallAutomation - MCS Sample!" +# Agent Phone Number +AGENT_PHONE_NUMBER="" -@app.route("/api/incomingCall", methods=["POST"]) -async def incoming_call(): - event_grid_events = await request.json - for event_grid_event in event_grid_events: - event = EventGridEvent.from_dict(event_grid_event) - if event.event_type == SystemEventNames.EventGridSubscriptionValidationEventName: - validation_code = event.data["validationCode"] - return Response(response=json.dumps({"validationResponse": validation_code}), status=200) +# Callback events URI to handle callback events. +CALLBACK_URI_HOST = "" +CALLBACK_EVENTS_URI = CALLBACK_URI_HOST + "/api/callbacks" - incoming_call_context = event.data["incomingCallContext"] - callback_uri = f"{BASE_URI}/api/calls/{uuid.uuid4()}" +HELP_IVR_PROMPT = "Welcome to the Contoso Utilities. To access your account, we need to verify your identity. Please enter your date of birth in the format DDMMYYYY using the keypad on your phone. Once we’ve validated your identity we will connect you to the next available agent. Please note this call will be recorded!" +ADD_AGENT_PROMPT = "Thank you for verifying your identity. We are now connecting you to the next available agent. Please hold the line and we will be with you shortly. Thank you for your patience." +INCORRECT_DOB_PROMPT = "Sorry, we were unable to verify your identity based on the date of birth you entered. Please try again. Remember to enter your date of birth in the format DDMMYYYY using the keypad on your phone. Once you've entered your date of birth, press the pound key. Thank you!" +ADD_PARTICIPANT_FAILURE_PROMPT = "We're sorry, we were unable to connect you to an agent at this time, we will get the next available agent to call you back as soon as possible." +GOODBYE_PROMPT = "Thank you for calling Contoso Utilities. We hope we were able to assist you today. Goodbye" +TIMEOUT_SILENCE_PROMPT = "I’m sorry, I didn’t receive any input. Please type your date of birth in the format of DDMMYYYY." +GOODBYE_CONTEXT = "Goodbye" +ADD_AGENT_CONTEXT = "AddAgent" +INCORRECT_DOB_CONTEXT = "IncorrectDob" +ADD_PARTICIPANT_FAILURE_CONTEXT = "FailedToAddParticipant" +INCOMING_CALL_CONTEXT = "incomingCallContext" - transcription_options = TranscriptionOptions( - transport_url=f"wss://{BASE_WSS_URI}/ws", - locale="en-US", - start_transcription=True, - transport_type=TranscriptionTransportType.WEBSOCKET - ) +DOB_REGEX = r"^(0[1-9]|[12][0-9]|3[01])(0[1-9]|1[012])[12][0-9]{3}$" - try: - answer_call_result = await call_automation_client.answer_call( - incoming_call_context=incoming_call_context, - callback_url=callback_uri, - cognitive_services_endpoint=COGNITIVE_SERVICE_ENDPOINT, - transcription=transcription_options - ) - correlation_id = answer_call_result.call_connection_properties.correlation_id - if correlation_id: - call_store[correlation_id] = {"correlation_id": correlation_id} - except Exception as ex: - app.logger.error(f"Error answering call: {ex}") - return Response(status=200) +call_automation_client = CallAutomationClient.from_connection_string(ACS_CONNECTION_STRING) -@app.route("/api/calls/", methods=["POST"]) -async def call_events(context_id): - cloud_events = await request.json - for cloud_event in cloud_events: - event = CloudEvent.from_dict(cloud_event) - call_connection = call_automation_client.get_call_connection(event.data["callConnectionId"]) - call_media = call_connection.get_call_media() - correlation_id = event.data["correlationId"] +recording_id = None +recording_chunks_location = [] +recording_callback_url = None +max_retry = 2 +words_to_numbers = { + 'one': 1, + 'two': 2, + 'three': 3, + 'four': 4, + 'five': 5, + 'six': 6, + 'seven': 7, + 'eight': 8, + 'nine': 9, + 'zero': 0 + } - if event.type == "Microsoft.Communication.CallConnected": - conversation = await start_conversation() - conversation_id = conversation["conversationId"] - call_store[correlation_id]["conversation_id"] = conversation_id +TEMPLATE_FILES_PATH = "template" +app = Quart(__name__, + template_folder=TEMPLATE_FILES_PATH) - asyncio.create_task(listen_to_bot_websocket(conversation["streamUrl"], call_connection)) - await send_message(conversation_id, "Hi") +async def handle_recognize(text_to_play,caller_id,call_connection_id,context=""): + play_source = TextSource(text=text_to_play, voice_name="en-US-NancyNeural") + connection_client = call_automation_client.get_call_connection(call_connection_id) + try: + recognize_result = await connection_client.start_recognizing_media( + dtmf_max_tones_to_collect=8, + input_type=RecognizeInputType.DTMF, + target_participant=PhoneNumberIdentifier(caller_id), + end_silence_timeout=15, + dtmf_inter_tone_timeout=5, + play_prompt=play_source, + operation_context=context) + app.logger.info("handle_recognize : data=%s",recognize_result) + except Exception as ex: + app.logger.info("Error in recognize: %s", ex) - elif event.type == "Microsoft.Communication.PlayFailed": - app.logger.info("Play Failed") +async def handle_play(call_connection_id, text_to_play, context): + play_source = TextSource(text=text_to_play, voice_name= "en-US-NancyNeural") + await call_automation_client.get_call_connection(call_connection_id).play_media_to_all(play_source,operation_context=context) - elif event.type == "Microsoft.Communication.PlayCompleted": - app.logger.info("Play Completed") +async def handle_hangup(call_connection_id): + await call_automation_client.get_call_connection(call_connection_id).hang_up(is_for_everyone=True) - elif event.type == "Microsoft.Communication.TranscriptionStarted": - app.logger.info(f"Transcription started: {event.data['operationContext']}") +@app.route("/api/incomingCall", methods=['POST']) +async def incoming_call_handler(): + app.logger.info("Received incoming call event.") + try: + for event_dict in await request.json: + event = EventGridEvent.from_dict(event_dict) + app.logger.info("incoming event data --> %s", event.data) + if event.event_type == SystemEventNames.EventGridSubscriptionValidationEventName: + app.logger.info("Validating subscription") + validation_code = event.data['validationCode'] + return Response(response=json.dumps({"validationResponse": validation_code}), status=200) - elif event.type == "Microsoft.Communication.TranscriptionStopped": - app.logger.info(f"Transcription stopped: {event.data['operationContext']}") + if event.event_type == "Microsoft.Communication.IncomingCall": + app.logger.info("Incoming call received: data=%s", event.data) + if event.data['from']['kind'] =="phoneNumber": + caller_id = event.data['from']["phoneNumber"]["value"] + else : + caller_id = event.data['from']['rawId'] + app.logger.info("incoming call handler caller id: %s", + caller_id) + incoming_call_context = event.data['incomingCallContext'] + guid = uuid.uuid4() + callback_uri = f"{CALLBACK_EVENTS_URI}/{guid}?callerId={caller_id}" + websocket_url = urlunparse(("wss", urlparse(CALLBACK_URI_HOST).netloc, "/ws", "", "", "")) + global recording_callback_url + recording_callback_url = callback_uri + transcription_config = TranscriptionOptions( + transport_url=websocket_url, + transport_type=TranscriptionTransportType.WEBSOCKET, + locale=LOCALE, + start_transcription=True + ) - elif event.type == "Microsoft.Communication.CallDisconnected": - call_store.pop(correlation_id, None) - return Response(status=200) + try: + answer_call_result = await call_automation_client.answer_call( + incoming_call_context=incoming_call_context, + transcription=transcription_config, + cognitive_services_endpoint=COGNITIVE_SERVICE_ENDPOINT, + callback_url=callback_uri + ) + app.logger.info(f"Call answered, connection ID: {answer_call_result.call_connection_id}") + except Exception as e: + app.logger.error(f"Failed to answer call: {e}") + return Response(status=500) + return Response(status=200) + except Exception as ex: + app.logger.error(f"Error handling incoming call: {ex}") + return Response(status=500) + +@app.route("/api/callbacks/", methods=["POST"]) +async def handle_callback(contextId): + try: + global caller_id , call_connection_id + # app.logger.info("Request Json: %s", request.json) + for event_dict in await request.json: + event = CloudEvent.from_dict(event_dict) + call_connection_id = event.data['callConnectionId'] -@app.websocket('/ws') -async def ws(): - correlation_id = websocket.headers.get("x-ms-call-correlation-id") - call_connection_id = websocket.headers.get("x-ms-call-connection-id") - call_media = call_automation_client.get_call_connection(call_connection_id).get_call_media() - conversation_id = call_store[correlation_id]["conversation_id"] + app.logger.info("%s event received for call connection id: %s, correlation id: %s", + event.type, call_connection_id, event.data["correlationId"]) + caller_id = request.args.get("callerId").strip() + if "+" not in caller_id: + caller_id="+".strip()+caller_id.strip() - try: - while True: - message = await websocket.receive() - if "Intermediate" in message: - await call_media.cancel_all_media_operations() - else: - transcription_data = json.loads(message) - if transcription_data.get("resultState") == "Final": - await send_message(conversation_id, transcription_data["text"]) + app.logger.info("call connected : data=%s", event.data) + if event.type == "Microsoft.Communication.CallConnected": + recording_result = await call_automation_client.start_recording( + server_call_id=event.data["serverCallId"], + recording_content_type=RecordingContent.AUDIO_VIDEO, + recording_channel_type=RecordingChannel.MIXED, + recording_format_type=RecordingFormat.MP4, + recording_state_callback_url=recording_callback_url, + pause_on_start=True + ) + global recording_id + recording_id=recording_result.recording_id + global call_properties + call_properties = await call_automation_client.get_call_connection(call_connection_id).get_call_properties() + app.logger.info("Transcription subscription--->=%s", call_properties.transcription_subscription) + + elif event.type == "Microsoft.Communication.PlayStarted": + app.logger.info("Received PlayStarted event.") + elif event.type == "Microsoft.Communication.PlayCompleted": + context=event.data['operationContext'] + app.logger.info("Play completed: context=%s", event.data['operationContext']) + if context == ADD_AGENT_CONTEXT: + app.logger.info("Add agent to the call: %s", ADD_AGENT_CONTEXT) + #Add agent + target = PhoneNumberIdentifier(AGENT_PHONE_NUMBER) + source_caller_id_number = PhoneNumberIdentifier(ACS_PHONE_NUMBER) + app.logger.info("source_caller_id_number: %s", source_caller_id_number) + call_connection = call_automation_client.get_call_connection(call_connection_id) + add_participant_result= await call_connection.add_participant(target_participant=target, + source_caller_id_number=source_caller_id_number, + operation_context=None, + invitation_timeout=15) + app.logger.info("Add agent to the call: %s", add_participant_result.invitation_id) + elif context == GOODBYE_CONTEXT or context == ADD_PARTICIPANT_FAILURE_CONTEXT: + await stop_transcription_and_recording(call_connection_id, recording_id=recording_id) + await handle_hangup(call_connection_id=call_connection_id) + elif event.type == "Microsoft.Communication.RecognizeCompleted": + app.logger.info("Recognize completed: data=%s", event.data) + if event.data['recognitionType'] == "dtmf": + dtmf_tones = event.data['dtmfResult']['tones']; + app.logger.info("Recognition completed, dtmf tones =%s", dtmf_tones) + global words_to_numbers + numbers = "".join(str(words_to_numbers [x]) for x in dtmf_tones) + regex = re.compile(DOB_REGEX) + match = regex.search(numbers) + if match: + await resume_transcription_and_recording(call_connection_id, recording_id) + else: + await handle_recognize(INCORRECT_DOB_PROMPT, caller_id, call_connection_id, INCORRECT_DOB_CONTEXT) + elif event.type == "Microsoft.Communication.RecognizeFailed": + resultInformation = event.data['resultInformation'] + app.logger.info("Recognize failed event received: message=%s, sub code=%s", resultInformation['message'],resultInformation['subCode']) + reasonCode = resultInformation['subCode'] + context=event.data['operationContext'] + global max_retry + if reasonCode == 8510 and 0 < max_retry: + await handle_recognize(TIMEOUT_SILENCE_PROMPT,caller_id,call_connection_id, context="retryContext") + max_retry -= 1 + else: + await handle_play(call_connection_id=call_connection_id,text_to_play=GOODBYE_PROMPT, context=GOODBYE_CONTEXT) + elif event.type == "Microsoft.Communication.AddParticipantFailed": + resultInformation = event.data['resultInformation'] + app.logger.info("Received Add Participants Failed message=%s, sub code=%s",resultInformation['message'],resultInformation['subCode']) + await handle_play(call_connection_id=call_connection_id,text_to_play=ADD_PARTICIPANT_FAILURE_PROMPT, context=ADD_PARTICIPANT_FAILURE_CONTEXT) + elif event.type == "Microsoft.Communication.RecordingStateChanged": + app.logger.info("Received RecordingStateChanged event.") + app.logger.info(event.data['state']) + elif event.type == "Microsoft.Communication.TranscriptionStarted": + app.logger.info("Received TranscriptionStarted event.") + operation_context = None + if 'operationContext' in event.data: + operation_context = event.data['operationContext'] + + if operation_context is None: + await call_automation_client.get_call_connection(event.data['callConnectionId']).stop_transcription(operation_context="nextRecognizeContext") + elif operation_context is not None and operation_context == 'StartTranscriptionContext': + await handle_play(event.data['callConnectionId'], ADD_AGENT_PROMPT, ADD_AGENT_CONTEXT) + + elif event.type == "Microsoft.Communication.TranscriptionStopped": + app.logger.info("Received TranscriptionStopped event.") + operation_context = None + if 'operationContext' in event.data: + operation_context = event.data['operationContext'] + + if operation_context is not None and operation_context == 'nextRecognizeContext': + await handle_recognize(HELP_IVR_PROMPT,caller_id,call_connection_id,context="hellocontext") + + elif event.type == "Microsoft.Communication.TranscriptionUpdated": + app.logger.info("Received TranscriptionUpdated event.") + transcriptionUpdate = event.data['transcriptionUpdate'] + app.logger.info(transcriptionUpdate["transcriptionStatus"]) + app.logger.info(transcriptionUpdate["transcriptionStatusDetails"]) + elif event.type == "Microsoft.Communication.TranscriptionFailed": + app.logger.info("Received TranscriptionFailed event.") + resultInformation = event.data['resultInformation'] + app.logger.info("Encountered error during Transcription, message=%s, code=%s, subCode=%s", + resultInformation['message'], + resultInformation['code'], + resultInformation['subCode']) + return Response(status=200) except Exception as ex: - app.logger.error(f"WebSocket error: {ex}") - -async def start_conversation(): - response = await http_client.post("https://directline.botframework.com/v3/directline/conversations") - response.raise_for_status() - return response.json() - -async def listen_to_bot_websocket(stream_url, call_connection): - async with AsyncClient() as ws_client: - async with ws_client.stream("GET", stream_url) as websocket: - async for message in websocket.aiter_text(): - bot_activity = extract_latest_bot_activity(message) - if bot_activity["type"] == "message": - await play_to_all(call_connection.get_call_media(), bot_activity["text"]) - elif bot_activity["type"] == "endOfConversation": - await call_connection.hang_up(is_for_everyone=True) - -async def send_message(conversation_id, message): - payload = { - "type": "message", - "from": {"id": "user1"}, - "text": message - } - await http_client.post( - f"https://directline.botframework.com/v3/directline/conversations/{conversation_id}/activities", - json=payload - ) + app.logger.info("error in event handling") -def extract_latest_bot_activity(raw_message): +@app.route('/api/recordingFileStatus', methods=['POST']) +async def recording_file_status(): try: - activities = json.loads(raw_message).get("activities", []) - for activity in reversed(activities): - if activity["type"] == "message" and activity["from"]["id"] != "user1": - return {"type": "message", "text": activity.get("text", "")} - elif activity["type"] == "endOfConversation": - return {"type": "endOfConversation"} + for event_dict in await request.json: + event = EventGridEvent.from_dict(event_dict) + if event.event_type == SystemEventNames.EventGridSubscriptionValidationEventName: + code = event.data['validationCode'] + if code: + data = {"validationResponse": code} + app.logger.info("Successfully Subscribed EventGrid.ValidationEvent --> " + str(data)) + return Response(response=str(data), status=200) + + if event.event_type == SystemEventNames.AcsRecordingFileStatusUpdatedEventName: + acs_recording_file_status_updated_event_data = event.data + acs_recording_chunk_info_properties = acs_recording_file_status_updated_event_data['recordingStorageInfo']['recordingChunks'][0] + app.logger.info("acsRecordingChunkInfoProperties response data --> " + str(acs_recording_chunk_info_properties)) + global content_location + content_location = acs_recording_chunk_info_properties['contentLocation'] + return Response(response="Ok") + except Exception as ex: - app.logger.error(f"Error parsing bot activity: {ex}") - return {"type": "error", "text": "Something went wrong"} + app.logger.error( "Failed to get recording file") + return Response(response='Failed to get recording file', status=400) + +@app.route('/download') +async def download_recording(): + try: + app.logger.info("Content location : %s", content_location) + downloads_folder = str(Path.home() / "Downloads") + file_path = os.path.join(downloads_folder, "Recording_File.mp4") + + recording_data = await call_automation_client.download_recording(content_location) + with open(file_path, "wb") as binary_file: + binary_file.write(await recording_data.read()) + return redirect("/") + except Exception as ex: + app.logger.info("Failed to download recording --> " + str(ex)) + return Response(text=str(ex), status=500) + +async def initiate_transcription(call_connection_id): + app.logger.info("initiate_transcription is called %s", call_connection_id) + call_connection = call_automation_client.get_call_connection(call_connection_id) + await call_connection.start_transcription(locale=LOCALE, operation_context="StartTranscriptionContext") + app.logger.info("Starting the transcription") + +async def stop_transcription_and_recording(call_connection_id, recording_id): + app.logger.info("stop_transcription_and_recording method triggered.") + call_properties = await call_automation_client.get_call_connection(call_connection_id).get_call_properties() + recording_properties = await call_automation_client.get_recording_properties(recording_id) + if call_properties.transcription_subscription.state == 'active': + await call_automation_client.get_call_connection(call_connection_id).stop_transcription() + if recording_properties.recording_state == "active": + await call_automation_client.stop_recording(recording_id=recording_id) + +async def resume_transcription_and_recording(call_connection_id, recording_id): + await initiate_transcription(call_connection_id) + app.logger.info("Transcription re initiated.") + + await call_automation_client.resume_recording(recording_id) + app.logger.info(f"Recording resumed. RecordingId: {recording_id}") + + # WebSocket. +@app.websocket('/ws') +async def ws(): + print("Client connected to WebSocket") + try: + while True: + try: + # Receive data from the client + message = await websocket.receive() + await process_websocket_message_async(message) + except Exception as e: + print(f"Error while receiving message: {e}") + break # Close connection on error + except Exception as e: + print(f"WebSocket connection closed: {e}") + finally: + # Any cleanup or final logs can go here + print("WebSocket connection closed") -async def play_to_all(call_media, message): - ssml_source = SsmlSource(f"{message}") - play_options = PlayToAllOptions(ssml_source) - await call_media.play_to_all(play_options) +@app.route('/') +async def index_handler(): + return await render_template("index.html") -if __name__ == "__main__": +if __name__ == '__main__': + app.logger.setLevel(INFO) app.run(port=8080) \ No newline at end of file diff --git a/callautomation-live-transcription/requirements.txt b/callautomation-live-transcription/requirements.txt index 5ed774f..3ff0d72 100644 --- a/callautomation-live-transcription/requirements.txt +++ b/callautomation-live-transcription/requirements.txt @@ -1,6 +1,4 @@ -quart==0.19.6 -azure-communication-callautomation==1.3.0b1 +Quart>=0.19.6 azure-eventgrid==4.11.0 -azure-core==1.28.0 -requests==2.31.0 -websockets==11.0.3 \ No newline at end of file +aiohttp>= 3.11.9 +azure-communication-callautomation==1.3.0b1 \ No newline at end of file From 8c9320e7dee0e786579d2c63708fe0f3aa099d77 Mon Sep 17 00:00:00 2001 From: Yamini Pradhan Date: Wed, 16 Apr 2025 18:41:19 -0400 Subject: [PATCH 6/7] remove extra line from live transcription sample --- callautomation-live-transcription/main.py | 2 +- callautomation-live-transcription/requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/callautomation-live-transcription/main.py b/callautomation-live-transcription/main.py index 14c0e95..1fd7c5b 100644 --- a/callautomation-live-transcription/main.py +++ b/callautomation-live-transcription/main.py @@ -369,4 +369,4 @@ async def index_handler(): if __name__ == '__main__': app.logger.setLevel(INFO) - app.run(port=8080) \ No newline at end of file + app.run(port=8080) diff --git a/callautomation-live-transcription/requirements.txt b/callautomation-live-transcription/requirements.txt index 3ff0d72..0f35282 100644 --- a/callautomation-live-transcription/requirements.txt +++ b/callautomation-live-transcription/requirements.txt @@ -1,4 +1,4 @@ Quart>=0.19.6 azure-eventgrid==4.11.0 aiohttp>= 3.11.9 -azure-communication-callautomation==1.3.0b1 \ No newline at end of file +azure-communication-callautomation==1.3.0b1 From 57180129d679cef0db5658ae7ddedfc9aeb8c7fd Mon Sep 17 00:00:00 2001 From: Yamini Pradhan Date: Wed, 16 Apr 2025 18:46:24 -0400 Subject: [PATCH 7/7] upgrade to latest call automation version --- callautomation-mcs-sample/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/callautomation-mcs-sample/requirements.txt b/callautomation-mcs-sample/requirements.txt index 3c83cd3..34c2c75 100644 --- a/callautomation-mcs-sample/requirements.txt +++ b/callautomation-mcs-sample/requirements.txt @@ -1,5 +1,5 @@ quart==0.20.0 -azure-communication-callautomation==1.3.0b1 +azure-communication-callautomation==1.4.0b1 azure-eventgrid==4.11.0 requests==2.31.0 websockets==11.0.3 \ No newline at end of file