Skip to content
82 changes: 40 additions & 42 deletions test/test_mqtt5.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,32 @@ def on_lifecycle_disconnection(self, lifecycle_disconnect_data: mqtt5.LifecycleD

class Mqtt5ClientTest(NativeResourceTest):

def _create_tls_context(self):
cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT")
key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY")
return io.ClientTlsContext(
io.TlsContextOptions.create_client_with_mtls_from_path(cert, key))

def _create_client(
self,
client_options: mqtt5.ClientOptions = None,
callbacks: Mqtt5TestCallbacks = None):

default_host = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST")

if client_options is None:
client_options = mqtt5.ClientOptions(
host_name=default_host,
port=8883
)
port=8883,
tls_ctx=self._create_tls_context())

if (client_options.host_name == _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") and
client_options.tls_ctx is None):
client_options.tls_ctx = self._create_tls_context()

if client_options.port is None:
client_options.port = 8883

if client_options.connect_options is None:
client_options.connect_options = mqtt5.ConnectPacket()
client_options.connect_options.client_id = create_client_id()
Expand Down Expand Up @@ -198,12 +213,10 @@ def test_client_creation_maximum(self):
# ==============================================================

def _test_direct_connect_minimum(self):
input_host_name = _get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_HOST")
input_port = int(_get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_PORT"))
input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST")

client_options = mqtt5.ClientOptions(
host_name=input_host_name,
port=input_port
host_name=input_host_name
)
callbacks = Mqtt5TestCallbacks()
client = self._create_client(client_options=client_options, callbacks=callbacks)
Expand Down Expand Up @@ -321,8 +334,7 @@ def test_direct_connect_http_proxy_tls(self):
test_retry_wrapper(self._test_direct_connect_http_proxy_tls)

def _test_direct_connect_maximum(self):
input_host_name = _get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_HOST")
input_port = int(_get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_PORT"))
input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST")

user_properties = []
user_properties.append(mqtt5.UserProperty(name="name1", value="value1"))
Expand Down Expand Up @@ -355,7 +367,7 @@ def _test_direct_connect_maximum(self):
)
client_options = mqtt5.ClientOptions(
host_name=input_host_name,
port=input_port,
port=8883,
connect_options=connect_options,
session_behavior=mqtt5.ClientSessionBehaviorType.CLEAN,
extended_validation_and_flow_control_options=mqtt5.ExtendedValidationAndFlowControlOptions.AWS_IOT_CORE_DEFAULTS,
Expand Down Expand Up @@ -564,7 +576,7 @@ def test_connect_with_invalid_host_name(self):
callbacks.future_stopped.result(TIMEOUT)

def test_connect_with_invalid_port(self):
input_host_name = _get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_HOST")
input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST")
client_options = mqtt5.ClientOptions(
host_name=input_host_name,
port=444
Expand All @@ -577,7 +589,7 @@ def test_connect_with_invalid_port(self):
callbacks.future_stopped.result(TIMEOUT)

def test_connect_with_invalid_port_for_websocket_connection(self):
input_host_name = _get_env_variable("AWS_TEST_MQTT5_WS_MQTT_HOST")
input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST")
client_options = mqtt5.ClientOptions(
host_name=input_host_name,
port=1883
Expand Down Expand Up @@ -628,14 +640,12 @@ def test_connect_with_incorrect_basic_authentication_credentials(self):
# test_websocket_handshake_failure : tested in the SDK

def test_double_client_id_failure(self):
input_host_name = _get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_HOST")
input_port = int(_get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_PORT"))
input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST")
shared_client_id = create_client_id()

connect_options = mqtt5.ConnectPacket(client_id=shared_client_id)
client_options = mqtt5.ClientOptions(
host_name=input_host_name,
port=input_port,
connect_options=connect_options
)
callbacks = Mqtt5TestCallbacks()
Expand Down Expand Up @@ -893,15 +903,13 @@ def test_negative_subscribe_packet_properties(self):
# ==============================================================

def _test_negotiated_settings_minimal_settings(self):
input_host_name = _get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_HOST")
input_port = int(_get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_PORT"))
input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST")

connect_options = mqtt5.ConnectPacket(
session_expiry_interval_sec=600000
)
client_options = mqtt5.ClientOptions(
host_name=input_host_name,
port=input_port,
connect_options=connect_options
)
callbacks = Mqtt5TestCallbacks()
Expand All @@ -910,7 +918,7 @@ def _test_negotiated_settings_minimal_settings(self):
callbacks.future_connection_success.result(TIMEOUT)

self.assertIsNotNone(callbacks.negotiated_settings)
self.assertEqual(callbacks.negotiated_settings.session_expiry_interval_sec, 600000)
self.assertEqual(callbacks.negotiated_settings.session_expiry_interval_sec, 3600)

client.stop()
callbacks.future_stopped.result(TIMEOUT)
Expand All @@ -919,8 +927,7 @@ def test_negotiated_settings_minimal_settings(self):
test_retry_wrapper(self._test_negotiated_settings_minimal_settings)

def _test_negotiated_settings_maximum_settings(self):
input_host_name = _get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_HOST")
input_port = int(_get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_PORT"))
input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST")

client_id = create_client_id()
connect_options = mqtt5.ConnectPacket(
Expand All @@ -931,7 +938,6 @@ def _test_negotiated_settings_maximum_settings(self):

client_options = mqtt5.ClientOptions(
host_name=input_host_name,
port=input_port,
connect_options=connect_options
)
callbacks = Mqtt5TestCallbacks()
Expand All @@ -945,13 +951,13 @@ def _test_negotiated_settings_maximum_settings(self):
self.assertEqual(callbacks.negotiated_settings.session_expiry_interval_sec, 600)
self.assertEqual(callbacks.negotiated_settings.server_keep_alive_sec, 1000)
self.assertEqual(callbacks.negotiated_settings.maximum_qos, mqtt5.QoS.AT_LEAST_ONCE)
self.assertEqual(callbacks.negotiated_settings.receive_maximum_from_server, 10)
self.assertEqual(callbacks.negotiated_settings.maximum_packet_size_to_server, 268435460)
self.assertEqual(callbacks.negotiated_settings.topic_alias_maximum_to_server, 10)
self.assertEqual(callbacks.negotiated_settings.receive_maximum_from_server, 100)
self.assertEqual(callbacks.negotiated_settings.maximum_packet_size_to_server, 149504)
self.assertEqual(callbacks.negotiated_settings.topic_alias_maximum_to_server, 8)
self.assertEqual(callbacks.negotiated_settings.topic_alias_maximum_to_client, 0)
self.assertTrue(callbacks.negotiated_settings.retain_available)
self.assertTrue(callbacks.negotiated_settings.wildcard_subscriptions_available)
self.assertTrue(callbacks.negotiated_settings.subscription_identifiers_available)
self.assertFalse(callbacks.negotiated_settings.subscription_identifiers_available)
self.assertTrue(callbacks.negotiated_settings.shared_subscriptions_available)
self.assertFalse(callbacks.negotiated_settings.rejoined_session)
self.assertEqual(callbacks.negotiated_settings.client_id, client_id)
Expand All @@ -963,8 +969,7 @@ def test_negotiated_settings_maximum_settings(self):
test_retry_wrapper(self._test_negotiated_settings_maximum_settings)

def _test_negotiated_settings_server_limit(self):
input_host_name = _get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_HOST")
input_port = int(_get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_PORT"))
input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST")

uint32_max = 4294967295
uint16_max = 65535
Expand All @@ -978,7 +983,6 @@ def _test_negotiated_settings_server_limit(self):

client_options = mqtt5.ClientOptions(
host_name=input_host_name,
port=input_port,
connect_options=connect_options
)
callbacks = Mqtt5TestCallbacks()
Expand All @@ -989,8 +993,8 @@ def _test_negotiated_settings_server_limit(self):
self.assertIsNotNone(callbacks.negotiated_settings)
self.assertNotEqual(callbacks.negotiated_settings.receive_maximum_from_server, uint16_max)
self.assertNotEqual(callbacks.negotiated_settings.maximum_packet_size_to_server, uint32_max)
self.assertEqual(callbacks.negotiated_settings.server_keep_alive_sec, uint16_max)
self.assertEqual(callbacks.negotiated_settings.session_expiry_interval_sec, uint32_max)
self.assertEqual(callbacks.negotiated_settings.server_keep_alive_sec, 1200)
self.assertEqual(callbacks.negotiated_settings.session_expiry_interval_sec, 3600)

client.stop()
callbacks.future_stopped.result(TIMEOUT)
Expand Down Expand Up @@ -1400,12 +1404,10 @@ def test_operation_publish_correlation_data_bytes_binary_precedence(self):
# ==============================================================

def _test_operation_error_null_publish(self):
input_host_name = _get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_HOST")
input_port = int(_get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_PORT"))
input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST")

client_options = mqtt5.ClientOptions(
host_name=input_host_name,
port=input_port
host_name=input_host_name
)
callbacks = Mqtt5TestCallbacks()
client = self._create_client(client_options=client_options, callbacks=callbacks)
Expand All @@ -1422,12 +1424,10 @@ def test_operation_error_null_publish(self):
test_retry_wrapper(self._test_operation_error_null_publish)

def _test_operation_error_null_subscribe(self):
input_host_name = _get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_HOST")
input_port = int(_get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_PORT"))
input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST")

client_options = mqtt5.ClientOptions(
host_name=input_host_name,
port=input_port
host_name=input_host_name
)
callbacks = Mqtt5TestCallbacks()
client = self._create_client(client_options=client_options, callbacks=callbacks)
Expand All @@ -1444,12 +1444,10 @@ def test_operation_error_null_subscribe(self):
test_retry_wrapper(self._test_operation_error_null_subscribe)

def _test_operation_error_null_unsubscribe(self):
input_host_name = _get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_HOST")
input_port = int(_get_env_variable("AWS_TEST_MQTT5_DIRECT_MQTT_PORT"))
input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST")

client_options = mqtt5.ClientOptions(
host_name=input_host_name,
port=input_port
host_name=input_host_name
)
callbacks = Mqtt5TestCallbacks()
client = self._create_client(client_options=client_options, callbacks=callbacks)
Expand Down