Fix bug #1077 slice instantiation with ssh_keys option
[osm/NBI.git] / osm_nbi / subscriptions.py
index 393918c..711455e 100644 (file)
@@ -80,21 +80,32 @@ class SubscriptionThread(threading.Thread):
                 if not kafka_working:
                     self.logger.critical("kafka is working again")
                     kafka_working = True
                 if not kafka_working:
                     self.logger.critical("kafka is working again")
                     kafka_working = True
-                if not self.aiomain_task_admin or self.aiomain_task_admin._state == "FINISHED":
+                if not self.aiomain_task_admin:
                     await asyncio.sleep(10, loop=self.loop)
                     self.logger.debug("Starting admin subscription task")
                     self.aiomain_task_admin = asyncio.ensure_future(self.msg.aioread(("admin",), loop=self.loop,
                                                                                      group_id=False,
                                                                                      aiocallback=self._msg_callback),
                                                                     loop=self.loop)
                     await asyncio.sleep(10, loop=self.loop)
                     self.logger.debug("Starting admin subscription task")
                     self.aiomain_task_admin = asyncio.ensure_future(self.msg.aioread(("admin",), loop=self.loop,
                                                                                      group_id=False,
                                                                                      aiocallback=self._msg_callback),
                                                                     loop=self.loop)
-                if not self.aiomain_task or self.aiomain_task._state == "FINISHED":
+                if not self.aiomain_task:
                     await asyncio.sleep(10, loop=self.loop)
                     self.logger.debug("Starting non-admin subscription task")
                     self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
                                                                                aiocallback=self._msg_callback),
                                                               loop=self.loop)
                     await asyncio.sleep(10, loop=self.loop)
                     self.logger.debug("Starting non-admin subscription task")
                     self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
                                                                                aiocallback=self._msg_callback),
                                                               loop=self.loop)
-                await asyncio.wait([self.aiomain_task, self.aiomain_task_admin],
-                                   timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
+                done, _ = await asyncio.wait([self.aiomain_task, self.aiomain_task_admin],
+                                             timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
+                try:
+                    if self.aiomain_task_admin in done:
+                        exc = self.aiomain_task_admin.exception()
+                        self.logger.error("admin subscription task exception: {}".format(exc))
+                        self.aiomain_task_admin = None
+                    if self.aiomain_task in done:
+                        exc = self.aiomain_task.exception()
+                        self.logger.error("non-admin subscription task exception: {}".format(exc))
+                        self.aiomain_task = None
+                except asyncio.CancelledError:
+                    pass
             except Exception as e:
                 if self.to_terminate:
                     return
             except Exception as e:
                 if self.to_terminate:
                     return
@@ -226,3 +237,5 @@ class SubscriptionThread(threading.Thread):
         self.to_terminate = True
         if self.aiomain_task:
             self.loop.call_soon_threadsafe(self.aiomain_task.cancel)
         self.to_terminate = True
         if self.aiomain_task:
             self.loop.call_soon_threadsafe(self.aiomain_task.cancel)
+        if self.aiomain_task_admin:
+            self.loop.call_soon_threadsafe(self.aiomain_task_admin.cancel)