5GTANGO LLCM: Refactoring, cleanup
[osm/vim-emu.git] / src / emuvim / api / tango / llcm.py
1 # Copyright (c) 2018 SONATA-NFV, 5GTANGO and Paderborn University
2 # ALL RIGHTS RESERVED.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Neither the name of the SONATA-NFV, 5GTANGO, Paderborn University
17 # nor the names of its contributors may be used to endorse or promote
18 # products derived from this software without specific prior written
19 # permission.
20 #
21 # This work has been performed in the framework of the SONATA project,
22 # funded by the European Commission under Grant number 671517 through
23 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
24 # acknowledge the contributions of their colleagues of the SONATA
25 # partner consortium (www.sonata-nfv.eu).
26 #
27 # This work has also been performed in the framework of the 5GTANGO project,
28 # funded by the European Commission under Grant number 761493 through
29 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
30 # acknowledge the contributions of their colleagues of the 5GTANGO
31 # partner consortium (www.5gtango.eu).
32 import logging
33 import os
34 import uuid
35 import hashlib
36 import zipfile
37 import yaml
38 import threading
39 from docker import DockerClient
40 from flask import Flask, request
41 import flask_restful as fr
42 from subprocess import Popen
43 import ipaddress
44 import copy
45 import time
46
47
# Module-wide logger of the 5GTANGO LLCM (default verbosity: INFO).
LOG = logging.getLogger("5gtango.llcm")
LOG.setLevel(logging.INFO)


# Base directory for all files stored by the LLCM: uploaded packages are
# written to UPLOAD_FOLDER, unpacked package contents to CATALOG_FOLDER.
GK_STORAGE = "/tmp/vim-emu-tango-llcm/"
UPLOAD_FOLDER = os.path.join(GK_STORAGE, "uploads/")
CATALOG_FOLDER = os.path.join(GK_STORAGE, "catalog/")

# Enable Dockerfile build functionality (if False, pre-built images
# referenced in the descriptors are pulled instead)
BUILD_DOCKERFILE = False

# Flag to indicate that we run without the emulator (only the bare API for
# integration testing)
GK_STANDALONE_MODE = False

# Should a new version of an image be pulled even if it is already
# available locally?
FORCE_PULL = False

# Flag to indicate if we use bidirectional forwarding rules in the
# automatic chaining process
BIDIRECTIONAL_CHAIN = True

# Override the management interfaces in the descriptors with default
# docker0 interfaces in the containers
USE_DOCKER_MGMT = False

# Automatically deploy uploaded packages (no need to trigger the
# instantiation separately after the upload)
AUTO_DEPLOY = False

# Automatically terminate all other running service instances when a new
# package is uploaded
AUTO_DELETE = False

# Global subnet definitions (see reset_subnets())
ELAN_SUBNETS = None
ELINE_SUBNETS = None

# Time in seconds to wait for VNF stop scripts to execute fully
VNF_STOP_WAIT_TIME = 5
87
88
class OnBoardingException(Exception):
    """
    Raised when an uploaded service package cannot be on-boarded,
    e.g., because it contains no NSD or no VNFDs.

    Derives from Exception (not BaseException) so that it is handled like
    any other application error; handlers that catch BaseException still
    catch it.
    """
    pass
91
92
class Gatekeeper(object):
    """
    Central registry of the LLCM: keeps the on-boarded services, the
    known data centers and a reference to the emulated network.
    """

    def __init__(self):
        # service_uuid -> Service object
        self.services = dict()
        # known data centers (filled from outside of this class)
        self.dcs = dict()
        # emulated network (set from outside of this class)
        self.net = None
        # used to generate short names for VNFs (Mininet limitation)
        self.vnf_counter = 0
        reset_subnets()
        LOG.info("Initialized 5GTANGO LLCM module.")

    def register_service_package(self, service_uuid, service):
        """
        Register a new service package and on-board it.
        If on-boarding fails, the registration is rolled back so that no
        half-initialized service stays in the registry; the original
        exception is re-raised.
        :param service_uuid: UUID under which the service is stored
        :param service: service object (must offer onboard())
        """
        self.services[service_uuid] = service
        try:
            # lets perform all steps needed to onboard the service
            service.onboard()
        except BaseException:
            # BaseException on purpose: on-boarding errors of this module
            # have historically been derived from BaseException
            self.services.pop(service_uuid, None)
            raise
113
114
115 class Service(object):
116 """
117 This class represents a NS uploaded as a *.son package to the
118 dummy gatekeeper.
119 Can have multiple running instances of this service.
120 """
121
def __init__(self, service_uuid, package_file_hash, package_file_path):
    """
    Create a not yet on-boarded service.
    :param service_uuid: UUID that identifies this service
    :param package_file_hash: hash of the uploaded package file
    :param package_file_path: path of the uploaded package file
    """
    # identity and origin of the package
    self.uuid = service_uuid
    self.package_file_hash = package_file_hash
    self.package_file_path = package_file_path
    # catalog directory into which the package gets extracted
    self.package_content_path = os.path.join(
        CATALOG_FOLDER, "services/%s" % service_uuid)
    # descriptors (filled during on-boarding)
    self.manifest = None
    self.nsd = None
    self.vnfds = {}
    # container name -> Dockerfile path / image URL
    self.local_docker_files = {}
    self.remote_docker_image_urls = {}
    # instance_uuid -> instance record of running service instances
    self.instances = {}
137
def onboard(self):
    """
    Prepare this service for instantiation: unpack the package, load
    all descriptors and make the container images available.
    :return:
    """
    # step 1: extract the package into our catalog
    self._unpack_service_package()
    # step 2: load manifest, NSD and VNFDs
    self._load_package_descriptor()
    self._load_nsd()
    self._load_vnfd()
    if self.nsd is None:
        raise OnBoardingException("No NSD found.")
    if not self.vnfds:
        raise OnBoardingException("No VNFDs found.")
    # step 3: provide the container images (pull them or build them
    # from the Dockerfiles shipped with the package)
    if not BUILD_DOCKERFILE:
        self._load_docker_urls()
        self._pull_predefined_dockerimages()
    else:
        self._load_docker_files()
        self._build_images_from_dockerfiles()
    LOG.info("On-boarded service: %r" % self.manifest.get("name"))
