#include "mq_ntf.h"
/*
 * Allocate a notifier object and open both of its message queues.
 *
 * id - numeric id passed on to mq_ntf_open() to build the queue names
 *
 * Returns a heap-allocated, zero-initialised mq_ntf_mdt on success, or NULL
 * if the allocation or mq_ntf_open() fails. Nothing allocated here is leaked
 * on the failure paths.
 */
mq_ntf_mdt *mq_ntf_create(int id)
{
    mq_ntf_mdt *ret = malloc(sizeof(*ret));

    if (ret == NULL)
    {
        PERM();
        return NULL;
    }
    memset(ret, 0, sizeof(*ret));

    if (mq_ntf_open(ret, id) == -1)
    {
        PERM();
        /* the caller never receives this object, so it must be released here */
        free(ret);
        return NULL;
    }

    return ret;
}
/*
 * Open (creating if needed) the two queues belonging to a notifier.
 *
 * The queue names are MQ_PREFIX followed by "<id>-in" and "<id>-out"; both
 * are opened O_RDWR | O_CREAT with mode 0666 and default attributes.
 *
 * mq - notifier to fill in; its id, mq_in and mq_out fields are written
 * id - numeric id embedded in both queue names
 *
 * Returns 0 on success, -1 on error. On error no descriptor opened by this
 * call is left open, and mq_in / mq_out are both (mqd_t)-1.
 */
int mq_ntf_open(mq_ntf_mdt *mq, int id)
{
    char name[32];

    if (!mq)
    {
        PERM();
        return -1;
    }

    mq->id = id;
    /* start from a known "not open" state so failure paths are unambiguous */
    mq->mq_in = (mqd_t)-1;
    mq->mq_out = (mqd_t)-1;

    snprintf(name, sizeof(name), MQ_PREFIX"%d-in", id);
    /* create with default configs */
    mq->mq_in = mq_open(name, O_RDWR | O_CREAT, 0666, NULL);
    if (mq->mq_in == (mqd_t)-1)
    {
        ENL();
        perror("mq_open");
        return -1;
    }

    snprintf(name, sizeof(name), MQ_PREFIX"%d-out", id);
    /* create with default configs */
    mq->mq_out = mq_open(name, O_RDWR | O_CREAT, 0666, NULL);
    if (mq->mq_out == (mqd_t)-1)
    {
        perror("mq_open");
        /* do not leak the already opened "in" descriptor */
        mq_close(mq->mq_in);
        mq->mq_in = (mqd_t)-1;
        return -1;
    }

    PRINT("opened = %d %d\n", mq->mq_in, mq->mq_out);
    return 0;
}
/*
 * Check whether one of the notifier's queues has something to read.
 *
 * Waits up to 100 ms (tv_usec = 100000) in select() on the descriptor chosen
 * by dir. This relies on the queue descriptor being usable as a file
 * descriptor with select(), which is a platform property (true on Linux).
 *
 * mq  - notifier whose queue is polled
 * dir - MQ_IN to poll mq_in, MQ_OUT to poll mq_out
 *
 * RETURN:
 *   -1  error (NULL mq, unknown dir, descriptor unusable with select, or
 *       select() itself failed)
 *    0  nothing to read within the timeout
 *    1  something in queue
 *
 * NOTE(review): an older comment here warned that things "could blow up" if
 * the multiplexing setup has not been done - confirm whether any caller-side
 * initialisation is still required.
 */
int mq_ntf_select(mq_ntf_mdt *mq, int dir)
{
    int fd;
    int nready;
    fd_set io_fds;
    struct timeval tv;

    if (mq == NULL)
    {
        PERM();
        return -1;
    }

    if (dir == MQ_IN)
    {
        fd = mq->mq_in;
    }
    else if (dir == MQ_OUT)
    {
        fd = mq->mq_out;
    }
    else
    {
        /* unknown direction: bail out instead of polling an uninitialised fd */
        ENL();
        return -1;
    }

    /* FD_SET is undefined for descriptors outside [0, FD_SETSIZE) */
    if (fd < 0 || fd >= FD_SETSIZE)
    {
        ENL();
        return -1;
    }

    FD_ZERO(&io_fds);
    FD_SET(fd, &io_fds);
    tv.tv_sec = 0;
    tv.tv_usec = 100000;

    /* first argument is the highest descriptor in the set plus one */
    nready = select(fd + 1, &io_fds, NULL, NULL, &tv);
    if (nready == -1)
    {
        return -1;
    }
    if (nready == 0)
    {
        return 0;
    }

    return FD_ISSET(fd, &io_fds) ? 1 : 0;
}
/*
 * Receive one message from one of the notifier's queues.
 *
 * mq   - notifier to read from
 * dir  - MQ_IN to read mq_in, MQ_OUT to read mq_out
 * buf  - destination buffer
 * size - size of buf in bytes; mq_receive() rejects a buffer smaller than
 *        the queue's configured message size
 *
 * Returns the number of bytes received, or -1 on error (NULL mq, unknown
 * dir, or mq_receive() failure).
 *
 * A full queue is not an error for a reader: the previous "queue full"
 * rejection made a full queue impossible to consume through this function
 * and has been removed.
 */
