Full Juju Charm support
[osm/SO.git] / common / python / rift / mano / config_agent / operdata.py
index 729a1f1..94e9976 100644 (file)
@@ -1,4 +1,4 @@
-# 
+#
 #   Copyright 2016 RIFT.IO Inc
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
@@ -17,6 +17,9 @@
 import asyncio
 import concurrent.futures
 import time
+import gi
+
+gi.require_version('RwNsrYang', '1.0')
 
 from gi.repository import (
     NsrYang,
@@ -25,15 +28,17 @@ from gi.repository import (
     RwNsrYang,
     RwConfigAgentYang,
     RwDts as rwdts)
+gi.require_version('RwKeyspec', '1.0')
+from gi.repository.RwKeyspec import quoted_key
 
 import rift.tasklets
-
 import rift.mano.utils.juju_api as juju
 
 
 class ConfigAgentAccountNotFound(Exception):
     pass
 
+
 class JujuClient(object):
     def __init__(self, log, ip, port, user, passwd):
         self._log = log
@@ -46,29 +51,43 @@ class JujuClient(object):
                                  server=ip, port=port,
                                  user=user, secret=passwd)
 
-
     def validate_account_creds(self):
-        status = RwcalYang.CloudConnectionStatus()
+        """Validate the account credentials.
+
+        Verifies if the account credentials can connect and login to a Juju
+        controller at the provided IP address.
+        """
+        status = "unknown"
+        details = "Connection status not known."
+
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
         try:
-            env = self._api._get_env()
-        except juju.JujuEnvError as e:
-            msg = "JujuClient: Invalid account credentials: %s", str(e)
-            self._log.error(msg)
-            raise Exception(msg)
-        except ConnectionRefusedError as e:
-            msg = "JujuClient: Wrong IP or Port: %s", str(e)
-            self._log.error(msg)
-            raise Exception(msg)
+            loop.run_until_complete(asyncio.gather(
+                self._api.logout(),
+                self._api.login(),
+                loop=loop,
+            ))
         except Exception as e:
+            loop.close()
+
             msg = "JujuClient: Connection Failed: %s", str(e)
             self._log.error(msg)
+            status = "failure"
+            details = msg
             raise Exception(msg)
         else:
-            status.status = "success"
-            status.details = "Connection was successful"
+            self._log.error("Success reached.")
+            status = "success"
+            details = "Connection was successful"
             self._log.info("JujuClient: Connection Successful")
+        finally:
+            loop.close()
 
-        return status
+        return RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
+            status=status,
+            details=details,
+        )
 
 
 class ConfigAgentAccount(object):
@@ -86,7 +105,7 @@ class ConfigAgentAccount(object):
         else:
             self._cfg_agent_client_plugin = None
 
-        self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus(
+        self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
                 status="unknown",
                 details="Connection status lookup not started"
                 )
@@ -117,13 +136,13 @@ class ConfigAgentAccount(object):
     def validate_cfg_agent_account_credentials(self, loop):
         self._log.debug("Validating Config Agent Account %s, credential status %s", self._account_msg, self._status)
 
-        self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus(
+        self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
                 status="validating",
                 details="Config Agent account connection validation in progress"
                 )
 
         if self._cfg_agent_client_plugin is None:
-            self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus(
+            self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
                     status="unknown",
                     details="Config Agent account does not support validation of account creds"
                     )
@@ -131,11 +150,11 @@ class ConfigAgentAccount(object):
             try:
                 status = yield from loop.run_in_executor(
                     None,
-                    self._cfg_agent_client_plugin.validate_account_creds
+                    self._cfg_agent_client_plugin.validate_account_creds,
                     )
-                self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus.from_dict(status.as_dict())
+                self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus.from_dict(status.as_dict())
             except Exception as e:
-                self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus(
+                self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
                     status="failure",
                     details="Error - " + str(e)
                     )
@@ -153,12 +172,15 @@ class ConfigAgentAccount(object):
                 )
 
 class CfgAgentDtsOperdataHandler(object):
-    def __init__(self, dts, log, loop):
+    def __init__(self, dts, log, loop, project):
         self._dts = dts
         self._log = log
         self._loop = loop
+        self._project = project
 
         self.cfg_agent_accounts = {}
