5GTANGO LLCM: Preliminary fix of the E-LINE configuration.
[osm/vim-emu.git] / src / emuvim / api / tango / llcm.py
1 # Copyright (c) 2018 SONATA-NFV, 5GTANGO and Paderborn University
2 # ALL RIGHTS RESERVED.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Neither the name of the SONATA-NFV, 5GTANGO, Paderborn University
17 # nor the names of its contributors may be used to endorse or promote
18 # products derived from this software without specific prior written
19 # permission.
20 #
21 # This work has been performed in the framework of the SONATA project,
22 # funded by the European Commission under Grant number 671517 through
23 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
24 # acknowledge the contributions of their colleagues of the SONATA
25 # partner consortium (www.sonata-nfv.eu).
26 #
27 # This work has also been performed in the framework of the 5GTANGO project,
28 # funded by the European Commission under Grant number 761493 through
29 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
30 # acknowledge the contributions of their colleagues of the 5GTANGO
31 # partner consortium (www.5gtango.eu).
32 import logging
33 import os
34 import uuid
35 import hashlib
36 import zipfile
37 import yaml
38 import threading
39 from docker import DockerClient
40 from flask import Flask, request
41 import flask_restful as fr
42 from subprocess import Popen
43 import ipaddress
44 import copy
45 import time
46
47
# module-wide logger for the 5GTANGO lightweight lifecycle manager (LLCM)
LOG = logging.getLogger("5gtango.llcm")
LOG.setLevel(logging.INFO)


# base folder used to persist uploaded packages and extracted catalogs
GK_STORAGE = "/tmp/vim-emu-tango-llcm/"
UPLOAD_FOLDER = os.path.join(GK_STORAGE, "uploads/")
CATALOG_FOLDER = os.path.join(GK_STORAGE, "catalog/")

# Enable Dockerfile build functionality
BUILD_DOCKERFILE = False

# flag to indicate that we run without the emulator (only the bare API for
# integration testing)
GK_STANDALONE_MODE = False

# should a new version of an image be pulled even if its available
FORCE_PULL = False

# flag to indicate if we use bidirectional forwarding rules in the
# automatic chaining process
BIDIRECTIONAL_CHAIN = True

# override the management interfaces in the descriptors with default
# docker0 interfaces in the containers
USE_DOCKER_MGMT = False

# automatically deploy uploaded packages (no need to execute son-access
# deploy --latest separately)
AUTO_DEPLOY = False

# and also automatically terminate any other running services
AUTO_DELETE = False

# global subnet definitions, initialized by reset_subnets()
# (pools that E-LAN / E-Line setup pops fresh subnets from)
ELAN_SUBNETS = None
ELINE_SUBNETS = None

# Time in seconds to wait for vnf stop scripts to execute fully
VNF_STOP_WAIT_TIME = 5
88
class OnBoardingException(Exception):
    """
    Raised when a service package cannot be on-boarded
    (e.g. missing NSD or VNFDs in the package).

    Fix: inherit from Exception instead of BaseException so generic
    'except Exception' handlers catch it; the existing
    'except BaseException' catch site still works.
    """
    pass
91
92
class Gatekeeper(object):
    """
    Central LLCM state holder: keeps track of all uploaded services,
    the known data centers and the emulator network object.
    """

    def __init__(self):
        # service_uuid -> Service object
        self.services = {}
        # known data centers (name -> DC object), filled externally
        self.dcs = {}
        # reference to the emulator network, set externally
        self.net = None
        # used to generate short names for VNFs (Mininet limitation)
        self.vnf_counter = 0
        reset_subnets()
        LOG.info("Initialized 5GTANGO LLCM module.")

    def register_service_package(self, service_uuid, service):
        """
        register new service package
        :param service_uuid
        :param service object
        """
        self.services[service_uuid] = service
        # immediately run all on-boarding steps for the new package
        service.onboard()
113
114
class Service(object):
    """
    This class represents a NS uploaded as a *.son package to the
    dummy gatekeeper.
    Can have multiple running instances of this service.
    """

    def __init__(self,
                 service_uuid,
                 package_file_hash,
                 package_file_path):
        # identity and on-disk location of the uploaded package
        self.uuid = service_uuid
        self.package_file_hash = package_file_hash
        self.package_file_path = package_file_path
        # folder in the catalog the package gets extracted into
        self.package_content_path = os.path.join(
            CATALOG_FOLDER, "services/%s" % self.uuid)
        self.manifest = None  # parsed NAPD.yaml (filled by onboard())
        self.nsd = None  # parsed network service descriptor
        self.vnfds = dict()  # vnf_id -> parsed VNFD
        # container image sources: local Dockerfiles or remote image URLs
        self.local_docker_files = dict()
        self.remote_docker_image_urls = dict()
        # instance_uuid -> instance record (see start_service())
        self.instances = dict()
137
    def onboard(self):
        """
        Do all steps to prepare this service to be instantiated
        :return:
        :raises OnBoardingException: if no NSD or no VNFDs are found
        """
        # 1. extract the contents of the package and store them in our catalog
        self._unpack_service_package()
        # 2. read in all descriptor files
        self._load_package_descriptor()
        self._load_nsd()
        self._load_vnfd()
        if self.nsd is None:
            raise OnBoardingException("No NSD found.")
        if len(self.vnfds) < 1:
            raise OnBoardingException("No VNFDs found.")
        # 3. prepare container images (e.g. download or build Dockerfile)
        if BUILD_DOCKERFILE:
            self._load_docker_files()
            self._build_images_from_dockerfiles()
        else:
            self._load_docker_urls()
            self._pull_predefined_dockerimages()
        LOG.info("On-boarded service: %r" % self.manifest.get("name"))
