X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=common%2Fpython%2Frift%2Fmano%2Fconfig_agent%2Fconfig.py;fp=common%2Fpython%2Frift%2Fmano%2Fconfig_agent%2Fconfig.py;h=7500bac72d9717e2e80e4d144f06382b9feae9f4;hb=6f07e6f33f751ab4ffe624f6037f887b243bece2;hp=0000000000000000000000000000000000000000;hpb=72a563886272088feb7cb52e4aafbe6d2c580ff9;p=osm%2FSO.git diff --git a/common/python/rift/mano/config_agent/config.py b/common/python/rift/mano/config_agent/config.py new file mode 100644 index 00000000..7500bac7 --- /dev/null +++ b/common/python/rift/mano/config_agent/config.py @@ -0,0 +1,228 @@ + +# +# Copyright 2016 RIFT.IO Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import asyncio +import rw_peas + +import gi +gi.require_version('RwDts', '1.0') +import rift.tasklets + +from gi.repository import ( + RwcalYang as rwcal, + RwDts as rwdts, + RwConfigAgentYang as rwcfg_agent, + ProtobufC, + ) + +class ConfigAccountNotFound(Exception): + pass + +class ConfigAccountError(Exception): + pass + + +def get_add_delete_update_cfgs(dts_member_reg, xact, key_name): + # Unforunately, it is currently difficult to figure out what has exactly + # changed in this xact without Pbdelta support (RIFT-4916) + # As a workaround, we can fetch the pre and post xact elements and + # perform a comparison to figure out adds/deletes/updates + xact_cfgs = list(dts_member_reg.get_xact_elements(xact)) + curr_cfgs = list(dts_member_reg.elements) + + xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs} + curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs} + + # Find Adds + added_keys = set(xact_key_map) - set(curr_key_map) + added_cfgs = [xact_key_map[key] for key in added_keys] + + # Find Deletes + deleted_keys = set(curr_key_map) - set(xact_key_map) + deleted_cfgs = [curr_key_map[key] for key in deleted_keys] + + # Find Updates + updated_keys = set(curr_key_map) & set(xact_key_map) + updated_cfgs = [xact_key_map[key] for key in updated_keys if xact_key_map[key] != curr_key_map[key]] + + return added_cfgs, deleted_cfgs, updated_cfgs + + +class ConfigAgentCallbacks(object): + def __init__(self, + on_add_apply=None, on_add_prepare=None, + on_delete_apply=None, on_delete_prepare=None): + + @asyncio.coroutine + def prepare_noop(*args, **kwargs): + pass + + def apply_noop(*args, **kwargs): + pass + + self.on_add_apply = on_add_apply + self.on_add_prepare = on_add_prepare + self.on_delete_apply = on_delete_apply + self.on_delete_prepare = on_delete_prepare + + for f in ('on_add_apply', 'on_delete_apply'): + ref = getattr(self, f) + if ref is None: + setattr(self, f, apply_noop) + continue + + if asyncio.iscoroutinefunction(ref): + raise ValueError('%s cannot be a coroutine' % (f,)) + + for f in ('on_add_prepare', 'on_delete_prepare'): + ref = getattr(self, f) + if ref is None: + setattr(self, f, prepare_noop) + continue + + if not asyncio.iscoroutinefunction(ref): + raise ValueError("%s must be a coroutine" % f) + + +class ConfigAgentSubscriber(object): + XPATH = "C,/rw-config-agent:config-agent/account" + + def __init__(self, dts, log, config_callbacks): + self._dts = dts + self._log = log + self._reg = None + + self.accounts = {} + + self._config_callbacks = config_callbacks + + def add_account(self, account_msg): + self._log.info("adding config account: {}".format(account_msg)) + + self.accounts[account_msg.name] = account_msg + + self._config_callbacks.on_add_apply(account_msg) + + def delete_account(self, account_msg): + self._log.info("deleting config account: {}".format(account_msg.name)) + del self.accounts[account_msg.name] + + self._config_callbacks.on_delete_apply(account_msg) + + def update_account(self, account_msg): + """ Update an existing config-agent account + + In order to simplify update, turn an update into a delete followed by + an add. The drawback to this approach is that we will not support + updates of an "in-use" config-agent account, but this seems like a + reasonable trade-off. + + Arguments: + account_msg - The config-agent account config message + """ + + self._log.info("updating config-agent account: {}".format(account_msg)) + self.delete_account(account_msg) + self.add_account(account_msg) + + def register(self): + def apply_config(dts, acg, xact, action, _): + self._log.debug("Got config account apply config (xact: %s) (action: %s)", xact, action) + + if xact.xact is None: + # When RIFT first comes up, an INSTALL is called with the current config + # Since confd doesn't actally persist data this never has any data so + # skip this for now. + self._log.debug("No xact handle. Skipping apply config") + return + + add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs( + dts_member_reg=self._reg, + xact=xact, + key_name="name", + ) + + # Handle Deletes + for cfg in delete_cfgs: + self.delete_account(cfg) + + # Handle Adds + for cfg in add_cfgs: + self.add_account(cfg) + + # Handle Updates + for cfg in update_cfgs: + self.update_account(cfg) + + @asyncio.coroutine + def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch): + """ Prepare callback from DTS for Config Account """ + + action = xact_info.handle.query_action + self._log.debug("Config account on_prepare config received (action: %s): %s", + xact_info.handle.query_action, msg) + + if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]: + # If the account already exists, then this is an update. + if msg.name in self.accounts: + self._log.debug("Config account already exists. Invoking on_prepare update request") + if msg.has_field("account_type"): + raise ConfigAccountError("Cannot change config's account-type") + + # Since updates are handled by a delete followed by an add, invoke the + # delete prepare callbacks to give clients an opportunity to reject. + yield from self._config_callbacks.on_delete_prepare(msg.name) + + else: + self._log.debug("Config account does not already exist. Invoking on_prepare add request") + if not msg.has_field('account_type'): + raise ConfigAccountError("New Config account must contain account_type field.") + + account = msg + yield from self._config_callbacks.on_add_prepare(account) + + elif action == rwdts.QueryAction.DELETE: + # Check if the entire cloud account got deleted + fref = ProtobufC.FieldReference.alloc() + fref.goto_whole_message(msg.to_pbcm()) + if fref.is_field_deleted(): + yield from self._config_callbacks.on_delete_prepare(msg.name) + else: + self._log.error("Deleting individual fields for config account not supported") + xact_info.respond_xpath(rwdts.XactRspCode.NACK) + return + + else: + self._log.error("Action (%s) NOT SUPPORTED", action) + xact_info.respond_xpath(rwdts.XactRspCode.NACK) + + xact_info.respond_xpath(rwdts.XactRspCode.ACK) + + self._log.debug("Registering for Config Account config using xpath: %s", + ConfigAgentSubscriber.XPATH, + ) + + acg_handler = rift.tasklets.AppConfGroup.Handler( + on_apply=apply_config, + ) + + with self._dts.appconf_group_create(acg_handler) as acg: + self._reg = acg.register( + xpath=ConfigAgentSubscriber.XPATH, + flags=rwdts.Flag.SUBSCRIBER, + on_prepare=on_prepare, + )