diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f3cf373 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +**/kafka/data/ +*.pyc diff --git a/API-breaking-change/readme.md b/API-breaking-change/readme.md deleted file mode 100644 index ce5e823..0000000 --- a/API-breaking-change/readme.md +++ /dev/null @@ -1,48 +0,0 @@ - - -# Handling API Breaking Changes - - -Imagine you’re designing and maintaining an internal or public-facing **Weather API**. A basic version of the response looks like: - -``` -{ - "Weather": [ - { "hour": 0, "temperature": "18", "condition": "Clear" }, - { "hour": 1, "temperature": "17", "condition": "Clear" }, - ... - { "hour": 23, "temperature": "16", "condition": "Cloudy" } - ] -} -``` - - Assumming this is the first published contract is consumed by multiple frontend apps already we need to introduced a change. please answer to these questions: - - -### 1 What Is a Breaking Change? - -Provide examples of what would constitute a **breaking change** to this API response for the frontends that are using tihs endpoints. provide at least 3 example. - -### 2 Coordinating Across Multiple Frontends - -You have **multiple frontend clients** some update imidiately and some take their update only every 1–2 months. -**How would you handle an API schema change across all of them safely?** - -### 3 How to Catch Breaking Changes During Development - -Describe how a team can **detect breaking changes early**, in your experince. please elaburate. - - -### 4 Policy for Releasing Changes - -What internal **policy/process** was established to manage schema changes safely, in your previous team? - - -## πŸ§ͺ Acceptance Criteria -- Answer these four questions thoroughly – at least one paragraph each, maximum half a page. -- Provide practical examples from your own experience. Don’t just rely on ChatGPT’s first suggestion β€” dig deeper! - - - - - diff --git a/README.md b/README.md index 52707bc..7868675 100644 --- a/README.md +++ b/README.md @@ -1,25 +1,37 @@ +# HOW TO RUN +## Program execution +Run `sudo docker-compose up --build`. +In your host open the webrowser and to the url: https://localhost.com/weather?city= +The project comprises a kafka producer that produces weather data, a kafka consumer which uses those data, a kafka broker, a web app in Flask which responds to http get requests, nd a Redis DB for readily access weather data. -# NATIX Dev Challenge +Next step: +- use elastic search +- add SQL DB (SQLAlchemy) +- do some data transformations on the consumer and publish on redis to be used for the web-app +- add MongoDB +- Create alert Red and manage those from a Consumer +- add a second producer on a different city -Welcome to the official **NATIX Dev Challenge** repository! πŸš€ -The challenges are designed to evaluate your technical skills through facing with a practical real-world problems. +## Pytest +Install pytest on your host or you virtualenvironment. +To run pytests go to `dev-challenge/weather-service` and run `pytest` or `pytest tests/`. -## 🧠 Challenges -```bash -. -β”œβ”€β”€ API-breaking-change/ -β”œβ”€β”€ weather-service/ -└── README.md # You are here +# PROJECT STRUCTURE +``` +./ +β”œβ”€β”€ app +β”‚ β”œβ”€β”€ __init__.py: initialises Redis and Flask +β”‚ β”œβ”€β”€ externals.py: simulates external calls +β”‚ β”œβ”€β”€ routes.py: app endpoints +β”‚ └── structures.py: file containing custom classes for type hinting +β”œβ”€β”€ tests +β”‚ β”œβ”€β”€ conftest.py: config file for mocking Redis, etc. +β”‚ └── test_app.py: unit test file +β”œβ”€β”€ docker-compose.yaml +β”œβ”€β”€ Dockerfile +β”œβ”€β”€ pytest.ini +β”œβ”€β”€ readme.md +└── requirements.txt ``` - -## πŸ“¬ Questions? - -If anything is unclear or missing, feel free to reach out via the channel you received the challenge on. - - -Good luck, and have fun! -β€” *NATIX* - - diff --git a/app/Dockerfile b/app/Dockerfile new file mode 100644 index 0000000..e203049 --- /dev/null +++ b/app/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.11 + +RUN pip install --upgrade pip +WORKDIR /app + +#COPY app/weather.py ./ +COPY app /app +COPY ./structures/redis_structures.py /app +RUN pip install -r /app/requirements.txt + +# Set environment variables +#comment if not a package +ENV FLASK_APP=app +ENV FLASK_RUN_HOST=0.0.0.0 + +EXPOSE 5000 +#CMD ["flask", "run", "--host=0.0.0.0"] +#CMD python weather.py #ok +CMD ["flask", "--app=routes", "run", "--host=0.0.0.0", "--port=5000"] \ No newline at end of file diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..9f19a80 --- /dev/null +++ b/app/__init__.py @@ -0,0 +1,11 @@ +""" +Initialisation module. It initialise Flask server and Redis client +""" +from flask import Flask +from redis import Redis + +# initialise Flask +app = Flask(__name__) +# initialise Redis +redis = Redis(host="redis", port=6379) +from app import routes \ No newline at end of file diff --git a/app/externals.py b/app/externals.py new file mode 100644 index 0000000..e92e7c0 --- /dev/null +++ b/app/externals.py @@ -0,0 +1,45 @@ +""" +Module to simulate external calls. +""" +from datetime import datetime, timedelta +import random +from app.structures import APIWeatherData + +def get_weather_data(city: str): + """Function to simulate external API returning weather ata + + Args: + city (str): the city for which weather data should be returned + + Returns: + _type_: weather data + """ + # after 1hr the counter must be reset + MAX_TIME_DELTA = timedelta(hours=1) + date = datetime.now() + + if len(get_weather_data.calls) == 0: + pass # date will be added at the end + elif (date - get_weather_data.calls[0])>MAX_TIME_DELTA: + # remove old calls + i = 0 + for i, call in enumerate(get_weather_data.calls): + if date-call=100: + return {} + weather_type = {0:"Cloudy", 1: "Clear", 2: "Rainy", 3: "Foggy"} + result: APIWeatherData = dict() + result["city"] = city + result["result"] = list() + base_temp = random.randrange(-20,35) + temp = base_temp + for i in range(date.hour+1): + temp = temp + random.randrange(-3,3) + result["result"].append({ "hour": i, "temperature": str(temp)+"Β°C", + "condition": weather_type[random.randrange(0,len(weather_type))] }) + get_weather_data.calls.append(date) + return result + +get_weather_data.calls = list() diff --git a/app/requirements.txt b/app/requirements.txt new file mode 100644 index 0000000..0ce8190 --- /dev/null +++ b/app/requirements.txt @@ -0,0 +1,5 @@ +flask +redis +pytest +pydantic +kafka-python \ No newline at end of file diff --git a/app/routes.py b/app/routes.py new file mode 100644 index 0000000..ffe129a --- /dev/null +++ b/app/routes.py @@ -0,0 +1,89 @@ +""" +Module implementing endpoints and some utilities +""" +from datetime import datetime +import json +from typing import Optional +from flask import request, jsonify +from app import app, redis +from app.structures import APIWeatherData +from app.redis_structures import Date, HourlyData, RedisEntry +from app.externals import get_weather_data + +def get_last_update(last_updated: Optional[Date]=None) -> bool: + """ + Function compares the passed datetime to the current + and return the old object if year, month, day and hour are the same, the new one if not + or if the input is none. + + Args: + last_updated (dict): datetime of the last updated entry in Redis + + Returns: + bool: true if the redis entry needs to be updated, i.e., the last entry + does not contain the new hour(s) + """ + current_date = datetime.now() + current_date: Date = Date(year=current_date.year, month=current_date.month, + day=current_date.day, hour=current_date.hour) + new_update_date = last_updated + if last_updated is None: + new_update_date = current_date + elif current_date.year>last_updated.year or current_date.month>last_updated.month or \ + current_date.day>last_updated.day or current_date.hour>last_updated.hour: + new_update_date = current_date + return new_update_date + + +@app.route('/weather', methods=['GET']) # By default it accepts only GET methods +def weather(): + """ + Decorator for "/weather" endpoing + Returns: + """ + try: + city: str = request.args.get('city').lower().capitalize() + except: + return jsonify({"result": []}) + redis_entry: RedisEntry = redis.get(city) + last_weather_data = {} + if not redis_entry: + # no entry means it's the first insertion for that city + dict_date: Date = Date.model_validate(get_last_update()) + new_entry: RedisEntry = RedisEntry.model_validate({"last_update": dict_date, "data": list()}) + # ASSUMPTION: a dictionary is returned directly + # ASSUMPTION: external api returns same data for past hours + #get data from external call + weather_data: APIWeatherData = APIWeatherData.model_validate(get_weather_data(city)) + for hour_weather in weather_data.result: + hour_weather_json = hour_weather.model_dump() + new_entry.data.append({int( hour_weather_json.pop("hour")) : HourlyData.model_validate(hour_weather_json)}) + redis.set(city, new_entry.model_dump_json()) + last_weather_data = weather_data + else: + redis_data: RedisEntry = RedisEntry.model_validate(json.loads(redis.get(city))) + last_update: Date = Date.model_validate(redis_data.last_update) + new_update_date: Date = Date.model_validate(get_last_update(last_update)) + if last_update == new_update_date: + # convert data from redis represention to API + weather_data: APIWeatherData = APIWeatherData.model_validate( + { + "city": city, + "result": [{"hour": list(element.keys())[0], **list(element.values())[0].model_dump()} + for element in redis_data.data + ] + } + ) + last_weather_data = weather_data + else: + # call external service + weather_data: APIWeatherData = APIWeatherData.model_validate(get_weather_data(city)) + last_hours_inserted: int = len(redis_data.data)-1 + for new_hourly_data in weather_data.result[last_hours_inserted+1:]: + hour_weather_json = new_hourly_data.model_dump() + hourly_data = {hour_weather_json.pop("hour") : hour_weather_json} + redis_data.data.append(hourly_data) + redis_data.last_update = new_update_date + redis.set(city, redis_data.model_dump_json()) + last_weather_data = weather_data + return jsonify(last_weather_data.model_dump()) diff --git a/app/structures.py b/app/structures.py new file mode 100644 index 0000000..c2aba43 --- /dev/null +++ b/app/structures.py @@ -0,0 +1,18 @@ +""" +Moule providing custom structures +""" +from typing import TypedDict, List, Dict +from pydantic import BaseModel +from app.redis_structures import HourlyData + +class APIHourlyData(HourlyData): + """Structure representing Hourly data returned by APIs""" + hour: int + + +class APIWeatherData(BaseModel): + """ + Class representing a + """ + city: str + result: List[APIHourlyData] \ No newline at end of file diff --git a/app/tests/conftest.py b/app/tests/conftest.py new file mode 100644 index 0000000..2aa0eff --- /dev/null +++ b/app/tests/conftest.py @@ -0,0 +1,19 @@ +from unittest.mock import patch +import pytest + +@pytest.fixture +def mock_redis(): + # Define a fake Redis cache + redis_cache = {} + + class MockRedis: + def get(self, key): + return redis_cache.get(key) + + def set(self, key, value): + redis_cache[key] = value + + mock_instance = MockRedis() + # PATCH THE REDIS USED IN app.routes (not app.__init__) + with patch("app.routes.redis", mock_instance): + yield mock_instance diff --git a/app/tests/test_app.py b/app/tests/test_app.py new file mode 100644 index 0000000..6267349 --- /dev/null +++ b/app/tests/test_app.py @@ -0,0 +1,57 @@ +import pytest +from unittest.mock import patch +from app import app +from app.externals import get_weather_data + +@pytest.fixture +def client(): + """Function to run the app + + Yields: + _type_: web server instance + """ + with app.test_client() as client: + yield client + +def test_weather(client, mock_redis): + get_weather_data.calls = list() + for _ in range(150): + response = client.get("/weather?city=Astana") + assert response.status_code == 200 + assert len(get_weather_data.calls) == 1 + +def test_weather_case_insensitive_city(client, mock_redis): + get_weather_data.calls = list() + response1 = client.get("/weather?city=Astana") + assert response1.status_code == 200 + assert len(get_weather_data.calls) == 1 + response2 = client.get("/weather?city=astana") + assert response2.status_code == 200 + assert len(get_weather_data.calls) == 1 + +def test_post_method(client): + """Test that the http post request is not allowed + + Args: + client (_type_): web server + """ + response = client.post("/weather?city=Astana") + assert response.status_code==405 + +def test_delete_method(client): + """Test that the http delete request is not allowed + + Args: + client (_type_): web server + """ + response = client.delete("/weather?city=Astana") + assert response.status_code==405 + +def test_put_method(client): + """Test that the http put request is not allowed + + Args: + client (_type_): web server + """ + response = client.put("/weather?city=Astana") + assert response.status_code==405 \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..7610a21 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,66 @@ +services: + web: + build: + context: . + dockerfile: app/Dockerfile + ports: + - "5000:5000" + #volumes: + # - ./app:/app avoid this to avoid ovrewriting the effect of copying + environment: + - FLASK_ENV=development + image: development:latest + redis: + image: "redis:alpine" + container_name: redis + ports: + - "6379:6379" + kafka: + image: confluentinc/cp-kafka:latest + hostname: kafka + container_name: kafka + ports: + - "9092:9092" + - "9093:9093" + environment: + KAFKA_KRAFT_MODE: "true" # Enables KRaft mode. + KAFKA_PROCESS_ROLES: controller,broker # Kafka acts as both controller and broker. + KAFKA_NODE_ID: 1 # Unique ID for the Kafka instance. + KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093" # Controller quorum. + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_LOG_DIRS: /var/lib/kafka/data # Log storage location. + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" # Enables automatic topic creation. + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # Single replica for simplicity. + KAFKA_LOG_RETENTION_HOURS: 168 # Log retention period (7 days). + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 # No rebalance delay. + CLUSTER_ID: "Mk3OEYBSD34fcwNTJENDM2Qk" # Unique Kafka cluster ID. + volumes: + - /var/run/docker.sock:/var/run/docker.sock + - ./kafka/data:/var/lib/kafka/data # Maps logs to local storage. + CagliariProducer: + build: + context: . + dockerfile: weatherSensor/Dockerfile + image: weathersensor:latest + container_name: CagliariWeatherSensor + # volumes: + # - ./weatherSensor:/weatherSensorCagliari #avoid overwriting already copied files + environment: + - LOCATION=Cagliari + depends_on: + - kafka + Transformer: + build: + context: ./ + dockerfile: transformer/Dockerfile + image: transformer:latest + container_name: Transformer + environment: + - LOCATION=Cagliari + depends_on: + - kafka + - CagliariProducer diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..b8757d1 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +# pytest.ini +[pytest] +pythonpath = . diff --git a/structures/redis_structures.py b/structures/redis_structures.py new file mode 100644 index 0000000..1da0a9c --- /dev/null +++ b/structures/redis_structures.py @@ -0,0 +1,25 @@ +from typing import List, Dict +from pydantic import BaseModel + +class Date(BaseModel): + """ + Class to represent last update of the redis update with hour granularity + """ + year: int + month: int + day: int + hour: int + +class HourlyData(BaseModel): + """ + Structure representing a single hour weather data + """ + temperature: str + condition: str + +class RedisEntry(BaseModel): + """ + Class representing a + """ + last_update: Date + data: List[Dict[int, HourlyData]] \ No newline at end of file diff --git a/transformer/Dockerfile b/transformer/Dockerfile new file mode 100644 index 0000000..47780fe --- /dev/null +++ b/transformer/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.11 + +RUN pip install --upgrade pip +WORKDIR /transformer + +#COPY requirements.txt . +COPY transformer /transformer +RUN pip install -r /transformer/requirements.txt + +# Set environment variables +#comment if not a package +#COPY app/weatherSe.py ./ +#CMD ["flask", "run", "--host=0.0.0.0"] +#CMD python weather.py #ok +CMD ["python", "-u", "transformer.py"] \ No newline at end of file diff --git a/transformer/requirements.txt b/transformer/requirements.txt new file mode 100644 index 0000000..c26186b --- /dev/null +++ b/transformer/requirements.txt @@ -0,0 +1,4 @@ +pytest +pydantic +kafka-python +confluent-kafka diff --git a/transformer/transformer.py b/transformer/transformer.py new file mode 100644 index 0000000..d52cff8 --- /dev/null +++ b/transformer/transformer.py @@ -0,0 +1,47 @@ +import os +import time +from confluent_kafka import Consumer, KafkaError +import json + +LOCATION_KEY="LOCATION" +# Optional delivery callback +def delivery_report(err, msg): + if err is not None: + print(f"❌ Message delivery failed: {err}") + else: + print(f"βœ… Message delivered to {msg.topic()} [{msg.partition()}]") + + +if __name__=="__main__": + time.sleep(30) + LOCATION = os.getenv(LOCATION_KEY) + KAFKA_HOST = "kafka:9092" # Or the address you want + consumer_group = LOCATION+"Consumer" + consumer = Consumer({"bootstrap.servers": KAFKA_HOST, "group.id": consumer_group, "auto.offset.reset": "latest"}) + topic = topic = "weather_" + LOCATION + consumer.subscribe([topic,]) + try: + while True: + msg = consumer.poll(timeout=1.0) # Poll for message (timeout in seconds) + if msg is None: + continue # No message this time + if msg.error(): + print(f"❌ Error: {msg.error()}") + continue + + # Message received + print(f"βœ… Received message: {msg.value().decode('utf-8')}") + + # If it's JSON, decode it + try: + data = json.loads(msg.value()) + print(f"πŸ” Parsed JSON: {data}") + except json.JSONDecodeError: + print("⚠️ Not a valid JSON message") + + except KeyboardInterrupt: + print("πŸ‘‹ Stopping consumer...") + + finally: + # Always close the consumer on exit + consumer.close() diff --git a/weather-service/readme.md b/weather-service/readme.md deleted file mode 100644 index ba70ec8..0000000 --- a/weather-service/readme.md +++ /dev/null @@ -1,61 +0,0 @@ - - -# Resilient Weather Service -build a backend API that exposes weather data to a frontend. The frontend requests the today's weather for the city the user is in β€” there's a catch: the only way to get weather information is via an external weather API that is rate-limited. - -Your goal is to design a resilient backend that: -- Moderates (minimizes) calls to the external weather API -- Handles API failures gracefully - - -## Context and Scope -1. The external weather API is limited to 100 requests per hour. -2. The external weather API returns detailed weather data for a given city on the current day everytime it's called. As shown in the example -3. You must support approx. 100,000 daily active users across approx. 2,500 different cities across the globe. Users use the service at any time throughout the day. -4. User authentication and external API authentication are out of scope of this task. Simply assume that the API you develop will be open to any call and the external weather API will reply to requests coming from our cluster according to the limit mentioned in 1. - - -Example of response for a passed city to the external weather API - The result is the weather for today - -``` -{ - "result": [ - { "hour": 0, "temperature": "18Β°C", "condition": "Clear" }, - { "hour": 1, "temperature": "17Β°C", "condition": "Clear" }, - ... - { "hour": 23, "temperature": "16Β°C", "condition": "Cloudy" } - ] -} -``` - - - -## Endpoint Base Info - -``` GET /weather?city=CityName ``` - - -Response: -``` -{ - "weather": [ - { "hour": 0, "temperature": "18", "condition": "Clear" }, - { "hour": 1, "temperature": "17", "condition": "Clear" }, - ... - { "hour": 23, "temperature": "16", "condition": "Cloudy" } - ], - … -} -``` - - - -## πŸ§ͺ Acceptance Criteria -- You may use any programming language. Even pseudocode or structured texts (e.g. workflow-style logic in written fromat) is acceptable β€” what matters is the clarity and quality of your technical design and solution. -- You may mock any libraries or databases you need. The focus is not on third-party integerations. -- Write down any assumptions β€” either as comments in the code or as side notes in a document. -- Clearly describe the input and output of each major function/step in your solution. This helps us understand your reasoning behind your technical design. -- Improve the response object: the example provided is minimal. Based on your experience, design a response that communicates effectively with the frontend/UI. - - - diff --git a/weatherSensor/Dockerfile b/weatherSensor/Dockerfile new file mode 100644 index 0000000..1feccb4 --- /dev/null +++ b/weatherSensor/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.11 + +RUN pip install --upgrade pip +WORKDIR /weatherSensor + +#COPY requirements.txt . +COPY weatherSensor /weatherSensor +RUN pip install -r /weatherSensor/requirements.txt + +# Set environment variables +#comment if not a package +#COPY app/weatherSe.py ./ +#CMD ["flask", "run", "--host=0.0.0.0"] +#CMD python weather.py #ok +CMD ["python", "-u", "weather_sensor.py"] \ No newline at end of file diff --git a/weatherSensor/requirements.txt b/weatherSensor/requirements.txt new file mode 100644 index 0000000..c26186b --- /dev/null +++ b/weatherSensor/requirements.txt @@ -0,0 +1,4 @@ +pytest +pydantic +kafka-python +confluent-kafka diff --git a/weatherSensor/weather_sensor.py b/weatherSensor/weather_sensor.py new file mode 100644 index 0000000..f557b7b --- /dev/null +++ b/weatherSensor/weather_sensor.py @@ -0,0 +1,31 @@ +import os +import time, datetime +from confluent_kafka import Producer +from random import randint +import json + +LOCATION_KEY="LOCATION" +# Optional delivery callback +def delivery_report(err, msg): + if err is not None: + print(f"❌ Message delivery failed: {err}") + else: + print(f"βœ… Message delivered to {msg.topic()} [{msg.partition()}]") + + +if __name__=="__main__": + time.sleep(30) + LOCATION = os.getenv(LOCATION_KEY) + KAFKA_HOST = "kafka:9092" # Or the address you want + producer = Producer({"bootstrap.servers": KAFKA_HOST}) + topic = "weather_"+LOCATION + print(topic) + while True: + MESSAGE = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + MESSAGE = {"temperature_celsius": randint(20, 35), "weather": "Cloudy", "Humidity": "50%", "Alert": "No", "time": MESSAGE} + try: + producer.produce(topic=topic, key=LOCATION, value=json.dumps(MESSAGE).encode('utf-8'), on_delivery=delivery_report) + producer.flush() + except Exception as ex: + print("Exception happened :",ex) + time.sleep(3)