aboutsummaryrefslogblamecommitdiffstats
path: root/mq_ntf.c
blob: 7f2953aba14973486bb3efa794c240b1c71d26bd (plain) (tree)





































































































                                                             

                            





















































                                                                
                



                                                                
                

































                                                                       
                



                                                                
                
























































                                                        








































































































                                                                        
#include "mq_ntf.h"

/*
 * Allocate a zero-filled queue handle and open both of its message queues
 * for the given id via mq_ntf_open().
 *
 * RETURN:
 * 	pointer to the new handle on success
 * 	NULL if the allocation fails or mq_ntf_open() returns -1
 */
mq_ntf_mdt *mq_ntf_create(int id)
{
	mq_ntf_mdt *ret = NULL;

	ret = malloc(sizeof(*ret));
	if (ret == NULL)
	{
		PERM();
		return NULL;
	}

	memset(ret, 0, sizeof(*ret));

	if (mq_ntf_open(ret, id) == -1)
	{
		PERM();
		/* the handle never reaches the caller, so it must be released here */
		free(ret);
		return NULL;
	}

	return ret;
}

/*
 * Open (creating if necessary) the two message queues that belong to id and
 * store their descriptors in mq->mq_in and mq->mq_out.
 *
 * The queue names are MQ_PREFIX"<id>-in" and MQ_PREFIX"<id>-out". Both are
 * opened O_RDWR | O_CREAT with mode 0666 and default attributes.
 *
 * RETURN:
 * 	0     both queues are open
 * 	-1    mq is NULL, a queue name did not fit the buffer, or mq_open() failed
 */
int mq_ntf_open(mq_ntf_mdt *mq, int id)
{
	char name[32];
	int len;

	if (!mq)
	{
		PERM();
		return -1;
	}
	mq->id = id;

	len = snprintf(name, sizeof(name), MQ_PREFIX"%d-in", id);
	if (len < 0 || (size_t)len >= sizeof(name))
	{
		/* a truncated name could silently alias another queue */
		ENL();
		return -1;
	}
	/*create with default configs*/
	mq->mq_in = mq_open(name,
		O_RDWR | O_CREAT, 0666, NULL);
	if (mq->mq_in == (mqd_t)-1)
	{
		ENL();
		perror("mq_open");
		return -1;
	}

	len = snprintf(name, sizeof(name), MQ_PREFIX"%d-out", id);
	if (len < 0 || (size_t)len >= sizeof(name))
	{
		ENL();
		/* do not leave the first descriptor open on a failed open */
		mq_close(mq->mq_in);
		mq->mq_in = (mqd_t)-1;
		return -1;
	}
	/*create with default configs*/
	mq->mq_out = mq_open(name,
		O_RDWR | O_CREAT, 0666, NULL);
	if (mq->mq_out == (mqd_t)-1)
	{
		/* report first: mq_close() below may overwrite errno */
		perror("mq_open");
		mq_close(mq->mq_in);
		mq->mq_in = (mqd_t)-1;
		return -1;
	}

	PRINT("opened = %d %d\n", mq->mq_in, mq->mq_out);

	return 0;
}

/*
 * Wait up to 100 ms for the selected queue descriptor to become readable.
 *
 * dir chooses the descriptor: MQ_IN uses mq->mq_in, MQ_OUT uses mq->mq_out.
 *
 * RETURN:
 * 	-1        mq is NULL, dir is neither MQ_IN nor MQ_OUT, or select() failed
 * 	0x00      nothing to read (timeout)
 * 	0x01      something in queue
 *
 * NOTE: handing a message queue descriptor to select() relies on mqd_t being
 * a file descriptor; Linux provides that, POSIX does not guarantee it.
 * Original author's note: if mplx has not been set up, everything could blow up.
 */
int mq_ntf_select(mq_ntf_mdt *mq, int dir)
{
	int fdnum, fd;
	fd_set io_fds;
	struct timeval tv;

	if (mq == NULL)
	{
		PERM();
		return -1;
	}

	if (dir == MQ_IN)
	{
		fd = mq->mq_in;
	} else if (dir == MQ_OUT)
	{
		fd = mq->mq_out;
	} else
	{
		/* previously fell through and used an uninitialised descriptor */
		ENL();
		return -1;
	}

	FD_ZERO(&io_fds);
	FD_SET(fd, &io_fds);
	tv.tv_sec = 0;
	tv.tv_usec = 100000;

	/* first argument is the highest descriptor in the set plus one */
	fdnum = select(fd+1,
		&io_fds,
		NULL,
		NULL,
		&tv );

	if (fdnum == -1)
	{
		return -1;
	}
	if (fdnum == 0)
	{
		return 0;
	}

	return FD_ISSET(fd, &io_fds) ? 1 : 0;
}