161
def start_service(self):
    """
    Create and start a new instance of this service.
    Computes the placement, starts the deployment units of every VNFD
    as containers in the selected data centers, deploys the virtual
    links and finally triggers the start scripts inside the containers.
    :return: UUID of the new service instance
    """
    LOG.info("Starting service %r" % self.uuid)

    # every service instance is identified by its own fresh uuid; the
    # instance record is a bit like a NSR
    instance_uuid = str(uuid.uuid4())
    started_vnfis = list()
    self.instances[instance_uuid] = {"vnf_instances": started_vnfis}

    # placement: adds the target DC to each VNFD
    self._calculate_placement(RoundRobinDcPlacement)

    # start all VNFDs of the service; each one may result in several
    # started deployment units
    for vnf_id in self.vnfds:
        started_vnfis.extend(self._start_vnfd(self.vnfds[vnf_id], vnf_id))

    # deploy E-Line, E-Tree and E-LAN links
    # Attention: only done if a "forwarding_graphs" section exists in
    # the NSD, even if the forwarding graphs are not used directly.
    if "virtual_links" in self.nsd and "forwarding_graphs" in self.nsd:
        eline_fwd_links = []
        elan_fwd_links = []
        # constituent virtual links are not checked
        for vlink in self.nsd["virtual_links"]:
            connectivity = vlink["connectivity_type"]
            if connectivity == "E-Line":
                eline_fwd_links.append(vlink)
            elif connectivity in ("E-LAN", "E-Tree"):
                # E-Tree is treated as E-LAN
                elan_fwd_links.append(vlink)

        # E-Line links
        GK.net.deployed_elines.extend(eline_fwd_links)  # bookkeeping
        self._connect_elines(eline_fwd_links, instance_uuid)
        # E-Tree/E-LAN links
        GK.net.deployed_elans.extend(elan_fwd_links)  # bookkeeping
        self._connect_elans(elan_fwd_links, instance_uuid)

    # run the emulator specific entrypoint scripts in the VNFIs of this
    # service instance
    self._trigger_emulator_start_scripts_in_vnfis(started_vnfis)
    LOG.info("Service started. Instance id: %r" % instance_uuid)
    return instance_uuid
216
def stop_service(self, instance_uuid):
    """
    Stop a running instance of this service.
    Triggers the stop scripts, waits VNF_STOP_WAIT_TIME seconds, stops
    every VNF instance and forgets the service instance.
    :param instance_uuid: the uuid of the service instance to be stopped
    """
    LOG.info("Stopping service %r" % self.uuid)
    vnfis = self.instances[instance_uuid]["vnf_instances"]
    # give the stop scripts inside the containers some time to finish
    self._trigger_emulator_stop_scripts_in_vnfis(vnfis)
    time.sleep(VNF_STOP_WAIT_TIME)
    # now stop the containers themselves
    for vnfi in vnfis:
        self._stop_vnfi(vnfi)
    # finally drop the instance record
    del self.instances[instance_uuid]
237
238 def _get_resource_limits(self, deployment_unit):
239 """
240 Extract resource limits from deployment units.
241 """
242 # defaults
243 cpu_list = "1"
244 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(1.0))
245 mem_limit = 0
246 # update from descriptor
247 if "resource_requirements" in deployment_unit:
248 res_req = deployment_unit.get("resource_requirements")
249 cpu_list = res_req.get("cpu").get("cores")
250 if cpu_list is None:
251 cpu_list = res_req.get("cpu").get("vcpus")
252 cpu_bw = res_req.get("cpu").get("cpu_bw", 1.0)
253 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(cpu_bw))
254 mem_num = str(res_req.get("memory").get("size", 2))
255 mem_unit = str(res_req.get("memory").get("size_unit", "GB"))
256 mem_limit = float(mem_num)
257 if mem_unit == "GB":
258 mem_limit = mem_limit * 1024 * 1024 * 1024
259 elif mem_unit == "MB":
260 mem_limit = mem_limit * 1024 * 1024
261 elif mem_unit == "KB":
262 mem_limit = mem_limit * 1024
263 mem_limit = int(mem_limit)
264 return cpu_list, cpu_period, cpu_quota, mem_limit
265
def _start_vnfd(self, vnfd, vnf_id, **kwargs):
    """
    Start all deployment units (VDUs and CDUs) of a single VNFD of this
    service as containers in the data center stored in vnfd["dc"].
    :param vnfd: vnfd descriptor dict
    :param vnf_id: unique id of this vnf in the nsd
    :param kwargs: optional "type" passed on to startCompute
        (default: 'docker')
    :return: list of started VNF instances (one per deployment unit)
    :raises Exception: if no image name is known for a deployment unit,
        no data center was selected or the image is not available
    """
    vnfis = list()
    # name of the VNF as given in the VNFD (only used for logging)
    vnf_name = vnfd.get("name")
    # combine VDUs and CDUs
    deployment_units = (vnfd.get("virtual_deployment_units", []) +
                        vnfd.get("cloudnative_deployment_units", []))
    # iterate over all deployment units of the VNFD
    for u in deployment_units:
        # 0. vnf_container_name = vnf_id.vdu_id
        vnf_container_name = get_container_name(vnf_id, u.get("id"))
        # 1. get the name of the docker image to start
        docker_image_name = self.remote_docker_image_urls.get(
            vnf_container_name)
        if docker_image_name is None:
            raise Exception(
                "No image name for %r found. Abort." % vnf_container_name)
        # 2. select datacenter to start the VNF in
        target_dc = vnfd.get("dc")
        # 3. perform some checks to ensure we can start the container
        # (explicit raises instead of asserts: asserts vanish with -O)
        if target_dc is None:
            raise Exception(
                "No data center selected for %r. Abort." % vnf_container_name)
        if not self._check_docker_image_exists(docker_image_name):
            raise Exception("Docker image {} not found. Abort."
                            .format(docker_image_name))

        # 4. get the resource limits
        cpu_list, cpu_period, cpu_quota, mem_limit = self._get_resource_limits(u)

        # get connection points defined for the DU
        intfs = u.get("connection_points", [])
        # do some re-naming of fields to be compatible to containernet
        # (attention: this modifies the descriptor dicts in place)
        for i in intfs:
            if i.get("address"):
                i["ip"] = i.get("address")

        # 5. collect additional information to start container
        volumes = list()
        cenv = dict()
        # 5.1 inject descriptor based start/stop commands into env (overwrite)
        vnfd_cmd_start = u.get("vm_cmd_start")
        vnfd_cmd_stop = u.get("vm_cmd_stop")
        # the literal string "None" is treated like a missing command
        if vnfd_cmd_start and vnfd_cmd_start != "None":
            LOG.info("Found 'vm_cmd_start'='{}' in VNFD.".format(vnfd_cmd_start) +
                     " Overwriting SON_EMU_CMD.")
            cenv["SON_EMU_CMD"] = vnfd_cmd_start
        if vnfd_cmd_stop and vnfd_cmd_stop != "None":
            LOG.info("Found 'vm_cmd_stop'='{}' in VNFD.".format(vnfd_cmd_stop) +
                     " Overwriting SON_EMU_CMD_STOP.")
            cenv["SON_EMU_CMD_STOP"] = vnfd_cmd_stop

        # 6. Start the container
        LOG.info("Starting %r as %r in DC %r" %
                 (vnf_name, vnf_container_name, target_dc))
        LOG.debug("Interfaces for %r: %r" % (vnf_id, intfs))
        vnfi = target_dc.startCompute(
            vnf_container_name,
            network=intfs,
            image=docker_image_name,
            cpu_quota=cpu_quota,
            cpu_period=cpu_period,
            cpuset=cpu_list,
            mem_limit=mem_limit,
            volumes=volumes,
            properties=cenv,  # environment
            type=kwargs.get('type', 'docker'))
        # add vnfd reference to vnfi
        vnfi.vnfd = vnfd
        # add container name
        vnfi.vnf_container_name = vnf_container_name
        # store vnfi
        vnfis.append(vnfi)
    return vnfis
