#define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "config_servers.h" #include "config_cmds.h" #include "config_load.h" #include "darray.h" #include "buf.h" #include "mq_ntf.h" #include "tbl_qcmd.h" #include "util.h" #include "sock_conn.h" #include "irc_parse.h" #include "mmm.h" #include "stat.h" #include "log.h" #include "arg.h" #include "nbrpc_event.h" /* no defence programming, no error checking, no argument checking just PoC nothing else */ /* Define command line arguments */ s_arg_val arg_config_filename = { .ptr = "config/servers.ini", .default_ptr = "config/servers.ini", .result = NULL }; def_arg agni_cmd_args[] = { ARG_ENTRY("-c",VAL,&arg_config_filename,"ini config file"), {NULL,0,NULL} }; arg_t *cfg = NULL; /* configuration variables */ config_ini *config=NULL; #define MQ_MSG_SIZE 8192 //HACK extern int __irc_buf_drain_io(irc_buf *ib); /* Wrap call back, to manage how going be executed callback. Manage input and output params for callback. INPUT(string)->OUTPUT(string) */ tble_cmd_resp* cllbk_wrapper( void *(*call)(void *), tble_cmd_param *param) { void *data_out = NULL; tble_cmd_resp *resp = NULL; char *data = NULL; if (call == NULL) { PERM(); return NULL; } if (param == NULL) { PERM(); return NULL; } //prepare response resp = tbl_cmd_resp_c(param); if (resp == NULL) { PERM(); } tbl_cmd_resp_print(resp); data = alloc_new_str(param->param); //call callback data_out = call(data); //set result to returned response if (data_out == NULL) { //response doesnt have any output if (resp) { //response dont have result resp->ret_code = TBL_RSP_NORESP; resp->resp = NULL; } } else { if (resp) { //set succesfull repsonse resp->ret_code = TBL_RSP_OK; resp->resp = alloc_new_str(data_out); } } //dangerouse place =P FREE(data_out); FREE(data); return resp; } /* convert irc ident to 3 newly alocated string */ int irciedent_parse(char *ident, rpc_request *req) { int count1, count2; sds params1 = sdsnew(ident), params2; sds *tokens1, *tokens2; tokens1 = sdssplitlen(params1, sdslen(params1), "!", 1, &count1); req->user = alloc_new_str(tokens1[0]); params2 = sdsnew(tokens1[1]); tokens2 = sdssplitlen(params2, sdslen(params2), "@", 1, &count2); req->mask = alloc_new_str(tokens2[0]); req->server = alloc_new_str(tokens2[1]); sdsfree(params1); sdsfree(params2); sdsfreesplitres(tokens1, count1); sdsfreesplitres(tokens2, count2); return 0; } /* return unique ID, with atomic counter should work in all cases */ _Atomic int _glbl_id=0; #define SEND_CMD_IN(MQ,ID,CMD,PARAM) \ send_mq_cmd(MQ,MQ_IN,ID,CMD,strlen(CMD),PARAM,strlen(PARAM)); #define SEND_CMD_OUT(MQ,ID,CMD,PARAM) \ send_mq_cmd(MQ,MQ_OUT,ID,CMD,strlen(CMD),PARAM,strlen(PARAM)); #define RECV_CMD_IN(MQ,CMD) \ recv_mq_cmd(MQ,MQ_IN,CMD); #define RECV_CMD_OUT(MQ,CMD) \ recv_mq_cmd(MQ,MQ_OUT,CMD); /* |--------|<--IN---|---------| | SERVER | | MANAGER | |--------|--OUT-->|---------| */ #define STACK_SIZE (1024*128) /* not supposed to be changed. */ typedef struct server_cfg { /* thread params */ int tid; void *stack; //atomic_int running; _Atomic int running; mq_ntf_mdt *mq; /* irc server config */ char *user; char *password; char *server; //should be changed to hostname? char *channels[SERV_CHAN_MAX_NUM]; char *port; int ssl; } server_cfg; /******************************************************************************* server thread *******************************************************************************/ /* server_cfg struct as input */ #define TH_STATE_INIT 0 #define TH_STATE_START 1 #define TH_STATE_LISTEN_IN 2 #define TH_STATE_LISTEN_OUT 3 #define TH_STATE_SEND_IN 4 #define TH_STATE_SEND_OUT 5 #define TH_STATE_TERMINATION 6 #define TH_STATE_EXIT 7 #define TH_CONN_BUF_SZ 1024 int th_start_client(void *data) { //mq_cmd *qcmd=NULL; //queue command int run; int mq_event; int byte_num; int fret=-1; server_cfg *cfg = data; mq_ntf_mdt *mq = cfg->mq; struct mq_attr out_attr, *ptr_out_attr=&out_attr; struct mq_attr in_attr, *ptr_in_attr=&in_attr; char *out_buf = NULL; char *in_buf = NULL; //network creation var int cret = -1; //int conn=-1; irc_conn conn; char conn_buf[TH_CONN_BUF_SZ]; char cmd_buf[TH_CONN_BUF_SZ]; //irc parsting irc_buf *ib = NULL; irc_token *itok = NULL; char *irc_line = NULL; //table to match response/request tble_qcmd *qcmd = NULL; tbl_qcmd *qtbl = NULL; //create response table qtbl = tbl_qcmd_c(100); if (qtbl == NULL) { LOG_NONE_W(); } ////////////////////////////////////////////////////////////////////////////// //nbrpc char *buf_nb = NULL; rpc_request *req = NULL; rpc_response *resp = NULL; netbyte_store *nb_req, *nb_resp; nb_resp = malloc(sizeof(netbyte_store)); //collect stats stat_server stats; memset(&stats, 0, sizeof(stat_server)); atomic_fetch_add(&cfg->running,1); LOG_NONE_I("Start client\n"); LOG_NONE_I("Server %d\n",cfg->tid); sleep(1); //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); /* //PNL(); //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); conn = irc_connect(cfg->server, cfg->port); //conn = irc_connect("irc.freenode.net", "6667"); //PNL(); if (conn < 0) { PNL(); printf("cant connect to server\n"); //well we dont whant to just exit from thread //lets put inside main thread CONNection checker //if no connection just send some commmand and create //some logic how to deal with it atomic_fetch_sub(&cfg->running,1); //return 0; }*/ memset(&conn, 0, sizeof(irc_conn)); fret = irc_open(cfg->server, cfg->port, &conn); if (fret != 0) { LOG_NONE_E("Cant connect\n"); return -1; } //lets timeout after 150 sec, ping from server is 120 sec irc_read_timeout(&conn,150); //PNL(); //send some commands to irc to register nick snprintf(cmd_buf, TH_CONN_BUF_SZ, "USER %s 0 0 :%s\r\n", cfg->user, cfg->user); write(conn.conn_fd, cmd_buf, strlen(cmd_buf)); snprintf(cmd_buf, TH_CONN_BUF_SZ,"NICK %s \r\n", cfg->user); write(conn.conn_fd, cmd_buf, strlen(cmd_buf)); //prepare message queue mq = cfg->mq; if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1) { LOG_NONE_E("Cant get attribute\n"); } //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); PRINT("out_attr.mq_msgsize %ld\n", out_attr.mq_msgsize); out_buf = malloc(out_attr.mq_msgsize); if (out_buf == NULL) { LOG_NONE_E("Can allocate out_buf, size of %ld\n", out_attr.mq_msgsize); } memset(out_buf, 0, out_attr.mq_msgsize); //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); if (mq_ntf_getattr(mq, MQ_IN, &ptr_in_attr) == -1) { LOG_NONE_E("Cant get attribute\n"); } in_buf = malloc(in_attr.mq_msgsize); memset(in_buf, 0, in_attr.mq_msgsize); if (in_buf == NULL) { LOG_NONE_E(); } //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); //create irc parsing structures memset(conn_buf, 0, TH_CONN_BUF_SZ); memset(cmd_buf, 0, TH_CONN_BUF_SZ); ib = irc_buf_create(); if (ib == NULL) { LOG_NONE_E(); //return 0; } //send command wait for response LOG_NONE_I("Start loop\n"); run = 1; while (run) { //Collect events from MQ //printf("Client loop tick\n"); mq_event = mq_ntf_select(mq, MQ_IN); switch(mq_event) { case 0: //PRINT("EVENT 0\n"); break; case 1: //PRINT("MQ_EVENT IN\n"); //memset(in_buf, 0, in_attr.mq_msgsize); if ((byte_num = mq_ntf_read(mq, MQ_IN, in_buf, in_attr.mq_msgsize)) == -1) { LOG_NONE_W("Cant read input message \n"); } else { in_buf[byte_num] = 0x0; LOG_NONE_E("Recieve %s\n", in_buf); } break; default: LOG_NONE_W("Unknown event type\n"); } //fast code to exit if QUIT command is recieved //for command in queue //recieve commands from queue if (mq_event == 1) { nb_init(nb_resp); nb_load(nb_resp, (unsigned char *)in_buf); rpc_resp_unmarsh(nb_resp, &resp); LOG_NONE_D("RESP: %d-%s(%s)\n", resp->id, resp->result, resp->error); tble_cmd_resp *match_resp = malloc(sizeof(tble_cmd_resp)); memset(match_resp, 0, sizeof(tble_cmd_resp)); if (match_resp != NULL) { match_resp->qid = resp->id; match_resp->resp = alloc_new_str(resp->result); int cmd_id = tbl_qcmd_resp(qtbl, match_resp); if (cmd_id > 0) { //BUG HERE!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!.\.,\<\/\/\// LOG_NONE_D("cmd_id %d\n", cmd_id); tble_qcmd *cmd = tbl_qcmd_match_id(qtbl, cmd_id); if (cmd != NULL) { char *sep = strchr(cmd->ircident,'!'); //BUGGY char *user = alloc_new_str(cmd->ircident); //BUGGY user[sep-cmd->ircident] = 0x0; char *resp_user = token_new_str(cmd->tok, 2); //PRINT("resp_user %s\n",resp_user); //int formated_size = -1; int count; sds str_out; sds *tokens; int out_i; if (resp_user[0]=='#') { //printf("RESPONSE :%s PRIVMSG %s :%s\n", cfg->user, user, resp->result); //formated_size = snprintf(cmd_buf, TH_CONN_BUF_SZ,":%s PRIVMSG %s :%s \r\n", cfg->user, resp_user, resp->result); snprintf(cmd_buf, TH_CONN_BUF_SZ,":%s PRIVMSG %s :", cfg->user, resp_user); //PRINT("%s\n",cmd_buf); } else { //printf("RESPONSE :%s PRIVMSG %s :%s\n", user, resp_user, resp->result); //formated_size = snprintf(cmd_buf, TH_CONN_BUF_SZ,":%s PRIVMSG %s :%s \r\n", resp_user, user, resp->result); snprintf(cmd_buf, TH_CONN_BUF_SZ,":%s PRIVMSG %s :", resp_user, user); //PRINT("%s\n",cmd_buf); } str_out = sdsnew(resp->result); tokens = sdssplitlen(str_out,sdslen(str_out),"\n",1,&count); for (out_i=0; out_i>>>%s",sending); write(conn.conn_fd, sending, sdslen(sending)); sdsfree(sending); } sdsfreesplitres(tokens, count); sdsfree(str_out); FREE(user); FREE(resp_user); //remove command by id //for now just remove command without checkings its state tbl_qcmd_del_by_id(qtbl, cmd_id); } } } //nb_print(nb_resp); //nb_free(nb_resp); } //should be added proper command state check and so on tbl_qcmd_mng_states(qtbl); // This should be runed regulary //recieve irc commands and pass to MQ, save command to table //fix this by allowing drain just chunks, not whole buffer memset(conn_buf, 0, TH_CONN_BUF_SZ); //PNL(); cret = 0; //if((cret = read(conn,conn_buf, 256))>0) if ((cret = irc_read(&conn, conn_buf, 256))>0) { LOG_NONE_D(">>READ %d<<\n", cret); irc_buf_puts(ib, conn_buf, cret); while (irc_buf_ready(ib) == 1) { irc_line = irc_buf_line(ib); if (irc_line) { //printf("PARSE>>%s<<\n",irc_line); if (memcmp(irc_line,"PING",4)==0) { int fret; const int pong_buf_sz = 128; char pong_buf[pong_buf_sz]; snprintf(pong_buf, pong_buf_sz, "PONG %s\n", cfg->user); LOG_NONE_D("%s", pong_buf) fret = write(conn.conn_fd, pong_buf, strlen(pong_buf)); LOG_NONE_D("OUT>>PONG %d\n",fret); } itok = irc_parse(irc_line,strlen(irc_line)); if (itok != NULL) { int j; for (j=0;jtk_list[j]->token); } if (token_cmp(itok,1,"NOTICE")==1) { } //something is sended to cbot if (token_cmp(itok,1,"PRIVMSG") == 1) { memset(cmd_buf,0,128); char *uname = token_new_str(itok,0); char *ircident = token_new_str(itok,0); char *irccmd = token_new_str(itok,1); char *dest_name = token_new_str(itok,2); char *msg = token_new_str(itok,3); //could send without first symbol pal LOG_NONE_D("dest_name %s\n",dest_name); char *sep = strchr(uname,'!'); uname[sep-uname] = 0x0; if (strncmp(":join",msg,5)==0) { //lets check if one more argument is there { char *chan_name = NULL; int iter = 0; char *sep_chan = strchr(msg,' ')+1; //small hack //check if channel in allowed list, if yes then continue iter=0; while (cfg->channels[iter]!=NULL) { chan_name = cfg->channels[iter]; LOG_NONE_D("%s\n",chan_name); if ((strncmp(chan_name,sep_chan,strlen(chan_name))==0) && (strlen(sep_chan)==strlen(chan_name))) { //lets join channel memset(cmd_buf, 0, TH_CONN_BUF_SZ); int fret = snprintf(cmd_buf, TH_CONN_BUF_SZ, ":%s JOIN %s\r\n", dest_name, sep_chan); write(conn.conn_fd, cmd_buf, fret); break; } iter++; if (cfg->channels[iter]==NULL) { LOG_NONE_W("Cant find this chan in da list\n"); } } } } else { //just send to test that thos commands works mate //create cmd table command and add to table qcmd = tbl_qcmd_cmd_c(); if (qcmd != NULL) { qcmd->cid = uniq_id(); qcmd->state = QCMD_NONE; qcmd->ircident = alloc_new_str(ircident); qcmd->cmd = alloc_new_str(irccmd); qcmd->param = alloc_new_str(msg); qcmd->timestamp = time(NULL); tbl_qcmd_add_tok(qcmd, itok); //HOW? add commands that are in execution list, dont process all messages tbl_qcmd_add(qtbl, qcmd); tbl_qcmd_print_tbl(qtbl,TBL_PF_QCMD_ID |TBL_PF_QCMD_CID |TBL_PF_QCMD_STATE |TBL_PF_QCMD_CMD |TBL_PF_QCMD_IRCIDENT |TBL_PF_QCMD_PARAM |TBL_PF_QCMD_TID |TBL_PF_QCMD_TIDX); } req = rpc_req_new(qcmd->cmd, qcmd->param, qcmd->cid); irciedent_parse(qcmd->ircident, req); rpc_req_marsh(req, &nb_req); buf_nb = (char *)nb_create(nb_req); memcpy(out_buf, buf_nb, nb_req->size > out_attr.mq_msgsize ? out_attr.mq_msgsize : nb_req->size); mq_ntf_write(mq, MQ_OUT, out_buf, out_attr.mq_msgsize); FREE(buf_nb); LOG_NONE_D("Write %d bytes\n", nb_req->size); rpc_req_free(req); nb_free(nb_req); //for safety make null, as no one should use it anymore qcmd = NULL; } FREE(msg); FREE(uname); FREE(ircident); FREE(irccmd); } token_destroy(itok); itok = NULL; } if (irc_line!=NULL) { memset(irc_line,0,strlen(irc_line)); FREE(irc_line); } __irc_buf_drain_io(ib); } } } else if (cret == 0) { //TODO add lofic to retry number, and thex exit from thread //hope server where disconnected if (irc_reconnect(&conn)==-1) { while (-1 == irc_reconnect(&conn)) { LOG_NONE_I("Reconnect in 10 sec\n"); sleep(10); } //sleep(1); } //send some commands to irc to register nick snprintf(cmd_buf, TH_CONN_BUF_SZ, "USER %s 0 0 :%s\r\n", cfg->user, cfg->user); write(conn.conn_fd, cmd_buf, strlen(cmd_buf)); snprintf(cmd_buf, TH_CONN_BUF_SZ,"NICK %s \r\n", cfg->user); write(conn.conn_fd, cmd_buf, strlen(cmd_buf)); } else { if (conn.err == ERR_SC_TIMEOUT) { LOG_NONE_I("TIMEOUT\n"); if (irc_reconnect(&conn)==-1) { while (-1 == irc_reconnect(&conn)) { LOG_NONE_I("Reconnect in 10 sec\n"); sleep(10); } //sleep(1); } //send some commands to irc to register nick snprintf(cmd_buf, TH_CONN_BUF_SZ, "USER %s 0 0 :%s\r\n", cfg->user, cfg->user); write(conn.conn_fd, cmd_buf, strlen(cmd_buf)); snprintf(cmd_buf, TH_CONN_BUF_SZ,"NICK %s \r\n", cfg->user); write(conn.conn_fd, cmd_buf, strlen(cmd_buf)); } } //send command over queue } LOG_NONE_I("End client\n"); atomic_fetch_sub( &cfg->running,1); return 0; } #define EVENT_HND_STACK_SIZE (1024*1024) typedef struct event_handler_cfg { /* thread params*/ void *stack; mq_ntf_mdt *mq_listen; int mq_num; _Atomic int running; } event_handler_cfg; /* server_cfg struct as input */ #define EH_STATE_INIT 0 #define EH_STATE_START 1 #define EH_STATE_LISTEN_IN 2 #define EH_STATE_LISTEN_OUT 3 #define EH_STATE_SEND_IN 4 #define EH_STATE_SEND_OUT 5 #define EH_STATE_TERMINATION 6 #define EH_STATE_EXIT 7 /******************************************************************************* Event thead. Recieve all events from server thread and pass them for execution. Return results from execution to server thread. *******************************************************************************/ /* Thread to reacieve messages and return them back */ int th_event_manager(void *data) { event_handler_cfg *cfg = data; atomic_fetch_add(&cfg->running,1); mq_ntf_mdt *mq= NULL,*mq_cur=NULL; int i_mq = 0; //to iter trought mqueue char *out_buf = NULL; char *in_buf = NULL; int run; int mq_event; //read any command struct mq_attr out_attr, *ptr_out_attr=&out_attr; struct mq_attr in_attr, *ptr_in_attr=&in_attr; //execution and command table generation //tble_exec *ecmd = NULL; tbl_exec *etbl = NULL; etbl = tbl_exec_list_c(30); if (etbl == NULL) { PERM(); } //collect stats stat_event stats; memset(&stats, 0, sizeof(stat_event)); //load predefined/precompiled command list { single_cmd_def *single_cmd = confgi_cmd_list; while ((single_cmd!=NULL)&&(single_cmd->name!=NULL)) { tble_exec *ecmd = NULL; ecmd = tbl_exec_c(); if (ecmd == NULL) { PERM(); } ecmd->id = uniq_id(); ecmd->name = alloc_new_str("local-executor"); ecmd->cmd = alloc_new_str(single_cmd->name); switch(single_cmd->type) { case CMD_T_SIMPLE: ecmd->type = TBL_T_SIMPLE; break; case CMD_T_RPC: ecmd->type = TBL_T_RPC; break; case CMD_T_LUA: ecmd->type = TBL_T_LUA; break; default: LOG_NONE_E("No such type, something wrong here\n"); } ecmd->callback = single_cmd->callback; if (-1 == tbl_exec_add(etbl, ecmd)) { PERM(); } single_cmd++; } } tbl_exec_print_tbl(etbl, TBL_PF_EXEC_ALL); //config mq LOG_NONE_I("Start event thread\n"); mq = cfg->mq_listen; //get mq attributes, FIX out/in everytime could be different pal if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1) { LOG_NONE_E("Cant get attribute\n"); } if (mq_ntf_getattr(mq, MQ_IN, &ptr_in_attr) == -1) { LOG_NONE_E("Cant get attribute\n"); } ////////////////////////////////////////////////////////////////////////////// //nbrpc prepare variables rpc_request *req = NULL; rpc_response *resp = NULL; netbyte_store *nb_req = NULL, *nb_resp=NULL; char *buf_nb = NULL; nb_resp = malloc(sizeof(netbyte_store)); //not freed nb_req = malloc(sizeof(netbyte_store)); //not freed out_buf = malloc(out_attr.mq_msgsize); //maybe its not null in_buf = malloc(in_attr.mq_msgsize); LOG_NONE_I("Start event loop\n"); run = 1; while(run) { run += 1; for (i_mq=0;i_mqmq_num;i_mq++) { mq = cfg->mq_listen; mq_event = mq_ntf_select(&mq[i_mq], MQ_OUT); mq_cur = &mq[i_mq]; if (mq_event == 1) { int bytes; //PRINT("MQ_EVENT OUT\n"); memset(out_buf, 0, out_attr.mq_msgsize); //will fail if some of queue have different buffer size param if ((bytes = mq_ntf_read(&mq[i_mq], MQ_OUT, out_buf, out_attr.mq_msgsize)) == -1) { LOG_NONE_D("Cant read output message\n"); } else { out_buf[out_attr.mq_msgsize-1] = 0x0; stats.cnt_ipc_rx += bytes; LOG_NONE_D("Recieve %s\n", out_buf); } break; } } //mq_cur have currently queue where message comes from, need to make some buffer that gona pass/save this info //if QUIT then quit the thread if (mq_event == 1) { nb_init(nb_req); if (0 == nb_load(nb_req, (unsigned char *)out_buf)) { rpc_req_unmarsh(nb_req, &req); LOG_NONE_D("EVENT-REQ: %d-%s(%s) %s-%s-%s\n", req->id, req->method, req->params, req->user, req->mask, req->server); //nb_print(nb_req); if (strncmp(req->method,":PRIVMSG",8)) { if (tbl_exec_in_s(etbl, req->params+1)>=0) { void *ret_msg = NULL; //special set of commands sds exec_params; exec_params = sdsempty(); exec_params = sdscat(exec_params, req->params+1); if (0 == strncmp(exec_params,"STAT",4)) { char *stat_str = stat_event2str(&stats); exec_params = sdscat(exec_params, " "); exec_params = sdscat(exec_params, stat_str); FREE(stat_str); } if (tbl_exec_run(etbl, exec_params, req->user, req->mask, req->server, &ret_msg)>=0) { if (ret_msg != NULL) { LOG_NONE_D("Ret message %s\n",(char *)ret_msg); //send response resp = rpc_resp_new(ret_msg,"None",req->id); resp->user = alloc_new_str(""); resp->mask = alloc_new_str(""); resp->server = alloc_new_str(""); rpc_resp_marsh(resp, &nb_resp); buf_nb = (char *)nb_create(nb_resp); mq_ntf_write(mq_cur, MQ_IN, buf_nb, nb_resp->size > in_attr.mq_msgsize ? in_attr.mq_msgsize : nb_resp->size); stats.cnt_ipc_tx += nb_resp->size > in_attr.mq_msgsize ? in_attr.mq_msgsize : nb_resp->size; FREE(buf_nb); LOG_NONE_D("EVENT-Write %ld bytes\n", nb_resp->size > in_attr.mq_msgsize ? in_attr.mq_msgsize : nb_resp->size); FREE(ret_msg); nb_free(nb_resp); rpc_resp_free(resp); stats.cnt_cmd_succ += 1; } } else { LOG_NONE_E("Command execution error\n"); stats.cnt_cmd_err += 1; } sdsfree(exec_params); } else { LOG_NONE_E("Command not in a table\n"); } } else { LOG_NONE_E("Unsupporetd irc command\n"); } //PNL();nb_free(nb_req);PNL(); } else { LOG_NONE_E("wrong recieved rpc command\n"); } } //applay to recieved command executor //other command pass to cmd/execution matching table //pass to exec handler //get response if command is immidieate otherwise wait for response in next execution } FREE(out_buf); LOG_NONE_D("End event thread\n"); atomic_fetch_sub( &cfg->running,1); return 0; } /******************************************************************************* Main code entry code. Init threads. Give basic params to them. And then become as a watch dog. *******************************************************************************/ int main(int argc, char **argv) { int i,j; int cnt_servers,cnt_running; server_cfg *cfg_list; event_handler_cfg *evhnd_cfg; mq_ntf_mdt *mq_array; /*set atomic variables to init value*/ atomic_store(&_glbl_id, 0); /*logging facilities*/ log_init(); log_register_sub(LOG_SUB_NONE, LOG_LEVEL_DEBUG, LOG_LOC_STDIO, NULL); /*Load configs*/ cfg = arg_load(argc, argv, agni_cmd_args); config_load_ini(arg_config_filename.result->ptr, &config); printf("%s\n",arg_config_filename.result->ptr); /* Load configuration */ cnt_servers = config_server_num(config); if (cnt_servers == 0) { printf("No configured servers to load\n"); return 0; } cfg_list = malloc(sizeof(server_cfg)*cnt_servers); if (cfg_list == NULL) { ENL(); } memset(cfg_list, 0, sizeof(server_cfg)*cnt_servers); mq_array = malloc(sizeof(mq_ntf_mdt)*cnt_servers); if (mq_array == NULL) { ENL(); } /* For each configuration create listener */ for (i=0;itid = i; srvc->stack = malloc(STACK_SIZE); //NULL will brake everything ;) if (srvc->stack == NULL) { LOG_NONE_F("BLow"); } srvc->user = alloc_new_str(isrvc.user); IF_NULL(srvc->user,{},{LOG_NONE_F("Cant allocate srvc->user\n")}); //srvc->password = alloc_new_str(isrvc.password); srvc->server = alloc_new_str(isrvc.server); IF_NULL(srvc->user,{},{LOG_NONE_F("Cant allocate srvc->server\n")}); srvc->port = alloc_new_str(isrvc.port); IF_NULL(srvc->user,{},{LOG_NONE_F("Cant allocate srvc->port\n")}); srvc->ssl = isrvc.ssl; //srvc->channels = isrvc.channels; for (j=0;jchannels[j] = alloc_new_str(isrvc.channels[j]); IF_NULL(srvc->channels[j],{},{LOG_NONE_F("Cannt alloc srvc->channels[j]\n");}) } else { srvc->channels[j] = NULL; } } free_irc_server_conf(&isrvc); //atomic_init( &srvc->running, 1); atomic_store(&srvc->running, 0); /* initalise posix mq */ if (0 != mq_ntf_open(&mq_array[i], i)) { LOG_NONE_F("Couldnt open mq_ntf_open\n"); } srvc->mq = &mq_array[i]; //try to drain mq mq_ntf_drain(&mq_array[i]); LOG_NONE_I("SERVER:%s PORT:%s USER:%s\n", srvc->server, srvc->port, srvc->user); /* clone new proc */ if (-1 == clone(th_start_client, srvc->stack+STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)srvc)) { LOG_NONE_E("Cant allocate new server process\n"); } } /* event handler thread */ evhnd_cfg = malloc(sizeof(event_handler_cfg)); if (evhnd_cfg == NULL) { LOG_NONE_E("cant allocate event handler\n"); } memset(evhnd_cfg, 0, sizeof(event_handler_cfg)); evhnd_cfg->stack = malloc(EVENT_HND_STACK_SIZE); if (evhnd_cfg->stack == NULL) { LOG_NONE_E("Cant allocate event handler stack\n"); } atomic_store(&evhnd_cfg->running, 0); evhnd_cfg->mq_num = cnt_servers; evhnd_cfg->mq_listen = mq_array; if (-1 == clone(th_event_manager, evhnd_cfg->stack+EVENT_HND_STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)evhnd_cfg)) { LOG_NONE_E("Couldnt create process\n"); } //PNL(); /* wait a sec while all threads will start running */ sleep(1); /* run until all threads are up */ cnt_running = 1; while(cnt_running != 0) { //printf("Count\n"); /*count how many proceses is running there*/ int val; cnt_running = 0; for (i=0;i