35cdf4dc11d90cecdcdf675e1138def683faa64b
[osm/vim-emu.git] / src / emuvim / api / tango / llcm.py
1 # Copyright (c) 2018 SONATA-NFV, 5GTANGO and Paderborn University
2 # ALL RIGHTS RESERVED.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Neither the name of the SONATA-NFV, 5GTANGO, Paderborn University
17 # nor the names of its contributors may be used to endorse or promote
18 # products derived from this software without specific prior written
19 # permission.
20 #
21 # This work has been performed in the framework of the SONATA project,
22 # funded by the European Commission under Grant number 671517 through
23 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
24 # acknowledge the contributions of their colleagues of the SONATA
25 # partner consortium (www.sonata-nfv.eu).
26 #
27 # This work has also been performed in the framework of the 5GTANGO project,
28 # funded by the European Commission under Grant number 761493 through
29 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
30 # acknowledge the contributions of their colleagues of the 5GTANGO
31 # partner consortium (www.5gtango.eu).
32 import logging
33 import os
34 import uuid
35 import hashlib
36 import zipfile
37 import yaml
38 import threading
39 from docker import DockerClient
40 from flask import Flask, request
41 import flask_restful as fr
42 from subprocess import Popen
43 import ipaddress
44 import copy
45 import time
46
47
# Module-level logger; records below INFO are filtered out.
LOG = logging.getLogger("5gtango.llcm")
LOG.setLevel(logging.INFO)


# Root directory of the on-disk storage; uploaded packages and the
# unpacked catalog are kept in sub-folders of it.
GK_STORAGE = "/tmp/vim-emu-tango-llcm/"
UPLOAD_FOLDER = os.path.join(GK_STORAGE, "uploads/")
CATALOG_FOLDER = os.path.join(GK_STORAGE, "catalog/")

# Enable Dockerfile build functionality (otherwise images are pulled)
BUILD_DOCKERFILE = False

# flag to indicate that we run without the emulator (only the bare API for
# integration testing)
GK_STANDALONE_MODE = False

# should a new version of an image be pulled even if it is already
# available locally
FORCE_PULL = False

# flag to indicate if we use bidirectional forwarding rules in the
# automatic chaining process
BIDIRECTIONAL_CHAIN = True

# override the management interfaces in the descriptors with default
# docker0 interfaces in the containers
USE_DOCKER_MGMT = False

# automatically deploy uploaded packages (no need to execute son-access
# deploy --latest separately)
AUTO_DEPLOY = False

# and also automatically terminate any other running services
AUTO_DELETE = False

# global subnet definitions (see reset_subnets())
ELAN_SUBNETS = None
ELINE_SUBNETS = None

# Time in seconds to wait for vnf stop scripts to execute fully
VNF_STOP_WAIT_TIME = 5
87
88
class OnBoardingException(BaseException):
    """Raised when an uploaded service package cannot be on-boarded."""
91
92
class Gatekeeper(object):
    """
    Registry object of the LLCM: holds the registered services, the data
    center dict and the reference to the network object.
    """

    def __init__(self):
        # service_uuid -> Service object
        self.services = {}
        # data centers; empty here, presumably filled by the API setup code
        self.dcs = {}
        # network object; not set here, presumably assigned by the API
        # setup code
        self.net = None
        # used to generate short names for VNFs (Mininet limitation)
        self.vnf_counter = 0
        reset_subnets()
        LOG.info("Initialized 5GTANGO LLCM module.")

    def register_service_package(self, service_uuid, service):
        """
        Store a new service package and on-board it.
        The service is stored first, so it stays registered even if the
        on-boarding raises.
        :param service_uuid: key under which the service is stored
        :param service: service object (must offer onboard())
        """
        self.services[service_uuid] = service
        # run all steps needed to make the service instantiable
        service.onboard()
113
114
115 class Service(object):
116 """
117 This class represents a NS uploaded as a *.son package to the
118 dummy gatekeeper.
119 Can have multiple running instances of this service.
120 """
121
def __init__(self,
             service_uuid,
             package_file_hash,
             package_file_path):
    """
    Create an (not yet on-boarded) service for an uploaded package.
    :param service_uuid: uuid identifying this service
    :param package_file_hash: hash of the uploaded package
    :param package_file_path: path of the uploaded package file
    """
    # identity and location of the uploaded package
    self.uuid = service_uuid
    self.package_file_hash = package_file_hash
    self.package_file_path = package_file_path
    # folder into which the package gets extracted
    self.package_content_path = os.path.join(
        CATALOG_FOLDER, "services/%s" % service_uuid)
    # descriptors (filled during on-boarding)
    self.manifest = None
    self.nsd = None
    self.vnfds = {}
    # container name -> Dockerfile path / image URL
    self.local_docker_files = {}
    self.remote_docker_image_urls = {}
    # instance uuid -> instance record
    self.instances = {}
137
def onboard(self):
    """
    Prepare this service for instantiation: unpack the package, load
    its descriptors and make the container images available.
    :return:
    :raises OnBoardingException: if no NSD or no VNFD was loaded
    """
    # 1. extract the package into our catalog
    self._unpack_service_package()
    # 2. read the descriptors: manifest first, then NSD, then VNFDs
    for load_step in (self._load_package_descriptor,
                      self._load_nsd,
                      self._load_vnfd):
        load_step()
    if self.nsd is None:
        raise OnBoardingException("No NSD found.")
    if not self.vnfds:
        raise OnBoardingException("No VNFDs found.")
    # 3. prepare container images (pull them or build from Dockerfiles)
    if not BUILD_DOCKERFILE:
        self._load_docker_urls()
        self._pull_predefined_dockerimages()
    else:
        self._load_docker_files()
        self._build_images_from_dockerfiles()
    LOG.info("On-boarded service: %r" % self.manifest.get("name"))