344
def _stop_vnfi(self, vnfi):
    """
    Stop a single VNF instance in the data center it is running in.
    :param vnfi: vnf instance to be stopped
    """
    # the instance knows its own status and its data center
    status = vnfi.getStatus()
    datacenter = vnfi.datacenter
    container_name = status["name"]
    LOG.info("Stopping the vnf instance contained in %r in DC %r" %
             (container_name, datacenter))
    datacenter.stopCompute(container_name)
357
358 def _get_vnf_instance(self, instance_uuid, vnf_id):
359 """
360 Returns VNFI object for a given "vnf_id" or "vnf_container_namse" taken from an NSD.
361 :return: single object
362 """
363 for vnfi in self.instances[instance_uuid]["vnf_instances"]:
364 if str(vnfi.name) == str(vnf_id):
365 return vnfi
366 LOG.warning("No container with name: {0} found.".format(vnf_id))
367 return None
368
def _get_vnf_instance_units(self, instance_uuid, vnf_id):
    """
    Collect all VNFI objects (deployment units) of a service instance
    whose name contains the given "vnf_id" taken from an NSD.
    :param instance_uuid: uuid of the service instance to search in
    :param vnf_id: id of the VNF
    :return: list of VNFIs or None if nothing was found
    """
    # NOTE(review): this is a substring match, so "vnf1" would also
    # select units of "vnf10" - confirm that VNF ids are never prefixes
    # of each other.
    units = [vnfi
             for vnfi in self.instances[instance_uuid]["vnf_instances"]
             if vnf_id in vnfi.name]
    if not units:
        LOG.warning("No container(s) with name: {0} found.".format(vnf_id))
        return None
    LOG.debug("Found units: {} for vnf_id: {}"
              .format([i.name for i in units], vnf_id))
    return units
385
@staticmethod
def _vnf_reconfigure_network(vnfi, if_name, net_str=None, new_name=None):
    """
    Change the network configuration of one interface of a running
    container: optionally assign a new address and/or rename it.
    :param vnfi: container instance
    :param if_name: interface name
    :param net_str: network configuration string, e.g., 1.2.3.4/24
    :param new_name: new name for the interface
    :return:
    """
    # part 1: assign a new ip address (if requested)
    if net_str is not None:
        target_intf = vnfi.intf(intf=if_name)
        if target_intf is None:
            LOG.warning("Interface not found: %s:%s. Network reconfiguration skipped." % (
                vnfi.name, if_name))
        else:
            target_intf.setIP(net_str)
            LOG.debug("Reconfigured network of %s:%s to %r" %
                      (vnfi.name, if_name, net_str))

    # part 2: rename the interface (if requested)
    if new_name is None:
        return
    vnfi.cmd('ip link set', if_name, 'down')
    vnfi.cmd('ip link set', if_name, 'name', new_name)
    vnfi.cmd('ip link set', new_name, 'up')
    LOG.debug("Reconfigured interface name of %s:%s to %s" %
              (vnfi.name, if_name, new_name))
