Update pip requirements to pass stage2 and stage3 in all modules
[osm/PLA.git] / osm_pla / server / server.py
index 8d25879..ede6f86 100644 (file)
 
 import asyncio
 import logging
-# import platform
+import itertools
 from pathlib import Path
 
-# import pkg_resources
 import yaml
 from osm_common import dbmemory, dbmongo, msglocal, msgkafka
 
@@ -31,36 +30,40 @@ from osm_pla.placement.mznplacement import NsPlacementDataFactory
 
 
 class Server:
-    pil_price_list_file = Path('/placement/pil_price_list.yaml')
-    vnf_price_list_file = Path('/placement/vnf_price_list.yaml')
+    pil_price_list_file = Path("/placement/pil_price_list.yaml")
+    vnf_price_list_file = Path("/placement/vnf_price_list.yaml")
 
-    def __init__(self, config: Config, loop=None):
+    def __init__(self, config: Config):
         self.log = logging.getLogger("pla.server")
         self.db = None
         self.msgBus = None
         self.config = config
-        self.loop = loop or asyncio.get_event_loop()
 
         try:
-            if config.get('database', 'driver') == "mongo":
+            if config.get("database", "driver") == "mongo":
                 self.db = dbmongo.DbMongo()
-                self.db.db_connect(config.get('database'))
-            elif config.get('database', 'driver') == "memory":
+                self.db.db_connect(config.get("database"))
+            elif config.get("database", "driver") == "memory":
                 self.db = dbmemory.DbMemory()
-                self.db.db_connect(config.get('database'))
+                self.db.db_connect(config.get("database"))
             else:
-                raise Exception("Invalid configuration param '{}' at '[database]':'driver'".format(
-                    config.get('database', 'driver')))
+                raise Exception(
+                    "Invalid configuration param '{}' at '[database]':'driver'".format(
+                        config.get("database", "driver")
+                    )
+                )
 
-            if config.get('message', 'driver') == "local":
+            if config.get("message", "driver") == "local":
                 self.msgBus = msglocal.MsgLocal()
-            elif config.get('message', 'driver') == "kafka":
+            elif config.get("message", "driver") == "kafka":
                 self.msgBus = msgkafka.MsgKafka()
             else:
-                raise Exception("Invalid message bus driver {}".format(
-                    config.get('message', 'driver')))
-            self.msgBus.loop = loop
-            self.msgBus.connect(config.get('message'))
+                raise Exception(
+                    "Invalid message bus driver {}".format(
+                        config.get("message", "driver")
+                    )
+                )
+            self.msgBus.connect(config.get("message"))
 
         except Exception as e:
             self.log.exception("kafka setup error. Exception: {}".format(e))
@@ -74,6 +77,13 @@ class Server:
         nslcmop = self.db.get_one("nslcmops", db_filter)
         return nslcmop
 
