Support SO restart for projects 61/1561/1
authorPhilip Joseph <philip.joseph@riftio.com>
Thu, 13 Apr 2017 09:40:29 +0000 (15:10 +0530)
committerPhilip Joseph <philip.joseph@riftio.com>
Thu, 13 Apr 2017 14:15:45 +0000 (19:45 +0530)
Change-Id: I632151fa0194f80ab8d827f7bae3e96e6bc3748c
Signed-off-by: Philip Joseph <philip.joseph@riftio.com>
common/python/rift/mano/dts/subscriber/core.py
common/python/rift/mano/utils/project.py
rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/tasklet.py
rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/publisher.py
rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py
rwlaunchpad/plugins/rwvnfm/rift/tasklets/rwvnfmtasklet/rwvnfmtasklet.py
rwlaunchpad/test/launchpad.py
rwprojectmano/plugins/rwprojectmano/rift/tasklets/rwprojectmano/projectmano.py
rwprojectmano/plugins/rwprojectmano/rift/tasklets/rwprojectmano/rolesmano.py

index a2181e8..63cb321 100644 (file)
@@ -163,8 +163,16 @@ class AbstractConfigSubscriber(SubscriberDtsHandler):
 
         def on_apply(dts, acg, xact, action, scratch):
             """Apply the  configuration"""
-            is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
-
+            if xact.xact is None:
+                if action == rwdts.AppconfAction.INSTALL:
+                    try:
+                        for cfg in self.reg.elements:
+                            if self.callback:
+                                self.callback(cfg, rwdts.QueryAction.CREATE)
+                    except Exception as e:
+                        self._log.exception("Adding config {} during restart failed: {}".
+                                            format(cfg, e))
+                return
 
             add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
                     dts_member_reg=self.reg,
index 96d1de5..9d607ff 100644 (file)
@@ -422,13 +422,13 @@ class ProjectDtsHandler(object):
         return self._dts
 
     def add_project(self, name):
-        self.log.info("Adding project: {}".format(name))
+        self._log.info("Adding project: {}".format(name))
 
         if name not in self.projects:
             self._callbacks.on_add_apply(name)
             self.projects.append(name)
         else:
-            self.log.error("Project already present: {}".
+            self._log.error("Project already present: {}".
                            format(name))
 
     def delete_project(self, name):
@@ -437,7 +437,7 @@ class ProjectDtsHandler(object):
             self._callbacks.on_delete_apply(name)
             self.projects.remove(name)
         else:
-            self.log.error("Unrecognized project: {}".
+            self._log.error("Unrecognized project: {}".
                            format(name))
 
     def update_project(self, name):
@@ -474,12 +474,11 @@ class ProjectDtsHandler(object):
                 if action == rwdts.AppconfAction.INSTALL:
                     curr_cfg = self._reg.elements
                     for cfg in curr_cfg:
-                        self._log.debug("Project being re-added after restart.")
-                        self.add_project(cfg.name_ref)
+                        self._log.error("NOT IMPLEMENTED: Project being re-added after restart: {}".
+                                        format(cfg))
+                        # self.add_project(cfg.name)
+                        raise NotImplementedError("Tasklet restart not supported")
                 else:
-                    # When RIFT first comes up, an INSTALL is called with the current config
-                    # Since confd doesn't actally persist data this never has any data so
-                    # skip this for now.
                     self._log.debug("No xact handle.  Skipping apply config")
 
                 return
@@ -636,6 +635,18 @@ class ProjectHandler(object):
                                 format(name, self._get_tasklet_name(), e))
 
     def on_project_added(self, name):
+        if name not in self._tasklet.projects:
+            # Restart case, directly calling apply
+            try:
+                self._tasklet.projects[name] = \
+                                self._class(name, self._tasklet, **(self._kw))
+                self._loop.create_task(self._get_project(name).register())
+
+            except Exception as e:
+                self._log.exception("Project {} create for {} failed: {}".
+                                    format(name, self._get_tasklet_name(), e))
+                raise e
+
         self._log.debug("Project {} added to tasklet {}".
                         format(name, self._get_tasklet_name()))
         self._get_project(name)._apply = True
index d416f9c..f20a270 100644 (file)
@@ -113,10 +113,16 @@ class NsdCatalogDtsHandler(CatalogDtsHandler):
     def register(self):
         def apply_config(dts, acg, xact, action, _):
             if xact.xact is None:
-                # When RIFT first comes up, an INSTALL is called with the current config
-                # Since confd doesn't actally persist data this never has any data so
-                # skip this for now.
-                self.log.debug("No xact handle.  Skipping apply config")
+                if action == rwdts.AppconfAction.INSTALL:
+                    if self.reg:
+                        for element in self.reg.elements:
+                            self.log.debug("Add NSD on restart: {}".format(element.id))
+                            self.add_nsd(element)
+                    else:
+                        self.log.error("DTS handle is null for project {}".
+                                       format(self.project.name))
+                else:
+                    self.log.debug("No xact handle.  Skipping apply config")
                 return
 
             add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
@@ -498,7 +504,6 @@ class LaunchpadTasklet(rift.tasklets.Tasklet):
                 )
 
             self.log.debug("Registering project handler")
