From d68b34a2d6f7af3666715ed02659e2eef5056b4d Mon Sep 17 00:00:00 2001 From: Gavin Aguiar Date: Mon, 23 Feb 2026 15:11:57 -0600 Subject: [PATCH] Cloud event support for event grid --- azure/functions/__init__.py | 3 +- azure/functions/_abc.py | 48 +++++++++++++++++++++++ azure/functions/_eventgrid.py | 73 +++++++++++++++++++++++++++++++++++ azure/functions/eventgrid.py | 26 +++++++++++-- tests/test_eventgrid.py | 68 ++++++++++++++++++++++++++++++++ 5 files changed, 214 insertions(+), 4 deletions(-) diff --git a/azure/functions/__init__.py b/azure/functions/__init__.py index 8170a14e..cb6371da 100644 --- a/azure/functions/__init__.py +++ b/azure/functions/__init__.py @@ -3,7 +3,7 @@ from ._abc import TimerRequest, InputStream, Context, Out from ._eventhub import EventHubEvent -from ._eventgrid import EventGridEvent, EventGridOutputEvent +from ._eventgrid import CloudEvent, EventGridEvent, EventGridOutputEvent from ._cosmosdb import Document, DocumentList from ._http import HttpRequest, HttpResponse from .decorators import (FunctionApp, Function, Blueprint, @@ -54,6 +54,7 @@ # Binding rich types, sorted alphabetically. 'Document', 'DocumentList', + 'CloudEvent', 'EventGridEvent', 'EventGridOutputEvent', 'EventHubEvent', diff --git a/azure/functions/_abc.py b/azure/functions/_abc.py index 5812787a..2f5cb692 100644 --- a/azure/functions/_abc.py +++ b/azure/functions/_abc.py @@ -369,6 +369,54 @@ def data_version(self) -> str: pass +class CloudEvent(abc.ABC): + """A CloudEvents v1.0 event message.""" + + @property + @abc.abstractmethod + def id(self) -> str: + pass + + @property + @abc.abstractmethod + def source(self) -> str: + pass + + @property + @abc.abstractmethod + def type(self) -> str: + pass + + @property + @abc.abstractmethod + def specversion(self) -> str: + pass + + @property + @abc.abstractmethod + def time(self) -> typing.Optional[datetime.datetime]: + pass + + @property + @abc.abstractmethod + def subject(self) -> typing.Optional[str]: + pass + + @property + @abc.abstractmethod + def datacontenttype(self) -> typing.Optional[str]: + pass + + @property + @abc.abstractmethod + def dataschema(self) -> typing.Optional[str]: + pass + + @abc.abstractmethod + def get_json(self) -> typing.Any: + pass + + class Document(abc.ABC): @classmethod diff --git a/azure/functions/_eventgrid.py b/azure/functions/_eventgrid.py index 9ebc29cc..9eb40929 100644 --- a/azure/functions/_eventgrid.py +++ b/azure/functions/_eventgrid.py @@ -109,3 +109,76 @@ def __repr__(self) -> str: f'subject={self.subject} ' f'at 0x{id(self):0x}>' ) + + +class CloudEvent(azf_abc.CloudEvent): + """A CloudEvents v1.0 event message.""" + + def __init__(self, *, + id: str, + source: str, + type: str, + specversion: str, + data: typing.Optional[typing.Any], + time: typing.Optional[datetime.datetime] = None, + subject: typing.Optional[str] = None, + datacontenttype: typing.Optional[str] = None, + dataschema: typing.Optional[str] = None, + **extensions: typing.Any) -> None: + self.__id = id + self.__source = source + self.__type = type + self.__specversion = specversion + self.__data = data + self.__time = time + self.__subject = subject + self.__datacontenttype = datacontenttype + self.__dataschema = dataschema + self.__extensions = extensions + + @property + def id(self) -> str: + return self.__id + + @property + def source(self) -> str: + return self.__source + + @property + def type(self) -> str: + return self.__type + + @property + def specversion(self) -> str: + return self.__specversion + + @property + def time(self) -> typing.Optional[datetime.datetime]: + return self.__time + + @property + def subject(self) -> typing.Optional[str]: + return self.__subject + + @property + def datacontenttype(self) -> typing.Optional[str]: + return self.__datacontenttype + + @property + def dataschema(self) -> typing.Optional[str]: + return self.__dataschema + + def get_json(self) -> typing.Any: + return self.__data + + @property + def extension_attrs(self) -> typing.Dict[str, typing.Any]: + return dict(self.__extensions) + + def __repr__(self) -> str: + return ( + f'' + ) diff --git a/azure/functions/eventgrid.py b/azure/functions/eventgrid.py index 3cddd9e1..77ad180f 100644 --- a/azure/functions/eventgrid.py +++ b/azure/functions/eventgrid.py @@ -20,14 +20,17 @@ def check_input_type_annotation(cls, pytype: type) -> bool: """ Event Grid always sends an array and may send more than one event in the array. The runtime invokes function once for each array element, - thus no need to parse List[EventGridEvent] + thus no need to parse List[EventGridEvent]. + Accepts both EventGridEvent (Event Grid schema) and CloudEvent + (CloudEvents v1.0 schema). """ - valid_types = azf_eventgrid.EventGridEvent + valid_types = (azf_eventgrid.EventGridEvent, azf_eventgrid.CloudEvent) return isinstance(pytype, type) and issubclass(pytype, valid_types) @classmethod def decode(cls, data: meta.Datum, *, - trigger_metadata) -> azf_eventgrid.EventGridEvent: + trigger_metadata) -> Union[azf_eventgrid.EventGridEvent, + azf_eventgrid.CloudEvent]: data_type = data.type if data_type == 'json': @@ -36,6 +39,23 @@ def decode(cls, data: meta.Datum, *, raise NotImplementedError( f'unsupported event grid payload type: {data_type}') + if 'specversion' in body: + known = {'specversion', 'id', 'source', 'type', 'time', + 'subject', 'datacontenttype', 'dataschema', 'data'} + extensions = {k: v for k, v in body.items() if k not in known} + return azf_eventgrid.CloudEvent( + id=body.get('id'), + source=body.get('source'), + type=body.get('type'), + specversion=body.get('specversion'), + time=cls._parse_datetime(body.get('time')), + subject=body.get('subject'), + datacontenttype=body.get('datacontenttype'), + dataschema=body.get('dataschema'), + data=body.get('data'), + **extensions, + ) + return azf_eventgrid.EventGridEvent( id=body.get('id'), topic=body.get('topic'), diff --git a/tests/test_eventgrid.py b/tests/test_eventgrid.py index f1a08bda..09199b1b 100644 --- a/tests/test_eventgrid.py +++ b/tests/test_eventgrid.py @@ -18,6 +18,14 @@ def test_eventgrid_input_type(self): self.assertFalse(check_input_type(str)) self.assertFalse(check_input_type(bytes)) + def test_cloudevent_input_type(self): + check_input_type = azf_event_grid.EventGridEventInConverter.\ + check_input_type_annotation + self.assertTrue(check_input_type(func.CloudEvent)) + self.assertFalse(check_input_type(List[func.CloudEvent])) + self.assertFalse(check_input_type(str)) + self.assertFalse(check_input_type(bytes)) + def test_eventgrid_output_type(self): check_output_type = azf_event_grid.EventGridEventOutConverter.\ check_output_type_annotation @@ -148,6 +156,66 @@ def _generate_multiple_eventgrid_event(with_date=True): data_version='dataVersion', )] + def test_cloudevent_decode(self): + event = azf_event_grid.EventGridEventInConverter.decode( + data=self._generate_single_cloudevent_datum(), trigger_metadata=None + ) + self.assertIsInstance(event, azf_event_grid.azf_eventgrid.CloudEvent) + self.assertEqual(event.id, "A234-1234-1234") + self.assertEqual(event.source, "/mycontext/subcontext") + self.assertEqual(event.type, "com.example.type") + self.assertEqual(event.specversion, "1.0") + self.assertEqual(event.subject, "mysubject") + self.assertEqual(event.datacontenttype, "application/json") + self.assertIsNotNone(event.time) + self.assertIsNotNone(event.get_json()) + self.assertEqual(event.get_json().get("key"), "value") + + def test_cloudevent_decode_with_null_data(self): + event = azf_event_grid.EventGridEventInConverter.decode( + data=self._generate_single_cloudevent_datum(with_data=False), + trigger_metadata=None + ) + self.assertIsInstance(event, azf_event_grid.azf_eventgrid.CloudEvent) + self.assertEqual(event.id, "A234-1234-1234") + self.assertIsNone(event.get_json()) + + def test_cloudevent_decode_with_extensions(self): + event = azf_event_grid.EventGridEventInConverter.decode( + data=self._generate_single_cloudevent_datum(with_extensions=True), + trigger_metadata=None + ) + self.assertIsInstance(event, azf_event_grid.azf_eventgrid.CloudEvent) + self.assertEqual(event.extension_attrs.get("customext"), "extvalue") + + def test_eventgrid_schema_not_broken(self): + event = azf_event_grid.EventGridEventInConverter.decode( + data=self._generate_single_eventgrid_datum(), trigger_metadata=None + ) + self.assertIsInstance(event, azf_event_grid.azf_eventgrid.EventGridEvent) + self.assertEqual(event.id, "00010001-0001-0001-0001-000100010001") + self.assertEqual(event.topic, "/TestTopic/namespaces/test") + self.assertEqual(event.event_type, "captureFileCreated") + + @staticmethod + def _generate_single_cloudevent_datum(with_data=True, + with_extensions=False): + payload = { + "specversion": "1.0", + "type": "com.example.type", + "source": "/mycontext/subcontext", + "id": "A234-1234-1234", + "time": "2018-04-05T17:31:00Z", + "subject": "mysubject", + "datacontenttype": "application/json", + } + if with_data: + payload["data"] = {"key": "value"} + if with_extensions: + payload["customext"] = "extvalue" + import json as _json + return func.meta.Datum(_json.dumps(payload), 'json') + @staticmethod def _generate_single_eventgrid_str(in_bytes=False): string_representation = '{"id": "id", ' \