Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.10
FROM python:3.12

ENV PYTHONUNBUFFERED=1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,6 @@ def upgrade():
type_=sa.String(length=255),
existing_nullable=True)

# Adjust the length of the title field in the Item table
op.alter_column('item', 'title',
existing_type=sa.String(),
type_=sa.String(length=255),
existing_nullable=False)

# Adjust the length of the description field in the Item table
op.alter_column('item', 'description',
existing_type=sa.String(),
type_=sa.String(length=255),
existing_nullable=True)


def downgrade():
# Revert the length of the email field in the User table
Expand All @@ -55,15 +43,3 @@ def downgrade():
existing_type=sa.String(length=255),
type_=sa.String(),
existing_nullable=True)

# Revert the length of the title field in the Item table
op.alter_column('item', 'title',
existing_type=sa.String(length=255),
type_=sa.String(),
existing_nullable=False)

# Revert the length of the description field in the Item table
op.alter_column('item', 'description',
existing_type=sa.String(length=255),
type_=sa.String(),
existing_nullable=True)
48 changes: 48 additions & 0 deletions backend/app/alembic/versions/db80539b8422_add_smsoutbox_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""Add SMSOutbox table

Revision ID: db80539b8422
Revises: 38a2c006503e
Create Date: 2025-12-31 14:17:14.727365