int mq_ntf_read(mq_ntf_mdt *mq, int dir, char *buf, size_t size)
{
    ssize_t bytes;
    mqd_t mqd;
    unsigned int prio = 0;

    if (mq == NULL)
    {
        return -1;
    }

    if (dir == MQ_IN)
    {
        mqd = mq->mq_in;
    }
    else if (dir == MQ_OUT)
    {
        mqd = mq->mq_out;
    }
    else
    {
        perror("Wrong direction");
        /* never fall through with an uninitialised descriptor */
        return -1;
    }

    /* mq_receive returns ssize_t; keep it signed so -1 is detected properly */
    bytes = mq_receive(mqd, buf, size, &prio);
    if (bytes == -1)
    {
        perror("mq_receive read out");
        return -1;
    }

    return (int)bytes;
}
/*
 * Send one message to one of the notifier's queues with priority 10.
 *
 * The queue attributes are checked first and the call fails instead of
 * sending when the queue already holds mq_maxmsg messages.
 *
 * mq   - notifier to write to
 * dir  - MQ_IN to write mq_in, MQ_OUT to write mq_out
 * buf  - message payload
 * size - payload length in bytes
 *
 * Returns 0 on success (the result of mq_send(), not a byte count), or -1 on
 * error (NULL mq, unknown dir, attribute lookup failure, full queue, or
 * mq_send() failure).
 */
int mq_ntf_write(mq_ntf_mdt *mq, int dir, const char *buf, size_t size)
{
    struct mq_attr attr;
    mqd_t mqd;
    const unsigned int prio = 10;

    if (mq == NULL)
    {
        return -1;
    }

    if (dir == MQ_IN)
    {
        mqd = mq->mq_in;
    }
    else if (dir == MQ_OUT)
    {
        mqd = mq->mq_out;
    }
    else
    {
        perror("Wrong direction");
        /* never fall through with an uninitialised descriptor */
        return -1;
    }

    /* attr is only meaningful when mq_getattr succeeds */
    if (mq_getattr(mqd, &attr) == -1)
    {
        perror("mq_getattr");
        return -1;
    }
    if (attr.mq_curmsgs >= attr.mq_maxmsg)
    {
        printf("queue %d out full\n", mq->id);
        return -1;
    }

    if (mq_send(mqd, buf, size, prio) == -1)
    {
        perror("mq_send");
        return -1;
    }

    return 0;
}
/*
 * Report how many messages are currently queued in one direction.
 *
 * mq  - notifier to inspect
 * dir - 1 selects mq_in; any other value selects mq_out
 *
 * Returns the mq_curmsgs value reported by mq_getattr(), or -1 when mq is
 * NULL or mq_getattr() fails.
 */
int mq_ntf_count(mq_ntf_mdt *mq, int dir)
{
    struct mq_attr info;
    int queue;

    if (!mq)
    {
        PERM();
        return -1;
    }

    queue = (dir == 1) ? mq->mq_in : mq->mq_out;

    if (mq_getattr(queue, &info) == -1)
    {
        return -1;
    }

    return info.mq_curmsgs;
}
/*
 * Close both queue descriptors of a notifier and unlink the queue names.
 *
 * Intended for a notifier that was successfully opened with mq_ntf_open().
 * Descriptors equal to (mqd_t)-1 are skipped; closed descriptors are reset
 * to (mqd_t)-1 so a repeated call does not close them twice. Closing and
 * unlinking are best-effort: their individual results are not reported.
 *
 * mq - notifier to shut down
 *
 * Returns 0 when mq is non-NULL, -1 when mq is NULL.
 */
