summaryrefslogtreecommitdiff
path: root/mq_ntf.c
diff options
context:
space:
mode:
Diffstat (limited to 'mq_ntf.c')
-rw-r--r--[l---------]mq_ntf.c381
1 files changed, 380 insertions, 1 deletions
diff --git a/mq_ntf.c b/mq_ntf.c
index b204385..d7c119b 120000..100644
--- a/mq_ntf.c
+++ b/mq_ntf.c
@@ -1 +1,380 @@
-lib/mq_ntf/mq_ntf.c \ No newline at end of file
+#include "mq_ntf.h"
+
+mq_ntf_mdt *mq_ntf_create(int id)
+{
+ int fret;
+ mq_ntf_mdt *ret = NULL;
+
+ ret = malloc(sizeof(mq_ntf_mdt));
+ if (ret == NULL)
+ {
+ PERM();
+ return NULL;
+ }
+
+ memset(ret, 0, sizeof(mq_ntf_mdt));
+
+ fret = mq_ntf_open(ret, id);
+ if (fret == -1)
+ {
+ PERM();
+ return NULL;
+ }
+
+ return ret;
+}
+
+/*
+return -1 on error
+*/
+int mq_ntf_open(mq_ntf_mdt *mq, int id)
+{
+ const int name_size = 32;
+ char name[name_size];
+
+ if (!mq)
+ {
+ PERM();
+ return -1;
+ }
+ mq->id = id;
+ snprintf(name, name_size, MQ_PREFIX"%d-in",id);
+ /*create with default configs*/
+ mq->mq_in = mq_open(name,
+ O_RDWR | O_CREAT, 0666, NULL);
+ if (mq->mq_in == -1)
+ {
+ ENL();
+ perror("mq_open");
+ return -1;
+ }
+ snprintf(name, name_size, MQ_PREFIX"%d-out",id);
+ /*create with default configs*/
+ mq->mq_out = mq_open(name,
+ O_RDWR | O_CREAT, 0666, NULL);
+ if (mq->mq_out == -1)
+ {
+ perror("mq_open");
+ return -1;
+ }
+
+ PRINT("opened = %d %d\n", mq->mq_in, mq->mq_out);
+
+ return 0;
+}
+
+/* return number if there is something to read
+RETURN:
+ -1 some error
+ 0x00 nothing to read
+ 0x01 something in queue
+
+*/
+//if mplx havent done, everything could blow up
+int mq_ntf_select(mq_ntf_mdt *mq, int dir)
+{
+ int fdnum, fd;
+ int ret=0;
+ fd_set io_fds;
+ struct timeval tv;
+
+ if (mq == NULL)
+ {
+ PERM();
+ return -1;
+ }
+
+ //we should allways have just 2 descriptors to listen
+ //io_fds = mq->io_mplx.io_fds;
+ if (dir == MQ_IN)
+ {
+ fd = mq->mq_in;
+ } else if (dir == MQ_OUT)
+ {
+ fd = mq->mq_out;
+ } else
+ {
+ ENL();
+ }
+ //PRINT("fd=%d\n", fd);
+ //PNL();
+ FD_ZERO(&io_fds);
+ FD_SET(fd, &io_fds);
+ tv.tv_sec = 3;
+ tv.tv_usec = 0;
+ //PNL();
+ //first argument magic numbder
+ fdnum = select(fd+1,
+ &io_fds,
+ NULL,
+ NULL,
+ &tv );
+
+ //PNL();
+ if (fdnum == -1)
+ {
+ //ENL();
+ return -1;
+ } else if (fdnum == 0) {
+ //ENL();
+ return 0;
+ } else {
+ //ENL();
+ if (FD_ISSET(fd, &io_fds))
+ {
+ //PNL();
+ return 1;
+ }
+ }
+ //PNL();
+
+ return ret;
+}
+
+int mq_ntf_read(mq_ntf_mdt *mq, int dir, char *buf, size_t size)
+{
+ struct mq_attr attr;
+ size_t bytes;
+ mqd_t mqd;
+
+ unsigned int prio = 10;
+
+ if (dir == MQ_IN)
+ {
+ mqd = mq->mq_in;
+ } else if (dir == MQ_OUT)
+ {
+ mqd = mq->mq_out;
+ } else
+ {
+ perror("Wrong direction");
+ }
+
+ mq_getattr(mqd, &attr);
+ if (attr.mq_maxmsg == attr.mq_curmsgs)
+ {
+ printf("queue %d out full\n", mq->id);
+ return -1;
+ }
+ PNL();
+ /*bytes = mq_receive(mqd, buf,
+ size > attr.mq_msgsize ? attr.mq_msgsize : size,
+ &prio);*/
+ bytes = mq_receive(mqd, buf, size, &prio);
+ PNL();
+ if (bytes == -1)
+ {
+ perror("mq_receive read out");
+ return -1;
+ }
+
+ return bytes;
+}
+
+int mq_ntf_write(mq_ntf_mdt *mq, int dir, const char *buf, size_t size)
+{
+ struct mq_attr attr;
+ size_t bytes;
+ mqd_t mqd;
+
+ unsigned int prio = 10;
+
+ if (dir == MQ_IN)
+ {
+ mqd = mq->mq_in;
+ } else if (dir == MQ_OUT)
+ {
+ mqd = mq->mq_out;
+ } else
+ {
+ perror("Wrong direction");
+ }
+
+ mq_getattr(mqd, &attr);
+ if (attr.mq_maxmsg == attr.mq_curmsgs)
+ {
+ printf("queue %d out full\n", mq->id);
+ return -1;
+ }
+ PNL();
+ /*bytes = mq_send(mqd, buf,
+ size > attr.mq_msgsize ? attr.mq_msgsize : size,
+ prio);*/
+ bytes = mq_send(mqd, buf, size, prio);
+ PNL();
+ if (bytes == -1)
+ {
+ perror("mq_send");
+ return -1;
+ }
+
+ return bytes;
+}
+
+//dir - dirction 1 in 2 out
+int mq_ntf_count(mq_ntf_mdt *mq, int dir)
+{
+ struct mq_attr attr;
+ int mq_fd;
+
+ if (mq == NULL)
+ {
+ PERM();
+ return -1;
+ }
+
+ if (dir == 1)
+ {
+ mq_fd = mq->mq_in;
+ } else
+ {
+ mq_fd = mq->mq_out;
+ }
+
+ if (mq_getattr(mq_fd, &attr) != -1)
+ {
+ return attr.mq_curmsgs;
+ }
+
+ return -1;
+}
+
+
+/* close queue */
+int mq_ntf_close(mq_ntf_mdt *mq)
+{
+ const int name_size = 32;
+ char name[name_size];
+ int id;
+
+ id = mq->id;
+ snprintf(name, name_size, MQ_PREFIX"%d-in",id);
+ mq_unlink(name);
+
+ snprintf(name, name_size, MQ_PREFIX"%d-out",id);
+ mq_unlink(name);
+
+ //clean file descriptors for io_mplx
+
+ return 0;
+}
+
+//send command
+int mq_ntf_cmd_send(mq_ntf_mdt *mq, mq_cmd *cmd)
+{
+ ENL();
+ return 0;
+}
+
+//recieve command from other end
+int mq_ntf_cmd_recv(mq_ntf_mdt *mq, mq_cmd *cmd)
+{
+ ENL();
+ return 0;
+}
+
+int mq_drain_q(mqd_t mqd)
+{
+ struct mq_attr attr;
+ char *buf = NULL;
+ unsigned int prio = 0;
+ int cnt_msg = 0;
+ int num_read = 0;
+ int i = 0;
+
+ if (mq_getattr(mqd, &attr) == -1)
+ {
+ ENL();
+ return -1;
+ }
+
+ buf = malloc(attr.mq_msgsize);
+ if (buf == NULL)
+ {
+ ENL();
+ return -1;
+ }
+
+ for (i=0; i<attr.mq_curmsgs; i++)
+ {
+ num_read = mq_receive(mqd, buf, attr.mq_msgsize, &prio);
+ if (num_read == -1)
+ {
+ ENL();
+ free(buf);
+ return -1;
+ }
+ cnt_msg += 1;
+ }
+
+ free(buf);
+ return cnt_msg;
+}
+
+//drain all mq buffers to zero
+//to use at begining and clear all buffers to zero
+int mq_ntf_drain(mq_ntf_mdt *mq)
+{
+ int err;
+
+ err = mq_drain_q(mq->mq_in);
+ if (err < 0)
+ {
+ PERM();
+ return -1;
+ }
+ printf("Drained in %d messages\n", err);
+
+ err = mq_drain_q(mq->mq_out);
+ if (err < 0)
+ {
+ PERM();
+ return -1;
+ }
+ printf("Drained out %d messages\n", err);
+
+ return 0;
+}
+
+
+
+int mq_ntf_getattr(mq_ntf_mdt *mq, int dir, struct mq_attr **attr)
+{
+ struct mq_attr gattr;
+ mqd_t mqdt;
+
+ if (mq == NULL)
+ {
+ PERM();
+ return -1;
+ }
+
+ if (dir == MQ_OUT)
+ {
+ mqdt = mq->mq_out;
+ } else if (dir == MQ_IN)
+ {
+ mqdt = mq->mq_in;
+ } else
+ {
+ return -1;
+ }
+
+ if (mq_getattr(mqdt, &gattr) == -1)
+ {
+ ENL();
+ return -1;
+ }
+
+ memcpy(*attr,&gattr,sizeof(struct mq_attr));
+
+ return 0;
+}
+
+void mq_ntf_free(mq_ntf_mdt *mq)
+{
+ if (mq == NULL)
+ {
+
+ }
+} \ No newline at end of file