first commit

This commit is contained in:
theolem 2019-12-08 17:01:53 +01:00
commit 0f984b2c33
25 changed files with 592 additions and 0 deletions

109
$ Executable file
View File

@ -0,0 +1,109 @@
#include <stdio.h>
#include "master.h"
int MASTERID = 0;
int activeNodes[10]; //nodes active in the network. Assumes that all nodes have a unique id.
char *currentPrograms[100]; //current programs : array of char pointers, each is a program string.
void main()
{
initCurrentProgramsArray();
int newProgramIndex = manageUserInput();
printf("New program : %s \n", currentPrograms[newProgramIndex]);
/*
for(;;)
{
manageUserInput();
printf("Entrez le nom d'un programme : \n >> ") ;
scanf("%s", program );
printf(" %d ", sizeof(program));
addToCurrentPrograms(program);
//calls roundRobin to decide where it should be executed
//sends it over the network
//gets program return output and print
}*/
}
void initCurrentProgramsArray()
{
int i=0;
while(i<sizeof(currentPrograms)){
currentPrograms[i] = NULL;
i++;
}
}
// adds the program string to current program array, and returns the index where it was added.
int addToCurrentPrograms(char* newProgram){
int i=0;
while(i<sizeof(currentPrograms))
{
if(currentPrograms[i] == "-1"){
currentPrograms[i] = newProgram;
printf("Empty program at %d", i);
break;
}
i++;
}
return i ;
}
int commandSlaveNode(int programIndex)
{
return 1;
}
int roundRobin()
{
return 1;
}
void printProgramOutput(char programName[])
{
}
/*********************
* UTIL ARRAY FUNCTIONS
*********************/
int returnsSizeOfOccupiedArray(char array[]){
int i=0;
while(i<sizeof(array))
{
if(array[i]==NULL)
break;
i++;
}
return i;
}
// copies content of bigger array to smaller one, fitting everything it cans in the smaller array.
void copyToSmallerArray(char bigArray[], char smallArray[])
{
int i=0;
while(i<sizeof(smallArray))
{
smallArray[i]=bigArray[i];
i++;
}
}
int manageUserInput()
{
char program[50];
printf("Entrez un programme à exécuter : \n");
scanf("%s", program );
char smallarray[returnsSizeOfOccupiedArray(program)];
copyToSmallerArray(program, smallarray);
int index = addToCurrentPrograms(smallarray);
return index;
}

15
Makefile Executable file
View File

