X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2Fvim-emu.git;a=blobdiff_plain;f=src%2Femuvim%2Fapi%2Ftango%2Fllcm.py;h=bd3e1f94be9c5c8a1567b4ce4bb7ca28b5a33b8c;hp=f0caf56277b4edf29850cc37832ca1942876c298;hb=85408ed517f100c3e477efd35e54630bf9171af0;hpb=8c6b10b7efac9b925df2fac71c2744b438417626 diff --git a/src/emuvim/api/tango/llcm.py b/src/emuvim/api/tango/llcm.py index f0caf56..bd3e1f9 100755 --- a/src/emuvim/api/tango/llcm.py +++ b/src/emuvim/api/tango/llcm.py @@ -1,3 +1,4 @@ + # Copyright (c) 2018 SONATA-NFV, 5GTANGO and Paderborn University # ALL RIGHTS RESERVED. # @@ -36,6 +37,7 @@ import hashlib import zipfile import yaml import threading +import datetime from docker import DockerClient from flask import Flask, request import flask_restful as fr @@ -50,6 +52,10 @@ LOG = logging.getLogger("5gtango.llcm") LOG.setLevel(logging.INFO) +CORS_HEADER = {'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET,OPTIONS'} + + GK_STORAGE = "/tmp/vim-emu-tango-llcm/" UPLOAD_FOLDER = os.path.join(GK_STORAGE, "uploads/") CATALOG_FOLDER = os.path.join(GK_STORAGE, "catalog/") @@ -91,6 +97,15 @@ VNF_STOP_WAIT_TIME = 5 # offset for this: NEW_PORT (SSIID * OFFSET) + ORIGINAL_PORT MULTI_INSTANCE_PORT_OFFSET = 1000 +# Selected Placement Algorithm: Points to the class of the selected +# placement algorithm. +PLACEMENT_ALGORITHM_OBJ = None + +# Path to folder with .env.yml files that contain +# environment variables injected into the specific container +# when it is started. +PER_INSTANCE_ENV_CONFIGURATION_FOLDER = None + class OnBoardingException(BaseException): pass @@ -141,6 +156,7 @@ class Service(object): self.remote_docker_image_urls = dict() self.instances = dict() self._instance_counter = 0 + self.created_at = str(datetime.datetime.now()) def onboard(self): """ @@ -193,13 +209,10 @@ class Service(object): self.instances[instance_uuid]["ssiid"] = self._instance_counter self.instances[instance_uuid]["name"] = get_triple_id(self.nsd) self.instances[instance_uuid]["vnf_instances"] = list() + self.instances[instance_uuid]["created_at"] = str(datetime.datetime.now()) # increase for next instance self._instance_counter += 1 - # 2. compute placement of this service instance (adds DC names to - # VNFDs) - # self._calculate_placement(FirstDcPlacement) - self._calculate_placement(RoundRobinDcPlacement) # 3. start all vnfds that we have in the service for vnf_id in self.vnfds: vnfd = self.vnfds[vnf_id] @@ -328,7 +341,7 @@ class Service(object): raise Exception("No image name for %r found. Abort." % vnf_container_name) docker_image_name = self.remote_docker_image_urls.get(vnf_container_name) # 2. select datacenter to start the VNF in - target_dc = vnfd.get("dc") + target_dc = self._place(vnfd, vnf_id, u, ssiid) # 3. perform some checks to ensure we can start the container assert(docker_image_name is not None) assert(target_dc is not None) @@ -344,6 +357,8 @@ class Service(object): # do some re-naming of fields to be compatible to containernet for i in intfs: if i.get("address"): + LOG.info("Found static address for {}: {}" + .format(i.get("id"), i.get("address"))) i["ip"] = i.get("address") # get ports and port_bindings from the port and publish fields of CNFD @@ -351,11 +366,30 @@ class Service(object): ports = list() # Containernet naming port_bindings = dict() for i in intfs: - if i.get("port"): + if i.get("port"): # field with a single port if not isinstance(i.get("port"), int): LOG.info("Field 'port' is no int CP: {}".format(i)) else: - ports.append(i.get("port")) + ports.append(i.get("port")) # collect all ports + if i.get("ports"): # list with multiple ports + if not isinstance(i.get("ports"), list): + LOG.info("Field 'port' is no list CP: {}".format(i)) + else: + for p in i.get("ports"): + if not isinstance(p, int): + # do some parsing + try: + if "/udp" in p: + p = tuple(p.split("/")) + else: + p = int(p) + ports.append(p) # collect all ports + except BaseException as ex: + LOG.error( + "Could not parse ports list: {}".format(p)) + LOG.error(ex) + else: + ports.append(p) # collect all ports if i.get("publish"): if not isinstance(i.get("publish"), dict): LOG.info("Field 'publish' is no dict CP: {}".format(i)) @@ -383,9 +417,20 @@ class Service(object): " Overwriting SON_EMU_CMD_STOP.") cenv["SON_EMU_CMD_STOP"] = VNFD_CMD_STOP + # 5.2 inject per instance configurations based on envs + conf_envs = self._load_instance_conf_envs(vnf_container_instance_name) + cenv.update(conf_envs) + + # 5.3 handle optional ipc_mode setting + ipc_mode = u.get("ipc_mode", None) + # 5.4 handle optional devices setting + devices = u.get("devices", []) + # 5.5 handle optional cap_add setting + cap_add = u.get("cap_add", []) + # 6. Start the container LOG.info("Starting %r as %r in DC %r" % - (vnf_name, vnf_container_instance_name, vnfd.get("dc"))) + (vnf_name, vnf_container_instance_name, target_dc)) LOG.debug("Interfaces for %r: %r" % (vnf_id, intfs)) # start the container vnfi = target_dc.startCompute( @@ -402,6 +447,9 @@ class Service(object): port_bindings=port_bindings, # only publish if explicitly stated in descriptor publish_all_ports=False, + ipc_mode=ipc_mode, + devices=devices, + cap_add=cap_add, type=kwargs.get('type', 'docker')) # add vnfd reference to vnfi vnfi.vnfd = vnfd @@ -516,6 +564,26 @@ class Service(object): t.start() break # only execute one command + def _load_instance_conf_envs(self, cname): + """ + Try to load an instance-specific env file. If not found, + just return an empty dict. + """ + if PER_INSTANCE_ENV_CONFIGURATION_FOLDER is None: + return dict() + try: + path = os.path.expanduser(PER_INSTANCE_ENV_CONFIGURATION_FOLDER) + path = os.path.join(path, "{}.env.yml".format(cname)) + res = load_yaml(path) + LOG.info("Loaded instance-specific env file for '{}': {}" + .format(cname, res)) + return res + except BaseException as ex: + LOG.info("No instance-specific env file found for: {}" + .format(cname)) + del ex + return dict() + def _unpack_service_package(self): """ unzip *.son file and store contents in CATALOG_FOLDER/services// @@ -679,8 +747,8 @@ class Service(object): lan_hosts = list(lan_net.hosts()) # generate lan ip address for all interfaces (of all involved (V/CDUs)) - for intf in link["connection_points_reference"]: - vnf_id, intf_name = parse_interface(intf) + for intf_ref in link["connection_points_reference"]: + vnf_id, intf_name = parse_interface(intf_ref) if vnf_id is None: continue # skip references to NS connection points units = self._get_vnf_instance_units(instance_uuid, vnf_id) @@ -691,10 +759,20 @@ class Service(object): # Attention: we apply a simplification for multi DU VNFs here: # the connection points of all involved DUs have to have the same # name as the connection points of the surrounding VNF to be mapped. - # This is because we do not consider links specified in the VNFds + # This is because we do not consider links specified in the VNFDs container_name = uvnfi.name - ip_address = "{0}/{1}".format(str(lan_hosts.pop(0)), - lan_net.prefixlen) + + ip_address = None + # get the interface of the unit + intf = self._get_vnfd_cp_from_vnfi(uvnfi, intf_name) + # check if there is a manually assigned address + if intf is not None: + if intf.get("address"): + ip_address = intf.get("address") + if ip_address is None: + # automatically asign an IP from our pool + ip_address = "{0}/{1}".format(str(lan_hosts.pop(0)), + lan_net.prefixlen) LOG.debug( "Setting up E-LAN/E-Tree interface. (%s:%s) -> %s" % ( container_name, intf_name, ip_address)) @@ -806,21 +884,22 @@ class Service(object): """ return len(DockerClient().images.list(name=image_name)) > 0 - def _calculate_placement(self, algorithm): + def _place(self, vnfd, vnfid, vdu, ssiid): """ - Do placement by adding the a field "dc" to - each VNFD that points to one of our - data center objects known to the gatekeeper. + Do placement. Return the name of the DC to place + the given VDU. """ assert(len(self.vnfds) > 0) assert(len(GK.dcs) > 0) - # instantiate algorithm an place - p = algorithm() - p.place(self.nsd, self.vnfds, GK.dcs) - LOG.info("Using placement algorithm: %r" % p.__class__.__name__) - # lets print the placement result - for name, vnfd in self.vnfds.iteritems(): - LOG.info("Placed VNF %r on DC %r" % (name, str(vnfd.get("dc")))) + if PLACEMENT_ALGORITHM_OBJ is None: + LOG.error("No placement algorithm given. Using FirstDcPlacement!") + p = FirstDcPlacement() + else: + p = PLACEMENT_ALGORITHM_OBJ + cname = get_container_name(vnfid, vdu.get("id"), ssiid) + rdc = p.place(GK.dcs, vnfd, vnfid, vdu, ssiid, cname) + LOG.info("Placement: '{}' --> '{}'".format(cname, rdc)) + return rdc def _calculate_cpu_cfs_values(self, cpu_time_percentage): """ @@ -860,9 +939,8 @@ class FirstDcPlacement(object): Placement: Always use one and the same data center from the GK.dcs dict. """ - def place(self, nsd, vnfds, dcs): - for id, vnfd in vnfds.iteritems(): - vnfd["dc"] = list(dcs.itervalues())[0] + def place(self, dcs, vnfd, vnfid, vdu, ssiid, cname): + return list(dcs.itervalues())[0] class RoundRobinDcPlacement(object): @@ -870,12 +948,50 @@ class RoundRobinDcPlacement(object): Placement: Distribute VNFs across all available DCs in a round robin fashion. """ - def place(self, nsd, vnfds, dcs): - c = 0 + def __init__(self): + self.count = 0 + + def place(self, dcs, vnfd, vnfid, vdu, ssiid, cname): dcs_list = list(dcs.itervalues()) - for id, vnfd in vnfds.iteritems(): - vnfd["dc"] = dcs_list[c % len(dcs_list)] - c += 1 # inc. c to use next DC + rdc = dcs_list[self.count % len(dcs_list)] + self.count += 1 # inc. count to use next DC + return rdc + + +class StaticConfigPlacement(object): + """ + Placement: Fixed assignment based on config file. + """ + + def __init__(self, path=None): + if path is None: + path = "static_placement.yml" + path = os.path.expanduser(path) + self.static_placement = dict() + try: + self.static_placement = load_yaml(path) + except BaseException as ex: + LOG.error(ex) + LOG.error("Couldn't load placement from {}" + .format(path)) + LOG.info("Loaded static placement: {}" + .format(self.static_placement)) + + def place(self, dcs, vnfd, vnfid, vdu, ssiid, cname): + # check for container name entry + if cname not in self.static_placement: + LOG.error("Coudn't find {} in placement".format(cname)) + LOG.error("Using first DC as fallback!") + return list(dcs.itervalues())[0] + # lookup + candidate_dc = self.static_placement.get(cname) + # check if DC exsits + if candidate_dc not in dcs: + LOG.error("Coudn't find DC {}".format(candidate_dc)) + LOG.error("Using first DC as fallback!") + return list(dcs.itervalues())[0] + # return correct DC + return dcs.get(candidate_dc) """ @@ -952,11 +1068,44 @@ class Packages(fr.Resource): def get(self): """ - Return a list of UUID's of uploaded service packages. - :return: dict/list + Return a list of package descriptor headers. + Fakes the behavior of 5GTANGO's GK API to be + compatible with tng-cli. + :return: list """ LOG.info("GET /packages") - return {"service_uuid_list": list(GK.services.iterkeys())} + result = list() + for suuid, sobj in GK.services.iteritems(): + pkg = dict() + pkg["pd"] = dict() + pkg["uuid"] = suuid + pkg["pd"]["name"] = sobj.manifest.get("name") + pkg["pd"]["version"] = sobj.manifest.get("version") + pkg["created_at"] = sobj.created_at + result.append(pkg) + return result, 200, CORS_HEADER + + +class Services(fr.Resource): + + def get(self): + """ + Return a list of services. + Fakes the behavior of 5GTANGO's GK API to be + compatible with tng-cli. + :return: list + """ + LOG.info("GET /services") + result = list() + for suuid, sobj in GK.services.iteritems(): + service = dict() + service["nsd"] = dict() + service["uuid"] = suuid + service["nsd"]["name"] = sobj.nsd.get("name") + service["nsd"]["version"] = sobj.nsd.get("version") + service["created_at"] = sobj.created_at + result.append(service) + return result, 200, CORS_HEADER class Instantiations(fr.Resource): @@ -972,7 +1121,9 @@ class Instantiations(fr.Resource): json_data = request.get_json(force=True) service_uuid = json_data.get("service_uuid") service_name = json_data.get("service_name") - + if service_name is None: + # lets be fuzzy + service_name = service_uuid # first try to find by service_name if service_name is not None: for s_uuid, s in GK.services.iteritems(): @@ -990,7 +1141,10 @@ class Instantiations(fr.Resource): # ok, we have a service uuid, lets start the service service_instance_uuid = GK.services.get( service_uuid).start_service() - return {"service_instance_uuid": service_instance_uuid}, 201 + # multiple ID fields to be compatible with tng-bench and tng-cli + return ({"service_instance_uuid": service_instance_uuid, + "id": service_instance_uuid}, 201) + LOG.error("Service not found: {}/{}".format(service_uuid, service_name)) return "Service not found", 404 def get(self): @@ -998,9 +1152,20 @@ class Instantiations(fr.Resource): Returns a list of UUIDs containing all running services. :return: dict / list """ - LOG.info("GET /instantiations") - return {"service_instantiations_list": [ - list(s.instances.iterkeys()) for s in GK.services.itervalues()]} + LOG.debug("GET /instantiations or /api/v3/records/services") + # return {"service_instantiations_list": [ + # list(s.instances.iterkeys()) for s in GK.services.itervalues()]} + result = list() + for suuid, sobj in GK.services.iteritems(): + for iuuid, iobj in sobj.instances.iteritems(): + inst = dict() + inst["uuid"] = iobj.get("uuid") + inst["instance_name"] = "{}-inst.{}".format( + iobj.get("name"), iobj.get("ssiid")) + inst["status"] = "running" + inst["created_at"] = iobj.get("created_at") + result.append(inst) + return result, 200, CORS_HEADER def delete(self): """ @@ -1078,9 +1243,11 @@ app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024 # 512 MB max upload api = fr.Api(app) # define endpoints -api.add_resource(Packages, '/packages', '/api/v2/packages') +api.add_resource(Packages, '/packages', '/api/v2/packages', '/api/v3/packages') +api.add_resource(Services, '/services', '/api/v2/services', '/api/v3/services') api.add_resource(Instantiations, '/instantiations', - '/api/v2/instantiations', '/api/v2/requests') + '/api/v2/instantiations', '/api/v2/requests', '/api/v3/requests', + '/api/v3/records/services') api.add_resource(Exit, '/emulator/exit')