b3916bb74afc84151d081e899fa47eeecee74d94
[osm/vim-emu.git] / src / emuvim / api / tango / llcm.py
1 # Copyright (c) 2018 SONATA-NFV, 5GTANGO and Paderborn University
2 # ALL RIGHTS RESERVED.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Neither the name of the SONATA-NFV, 5GTANGO, Paderborn University
17 # nor the names of its contributors may be used to endorse or promote
18 # products derived from this software without specific prior written
19 # permission.
20 #
21 # This work has been performed in the framework of the SONATA project,
22 # funded by the European Commission under Grant number 671517 through
23 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
24 # acknowledge the contributions of their colleagues of the SONATA
25 # partner consortium (www.sonata-nfv.eu).
26 #
27 # This work has also been performed in the framework of the 5GTANGO project,
28 # funded by the European Commission under Grant number 761493 through
29 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
30 # acknowledge the contributions of their colleagues of the 5GTANGO
31 # partner consortium (www.5gtango.eu).
32 import logging
33 import os
34 import uuid
35 import hashlib
36 import zipfile
37 import yaml
38 import threading
39 from docker import DockerClient
40 from flask import Flask, request
41 import flask_restful as fr
42 from subprocess import Popen
43 import ipaddress
44 import copy
45 import time
46
47
48 LOG = logging.getLogger("5gtango.llcm")
49 LOG.setLevel(logging.INFO)
50
51
52 GK_STORAGE = "/tmp/vim-emu-tango-llcm/"
53 UPLOAD_FOLDER = os.path.join(GK_STORAGE, "uploads/")
54 CATALOG_FOLDER = os.path.join(GK_STORAGE, "catalog/")
55
56 # Enable Dockerfile build functionality
57 BUILD_DOCKERFILE = False
58
59 # flag to indicate that we run without the emulator (only the bare API for
60 # integration testing)
61 GK_STANDALONE_MODE = False
62
63 # should a new version of an image be pulled even if its available
64 FORCE_PULL = False
65
66 # flag to indicate if we use bidirectional forwarding rules in the
67 # automatic chaining process
68 BIDIRECTIONAL_CHAIN = True
69
70 # override the management interfaces in the descriptors with default
71 # docker0 interfaces in the containers
72 USE_DOCKER_MGMT = False
73
74 # automatically deploy uploaded packages (no need to execute son-access
75 # deploy --latest separately)
76 AUTO_DEPLOY = False
77
78 # and also automatically terminate any other running services
79 AUTO_DELETE = False
80
81 # global subnet definitions (see reset_subnets())
82 ELAN_SUBNETS = None
83 ELINE_SUBNETS = None
84
85 # Time in seconds to wait for vnf stop scripts to execute fully
86 VNF_STOP_WAIT_TIME = 5
87
88
89 class OnBoardingException(BaseException):
90 pass
91
92
93 class Gatekeeper(object):
94
95 def __init__(self):
96 self.services = dict()
97 self.dcs = dict()
98 self.net = None
99 # used to generate short names for VNFs (Mininet limitation)
100 self.vnf_counter = 0
101 reset_subnets()
102 LOG.info("Initialized 5GTANGO LLCM module.")
103
104 def register_service_package(self, service_uuid, service):
105 """
106 register new service package
107 :param service_uuid
108 :param service object
109 """
110 self.services[service_uuid] = service
111 # lets perform all steps needed to onboard the service
112 service.onboard()
113
114
115 class Service(object):
116 """
117 This class represents a NS uploaded as a *.son package to the
118 dummy gatekeeper.
119 Can have multiple running instances of this service.
120 """
121
122 def __init__(self,
123 service_uuid,
124 package_file_hash,
125 package_file_path):
126 self.uuid = service_uuid
127 self.package_file_hash = package_file_hash
128 self.package_file_path = package_file_path
129 self.package_content_path = os.path.join(
130 CATALOG_FOLDER, "services/%s" % self.uuid)
131 self.manifest = None
132 self.nsd = None
133 self.vnfds = dict()
134 self.local_docker_files = dict()
135 self.remote_docker_image_urls = dict()
136 self.instances = dict()
137
138 def onboard(self):
139 """
140 Do all steps to prepare this service to be instantiated
141 :return:
142 """
143 # 1. extract the contents of the package and store them in our catalog
144 self._unpack_service_package()
145 # 2. read in all descriptor files
146 self._load_package_descriptor()
147 self._load_nsd()
148 self._load_vnfd()
149 if self.nsd is None:
150 raise OnBoardingException("No NSD found.")
151 if len(self.vnfds) < 1:
152 raise OnBoardingException("No VNFDs found.")
153 # 3. prepare container images (e.g. download or build Dockerfile)
154 if BUILD_DOCKERFILE:
155 self._load_docker_files()
156 self._build_images_from_dockerfiles()
157 else:
158 self._load_docker_urls()
159 self._pull_predefined_dockerimages()
160 LOG.info("On-boarded service: %r" % self.manifest.get("name"))
161
162 def start_service(self):
163 """
164 This methods creates and starts a new service instance.
165 It computes placements, iterates over all VNFDs, and starts
166 each VNFD as a Docker container in the data center selected
167 by the placement algorithm.
168 :return:
169 """
170 LOG.info("Starting service %r" % self.uuid)
171
172 # 1. each service instance gets a new uuid to identify it
173 instance_uuid = str(uuid.uuid4())
174 # build a instances dict (a bit like a NSR :))
175 self.instances[instance_uuid] = dict()
176 self.instances[instance_uuid]["vnf_instances"] = list()
177
178 # 2. compute placement of this service instance (adds DC names to
179 # VNFDs)
180 # self._calculate_placement(FirstDcPlacement)
181 self._calculate_placement(RoundRobinDcPlacement)
182 # 3. start all vnfds that we have in the service
183 for vnf_id in self.vnfds:
184 vnfd = self.vnfds[vnf_id]
185 # attention: returns a list of started deployment units
186 vnfis = self._start_vnfd(vnfd, vnf_id)
187 # add list of VNFIs to total VNFI list
188 self.instances[instance_uuid]["vnf_instances"].extend(vnfis)
189
190 # 4. Deploy E-Line, E-Tree and E-LAN links
191 # Attention: Only done if ""forwarding_graphs" section in NSD exists,
192 # even if "forwarding_graphs" are not used directly.
193 if "virtual_links" in self.nsd and "forwarding_graphs" in self.nsd:
194 vlinks = self.nsd["virtual_links"]
195 # constituent virtual links are not checked
196 eline_fwd_links = [l for l in vlinks if (
197 l["connectivity_type"] == "E-Line")]
198 elan_fwd_links = [l for l in vlinks if (
199 l["connectivity_type"] == "E-LAN" or
200 l["connectivity_type"] == "E-Tree")] # Treat E-Tree as E-LAN
201
202 # 5a. deploy E-Line links
203 GK.net.deployed_elines.extend(eline_fwd_links) # bookkeeping
204 self._connect_elines(eline_fwd_links, instance_uuid)
205 # 5b. deploy E-Tree/E-LAN links
206 GK.net.deployed_elans.extend(elan_fwd_links) # bookkeeping
207 self._connect_elans(elan_fwd_links, instance_uuid)
208
209 # 6. run the emulator specific entrypoint scripts in the VNFIs of this
210 # service instance
211 self._trigger_emulator_start_scripts_in_vnfis(
212 self.instances[instance_uuid]["vnf_instances"])
213 # done
214 LOG.info("Service started. Instance id: %r" % instance_uuid)
215 return instance_uuid
216
217 def stop_service(self, instance_uuid):
218 """
219 This method stops a running service instance.
220 It iterates over all VNF instances, stopping them each
221 and removing them from their data center.
222 :param instance_uuid: the uuid of the service instance to be stopped
223 """
224 LOG.info("Stopping service %r" % self.uuid)
225 # get relevant information
226 # instance_uuid = str(self.uuid.uuid4())
227 vnf_instances = self.instances[instance_uuid]["vnf_instances"]
228 # trigger stop skripts in vnf instances and wait a few seconds for
229 # completion
230 self._trigger_emulator_stop_scripts_in_vnfis(vnf_instances)
231 time.sleep(VNF_STOP_WAIT_TIME)
232 # stop all vnfs
233 for v in vnf_instances:
234 self._stop_vnfi(v)
235 # last step: remove the instance from the list of all instances
236 del self.instances[instance_uuid]
237
238 def _get_resource_limits(self, deployment_unit):
239 """
240 Extract resource limits from deployment units.
241 """
242 # defaults
243 cpu_list = None
244 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(1.0))
245 mem_limit = None
246 # update from descriptor
247 if "resource_requirements" in deployment_unit:
248 res_req = deployment_unit.get("resource_requirements")
249 cpu_list = res_req.get("cpu").get("cpuset")
250 if cpu_list is None:
251 cpu_list = res_req.get("cpu").get("vcpus")
252 if cpu_list is not None:
253 # attention: docker expects list as string w/o spaces:
254 cpu_list = str(cpu_list).replace(" ", "").strip()
255 cpu_bw = res_req.get("cpu").get("cpu_bw")
256 if cpu_bw is None:
257 cpu_bw = 1.0
258 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(cpu_bw))
259 mem_limit = res_req.get("memory").get("size")
260 mem_unit = str(res_req.get("memory").get("size_unit", "GB"))
261 if mem_limit is not None:
262 mem_limit = int(mem_limit)
263 # to bytes
264 if "G" in mem_unit:
265 mem_limit = mem_limit * 1024 * 1024 * 1024
266 elif "M" in mem_unit:
267 mem_limit = mem_limit * 1024 * 1024
268 elif "K" in mem_unit:
269 mem_limit = mem_limit * 1024
270 return cpu_list, cpu_period, cpu_quota, mem_limit
271
272 def _start_vnfd(self, vnfd, vnf_id, **kwargs):
273 """
274 Start a single VNFD of this service
275 :param vnfd: vnfd descriptor dict
276 :param vnf_id: unique id of this vnf in the nsd
277 :return:
278 """
279 vnfis = list()
280 # the vnf_name refers to the container image to be deployed
281 vnf_name = vnfd.get("name")
282 # combine VDUs and CDUs
283 deployment_units = (vnfd.get("virtual_deployment_units", []) +
284 vnfd.get("cloudnative_deployment_units", []))
285 # iterate over all deployment units within each VNFDs
286 for u in deployment_units:
287 # 0. vnf_container_name = vnf_id.vdu_id
288 vnf_container_name = get_container_name(vnf_id, u.get("id"))
289 # 1. get the name of the docker image to star
290 if vnf_container_name not in self.remote_docker_image_urls:
291 raise Exception("No image name for %r found. Abort." % vnf_container_name)
292 docker_image_name = self.remote_docker_image_urls.get(vnf_container_name)
293 # 2. select datacenter to start the VNF in
294 target_dc = vnfd.get("dc")
295 # 3. perform some checks to ensure we can start the container
296 assert(docker_image_name is not None)
297 assert(target_dc is not None)
298 if not self._check_docker_image_exists(docker_image_name):
299 raise Exception("Docker image {} not found. Abort."
300 .format(docker_image_name))
301
302 # 4. get the resource limits
303 cpu_list, cpu_period, cpu_quota, mem_limit = self._get_resource_limits(u)
304
305 # get connection points defined for the DU
306 intfs = u.get("connection_points", [])
307 # do some re-naming of fields to be compatible to containernet
308 for i in intfs:
309 if i.get("address"):
310 i["ip"] = i.get("address")
311
312 # 5. collect additional information to start container
313 volumes = list()
314 cenv = dict()
315 # 5.1 inject descriptor based start/stop commands into env (overwrite)
316 VNFD_CMD_START = u.get("vm_cmd_start")
317 VNFD_CMD_STOP = u.get("vm_cmd_stop")
318 if VNFD_CMD_START and not VNFD_CMD_START == "None":
319 LOG.info("Found 'vm_cmd_start'='{}' in VNFD.".format(VNFD_CMD_START) +
320 " Overwriting SON_EMU_CMD.")
321 cenv["SON_EMU_CMD"] = VNFD_CMD_START
322 if VNFD_CMD_STOP and not VNFD_CMD_STOP == "None":
323 LOG.info("Found 'vm_cmd_start'='{}' in VNFD.".format(VNFD_CMD_STOP) +
324 " Overwriting SON_EMU_CMD_STOP.")
325 cenv["SON_EMU_CMD_STOP"] = VNFD_CMD_STOP
326
327 # 6. Start the container
328 LOG.info("Starting %r as %r in DC %r" %
329 (vnf_name, vnf_container_name, vnfd.get("dc")))
330 LOG.debug("Interfaces for %r: %r" % (vnf_id, intfs))
331 # start the container
332 vnfi = target_dc.startCompute(
333 vnf_container_name,
334 network=intfs,
335 image=docker_image_name,
336 cpu_quota=cpu_quota,
337 cpu_period=cpu_period,
338 cpuset_cpus=cpu_list,
339 mem_limit=mem_limit,
340 volumes=volumes,
341 properties=cenv, # environment
342 type=kwargs.get('type', 'docker'))
343 # add vnfd reference to vnfi
344 vnfi.vnfd = vnfd
345 # add container name
346 vnfi.vnf_container_name = vnf_container_name
347 # store vnfi
348 vnfis.append(vnfi)
349 return vnfis
350
351 def _stop_vnfi(self, vnfi):
352 """
353 Stop a VNF instance.
354 :param vnfi: vnf instance to be stopped
355 """
356 # Find the correct datacenter
357 status = vnfi.getStatus()
358 dc = vnfi.datacenter
359 # stop the vnfi
360 LOG.info("Stopping the vnf instance contained in %r in DC %r" %
361 (status["name"], dc))
362 dc.stopCompute(status["name"])
363
364 def _get_vnf_instance(self, instance_uuid, vnf_id):
365 """
366 Returns VNFI object for a given "vnf_id" or "vnf_container_namse" taken from an NSD.
367 :return: single object
368 """
369 for vnfi in self.instances[instance_uuid]["vnf_instances"]:
370 if str(vnfi.name) == str(vnf_id):
371 return vnfi
372 LOG.warning("No container with name: {0} found.".format(vnf_id))
373 return None
374
375 def _get_vnf_instance_units(self, instance_uuid, vnf_id):
376 """
377 Returns a list of VNFI objects (all deployment units) for a given
378 "vnf_id" taken from an NSD.
379 :return: list
380 """
381 r = list()
382 for vnfi in self.instances[instance_uuid]["vnf_instances"]:
383 if vnf_id in vnfi.name:
384 r.append(vnfi)
385 if len(r) > 0:
386 LOG.debug("Found units: {} for vnf_id: {}"
387 .format([i.name for i in r], vnf_id))
388 return r
389 LOG.warning("No container(s) with name: {0} found.".format(vnf_id))
390 return None
391
392 @staticmethod
393 def _vnf_reconfigure_network(vnfi, if_name, net_str=None, new_name=None):
394 """
395 Reconfigure the network configuration of a specific interface
396 of a running container.
397 :param vnfi: container instance
398 :param if_name: interface name
399 :param net_str: network configuration string, e.g., 1.2.3.4/24
400 :return:
401 """
402 # assign new ip address
403 if net_str is not None:
404 intf = vnfi.intf(intf=if_name)
405 if intf is not None:
406 intf.setIP(net_str)
407 LOG.debug("Reconfigured network of %s:%s to %r" %
408 (vnfi.name, if_name, net_str))
409 else:
410 LOG.warning("Interface not found: %s:%s. Network reconfiguration skipped." % (
411 vnfi.name, if_name))
412
413 if new_name is not None:
414 vnfi.cmd('ip link set', if_name, 'down')
415 vnfi.cmd('ip link set', if_name, 'name', new_name)
416 vnfi.cmd('ip link set', new_name, 'up')
417 LOG.debug("Reconfigured interface name of %s:%s to %s" %
418 (vnfi.name, if_name, new_name))
419
420 def _trigger_emulator_start_scripts_in_vnfis(self, vnfi_list):
421 for vnfi in vnfi_list:
422 config = vnfi.dcinfo.get("Config", dict())
423 env = config.get("Env", list())
424 for env_var in env:
425 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
426 # LOG.debug("%r = %r" % (var, cmd))
427 if var == "SON_EMU_CMD" or var == "VIM_EMU_CMD":
428 LOG.info("Executing script in '{}': {}={}"
429 .format(vnfi.name, var, cmd))
430 # execute command in new thread to ensure that GK is not
431 # blocked by VNF
432 t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
433 t.daemon = True
434 t.start()
435 break # only execute one command
436
437 def _trigger_emulator_stop_scripts_in_vnfis(self, vnfi_list):
438 for vnfi in vnfi_list:
439 config = vnfi.dcinfo.get("Config", dict())
440 env = config.get("Env", list())
441 for env_var in env:
442 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
443 if var == "SON_EMU_CMD_STOP" or var == "VIM_EMU_CMD_STOP":
444 LOG.info("Executing script in '{}': {}={}"
445 .format(vnfi.name, var, cmd))
446 # execute command in new thread to ensure that GK is not
447 # blocked by VNF
448 t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
449 t.daemon = True
450 t.start()
451 break # only execute one command
452
453 def _unpack_service_package(self):
454 """
455 unzip *.son file and store contents in CATALOG_FOLDER/services/<service_uuid>/
456 """
457 LOG.info("Unzipping: %r" % self.package_file_path)
458 with zipfile.ZipFile(self.package_file_path, "r") as z:
459 z.extractall(self.package_content_path)
460
461 def _load_package_descriptor(self):
462 """
463 Load the main package descriptor YAML and keep it as dict.
464 :return:
465 """
466 self.manifest = load_yaml(
467 os.path.join(
468 self.package_content_path, "TOSCA-Metadata/NAPD.yaml"))
469
470 def _load_nsd(self):
471 """
472 Load the entry NSD YAML and keep it as dict.
473 :return:
474 """
475 if "package_content" in self.manifest:
476 nsd_path = None
477 for f in self.manifest.get("package_content"):
478 if f.get("content-type") == "application/vnd.5gtango.nsd":
479 nsd_path = os.path.join(
480 self.package_content_path,
481 make_relative_path(f.get("source")))
482 break # always use the first NSD for now
483 if nsd_path is None:
484 raise OnBoardingException("No NSD with type 'application/vnd.5gtango.nsd' found.")
485 self.nsd = load_yaml(nsd_path)
486 GK.net.deployed_nsds.append(self.nsd) # TODO this seems strange (remove?)
487 LOG.debug("Loaded NSD: %r" % self.nsd.get("name"))
488 else:
489 raise OnBoardingException(
490 "No 'package_content' section in package manifest:\n{}"
491 .format(self.manifest))
492
493 def _load_vnfd(self):
494 """
495 Load all VNFD YAML files referenced in MANIFEST.MF and keep them in dict.
496 :return:
497 """
498 # first make a list of all the vnfds in the package
499 vnfd_set = dict()
500 if "package_content" in self.manifest:
501 for pc in self.manifest.get("package_content"):
502 if pc.get(
503 "content-type") == "application/vnd.5gtango.vnfd":
504 vnfd_path = os.path.join(
505 self.package_content_path,
506 make_relative_path(pc.get("source")))
507 vnfd = load_yaml(vnfd_path)
508 vnfd_set[vnfd.get("name")] = vnfd
509 if len(vnfd_set) < 1:
510 raise OnBoardingException("No VNFDs found.")
511 # then link each vnf_id in the nsd to its vnfd
512 for v in self.nsd.get("network_functions"):
513 if v.get("vnf_name") in vnfd_set:
514 self.vnfds[v.get("vnf_id")] = vnfd_set[v.get("vnf_name")]
515 LOG.debug("Loaded VNFD: {0} id: {1}"
516 .format(v.get("vnf_name"), v.get("vnf_id")))
517
518 def _connect_elines(self, eline_fwd_links, instance_uuid):
519 """
520 Connect all E-LINE links in the NSD
521 Attention: This method DOES NOT support multi V/CDU VNFs!
522 :param eline_fwd_links: list of E-LINE links in the NSD
523 :param: instance_uuid of the service
524 :return:
525 """
526 # cookie is used as identifier for the flowrules installed by the dummygatekeeper
527 # eg. different services get a unique cookie for their flowrules
528 cookie = 1
529 for link in eline_fwd_links:
530 LOG.info("Found E-Line: {}".format(link))
531 # check if we need to deploy this link when its a management link:
532 if USE_DOCKER_MGMT:
533 if self.check_mgmt_interface(
534 link["connection_points_reference"]):
535 continue
536
537 src_id, src_if_name = parse_interface(
538 link["connection_points_reference"][0])
539 dst_id, dst_if_name = parse_interface(
540 link["connection_points_reference"][1])
541 setChaining = False
542 LOG.info("Creating E-Line: src={}, dst={}"
543 .format(src_id, dst_id))
544 # get involved vnfis
545 src_vnfi = self._get_vnf_instance(instance_uuid, src_id)
546 dst_vnfi = self._get_vnf_instance(instance_uuid, dst_id)
547
548 if src_vnfi is not None and dst_vnfi is not None:
549 setChaining = True
550 # re-configure the VNFs IP assignment and ensure that a new
551 # subnet is used for each E-Link
552 eline_net = ELINE_SUBNETS.pop(0)
553 ip1 = "{0}/{1}".format(str(eline_net[1]),
554 eline_net.prefixlen)
555 ip2 = "{0}/{1}".format(str(eline_net[2]),
556 eline_net.prefixlen)
557 # check if VNFs have fixed IPs (address field in VNFDs)
558 if (self._get_vnfd_cp_from_vnfi(src_vnfi, src_if_name)
559 .get("address") is None):
560 self._vnf_reconfigure_network(src_vnfi, src_if_name, ip1)
561 # check if VNFs have fixed IPs (address field in VNFDs)
562 if (self._get_vnfd_cp_from_vnfi(dst_vnfi, dst_if_name)
563 .get("address") is None):
564 self._vnf_reconfigure_network(dst_vnfi, dst_if_name, ip2)
565 # set the chaining
566 if setChaining:
567 GK.net.setChain(
568 src_id, dst_id,
569 vnf_src_interface=src_if_name, vnf_dst_interface=dst_if_name,
570 bidirectional=BIDIRECTIONAL_CHAIN, cmd="add-flow", cookie=cookie, priority=10)
571
572 def _get_vnfd_cp_from_vnfi(self, vnfi, ifname):
573 """
574 Gets the connection point data structure from the VNFD
575 of the given VNFI using ifname.
576 """
577 if vnfi.vnfd is None:
578 return {}
579 cps = vnfi.vnfd.get("connection_points")
580 for cp in cps:
581 if cp.get("id") == ifname:
582 return cp
583
584 def _connect_elans(self, elan_fwd_links, instance_uuid):
585 """
586 Connect all E-LAN/E-Tree links in the NSD
587 This method supports multi-V/CDU VNFs if the connection
588 point names of the DUs are the same as the ones in the NSD.
589 :param elan_fwd_links: list of E-LAN links in the NSD
590 :param: instance_uuid of the service
591 :return:
592 """
593 for link in elan_fwd_links:
594 # a new E-LAN/E-Tree
595 elan_vnf_list = []
596 lan_net = ELAN_SUBNETS.pop(0)
597 lan_hosts = list(lan_net.hosts())
598
599 # generate lan ip address for all interfaces (of all involved (V/CDUs))
600 for intf in link["connection_points_reference"]:
601 vnf_id, intf_name = parse_interface(intf)
602 if vnf_id is None:
603 continue # skip references to NS connection points
604 units = self._get_vnf_instance_units(instance_uuid, vnf_id)
605 if units is None:
606 continue # skip if no deployment unit is present
607 # iterate over all involved deployment units
608 for uvnfi in units:
609 # Attention: we apply a simplification for multi DU VNFs here:
610 # the connection points of all involved DUs have to have the same
611 # name as the connection points of the surrounding VNF to be mapped.
612 # This is because we do not consider links specified in the VNFds
613 container_name = uvnfi.name
614 ip_address = "{0}/{1}".format(str(lan_hosts.pop(0)),
615 lan_net.prefixlen)
616 LOG.debug(
617 "Setting up E-LAN/E-Tree interface. (%s:%s) -> %s" % (
618 container_name, intf_name, ip_address))
619 # re-configure the VNFs IP assignment and ensure that a new subnet is used for each E-LAN
620 # E-LAN relies on the learning switch capability of Ryu which has to be turned on in the topology
621 # (DCNetwork(controller=RemoteController, enable_learning=True)), so no explicit chaining is
622 # necessary.
623 vnfi = self._get_vnf_instance(instance_uuid, container_name)
624 if vnfi is not None:
625 self._vnf_reconfigure_network(vnfi, intf_name, ip_address)
626 # add this vnf and interface to the E-LAN for tagging
627 elan_vnf_list.append(
628 {'name': container_name, 'interface': intf_name})
629 # install the VLAN tags for this E-LAN
630 GK.net.setLAN(elan_vnf_list)
631
632 def _load_docker_files(self):
633 """
634 Get all paths to Dockerfiles from VNFDs and store them in dict.
635 :return:
636 """
637 for vnf_id, v in self.vnfds.iteritems():
638 for vu in v.get("virtual_deployment_units", []):
639 vnf_container_name = get_container_name(vnf_id, vu.get("id"))
640 if vu.get("vm_image_format") == "docker":
641 vm_image = vu.get("vm_image")
642 docker_path = os.path.join(
643 self.package_content_path,
644 make_relative_path(vm_image))
645 self.local_docker_files[vnf_container_name] = docker_path
646 LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
647 for cu in v.get("cloudnative_deployment_units", []):
648 vnf_container_name = get_container_name(vnf_id, cu.get("id"))
649 image = cu.get("image")
650 docker_path = os.path.join(
651 self.package_content_path,
652 make_relative_path(image))
653 self.local_docker_files[vnf_container_name] = docker_path
654 LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
655
656 def _load_docker_urls(self):
657 """
658 Get all URLs to pre-build docker images in some repo.
659 :return:
660 """
661 for vnf_id, v in self.vnfds.iteritems():
662 for vu in v.get("virtual_deployment_units", []):
663 vnf_container_name = get_container_name(vnf_id, vu.get("id"))
664 if vu.get("vm_image_format") == "docker":
665 url = vu.get("vm_image")
666 if url is not None:
667 url = url.replace("http://", "")
668 self.remote_docker_image_urls[vnf_container_name] = url
669 LOG.debug("Found Docker image URL (%r): %r" %
670 (vnf_container_name,
671 self.remote_docker_image_urls[vnf_container_name]))
672 for cu in v.get("cloudnative_deployment_units", []):
673 vnf_container_name = get_container_name(vnf_id, cu.get("id"))
674 url = cu.get("image")
675 if url is not None:
676 url = url.replace("http://", "")
677 self.remote_docker_image_urls[vnf_container_name] = url
678 LOG.debug("Found Docker image URL (%r): %r" %
679 (vnf_container_name,
680 self.remote_docker_image_urls[vnf_container_name]))
681
682 def _build_images_from_dockerfiles(self):
683 """
684 Build Docker images for each local Dockerfile found in the package: self.local_docker_files
685 """
686 if GK_STANDALONE_MODE:
687 return # do not build anything in standalone mode
688 dc = DockerClient()
689 LOG.info("Building %d Docker images (this may take several minutes) ..." % len(
690 self.local_docker_files))
691 for k, v in self.local_docker_files.iteritems():
692 for line in dc.build(path=v.replace(
693 "Dockerfile", ""), tag=k, rm=False, nocache=False):
694 LOG.debug("DOCKER BUILD: %s" % line)
695 LOG.info("Docker image created: %s" % k)
696
697 def _pull_predefined_dockerimages(self):
698 """
699 If the package contains URLs to pre-build Docker images, we download them with this method.
700 """
701 dc = DockerClient()
702 for url in self.remote_docker_image_urls.itervalues():
703 # only pull if not present (speedup for development)
704 if not FORCE_PULL:
705 if len(dc.images.list(name=url)) > 0:
706 LOG.debug("Image %r present. Skipping pull." % url)
707 continue
708 LOG.info("Pulling image: %r" % url)
709 # this seems to fail with latest docker api version 2.0.2
710 # dc.images.pull(url,
711 # insecure_registry=True)
712 # using docker cli instead
713 cmd = ["docker",
714 "pull",
715 url,
716 ]
717 Popen(cmd).wait()
718
719 def _check_docker_image_exists(self, image_name):
720 """
721 Query the docker service and check if the given image exists
722 :param image_name: name of the docker image
723 :return:
724 """
725 return len(DockerClient().images.list(name=image_name)) > 0
726
727 def _calculate_placement(self, algorithm):
728 """
729 Do placement by adding the a field "dc" to
730 each VNFD that points to one of our
731 data center objects known to the gatekeeper.
732 """
733 assert(len(self.vnfds) > 0)
734 assert(len(GK.dcs) > 0)
735 # instantiate algorithm an place
736 p = algorithm()
737 p.place(self.nsd, self.vnfds, GK.dcs)
738 LOG.info("Using placement algorithm: %r" % p.__class__.__name__)
739 # lets print the placement result
740 for name, vnfd in self.vnfds.iteritems():
741 LOG.info("Placed VNF %r on DC %r" % (name, str(vnfd.get("dc"))))
742
743 def _calculate_cpu_cfs_values(self, cpu_time_percentage):
744 """
745 Calculate cpu period and quota for CFS
746 :param cpu_time_percentage: percentage of overall CPU to be used
747 :return: cpu_period, cpu_quota
748 """
749 if cpu_time_percentage is None:
750 return -1, -1
751 if cpu_time_percentage < 0:
752 return -1, -1
753 # (see: https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt)
754 # Attention minimum cpu_quota is 1ms (micro)
755 cpu_period = 1000000 # lets consider a fixed period of 1000000 microseconds for now
756 LOG.debug("cpu_period is %r, cpu_percentage is %r" %
757 (cpu_period, cpu_time_percentage))
758 # calculate the fraction of cpu time for this container
759 cpu_quota = cpu_period * cpu_time_percentage
760 # ATTENTION >= 1000 to avoid a invalid argument system error ... no
761 # idea why
762 if cpu_quota < 1000:
763 LOG.debug("cpu_quota before correcting: %r" % cpu_quota)
764 cpu_quota = 1000
765 LOG.warning("Increased CPU quota to avoid system error.")
766 LOG.debug("Calculated: cpu_period=%f / cpu_quota=%f" %
767 (cpu_period, cpu_quota))
768 return int(cpu_period), int(cpu_quota)
769
770
771 """
772 Some (simple) placement algorithms
773 """
774
775
776 class FirstDcPlacement(object):
777 """
778 Placement: Always use one and the same data center from the GK.dcs dict.
779 """
780
781 def place(self, nsd, vnfds, dcs):
782 for id, vnfd in vnfds.iteritems():
783 vnfd["dc"] = list(dcs.itervalues())[0]
784
785
786 class RoundRobinDcPlacement(object):
787 """
788 Placement: Distribute VNFs across all available DCs in a round robin fashion.
789 """
790
791 def place(self, nsd, vnfds, dcs):
792 c = 0
793 dcs_list = list(dcs.itervalues())
794 for id, vnfd in vnfds.iteritems():
795 vnfd["dc"] = dcs_list[c % len(dcs_list)]
796 c += 1 # inc. c to use next DC
797
798
799 """
800 Resource definitions and API endpoints
801 """
802
803
804 class Packages(fr.Resource):
805
806 def post(self):
807 """
808 Upload a *.son service package to the dummy gatekeeper.
809
810 We expect request with a *.son file and store it in UPLOAD_FOLDER
811 :return: UUID
812 """
813 try:
814 # get file contents
815 LOG.info("POST /packages called")
816 # lets search for the package in the request
817 is_file_object = False # make API more robust: file can be in data or in files field
818 if "package" in request.files:
819 son_file = request.files["package"]
820 is_file_object = True
821 elif len(request.data) > 0:
822 son_file = request.data
823 else:
824 return {"service_uuid": None, "size": 0, "sha1": None,
825 "error": "upload failed. file not found."}, 500
826 # generate a uuid to reference this package
827 service_uuid = str(uuid.uuid4())
828 file_hash = hashlib.sha1(str(son_file)).hexdigest()
829 # ensure that upload folder exists
830 ensure_dir(UPLOAD_FOLDER)
831 upload_path = os.path.join(UPLOAD_FOLDER, "%s.tgo" % service_uuid)
832 # store *.son file to disk
833 if is_file_object:
834 son_file.save(upload_path)
835 else:
836 with open(upload_path, 'wb') as f:
837 f.write(son_file)
838 size = os.path.getsize(upload_path)
839
840 # first stop and delete any other running services
841 if AUTO_DELETE:
842 service_list = copy.copy(GK.services)
843 for service_uuid in service_list:
844 instances_list = copy.copy(
845 GK.services[service_uuid].instances)
846 for instance_uuid in instances_list:
847 # valid service and instance UUID, stop service
848 GK.services.get(service_uuid).stop_service(
849 instance_uuid)
850 LOG.info("service instance with uuid %r stopped." %
851 instance_uuid)
852
853 # create a service object and register it
854 s = Service(service_uuid, file_hash, upload_path)
855 GK.register_service_package(service_uuid, s)
856
857 # automatically deploy the service
858 if AUTO_DEPLOY:
859 # ok, we have a service uuid, lets start the service
860 reset_subnets()
861 GK.services.get(service_uuid).start_service()
862
863 # generate the JSON result
864 return {"service_uuid": service_uuid, "size": size,
865 "sha1": file_hash, "error": None}, 201
866 except BaseException:
867 LOG.exception("Service package upload failed:")
868 return {"service_uuid": None, "size": 0,
869 "sha1": None, "error": "upload failed"}, 500
870
871 def get(self):
872 """
873 Return a list of UUID's of uploaded service packages.
874 :return: dict/list
875 """
876 LOG.info("GET /packages")
877 return {"service_uuid_list": list(GK.services.iterkeys())}
878
879
880 class Instantiations(fr.Resource):
881
882 def post(self):
883 """
884 Instantiate a service specified by its UUID.
885 Will return a new UUID to identify the running service instance.
886 :return: UUID
887 """
888 LOG.info("POST /instantiations (or /requests) called")
889 # try to extract the service uuid from the request
890 json_data = request.get_json(force=True)
891 service_uuid = json_data.get("service_uuid")
892 service_name = json_data.get("service_name")
893
894 # first try to find by service_name
895 if service_name is not None:
896 for s_uuid, s in GK.services.iteritems():
897 if s.manifest.get("name") == service_name:
898 LOG.info("Found service: {} with UUID: {}"
899 .format(service_name, s_uuid))
900 service_uuid = s_uuid
901 # lets be a bit fuzzy here to make testing easier
902 if (service_uuid is None or service_uuid ==
903 "latest") and len(GK.services) > 0:
904 # if we don't get a service uuid, we simple start the first service
905 # in the list
906 service_uuid = list(GK.services.iterkeys())[0]
907 if service_uuid in GK.services:
908 # ok, we have a service uuid, lets start the service
909 service_instance_uuid = GK.services.get(
910 service_uuid).start_service()
911 return {"service_instance_uuid": service_instance_uuid}, 201
912 return "Service not found", 404
913
914 def get(self):
915 """
916 Returns a list of UUIDs containing all running services.
917 :return: dict / list
918 """
919 LOG.info("GET /instantiations")
920 return {"service_instantiations_list": [
921 list(s.instances.iterkeys()) for s in GK.services.itervalues()]}
922
923 def delete(self):
924 """
925 Stops a running service specified by its service and instance UUID.
926 """
927 # try to extract the service and instance UUID from the request
928 json_data = request.get_json(force=True)
929 service_uuid_input = json_data.get("service_uuid")
930 instance_uuid_input = json_data.get("service_instance_uuid")
931 if len(GK.services) < 1:
932 return "No service on-boarded.", 404
933 # try to be fuzzy
934 if service_uuid_input is None:
935 # if we don't get a service uuid we stop all services
936 service_uuid_list = list(GK.services.iterkeys())
937 LOG.info("No service_uuid given, stopping all.")
938 else:
939 service_uuid_list = [service_uuid_input]
940 # for each service
941 for service_uuid in service_uuid_list:
942 if instance_uuid_input is None:
943 instance_uuid_list = list(
944 GK.services[service_uuid].instances.iterkeys())
945 else:
946 instance_uuid_list = [instance_uuid_input]
947 # for all service instances
948 for instance_uuid in instance_uuid_list:
949 if (service_uuid in GK.services and
950 instance_uuid in GK.services[service_uuid].instances):
951 # valid service and instance UUID, stop service
952 GK.services.get(service_uuid).stop_service(instance_uuid)
953 LOG.info("Service instance with uuid %r stopped." % instance_uuid)
954 return "Service(s) stopped.", 200
955
956
957 class Exit(fr.Resource):
958
959 def put(self):
960 """
961 Stop the running Containernet instance regardless of data transmitted
962 """
963 list(GK.dcs.values())[0].net.stop()
964
965
966 def generate_subnets(prefix, base, subnet_size=50, mask=24):
967 # Generate a list of ipaddress in subnets
968 r = list()
969 for net in range(base, base + subnet_size):
970 subnet = "{0}.{1}.0/{2}".format(prefix, net, mask)
971 r.append(ipaddress.ip_network(unicode(subnet)))
972 return r
973
974
975 def reset_subnets():
976 global ELINE_SUBNETS
977 global ELAN_SUBNETS
978 # private subnet definitions for the generated interfaces
979 # 30.0.xxx.0/24
980 ELAN_SUBNETS = generate_subnets('30.0', 0, subnet_size=50, mask=24)
981 # 20.0.xxx.0/24
982 ELINE_SUBNETS = generate_subnets('20.0', 0, subnet_size=50, mask=24)
983
984
985 def initialize_GK():
986 global GK
987 GK = Gatekeeper()
988
989
990 # create a single, global GK object
991 GK = None
992 initialize_GK()
993 # setup Flask
994 app = Flask(__name__)
995 app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024 # 512 MB max upload
996 api = fr.Api(app)
997 # define endpoints
998 api.add_resource(Packages, '/packages', '/api/v2/packages')
999 api.add_resource(Instantiations, '/instantiations',
1000 '/api/v2/instantiations', '/api/v2/requests')
1001 api.add_resource(Exit, '/emulator/exit')
1002
1003
1004 def start_rest_api(host, port, datacenters=dict()):
1005 GK.dcs = datacenters
1006 GK.net = get_dc_network()
1007 # start the Flask server (not the best performance but ok for our use case)
1008 app.run(host=host,
1009 port=port,
1010 debug=True,
1011 use_reloader=False # this is needed to run Flask in a non-main thread
1012 )
1013
1014
1015 def ensure_dir(name):
1016 if not os.path.exists(name):
1017 os.makedirs(name)
1018
1019
1020 def load_yaml(path):
1021 with open(path, "r") as f:
1022 try:
1023 r = yaml.load(f)
1024 except yaml.YAMLError as exc:
1025 LOG.exception("YAML parse error: %r" % str(exc))
1026 r = dict()
1027 return r
1028
1029
1030 def make_relative_path(path):
1031 if path.startswith("file://"):
1032 path = path.replace("file://", "", 1)
1033 if path.startswith("/"):
1034 path = path.replace("/", "", 1)
1035 return path
1036
1037
1038 def get_dc_network():
1039 """
1040 retrieve the DCnetwork where this dummygatekeeper (GK) connects to.
1041 Assume at least 1 datacenter is connected to this GK, and that all datacenters belong to the same DCNetwork
1042 :return:
1043 """
1044 assert (len(GK.dcs) > 0)
1045 return GK.dcs.values()[0].net
1046
1047
1048 def parse_interface(interface_name):
1049 """
1050 convert the interface name in the nsd to the according vnf_id, vnf_interface names
1051 :param interface_name:
1052 :return:
1053 """
1054 if ':' in interface_name:
1055 vnf_id, vnf_interface = interface_name.split(':')
1056 else:
1057 vnf_id = None
1058 vnf_interface = interface_name
1059 return vnf_id, vnf_interface
1060
1061
1062 def get_container_name(vnf_id, vdu_id):
1063 return "{}.{}".format(vnf_id, vdu_id)
1064
1065
1066 if __name__ == '__main__':
1067 """
1068 Lets allow to run the API in standalone mode.
1069 """
1070 GK_STANDALONE_MODE = True
1071 logging.getLogger("werkzeug").setLevel(logging.INFO)
1072 start_rest_api("0.0.0.0", 8000)