161
    def start_service(self):
        """
        This methods creates and starts a new service instance.
        It computes placements, iterates over all VNFDs, and starts
        each VNFD as a Docker container in the data center selected
        by the placement algorithm.
        :return: instance_uuid of the new service instance
        """
        LOG.info("Starting service %r" % self.uuid)

        # 1. each service instance gets a new uuid to identify it
        instance_uuid = str(uuid.uuid4())
        # build a instances dict (a bit like a NSR :))
        self.instances[instance_uuid] = dict()
        self.instances[instance_uuid]["vnf_instances"] = list()

        # 2. compute placement of this service instance (adds DC names to
        # VNFDs)
        # self._calculate_placement(FirstDcPlacement)
        self._calculate_placement(RoundRobinDcPlacement)
        # 3. start all vnfds that we have in the service
        for vnf_id in self.vnfds:
            vnfd = self.vnfds[vnf_id]
            # attention: returns a list of started deployment units
            vnfis = self._start_vnfd(vnfd, vnf_id)
            # add list of VNFIs to total VNFI list
            self.instances[instance_uuid]["vnf_instances"].extend(vnfis)

        # 4. Deploy E-Line, E-Tree and E-LAN links
        # Attention: Only done if "forwarding_graphs" section in NSD exists,
        # even if "forwarding_graphs" are not used directly.
        if "virtual_links" in self.nsd and "forwarding_graphs" in self.nsd:
            vlinks = self.nsd["virtual_links"]
            # constituent virtual links are not checked
            eline_fwd_links = [l for l in vlinks if (
                l["connectivity_type"] == "E-Line")]
            elan_fwd_links = [l for l in vlinks if (
                l["connectivity_type"] == "E-LAN" or
                l["connectivity_type"] == "E-Tree")]  # Treat E-Tree as E-LAN

            # 5a. deploy E-Line links
            GK.net.deployed_elines.extend(eline_fwd_links)  # bookkeeping
            self._connect_elines(eline_fwd_links, instance_uuid)
            # 5b. deploy E-Tree/E-LAN links
            GK.net.deployed_elans.extend(elan_fwd_links)  # bookkeeping
            self._connect_elans(elan_fwd_links, instance_uuid)

        # 6. run the emulator specific entrypoint scripts in the VNFIs of this
        # service instance
        self._trigger_emulator_start_scripts_in_vnfis(
            self.instances[instance_uuid]["vnf_instances"])
        # done
        LOG.info("Service started. Instance id: %r" % instance_uuid)
        return instance_uuid
216
    def stop_service(self, instance_uuid):
        """
        This method stops a running service instance.
        It iterates over all VNF instances, stopping them each
        and removing them from their data center.
        :param instance_uuid: the uuid of the service instance to be stopped
        """
        LOG.info("Stopping service %r" % self.uuid)
        # get relevant information
        vnf_instances = self.instances[instance_uuid]["vnf_instances"]
        # trigger stop scripts in vnf instances and wait a few seconds for
        # completion
        self._trigger_emulator_stop_scripts_in_vnfis(vnf_instances)
        time.sleep(VNF_STOP_WAIT_TIME)
        # stop all vnfs
        for v in vnf_instances:
            self._stop_vnfi(v)
        # last step: remove the instance from the list of all instances
        del self.instances[instance_uuid]