/*
 * Receive one message from the selected queue into buf.
 *
 * dir chooses the descriptor: MQ_IN uses mq->mq_in, MQ_OUT uses mq->mq_out.
 * size is the capacity of buf; mq_receive() rejects the call (EMSGSIZE) when
 * it is smaller than the queue's mq_msgsize. The message priority is
 * discarded.
 *
 * The earlier "queue full" rejection was removed: it was copied from the
 * write path and made a full queue impossible to read from.
 *
 * RETURN:
 * 	number of bytes received on success
 * 	-1 if mq is NULL, dir is invalid, or mq_receive() failed
 */
int mq_ntf_read(mq_ntf_mdt *mq, int dir, char *buf, size_t size)
{
	ssize_t bytes;
	mqd_t mqd;
	unsigned int prio = 0;

	if (mq == NULL)
	{
		PERM();
		return -1;
	}

	if (dir == MQ_IN)
	{
		mqd = mq->mq_in;
	} else if (dir == MQ_OUT)
	{
		mqd = mq->mq_out;
	} else
	{
		/* not an errno failure, so perror() would print a misleading suffix */
		fprintf(stderr, "Wrong direction\n");
		return -1;
	}

	/* mq_receive() returns ssize_t; keep it signed so -1 is a real -1 */
	bytes = mq_receive(mqd, buf, size, &prio);
	if (bytes == -1)
	{
		perror("mq_receive read out");
		return -1;
	}

	return (int)bytes;
}

/*
 * Send one message of size bytes from buf to the selected queue with
 * priority 10.
 *
 * dir chooses the descriptor: MQ_IN uses mq->mq_in, MQ_OUT uses mq->mq_out.
 * The call is refused when the queue already holds mq_maxmsg messages.
 *
 * RETURN:
 * 	0     message queued (mq_send() reports success as 0, not a byte count)
 * 	-1    mq is NULL, dir is invalid, the queue is full, or
 * 	      mq_getattr()/mq_send() failed
 */
int mq_ntf_write(mq_ntf_mdt *mq, int dir, const char *buf, size_t size)
{
	struct mq_attr attr;
	mqd_t mqd;
	unsigned int prio = 10;

	if (mq == NULL)
	{
		PERM();
		return -1;
	}

	if (dir == MQ_IN)
	{
		mqd = mq->mq_in;
	} else if (dir == MQ_OUT)
	{
		mqd = mq->mq_out;
	} else
	{
		/* not an errno failure, so perror() would print a misleading suffix */
		fprintf(stderr, "Wrong direction\n");
		return -1;
	}

	/* attr is only meaningful when mq_getattr() succeeds */
	if (mq_getattr(mqd, &attr) == -1)
	{
		perror("mq_getattr");
		return -1;
	}
	if (attr.mq_maxmsg == attr.mq_curmsgs)
	{
		printf("queue %d out full\n", mq->id);
		return -1;
	}

	if (mq_send(mqd, buf, size, prio) == -1)
	{
		perror("mq_send");
		return -1;
	}

	return 0;
}

/*
 * Report how many messages are currently waiting in one of the queues.
 *
 * dir - direction: 1 selects mq->mq_in, any other value selects mq->mq_out.
 *
 * RETURN:
 * 	current message count on success
 * 	-1 if mq is NULL or mq_getattr() fails
 */
int mq_ntf_count(mq_ntf_mdt *mq, int dir)
{
	struct mq_attr qattr;
	int qd;

	if (!mq)
	{
		PERM();
		return -1;
	}

	qd = (dir == 1) ? mq->mq_in : mq->mq_out;

	if (mq_getattr(qd, &qattr) == -1)
	{
		return -1;
	}

	return qattr.mq_curmsgs;
}


/*
 * Close both queue descriptors held by mq and remove the queue names
 * MQ_PREFIX"<id>-in" and MQ_PREFIX"<id>-out".
 *
 * Descriptors equal to (mqd_t)-1 are skipped; closed descriptors are reset
 * to (mqd_t)-1 so a second call does not close them again. Results of
 * mq_close()/mq_unlink() are not reported, matching the previous behaviour
 * for mq_unlink().
 *
 * NOTE: any io_mplx bookkeeping is still not cleaned up here (original TODO).
 *
 * RETURN:
 * 	0     done
 * 	-1    mq is NULL
 */