413
414 def _trigger_emulator_start_scripts_in_vnfis(self, vnfi_list):
415 for vnfi in vnfi_list:
416 config = vnfi.dcinfo.get("Config", dict())
417 env = config.get("Env", list())
418 for env_var in env:
419 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
420 # LOG.debug("%r = %r" % (var, cmd))
421 if var == "SON_EMU_CMD" or var == "VIM_EMU_CMD":
422 LOG.info("Executing script in '{}': {}={}"
423 .format(vnfi.name, var, cmd))
424 # execute command in new thread to ensure that GK is not
425 # blocked by VNF
426 t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
427 t.daemon = True
428 t.start()
429 break # only execute one command
430
431 def _trigger_emulator_stop_scripts_in_vnfis(self, vnfi_list):
432 for vnfi in vnfi_list:
433 config = vnfi.dcinfo.get("Config", dict())
434 env = config.get("Env", list())
435 for env_var in env:
436 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
437 if var == "SON_EMU_CMD_STOP" or var == "VIM_EMU_CMD_STOP":
438 LOG.info("Executing script in '{}': {}={}"
439 .format(vnfi.name, var, cmd))
440 # execute command in new thread to ensure that GK is not
441 # blocked by VNF
442 t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
443 t.daemon = True
444 t.start()
445 break # only execute one command
446
def _unpack_service_package(self):
    """
    Extract the uploaded package file (a zip archive) into
    CATALOG_FOLDER/services/<service_uuid>/
    """
    LOG.info("Unzipping: %r" % self.package_file_path)
    archive = zipfile.ZipFile(self.package_file_path, "r")
    try:
        archive.extractall(self.package_content_path)
    finally:
        # always release the file handle, also if the extraction fails
        archive.close()
454
def _load_package_descriptor(self):
    """
    Read the package manifest (TOSCA-Metadata/NAPD.yaml of the unpacked
    package) and store the result of load_yaml() in self.manifest.
    :return:
    """
    manifest_path = os.path.join(
        self.package_content_path, "TOSCA-Metadata/NAPD.yaml")
    self.manifest = load_yaml(manifest_path)
463
def _load_nsd(self):
    """
    Find the first NSD listed in the package manifest, load it and
    store it in self.nsd.
    :return:
    :raises OnBoardingException: if the manifest has no
        "package_content" section or lists no NSD
    """
    if "package_content" not in self.manifest:
        raise OnBoardingException(
            "No 'package_content' section in package manifest:\n{}"
            .format(self.manifest))
    nsd_path = None
    for entry in self.manifest.get("package_content"):
        if entry.get("content-type") != "application/vnd.5gtango.nsd":
            continue
        nsd_path = os.path.join(
            self.package_content_path,
            make_relative_path(entry.get("source")))
        break  # always use the first NSD for now
    if nsd_path is None:
        raise OnBoardingException("No NSD with type 'application/vnd.5gtango.nsd' found.")
    self.nsd = load_yaml(nsd_path)
    GK.net.deployed_nsds.append(self.nsd)  # TODO this seems strange (remove?)
    LOG.debug("Loaded NSD: %r" % self.nsd.get("name"))
486
def _load_vnfd(self):
    """
    Load all VNFDs listed in the package manifest and map each vnf_id
    used in the NSD to its VNFD (result is stored in self.vnfds).
    Network functions of the NSD without a matching VNFD are skipped.
    :return:
    :raises OnBoardingException: if the package contains no VNFD
    """
    # pass 1: collect all VNFDs of the package, indexed by their name
    vnfds_by_name = dict()
    if "package_content" in self.manifest:
        for entry in self.manifest.get("package_content"):
            if entry.get("content-type") != "application/vnd.5gtango.vnfd":
                continue
            vnfd_path = os.path.join(
                self.package_content_path,
                make_relative_path(entry.get("source")))
            vnfd = load_yaml(vnfd_path)
            vnfds_by_name[vnfd.get("name")] = vnfd
    if not vnfds_by_name:
        raise OnBoardingException("No VNFDs found.")
    # pass 2: link each vnf_id of the NSD to its VNFD
    for nf in self.nsd.get("network_functions"):
        vnf_name = nf.get("vnf_name")
        if vnf_name not in vnfds_by_name:
            continue
        vnf_id = nf.get("vnf_id")
        self.vnfds[vnf_id] = vnfds_by_name[vnf_name]
        LOG.debug("Loaded VNFD: {0} id: {1}"
                  .format(vnf_name, vnf_id))
511
def _connect_elines(self, eline_fwd_links, instance_uuid):
    """
    Connect all E-LINE links in the NSD
    Attention: This method DOES NOT support multi V/CDU VNFs!
    :param eline_fwd_links: list of E-LINE links in the NSD
    :param: instance_uuid of the service
    :return:
    """
    # the cookie identifies the flow rules installed by this method
    # NOTE(review): the value is the same (1) for every link here
    cookie = 1
    for link in eline_fwd_links:
        LOG.info("Found E-Line: {}".format(link))
        cp_refs = link["connection_points_reference"]
        # management links are not deployed if the docker management
        # interfaces are used
        if USE_DOCKER_MGMT and self.check_mgmt_interface(cp_refs):
            continue

        src_id, src_if_name = parse_interface(cp_refs[0])
        dst_id, dst_if_name = parse_interface(cp_refs[1])
        LOG.info("Creating E-Line: src={}, dst={}"
                 .format(src_id, dst_id))
        # both endpoints have to be running VNF instances
        src_vnfi = self._get_vnf_instance(instance_uuid, src_id)
        dst_vnfi = self._get_vnf_instance(instance_uuid, dst_id)
        if src_vnfi is None or dst_vnfi is None:
            continue

        # every E-Line gets its own subnet; its first two addresses are
        # the candidates for the two endpoints
        eline_net = ELINE_SUBNETS.pop(0)
        prefix = eline_net.prefixlen
        ip1 = "{0}/{1}".format(str(eline_net[1]), prefix)
        ip2 = "{0}/{1}".format(str(eline_net[2]), prefix)
        # endpoints with a fixed IP (address field in the VNFD) keep it
        src_cp = self._get_vnfd_cp_from_vnfi(src_vnfi, src_if_name)
        if src_cp.get("address") is None:
            self._vnf_reconfigure_network(src_vnfi, src_if_name, ip1)
        dst_cp = self._get_vnfd_cp_from_vnfi(dst_vnfi, dst_if_name)
        if dst_cp.get("address") is None:
            self._vnf_reconfigure_network(dst_vnfi, dst_if_name, ip2)
        # set the chaining
        GK.net.setChain(
            src_id, dst_id,
            vnf_src_interface=src_if_name, vnf_dst_interface=dst_if_name,
            bidirectional=BIDIRECTIONAL_CHAIN, cmd="add-flow", cookie=cookie, priority=10)