237
238 def _get_resource_limits(self, deployment_unit):
239 """
240 Extract resource limits from deployment units.
241 """
242 # defaults
243 cpu_list = None
244 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(1.0))
245 mem_limit = None
246 # update from descriptor
247 if "resource_requirements" in deployment_unit:
248 res_req = deployment_unit.get("resource_requirements")
249 cpu_list = res_req.get("cpu").get("cpuset")
250 if cpu_list is None:
251 cpu_list = res_req.get("cpu").get("vcpus")
252 if cpu_list is not None:
253 # attention: docker expects list as string w/o spaces:
254 cpu_list = str(cpu_list).replace(" ", "").strip()
255 cpu_bw = res_req.get("cpu").get("cpu_bw")
256 if cpu_bw is None:
257 cpu_bw = 1.0
258 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(cpu_bw))
259 mem_limit = res_req.get("memory").get("size")
260 mem_unit = str(res_req.get("memory").get("size_unit", "GB"))
261 if mem_limit is not None:
262 mem_limit = int(mem_limit)
263 # to bytes
264 if "G" in mem_unit:
265 mem_limit = mem_limit * 1024 * 1024 * 1024
266 elif "M" in mem_unit:
267 mem_limit = mem_limit * 1024 * 1024
268 elif "K" in mem_unit:
269 mem_limit = mem_limit * 1024
270 return cpu_list, cpu_period, cpu_quota, mem_limit
271
272 def _start_vnfd(self, vnfd, vnf_id, **kwargs):
273 """
274 Start a single VNFD of this service
275 :param vnfd: vnfd descriptor dict
276 :param vnf_id: unique id of this vnf in the nsd
277 :return:
278 """
279 vnfis = list()
280 # the vnf_name refers to the container image to be deployed
281 vnf_name = vnfd.get("name")
282 # combine VDUs and CDUs
283 deployment_units = (vnfd.get("virtual_deployment_units", []) +
284 vnfd.get("cloudnative_deployment_units", []))
285 # iterate over all deployment units within each VNFDs
286 for u in deployment_units:
287 # 0. vnf_container_name = vnf_id.vdu_id
288 vnf_container_name = get_container_name(vnf_id, u.get("id"))
289 # 1. get the name of the docker image to star
290 if vnf_container_name not in self.remote_docker_image_urls:
291 raise Exception("No image name for %r found. Abort." % vnf_container_name)
292 docker_image_name = self.remote_docker_image_urls.get(vnf_container_name)
293 # 2. select datacenter to start the VNF in
294 target_dc = vnfd.get("dc")
295 # 3. perform some checks to ensure we can start the container
296 assert(docker_image_name is not None)
297 assert(target_dc is not None)
298 if not self._check_docker_image_exists(docker_image_name):
299 raise Exception("Docker image {} not found. Abort."
300 .format(docker_image_name))
301
302 # 4. get the resource limits
303 cpu_list, cpu_period, cpu_quota, mem_limit = self._get_resource_limits(u)
304
305 # get connection points defined for the DU
306 intfs = u.get("connection_points", [])
307 # do some re-naming of fields to be compatible to containernet
308 for i in intfs:
309 if i.get("address"):
310 i["ip"] = i.get("address")
311
312 # 5. collect additional information to start container
313 volumes = list()
314 cenv = dict()
315 # 5.1 inject descriptor based start/stop commands into env (overwrite)
316 VNFD_CMD_START = u.get("vm_cmd_start")
317 VNFD_CMD_STOP = u.get("vm_cmd_stop")
318 if VNFD_CMD_START and not VNFD_CMD_START == "None":
319 LOG.info("Found 'vm_cmd_start'='{}' in VNFD.".format(VNFD_CMD_START) +
320 " Overwriting SON_EMU_CMD.")
321 cenv["SON_EMU_CMD"] = VNFD_CMD_START
322 if VNFD_CMD_STOP and not VNFD_CMD_STOP == "None":
323 LOG.info("Found 'vm_cmd_start'='{}' in VNFD.".format(VNFD_CMD_STOP) +
324 " Overwriting SON_EMU_CMD_STOP.")
325 cenv["SON_EMU_CMD_STOP"] = VNFD_CMD_STOP
326
327 # 6. Start the container
328 LOG.info("Starting %r as %r in DC %r" %
329 (vnf_name, vnf_container_name, vnfd.get("dc")))
330 LOG.debug("Interfaces for %r: %r" % (vnf_id, intfs))
331 # start the container
332 vnfi = target_dc.startCompute(
333 vnf_container_name,
334 network=intfs,
335 image=docker_image_name,
336 cpu_quota=cpu_quota,
337 cpu_period=cpu_period,
338 cpuset_cpus=cpu_list,
339 mem_limit=mem_limit,
340 volumes=volumes,
341 properties=cenv, # environment
342 type=kwargs.get('type', 'docker'))
343 # add vnfd reference to vnfi
344 vnfi.vnfd = vnfd
345 # add container name
346 vnfi.vnf_container_name = vnf_container_name
347 # store vnfi
348 vnfis.append(vnfi)
349 return vnfis
350
    def _stop_vnfi(self, vnfi):
        """
        Stop a VNF instance.
        :param vnfi: vnf instance to be stopped
        """
        # Find the correct datacenter
        status = vnfi.getStatus()
        dc = vnfi.datacenter
        # stop the vnfi
        LOG.info("Stopping the vnf instance contained in %r in DC %r" %
                 (status["name"], dc))
        dc.stopCompute(status["name"])
363
364 def _get_vnf_instance(self, instance_uuid, vnf_id):
365 """
366 Returns VNFI object for a given "vnf_id" or "vnf_container_namse" taken from an NSD.
367 :return: single object
368 """
369 for vnfi in self.instances[instance_uuid]["vnf_instances"]:
370 if str(vnfi.name) == str(vnf_id):
371 return vnfi
372 LOG.warning("No container with name: {0} found.".format(vnf_id))
373 return None
374
375 def _get_vnf_instance_units(self, instance_uuid, vnf_id):
376 """
377 Returns a list of VNFI objects (all deployment units) for a given
378 "vnf_id" taken from an NSD.
379 :return: list
380 """
381 if vnf_id is None:
382 return None
383 r = list()
384 for vnfi in self.instances[instance_uuid]["vnf_instances"]:
385 if vnf_id in vnfi.name:
386 r.append(vnfi)
387 if len(r) > 0:
388 LOG.debug("Found units: {} for vnf_id: {}"
389 .format([i.name for i in r], vnf_id))
390 return r
391 LOG.warning("No container(s) with name: {0} found.".format(vnf_id))
392 return None
393
    @staticmethod
    def _vnf_reconfigure_network(vnfi, if_name, net_str=None, new_name=None):
        """
        Reconfigure the network configuration of a specific interface
        of a running container.
        :param vnfi: container instance
        :param if_name: interface name
        :param net_str: network configuration string, e.g., 1.2.3.4/24
        :param new_name: optional new name to give the interface
        :return:
        """
        # assign new ip address
        if net_str is not None:
            intf = vnfi.intf(intf=if_name)
            if intf is not None:
                intf.setIP(net_str)
                LOG.debug("Reconfigured network of %s:%s to %r" %
                          (vnfi.name, if_name, net_str))
            else:
                LOG.warning("Interface not found: %s:%s. Network reconfiguration skipped." % (
                    vnfi.name, if_name))

        # rename the interface inside the container using iproute2
        # (must be down while renaming)
        if new_name is not None:
            vnfi.cmd('ip link set', if_name, 'down')
            vnfi.cmd('ip link set', if_name, 'name', new_name)
            vnfi.cmd('ip link set', new_name, 'up')
            LOG.debug("Reconfigured interface name of %s:%s to %s" %
                      (vnfi.name, if_name, new_name))
422 def _trigger_emulator_start_scripts_in_vnfis(self, vnfi_list):
423 for vnfi in vnfi_list:
424 config = vnfi.dcinfo.get("Config", dict())
425 env = config.get("Env", list())
426 for env_var in env:
427 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
428 if var == "SON_EMU_CMD" or var == "VIM_EMU_CMD":
429 LOG.info("Executing script in '{}': {}={}"
430 .format(vnfi.name, var, cmd))
431 # execute command in new thread to ensure that GK is not
432 # blocked by VNF
433 t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
434 t.daemon = True
435 t.start()
436 break # only execute one command
437
    def _trigger_emulator_stop_scripts_in_vnfis(self, vnfi_list):
        """
        Execute each VNFI's stop command (SON_EMU_CMD_STOP /
        VIM_EMU_CMD_STOP env. variable) in a background thread.
        """
        for vnfi in vnfi_list:
            config = vnfi.dcinfo.get("Config", dict())
            env = config.get("Env", list())
            for env_var in env:
                var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
                if var == "SON_EMU_CMD_STOP" or var == "VIM_EMU_CMD_STOP":
                    LOG.info("Executing script in '{}': {}={}"
                             .format(vnfi.name, var, cmd))
                    # execute command in new thread to ensure that GK is not
                    # blocked by VNF
                    t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
                    t.daemon = True
                    t.start()
                    break  # only execute one command
453
454 def _unpack_service_package(self):
455 """
456 unzip *.son file and store contents in CATALOG_FOLDER/services/<service_uuid>/
457 """
458 LOG.info("Unzipping: %r" % self.package_file_path)
459 with zipfile.ZipFile(self.package_file_path, "r") as z:
460 z.extractall(self.package_content_path)
461
    def _load_package_descriptor(self):
        """
        Load the main package descriptor YAML (TOSCA NAPD) and keep it
        as dict in self.manifest.
        :return:
        """
        self.manifest = load_yaml(
            os.path.join(
                self.package_content_path, "TOSCA-Metadata/NAPD.yaml"))
470
    def _load_nsd(self):
        """
        Load the entry NSD YAML and keep it as dict.
        :return:
        :raises OnBoardingException: if no NSD entry exists in the manifest
        """
        if "package_content" in self.manifest:
            nsd_path = None
            # search the manifest's content list for the NSD entry
            for f in self.manifest.get("package_content"):
                if f.get("content-type") == "application/vnd.5gtango.nsd":
                    nsd_path = os.path.join(
                        self.package_content_path,
                        make_relative_path(f.get("source")))
                    break  # always use the first NSD for now
            if nsd_path is None:
                raise OnBoardingException("No NSD with type 'application/vnd.5gtango.nsd' found.")
            self.nsd = load_yaml(nsd_path)
            GK.net.deployed_nsds.append(self.nsd)  # TODO this seems strange (remove?)
            LOG.debug("Loaded NSD: %r" % self.nsd.get("name"))
        else:
            raise OnBoardingException(
                "No 'package_content' section in package manifest:\n{}"
                .format(self.manifest))
493
    def _load_vnfd(self):
        """
        Load all VNFD YAML files referenced in MANIFEST.MF and keep them in dict.
        :return:
        :raises OnBoardingException: if no VNFD entries exist in the manifest
        """
        # first make a list of all the vnfds in the package
        vnfd_set = dict()
        if "package_content" in self.manifest:
            for pc in self.manifest.get("package_content"):
                if pc.get(
                        "content-type") == "application/vnd.5gtango.vnfd":
                    vnfd_path = os.path.join(
                        self.package_content_path,
                        make_relative_path(pc.get("source")))
                    vnfd = load_yaml(vnfd_path)
                    vnfd_set[vnfd.get("name")] = vnfd
            if len(vnfd_set) < 1:
                raise OnBoardingException("No VNFDs found.")
            # then link each vnf_id in the nsd to its vnfd
            # (the NSD references VNFDs by vnf_name)
            for v in self.nsd.get("network_functions"):
                if v.get("vnf_name") in vnfd_set:
                    self.vnfds[v.get("vnf_id")] = vnfd_set[v.get("vnf_name")]
                    LOG.debug("Loaded VNFD: {0} id: {1}"
                              .format(v.get("vnf_name"), v.get("vnf_id")))
