update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwvns / rift / tasklets / rwvnstasklet / rwvnstasklet.py
index 1f88824..97ef76c 100755 (executable)
@@ -1,6 +1,6 @@
 
-# 
-#   Copyright 2016 RIFT.IO Inc
+#
+#   Copyright 2016-2017 RIFT.IO Inc
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -16,8 +16,6 @@
 #
 
 import asyncio
-import logging
-import os
 import sys
 
 import gi
@@ -25,38 +23,31 @@ gi.require_version('RwVnsYang', '1.0')
 gi.require_version('RwDts', '1.0')
 from gi.repository import (
     RwVnsYang,
-    RwSdnYang,
     RwDts as rwdts,
     RwTypes,
-    ProtobufC,
 )
 
 import rift.tasklets
+from rift.mano.utils.project import (
+    ManoProject,
+    ProjectHandler,
+)
+import rift.mano.sdn
 
 from rift.vlmgr import (
     VlrDtsHandler,
     VldDtsHandler,
     VirtualLinkRecord,
+    VirtualLinkEventListener
 )
 
 from rift.topmgr import (
     NwtopStaticDtsHandler,
     NwtopDiscoveryDtsHandler,
     NwtopDataStore,
-    SdnAccountMgr,
 )
 
 
-class SdnInterfaceError(Exception):
-    """ SDN interface creation Error """
-    pass
-
-
-class SdnPluginError(Exception):
-    """ SDN plugin creation Error """
-    pass
-
-
 class VlRecordError(Exception):
     """ Vlr Record creation Error """
     pass
@@ -66,203 +57,68 @@ class VlRecordNotFound(Exception):
     """ Vlr Record not found"""
     pass
 
-class SdnAccountError(Exception):
-    """ Error while creating/deleting/updating SDN Account"""
-    pass
-
-class SdnAccountNotFound(Exception):
-    pass
 
-class SDNAccountDtsOperdataHandler(object):
-    def __init__(self, dts, log, loop, parent):
-        self._dts = dts
+class SDNAccountHandlers(object):
+    def __init__(self, dts, log, log_hdl, acctstore, loop, project):
         self._log = log
+        self._log_hdl = log_hdl
+        self._dts = dts
         self._loop = loop
