aboutsummaryrefslogtreecommitdiffstats
path: root/agni.c
diff options
context:
space:
mode:
authorZoRo <dos21h@gmail.com>2016-12-15 19:17:14 +0000
committerZoRo <dos21h@gmail.com>2016-12-15 19:17:14 +0000
commita588aa017512d3cc70dde6627d1218020e755259 (patch)
treea070cd171d18f3efbaedb7cfa0f9d54e2bb3b362 /agni.c
downloadagni-a588aa017512d3cc70dde6627d1218020e755259.tar.gz
agni-a588aa017512d3cc70dde6627d1218020e755259.zip
Initial commit
Diffstat (limited to 'agni.c')
-rw-r--r--agni.c512
1 files changed, 512 insertions, 0 deletions
diff --git a/agni.c b/agni.c
new file mode 100644
index 0000000..5705543
--- /dev/null
+++ b/agni.c
@@ -0,0 +1,512 @@
+#define _GNU_SOURCE
+
+#include <ctype.h>
+#include <errno.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <sys/syscall.h>
+#include <sched.h>
+#include <stdatomic.h>
+
+#include <sys/stat.h>
+#include <sched.h>
+#include <mqueue.h>
+
+#include "config_servers.h"
+
+#include "darray.h"
+#include "buf.h"
+#include "mq_cmd.h"
+
+/*
+no defence programming, no error checking, no argument checking just PoC
+nothing else
+*/
+
+#define MQ_MSG_SIZE 8192
+
+/*
+return fd!=ifconnection there
+*/
+int irc_connect( char *hostname, char *port )
+{
+ int fd=0;
+
+ struct addrinfo serv, *res;
+
+ memset(&serv, 0, sizeof(serv));
+ serv.ai_family = AF_INET;
+ serv.ai_socktype = SOCK_STREAM;
+ getaddrinfo(hostname, port, &serv, &res);
+ fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+ connect(fd, res->ai_addr, res->ai_addrlen);
+
+ return fd;
+}
+
+char *create_mq_cmd(char *out, size_t out_sz, int id, char *cmd, size_t cmd_sz, char *payload, size_t p_sz)
+{
+ char *ret = out;
+ int n,i;
+
+ n = snprintf(out, out_sz, ":%d:%s ", id, cmd);
+ if (n < 0)
+ {
+ return NULL;
+ }
+ for (i=0;i<p_sz,n<out_sz-1;i++,n++)
+ {
+ out[n] = payload[i];
+ }
+ out[n++]=0x0;
+ return ret;
+}
+
+/*
+supposed format agni-[id]-[in/out]
+in is for input of thread (send to in and thread will recieve),
+out if for output of thread (read from out to recieve from thread)
+*/
+#define MQ_PREFIX "/agni-"
+typedef struct mq_ntf_mdt
+{
+ int id;
+ mqd_t mq_in;
+ mqd_t mq_out;
+ //_Atomic int used; nado?
+} mq_ntf_mdt;
+
+/*
+return -1 on error
+*/
+int mq_ntf_open(mq_ntf_mdt *mq, int id)
+{
+ const int name_size = 32;
+ char name[name_size];
+
+ if (!mq)
+ {
+ return -1;
+ }
+ mq->id = id;
+ snprintf(name, name_size, MQ_PREFIX"%d-in",id);
+ /*create with default configs*/
+ mq->mq_in = mq_open(name,
+ O_RDWR | O_CREAT, 0666, NULL);
+ if (mq->mq_in == -1)
+ {
+ perror("mq_open");
+ return -1;
+ }
+ snprintf(name, name_size, MQ_PREFIX"%d-out",id);
+ /*create with default configs*/
+ mq->mq_out = mq_open(name,
+ O_RDWR | O_CREAT, 0666, NULL);
+ if (mq->mq_out == -1)
+ {
+ perror("mq_open");
+ return -1;
+ }
+ return 0;
+}
+
+
+/* return number if there is something to read */
+int mq_ntf_select(mq_ntf_mdt *mq)
+{
+
+ return 0;
+}
+
+
+/* read from input queue */
+int mq_ntf_read_in(mq_ntf_mdt *mq, char *buf, size_t size)
+{
+ struct mq_attr attr;
+ char *msg;
+ size_t bytes;
+ unsigned int prio = 10;
+ int fret;
+ fret = mq_getattr(mq->mq_in, &attr);
+ if (attr.mq_maxmsg == attr.mq_curmsgs)
+ {
+ printf("queue %d in full", mq->id);
+ return -1;
+ }
+ bytes = mq_receive(mq->mq_in, buf,
+ size > attr.mq_msgsize ? attr.mq_msgsize : size,
+ &prio);
+ if (bytes == -1)
+ {
+ perror("mq_receive read in");
+ return -1;
+ }
+ return 0;
+}
+
+/* read from output queue */
+int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size)
+{
+ struct mq_attr attr;
+ char *msg;
+ size_t bytes;
+ unsigned int prio = 10;
+ int fret;
+ fret = mq_getattr(mq->mq_out, &attr);
+ if (attr.mq_maxmsg == attr.mq_curmsgs)
+ {
+ printf("queue %d out full", mq->id);
+ return -1;
+ }
+ bytes = mq_receive(mq->mq_out, buf,
+ size > attr.mq_msgsize ? attr.mq_msgsize : size,
+ &prio);
+ if (bytes == -1)
+ {
+ perror("mq_receive read out");
+ return -1;
+ }
+ return 0;
+}
+
+
+/* write to output quque */
+int mq_ntf_write_out(mq_ntf_mdt *mq, const char *buf, size_t size)
+{
+ struct mq_attr attr;
+ char *msg;
+ size_t bytes;
+ unsigned int prio = 10;
+ int fret;
+ fret = mq_getattr(mq->mq_out, &attr);
+ if (attr.mq_maxmsg == attr.mq_curmsgs)
+ {
+ printf("queue %d out full", mq->id);
+ return -1;
+ }
+ bytes = mq_send(mq->mq_out, buf,
+ size > attr.mq_msgsize ? attr.mq_msgsize : size,
+ prio);
+ if (bytes == -1)
+ {
+ perror("mq_send");
+ return -1;
+ }
+ return 0;
+}
+
+/* write to input quque */
+int mq_ntf_write_in(mq_ntf_mdt *mq, const char *buf, size_t size)
+{
+ struct mq_attr attr;
+ char *msg;
+ size_t bytes;
+ unsigned int prio = 10;
+ int fret;
+ fret = mq_getattr(mq->mq_in, &attr);
+ if (attr.mq_maxmsg == attr.mq_curmsgs)
+ {
+ printf("queue %d in full", mq->id);
+ return -1;
+ }
+ bytes = mq_send(mq->mq_in, buf,
+ size > attr.mq_msgsize ? attr.mq_msgsize : size,
+ prio);
+ if (bytes == -1)
+ {
+ perror("mq_send");
+ return -1;
+ }
+ return 0;
+}
+
+/* drain all messages from quque */
+int mq_ntf_drain(mq_ntf_mdt *mq)
+{
+ int i,j;
+ struct mq_attr curattr;
+ char *buf;
+ ssize_t bytes;
+ int prio = 10;
+ mqd_t mqlist[2] = {mq->mq_in, mq->mq_out};
+
+ for (i=0;i<2;i++)
+ {
+ mq_getattr(mqlist[i], &curattr);
+ buf = malloc(curattr.mq_maxmsg);
+ for (j=0; j<curattr.mq_curmsgs; j++)
+ {
+ bytes = mq_receive(mqlist[i], buf, curattr.mq_maxmsg, &prio);
+ if (bytes == -1)
+ {
+ perror("mq_receive drain");
+ }
+ printf("Drain %d bytes\n", bytes);
+ }
+ free(buf);
+ }
+
+ return 0;
+}
+
+/* check if there is space in quque */
+int mq_ntf_full(mq_ntf_mdt *mq)
+{
+
+ return 0;
+}
+
+
+/* close queue */
+int mq_ntf_close(mq_ntf_mdt *mq)
+{
+ const int name_size = 32;
+ char name[name_size];
+ int id;
+
+ id = mq->id;
+ snprintf(name, name_size, MQ_PREFIX"%d-in",id);
+ mq_unlink(name);
+
+ snprintf(name, name_size, MQ_PREFIX"%d-out",id);
+ mq_unlink(name);
+
+ return 0;
+}
+
+//send command
+int mq_ntf_cmd_send(mq_ntf_mdt *mq, mq_cmd *cmd)
+{
+
+ return 0;
+}
+
+//recieve command from other end
+int mq_ntf_cmd_recv(mq_ntf_mdt *mq, mq_cmd *cmd)
+{
+ return 0;
+}
+
+/*
+
+|--------|<--IN---|---------|
+| SERVER | | MANAGER |
+|--------|--OUT-->|---------|
+
+*/
+#define STACK_SIZE (1024*16)
+/* not supposed to be changed. */
+typedef struct server_cfg
+{
+ /* thread params */
+ int tid;
+ void *stack;
+ //atomic_int running;
+ _Atomic int running;
+ mq_ntf_mdt *mq;
+ /* irc server config */
+ char *user;
+ char *password;
+ char *server;
+ char **channels;
+ int port;
+ int ssl;
+} server_cfg;
+
+
+/* server_cfg struct as input */
+#define TH_STATE_INIT 0
+#define TH_STATE_START 1
+#define TH_STATE_LISTEN_IN 2
+#define TH_STATE_LISTEN_OUT 3
+#define TH_STATE_SEND_IN 4
+#define TH_STATE_SEND_OUT 5
+#define TH_STATE_TERMINATION 6
+#define TH_STATE_EXIT 7
+
+int th_start_client(void *data)
+{
+
+ int cmd_id = 1;
+ int ret = 0;
+ char cmd_buf[MQ_MSG_SIZE];
+ mq_cmd *cmd=NULL;
+
+ server_cfg *cfg = data;
+ mq_ntf_mdt *mq = cfg->mq;
+ atomic_fetch_add(&cfg->running,1);
+ printf("Start client\n");
+ printf("Server %d\n",cfg->tid);
+ sleep(10);
+
+ //send command wait for response
+
+ cmd = CMD_CREATE(cmd_id,"PING","NOPARAM");
+ if (cmd != NULL)
+ {
+ if (ret = mq_ntf_write_out(mq,cmd->buf,cmd->sz) == -1)
+ {
+ printf("Couldnt send command\n");
+ }
+
+ }
+
+ cmd_id += 1;
+
+
+ printf("End client\n");
+ atomic_fetch_sub( &cfg->running,1);
+
+ return 0;
+}
+
+
+#define EVENT_HND_STACK_SIZE (16*1024)
+typedef struct event_handler_cfg {
+ /* thread params*/
+ void *stack;
+ mq_ntf_mdt *mq_listen;
+ int mq_num;
+ _Atomic int running;
+
+} event_handler_cfg;
+
+/* server_cfg struct as input */
+#define EH_STATE_INIT 0
+#define EH_STATE_START 1
+#define EH_STATE_LISTEN_IN 2
+#define EH_STATE_LISTEN_OUT 3
+#define EH_STATE_SEND_IN 4
+#define EH_STATE_SEND_OUT 5
+#define EH_STATE_TERMINATION 6
+#define EH_STATE_EXIT 7
+
+/* Thread to reacieve messages and return them back */
+int th_event_manager(void *data)
+{
+ int state;
+ int i;
+ event_handler_cfg *cfg = data;
+ atomic_fetch_add(&cfg->running,1);
+ const int buf_size=MQ_MSG_SIZE;
+ char buf[buf_size];
+ int ret;
+ //read any command
+ mq_cmd *cmd = NULL;
+
+
+
+ printf("Start event thread\n");
+ state = EH_STATE_INIT;
+ while (state != EH_STATE_EXIT)
+ switch (state)
+ {
+ case EH_STATE_INIT:
+ printf("TH STATE INIT\n");
+ sleep(1);
+ state = TH_STATE_START;
+ break;
+ case EH_STATE_START:
+ printf("TH STATE START\n");
+ sleep(1);
+ state = TH_STATE_EXIT;
+ break;
+ case EH_STATE_EXIT:
+ printf("TH STATE EXIT\n");
+ sleep(1);
+ state = TH_STATE_EXIT;
+ break;
+ default:
+ printf("Wrong state\n");
+ }
+ printf("End event thread\n");
+
+ atomic_fetch_sub( &cfg->running,1);
+ return 0;
+}
+
+int main(int argc, char **argv)
+{
+ int i;
+
+ int cnt_servers,cnt_running;
+ server_cfg *cfg_list;
+ event_handler_cfg *evhnd_cfg;
+ mq_ntf_mdt *mq_array;
+
+ /* Load configuration */
+ cnt_servers = SIZEOF_SERVER_LIST;
+ for (i=0;i<SIZEOF_SERVER_LIST;i++)
+ {
+ printf("Load config for server %s\n",server_list[i].server);
+ }
+
+ cfg_list = malloc(sizeof(server_cfg)*cnt_servers);
+ mq_array = malloc(sizeof(mq_ntf_mdt)*cnt_servers);
+
+ /* For each configuration create listener */
+
+ for (i=0;i<SIZEOF_SERVER_LIST;i++)
+ {
+ /*initialise server config*/
+ server_cfg *srvc = cfg_list+i;
+ irc_server_conf *isrvc = &server_list[i];
+ srvc->tid = i;
+ srvc->stack = malloc(STACK_SIZE); //NULL will brake everything ;)
+ srvc->user = isrvc->user;
+ srvc->password = isrvc->password;
+ srvc->server = isrvc->server;
+ srvc->channels = isrvc->channels;
+ srvc->port = isrvc->port;
+ srvc->ssl = isrvc->ssl;
+
+ //atomic_init( &srvc->running, 1);
+ atomic_store(&srvc->running, 0);
+
+ /* initalise posix mq */
+ if (0 != mq_ntf_open(&mq_array[i], i))
+ {
+ printf("Couldnt open mq_ntf_open\n");
+ }
+ srvc->mq = &mq_array[i];
+
+ /* clone new proc */
+ clone(th_start_client, srvc->stack+STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)srvc);
+ }
+
+ /* event handler thread */
+ evhnd_cfg = malloc(sizeof(event_handler_cfg));
+ memset(evhnd_cfg, 0, sizeof(event_handler_cfg));
+ evhnd_cfg->stack = malloc(EVENT_HND_STACK_SIZE);
+ atomic_store(&evhnd_cfg->running, 0);
+ evhnd_cfg->mq_num = cnt_servers;
+ evhnd_cfg->mq_listen = mq_array;
+ clone(th_event_manager, evhnd_cfg->stack+EVENT_HND_STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)evhnd_cfg);
+
+ /* run until all threads are up */
+ cnt_running = 1;
+ while(cnt_running != 0)
+ {
+ //printf("Count\n");
+ /*count how many proceses is running there*/
+ int val;
+ cnt_running = 0;
+ for (i=0;i<cnt_servers;i++)
+ {
+ val = atomic_load(&cfg_list[i].running);
+ if (val != 0)
+ cnt_running += 1;
+ }
+ printf("cnt_running %d\n",cnt_running);
+ sleep(1);
+ }
+
+ free(cfg_list);
+
+ return 0;
+} \ No newline at end of file