X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=inline;f=osm_pla%2Fserver%2Fserver.py;h=ede6f86379ce37f6bbf34592182a8b3db39574ac;hb=HEAD;hp=a1b7b64bb34c1dde605163a8eb6e107e54853eac;hpb=31181aa29c6c0489b1629877e25fdafb62e3f4e2;p=osm%2FPLA.git diff --git a/osm_pla/server/server.py b/osm_pla/server/server.py index a1b7b64..ede6f86 100644 --- a/osm_pla/server/server.py +++ b/osm_pla/server/server.py @@ -30,36 +30,40 @@ from osm_pla.placement.mznplacement import NsPlacementDataFactory class Server: - pil_price_list_file = Path('/placement/pil_price_list.yaml') - vnf_price_list_file = Path('/placement/vnf_price_list.yaml') + pil_price_list_file = Path("/placement/pil_price_list.yaml") + vnf_price_list_file = Path("/placement/vnf_price_list.yaml") - def __init__(self, config: Config, loop=None): + def __init__(self, config: Config): self.log = logging.getLogger("pla.server") self.db = None self.msgBus = None self.config = config - self.loop = loop or asyncio.get_event_loop() try: - if config.get('database', 'driver') == "mongo": + if config.get("database", "driver") == "mongo": self.db = dbmongo.DbMongo() - self.db.db_connect(config.get('database')) - elif config.get('database', 'driver') == "memory": + self.db.db_connect(config.get("database")) + elif config.get("database", "driver") == "memory": self.db = dbmemory.DbMemory() - self.db.db_connect(config.get('database')) + self.db.db_connect(config.get("database")) else: - raise Exception("Invalid configuration param '{}' at '[database]':'driver'".format( - config.get('database', 'driver'))) + raise Exception( + "Invalid configuration param '{}' at '[database]':'driver'".format( + config.get("database", "driver") + ) + ) - if config.get('message', 'driver') == "local": + if config.get("message", "driver") == "local": self.msgBus = msglocal.MsgLocal() - elif config.get('message', 'driver') == "kafka": + elif config.get("message", "driver") == "kafka": self.msgBus = msgkafka.MsgKafka() else: - raise Exception("Invalid message bus driver {}".format( - config.get('message', 'driver'))) - self.msgBus.loop = loop - self.msgBus.connect(config.get('message')) + raise Exception( + "Invalid message bus driver {}".format( + config.get("message", "driver") + ) + ) + self.msgBus.connect(config.get("message")) except Exception as e: self.log.exception("kafka setup error. Exception: {}".format(e)) @@ -78,7 +82,7 @@ class Server: :return: project name to project id mapping """ projects = self.db.get_list("projects") - return {project['_id']: project['name'] for project in projects} + return {project["_id"]: project["name"] for project in projects} def _get_nsd(self, nsd_id): """ @@ -119,10 +123,10 @@ class Server: :return: True if project part of price list, else False """ price_list_entry_keys = set(price_list[0].keys()) - price_list_entry_keys.remove('vnfd') + price_list_entry_keys.remove("vnfd") pl_key = price_list_entry_keys.pop() entry_to_check = price_list[0][pl_key][0].keys() - return True if 'prices' in entry_to_check else False + return True if "prices" in entry_to_check else False def _get_vnf_price_list(self, price_list_file_path, project_name=None): """ @@ -136,12 +140,21 @@ class Server: if self._price_list_with_project(price_list_data): res = {} for i in price_list_data: - price_data = i[project_name] if type(i[project_name]) is dict else i[project_name][0] - res_component = {i['vim_name']: i['price'] for i in price_data['prices']} - res.update({i['vnfd']: res_component}) + price_data = ( + i[project_name] + if type(i[project_name]) is dict + else i[project_name][0] + ) + res_component = { + i["vim_name"]: i["price"] for i in price_data["prices"] + } + res.update({i["vnfd"]: res_component}) return res else: - return {i['vnfd']: {i1['vim_name']: i1['price'] for i1 in i['prices']} for i in price_list_data} + return { + i["vnfd"]: {i1["vim_name"]: i1["price"] for i1 in i["prices"]} + for i in price_list_data + } def _get_pil_info(self, pil_info_file_path): """ @@ -159,9 +172,12 @@ class Server: return tuples with mappings {: } and {: } """ + # TODO: Change for multiple DF support + ns_df = nsd.get("df", [{}])[0] next_idx = itertools.count() - member_vnf_index2mzn = {e['member-vnf-index']: 'VNF' + str(next(next_idx)) for e in - nsd['constituent-vnfd']} + member_vnf_index2mzn = { + e["id"]: "VNF" + str(next(next_idx)) for e in ns_df.get("vnf-profile", []) + } # reverse the name map dictionary, used when the placement result is remapped mzn_name2member_vnf_index = {v: k for k, v in member_vnf_index2mzn.items()} @@ -181,39 +197,47 @@ class Server: """ try: nslcmop = self._get_nslcmop(nslcmop_id) - nsd = self._get_nsd(nslcmop['operationParams']['nsdId']) + nsd = self._get_nsd(nslcmop["operationParams"]["nsdId"]) member_vnf_index2mzn, mzn2member_vnf_index = self._create_vnf_id_maps(nsd) # adjust vnf identifiers - for e in nsd['constituent-vnfd']: - e['member-vnf-index'] = member_vnf_index2mzn[e['member-vnf-index']] - for vld in nsd['vld']: - for cp_ref in vld['vnfd-connection-point-ref']: - cp_ref['member-vnf-index-ref'] = member_vnf_index2mzn[cp_ref['member-vnf-index-ref']] + # TODO: Change for multiple DF support + ns_df = nsd.get("df", [{}])[0] + for vnf_profile in ns_df.get("vnf-profile", []): + vnf_profile["id"] = member_vnf_index2mzn[vnf_profile["id"]] + for vlc in vnf_profile.get("virtual-link-connectivity", []): + for ccpd in vlc.get("constituent-cpd-id", []): + ccpd["constituent-base-element-id"] = member_vnf_index2mzn[ + ccpd["constituent-base-element-id"] + ] self.log.info("adjusted nsd: {}".format(nsd)) projects = self._get_projects() self.log.info("projects: {}".format(projects)) - nslcmop_project = nslcmop['_admin']['projects_read'][0] + nslcmop_project = nslcmop["_admin"]["projects_read"][0] self.log.info("nslcmop_project: {}".format(nslcmop_project)) - valid_vim_accounts = nslcmop['operationParams']['validVimAccounts'] + valid_vim_accounts = nslcmop["operationParams"]["validVimAccounts"] vim_accounts_data = self._get_vim_accounts(valid_vim_accounts) - vims_information = {_['name']: _['_id'] for _ in vim_accounts_data} - price_list = self._get_vnf_price_list(Server.vnf_price_list_file, projects[nslcmop_project]) + vims_information = {_["name"]: _["_id"] for _ in vim_accounts_data} + price_list = self._get_vnf_price_list( + Server.vnf_price_list_file, projects[nslcmop_project] + ) pil_info = self._get_pil_info(Server.pil_price_list_file) - pinnings = nslcmop['operationParams'].get('vnf') + pinnings = nslcmop["operationParams"].get("vnf", []) # remap member-vnf-index values according to id map for pinning in pinnings: - pinning['member-vnf-index'] = member_vnf_index2mzn[pinning['member-vnf-index']] + pinning["member-vnf-index"] = member_vnf_index2mzn[ + pinning["member-vnf-index"] + ] self.log.info("pinnings: {}".format(pinnings)) - order_constraints = nslcmop['operationParams'].get('placement-constraints') + order_constraints = nslcmop["operationParams"].get("placement-constraints") self.log.info("order constraints: {}".format(order_constraints)) - nspd = NsPlacementDataFactory(vims_information, - price_list, - nsd, - pil_info, - pinnings, order_constraints).create_ns_placement_data() + nspd = NsPlacementDataFactory( + vims_information, price_list, nsd, pil_info, pinnings, order_constraints + ).create_ns_placement_data() - vnf_placement = MznPlacementConductor(self.log).do_placement_computation(nspd) + vnf_placement = MznPlacementConductor(self.log).do_placement_computation( + nspd + ) except Exception as e: # Note: there is no cure for failure so we have a catch-all clause here @@ -222,29 +246,30 @@ class Server: finally: # remap names in vnf_placement for e in vnf_placement: - e['member-vnf-index'] = mzn2member_vnf_index[e['member-vnf-index']] - await self.msgBus.aiowrite("pla", "placement", - {'placement': {'vnf': vnf_placement, 'nslcmopId': nslcmop_id}}) + e["member-vnf-index"] = mzn2member_vnf_index[e["member-vnf-index"]] + await self.msgBus.aiowrite( + "pla", + "placement", + {"placement": {"vnf": vnf_placement, "nslcmopId": nslcmop_id}}, + ) def handle_kafka_command(self, topic, command, params): self.log.info("Kafka msg arrived: {} {} {}".format(topic, command, params)) if topic == "pla" and command == "get_placement": - nslcmop_id = params.get('nslcmopId') - self.loop.create_task(self.get_placement(nslcmop_id)) + nslcmop_id = params.get("nslcmopId") + asyncio.create_task(self.get_placement(nslcmop_id)) async def kafka_read(self): self.log.info("Task kafka_read start") while True: try: topics = "pla" - await self.msgBus.aioread(topics, self.loop, self.handle_kafka_command) + await self.msgBus.aioread(topics, self.handle_kafka_command) except Exception as e: self.log.error("kafka read error. Exception: {}".format(e)) - await asyncio.sleep(5, loop=self.loop) + await asyncio.sleep(5) def run(self): - self.loop.run_until_complete(self.kafka_read()) - self.loop.close() - self.loop = None + asyncio.run(self.kafka_read()) if self.msgBus: self.msgBus.disconnect()