diff options
author | ZoRo <dos21h@gmail.com> | 2017-01-26 22:23:28 +0000 |
---|---|---|
committer | ZoRo <dos21h@gmail.com> | 2017-01-26 22:23:28 +0000 |
commit | 67860598185d248756316549a7522968f7294990 (patch) | |
tree | b329576eca4f5dccbfec2bf7a4952abc93b8b9c8 | |
parent | a588aa017512d3cc70dde6627d1218020e755259 (diff) | |
download | agni-67860598185d248756316549a7522968f7294990.tar.gz agni-67860598185d248756316549a7522968f7294990.zip |
Made working basic mq IPC communication
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | agni.c | 498 | ||||
-rw-r--r-- | config_servers.h | 4 | ||||
-rw-r--r-- | debug.h | 30 | ||||
-rw-r--r-- | mq_cmd.c | 46 | ||||
-rw-r--r-- | mq_cmd.h | 3 | ||||
-rw-r--r-- | tool/Makefile | 2 | ||||
-rw-r--r-- | tool/mqtool.c | 183 |
8 files changed, 667 insertions, 101 deletions
@@ -3,7 +3,7 @@ make: gcc mmm.c -c gcc darray.c -c gcc mq_cmd.c -c - gcc mq_cmd.o buf.o mmm.o darray.o agni.c -o agni -std=c11 -lrt + gcc -Wall mq_cmd.o buf.o mmm.o darray.o agni.c -o agni -std=c11 -lrt clean: rm -f agni *.o
\ No newline at end of file @@ -49,23 +49,13 @@ int irc_connect( char *hostname, char *port ) return fd; } -char *create_mq_cmd(char *out, size_t out_sz, int id, char *cmd, size_t cmd_sz, char *payload, size_t p_sz) +//async io multiplexer +typedef struct mq_ntf_io_mplx { - char *ret = out; - int n,i; + struct timeval tv; + fd_set io_fds; +} mq_ntf_io_mplx; - n = snprintf(out, out_sz, ":%d:%s ", id, cmd); - if (n < 0) - { - return NULL; - } - for (i=0;i<p_sz,n<out_sz-1;i++,n++) - { - out[n] = payload[i]; - } - out[n++]=0x0; - return ret; -} /* supposed format agni-[id]-[in/out] @@ -78,9 +68,11 @@ typedef struct mq_ntf_mdt int id; mqd_t mq_in; mqd_t mq_out; + mq_ntf_io_mplx io_mplx; //_Atomic int used; nado? } mq_ntf_mdt; + /* return -1 on error */ @@ -91,6 +83,7 @@ int mq_ntf_open(mq_ntf_mdt *mq, int id) if (!mq) { + PERM(); return -1; } mq->id = id; @@ -100,6 +93,7 @@ int mq_ntf_open(mq_ntf_mdt *mq, int id) O_RDWR | O_CREAT, 0666, NULL); if (mq->mq_in == -1) { + ENL(); perror("mq_open"); return -1; } @@ -112,15 +106,96 @@ int mq_ntf_open(mq_ntf_mdt *mq, int id) perror("mq_open"); return -1; } + + PRINT("opened = %d %d\n", mq->mq_in, mq->mq_out); + + return 0; +} + +int mq_ntf_mplx(mq_ntf_mdt *mq, int msec) +{ + fd_set io_fds; + + if (mq == NULL) + { + PERM(); + return -1; + } + + if (msec < 0) + { + PERM(); + return -1; + } + + //mq->io_mplx.tv.tv_sec = msec/1000; + //mq->io_mplx.tv.tv_usec = (msec%1000)*1000; + mq->io_mplx.tv.tv_sec = 2; + mq->io_mplx.tv.tv_usec = 0; + PRINT("mplx = %d %d\n", mq->mq_in, mq->mq_out); + FD_ZERO(&io_fds); + FD_SET(mq->mq_in, &io_fds);//in first + FD_SET(mq->mq_out, &io_fds);//out second + mq->io_mplx.io_fds = io_fds; + return 0; } +/* return number if there is something to read +RETURN: + -1 some error + 0x00 nothing to read + 0x01 input ready + 0x02 output ready + 0x02|0x01 input&output ready -/* return number if there is something to read */ +*/ +//if mplx havent done, everything could blow up int mq_ntf_select(mq_ntf_mdt *mq) { + int fdnum, fd1, fd2; + int ret=0x00; + fd_set io_fds; + struct timeval tv; - return 0; + if (mq == NULL) + { + PERM(); + return -1; + } + + //we should allways have just 2 descriptors to listen + //io_fds = mq->io_mplx.io_fds; + fd1 = mq->mq_in; + fd2 = mq->mq_out; + FD_ZERO(&io_fds); + FD_SET(fd1, &io_fds); + FD_SET(fd2, &io_fds); + tv.tv_sec = 1; + tv.tv_usec = 0; + fdnum = select(2, + &io_fds, + NULL, + NULL, + &tv ); + + if (fdnum == -1) + { + ENL(); + return -1; + } else { + if (FD_ISSET(mq->mq_in, &mq->io_mplx.io_fds)) + { + ret = 0x01; + } + + if (FD_ISSET(mq->mq_out, &mq->io_mplx.io_fds)) + { + ret = 0x02; + } + } + + return ret; } @@ -128,10 +203,10 @@ int mq_ntf_select(mq_ntf_mdt *mq) int mq_ntf_read_in(mq_ntf_mdt *mq, char *buf, size_t size) { struct mq_attr attr; - char *msg; size_t bytes; unsigned int prio = 10; - int fret; + int fret=0; + fret = mq_getattr(mq->mq_in, &attr); if (attr.mq_maxmsg == attr.mq_curmsgs) { @@ -146,21 +221,21 @@ int mq_ntf_read_in(mq_ntf_mdt *mq, char *buf, size_t size) perror("mq_receive read in"); return -1; } - return 0; + return bytes; } /* read from output queue */ int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size) { struct mq_attr attr; - char *msg; + //char *msg; size_t bytes; unsigned int prio = 10; int fret; fret = mq_getattr(mq->mq_out, &attr); if (attr.mq_maxmsg == attr.mq_curmsgs) { - printf("queue %d out full", mq->id); + printf("queue %d out full\n", mq->id); return -1; } bytes = mq_receive(mq->mq_out, buf, @@ -171,7 +246,7 @@ int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size) perror("mq_receive read out"); return -1; } - return 0; + return bytes; } @@ -179,10 +254,10 @@ int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size) int mq_ntf_write_out(mq_ntf_mdt *mq, const char *buf, size_t size) { struct mq_attr attr; - char *msg; + //char *msg; size_t bytes; unsigned int prio = 10; - int fret; + int fret=0; fret = mq_getattr(mq->mq_out, &attr); if (attr.mq_maxmsg == attr.mq_curmsgs) { @@ -197,17 +272,17 @@ int mq_ntf_write_out(mq_ntf_mdt *mq, const char *buf, size_t size) perror("mq_send"); return -1; } - return 0; + return bytes; } /* write to input quque */ int mq_ntf_write_in(mq_ntf_mdt *mq, const char *buf, size_t size) { struct mq_attr attr; - char *msg; + //char *msg; size_t bytes; unsigned int prio = 10; - int fret; + int fret=0; fret = mq_getattr(mq->mq_in, &attr); if (attr.mq_maxmsg == attr.mq_curmsgs) { @@ -222,43 +297,35 @@ int mq_ntf_write_in(mq_ntf_mdt *mq, const char *buf, size_t size) perror("mq_send"); return -1; } - return 0; + return bytes; } -/* drain all messages from quque */ -int mq_ntf_drain(mq_ntf_mdt *mq) +//dir - dirction 1 in 2 out +int mq_ntf_count(mq_ntf_mdt *mq, int dir) { - int i,j; - struct mq_attr curattr; - char *buf; - ssize_t bytes; - int prio = 10; - mqd_t mqlist[2] = {mq->mq_in, mq->mq_out}; - - for (i=0;i<2;i++) - { - mq_getattr(mqlist[i], &curattr); - buf = malloc(curattr.mq_maxmsg); - for (j=0; j<curattr.mq_curmsgs; j++) - { - bytes = mq_receive(mqlist[i], buf, curattr.mq_maxmsg, &prio); - if (bytes == -1) - { - perror("mq_receive drain"); - } - printf("Drain %d bytes\n", bytes); - } - free(buf); + struct mq_attr attr; + int mq_fd; + + if (mq == NULL) + { + PERM(); + return -1; } - return 0; -} + if (dir == 1) + { + mq_fd = mq->mq_in; + } else + { + mq_fd = mq->mq_out; + } -/* check if there is space in quque */ -int mq_ntf_full(mq_ntf_mdt *mq) -{ + if (mq_getattr(mq_fd, &attr) != -1) + { + return attr.mq_curmsgs; + } - return 0; + return -1; } @@ -276,6 +343,8 @@ int mq_ntf_close(mq_ntf_mdt *mq) snprintf(name, name_size, MQ_PREFIX"%d-out",id); mq_unlink(name); + //clean file descriptors for io_mplx + return 0; } @@ -292,6 +361,210 @@ int mq_ntf_cmd_recv(mq_ntf_mdt *mq, mq_cmd *cmd) return 0; } +int mq_drain_q(mqd_t mqd) +{ + struct mq_attr attr; + char *buf = NULL; + unsigned int prio = 0; + int cnt_msg = 0; + int num_read = 0; + int i = 0; + + if (mq_getattr(mqd, &attr) == -1) + { + ENL(); + return -1; + } + + buf = malloc(attr.mq_msgsize); + if (buf == NULL) + { + ENL(); + return -1; + } + + for (i=0; i<attr.mq_curmsgs; i++) + { + num_read = mq_receive(mqd, buf, attr.mq_msgsize, &prio); + if (num_read == -1) + { + ENL(); + free(buf); + return -1; + } + cnt_msg += 1; + } + + free(buf); + return cnt_msg; +} + +//drain all mq buffers to zero +//to use at begining and clear all buffers to zero +int mq_ntf_drain(mq_ntf_mdt *mq) +{ + int err; + + err = mq_drain_q(mq->mq_in); + if (err < 0) + { + PERM(); + return -1; + } + printf("Drained in %d messages\n", err); + + err = mq_drain_q(mq->mq_out); + if (err < 0) + { + PERM(); + return -1; + } + printf("Drained out %d messages\n", err); + + return 0; +} + +#define MQ_OUT 1 +#define MQ_IN 2 + +int mq_ntf_getattr(mq_ntf_mdt *mq, int dir, struct mq_attr **attr) +{ + struct mq_attr gattr; + mqd_t mqdt; + + if (mq == NULL) + { + PERM(); + return -1; + } + + if (dir == MQ_OUT) + { + mqdt = mq->mq_out; + } else if (dir == MQ_IN) + { + mqdt = mq->mq_in; + } else + { + return -1; + } + + if (mq_getattr(mqdt, &gattr) == -1) + { + ENL(); + return -1; + } + + memcpy(*attr,&gattr,sizeof(struct mq_attr)); + + return 0; +} + + + +int send_mq_cmd(mq_ntf_mdt *mq, + int io, + int cmd_id, + char *cmd, + size_t cmd_sz, + char *param, + size_t param_sz) +{ + int err = 0; + mq_cmd *cmd_send = NULL; + cmd_send = mq_cmd_create(cmd_id,cmd,cmd_sz,param,param_sz); + if (cmd_send == NULL) + { + printf("Cant create command\n"); + return -1; + } + + err = -1; + if (io == MQ_OUT) + { + err = mq_ntf_write_out(mq, cmd_send->buf, cmd_send->sz); + } else if (io == MQ_IN) + { + err = mq_ntf_write_in(mq, cmd_send->buf, cmd_send->sz); + } else + { + printf("Unknown direction"); + return -1; + } + + if (err == -1) + { + printf("Couldnt recieve command\n"); + mq_cmd_free(cmd_send); + return -1; + } + + mq_cmd_free(cmd_send); + + return 0; +} + +int recv_mq_cmd(mq_ntf_mdt *mq, + int io, + mq_cmd **cmd) //return back recieved command +{ + int err = 0; + mq_cmd *cmd_recv = NULL; + int recv_sz=-1; + char *in_buf=NULL; + + in_buf = malloc(MQ_MSG_SIZE); + if (in_buf == NULL) + { + ENL(); + return -1; + } + + err = -1; + if (io == MQ_OUT) + { + err = mq_ntf_read_out(mq, in_buf, MQ_MSG_SIZE); + } else if (io == MQ_IN) + { + err = mq_ntf_read_in(mq, in_buf, MQ_MSG_SIZE); + } else + { + printf("Unknown direction"); + return -1; + } + + if (err == -1) + { + printf("Couldnt recieve command\n"); + free(in_buf); + return -1; + } + recv_sz = err; + + //in err should be still the size of recieved buffer + cmd_recv = mq_cmd_creates(in_buf, recv_sz); + if (cmd_recv == NULL) + { + printf("Cannot create cmd\n"); + free(in_buf); + return -1; + } + + *cmd = cmd_recv; + free(in_buf); + + return 0; +} + +#define SEND_CMD_IN(MQ,ID,CMD,PARAM) \ + send_mq_cmd(MQ,MQ_IN,ID,CMD,strlen(CMD),PARAM,strlen(PARAM)); +#define SEND_CMD_OUT(MQ,ID,CMD,PARAM) \ + send_mq_cmd(MQ,MQ_OUT,ID,CMD,strlen(CMD),PARAM,strlen(PARAM)); +#define RECV_CMD_IN(MQ,CMD) \ + recv_mq_cmd(MQ,MQ_IN,CMD); +#define RECV_CMD_OUT(MQ,CMD) \ + recv_mq_cmd(MQ,MQ_OUT,CMD); + /* |--------|<--IN---|---------| @@ -331,33 +604,36 @@ typedef struct server_cfg int th_start_client(void *data) { - int cmd_id = 1; - int ret = 0; - char cmd_buf[MQ_MSG_SIZE]; - mq_cmd *cmd=NULL; + int err; + //char cmd_buf[MQ_MSG_SIZE]; + //mq_cmd *cmd=NULL; + int run; server_cfg *cfg = data; mq_ntf_mdt *mq = cfg->mq; atomic_fetch_add(&cfg->running,1); printf("Start client\n"); printf("Server %d\n",cfg->tid); - sleep(10); + sleep(1); //send command wait for response - cmd = CMD_CREATE(cmd_id,"PING","NOPARAM"); - if (cmd != NULL) + run = 1; + while (run) { - if (ret = mq_ntf_write_out(mq,cmd->buf,cmd->sz) == -1) + //printf("Send msg\n"); + err = SEND_CMD_OUT(mq,cmd_id,"INIT","NO"); + printf("err = %d\n",err); + cmd_id += 1; + run += 1; + sleep(1); + if (run == 10) { - printf("Couldnt send command\n"); + break; } - } - cmd_id += 1; - printf("End client\n"); atomic_fetch_sub( &cfg->running,1); @@ -389,41 +665,65 @@ typedef struct event_handler_cfg { /* Thread to reacieve messages and return them back */ int th_event_manager(void *data) { - int state; - int i; event_handler_cfg *cfg = data; atomic_fetch_add(&cfg->running,1); - const int buf_size=MQ_MSG_SIZE; - char buf[buf_size]; - int ret; + mq_ntf_mdt *mq=NULL; + char *buf = NULL; + int run; + int err; + int mq_event; //read any command - mq_cmd *cmd = NULL; - - + //mq_cmd *cmd = NULL; + struct mq_attr out_attr, *ptr_out_attr=&out_attr; printf("Start event thread\n"); - state = EH_STATE_INIT; - while (state != EH_STATE_EXIT) - switch (state) + mq = cfg->mq_listen; + err = mq_ntf_mplx(mq, 10000); + if (err == -1) { - case EH_STATE_INIT: - printf("TH STATE INIT\n"); - sleep(1); - state = TH_STATE_START; - break; - case EH_STATE_START: - printf("TH STATE START\n"); - sleep(1); - state = TH_STATE_EXIT; - break; - case EH_STATE_EXIT: - printf("TH STATE EXIT\n"); + printf("Ups something whent wrong\n"); + } + + //get mq attributes + if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1) + { + printf("Cant get attribute\n"); + } + + buf = malloc(out_attr.mq_msgsize); + //maybe its not null + + printf("Start event loop\n"); + run = 1; + while(run) + { + //check for messages + run += 1; + mq_event = mq_ntf_select(mq); + switch(mq_event) + { + case 0: + break; + case 1: + break; + case 2: + if (mq_ntf_read_out(mq, buf, out_attr.mq_msgsize) == -1) + { + printf("Cant read message\n"); + } else + { + buf[out_attr.mq_msgsize-1] = 0x0; + printf("Recieve %s\n", buf); + } + break; + default: + printf("Unknown event type\n"); + } sleep(1); - state = TH_STATE_EXIT; - break; - default: - printf("Wrong state\n"); + if (run == 10) + break; } + printf("End event thread\n"); atomic_fetch_sub( &cfg->running,1); @@ -474,6 +774,8 @@ int main(int argc, char **argv) printf("Couldnt open mq_ntf_open\n"); } srvc->mq = &mq_array[i]; + //try to drain mq + mq_ntf_drain(&mq_array[i]); /* clone new proc */ clone(th_start_client, srvc->stack+STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)srvc); @@ -502,7 +804,7 @@ int main(int argc, char **argv) if (val != 0) cnt_running += 1; } - printf("cnt_running %d\n",cnt_running); + //PRINT("cnt_running %d\n",cnt_running); sleep(1); } diff --git a/config_servers.h b/config_servers.h index 3e4ee94..9bf4b9e 100644 --- a/config_servers.h +++ b/config_servers.h @@ -24,7 +24,7 @@ static irc_server_conf server_list[] = .channels = {"#mainlv",NULL}, .port = 6667, .ssl = 0, - }, + }/*, { .user = "agni", .password = NULL, @@ -32,7 +32,7 @@ static irc_server_conf server_list[] = .channels = {"#default","#bot",NULL}, .port = 6697, .ssl = 1, - } + }*/ }; #define SIZEOF_SERVER_LIST (sizeof(server_list)/sizeof(irc_server_conf)) @@ -19,6 +19,12 @@ #define E_COLOR "1;31m" #define E_COLOR_S "\033[" E_COLOR #define E_COLOR_E "\033[0m" + #define W_COLOR "1;35m" + #define W_COLOR_S "\033[" W_COLOR + #define W_COLOR_E "\033[0m" + #define PE_COLOR "1;33m" + #define PE_COLOR_S "\033[" PE_COLOR + #define PE_COLOR_E "\033[0m" #else #define D_COLOR #define D_COLOR_S @@ -26,6 +32,12 @@ #define E_COLOR #define E_COLOR_S #define E_COLOR_E + #define W_COLOR + #define W_COLOR_S + #define W_COLOR_E + #define PE_COLOR + #define PE_COLOR_S + #define PE_COLOR_E #endif //print debug line @@ -49,8 +61,12 @@ //print debug string #ifdef PRINT_DEBUG #define PRINT_DEBUG_F "Debug: " + #define PRINT_WARNING_F "WARN: " + #define PRINT_PERROR_F "PRME: " #else #define PRINT_DEBUG_F "" + #define PRINT_WARNING_F "" + #define PRINT_PERROR_F "" #endif #define PRINT( format, args ... ) PRINTF( D_COLOR_S PRINT_DEBUG_F \ @@ -61,9 +77,23 @@ PRINT_FILE_F PRINT_LINE_F format E_COLOR_E, PRINT_FILE_D, \ PRINT_LINE_D, ##args); +#define WARNING( format, args ... ) PRINTF( W_COLOR_S PRINT_WARNING_F \ + PRINT_FILE_F PRINT_LINE_F format W_COLOR_E, PRINT_FILE_D, \ + PRINT_LINE_D, ##args); + +#define PERROR( format, args ... ) PRINTF( PE_COLOR_S PRINT_WARNING_F \ + PRINT_FILE_F PRINT_LINE_F format PE_COLOR_E, PRINT_FILE_D, \ + PRINT_LINE_D, ##args); + #define PNL() PRINT("\n"); #define ENL() ERROR("\n"); +#define WRN() WARNING("\n"); + +#define PERM() PERROR("\n"); + +/*ERROR LEVEL TYPES*/ + #endif @@ -7,19 +7,29 @@ mq_cmd* mq_cmd_create(int id, char *cmd, size_t cmd_sz, char *param, size_t para mq_cmd *ret = NULL; if (cmd == NULL) + { + PERM(); return NULL; + } if (param == NULL) + { + PERM(); return NULL; + } ret = malloc(sizeof(mq_cmd)); if (ret == NULL) + { + ENL(); return NULL; + } ret->id = id; ret->sz = MQ_CMD_MSG_SIZE; ret->buf = malloc(ret->sz); if (!ret->buf) { + ENL(); free(ret); return NULL; } @@ -31,6 +41,25 @@ mq_cmd* mq_cmd_create(int id, char *cmd, size_t cmd_sz, char *param, size_t para return ret; } +mq_cmd* mq_cmd_creates(char *str, size_t sz) +{ + mq_cmd *ret = NULL; + + //just naive belive everything is fine in string + ret = malloc(sizeof(mq_cmd)); + ret->buf = malloc(sz); + if (ret->buf == NULL) + { + PERM(); + return NULL; + } + ret->sz = sz; + memcpy(ret->buf,str,sz); + ret->id = -1; + + return ret; +} + int mq_cmd_id(mq_cmd *cmd, int *id) { int i,j,strt,sz; @@ -40,9 +69,15 @@ int mq_cmd_id(mq_cmd *cmd, int *id) /*:[ID]:*/ if (cmd == NULL) + { + PERM(); return -1; + } if (id == NULL) + { + PERM(); return -1; + } i = 0; while ((cmd->buf[i] != 0) && (i<cmd->sz)) @@ -84,11 +119,20 @@ char* mq_cmd_cmd(mq_cmd *cmd, char **buf, size_t *sz) char *p; if (cmd == NULL) + { + PERM(); return NULL; + } if (buf == NULL) + { + PERM(); return NULL; + } if (sz == NULL) + { + PERM(); return NULL; + } p = cmd->buf; @@ -164,6 +208,7 @@ size_t mq_cmd_size(mq_cmd *cmd) { if (cmd == NULL) { + PERM(); return -1; } return cmd->sz; @@ -173,6 +218,7 @@ char *mq_cmd_buf(mq_cmd *cmd) { if (cmd == NULL) { + PERM(); return NULL; } @@ -5,6 +5,8 @@ #include <stdio.h> #include <string.h> +#include "debug.h" + typedef struct mq_cmd { int id; @@ -13,6 +15,7 @@ typedef struct mq_cmd } mq_cmd; mq_cmd* mq_cmd_create(int id, char *cmd, size_t cmd_sz, char *param, size_t param_sz); +mq_cmd* mq_cmd_creates(char *str, size_t sz); int mq_cmd_id(mq_cmd *cmd, int *id); char* mq_cmd_cmd(mq_cmd *cmd, char **buf, size_t *sz); int mq_cmd_param(mq_cmd *cmd, char **param, size_t *sz); diff --git a/tool/Makefile b/tool/Makefile new file mode 100644 index 0000000..915bf78 --- /dev/null +++ b/tool/Makefile @@ -0,0 +1,2 @@ +make: + gcc mqtool.c -o mqtool -lrt
\ No newline at end of file diff --git a/tool/mqtool.c b/tool/mqtool.c new file mode 100644 index 0000000..61e78af --- /dev/null +++ b/tool/mqtool.c @@ -0,0 +1,183 @@ +#define _GNU_SOURCE + +#include <ctype.h> +#include <errno.h> +#include <stdarg.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <unistd.h> +#include <netdb.h> +#include <sys/syscall.h> +#include <sched.h> +#include <stdatomic.h> +#include <sys/types.h> +#include <dirent.h> + +#include <sys/stat.h> +#include <sched.h> +#include <mqueue.h> + +#define DEVMQPATH "/dev/mqueue/" +#define AGNIPATH_IN "agni-%d-in" +#define AGNIPATH_OUT "agni-%d-out" + +//------------------------------------------------------------------------------ +int main(int argc, char **argv) +{ + //generic variables + int c; + int i,j; + int iRet; //any func return param + + //for directory listing + DIR *dirPath; + struct dirent *entryPath; + + //list all mqs + int flagListAll = 0; + //show mq param + int flagMqParam = 0; + char* mqName = NULL; + //clear message queue + int flagClear = 0; + char* mqNameClear = NULL; + //clear print extra info if there is + int flagExtra = 0; + + + //get arguments + while ((c = getopt(argc, argv, "elp:c:")) != -1) + switch(c) + { + case 'l': + flagListAll = 1; + break; + case 'p': + flagMqParam = 1; + mqName = optarg; + break; + case 'c': + flagClear = 1; + mqNameClear = optarg; + break; + case 'e': + flagExtra = 1; + break; + default: + break; + } + + //---------------------------------------------------------------------- + //main logic + + //list all + if (flagListAll) + { + dirPath = opendir(DEVMQPATH); + if (dirPath != NULL) + { + while (entryPath = readdir(dirPath)) + { + int id; + if (entryPath->d_type == DT_REG) + { + if(sscanf(entryPath->d_name, "agni-%d", &id) == 1) + { + printf("%s\n",entryPath->d_name); + } + } + //should free entryPath? maybe + } + closedir(dirPath); + } else + { + perror("Cant open directory"); + } + + exit(0); + } + + //get mqueue params + if (flagMqParam) + { + struct mq_attr mqAttr; + int mqFd = -1; + + mqFd = mq_open(mqName, O_RDWR, 0666, NULL); + if (mqFd != -1) + { + iRet = mq_getattr(mqFd, &mqAttr); + if (iRet != -1) + { + printf("FLAGS %8ld, ", mqAttr.mq_flags); + printf("MSG SIZE %8ld, ", mqAttr.mq_msgsize); + printf("CUR MSGS %ld/%ld\n", mqAttr.mq_curmsgs,mqAttr.mq_maxmsg); + } + } else { + printf("Cant open %s\n", mqName); + } + exit(0); + } + + //clear mq from messages + if (flagClear) + { + struct mq_attr mqAttr; + int mqFd = -1; + char *cBuf = NULL; + int numRead = -1; + int prio = 0; + int iCountMsg = 0; + + mqFd = mq_open(mqNameClear, O_RDWR, 0666, NULL); + if (mqFd == -1) + { + printf("Cant open %s\n", mqName); + exit(1); + } + + iRet = mq_getattr(mqFd, &mqAttr); + if (iRet == -1) + { + perror("Cant get attributes"); + exit(1); + } + + cBuf = malloc(mqAttr.mq_msgsize); + if (cBuf == NULL) + { + exit(1); + } + + for (i=0; i<mqAttr.mq_curmsgs; i++) + { + numRead = mq_receive(mqFd, cBuf, mqAttr.mq_msgsize, &prio); + if (numRead == -1) + { + free(cBuf); + exit(0); + } + iCountMsg += 1; + + //extra output of message value + if (flagExtra) + { + for (j=0; j<numRead; j++) + { + printf("%c",cBuf[j]); + } + printf("\n"); + } + } + + free(cBuf); + + printf("Readed %d messages from %ld\n", iCountMsg, mqAttr.mq_curmsgs); + + + exit(0); + } + +}
\ No newline at end of file |