From abc47207ee20901809081531d5a038018d10ff44 Mon Sep 17 00:00:00 2001 From: Saurabh Mehta Date: Wed, 27 May 2020 23:19:50 +0000 Subject: [PATCH] Add integration test suite --- .github/workflows/pythonapp.yml | 11 +- buildspec.yml | 3 +- tests/integration/__init__.py | 10 + tests/integration/conftest.py | 26 + tests/integration/constants.py | 23 + tests/integration/integration_test_base.py | 94 +++ tests/integration/setup.cfg | 5 + tests/integration/test_session_management.py | 114 +++ tests/integration/test_statement_execution.py | 652 ++++++++++++++++++ 9 files changed, 935 insertions(+), 3 deletions(-) create mode 100644 tests/integration/__init__.py create mode 100644 tests/integration/conftest.py create mode 100644 tests/integration/constants.py create mode 100644 tests/integration/integration_test_base.py create mode 100644 tests/integration/setup.cfg create mode 100644 tests/integration/test_session_management.py create mode 100644 tests/integration/test_statement_execution.py diff --git a/.github/workflows/pythonapp.yml b/.github/workflows/pythonapp.yml index 0adcb14..5fb0ced 100644 --- a/.github/workflows/pythonapp.yml +++ b/.github/workflows/pythonapp.yml @@ -15,9 +15,15 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix : - os: [ubuntu-latest, macos-latest, windows-latest] + os: [ubuntu-latest] python-version: [3.6, 3.7, 3.8] steps: + - name: Configure AWS Credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: us-east-1 - uses: actions/checkout@v2 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v1 @@ -38,4 +44,5 @@ jobs: - name: Test with pytest run: | pip install pytest - pytest + pytest tests/unit + pytest tests/integration --ledger_suffix ${{ matrix.python-version }}-${{ matrix.os }} diff --git a/buildspec.yml b/buildspec.yml index 1447974..628e390 100644 --- a/buildspec.yml +++ b/buildspec.yml @@ -20,7 +20,8 @@ phases: - pip install -e . build: commands: - - pytest + - pytest tests/unit/ + - pytest tests/integration/ -rfs post_build: commands: - python setup.py sdist bdist_wheel diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..62e7899 --- /dev/null +++ b/tests/integration/__init__.py @@ -0,0 +1,10 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with +# the License. A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions +# and limitations under the License. diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000..20dd098 --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,26 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with +# the License. A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions +# and limitations under the License. +import pytest + + +def pytest_addoption(parser): + # Collect config values from cmd line or setup.cfg + parser.addoption('--region', action='store', help='') + parser.addoption( + "--ledger_suffix", action="store", default="", help="" + ) + + +@pytest.fixture(scope='class', autouse=True) +def config_variables(request): + # Set as class attribute on the invoking test context. + request.cls.region = request.config.getoption("--region") + request.cls.ledger_suffix = request.config.getoption("--ledger_suffix").replace(".", "-") \ No newline at end of file diff --git a/tests/integration/constants.py b/tests/integration/constants.py new file mode 100644 index 0000000..07b6f33 --- /dev/null +++ b/tests/integration/constants.py @@ -0,0 +1,23 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with +# the License. A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions +# and limitations under the License. + + +TABLE_NAME = "PythonIntegrationTestTable" +CREATE_TABLE_NAME = "PythonIntegrationTestCreateTable" + +INDEX_ATTRIBUTE = "Name" +COLUMN_NAME = "Name" + +SINGLE_DOCUMENT_VALUE = "SingleDocumentValue" +MULTIPLE_DOCUMENT_VALUE_1 = "MultipleDocumentValue1" +MULTIPLE_DOCUMENT_VALUE_2 = "MultipleDocumentValue2" + +LEDGER_NAME = "Pytest" diff --git a/tests/integration/integration_test_base.py b/tests/integration/integration_test_base.py new file mode 100644 index 0000000..c5c5bce --- /dev/null +++ b/tests/integration/integration_test_base.py @@ -0,0 +1,94 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with +# the License. A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions +# and limitations under the License. +from logging import basicConfig, getLogger, INFO +from threading import Thread +from time import sleep + +import boto3 +from botocore.exceptions import ClientError +from pyqldb.driver.pooled_qldb_driver import DEFAULT_TIMEOUT_SECONDS, PooledQldbDriver + +logger = getLogger(__name__) +basicConfig(level=INFO) + + +class IntegrationTestBase: + + def __init__(self, ledger_name, region): + self.ledger_name = ledger_name + self.region = region + session = boto3.Session() + self.qldb = session.client(service_name="qldb", region_name=self.region) + + def force_delete_ledger(self): + try: + logger.info("Deleting ledger %s", self.ledger_name) + self.qldb.update_ledger(Name=self.ledger_name, DeletionProtection=False) + self.delete_ledger() + except ClientError as ce: + logger.warning("Encountered an error while force deleting ledger %s: %s", self.ledger_name, ce) + + def delete_ledger(self): + logger.info("Deleting ledger %s", self.ledger_name) + self.qldb.update_ledger(Name=self.ledger_name, DeletionProtection=False) + self.qldb.delete_ledger(Name=self.ledger_name) + self.wait_for_deletion() + + def create_ledger(self): + logger.info("Creating ledger named: {}...".format(self.ledger_name)) + self.qldb.create_ledger(Name=self.ledger_name, PermissionsMode='ALLOW_ALL') + self.wait_for_active() + + def wait_for_active(self): + logger.info('Waiting for ledger to become active...') + while True: + result = self.qldb.describe_ledger(Name=self.ledger_name) + if result.get('State') == "ACTIVE": + logger.info('Success. Ledger is active and ready to use.') + return result + logger.info('The ledger is still creating. Please wait...') + sleep(5) + + def wait_for_deletion(self): + logger.info('Waiting for ledger to be deleted...') + while True: + try: + self.qldb.describe_ledger(Name=self.ledger_name) + sleep(5) + logger.info('The ledger is still deleting. Please wait...') + except self.qldb.exceptions.ResourceNotFoundException: + logger.info('The ledger is deleted') + return + + def pooled_qldb_driver(self, ledger_name=None, pool_limit=0, time_out=DEFAULT_TIMEOUT_SECONDS, retry_limit=4): + if ledger_name is not None: + ledger_name = ledger_name + else: + ledger_name = self.ledger_name + + return PooledQldbDriver(ledger_name=ledger_name, region_name=self.region, pool_limit=pool_limit, + timeout=time_out, retry_limit=retry_limit) + + +class ThreadThatSavesException(Thread): + """ + Extend the Python Thread library to pass in a queue for saving exceptions. + """ + + def __init__(self, target, bucket, args=()): + Thread.__init__(self, target=target, args=args) + self.bucket = bucket + + def run(self): + try: + Thread.run(self) + except Exception as e: + self.bucket.put(e) diff --git a/tests/integration/setup.cfg b/tests/integration/setup.cfg new file mode 100644 index 0000000..f38d489 --- /dev/null +++ b/tests/integration/setup.cfg @@ -0,0 +1,5 @@ +# Configuration for pytest +[tool:pytest] +xfail_strict = true +addopts = + --region us-east-2 diff --git a/tests/integration/test_session_management.py b/tests/integration/test_session_management.py new file mode 100644 index 0000000..705de25 --- /dev/null +++ b/tests/integration/test_session_management.py @@ -0,0 +1,114 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with +# the License. A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions +# and limitations under the License. +import pytest +from queue import Queue +from unittest import TestCase + +from .constants import * +from .integration_test_base import IntegrationTestBase, ThreadThatSavesException + +from botocore.exceptions import ClientError +from pyqldb.errors import DriverClosedError, SessionPoolEmptyError + + +@pytest.mark.usefixtures("config_variables") +class TestSessionManagement(TestCase): + + @classmethod + def setUpClass(cls): + cls.integration_test_base = IntegrationTestBase(LEDGER_NAME+cls.ledger_suffix, cls.region) + cls.integration_test_base.force_delete_ledger() + cls.integration_test_base.create_ledger() + + @classmethod + def tearDownClass(cls): + cls.integration_test_base.delete_ledger() + + def test_connect_to_non_existent_ledger(self): + with self.integration_test_base.pooled_qldb_driver("nonExistentLedger") as pooled_qldb_driver: + self.assertRaises(ClientError, pooled_qldb_driver.get_session) + + def test_get_session_when_pool_does_not_have_session_and_has_not_hit_limit(self): + # Start a pooled driver with default pool limit so it doesn't have sessions in the pool + # and has not hit the limit. + with self.integration_test_base.pooled_qldb_driver() as pooled_qldb_driver: + try: + session = pooled_qldb_driver.get_session() + session.list_tables() + except ClientError as e: + self.fail(repr(e)) + + def test_get_session_when_pool_has_session_and_has_not_hit_limit(self): + try: + # Start a pooled driver with default pool limit so it doesn't have sessions in the pool + # and has not hit the limit. + with self.integration_test_base.pooled_qldb_driver() as pooled_qldb_driver: + # Call the first list_tables() to start session and put into pool. + session = pooled_qldb_driver.get_session() + session.list_tables() + session.close() + + # Call the second list_tables() to use session from pool and is expected to execute successfully. + session = pooled_qldb_driver.get_session() + session.list_tables() + except ClientError as e: + self.fail(repr(e)) + + def test_get_session_when_pool_does_not_have_session_and_has_hit_limit(self): + # Saving exceptions in a queue because Python threads execute in their own stack. + bucket = Queue() + + # With the time out set to 1 ms, only one thread should go through. + # The other thread will try to acquire the session, but because it can wait for only 1ms, it will error out. + with self.integration_test_base.pooled_qldb_driver(pool_limit=1, time_out=0.001) as pooled_qldb_driver: + thread_1 = ThreadThatSavesException(target=pooled_qldb_driver.get_session, bucket=bucket) + thread_2 = ThreadThatSavesException(target=pooled_qldb_driver.get_session, bucket=bucket) + + thread_1.start() + thread_2.start() + + thread_1.join() + thread_2.join() + + self.assertEqual(1, bucket.qsize()) + self.assertIsInstance(bucket.get(), SessionPoolEmptyError) + + def test_get_session_when_pool_does_not_have_session_and_has_hit_limit_and_session_is_returned_to_pool(self): + # Saving exceptions in a queue because Python threads execute in their own stack. + bucket = Queue() + + # Start a pooled driver with pool limit of 1 and default timeout of 30 seconds. + with self.integration_test_base.pooled_qldb_driver(pool_limit=1) as pooled_qldb_driver: + # Start two threads to get a session concurrently which will hit the session pool limit but + # will succeed because the session is returned to pool before timing out. + thread_1 = ThreadThatSavesException(target=get_session_and_return_to_pool, bucket=bucket, + args=(pooled_qldb_driver,)) + thread_2 = ThreadThatSavesException(target=get_session_and_return_to_pool, bucket=bucket, + args=(pooled_qldb_driver,)) + + thread_1.start() + thread_2.start() + + thread_1.join() + thread_2.join() + + self.assertEqual(0, bucket.qsize()) + + def test_get_session_when_driver_is_closed(self): + pooled_qldb_driver = self.integration_test_base.pooled_qldb_driver() + + pooled_qldb_driver.close() + self.assertRaises(DriverClosedError, pooled_qldb_driver.get_session) + + +def get_session_and_return_to_pool(pooled_qldb_driver): + session = pooled_qldb_driver.get_session() + session.close() diff --git a/tests/integration/test_statement_execution.py b/tests/integration/test_statement_execution.py new file mode 100644 index 0000000..c085fe8 --- /dev/null +++ b/tests/integration/test_statement_execution.py @@ -0,0 +1,652 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with +# the License. A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions +# and limitations under the License. +import pytest +from unittest import TestCase + +from amazon.ion.simple_types import IonPyNull +from amazon.ion.simpleion import dumps, loads +from botocore.exceptions import ClientError +from pyqldb.errors import is_occ_conflict_exception + +from .constants import CREATE_TABLE_NAME, COLUMN_NAME, MULTIPLE_DOCUMENT_VALUE_1, MULTIPLE_DOCUMENT_VALUE_2, \ + INDEX_ATTRIBUTE, LEDGER_NAME, SINGLE_DOCUMENT_VALUE, TABLE_NAME +from .integration_test_base import IntegrationTestBase + + +@pytest.mark.usefixtures("config_variables") +class TestStatementExecution(TestCase): + + @classmethod + def setUpClass(cls): + cls.integration_test_base = IntegrationTestBase(LEDGER_NAME+cls.ledger_suffix, cls.region) + cls.integration_test_base.force_delete_ledger() + cls.integration_test_base.create_ledger() + + cls.pooled_qldb_driver = cls.integration_test_base.pooled_qldb_driver() + + # Create table. + cls.pooled_qldb_driver.execute_statement("CREATE TABLE {}".format(TABLE_NAME)) + session = cls.pooled_qldb_driver.get_session() + session.list_tables() + + @classmethod + def tearDownClass(cls): + cls.pooled_qldb_driver.close() + cls.integration_test_base.delete_ledger() + + def tearDown(self): + # Delete all documents in table. + self.pooled_qldb_driver.execute_statement("DELETE FROM {}".format(TABLE_NAME)) + + def test_drop_an_existing_table(self): + # Given. + query = "CREATE TABLE {}".format(CREATE_TABLE_NAME) + + def execute_statement_and_return_count(txn, query): + cursor = txn.execute_statement(query) + count = 0 + for row in cursor: + count += 1 + return count + + create_table_count = self.pooled_qldb_driver.execute_lambda(lambda txn: + execute_statement_and_return_count(txn, query)) + self.assertEqual(1, create_table_count) + + session = self.pooled_qldb_driver.get_session() + table_cursor = session.list_tables() + tables = list() + for row in table_cursor: + tables.append(row) + + self.assertTrue(CREATE_TABLE_NAME in tables) + + query = "DROP TABLE {}".format(CREATE_TABLE_NAME) + + def execute_statement_and_return_count(txn, query): + cursor = txn.execute_statement(query) + count = 0 + for row in cursor: + count += 1 + return count + + # When. + drop_table_count = self.pooled_qldb_driver.execute_lambda(lambda txn: + execute_statement_and_return_count(txn, query)) + # Then. + self.assertEqual(1, drop_table_count) + + def test_list_tables(self): + # When. + session = self.pooled_qldb_driver.get_session() + cursor = session.list_tables() + + # Then. + tables = list() + for row in cursor: + tables.append(row) + + self.assertTrue(TABLE_NAME in tables) + self.assertTrue(len(tables) == 1) + + def test_create_table_that_already_exists(self): + # Given. + query = "CREATE TABLE {}".format(TABLE_NAME) + + # Then. + self.assertRaises(ClientError, self.pooled_qldb_driver.execute_statement, query) + + def test_create_index(self): + # Given. + query = "CREATE INDEX on {} ({})".format(TABLE_NAME, INDEX_ATTRIBUTE) + + def execute_statement_and_return_count(txn, query): + cursor = txn.execute_statement(query) + count = 0 + for row in cursor: + count += 1 + return count + + # When + count = self.pooled_qldb_driver.execute_lambda(lambda txn: execute_statement_and_return_count(txn, query)) + self.assertEqual(1, count) + + # Then + search_query = "SELECT VALUE indexes[0] FROM information_schema.user_tables WHERE status = 'ACTIVE' " \ + "AND name = '{}'".format(TABLE_NAME) + + def execute_statement_and_return_index_value(txn, query): + cursor = txn.execute_statement(query) + # Extract the index name by quering the information_schema. + # This gives: + # { + # expr: "[MyColumn]" + # } + return next(cursor)['expr'] + + value = self.pooled_qldb_driver.execute_lambda(lambda txn: + execute_statement_and_return_index_value(txn, search_query)) + self.assertEqual("[" + INDEX_ATTRIBUTE + "]", value) + + def test_returns_empty_when_no_records_are_found(self): + # Given. + query = "SELECT * FROM {}".format(TABLE_NAME) + + def execute_statement_and_return_count(txn, query): + cursor = txn.execute_statement(query) + count = 0 + for row in cursor: + count += 1 + return count + + # When. + count = self.pooled_qldb_driver.execute_lambda(lambda txn: execute_statement_and_return_count(txn, query)) + + # Then. + self.assertEqual(0, count) + + def test_insert_document(self): + # Given. + # Create Ion struct to insert. + ion_value = loads(dumps({COLUMN_NAME: SINGLE_DOCUMENT_VALUE})) + + query = "INSERT INTO {} ?".format(TABLE_NAME) + + def execute_statement_with_parameter_and_return_count(txn, query, parameter): + cursor = txn.execute_statement(query, parameter) + count = 0 + for row in cursor: + count += 1 + return count + + # When. + count = self.pooled_qldb_driver.execute_lambda(lambda txn: + execute_statement_with_parameter_and_return_count(txn, query, + ion_value)) + self.assertEqual(1, count) + + # Then. + search_query = "SELECT VALUE {} FROM {} WHERE {} = ?".format(COLUMN_NAME, TABLE_NAME, COLUMN_NAME) + ion_string = loads(dumps(SINGLE_DOCUMENT_VALUE)) + + def execute_statement_and_return_value(txn, query, *parameters): + cursor = txn.execute_statement(query, *parameters) + return next(cursor) + + value = self.pooled_qldb_driver.execute_lambda( + lambda txn: execute_statement_and_return_value(txn, search_query, ion_string)) + self.assertEqual(SINGLE_DOCUMENT_VALUE, value) + + def test_read_single_field(self): + # Given. + # Create Ion struct to insert. + ion_value = loads(dumps({COLUMN_NAME: SINGLE_DOCUMENT_VALUE})) + + query = "INSERT INTO {} ?".format(TABLE_NAME) + + def execute_statement_with_parameter_and_return_count(txn, query, parameter): + cursor = txn.execute_statement(query, parameter) + count = 0 + for row in cursor: + count += 1 + return count + + count = self.pooled_qldb_driver.execute_lambda( + lambda txn: execute_statement_with_parameter_and_return_count(txn, query, ion_value)) + self.assertEqual(1, count) + + search_query = "SELECT VALUE {} FROM {} WHERE {} = ?".format(COLUMN_NAME, TABLE_NAME, COLUMN_NAME) + ion_string = loads(dumps(SINGLE_DOCUMENT_VALUE)) + + def execute_statement_and_return_value(txn, query, *parameters): + cursor = txn.execute_statement(query, *parameters) + return next(cursor) + + # When. + value = self.pooled_qldb_driver.execute_lambda( + lambda txn: execute_statement_and_return_value(txn, search_query, ion_string)) + + # Then. + self.assertEqual(SINGLE_DOCUMENT_VALUE, value) + + def test_query_table_enclosed_in_quotes(self): + # Given. + # Create Ion struct to insert. + ion_value = loads(dumps({COLUMN_NAME: SINGLE_DOCUMENT_VALUE})) + + query = "INSERT INTO {} ?".format(TABLE_NAME) + + def execute_statement_with_parameter_and_return_count(txn, query, parameter): + cursor = txn.execute_statement(query, parameter) + count = 0 + for row in cursor: + count += 1 + return count + + count = self.pooled_qldb_driver.execute_lambda( + lambda txn: execute_statement_with_parameter_and_return_count(txn, query, ion_value)) + self.assertEqual(1, count) + + search_query = "SELECT VALUE {} FROM \"{}\" WHERE {} = ?".format(COLUMN_NAME, TABLE_NAME, COLUMN_NAME) + ion_string = loads(dumps(SINGLE_DOCUMENT_VALUE)) + + def execute_statement_and_return_value(txn, query, *parameters): + cursor = txn.execute_statement(query, *parameters) + return next(cursor) + + # When. + value = self.pooled_qldb_driver.execute_lambda( + lambda txn: execute_statement_and_return_value(txn, search_query, ion_string)) + + # Then. + self.assertEqual(SINGLE_DOCUMENT_VALUE, value) + + def test_insert_multiple_documents(self): + # Given. + # Create Ion structs to insert. + parameter_1 = loads(dumps({COLUMN_NAME: MULTIPLE_DOCUMENT_VALUE_1})) + parameter_2 = loads(dumps({COLUMN_NAME: MULTIPLE_DOCUMENT_VALUE_2})) + + query = "INSERT INTO {} <>".format(TABLE_NAME) + + def execute_statement_with_parameters_and_return_count(txn, query, parameter_1, parameter_2): + cursor = txn.execute_statement(query, parameter_1, parameter_2) + count = 0 + for row in cursor: + count += 1 + return count + + # When. + count = self.pooled_qldb_driver.execute_lambda( + lambda txn: execute_statement_with_parameters_and_return_count(txn, query, parameter_1, parameter_2)) + self.assertEqual(2, count) + + # Then. + search_query = "SELECT VALUE {} FROM {} WHERE {} IN (?,?)".format(COLUMN_NAME, TABLE_NAME, COLUMN_NAME) + + ion_string_1 = loads(dumps(MULTIPLE_DOCUMENT_VALUE_1)) + ion_string_2 = loads(dumps(MULTIPLE_DOCUMENT_VALUE_2)) + + def execute_statement_with_parameters_and_return_list_of_values(txn, query, *parameters): + cursor = txn.execute_statement(query, *parameters) + values = list() + for row in cursor: + values.append(row) + return values + + values = self.pooled_qldb_driver.execute_lambda( + lambda txn: execute_statement_with_parameters_and_return_list_of_values(txn, search_query, ion_string_1, + ion_string_2)) + + self.assertTrue(MULTIPLE_DOCUMENT_VALUE_1 in values) + self.assertTrue(MULTIPLE_DOCUMENT_VALUE_2 in values) + + def test_delete_single_document(self): + # Given. + # Create Ion struct to insert. + ion_value = loads(dumps({COLUMN_NAME: SINGLE_DOCUMENT_VALUE})) + + query = "INSERT INTO {} ?".format(TABLE_NAME) + + def execute_statement_with_parameter_and_return_count(txn, query, parameter): + cursor = txn.execute_statement(query, parameter) + count = 0 + for row in cursor: + count += 1 + return count + + count = self.pooled_qldb_driver.execute_lambda( + lambda txn: execute_statement_with_parameter_and_return_count(txn, query, ion_value)) + self.assertEqual(1, count) + + # When. + delete_query = "DELETE FROM {} WHERE {} = ?".format(TABLE_NAME, COLUMN_NAME) + ion_string = loads(dumps(SINGLE_DOCUMENT_VALUE)) + + def execute_statement_and_return_count(txn, query, *parameters): + cursor = txn.execute_statement(query, *parameters) + count = 0 + for row in cursor: + count += 1 + return count + + count = self.pooled_qldb_driver.execute_lambda( + lambda txn: execute_statement_and_return_count(txn, delete_query, ion_string)) + self.assertEqual(1, count) + + # Then + def execute_count_statement_and_return_count(txn): + search_query = "SELECT COUNT(*) FROM {}".format(TABLE_NAME) + # This gives: + # { + # _1: 1 + # } + cursor = txn.execute_statement(search_query) + return next(cursor)['_1'] + + count = self.pooled_qldb_driver.execute_lambda( + lambda txn: execute_count_statement_and_return_count(txn)) + self.assertEqual(0, count) + + def test_delete_all_documents(self): + # Given. + # Create Ion structs to insert. + parameter_1 = loads(dumps({COLUMN_NAME: MULTIPLE_DOCUMENT_VALUE_1})) + parameter_2 = loads(dumps({COLUMN_NAME: MULTIPLE_DOCUMENT_VALUE_2})) + + query = "INSERT INTO {} <>".format(TABLE_NAME) + + def execute_statement_with_parameters_and_return_count(txn, query, parameter_1, parameter_2): + cursor = txn.execute_statement(query, parameter_1, parameter_2) + count = 0 + for row in cursor: + count += 1 + return count + + count = self.pooled_qldb_driver.execute_lambda( + lambda txn: execute_statement_with_parameters_and_return_count(txn, query, parameter_1, parameter_2)) + self.assertEqual(2, count) + + # When. + delete_query = "DELETE FROM {}".format(TABLE_NAME, COLUMN_NAME) + + def execute_statement_and_return_count(txn, query): + cursor = txn.execute_statement(query) + count = 0 + for row in cursor: + count += 1 + return count + + count = self.pooled_qldb_driver.execute_lambda( + lambda txn: execute_statement_and_return_count(txn, delete_query)) + self.assertEqual(2, count) + + # Then. + def execute_count_statement_and_return_count(txn): + search_query = "SELECT COUNT(*) FROM {}".format(TABLE_NAME) + # This gives: + # { + # _1: 1 + # } + cursor = txn.execute_statement(search_query) + return next(cursor)['_1'] + + count = self.pooled_qldb_driver.execute_lambda( + lambda txn: execute_count_statement_and_return_count(txn)) + self.assertEqual(0, count) + + def test_occ_exception_is_thrown(self): + # Create driver with zero retry limit to trigger OCC exception. + driver = self.integration_test_base.pooled_qldb_driver(retry_limit=0) + + # Insert document. + ion_value = loads(dumps({COLUMN_NAME: 0})) + + def execute_statement_with_parameter_and_return_count(txn, query, parameter): + cursor = txn.execute_statement(query, parameter) + count = 0 + for row in cursor: + count += 1 + return count + + query = "INSERT INTO {} ?".format(TABLE_NAME) + count = driver.execute_lambda(lambda txn: execute_statement_with_parameter_and_return_count(txn, query, + ion_value)) + self.assertEqual(1, count) + + # For testing purposes only. Forcefully causes an OCC conflict to occur. + # Do not invoke driver.execute_lambda within the parameter function under normal circumstances. + def query_and_update_record(transaction_executor): + # Query document. + transaction_executor.execute_statement("SELECT VALUE {} FROM {}".format(COLUMN_NAME, TABLE_NAME)) + + # The following update document will be committed before query document thus resulting in OCC. + driver.execute_lambda(lambda transaction_executor: + transaction_executor.execute_statement( + "UPDATE {} SET {} = ?".format(TABLE_NAME, COLUMN_NAME), 5)) + + try: + driver.execute_lambda(lambda transaction_executor: query_and_update_record(transaction_executor)) + self.fail("Did not throw OCC exception.") + except ClientError as ce: + self.assertTrue(is_occ_conflict_exception(ce)) + + def test_insert_and_read_ion_types(self): + # Use subTest context manager to setup parameterized tests. + for ion_value in create_ion_values(): + with self.subTest(ion_value=ion_value): + # Given. + # Create Ion struct to insert. + ion_struct = loads(dumps({COLUMN_NAME: ion_value})) + query = "INSERT INTO {} ?".format(TABLE_NAME) + + def execute_statement_and_return_count(txn, query, *parameter): + cursor = txn.execute_statement(query, *parameter) + count = 0 + for row in cursor: + count += 1 + return count + + # When. + count = self.pooled_qldb_driver.execute_lambda(lambda txn: + execute_statement_and_return_count(txn, query, + ion_struct)) + self.assertEqual(1, count) + + def execute_statement_and_return_value(txn, query, *parameters): + cursor = txn.execute_statement(query, *parameters) + return next(cursor) + + # Then. + if isinstance(ion_value, IonPyNull): + search_query = "SELECT VALUE {} FROM {} WHERE {} IS NULL".format(COLUMN_NAME, TABLE_NAME, + COLUMN_NAME) + value = self.pooled_qldb_driver.execute_lambda(lambda txn: + execute_statement_and_return_value(txn, + search_query)) + + else: + search_query = "SELECT VALUE {} FROM {} WHERE {} = ?".format(COLUMN_NAME, TABLE_NAME, COLUMN_NAME) + value = self.pooled_qldb_driver.execute_lambda(lambda txn: + execute_statement_and_return_value(txn, search_query, + ion_value)) + + self.assertEqual(ion_value.ion_type, value.ion_type) + + # Delete documents in table for testing next Ion value. + self.pooled_qldb_driver.execute_statement("DELETE FROM {}".format(TABLE_NAME, COLUMN_NAME)) + + def test_update_ion_types(self): + # Given. + # Create Ion struct to insert. + ion_value = loads(dumps({COLUMN_NAME: SINGLE_DOCUMENT_VALUE})) + + def execute_statement_and_return_count(txn, query, parameter): + cursor = txn.execute_statement(query, parameter) + count = 0 + for row in cursor: + count += 1 + return count + + # Insert first record that will be subsequently updated. + query = "INSERT INTO {} ?".format(TABLE_NAME) + count = self.pooled_qldb_driver.execute_lambda(lambda txn: + execute_statement_and_return_count(txn, query, ion_value)) + self.assertEqual(1, count) + + # Use subTest context manager to setup parameterized tests. + for ion_value in create_ion_values(): + with self.subTest(ion_value=ion_value): + # When. + query = "UPDATE {} SET {} = ?".format(TABLE_NAME, COLUMN_NAME) + count = self.pooled_qldb_driver.execute_lambda(lambda txn: + execute_statement_and_return_count(txn, query, + ion_value)) + self.assertEqual(1, count) + + def execute_statement_and_return_value(txn, query, *parameters): + cursor = txn.execute_statement(query, *parameters) + return next(cursor) + + # Then. + if isinstance(ion_value, IonPyNull): + search_query = "SELECT VALUE {} FROM {} WHERE {} IS NULL".format(COLUMN_NAME, TABLE_NAME, + COLUMN_NAME) + value = self.pooled_qldb_driver.execute_lambda(lambda txn: + execute_statement_and_return_value(txn, + search_query)) + + else: + search_query = "SELECT VALUE {} FROM {} WHERE {} = ?".format(COLUMN_NAME, TABLE_NAME, COLUMN_NAME) + value = self.pooled_qldb_driver.execute_lambda(lambda txn: + execute_statement_and_return_value(txn, search_query, + ion_value)) + + self.assertEqual(ion_value.ion_type, value.ion_type) + + def test_execute_lambda_that_does_not_return_value(self): + # Given. + # Insert Ion struct to insert. + ion_struct = loads(dumps({COLUMN_NAME: SINGLE_DOCUMENT_VALUE})) + + # When. + query = "INSERT INTO {} ?".format(TABLE_NAME) + self.pooled_qldb_driver.execute_lambda(lambda txn: txn.execute_statement(query, ion_struct)) + + # Then. + search_query = "SELECT VALUE {} FROM {} WHERE {} = ?".format(COLUMN_NAME, TABLE_NAME, COLUMN_NAME) + ion_string = loads(dumps(SINGLE_DOCUMENT_VALUE)) + + def execute_statement_and_return_value(txn, query, *parameters): + cursor = txn.execute_statement(query, *parameters) + return next(cursor) + + value = self.pooled_qldb_driver.execute_lambda(lambda txn: execute_statement_and_return_value(txn, + search_query, + ion_string)) + self.assertEqual(SINGLE_DOCUMENT_VALUE, value) + + def test_delete_table_that_does_not_exist(self): + # Given. + query = "DELETE FROM NonExistentTable" + + # When. + self.assertRaises(ClientError, self.pooled_qldb_driver.execute_statement, query) + + +def create_ion_values(): + ion_values = list() + + ion_clob = loads('{{"This is a CLOB of text."}}') + ion_values.append(ion_clob) + ion_blob = loads('{{aGVsbG8=}}') + ion_values.append(ion_blob) + ion_bool = loads('true') + ion_values.append(ion_bool) + ion_decimal = loads('0.1') + ion_values.append(ion_decimal) + ion_float = loads('0.2e0') + ion_values.append(ion_float) + ion_int = loads('1') + ion_values.append(ion_int) + ion_list = loads('[1,2]') + ion_values.append(ion_list) + ion_null = loads('null') + ion_values.append(ion_null) + ion_sexp = loads('(cons 1 2)') + ion_values.append(ion_sexp) + ion_string = loads('\"string\"') + ion_values.append(ion_string) + ion_struct = loads('{a:1}') + ion_values.append(ion_struct) + ion_symbol = loads('abc') + ion_values.append(ion_symbol) + ion_timestamp = loads('2016-12-20T05:23:43.000000-00:00') + ion_values.append(ion_timestamp) + + ion_null_clob = loads('null.clob') + ion_values.append(ion_null_clob) + ion_null_blob = loads('null.blob') + ion_values.append(ion_null_blob) + ion_null_bool = loads('null.bool') + ion_values.append(ion_null_bool) + ion_null_decimal = loads('null.decimal') + ion_values.append(ion_null_decimal) + ion_null_float = loads('null.float') + ion_values.append(ion_null_float) + ion_null_int = loads('null.int') + ion_values.append(ion_null_int) + ion_null_list = loads('null.list') + ion_values.append(ion_null_list) + ion_null_sexp = loads('null.sexp') + ion_values.append(ion_null_sexp) + ion_null_string = loads('null.string') + ion_values.append(ion_null_string) + ion_null_struct = loads('null.struct') + ion_values.append(ion_null_struct) + ion_null_symbol = loads('null.symbol') + ion_values.append(ion_null_symbol) + ion_null_timestamp = loads('null.timestamp') + ion_values.append(ion_null_timestamp) + + ion_clob_with_annotation = loads('annotation::{{"This is a CLOB of text."}}') + ion_values.append(ion_clob_with_annotation) + ion_blob_with_annotation = loads('annotation::{{aGVsbG8=}}') + ion_values.append(ion_blob_with_annotation) + ion_bool_with_annotation = loads('annotation::true') + ion_values.append(ion_bool_with_annotation) + ion_decimal_with_annotation = loads('annotation::0.1') + ion_values.append(ion_decimal_with_annotation) + ion_float_with_annotation = loads('annotation::0.2e0') + ion_values.append(ion_float_with_annotation) + ion_int_with_annotation = loads('annotation::1') + ion_values.append(ion_int_with_annotation) + ion_list_with_annotation = loads('annotation::[1,2]') + ion_values.append(ion_list_with_annotation) + ion_null_with_annotation = loads('annotation::null') + ion_values.append(ion_null_with_annotation) + ion_sexp_with_annotation = loads('annotation::(cons 1 2)') + ion_values.append(ion_sexp_with_annotation) + ion_string_with_annotation = loads('annotation::\"string\"') + ion_values.append(ion_string_with_annotation) + ion_struct_with_annotation = loads('annotation::{a:1}') + ion_values.append(ion_struct_with_annotation) + ion_symbol_with_annotation = loads('annotation::abc') + ion_values.append(ion_symbol_with_annotation) + ion_timestamp_with_annotation = loads('annotation::2016-12-20T05:23:43.000000-00:00') + ion_values.append(ion_timestamp_with_annotation) + + ion_null_clob_with_annotation = loads('annotation::null.clob') + ion_values.append(ion_null_clob_with_annotation) + ion_null_blob_with_annotation = loads('annotation::null.blob') + ion_values.append(ion_null_blob_with_annotation) + ion_null_bool_with_annotation = loads('annotation::null.bool') + ion_values.append(ion_null_bool_with_annotation) + ion_null_decimal_with_annotation = loads('annotation::null.decimal') + ion_values.append(ion_null_decimal_with_annotation) + ion_null_float_with_annotation = loads('annotation::null.float') + ion_values.append(ion_null_float_with_annotation) + ion_null_int_with_annotation = loads('annotation::null.int') + ion_values.append(ion_null_int_with_annotation) + ion_null_list_with_annotation = loads('annotation::null.list') + ion_values.append(ion_null_list_with_annotation) + ion_null_sexp_with_annotation = loads('annotation::null.sexp') + ion_values.append(ion_null_sexp_with_annotation) + ion_null_string_with_annotation = loads('annotation::null.string') + ion_values.append(ion_null_string_with_annotation) + ion_null_struct_with_annotation = loads('annotation::null.struct') + ion_values.append(ion_null_struct_with_annotation) + ion_null_symbol_with_annotation = loads('annotation::null.symbol') + ion_values.append(ion_null_symbol_with_annotation) + ion_null_timestamp_with_annotation = loads('annotation::null.timestamp') + ion_values.append(ion_null_timestamp_with_annotation) + + return ion_values