161
def start_service(self):
    """
    Create and start a new instance of this service.
    Places the VNFDs on data centers, starts every VNFD, deploys the
    virtual links of the NSD and finally triggers the start scripts
    inside the started VNF instances.
    :return: uuid of the new service instance
    """
    LOG.info("Starting service %r" % self.uuid)

    # 1. every service instance is identified by a fresh uuid; its record
    # (a bit like a NSR) collects the started VNF instances
    instance_uuid = str(uuid.uuid4())
    started_vnfis = list()
    self.instances[instance_uuid] = {"vnf_instances": started_vnfis}

    # 2. placement: adds the target DC to each VNFD
    self._calculate_placement(RoundRobinDcPlacement)

    # 3. start all VNFDs; each call returns a list of deployment units
    for vnf_id, vnfd in self.vnfds.items():
        started_vnfis.extend(self._start_vnfd(vnfd, vnf_id))

    # 4. Deploy E-Line, E-Tree and E-LAN links
    # Attention: only done if a "forwarding_graphs" section exists in the
    # NSD, even though its content is not used directly.
    if "virtual_links" in self.nsd and "forwarding_graphs" in self.nsd:
        # constituent virtual links are not checked
        eline_fwd_links = []
        elan_fwd_links = []
        for vlink in self.nsd["virtual_links"]:
            connectivity = vlink["connectivity_type"]
            if connectivity == "E-Line":
                eline_fwd_links.append(vlink)
            elif connectivity in ("E-LAN", "E-Tree"):
                # E-Tree is treated as E-LAN
                elan_fwd_links.append(vlink)

        # 5a. deploy E-Line links
        GK.net.deployed_elines.extend(eline_fwd_links)  # bookkeeping
        self._connect_elines(eline_fwd_links, instance_uuid)
        # 5b. deploy E-Tree/E-LAN links
        GK.net.deployed_elans.extend(elan_fwd_links)  # bookkeeping
        self._connect_elans(elan_fwd_links, instance_uuid)

    # 6. run the emulator specific entrypoint scripts in the VNFIs
    self._trigger_emulator_start_scripts_in_vnfis(
        self.instances[instance_uuid]["vnf_instances"])
    LOG.info("Service started. Instance id: %r" % instance_uuid)
    return instance_uuid
216
def stop_service(self, instance_uuid):
    """
    Stop a running instance of this service.
    Triggers the stop scripts, waits VNF_STOP_WAIT_TIME seconds, stops
    every VNF instance and forgets the instance record.
    :param instance_uuid: the uuid of the service instance to be stopped
    """
    LOG.info("Stopping service %r" % self.uuid)
    running_vnfis = self.instances[instance_uuid]["vnf_instances"]
    # give the stop scripts a few seconds to complete
    self._trigger_emulator_stop_scripts_in_vnfis(running_vnfis)
    time.sleep(VNF_STOP_WAIT_TIME)
    for vnfi in running_vnfis:
        self._stop_vnfi(vnfi)
    # finally drop the record of this instance
    self.instances.pop(instance_uuid)
237
238 def _get_resource_limits(self, deployment_unit):
239 """
240 Extract resource limits from deployment units.
241 """
242 # defaults
243 cpu_list = None
244 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(1.0))
245 mem_limit = None
246 # update from descriptor
247 if "resource_requirements" in deployment_unit:
248 res_req = deployment_unit.get("resource_requirements")
249 cpu_list = res_req.get("cpu").get("cpuset")
250 if cpu_list is None:
251 cpu_list = res_req.get("cpu").get("vcpus")
252 if cpu_list is not None:
253 # attention: docker expects list as string w/o spaces:
254 cpu_list = str(cpu_list).replace(" ", "").strip()
255 cpu_bw = res_req.get("cpu").get("cpu_bw")
256 if cpu_bw is None:
257 cpu_bw = 1.0
258 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(cpu_bw))
259 mem_limit = res_req.get("memory").get("size")
260 mem_unit = str(res_req.get("memory").get("size_unit", "GB"))
261 if mem_limit is not None:
262 mem_limit = int(mem_limit)
263 # to bytes
264 if "G" in mem_unit:
265 mem_limit = mem_limit * 1024 * 1024 * 1024
266 elif "M" in mem_unit:
267 mem_limit = mem_limit * 1024 * 1024
268 elif "K" in mem_unit:
269 mem_limit = mem_limit * 1024
270 return cpu_list, cpu_period, cpu_quota, mem_limit
271
def _start_vnfd(self, vnfd, vnf_id, **kwargs):
    """
    Start all deployment units (VDUs and CDUs) of a single VNFD of this
    service, one container per deployment unit.
    :param vnfd: vnfd descriptor dict (its "dc" entry is the target DC)
    :param vnf_id: unique id of this vnf in the nsd
    :param kwargs: optional "type" handed to startCompute (default 'docker')
    :return: list of started VNF instances
    """
    vnfis = list()
    # name of the VNF (only used for logging here)
    vnf_name = vnfd.get("name")
    # combine VDUs and CDUs
    deployment_units = (vnfd.get("virtual_deployment_units", []) +
                        vnfd.get("cloudnative_deployment_units", []))
    # iterate over all deployment units of the VNFD
    for u in deployment_units:
        # 0. vnf_container_name = vnf_id.vdu_id
        vnf_container_name = get_container_name(vnf_id, u.get("id"))
        # 1. get the name of the docker image to start
        if vnf_container_name not in self.remote_docker_image_urls:
            raise Exception("No image name for %r found. Abort." % vnf_container_name)
        docker_image_name = self.remote_docker_image_urls.get(vnf_container_name)
        # 2. select datacenter to start the VNF in
        target_dc = vnfd.get("dc")
        # 3. perform some checks to ensure we can start the container
        assert(docker_image_name is not None)
        assert(target_dc is not None)
        if not self._check_docker_image_exists(docker_image_name):
            raise Exception("Docker image {} not found. Abort."
                            .format(docker_image_name))

        # 4. get the resource limits
        cpu_list, cpu_period, cpu_quota, mem_limit = self._get_resource_limits(u)

        # get connection points defined for the DU
        intfs = u.get("connection_points", [])
        # do some re-naming of fields to be compatible to containernet
        for i in intfs:
            if i.get("address"):
                i["ip"] = i.get("address")

        # 5. collect additional information to start container
        volumes = list()
        cenv = dict()
        # 5.1 inject descriptor based start/stop commands into env (overwrite)
        vnfd_cmd_start = u.get("vm_cmd_start")
        vnfd_cmd_stop = u.get("vm_cmd_stop")
        if vnfd_cmd_start and not vnfd_cmd_start == "None":
            LOG.info("Found 'vm_cmd_start'='{}' in VNFD.".format(vnfd_cmd_start) +
                     " Overwriting SON_EMU_CMD.")
            cenv["SON_EMU_CMD"] = vnfd_cmd_start
        if vnfd_cmd_stop and not vnfd_cmd_stop == "None":
            # this message used to name 'vm_cmd_start' by mistake
            LOG.info("Found 'vm_cmd_stop'='{}' in VNFD.".format(vnfd_cmd_stop) +
                     " Overwriting SON_EMU_CMD_STOP.")
            cenv["SON_EMU_CMD_STOP"] = vnfd_cmd_stop

        # 6. Start the container
        LOG.info("Starting %r as %r in DC %r" %
                 (vnf_name, vnf_container_name, vnfd.get("dc")))
        LOG.debug("Interfaces for %r: %r" % (vnf_id, intfs))
        vnfi = target_dc.startCompute(
            vnf_container_name,
            network=intfs,
            image=docker_image_name,
            cpu_quota=cpu_quota,
            cpu_period=cpu_period,
            cpuset_cpus=cpu_list,
            mem_limit=mem_limit,
            volumes=volumes,
            properties=cenv,  # environment
            type=kwargs.get('type', 'docker'))
        # add vnfd reference to vnfi
        vnfi.vnfd = vnfd
        # add container name
        vnfi.vnf_container_name = vnf_container_name
        # store vnfi
        vnfis.append(vnfi)
    return vnfis
