blob: 6e431772f56b3157c79d08d7460792ccbc0384f0 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2017 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import abc
import asyncio
import logging
from time import sleep
import gi
gi.require_version('RwProjectManoYang', '1.0')
gi.require_version('RwDts', '1.0')
from gi.repository import (
RwProjectManoYang,
RwDts as rwdts,
ProtobufC,
RwTypes,
)
import rift.tasklets
class ManoProjectError(Exception):
    """Base class for every error raised by the project helpers in this module."""


class ManoProjNameSetErr(ManoProjectError):
    """Raised when a project that already has a name is given a different one."""


class ManoProjXpathNoProjErr(ManoProjectError):
    """Raised when an XPath does not contain the project element at all."""


class ManoProjXpathKeyErr(ManoProjectError):
    """Raised when the project element of an XPath has no usable name key."""


class ManoProjXpathNotRootErr(ManoProjectError):
    """Raised when an XPath to be prefixed with a project is not rooted at '/'."""


class ManoProjXpathPresentErr(ManoProjectError):
    """Raised when an XPath is already prefixed with a different project."""
# YANG namespace prefix and element name of the project list.
NS = 'rw-project'
PROJECT = 'project'

# Namespace-qualified project element and the rooted XPath that selects it.
NS_PROJECT = ':'.join((NS, PROJECT))
XPATH = '/' + NS_PROJECT
XPATH_LEN = len(XPATH)

# Key leaf of the project list, bare and namespace-qualified.
NAME = 'name'
NAME_LEN = len(NAME)
NS_NAME = ':'.join((NS, NAME))

