-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbot.py
More file actions
232 lines (188 loc) · 8.65 KB
/
bot.py
File metadata and controls
232 lines (188 loc) · 8.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import logging
import math
import os

from dotenv import load_dotenv
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, MessageHandler, filters, CallbackQueryHandler

from api_client import APIClient
# Load environment settings via dotenv, then read the bot token
# (None when the variable is not set).
load_dotenv()
TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")

# Logging: INFO level and above, with timestamp, logger name and level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Backend API client shared by the handlers defined below.
api = APIClient()
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start: tell the user whether their account is linked.

    Asks the backend (``api.check_linkage``) about the sender's Telegram
    user ID. Linked users receive an overview of the available commands;
    unlinked users receive linking instructions that include their ID.
    """
    user = update.effective_user
    is_linked = api.check_linkage(user.id)
    if is_linked:
        await update.message.reply_text(
            f"Hi {user.first_name}! You are linked and ready to go.\n"
            "Commands:\n"
            "/groups - List groups\n"
            "/friends - List friends\n"
            "/stats - View stats\n"
            "/dues [filter] - View dues\n"
            "Send text for personal expenses.\n"
            "Send a photo for receipt scanning."
        )
    else:
        await update.message.reply_text(
            f"Hi {user.first_name}! Your account is not linked.\n"
            # This fragment needs its own f-prefix: adjacent literals are
            # formatted independently, and without it the user would see
            # the literal text "{user.id}" instead of their actual ID.
            f"Please go to the SplitLLM App -> Profile -> Connect Telegram using your User ID: `{user.id}`"
        )
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /help: reply with a summary of the bot's commands."""
    await update.message.reply_text(
        "/start - Check connection\n"
        "/groups - List your groups\n"
        "/friends - List your friends\n"
        "/stats - View spending stats\n"
        "/dues - View all dues\n"
        "/dues <name> - View dues for a specific friend/group\n"
        "To add a personal expense, just type it: 'Lunch 20'\n"
        # /g_expense is implemented in this module and requires a
        # description, so the help text advertises the full syntax.
        "To add a group expense, upload a receipt or use: `/g_expense <group> <amount> <description>`"
    )