@ -0,0 +1,15 @@
MASTERNAME = master
SLAVENAME = slave
SRC_DIR = src
BUILD_DIR = build
all : $(BUILD_DIR)/$(MASTERNAME) $(BUILD_DIR)/$(SLAVENAME)
$(BUILD_DIR)/$(MASTERNAME) : $(SRC_DIR)/$(MASTERNAME)
gcc -o $(BUILD_DIR)/$(MASTERNAME) $(SRC_DIR)/$(MASTERNAME).c
$(BUILD_DIR)/$(SLAVENAME) : $(SRC_DIR)/$(SLAVENAME)
gcc -o $(BUILD_DIR)/$(SLAVENAME) $(SRC_DIR)/$(SLAVENAME).c
clean :
rm $(BUILD_DIR)/*

1
README.md Executable file
View File

@ -0,0 +1 @@
# PILS_load_balancer

1
TODO Executable file
View File

@ -0,0 +1 @@
* read/write to file instead of program variables for current programs, etc => better for recovery

2
auto.sh Executable file
View File

@ -0,0 +1,2 @@
make
./build/master

BIN
build/master Executable file

Binary file not shown.

BIN
build/slave Executable file

Binary file not shown.

14
cc Executable file
View File

@ -0,0 +1,14 @@
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello

2
dependencies Executable file
View File

@ -0,0 +1,2 @@
gcc
libc6-dev

0
files/program Executable file
View File

0
hello Executable file
View File

0
max] Executable file
View File

3
scripts/copy.sh Executable file
View File

@ -0,0 +1,3 @@
# copies code from master to all slaves
scp -r $PWD/../* theo@192.168.0.2:~Documents/recupPC/ recupPC
scp -r $PWD/../* theo@192.168.0.3:~Documents/recupPC/ recupPC

4
scripts/netconfig.sh Executable file
View File

@ -0,0 +1,4 @@
# add ip address to ethernet interface
ip addr add 192.168.0.$1 dev eth0
ip link set eth0 up

6
scripts/wait.sh Executable file
View File

@ -0,0 +1,6 @@
echo "Testing..."
for var in $@
do
sleep $var
done
echo "Done."

14
scripts/wait_parallel.sh Executable file
View File

@ -0,0 +1,14 @@
echo "Testing..."
SLEEPMAX=1
for var in $@
do
sleep $var &
if (( $var > $SLEEPMAX ));
then
SLEEPMAX=$var
fi
done
sleep "$SLEEPMAX"
echo "Done."

Binary file not shown.

123
src/balancer.py Executable file
View File

@ -0,0 +1,123 @@
import socket
import threading
import logging
import time
import sys
from util import *
message_queue=[]
if len(sys.argv) > 1 :
args = sys.argv[1:]
for arg in args :
message_queue.append("prog : " + arg)
else :
message_queue=["prog : echo hello"]
print(message_queue)
connections_threads=[] # active clients
data_socks={}
HOST = "127.0.0.1"
PORT = 1235
data_port = input("Port data ?") ## port de donnée utilisé pour le premier utilisateur (ensuite incrémenté par 1)
if data_port == "":
data_port = 43222
else :
data_port = int(data_port)
nb_connections = 0
stop = 0
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
### SERVER THREADS
class thread_get_input(threading.Thread):
def __init__(self) :
threading.Thread.__init__(self)
def run(self) :
while True :
program = get_program_input()
if program != "" :
message_queue.append("prog : " + program)
if stop == 1 :
break
class thread_command_execution(threading.Thread) :
def __init__(self) :
threading.Thread.__init__(self)
self.round_robin_inc = 0
def run(self) :
try :
while True :
if len(connections_threads) != 0 and len(message_queue) != 0 :
index = self.round_robin_inc % len(connections_threads)
thread = connections_threads[index]
self.round_robin_inc = self.round_robin_inc + 1
program = message_queue.pop()
thread.command_program_execution(program)
if stop == 1 :
break
except KeyboardInterrupt :
self.s_data.close()
raise
class thread_handle_socket(threading.Thread):
def __init__(self) :
threading.Thread.__init__(self)
data_socks[data_port] = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s_data = data_socks[data_port]
self.s_data.bind(('', data_port))
self.s_data.listen()
self.conn, self.addr = self.s_data.accept()
def command_program_execution(self, program):
self.conn.send(encode_message(program))
start_time = time.time()
data = self.conn.recv(1024)
end_time = time.time()
mtype, ret_val = get_message_type(decode_message(data))
print("Received " + str(ret_val) + " from " + str(self.addr[0]) + " in %s seconds ", end_time - start_time )
def run(self) :
while True :
time.sleep(1)
if stop == 1 :
break
## MAIN
# starts thread that reads input from user
thread_input = thread_get_input()
thread_input.start()
thread_exec = thread_command_execution()
thread_exec.start()
s.bind(('', PORT))
s.listen()
try :
while True :
conn, addr = s.accept()
print("Nouvelle machine connectée : " + addr[0])
#send data port to connected host
message = "prt : "+ str(data_port)
conn.sendall(encode_message(message))
# opens a new thread, which will open a data socket
connections_threads.append(thread_handle_socket())
connections_threads[nb_connections].start()
# increment it for the next one
data_port = data_port + 5;
nb_connections = nb_connections + 1
conn.close()
except KeyboardInterrupt :
s.close()
stop = 1
print(" Exiting...")
raise

86
src/client.py Executable file
View File

@ -0,0 +1,86 @@
import socket
import threading
import time
import sys
from util import *
received_message_queue = []
dataport = 0
PORT = 1235
stop = 0
empty_inc = 0 #inc var counting number of empty packets received
# server control socket
if len(sys.argv) > 1:
SERVER=sys.argv[1]
else :
SERVER = "127.0.0.1"
print("server address: " , SERVER);
### FUNCTIONS
def thread_execute_programs() :
while True :
if len(received_message_queue) != 0 :
print("handling message in thread")
message = received_message_queue.pop()
handle_message(message)
if stop == 1 :
break
time.sleep(1)
## MAIN CODE
if __name__ == "__main__" :
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((SERVER, PORT)) # connection to server. Crashes if refused.
# first receives port and closes connection to control socket...
data = s.recv(1024)
dataport = get_data_port(data)
# close connection
s.shutdown(socket.SHUT_RDWR)
s.close()
print("Trying to open connection to data socket... ")
# opens the new server socket
while True :
try :
s_data = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s_data.connect((SERVER, dataport))
break
except ConnectionRefusedError :
print("retrying...")
time.sleep(1)
thread_exec = threading.Thread(target = thread_execute_programs, args=() )
thread_exec.start()
# then gets data
while True :
try :
print("Waiting for data from server...")
data = s_data.recv(1024)
if len(data) == 0 :
empty_inc = empty_inc + 1
else :
empty_inc = 0
if empty_inc > 10 :
raise KeyboardInterrupt
message = decode_message(data)
ret_val = handle_message(message)
if ret_val is not None :
s_data.send(str.encode(ret_val))
elif ret_val == "-1" :
print("Closing data socket...")
raise KeyboardInterrupt
except KeyboardInterrupt :
s_data.close()
stop = 1
print(" Exiting...")
raise

BIN
src/master Executable file

Binary file not shown.

110
src/master.c Executable file
View File

@ -0,0 +1,110 @@
#include <stdio.h>
#include <stdlib.h>
#include "master.h"
#define MASTERID = 0;
#define MAX_PROGRAM_NUMBER = 10;
#define MAX_PROGRAM_STR_LEN = 15;
int activeNodes[10]; //nodes active in the network. Assumes that all nodes have a unique id.
//char currentPrograms[MAX_PROGRAM_NUMBER][MAX_PROGRAM_STR_LEN]; //current programs : array of char pointers, each is a program string.
void main()
{
for(;;)
{
char program[50];
printf("Entrez un programme à exécuter : \n");
scanf("%s", program );
//calls roundRobin to decide where it should be executed
//sends it over the network
//gets program return output and print
}
}
/*
void initCurrentProgramsArray()
{
int i=0;
while(i<sizeof(currentPrograms)){
strcpy(currentPrograms[i], "-1");
i++;
}
}
void freeCurrentProgramsArray()
{
int i=0;
while(i<sizeof(currentPrograms)){
free(currentPrograms[i]);
i++;
}
}
// adds the program string to current program array, and returns the index where it was added.
int addToCurrentPrograms(char* newProgram){
int i=0;
while(i<sizeof(currentPrograms))
{
if(currentPrograms[i] == "-1"){
strcpy(currentPrograms[i], newProgram);
printf("Empty program at %d", i);
break;
}
i++;
}
return i ;
}
*/
int commandSlaveNode(int programIndex)
{
return 1;
}
int roundRobin()
{
return 1;
}
void printProgramOutput(char programName[])
{
}
/*********************
* UTIL ARRAY FUNCTIONS
*********************/
/*
int returnsSizeOfOccupiedArray(char array[]){
int i=0;
while(i<sizeof(array))
{
if(&array[i]==NULL)
break;
i++;
}
return i;
}
// copies content of bigger array to smaller one, fitting everything it cans in the smaller array.
void copyToSmallerArray(char bigArray[], char smallArray[])
{
int i=0;
while(i<sizeof(smallArray))
{
smallArray[i]=bigArray[i];
i++;
}
}
int manageUserInput()
{
char program[50];
printf("Entrez un programme à exécuter : \n");
scanf("%s", program );
int index = 0 ; // = addToCurrentPrograms...
return index;
}
*/

