Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions .github/workflows/pythonapp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix :
os: [ubuntu-latest, macos-latest, windows-latest]
os: [ubuntu-latest]
python-version: [3.6, 3.7, 3.8]
steps:
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: us-east-1
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v1
Expand All @@ -38,4 +44,5 @@ jobs:
- name: Test with pytest
run: |
pip install pytest
pytest
pytest tests/unit
pytest tests/integration --ledger_suffix ${{ matrix.python-version }}-${{ matrix.os }}
3 changes: 2 additions & 1 deletion buildspec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ phases:
- pip install -e .
build:
commands:
- pytest
- pytest tests/unit/
- pytest tests/integration/ -rfs
post_build:
commands:
- python setup.py sdist bdist_wheel
Expand Down
10 changes: 10 additions & 0 deletions tests/integration/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
# the License. A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
# and limitations under the License.
26 changes: 26 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
# the License. A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
# and limitations under the License.
import pytest


def pytest_addoption(parser):
# Collect config values from cmd line or setup.cfg
parser.addoption('--region', action='store', help='')
parser.addoption(
"--ledger_suffix", action="store", default="", help=""
)


@pytest.fixture(scope='class', autouse=True)
def config_variables(request):
# Set as class attribute on the invoking test context.
request.cls.region = request.config.getoption("--region")
request.cls.ledger_suffix = request.config.getoption("--ledger_suffix").replace(".", "-")
23 changes: 23 additions & 0 deletions tests/integration/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
# the License. A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
# and limitations under the License.


TABLE_NAME = "PythonIntegrationTestTable"
CREATE_TABLE_NAME = "PythonIntegrationTestCreateTable"

INDEX_ATTRIBUTE = "Name"
COLUMN_NAME = "Name"

SINGLE_DOCUMENT_VALUE = "SingleDocumentValue"
MULTIPLE_DOCUMENT_VALUE_1 = "MultipleDocumentValue1"
MULTIPLE_DOCUMENT_VALUE_2 = "MultipleDocumentValue2"

LEDGER_NAME = "Pytest"
94 changes: 94 additions & 0 deletions tests/integration/integration_test_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
# the License. A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
# and limitations under the License.
from logging import basicConfig, getLogger, INFO
from threading import Thread
from time import sleep

import boto3
from botocore.exceptions import ClientError
from pyqldb.driver.pooled_qldb_driver import DEFAULT_TIMEOUT_SECONDS, PooledQldbDriver

logger = getLogger(__name__)
basicConfig(level=INFO)


class IntegrationTestBase:

def __init__(self, ledger_name, region):
self.ledger_name = ledger_name
self.region = region
session = boto3.Session()
self.qldb = session.client(service_name="qldb", region_name=self.region)

def force_delete_ledger(self):
try:
logger.info("Deleting ledger %s", self.ledger_name)
self.qldb.update_ledger(Name=self.ledger_name, DeletionProtection=False)
self.delete_ledger()
except ClientError as ce:
logger.warning("Encountered an error while force deleting ledger %s: %s", self.ledger_name, ce)

def delete_ledger(self):
logger.info("Deleting ledger %s", self.ledger_name)
self.qldb.update_ledger(Name=self.ledger_name, DeletionProtection=False)
self.qldb.delete_ledger(Name=self.ledger_name)
self.wait_for_deletion()

def create_ledger(self):
logger.info("Creating ledger named: {}...".format(self.ledger_name))
self.qldb.create_ledger(Name=self.ledger_name, PermissionsMode='ALLOW_ALL')
self.wait_for_active()

def wait_for_active(self):
logger.info('Waiting for ledger to become active...')
while True:
result = self.qldb.describe_ledger(Name=self.ledger_name)
if result.get('State') == "ACTIVE":
logger.info('Success. Ledger is active and ready to use.')
return result
logger.info('The ledger is still creating. Please wait...')
sleep(5)

def wait_for_deletion(self):
logger.info('Waiting for ledger to be deleted...')
while True:
try:
self.qldb.describe_ledger(Name=self.ledger_name)
sleep(5)
logger.info('The ledger is still deleting. Please wait...')
except self.qldb.exceptions.ResourceNotFoundException:
logger.info('The ledger is deleted')
return

def pooled_qldb_driver(self, ledger_name=None, pool_limit=0, time_out=DEFAULT_TIMEOUT_SECONDS, retry_limit=4):
if ledger_name is not None:
ledger_name = ledger_name
else:
ledger_name = self.ledger_name

return PooledQldbDriver(ledger_name=ledger_name, region_name=self.region, pool_limit=pool_limit,
timeout=time_out, retry_limit=retry_limit)


