Skip to content

Commit

Permalink
feat: added master-child arch
Browse files Browse the repository at this point in the history
  • Loading branch information
AnsahMohammad committed May 1, 2024
1 parent 81834d0 commit eeded97
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 0 deletions.
Empty file added cloud/__init__.py
Empty file.
84 changes: 84 additions & 0 deletions cloud/child.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Child.py
import socket
import threading
from logger import Logger

class Child:
def __init__(self, server_host="0.0.0.0", server_port=9999):
self.server_host = server_host
self.server_port = server_port

self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.running = False
self.thread = None

self.logger = Logger(show_logs=True, author="Ph-Child")
self.log = self.logger.log

self.log(f"Child initialized to {self.server_host}:{self.server_port}")

def connect(self):
self.running = True
self.client.connect((self.server_host, self.server_port))
self.log("connected to server", "<connect>")
self.thread = threading.Thread(target=self.listen_to_server)
self.thread.start()
self.log("thread started", "<connect>")

while self.running:
command = input("Enter command: ")
if command == "stop":
break
elif command == "send":
message = input("Enter message: ")
child.send(message)
else:
print("Invalid command")

self.log("closing connection", "<connect>")
self.close()

def status(self):
self.log(f"status : {self.running}")
message = "status,"+str(int(self.running))
self.client.send(message.encode())
self.log("Status uploaded")

def listen_to_server(self):
self.log("Listening to server")
self.client.settimeout(1)
while self.running:
try:
response = self.client.recv(1024)
self.log(f"Received: {response}")
if response == b"status":
self.status()
if response == b"stop":
self.log("close requested from server")
break
except socket.timeout:
continue

self.running = False
self.log("waiting to be closed")

def send(self, message):
message = str(message).encode()
self.client.send(message)
self.log(f"sent {message}")

def close(self):
self.log("closing connection...")
self.running = False
if self.thread is not None:
self.thread.join()

self.send("close,0")
self.client.close()
self.log("close client success")

child = Child(server_port=9999)
try:
child.connect()
except:
print("could not connect to server")
27 changes: 27 additions & 0 deletions cloud/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import time


class Logger:
def __init__(self, show_logs=False, author = None):
self.show_logs = show_logs
self.logs = []
self.author = author

def log(self, content, author=None, **kwargs):
author = self.author if author is None else author
log_ = f"{time.strftime('%H:%M:%S')} : "
if author:
log_ += f"{author} : "

log_ += f"{content} | {kwargs}"

self.logs.append(log_)
if self.show_logs:
print(log_)

def save(self, filename="logs.txt"):
with open(filename, "w") as f:
for log in self.logs:
f.write(log + "\n")
self.log("Logs saved to logs.txt", "Log")
self.logs.clear()
134 changes: 134 additions & 0 deletions cloud/master.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# Parent.py
import socket
import threading
from logger import Logger

# sudo lsof -ti :9999 | xargs --no-run-if-empty kill


class Server:
def __init__(self, host="0.0.0.0", port=9999, clients = 5):
self.host = host
self.port = port
self.num_clients = clients
self.running = False
self.server = None
self.nodes = []
self.clients = []
self.connection = None
self.logger = Logger(show_logs=True, author="Ph-Master")
self.log = self.logger.log

def handle_client(self, client_socket):
raddr = client_socket.getpeername()
self.log(f"listening to : {raddr[1]}", "<handle_client>")
while self.running:
request = client_socket.recv(1024)
self.log(f"{raddr[1]}: {request}", "<handle_client>")

request = request.decode().split(",")

action = request[0]
if action == "close":
self.log(f"{raddr[1]} request closure", "<handle_client>")
self._close_client(raddr[1])
break
else:
client_socket.send(b"ACK!")

client_socket.close()
self.log("Connection closed", "<handle_client>")

def _close_client(self, id):
self.log(f"Closing client {id}")
index = self.nodes.index(id)
self.nodes.pop(index)
self.clients.pop(index)

def status(self):
self.log("status requested", "<status>")
self.log(f"Nodes: {self.nodes}", "<status>")
for node in self.nodes:
self.log(f"Requesting status from {node}")
self.send_message("status", node)

def send_message(self, message, address):
self.log(f"Sending message to {address}")
index = self.nodes.index(address)
self.clients[index].send(message.encode())

def _broadcast(self, message):
self.log(f"broadcasting message : {message}")

for client in self.clients:
client.send(message.encode())

def run(self):
self.log("Starting the server", "<run>")
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind((self.host, self.port))
self.server.listen(5)
self.server.settimeout(1)
self.log(f"accepting {self.num_clients} clients", "<run>")
self.log(f"Listening on port {self.port}", "<run>")

while self.running:
# print("<run()> : current status : ", self.running)
try:
client, addr = self.server.accept()
self.nodes.append(addr[1])
self.clients.append(client)
self.log(f"Accepted connection from: {addr[0]}:{addr[1]}")
client_handler = threading.Thread(target=self.handle_client, args=(client,))
client_handler.start()
except socket.timeout:
# self.log("Timeout", "<run>")
continue

self.log("server loop exit, waiting for closing", "<run>")

def start(self):
self.running = True
self.connection = threading.Thread(target=self.run)
self.log("Starting the connection ", "<start>")
self.connection.start()

while True:
command = input("Enter the command : ")
if command == "status":
self.status()
elif command == "broadcast":
msg = input("Enter the broadcast msg : ")
self._broadcast(msg)
elif command == "stop":
break
else:
print("Invalid command")

print("server stop issued", "<start>")
self.stop()


def stop(self):
print(self.nodes)
self.running = False
self.log("running => false", "<stop>")

if self.connection:
self.log("stopping connection", "<stop>")
self.connection.join()
self.log("connection closed", "<stop>")

if self.server:
self.log("server closing", "<stop>")
self.server.close()
self.log("server closed", "<stop>")

self.log("service stopped")

server = Server(port=9999)
try:
server.start()
except:
print("Error occured!, closing the server")
server.stop()

0 comments on commit eeded97

Please sign in to comment.