# Copyright (c) 2018 SONATA-NFV, 5GTANGO and Paderborn University
# ALL RIGHTS RESERVED.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Neither the name of the SONATA-NFV, 5GTANGO, Paderborn University
# nor the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior written
# permission.
#
# This work has been performed in the framework of the SONATA project,
# funded by the European Commission under Grant number 671517 through
# the Horizon 2020 and 5G-PPP programmes. The authors would like to
# acknowledge the contributions of their colleagues of the SONATA
# partner consortium (www.sonata-nfv.eu).
#
# This work has also been performed in the framework of the 5GTANGO project,
# funded by the European Commission under Grant number 761493 through
# the Horizon 2020 and 5G-PPP programmes. The authors would like to
# acknowledge the contributions of their colleagues of the 5GTANGO
# partner consortium (www.5gtango.eu).
import logging
import os
import uuid
import hashlib
import zipfile
import yaml
import threading
from docker import DockerClient
from flask import Flask, request
import flask_restful as fr
from subprocess import Popen
import ipaddress
import copy
import time


LOG = logging.getLogger("5gtango.llcm")
LOG.setLevel(logging.INFO)


GK_STORAGE = "/tmp/vim-emu-tango-llcm/"
UPLOAD_FOLDER = os.path.join(GK_STORAGE, "uploads/")
CATALOG_FOLDER = os.path.join(GK_STORAGE, "catalog/")

# Enable Dockerfile build functionality
BUILD_DOCKERFILE = False

# flag to indicate that we run without the emulator (only the bare API for
# integration testing)
GK_STANDALONE_MODE = False

# should a new version of an image be pulled even if it is already available
FORCE_PULL = False

# flag to indicate if we use bidirectional forwarding rules in the
# automatic chaining process
BIDIRECTIONAL_CHAIN = True

# override the management interfaces in the descriptors with default
# docker0 interfaces in the containers
USE_DOCKER_MGMT = False

# automatically deploy uploaded packages (no need to execute son-access
# deploy --latest separately)
AUTO_DEPLOY = False

# and also automatically terminate any other running services
AUTO_DELETE = False

# global subnet definitions (see reset_subnets())
ELAN_SUBNETS = None
ELINE_SUBNETS = None

# Time in seconds to wait for VNF stop scripts to execute fully
VNF_STOP_WAIT_TIME = 5
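
# Illustrative example (comment only, not executed): these flags are plain
# module-level globals, so a caller would typically adjust them before the
# REST API is started, e.g. (assuming the module is importable as
# emuvim.api.tango.llcm):
#   from emuvim.api.tango import llcm
#   llcm.AUTO_DEPLOY = True   # deploy each uploaded package right away
#   llcm.FORCE_PULL = True    # always re-pull referenced Docker images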


class OnBoardingException(BaseException):
    pass


class Gatekeeper(object):

    def __init__(self):
        self.services = dict()
        self.dcs = dict()
        self.net = None
        # used to generate short names for VNFs (Mininet limitation)
        self.vnf_counter = 0
        reset_subnets()
        LOG.info("Initialized 5GTANGO LLCM module.")

    def register_service_package(self, service_uuid, service):
        """
        Register a new service package.
        :param service_uuid: UUID of the package
        :param service: service object
        """
        self.services[service_uuid] = service
        # let's perform all steps needed to onboard the service
        service.onboard()


class Service(object):
    """
    This class represents an NS uploaded as a *.son package to the
    dummy gatekeeper.
    Can have multiple running instances of this service.
    """

    def __init__(self,
                 service_uuid,
                 package_file_hash,
                 package_file_path):
        self.uuid = service_uuid
        self.package_file_hash = package_file_hash
        self.package_file_path = package_file_path
        self.package_content_path = os.path.join(
            CATALOG_FOLDER, "services/%s" % self.uuid)
        self.manifest = None
        self.nsd = None
        self.vnfds = dict()
        self.local_docker_files = dict()
        self.remote_docker_image_urls = dict()
        self.instances = dict()

    def onboard(self):
        """
        Do all steps to prepare this service to be instantiated.
        :return:
        """
        # 1. extract the contents of the package and store them in our catalog
        self._unpack_service_package()
        # 2. read in all descriptor files
        self._load_package_descriptor()
        self._load_nsd()
        self._load_vnfd()
        if self.nsd is None:
            raise OnBoardingException("No NSD found.")
        if len(self.vnfds) < 1:
            raise OnBoardingException("No VNFDs found.")
        # 3. prepare container images (e.g. download or build Dockerfile)
        if BUILD_DOCKERFILE:
            self._load_docker_files()
            self._build_images_from_dockerfiles()
        else:
            self._load_docker_urls()
            self._pull_predefined_dockerimages()
        LOG.info("On-boarded service: %r" % self.manifest.get("name"))

    def start_service(self):
        """
        This method creates and starts a new service instance.
        It computes placements, iterates over all VNFDs, and starts
        each VNFD as a Docker container in the data center selected
        by the placement algorithm.
        :return:
        """
        LOG.info("Starting service %r" % self.uuid)

        # 1. each service instance gets a new uuid to identify it
        instance_uuid = str(uuid.uuid4())
        # build an instances dict (a bit like an NSR :))
        self.instances[instance_uuid] = dict()
        self.instances[instance_uuid]["vnf_instances"] = list()

        # 2. compute placement of this service instance (adds DC names to
        # VNFDs)
        # self._calculate_placement(FirstDcPlacement)
        self._calculate_placement(RoundRobinDcPlacement)
        # 3. start all vnfds that we have in the service
        for vnf_id in self.vnfds:
            vnfd = self.vnfds[vnf_id]
            # attention: returns a list of started deployment units
            vnfis = self._start_vnfd(vnfd, vnf_id)
            # add list of VNFIs to total VNFI list
            self.instances[instance_uuid]["vnf_instances"].extend(vnfis)

        # 4. Deploy E-Line, E-Tree and E-LAN links
        # Attention: Only done if a "forwarding_graphs" section exists in the
        # NSD, even if "forwarding_graphs" are not used directly.
        if "virtual_links" in self.nsd and "forwarding_graphs" in self.nsd:
            vlinks = self.nsd["virtual_links"]
            # constituent virtual links are not checked
            eline_fwd_links = [l for l in vlinks if (
                l["connectivity_type"] == "E-Line")]
            elan_fwd_links = [l for l in vlinks if (
                l["connectivity_type"] == "E-LAN" or
                l["connectivity_type"] == "E-Tree")]  # Treat E-Tree as E-LAN

            # 5a. deploy E-Line links
            GK.net.deployed_elines.extend(eline_fwd_links)  # bookkeeping
            self._connect_elines(eline_fwd_links, instance_uuid)
            # 5b. deploy E-Tree/E-LAN links
            GK.net.deployed_elans.extend(elan_fwd_links)  # bookkeeping
            self._connect_elans(elan_fwd_links, instance_uuid)

        # 6. run the emulator specific entrypoint scripts in the VNFIs of this
        # service instance
        self._trigger_emulator_start_scripts_in_vnfis(
            self.instances[instance_uuid]["vnf_instances"])
        # done
        LOG.info("Service started. Instance id: %r" % instance_uuid)
        return instance_uuid

    def stop_service(self, instance_uuid):
        """
        This method stops a running service instance.
        It iterates over all VNF instances, stopping each of them
        and removing them from their data center.
        :param instance_uuid: the uuid of the service instance to be stopped
        """
        LOG.info("Stopping service %r" % self.uuid)
        # get relevant information
        # instance_uuid = str(self.uuid.uuid4())
        vnf_instances = self.instances[instance_uuid]["vnf_instances"]
        # trigger stop scripts in vnf instances and wait a few seconds for
        # completion
        self._trigger_emulator_stop_scripts_in_vnfis(vnf_instances)
        time.sleep(VNF_STOP_WAIT_TIME)
        # stop all vnfs
        for v in vnf_instances:
            self._stop_vnfi(v)
        # last step: remove the instance from the list of all instances
        del self.instances[instance_uuid]

    def _get_resource_limits(self, deployment_unit):
        """
        Extract resource limits from deployment units.
        """
        # defaults
        cpu_list = None
        cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(1.0))
        mem_limit = None
        # update from descriptor
        if "resource_requirements" in deployment_unit:
            res_req = deployment_unit.get("resource_requirements")
            cpu_list = res_req.get("cpu").get("cpuset")
            if cpu_list is None:
                cpu_list = res_req.get("cpu").get("vcpus")
            if cpu_list is not None:
                # attention: docker expects list as string w/o spaces:
                cpu_list = str(cpu_list).replace(" ", "").strip()
            cpu_bw = res_req.get("cpu").get("cpu_bw")
            if cpu_bw is None:
                cpu_bw = 1.0
            cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(cpu_bw))
            mem_limit = res_req.get("memory").get("size")
            mem_unit = str(res_req.get("memory").get("size_unit", "GB"))
            if mem_limit is not None:
                mem_limit = int(mem_limit)
                # to bytes
                if "G" in mem_unit:
                    mem_limit = mem_limit * 1024 * 1024 * 1024
                elif "M" in mem_unit:
                    mem_limit = mem_limit * 1024 * 1024
                elif "K" in mem_unit:
                    mem_limit = mem_limit * 1024
        return cpu_list, cpu_period, cpu_quota, mem_limit
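        # Illustrative example (comment only, not executed): a descriptor
        # snippet like
        #     resource_requirements:
        #       cpu: {vcpus: 2, cpu_bw: 0.5}
        #       memory: {size: 1, size_unit: "GB"}
        # would yield cpu_list="2", cpu_period=1000000, cpu_quota=500000
        # and mem_limit=1073741824 (1 GB in bytes).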

    def _start_vnfd(self, vnfd, vnf_id, **kwargs):
        """
        Start a single VNFD of this service
        :param vnfd: vnfd descriptor dict
        :param vnf_id: unique id of this vnf in the nsd
        :return:
        """
        vnfis = list()
        # the vnf_name refers to the container image to be deployed
        vnf_name = vnfd.get("name")
        # combine VDUs and CDUs
        deployment_units = (vnfd.get("virtual_deployment_units", []) +
                            vnfd.get("cloudnative_deployment_units", []))
        # iterate over all deployment units within this VNFD
        for u in deployment_units:
            # 0. vnf_container_name = vnf_id.vdu_id
            vnf_container_name = get_container_name(vnf_id, u.get("id"))
            # 1. get the name of the docker image to start
            if vnf_container_name not in self.remote_docker_image_urls:
                raise Exception("No image name for %r found. Abort." % vnf_container_name)
            docker_image_name = self.remote_docker_image_urls.get(vnf_container_name)
            # 2. select datacenter to start the VNF in
            target_dc = vnfd.get("dc")
            # 3. perform some checks to ensure we can start the container
            assert(docker_image_name is not None)
            assert(target_dc is not None)
            if not self._check_docker_image_exists(docker_image_name):
                raise Exception("Docker image {} not found. Abort."
                                .format(docker_image_name))

            # 4. get the resource limits
            cpu_list, cpu_period, cpu_quota, mem_limit = self._get_resource_limits(u)

            # get connection points defined for the DU
            intfs = u.get("connection_points", [])
            # do some re-naming of fields to be compatible with Containernet
            for i in intfs:
                if i.get("address"):
                    i["ip"] = i.get("address")

            # get ports and port_bindings from the port and publish fields of CNFD
            # see: https://github.com/containernet/containernet/wiki/Exposing-and-mapping-network-ports
            ports = list()  # Containernet naming
            port_bindings = dict()
            for i in intfs:
                if i.get("port"):
                    if not isinstance(i.get("port"), int):
                        LOG.error("Field 'port' is not an int. CP: {}".format(i))
                    else:
                        ports.append(i.get("port"))
                if i.get("publish"):
                    if not isinstance(i.get("publish"), dict):
                        LOG.error("Field 'publish' is not a dict. CP: {}".format(i))
                    else:
                        port_bindings.update(i.get("publish"))
            if len(ports) > 0:
                LOG.info("{} exposes ports: {}".format(vnf_container_name, ports))
            if len(port_bindings) > 0:
                LOG.info("{} publishes ports: {}".format(vnf_container_name, port_bindings))

            # 5. collect additional information to start container
            volumes = list()
            cenv = dict()
            # 5.1 inject descriptor based start/stop commands into env (overwrite)
            VNFD_CMD_START = u.get("vm_cmd_start")
            VNFD_CMD_STOP = u.get("vm_cmd_stop")
            if VNFD_CMD_START and not VNFD_CMD_START == "None":
                LOG.info("Found 'vm_cmd_start'='{}' in VNFD.".format(VNFD_CMD_START) +
                         " Overwriting SON_EMU_CMD.")
                cenv["SON_EMU_CMD"] = VNFD_CMD_START
            if VNFD_CMD_STOP and not VNFD_CMD_STOP == "None":
                LOG.info("Found 'vm_cmd_stop'='{}' in VNFD.".format(VNFD_CMD_STOP) +
                         " Overwriting SON_EMU_CMD_STOP.")
                cenv["SON_EMU_CMD_STOP"] = VNFD_CMD_STOP

            # 6. Start the container
            LOG.info("Starting %r as %r in DC %r" %
                     (vnf_name, vnf_container_name, vnfd.get("dc")))
            LOG.debug("Interfaces for %r: %r" % (vnf_id, intfs))
            # start the container
            vnfi = target_dc.startCompute(
                vnf_container_name,
                network=intfs,
                image=docker_image_name,
                cpu_quota=cpu_quota,
                cpu_period=cpu_period,
                cpuset_cpus=cpu_list,
                mem_limit=mem_limit,
                volumes=volumes,
                properties=cenv,  # environment
                ports=ports,
                port_bindings=port_bindings,
                type=kwargs.get('type', 'docker'))
            # add vnfd reference to vnfi
            vnfi.vnfd = vnfd
            # add container name
            vnfi.vnf_container_name = vnf_container_name
            # store vnfi
            vnfis.append(vnfi)
        return vnfis

    def _stop_vnfi(self, vnfi):
        """
        Stop a VNF instance.
        :param vnfi: vnf instance to be stopped
        """
        # Find the correct datacenter
        status = vnfi.getStatus()
        dc = vnfi.datacenter
        # stop the vnfi
        LOG.info("Stopping the VNF instance %r in DC %r" %
                 (status["name"], dc))
        dc.stopCompute(status["name"])

    def _get_vnf_instance(self, instance_uuid, vnf_id):
        """
        Returns the VNFI object for a given "vnf_id" or "vnf_container_name" taken from an NSD.
        :return: single object
        """
        for vnfi in self.instances[instance_uuid]["vnf_instances"]:
            if str(vnfi.name) == str(vnf_id):
                return vnfi
        LOG.warning("No container with name: {0} found.".format(vnf_id))
        return None

    def _get_vnf_instance_units(self, instance_uuid, vnf_id):
        """
        Returns a list of VNFI objects (all deployment units) for a given
        "vnf_id" taken from an NSD.
        :return: list
        """
        if vnf_id is None:
            return None
        r = list()
        for vnfi in self.instances[instance_uuid]["vnf_instances"]:
            if vnf_id in vnfi.name:
                r.append(vnfi)
        if len(r) > 0:
            LOG.debug("Found units: {} for vnf_id: {}"
                      .format([i.name for i in r], vnf_id))
            return r
        LOG.warning("No container(s) with name: {0} found.".format(vnf_id))
        return None

    @staticmethod
    def _vnf_reconfigure_network(vnfi, if_name, net_str=None, new_name=None):
        """
        Reconfigure the network configuration of a specific interface
        of a running container.
        :param vnfi: container instance
        :param if_name: interface name
        :param net_str: network configuration string, e.g., 1.2.3.4/24
        :param new_name: optional new name for the interface
        :return:
        """
        # assign new ip address
        if net_str is not None:
            intf = vnfi.intf(intf=if_name)
            if intf is not None:
                intf.setIP(net_str)
                LOG.debug("Reconfigured network of %s:%s to %r" %
                          (vnfi.name, if_name, net_str))
            else:
                LOG.warning("Interface not found: %s:%s. Network reconfiguration skipped." % (
                    vnfi.name, if_name))

        if new_name is not None:
            vnfi.cmd('ip link set', if_name, 'down')
            vnfi.cmd('ip link set', if_name, 'name', new_name)
            vnfi.cmd('ip link set', new_name, 'up')
            LOG.debug("Reconfigured interface name of %s:%s to %s" %
                      (vnfi.name, if_name, new_name))

    def _trigger_emulator_start_scripts_in_vnfis(self, vnfi_list):
        for vnfi in vnfi_list:
            config = vnfi.dcinfo.get("Config", dict())
            env = config.get("Env", list())
            for env_var in env:
                var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
                if var == "SON_EMU_CMD" or var == "VIM_EMU_CMD":
                    LOG.info("Executing script in '{}': {}={}"
                             .format(vnfi.name, var, cmd))
                    # execute command in new thread to ensure that GK is not
                    # blocked by VNF
                    t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
                    t.daemon = True
                    t.start()
                    break  # only execute one command

    def _trigger_emulator_stop_scripts_in_vnfis(self, vnfi_list):
        for vnfi in vnfi_list:
            config = vnfi.dcinfo.get("Config", dict())
            env = config.get("Env", list())
            for env_var in env:
                var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
                if var == "SON_EMU_CMD_STOP" or var == "VIM_EMU_CMD_STOP":
                    LOG.info("Executing script in '{}': {}={}"
                             .format(vnfi.name, var, cmd))
                    # execute command in new thread to ensure that GK is not
                    # blocked by VNF
                    t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
                    t.daemon = True
                    t.start()
                    break  # only execute one command

    def _unpack_service_package(self):
        """
        unzip *.son file and store contents in CATALOG_FOLDER/services/<service_uuid>/
        """
        LOG.info("Unzipping: %r" % self.package_file_path)
        with zipfile.ZipFile(self.package_file_path, "r") as z:
            z.extractall(self.package_content_path)

    def _load_package_descriptor(self):
        """
        Load the main package descriptor YAML and keep it as dict.
        :return:
        """
        self.manifest = load_yaml(
            os.path.join(
                self.package_content_path, "TOSCA-Metadata/NAPD.yaml"))

    def _load_nsd(self):
        """
        Load the entry NSD YAML and keep it as dict.
        :return:
        """
        if "package_content" in self.manifest:
            nsd_path = None
            for f in self.manifest.get("package_content"):
                if f.get("content-type") == "application/vnd.5gtango.nsd":
                    nsd_path = os.path.join(
                        self.package_content_path,
                        make_relative_path(f.get("source")))
                    break  # always use the first NSD for now
            if nsd_path is None:
                raise OnBoardingException("No NSD with type 'application/vnd.5gtango.nsd' found.")
            self.nsd = load_yaml(nsd_path)
            GK.net.deployed_nsds.append(self.nsd)  # TODO this seems strange (remove?)
            LOG.debug("Loaded NSD: %r" % self.nsd.get("name"))
        else:
            raise OnBoardingException(
                "No 'package_content' section in package manifest:\n{}"
                .format(self.manifest))

    def _load_vnfd(self):
        """
        Load all VNFD YAML files referenced in the package manifest
        (NAPD.yaml) and keep them in a dict.
        :return:
        """
        # first make a list of all the vnfds in the package
        vnfd_set = dict()
        if "package_content" in self.manifest:
            for pc in self.manifest.get("package_content"):
                if pc.get(
                        "content-type") == "application/vnd.5gtango.vnfd":
                    vnfd_path = os.path.join(
                        self.package_content_path,
                        make_relative_path(pc.get("source")))
                    vnfd = load_yaml(vnfd_path)
                    vnfd_set[vnfd.get("name")] = vnfd
            if len(vnfd_set) < 1:
                raise OnBoardingException("No VNFDs found.")
            # then link each vnf_id in the nsd to its vnfd
            for v in self.nsd.get("network_functions"):
                if v.get("vnf_name") in vnfd_set:
                    self.vnfds[v.get("vnf_id")] = vnfd_set[v.get("vnf_name")]
                    LOG.debug("Loaded VNFD: {0} id: {1}"
                              .format(v.get("vnf_name"), v.get("vnf_id")))

    def _connect_elines(self, eline_fwd_links, instance_uuid):
        """
        Connect all E-LINE links in the NSD.
        Attention: This method DOES NOT support multi V/CDU VNFs!
        :param eline_fwd_links: list of E-LINE links in the NSD
        :param instance_uuid: instance_uuid of the service
        :return:
        """
        # the cookie is used as identifier for the flow rules installed by the
        # dummy gatekeeper, e.g. different services get a unique cookie for
        # their flow rules
        cookie = 1
        for link in eline_fwd_links:
            LOG.info("Found E-Line: {}".format(link))
            src_id, src_if_name = parse_interface(
                link["connection_points_reference"][0])
            dst_id, dst_if_name = parse_interface(
                link["connection_points_reference"][1])
            LOG.info("Searching C/VDU for E-Line: src={}, src_if={}, dst={}, dst_if={}"
                     .format(src_id, src_if_name, dst_id, dst_if_name))
            # handle C/VDUs (ugly hack, only one V/CDU per VNF for now)
            src_units = self._get_vnf_instance_units(instance_uuid, src_id)
            dst_units = self._get_vnf_instance_units(instance_uuid, dst_id)
            if src_units is None or dst_units is None:
                LOG.info("No VNF-VNF link. Skipping: src={}, src_if={}, dst={}, dst_if={}"
                         .format(src_id, src_if_name, dst_id, dst_if_name))
                return
            # we only support VNFs with one V/CDU right now
            if len(src_units) != 1 or len(dst_units) != 1:
                raise BaseException("LLCM does not support E-LINES for multi V/CDU VNFs.")
            # get the full name from that C/VDU and use it as src_id and dst_id
            src_id = src_units[0].name
            dst_id = dst_units[0].name
            # from here we have all info we need
            LOG.info("Creating E-Line for C/VDU: src={}, src_if={}, dst={}, dst_if={}"
                     .format(src_id, src_if_name, dst_id, dst_if_name))
            # get involved vnfis
            src_vnfi = src_units[0]
            dst_vnfi = dst_units[0]
            # proceed with chaining setup
            setChaining = False
            if src_vnfi is not None and dst_vnfi is not None:
                setChaining = True
                # re-configure the VNF's IP assignment and ensure that a new
                # subnet is used for each E-Link
                eline_net = ELINE_SUBNETS.pop(0)
                ip1 = "{0}/{1}".format(str(eline_net[1]),
                                       eline_net.prefixlen)
                ip2 = "{0}/{1}".format(str(eline_net[2]),
                                       eline_net.prefixlen)
                # check if VNFs have fixed IPs (ip/address field in VNFDs)
                if (self._get_vnfd_cp_from_vnfi(
                        src_vnfi, src_if_name).get("ip") is None and
                        self._get_vnfd_cp_from_vnfi(
                            src_vnfi, src_if_name).get("address") is None):
                    self._vnf_reconfigure_network(src_vnfi, src_if_name, ip1)
                # check if VNFs have fixed IPs (ip field in VNFDs)
                if (self._get_vnfd_cp_from_vnfi(
                        dst_vnfi, dst_if_name).get("ip") is None and
                        self._get_vnfd_cp_from_vnfi(
                            dst_vnfi, dst_if_name).get("address") is None):
                    self._vnf_reconfigure_network(dst_vnfi, dst_if_name, ip2)
            # set the chaining
            if setChaining:
                GK.net.setChain(
                    src_id, dst_id,
                    vnf_src_interface=src_if_name, vnf_dst_interface=dst_if_name,
                    bidirectional=BIDIRECTIONAL_CHAIN, cmd="add-flow", cookie=cookie, priority=10)

    def _get_vnfd_cp_from_vnfi(self, vnfi, ifname):
        """
        Gets the connection point data structure from the VNFD
        of the given VNFI using ifname.
        """
        if vnfi.vnfd is None:
            return {}
        cps = vnfi.vnfd.get("connection_points")
        for cp in cps:
            if cp.get("id") == ifname:
                return cp

    def _connect_elans(self, elan_fwd_links, instance_uuid):
        """
        Connect all E-LAN/E-Tree links in the NSD.
        This method supports multi-V/CDU VNFs if the connection
        point names of the DUs are the same as the ones in the NSD.
        :param elan_fwd_links: list of E-LAN links in the NSD
        :param instance_uuid: instance_uuid of the service
        :return:
        """
        for link in elan_fwd_links:
            # a new E-LAN/E-Tree
            elan_vnf_list = []
            lan_net = ELAN_SUBNETS.pop(0)
            lan_hosts = list(lan_net.hosts())

            # generate lan ip address for all interfaces (of all involved (V/CDUs))
            for intf in link["connection_points_reference"]:
                vnf_id, intf_name = parse_interface(intf)
                if vnf_id is None:
                    continue  # skip references to NS connection points
                units = self._get_vnf_instance_units(instance_uuid, vnf_id)
                if units is None:
                    continue  # skip if no deployment unit is present
                # iterate over all involved deployment units
                for uvnfi in units:
                    # Attention: we apply a simplification for multi DU VNFs here:
                    # the connection points of all involved DUs have to have the same
                    # name as the connection points of the surrounding VNF to be mapped.
                    # This is because we do not consider links specified in the VNFDs
                    container_name = uvnfi.name
                    ip_address = "{0}/{1}".format(str(lan_hosts.pop(0)),
                                                  lan_net.prefixlen)
                    LOG.debug(
                        "Setting up E-LAN/E-Tree interface. (%s:%s) -> %s" % (
                            container_name, intf_name, ip_address))
                    # re-configure the VNF's IP assignment and ensure that a new subnet is used for each E-LAN
                    # E-LAN relies on the learning switch capability of Ryu which has to be turned on in the topology
                    # (DCNetwork(controller=RemoteController, enable_learning=True)), so no explicit chaining is
                    # necessary.
                    vnfi = self._get_vnf_instance(instance_uuid, container_name)
                    if vnfi is not None:
                        self._vnf_reconfigure_network(vnfi, intf_name, ip_address)
                        # add this vnf and interface to the E-LAN for tagging
                        elan_vnf_list.append(
                            {'name': container_name, 'interface': intf_name})
            # install the VLAN tags for this E-LAN
            GK.net.setLAN(elan_vnf_list)

    def _load_docker_files(self):
        """
        Get all paths to Dockerfiles from VNFDs and store them in a dict.
        :return:
        """
        for vnf_id, v in self.vnfds.iteritems():
            for vu in v.get("virtual_deployment_units", []):
                vnf_container_name = get_container_name(vnf_id, vu.get("id"))
                if vu.get("vm_image_format") == "docker":
                    vm_image = vu.get("vm_image")
                    docker_path = os.path.join(
                        self.package_content_path,
                        make_relative_path(vm_image))
                    self.local_docker_files[vnf_container_name] = docker_path
                    LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
            for cu in v.get("cloudnative_deployment_units", []):
                vnf_container_name = get_container_name(vnf_id, cu.get("id"))
                image = cu.get("image")
                docker_path = os.path.join(
                    self.package_content_path,
                    make_relative_path(image))
                self.local_docker_files[vnf_container_name] = docker_path
                LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))

    def _load_docker_urls(self):
        """
        Get all URLs to pre-built Docker images in some repo.
        :return:
        """
        for vnf_id, v in self.vnfds.iteritems():
            for vu in v.get("virtual_deployment_units", []):
                vnf_container_name = get_container_name(vnf_id, vu.get("id"))
                if vu.get("vm_image_format") == "docker":
                    url = vu.get("vm_image")
                    if url is not None:
                        url = url.replace("http://", "")
                        self.remote_docker_image_urls[vnf_container_name] = url
                        LOG.debug("Found Docker image URL (%r): %r" %
                                  (vnf_container_name,
                                   self.remote_docker_image_urls[vnf_container_name]))
            for cu in v.get("cloudnative_deployment_units", []):
                vnf_container_name = get_container_name(vnf_id, cu.get("id"))
                url = cu.get("image")
                if url is not None:
                    url = url.replace("http://", "")
                    self.remote_docker_image_urls[vnf_container_name] = url
                    LOG.debug("Found Docker image URL (%r): %r" %
                              (vnf_container_name,
                               self.remote_docker_image_urls[vnf_container_name]))

    def _build_images_from_dockerfiles(self):
        """
        Build Docker images for each local Dockerfile found in the package: self.local_docker_files
        """
        if GK_STANDALONE_MODE:
            return  # do not build anything in standalone mode
        dc = DockerClient()
        LOG.info("Building %d Docker images (this may take several minutes) ..." % len(
            self.local_docker_files))
        for k, v in self.local_docker_files.iteritems():
            for line in dc.build(path=v.replace(
                    "Dockerfile", ""), tag=k, rm=False, nocache=False):
                LOG.debug("DOCKER BUILD: %s" % line)
            LOG.info("Docker image created: %s" % k)

    def _pull_predefined_dockerimages(self):
        """
        If the package contains URLs to pre-built Docker images, we download them with this method.
        """
        dc = DockerClient()
        for url in self.remote_docker_image_urls.itervalues():
            # only pull if not present (speedup for development)
            if not FORCE_PULL:
                if len(dc.images.list(name=url)) > 0:
                    LOG.debug("Image %r present. Skipping pull." % url)
                    continue
            LOG.info("Pulling image: %r" % url)
            # this seems to fail with latest docker api version 2.0.2
            # dc.images.pull(url,
            #        insecure_registry=True)
            # using docker cli instead
            cmd = ["docker",
                   "pull",
                   url,
                   ]
            Popen(cmd).wait()

    def _check_docker_image_exists(self, image_name):
        """
        Query the docker service and check if the given image exists
        :param image_name: name of the docker image
        :return:
        """
        return len(DockerClient().images.list(name=image_name)) > 0

    def _calculate_placement(self, algorithm):
        """
        Do placement by adding a field "dc" to
        each VNFD that points to one of our
        data center objects known to the gatekeeper.
        """
        assert(len(self.vnfds) > 0)
        assert(len(GK.dcs) > 0)
        # instantiate algorithm and place
        p = algorithm()
        p.place(self.nsd, self.vnfds, GK.dcs)
        LOG.info("Using placement algorithm: %r" % p.__class__.__name__)
        # let's print the placement result
        for name, vnfd in self.vnfds.iteritems():
            LOG.info("Placed VNF %r on DC %r" % (name, str(vnfd.get("dc"))))

    def _calculate_cpu_cfs_values(self, cpu_time_percentage):
        """
        Calculate cpu period and quota for CFS
        :param cpu_time_percentage: percentage of overall CPU to be used
        :return: cpu_period, cpu_quota
        """
        if cpu_time_percentage is None:
            return -1, -1
        if cpu_time_percentage < 0:
            return -1, -1
        # (see: https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt)
        # Attention: the minimum cpu_quota is 1ms (1000 microseconds)
        cpu_period = 1000000  # let's consider a fixed period of 1000000 microseconds for now
        LOG.debug("cpu_period is %r, cpu_percentage is %r" %
                  (cpu_period, cpu_time_percentage))
        # calculate the fraction of cpu time for this container
        cpu_quota = cpu_period * cpu_time_percentage
        # ATTENTION >= 1000 to avoid an invalid argument system error ... no
        # idea why
        if cpu_quota < 1000:
            LOG.debug("cpu_quota before correcting: %r" % cpu_quota)
            cpu_quota = 1000
            LOG.warning("Increased CPU quota to avoid system error.")
        LOG.debug("Calculated: cpu_period=%f / cpu_quota=%f" %
                  (cpu_period, cpu_quota))
        return int(cpu_period), int(cpu_quota)
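        # Illustrative examples (comment only, not executed):
        #   _calculate_cpu_cfs_values(0.25) -> (1000000, 250000), i.e. the
        #   container may use 25% of one CPU per CFS period;
        #   _calculate_cpu_cfs_values(0.0005) -> (1000000, 1000), because the
        #   quota is clamped to the 1000 microsecond minimum.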


"""
Some (simple) placement algorithms
"""


class FirstDcPlacement(object):
    """
    Placement: Always use one and the same data center from the GK.dcs dict.
    """

    def place(self, nsd, vnfds, dcs):
        for id, vnfd in vnfds.iteritems():
            vnfd["dc"] = list(dcs.itervalues())[0]


class RoundRobinDcPlacement(object):
    """
    Placement: Distribute VNFs across all available DCs in a round robin fashion.
    """

    def place(self, nsd, vnfds, dcs):
        c = 0
        dcs_list = list(dcs.itervalues())
        for id, vnfd in vnfds.iteritems():
            vnfd["dc"] = dcs_list[c % len(dcs_list)]
            c += 1  # inc. c to use next DC


"""
Resource definitions and API endpoints
"""


class Packages(fr.Resource):

    def post(self):
        """
        Upload a *.son service package to the dummy gatekeeper.

        We expect a request with a *.son file and store it in UPLOAD_FOLDER
        :return: UUID
        """
        try:
            # get file contents
            LOG.info("POST /packages called")
            # let's search for the package in the request
            is_file_object = False  # make API more robust: file can be in data or in files field
            if "package" in request.files:
                son_file = request.files["package"]
                is_file_object = True
            elif len(request.data) > 0:
                son_file = request.data
            else:
                return {"service_uuid": None, "size": 0, "sha1": None,
                        "error": "upload failed. file not found."}, 500
            # generate a uuid to reference this package
            service_uuid = str(uuid.uuid4())
            file_hash = hashlib.sha1(str(son_file)).hexdigest()
            # ensure that upload folder exists
            ensure_dir(UPLOAD_FOLDER)
            upload_path = os.path.join(UPLOAD_FOLDER, "%s.tgo" % service_uuid)
            # store *.son file to disk
            if is_file_object:
                son_file.save(upload_path)
            else:
                with open(upload_path, 'wb') as f:
                    f.write(son_file)
            size = os.path.getsize(upload_path)

            # first stop and delete any other running services
            # (use distinct loop variables so the freshly generated
            # service_uuid above is not overwritten)
            if AUTO_DELETE:
                service_list = copy.copy(GK.services)
                for old_service_uuid in service_list:
                    instances_list = copy.copy(
                        GK.services[old_service_uuid].instances)
                    for old_instance_uuid in instances_list:
                        # valid service and instance UUID, stop service
                        GK.services.get(old_service_uuid).stop_service(
                            old_instance_uuid)
                        LOG.info("service instance with uuid %r stopped." %
                                 old_instance_uuid)

            # create a service object and register it
            s = Service(service_uuid, file_hash, upload_path)
            GK.register_service_package(service_uuid, s)

            # automatically deploy the service
            if AUTO_DEPLOY:
                # ok, we have a service uuid, let's start the service
                reset_subnets()
                GK.services.get(service_uuid).start_service()

            # generate the JSON result
            return {"service_uuid": service_uuid, "size": size,
                    "sha1": file_hash, "error": None}, 201
        except BaseException:
            LOG.exception("Service package upload failed:")
            return {"service_uuid": None, "size": 0,
                    "sha1": None, "error": "upload failed"}, 500

    def get(self):
        """
        Return a list of UUIDs of uploaded service packages.
        :return: dict/list
        """
        LOG.info("GET /packages")
        return {"service_uuid_list": list(GK.services.iterkeys())}


class Instantiations(fr.Resource):

    def post(self):
        """
        Instantiate a service specified by its UUID.
        Will return a new UUID to identify the running service instance.
        :return: UUID
        """
        LOG.info("POST /instantiations (or /requests) called")
        # try to extract the service uuid from the request
        json_data = request.get_json(force=True)
        service_uuid = json_data.get("service_uuid")
        service_name = json_data.get("service_name")

        # first try to find by service_name
        if service_name is not None:
            for s_uuid, s in GK.services.iteritems():
                if s.manifest.get("name") == service_name:
                    LOG.info("Found service: {} with UUID: {}"
                             .format(service_name, s_uuid))
                    service_uuid = s_uuid
        # let's be a bit fuzzy here to make testing easier
        if (service_uuid is None or service_uuid ==
                "latest") and len(GK.services) > 0:
            # if we don't get a service uuid, we simply start the first service
            # in the list
            service_uuid = list(GK.services.iterkeys())[0]
        if service_uuid in GK.services:
            # ok, we have a service uuid, let's start the service
            service_instance_uuid = GK.services.get(
                service_uuid).start_service()
            return {"service_instance_uuid": service_instance_uuid}, 201
        return "Service not found", 404

    def get(self):
        """
        Returns a list of UUIDs containing all running services.
        :return: dict / list
        """
        LOG.info("GET /instantiations")
        return {"service_instantiations_list": [
            list(s.instances.iterkeys()) for s in GK.services.itervalues()]}

    def delete(self):
        """
        Stops a running service specified by its service and instance UUID.
        """
        # try to extract the service and instance UUID from the request
        json_data = request.get_json(force=True)
        service_uuid_input = json_data.get("service_uuid")
        instance_uuid_input = json_data.get("service_instance_uuid")
        if len(GK.services) < 1:
            return "No service on-boarded.", 404
        # try to be fuzzy
        if service_uuid_input is None:
            # if we don't get a service uuid we stop all services
            service_uuid_list = list(GK.services.iterkeys())
            LOG.info("No service_uuid given, stopping all.")
        else:
            service_uuid_list = [service_uuid_input]
        # for each service
        for service_uuid in service_uuid_list:
            if instance_uuid_input is None:
                instance_uuid_list = list(
                    GK.services[service_uuid].instances.iterkeys())
            else:
                instance_uuid_list = [instance_uuid_input]
            # for all service instances
            for instance_uuid in instance_uuid_list:
                if (service_uuid in GK.services and
                        instance_uuid in GK.services[service_uuid].instances):
                    # valid service and instance UUID, stop service
                    GK.services.get(service_uuid).stop_service(instance_uuid)
                    LOG.info("Service instance with uuid %r stopped." % instance_uuid)
        return "Service(s) stopped.", 200


class Exit(fr.Resource):

    def put(self):
        """
        Stop the running Containernet instance, regardless of the data transmitted.
        """
        list(GK.dcs.values())[0].net.stop()


def generate_subnets(prefix, base, subnet_size=50, mask=24):
    # Generate a list of subnets (ipaddress network objects)
    r = list()
    for net in range(base, base + subnet_size):
        subnet = "{0}.{1}.0/{2}".format(prefix, net, mask)
        r.append(ipaddress.ip_network(unicode(subnet)))
    return r
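    # Illustrative example (comment only, not executed):
    #   generate_subnets('10.0', 0, subnet_size=2, mask=24)
    # returns [IPv4Network(u'10.0.0.0/24'), IPv4Network(u'10.0.1.0/24')].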


def reset_subnets():
    global ELINE_SUBNETS
    global ELAN_SUBNETS
    # private subnet definitions for the generated interfaces
    # 30.0.xxx.0/24
    ELAN_SUBNETS = generate_subnets('30.0', 0, subnet_size=50, mask=24)
    # 20.0.xxx.0/24
    ELINE_SUBNETS = generate_subnets('20.0', 0, subnet_size=50, mask=24)


def initialize_GK():
    global GK
    GK = Gatekeeper()


# create a single, global GK object
GK = None
initialize_GK()
# setup Flask
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024  # 512 MB max upload
api = fr.Api(app)
# define endpoints
api.add_resource(Packages, '/packages', '/api/v2/packages')
api.add_resource(Instantiations, '/instantiations',
                 '/api/v2/instantiations', '/api/v2/requests')
api.add_resource(Exit, '/emulator/exit')


def start_rest_api(host, port, datacenters=dict()):
    GK.dcs = datacenters
    GK.net = get_dc_network()
    # start the Flask server (not the best performance but ok for our use case)
    app.run(host=host,
            port=port,
            debug=True,
            use_reloader=False  # this is needed to run Flask in a non-main thread
            )
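    # Illustrative usage (comment only, not executed): a topology script that
    # has created emulator datacenters dc1/dc2 could expose this API with
    #   start_rest_api("0.0.0.0", 32002, datacenters={"dc1": dc1, "dc2": dc2})
    # The names dc1/dc2 and port 32002 are placeholders, not fixed values.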


def ensure_dir(name):
    if not os.path.exists(name):
        os.makedirs(name)


def load_yaml(path):
    with open(path, "r") as f:
        try:
            r = yaml.load(f)
        except yaml.YAMLError as exc:
            LOG.exception("YAML parse error: %r" % str(exc))
            r = dict()
    return r


def make_relative_path(path):
    if path.startswith("file://"):
        path = path.replace("file://", "", 1)
    if path.startswith("/"):
        path = path.replace("/", "", 1)
    return path
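    # Illustrative example (comment only, not executed):
    #   make_relative_path("file:///folder/descriptor.yml")
    # returns "folder/descriptor.yml"; plain relative paths pass through
    # unchanged.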


def get_dc_network():
    """
    Retrieve the DCNetwork this dummy gatekeeper (GK) is connected to.
    Assume at least 1 datacenter is connected to this GK, and that all datacenters belong to the same DCNetwork
    :return:
    """
    assert (len(GK.dcs) > 0)
    return GK.dcs.values()[0].net


def parse_interface(interface_name):
    """
    Convert the interface name in the NSD to the corresponding vnf_id and
    vnf_interface names.
    :param interface_name:
    :return:
    """
    if ':' in interface_name:
        vnf_id, vnf_interface = interface_name.split(':')
    else:
        vnf_id = None
        vnf_interface = interface_name
    return vnf_id, vnf_interface
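    # Illustrative example (comment only, not executed):
    #   parse_interface("vnf0:input") returns ("vnf0", "input"), while
    #   parse_interface("mgmt") returns (None, "mgmt").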


def get_container_name(vnf_id, vdu_id):
    return "{}.{}".format(vnf_id, vdu_id)
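    # Illustrative example (comment only, not executed):
    #   get_container_name("vnf0", "vdu01") returns "vnf0.vdu01".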


if __name__ == '__main__':
    """
    Allow running the API in standalone mode.
    """
    GK_STANDALONE_MODE = True
    logging.getLogger("werkzeug").setLevel(logging.INFO)
    start_rest_api("0.0.0.0", 8000)