X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=rwlaunchpad%2Fplugins%2Frwvns%2Frift%2Ftopmgr%2Frwtopmgr.py;fp=rwlaunchpad%2Fplugins%2Frwvns%2Frift%2Ftopmgr%2Frwtopmgr.py;h=b095fbc0c951251c45de1cdaf6310f1e8d6db0b1;hb=6f07e6f33f751ab4ffe624f6037f887b243bece2;hp=0000000000000000000000000000000000000000;hpb=72a563886272088feb7cb52e4aafbe6d2c580ff9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwvns/rift/topmgr/rwtopmgr.py b/rwlaunchpad/plugins/rwvns/rift/topmgr/rwtopmgr.py new file mode 100755 index 00000000..b095fbc0 --- /dev/null +++ b/rwlaunchpad/plugins/rwvns/rift/topmgr/rwtopmgr.py @@ -0,0 +1,329 @@ + +# +# Copyright 2016 RIFT.IO Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import asyncio + +import gi +gi.require_version('RwDts', '1.0') +gi.require_version('RwcalYang', '1.0') +gi.require_version('RwTypes', '1.0') +gi.require_version('RwSdn', '1.0') +from gi.repository import ( + RwDts as rwdts, + IetfNetworkYang, + IetfNetworkTopologyYang, + IetfL2TopologyYang, + RwTopologyYang, + RwsdnYang, + RwTypes +) + +from gi.repository.RwTypes import RwStatus +import rw_peas +import rift.tasklets + +class SdnGetPluginError(Exception): + """ Error while fetching SDN plugin """ + pass + + +class SdnGetInterfaceError(Exception): + """ Error while fetching SDN interface""" + pass + + +class SdnAccountMgr(object): + """ Implements the interface to backend plugins to fetch topology """ + def __init__(self, log, log_hdl, loop): + self._account = {} + self._log = log + self._log_hdl = log_hdl + self._loop = loop + self._sdn = {} + + self._regh = None + + self._status = RwsdnYang.SDNAccount_ConnectionStatus( + status='unknown', + details="Connection status lookup not started" + ) + + self._validate_task = None + + def set_sdn_account(self,account): + if (account.name in self._account): + self._log.error("SDN Account is already set") + else: + sdn_account = RwsdnYang.SDNAccount() + sdn_account.from_dict(account.as_dict()) + sdn_account.name = account.name + self._account[account.name] = sdn_account + self._log.debug("Account set is %s , %s",type(self._account), self._account) + self.start_validate_credentials(self._loop, account.name) + + def del_sdn_account(self, name): + self._log.debug("Account deleted is %s , %s", type(self._account), name) + del self._account[name] + + def update_sdn_account(self,account): + self._log.debug("Account updated is %s , %s", type(self._account), account) + if account.name in self._account: + sdn_account = self._account[account.name] + + sdn_account.from_dict( + account.as_dict(), + ignore_missing_keys=True, + ) + self._account[account.name] = sdn_account + self.start_validate_credentials(self._loop, account.name) + + def get_sdn_account(self, name): + """ + Creates an object for class RwsdnYang.SdnAccount() + """ + if (name in self._account): + return self._account[name] + else: + self._log.error("ERROR : SDN account is not configured") + + def get_saved_sdn_accounts(self, name): + ''' Get SDN Account corresponding to passed name, or all saved accounts if name is None''' + saved_sdn_accounts = [] + + if name is None or name == "": + sdn_accounts = list(self._account.values()) + saved_sdn_accounts.extend(sdn_accounts) + elif name in self._account: + account = self._account[name] + saved_sdn_accounts.append(account) + else: + errstr = "SDN account {} does not exist".format(name) + raise KeyError(errstr) + + return saved_sdn_accounts + + def get_sdn_plugin(self,name): + """ + Loads rw.sdn plugin via libpeas + """ + if (name in self._sdn): + return self._sdn[name] + account = self.get_sdn_account(name) + plugin_name = getattr(account, account.account_type).plugin_name + self._log.info("SDN plugin being created") + plugin = rw_peas.PeasPlugin(plugin_name, 'RwSdn-1.0') + engine, info, extension = plugin() + + self._sdn[name] = plugin.get_interface("Topology") + try: + rc = self._sdn[name].init(self._log_hdl) + assert rc == RwStatus.SUCCESS + except: + self._log.error("ERROR:SDN plugin instantiation failed ") + else: + self._log.info("SDN plugin successfully instantiated") + return self._sdn[name] + + @asyncio.coroutine + def validate_sdn_account_credentials(self, loop, name): + self._log.debug("Validating SDN Account credentials %s", name) + self._status = RwsdnYang.SDNAccount_ConnectionStatus( + status="validating", + details="SDN account connection validation in progress" + ) + + _sdnacct = self.get_sdn_account(name) + if (_sdnacct is None): + raise SdnGetPluginError + _sdnplugin = self.get_sdn_plugin(name) + if (_sdnplugin is None): + raise SdnGetInterfaceError + + rwstatus, status = yield from loop.run_in_executor( + None, + _sdnplugin.validate_sdn_creds, + _sdnacct, + ) + + if rwstatus == RwTypes.RwStatus.SUCCESS: + self._status = RwsdnYang.SDNAccount_ConnectionStatus.from_dict(status.as_dict()) + else: + self._status = RwsdnYang.SDNAccount_ConnectionStatus( + status="failure", + details="Error when calling CAL validate sdn creds" + ) + + self._log.info("Got sdn account validation response: %s", self._status) + _sdnacct.connection_status = self._status + + def start_validate_credentials(self, loop, name): + if self._validate_task is not None: + self._validate_task.cancel() + self._validate_task = None + + self._validate_task = asyncio.ensure_future( + self.validate_sdn_account_credentials(loop, name), + loop=loop + ) + + +class NwtopDiscoveryDtsHandler(object): + """ Handles DTS interactions for the Discovered Topology registration """ + DISC_XPATH = "D,/nd:network" + + def __init__(self, dts, log, loop, acctmgr, nwdatastore): + self._dts = dts + self._log = log + self._loop = loop + self._acctmgr = acctmgr + self._nwdatastore = nwdatastore + + self._regh = None + + @property + def regh(self): + """ The registration handle associated with this Handler""" + return self._regh + + @asyncio.coroutine + def register(self): + """ Register for the Discovered Topology path """ + + @asyncio.coroutine + def on_ready(regh, status): + """ On_ready for Discovered Topology registration """ + self._log.debug("PUB reg ready for Discovered Topology handler regn_hdl(%s) status %s", + regh, status) + + @asyncio.coroutine + def on_prepare(xact_info, action, ks_path, msg): + """ prepare for Discovered Topology registration""" + self._log.debug( + "Got topology on_prepare callback (xact_info: %s, action: %s): %s", + xact_info, action, msg + ) + + if action == rwdts.QueryAction.READ: + + for name in self._acctmgr._account: + _sdnacct = self._acctmgr.get_sdn_account(name) + if (_sdnacct is None): + raise SdnGetPluginError + + _sdnplugin = self._acctmgr.get_sdn_plugin(name) + if (_sdnplugin is None): + raise SdnGetInterfaceError + + rc, nwtop = _sdnplugin.get_network_list(_sdnacct) + #assert rc == RwStatus.SUCCESS + if rc != RwStatus.SUCCESS: + self._log.error("Fetching get network list for SDN Account %s failed", name) + xact_info.respond_xpath(rwdts.XactRspCode.NACK) + return + + self._log.debug("Topology: Retrieved network attributes ") + for nw in nwtop.network: + # Add SDN account name + nw.rw_network_attributes.sdn_account_name = name + nw.server_provided = False + nw.network_id = name + ':' + nw.network_id + self._log.debug("...Network id %s", nw.network_id) + nw_xpath = ("D,/nd:network[network-id=\'{}\']").format(nw.network_id) + xact_info.respond_xpath(rwdts.XactRspCode.MORE, + nw_xpath, nw) + + xact_info.respond_xpath(rwdts.XactRspCode.ACK) + #err = "%s action on discovered Topology not supported" % action + #raise NotImplementedError(err) + + self._log.debug("Registering for discovered topology using xpath %s", NwtopDiscoveryDtsHandler.DISC_XPATH) + + handler = rift.tasklets.DTS.RegistrationHandler( + on_ready=on_ready, + on_prepare=on_prepare, + ) + + yield from self._dts.register( + NwtopDiscoveryDtsHandler.DISC_XPATH, + flags=rwdts.Flag.PUBLISHER, + handler=handler + ) + + +class NwtopStaticDtsHandler(object): + """ Handles DTS interactions for the Static Topology registration """ + STATIC_XPATH = "C,/nd:network" + + def __init__(self, dts, log, loop, acctmgr, nwdatastore): + self._dts = dts + self._log = log + self._loop = loop + self._acctmgr = acctmgr + + self._regh = None + self.pending = {} + self._nwdatastore = nwdatastore + + @property + def regh(self): + """ The registration handle associated with this Handler""" + return self._regh + + + @asyncio.coroutine + def register(self): + """ Register for the Static Topology path """ + + @asyncio.coroutine + def prepare_nw_cfg(dts, acg, xact, xact_info, ksp, msg, scratch): + """Prepare for application configuration. Stash the pending + configuration object for subsequent transaction phases""" + self._log.debug("Prepare Network config received network id %s, msg %s", + msg.network_id, msg) + self.pending[xact.id] = msg + xact_info.respond_xpath(rwdts.XactRspCode.ACK) + + def apply_nw_config(dts, acg, xact, action, scratch): + """Apply the pending configuration object""" + if action == rwdts.AppconfAction.INSTALL and xact.id is None: + self._log.debug("No xact handle. Skipping apply config") + return + + if xact.id not in self.pending: + raise KeyError("No stashed configuration found with transaction id [{}]".format(xact.id)) + + try: + if action == rwdts.AppconfAction.INSTALL: + self._nwdatastore.create_network(self.pending[xact.id].network_id, self.pending[xact.id]) + elif action == rwdts.AppconfAction.RECONCILE: + self._nwdatastore.update_network(self.pending[xact.id].network_id, self.pending[xact.id]) + except: + raise + + self._log.debug("Create network config done") + return RwTypes.RwStatus.SUCCESS + + self._log.debug("Registering for static topology using xpath %s", NwtopStaticDtsHandler.STATIC_XPATH) + handler=rift.tasklets.AppConfGroup.Handler( + on_apply=apply_nw_config) + + with self._dts.appconf_group_create(handler=handler) as acg: + acg.register(xpath = NwtopStaticDtsHandler.STATIC_XPATH, + flags = rwdts.Flag.SUBSCRIBER, + on_prepare=prepare_nw_cfg) + +