RIFT OSM R1 Initial Submission
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / rwvnffgmgr.py
diff --git a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwvnffgmgr.py b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwvnffgmgr.py
new file mode 100755 (executable)
index 0000000..0ebe9df
--- /dev/null
@@ -0,0 +1,422 @@
+
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import asyncio
+
+from gi.repository import (
+    RwDts as rwdts,
+    RwsdnYang,
+    RwTypes,
+    ProtobufC,
+)
+
+from gi.repository.RwTypes import RwStatus
+import rw_peas
+import rift.tasklets
+
+class SdnGetPluginError(Exception):
+    """ Error while fetching SDN plugin """
+    pass
+
+
+class SdnGetInterfaceError(Exception):
+    """ Error while fetching SDN interface"""
+    pass
+
+
+class SdnAccountError(Exception):
+    """ Error while creating/deleting/updating SDN Account"""
+    pass
+
+class VnffgrDoesNotExist(Exception):
+    """ Error while fetching SDN interface"""
+    pass
+
+class VnffgrAlreadyExist(Exception):
+    """ Vnffgr already exists Error"""
+    pass
+
+class VnffgrCreationFailed(Exception):
+    """ Error while creating VNFFGR"""
+    pass
+
+
+class VnffgrUpdateFailed(Exception):
+    """ Error while updating VNFFGR"""
+    pass
+
+class VnffgMgr(object):
+    """ Implements the interface to backend plugins to fetch topology """
+    def __init__(self, dts, log, log_hdl, loop):
+        self._account = {}
+        self._dts = dts
+        self._log = log
+        self._log_hdl = log_hdl
+        self._loop = loop
+        self._sdn = {}
+        self._sdn_handler = SDNAccountDtsHandler(self._dts,self._log,self)
+        self._vnffgr_list = {}
+
+    @asyncio.coroutine
+    def register(self):
+        yield from self._sdn_handler.register()
+
+    def set_sdn_account(self,account):
+        if (account.name in self._account):
+            self._log.error("SDN Account is already set")
+        else:
+            sdn_account           = RwsdnYang.SDNAccount()
+            sdn_account.from_dict(account.as_dict())
+            sdn_account.name = account.name
+            self._account[account.name] = sdn_account
+            self._log.debug("Account set is %s , %s",type(self._account), self._account)
+
+    def del_sdn_account(self, name):
+        self._log.debug("Account deleted is %s , %s", type(self._account), name)
+        del self._account[name]
+
+    def update_sdn_account(self,account):
+        self._log.debug("Account updated is %s , %s", type(self._account), account)
+        if account.name in self._account:
+            sdn_account = self._account[account.name]
+
+            sdn_account.from_dict(
+                account.as_dict(),
+                ignore_missing_keys=True,
+                )
+            self._account[account.name] = sdn_account
+
+    def get_sdn_account(self, name):
+        """
+        Creates an object for class RwsdnYang.SdnAccount()
+        """
+        if (name in self._account):
+            return self._account[name]
+        else:
+            self._log.error("SDN account is not configured")
+
+
+    def get_sdn_plugin(self,name):
+        """
+        Loads rw.sdn plugin via libpeas
+        """
+        if (name in self._sdn):
+            return self._sdn[name]
+        account = self.get_sdn_account(name)
+        plugin_name = getattr(account, account.account_type).plugin_name
+        self._log.debug("SDN plugin being created")
+        plugin = rw_peas.PeasPlugin(plugin_name, 'RwSdn-1.0')
+        engine, info, extension = plugin()
+
+        self._sdn[name] = plugin.get_interface("Topology")
+        try:
+            rc = self._sdn[name].init(self._log_hdl)
+            assert rc == RwStatus.SUCCESS
+        except:
+            self._log.error("ERROR:SDN plugin instantiation failed ")
+        else:
+            self._log.debug("SDN plugin successfully instantiated")
+        return self._sdn[name]
+
+    def fetch_vnffgr(self,vnffgr_id):
+        if vnffgr_id not in self._vnffgr_list:
+            self._log.error("VNFFGR with id %s not present in VNFFGMgr", vnffgr_id)
+            msg = "VNFFGR with id {} not present in VNFFGMgr".format(vnffgr_id)
+            raise VnffgrDoesNotExist(msg)
+        self.update_vnffgrs(self._vnffgr_list[vnffgr_id].sdn_account)
+        vnffgr = self._vnffgr_list[vnffgr_id].deep_copy()
+        self._log.debug("VNFFGR for id %s is %s",vnffgr_id,vnffgr)
+        return vnffgr
+
+    def create_vnffgr(self, vnffgr,classifier_list,sff_list):
+        """
+        """
+        self._log.debug("Received VNFFG chain Create msg %s",vnffgr)
+        if vnffgr.id in self._vnffgr_list:
+            self._log.error("VNFFGR with id %s already present in VNFFGMgr", vnffgr.id)
+            vnffgr.operational_status = 'failed'
+            msg = "VNFFGR with id {} already present in VNFFGMgr".format(vnffgr.id)
+            raise VnffgrAlreadyExist(msg)
+
+        self._vnffgr_list[vnffgr.id] = vnffgr
+        vnffgr.operational_status = 'init'
+        if len(self._account) == 0:
+            self._log.error("SDN Account not configured")
+            vnffgr.operational_status = 'failed'
+            return
+        if vnffgr.sdn_account:
+            sdn_acct_name = vnffgr.sdn_account
+        else:
+            self._log.error("SDN Account is not associated to create VNFFGR")
+            # TODO Fail the VNFFGR creation if SDN account is not associated
+            #vnffgr.operational_status = 'failed'
+            #msg = "SDN Account is not associated to create VNFFGR"
+            #raise VnffgrCreationFailed(msg)
+            sdn_account = [sdn_account.name for _,sdn_account in self._account.items()]
+            sdn_acct_name = sdn_account[0]
+            vnffgr.sdn_account = sdn_acct_name
+        sdn_plugin = self.get_sdn_plugin(sdn_acct_name)
+
+        for rsp in vnffgr.rsp:
+            vnffg = RwsdnYang.VNFFGChain()
+            vnffg.name = rsp.name
+            vnffg.classifier_name = rsp.classifier_name
+
+            vnfr_list = list()
+            for index,cp_ref in enumerate(rsp.vnfr_connection_point_ref):
+                cpath = vnffg.vnf_chain_path.add()
+                cpath.order=cp_ref.hop_number
+                cpath.service_function_type = cp_ref.service_function_type
+                cpath.nsh_aware=True
+                cpath.transport_type = 'vxlan-gpe'
+
+                vnfr=cpath.vnfr_ids.add()
+                vnfr.vnfr_id = cp_ref.vnfr_id_ref
+                vnfr.vnfr_name = cp_ref.vnfr_name_ref
+                vnfr.mgmt_address = cp_ref.connection_point_params.mgmt_address
+                vnfr.mgmt_port = 5000
+                vnfr_list.append(vnfr)
+            
+                vdu = vnfr.vdu_list.add()
+                vdu.name = cp_ref.connection_point_params.name
+                vdu.port_id = cp_ref.connection_point_params.port_id
+                vdu.vm_id = cp_ref.connection_point_params.vm_id
+                vdu.address = cp_ref.connection_point_params.address
+                vdu.port =  cp_ref.connection_point_params.port
+
+            for sff in sff_list.values():
+                _sff = vnffg.sff.add()
+                _sff.from_dict(sff.as_dict())
+                if sff.function_type == 'SFF':
+                    for vnfr in vnfr_list:
+                        vnfr.sff_name = sff.name
+                self._log.debug("Recevied SFF %s, Created SFF is %s",sff, _sff)
+
+            self._log.debug("VNFFG chain msg is %s",vnffg)
+            rc,rs = sdn_plugin.create_vnffg_chain(self._account[sdn_acct_name],vnffg)
+            if rc != RwTypes.RwStatus.SUCCESS:
+                vnffgr.operational_status = 'failed'
+                msg = "Instantiation of VNFFGR with id {} failed".format(vnffgr.id)
+                raise VnffgrCreationFailed(msg)
+
+            self._log.info("VNFFG chain created successfully for rsp with id %s",rsp.id)
+
+
+        meta = {}
+        if(len(classifier_list) == 2):
+            meta[vnffgr.classifier[0].id] = '0x' + ''.join(str("%0.2X"%int(i)) for i in vnffgr.classifier[1].ip_address.split('.'))
+            meta[vnffgr.classifier[1].id] = '0x' + ''.join(str("%0.2X"%int(i)) for i in vnffgr.classifier[0].ip_address.split('.'))
+            
+        self._log.debug("VNFFG Meta VNFFG chain is {}".format(meta))
+        
+        for classifier in classifier_list:
+            vnffgr_cl = [_classifier  for _classifier in vnffgr.classifier if classifier.id == _classifier.id]
+            if len(vnffgr_cl) > 0:
+                cl_rsp_name = vnffgr_cl[0].rsp_name
+            else:
+                self._log.error("No RSP wiht name %s found; Skipping classifier %s creation",classifier.rsp_id_ref,classifier.name)
+                continue
+            vnffgcl = RwsdnYang.VNFFGClassifier()
+            vnffgcl.name = classifier.name
+            vnffgcl.rsp_name = cl_rsp_name
+            vnffgcl.port_id = vnffgr_cl[0].port_id
+            vnffgcl.vm_id = vnffgr_cl[0].vm_id
+            # Get the symmetric classifier endpoint ip and set it in nsh ctx1
+            
+            vnffgcl.vnffg_metadata.ctx1 =  meta.get(vnffgr_cl[0].id,'0') 
+            vnffgcl.vnffg_metadata.ctx2 = '0'
+            vnffgcl.vnffg_metadata.ctx3 = '0'
+            vnffgcl.vnffg_metadata.ctx4 = '0'
+            if vnffgr_cl[0].has_field('sff_name'):
+                vnffgcl.sff_name = vnffgr_cl[0].sff_name
+            for index,match_rule in enumerate(classifier.match_attributes):
+                acl = vnffgcl.match_attributes.add()
+                #acl.name = vnffgcl.name + str(index)
+                acl.name = match_rule.id
+                acl.ip_proto  = match_rule.ip_proto
+                acl.source_ip_address = match_rule.source_ip_address + '/32'
+                acl.source_port = match_rule.source_port
+                acl.destination_ip_address = match_rule.destination_ip_address + '/32'
+                acl.destination_port = match_rule.destination_port
+
+            self._log.debug(" Creating VNFFG Classifier Classifier %s for RSP: %s",vnffgcl.name,vnffgcl.rsp_name)
+            rc,rs = sdn_plugin.create_vnffg_classifier(self._account[sdn_acct_name],vnffgcl)
+            if rc != RwTypes.RwStatus.SUCCESS:
+                self._log.error("VNFFG Classifier cretaion failed for Classifier %s for RSP ID: %s",classifier.name,classifier.rsp_id_ref)
+                #vnffgr.operational_status = 'failed'
+                #msg = "Instantiation of VNFFGR with id {} failed".format(vnffgr.id)
+                #raise VnffgrCreationFailed(msg)
+
+        vnffgr.operational_status = 'running'
+        self.update_vnffgrs(vnffgr.sdn_account)
+        return vnffgr
+
+    def update_vnffgrs(self,sdn_acct_name):
+        """
+        Update VNFFGR by reading data from SDN Plugin
+        """
+        sdn_plugin = self.get_sdn_plugin(sdn_acct_name)
+        rc,rs = sdn_plugin.get_vnffg_rendered_paths(self._account[sdn_acct_name])
+        if rc != RwTypes.RwStatus.SUCCESS:
+            msg = "Reading of VNFFGR from SDN Plugin failed"
+            raise VnffgrUpdateFailed(msg)
+
+        vnffgr_list = [_vnffgr for _vnffgr in self._vnffgr_list.values()  if _vnffgr.sdn_account == sdn_acct_name and _vnffgr.operational_status == 'running']
+
+        for _vnffgr in vnffgr_list:
+            for _vnffgr_rsp in _vnffgr.rsp:
+                vnffg_rsp_list = [vnffg_rsp for vnffg_rsp in rs.vnffg_rendered_path if vnffg_rsp.name == _vnffgr_rsp.name]
+                if vnffg_rsp_list is not None and len(vnffg_rsp_list) > 0:
+                    vnffg_rsp = vnffg_rsp_list[0]
+                    if len(vnffg_rsp.rendered_path_hop) != len(_vnffgr_rsp.vnfr_connection_point_ref):
+                        _vnffgr.operational_status = 'failed'
+                        self._log.error("Received hop count %d doesnt match the VNFFGD hop count %d", len(vnffg_rsp.rendered_path_hop),
+                                         len(_vnffgr_rsp.vnfr_connection_point_ref))
+                        msg = "Fetching of VNFFGR with id {} failed".format(_vnffgr.id)
+                        raise VnffgrUpdateFailed(msg)
+                    _vnffgr_rsp.path_id =  vnffg_rsp.path_id
+                    for index, rendered_hop in enumerate(vnffg_rsp.rendered_path_hop):
+                        for  vnfr_cp_ref in _vnffgr_rsp.vnfr_connection_point_ref:
+                            if rendered_hop.vnfr_name == vnfr_cp_ref.vnfr_name_ref:
+                               vnfr_cp_ref.hop_number = rendered_hop.hop_number
+                               vnfr_cp_ref.service_index = rendered_hop.service_index
+                               vnfr_cp_ref.service_function_forwarder.name = rendered_hop.service_function_forwarder.name
+                               vnfr_cp_ref.service_function_forwarder.ip_address = rendered_hop.service_function_forwarder.ip_address
+                               vnfr_cp_ref.service_function_forwarder.port = rendered_hop.service_function_forwarder.port
+                else:
+                    _vnffgr.operational_status = 'failed'
+                    self._log.error("VNFFGR RSP with name %s in VNFFG %s not found",_vnffgr_rsp.name, _vnffgr.id)
+                    msg = "Fetching of VNFFGR with name {} failed".format(_vnffgr_rsp.name)
+                    raise VnffgrUpdateFailed(msg)
+
+
+    def terminate_vnffgr(self,vnffgr_id,sdn_account_name = None):
+        """
+        Deletet the VNFFG chain
+        """
+        if vnffgr_id not in self._vnffgr_list:
+            self._log.error("VNFFGR with id %s not present in VNFFGMgr during termination", vnffgr_id)
+            msg = "VNFFGR with id {} not present in VNFFGMgr during termination".format(vnffgr_id)
+            return
+            #raise VnffgrDoesNotExist(msg)
+        self._log.info("Received VNFFG chain terminate for id %s",vnffgr_id)
+        if sdn_account_name is None:
+            sdn_account = [sdn_account.name for _,sdn_account in self._account.items()]
+            sdn_account_name = sdn_account[0]
+        sdn_plugin = self.get_sdn_plugin(sdn_account_name)
+        sdn_plugin.terminate_vnffg_chain(self._account[sdn_account_name],vnffgr_id)
+        sdn_plugin.terminate_vnffg_classifier(self._account[sdn_account_name],vnffgr_id)
+        del self._vnffgr_list[vnffgr_id]
+
+class SDNAccountDtsHandler(object):
+    XPATH = "C,/rw-sdn:sdn-account"
+
+    def __init__(self, dts, log, parent):
+        self._dts = dts
+        self._log = log
+        self._parent = parent
+
+        self._sdn_account = {}
+
+    def _set_sdn_account(self, account):
+        self._log.info("Setting sdn account: {}".format(account))
+        if account.name in self._sdn_account:
+            self._log.error("SDN Account with name %s already exists. Ignoring config", account.name);
+        self._sdn_account[account.name]  = account
+        self._parent.set_sdn_account(account)
+
+    def _del_sdn_account(self, account_name):
+        self._log.info("Deleting sdn account: {}".format(account_name))
+        del self._sdn_account[account_name]
+
+        self._parent.del_sdn_account(account_name)
+
+    def _update_sdn_account(self, account):
+        self._log.info("Updating sdn account: {}".format(account))
+        # No need to update locally saved sdn_account's updated fields, as they
+        # are not used anywhere. Call the parent's update callback.
+        self._parent.update_sdn_account(account)
+
+    @asyncio.coroutine
+    def register(self):
+        def apply_config(dts, acg, xact, action, _):
+            self._log.debug("Got sdn account apply config (xact: %s) (action: %s)", xact, action)
+            if action == rwdts.AppconfAction.INSTALL and xact.id is None:
+                self._log.debug("No xact handle.  Skipping apply config")
+                return RwTypes.RwStatus.SUCCESS
+
+            return RwTypes.RwStatus.SUCCESS
+
+        @asyncio.coroutine
+        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
+            """ Prepare callback from DTS for SDN Account config """
+
+            self._log.info("SDN Cloud account config received: %s", msg)
+
+            fref = ProtobufC.FieldReference.alloc()
+            fref.goto_whole_message(msg.to_pbcm())
+
+            if fref.is_field_deleted():
+                # Delete the sdn account record
+                self._del_sdn_account(msg.name)
+            else:
+                # If the account already exists, then this is an update.
+                if msg.name in self._sdn_account:
+                    self._log.debug("SDN account already exists. Invoking on_prepare update request")
+                    if msg.has_field("account_type"):
+                        errmsg = "Cannot update SDN account's account-type."
+                        self._log.error(errmsg)
+                        xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
+                                                   SDNAccountDtsHandler.XPATH,
+                                                   errmsg)
+                        raise SdnAccountError(errmsg)
+
+                    # Update the sdn account record
+                    self._update_sdn_account(msg)
+                else:
+                    self._log.debug("SDN account does not already exist. Invoking on_prepare add request")
+                    if not msg.has_field('account_type'):
+                        errmsg = "New SDN account must contain account-type field."
+                        self._log.error(errmsg)
+                        xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
+                                                   SDNAccountDtsHandler.XPATH,
+                                                   errmsg)
+                        raise SdnAccountError(errmsg)
+
+                    # Set the sdn account record
+                    self._set_sdn_account(msg)
+
+            xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+
+
+        self._log.debug("Registering for Sdn Account config using xpath: %s",
+                        SDNAccountDtsHandler.XPATH,
+                        )
+
+        acg_handler = rift.tasklets.AppConfGroup.Handler(
+                        on_apply=apply_config,
+                        )
+
+        with self._dts.appconf_group_create(acg_handler) as acg:
+            acg.register(
+                    xpath=SDNAccountDtsHandler.XPATH,
+                    flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
+                    on_prepare=on_prepare
+                    )
+
+
+