async def groups(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /groups: reply with the names of the user's groups.

    Replies with the backend's error text when the response carries an
    "error" key, and with a fixed notice when the group list is empty.
    """
    payload = api.get_groups(update.effective_user.id)
    if "error" in payload:
        await update.message.reply_text(f"Error: {payload['error']}")
        return

    user_groups = payload.get("groups", [])
    if user_groups:
        # Header first, then one bullet per group.
        lines = ["Your Groups:"]
        lines.extend(f"- {group['name']}" for group in user_groups)
        await update.message.reply_text("\n".join(lines))
    else:
        await update.message.reply_text("You have no groups.")
async def friends(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /friends: reply with the names of the user's friends.

    Replies with the backend's error text when the response carries an
    "error" key, and with a fixed notice when the friend list is empty.
    """
    response = api.get_friends(update.effective_user.id)
    if "error" in response:
        await update.message.reply_text(f"Error: {response['error']}")
        return

    known_friends = response.get("friends", [])
    if not known_friends:
        await update.message.reply_text("You have no friends added.")
        return

    bullets = "\n".join(f"- {friend['name']}" for friend in known_friends)
    await update.message.reply_text("Your Friends:\n" + bullets)
async def stats(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /stats: reply with the user's spending totals.

    Reads 'total_spent', 'total_owed' and 'total_due' from the backend
    response, showing 0 for any key that is missing.
    """
    summary = api.get_stats(update.effective_user.id)
    if "error" in summary:
        await update.message.reply_text(f"Error: {summary['error']}")
        return

    # The response schema is assumed to be a flat dict of totals.
    report_lines = (
        "Stats:",
        f"Total Spent: {summary.get('total_spent', 0)}",
        f"Total Owed: {summary.get('total_owed', 0)}",
        f"Total Due: {summary.get('total_due', 0)}",
    )
    await update.message.reply_text("\n".join(report_lines))
async def dues(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /dues [filter]: reply with the user's outstanding dues.

    Any words after the command are joined with single spaces and passed
    to the backend as the filter; with no arguments the filter is None.
    """
    if context.args:
        filter_by = " ".join(context.args)
    else:
        filter_by = None

    response = api.get_dues(update.effective_user.id, filter_by)
    if "error" in response:
        await update.message.reply_text(f"Error: {response['error']}")
        return

    entries = response.get("dues", [])
    if not entries:
        await update.message.reply_text("No outstanding dues found.")
        return

    # One newline-terminated bullet per due, under a header line.
    body = "".join(
        f"- {entry['name']}: {entry['amount']} ({entry['status']})\n"
        for entry in entries
    )
    await update.message.reply_text("Dues:\n" + body)
async def handle_text_expense(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Treat a plain text message as a personal expense and record it.

    Messages shorter than three characters are ignored. Otherwise the raw
    text is sent to the backend and the outcome is reported to the user.
    """
    message = update.message
    raw_text = message.text

    # Very short messages are most likely accidental; skip them silently.
    if len(raw_text) < 3:
        return

    await message.reply_text("Processing personal expense...")
    result = api.post_personal_expense(update.effective_user.id, raw_text)
    if "error" in result:
        await message.reply_text(f"Error: {result['error']}")
        return

    expense = result.get("expense", {})
    description = expense.get('description', 'Item')
    amount = expense.get('amount', 0)
    await message.reply_text(f"Added Personal Expense:\n{description} - {amount}")
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle an uploaded photo: save it and ask which group it belongs to.

    The photo is written to ``temp_<user id>.jpg`` in the working directory
    and its path is stored in ``context.user_data['receipt_path']`` so the
    callback-query handler can pick it up once a group button is pressed.
    """
    # Last entry of the photo list (presumably the largest size - confirm
    # against the Telegram API docs).
    telegram_file = await update.message.photo[-1].get_file()
    user_id = update.effective_user.id

    # A receipt has to be attached to a group, so fetch the choices first.
    response = api.get_groups(user_id)
    if "error" in response:
        await update.message.reply_text(f"Error fetching groups: {response['error']}")
        return

    available_groups = response.get("groups", [])
    if not available_groups:
        await update.message.reply_text("You have no groups to add this receipt to.")
        return

    # Keep the image on disk until the user has picked a group.
    local_path = f"temp_{user_id}.jpg"
    await telegram_file.download_to_drive(local_path)
    context.user_data['receipt_path'] = local_path

    # One button per row; the group ID travels in the callback payload.
    button_rows = [
        [InlineKeyboardButton(group['name'], callback_data=f"GROUP:{group['id']}")]
        for group in available_groups
    ]
    await update.message.reply_text(
        "Select a group for this receipt:",
        reply_markup=InlineKeyboardMarkup(button_rows),
    )
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle inline-button presses; processes "GROUP:<id>" selections.

    Uploads the receipt image whose path was stored in
    ``context.user_data['receipt_path']`` to the chosen group via
    ``api.post_receipt`` and reports the outcome. The temporary file and
    the stored path are always cleaned up once an upload was attempted,
    even if the upload raises. Callback payloads that do not start with
    "GROUP:" are acknowledged and otherwise ignored.
    """
    query = update.callback_query
    await query.answer()

    data = query.data
    # NOTE(review): callback queries may carry no data payload at all -
    # guard so `.startswith` is never called on None.
    if not data or not data.startswith("GROUP:"):
        return

    # Split only once so an ID that itself contains ':' is kept intact.
    group_id = data.split(":", 1)[1]
    file_path = context.user_data.get('receipt_path')

    if not file_path or not os.path.exists(file_path):
        await query.edit_message_text("Error: Receipt file expired or missing. Please upload again.")
        return

    await query.edit_message_text(f"Processing receipt for group ID {group_id}...")

    try:
        with open(file_path, 'rb') as f:
            res = api.post_receipt(update.effective_user.id, f, group_id)
    finally:
        # Clean up on success AND failure; otherwise an exception from the
        # upload would leave the temp file and the stale path behind.
        try:
            os.remove(file_path)
        except FileNotFoundError:
            # Already gone (e.g. removed by a concurrent press); nothing to do.
            pass
        context.user_data.pop('receipt_path', None)

    if "error" in res:
        await query.message.reply_text(f"Error processing receipt: {res['error']}")
    else:
        exp = res.get("expense", {})
        await query.message.reply_text(f"Receipt Processed!\nAdded to group.\nTotal: {exp.get('amount', 0)}\nItems: {len(exp.get('items', []))}")
async def g_expense(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /g_expense <group_name> <amount> <description...>.

    The first argument is the group name, the second must parse as a
    finite number, and all remaining arguments are joined with spaces to
    form the description. Replies with a usage hint when fewer than three
    arguments are given and with an error when the amount is not a usable
    number; otherwise posts the expense through ``api.post_group_expense``
    and reports the result.
    """
    args = context.args
    if len(args) < 3:
        # The example deliberately has no quotes: arguments are passed through
        # as-is, so quote characters would become part of the group name.
        await update.message.reply_text("Usage: /g_expense <group_name> <amount> <description>\nExample: /g_expense Trip 50 Lunch")
        return

    group_name = args[0]
    try:
        amount = float(args[1])
    except ValueError:
        amount = None

    # float() happily parses "nan" and "inf"; neither is a meaningful
    # expense amount, so treat them like any other invalid input.
    if amount is None or not math.isfinite(amount):
        await update.message.reply_text("Invalid amount. Please use a number.")
        return

    description = " ".join(args[2:])

    res = api.post_group_expense(update.effective_user.id, group_name, amount, description)
    if "error" in res:
        await update.message.reply_text(f"Error: {res['error']}")
    else:
        exp = res.get("expense", {})
        await update.message.reply_text(f"Added Group Expense to '{group_name}':\n{exp.get('description', description)} - {exp.get('amount', amount)}")
if __name__ == '__main__':
    # Script entry point: validate configuration, register handlers, poll.
    if not TOKEN:
        print("Error: TELEGRAM_BOT_TOKEN not found in .env")
        # Raise SystemExit directly: the bare `exit` name is a helper added
        # by the `site` module for interactive sessions and is not
        # guaranteed to exist in every runtime (e.g. `python -S`).
        raise SystemExit(1)

    application = ApplicationBuilder().token(TOKEN).build()

    # Slash commands, registered in the same order as before.
    command_handlers = (
        ("start", start),
        ("help", help_command),
        ("groups", groups),
        ("friends", friends),
        ("stats", stats),
        ("dues", dues),
        ("g_expense", g_expense),
    )
    for command, callback in command_handlers:
        application.add_handler(CommandHandler(command, callback))

    # Text handler for personal expenses (anything that is not a command)
    application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_text_expense))

    # Photo handler
    application.add_handler(MessageHandler(filters.PHOTO, handle_photo))

    # Callback handler for the inline group-selection buttons
    application.add_handler(CallbackQueryHandler(button_callback))

    print("Bot is running...")
    application.run_polling()