Merge branch 'v2.0'
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / rwnsmtasklet.py
index d683a60..7572fe9 100755 (executable)
@@ -35,7 +35,7 @@ from enum import Enum
 
 import gi
 gi.require_version('RwYang', '1.0')
-gi.require_version('RwNsdYang', '1.0')
+gi.require_version('NsdYang', '1.0')
 gi.require_version('RwDts', '1.0')
 gi.require_version('RwNsmYang', '1.0')
 gi.require_version('RwNsrYang', '1.0')
@@ -51,7 +51,7 @@ from gi.repository import (
     VnfrYang,
     RwVnfrYang,
     RwNsmYang,
-    RwsdnYang,
+    RwsdnalYang,
     RwDts as rwdts,
     RwTypes,
     ProtobufC,
@@ -383,7 +383,7 @@ class VnffgRecord(object):
                     vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
                     self._log.debug("Received VNFR is %s", vnfr)
 
-                sff =  RwsdnYang.VNFFGSff()
+                sff =  RwsdnalYang.VNFFGSff()
                 sff_list[nsr_vnfr.vnfd.id] = sff
                 sff.name = nsr_vnfr.name
                 sff.function_type = nsr_vnfr.vnfd.service_function_chain
@@ -957,7 +957,6 @@ class VirtualNetworkFunctionRecord(object):
         vnfr_dict.update(vnfd_copy_dict)
 
         vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
-
         vnfr.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict(),
                                                                           ignore_missing_keys=True)
         vnfr.member_vnf_index_ref = self.member_vnf_index
@@ -1073,7 +1072,9 @@ class VirtualNetworkFunctionRecord(object):
             cpr = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint()
             cpr.name = conn_p.name
             cpr.type_yang = conn_p.type_yang
-            cpr.port_security_enabled = conn_p.port_security_enabled
+            if conn_p.has_field('port_security_enabled'):
+              cpr.port_security_enabled = conn_p.port_security_enabled
+
             vlr_ref = find_vlr_for_cp(conn_p)
             if vlr_ref is None:
                 msg = "Failed to find VLR for cp = %s" % conn_p.name
@@ -1295,6 +1296,7 @@ class NetworkServiceRecord(object):
             self._vnf_phase_completed = True
 
         self._op_status.set_state(state)
+        self._nsm_plugin.set_state(self.id, state)
 
     @property
     def id(self):
@@ -2009,7 +2011,7 @@ class NetworkServiceRecord(object):
                    (member_vnfd.member_vnf_index_ref == const_vnfd.member_vnf_index):
                     group_info = self.resolve_placement_group_cloud_construct(group)
                     if group_info is None:
-                        self._log.error("Could not resolve cloud-construct for placement group: %s", group.name)
+                        self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
                         ### raise PlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
                     else:
                         self._log.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)",
@@ -2024,10 +2026,11 @@ class NetworkServiceRecord(object):
         # Fetch the VNFD associated with this VNF
         placement_groups = self.get_placement_groups(vnfd_msg, const_vnfd)
         self._log.info("Cloud Account for VNF %d is %s",const_vnfd.member_vnf_index,cloud_account_name)
-        self._log.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s",
+        self._log.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s, restart mode self.restart_mode %s",
                        vnfd_msg.name,
                        const_vnfd.member_vnf_index,
-                       [ group.name for group in placement_groups])
+                       [ group.name for group in placement_groups],
+                       self.restart_mode)
         vnfr = yield from VirtualNetworkFunctionRecord.create_record(self._dts,
                                             self._log,
                                             self._loop,
@@ -2819,9 +2822,14 @@ class NsdDtsHandler(object):
                 # Delete an NSD record
                 self._log.debug("Deleting NSD with id %s", msg.id)
                 if self._nsm.nsd_in_use(msg.id):
-                    self._log.debug("Cannot delete NSD in use - %s", msg.id)
-                    err = "Cannot delete an NSD in use - %s" % msg.id
-                    raise NetworkServiceDescriptorRefCountExists(err)
+                    errmsg = "Cannot delete an NSD in use - %s" % msg.id
+                    self._log.error(errmsg)
+                    xpath = ks_path.to_xpath(NsdYang.get_schema())
+                    xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
+                                               xpath,
+                                               errmsg)
+                    xact_info.respond_xpath(rwdts.XactRspCode.NACK)
+                    return
 
                 yield from delete_nsd_libs(msg.id)
                 self._nsm.delete_nsd(msg.id)
@@ -3007,9 +3015,18 @@ class NsrRpcDtsHandler(object):
                     "nsr_id":str(uuid.uuid4())
                 })
 