-            print("PJ: Registering project handler")
             self.project_handler = ProjectHandler(self, LaunchpadProject,
                                                   app=self.app)
             self.project_handler.register()
index 1b5c787..972b183 100644 (file)
@@ -276,8 +276,8 @@ class VnfdPublisher(object):
             url = "{}://127.0.0.1:8008/api/config/project/{}/vnfd-catalog/vnfd/{}"
 
             model = RwYang.Model.create_libncx()
-            model.load_module("rw-vnfd")
-            model.load_module("vnfd")
+            model.load_module("rw-project-vnfd")
+            model.load_module("project-vnfd")
 
             data = vnfd.to_json(model)
 
index 00c549b..061ade0 100755 (executable)
@@ -1045,8 +1045,7 @@ class VirtualNetworkFunctionRecord(object):
                 # But not sure whats the use of this variable?
                 self.vnfr_msg.config_status = status_to_string(status)
             except Exception as e:
-                self._log.error("Exception=%s", str(e))
-                pass
+                self._log.exception("Exception=%s", str(e))
 
             self._log.debug("Updated VNFR {} status to {}".format(self.name, status))
 
@@ -1115,6 +1114,8 @@ class VirtualNetworkFunctionRecord(object):
             self._log.debug("Connection point [%s] added, vnf id=%s vnfd id=%s",
                             cpr, self.vnfr_msg.id, self.vnfr_msg.vnfd.id)
 
+        self._log.debug("VNFR {} restart mode {}".
+                        format(self.vnfr_msg.id, self.restart_mode))
         if not self.restart_mode:
             yield from self._dts.query_create(self.xpath,
                                               0,   # this is sub
@@ -1125,7 +1126,7 @@ class VirtualNetworkFunctionRecord(object):
                                               self.vnfr_msg)
 
         self._log.info("Created VNF with xpath %s and vnfr %s",
-                       self.xpath, self.vnfr_msg)
+                        self.xpath, self.vnfr_msg)
 
     @asyncio.coroutine
     def update_state(self, vnfr_msg):
@@ -1507,6 +1508,7 @@ class NetworkServiceRecord(object):
     @asyncio.coroutine
     def create(self, config_xact):
         """ Create this network service"""
+        self._log.debug("Create NS {} for {}".format(self.name, self._project.name))
         # Create virtual links  for all the external vnf
         # connection points in this NS
         yield from self.create_vls()
@@ -2957,19 +2959,21 @@ class VnfdDtsHandler(object):
             self._log.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)",
                             xact, action, scratch)
 
+            is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
+
             if self._regh:
                 # Create/Update a VNFD record
                 for cfg in self._regh.get_xact_elements(xact):
                     # Only interested in those VNFD cfgs whose ID was received in prepare callback
-                    if cfg.id in scratch.get('vnfds', []):
+                    if cfg.id in scratch.get('vnfds', []) or is_recovery:
                         self._nsm.update_vnfd(cfg)
 
-                        for cfg in self._regh.elements:
-                            if cfg.id in scratch.get('deleted_vnfds', []):
-                                yield from self._nsm.delete_vnfd(cfg.id)
+                for cfg in self._regh.elements:
+                    if cfg.id in scratch.get('deleted_vnfds', []):
+                        yield from self._nsm.delete_vnfd(cfg.id)
 
             else:
-                self._log.error("Reg handle none for {} in project {}".
+                self._log.debug("Reg handle none for {} in project {}".
                                 format(self.__class__, self._project))
 
             scratch.pop('vnfds', None)
@@ -2979,7 +2983,7 @@ class VnfdDtsHandler(object):
         def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
             """ on prepare callback """
             self._log.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
-                            ks_path.to_xpath(RwNsmYang.get_schema()), xact_info.query_action, msg)
+                            ks_path.to_xpath(NsdYang.get_schema()), xact_info.query_action, msg)
 
             fref = ProtobufC.FieldReference.alloc()
             fref.goto_whole_message(msg.to_pbcm())
@@ -3344,8 +3348,8 @@ class NsrDtsHandler(object):
                     self._log.error(err)
                     raise NetworkServiceRecordError(err)
 
