5GTANGO LLCM: Added support to deploy multiple instances
[osm/vim-emu.git] / src / emuvim / api / tango / llcm.py
1 # Copyright (c) 2018 SONATA-NFV, 5GTANGO and Paderborn University
2 # ALL RIGHTS RESERVED.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Neither the name of the SONATA-NFV, 5GTANGO, Paderborn University
17 # nor the names of its contributors may be used to endorse or promote
18 # products derived from this software without specific prior written
19 # permission.
20 #
21 # This work has been performed in the framework of the SONATA project,
22 # funded by the European Commission under Grant number 671517 through
23 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
24 # acknowledge the contributions of their colleagues of the SONATA
25 # partner consortium (www.sonata-nfv.eu).
26 #
27 # This work has also been performed in the framework of the 5GTANGO project,
28 # funded by the European Commission under Grant number 761493 through
29 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
30 # acknowledge the contributions of their colleagues of the 5GTANGO
31 # partner consortium (www.5gtango.eu).
32 import logging
33 import os
34 import uuid
35 import hashlib
36 import zipfile
37 import yaml
38 import threading
39 from docker import DockerClient
40 from flask import Flask, request
41 import flask_restful as fr
42 from gevent.pywsgi import WSGIServer
43 from subprocess import Popen
44 import ipaddress
45 import copy
46 import time
47
48
49 LOG = logging.getLogger("5gtango.llcm")
50 LOG.setLevel(logging.INFO)
51
52
53 GK_STORAGE = "/tmp/vim-emu-tango-llcm/"
54 UPLOAD_FOLDER = os.path.join(GK_STORAGE, "uploads/")
55 CATALOG_FOLDER = os.path.join(GK_STORAGE, "catalog/")
56
57 # Enable Dockerfile build functionality
58 BUILD_DOCKERFILE = False
59
60 # flag to indicate that we run without the emulator (only the bare API for
61 # integration testing)
62 GK_STANDALONE_MODE = False
63
64 # should a new version of an image be pulled even if it is already available
65 FORCE_PULL = False
66
67 # flag to indicate if we use bidirectional forwarding rules in the
68 # automatic chaining process
69 BIDIRECTIONAL_CHAIN = True
70
71 # override the management interfaces in the descriptors with default
72 # docker0 interfaces in the containers
73 USE_DOCKER_MGMT = False
74
75 # automatically deploy uploaded packages (no need to execute son-access
76 # deploy --latest separately)
77 AUTO_DEPLOY = False
78
79 # and also automatically terminate any other running services
80 AUTO_DELETE = False
81
82 # global subnet definitions (see reset_subnets())
83 ELAN_SUBNETS = None
84 ELINE_SUBNETS = None
85
86 # Time in seconds to wait for vnf stop scripts to execute fully
87 VNF_STOP_WAIT_TIME = 5
88
89 # If services are instantiated multiple times, the public port
90 # mappings need to be adapted to avoid collisions. We use the
91 # following offset for this: NEW_PORT = (SSIID * OFFSET) + ORIGINAL_PORT
92 MULTI_INSTANCE_PORT_OFFSET = 1000
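# Illustrative example (assumed values): with SSIID=2 and an original
# public port of 8080, the remapped port becomes
# (2 * MULTI_INSTANCE_PORT_OFFSET) + 8080 = 10080.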
93
94
95 class OnBoardingException(BaseException):
96 pass
97
98
99 class Gatekeeper(object):
100
101 def __init__(self):
102 self.services = dict()
103 self.dcs = dict()
104 self.net = None
105 # used to generate short names for VNFs (Mininet limitation)
106 self.vnf_counter = 0
107 reset_subnets()
108 LOG.info("Initialized 5GTANGO LLCM module.")
109
110 def register_service_package(self, service_uuid, service):
111 """
112 register new service package
113 :param service_uuid: UUID of the service package
114 :param service: the Service object to register
115 """
116 self.services[service_uuid] = service
117 # let's perform all steps needed to onboard the service
118 service.onboard()
119
120
121 class Service(object):
122 """
123 This class represents an NS uploaded as a *.son package to the
124 dummy gatekeeper.
125 It can have multiple running instances.
126 """
127
128 def __init__(self,
129 service_uuid,
130 package_file_hash,
131 package_file_path):
132 self.uuid = service_uuid
133 self.package_file_hash = package_file_hash
134 self.package_file_path = package_file_path
135 self.package_content_path = os.path.join(
136 CATALOG_FOLDER, "services/%s" % self.uuid)
137 self.manifest = None
138 self.nsd = None
139 self.vnfds = dict()
140 self.local_docker_files = dict()
141 self.remote_docker_image_urls = dict()
142 self.instances = dict()
143 self._instance_counter = 0
144
145 def onboard(self):
146 """
147 Do all steps to prepare this service to be instantiated
148 :return:
149 """
150 # 1. extract the contents of the package and store them in our catalog
151 self._unpack_service_package()
152 # 2. read in all descriptor files
153 self._load_package_descriptor()
154 self._load_nsd()
155 self._load_vnfd()
156 if self.nsd is None:
157 raise OnBoardingException("No NSD found.")
158 if len(self.vnfds) < 1:
159 raise OnBoardingException("No VNFDs found.")
160 # 3. prepare container images (e.g. download or build Dockerfile)
161 if BUILD_DOCKERFILE:
162 self._load_docker_files()
163 self._build_images_from_dockerfiles()
164 else:
165 self._load_docker_urls()
166 self._pull_predefined_dockerimages()
167 # 4. reserve subnets
168 eline_fwd_links, elan_fwd_links = self._get_elines_and_elans()
169 self.eline_subnets = [ELINE_SUBNETS.pop(0) for _ in eline_fwd_links]
170 self.elan_subnets = [ELAN_SUBNETS.pop(0) for _ in elan_fwd_links]
171 LOG.debug("Reserved subnets for service '{}': E-Line: {} / E-LAN: {}"
172 .format(self.manifest.get("name"),
173 self.eline_subnets, self.elan_subnets))
174 LOG.info("On-boarded service: {}".format(self.manifest.get("name")))
175
176 def start_service(self):
177 """
178 This method creates and starts a new service instance.
179 It computes placements, iterates over all VNFDs, and starts
180 each VNFD as a Docker container in the data center selected
181 by the placement algorithm.
182 :return:
183 """
184 LOG.info("Starting service {} ({})"
185 .format(get_triple_id(self.nsd), self.uuid))
186
187 # 1. each service instance gets a new uuid to identify it
188 instance_uuid = str(uuid.uuid4())
189 # build an instances dict (a bit like an NSR :))
190 self.instances[instance_uuid] = dict()
191 self.instances[instance_uuid]["uuid"] = self.uuid
192 # SSIID = short service instance ID (to postfix Container names)
193 self.instances[instance_uuid]["ssiid"] = self._instance_counter
194 self.instances[instance_uuid]["name"] = get_triple_id(self.nsd)
195 self.instances[instance_uuid]["vnf_instances"] = list()
196 # increase for next instance
197 self._instance_counter += 1
198
199 # 2. compute placement of this service instance (adds DC names to
200 # VNFDs)
201 # self._calculate_placement(FirstDcPlacement)
202 self._calculate_placement(RoundRobinDcPlacement)
203 # 3. start all vnfds that we have in the service
204 for vnf_id in self.vnfds:
205 vnfd = self.vnfds[vnf_id]
206 # attention: returns a list of started deployment units
207 vnfis = self._start_vnfd(
208 vnfd, vnf_id, self.instances[instance_uuid]["ssiid"])
209 # add list of VNFIs to total VNFI list
210 self.instances[instance_uuid]["vnf_instances"].extend(vnfis)
211
212 # 4. Deploy E-Line, E-Tree and E-LAN links
213 # Attention: Only done if a "forwarding_graphs" section exists in the NSD,
214 # even if "forwarding_graphs" are not used directly.
215 # Attention 2: Making a copy of *_subnets with list() is important here!
216 eline_fwd_links, elan_fwd_links = self._get_elines_and_elans()
217 # 5a. deploy E-Line links
218 GK.net.deployed_elines.extend(eline_fwd_links) # bookkeeping
219 self._connect_elines(eline_fwd_links, instance_uuid, list(self.eline_subnets))
220 # 5b. deploy E-Tree/E-LAN links
221 GK.net.deployed_elans.extend(elan_fwd_links) # bookkeeping
222 self._connect_elans(elan_fwd_links, instance_uuid, list(self.elan_subnets))
223
224 # 6. run the emulator specific entrypoint scripts in the VNFIs of this
225 # service instance
226 self._trigger_emulator_start_scripts_in_vnfis(
227 self.instances[instance_uuid]["vnf_instances"])
228 # done
229 LOG.info("Service '{}' started. Instance id: {} SSIID: {}"
230 .format(self.instances[instance_uuid]["name"],
231 instance_uuid,
232 self.instances[instance_uuid]["ssiid"]))
233 return instance_uuid
234
235 def stop_service(self, instance_uuid):
236 """
237 This method stops a running service instance.
238 It iterates over all VNF instances, stopping each of them
239 and removing them from their data center.
240 :param instance_uuid: the uuid of the service instance to be stopped
241 """
242 LOG.info("Stopping service %r" % self.uuid)
243 # get relevant information
244 # instance_uuid = str(self.uuid.uuid4())
245 vnf_instances = self.instances[instance_uuid]["vnf_instances"]
246 # trigger stop scripts in vnf instances and wait a few seconds for
247 # completion
248 self._trigger_emulator_stop_scripts_in_vnfis(vnf_instances)
249 time.sleep(VNF_STOP_WAIT_TIME)
250 # stop all vnfs
251 for v in vnf_instances:
252 self._stop_vnfi(v)
253 # last step: remove the instance from the list of all instances
254 del self.instances[instance_uuid]
255
256 def _get_elines_and_elans(self):
257 """
258 Get the E-Line, E-LAN, E-Tree links from the NSD.
259 """
260 # Attention: Only done if a "forwarding_graphs" section exists in the NSD,
261 # even if "forwarding_graphs" are not used directly.
262 eline_fwd_links = list()
263 elan_fwd_links = list()
264 if "virtual_links" in self.nsd and "forwarding_graphs" in self.nsd:
265 vlinks = self.nsd["virtual_links"]
266 # constituent virtual links are not checked
267 eline_fwd_links = [l for l in vlinks if (
268 l["connectivity_type"] == "E-Line")]
269 elan_fwd_links = [l for l in vlinks if (
270 l["connectivity_type"] == "E-LAN" or
271 l["connectivity_type"] == "E-Tree")] # Treat E-Tree as E-LAN
272 return eline_fwd_links, elan_fwd_links
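# Illustrative (hypothetical) NSD "virtual_links" entries and how the
# method above would classify them:
#   {"id": "mgmt", "connectivity_type": "E-LAN"}         -> elan_fwd_links
#   {"id": "vnf0-2-vnf1", "connectivity_type": "E-Line"} -> eline_fwd_links
#   {"id": "tree-link", "connectivity_type": "E-Tree"}   -> elan_fwd_links (treated as E-LAN)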
273
274 def _get_resource_limits(self, deployment_unit):
275 """
276 Extract resource limits from deployment units.
277 """
278 # defaults
279 cpu_list = None
280 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(1.0))
281 mem_limit = None
282 # update from descriptor
283 if "resource_requirements" in deployment_unit:
284 res_req = deployment_unit.get("resource_requirements")
285 cpu_list = res_req.get("cpu").get("cpuset")
286 if cpu_list is None:
287 cpu_list = res_req.get("cpu").get("vcpus")
288 if cpu_list is not None:
289 # attention: docker expects list as string w/o spaces:
290 cpu_list = str(cpu_list).replace(" ", "").strip()
291 cpu_bw = res_req.get("cpu").get("cpu_bw")
292 if cpu_bw is None:
293 cpu_bw = 1.0
294 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(cpu_bw))
295 mem_limit = res_req.get("memory").get("size")
296 mem_unit = str(res_req.get("memory").get("size_unit", "GB"))
297 if mem_limit is not None:
298 mem_limit = int(mem_limit)
299 # to bytes
300 if "G" in mem_unit:
301 mem_limit = mem_limit * 1024 * 1024 * 1024
302 elif "M" in mem_unit:
303 mem_limit = mem_limit * 1024 * 1024
304 elif "K" in mem_unit:
305 mem_limit = mem_limit * 1024
306 return cpu_list, cpu_period, cpu_quota, mem_limit
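# Illustrative example (hypothetical descriptor values): a deployment unit
# with resource_requirements {"cpu": {"cpuset": "0-1", "cpu_bw": 0.5},
# "memory": {"size": 2, "size_unit": "GB"}} would yield cpu_list="0-1",
# cpu_period=1000000, cpu_quota=500000 and mem_limit=2147483648 (bytes).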
307
308 def _start_vnfd(self, vnfd, vnf_id, ssiid, **kwargs):
309 """
310 Start a single VNFD of this service
311 :param vnfd: vnfd descriptor dict
312 :param vnf_id: unique id of this vnf in the nsd
313 :return:
314 """
315 vnfis = list()
316 # the vnf_name refers to the container image to be deployed
317 vnf_name = vnfd.get("name")
318 # combine VDUs and CDUs
319 deployment_units = (vnfd.get("virtual_deployment_units", []) +
320 vnfd.get("cloudnative_deployment_units", []))
321 # iterate over all deployment units within the VNFD
322 for u in deployment_units:
323 # 0. vnf_container_name = vnf_id.vdu_id
324 vnf_container_name = get_container_name(vnf_id, u.get("id"))
325 vnf_container_instance_name = get_container_name(vnf_id, u.get("id"), ssiid)
326 # 1. get the name of the docker image to start
327 if vnf_container_name not in self.remote_docker_image_urls:
328 raise Exception("No image name for %r found. Abort." % vnf_container_name)
329 docker_image_name = self.remote_docker_image_urls.get(vnf_container_name)
330 # 2. select datacenter to start the VNF in
331 target_dc = vnfd.get("dc")
332 # 3. perform some checks to ensure we can start the container
333 assert(docker_image_name is not None)
334 assert(target_dc is not None)
335 if not self._check_docker_image_exists(docker_image_name):
336 raise Exception("Docker image {} not found. Abort."
337 .format(docker_image_name))
338
339 # 4. get the resource limits
340 cpu_list, cpu_period, cpu_quota, mem_limit = self._get_resource_limits(u)
341
342 # get connection points defined for the DU
343 intfs = u.get("connection_points", [])
344 # do some re-naming of fields to be compatible with Containernet
345 for i in intfs:
346 if i.get("address"):
347 i["ip"] = i.get("address")
348
349 # get ports and port_bindings from the 'port' and 'publish' fields of the CNFD
350 # see: https://github.com/containernet/containernet/wiki/Exposing-and-mapping-network-ports
351 ports = list() # Containernet naming
352 port_bindings = dict()
353 for i in intfs:
354 if i.get("port"):
355 if not isinstance(i.get("port"), int):
356 LOG.info("Field 'port' is not an int in CP: {}".format(i))
357 else:
358 ports.append(i.get("port"))
359 if i.get("publish"):
360 if not isinstance(i.get("publish"), dict):
361 LOG.info("Field 'publish' is not a dict in CP: {}".format(i))
362 else:
363 port_bindings.update(i.get("publish"))
364 # update port mapping for cases where the service is deployed more than once
365 port_bindings = update_port_mapping_multi_instance(ssiid, port_bindings)
366 if len(ports) > 0:
367 LOG.info("{} exposes ports: {}".format(vnf_container_instance_name, ports))
368 if len(port_bindings) > 0:
369 LOG.info("{} publishes ports: {}".format(vnf_container_instance_name, port_bindings))
370
371 # 5. collect additional information to start container
372 volumes = list()
373 cenv = dict()
374 # 5.1 inject descriptor based start/stop commands into env (overwrite)
375 VNFD_CMD_START = u.get("vm_cmd_start")
376 VNFD_CMD_STOP = u.get("vm_cmd_stop")
377 if VNFD_CMD_START and not VNFD_CMD_START == "None":
378 LOG.info("Found 'vm_cmd_start'='{}' in VNFD.".format(VNFD_CMD_START) +
379 " Overwriting SON_EMU_CMD.")
380 cenv["SON_EMU_CMD"] = VNFD_CMD_START
381 if VNFD_CMD_STOP and not VNFD_CMD_STOP == "None":
382 LOG.info("Found 'vm_cmd_start'='{}' in VNFD.".format(VNFD_CMD_STOP) +
383 " Overwriting SON_EMU_CMD_STOP.")
384 cenv["SON_EMU_CMD_STOP"] = VNFD_CMD_STOP
385
386 # 6. Start the container
387 LOG.info("Starting %r as %r in DC %r" %
388 (vnf_name, vnf_container_instance_name, vnfd.get("dc")))
389 LOG.debug("Interfaces for %r: %r" % (vnf_id, intfs))
390 # start the container
391 vnfi = target_dc.startCompute(
392 vnf_container_instance_name,
393 network=intfs,
394 image=docker_image_name,
395 cpu_quota=cpu_quota,
396 cpu_period=cpu_period,
397 cpuset_cpus=cpu_list,
398 mem_limit=mem_limit,
399 volumes=volumes,
400 properties=cenv, # environment
401 ports=ports,
402 port_bindings=port_bindings,
403 # only publish if explicitly stated in descriptor
404 publish_all_ports=False,
405 type=kwargs.get('type', 'docker'))
406 # add vnfd reference to vnfi
407 vnfi.vnfd = vnfd
408 # add container name
409 vnfi.vnf_container_name = vnf_container_name
410 vnfi.vnf_container_instance_name = vnf_container_instance_name
411 vnfi.ssiid = ssiid
412 # store vnfi
413 vnfis.append(vnfi)
414 return vnfis
415
416 def _stop_vnfi(self, vnfi):
417 """
418 Stop a VNF instance.
419 :param vnfi: vnf instance to be stopped
420 """
421 # Find the correct datacenter
422 status = vnfi.getStatus()
423 dc = vnfi.datacenter
424 # stop the vnfi
425 LOG.info("Stopping the vnf instance contained in %r in DC %r" %
426 (status["name"], dc))
427 dc.stopCompute(status["name"])
428
429 def _get_vnf_instance(self, instance_uuid, vnf_id):
430 """
431 Returns VNFI object for a given "vnf_id" or "vnf_container_name" taken from an NSD.
432 :return: single object
433 """
434 for vnfi in self.instances[instance_uuid]["vnf_instances"]:
435 if str(vnfi.name) == str(vnf_id):
436 return vnfi
437 LOG.warning("No container with name: {0} found.".format(vnf_id))
438 return None
439
440 def _get_vnf_instance_units(self, instance_uuid, vnf_id):
441 """
442 Returns a list of VNFI objects (all deployment units) for a given
443 "vnf_id" taken from an NSD.
444 :return: list
445 """
446 if vnf_id is None:
447 return None
448 r = list()
449 for vnfi in self.instances[instance_uuid]["vnf_instances"]:
450 if vnf_id in vnfi.name:
451 r.append(vnfi)
452 if len(r) > 0:
453 LOG.debug("Found units: {} for vnf_id: {}"
454 .format([i.name for i in r], vnf_id))
455 return r
456 LOG.warning("No container(s) with name: {0} found.".format(vnf_id))
457 return None
458
459 @staticmethod
460 def _vnf_reconfigure_network(vnfi, if_name, net_str=None, new_name=None):
461 """
462 Reconfigure the network configuration of a specific interface
463 of a running container.
464 :param vnfi: container instance
465 :param if_name: interface name
466 :param net_str: network configuration string, e.g., 1.2.3.4/24
467 :return:
468 """
469 # assign new ip address
470 if net_str is not None:
471 intf = vnfi.intf(intf=if_name)
472 if intf is not None:
473 intf.setIP(net_str)
474 LOG.debug("Reconfigured network of %s:%s to %r" %
475 (vnfi.name, if_name, net_str))
476 else:
477 LOG.warning("Interface not found: %s:%s. Network reconfiguration skipped." % (
478 vnfi.name, if_name))
479
480 if new_name is not None:
481 vnfi.cmd('ip link set', if_name, 'down')
482 vnfi.cmd('ip link set', if_name, 'name', new_name)
483 vnfi.cmd('ip link set', new_name, 'up')
484 LOG.debug("Reconfigured interface name of %s:%s to %s" %
485 (vnfi.name, if_name, new_name))
486
487 def _trigger_emulator_start_scripts_in_vnfis(self, vnfi_list):
488 for vnfi in vnfi_list:
489 config = vnfi.dcinfo.get("Config", dict())
490 env = config.get("Env", list())
491 for env_var in env:
492 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
493 if var == "SON_EMU_CMD" or var == "VIM_EMU_CMD":
494 LOG.info("Executing script in '{}': {}={}"
495 .format(vnfi.name, var, cmd))
496 # execute command in new thread to ensure that GK is not
497 # blocked by VNF
498 t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
499 t.daemon = True
500 t.start()
501 break # only execute one command
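# For example (hypothetical VNFD value): a deployment unit with
# vm_cmd_start: "./start.sh" leads to SON_EMU_CMD=./start.sh in the
# container environment (see _start_vnfd above); "./start.sh" is then
# executed inside the container in a separate thread here.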
502
503 def _trigger_emulator_stop_scripts_in_vnfis(self, vnfi_list):
504 for vnfi in vnfi_list:
505 config = vnfi.dcinfo.get("Config", dict())
506 env = config.get("Env", list())
507 for env_var in env:
508 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
509 if var == "SON_EMU_CMD_STOP" or var == "VIM_EMU_CMD_STOP":
510 LOG.info("Executing script in '{}': {}={}"
511 .format(vnfi.name, var, cmd))
512 # execute command in new thread to ensure that GK is not
513 # blocked by VNF
514 t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
515 t.daemon = True
516 t.start()
517 break # only execute one command
518
519 def _unpack_service_package(self):
520 """
521 unzip *.son file and store contents in CATALOG_FOLDER/services/<service_uuid>/
522 """
523 LOG.info("Unzipping: %r" % self.package_file_path)
524 with zipfile.ZipFile(self.package_file_path, "r") as z:
525 z.extractall(self.package_content_path)
526
527 def _load_package_descriptor(self):
528 """
529 Load the main package descriptor YAML and keep it as dict.
530 :return:
531 """
532 self.manifest = load_yaml(
533 os.path.join(
534 self.package_content_path, "TOSCA-Metadata/NAPD.yaml"))
535
536 def _load_nsd(self):
537 """
538 Load the entry NSD YAML and keep it as dict.
539 :return:
540 """
541 if "package_content" in self.manifest:
542 nsd_path = None
543 for f in self.manifest.get("package_content"):
544 if f.get("content-type") == "application/vnd.5gtango.nsd":
545 nsd_path = os.path.join(
546 self.package_content_path,
547 make_relative_path(f.get("source")))
548 break # always use the first NSD for now
549 if nsd_path is None:
550 raise OnBoardingException("No NSD with type 'application/vnd.5gtango.nsd' found.")
551 self.nsd = load_yaml(nsd_path)
552 GK.net.deployed_nsds.append(self.nsd) # TODO this seems strange (remove?)
553 LOG.debug("Loaded NSD: %r" % self.nsd.get("name"))
554 else:
555 raise OnBoardingException(
556 "No 'package_content' section in package manifest:\n{}"
557 .format(self.manifest))
558
559 def _load_vnfd(self):
560 """
561 Load all VNFD YAML files referenced in the package manifest and keep them in a dict.
562 :return:
563 """
564 # first make a list of all the vnfds in the package
565 vnfd_set = dict()
566 if "package_content" in self.manifest:
567 for pc in self.manifest.get("package_content"):
568 if pc.get(
569 "content-type") == "application/vnd.5gtango.vnfd":
570 vnfd_path = os.path.join(
571 self.package_content_path,
572 make_relative_path(pc.get("source")))
573 vnfd = load_yaml(vnfd_path)
574 vnfd_set[vnfd.get("name")] = vnfd
575 if len(vnfd_set) < 1:
576 raise OnBoardingException("No VNFDs found.")
577 # then link each vnf_id in the nsd to its vnfd
578 for v in self.nsd.get("network_functions"):
579 if v.get("vnf_name") in vnfd_set:
580 self.vnfds[v.get("vnf_id")] = vnfd_set[v.get("vnf_name")]
581 LOG.debug("Loaded VNFD: {0} id: {1}"
582 .format(v.get("vnf_name"), v.get("vnf_id")))
583
584 def _connect_elines(self, eline_fwd_links, instance_uuid, subnets):
585 """
586 Connect all E-LINE links in the NSD
587 Attention: This method DOES NOT support multi V/CDU VNFs!
588 :param eline_fwd_links: list of E-LINE links in the NSD
589 :param: instance_uuid of the service
590 :param: subnets list of subnets to be used
591 :return:
592 """
593 # the cookie is used as an identifier for the flow rules installed by the dummy gatekeeper,
594 # e.g., different services get a unique cookie for their flow rules
595 cookie = 1
596 for link in eline_fwd_links:
597 LOG.info("Found E-Line: {}".format(link))
598 src_id, src_if_name = parse_interface(
599 link["connection_points_reference"][0])
600 dst_id, dst_if_name = parse_interface(
601 link["connection_points_reference"][1])
602 LOG.info("Searching C/VDU for E-Line: src={}, src_if={}, dst={}, dst_if={}"
603 .format(src_id, src_if_name, dst_id, dst_if_name))
604 # handle C/VDUs (ugly hack, only one V/CDU per VNF for now)
605 src_units = self._get_vnf_instance_units(instance_uuid, src_id)
606 dst_units = self._get_vnf_instance_units(instance_uuid, dst_id)
607 if src_units is None or dst_units is None:
608 LOG.info("No VNF-VNF link. Skipping: src={}, src_if={}, dst={}, dst_if={}"
609 .format(src_id, src_if_name, dst_id, dst_if_name))
610 return
611 # we only support VNFs with one V/CDU right now
612 if len(src_units) != 1 or len(dst_units) != 1:
613 raise BaseException("LLCM does not support E-LINES for multi V/CDU VNFs.")
614 # get the full name from that C/VDU and use it as src_id and dst_id
615 src_id = src_units[0].name
616 dst_id = dst_units[0].name
617 # from here we have all info we need
618 LOG.info("Creating E-Line for C/VDU: src={}, src_if={}, dst={}, dst_if={}"
619 .format(src_id, src_if_name, dst_id, dst_if_name))
620 # get involved vnfis
621 src_vnfi = src_units[0]
622 dst_vnfi = dst_units[0]
623 # proceed with chaining setup
624 setChaining = False
625 if src_vnfi is not None and dst_vnfi is not None:
626 setChaining = True
627 # re-configure the VNFs IP assignment and ensure that a new
628 # subnet is used for each E-Link
629 eline_net = subnets.pop(0)
630 ip1 = "{0}/{1}".format(str(eline_net[1]),
631 eline_net.prefixlen)
632 ip2 = "{0}/{1}".format(str(eline_net[2]),
633 eline_net.prefixlen)
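# Illustrative example: if the reserved eline_net is 20.0.0.0/24,
# ip1 becomes "20.0.0.1/24" and ip2 becomes "20.0.0.2/24".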
634 # check if VNFs have fixed IPs (ip/address field in VNFDs)
635 if (self._get_vnfd_cp_from_vnfi(
636 src_vnfi, src_if_name).get("ip") is None and
637 self._get_vnfd_cp_from_vnfi(
638 src_vnfi, src_if_name).get("address") is None):
639 self._vnf_reconfigure_network(src_vnfi, src_if_name, ip1)
640 # check if VNFs have fixed IPs (ip field in VNFDs)
641 if (self._get_vnfd_cp_from_vnfi(
642 dst_vnfi, dst_if_name).get("ip") is None and
643 self._get_vnfd_cp_from_vnfi(
644 dst_vnfi, dst_if_name).get("address") is None):
645 self._vnf_reconfigure_network(dst_vnfi, dst_if_name, ip2)
646 # set the chaining
647 if setChaining:
648 GK.net.setChain(
649 src_id, dst_id,
650 vnf_src_interface=src_if_name, vnf_dst_interface=dst_if_name,
651 bidirectional=BIDIRECTIONAL_CHAIN, cmd="add-flow", cookie=cookie, priority=10)
652
653 def _get_vnfd_cp_from_vnfi(self, vnfi, ifname):
654 """
655 Gets the connection point data structure from the VNFD
656 of the given VNFI using ifname.
657 """
658 if vnfi.vnfd is None:
659 return {}
660 cps = vnfi.vnfd.get("connection_points")
661 for cp in cps:
662 if cp.get("id") == ifname:
663 return cp
664
665 def _connect_elans(self, elan_fwd_links, instance_uuid, subnets):
666 """
667 Connect all E-LAN/E-Tree links in the NSD
668 This method supports multi-V/CDU VNFs if the connection
669 point names of the DUs are the same as the ones in the NSD.
670 :param elan_fwd_links: list of E-LAN links in the NSD
671 :param: instance_uuid of the service
672 :param: subnets list of subnets to be used
673 :return:
674 """
675 for link in elan_fwd_links:
676 # a new E-LAN/E-Tree
677 elan_vnf_list = []
678 lan_net = subnets.pop(0)
679 lan_hosts = list(lan_net.hosts())
680
681 # generate LAN IP addresses for all interfaces (of all involved V/CDUs)
682 for intf in link["connection_points_reference"]:
683 vnf_id, intf_name = parse_interface(intf)
684 if vnf_id is None:
685 continue # skip references to NS connection points
686 units = self._get_vnf_instance_units(instance_uuid, vnf_id)
687 if units is None:
688 continue # skip if no deployment unit is present
689 # iterate over all involved deployment units
690 for uvnfi in units:
691 # Attention: we apply a simplification for multi DU VNFs here:
692 # the connection points of all involved DUs have to have the same
693 # name as the connection points of the surrounding VNF to be mapped.
694 # This is because we do not consider links specified in the VNFDs.
695 container_name = uvnfi.name
696 ip_address = "{0}/{1}".format(str(lan_hosts.pop(0)),
697 lan_net.prefixlen)
698 LOG.debug(
699 "Setting up E-LAN/E-Tree interface. (%s:%s) -> %s" % (
700 container_name, intf_name, ip_address))
701 # re-configure the VNFs IP assignment and ensure that a new subnet is used for each E-LAN
702 # E-LAN relies on the learning switch capability of Ryu which has to be turned on in the topology
703 # (DCNetwork(controller=RemoteController, enable_learning=True)), so no explicit chaining is
704 # necessary.
705 vnfi = self._get_vnf_instance(instance_uuid, container_name)
706 if vnfi is not None:
707 self._vnf_reconfigure_network(vnfi, intf_name, ip_address)
708 # add this vnf and interface to the E-LAN for tagging
709 elan_vnf_list.append(
710 {'name': container_name, 'interface': intf_name})
711 # install the VLAN tags for this E-LAN
712 GK.net.setLAN(elan_vnf_list)
713
714 def _load_docker_files(self):
715 """
716 Get all paths to Dockerfiles from VNFDs and store them in dict.
717 :return:
718 """
719 for vnf_id, v in self.vnfds.iteritems():
720 for vu in v.get("virtual_deployment_units", []):
721 vnf_container_name = get_container_name(vnf_id, vu.get("id"))
722 if vu.get("vm_image_format") == "docker":
723 vm_image = vu.get("vm_image")
724 docker_path = os.path.join(
725 self.package_content_path,
726 make_relative_path(vm_image))
727 self.local_docker_files[vnf_container_name] = docker_path
728 LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
729 for cu in v.get("cloudnative_deployment_units", []):
730 vnf_container_name = get_container_name(vnf_id, cu.get("id"))
731 image = cu.get("image")
732 docker_path = os.path.join(
733 self.package_content_path,
734 make_relative_path(image))
735 self.local_docker_files[vnf_container_name] = docker_path
736 LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
737
738 def _load_docker_urls(self):
739 """
740 Get all URLs to pre-built Docker images in some repository.
741 :return:
742 """
743 for vnf_id, v in self.vnfds.iteritems():
744 for vu in v.get("virtual_deployment_units", []):
745 vnf_container_name = get_container_name(vnf_id, vu.get("id"))
746 if vu.get("vm_image_format") == "docker":
747 url = vu.get("vm_image")
748 if url is not None:
749 url = url.replace("http://", "")
750 self.remote_docker_image_urls[vnf_container_name] = url
751 LOG.debug("Found Docker image URL (%r): %r" %
752 (vnf_container_name,
753 self.remote_docker_image_urls[vnf_container_name]))
754 for cu in v.get("cloudnative_deployment_units", []):
755 vnf_container_name = get_container_name(vnf_id, cu.get("id"))
756 url = cu.get("image")
757 if url is not None:
758 url = url.replace("http://", "")
759 self.remote_docker_image_urls[vnf_container_name] = url
760 LOG.debug("Found Docker image URL (%r): %r" %
761 (vnf_container_name,
762 self.remote_docker_image_urls[vnf_container_name]))
763
764 def _build_images_from_dockerfiles(self):
765 """
766 Build Docker images for each local Dockerfile found in the package: self.local_docker_files
767 """
768 if GK_STANDALONE_MODE:
769 return # do not build anything in standalone mode
770 dc = DockerClient()
771 LOG.info("Building %d Docker images (this may take several minutes) ..." % len(
772 self.local_docker_files))
773 for k, v in self.local_docker_files.iteritems():
774 for line in dc.build(path=v.replace(
775 "Dockerfile", ""), tag=k, rm=False, nocache=False):
776 LOG.debug("DOCKER BUILD: %s" % line)
777 LOG.info("Docker image created: %s" % k)
778
779 def _pull_predefined_dockerimages(self):
780 """
781 If the package contains URLs to pre-built Docker images, we download them with this method.
782 """
783 dc = DockerClient()
784 for url in self.remote_docker_image_urls.itervalues():
785 # only pull if not present (speedup for development)
786 if not FORCE_PULL:
787 if len(dc.images.list(name=url)) > 0:
788 LOG.debug("Image %r present. Skipping pull." % url)
789 continue
790 LOG.info("Pulling image: %r" % url)
791 # this seems to fail with latest docker api version 2.0.2
792 # dc.images.pull(url,
793 # insecure_registry=True)
794 # using docker cli instead
795 cmd = ["docker",
796 "pull",
797 url,
798 ]
799 Popen(cmd).wait()
800
801 def _check_docker_image_exists(self, image_name):
802 """
803 Query the docker service and check if the given image exists
804 :param image_name: name of the docker image
805 :return:
806 """
807 return len(DockerClient().images.list(name=image_name)) > 0
808
809 def _calculate_placement(self, algorithm):
810 """
811 Do placement by adding a field "dc" to
812 each VNFD that points to one of our
813 data center objects known to the gatekeeper.
814 """
815 assert(len(self.vnfds) > 0)
816 assert(len(GK.dcs) > 0)
817 # instantiate the algorithm and do the placement
818 p = algorithm()
819 p.place(self.nsd, self.vnfds, GK.dcs)
820 LOG.info("Using placement algorithm: %r" % p.__class__.__name__)
821 # let's print the placement result
822 for name, vnfd in self.vnfds.iteritems():
823 LOG.info("Placed VNF %r on DC %r" % (name, str(vnfd.get("dc"))))
824
825 def _calculate_cpu_cfs_values(self, cpu_time_percentage):
826 """
827 Calculate cpu period and quota for CFS
828 :param cpu_time_percentage: percentage of overall CPU to be used
829 :return: cpu_period, cpu_quota
830 """
831 if cpu_time_percentage is None:
832 return -1, -1
833 if cpu_time_percentage < 0:
834 return -1, -1
835 # (see: https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt)
836 # Attention: minimum cpu_quota is 1ms (1000 microseconds)
837 cpu_period = 1000000 # let's consider a fixed period of 1000000 microseconds for now
838 LOG.debug("cpu_period is %r, cpu_percentage is %r" %
839 (cpu_period, cpu_time_percentage))
840 # calculate the fraction of cpu time for this container
841 cpu_quota = cpu_period * cpu_time_percentage
842 # ATTENTION: >= 1000 to avoid an invalid argument system error ... no
843 # idea why
844 if cpu_quota < 1000:
845 LOG.debug("cpu_quota before correcting: %r" % cpu_quota)
846 cpu_quota = 1000
847 LOG.warning("Increased CPU quota to avoid system error.")
848 LOG.debug("Calculated: cpu_period=%f / cpu_quota=%f" %
849 (cpu_period, cpu_quota))
850 return int(cpu_period), int(cpu_quota)
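# Illustrative example: cpu_time_percentage=0.25 yields
# cpu_period=1000000 and cpu_quota=250000, i.e., the container may use
# at most 25% of one CPU per CFS period.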
851
852
853 """
854 Some (simple) placement algorithms
855 """
856
857
858 class FirstDcPlacement(object):
859 """
860 Placement: Always use one and the same data center from the GK.dcs dict.
861 """
862
863 def place(self, nsd, vnfds, dcs):
864 for id, vnfd in vnfds.iteritems():
865 vnfd["dc"] = list(dcs.itervalues())[0]
866
867
868 class RoundRobinDcPlacement(object):
869 """
870 Placement: Distribute VNFs across all available DCs in a round robin fashion.
871 """
872
873 def place(self, nsd, vnfds, dcs):
874 c = 0
875 dcs_list = list(dcs.itervalues())
876 for id, vnfd in vnfds.iteritems():
877 vnfd["dc"] = dcs_list[c % len(dcs_list)]
878 c += 1 # inc. c to use next DC
879
880
881 """
882 Resource definitions and API endpoints
883 """
884
885
886 class Packages(fr.Resource):
887
888 def post(self):
889 """
890 Upload a *.son service package to the dummy gatekeeper.
891
892 We expect a request with a *.son file and store it in UPLOAD_FOLDER.
893 :return: UUID
894 """
895 try:
896 # get file contents
897 LOG.info("POST /packages called")
898 # let's search for the package in the request
899 is_file_object = False # make API more robust: file can be in data or in files field
900 if "package" in request.files:
901 son_file = request.files["package"]
902 is_file_object = True
903 elif len(request.data) > 0:
904 son_file = request.data
905 else:
906 return {"service_uuid": None, "size": 0, "sha1": None,
907 "error": "upload failed. file not found."}, 500
908 # generate a uuid to reference this package
909 service_uuid = str(uuid.uuid4())
910 file_hash = hashlib.sha1(str(son_file)).hexdigest()
911 # ensure that upload folder exists
912 ensure_dir(UPLOAD_FOLDER)
913 upload_path = os.path.join(UPLOAD_FOLDER, "%s.tgo" % service_uuid)
914 # store *.son file to disk
915 if is_file_object:
916 son_file.save(upload_path)
917 else:
918 with open(upload_path, 'wb') as f:
919 f.write(son_file)
920 size = os.path.getsize(upload_path)
921
922 # first stop and delete any other running services
923 if AUTO_DELETE:
924 service_list = copy.copy(GK.services)
925 for old_service_uuid in service_list:  # do not shadow the new service_uuid from above
926 instances_list = copy.copy(
927 GK.services[old_service_uuid].instances)
928 for instance_uuid in instances_list:
929 # valid service and instance UUID, stop service
930 GK.services.get(old_service_uuid).stop_service(
931 instance_uuid)
932 LOG.info("service instance with uuid %r stopped." %
933 instance_uuid)
934
935 # create a service object and register it
936 s = Service(service_uuid, file_hash, upload_path)
937 GK.register_service_package(service_uuid, s)
938
939 # automatically deploy the service
940 if AUTO_DEPLOY:
941 # ok, we have a service uuid, let's start the service
942 reset_subnets()
943 GK.services.get(service_uuid).start_service()
944
945 # generate the JSON result
946 return {"service_uuid": service_uuid, "size": size,
947 "sha1": file_hash, "error": None}, 201
948 except BaseException:
949 LOG.exception("Service package upload failed:")
950 return {"service_uuid": None, "size": 0,
951 "sha1": None, "error": "upload failed"}, 500
952
953 def get(self):
954 """
955 Return a list of UUIDs of uploaded service packages.
956 :return: dict/list
957 """
958 LOG.info("GET /packages")
959 return {"service_uuid_list": list(GK.services.iterkeys())}
960
961
962 class Instantiations(fr.Resource):
963
964 def post(self):
965 """
966 Instantiate a service specified by its UUID.
967 Will return a new UUID to identify the running service instance.
968 :return: UUID
969 """
970 LOG.info("POST /instantiations (or /requests) called")
971 # try to extract the service uuid from the request
972 json_data = request.get_json(force=True)
973 service_uuid = json_data.get("service_uuid")
974 service_name = json_data.get("service_name")
975
976 # first try to find by service_name
977 if service_name is not None:
978 for s_uuid, s in GK.services.iteritems():
979 if s.manifest.get("name") == service_name:
980 LOG.info("Searched for: {}. Found service w. UUID: {}"
981 .format(service_name, s_uuid))
982 service_uuid = s_uuid
983 # let's be a bit fuzzy here to make testing easier
984 if (service_uuid is None or service_uuid ==
985 "latest") and len(GK.services) > 0:
986 # if we don't get a service uuid, we simply start the first service
987 # in the list
988 service_uuid = list(GK.services.iterkeys())[0]
989 if service_uuid in GK.services:
990 # ok, we have a service uuid, let's start the service
991 service_instance_uuid = GK.services.get(
992 service_uuid).start_service()
993 return {"service_instance_uuid": service_instance_uuid}, 201
994 return "Service not found", 404
995
996 def get(self):
997 """
998 Returns a list with the instance UUIDs of all running services.
999 :return: dict / list
1000 """
1001 LOG.info("GET /instantiations")
1002 return {"service_instantiations_list": [
1003 list(s.instances.iterkeys()) for s in GK.services.itervalues()]}
1004
1005 def delete(self):
1006 """
1007 Stops a running service specified by its service and instance UUID.
1008 """
1009 # try to extract the service and instance UUID from the request
1010 json_data = request.get_json(force=True)
1011 service_uuid_input = json_data.get("service_uuid")
1012 instance_uuid_input = json_data.get("service_instance_uuid")
1013 if len(GK.services) < 1:
1014 return "No service on-boarded.", 404
1015 # try to be fuzzy
1016 if service_uuid_input is None:
1017 # if we don't get a service uuid we stop all services
1018 service_uuid_list = list(GK.services.iterkeys())
1019 LOG.info("No service_uuid given, stopping all.")
1020 else:
1021 service_uuid_list = [service_uuid_input]
1022 # for each service
1023 for service_uuid in service_uuid_list:
1024 if instance_uuid_input is None:
1025 instance_uuid_list = list(
1026 GK.services[service_uuid].instances.iterkeys())
1027 else:
1028 instance_uuid_list = [instance_uuid_input]
1029 # for all service instances
1030 for instance_uuid in instance_uuid_list:
1031 if (service_uuid in GK.services and
1032 instance_uuid in GK.services[service_uuid].instances):
1033 # valid service and instance UUID, stop service
1034 GK.services.get(service_uuid).stop_service(instance_uuid)
1035 LOG.info("Service instance with uuid %r stopped." % instance_uuid)
1036 return "Service(s) stopped.", 200
1037
1038
1039 class Exit(fr.Resource):
1040
1041 def put(self):
1042 """
1043 Stop the running Containernet instance, regardless of the data transmitted with the request.
1044 """
1045 list(GK.dcs.values())[0].net.stop()
1046
1047
1048 def generate_subnets(prefix, base, subnet_size=50, mask=24):
1049 # Generate a list of ipaddress network objects (subnets)
1050 r = list()
1051 for net in range(base, base + subnet_size):
1052 subnet = "{0}.{1}.0/{2}".format(prefix, net, mask)
1053 r.append(ipaddress.ip_network(unicode(subnet)))
1054 return r
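# Illustrative example: generate_subnets('20.0', 0, subnet_size=3, mask=24)
# returns the ipaddress networks [20.0.0.0/24, 20.0.1.0/24, 20.0.2.0/24].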
1055
1056
1057 def reset_subnets():
1058 global ELINE_SUBNETS
1059 global ELAN_SUBNETS
1060 # private subnet definitions for the generated interfaces
1061 # 30.0.xxx.0/24
1062 ELAN_SUBNETS = generate_subnets('30.0', 0, subnet_size=50, mask=24)
1063 # 20.0.xxx.0/24
1064 ELINE_SUBNETS = generate_subnets('20.0', 0, subnet_size=50, mask=24)
1065
1066
1067 def initialize_GK():
1068 global GK
1069 GK = Gatekeeper()
1070
1071
1072 # create a single, global GK object
1073 GK = None
1074 initialize_GK()
1075 # setup Flask
1076 http_server = None
1077 app = Flask(__name__)
1078 app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024 # 512 MB max upload
1079 api = fr.Api(app)
1080 # define endpoints
1081 api.add_resource(Packages, '/packages', '/api/v2/packages')
1082 api.add_resource(Instantiations, '/instantiations',
1083 '/api/v2/instantiations', '/api/v2/requests')
1084 api.add_resource(Exit, '/emulator/exit')
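# Illustrative usage of the endpoints above (assumed host/port and package
# file name; port 8000 matches the standalone mode at the end of this file):
#   curl -X POST -F package=@my_package.tgo http://127.0.0.1:8000/packages
#   curl -X POST -d '{"service_uuid": "latest"}' http://127.0.0.1:8000/instantiations
#   curl -X DELETE -d '{}' http://127.0.0.1:8000/instantiations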
1085
1086
1087 def start_rest_api(host, port, datacenters=dict()):
1088 global http_server
1089 GK.dcs = datacenters
1090 GK.net = get_dc_network()
1091 # start the Flask server (not the best performance but ok for our use case)
1092 # app.run(host=host,
1093 # port=port,
1094 # debug=True,
1095 # use_reloader=False # this is needed to run Flask in a non-main thread
1096 # )
1097 http_server = WSGIServer((host, port), app, log=open("/dev/null", "w"))
1098 http_server.serve_forever()
1099
1100
1101 def stop_rest_api():
1102 if http_server:
1103 http_server.close()
1104
1105
1106 def ensure_dir(name):
1107 if not os.path.exists(name):
1108 os.makedirs(name)
1109
1110
1111 def load_yaml(path):
1112 with open(path, "r") as f:
1113 try:
1114 r = yaml.load(f)
1115 except yaml.YAMLError as exc:
1116 LOG.exception("YAML parse error: %r" % str(exc))
1117 r = dict()
1118 return r
1119
1120
1121 def make_relative_path(path):
1122 if path.startswith("file://"):
1123 path = path.replace("file://", "", 1)
1124 if path.startswith("/"):
1125 path = path.replace("/", "", 1)
1126 return path
1127
1128
1129 def get_dc_network():
1130 """
1131 Retrieve the DCNetwork to which this dummy gatekeeper (GK) connects.
1132 Assumes at least one datacenter is connected to this GK and that all datacenters belong to the same DCNetwork.
1133 :return:
1134 """
1135 assert (len(GK.dcs) > 0)
1136 return GK.dcs.values()[0].net
1137
1138
1139 def parse_interface(interface_name):
1140 """
1141 convert the interface name in the NSD to the corresponding vnf_id, vnf_interface names
1142 :param interface_name:
1143 :return:
1144 """
1145 if ':' in interface_name:
1146 vnf_id, vnf_interface = interface_name.split(':')
1147 else:
1148 vnf_id = None
1149 vnf_interface = interface_name
1150 return vnf_id, vnf_interface
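# Illustrative examples (hypothetical names):
#   parse_interface("vnf0:input") -> ("vnf0", "input")
#   parse_interface("mgmt")       -> (None, "mgmt")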
1151
1152
1153 def get_container_name(vnf_id, vdu_id, ssiid=None):
1154 if ssiid is not None:
1155 return "{}.{}.{}".format(vnf_id, vdu_id, ssiid)
1156 return "{}.{}".format(vnf_id, vdu_id)
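# Illustrative examples (hypothetical ids):
#   get_container_name("vnf0", "vdu01")          -> "vnf0.vdu01"
#   get_container_name("vnf0", "vdu01", ssiid=2) -> "vnf0.vdu01.2"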
1157
1158
1159 def get_triple_id(descr):
1160 return "{}.{}.{}".format(
1161 descr.get("vendor"), descr.get("name"), descr.get("version"))
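# Illustrative example (hypothetical descriptor fields): a descriptor with
# vendor "eu.5gtango", name "ns-example" and version "0.1" yields
# "eu.5gtango.ns-example.0.1".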
1162
1163
1164 def update_port_mapping_multi_instance(ssiid, port_bindings):
1165 """
1166 Port_bindings are used to expose ports of the deployed containers.
1167 They would collide if we deploy multiple service instances.
1168 This function adds an offset to them which is based on the
1169 short service instance id (SSIID) and the global
1170 MULTI_INSTANCE_PORT_OFFSET.
1171 """
1172 def _offset(p):
1173 return p + MULTI_INSTANCE_PORT_OFFSET * ssiid
1174
1175 port_bindings = {k: _offset(v) for k, v in port_bindings.iteritems()}
1176 return port_bindings
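# Illustrative example: with ssiid=1 and MULTI_INSTANCE_PORT_OFFSET=1000,
# update_port_mapping_multi_instance(1, {8080: 8080}) returns {8080: 9080},
# i.e., container port 8080 is published on host port 9080.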
1177
1178
1179 if __name__ == '__main__':
1180 """
1181 Let's allow running the API in standalone mode.
1182 """
1183 GK_STANDALONE_MODE = True
1184 logging.getLogger("werkzeug").setLevel(logging.INFO)
1185 start_rest_api("0.0.0.0", 8000)