350
def _stop_vnfi(self, vnfi):
    """
    Stop a single VNF instance in the data center it belongs to.
    :param vnfi: vnf instance to be stopped
    """
    # the instance knows its own name and its data center
    container_name = vnfi.getStatus()["name"]
    datacenter = vnfi.datacenter
    LOG.info("Stopping the vnf instance contained in %r in DC %r" %
             (container_name, datacenter))
    datacenter.stopCompute(container_name)
363
364 def _get_vnf_instance(self, instance_uuid, vnf_id):
365 """
366 Returns VNFI object for a given "vnf_id" or "vnf_container_namse" taken from an NSD.
367 :return: single object
368 """
369 for vnfi in self.instances[instance_uuid]["vnf_instances"]:
370 if str(vnfi.name) == str(vnf_id):
371 return vnfi
372 LOG.warning("No container with name: {0} found.".format(vnf_id))
373 return None
374
def _get_vnf_instance_units(self, instance_uuid, vnf_id):
    """
    Collect all VNFI objects (deployment units) of a service instance
    whose name contains the given "vnf_id" taken from an NSD.
    :param instance_uuid: uuid of the service instance to search in
    :param vnf_id: (sub)string the VNFI names are matched against
    :return: list of VNFIs, or None if nothing matches
    """
    units = [vnfi
             for vnfi in self.instances[instance_uuid]["vnf_instances"]
             if vnf_id in vnfi.name]
    if not units:
        LOG.warning("No container(s) with name: {0} found.".format(vnf_id))
        return None
    LOG.debug("Found units: {} for vnf_id: {}"
              .format([i.name for i in units], vnf_id))
    return units
391
@staticmethod
def _vnf_reconfigure_network(vnfi, if_name, net_str=None, new_name=None):
    """
    Change the IP configuration and/or the name of one interface of a
    running container.
    :param vnfi: container instance
    :param if_name: interface name
    :param net_str: network configuration string, e.g., 1.2.3.4/24
                    (None: keep the address)
    :param new_name: new interface name (None: keep the name)
    :return:
    """
    # step 1: new ip address
    if net_str is not None:
        target_intf = vnfi.intf(intf=if_name)
        if target_intf is None:
            LOG.warning("Interface not found: %s:%s. Network reconfiguration skipped." % (
                vnfi.name, if_name))
        else:
            target_intf.setIP(net_str)
            LOG.debug("Reconfigured network of %s:%s to %r" %
                      (vnfi.name, if_name, net_str))

    # step 2: rename; the link is taken down for the rename and brought
    # up again afterwards
    if new_name is not None:
        rename_steps = (
            (if_name, 'down'),
            (if_name, 'name', new_name),
            (new_name, 'up'),
        )
        for ip_args in rename_steps:
            vnfi.cmd('ip link set', *ip_args)
        LOG.debug("Reconfigured interface name of %s:%s to %s" %
                  (vnfi.name, if_name, new_name))
419
420 def _trigger_emulator_start_scripts_in_vnfis(self, vnfi_list):
421 for vnfi in vnfi_list:
422 config = vnfi.dcinfo.get("Config", dict())
423 env = config.get("Env", list())
424 for env_var in env:
425 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
426 if var == "SON_EMU_CMD" or var == "VIM_EMU_CMD":
427 LOG.info("Executing script in '{}': {}={}"
428 .format(vnfi.name, var, cmd))
429 # execute command in new thread to ensure that GK is not
430 # blocked by VNF
431 t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
432 t.daemon = True
433 t.start()
434 break # only execute one command
435
436 def _trigger_emulator_stop_scripts_in_vnfis(self, vnfi_list):
437 for vnfi in vnfi_list:
438 config = vnfi.dcinfo.get("Config", dict())
439 env = config.get("Env", list())
440 for env_var in env:
441 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
442 if var == "SON_EMU_CMD_STOP" or var == "VIM_EMU_CMD_STOP":
443 LOG.info("Executing script in '{}': {}={}"
444 .format(vnfi.name, var, cmd))
445 # execute command in new thread to ensure that GK is not
446 # blocked by VNF
447 t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
448 t.daemon = True
449 t.start()
450 break # only execute one command
451
def _unpack_service_package(self):
    """
    Extract the uploaded package (a zip archive) into the content folder
    of this service: CATALOG_FOLDER/services/<service_uuid>/
    """
    archive_path = self.package_file_path
    LOG.info("Unzipping: %r" % archive_path)
    with zipfile.ZipFile(archive_path, "r") as archive:
        archive.extractall(self.package_content_path)
459
def _load_package_descriptor(self):
    """
    Read the package descriptor (TOSCA-Metadata/NAPD.yaml inside the
    unpacked package) and keep the result in self.manifest.
    :return:
    """
    napd_path = os.path.join(
        self.package_content_path, "TOSCA-Metadata/NAPD.yaml")
    self.manifest = load_yaml(napd_path)
468
def _load_nsd(self):
    """
    Find the first NSD listed in the package manifest, load its YAML
    and keep it as dict in self.nsd.
    :return:
    :raises OnBoardingException: if the manifest has no 'package_content'
        section or lists no NSD
    """
    if "package_content" not in self.manifest:
        raise OnBoardingException(
            "No 'package_content' section in package manifest:\n{}"
            .format(self.manifest))
    # always use the first NSD for now
    nsd_entry = next(
        (entry for entry in self.manifest.get("package_content")
         if entry.get("content-type") == "application/vnd.5gtango.nsd"),
        None)
    if nsd_entry is None:
        raise OnBoardingException("No NSD with type 'application/vnd.5gtango.nsd' found.")
    nsd_path = os.path.join(
        self.package_content_path,
        make_relative_path(nsd_entry.get("source")))
    self.nsd = load_yaml(nsd_path)
    GK.net.deployed_nsds.append(self.nsd)  # TODO this seems strange (remove?)
    LOG.debug("Loaded NSD: %r" % self.nsd.get("name"))
491
def _load_vnfd(self):
    """
    Load all VNFD YAML files listed in the package manifest and store,
    for every network function of the NSD, the matching VNFD in
    self.vnfds (keyed by vnf_id).
    :return:
    :raises OnBoardingException: if the package contains no VNFD
    """
    # step 1: load every VNFD of the package, indexed by its name
    vnfds_by_name = {}
    for entry in self.manifest.get("package_content", []):
        if entry.get("content-type") != "application/vnd.5gtango.vnfd":
            continue
        descriptor = load_yaml(os.path.join(
            self.package_content_path,
            make_relative_path(entry.get("source"))))
        vnfds_by_name[descriptor.get("name")] = descriptor
    if not vnfds_by_name:
        raise OnBoardingException("No VNFDs found.")
    # step 2: map each vnf_id of the NSD to its VNFD (unknown names are
    # skipped silently)
    for nf in self.nsd.get("network_functions"):
        vnf_name = nf.get("vnf_name")
        if vnf_name not in vnfds_by_name:
            continue
        vnf_id = nf.get("vnf_id")
        self.vnfds[vnf_id] = vnfds_by_name[vnf_name]
        LOG.debug("Loaded VNFD: {0} id: {1}"
                  .format(vnf_name, vnf_id))
516
def _connect_elines(self, eline_fwd_links, instance_uuid):
    """
    Deploy all E-Line links of the NSD: assign addresses of a fresh
    E-Line subnet to both endpoints (unless the VNFD fixes an address)
    and install the chain between them.
    Attention: This method DOES NOT support multi V/CDU VNFs!
    :param eline_fwd_links: list of E-LINE links in the NSD
    :param instance_uuid: uuid of the service instance
    :return:
    """
    # cookie is used as identifier for the flowrules installed by the
    # dummygatekeeper (currently the same value for every link)
    cookie = 1
    for link in eline_fwd_links:
        LOG.info("Found E-Line: {}".format(link))
        cp_refs = link["connection_points_reference"]
        # management links are not deployed if docker mgmt is used
        if USE_DOCKER_MGMT and self.check_mgmt_interface(cp_refs):
            continue

        src_id, src_if_name = parse_interface(cp_refs[0])
        dst_id, dst_if_name = parse_interface(cp_refs[1])
        LOG.info("Creating E-Line: src={}, dst={}"
                 .format(src_id, dst_id))
        # both endpoints have to be running, otherwise skip the link
        src_vnfi = self._get_vnf_instance(instance_uuid, src_id)
        dst_vnfi = self._get_vnf_instance(instance_uuid, dst_id)
        if src_vnfi is None or dst_vnfi is None:
            continue

        # each E-Line gets its own subnet
        eline_net = ELINE_SUBNETS.pop(0)
        ip1 = "{0}/{1}".format(str(eline_net[1]),
                               eline_net.prefixlen)
        ip2 = "{0}/{1}".format(str(eline_net[2]),
                               eline_net.prefixlen)
        endpoints = ((src_vnfi, src_if_name, ip1),
                     (dst_vnfi, dst_if_name, ip2))
        for vnfi, if_name, ip in endpoints:
            # keep fixed IPs (address field in VNFDs) untouched
            if self._get_vnfd_cp_from_vnfi(vnfi, if_name).get("address") is None:
                self._vnf_reconfigure_network(vnfi, if_name, ip)
        # set the chaining
        GK.net.setChain(
            src_id, dst_id,
            vnf_src_interface=src_if_name, vnf_dst_interface=dst_if_name,
            bidirectional=BIDIRECTIONAL_CHAIN, cmd="add-flow", cookie=cookie, priority=10)
570
571 def _get_vnfd_cp_from_vnfi(self, vnfi, ifname):
572 """
573 Gets the connection point data structure from the VNFD
574 of the given VNFI using ifname.
575 """
576 if vnfi.vnfd is None:
577 return {}
578 cps = vnfi.vnfd.get("connection_points")
579 for cp in cps:
580 if cp.get("id") == ifname:
581 return cp
582
def _connect_elans(self, elan_fwd_links, instance_uuid):
    """
    Deploy all E-LAN/E-Tree links of the NSD: every link gets its own
    subnet, each attached interface gets the next free host address of
    that subnet, and the attached interfaces are handed to setLAN.
    This method supports multi-V/CDU VNFs if the connection
    point names of the DUs are the same as the ones in the NSD.
    :param elan_fwd_links: list of E-LAN links in the NSD
    :param instance_uuid: uuid of the service instance
    :return:
    """
    for link in elan_fwd_links:
        # a new E-LAN/E-Tree: fresh subnet, fresh list of members
        lan_net = ELAN_SUBNETS.pop(0)
        free_hosts = list(lan_net.hosts())
        lan_members = []

        # assign a LAN address to every interface of every involved V/CDU
        for cp_ref in link["connection_points_reference"]:
            vnf_id, intf_name = parse_interface(cp_ref)
            if vnf_id is None:
                continue  # skip references to NS connection points
            units = self._get_vnf_instance_units(instance_uuid, vnf_id)
            if units is None:
                continue  # skip if no deployment unit is present
            for unit in units:
                # Attention: simplification for multi DU VNFs: the
                # connection points of all involved DUs must carry the
                # same name as the connection point of the surrounding
                # VNF, because links specified in the VNFDs are not
                # considered.
                container_name = unit.name
                ip_address = "{0}/{1}".format(str(free_hosts.pop(0)),
                                              lan_net.prefixlen)
                LOG.debug(
                    "Setting up E-LAN/E-Tree interface. (%s:%s) -> %s" % (
                        container_name, intf_name, ip_address))
                # E-LAN relies on the learning switch capability of Ryu,
                # which has to be turned on in the topology
                # (DCNetwork(controller=RemoteController,
                # enable_learning=True)), so no explicit chaining is
                # necessary.
                vnfi = self._get_vnf_instance(instance_uuid, container_name)
                if vnfi is None:
                    continue
                self._vnf_reconfigure_network(vnfi, intf_name, ip_address)
                # remember vnf and interface for the VLAN tagging
                lan_members.append(
                    {'name': container_name, 'interface': intf_name})
        # install the VLAN tags for this E-LAN
        GK.net.setLAN(lan_members)
630
def _load_docker_files(self):
    """
    Collect the paths to the Dockerfiles referenced by the VNFDs and
    store them in self.local_docker_files (container name -> path).
    VDUs are only considered if their "vm_image_format" is "docker";
    CDUs are always considered.
    :return:
    """
    # .items() instead of .iteritems(): works on Python 2 and 3
    for vnf_id, v in self.vnfds.items():
        for vu in v.get("virtual_deployment_units", []):
            vnf_container_name = get_container_name(vnf_id, vu.get("id"))
            if vu.get("vm_image_format") == "docker":
                vm_image = vu.get("vm_image")
                docker_path = os.path.join(
                    self.package_content_path,
                    make_relative_path(vm_image))
                self.local_docker_files[vnf_container_name] = docker_path
                LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
        for cu in v.get("cloudnative_deployment_units", []):
            vnf_container_name = get_container_name(vnf_id, cu.get("id"))
            image = cu.get("image")
            docker_path = os.path.join(
                self.package_content_path,
                make_relative_path(image))
            self.local_docker_files[vnf_container_name] = docker_path
            LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
654
def _load_docker_urls(self):
    """
    Collect the URLs of the pre-built Docker images referenced by the
    VNFDs and store them in self.remote_docker_image_urls
    (container name -> URL, with a "http://" prefix removed).
    VDUs are only considered if their "vm_image_format" is "docker";
    CDUs are always considered.
    :return:
    """
    def remember(vnf_container_name, url):
        # strip the scheme; a missing image entry is stored as None
        if url is not None:
            url = url.replace("http://", "")
        self.remote_docker_image_urls[vnf_container_name] = url
        LOG.debug("Found Docker image URL (%r): %r" %
                  (vnf_container_name,
                   self.remote_docker_image_urls[vnf_container_name]))

    # .items() instead of .iteritems(): works on Python 2 and 3
    for vnf_id, v in self.vnfds.items():
        for vu in v.get("virtual_deployment_units", []):
            vnf_container_name = get_container_name(vnf_id, vu.get("id"))
            if vu.get("vm_image_format") == "docker":
                remember(vnf_container_name, vu.get("vm_image"))
        for cu in v.get("cloudnative_deployment_units", []):
            vnf_container_name = get_container_name(vnf_id, cu.get("id"))
            remember(vnf_container_name, cu.get("image"))
680
def _build_images_from_dockerfiles(self):
    """
    Build a Docker image for each local Dockerfile found in the package
    (self.local_docker_files); the container name is used as image tag.
    Does nothing in standalone mode.
    """
    if GK_STANDALONE_MODE:
        return  # do not build anything in standalone mode
    dc = DockerClient()
    LOG.info("Building %d Docker images (this may take several minutes) ..." % len(
        self.local_docker_files))
    # .items() instead of .iteritems(): works on Python 2 and 3
    for tag, dockerfile_path in self.local_docker_files.items():
        # the build context is the path with "Dockerfile" removed
        build_dir = dockerfile_path.replace("Dockerfile", "")
        # DockerClient itself has no build() in Docker SDK >= 2.0; the
        # low-level client (dc.api) offers it and yields the build output
        for line in dc.api.build(
                path=build_dir, tag=tag, rm=False, nocache=False):
            LOG.debug("DOCKER BUILD: %s" % line)
        LOG.info("Docker image created: %s" % tag)
695
def _pull_predefined_dockerimages(self):
    """
    Pull the pre-built Docker images referenced by the package
    (self.remote_docker_image_urls) using the docker CLI.
    Images that are already present are skipped unless FORCE_PULL is set.
    A failing pull is logged but does not abort.
    """
    dc = DockerClient()
    # .values() instead of .itervalues(): works on Python 2 and 3
    for url in self.remote_docker_image_urls.values():
        # only pull if not present (speedup for development)
        if not FORCE_PULL:
            if len(dc.images.list(name=url)) > 0:
                LOG.debug("Image %r present. Skipping pull." % url)
                continue
        LOG.info("Pulling image: %r" % url)
        # dc.images.pull(url, insecure_registry=True) seems to fail with
        # docker api version 2.0.2, so the docker cli is used instead
        cmd = ["docker",
               "pull",
               url,
               ]
        exit_code = Popen(cmd).wait()
        if exit_code != 0:
            LOG.warning("Pulling image %r failed (docker exit code: %r)"
                        % (url, exit_code))
717
def _check_docker_image_exists(self, image_name):
    """
    Ask the docker service whether an image with the given name exists.
    :param image_name: name of the docker image
    :return: True if at least one matching image is listed, else False
    """
    matching_images = DockerClient().images.list(name=image_name)
    return len(matching_images) > 0
725
def _calculate_placement(self, algorithm):
    """
    Do placement by adding a field "dc" to each VNFD that points to one
    of our data center objects known to the gatekeeper.
    :param algorithm: placement class; it is instantiated without
                      arguments and its place(nsd, vnfds, dcs) is called
    """
    assert(len(self.vnfds) > 0)
    assert(len(GK.dcs) > 0)
    # instantiate the algorithm and place
    p = algorithm()
    p.place(self.nsd, self.vnfds, GK.dcs)
    LOG.info("Using placement algorithm: %r" % p.__class__.__name__)
    # print the placement result
    # (.items() instead of .iteritems(): works on Python 2 and 3)
    for name, vnfd in self.vnfds.items():
        LOG.info("Placed VNF %r on DC %r" % (name, str(vnfd.get("dc"))))
741
742 def _calculate_cpu_cfs_values(self, cpu_time_percentage):
743 """
744 Calculate cpu period and quota for CFS
745 :param cpu_time_percentage: percentage of overall CPU to be used
746 :return: cpu_period, cpu_quota
747 """
748 if cpu_time_percentage is None:
749 return -1, -1
750 if cpu_time_percentage < 0:
751 return -1, -1
752 # (see: https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt)
753 # Attention minimum cpu_quota is 1ms (micro)
754 cpu_period = 1000000 # lets consider a fixed period of 1000000 microseconds for now
755 LOG.debug("cpu_period is %r, cpu_percentage is %r" %
756 (cpu_period, cpu_time_percentage))
757 # calculate the fraction of cpu time for this container
758 cpu_quota = cpu_period * cpu_time_percentage
759 # ATTENTION >= 1000 to avoid a invalid argument system error ... no
760 # idea why
761 if cpu_quota < 1000:
762 LOG.debug("cpu_quota before correcting: %r" % cpu_quota)
763 cpu_quota = 1000
764 LOG.warning("Increased CPU quota to avoid system error.")
765 LOG.debug("Calculated: cpu_period=%f / cpu_quota=%f" %
766 (cpu_period, cpu_quota))
767 return int(cpu_period), int(cpu_quota)
768
769
770 """
771 Some (simple) placement algorithms
772 """
773
774
class FirstDcPlacement(object):
    """
    Placement: Always use one and the same data center from the GK.dcs dict.
    """

    def place(self, nsd, vnfds, dcs):
        """
        Set the "dc" field of every VNFD to the first data center.
        :param nsd: service descriptor (not used by this algorithm)
        :param vnfds: dict of VNFD dicts; modified in place
        :param dcs: dict of available data centers
        """
        if not vnfds:
            return  # nothing to place (dcs is not touched)
        # .values() instead of .itervalues(): works on Python 2 and 3
        first_dc = list(dcs.values())[0]
        for vnfd in vnfds.values():
            vnfd["dc"] = first_dc
783
784
class RoundRobinDcPlacement(object):
    """
    Placement: Distribute VNFs across all available DCs in a round robin fashion.
    """

    def place(self, nsd, vnfds, dcs):
        """
        Set the "dc" field of the VNFDs, cycling through the data centers.
        :param nsd: service descriptor (not used by this algorithm)
        :param vnfds: dict of VNFD dicts; modified in place
        :param dcs: dict of available data centers
        """
        # .values() instead of .itervalues(): works on Python 2 and 3
        dcs_list = list(dcs.values())
        for c, vnfd in enumerate(vnfds.values()):
            # the modulo wraps around to the first DC again
            vnfd["dc"] = dcs_list[c % len(dcs_list)]
796
797
798 """
799 Resource definitions and API endpoints
800 """
801
802
class Packages(fr.Resource):
    """
    REST resource to upload service packages and to list the uploaded ones.
    """

    def post(self):
        """
        Upload a *.son service package to the dummy gatekeeper.

        We expect request with a *.son file and store it in UPLOAD_FOLDER.
        The package is registered (and on-boarded) as a new service;
        depending on AUTO_DELETE / AUTO_DEPLOY running services are
        stopped before and the new service is started afterwards.
        :return: dict with service_uuid, size, sha1 and error plus the
                 HTTP status code (201 on success, 500 on failure)
        """
        try:
            # get file contents
            LOG.info("POST /packages called")
            # lets search for the package in the request
            is_file_object = False  # make API more robust: file can be in data or in files field
            if "package" in request.files:
                son_file = request.files["package"]
                is_file_object = True
            elif len(request.data) > 0:
                son_file = request.data
            else:
                return {"service_uuid": None, "size": 0, "sha1": None,
                        "error": "upload failed. file not found."}, 500
            # generate a uuid to reference this package
            service_uuid = str(uuid.uuid4())
            # sha1 needs bytes: raw request data is hashed as is, anything
            # else via its string representation.
            # NOTE(review): for file objects this hashes str(son_file),
            # not the file content (behavior kept as before).
            hash_input = son_file if isinstance(son_file, bytes) else str(son_file)
            if not isinstance(hash_input, bytes):
                hash_input = hash_input.encode("utf-8")
            file_hash = hashlib.sha1(hash_input).hexdigest()
            # ensure that upload folder exists
            ensure_dir(UPLOAD_FOLDER)
            upload_path = os.path.join(UPLOAD_FOLDER, "%s.tgo" % service_uuid)
            # store *.son file to disk
            if is_file_object:
                son_file.save(upload_path)
            else:
                with open(upload_path, 'wb') as f:
                    f.write(son_file)
            size = os.path.getsize(upload_path)

            # first stop and delete any other running services
            if AUTO_DELETE:
                # iterate over snapshots: stop_service() modifies the
                # instances dict. The loop variable must not be called
                # service_uuid, otherwise the new package would be
                # registered under the uuid of an already known service.
                for old_service_uuid in list(GK.services):
                    old_service = GK.services[old_service_uuid]
                    for instance_uuid in list(old_service.instances):
                        # valid service and instance UUID, stop service
                        old_service.stop_service(instance_uuid)
                        LOG.info("service instance with uuid %r stopped." %
                                 instance_uuid)

            # create a service object and register it
            s = Service(service_uuid, file_hash, upload_path)
            GK.register_service_package(service_uuid, s)

            # automatically deploy the service
            if AUTO_DEPLOY:
                # ok, we have a service uuid, lets start the service
                reset_subnets()
                GK.services.get(service_uuid).start_service()

            # generate the JSON result
            return {"service_uuid": service_uuid, "size": size,
                    "sha1": file_hash, "error": None}, 201
        except BaseException:
            # BaseException on purpose: OnBoardingException derives
            # directly from it
            LOG.exception("Service package upload failed:")
            return {"service_uuid": None, "size": 0,
                    "sha1": None, "error": "upload failed"}, 500

    def get(self):
        """
        Return a list of UUID's of uploaded service packages.
        :return: dict/list
        """
        LOG.info("GET /packages")
        # list(dict) instead of .iterkeys(): works on Python 2 and 3
        return {"service_uuid_list": list(GK.services)}
877
878
879 class Instantiations(fr.Resource):
880
def post(self):
    """
    Instantiate a service specified by its UUID or its name.
    A given "service_name" takes precedence over "service_uuid"; without
    any of them (or with uuid "latest") the first registered service is
    started.
    Will return a new UUID to identify the running service instance.
    :return: UUID
    """
    LOG.info("POST /instantiations (or /requests) called")
    # try to extract the service uuid from the request
    json_data = request.get_json(force=True)
    service_uuid = json_data.get("service_uuid")
    service_name = json_data.get("service_name")

    # first try to find by service_name
    if service_name is not None:
        # .items() instead of .iteritems(): works on Python 2 and 3
        for s_uuid, s in GK.services.items():
            # the manifest is None if on-boarding of the service failed
            if s.manifest is None:
                continue
            if s.manifest.get("name") == service_name:
                LOG.info("Found service: {} with UUID: {}"
                         .format(service_name, s_uuid))
                service_uuid = s_uuid
    # lets be a bit fuzzy here to make testing easier
    if (service_uuid is None or service_uuid ==
            "latest") and len(GK.services) > 0:
        # if we don't get a service uuid, we simple start the first service
        # in the list
        service_uuid = list(GK.services)[0]
    if service_uuid in GK.services:
        # ok, we have a service uuid, lets start the service
        service_instance_uuid = GK.services.get(
            service_uuid).start_service()
        return {"service_instance_uuid": service_instance_uuid}, 201
    return "Service not found", 404
912
913 def get(self):
914 """
915 Returns a list of UUIDs containing all running services.
916 :return: dict / list
917 """
918 LOG.info("GET /instantiations")
919 return {"service_instantiations_list": [
920 list(s.instances.iterkeys()) for s in GK.services.itervalues()]}
921
922 def delete(self):
923 """
924 Stops a running service specified by its service and instance UUID.
925 """
926 # try to extract the service and instance UUID from the request
927 json_data = request.get_json(force=True)
928 service_uuid_input = json_data.get("service_uuid")
929 instance_uuid_input = json_data.get("service_instance_uuid")
930 if len(GK.services) < 1:
931 return "No service on-boarded.", 404
932 # try to be fuzzy
933 if service_uuid_input is None:
934 # if we don't get a service uuid we stop all services
935 service_uuid_list = list(GK.services.iterkeys())
936 LOG.info("No service_uuid given, stopping all.")
937 else:
938 service_uuid_list = [service_uuid_input]
939 # for each service
940 for service_uuid in service_uuid_list:
941 if instance_uuid_input is None:
942 instance_uuid_list = list(
943 GK.services[service_uuid].instances.iterkeys())
944 else:
945 instance_uuid_list = [instance_uuid_input]
946 # for all service instances
947 for instance_uuid in instance_uuid_list:
948 if (service_uuid in GK.services and
949 instance_uuid in GK.services[service_uuid].instances):
950 # valid service and instance UUID, stop service
951 GK.services.get(service_uuid).stop_service(instance_uuid)
952 LOG.info("Service instance with uuid %r stopped." % instance_uuid)
953 return "Service(s) stopped.", 200
954
955
class Exit(fr.Resource):
    """
    REST resource used to shut the emulator down.
    """

    def put(self):
        """
        Stop the running Containernet instance regardless of data transmitted.

        Calls stop() on the `net` attribute of the first datacenter found
        in GK.dcs; the request body is never read.
        """
        datacenters = list(GK.dcs.values())
        first_dc = datacenters[0]
        first_dc.net.stop()
963
964
def generate_subnets(prefix, base, subnet_size=50, mask=24):
    """
    Generate a list of consecutive IP networks.

    Builds the network "<prefix>.<n>.0/<mask>" for every n in the range
    [base, base + subnet_size).

    :param prefix: leading part of each network address, e.g. '30.0'
    :param base: first value of the varying part
    :param subnet_size: number of networks to generate
    :param mask: prefix length appended to each network
    :return: list of network objects created by ipaddress.ip_network
    """
    # A unicode literal makes the formatted address a text string on
    # Python 2 (what the original unicode() call achieved) and is a plain
    # str on Python 3, where the `unicode` builtin does not exist.
    template = u"{0}.{1}.0/{2}"
    return [ipaddress.ip_network(template.format(prefix, net, mask))
            for net in range(base, base + subnet_size)]
972
973
def reset_subnets():
    """
    Rebuild the module-level subnet lists ELAN_SUBNETS and ELINE_SUBNETS.

    Both globals are overwritten with freshly generated lists of 50
    networks with a /24 mask.
    """
    global ELAN_SUBNETS
    global ELINE_SUBNETS
    # private subnet definitions for the generated interfaces
    pool_shape = dict(base=0, subnet_size=50, mask=24)
    # 30.0.xxx.0/24
    ELAN_SUBNETS = generate_subnets('30.0', **pool_shape)
    # 20.0.xxx.0/24
    ELINE_SUBNETS = generate_subnets('20.0', **pool_shape)
982
983
def initialize_GK():
    """
    Replace the module-level GK with a newly constructed Gatekeeper.
    """
    global GK
    fresh_gatekeeper = Gatekeeper()
    GK = fresh_gatekeeper
987
988
# create a single, global GK object
GK = None
initialize_GK()

# setup Flask
app = Flask(__name__)
# 512 MB max upload
app.config.update(MAX_CONTENT_LENGTH=512 * 1024 * 1024)
api = fr.Api(app)

# define endpoints
api.add_resource(
    Packages,
    '/packages',
    '/api/v2/packages',
)
api.add_resource(
    Instantiations,
    '/instantiations',
    '/api/v2/instantiations',
    '/api/v2/requests',
)
api.add_resource(
    Exit,
    '/emulator/exit',
)
1001
1002
def start_rest_api(host, port, datacenters=None):
    """
    Attach the datacenters to the global GK and run the Flask REST API.

    :param host: address passed to app.run()
    :param port: port passed to app.run()
    :param datacenters: dict stored in GK.dcs; a new empty dict is used
        when omitted
    :raises AssertionError: raised by get_dc_network() when `datacenters`
        is empty
    """
    # Create the default per call: a `dict()` default in the signature is a
    # single shared object that every call would store in GK.dcs.
    if datacenters is None:
        datacenters = dict()
    GK.dcs = datacenters
    GK.net = get_dc_network()
    # start the Flask server (not the best performance but ok for our use case)
    # NOTE(review): debug=True turns on Flask's debug mode; confirm this
    # endpoint is never reachable from untrusted networks.
    app.run(host=host,
            port=port,
            debug=True,
            use_reloader=False  # this is needed to run Flask in a non-main thread
            )
1012
1013
def ensure_dir(name):
    """
    Create directory `name` (including missing parents) unless the path
    already exists.

    :param name: path of the directory to create
    :raises OSError: if creation fails and `name` still does not exist
    """
    try:
        os.makedirs(name)
    except OSError:
        # Create first, check afterwards: this cannot fail when something
        # else creates the path between a separate exists() check and
        # makedirs(). As before, any already-existing path is accepted.
        if not os.path.exists(name):
            raise
1017
1018
def load_yaml(path):
    """
    Parse the YAML file at `path`.

    :param path: path of the file to read
    :return: the parsed document, or an empty dict if parsing raised
        yaml.YAMLError (the error is logged)
    """
    with open(path, "r") as stream:
        try:
            # NOTE(review): yaml.load() without an explicit Loader can build
            # arbitrary Python objects; consider yaml.safe_load if the
            # parsed files can come from untrusted uploads.
            return yaml.load(stream)
        except yaml.YAMLError as parse_error:
            LOG.exception("YAML parse error: %r" % str(parse_error))
            return dict()
1027
1028
def make_relative_path(path):
    """
    Strip a leading "file://" scheme and then one leading "/" from `path`.

    :param path: path string, optionally prefixed with "file://"
    :return: the path without the scheme and without one leading slash
    """
    scheme = "file://"
    if path.startswith(scheme):
        # the prefix sits at index 0, so slicing removes exactly it
        path = path[len(scheme):]
    if path.startswith("/"):
        path = path[1:]
    return path
1035
1036
def get_dc_network():
    """
    retrieve the DCnetwork where this dummygatekeeper (GK) connects to.
    Assume at least 1 datacenter is connected to this GK, and that all datacenters belong to the same DCNetwork
    :return: the `net` attribute of the first datacenter in GK.dcs
    """
    assert (len(GK.dcs) > 0)
    # values() cannot be indexed on Python 3, so build a list first
    # (same form as used by Exit.put).
    return list(GK.dcs.values())[0].net
1045
1046
def parse_interface(interface_name):
    """
    convert the interface name in the nsd to the according vnf_id, vnf_interface names

    :param interface_name: either "<vnf_id>:<vnf_interface>" or a bare
        interface name
    :return: tuple (vnf_id, vnf_interface); vnf_id is None when the name
        contains no ':'
    """
    if ':' not in interface_name:
        return None, interface_name
    # unpacking into two names requires exactly one ':' in the input
    vnf_id, vnf_interface = interface_name.split(':')
    return vnf_id, vnf_interface
1059
1060
def get_container_name(vnf_id, vdu_id):
    """
    Build a container name of the form "<vnf_id>.<vdu_id>".

    :param vnf_id: first component of the name
    :param vdu_id: second component of the name
    :return: both components joined by a dot
    """
    name_template = "{}.{}"
    return name_template.format(vnf_id, vdu_id)
1063
1064
if __name__ == '__main__':
    # Lets allow to run the API in standalone mode.
    GK_STANDALONE_MODE = True
    werkzeug_logger = logging.getLogger("werkzeug")
    werkzeug_logger.setLevel(logging.INFO)
    start_rest_api("0.0.0.0", 8000)