Fix: Changed LLCM to use gevent
[osm/vim-emu.git] / src / emuvim / api / tango / llcm.py
index d7c24dd..eaa0b5e 100755 (executable)
@@ -39,6 +39,7 @@ import threading
 from docker import DockerClient
 from flask import Flask, request
 import flask_restful as fr
+from gevent.pywsgi import WSGIServer
 from subprocess import Popen
 import ipaddress
 import copy
@@ -240,27 +241,33 @@ class Service(object):
         Extract resource limits from deployment units.
         """
         # defaults
-        cpu_list = "1"
+        cpu_list = None
         cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(1.0))
-        mem_limit = 0
+        mem_limit = None
         # update from descriptor
         if "resource_requirements" in deployment_unit:
             res_req = deployment_unit.get("resource_requirements")
-            cpu_list = res_req.get("cpu").get("cores")
+            cpu_list = res_req.get("cpu").get("cpuset")
             if cpu_list is None:
                 cpu_list = res_req.get("cpu").get("vcpus")
-            cpu_bw = res_req.get("cpu").get("cpu_bw", 1.0)
+            if cpu_list is not None:
+                # attention: docker expects list as string w/o spaces:
+                cpu_list = str(cpu_list).replace(" ", "").strip()
+            cpu_bw = res_req.get("cpu").get("cpu_bw")
+            if cpu_bw is None:
+                cpu_bw = 1.0
             cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(cpu_bw))
-            mem_num = str(res_req.get("memory").get("size", 2))
+            mem_limit = res_req.get("memory").get("size")
             mem_unit = str(res_req.get("memory").get("size_unit", "GB"))
-            mem_limit = float(mem_num)
-            if mem_unit == "GB":
-                mem_limit = mem_limit * 1024 * 1024 * 1024
-            elif mem_unit == "MB":
-                mem_limit = mem_limit * 1024 * 1024
-            elif mem_unit == "KB":
-                mem_limit = mem_limit * 1024
-            mem_limit = int(mem_limit)
+            if mem_limit is not None:
+                mem_limit = int(mem_limit)
+                # to bytes
+                if "G" in mem_unit:
+                    mem_limit = mem_limit * 1024 * 1024 * 1024
+                elif "M" in mem_unit:
+                    mem_limit = mem_limit * 1024 * 1024
+                elif "K" in mem_unit:
+                    mem_limit = mem_limit * 1024
         return cpu_list, cpu_period, cpu_quota, mem_limit
 
     def _start_vnfd(self, vnfd, vnf_id, **kwargs):
@@ -303,6 +310,26 @@ class Service(object):
                 if i.get("address"):
                     i["ip"] = i.get("address")
 
+            # get ports and port_bindings from the port and publish fields of CNFD
+            # see: https://github.com/containernet/containernet/wiki/Exposing-and-mapping-network-ports
+            ports = list()  # Containernet naming
+            port_bindings = dict()
+            for i in intfs:
+                if i.get("port"):
+                    if not isinstance(i.get("port"), int):
+                        LOG.error("Field 'port' is no int CP: {}".format(i))
+                    else:
+                        ports.append(i.get("port"))
+                if i.get("publish"):
+                    if not isinstance(i.get("publish"), dict):
+                        LOG.error("Field 'publish' is no dict CP: {}".format(i))
+                    else:
+                        port_bindings.update(i.get("publish"))
+            if len(ports) > 0:
+                LOG.info("{} exposes ports: {}".format(vnf_container_name, ports))
+            if len(port_bindings) > 0:
+                LOG.info("{} publishes ports: {}".format(vnf_container_name, port_bindings))
+
             # 5. collect additional information to start container
             volumes = list()
             cenv = dict()
@@ -329,10 +356,12 @@ class Service(object):
                 image=docker_image_name,
                 cpu_quota=cpu_quota,
                 cpu_period=cpu_period,
-                cpuset=cpu_list,
+                cpuset_cpus=cpu_list,
                 mem_limit=mem_limit,
                 volumes=volumes,
                 properties=cenv,  # environment
+                ports=ports,
+                port_bindings=port_bindings,
                 type=kwargs.get('type', 'docker'))
             # add vnfd reference to vnfi
             vnfi.vnfd = vnfd
@@ -357,7 +386,7 @@ class Service(object):
 
     def _get_vnf_instance(self, instance_uuid, vnf_id):
         """
-        Returns VNFI object for a given "vnf_id" or "vnf_container_namse" taken from an NSD.
+        Returns VNFI object for a given "vnf_id" or "vnf_container_name" taken from an NSD.
         :return: single object
         """
         for vnfi in self.instances[instance_uuid]["vnf_instances"]:
@@ -372,6 +401,8 @@ class Service(object):
         "vnf_id" taken from an NSD.
         :return: list
         """
+        if vnf_id is None:
+            return None
         r = list()
         for vnfi in self.instances[instance_uuid]["vnf_instances"]:
             if vnf_id in vnfi.name:
@@ -417,7 +448,6 @@ class Service(object):
             env = config.get("Env", list())
             for env_var in env:
                 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
-                # LOG.debug("%r = %r" % (var, cmd))
                 if var == "SON_EMU_CMD" or var == "VIM_EMU_CMD":
                     LOG.info("Executing script in '{}': {}={}"
                              .format(vnfi.name, var, cmd))
@@ -522,23 +552,33 @@ class Service(object):
         cookie = 1
         for link in eline_fwd_links:
             LOG.info("Found E-Line: {}".format(link))
-            # check if we need to deploy this link when its a management link:
-            if USE_DOCKER_MGMT:
-                if self.check_mgmt_interface(
-                        link["connection_points_reference"]):
-                    continue
-
             src_id, src_if_name = parse_interface(
                 link["connection_points_reference"][0])
             dst_id, dst_if_name = parse_interface(
                 link["connection_points_reference"][1])
-            setChaining = False
-            LOG.info("Creating E-Line: src={}, dst={}"
-                     .format(src_id, dst_id))
+            LOG.info("Searching C/VDU for E-Line: src={}, src_if={}, dst={}, dst_if={}"
+                     .format(src_id, src_if_name, dst_id, dst_if_name))
+            # handle C/VDUs (ugly hack, only one V/CDU per VNF for now)
+            src_units = self._get_vnf_instance_units(instance_uuid, src_id)
+            dst_units = self._get_vnf_instance_units(instance_uuid, dst_id)
+            if src_units is None or dst_units is None:
+                LOG.info("No VNF-VNF link. Skipping: src={}, src_if={}, dst={}, dst_if={}"
+                         .format(src_id, src_if_name, dst_id, dst_if_name))
+                return
+            # we only support VNFs with one V/CDU right now
+            if len(src_units) != 1 or len(dst_units) != 1:
+                raise BaseException("LLCM does not support E-LINES for multi V/CDU VNFs.")
+            # get the full name from that C/VDU and use it as src_id and dst_id
+            src_id = src_units[0].name
+            dst_id = dst_units[0].name
+            # from here we have all info we need
+            LOG.info("Creating E-Line for C/VDU: src={}, src_if={}, dst={}, dst_if={}"
+                     .format(src_id, src_if_name, dst_id, dst_if_name))
             # get involved vnfis
-            src_vnfi = self._get_vnf_instance(instance_uuid, src_id)
-            dst_vnfi = self._get_vnf_instance(instance_uuid, dst_id)
-
+            src_vnfi = src_units[0]
+            dst_vnfi = dst_units[0]
+            # proceed with chaining setup
+            setChaining = False
             if src_vnfi is not None and dst_vnfi is not None:
                 setChaining = True
                 # re-configure the VNFs IP assignment and ensure that a new
@@ -548,13 +588,17 @@ class Service(object):
                                        eline_net.prefixlen)
                 ip2 = "{0}/{1}".format(str(eline_net[2]),
                                        eline_net.prefixlen)
-                # check if VNFs have fixed IPs (address field in VNFDs)
-                if (self._get_vnfd_cp_from_vnfi(src_vnfi, src_if_name)
-                        .get("address") is None):
+                # check if VNFs have fixed IPs (ip/address field in VNFDs)
+                if (self._get_vnfd_cp_from_vnfi(
+                    src_vnfi, src_if_name).get("ip") is None and
+                    self._get_vnfd_cp_from_vnfi(
+                        src_vnfi, src_if_name).get("address") is None):
                     self._vnf_reconfigure_network(src_vnfi, src_if_name, ip1)
-                # check if VNFs have fixed IPs (address field in VNFDs)
-                if (self._get_vnfd_cp_from_vnfi(dst_vnfi, dst_if_name)
-                        .get("address") is None):
+                # check if VNFs have fixed IPs (ip field in VNFDs)
+                if (self._get_vnfd_cp_from_vnfi(
+                    dst_vnfi, dst_if_name).get("ip") is None and
+                    self._get_vnfd_cp_from_vnfi(
+                        dst_vnfi, dst_if_name).get("address") is None):
                     self._vnf_reconfigure_network(dst_vnfi, dst_if_name, ip2)
             # set the chaining
             if setChaining:
@@ -883,7 +927,15 @@ class Instantiations(fr.Resource):
         # try to extract the service uuid from the request
         json_data = request.get_json(force=True)
         service_uuid = json_data.get("service_uuid")
-
+        service_name = json_data.get("service_name")
+
+        # first try to find by service_name
+        if service_name is not None:
+            for s_uuid, s in GK.services.iteritems():
+                if s.manifest.get("name") == service_name:
+                    LOG.info("Found service: {} with UUID: {}"
+                             .format(service_name, s_uuid))
+                    service_uuid = s_uuid
         # lets be a bit fuzzy here to make testing easier
         if (service_uuid is None or service_uuid ==
                 "latest") and len(GK.services) > 0:
@@ -912,24 +964,32 @@ class Instantiations(fr.Resource):
         """
         # try to extract the service  and instance UUID from the request
         json_data = request.get_json(force=True)
-        service_uuid = json_data.get("service_uuid")
-        instance_uuid = json_data.get("service_instance_uuid")
-
+        service_uuid_input = json_data.get("service_uuid")
+        instance_uuid_input = json_data.get("service_instance_uuid")
+        if len(GK.services) < 1:
+            return "No service on-boarded.", 404
         # try to be fuzzy
-        if service_uuid is None and len(GK.services) > 0:
-            # if we don't get a service uuid, we simply stop the last service
-            # in the list
-            service_uuid = list(GK.services.iterkeys())[0]
-        if instance_uuid is None and len(
-                GK.services[service_uuid].instances) > 0:
-            instance_uuid = list(
-                GK.services[service_uuid].instances.iterkeys())[0]
-
-        if service_uuid in GK.services and instance_uuid in GK.services[service_uuid].instances:
-            # valid service and instance UUID, stop service
-            GK.services.get(service_uuid).stop_service(instance_uuid)
-            return "service instance with uuid %r stopped." % instance_uuid, 200
-        return "Service not found", 404
+        if service_uuid_input is None:
+            # if we don't get a service uuid we stop all services
+            service_uuid_list = list(GK.services.iterkeys())
+            LOG.info("No service_uuid given, stopping all.")
+        else:
+            service_uuid_list = [service_uuid_input]
+        # for each service
+        for service_uuid in service_uuid_list:
+            if instance_uuid_input is None:
+                instance_uuid_list = list(
+                    GK.services[service_uuid].instances.iterkeys())
+            else:
+                instance_uuid_list = [instance_uuid_input]
+            # for all service instances
+            for instance_uuid in instance_uuid_list:
+                if (service_uuid in GK.services and
+                        instance_uuid in GK.services[service_uuid].instances):
+                    # valid service and instance UUID, stop service
+                    GK.services.get(service_uuid).stop_service(instance_uuid)
+                    LOG.info("Service instance with uuid %r stopped." % instance_uuid)
+        return "Service(s) stopped.", 200
 
 
 class Exit(fr.Resource):
@@ -969,6 +1029,7 @@ def initialize_GK():
 GK = None
 initialize_GK()
 # setup Flask
+http_server = None
 app = Flask(__name__)
 app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024  # 512 MB max upload
 api = fr.Api(app)
@@ -980,14 +1041,22 @@ api.add_resource(Exit, '/emulator/exit')
 
 
 def start_rest_api(host, port, datacenters=dict()):
+    global http_server
     GK.dcs = datacenters
     GK.net = get_dc_network()
     # start the Flask server (not the best performance but ok for our use case)
-    app.run(host=host,
-            port=port,
-            debug=True,
-            use_reloader=False  # this is needed to run Flask in a non-main thread
-            )
+    # app.run(host=host,
+    #        port=port,
+    #        debug=True,
+    #        use_reloader=False  # this is needed to run Flask in a non-main thread
+    #        )
+    http_server = WSGIServer((host, port), app, log=open("/dev/null", "w"))
+    http_server.serve_forever()
+
+
+def stop_rest_api():
+    if http_server:
+        http_server.close()
 
 
 def ensure_dir(name):