class ThreadThatSavesException(Thread):
"""
Extend the Python Thread library to pass in a queue for saving exceptions.
"""

def __init__(self, target, bucket, args=()):
Thread.__init__(self, target=target, args=args)
self.bucket = bucket

def run(self):
try:
Thread.run(self)
except Exception as e:
self.bucket.put(e)
5 changes: 5 additions & 0 deletions tests/integration/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Configuration for pytest
[tool:pytest]
xfail_strict = true
addopts =
--region us-east-2
114 changes: 114 additions & 0 deletions tests/integration/test_session_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
# the License. A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
# and limitations under the License.
import pytest
from queue import Queue
from unittest import TestCase

from .constants import *
from .integration_test_base import IntegrationTestBase, ThreadThatSavesException

from botocore.exceptions import ClientError
from pyqldb.errors import DriverClosedError, SessionPoolEmptyError


@pytest.mark.usefixtures("config_variables")
class TestSessionManagement(TestCase):

@classmethod
def setUpClass(cls):
cls.integration_test_base = IntegrationTestBase(LEDGER_NAME+cls.ledger_suffix, cls.region)
cls.integration_test_base.force_delete_ledger()
cls.integration_test_base.create_ledger()

@classmethod
def tearDownClass(cls):
cls.integration_test_base.delete_ledger()

def test_connect_to_non_existent_ledger(self):
with self.integration_test_base.pooled_qldb_driver("nonExistentLedger") as pooled_qldb_driver:
self.assertRaises(ClientError, pooled_qldb_driver.get_session)

def test_get_session_when_pool_does_not_have_session_and_has_not_hit_limit(self):
# Start a pooled driver with default pool limit so it doesn't have sessions in the pool
# and has not hit the limit.
with self.integration_test_base.pooled_qldb_driver() as pooled_qldb_driver:
try:
session = pooled_qldb_driver.get_session()
session.list_tables()
except ClientError as e:
self.fail(repr(e))

def test_get_session_when_pool_has_session_and_has_not_hit_limit(self):
try:
# Start a pooled driver with default pool limit so it doesn't have sessions in the pool
# and has not hit the limit.
with self.integration_test_base.pooled_qldb_driver() as pooled_qldb_driver:
# Call the first list_tables() to start session and put into pool.
session = pooled_qldb_driver.get_session()
session.list_tables()
session.close()

# Call the second list_tables() to use session from pool and is expected to execute successfully.
session = pooled_qldb_driver.get_session()
session.list_tables()
except ClientError as e:
self.fail(repr(e))

def test_get_session_when_pool_does_not_have_session_and_has_hit_limit(self):
# Saving exceptions in a queue because Python threads execute in their own stack.
bucket = Queue()

# With the time out set to 1 ms, only one thread should go through.
# The other thread will try to acquire the session, but because it can wait for only 1ms, it will error out.
with self.integration_test_base.pooled_qldb_driver(pool_limit=1, time_out=0.001) as pooled_qldb_driver:
thread_1 = ThreadThatSavesException(target=pooled_qldb_driver.get_session, bucket=bucket)
thread_2 = ThreadThatSavesException(target=pooled_qldb_driver.get_session, bucket=bucket)

thread_1.start()
thread_2.start()

thread_1.join()
thread_2.join()

self.assertEqual(1, bucket.qsize())
self.assertIsInstance(bucket.get(), SessionPoolEmptyError)

def test_get_session_when_pool_does_not_have_session_and_has_hit_limit_and_session_is_returned_to_pool(self):
# Saving exceptions in a queue because Python threads execute in their own stack.
bucket = Queue()

# Start a pooled driver with pool limit of 1 and default timeout of 30 seconds.
with self.integration_test_base.pooled_qldb_driver(pool_limit=1) as pooled_qldb_driver:
# Start two threads to get a session concurrently which will hit the session pool limit but
# will succeed because the session is returned to pool before timing out.
thread_1 = ThreadThatSavesException(target=get_session_and_return_to_pool, bucket=bucket,
args=(pooled_qldb_driver,))
thread_2 = ThreadThatSavesException(target=get_session_and_return_to_pool, bucket=bucket,
args=(pooled_qldb_driver,))

thread_1.start()
thread_2.start()

thread_1.join()
thread_2.join()

self.assertEqual(0, bucket.qsize())

def test_get_session_when_driver_is_closed(self):
pooled_qldb_driver = self.integration_test_base.pooled_qldb_driver()

pooled_qldb_driver.close()
self.assertRaises(DriverClosedError, pooled_qldb_driver.get_session)


def get_session_and_return_to_pool(pooled_qldb_driver):
session = pooled_qldb_driver.get_session()
session.close()
Loading