summaryrefslogtreecommitdiff
path: root/agni.c
diff options
context:
space:
mode:
Diffstat (limited to 'agni.c')
-rw-r--r--agni.c101
1 files changed, 37 insertions, 64 deletions
diff --git a/agni.c b/agni.c
index 342e21d..79b8441 100644
--- a/agni.c
+++ b/agni.c
@@ -56,6 +56,8 @@ typedef struct mq_ntf_io_mplx
fd_set io_fds;
} mq_ntf_io_mplx;
+#define MQ_OUT 1
+#define MQ_IN 2
/*
supposed format agni-[id]-[in/out]
@@ -198,47 +200,32 @@ int mq_ntf_select(mq_ntf_mdt *mq)
return ret;
}
-
-/* read from input queue */
-int mq_ntf_read_in(mq_ntf_mdt *mq, char *buf, size_t size)
+int mq_ntf_read(mq_ntf_mdt *mq, int dir, char *buf, size_t size)
{
struct mq_attr attr;
size_t bytes;
+ mqd_t mqd;
+
unsigned int prio = 10;
- int fret=0;
-
- fret = mq_getattr(mq->mq_in, &attr);
- if (attr.mq_maxmsg == attr.mq_curmsgs)
+
+ if (dir == MQ_IN)
{
- printf("queue %d in full", mq->id);
- return -1;
- }
- bytes = mq_receive(mq->mq_in, buf,
- size > attr.mq_msgsize ? attr.mq_msgsize : size,
- &prio);
- if (bytes == -1)
+ mqd = mq->mq_in;
+ } else if (dir == MQ_OUT)
{
- perror("mq_receive read in");
- return -1;
+ mqd = mq->mq_out;
+ } else
+ {
+ perror("Wrong direction");
}
- return bytes;
-}
-/* read from output queue */
-int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size)
-{
- struct mq_attr attr;
- //char *msg;
- size_t bytes;
- unsigned int prio = 10;
- int fret;
- fret = mq_getattr(mq->mq_out, &attr);
+ mq_getattr(mqd, &attr);
if (attr.mq_maxmsg == attr.mq_curmsgs)
{
printf("queue %d out full\n", mq->id);
return -1;
}
- bytes = mq_receive(mq->mq_out, buf,
+ bytes = mq_receive(mqd, buf,
size > attr.mq_msgsize ? attr.mq_msgsize : size,
&prio);
if (bytes == -1)
@@ -246,50 +233,36 @@ int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size)
perror("mq_receive read out");
return -1;
}
+
return bytes;
}
-
-/* write to output quque */
-int mq_ntf_write_out(mq_ntf_mdt *mq, const char *buf, size_t size)
+int mq_ntf_write(mq_ntf_mdt *mq, int dir, const char *buf, size_t size)
{
struct mq_attr attr;
- //char *msg;
size_t bytes;
+ mqd_t mqd;
+
unsigned int prio = 10;
- int fret=0;
- fret = mq_getattr(mq->mq_out, &attr);
- if (attr.mq_maxmsg == attr.mq_curmsgs)
+
+ if (dir == MQ_IN)
{
- printf("queue %d out full", mq->id);
- return -1;
- }
- bytes = mq_send(mq->mq_out, buf,
- size > attr.mq_msgsize ? attr.mq_msgsize : size,
- prio);
- if (bytes == -1)
+ mqd = mq->mq_in;
+ } else if (dir == MQ_OUT)
{
- perror("mq_send");
- return -1;
+ mqd = mq->mq_out;
+ } else
+ {
+ perror("Wrong direction");
}
- return bytes;
-}
-/* write to input quque */
-int mq_ntf_write_in(mq_ntf_mdt *mq, const char *buf, size_t size)
-{
- struct mq_attr attr;
- //char *msg;
- size_t bytes;
- unsigned int prio = 10;
- int fret=0;
- fret = mq_getattr(mq->mq_in, &attr);
+ mq_getattr(mqd, &attr);
if (attr.mq_maxmsg == attr.mq_curmsgs)
{
- printf("queue %d in full", mq->id);
+ printf("queue %d out full\n", mq->id);
return -1;
}
- bytes = mq_send(mq->mq_in, buf,
+ bytes = mq_send(mqd, buf,
size > attr.mq_msgsize ? attr.mq_msgsize : size,
prio);
if (bytes == -1)
@@ -297,6 +270,7 @@ int mq_ntf_write_in(mq_ntf_mdt *mq, const char *buf, size_t size)
perror("mq_send");
return -1;
}
+
return bytes;
}
@@ -424,8 +398,7 @@ int mq_ntf_drain(mq_ntf_mdt *mq)
return 0;
}
-#define MQ_OUT 1
-#define MQ_IN 2
+
int mq_ntf_getattr(mq_ntf_mdt *mq, int dir, struct mq_attr **attr)
{
@@ -482,10 +455,10 @@ int send_mq_cmd(mq_ntf_mdt *mq,
err = -1;
if (io == MQ_OUT)
{
- err = mq_ntf_write_out(mq, cmd_send->buf, cmd_send->sz);
+ err = mq_ntf_write(mq, MQ_OUT, cmd_send->buf, cmd_send->sz);
} else if (io == MQ_IN)
{
- err = mq_ntf_write_in(mq, cmd_send->buf, cmd_send->sz);
+ err = mq_ntf_write(mq, MQ_IN, cmd_send->buf, cmd_send->sz);
} else
{
printf("Unknown direction");
@@ -523,10 +496,10 @@ int recv_mq_cmd(mq_ntf_mdt *mq,
err = -1;
if (io == MQ_OUT)
{
- err = mq_ntf_read_out(mq, in_buf, MQ_MSG_SIZE);
+ err = mq_ntf_read(mq, MQ_OUT, in_buf, MQ_MSG_SIZE);
} else if (io == MQ_IN)
{
- err = mq_ntf_read_in(mq, in_buf, MQ_MSG_SIZE);
+ err = mq_ntf_read(mq, MQ_IN, in_buf, MQ_MSG_SIZE);
} else
{
printf("Unknown direction");
@@ -707,7 +680,7 @@ int th_event_manager(void *data)
case 1:
break;
case 2:
- if (mq_ntf_read_out(mq, buf, out_attr.mq_msgsize) == -1)
+ if (mq_ntf_read(mq, MQ_OUT, buf, out_attr.mq_msgsize) == -1)
{
printf("Cant read message\n");
} else