From fc16bb9a991b7ba1e60aa3f9282323deaf80c077 Mon Sep 17 00:00:00 2001 From: FreeArtMan Date: Sun, 5 Mar 2017 22:45:47 +0000 Subject: Pushed full versions of mq_ntf --- mq_ntf.c | 381 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- mq_ntf.h | 53 ++++++++- 2 files changed, 432 insertions(+), 2 deletions(-) mode change 120000 => 100644 mq_ntf.c mode change 120000 => 100644 mq_ntf.h diff --git a/mq_ntf.c b/mq_ntf.c deleted file mode 120000 index b204385..0000000 --- a/mq_ntf.c +++ /dev/null @@ -1 +0,0 @@ -lib/mq_ntf/mq_ntf.c \ No newline at end of file diff --git a/mq_ntf.c b/mq_ntf.c new file mode 100644 index 0000000..d7c119b --- /dev/null +++ b/mq_ntf.c @@ -0,0 +1,380 @@ +#include "mq_ntf.h" + +mq_ntf_mdt *mq_ntf_create(int id) +{ + int fret; + mq_ntf_mdt *ret = NULL; + + ret = malloc(sizeof(mq_ntf_mdt)); + if (ret == NULL) + { + PERM(); + return NULL; + } + + memset(ret, 0, sizeof(mq_ntf_mdt)); + + fret = mq_ntf_open(ret, id); + if (fret == -1) + { + PERM(); + return NULL; + } + + return ret; +} + +/* +return -1 on error +*/ +int mq_ntf_open(mq_ntf_mdt *mq, int id) +{ + const int name_size = 32; + char name[name_size]; + + if (!mq) + { + PERM(); + return -1; + } + mq->id = id; + snprintf(name, name_size, MQ_PREFIX"%d-in",id); + /*create with default configs*/ + mq->mq_in = mq_open(name, + O_RDWR | O_CREAT, 0666, NULL); + if (mq->mq_in == -1) + { + ENL(); + perror("mq_open"); + return -1; + } + snprintf(name, name_size, MQ_PREFIX"%d-out",id); + /*create with default configs*/ + mq->mq_out = mq_open(name, + O_RDWR | O_CREAT, 0666, NULL); + if (mq->mq_out == -1) + { + perror("mq_open"); + return -1; + } + + PRINT("opened = %d %d\n", mq->mq_in, mq->mq_out); + + return 0; +} + +/* return number if there is something to read +RETURN: + -1 some error + 0x00 nothing to read + 0x01 something in queue + +*/ +//if mplx havent done, everything could blow up +int mq_ntf_select(mq_ntf_mdt *mq, int dir) +{ + int fdnum, fd; + int ret=0; + fd_set io_fds; + struct timeval tv; + + if (mq == NULL) + { + PERM(); + return -1; + } + + //we should allways have just 2 descriptors to listen + //io_fds = mq->io_mplx.io_fds; + if (dir == MQ_IN) + { + fd = mq->mq_in; + } else if (dir == MQ_OUT) + { + fd = mq->mq_out; + } else + { + ENL(); + } + //PRINT("fd=%d\n", fd); + //PNL(); + FD_ZERO(&io_fds); + FD_SET(fd, &io_fds); + tv.tv_sec = 3; + tv.tv_usec = 0; + //PNL(); + //first argument magic numbder + fdnum = select(fd+1, + &io_fds, + NULL, + NULL, + &tv ); + + //PNL(); + if (fdnum == -1) + { + //ENL(); + return -1; + } else if (fdnum == 0) { + //ENL(); + return 0; + } else { + //ENL(); + if (FD_ISSET(fd, &io_fds)) + { + //PNL(); + return 1; + } + } + //PNL(); + + return ret; +} + +int mq_ntf_read(mq_ntf_mdt *mq, int dir, char *buf, size_t size) +{ + struct mq_attr attr; + size_t bytes; + mqd_t mqd; + + unsigned int prio = 10; + + if (dir == MQ_IN) + { + mqd = mq->mq_in; + } else if (dir == MQ_OUT) + { + mqd = mq->mq_out; + } else + { + perror("Wrong direction"); + } + + mq_getattr(mqd, &attr); + if (attr.mq_maxmsg == attr.mq_curmsgs) + { + printf("queue %d out full\n", mq->id); + return -1; + } + PNL(); + /*bytes = mq_receive(mqd, buf, + size > attr.mq_msgsize ? attr.mq_msgsize : size, + &prio);*/ + bytes = mq_receive(mqd, buf, size, &prio); + PNL(); + if (bytes == -1) + { + perror("mq_receive read out"); + return -1; + } + + return bytes; +} + +int mq_ntf_write(mq_ntf_mdt *mq, int dir, const char *buf, size_t size) +{ + struct mq_attr attr; + size_t bytes; + mqd_t mqd; + + unsigned int prio = 10; + + if (dir == MQ_IN) + { + mqd = mq->mq_in; + } else if (dir == MQ_OUT) + { + mqd = mq->mq_out; + } else + { + perror("Wrong direction"); + } + + mq_getattr(mqd, &attr); + if (attr.mq_maxmsg == attr.mq_curmsgs) + { + printf("queue %d out full\n", mq->id); + return -1; + } + PNL(); + /*bytes = mq_send(mqd, buf, + size > attr.mq_msgsize ? attr.mq_msgsize : size, + prio);*/ + bytes = mq_send(mqd, buf, size, prio); + PNL(); + if (bytes == -1) + { + perror("mq_send"); + return -1; + } + + return bytes; +} + +//dir - dirction 1 in 2 out +int mq_ntf_count(mq_ntf_mdt *mq, int dir) +{ + struct mq_attr attr; + int mq_fd; + + if (mq == NULL) + { + PERM(); + return -1; + } + + if (dir == 1) + { + mq_fd = mq->mq_in; + } else + { + mq_fd = mq->mq_out; + } + + if (mq_getattr(mq_fd, &attr) != -1) + { + return attr.mq_curmsgs; + } + + return -1; +} + + +/* close queue */ +int mq_ntf_close(mq_ntf_mdt *mq) +{ + const int name_size = 32; + char name[name_size]; + int id; + + id = mq->id; + snprintf(name, name_size, MQ_PREFIX"%d-in",id); + mq_unlink(name); + + snprintf(name, name_size, MQ_PREFIX"%d-out",id); + mq_unlink(name); + + //clean file descriptors for io_mplx + + return 0; +} + +//send command +int mq_ntf_cmd_send(mq_ntf_mdt *mq, mq_cmd *cmd) +{ + ENL(); + return 0; +} + +//recieve command from other end +int mq_ntf_cmd_recv(mq_ntf_mdt *mq, mq_cmd *cmd) +{ + ENL(); + return 0; +} + +int mq_drain_q(mqd_t mqd) +{ + struct mq_attr attr; + char *buf = NULL; + unsigned int prio = 0; + int cnt_msg = 0; + int num_read = 0; + int i = 0; + + if (mq_getattr(mqd, &attr) == -1) + { + ENL(); + return -1; + } + + buf = malloc(attr.mq_msgsize); + if (buf == NULL) + { + ENL(); + return -1; + } + + for (i=0; imq_in); + if (err < 0) + { + PERM(); + return -1; + } + printf("Drained in %d messages\n", err); + + err = mq_drain_q(mq->mq_out); + if (err < 0) + { + PERM(); + return -1; + } + printf("Drained out %d messages\n", err); + + return 0; +} + + + +int mq_ntf_getattr(mq_ntf_mdt *mq, int dir, struct mq_attr **attr) +{ + struct mq_attr gattr; + mqd_t mqdt; + + if (mq == NULL) + { + PERM(); + return -1; + } + + if (dir == MQ_OUT) + { + mqdt = mq->mq_out; + } else if (dir == MQ_IN) + { + mqdt = mq->mq_in; + } else + { + return -1; + } + + if (mq_getattr(mqdt, &gattr) == -1) + { + ENL(); + return -1; + } + + memcpy(*attr,&gattr,sizeof(struct mq_attr)); + + return 0; +} + +void mq_ntf_free(mq_ntf_mdt *mq) +{ + if (mq == NULL) + { + + } +} \ No newline at end of file diff --git a/mq_ntf.h b/mq_ntf.h deleted file mode 120000 index 6a419fe..0000000 --- a/mq_ntf.h +++ /dev/null @@ -1 +0,0 @@ -lib/mq_ntf/mq_ntf.h \ No newline at end of file diff --git a/mq_ntf.h b/mq_ntf.h new file mode 100644 index 0000000..2a5ea9f --- /dev/null +++ b/mq_ntf.h @@ -0,0 +1,52 @@ +#ifndef __MQ_NTF_H +#define __MQ_NTF_H + +#include +#include +#include +#include + +#include "mq_cmd.h" +#include "debug.h" + + +//async io multiplexer +typedef struct mq_ntf_io_mplx +{ + struct timeval tv; + fd_set io_fds; +} mq_ntf_io_mplx; + +#define MQ_OUT 1 +#define MQ_IN 2 + +/* +supposed format agni-[id]-[in/out] +in is for input of thread (send to in and thread will recieve), +out if for output of thread (read from out to recieve from thread) +*/ +#define MQ_PREFIX "/agni-" +typedef struct mq_ntf_mdt +{ + int id; + mqd_t mq_in; + mqd_t mq_out; + //mq_ntf_io_mplx io_mplx; + //_Atomic int used; nado? +} mq_ntf_mdt; + +mq_ntf_mdt *mq_ntf_create(int id); +int mq_ntf_open(mq_ntf_mdt *mq, int id); +int mq_ntf_select(mq_ntf_mdt *mq, int dir); +int mq_ntf_read(mq_ntf_mdt *mq, int dir, char *buf, size_t size); +int mq_ntf_write(mq_ntf_mdt *mq, int dir, const char *buf, size_t size); +int mq_ntf_count(mq_ntf_mdt *mq, int dir); +int mq_ntf_close(mq_ntf_mdt *mq); +int mq_ntf_cmd_send(mq_ntf_mdt *mq, mq_cmd *cmd); +int mq_ntf_cmd_recv(mq_ntf_mdt *mq, mq_cmd *cmd); +int mq_drain_q(mqd_t mqd); +int mq_ntf_drain(mq_ntf_mdt *mq); +int mq_ntf_getattr(mq_ntf_mdt *mq, int dir, struct mq_attr **attr); +void mq_ntf_free(mq_ntf_mdt *mq); + +#endif \ No newline at end of file -- cgit v1.2.3