aboutsummaryrefslogtreecommitdiffstats
path: root/agni.c
diff options
context:
space:
mode:
authorFreeArtMan <dos21h@gmail.com>2017-03-05 22:44:01 +0000
committerFreeArtMan <dos21h@gmail.com>2017-03-05 22:44:01 +0000
commitb04690e9015d3263b2ee2bb42dd7a9891fc4d89b (patch)
tree6e31dbecceaf91797a77669aaeb5f7c0f10a3738 /agni.c
parentaec990a7c389903f079bb5361319ba8af5e9fc98 (diff)
downloadagni-b04690e9015d3263b2ee2bb42dd7a9891fc4d89b.tar.gz
agni-b04690e9015d3263b2ee2bb42dd7a9891fc4d89b.zip
Removed mq_ntf from main code. Commented out some messages
Diffstat (limited to 'agni.c')
-rw-r--r--agni.c557
1 files changed, 165 insertions, 392 deletions
diff --git a/agni.c b/agni.c
index e1cf0d5..8dc5291 100644
--- a/agni.c
+++ b/agni.c
@@ -22,6 +22,7 @@
#include "darray.h"
#include "buf.h"
#include "mq_cmd.h"
+#include "mq_ntf.h"
#include "tbl_qcmd.h"
#include "util.h"
#include "sock_conn.h"
@@ -37,408 +38,106 @@ nothing else
//HACK
extern int __irc_buf_drain_io(irc_buf *ib);
-/*
-return unique ID, with atomic counter should work in all cases
-*/
-_Atomic int _glbl_id=0;
-int uniq_id()
+void *cmd_pong(void *data)
{
- int ret=-1,id;
+ char *param = (char *)data;
+ char *ret = NULL;
- //what possible could go wrong?
- // sry emilsp this is not important for now
- id = atomic_load(&_glbl_id);
- ret = id;
- id += 1;
- atomic_store(&_glbl_id, id);
- return ret;
-
-}
-
-//async io multiplexer
-typedef struct mq_ntf_io_mplx
-{
- struct timeval tv;
- fd_set io_fds;
-} mq_ntf_io_mplx;
-
-#define MQ_OUT 1
-#define MQ_IN 2
-
-
-/*
-supposed format agni-[id]-[in/out]
-in is for input of thread (send to in and thread will recieve),
-out if for output of thread (read from out to recieve from thread)
-*/
-#define MQ_PREFIX "/agni-"
-typedef struct mq_ntf_mdt
-{
- int id;
- mqd_t mq_in;
- mqd_t mq_out;
- mq_ntf_io_mplx io_mplx;
- //_Atomic int used; nado?
-} mq_ntf_mdt;
-
-
-/*
-return -1 on error
-*/
-int mq_ntf_open(mq_ntf_mdt *mq, int id)
-{
- const int name_size = 32;
- char name[name_size];
-
- if (!mq)
+ if (param == NULL)
{
- PERM();
- return -1;
- }
- mq->id = id;
- snprintf(name, name_size, MQ_PREFIX"%d-in",id);
- /*create with default configs*/
- mq->mq_in = mq_open(name,
- O_RDWR | O_CREAT, 0666, NULL);
- if (mq->mq_in == -1)
- {
- ENL();
- perror("mq_open");
- return -1;
- }
- snprintf(name, name_size, MQ_PREFIX"%d-out",id);
- /*create with default configs*/
- mq->mq_out = mq_open(name,
- O_RDWR | O_CREAT, 0666, NULL);
- if (mq->mq_out == -1)
- {
- perror("mq_open");
- return -1;
+ return NULL;
}
- PRINT("opened = %d %d\n", mq->mq_in, mq->mq_out);
+ printf("PONG\n");
+
+ ret = alloc_new_str("PONG");
- return 0;
+ return ret;
}
-int mq_ntf_mplx(mq_ntf_mdt *mq, int msec)
+/*
+Wrap call back, to manage how going be executed callback.
+Manage input and output params for callback.
+INPUT(string)->OUTPUT(string)
+*/
+tble_cmd_resp* cllbk_wrapper( void *(*call)(void *), tble_cmd_param *param)
{
- fd_set io_fds;
+ void *data_out = NULL;
+ tble_cmd_resp *resp = NULL;
+ char *data=NULL;
- if (mq == NULL)
+ if (call == NULL)
{
PERM();
- return -1;
+ return NULL;
}
- if (msec < 0)
+ if (param == NULL)
{
PERM();
- return -1;
+ return NULL;
}
- //mq->io_mplx.tv.tv_sec = msec/1000;
- //mq->io_mplx.tv.tv_usec = (msec%1000)*1000;
- mq->io_mplx.tv.tv_sec = 1;
- mq->io_mplx.tv.tv_usec = 0;
- PRINT("mplx = %d %d\n", mq->mq_in, mq->mq_out);
- FD_ZERO(&io_fds);
- FD_SET(mq->mq_in, &io_fds);//in first
- FD_SET(mq->mq_out, &io_fds);//out second
- mq->io_mplx.io_fds = io_fds;
-
- return 0;
-}
-
-/* return number if there is something to read
-RETURN:
- -1 some error
- 0x00 nothing to read
- 0x01 something in queue
-
-*/
-//if mplx havent done, everything could blow up
-int mq_ntf_select(mq_ntf_mdt *mq, int dir)
-{
- int fdnum, fd;
- int ret=0;
- fd_set io_fds;
- struct timeval tv;
-
- if (mq == NULL)
+ //prepare response
+ resp = tbl_cmd_resp_c(param);
+ if (resp == NULL)
{
PERM();
- return -1;
}
+ tbl_cmd_resp_print(resp);
+ data = alloc_new_str(param->param);
- //we should allways have just 2 descriptors to listen
- //io_fds = mq->io_mplx.io_fds;
- if (dir == MQ_IN)
- {
- fd = mq->mq_in;
- } else if (dir == MQ_OUT)
- {
- fd = mq->mq_out;
- }
- FD_ZERO(&io_fds);
- FD_SET(fd, &io_fds);
- tv.tv_sec = 1;
- tv.tv_usec = 0;
- fdnum = select(1,
- &io_fds,
- NULL,
- NULL,
- &tv );
-
- if (fdnum == -1)
+ //call callback
+ data_out = call(data);
+
+ //set result to returned response
+ if (data_out == NULL)
{
- ENL();
- return -1;
- } else {
- if (FD_ISSET(fd, &mq->io_mplx.io_fds))
+ //response doesnt have any output
+ if (resp)
{
- ret = 1;
+ //response dont have result
+ resp->ret_code = TBL_RSP_NORESP;
+ resp->resp = NULL;
}
- }
-
- return ret;
-}
-
-int mq_ntf_read(mq_ntf_mdt *mq, int dir, char *buf, size_t size)
-{
- struct mq_attr attr;
- size_t bytes;
- mqd_t mqd;
-
- unsigned int prio = 10;
-
- if (dir == MQ_IN)
- {
- mqd = mq->mq_in;
- } else if (dir == MQ_OUT)
- {
- mqd = mq->mq_out;
- } else
- {
- perror("Wrong direction");
- }
-
- mq_getattr(mqd, &attr);
- if (attr.mq_maxmsg == attr.mq_curmsgs)
- {
- printf("queue %d out full\n", mq->id);
- return -1;
- }
- bytes = mq_receive(mqd, buf,
- size > attr.mq_msgsize ? attr.mq_msgsize : size,
- &prio);
- if (bytes == -1)
- {
- perror("mq_receive read out");
- return -1;
- }
-
- return bytes;
-}
-
-int mq_ntf_write(mq_ntf_mdt *mq, int dir, const char *buf, size_t size)
-{
- struct mq_attr attr;
- size_t bytes;
- mqd_t mqd;
-
- unsigned int prio = 10;
-
- if (dir == MQ_IN)
- {
- mqd = mq->mq_in;
- } else if (dir == MQ_OUT)
- {
- mqd = mq->mq_out;
- } else
- {
- perror("Wrong direction");
- }
-
- mq_getattr(mqd, &attr);
- if (attr.mq_maxmsg == attr.mq_curmsgs)
- {
- printf("queue %d out full\n", mq->id);
- return -1;
- }
- bytes = mq_send(mqd, buf,
- size > attr.mq_msgsize ? attr.mq_msgsize : size,
- prio);
- if (bytes == -1)
- {
- perror("mq_send");
- return -1;
- }
-
- return bytes;
-}
-
-//dir - dirction 1 in 2 out
-int mq_ntf_count(mq_ntf_mdt *mq, int dir)
-{
- struct mq_attr attr;
- int mq_fd;
-
- if (mq == NULL)
- {
- PERM();
- return -1;
- }
-
- if (dir == 1)
- {
- mq_fd = mq->mq_in;
} else
{
- mq_fd = mq->mq_out;
- }
-
- if (mq_getattr(mq_fd, &attr) != -1)
- {
- return attr.mq_curmsgs;
- }
-
- return -1;
-}
-
-
-/* close queue */
-int mq_ntf_close(mq_ntf_mdt *mq)
-{
- const int name_size = 32;
- char name[name_size];
- int id;
-
- id = mq->id;
- snprintf(name, name_size, MQ_PREFIX"%d-in",id);
- mq_unlink(name);
-
- snprintf(name, name_size, MQ_PREFIX"%d-out",id);
- mq_unlink(name);
-
- //clean file descriptors for io_mplx
-
- return 0;
-}
-
-//send command
-int mq_ntf_cmd_send(mq_ntf_mdt *mq, mq_cmd *cmd)
-{
-
- return 0;
-}
-
-//recieve command from other end
-int mq_ntf_cmd_recv(mq_ntf_mdt *mq, mq_cmd *cmd)
-{
- return 0;
-}
-
-int mq_drain_q(mqd_t mqd)
-{
- struct mq_attr attr;
- char *buf = NULL;
- unsigned int prio = 0;
- int cnt_msg = 0;
- int num_read = 0;
- int i = 0;
-
- if (mq_getattr(mqd, &attr) == -1)
- {
- ENL();
- return -1;
- }
-
- buf = malloc(attr.mq_msgsize);
- if (buf == NULL)
- {
- ENL();
- return -1;
- }
-
- for (i=0; i<attr.mq_curmsgs; i++)
- {
- num_read = mq_receive(mqd, buf, attr.mq_msgsize, &prio);
- if (num_read == -1)
+ if (resp)
{
- ENL();
- free(buf);
- return -1;
+ //set succesfull repsonse
+ resp->ret_code = TBL_RSP_OK;
+ resp->resp = alloc_new_str(data_out);
}
- cnt_msg += 1;
}
- free(buf);
- return cnt_msg;
-}
-
-//drain all mq buffers to zero
-//to use at begining and clear all buffers to zero
-int mq_ntf_drain(mq_ntf_mdt *mq)
-{
- int err;
-
- err = mq_drain_q(mq->mq_in);
- if (err < 0)
- {
- PERM();
- return -1;
- }
- printf("Drained in %d messages\n", err);
-
- err = mq_drain_q(mq->mq_out);
- if (err < 0)
- {
- PERM();
- return -1;
- }
- printf("Drained out %d messages\n", err);
+ //dangerouse place =P
+ free(data_out);
+ free(data);
- return 0;
+ return resp;
}
-
-
-int mq_ntf_getattr(mq_ntf_mdt *mq, int dir, struct mq_attr **attr)
+/*
+return unique ID, with atomic counter should work in all cases
+*/
+_Atomic int _glbl_id=0;
+int uniq_id()
{
- struct mq_attr gattr;
- mqd_t mqdt;
-
- if (mq == NULL)
- {
- PERM();
- return -1;
- }
-
- if (dir == MQ_OUT)
- {
- mqdt = mq->mq_out;
- } else if (dir == MQ_IN)
- {
- mqdt = mq->mq_in;
- } else
- {
- return -1;
- }
-
- if (mq_getattr(mqdt, &gattr) == -1)
- {
- ENL();
- return -1;
- }
+ int ret=-1,id;
- memcpy(*attr,&gattr,sizeof(struct mq_attr));
+ //what possible could go wrong?
+ // sry emilsp this is not important for now
+ id = atomic_load(&_glbl_id);
+ ret = id;
+ id += 1;
+ atomic_store(&_glbl_id, id);
+ return ret;
- return 0;
}
+
int send_mq_cmd(mq_ntf_mdt *mq,
int io,
int cmd_id,
@@ -568,7 +267,9 @@ typedef struct server_cfg
int ssl;
} server_cfg;
-
+/*******************************************************************************
+server thread
+*******************************************************************************/
/* server_cfg struct as input */
#define TH_STATE_INIT 0
#define TH_STATE_START 1
@@ -607,16 +308,18 @@ int th_start_client(void *data)
irc_token *itok = NULL;
char *irc_line = NULL;
+
+
atomic_fetch_add(&cfg->running,1);
printf("Start client\n");
printf("Server %d\n",cfg->tid);
sleep(1);
- PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user);
+ //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user);
//PNL();
- PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user);
+ //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user);
//conn = irc_connect(cfg->server, cfg->port);
conn = irc_connect("irc.freenode.net", "6667");
//PNL();
@@ -640,17 +343,12 @@ int th_start_client(void *data)
//prepare message queue
mq = cfg->mq;
- err = mq_ntf_mplx(mq, 10000);
- if (err == -1)
- {
- printf("Ups something whent wrong\n");
- }
if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1)
{
printf("Cant get attribute\n");
ENL();
}
- PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user);
+ //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user);
out_buf = malloc(out_attr.mq_msgsize);
memset(out_buf, 0, out_attr.mq_msgsize);
if (out_buf == NULL)
@@ -658,7 +356,7 @@ int th_start_client(void *data)
ENL();
}
- PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user);
+ //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user);
if (mq_ntf_getattr(mq, MQ_IN, &ptr_in_attr) == -1)
{
printf("Cant get attribute\n");
@@ -671,7 +369,7 @@ int th_start_client(void *data)
ENL();
}
- PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user);
+ //PRINT("SERVER:%s PORT:%s USER:%s\n", cfg->server, cfg->port, cfg->user);
//create irc parsing structures
memset(conn_buf, 0, TH_CONN_BUF_SZ);
memset(cmd_buf, 0, TH_CONN_BUF_SZ);
@@ -683,8 +381,6 @@ int th_start_client(void *data)
//return 0;
}
-
-
//send command wait for response
printf("Start loop\n");
run = 1;
@@ -696,8 +392,10 @@ int th_start_client(void *data)
switch(mq_event)
{
case 0:
- PRINT("SERVER EVENT 0\n");
- PNL();
+ PRINT("EVENT 0\n");
+ break;
+ case 1:
+ PRINT("MQ_EVENT IN\n");
if (mq_ntf_read(mq, MQ_IN, in_buf, in_attr.mq_msgsize) == -1)
{
PNL();
@@ -710,19 +408,18 @@ int th_start_client(void *data)
}
PNL();
break;
- case 1:
default:
printf("Unknown event type\n");
}
//PNL();
- sleep(1);
+ //sleep(1);
//PNL();
//fast code to exit if QUIT command is recieved
if (mq_event == 1)
{
- PNL();
+ //PNL();
recv_cmd = mq_cmd_creates(in_buf, in_attr.mq_msgsize, -1);
if (recv_cmd != NULL)
{
@@ -741,12 +438,12 @@ int th_start_client(void *data)
//PNL();
if((cret = read(conn,conn_buf, 256))>0)
{
- printf("IN>>%s",conn_buf);
+ //printf("IN>>%s",conn_buf);
irc_buf_puts(ib, conn_buf, cret);
//if (irc_buf_ready(ib) == 1)
while (irc_buf_ready(ib) == 1)
{
- PNL();
+ //PNL();
irc_line = irc_buf_line(ib);
if (irc_line)
{
@@ -785,6 +482,8 @@ int th_start_client(void *data)
{
int fret,fret2;
//char send_back[128];
+
+ PNL();
memset(cmd_buf,0,128);
char *uname = token_new_str(itok,0);
@@ -800,6 +499,7 @@ int th_start_client(void *data)
//create command and send to mq
mq_cmd *privcmd = mq_cmd_create(1,"PRIVMSG",strlen("PRIVMSG"),msg,strlen(msg));
char *str_mq = mq_cmd_buf(privcmd);
+ printf("str_mq %s\n",str_mq);
mq_ntf_write(mq, MQ_OUT, str_mq, strlen(str_mq));
//free(str_mq);
mq_cmd_free(privcmd);
@@ -808,6 +508,7 @@ int th_start_client(void *data)
free(msg);
free(uname);
printf("OUT<<Send %d\n", fret);
+
}
//PNL();
token_destroy(itok);
@@ -825,13 +526,14 @@ int th_start_client(void *data)
printf("End client\n");
+ PNL();
atomic_fetch_sub( &cfg->running,1);
return 0;
}
-#define EVENT_HND_STACK_SIZE (16*1024)
+#define EVENT_HND_STACK_SIZE (64*1024)
typedef struct event_handler_cfg {
/* thread params*/
void *stack;
@@ -851,6 +553,11 @@ typedef struct event_handler_cfg {
#define EH_STATE_TERMINATION 6
#define EH_STATE_EXIT 7
+
+/*******************************************************************************
+Event thead. Recieve all events from server thread and pass them for execution.
+Return results from execution to server thread.
+*******************************************************************************/
/* Thread to reacieve messages and return them back */
int th_event_manager(void *data)
{
@@ -861,18 +568,56 @@ int th_event_manager(void *data)
int run;
int err;
int mq_event;
+
//read any command
mq_cmd *recv_cmd = NULL; // for recieved cmd from mq
struct mq_attr out_attr, *ptr_out_attr=&out_attr;
- printf("Start event thread\n");
- mq = cfg->mq_listen;
- err = mq_ntf_mplx(mq, 10000);
- if (err == -1)
+ //execution and command table generation
+ tble_exec *ecmd = NULL;
+ tbl_exec *etbl = NULL;
+ tble_qcmd *qcmd=NULL;
+ tbl_qcmd *qtbl = NULL;
+
+ //create execution table
+ etbl = tbl_exec_list_c(10);
+ if (etbl == NULL)
+ {
+ PERM();
+ return -1;
+ }
+
+ ecmd = tbl_exec_c();
+ if (ecmd == NULL)
+ {
+ PERM();
+ return -1;
+ }
+ ecmd->id = uniq_id();
+ ecmd->name = alloc_new_str("local-executor");
+ ecmd->cmd = alloc_new_str("PING");
+ ecmd->callback = cmd_pong;
+
+ if (-1 == tbl_exec_add(etbl, ecmd))
+ {
+ PERM();
+ return -1;
+ }
+ tbl_exec_print_tbl(etbl, TBL_PF_EXEC_ALL);
+ etbl = tbl_exec_list_c(10);
+ if (etbl == NULL)
{
- printf("Ups something whent wrong\n");
+ PERM();
+ return -1;
}
+ //create command table
+ qtbl = tbl_qcmd_c(10);
+
+ //config mq
+ printf("Start event thread\n");
+ mq = cfg->mq_listen;
+
//get mq attributes
if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1)
{
@@ -886,16 +631,17 @@ int th_event_manager(void *data)
run = 1;
while(run)
{
+ //PNL();
//check if there is some message and save it to buffer
run += 1;
- mq_event = mq_ntf_select(mq,MQ_OUT);
+ mq_event = mq_ntf_select(mq, MQ_OUT);
switch(mq_event)
{
case 0:
PRINT("EVENT 0\n");
break;
case 1:
- PRINT("EVENT MQ_OUT\n");
+ PRINT("MQ_EVENT OUT\n");
if (mq_ntf_read(mq, MQ_OUT, out_buf, out_attr.mq_msgsize) == -1)
{
printf("Cant read output message\n");
@@ -908,17 +654,19 @@ int th_event_manager(void *data)
default:
printf("Unknown event type\n");
}
- sleep(1);
+ //sleep(1);
//if (run == 10)
// break;
+ //PNL();
//if QUIT then quit the thread
if (mq_event == 1)
{
+ //PNL();
recv_cmd = mq_cmd_creates(out_buf, out_attr.mq_msgsize, -1);
if (recv_cmd != NULL)
{
- PNL();
+ //PNL();
if (mq_cmd_o_cmp_cmd(recv_cmd,"QUIT") == 0)
{
printf("QUIT recieved lets quit main loop\n");
@@ -938,6 +686,17 @@ int th_event_manager(void *data)
mq_cmd_free(privcmd);
}
+ if (mq_cmd_o_cmp_cmd(recv_cmd,"PRIVMSG") == 0)
+ {
+ printf("Some private message\n");
+ char msg[] = "PONG";
+ mq_cmd *privcmd = mq_cmd_create(1,"PRIVMSG",strlen("PRIVMSG"),msg,strlen(msg));
+ char *str_mq = mq_cmd_buf(privcmd);
+ mq_ntf_write(mq, MQ_IN, str_mq, strlen(str_mq));
+ free(str_mq);
+ mq_cmd_free(privcmd);
+ }
+
if (mq_cmd_o_cmp_cmd(recv_cmd,"CMD2") == 0)
{
printf("Hey dude it works second time\n");
@@ -957,12 +716,17 @@ int th_event_manager(void *data)
}
free(out_buf);
+ PNL();
printf("End event thread\n");
atomic_fetch_sub( &cfg->running,1);
return 0;
}
+/*******************************************************************************
+Main code entry code. Init threads. Give basic params to them. And then become
+as a watch dog.
+*******************************************************************************/
int main(int argc, char **argv)
{
int i;
@@ -983,7 +747,15 @@ int main(int argc, char **argv)
}
cfg_list = malloc(sizeof(server_cfg)*cnt_servers);
+ if (cfg_list == NULL)
+ {
+ ENL();
+ }
mq_array = malloc(sizeof(mq_ntf_mdt)*cnt_servers);
+ if (mq_array == NULL)
+ {
+ ENL();
+ }
/* For each configuration create listener */
@@ -1032,6 +804,7 @@ int main(int argc, char **argv)
evhnd_cfg->mq_listen = mq_array;
clone(th_event_manager, evhnd_cfg->stack+EVENT_HND_STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)evhnd_cfg);
+ PNL();
/* run until all threads are up */
cnt_running = 1;
while(cnt_running != 0)
@@ -1046,7 +819,7 @@ int main(int argc, char **argv)
if (val != 0)
cnt_running += 1;
}
- //PRINT("cnt_running %d\n",cnt_running);
+ PRINT("cnt_running %d\n",cnt_running);
sleep(1);
}