aboutsummaryrefslogtreecommitdiffstats
path: root/agni.c
diff options
context:
space:
mode:
authorZoRo <dos21h@gmail.com>2017-02-24 20:45:20 +0000
committerZoRo <dos21h@gmail.com>2017-02-24 20:45:20 +0000
commitc57abcff74e57baadd252d7d461b2648cdfbd8b2 (patch)
tree6412b2e6a8ef3a3e0e93adc5175dc9c26aa45b9c /agni.c
parentabfd02fc7233734ffcc3c396efcbedddfba49727 (diff)
downloadagni-c57abcff74e57baadd252d7d461b2648cdfbd8b2.tar.gz
agni-c57abcff74e57baadd252d7d461b2648cdfbd8b2.zip
Some lates updates
Diffstat (limited to 'agni.c')
-rw-r--r--agni.c151
1 files changed, 104 insertions, 47 deletions
diff --git a/agni.c b/agni.c
index ba35505..9b13cc2 100644
--- a/agni.c
+++ b/agni.c
@@ -77,6 +77,7 @@ typedef struct mq_ntf_io_mplx
#define MQ_OUT 1
#define MQ_IN 2
+
/*
supposed format agni-[id]-[in/out]
in is for input of thread (send to in and thread will recieve),
@@ -150,7 +151,7 @@ int mq_ntf_mplx(mq_ntf_mdt *mq, int msec)
//mq->io_mplx.tv.tv_sec = msec/1000;
//mq->io_mplx.tv.tv_usec = (msec%1000)*1000;
- mq->io_mplx.tv.tv_sec = 2;
+ mq->io_mplx.tv.tv_sec = 1;
mq->io_mplx.tv.tv_usec = 0;
PRINT("mplx = %d %d\n", mq->mq_in, mq->mq_out);
FD_ZERO(&io_fds);
@@ -165,16 +166,14 @@ int mq_ntf_mplx(mq_ntf_mdt *mq, int msec)
RETURN:
-1 some error
0x00 nothing to read
- 0x01 input ready
- 0x02 output ready
- 0x02|0x01 input&output ready
+ 0x01 something in queue
*/
//if mplx havent done, everything could blow up
-int mq_ntf_select(mq_ntf_mdt *mq)
+int mq_ntf_select(mq_ntf_mdt *mq, int dir)
{
- int fdnum, fd1, fd2;
- int ret=0x00;
+ int fdnum, fd;
+ int ret=0;
fd_set io_fds;
struct timeval tv;
@@ -186,14 +185,18 @@ int mq_ntf_select(mq_ntf_mdt *mq)
//we should allways have just 2 descriptors to listen
//io_fds = mq->io_mplx.io_fds;
- fd1 = mq->mq_in;
- fd2 = mq->mq_out;
+ if (dir == MQ_IN)
+ {
+ fd = mq->mq_in;
+ } else if (dir == MQ_OUT)
+ {
+ fd = mq->mq_out;
+ }
FD_ZERO(&io_fds);
- FD_SET(fd1, &io_fds);
- FD_SET(fd2, &io_fds);
+ FD_SET(fd, &io_fds);
tv.tv_sec = 1;
tv.tv_usec = 0;
- fdnum = select(2,
+ fdnum = select(1,
&io_fds,
NULL,
NULL,
@@ -204,14 +207,9 @@ int mq_ntf_select(mq_ntf_mdt *mq)
ENL();
return -1;
} else {
- if (FD_ISSET(mq->mq_in, &mq->io_mplx.io_fds))
- {
- ret = 0x01;
- }
-
- if (FD_ISSET(mq->mq_out, &mq->io_mplx.io_fds))
+ if (FD_ISSET(fd, &mq->io_mplx.io_fds))
{
- ret = 0x02;
+ ret = 1;
}
}
@@ -600,32 +598,88 @@ int th_start_client(void *data)
//char cmd_buf[MQ_MSG_SIZE];
//mq_cmd *cmd=NULL;
int run;
+ int mq_event;
server_cfg *cfg = data;
mq_ntf_mdt *mq = cfg->mq;
+ mq_cmd *recv_cmd = NULL; // for recieved cmd from mq
+ struct mq_attr out_attr, *ptr_out_attr=&out_attr;
+ struct mq_attr in_attr, *ptr_in_attr=&in_attr;
+ char *out_buf = NULL;
+ char *in_buf = NULL;
+
atomic_fetch_add(&cfg->running,1);
printf("Start client\n");
printf("Server %d\n",cfg->tid);
sleep(1);
+ //prepare message queue
+ mq = cfg->mq;
+ err = mq_ntf_mplx(mq, 10000);
+ if (err == -1)
+ {
+ printf("Ups something whent wrong\n");
+ }
+ if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1)
+ {
+ printf("Cant get attribute\n");
+ ENL();
+ }
+ out_buf = malloc(out_attr.mq_msgsize);
+
+ if (mq_ntf_getattr(mq, MQ_IN, &ptr_in_attr) == -1)
+ {
+ printf("Cant get attribute\n");
+ ENL();
+ }
+ in_buf = malloc(in_attr.mq_msgsize);
+
+
//send command wait for response
run = 1;
while (run)
{
- //printf("Send msg\n");
- err = SEND_CMD_OUT(mq,cmd_id,"FROM_CLIENT","NO");
- printf("err = %d\n",err);
- cmd_id += 1;
- run += 1;
- sleep(1);
- if (run == 2)
+ printf("Client loop tick\n");
+ mq_event = mq_ntf_select(mq, MQ_IN);
+ switch(mq_event)
{
+ case 0:
+ PRINT("EVENT 0\n");
break;
+ case 1:
+ PRINT("EVENT 1\n");
+ if (mq_ntf_read(mq, MQ_IN, in_buf, in_attr.mq_msgsize) == -1)
+ {
+ printf("Cant read input message \n");
+ } else
+ {
+ in_buf[in_attr.mq_msgsize-1] = 0x0;
+ printf("Recieve %s\n", in_buf);
+ }
+ break;
+ default:
+ printf("Unknown event type\n");
+ }
+ sleep(1);
+
+ if (mq_event == 1)
+ {
+ PNL();
+ recv_cmd = mq_cmd_creates(in_buf, in_attr.mq_msgsize, -1);
+ if (recv_cmd != NULL)
+ {
+ if (mq_cmd_o_cmp_cmd(recv_cmd,"QUIT") == 0)
+ {
+ printf("QUIT recieved lets quit main loop\n");
+ break;
+ }
+ }
}
}
+
printf("End client\n");
atomic_fetch_sub( &cfg->running,1);
@@ -690,17 +744,17 @@ int th_event_manager(void *data)
{
//check if there is some message and save it to buffer
run += 1;
- mq_event = mq_ntf_select(mq);
+ mq_event = mq_ntf_select(mq,MQ_OUT);
switch(mq_event)
{
case 0:
+ PRINT("EVENT 0\n");
break;
case 1:
- break;
- case 2:
+ PRINT("EVENT MQ_OUT\n");
if (mq_ntf_read(mq, MQ_OUT, out_buf, out_attr.mq_msgsize) == -1)
{
- printf("Cant read message\n");
+ printf("Cant read output message\n");
} else
{
out_buf[out_attr.mq_msgsize-1] = 0x0;
@@ -715,28 +769,31 @@ int th_event_manager(void *data)
// break;
//if QUIT then quit the thread
- recv_cmd = mq_cmd_creates(out_buf, out_attr.mq_msgsize, -1);
- if (recv_cmd != NULL)
+ if (mq_event == 1)
{
- PNL();
- if (mq_cmd_o_cmp_cmd(recv_cmd,"QUIT") == 0)
+ recv_cmd = mq_cmd_creates(out_buf, out_attr.mq_msgsize, -1);
+ if (recv_cmd != NULL)
{
- printf("QUIT recieved lets quit main loop\n");
- break;
- }
+ PNL();
+ if (mq_cmd_o_cmp_cmd(recv_cmd,"QUIT") == 0)
+ {
+ printf("QUIT recieved lets quit main loop\n");
+ break;
+ }
+
+ if (mq_cmd_o_cmp_cmd(recv_cmd,"CMD1") == 0)
+ {
+ printf("Hey dude it works\n");
+ }
+
+ if (mq_cmd_o_cmp_cmd(recv_cmd,"CMD2") == 0)
+ {
+ printf("Hey dude it works second time\n");
+ }
- if (mq_cmd_o_cmp_cmd(recv_cmd,"CMD1") == 0)
- {
- printf("Hey dude it works\n");
- }
-
- if (mq_cmd_o_cmp_cmd(recv_cmd,"CMD2") == 0)
- {
- printf("Hey dude it works second time\n");
}
-
}
-
+
//applay to recieved command executor
//other command pass to cmd/execution matching table