blob: 455ac1e4a7fc3c8b57c0fa2e54f746b7be06dafc [file] [log] [blame]
/*
*
* Copyright 2016 RIFT.IO Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <poll.h>
#include <unistd.h>
#include <string.h>
#include <confd_lib.h>
#include "confd_cdb.h"
#include "confd_dp.h"
static struct confd_daemon_ctx *dctx;
static int ctlsock;
static int workersock;
typedef struct _foodata {
char *name;
struct _foodata *next;
} foodata_t;
typedef struct _opdata {
foodata_t *foo;
} opdata_t;
opdata_t *g_opdata = NULL;
int process_confd_subscription(int subsock)
{
int confd_result, flags, length, *subscription_points, i, j, nvalues;
enum cdb_sub_notification type;
confd_tag_value_t *values;
confd_result = cdb_read_subscription_socket2(subsock,
&type,
&flags,
&subscription_points,
&length);
if (confd_result != CONFD_OK) {
confd_fatal("Failed to read subscription data \n");
}
switch (type) {
case CDB_SUB_PREPARE:
for (i = 0; i < length; i++) {
printf("i = %d, point = %d\n", i, subscription_points[i]);
if (cdb_get_modifications(subsock, subscription_points[i], flags, &values, &nvalues,
"/") == CONFD_OK) {
for (j = 0; j < nvalues; j++) {
printf("j = %d\n", j);
confd_free_value(CONFD_GET_TAG_VALUE(&values[j]));
}
}
}
cdb_sync_subscription_socket(subsock, CDB_DONE_PRIORITY);
fprintf(stdout, "CBD_SUB_PREPARE\n");
break;
case CDB_SUB_COMMIT:
cdb_sync_subscription_socket(subsock, CDB_DONE_PRIORITY);
fprintf(stdout, "CDB_SUB_COMMIT\n");
break;
case CDB_SUB_ABORT:
fprintf(stdout, "CDB_SUB_ABORT\n");
break;
default:
confd_fatal("Invalid type %d in cdb_read_subscription_socket2\n", type);
}
return 0;
}
static int do_init_action(struct confd_user_info *uinfo)
{
int ret = CONFD_OK;
// fprintf(stdout, "init_action called\n");
confd_action_set_fd(uinfo, workersock);
return ret;
}
static int do_rw_action(struct confd_user_info *uinfo,
struct xml_tag *name,
confd_hkeypath_t *kp,
confd_tag_value_t *params,
int nparams)
{
// confd_tag_value_t reply[2];
// int status;
// char *ret_status;
int i;
char buf[BUFSIZ];
/* Just print the parameters and return */
//
for (i = 0; i < nparams; i++) {
confd_pp_value(buf, sizeof(buf), CONFD_GET_TAG_VALUE(&params[i]));
printf("param %2d: %9u:%-9u, %s\n", i, CONFD_GET_TAG_NS(&params[i]),
CONFD_GET_TAG_TAG(&params[i]), buf);
}
i = 0;
// CONFD_SET_TAG_INT32(&reply[i], NULL, 0); i++;
// CONFD_SET_TAG_STR(&reply[i], NULL, "success"); i++;
confd_action_reply_values(uinfo, NULL, i);
return CONFD_OK;
}
static int get_next(struct confd_trans_ctx *tctx,
confd_hkeypath_t *keypath,
long next)
{
opdata_t *opdata = tctx->t_opaque;
foodata_t *curr;
confd_value_t v[2];
if (next == -1) { /* first call */
curr = opdata->foo;
} else {
curr = (foodata_t *)next;
}
if (curr == NULL) {
confd_data_reply_next_key(tctx, NULL, -1, -1);
return CONFD_OK;
}
CONFD_SET_STR(&v[0], curr->name);
confd_data_reply_next_key(tctx, &v[0], 1, (long)curr->next);
return CONFD_OK;
}
static foodata_t *find_foo(confd_hkeypath_t *keypath, opdata_t *dp)
{
char *name = (char*)CONFD_GET_BUFPTR(&keypath->v[1][0]);
foodata_t *foo = dp->foo;
while (foo != NULL) {
if (strcmp(foo->name, name) == 0) {
return foo;
}
foo = foo->next;
}
return NULL;
}
/* Keypath example */
/* /arpentries/arpe{192.168.1.1 eth0}/hwaddr */
/* 3 2 1 0 */
static int get_elem(struct confd_trans_ctx *tctx,
confd_hkeypath_t *keypath)
{
confd_value_t v;
foodata_t *foo = find_foo(keypath, tctx->t_opaque);
if (foo == NULL) {
confd_data_reply_not_found(tctx);
return CONFD_OK;
}
CONFD_SET_STR(&v, foo->name);
confd_data_reply_value(tctx, &v);
return CONFD_OK;
}
static foodata_t *create_dummy_foodata_list(int count)
{
foodata_t *head, *curr, *prev;
int i;
char buf[64];
head = prev = curr = NULL;
for (i = 0; i < count; ++i) {
curr = malloc(sizeof(foodata_t));
memset(curr, 0, sizeof(foodata_t));
snprintf(buf, 64, "foo%d", i);
curr->name = strdup(buf);
if (prev) {
prev->next = curr;
} else {
head = curr;
}
prev = curr;
}
return head;
}
static void free_foodata_list(foodata_t *foo)
{
foodata_t *curr, *next;
curr = foo;
while (curr) {
next = curr->next;
if (curr->name) {
free(curr->name);
}
free(curr);
curr = next;
}
}
static void print_foodata_list(foodata_t *foo)
{
foodata_t *curr = foo;
while (curr) {
// fprintf(stdout, "%s\n", curr->name);
curr = curr->next;
}
}
static int s_init(struct confd_trans_ctx *tctx)
{
opdata_t *opdata;
if ((opdata = malloc(sizeof(opdata_t))) == NULL) {
return CONFD_ERR;
}
memset(opdata, 0, sizeof(opdata_t));
opdata->foo = create_dummy_foodata_list(10);
print_foodata_list(opdata->foo);
tctx->t_opaque = opdata;
confd_trans_set_fd(tctx, workersock);
return CONFD_OK;
}
static int s_finish(struct confd_trans_ctx *tctx)
{
opdata_t *opdata = tctx->t_opaque;
if (opdata != NULL) {
free_foodata_list(opdata->foo);
free(opdata);
}
return CONFD_OK;
}
int main(int argc, char **argv)
{
struct sockaddr_in addr;
int debuglevel = CONFD_TRACE;
struct confd_trans_cbs trans;
struct confd_data_cbs data;
struct confd_action_cbs action;
int i;
int subsock, datasock;
int status;
int spoint;
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
addr.sin_family = AF_INET;
addr.sin_port = htons(CONFD_PORT);
/**
* Setup CDB subscription socket
*/
confd_init(argv[0], stderr, CONFD_DEBUG);
if ((subsock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
confd_fatal("Failed to open subscription socket\n");
}
printf("Subscription socket: %d\n", subsock);
for (i = 1; i < 10; ++i) {
if (cdb_connect(subsock, CDB_SUBSCRIPTION_SOCKET,
(struct sockaddr*)&addr,
sizeof (struct sockaddr_in)) < 0) {
sleep(2);
fprintf(stdout, "Failed in confd_connect() {attempt: %d}\n", i);
} else {
fprintf(stdout, "confd_connect succeeded\n");
break;
}
}
if ((status = cdb_subscribe2(subsock, CDB_SUB_RUNNING_TWOPHASE, 0, 0, &spoint, 0, "/"))
!= CONFD_OK) {
fprintf(stderr, "Terminate: subscribe %d\n", status);
exit(1);
}
if (cdb_subscribe_done(subsock) != CONFD_OK) {
confd_fatal("cdb_subscribe_done() failed");
}
/**
* Setup CBD data socket
*/
if ((datasock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
confd_fatal("Failed to open data socket\n");
}
if (cdb_connect(datasock, CDB_DATA_SOCKET,
(struct sockaddr*)&addr,
sizeof (struct sockaddr_in)) < 0) {
confd_fatal("Failed to confd_connect() to confd \n");
}
memset(&trans, 0, sizeof (struct confd_trans_cbs));
trans.init = s_init;
trans.finish = s_finish;
memset(&data, 0, sizeof (struct confd_data_cbs));
data.get_elem = get_elem;
data.get_next = get_next;
strcpy(data.callpoint, "base_show");
memset(&action, 0, sizeof (action));
strcpy(action.actionpoint, "rw_action");
action.init = do_init_action;
action.action = do_rw_action;
/* initialize confd library */
confd_init("confd_client_op_data_daemon", stderr, debuglevel);
for (i = 1; i < 10; ++i) {
if (confd_load_schemas((struct sockaddr*)&addr,
sizeof(struct sockaddr_in)) != CONFD_OK) {
fprintf(stdout, "Failed to load schemas from confd {attempt: %d}\n", i);
sleep(2);
} else {
fprintf(stdout, "confd_load_schemas succeeded\n");
break;
}
}
if ((dctx = confd_init_daemon("confd_client_op_data_daemon")) == NULL) {
confd_fatal("Failed to initialize confdlib\n");
}
/* Create the first control socket, all requests to */
/* create new transactions arrive here */
if ((ctlsock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
confd_fatal("Failed to open ctlsocket\n");
}
if (confd_connect(dctx, ctlsock, CONTROL_SOCKET, (struct sockaddr*)&addr,
sizeof (struct sockaddr_in)) < 0) {
confd_fatal("Failed to confd_connect() to confd \n");
}
/* Also establish a workersocket, this is the most simple */
/* case where we have just one ctlsock and one workersock */
if ((workersock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
confd_fatal("Failed to open workersocket\n");
}
if (confd_connect(dctx, workersock, WORKER_SOCKET,(struct sockaddr*)&addr,
sizeof (struct sockaddr_in)) < 0) {
confd_fatal("Failed to confd_connect() to confd \n");
}
if (confd_register_trans_cb(dctx, &trans) == CONFD_ERR) {
confd_fatal("Failed to register trans cb \n");
}
if (confd_register_data_cb(dctx, &data) == CONFD_ERR) {
confd_fatal("Failed to register data cb \n");
}
if (confd_register_action_cbs(dctx, &action) == CONFD_ERR) {
confd_fatal("Failed to register action cb \n");
}
if (confd_register_done(dctx) != CONFD_OK) {
confd_fatal("Failed to complete registration \n");
}
while(1) {
struct pollfd set[3];
int ret;
set[0].fd = ctlsock;
set[0].events = POLLIN;
set[0].revents = 0;
set[1].fd = workersock;
set[1].events = POLLIN;
set[1].revents = 0;
set[2].fd = subsock;
set[2].events = POLLIN;
set[2].revents = 0;
if (poll(set, sizeof(set)/sizeof(*set), -1) < 0) {
perror("Poll failed:");
continue;
}
/* Check for I/O */
if (set[0].revents & POLLIN) {
if ((ret = confd_fd_ready(dctx, ctlsock)) == CONFD_EOF) {
confd_fatal("Control socket closed\n");
} else if (ret == CONFD_ERR && confd_errno != CONFD_ERR_EXTERNAL) {
confd_fatal("Error on control socket request: %s (%d): %s\n",
confd_strerror(confd_errno), confd_errno, confd_lasterr());
}
}
if (set[1].revents & POLLIN) {
if ((ret = confd_fd_ready(dctx, workersock)) == CONFD_EOF) {
confd_fatal("Worker socket closed\n");
} else if (ret == CONFD_ERR && confd_errno != CONFD_ERR_EXTERNAL) {
confd_fatal("Error on worker socket request: %s (%d): %s\n",
confd_strerror(confd_errno), confd_errno, confd_lasterr());
}
}
if (set[2].revents & POLLIN) {
process_confd_subscription(set[2].fd);
}
}
return 0;
}