Enable black in tox.ini
[osm/PLA.git] / osm_pla / placement / mznplacement.py
index bbb3ddd..d507bbd 100755 (executable)
@@ -25,14 +25,17 @@ class MznPlacementConductor(object):
     """
     Knows how to process placement req using minizinc
     """
-    if platform.system() == 'Windows':
-        default_mzn_path = 'C:\\Program Files\\MiniZinc IDE (bundled)\\minizinc.exe'
+
+    if platform.system() == "Windows":
+        default_mzn_path = "C:\\Program Files\\MiniZinc IDE (bundled)\\minizinc.exe"
     else:
-        default_mzn_path = '/minizinc/bin/minizinc'
+        default_mzn_path = "/minizinc/bin/minizinc"
 
     def __init__(self, log, mzn_path=default_mzn_path):
-        pymzn.config['minizinc'] = mzn_path
-        self.log = log  # FIXME what to log (besides forwarding it to MznModelGenerator) here?
+        pymzn.config["minizinc"] = mzn_path
+        self.log = (
+            log  # FIXME what to log (besides forwarding it to MznModelGenerator) here?
+        )
 
     def _run_placement_model(self, mzn_model, ns_desc, mzn_model_data={}):
         """
@@ -51,23 +54,31 @@ class MznPlacementConductor(object):
         :return: list of dicts formatted as {'vimAccountId': '<account id>', 'member-vnf-index': <'index'>}
         or formatted as [{}] if unsatisfiable model
         """
-        solns = pymzn.minizinc(mzn_model, data=mzn_model_data, output_mode='item')
+        solns = pymzn.minizinc(mzn_model, data=mzn_model_data, output_mode="item")
 
-        if 'UNSATISFIABLE' in str(solns):
+        if "UNSATISFIABLE" in str(solns):
             return [{}]
 
         solns_as_str = str(solns[0])
 
         # make it easier to extract the desired information by cleaning from newline, whitespace etc.
-        solns_as_str = solns_as_str.replace('\n', '').replace(' ', '').rstrip(';')
+        solns_as_str = solns_as_str.replace("\n", "").replace(" ", "").rstrip(";")
 
-        vnf_vim_mapping = (e.split('=') for e in solns_as_str.split(';'))
+        vnf_vim_mapping = (e.split("=") for e in solns_as_str.split(";"))
 
-        res = [{'vimAccountId': e[1][3:].replace('_', '-'), 'member-vnf-index': e[0][3:]} for e in
-               vnf_vim_mapping]
+        res = [
+            {"vimAccountId": e[1][3:].replace("_", "-"), "member-vnf-index": e[0][3:]}
+            for e in vnf_vim_mapping
+        ]
         # add any pinned VNFs
-        pinned = [{'vimAccountId': e['vim_account'][3:].replace('_', '-'), 'member-vnf-index': e['vnf_id']} for e in
-                  ns_desc if 'vim_account' in e.keys()]
+        pinned = [
+            {
+                "vimAccountId": e["vim_account"][3:].replace("_", "-"),
+                "member-vnf-index": e["vnf_id"],
+            }
+            for e in ns_desc
+            if "vim_account" in e.keys()
+        ]
 
         return res + pinned
 
@@ -79,41 +90,47 @@ class MznPlacementConductor(object):
         :return: see _run_placement_model
         """
         mzn_model = MznModelGenerator(self.log).create_model(nspd)
-        return self._run_placement_model(mzn_model, nspd['ns_desc'])
+        return self._run_placement_model(mzn_model, nspd["ns_desc"])
 
 
 class MznModelGenerator(object):
-    '''
+    """
     Has the capability to generate minizinc models from information contained in
     NsPlacementData objects. Uses jinja2 as templating language for the model
-    '''
+    """
+
     default_j2_template = "osm_pla_dynamic_template.j2"
-    template_search_path = ['osm_pla/placement', '../placement', '/pla/osm_pla/placement',
-                            './', '/usr/lib/python3/dist-packages/osm_pla/placement']
+    template_search_path = [
+        "osm_pla/placement",
+        "../placement",
+        "/pla/osm_pla/placement",
+        "./",
+        "/usr/lib/python3/dist-packages/osm_pla/placement",
+    ]
 
     def __init__(self, log):
-        '''
+        """
         Constructor
-        '''
+        """
         self.log = log  # FIXME we do not log anything so far
 
     def create_model(self, ns_placement_data):
