diff options
Diffstat (limited to 'agni.c')
-rw-r--r-- | agni.c | 557 |
1 files changed, 165 insertions, 392 deletions
@@ -22,6 +22,7 @@ #include "darray.h" #include "buf.h" #include "mq_cmd.h" +#include "mq_ntf.h" #include "tbl_qcmd.h" #include "util.h" #include "sock_conn.h" @@ -37,408 +38,106 @@ nothing else //HACK extern int __irc_buf_drain_io(irc_buf *ib); -/* -return unique ID, with atomic counter should work in all cases -*/ -_Atomic int _glbl_id=0; -int uniq_id() +void *cmd_pong(void *data) { - int ret=-1,id; + char *param = (char *)data; + char *ret = NULL; - //what possible could go wrong? - // sry emilsp this is not important for now - id = atomic_load(&_glbl_id); - ret = id; - id += 1; - atomic_store(&_glbl_id, id); - return ret; - -} - -//async io multiplexer -typedef struct mq_ntf_io_mplx -{ - struct timeval tv; - fd_set io_fds; -} mq_ntf_io_mplx; - -#define MQ_OUT 1 -#define MQ_IN 2 - - -/* -supposed format agni-[id]-[in/out] -in is for input of thread (send to in and thread will recieve), -out if for output of thread (read from out to recieve from thread) -*/ -#define MQ_PREFIX "/agni-" -typedef struct mq_ntf_mdt -{ - int id; - mqd_t mq_in; - mqd_t mq_out; - mq_ntf_io_mplx io_mplx; - //_Atomic int used; nado? -} mq_ntf_mdt; - - -/* -return -1 on error -*/ -int mq_ntf_open(mq_ntf_mdt *mq, int id) -{ - const int name_size = 32; - char name[name_size]; - - if (!mq) + if (param == NULL) { - PERM(); - return -1; - } - mq->id = id; - snprintf(name, name_size, MQ_PREFIX"%d-in",id); - /*create with default configs*/ - mq->mq_in = mq_open(name, - O_RDWR | O_CREAT, 0666, NULL); - if (mq->mq_in == -1) - { - ENL(); - perror("mq_open"); - return -1; - } - snprintf(name, name_size, MQ_PREFIX"%d-out",id); - /*create with default configs*/ - mq->mq_out = mq_open(name, - O_RDWR | O_CREAT, 0666, NULL); - if (mq->mq_out == -1) - { - perror("mq_open"); - return -1; + return NULL; } - PRINT("opened = %d %d\n", mq->mq_in, mq->mq_out); + printf("PONG\n"); + + ret = alloc_new_str("PONG"); - return 0; + return ret; } -int mq_ntf_mplx(mq_ntf_mdt *mq, int msec) +/* +Wrap call back, to manage how going be executed callback. +Manage input and output params for callback. +INPUT(string)->OUTPUT(string) +*/ +tble_cmd_resp* cllbk_wrapper( void *(*call)(void *), tble_cmd_param *param) { - fd_set io_fds; + void *data_out = NULL; + tble_cmd_resp *resp = NULL; + char *data=NULL; - if (mq == NULL) + if (call == NULL) { PERM(); - return -1; + return NULL; } - if (msec < 0) + if (param == NULL) { PERM(); - return -1; + return NULL; } - //mq->io_mplx.tv.tv_sec = msec/1000; - //mq->io_mplx.tv.tv_usec = (msec%1000)*1000; - mq->io_mplx.tv.tv_sec = 1; - mq->io_mplx.tv.tv_usec = 0; - PRINT("mplx = %d %d\n", mq->mq_in, mq->mq_out); - FD_ZERO(&io_fds); - FD_SET(mq->mq_in, &io_fds);//in first - FD_SET(mq->mq_out, &io_fds);//out second - mq->io_mplx.io_fds = io_fds; - - return 0; -} - -/* return number if there is something to read -RETURN: - -1 some error - 0x00 nothing to read - 0x01 something in queue - -*/ -//if mplx havent done, everything could blow up -int mq_ntf_select(mq_ntf_mdt *mq, int dir) -{ - int fdnum, fd; - int ret=0; - fd_set io_fds; - struct timeval tv; - - if (mq == NULL) + //prepare response + resp = tbl_cmd_resp_c(param); + if (resp == NULL) { PERM(); - return -1; } + tbl_cmd_resp_print(resp); + data = alloc_new_str(param->param); - //we should allways have just 2 descriptors to listen - //io_fds = mq->io_mplx.io_fds; - if (dir == MQ_IN) - { - fd = mq->mq_in; - } else if (dir == MQ_OUT) - { - fd = mq->mq_out; - } - FD_ZERO(&io_fds); - FD_SET(fd, &io_fds); - tv.tv_sec = 1; - tv.tv_usec = 0; - fdnum = select(1, - &io_fds, - NULL, - NULL, - &tv ); - - if (fdnum == -1) + //call callback + data_out = call(data); + + //set result to returned response + if (data_out == NULL) { - ENL(); - return -1; - } else { - if (FD_ISSET(fd, &mq->io_mplx.io_fds)) + //response doesnt have any output + if (resp) { - ret = 1; + //response dont have result + resp->ret_code = TBL_RSP_NORESP; + resp->resp = NULL; } - } - - return ret; -} - -int mq_ntf_read(mq_ntf_mdt *mq, int dir, char *buf, size_t size) -{ - struct mq_attr attr; - size_t bytes; - mqd_t mqd; - - unsigned int prio = 10; - - if (dir == MQ_IN) - { - mqd = mq->mq_in; - } else if (dir == MQ_OUT) - { - mqd = mq->mq_out; - } else - { - perror("Wrong direction"); - } - - mq_getattr(mqd, &attr); - if (attr.mq_maxmsg == attr.mq_curmsgs) - { - printf("queue %d out full\n", mq->id); - return -1; - } - bytes = mq_receive(mqd, buf, - size > attr.mq_msgsize ? attr.mq_msgsize : size, - &prio); - if (bytes == -1) - { - perror("mq_receive read out"); - return -1; - } - - return bytes; -} - -int mq_ntf_write(mq_ntf_mdt *mq, int dir, const char *buf, size_t size) -{ - struct mq_attr attr; - size_t bytes; - mqd_t mqd; - - unsigned int prio = 10; - - if (dir == MQ_IN) - { - mqd = mq->mq_in; - } else if (dir == MQ_OUT) - { - mqd = mq->mq_out; - } else - { - perror("Wrong direction"); - } - - mq_getattr(mqd, &attr); - if (attr.mq_maxmsg == attr.mq_curmsgs) - { - printf("queue %d out full\n", mq->id); - return -1; - } - bytes = mq_send(mqd, buf, - size > attr.mq_msgsize ? attr.mq_msgsize : size, - prio); - if (bytes == -1) - { - perror("mq_send"); - return -1; - } - - return bytes; -} - -//dir - dirction 1 in 2 out -int mq_ntf_count(mq_ntf_mdt *mq, int dir) -{ - struct mq_attr attr; - int mq_fd; - - if (mq == NULL) - { - PERM(); - return -1; - } - - if (dir == 1) - { - mq_fd = mq->mq_in; } else { - mq_fd = mq->mq_out; - } - - if (mq_getattr(mq_fd, &attr) != -1) - { - return attr.mq_curmsgs; - } - - return -1; -} - - -/* close queue */ -int mq_ntf_close(mq_ntf_mdt *mq) -{ - const int name_size = 32; - char name[name_size]; - int id; - - id = mq->id; - snprintf(name, name_size, MQ_PREFIX"%d-in",id); - mq_unlink(name); - - snprintf(name, name_size, MQ_PREFIX"%d-out",id); - mq_unlink(name); - - //clean file descriptors for io_mplx - - return 0; -} - -//send command -int mq_ntf_cmd_send(mq_ntf_mdt *mq, mq_cmd *cmd) -{ - - return 0; -} - -//recieve command from other end -int mq_ntf_cmd_recv(mq_ntf_mdt *mq, mq_cmd *cmd) -{ - return 0; -} - -int mq_drain_q(mqd_t mqd) -{ - struct mq_attr attr; - char *buf = NULL; - unsigned int prio = 0; - int cnt_msg = 0; - int num_read = 0; - int i = 0; - - if (mq_getattr(mqd, &attr) == -1) - { - ENL(); - return -1; - } - - buf = malloc(attr.mq_msgsize); - if (buf == NULL) - { - ENL(); - return -1; - } - - for (i=0; i<attr.mq_curmsgs; i++) - { - num_read = mq_receive(mqd, buf, attr.mq_msgsize, &prio); - if (num_read == -1) + if (resp) { - ENL(); - free(buf); - return -1; + //set succesfull repsonse + resp->ret_code = TBL_RSP_OK; + resp->resp = alloc_new_str(data_out); } - cnt_msg += 1; } - free(buf); - return cnt_msg; -} - -//drain all mq buffers to zero -//to use at begining and clear all buffers to zero -int mq_ntf_drain(mq_ntf_mdt *mq) -{ - int err; - - err = mq_drain_q(mq->mq_in); - if (err < 0) - { - PERM(); - return -1; - } - printf("Drained in %d messages\n", err); - - err = mq_drain_q(mq->mq_out); - if (err < 0) - { - PERM(); - return -1; - } - printf("Drained out %d messages\n", err); + //dangerouse place =P + free(data_out); + free(data); - return 0; + return resp; } - - -int mq_ntf_getattr(mq_ntf_mdt *mq, int dir, struct mq_attr **attr) +/* +return unique ID, with atomic counter should work in all cases +*/ +_Atomic int _glbl_id=0; +int uniq_id() { - struct mq_attr gattr; - mqd_t mqdt; - - if (mq == NULL) - { - PERM(); - return -1; - } - - if (dir == MQ_OUT) - { - mqdt = mq->mq_out; - } else if (dir == MQ_IN) - { - mqdt = mq->mq_in; - } else - { - return -1; - } - - if (mq_getattr(mqdt, &gattr) == -1) - { - ENL(); - return -1; - } + int ret=-1,id; - memcpy(*attr,&gattr,sizeof(struct mq_attr)); + //what possible could go wrong? + // sry emilsp this is not important for now + id = atomic_load(&_glbl_id); + ret = id; + id += 1; + atomic_store(&_glbl_id, id); + return ret; - return 0; } + int send_mq_cmd(mq_ntf_mdt *mq, int io, int cmd_id, @@ -568,7 +267,9 @@ typedef struct server_cfg int ssl; } server_cfg; - +/******************************************************************************* +server thread +*******************************************************************************/ /* server_cfg struct as input */ #define TH_STATE_INIT 0 #define TH_STATE_START 1 @@ -607,16 +308,18 @@ int th_start_client(void *data) irc_token *itok = NULL; char *irc_line = NULL; + + atomic_fetch_add(&cfg->running,1); printf("Start client\n"); printf("Server %d\n",cfg->tid); sleep(1); - PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); + //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); //PNL(); - PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); + //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); //conn = irc_connect(cfg->server, cfg->port); conn = irc_connect("irc.freenode.net", "6667"); //PNL(); @@ -640,17 +343,12 @@ int th_start_client(void *data) //prepare message queue mq = cfg->mq; - err = mq_ntf_mplx(mq, 10000); - if (err == -1) - { - printf("Ups something whent wrong\n"); - } if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1) { printf("Cant get attribute\n"); ENL(); } - PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); + //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); out_buf = malloc(out_attr.mq_msgsize); memset(out_buf, 0, out_attr.mq_msgsize); if (out_buf == NULL) @@ -658,7 +356,7 @@ int th_start_client(void *data) ENL(); } - PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); + //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); if (mq_ntf_getattr(mq, MQ_IN, &ptr_in_attr) == -1) { printf("Cant get attribute\n"); @@ -671,7 +369,7 @@ int th_start_client(void *data) ENL(); } - PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); + //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user); //create irc parsing structures memset(conn_buf, 0, TH_CONN_BUF_SZ); memset(cmd_buf, 0, TH_CONN_BUF_SZ); @@ -683,8 +381,6 @@ int th_start_client(void *data) //return 0; } - - //send command wait for response printf("Start loop\n"); run = 1; @@ -696,8 +392,10 @@ int th_start_client(void *data) switch(mq_event) { case 0: - PRINT("SERVER EVENT 0\n"); - PNL(); + PRINT("EVENT 0\n"); + break; + case 1: + PRINT("MQ_EVENT IN\n"); if (mq_ntf_read(mq, MQ_IN, in_buf, in_attr.mq_msgsize) == -1) { PNL(); @@ -710,19 +408,18 @@ int th_start_client(void *data) } PNL(); break; - case 1: default: printf("Unknown event type\n"); } //PNL(); - sleep(1); + //sleep(1); //PNL(); //fast code to exit if QUIT command is recieved if (mq_event == 1) { - PNL(); + //PNL(); recv_cmd = mq_cmd_creates(in_buf, in_attr.mq_msgsize, -1); if (recv_cmd != NULL) { @@ -741,12 +438,12 @@ int th_start_client(void *data) //PNL(); if((cret = read(conn,conn_buf, 256))>0) { - printf("IN>>%s",conn_buf); + //printf("IN>>%s",conn_buf); irc_buf_puts(ib, conn_buf, cret); //if (irc_buf_ready(ib) == 1) while (irc_buf_ready(ib) == 1) { - PNL(); + //PNL(); irc_line = irc_buf_line(ib); if (irc_line) { @@ -785,6 +482,8 @@ int th_start_client(void *data) { int fret,fret2; //char send_back[128]; + + PNL(); memset(cmd_buf,0,128); char *uname = token_new_str(itok,0); @@ -800,6 +499,7 @@ int th_start_client(void *data) //create command and send to mq mq_cmd *privcmd = mq_cmd_create(1,"PRIVMSG",strlen("PRIVMSG"),msg,strlen(msg)); char *str_mq = mq_cmd_buf(privcmd); + printf("str_mq %s\n",str_mq); mq_ntf_write(mq, MQ_OUT, str_mq, strlen(str_mq)); //free(str_mq); mq_cmd_free(privcmd); @@ -808,6 +508,7 @@ int th_start_client(void *data) free(msg); free(uname); printf("OUT<<Send %d\n", fret); + } //PNL(); token_destroy(itok); @@ -825,13 +526,14 @@ int th_start_client(void *data) printf("End client\n"); + PNL(); atomic_fetch_sub( &cfg->running,1); return 0; } -#define EVENT_HND_STACK_SIZE (16*1024) +#define EVENT_HND_STACK_SIZE (64*1024) typedef struct event_handler_cfg { /* thread params*/ void *stack; @@ -851,6 +553,11 @@ typedef struct event_handler_cfg { #define EH_STATE_TERMINATION 6 #define EH_STATE_EXIT 7 + +/******************************************************************************* +Event thead. Recieve all events from server thread and pass them for execution. +Return results from execution to server thread. +*******************************************************************************/ /* Thread to reacieve messages and return them back */ int th_event_manager(void *data) { @@ -861,18 +568,56 @@ int th_event_manager(void *data) int run; int err; int mq_event; + //read any command mq_cmd *recv_cmd = NULL; // for recieved cmd from mq struct mq_attr out_attr, *ptr_out_attr=&out_attr; - printf("Start event thread\n"); - mq = cfg->mq_listen; - err = mq_ntf_mplx(mq, 10000); - if (err == -1) + //execution and command table generation + tble_exec *ecmd = NULL; + tbl_exec *etbl = NULL; + tble_qcmd *qcmd=NULL; + tbl_qcmd *qtbl = NULL; + + //create execution table + etbl = tbl_exec_list_c(10); + if (etbl == NULL) + { + PERM(); + return -1; + } + + ecmd = tbl_exec_c(); + if (ecmd == NULL) + { + PERM(); + return -1; + } + ecmd->id = uniq_id(); + ecmd->name = alloc_new_str("local-executor"); + ecmd->cmd = alloc_new_str("PING"); + ecmd->callback = cmd_pong; + + if (-1 == tbl_exec_add(etbl, ecmd)) + { + PERM(); + return -1; + } + tbl_exec_print_tbl(etbl, TBL_PF_EXEC_ALL); + etbl = tbl_exec_list_c(10); + if (etbl == NULL) { - printf("Ups something whent wrong\n"); + PERM(); + return -1; } + //create command table + qtbl = tbl_qcmd_c(10); + + //config mq + printf("Start event thread\n"); + mq = cfg->mq_listen; + //get mq attributes if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1) { @@ -886,16 +631,17 @@ int th_event_manager(void *data) run = 1; while(run) { + //PNL(); //check if there is some message and save it to buffer run += 1; - mq_event = mq_ntf_select(mq,MQ_OUT); + mq_event = mq_ntf_select(mq, MQ_OUT); switch(mq_event) { case 0: PRINT("EVENT 0\n"); break; case 1: - PRINT("EVENT MQ_OUT\n"); + PRINT("MQ_EVENT OUT\n"); if (mq_ntf_read(mq, MQ_OUT, out_buf, out_attr.mq_msgsize) == -1) { printf("Cant read output message\n"); @@ -908,17 +654,19 @@ int th_event_manager(void *data) default: printf("Unknown event type\n"); } - sleep(1); + //sleep(1); //if (run == 10) // break; + //PNL(); //if QUIT then quit the thread if (mq_event == 1) { + //PNL(); recv_cmd = mq_cmd_creates(out_buf, out_attr.mq_msgsize, -1); if (recv_cmd != NULL) { - PNL(); + //PNL(); if (mq_cmd_o_cmp_cmd(recv_cmd,"QUIT") == 0) { printf("QUIT recieved lets quit main loop\n"); @@ -938,6 +686,17 @@ int th_event_manager(void *data) mq_cmd_free(privcmd); } + if (mq_cmd_o_cmp_cmd(recv_cmd,"PRIVMSG") == 0) + { + printf("Some private message\n"); + char msg[] = "PONG"; + mq_cmd *privcmd = mq_cmd_create(1,"PRIVMSG",strlen("PRIVMSG"),msg,strlen(msg)); + char *str_mq = mq_cmd_buf(privcmd); + mq_ntf_write(mq, MQ_IN, str_mq, strlen(str_mq)); + free(str_mq); + mq_cmd_free(privcmd); + } + if (mq_cmd_o_cmp_cmd(recv_cmd,"CMD2") == 0) { printf("Hey dude it works second time\n"); @@ -957,12 +716,17 @@ int th_event_manager(void *data) } free(out_buf); + PNL(); printf("End event thread\n"); atomic_fetch_sub( &cfg->running,1); return 0; } +/******************************************************************************* +Main code entry code. Init threads. Give basic params to them. And then become +as a watch dog. +*******************************************************************************/ int main(int argc, char **argv) { int i; @@ -983,7 +747,15 @@ int main(int argc, char **argv) } cfg_list = malloc(sizeof(server_cfg)*cnt_servers); + if (cfg_list == NULL) + { + ENL(); + } mq_array = malloc(sizeof(mq_ntf_mdt)*cnt_servers); + if (mq_array == NULL) + { + ENL(); + } /* For each configuration create listener */ @@ -1032,6 +804,7 @@ int main(int argc, char **argv) evhnd_cfg->mq_listen = mq_array; clone(th_event_manager, evhnd_cfg->stack+EVENT_HND_STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)evhnd_cfg); + PNL(); /* run until all threads are up */ cnt_running = 1; while(cnt_running != 0) @@ -1046,7 +819,7 @@ int main(int argc, char **argv) if (val != 0) cnt_running += 1; } - //PRINT("cnt_running %d\n",cnt_running); + PRINT("cnt_running %d\n",cnt_running); sleep(1); } |