518
519 def _connect_elines(self, eline_fwd_links, instance_uuid):
520 """
521 Connect all E-LINE links in the NSD
522 Attention: This method DOES NOT support multi V/CDU VNFs!
523 :param eline_fwd_links: list of E-LINE links in the NSD
524 :param: instance_uuid of the service
525 :return:
526 """
527 # cookie is used as identifier for the flowrules installed by the dummygatekeeper
528 # eg. different services get a unique cookie for their flowrules
529 cookie = 1
530 for link in eline_fwd_links:
531 LOG.info("Found E-Line: {}".format(link))
532 src_id, src_if_name = parse_interface(
533 link["connection_points_reference"][0])
534 dst_id, dst_if_name = parse_interface(
535 link["connection_points_reference"][1])
536 LOG.info("Searching C/VDU for E-Line: src={}, src_if={}, dst={}, dst_if={}"
537 .format(src_id, src_if_name, dst_id, dst_if_name))
538 # handle C/VDUs (ugly hack, only one V/CDU per VNF for now)
539 src_units = self._get_vnf_instance_units(instance_uuid, src_id)
540 dst_units = self._get_vnf_instance_units(instance_uuid, dst_id)
541 if src_units is None or dst_units is None:
542 LOG.info("No VNF-VNF link. Skipping: src={}, src_if={}, dst={}, dst_if={}"
543 .format(src_id, src_if_name, dst_id, dst_if_name))
544 return
545 # we only support VNFs with one V/CDU right now
546 if len(src_units) != 1 or len(dst_units) != 1:
547 raise BaseException("LLCM does not support E-LINES for multi V/CDU VNFs.")
548 # get the full name from that C/VDU and use it as src_id and dst_id
549 src_id = src_units[0].name
550 dst_id = dst_units[0].name
551 # from here we have all info we need
552 LOG.info("Creating E-Line for C/VDU: src={}, src_if={}, dst={}, dst_if={}"
553 .format(src_id, src_if_name, dst_id, dst_if_name))
554 # get involved vnfis
555 src_vnfi = src_units[0]
556 dst_vnfi = dst_units[0]
557 # proceed with chaining setup
558 setChaining = False
559 if src_vnfi is not None and dst_vnfi is not None:
560 setChaining = True
561 # re-configure the VNFs IP assignment and ensure that a new
562 # subnet is used for each E-Link
563 eline_net = ELINE_SUBNETS.pop(0)
564 ip1 = "{0}/{1}".format(str(eline_net[1]),
565 eline_net.prefixlen)
566 ip2 = "{0}/{1}".format(str(eline_net[2]),
567 eline_net.prefixlen)
568 # check if VNFs have fixed IPs (address field in VNFDs)
569 if (self._get_vnfd_cp_from_vnfi(src_vnfi, src_if_name)
570 .get("address") is None):
571 self._vnf_reconfigure_network(src_vnfi, src_if_name, ip1)
572 # check if VNFs have fixed IPs (address field in VNFDs)
573 if (self._get_vnfd_cp_from_vnfi(dst_vnfi, dst_if_name)
574 .get("address") is None):
575 self._vnf_reconfigure_network(dst_vnfi, dst_if_name, ip2)
576 # set the chaining
577 if setChaining:
578 GK.net.setChain(
579 src_id, dst_id,
580 vnf_src_interface=src_if_name, vnf_dst_interface=dst_if_name,
581 bidirectional=BIDIRECTIONAL_CHAIN, cmd="add-flow", cookie=cookie, priority=10)
582
583 def _get_vnfd_cp_from_vnfi(self, vnfi, ifname):
584 """
585 Gets the connection point data structure from the VNFD
586 of the given VNFI using ifname.
587 """
588 if vnfi.vnfd is None:
589 return {}
590 cps = vnfi.vnfd.get("connection_points")
591 for cp in cps:
592 if cp.get("id") == ifname:
593 return cp
594
    def _connect_elans(self, elan_fwd_links, instance_uuid):
        """
        Connect all E-LAN/E-Tree links in the NSD
        This method supports multi-V/CDU VNFs if the connection
        point names of the DUs are the same as the ones in the NSD.
        :param elan_fwd_links: list of E-LAN links in the NSD
        :param: instance_uuid of the service
        :return:
        """
        for link in elan_fwd_links:
            # a new E-LAN/E-Tree: take a fresh subnet from the global pool
            elan_vnf_list = []
            lan_net = ELAN_SUBNETS.pop(0)
            lan_hosts = list(lan_net.hosts())

            # generate lan ip address for all interfaces (of all involved (V/CDUs))
            for intf in link["connection_points_reference"]:
                vnf_id, intf_name = parse_interface(intf)
                if vnf_id is None:
                    continue  # skip references to NS connection points
                units = self._get_vnf_instance_units(instance_uuid, vnf_id)
                if units is None:
                    continue  # skip if no deployment unit is present
                # iterate over all involved deployment units
                for uvnfi in units:
                    # Attention: we apply a simplification for multi DU VNFs here:
                    # the connection points of all involved DUs have to have the same
                    # name as the connection points of the surrounding VNF to be mapped.
                    # This is because we do not consider links specified in the VNFds
                    container_name = uvnfi.name
                    ip_address = "{0}/{1}".format(str(lan_hosts.pop(0)),
                                                  lan_net.prefixlen)
                    LOG.debug(
                        "Setting up E-LAN/E-Tree interface. (%s:%s) -> %s" % (
                            container_name, intf_name, ip_address))
                    # re-configure the VNFs IP assignment and ensure that a new subnet is used for each E-LAN
                    # E-LAN relies on the learning switch capability of Ryu which has to be turned on in the topology
                    # (DCNetwork(controller=RemoteController, enable_learning=True)), so no explicit chaining is
                    # necessary.
                    vnfi = self._get_vnf_instance(instance_uuid, container_name)
                    if vnfi is not None:
                        self._vnf_reconfigure_network(vnfi, intf_name, ip_address)
                        # add this vnf and interface to the E-LAN for tagging
                        elan_vnf_list.append(
                            {'name': container_name, 'interface': intf_name})
            # install the VLAN tags for this E-LAN
            GK.net.setLAN(elan_vnf_list)
642
    def _load_docker_files(self):
        """
        Get all paths to Dockerfiles from VNFDs and store them in dict.
        Keyed by the container name (vnf_id.vdu_id).
        :return:
        """
        for vnf_id, v in self.vnfds.iteritems():
            # classic VDUs reference a Dockerfile via 'vm_image'
            for vu in v.get("virtual_deployment_units", []):
                vnf_container_name = get_container_name(vnf_id, vu.get("id"))
                if vu.get("vm_image_format") == "docker":
                    vm_image = vu.get("vm_image")
                    docker_path = os.path.join(
                        self.package_content_path,
                        make_relative_path(vm_image))
                    self.local_docker_files[vnf_container_name] = docker_path
                    LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
            # cloud-native CDUs reference a Dockerfile via 'image'
            for cu in v.get("cloudnative_deployment_units", []):
                vnf_container_name = get_container_name(vnf_id, cu.get("id"))
                image = cu.get("image")
                docker_path = os.path.join(
                    self.package_content_path,
                    make_relative_path(image))
                self.local_docker_files[vnf_container_name] = docker_path
                LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
666
    def _load_docker_urls(self):
        """
        Get all URLs to pre-build docker images in some repo.
        Keyed by the container name (vnf_id.vdu_id).
        :return:
        """
        for vnf_id, v in self.vnfds.iteritems():
            # classic VDUs reference the image via 'vm_image'
            for vu in v.get("virtual_deployment_units", []):
                vnf_container_name = get_container_name(vnf_id, vu.get("id"))
                if vu.get("vm_image_format") == "docker":
                    url = vu.get("vm_image")
                    if url is not None:
                        # strip scheme; docker pull expects a plain registry path
                        url = url.replace("http://", "")
                        self.remote_docker_image_urls[vnf_container_name] = url
                        LOG.debug("Found Docker image URL (%r): %r" %
                                  (vnf_container_name,
                                   self.remote_docker_image_urls[vnf_container_name]))
            # cloud-native CDUs reference the image via 'image'
            for cu in v.get("cloudnative_deployment_units", []):
                vnf_container_name = get_container_name(vnf_id, cu.get("id"))
                url = cu.get("image")
                if url is not None:
                    url = url.replace("http://", "")
                    self.remote_docker_image_urls[vnf_container_name] = url
                    LOG.debug("Found Docker image URL (%r): %r" %
                              (vnf_container_name,
                               self.remote_docker_image_urls[vnf_container_name]))
692
    def _build_images_from_dockerfiles(self):
        """
        Build Docker images for each local Dockerfile found in the package: self.local_docker_files
        """
        if GK_STANDALONE_MODE:
            return  # do not build anything in standalone mode
        dc = DockerClient()
        LOG.info("Building %d Docker images (this may take several minutes) ..." % len(
            self.local_docker_files))
        for k, v in self.local_docker_files.iteritems():
            # build context is the Dockerfile's folder; image is tagged
            # with the container name
            for line in dc.build(path=v.replace(
                    "Dockerfile", ""), tag=k, rm=False, nocache=False):
                LOG.debug("DOCKER BUILD: %s" % line)
            LOG.info("Docker image created: %s" % k)
707
    def _pull_predefined_dockerimages(self):
        """
        If the package contains URLs to pre-build Docker images, we download them with this method.
        """
        dc = DockerClient()
        for url in self.remote_docker_image_urls.itervalues():
            # only pull if not present (speedup for development)
            if not FORCE_PULL:
                if len(dc.images.list(name=url)) > 0:
                    LOG.debug("Image %r present. Skipping pull." % url)
                    continue
            LOG.info("Pulling image: %r" % url)
            # this seems to fail with latest docker api version 2.0.2
            # dc.images.pull(url,
            #        insecure_registry=True)
            # using docker cli instead
            cmd = ["docker",
                   "pull",
                   url,
                   ]
            Popen(cmd).wait()
729
730 def _check_docker_image_exists(self, image_name):
731 """
732 Query the docker service and check if the given image exists
733 :param image_name: name of the docker image
734 :return:
735 """
736 return len(DockerClient().images.list(name=image_name)) > 0
737
    def _calculate_placement(self, algorithm):
        """
        Do placement by adding the a field "dc" to
        each VNFD that points to one of our
        data center objects known to the gatekeeper.
        :param algorithm: placement class (e.g. RoundRobinDcPlacement)
        """
        assert(len(self.vnfds) > 0)
        assert(len(GK.dcs) > 0)
        # instantiate algorithm an place
        p = algorithm()
        p.place(self.nsd, self.vnfds, GK.dcs)
        LOG.info("Using placement algorithm: %r" % p.__class__.__name__)
        # lets print the placement result
        for name, vnfd in self.vnfds.iteritems():
            LOG.info("Placed VNF %r on DC %r" % (name, str(vnfd.get("dc"))))
753
754 def _calculate_cpu_cfs_values(self, cpu_time_percentage):
755 """
756 Calculate cpu period and quota for CFS
757 :param cpu_time_percentage: percentage of overall CPU to be used
758 :return: cpu_period, cpu_quota
759 """
760 if cpu_time_percentage is None:
761 return -1, -1
762 if cpu_time_percentage < 0:
763 return -1, -1
764 # (see: https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt)
765 # Attention minimum cpu_quota is 1ms (micro)
766 cpu_period = 1000000 # lets consider a fixed period of 1000000 microseconds for now
767 LOG.debug("cpu_period is %r, cpu_percentage is %r" %
768 (cpu_period, cpu_time_percentage))
769 # calculate the fraction of cpu time for this container
770 cpu_quota = cpu_period * cpu_time_percentage
771 # ATTENTION >= 1000 to avoid a invalid argument system error ... no
772 # idea why
773 if cpu_quota < 1000:
774 LOG.debug("cpu_quota before correcting: %r" % cpu_quota)
775 cpu_quota = 1000
776 LOG.warning("Increased CPU quota to avoid system error.")
777 LOG.debug("Calculated: cpu_period=%f / cpu_quota=%f" %
778 (cpu_period, cpu_quota))
779 return int(cpu_period), int(cpu_quota)
780
781
782 """
783 Some (simple) placement algorithms
784 """
785
786
class FirstDcPlacement(object):
    """
    Placement: Always use one and the same data center from the GK.dcs dict.
    """

    def place(self, nsd, vnfds, dcs):
        # assign the very first known DC to every VNFD
        for vnf_id, descriptor in vnfds.iteritems():
            descriptor["dc"] = list(dcs.itervalues())[0]
795
796
class RoundRobinDcPlacement(object):
    """
    Placement: Distribute VNFs across all available DCs in a round robin fashion.
    """

    def place(self, nsd, vnfds, dcs):
        dcs_list = list(dcs.itervalues())
        n_dcs = len(dcs_list)
        # cycle through the DC list, one VNF at a time
        for idx, (vnf_id, descriptor) in enumerate(vnfds.iteritems()):
            descriptor["dc"] = dcs_list[idx % n_dcs]
