Extended networking parts of 5GTANGO LLCM to fully support
[osm/vim-emu.git] / src / emuvim / api / tango / llcm.py
1 # Copyright (c) 2018 SONATA-NFV, 5GTANGO and Paderborn University
2 # ALL RIGHTS RESERVED.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Neither the name of the SONATA-NFV, 5GTANGO, Paderborn University
17 # nor the names of its contributors may be used to endorse or promote
18 # products derived from this software without specific prior written
19 # permission.
20 #
21 # This work has been performed in the framework of the SONATA project,
22 # funded by the European Commission under Grant number 671517 through
23 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
24 # acknowledge the contributions of their colleagues of the SONATA
25 # partner consortium (www.sonata-nfv.eu).
26 #
27 # This work has also been performed in the framework of the 5GTANGO project,
28 # funded by the European Commission under Grant number 761493 through
29 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
30 # acknowledge the contributions of their colleagues of the 5GTANGO
31 # partner consortium (www.5gtango.eu).
32 import logging
33 import os
34 import uuid
35 import hashlib
36 import zipfile
37 import yaml
38 import threading
39 from docker import DockerClient
40 from flask import Flask, request
41 import flask_restful as fr
42 from collections import defaultdict
43 import pkg_resources
44 from subprocess import Popen
45 import ipaddress
46 import copy
47 import time
48 from functools import reduce
49
50
# module-wide logger of the LLCM
LOG = logging.getLogger("5gtango.llcm")
LOG.setLevel(logging.INFO)


# base folder of the LLCM; uploads and the unpacked catalog live below it
GK_STORAGE = "/tmp/vim-emu-tango-llcm/"
UPLOAD_FOLDER = os.path.join(GK_STORAGE, "uploads/")
CATALOG_FOLDER = os.path.join(GK_STORAGE, "catalog/")

# Enable Dockerfile build functionality: if True, images are built from the
# Dockerfiles found in the package during on-boarding instead of being pulled
BUILD_DOCKERFILE = False

# flag to indicate that we run without the emulator (only the bare API for
# integration testing); placement and image builds are skipped in this mode
GK_STANDALONE_MODE = False

# should a new version of an image be pulled even if it is already available
FORCE_PULL = False

# Automatically deploy SAPs (endpoints) of the service as new containers
# Attention: This is not a configuration switch but a global variable!
# Don't change its default value.
DEPLOY_SAP = False

# flag to indicate if we use bidirectional forwarding rules in the
# automatic chaining process
BIDIRECTIONAL_CHAIN = True

# override the management interfaces in the descriptors with default
# docker0 interfaces in the containers
USE_DOCKER_MGMT = False

# automatically deploy uploaded packages (no need to execute son-access
# deploy --latest separately)
AUTO_DEPLOY = False

# and also automatically terminate any other running services
AUTO_DELETE = False
88
89
90 def generate_subnets(prefix, base, subnet_size=50, mask=24):
91 # Generate a list of ipaddress in subnets
92 r = list()
93 for net in range(base, base + subnet_size):
94 subnet = "{0}.{1}.0/{2}".format(prefix, net, mask)
95 r.append(ipaddress.ip_network(unicode(subnet)))
96 return r
97
98
# private subnet definitions for the generated interfaces
# Each list is a pool: entries are removed with pop(0) whenever a SAP,
# E-LAN or E-Line needs a new subnet, so each pool serves 50 subnets.
# 99.0.xxx.0/24
SAP_SUBNETS = generate_subnets('99.0', 0, subnet_size=50, mask=24)
# 30.0.xxx.0/24
ELAN_SUBNETS = generate_subnets('30.0', 0, subnet_size=50, mask=24)
# 20.0.xxx.0/24
ELINE_SUBNETS = generate_subnets('20.0', 0, subnet_size=50, mask=24)

# path to the VNFD for the SAP VNF that is deployed as internal SAP point
# (None: the "sap_vnfd.yml" shipped with this package is used)
SAP_VNFD = None

# Time in seconds to wait for vnf stop scripts to execute fully
VNF_STOP_WAIT_TIME = 5
112
113
class OnBoardingException(Exception):
    """
    Raised when a service package cannot be on-boarded, e.g., because
    the package contains no NSD or no VNFDs.

    Derives from Exception (not BaseException) so that generic
    "except Exception" handlers see it like any other application error.
    """
    pass
116
117
class Gatekeeper(object):
    """
    Registry object of the LLCM.
    Keeps the on-boarded services, the known data centers and a reference
    to the emulated network.
    """

    def __init__(self):
        # used to generate short names for VNFs (Mininet limitation)
        self.vnf_counter = 0
        # emulated network; None until it is assigned from outside
        self.net = None
        # data centers available for placement
        self.dcs = {}
        # on-boarded services, indexed by service uuid
        self.services = {}
        LOG.info("Initialized 5GTANGO LLCM module.")

    def register_service_package(self, service_uuid, service):
        """
        Store a new service package and on-board it.
        :param service_uuid: key under which the service is stored
        :param service: service object to register
        """
        # the service is stored first and on-boarded afterwards
        self.services.update({service_uuid: service})
        service.onboard()
137
138
139 class Service(object):
140 """
141 This class represents a NS uploaded as a *.son package to the
142 dummy gatekeeper.
143 Can have multiple running instances of this service.
144 """
145
def __init__(self,
             service_uuid,
             package_file_hash,
             package_file_path):
    """
    Create an empty (not yet on-boarded) service.
    :param service_uuid: uuid of this service
    :param package_file_hash: hash of the uploaded package file
    :param package_file_path: path of the uploaded package file
    """
    # identity and origin of the package
    self.uuid = service_uuid
    self.package_file_path = package_file_path
    self.package_file_hash = package_file_hash
    # folder in the catalog that receives the unpacked package
    self.package_content_path = os.path.join(
        CATALOG_FOLDER, "services/%s" % self.uuid)
    # descriptors, filled during on-boarding
    self.manifest = None
    self.nsd = None
    self.vnfds = {}
    # service access points
    self.saps = {}
    self.saps_ext = []
    self.saps_int = []
    # container images, indexed by container name
    self.local_docker_files = {}
    self.remote_docker_image_urls = {}
    # running instances of this service, indexed by instance uuid
    self.instances = {}
    # dict to find the vnf_name for any vnf id
    self.vnf_id2vnf_name = {}
166
def onboard(self):
    """
    Prepare this service for instantiation: unpack the package, load
    its descriptors and get the container images ready.
    :raises OnBoardingException: if no NSD or no VNFD was loaded
    :return:
    """
    # 1. put the package contents into our catalog
    self._unpack_service_package()
    # 2. read the descriptors (manifest first, then NSD, then VNFDs)
    for load_descriptor in (self._load_package_descriptor,
                            self._load_nsd,
                            self._load_vnfd):
        load_descriptor()
    if self.nsd is None:
        raise OnBoardingException("No NSD found.")
    if not self.vnfds:
        raise OnBoardingException("No VNFDs found.")
    if DEPLOY_SAP:
        self._load_saps()
    # 3. container images: either build them locally or pull them
    if not BUILD_DOCKERFILE:
        self._load_docker_urls()
        self._pull_predefined_dockerimages()
    else:
        self._load_docker_files()
        self._build_images_from_dockerfiles()
    LOG.info("On-boarded service: %r" % self.manifest.get("name"))
