forked from hieunguyent12/telemetry_processor
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.cpp
More file actions
135 lines (102 loc) · 4.01 KB
/
main.cpp
File metadata and controls
135 lines (102 loc) · 4.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#include <iostream>
#include <string>
#include "sockpp/tcp_acceptor.h"
#include "sockpp/stream_socket.h"
#include "SensorData.h"
#include "SensorRules.h"
#include "cparse/shunting-yard.h"
using namespace std;
// TODO: call this function on the data
SensorData handleExpression(SensorData &data, SensorRules &rules) {
cparse::calculator c1(rules.expr); // Create calculator based on expression
cparse::TokenMap vars;
vars["value"] = data.value;
vars["previous_value"] = rules.prevValue;
rules.prevValue = data.value; // Set new previous value
cparse::packToken temp;
temp = c1.eval(vars);
int outputValue = temp.asInt(); // Store calculator output
time_t currTimeTemp = std::time(nullptr); // Get current time
int currTime = static_cast<int>(currTimeTemp);
SensorData output(currTime, rules.labelOut, rules.unitsOut, outputValue); // Create output sensor data object
return output;
}
std::string bufferToString(char* buffer, int bufflen)
{
std::string ret(buffer, bufflen);
return ret;
}
void process_data(sockpp::tcp_socket sock) {
char buf[512];
fd_set readfds;
int fd_max;
struct timeval tv;
// This while loop will keep the socket alive as long as the client is still connected
// the "select" function will notify us when new data is available on the socket for us to consume
while (true) {
FD_ZERO(&readfds);
FD_SET(sock.handle(), &readfds);
fd_max = sock.handle();
tv.tv_sec = 10;
tv.tv_usec = 500000;
int rv = select(fd_max + 1, &readfds, NULL, NULL, &tv);
if (rv == -1) {
perror("select");
} else if (rv == 0) {
// timeout
printf("Timeout occurred! No data after 10.5 seconds.\n");
}
else {
if (FD_ISSET(sock.handle(), &readfds)) {
cout << "there is new data available!!" << endl;
// if sock.read(...) returns 0, that means the client has disconnected
if (sock.read(buf, sizeof(buf)) == 0) {
break;
}
// TODO: figure out a more efficient way to parse the buffer
int length = int((unsigned char)(buf[0]) << 24 |
(unsigned char)(buf[1]) << 16 |
(unsigned char)(buf[2]) << 8 |
(unsigned char)(buf[3]));
vector<char> data;
// I don't really know why we have to add 3 here, but it works
char new_data[length + 3];
// starting from index 4, we get all elements from the buffer until we reach the `length`
for (int i = 4; i <= length + 3; i++) {
data.push_back(buf[i]);
}
std::copy(data.begin(), data.end(), new_data);
string data_string = bufferToString(new_data, sizeof(new_data));
data_string = data_string.substr(0, data_string.size() - 3);
cout << "Data: " << data_string << endl;
cout << "Length: " << length << endl;
}
}
}
cout << "Connection closed from " << sock.peer_address() << endl;
}
int main(int argc, char* argv[]) {
cout << "Telemetry Processor 🚀" << endl;
in_port_t port = 9000;
sockpp::initialize();
sockpp::tcp_acceptor acc(port);
// TODO: put socket data into processing queue
vector<int> processing_queue;
if (!acc) {
cerr << "Error creating server: " << acc.last_error_str() << endl;
return 1;
}
cout << "Listening on port " << port << "..." << endl;
while (true) {
sockpp::inet_address peer;
sockpp::tcp_socket sock = acc.accept(&peer);
cout << "Received a connection request from " << peer << endl;
if (!sock) {
cerr << "Error accepting incoming connection: "
<< acc.last_error_str() << endl;
}
else {
process_data(std::move(sock));
}
}
}