#define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "config_servers.h" #include "darray.h" #include "buf.h" #include "mq_cmd.h" #include "tbl_qcmd.h" #include "util.h" #include "sock_conn.h" #include "irc_parse.h" /* no defence programming, no error checking, no argument checking just PoC nothing else */ #define MQ_MSG_SIZE 8192 /* return unique ID, with atomic counter should work in all cases */ _Atomic int _glbl_id=0; int uniq_id() { int ret=-1,id; //what possible could go wrong? id = atomic_load(&_glbl_id); ret = id; id += 1; atomic_store(&_glbl_id, id); return ret; } //async io multiplexer typedef struct mq_ntf_io_mplx { struct timeval tv; fd_set io_fds; } mq_ntf_io_mplx; #define MQ_OUT 1 #define MQ_IN 2 /* supposed format agni-[id]-[in/out] in is for input of thread (send to in and thread will recieve), out if for output of thread (read from out to recieve from thread) */ #define MQ_PREFIX "/agni-" typedef struct mq_ntf_mdt { int id; mqd_t mq_in; mqd_t mq_out; mq_ntf_io_mplx io_mplx; //_Atomic int used; nado? } mq_ntf_mdt; /* return -1 on error */ int mq_ntf_open(mq_ntf_mdt *mq, int id) { const int name_size = 32; char name[name_size]; if (!mq) { PERM(); return -1; } mq->id = id; snprintf(name, name_size, MQ_PREFIX"%d-in",id); /*create with default configs*/ mq->mq_in = mq_open(name, O_RDWR | O_CREAT, 0666, NULL); if (mq->mq_in == -1) { ENL(); perror("mq_open"); return -1; } snprintf(name, name_size, MQ_PREFIX"%d-out",id); /*create with default configs*/ mq->mq_out = mq_open(name, O_RDWR | O_CREAT, 0666, NULL); if (mq->mq_out == -1) { perror("mq_open"); return -1; } PRINT("opened = %d %d\n", mq->mq_in, mq->mq_out); return 0; } int mq_ntf_mplx(mq_ntf_mdt *mq, int msec) { fd_set io_fds; if (mq == NULL) { PERM(); return -1; } if (msec < 0) { PERM(); return -1; } //mq->io_mplx.tv.tv_sec = msec/1000; //mq->io_mplx.tv.tv_usec = (msec%1000)*1000; mq->io_mplx.tv.tv_sec = 1; mq->io_mplx.tv.tv_usec = 0; PRINT("mplx = %d %d\n", mq->mq_in, mq->mq_out); FD_ZERO(&io_fds); FD_SET(mq->mq_in, &io_fds);//in first FD_SET(mq->mq_out, &io_fds);//out second mq->io_mplx.io_fds = io_fds; return 0; } /* return number if there is something to read RETURN: -1 some error 0x00 nothing to read 0x01 something in queue */ //if mplx havent done, everything could blow up int mq_ntf_select(mq_ntf_mdt *mq, int dir) { int fdnum, fd; int ret=0; fd_set io_fds; struct timeval tv; if (mq == NULL) { PERM(); return -1; } //we should allways have just 2 descriptors to listen //io_fds = mq->io_mplx.io_fds; if (dir == MQ_IN) { fd = mq->mq_in; } else if (dir == MQ_OUT) { fd = mq->mq_out; } FD_ZERO(&io_fds); FD_SET(fd, &io_fds); tv.tv_sec = 1; tv.tv_usec = 0; fdnum = select(1, &io_fds, NULL, NULL, &tv ); if (fdnum == -1) { ENL(); return -1; } else { if (FD_ISSET(fd, &mq->io_mplx.io_fds)) { ret = 1; } } return ret; } int mq_ntf_read(mq_ntf_mdt *mq, int dir, char *buf, size_t size) { struct mq_attr attr; size_t bytes; mqd_t mqd; unsigned int prio = 10; if (dir == MQ_IN) { mqd = mq->mq_in; } else if (dir == MQ_OUT) { mqd = mq->mq_out; } else { perror("Wrong direction"); } mq_getattr(mqd, &attr); if (attr.mq_maxmsg == attr.mq_curmsgs) { printf("queue %d out full\n", mq->id); return -1; } bytes = mq_receive(mqd, buf, size > attr.mq_msgsize ? attr.mq_msgsize : size, &prio); if (bytes == -1) { perror("mq_receive read out"); return -1; } return bytes; } int mq_ntf_write(mq_ntf_mdt *mq, int dir, const char *buf, size_t size) { struct mq_attr attr; size_t bytes; mqd_t mqd; unsigned int prio = 10; if (dir == MQ_IN) { mqd = mq->mq_in; } else if (dir == MQ_OUT) { mqd = mq->mq_out; } else { perror("Wrong direction"); } mq_getattr(mqd, &attr); if (attr.mq_maxmsg == attr.mq_curmsgs) { printf("queue %d out full\n", mq->id); return -1; } bytes = mq_send(mqd, buf, size > attr.mq_msgsize ? attr.mq_msgsize : size, prio); if (bytes == -1) { perror("mq_send"); return -1; } return bytes; } //dir - dirction 1 in 2 out int mq_ntf_count(mq_ntf_mdt *mq, int dir) { struct mq_attr attr; int mq_fd; if (mq == NULL) { PERM(); return -1; } if (dir == 1) { mq_fd = mq->mq_in; } else { mq_fd = mq->mq_out; } if (mq_getattr(mq_fd, &attr) != -1) { return attr.mq_curmsgs; } return -1; } /* close queue */ int mq_ntf_close(mq_ntf_mdt *mq) { const int name_size = 32; char name[name_size]; int id; id = mq->id; snprintf(name, name_size, MQ_PREFIX"%d-in",id); mq_unlink(name); snprintf(name, name_size, MQ_PREFIX"%d-out",id); mq_unlink(name); //clean file descriptors for io_mplx return 0; } //send command int mq_ntf_cmd_send(mq_ntf_mdt *mq, mq_cmd *cmd) { return 0; } //recieve command from other end int mq_ntf_cmd_recv(mq_ntf_mdt *mq, mq_cmd *cmd) { return 0; } int mq_drain_q(mqd_t mqd) { struct mq_attr attr; char *buf = NULL; unsigned int prio = 0; int cnt_msg = 0; int num_read = 0; int i = 0; if (mq_getattr(mqd, &attr) == -1) { ENL(); return -1; } buf = malloc(attr.mq_msgsize); if (buf == NULL) { ENL(); return -1; } for (i=0; imq_in); if (err < 0) { PERM(); return -1; } printf("Drained in %d messages\n", err); err = mq_drain_q(mq->mq_out); if (err < 0) { PERM(); return -1; } printf("Drained out %d messages\n", err); return 0; } int mq_ntf_getattr(mq_ntf_mdt *mq, int dir, struct mq_attr **attr) { struct mq_attr gattr; mqd_t mqdt; if (mq == NULL) { PERM(); return -1; } if (dir == MQ_OUT) { mqdt = mq->mq_out; } else if (dir == MQ_IN) { mqdt = mq->mq_in; } else { return -1; } if (mq_getattr(mqdt, &gattr) == -1) { ENL(); return -1; } memcpy(*attr,&gattr,sizeof(struct mq_attr)); return 0; } int send_mq_cmd(mq_ntf_mdt *mq, int io, int cmd_id, char *cmd, size_t cmd_sz, char *param, size_t param_sz) { int err = 0; mq_cmd *cmd_send = NULL; cmd_send = mq_cmd_create(cmd_id,cmd,cmd_sz,param,param_sz); if (cmd_send == NULL) { printf("Cant create command\n"); return -1; } err = -1; if (io == MQ_OUT) { err = mq_ntf_write(mq, MQ_OUT, cmd_send->buf, cmd_send->sz); } else if (io == MQ_IN) { err = mq_ntf_write(mq, MQ_IN, cmd_send->buf, cmd_send->sz); } else { printf("Unknown direction"); return -1; } if (err == -1) { printf("Couldnt recieve command\n"); mq_cmd_free(cmd_send); return -1; } mq_cmd_free(cmd_send); return 0; } int recv_mq_cmd(mq_ntf_mdt *mq, int io, mq_cmd **cmd) //return back recieved command { int err = 0; mq_cmd *cmd_recv = NULL; int recv_sz=-1; char *in_buf=NULL; in_buf = malloc(MQ_MSG_SIZE); if (in_buf == NULL) { ENL(); return -1; } err = -1; if (io == MQ_OUT) { err = mq_ntf_read(mq, MQ_OUT, in_buf, MQ_MSG_SIZE); } else if (io == MQ_IN) { err = mq_ntf_read(mq, MQ_IN, in_buf, MQ_MSG_SIZE); } else { printf("Unknown direction"); return -1; } if (err == -1) { printf("Couldnt recieve command\n"); free(in_buf); return -1; } recv_sz = err; //in err should be still the size of recieved buffer cmd_recv = mq_cmd_creates(in_buf, recv_sz, 0); if (cmd_recv == NULL) { printf("Cannot create cmd\n"); free(in_buf); return -1; } *cmd = cmd_recv; free(in_buf); return 0; } #define SEND_CMD_IN(MQ,ID,CMD,PARAM) \ send_mq_cmd(MQ,MQ_IN,ID,CMD,strlen(CMD),PARAM,strlen(PARAM)); #define SEND_CMD_OUT(MQ,ID,CMD,PARAM) \ send_mq_cmd(MQ,MQ_OUT,ID,CMD,strlen(CMD),PARAM,strlen(PARAM)); #define RECV_CMD_IN(MQ,CMD) \ recv_mq_cmd(MQ,MQ_IN,CMD); #define RECV_CMD_OUT(MQ,CMD) \ recv_mq_cmd(MQ,MQ_OUT,CMD); /* |--------|<--IN---|---------| | SERVER | | MANAGER | |--------|--OUT-->|---------| */ #define STACK_SIZE (1024*16) /* not supposed to be changed. */ typedef struct server_cfg { /* thread params */ int tid; void *stack; //atomic_int running; _Atomic int running; mq_ntf_mdt *mq; /* irc server config */ char *user; char *password; char *server; char **channels; int port; int ssl; } server_cfg; /* server_cfg struct as input */ #define TH_STATE_INIT 0 #define TH_STATE_START 1 #define TH_STATE_LISTEN_IN 2 #define TH_STATE_LISTEN_OUT 3 #define TH_STATE_SEND_IN 4 #define TH_STATE_SEND_OUT 5 #define TH_STATE_TERMINATION 6 #define TH_STATE_EXIT 7 int th_start_client(void *data) { int cmd_id = 1; int err; //char cmd_buf[MQ_MSG_SIZE]; //mq_cmd *cmd=NULL; int run; int mq_event; server_cfg *cfg = data; mq_ntf_mdt *mq = cfg->mq; mq_cmd *recv_cmd = NULL; // for recieved cmd from mq struct mq_attr out_attr, *ptr_out_attr=&out_attr; struct mq_attr in_attr, *ptr_in_attr=&in_attr; char *out_buf = NULL; char *in_buf = NULL; atomic_fetch_add(&cfg->running,1); printf("Start client\n"); printf("Server %d\n",cfg->tid); sleep(1); //prepare message queue mq = cfg->mq; err = mq_ntf_mplx(mq, 10000); if (err == -1) { printf("Ups something whent wrong\n"); } if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1) { printf("Cant get attribute\n"); ENL(); } out_buf = malloc(out_attr.mq_msgsize); if (mq_ntf_getattr(mq, MQ_IN, &ptr_in_attr) == -1) { printf("Cant get attribute\n"); ENL(); } in_buf = malloc(in_attr.mq_msgsize); //send command wait for response run = 1; while (run) { printf("Client loop tick\n"); mq_event = mq_ntf_select(mq, MQ_IN); switch(mq_event) { case 0: PRINT("EVENT 0\n"); break; case 1: PRINT("EVENT 1\n"); if (mq_ntf_read(mq, MQ_IN, in_buf, in_attr.mq_msgsize) == -1) { printf("Cant read input message \n"); } else { in_buf[in_attr.mq_msgsize-1] = 0x0; printf("Recieve %s\n", in_buf); } break; default: printf("Unknown event type\n"); } sleep(1); if (mq_event == 1) { PNL(); recv_cmd = mq_cmd_creates(in_buf, in_attr.mq_msgsize, -1); if (recv_cmd != NULL) { if (mq_cmd_o_cmp_cmd(recv_cmd,"QUIT") == 0) { printf("QUIT recieved lets quit main loop\n"); break; } } } } printf("End client\n"); atomic_fetch_sub( &cfg->running,1); return 0; } #define EVENT_HND_STACK_SIZE (16*1024) typedef struct event_handler_cfg { /* thread params*/ void *stack; mq_ntf_mdt *mq_listen; int mq_num; _Atomic int running; } event_handler_cfg; /* server_cfg struct as input */ #define EH_STATE_INIT 0 #define EH_STATE_START 1 #define EH_STATE_LISTEN_IN 2 #define EH_STATE_LISTEN_OUT 3 #define EH_STATE_SEND_IN 4 #define EH_STATE_SEND_OUT 5 #define EH_STATE_TERMINATION 6 #define EH_STATE_EXIT 7 /* Thread to reacieve messages and return them back */ int th_event_manager(void *data) { event_handler_cfg *cfg = data; atomic_fetch_add(&cfg->running,1); mq_ntf_mdt *mq=NULL; char *out_buf = NULL; int run; int err; int mq_event; //read any command mq_cmd *recv_cmd = NULL; // for recieved cmd from mq struct mq_attr out_attr, *ptr_out_attr=&out_attr; printf("Start event thread\n"); mq = cfg->mq_listen; err = mq_ntf_mplx(mq, 10000); if (err == -1) { printf("Ups something whent wrong\n"); } //get mq attributes if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1) { printf("Cant get attribute\n"); } out_buf = malloc(out_attr.mq_msgsize); //maybe its not null printf("Start event loop\n"); run = 1; while(run) { //check if there is some message and save it to buffer run += 1; mq_event = mq_ntf_select(mq,MQ_OUT); switch(mq_event) { case 0: PRINT("EVENT 0\n"); break; case 1: PRINT("EVENT MQ_OUT\n"); if (mq_ntf_read(mq, MQ_OUT, out_buf, out_attr.mq_msgsize) == -1) { printf("Cant read output message\n"); } else { out_buf[out_attr.mq_msgsize-1] = 0x0; printf("Recieve %s\n", out_buf); } break; default: printf("Unknown event type\n"); } sleep(1); //if (run == 10) // break; //if QUIT then quit the thread if (mq_event == 1) { recv_cmd = mq_cmd_creates(out_buf, out_attr.mq_msgsize, -1); if (recv_cmd != NULL) { PNL(); if (mq_cmd_o_cmp_cmd(recv_cmd,"QUIT") == 0) { printf("QUIT recieved lets quit main loop\n"); break; } if (mq_cmd_o_cmp_cmd(recv_cmd,"CMD1") == 0) { printf("Hey dude it works\n"); } if (mq_cmd_o_cmp_cmd(recv_cmd,"CMD2") == 0) { printf("Hey dude it works second time\n"); } } } //applay to recieved command executor //other command pass to cmd/execution matching table //pass to exec handler //get response if command is immidieate otherwise wait for response in next execution } free(out_buf); printf("End event thread\n"); atomic_fetch_sub( &cfg->running,1); return 0; } int main(int argc, char **argv) { int i; int cnt_servers,cnt_running; server_cfg *cfg_list; event_handler_cfg *evhnd_cfg; mq_ntf_mdt *mq_array; /*set atomic variables to init value*/ atomic_store(&_glbl_id, 0); /* Load configuration */ cnt_servers = SIZEOF_SERVER_LIST; for (i=0;itid = i; srvc->stack = malloc(STACK_SIZE); //NULL will brake everything ;) srvc->user = isrvc->user; srvc->password = isrvc->password; srvc->server = isrvc->server; srvc->channels = isrvc->channels; srvc->port = isrvc->port; srvc->ssl = isrvc->ssl; //atomic_init( &srvc->running, 1); atomic_store(&srvc->running, 0); /* initalise posix mq */ if (0 != mq_ntf_open(&mq_array[i], i)) { printf("Couldnt open mq_ntf_open\n"); } srvc->mq = &mq_array[i]; //try to drain mq mq_ntf_drain(&mq_array[i]); /* clone new proc */ clone(th_start_client, srvc->stack+STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)srvc); } /* event handler thread */ evhnd_cfg = malloc(sizeof(event_handler_cfg)); memset(evhnd_cfg, 0, sizeof(event_handler_cfg)); evhnd_cfg->stack = malloc(EVENT_HND_STACK_SIZE); atomic_store(&evhnd_cfg->running, 0); evhnd_cfg->mq_num = cnt_servers; evhnd_cfg->mq_listen = mq_array; clone(th_event_manager, evhnd_cfg->stack+EVENT_HND_STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)evhnd_cfg); /* run until all threads are up */ cnt_running = 1; while(cnt_running != 0) { //printf("Count\n"); /*count how many proceses is running there*/ int val; cnt_running = 0; for (i=0;i