192
def start_service(self):
    """
    Create and start a new instance of this service.
    Computes the placement, starts the deployment units of every VNFD,
    connects the virtual links of the NSD and finally triggers the
    start scripts inside the started containers.
    :return: uuid of the new service instance
    """
    LOG.info("Starting service %r" % self.uuid)

    # 1. a fresh uuid identifies the instance; its record (a bit like a
    # NSR) holds the list of started VNF instances
    instance_uuid = str(uuid.uuid4())
    started_vnfis = list()
    self.instances[instance_uuid] = {"vnf_instances": started_vnfis}

    # 2. placement adds the DC to the VNFDs (not done in standalone mode)
    if not GK_STANDALONE_MODE:
        self._calculate_placement(RoundRobinDcPlacement)

    # 3. start the VNFDs; every call returns a list of deployment units
    for vnf_id in self.vnfds:
        started_vnfis.extend(self._start_vnfd(self.vnfds[vnf_id], vnf_id))

    # 4. Deploy E-Line, E-Tree and E-LAN links
    # Attention: only done if a "forwarding_graphs" section exists in the
    # NSD, even though its contents are not evaluated here.
    if "virtual_links" in self.nsd and "forwarding_graphs" in self.nsd:
        eline_fwd_links = []
        elan_fwd_links = []
        for vlink in self.nsd["virtual_links"]:
            connectivity = vlink["connectivity_type"]
            if connectivity == "E-Line":
                eline_fwd_links.append(vlink)
            elif connectivity in ("E-LAN", "E-Tree"):
                # E-Tree links are treated like E-LAN links
                elan_fwd_links.append(vlink)

        # 5a. E-Line links (bookkeeping first, then deployment)
        GK.net.deployed_elines.extend(eline_fwd_links)
        self._connect_elines(eline_fwd_links, instance_uuid)
        # 5b. E-Tree/E-LAN links (bookkeeping first, then deployment)
        GK.net.deployed_elans.extend(elan_fwd_links)
        self._connect_elans(elan_fwd_links, instance_uuid)

    # 6. run the emulator specific entrypoint scripts of the new VNFIs
    self._trigger_emulator_start_scripts_in_vnfis(started_vnfis)

    LOG.info("Service started. Instance id: %r" % instance_uuid)
    return instance_uuid
249
def stop_service(self, instance_uuid):
    """
    Stop a running instance of this service.
    Triggers the stop scripts, stops all VNF instances, removes the
    external SAPs and forgets the instance.

    :param instance_uuid: the uuid of the service instance to be stopped
    """
    LOG.info("Stopping service %r" % self.uuid)
    vnf_instances = self.instances[instance_uuid]["vnf_instances"]

    # run the stop scripts and give them some time to finish
    self._trigger_emulator_stop_scripts_in_vnfis(vnf_instances)
    time.sleep(VNF_STOP_WAIT_TIME)

    for vnfi in vnf_instances:
        self._stop_vnfi(vnfi)

    for sap_name in self.saps_ext:
        target_dc = self.saps[sap_name].get("dc")
        target_dc.removeExternalSAP(sap_name)
        LOG.info("Stopping the SAP instance: %r in DC %r" %
                 (sap_name, target_dc))

    # finally drop the record of this instance
    self.instances.pop(instance_uuid)
280
def _get_resource_limits(self, deployment_unit):
    """
    Extract resource limits from a deployment unit.

    Without a "resource_requirements" section the defaults are returned.
    A missing "cpu" or "memory" sub-section keeps the defaults of that
    sub-section instead of failing.

    :param deployment_unit: deployment unit dict taken from a VNFD
    :return: tuple (cpu_list, cpu_period, cpu_quota, mem_limit);
             mem_limit is given in bytes
    """
    # defaults
    cpu_list = "1"
    cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(1.0))
    mem_limit = 0
    if "resource_requirements" not in deployment_unit:
        return cpu_list, cpu_period, cpu_quota, mem_limit

    # update from descriptor
    res_req = deployment_unit.get("resource_requirements") or {}
    cpu = res_req.get("cpu")
    if cpu is not None:
        # "cores" wins over "vcpus"; None if neither of them is given
        cpu_list = cpu.get("cores")
        if cpu_list is None:
            cpu_list = cpu.get("vcpus")
        cpu_bw = cpu.get("cpu_bw")
        if cpu_bw is None:
            cpu_bw = 1.0
        cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(cpu_bw))
    memory = res_req.get("memory")
    if memory is not None:
        mem_unit = str(memory.get("size_unit", "GB"))
        mem_limit = float(str(memory.get("size", 2)))
        # values with an unknown unit are taken as bytes
        if mem_unit == "GB":
            mem_limit = mem_limit * 1024 * 1024 * 1024
        elif mem_unit == "MB":
            mem_limit = mem_limit * 1024 * 1024
        elif mem_unit == "KB":
            mem_limit = mem_limit * 1024
        mem_limit = int(mem_limit)
    return cpu_list, cpu_period, cpu_quota, mem_limit
308
def _start_vnfd(self, vnfd, vnf_id, **kwargs):
    """
    Start a single VNFD of this service.
    Every virtual and cloud-native deployment unit of the VNFD is started
    as one container in the data center stored in the VNFD ("dc").
    :param vnfd: vnfd descriptor dict
    :param vnf_id: unique id of this vnf in the nsd
    :param kwargs: "type" is handed to startCompute (default: 'docker')
    :return: list of started VNF instances (one per deployment unit)
    """
    vnfis = list()
    # the vnf_name refers to the container image to be deployed
    vnf_name = vnfd.get("name")
    # combine VDUs and CDUs
    deployment_units = (vnfd.get("virtual_deployment_units", []) +
                        vnfd.get("cloudnative_deployment_units", []))
    # iterate over all deployment units within each VNFDs
    for u in deployment_units:
        # 0. vnf_container_name = vnf_id.vdu_id
        vnf_container_name = get_container_name(vnf_id, u.get("id"))
        # 1. get the name of the docker image to start
        if vnf_container_name not in self.remote_docker_image_urls:
            raise Exception("No image name for %r found. Abort." % vnf_container_name)
        docker_image_name = self.remote_docker_image_urls.get(vnf_container_name)
        # 2. select datacenter to start the VNF in
        target_dc = vnfd.get("dc")
        # 3. perform some checks to ensure we can start the container
        assert docker_image_name is not None
        assert target_dc is not None
        if not self._check_docker_image_exists(docker_image_name):
            raise Exception("Docker image {} not found. Abort."
                            .format(docker_image_name))

        # 4. get the resource limits
        cpu_list, cpu_period, cpu_quota, mem_limit = self._get_resource_limits(u)

        # get connection points defined for the DU
        intfs = u.get("connection_points", [])
        # do some re-naming of fields to be compatible to containernet
        # (this writes the "ip" field into the descriptor dicts)
        for i in intfs:
            if i.get("address"):
                i["ip"] = i.get("address")

        # 5. collect additional information to start container
        volumes = list()
        cenv = dict()
        # 5.1 inject descriptor based start/stop commands into env (overwrite)
        cmd_start = u.get("vm_cmd_start")
        cmd_stop = u.get("vm_cmd_stop")
        if cmd_start and cmd_start != "None":
            LOG.info("Found 'vm_cmd_start'='{}' in VNFD.".format(cmd_start) +
                     " Overwriting SON_EMU_CMD.")
            cenv["SON_EMU_CMD"] = cmd_start
        if cmd_stop and cmd_stop != "None":
            # the message names the field that was actually found
            LOG.info("Found 'vm_cmd_stop'='{}' in VNFD.".format(cmd_stop) +
                     " Overwriting SON_EMU_CMD_STOP.")
            cenv["SON_EMU_CMD_STOP"] = cmd_stop

        # 6. Start the container
        LOG.info("Starting %r as %r in DC %r" %
                 (vnf_name, vnf_container_name, vnfd.get("dc")))
        LOG.debug("Interfaces for %r: %r" % (vnf_id, intfs))
        vnfi = target_dc.startCompute(
            vnf_container_name,
            network=intfs,
            image=docker_image_name,
            cpu_quota=cpu_quota,
            cpu_period=cpu_period,
            cpuset=cpu_list,
            mem_limit=mem_limit,
            volumes=volumes,
            properties=cenv,  # environment
            type=kwargs.get('type', 'docker'))
        # add vnfd reference to vnfi
        vnfi.vnfd = vnfd
        # add container name
        vnfi.vnf_container_name = vnf_container_name
        # store vnfi
        vnfis.append(vnfi)
    return vnfis