-            if not ('name' in rpc_ip and  'nsd_ref' in rpc_ip and ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)):
-                self._log.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip))
-
+            if not ('name' in rpc_ip and  'nsd_ref' in rpc_ip and
+                    ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)):
+                errmsg = (
+                    "Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".
+                    format(rpc_ip))
+                self._log.error(errmsg)
+                xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
+                                           NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH,
+                                           errmsg)
+                xact_info.respond_xpath(rwdts.XactRspCode.NACK,
+                                        NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH)
+                return
 
             self._log.debug("start-network-service RPC input: {}".format(rpc_ip))
 
@@ -3019,9 +3036,6 @@ class NsrRpcDtsHandler(object):
 
                 nsd_copy = self.nsm.get_nsd(rpc_ip.nsd_ref)
 
-                #if not self._manager:
-                #    self._manager = yield from self._connect()
-
                 self._log.debug("Configuring ns-instance-config with name  %s nsd-ref: %s",
                         rpc_ip.name, rpc_ip.nsd_ref)
 
@@ -3035,18 +3049,10 @@ class NsrRpcDtsHandler(object):
                 ns_instance_config.nsd.from_dict(nsd_copy.msg.as_dict())
 
                 payload_dict = ns_instance_config.to_json(self._model)
-                #xml = ns_instance_config.to_xml_v2(self._model)
-                #netconf_xml = self.wrap_netconf_config_xml(xml)
 
-                #self._log.debug("Sending configure ns-instance-config xml to %s: %s",
-                #        netconf_xml, NsrRpcDtsHandler.NETCONF_IP_ADDRESS)
                 self._log.debug("Sending configure ns-instance-config json to %s: %s",
                         self._nsr_config_url,ns_instance_config)
 
-                #response = yield from self._manager.edit_config(
-                #           target="running",
-                #           config=netconf_xml,
-                #           )
                 response = yield from self._loop.run_in_executor(
                     None,
                     self._apply_ns_instance_config,
@@ -3059,13 +3065,15 @@ class NsrRpcDtsHandler(object):
                                         NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH,
                                         rpc_op)
             except Exception as e:
-                self._log.error("Exception processing the "
-                                "start-network-service: {}".format(e))
-                self._log.exception(e)
+                errmsg = ("Exception processing the "
+                          "start-network-service: {}".format(e))
+                self._log.exception(errmsg)
+                xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
+                                           NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH,
+                                           errmsg)
                 xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                         NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH)
 
-
         hdl_ns = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_ns_config_prepare,)
 
         with self._dts.group_create() as group:
@@ -3358,43 +3366,76 @@ class NsrDtsHandler(object):
             fref = ProtobufC.FieldReference.alloc()
             fref.goto_whole_message(msg.to_pbcm())
 
+            def send_err_msg(err_msg):
+                self._log.error(errmsg)
+                xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
+                                           xpath,
+                                           errmsg)
+                xact_info.respond_xpath(rwdts.XactRspCode.NACK)
+
+
             if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE, rwdts.QueryAction.DELETE]:
                 # if this is an NSR create
                 if action != rwdts.QueryAction.DELETE and msg.id not in self._nsm.nsrs:
                     # Ensure the Cloud account/datacenter has been specified
                     if not msg.has_field("cloud_account") and not msg.has_field("om_datacenter"):
-                        raise NsrInstantiationFailed("Cloud account or datacenter not specified in NSR")
+                        errmsg = ("Cloud account or datacenter not specified in NS {}".
+                                  format(msg.name))
+                        send_err_msg(errmsg)
+                        return
 
                     # Check if nsd is specified
                     if not msg.has_field("nsd"):
-                        raise NsrInstantiationFailed("NSD not specified in NSR")
+                        errmsg = ("NSD not specified in NS {}".
+                                  format(msg.name))
+                        send_err_msg(errmsg)
+                        return
 
                 else:
                     nsr = self._nsm.nsrs[msg.id]
 
                     if msg.has_field("nsd"):
                         if nsr.state != NetworkServiceRecordState.RUNNING:
-                            raise NsrVlUpdateError("Unable to update VL when NSR not in running state")
+                            errmsg = ("Unable to update VL when NS {} not in running state".
+                                      format(msg.name))
+                            send_err_msg(errmsg)
+                            return
+
                         if 'vld' not in msg.nsd or len(msg.nsd.vld) == 0:
-                            raise NsrVlUpdateError("NS config NSD should have atleast 1 VLD defined")
+                            errmsg = ("NS config {} NSD should have atleast 1 VLD".
+                                      format(msg.name))
+                            send_err_msg(errmsg)
+                            return
 
                     if msg.has_field("scaling_group"):
                         if nsr.state != NetworkServiceRecordState.RUNNING:
-                            raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")
+                            errmsg = ("Unable to perform scaling action when NS {} not in running state".
+                                      format(msg.name))
+                            send_err_msg(errmsg)
+                            return
 
                         if len(msg.scaling_group) > 1:
-                            raise ScalingOperationError("Only a single scaling group can be configured at a time")
+                            errmsg = ("Only a single scaling group can be configured at a time for NS {}".
+                                      format(msg.name))
+                            send_err_msg(errmsg)
+                            return
 
                         for group_msg in msg.scaling_group:
                             num_new_group_instances = len(group_msg.instance)
                             if num_new_group_instances > 1:
-                                raise ScalingOperationError("Only a single scaling instance can be modified at a time")
+                                errmsg = ("Only a single scaling instance can be modified at a time for NS {}".
+                                          format(msg.name))
+                                send_err_msg(errmsg)
+                                return
 
                             elif num_new_group_instances == 1:
                                 scale_group = nsr.scaling_groups[group_msg.scaling_group_name_ref]
                                 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
                                     if len(scale_group.instances) == scale_group.max_instance_count:
-                                        raise ScalingOperationError("Max instances for %s reached" % scale_group)
+                                        errmsg = (" Max instances for {} reached for NS {}".
+                                                  format(str(scale_group), msg.name))
+                                        send_err_msg(errmsg)
+                                        return
 
             acg.handle.prepare_complete_ok(xact_info.handle)
 
@@ -3616,13 +3657,18 @@ class NsManager(object):
         self._cloud_account_handler = cloud_account_handler
 
         self._ro_plugin_selector = ro_plugin_selector
-        self._ncclient = rift.mano.ncclient.NcClient(
-              host="127.0.0.1",
-              port=2022,
-              username="admin",
-              password="admin",
-              loop=self._loop)
 
+        # Intialize the set of variables for implementing Scaling RPC using REST.
+        self._headers = {"content-type":"application/json", "accept":"application/json"}
+        #This will break when we have rbac in the rift code and admin user password is changed or admin it self is removed.
+        self._user = 'admin'
+        self._password = 'admin'
+        self._ip = 'localhost'
+        self._rport = 8008
+        self._conf_url = "https://{ip}:{port}/api/config". \
+                       format(ip=self._ip,
+                              port=self._rport)
+        
         self._nsrs = {}
         self._nsds = {}
         self._vnfds = {}
