diff options
Diffstat (limited to 'agni.c')
-rw-r--r-- | agni.c | 151 |
1 files changed, 104 insertions, 47 deletions
@@ -77,6 +77,7 @@ typedef struct mq_ntf_io_mplx #define MQ_OUT 1 #define MQ_IN 2 + /* supposed format agni-[id]-[in/out] in is for input of thread (send to in and thread will recieve), @@ -150,7 +151,7 @@ int mq_ntf_mplx(mq_ntf_mdt *mq, int msec) //mq->io_mplx.tv.tv_sec = msec/1000; //mq->io_mplx.tv.tv_usec = (msec%1000)*1000; - mq->io_mplx.tv.tv_sec = 2; + mq->io_mplx.tv.tv_sec = 1; mq->io_mplx.tv.tv_usec = 0; PRINT("mplx = %d %d\n", mq->mq_in, mq->mq_out); FD_ZERO(&io_fds); @@ -165,16 +166,14 @@ int mq_ntf_mplx(mq_ntf_mdt *mq, int msec) RETURN: -1 some error 0x00 nothing to read - 0x01 input ready - 0x02 output ready - 0x02|0x01 input&output ready + 0x01 something in queue */ //if mplx havent done, everything could blow up -int mq_ntf_select(mq_ntf_mdt *mq) +int mq_ntf_select(mq_ntf_mdt *mq, int dir) { - int fdnum, fd1, fd2; - int ret=0x00; + int fdnum, fd; + int ret=0; fd_set io_fds; struct timeval tv; @@ -186,14 +185,18 @@ int mq_ntf_select(mq_ntf_mdt *mq) //we should allways have just 2 descriptors to listen //io_fds = mq->io_mplx.io_fds; - fd1 = mq->mq_in; - fd2 = mq->mq_out; + if (dir == MQ_IN) + { + fd = mq->mq_in; + } else if (dir == MQ_OUT) + { + fd = mq->mq_out; + } FD_ZERO(&io_fds); - FD_SET(fd1, &io_fds); - FD_SET(fd2, &io_fds); + FD_SET(fd, &io_fds); tv.tv_sec = 1; tv.tv_usec = 0; - fdnum = select(2, + fdnum = select(1, &io_fds, NULL, NULL, @@ -204,14 +207,9 @@ int mq_ntf_select(mq_ntf_mdt *mq) ENL(); return -1; } else { - if (FD_ISSET(mq->mq_in, &mq->io_mplx.io_fds)) - { - ret = 0x01; - } - - if (FD_ISSET(mq->mq_out, &mq->io_mplx.io_fds)) + if (FD_ISSET(fd, &mq->io_mplx.io_fds)) { - ret = 0x02; + ret = 1; } } @@ -600,32 +598,88 @@ int th_start_client(void *data) //char cmd_buf[MQ_MSG_SIZE]; //mq_cmd *cmd=NULL; int run; + int mq_event; server_cfg *cfg = data; mq_ntf_mdt *mq = cfg->mq; + mq_cmd *recv_cmd = NULL; // for recieved cmd from mq + struct mq_attr out_attr, *ptr_out_attr=&out_attr; + struct mq_attr in_attr, *ptr_in_attr=&in_attr; + char *out_buf = NULL; + char *in_buf = NULL; + atomic_fetch_add(&cfg->running,1); printf("Start client\n"); printf("Server %d\n",cfg->tid); sleep(1); + //prepare message queue + mq = cfg->mq; + err = mq_ntf_mplx(mq, 10000); + if (err == -1) + { + printf("Ups something whent wrong\n"); + } + if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1) + { + printf("Cant get attribute\n"); + ENL(); + } + out_buf = malloc(out_attr.mq_msgsize); + + if (mq_ntf_getattr(mq, MQ_IN, &ptr_in_attr) == -1) + { + printf("Cant get attribute\n"); + ENL(); + } + in_buf = malloc(in_attr.mq_msgsize); + + //send command wait for response run = 1; while (run) { - //printf("Send msg\n"); - err = SEND_CMD_OUT(mq,cmd_id,"FROM_CLIENT","NO"); - printf("err = %d\n",err); - cmd_id += 1; - run += 1; - sleep(1); - if (run == 2) + printf("Client loop tick\n"); + mq_event = mq_ntf_select(mq, MQ_IN); + switch(mq_event) { + case 0: + PRINT("EVENT 0\n"); break; + case 1: + PRINT("EVENT 1\n"); + if (mq_ntf_read(mq, MQ_IN, in_buf, in_attr.mq_msgsize) == -1) + { + printf("Cant read input message \n"); + } else + { + in_buf[in_attr.mq_msgsize-1] = 0x0; + printf("Recieve %s\n", in_buf); + } + break; + default: + printf("Unknown event type\n"); + } + sleep(1); + + if (mq_event == 1) + { + PNL(); + recv_cmd = mq_cmd_creates(in_buf, in_attr.mq_msgsize, -1); + if (recv_cmd != NULL) + { + if (mq_cmd_o_cmp_cmd(recv_cmd,"QUIT") == 0) + { + printf("QUIT recieved lets quit main loop\n"); + break; + } + } } } + printf("End client\n"); atomic_fetch_sub( &cfg->running,1); @@ -690,17 +744,17 @@ int th_event_manager(void *data) { //check if there is some message and save it to buffer run += 1; - mq_event = mq_ntf_select(mq); + mq_event = mq_ntf_select(mq,MQ_OUT); switch(mq_event) { case 0: + PRINT("EVENT 0\n"); break; case 1: - break; - case 2: + PRINT("EVENT MQ_OUT\n"); if (mq_ntf_read(mq, MQ_OUT, out_buf, out_attr.mq_msgsize) == -1) { - printf("Cant read message\n"); + printf("Cant read output message\n"); } else { out_buf[out_attr.mq_msgsize-1] = 0x0; @@ -715,28 +769,31 @@ int th_event_manager(void *data) // break; //if QUIT then quit the thread - recv_cmd = mq_cmd_creates(out_buf, out_attr.mq_msgsize, -1); - if (recv_cmd != NULL) + if (mq_event == 1) { - PNL(); - if (mq_cmd_o_cmp_cmd(recv_cmd,"QUIT") == 0) + recv_cmd = mq_cmd_creates(out_buf, out_attr.mq_msgsize, -1); + if (recv_cmd != NULL) { - printf("QUIT recieved lets quit main loop\n"); - break; - } + PNL(); + if (mq_cmd_o_cmp_cmd(recv_cmd,"QUIT") == 0) + { + printf("QUIT recieved lets quit main loop\n"); + break; + } + + if (mq_cmd_o_cmp_cmd(recv_cmd,"CMD1") == 0) + { + printf("Hey dude it works\n"); + } + + if (mq_cmd_o_cmp_cmd(recv_cmd,"CMD2") == 0) + { + printf("Hey dude it works second time\n"); + } - if (mq_cmd_o_cmp_cmd(recv_cmd,"CMD1") == 0) - { - printf("Hey dude it works\n"); - } - - if (mq_cmd_o_cmp_cmd(recv_cmd,"CMD2") == 0) - { - printf("Hey dude it works second time\n"); } - } - + //applay to recieved command executor //other command pass to cmd/execution matching table |