int mq_ntf_close(mq_ntf_mdt *mq)
{
	char name[32];

	if (mq == NULL)
	{
		PERM();
		return -1;
	}

	/* unlinking only removes the name; the descriptors must be closed too */
	if (mq->mq_in != (mqd_t)-1)
	{
		mq_close(mq->mq_in);
		mq->mq_in = (mqd_t)-1;
	}
	if (mq->mq_out != (mqd_t)-1)
	{
		mq_close(mq->mq_out);
		mq->mq_out = (mqd_t)-1;
	}

	snprintf(name, sizeof(name), MQ_PREFIX"%d-in", mq->id);
	mq_unlink(name);

	snprintf(name, sizeof(name), MQ_PREFIX"%d-out", mq->id);
	mq_unlink(name);

	return 0;
}

/*
 * Receive and discard every message that mq_getattr() reports as pending
 * on mqd at the time of the call.
 *
 * A scratch buffer of mq_msgsize bytes is allocated for the receives and
 * released before returning.
 *
 * RETURN:
 * 	number of messages discarded on success
 * 	-1 if mq_getattr(), malloc() or mq_receive() fails
 */
int mq_drain_q(mqd_t mqd)
{
	struct mq_attr qattr;
	unsigned int msg_prio = 0;
	char *scratch;
	int drained = 0;

	if (mq_getattr(mqd, &qattr) == -1)
	{
		ENL();
		return -1;
	}

	scratch = malloc(qattr.mq_msgsize);
	if (!scratch)
	{
		ENL();
		return -1;
	}

	/* the count is sampled once; messages arriving later are left alone */
	while (drained < qattr.mq_curmsgs)
	{
		if (mq_receive(mqd, scratch, qattr.mq_msgsize, &msg_prio) == -1)
		{
			ENL();
			free(scratch);
			return -1;
		}
		drained++;
	}

	free(scratch);
	return drained;
}

/*
 * Drain both queues of mq (in first, then out) with mq_drain_q() and print
 * how many messages were discarded from each.
 *
 * Intended for use at the beginning, to start with empty queues.
 *
 * RETURN:
 * 	0     both queues drained
 * 	-1    mq is NULL or mq_drain_q() failed; if the in queue fails the
 * 	      out queue is not attempted
 */
int mq_ntf_drain(mq_ntf_mdt *mq)
{
	int drained;

	/* same NULL guard the other mq_ntf_* entry points use */
	if (mq == NULL)
	{
		PERM();
		return -1;
	}

	drained = mq_drain_q(mq->mq_in);
	if (drained < 0)
	{
		PERM();
		return -1;
	}
	printf("Drained in %d messages\n", drained);

	drained = mq_drain_q(mq->mq_out);
	if (drained < 0)
	{
		PERM();
		return -1;
	}
	printf("Drained out %d messages\n", drained);

	return 0;
}



/*
 * Copy the attributes of the selected queue into the caller's structure.
 *
 * dir chooses the descriptor: MQ_OUT uses mq->mq_out, MQ_IN uses mq->mq_in.
 * attr must point to a pointer to caller-owned storage for one
 * struct mq_attr; nothing is allocated here.
 *
 * RETURN:
 * 	0     *attr filled in
 * 	-1    mq, attr or *attr is NULL, dir is invalid, or mq_getattr() failed
 * 	      (the caller's structure is left untouched)
 */
int mq_ntf_getattr(mq_ntf_mdt *mq, int dir, struct mq_attr **attr)
{
	struct mq_attr gattr;
	mqd_t mqdt;

	/* *attr is written through below, so both levels must be valid */
	if (mq == NULL || attr == NULL || *attr == NULL)
	{
		PERM();
		return -1;
	}

	if (dir == MQ_OUT)
	{
		mqdt = mq->mq_out;
	} else if (dir == MQ_IN)
	{
		mqdt = mq->mq_in;
	} else
	{
		return -1;
	}

	/* query into a local first so a failure cannot clobber the caller's copy */
	if (mq_getattr(mqdt, &gattr) == -1)
	{
		ENL();
		return -1;
	}

	**attr = gattr;

	return 0;
}

/*
 * Release a handle obtained from mq_ntf_create().
 *
 * Only the handle's memory is released; the queue descriptors are not
 * closed here, so call mq_ntf_close() first. Passing NULL is a no-op.
 * Must not be used on a handle that was not heap-allocated.
 */
void mq_ntf_free(mq_ntf_mdt *mq)
{
	/* free(NULL) is defined to do nothing, so no guard is needed */
	free(mq);
}