From da722516cb31e523d58ebb4a2f827988e3a1782b Mon Sep 17 00:00:00 2001 From: FreeArtMan Date: Tue, 5 Sep 2017 23:49:34 +0100 Subject: New rpc is working --- agni.c | 105 +++++++++++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 86 insertions(+), 19 deletions(-) (limited to 'agni.c') diff --git a/agni.c b/agni.c index a879049..6830893 100644 --- a/agni.c +++ b/agni.c @@ -233,6 +233,8 @@ int th_start_client(void *data) rpc_response *resp=NULL; netbyte_store *nb_req, *nb_resp; + nb_resp = malloc(sizeof(netbyte_store)); + atomic_fetch_add(&cfg->running,1); printf("Start client\n"); printf("Server %d\n",cfg->tid); @@ -348,6 +350,7 @@ int th_start_client(void *data) //recieve commands from queue if (mq_event == 1) { + /* recv_cmd = mq_cmd_creates(in_buf, in_attr.mq_msgsize, -1); if (recv_cmd != NULL) { @@ -430,6 +433,61 @@ int th_start_client(void *data) } mq_cmd_free(recv_cmd); recv_cmd = NULL; + */ + + nb_init(nb_resp); + nb_load(nb_resp, in_buf); + rpc_resp_unmarsh(nb_resp, &resp); + PRINT("RESP: %d-%s(%s)\n", resp->id, resp->result, resp->error); + + tble_cmd_resp *match_resp = malloc(sizeof(tble_cmd_resp)); + memset(match_resp, 0, sizeof(tble_cmd_resp)); + if (match_resp != NULL) + { + match_resp->qid = resp->id; + match_resp->resp = alloc_new_str(resp->result); + + int cmd_id = tbl_qcmd_resp(qtbl, match_resp); + if (cmd_id > 0) + { + //BUG HERE!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!.\.,\<\/\/\// + printf("cmd_id %d\n", cmd_id); + tble_qcmd *cmd = tbl_qcmd_match_id(qtbl, cmd_id); + if (cmd != NULL) + { + char *sep = strchr(cmd->ircident,'!'); //BUGGY + char *user = alloc_new_str(cmd->ircident); //BUGGY + user[sep-cmd->ircident] = 0x0; + char *resp_user = token_new_str(cmd->tok, 2); + PRINT("resp_user %s\n",resp_user); + + if (resp_user[0]=='#') + { + //printf("RESPONSE :%s PRIVMSG %s :%s\n", cfg->user, user, resp->result); + int fret = snprintf(cmd_buf, TH_CONN_BUF_SZ,":%s PRIVMSG %s :%s \r\n", cfg->user, resp_user, resp->result); + PRINT("%s\n",cmd_buf); + write(conn.conn_fd, cmd_buf, fret); + } else + { + //printf("RESPONSE :%s PRIVMSG %s :%s\n", user, resp_user, resp->result); + int fret = snprintf(cmd_buf, TH_CONN_BUF_SZ,":%s PRIVMSG %s :%s \r\n", resp_user, user, resp->result); + PRINT("%s\n",cmd_buf); + write(conn.conn_fd, cmd_buf, fret); + } + + FREE(user); + FREE(resp_user); + + //remove command by id + //for now just remove command without checkings its state + tbl_qcmd_del_by_id(qtbl, cmd_id); + } + } + } + + + //nb_print(nb_resp); + //nb_free(nb_resp); } @@ -492,6 +550,8 @@ int th_start_client(void *data) char *dest_name = token_new_str(itok,2); char *msg = token_new_str(itok,3); //could send without first symbol pal + PRINT("dest_name %s\n",dest_name); + char *sep = strchr(uname,'!'); uname[sep-uname] = 0x0; @@ -542,25 +602,16 @@ int th_start_client(void *data) tbl_qcmd_add_tok(qcmd, itok); //HOW? add commands that are in execution list, dont process all messages tbl_qcmd_add(qtbl, qcmd); - //tbl_qcmd_print_tbl(qtbl,TBL_PF_QCMD_ID - // |TBL_PF_QCMD_CID - // |TBL_PF_QCMD_STATE - // |TBL_PF_QCMD_CMD - // |TBL_PF_QCMD_IRCIDENT - // |TBL_PF_QCMD_PARAM - // |TBL_PF_QCMD_TID - // |TBL_PF_QCMD_TIDX); + tbl_qcmd_print_tbl(qtbl,TBL_PF_QCMD_ID + |TBL_PF_QCMD_CID + |TBL_PF_QCMD_STATE + |TBL_PF_QCMD_CMD + |TBL_PF_QCMD_IRCIDENT + |TBL_PF_QCMD_PARAM + |TBL_PF_QCMD_TID + |TBL_PF_QCMD_TIDX); } - //create command and send to mq - //PRINT("MSG %s\n",msg); - //mq_cmd *privcmd = mq_cmd_create(qcmd->cid,"PRIVMSG",strlen("PRIVMSG"),msg,strlen(msg)); - //char *bufPtr; - //bufPtr = mq_cmd_buf(privcmd); - ////PRINT("%s\n",bufPtr); - //PRINT("write %d bytes\n",mq_ntf_write(mq, MQ_OUT, bufPtr, strlen(bufPtr))); - //mq_cmd_free(privcmd); - req = rpc_req_new(qcmd->cmd, qcmd->param, qcmd->cid); rpc_req_marsh(req, &nb_req); buf_nb = nb_create(nb_req); @@ -677,6 +728,7 @@ int th_event_manager(void *data) mq_ntf_mdt *mq=NULL,*mq_cur=NULL; int i_mq=0; //to iter trought mqueue char *out_buf = NULL; + char *in_buf = NULL; int run; int mq_event; @@ -685,6 +737,7 @@ int th_event_manager(void *data) //read any command mq_cmd *recv_cmd = NULL; // for recieved cmd from mq struct mq_attr out_attr, *ptr_out_attr=&out_attr; + struct mq_attr in_attr, *ptr_in_attr=&in_attr; //execution and command table generation //tble_exec *ecmd = NULL; @@ -732,23 +785,29 @@ int th_event_manager(void *data) printf("Start event thread\n"); mq = cfg->mq_listen; - //get mq attributes + //get mq attributes, FIX out/in everytime could be different pal if (mq_ntf_getattr(mq, MQ_OUT, &ptr_out_attr) == -1) { printf("Cant get attribute\n"); } + if (mq_ntf_getattr(mq, MQ_IN, &ptr_in_attr) == -1) + { + printf("Cant get attribute\n"); + } ////////////////////////////////////////////////////////////////////////////// //nbrpc prepare variables rpc_request *req=NULL; rpc_response *resp=NULL; netbyte_store *nb_req=NULL, *nb_resp=NULL; + char *buf_nb = NULL; nb_resp = malloc(sizeof(netbyte_store)); //not freed nb_req = malloc(sizeof(netbyte_store)); //not freed out_buf = malloc(out_attr.mq_msgsize); //maybe its not null + in_buf = malloc(in_attr.mq_msgsize); printf("Start event loop\n"); run = 1; @@ -826,9 +885,17 @@ int th_event_manager(void *data) printf("Ret message %s\n",ret_msg); //send response - + resp = rpc_resp_new(ret_msg,"None",req->id); + rpc_resp_marsh(resp, &nb_resp); + buf_nb = nb_create(nb_resp); + mq_ntf_write(mq_cur, MQ_IN, buf_nb, nb_resp->size > in_attr.mq_msgsize ? in_attr.mq_msgsize : nb_resp->size); + FREE(buf_nb); + printf("EVENT-Write %d bytes\n", nb_resp->size > in_attr.mq_msgsize ? in_attr.mq_msgsize : nb_resp->size); + FREE(ret_msg); + nb_free(nb_resp); + rpc_resp_free(resp); } } else { -- cgit v1.2.3