summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZoRo <dos21h@gmail.com>2017-01-26 22:23:28 +0000
committerZoRo <dos21h@gmail.com>2017-01-26 22:23:28 +0000
commit67860598185d248756316549a7522968f7294990 (patch)
treeb329576eca4f5dccbfec2bf7a4952abc93b8b9c8
parenta588aa017512d3cc70dde6627d1218020e755259 (diff)
downloadagni-67860598185d248756316549a7522968f7294990.tar.gz
agni-67860598185d248756316549a7522968f7294990.zip
Made working basic mq IPC communication
-rw-r--r--Makefile2
-rw-r--r--agni.c498
-rw-r--r--config_servers.h4
-rw-r--r--debug.h30
-rw-r--r--mq_cmd.c46
-rw-r--r--mq_cmd.h3
-rw-r--r--tool/Makefile2
-rw-r--r--tool/mqtool.c183
8 files changed, 667 insertions, 101 deletions
diff --git a/Makefile b/Makefile
index e0bf9ea..07d1ec0 100644
--- a/Makefile
+++ b/Makefile
@@ -3,7 +3,7 @@ make:
gcc mmm.c -c
gcc darray.c -c
gcc mq_cmd.c -c
- gcc mq_cmd.o buf.o mmm.o darray.o agni.c -o agni -std=c11 -lrt
+ gcc -Wall mq_cmd.o buf.o mmm.o darray.o agni.c -o agni -std=c11 -lrt
clean:
rm -f agni *.o \ No newline at end of file
diff --git a/agni.c b/agni.c
index 5705543..342e21d 100644
--- a/agni.c
+++ b/agni.c
@@ -49,23 +49,13 @@ int irc_connect( char *hostname, char *port )
return fd;
}
-char *create_mq_cmd(char *out, size_t out_sz, int id, char *cmd, size_t cmd_sz, char *payload, size_t p_sz)
+//async io multiplexer
+typedef struct mq_ntf_io_mplx
{
- char *ret = out;
- int n,i;
+ struct timeval tv;
+ fd_set io_fds;
+} mq_ntf_io_mplx;
- n = snprintf(out, out_sz, ":%d:%s ", id, cmd);
- if (n < 0)
- {
- return NULL;
- }
- for (i=0;i<p_sz,n<out_sz-1;i++,n++)
- {
- out[n] = payload[i];
- }
- out[n++]=0x0;
- return ret;
-}
/*
supposed format agni-[id]-[in/out]
@@ -78,9 +68,11 @@ typedef struct mq_ntf_mdt
int id;
mqd_t mq_in;
mqd_t mq_out;
+ mq_ntf_io_mplx io_mplx;
//_Atomic int used; nado?
} mq_ntf_mdt;
+
/*
return -1 on error
*/
@@ -91,6 +83,7 @@ int mq_ntf_open(mq_ntf_mdt *mq, int id)
if (!mq)
{
+ PERM();
return -1;
}
mq->id = id;
@@ -100,6 +93,7 @@ int mq_ntf_open(mq_ntf_mdt *mq, int id)
O_RDWR | O_CREAT, 0666, NULL);
if (mq->mq_in == -1)
{
+ ENL();
perror("mq_open");
return -1;
}
@@ -112,15 +106,96 @@ int mq_ntf_open(mq_ntf_mdt *mq, int id)
perror("mq_open");
return -1;
}
+
+ PRINT("opened = %d %d\n", mq->mq_in, mq->mq_out);
+
+ return 0;
+}
+
+int mq_ntf_mplx(mq_ntf_mdt *mq, int msec)
+{
+ fd_set io_fds;
+
+ if (mq == NULL)
+ {
+ PERM();
+ return -1;
+ }
+
+ if (msec < 0)
+ {
+ PERM();
+ return -1;
+ }
+
+ //mq->io_mplx.tv.tv_sec = msec/1000;
+ //mq->io_mplx.tv.tv_usec = (msec%1000)*1000;
+ mq->io_mplx.tv.tv_sec = 2;
+ mq->io_mplx.tv.tv_usec = 0;
+ PRINT("mplx = %d %d\n", mq->mq_in, mq->mq_out);
+ FD_ZERO(&io_fds);
+ FD_SET(mq->mq_in, &io_fds);//in first
+ FD_SET(mq->mq_out, &io_fds);//out second
+ mq->io_mplx.io_fds = io_fds;
+
return 0;
}
+/* return number if there is something to read
+RETURN:
+ -1 some error
+ 0x00 nothing to read
+ 0x01 input ready
+ 0x02 output ready
+ 0x02|0x01 input&output ready
-/* return number if there is something to read */
+*/
+//if mplx havent done, everything could blow up
int mq_ntf_select(mq_ntf_mdt *mq)
{
+ int fdnum, fd1, fd2;
+ int ret=0x00;
+ fd_set io_fds;
+ struct timeval tv;
- return 0;
+ if (mq == NULL)
+ {
+ PERM();
+ return -1;
+ }
+
+ //we should allways have just 2 descriptors to listen
+ //io_fds = mq->io_mplx.io_fds;
+ fd1 = mq->mq_in;
+ fd2 = mq->mq_out;
+ FD_ZERO(&io_fds);
+ FD_SET(fd1, &io_fds);
+ FD_SET(fd2, &io_fds);
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ fdnum = select(2,
+ &io_fds,
+ NULL,
+ NULL,
+ &tv );
+
+ if (fdnum == -1)
+ {
+ ENL();
+ return -1;
+ } else {
+ if (FD_ISSET(mq->mq_in, &mq->io_mplx.io_fds))
+ {
+ ret = 0x01;
+ }
+
+ if (FD_ISSET(mq->mq_out, &mq->io_mplx.io_fds))
+ {
+ ret = 0x02;
+ }
+ }
+
+ return ret;
}
@@ -128,10 +203,10 @@ int mq_ntf_select(mq_ntf_mdt *mq)
int mq_ntf_read_in(mq_ntf_mdt *mq, char *buf, size_t size)
{
struct mq_attr attr;
- char *msg;
size_t bytes;
unsigned int prio = 10;
- int fret;
+ int fret=0;
+
fret = mq_getattr(mq->mq_in, &attr);
if (attr.mq_maxmsg == attr.mq_curmsgs)
{
@@ -146,21 +221,21 @@ int mq_ntf_read_in(mq_ntf_mdt *mq, char *buf, size_t size)
perror("mq_receive read in");
return -1;
}
- return 0;
+ return bytes;
}
/* read from output queue */
int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size)
{
struct mq_attr attr;
- char *msg;
+ //char *msg;
size_t bytes;
unsigned int prio = 10;
int fret;
fret = mq_getattr(mq->mq_out, &attr);
if (attr.mq_maxmsg == attr.mq_curmsgs)
{
- printf("queue %d out full", mq->id);
+ printf("queue %d out full\n", mq->id);
return -1;
}
bytes = mq_receive(mq->mq_out, buf,
@@ -171,7 +246,7 @@ int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size)
perror("mq_receive read out");
return -1;
}
- return 0;
+ return bytes;
}
@@ -179,10 +254,10 @@ int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size)
int mq_ntf_write_out(mq_ntf_mdt *mq, const char *buf, size_t size)
{
struct mq_attr attr;
- char *msg;
+ //char *msg;
size_t bytes;
unsigned int prio = 10;
- int fret;
+ int fret=0;
fret = mq_getattr(mq->mq_out, &attr);
if (attr.mq_maxmsg == attr.mq_curmsgs)
{
@@ -197,17 +272,17 @@ int mq_ntf_write_out(mq_ntf_mdt *mq, const char *buf, size_t size)
perror("mq_send");
return -1;
}
- return 0;
+ return bytes;
}
/* write to input quque */
int mq_ntf_write_in(mq_ntf_mdt *mq, const char *buf, size_t size)
{
struct mq_attr attr;
- char *msg;
+ //char *msg;
size_t bytes;
unsigned int prio = 10;
- int fret;
+ int fret=0;
fret = mq_getattr(mq->mq_in, &attr);
if (attr.mq_maxmsg == attr.mq_curmsgs)
{
@@ -222,43 +297,35 @@ int mq_ntf_write_in(mq_ntf_mdt *mq, const char *buf, size_t size)
perror("mq_send");
return -1;
}
- return 0;
+ return bytes;
}
-/* drain all messages from quque */
-int mq_ntf_drain(mq_ntf_mdt *mq)
+//dir - dirction 1 in 2 out
+int mq_ntf_count(mq_ntf_mdt *mq, int dir)
{
- int i,j;
- struct mq_attr curattr;
- char *buf;
- ssize_t bytes;
- int prio = 10;
- mqd_t mqlist[2] = {mq->mq_in, mq->mq_out};
-
- for (i=0;i<2;i++)
- {
- mq_getattr(mqlist[i], &curattr);
- buf = malloc(curattr.mq_maxmsg);
- for (j=0; j<curattr.mq_curmsgs; j++)
- {
- bytes = mq_receive(mqlist[i], buf, curattr.mq_maxmsg, &prio);
- if (bytes == -1)
- {
- perror("mq_receive drain");
- }
- printf("Drain %d bytes\n", bytes);
- }
- free(buf);
+ struct mq_attr attr;
+ int mq_fd;
+
+ if (mq == NULL)
+ {
+ PERM();
+ return -1;
}
- return 0;
-}
+ if (dir == 1)
+ {
+ mq_fd = mq->mq_in;
+ } else
+ {
+ mq_fd = mq->mq_out;
+ }
-/* check if there is space in quque */
-int mq_ntf_full(mq_ntf_mdt *mq)
-{
+ if (mq_getattr(mq_fd, &attr) != -1)
+ {
+ return attr.mq_curmsgs;
+ }
- return 0;
+ return -1;
}
@@ -276,6 +343,8 @@ int mq_ntf_close(mq_ntf_mdt *mq)
snprintf(name, name_size, MQ_PREFIX"%d-out",id);
mq_unlink(name);
+ //clean file descriptors for io_mplx
+
return 0;
}
@@ -292,6 +361,210 @@ int mq_ntf_cmd_recv(mq_ntf_mdt *mq, mq_cmd *cmd)
return 0;
}
+int mq_drain_q(mqd_t mqd)
+{
+ struct mq_attr attr;
+ char *buf = NULL;
+ unsigned int prio = 0;
+ int cnt_msg = 0;
+ int num_read = 0;
+ int i = 0;
+
+ if (mq_getattr(mqd, &attr) == -1)
+ {
+ ENL();
+ return -1;
+ }
+
+ buf = malloc(attr.mq_msgsize);
+ if (buf == NULL)
+ {
+ ENL();
+ return -1;
+ }
+
+ for (i=0; i<attr.mq_curmsgs; i++)
+ {
+ num_read = mq_receive(mqd, buf, attr.mq_msgsize, &prio);
+ if (num_read == -1)
+ {
+ ENL();
+ free(buf);
+ return -1;
+ }
+ cnt_msg += 1;
+ }
+
+ free(buf);
+ return cnt_msg;
+}
+
+//drain all mq buffers to zero
+//to use at begining and clear all buffers to zero
+int mq_ntf_drain(mq_ntf_mdt *mq)
+{
+ int err;
+
+ err = mq_drain_q(mq->mq_in);
+ if (err < 0)
+ {
+ PERM();
+ return -1;
+ }
+ printf("Drained in %d messages\n", err);
+
+ err = mq_drain_q(mq->mq_out);
+ if (err < 0)
+ {
+ PERM();
+ return -1;
+ }
+ printf("Drained out %d messages\n", err);
+
+ return 0;
+}
+
+#define MQ_OUT 1
+#define MQ_IN 2
+
+int mq_ntf_getattr(mq_ntf_mdt *mq, int dir, struct mq_attr **attr)
+{
+ struct mq_attr gattr;
+ mqd_t mqdt;
+
+ if (mq == NULL)
+ {
+ PERM();
+ return -1;
+ }
+
+ if (dir == MQ_OUT)
+ {
+ mqdt = mq->mq_out;
+ } else if (dir == MQ_IN)
+ {
+ mqdt = mq->mq_in;
+ } else
+ {
+ return -1;
+ }
+
+ if (mq_getattr(mqdt, &gattr) == -1)
+ {
+ ENL();
+ return -1;
+ }
+
+ memcpy(*attr,&gattr,sizeof(struct mq_attr));
+
+ return 0;
+}
+
+
+
+int send_mq_cmd(mq_ntf_mdt *mq,
+ int io,
+ int cmd_id,
+ char *cmd,
+ size_t cmd_sz,
+ char *param,
+ size_t param_sz)
+{
+ int err = 0;
+ mq_cmd *cmd_send = NULL;
+ cmd_send = mq_cmd_create(cmd_id,cmd,cmd_sz,param,param_sz);
+ if (cmd_send == NULL)
+ {
+ printf("Cant create command\n");
+ return -1;
+ }
+
+ err = -1;
+ if (io == MQ_OUT)
+ {
+ err = mq_ntf_write_out(mq, cmd_send->buf, cmd_send->sz);
+ } else if (io == MQ_IN)
+ {
+ err = mq_ntf_write_in(mq, cmd_send->buf, cmd_send->sz);
+ } else
+ {
+ printf("Unknown direction");
+ return -1;
+ }
+
+ if (err == -1)
+ {
+ printf("Couldnt recieve command\n");
+ mq_cmd_free(cmd_send);
+ return -1;
+ }
+
+ mq_cmd_free(cmd_send);
+
+ return 0;
+}
+
+int recv_mq_cmd(mq_ntf_mdt *mq,
+ int io,
+ mq_cmd **cmd) //return back recieved command
+{
+ int err = 0;
+ mq_cmd *cmd_recv = NULL;
+ int recv_sz=-1;
+ char *in_buf=NULL;
+
+ in_buf = malloc(MQ_MSG_SIZE);
+ if (in_buf == NULL)
+ {
+ ENL();
+ return -1;
+ }
+
+ err = -1;
+ if (io == MQ_OUT)
+ {
+ err = mq_ntf_read_out(mq, in_buf, MQ_MSG_SIZE);
+ } else if (io == MQ_IN)
+ {
+ err = mq_ntf_read_in(mq, in_buf, MQ_MSG_SIZE);
+ } else
+ {
+ printf("Unknown direction");
+ return -1;
+ }
+
+ if (err == -1)
+ {
+ printf("Couldnt recieve command\n");
+ free(in_buf);
+ return -1;
+ }
+ recv_sz = err;
+
+ //in err should be still the size of recieved buffer
+ cmd_recv = mq_cmd_creates(in_buf, recv_sz);
+ if (cmd_recv == NULL)
+ {
+ printf("Cannot create cmd\n");
+ free(in_buf);
+ return -1;
+ }
+
+ *cmd = cmd_recv;
+ free(in_buf);
+
+ return 0;
+}
+
+#define SEND_CMD_IN(MQ,ID,CMD,PARAM) \
+ send_mq_cmd(MQ,MQ_IN,ID,CMD,strlen(CMD),PARAM,strlen(PARAM));
+#define SEND_CMD_OUT(MQ,ID,CMD,PARAM) \
+ send_mq_cmd(MQ,MQ_OUT,ID,CMD,strlen(CMD),PARAM,strlen(PARAM));
+#define RECV_CMD_IN(MQ,CMD) \
+ recv_mq_cmd(MQ,MQ_IN,CMD);
+#define RECV_CMD_OUT(MQ,CMD) \
+ recv_mq_cmd(MQ,MQ_OUT,CMD);
+
/*
|--------|<--IN---|---------|
@@ -331,33 +604,36 @@ typedef struct server_cfg
int th_start_client(void *data)
{
-
int cmd_id = 1;
- int ret = 0;
- char cmd_buf[MQ_MSG_SIZE];
- mq_cmd *cmd=NULL;
+ int err;
+ //char cmd_buf[MQ_MSG_SIZE];
+ //mq_cmd *cmd=NULL;
+ int run;
server_cfg *cfg = data;
mq_ntf_mdt *mq = cfg->mq;
atomic_fetch_add(&cfg->running,1);
printf("Start client\n");
printf("Server %d\n",cfg->tid);
- sleep(10);
+ sleep(1);
//send command wait for response
- cmd = CMD_CREATE(cmd_id,"PING","NOPARAM");
- if (cmd != NULL)
+ run = 1;
+ while (run)
{
- if (ret = mq_ntf_write_out(mq,cmd->buf,cmd->sz) == -1)
+ //printf("Send msg\n");
+ err = SEND_CMD_OUT(mq,cmd_id,"INIT","NO");
+ printf("err = %d\n",err);
+ cmd_id += 1;
+ run += 1;
+ sleep(1);
+ if (run == 10)
{
- printf("Couldnt send command\n");
+ break;
}
-
}
- cmd_id += 1;
-
printf("End client\n");
atomic_fetch_sub( &cfg->running,1);
@@ -389,41 +665,65 @@ typedef struct event_handler_cfg {
/* Thread to reacieve messages and return them back */
int th_event_manager(void *data)
{
- int state;
- int i;
event_handler_cfg *cfg = data;
atomic_fetch_add(&cfg->running,1);
- const int buf_size=MQ_MSG_SIZE;
- char buf[buf_size];
- int ret;
+ mq_ntf_mdt *mq=NULL;
+ char *buf = NULL;
+ int run;
+ int err;
+ int mq_event;
//read any command
- mq_cmd *cmd = NULL;
-
-
+ //mq_cmd *cmd = NULL;
+ struct mq_attr out_attr, *ptr_out_attr=&out_attr;
printf("Start event thread\n");
- state = EH_STATE_INIT;
- while (state != EH_STATE_EXIT)
- switch (state)
+ mq = cfg->mq_listen;
+ err = mq_ntf_mplx(mq, 10000);
+ if (err == -1)
{
- case EH_STATE_INIT:
- printf("TH STATE INIT\n");
- sleep(1);
- state = TH_STATE_START;
- break;
- case EH_STATE_START:
- printf("TH STATE START\n");
- sleep(1);
- state = TH_STATE_EXIT;
- break;
- case EH_STATE_EXIT:
- printf("TH STATE EXIT\n");
+ printf("Ups something whent wrong\n");
+ }
+
+ //get mq attributes
+ if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1)
+ {
+ printf("Cant get attribute\n");
+ }
+
+ buf = malloc(out_attr.mq_msgsize);
+ //maybe its not null
+
+ printf("Start event loop\n");
+ run = 1;
+ while(run)
+ {
+ //check for messages
+ run += 1;
+ mq_event = mq_ntf_select(mq);
+ switch(mq_event)
+ {
+ case 0:
+ break;
+ case 1:
+ break;
+ case 2:
+ if (mq_ntf_read_out(mq, buf, out_attr.mq_msgsize) == -1)
+ {
+ printf("Cant read message\n");
+ } else
+ {
+ buf[out_attr.mq_msgsize-1] = 0x0;
+ printf("Recieve %s\n", buf);
+ }
+ break;
+ default:
+ printf("Unknown event type\n");
+ }
sleep(1);
- state = TH_STATE_EXIT;
- break;
- default:
- printf("Wrong state\n");
+ if (run == 10)
+ break;
}
+
printf("End event thread\n");
atomic_fetch_sub( &cfg->running,1);
@@ -474,6 +774,8 @@ int main(int argc, char **argv)
printf("Couldnt open mq_ntf_open\n");
}
srvc->mq = &mq_array[i];
+ //try to drain mq
+ mq_ntf_drain(&mq_array[i]);
/* clone new proc */
clone(th_start_client, srvc->stack+STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)srvc);
@@ -502,7 +804,7 @@ int main(int argc, char **argv)
if (val != 0)
cnt_running += 1;
}
- printf("cnt_running %d\n",cnt_running);
+ //PRINT("cnt_running %d\n",cnt_running);
sleep(1);
}
diff --git a/config_servers.h b/config_servers.h
index 3e4ee94..9bf4b9e 100644
--- a/config_servers.h
+++ b/config_servers.h
@@ -24,7 +24,7 @@ static irc_server_conf server_list[] =
.channels = {"#mainlv",NULL},
.port = 6667,
.ssl = 0,
- },
+ }/*,
{
.user = "agni",
.password = NULL,
@@ -32,7 +32,7 @@ static irc_server_conf server_list[] =
.channels = {"#default","#bot",NULL},
.port = 6697,
.ssl = 1,
- }
+ }*/
};
#define SIZEOF_SERVER_LIST (sizeof(server_list)/sizeof(irc_server_conf))
diff --git a/debug.h b/debug.h
index a8bf211..919eea4 100644
--- a/debug.h
+++ b/debug.h
@@ -19,6 +19,12 @@
#define E_COLOR "1;31m"
#define E_COLOR_S "\033[" E_COLOR
#define E_COLOR_E "\033[0m"
+ #define W_COLOR "1;35m"
+ #define W_COLOR_S "\033[" W_COLOR
+ #define W_COLOR_E "\033[0m"
+ #define PE_COLOR "1;33m"
+ #define PE_COLOR_S "\033[" PE_COLOR
+ #define PE_COLOR_E "\033[0m"
#else
#define D_COLOR
#define D_COLOR_S
@@ -26,6 +32,12 @@
#define E_COLOR
#define E_COLOR_S
#define E_COLOR_E
+ #define W_COLOR
+ #define W_COLOR_S
+ #define W_COLOR_E
+ #define PE_COLOR
+ #define PE_COLOR_S
+ #define PE_COLOR_E
#endif
//print debug line
@@ -49,8 +61,12 @@
//print debug string
#ifdef PRINT_DEBUG
#define PRINT_DEBUG_F "Debug: "
+ #define PRINT_WARNING_F "WARN: "
+ #define PRINT_PERROR_F "PRME: "
#else
#define PRINT_DEBUG_F ""
+ #define PRINT_WARNING_F ""
+ #define PRINT_PERROR_F ""
#endif
#define PRINT( format, args ... ) PRINTF( D_COLOR_S PRINT_DEBUG_F \
@@ -61,9 +77,23 @@
PRINT_FILE_F PRINT_LINE_F format E_COLOR_E, PRINT_FILE_D, \
PRINT_LINE_D, ##args);
+#define WARNING( format, args ... ) PRINTF( W_COLOR_S PRINT_WARNING_F \
+ PRINT_FILE_F PRINT_LINE_F format W_COLOR_E, PRINT_FILE_D, \
+ PRINT_LINE_D, ##args);
+
+#define PERROR( format, args ... ) PRINTF( PE_COLOR_S PRINT_WARNING_F \
+ PRINT_FILE_F PRINT_LINE_F format PE_COLOR_E, PRINT_FILE_D, \
+ PRINT_LINE_D, ##args);
+
#define PNL() PRINT("\n");
#define ENL() ERROR("\n");
+#define WRN() WARNING("\n");
+
+#define PERM() PERROR("\n");
+
+/*ERROR LEVEL TYPES*/
+
#endif
diff --git a/mq_cmd.c b/mq_cmd.c
index 7f1887a..ce48713 100644
--- a/mq_cmd.c
+++ b/mq_cmd.c
@@ -7,19 +7,29 @@ mq_cmd* mq_cmd_create(int id, char *cmd, size_t cmd_sz, char *param, size_t para
mq_cmd *ret = NULL;
if (cmd == NULL)
+ {
+ PERM();
return NULL;
+ }
if (param == NULL)
+ {
+ PERM();
return NULL;
+ }
ret = malloc(sizeof(mq_cmd));
if (ret == NULL)
+ {
+ ENL();
return NULL;
+ }
ret->id = id;
ret->sz = MQ_CMD_MSG_SIZE;
ret->buf = malloc(ret->sz);
if (!ret->buf)
{
+ ENL();
free(ret);
return NULL;
}
@@ -31,6 +41,25 @@ mq_cmd* mq_cmd_create(int id, char *cmd, size_t cmd_sz, char *param, size_t para
return ret;
}
+mq_cmd* mq_cmd_creates(char *str, size_t sz)
+{
+ mq_cmd *ret = NULL;
+
+ //just naive belive everything is fine in string
+ ret = malloc(sizeof(mq_cmd));
+ ret->buf = malloc(sz);
+ if (ret->buf == NULL)
+ {
+ PERM();
+ return NULL;
+ }
+ ret->sz = sz;
+ memcpy(ret->buf,str,sz);
+ ret->id = -1;
+
+ return ret;
+}
+
int mq_cmd_id(mq_cmd *cmd, int *id)
{
int i,j,strt,sz;
@@ -40,9 +69,15 @@ int mq_cmd_id(mq_cmd *cmd, int *id)
/*:[ID]:*/
if (cmd == NULL)
+ {
+ PERM();
return -1;
+ }
if (id == NULL)
+ {
+ PERM();
return -1;
+ }
i = 0;
while ((cmd->buf[i] != 0) && (i<cmd->sz))
@@ -84,11 +119,20 @@ char* mq_cmd_cmd(mq_cmd *cmd, char **buf, size_t *sz)
char *p;
if (cmd == NULL)
+ {
+ PERM();
return NULL;
+ }
if (buf == NULL)
+ {
+ PERM();
return NULL;
+ }
if (sz == NULL)
+ {
+ PERM();
return NULL;
+ }
p = cmd->buf;
@@ -164,6 +208,7 @@ size_t mq_cmd_size(mq_cmd *cmd)
{
if (cmd == NULL)
{
+ PERM();
return -1;
}
return cmd->sz;
@@ -173,6 +218,7 @@ char *mq_cmd_buf(mq_cmd *cmd)
{
if (cmd == NULL)
{
+ PERM();
return NULL;
}
diff --git a/mq_cmd.h b/mq_cmd.h
index ef255d9..7872acf 100644
--- a/mq_cmd.h
+++ b/mq_cmd.h
@@ -5,6 +5,8 @@
#include <stdio.h>
#include <string.h>
+#include "debug.h"
+
typedef struct mq_cmd
{
int id;
@@ -13,6 +15,7 @@ typedef struct mq_cmd
} mq_cmd;
mq_cmd* mq_cmd_create(int id, char *cmd, size_t cmd_sz, char *param, size_t param_sz);
+mq_cmd* mq_cmd_creates(char *str, size_t sz);
int mq_cmd_id(mq_cmd *cmd, int *id);
char* mq_cmd_cmd(mq_cmd *cmd, char **buf, size_t *sz);
int mq_cmd_param(mq_cmd *cmd, char **param, size_t *sz);
diff --git a/tool/Makefile b/tool/Makefile
new file mode 100644
index 0000000..915bf78
--- /dev/null
+++ b/tool/Makefile
@@ -0,0 +1,2 @@
+make:
+ gcc mqtool.c -o mqtool -lrt \ No newline at end of file
diff --git a/tool/mqtool.c b/tool/mqtool.c
new file mode 100644
index 0000000..61e78af
--- /dev/null
+++ b/tool/mqtool.c
@@ -0,0 +1,183 @@
+#define _GNU_SOURCE
+
+#include <ctype.h>
+#include <errno.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <sys/syscall.h>
+#include <sched.h>
+#include <stdatomic.h>
+#include <sys/types.h>
+#include <dirent.h>
+
+#include <sys/stat.h>
+#include <sched.h>
+#include <mqueue.h>
+
+#define DEVMQPATH "/dev/mqueue/"
+#define AGNIPATH_IN "agni-%d-in"
+#define AGNIPATH_OUT "agni-%d-out"
+
+//------------------------------------------------------------------------------
+int main(int argc, char **argv)
+{
+ //generic variables
+ int c;
+ int i,j;
+ int iRet; //any func return param
+
+ //for directory listing
+ DIR *dirPath;
+ struct dirent *entryPath;
+
+ //list all mqs
+ int flagListAll = 0;
+ //show mq param
+ int flagMqParam = 0;
+ char* mqName = NULL;
+ //clear message queue
+ int flagClear = 0;
+ char* mqNameClear = NULL;
+ //clear print extra info if there is
+ int flagExtra = 0;
+
+
+ //get arguments
+ while ((c = getopt(argc, argv, "elp:c:")) != -1)
+ switch(c)
+ {
+ case 'l':
+ flagListAll = 1;
+ break;
+ case 'p':
+ flagMqParam = 1;
+ mqName = optarg;
+ break;
+ case 'c':
+ flagClear = 1;
+ mqNameClear = optarg;
+ break;
+ case 'e':
+ flagExtra = 1;
+ break;
+ default:
+ break;
+ }
+
+ //----------------------------------------------------------------------
+ //main logic
+
+ //list all
+ if (flagListAll)
+ {
+ dirPath = opendir(DEVMQPATH);
+ if (dirPath != NULL)
+ {
+ while (entryPath = readdir(dirPath))
+ {
+ int id;
+ if (entryPath->d_type == DT_REG)
+ {
+ if(sscanf(entryPath->d_name, "agni-%d", &id) == 1)
+ {
+ printf("%s\n",entryPath->d_name);
+ }
+ }
+ //should free entryPath? maybe
+ }
+ closedir(dirPath);
+ } else
+ {
+ perror("Cant open directory");
+ }
+
+ exit(0);
+ }
+
+ //get mqueue params
+ if (flagMqParam)
+ {
+ struct mq_attr mqAttr;
+ int mqFd = -1;
+
+ mqFd = mq_open(mqName, O_RDWR, 0666, NULL);
+ if (mqFd != -1)
+ {
+ iRet = mq_getattr(mqFd, &mqAttr);
+ if (iRet != -1)
+ {
+ printf("FLAGS %8ld, ", mqAttr.mq_flags);
+ printf("MSG SIZE %8ld, ", mqAttr.mq_msgsize);
+ printf("CUR MSGS %ld/%ld\n", mqAttr.mq_curmsgs,mqAttr.mq_maxmsg);
+ }
+ } else {
+ printf("Cant open %s\n", mqName);
+ }
+ exit(0);
+ }
+
+ //clear mq from messages
+ if (flagClear)
+ {
+ struct mq_attr mqAttr;
+ int mqFd = -1;
+ char *cBuf = NULL;
+ int numRead = -1;
+ int prio = 0;
+ int iCountMsg = 0;
+
+ mqFd = mq_open(mqNameClear, O_RDWR, 0666, NULL);
+ if (mqFd == -1)
+ {
+ printf("Cant open %s\n", mqName);
+ exit(1);
+ }
+
+ iRet = mq_getattr(mqFd, &mqAttr);
+ if (iRet == -1)
+ {
+ perror("Cant get attributes");
+ exit(1);
+ }
+
+ cBuf = malloc(mqAttr.mq_msgsize);
+ if (cBuf == NULL)
+ {
+ exit(1);
+ }
+
+ for (i=0; i<mqAttr.mq_curmsgs; i++)
+ {
+ numRead = mq_receive(mqFd, cBuf, mqAttr.mq_msgsize, &prio);
+ if (numRead == -1)
+ {
+ free(cBuf);
+ exit(0);
+ }
+ iCountMsg += 1;
+
+ //extra output of message value
+ if (flagExtra)
+ {
+ for (j=0; j<numRead; j++)
+ {
+ printf("%c",cBuf[j]);
+ }
+ printf("\n");
+ }
+ }
+
+ free(cBuf);
+
+ printf("Readed %d messages from %ld\n", iCountMsg, mqAttr.mq_curmsgs);
+
+
+ exit(0);
+ }
+
+} \ No newline at end of file