+    def _get_projects(self):
+        """
+        :return: project name to project id mapping
+        """
+        projects = self.db.get_list("projects")
+        return {project["_id"]: project["name"] for project in projects}
+
     def _get_nsd(self, nsd_id):
         """
         :param nsd_id:
@@ -90,16 +100,61 @@ class Server:
         db_filter = {"_id": vim_account_ids}
         return self.db.get_list("vim_accounts", db_filter)
 
-    def _get_vnf_price_list(self, price_list_file_path):
+    def _read_vnf_price_list(self, price_list_file_path):
+        """
+        read vnf price list configuration file
+        :param price_list_file_path:
+        :return:
+        """
+        with open(str(price_list_file_path)) as pl_fd:
+            price_list = yaml.safe_load_all(pl_fd)
+            return next(price_list)
+
+    def _price_list_with_project(self, price_list):
+        """
+        Figure out if this price list is with  project or not.
+        Note: to handle the unlikely event that a project is called 'prices' we do not simply check if 'prices'
+        is in the dict keys for a price list sequence but rather go down one step in the nesting
+        in which we either have
+        1) 'prices:{vim_url:...}' if prices are also per project, or
+        2) '{vim_url:...}' if prices are only per vim
+
+        :param price_list:
+        :return: True if project part of price list, else False
         """
-        read vnf price list configuration file and reformat its content
+        price_list_entry_keys = set(price_list[0].keys())
+        price_list_entry_keys.remove("vnfd")
+        pl_key = price_list_entry_keys.pop()
+        entry_to_check = price_list[0][pl_key][0].keys()
+        return True if "prices" in entry_to_check else False
 
-        :param: price_list_file: Path to price list file
+    def _get_vnf_price_list(self, price_list_file_path, project_name=None):
+        """
+        read vnf price list configuration file, determine its type and reformat content accordingly
+
+        :param price_list_file_path:
+        :param project_name:
         :return: dictionary formatted as {'<vnfd>': {'<vim-url>':'<price>'}}
         """
-        with open(str(price_list_file_path)) as pl_fd:
-            price_list_data = yaml.safe_load_all(pl_fd)
-            return {i['vnfd']: {i1['vim_url']: i1['price'] for i1 in i['prices']} for i in next(price_list_data)}
+        price_list_data = self._read_vnf_price_list(price_list_file_path)
+        if self._price_list_with_project(price_list_data):
+            res = {}
+            for i in price_list_data:
+                price_data = (
+                    i[project_name]
+                    if type(i[project_name]) is dict
+                    else i[project_name][0]
+                )
+                res_component = {
+                    i["vim_name"]: i["price"] for i in price_data["prices"]
+                }
+                res.update({i["vnfd"]: res_component})
+            return res
+        else:
+            return {
+                i["vnfd"]: {i1["vim_name"]: i1["price"] for i1 in i["prices"]}
+                for i in price_list_data
+            }
 
     def _get_pil_info(self, pil_info_file_path):
         """
@@ -111,6 +166,24 @@ class Server:
             data = yaml.safe_load_all(pil_fd)
             return next(data)
 
+    def _create_vnf_id_maps(self, nsd):
+        """
+        map identifier for 'member-vnf-index' in nsd to syntax that is safe for mzn
+
+         return tuples with mappings {<adjusted id>: <original id>} and {<original id>: <adjusted id>}
+        """
+        # TODO: Change for multiple DF support
+        ns_df = nsd.get("df", [{}])[0]
+        next_idx = itertools.count()
+        member_vnf_index2mzn = {
+            e["id"]: "VNF" + str(next(next_idx)) for e in ns_df.get("vnf-profile", [])
+        }
+
+        # reverse the name map dictionary, used when the placement result is remapped
+        mzn_name2member_vnf_index = {v: k for k, v in member_vnf_index2mzn.items()}
+
+        return member_vnf_index2mzn, mzn_name2member_vnf_index
+
     async def get_placement(self, nslcmop_id):
         """
         - Collects and prepares placement information.
@@ -124,53 +197,79 @@ class Server:
         """
         try:
             nslcmop = self._get_nslcmop(nslcmop_id)
-            nsd = self._get_nsd(nslcmop['operationParams']['nsdId'])
-            self.log.info("nsd: {}".format(nsd))
-            valid_vim_accounts = nslcmop['operationParams']['validVimAccounts']
+            nsd = self._get_nsd(nslcmop["operationParams"]["nsdId"])
+            member_vnf_index2mzn, mzn2member_vnf_index = self._create_vnf_id_maps(nsd)
+            # adjust vnf identifiers
+            # TODO: Change for multiple DF support
+            ns_df = nsd.get("df", [{}])[0]
+            for vnf_profile in ns_df.get("vnf-profile", []):
+                vnf_profile["id"] = member_vnf_index2mzn[vnf_profile["id"]]
+                for vlc in vnf_profile.get("virtual-link-connectivity", []):
+                    for ccpd in vlc.get("constituent-cpd-id", []):
+                        ccpd["constituent-base-element-id"] = member_vnf_index2mzn[
+                            ccpd["constituent-base-element-id"]
+                        ]
+            self.log.info("adjusted nsd: {}".format(nsd))
+            projects = self._get_projects()
+            self.log.info("projects: {}".format(projects))
+            nslcmop_project = nslcmop["_admin"]["projects_read"][0]
+            self.log.info("nslcmop_project: {}".format(nslcmop_project))
+            valid_vim_accounts = nslcmop["operationParams"]["validVimAccounts"]
             vim_accounts_data = self._get_vim_accounts(valid_vim_accounts)