-        '''
+        """
         Creates a minizinc model according to the content of nspd
         nspd - NSPlacementData
         return MZNModel
-        '''
-        self.log.info('ns_desc: {}'.format(ns_placement_data['ns_desc']))
-        self.log.info('vld_desc: {}'.format(ns_placement_data['vld_desc']))
+        """
+        self.log.info("ns_desc: {}".format(ns_placement_data["ns_desc"]))
+        self.log.info("vld_desc: {}".format(ns_placement_data["vld_desc"]))
         mzn_model_template = self._load_jinja_template()
         mzn_model = mzn_model_template.render(ns_placement_data)
-        self.log.info('Minizinc model: {}'.format(mzn_model))
+        self.log.info("Minizinc model: {}".format(mzn_model))
         return mzn_model
 
     def _load_jinja_template(self, template_name=default_j2_template):
         """loads the jinja template used for model generation"""
         loader1 = FileSystemLoader(MznModelGenerator.template_search_path)
-        loader2 = PackageLoader('osm_pla', '.')
+        loader2 = PackageLoader("osm_pla", ".")
         env = Environment(loader=ChoiceLoader([loader1, loader2]))
         return env.get_template(template_name)
 
@@ -124,7 +141,15 @@ class NsPlacementDataFactory(object):
     information tailored for the minizinc model code generator
     """
 
-    def __init__(self, vim_accounts_info, vnf_prices, nsd, pil_info, pinning=None, order_constraints=None):
+    def __init__(
+        self,
+        vim_accounts_info,
+        vnf_prices,
+        nsd,
+        pil_info,
+        pinning=None,
+        order_constraints=None,
+    ):
         """
         :param vim_accounts_info: a dictionary with vim url as key and id as value, we add a unique index to it for use
         in the mzn array constructs and adjust the value of the id to minizinc acceptable identifier syntax
@@ -135,8 +160,10 @@ class NsPlacementDataFactory(object):
         :param order_constraints: any constraints provided at instantiation time
         """
         next_idx = itertools.count()
-        self._vim_accounts_info = {k: {'id': 'vim' + v.replace('-', '_'), 'idx': next(next_idx)} for k, v in
-                                   vim_accounts_info.items()}
+        self._vim_accounts_info = {
+            k: {"id": "vim" + v.replace("-", "_"), "idx": next(next_idx)}
+            for k, v in vim_accounts_info.items()
+        }
         self._vnf_prices = vnf_prices
         self._nsd = nsd
         self._pil_info = pil_info