-                self._log.debug("Creating NetworkServiceRecord %s  from nsr config  %s",
-                               msg.id, msg.as_dict())
+                self._log.debug("Creating NSR {} with restart mode {} from nsr config  {}".
+                                format(msg.id, restart_mode, msg.as_dict()))
                 nsr = self.nsm.create_nsr(msg, key_pairs=key_pairs, restart_mode=restart_mode)
                 return nsr
 
@@ -3377,10 +3381,7 @@ class NsrDtsHandler(object):
                     self._log.exception("NS instantiation: {}".format(e))
                     raise e
 
-            self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
-                            xact, action, scratch)
-
-            if action == rwdts.AppconfAction.INSTALL and xact.id is None:
+            if action == rwdts.AppconfAction.INSTALL and xact.xact is None:
                 key_pairs = []
                 if self._key_pair_regh:
                     for element in self._key_pair_regh.elements:
@@ -3391,12 +3392,16 @@ class NsrDtsHandler(object):
 
                 if self._nsr_regh:
                     for element in self._nsr_regh.elements:
+                        self._log.error("Create NSR {} for project {} on restart".
+                                        format(element.name, self._project.name))
                         nsr = handle_create_nsr(element, key_pairs, restart_mode=True)
                         self._loop.create_task(begin_instantiation(nsr))
                 else:
                     self._log.error("Reg handle none for NSR in project {}".
                                     format(self._project))
 
+                return RwTypes.RwStatus.SUCCESS
+
 
             (added_msgs, deleted_msgs, updated_msgs) = get_add_delete_update_cfgs(self._nsr_regh,
                                                                                   xact,
index 253094f..50e7f26 100755 (executable)
@@ -2296,14 +2296,15 @@ class VnfrDtsHandler(object):
 
                 yield from vnfr.instantiate(None, restart_mode=True)
 
+            self._log.debug("Got on_event in vnfm: {}".format(xact_event))
+
             if xact_event == rwdts.MemberEvent.INSTALL:
                 curr_cfg = self.regh.elements
                 for cfg in curr_cfg:
                     vnfr = self.vnfm.create_vnfr(cfg)
+                    self._log.debug("Creating VNFR {}".format(vnfr.vnfr_id))
                     self._loop.create_task(instantiate_realloc_vnfr(vnfr))
 
-            self._log.debug("Got on_event in vnfm")
-
             return rwdts.MemberRspCode.ACTION_OK
 
         @asyncio.coroutine
@@ -2314,12 +2315,8 @@ class VnfrDtsHandler(object):
                 xact_info, action, msg
                 )
 
-            if action == rwdts.QueryAction.CREATE:
-                if not msg.has_field("vnfd"):
-                    err = "Vnfd not provided"
-                    self._log.error(err)
-                    raise VnfRecordError(err)
-
+            @asyncio.coroutine
+            def create_vnf():
                 vnfr = self.vnfm.create_vnfr(msg)
                 try:
                     # RIFT-9105: Unable to add a READ query under an existing transaction
@@ -2330,6 +2327,14 @@ class VnfrDtsHandler(object):
                     self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
                     vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
                     yield from vnfr.publish(None)
+
+            if action == rwdts.QueryAction.CREATE:
+                if not msg.has_field("vnfd"):
+                    err = "Vnfd not provided"
+                    self._log.error(err)
+                    raise VnfRecordError(err)
+                yield from create_vnf()
+
             elif action == rwdts.QueryAction.DELETE:
                 schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.schema()
                 path_entry = schema.keyspec_to_entry(ks_path)
@@ -2355,6 +2360,14 @@ class VnfrDtsHandler(object):
                 vnfr = None
                 try:
                     vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
+
+                except VnfRecordError as e:
+                    self._log.debug("Did not find VNFR {}".format(path_entry.key00.id))
+                    # Could be a restart of LP
+                    yield from create_vnf()
+                    xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+                    return
+
                 except Exception as e:
                     self._log.debug("No vnfr found with id %s", path_entry.key00.id)
                     xact_info.respond_xpath(rwdts.XactRspCode.NA)
index 6db393c..1eb373c 100755 (executable)
@@ -455,7 +455,7 @@ class Demo(rift.vcs.demo.Demo):
             rift.vcs.uAgentTasklet(),
             rift.vcs.IdentityManagerTasklet(),
             rift.vcs.ProjectManagerTasklet(),
-            ProjectMgrManoTasklet(),
+            ProjectMgrManoTasklet(),
             rift.vcs.Launchpad(),
             ]
 
@@ -484,7 +484,7 @@ class Demo(rift.vcs.demo.Demo):
               AutoscalerTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
               PackageManagerTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
               StagingManagerTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