387
def _stop_vnfi(self, vnfi):
    """
    Stop one VNF instance in the data center it belongs to.

    :param vnfi: vnf instance to be stopped
    """
    # the status carries the name under which the data center knows it
    container_name = vnfi.getStatus()["name"]
    dc = vnfi.datacenter

    LOG.info("Stopping the vnf instance contained in %r in DC %r" %
             (container_name, dc))
    dc.stopCompute(container_name)
402
def _get_vnf_instance(self, instance_uuid, vnf_id):
    """
    Look up the VNFI object whose name equals the given "vnf_id" or
    "vnf_container_name" taken from an NSD.
    :param instance_uuid: uuid of the service instance to search in
    :param vnf_id: name to look for (compared as string)
    :return: single object, or None if nothing matches
    """
    candidates = self.instances[instance_uuid]["vnf_instances"]
    found = next(
        (vnfi for vnfi in candidates if str(vnfi.name) == str(vnf_id)),
        None)
    if found is None:
        LOG.warning("No container with name: {0} found.".format(vnf_id))
    return found
413
def _get_vnf_instance_units(self, instance_uuid, vnf_id):
    """
    Collect all VNFI objects (deployment units) that belong to the
    given "vnf_id" taken from an NSD.
    :param instance_uuid: uuid of the service instance to search in
    :param vnf_id: id of the VNF
    :return: list, or None if no unit was found
    """
    # NOTE(review): this is a substring test on the container name, so
    # "vnf1" would also select units of "vnf10" - confirm against the
    # naming scheme produced by get_container_name.
    units = [vnfi
             for vnfi in self.instances[instance_uuid]["vnf_instances"]
             if vnf_id in vnfi.name]
    if not units:
        LOG.warning("No container(s) with name: {0} found.".format(vnf_id))
        return None
    LOG.debug("Found units: {} for vnf_id: {}"
              .format([i.name for i in units], vnf_id))
    return units
430
@staticmethod
def _vnf_reconfigure_network(vnfi, if_name, net_str=None, new_name=None):
    """
    Change the IP configuration and/or the name of one interface of a
    running container.
    :param vnfi: container instance
    :param if_name: interface name
    :param net_str: network configuration string, e.g., 1.2.3.4/24
    :param new_name: new name for the interface
    :return:
    """
    # assign new ip address
    if net_str is not None:
        intf = vnfi.intf(intf=if_name)
        if intf is None:
            LOG.warning("Interface not found: %s:%s. Network reconfiguration skipped." % (
                vnfi.name, if_name))
        else:
            intf.setIP(net_str)
            LOG.debug("Reconfigured network of %s:%s to %r" %
                      (vnfi.name, if_name, net_str))

    # rename: take the link down, rename it, bring it up again
    if new_name is not None:
        rename_steps = ((if_name, 'down'),
                        (if_name, 'name', new_name),
                        (new_name, 'up'))
        for step in rename_steps:
            vnfi.cmd('ip link set', *step)
        LOG.debug("Reconfigured interface name of %s:%s to %s" %
                  (vnfi.name, if_name, new_name))
459
def _trigger_emulator_start_scripts_in_vnfis(self, vnfi_list):
    """
    Execute the start command of each given VNF instance.
    The command is taken from the first environment variable named
    SON_EMU_CMD or VIM_EMU_CMD in the container configuration.
    :param vnfi_list: list of VNF instances
    """
    for vnfi in vnfi_list:
        config = vnfi.dcinfo.get("Config", dict())
        # "Env" may be present but empty/null
        env = config.get("Env") or list()
        for env_var in env:
            var, sep, cmd = str(env_var).partition('=')
            if not sep:
                continue  # entry without "=" cannot carry a command
            var, cmd = var.strip(), cmd.strip()
            if var == "SON_EMU_CMD" or var == "VIM_EMU_CMD":
                LOG.info("Executing script in '{}': {}={}"
                         .format(vnfi.name, var, cmd))
                # execute command in new thread to ensure that GK is not
                # blocked by VNF
                t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
                t.daemon = True
                t.start()
                break  # only execute one command
476
def _trigger_emulator_stop_scripts_in_vnfis(self, vnfi_list):
    """
    Execute the stop command of each given VNF instance.
    The command is taken from the first environment variable named
    SON_EMU_CMD_STOP or VIM_EMU_CMD_STOP in the container configuration.
    :param vnfi_list: list of VNF instances
    """
    for vnfi in vnfi_list:
        config = vnfi.dcinfo.get("Config", dict())
        # "Env" may be present but empty/null
        env = config.get("Env") or list()
        for env_var in env:
            var, sep, cmd = str(env_var).partition('=')
            if not sep:
                continue  # entry without "=" cannot carry a command
            var, cmd = var.strip(), cmd.strip()
            if var == "SON_EMU_CMD_STOP" or var == "VIM_EMU_CMD_STOP":
                LOG.info("Executing script in '{}': {}={}"
                         .format(vnfi.name, var, cmd))
                # execute command in new thread to ensure that GK is not
                # blocked by VNF
                t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
                t.daemon = True
                t.start()
                break  # only execute one command
492
def _unpack_service_package(self):
    """
    Extract the uploaded package file (zip archive) into the content
    folder of this service: CATALOG_FOLDER/services/<service_uuid>/
    """
    source = self.package_file_path
    LOG.info("Unzipping: %r" % source)
    with zipfile.ZipFile(source, "r") as package:
        package.extractall(path=self.package_content_path)
500
def _load_package_descriptor(self):
    """
    Read the package descriptor (TOSCA-Metadata/NAPD.yaml) of the
    unpacked package and keep the result in self.manifest.
    :return:
    """
    napd_path = os.path.join(
        self.package_content_path, "TOSCA-Metadata/NAPD.yaml")
    self.manifest = load_yaml(napd_path)
509
def _load_nsd(self):
    """
    Load the entry NSD YAML and keep it as dict.
    Also builds self.vnf_id2vnf_name, the mapping from every vnf_id in
    the NSD to the name of its VNF.
    :raises OnBoardingException: if the manifest has no "package_content"
        section or lists no NSD
    :return:
    """
    if "package_content" not in self.manifest:
        raise OnBoardingException(
            "No 'package_content' section in package manifest:\n{}"
            .format(self.manifest))
    nsd_path = None
    for f in self.manifest.get("package_content"):
        if f.get("content-type") == "application/vnd.5gtango.nsd":
            nsd_path = os.path.join(
                self.package_content_path,
                make_relative_path(f.get("source")))
            break  # always use the first NSD for now
    if nsd_path is None:
        raise OnBoardingException("No NSD with type 'application/vnd.5gtango.nsd' found.")
    self.nsd = load_yaml(nsd_path)
    GK.net.deployed_nsds.append(self.nsd)  # TODO this seems strange (remove?)
    # create dict to find the vnf_name for any vnf id; unknown ids map to
    # "NotExistingNode". An empty or missing "network_functions" list
    # gives an empty mapping instead of an error.
    self.vnf_id2vnf_name = defaultdict(
        lambda: "NotExistingNode",
        {nf["vnf_id"]: nf["vnf_name"]
         for nf in self.nsd.get("network_functions") or []})
    LOG.debug("Loaded NSD: %r" % self.nsd.get("name"))
537
def _load_vnfd(self):
    """
    Load all VNFD YAML files referenced in the package manifest and link
    every vnf_id of the NSD to its VNFD (stored in self.vnfds).
    :raises OnBoardingException: if the package contains no VNFD or the
        NSD references a VNF without VNFD in the package
    :return:
    """
    # first make a dict of all the vnfds in the package (indexed by name)
    vnfd_set = dict()
    for pc in self.manifest.get("package_content") or []:
        if pc.get("content-type") == "application/vnd.5gtango.vnfd":
            vnfd_path = os.path.join(
                self.package_content_path,
                make_relative_path(pc.get("source")))
            vnfd = load_yaml(vnfd_path)
            vnfd_set[vnfd.get("name")] = vnfd
    if len(vnfd_set) < 1:
        raise OnBoardingException("No VNFDs found.")
    # then link each vnf_id in the nsd to its vnfd
    for vnf_id in self.vnf_id2vnf_name:
        vnf_name = self.vnf_id2vnf_name[vnf_id]
        if vnf_name not in vnfd_set:
            # report a broken package instead of failing with a KeyError
            raise OnBoardingException(
                "No VNFD with name {0!r} found for vnf_id {1!r}."
                .format(vnf_name, vnf_id))
        self.vnfds[vnf_id] = vnfd_set[vnf_name]
        LOG.debug("Loaded VNFD: {0} id: {1}".format(vnf_name, vnf_id))
562
def _load_saps(self):
    """
    Create the service access points (SAPs) for the connection points
    of the NSD.
    External SAPs get a subnet from SAP_SUBNETS; internal and management
    SAPs get their own SAP VNFD. Every SAP is stored in self.saps and
    added to the "network_functions" of the NSD.
    """
    # create list of all SAPs
    connection_points = self.nsd.get("connection_points") or []
    # check if we need to deploy management ports
    if USE_DOCKER_MGMT:
        # connection points without a type count as 'internal' (see
        # below) and are therefore kept
        SAPs = [p for p in connection_points
                if 'management' not in p.get('type', 'internal')]
    else:
        SAPs = list(connection_points)

    for sap in SAPs:
        # endpoint needed in this service
        sap_id, sap_interface, sap_docker_name = parse_interface(sap['id'])
        # make sure SAP has type set (default internal)
        sap["type"] = sap.get("type", 'internal')

        # Each Service Access Point (connection_point) in the nsd is an IP
        # address on the host
        if sap["type"] == "external":
            # add to vnfds to calculate placement later on
            sap_net = SAP_SUBNETS.pop(0)
            self.saps[sap_docker_name] = {
                "name": sap_docker_name, "type": "external", "net": sap_net}
            # add SAP vnf to list in the NSD so it is deployed later on
            # each SAP gets a unique VNFD and vnf_id in the NSD and custom
            # type (only defined in the dummygatekeeper)
            self.nsd["network_functions"].append(
                {"vnf_id": sap_docker_name, "vnf_name": sap_docker_name, "vnf_type": "sap_ext"})

        # Each Service Access Point (connection_point) in the nsd is
        # getting its own container (default)
        elif sap["type"] == "internal" or sap["type"] == "management":
            # add SAP to self.vnfds
            if SAP_VNFD is None:
                sapfile = pkg_resources.resource_filename(
                    __name__, "sap_vnfd.yml")
            else:
                sapfile = SAP_VNFD
            sap_vnfd = load_yaml(sapfile)
            sap_vnfd["connection_points"][0]["id"] = sap_interface
            sap_vnfd["name"] = sap_docker_name
            sap_vnfd["type"] = "internal"
            # add to vnfds to calculate placement later on and deploy
            self.saps[sap_docker_name] = sap_vnfd
            # add SAP vnf to list in the NSD so it is deployed later on
            # each SAP get a unique VNFD and vnf_id in the NSD
            self.nsd["network_functions"].append(
                {"vnf_id": sap_docker_name, "vnf_name": sap_docker_name, "vnf_type": "sap_int"})

        LOG.debug("Loaded SAP: name: {0}, type: {1}".format(
            sap_docker_name, sap['type']))

    # create sap lists
    self.saps_ext = [self.saps[sap]['name']
                     for sap in self.saps if self.saps[sap]["type"] == "external"]
    self.saps_int = [self.saps[sap]['name']
                     for sap in self.saps if self.saps[sap]["type"] == "internal"]
619
def _start_sap(self, sap, instance_uuid):
    """
    Start a single SAP of a service instance (only if DEPLOY_SAP is set).
    Internal SAPs are started like a VNFD, external SAPs are attached
    to their data center.
    :param sap: SAP dict (entry of self.saps)
    :param instance_uuid: uuid of the service instance the SAP belongs to
    """
    if not DEPLOY_SAP:
        return

    LOG.info('start SAP: {0} ,type: {1}'.format(sap['name'], sap['type']))
    if sap["type"] == "internal":
        if not GK_STANDALONE_MODE:
            # _start_vnfd returns a list of started units, so the instance
            # list has to be extended (appending would store the list
            # itself as one element and break stop_service)
            vnfis = self._start_vnfd(sap, sap['name'], type='sap_int')
            self.instances[instance_uuid]["vnf_instances"].extend(vnfis)

    elif sap["type"] == "external":
        target_dc = sap.get("dc")
        # add interface to dc switch
        target_dc.attachExternalSAP(sap['name'], sap['net'])
635
def _connect_elines(self, eline_fwd_links, instance_uuid):
    """
    Connect all E-LINE links in the NSD
    Attention: This method DOES NOT support multi V/CDU VNFs!
    :param eline_fwd_links: list of E-LINE links in the NSD
    :param: instance_uuid of the service
    :return:
    """
    def has_fixed_ip(vnfi, if_name):
        # True if the VNFD gives the connection point a fixed address;
        # an unknown connection point counts as "no fixed address"
        cp = self._get_vnfd_cp_from_vnfi(vnfi, if_name) or {}
        return cp.get("address") is not None

    # cookie is used as identifier for the flowrules installed by the dummygatekeeper
    # eg. different services get a unique cookie for their flowrules
    cookie = 1
    for link in eline_fwd_links:
        LOG.info("Found E-Line: {}".format(link))
        # check if we need to deploy this link when its a management link:
        if USE_DOCKER_MGMT:
            if self.check_mgmt_interface(
                    link["connection_points_reference"]):
                continue

        src_id, src_if_name, src_sap_id = parse_interface(
            link["connection_points_reference"][0])
        dst_id, dst_if_name, dst_sap_id = parse_interface(
            link["connection_points_reference"][1])

        setChaining = False
        # check if there is a SAP in the link and chain everything together
        if src_sap_id in self.saps and dst_sap_id in self.saps:
            LOG.info(
                '2 SAPs cannot be chained together : {0} - {1}'.format(src_sap_id, dst_sap_id))
            continue

        elif src_sap_id in self.saps_ext:
            src_id = src_sap_id
            # set intf name to None so the chaining function will choose
            # the first one
            src_if_name = None
            dst_vnfi = self._get_vnf_instance(instance_uuid, dst_id)
            if dst_vnfi is not None:
                # the VNF side gets the address with index 2 of the sap subnet
                sap_net = self.saps[src_sap_id]['net']
                sap_ip = "{0}/{1}".format(str(sap_net[2]),
                                          sap_net.prefixlen)
                self._vnf_reconfigure_network(
                    dst_vnfi, dst_if_name, sap_ip)
                setChaining = True

        elif dst_sap_id in self.saps_ext:
            dst_id = dst_sap_id
            # set intf name to None so the chaining function will choose
            # the first one
            dst_if_name = None
            src_vnfi = self._get_vnf_instance(instance_uuid, src_id)
            if src_vnfi is not None:
                sap_net = self.saps[dst_sap_id]['net']
                sap_ip = "{0}/{1}".format(str(sap_net[2]),
                                          sap_net.prefixlen)
                self._vnf_reconfigure_network(
                    src_vnfi, src_if_name, sap_ip)
                setChaining = True

        # Link between 2 VNFs
        else:
            LOG.info("Creating E-Line: src={}, dst={}"
                     .format(src_id, dst_id))
            # make sure we use the correct sap vnf name
            if src_sap_id in self.saps_int:
                src_id = src_sap_id
            if dst_sap_id in self.saps_int:
                dst_id = dst_sap_id
            # get involved vnfis
            src_vnfi = self._get_vnf_instance(instance_uuid, src_id)
            dst_vnfi = self._get_vnf_instance(instance_uuid, dst_id)

            if src_vnfi is not None and dst_vnfi is not None:
                setChaining = True
                # re-configure the VNFs IP assignment and ensure that a new
                # subnet is used for each E-Link
                eline_net = ELINE_SUBNETS.pop(0)
                ip1 = "{0}/{1}".format(str(eline_net[1]),
                                       eline_net.prefixlen)
                ip2 = "{0}/{1}".format(str(eline_net[2]),
                                       eline_net.prefixlen)
                # VNFs with fixed IPs (address field in VNFDs) keep them
                if not has_fixed_ip(src_vnfi, src_if_name):
                    self._vnf_reconfigure_network(src_vnfi, src_if_name, ip1)
                if not has_fixed_ip(dst_vnfi, dst_if_name):
                    self._vnf_reconfigure_network(dst_vnfi, dst_if_name, ip2)

        # Set the chaining
        if setChaining:
            GK.net.setChain(
                src_id, dst_id,
                vnf_src_interface=src_if_name, vnf_dst_interface=dst_if_name,
                bidirectional=BIDIRECTIONAL_CHAIN, cmd="add-flow", cookie=cookie, priority=10)
733
def _get_vnfd_cp_from_vnfi(self, vnfi, ifname):
    """
    Gets the connection point data structure from the VNFD
    of the given VNFI using ifname.
    :param vnfi: VNF instance with a "vnfd" attribute
    :param ifname: id of the connection point to look for
    :return: connection point dict; an empty dict if the VNFI has no
             VNFD or the VNFD has no connection point with this id
    """
    if vnfi.vnfd is None:
        return {}
    # a VNFD without "connection_points" is treated like an empty list
    for cp in vnfi.vnfd.get("connection_points") or []:
        if cp.get("id") == ifname:
            return cp
    # always return a dict so that callers can use .get() on the result
    return {}
745
def _connect_elans(self, elan_fwd_links, instance_uuid):
    """
    Connect all E-LAN/E-Tree links in the NSD.
    Multi-V/CDU VNFs are supported as long as the connection points of
    the DUs are named like the ones in the NSD.
    :param elan_fwd_links: list of E-LAN links in the NSD
    :param: instance_uuid of the service
    :return:
    """
    for link in elan_fwd_links:
        # every E-LAN/E-Tree gets its own subnet and member list
        members = []
        lan_net = ELAN_SUBNETS.pop(0)
        free_hosts = list(lan_net.hosts())

        # one address of the LAN per interface of all involved (V/CDUs)
        for cp_ref in link["connection_points_reference"]:
            vnf_id, intf_name, _ = parse_interface(cp_ref)
            if vnf_id is None:
                continue  # skip references to NS connection points
            units = self._get_vnf_instance_units(instance_uuid, vnf_id)
            if units is None:
                continue  # skip if no deployment unit is present
            for unit in units:
                # Attention: simplification for multi DU VNFs: the
                # connection points of all involved DUs have to be named
                # like the connection points of the surrounding VNF,
                # because links specified in the VNFDs are not considered.
                container_name = unit.name
                ip_address = "{0}/{1}".format(str(free_hosts.pop(0)),
                                              lan_net.prefixlen)
                LOG.debug(
                    "Setting up E-LAN/E-Tree interface. (%s:%s) -> %s" % (
                        container_name, intf_name, ip_address))
                # E-LAN relies on the learning switch capability of Ryu,
                # which has to be turned on in the topology
                # (DCNetwork(controller=RemoteController, enable_learning=True)),
                # so no explicit chaining is necessary.
                vnfi = self._get_vnf_instance(instance_uuid, container_name)
                if vnfi is None:
                    continue
                self._vnf_reconfigure_network(vnfi, intf_name, ip_address)
                # remember vnf and interface of this E-LAN for tagging
                members.append(
                    {'name': container_name, 'interface': intf_name})
        # install the VLAN tags for this E-LAN
        GK.net.setLAN(members)
793
def _load_docker_files(self):
    """
    Get all paths to Dockerfiles from VNFDs and store them in
    self.local_docker_files (indexed by container name).
    Deployment units without an image path are skipped.
    :return:
    """
    # items() instead of iteritems(): works on Python 2 and Python 3
    for vnf_id, v in self.vnfds.items():
        for vu in v.get("virtual_deployment_units", []):
            vnf_container_name = get_container_name(vnf_id, vu.get("id"))
            vm_image = vu.get("vm_image")
            if vu.get("vm_image_format") == "docker" and vm_image is not None:
                docker_path = os.path.join(
                    self.package_content_path,
                    make_relative_path(vm_image))
                self.local_docker_files[vnf_container_name] = docker_path
                LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
        for cu in v.get("cloudnative_deployment_units", []):
            vnf_container_name = get_container_name(vnf_id, cu.get("id"))
            image = cu.get("image")
            if image is None:
                continue  # nothing to build for this unit
            docker_path = os.path.join(
                self.package_content_path,
                make_relative_path(image))
            self.local_docker_files[vnf_container_name] = docker_path
            LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
817
def _load_docker_urls(self):
    """
    Get all URLs to pre-build docker images in some repo and store them
    in self.remote_docker_image_urls (indexed by container name).
    A leading "http://" is removed from the URLs.
    :return:
    """
    # items() instead of iteritems(): works on Python 2 and Python 3
    for vnf_id, v in self.vnfds.items():
        for vu in v.get("virtual_deployment_units", []):
            vnf_container_name = get_container_name(vnf_id, vu.get("id"))
            if vu.get("vm_image_format") == "docker":
                url = vu.get("vm_image")
                if url is not None:
                    url = url.replace("http://", "")
                    self.remote_docker_image_urls[vnf_container_name] = url
                    LOG.debug("Found Docker image URL (%r): %r" %
                              (vnf_container_name,
                               self.remote_docker_image_urls[vnf_container_name]))
        for cu in v.get("cloudnative_deployment_units", []):
            vnf_container_name = get_container_name(vnf_id, cu.get("id"))
            url = cu.get("image")
            if url is not None:
                url = url.replace("http://", "")
                self.remote_docker_image_urls[vnf_container_name] = url
                LOG.debug("Found Docker image URL (%r): %r" %
                          (vnf_container_name,
                           self.remote_docker_image_urls[vnf_container_name]))
843
def _build_images_from_dockerfiles(self):
    """
    Build Docker images for each local Dockerfile found in the package: self.local_docker_files
    The container name is used as tag of the built image.
    """
    if GK_STANDALONE_MODE:
        return  # do not build anything in standalone mode
    dc = DockerClient()
    LOG.info("Building %d Docker images (this may take several minutes) ..." % len(
        self.local_docker_files))
    # items() instead of iteritems(): works on Python 2 and Python 3
    for k, v in self.local_docker_files.items():
        # the build context is the folder that contains the Dockerfile;
        # only a trailing "Dockerfile" path component is removed
        build_path = os.path.dirname(v) if os.path.basename(v) == "Dockerfile" else v
        # the high-level DockerClient has no build(); the low-level API
        # client (dc.api) yields the build output line by line
        for line in dc.api.build(path=build_path, tag=k, rm=False, nocache=False):
            LOG.debug("DOCKER BUILD: %s" % line)
        LOG.info("Docker image created: %s" % k)
858
def _pull_predefined_dockerimages(self):
    """
    If the package contains URLs to pre-build Docker images, we download them with this method.
    Images that are already present are only pulled if FORCE_PULL is set.
    """
    dc = DockerClient()
    # values() instead of itervalues(): works on Python 2 and Python 3
    for url in self.remote_docker_image_urls.values():
        # only pull if not present (speedup for development)
        if not FORCE_PULL:
            if len(dc.images.list(name=url)) > 0:
                LOG.debug("Image %r present. Skipping pull." % url)
                continue
        LOG.info("Pulling image: %r" % url)
        # this seems to fail with latest docker api version 2.0.2
        # dc.images.pull(url,
        #        insecure_registry=True)
        # using docker cli instead
        cmd = ["docker",
               "pull",
               url,
               ]
        return_code = Popen(cmd).wait()
        if return_code != 0:
            # do not hide a failed pull; starting the VNF will fail later
            LOG.warning("Pulling image %r failed (docker pull exit code: %r)." %
                        (url, return_code))
880
def _check_docker_image_exists(self, image_name):
    """
    Ask the docker service whether an image with the given name is known.
    :param image_name: name of the docker image
    :return: True if at least one matching image is listed, False otherwise
    """
    client = DockerClient()
    matching_images = client.images.list(name=image_name)
    return len(matching_images) > 0
888
def _calculate_placement(self, algorithm):
    """
    Do placement by adding a field "dc" to
    each VNFD that points to one of our
    data center objects known to the gatekeeper.
    :param algorithm: placement class; it is instantiated without arguments
        and its place(nsd, vnfds, saps, dcs) method is called
    :return: None
    """
    assert(len(self.vnfds) > 0)
    assert(len(GK.dcs) > 0)
    # instantiate algorithm and place
    p = algorithm()
    p.place(self.nsd, self.vnfds, self.saps, GK.dcs)
    LOG.info("Using placement algorithm: %r" % p.__class__.__name__)
    # lets print the placement result
    # (items() instead of the Python 2-only iteritems(): works on 2 and 3)
    for name, vnfd in self.vnfds.items():
        LOG.info("Placed VNF %r on DC %r" % (name, str(vnfd.get("dc"))))
    for sap, sap_dict in self.saps.items():
        LOG.info("Placed SAP %r on DC %r" % (sap, str(sap_dict.get("dc"))))
907
908 def _calculate_cpu_cfs_values(self, cpu_time_percentage):
909 """
910 Calculate cpu period and quota for CFS
911 :param cpu_time_percentage: percentage of overall CPU to be used
912 :return: cpu_period, cpu_quota
913 """
914 if cpu_time_percentage is None:
915 return -1, -1
916 if cpu_time_percentage < 0:
917 return -1, -1
918 # (see: https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt)
919 # Attention minimum cpu_quota is 1ms (micro)
920 cpu_period = 1000000 # lets consider a fixed period of 1000000 microseconds for now
921 LOG.debug("cpu_period is %r, cpu_percentage is %r" %
922 (cpu_period, cpu_time_percentage))
923 # calculate the fraction of cpu time for this container
924 cpu_quota = cpu_period * cpu_time_percentage
925 # ATTENTION >= 1000 to avoid a invalid argument system error ... no
926 # idea why
927 if cpu_quota < 1000:
928 LOG.debug("cpu_quota before correcting: %r" % cpu_quota)
929 cpu_quota = 1000
930 LOG.warning("Increased CPU quota to avoid system error.")
931 LOG.debug("Calculated: cpu_period=%f / cpu_quota=%f" %
932 (cpu_period, cpu_quota))
933 return int(cpu_period), int(cpu_quota)
934
def check_ext_saps(self, intf_list):
    """
    Check if the list of interfaces contains an external SAP.

    NOTE(review): the name compared against the external SAPs is the third
    value returned by parse_interface() - confirm that it is ever set.
    :param intf_list: interface names to inspect
    :return: the matching SAP name, or None if there is no match
    """
    external_sap_names = [sap_dict['name']
                          for sap_dict in self.saps.values()
                          if sap_dict["type"] == "external"]
    for intf_name in intf_list:
        _vnf_id, _vnf_interface, sap_docker_name = parse_interface(intf_name)
        if sap_docker_name in external_sap_names:
            return sap_docker_name
    return None
