Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion azure/functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from ._abc import TimerRequest, InputStream, Context, Out
from ._eventhub import EventHubEvent
from ._eventgrid import EventGridEvent, EventGridOutputEvent
from ._eventgrid import CloudEvent, EventGridEvent, EventGridOutputEvent
from ._cosmosdb import Document, DocumentList
from ._http import HttpRequest, HttpResponse
from .decorators import (FunctionApp, Function, Blueprint,
Expand Down Expand Up @@ -54,6 +54,7 @@
# Binding rich types, sorted alphabetically.
'Document',
'DocumentList',
'CloudEvent',
'EventGridEvent',
'EventGridOutputEvent',
'EventHubEvent',
Expand Down
48 changes: 48 additions & 0 deletions azure/functions/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,54 @@ def data_version(self) -> str:
pass


class CloudEvent(abc.ABC):
"""A CloudEvents v1.0 event message."""

@property
@abc.abstractmethod
def id(self) -> str:
pass

@property
@abc.abstractmethod
def source(self) -> str:
pass

@property
@abc.abstractmethod
def type(self) -> str:
pass

@property
@abc.abstractmethod
def specversion(self) -> str:
pass

@property
@abc.abstractmethod
def time(self) -> typing.Optional[datetime.datetime]:
pass

@property
@abc.abstractmethod
def subject(self) -> typing.Optional[str]:
pass

@property
@abc.abstractmethod
def datacontenttype(self) -> typing.Optional[str]:
pass

@property
@abc.abstractmethod
def dataschema(self) -> typing.Optional[str]:
pass

@abc.abstractmethod
def get_json(self) -> typing.Any:
pass


class Document(abc.ABC):

@classmethod
Expand Down
73 changes: 73 additions & 0 deletions azure/functions/_eventgrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,76 @@ def __repr__(self) -> str:
f'subject={self.subject} '
f'at 0x{id(self):0x}>'
)


class CloudEvent(azf_abc.CloudEvent):
"""A CloudEvents v1.0 event message."""

def __init__(self, *,
id: str,
source: str,
type: str,
specversion: str,
data: typing.Optional[typing.Any],
time: typing.Optional[datetime.datetime] = None,
subject: typing.Optional[str] = None,
datacontenttype: typing.Optional[str] = None,
dataschema: typing.Optional[str] = None,
**extensions: typing.Any) -> None:
self.__id = id
self.__source = source
self.__type = type
self.__specversion = specversion
self.__data = data
self.__time = time
self.__subject = subject
self.__datacontenttype = datacontenttype
self.__dataschema = dataschema
self.__extensions = extensions

@property
def id(self) -> str:
return self.__id

@property
def source(self) -> str:
return self.__source

@property
def type(self) -> str:
return self.__type

@property
def specversion(self) -> str:
return self.__specversion

@property
def time(self) -> typing.Optional[datetime.datetime]:
return self.__time

@property
def subject(self) -> typing.Optional[str]:
return self.__subject

@property
def datacontenttype(self) -> typing.Optional[str]:
return self.__datacontenttype

@property
def dataschema(self) -> typing.Optional[str]:
return self.__dataschema

def get_json(self) -> typing.Any:
return self.__data

@property
def extension_attrs(self) -> typing.Dict[str, typing.Any]:
return dict(self.__extensions)

def __repr__(self) -> str:
return (
f'<azure.CloudEvent id={self.id} '
f'source={self.source} '
f'type={self.type} '
f'at 0x{id(self):0x}>'
)
26 changes: 23 additions & 3 deletions azure/functions/eventgrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ def check_input_type_annotation(cls, pytype: type) -> bool:
"""
Event Grid always sends an array and may send more than one event in
the array. The runtime invokes function once for each array element,
thus no need to parse List[EventGridEvent]
thus no need to parse List[EventGridEvent].
Accepts both EventGridEvent (Event Grid schema) and CloudEvent
(CloudEvents v1.0 schema).
"""
valid_types = azf_eventgrid.EventGridEvent
valid_types = (azf_eventgrid.EventGridEvent, azf_eventgrid.CloudEvent)
return isinstance(pytype, type) and issubclass(pytype, valid_types)

@classmethod
def decode(cls, data: meta.Datum, *,
trigger_metadata) -> azf_eventgrid.EventGridEvent:
trigger_metadata) -> Union[azf_eventgrid.EventGridEvent,
azf_eventgrid.CloudEvent]:
data_type = data.type