565
566 def _get_vnfd_cp_from_vnfi(self, vnfi, ifname):
567 """
568 Gets the connection point data structure from the VNFD
569 of the given VNFI using ifname.
570 """
571 if vnfi.vnfd is None:
572 return {}
573 cps = vnfi.vnfd.get("connection_points")
574 for cp in cps:
575 if cp.get("id") == ifname:
576 return cp
577
def _connect_elans(self, elan_fwd_links, instance_uuid):
    """
    Connect all E-LAN/E-Tree links in the NSD
    This method supports multi-V/CDU VNFs if the connection
    point names of the DUs are the same as the ones in the NSD.
    :param elan_fwd_links: list of E-LAN links in the NSD
    :param: instance_uuid of the service
    :return:
    """
    for link in elan_fwd_links:
        # every E-LAN/E-Tree gets its own subnet
        lan_net = ELAN_SUBNETS.pop(0)
        free_hosts = list(lan_net.hosts())
        lan_members = []

        # assign a LAN address to every interface of every involved
        # deployment unit (V/CDU)
        for cp_ref in link["connection_points_reference"]:
            vnf_id, intf_name = parse_interface(cp_ref)
            if vnf_id is None:
                continue  # skip references to NS connection points
            units = self._get_vnf_instance_units(instance_uuid, vnf_id)
            if units is None:
                continue  # skip if no deployment unit is present
            for unit in units:
                # Attention: simplification for multi DU VNFs: the
                # connection points of all involved DUs must have the
                # same name as the connection points of the surrounding
                # VNF, because links specified in the VNFDs are not
                # considered.
                container_name = unit.name
                ip_address = "{0}/{1}".format(str(free_hosts.pop(0)),
                                              lan_net.prefixlen)
                LOG.debug(
                    "Setting up E-LAN/E-Tree interface. (%s:%s) -> %s" % (
                        container_name, intf_name, ip_address))
                # E-LAN relies on the learning switch capability of Ryu
                # which has to be turned on in the topology
                # (DCNetwork(controller=RemoteController,
                # enable_learning=True)), so no explicit chaining is
                # necessary.
                vnfi = self._get_vnf_instance(instance_uuid, container_name)
                if vnfi is None:
                    continue
                self._vnf_reconfigure_network(vnfi, intf_name, ip_address)
                # remember vnf and interface of this E-LAN for tagging
                lan_members.append(
                    {'name': container_name, 'interface': intf_name})
        # install the VLAN tags for this E-LAN
        GK.net.setLAN(lan_members)
625
def _load_docker_files(self):
    """
    Get all paths to Dockerfiles from VNFDs and store them in
    self.local_docker_files (container name -> path inside the
    unpacked package).
    VDUs are only considered if their "vm_image_format" is "docker".
    :return:
    """
    # .items() instead of .iteritems(): works on Python 2 and Python 3
    for vnf_id, v in self.vnfds.items():
        for vu in v.get("virtual_deployment_units", []):
            vnf_container_name = get_container_name(vnf_id, vu.get("id"))
            if vu.get("vm_image_format") == "docker":
                vm_image = vu.get("vm_image")
                docker_path = os.path.join(
                    self.package_content_path,
                    make_relative_path(vm_image))
                self.local_docker_files[vnf_container_name] = docker_path
                LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
        for cu in v.get("cloudnative_deployment_units", []):
            vnf_container_name = get_container_name(vnf_id, cu.get("id"))
            image = cu.get("image")
            docker_path = os.path.join(
                self.package_content_path,
                make_relative_path(image))
            self.local_docker_files[vnf_container_name] = docker_path
            LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
649
def _load_docker_urls(self):
    """
    Get all URLs to pre-built docker images in some repo and store them
    in self.remote_docker_image_urls (container name -> image URL).
    A leading "http://" or "https://" is removed from the URLs. VDUs are
    only considered if their "vm_image_format" is "docker". Deployment
    units without an image are stored with the value None.
    :return:
    """
    def strip_scheme(url):
        # image references are used without URL scheme
        if url is None:
            return None
        return url.replace("http://", "").replace("https://", "")

    # .items() instead of .iteritems(): works on Python 2 and Python 3
    for vnf_id, v in self.vnfds.items():
        for vu in v.get("virtual_deployment_units", []):
            vnf_container_name = get_container_name(vnf_id, vu.get("id"))
            if vu.get("vm_image_format") == "docker":
                url = strip_scheme(vu.get("vm_image"))
                self.remote_docker_image_urls[vnf_container_name] = url
                LOG.debug("Found Docker image URL (%r): %r" %
                          (vnf_container_name, url))
        for cu in v.get("cloudnative_deployment_units", []):
            vnf_container_name = get_container_name(vnf_id, cu.get("id"))
            url = strip_scheme(cu.get("image"))
            self.remote_docker_image_urls[vnf_container_name] = url
            LOG.debug("Found Docker image URL (%r): %r" %
                      (vnf_container_name, url))
675
def _build_images_from_dockerfiles(self):
    """
    Build Docker images for each local Dockerfile found in the package: self.local_docker_files
    The images are tagged with the container names used as keys of
    self.local_docker_files. Nothing is built in standalone mode.
    """
    if GK_STANDALONE_MODE:
        return  # do not build anything in standalone mode
    dc = DockerClient()
    LOG.info("Building %d Docker images (this may take several minutes) ..." % len(
        self.local_docker_files))
    # .items() instead of .iteritems(): works on Python 2 and Python 3
    for k, v in self.local_docker_files.items():
        # the build context is the directory that contains the Dockerfile
        build_dir = v
        if os.path.basename(v) == "Dockerfile":
            build_dir = os.path.dirname(v) or "."
        # the high-level DockerClient has no build() method; the
        # low-level API client (dc.api) streams the build output
        for line in dc.api.build(path=build_dir, tag=k, rm=False, nocache=False):
            LOG.debug("DOCKER BUILD: %s" % line)
        LOG.info("Docker image created: %s" % k)