944
def check_mgmt_interface(self, intf_list):
    """
    Check if the list of interfaces contains a management connection point.

    A connection point of the NSD counts as management if 'management' is
    contained in its 'type'. Connection points without a 'type', and NSDs
    without 'connection_points', are tolerated and simply never match.
    :param intf_list: interface names to inspect
    :return: True if one of the names is a management connection point,
        None otherwise
    """
    mgmt_ids = [cp.get('id')
                for cp in self.nsd.get("connection_points", [])
                # a missing type used to raise a TypeError here
                if 'management' in (cp.get('type') or '')]
    for intf_name in intf_list:
        if intf_name in mgmt_ids:
            return True
    return None
951
952
953 """
954 Some (simple) placement algorithms
955 """
956
957
class FirstDcPlacement(object):
    """
    Placement: Always use one and the same data center from the GK.dcs dict.
    """

    def place(self, nsd, vnfds, saps, dcs):
        """
        Assign the first data center of dcs to the "dc" field of every VNFD.
        :param nsd: network service descriptor (unused)
        :param vnfds: dict of VNFD dicts; each one gets its "dc" field set
        :param saps: service access points (unused)
        :param dcs: dict of available data centers
        :return: None
        """
        if not vnfds:
            # nothing to place; also keeps an empty dcs dict harmless
            return
        # looked up once instead of once per VNFD; values() works on
        # Python 2 and 3 (itervalues() is Python 2-only)
        first_dc = list(dcs.values())[0]
        for vnfd in vnfds.values():
            vnfd["dc"] = first_dc
966
967
class RoundRobinDcPlacement(object):
    """
    Placement: Distribute VNFs across all available DCs in a round robin fashion.
    """

    def place(self, nsd, vnfds, saps, dcs):
        """
        Assign the data centers of dcs in turn to the "dc" field of the VNFDs.
        :param nsd: network service descriptor (unused)
        :param vnfds: dict of VNFD dicts; each one gets its "dc" field set
        :param saps: service access points (unused)
        :param dcs: dict of available data centers
        :return: None
        """
        # values() works on Python 2 and 3 (itervalues() is Python 2-only)
        dcs_list = list(dcs.values())
        for index, vnfd in enumerate(vnfds.values()):
            # wrap around once every DC has been used
            vnfd["dc"] = dcs_list[index % len(dcs_list)]
979
980
981 """
982 Resource definitions and API endpoints
983 """
984
985
class Packages(fr.Resource):
    """
    REST resource to upload service packages and to list the uploaded ones.
    """

    def post(self):
        """
        Upload a service package to the dummy gatekeeper.

        The package is read from the "package" file field of the request or,
        if that field is missing, from the raw request body. It is stored as
        <service_uuid>.tgo in UPLOAD_FOLDER and registered as a new service.
        If AUTO_DELETE is set, all instances of the already registered
        services are stopped first. If AUTO_DEPLOY is set, the new service is
        started right away.
        :return: dict with "service_uuid", "size", "sha1" and "error" plus
            the HTTP status (201 on success, 500 on any failure)
        """
        try:
            LOG.info("POST /packages called")
            # make API more robust: file can be in data or in files field
            if "package" in request.files:
                son_file = request.files["package"]
                is_file_object = True
            elif len(request.data) > 0:
                son_file = request.data
                is_file_object = False
            else:
                return {"service_uuid": None, "size": 0, "sha1": None,
                        "error": "upload failed. file not found."}, 500
            # generate a uuid to reference this package
            service_uuid = str(uuid.uuid4())
            # ensure that upload folder exists
            ensure_dir(UPLOAD_FOLDER)
            upload_path = os.path.join(UPLOAD_FOLDER, "%s.tgo" % service_uuid)
            # store the package on disk
            if is_file_object:
                son_file.save(upload_path)
            else:
                with open(upload_path, 'wb') as f:
                    f.write(son_file)
            size = os.path.getsize(upload_path)
            # Hash the stored bytes. Hashing str(son_file) only covered the
            # textual representation of an uploaded file object, not its
            # content, and does not work with Python 3 strings at all.
            sha1 = hashlib.sha1()
            with open(upload_path, 'rb') as f:
                for chunk in iter(lambda: f.read(65536), b""):
                    sha1.update(chunk)
            file_hash = sha1.hexdigest()

            # first stop all instances of any other running services
            if AUTO_DELETE:
                # A dedicated loop variable is essential here: reusing
                # service_uuid would overwrite the UUID generated above and
                # register the new package under an old service's UUID.
                for running_uuid in list(GK.services):
                    running_service = GK.services[running_uuid]
                    # iterate over a snapshot of the instance UUIDs
                    for instance_uuid in list(running_service.instances):
                        running_service.stop_service(instance_uuid)
                        LOG.info("service instance with uuid %r stopped." %
                                 instance_uuid)

            # create a service object and register it
            s = Service(service_uuid, file_hash, upload_path)
            GK.register_service_package(service_uuid, s)

            # automatically deploy the service
            if AUTO_DEPLOY:
                # ok, we have a service uuid, lets start the service
                reset_subnets()
                GK.services.get(service_uuid).start_service()

            # generate the JSON result
            return {"service_uuid": service_uuid, "size": size,
                    "sha1": file_hash, "error": None}, 201
        except Exception:
            # Exception instead of BaseException: SystemExit and
            # KeyboardInterrupt must not be turned into an HTTP 500
            LOG.exception("Service package upload failed:")
            return {"service_uuid": None, "size": 0,
                    "sha1": None, "error": "upload failed"}, 500

    def get(self):
        """
        Return a list of UUID's of uploaded service packages.
        :return: dict/list
        """
        LOG.info("GET /packages")
        # list(dict) yields the keys on Python 2 and 3 (iterkeys() is 2-only)
        return {"service_uuid_list": list(GK.services)}
1060
1061
class Instantiations(fr.Resource):
    """
    REST resource to start, list and stop service instances.
    """

    def post(self):
        """
        Instantiate a service specified by its UUID.
        Will return a new UUID to identify the running service instance.

        If no UUID (or "latest") is given, the first registered service is
        started.
        :return: dict with "service_instance_uuid" and status 201, or an
            error message and status 404 if the service is unknown
        """
        LOG.info("POST /instantiations (or /requests) called")
        # try to extract the service uuid from the request
        json_data = request.get_json(force=True)
        service_uuid = json_data.get("service_uuid")

        # lets be a bit fuzzy here to make testing easier
        if (service_uuid is None or service_uuid ==
                "latest") and len(GK.services) > 0:
            # if we don't get a service uuid, we simply start the first
            # service in the list
            service_uuid = list(GK.services)[0]
        if service_uuid in GK.services:
            # ok, we have a service uuid, lets start the service
            service_instance_uuid = GK.services.get(
                service_uuid).start_service()
            return {"service_instance_uuid": service_instance_uuid}, 201
        return "Service not found", 404

    def get(self):
        """
        Returns the instance UUIDs of all registered services.
        :return: dict holding one list of instance UUIDs per service
        """
        LOG.info("GET /instantiations")
        # list(dict) / values() work on Python 2 and 3
        return {"service_instantiations_list": [
            list(s.instances) for s in GK.services.values()]}

    def delete(self):
        """
        Stops a running service specified by its service and instance UUID.

        A missing service UUID defaults to the first registered service, a
        missing instance UUID to the first instance of that service.
        :return: message and status 200 if an instance was stopped, or an
            error message and status 404 if service or instance are unknown
        """
        # try to extract the service and instance UUID from the request
        json_data = request.get_json(force=True)
        service_uuid = json_data.get("service_uuid")
        instance_uuid = json_data.get("service_instance_uuid")

        # try to be fuzzy
        if service_uuid is None and len(GK.services) > 0:
            # if we don't get a service uuid, we simply stop the first
            # service in the list
            service_uuid = list(GK.services)[0]
        # Resolve the service before touching its instances: an unknown (or
        # still missing) UUID used to raise a KeyError instead of a 404.
        service = GK.services.get(service_uuid)
        if service is None:
            return "Service not found", 404
        if instance_uuid is None and len(service.instances) > 0:
            instance_uuid = list(service.instances)[0]

        if instance_uuid in service.instances:
            # valid service and instance UUID, stop service
            service.stop_service(instance_uuid)
            return "service instance with uuid %r stopped." % instance_uuid, 200
        return "Service not found", 404
