aboutsummaryrefslogtreecommitdiffstats
path: root/agni.c
diff options
context:
space:
mode:
authorZoRo <dos21h@gmail.com>2017-01-26 22:23:28 +0000
committerZoRo <dos21h@gmail.com>2017-01-26 22:23:28 +0000
commit67860598185d248756316549a7522968f7294990 (patch)
treeb329576eca4f5dccbfec2bf7a4952abc93b8b9c8 /agni.c
parenta588aa017512d3cc70dde6627d1218020e755259 (diff)
downloadagni-67860598185d248756316549a7522968f7294990.tar.gz
agni-67860598185d248756316549a7522968f7294990.zip
Made working basic mq IPC communication
Diffstat (limited to 'agni.c')
-rw-r--r--agni.c498
1 files changed, 400 insertions, 98 deletions
diff --git a/agni.c b/agni.c
index 5705543..342e21d 100644
--- a/agni.c
+++ b/agni.c
@@ -49,23 +49,13 @@ int irc_connect( char *hostname, char *port )
return fd;
}
-char *create_mq_cmd(char *out, size_t out_sz, int id, char *cmd, size_t cmd_sz, char *payload, size_t p_sz)
+//async io multiplexer
+typedef struct mq_ntf_io_mplx
{
- char *ret = out;
- int n,i;
+ struct timeval tv;
+ fd_set io_fds;
+} mq_ntf_io_mplx;
- n = snprintf(out, out_sz, ":%d:%s ", id, cmd);
- if (n < 0)
- {
- return NULL;
- }
- for (i=0;i<p_sz,n<out_sz-1;i++,n++)
- {
- out[n] = payload[i];
- }
- out[n++]=0x0;
- return ret;
-}
/*
supposed format agni-[id]-[in/out]
@@ -78,9 +68,11 @@ typedef struct mq_ntf_mdt
int id;
mqd_t mq_in;
mqd_t mq_out;
+ mq_ntf_io_mplx io_mplx;
//_Atomic int used; nado?
} mq_ntf_mdt;
+
/*
return -1 on error
*/
@@ -91,6 +83,7 @@ int mq_ntf_open(mq_ntf_mdt *mq, int id)
if (!mq)
{
+ PERM();
return -1;
}
mq->id = id;
@@ -100,6 +93,7 @@ int mq_ntf_open(mq_ntf_mdt *mq, int id)
O_RDWR | O_CREAT, 0666, NULL);
if (mq->mq_in == -1)
{
+ ENL();
perror("mq_open");
return -1;
}
@@ -112,15 +106,96 @@ int mq_ntf_open(mq_ntf_mdt *mq, int id)
perror("mq_open");
return -1;
}
+
+ PRINT("opened = %d %d\n", mq->mq_in, mq->mq_out);
+
+ return 0;
+}
+
+int mq_ntf_mplx(mq_ntf_mdt *mq, int msec)
+{
+ fd_set io_fds;
+
+ if (mq == NULL)
+ {
+ PERM();
+ return -1;
+ }
+
+ if (msec < 0)
+ {
+ PERM();
+ return -1;
+ }
+
+ //mq->io_mplx.tv.tv_sec = msec/1000;
+ //mq->io_mplx.tv.tv_usec = (msec%1000)*1000;
+ mq->io_mplx.tv.tv_sec = 2;
+ mq->io_mplx.tv.tv_usec = 0;
+ PRINT("mplx = %d %d\n", mq->mq_in, mq->mq_out);
+ FD_ZERO(&io_fds);
+ FD_SET(mq->mq_in, &io_fds);//in first
+ FD_SET(mq->mq_out, &io_fds);//out second
+ mq->io_mplx.io_fds = io_fds;
+
return 0;
}
+/* return number if there is something to read
+RETURN:
+ -1 some error
+ 0x00 nothing to read
+ 0x01 input ready
+ 0x02 output ready
+ 0x02|0x01 input&output ready
-/* return number if there is something to read */
+*/
+//if mplx havent done, everything could blow up
int mq_ntf_select(mq_ntf_mdt *mq)
{
+ int fdnum, fd1, fd2;
+ int ret=0x00;
+ fd_set io_fds;
+ struct timeval tv;
- return 0;
+ if (mq == NULL)
+ {
+ PERM();
+ return -1;
+ }
+
+ //we should allways have just 2 descriptors to listen
+ //io_fds = mq->io_mplx.io_fds;
+ fd1 = mq->mq_in;
+ fd2 = mq->mq_out;
+ FD_ZERO(&io_fds);
+ FD_SET(fd1, &io_fds);
+ FD_SET(fd2, &io_fds);
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ fdnum = select(2,
+ &io_fds,
+ NULL,
+ NULL,
+ &tv );
+
+ if (fdnum == -1)
+ {
+ ENL();
+ return -1;
+ } else {
+ if (FD_ISSET(mq->mq_in, &mq->io_mplx.io_fds))
+ {
+ ret = 0x01;
+ }
+
+ if (FD_ISSET(mq->mq_out, &mq->io_mplx.io_fds))
+ {
+ ret = 0x02;
+ }
+ }
+
+ return ret;
}
@@ -128,10 +203,10 @@ int mq_ntf_select(mq_ntf_mdt *mq)
int mq_ntf_read_in(mq_ntf_mdt *mq, char *buf, size_t size)
{
struct mq_attr attr;
- char *msg;
size_t bytes;
unsigned int prio = 10;
- int fret;
+ int fret=0;
+
fret = mq_getattr(mq->mq_in, &attr);
if (attr.mq_maxmsg == attr.mq_curmsgs)
{
@@ -146,21 +221,21 @@ int mq_ntf_read_in(mq_ntf_mdt *mq, char *buf, size_t size)
perror("mq_receive read in");
return -1;
}
- return 0;
+ return bytes;
}
/* read from output queue */
int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size)
{
struct mq_attr attr;
- char *msg;
+ //char *msg;
size_t bytes;
unsigned int prio = 10;
int fret;
fret = mq_getattr(mq->mq_out, &attr);
if (attr.mq_maxmsg == attr.mq_curmsgs)
{
- printf("queue %d out full", mq->id);
+ printf("queue %d out full\n", mq->id);
return -1;
}
bytes = mq_receive(mq->mq_out, buf,
@@ -171,7 +246,7 @@ int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size)
perror("mq_receive read out");
return -1;
}
- return 0;
+ return bytes;
}
@@ -179,10 +254,10 @@ int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size)
int mq_ntf_write_out(mq_ntf_mdt *mq, const char *buf, size_t size)
{
struct mq_attr attr;
- char *msg;
+ //char *msg;
size_t bytes;
unsigned int prio = 10;
- int fret;
+ int fret=0;
fret = mq_getattr(mq->mq_out, &attr);
if (attr.mq_maxmsg == attr.mq_curmsgs)
{
@@ -197,17 +272,17 @@ int mq_ntf_write_out(mq_ntf_mdt *mq, const char *buf, size_t size)
perror("mq_send");
return -1;
}
- return 0;
+ return bytes;
}
/* write to input quque */
int mq_ntf_write_in(mq_ntf_mdt *mq, const char *buf, size_t size)
{
struct mq_attr attr;
- char *msg;
+ //char *msg;
size_t bytes;
unsigned int prio = 10;
- int fret;
+ int fret=0;
fret = mq_getattr(mq->mq_in, &attr);
if (attr.mq_maxmsg == attr.mq_curmsgs)
{
@@ -222,43 +297,35 @@ int mq_ntf_write_in(mq_ntf_mdt *mq, const char *buf, size_t size)
perror("mq_send");
return -1;
}
- return 0;
+ return bytes;
}
-/* drain all messages from quque */
-int mq_ntf_drain(mq_ntf_mdt *mq)
+//dir - dirction 1 in 2 out
+int mq_ntf_count(mq_ntf_mdt *mq, int dir)
{
- int i,j;
- struct mq_attr curattr;
- char *buf;
- ssize_t bytes;
- int prio = 10;
- mqd_t mqlist[2] = {mq->mq_in, mq->mq_out};
-
- for (i=0;i<2;i++)
- {
- mq_getattr(mqlist[i], &curattr);
- buf = malloc(curattr.mq_maxmsg);
- for (j=0; j<curattr.mq_curmsgs; j++)
- {
- bytes = mq_receive(mqlist[i], buf, curattr.mq_maxmsg, &prio);
- if (bytes == -1)
- {
- perror("mq_receive drain");
- }
- printf("Drain %d bytes\n", bytes);
- }
- free(buf);
+ struct mq_attr attr;
+ int mq_fd;
+
+ if (mq == NULL)
+ {
+ PERM();
+ return -1;
}
- return 0;
-}
+ if (dir == 1)
+ {
+ mq_fd = mq->mq_in;
+ } else
+ {
+ mq_fd = mq->mq_out;
+ }
-/* check if there is space in quque */
-int mq_ntf_full(mq_ntf_mdt *mq)
-{
+ if (mq_getattr(mq_fd, &attr) != -1)
+ {
+ return attr.mq_curmsgs;
+ }
- return 0;
+ return -1;
}
@@ -276,6 +343,8 @@ int mq_ntf_close(mq_ntf_mdt *mq)
snprintf(name, name_size, MQ_PREFIX"%d-out",id);
mq_unlink(name);
+ //clean file descriptors for io_mplx
+
return 0;
}
@@ -292,6 +361,210 @@ int mq_ntf_cmd_recv(mq_ntf_mdt *mq, mq_cmd *cmd)
return 0;
}
+int mq_drain_q(mqd_t mqd)
+{
+ struct mq_attr attr;
+ char *buf = NULL;
+ unsigned int prio = 0;
+ int cnt_msg = 0;
+ int num_read = 0;
+ int i = 0;
+
+ if (mq_getattr(mqd, &attr) == -1)
+ {
+ ENL();
+ return -1;
+ }
+
+ buf = malloc(attr.mq_msgsize);
+ if (buf == NULL)
+ {
+ ENL();
+ return -1;
+ }
+
+ for (i=0; i<attr.mq_curmsgs; i++)
+ {
+ num_read = mq_receive(mqd, buf, attr.mq_msgsize, &prio);
+ if (num_read == -1)
+ {
+ ENL();
+ free(buf);
+ return -1;
+ }
+ cnt_msg += 1;
+ }
+
+ free(buf);
+ return cnt_msg;
+}
+
+//drain all mq buffers to zero
+//to use at begining and clear all buffers to zero
+int mq_ntf_drain(mq_ntf_mdt *mq)
+{
+ int err;
+
+ err = mq_drain_q(mq->mq_in);
+ if (err < 0)
+ {
+ PERM();
+ return -1;
+ }
+ printf("Drained in %d messages\n", err);
+
+ err = mq_drain_q(mq->mq_out);
+ if (err < 0)
+ {
+ PERM();
+ return -1;
+ }
+ printf("Drained out %d messages\n", err);
+
+ return 0;
+}
+
+#define MQ_OUT 1
+#define MQ_IN 2
+
+int mq_ntf_getattr(mq_ntf_mdt *mq, int dir, struct mq_attr **attr)
+{
+ struct mq_attr gattr;
+ mqd_t mqdt;
+
+ if (mq == NULL)
+ {
+ PERM();
+ return -1;
+ }
+
+ if (dir == MQ_OUT)
+ {
+ mqdt = mq->mq_out;
+ } else if (dir == MQ_IN)
+ {
+ mqdt = mq->mq_in;
+ } else
+ {
+ return -1;
+ }
+
+ if (mq_getattr(mqdt, &gattr) == -1)
+ {
+ ENL();
+ return -1;
+ }
+
+ memcpy(*attr,&gattr,sizeof(struct mq_attr));
+
+ return 0;
+}
+
+
+
+int send_mq_cmd(mq_ntf_mdt *mq,
+ int io,
+ int cmd_id,
+ char *cmd,
+ size_t cmd_sz,
+ char *param,
+ size_t param_sz)
+{
+ int err = 0;
+ mq_cmd *cmd_send = NULL;
+ cmd_send = mq_cmd_create(cmd_id,cmd,cmd_sz,param,param_sz);
+ if (cmd_send == NULL)
+ {
+ printf("Cant create command\n");
+ return -1;
+ }
+
+ err = -1;
+ if (io == MQ_OUT)
+ {
+ err = mq_ntf_write_out(mq, cmd_send->buf, cmd_send->sz);
+ } else if (io == MQ_IN)
+ {
+ err = mq_ntf_write_in(mq, cmd_send->buf, cmd_send->sz);
+ } else
+ {
+ printf("Unknown direction");
+ return -1;
+ }
+
+ if (err == -1)
+ {
+ printf("Couldnt recieve command\n");
+ mq_cmd_free(cmd_send);
+ return -1;
+ }
+
+ mq_cmd_free(cmd_send);
+
+ return 0;
+}
+
+int recv_mq_cmd(mq_ntf_mdt *mq,
+ int io,
+ mq_cmd **cmd) //return back recieved command
+{
+ int err = 0;
+ mq_cmd *cmd_recv = NULL;
+ int recv_sz=-1;
+ char *in_buf=NULL;
+
+ in_buf = malloc(MQ_MSG_SIZE);
+ if (in_buf == NULL)
+ {
+ ENL();
+ return -1;
+ }
+
+ err = -1;
+ if (io == MQ_OUT)
+ {
+ err = mq_ntf_read_out(mq, in_buf, MQ_MSG_SIZE);
+ } else if (io == MQ_IN)
+ {
+ err = mq_ntf_read_in(mq, in_buf, MQ_MSG_SIZE);
+ } else
+ {
+ printf("Unknown direction");
+ return -1;
+ }
+
+ if (err == -1)
+ {
+ printf("Couldnt recieve command\n");
+ free(in_buf);
+ return -1;
+ }
+ recv_sz = err;
+
+ //in err should be still the size of recieved buffer
+ cmd_recv = mq_cmd_creates(in_buf, recv_sz);
+ if (cmd_recv == NULL)
+ {
+ printf("Cannot create cmd\n");
+ free(in_buf);
+ return -1;
+ }
+
+ *cmd = cmd_recv;
+ free(in_buf);
+
+ return 0;
+}
+
+#define SEND_CMD_IN(MQ,ID,CMD,PARAM) \
+ send_mq_cmd(MQ,MQ_IN,ID,CMD,strlen(CMD),PARAM,strlen(PARAM));
+#define SEND_CMD_OUT(MQ,ID,CMD,PARAM) \
+ send_mq_cmd(MQ,MQ_OUT,ID,CMD,strlen(CMD),PARAM,strlen(PARAM));
+#define RECV_CMD_IN(MQ,CMD) \
+ recv_mq_cmd(MQ,MQ_IN,CMD);
+#define RECV_CMD_OUT(MQ,CMD) \
+ recv_mq_cmd(MQ,MQ_OUT,CMD);
+
/*
|--------|<--IN---|---------|
@@ -331,33 +604,36 @@ typedef struct server_cfg
int th_start_client(void *data)
{
-
int cmd_id = 1;
- int ret = 0;
- char cmd_buf[MQ_MSG_SIZE];
- mq_cmd *cmd=NULL;
+ int err;
+ //char cmd_buf[MQ_MSG_SIZE];
+ //mq_cmd *cmd=NULL;
+ int run;
server_cfg *cfg = data;
mq_ntf_mdt *mq = cfg->mq;
atomic_fetch_add(&cfg->running,1);
printf("Start client\n");
printf("Server %d\n",cfg->tid);
- sleep(10);
+ sleep(1);
//send command wait for response
- cmd = CMD_CREATE(cmd_id,"PING","NOPARAM");
- if (cmd != NULL)
+ run = 1;
+ while (run)
{
- if (ret = mq_ntf_write_out(mq,cmd->buf,cmd->sz) == -1)
+ //printf("Send msg\n");
+ err = SEND_CMD_OUT(mq,cmd_id,"INIT","NO");
+ printf("err = %d\n",err);
+ cmd_id += 1;
+ run += 1;
+ sleep(1);
+ if (run == 10)
{
- printf("Couldnt send command\n");
+ break;
}
-
}
- cmd_id += 1;
-
printf("End client\n");
atomic_fetch_sub( &cfg->running,1);
@@ -389,41 +665,65 @@ typedef struct event_handler_cfg {
/* Thread to reacieve messages and return them back */
int th_event_manager(void *data)
{
- int state;
- int i;
event_handler_cfg *cfg = data;
atomic_fetch_add(&cfg->running,1);
- const int buf_size=MQ_MSG_SIZE;
- char buf[buf_size];
- int ret;
+ mq_ntf_mdt *mq=NULL;
+ char *buf = NULL;
+ int run;
+ int err;
+ int mq_event;
//read any command
- mq_cmd *cmd = NULL;
-
-
+ //mq_cmd *cmd = NULL;
+ struct mq_attr out_attr, *ptr_out_attr=&out_attr;
printf("Start event thread\n");
- state = EH_STATE_INIT;
- while (state != EH_STATE_EXIT)
- switch (state)
+ mq = cfg->mq_listen;
+ err = mq_ntf_mplx(mq, 10000);
+ if (err == -1)
{
- case EH_STATE_INIT:
- printf("TH STATE INIT\n");
- sleep(1);
- state = TH_STATE_START;
- break;
- case EH_STATE_START:
- printf("TH STATE START\n");
- sleep(1);
- state = TH_STATE_EXIT;
- break;
- case EH_STATE_EXIT:
- printf("TH STATE EXIT\n");
+ printf("Ups something whent wrong\n");
+ }
+
+ //get mq attributes
+ if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1)
+ {
+ printf("Cant get attribute\n");
+ }
+
+ buf = malloc(out_attr.mq_msgsize);
+ //maybe its not null
+
+ printf("Start event loop\n");
+ run = 1;
+ while(run)
+ {
+ //check for messages
+ run += 1;
+ mq_event = mq_ntf_select(mq);
+ switch(mq_event)
+ {
+ case 0:
+ break;
+ case 1:
+ break;
+ case 2:
+ if (mq_ntf_read_out(mq, buf, out_attr.mq_msgsize) == -1)
+ {
+ printf("Cant read message\n");
+ } else
+ {
+ buf[out_attr.mq_msgsize-1] = 0x0;
+ printf("Recieve %s\n", buf);
+ }
+ break;
+ default:
+ printf("Unknown event type\n");
+ }
sleep(1);
- state = TH_STATE_EXIT;
- break;
- default:
- printf("Wrong state\n");
+ if (run == 10)
+ break;
}
+
printf("End event thread\n");
atomic_fetch_sub( &cfg->running,1);
@@ -474,6 +774,8 @@ int main(int argc, char **argv)
printf("Couldnt open mq_ntf_open\n");
}
srvc->mq = &mq_array[i];
+ //try to drain mq
+ mq_ntf_drain(&mq_array[i]);
/* clone new proc */
clone(th_start_client, srvc->stack+STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)srvc);
@@ -502,7 +804,7 @@ int main(int argc, char **argv)
if (val != 0)
cnt_running += 1;
}
- printf("cnt_running %d\n",cnt_running);
+ //PRINT("cnt_running %d\n",cnt_running);
sleep(1);
}