690
def _pull_predefined_dockerimages(self):
    """
    If the package contains URLs to pre-build Docker images, we download them with this method.
    Images that are already present are only pulled again if FORCE_PULL
    is set. Failed pulls are logged but do not abort the on-boarding.
    """
    dc = DockerClient()
    # .values() instead of .itervalues(): works on Python 2 and Python 3
    for url in self.remote_docker_image_urls.values():
        if url is None:
            # deployment unit without image: nothing we could pull
            LOG.warning("Found deployment unit without image URL. Skipping pull.")
            continue
        # only pull if not present (speedup for development)
        if not FORCE_PULL:
            if len(dc.images.list(name=url)) > 0:
                LOG.debug("Image %r present. Skipping pull." % url)
                continue
        LOG.info("Pulling image: %r" % url)
        # the docker cli is used instead of dc.images.pull(), which
        # seemed to fail with docker api version 2.0.2
        cmd = ["docker",
               "pull",
               url,
               ]
        returncode = Popen(cmd).wait()
        if returncode != 0:
            LOG.warning("Pulling image %r failed (exit code: %r)." %
                        (url, returncode))
712
def _check_docker_image_exists(self, image_name):
    """
    Ask the docker service whether an image with the given name is
    available.
    :param image_name: name of the docker image
    :return: True if at least one matching image is listed
    """
    client = DockerClient()
    matching_images = client.images.list(name=image_name)
    return len(matching_images) > 0
720
def _calculate_placement(self, algorithm):
    """
    Do placement by adding a field "dc" to
    each VNFD that points to one of our
    data center objects known to the gatekeeper.
    :param algorithm: placement class; it is instantiated without
        arguments and has to offer place(nsd, vnfds, dcs)
    """
    assert(len(self.vnfds) > 0)
    assert(len(GK.dcs) > 0)
    # instantiate algorithm and place
    p = algorithm()
    p.place(self.nsd, self.vnfds, GK.dcs)
    LOG.info("Using placement algorithm: %r" % p.__class__.__name__)
    # lets print the placement result
    # (.items() instead of .iteritems(): works on Python 2 and Python 3)
    for name, vnfd in self.vnfds.items():
        LOG.info("Placed VNF %r on DC %r" % (name, str(vnfd.get("dc"))))
736
737 def _calculate_cpu_cfs_values(self, cpu_time_percentage):
738 """
739 Calculate cpu period and quota for CFS
740 :param cpu_time_percentage: percentage of overall CPU to be used
741 :return: cpu_period, cpu_quota
742 """
743 if cpu_time_percentage is None:
744 return -1, -1
745 if cpu_time_percentage < 0:
746 return -1, -1
747 # (see: https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt)
748 # Attention minimum cpu_quota is 1ms (micro)
749 cpu_period = 1000000 # lets consider a fixed period of 1000000 microseconds for now
750 LOG.debug("cpu_period is %r, cpu_percentage is %r" %
751 (cpu_period, cpu_time_percentage))
752 # calculate the fraction of cpu time for this container
753 cpu_quota = cpu_period * cpu_time_percentage
754 # ATTENTION >= 1000 to avoid a invalid argument system error ... no
755 # idea why
756 if cpu_quota < 1000:
757 LOG.debug("cpu_quota before correcting: %r" % cpu_quota)
758 cpu_quota = 1000
759 LOG.warning("Increased CPU quota to avoid system error.")
760 LOG.debug("Calculated: cpu_period=%f / cpu_quota=%f" %
761 (cpu_period, cpu_quota))
762 return int(cpu_period), int(cpu_quota)
763
764
765 """
766 Some (simple) placement algorithms
767 """
768
769
class FirstDcPlacement(object):
    """
    Placement: Always use one and the same data center from the GK.dcs dict.
    """

    def place(self, nsd, vnfds, dcs):
        """
        Set the "dc" field of every VNFD to the first data center.
        :param nsd: NSD of the service (not used by this algorithm)
        :param vnfds: dict of VNFD dicts (modified in place)
        :param dcs: dict of available data centers
        """
        if not vnfds:
            return  # nothing to place
        # the target is the same for all VNFs: determine it only once
        # (.values() works on Python 2 and Python 3)
        first_dc = list(dcs.values())[0]
        for vnfd in vnfds.values():
            vnfd["dc"] = first_dc
778
779
class RoundRobinDcPlacement(object):
    """
    Placement: Distribute VNFs across all available DCs in a round robin fashion.
    """

    def place(self, nsd, vnfds, dcs):
        """
        Set the "dc" field of the VNFDs to the data centers in turn.
        :param nsd: NSD of the service (not used by this algorithm)
        :param vnfds: dict of VNFD dicts (modified in place)
        :param dcs: dict of available data centers
        """
        # .values() works on Python 2 and Python 3
        dcs_list = list(dcs.values())
        for idx, vnfd in enumerate(vnfds.values()):
            # wrap around to the first DC after the last one was used
            vnfd["dc"] = dcs_list[idx % len(dcs_list)]