# Project used when the caller does not name one, and its ready-made prefix.
DEFAULT_PROJECT = 'default'
DEFAULT_PREFIX = "{}[{}='{}']".format(XPATH, NS_NAME, DEFAULT_PROJECT)
class ManoProject(object):
    '''Class to handle the project name.

    An instance couples a project name with the XPath prefix that selects
    that project (``/rw-project:project[rw-project:name='<name>']``) and
    with the protobuf message built for it. The class methods parse a
    project name out of an XPath and prefix an XPath with a project.

    Subclasses are expected to override register(), deregister() and,
    where needed, delete_prepare().
    '''

    # Fallback logger created on demand by get_log(). It deliberately is not
    # called `log`: the `log` instance property defined below rebinds that
    # name in the class namespace, so a class attribute of the same name
    # could never be read back through the class.
    _default_log = None

    @classmethod
    def instance_from_xpath(cls, xpath, log):
        '''Build a ManoProject for the project named in xpath.

        Arguments:
            xpath - XPath containing the project element and its name key
            log - Logger handed to the new instance

        Returns:
            A ManoProject named after the project found in the XPath.

        Raises:
            ManoProjXpathNoProjErr, ManoProjXpathKeyErr - see from_xpath()
        '''
        name = cls.from_xpath(xpath, log)
        if name is None:
            return None

        # Always a plain ManoProject: ProjectHandler builds subclasses with a
        # different constructor signature, so cls(...) cannot be used here.
        proj = ManoProject(log, name=name)
        return proj

    @classmethod
    def from_xpath(cls, xpath, log):
        '''Extract the project name from an XPath.

        Arguments:
            xpath - XPath expected to contain
                    /rw-project:project[rw-project:name='<name>'] (the key
                    may also be written without its namespace prefix)
            log - Logger used for debug and error messages

        Returns:
            The project name with surrounding blanks and quotes removed.

        Raises:
            ManoProjXpathNoProjErr - the project element is not in the XPath
            ManoProjXpathKeyErr - the name key is missing, malformed or empty
        '''
        log.debug("Get project name from {}".format(xpath))

        idx = xpath.find(XPATH)
        if idx == -1:
            msg = "Project not found in XPATH: {}".format(xpath)
            log.error(msg)
            raise ManoProjXpathNoProjErr(msg)

        # Everything after the project element must start with the key
        # predicate, i.e. "[name=...]" or "[rw-project:name=...]".
        sub = xpath[idx + XPATH_LEN:].strip()
        if (len(sub) < NAME_LEN) or (sub[0] != '['):
            msg = "Project name not found in XPath: {}".format(xpath)
            log.error(msg)
            raise ManoProjXpathKeyErr(msg)

        sub = sub[1:].strip()
        if not sub.startswith((NS_NAME, NAME)):
            msg = "Project name not found in XPath: {}".format(xpath)
            log.error(msg)
            raise ManoProjXpathKeyErr(msg)

        idx = sub.find(']')
        if idx == -1:
            msg = "XPath is invalid: {}".format(xpath)
            log.error(msg)
            raise ManoProjXpathKeyErr(msg)

        sub = sub[:idx].strip()
        log.debug("Key and value found: {}".format(sub))
        try:
            # Split on the first '=' only so that a value which itself
            # contains '=' is kept whole instead of being rejected.
            _key, value = sub.split("=", 1)
        except ValueError as e:
            msg = "Project name not found in XPath: {}, exception: {}" \
                  .format(xpath, e)
            log.exception(msg)
            raise ManoProjXpathKeyErr(msg)

        name = value.strip(' \'"')
        if not name:
            # strip() never returns None, so emptiness is the condition that
            # has to be tested here.
            msg = "Project name is empty in XPath: {}".format(xpath)
            log.error(msg)
            raise ManoProjXpathKeyErr(msg)

        log.debug("Found project name {} from XPath {}".
                  format(name, xpath))
        return name

    @classmethod
    def get_log(cls):
        '''Return the module's fallback logger, creating it on first use.

        The logger is named 'rw-mano-log.rw-project' and is set to the ERROR
        level when it is created.
        '''
        if cls._default_log is None:
            logger = logging.getLogger('rw-mano-log.rw-project')
            logger.setLevel(logging.ERROR)
            cls._default_log = logger
        return cls._default_log

    @classmethod
    def prefix_project(cls, xpath, project=None, log=None):
        '''Prefix a rooted XPath with the project element.

        Arguments:
            xpath - Rooted XPath, optionally starting with a "C," or "D,"
                    category marker which is kept in front of the result
            project - Project name; DEFAULT_PROJECT when None
            log - Logger; the fallback from get_log() when None

        Returns:
            The XPath with the project prefix inserted, or the XPath
            unchanged when it is already prefixed with the same project.

        Raises:
            ManoProjXpathNotRootErr - the XPath does not start with '/'
            ManoProjXpathPresentErr - the XPath already names another project
            ManoProjXpathKeyErr - the existing project prefix is malformed
        '''
        if log is None:
            log = cls.get_log()

        if project is None:
            project = DEFAULT_PROJECT
            proj_prefix = DEFAULT_PREFIX
        else:
            proj_prefix = "{}[{}='{}']".format(XPATH,
                                               NS_NAME,
                                               project)

        # Only a marker at the very start of the XPath is a category prefix;
        # the same characters further inside (e.g. in a key value) are data.
        prefix = ''
        suffix = xpath
        if xpath.startswith(('C,/', 'D,/')):
            prefix = xpath[:2]
            suffix = xpath[2:]

        # startswith() also rejects an empty XPath instead of failing on
        # an index lookup.
        if not suffix.startswith('/'):
            msg = "Non-rooted xpath provided: {}".format(xpath)
            log.error(msg)
            raise ManoProjXpathNotRootErr(msg)

        if suffix.startswith(XPATH):
            name = cls.from_xpath(xpath, log)
            if name == project:
                log.debug("Project already in the XPATH: {}".format(xpath))
                return xpath

            msg = "Different project {} already in XPATH {}". \
                  format(name, xpath)
            log.error(msg)
            raise ManoProjXpathPresentErr(msg)

        return prefix + proj_prefix + suffix

    def __init__(self, log, name=None, tasklet=None):
        '''Create a project.

        Arguments:
            log - Logger used by this project
            name - Optional project name; when given it is applied through
                   the name setter, which also builds the prefix and message
            tasklet - Accepted but not stored here; call update() to attach
                      a tasklet
        '''
        # NOTE(review): the tasklet argument is ignored; callers presumably
        # rely on update() instead - confirm before wiring it up here.
        self._log = log
        self._name = None
        self._prefix = None
        self._pbcm = None
        self._tasklet = None
        self._dts = None
        self._loop = None
        self._log_hdl = None

        # Track if the apply config was received
        self._apply = False

        if name:
            self.name = name

    def update(self, tasklet):
        '''Store the commonly used properties from a tasklet.'''
        self._tasklet = tasklet
        self._log_hdl = tasklet.log_hdl
        self._dts = tasklet.dts
        self._loop = tasklet.loop

    @property
    def name(self):
        '''Project name, or None until it has been set.'''
        return self._name

    @name.setter
    def name(self, value):
        '''Set the name once; a different name afterwards is an error.

        Setting the name also derives the XPath prefix and creates the
        project protobuf message.

        Raises:
            ManoProjNameSetErr - a different name has already been set
        '''
        if self._name is None:
            self._name = value
            self._prefix = "{}[{}='{}']".format(XPATH,
                                                NS_NAME,
                                                self._name)
            self._pbcm = RwProjectManoYang.YangData_RwProject_Project(
                name=self._name)

        elif self._name == value:
            self._log.debug("Setting the same name again for project {}".
                            format(value))
        else:
            msg = "Project name already set to {}".format(self._name)
            self._log.error(msg)
            raise ManoProjNameSetErr(msg)

    @property
    def log(self):
        '''Logger given at construction.'''
        return self._log

    @property
    def prefix(self):
        '''XPath prefix selecting this project, or None without a name.'''
        return self._prefix

    @property
    def pbcm(self):
        '''Project protobuf message, or None without a name.'''
        return self._pbcm

    @property
    def config(self):
        '''The project_config member of the project message.

        Requires the name to be set; the message does not exist before.
        '''
        return self._pbcm.project_config

    @property
    def tasklet(self):
        '''Tasklet stored by update(), or None.'''
        return self._tasklet

    @property
    def log_hdl(self):
        '''Log handle taken from the tasklet by update(), or None.'''
        return self._log_hdl

    @property
    def dts(self):
        '''DTS handle taken from the tasklet by update(), or None.'''
        return self._dts

    @property
    def loop(self):
        '''Event loop taken from the tasklet by update(), or None.'''
        return self._loop

    def set_from_xpath(self, xpath):
        '''Set the project name from the project key found in xpath.'''
        self.name = ManoProject.from_xpath(xpath, self._log)

    def add_project(self, xpath):
        '''Return xpath prefixed with this project (see prefix_project()).'''
        return ManoProject.prefix_project(xpath, log=self._log, project=self._name)

    # The abstractmethod markers below are advisory only: this class does not
    # use abc.ABCMeta, so instantiation is not blocked by them.

    @abc.abstractmethod
    @asyncio.coroutine
    def delete_prepare(self):
        '''Tell whether the project may be deleted.

        Returns:
            A (bool, str) pair: whether deletion is allowed and a message.
            This default always allows it.
        '''
        self._log.debug("Delete prepare for project {}".format(self._name))
        return (True, "True")

    @abc.abstractmethod
    @asyncio.coroutine
    def register(self):
        '''Register the project; must be implemented by subclasses.

        Raises:
            NotImplementedError - always, in this base implementation
        '''
        msg = "Register not implemented for project type {}". \
              format(self.__class__.__name__)
        self._log.error(msg)
        raise NotImplementedError(msg)

    @abc.abstractmethod
    def deregister(self):
        '''De-register the project; must be implemented by subclasses.

        Raises:
            NotImplementedError - always, in this base implementation
        '''
        msg = "De-register not implemented for project type {}". \
              format(self.__class__.__name__)
        self._log.error(msg)
        raise NotImplementedError(msg)

    def rpc_check(self, msg, xact_info=None):
        '''Check if the rpc is for this project.

        Arguments:
            msg - RPC message; its project_name attribute selects the
                  project, DEFAULT_PROJECT is assumed when it has none
            xact_info - Optional transaction info; when the RPC is for
                        another project it is answered with ACK

        Returns:
            True when the RPC addresses this project, False otherwise.
        '''
        try:
            project = msg.project_name
        except AttributeError:
            project = DEFAULT_PROJECT

        if project != self.name:
            self._log.debug("Project {}: RPC is for different project {}".
                            format(self.name, project))
            if xact_info:
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)
            return False

        return True

    @asyncio.coroutine
    def create_project(self, dts):
        '''Create this project's config through a DTS create query.

        Arguments:
            dts - DTS handle offering query_create()
        '''
        proj_xpath = "C,{}/config".format(self.prefix)
        self._log.info("Creating project: {} with {}".
                       format(proj_xpath, self.config.as_dict()))

        yield from dts.query_create(proj_xpath,
                                    rwdts.XactFlag.ADVISE,
                                    self.config)
