update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / RiftCM_rpc.py
index 03756d8..ea1bfec 100644 (file)
@@ -1,5 +1,5 @@
 
-# 
+#
 #   Copyright 2016 RIFT.IO Inc
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
@@ -42,20 +42,22 @@ class RiftCMRPCHandler(object):
     GET_NS_CONF_XPATH = "I,/nsr:get-ns-service-primitive-values"
     GET_NS_CONF_O_XPATH = "O,/nsr:get-ns-service-primitive-values"
 
-    def __init__(self, dts, log, loop, nsm):
+    def __init__(self, dts, log, loop, project, nsm):
         self._dts = dts
         self._log = log
         self._loop = loop
+        self._project = project
         self._nsm = nsm
 
         self._ns_regh = None
         self._vnf_regh = None
         self._get_ns_conf_regh = None
 
-        self.job_manager = rift.mano.config_agent.ConfigAgentJobManager(dts, log, loop, nsm)
+        self.job_manager = rift.mano.config_agent.ConfigAgentJobManager(dts, log, loop,
+                                                                        project, nsm)
 
         self._rift_install_dir = os.environ['RIFT_INSTALL']
-        self._rift_artif_dir = os.environ['RIFT_ARTIFACTS']
+        self._rift_var_root_dir = os.environ['RIFT_VAR_ROOT']
 
     @property
     def reghs(self):
@@ -67,6 +69,17 @@ class RiftCMRPCHandler(object):
         """ Return the NS manager instance """
         return self._nsm
 
+    def deregister(self):
+        self._log.debug("De-register conman rpc handlers for project {}".
+                        format(self._project))
+        for reg in self.reghs:
+            if reg:
+                reg.deregister()
+                reg = None
+
+        self.job_manager.deregister()
+        self.job_manager = None
+
     def prepare_meta(self, rpc_ip):
 
         try:
@@ -105,7 +118,7 @@ class RiftCMRPCHandler(object):
         if vnf:
             self._log.debug("nsr/vnf {}/{}, vnf_configuration: %s",
                             vnf.vnf_configuration)
-            for primitive in vnf.vnf_configuration.service_primitive:
+            for primitive in vnf.vnf_configuration.config_primitive:
                 if primitive.name == primitive_name:
                     return primitive
 
@@ -146,7 +159,7 @@ class RiftCMRPCHandler(object):
 
             for vnfr_id in agent_nsr.vnfr_ids:
                 vnfr = agent_vnfrs[vnfr_id]
-                self._log.debug("CA_RPC: VNFR metadata: {}".format(vnfr))
+                self._log.debug("CA-RPC: VNFR metadata: {}".format(vnfr))
 
                 # index->vnfr ref
                 vnfr_index_map[vnfr.member_vnf_index] = vnfr_id
@@ -154,21 +167,27 @@ class RiftCMRPCHandler(object):
                 if 'mgmt_interface' in vnfr.vnfr:
                     vnfr_data_dict['mgmt_interface'] = vnfr.vnfr['mgmt_interface']
 
+                vnfr_data_dict['name'] = vnfr.vnfr['name']
                 vnfr_data_dict['connection_point'] = []
                 if 'connection_point' in vnfr.vnfr:
                     for cp in vnfr.vnfr['connection_point']:
-                        cp_dict = dict()
-                        cp_dict['name'] = cp['name']
-                        cp_dict['ip_address'] = cp['ip_address']
+                        cp_dict = dict(name = cp['name'],
+                                       ip_address = cp['ip_address'],
+                                       connection_point_id = cp['connection_point_id'])
+                        if 'virtual_cps' in cp:
+                            cp_info['virtual_cps'] = [ {k:v for k,v in vcp.items()
+                                                        if k in ['ip_address', 'mac_address']}
+                                                       for vcp in cp['virtual_cps'] ]
+
                         vnfr_data_dict['connection_point'].append(cp_dict)
 
                 try:
                     vnfr_data_dict['vdur'] = []
-                    vdu_data = [(vdu['name'], vdu['management_ip'], vdu['vm_management_ip'], vdu['id'])
+                    vdu_data = [(vdu['name'], vdu['management_ip'], vdu['vm_management_ip'], vdu['id'], vdu['vdu_id_ref'])
                                 for vdu in vnfr.vnfr['vdur']]
 
                     for data in vdu_data:
-                        data = dict(zip(['name', 'management_ip', 'vm_management_ip', 'id'] , data))
+                        data = dict(zip(['name', 'management_ip', 'vm_management_ip', 'id', 'vdu_id_ref'] , data))
                         vnfr_data_dict['vdur'].append(data)
 
                     vnfr_data_map[vnfr.member_vnf_index] = vnfr_data_dict
@@ -189,8 +208,12 @@ class RiftCMRPCHandler(object):
                     for primitive in vnfr.vnf_configuration['initial_config_primitive']:
                         if 'parameter' in primitive:
                             for parameter in primitive['parameter']:
-                               value = xlate(parameter['value'], vnfr.tags)
-                               param_data[parameter.name] = value
+                                try:
+                                    value = xlate(parameter['value'], vnfr.tags)
+                                    param_data[parameter['name']] = value
+                                except KeyError as e:
+                                    self._log.warn("Unable to parse the parameter{}:  {}".
+                                                   format(parameter))
 
                 initial_params[vnfr_id] = param_data
 
@@ -207,7 +230,7 @@ class RiftCMRPCHandler(object):
                     return config_plugin.agent_data
             return ret
 
-        unit_names, init_data, vnfr_index_map, vnf_data_map = get_meta(agent_nsr, agent_vnfrs)
+        unit_names, init_data, vnfr_index_map, vnfr_data_map = get_meta(agent_nsr, agent_vnfrs)
 
         # The data consists of 4 sections
         # 1. Account data
@@ -227,7 +250,7 @@ class RiftCMRPCHandler(object):
             tmp_file.write(yaml.dump(data, default_flow_style=True)
                     .encode("UTF-8"))
 
-        self._log.debug("CA_RPC: Creating a temp file {} with input data: {}".
+        self._log.debug("CA-RPC: Creating a temp file {} with input data: {}".
                         format(tmp_file.name, data))
 
         # Get the full path to the script
@@ -236,22 +259,20 @@ class RiftCMRPCHandler(object):
             # The script has full path, use as is
             script = rpc_ip.user_defined_script
         else:
-            script = os.path.join(self._rift_artif_dir, 'launchpad/libs', agent_nsr.id, 'scripts',
+            script = os.path.join(self._rift_var_root_dir,
+                                  'launchpad/packages/nsd',
+                                  self._project.name,
+                                  agent_nsr.nsd_id, 'scripts',
                                   rpc_ip.user_defined_script)
-            self.log.debug("CA_RPC: Checking for script in %s", script)
-            if not os.path.exists(script):
-                script = os.path.join(self._rift_install_dir, 'usr/bin', rpc_ip.user_defined_script)
+            self._log.debug("CA-RPC: Checking for script in %s", script)
 
-        cmd = "{} {}".format(rpc_ip.user_defined_script, tmp_file.name)
-        self._log.debug("CA_RPC: Running the CMD: {}".format(cmd))
+        cmd = "{} {}".format(script, tmp_file.name)
+        self._log.debug("CA-RPC: Running the CMD: {}".format(cmd))
 
-        coro = asyncio.create_subprocess_shell(cmd, loop=self._loop,
-                                               stderr=asyncio.subprocess.PIPE)
-        process = yield from coro
-        err = yield from process.stderr.read()
-        task = self._loop.create_task(process.wait())
+        process = yield from asyncio.create_subprocess_shell(
+            cmd)
 
-        return task, err
+        return process
 
     @asyncio.coroutine
     def register(self):
@@ -262,6 +283,10 @@ class RiftCMRPCHandler(object):
         def on_ns_config_prepare(xact_info, action, ks_path, msg):
             """ prepare callback from dts exec-ns-service-primitive"""
             assert action == rwdts.QueryAction.RPC
+
+            if not self._project.rpc_check(msg, xact_info):
+                return
+
             rpc_ip = msg
             rpc_op = NsrYang.YangOutput_Nsr_ExecNsServicePrimitive.from_dict({
                     "triggered_by": rpc_ip.triggered_by,
@@ -311,16 +336,12 @@ class RiftCMRPCHandler(object):
                 if nsd_cfg_prim_msg and nsd_cfg_prim_msg.has_field("user_defined_script"):
                     rpc_ip.user_defined_script = nsd_cfg_prim_msg.user_defined_script
 
-                    tasks = []
-                    task, err = yield from self._apply_ns_config(
+                    task = yield from self._apply_ns_config(
                         nsr,
                         vnfrs,
                         rpc_ip)
-                    tasks.append(task)
-                    if err:
-                        rpc_op.job_status_details = err.decode()
 
-                    self.job_manager.add_job(rpc_op, tasks)
+                    self.job_manager.add_job(rpc_op, [task])
                 else:
                     # Otherwise create VNF primitives.
                     for vnf in rpc_ip.vnf_list:
@@ -337,7 +358,7 @@ class RiftCMRPCHandler(object):
                             idx += 1
                             op_primitive.name = primitive.name
                             op_primitive.execution_id = ''
-                            op_primitive.execution_status = 'completed'
+                            op_primitive.execution_status = 'pending'
                             op_primitive.execution_error_details = ''
 
                             # Copy over the VNF pimitive's input parameters
@@ -365,6 +386,7 @@ class RiftCMRPCHandler(object):
                                 nsr_param_pool.add_used_value(param.value)
 
                             for config_plugin in self.nsm.config_agent_plugins:
+                                # TODO: Execute these in separate threads to prevent blocking
                                 yield from config_plugin.vnf_config_primitive(nsr_id,
                                                                               vnfr_id,
                                                                               primitive,
@@ -391,6 +413,10 @@ class RiftCMRPCHandler(object):
         @asyncio.coroutine
         def on_get_ns_config_values_prepare(xact_info, action, ks_path, msg):
             assert action == rwdts.QueryAction.RPC
+
+            if not self._project.rpc_check(msg, xact_info):
+                return
+
             nsr_id = msg.nsr_id_ref
             cfg_prim_name = msg.name
             try:
@@ -499,5 +525,3 @@ class RiftCMRPCHandler(object):
                                                     handler=hdl_ns_get,
                                                     flags=rwdts.Flag.PUBLISHER,
                                                     )
-
-