790 c += 1 # inc. c to use next DC
791
792
793 """
794 Resource definitions and API endpoints
795 """
796
797
class Packages(fr.Resource):
    """
    REST resource to upload and to list service packages.
    """

    def post(self):
        """
        Upload a service package to the LLCM.

        The package is taken from the "package" file field of the request
        or, as fallback, from the raw request body and is stored in
        UPLOAD_FOLDER. Depending on AUTO_DELETE and AUTO_DEPLOY, running
        service instances are stopped and the new service is started.
        :return: dict with service_uuid, size, sha1 and error plus the
            HTTP status code (201 on success, 500 on failure)
        """
        try:
            # get file contents
            LOG.info("POST /packages called")
            # lets search for the package in the request
            is_file_object = False  # make API more robust: file can be in data or in files field
            if "package" in request.files:
                son_file = request.files["package"]
                is_file_object = True
            elif len(request.data) > 0:
                son_file = request.data
            else:
                return {"service_uuid": None, "size": 0, "sha1": None,
                        "error": "upload failed. file not found."}, 500
            # generate a uuid to reference this package
            service_uuid = str(uuid.uuid4())
            # ensure that upload folder exists
            ensure_dir(UPLOAD_FOLDER)
            upload_path = os.path.join(UPLOAD_FOLDER, "%s.tgo" % service_uuid)
            # store the package file to disk
            if is_file_object:
                son_file.save(upload_path)
            else:
                with open(upload_path, 'wb') as f:
                    f.write(son_file)
            size = os.path.getsize(upload_path)
            # hash the stored bytes, so that the hash describes the
            # package content in both upload variants
            with open(upload_path, 'rb') as f:
                file_hash = hashlib.sha1(f.read()).hexdigest()

            # first stop and delete any other running services
            if AUTO_DELETE:
                # iterate over copies: stop_service() modifies the
                # instances dict; separate loop variable names keep the
                # uuid of the new package (service_uuid) untouched
                for running_service in list(GK.services.values()):
                    for instance_uuid in list(running_service.instances):
                        running_service.stop_service(instance_uuid)
                        LOG.info("service instance with uuid %r stopped." %
                                 instance_uuid)

            # create a service object and register it
            s = Service(service_uuid, file_hash, upload_path)
            GK.register_service_package(service_uuid, s)

            # automatically deploy the service
            if AUTO_DEPLOY:
                # ok, we have a service uuid, lets start the service
                reset_subnets()
                GK.services.get(service_uuid).start_service()

            # generate the JSON result
            return {"service_uuid": service_uuid, "size": size,
                    "sha1": file_hash, "error": None}, 201
        except BaseException:
            # BaseException on purpose: on-boarding errors of this module
            # have historically been derived from BaseException
            LOG.exception("Service package upload failed:")
            return {"service_uuid": None, "size": 0,
                    "sha1": None, "error": "upload failed"}, 500

    def get(self):
        """
        Return a list of UUID's of uploaded service packages.
        :return: dict/list
        """
        LOG.info("GET /packages")
        # list(dict) yields the keys on Python 2 and Python 3
        return {"service_uuid_list": list(GK.services)}
872
873
class Instantiations(fr.Resource):
    """
    REST resource to start, list and stop service instances.
    """

    def post(self):
        """
        Instantiate a service specified by its UUID.
        Will return a new UUID to identify the running service instance.
        Without a service UUID (or with "latest") the first registered
        service is used.
        :return: UUID
        """
        LOG.info("POST /instantiations (or /requests) called")
        # try to extract the service uuid from the request
        json_data = request.get_json(force=True)
        service_uuid = json_data.get("service_uuid")

        # lets be a bit fuzzy here to make testing easier
        if (service_uuid is None or service_uuid ==
                "latest") and len(GK.services) > 0:
            # if we don't get a service uuid, we simply start the first
            # service in the list
            service_uuid = list(GK.services)[0]
        if service_uuid in GK.services:
            # ok, we have a service uuid, lets start the service
            service_instance_uuid = GK.services.get(
                service_uuid).start_service()
            return {"service_instance_uuid": service_instance_uuid}, 201
        return "Service not found", 404

    def get(self):
        """
        Returns a list of UUIDs containing all running services.
        :return: dict / list (one list of instance UUIDs per service)
        """
        LOG.info("GET /instantiations")
        return {"service_instantiations_list": [
            list(s.instances) for s in GK.services.values()]}

    def delete(self):
        """
        Stops a running service specified by its service and instance UUID.
        Missing UUIDs are replaced by the first registered service and
        its first instance.
        :return: message and HTTP status code (404 if the service or the
            instance is unknown)
        """
        # try to extract the service and instance UUID from the request
        json_data = request.get_json(force=True)
        service_uuid = json_data.get("service_uuid")
        instance_uuid = json_data.get("service_instance_uuid")

        # try to be fuzzy
        if service_uuid is None and len(GK.services) > 0:
            # if we don't get a service uuid, we simply use the first
            # service in the list
            service_uuid = list(GK.services)[0]
        service = GK.services.get(service_uuid)
        if service is None:
            # unknown service (or no service registered at all)
            return "Service not found", 404
        if instance_uuid is None and len(service.instances) > 0:
            instance_uuid = list(service.instances)[0]

        if instance_uuid in service.instances:
            # valid service and instance UUID, stop service
            service.stop_service(instance_uuid)
            return "service instance with uuid %r stopped." % instance_uuid, 200
        return "Service not found", 404
933
934
class Exit(fr.Resource):
    """
    REST resource used to shut the emulation down.
    """

    def put(self):
        """
        Stop the running Containernet instance regardless of data transmitted.

        The network is reached through the ``net`` attribute of the first
        datacenter in ``GK.dcs``; the request body is not read.
        """
        datacenters = list(GK.dcs.values())
        first_dc = datacenters[0]
        first_dc.net.stop()
942
943
def generate_subnets(prefix, base, subnet_size=50, mask=24):
    """
    Generate a list of consecutive IP networks.

    Builds the network "<prefix>.<n>.0/<mask>" for every n in the range
    [base, base + subnet_size).

    :param prefix: leading part of the address, e.g. '30.0'
    :param base: first value of the varying part of the address
    :param subnet_size: number of networks to generate (a count, not the
                        size of each network)
    :param mask: prefix length used for every generated network
    :return: list of network objects created by ipaddress.ip_network
    :raises ValueError: if a generated string is not a valid network
    """
    # A unicode template yields a text string on Python 2 (what the original
    # unicode() call ensured) without relying on the unicode() builtin,
    # which does not exist on Python 3.
    template = u"{0}.{1}.0/{2}"
    return [
        ipaddress.ip_network(template.format(prefix, net, mask))
        for net in range(base, base + subnet_size)
    ]