def get_add_delete_update_cfgs(dts_member_reg, xact, key_name):
    """Work out which configs a transaction adds, deletes and updates.

    Arguments:
        dts_member_reg - Registration offering get_xact_elements(xact) for
                         the configs as of the transaction and .elements
                         for the currently cached configs; may be falsy
        xact - Transaction handle passed through to get_xact_elements()
        key_name - Name of the attribute that identifies a config

    Returns:
        A tuple (added_cfgs, deleted_cfgs, updated_cfgs) of lists. Added and
        updated entries are the transaction's configs, deleted entries are
        the cached ones. Entries follow the order in which the registration
        yields them (on interpreters whose dicts preserve insertion order),
        so the result no longer varies between runs.
    """
    # TODO: Check why this is getting called during project delete
    if not dts_member_reg:
        return [], [], []

    # Unfortunately, it is currently difficult to figure out what has exactly
    # changed in this xact without Pbdelta support (RIFT-4916)
    # As a workaround, we can fetch the pre and post xact elements and
    # perform a comparison to figure out adds/deletes/updates
    xact_key_map = {getattr(cfg, key_name): cfg
                    for cfg in dts_member_reg.get_xact_elements(xact)}
    curr_key_map = {getattr(cfg, key_name): cfg
                    for cfg in dts_member_reg.elements}

    # Find Adds: keys only the transaction knows about
    added_cfgs = [cfg for key, cfg in xact_key_map.items()
                  if key not in curr_key_map]

    # Find Deletes: keys only the cache knows about
    deleted_cfgs = [cfg for key, cfg in curr_key_map.items()
                    if key not in xact_key_map]

    # Find Updates: keys on both sides whose configs compare unequal
    updated_cfgs = [cfg for key, cfg in xact_key_map.items()
                    if key in curr_key_map and cfg != curr_key_map[key]]

    return added_cfgs, deleted_cfgs, updated_cfgs
class ProjectConfigCallbacks(object):
    """Validated bundle of the project add/delete callbacks.

    Every callback left as None is replaced with a do-nothing default, so
    users of the bundle can call all four unconditionally.

    Arguments:
        on_add_apply - Plain callable invoked when a project is added
        on_add_prepare - Coroutine invoked while an add is being prepared
        on_delete_apply - Plain callable invoked when a project is deleted
        on_delete_prepare - Coroutine invoked while a delete is being
                            prepared; yields a (bool, str) pair telling
                            whether the delete may proceed and why not

    Raises:
        ValueError - an apply callback is a coroutine function, or a
                     prepare callback is not one
    """

    def __init__(self,
                 on_add_apply=None, on_add_prepare=None,
                 on_delete_apply=None, on_delete_prepare=None):

        @asyncio.coroutine
        def prepare_noop(*args, **kwargs):
            pass

        @asyncio.coroutine
        def delete_prepare_noop(*args, **kwargs):
            # The result of a delete-prepare is unpacked as
            # (allowed, message) by the DTS handler, so the default has to
            # approve explicitly instead of returning None.
            return (True, "True")

        def apply_noop(*args, **kwargs):
            pass

        self.on_add_apply = on_add_apply
        self.on_add_prepare = on_add_prepare
        self.on_delete_apply = on_delete_apply
        self.on_delete_prepare = on_delete_prepare

        # Apply callbacks are called synchronously.
        for f in ('on_add_apply', 'on_delete_apply'):
            ref = getattr(self, f)
            if ref is None:
                setattr(self, f, apply_noop)
                continue

            if asyncio.iscoroutinefunction(ref):
                raise ValueError('%s cannot be a coroutine' % (f,))

        # Prepare callbacks are driven with "yield from".
        prepare_defaults = (
            ('on_add_prepare', prepare_noop),
            ('on_delete_prepare', delete_prepare_noop),
        )
        for f, default in prepare_defaults:
            ref = getattr(self, f)
            if ref is None:
                setattr(self, f, default)
                continue

            if not asyncio.iscoroutinefunction(ref):
                raise ValueError("%s must be a coroutine" % f)