808
809
810 """
811 Resource definitions and API endpoints
812 """
813
814
class Packages(fr.Resource):
    """
    API endpoint to upload and list 5GTANGO service packages.
    """

    def post(self):
        """
        Upload a *.son service package to the dummy gatekeeper.

        We expect request with a *.son file and store it in UPLOAD_FOLDER
        :return: UUID
        """
        try:
            # get file contents
            LOG.info("POST /packages called")
            # lets search for the package in the request
            is_file_object = False  # make API more robust: file can be in data or in files field
            if "package" in request.files:
                son_file = request.files["package"]
                is_file_object = True
            elif len(request.data) > 0:
                son_file = request.data
            else:
                return {"service_uuid": None, "size": 0, "sha1": None,
                        "error": "upload failed. file not found."}, 500
            # generate a uuid to reference this package
            service_uuid = str(uuid.uuid4())
            # ensure that upload folder exists
            ensure_dir(UPLOAD_FOLDER)
            upload_path = os.path.join(UPLOAD_FOLDER, "%s.tgo" % service_uuid)
            # store *.son file to disk
            if is_file_object:
                son_file.save(upload_path)
            else:
                with open(upload_path, 'wb') as f:
                    f.write(son_file)
            size = os.path.getsize(upload_path)
            # fix: hash the stored package bytes; previously the str()
            # representation of the file object was hashed instead
            with open(upload_path, 'rb') as f:
                file_hash = hashlib.sha1(f.read()).hexdigest()

            # first stop and delete any other running services
            if AUTO_DELETE:
                service_list = copy.copy(GK.services)
                # fix: do NOT reuse 'service_uuid' as loop variable here,
                # it still identifies the newly uploaded package below
                for old_service_uuid in service_list:
                    instances_list = copy.copy(
                        GK.services[old_service_uuid].instances)
                    for instance_uuid in instances_list:
                        # valid service and instance UUID, stop service
                        GK.services.get(old_service_uuid).stop_service(
                            instance_uuid)
                        LOG.info("service instance with uuid %r stopped." %
                                 instance_uuid)

            # create a service object and register it
            s = Service(service_uuid, file_hash, upload_path)
            GK.register_service_package(service_uuid, s)

            # automatically deploy the service
            if AUTO_DEPLOY:
                # ok, we have a service uuid, lets start the service
                reset_subnets()
                GK.services.get(service_uuid).start_service()

            # generate the JSON result
            return {"service_uuid": service_uuid, "size": size,
                    "sha1": file_hash, "error": None}, 201
        except BaseException:
            LOG.exception("Service package upload failed:")
            return {"service_uuid": None, "size": 0,
                    "sha1": None, "error": "upload failed"}, 500

    def get(self):
        """
        Return a list of UUID's of uploaded service packages.
        :return: dict/list
        """
        LOG.info("GET /packages")
        return {"service_uuid_list": list(GK.services.iterkeys())}
889
890
class Instantiations(fr.Resource):
    """
    API endpoint to start/list/stop instances of on-boarded services.
    """

    def post(self):
        """
        Instantiate a service specified by its UUID.
        Will return a new UUID to identify the running service instance.
        :return: UUID
        """
        LOG.info("POST /instantiations (or /requests) called")
        # try to extract the service uuid from the request
        json_data = request.get_json(force=True)
        service_uuid = json_data.get("service_uuid")
        service_name = json_data.get("service_name")

        # first try to find by service_name (last match wins if names repeat)
        if service_name is not None:
            for s_uuid, s in GK.services.iteritems():
                if s.manifest.get("name") == service_name:
                    LOG.info("Found service: {} with UUID: {}"
                             .format(service_name, s_uuid))
                    service_uuid = s_uuid
        # lets be a bit fuzzy here to make testing easier
        if (service_uuid is None or service_uuid ==
                "latest") and len(GK.services) > 0:
            # if we don't get a service uuid, we simple start the first service
            # in the list
            service_uuid = list(GK.services.iterkeys())[0]
        if service_uuid in GK.services:
            # ok, we have a service uuid, lets start the service
            service_instance_uuid = GK.services.get(
                service_uuid).start_service()
            return {"service_instance_uuid": service_instance_uuid}, 201
        return "Service not found", 404

    def get(self):
        """
        Returns a list of UUIDs containing all running services.
        :return: dict / list
        """
        LOG.info("GET /instantiations")
        return {"service_instantiations_list": [
            list(s.instances.iterkeys()) for s in GK.services.itervalues()]}

    def delete(self):
        """
        Stops a running service specified by its service and instance UUID.
        """
        # try to extract the service and instance UUID from the request
        json_data = request.get_json(force=True)
        service_uuid_input = json_data.get("service_uuid")
        instance_uuid_input = json_data.get("service_instance_uuid")
        if len(GK.services) < 1:
            return "No service on-boarded.", 404
        # try to be fuzzy
        if service_uuid_input is None:
            # if we don't get a service uuid we stop all services
            service_uuid_list = list(GK.services.iterkeys())
            LOG.info("No service_uuid given, stopping all.")
        else:
            service_uuid_list = [service_uuid_input]
        # for each service
        for service_uuid in service_uuid_list:
            # BUGFIX: guard against unknown UUIDs supplied by the client;
            # the previous code raised a KeyError (-> HTTP 500) below when
            # indexing GK.services with an invalid service_uuid.
            if service_uuid not in GK.services:
                LOG.warning(
                    "Unknown service_uuid %r given. Skipping." % service_uuid)
                continue
            if instance_uuid_input is None:
                instance_uuid_list = list(
                    GK.services[service_uuid].instances.iterkeys())
            else:
                instance_uuid_list = [instance_uuid_input]
            # for all service instances
            for instance_uuid in instance_uuid_list:
                if instance_uuid in GK.services[service_uuid].instances:
                    # valid service and instance UUID, stop service
                    GK.services.get(service_uuid).stop_service(instance_uuid)
                    LOG.info("Service instance with uuid %r stopped." % instance_uuid)
        return "Service(s) stopped.", 200
966
967
class Exit(fr.Resource):
    """
    API endpoint to shut down the whole emulator.
    """

    def put(self):
        """
        Stop the running Containernet instance regardless of data transmitted
        """
        # any datacenter works: they all belong to the same network
        some_dc = list(GK.dcs.values())[0]
        some_dc.net.stop()