if data_type == 'json':
Expand All @@ -36,6 +39,23 @@ def decode(cls, data: meta.Datum, *,
raise NotImplementedError(
f'unsupported event grid payload type: {data_type}')

if 'specversion' in body:
known = {'specversion', 'id', 'source', 'type', 'time',
'subject', 'datacontenttype', 'dataschema', 'data'}
extensions = {k: v for k, v in body.items() if k not in known}
return azf_eventgrid.CloudEvent(
id=body.get('id'),
source=body.get('source'),
type=body.get('type'),
specversion=body.get('specversion'),
time=cls._parse_datetime(body.get('time')),
subject=body.get('subject'),
datacontenttype=body.get('datacontenttype'),
dataschema=body.get('dataschema'),
data=body.get('data'),
**extensions,
)

return azf_eventgrid.EventGridEvent(
id=body.get('id'),
topic=body.get('topic'),
Expand Down
68 changes: 68 additions & 0 deletions tests/test_eventgrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ def test_eventgrid_input_type(self):
self.assertFalse(check_input_type(str))
self.assertFalse(check_input_type(bytes))

def test_cloudevent_input_type(self):
check_input_type = azf_event_grid.EventGridEventInConverter.\
check_input_type_annotation
self.assertTrue(check_input_type(func.CloudEvent))
self.assertFalse(check_input_type(List[func.CloudEvent]))
self.assertFalse(check_input_type(str))
self.assertFalse(check_input_type(bytes))

def test_eventgrid_output_type(self):
check_output_type = azf_event_grid.EventGridEventOutConverter.\
check_output_type_annotation
Expand Down Expand Up @@ -148,6 +156,66 @@ def _generate_multiple_eventgrid_event(with_date=True):
data_version='dataVersion',
)]

def test_cloudevent_decode(self):
event = azf_event_grid.EventGridEventInConverter.decode(
data=self._generate_single_cloudevent_datum(), trigger_metadata=None
)
self.assertIsInstance(event, azf_event_grid.azf_eventgrid.CloudEvent)
self.assertEqual(event.id, "A234-1234-1234")
self.assertEqual(event.source, "/mycontext/subcontext")
self.assertEqual(event.type, "com.example.type")
self.assertEqual(event.specversion, "1.0")
self.assertEqual(event.subject, "mysubject")
self.assertEqual(event.datacontenttype, "application/json")
self.assertIsNotNone(event.time)
self.assertIsNotNone(event.get_json())
self.assertEqual(event.get_json().get("key"), "value")

def test_cloudevent_decode_with_null_data(self):
event = azf_event_grid.EventGridEventInConverter.decode(
data=self._generate_single_cloudevent_datum(with_data=False),
trigger_metadata=None
)
self.assertIsInstance(event, azf_event_grid.azf_eventgrid.CloudEvent)
self.assertEqual(event.id, "A234-1234-1234")
self.assertIsNone(event.get_json())

def test_cloudevent_decode_with_extensions(self):
event = azf_event_grid.EventGridEventInConverter.decode(
data=self._generate_single_cloudevent_datum(with_extensions=True),
trigger_metadata=None
)
self.assertIsInstance(event, azf_event_grid.azf_eventgrid.CloudEvent)
self.assertEqual(event.extension_attrs.get("customext"), "extvalue")

def test_eventgrid_schema_not_broken(self):
event = azf_event_grid.EventGridEventInConverter.decode(
data=self._generate_single_eventgrid_datum(), trigger_metadata=None
)
self.assertIsInstance(event, azf_event_grid.azf_eventgrid.EventGridEvent)
self.assertEqual(event.id, "00010001-0001-0001-0001-000100010001")
self.assertEqual(event.topic, "/TestTopic/namespaces/test")
self.assertEqual(event.event_type, "captureFileCreated")

@staticmethod
def _generate_single_cloudevent_datum(with_data=True,
with_extensions=False):
payload = {
"specversion": "1.0",
"type": "com.example.type",
"source": "/mycontext/subcontext",
"id": "A234-1234-1234",
"time": "2018-04-05T17:31:00Z",
"subject": "mysubject",
"datacontenttype": "application/json",
}
if with_data:
payload["data"] = {"key": "value"}
if with_extensions:
payload["customext"] = "extvalue"
import json as _json
return func.meta.Datum(_json.dumps(payload), 'json')

@staticmethod
def _generate_single_eventgrid_str(in_bytes=False):
string_representation = '{"id": "id", ' \
Expand Down