-        self._parent = parent
-
-    def _register_show_status(self):
-        def get_xpath(sdn_name=None):
-            return "D,/rw-sdn:sdn-account{}/rw-sdn:connection-status".format(
-                    "[name='%s']" % sdn_name if sdn_name is not None else ''
-                   )
-
-        @asyncio.coroutine
-        def on_prepare(xact_info, action, ks_path, msg):
-            path_entry = RwSdnYang.SDNAccountConfig.schema().keyspec_to_entry(ks_path)
-            sdn_account_name = path_entry.key00.name
-            self._log.debug("Got show sdn connection status request: %s", ks_path.create_string())
-
-            try:
-                saved_accounts = self._parent._acctmgr.get_saved_sdn_accounts(sdn_account_name)
-                for account in saved_accounts:
-                    sdn_acct = RwSdnYang.SDNAccountConfig()
-                    sdn_acct.from_dict(account.as_dict())
-
-                    self._log.debug("Responding to sdn connection status request: %s", sdn_acct.connection_status)
-                    xact_info.respond_xpath(
-                            rwdts.XactRspCode.MORE,
-                            xpath=get_xpath(account.name),
-                            msg=sdn_acct.connection_status,
-                            )
-            except KeyError as e:
-                self._log.warning(str(e))
-                xact_info.respond_xpath(rwdts.XactRspCode.NA)
-                return
-
-            xact_info.respond_xpath(rwdts.XactRspCode.ACK)
-
-        yield from self._dts.register(
-                xpath=get_xpath(),
-                handler=rift.tasklets.DTS.RegistrationHandler(
-                    on_prepare=on_prepare),
-                flags=rwdts.Flag.PUBLISHER,
-                )
-
-    def _register_validate_rpc(self):
-        def get_xpath():
-            return "/rw-sdn:update-sdn-status"
-
-        @asyncio.coroutine
-        def on_prepare(xact_info, action, ks_path, msg):
-            if not msg.has_field("sdn_account"):
-                raise SdnAccountNotFound("SDN account name not provided")
-
-            sdn_account_name = msg.sdn_account
-            account = self._parent._acctmgr.get_sdn_account(sdn_account_name)
-            if account is None:
-                self._log.warning("SDN account %s does not exist", sdn_account_name)
-                xact_info.respond_xpath(rwdts.XactRspCode.NA)
-                return
-
-            self._parent._acctmgr.start_validate_credentials(self._loop, sdn_account_name)
-
-            xact_info.respond_xpath(rwdts.XactRspCode.ACK)
-
-        yield from self._dts.register(
-                xpath=get_xpath(),
-                handler=rift.tasklets.DTS.RegistrationHandler(
-                    on_prepare=on_prepare
-                    ),
-                flags=rwdts.Flag.PUBLISHER,
-                )
-
+        self._acctstore = acctstore
+        self._project = project
+  
+        self._log.debug("Creating SDN account config handler")
+        self.sdn_cfg_handler = rift.mano.sdn.SDNAccountConfigSubscriber(
+              self._dts, self._log, project, self._log_hdl,
+              rift.mano.sdn.SDNAccountConfigCallbacks(
+                  on_add_apply=self.on_sdn_account_added,
+                  on_delete_apply=self.on_sdn_account_deleted,
+              ),
+              self._acctstore
+
+        )
+  
+        self._log.debug("Creating SDN account opdata handler")
+        self.sdn_operdata_handler = rift.mano.sdn.SDNAccountDtsOperdataHandler(
+              self._dts, self._log, self._loop, project,
+        )
+  
+    def on_sdn_account_deleted(self, account_name):
+        self._log.debug("SDN account deleted")
+        self.sdn_operdata_handler.delete_sdn_account(account_name)
+  
+    def on_sdn_account_added(self, account):
+        self._log.debug("SDN account added")
+        self.sdn_operdata_handler.add_sdn_account(account)
+  
     @asyncio.coroutine
     def register(self):
-        yield from self._register_show_status()
-        yield from self._register_validate_rpc()
-
-class SDNAccountDtsHandler(object):
-    XPATH = "C,/rw-sdn:sdn-account"
-
-    def __init__(self, dts, log, parent):
-        self._dts = dts
-        self._log = log
-        self._parent = parent
-
-        self._sdn_account = {}
-
-    def _set_sdn_account(self, account):
-        self._log.info("Setting sdn account: {}".format(account))
-        if account.name in self._sdn_account:
-            self._log.error("SDN Account with name %s already exists. Ignoring config", account.name);
-        self._sdn_account[account.name]  = account
-        self._parent._acctmgr.set_sdn_account(account)
-
-    def _del_sdn_account(self, account_name):
-        self._log.info("Deleting sdn account: {}".format(account_name))
-        del self._sdn_account[account_name]
+        self.sdn_cfg_handler.register()
+        yield from self.sdn_operdata_handler.register()
 
