Fix Bug 914: Updating VNFD the first time shows ProducerClosed error
[osm/common.git] / osm_common / msgkafka.py
index bc9147d..91775f2 100644 (file)
@@ -64,11 +64,15 @@ class MsgKafka(MsgBase):
         :param msg: message content, can be string or dictionary
         :return: None or raises MsgException on failing
         """
-        try:
-            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg))
-
-        except Exception as e:
-            raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
+        retry = 2   # Try two times
+        while retry:
+            try:
+                self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg))
+                break
+            except Exception as e:
+                retry -= 1
+                if retry == 0:
+                    raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
 
     def read(self, topic):
         """
@@ -136,11 +140,12 @@ class MsgKafka(MsgBase):
 
             async for message in self.consumer:
                 if callback:
-                    callback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs)
+                    callback(message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value), **kwargs)
                 elif aiocallback:
-                    await aiocallback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs)
+                    await aiocallback(message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value),
+                                      **kwargs)
                 else:
-                    return message.topic, yaml.load(message.key), yaml.load(message.value)
+                    return message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value)
         except KafkaError as e:
             raise MsgException(str(e))
         finally: