5GTANGO LLCM: Added VNFD-based start/stop command injection.
[osm/vim-emu.git] / src / emuvim / api / tango / llcm.py
index 17dd175..bf70357 100755 (executable)
@@ -42,7 +42,6 @@ import flask_restful as fr
 from collections import defaultdict
 import pkg_resources
 from subprocess import Popen
-from random import randint
 import ipaddress
 import copy
 import time
@@ -74,7 +73,7 @@ DEPLOY_SAP = False
 
 # flag to indicate if we use bidirectional forwarding rules in the
 # automatic chaining process
-BIDIRECTIONAL_CHAIN = False
+BIDIRECTIONAL_CHAIN = True
 
 # override the management interfaces in the descriptors with default
 # docker0 interfaces in the containers
@@ -98,12 +97,12 @@ def generate_subnets(prefix, base, subnet_size=50, mask=24):
 
 
 # private subnet definitions for the generated interfaces
-# 10.10.xxx.0/24
-SAP_SUBNETS = generate_subnets('10.10', 0, subnet_size=50, mask=30)
-# 10.20.xxx.0/30
-ELAN_SUBNETS = generate_subnets('10.20', 0, subnet_size=50, mask=24)
-# 10.30.xxx.0/30
-ELINE_SUBNETS = generate_subnets('10.30', 0, subnet_size=50, mask=30)
+# 99.0.xxx.0/24
+SAP_SUBNETS = generate_subnets('99.0', 0, subnet_size=50, mask=24)
+# 30.0.xxx.0/24
+ELAN_SUBNETS = generate_subnets('30.0', 0, subnet_size=50, mask=24)
+# 20.0.xxx.0/24
+ELINE_SUBNETS = generate_subnets('20.0', 0, subnet_size=50, mask=24)
 
 # path to the VNFD for the SAP VNF that is deployed as internal SAP point
 SAP_VNFD = None
@@ -215,7 +214,7 @@ class Service(object):
         # VNFDs)
         if not GK_STANDALONE_MODE:
             # self._calculate_placement(FirstDcPlacement)
-            self._calculate_placement(RoundRobinDcPlacementWithSAPs)
+            self._calculate_placement(RoundRobinDcPlacement)
         # 3. start all vnfds that we have in the service (except SAPs)
         for vnf_id in self.vnfds:
             vnfd = self.vnfds[vnf_id]
@@ -238,7 +237,8 @@ class Service(object):
             eline_fwd_links = [l for l in vlinks if (
                 l["connectivity_type"] == "E-Line")]
             elan_fwd_links = [l for l in vlinks if (
-                l["connectivity_type"] == "E-LAN")]
+                l["connectivity_type"] == "E-LAN" or
+                l["connectivity_type"] == "E-Tree")]  # Treat E-Tree as E-LAN
 
             GK.net.deployed_elines.extend(eline_fwd_links)
             GK.net.deployed_elans.extend(elan_fwd_links)
@@ -285,13 +285,37 @@ class Service(object):
             LOG.info("Stopping the SAP instance: %r in DC %r" %
                      (sap_name, target_dc))
 
-        if not GK_STANDALONE_MODE:
-            # remove placement?
-            # self._remove_placement(RoundRobinPlacement)
-            None
         # last step: remove the instance from the list of all instances
         del self.instances[instance_uuid]
 
+    def _get_resource_limits(self, deployment_unit):
+        """
+        Extract resource limits from deployment units.
+        """
+        # defaults
+        cpu_list = "1"
+        cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(1.0))
+        mem_limit = 0
+        # update from descriptor
+        if "resource_requirements" in deployment_unit:
+            res_req = deployment_unit.get("resource_requirements")
+            cpu_list = res_req.get("cpu").get("cores")
+            if cpu_list is None:
+                cpu_list = res_req.get("cpu").get("vcpus")
+            cpu_bw = res_req.get("cpu").get("cpu_bw", 1.0)
+            cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(cpu_bw))
+            mem_num = str(res_req.get("memory").get("size", 2))
+            mem_unit = str(res_req.get("memory").get("size_unit", "GB"))
+            mem_limit = float(mem_num)
+            if mem_unit == "GB":
+                mem_limit = mem_limit * 1024 * 1024 * 1024
+            elif mem_unit == "MB":
+                mem_limit = mem_limit * 1024 * 1024
+            elif mem_unit == "KB":
+                mem_limit = mem_limit * 1024
+            mem_limit = int(mem_limit)
+        return cpu_list, cpu_period, cpu_quota, mem_limit
+
     def _start_vnfd(self, vnfd, vnf_id, **kwargs):
         """
         Start a single VNFD of this service
@@ -301,9 +325,11 @@ class Service(object):
         """
         # the vnf_name refers to the container image to be deployed
         vnf_name = vnfd.get("name")
-
+        # combine VDUs and CDUs
+        deployment_units = (vnfd.get("virtual_deployment_units", []) +
+                            vnfd.get("cloudnative_deployment_units", []))
         # iterate over all deployment units within each VNFDs
-        for u in vnfd.get("virtual_deployment_units"):
+        for u in deployment_units:
             # 1. get the name of the docker image to start and the assigned DC
             if vnf_id not in self.remote_docker_image_urls:
                 raise Exception("No image name for %r found. Abort." % vnf_id)
@@ -317,35 +343,16 @@ class Service(object):
                     "Docker image %r not found. Abort." % docker_name)
 
             # 3. get the resource limits
-            res_req = u.get("resource_requirements")
-            cpu_list = res_req.get("cpu").get("cores")
-            if cpu_list is None:
-                cpu_list = res_req.get("cpu").get("vcpus")
-            if cpu_list is None:
-                cpu_list = "1"
-            cpu_bw = res_req.get("cpu").get("cpu_bw")
-            if not cpu_bw:
-                cpu_bw = 1
-            mem_num = str(res_req.get("memory").get("size"))
-            if len(mem_num) == 0:
-                mem_num = "2"
-            mem_unit = str(res_req.get("memory").get("size_unit"))
-            if str(mem_unit) == 0:
-                mem_unit = "GB"
-            mem_limit = float(mem_num)
-            if mem_unit == "GB":
-                mem_limit = mem_limit * 1024 * 1024 * 1024
-            elif mem_unit == "MB":
-                mem_limit = mem_limit * 1024 * 1024
-            elif mem_unit == "KB":
-                mem_limit = mem_limit * 1024
-            mem_lim = int(mem_limit)
-            cpu_period, cpu_quota = self._calculate_cpu_cfs_values(
-                float(cpu_bw))
+            cpu_list, cpu_period, cpu_quota, mem_limit = self._get_resource_limits(u)
 
             # check if we need to deploy the management ports (defined as
             # type:management both on in the vnfd and nsd)
             intfs = vnfd.get("connection_points", [])
+            # do some re-naming of fields to be compatible to containernet
+            for i in intfs:
+                if i.get("address"):
+                    i["ip"] = i.get("address")
+
             mgmt_intf_names = []
             if USE_DOCKER_MGMT:
                 mgmt_intfs = [vnf_id + ':' + intf['id']
@@ -375,10 +382,23 @@ class Service(object):
             if not os.path.exists(docker_log_path):
                 LOG.debug("Creating folder %s" % docker_log_path)
                 os.makedirs(docker_log_path)
-
             volumes.append(docker_log_path + ":/mnt/share/")
 
-            # 5. do the dc.startCompute(name="foobar") call to run the container
+            # 5. collect additional information to start container
+            cenv = dict()
+            # 5.1 inject descriptor based start/stop commands into env (overwrite)
+            VNFD_CMD_START = u.get("vm_cmd_start")
+            VNFD_CMD_STOP = u.get("vm_cmd_stop")
+            if VNFD_CMD_START and not VNFD_CMD_START == "None":
+                LOG.info("Found 'vm_cmd_start'='{}' in VNFD.".format(VNFD_CMD_START) +
+                         " Overwriting SON_EMU_CMD.")
+                cenv["SON_EMU_CMD"] = VNFD_CMD_START
+            if VNFD_CMD_STOP and not VNFD_CMD_STOP == "None":
+                LOG.info("Found 'vm_cmd_start'='{}' in VNFD.".format(VNFD_CMD_STOP) +
+                         " Overwriting SON_EMU_CMD_STOP.")
+                cenv["SON_EMU_CMD_STOP"] = VNFD_CMD_STOP
+
+            # 6. do the dc.startCompute(name="foobar") call to run the container
             # TODO consider flavors, and other annotations
             # TODO: get all vnf id's from the nsd for this vnfd and use those as dockername
             # use the vnf_id in the nsd as docker name
@@ -394,10 +414,14 @@ class Service(object):
                 cpu_quota=cpu_quota,
                 cpu_period=cpu_period,
                 cpuset=cpu_list,
-                mem_limit=mem_lim,
+                mem_limit=mem_limit,
                 volumes=volumes,
+                properties=cenv,  # environment
                 type=kwargs.get('type', 'docker'))
 
+            # add vnfd reference to vnfi
+            vnfi.vnfd = vnfd
+
             # rename the docker0 interfaces (eth0) to the management port name
             # defined in the VNFD
             if USE_DOCKER_MGMT:
@@ -472,14 +496,15 @@ class Service(object):
             for env_var in env:
                 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
                 LOG.debug("%r = %r" % (var, cmd))
-                if var == "SON_EMU_CMD":
-                    LOG.info("Executing entry point script in %r: %r" %
-                             (vnfi.name, cmd))
+                if var == "SON_EMU_CMD" or var == "VIM_EMU_CMD":
+                    LOG.info("Executing script in '{}': {}={}"
+                             .format(vnfi.name, var, cmd))
                     # execute command in new thread to ensure that GK is not
                     # blocked by VNF
                     t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
                     t.daemon = True
                     t.start()
+                    break  # only execute one command
 
     def _trigger_emulator_stop_scripts_in_vnfis(self, vnfi_list):
         for vnfi in vnfi_list:
@@ -487,14 +512,15 @@ class Service(object):
             env = config.get("Env", list())
             for env_var in env:
                 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
-                if var == "SON_EMU_CMD_STOP":
-                    LOG.info("Executing stop script in %r: %r" %
-                             (vnfi.name, cmd))
+                if var == "SON_EMU_CMD_STOP" or var == "VIM_EMU_CMD_STOP":
+                    LOG.info("Executing script in '{}': {}={}"
+                             .format(vnfi.name, var, cmd))
                     # execute command in new thread to ensure that GK is not
                     # blocked by VNF
                     t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
                     t.daemon = True
                     t.start()
+                    break  # only execute one command
 
     def _unpack_service_package(self):
         """
@@ -650,6 +676,7 @@ class Service(object):
         # eg. different services get a unique cookie for their flowrules
         cookie = 1
         for link in eline_fwd_links:
+            LOG.info("Found E-Line: {}".format(link))
             # check if we need to deploy this link when its a management link:
             if USE_DOCKER_MGMT:
                 if self.check_mgmt_interface(
@@ -699,24 +726,34 @@ class Service(object):
 
             # Link between 2 VNFs
             else:
+                LOG.info("Creating E-Line: src={}, dst={}"
+                         .format(src_id, dst_id))
                 # make sure we use the correct sap vnf name
                 if src_sap_id in self.saps_int:
                     src_id = src_sap_id
                 if dst_sap_id in self.saps_int:
                     dst_id = dst_sap_id
-                # re-configure the VNFs IP assignment and ensure that a new
-                # subnet is used for each E-Link
+                # get involved vnfis
                 src_vnfi = self._get_vnf_instance(instance_uuid, src_id)
                 dst_vnfi = self._get_vnf_instance(instance_uuid, dst_id)
+
                 if src_vnfi is not None and dst_vnfi is not None:
+                    setChaining = True
+                    # re-configure the VNFs IP assignment and ensure that a new
+                    # subnet is used for each E-Link
                     eline_net = ELINE_SUBNETS.pop(0)
                     ip1 = "{0}/{1}".format(str(eline_net[1]),
                                            eline_net.prefixlen)
                     ip2 = "{0}/{1}".format(str(eline_net[2]),
                                            eline_net.prefixlen)
-                    self._vnf_reconfigure_network(src_vnfi, src_if_name, ip1)
-                    self._vnf_reconfigure_network(dst_vnfi, dst_if_name, ip2)
-                    setChaining = True
+                    # check if VNFs have fixed IPs (address field in VNFDs)
+                    if (self._get_vnfd_cp_from_vnfi(src_vnfi, src_if_name)
+                            .get("address") is None):
+                        self._vnf_reconfigure_network(src_vnfi, src_if_name, ip1)
+                    # check if VNFs have fixed IPs (address field in VNFDs)
+                    if (self._get_vnfd_cp_from_vnfi(dst_vnfi, dst_if_name)
+                            .get("address") is None):
+                        self._vnf_reconfigure_network(dst_vnfi, dst_if_name, ip2)
 
             # Set the chaining
             if setChaining:
@@ -724,9 +761,18 @@ class Service(object):
                     src_id, dst_id,
                     vnf_src_interface=src_if_name, vnf_dst_interface=dst_if_name,
                     bidirectional=BIDIRECTIONAL_CHAIN, cmd="add-flow", cookie=cookie, priority=10)
-                LOG.debug(
-                    "Setting up E-Line link. (%s:%s) -> (%s:%s)" % (
-                        src_id, src_if_name, dst_id, dst_if_name))
+
+    def _get_vnfd_cp_from_vnfi(self, vnfi, ifname):
+        """
+        Gets the connection point data structure from the VNFD
+        of the given VNFI using ifname.
+        """
+        if vnfi.vnfd is None:
+            return {}
+        cps = vnfi.vnfd.get("connection_points")
+        for cp in cps:
+            if cp.get("id") == ifname:
+                return cp
 
     def _connect_elans(self, elan_fwd_links, instance_uuid):
         """