@@ -3754,66 +3800,82 @@ class NsManager(object):
             msg : RPC input
             action : Scaling Action
         """
-        ScalingGroupInstance = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance
-        ScalingGroup = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup
-
-        xpath = ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]').format(
-                          msg.nsr_id_ref)
-        instance = ScalingGroupInstance.from_dict({"id": msg.instance_id})
-
-        @asyncio.coroutine
-        def get_nsr_scaling_group():
-            results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
-
-            for result in results:
-                res = yield from result
-                nsr_config = res.result
-
-            for scaling_group in nsr_config.scaling_group:
-                if scaling_group.scaling_group_name_ref == msg.scaling_group_name_ref:
-                    break
-            else:
-                scaling_group = nsr_config.scaling_group.add()
-                scaling_group.scaling_group_name_ref = msg.scaling_group_name_ref
-
-            return (nsr_config, scaling_group)
-
-        @asyncio.coroutine
-        def update_config(nsr_config):
-            xml = self._ncclient.convert_to_xml(RwNsrYang, nsr_config)
-            xml = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml)
-            yield from self._ncclient.connect()
-            yield from self._ncclient.manager.edit_config(target="running", config=xml, default_operation="replace")
-
-        @asyncio.coroutine
+        def get_scaling_group_information():
+            scaling_group_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref)
+            output = requests.get(scaling_group_url, headers=self._headers, auth=(self._user, self._password), verify=False)
+            if output.text == None or len(output.text) == 0:
+                self.log.error("nsr id %s information not present", self._nsr_id)
+                return None
+            scaling_group_info = json.loads(output.text)
+            return scaling_group_info
+        
+        def config_scaling_group_information(scaling_group_info):
+            data_str = json.dumps(scaling_group_info)
+            self.log.debug("scaling group Info %s", data_str)
+
+            scale_out_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref)
+            response = requests.put(scale_out_url, data=data_str, verify=False, auth=(self._user, self._password), headers=self._headers)
+            response.raise_for_status()
+            
         def scale_out():
-            nsr_config, scaling_group = yield from get_nsr_scaling_group()
-            scaling_group.instance.append(instance)
-            yield from update_config(nsr_config)
+            scaling_group_info = get_scaling_group_information()
+            if scaling_group_info is None:
+                return
+                    
+            scaling_group_present = False
+            if "scaling-group" in scaling_group_info["nsr:nsr"]:
+                scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"]
+                for scaling_group in scaling_group_array:
+                    if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref:
+                        scaling_group_present = True
+                        if 'instance' not in scaling_group:
+                            scaling_group['instance'] = []
+                        for instance in scaling_group['instance']:
+                            if instance["id"] == int(msg.instance_id):
+                                self.log.error("scaling group with instance id %s exists for scale out", msg.instance_id)
+                                return 
+                        scaling_group["instance"].append({"id": int(msg.instance_id)})       
+            
+            if not scaling_group_present:
+                scaling_group_info["nsr:nsr"]["scaling-group"] = [{"scaling-group-name-ref": msg.scaling_group_name_ref, "instance": [{"id": msg.instance_id}]}]
+                
+            config_scaling_group_information(scaling_group_info)
+            return
 
-        @asyncio.coroutine
         def scale_in():
-            nsr_config, scaling_group = yield from get_nsr_scaling_group()
-            scaling_group.instance.remove(instance)
-            yield from update_config(nsr_config)
+            scaling_group_info = get_scaling_group_information()
+            if scaling_group_info is None:
+                return
+            
+            scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"]
+            scaling_group_present = False
+            instance_id_present = False
+            for scaling_group in scaling_group_array:
+                if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref:
+                    scaling_group_present = True
+                    if 'instance' in scaling_group:
+                        instance_array = scaling_group["instance"];
+                        for index in range(len(instance_array)):       
+                            if instance_array[index]["id"] == int(msg.instance_id):
+                                instance_array.pop(index)
+                                instance_id_present = True
+                                break
+    
+            if not scaling_group_present:
+                self.log.error("Scaling group %s doesnot exists for scale in", msg.scaling_group_name_ref)
+                return
+            
+            if not instance_id_present:
+                self.log.error("Instance id %s doesnot exists for scale in", msg.instance_id)
+                return
+            
+            config_scaling_group_information(scaling_group_info)
+            return
 
         if action == ScalingRpcHandler.ACTION.SCALE_OUT:
-            self._loop.create_task(scale_out())
+            self._loop.run_in_executor(None, scale_out)
         else:
-            self._loop.create_task(scale_in())
-
-        # Opdata based calls, disabled for now!
-        # if action == ScalingRpcHandler.ACTION.SCALE_OUT:
-        #     self.scale_nsr_out(
-        #           msg.nsr_id_ref,
-        #           msg.scaling_group_name_ref,
-        #           msg.instance_id,
-        #           xact)
-        # else:
-        #     self.scale_nsr_in(
-        #           msg.nsr_id_ref,
-        #           msg.scaling_group_name_ref,
-        #           msg.instance_id)
+            self._loop.run_in_executor(None, scale_in)
 
     def nsr_update_cfg(self, nsr_id, msg):
         nsr = self._nsrs[nsr_id]
@@ -3844,9 +3906,10 @@ class NsManager(object):
             self._log.error(msg)
             raise NetworkServiceRecordError(msg)
 
-        self._log.info("Create NetworkServiceRecord nsr id %s from nsd_id %s",
+        self._log.info("Create NetworkServiceRecord nsr id %s from nsd_id %s, restart mode %s",
                        nsr_msg.id,
-                       nsr_msg.nsd.id)
+                       nsr_msg.nsd.id,
+                       restart_mode)
 
         nsm_plugin = self._ro_plugin_selector.ro_plugin
         sdn_account_name = self._cloud_account_handler.get_cloud_account_sdn_name(nsr_msg.cloud_account)
@@ -4211,9 +4274,6 @@ class ScalingRpcHandler(mano_dts.DtsHandler):
             assert action == rwdts.QueryAction.RPC
 
             try:
-                if self.callback:
-                    self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN)
-
                 rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({
                       "instance_id": msg.instance_id})
 
@@ -4222,6 +4282,8 @@ class ScalingRpcHandler(mano_dts.DtsHandler):
                     self.__class__.SCALE_IN_OUTPUT_XPATH,
                     rpc_op)
 
+                if self.callback:
+                    self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN)
             except Exception as e:
                 self.log.exception(e)
                 xact_info.respond_xpath(
@@ -4239,9 +4301,6 @@ class ScalingRpcHandler(mano_dts.DtsHandler):
                     msg.instance_id  = last_instance_id + 1
                     self.last_instance_id[scale_group] += 1
 
-                if self.callback:
-                    self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT)
-
                 rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({
                       "instance_id": msg.instance_id})
 
@@ -4250,6 +4309,8 @@ class ScalingRpcHandler(mano_dts.DtsHandler):
                     self.__class__.SCALE_OUT_OUTPUT_XPATH,
                     rpc_op)
 
+                if self.callback:
+                    self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT)
             except Exception as e:
                 self.log.exception(e)
                 xact_info.respond_xpath(