+        self._show_reg = None
+        self._rpc_reg = None
 
     def add_cfg_agent_account(self, account_msg):
         account = ConfigAgentAccount(self._log, account_msg)
@@ -191,12 +213,12 @@ class CfgAgentDtsOperdataHandler(object):
     def _register_show_status(self):
         def get_xpath(cfg_agent_name=None):
             return "D,/rw-config-agent:config-agent/account{}/connection-status".format(
-                    "[name='%s']" % cfg_agent_name if cfg_agent_name is not None else ''
+                    "[name=%s]" % quoted_key(cfg_agent_name) if cfg_agent_name is not None else ''
                     )
 
         @asyncio.coroutine
         def on_prepare(xact_info, action, ks_path, msg):
-            path_entry = RwConfigAgentYang.ConfigAgentAccount.schema().keyspec_to_entry(ks_path)
+            path_entry = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account.schema().keyspec_to_entry(ks_path)
             cfg_agent_account_name = path_entry.key00.name
             self._log.debug("Got show cfg_agent connection status request: %s", ks_path.create_string())
 
@@ -205,9 +227,10 @@ class CfgAgentDtsOperdataHandler(object):
                 for account in saved_accounts:
                     connection_status = account.connection_status
                     self._log.debug("Responding to config agent connection status request: %s", connection_status)
+                    xpath = self._project.add_project(get_xpath(account.name))
                     xact_info.respond_xpath(
                             rwdts.XactRspCode.MORE,
-                            xpath=get_xpath(account.name),
+                            xpath=xpath,
                             msg=account.connection_status,
                             )
             except KeyError as e:
@@ -217,12 +240,13 @@ class CfgAgentDtsOperdataHandler(object):
 
             xact_info.respond_xpath(rwdts.XactRspCode.ACK)
 
-        yield from self._dts.register(
-                xpath=get_xpath(),
-                handler=rift.tasklets.DTS.RegistrationHandler(
-                    on_prepare=on_prepare),
-                flags=rwdts.Flag.PUBLISHER,
-                )
+        xpath = self._project.add_project(get_xpath())
+        self._show_reg = yield from self._dts.register(
+            xpath=xpath,
+            handler=rift.tasklets.DTS.RegistrationHandler(
+                on_prepare=on_prepare),
+            flags=rwdts.Flag.PUBLISHER,
+        )
 
     def _register_validate_rpc(self):
         def get_xpath():
@@ -234,6 +258,10 @@ class CfgAgentDtsOperdataHandler(object):
                 raise ConfigAgentAccountNotFound("Config Agent account name not provided")
 
             cfg_agent_account_name = msg.cfg_agent_account
+
+            if not self._project.rpc_check(msg, xact_info=xact_info):
+                return
+
             try:
                 account = self.cfg_agent_accounts[cfg_agent_account_name]
             except KeyError:
@@ -243,24 +271,29 @@ class CfgAgentDtsOperdataHandler(object):
 
             xact_info.respond_xpath(rwdts.XactRspCode.ACK)
 
-        yield from self._dts.register(
-                xpath=get_xpath(),
-                handler=rift.tasklets.DTS.RegistrationHandler(
-                    on_prepare=on_prepare
-                    ),
-                flags=rwdts.Flag.PUBLISHER,
-                )
+        self._rpc_reg = yield from self._dts.register(
+            xpath=get_xpath(),
+            handler=rift.tasklets.DTS.RegistrationHandler(
+                on_prepare=on_prepare
+            ),
+            flags=rwdts.Flag.PUBLISHER,
+        )
 
     @asyncio.coroutine
     def register(self):
         yield from self._register_show_status()
         yield from self._register_validate_rpc()
 
+    def deregister(self):
+        self._show_reg.deregister()
+        self._rpc_reg.deregister()
+
+
 class ConfigAgentJob(object):
     """A wrapper over the config agent job object, providing some
     convenience functions.
 
-    YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob contains
+    YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob contains
     ||
      ==> VNFRS
           ||
@@ -274,17 +307,19 @@ class ConfigAgentJob(object):
                   "running"  : "pending",
                   "failed"   : "failure"}
 
-    def __init__(self, nsr_id, job, tasks=None):
+    def __init__(self, nsr_id, job, project, tasks=None):
         """
         Args:
             nsr_id (uuid): ID of NSR record