1121
1122
class Exit(fr.Resource):
    """
    REST resource to shut down the emulator.
    """

    def put(self):
        """
        Stop the running Containernet instance regardless of data transmitted
        """
        # the network is reached through the first known data center
        datacenters = list(GK.dcs.values())
        first_dc = datacenters[0]
        first_dc.net.stop()
1130
1131
def initialize_GK():
    """
    Replace the module-wide GK object with a fresh Gatekeeper instance.
    """
    global GK
    fresh_gatekeeper = Gatekeeper()
    GK = fresh_gatekeeper
1135
1136
# the one and only gatekeeper object of this module
GK = None
initialize_GK()

# Flask application serving the REST API
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 ** 2  # 512 MB max upload
api = fr.Api(app)

# URL routing: every resource is reachable under one or more endpoints
api.add_resource(
    Packages,
    '/packages',
    '/api/v2/packages')
api.add_resource(
    Instantiations,
    '/instantiations',
    '/api/v2/instantiations',
    '/api/v2/requests')
api.add_resource(
    Exit,
    '/emulator/exit')
1149
1150
def start_rest_api(host, port, datacenters=None):
    """
    Attach the given data centers to the gatekeeper and run the REST API.

    This call blocks while the Flask server is running.
    :param host: address the Flask server binds to
    :param port: port the Flask server listens on
    :param datacenters: dict of data centers known to the gatekeeper;
        defaults to a new empty dict
    :return: None
    """
    # A new dict per call: the former ``dict()`` default was created once and
    # then shared (as GK.dcs) between all calls that relied on the default.
    GK.dcs = {} if datacenters is None else datacenters
    # Without any data center there is no network to look up (this is the
    # case when the module is run standalone); get_dc_network() would fail
    # its assertion before the server could even start.
    GK.net = get_dc_network() if GK.dcs else None
    # start the Flask server (not the best performance but ok for our use case)
    # NOTE(review): debug=True enables the interactive Werkzeug debugger for
    # everybody who can reach host:port - confirm that this is intended.
    app.run(host=host,
            port=port,
            debug=True,
            use_reloader=False  # this is needed to run Flask in a non-main thread
            )
1160
1161
def ensure_dir(name):
    """
    Create the directory `name` (including missing parents) if needed.

    An already existing path is not an error.
    :param name: path of the directory
    :return: None
    :raises OSError: if the directory cannot be created for any reason
        other than the path already existing
    """
    import errno
    # Create first and tolerate "already exists" afterwards: checking with
    # os.path.exists() before creating leaves a window in which a concurrent
    # caller can create the directory and make os.makedirs() fail.
    try:
        os.makedirs(name)
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise
1165
1166
def load_yaml(path):
    """
    Parse the YAML file at the given path.
    :param path: path of the YAML file
    :return: the parsed content; an empty dict if the file is not valid YAML
    """
    with open(path, "r") as f:
        try:
            # SECURITY: the descriptors come from uploaded packages, so the
            # loader must not be able to construct arbitrary Python objects.
            # yaml.load() without an explicit Loader did allow that and is
            # rejected by current PyYAML releases; safe_load() only builds
            # plain YAML types.
            r = yaml.safe_load(f)
        except yaml.YAMLError as exc:
            LOG.exception("YAML parse error: %r" % str(exc))
            r = dict()
    return r
1175
1176
def make_relative_path(path):
    """
    Strip a leading "file://" and then one leading "/" from the path.
    :param path: path or file URL
    :return: the shortened path
    """
    scheme = "file://"
    if path.startswith(scheme):
        path = path[len(scheme):]
    # only a single leading slash is dropped
    return path[1:] if path.startswith("/") else path
1183
1184
def get_dc_network():
    """
    retrieve the DCnetwork where this dummygatekeeper (GK) connects to.
    Assume at least 1 datacenter is connected to this GK, and that all datacenters belong to the same DCNetwork
    :return: the "net" attribute of the first data center in GK.dcs
    """
    assert (len(GK.dcs) > 0)
    # dict views cannot be indexed on Python 3, so build a list first
    return list(GK.dcs.values())[0].net
1193
1194
def parse_interface(interface_name):
    """
    Split an interface name of the NSD into its VNF id and interface name.

    Names of the form "<vnf_id>:<interface>" are split at the colon; names
    without a colon have no VNF id.
    :param interface_name: interface name as used in the NSD
    :return: vnf_id (or None), vnf_interface, None
    """
    if ':' not in interface_name:
        return None, interface_name, None
    vnf_id, vnf_interface = interface_name.split(':')
    return vnf_id, vnf_interface, None
1207
1208
def reset_subnets():
    """
    Re-create the private subnet definitions for the generated interfaces.

    Rebinds the module-level SAP_SUBNETS, ELAN_SUBNETS and ELINE_SUBNETS to
    fresh results of generate_subnets().
    """
    global SAP_SUBNETS, ELAN_SUBNETS, ELINE_SUBNETS
    # SAPs: prefix 10.10, mask 30
    SAP_SUBNETS = generate_subnets('10.10', 0, subnet_size=50, mask=30)
    # E-LANs: prefix 10.20, mask 24
    ELAN_SUBNETS = generate_subnets('10.20', 0, subnet_size=50, mask=24)
    # E-Lines: prefix 10.30, mask 30
    ELINE_SUBNETS = generate_subnets('10.30', 0, subnet_size=50, mask=30)
1220
1221
def get_container_name(vnf_id, vdu_id):
    """
    Build the container name of a deployment unit: "<vnf_id>.<vdu_id>".
    :param vnf_id: id of the VNF
    :param vdu_id: id of the deployment unit inside the VNF
    :return: the container name
    """
    return "{vnf}.{vdu}".format(vnf=vnf_id, vdu=vdu_id)
1224
1225
if __name__ == '__main__':
    # Lets allow to run the API in standalone mode.
    GK_STANDALONE_MODE = True
    logging.getLogger("werkzeug").setLevel(logging.INFO)
    start_rest_api(host="0.0.0.0", port=8000)