X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2Fvim-emu.git;a=blobdiff_plain;f=src%2Femuvim%2Fapi%2Ftango%2Fllcm.py;h=6e570d009951b107fc4645613edfbe135c0c2ecb;hp=d7c24dd10416c8b5b30b533f36a9549a2d7b6ea7;hb=efdda12a005a63a449ea7afb9baab1743b4d04ab;hpb=17008d0f9109faae34dd2e85cc7d481ffea57406

diff --git a/src/emuvim/api/tango/llcm.py b/src/emuvim/api/tango/llcm.py
index d7c24dd..6e570d0 100755
--- a/src/emuvim/api/tango/llcm.py
+++ b/src/emuvim/api/tango/llcm.py
@@ -1,3 +1,4 @@
+
 # Copyright (c) 2018 SONATA-NFV, 5GTANGO and Paderborn University
 # ALL RIGHTS RESERVED.
 #
@@ -36,9 +37,11 @@ import hashlib
 import zipfile
 import yaml
 import threading
+import datetime
 from docker import DockerClient
 from flask import Flask, request
 import flask_restful as fr
+from gevent.pywsgi import WSGIServer
 from subprocess import Popen
 import ipaddress
 import copy
@@ -49,6 +52,10 @@
 LOG = logging.getLogger("5gtango.llcm")
 LOG.setLevel(logging.INFO)
 
+CORS_HEADER = {'Access-Control-Allow-Origin': '*',
+               'Access-Control-Allow-Methods': 'GET,OPTIONS'}
+
+
 GK_STORAGE = "/tmp/vim-emu-tango-llcm/"
 UPLOAD_FOLDER = os.path.join(GK_STORAGE, "uploads/")
 CATALOG_FOLDER = os.path.join(GK_STORAGE, "catalog/")
@@ -85,6 +92,20 @@ ELINE_SUBNETS = None
 
 # Time in seconds to wait for vnf stop scripts to execute fully
 VNF_STOP_WAIT_TIME = 5
 
+# If services are instantiated multiple times, the public port
+# mappings need to be adapted to avoid collisions. We use this
+# offset for this: NEW_PORT = (SSIID * OFFSET) + ORIGINAL_PORT
+MULTI_INSTANCE_PORT_OFFSET = 1000
+
+# Selected placement algorithm: points to an instance of the selected
+# placement algorithm class.
+PLACEMENT_ALGORITHM_OBJ = None
+
+# Path to folder with .env.yml files that contain
+# environment variables injected into the specific container
+# when it is started.
+PER_INSTANCE_ENV_CONFIGURATION_FOLDER = None
+
 
 class OnBoardingException(BaseException):
     pass
@@ -134,6 +155,8 @@
         self.local_docker_files = dict()
         self.remote_docker_image_urls = dict()
         self.instances = dict()
+        self._instance_counter = 0
+        self.created_at = str(datetime.datetime.now())
 
     def onboard(self):
         """
@@ -157,7 +180,14 @@
         else:
             self._load_docker_urls()
             self._pull_predefined_dockerimages()
-        LOG.info("On-boarded service: %r" % self.manifest.get("name"))
+        # 4. reserve subnets
+        eline_fwd_links, elan_fwd_links = self._get_elines_and_elans()
+        self.eline_subnets = [ELINE_SUBNETS.pop(0) for _ in eline_fwd_links]
+        self.elan_subnets = [ELAN_SUBNETS.pop(0) for _ in elan_fwd_links]
+        LOG.debug("Reserved subnets for service '{}': E-Line: {} / E-LAN: {}"
+                  .format(self.manifest.get("name"),
+                          self.eline_subnets, self.elan_subnets))
+        LOG.info("On-boarded service: {}".format(self.manifest.get("name")))
 
     def start_service(self):
         """
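The subnet reservation added to onboard() pops one subnet per E-Line/E-LAN link from the global pools, so every instance started later draws from the service's own reservation instead of draining the pools again. A minimal standalone sketch of that pattern; the pool literal is a hypothetical stand-in for the generate_subnets() output:

    import ipaddress

    # Hypothetical stand-in for the module-level ELINE_SUBNETS pool.
    ELINE_SUBNETS = [ipaddress.ip_network(n) for n in
                     ("10.9.0.0/24", "10.9.1.0/24", "10.9.2.0/24")]

    eline_fwd_links = ["vl1", "vl2"]  # placeholder E-Line link entries
    # One subnet is reserved per E-Line link at on-board time.
    eline_subnets = [ELINE_SUBNETS.pop(0) for _ in eline_fwd_links]
    assert str(eline_subnets[0]) == "10.9.0.0/24"
    assert len(ELINE_SUBNETS) == 1  # pool shrinks, so reservations never overlap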
@@ -167,51 +197,52 @@
         by the placement algorithm.
         :return:
         """
-        LOG.info("Starting service %r" % self.uuid)
+        LOG.info("Starting service {} ({})"
+                 .format(get_triple_id(self.nsd), self.uuid))
 
         # 1. each service instance gets a new uuid to identify it
         instance_uuid = str(uuid.uuid4())
         # build an instance dict (a bit like an NSR :))
         self.instances[instance_uuid] = dict()
+        self.instances[instance_uuid]["uuid"] = self.uuid
+        # SSIID = short service instance ID (to postfix container names)
+        self.instances[instance_uuid]["ssiid"] = self._instance_counter
+        self.instances[instance_uuid]["name"] = get_triple_id(self.nsd)
         self.instances[instance_uuid]["vnf_instances"] = list()
+        self.instances[instance_uuid]["created_at"] = str(datetime.datetime.now())
+        # increase for next instance
+        self._instance_counter += 1
 
-        # 2. compute placement of this service instance (adds DC names to
-        # VNFDs)
-        # self._calculate_placement(FirstDcPlacement)
-        self._calculate_placement(RoundRobinDcPlacement)
         # 3. start all vnfds that we have in the service
         for vnf_id in self.vnfds:
             vnfd = self.vnfds[vnf_id]
             # attention: returns a list of started deployment units
-            vnfis = self._start_vnfd(vnfd, vnf_id)
+            vnfis = self._start_vnfd(
+                vnfd, vnf_id, self.instances[instance_uuid]["ssiid"])
             # add list of VNFIs to total VNFI list
             self.instances[instance_uuid]["vnf_instances"].extend(vnfis)
 
         # 4. Deploy E-Line, E-Tree and E-LAN links
         # Attention: Only done if a "forwarding_graphs" section exists in the NSD,
         # even if "forwarding_graphs" are not used directly.
-        if "virtual_links" in self.nsd and "forwarding_graphs" in self.nsd:
-            vlinks = self.nsd["virtual_links"]
-            # constituent virtual links are not checked
-            eline_fwd_links = [l for l in vlinks if (
-                l["connectivity_type"] == "E-Line")]
-            elan_fwd_links = [l for l in vlinks if (
-                l["connectivity_type"] == "E-LAN" or
-                l["connectivity_type"] == "E-Tree")]  # Treat E-Tree as E-LAN
-
-            # 5a. deploy E-Line links
-            GK.net.deployed_elines.extend(eline_fwd_links)  # bookkeeping
-            self._connect_elines(eline_fwd_links, instance_uuid)
-            # 5b. deploy E-Tree/E-LAN links
-            GK.net.deployed_elans.extend(elan_fwd_links)  # bookkeeping
-            self._connect_elans(elan_fwd_links, instance_uuid)
+        # Attention2: Copying *_subnets with list() is important here!
+        eline_fwd_links, elan_fwd_links = self._get_elines_and_elans()
+        # 5a. deploy E-Line links
+        GK.net.deployed_elines.extend(eline_fwd_links)  # bookkeeping
+        self._connect_elines(eline_fwd_links, instance_uuid, list(self.eline_subnets))
+        # 5b. deploy E-Tree/E-LAN links
+        GK.net.deployed_elans.extend(elan_fwd_links)  # bookkeeping
+        self._connect_elans(elan_fwd_links, instance_uuid, list(self.elan_subnets))
 
         # 6. run the emulator specific entrypoint scripts in the VNFIs of this
         # service instance
         self._trigger_emulator_start_scripts_in_vnfis(
             self.instances[instance_uuid]["vnf_instances"])
         # done
-        LOG.info("Service started. Instance id: %r" % instance_uuid)
+        LOG.info("Service '{}' started. Instance id: {} SSIID: {}"
+                 .format(self.instances[instance_uuid]["name"],
                          instance_uuid,
+                         self.instances[instance_uuid]["ssiid"]))
         return instance_uuid
 
     def stop_service(self, instance_uuid):
@@ -235,35 +266,59 @@
         # last step: remove the instance from the list of all instances
         del self.instances[instance_uuid]
 
+    def _get_elines_and_elans(self):
+        """
+        Get the E-Line, E-LAN, E-Tree links from the NSD.
+        """
+        # Attention: Only done if a "forwarding_graphs" section exists in the NSD,
+        # even if "forwarding_graphs" are not used directly.
+        eline_fwd_links = list()
+        elan_fwd_links = list()
+        if "virtual_links" in self.nsd and "forwarding_graphs" in self.nsd:
+            vlinks = self.nsd["virtual_links"]
+            # constituent virtual links are not checked
+            eline_fwd_links = [l for l in vlinks if (
+                l["connectivity_type"] == "E-Line")]
+            elan_fwd_links = [l for l in vlinks if (
+                l["connectivity_type"] == "E-LAN" or
+                l["connectivity_type"] == "E-Tree")]  # Treat E-Tree as E-LAN
+        return eline_fwd_links, elan_fwd_links
+
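The new _get_elines_and_elans() helper only classifies the NSD's virtual_links and deliberately treats E-Tree like E-LAN. A sketch of that classification against a hypothetical NSD fragment (field names follow the checks above):

    # Hypothetical NSD fragment.
    nsd = {
        "forwarding_graphs": [],  # must be present, even if unused
        "virtual_links": [
            {"id": "vl1", "connectivity_type": "E-Line"},
            {"id": "vl2", "connectivity_type": "E-LAN"},
            {"id": "vl3", "connectivity_type": "E-Tree"},  # handled like E-LAN
        ],
    }
    vlinks = nsd["virtual_links"]
    eline = [l for l in vlinks if l["connectivity_type"] == "E-Line"]
    elan = [l for l in vlinks if l["connectivity_type"] in ("E-LAN", "E-Tree")]
    assert [l["id"] for l in eline] == ["vl1"]
    assert [l["id"] for l in elan] == ["vl2", "vl3"]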
     def _get_resource_limits(self, deployment_unit):
         """
         Extract resource limits from deployment units.
         """
         # defaults
-        cpu_list = "1"
+        cpu_list = None
         cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(1.0))
-        mem_limit = 0
+        mem_limit = None
         # update from descriptor
         if "resource_requirements" in deployment_unit:
             res_req = deployment_unit.get("resource_requirements")
-            cpu_list = res_req.get("cpu").get("cores")
+            cpu_list = res_req.get("cpu").get("cpuset")
             if cpu_list is None:
                 cpu_list = res_req.get("cpu").get("vcpus")
-            cpu_bw = res_req.get("cpu").get("cpu_bw", 1.0)
+            if cpu_list is not None:
+                # attention: docker expects list as string w/o spaces:
+                cpu_list = str(cpu_list).replace(" ", "").strip()
+            cpu_bw = res_req.get("cpu").get("cpu_bw")
+            if cpu_bw is None:
+                cpu_bw = 1.0
             cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(cpu_bw))
-            mem_num = str(res_req.get("memory").get("size", 2))
+            mem_limit = res_req.get("memory").get("size")
             mem_unit = str(res_req.get("memory").get("size_unit", "GB"))
-            mem_limit = float(mem_num)
-            if mem_unit == "GB":
-                mem_limit = mem_limit * 1024 * 1024 * 1024
-            elif mem_unit == "MB":
-                mem_limit = mem_limit * 1024 * 1024
-            elif mem_unit == "KB":
-                mem_limit = mem_limit * 1024
-            mem_limit = int(mem_limit)
+            if mem_limit is not None:
+                mem_limit = int(mem_limit)
+                # to bytes
+                if "G" in mem_unit:
+                    mem_limit = mem_limit * 1024 * 1024 * 1024
+                elif "M" in mem_unit:
+                    mem_limit = mem_limit * 1024 * 1024
+                elif "K" in mem_unit:
+                    mem_limit = mem_limit * 1024
         return cpu_list, cpu_period, cpu_quota, mem_limit
 
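Note that the rewritten unit handling matches by substring ("G" in mem_unit), so "G", "GB", and "GiB" all select the gigabyte branch. A self-contained sketch of the same conversion, with the arithmetic spelled out:

    def to_bytes(size, unit="GB"):
        # mirrors the substring matching used in _get_resource_limits()
        if "G" in unit:
            return size * 1024 * 1024 * 1024
        elif "M" in unit:
            return size * 1024 * 1024
        elif "K" in unit:
            return size * 1024
        return size

    assert to_bytes(2, "GB") == 2147483648
    assert to_bytes(512, "MB") == 536870912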
-    def _start_vnfd(self, vnfd, vnf_id, **kwargs):
+    def _start_vnfd(self, vnfd, vnf_id, ssiid, **kwargs):
         """
         Start a single VNFD of this service
         :param vnfd: vnfd descriptor dict
@@ -280,12 +335,13 @@
         for u in deployment_units:
             # 0. vnf_container_name = vnf_id.vdu_id
             vnf_container_name = get_container_name(vnf_id, u.get("id"))
+            vnf_container_instance_name = get_container_name(vnf_id, u.get("id"), ssiid)
             # 1. get the name of the docker image to start
             if vnf_container_name not in self.remote_docker_image_urls:
                 raise Exception("No image name for %r found. Abort." % vnf_container_name)
             docker_image_name = self.remote_docker_image_urls.get(vnf_container_name)
             # 2. select datacenter to start the VNF in
-            target_dc = vnfd.get("dc")
+            target_dc = self._place(vnfd, vnf_id, u, ssiid)
             # 3. perform some checks to ensure we can start the container
             assert(docker_image_name is not None)
             assert(target_dc is not None)
@@ -301,8 +357,51 @@
             # do some re-naming of fields to be compatible with containernet
             for i in intfs:
                 if i.get("address"):
+                    LOG.info("Found static address for {}: {}"
+                             .format(i.get("id"), i.get("address")))
                     i["ip"] = i.get("address")
 
+            # get ports and port_bindings from the port and publish fields of the CNFD
+            # see: https://github.com/containernet/containernet/wiki/Exposing-and-mapping-network-ports
+            ports = list()  # Containernet naming
+            port_bindings = dict()
+            for i in intfs:
+                if i.get("port"):  # field with a single port
+                    if not isinstance(i.get("port"), int):
+                        LOG.info("Field 'port' is not an int for CP: {}".format(i))
+                    else:
+                        ports.append(i.get("port"))  # collect all ports
+                if i.get("ports"):  # list with multiple ports
+                    if not isinstance(i.get("ports"), list):
+                        LOG.info("Field 'ports' is not a list for CP: {}".format(i))
+                    else:
+                        for p in i.get("ports"):
+                            if not isinstance(p, int):
+                                # do some parsing
+                                try:
+                                    if "/udp" in p:
+                                        p = tuple(p.split("/"))
+                                    else:
+                                        p = int(p)
+                                    ports.append(p)  # collect all ports
+                                except BaseException as ex:
+                                    LOG.error(
+                                        "Could not parse ports list: {}".format(p))
+                                    LOG.error(ex)
+                            else:
+                                ports.append(p)  # collect all ports
+                if i.get("publish"):
+                    if not isinstance(i.get("publish"), dict):
+                        LOG.info("Field 'publish' is not a dict for CP: {}".format(i))
+                    else:
+                        port_bindings.update(i.get("publish"))
+            # update port mapping for cases where the service is deployed > 1 times
+            port_bindings = update_port_mapping_multi_instance(ssiid, port_bindings)
+            if len(ports) > 0:
+                LOG.info("{} exposes ports: {}".format(vnf_container_instance_name, ports))
+            if len(port_bindings) > 0:
+                LOG.info("{} publishes ports: {}".format(vnf_container_instance_name, port_bindings))
+
             # 5. collect additional information to start container
             volumes = list()
             cenv = dict()
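To make the three optional connection-point fields concrete, here is a sketch with a hypothetical CP from a CNFD and the ports/port_bindings the loop above would derive from it. The "<port>/udp" string form is split into a tuple, exactly as in the parsing code:

    cp = {
        "id": "mgmt",
        "port": 8080,                 # single exposed port (int)
        "ports": [8081, "9999/udp"],  # list; strings like "9999/udp" are split
        "publish": {8080: 8080},      # container port -> public host port
    }
    ports = [cp["port"]] + [tuple(p.split("/")) if "/udp" in str(p) else int(p)
                            for p in cp["ports"]]
    assert ports == [8080, 8081, ("9999", "udp")]
    # With ssiid=1, update_port_mapping_multi_instance() would shift the
    # published host port from 8080 to 9080 (offset of 1000 per instance).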
@@ -318,26 +417,46 @@
                           " Overwriting SON_EMU_CMD_STOP.")
                 cenv["SON_EMU_CMD_STOP"] = VNFD_CMD_STOP
 
+            # 5.2 inject per-instance configurations based on envs
+            conf_envs = self._load_instance_conf_envs(vnf_container_instance_name)
+            cenv.update(conf_envs)
+
+            # 5.3 handle optional ipc_mode setting
+            ipc_mode = u.get("ipc_mode", None)
+            # 5.4 handle optional devices setting
+            devices = u.get("devices", [])
+            # 5.5 handle optional cap_add setting
+            cap_add = u.get("cap_add", [])
+
             # 6. Start the container
             LOG.info("Starting %r as %r in DC %r" %
-                     (vnf_name, vnf_container_name, vnfd.get("dc")))
+                     (vnf_name, vnf_container_instance_name, target_dc))
             LOG.debug("Interfaces for %r: %r" % (vnf_id, intfs))
             # start the container
             vnfi = target_dc.startCompute(
-                vnf_container_name,
+                vnf_container_instance_name,
                 network=intfs,
                 image=docker_image_name,
                 cpu_quota=cpu_quota,
                 cpu_period=cpu_period,
-                cpuset=cpu_list,
+                cpuset_cpus=cpu_list,
                 mem_limit=mem_limit,
                 volumes=volumes,
                 properties=cenv,  # environment
+                ports=ports,
+                port_bindings=port_bindings,
+                # only publish if explicitly stated in descriptor
+                publish_all_ports=False,
+                ipc_mode=ipc_mode,
+                devices=devices,
+                cap_add=cap_add,
                 type=kwargs.get('type', 'docker'))
             # add vnfd reference to vnfi
             vnfi.vnfd = vnfd
             # add container name
             vnfi.vnf_container_name = vnf_container_name
+            vnfi.vnf_container_instance_name = vnf_container_instance_name
+            vnfi.ssiid = ssiid
             # store vnfi
             vnfis.append(vnfi)
         return vnfis
@@ -357,7 +476,7 @@
 
     def _get_vnf_instance(self, instance_uuid, vnf_id):
         """
-        Returns VNFI object for a given "vnf_id" or "vnf_container_namse" taken from an NSD.
+        Returns VNFI object for a given "vnf_id" or "vnf_container_name" taken from an NSD.
         :return: single object
         """
         for vnfi in self.instances[instance_uuid]["vnf_instances"]:
@@ -372,6 +491,8 @@
         "vnf_id" taken from an NSD.
         :return: list
         """
+        if vnf_id is None:
+            return None
         r = list()
         for vnfi in self.instances[instance_uuid]["vnf_instances"]:
             if vnf_id in vnfi.name:
@@ -417,7 +538,6 @@
             env = config.get("Env", list())
             for env_var in env:
                 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
-                # LOG.debug("%r = %r" % (var, cmd))
                 if var == "SON_EMU_CMD" or var == "VIM_EMU_CMD":
                     LOG.info("Executing script in '{}': {}={}"
                              .format(vnfi.name, var, cmd))
@@ -444,6 +564,26 @@
                     t.start()
                     break  # only execute one command
 
+    def _load_instance_conf_envs(self, cname):
+        """
+        Try to load an instance-specific env file. If not found,
+        just return an empty dict.
+        """
+        if PER_INSTANCE_ENV_CONFIGURATION_FOLDER is None:
+            return dict()
+        try:
+            path = os.path.expanduser(PER_INSTANCE_ENV_CONFIGURATION_FOLDER)
+            path = os.path.join(path, "{}.env.yml".format(cname))
+            res = load_yaml(path)
+            LOG.info("Loaded instance-specific env file for '{}': {}"
+                     .format(cname, res))
+            return res
+        except BaseException as ex:
+            LOG.info("No instance-specific env file found for: {}"
+                     .format(cname))
+            del ex
+        return dict()
+
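_load_instance_conf_envs() looks up a <container_instance_name>.env.yml file in PER_INSTANCE_ENV_CONFIGURATION_FOLDER and merges its keys into the container environment. A sketch with a hypothetical file for an instance named vnf0.vdu01.0 (the vnf_id.vdu_id.ssiid naming produced by get_container_name()):

    import yaml

    # Hypothetical content of vnf0.vdu01.0.env.yml; every key/value pair
    # becomes an environment variable of exactly that container instance.
    conf_envs = yaml.safe_load(
        "LOG_LEVEL: debug\n"
        "BACKEND_HOST: 10.0.0.42\n")
    assert conf_envs == {"LOG_LEVEL": "debug", "BACKEND_HOST": "10.0.0.42"}
    # These entries are merged into cenv before startCompute() is called.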
 
     def _unpack_service_package(self):
         """
         unzip *.son file and store contents in CATALOG_FOLDER/services/<service_uuid>/
@@ -509,12 +649,13 @@
                     LOG.debug("Loaded VNFD: {0} id: {1}"
                               .format(v.get("vnf_name"),
                                       v.get("vnf_id")))
 
-    def _connect_elines(self, eline_fwd_links, instance_uuid):
+    def _connect_elines(self, eline_fwd_links, instance_uuid, subnets):
         """
         Connect all E-LINE links in the NSD
         Attention: This method DOES NOT support multi V/CDU VNFs!
         :param eline_fwd_links: list of E-LINE links in the NSD
         :param: instance_uuid of the service
+        :param: subnets list of subnets to be used
         :return:
         """
         # cookie is used as identifier for the flowrules installed by the dummygatekeeper
         # eg. different services get a unique cookie for their flowrules
         cookie = 1
         for link in eline_fwd_links:
             LOG.info("Found E-Line: {}".format(link))
-            # check if we need to deploy this link when its a management link:
-            if USE_DOCKER_MGMT:
-                if self.check_mgmt_interface(
-                        link["connection_points_reference"]):
-                    continue
-
             src_id, src_if_name = parse_interface(
                 link["connection_points_reference"][0])
             dst_id, dst_if_name = parse_interface(
                 link["connection_points_reference"][1])
-            setChaining = False
-            LOG.info("Creating E-Line: src={}, dst={}"
-                     .format(src_id, dst_id))
+            LOG.info("Searching C/VDU for E-Line: src={}, src_if={}, dst={}, dst_if={}"
+                     .format(src_id, src_if_name, dst_id, dst_if_name))
+            # handle C/VDUs (ugly hack, only one V/CDU per VNF for now)
+            src_units = self._get_vnf_instance_units(instance_uuid, src_id)
+            dst_units = self._get_vnf_instance_units(instance_uuid, dst_id)
+            if src_units is None or dst_units is None:
+                LOG.info("No VNF-VNF link. Skipping: src={}, src_if={}, dst={}, dst_if={}"
+                         .format(src_id, src_if_name, dst_id, dst_if_name))
+                return
+            # we only support VNFs with one V/CDU right now
+            if len(src_units) != 1 or len(dst_units) != 1:
+                raise BaseException("LLCM does not support E-LINES for multi V/CDU VNFs.")
+            # get the full name from that C/VDU and use it as src_id and dst_id
+            src_id = src_units[0].name
+            dst_id = dst_units[0].name
+            # from here we have all info we need
+            LOG.info("Creating E-Line for C/VDU: src={}, src_if={}, dst={}, dst_if={}"
+                     .format(src_id, src_if_name, dst_id, dst_if_name))
             # get involved vnfis
-            src_vnfi = self._get_vnf_instance(instance_uuid, src_id)
-            dst_vnfi = self._get_vnf_instance(instance_uuid, dst_id)
-
+            src_vnfi = src_units[0]
+            dst_vnfi = dst_units[0]
+            # proceed with chaining setup
+            setChaining = False
             if src_vnfi is not None and dst_vnfi is not None:
                 setChaining = True
                 # re-configure the VNFs IP assignment and ensure that a new
                 # subnet is used for each E-Link
-                eline_net = ELINE_SUBNETS.pop(0)
+                eline_net = subnets.pop(0)
                 ip1 = "{0}/{1}".format(str(eline_net[1]),
                                        eline_net.prefixlen)
                 ip2 = "{0}/{1}".format(str(eline_net[2]),
                                        eline_net.prefixlen)
-                # check if VNFs have fixed IPs (address field in VNFDs)
-                if (self._get_vnfd_cp_from_vnfi(src_vnfi, src_if_name)
-                        .get("address") is None):
+                # check if VNFs have fixed IPs (ip/address field in VNFDs)
+                if (self._get_vnfd_cp_from_vnfi(
+                        src_vnfi, src_if_name).get("ip") is None and
+                    self._get_vnfd_cp_from_vnfi(
+                        src_vnfi, src_if_name).get("address") is None):
                     self._vnf_reconfigure_network(src_vnfi, src_if_name, ip1)
-                # check if VNFs have fixed IPs (address field in VNFDs)
-                if (self._get_vnfd_cp_from_vnfi(dst_vnfi, dst_if_name)
-                        .get("address") is None):
+                # check if VNFs have fixed IPs (ip/address field in VNFDs)
+                if (self._get_vnfd_cp_from_vnfi(
+                        dst_vnfi, dst_if_name).get("ip") is None and
+                    self._get_vnfd_cp_from_vnfi(
+                        dst_vnfi, dst_if_name).get("address") is None):
                     self._vnf_reconfigure_network(dst_vnfi, dst_if_name, ip2)
                 # set the chaining
                 if setChaining:
@@ -575,24 +730,25 @@
             if cp.get("id") == ifname:
                 return cp
 
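The E-Line setup assigns the first two host addresses of the reserved subnet to the two endpoints (unless the descriptor pins an ip/address). A standalone sketch of that arithmetic with a hypothetical /24:

    import ipaddress

    eline_net = ipaddress.ip_network("10.9.0.0/24")  # one reserved E-Line subnet
    ip1 = "{0}/{1}".format(str(eline_net[1]), eline_net.prefixlen)  # src endpoint
    ip2 = "{0}/{1}".format(str(eline_net[2]), eline_net.prefixlen)  # dst endpoint
    assert ip1 == "10.9.0.1/24"
    assert ip2 == "10.9.0.2/24"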
-    def _connect_elans(self, elan_fwd_links, instance_uuid):
+    def _connect_elans(self, elan_fwd_links, instance_uuid, subnets):
         """
         Connect all E-LAN/E-Tree links in the NSD
         This method supports multi-V/CDU VNFs if the
         connection point names of the DUs are the same as the ones in the NSD.
         :param elan_fwd_links: list of E-LAN links in the NSD
         :param: instance_uuid of the service
+        :param: subnets list of subnets to be used
         :return:
         """
         for link in elan_fwd_links:
             # a new E-LAN/E-Tree
             elan_vnf_list = []
-            lan_net = ELAN_SUBNETS.pop(0)
+            lan_net = subnets.pop(0)
             lan_hosts = list(lan_net.hosts())
 
             # generate lan ip address for all interfaces (of all involved V/CDUs)
-            for intf in link["connection_points_reference"]:
-                vnf_id, intf_name = parse_interface(intf)
+            for intf_ref in link["connection_points_reference"]:
+                vnf_id, intf_name = parse_interface(intf_ref)
                 if vnf_id is None:
                     continue  # skip references to NS connection points
                 units = self._get_vnf_instance_units(instance_uuid, vnf_id)
@@ -603,10 +759,20 @@
                     # Attention: we apply a simplification for multi DU VNFs here:
                     # the connection points of all involved DUs have to have the same
                     # name as the connection points of the surrounding VNF to be mapped.
                     # This is because we do not consider links specified in the VNFDs
                     container_name = uvnfi.name
-                    ip_address = "{0}/{1}".format(str(lan_hosts.pop(0)),
-                                                  lan_net.prefixlen)
+
+                    ip_address = None
+                    # get the interface of the unit
+                    intf = self._get_vnfd_cp_from_vnfi(uvnfi, intf_name)
+                    # check if there is a manually assigned address
+                    if intf is not None:
+                        if intf.get("address"):
+                            ip_address = intf.get("address")
+                    if ip_address is None:
+                        # automatically assign an IP from our pool
+                        ip_address = "{0}/{1}".format(str(lan_hosts.pop(0)),
+                                                      lan_net.prefixlen)
                     LOG.debug(
                         "Setting up E-LAN/E-Tree interface. (%s:%s) -> %s" % (
                             container_name, intf_name, ip_address))
@@ -652,7 +818,7 @@
         Get all URLs to pre-built docker images in some repo.
         :return:
         """
-        for vnf_id, v in self.vnfds.iteritems():
+        for vnf_id, v in list(self.vnfds.items()):
             for vu in v.get("virtual_deployment_units", []):
                 vnf_container_name = get_container_name(vnf_id, vu.get("id"))
                 if vu.get("vm_image_format") == "docker":
@@ -682,7 +848,7 @@
         dc = DockerClient()
         LOG.info("Building %d Docker images (this may take several minutes) ..." % len(
             self.local_docker_files))
-        for k, v in self.local_docker_files.iteritems():
+        for k, v in list(self.local_docker_files.items()):
             for line in dc.build(path=v.replace(
                     "Dockerfile", ""), tag=k, rm=False, nocache=False):
                 LOG.debug("DOCKER BUILD: %s" % line)
@@ -693,7 +859,7 @@
         If the package contains URLs to pre-built Docker images, we download
         them with this method.
         """
         dc = DockerClient()
-        for url in self.remote_docker_image_urls.itervalues():
+        for url in list(self.remote_docker_image_urls.values()):
             # only pull if not present (speedup for development)
             if not FORCE_PULL:
                 if len(dc.images.list(name=url)) > 0:
@@ -718,21 +884,22 @@
         """
         return len(DockerClient().images.list(name=image_name)) > 0
 
-    def _calculate_placement(self, algorithm):
+    def _place(self, vnfd, vnfid, vdu, ssiid):
         """
-        Do placement by adding the a field "dc" to
-        each VNFD that points to one of our
-        data center objects known to the gatekeeper.
+        Do placement. Return the DC object on which
+        the given VDU should be placed.
         """
         assert(len(self.vnfds) > 0)
         assert(len(GK.dcs) > 0)
-        # instantiate algorithm an place
-        p = algorithm()
-        p.place(self.nsd, self.vnfds, GK.dcs)
-        LOG.info("Using placement algorithm: %r" % p.__class__.__name__)
-        # lets print the placement result
-        for name, vnfd in self.vnfds.iteritems():
-            LOG.info("Placed VNF %r on DC %r" % (name, str(vnfd.get("dc"))))
+        if PLACEMENT_ALGORITHM_OBJ is None:
+            LOG.error("No placement algorithm given. Using FirstDcPlacement!")
+            p = FirstDcPlacement()
+        else:
+            p = PLACEMENT_ALGORITHM_OBJ
+        cname = get_container_name(vnfid, vdu.get("id"), ssiid)
+        rdc = p.place(GK.dcs, vnfd, vnfid, vdu, ssiid, cname)
+        LOG.info("Placement: '{}' --> '{}'".format(cname, rdc))
+        return rdc
 
     def _calculate_cpu_cfs_values(self, cpu_time_percentage):
         """
@@ -772,9 +939,8 @@
     Placement: Always use one and the same data center from the GK.dcs dict.
     """
 
-    def place(self, nsd, vnfds, dcs):
-        for id, vnfd in vnfds.iteritems():
-            vnfd["dc"] = list(dcs.itervalues())[0]
+    def place(self, dcs, vnfd, vnfid, vdu, ssiid, cname):
+        return list(dcs.values())[0]
 
 
 class RoundRobinDcPlacement(object):
@@ -782,12 +948,50 @@
     Placement: Distribute VNFs across all available DCs in a round robin fashion.
     """
 
-    def place(self, nsd, vnfds, dcs):
-        c = 0
-        dcs_list = list(dcs.itervalues())
-        for id, vnfd in vnfds.iteritems():
-            vnfd["dc"] = dcs_list[c % len(dcs_list)]
-            c += 1  # inc. c to use next DC
+    def __init__(self):
+        self.count = 0
+
+    def place(self, dcs, vnfd, vnfid, vdu, ssiid, cname):
+        dcs_list = list(dcs.values())
+        rdc = dcs_list[self.count % len(dcs_list)]
+        self.count += 1  # inc. count to use next DC
+        return rdc
+
+
+class StaticConfigPlacement(object):
+    """
+    Placement: Fixed assignment based on config file.
+    """
+
+    def __init__(self, path=None):
+        if path is None:
+            path = "static_placement.yml"
+        path = os.path.expanduser(path)
+        self.static_placement = dict()
+        try:
+            self.static_placement = load_yaml(path)
+        except BaseException as ex:
+            LOG.error(ex)
+            LOG.error("Couldn't load placement from {}"
+                      .format(path))
+        LOG.info("Loaded static placement: {}"
+                 .format(self.static_placement))
+
+    def place(self, dcs, vnfd, vnfid, vdu, ssiid, cname):
+        # check for container name entry
+        if cname not in self.static_placement:
+            LOG.error("Couldn't find {} in placement".format(cname))
+            LOG.error("Using first DC as fallback!")
+            return list(dcs.values())[0]
+        # lookup
+        candidate_dc = self.static_placement.get(cname)
+        # check if DC exists
+        if candidate_dc not in dcs:
+            LOG.error("Couldn't find DC {}".format(candidate_dc))
+            LOG.error("Using first DC as fallback!")
+            return list(dcs.values())[0]
+        # return correct DC
+        return dcs.get(candidate_dc)
 
 
 """
@@ -819,10 +1023,13 @@
                     "error": "upload failed. file not found."}, 500
             # generate a uuid to reference this package
             service_uuid = str(uuid.uuid4())
-            file_hash = hashlib.sha1(str(son_file)).hexdigest()
+            file_hash = str(son_file)
+            file_hash = hashlib.sha1(file_hash.encode())
+            file_hash = file_hash.hexdigest()
             # ensure that upload folder exists
             ensure_dir(UPLOAD_FOLDER)
-            upload_path = os.path.join(UPLOAD_FOLDER, "%s.tgo" % service_uuid)
+            upload_path = os.path.\
+                join(UPLOAD_FOLDER, "%s.tgo" % service_uuid)
             # store *.son file to disk
             if is_file_object:
                 son_file.save(upload_path)
@@ -864,11 +1071,44 @@
 
     def get(self):
         """
-        Return a list of UUID's of uploaded service packages.
-        :return: dict/list
+        Return a list of package descriptor headers.
+        Fakes the behavior of 5GTANGO's GK API to be
+        compatible with tng-cli.
+        :return: list
         """
         LOG.info("GET /packages")
-        return {"service_uuid_list": list(GK.services.iterkeys())}
+        result = list()
+        for suuid, sobj in GK.services.items():
+            pkg = dict()
+            pkg["pd"] = dict()
+            pkg["uuid"] = suuid
+            pkg["pd"]["name"] = sobj.manifest.get("name")
+            pkg["pd"]["version"] = sobj.manifest.get("version")
+            pkg["created_at"] = sobj.created_at
+            result.append(pkg)
+        return result, 200, CORS_HEADER
+
+
+class Services(fr.Resource):
+
+    def get(self):
+        """
+        Return a list of services.
+        Fakes the behavior of 5GTANGO's GK API to be
+        compatible with tng-cli.
+        :return: list
+        """
+        LOG.info("GET /services")
+        result = list()
+        for suuid, sobj in GK.services.items():
+            service = dict()
+            service["nsd"] = dict()
+            service["uuid"] = suuid
+            service["nsd"]["name"] = sobj.nsd.get("name")
+            service["nsd"]["version"] = sobj.nsd.get("version")
+            service["created_at"] = sobj.created_at
+            result.append(service)
+        return result, 200, CORS_HEADER
 
 
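The POST handler below resolves services by service_name first and falls back to service_uuid, or simply to the first on-boarded service. A client-side sketch using python-requests against a locally running LLCM; host and port are assumptions, the endpoint and response fields are taken from this diff:

    import requests

    # Assumes the LLCM REST API listens on localhost:5000.
    resp = requests.post("http://127.0.0.1:5000/instantiations",
                         json={"service_name": "ns-example"})
    print(resp.status_code)   # 201 on success
    print(resp.json()["id"])  # same value as "service_instance_uuid"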
 class Instantiations(fr.Resource):
@@ -883,18 +1123,31 @@
         # try to extract the service uuid from the request
         json_data = request.get_json(force=True)
         service_uuid = json_data.get("service_uuid")
-
+        service_name = json_data.get("service_name")
+        if service_name is None:
+            # let's be fuzzy
+            service_name = service_uuid
+        # first try to find by service_name
+        if service_name is not None:
+            for s_uuid, s in GK.services.items():
+                if s.manifest.get("name") == service_name:
+                    LOG.info("Searched for: {}. Found service w. UUID: {}"
+                             .format(service_name, s_uuid))
+                    service_uuid = s_uuid
         # let's be a bit fuzzy here to make testing easier
         if (service_uuid is None or service_uuid == "latest") and len(GK.services) > 0:
             # if we don't get a service uuid, we simply start the first service
             # in the list
-            service_uuid = list(GK.services.iterkeys())[0]
+            service_uuid = list(GK.services.keys())[0]
         if service_uuid in GK.services:
             # ok, we have a service uuid, let's start the service
             service_instance_uuid = GK.services.get(
                 service_uuid).start_service()
-            return {"service_instance_uuid": service_instance_uuid}, 201
+            # multiple ID fields to be compatible with tng-bench and tng-cli
+            return ({"service_instance_uuid": service_instance_uuid,
+                     "id": service_instance_uuid}, 201)
+        LOG.error("Service not found: {}/{}".format(service_uuid, service_name))
         return "Service not found", 404
 
     def get(self):
         """
         Returns a list of UUIDs containing all running services.
         :return: dict / list
         """
-        LOG.info("GET /instantiations")
-        return {"service_instantiations_list": [
-            list(s.instances.iterkeys()) for s in GK.services.itervalues()]}
+        LOG.debug("GET /instantiations or /api/v3/records/services")
+        # return {"service_instantiations_list": [
+        #     list(s.instances.keys()) for s in GK.services.values()]}
+        result = list()
+        for suuid, sobj in GK.services.items():
+            for iuuid, iobj in sobj.instances.items():
+                inst = dict()
+                inst["uuid"] = iobj.get("uuid")
+                inst["instance_name"] = "{}-inst.{}".format(
+                    iobj.get("name"), iobj.get("ssiid"))
+                inst["status"] = "running"
+                inst["created_at"] = iobj.get("created_at")
+                result.append(inst)
+        return result, 200, CORS_HEADER
 
     def delete(self):
         """
         Stops a running service specified by its service and instance UUID.
         """
         # try to extract the service and instance UUID from the request
         json_data = request.get_json(force=True)
-        service_uuid = json_data.get("service_uuid")
-        instance_uuid = json_data.get("service_instance_uuid")
-
+        service_uuid_input = json_data.get("service_uuid")
+        instance_uuid_input = json_data.get("service_instance_uuid")
+        if len(GK.services) < 1:
+            return "No service on-boarded.", 404
         # try to be fuzzy
-        if service_uuid is None and len(GK.services) > 0:
-            # if we don't get a service uuid, we simply stop the last service
-            # in the list
-            service_uuid = list(GK.services.iterkeys())[0]
-        if instance_uuid is None and len(
-                GK.services[service_uuid].instances) > 0:
-            instance_uuid = list(
-                GK.services[service_uuid].instances.iterkeys())[0]
-
-        if service_uuid in GK.services and instance_uuid in GK.services[service_uuid].instances:
-            # valid service and instance UUID, stop service
-            GK.services.get(service_uuid).stop_service(instance_uuid)
-            return "service instance with uuid %r stopped." % instance_uuid, 200
-        return "Service not found", 404
+        if service_uuid_input is None:
+            # if we don't get a service uuid we stop all services
+            service_uuid_list = list(GK.services.keys())
+            LOG.info("No service_uuid given, stopping all.")
+        else:
+            service_uuid_list = [service_uuid_input]
+        # for each service
+        for service_uuid in service_uuid_list:
+            if instance_uuid_input is None:
+                instance_uuid_list = list(
+                    GK.services[service_uuid].instances.keys())
+            else:
+                instance_uuid_list = [instance_uuid_input]
+            # for all service instances
+            for instance_uuid in instance_uuid_list:
+                if (service_uuid in GK.services and
+                        instance_uuid in GK.services[service_uuid].instances):
+                    # valid service and instance UUID, stop service
+                    GK.services.get(service_uuid).stop_service(instance_uuid)
+                    LOG.info("Service instance with uuid %r stopped."
+                             % instance_uuid)
+        return "Service(s) stopped.", 200
 
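Note the broadened delete() semantics above: omitting service_uuid stops every instance of every on-boarded service, and omitting only service_instance_uuid stops all instances of one service. A client-side sketch; same assumed host/port as before, the placeholder UUIDs are illustrative:

    import requests

    # Stop one specific instance ...
    requests.delete("http://127.0.0.1:5000/instantiations",
                    json={"service_uuid": "<service-uuid>",
                          "service_instance_uuid": "<instance-uuid>"})
    # ... or send an empty body to stop all instances of all services:
    requests.delete("http://127.0.0.1:5000/instantiations", json={})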
 
 class Exit(fr.Resource):
 
     def get(self):
@@ -946,7 +1218,7 @@ def generate_subnets(prefix, base, subnet_size=50, mask=24):
     r = list()
     for net in range(base, base + subnet_size):
         subnet = "{0}.{1}.0/{2}".format(prefix, net, mask)
-        r.append(ipaddress.ip_network(unicode(subnet)))
+        r.append(ipaddress.ip_network(subnet))
     return r
 
 
@@ -969,25 +1241,36 @@ def initialize_GK():
 GK = None
 initialize_GK()
 # setup Flask
+http_server = None
 app = Flask(__name__)
 app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024  # 512 MB max upload
 api = fr.Api(app)
 # define endpoints
-api.add_resource(Packages, '/packages', '/api/v2/packages')
+api.add_resource(Packages, '/packages', '/api/v2/packages', '/api/v3/packages')
+api.add_resource(Services, '/services', '/api/v2/services', '/api/v3/services')
 api.add_resource(Instantiations, '/instantiations',
-                 '/api/v2/instantiations', '/api/v2/requests')
+                 '/api/v2/instantiations', '/api/v2/requests', '/api/v3/requests',
+                 '/api/v3/records/services')
 api.add_resource(Exit, '/emulator/exit')
 
 
 def start_rest_api(host, port, datacenters=dict()):
+    global http_server
     GK.dcs = datacenters
     GK.net = get_dc_network()
     # start the Flask server (not the best performance but ok for our use case)
-    app.run(host=host,
-            port=port,
-            debug=True,
-            use_reloader=False  # this is needed to run Flask in a non-main thread
-            )
+    # app.run(host=host,
+    #         port=port,
+    #         debug=True,
+    #         use_reloader=False  # this is needed to run Flask in a non-main thread
+    #         )
+    http_server = WSGIServer((host, port), app, log=open("/dev/null", "w"))
+    http_server.serve_forever()
+
+
+def stop_rest_api():
+    if http_server:
+        http_server.close()
 
 
 def ensure_dir(name):
@@ -1020,7 +1303,7 @@
     :return:
     """
     assert (len(GK.dcs) > 0)
-    return GK.dcs.values()[0].net
+    return list(GK.dcs.values())[0].net
 
 
 def parse_interface(interface_name):
@@ -1037,10 +1320,32 @@
     return vnf_id, vnf_interface
 
 
-def get_container_name(vnf_id, vdu_id):
+def get_container_name(vnf_id, vdu_id, ssiid=None):
+    if ssiid is not None:
+        return "{}.{}.{}".format(vnf_id, vdu_id, ssiid)
     return "{}.{}".format(vnf_id, vdu_id)
 
 
+def get_triple_id(descr):
+    return "{}.{}.{}".format(
+        descr.get("vendor"), descr.get("name"), descr.get("version"))
+
+
+def update_port_mapping_multi_instance(ssiid, port_bindings):
+    """
+    Port_bindings are used to expose ports of the deployed containers.
+    They would collide if we deploy multiple service instances.
+    This function adds an offset to them which is based on the
+    short service instance id (SSIID):
+    NEW_PORT = (SSIID * MULTI_INSTANCE_PORT_OFFSET) + ORIGINAL_PORT
+    """
+    def _offset(p):
+        return p + MULTI_INSTANCE_PORT_OFFSET * ssiid
+
+    port_bindings = {k: _offset(v) for k, v in port_bindings.items()}
+    return port_bindings
+
+
 if __name__ == '__main__':
     """
     Let's allow to run the API in standalone mode.
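A worked example of update_port_mapping_multi_instance() with the default MULTI_INSTANCE_PORT_OFFSET of 1000; the logic is repeated here so the snippet runs standalone:

    MULTI_INSTANCE_PORT_OFFSET = 1000

    def update_port_mapping_multi_instance(ssiid, port_bindings):
        return {k: v + MULTI_INSTANCE_PORT_OFFSET * ssiid
                for k, v in port_bindings.items()}

    # first instance (ssiid=0) keeps its ports, third instance (ssiid=2) shifts:
    assert update_port_mapping_multi_instance(0, {8080: 8080}) == {8080: 8080}
    assert update_port_mapping_multi_instance(2, {8080: 8080}) == {8080: 10080}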