class ProjectDtsHandler(object):
    """Subscribes to project config in DTS and tracks the known projects.

    Prepare requests are validated through the prepare callbacks and queued
    in the transaction scratch; when the config is applied the queued
    changes drive add_project()/delete_project()/update_project().

    Arguments:
        dts - DTS handle used to create the appconf group
        log - Logger
        callbacks - ProjectConfigCallbacks-like object providing
                    on_add_apply, on_add_prepare, on_delete_apply and
                    on_delete_prepare
        sub_config - Selects the xpath/key stored on the instance: the
                     project-config subtree (True) or the project itself
    """
    XPATH = "C,{}/project-config".format(XPATH)

    def __init__(self, dts, log, callbacks, sub_config=True):
        self._dts = dts
        self._log = log
        self._callbacks = callbacks

        if sub_config:
            self.xpath = ProjectDtsHandler.XPATH
            self._key = 'name_ref'
        else:
            self.xpath = "C,{}".format(XPATH)
            self._key = 'name'

        # The registration is used internally as _reg; `reg` is the
        # attribute this class has always exposed. Both are kept in step.
        self._reg = None
        self.reg = None
        self.projects = []

    @property
    def log(self):
        """Logger given at construction."""
        return self._log

    @property
    def dts(self):
        """DTS handle given at construction."""
        return self._dts

    def add_project(self, name):
        """Run the add-apply callback for a new project and remember it.

        A name that is already known is only reported as an error.
        """
        self._log.info("Adding project: {}".format(name))

        if name not in self.projects:
            self._callbacks.on_add_apply(name)
            self.projects.append(name)
        else:
            self._log.error("Project already present: {}".
                            format(name))

    def delete_project(self, name):
        """Run the delete-apply callback for a known project and forget it.

        An unknown name is only reported as an error.
        """
        self._log.info("Deleting project: {}".format(name))
        if name in self.projects:
            self._callbacks.on_delete_apply(name)
            self.projects.remove(name)
        else:
            self._log.error("Unrecognized project: {}".
                            format(name))

    def update_project(self, name):
        """ Update an existing project

        Currently, we do not take any action on MANO for this,
        so no callbacks are defined. A project that is not known yet is
        added instead.

        Arguments:
            name - The name of the project
        """
        self._log.info("Updating project: {}".format(name))
        if name in self.projects:
            pass
        else:
            self.add_project(name)

    def register(self):
        """Create the appconf group and register for project config."""

        def new_pending():
            # Per-transaction queues filled by on_prepare and drained by
            # apply_config.
            return {
                "added": [],
                "deleted": [],
                "updated": [],
            }

        def pending_changes(scratch):
            # apply_config removes the "projects" entry once it is done, so
            # recreate it on demand instead of relying on on_init alone.
            try:
                return scratch["projects"]
            except KeyError:
                scratch["projects"] = new_pending()
                return scratch["projects"]

        def on_init(acg, xact, scratch):
            self._log.debug("on_init")
            scratch["projects"] = new_pending()
            return scratch

        def readd_projects(xact):
            # Rebuild the project list from the elements of the
            # registration, using the name found in each keyspec.
            self._log.info("Re-add projects")

            for cfg, ks in self._reg.get_xact_elements(xact, include_keyspec=True):
                xpath = ks.to_xpath(RwProjectManoYang.get_schema())
                self._log.debug("Got ks {} for cfg {}".format(xpath, cfg.as_dict()))
                name = ManoProject.from_xpath(xpath, self._log)
                self._log.debug("Project to add: {}".format(name))
                self.add_project(name)

        @asyncio.coroutine
        def apply_config(dts, acg, xact, action, scratch):
            self._log.debug("Got project apply config (xact: %s) (action: %s): %s",
                            xact, action, scratch)

            if xact.xact is None:
                if action == rwdts.AppconfAction.INSTALL:
                    readd_projects(xact)
                else:
                    self._log.debug("No xact handle.  Skipping apply config")

                return

            try:
                pending = scratch["projects"]
            except KeyError:
                pending = {}

            add_cfgs = pending.get("added", [])
            del_cfgs = pending.get("deleted", [])
            update_cfgs = pending.get("updated", [])

            # Handle Deletes
            for name in del_cfgs:
                self.delete_project(name)

            # Handle Adds
            for name, _msg in add_cfgs:
                self.add_project(name)

            # Handle Updates
            for name, _msg in update_cfgs:
                self.update_project(name)

            try:
                del scratch["projects"]
            except KeyError:
                pass

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for Project """

            action = xact_info.query_action
            xpath = ks_path.to_xpath(RwProjectManoYang.get_schema())
            self._log.debug("Project xpath: {}".format(xpath))
            name = ManoProject.from_xpath(xpath, self._log)

            self._log.debug("Project %s on_prepare config received (action: %s): %s",
                            name, xact_info.query_action, msg)

            if action == rwdts.QueryAction.CREATE:
                if name in self.projects:
                    self._log.debug("Project {} already exists. Ignore request".
                                    format(name))
                else:
                    yield from self._callbacks.on_add_prepare(name)
                    pending_changes(scratch)["added"].append((name, msg))

            elif action == rwdts.QueryAction.UPDATE:
                if name in self.projects:
                    pending_changes(scratch)["updated"].append((name, msg))
                else:
                    self._log.debug("Project {}: Invoking on_prepare add request".
                                    format(name))
                    yield from self._callbacks.on_add_prepare(name)
                    pending_changes(scratch)["added"].append((name, msg))

            elif action == rwdts.QueryAction.DELETE:
                # Check if the entire project got deleted
                fref = ProtobufC.FieldReference.alloc()
                fref.goto_whole_message(msg.to_pbcm())
                if fref.is_field_deleted():
                    if name in self.projects:
                        result = yield from self._callbacks.on_delete_prepare(name)
                        # A callback that yields nothing (e.g. a no-op
                        # default) has no objection to the delete.
                        if result is None:
                            rc, delete_msg = True, ""
                        else:
                            rc, delete_msg = result

                        if not rc:
                            self._log.error("Project {} should not be deleted. Reason : {}".
                                            format(name, delete_msg))

                            xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
                                                       ProjectDtsHandler.XPATH,
                                                       delete_msg)

                            xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                            return

                        pending_changes(scratch)["deleted"].append(name)
                    else:
                        self._log.warning("Delete on unknown project: {}".
                                          format(name))
            else:
                self._log.error("Action (%s) NOT SUPPORTED", action)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK)
                return

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        # NOTE(review): registration always uses the class-level XPATH, not
        # self.xpath chosen from sub_config in __init__ - confirm whether
        # sub_config=False is meant to register on the project itself.
        self._log.debug("Registering for project config using xpath: %s",
                        ProjectDtsHandler.XPATH,
                        )

        acg_handler = rift.tasklets.AppConfGroup.Handler(
            on_apply=apply_config,
            on_init=on_init)

        with self._dts.appconf_group_create(acg_handler) as acg:
            self._reg = acg.register(
                xpath=ProjectDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                on_prepare=on_prepare,
            )
            self.reg = self._reg
class ProjectHandler(object):
    """Creates and removes a tasklet's project objects on project config.

    The handler keeps the tasklet's ``projects`` mapping in step with the
    project configuration: it wires its own methods as the callbacks of a
    ProjectDtsHandler and instantiates ``project_class`` for each project.

    Arguments:
        tasklet - Owning tasklet; provides log, log_hdl, dts, loop,
                  tasklet_info and the projects mapping
        project_class - Class instantiated as
                        project_class(name, tasklet, **kw) per project
        kw - Extra keyword arguments passed to project_class
    """

    def __init__(self, tasklet, project_class, **kw):
        self._tasklet = tasklet
        self._log = tasklet.log
        self._log_hdl = tasklet.log_hdl
        self._dts = tasklet.dts
        self._loop = tasklet.loop
        self._class = project_class
        self._kw = kw

        self._log.debug("Creating project config handler")
        callbacks = ProjectConfigCallbacks(
            on_add_apply=self.on_project_added,
            on_add_prepare=self.on_add_prepare,
            on_delete_apply=self.on_project_deleted,
            on_delete_prepare=self.on_delete_prepare,
        )
        self.project_cfg_handler = ProjectDtsHandler(self._dts, self._log,
                                                     callbacks)

    def _get_tasklet_name(self):
        """Return the instance name of the owning tasklet."""
        info = self._tasklet.tasklet_info
        return info.instance_name

    def _get_project(self, name):
        """Return the tasklet's project called name; log and re-raise on failure."""
        try:
            return self._tasklet.projects[name]
        except Exception as e:
            known = list(self._tasklet.projects.keys())
            self._log.exception("Project {} ({})not found for tasklet {}: {}".
                                format(name, known,
                                       self._get_tasklet_name(), e))
            raise e

    def on_project_deleted(self, name):
        """Apply callback: de-register the project and drop it from the tasklet.

        Failures of either step are logged and not propagated.
        """
        self._log.debug("Project {} deleted".format(name))

        # Step 1: let the project release what it registered.
        try:
            project = self._get_project(name)
            project.deregister()
        except Exception as e:
            self._log.exception("Project {} deregister for {} failed: {}".
                                format(name, self._get_tasklet_name(), e))

        # Step 2: forget the project even if de-registration failed.
        try:
            self._tasklet.projects.pop(name)
        except Exception as e:
            self._log.exception("Project {} delete for {} failed: {}".
                                format(name, self._get_tasklet_name(), e))

    def on_project_added(self, name):
        """Apply callback: make sure the project exists and mark it applied.

        A project not created during prepare is created here and its
        register() coroutine is scheduled on the tasklet's loop.
        """
        already_known = name in self._tasklet.projects
        if not already_known:
            try:
                self._tasklet.projects[name] = \
                    self._class(name, self._tasklet, **(self._kw))
                register_coro = self._get_project(name).register()
                task = asyncio.ensure_future(register_coro, loop=self._loop)

                self._log.debug("Project {} register: {}".format(name, str(task)))

            except Exception as e:
                self._log.exception("Project {} create for {} failed: {}".
                                    format(name, self._get_tasklet_name(), e))
                raise e

        self._log.debug("Project {} added to tasklet {}".
                        format(name, self._get_tasklet_name()))
        project = self._get_project(name)
        project._apply = True

    @asyncio.coroutine
    def on_add_prepare(self, name):
        """Prepare callback: create the project and wait for its register().

        A project that already exists is reported and left alone.
        """
        self._log.debug("Project {} to be added to {}".
                        format(name, self._get_tasklet_name()))

        if name in self._tasklet.projects:
            self._log.error("Project {} already exists for {}".
                            format(name, self._get_tasklet_name()))
            return

        try:
            self._tasklet.projects[name] = \
                self._class(name, self._tasklet, **(self._kw))
            project = self._get_project(name)
            yield from project.register()

        except Exception as e:
            self._log.exception("Project {} create for {} failed: {}".
                                format(name, self._get_tasklet_name(), e))
            raise e

    @asyncio.coroutine
    def on_delete_prepare(self, name):
        """Prepare callback: ask the project whether it may be deleted.

        Returns:
            The (allowed, message) pair produced by the project's
            delete_prepare().
        """
        self._log.debug("Project {} being deleted for tasklet {}".
                        format(name, self._get_tasklet_name()))
        project = self._get_project(name)
        allowed, reason = yield from project.delete_prepare()
        return allowed, reason

    def register(self):
        """Register the underlying project config handler with DTS."""
        self.project_cfg_handler.register()