Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Python
__pycache__/
**/__pycache__/
*.py[cod]
*.pyo
*$py.class
*.so
.Python
Expand Down
2 changes: 1 addition & 1 deletion local_prepare.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ if [ ! "$(docker ps -aq -f name=${PROJECT_NAME_SLUG}_rabbitmq)" ]; then
-p $MESSAGE_BROKER_PORT:5672 \
-e RABBITMQ_DEFAULT_USER=$MESSAGE_BROKER_USER \
-e RABBITMQ_DEFAULT_PASS=$MESSAGE_BROKER_PASSWORD \
rabbitmq:3.11.6-management || true
rabbitmq:4.1.4-management-alpine || true
fi
echo " ✅ ${PROJECT_NAME_SLUG}_rabbitmq UP"

Expand Down
2 changes: 1 addition & 1 deletion src/app/infrastructure/messaging/clients/kafka_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def is_healthy(self) -> bool:
brokers = metadata.brokers() if callable(metadata.brokers) else metadata.brokers
return len(brokers) > 0
except Exception as ex:
logger.error(f"{ex}")
logger.warning(f"{ex}")
return False
finally:
await client.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async def is_healthy(self) -> bool:
await connection_.close()
return True
except Exception as ex:
logger.error(f"{ex}")
logger.warning(f"{ex}")
return False

async def __get_connection(self) -> AbstractRobustConnection:
Expand Down
22 changes: 22 additions & 0 deletions src/app/interfaces/cli/consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,28 @@ async def queue_processing_aggregator(data: dict, handlers_by_event: Dict[str, D
asyncio.set_event_loop(e_loop)

try:

# =================================================
# WAIT FOR READINESS
# =================================================
sleep_before = 120
slept = 10
logger.info("Waiting for readiness ..")
e_loop.run_until_complete(asyncio.sleep(slept))

is_healthy = e_loop.run_until_complete(mq_client.is_healthy())
while slept < sleep_before and not is_healthy:
logger.info(f"Waiting for readiness {slept}/{sleep_before} sec..")
sleep_ = 15
e_loop.run_until_complete(asyncio.sleep(sleep_))
slept += sleep_
is_healthy = e_loop.run_until_complete(mq_client.is_healthy())
logger.info("READY.." if is_healthy else "NOT READY!")

# =================================================
# RUN CONSUMER
# =================================================

handlers_by_event_ = HANDLERS_MAP
aggregator_ = queue_processing_aggregator
e_loop.run_until_complete(
Expand Down