osm/vim-emu.git: src/emuvim/api/tango/llcm.py (commit 747a44e6aecf378a3b228249e933f9d14bb14b91)
1 # Copyright (c) 2018 SONATA-NFV, 5GTANGO and Paderborn University
2 # ALL RIGHTS RESERVED.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Neither the name of the SONATA-NFV, 5GTANGO, Paderborn University
17 # nor the names of its contributors may be used to endorse or promote
18 # products derived from this software without specific prior written
19 # permission.
20 #
21 # This work has been performed in the framework of the SONATA project,
22 # funded by the European Commission under Grant number 671517 through
23 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
24 # acknowledge the contributions of their colleagues of the SONATA
25 # partner consortium (www.sonata-nfv.eu).
26 #
27 # This work has also been performed in the framework of the 5GTANGO project,
28 # funded by the European Commission under Grant number 761493 through
29 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
30 # acknowledge the contributions of their colleagues of the 5GTANGO
31 # partner consortium (www.5gtango.eu).
32 import logging
33 import os
34 import uuid
35 import hashlib
36 import zipfile
37 import yaml
38 import threading
39 from docker import DockerClient
40 from flask import Flask, request
41 import flask_restful as fr
42 from collections import defaultdict
43 import pkg_resources
44 from subprocess import Popen
45 import ipaddress
46 import copy
47 import time
48 from functools import reduce
49
50
51 LOG = logging.getLogger("5gtango.llcm")
52 LOG.setLevel(logging.INFO)
53
54
55 GK_STORAGE = "/tmp/vim-emu-tango-llcm/"
56 UPLOAD_FOLDER = os.path.join(GK_STORAGE, "uploads/")
57 CATALOG_FOLDER = os.path.join(GK_STORAGE, "catalog/")
58
59 # Enable Dockerfile build functionality
60 BUILD_DOCKERFILE = False
61
62 # flag to indicate that we run without the emulator (only the bare API for
63 # integration testing)
64 GK_STANDALONE_MODE = False
65
66 # should a new version of an image be pulled even if it is already available
67 FORCE_PULL = False
68
69 # Automatically deploy SAPs (endpoints) of the service as new containers
70 # Attention: This is not a configuration switch but a global variable!
71 # Don't change its default value.
72 DEPLOY_SAP = False
73
74 # flag to indicate if we use bidirectional forwarding rules in the
75 # automatic chaining process
76 BIDIRECTIONAL_CHAIN = True
77
78 # override the management interfaces in the descriptors with default
79 # docker0 interfaces in the containers
80 USE_DOCKER_MGMT = False
81
82 # automatically deploy uploaded packages (no need to execute son-access
83 # deploy --latest separately)
84 AUTO_DEPLOY = False
85
86 # and also automatically terminate any other running services
87 AUTO_DELETE = False
88
89
90 def generate_subnets(prefix, base, subnet_size=50, mask=24):
91 # Generate a list of ipaddress.ip_network objects (subnets)
92 r = list()
93 for net in range(base, base + subnet_size):
94 subnet = "{0}.{1}.0/{2}".format(prefix, net, mask)
95 r.append(ipaddress.ip_network(unicode(subnet)))
96 return r
97
98
99 # private subnet definitions for the generated interfaces
100 # 99.0.xxx.0/24
101 SAP_SUBNETS = generate_subnets('99.0', 0, subnet_size=50, mask=24)
102 # 30.0.xxx.0/24
103 ELAN_SUBNETS = generate_subnets('30.0', 0, subnet_size=50, mask=24)
104 # 20.0.xxx.0/24
105 ELINE_SUBNETS = generate_subnets('20.0', 0, subnet_size=50, mask=24)
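# For illustration (derived from generate_subnets() above): SAP_SUBNETS starts with
# ip_network('99.0.0.0/24'), ip_network('99.0.1.0/24'), ..., ELAN_SUBNETS with
# ip_network('30.0.0.0/24'), and ELINE_SUBNETS with ip_network('20.0.0.0/24').
# Each list holds 50 subnets that are consumed with pop(0) during deployment.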
106
107 # path to the VNFD for the SAP VNF that is deployed as an internal SAP
108 SAP_VNFD = None
109
110 # Time in seconds to wait for vnf stop scripts to execute fully
111 VNF_STOP_WAIT_TIME = 5
112
113
114 class OnBoardingException(BaseException):
115 pass
116
117
118 class Gatekeeper(object):
119
120 def __init__(self):
121 self.services = dict()
122 self.dcs = dict()
123 self.net = None
124 # used to generate short names for VNFs (Mininet limitation)
125 self.vnf_counter = 0
126 LOG.info("Initialized 5GTANGO LLCM module.")
127
128 def register_service_package(self, service_uuid, service):
129 """
130 Register a new service package.
131 :param service_uuid: UUID of the service package
132 :param service: the Service object to register
133 """
134 self.services[service_uuid] = service
135 # lets perform all steps needed to onboard the service
136 service.onboard()
137
138 def get_next_vnf_name(self):
139 self.vnf_counter += 1
140 return "vnf%d" % self.vnf_counter
141
142
143 class Service(object):
144 """
145 This class represents a network service (NS) uploaded as a *.son
146 package to the dummy gatekeeper.
147 A service can have multiple running instances.
148 """
149
150 def __init__(self,
151 service_uuid,
152 package_file_hash,
153 package_file_path):
154 self.uuid = service_uuid
155 self.package_file_hash = package_file_hash
156 self.package_file_path = package_file_path
157 self.package_content_path = os.path.join(
158 CATALOG_FOLDER, "services/%s" % self.uuid)
159 self.manifest = None
160 self.nsd = None
161 self.vnfds = dict()
162 self.saps = dict()
163 self.saps_ext = list()
164 self.saps_int = list()
165 self.local_docker_files = dict()
166 self.remote_docker_image_urls = dict()
167 self.instances = dict()
168 # dict to find the vnf_name for any vnf id
169 self.vnf_id2vnf_name = dict()
170
171 def onboard(self):
172 """
173 Do all steps to prepare this service to be instantiated
174 :return:
175 """
176 # 1. extract the contents of the package and store them in our catalog
177 self._unpack_service_package()
178 # 2. read in all descriptor files
179 self._load_package_descriptor()
180 self._load_nsd()
181 self._load_vnfd()
182 if self.nsd is None:
183 raise OnBoardingException("No NSD found.")
184 if len(self.vnfds) < 1:
185 raise OnBoardingException("No VNFDs found.")
186 if DEPLOY_SAP:
187 self._load_saps()
188 # 3. prepare container images (e.g. download or build Dockerfile)
189 if BUILD_DOCKERFILE:
190 self._load_docker_files()
191 self._build_images_from_dockerfiles()
192 else:
193 self._load_docker_urls()
194 self._pull_predefined_dockerimages()
195 LOG.info("On-boarded service: %r" % self.manifest.get("name"))
196
197 def start_service(self):
198 """
199 This method creates and starts a new service instance.
200 It computes placements, iterates over all VNFDs, and starts
201 each VNFD as a Docker container in the data center selected
202 by the placement algorithm.
203 :return:
204 """
205 LOG.info("Starting service %r" % self.uuid)
206
207 # 1. each service instance gets a new uuid to identify it
208 instance_uuid = str(uuid.uuid4())
209 # build an instances dict (a bit like an NSR :))
210 self.instances[instance_uuid] = dict()
211 self.instances[instance_uuid]["vnf_instances"] = list()
212
213 # 2. compute placement of this service instance (adds DC names to
214 # VNFDs)
215 if not GK_STANDALONE_MODE:
216 # self._calculate_placement(FirstDcPlacement)
217 self._calculate_placement(RoundRobinDcPlacement)
218 # 3. start all vnfds that we have in the service (except SAPs)
219 for vnf_id in self.vnfds:
220 vnfd = self.vnfds[vnf_id]
221 vnfi = None
222 if not GK_STANDALONE_MODE:
223 vnfi = self._start_vnfd(vnfd, vnf_id)
224 self.instances[instance_uuid]["vnf_instances"].append(vnfi)
225
226 # 4. start all SAPs in the service
227 for sap in self.saps:
228 self._start_sap(self.saps[sap], instance_uuid)
229
230 # 5. Deploy E-Line and E-LAN links
231 # Attention: Only done if a "forwarding_graphs" section exists in the NSD,
232 # even if the "forwarding_graphs" are not used directly.
233 if "virtual_links" in self.nsd and "forwarding_graphs" in self.nsd:
234 vlinks = self.nsd["virtual_links"]
235 # constituent virtual links are not checked
236 # fwd_links = self.nsd["forwarding_graphs"][0]["constituent_virtual_links"]
237 eline_fwd_links = [l for l in vlinks if (
238 l["connectivity_type"] == "E-Line")]
239 elan_fwd_links = [l for l in vlinks if (
240 l["connectivity_type"] == "E-LAN" or
241 l["connectivity_type"] == "E-Tree")] # Treat E-Tree as E-LAN
242
243 GK.net.deployed_elines.extend(eline_fwd_links)
244 GK.net.deployed_elans.extend(elan_fwd_links)
245
246 # 5a. deploy E-Line links
247 self._connect_elines(eline_fwd_links, instance_uuid)
248
249 # 5b. deploy E-LAN links
250 self._connect_elans(elan_fwd_links, instance_uuid)
251
252 # 6. run the emulator specific entrypoint scripts in the VNFIs of this
253 # service instance
254 self._trigger_emulator_start_scripts_in_vnfis(
255 self.instances[instance_uuid]["vnf_instances"])
256
257 LOG.info("Service started. Instance id: %r" % instance_uuid)
258 return instance_uuid
259
260 def stop_service(self, instance_uuid):
261 """
262 This method stops a running service instance.
263 It iterates over all VNF instances, stopping each of them
264 and removing it from its data center.
265
266 :param instance_uuid: the uuid of the service instance to be stopped
267 """
268 LOG.info("Stopping service %r" % self.uuid)
269 # get relevant information
270 # instance_uuid = str(self.uuid.uuid4())
271 vnf_instances = self.instances[instance_uuid]["vnf_instances"]
272
273 # trigger stop scripts in vnf instances and wait a few seconds for
274 # completion
275 self._trigger_emulator_stop_scripts_in_vnfis(vnf_instances)
276 time.sleep(VNF_STOP_WAIT_TIME)
277
278 for v in vnf_instances:
279 self._stop_vnfi(v)
280
281 for sap_name in self.saps_ext:
282 ext_sap = self.saps[sap_name]
283 target_dc = ext_sap.get("dc")
284 target_dc.removeExternalSAP(sap_name)
285 LOG.info("Stopping the SAP instance: %r in DC %r" %
286 (sap_name, target_dc))
287
288 # last step: remove the instance from the list of all instances
289 del self.instances[instance_uuid]
290
291 def _get_resource_limits(self, deployment_unit):
292 """
293 Extract resource limits from deployment units.
294 """
295 # defaults
296 cpu_list = "1"
297 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(1.0))
298 mem_limit = 0
299 # update from descriptor
300 if "resource_requirements" in deployment_unit:
301 res_req = deployment_unit.get("resource_requirements")
302 cpu_list = res_req.get("cpu").get("cores")
303 if cpu_list is None:
304 cpu_list = res_req.get("cpu").get("vcpus")
305 cpu_bw = res_req.get("cpu").get("cpu_bw", 1.0)
306 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(cpu_bw))
307 mem_num = str(res_req.get("memory").get("size", 2))
308 mem_unit = str(res_req.get("memory").get("size_unit", "GB"))
309 mem_limit = float(mem_num)
310 if mem_unit == "GB":
311 mem_limit = mem_limit * 1024 * 1024 * 1024
312 elif mem_unit == "MB":
313 mem_limit = mem_limit * 1024 * 1024
314 elif mem_unit == "KB":
315 mem_limit = mem_limit * 1024
316 mem_limit = int(mem_limit)
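# e.g. a descriptor with memory size: 512 and size_unit: "MB" yields
# mem_limit = 512 * 1024 * 1024 = 536870912 (bytes)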
317 return cpu_list, cpu_period, cpu_quota, mem_limit
318
319 def _start_vnfd(self, vnfd, vnf_id, **kwargs):
320 """
321 Start a single VNFD of this service
322 :param vnfd: vnfd descriptor dict
323 :param vnf_id: unique id of this vnf in the nsd
324 :return:
325 """
326 # the vnf_name refers to the container image to be deployed
327 vnf_name = vnfd.get("name")
328 # combine VDUs and CDUs
329 deployment_units = (vnfd.get("virtual_deployment_units", []) +
330 vnfd.get("cloudnative_deployment_units", []))
331 # iterate over all deployment units within this VNFD
332 for u in deployment_units:
333 # 1. get the name of the docker image to start and the assigned DC
334 if vnf_id not in self.remote_docker_image_urls:
335 raise Exception("No image name for %r found. Abort." % vnf_id)
336 docker_name = self.remote_docker_image_urls.get(vnf_id)
337 target_dc = vnfd.get("dc")
338 # 2. perform some checks to ensure we can start the container
339 assert(docker_name is not None)
340 assert(target_dc is not None)
341 if not self._check_docker_image_exists(docker_name):
342 raise Exception(
343 "Docker image %r not found. Abort." % docker_name)
344
345 # 3. get the resource limits
346 cpu_list, cpu_period, cpu_quota, mem_limit = self._get_resource_limits(u)
347
348 # check if we need to deploy the management ports (defined as
349 # type:management both in the vnfd and the nsd)
350 intfs = vnfd.get("connection_points", [])
351 # do some re-naming of fields to be compatible with Containernet
352 for i in intfs:
353 if i.get("address"):
354 i["ip"] = i.get("address")
355
356 mgmt_intf_names = []
357 if USE_DOCKER_MGMT:
358 mgmt_intfs = [vnf_id + ':' + intf['id']
359 for intf in intfs if intf.get('type') == 'management']
360 # check if any of these management interfaces are used in a
361 # management-type network in the nsd
362 for nsd_intf_name in mgmt_intfs:
363 vlinks = [l["connection_points_reference"]
364 for l in self.nsd.get("virtual_links", [])]
365 for link in vlinks:
366 if nsd_intf_name in link and self.check_mgmt_interface(
367 link):
368 # this is indeed a management interface and can be
369 # skipped
370 vnf_id, vnf_interface, vnf_sap_docker_name = parse_interface(
371 nsd_intf_name)
372 found_interfaces = [
373 intf for intf in intfs if intf.get('id') == vnf_interface]
374 intfs.remove(found_interfaces[0])
375 mgmt_intf_names.append(vnf_interface)
376
377 # 4. generate the volume paths for the docker container
378 volumes = list()
379 # a volume to extract log files
380 docker_log_path = "/tmp/results/%s/%s" % (self.uuid, vnf_id)
381 LOG.debug("LOG path for vnf %s is %s." % (vnf_id, docker_log_path))
382 if not os.path.exists(docker_log_path):
383 LOG.debug("Creating folder %s" % docker_log_path)
384 os.makedirs(docker_log_path)
385
386 volumes.append(docker_log_path + ":/mnt/share/")
387
388 # 5. do the dc.startCompute(name="foobar") call to run the container
389 # TODO consider flavors, and other annotations
390 # TODO: get all vnf id's from the nsd for this vnfd and use those as dockername
391 # use the vnf_id in the nsd as docker name
392 # so deployed containers can be easily mapped back to the nsd
393 LOG.info("Starting %r as %r in DC %r" %
394 (vnf_name, vnf_id, vnfd.get("dc")))
395 LOG.debug("Interfaces for %r: %r" % (vnf_id, intfs))
396 vnfi = target_dc.startCompute(
397 vnf_id,
398 network=intfs,
399 image=docker_name,
400 flavor_name="small",
401 cpu_quota=cpu_quota,
402 cpu_period=cpu_period,
403 cpuset=cpu_list,
404 mem_limit=mem_limit,
405 volumes=volumes,
406 type=kwargs.get('type', 'docker'))
407
408 # add vnfd reference to vnfi
409 vnfi.vnfd = vnfd
410
411 # rename the docker0 interfaces (eth0) to the management port name
412 # defined in the VNFD
413 if USE_DOCKER_MGMT:
414 for intf_name in mgmt_intf_names:
415 self._vnf_reconfigure_network(
416 vnfi, 'eth0', new_name=intf_name)
417
418 return vnfi
419
420 def _stop_vnfi(self, vnfi):
421 """
422 Stop a VNF instance.
423
424 :param vnfi: vnf instance to be stopped
425 """
426 # Find the correct datacenter
427 status = vnfi.getStatus()
428 dc = vnfi.datacenter
429
430 # stop the vnfi
431 LOG.info("Stopping the vnf instance contained in %r in DC %r" %
432 (status["name"], dc))
433 dc.stopCompute(status["name"])
434
435 def _get_vnf_instance(self, instance_uuid, vnf_id):
436 """
437 Returns the Docker object for the given VNF id (or Docker name).
438 :param instance_uuid: UUID of the service instance to search in.
439 :param vnf_id: VNF id or Docker name (matching is fuzzy here).
440 :return:
441 """
442 dn = vnf_id
443 for vnfi in self.instances[instance_uuid]["vnf_instances"]:
444 if vnfi.name == dn:
445 return vnfi
446 LOG.warning("No container with name: {0} found.".format(dn))
447 return None
448
449 @staticmethod
450 def _vnf_reconfigure_network(vnfi, if_name, net_str=None, new_name=None):
451 """
452 Reconfigure the network configuration of a specific interface
453 of a running container.
454 :param vnfi: container instance
455 :param if_name: interface name
456 :param net_str: network configuration string, e.g., 1.2.3.4/24
457 :return:
458 """
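# Illustrative use (hypothetical values): _vnf_reconfigure_network(vnfi, "eth1", "10.20.0.2/24")
# assigns that address to eth1; passing new_name="mgmt0" instead renames the interface via "ip link".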
459
460 # assign new ip address
461 if net_str is not None:
462 intf = vnfi.intf(intf=if_name)
463 if intf is not None:
464 intf.setIP(net_str)
465 LOG.debug("Reconfigured network of %s:%s to %r" %
466 (vnfi.name, if_name, net_str))
467 else:
468 LOG.warning("Interface not found: %s:%s. Network reconfiguration skipped." % (
469 vnfi.name, if_name))
470
471 if new_name is not None:
472 vnfi.cmd('ip link set', if_name, 'down')
473 vnfi.cmd('ip link set', if_name, 'name', new_name)
474 vnfi.cmd('ip link set', new_name, 'up')
475 LOG.debug("Reconfigured interface name of %s:%s to %s" %
476 (vnfi.name, if_name, new_name))
477
478 def _trigger_emulator_start_scripts_in_vnfis(self, vnfi_list):
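# Containers can define an environment variable SON_EMU_CMD (e.g. via ENV in
# their Dockerfile) whose value is executed as the emulator-specific entry
# point script once the service instance is up; it runs in a background
# thread so a long-running script does not block the gatekeeper.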
479 for vnfi in vnfi_list:
480 config = vnfi.dcinfo.get("Config", dict())
481 env = config.get("Env", list())
482 for env_var in env:
483 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
484 LOG.debug("%r = %r" % (var, cmd))
485 if var == "SON_EMU_CMD":
486 LOG.info("Executing entry point script in %r: %r" %
487 (vnfi.name, cmd))
488 # execute command in new thread to ensure that GK is not
489 # blocked by VNF
490 t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
491 t.daemon = True
492 t.start()
493
494 def _trigger_emulator_stop_scripts_in_vnfis(self, vnfi_list):
495 for vnfi in vnfi_list:
496 config = vnfi.dcinfo.get("Config", dict())
497 env = config.get("Env", list())
498 for env_var in env:
499 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
500 if var == "SON_EMU_CMD_STOP":
501 LOG.info("Executing stop script in %r: %r" %
502 (vnfi.name, cmd))
503 # execute command in new thread to ensure that GK is not
504 # blocked by VNF
505 t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
506 t.daemon = True
507 t.start()
508
509 def _unpack_service_package(self):
510 """
511 unzip *.son file and store contents in CATALOG_FOLDER/services/<service_uuid>/
512 """
513 LOG.info("Unzipping: %r" % self.package_file_path)
514 with zipfile.ZipFile(self.package_file_path, "r") as z:
515 z.extractall(self.package_content_path)
516
517 def _load_package_descriptor(self):
518 """
519 Load the main package descriptor YAML and keep it as dict.
520 :return:
521 """
522 self.manifest = load_yaml(
523 os.path.join(
524 self.package_content_path, "TOSCA-Metadata/NAPD.yaml"))
525
526 def _load_nsd(self):
527 """
528 Load the entry NSD YAML and keep it as dict.
529 :return:
530 """
531 if "package_content" in self.manifest:
532 nsd_path = None
533 for f in self.manifest.get("package_content"):
534 if f.get("content-type") == "application/vnd.5gtango.nsd":
535 nsd_path = os.path.join(
536 self.package_content_path,
537 make_relative_path(f.get("source")))
538 break # always use the first NSD for now
539 if nsd_path is None:
540 raise OnBoardingException("No NSD with type 'application/vnd.5gtango.nsd' found.")
541 self.nsd = load_yaml(nsd_path)
542 GK.net.deployed_nsds.append(self.nsd) # TODO this seems strange (remove?)
543 # create dict to find the vnf_name for any vnf id
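# e.g. network_functions: [{vnf_id: "vnf_fw", vnf_name: "firewall-vnf"}, ...]
# becomes {"vnf_fw": "firewall-vnf", ...}; unknown ids map to "NotExistingNode"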
544 self.vnf_id2vnf_name = defaultdict(lambda: "NotExistingNode",
545 reduce(lambda x, y: dict(x, **y),
546 map(lambda d: {d["vnf_id"]: d["vnf_name"]},
547 self.nsd["network_functions"])))
548 LOG.debug("Loaded NSD: %r" % self.nsd.get("name"))
549 else:
550 raise OnBoardingException(
551 "No 'package_content' section in package manifest:\n{}"
552 .format(self.manifest))
553
554 def _load_vnfd(self):
555 """
556 Load all VNFD YAML files referenced in the package manifest and keep them in a dict.
557 :return:
558 """
559
560 # first make a list of all the vnfds in the package
561 vnfd_set = dict()
562 if "package_content" in self.manifest:
563 for pc in self.manifest.get("package_content"):
564 if pc.get(
565 "content-type") == "application/vnd.5gtango.vnfd":
566 vnfd_path = os.path.join(
567 self.package_content_path,
568 make_relative_path(pc.get("source")))
569 vnfd = load_yaml(vnfd_path)
570 vnfd_set[vnfd.get("name")] = vnfd
571 if len(vnfd_set) < 1:
572 raise OnBoardingException("No VNFDs found.")
573 # then link each vnf_id in the nsd to its vnfd
574 for vnf_id in self.vnf_id2vnf_name:
575 vnf_name = self.vnf_id2vnf_name[vnf_id]
576 self.vnfds[vnf_id] = vnfd_set[vnf_name]
577 LOG.debug("Loaded VNFD: {0} id: {1}".format(vnf_name, vnf_id))
578
579 def _load_saps(self):
580 # create list of all SAPs
581 # check if we need to deploy management ports
582 if USE_DOCKER_MGMT:
583 SAPs = [p for p in self.nsd["connection_points"]
584 if 'management' not in p.get('type')]
585 else:
586 SAPs = [p for p in self.nsd["connection_points"]]
587
588 for sap in SAPs:
589 # endpoint needed in this service
590 sap_id, sap_interface, sap_docker_name = parse_interface(sap['id'])
591 # make sure SAP has type set (default internal)
592 sap["type"] = sap.get("type", 'internal')
593
594 # Each Service Access Point (connection_point) in the nsd is an IP
595 # address on the host
596 if sap["type"] == "external":
597 # add to vnfds to calculate placement later on
598 sap_net = SAP_SUBNETS.pop(0)
599 self.saps[sap_docker_name] = {
600 "name": sap_docker_name, "type": "external", "net": sap_net}
601 # add SAP vnf to list in the NSD so it is deployed later on
602 # each SAP gets a unique VNFD and vnf_id in the NSD and custom
603 # type (only defined in the dummygatekeeper)
604 self.nsd["network_functions"].append(
605 {"vnf_id": sap_docker_name, "vnf_name": sap_docker_name, "vnf_type": "sap_ext"})
606
607 # Each Service Access Point (connection_point) in the nsd is
608 # getting its own container (default)
609 elif sap["type"] == "internal" or sap["type"] == "management":
610 # add SAP to self.vnfds
611 if SAP_VNFD is None:
612 sapfile = pkg_resources.resource_filename(
613 __name__, "sap_vnfd.yml")
614 else:
615 sapfile = SAP_VNFD
616 sap_vnfd = load_yaml(sapfile)
617 sap_vnfd["connection_points"][0]["id"] = sap_interface
618 sap_vnfd["name"] = sap_docker_name
619 sap_vnfd["type"] = "internal"
620 # add to vnfds to calculate placement later on and deploy
621 self.saps[sap_docker_name] = sap_vnfd
622 # add SAP vnf to list in the NSD so it is deployed later on
623 # each SAP gets a unique VNFD and vnf_id in the NSD
624 self.nsd["network_functions"].append(
625 {"vnf_id": sap_docker_name, "vnf_name": sap_docker_name, "vnf_type": "sap_int"})
626
627 LOG.debug("Loaded SAP: name: {0}, type: {1}".format(
628 sap_docker_name, sap['type']))
629
630 # create sap lists
631 self.saps_ext = [self.saps[sap]['name']
632 for sap in self.saps if self.saps[sap]["type"] == "external"]
633 self.saps_int = [self.saps[sap]['name']
634 for sap in self.saps if self.saps[sap]["type"] == "internal"]
635
636 def _start_sap(self, sap, instance_uuid):
637 if not DEPLOY_SAP:
638 return
639
640 LOG.info('start SAP: {0}, type: {1}'.format(sap['name'], sap['type']))
641 if sap["type"] == "internal":
642 vnfi = None
643 if not GK_STANDALONE_MODE:
644 vnfi = self._start_vnfd(sap, sap['name'], type='sap_int')
645 self.instances[instance_uuid]["vnf_instances"].append(vnfi)
646
647 elif sap["type"] == "external":
648 target_dc = sap.get("dc")
649 # add interface to dc switch
650 target_dc.attachExternalSAP(sap['name'], sap['net'])
651
652 def _connect_elines(self, eline_fwd_links, instance_uuid):
653 """
654 Connect all E-LINE links in the NSD
655 :param eline_fwd_links: list of E-LINE links in the NSD
656 :param: instance_uuid of the service
657 :return:
658 """
659 # the cookie is used as an identifier for the flow rules installed by the dummy gatekeeper,
660 # e.g. different services get a unique cookie for their flow rules
661 cookie = 1
662 for link in eline_fwd_links:
663 LOG.info("Found E-Line: {}".format(link))
664 # skip this link if it is a management link (not deployed when USE_DOCKER_MGMT is set):
665 if USE_DOCKER_MGMT:
666 if self.check_mgmt_interface(
667 link["connection_points_reference"]):
668 continue
669
670 src_id, src_if_name, src_sap_id = parse_interface(
671 link["connection_points_reference"][0])
672 dst_id, dst_if_name, dst_sap_id = parse_interface(
673 link["connection_points_reference"][1])
674
675 setChaining = False
676 # check if there is a SAP in the link and chain everything together
677 if src_sap_id in self.saps and dst_sap_id in self.saps:
678 LOG.info(
679 '2 SAPs cannot be chained together: {0} - {1}'.format(src_sap_id, dst_sap_id))
680 continue
681
682 elif src_sap_id in self.saps_ext:
683 src_id = src_sap_id
684 # set intf name to None so the chaining function will choose
685 # the first one
686 src_if_name = None
687 dst_vnfi = self._get_vnf_instance(instance_uuid, dst_id)
688 if dst_vnfi is not None:
689 # choose first ip address in sap subnet
690 sap_net = self.saps[src_sap_id]['net']
691 sap_ip = "{0}/{1}".format(str(sap_net[2]),
692 sap_net.prefixlen)
693 self._vnf_reconfigure_network(
694 dst_vnfi, dst_if_name, sap_ip)
695 setChaining = True
696
697 elif dst_sap_id in self.saps_ext:
698 dst_id = dst_sap_id
699 # set intf name to None so the chaining function will choose
700 # the first one
701 dst_if_name = None
702 src_vnfi = self._get_vnf_instance(instance_uuid, src_id)
703 if src_vnfi is not None:
704 sap_net = self.saps[dst_sap_id]['net']
705 sap_ip = "{0}/{1}".format(str(sap_net[2]),
706 sap_net.prefixlen)
707 self._vnf_reconfigure_network(
708 src_vnfi, src_if_name, sap_ip)
709 setChaining = True
710
711 # Link between 2 VNFs
712 else:
713 LOG.info("Creating E-Line: src={}, dst={}"
714 .format(src_id, dst_id))
715 # make sure we use the correct sap vnf name
716 if src_sap_id in self.saps_int:
717 src_id = src_sap_id
718 if dst_sap_id in self.saps_int:
719 dst_id = dst_sap_id
720 # get involved vnfis
721 src_vnfi = self._get_vnf_instance(instance_uuid, src_id)
722 dst_vnfi = self._get_vnf_instance(instance_uuid, dst_id)
723
724 if src_vnfi is not None and dst_vnfi is not None:
725 setChaining = True
726 # re-configure the VNFs' IP assignment and ensure that a new
727 # subnet is used for each E-Line
728 eline_net = ELINE_SUBNETS.pop(0)
729 ip1 = "{0}/{1}".format(str(eline_net[1]),
730 eline_net.prefixlen)
731 ip2 = "{0}/{1}".format(str(eline_net[2]),
732 eline_net.prefixlen)
733 # check if VNFs have fixed IPs (address field in VNFDs)
734 if (self._get_vnfd_cp_from_vnfi(src_vnfi, src_if_name)
735 .get("address") is None):
736 self._vnf_reconfigure_network(src_vnfi, src_if_name, ip1)
737 # check if VNFs have fixed IPs (address field in VNFDs)
738 if (self._get_vnfd_cp_from_vnfi(dst_vnfi, dst_if_name)
739 .get("address") is None):
740 self._vnf_reconfigure_network(dst_vnfi, dst_if_name, ip2)
741
742 # Set the chaining
743 if setChaining:
744 GK.net.setChain(
745 src_id, dst_id,
746 vnf_src_interface=src_if_name, vnf_dst_interface=dst_if_name,
747 bidirectional=BIDIRECTIONAL_CHAIN, cmd="add-flow", cookie=cookie, priority=10)
748
749 def _get_vnfd_cp_from_vnfi(self, vnfi, ifname):
750 """
751 Gets the connection point data structure from the VNFD
752 of the given VNFI using ifname.
753 """
754 if vnfi.vnfd is None:
755 return {}
756 cps = vnfi.vnfd.get("connection_points")
757 for cp in cps:
758 if cp.get("id") == ifname:
759 return cp
760
761 def _connect_elans(self, elan_fwd_links, instance_uuid):
762 """
763 Connect all E-LAN links in the NSD
764 :param elan_fwd_links: list of E-LAN links in the NSD
765 :param: instance_uuid of the service
766 :return:
767 """
768 for link in elan_fwd_links:
769 # skip this link if it is a management link (not deployed when USE_DOCKER_MGMT is set):
770 if USE_DOCKER_MGMT:
771 if self.check_mgmt_interface(
772 link["connection_points_reference"]):
773 continue
774
775 elan_vnf_list = []
776 # check if an external SAP is in the E-LAN (then a subnet is
777 # already defined)
778 intfs_elan = [intf for intf in link["connection_points_reference"]]
779 lan_sap = self.check_ext_saps(intfs_elan)
780 if lan_sap:
781 lan_net = self.saps[lan_sap]['net']
782 lan_hosts = list(lan_net.hosts())
783 else:
784 lan_net = ELAN_SUBNETS.pop(0)
785 lan_hosts = list(lan_net.hosts())
786
787 # generate a LAN IP address for all interfaces except external SAPs
788 for intf in link["connection_points_reference"]:
789
790 # skip external SAPs, they already have an ip
791 vnf_id, vnf_interface, vnf_sap_docker_name = parse_interface(
792 intf)
793 if vnf_sap_docker_name in self.saps_ext:
794 elan_vnf_list.append(
795 {'name': vnf_sap_docker_name, 'interface': vnf_interface})
796 continue
797
798 ip_address = "{0}/{1}".format(str(lan_hosts.pop(0)),
799 lan_net.prefixlen)
800 vnf_id, intf_name, vnf_sap_id = parse_interface(intf)
801
802 # make sure we use the correct sap vnf name
803 src_docker_name = vnf_id
804 if vnf_sap_id in self.saps_int:
805 src_docker_name = vnf_sap_id
806 vnf_id = vnf_sap_id
807
808 LOG.debug(
809 "Setting up E-LAN interface. (%s:%s) -> %s" % (
810 vnf_id, intf_name, ip_address))
811
812 # re-configure the VNFs' IP assignment and ensure that a new subnet is used for each E-LAN
813 # E-LAN relies on the learning switch capability of Ryu which has to be turned on in the topology
814 # (DCNetwork(controller=RemoteController, enable_learning=True)), so no explicit chaining is necessary.
815 vnfi = self._get_vnf_instance(instance_uuid, vnf_id)
816 if vnfi is not None:
817 self._vnf_reconfigure_network(vnfi, intf_name, ip_address)
818 # add this vnf and interface to the E-LAN for tagging
819 elan_vnf_list.append(
820 {'name': src_docker_name, 'interface': intf_name})
821
822 # install the VLAN tags for this E-LAN
823 GK.net.setLAN(elan_vnf_list)
824
825 def _load_docker_files(self):
826 """
827 Get all paths to Dockerfiles from VNFDs and store them in dict.
828 :return:
829 """
830 for k, v in self.vnfds.iteritems():
831 for vu in v.get("virtual_deployment_units", []):
832 if vu.get("vm_image_format") == "docker":
833 vm_image = vu.get("vm_image")
834 docker_path = os.path.join(
835 self.package_content_path,
836 make_relative_path(vm_image))
837 self.local_docker_files[k] = docker_path
838 LOG.debug("Found Dockerfile (%r): %r" % (k, docker_path))
839 for cu in v.get("cloudnative_deployment_units", []):
840 image = cu.get("image")
841 docker_path = os.path.join(
842 self.package_content_path,
843 make_relative_path(image))
844 self.local_docker_files[k] = docker_path
845 LOG.debug("Found Dockerfile (%r): %r" % (k, docker_path))
846
847 def _load_docker_urls(self):
848 """
849 Get all URLs to pre-built Docker images in some repository.
850 :return:
851 """
852 for k, v in self.vnfds.iteritems():
853 for vu in v.get("virtual_deployment_units", []):
854 if vu.get("vm_image_format") == "docker":
855 url = vu.get("vm_image")
856 if url is not None:
857 url = url.replace("http://", "")
858 self.remote_docker_image_urls[k] = url
859 LOG.debug("Found Docker image URL (%r): %r" %
860 (k, self.remote_docker_image_urls[k]))
861 for cu in v.get("cloudnative_deployment_units", []):
862 url = cu.get("image")
863 if url is not None:
864 url = url.replace("http://", "")
865 self.remote_docker_image_urls[k] = url
866 LOG.debug("Found Docker image URL (%r): %r" %
867 (k, self.remote_docker_image_urls[k]))
868
869 def _build_images_from_dockerfiles(self):
870 """
871 Build Docker images for each local Dockerfile found in the package: self.local_docker_files
872 """
873 if GK_STANDALONE_MODE:
874 return # do not build anything in standalone mode
875 dc = DockerClient()
876 LOG.info("Building %d Docker images (this may take several minutes) ..." % len(
877 self.local_docker_files))
878 for k, v in self.local_docker_files.iteritems():
879 for line in dc.build(path=v.replace(
880 "Dockerfile", ""), tag=k, rm=False, nocache=False):
881 LOG.debug("DOCKER BUILD: %s" % line)
882 LOG.info("Docker image created: %s" % k)
883
884 def _pull_predefined_dockerimages(self):
885 """
886 If the package contains URLs to pre-built Docker images, we download them with this method.
887 """
888 dc = DockerClient()
889 for url in self.remote_docker_image_urls.itervalues():
890 # only pull if not present (speedup for development)
891 if not FORCE_PULL:
892 if len(dc.images.list(name=url)) > 0:
893 LOG.debug("Image %r present. Skipping pull." % url)
894 continue
895 LOG.info("Pulling image: %r" % url)
896 # this seems to fail with latest docker api version 2.0.2
897 # dc.images.pull(url,
898 # insecure_registry=True)
899 # using docker cli instead
900 cmd = ["docker",
901 "pull",
902 url,
903 ]
904 Popen(cmd).wait()
905
906 def _check_docker_image_exists(self, image_name):
907 """
908 Query the docker service and check if the given image exists
909 :param image_name: name of the docker image
910 :return:
911 """
912 return len(DockerClient().images.list(name=image_name)) > 0
913
914 def _calculate_placement(self, algorithm):
915 """
916 Do placement by adding a field "dc" to
917 each VNFD that points to one of our
918 data center objects known to the gatekeeper.
919 """
920 assert(len(self.vnfds) > 0)
921 assert(len(GK.dcs) > 0)
922 # instantiate the algorithm and perform the placement
923 p = algorithm()
924 p.place(self.nsd, self.vnfds, self.saps, GK.dcs)
925 LOG.info("Using placement algorithm: %r" % p.__class__.__name__)
926 # lets print the placement result
927 for name, vnfd in self.vnfds.iteritems():
928 LOG.info("Placed VNF %r on DC %r" % (name, str(vnfd.get("dc"))))
929 for sap in self.saps:
930 sap_dict = self.saps[sap]
931 LOG.info("Placed SAP %r on DC %r" % (sap, str(sap_dict.get("dc"))))
932
933 def _calculate_cpu_cfs_values(self, cpu_time_percentage):
934 """
935 Calculate cpu period and quota for CFS
936 :param cpu_time_percentage: percentage of overall CPU to be used
937 :return: cpu_period, cpu_quota
938 """
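# Worked example (for illustration, using the fixed period set below):
# cpu_time_percentage=0.25 -> cpu_period=1000000, cpu_quota=250000,
# i.e. the container may use 25% of one CPU.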
939 if cpu_time_percentage is None:
940 return -1, -1
941 if cpu_time_percentage < 0:
942 return -1, -1
943 # (see: https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt)
944 # Attention: the minimum cpu_quota is 1000 microseconds (1 ms)
945 cpu_period = 1000000 # lets consider a fixed period of 1000000 microseconds for now
946 LOG.debug("cpu_period is %r, cpu_percentage is %r" %
947 (cpu_period, cpu_time_percentage))
948 # calculate the fraction of cpu time for this container
949 cpu_quota = cpu_period * cpu_time_percentage
950 # ATTENTION: >= 1000 to avoid an invalid argument system error ... no
951 # idea why
952 if cpu_quota < 1000:
953 LOG.debug("cpu_quota before correcting: %r" % cpu_quota)
954 cpu_quota = 1000
955 LOG.warning("Increased CPU quota to avoid system error.")
956 LOG.debug("Calculated: cpu_period=%f / cpu_quota=%f" %
957 (cpu_period, cpu_quota))
958 return int(cpu_period), int(cpu_quota)
959
960 def check_ext_saps(self, intf_list):
961 # check if the list of interfaces contains an external SAP
962 saps_ext = [self.saps[sap]['name']
963 for sap in self.saps if self.saps[sap]["type"] == "external"]
964 for intf_name in intf_list:
965 vnf_id, vnf_interface, vnf_sap_docker_name = parse_interface(
966 intf_name)
967 if vnf_sap_docker_name in saps_ext:
968 return vnf_sap_docker_name
969
970 def check_mgmt_interface(self, intf_list):
971 SAPs_mgmt = [p.get('id') for p in self.nsd["connection_points"]
972 if 'management' in p.get('type')]
973 for intf_name in intf_list:
974 if intf_name in SAPs_mgmt:
975 return True
976
977
978 """
979 Some (simple) placement algorithms
980 """
981
982
983 class FirstDcPlacement(object):
984 """
985 Placement: Always use one and the same data center from the GK.dcs dict.
986 """
987
988 def place(self, nsd, vnfds, saps, dcs):
989 for id, vnfd in vnfds.iteritems():
990 vnfd["dc"] = list(dcs.itervalues())[0]
991
992
993 class RoundRobinDcPlacement(object):
994 """
995 Placement: Distribute VNFs across all available DCs in a round robin fashion.
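Example: with two DCs dc1 and dc2, VNFs are assigned dc1, dc2, dc1, dc2, ...
(in iteration order of the vnfds dict).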
996 """
997
998 def place(self, nsd, vnfds, saps, dcs):
999 c = 0
1000 dcs_list = list(dcs.itervalues())
1001 for id, vnfd in vnfds.iteritems():
1002 vnfd["dc"] = dcs_list[c % len(dcs_list)]
1003 c += 1 # inc. c to use next DC
1004
1005
1006 """
1007 Resource definitions and API endpoints
1008 """
1009
1010
1011 class Packages(fr.Resource):
1012
1013 def post(self):
1014 """
1015 Upload a *.son service package to the dummy gatekeeper.
1016
1017 We expect a request with a *.son file and store it in UPLOAD_FOLDER.
1018 :return: UUID
1019 """
1020 try:
1021 # get file contents
1022 LOG.info("POST /packages called")
1023 # lets search for the package in the request
1024 is_file_object = False # make API more robust: file can be in data or in files field
1025 if "package" in request.files:
1026 son_file = request.files["package"]
1027 is_file_object = True
1028 elif len(request.data) > 0:
1029 son_file = request.data
1030 else:
1031 return {"service_uuid": None, "size": 0, "sha1": None,
1032 "error": "upload failed. file not found."}, 500
1033 # generate a uuid to reference this package
1034 service_uuid = str(uuid.uuid4())
1035 file_hash = hashlib.sha1(str(son_file)).hexdigest()
1036 # ensure that upload folder exists
1037 ensure_dir(UPLOAD_FOLDER)
1038 upload_path = os.path.join(UPLOAD_FOLDER, "%s.tgo" % service_uuid)
1039 # store *.son file to disk
1040 if is_file_object:
1041 son_file.save(upload_path)
1042 else:
1043 with open(upload_path, 'wb') as f:
1044 f.write(son_file)
1045 size = os.path.getsize(upload_path)
1046
1047 # first stop and delete any other running services
1048 if AUTO_DELETE:
1049 service_list = copy.copy(GK.services)
1050 for old_service_uuid in service_list:
1051 instances_list = copy.copy(
1052 GK.services[old_service_uuid].instances)
1053 for instance_uuid in instances_list:
1054 # valid service and instance UUID, stop service
1055 GK.services.get(old_service_uuid).stop_service(
1056 instance_uuid)
1057 LOG.info("service instance with uuid %r stopped." %
1058 instance_uuid)
1059
1060 # create a service object and register it
1061 s = Service(service_uuid, file_hash, upload_path)
1062 GK.register_service_package(service_uuid, s)
1063
1064 # automatically deploy the service
1065 if AUTO_DEPLOY:
1066 # ok, we have a service uuid, lets start the service
1067 reset_subnets()
1068 GK.services.get(service_uuid).start_service()
1069
1070 # generate the JSON result
1071 return {"service_uuid": service_uuid, "size": size,
1072 "sha1": file_hash, "error": None}, 201
1073 except BaseException:
1074 LOG.exception("Service package upload failed:")
1075 return {"service_uuid": None, "size": 0,
1076 "sha1": None, "error": "upload failed"}, 500
1077
1078 def get(self):
1079 """
1080 Return a list of UUIDs of uploaded service packages.
1081 :return: dict/list
1082 """
1083 LOG.info("GET /packages")
1084 return {"service_uuid_list": list(GK.services.iterkeys())}
1085
1086
1087 class Instantiations(fr.Resource):
1088
1089 def post(self):
1090 """
1091 Instantiate a service specified by its UUID.
1092 Will return a new UUID to identify the running service instance.
1093 :return: UUID
1094 """
1095 LOG.info("POST /instantiations (or /requests) called")
1096 # try to extract the service uuid from the request
1097 json_data = request.get_json(force=True)
1098 service_uuid = json_data.get("service_uuid")
1099
1100 # lets be a bit fuzzy here to make testing easier
1101 if (service_uuid is None or service_uuid ==
1102 "latest") and len(GK.services) > 0:
1103 # if we don't get a service uuid, we simply start the first service
1104 # in the list
1105 service_uuid = list(GK.services.iterkeys())[0]
1106 if service_uuid in GK.services:
1107 # ok, we have a service uuid, lets start the service
1108 service_instance_uuid = GK.services.get(
1109 service_uuid).start_service()
1110 return {"service_instance_uuid": service_instance_uuid}, 201
1111 return "Service not found", 404
1112
1113 def get(self):
1114 """
1115 Return a list with the instance UUIDs of all running services.
1116 :return: dict / list
1117 """
1118 LOG.info("GET /instantiations")
1119 return {"service_instantiations_list": [
1120 list(s.instances.iterkeys()) for s in GK.services.itervalues()]}
1121
1122 def delete(self):
1123 """
1124 Stops a running service specified by its service and instance UUID.
1125 """
1126 # try to extract the service and instance UUID from the request
1127 json_data = request.get_json(force=True)
1128 service_uuid = json_data.get("service_uuid")
1129 instance_uuid = json_data.get("service_instance_uuid")
1130
1131 # try to be fuzzy
1132 if service_uuid is None and len(GK.services) > 0:
1133 # if we don't get a service uuid, we simply stop the first service
1134 # in the list
1135 service_uuid = list(GK.services.iterkeys())[0]
1136 if instance_uuid is None and len(
1137 GK.services[service_uuid].instances) > 0:
1138 instance_uuid = list(
1139 GK.services[service_uuid].instances.iterkeys())[0]
1140
1141 if service_uuid in GK.services and instance_uuid in GK.services[service_uuid].instances:
1142 # valid service and instance UUID, stop service
1143 GK.services.get(service_uuid).stop_service(instance_uuid)
1144 return "service instance with uuid %r stopped." % instance_uuid, 200
1145 return "Service not found", 404
1146
1147
1148 class Exit(fr.Resource):
1149
1150 def put(self):
1151 """
1152 Stop the running Containernet instance regardless of data transmitted
1153 """
1154 list(GK.dcs.values())[0].net.stop()
1155
1156
1157 def initialize_GK():
1158 global GK
1159 GK = Gatekeeper()
1160
1161
1162 # create a single, global GK object
1163 GK = None
1164 initialize_GK()
1165 # setup Flask
1166 app = Flask(__name__)
1167 app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024 # 512 MB max upload
1168 api = fr.Api(app)
1169 # define endpoints
1170 api.add_resource(Packages, '/packages', '/api/v2/packages')
1171 api.add_resource(Instantiations, '/instantiations',
1172 '/api/v2/instantiations', '/api/v2/requests')
1173 api.add_resource(Exit, '/emulator/exit')
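# Example interaction (illustrative only; the file name is hypothetical and the
# port 8000 assumes the API was started as in the __main__ block below):
#   curl -X POST -F package=@my_service.tgo http://127.0.0.1:8000/packages
#   curl -X POST -d '{"service_uuid": "latest"}' http://127.0.0.1:8000/instantiations
# The first call returns the generated service_uuid; passing "latest" (or omitting
# the field) to /instantiations simply starts the first uploaded service.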
1174
1175
1176 def start_rest_api(host, port, datacenters=dict()):
1177 GK.dcs = datacenters
1178 GK.net = get_dc_network()
1179 # start the Flask server (not the best performance but ok for our use case)
1180 app.run(host=host,
1181 port=port,
1182 debug=True,
1183 use_reloader=False # this is needed to run Flask in a non-main thread
1184 )
1185
1186
1187 def ensure_dir(name):
1188 if not os.path.exists(name):
1189 os.makedirs(name)
1190
1191
1192 def load_yaml(path):
1193 with open(path, "r") as f:
1194 try:
1195 r = yaml.load(f)
1196 except yaml.YAMLError as exc:
1197 LOG.exception("YAML parse error: %r" % str(exc))
1198 r = dict()
1199 return r
1200
1201
1202 def make_relative_path(path):
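# e.g. "file:///Definitions/mynsd.yaml" -> "Definitions/mynsd.yaml"
# (strips an optional "file://" scheme and one leading "/")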
1203 if path.startswith("file://"):
1204 path = path.replace("file://", "", 1)
1205 if path.startswith("/"):
1206 path = path.replace("/", "", 1)
1207 return path
1208
1209
1210 def get_dc_network():
1211 """
1212 Retrieve the DCNetwork to which this dummy gatekeeper (GK) connects.
1213 Assume at least 1 datacenter is connected to this GK, and that all datacenters belong to the same DCNetwork
1214 :return:
1215 """
1216 assert (len(GK.dcs) > 0)
1217 return GK.dcs.values()[0].net
1218
1219
1220 def parse_interface(interface_name):
1221 """
1222 Convert an interface name in the NSD to the corresponding vnf_id and vnf_interface names.
1223 :param interface_name:
1224 :return:
1225 """
1226
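# e.g. parse_interface("vnf0:eth0") -> ("vnf0", "eth0", "vnf0_eth0")
# and parse_interface("mgmt") -> ("mgmt", "mgmt", "mgmt")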
1227 if ':' in interface_name:
1228 vnf_id, vnf_interface = interface_name.split(':')
1229 vnf_sap_docker_name = interface_name.replace(':', '_')
1230 else:
1231 vnf_id = interface_name
1232 vnf_interface = interface_name
1233 vnf_sap_docker_name = interface_name
1234
1235 return vnf_id, vnf_interface, vnf_sap_docker_name
1236
1237
1238 def reset_subnets():
1239 # private subnet definitions for the generated interfaces
1240 # 10.10.xxx.0/30
1241 global SAP_SUBNETS
1242 SAP_SUBNETS = generate_subnets('10.10', 0, subnet_size=50, mask=30)
1243 # 10.20.xxx.0/24
1244 global ELAN_SUBNETS
1245 ELAN_SUBNETS = generate_subnets('10.20', 0, subnet_size=50, mask=24)
1246 # 10.30.xxx.0/30
1247 global ELINE_SUBNETS
1248 ELINE_SUBNETS = generate_subnets('10.30', 0, subnet_size=50, mask=30)
1249
1250
1251 if __name__ == '__main__':
1252 """
1253 Allow running the API in standalone mode.
1254 """
1255 GK_STANDALONE_MODE = True
1256 logging.getLogger("werkzeug").setLevel(logging.INFO)
1257 start_rest_api("0.0.0.0", 8000)