-
Notifications
You must be signed in to change notification settings - Fork 54
Open
Description
Hi,
Recently I am trying to build a voice AI agent with ACS. I tried to follow the intro from this example: https://github.com/Azure-Samples/communication-services-python-quickstarts/tree/main/callautomation-azure-openai-voice
However, I saw in the README, this example need a registered Phone number. But I thougt I can call with the created ACS user name right?
Besides, I don't have an Event Grid subscription yet. I only deploy my app to a Azure App Service to verify if it can work.
But I can only recieve the following events:
INFO: xxx:xxx:xxx:xxxx- "POST /api/callbacks/incomingCall HTTP/1.1" 200 OK
INFO:main:Received Event:-> Microsoft.Communication.CallDisconnected, Correlation Id:-> 447dd7fd-0b4b-4279-b609-08792e7295cd, CallConnectionId:-> 07004c80-34ed-4107-bfbe-c93d89e77abd
INFO: xxx:xxx:xxx:xxxx - "POST /api/callbacks/incomingCall HTTP/1.1" 200 OK
INFO:main:Received Event:-> Microsoft.Communication.CreateCallFailed, Correlation Id:-> 447dd7fd-0b4b-4279-b609-08792e7295cd, CallConnectionId:-> 07004c80-34ed-4107-bfbe-c93d89e77abdI will paste my code below:
from fastapi.responses import JSONResponse
from urllib.parse import urlencode, urlparse, urlunparse
import logging
import json
from azure.communication.callautomation import (
MediaStreamingOptions,
AudioFormat,
MediaStreamingContentType,
MediaStreamingAudioChannelType,
StreamingTransportType
)
from azure.communication.callautomation.aio import (
CallAutomationClient
)
import os
import uuid
import uvicorn
from dotenv import load_dotenv
load_dotenv()
ACS_HOST = os.environ.get("ACS_HOST")
ACS_KEY = os.environ.get("ACS_KEY")
# Callback events URI to handle callback events.
CALLBACK_URI_HOST = os.environ.get("CALLBACK_URI_HOST")
CALLBACK_EVENTS_URI = CALLBACK_URI_HOST + "/api/callbacks"
ONE_ON_ONE_CALLER_MAP = {}
acs_client = CallAutomationClient(ACS_HOST, ACS_KEY)
app = FastAPI()
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@app.get("/api/getOneOnOneCallUser")
async def get_one_on_one_call_user():
try:
# Create an identity
identity = acs_client.create_user()
logging.info("\nCreated an caller identity with ID: " + identity.properties['id'])
# Issue an access token with a validity of 24 hours and the "voip" scope for an identity
token_result = acs_client.get_token(identity, ["voip"])
logging.info("\nIssued an caller access token with 'voip' scope that expires at " + token_result.expires_on + ":")
logging.info(token_result.token)
# Create an identity
identity_acc = acs_client.create_user()
logging.info("\nCreated an accepter identity with ID: " + identity_acc.properties['id'])
# Issue an access token with a validity of 24 hours and the "voip" scope for an identity
token_result_acc = acs_client.get_token(identity_acc, ["voip"])
logging.info("\nIssued an accepter access token with 'voip' scope that expires at " + token_result_acc.expires_on + ":")
logging.info(token_result_acc.token)
id_str = uuid.uuid4()
data = {
"caller": {
"identity": identity,
"token": token_result
},
"accepter": {
"identity": identity_acc,
"token": token_result_acc
}
}
ONE_ON_ONE_CALLER_MAP[id_str] = data
return JSONResponse(content=data, status_code=200)
except Exception as e:
logger.error(f"Error creating users or tokens: {e}")
return JSONResponse(content={"error": "Failed to create users or tokens"}, status_code=500)
@app.post('/api/callbacks/incomingCall')
async def incoming_call_callbacks(request: Request):
event_list = await request.json()
for event in event_list:
# Parsing callback events
global call_connection_id
event_data = event['data']
call_connection_id = event_data["callConnectionId"]
logger.info(f"Received Event:-> {event['type']}, Correlation Id:-> {event_data['correlationId']}, CallConnectionId:-> {call_connection_id}")
if event['type'] == "Microsoft.Communication.IncomingCall":
logger.info(f"Received IncomingCall event for connection id: {call_connection_id}")
logger.info("Start answering call")
incoming_call_response = await incoming_call_handler(event)
return incoming_call_response
return JSONResponse(content={}, status_code=200)
@app.post('/api/callbacks/{contextId}')
async def callbacks(contextId: str, request: Request):
event_list = await request.json()
for event in event_list:
# Parsing callback events
global call_connection_id
event_data = event['data']
call_connection_id = event_data["callConnectionId"]
logger.info(f"Received Event:-> {event['type']}, Correlation Id:-> {event_data['correlationId']}, CallConnectionId:-> {call_connection_id}")
if event['type'] == "Microsoft.Communication.CallConnected":
call_connection_properties = await acs_client.get_call_connection(call_connection_id).get_call_properties()
media_streaming_subscription = call_connection_properties.media_streaming_subscription
logger.info(f"MediaStreamingSubscription:--> {media_streaming_subscription}")
logger.info(f"Received CallConnected event for connection id: {call_connection_id}")
logger.info("CORRELATION ID:--> %s", event_data["correlationId"])
logger.info("CALL CONNECTION ID:--> %s", event_data["callConnectionId"])
elif event['type'] == "Microsoft.Communication.MediaStreamingStarted":
logger.info(f"Media streaming content type:--> {event_data['mediaStreamingUpdate']['contentType']}")
logger.info(f"Media streaming status:--> {event_data['mediaStreamingUpdate']['mediaStreamingStatus']}")
logger.info(f"Media streaming status details:--> {event_data['mediaStreamingUpdate']['mediaStreamingStatusDetails']}")
elif event['type'] == "Microsoft.Communication.MediaStreamingStopped":
logger.info(f"Media streaming content type:--> {event_data['mediaStreamingUpdate']['contentType']}")
logger.info(f"Media streaming status:--> {event_data['mediaStreamingUpdate']['mediaStreamingStatus']}")
logger.info(f"Media streaming status details:--> {event_data['mediaStreamingUpdate']['mediaStreamingStatusDetails']}")
elif event['type'] == "Microsoft.Communication.MediaStreamingFailed":
logger.info(f"Code:->{event_data['resultInformation']['code']}, Subcode:-> {event_data['resultInformation']['subCode']}")
logger.info(f"Message:->{event_data['resultInformation']['message']}")
elif event['type'] == "Microsoft.Communication.CallDisconnected":
logger.info(f"Received CallDisconnected event for connection id: {call_connection_id}")
return JSONResponse(content={}, status_code=200)
# WebSocket.
@app.websocket('/ws')
async def ws(websocket: WebSocket):
await websocket.accept()
print("Client connected to WebSocket")
try:
while True:
# Receive data from the client
data = await websocket.receive_text()
print(f"Received data: {data}")
data = json.loads(data)
kind = data['kind']
if kind == "AudioData":
audio_data_section = data.get("audioData", {})
if not audio_data_section.get("silent", True):
audio_data = audio_data_section.get("data")
logging.info(f"Received audio data, type {type(data)}, length: {len(audio_data)}")
except WebSocketDisconnect:
print("WebSocket connection closed")
except Exception as e:
print(f"WebSocket error: {e}")
@app.get('/')
def home():
return 'Hello ACS CallAutomation!'
async def incoming_call_handler(event: dict):
caller_id = event["data"]['from']['rawId']
logger.info("incoming call handler caller id: %s", caller_id)
incoming_call_context=event.data['incomingCallContext']
logging.info(f"incoming call context:{incoming_call_context}")
guid =uuid.uuid4()
query_parameters = urlencode({"callerId": caller_id})
callback_uri = f"{CALLBACK_EVENTS_URI}/{guid}?{query_parameters}"
parsed_url = urlparse(CALLBACK_EVENTS_URI)
websocket_url = urlunparse(('wss',parsed_url.netloc,'/ws','', '', ''))
logger.info("callback url: %s", callback_uri)
logger.info("websocket url: %s", websocket_url)
media_streaming_options = MediaStreamingOptions(
transport_url=websocket_url,
transport_type=StreamingTransportType.WEBSOCKET,
content_type=MediaStreamingContentType.AUDIO,
audio_channel_type=MediaStreamingAudioChannelType.MIXED,
start_media_streaming=True,
enable_bidirectional=True,
audio_format=AudioFormat.PCM24_K_MONO)
answer_call_result = await acs_client.answer_call(
incoming_call_context=incoming_call_context,
operation_context="incomingCall",
callback_url=callback_uri,
media_streaming=media_streaming_options
)
logger.info("Answered call for connection id: %s", answer_call_result.call_connection_id)
return JSONResponse(content={}, status_code=200)
if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=8000)
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels