#include "mq_ntf.h"

/*
 * Helpers around a pair of POSIX message queues ("in" and "out") that share
 * a numeric id.  Queue names are built as MQ_PREFIX "<id>-in" and
 * MQ_PREFIX "<id>-out".
 */

/* Size of the scratch buffer used to build queue names. */
enum { MQ_NTF_NAME_SIZE = 32 };

/*
 * Build the name of one queue of the pair into buf.
 * suffix is "in" or "out".  Names longer than size-1 are silently truncated
 * by snprintf, exactly as before.
 */
static void mq_ntf_make_name(char *buf, size_t size, int id, const char *suffix)
{
	snprintf(buf, size, MQ_PREFIX "%d-%s", id, suffix);
}

/*
 * Translate a direction (MQ_IN / MQ_OUT) into the matching descriptor.
 * Returns 0 and stores the descriptor in *mqd, or -1 for any other dir.
 */
static int mq_ntf_dir_to_mqd(const mq_ntf_mdt *mq, int dir, mqd_t *mqd)
{
	if (dir == MQ_IN) {
		*mqd = mq->mq_in;
		return 0;
	}
	if (dir == MQ_OUT) {
		*mqd = mq->mq_out;
		return 0;
	}
	return -1;
}

/*
 * Allocate a zeroed mq_ntf_mdt and open both queues for the given id.
 *
 * Returns the new object, or NULL if allocation or mq_ntf_open() failed.
 * The object is heap allocated; release it with mq_ntf_free().
 */
mq_ntf_mdt *mq_ntf_create(int id)
{
	mq_ntf_mdt *ret = calloc(1, sizeof *ret);

	if (ret == NULL) {
		PERM();
		return NULL;
	}

	if (mq_ntf_open(ret, id) == -1) {
		PERM();
		/* Do not leak the half-initialised object. */
		free(ret);
		return NULL;
	}

	return ret;
}

/*
 * Open (creating if necessary, mode 0666, default attributes) the "in" and
 * "out" queues for id and store the descriptors in mq.
 *
 * Returns 0 on success, -1 on error.  On error both descriptors in mq are
 * left as (mqd_t)-1, so nothing stays open.
 */
int mq_ntf_open(mq_ntf_mdt *mq, int id)
{
	char name[MQ_NTF_NAME_SIZE];

	if (!mq) {
		PERM();
		return -1;
	}

	mq->id = id;
	mq->mq_in = (mqd_t)-1;
	mq->mq_out = (mqd_t)-1;

	mq_ntf_make_name(name, sizeof name, id, "in");
	/* create with default configs */
	mq->mq_in = mq_open(name, O_RDWR | O_CREAT, 0666, NULL);
	if (mq->mq_in == -1) {
		ENL();
		perror("mq_open");
		return -1;
	}

	mq_ntf_make_name(name, sizeof name, id, "out");
	/* create with default configs */
	mq->mq_out = mq_open(name, O_RDWR | O_CREAT, 0666, NULL);
	if (mq->mq_out == -1) {
		/* Report first: mq_close() below may overwrite errno. */
		perror("mq_open");
		mq_close(mq->mq_in);
		mq->mq_in = (mqd_t)-1;
		return -1;
	}

	PRINT("opened = %d %d\n", mq->mq_in, mq->mq_out);

	return 0;
}

/*
 * Wait up to 100 ms for the queue selected by dir to become readable.
 *
 * Returns:
 *   -1  error (NULL mq, unknown dir, descriptor unusable with select(),
 *       or select() failure)
 *    0  nothing to read within the timeout
 *    1  something is in the queue
 *
 * NOTE: passing a message queue descriptor to select() is not portable;
 * it relies on the descriptor being a file descriptor (true on Linux).
 */
int mq_ntf_select(mq_ntf_mdt *mq, int dir)
{
	mqd_t mqd;
	int fd;
	int nready;
	fd_set io_fds;
	struct timeval tv;

	if (mq == NULL) {
		PERM();
		return -1;
	}

	/* Unknown direction used to fall through with fd uninitialised. */
	if (mq_ntf_dir_to_mqd(mq, dir, &mqd) == -1) {
		ENL();
		return -1;
	}

	fd = (int)mqd;
	/* FD_SET on a descriptor outside [0, FD_SETSIZE) is undefined. */
	if (fd < 0 || fd >= FD_SETSIZE) {
		ENL();
		return -1;
	}

	FD_ZERO(&io_fds);
	FD_SET(fd, &io_fds);

	tv.tv_sec = 0;
	tv.tv_usec = 100000;

	/* First argument is the highest descriptor in the set plus one. */
	nready = select(fd + 1, &io_fds, NULL, NULL, &tv);
	if (nready == -1) {
		return -1;
	}
	if (nready > 0 && FD_ISSET(fd, &io_fds)) {
		return 1;
	}

	return 0;
}

/*
 * Receive one message from the queue selected by dir into buf.
 *
 * size is the capacity of buf and is passed straight to mq_receive().
 * The queues are opened without O_NONBLOCK in mq_ntf_open(), so this call
 * blocks while the queue is empty.
 *
 * Returns the number of bytes received, or -1 on error.
 *
 * The earlier version refused to read when the queue was full (a check that
 * appears to have been copied from the write path); a full queue is exactly
 * when a reader must be able to make progress, so the check is gone.
 */
int mq_ntf_read(mq_ntf_mdt *mq, int dir, char *buf, size_t size)
{
	mqd_t mqd;
	ssize_t bytes;
	unsigned int prio = 0;

	if (mq == NULL) {
		PERM();
		return -1;
	}

	if (mq_ntf_dir_to_mqd(mq, dir, &mqd) == -1) {
		/* errno is not set here, so perror() would print a stale error. */
		fprintf(stderr, "Wrong direction\n");
		return -1;
	}

	bytes = mq_receive(mqd, buf, size, &prio);
	if (bytes == -1) {
		perror("mq_receive read out");
		return -1;
	}

	return (int)bytes;
}

/*
 * Send size bytes from buf to the queue selected by dir with priority 10.
 *
 * To avoid blocking, the send is refused when the queue already holds
 * mq_maxmsg messages (the check is advisory: another sender can still fill
 * the queue between the check and mq_send()).
 *
 * Returns 0 on success (the return value of mq_send(), not a byte count),
 * -1 on error or when the queue is full.
 */
int mq_ntf_write(mq_ntf_mdt *mq, int dir, const char *buf, size_t size)
{
	struct mq_attr attr;
	mqd_t mqd;
	unsigned int prio = 10;

	if (mq == NULL) {
		PERM();
		return -1;
	}

	if (mq_ntf_dir_to_mqd(mq, dir, &mqd) == -1) {
		/* errno is not set here, so perror() would print a stale error. */
		fprintf(stderr, "Wrong direction\n");
		return -1;
	}

	/* attr is only meaningful when mq_getattr() succeeds. */
	if (mq_getattr(mqd, &attr) == -1) {
		perror("mq_getattr");
		return -1;
	}

	if (attr.mq_maxmsg == attr.mq_curmsgs) {
		printf("queue %d out full\n", mq->id);
		return -1;
	}

	if (mq_send(mqd, buf, size, prio) == -1) {
		perror("mq_send");
		return -1;
	}

	return 0;
}

/*
 * Number of messages currently held in one of the queues.
 *
 * dir: 1 selects the "in" queue, any other value selects the "out" queue.
 * NOTE(review): this compares against the literal 1 rather than MQ_IN as the
 * other functions do; left unchanged because MQ_IN's value is not visible
 * here - confirm and unify.
 *
 * Returns the message count, or -1 on error.
 */
int mq_ntf_count(mq_ntf_mdt *mq, int dir)
{
	struct mq_attr attr;
	mqd_t mqd;

	if (mq == NULL) {
		PERM();
		return -1;
	}

	mqd = (dir == 1) ? mq->mq_in : mq->mq_out;

	if (mq_getattr(mqd, &attr) == -1) {
		return -1;
	}

	return (int)attr.mq_curmsgs;
}

/*
 * Close both descriptors of mq and remove both queue names.
 *
 * mq must have been set up by mq_ntf_open()/mq_ntf_create(); descriptors
 * equal to (mqd_t)-1 are skipped.  Failures of mq_close()/mq_unlink() are
 * ignored (best effort).  The memory of mq itself is not released here.
 *
 * Returns 0, or -1 if mq is NULL.
 */
int mq_ntf_close(mq_ntf_mdt *mq)
{
	char name[MQ_NTF_NAME_SIZE];

	if (mq == NULL) {
		PERM();
		return -1;
	}

	/* Unlinking alone leaves the descriptors open in this process. */
	if (mq->mq_in != (mqd_t)-1) {
		mq_close(mq->mq_in);
		mq->mq_in = (mqd_t)-1;
	}
	if (mq->mq_out != (mqd_t)-1) {
		mq_close(mq->mq_out);
		mq->mq_out = (mqd_t)-1;
	}

	mq_ntf_make_name(name, sizeof name, mq->id, "in");
	mq_unlink(name);

	mq_ntf_make_name(name, sizeof name, mq->id, "out");
	mq_unlink(name);

	return 0;
}

/*
 * Receive and discard the messages that are queued on mqd at the time of
 * the call.
 *
 * Returns the number of messages discarded, or -1 on error.
 *
 * NOTE(review): the loop body and the end of this function were missing
 * from the source this was restored from; everything from the loop
 * condition onwards is a reconstruction - confirm against the original.
 */
int mq_drain_q(mqd_t mqd)
{
	struct mq_attr attr;
	char *buf = NULL;
	unsigned int prio = 0;
	int cnt_msg = 0;
	ssize_t num_read = 0;
	long i = 0;

	if (mq_getattr(mqd, &attr) == -1) {
		ENL();
		return -1;
	}

	/* mq_receive() needs a buffer of at least mq_msgsize bytes. */
	buf = malloc(attr.mq_msgsize);
	if (buf == NULL) {
		ENL();
		return -1;
	}

	for (i = 0; i < attr.mq_curmsgs; i++) {
		num_read = mq_receive(mqd, buf, attr.mq_msgsize, &prio);
		if (num_read == -1) {
			ENL();
			free(buf);
			return -1;
		}
		cnt_msg++;
	}

	free(buf);

	return cnt_msg;
}

/*
 * Drain both queues of mq and print how many messages were discarded from
 * each.
 *
 * Returns 0 on success, -1 on error.
 *
 * NOTE(review): this function's header was missing from the source this was
 * restored from; the name and signature are inferred from the surviving
 * body - confirm against the declaration in mq_ntf.h.
 */
int mq_ntf_drain(mq_ntf_mdt *mq)
{
	int err;

	if (mq == NULL) {
		PERM();
		return -1;
	}

	err = mq_drain_q(mq->mq_in);
	if (err < 0) {
		PERM();
		return -1;
	}
	printf("Drained in %d messages\n", err);

	err = mq_drain_q(mq->mq_out);
	if (err < 0) {
		PERM();
		return -1;
	}
	printf("Drained out %d messages\n", err);

	return 0;
}

/*
 * Copy the attributes of the queue selected by dir (MQ_IN / MQ_OUT) into
 * the caller-provided structure that *attr points to.
 *
 * Returns 0 on success, -1 on error (NULL mq, NULL attr or *attr, unknown
 * dir, or mq_getattr() failure).
 */
int mq_ntf_getattr(mq_ntf_mdt *mq, int dir, struct mq_attr **attr)
{
	struct mq_attr gattr;
	mqd_t mqdt;

	if (mq == NULL) {
		PERM();
		return -1;
	}

	/* The result is written through *attr, so both levels must be valid. */
	if (attr == NULL || *attr == NULL) {
		PERM();
		return -1;
	}

	if (mq_ntf_dir_to_mqd(mq, dir, &mqdt) == -1) {
		return -1;
	}

	if (mq_getattr(mqdt, &gattr) == -1) {
		ENL();
		return -1;
	}

	**attr = gattr;

	return 0;
}

/*
 * Release an object obtained from mq_ntf_create().
 *
 * Only the memory is released; call mq_ntf_close() first to close and
 * unlink the queues.  Passing NULL is allowed.  Do not pass objects that
 * were not heap allocated by mq_ntf_create().
 */
void mq_ntf_free(mq_ntf_mdt *mq)
{
	free(mq);
}