diff options
author | ZoRo <dos21h@gmail.com> | 2016-12-15 19:17:14 +0000 |
---|---|---|
committer | ZoRo <dos21h@gmail.com> | 2016-12-15 19:17:14 +0000 |
commit | a588aa017512d3cc70dde6627d1218020e755259 (patch) | |
tree | a070cd171d18f3efbaedb7cfa0f9d54e2bb3b362 /agni.c | |
download | agni-a588aa017512d3cc70dde6627d1218020e755259.tar.gz agni-a588aa017512d3cc70dde6627d1218020e755259.zip |
Initial commit
Diffstat (limited to 'agni.c')
-rw-r--r-- | agni.c | 512 |
1 files changed, 512 insertions, 0 deletions
@@ -0,0 +1,512 @@ +#define _GNU_SOURCE + +#include <ctype.h> +#include <errno.h> +#include <stdarg.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <unistd.h> +#include <netdb.h> +#include <sys/syscall.h> +#include <sched.h> +#include <stdatomic.h> + +#include <sys/stat.h> +#include <sched.h> +#include <mqueue.h> + +#include "config_servers.h" + +#include "darray.h" +#include "buf.h" +#include "mq_cmd.h" + +/* +no defence programming, no error checking, no argument checking just PoC +nothing else +*/ + +#define MQ_MSG_SIZE 8192 + +/* +return fd!=ifconnection there +*/ +int irc_connect( char *hostname, char *port ) +{ + int fd=0; + + struct addrinfo serv, *res; + + memset(&serv, 0, sizeof(serv)); + serv.ai_family = AF_INET; + serv.ai_socktype = SOCK_STREAM; + getaddrinfo(hostname, port, &serv, &res); + fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + connect(fd, res->ai_addr, res->ai_addrlen); + + return fd; +} + +char *create_mq_cmd(char *out, size_t out_sz, int id, char *cmd, size_t cmd_sz, char *payload, size_t p_sz) +{ + char *ret = out; + int n,i; + + n = snprintf(out, out_sz, ":%d:%s ", id, cmd); + if (n < 0) + { + return NULL; + } + for (i=0;i<p_sz,n<out_sz-1;i++,n++) + { + out[n] = payload[i]; + } + out[n++]=0x0; + return ret; +} + +/* +supposed format agni-[id]-[in/out] +in is for input of thread (send to in and thread will recieve), +out if for output of thread (read from out to recieve from thread) +*/ +#define MQ_PREFIX "/agni-" +typedef struct mq_ntf_mdt +{ + int id; + mqd_t mq_in; + mqd_t mq_out; + //_Atomic int used; nado? +} mq_ntf_mdt; + +/* +return -1 on error +*/ +int mq_ntf_open(mq_ntf_mdt *mq, int id) +{ + const int name_size = 32; + char name[name_size]; + + if (!mq) + { + return -1; + } + mq->id = id; + snprintf(name, name_size, MQ_PREFIX"%d-in",id); + /*create with default configs*/ + mq->mq_in = mq_open(name, + O_RDWR | O_CREAT, 0666, NULL); + if (mq->mq_in == -1) + { + perror("mq_open"); + return -1; + } + snprintf(name, name_size, MQ_PREFIX"%d-out",id); + /*create with default configs*/ + mq->mq_out = mq_open(name, + O_RDWR | O_CREAT, 0666, NULL); + if (mq->mq_out == -1) + { + perror("mq_open"); + return -1; + } + return 0; +} + + +/* return number if there is something to read */ +int mq_ntf_select(mq_ntf_mdt *mq) +{ + + return 0; +} + + +/* read from input queue */ +int mq_ntf_read_in(mq_ntf_mdt *mq, char *buf, size_t size) +{ + struct mq_attr attr; + char *msg; + size_t bytes; + unsigned int prio = 10; + int fret; + fret = mq_getattr(mq->mq_in, &attr); + if (attr.mq_maxmsg == attr.mq_curmsgs) + { + printf("queue %d in full", mq->id); + return -1; + } + bytes = mq_receive(mq->mq_in, buf, + size > attr.mq_msgsize ? attr.mq_msgsize : size, + &prio); + if (bytes == -1) + { + perror("mq_receive read in"); + return -1; + } + return 0; +} + +/* read from output queue */ +int mq_ntf_read_out(mq_ntf_mdt *mq, char *buf, size_t size) +{ + struct mq_attr attr; + char *msg; + size_t bytes; + unsigned int prio = 10; + int fret; + fret = mq_getattr(mq->mq_out, &attr); + if (attr.mq_maxmsg == attr.mq_curmsgs) + { + printf("queue %d out full", mq->id); + return -1; + } + bytes = mq_receive(mq->mq_out, buf, + size > attr.mq_msgsize ? attr.mq_msgsize : size, + &prio); + if (bytes == -1) + { + perror("mq_receive read out"); + return -1; + } + return 0; +} + + +/* write to output quque */ +int mq_ntf_write_out(mq_ntf_mdt *mq, const char *buf, size_t size) +{ + struct mq_attr attr; + char *msg; + size_t bytes; + unsigned int prio = 10; + int fret; + fret = mq_getattr(mq->mq_out, &attr); + if (attr.mq_maxmsg == attr.mq_curmsgs) + { + printf("queue %d out full", mq->id); + return -1; + } + bytes = mq_send(mq->mq_out, buf, + size > attr.mq_msgsize ? attr.mq_msgsize : size, + prio); + if (bytes == -1) + { + perror("mq_send"); + return -1; + } + return 0; +} + +/* write to input quque */ +int mq_ntf_write_in(mq_ntf_mdt *mq, const char *buf, size_t size) +{ + struct mq_attr attr; + char *msg; + size_t bytes; + unsigned int prio = 10; + int fret; + fret = mq_getattr(mq->mq_in, &attr); + if (attr.mq_maxmsg == attr.mq_curmsgs) + { + printf("queue %d in full", mq->id); + return -1; + } + bytes = mq_send(mq->mq_in, buf, + size > attr.mq_msgsize ? attr.mq_msgsize : size, + prio); + if (bytes == -1) + { + perror("mq_send"); + return -1; + } + return 0; +} + +/* drain all messages from quque */ +int mq_ntf_drain(mq_ntf_mdt *mq) +{ + int i,j; + struct mq_attr curattr; + char *buf; + ssize_t bytes; + int prio = 10; + mqd_t mqlist[2] = {mq->mq_in, mq->mq_out}; + + for (i=0;i<2;i++) + { + mq_getattr(mqlist[i], &curattr); + buf = malloc(curattr.mq_maxmsg); + for (j=0; j<curattr.mq_curmsgs; j++) + { + bytes = mq_receive(mqlist[i], buf, curattr.mq_maxmsg, &prio); + if (bytes == -1) + { + perror("mq_receive drain"); + } + printf("Drain %d bytes\n", bytes); + } + free(buf); + } + + return 0; +} + +/* check if there is space in quque */ +int mq_ntf_full(mq_ntf_mdt *mq) +{ + + return 0; +} + + +/* close queue */ +int mq_ntf_close(mq_ntf_mdt *mq) +{ + const int name_size = 32; + char name[name_size]; + int id; + + id = mq->id; + snprintf(name, name_size, MQ_PREFIX"%d-in",id); + mq_unlink(name); + + snprintf(name, name_size, MQ_PREFIX"%d-out",id); + mq_unlink(name); + + return 0; +} + +//send command +int mq_ntf_cmd_send(mq_ntf_mdt *mq, mq_cmd *cmd) +{ + + return 0; +} + +//recieve command from other end +int mq_ntf_cmd_recv(mq_ntf_mdt *mq, mq_cmd *cmd) +{ + return 0; +} + +/* + +|--------|<--IN---|---------| +| SERVER | | MANAGER | +|--------|--OUT-->|---------| + +*/ +#define STACK_SIZE (1024*16) +/* not supposed to be changed. */ +typedef struct server_cfg +{ + /* thread params */ + int tid; + void *stack; + //atomic_int running; + _Atomic int running; + mq_ntf_mdt *mq; + /* irc server config */ + char *user; + char *password; + char *server; + char **channels; + int port; + int ssl; +} server_cfg; + + +/* server_cfg struct as input */ +#define TH_STATE_INIT 0 +#define TH_STATE_START 1 +#define TH_STATE_LISTEN_IN 2 +#define TH_STATE_LISTEN_OUT 3 +#define TH_STATE_SEND_IN 4 +#define TH_STATE_SEND_OUT 5 +#define TH_STATE_TERMINATION 6 +#define TH_STATE_EXIT 7 + +int th_start_client(void *data) +{ + + int cmd_id = 1; + int ret = 0; + char cmd_buf[MQ_MSG_SIZE]; + mq_cmd *cmd=NULL; + + server_cfg *cfg = data; + mq_ntf_mdt *mq = cfg->mq; + atomic_fetch_add(&cfg->running,1); + printf("Start client\n"); + printf("Server %d\n",cfg->tid); + sleep(10); + + //send command wait for response + + cmd = CMD_CREATE(cmd_id,"PING","NOPARAM"); + if (cmd != NULL) + { + if (ret = mq_ntf_write_out(mq,cmd->buf,cmd->sz) == -1) + { + printf("Couldnt send command\n"); + } + + } + + cmd_id += 1; + + + printf("End client\n"); + atomic_fetch_sub( &cfg->running,1); + + return 0; +} + + +#define EVENT_HND_STACK_SIZE (16*1024) +typedef struct event_handler_cfg { + /* thread params*/ + void *stack; + mq_ntf_mdt *mq_listen; + int mq_num; + _Atomic int running; + +} event_handler_cfg; + +/* server_cfg struct as input */ +#define EH_STATE_INIT 0 +#define EH_STATE_START 1 +#define EH_STATE_LISTEN_IN 2 +#define EH_STATE_LISTEN_OUT 3 +#define EH_STATE_SEND_IN 4 +#define EH_STATE_SEND_OUT 5 +#define EH_STATE_TERMINATION 6 +#define EH_STATE_EXIT 7 + +/* Thread to reacieve messages and return them back */ +int th_event_manager(void *data) +{ + int state; + int i; + event_handler_cfg *cfg = data; + atomic_fetch_add(&cfg->running,1); + const int buf_size=MQ_MSG_SIZE; + char buf[buf_size]; + int ret; + //read any command + mq_cmd *cmd = NULL; + + + + printf("Start event thread\n"); + state = EH_STATE_INIT; + while (state != EH_STATE_EXIT) + switch (state) + { + case EH_STATE_INIT: + printf("TH STATE INIT\n"); + sleep(1); + state = TH_STATE_START; + break; + case EH_STATE_START: + printf("TH STATE START\n"); + sleep(1); + state = TH_STATE_EXIT; + break; + case EH_STATE_EXIT: + printf("TH STATE EXIT\n"); + sleep(1); + state = TH_STATE_EXIT; + break; + default: + printf("Wrong state\n"); + } + printf("End event thread\n"); + + atomic_fetch_sub( &cfg->running,1); + return 0; +} + +int main(int argc, char **argv) +{ + int i; + + int cnt_servers,cnt_running; + server_cfg *cfg_list; + event_handler_cfg *evhnd_cfg; + mq_ntf_mdt *mq_array; + + /* Load configuration */ + cnt_servers = SIZEOF_SERVER_LIST; + for (i=0;i<SIZEOF_SERVER_LIST;i++) + { + printf("Load config for server %s\n",server_list[i].server); + } + + cfg_list = malloc(sizeof(server_cfg)*cnt_servers); + mq_array = malloc(sizeof(mq_ntf_mdt)*cnt_servers); + + /* For each configuration create listener */ + + for (i=0;i<SIZEOF_SERVER_LIST;i++) + { + /*initialise server config*/ + server_cfg *srvc = cfg_list+i; + irc_server_conf *isrvc = &server_list[i]; + srvc->tid = i; + srvc->stack = malloc(STACK_SIZE); //NULL will brake everything ;) + srvc->user = isrvc->user; + srvc->password = isrvc->password; + srvc->server = isrvc->server; + srvc->channels = isrvc->channels; + srvc->port = isrvc->port; + srvc->ssl = isrvc->ssl; + + //atomic_init( &srvc->running, 1); + atomic_store(&srvc->running, 0); + + /* initalise posix mq */ + if (0 != mq_ntf_open(&mq_array[i], i)) + { + printf("Couldnt open mq_ntf_open\n"); + } + srvc->mq = &mq_array[i]; + + /* clone new proc */ + clone(th_start_client, srvc->stack+STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)srvc); + } + + /* event handler thread */ + evhnd_cfg = malloc(sizeof(event_handler_cfg)); + memset(evhnd_cfg, 0, sizeof(event_handler_cfg)); + evhnd_cfg->stack = malloc(EVENT_HND_STACK_SIZE); + atomic_store(&evhnd_cfg->running, 0); + evhnd_cfg->mq_num = cnt_servers; + evhnd_cfg->mq_listen = mq_array; + clone(th_event_manager, evhnd_cfg->stack+EVENT_HND_STACK_SIZE, CLONE_VM|CLONE_FILES, (void *)evhnd_cfg); + + /* run until all threads are up */ + cnt_running = 1; + while(cnt_running != 0) + { + //printf("Count\n"); + /*count how many proceses is running there*/ + int val; + cnt_running = 0; + for (i=0;i<cnt_servers;i++) + { + val = atomic_load(&cfg_list[i].running); + if (val != 0) + cnt_running += 1; + } + printf("cnt_running %d\n",cnt_running); + sleep(1); + } + + free(cfg_list); + + return 0; +}
\ No newline at end of file |