#define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "config_servers.h" #include "darray.h" #include "buf.h" #include "mq_cmd.h" #include "mq_ntf.h" #include "tbl_qcmd.h" #include "util.h" #include "sock_conn.h" #include "irc_parse.h" #include "mmm.h" /* no defence programming, no error checking, no argument checking just PoC nothing else */ #define MQ_MSG_SIZE 8192 //HACK extern int __irc_buf_drain_io(irc_buf *ib); void *cmd_pong(void *data) { char *param = (char *)data; char *ret = NULL; if (param == NULL) { return NULL; } printf("PONG\n"); ret = alloc_new_str("PONG"); return ret; } void *cmd_uptime(void *data) { char *param = (char *)data; char *ret = NULL; if (param == NULL) { return NULL; } printf("UPTIME\n"); ret = alloc_new_str("UpTime is infinite\n"); return ret; } void *cmd_date(void *data) { char *param = (char *)data; char *ret = NULL; if (param == NULL) { return NULL; } printf("DATE\n"); ret = alloc_new_str("I dont whant to date with you\n"); return ret; } /* Wrap call back, to manage how going be executed callback. Manage input and output params for callback. INPUT(string)->OUTPUT(string) */ tble_cmd_resp* cllbk_wrapper( void *(*call)(void *), tble_cmd_param *param) { void *data_out = NULL; tble_cmd_resp *resp = NULL; char *data=NULL; if (call == NULL) { PERM(); return NULL; } if (param == NULL) { PERM(); return NULL; } //prepare response resp = tbl_cmd_resp_c(param); if (resp == NULL) { PERM(); } tbl_cmd_resp_print(resp); data = alloc_new_str(param->param); //call callback data_out = call(data); //set result to returned response if (data_out == NULL) { //response doesnt have any output if (resp) { //response dont have result resp->ret_code = TBL_RSP_NORESP; resp->resp = NULL; } } else { if (resp) { //set succesfull repsonse resp->ret_code = TBL_RSP_OK; resp->resp = alloc_new_str(data_out); } } //dangerouse place =P free(data_out); free(data); return resp; } /* return unique ID, with atomic counter should work in all cases */ _Atomic int _glbl_id=0; int uniq_id() { int ret=-1,id; //what possible could go wrong? // sry emilsp this is not important for now id = atomic_load(&_glbl_id); ret = id; id += 1; atomic_store(&_glbl_id, id); return ret; } /* int send_mq_cmd(mq_ntf_mdt *mq, int io, int cmd_id, char *cmd, size_t cmd_sz, char *param, size_t param_sz) { int err = 0; mq_cmd *cmd_send = NULL; cmd_send = mq_cmd_create(cmd_id,cmd,cmd_sz,param,param_sz); if (cmd_send == NULL) { printf("Cant create command\n"); return -1; } err = -1; if (io == MQ_OUT) { err = mq_ntf_write(mq, MQ_OUT, cmd_send->buf, cmd_send->sz); } else if (io == MQ_IN) { err = mq_ntf_write(mq, MQ_IN, cmd_send->buf, cmd_send->sz); } else { printf("Unknown direction"); return -1; } if (err == -1) { printf("Couldnt recieve command\n"); mq_cmd_free(cmd_send); return -1; } mq_cmd_free(cmd_send); return 0; } int recv_mq_cmd(mq_ntf_mdt *mq, int io, mq_cmd **cmd) //return back recieved command { int err = 0; mq_cmd *cmd_recv = NULL; int recv_sz=-1; char *in_buf=NULL; in_buf = malloc(MQ_MSG_SIZE); if (in_buf == NULL) { ENL(); return -1; } err = -1; if (io == MQ_OUT) { err = mq_ntf_read(mq, MQ_OUT, in_buf, MQ_MSG_SIZE); } else if (io == MQ_IN) { err = mq_ntf_read(mq, MQ_IN, in_buf, MQ_MSG_SIZE); } else { printf("Unknown direction"); return -1; } if (err == -1) { printf("Couldnt recieve command\n"); free(in_buf); return -1; } recv_sz = err; //in err should be still the size of recieved buffer cmd_recv = mq_cmd_creates(in_buf, recv_sz, 0); if (cmd_recv == NULL) { printf("Cannot create cmd\n"); free(in_buf); return -1; } *cmd = cmd_recv; free(in_buf); return 0; } */ #define SEND_CMD_IN(MQ,ID,CMD,PARAM) \ send_mq_cmd(MQ,MQ_IN,ID,CMD,strlen(CMD),PARAM,strlen(PARAM)); #define SEND_CMD_OUT(MQ,ID,CMD,PARAM) \ send_mq_cmd(MQ,MQ_OUT,ID,CMD,strlen(CMD),PARAM,strlen(PARAM)); #define RECV_CMD_IN(MQ,CMD) \ recv_mq_cmd(MQ,MQ_IN,CMD); #define RECV_CMD_OUT(MQ,CMD) \ recv_mq_cmd(MQ,MQ_OUT,CMD); /* |--------|<--IN---|---------| | SERVER | | MANAGER | |--------|--OUT-->|---------| */ #define STACK_SIZE (1024*128) /* not supposed to be changed. */ typedef struct server_cfg { /* thread params */ int tid; void *stack; //atomic_int running; _Atomic int running; mq_ntf_mdt *mq; /* irc server config */ char *user; char *password; char *server; //should be changed to hostname? char **channels; char *port; int ssl; } server_cfg; /******************************************************************************* server thread *******************************************************************************/ /* server_cfg struct as input */ #define TH_STATE_INIT 0 #define TH_STATE_START 1 #define TH_STATE_LISTEN_IN 2 #define TH_STATE_LISTEN_OUT 3 #define TH_STATE_SEND_IN 4 #define TH_STATE_SEND_OUT 5 #define TH_STATE_TERMINATION 6 #define TH_STATE_EXIT 7 #define TH_CONN_BUF_SZ 1024 int th_start_client(void *data) { //int cmd_id = 1; int err; //mq_cmd *qcmd=NULL; //queue command int run; int mq_event; server_cfg *cfg = data; mq_ntf_mdt *mq = cfg->mq; mq_cmd *recv_cmd = NULL; // for recieved cmd from mq struct mq_attr out_attr, *ptr_out_attr=&out_attr; struct mq_attr in_attr, *ptr_in_attr=&in_attr; char *out_buf = NULL; char *in_buf = NULL; //network creation var int cret = -1; int conn=-1; char conn_buf[TH_CONN_BUF_SZ]; char cmd_buf[TH_CONN_BUF_SZ]; //irc parsting irc_buf *ib = NULL; irc_token *itok = NULL; char *irc_line = NULL; atomic_fetch_add(&cfg->running,1); printf("Start client\n"); printf("Server %d\n",cfg->tid); sleep(1); //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); //PNL(); //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); //conn = irc_connect(cfg->server, cfg->port); conn = irc_connect("irc.freenode.net", "6667"); //PNL(); if (conn < 0) { PNL(); printf("cant connect to server\n"); //well we dont whant to just exit from thread //lets put inside main thread CONNection checker //if no connection just send some commmand and create //some logic how to deal with it atomic_fetch_sub(&cfg->running,1); //return 0; } //PNL(); //send some commands to irc to register nick snprintf(cmd_buf, TH_CONN_BUF_SZ, "USER %s 0 0 :%s\r\n", cfg->user, cfg->user); write(conn, cmd_buf, strlen(cmd_buf)); snprintf(cmd_buf, TH_CONN_BUF_SZ,"NICK %s \r\n", cfg->user); write(conn, cmd_buf, strlen(cmd_buf)); //prepare message queue mq = cfg->mq; if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1) { printf("Cant get attribute\n"); ENL(); } //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); out_buf = malloc(out_attr.mq_msgsize); memset(out_buf, 0, out_attr.mq_msgsize); if (out_buf == NULL) { ENL(); } //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); if (mq_ntf_getattr(mq, MQ_IN, &ptr_in_attr) == -1) { printf("Cant get attribute\n"); ENL(); } in_buf = malloc(in_attr.mq_msgsize); memset(in_buf, 0, in_attr.mq_msgsize); if (in_buf == NULL) { ENL(); } //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); //create irc parsing structures memset(conn_buf, 0, TH_CONN_BUF_SZ); memset(cmd_buf, 0, TH_CONN_BUF_SZ); ib = irc_buf_create(); if (ib == NULL) { PNL(); PERM(); //return 0; } //send command wait for response printf("Start loop\n"); run = 1; while (run) { //Collect events from MQ printf("Client loop tick\n"); mq_event = mq_ntf_select(mq, MQ_IN); switch(mq_event) { case 0: //PRINT("EVENT 0\n"); break; case 1: PRINT("MQ_EVENT IN\n"); if (mq_ntf_read(mq, MQ_IN, in_buf, in_attr.mq_msgsize) == -1) { PNL(); printf("Cant read input message \n"); } else { //PNL(); in_buf[in_attr.mq_msgsize-1] = 0x0; PRINT("Recieve %s\n", in_buf); } PNL(); break; default: printf("Unknown event type\n"); } //fast code to exit if QUIT command is recieved //for command in queue //recieve commands from queue if (mq_event == 1) { //PNL(); recv_cmd = mq_cmd_creates(in_buf, in_attr.mq_msgsize, -1); if (recv_cmd != NULL) { //char *bufPtr = mq_cmd_buf(recv_cmd); //printf("Recieved response [%s]\n",bufPtr); //char *cmdPtr; //size_t cmdSz; //mq_cmd_cmd(recv_cmd, &cmdPtr, &cmdSz); //printf("CMD %s %d\n", cmdPtr, cmdSz); if (mq_cmd_o_cmp_cmd(recv_cmd,"QUIT") == 0) { printf("QUIT recieved lets quit main loop\n"); break; } else if (mq_cmd_o_cmp_cmd(recv_cmd,"PRIVMSG:") == 0) { PRINT("Response\n") char *paramPtr; size_t paramSz; mq_cmd_param(recv_cmd, ¶mPtr, ¶mSz); printf("Recieved response [%s]%d\n",paramPtr,paramSz); } else { PRINT("Unknown command\n"); } } mq_cmd_free(recv_cmd); recv_cmd = NULL; } //recieve irc commands and pass to MQ //fix this by allowing drain just chunks, not whole buffer //memset(conn_buf, 0, TH_CONN_BUF_SZ); if((cret = read(conn,conn_buf, 256))>0) { irc_buf_puts(ib, conn_buf, cret); while (irc_buf_ready(ib) == 1) { irc_line = irc_buf_line(ib); if (irc_line) { printf("PARSE>>%s\n",irc_line); if (memcmp(irc_line,"PING",4)==0) { write(conn,"PONG",strlen("PONG")); printf("OUT>>PONG\n"); } itok = irc_parse(irc_line,strlen(irc_line)); if (itok != NULL) { int j; for (j=0;jtk_list[j]->token); } if (token_cmp(itok,1,"NOTICE")==1) { //int fret; //fret = write(conn,"NOTICE",strlen("NOTICE")); //printf("OUT<user,uname); printf("FORMATED [%s]",cmd_buf); fret = write(conn,cmd_buf,fret2); //create command and send to mq PRINT("msg %s\n"); mq_cmd *privcmd = mq_cmd_create(1,"PRIVMSG",strlen("PRIVMSG"),msg,strlen(msg)); char *bufPtr; bufPtr = mq_cmd_buf(privcmd); PRINT("BUF %s", bufPtr); mq_ntf_write(mq, MQ_OUT, bufPtr, strlen(bufPtr)); mq_cmd_free(privcmd); free(msg); free(uname); printf("OUT<running,1); return 0; } #define EVENT_HND_STACK_SIZE (1024*1024) typedef struct event_handler_cfg { /* thread params*/ void *stack; mq_ntf_mdt *mq_listen; int mq_num; _Atomic int running; } event_handler_cfg; /* server_cfg struct as input */ #define EH_STATE_INIT 0 #define EH_STATE_START 1 #define EH_STATE_LISTEN_IN 2 #define EH_STATE_LISTEN_OUT 3 #define EH_STATE_SEND_IN 4 #define EH_STATE_SEND_OUT 5 #define EH_STATE_TERMINATION 6 #define EH_STATE_EXIT 7 /******************************************************************************* Event thead. Recieve all events from server thread and pass them for execution. Return results from execution to server thread. *******************************************************************************/ /* Thread to reacieve messages and return them back */ int th_event_manager(void *data) { event_handler_cfg *cfg = data; atomic_fetch_add(&cfg->running,1); mq_ntf_mdt *mq=NULL; char *out_buf = NULL; int run; int mq_event; //read any command mq_cmd *recv_cmd = NULL; // for recieved cmd from mq struct mq_attr out_attr, *ptr_out_attr=&out_attr; //execution and command table generation tble_exec *ecmd = NULL; tbl_exec *etbl = NULL; tble_qcmd *qcmd = NULL; tbl_qcmd *qtbl = NULL; //create execution table etbl = tbl_exec_list_c(10); if (etbl == NULL) { PERM(); //return -1; } ecmd = tbl_exec_c(); if (ecmd == NULL) { PERM(); //return -1; } ecmd->id = uniq_id(); ecmd->name = alloc_new_str("local-executor"); ecmd->cmd = alloc_new_str("PING"); ecmd->callback = cmd_pong; if (-1 == tbl_exec_add(etbl, ecmd)) { PERM(); //return -1; } ecmd = tbl_exec_c(); if (ecmd == NULL) { PERM(); //return -1; } ecmd->id = uniq_id(); ecmd->name = alloc_new_str("local-executor"); ecmd->cmd = alloc_new_str("DATE"); ecmd->callback = cmd_date; if (-1 == tbl_exec_add(etbl, ecmd)) { PERM(); //return -1; } ecmd = tbl_exec_c(); if (ecmd == NULL) { PERM(); //return -1; } ecmd->id = uniq_id(); ecmd->name = alloc_new_str("local-executor"); ecmd->cmd = alloc_new_str("UPTIME"); ecmd->callback = cmd_uptime; if (-1 == tbl_exec_add(etbl, ecmd)) { PERM(); //return -1; } tbl_exec_print_tbl(etbl, TBL_PF_EXEC_ALL); etbl = tbl_exec_list_c(10); if (etbl == NULL) { PERM(); //return -1; }//PNL(); //create command table qtbl = tbl_qcmd_c(10);//well 10 should be ok right? //config mq printf("Start event thread\n"); mq = cfg->mq_listen; //get mq attributes if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1) { printf("Cant get attribute\n"); } out_buf = malloc(out_attr.mq_msgsize); //maybe its not null printf("Start event loop\n"); run = 1; while(run) { //PNL(); //check if there is some message and save it to buffer run += 1; mq_event = mq_ntf_select(mq, MQ_OUT); switch(mq_event) { case 0: //PRINT("EVENT 0\n"); break; case 1: PRINT("MQ_EVENT OUT\n"); if (mq_ntf_read(mq, MQ_OUT, out_buf, out_attr.mq_msgsize) == -1) { printf("Cant read output message\n"); } else { out_buf[out_attr.mq_msgsize-1] = 0x0; printf("Recieve %s\n", out_buf); } break; default: printf("Unknown event type\n"); } //sleep(1); //if (run == 10) // break; //PNL(); //if QUIT then quit the thread if (mq_event == 1) { PNL(); recv_cmd = mq_cmd_creates(out_buf, out_attr.mq_msgsize, 1); //PNL(); if (recv_cmd != NULL) { PNL(); PRINT("MQ:CMD %s\n", mq_cmd_buf(recv_cmd)); if (mq_cmd_o_cmp_cmd(recv_cmd,"QUIT") == 0) { printf("QUIT recieved lets quit main loop\n"); break; } else if (mq_cmd_o_cmp_cmd(recv_cmd,"PING") == 0) { printf("Its fine im working\n"); //create command and send to mq char msg[] = "PONG"; mq_cmd *privcmd = mq_cmd_create(1,"PRIVMSG",strlen("PRIVMSG"),msg,strlen(msg)); //char *str_mq = mq_cmd_buf(privcmd); //mq_ntf_write(mq, MQ_IN, str_mq, strlen(str_mq)); mq_cmd_free(privcmd); } else if (mq_cmd_o_cmp_cmd(recv_cmd,"PRIVMSG") == 0) { printf("Some private message\n"); /*char msg[] = "PONG EVENT THREAD"; mq_cmd *privcmd = mq_cmd_create(1,"PRIVMSG",strlen("PRIVMSG"),msg,strlen(msg)); //recieve params and match command to them char *paramPtr=NULL; size_t paramSz=0; mq_cmd_param(recv_cmd, ¶mPtr, ¶mSz); PRINT("param = %s\n",paramPtr); mq_ntf_write(mq, MQ_IN, paramPtr, paramSz); mq_cmd_free(privcmd); */ if (mq_cmd_o_cmp_param(recv_cmd, ":DATE") == 0) { char msg[] = "No date"; mq_cmd *privcmd = mq_cmd_create(1,"PRIVMSG",strlen("PRIVMSG"),msg,strlen(msg)); char *cmdBuf = mq_cmd_buf(privcmd); mq_ntf_write(mq, MQ_IN, cmdBuf, strlen(cmdBuf)); cmdBuf = NULL; mq_cmd_free(privcmd); privcmd = NULL; } else if (mq_cmd_o_cmp_param(recv_cmd, ":UPTIME") == 0) { char msg[] = "Forever"; mq_cmd *privcmd = mq_cmd_create(1,"PRIVMSG",strlen("PRIVMSG"),msg,strlen(msg)); char *cmdBuf = mq_cmd_buf(privcmd); mq_ntf_write(mq, MQ_IN, cmdBuf, strlen(cmdBuf)); cmdBuf = NULL; mq_cmd_free(privcmd); privcmd = NULL; } else { char msg[] = "PONG EVENT THREAD"; mq_cmd *privcmd = mq_cmd_create(1,"PRIVMSG",strlen("PRIVMSG"),msg,strlen(msg)); //recieve params and match command to them //char *paramPtr=NULL; //size_t paramSz=0; //mq_cmd_param(recv_cmd, ¶mPtr, ¶mSz); //PRINT("param = %s\n",paramPtr); char *cmdBuf = mq_cmd_buf(privcmd); mq_ntf_write(mq, MQ_IN, cmdBuf, strlen(cmdBuf)); mq_cmd_free(privcmd); } } else if (mq_cmd_o_cmp_cmd(recv_cmd,"CMD2") == 0) { printf("Hey dude it works second time\n"); } else { printf("Unknow command\n"); char *cmdPtr; size_t cmdSz; mq_cmd_cmd(recv_cmd, &cmdPtr, &cmdSz); char *cmdNew = alloc_new_str_s(cmdPtr,cmdSz); printf("Unknown cmd [%s]\n", cmdNew); FREE(cmdNew); } } mq_cmd_free(recv_cmd); } //applay to recieved command executor //other command pass to cmd/execution matching table //pass to exec handler //get response if command is immidieate otherwise wait for response in next execution } free(out_buf); PNL(); printf("End event thread\n"); atomic_fetch_sub( &cfg->running,1); return 0; } /******************************************************************************* Main code entry code. Init threads. Give basic params to them. And then become as a watch dog. *******************************************************************************/ int main(int argc, char **argv) { int i; int cnt_servers,cnt_running; server_cfg *cfg_list; event_handler_cfg *evhnd_cfg; mq_ntf_mdt *mq_array; /*set atomic variables to init value*/ atomic_store(&_glbl_id, 0); /* Load configuration */ cnt_servers = SIZEOF_SERVER_LIST; for (i=0;itid = i; srvc->stack = malloc(STACK_SIZE); //NULL will brake everything ;) if (srvc->stack == NULL) { ERROR("BLow"); } srvc->user = isrvc->user; srvc->password = isrvc->password; srvc->server = isrvc->server; srvc->channels = isrvc->channels; srvc->port = isrvc->port; srvc->ssl = isrvc->ssl; //atomic_init( &srvc->running, 1); atomic_store(&srvc->running, 0); /* initalise posix mq */ if (0 != mq_ntf_open(&mq_array[i], i)) { printf("Couldnt open mq_ntf_open\n"); } srvc->mq = &mq_array[i]; //try to drain mq mq_ntf_drain(&mq_array[i]); PRINT("SERVER:%s PORT:%s USER:%s\n", srvc->server, srvc->port, srvc->user); /* clone new proc */ clone(th_start_client, srvc->stack+STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)srvc); } /* event handler thread */ evhnd_cfg = malloc(sizeof(event_handler_cfg)); memset(evhnd_cfg, 0, sizeof(event_handler_cfg)); evhnd_cfg->stack = malloc(EVENT_HND_STACK_SIZE); atomic_store(&evhnd_cfg->running, 0); evhnd_cfg->mq_num = cnt_servers; evhnd_cfg->mq_listen = mq_array; clone(th_event_manager, evhnd_cfg->stack+EVENT_HND_STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)evhnd_cfg); //PNL(); /* wait a sec while all threads will start running */ sleep(1); /* run until all threads are up */ cnt_running = 1; while(cnt_running != 0) { //printf("Count\n"); /*count how many proceses is running there*/ int val; cnt_running = 0; for (i=0;i