-        self._parent._acctmgr.del_sdn_account(account_name)
-
-    def _update_sdn_account(self, account):
-        self._log.info("Updating sdn account: {}".format(account))
-        # No need to update locally saved sdn_account's updated fields, as they
-        # are not used anywhere. Call the parent's update callback.
-        self._parent._acctmgr.update_sdn_account(account)
-
-    @asyncio.coroutine
-    def register(self):
-        def apply_config(dts, acg, xact, action, _):
-            self._log.debug("Got sdn account apply config (xact: %s) (action: %s)", xact, action)
-            if action == rwdts.AppconfAction.INSTALL and xact.id is None:
-                self._log.debug("No xact handle.  Skipping apply config")
-                return RwTypes.RwStatus.SUCCESS
-
-            return RwTypes.RwStatus.SUCCESS
-
-        @asyncio.coroutine
-        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
-            """ Prepare callback from DTS for SDN Account config """
-
-            self._log.info("SDN Cloud account config received: %s", msg)
-
-            fref = ProtobufC.FieldReference.alloc()
-            fref.goto_whole_message(msg.to_pbcm())
-
-            if fref.is_field_deleted():
-                # Delete the sdn account record
-                self._del_sdn_account(msg.name)
-            else:
-                # If the account already exists, then this is an update.
-                if msg.name in self._sdn_account:
-                    self._log.debug("SDN account already exists. Invoking on_prepare update request")
-                    if msg.has_field("account_type"):
-                        errmsg = "Cannot update SDN account's account-type."
-                        self._log.error(errmsg)
-                        xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
-                                                   SDNAccountDtsHandler.XPATH,
-                                                   errmsg)
-                        raise SdnAccountError(errmsg)
-
-                    # Update the sdn account record
-                    self._update_sdn_account(msg)
-                else:
-                    self._log.debug("SDN account does not already exist. Invoking on_prepare add request")
-                    if not msg.has_field('account_type'):
-                        errmsg = "New SDN account must contain account-type field."
-                        self._log.error(errmsg)
-                        xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
-                                                   SDNAccountDtsHandler.XPATH,
-                                                   errmsg)
-                        raise SdnAccountError(errmsg)
-
-                    # Set the sdn account record
-                    self._set_sdn_account(msg)
-
-            xact_info.respond_xpath(rwdts.XactRspCode.ACK)
-
-
-        self._log.debug("Registering for Sdn Account config using xpath: %s",
-                        SDNAccountDtsHandler.XPATH,
-                        )
-
-        acg_handler = rift.tasklets.AppConfGroup.Handler(
-                        on_apply=apply_config,
-                        )
-
-        with self._dts.appconf_group_create(acg_handler) as acg:
-            acg.register(
-                    xpath=SDNAccountDtsHandler.XPATH,
-                    flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
-                    on_prepare=on_prepare
-                    )
+    def deregister(self):
+        self.sdn_cfg_handler.deregister()
+        self.sdn_operdata_handler.deregister()
 
 
 class VnsManager(object):
     """ The Virtual Network Service Manager """
-    def __init__(self, dts, log, log_hdl, loop):
+    def __init__(self, dts, log, log_hdl, loop, project):
         self._dts = dts
         self._log = log
         self._log_hdl = log_hdl
         self._loop = loop
+        self._project = project
+        self._acctstore = {}
         self._vlr_handler = VlrDtsHandler(dts, log, loop, self)
         self._vld_handler = VldDtsHandler(dts, log, loop, self)
-        self._sdn_handler = SDNAccountDtsHandler(dts,log,self)
-        self._sdn_opdata_handler = SDNAccountDtsOperdataHandler(dts,log, loop, self)
-        self._acctmgr = SdnAccountMgr(self._log, self._log_hdl, self._loop)
+        self._sdn_handlers = SDNAccountHandlers(dts, log, log_hdl, self._acctstore, loop, self._project)
         self._nwtopdata_store = NwtopDataStore(log)
-        self._nwtopdiscovery_handler = NwtopDiscoveryDtsHandler(dts, log, loop, self._acctmgr, self._nwtopdata_store)
-        self._nwtopstatic_handler = NwtopStaticDtsHandler(dts, log, loop, self._acctmgr, self._nwtopdata_store)
+        self._nwtopdiscovery_handler = NwtopDiscoveryDtsHandler(dts, log, loop, project,
+                                                                self._acctstore, self._nwtopdata_store)
+        self._nwtopstatic_handler = NwtopStaticDtsHandler(dts, log, loop, project,
+                                                          self._acctstore, self._nwtopdata_store)
+        self._vl_event_listener = VirtualLinkEventListener(dts, log, loop, self)
         self._vlrs = {}
 
     @asyncio.coroutine
