Adds projects_read and projects_write params to scaling nslcmop
[osm/POL.git] / osm_policy_module / common / lcm_client.py
index 3d8012f..089e541 100644 (file)
@@ -28,46 +28,54 @@ import logging
 import time
 import uuid
 
-from aiokafka import AIOKafkaProducer
-from osm_common import dbmongo
-
+from osm_policy_module.common.common_db_client import CommonDbClient
+from osm_policy_module.common.message_bus_client import MessageBusClient
 from osm_policy_module.core.config import Config
 
 log = logging.getLogger(__name__)
 
 
 class LcmClient:
-    def __init__(self, loop=None):
-        cfg = Config.instance()
-        self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
-                                           cfg.OSMPOL_MESSAGE_PORT)
-        self.common_db = dbmongo.DbMongo()
-        self.common_db.db_connect({'host': cfg.OSMPOL_DATABASE_HOST,
-                                   'port': int(cfg.OSMPOL_DATABASE_PORT),
-                                   'name': 'osm'})
+    """
+    Client to communicate with LCM through the message bus.
+    """
+
+    def __init__(self, config: Config, loop=None):
+        self.db_client = CommonDbClient(config)
+        self.msg_bus = MessageBusClient(config)
         if not loop:
             loop = asyncio.get_event_loop()
         self.loop = loop
 
-    async def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
+    async def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: str, action: str):
+        """
+        Sends scaling action to LCM through the message bus.
+
+        :param nsr_id: Network service record id
+        :param scaling_group_name: Scaling group name
+        :param vnf_member_index: VNF member index
+        :param action: Scaling action to be executed. Valid values: scale_in, scale_out
+        :return:
+        """
         log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
-        nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action)
-        self.common_db.create("nslcmops", nslcmop)
-        log.info("Sending scale action message: %s", json.dumps(nslcmop))
-        producer = AIOKafkaProducer(loop=self.loop,
-                                    bootstrap_servers=self.kafka_server,
-                                    key_serializer=str.encode,
-                                    value_serializer=str.encode)
-        await producer.start()
-        try:
-            # Produce message
-            await producer.send_and_wait("ns", key="scale", value=json.dumps(nslcmop))
-        finally:
-            # Wait for all pending messages to be delivered or expire.
-            await producer.stop()
-
-    def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
-        log.debug("_generate_nslcmop %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
+        nsr = self.db_client.get_nsr(nsr_id)
+        nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action, nsr['_admin'])
+        self.db_client.create_nslcmop(nslcmop)
+        log.debug("Sending scale action message: %s", json.dumps(nslcmop))
+        await self.msg_bus.aiowrite("ns", "scale", nslcmop)
+
+    def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: str, action: str, admin: dict):
+        """
+        Builds scaling nslcmop.
+
+        :param nsr_id: Network service record id
+        :param scaling_group_name: Scaling group name
+        :param vnf_member_index: VNF member index
+        :param action: Scaling action to be executed. Valid values: scale_in, scale_out
+        :param admin: Dict corresponding to the _admin section of the nsr. Required keys: projects_read, projects_write.
+        :return:
+        """
+        log.debug("_generate_nslcmop %s %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action, admin)
         _id = str(uuid.uuid4())
         now = time.time()
         params = {
@@ -76,7 +84,7 @@ class LcmClient:
                 "scaleVnfType": action.upper(),
                 "scaleByStepData": {
                     "scaling-group-descriptor": scaling_group_name,
-                    "member-vnf-index": str(vnf_member_index)
+                    "member-vnf-index": vnf_member_index
                 }
             },
             "scaleTime": "{}Z".format(datetime.datetime.utcnow().isoformat())
@@ -96,6 +104,10 @@ class LcmClient:
             "links": {
                 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
                 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
+            },
+            "_admin": {
+                "projects_read": admin['projects_read'],
+                "projects_write": admin['projects_write']
             }
         }
         return nslcmop