-            job (YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
+            job (YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
             tasks: List of asyncio.tasks. If provided the job monitor will
                 use it to monitor the tasks instead of the execution IDs
         """
         self._job = job
         self.nsr_id = nsr_id
         self.tasks = tasks
+        self._project = project
+
         self._regh = None
 
     @property
@@ -315,10 +350,10 @@ class ConfigAgentJob(object):
     @property
     def xpath(self):
         """Xpath of the job"""
-        return ("D,/nsr:ns-instance-opdata" +
-                "/nsr:nsr[nsr:ns-instance-config-ref='{}']" +
-                "/nsr:config-agent-job[nsr:job-id='{}']"
-                ).format(self.nsr_id, self.id)
+        return self._project.add_project(("D,/nsr:ns-instance-opdata" +
+                "/nsr:nsr[nsr:ns-instance-config-ref={}]" +
+                "/nsr:config-agent-job[nsr:job-id={}]"
+                ).format(quoted_key(self.nsr_id), quoted_key(str(self.id))))
 
     @property
     def regh(self):
@@ -331,9 +366,9 @@ class ConfigAgentJob(object):
         self._regh = hdl
 
     @staticmethod
-    def convert_rpc_input_to_job(nsr_id, rpc_output, tasks):
+    def convert_rpc_input_to_job(nsr_id, rpc_output, tasks, project):
         """A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive
-        to YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)
+        to YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)
 
         Args:
             nsr_id (uuid): NSR ID
