diff --git a/.dockerignore b/.dockerignore index 0be3b56..f4932db 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,6 +1,7 @@ # Python -__pycache__/ +**/__pycache__/ *.py[cod] +*.pyo *$py.class *.so .Python diff --git a/local_prepare.sh b/local_prepare.sh index 909b946..aacc190 100644 --- a/local_prepare.sh +++ b/local_prepare.sh @@ -42,7 +42,7 @@ if [ ! "$(docker ps -aq -f name=${PROJECT_NAME_SLUG}_rabbitmq)" ]; then -p $MESSAGE_BROKER_PORT:5672 \ -e RABBITMQ_DEFAULT_USER=$MESSAGE_BROKER_USER \ -e RABBITMQ_DEFAULT_PASS=$MESSAGE_BROKER_PASSWORD \ - rabbitmq:3.11.6-management || true + rabbitmq:4.1.4-management-alpine || true fi echo " ✅ ${PROJECT_NAME_SLUG}_rabbitmq UP" diff --git a/src/app/infrastructure/messaging/clients/kafka_client.py b/src/app/infrastructure/messaging/clients/kafka_client.py index 44949f3..4a543d1 100644 --- a/src/app/infrastructure/messaging/clients/kafka_client.py +++ b/src/app/infrastructure/messaging/clients/kafka_client.py @@ -23,7 +23,7 @@ async def is_healthy(self) -> bool: brokers = metadata.brokers() if callable(metadata.brokers) else metadata.brokers return len(brokers) > 0 except Exception as ex: - logger.error(f"{ex}") + logger.warning(f"{ex}") return False finally: await client.close() diff --git a/src/app/infrastructure/messaging/clients/rabbitmq_client.py b/src/app/infrastructure/messaging/clients/rabbitmq_client.py index 0cf7127..6192e12 100644 --- a/src/app/infrastructure/messaging/clients/rabbitmq_client.py +++ b/src/app/infrastructure/messaging/clients/rabbitmq_client.py @@ -33,7 +33,7 @@ async def is_healthy(self) -> bool: await connection_.close() return True except Exception as ex: - logger.error(f"{ex}") + logger.warning(f"{ex}") return False async def __get_connection(self) -> AbstractRobustConnection: diff --git a/src/app/interfaces/cli/consume.py b/src/app/interfaces/cli/consume.py index 5e2f9a2..fa5f96f 100644 --- a/src/app/interfaces/cli/consume.py +++ b/src/app/interfaces/cli/consume.py @@ -45,6 +45,28 @@ async def queue_processing_aggregator(data: dict, handlers_by_event: Dict[str, D asyncio.set_event_loop(e_loop) try: + + # ================================================= + # WAIT FOR READINESS + # ================================================= + sleep_before = 120 + slept = 10 + logger.info("Waiting for readiness ..") + e_loop.run_until_complete(asyncio.sleep(slept)) + + is_healthy = e_loop.run_until_complete(mq_client.is_healthy()) + while slept < sleep_before and not is_healthy: + logger.info(f"Waiting for readiness {slept}/{sleep_before} sec..") + sleep_ = 15 + e_loop.run_until_complete(asyncio.sleep(sleep_)) + slept += sleep_ + is_healthy = e_loop.run_until_complete(mq_client.is_healthy()) + logger.info("READY.." if is_healthy else "NOT READY!") + + # ================================================= + # RUN CONSUMER + # ================================================= + handlers_by_event_ = HANDLERS_MAP aggregator_ = queue_processing_aggregator e_loop.run_until_complete(