-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathClientHandler.java
More file actions
112 lines (97 loc) · 4.71 KB
/
ClientHandler.java
File metadata and controls
112 lines (97 loc) · 4.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
/**
 * Handles one client connection on the master: reads packets line by line,
 * acknowledges each one, and places it on the queue of one of the two slaves.
 *
 * <p>Routing uses two load counters. Each job type has a preferred slave
 * (JOB_1 goes to slave 1, JOB_2 goes to slave 2); a packet is diverted to the
 * other slave only when the preferred slave's counter exceeds the other
 * counter by at least {@link #REROUTE_THRESHOLD}.
 *
 * <p>Both counters are only read or written while holding this instance's
 * monitor (the {@code synchronized (this)} section in {@link #run()} and the
 * synchronized {@link #updateCounterAfterCompletion(Operation, int)}).
 * NOTE(review): the counters are instance fields, so every handler keeps its
 * own pair; if the load is meant to be balanced across all clients they would
 * have to live in a shared object - confirm against the master's main class.
 */
public class ClientHandler implements Runnable {
    /** Amount added to a slave's counter when it receives the job type it is preferred for. */
    private static final int PREFERRED_LOAD = 2;
    /** Amount added to a slave's counter when it receives the other slave's job type. */
    private static final int FALLBACK_LOAD = 10;
    /** Minimum counter difference at which a packet is diverted away from its preferred slave. */
    private static final int REROUTE_THRESHOLD = 6;

    private final Socket clientSocket;
    private final BlockingQueue<Packet> queue;
    private final Slave1 slave1;
    private final Slave2 slave2;
    private int counter1 = 0; // load counter for slave 1's queue, guarded by "this"
    private int counter2 = 0; // load counter for slave 2's queue, guarded by "this"

    /**
     * Creates a handler that has no slaves attached.
     *
     * <p>Kept for existing callers. A handler built this way can read and
     * acknowledge packets, but {@link #run()} fails with an
     * {@link IllegalStateException} as soon as a JOB_1 or JOB_2 packet has to
     * be placed on a slave. Prefer the four-argument constructor.
     *
     * @param clientSocket connected client socket; closed when {@link #run()} ends
     * @param queue        queue that receives every packet read from the client
     */
    public ClientHandler(Socket clientSocket, BlockingQueue<Packet> queue) {
        this(clientSocket, queue, null, null);
    }

    /**
     * Creates a handler that dispatches packets to the given slaves.
     *
     * @param clientSocket connected client socket; closed when {@link #run()} ends
     * @param queue        queue that receives every packet read from the client
     * @param slave1       slave whose queue receives JOB_1 packets by default
     * @param slave2       slave whose queue receives JOB_2 packets by default
     */
    public ClientHandler(Socket clientSocket, BlockingQueue<Packet> queue, Slave1 slave1, Slave2 slave2) {
        this.clientSocket = clientSocket;
        this.queue = queue;
        this.slave1 = slave1;
        this.slave2 = slave2;
    }

    /**
     * Reads packets from the client until the stream ends, acknowledging and
     * dispatching each one, then closes the socket.
     *
     * @throws IllegalStateException if a packet must be placed on a slave that
     *                               was never supplied to this handler
     */
    @Override
    public void run() {
        try (
            BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
            PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)
        ) {
            String input; // one packet in its string form, one per line
            while ((input = in.readLine()) != null) {
                Packet receivedPacket = Packet.fromString(input);
                queue.put(receivedPacket); // record on the queue of all packets
                System.out.println("Received: " + receivedPacket.toString());

                Operation op = receivedPacket.getOperation();
                int id = receivedPacket.getId();
                out.println("Packet #" + id + ", (" + op + ") acknowledged by master.");

                // The routing decision must see the counters as they are before
                // this packet is accounted for, so read-decide-update is one
                // critical section.
                synchronized (this) {
                    System.out.println("Counter for queue 1: " + counter1);
                    System.out.println("Counter for queue 2: " + counter2);
                    dispatch(receivedPacket, op);
                    // NOTE(review): this is written right after the packet is
                    // enqueued, not after a slave reports completion. The text
                    // is left as-is because clients may depend on it.
                    out.println(op + " completed by slaves for packet #" + id);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still
            // observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            try {
                clientSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("Client disconnected.");
        }
    }

    /**
     * Chooses a slave for the packet from the current counters and enqueues it.
     * Must be called while holding this instance's monitor.
     *
     * @param packet packet to place on a slave queue
     * @param op     the packet's operation
     * @throws IOException          declared so that any checked exception the
     *                              original call site handled still propagates
     * @throws InterruptedException if interrupted while waiting to enqueue
     */
    private void dispatch(Packet packet, Operation op) throws IOException, InterruptedException {
        switch (op) {
            case JOB_1:
                System.out.println("Received a packet with job 1. Placing it on a slave...");
                if ((counter1 - counter2) >= REROUTE_THRESHOLD) {
                    // Slave 1 is significantly more loaded: divert to slave 2.
                    sendToSlave2(packet, FALLBACK_LOAD);
                } else {
                    sendToSlave1(packet, PREFERRED_LOAD);
                }
                break;
            case JOB_2:
                System.out.println("Received a packet with job 2. Placing it on a slave...");
                if ((counter2 - counter1) >= REROUTE_THRESHOLD) {
                    // Slave 2 is significantly more loaded: divert to slave 1.
                    sendToSlave1(packet, FALLBACK_LOAD);
                } else {
                    sendToSlave2(packet, PREFERRED_LOAD);
                }
                break;
            default:
                // Any other operation is not routed to a slave.
                break;
        }
    }

    /** Enqueues the packet on slave 1 and adds {@code load} to its counter. */
    private void sendToSlave1(Packet packet, int load) throws IOException, InterruptedException {
        if (slave1 == null) {
            throw new IllegalStateException(
                    "Slave 1 is not configured; construct ClientHandler with both slaves");
        }
        slave1.getQueue().put(packet);
        System.out.println("Packet sent to slave 1");
        counter1 += load;
    }

    /** Enqueues the packet on slave 2 and adds {@code load} to its counter. */
    private void sendToSlave2(Packet packet, int load) throws IOException, InterruptedException {
        if (slave2 == null) {
            throw new IllegalStateException(
                    "Slave 2 is not configured; construct ClientHandler with both slaves");
        }
        slave2.getQueue().put(packet);
        System.out.println("Packet sent to slave 2");
        counter2 += load;
    }

    /**
     * Subtracts from a slave's counter the amount that was added when the job
     * was dispatched to it. Combinations other than JOB_1/JOB_2 on slave 1/2
     * leave both counters untouched.
     *
     * @param op    operation that finished
     * @param slave number of the slave that ran it (1 or 2)
     */
    public synchronized void updateCounterAfterCompletion(Operation op, int slave) {
        if (op == Operation.JOB_1) {
            if (slave == 1) {
                counter1 -= PREFERRED_LOAD;
            } else if (slave == 2) {
                counter2 -= FALLBACK_LOAD;
            }
        } else if (op == Operation.JOB_2) {
            if (slave == 1) {
                counter1 -= FALLBACK_LOAD;
            } else if (slave == 2) {
                counter2 -= PREFERRED_LOAD;
            }
        }
    }
}