@@ -344,10 +379,10 @@ class ConfigAgentJob(object):
             ConfigAgentJob
         """
         # Shortcuts to prevent the HUUGE names.
-        CfgAgentJob = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob
-        CfgAgentVnfr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
-        CfgAgentPrimitive = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
-        CfgAgentPrimitiveParam =  NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter
+        CfgAgentJob = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob
+        CfgAgentVnfr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
+        CfgAgentPrimitive = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
+        CfgAgentPrimitiveParam =  NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter
 
         job = CfgAgentJob.from_dict({
                 "job_id": rpc_output.job_id,
@@ -370,7 +405,8 @@ class ConfigAgentJob(object):
                 vnf_primitive = CfgAgentPrimitive.from_dict({
                         "name": primitive.name,
                         "execution_status": ConfigAgentJob.STATUS_MAP[primitive.execution_status],
-                        "execution_id": primitive.execution_id
+                        "execution_id": primitive.execution_id,
+                        "execution_error_details": primitive.execution_error_details,
                     })
 
                 # Copy over the input param
@@ -385,7 +421,7 @@ class ConfigAgentJob(object):
 
             job.vnfr.append(vnfr_job)
 
-        return ConfigAgentJob(nsr_id, job, tasks)
+        return ConfigAgentJob(nsr_id, job, project, tasks)
 
 
 class ConfigAgentJobMonitor(object):
@@ -432,7 +468,10 @@ class ConfigAgentJobMonitor(object):
                            format(process, rc, err))
 
             if len(err):
-                errs += "<error>{}</error>".format(err)
+                if rc == 0:
+                    errs += "<success>{}</success>".format(err)
+                else:
+                    errs += "<error>{}</error>".format(err)
             result |= rc
 
         if result == 0:
@@ -445,13 +484,10 @@ class ConfigAgentJobMonitor(object):
 
         registration_handle.update_element(self.job.xpath, self.job.job)
 
-    def get_error_details(self):
+    def get_execution_details(self):
         '''Get the error details from failed primitives'''
         errs = ''
         for vnfr in self.job.job.vnfr:
-            if vnfr.vnf_job_status != "failure":
-                continue
-
             for primitive in vnfr.primitive:
                 if primitive.execution_status == "failure":
                     errs += '<error>'
@@ -460,7 +496,11 @@ class ConfigAgentJobMonitor(object):
                     else:
                         errs += '{}: Unknown error'.format(primitive.name)
                     errs += "</error>"
-
+                else:
+                    if primitive.execution_error_details:
+                        errs += '<{status}>{details}</{status}>'.format(
+                            status=primitive.execution_status,
+                            details=primitive.execution_error_details)
         return errs
 
     @asyncio.coroutine
@@ -511,14 +551,15 @@ class ConfigAgentJobMonitor(object):
 
                 if "failure" in job_status:
                     self.job.job_status = "failure"
-                    errs = self.get_error_details()
-                    if len(errs):
-                        self.job.job.job_status_details = errs
                 elif "pending" in job_status:
                     self.job.job_status = "pending"
                 else:
                     self.job.job_status = "success"
 
+                errs = self.get_execution_details()
+                if len(errs):
+                    self.job.job.job_status_details = errs
+
                 # self.log.debug("Publishing job status: {} at {} for nsr id: {}".format(
                 #     self.job.job_status,
                 #     self.job.xpath,
@@ -526,6 +567,7 @@ class ConfigAgentJobMonitor(object):
 
                 registration_handle.update_element(self.job.xpath, self.job.job)
 
+            registration_handle.update_element(self.job.xpath, self.job.job)
 
         except Exception as e:
             self.log.exception(e)
@@ -548,6 +590,9 @@ class ConfigAgentJobMonitor(object):
 
         for primitive in vnfr.primitive:
             if primitive.execution_status != 'pending':
+                if primitive.execution_id == "":
+                    # We may not have processed the status for these yet
+                    job_status.append(primitive.execution_status)
                 continue
 
             if primitive.execution_id == "":
@@ -555,7 +600,7 @@ class ConfigAgentJobMonitor(object):
                 job_status.append(primitive.execution_status)
                 continue
 
-            elif primitive.execution_id == "config":
+            if primitive.execution_id == "config":
                 # Config job. Check if service is active
                 task = self.loop.create_task(self.get_service_status(vnfr.id, primitive))
 
@@ -665,7 +710,7 @@ class CfgAgentJobDtsHandler(object):
         self._nsm = nsm
 
         self._regh = None
-        self._nsr_regh = None
+        self._project = cfgm.project
 
     @property
     def regh(self):
@@ -682,11 +727,10 @@ class CfgAgentJobDtsHandler(object):
         """ Return the ConfigManager manager instance """
         return self._cfgm
 
-    @staticmethod
-    def cfg_job_xpath(nsr_id, job_id):
-        return ("D,/nsr:ns-instance-opdata" +
-                "/nsr:nsr[nsr:ns-instance-config-ref = '{}']" +
-                "/nsr:config-agent-job[nsr:job-id='{}']").format(nsr_id, job_id)
+    def cfg_job_xpath(self, nsr_id, job_id):
+        return self._project.add_project(("D,/nsr:ns-instance-opdata" +
+                "/nsr:nsr[nsr:ns-instance-config-ref={}]" +
+                "/nsr:config-agent-job[nsr:job-id={}]").format(quoted_key(nsr_id), quoted_key(str(job_id))))
 
     @asyncio.coroutine
     def register(self):
@@ -697,7 +741,7 @@ class CfgAgentJobDtsHandler(object):
             """ prepare callback from dts """
             xpath = ks_path.to_xpath(RwNsrYang.get_schema())
             if action == rwdts.QueryAction.READ:
-                schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.schema()
+                schema = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.schema()
                 path_entry = schema.keyspec_to_entry(ks_path)
                 try:
                     nsr_id = path_entry.key00.ns_instance_config_ref
@@ -716,7 +760,7 @@ class CfgAgentJobDtsHandler(object):
                         for job in jobs:
                             xact_info.respond_xpath(
                                 rwdts.XactRspCode.MORE,
-                                CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.id),
+                                self.cfg_job_xpath(nsr_id, job.id),
                                 job.job)
 
                 except Exception as e:
@@ -728,17 +772,17 @@ class CfgAgentJobDtsHandler(object):
 
         hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
         with self._dts.group_create() as group:
-            self._regh = group.register(xpath=CfgAgentJobDtsHandler.XPATH,
+            self._regh = group.register(xpath=self._project.add_project(
+                CfgAgentJobDtsHandler.XPATH),
                                         handler=hdl,
                                         flags=rwdts.Flag.PUBLISHER,
                                         )
 
-    @asyncio.coroutine
     def _terminate_nsr(self, nsr_id):
         self._log.debug("NSR {} being terminated".format(nsr_id))
         jobs = self.cfgm.get_job(nsr_id)
         for job in jobs:
-            path = CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.id)
+            path = self.cfg_job_xpath(nsr_id, job.id)
             with self._dts.transaction() as xact:
                 self._log.debug("Deleting job: {}".format(path))
                 job.regh.delete_element(path)
@@ -749,40 +793,14 @@ class CfgAgentJobDtsHandler(object):
 
     @property
     def nsr_xpath(self):
-        return "D,/nsr:ns-instance-opdata/nsr:nsr"
-
-    @asyncio.coroutine
-    def register_for_nsr(self):
-        """ Register for NSR changes """
-
-        @asyncio.coroutine
-        def on_prepare(xact_info, query_action, ks_path, msg):
-            """ This NSR is created """
-            self._log.debug("Received NSR instantiate on_prepare (%s:%s:%s)",
-                            query_action,
-                            ks_path,
-                            msg)
-
-            if (query_action == rwdts.QueryAction.UPDATE or
-                query_action == rwdts.QueryAction.CREATE):
-                pass
-            elif query_action == rwdts.QueryAction.DELETE:
-                nsr_id = msg.ns_instance_config_ref
-                asyncio.ensure_future(self._terminate_nsr(nsr_id), loop=self._loop)
-            else:
-                raise NotImplementedError(
-                    "%s action on cm-state not supported",
-                    query_action)
-
-            xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+        return self._project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr")
 
-        try:
-            handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
-            self._nsr_regh = yield from self._dts.register(self.nsr_xpath,
-                                                          flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
-                                                          handler=handler)
-        except Exception as e:
-            self._log.error("Failed to register for NSR changes as %s", str(e))
+    def deregister(self):
+        self._log.debug("De-register config agent job for project".
+                        format(self._project.name))
+        if self._regh:
+            self._regh.deregister()
+            self._regh = None
 
 
 class ConfigAgentJobManager(object):
@@ -791,7 +809,7 @@ class ConfigAgentJobManager(object):
 
     TODO: Needs to support multiple config agents.
     """