@@ -148,18 +175,21 @@ class NsPlacementDataFactory(object):
         :param characteristics: one of  {pil_latency, pil_price, pil_jitter}
         :return: 2d array of requested trp_link characteristics data
         """
-        if characteristics not in {'pil_latency', 'pil_price', 'pil_jitter'}:
-            raise Exception('characteristic \'{}\' not supported'.format(characteristics))
+        if characteristics not in {"pil_latency", "pil_price", "pil_jitter"}:
+            raise Exception("characteristic '{}' not supported".format(characteristics))
         num_vims = len(self._vim_accounts_info)
-        trp_link_characteristics = [[0 if col == row else 0x7fff for col in range(num_vims)] for row in range(num_vims)]
-        for pil in self._pil_info['pil']:
+        trp_link_characteristics = [
+            [0 if col == row else 0x7FFF for col in range(num_vims)]
+            for row in range(num_vims)
+        ]
+        for pil in self._pil_info["pil"]:
             if characteristics in pil.keys():
-                ep1 = pil['pil_endpoints'][0]
-                ep2 = pil['pil_endpoints'][1]
+                ep1 = pil["pil_endpoints"][0]
+                ep2 = pil["pil_endpoints"][1]
                 # only consider links between applicable vims
                 if ep1 in self._vim_accounts_info and ep2 in self._vim_accounts_info:
-                    idx1 = self._vim_accounts_info[ep1]['idx']
-                    idx2 = self._vim_accounts_info[ep2]['idx']
+                    idx1 = self._vim_accounts_info[ep1]["idx"]
+                    idx2 = self._vim_accounts_info[ep2]["idx"]
                     trp_link_characteristics[idx1][idx2] = pil[characteristics]
                     trp_link_characteristics[idx2][idx1] = pil[characteristics]
 
@@ -175,61 +205,67 @@ class NsPlacementDataFactory(object):
 
         all_vld_member_vnf_index_refs = {}
         # TODO: Change for multiple DF support
-        ns_df = self._nsd.get('df', [{}])[0]
-        for vnf_profile in ns_df.get('vnf-profile', []):
-            for vlc in vnf_profile.get('virtual-link-connectivity', []):
-                vld_id = vlc.get('virtual-link-profile-id')
-                vld_member_vnf_index_ref = vnf_profile.get('id')
+        ns_df = self._nsd.get("df", [{}])[0]
+        for vnf_profile in ns_df.get("vnf-profile", []):
+            for vlc in vnf_profile.get("virtual-link-connectivity", []):
+                vld_id = vlc.get("virtual-link-profile-id")
+                vld_member_vnf_index_ref = vnf_profile.get("id")
                 if vld_id in all_vld_member_vnf_index_refs:
-                    all_vld_member_vnf_index_refs[vld_id].append(vld_member_vnf_index_ref)
+                    all_vld_member_vnf_index_refs[vld_id].append(
+                        vld_member_vnf_index_ref
+                    )
                 else:
                     all_vld_member_vnf_index_refs[vld_id] = [vld_member_vnf_index_ref]
 
         vld_desc = []
-        for vld in self._nsd.get('virtual-link-desc', ()):
-            if vld.get('mgmt-network', False) is True:
+        for vld in self._nsd.get("virtual-link-desc", ()):
+            if vld.get("mgmt-network", False) is True:
                 continue
             vld_desc_entry = {}
-            cp_refs = all_vld_member_vnf_index_refs[vld.get('id')]
+            cp_refs = all_vld_member_vnf_index_refs[vld.get("id")]
             if len(cp_refs) == 2:
-                vld_desc_entry['cp_refs'] = cp_refs
+                vld_desc_entry["cp_refs"] = cp_refs
                 # TODO: Change for multiple DF support
-                vld_df = vld.get('df', [{}])[0]
-                for constraint in vld_df.get('qos', {}):
-                    if constraint == 'latency':
-                        vld_desc_entry['latency'] = vld_df['qos'][constraint]
-                    elif constraint == 'packet-delay-variation':
-                        vld_desc_entry['jitter'] = vld_df['qos'][constraint]
+                vld_df = vld.get("df", [{}])[0]
+                for constraint in vld_df.get("qos", {}):
+                    if constraint == "latency":
+                        vld_desc_entry["latency"] = vld_df["qos"][constraint]
+                    elif constraint == "packet-delay-variation":
+                        vld_desc_entry["jitter"] = vld_df["qos"][constraint]
                 vld_desc.append(vld_desc_entry)
 
         # create candidates from instantiate params
         if self._order_constraints is not None:
             candidate_vld_desc = []
             # use id to find the endpoints in the nsd
-            for entry in self._order_constraints.get('vld-constraints'):
-                for vld in self._nsd.get('virtual-link-desc', ()):
-                    if entry['id'] == vld.get('id'):
+            for entry in self._order_constraints.get("vld-constraints"):
+                for vld in self._nsd.get("virtual-link-desc", ()):
+                    if entry["id"] == vld.get("id"):
                         vld_desc_instantiate_entry = {}
-                        cp_refs = all_vld_member_vnf_index_refs[vld.get('id')]
-                        vld_desc_instantiate_entry['cp_refs'] = cp_refs
+                        cp_refs = all_vld_member_vnf_index_refs[vld.get("id")]
+                        vld_desc_instantiate_entry["cp_refs"] = cp_refs
                         # add whatever constraints that are provided to the vld_desc_entry
                         # misspelled 'link-constraints' => empty dict
                         # lack (or misspelling) of one or both supported constraints => entry not appended
-                        for constraint, value in entry.get('link-constraints', {}).items():
-                            if constraint == 'latency':
-                                vld_desc_instantiate_entry['latency'] = value
-                            elif constraint == 'jitter':
-                                vld_desc_instantiate_entry['jitter'] = value
-                        if {'latency', 'jitter'}.intersection(vld_desc_instantiate_entry.keys()):
+                        for constraint, value in entry.get(
+                            "link-constraints", {}
+                        ).items():
+                            if constraint == "latency":
+                                vld_desc_instantiate_entry["latency"] = value
+                            elif constraint == "jitter":
+                                vld_desc_instantiate_entry["jitter"] = value
+                        if {"latency", "jitter"}.intersection(
+                            vld_desc_instantiate_entry.keys()
+                        ):
                             candidate_vld_desc.append(vld_desc_instantiate_entry)
             # merge with nsd originated, FIXME log any deviations?
             for vld_d in vld_desc:
                 for vld_d_i in candidate_vld_desc:
-                    if set(vld_d['cp_refs']) == set(vld_d_i['cp_refs']):
-                        if vld_d_i.get('jitter'):
-                            vld_d['jitter'] = vld_d_i['jitter']
-                        if vld_d_i.get('latency'):
-                            vld_d['latency'] = vld_d_i['latency']
+                    if set(vld_d["cp_refs"]) == set(vld_d_i["cp_refs"]):
+                        if vld_d_i.get("jitter"):
+                            vld_d["jitter"] = vld_d_i["jitter"]
+                        if vld_d_i.get("latency"):
+                            vld_d["latency"] = vld_d_i["latency"]
 
         return vld_desc
 
@@ -240,38 +276,50 @@ class NsPlacementDataFactory(object):
         """
         ns_desc = []
         # TODO: Change for multiple DF support