-            vims_information = {_['vim_url']: _['_id'] for _ in vim_accounts_data}
-            price_list = self._get_vnf_price_list(Server.vnf_price_list_file)
+            vims_information = {_["name"]: _["_id"] for _ in vim_accounts_data}
+            price_list = self._get_vnf_price_list(
+                Server.vnf_price_list_file, projects[nslcmop_project]
+            )
             pil_info = self._get_pil_info(Server.pil_price_list_file)
-            pinning = nslcmop['operationParams'].get('vnf')
-            self.log.info("pinning: {}".format(pinning))
-            order_constraints = nslcmop['operationParams'].get('placement-constraints')
+            pinnings = nslcmop["operationParams"].get("vnf", [])
+            # remap member-vnf-index values according to id map
+            for pinning in pinnings:
+                pinning["member-vnf-index"] = member_vnf_index2mzn[
+                    pinning["member-vnf-index"]
+                ]
+            self.log.info("pinnings: {}".format(pinnings))
+            order_constraints = nslcmop["operationParams"].get("placement-constraints")
             self.log.info("order constraints: {}".format(order_constraints))
 
-            nspd = NsPlacementDataFactory(vims_information,
-                                          price_list,
-                                          nsd,
-                                          pil_info,
-                                          pinning, order_constraints).create_ns_placement_data()
+            nspd = NsPlacementDataFactory(
+                vims_information, price_list, nsd, pil_info, pinnings, order_constraints
+            ).create_ns_placement_data()
 
-            vnf_placement = MznPlacementConductor(self.log).do_placement_computation(nspd)
+            vnf_placement = MznPlacementConductor(self.log).do_placement_computation(
+                nspd
+            )
 
         except Exception as e:
             # Note: there is no cure for failure so we have a catch-all clause here
             self.log.exception("PLA fault. Exception: {}".format(e))
             vnf_placement = []
         finally:
-            await self.msgBus.aiowrite("pla", "placement",
-                                       {'placement': {'vnf': vnf_placement, 'nslcmopId': nslcmop_id}})
+            # remap names in vnf_placement
+            for e in vnf_placement:
+                e["member-vnf-index"] = mzn2member_vnf_index[e["member-vnf-index"]]
+            await self.msgBus.aiowrite(
+                "pla",
+                "placement",
+                {"placement": {"vnf": vnf_placement, "nslcmopId": nslcmop_id}},
+            )
 
     def handle_kafka_command(self, topic, command, params):
         self.log.info("Kafka msg arrived: {} {} {}".format(topic, command, params))
         if topic == "pla" and command == "get_placement":
-            nslcmop_id = params.get('nslcmopId')
-            self.loop.create_task(self.get_placement(nslcmop_id))
+            nslcmop_id = params.get("nslcmopId")
+            asyncio.create_task(self.get_placement(nslcmop_id))
 
     async def kafka_read(self):
         self.log.info("Task kafka_read start")
         while True:
             try:
                 topics = "pla"
-                await self.msgBus.aioread(topics, self.loop, self.handle_kafka_command)
+                await self.msgBus.aioread(topics, self.handle_kafka_command)
             except Exception as e:
                 self.log.error("kafka read error. Exception: {}".format(e))
-                await asyncio.sleep(5, loop=self.loop)
+                await asyncio.sleep(5)
 
     def run(self):
-        self.loop.run_until_complete(self.kafka_read())
-        self.loop.close()
-        self.loop = None
+        asyncio.run(self.kafka_read())
         if self.msgBus:
             self.msgBus.disconnect()