-    def __init__(self, dts, log, loop, nsm):
+    def __init__(self, dts, log, loop, project, nsm):
         """
         Args:
             dts  : Dts handle
@@ -804,11 +822,12 @@ class ConfigAgentJobManager(object):
         self.log = log
         self.loop = loop
         self.nsm = nsm
+        self.project = project
         self.handler = CfgAgentJobDtsHandler(dts, log, loop, nsm, self)
         self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
 
     def add_job(self, rpc_output, tasks=None):
-        """Once an RPC is trigger add a now job
+        """Once an RPC is triggered, add a new job
 
         Args:
             rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): Rpc output
@@ -818,7 +837,8 @@ class ConfigAgentJobManager(object):
         """
         nsr_id = rpc_output.nsr_id_ref
 
-        job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, tasks)
+        job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output,
+                                                      tasks, self.project)
 
         self.log.debug("Creating a job monitor for Job id: {}".format(
                 rpc_output.job_id))
@@ -837,6 +857,14 @@ class ConfigAgentJobManager(object):
                     ca = agent
                     break
 
+        def done_callback(fut):
+            e = fut.exception()
+            if e:
+                self.log.error("Exception on monitor job {}: {}".
+                               format(rpc_output.job_id, e))
+                fut.print_stack()
+            self.log.debug("Monitor job done for {}".format(rpc_output.job_id))
+
         # For every Job we will schedule a new monitoring process.
         job_monitor = ConfigAgentJobMonitor(
             self.dts,
@@ -847,6 +875,7 @@ class ConfigAgentJobManager(object):
             ca
             )
         task = self.loop.create_task(job_monitor.publish_action_status())
+        task.add_done_callback(done_callback)
 
     def get_job(self, nsr_id):
         """Get the job associated with the NSR Id, if present."""
@@ -863,4 +892,8 @@ class ConfigAgentJobManager(object):
     @asyncio.coroutine
     def register(self):
         yield from self.handler.register()
-        yield from self.handler.register_for_nsr()
+        # yield from self.handler.register_for_nsr()
+
+    def deregister(self):
+        self.handler.deregister()
+        self.handler = None