9
src/master.h Executable file
View File

@ -0,0 +1,9 @@
void initCurrentProgramsArray();
void freeCurrentProgramsArray();
int addToCurrentPrograms(char newProgram[]);
int commandSlaveNode(int programIndex);
int roundRobin();
void printProgramOutput(char programName[]);
int returnsSizeOfOccupiedArray(char array[]);
void copyToSmallerArray(char bigArray[], char smallArray[]);
int manageUserInput();

BIN
src/slave Executable file

Binary file not shown.

30
src/slave.c Executable file
View File

@ -0,0 +1,30 @@
#include <stdio.h>
int node_id ; // id of local node, passed as an argument from shell
void main()
{
// read id from command line
for(;;)
{
// receives message from master
// runs said program (already on local system)
// returns the return value to master
}
}
char getsMessageFromMaster()
{
// reads message from socket and returns a char[] or Message structure
}
int returnProgramNameFromMessage(char message[])
{
// returns the formatted program string to be passed to system call
}
int runProgram(char programName[], char args[])
{
// simple system call
return 1 ; // or any other return form. For the moment int.
}

63
src/util.py Executable file
View File

@ -0,0 +1,63 @@
import re
import subprocess
regexp = "(.*) : (.*)"
prog = re.compile(regexp)
def add_client(addr) :
connections.append(addr)
print(connections)
def del_client(addr) :
if addr in connections :
connections.remove(addr)
print(connections)
def get_program_input() :
message = input("Entrez un programme à exécuter...")
return message
def print_program_return(addr, ret_val) :
print("The program executed from " + str(addr) + " return the value " + str(ret_val))
def get_message_type(message) :
result=prog.findall(message)
if len(result) == 0 :
return None, None
return str(result[0][0]), str(result[0][1])
def get_data_port(message) :
clr_msg = decode_message(message)
mtype, mbody = get_message_type(clr_msg)
return int(mbody)
def encode_message(message) :
return str.encode(message)
def decode_message(raw_message) :
return raw_message.decode("utf-8")
def execute_program(program) :
to_exec = program # executer en tâche de fond.
ret_val = subprocess.call(program + "&", shell=True)
return "val : " + str(ret_val)
def handle_message(message):
mtype, mbody = get_message_type(message)
if mtype is None :
return None
if mtype == "prog" :
print("Received prog " + mbody )
ret_val = execute_program(mbody)
return ret_val
if mtype == "clo" :
return -1
elif mbody == "ret" :
print(message)
return None
def round_robin() :
roundrobin_inc = roundrobin_inc + 1
print("Roundrobin... inc = " + str(roundrobin_inc))
return roundrobin_inc