-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsqlTime.py
More file actions
339 lines (315 loc) · 12.9 KB
/
sqlTime.py
File metadata and controls
339 lines (315 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
import getpass
import os
import readCsv
import time
import mysql.connector
from prettytable import PrettyTable
# attempts to login to the mysql database
def login():
os.system('clear')
password = getpass.getpass("Enter your password: ") # get user to enter the password
try:
db_config = {
"host": "localhost",
"user": "me",
"password": password,
"database": "dswork",
"charset" :"utf8mb4"
}
connection = mysql.connector.connect(**db_config) # connect to mysql
except mysql.connector.Error as e:
print("Error connecting to MySQL: " + str(e))
print("Please restart the application and try again")
else:
print("Connected to MySQL")
return connection
# takes a list of sql statements and runs them 1 at a time
def execute_sql_statements(cursor, sql_statements):
for sql_statement in sql_statements:
if sql_statement.strip(): # Check if the statement is not empty
cursor.execute(sql_statement)
row = cursor.fetchone()
while row is not None: # if we somehow get a return
print(row)
row = cursor.fetchone()
for result in cursor.stored_results(): #just in case
for row in result.fetchall():
print(row)
# loads in sql scripts for tables, procedures triggers
# then loads the data and calls the menu for the user to use
def driver():
connection = login()
if connection:
cursor = connection.cursor()
delTables = "sql/delTables.sql"
tables = "sql/nobelTables.sql"
delprocs = "sql/delProc.sql"
procedure = "sql/procedures.sql"
trigger = "sql/triggers.sql"
views = "sql/views.sql"
delop = "sql/delOps.sql"
indexes = "sql/indexes.sql"
fileLoader(cursor,delTables)
fileLoader(cursor,tables)
fileLoader(cursor, delprocs)
loadMultiStatementFiles(cursor,trigger)
loadMultiStatementFiles(cursor, procedure)
fileLoader(cursor,views)
fileLoader(cursor,delop)
fileLoader(cursor,indexes)
readCsv.enterData(cursor)
question(cursor)
connection.commit()
cursor.close()
connection.close()
# read a sql script into a list and exports to the execute func
def fileLoader(cursor, sourceFile):
with open(sourceFile, 'r') as file:
sqlCommand = file.read()
sql_statements = sqlCommand.split(';') # split on a semi colon for end of query
for sql_statement in sql_statements:
if sql_statement.strip(): # Check if the statement is not empty
sql_statement=sql_statement+";" # add back the semi colon cause spliting on the semi colon means its not included
execute_sql_statements(cursor, sql_statements)
# read a sql script into a list and exports to the execute func
# but for sql scripts that have a multi statement
def loadMultiStatementFiles(cursor,sourceFile):
with open (sourceFile,'r') as file:
sqlCommand = file.read()
sql_statements = sqlCommand.split('-- end here') # split on this so that commands with multiple ; are not broken apart
execute_sql_statements(cursor, sql_statements)
# displays a menu to the user
# takes a user input and runs a question
# or does a database operation
def question(cursor):
os.system('clear')
questions ="sql/questions.sql"
with open (questions,'r') as file:
sqlCommand = file.read() #read in the question queries
sql_statements = sqlCommand.split('-- ends here')
run = True
while run:
print("""Select a query to run:
1: How many physics awards have been given out
2: The different categories of the nobel prize
3: Average age of a nobel laureate
4: Amount of women who have won a nobel prize per category
5: Rankings of the top ten affiliates
6: Organisations by category
7: Summary of nobel prizes for the last year of awards
8: Youngest and oldest nobel prize winner by cat
9: All current living nobel prize winners
10: Top five winners count
11: Database Operations
x : to exit""") # print menu to the user
sel = input("Selection: ") # prompt for user to input selection
if(numChecker(sel)): # validate input
sel = int(sel)
if(sel != 11):
os.system('clear')
sel = int(sel) -1
if (sel != 0):
cursor.execute(sql_statements[sel]) # execute the chose question
result = cursor.fetchall()
headers = [header[0] for header in cursor.description] # use pretty tables to display results nicely
tables = PrettyTable(headers)
for row in result:
tables.add_row(row)
print(tables)
time.sleep(1) # add a litte delay
else:
outVal = None
cursor.execute("CALL CountPhysicsAwards(@physicsAwardCount);") # yes is yucky but it works
cursor.execute("SELECT @physicsAwardCount;") # get the return val
outVal = cursor.fetchone()[0]
table = PrettyTable() # display results
table.field_names =["Physics Award Count"]
table.add_row([outVal])
table.align["Physics Award Count"] = "1"
print(table)
else: # user wihses to perform database operations
os.system('clear')
dbOps(cursor)
elif(sel == 'x'): #user has chosen to exit
run = False
else:
os.system('clear')
print("Invalid Input")
# was originally designed to allow the user to interact
# with all the tables but this proved to ambitious
# code has been left commented out for later versions to hopefully be re implemented
# just calls the tableOps function using the individual table
def dbOps(cursor):
# cursor.execute("""SELECT TABLE_NAME
# FROM information_schema.tables
# WHERE TABLE_SCHEMA = 'dswork' -- Replace with your actual database name
# AND TABLE_TYPE = 'BASE TABLE';
# """)
# result = cursor.fetchall()
# run = True
# while run:
# print("Select a table to modify: ")
# for table in result:
# print(table[0])
# sel = input("Enter the table you want to operate with: ")
# tableNames = [table[0] for table in result]
# if sel in tableNames:
tableOps(cursor, 'Individual')
# elif(sel == 'x'):
# print("returning to previous menu")
# run = False
# else:
# print("Invalid Selection try again")
# time.sleep(2)
# run= False
# prompts the user for an input from 1-3 to pick there databse operation on the table
def tableOps(cursor, tableName):
print("""What would you like to do to the indivdual table:
1. Insert
2. Delete
3. Update""") # display options to the user
sel = input("Selection: ") # get the users input for selection
try:
sel =int(sel) # atempt to cast to int
if(1<= sel <= 3): # check input within range for the valid options
if sel == 1:
tabInsert(cursor,tableName)
elif sel == 2:
tabDel(cursor,tableName)
elif sel == 3:
tabUp(cursor,tableName)
else:
raise ValueError("Value must be between 1-3")
except Exception as e:
print(f"Error: {e}")
# takes user inputs for a new entry into the table and inserts it
def tabInsert(cursor,tableName):
vals =[]
try:
cursor.execute(f"DESC {tableName}") # get table columns
columns = cursor.fetchall()
print(F"Insert values into {tableName}")
for column in columns:
print("Enter value")
sel = input(f"{column[0]}: ") # print the column name to indicate to user what value to provide for insertion
vals.append(sel)
vals =readCsv.replaceEmptyWithNull(vals) #convert empty strings to None value
proc = getIns(tableName)
cursor.callproc('insertRecipient',(vals[0],'I')) # insert
cursor.callproc(str(proc),vals)
print(f"{vals[1]} has been insert Successfull\n")
except mysql.connector.Error as e:
print(f"A SQL error occured {e}")
except Exception as e:
print(f"An error occured {e}")
# takes a user input id to delete the matching entry in the table
def tabDel(cursor, tableName):
vals=[]
primaryKeys = getPrimaryKeys(cursor,tableName) # get the primary key of the table were deleting from
try:
for key in primaryKeys:
print(f'Enter: {key}') # promt the user to enter value for the primary key attributes so that the entry can be selected
vals.append(input())
proc = getDel(tableName)
statement = "SELECT Name FROM Individual WHERE ID = %s" #find entry if exist and get name
cursor.execute(statement,(vals[0],))
result = cursor.fetchall()
if result: # if entry found delete
cursor.callproc(proc,vals)
print(f"{result} has been deleted")
else: # if not found display error message
print(f"No entry found with {primaryKeys} = {vals}. No Entry to delete")
except mysql.connector.Error as e:
print(f"Error connecting to MySQL: {e}\n try again")
except Exception as e:
print(f"Error {e}\n try again")
# updates a created test value using the user input
def tabUp(cursor, tableName):
# create test entry
values = (9999, "John Smith", "M", "1901-10-10", "2000-10-10", "Perth", "Australia", "Oceania", "Perth", "Australia", "Oceania")
cursor.callproc('insertRecipient', (9999, 'I'))
cursor.callproc('insertInd', values) #insert test entry
cursor.execute("SELECT * FROM Individual WHERE ID = 9999") # display test entry to the user
result = cursor.fetchall()
headers = [header[0] for header in cursor.description]
tables = PrettyTable(headers)
for row in result:
tables.add_row(row)
print("Individual before update\n")
print(tables)
try:
newName = input("Enter a new name: ") # get new name to update
UPDATEq = "UPDATE Individual SET Name = %s WHERE ID = 9999;" # update entry
cursor.execute(UPDATEq, (newName,))
cursor.execute("SELECT * FROM Individual WHERE ID = 9999")
result = cursor.fetchall()
headers = [header[0] for header in cursor.description] # dispay the entry after update to the user
tables = PrettyTable(headers)
for row in result:
tables.add_row(row)
print("Individual after update \n")
print(tables)
print("Many changes much wow")
except mysql.connector.Error as e:
print("Error connecting to MySQL: " + str(e))
except Exception as e:
print(f"Unknown error occurred! {e}")
cursor.execute("DELETE FROM Recipient WHERE ID = 9999;") # remove test entry once done
# get the matching insert Procedure for the table
def getIns(tableName):
proc = ""
if(tableName == "Recipient"):
proc = 'insertRecipient'
elif(tableName == "Organisation"):
proc = 'insertOrg'
elif(tableName == "Individual"):
proc = 'insertInd'
elif(tableName == "Affiliate"):
proc = 'insertAff'
elif(tableName == "AffiliatedTo"):
proc = 'insertAffTo'
elif(tableName == "Prize"):
proc = 'insertPrize'
elif(tableName == "AwardedTo"):
proc = 'insertAwardTo'
return proc
# gets the matching del procedure for the table
def getDel(tableName):
proc =''
if(tableName == "Recipient"):
proc = 'DeleteRecipient'
elif(tableName == "Organisation"):
proc = 'DeleteOrganisation'
elif(tableName == "Individual"):
proc = 'DeleteOrganisation'
elif(tableName == "Affiliate"):
proc = 'DeleteAffiliate'
elif(tableName == "Prize"):
proc = 'DeletePrize'
return proc
# get the primary keys of the passed table
def getPrimaryKeys(cursor,tablename):
query =f""" SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
WHERE TABLE_SCHEMA = 'dswork'
AND TABLE_NAME = '{tablename}'
AND CONSTRAINT_NAME = 'PRIMARY';
"""
cursor.execute(query)
result = cursor.fetchall()
prmyKey = [row[0] for row in result]
return prmyKey
# checks that the passsed value is between 1-11
def numChecker(sel):
try:
sel = int(sel)
if 1 <= sel <= 11:
return True
else:
return False
except ValueError:
return False
def main():
driver()
if __name__== "__main__":
main()