diff options
Diffstat (limited to 'agni.c')
-rw-r--r-- | agni.c | 498 |
1 files changed, 400 insertions, 98 deletions
@@ -49,23 +49,13 @@ int irc_connect( char *hostname, char *port ) return fd; } -char *create_mq_cmd(char *out, size_t out_sz, int id, char *cmd, size_t cmd_sz, char *payload, size_t p_sz) +//async io multiplexer +typedef struct mq_ntf_io_mplx { - char *ret = out; - int n,i; + struct timeval tv; + fd_set io_fds; +} mq_ntf_io_mplx; - n = snprintf(out, out_sz, ":%d:%s ", id, cmd); - if (n < 0) - { - return NULL; - } - for (i=0;i<p_sz,n<out_sz-1;i++,n++) - { - out[n] = payload[i]; - } - out[n++]=0x0; - return ret; -} /* supposed format agni-[id]-[in/out] @@ -78,9 +68,11 @@ typedef struct mq_ntf_mdt int id; mqd_t mq_in; mqd_t mq_out; + mq_ntf_io_mplx io_mplx; //_Atomic int used; nado? } mq_ntf_mdt; + /* return -1 on error */ @@ -91,6 +83,7 @@ int mq_ntf_open(mq_ntf_mdt *mq, int id) if (!mq) { + PERM(); return -1; } mq->id = id; @@ -100,6 +93,7 @@ int mq_ntf_open(mq_ntf_mdt *mq, int id) O_RDWR | O_CREAT, 0666, NULL); if (mq->mq_in == -1) { + ENL(); perror("mq_open"); return -1; } @@ -112,15 +106,96 @@ int mq_ntf_open(mq_ntf_mdt *mq, int id) perror("mq_open"); return -1; } + + PRINT("opened = %d %d\n", mq->mq_in, mq->mq_out); + + return 0; +} + +int mq_ntf_mplx(mq_ntf_mdt *mq, int msec) +{ + fd_set io_fds; + + if (mq == NULL) + { + PERM(); + return -1; + } + + if (msec < 0) + { + PERM(); + return -1; + } + + //mq->io_mplx.tv.tv_sec = msec/1000; + //mq->io_mplx.tv.tv_usec = (msec%1000)*1000; + mq->io_mplx.tv.tv_sec = 2; + mq->io_mplx.tv.tv_usec = 0; + PRINT("mplx = %d %d\n", mq->mq_in, mq->mq_out); + FD_ZERO(&io_fds); + FD_SET(mq->mq_in, &io_fds);//in first + FD_SET(mq->mq_out, &io_fds);//out second + mq->io_mplx.io_fds = io_fds; + return 0; } +/* return number if there is something to read +RETURN: + -1 some error + 0x00 nothing to read + 0x01 input ready + 0x02 output ready + 0x02|0x01 input&output ready -/* return number if there is something to read */ +*/ +//if mplx havent done, everything could blow up int mq_ntf_select(mq_ntf_mdt *mq) { + int fdnum, fd1, fd2; + int ret=0x00; + fd_set io_fds; + struct timeval tv; - return 0; + if (mq == NULL) + { + PERM(); + return -1; + } + + //we should allways have just 2 descriptors to listen + //io_fds = mq->io_mplx.io_fds; + fd1 = mq->mq_in; + fd2 = mq->mq_out; + FD_ZERO(&io_fds); + FD_SET(fd1, &io_fds); + FD_SET(fd2, &io_fds); + tv.tv_sec = 1; + tv.tv_usec = 0; + fdnum = select(2, + &io_fds, + NULL, + NULL, + &tv ); + + if (fdnum == -1) + { + ENL(); + return -1; + } else { + if (FD_ISSET(mq->mq_in, &mq->io_mplx.io_fds)) + { + ret = 0x01; + } + + if (FD_ISSET(mq->mq_out, &mq->io_mplx.io_fds)) + { + ret = 0x02; + } + } + + return ret; } @@ -128,10 +203,10 @@ int mq_ntf_select(mq_ntf_mdt *mq) int mq_ntf_read_in(mq_ntf_mdt *mq, char *buf, size_t size) { struct mq_attr attr; - char *msg; size_t bytes; unsigned int prio = 10; - int fret; + int fret=0; + fret = mq_getattr(mq->mq_in, &attr); if (attr.mq_maxmsg == attr.mq_curmsgs) { @@ -146,21 +221,21 @@ int mq_ntf_read_in(mq_ntf_mdt *mq, char *buf, size_t size) perror("mq_receive read in"); return -1; } - return 0; + return bytes; } /* read from output queue */ int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size) { struct mq_attr attr; - char *msg; + //char *msg; size_t bytes; unsigned int prio = 10; int fret; fret = mq_getattr(mq->mq_out, &attr); if (attr.mq_maxmsg == attr.mq_curmsgs) { - printf("queue %d out full", mq->id); + printf("queue %d out full\n", mq->id); return -1; } bytes = mq_receive(mq->mq_out, buf, @@ -171,7 +246,7 @@ int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size) perror("mq_receive read out"); return -1; } - return 0; + return bytes; } @@ -179,10 +254,10 @@ int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size) int mq_ntf_write_out(mq_ntf_mdt *mq, const char *buf, size_t size) { struct mq_attr attr; - char *msg; + //char *msg; size_t bytes; unsigned int prio = 10; - int fret; + int fret=0; fret = mq_getattr(mq->mq_out, &attr); if (attr.mq_maxmsg == attr.mq_curmsgs) { @@ -197,17 +272,17 @@ int mq_ntf_write_out(mq_ntf_mdt *mq, const char *buf, size_t size) perror("mq_send"); return -1; } - return 0; + return bytes; } /* write to input quque */ int mq_ntf_write_in(mq_ntf_mdt *mq, const char *buf, size_t size) { struct mq_attr attr; - char *msg; + //char *msg; size_t bytes; unsigned int prio = 10; - int fret; + int fret=0; fret = mq_getattr(mq->mq_in, &attr); if (attr.mq_maxmsg == attr.mq_curmsgs) { @@ -222,43 +297,35 @@ int mq_ntf_write_in(mq_ntf_mdt *mq, const char *buf, size_t size) perror("mq_send"); return -1; } - return 0; + return bytes; } -/* drain all messages from quque */ -int mq_ntf_drain(mq_ntf_mdt *mq) +//dir - dirction 1 in 2 out +int mq_ntf_count(mq_ntf_mdt *mq, int dir) { - int i,j; - struct mq_attr curattr; - char *buf; - ssize_t bytes; - int prio = 10; - mqd_t mqlist[2] = {mq->mq_in, mq->mq_out}; - - for (i=0;i<2;i++) - { - mq_getattr(mqlist[i], &curattr); - buf = malloc(curattr.mq_maxmsg); - for (j=0; j<curattr.mq_curmsgs; j++) - { - bytes = mq_receive(mqlist[i], buf, curattr.mq_maxmsg, &prio); - if (bytes == -1) - { - perror("mq_receive drain"); - } - printf("Drain %d bytes\n", bytes); - } - free(buf); + struct mq_attr attr; + int mq_fd; + + if (mq == NULL) + { + PERM(); + return -1; } - return 0; -} + if (dir == 1) + { + mq_fd = mq->mq_in; + } else + { + mq_fd = mq->mq_out; + } -/* check if there is space in quque */ -int mq_ntf_full(mq_ntf_mdt *mq) -{ + if (mq_getattr(mq_fd, &attr) != -1) + { + return attr.mq_curmsgs; + } - return 0; + return -1; } @@ -276,6 +343,8 @@ int mq_ntf_close(mq_ntf_mdt *mq) snprintf(name, name_size, MQ_PREFIX"%d-out",id); mq_unlink(name); + //clean file descriptors for io_mplx + return 0; } @@ -292,6 +361,210 @@ int mq_ntf_cmd_recv(mq_ntf_mdt *mq, mq_cmd *cmd) return 0; } +int mq_drain_q(mqd_t mqd) +{ + struct mq_attr attr; + char *buf = NULL; + unsigned int prio = 0; + int cnt_msg = 0; + int num_read = 0; + int i = 0; + + if (mq_getattr(mqd, &attr) == -1) + { + ENL(); + return -1; + } + + buf = malloc(attr.mq_msgsize); + if (buf == NULL) + { + ENL(); + return -1; + } + + for (i=0; i<attr.mq_curmsgs; i++) + { + num_read = mq_receive(mqd, buf, attr.mq_msgsize, &prio); + if (num_read == -1) + { + ENL(); + free(buf); + return -1; + } + cnt_msg += 1; + } + + free(buf); + return cnt_msg; +} + +//drain all mq buffers to zero +//to use at begining and clear all buffers to zero +int mq_ntf_drain(mq_ntf_mdt *mq) +{ + int err; + + err = mq_drain_q(mq->mq_in); + if (err < 0) + { + PERM(); + return -1; + } + printf("Drained in %d messages\n", err); + + err = mq_drain_q(mq->mq_out); + if (err < 0) + { + PERM(); + return -1; + } + printf("Drained out %d messages\n", err); + + return 0; +} + +#define MQ_OUT 1 +#define MQ_IN 2 + +int mq_ntf_getattr(mq_ntf_mdt *mq, int dir, struct mq_attr **attr) +{ + struct mq_attr gattr; + mqd_t mqdt; + + if (mq == NULL) + { + PERM(); + return -1; + } + + if (dir == MQ_OUT) + { + mqdt = mq->mq_out; + } else if (dir == MQ_IN) + { + mqdt = mq->mq_in; + } else + { + return -1; + } + + if (mq_getattr(mqdt, &gattr) == -1) + { + ENL(); + return -1; + } + + memcpy(*attr,&gattr,sizeof(struct mq_attr)); + + return 0; +} + + + +int send_mq_cmd(mq_ntf_mdt *mq, + int io, + int cmd_id, + char *cmd, + size_t cmd_sz, + char *param, + size_t param_sz) +{ + int err = 0; + mq_cmd *cmd_send = NULL; + cmd_send = mq_cmd_create(cmd_id,cmd,cmd_sz,param,param_sz); + if (cmd_send == NULL) + { + printf("Cant create command\n"); + return -1; + } + + err = -1; + if (io == MQ_OUT) + { + err = mq_ntf_write_out(mq, cmd_send->buf, cmd_send->sz); + } else if (io == MQ_IN) + { + err = mq_ntf_write_in(mq, cmd_send->buf, cmd_send->sz); + } else + { + printf("Unknown direction"); + return -1; + } + + if (err == -1) + { + printf("Couldnt recieve command\n"); + mq_cmd_free(cmd_send); + return -1; + } + + mq_cmd_free(cmd_send); + + return 0; +} + +int recv_mq_cmd(mq_ntf_mdt *mq, + int io, + mq_cmd **cmd) //return back recieved command +{ + int err = 0; + mq_cmd *cmd_recv = NULL; + int recv_sz=-1; + char *in_buf=NULL; + + in_buf = malloc(MQ_MSG_SIZE); + if (in_buf == NULL) + { + ENL(); + return -1; + } + + err = -1; + if (io == MQ_OUT) + { + err = mq_ntf_read_out(mq, in_buf, MQ_MSG_SIZE); + } else if (io == MQ_IN) + { + err = mq_ntf_read_in(mq, in_buf, MQ_MSG_SIZE); + } else + { + printf("Unknown direction"); + return -1; + } + + if (err == -1) + { + printf("Couldnt recieve command\n"); + free(in_buf); + return -1; + } + recv_sz = err; + + //in err should be still the size of recieved buffer + cmd_recv = mq_cmd_creates(in_buf, recv_sz); + if (cmd_recv == NULL) + { + printf("Cannot create cmd\n"); + free(in_buf); + return -1; + } + + *cmd = cmd_recv; + free(in_buf); + + return 0; +} + +#define SEND_CMD_IN(MQ,ID,CMD,PARAM) \ + send_mq_cmd(MQ,MQ_IN,ID,CMD,strlen(CMD),PARAM,strlen(PARAM)); +#define SEND_CMD_OUT(MQ,ID,CMD,PARAM) \ + send_mq_cmd(MQ,MQ_OUT,ID,CMD,strlen(CMD),PARAM,strlen(PARAM)); +#define RECV_CMD_IN(MQ,CMD) \ + recv_mq_cmd(MQ,MQ_IN,CMD); +#define RECV_CMD_OUT(MQ,CMD) \ + recv_mq_cmd(MQ,MQ_OUT,CMD); + /* |--------|<--IN---|---------| @@ -331,33 +604,36 @@ typedef struct server_cfg int th_start_client(void *data) { - int cmd_id = 1; - int ret = 0; - char cmd_buf[MQ_MSG_SIZE]; - mq_cmd *cmd=NULL; + int err; + //char cmd_buf[MQ_MSG_SIZE]; + //mq_cmd *cmd=NULL; + int run; server_cfg *cfg = data; mq_ntf_mdt *mq = cfg->mq; atomic_fetch_add(&cfg->running,1); printf("Start client\n"); printf("Server %d\n",cfg->tid); - sleep(10); + sleep(1); //send command wait for response - cmd = CMD_CREATE(cmd_id,"PING","NOPARAM"); - if (cmd != NULL) + run = 1; + while (run) { - if (ret = mq_ntf_write_out(mq,cmd->buf,cmd->sz) == -1) + //printf("Send msg\n"); + err = SEND_CMD_OUT(mq,cmd_id,"INIT","NO"); + printf("err = %d\n",err); + cmd_id += 1; + run += 1; + sleep(1); + if (run == 10) { - printf("Couldnt send command\n"); + break; } - } - cmd_id += 1; - printf("End client\n"); atomic_fetch_sub( &cfg->running,1); @@ -389,41 +665,65 @@ typedef struct event_handler_cfg { /* Thread to reacieve messages and return them back */ int th_event_manager(void *data) { - int state; - int i; event_handler_cfg *cfg = data; atomic_fetch_add(&cfg->running,1); - const int buf_size=MQ_MSG_SIZE; - char buf[buf_size]; - int ret; + mq_ntf_mdt *mq=NULL; + char *buf = NULL; + int run; + int err; + int mq_event; //read any command - mq_cmd *cmd = NULL; - - + //mq_cmd *cmd = NULL; + struct mq_attr out_attr, *ptr_out_attr=&out_attr; printf("Start event thread\n"); - state = EH_STATE_INIT; - while (state != EH_STATE_EXIT) - switch (state) + mq = cfg->mq_listen; + err = mq_ntf_mplx(mq, 10000); + if (err == -1) { - case EH_STATE_INIT: - printf("TH STATE INIT\n"); - sleep(1); - state = TH_STATE_START; - break; - case EH_STATE_START: - printf("TH STATE START\n"); - sleep(1); - state = TH_STATE_EXIT; - break; - case EH_STATE_EXIT: - printf("TH STATE EXIT\n"); + printf("Ups something whent wrong\n"); + } + + //get mq attributes + if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1) + { + printf("Cant get attribute\n"); + } + + buf = malloc(out_attr.mq_msgsize); + //maybe its not null + + printf("Start event loop\n"); + run = 1; + while(run) + { + //check for messages + run += 1; + mq_event = mq_ntf_select(mq); + switch(mq_event) + { + case 0: + break; + case 1: + break; + case 2: + if (mq_ntf_read_out(mq, buf, out_attr.mq_msgsize) == -1) + { + printf("Cant read message\n"); + } else + { + buf[out_attr.mq_msgsize-1] = 0x0; + printf("Recieve %s\n", buf); + } + break; + default: + printf("Unknown event type\n"); + } sleep(1); - state = TH_STATE_EXIT; - break; - default: - printf("Wrong state\n"); + if (run == 10) + break; } + printf("End event thread\n"); atomic_fetch_sub( &cfg->running,1); @@ -474,6 +774,8 @@ int main(int argc, char **argv) printf("Couldnt open mq_ntf_open\n"); } srvc->mq = &mq_array[i]; + //try to drain mq + mq_ntf_drain(&mq_array[i]); /* clone new proc */ clone(th_start_client, srvc->stack+STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)srvc); @@ -502,7 +804,7 @@ int main(int argc, char **argv) if (val != 0) cnt_running += 1; } - printf("cnt_running %d\n",cnt_running); + //PRINT("cnt_running %d\n",cnt_running); sleep(1); } |