From 0397a4c9dee0300e0ee843fe85aa4c2d77630758 Mon Sep 17 00:00:00 2001 From: ZoRo Date: Tue, 7 Feb 2017 08:32:09 +0000 Subject: Added matching table for executed commands --- Makefile | 4 +- agni.c | 53 +++++++--- mq_cmd.c | 4 +- mq_cmd.h | 2 +- tbl_qcmd.c | 307 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ tbl_qcmd.h | 267 ++++++++++++++++++++++++++++++++++++++++++++++++++ tool/mqtool.c | 53 +++++++++- 7 files changed, 672 insertions(+), 18 deletions(-) create mode 100644 tbl_qcmd.c create mode 100644 tbl_qcmd.h diff --git a/Makefile b/Makefile index 07d1ec0..cbc208a 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,9 @@ make: gcc mmm.c -c gcc darray.c -c gcc mq_cmd.c -c - gcc -Wall mq_cmd.o buf.o mmm.o darray.o agni.c -o agni -std=c11 -lrt + gcc tbl_qcmd.c -c + gcc -Wall tbl_qcmd.o mq_cmd.o buf.o mmm.o darray.o agni.c \ + -o agni -std=c11 -lrt clean: rm -f agni *.o \ No newline at end of file diff --git a/agni.c b/agni.c index 79b8441..32b3e12 100644 --- a/agni.c +++ b/agni.c @@ -22,6 +22,7 @@ #include "darray.h" #include "buf.h" #include "mq_cmd.h" +#include "tbl_qcmd.h" /* no defence programming, no error checking, no argument checking just PoC @@ -49,6 +50,23 @@ int irc_connect( char *hostname, char *port ) return fd; } +/* +return unique ID, with atomic counter should work in all cases +*/ +_Atomic int _glbl_id=0; +int uniq_id() +{ + int ret=-1,id; + + //what possible could go wrong? + id = atomic_load(&_glbl_id); + ret = id; + id += 1; + atomic_store(&_glbl_id, id); + return ret; + +} + //async io multiplexer typedef struct mq_ntf_io_mplx { @@ -515,7 +533,7 @@ int recv_mq_cmd(mq_ntf_mdt *mq, recv_sz = err; //in err should be still the size of recieved buffer - cmd_recv = mq_cmd_creates(in_buf, recv_sz); + cmd_recv = mq_cmd_creates(in_buf, recv_sz, 0); if (cmd_recv == NULL) { printf("Cannot create cmd\n"); @@ -596,12 +614,12 @@ int th_start_client(void *data) while (run) { //printf("Send msg\n"); - err = SEND_CMD_OUT(mq,cmd_id,"INIT","NO"); + err = SEND_CMD_OUT(mq,cmd_id,"FROM_CLIENT","NO"); printf("err = %d\n",err); cmd_id += 1; run += 1; sleep(1); - if (run == 10) + if (run == 2) { break; } @@ -641,7 +659,7 @@ int th_event_manager(void *data) event_handler_cfg *cfg = data; atomic_fetch_add(&cfg->running,1); mq_ntf_mdt *mq=NULL; - char *buf = NULL; + char *out_buf = NULL; int run; int err; int mq_event; @@ -663,14 +681,14 @@ int th_event_manager(void *data) printf("Cant get attribute\n"); } - buf = malloc(out_attr.mq_msgsize); + out_buf = malloc(out_attr.mq_msgsize); //maybe its not null printf("Start event loop\n"); run = 1; while(run) { - //check for messages + //check if there is some message and save it to buffer run += 1; mq_event = mq_ntf_select(mq); switch(mq_event) @@ -680,23 +698,31 @@ int th_event_manager(void *data) case 1: break; case 2: - if (mq_ntf_read(mq, MQ_OUT, buf, out_attr.mq_msgsize) == -1) + if (mq_ntf_read(mq, MQ_OUT, out_buf, out_attr.mq_msgsize) == -1) { printf("Cant read message\n"); } else { - buf[out_attr.mq_msgsize-1] = 0x0; - printf("Recieve %s\n", buf); + out_buf[out_attr.mq_msgsize-1] = 0x0; + printf("Recieve %s\n", out_buf); } break; default: printf("Unknown event type\n"); } sleep(1); - if (run == 10) - break; - } + //if (run == 10) + // break; + + //applay to recieved command executor + //if QUIT then quit the thread + //other command pass to cmd/execution matching table + //pass to exec handler + //get response if command is immidieate otherwise wait for response in next execution + + } + free(out_buf); printf("End event thread\n"); atomic_fetch_sub( &cfg->running,1); @@ -712,6 +738,9 @@ int main(int argc, char **argv) event_handler_cfg *evhnd_cfg; mq_ntf_mdt *mq_array; + /*set atomic variables to init value*/ + atomic_store(&_glbl_id, 0); + /* Load configuration */ cnt_servers = SIZEOF_SERVER_LIST; for (i=0;isz = sz; memcpy(ret->buf,str,sz); - ret->id = -1; + ret->id = id; return ret; } diff --git a/mq_cmd.h b/mq_cmd.h index 7872acf..120c427 100644 --- a/mq_cmd.h +++ b/mq_cmd.h @@ -15,7 +15,7 @@ typedef struct mq_cmd } mq_cmd; mq_cmd* mq_cmd_create(int id, char *cmd, size_t cmd_sz, char *param, size_t param_sz); -mq_cmd* mq_cmd_creates(char *str, size_t sz); +mq_cmd* mq_cmd_creates(char *str, size_t sz, int id); int mq_cmd_id(mq_cmd *cmd, int *id); char* mq_cmd_cmd(mq_cmd *cmd, char **buf, size_t *sz); int mq_cmd_param(mq_cmd *cmd, char **param, size_t *sz); diff --git a/tbl_qcmd.c b/tbl_qcmd.c new file mode 100644 index 0000000..fb45c5a --- /dev/null +++ b/tbl_qcmd.c @@ -0,0 +1,307 @@ +#include "tbl_qcmd.h" + +tble_exec *tbl_exec_c() +{ + MVAR_ALLOC_STRC(ret,tble_exec,{ + return NULL;}); + return ret; +} + + +tbl_exec *tbl_exec_list_c(int size) +{ + MVAR_ALLOC_STRC(ret,tbl_exec,{ + return NULL;}); + + ret->size = 0; + ret->max_size = size; + + MVAR_ALLOC_ARR(reg_cmd,tble_exec*,size,{ + free(ret); + return NULL; + }) + + ret->reg_cmd = reg_cmd; + + return ret; +} + +//TODO check if command allready excists +int tbl_exec_add(tbl_exec *tbl, tble_exec *cmd) +{ + if (tbl == NULL || cmd == NULL) + { + return -1; + } + + tbl->reg_cmd[tbl->size] = cmd; + if (tbl->size+1max_size) + { + tbl->size += 1; + } else + { + return -1; + } + + return 0; +} + +int tbl_exec_in_s(tbl_exec *tbl, char *cmd) +{ + return -1; +} + +tble_exec *tbl_exec_search_cmd(tbl_exec *tbl, char *cmd) +{ + tble_exec *ret = NULL; + tble_exec *cur_cmd = NULL; + int i; + + if ((tbl == NULL)||(cmd == NULL)) + { + return NULL; + } + + for (i=0;imax_size;i++) + { + cur_cmd = tbl->reg_cmd[i]; + if ((strncmp(cur_cmd->cmd,cmd,strlen(cmd)) == 0) && + (strlen(cmd) == strlen(cur_cmd->cmd))) + { + ret = cur_cmd; + break; + } + } + + return ret; +} + +int tbl_exec_print(tbl_exec *tbl) +{ + int i; + tble_exec *cmd=NULL; + + if (tbl == NULL) + { + return -1; + } + + for (i=0;isize;i++) + { + cmd = tbl->reg_cmd[i]; + if (cmd != NULL) + { + printf("->%d<-\n",i); + printf("ID: %d\n", cmd->id); + printf("TYPE: %d\n", cmd->type); + if (cmd->name) + { + printf("%s\n", cmd->name); + } + if (cmd->cmd) + { + printf("%s\n", cmd->cmd); + } + } + } + return 0; +} + +tble_qcmd *tbl_qcmd_c() +{ + MVAR_ALLOC_STRC(ret,tble_qcmd,{ + return NULL;}); + return ret; +} + + +tbl_qcmd* tbl_qcmd_list_c(int size) +{ + MVAR_ALLOC_STRC(ret,tbl_qcmd,{ + return NULL;}); + + ret->size = 0; + ret->max_size = size; + + //alloc array of (tble_qcmd*) pointers + MVAR_ALLOC_ARR(cmd,tble_qcmd*,size,{ + free(ret); + return NULL; + }) + + ret->cmd = cmd; + + return ret; +} + +int tbl_qcmd_set_exec(tble_qcmd *qcmd, tble_exec *exec) +{ + int ret = -1; + + if ((qcmd == NULL) || (exec == NULL)) + return -1; + + //set executed command + qcmd->cmd = alloc_new_str(exec->cmd); + qcmd->state = QCMD_PREPARE; + qcmd->idx_exec = exec->id; + + ret = 0; + + return ret; +} + + +int tbl_qcmd_add(tbl_qcmd *tbl, tble_qcmd *cmd) +{ + if (tbl == NULL || cmd == NULL) + return -1; + tbl->cmd[tbl->size] = cmd; + if (tbl->size+1max_size) + { + tbl->size += 1; + } else + { + return -1; + } + + return 0; +} + + +int tbl_qcmd_chk(tbl_qcmd *tbl) +{ + int ret=-1; + int i; + + if (tbl == NULL) + { + return -1; + } + + for (i=0;imax_size;i++) + { + //if command in some state like segfaulted or + // finished running then lets check + if ((tbl->cmd[i]->state == QCMD_TIMEOUT)|| + (tbl->cmd[i]->state == QCMD_DONE)) + { + return i; + } + } + + return ret; +} + + +int tbl_qcmd_del(tbl_qcmd *tbl, int idx) +{ + int ret=-1; + tble_qcmd *replace_cmd=NULL; + + if (tbl == NULL || idx < 0) + { + return -1; + } + + if (idx >= tbl->size) + { + return -1; + } + + if (tbl->size = 1) + { + tbl->size = 0; + return 0; + } + + //if last idx = size then will overwrite itself and decrease command counter + replace_cmd = tbl->cmd[tbl->size]; + tbl->size -= 1; + tbl->cmd[idx] = replace_cmd; + + //looks like no errors + ret = 0; + + return ret; +} + +int tbl_qcmd_resp(tbl_qcmd *tbl, tble_cmd_resp *resp) +{ + int ret = -1; + tble_qcmd *single_cmd=NULL; + + int i; + + if ((tbl == NULL) || (resp == NULL)) + { + return -1; + } + + for (i=0;isize;i++) + { + single_cmd = tbl->cmd[i]; + if ((single_cmd->id == resp->id)) + { + return i; + } + } + + return ret; +} + +tble_cmd_param* tbl_cmd_param_c() +{ + + MVAR_ALLOC_STRC(ret,tble_cmd_param,{ + return NULL;}); + + return ret; +} + +int tbl_cmd_param_set(tble_cmd_param *cmd_param, tble_qcmd *cmd) +{ + + if ((cmd_param == NULL) || (cmd == NULL)) + { + return -1; + } + cmd_param->id = cmd->id; + cmd_param->cmd = alloc_new_str(cmd->cmd); + cmd_param->param = alloc_new_str(cmd->param); + cmd_param->out_q = cmd->out_q; + + return 0; +} + + +char *alloc_new_str_s(char *str, size_t size) +{ + char *ret = NULL; + + if (str == NULL) + { + return NULL; + } + + //1MB is enought + if (size > (1024*1024)) + { + return NULL; + } + + ret = malloc(size); + if (ret == NULL) + { + return NULL; + } + + memcpy(ret, str, size); + + return ret; +} + +char *alloc_new_str(char *str) +{ + return alloc_new_str_s(str, strlen(str)); +} \ No newline at end of file diff --git a/tbl_qcmd.h b/tbl_qcmd.h new file mode 100644 index 0000000..4052812 --- /dev/null +++ b/tbl_qcmd.h @@ -0,0 +1,267 @@ +#ifndef __TBL_QCMD_H +#define __TBL_QCMD_H + +#include +#include +#include +#include + +#define EXEC_CURRENT 0 //executes in current thread +#define EXEC_SEPERATE 1 //executes in seperate thread + +/* +Fields: +*/ +typedef struct tble_exec +{ + int id; + char *name; /*module name, can be used just for note*/ + char *cmd; /*command name*/ + int type; +} tble_exec; + +/* +list of registed commands +Fields: +*/ +typedef struct tbl_exec +{ + int size; + int max_size; + tble_exec **reg_cmd; +} tbl_exec; + +#define QCMD_NONE 0 //nothing happening with this command +#define QCMD_TIMEOUT 1 //command running to long without result +#define QCMD_DONE 2 //comand finsihed execution +#define QCMD_RUNNING 3 //command still in execution state +#define QCMD_PREPARE 4 //something need to be done before make it to run + +/* +Fields: +*/ +typedef struct tble_qcmd +{ + int id; + int timestamp; //when command started to execute + int state; //command execution state + int timeout; //timeout time for command + char *cmd; //cmd name + char *param; //params + mqd_t out_q; + mqd_t in_q; + int idx_exec; //table unique id of executor, check if still there ;] +} tble_qcmd; + +/* +table of commands executing +Fields: +*/ +typedef struct tbl_qcmd +{ + int size; + int max_size; + tble_qcmd **cmd; /*list of command executing and waiting for replay*/ +} tbl_qcmd; + +/* +use this structure to give params to command execution thread +Fields: +*/ +typedef struct tble_cmd_param +{ + int id; + mqd_t out_q; + char *cmd; + char *param; +} tble_cmd_param; + +/* +Fields: +*/ +typedef struct tble_cmd_resp +{ + int id; + int type; + int ret_code; + char *resp; +} tble_cmd_resp; + +/* +create exec command +Input: +Output: + !NULL - if everything whent ok + NULL - if there was some kind of mistake +*/ +tble_exec *tbl_exec_c(); + +/* +create array where to put commands +Input: + size - size of array that will be created +Output: + !NULL - if everything whent ok + NULL - if there was some kind of mistake +*/ +tbl_exec *tbl_exec_list_c(int size); + +/* +add new command to array +Input: + tbl - table of executed commands + cmd - command that shoule be added +Output: + 0 - if everything whent ok + -1 - if there was some kind of mistake +*/ +int tbl_exec_add(tbl_exec *tbl, tble_exec *cmd); + +/* +search for command in da list +Input: + tbl - table of executed commands + cmd - command name to search in the list +Output: + >=0 - if everything whent ok + -1 - if there was some kind of mistake +*/ +int tbl_exec_in_s(tbl_exec *tbl, char *cmd); + +/* +if command is found then return pointer to it +if you passs pointer further then if its will be freed +or deleted from the list your programm will segfault +so enjoy ;] +Input: + tbl - table of executed commands + cmd - command name to search in the list +Output: + >=0 - if everything whent ok + -1 - if there was some kind of mistake +*/ +tble_exec *tbl_exec_search_cmd(tbl_exec *tbl, char *cmd); + +/* +print all entries in table +Input: + tbl - table of executed commands +Output: + >=0 - if everything whent ok + -1 - if there was some kind of mistake +*/ +int tbl_exec_print(tbl_exec *tbl); + + +/* +create qcmd commands +Input: +Output: + !NULL - if everything whent ok + NULL - if there was some kind of mistake +*/ +tble_qcmd *tbl_qcmd_c(); + +/* +create array for executed commands +Input: +Output: + !NULL - if everything whent ok + NULL - if there was some kind of mistake +*/ +tbl_qcmd* tbl_qcmd_list_c(int size); + +/* +add info from executor to command +Input: + tbl - array of executed commands + cmd - command to be added to execution list +Output: + 0 - if everything whent ok + -1 - if there was some kind of mistake +*/ +int tbl_qcmd_set_exec(tble_qcmd *qcmd, tble_exec *exec); + +/* +add command to executed array +Input: + tbl - array of executed commands + cmd - command to be added to execution list +Output: + 0 - if everything whent ok + -1 - if there was some kind of mistake +*/ +int tbl_qcmd_add(tbl_qcmd *tbl, tble_qcmd *qcmd); +/* +check if there any event on commands +Input: + +Output: + >=0 - if everything whent ok, returns index in the table (ret-1) + -1 - if there was some kind of mistake, or nothing happened +*/ +int tbl_qcmd_chk(tbl_qcmd *tbl); + +/* +delete command from the list +Input: + tbl - array of executed commands + idx - index in array that should be removed +Output: + 0 - if everything whent ok + -1 - if there was some kind of mistake +*/ +int tbl_qcmd_del(tbl_qcmd *tbl, int idx); + +/* +if there is response then try to match response and return code to +executed command requester +Input: + tbl - array of executed cmds + resp - response recieved, match it to table values +Output: + 0 - if everything whent ok + -1 - if there was some kind of mistake +*/ +int tbl_qcmd_resp(tbl_qcmd *tbl, tble_cmd_resp *resp); + +/* +create cmd param +Input: +Output: + !NULL - if everything whent ok + NULL - if there was some kind of mistake +*/ +tble_cmd_param* tbl_cmd_param_c(); + +/* +create cmd param +Input: +Output: + 0 - if everything whent ok + -1 - if there was some kind of mistake +*/ +int tbl_cmd_param_set(tble_cmd_param *cmd_param, tble_qcmd *cmd); + +char *alloc_new_str_s(char *str, size_t size); +char *alloc_new_str(char *str); + +/*mvar things*/ +//shitty macro +#define MVAR_ALLOC_STRC(VNAME,VTYPE,VRET)\ +VTYPE *VNAME;\ +VNAME=malloc(sizeof(VTYPE));\ +if ((VNAME)==NULL){\ +VRET\ +}\ +memset(VNAME,0,sizeof(VTYPE)); + +#define MVAR_ALLOC_ARR(VNAME,VTYPE,VSZ,VRET)\ +VTYPE *VNAME;\ +VNAME=malloc(sizeof(VTYPE)*(VSZ));\ +if ((VNAME)==NULL){\ +VRET\ +}\ +memset(VNAME,0,sizeof(VTYPE)*(VSZ)); + +#endif diff --git a/tool/mqtool.c b/tool/mqtool.c index 61e78af..76bf660 100644 --- a/tool/mqtool.c +++ b/tool/mqtool.c @@ -45,23 +45,41 @@ int main(int argc, char **argv) char* mqNameClear = NULL; //clear print extra info if there is int flagExtra = 0; - + //send string to queue + int flagSendQueue = 0; + char *mqNameSend = NULL; //mq name where to send + //argument value that to send into queue + int flagSendStr = 0; + char *sStrSend = NULL; //mq string that should be push'ed in queue //get arguments - while ((c = getopt(argc, argv, "elp:c:")) != -1) + while ((c = getopt(argc, argv, "elp:c:s:q:")) != -1) switch(c) { + //list all mqueue's case 'l': flagListAll = 1; break; + //show param of mq case 'p': flagMqParam = 1; mqName = optarg; break; + //get all messages, print out and clean queue case 'c': flagClear = 1; mqNameClear = optarg; break; + //send string to queue + case 's': + flagSendQueue = 1; + mqNameSend = optarg; + break; + case 'q': + flagSendStr = 1; + sStrSend = optarg; + break; + //show extra information if avaliable case 'e': flagExtra = 1; break; @@ -177,6 +195,37 @@ int main(int argc, char **argv) printf("Readed %d messages from %ld\n", iCountMsg, mqAttr.mq_curmsgs); + exit(0); + } + if (flagSendQueue && flagSendStr) + { + size_t szStr=0; + int mqFd = -1; + int numRead = -1; + int prio = 0; + struct mq_attr mqAttr; + + mqFd = mq_open(mqNameSend, O_RDWR, 0666, NULL); + if (mqFd == -1) + { + printf("Cant open %s\n", mqName); + exit(1); + } + + iRet = mq_getattr(mqFd, &mqAttr); + if (iRet == -1) + { + perror("Cant get attributes"); + exit(1); + } + + iRet = mq_send(mqFd, sStrSend, strlen(sStrSend), prio); + if (iRet == -1) + { + perror("Cant send message"); + exit(1); + } + exit(0); } -- cgit v1.2.3