/*
 * NOTE(review): this file appears to have lost its original line breaks and
 * every span that sat between a '<' and a following '>': the #include
 * targets, the tail of create_mq_cmd() through the head of the queue-open
 * routine (presumably mq_ntf_open(), which main() calls), the body of the
 * drain loop through the head of the routine that unlinks the queues, and
 * the head of main()'s per-server setup loop. main() is also cut off at the
 * end. The code below is kept exactly as found; restore the original text
 * before trying to build it.
 *
 * Overview, limited to what the visible code shows:
 *
 *  - Proof of concept (see the author's own disclaimer below): each
 *    configured server gets two POSIX message queues, named
 *    MQ_PREFIX"%d-in" and MQ_PREFIX"%d-out", opened O_RDWR | O_CREAT with
 *    mode 0666 and default attributes.
 *  - main() starts th_start_client() once per server and th_event_manager()
 *    once, each through clone(CLONE_VM | CLONE_FILES) on a malloc'ed stack,
 *    then loops while a running count is non-zero.
 *  - Each thread function increments its cfg->running on entry and
 *    decrements it before returning 0.
 *
 * Function summary:
 *
 *  irc_connect(hostname, port)
 *      getaddrinfo() with AF_INET / SOCK_STREAM hints, socket(), connect();
 *      returns the socket descriptor. No call result is checked and the
 *      addrinfo list is not freed.
 *  mq_ntf_select(mq)
 *      Stub, always returns 0.
 *  mq_ntf_read_in / mq_ntf_read_out(mq, buf, size)
 *      mq_getattr() on the queue; prints "queue %d in full" or
 *      "queue %d out full" and returns -1 when mq_curmsgs equals mq_maxmsg;
 *      otherwise mq_receive() with a length of min(size, mq_msgsize).
 *      Returns 0 on success, -1 after perror() on failure. The received
 *      length is not passed back to the caller.
 *  mq_ntf_write_out / mq_ntf_write_in(mq, buf, size)
 *      Same shape using mq_send() with priority 10.
 *  mq_ntf_cmd_send / mq_ntf_cmd_recv(mq, cmd)
 *      Stubs, always return 0.
 *  th_start_client(data)
 *      data is a server_cfg pointer. Prints a banner, sleeps 10 seconds,
 *      builds a "PING" command with CMD_CREATE() and writes cmd->buf to the
 *      out queue.
 *  th_event_manager(data)
 *      data is an event_handler_cfg pointer. Walks INIT, START, EXIT with a
 *      one second sleep in each visited state.
 *
 * Review notes (defects visible in the code, not fixed here):
 *
 *  - mq_ntf_read_* and mq_ntf_write_*: `bytes` is a size_t, so the
 *    `bytes == -1` test only works because -1 is converted to the largest
 *    size_t value; `fret` is assigned but never tested, so `attr` is read
 *    even if mq_getattr() failed; `msg` is unused.
 *  - The "full" test also guards the read functions, so a completely full
 *    queue can never be read through them. Presumably only the writers need
 *    it; confirm intent.
 *  - POSIX mq_receive() is documented to fail with EMSGSIZE when the
 *    supplied length is smaller than the queue's mq_msgsize, so clamping to
 *    a smaller `size` presumably makes the receive fail. TODO confirm.
 *  - mq_ntf_drain(): the buffer is allocated with curattr.mq_maxmsg bytes,
 *    the field the other functions compare against the message count,
 *    rather than mq_msgsize. The code using the buffer is missing, so
 *    verify against the original.
 *  - th_start_client(): in `ret = mq_ntf_write_out(...) == -1` the
 *    comparison binds tighter than the assignment, so `ret` receives 0 or 1
 *    and not the function's return code.
 *  - th_event_manager(): `state` is assigned TH_STATE_* constants while the
 *    switch and loop use EH_STATE_*. Both sets are defined with the same
 *    values, so behaviour is unaffected today. `case EH_STATE_EXIT` cannot
 *    be reached because the loop condition excludes that value, and the
 *    `default` branch never changes `state`.
 *  - main(): malloc() results are used unchecked, a failed mq_ntf_open() is
 *    only reported, clone()'s return value is discarded, and the stack top
 *    is computed with arithmetic on a `void *` (a GNU extension).
 */