@@ -798,7 +844,7 @@ class Service(object):
         :return:
         """
         for k, v in self.vnfds.iteritems():
-            for vu in v.get("virtual_deployment_units"):
+            for vu in v.get("virtual_deployment_units", []):
                 if vu.get("vm_image_format") == "docker":
                     vm_image = vu.get("vm_image")
                     docker_path = os.path.join(
@@ -806,19 +852,21 @@ class Service(object):
                         make_relative_path(vm_image))
                     self.local_docker_files[k] = docker_path
                     LOG.debug("Found Dockerfile (%r): %r" % (k, docker_path))
+            for cu in v.get("cloudnative_deployment_units", []):
+                image = cu.get("image")
+                docker_path = os.path.join(
+                    self.package_content_path,
+                    make_relative_path(image))
+                self.local_docker_files[k] = docker_path
+                LOG.debug("Found Dockerfile (%r): %r" % (k, docker_path))
 
     def _load_docker_urls(self):
         """
         Get all URLs to pre-build docker images in some repo.
         :return:
         """
-        # also merge sap dicts, because internal saps also need a docker
-        # container
-        all_vnfs = self.vnfds.copy()
-        all_vnfs.update(self.saps)
-
-        for k, v in all_vnfs.iteritems():
-            for vu in v.get("virtual_deployment_units", {}):
+        for k, v in self.vnfds.iteritems():
+            for vu in v.get("virtual_deployment_units", []):
                 if vu.get("vm_image_format") == "docker":
                     url = vu.get("vm_image")
                     if url is not None:
@@ -826,6 +874,13 @@ class Service(object):
                         self.remote_docker_image_urls[k] = url
                         LOG.debug("Found Docker image URL (%r): %r" %
                                   (k, self.remote_docker_image_urls[k]))
+            for cu in v.get("cloudnative_deployment_units", []):
+                url = cu.get("image")
+                if url is not None:
+                    url = url.replace("http://", "")
+                    self.remote_docker_image_urls[k] = url
+                    LOG.debug("Found Docker image URL (%r): %r" %
+                              (k, self.remote_docker_image_urls[k]))
 
     def _build_images_from_dockerfiles(self):
         """
@@ -964,59 +1019,6 @@ class RoundRobinDcPlacement(object):
             c += 1  # inc. c to use next DC
 
 
-class RoundRobinDcPlacementWithSAPs(object):
-    """
-    Placement: Distribute VNFs across all available DCs in a round robin fashion,
-    every SAP is instantiated on the same DC as the connected VNF.
-    """
-
-    def place(self, nsd, vnfds, saps, dcs):
-
-        # place vnfs
-        c = 0
-        dcs_list = list(dcs.itervalues())
-        for id, vnfd in vnfds.iteritems():
-            vnfd["dc"] = dcs_list[c % len(dcs_list)]
-            c += 1  # inc. c to use next DC
-
-        # place SAPs
-        vlinks = nsd.get("virtual_links", [])
-        eline_fwd_links = [l for l in vlinks if (
-            l["connectivity_type"] == "E-Line")]
-        elan_fwd_links = [l for l in vlinks if (
-            l["connectivity_type"] == "E-LAN")]
-
-        # SAPs on E-Line links are placed on the same DC as the VNF on the
-        # E-Line
-        for link in eline_fwd_links:
-            src_id, src_if_name, src_sap_id = parse_interface(
-                link["connection_points_reference"][0])
-            dst_id, dst_if_name, dst_sap_id = parse_interface(
-                link["connection_points_reference"][1])
-
-            # check if there is a SAP in the link
-            if src_sap_id in saps:
-                # get dc where connected vnf is mapped to
-                dc = vnfds[dst_id]['dc']
-                saps[src_sap_id]['dc'] = dc
-
-            if dst_sap_id in saps:
-                # get dc where connected vnf is mapped to
-                dc = vnfds[src_id]['dc']
-                saps[dst_sap_id]['dc'] = dc
-
-        # SAPs on E-LANs are placed on a random DC
-        dcs_list = list(dcs.itervalues())
-        dc_len = len(dcs_list)
-        for link in elan_fwd_links:
-            for intf in link["connection_points_reference"]:
-                # find SAP interfaces
-                intf_id, intf_name, intf_sap_id = parse_interface(intf)
-                if intf_sap_id in saps:
-                    dc = dcs_list[randint(0, dc_len - 1)]
-                    saps[intf_sap_id]['dc'] = dc
-
-
 """
 Resource definitions and API endpoints
 """
@@ -1049,7 +1051,7 @@ class Packages(fr.Resource):
             file_hash = hashlib.sha1(str(son_file)).hexdigest()
             # ensure that upload folder exists
             ensure_dir(UPLOAD_FOLDER)
-            upload_path = os.path.join(UPLOAD_FOLDER, "%s.son" % service_uuid)
+            upload_path = os.path.join(UPLOAD_FOLDER, "%s.tgo" % service_uuid)
             # store *.son file to disk
             if is_file_object:
                 son_file.save(upload_path)