-              #ProjectMgrManoTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
+              ProjectMgrManoTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
             ]
 
         if not mgmt_ip_list or len(mgmt_ip_list) == 0:
index a59284a..66f2849 100644 (file)
@@ -99,13 +99,13 @@ class ProjectDtsHandler(object):
 
     def add_project(self, cfg):
         name = cfg.name
-        self.log.info("Adding project: {}".format(name))
+        self._log.info("Adding project: {}".format(name))
 
         if name not in self.projects:
             self._callbacks.on_add_apply(name, cfg)
             self.projects.append(name)
         else:
-            self.log.error("Project already present: {}".
+            self._log.error("Project already present: {}".
                            format(name))
 
     def delete_project(self, name):
@@ -114,7 +114,7 @@ class ProjectDtsHandler(object):
             self._callbacks.on_delete_apply(name)
             self.projects.remove(name)
         else:
-            self.log.error("Unrecognized project: {}".
+            self._log.error("Unrecognized project: {}".
                            format(name))
 
     def update_project(self, cfg):
@@ -131,7 +131,7 @@ class ProjectDtsHandler(object):
         if name in self.projects:
             pass
         else:
-            self.log.error("Unrecognized project: {}".
+            self._log.error("Unrecognized project: {}".
                            format(name))
 
     def register(self):
@@ -143,12 +143,10 @@ class ProjectDtsHandler(object):
                 if action == rwdts.AppconfAction.INSTALL:
                     curr_cfg = self._reg.elements
                     for cfg in curr_cfg:
-                        self._log.debug("Project being re-added after restart.")
-                        self.add_project(cfg.name)
+                        self._log.info("Project {} being re-added after restart.".
+                                       format(cfg.name))
+                        self.add_project(cfg)
                 else:
-                    # When RIFT first comes up, an INSTALL is called with the current config
-                    # Since confd doesn't actally persist data this never has any data so
-                    # skip this for now.
                     self._log.debug("No xact handle.  Skipping apply config")
 
                 return
@@ -282,6 +280,18 @@ class ProjectHandler(object):
                                 format(name, self._get_tasklet_name(), e))
 
     def on_project_added(self, name, cfg):
+        if name not in self._tasklet.projects:
+            # Restart case, directly calling apply
+            try:
+                self._tasklet.projects[name] = \
+                                self._class(name, self._tasklet)
+                self._loop.create_task(self._get_project(name).register())
+
+            except Exception as e:
+                self._log.exception("Project {} create for {} failed: {}".
+                                    format(name, self._get_tasklet_name(), e))
+                raise e
+
         self._log.debug("Project {} added to tasklet {}".
                         format(name, self._get_tasklet_name()))
         self._get_project(name)._apply = True
index d6d12c4..9b0ae00 100644 (file)
@@ -86,7 +86,7 @@ class ProjectConfigSubscriber(object):
 
     def update_user(self, cfg):
         user = User().pb(cfg)
-        self._log.error("Update user {} for project {}".
+        self._log.debug("Update user {} for project {}".
                         format(user.key, self.project_name))
         cfg_roles = {}
         for cfg_role in cfg.mano_role:
@@ -148,8 +148,9 @@ class ProjectConfigSubscriber(object):
                 if action == rwdts.AppconfAction.INSTALL:
                     curr_cfg = self._reg.elements
                     for cfg in curr_cfg:
-                        self._log.debug("Project being re-added after restart.")
-                        self.add_user(cfg)
+                        self._log.info("Project {} user being re-added after restart: {}.".
+                                       format(self.project_name, cfg.as_dict()))
+                        self.update_user(cfg)
                 else:
                     # When RIFT first comes up, an INSTALL is called with the current config
                     # Since confd doesn't actally persist data this never has any data so
@@ -319,8 +320,8 @@ class RoleConfigPublisher(rift.tasklets.DtsConfigPublisher):
             self.create_project_role(role)
 
     def create_project_role(self, role):
-        self.log.error("Create project role for {}: {}".
-                       format(self.project_name, role.role))
+        self.log.info("Create project role for {}: {}".
+                      format(self.project_name, role.role))
         xpath = self.role_xpath(role.key)
         pb_role = self.pb_role(role)
         self._regh.update_element(xpath, pb_role)
@@ -333,8 +334,8 @@ class RoleConfigPublisher(rift.tasklets.DtsConfigPublisher):
             self.delete_project_role(role)
 
     def delete_project_role(self, role):
-        self.log.error("Delete project role for {}: {}".
-                       format(self.project_name, role.role))
+        self.log.info("Delete project role for {}: {}".
+                      format(self.project_name, role.role))
         xpath = self.role_xpath(role.key)
         self._regh.delete_element(xpath)