summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--agni.c105
1 files changed, 86 insertions, 19 deletions
diff --git a/agni.c b/agni.c
index a879049..6830893 100644
--- a/agni.c
+++ b/agni.c
@@ -233,6 +233,8 @@ int th_start_client(void *data)
rpc_response *resp=NULL;
netbyte_store *nb_req, *nb_resp;
+ nb_resp = malloc(sizeof(netbyte_store));
+
atomic_fetch_add(&cfg->running,1);
printf("Start client\n");
printf("Server %d\n",cfg->tid);
@@ -348,6 +350,7 @@ int th_start_client(void *data)
//recieve commands from queue
if (mq_event == 1)
{
+ /*
recv_cmd = mq_cmd_creates(in_buf, in_attr.mq_msgsize, -1);
if (recv_cmd != NULL)
{
@@ -430,6 +433,61 @@ int th_start_client(void *data)
}
mq_cmd_free(recv_cmd);
recv_cmd = NULL;
+ */
+
+ nb_init(nb_resp);
+ nb_load(nb_resp, in_buf);
+ rpc_resp_unmarsh(nb_resp, &resp);
+ PRINT("RESP: %d-%s(%s)\n", resp->id, resp->result, resp->error);
+
+ tble_cmd_resp *match_resp = malloc(sizeof(tble_cmd_resp));
+ memset(match_resp, 0, sizeof(tble_cmd_resp));
+ if (match_resp != NULL)
+ {
+ match_resp->qid = resp->id;
+ match_resp->resp = alloc_new_str(resp->result);
+
+ int cmd_id = tbl_qcmd_resp(qtbl, match_resp);
+ if (cmd_id > 0)
+ {
+ //BUG HERE!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!.\.,\<\/\/\//
+ printf("cmd_id %d\n", cmd_id);
+ tble_qcmd *cmd = tbl_qcmd_match_id(qtbl, cmd_id);
+ if (cmd != NULL)
+ {
+ char *sep = strchr(cmd->ircident,'!'); //BUGGY
+ char *user = alloc_new_str(cmd->ircident); //BUGGY
+ user[sep-cmd->ircident] = 0x0;
+ char *resp_user = token_new_str(cmd->tok, 2);
+ PRINT("resp_user %s\n",resp_user);
+
+ if (resp_user[0]=='#')
+ {
+ //printf("RESPONSE :%s PRIVMSG %s :%s\n", cfg->user, user, resp->result);
+ int fret = snprintf(cmd_buf, TH_CONN_BUF_SZ,":%s PRIVMSG %s :%s \r\n", cfg->user, resp_user, resp->result);
+ PRINT("%s\n",cmd_buf);
+ write(conn.conn_fd, cmd_buf, fret);
+ } else
+ {
+ //printf("RESPONSE :%s PRIVMSG %s :%s\n", user, resp_user, resp->result);
+ int fret = snprintf(cmd_buf, TH_CONN_BUF_SZ,":%s PRIVMSG %s :%s \r\n", resp_user, user, resp->result);
+ PRINT("%s\n",cmd_buf);
+ write(conn.conn_fd, cmd_buf, fret);
+ }
+
+ FREE(user);
+ FREE(resp_user);
+
+ //remove command by id
+ //for now just remove command without checkings its state
+ tbl_qcmd_del_by_id(qtbl, cmd_id);
+ }
+ }
+ }
+
+
+ //nb_print(nb_resp);
+ //nb_free(nb_resp);
}
@@ -492,6 +550,8 @@ int th_start_client(void *data)
char *dest_name = token_new_str(itok,2);
char *msg = token_new_str(itok,3); //could send without first symbol pal
+ PRINT("dest_name %s\n",dest_name);
+
char *sep = strchr(uname,'!');
uname[sep-uname] = 0x0;
@@ -542,25 +602,16 @@ int th_start_client(void *data)
tbl_qcmd_add_tok(qcmd, itok);
//HOW? add commands that are in execution list, dont process all messages
tbl_qcmd_add(qtbl, qcmd);
- //tbl_qcmd_print_tbl(qtbl,TBL_PF_QCMD_ID
- // |TBL_PF_QCMD_CID
- // |TBL_PF_QCMD_STATE
- // |TBL_PF_QCMD_CMD
- // |TBL_PF_QCMD_IRCIDENT
- // |TBL_PF_QCMD_PARAM
- // |TBL_PF_QCMD_TID
- // |TBL_PF_QCMD_TIDX);
+ tbl_qcmd_print_tbl(qtbl,TBL_PF_QCMD_ID
+ |TBL_PF_QCMD_CID
+ |TBL_PF_QCMD_STATE
+ |TBL_PF_QCMD_CMD
+ |TBL_PF_QCMD_IRCIDENT
+ |TBL_PF_QCMD_PARAM
+ |TBL_PF_QCMD_TID
+ |TBL_PF_QCMD_TIDX);
}
- //create command and send to mq
- //PRINT("MSG %s\n",msg);
- //mq_cmd *privcmd = mq_cmd_create(qcmd->cid,"PRIVMSG",strlen("PRIVMSG"),msg,strlen(msg));
- //char *bufPtr;
- //bufPtr = mq_cmd_buf(privcmd);
- ////PRINT("%s\n",bufPtr);
- //PRINT("write %d bytes\n",mq_ntf_write(mq, MQ_OUT, bufPtr, strlen(bufPtr)));
- //mq_cmd_free(privcmd);
-
req = rpc_req_new(qcmd->cmd, qcmd->param, qcmd->cid);
rpc_req_marsh(req, &nb_req);
buf_nb = nb_create(nb_req);
@@ -677,6 +728,7 @@ int th_event_manager(void *data)
mq_ntf_mdt *mq=NULL,*mq_cur=NULL;
int i_mq=0; //to iter trought mqueue
char *out_buf = NULL;
+ char *in_buf = NULL;
int run;
int mq_event;
@@ -685,6 +737,7 @@ int th_event_manager(void *data)
//read any command
mq_cmd *recv_cmd = NULL; // for recieved cmd from mq
struct mq_attr out_attr, *ptr_out_attr=&out_attr;
+ struct mq_attr in_attr, *ptr_in_attr=&in_attr;
//execution and command table generation
//tble_exec *ecmd = NULL;
@@ -732,23 +785,29 @@ int th_event_manager(void *data)
printf("Start event thread\n");
mq = cfg->mq_listen;
- //get mq attributes
+ //get mq attributes, FIX out/in everytime could be different pal
if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1)
{
printf("Cant get attribute\n");
}
+ if (mq_ntf_getattr(mq, MQ_IN, &ptr_in_attr) == -1)
+ {
+ printf("Cant get attribute\n");
+ }
//////////////////////////////////////////////////////////////////////////////
//nbrpc prepare variables
rpc_request *req=NULL;
rpc_response *resp=NULL;
netbyte_store *nb_req=NULL, *nb_resp=NULL;
+ char *buf_nb = NULL;
nb_resp = malloc(sizeof(netbyte_store)); //not freed
nb_req = malloc(sizeof(netbyte_store)); //not freed
out_buf = malloc(out_attr.mq_msgsize);
//maybe its not null
+ in_buf = malloc(in_attr.mq_msgsize);
printf("Start event loop\n");
run = 1;
@@ -826,9 +885,17 @@ int th_event_manager(void *data)
printf("Ret message %s\n",ret_msg);
//send response
-
+ resp = rpc_resp_new(ret_msg,"None",req->id);
+ rpc_resp_marsh(resp, &nb_resp);
+ buf_nb = nb_create(nb_resp);
+ mq_ntf_write(mq_cur, MQ_IN, buf_nb, nb_resp->size > in_attr.mq_msgsize ? in_attr.mq_msgsize : nb_resp->size);
+ FREE(buf_nb);
+ printf("EVENT-Write %d bytes\n", nb_resp->size > in_attr.mq_msgsize ? in_attr.mq_msgsize : nb_resp->size);
+
FREE(ret_msg);
+ nb_free(nb_resp);
+ rpc_resp_free(resp);
}
} else
{