951
952
def reset_subnets():
    """
    Rebuild the module-level ELAN_SUBNETS and ELINE_SUBNETS lists.

    Both globals are replaced by freshly generated lists holding the
    private subnet definitions for the generated interfaces.
    """
    global ELAN_SUBNETS, ELINE_SUBNETS
    # E-LAN pool: 30.0.xxx.0/24
    ELAN_SUBNETS = generate_subnets('30.0', 0, subnet_size=50, mask=24)
    # E-Line pool: 20.0.xxx.0/24
    ELINE_SUBNETS = generate_subnets('20.0', 0, subnet_size=50, mask=24)
961
962
def initialize_GK():
    """
    Replace the module-level GK object with a new Gatekeeper instance.
    """
    global GK
    gatekeeper = Gatekeeper()
    GK = gatekeeper
966
967
# module-wide singleton: one global GK object used by all REST resources
GK = None
initialize_GK()
# Flask application plus the flask_restful API wrapper around it
app = Flask(__name__)
# 512 MB max upload
app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024
api = fr.Api(app)
# REST endpoints (several URLs may map to the same resource)
api.add_resource(Packages, '/packages', '/api/v2/packages')
api.add_resource(
    Instantiations,
    '/instantiations',
    '/api/v2/instantiations',
    '/api/v2/requests',
)
api.add_resource(Exit, '/emulator/exit')
980
981
def start_rest_api(host, port, datacenters=None):
    """
    Attach the given datacenters to the global GK and start the REST API.

    :param host: address the Flask server binds to
    :param port: port the Flask server listens on
    :param datacenters: mapping of datacenters stored in GK.dcs;
                        defaults to a new empty dict
    """
    # Use a fresh dict per call: a dict() default is created only once and,
    # because it is stored on GK, would be shared by every call that omits
    # the argument.
    GK.dcs = dict() if datacenters is None else datacenters
    GK.net = get_dc_network()
    # start the Flask server (not the best performance but ok for our use case)
    # NOTE(review): debug=True turns on Flask's debug mode; confirm this is
    # intended when the API is bound to a publicly reachable host.
    app.run(host=host,
            port=port,
            debug=True,
            use_reloader=False  # this is needed to run Flask in a non-main thread
            )
991
992
def ensure_dir(name):
    """
    Create the directory ``name`` (including parents) if it is missing.

    :param name: path of the directory to create
    :raises OSError: if the directory cannot be created and the path
                     still does not exist afterwards
    """
    # Try to create first instead of checking first: with a separate
    # exists() check, two concurrent callers can both see the path as
    # missing and the slower one then fails inside makedirs().
    try:
        os.makedirs(name)
    except OSError:
        # An already existing path is fine (as before); anything else is
        # a real error and is passed on to the caller.
        if not os.path.exists(name):
            raise
996
997
def load_yaml(path):
    """
    Parse the YAML file at ``path`` and return its content.

    A YAML parse error is logged and an empty dict is returned instead.

    :param path: path of the YAML file to read
    :return: the parsed document, or an empty dict on a parse error
    """
    with open(path, "r") as stream:
        try:
            # NOTE(review): yaml.load() without an explicit Loader can
            # construct arbitrary Python objects; if the descriptors come
            # from uploaded packages, consider yaml.safe_load() instead.
            return yaml.load(stream)
        except yaml.YAMLError as exc:
            LOG.exception("YAML parse error: %r" % str(exc))
            return dict()
1006
1007
def make_relative_path(path):
    """
    Strip a leading "file://" and then one leading "/" from ``path``.

    :param path: path string, optionally prefixed with "file://"
    :return: the path without the scheme prefix and without its first
             leading slash
    """
    scheme = "file://"
    if path.startswith(scheme):
        path = path[len(scheme):]
    # only a single leading slash is removed
    if path.startswith("/"):
        path = path[1:]
    return path
1014
1015
def get_dc_network():
    """
    Retrieve the DCNetwork this dummy gatekeeper (GK) is connected to.

    Assumes at least 1 datacenter is connected to this GK, and that all
    datacenters belong to the same DCNetwork.

    :return: the ``net`` attribute of the first datacenter in GK.dcs
    :raises AssertionError: if GK.dcs is empty
    """
    assert len(GK.dcs) > 0, "no datacenter connected to the gatekeeper"
    # dict.values() is not indexable on Python 3; copy it into a list
    # first (the same form Exit.put uses).
    return list(GK.dcs.values())[0].net
1024
1025
def parse_interface(interface_name):
    """
    Convert an interface name from the NSD into its VNF id and
    VNF interface name.

    A name of the form "<vnf_id>:<interface>" is split at the colon;
    a name without a colon has no VNF id.

    :param interface_name: interface name string
    :return: tuple (vnf_id, vnf_interface); vnf_id is None when the name
             contains no colon
    :raises ValueError: if the name contains more than one colon
    """
    if ':' not in interface_name:
        return None, interface_name
    vnf_id, vnf_interface = interface_name.split(':')
    return vnf_id, vnf_interface
1038
1039
def get_container_name(vnf_id, vdu_id):
    """
    Build a container name of the form "<vnf_id>.<vdu_id>".

    :param vnf_id: identifier placed before the dot
    :param vdu_id: identifier placed after the dot
    :return: the combined name string
    """
    name_template = "{}.{}"
    return name_template.format(vnf_id, vdu_id)
1042
1043
if __name__ == '__main__':
    # Lets allow to run the API in standalone mode.
    # NOTE(review): no datacenters are passed here, so GK.dcs ends up
    # empty and get_dc_network() asserts on it; confirm standalone mode
    # is still expected to work.
    GK_STANDALONE_MODE = True
    logging.getLogger("werkzeug").setLevel(logging.INFO)
    start_rest_api("0.0.0.0", 8000)