adding ns_lcm_op_occs instantiate terminate action (primitive)
[osm/RO.git] / lcm / osm_common / msgkafka.py
index 459513d..de1e764 100644 (file)
@@ -56,7 +56,7 @@ class MsgKafka(MsgBase):
         except Exception as e:
             raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
 
-    async def aiowrite(self, topic, key, msg, loop):
+    async def aiowrite(self, topic, key, msg, loop=None):
 
         if not loop:
             loop = self.loop
@@ -64,9 +64,9 @@ class MsgKafka(MsgBase):
             self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                              bootstrap_servers=self.broker)
             await self.producer.start()
-            await self.producer.send(topic=topic, key=key, value=msg)
+            await self.producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True))
         except Exception as e:
-            raise MsgException("Error publishing to {} topic: {}".format(topic, str(e)))
+            raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e))
         finally:
             await self.producer.stop()