-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
118 lines (107 loc) · 4.34 KB
/
main.py
File metadata and controls
118 lines (107 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import os
from typing import Optional, Tuple, Type

from src.definitions import INPUT_FILES_DIR, DATABASE_ENV
from src.etl import ETL
from src.sources.data_source import DataSource
from src.sources.file_data_source import FileDataSource
from src.sources.simulation_data_source import SimulationDataSource
from src.sinks.data_sink import DataSink
from src.sinks.console_data_sink import ConsoleDataSink
from src.sinks.postgresql_data_sink import PostgreSQLDataSink
def main() -> None:
    """Run the interactive ETL menu until the user chooses to exit.

    Option '1' asks for a data source and a data sink, then builds and runs an
    ``ETL`` pipeline from them. Option '2' leaves the loop. Any other input is
    rejected and the menu is shown again.
    """
    import traceback

    while True:
        clear()
        print("1. Start ETL")
        print("2. Exit")
        print(">> ", end='')
        option = input()
        if option == '1':
            source_cls, source_args = select_source()
            sink_cls, sink_args = select_sink()
            print("Launching ETL process...")
            try:
                ETL().source(source_cls, *source_args).sink(sink_cls, *sink_args).run()
            except Exception:
                # Top-level boundary of the interactive loop: report the failure
                # (with traceback) and return to the menu instead of exiting.
                traceback.print_exc()
                print("ETL process has failed. Press 'Enter' to continue...")
            else:
                print("ETL process has finished. Press 'Enter' to continue...")
            input()
        elif option == '2':
            print("Quitting...")
            break
        else:
            print("Invalid input. Press 'Enter' to continue...")
            input()
def select_source() -> Tuple[Type[DataSource], list]:
    """Interactively choose a data source class and its constructor arguments.

    Re-displays the menu until a valid option is entered.

    Returns:
        ``(source_cls, source_args)``: the selected class (not an instance) and
        the positional arguments to pass to it. ``SimulationDataSource`` gets
        no arguments. ``FileDataSource`` gets the entered file name joined onto
        ``INPUT_FILES_DIR`` and, only if the user typed one, a chunk size.
    """
    while True:
        clear()
        print("Select a data source class:")
        print("1: Simulation")
        print("2: File")
        print(">> ", end='')
        option = input()
        if option == '1':
            return SimulationDataSource, []
        if option == '2':
            print("'source filepath':")
            print(f">> {INPUT_FILES_DIR}/", end='')
            source_args = [os.path.join(INPUT_FILES_DIR, input().strip())]
            chunk_size = _prompt_chunk_size()
            # Only pass a chunk size when one was given, so the source's own
            # default applies otherwise.
            if chunk_size is not None:
                source_args.append(chunk_size)
            return FileDataSource, source_args
        print("Invalid input. Press 'Enter' to continue...")
        input()


def _prompt_chunk_size() -> Optional[int]:
    """Ask for a chunk size until the input is blank (-> None) or a positive int."""
    while True:
        print("'chunk size': (default is 256)")
        print(">> ", end='')
        raw = input().strip()
        if not raw:
            return None
        try:
            value = int(raw)
        except ValueError:
            value = 0  # non-numeric input is rejected by the check below
        if value > 0:
            return value
        print("Invalid chunk size: enter a positive integer or leave it blank.")
def select_sink() -> Tuple[Type[DataSink], list]:
    """Interactively choose a data sink class and its constructor arguments.

    Re-displays the menu until a valid option is entered.

    Returns:
        ``(sink_cls, sink_args)``: the selected class (not an instance) and the
        positional arguments to pass to it. ``ConsoleDataSink`` gets an output
        format string. ``PostgreSQLDataSink`` gets, in order: DB name, user,
        password, host and port. Blank answers fall back to ``DATABASE_ENV``.
    """
    while True:
        clear()
        print("Select a data sink class:")
        print("1: Console")
        print("2: PostgreSQL")
        print(">> ", end='')
        option = input()
        if option == '1':
            print("'output format': (default is 'key: {} | value: {} | ts: {}')")
            print(">> ", end='')
            output_format = input()
            # Whitespace inside a format string may be intentional, so the
            # entered text is kept verbatim; only a blank entry uses the default.
            if not output_format.strip():
                output_format = "key: {} | value: {} | ts: {}"
            return ConsoleDataSink, [output_format]
        if option == '2':
            # The list is built in order, so prompts appear in this order.
            sink_args = [
                _prompt_with_default("DB name", DATABASE_ENV["POSTGRES_DB"]),
                _prompt_with_default("DB user", DATABASE_ENV["POSTGRES_USER"]),
                _prompt_with_default("DB password", DATABASE_ENV["POSTGRES_PASSWORD"], secret=True),
                _prompt_with_default("DB host", DATABASE_ENV["POSTGRES_HOST"]),
                _prompt_with_default("DB port", DATABASE_ENV["POSTGRES_PORT"]),
            ]
            return PostgreSQLDataSink, sink_args
        print("Invalid input. Press 'Enter' to continue...")
        input()


def _prompt_with_default(label: str, default, *, secret: bool = False):
    """Prompt for *label*; return the stripped answer, or *default* if blank.

    With ``secret=True`` the default is not displayed and the answer is read
    with ``getpass`` so it is not echoed to the terminal.
    """
    if secret:
        import getpass
        print(f"'{label}': (leave blank to use the configured default)")
        answer = getpass.getpass(">> ")
    else:
        print(f"'{label}': (default is '{default}')")
        print(">> ", end='')
        answer = input()
    answer = answer.strip()
    return answer if answer else default
def clear() -> None:
    """Wipe the terminal using the platform's native clear command."""
    if os.name == "nt":
        os.system("cls")  # Windows console
    else:
        os.system("clear")  # POSIX terminals
if __name__ == "__main__":
    # Start the interactive menu only when executed as a script, not on import.
    main()