"""
from alembic import op
import sqlalchemy as sa
import sqlmodel.sql.sqltypes


# revision identifiers, used by Alembic.
revision = 'db80539b8422'
down_revision = '38a2c006503e'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('smsoutbox',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('sms_message_id', sa.Uuid(), nullable=False),
sa.Column('device_id', sa.Uuid(), nullable=True),
sa.Column('payload', sa.JSON(), nullable=True),
sa.Column('status', sqlmodel.sql.sqltypes.AutoString(length=50), nullable=False),
sa.Column('attempts', sa.Integer(), nullable=False),
sa.Column('next_attempt_at', sa.DateTime(), nullable=True),
sa.Column('last_error', sqlmodel.sql.sqltypes.AutoString(length=500), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=False),
sa.Column('sending_at', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['device_id'], ['smsdevice.id'], ),
sa.ForeignKeyConstraint(['sms_message_id'], ['smsmessage.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_smsoutbox_sms_message_id'), 'smsoutbox', ['sms_message_id'], unique=False)
op.create_index(op.f('ix_smsoutbox_status'), 'smsoutbox', ['status'], unique=False)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_smsoutbox_status'), table_name='smsoutbox')
op.drop_index(op.f('ix_smsoutbox_sms_message_id'), table_name='smsoutbox')
op.drop_table('smsoutbox')
# ### end Alembic commands ###
5 changes: 3 additions & 2 deletions backend/app/api/main.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from fastapi import APIRouter

from app.api.routes import android, login, private, sms, users, utils
from app.api.routes import login, plans, private, sms, users, utils, webhooks
from app.core.config import settings

api_router = APIRouter()
api_router.include_router(login.router)
api_router.include_router(users.router)
api_router.include_router(utils.router)
api_router.include_router(sms.router)
api_router.include_router(android.router)
api_router.include_router(plans.router)
api_router.include_router(webhooks.router)


if settings.ENVIRONMENT == "local":
Expand Down
231 changes: 101 additions & 130 deletions backend/app/api/routes/android.py
Original file line number Diff line number Diff line change
@@ -1,157 +1,128 @@
import logging
import uuid
from datetime import datetime, timezone

from fastapi import APIRouter, Query, WebSocket, WebSocketDisconnect
from sqlmodel import Session, select
from sqlmodel import Session

from app import crud
from app.api.deps import get_device_by_api_key
from app.core.db import engine
from app.models import SMSDevice, SMSMessage
from app.models import SMSDevice
from app.services.websocket_manager import websocket_manager
from app.tasks.sms_tasks import process_incoming_sms, update_message_status
from app.tasks.sms_tasks import process_incoming_sms, process_sms_ack

router = APIRouter(prefix="/android", tags=["android"])


async def _send_pending_messages(
session: Session, device: SMSDevice, websocket: WebSocket
) -> None:
"""Send pending assigned messages to the device"""
statement = (
select(SMSMessage)
.where(SMSMessage.device_id == device.id)
.where(SMSMessage.status == "assigned")
.where(SMSMessage.message_type == "outgoing")
.order_by(SMSMessage.created_at)
.limit(50)
)
messages = session.exec(statement).all()

# Refresh device. I need to find a better way to manage the Session
session.refresh(device)

for message in messages:
message_data = {
"type": "task",
"message_id": str(message.id),
"to": message.to,
"body": message.body,
}
await websocket.send_json(message_data)
# Update to sending status
message.status = "sending"
session.add(message)

session.commit()


@router.websocket("/ws")
async def websocket_endpoint(
websocket: WebSocket,
api_key: str = Query(...),
):
"""Persistent WebSocket connection for Android devices"""
"""Persistent WebSocket connection for Android devices."""
with Session(engine) as session:
# API Key validation
device = get_device_by_api_key(session=session, api_key=api_key)

await websocket_manager.connect_device(websocket, device.id)

# Update device status
device.status = "online"
device.last_heartbeat = datetime.now(timezone.utc)
session.add(device)
session.commit()

try:
# Send pending message on connection
await _send_pending_messages(session, device, websocket)

while True:
# Receive client messages
data = await websocket.receive_json()

message_type = data.get("type")

if message_type == "register":
# Update device info
device.name = data.get("device_name", device.name)
device.phone_number = data.get("phone_number", device.phone_number)
device.last_heartbeat = datetime.now(timezone.utc)
session.add(device)
session.commit()

logging.info("Device attempting to connect via WebSocket:", device)
if not device:
await websocket.close(code=4001, reason="Invalid API Key")
return

# Connect device and start Redis listener
listener_task = await websocket_manager.connect(websocket, device.id)

# Update device status in DB (last heartbeat)
_update_device_heartbeat_in_db(device.id)

try:
while True:
data = await websocket.receive_json()
message_type = data.get("type")

if message_type == "register":
_update_device_info_in_db(device.id, data)
await websocket.send_json(
{
"type": "registered",
"device_id": str(device.id),
"status": "ok",
}
)

elif message_type == "sms_report":
raw_outbox_id = data.get("outbox_id")
try:
outbox_id = uuid.UUID(raw_outbox_id)
except (TypeError, ValueError):
await websocket.send_json(
{
"type": "registered",
"device_id": str(device.id),
"status": "ok",
}
)

elif message_type == "sms_report":
# SMS status delivery
raw_id = data.get("message_id")
try:
message_id = uuid.UUID(raw_id)
except (TypeError, ValueError):
await websocket.send_json(
{"type": "error", "message": "Invalid message_id"}
)
continue
status_value = data.get("status") # sent, failed
error = data.get("error")

message = crud.get_sms_message(
session=session, message_id=message_id
)
if message and message.device_id == device.id:
update_message_status.delay(
str(message_id), status_value, error
)

await websocket.send_json(
{"type": "ack", "message_id": data.get("message_id")}
)

elif message_type == "sms_incoming":
# Report sms incoming
from_number = data.get("from")
body = data.get("body")
timestamp = data.get("timestamp")

process_incoming_sms.delay(
str(device.user_id), from_number, body, timestamp
{"type": "error", "message": "Invalid outbox_id"}
)
continue

status_value = data.get("status") # sent, failed
error = data.get("error")
process_sms_ack.delay(str(outbox_id), status_value, error)
await websocket.send_json({"type": "ack", "outbox_id": raw_outbox_id})

elif message_type == "sms_incoming":
process_incoming_sms.delay(
str(device.user_id),
data.get("from"),
data.get("body"),
data.get("timestamp"),
)
await websocket.send_json({"type": "ack", "status": "received"})

elif message_type == "ping":
await websocket_manager.refresh_heartbeat(device.id)
_update_device_heartbeat_in_db(device.id)
await websocket.send_json({"type": "pong"})

else:
await websocket.send_json(
{
"type": "error",
"message": f"Unknown message type: {message_type}",
}
)

except WebSocketDisconnect:
pass # Normal disconnect
except Exception:
# Log exception here if needed
pass
finally:
# Cleanup
listener_task.cancel()
await websocket_manager.disconnect(device.id)
_update_device_status_in_db(device.id, "offline")


# Helper functions to interact with DB in a new session
def _update_device_heartbeat_in_db(device_id: uuid.UUID):
with Session(engine) as session:
device = session.get(SMSDevice, device_id)
if device:
device.last_heartbeat = datetime.now(timezone.utc)
device.status = "online"
session.add(device)
session.commit()

await websocket.send_json({"type": "ack", "status": "received"})

elif message_type == "ping":
# Heartbeat/keepalive
device.last_heartbeat = datetime.now(timezone.utc)
session.add(device)
session.commit()

# Send pending messages with the ping
await _send_pending_messages(session, device, websocket)

await websocket.send_json({"type": "pong"})

else:
await websocket.send_json(
{"type": "error", "message": f"Unknown message: {message_type}"}
)

except WebSocketDisconnect:
# Disconnect Device
await websocket_manager.disconnect_device(device.id)
device.status = "offline"
def _update_device_info_in_db(device_id: uuid.UUID, data: dict):
with Session(engine) as session:
device = session.get(SMSDevice, device_id)
if device:
device.name = data.get("device_name", device.name)
device.phone_number = data.get("phone_number", device.phone_number)
device.last_heartbeat = datetime.now(timezone.utc)
session.add(device)
session.commit()
except Exception:
await websocket_manager.disconnect_device(device.id)
device.status = "offline"


def _update_device_status_in_db(device_id: uuid.UUID, status: str):
with Session(engine) as session:
device = session.get(SMSDevice, device_id)
if device:
device.status = status
session.add(device)
session.commit()
raise
Loading
Loading