summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile4
-rw-r--r--agni.c53
-rw-r--r--mq_cmd.c4
-rw-r--r--mq_cmd.h2
-rw-r--r--tbl_qcmd.c307
-rw-r--r--tbl_qcmd.h267
-rw-r--r--tool/mqtool.c53
7 files changed, 672 insertions, 18 deletions
diff --git a/Makefile b/Makefile
index 07d1ec0..cbc208a 100644
--- a/Makefile
+++ b/Makefile
@@ -3,7 +3,9 @@ make:
gcc mmm.c -c
gcc darray.c -c
gcc mq_cmd.c -c
- gcc -Wall mq_cmd.o buf.o mmm.o darray.o agni.c -o agni -std=c11 -lrt
+ gcc tbl_qcmd.c -c
+ gcc -Wall tbl_qcmd.o mq_cmd.o buf.o mmm.o darray.o agni.c \
+ -o agni -std=c11 -lrt
clean:
rm -f agni *.o \ No newline at end of file
diff --git a/agni.c b/agni.c
index 79b8441..32b3e12 100644
--- a/agni.c
+++ b/agni.c
@@ -22,6 +22,7 @@
#include "darray.h"
#include "buf.h"
#include "mq_cmd.h"
+#include "tbl_qcmd.h"
/*
no defence programming, no error checking, no argument checking just PoC
@@ -49,6 +50,23 @@ int irc_connect( char *hostname, char *port )
return fd;
}
+/*
+return unique ID, with atomic counter should work in all cases
+*/
+_Atomic int _glbl_id=0;
+int uniq_id()
+{
+ int ret=-1,id;
+
+ //what possible could go wrong?
+ id = atomic_load(&_glbl_id);
+ ret = id;
+ id += 1;
+ atomic_store(&_glbl_id, id);
+ return ret;
+
+}
+
//async io multiplexer
typedef struct mq_ntf_io_mplx
{
@@ -515,7 +533,7 @@ int recv_mq_cmd(mq_ntf_mdt *mq,
recv_sz = err;
//in err should be still the size of recieved buffer
- cmd_recv = mq_cmd_creates(in_buf, recv_sz);
+ cmd_recv = mq_cmd_creates(in_buf, recv_sz, 0);
if (cmd_recv == NULL)
{
printf("Cannot create cmd\n");
@@ -596,12 +614,12 @@ int th_start_client(void *data)
while (run)
{
//printf("Send msg\n");
- err = SEND_CMD_OUT(mq,cmd_id,"INIT","NO");
+ err = SEND_CMD_OUT(mq,cmd_id,"FROM_CLIENT","NO");
printf("err = %d\n",err);
cmd_id += 1;
run += 1;
sleep(1);
- if (run == 10)
+ if (run == 2)
{
break;
}
@@ -641,7 +659,7 @@ int th_event_manager(void *data)
event_handler_cfg *cfg = data;
atomic_fetch_add(&cfg->running,1);
mq_ntf_mdt *mq=NULL;
- char *buf = NULL;
+ char *out_buf = NULL;
int run;
int err;
int mq_event;
@@ -663,14 +681,14 @@ int th_event_manager(void *data)
printf("Cant get attribute\n");
}
- buf = malloc(out_attr.mq_msgsize);
+ out_buf = malloc(out_attr.mq_msgsize);
//maybe its not null
printf("Start event loop\n");
run = 1;
while(run)
{
- //check for messages
+ //check if there is some message and save it to buffer
run += 1;
mq_event = mq_ntf_select(mq);
switch(mq_event)
@@ -680,23 +698,31 @@ int th_event_manager(void *data)
case 1:
break;
case 2:
- if (mq_ntf_read(mq, MQ_OUT, buf, out_attr.mq_msgsize) == -1)
+ if (mq_ntf_read(mq, MQ_OUT, out_buf, out_attr.mq_msgsize) == -1)
{
printf("Cant read message\n");
} else
{
- buf[out_attr.mq_msgsize-1] = 0x0;
- printf("Recieve %s\n", buf);
+ out_buf[out_attr.mq_msgsize-1] = 0x0;
+ printf("Recieve %s\n", out_buf);
}
break;
default:
printf("Unknown event type\n");
}
sleep(1);
- if (run == 10)
- break;
- }
+ //if (run == 10)
+ // break;
+
+ //applay to recieved command executor
+ //if QUIT then quit the thread
+ //other command pass to cmd/execution matching table
+ //pass to exec handler
+ //get response if command is immidieate otherwise wait for response in next execution
+
+ }
+ free(out_buf);
printf("End event thread\n");
atomic_fetch_sub( &cfg->running,1);
@@ -712,6 +738,9 @@ int main(int argc, char **argv)
event_handler_cfg *evhnd_cfg;
mq_ntf_mdt *mq_array;
+ /*set atomic variables to init value*/
+ atomic_store(&_glbl_id, 0);
+
/* Load configuration */
cnt_servers = SIZEOF_SERVER_LIST;
for (i=0;i<SIZEOF_SERVER_LIST;i++)
diff --git a/mq_cmd.c b/mq_cmd.c
index ce48713..8956bfc 100644
--- a/mq_cmd.c
+++ b/mq_cmd.c
@@ -41,7 +41,7 @@ mq_cmd* mq_cmd_create(int id, char *cmd, size_t cmd_sz, char *param, size_t para
return ret;
}
-mq_cmd* mq_cmd_creates(char *str, size_t sz)
+mq_cmd* mq_cmd_creates(char *str, size_t sz, int id)
{
mq_cmd *ret = NULL;
@@ -55,7 +55,7 @@ mq_cmd* mq_cmd_creates(char *str, size_t sz)
}
ret->sz = sz;
memcpy(ret->buf,str,sz);
- ret->id = -1;
+ ret->id = id;
return ret;
}
diff --git a/mq_cmd.h b/mq_cmd.h
index 7872acf..120c427 100644
--- a/mq_cmd.h
+++ b/mq_cmd.h
@@ -15,7 +15,7 @@ typedef struct mq_cmd
} mq_cmd;
mq_cmd* mq_cmd_create(int id, char *cmd, size_t cmd_sz, char *param, size_t param_sz);
-mq_cmd* mq_cmd_creates(char *str, size_t sz);
+mq_cmd* mq_cmd_creates(char *str, size_t sz, int id);
int mq_cmd_id(mq_cmd *cmd, int *id);
char* mq_cmd_cmd(mq_cmd *cmd, char **buf, size_t *sz);
int mq_cmd_param(mq_cmd *cmd, char **param, size_t *sz);
diff --git a/tbl_qcmd.c b/tbl_qcmd.c
new file mode 100644
index 0000000..fb45c5a
--- /dev/null
+++ b/tbl_qcmd.c
@@ -0,0 +1,307 @@
+#include "tbl_qcmd.h"
+
+tble_exec *tbl_exec_c()
+{
+ MVAR_ALLOC_STRC(ret,tble_exec,{
+ return NULL;});
+ return ret;
+}
+
+
+tbl_exec *tbl_exec_list_c(int size)
+{
+ MVAR_ALLOC_STRC(ret,tbl_exec,{
+ return NULL;});
+
+ ret->size = 0;
+ ret->max_size = size;
+
+ MVAR_ALLOC_ARR(reg_cmd,tble_exec*,size,{
+ free(ret);
+ return NULL;
+ })
+
+ ret->reg_cmd = reg_cmd;
+
+ return ret;
+}
+
+//TODO check if command allready excists
+int tbl_exec_add(tbl_exec *tbl, tble_exec *cmd)
+{
+ if (tbl == NULL || cmd == NULL)
+ {
+ return -1;
+ }
+
+ tbl->reg_cmd[tbl->size] = cmd;
+ if (tbl->size+1<tbl->max_size)
+ {
+ tbl->size += 1;
+ } else
+ {
+ return -1;
+ }
+
+ return 0;
+}
+
+int tbl_exec_in_s(tbl_exec *tbl, char *cmd)
+{
+ return -1;
+}
+
+tble_exec *tbl_exec_search_cmd(tbl_exec *tbl, char *cmd)
+{
+ tble_exec *ret = NULL;
+ tble_exec *cur_cmd = NULL;
+ int i;
+
+ if ((tbl == NULL)||(cmd == NULL))
+ {
+ return NULL;
+ }
+
+ for (i=0;i<tbl->max_size;i++)
+ {
+ cur_cmd = tbl->reg_cmd[i];
+ if ((strncmp(cur_cmd->cmd,cmd,strlen(cmd)) == 0) &&
+ (strlen(cmd) == strlen(cur_cmd->cmd)))
+ {
+ ret = cur_cmd;
+ break;
+ }
+ }
+
+ return ret;
+}
+
+int tbl_exec_print(tbl_exec *tbl)
+{
+ int i;
+ tble_exec *cmd=NULL;
+
+ if (tbl == NULL)
+ {
+ return -1;
+ }
+
+ for (i=0;i<tbl->size;i++)
+ {
+ cmd = tbl->reg_cmd[i];
+ if (cmd != NULL)
+ {
+ printf("->%d<-\n",i);
+ printf("ID: %d\n", cmd->id);
+ printf("TYPE: %d\n", cmd->type);
+ if (cmd->name)
+ {
+ printf("%s\n", cmd->name);
+ }
+ if (cmd->cmd)
+ {
+ printf("%s\n", cmd->cmd);
+ }
+ }
+ }
+ return 0;
+}
+
+tble_qcmd *tbl_qcmd_c()
+{
+ MVAR_ALLOC_STRC(ret,tble_qcmd,{
+ return NULL;});
+ return ret;
+}
+
+
+tbl_qcmd* tbl_qcmd_list_c(int size)
+{
+ MVAR_ALLOC_STRC(ret,tbl_qcmd,{
+ return NULL;});
+
+ ret->size = 0;
+ ret->max_size = size;
+
+ //alloc array of (tble_qcmd*) pointers
+ MVAR_ALLOC_ARR(cmd,tble_qcmd*,size,{
+ free(ret);
+ return NULL;
+ })
+
+ ret->cmd = cmd;
+
+ return ret;
+}
+
+int tbl_qcmd_set_exec(tble_qcmd *qcmd, tble_exec *exec)
+{
+ int ret = -1;
+
+ if ((qcmd == NULL) || (exec == NULL))
+ return -1;
+
+ //set executed command
+ qcmd->cmd = alloc_new_str(exec->cmd);
+ qcmd->state = QCMD_PREPARE;
+ qcmd->idx_exec = exec->id;
+
+ ret = 0;
+
+ return ret;
+}
+
+
+int tbl_qcmd_add(tbl_qcmd *tbl, tble_qcmd *cmd)
+{
+ if (tbl == NULL || cmd == NULL)
+ return -1;
+ tbl->cmd[tbl->size] = cmd;
+ if (tbl->size+1<tbl->max_size)
+ {
+ tbl->size += 1;
+ } else
+ {
+ return -1;
+ }
+
+ return 0;
+}
+
+
+int tbl_qcmd_chk(tbl_qcmd *tbl)
+{
+ int ret=-1;
+ int i;
+
+ if (tbl == NULL)
+ {
+ return -1;
+ }
+
+ for (i=0;i<tbl->max_size;i++)
+ {
+ //if command in some state like segfaulted or
+ // finished running then lets check
+ if ((tbl->cmd[i]->state == QCMD_TIMEOUT)||
+ (tbl->cmd[i]->state == QCMD_DONE))
+ {
+ return i;
+ }
+ }
+
+ return ret;
+}
+
+
+int tbl_qcmd_del(tbl_qcmd *tbl, int idx)
+{
+ int ret=-1;
+ tble_qcmd *replace_cmd=NULL;
+
+ if (tbl == NULL || idx < 0)
+ {
+ return -1;
+ }
+
+ if (idx >= tbl->size)
+ {
+ return -1;
+ }
+
+ if (tbl->size = 1)
+ {
+ tbl->size = 0;
+ return 0;
+ }
+
+ //if last idx = size then will overwrite itself and decrease command counter
+ replace_cmd = tbl->cmd[tbl->size];
+ tbl->size -= 1;
+ tbl->cmd[idx] = replace_cmd;
+
+ //looks like no errors
+ ret = 0;
+
+ return ret;
+}
+
+int tbl_qcmd_resp(tbl_qcmd *tbl, tble_cmd_resp *resp)
+{
+ int ret = -1;
+ tble_qcmd *single_cmd=NULL;
+
+ int i;
+
+ if ((tbl == NULL) || (resp == NULL))
+ {
+ return -1;
+ }
+
+ for (i=0;i<tbl->size;i++)
+ {
+ single_cmd = tbl->cmd[i];
+ if ((single_cmd->id == resp->id))
+ {
+ return i;
+ }
+ }
+
+ return ret;
+}
+
+tble_cmd_param* tbl_cmd_param_c()
+{
+
+ MVAR_ALLOC_STRC(ret,tble_cmd_param,{
+ return NULL;});
+
+ return ret;
+}
+
+int tbl_cmd_param_set(tble_cmd_param *cmd_param, tble_qcmd *cmd)
+{
+
+ if ((cmd_param == NULL) || (cmd == NULL))
+ {
+ return -1;
+ }
+ cmd_param->id = cmd->id;
+ cmd_param->cmd = alloc_new_str(cmd->cmd);
+ cmd_param->param = alloc_new_str(cmd->param);
+ cmd_param->out_q = cmd->out_q;
+
+ return 0;
+}
+
+
+char *alloc_new_str_s(char *str, size_t size)
+{
+ char *ret = NULL;
+
+ if (str == NULL)
+ {
+ return NULL;
+ }
+
+ //1MB is enought
+ if (size > (1024*1024))
+ {
+ return NULL;
+ }
+
+ ret = malloc(size);
+ if (ret == NULL)
+ {
+ return NULL;
+ }
+
+ memcpy(ret, str, size);
+
+ return ret;
+}
+
+char *alloc_new_str(char *str)
+{
+ return alloc_new_str_s(str, strlen(str));
+} \ No newline at end of file
diff --git a/tbl_qcmd.h b/tbl_qcmd.h
new file mode 100644
index 0000000..4052812
--- /dev/null
+++ b/tbl_qcmd.h
@@ -0,0 +1,267 @@
+#ifndef __TBL_QCMD_H
+#define __TBL_QCMD_H
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <mqueue.h>
+#include <string.h>
+
+#define EXEC_CURRENT 0 //executes in current thread
+#define EXEC_SEPERATE 1 //executes in seperate thread
+
+/*
+Fields:
+*/
+typedef struct tble_exec
+{
+ int id;
+ char *name; /*module name, can be used just for note*/
+ char *cmd; /*command name*/
+ int type;
+} tble_exec;
+
+/*
+list of registed commands
+Fields:
+*/
+typedef struct tbl_exec
+{
+ int size;
+ int max_size;
+ tble_exec **reg_cmd;
+} tbl_exec;
+
+#define QCMD_NONE 0 //nothing happening with this command
+#define QCMD_TIMEOUT 1 //command running to long without result
+#define QCMD_DONE 2 //comand finsihed execution
+#define QCMD_RUNNING 3 //command still in execution state
+#define QCMD_PREPARE 4 //something need to be done before make it to run
+
+/*
+Fields:
+*/
+typedef struct tble_qcmd
+{
+ int id;
+ int timestamp; //when command started to execute
+ int state; //command execution state
+ int timeout; //timeout time for command
+ char *cmd; //cmd name
+ char *param; //params
+ mqd_t out_q;
+ mqd_t in_q;
+ int idx_exec; //table unique id of executor, check if still there ;]
+} tble_qcmd;
+
+/*
+table of commands executing
+Fields:
+*/
+typedef struct tbl_qcmd
+{
+ int size;
+ int max_size;
+ tble_qcmd **cmd; /*list of command executing and waiting for replay*/
+} tbl_qcmd;
+
+/*
+use this structure to give params to command execution thread
+Fields:
+*/
+typedef struct tble_cmd_param
+{
+ int id;
+ mqd_t out_q;
+ char *cmd;
+ char *param;
+} tble_cmd_param;
+
+/*
+Fields:
+*/
+typedef struct tble_cmd_resp
+{
+ int id;
+ int type;
+ int ret_code;
+ char *resp;
+} tble_cmd_resp;
+
+/*
+create exec command
+Input:
+Output:
+ !NULL - if everything whent ok
+ NULL - if there was some kind of mistake
+*/
+tble_exec *tbl_exec_c();
+
+/*
+create array where to put commands
+Input:
+ size - size of array that will be created
+Output:
+ !NULL - if everything whent ok
+ NULL - if there was some kind of mistake
+*/
+tbl_exec *tbl_exec_list_c(int size);
+
+/*
+add new command to array
+Input:
+ tbl - table of executed commands
+ cmd - command that shoule be added
+Output:
+ 0 - if everything whent ok
+ -1 - if there was some kind of mistake
+*/
+int tbl_exec_add(tbl_exec *tbl, tble_exec *cmd);
+
+/*
+search for command in da list
+Input:
+ tbl - table of executed commands
+ cmd - command name to search in the list
+Output:
+ >=0 - if everything whent ok
+ -1 - if there was some kind of mistake
+*/
+int tbl_exec_in_s(tbl_exec *tbl, char *cmd);
+
+/*
+if command is found then return pointer to it
+if you passs pointer further then if its will be freed
+or deleted from the list your programm will segfault
+so enjoy ;]
+Input:
+ tbl - table of executed commands
+ cmd - command name to search in the list
+Output:
+ >=0 - if everything whent ok
+ -1 - if there was some kind of mistake
+*/
+tble_exec *tbl_exec_search_cmd(tbl_exec *tbl, char *cmd);
+
+/*
+print all entries in table
+Input:
+ tbl - table of executed commands
+Output:
+ >=0 - if everything whent ok
+ -1 - if there was some kind of mistake
+*/
+int tbl_exec_print(tbl_exec *tbl);
+
+
+/*
+create qcmd commands
+Input:
+Output:
+ !NULL - if everything whent ok
+ NULL - if there was some kind of mistake
+*/
+tble_qcmd *tbl_qcmd_c();
+
+/*
+create array for executed commands
+Input:
+Output:
+ !NULL - if everything whent ok
+ NULL - if there was some kind of mistake
+*/
+tbl_qcmd* tbl_qcmd_list_c(int size);
+
+/*
+add info from executor to command
+Input:
+ tbl - array of executed commands
+ cmd - command to be added to execution list
+Output:
+ 0 - if everything whent ok
+ -1 - if there was some kind of mistake
+*/
+int tbl_qcmd_set_exec(tble_qcmd *qcmd, tble_exec *exec);
+
+/*
+add command to executed array
+Input:
+ tbl - array of executed commands
+ cmd - command to be added to execution list
+Output:
+ 0 - if everything whent ok
+ -1 - if there was some kind of mistake
+*/
+int tbl_qcmd_add(tbl_qcmd *tbl, tble_qcmd *qcmd);
+/*
+check if there any event on commands
+Input:
+
+Output:
+ >=0 - if everything whent ok, returns index in the table (ret-1)
+ -1 - if there was some kind of mistake, or nothing happened
+*/
+int tbl_qcmd_chk(tbl_qcmd *tbl);
+
+/*
+delete command from the list
+Input:
+ tbl - array of executed commands
+ idx - index in array that should be removed
+Output:
+ 0 - if everything whent ok
+ -1 - if there was some kind of mistake
+*/
+int tbl_qcmd_del(tbl_qcmd *tbl, int idx);
+
+/*
+if there is response then try to match response and return code to
+executed command requester
+Input:
+ tbl - array of executed cmds
+ resp - response recieved, match it to table values
+Output:
+ 0 - if everything whent ok
+ -1 - if there was some kind of mistake
+*/
+int tbl_qcmd_resp(tbl_qcmd *tbl, tble_cmd_resp *resp);
+
+/*
+create cmd param
+Input:
+Output:
+ !NULL - if everything whent ok
+ NULL - if there was some kind of mistake
+*/
+tble_cmd_param* tbl_cmd_param_c();
+
+/*
+create cmd param
+Input:
+Output:
+ 0 - if everything whent ok
+ -1 - if there was some kind of mistake
+*/
+int tbl_cmd_param_set(tble_cmd_param *cmd_param, tble_qcmd *cmd);
+
+char *alloc_new_str_s(char *str, size_t size);
+char *alloc_new_str(char *str);
+
+/*mvar things*/
+//shitty macro
+#define MVAR_ALLOC_STRC(VNAME,VTYPE,VRET)\
+VTYPE *VNAME;\
+VNAME=malloc(sizeof(VTYPE));\
+if ((VNAME)==NULL){\
+VRET\
+}\
+memset(VNAME,0,sizeof(VTYPE));
+
+#define MVAR_ALLOC_ARR(VNAME,VTYPE,VSZ,VRET)\
+VTYPE *VNAME;\
+VNAME=malloc(sizeof(VTYPE)*(VSZ));\
+if ((VNAME)==NULL){\
+VRET\
+}\
+memset(VNAME,0,sizeof(VTYPE)*(VSZ));
+
+#endif
diff --git a/tool/mqtool.c b/tool/mqtool.c
index 61e78af..76bf660 100644
--- a/tool/mqtool.c
+++ b/tool/mqtool.c
@@ -45,23 +45,41 @@ int main(int argc, char **argv)
char* mqNameClear = NULL;
//clear print extra info if there is
int flagExtra = 0;
-
+ //send string to queue
+ int flagSendQueue = 0;
+ char *mqNameSend = NULL; //mq name where to send
+ //argument value that to send into queue
+ int flagSendStr = 0;
+ char *sStrSend = NULL; //mq string that should be push'ed in queue
//get arguments
- while ((c = getopt(argc, argv, "elp:c:")) != -1)
+ while ((c = getopt(argc, argv, "elp:c:s:q:")) != -1)
switch(c)
{
+ //list all mqueue's
case 'l':
flagListAll = 1;
break;
+ //show param of mq
case 'p':
flagMqParam = 1;
mqName = optarg;
break;
+ //get all messages, print out and clean queue
case 'c':
flagClear = 1;
mqNameClear = optarg;
break;
+ //send string to queue
+ case 's':
+ flagSendQueue = 1;
+ mqNameSend = optarg;
+ break;
+ case 'q':
+ flagSendStr = 1;
+ sStrSend = optarg;
+ break;
+ //show extra information if avaliable
case 'e':
flagExtra = 1;
break;
@@ -179,5 +197,36 @@ int main(int argc, char **argv)
exit(0);
}
+ if (flagSendQueue && flagSendStr)
+ {
+ size_t szStr=0;
+ int mqFd = -1;
+ int numRead = -1;
+ int prio = 0;
+ struct mq_attr mqAttr;
+
+ mqFd = mq_open(mqNameSend, O_RDWR, 0666, NULL);
+ if (mqFd == -1)
+ {
+ printf("Cant open %s\n", mqName);
+ exit(1);
+ }
+
+ iRet = mq_getattr(mqFd, &mqAttr);
+ if (iRet == -1)
+ {
+ perror("Cant get attributes");
+ exit(1);
+ }
+
+ iRet = mq_send(mqFd, sStrSend, strlen(sStrSend), prio);
+ if (iRet == -1)
+ {
+ perror("Cant send message");
+ exit(1);
+ }
+
+ exit(0);
+ }
} \ No newline at end of file