Feature #290: Multi-VDU support for 5GTANGO LLCM. VDU deployment works.
[osm/vim-emu.git] / src / emuvim / api / tango / llcm.py
1 # Copyright (c) 2018 SONATA-NFV, 5GTANGO and Paderborn University
2 # ALL RIGHTS RESERVED.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Neither the name of the SONATA-NFV, 5GTANGO, Paderborn University
17 # nor the names of its contributors may be used to endorse or promote
18 # products derived from this software without specific prior written
19 # permission.
20 #
21 # This work has been performed in the framework of the SONATA project,
22 # funded by the European Commission under Grant number 671517 through
23 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
24 # acknowledge the contributions of their colleagues of the SONATA
25 # partner consortium (www.sonata-nfv.eu).
26 #
27 # This work has also been performed in the framework of the 5GTANGO project,
28 # funded by the European Commission under Grant number 761493 through
29 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
30 # acknowledge the contributions of their colleagues of the 5GTANGO
31 # partner consortium (www.5gtango.eu).
32 import logging
33 import os
34 import uuid
35 import hashlib
36 import zipfile
37 import yaml
38 import threading
39 from docker import DockerClient
40 from flask import Flask, request
41 import flask_restful as fr
42 from collections import defaultdict
43 import pkg_resources
44 from subprocess import Popen
45 import ipaddress
46 import copy
47 import time
48 from functools import reduce
49
50
# Module-wide logger of the 5GTANGO LLCM (log level: INFO).
LOG = logging.getLogger("5gtango.llcm")
LOG.setLevel(logging.INFO)


# Base folder of the LLCM storage with its upload and catalog sub-folders.
# Unpacked service packages are stored below CATALOG_FOLDER.
GK_STORAGE = "/tmp/vim-emu-tango-llcm/"
UPLOAD_FOLDER = os.path.join(GK_STORAGE, "uploads/")
CATALOG_FOLDER = os.path.join(GK_STORAGE, "catalog/")

# Enable Dockerfile build functionality (if False, pre-built images
# referenced by the descriptors are pulled instead)
BUILD_DOCKERFILE = False

# flag to indicate that we run without the emulator (only the bare API for
# integration testing)
GK_STANDALONE_MODE = False

# should a new version of an image be pulled even if it is already available
# locally
FORCE_PULL = False

# Automatically deploy SAPs (endpoints) of the service as new containers
# Attention: This is not a configuration switch but a global variable!
# Don't change its default value.
DEPLOY_SAP = False

# flag to indicate if we use bidirectional forwarding rules in the
# automatic chaining process
BIDIRECTIONAL_CHAIN = True

# override the management interfaces in the descriptors with default
# docker0 interfaces in the containers
USE_DOCKER_MGMT = False

# automatically deploy uploaded packages (no need to execute son-access
# deploy --latest separately)
AUTO_DEPLOY = False

# and also automatically terminate any other running services
AUTO_DELETE = False
88
89
def generate_subnets(prefix, base, subnet_size=50, mask=24):
    """
    Generate a list of consecutive IP subnets.

    The subnets have the form "<prefix>.<n>.0/<mask>" with n running from
    base to base + subnet_size - 1.

    :param prefix: leading part of the network address, e.g. '99.0'
    :param base: value of n for the first generated subnet
    :param subnet_size: number of subnets to generate
    :param mask: prefix length used for every subnet
    :return: list of ipaddress network objects
    """
    # A unicode literal is used instead of unicode(): the ipaddress module
    # needs text input on Python 2, and unicode() does not exist on Python 3.
    return [
        ipaddress.ip_network(u"{0}.{1}.0/{2}".format(prefix, net, mask))
        for net in range(base, base + subnet_size)
    ]
97
98
# private subnet definitions for the generated interfaces
# Each list is a pool of 50 subnets; entries are taken with pop(0) when
# external SAPs, E-LANs and E-Lines are set up, so a pool can run empty.
# 99.0.xxx.0/24
SAP_SUBNETS = generate_subnets('99.0', 0, subnet_size=50, mask=24)
# 30.0.xxx.0/24
ELAN_SUBNETS = generate_subnets('30.0', 0, subnet_size=50, mask=24)
# 20.0.xxx.0/24
ELINE_SUBNETS = generate_subnets('20.0', 0, subnet_size=50, mask=24)

# path to the VNFD for the SAP VNF that is deployed as internal SAP point
# (None: use the "sap_vnfd.yml" resource shipped with this module)
SAP_VNFD = None

# Time in seconds to wait for vnf stop scripts to execute fully
VNF_STOP_WAIT_TIME = 5
112
113
class OnBoardingException(BaseException):
    """
    Raised when a service package cannot be on-boarded, e.g., because the
    NSD or the VNFDs are missing.
    """
116
117
class Gatekeeper(object):
    """
    Central registry of the LLCM: holds the registered services, the known
    data centers, the network object and a counter for short VNF names.
    """

    def __init__(self):
        # counter behind get_next_vnf_name(); short names are needed
        # because of a Mininet limitation
        self.vnf_counter = 0
        self.net = None
        self.dcs = {}
        self.services = {}
        LOG.info("Initialized 5GTANGO LLCM module.")

    def register_service_package(self, service_uuid, service):
        """
        Store a new service package under its uuid and on-board it.
        :param service_uuid: key under which the service is registered
        :param service: service object (must offer onboard())
        """
        self.services[service_uuid] = service
        # perform all steps needed to on-board the service
        service.onboard()

    def get_next_vnf_name(self):
        """
        Return the next short VNF name ("vnf1", "vnf2", ...).
        """
        next_number = self.vnf_counter + 1
        self.vnf_counter = next_number
        return "vnf%d" % next_number
141
142
143 class Service(object):
144 """
145 This class represents a NS uploaded as a *.son package to the
146 dummy gatekeeper.
147 Can have multiple running instances of this service.
148 """
149
def __init__(self,
             service_uuid,
             package_file_hash,
             package_file_path):
    """
    Create an empty (not yet on-boarded) service object.
    :param service_uuid: uuid of this service
    :param package_file_hash: hash of the uploaded package file
    :param package_file_path: path to the uploaded package file
    """
    # package identity and storage locations
    self.uuid = service_uuid
    self.package_file_hash = package_file_hash
    self.package_file_path = package_file_path
    content_dir = "services/%s" % self.uuid
    self.package_content_path = os.path.join(CATALOG_FOLDER, content_dir)
    # descriptors (filled during on-boarding)
    self.manifest = None
    self.nsd = None
    self.vnfds = {}
    # service access points
    self.saps = {}
    self.saps_ext = []
    self.saps_int = []
    # container image sources
    self.local_docker_files = {}
    self.remote_docker_image_urls = {}
    # running instances of this service
    self.instances = {}
    # dict to find the vnf_name for any vnf id
    self.vnf_id2vnf_name = {}
170
def onboard(self):
    """
    Prepare this service for instantiation: unpack the package, load all
    descriptors and prepare the container images.
    :return:
    """
    # step 1: extract the package contents into our catalog
    self._unpack_service_package()
    # step 2: read all descriptor files
    self._load_package_descriptor()
    self._load_nsd()
    self._load_vnfd()
    if self.nsd is None:
        raise OnBoardingException("No NSD found.")
    if len(self.vnfds) == 0:
        raise OnBoardingException("No VNFDs found.")
    if DEPLOY_SAP:
        self._load_saps()
    # step 3: prepare container images (download or build Dockerfile)
    if not BUILD_DOCKERFILE:
        self._load_docker_urls()
        self._pull_predefined_dockerimages()
    else:
        self._load_docker_files()
        self._build_images_from_dockerfiles()
    service_name = self.manifest.get("name")
    LOG.info("On-boarded service: %r" % service_name)
196
def start_service(self):
    """
    Create and start a new instance of this service.

    Computes the placement (skipped in GK_STANDALONE_MODE), starts all
    VNFDs of the service, deploys the E-Line and E-LAN links and finally
    triggers the emulator start scripts in the started VNF instances.

    :return: uuid (string) of the new service instance
    """
    LOG.info("Starting service %r" % self.uuid)

    # 1. each service instance gets a new uuid to identify it
    instance_uuid = str(uuid.uuid4())
    # build a instances dict (a bit like a NSR :))
    self.instances[instance_uuid] = dict()
    self.instances[instance_uuid]["vnf_instances"] = list()

    # 2. compute placement of this service instance (adds DC names to
    # VNFDs)
    if not GK_STANDALONE_MODE:
        # self._calculate_placement(FirstDcPlacement)
        self._calculate_placement(RoundRobinDcPlacement)
    # 3. start all vnfds that we have in the service (except SAPs)
    for vnf_id in self.vnfds:
        vnfd = self.vnfds[vnf_id]
        # _start_vnfd returns one VNFI per deployment unit
        vnfis = self._start_vnfd(vnfd, vnf_id)
        # add list of VNFIs to total VNFI list; extend() is required
        # here, a plain "+" would build a new list and throw it away
        self.instances[instance_uuid]["vnf_instances"].extend(vnfis)

    # 4. Deploy E-Line and E_LAN links
    # Attention: Only done if "forwarding_graphs" section in NSD exists,
    # even if "forwarding_graphs" are not used directly.
    if "virtual_links" in self.nsd and "forwarding_graphs" in self.nsd:
        vlinks = self.nsd["virtual_links"]
        # constituent virtual links are not checked
        eline_fwd_links = [link for link in vlinks if (
            link["connectivity_type"] == "E-Line")]
        elan_fwd_links = [link for link in vlinks if (
            link["connectivity_type"] == "E-LAN" or
            link["connectivity_type"] == "E-Tree")]  # Treat E-Tree as E-LAN

        GK.net.deployed_elines.extend(eline_fwd_links)
        GK.net.deployed_elans.extend(elan_fwd_links)

        # 5a. deploy E-Line links
        self._connect_elines(eline_fwd_links, instance_uuid)

        # 5b. deploy E-LAN links
        self._connect_elans(elan_fwd_links, instance_uuid)

    # 6. run the emulator specific entrypoint scripts in the VNFIs of this
    # service instance
    self._trigger_emulator_start_scripts_in_vnfis(
        self.instances[instance_uuid]["vnf_instances"])

    LOG.info("Service started. Instance id: %r" % instance_uuid)
    return instance_uuid
254
def stop_service(self, instance_uuid):
    """
    Stop a running instance of this service: trigger the stop scripts,
    stop every VNF instance, remove the external SAPs and forget the
    instance.

    :param instance_uuid: the uuid of the service instance to be stopped
    """
    LOG.info("Stopping service %r" % self.uuid)
    running_vnfis = self.instances[instance_uuid]["vnf_instances"]

    # give the stop scripts inside the VNFs a few seconds to complete
    self._trigger_emulator_stop_scripts_in_vnfis(running_vnfis)
    time.sleep(VNF_STOP_WAIT_TIME)

    for vnfi in running_vnfis:
        self._stop_vnfi(vnfi)

    for sap_name in self.saps_ext:
        sap_dc = self.saps[sap_name].get("dc")
        sap_dc.removeExternalSAP(sap_name)
        LOG.info("Stopping the SAP instance: %r in DC %r" %
                 (sap_name, sap_dc))

    # finally drop the instance from the dict of all instances
    del self.instances[instance_uuid]
285
286 def _get_resource_limits(self, deployment_unit):
287 """
288 Extract resource limits from deployment units.
289 """
290 # defaults
291 cpu_list = "1"
292 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(1.0))
293 mem_limit = 0
294 # update from descriptor
295 if "resource_requirements" in deployment_unit:
296 res_req = deployment_unit.get("resource_requirements")
297 cpu_list = res_req.get("cpu").get("cores")
298 if cpu_list is None:
299 cpu_list = res_req.get("cpu").get("vcpus")
300 cpu_bw = res_req.get("cpu").get("cpu_bw", 1.0)
301 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(cpu_bw))
302 mem_num = str(res_req.get("memory").get("size", 2))
303 mem_unit = str(res_req.get("memory").get("size_unit", "GB"))
304 mem_limit = float(mem_num)
305 if mem_unit == "GB":
306 mem_limit = mem_limit * 1024 * 1024 * 1024
307 elif mem_unit == "MB":
308 mem_limit = mem_limit * 1024 * 1024
309 elif mem_unit == "KB":
310 mem_limit = mem_limit * 1024
311 mem_limit = int(mem_limit)
312 return cpu_list, cpu_period, cpu_quota, mem_limit
313
def _start_vnfd(self, vnfd, vnf_id, **kwargs):
    """
    Start a single VNFD of this service. One container is started for
    each deployment unit (VDU or CDU) of the VNFD.

    :param vnfd: vnfd descriptor dict
    :param vnf_id: unique id of this vnf in the nsd
    :param kwargs: optional 'type' handed to startCompute
                   (default: 'docker')
    :return: list of started VNF instances (one per deployment unit)
    :raises Exception: if no image name is known for a deployment unit
                       or the Docker image is not available
    """
    vnfis = list()
    # the vnf_name refers to the container image to be deployed
    vnf_name = vnfd.get("name")
    # combine VDUs and CDUs
    deployment_units = (vnfd.get("virtual_deployment_units", []) +
                        vnfd.get("cloudnative_deployment_units", []))
    # iterate over all deployment units within each VNFDs
    for u in deployment_units:
        # 0. vnf_container_name = vnf_id.vdu_id
        vnf_container_name = get_container_name(vnf_id, u.get("id"))
        # 1. get the name of the docker image to start
        if vnf_container_name not in self.remote_docker_image_urls:
            raise Exception("No image name for %r found. Abort." % vnf_container_name)
        docker_image_name = self.remote_docker_image_urls.get(vnf_container_name)
        # 2. select datacenter to start the VNF in
        target_dc = vnfd.get("dc")
        # 3. perform some checks to ensure we can start the container
        assert(docker_image_name is not None)
        assert(target_dc is not None)
        if not self._check_docker_image_exists(docker_image_name):
            raise Exception("Docker image {} not found. Abort."
                            .format(docker_image_name))

        # 4. get the resource limits
        cpu_list, cpu_period, cpu_quota, mem_limit = self._get_resource_limits(u)

        # the connection points are defined per VNFD, so every
        # deployment unit of the VNFD gets the same interface list
        intfs = vnfd.get("connection_points", [])
        # do some re-naming of fields to be compatible to containernet
        for i in intfs:
            if i.get("address"):
                i["ip"] = i.get("address")

        # 5. collect additional information to start container
        volumes = list()
        cenv = dict()
        # 5.1 inject descriptor based start/stop commands into env (overwrite)
        VNFD_CMD_START = u.get("vm_cmd_start")
        VNFD_CMD_STOP = u.get("vm_cmd_stop")
        if VNFD_CMD_START and not VNFD_CMD_START == "None":
            LOG.info("Found 'vm_cmd_start'='{}' in VNFD.".format(VNFD_CMD_START) +
                     " Overwriting SON_EMU_CMD.")
            cenv["SON_EMU_CMD"] = VNFD_CMD_START
        if VNFD_CMD_STOP and not VNFD_CMD_STOP == "None":
            # the message names the stop field (it used to say 'vm_cmd_start')
            LOG.info("Found 'vm_cmd_stop'='{}' in VNFD.".format(VNFD_CMD_STOP) +
                     " Overwriting SON_EMU_CMD_STOP.")
            cenv["SON_EMU_CMD_STOP"] = VNFD_CMD_STOP

        # 6. Start the container
        LOG.info("Starting %r as %r in DC %r" %
                 (vnf_name, vnf_container_name, vnfd.get("dc")))
        LOG.debug("Interfaces for %r: %r" % (vnf_id, intfs))
        # start the container
        vnfi = target_dc.startCompute(
            vnf_container_name,
            network=intfs,
            image=docker_image_name,
            flavor_name="small",
            cpu_quota=cpu_quota,
            cpu_period=cpu_period,
            cpuset=cpu_list,
            mem_limit=mem_limit,
            volumes=volumes,
            properties=cenv,  # environment
            type=kwargs.get('type', 'docker'))
        # add vnfd reference to vnfi
        vnfi.vnfd = vnfd
        # add container name
        vnfi.vnf_container_name = vnf_container_name
        # store vnfi
        vnfis.append(vnfi)
    return vnfis
394
def _stop_vnfi(self, vnfi):
    """
    Stop one VNF instance in the data center it belongs to.

    :param vnfi: vnf instance to be stopped
    """
    # the instance knows its data center and its own name
    container_name = vnfi.getStatus()["name"]
    datacenter = vnfi.datacenter

    LOG.info("Stopping the vnf instance contained in %r in DC %r" %
             (container_name, datacenter))
    datacenter.stopCompute(container_name)
409
410 def _get_vnf_instance(self, instance_uuid, vnf_id):
411 """
412 Returns the Docker object for the given VNF id (or Docker name).
413 :param instance_uuid: UUID of the service instance to search in.
414 :param name: VNF name or Docker name. We are fuzzy here.
415 :return:
416 """
417 dn = vnf_id
418 for vnfi in self.instances[instance_uuid]["vnf_instances"]:
419 if vnfi.name == dn:
420 return vnfi
421 LOG.warning("No container with name: {0} found.".format(dn))
422 return None
423
@staticmethod
def _vnf_reconfigure_network(vnfi, if_name, net_str=None, new_name=None):
    """
    Change the IP configuration and/or the name of one interface of a
    running container.
    :param vnfi: container instance
    :param if_name: interface name
    :param net_str: network configuration string, e.g., 1.2.3.4/24
                    (None: keep the address)
    :param new_name: new interface name (None: keep the name)
    :return:
    """
    # part 1: new ip address
    if net_str is not None:
        target_intf = vnfi.intf(intf=if_name)
        if target_intf is None:
            LOG.warning("Interface not found: %s:%s. Network reconfiguration skipped." % (
                vnfi.name, if_name))
        else:
            target_intf.setIP(net_str)
            LOG.debug("Reconfigured network of %s:%s to %r" %
                      (vnfi.name, if_name, net_str))

    # part 2: new interface name (link has to be down while renaming)
    if new_name is not None:
        rename_steps = (
            (if_name, 'down'),
            (if_name, 'name', new_name),
            (new_name, 'up'),
        )
        for step_args in rename_steps:
            vnfi.cmd('ip link set', *step_args)
        LOG.debug("Reconfigured interface name of %s:%s to %s" %
                  (vnfi.name, if_name, new_name))
452
def _trigger_emulator_start_scripts_in_vnfis(self, vnfi_list):
    """
    Run the emulator start command of each given VNF instance.

    The command is read from the environment variable SON_EMU_CMD or
    VIM_EMU_CMD in the container config; only the first one found is
    executed per instance.

    :param vnfi_list: list of VNF instances
    """
    for vnfi in vnfi_list:
        config = vnfi.dcinfo.get("Config", dict())
        env = config.get("Env", list())
        for env_var in env:
            # partition() also copes with entries that have no '='
            # (plain unpacking of split() would raise a ValueError)
            var, _, cmd = env_var.partition('=')
            var, cmd = str(var).strip(), str(cmd).strip()
            LOG.debug("%r = %r" % (var, cmd))
            if var == "SON_EMU_CMD" or var == "VIM_EMU_CMD":
                LOG.info("Executing script in '{}': {}={}"
                         .format(vnfi.name, var, cmd))
                # execute command in new thread to ensure that GK is not
                # blocked by VNF
                t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
                t.daemon = True
                t.start()
                break  # only execute one command
469
470 def _trigger_emulator_stop_scripts_in_vnfis(self, vnfi_list):
471 for vnfi in vnfi_list:
472 config = vnfi.dcinfo.get("Config", dict())
473 env = config.get("Env", list())
474 for env_var in env:
475 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
476 if var == "SON_EMU_CMD_STOP" or var == "VIM_EMU_CMD_STOP":
477 LOG.info("Executing script in '{}': {}={}"
478 .format(vnfi.name, var, cmd))
479 # execute command in new thread to ensure that GK is not
480 # blocked by VNF
481 t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
482 t.daemon = True
483 t.start()
484 break # only execute one command
485
def _unpack_service_package(self):
    """
    Extract the uploaded package file (a zip archive) into the content
    folder of this service.
    """
    source_file = self.package_file_path
    LOG.info("Unzipping: %r" % source_file)
    with zipfile.ZipFile(source_file, "r") as package:
        package.extractall(self.package_content_path)
493
def _load_package_descriptor(self):
    """
    Read "TOSCA-Metadata/NAPD.yaml" from the unpacked package and store
    the result as the manifest of this service.
    :return:
    """
    napd_path = os.path.join(
        self.package_content_path, "TOSCA-Metadata/NAPD.yaml")
    self.manifest = load_yaml(napd_path)
502
def _load_nsd(self):
    """
    Load the entry NSD YAML and keep it as dict. Also builds the lookup
    table from vnf id to vnf name out of the NSD's "network_functions".
    :return:
    :raises OnBoardingException: if the manifest has no 'package_content'
                                 section or references no NSD
    """
    if "package_content" in self.manifest:
        nsd_path = None
        for f in self.manifest.get("package_content"):
            if f.get("content-type") == "application/vnd.5gtango.nsd":
                nsd_path = os.path.join(
                    self.package_content_path,
                    make_relative_path(f.get("source")))
                break  # always use the first NSD for now
        if nsd_path is None:
            raise OnBoardingException("No NSD with type 'application/vnd.5gtango.nsd' found.")
        self.nsd = load_yaml(nsd_path)
        GK.net.deployed_nsds.append(self.nsd)  # TODO this seems strange (remove?)
        # create dict to find the vnf_name for any vnf id
        # (a dict comprehension also works for an empty list, where
        # reduce() without initial value raises a TypeError)
        self.vnf_id2vnf_name = defaultdict(
            lambda: "NotExistingNode",
            {nf["vnf_id"]: nf["vnf_name"]
             for nf in self.nsd["network_functions"]})
        LOG.debug("Loaded NSD: %r" % self.nsd.get("name"))
    else:
        raise OnBoardingException(
            "No 'package_content' section in package manifest:\n{}"
            .format(self.manifest))
530
def _load_vnfd(self):
    """
    Load every VNFD YAML file listed in the package manifest and attach
    the matching VNFD to each vnf id known from the NSD.
    :return:
    """
    # step 1: collect all VNFDs of the package, keyed by their name
    packaged_vnfds = {}
    if "package_content" in self.manifest:
        for entry in self.manifest.get("package_content"):
            if entry.get("content-type") != "application/vnd.5gtango.vnfd":
                continue
            descriptor = load_yaml(os.path.join(
                self.package_content_path,
                make_relative_path(entry.get("source"))))
            packaged_vnfds[descriptor.get("name")] = descriptor
        if len(packaged_vnfds) == 0:
            raise OnBoardingException("No VNFDs found.")
        # step 2: link each vnf_id in the nsd to its vnfd
        for vnf_id in self.vnf_id2vnf_name:
            vnf_name = self.vnf_id2vnf_name[vnf_id]
            self.vnfds[vnf_id] = packaged_vnfds[vnf_name]
            LOG.debug("Loaded VNFD: {0} id: {1}".format(vnf_name, vnf_id))
555
def _load_saps(self):
    """
    Create the SAP entries for the connection points of the NSD.

    External SAPs get a subnet from SAP_SUBNETS; internal and management
    SAPs get their own SAP VNFD. Every SAP is also appended to the
    "network_functions" of the NSD so it is deployed later on.
    """
    # create list of all SAPs
    # check if we need to deploy management ports
    if USE_DOCKER_MGMT:
        # connection points without type count as 'internal' (see below);
        # the default avoids a TypeError for "'management' not in None"
        SAPs = [p for p in self.nsd["connection_points"]
                if 'management' not in p.get('type', 'internal')]
    else:
        SAPs = [p for p in self.nsd["connection_points"]]

    for sap in SAPs:
        # endpoint needed in this service
        sap_id, sap_interface, sap_docker_name = parse_interface(sap['id'])
        # make sure SAP has type set (default internal)
        sap["type"] = sap.get("type", 'internal')

        # Each Service Access Point (connection_point) in the nsd is an IP
        # address on the host
        if sap["type"] == "external":
            # add to vnfds to calculate placement later on
            sap_net = SAP_SUBNETS.pop(0)
            self.saps[sap_docker_name] = {
                "name": sap_docker_name, "type": "external", "net": sap_net}
            # add SAP vnf to list in the NSD so it is deployed later on
            # each SAP gets a unique VNFD and vnf_id in the NSD and custom
            # type (only defined in the dummygatekeeper)
            self.nsd["network_functions"].append(
                {"vnf_id": sap_docker_name, "vnf_name": sap_docker_name, "vnf_type": "sap_ext"})

        # Each Service Access Point (connection_point) in the nsd is
        # getting its own container (default)
        elif sap["type"] == "internal" or sap["type"] == "management":
            # add SAP to self.vnfds
            if SAP_VNFD is None:
                sapfile = pkg_resources.resource_filename(
                    __name__, "sap_vnfd.yml")
            else:
                sapfile = SAP_VNFD
            sap_vnfd = load_yaml(sapfile)
            sap_vnfd["connection_points"][0]["id"] = sap_interface
            sap_vnfd["name"] = sap_docker_name
            sap_vnfd["type"] = "internal"
            # add to vnfds to calculate placement later on and deploy
            self.saps[sap_docker_name] = sap_vnfd
            # add SAP vnf to list in the NSD so it is deployed later on
            # each SAP get a unique VNFD and vnf_id in the NSD
            self.nsd["network_functions"].append(
                {"vnf_id": sap_docker_name, "vnf_name": sap_docker_name, "vnf_type": "sap_int"})

        LOG.debug("Loaded SAP: name: {0}, type: {1}".format(
            sap_docker_name, sap['type']))

    # create sap lists
    self.saps_ext = [self.saps[sap]['name']
                     for sap in self.saps if self.saps[sap]["type"] == "external"]
    self.saps_int = [self.saps[sap]['name']
                     for sap in self.saps if self.saps[sap]["type"] == "internal"]
612
def _start_sap(self, sap, instance_uuid):
    """
    Start a single SAP of this service (no-op unless DEPLOY_SAP is set).

    Internal SAPs are started like a VNFD (skipped in
    GK_STANDALONE_MODE); external SAPs are attached to their data center.

    :param sap: SAP dict (needs 'name' and 'type'; external SAPs also
                'dc' and 'net')
    :param instance_uuid: uuid of the service instance the SAP belongs to
    """
    if not DEPLOY_SAP:
        return

    LOG.info('start SAP: {0} ,type: {1}'.format(sap['name'], sap['type']))
    if sap["type"] == "internal":
        if not GK_STANDALONE_MODE:
            # _start_vnfd returns a list of VNFIs, so extend() is needed;
            # append() would nest the list (or store None in standalone
            # mode) inside the flat "vnf_instances" list
            vnfis = self._start_vnfd(sap, sap['name'], type='sap_int')
            self.instances[instance_uuid]["vnf_instances"].extend(vnfis)

    elif sap["type"] == "external":
        target_dc = sap.get("dc")
        # add interface to dc switch
        target_dc.attachExternalSAP(sap['name'], sap['net'])
628
def _connect_elines(self, eline_fwd_links, instance_uuid):
    """
    Connect all E-LINE links in the NSD.

    For each link the two referenced connection points are resolved, the
    interfaces of the involved VNF instances get new IP addresses where
    needed and a chain between both ends is installed via GK.net.setChain.
    Links between two SAPs are skipped.

    :param eline_fwd_links: list of E-LINE links in the NSD
    :param: instance_uuid of the service
    :return:
    """
    # cookie is used as identifier for the flowrules installed by the dummygatekeeper
    # eg. different services get a unique cookie for their flowrules
    # NOTE(review): the cookie is never changed in this method, so every
    # chain is installed with cookie 1 - confirm that this is intended
    cookie = 1
    for link in eline_fwd_links:
        LOG.info("Found E-Line: {}".format(link))
        # check if we need to deploy this link when its a management link:
        if USE_DOCKER_MGMT:
            if self.check_mgmt_interface(
                    link["connection_points_reference"]):
                continue

        # only the first two connection point references are used
        src_id, src_if_name, src_sap_id = parse_interface(
            link["connection_points_reference"][0])
        dst_id, dst_if_name, dst_sap_id = parse_interface(
            link["connection_points_reference"][1])

        setChaining = False
        # check if there is a SAP in the link and chain everything together
        if src_sap_id in self.saps and dst_sap_id in self.saps:
            LOG.info(
                '2 SAPs cannot be chained together : {0} - {1}'.format(src_sap_id, dst_sap_id))
            continue

        # external SAP as source: the destination VNF gets an address
        # from the SAP's subnet
        elif src_sap_id in self.saps_ext:
            src_id = src_sap_id
            # set intf name to None so the chaining function will choose
            # the first one
            src_if_name = None
            dst_vnfi = self._get_vnf_instance(instance_uuid, dst_id)
            if dst_vnfi is not None:
                # choose the address with index 2 of the sap subnet
                sap_net = self.saps[src_sap_id]['net']
                sap_ip = "{0}/{1}".format(str(sap_net[2]),
                                          sap_net.prefixlen)
                self._vnf_reconfigure_network(
                    dst_vnfi, dst_if_name, sap_ip)
                setChaining = True

        # external SAP as destination: the source VNF gets an address
        # from the SAP's subnet
        elif dst_sap_id in self.saps_ext:
            dst_id = dst_sap_id
            # set intf name to None so the chaining function will choose
            # the first one
            dst_if_name = None
            src_vnfi = self._get_vnf_instance(instance_uuid, src_id)
            if src_vnfi is not None:
                sap_net = self.saps[dst_sap_id]['net']
                sap_ip = "{0}/{1}".format(str(sap_net[2]),
                                          sap_net.prefixlen)
                self._vnf_reconfigure_network(
                    src_vnfi, src_if_name, sap_ip)
                setChaining = True

        # Link between 2 VNFs
        else:
            LOG.info("Creating E-Line: src={}, dst={}"
                     .format(src_id, dst_id))
            # make sure we use the correct sap vnf name
            if src_sap_id in self.saps_int:
                src_id = src_sap_id
            if dst_sap_id in self.saps_int:
                dst_id = dst_sap_id
            # get involved vnfis
            src_vnfi = self._get_vnf_instance(instance_uuid, src_id)
            dst_vnfi = self._get_vnf_instance(instance_uuid, dst_id)

            if src_vnfi is not None and dst_vnfi is not None:
                setChaining = True
                # re-configure the VNFs IP assignment and ensure that a new
                # subnet is used for each E-Link
                # (the subnet is removed from the module-wide pool)
                eline_net = ELINE_SUBNETS.pop(0)
                ip1 = "{0}/{1}".format(str(eline_net[1]),
                                       eline_net.prefixlen)
                ip2 = "{0}/{1}".format(str(eline_net[2]),
                                       eline_net.prefixlen)
                # check if VNFs have fixed IPs (address field in VNFDs)
                if (self._get_vnfd_cp_from_vnfi(src_vnfi, src_if_name)
                        .get("address") is None):
                    self._vnf_reconfigure_network(src_vnfi, src_if_name, ip1)
                # check if VNFs have fixed IPs (address field in VNFDs)
                if (self._get_vnfd_cp_from_vnfi(dst_vnfi, dst_if_name)
                        .get("address") is None):
                    self._vnf_reconfigure_network(dst_vnfi, dst_if_name, ip2)

        # Set the chaining
        if setChaining:
            GK.net.setChain(
                src_id, dst_id,
                vnf_src_interface=src_if_name, vnf_dst_interface=dst_if_name,
                bidirectional=BIDIRECTIONAL_CHAIN, cmd="add-flow", cookie=cookie, priority=10)
725
726 def _get_vnfd_cp_from_vnfi(self, vnfi, ifname):
727 """
728 Gets the connection point data structure from the VNFD
729 of the given VNFI using ifname.
730 """
731 if vnfi.vnfd is None:
732 return {}
733 cps = vnfi.vnfd.get("connection_points")
734 for cp in cps:
735 if cp.get("id") == ifname:
736 return cp
737
def _connect_elans(self, elan_fwd_links, instance_uuid):
    """
    Connect all E-LAN links in the NSD.

    Each E-LAN gets one subnet: the subnet of an external SAP that is part
    of the LAN, otherwise the next subnet from ELAN_SUBNETS. All other
    interfaces of the LAN get consecutive host addresses of this subnet
    and the LAN is finally installed via GK.net.setLAN.

    :param elan_fwd_links: list of E-LAN links in the NSD
    :param: instance_uuid of the service
    :return:
    """
    for link in elan_fwd_links:
        # check if we need to deploy this link when its a management link:
        if USE_DOCKER_MGMT:
            if self.check_mgmt_interface(
                    link["connection_points_reference"]):
                continue

        # members (name + interface) of this E-LAN, handed to setLAN
        elan_vnf_list = []
        # check if an external SAP is in the E-LAN (then a subnet is
        # already defined)
        intfs_elan = [intf for intf in link["connection_points_reference"]]
        lan_sap = self.check_ext_saps(intfs_elan)
        if lan_sap:
            lan_net = self.saps[lan_sap]['net']
            lan_hosts = list(lan_net.hosts())
        else:
            # the subnet is removed from the module-wide pool
            lan_net = ELAN_SUBNETS.pop(0)
            lan_hosts = list(lan_net.hosts())

        # generate lan ip address for all interfaces except external SAPs
        for intf in link["connection_points_reference"]:

            # skip external SAPs, they already have an ip
            vnf_id, vnf_interface, vnf_sap_docker_name = parse_interface(
                intf)
            if vnf_sap_docker_name in self.saps_ext:
                elan_vnf_list.append(
                    {'name': vnf_sap_docker_name, 'interface': vnf_interface})
                continue

            # take the next free host address of the LAN's subnet
            ip_address = "{0}/{1}".format(str(lan_hosts.pop(0)),
                                          lan_net.prefixlen)
            # NOTE(review): same reference is parsed a second time here
            vnf_id, intf_name, vnf_sap_id = parse_interface(intf)

            # make sure we use the correct sap vnf name
            src_docker_name = vnf_id
            if vnf_sap_id in self.saps_int:
                src_docker_name = vnf_sap_id
                vnf_id = vnf_sap_id

            LOG.debug(
                "Setting up E-LAN interface. (%s:%s) -> %s" % (
                    vnf_id, intf_name, ip_address))

            # re-configure the VNFs IP assignment and ensure that a new subnet is used for each E-LAN
            # E-LAN relies on the learning switch capability of Ryu which has to be turned on in the topology
            # (DCNetwork(controller=RemoteController, enable_learning=True)), so no explicit chaining is necessary.
            vnfi = self._get_vnf_instance(instance_uuid, vnf_id)
            if vnfi is not None:
                self._vnf_reconfigure_network(vnfi, intf_name, ip_address)
                # add this vnf and interface to the E-LAN for tagging
                elan_vnf_list.append(
                    {'name': src_docker_name, 'interface': intf_name})

        # install the VLAN tags for this E-LAN
        GK.net.setLAN(elan_vnf_list)
801
def _load_docker_files(self):
    """
    Get all paths to Dockerfiles from VNFDs and store them in
    self.local_docker_files (key: container name of the deployment unit).
    VDUs are only considered if their "vm_image_format" is "docker".
    :return:
    """
    # items() instead of iteritems(): works on Python 2 and Python 3
    for vnf_id, v in self.vnfds.items():
        for vu in v.get("virtual_deployment_units", []):
            vnf_container_name = get_container_name(vnf_id, vu.get("id"))
            if vu.get("vm_image_format") == "docker":
                vm_image = vu.get("vm_image")
                docker_path = os.path.join(
                    self.package_content_path,
                    make_relative_path(vm_image))
                self.local_docker_files[vnf_container_name] = docker_path
                LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
        for cu in v.get("cloudnative_deployment_units", []):
            vnf_container_name = get_container_name(vnf_id, cu.get("id"))
            image = cu.get("image")
            docker_path = os.path.join(
                self.package_content_path,
                make_relative_path(image))
            self.local_docker_files[vnf_container_name] = docker_path
            LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
825
def _load_docker_urls(self):
    """
    Get all URLs to pre-build docker images in some repo and store them
    in self.remote_docker_image_urls (key: container name of the
    deployment unit). A leading "http://" is removed from the URLs.
    VDUs are only considered if their "vm_image_format" is "docker".
    :return:
    """
    # items() instead of iteritems(): works on Python 2 and Python 3
    for vnf_id, v in self.vnfds.items():
        for vu in v.get("virtual_deployment_units", []):
            vnf_container_name = get_container_name(vnf_id, vu.get("id"))
            if vu.get("vm_image_format") == "docker":
                url = vu.get("vm_image")
                if url is not None:
                    url = url.replace("http://", "")
                    self.remote_docker_image_urls[vnf_container_name] = url
                    LOG.debug("Found Docker image URL (%r): %r" %
                              (vnf_container_name,
                               self.remote_docker_image_urls[vnf_container_name]))
        for cu in v.get("cloudnative_deployment_units", []):
            vnf_container_name = get_container_name(vnf_id, cu.get("id"))
            url = cu.get("image")
            if url is not None:
                url = url.replace("http://", "")
                self.remote_docker_image_urls[vnf_container_name] = url
                LOG.debug("Found Docker image URL (%r): %r" %
                          (vnf_container_name,
                           self.remote_docker_image_urls[vnf_container_name]))
851
def _build_images_from_dockerfiles(self):
    """
    Build Docker images for each local Dockerfile found in the package: self.local_docker_files
    Nothing is built in GK_STANDALONE_MODE.
    """
    if GK_STANDALONE_MODE:
        return  # do not build anything in standalone mode
    dc = DockerClient()
    LOG.info("Building %d Docker images (this may take several minutes) ..." % len(
        self.local_docker_files))
    # items() instead of iteritems(): works on Python 2 and Python 3
    for k, v in self.local_docker_files.items():
        # DockerClient itself has no build() since Docker SDK 2.0; the
        # low-level client (dc.api) offers it and yields the build output
        # line by line.
        for line in dc.api.build(path=v.replace(
                "Dockerfile", ""), tag=k, rm=False, nocache=False):
            LOG.debug("DOCKER BUILD: %s" % line)
        LOG.info("Docker image created: %s" % k)
866
def _pull_predefined_dockerimages(self):
    """
    If the package contains URLs to pre-build Docker images, we download them with this method.
    Images that are already present are only pulled again if FORCE_PULL
    is set. A failed pull is logged but does not abort the on-boarding.
    """
    dc = DockerClient()
    # values() instead of itervalues(): works on Python 2 and Python 3
    for url in self.remote_docker_image_urls.values():
        # only pull if not present (speedup for development)
        if not FORCE_PULL:
            if len(dc.images.list(name=url)) > 0:
                LOG.debug("Image %r present. Skipping pull." % url)
                continue
        LOG.info("Pulling image: %r" % url)
        # this seems to fail with latest docker api version 2.0.2
        # dc.images.pull(url,
        #        insecure_registry=True)
        # using docker cli instead
        cmd = ["docker",
               "pull",
               url,
               ]
        returncode = Popen(cmd).wait()
        if returncode != 0:
            # do not hide a failed pull from the operator
            LOG.warning("Pulling image %r failed (exit code: %r)."
                        % (url, returncode))
888
def _check_docker_image_exists(self, image_name):
    """
    Ask the docker service whether an image with the given name is listed.
    :param image_name: name of the docker image
    :return: True if at least one image is listed for that name, else False
    """
    matching_images = DockerClient().images.list(name=image_name)
    return len(matching_images) > 0
896
def _calculate_placement(self, algorithm):
    """
    Do placement by adding a field "dc" to each VNFD that points to one
    of our data center objects known to the gatekeeper.

    :param algorithm: placement class; it is instantiated without
        arguments and its place(nsd, vnfds, saps, dcs) method is called
    """
    assert(len(self.vnfds) > 0)
    assert(len(GK.dcs) > 0)
    # instantiate the algorithm and place
    p = algorithm()
    p.place(self.nsd, self.vnfds, self.saps, GK.dcs)
    LOG.info("Using placement algorithm: %r" % p.__class__.__name__)
    # lets print the placement result
    # items() works on Python 2 and Python 3 (iteritems() is Python 2 only)
    for name, vnfd in self.vnfds.items():
        LOG.info("Placed VNF %r on DC %r" % (name, str(vnfd.get("dc"))))
    for sap in self.saps:
        sap_dict = self.saps[sap]
        LOG.info("Placed SAP %r on DC %r" % (sap, str(sap_dict.get("dc"))))
915
916 def _calculate_cpu_cfs_values(self, cpu_time_percentage):
917 """
918 Calculate cpu period and quota for CFS
919 :param cpu_time_percentage: percentage of overall CPU to be used
920 :return: cpu_period, cpu_quota
921 """
922 if cpu_time_percentage is None:
923 return -1, -1
924 if cpu_time_percentage < 0:
925 return -1, -1
926 # (see: https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt)
927 # Attention minimum cpu_quota is 1ms (micro)
928 cpu_period = 1000000 # lets consider a fixed period of 1000000 microseconds for now
929 LOG.debug("cpu_period is %r, cpu_percentage is %r" %
930 (cpu_period, cpu_time_percentage))
931 # calculate the fraction of cpu time for this container
932 cpu_quota = cpu_period * cpu_time_percentage
933 # ATTENTION >= 1000 to avoid a invalid argument system error ... no
934 # idea why
935 if cpu_quota < 1000:
936 LOG.debug("cpu_quota before correcting: %r" % cpu_quota)
937 cpu_quota = 1000
938 LOG.warning("Increased CPU quota to avoid system error.")
939 LOG.debug("Calculated: cpu_period=%f / cpu_quota=%f" %
940 (cpu_period, cpu_quota))
941 return int(cpu_period), int(cpu_quota)
942
def check_ext_saps(self, intf_list):
    """
    Look for an external SAP among the given interfaces.
    :param intf_list: list of interface names
    :return: docker name of the first interface that matches the name of
        a SAP of type "external"; None if there is no such interface
    """
    external_sap_names = []
    for sap in self.saps:
        if self.saps[sap]["type"] == "external":
            external_sap_names.append(self.saps[sap]['name'])
    for intf_name in intf_list:
        # only the docker name (third element) is needed for the lookup
        _, _, docker_name = parse_interface(intf_name)
        if docker_name in external_sap_names:
            return docker_name
    return None
952
def check_mgmt_interface(self, intf_list):
    """
    Check if one of the given interfaces is a management connection point.
    :param intf_list: list of interface names
    :return: True if a name equals the id of an NSD connection point whose
        type contains 'management'; None otherwise
    """
    mgmt_ids = [cp.get('id') for cp in self.nsd["connection_points"]
                if 'management' in cp.get('type')]
    if any(intf_name in mgmt_ids for intf_name in intf_list):
        return True
    # callers get None (not False) when nothing matches
    return None
959
960
961 """
962 Some (simple) placement algorithms
963 """
964
965
class FirstDcPlacement(object):
    """
    Placement: Always use one and the same data center from the GK.dcs dict.
    """

    def place(self, nsd, vnfds, saps, dcs):
        """
        Set the "dc" field of every VNFD to the first data center in dcs.
        :param nsd: not used by this algorithm
        :param vnfds: dict of VNFDs; each value gets its "dc" entry set
        :param saps: not used by this algorithm
        :param dcs: dict of data centers
        """
        if not vnfds:
            return  # nothing to place, dcs is not touched
        # look the data center up once instead of once per VNF;
        # values() works on Python 2 and Python 3
        first_dc = list(dcs.values())[0]
        for vnfd in vnfds.values():
            vnfd["dc"] = first_dc
974
975
class RoundRobinDcPlacement(object):
    """
    Placement: Distribute VNFs across all available DCs in a round robin fashion.
    """

    def place(self, nsd, vnfds, saps, dcs):
        """
        Set the "dc" field of the VNFDs, cycling through the data centers.
        :param nsd: not used by this algorithm
        :param vnfds: dict of VNFDs; each value gets its "dc" entry set
        :param saps: not used by this algorithm
        :param dcs: dict of data centers
        """
        # values() works on Python 2 and Python 3
        dcs_list = list(dcs.values())
        for index, vnfd in enumerate(vnfds.values()):
            # wrap around once every DC has been used
            vnfd["dc"] = dcs_list[index % len(dcs_list)]
987
988
989 """
990 Resource definitions and API endpoints
991 """
992
993
class Packages(fr.Resource):
    """
    REST resource to upload service packages and to list the uploaded ones.
    """

    @staticmethod
    def _sha1_of_file(path, chunk_size=65536):
        """
        Return the hex SHA-1 digest of the content of the file at path.
        """
        digest = hashlib.sha1()
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b""):
                digest.update(chunk)
        return digest.hexdigest()

    def post(self):
        """
        Upload a service package to the dummy gatekeeper.

        The package is taken from the "package" file field of the request
        or, if that is missing, from the raw request body. It is stored as
        <service_uuid>.tgo in UPLOAD_FOLDER and registered as a new service.
        With AUTO_DELETE all instances of the already registered services
        are stopped first; with AUTO_DEPLOY the new service is started.
        :return: dict with service_uuid, size, sha1 and error, plus the
            HTTP status code (201 on success, 500 on failure)
        """
        try:
            # get file contents
            LOG.info("POST /packages called")
            # lets search for the package in the request
            # make API more robust: file can be in data or in files field
            is_file_object = False
            if "package" in request.files:
                son_file = request.files["package"]
                is_file_object = True
            elif len(request.data) > 0:
                son_file = request.data
            else:
                return {"service_uuid": None, "size": 0, "sha1": None,
                        "error": "upload failed. file not found."}, 500
            # generate a uuid to reference this package
            service_uuid = str(uuid.uuid4())
            # ensure that upload folder exists
            ensure_dir(UPLOAD_FOLDER)
            upload_path = os.path.join(UPLOAD_FOLDER, "%s.tgo" % service_uuid)
            # store the package file to disk
            if is_file_object:
                son_file.save(upload_path)
            else:
                with open(upload_path, 'wb') as f:
                    f.write(son_file)
            size = os.path.getsize(upload_path)
            # Hash the stored bytes. Hashing str(son_file) would only hash
            # the textual representation of the upload object when the
            # package arrives in the files field.
            file_hash = self._sha1_of_file(upload_path)

            # first stop any other running services
            if AUTO_DELETE:
                # The loop variables must not be called service_uuid:
                # that would overwrite the uuid of the package that is
                # being uploaded. list() takes a snapshot of the keys.
                for old_service_uuid in list(GK.services):
                    old_service = GK.services.get(old_service_uuid)
                    for instance_uuid in list(old_service.instances):
                        # valid service and instance UUID, stop service
                        old_service.stop_service(instance_uuid)
                        LOG.info("service instance with uuid %r stopped." %
                                 instance_uuid)

            # create a service object and register it
            s = Service(service_uuid, file_hash, upload_path)
            GK.register_service_package(service_uuid, s)

            # automatically deploy the service
            if AUTO_DEPLOY:
                # ok, we have a service uuid, lets start the service
                reset_subnets()
                GK.services.get(service_uuid).start_service()

            # generate the JSON result
            return {"service_uuid": service_uuid, "size": size,
                    "sha1": file_hash, "error": None}, 201
        except Exception:
            # Exception instead of BaseException: KeyboardInterrupt and
            # SystemExit must not be turned into an HTTP error response
            LOG.exception("Service package upload failed:")
            return {"service_uuid": None, "size": 0,
                    "sha1": None, "error": "upload failed"}, 500

    def get(self):
        """
        Return a list of UUID's of uploaded service packages.
        :return: dict/list
        """
        LOG.info("GET /packages")
        # keys() works on Python 2 and Python 3 (iterkeys() is Python 2 only)
        return {"service_uuid_list": list(GK.services.keys())}
1068
1069
class Instantiations(fr.Resource):
    """
    REST resource to start, list and stop service instances.
    """

    def post(self):
        """
        Instantiate a service specified by its UUID.
        Will return a new UUID to identify the running service instance.

        If the request carries no service UUID or the value "latest", the
        first service known to the gatekeeper is started.
        :return: UUID
        """
        LOG.info("POST /instantiations (or /requests) called")
        # try to extract the service uuid from the request
        json_data = request.get_json(force=True)
        service_uuid = json_data.get("service_uuid")

        # lets be a bit fuzzy here to make testing easier
        if (service_uuid is None or service_uuid ==
                "latest") and len(GK.services) > 0:
            # if we don't get a service uuid, we simple start the first service
            # in the list
            service_uuid = list(GK.services.keys())[0]
        if service_uuid in GK.services:
            # ok, we have a service uuid, lets start the service
            service_instance_uuid = GK.services.get(
                service_uuid).start_service()
            return {"service_instance_uuid": service_instance_uuid}, 201
        return "Service not found", 404

    def get(self):
        """
        Returns a list of UUIDs containing all running services.
        :return: dict / list
        """
        LOG.info("GET /instantiations")
        # keys()/values() work on Python 2 and Python 3
        return {"service_instantiations_list": [
            list(s.instances.keys()) for s in GK.services.values()]}

    def delete(self):
        """
        Stops a running service specified by its service and instance UUID.

        A missing service UUID falls back to the first known service and a
        missing instance UUID to the first instance of that service.
        :return: message and HTTP status code (200 if an instance was
            stopped, 404 if service or instance are unknown)
        """
        # try to extract the service and instance UUID from the request
        json_data = request.get_json(force=True)
        service_uuid = json_data.get("service_uuid")
        instance_uuid = json_data.get("service_instance_uuid")

        # try to be fuzzy
        if service_uuid is None and len(GK.services) > 0:
            # if we don't get a service uuid, we simply stop the first
            # service in the list
            service_uuid = list(GK.services.keys())[0]
        # Resolve the service before touching its instances: indexing
        # GK.services with an unknown (or missing) uuid raises a KeyError.
        service = GK.services.get(service_uuid)
        if service is None:
            return "Service not found", 404
        if instance_uuid is None and len(service.instances) > 0:
            instance_uuid = list(service.instances.keys())[0]

        if instance_uuid in service.instances:
            # valid service and instance UUID, stop service
            service.stop_service(instance_uuid)
            return "service instance with uuid %r stopped." % instance_uuid, 200
        return "Service not found", 404
1129
1130
class Exit(fr.Resource):
    """
    REST resource to shut the emulator down.
    """

    def put(self):
        """
        Stop the running Containernet instance regardless of data transmitted
        """
        # the network is reached through the first data center of the GK
        datacenters = list(GK.dcs.values())
        first_dc = datacenters[0]
        first_dc.net.stop()
1138
1139
def initialize_GK():
    """
    Bind a newly created Gatekeeper object to the module-level name GK.
    """
    global GK
    GK = Gatekeeper()
1143
1144
# create a single, global GK object
# (GK is defined here first and then filled by initialize_GK())
GK = None
initialize_GK()
# setup Flask
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024  # 512 MB max upload
api = fr.Api(app)
# define endpoints
# (each resource is reachable under every URL listed for it)
api.add_resource(Packages, '/packages', '/api/v2/packages')
api.add_resource(Instantiations, '/instantiations',
                 '/api/v2/instantiations', '/api/v2/requests')
api.add_resource(Exit, '/emulator/exit')
1157
1158
def start_rest_api(host, port, datacenters=None):
    """
    Hand the data centers to the global gatekeeper and run the REST API.
    :param host: address the Flask server binds to
    :param port: port the Flask server listens on
    :param datacenters: dict of data centers for the gatekeeper; an empty
        dict is used if omitted
    """
    # A dict() default argument would be created only once and then be
    # shared (through GK.dcs) by all calls that omit the argument.
    GK.dcs = dict() if datacenters is None else datacenters
    GK.net = get_dc_network()
    # start the Flask server (not the best performance but ok for our use case)
    # NOTE(review): debug=True enables Flask's debug mode; confirm this is
    # intended when the API is bound to a non-local address.
    app.run(host=host,
            port=port,
            debug=True,
            use_reloader=False  # this is needed to run Flask in a non-main thread
            )
1168
1169
def ensure_dir(name):
    """
    Create the directory name (including missing parents) if it is absent.
    :param name: path of the directory
    :raises OSError: if the directory cannot be created, e.g. because the
        path exists but is not a directory
    """
    # Try to create and handle the failure instead of checking first:
    # the directory may appear between an exists() check and makedirs().
    try:
        os.makedirs(name)
    except OSError:
        if not os.path.isdir(name):
            raise
1173
1174
def load_yaml(path):
    """
    Parse the YAML file at path.
    :param path: path of the YAML file
    :return: the parsed content; an empty dict if the file is not valid YAML
    """
    with open(path, "r") as f:
        try:
            # safe_load() instead of load(): the parsed files come from
            # uploaded packages and must not be able to construct arbitrary
            # Python objects. load() without an explicit Loader is also
            # rejected by recent PyYAML releases.
            r = yaml.safe_load(f)
        except yaml.YAMLError as exc:
            LOG.exception("YAML parse error: %r" % str(exc))
            r = dict()
    return r
1183
1184
def make_relative_path(path):
    """
    Strip a leading "file://" and then one leading "/" from path.
    :param path: path or file URL
    :return: the path without these prefixes
    """
    scheme = "file://"
    if path.startswith(scheme):
        path = path[len(scheme):]
    if path.startswith("/"):
        path = path[1:]
    return path
1191
1192
def get_dc_network():
    """
    retrieve the DCnetwork where this dummygatekeeper (GK) connects to.
    Assume at least 1 datacenter is connected to this GK, and that all datacenters belong to the same DCNetwork
    :return: the "net" attribute of the first data center in GK.dcs
    """
    assert (len(GK.dcs) > 0)
    # list(...) is needed on Python 3, where values() cannot be indexed
    return list(GK.dcs.values())[0].net
1201
1202
def parse_interface(interface_name):
    """
    Split an interface name of the NSD into its parts.
    :param interface_name: "<vnf_id>:<vnf_interface>" or a plain name
    :return: vnf_id, vnf_interface, vnf_sap_docker_name; for a plain name
        all three are the name itself, otherwise the docker name is the
        interface name with ':' replaced by '_'
    """
    if ':' not in interface_name:
        return interface_name, interface_name, interface_name

    vnf_id, vnf_interface = interface_name.split(':')
    return vnf_id, vnf_interface, interface_name.replace(':', '_')
1219
1220
def reset_subnets():
    """
    Regenerate the module-level subnet definitions used for the generated
    interfaces (SAP_SUBNETS, ELAN_SUBNETS and ELINE_SUBNETS).
    """
    global SAP_SUBNETS, ELAN_SUBNETS, ELINE_SUBNETS
    # private subnet definitions for the generated interfaces
    # SAP: prefix 10.10, mask 30
    SAP_SUBNETS = generate_subnets('10.10', 0, subnet_size=50, mask=30)
    # E-LAN: prefix 10.20, mask 24
    ELAN_SUBNETS = generate_subnets('10.20', 0, subnet_size=50, mask=24)
    # E-Line: prefix 10.30, mask 30
    ELINE_SUBNETS = generate_subnets('10.30', 0, subnet_size=50, mask=30)
1232
1233
def get_container_name(vnf_id, vdu_id):
    """
    Build the container name "<vnf_id>.<vdu_id>".
    :param vnf_id: id of the VNF
    :param vdu_id: id of the deployment unit
    :return: both ids joined by a dot
    """
    name_template = "{vnf}.{vdu}"
    return name_template.format(vnf=vnf_id, vdu=vdu_id)
1236
1237
if __name__ == '__main__':
    # Lets allow to run the API in standalone mode.
    GK_STANDALONE_MODE = True
    werkzeug_logger = logging.getLogger("werkzeug")
    werkzeug_logger.setLevel(logging.INFO)
    start_rest_api("0.0.0.0", 8000)