@@ -278,11 +134,10 @@ class VnsManager(object):
         yield from self._vld_handler.register()
 
     @asyncio.coroutine
-    def register_sdn_handler(self):
-        """ Register vlr DTS handler """
-        self._log.debug("Registering  SDN Account config handler")
-        yield from self._sdn_handler.register()
-        yield from self._sdn_opdata_handler.register()
+    def register_sdn_handlers(self):
+        """ Register SDN DTS handlers """
+        self._log.debug("Registering  SDN Account handlers")
+        yield from self._sdn_handlers.register()
 
     @asyncio.coroutine
     def register_nwtopstatic_handler(self):
@@ -296,15 +151,29 @@ class VnsManager(object):
         self._log.debug("Registering  discovery-based DTS NW topology handler")
         yield from self._nwtopdiscovery_handler.register()
 
+    @asyncio.coroutine
+    def register_vl_event_listener(self):
+        """ Register Virtual Link related events DTS handler """
+        self._log.debug("Registering  Virtual Link Event listener")
+        yield from self._vl_event_listener.register()
+
     @asyncio.coroutine
     def register(self):
         """ Register all static DTS handlers"""
-        yield from self.register_sdn_handler()
+        yield from self.register_sdn_handlers()
         yield from self.register_vlr_handler()
         yield from self.register_vld_handler()
         yield from self.register_nwtopstatic_handler()
-        # Not used for now
         yield from self.register_nwtopdiscovery_handler()
+        yield from self.register_vl_event_listener()
+
+    def deregister(self):
+        self._vl_event_listener.deregister()
+        self._nwtopdiscovery_handler.deregister()
+        self._nwtopstatic_handler.deregister()
+        self._vld_handler.deregister()
+        self._vlr_handler.deregister()
+        self._sdn_handlers.deregister()
 
     def create_vlr(self, msg):
         """ Create VLR """
@@ -320,7 +189,6 @@ class VnsManager(object):
                                                self._loop,
                                                self,
                                                msg,
-                                               msg.res_id
                                                )
         return self._vlrs[msg.id]
 
@@ -341,7 +209,7 @@ class VnsManager(object):
         del self._vlrs[vlr_id]
         self._log.info("Deleted virtual link id %s", vlr_id)
 
-    def find_vlr_by_vld_id(self, vld_id):
+    def find_vlR_by_vld_id(self, vld_id):
         """ Find a VLR matching the VLD Id """
         for vlr in self._vlrs.values():
             if vlr.vld_id == vld_id:
@@ -359,18 +227,87 @@ class VnsManager(object):
         return False
 
     @asyncio.coroutine
-    def publish_vlr(self, xact, path, msg):
+    def publish_vlr(self, xact, xpath, msg):
         """ Publish a VLR """
+        path = self._project.add_project(xpath)
         self._log.debug("Publish vlr called with path %s, msg %s",
                         path, msg)
         yield from self._vlr_handler.update(xact, path, msg)
 
     @asyncio.coroutine
-    def unpublish_vlr(self, xact, path):
+    def unpublish_vlr(self, xact, xpath):
         """ Publish a VLR """
+        path = self._project.add_project(xpath)
         self._log.debug("Unpublish vlr called with path %s", path)
         yield from self._vlr_handler.delete(xact, path)
 
+    def create_virual_link_event(self, event_id, event_msg):
+        """ Update Virtual Link Event """
+        self._log.debug("Creating Virtual Link Event id [%s], msg [%s]",
+                       event_id, event_msg)
+
+    @asyncio.coroutine
+    def update_virual_link_event(self, event_id, event_msg):
+        """ Update Virtual Link Event """
+        self._log.debug("Updating Virtual Link Event id [%s], msg [%s]",
+                        event_id, event_msg)
+        # event id and vlr_id are the same.
+        # Use event id to look up the VLR and update and publish state change
+        vlr = None
+
+        if event_id in self._vlrs:
+            vlr = self._vlrs[event_id]
+
+        if vlr is None:
+            self._log.error("Received VLR Event notifcation for unknown VLR - event-id:%s",
+                            event_id)
+            return
+
+        if event_msg.resource_info.resource_state == 'active':
+            with self._dts.transaction(flags=0) as xact:
+                yield from vlr.ready(event_msg, xact)
+        elif event_msg.resource_info.resource_state == 'failed':
+            with self._dts.transaction(flags=0) as xact:
+                if event_msg.resource_info.resource_errors:
+                    vlr._state_failed_reason = str(event_msg.resource_info.resource_errors)
+                yield from vlr.failed(event_msg, xact)
+        else:
+            self._log.warning("Receieved unknown resource state %s for event id %s vlr:%s",
+                              event_msg.resource_info.resource_state, event_id, vlr.name)
+
+    def delete_virual_link_event(self, event_id):
+        """ Delete Virtual Link Event """
+        self._log.debug("Deleting Virtual Link Event id [%s]",
+                        event_id)
+
+
+class VnsProject(ManoProject):
+
+    def __init__(self, name, tasklet, **kw):
+        super(VnsProject, self).__init__(tasklet.log, name)
+        self.update(tasklet)
+
+        self._vlr_handler = None
+        self._vnsm = None
+        # A mapping of instantiated vlr_id's to VirtualLinkRecord objects
+        self._vlrs = {}
+
+    @asyncio.coroutine
+    def register (self):
+        try:
+            self._vnsm = VnsManager(dts=self._dts,
+                                    log=self.log,
+                                    log_hdl=self.log_hdl,
+                                    loop=self.loop,
+                                    project=self)
+            yield from self._vnsm.run()
+        except Exception as e:
+            self.log.exception("VNS Task failed to run", e)
+
+    def deregister(self):
+        self._log.debug("De-register project {}".format(self.name))
+        self._vnsm.deregister()
+
 
 class VnsTasklet(rift.tasklets.Tasklet):
     """ The VNS tasklet class """
@@ -380,21 +317,25 @@ class VnsTasklet(rift.tasklets.Tasklet):
         self.rwlog.set_subcategory("vns")
 
         self._dts = None
-        self._vlr_handler = None
+        self._project_handler = None
+        self.projects = {}
 
-        self._vnsm = None
-        # A mapping of instantiated vlr_id's to VirtualLinkRecord objects
-        self._vlrs = {}
+    @property
+    def dts(self):
+        return self._dts
 
     def start(self):
         super(VnsTasklet, self).start()
         self.log.info("Starting VnsTasklet")
 
         self.log.debug("Registering with dts")
-        self._dts = rift.tasklets.DTS(self.tasklet_info,
-                                      RwVnsYang.get_schema(),
-                                      self.loop,
-                                      self.on_dts_state_change)
+        try:
+            self._dts = rift.tasklets.DTS(self.tasklet_info,
+                                          RwVnsYang.get_schema(),
+                                          self.loop,
+                                          self.on_dts_state_change)
+        except Exception:
+            self.log.exception("Caught Exception in VNS start:", e)
 
         self.log.debug("Created DTS Api GI Object: %s", self._dts)
 
@@ -412,17 +353,9 @@ class VnsTasklet(rift.tasklets.Tasklet):
     @asyncio.coroutine
     def init(self):
         """ task init callback"""
-        self._vnsm = VnsManager(dts=self._dts,
-                                log=self.log,
-                                log_hdl=self.log_hdl,
-                                loop=self.loop)
-        yield from self._vnsm.run()
-
-        # NSM needs to detect VLD deletion that has active VLR
-        # self._vld_handler = VldDescriptorConfigDtsHandler(
-        #         self._dts, self.log, self.loop, self._vlrs,
-        #         )
-        # yield from self._vld_handler.register()
+        self.log.debug("creating project handler")
+        self.project_handler = ProjectHandler(self, VnsProject)
+        self.project_handler.register()
 
     @asyncio.coroutine
     def run(self):