#define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "config_servers.h" #include "darray.h" #include "buf.h" #include "mq_cmd.h" /* no defence programming, no error checking, no argument checking just PoC nothing else */ #define MQ_MSG_SIZE 8192 /* return fd!=ifconnection there */ int irc_connect( char *hostname, char *port ) { int fd=0; struct addrinfo serv, *res; memset(&serv, 0, sizeof(serv)); serv.ai_family = AF_INET; serv.ai_socktype = SOCK_STREAM; getaddrinfo(hostname, port, &serv, &res); fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); connect(fd, res->ai_addr, res->ai_addrlen); return fd; } char *create_mq_cmd(char *out, size_t out_sz, int id, char *cmd, size_t cmd_sz, char *payload, size_t p_sz) { char *ret = out; int n,i; n = snprintf(out, out_sz, ":%d:%s ", id, cmd); if (n < 0) { return NULL; } for (i=0;iid = id; snprintf(name, name_size, MQ_PREFIX"%d-in",id); /*create with default configs*/ mq->mq_in = mq_open(name, O_RDWR | O_CREAT, 0666, NULL); if (mq->mq_in == -1) { perror("mq_open"); return -1; } snprintf(name, name_size, MQ_PREFIX"%d-out",id); /*create with default configs*/ mq->mq_out = mq_open(name, O_RDWR | O_CREAT, 0666, NULL); if (mq->mq_out == -1) { perror("mq_open"); return -1; } return 0; } /* return number if there is something to read */ int mq_ntf_select(mq_ntf_mdt *mq) { return 0; } /* read from input queue */ int mq_ntf_read_in(mq_ntf_mdt *mq, char *buf, size_t size) { struct mq_attr attr; char *msg; size_t bytes; unsigned int prio = 10; int fret; fret = mq_getattr(mq->mq_in, &attr); if (attr.mq_maxmsg == attr.mq_curmsgs) { printf("queue %d in full", mq->id); return -1; } bytes = mq_receive(mq->mq_in, buf, size > attr.mq_msgsize ? 
attr.mq_msgsize : size, &prio); if (bytes == -1) { perror("mq_receive read in"); return -1; } return 0; } /* read from output queue */ int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size) { struct mq_attr attr; char *msg; size_t bytes; unsigned int prio = 10; int fret; fret = mq_getattr(mq->mq_out, &attr); if (attr.mq_maxmsg == attr.mq_curmsgs) { printf("queue %d out full", mq->id); return -1; } bytes = mq_receive(mq->mq_out, buf, size > attr.mq_msgsize ? attr.mq_msgsize : size, &prio); if (bytes == -1) { perror("mq_receive read out"); return -1; } return 0; } /* write to output queue */ int mq_ntf_write_out(mq_ntf_mdt *mq, const char *buf, size_t size) { struct mq_attr attr; char *msg; size_t bytes; unsigned int prio = 10; int fret; fret = mq_getattr(mq->mq_out, &attr); if (attr.mq_maxmsg == attr.mq_curmsgs) { printf("queue %d out full", mq->id); return -1; } bytes = mq_send(mq->mq_out, buf, size > attr.mq_msgsize ? attr.mq_msgsize : size, prio); if (bytes == -1) { perror("mq_send"); return -1; } return 0; } /* write to input queue */ int mq_ntf_write_in(mq_ntf_mdt *mq, const char *buf, size_t size) { struct mq_attr attr; char *msg; size_t bytes; unsigned int prio = 10; int fret; fret = mq_getattr(mq->mq_in, &attr); if (attr.mq_maxmsg == attr.mq_curmsgs) { printf("queue %d in full", mq->id); return -1; } bytes = mq_send(mq->mq_in, buf, size > attr.mq_msgsize ? 
attr.mq_msgsize : size, prio); if (bytes == -1) { perror("mq_send"); return -1; } return 0; } /* drain all messages from queue */ int mq_ntf_drain(mq_ntf_mdt *mq) { int i,j; struct mq_attr curattr; char *buf; ssize_t bytes; int prio = 10; mqd_t mqlist[2] = {mq->mq_in, mq->mq_out}; for (i=0;i<2;i++) { mq_getattr(mqlist[i], &curattr); buf = malloc(curattr.mq_maxmsg); for (j=0; jid; snprintf(name, name_size, MQ_PREFIX"%d-in",id); mq_unlink(name); snprintf(name, name_size, MQ_PREFIX"%d-out",id); mq_unlink(name); return 0; } //send command int mq_ntf_cmd_send(mq_ntf_mdt *mq, mq_cmd *cmd) { return 0; } //recieve command from other end int mq_ntf_cmd_recv(mq_ntf_mdt *mq, mq_cmd *cmd) { return 0; } /* |--------|<--IN---|---------| | SERVER | | MANAGER | |--------|--OUT-->|---------| */ #define STACK_SIZE (1024*16) /* not supposed to be changed. */ typedef struct server_cfg { /* thread params */ int tid; void *stack; //atomic_int running; _Atomic int running; mq_ntf_mdt *mq; /* irc server config */ char *user; char *password; char *server; char **channels; int port; int ssl; } server_cfg; /* server_cfg struct as input */ #define TH_STATE_INIT 0 #define TH_STATE_START 1 #define TH_STATE_LISTEN_IN 2 #define TH_STATE_LISTEN_OUT 3 #define TH_STATE_SEND_IN 4 #define TH_STATE_SEND_OUT 5 #define TH_STATE_TERMINATION 6 #define TH_STATE_EXIT 7 int th_start_client(void *data) { int cmd_id = 1; int ret = 0; char cmd_buf[MQ_MSG_SIZE]; mq_cmd *cmd=NULL; server_cfg *cfg = data; mq_ntf_mdt *mq = cfg->mq; atomic_fetch_add(&cfg->running,1); printf("Start client\n"); printf("Server %d\n",cfg->tid); sleep(10); //send command wait for response cmd = CMD_CREATE(cmd_id,"PING","NOPARAM"); if (cmd != NULL) { if (ret = mq_ntf_write_out(mq,cmd->buf,cmd->sz) == -1) { printf("Couldnt send command\n"); } } cmd_id += 1; printf("End client\n"); atomic_fetch_sub( &cfg->running,1); return 0; } #define EVENT_HND_STACK_SIZE (16*1024) typedef struct event_handler_cfg { /* thread params*/ void *stack; 
mq_ntf_mdt *mq_listen; int mq_num; _Atomic int running; } event_handler_cfg; /* server_cfg struct as input */ #define EH_STATE_INIT 0 #define EH_STATE_START 1 #define EH_STATE_LISTEN_IN 2 #define EH_STATE_LISTEN_OUT 3 #define EH_STATE_SEND_IN 4 #define EH_STATE_SEND_OUT 5 #define EH_STATE_TERMINATION 6 #define EH_STATE_EXIT 7 /* Thread to receive messages and return them back */ int th_event_manager(void *data) { int state; int i; event_handler_cfg *cfg = data; atomic_fetch_add(&cfg->running,1); const int buf_size=MQ_MSG_SIZE; char buf[buf_size]; int ret; //read any command mq_cmd *cmd = NULL; printf("Start event thread\n"); state = EH_STATE_INIT; while (state != EH_STATE_EXIT) switch (state) { case EH_STATE_INIT: printf("TH STATE INIT\n"); sleep(1); state = TH_STATE_START; break; case EH_STATE_START: printf("TH STATE START\n"); sleep(1); state = TH_STATE_EXIT; break; case EH_STATE_EXIT: printf("TH STATE EXIT\n"); sleep(1); state = TH_STATE_EXIT; break; default: printf("Wrong state\n"); } printf("End event thread\n"); atomic_fetch_sub( &cfg->running,1); return 0; } int main(int argc, char **argv) { int i; int cnt_servers,cnt_running; server_cfg *cfg_list; event_handler_cfg *evhnd_cfg; mq_ntf_mdt *mq_array; /* Load configuration */ cnt_servers = SIZEOF_SERVER_LIST; for (i=0;itid = i; srvc->stack = malloc(STACK_SIZE); //NULL will brake everything ;) srvc->user = isrvc->user; srvc->password = isrvc->password; srvc->server = isrvc->server; srvc->channels = isrvc->channels; srvc->port = isrvc->port; srvc->ssl = isrvc->ssl; //atomic_init( &srvc->running, 1); atomic_store(&srvc->running, 0); /* initialise posix mq */ if (0 != mq_ntf_open(&mq_array[i], i)) { printf("Couldnt open mq_ntf_open\n"); } srvc->mq = &mq_array[i]; /* clone new proc */ clone(th_start_client, srvc->stack+STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)srvc); } /* event handler thread */ evhnd_cfg = malloc(sizeof(event_handler_cfg)); memset(evhnd_cfg, 0, sizeof(event_handler_cfg)); evhnd_cfg->stack = 
malloc(EVENT_HND_STACK_SIZE); atomic_store(&evhnd_cfg->running, 0); evhnd_cfg->mq_num = cnt_servers; evhnd_cfg->mq_listen = mq_array; clone(th_event_manager, evhnd_cfg->stack+EVENT_HND_STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)evhnd_cfg); /* run until all threads are up */ cnt_running = 1; while(cnt_running != 0) { //printf("Count\n"); /*count how many proceses is running there*/ int val; cnt_running = 0; for (i=0;i