975
976
def generate_subnets(prefix, base, subnet_size=50, mask=24):
    """
    Generate a list of consecutive IPv4 subnets.

    :param prefix: first two octets as a string, e.g. '20.0'
    :param base: third octet of the first generated subnet
    :param subnet_size: number of subnets to generate
    :param mask: prefix length applied to every subnet
    :return: list of ipaddress.IPv4Network objects
    """
    # BUGFIX/compat: a u"" literal works on both Python 2 (where
    # ipaddress needs unicode input) and Python 3 (where the builtin
    # name `unicode` used before does not exist).
    return [ipaddress.ip_network(u"{0}.{1}.0/{2}".format(prefix, net, mask))
            for net in range(base, base + subnet_size)]
984
985
def reset_subnets():
    """
    (Re-)initialize the global subnet pools handed out for the
    generated E-LAN and E-LINE interfaces.
    """
    global ELAN_SUBNETS, ELINE_SUBNETS
    # E-LAN links get 30.0.xxx.0/24 subnets
    ELAN_SUBNETS = generate_subnets('30.0', 0, subnet_size=50, mask=24)
    # E-LINE links get 20.0.xxx.0/24 subnets
    ELINE_SUBNETS = generate_subnets('20.0', 0, subnet_size=50, mask=24)
994
995
def initialize_GK():
    """
    (Re-)create the module-global Gatekeeper object.

    Called once at import time below; can be called again to reset
    all gatekeeper state (services, instances).
    """
    global GK
    GK = Gatekeeper()
999
1000
# create a single, global GK object
GK = None
initialize_GK()
# setup Flask
app = Flask(__name__)
# allow large package uploads
app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024  # 512 MB max upload
api = fr.Api(app)
# define endpoints
# package on-boarding (both legacy and v2 paths)
api.add_resource(Packages, '/packages', '/api/v2/packages')
# service instantiation (v2 also exposes the 5GTANGO '/requests' alias)
api.add_resource(Instantiations, '/instantiations',
                 '/api/v2/instantiations', '/api/v2/requests')
# emulator shutdown endpoint
api.add_resource(Exit, '/emulator/exit')
1013
1014
def start_rest_api(host, port, datacenters=None):
    """
    Run the LLCM REST API with Flask (blocking call).

    :param host: interface to bind to, e.g. "0.0.0.0"
    :param port: TCP port for the API
    :param datacenters: dict of emulator datacenters the gatekeeper
        can deploy to (default: empty dict)
    """
    # BUGFIX: avoid a mutable default argument; the old `datacenters=dict()`
    # default was created once and shared across calls, so mutations of
    # GK.dcs would leak into later invocations.
    GK.dcs = datacenters if datacenters is not None else dict()
    GK.net = get_dc_network()
    # start the Flask server (not the best performance but ok for our use case)
    app.run(host=host,
            port=port,
            debug=True,
            use_reloader=False  # this is needed to run Flask in a non-main thread
            )
1024
1025
1026 def ensure_dir(name):
1027 if not os.path.exists(name):
1028 os.makedirs(name)
1029
1030
def load_yaml(path):
    """
    Parse the YAML file at `path` and return its content.

    On YAML parse errors the exception is logged and an empty dict is
    returned, so callers never have to handle YAMLError themselves.
    I/O errors (missing file etc.) still propagate.
    """
    with open(path, "r") as f:
        try:
            # NOTE(review): yaml.load without an explicit Loader can
            # construct arbitrary Python objects; package descriptors come
            # from uploaded (untrusted) packages — consider yaml.safe_load.
            r = yaml.load(f)
        except yaml.YAMLError as exc:
            LOG.exception("YAML parse error: %r" % str(exc))
            r = dict()
    return r
1039
1040
def make_relative_path(path):
    """
    Turn a descriptor reference into a path relative to the package root:
    strips an optional 'file://' scheme prefix and a single leading '/'.
    """
    scheme = "file://"
    if path.startswith(scheme):
        path = path[len(scheme):]
    if path.startswith("/"):
        path = path[1:]
    return path
1047
1048
def get_dc_network():
    """
    retrieve the DCnetwork where this dummygatekeeper (GK) connects to.
    Assume at least 1 datacenter is connected to this GK, and that all datacenters belong to the same DCNetwork
    :return:
    """
    assert (len(GK.dcs) > 0)
    # CONSISTENCY/compat fix: wrap in list() like Exit.put() does;
    # plain .values()[0] breaks when dict.values() is a view (Python 3).
    return list(GK.dcs.values())[0].net
1057
1058
def parse_interface(interface_name):
    """
    convert the interface name in the nsd to the according vnf_id, vnf_interface names
    :param interface_name: e.g. "vnf1:eth0" or just "eth0"
    :return: (vnf_id or None, vnf_interface)
    """
    # a bare interface name (no colon) belongs to no specific VNF
    if ':' not in interface_name:
        return None, interface_name
    # "vnf:iface" form; more than one colon is rejected (ValueError),
    # matching the strict two-part unpacking
    vnf_id, vnf_interface = interface_name.split(':')
    return vnf_id, vnf_interface
1071
1072
def get_container_name(vnf_id, vdu_id):
    """Build the container name '<vnf_id>.<vdu_id>' used by the emulator."""
    return "%s.%s" % (vnf_id, vdu_id)
1075
1076
if __name__ == '__main__':
    """
    Lets allow to run the API in standalone mode.
    """
    GK_STANDALONE_MODE = True
    # quiet down Flask's request logging a bit
    logging.getLogger("werkzeug").setLevel(logging.INFO)
    # blocking call; serves the API on all interfaces, port 8000
    start_rest_api("0.0.0.0", 8000)