Added 5GTANGO lightweight lifecycle manager (LLCM).
[osm/vim-emu.git] / src / emuvim / api / tango / llcm.py
1 # Copyright (c) 2018 SONATA-NFV, 5GTANGO and Paderborn University
2 # ALL RIGHTS RESERVED.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Neither the name of the SONATA-NFV, 5GTANGO, Paderborn University
17 # nor the names of its contributors may be used to endorse or promote
18 # products derived from this software without specific prior written
19 # permission.
20 #
21 # This work has been performed in the framework of the SONATA project,
22 # funded by the European Commission under Grant number 671517 through
23 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
24 # acknowledge the contributions of their colleagues of the SONATA
25 # partner consortium (www.sonata-nfv.eu).
26 #
27 # This work has also been performed in the framework of the 5GTANGO project,
28 # funded by the European Commission under Grant number 761493 through
29 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
30 # acknowledge the contributions of their colleagues of the 5GTANGO
31 # partner consortium (www.5gtango.eu).
32 import logging
33 import os
34 import uuid
35 import hashlib
36 import zipfile
37 import yaml
38 import threading
39 from docker import DockerClient
40 from flask import Flask, request
41 import flask_restful as fr
42 from collections import defaultdict
43 import pkg_resources
44 from subprocess import Popen
45 from random import randint
46 import ipaddress
47 import copy
48 import time
49 from functools import reduce
50
51
52 LOG = logging.getLogger("5gtango.llcm")
53 LOG.setLevel(logging.INFO)
54
55
56 GK_STORAGE = "/tmp/vim-emu-tango-llcm/"
57 UPLOAD_FOLDER = os.path.join(GK_STORAGE, "uploads/")
58 CATALOG_FOLDER = os.path.join(GK_STORAGE, "catalog/")
59
60 # Enable Dockerfile build functionality
61 BUILD_DOCKERFILE = False
62
63 # flag to indicate that we run without the emulator (only the bare API for
64 # integration testing)
65 GK_STANDALONE_MODE = False
66
67 # should a new version of an image be pulled even if its available
68 FORCE_PULL = False
69
70 # Automatically deploy SAPs (endpoints) of the service as new containers
71 # Attention: This is not a configuration switch but a global variable!
72 # Don't change its default value.
73 DEPLOY_SAP = False
74
75 # flag to indicate if we use bidirectional forwarding rules in the
76 # automatic chaining process
77 BIDIRECTIONAL_CHAIN = False
78
79 # override the management interfaces in the descriptors with default
80 # docker0 interfaces in the containers
81 USE_DOCKER_MGMT = False
82
83 # automatically deploy uploaded packages (no need to execute son-access
84 # deploy --latest separately)
85 AUTO_DEPLOY = False
86
87 # and also automatically terminate any other running services
88 AUTO_DELETE = False
89
90
91 def generate_subnets(prefix, base, subnet_size=50, mask=24):
92 # Generate a list of ipaddress in subnets
93 r = list()
94 for net in range(base, base + subnet_size):
95 subnet = "{0}.{1}.0/{2}".format(prefix, net, mask)
96 r.append(ipaddress.ip_network(unicode(subnet)))
97 return r
98
99
100 # private subnet definitions for the generated interfaces
101 # 10.10.xxx.0/24
102 SAP_SUBNETS = generate_subnets('10.10', 0, subnet_size=50, mask=30)
103 # 10.20.xxx.0/30
104 ELAN_SUBNETS = generate_subnets('10.20', 0, subnet_size=50, mask=24)
105 # 10.30.xxx.0/30
106 ELINE_SUBNETS = generate_subnets('10.30', 0, subnet_size=50, mask=30)
107
108 # path to the VNFD for the SAP VNF that is deployed as internal SAP point
109 SAP_VNFD = None
110
111 # Time in seconds to wait for vnf stop scripts to execute fully
112 VNF_STOP_WAIT_TIME = 5
113
114
115 class OnBoardingException(BaseException):
116 pass
117
118
119 class Gatekeeper(object):
120
121 def __init__(self):
122 self.services = dict()
123 self.dcs = dict()
124 self.net = None
125 # used to generate short names for VNFs (Mininet limitation)
126 self.vnf_counter = 0
127 LOG.info("Initialized 5GTANGO LLCM module.")
128
129 def register_service_package(self, service_uuid, service):
130 """
131 register new service package
132 :param service_uuid
133 :param service object
134 """
135 self.services[service_uuid] = service
136 # lets perform all steps needed to onboard the service
137 service.onboard()
138
139 def get_next_vnf_name(self):
140 self.vnf_counter += 1
141 return "vnf%d" % self.vnf_counter
142
143
144 class Service(object):
145 """
146 This class represents a NS uploaded as a *.son package to the
147 dummy gatekeeper.
148 Can have multiple running instances of this service.
149 """
150
151 def __init__(self,
152 service_uuid,
153 package_file_hash,
154 package_file_path):
155 self.uuid = service_uuid
156 self.package_file_hash = package_file_hash
157 self.package_file_path = package_file_path
158 self.package_content_path = os.path.join(
159 CATALOG_FOLDER, "services/%s" % self.uuid)
160 self.manifest = None
161 self.nsd = None
162 self.vnfds = dict()
163 self.saps = dict()
164 self.saps_ext = list()
165 self.saps_int = list()
166 self.local_docker_files = dict()
167 self.remote_docker_image_urls = dict()
168 self.instances = dict()
169 # dict to find the vnf_name for any vnf id
170 self.vnf_id2vnf_name = dict()
171
172 def onboard(self):
173 """
174 Do all steps to prepare this service to be instantiated
175 :return:
176 """
177 # 1. extract the contents of the package and store them in our catalog
178 self._unpack_service_package()
179 # 2. read in all descriptor files
180 self._load_package_descriptor()
181 self._load_nsd()
182 self._load_vnfd()
183 if self.nsd is None:
184 raise OnBoardingException("No NSD found.")
185 if len(self.vnfds) < 1:
186 raise OnBoardingException("No VNFDs found.")
187 if DEPLOY_SAP:
188 self._load_saps()
189 # 3. prepare container images (e.g. download or build Dockerfile)
190 if BUILD_DOCKERFILE:
191 self._load_docker_files()
192 self._build_images_from_dockerfiles()
193 else:
194 self._load_docker_urls()
195 self._pull_predefined_dockerimages()
196 LOG.info("On-boarded service: %r" % self.manifest.get("name"))
197
198 def start_service(self):
199 """
200 This methods creates and starts a new service instance.
201 It computes placements, iterates over all VNFDs, and starts
202 each VNFD as a Docker container in the data center selected
203 by the placement algorithm.
204 :return:
205 """
206 LOG.info("Starting service %r" % self.uuid)
207
208 # 1. each service instance gets a new uuid to identify it
209 instance_uuid = str(uuid.uuid4())
210 # build a instances dict (a bit like a NSR :))
211 self.instances[instance_uuid] = dict()
212 self.instances[instance_uuid]["vnf_instances"] = list()
213
214 # 2. compute placement of this service instance (adds DC names to
215 # VNFDs)
216 if not GK_STANDALONE_MODE:
217 # self._calculate_placement(FirstDcPlacement)
218 self._calculate_placement(RoundRobinDcPlacementWithSAPs)
219 # 3. start all vnfds that we have in the service (except SAPs)
220 for vnf_id in self.vnfds:
221 vnfd = self.vnfds[vnf_id]
222 vnfi = None
223 if not GK_STANDALONE_MODE:
224 vnfi = self._start_vnfd(vnfd, vnf_id)
225 self.instances[instance_uuid]["vnf_instances"].append(vnfi)
226
227 # 4. start all SAPs in the service
228 for sap in self.saps:
229 self._start_sap(self.saps[sap], instance_uuid)
230
231 # 5. Deploy E-Line and E_LAN links
232 # Attention: Only done if ""forwarding_graphs" section in NSD exists,
233 # even if "forwarding_graphs" are not used directly.
234 if "virtual_links" in self.nsd and "forwarding_graphs" in self.nsd:
235 vlinks = self.nsd["virtual_links"]
236 # constituent virtual links are not checked
237 # fwd_links = self.nsd["forwarding_graphs"][0]["constituent_virtual_links"]
238 eline_fwd_links = [l for l in vlinks if (
239 l["connectivity_type"] == "E-Line")]
240 elan_fwd_links = [l for l in vlinks if (
241 l["connectivity_type"] == "E-LAN")]
242
243 GK.net.deployed_elines.extend(eline_fwd_links)
244 GK.net.deployed_elans.extend(elan_fwd_links)
245
246 # 5a. deploy E-Line links
247 self._connect_elines(eline_fwd_links, instance_uuid)
248
249 # 5b. deploy E-LAN links
250 self._connect_elans(elan_fwd_links, instance_uuid)
251
252 # 6. run the emulator specific entrypoint scripts in the VNFIs of this
253 # service instance
254 self._trigger_emulator_start_scripts_in_vnfis(
255 self.instances[instance_uuid]["vnf_instances"])
256
257 LOG.info("Service started. Instance id: %r" % instance_uuid)
258 return instance_uuid
259
260 def stop_service(self, instance_uuid):
261 """
262 This method stops a running service instance.
263 It iterates over all VNF instances, stopping them each
264 and removing them from their data center.
265
266 :param instance_uuid: the uuid of the service instance to be stopped
267 """
268 LOG.info("Stopping service %r" % self.uuid)
269 # get relevant information
270 # instance_uuid = str(self.uuid.uuid4())
271 vnf_instances = self.instances[instance_uuid]["vnf_instances"]
272
273 # trigger stop skripts in vnf instances and wait a few seconds for
274 # completion
275 self._trigger_emulator_stop_scripts_in_vnfis(vnf_instances)
276 time.sleep(VNF_STOP_WAIT_TIME)
277
278 for v in vnf_instances:
279 self._stop_vnfi(v)
280
281 for sap_name in self.saps_ext:
282 ext_sap = self.saps[sap_name]
283 target_dc = ext_sap.get("dc")
284 target_dc.removeExternalSAP(sap_name)
285 LOG.info("Stopping the SAP instance: %r in DC %r" %
286 (sap_name, target_dc))
287
288 if not GK_STANDALONE_MODE:
289 # remove placement?
290 # self._remove_placement(RoundRobinPlacement)
291 None
292 # last step: remove the instance from the list of all instances
293 del self.instances[instance_uuid]
294
295 def _start_vnfd(self, vnfd, vnf_id, **kwargs):
296 """
297 Start a single VNFD of this service
298 :param vnfd: vnfd descriptor dict
299 :param vnf_id: unique id of this vnf in the nsd
300 :return:
301 """
302 # the vnf_name refers to the container image to be deployed
303 vnf_name = vnfd.get("name")
304
305 # iterate over all deployment units within each VNFDs
306 for u in vnfd.get("virtual_deployment_units"):
307 # 1. get the name of the docker image to start and the assigned DC
308 if vnf_id not in self.remote_docker_image_urls:
309 raise Exception("No image name for %r found. Abort." % vnf_id)
310 docker_name = self.remote_docker_image_urls.get(vnf_id)
311 target_dc = vnfd.get("dc")
312 # 2. perform some checks to ensure we can start the container
313 assert(docker_name is not None)
314 assert(target_dc is not None)
315 if not self._check_docker_image_exists(docker_name):
316 raise Exception(
317 "Docker image %r not found. Abort." % docker_name)
318
319 # 3. get the resource limits
320 res_req = u.get("resource_requirements")
321 cpu_list = res_req.get("cpu").get("cores")
322 if cpu_list is None:
323 cpu_list = res_req.get("cpu").get("vcpus")
324 if cpu_list is None:
325 cpu_list = "1"
326 cpu_bw = res_req.get("cpu").get("cpu_bw")
327 if not cpu_bw:
328 cpu_bw = 1
329 mem_num = str(res_req.get("memory").get("size"))
330 if len(mem_num) == 0:
331 mem_num = "2"
332 mem_unit = str(res_req.get("memory").get("size_unit"))
333 if str(mem_unit) == 0:
334 mem_unit = "GB"
335 mem_limit = float(mem_num)
336 if mem_unit == "GB":
337 mem_limit = mem_limit * 1024 * 1024 * 1024
338 elif mem_unit == "MB":
339 mem_limit = mem_limit * 1024 * 1024
340 elif mem_unit == "KB":
341 mem_limit = mem_limit * 1024
342 mem_lim = int(mem_limit)
343 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(
344 float(cpu_bw))
345
346 # check if we need to deploy the management ports (defined as
347 # type:management both on in the vnfd and nsd)
348 intfs = vnfd.get("connection_points", [])
349 mgmt_intf_names = []
350 if USE_DOCKER_MGMT:
351 mgmt_intfs = [vnf_id + ':' + intf['id']
352 for intf in intfs if intf.get('type') == 'management']
353 # check if any of these management interfaces are used in a
354 # management-type network in the nsd
355 for nsd_intf_name in mgmt_intfs:
356 vlinks = [l["connection_points_reference"]
357 for l in self.nsd.get("virtual_links", [])]
358 for link in vlinks:
359 if nsd_intf_name in link and self.check_mgmt_interface(
360 link):
361 # this is indeed a management interface and can be
362 # skipped
363 vnf_id, vnf_interface, vnf_sap_docker_name = parse_interface(
364 nsd_intf_name)
365 found_interfaces = [
366 intf for intf in intfs if intf.get('id') == vnf_interface]
367 intfs.remove(found_interfaces[0])
368 mgmt_intf_names.append(vnf_interface)
369
370 # 4. generate the volume paths for the docker container
371 volumes = list()
372 # a volume to extract log files
373 docker_log_path = "/tmp/results/%s/%s" % (self.uuid, vnf_id)
374 LOG.debug("LOG path for vnf %s is %s." % (vnf_id, docker_log_path))
375 if not os.path.exists(docker_log_path):
376 LOG.debug("Creating folder %s" % docker_log_path)
377 os.makedirs(docker_log_path)
378
379 volumes.append(docker_log_path + ":/mnt/share/")
380
381 # 5. do the dc.startCompute(name="foobar") call to run the container
382 # TODO consider flavors, and other annotations
383 # TODO: get all vnf id's from the nsd for this vnfd and use those as dockername
384 # use the vnf_id in the nsd as docker name
385 # so deployed containers can be easily mapped back to the nsd
386 LOG.info("Starting %r as %r in DC %r" %
387 (vnf_name, vnf_id, vnfd.get("dc")))
388 LOG.debug("Interfaces for %r: %r" % (vnf_id, intfs))
389 vnfi = target_dc.startCompute(
390 vnf_id,
391 network=intfs,
392 image=docker_name,
393 flavor_name="small",
394 cpu_quota=cpu_quota,
395 cpu_period=cpu_period,
396 cpuset=cpu_list,
397 mem_limit=mem_lim,
398 volumes=volumes,
399 type=kwargs.get('type', 'docker'))
400
401 # rename the docker0 interfaces (eth0) to the management port name
402 # defined in the VNFD
403 if USE_DOCKER_MGMT:
404 for intf_name in mgmt_intf_names:
405 self._vnf_reconfigure_network(
406 vnfi, 'eth0', new_name=intf_name)
407
408 return vnfi
409
410 def _stop_vnfi(self, vnfi):
411 """
412 Stop a VNF instance.
413
414 :param vnfi: vnf instance to be stopped
415 """
416 # Find the correct datacenter
417 status = vnfi.getStatus()
418 dc = vnfi.datacenter
419
420 # stop the vnfi
421 LOG.info("Stopping the vnf instance contained in %r in DC %r" %
422 (status["name"], dc))
423 dc.stopCompute(status["name"])
424
425 def _get_vnf_instance(self, instance_uuid, vnf_id):
426 """
427 Returns the Docker object for the given VNF id (or Docker name).
428 :param instance_uuid: UUID of the service instance to search in.
429 :param name: VNF name or Docker name. We are fuzzy here.
430 :return:
431 """
432 dn = vnf_id
433 for vnfi in self.instances[instance_uuid]["vnf_instances"]:
434 if vnfi.name == dn:
435 return vnfi
436 LOG.warning("No container with name: {0} found.".format(dn))
437 return None
438
439 @staticmethod
440 def _vnf_reconfigure_network(vnfi, if_name, net_str=None, new_name=None):
441 """
442 Reconfigure the network configuration of a specific interface
443 of a running container.
444 :param vnfi: container instance
445 :param if_name: interface name
446 :param net_str: network configuration string, e.g., 1.2.3.4/24
447 :return:
448 """
449
450 # assign new ip address
451 if net_str is not None:
452 intf = vnfi.intf(intf=if_name)
453 if intf is not None:
454 intf.setIP(net_str)
455 LOG.debug("Reconfigured network of %s:%s to %r" %
456 (vnfi.name, if_name, net_str))
457 else:
458 LOG.warning("Interface not found: %s:%s. Network reconfiguration skipped." % (
459 vnfi.name, if_name))
460
461 if new_name is not None:
462 vnfi.cmd('ip link set', if_name, 'down')
463 vnfi.cmd('ip link set', if_name, 'name', new_name)
464 vnfi.cmd('ip link set', new_name, 'up')
465 LOG.debug("Reconfigured interface name of %s:%s to %s" %
466 (vnfi.name, if_name, new_name))
467
468 def _trigger_emulator_start_scripts_in_vnfis(self, vnfi_list):
469 for vnfi in vnfi_list:
470 config = vnfi.dcinfo.get("Config", dict())
471 env = config.get("Env", list())
472 for env_var in env:
473 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
474 LOG.debug("%r = %r" % (var, cmd))
475 if var == "SON_EMU_CMD":
476 LOG.info("Executing entry point script in %r: %r" %
477 (vnfi.name, cmd))
478 # execute command in new thread to ensure that GK is not
479 # blocked by VNF
480 t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
481 t.daemon = True
482 t.start()
483
484 def _trigger_emulator_stop_scripts_in_vnfis(self, vnfi_list):
485 for vnfi in vnfi_list:
486 config = vnfi.dcinfo.get("Config", dict())
487 env = config.get("Env", list())
488 for env_var in env:
489 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
490 if var == "SON_EMU_CMD_STOP":
491 LOG.info("Executing stop script in %r: %r" %
492 (vnfi.name, cmd))
493 # execute command in new thread to ensure that GK is not
494 # blocked by VNF
495 t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
496 t.daemon = True
497 t.start()
498
499 def _unpack_service_package(self):
500 """
501 unzip *.son file and store contents in CATALOG_FOLDER/services/<service_uuid>/
502 """
503 LOG.info("Unzipping: %r" % self.package_file_path)
504 with zipfile.ZipFile(self.package_file_path, "r") as z:
505 z.extractall(self.package_content_path)
506
507 def _load_package_descriptor(self):
508 """
509 Load the main package descriptor YAML and keep it as dict.
510 :return:
511 """
512 self.manifest = load_yaml(
513 os.path.join(
514 self.package_content_path, "TOSCA-Metadata/NAPD.yaml"))
515
516 def _load_nsd(self):
517 """
518 Load the entry NSD YAML and keep it as dict.
519 :return:
520 """
521 if "package_content" in self.manifest:
522 nsd_path = None
523 for f in self.manifest.get("package_content"):
524 if f.get("content-type") == "application/vnd.5gtango.nsd":
525 nsd_path = os.path.join(
526 self.package_content_path,
527 make_relative_path(f.get("source")))
528 break # always use the first NSD for now
529 if nsd_path is None:
530 raise OnBoardingException("No NSD with type 'application/vnd.5gtango.nsd' found.")
531 self.nsd = load_yaml(nsd_path)
532 GK.net.deployed_nsds.append(self.nsd) # TODO this seems strange (remove?)
533 # create dict to find the vnf_name for any vnf id
534 self.vnf_id2vnf_name = defaultdict(lambda: "NotExistingNode",
535 reduce(lambda x, y: dict(x, **y),
536 map(lambda d: {d["vnf_id"]: d["vnf_name"]},
537 self.nsd["network_functions"])))
538 LOG.debug("Loaded NSD: %r" % self.nsd.get("name"))
539 else:
540 raise OnBoardingException(
541 "No 'package_content' section in package manifest:\n{}"
542 .format(self.manifest))
543
544 def _load_vnfd(self):
545 """
546 Load all VNFD YAML files referenced in MANIFEST.MF and keep them in dict.
547 :return:
548 """
549
550 # first make a list of all the vnfds in the package
551 vnfd_set = dict()
552 if "package_content" in self.manifest:
553 for pc in self.manifest.get("package_content"):
554 if pc.get(
555 "content-type") == "application/vnd.5gtango.vnfd":
556 vnfd_path = os.path.join(
557 self.package_content_path,
558 make_relative_path(pc.get("source")))
559 vnfd = load_yaml(vnfd_path)
560 vnfd_set[vnfd.get("name")] = vnfd
561 if len(vnfd_set) < 1:
562 raise OnBoardingException("No VNFDs found.")
563 # then link each vnf_id in the nsd to its vnfd
564 for vnf_id in self.vnf_id2vnf_name:
565 vnf_name = self.vnf_id2vnf_name[vnf_id]
566 self.vnfds[vnf_id] = vnfd_set[vnf_name]
567 LOG.debug("Loaded VNFD: {0} id: {1}".format(vnf_name, vnf_id))
568
569 def _load_saps(self):
570 # create list of all SAPs
571 # check if we need to deploy management ports
572 if USE_DOCKER_MGMT:
573 SAPs = [p for p in self.nsd["connection_points"]
574 if 'management' not in p.get('type')]
575 else:
576 SAPs = [p for p in self.nsd["connection_points"]]
577
578 for sap in SAPs:
579 # endpoint needed in this service
580 sap_id, sap_interface, sap_docker_name = parse_interface(sap['id'])
581 # make sure SAP has type set (default internal)
582 sap["type"] = sap.get("type", 'internal')
583
584 # Each Service Access Point (connection_point) in the nsd is an IP
585 # address on the host
586 if sap["type"] == "external":
587 # add to vnfds to calculate placement later on
588 sap_net = SAP_SUBNETS.pop(0)
589 self.saps[sap_docker_name] = {
590 "name": sap_docker_name, "type": "external", "net": sap_net}
591 # add SAP vnf to list in the NSD so it is deployed later on
592 # each SAP gets a unique VNFD and vnf_id in the NSD and custom
593 # type (only defined in the dummygatekeeper)
594 self.nsd["network_functions"].append(
595 {"vnf_id": sap_docker_name, "vnf_name": sap_docker_name, "vnf_type": "sap_ext"})
596
597 # Each Service Access Point (connection_point) in the nsd is
598 # getting its own container (default)
599 elif sap["type"] == "internal" or sap["type"] == "management":
600 # add SAP to self.vnfds
601 if SAP_VNFD is None:
602 sapfile = pkg_resources.resource_filename(
603 __name__, "sap_vnfd.yml")
604 else:
605 sapfile = SAP_VNFD
606 sap_vnfd = load_yaml(sapfile)
607 sap_vnfd["connection_points"][0]["id"] = sap_interface
608 sap_vnfd["name"] = sap_docker_name
609 sap_vnfd["type"] = "internal"
610 # add to vnfds to calculate placement later on and deploy
611 self.saps[sap_docker_name] = sap_vnfd
612 # add SAP vnf to list in the NSD so it is deployed later on
613 # each SAP get a unique VNFD and vnf_id in the NSD
614 self.nsd["network_functions"].append(
615 {"vnf_id": sap_docker_name, "vnf_name": sap_docker_name, "vnf_type": "sap_int"})
616
617 LOG.debug("Loaded SAP: name: {0}, type: {1}".format(
618 sap_docker_name, sap['type']))
619
620 # create sap lists
621 self.saps_ext = [self.saps[sap]['name']
622 for sap in self.saps if self.saps[sap]["type"] == "external"]
623 self.saps_int = [self.saps[sap]['name']
624 for sap in self.saps if self.saps[sap]["type"] == "internal"]
625
626 def _start_sap(self, sap, instance_uuid):
627 if not DEPLOY_SAP:
628 return
629
630 LOG.info('start SAP: {0} ,type: {1}'.format(sap['name'], sap['type']))
631 if sap["type"] == "internal":
632 vnfi = None
633 if not GK_STANDALONE_MODE:
634 vnfi = self._start_vnfd(sap, sap['name'], type='sap_int')
635 self.instances[instance_uuid]["vnf_instances"].append(vnfi)
636
637 elif sap["type"] == "external":
638 target_dc = sap.get("dc")
639 # add interface to dc switch
640 target_dc.attachExternalSAP(sap['name'], sap['net'])
641
642 def _connect_elines(self, eline_fwd_links, instance_uuid):
643 """
644 Connect all E-LINE links in the NSD
645 :param eline_fwd_links: list of E-LINE links in the NSD
646 :param: instance_uuid of the service
647 :return:
648 """
649 # cookie is used as identifier for the flowrules installed by the dummygatekeeper
650 # eg. different services get a unique cookie for their flowrules
651 cookie = 1
652 for link in eline_fwd_links:
653 # check if we need to deploy this link when its a management link:
654 if USE_DOCKER_MGMT:
655 if self.check_mgmt_interface(
656 link["connection_points_reference"]):
657 continue
658
659 src_id, src_if_name, src_sap_id = parse_interface(
660 link["connection_points_reference"][0])
661 dst_id, dst_if_name, dst_sap_id = parse_interface(
662 link["connection_points_reference"][1])
663
664 setChaining = False
665 # check if there is a SAP in the link and chain everything together
666 if src_sap_id in self.saps and dst_sap_id in self.saps:
667 LOG.info(
668 '2 SAPs cannot be chained together : {0} - {1}'.format(src_sap_id, dst_sap_id))
669 continue
670
671 elif src_sap_id in self.saps_ext:
672 src_id = src_sap_id
673 # set intf name to None so the chaining function will choose
674 # the first one
675 src_if_name = None
676 dst_vnfi = self._get_vnf_instance(instance_uuid, dst_id)
677 if dst_vnfi is not None:
678 # choose first ip address in sap subnet
679 sap_net = self.saps[src_sap_id]['net']
680 sap_ip = "{0}/{1}".format(str(sap_net[2]),
681 sap_net.prefixlen)
682 self._vnf_reconfigure_network(
683 dst_vnfi, dst_if_name, sap_ip)
684 setChaining = True
685
686 elif dst_sap_id in self.saps_ext:
687 dst_id = dst_sap_id
688 # set intf name to None so the chaining function will choose
689 # the first one
690 dst_if_name = None
691 src_vnfi = self._get_vnf_instance(instance_uuid, src_id)
692 if src_vnfi is not None:
693 sap_net = self.saps[dst_sap_id]['net']
694 sap_ip = "{0}/{1}".format(str(sap_net[2]),
695 sap_net.prefixlen)
696 self._vnf_reconfigure_network(
697 src_vnfi, src_if_name, sap_ip)
698 setChaining = True
699
700 # Link between 2 VNFs
701 else:
702 # make sure we use the correct sap vnf name
703 if src_sap_id in self.saps_int:
704 src_id = src_sap_id
705 if dst_sap_id in self.saps_int:
706 dst_id = dst_sap_id
707 # re-configure the VNFs IP assignment and ensure that a new
708 # subnet is used for each E-Link
709 src_vnfi = self._get_vnf_instance(instance_uuid, src_id)
710 dst_vnfi = self._get_vnf_instance(instance_uuid, dst_id)
711 if src_vnfi is not None and dst_vnfi is not None:
712 eline_net = ELINE_SUBNETS.pop(0)
713 ip1 = "{0}/{1}".format(str(eline_net[1]),
714 eline_net.prefixlen)
715 ip2 = "{0}/{1}".format(str(eline_net[2]),
716 eline_net.prefixlen)
717 self._vnf_reconfigure_network(src_vnfi, src_if_name, ip1)
718 self._vnf_reconfigure_network(dst_vnfi, dst_if_name, ip2)
719 setChaining = True
720
721 # Set the chaining
722 if setChaining:
723 GK.net.setChain(
724 src_id, dst_id,
725 vnf_src_interface=src_if_name, vnf_dst_interface=dst_if_name,
726 bidirectional=BIDIRECTIONAL_CHAIN, cmd="add-flow", cookie=cookie, priority=10)
727 LOG.debug(
728 "Setting up E-Line link. (%s:%s) -> (%s:%s)" % (
729 src_id, src_if_name, dst_id, dst_if_name))
730
731 def _connect_elans(self, elan_fwd_links, instance_uuid):
732 """
733 Connect all E-LAN links in the NSD
734 :param elan_fwd_links: list of E-LAN links in the NSD
735 :param: instance_uuid of the service
736 :return:
737 """
738 for link in elan_fwd_links:
739 # check if we need to deploy this link when its a management link:
740 if USE_DOCKER_MGMT:
741 if self.check_mgmt_interface(
742 link["connection_points_reference"]):
743 continue
744
745 elan_vnf_list = []
746 # check if an external SAP is in the E-LAN (then a subnet is
747 # already defined)
748 intfs_elan = [intf for intf in link["connection_points_reference"]]
749 lan_sap = self.check_ext_saps(intfs_elan)
750 if lan_sap:
751 lan_net = self.saps[lan_sap]['net']
752 lan_hosts = list(lan_net.hosts())
753 else:
754 lan_net = ELAN_SUBNETS.pop(0)
755 lan_hosts = list(lan_net.hosts())
756
757 # generate lan ip address for all interfaces except external SAPs
758 for intf in link["connection_points_reference"]:
759
760 # skip external SAPs, they already have an ip
761 vnf_id, vnf_interface, vnf_sap_docker_name = parse_interface(
762 intf)
763 if vnf_sap_docker_name in self.saps_ext:
764 elan_vnf_list.append(
765 {'name': vnf_sap_docker_name, 'interface': vnf_interface})
766 continue
767
768 ip_address = "{0}/{1}".format(str(lan_hosts.pop(0)),
769 lan_net.prefixlen)
770 vnf_id, intf_name, vnf_sap_id = parse_interface(intf)
771
772 # make sure we use the correct sap vnf name
773 src_docker_name = vnf_id
774 if vnf_sap_id in self.saps_int:
775 src_docker_name = vnf_sap_id
776 vnf_id = vnf_sap_id
777
778 LOG.debug(
779 "Setting up E-LAN interface. (%s:%s) -> %s" % (
780 vnf_id, intf_name, ip_address))
781
782 # re-configure the VNFs IP assignment and ensure that a new subnet is used for each E-LAN
783 # E-LAN relies on the learning switch capability of Ryu which has to be turned on in the topology
784 # (DCNetwork(controller=RemoteController, enable_learning=True)), so no explicit chaining is necessary.
785 vnfi = self._get_vnf_instance(instance_uuid, vnf_id)
786 if vnfi is not None:
787 self._vnf_reconfigure_network(vnfi, intf_name, ip_address)
788 # add this vnf and interface to the E-LAN for tagging
789 elan_vnf_list.append(
790 {'name': src_docker_name, 'interface': intf_name})
791
792 # install the VLAN tags for this E-LAN
793 GK.net.setLAN(elan_vnf_list)
794
795 def _load_docker_files(self):
796 """
797 Get all paths to Dockerfiles from VNFDs and store them in dict.
798 :return:
799 """
800 for k, v in self.vnfds.iteritems():
801 for vu in v.get("virtual_deployment_units"):
802 if vu.get("vm_image_format") == "docker":
803 vm_image = vu.get("vm_image")
804 docker_path = os.path.join(
805 self.package_content_path,
806 make_relative_path(vm_image))
807 self.local_docker_files[k] = docker_path
808 LOG.debug("Found Dockerfile (%r): %r" % (k, docker_path))
809
810 def _load_docker_urls(self):
811 """
812 Get all URLs to pre-build docker images in some repo.
813 :return:
814 """
815 # also merge sap dicts, because internal saps also need a docker
816 # container
817 all_vnfs = self.vnfds.copy()
818 all_vnfs.update(self.saps)
819
820 for k, v in all_vnfs.iteritems():
821 for vu in v.get("virtual_deployment_units", {}):
822 if vu.get("vm_image_format") == "docker":
823 url = vu.get("vm_image")
824 if url is not None:
825 url = url.replace("http://", "")
826 self.remote_docker_image_urls[k] = url
827 LOG.debug("Found Docker image URL (%r): %r" %
828 (k, self.remote_docker_image_urls[k]))
829
830 def _build_images_from_dockerfiles(self):
831 """
832 Build Docker images for each local Dockerfile found in the package: self.local_docker_files
833 """
834 if GK_STANDALONE_MODE:
835 return # do not build anything in standalone mode
836 dc = DockerClient()
837 LOG.info("Building %d Docker images (this may take several minutes) ..." % len(
838 self.local_docker_files))
839 for k, v in self.local_docker_files.iteritems():
840 for line in dc.build(path=v.replace(
841 "Dockerfile", ""), tag=k, rm=False, nocache=False):
842 LOG.debug("DOCKER BUILD: %s" % line)
843 LOG.info("Docker image created: %s" % k)
844
845 def _pull_predefined_dockerimages(self):
846 """
847 If the package contains URLs to pre-build Docker images, we download them with this method.
848 """
849 dc = DockerClient()
850 for url in self.remote_docker_image_urls.itervalues():
851 # only pull if not present (speedup for development)
852 if not FORCE_PULL:
853 if len(dc.images.list(name=url)) > 0:
854 LOG.debug("Image %r present. Skipping pull." % url)
855 continue
856 LOG.info("Pulling image: %r" % url)
857 # this seems to fail with latest docker api version 2.0.2
858 # dc.images.pull(url,
859 # insecure_registry=True)
860 # using docker cli instead
861 cmd = ["docker",
862 "pull",
863 url,
864 ]
865 Popen(cmd).wait()
866
867 def _check_docker_image_exists(self, image_name):
868 """
869 Query the docker service and check if the given image exists
870 :param image_name: name of the docker image
871 :return:
872 """
873 return len(DockerClient().images.list(name=image_name)) > 0
874
875 def _calculate_placement(self, algorithm):
876 """
877 Do placement by adding the a field "dc" to
878 each VNFD that points to one of our
879 data center objects known to the gatekeeper.
880 """
881 assert(len(self.vnfds) > 0)
882 assert(len(GK.dcs) > 0)
883 # instantiate algorithm an place
884 p = algorithm()
885 p.place(self.nsd, self.vnfds, self.saps, GK.dcs)
886 LOG.info("Using placement algorithm: %r" % p.__class__.__name__)
887 # lets print the placement result
888 for name, vnfd in self.vnfds.iteritems():
889 LOG.info("Placed VNF %r on DC %r" % (name, str(vnfd.get("dc"))))
890 for sap in self.saps:
891 sap_dict = self.saps[sap]
892 LOG.info("Placed SAP %r on DC %r" % (sap, str(sap_dict.get("dc"))))
893
894 def _calculate_cpu_cfs_values(self, cpu_time_percentage):
895 """
896 Calculate cpu period and quota for CFS
897 :param cpu_time_percentage: percentage of overall CPU to be used
898 :return: cpu_period, cpu_quota
899 """
900 if cpu_time_percentage is None:
901 return -1, -1
902 if cpu_time_percentage < 0:
903 return -1, -1
904 # (see: https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt)
905 # Attention minimum cpu_quota is 1ms (micro)
906 cpu_period = 1000000 # lets consider a fixed period of 1000000 microseconds for now
907 LOG.debug("cpu_period is %r, cpu_percentage is %r" %
908 (cpu_period, cpu_time_percentage))
909 # calculate the fraction of cpu time for this container
910 cpu_quota = cpu_period * cpu_time_percentage
911 # ATTENTION >= 1000 to avoid a invalid argument system error ... no
912 # idea why
913 if cpu_quota < 1000:
914 LOG.debug("cpu_quota before correcting: %r" % cpu_quota)
915 cpu_quota = 1000
916 LOG.warning("Increased CPU quota to avoid system error.")
917 LOG.debug("Calculated: cpu_period=%f / cpu_quota=%f" %
918 (cpu_period, cpu_quota))
919 return int(cpu_period), int(cpu_quota)
920
921 def check_ext_saps(self, intf_list):
922 # check if the list of interfacs contains an external SAP
923 saps_ext = [self.saps[sap]['name']
924 for sap in self.saps if self.saps[sap]["type"] == "external"]
925 for intf_name in intf_list:
926 vnf_id, vnf_interface, vnf_sap_docker_name = parse_interface(
927 intf_name)
928 if vnf_sap_docker_name in saps_ext:
929 return vnf_sap_docker_name
930
931 def check_mgmt_interface(self, intf_list):
932 SAPs_mgmt = [p.get('id') for p in self.nsd["connection_points"]
933 if 'management' in p.get('type')]
934 for intf_name in intf_list:
935 if intf_name in SAPs_mgmt:
936 return True
937
938
939 """
940 Some (simple) placement algorithms
941 """
942
943
944 class FirstDcPlacement(object):
945 """
946 Placement: Always use one and the same data center from the GK.dcs dict.
947 """
948
949 def place(self, nsd, vnfds, saps, dcs):
950 for id, vnfd in vnfds.iteritems():
951 vnfd["dc"] = list(dcs.itervalues())[0]
952
953
954 class RoundRobinDcPlacement(object):
955 """
956 Placement: Distribute VNFs across all available DCs in a round robin fashion.
957 """
958
959 def place(self, nsd, vnfds, saps, dcs):
960 c = 0
961 dcs_list = list(dcs.itervalues())
962 for id, vnfd in vnfds.iteritems():
963 vnfd["dc"] = dcs_list[c % len(dcs_list)]
964 c += 1 # inc. c to use next DC
965
966
967 class RoundRobinDcPlacementWithSAPs(object):
968 """
969 Placement: Distribute VNFs across all available DCs in a round robin fashion,
970 every SAP is instantiated on the same DC as the connected VNF.
971 """
972
973 def place(self, nsd, vnfds, saps, dcs):
974
975 # place vnfs
976 c = 0
977 dcs_list = list(dcs.itervalues())
978 for id, vnfd in vnfds.iteritems():
979 vnfd["dc"] = dcs_list[c % len(dcs_list)]
980 c += 1 # inc. c to use next DC
981
982 # place SAPs
983 vlinks = nsd.get("virtual_links", [])
984 eline_fwd_links = [l for l in vlinks if (
985 l["connectivity_type"] == "E-Line")]
986 elan_fwd_links = [l for l in vlinks if (
987 l["connectivity_type"] == "E-LAN")]
988
989 # SAPs on E-Line links are placed on the same DC as the VNF on the
990 # E-Line
991 for link in eline_fwd_links:
992 src_id, src_if_name, src_sap_id = parse_interface(
993 link["connection_points_reference"][0])
994 dst_id, dst_if_name, dst_sap_id = parse_interface(
995 link["connection_points_reference"][1])
996
997 # check if there is a SAP in the link
998 if src_sap_id in saps:
999 # get dc where connected vnf is mapped to
1000 dc = vnfds[dst_id]['dc']
1001 saps[src_sap_id]['dc'] = dc
1002
1003 if dst_sap_id in saps:
1004 # get dc where connected vnf is mapped to
1005 dc = vnfds[src_id]['dc']
1006 saps[dst_sap_id]['dc'] = dc
1007
1008 # SAPs on E-LANs are placed on a random DC
1009 dcs_list = list(dcs.itervalues())
1010 dc_len = len(dcs_list)
1011 for link in elan_fwd_links:
1012 for intf in link["connection_points_reference"]:
1013 # find SAP interfaces
1014 intf_id, intf_name, intf_sap_id = parse_interface(intf)
1015 if intf_sap_id in saps:
1016 dc = dcs_list[randint(0, dc_len - 1)]
1017 saps[intf_sap_id]['dc'] = dc
1018
1019
1020 """
1021 Resource definitions and API endpoints
1022 """
1023
1024
1025 class Packages(fr.Resource):
1026
1027 def post(self):
1028 """
1029 Upload a *.son service package to the dummy gatekeeper.
1030
1031 We expect request with a *.son file and store it in UPLOAD_FOLDER
1032 :return: UUID
1033 """
1034 try:
1035 # get file contents
1036 LOG.info("POST /packages called")
1037 # lets search for the package in the request
1038 is_file_object = False # make API more robust: file can be in data or in files field
1039 if "package" in request.files:
1040 son_file = request.files["package"]
1041 is_file_object = True
1042 elif len(request.data) > 0:
1043 son_file = request.data
1044 else:
1045 return {"service_uuid": None, "size": 0, "sha1": None,
1046 "error": "upload failed. file not found."}, 500
1047 # generate a uuid to reference this package
1048 service_uuid = str(uuid.uuid4())
1049 file_hash = hashlib.sha1(str(son_file)).hexdigest()
1050 # ensure that upload folder exists
1051 ensure_dir(UPLOAD_FOLDER)
1052 upload_path = os.path.join(UPLOAD_FOLDER, "%s.son" % service_uuid)
1053 # store *.son file to disk
1054 if is_file_object:
1055 son_file.save(upload_path)
1056 else:
1057 with open(upload_path, 'wb') as f:
1058 f.write(son_file)
1059 size = os.path.getsize(upload_path)
1060
1061 # first stop and delete any other running services
1062 if AUTO_DELETE:
1063 service_list = copy.copy(GK.services)
1064 for service_uuid in service_list:
1065 instances_list = copy.copy(
1066 GK.services[service_uuid].instances)
1067 for instance_uuid in instances_list:
1068 # valid service and instance UUID, stop service
1069 GK.services.get(service_uuid).stop_service(
1070 instance_uuid)
1071 LOG.info("service instance with uuid %r stopped." %
1072 instance_uuid)
1073
1074 # create a service object and register it
1075 s = Service(service_uuid, file_hash, upload_path)
1076 GK.register_service_package(service_uuid, s)
1077
1078 # automatically deploy the service
1079 if AUTO_DEPLOY:
1080 # ok, we have a service uuid, lets start the service
1081 reset_subnets()
1082 GK.services.get(service_uuid).start_service()
1083
1084 # generate the JSON result
1085 return {"service_uuid": service_uuid, "size": size,
1086 "sha1": file_hash, "error": None}, 201
1087 except BaseException:
1088 LOG.exception("Service package upload failed:")
1089 return {"service_uuid": None, "size": 0,
1090 "sha1": None, "error": "upload failed"}, 500
1091
1092 def get(self):
1093 """
1094 Return a list of UUID's of uploaded service packages.
1095 :return: dict/list
1096 """
1097 LOG.info("GET /packages")
1098 return {"service_uuid_list": list(GK.services.iterkeys())}
1099
1100
1101 class Instantiations(fr.Resource):
1102
1103 def post(self):
1104 """
1105 Instantiate a service specified by its UUID.
1106 Will return a new UUID to identify the running service instance.
1107 :return: UUID
1108 """
1109 LOG.info("POST /instantiations (or /requests) called")
1110 # try to extract the service uuid from the request
1111 json_data = request.get_json(force=True)
1112 service_uuid = json_data.get("service_uuid")
1113
1114 # lets be a bit fuzzy here to make testing easier
1115 if (service_uuid is None or service_uuid ==
1116 "latest") and len(GK.services) > 0:
1117 # if we don't get a service uuid, we simple start the first service
1118 # in the list
1119 service_uuid = list(GK.services.iterkeys())[0]
1120 if service_uuid in GK.services:
1121 # ok, we have a service uuid, lets start the service
1122 service_instance_uuid = GK.services.get(
1123 service_uuid).start_service()
1124 return {"service_instance_uuid": service_instance_uuid}, 201
1125 return "Service not found", 404
1126
1127 def get(self):
1128 """
1129 Returns a list of UUIDs containing all running services.
1130 :return: dict / list
1131 """
1132 LOG.info("GET /instantiations")
1133 return {"service_instantiations_list": [
1134 list(s.instances.iterkeys()) for s in GK.services.itervalues()]}
1135
1136 def delete(self):
1137 """
1138 Stops a running service specified by its service and instance UUID.
1139 """
1140 # try to extract the service and instance UUID from the request
1141 json_data = request.get_json(force=True)
1142 service_uuid = json_data.get("service_uuid")
1143 instance_uuid = json_data.get("service_instance_uuid")
1144
1145 # try to be fuzzy
1146 if service_uuid is None and len(GK.services) > 0:
1147 # if we don't get a service uuid, we simply stop the last service
1148 # in the list
1149 service_uuid = list(GK.services.iterkeys())[0]
1150 if instance_uuid is None and len(
1151 GK.services[service_uuid].instances) > 0:
1152 instance_uuid = list(
1153 GK.services[service_uuid].instances.iterkeys())[0]
1154
1155 if service_uuid in GK.services and instance_uuid in GK.services[service_uuid].instances:
1156 # valid service and instance UUID, stop service
1157 GK.services.get(service_uuid).stop_service(instance_uuid)
1158 return "service instance with uuid %r stopped." % instance_uuid, 200
1159 return "Service not found", 404
1160
1161
1162 class Exit(fr.Resource):
1163
1164 def put(self):
1165 """
1166 Stop the running Containernet instance regardless of data transmitted
1167 """
1168 list(GK.dcs.values())[0].net.stop()
1169
1170
1171 def initialize_GK():
1172 global GK
1173 GK = Gatekeeper()
1174
1175
1176 # create a single, global GK object
1177 GK = None
1178 initialize_GK()
1179 # setup Flask
1180 app = Flask(__name__)
1181 app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024 # 512 MB max upload
1182 api = fr.Api(app)
1183 # define endpoints
1184 api.add_resource(Packages, '/packages', '/api/v2/packages')
1185 api.add_resource(Instantiations, '/instantiations',
1186 '/api/v2/instantiations', '/api/v2/requests')
1187 api.add_resource(Exit, '/emulator/exit')
1188
1189
1190 def start_rest_api(host, port, datacenters=dict()):
1191 GK.dcs = datacenters
1192 GK.net = get_dc_network()
1193 # start the Flask server (not the best performance but ok for our use case)
1194 app.run(host=host,
1195 port=port,
1196 debug=True,
1197 use_reloader=False # this is needed to run Flask in a non-main thread
1198 )
1199
1200
1201 def ensure_dir(name):
1202 if not os.path.exists(name):
1203 os.makedirs(name)
1204
1205
1206 def load_yaml(path):
1207 with open(path, "r") as f:
1208 try:
1209 r = yaml.load(f)
1210 except yaml.YAMLError as exc:
1211 LOG.exception("YAML parse error: %r" % str(exc))
1212 r = dict()
1213 return r
1214
1215
1216 def make_relative_path(path):
1217 if path.startswith("file://"):
1218 path = path.replace("file://", "", 1)
1219 if path.startswith("/"):
1220 path = path.replace("/", "", 1)
1221 return path
1222
1223
1224 def get_dc_network():
1225 """
1226 retrieve the DCnetwork where this dummygatekeeper (GK) connects to.
1227 Assume at least 1 datacenter is connected to this GK, and that all datacenters belong to the same DCNetwork
1228 :return:
1229 """
1230 assert (len(GK.dcs) > 0)
1231 return GK.dcs.values()[0].net
1232
1233
1234 def parse_interface(interface_name):
1235 """
1236 convert the interface name in the nsd to the according vnf_id, vnf_interface names
1237 :param interface_name:
1238 :return:
1239 """
1240
1241 if ':' in interface_name:
1242 vnf_id, vnf_interface = interface_name.split(':')
1243 vnf_sap_docker_name = interface_name.replace(':', '_')
1244 else:
1245 vnf_id = interface_name
1246 vnf_interface = interface_name
1247 vnf_sap_docker_name = interface_name
1248
1249 return vnf_id, vnf_interface, vnf_sap_docker_name
1250
1251
1252 def reset_subnets():
1253 # private subnet definitions for the generated interfaces
1254 # 10.10.xxx.0/24
1255 global SAP_SUBNETS
1256 SAP_SUBNETS = generate_subnets('10.10', 0, subnet_size=50, mask=30)
1257 # 10.20.xxx.0/30
1258 global ELAN_SUBNETS
1259 ELAN_SUBNETS = generate_subnets('10.20', 0, subnet_size=50, mask=24)
1260 # 10.30.xxx.0/30
1261 global ELINE_SUBNETS
1262 ELINE_SUBNETS = generate_subnets('10.30', 0, subnet_size=50, mask=30)
1263
1264
1265 if __name__ == '__main__':
1266 """
1267 Lets allow to run the API in standalone mode.
1268 """
1269 GK_STANDALONE_MODE = True
1270 logging.getLogger("werkzeug").setLevel(logging.INFO)
1271 start_rest_api("0.0.0.0", 8000)