-        ns_df = self._nsd.get('df', [{}])[0]
-        for vnf_profile in ns_df.get('vnf-profile', []):
-            vnf_info = {'vnf_id': vnf_profile['id']}
+        ns_df = self._nsd.get("df", [{}])[0]
+        for vnf_profile in ns_df.get("vnf-profile", []):
+            vnf_info = {"vnf_id": vnf_profile["id"]}
             # prices
-            prices_for_vnfd = self._vnf_prices[vnf_profile['vnfd-id']]
+            prices_for_vnfd = self._vnf_prices[vnf_profile["vnfd-id"]]
             # the list of prices must be ordered according to the indexing of the vim_accounts
             price_list = [_ for _ in range(len(self._vim_accounts_info))]
             for k in prices_for_vnfd.keys():
                 if k in self._vim_accounts_info.keys():
-                    price_list[self._vim_accounts_info[k]['idx']] = prices_for_vnfd[k]
-            vnf_info['vnf_price_per_vim'] = price_list
+                    price_list[self._vim_accounts_info[k]["idx"]] = prices_for_vnfd[k]
+            vnf_info["vnf_price_per_vim"] = price_list
 
             # pinning to dc
             if self._pinning is not None:
                 for pinned_vnf in self._pinning:
-                    if vnf_profile['id'] == pinned_vnf['member-vnf-index']:
-                        vnf_info['vim_account'] = 'vim' + pinned_vnf['vimAccountId'].replace('-', '_')
+                    if vnf_profile["id"] == pinned_vnf["member-vnf-index"]:
+                        vnf_info["vim_account"] = "vim" + pinned_vnf[
+                            "vimAccountId"
+                        ].replace("-", "_")
 
             ns_desc.append(vnf_info)
         return ns_desc
 
     def create_ns_placement_data(self):
-        """populate NsPlacmentData object
-        """
-        ns_placement_data = {'vim_accounts': [vim_data['id'] for _, vim_data in sorted(self._vim_accounts_info.items(),
-                                                                                       key=lambda item: item[1][
-                                                                                           'idx'])],
-                             'trp_link_latency': self._produce_trp_link_characteristics_data('pil_latency'),
-                             'trp_link_jitter': self._produce_trp_link_characteristics_data('pil_jitter'),
-                             'trp_link_price_list': self._produce_trp_link_characteristics_data('pil_price'),
-                             'ns_desc': self._produce_ns_desc(),
-                             'vld_desc': self._produce_vld_desc(),
-                             'generator_data': {'file': __file__, 'time': datetime.datetime.now()}}
+        """populate NsPlacmentData object"""
+        ns_placement_data = {
+            "vim_accounts": [
+                vim_data["id"]
+                for _, vim_data in sorted(
+                    self._vim_accounts_info.items(), key=lambda item: item[1]["idx"]
+                )
+            ],
+            "trp_link_latency": self._produce_trp_link_characteristics_data(
+                "pil_latency"
+            ),
+            "trp_link_jitter": self._produce_trp_link_characteristics_data(
+                "pil_jitter"
+            ),
+            "trp_link_price_list": self._produce_trp_link_characteristics_data(
+                "pil_price"
+            ),
+            "ns_desc": self._produce_ns_desc(),
+            "vld_desc": self._produce_vld_desc(),
+            "generator_data": {"file": __file__, "time": datetime.datetime.now()},
+        }
 
         return ns_placement_data