int mq_ntf_close(mq_ntf_mdt *mq)
{
    char name[32];

    if (mq == NULL)
    {
        return -1;
    }

    /* release the descriptors; unlinking alone leaves them open */
    if (mq->mq_in != (mqd_t)-1)
    {
        mq_close(mq->mq_in);
        mq->mq_in = (mqd_t)-1;
    }
    if (mq->mq_out != (mqd_t)-1)
    {
        mq_close(mq->mq_out);
        mq->mq_out = (mqd_t)-1;
    }

    snprintf(name, sizeof(name), MQ_PREFIX"%d-in", mq->id);
    mq_unlink(name);

    snprintf(name, sizeof(name), MQ_PREFIX"%d-out", mq->id);
    mq_unlink(name);

    return 0;
}
/*
 * Receive and discard the messages currently sitting in a queue.
 *
 * The message count is sampled once via mq_getattr() and exactly that many
 * mq_receive() calls are made into a temporary buffer of mq_msgsize bytes.
 *
 * mqd - descriptor of the queue to empty
 *
 * Returns the number of messages discarded, or -1 if mq_getattr(), the
 * buffer allocation, or any mq_receive() call fails.
 *
 * NOTE(review): if something else consumes from the same queue while this
 * runs, a blocking descriptor could presumably stall in mq_receive() -
 * confirm how callers open the descriptor.
 */
int mq_drain_q(mqd_t mqd)
{
    struct mq_attr info;
    unsigned int msg_prio = 0;
    char *scratch;
    int drained;

    if (mq_getattr(mqd, &info) == -1)
    {
        ENL();
        return -1;
    }

    scratch = malloc(info.mq_msgsize);
    if (!scratch)
    {
        ENL();
        return -1;
    }

    drained = 0;
    while (drained < info.mq_curmsgs)
    {
        if (mq_receive(mqd, scratch, info.mq_msgsize, &msg_prio) == -1)
        {
            ENL();
            drained = -1;
            break;
        }
        drained++;
    }

    free(scratch);
    return drained;
}
/*
 * Drain both queues of a notifier down to zero messages.
 * Meant to be used at start-up to clear out stale messages.
 *
 * mq - notifier whose mq_in and mq_out queues are emptied, in that order
 *
 * Prints the number of messages discarded from each queue.
 * Returns 0 on success, -1 if mq is NULL or draining either queue fails
 * (mq_out is not touched when draining mq_in fails).
 */
int mq_ntf_drain(mq_ntf_mdt *mq)
{
    int err;

    if (mq == NULL)
    {
        PERM();
        return -1;
    }

    err = mq_drain_q(mq->mq_in);
    if (err < 0)
    {
        PERM();
        return -1;
    }
    printf("Drained in %d messages\n", err);

    err = mq_drain_q(mq->mq_out);
    if (err < 0)
    {
        PERM();
        return -1;
    }
    printf("Drained out %d messages\n", err);

    return 0;
}
/*
 * Copy the attributes of one of the notifier's queues into caller storage.
 *
 * mq   - notifier to query
 * dir  - MQ_OUT for mq_out, MQ_IN for mq_in
 * attr - address of a pointer to a caller-owned struct mq_attr; the
 *        structure that *attr points to is overwritten on success only
 *
 * Returns 0 on success, -1 on error (NULL mq, NULL attr or *attr, unknown
 * dir, or mq_getattr() failure).
 */
int mq_ntf_getattr(mq_ntf_mdt *mq, int dir, struct mq_attr **attr)
{
    struct mq_attr gattr;
    mqd_t mqdt;

    if (mq == NULL || attr == NULL || *attr == NULL)
    {
        PERM();
        return -1;
    }

    if (dir == MQ_OUT)
    {
        mqdt = mq->mq_out;
    }
    else if (dir == MQ_IN)
    {
        mqdt = mq->mq_in;
    }
    else
    {
        return -1;
    }

    /* query into a local first so the caller's copy is untouched on failure */
    if (mq_getattr(mqdt, &gattr) == -1)
    {
        ENL();
        return -1;
    }

    **attr = gattr;
    return 0;
}
/*
 * Placeholder: performs no work for any argument, NULL included.
 *
 * NOTE(review): despite the name, nothing is released here. Presumably the
 * object returned by mq_ntf_create() should be freed by this function -
 * confirm against callers before adding that, to avoid a double free.
 */
void mq_ntf_free(mq_ntf_mdt *mq)
{
    (void)mq;
}