Fix: Changed LLCM to use gevent
[osm/vim-emu.git] / src / emuvim / api / tango / llcm.py
1 # Copyright (c) 2018 SONATA-NFV, 5GTANGO and Paderborn University
2 # ALL RIGHTS RESERVED.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Neither the name of the SONATA-NFV, 5GTANGO, Paderborn University
17 # nor the names of its contributors may be used to endorse or promote
18 # products derived from this software without specific prior written
19 # permission.
20 #
21 # This work has been performed in the framework of the SONATA project,
22 # funded by the European Commission under Grant number 671517 through
23 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
24 # acknowledge the contributions of their colleagues of the SONATA
25 # partner consortium (www.sonata-nfv.eu).
26 #
27 # This work has also been performed in the framework of the 5GTANGO project,
28 # funded by the European Commission under Grant number 761493 through
29 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
30 # acknowledge the contributions of their colleagues of the 5GTANGO
31 # partner consortium (www.5gtango.eu).
32 import logging
33 import os
34 import uuid
35 import hashlib
36 import zipfile
37 import yaml
38 import threading
39 from docker import DockerClient
40 from flask import Flask, request
41 import flask_restful as fr
42 from gevent.pywsgi import WSGIServer
43 from subprocess import Popen
44 import ipaddress
45 import copy
46 import time
47
48
49 LOG = logging.getLogger("5gtango.llcm")
50 LOG.setLevel(logging.INFO)
51
52
53 GK_STORAGE = "/tmp/vim-emu-tango-llcm/"
54 UPLOAD_FOLDER = os.path.join(GK_STORAGE, "uploads/")
55 CATALOG_FOLDER = os.path.join(GK_STORAGE, "catalog/")
56
57 # Enable Dockerfile build functionality
58 BUILD_DOCKERFILE = False
59
60 # flag to indicate that we run without the emulator (only the bare API for
61 # integration testing)
62 GK_STANDALONE_MODE = False
63
64 # should a new version of an image be pulled even if it's already available
65 FORCE_PULL = False
66
67 # flag to indicate if we use bidirectional forwarding rules in the
68 # automatic chaining process
69 BIDIRECTIONAL_CHAIN = True
70
71 # override the management interfaces in the descriptors with default
72 # docker0 interfaces in the containers
73 USE_DOCKER_MGMT = False
74
75 # automatically deploy uploaded packages (no need to execute son-access
76 # deploy --latest separately)
77 AUTO_DEPLOY = False
78
79 # and also automatically terminate any other running services
80 AUTO_DELETE = False
81
82 # global subnet definitions (see reset_subnets())
83 ELAN_SUBNETS = None
84 ELINE_SUBNETS = None
85
86 # Time in seconds to wait for vnf stop scripts to execute fully
87 VNF_STOP_WAIT_TIME = 5
88
89
90 class OnBoardingException(BaseException):
91 pass
92
93
94 class Gatekeeper(object):
95
96 def __init__(self):
97 self.services = dict()
98 self.dcs = dict()
99 self.net = None
100 # used to generate short names for VNFs (Mininet limitation)
101 self.vnf_counter = 0
102 reset_subnets()
103 LOG.info("Initialized 5GTANGO LLCM module.")
104
105 def register_service_package(self, service_uuid, service):
106 """
107 register new service package
108 :param service_uuid: UUID used as key for the new service
109 :param service: the Service object to register
110 """
111 self.services[service_uuid] = service
112 # let's perform all steps needed to onboard the service
113 service.onboard()
114
115
116 class Service(object):
117 """
118 This class represents an NS uploaded as a *.son package to the
119 dummy gatekeeper.
120 It can have multiple running instances of this service.
121 """
122
123 def __init__(self,
124 service_uuid,
125 package_file_hash,
126 package_file_path):
127 self.uuid = service_uuid
128 self.package_file_hash = package_file_hash
129 self.package_file_path = package_file_path
130 self.package_content_path = os.path.join(
131 CATALOG_FOLDER, "services/%s" % self.uuid)
132 self.manifest = None
133 self.nsd = None
134 self.vnfds = dict()
135 self.local_docker_files = dict()
136 self.remote_docker_image_urls = dict()
137 self.instances = dict()
138
139 def onboard(self):
140 """
141 Do all steps to prepare this service to be instantiated
142 :return:
143 """
144 # 1. extract the contents of the package and store them in our catalog
145 self._unpack_service_package()
146 # 2. read in all descriptor files
147 self._load_package_descriptor()
148 self._load_nsd()
149 self._load_vnfd()
150 if self.nsd is None:
151 raise OnBoardingException("No NSD found.")
152 if len(self.vnfds) < 1:
153 raise OnBoardingException("No VNFDs found.")
154 # 3. prepare container images (e.g. download or build Dockerfile)
155 if BUILD_DOCKERFILE:
156 self._load_docker_files()
157 self._build_images_from_dockerfiles()
158 else:
159 self._load_docker_urls()
160 self._pull_predefined_dockerimages()
161 LOG.info("On-boarded service: %r" % self.manifest.get("name"))
162
163 def start_service(self):
164 """
165 This method creates and starts a new service instance.
166 It computes placements, iterates over all VNFDs, and starts
167 each VNFD as a Docker container in the data center selected
168 by the placement algorithm.
169 :return:
170 """
171 LOG.info("Starting service %r" % self.uuid)
172
173 # 1. each service instance gets a new uuid to identify it
174 instance_uuid = str(uuid.uuid4())
175 # build an instances dict (a bit like an NSR :))
176 self.instances[instance_uuid] = dict()
177 self.instances[instance_uuid]["vnf_instances"] = list()
178
179 # 2. compute placement of this service instance (adds DC names to
180 # VNFDs)
181 # self._calculate_placement(FirstDcPlacement)
182 self._calculate_placement(RoundRobinDcPlacement)
183 # 3. start all vnfds that we have in the service
184 for vnf_id in self.vnfds:
185 vnfd = self.vnfds[vnf_id]
186 # attention: returns a list of started deployment units
187 vnfis = self._start_vnfd(vnfd, vnf_id)
188 # add list of VNFIs to total VNFI list
189 self.instances[instance_uuid]["vnf_instances"].extend(vnfis)
190
191 # 4. Deploy E-Line, E-Tree and E-LAN links
192 # Attention: Only done if a "forwarding_graphs" section exists in the NSD,
193 # even if "forwarding_graphs" are not used directly.
194 if "virtual_links" in self.nsd and "forwarding_graphs" in self.nsd:
195 vlinks = self.nsd["virtual_links"]
196 # constituent virtual links are not checked
197 eline_fwd_links = [l for l in vlinks if (
198 l["connectivity_type"] == "E-Line")]
199 elan_fwd_links = [l for l in vlinks if (
200 l["connectivity_type"] == "E-LAN" or
201 l["connectivity_type"] == "E-Tree")] # Treat E-Tree as E-LAN
202
203 # 5a. deploy E-Line links
204 GK.net.deployed_elines.extend(eline_fwd_links) # bookkeeping
205 self._connect_elines(eline_fwd_links, instance_uuid)
206 # 5b. deploy E-Tree/E-LAN links
207 GK.net.deployed_elans.extend(elan_fwd_links) # bookkeeping
208 self._connect_elans(elan_fwd_links, instance_uuid)
209
210 # 6. run the emulator specific entrypoint scripts in the VNFIs of this
211 # service instance
212 self._trigger_emulator_start_scripts_in_vnfis(
213 self.instances[instance_uuid]["vnf_instances"])
214 # done
215 LOG.info("Service started. Instance id: %r" % instance_uuid)
216 return instance_uuid
217
218 def stop_service(self, instance_uuid):
219 """
220 This method stops a running service instance.
221 It iterates over all VNF instances, stopping each of them
222 and removing them from their data center.
223 :param instance_uuid: the uuid of the service instance to be stopped
224 """
225 LOG.info("Stopping service %r" % self.uuid)
226 # get relevant information
227 # instance_uuid = str(self.uuid.uuid4())
228 vnf_instances = self.instances[instance_uuid]["vnf_instances"]
229 # trigger stop scripts in vnf instances and wait a few seconds for
230 # completion
231 self._trigger_emulator_stop_scripts_in_vnfis(vnf_instances)
232 time.sleep(VNF_STOP_WAIT_TIME)
233 # stop all vnfs
234 for v in vnf_instances:
235 self._stop_vnfi(v)
236 # last step: remove the instance from the list of all instances
237 del self.instances[instance_uuid]
238
239 def _get_resource_limits(self, deployment_unit):
240 """
241 Extract resource limits from deployment units.
242 """
243 # defaults
244 cpu_list = None
245 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(1.0))
246 mem_limit = None
247 # update from descriptor
248 if "resource_requirements" in deployment_unit:
249 res_req = deployment_unit.get("resource_requirements")
250 cpu_list = res_req.get("cpu").get("cpuset")
251 if cpu_list is None:
252 cpu_list = res_req.get("cpu").get("vcpus")
253 if cpu_list is not None:
254 # attention: docker expects list as string w/o spaces:
255 cpu_list = str(cpu_list).replace(" ", "").strip()
256 cpu_bw = res_req.get("cpu").get("cpu_bw")
257 if cpu_bw is None:
258 cpu_bw = 1.0
259 cpu_period, cpu_quota = self._calculate_cpu_cfs_values(float(cpu_bw))
260 mem_limit = res_req.get("memory").get("size")
261 mem_unit = str(res_req.get("memory").get("size_unit", "GB"))
262 if mem_limit is not None:
263 mem_limit = int(mem_limit)
264 # to bytes
265 if "G" in mem_unit:
266 mem_limit = mem_limit * 1024 * 1024 * 1024
267 elif "M" in mem_unit:
268 mem_limit = mem_limit * 1024 * 1024
269 elif "K" in mem_unit:
270 mem_limit = mem_limit * 1024
271 return cpu_list, cpu_period, cpu_quota, mem_limit
272
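# Illustrative example (hypothetical values, not taken from a real package):
# a deployment unit carrying
#
#     "resource_requirements": {
#         "cpu": {"vcpus": 2, "cpu_bw": 0.5},
#         "memory": {"size": 512, "size_unit": "MB"}
#     }
#
# is translated by _get_resource_limits() into cpu_list="2" (passed to Docker
# as a cpuset string), cpu_period=1000000 with cpu_quota=500000 (50% of one
# core via CFS) and mem_limit=512*1024*1024 bytes.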
273 def _start_vnfd(self, vnfd, vnf_id, **kwargs):
274 """
275 Start a single VNFD of this service
276 :param vnfd: vnfd descriptor dict
277 :param vnf_id: unique id of this vnf in the nsd
278 :return:
279 """
280 vnfis = list()
281 # the vnf_name refers to the container image to be deployed
282 vnf_name = vnfd.get("name")
283 # combine VDUs and CDUs
284 deployment_units = (vnfd.get("virtual_deployment_units", []) +
285 vnfd.get("cloudnative_deployment_units", []))
286 # iterate over all deployment units within the VNFD
287 for u in deployment_units:
288 # 0. vnf_container_name = vnf_id.vdu_id
289 vnf_container_name = get_container_name(vnf_id, u.get("id"))
290 # 1. get the name of the docker image to start
291 if vnf_container_name not in self.remote_docker_image_urls:
292 raise Exception("No image name for %r found. Abort." % vnf_container_name)
293 docker_image_name = self.remote_docker_image_urls.get(vnf_container_name)
294 # 2. select datacenter to start the VNF in
295 target_dc = vnfd.get("dc")
296 # 3. perform some checks to ensure we can start the container
297 assert(docker_image_name is not None)
298 assert(target_dc is not None)
299 if not self._check_docker_image_exists(docker_image_name):
300 raise Exception("Docker image {} not found. Abort."
301 .format(docker_image_name))
302
303 # 4. get the resource limits
304 cpu_list, cpu_period, cpu_quota, mem_limit = self._get_resource_limits(u)
305
306 # get connection points defined for the DU
307 intfs = u.get("connection_points", [])
308 # do some re-naming of fields to be compatible with Containernet
309 for i in intfs:
310 if i.get("address"):
311 i["ip"] = i.get("address")
312
313 # get ports and port_bindings from the port and publish fields of CNFD
314 # see: https://github.com/containernet/containernet/wiki/Exposing-and-mapping-network-ports
315 ports = list() # Containernet naming
316 port_bindings = dict()
317 for i in intfs:
318 if i.get("port"):
319 if not isinstance(i.get("port"), int):
320 LOG.error("Field 'port' is not an int in CP: {}".format(i))
321 else:
322 ports.append(i.get("port"))
323 if i.get("publish"):
324 if not isinstance(i.get("publish"), dict):
325 LOG.error("Field 'publish' is not a dict in CP: {}".format(i))
326 else:
327 port_bindings.update(i.get("publish"))
328 if len(ports) > 0:
329 LOG.info("{} exposes ports: {}".format(vnf_container_name, ports))
330 if len(port_bindings) > 0:
331 LOG.info("{} publishes ports: {}".format(vnf_container_name, port_bindings))
332
333 # 5. collect additional information to start container
334 volumes = list()
335 cenv = dict()
336 # 5.1 inject descriptor based start/stop commands into env (overwrite)
337 VNFD_CMD_START = u.get("vm_cmd_start")
338 VNFD_CMD_STOP = u.get("vm_cmd_stop")
339 if VNFD_CMD_START and not VNFD_CMD_START == "None":
340 LOG.info("Found 'vm_cmd_start'='{}' in VNFD.".format(VNFD_CMD_START) +
341 " Overwriting SON_EMU_CMD.")
342 cenv["SON_EMU_CMD"] = VNFD_CMD_START
343 if VNFD_CMD_STOP and not VNFD_CMD_STOP == "None":
344 LOG.info("Found 'vm_cmd_stop'='{}' in VNFD.".format(VNFD_CMD_STOP) +
345 " Overwriting SON_EMU_CMD_STOP.")
346 cenv["SON_EMU_CMD_STOP"] = VNFD_CMD_STOP
347
348 # 6. Start the container
349 LOG.info("Starting %r as %r in DC %r" %
350 (vnf_name, vnf_container_name, vnfd.get("dc")))
351 LOG.debug("Interfaces for %r: %r" % (vnf_id, intfs))
352 # start the container
353 vnfi = target_dc.startCompute(
354 vnf_container_name,
355 network=intfs,
356 image=docker_image_name,
357 cpu_quota=cpu_quota,
358 cpu_period=cpu_period,
359 cpuset_cpus=cpu_list,
360 mem_limit=mem_limit,
361 volumes=volumes,
362 properties=cenv, # environment
363 ports=ports,
364 port_bindings=port_bindings,
365 type=kwargs.get('type', 'docker'))
366 # add vnfd reference to vnfi
367 vnfi.vnfd = vnfd
368 # add container name
369 vnfi.vnf_container_name = vnf_container_name
370 # store vnfi
371 vnfis.append(vnfi)
372 return vnfis
373
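# Illustrative connection point of a deployment unit (hypothetical values)
# showing the fields consumed by _start_vnfd() above:
#
#     "connection_points": [
#         {"id": "mgmt",
#          "address": "10.20.0.2/24",       # copied into the Containernet "ip" field
#          "port": 8080,                    # exposed container port -> ports list
#          "publish": {"8080/tcp": 18080}}  # host binding -> port_bindings dict
#     ]
#
# would yield ports=[8080] and port_bindings={"8080/tcp": 18080} for startCompute().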
374 def _stop_vnfi(self, vnfi):
375 """
376 Stop a VNF instance.
377 :param vnfi: vnf instance to be stopped
378 """
379 # Find the correct datacenter
380 status = vnfi.getStatus()
381 dc = vnfi.datacenter
382 # stop the vnfi
383 LOG.info("Stopping the VNF instance %r in DC %r" %
384 (status["name"], dc))
385 dc.stopCompute(status["name"])
386
387 def _get_vnf_instance(self, instance_uuid, vnf_id):
388 """
389 Returns VNFI object for a given "vnf_id" or "vnf_container_name" taken from an NSD.
390 :return: single object
391 """
392 for vnfi in self.instances[instance_uuid]["vnf_instances"]:
393 if str(vnfi.name) == str(vnf_id):
394 return vnfi
395 LOG.warning("No container with name: {0} found.".format(vnf_id))
396 return None
397
398 def _get_vnf_instance_units(self, instance_uuid, vnf_id):
399 """
400 Returns a list of VNFI objects (all deployment units) for a given
401 "vnf_id" taken from an NSD.
402 :return: list
403 """
404 if vnf_id is None:
405 return None
406 r = list()
407 for vnfi in self.instances[instance_uuid]["vnf_instances"]:
408 if vnf_id in vnfi.name:
409 r.append(vnfi)
410 if len(r) > 0:
411 LOG.debug("Found units: {} for vnf_id: {}"
412 .format([i.name for i in r], vnf_id))
413 return r
414 LOG.warning("No container(s) with name: {0} found.".format(vnf_id))
415 return None
416
417 @staticmethod
418 def _vnf_reconfigure_network(vnfi, if_name, net_str=None, new_name=None):
419 """
420 Reconfigure the network configuration of a specific interface
421 of a running container.
422 :param vnfi: container instance
423 :param if_name: interface name
424 :param net_str: network configuration string, e.g., 1.2.3.4/24
425 :return:
426 """
427 # assign new ip address
428 if net_str is not None:
429 intf = vnfi.intf(intf=if_name)
430 if intf is not None:
431 intf.setIP(net_str)
432 LOG.debug("Reconfigured network of %s:%s to %r" %
433 (vnfi.name, if_name, net_str))
434 else:
435 LOG.warning("Interface not found: %s:%s. Network reconfiguration skipped." % (
436 vnfi.name, if_name))
437
438 if new_name is not None:
439 vnfi.cmd('ip link set', if_name, 'down')
440 vnfi.cmd('ip link set', if_name, 'name', new_name)
441 vnfi.cmd('ip link set', new_name, 'up')
442 LOG.debug("Reconfigured interface name of %s:%s to %s" %
443 (vnfi.name, if_name, new_name))
444
445 def _trigger_emulator_start_scripts_in_vnfis(self, vnfi_list):
446 for vnfi in vnfi_list:
447 config = vnfi.dcinfo.get("Config", dict())
448 env = config.get("Env", list())
449 for env_var in env:
450 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
451 if var == "SON_EMU_CMD" or var == "VIM_EMU_CMD":
452 LOG.info("Executing script in '{}': {}={}"
453 .format(vnfi.name, var, cmd))
454 # execute command in new thread to ensure that GK is not
455 # blocked by VNF
456 t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
457 t.daemon = True
458 t.start()
459 break # only execute one command
460
461 def _trigger_emulator_stop_scripts_in_vnfis(self, vnfi_list):
462 for vnfi in vnfi_list:
463 config = vnfi.dcinfo.get("Config", dict())
464 env = config.get("Env", list())
465 for env_var in env:
466 var, cmd = map(str.strip, map(str, env_var.split('=', 1)))
467 if var == "SON_EMU_CMD_STOP" or var == "VIM_EMU_CMD_STOP":
468 LOG.info("Executing script in '{}': {}={}"
469 .format(vnfi.name, var, cmd))
470 # execute command in new thread to ensure that GK is not
471 # blocked by VNF
472 t = threading.Thread(target=vnfi.cmdPrint, args=(cmd,))
473 t.daemon = True
474 t.start()
475 break # only execute one command
476
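# Illustrative container environment inspected by the two methods above
# (values are hypothetical, format as returned by docker inspect "Config"/"Env"):
#
#     env = ["PATH=/usr/local/sbin:/usr/sbin:/sbin", "SON_EMU_CMD=./start.sh"]
#
# For the SON_EMU_CMD entry, "./start.sh" is executed inside the VNF container
# via cmdPrint() in a daemon thread so the gatekeeper itself is never blocked;
# SON_EMU_CMD_STOP entries are handled the same way by the stop variant.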
477 def _unpack_service_package(self):
478 """
479 unzip the *.son file and store its contents in CATALOG_FOLDER/services/<service_uuid>/
480 """
481 LOG.info("Unzipping: %r" % self.package_file_path)
482 with zipfile.ZipFile(self.package_file_path, "r") as z:
483 z.extractall(self.package_content_path)
484
485 def _load_package_descriptor(self):
486 """
487 Load the main package descriptor YAML and keep it as dict.
488 :return:
489 """
490 self.manifest = load_yaml(
491 os.path.join(
492 self.package_content_path, "TOSCA-Metadata/NAPD.yaml"))
493
494 def _load_nsd(self):
495 """
496 Load the entry NSD YAML and keep it as dict.
497 :return:
498 """
499 if "package_content" in self.manifest:
500 nsd_path = None
501 for f in self.manifest.get("package_content"):
502 if f.get("content-type") == "application/vnd.5gtango.nsd":
503 nsd_path = os.path.join(
504 self.package_content_path,
505 make_relative_path(f.get("source")))
506 break # always use the first NSD for now
507 if nsd_path is None:
508 raise OnBoardingException("No NSD with type 'application/vnd.5gtango.nsd' found.")
509 self.nsd = load_yaml(nsd_path)
510 GK.net.deployed_nsds.append(self.nsd) # TODO this seems strange (remove?)
511 LOG.debug("Loaded NSD: %r" % self.nsd.get("name"))
512 else:
513 raise OnBoardingException(
514 "No 'package_content' section in package manifest:\n{}"
515 .format(self.manifest))
516
517 def _load_vnfd(self):
518 """
519 Load all VNFD YAML files referenced in MANIFEST.MF and keep them in dict.
520 :return:
521 """
522 # first make a list of all the vnfds in the package
523 vnfd_set = dict()
524 if "package_content" in self.manifest:
525 for pc in self.manifest.get("package_content"):
526 if pc.get(
527 "content-type") == "application/vnd.5gtango.vnfd":
528 vnfd_path = os.path.join(
529 self.package_content_path,
530 make_relative_path(pc.get("source")))
531 vnfd = load_yaml(vnfd_path)
532 vnfd_set[vnfd.get("name")] = vnfd
533 if len(vnfd_set) < 1:
534 raise OnBoardingException("No VNFDs found.")
535 # then link each vnf_id in the nsd to its vnfd
536 for v in self.nsd.get("network_functions"):
537 if v.get("vnf_name") in vnfd_set:
538 self.vnfds[v.get("vnf_id")] = vnfd_set[v.get("vnf_name")]
539 LOG.debug("Loaded VNFD: {0} id: {1}"
540 .format(v.get("vnf_name"), v.get("vnf_id")))
541
542 def _connect_elines(self, eline_fwd_links, instance_uuid):
543 """
544 Connect all E-LINE links in the NSD
545 Attention: This method DOES NOT support multi V/CDU VNFs!
546 :param eline_fwd_links: list of E-LINE links in the NSD
547 :param: instance_uuid of the service
548 :return:
549 """
550 # cookie is used as an identifier for the flow rules installed by the dummy gatekeeper
551 # e.g. different services get a unique cookie for their flow rules
552 cookie = 1
553 for link in eline_fwd_links:
554 LOG.info("Found E-Line: {}".format(link))
555 src_id, src_if_name = parse_interface(
556 link["connection_points_reference"][0])
557 dst_id, dst_if_name = parse_interface(
558 link["connection_points_reference"][1])
559 LOG.info("Searching C/VDU for E-Line: src={}, src_if={}, dst={}, dst_if={}"
560 .format(src_id, src_if_name, dst_id, dst_if_name))
561 # handle C/VDUs (ugly hack, only one V/CDU per VNF for now)
562 src_units = self._get_vnf_instance_units(instance_uuid, src_id)
563 dst_units = self._get_vnf_instance_units(instance_uuid, dst_id)
564 if src_units is None or dst_units is None:
565 LOG.info("No VNF-VNF link. Skipping: src={}, src_if={}, dst={}, dst_if={}"
566 .format(src_id, src_if_name, dst_id, dst_if_name))
567 return
568 # we only support VNFs with one V/CDU right now
569 if len(src_units) != 1 or len(dst_units) != 1:
570 raise BaseException("LLCM does not support E-LINES for multi V/CDU VNFs.")
571 # get the full name from that C/VDU and use it as src_id and dst_id
572 src_id = src_units[0].name
573 dst_id = dst_units[0].name
574 # from here we have all info we need
575 LOG.info("Creating E-Line for C/VDU: src={}, src_if={}, dst={}, dst_if={}"
576 .format(src_id, src_if_name, dst_id, dst_if_name))
577 # get involved vnfis
578 src_vnfi = src_units[0]
579 dst_vnfi = dst_units[0]
580 # proceed with chaining setup
581 setChaining = False
582 if src_vnfi is not None and dst_vnfi is not None:
583 setChaining = True
584 # re-configure the VNFs' IP assignment and ensure that a new
585 # subnet is used for each E-Line
586 eline_net = ELINE_SUBNETS.pop(0)
587 ip1 = "{0}/{1}".format(str(eline_net[1]),
588 eline_net.prefixlen)
589 ip2 = "{0}/{1}".format(str(eline_net[2]),
590 eline_net.prefixlen)
591 # check if VNFs have fixed IPs (ip/address field in VNFDs)
592 if (self._get_vnfd_cp_from_vnfi(
593 src_vnfi, src_if_name).get("ip") is None and
594 self._get_vnfd_cp_from_vnfi(
595 src_vnfi, src_if_name).get("address") is None):
596 self._vnf_reconfigure_network(src_vnfi, src_if_name, ip1)
597 # check if VNFs have fixed IPs (ip field in VNFDs)
598 if (self._get_vnfd_cp_from_vnfi(
599 dst_vnfi, dst_if_name).get("ip") is None and
600 self._get_vnfd_cp_from_vnfi(
601 dst_vnfi, dst_if_name).get("address") is None):
602 self._vnf_reconfigure_network(dst_vnfi, dst_if_name, ip2)
603 # set the chaining
604 if setChaining:
605 GK.net.setChain(
606 src_id, dst_id,
607 vnf_src_interface=src_if_name, vnf_dst_interface=dst_if_name,
608 bidirectional=BIDIRECTIONAL_CHAIN, cmd="add-flow", cookie=cookie, priority=10)
609
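# Illustrative NSD fragment (hypothetical names) of the links consumed by
# _connect_elines() above and _connect_elans() further below:
#
#     "virtual_links": [
#         {"id": "link-1", "connectivity_type": "E-Line",
#          "connection_points_reference": ["vnf0:output", "vnf1:input"]},
#         {"id": "mgmt-lan", "connectivity_type": "E-LAN",
#          "connection_points_reference": ["vnf0:mgmt", "vnf1:mgmt", "mgmt"]}
#     ]
#
# Each E-Line gets a fresh subnet from ELINE_SUBNETS (the endpoints receive its
# .1/.2 addresses) and is chained via setChain(); E-LAN/E-Tree members each
# receive an address from an ELAN_SUBNETS subnet and are grouped via setLAN().
# The bare "mgmt" reference (no "vnf_id:" prefix) is treated as an NS
# connection point and skipped.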
610 def _get_vnfd_cp_from_vnfi(self, vnfi, ifname):
611 """
612 Gets the connection point data structure from the VNFD
613 of the given VNFI using ifname.
614 """
615 if vnfi.vnfd is None:
616 return {}
617 cps = vnfi.vnfd.get("connection_points")
618 for cp in cps:
619 if cp.get("id") == ifname:
620 return cp
621
622 def _connect_elans(self, elan_fwd_links, instance_uuid):
623 """
624 Connect all E-LAN/E-Tree links in the NSD
625 This method supports multi-V/CDU VNFs if the connection
626 point names of the DUs are the same as the ones in the NSD.
627 :param elan_fwd_links: list of E-LAN links in the NSD
628 :param: instance_uuid of the service
629 :return:
630 """
631 for link in elan_fwd_links:
632 # a new E-LAN/E-Tree
633 elan_vnf_list = []
634 lan_net = ELAN_SUBNETS.pop(0)
635 lan_hosts = list(lan_net.hosts())
636
637 # generate a LAN IP address for all interfaces (of all involved V/CDUs)
638 for intf in link["connection_points_reference"]:
639 vnf_id, intf_name = parse_interface(intf)
640 if vnf_id is None:
641 continue # skip references to NS connection points
642 units = self._get_vnf_instance_units(instance_uuid, vnf_id)
643 if units is None:
644 continue # skip if no deployment unit is present
645 # iterate over all involved deployment units
646 for uvnfi in units:
647 # Attention: we apply a simplification for multi DU VNFs here:
648 # the connection points of all involved DUs have to have the same
649 # name as the connection points of the surrounding VNF to be mapped.
650 # This is because we do not consider links specified in the VNFDs
651 container_name = uvnfi.name
652 ip_address = "{0}/{1}".format(str(lan_hosts.pop(0)),
653 lan_net.prefixlen)
654 LOG.debug(
655 "Setting up E-LAN/E-Tree interface. (%s:%s) -> %s" % (
656 container_name, intf_name, ip_address))
657 # re-configure the VNFs' IP assignment and ensure that a new subnet is used for each E-LAN
658 # E-LAN relies on the learning switch capability of Ryu which has to be turned on in the topology
659 # (DCNetwork(controller=RemoteController, enable_learning=True)), so no explicit chaining is
660 # necessary.
661 vnfi = self._get_vnf_instance(instance_uuid, container_name)
662 if vnfi is not None:
663 self._vnf_reconfigure_network(vnfi, intf_name, ip_address)
664 # add this vnf and interface to the E-LAN for tagging
665 elan_vnf_list.append(
666 {'name': container_name, 'interface': intf_name})
667 # install the VLAN tags for this E-LAN
668 GK.net.setLAN(elan_vnf_list)
669
670 def _load_docker_files(self):
671 """
672 Get all paths to Dockerfiles from VNFDs and store them in dict.
673 :return:
674 """
675 for vnf_id, v in self.vnfds.iteritems():
676 for vu in v.get("virtual_deployment_units", []):
677 vnf_container_name = get_container_name(vnf_id, vu.get("id"))
678 if vu.get("vm_image_format") == "docker":
679 vm_image = vu.get("vm_image")
680 docker_path = os.path.join(
681 self.package_content_path,
682 make_relative_path(vm_image))
683 self.local_docker_files[vnf_container_name] = docker_path
684 LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
685 for cu in v.get("cloudnative_deployment_units", []):
686 vnf_container_name = get_container_name(vnf_id, cu.get("id"))
687 image = cu.get("image")
688 docker_path = os.path.join(
689 self.package_content_path,
690 make_relative_path(image))
691 self.local_docker_files[vnf_container_name] = docker_path
692 LOG.debug("Found Dockerfile (%r): %r" % (vnf_container_name, docker_path))
693
694 def _load_docker_urls(self):
695 """
696 Get all URLs to pre-built Docker images in some repository.
697 :return:
698 """
699 for vnf_id, v in self.vnfds.iteritems():
700 for vu in v.get("virtual_deployment_units", []):
701 vnf_container_name = get_container_name(vnf_id, vu.get("id"))
702 if vu.get("vm_image_format") == "docker":
703 url = vu.get("vm_image")
704 if url is not None:
705 url = url.replace("http://", "")
706 self.remote_docker_image_urls[vnf_container_name] = url
707 LOG.debug("Found Docker image URL (%r): %r" %
708 (vnf_container_name,
709 self.remote_docker_image_urls[vnf_container_name]))
710 for cu in v.get("cloudnative_deployment_units", []):
711 vnf_container_name = get_container_name(vnf_id, cu.get("id"))
712 url = cu.get("image")
713 if url is not None:
714 url = url.replace("http://", "")
715 self.remote_docker_image_urls[vnf_container_name] = url
716 LOG.debug("Found Docker image URL (%r): %r" %
717 (vnf_container_name,
718 self.remote_docker_image_urls[vnf_container_name]))
719
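# Illustrative example (hypothetical registry/name): a VDU such as
#
#     {"id": "vdu01", "vm_image_format": "docker",
#      "vm_image": "http://registry.example.com/sonata-vnf:latest"}
#
# inside VNF "vnf0" ends up as
# remote_docker_image_urls["vnf0.vdu01"] = "registry.example.com/sonata-vnf:latest";
# CDUs use their plain "image" field instead of "vm_image".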
720 def _build_images_from_dockerfiles(self):
721 """
722 Build Docker images for each local Dockerfile found in the package: self.local_docker_files
723 """
724 if GK_STANDALONE_MODE:
725 return # do not build anything in standalone mode
726 dc = DockerClient()
727 LOG.info("Building %d Docker images (this may take several minutes) ..." % len(
728 self.local_docker_files))
729 for k, v in self.local_docker_files.iteritems():
730 for line in dc.build(path=v.replace(
731 "Dockerfile", ""), tag=k, rm=False, nocache=False):
732 LOG.debug("DOCKER BUILD: %s" % line)
733 LOG.info("Docker image created: %s" % k)
734
735 def _pull_predefined_dockerimages(self):
736 """
737 If the package contains URLs to pre-built Docker images, we download them with this method.
738 """
739 dc = DockerClient()
740 for url in self.remote_docker_image_urls.itervalues():
741 # only pull if not present (speedup for development)
742 if not FORCE_PULL:
743 if len(dc.images.list(name=url)) > 0:
744 LOG.debug("Image %r present. Skipping pull." % url)
745 continue
746 LOG.info("Pulling image: %r" % url)
747 # this seems to fail with latest docker api version 2.0.2
748 # dc.images.pull(url,
749 # insecure_registry=True)
750 # using docker cli instead
751 cmd = ["docker",
752 "pull",
753 url,
754 ]
755 Popen(cmd).wait()
756
757 def _check_docker_image_exists(self, image_name):
758 """
759 Query the docker service and check if the given image exists
760 :param image_name: name of the docker image
761 :return:
762 """
763 return len(DockerClient().images.list(name=image_name)) > 0
764
765 def _calculate_placement(self, algorithm):
766 """
767 Do placement by adding a field "dc" to
768 each VNFD that points to one of our
769 data center objects known to the gatekeeper.
770 """
771 assert(len(self.vnfds) > 0)
772 assert(len(GK.dcs) > 0)
773 # instantiate the algorithm and do the placement
774 p = algorithm()
775 p.place(self.nsd, self.vnfds, GK.dcs)
776 LOG.info("Using placement algorithm: %r" % p.__class__.__name__)
777 # let's print the placement result
778 for name, vnfd in self.vnfds.iteritems():
779 LOG.info("Placed VNF %r on DC %r" % (name, str(vnfd.get("dc"))))
780
781 def _calculate_cpu_cfs_values(self, cpu_time_percentage):
782 """
783 Calculate cpu period and quota for CFS
784 :param cpu_time_percentage: percentage of overall CPU to be used
785 :return: cpu_period, cpu_quota
786 """
787 if cpu_time_percentage is None:
788 return -1, -1
789 if cpu_time_percentage < 0:
790 return -1, -1
791 # (see: https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt)
792 # Attention: minimum cpu_quota is 1ms (1000 microseconds)
793 cpu_period = 1000000  # let's consider a fixed period of 1000000 microseconds for now
794 LOG.debug("cpu_period is %r, cpu_percentage is %r" %
795 (cpu_period, cpu_time_percentage))
796 # calculate the fraction of cpu time for this container
797 cpu_quota = cpu_period * cpu_time_percentage
798 # ATTENTION: must be >= 1000, because the kernel enforces a minimum CFS
799 # quota of 1000 microseconds (smaller values cause an 'invalid argument' error)
800 if cpu_quota < 1000:
801 LOG.debug("cpu_quota before correcting: %r" % cpu_quota)
802 cpu_quota = 1000
803 LOG.warning("Increased CPU quota to avoid system error.")
804 LOG.debug("Calculated: cpu_period=%f / cpu_quota=%f" %
805 (cpu_period, cpu_quota))
806 return int(cpu_period), int(cpu_quota)
807
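# Worked example for _calculate_cpu_cfs_values(): cpu_bw=0.25 yields
# cpu_period=1000000 and cpu_quota=250000 (25% of one core); a tiny value such
# as cpu_bw=0.0005 would give 500 and is therefore raised to the 1000
# microsecond minimum, while None or a negative percentage returns (-1, -1),
# i.e. no CFS limit is applied.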
808
809 """
810 Some (simple) placement algorithms
811 """
812
813
814 class FirstDcPlacement(object):
815 """
816 Placement: Always use one and the same data center from the GK.dcs dict.
817 """
818
819 def place(self, nsd, vnfds, dcs):
820 for id, vnfd in vnfds.iteritems():
821 vnfd["dc"] = list(dcs.itervalues())[0]
822
823
824 class RoundRobinDcPlacement(object):
825 """
826 Placement: Distribute VNFs across all available DCs in a round robin fashion.
827 """
828
829 def place(self, nsd, vnfds, dcs):
830 c = 0
831 dcs_list = list(dcs.itervalues())
832 for id, vnfd in vnfds.iteritems():
833 vnfd["dc"] = dcs_list[c % len(dcs_list)]
834 c += 1 # inc. c to use next DC
835
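# Illustrative sketch of a further placement strategy (not used anywhere in the
# LLCM, names are hypothetical): any object with a place(nsd, vnfds, dcs) method
# can be passed to Service._calculate_placement().
class StaticDcPlacement(object):
    """
    Sketch: place VNFs according to an explicit vnf_id -> DC-name mapping and
    fall back to the first known DC for unmapped VNFs.
    """

    def __init__(self, mapping=None):
        self.mapping = mapping if mapping is not None else dict()

    def place(self, nsd, vnfds, dcs):
        fallback_dc = list(dcs.itervalues())[0]
        for vnf_id, vnfd in vnfds.iteritems():
            vnfd["dc"] = dcs.get(self.mapping.get(vnf_id), fallback_dc)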
836
837 """
838 Resource definitions and API endpoints
839 """
840
841
842 class Packages(fr.Resource):
843
844 def post(self):
845 """
846 Upload a *.son service package to the dummy gatekeeper.
847
848 We expect a request with a *.son file and store it in UPLOAD_FOLDER
849 :return: UUID
850 """
851 try:
852 # get file contents
853 LOG.info("POST /packages called")
854 # let's search for the package in the request
855 is_file_object = False # make API more robust: file can be in data or in files field
856 if "package" in request.files:
857 son_file = request.files["package"]
858 is_file_object = True
859 elif len(request.data) > 0:
860 son_file = request.data
861 else:
862 return {"service_uuid": None, "size": 0, "sha1": None,
863 "error": "upload failed. file not found."}, 500
864 # generate a uuid to reference this package
865 service_uuid = str(uuid.uuid4())
866 file_hash = hashlib.sha1(str(son_file)).hexdigest()
867 # ensure that upload folder exists
868 ensure_dir(UPLOAD_FOLDER)
869 upload_path = os.path.join(UPLOAD_FOLDER, "%s.tgo" % service_uuid)
870 # store *.son file to disk
871 if is_file_object:
872 son_file.save(upload_path)
873 else:
874 with open(upload_path, 'wb') as f:
875 f.write(son_file)
876 size = os.path.getsize(upload_path)
877
878 # first stop and delete any other running services
879 if AUTO_DELETE:
880 service_list = copy.copy(GK.services)
881 for service_uuid in service_list:
882 instances_list = copy.copy(
883 GK.services[service_uuid].instances)
884 for instance_uuid in instances_list:
885 # valid service and instance UUID, stop service
886 GK.services.get(service_uuid).stop_service(
887 instance_uuid)
888 LOG.info("service instance with uuid %r stopped." %
889 instance_uuid)
890
891 # create a service object and register it
892 s = Service(service_uuid, file_hash, upload_path)
893 GK.register_service_package(service_uuid, s)
894
895 # automatically deploy the service
896 if AUTO_DEPLOY:
897 # ok, we have a service uuid, let's start the service
898 reset_subnets()
899 GK.services.get(service_uuid).start_service()
900
901 # generate the JSON result
902 return {"service_uuid": service_uuid, "size": size,
903 "sha1": file_hash, "error": None}, 201
904 except BaseException:
905 LOG.exception("Service package upload failed:")
906 return {"service_uuid": None, "size": 0,
907 "sha1": None, "error": "upload failed"}, 500
908
909 def get(self):
910 """
911 Return a list of UUIDs of uploaded service packages.
912 :return: dict/list
913 """
914 LOG.info("GET /packages")
915 return {"service_uuid_list": list(GK.services.iterkeys())}
916
917
918 class Instantiations(fr.Resource):
919
920 def post(self):
921 """
922 Instantiate a service specified by its UUID.
923 Will return a new UUID to identify the running service instance.
924 :return: UUID
925 """
926 LOG.info("POST /instantiations (or /requests) called")
927 # try to extract the service uuid from the request
928 json_data = request.get_json(force=True)
929 service_uuid = json_data.get("service_uuid")
930 service_name = json_data.get("service_name")
931
932 # first try to find by service_name
933 if service_name is not None:
934 for s_uuid, s in GK.services.iteritems():
935 if s.manifest.get("name") == service_name:
936 LOG.info("Found service: {} with UUID: {}"
937 .format(service_name, s_uuid))
938 service_uuid = s_uuid
939 # let's be a bit fuzzy here to make testing easier
940 if (service_uuid is None or service_uuid ==
941 "latest") and len(GK.services) > 0:
942 # if we don't get a service uuid, we simply start the first service
943 # in the list
944 service_uuid = list(GK.services.iterkeys())[0]
945 if service_uuid in GK.services:
946 # ok, we have a service uuid, let's start the service
947 service_instance_uuid = GK.services.get(
948 service_uuid).start_service()
949 return {"service_instance_uuid": service_instance_uuid}, 201
950 return "Service not found", 404
951
952 def get(self):
953 """
954 Returns a list with the instance UUIDs of all running services.
955 :return: dict / list
956 """
957 LOG.info("GET /instantiations")
958 return {"service_instantiations_list": [
959 list(s.instances.iterkeys()) for s in GK.services.itervalues()]}
960
961 def delete(self):
962 """
963 Stops a running service specified by its service and instance UUID.
964 """
965 # try to extract the service and instance UUID from the request
966 json_data = request.get_json(force=True)
967 service_uuid_input = json_data.get("service_uuid")
968 instance_uuid_input = json_data.get("service_instance_uuid")
969 if len(GK.services) < 1:
970 return "No service on-boarded.", 404
971 # try to be fuzzy
972 if service_uuid_input is None:
973 # if we don't get a service uuid we stop all services
974 service_uuid_list = list(GK.services.iterkeys())
975 LOG.info("No service_uuid given, stopping all.")
976 else:
977 service_uuid_list = [service_uuid_input]
978 # for each service
979 for service_uuid in service_uuid_list:
980 if instance_uuid_input is None:
981 instance_uuid_list = list(
982 GK.services[service_uuid].instances.iterkeys())
983 else:
984 instance_uuid_list = [instance_uuid_input]
985 # for all service instances
986 for instance_uuid in instance_uuid_list:
987 if (service_uuid in GK.services and
988 instance_uuid in GK.services[service_uuid].instances):
989 # valid service and instance UUID, stop service
990 GK.services.get(service_uuid).stop_service(instance_uuid)
991 LOG.info("Service instance with uuid %r stopped." % instance_uuid)
992 return "Service(s) stopped.", 200
993
994
995 class Exit(fr.Resource):
996
997 def put(self):
998 """
999 Stop the running Containernet instance (any data transmitted with the request is ignored)
1000 """
1001 list(GK.dcs.values())[0].net.stop()
1002
1003
1004 def generate_subnets(prefix, base, subnet_size=50, mask=24):
1005 # Generate a list of ipaddress.ip_network objects (the subnets)
1006 r = list()
1007 for net in range(base, base + subnet_size):
1008 subnet = "{0}.{1}.0/{2}".format(prefix, net, mask)
1009 r.append(ipaddress.ip_network(unicode(subnet)))
1010 return r
1011
1012
1013 def reset_subnets():
1014 global ELINE_SUBNETS
1015 global ELAN_SUBNETS
1016 # private subnet definitions for the generated interfaces
1017 # 30.0.xxx.0/24
1018 ELAN_SUBNETS = generate_subnets('30.0', 0, subnet_size=50, mask=24)
1019 # 20.0.xxx.0/24
1020 ELINE_SUBNETS = generate_subnets('20.0', 0, subnet_size=50, mask=24)
1021
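# Illustrative result: generate_subnets('20.0', 0, subnet_size=50, mask=24)
# returns [IPv4Network(u'20.0.0.0/24'), ..., IPv4Network(u'20.0.49.0/24')].
# _connect_elines() pops one of these networks per E-Line and assigns its .1
# and .2 addresses to the two endpoints, while _connect_elans() pops one per
# E-LAN/E-Tree and hands out its host addresses one by one.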
1022
1023 def initialize_GK():
1024 global GK
1025 GK = Gatekeeper()
1026
1027
1028 # create a single, global GK object
1029 GK = None
1030 initialize_GK()
1031 # setup Flask
1032 http_server = None
1033 app = Flask(__name__)
1034 app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024 # 512 MB max upload
1035 api = fr.Api(app)
1036 # define endpoints
1037 api.add_resource(Packages, '/packages', '/api/v2/packages')
1038 api.add_resource(Instantiations, '/instantiations',
1039 '/api/v2/instantiations', '/api/v2/requests')
1040 api.add_resource(Exit, '/emulator/exit')
1041
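# Minimal client sketch (illustrative only and never called by the LLCM itself;
# it assumes the 'requests' package and a package file named 'my-service.tgo',
# both of which are hypothetical here) showing how the endpoints registered
# above can be exercised, e.g. against the standalone mode at the bottom of
# this file (127.0.0.1:8000):
def _example_llcm_client(base_url="http://127.0.0.1:8000"):
    import requests  # assumed to be available in the client's environment
    # on-board a 5GTANGO package
    with open("my-service.tgo", "rb") as f:
        r = requests.post(base_url + "/packages", files={"package": f})
    service_uuid = r.json().get("service_uuid")
    # instantiate the on-boarded service ...
    r = requests.post(base_url + "/instantiations",
                      json={"service_uuid": service_uuid})
    instance_uuid = r.json().get("service_instance_uuid")
    # ... and stop the instance again
    requests.delete(base_url + "/instantiations",
                    json={"service_uuid": service_uuid,
                          "service_instance_uuid": instance_uuid})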
1042
1043 def start_rest_api(host, port, datacenters=dict()):
1044 global http_server
1045 GK.dcs = datacenters
1046 GK.net = get_dc_network()
1047 # serve the Flask app via gevent's WSGIServer (the plain Flask dev server below is kept for reference)
1048 # app.run(host=host,
1049 # port=port,
1050 # debug=True,
1051 # use_reloader=False # this is needed to run Flask in a non-main thread
1052 # )
1053 http_server = WSGIServer((host, port), app, log=open("/dev/null", "w"))
1054 http_server.serve_forever()
1055
1056
1057 def stop_rest_api():
1058 if http_server:
1059 http_server.close()
1060
1061
1062 def ensure_dir(name):
1063 if not os.path.exists(name):
1064 os.makedirs(name)
1065
1066
1067 def load_yaml(path):
1068 with open(path, "r") as f:
1069 try:
1070 r = yaml.load(f)
1071 except yaml.YAMLError as exc:
1072 LOG.exception("YAML parse error: %r" % str(exc))
1073 r = dict()
1074 return r
1075
1076
1077 def make_relative_path(path):
1078 if path.startswith("file://"):
1079 path = path.replace("file://", "", 1)
1080 if path.startswith("/"):
1081 path = path.replace("/", "", 1)
1082 return path
1083
1084
1085 def get_dc_network():
1086 """
1087 Retrieve the DCNetwork that this dummy gatekeeper (GK) is connected to.
1088 Assume at least 1 data center is connected to this GK and that all data centers belong to the same DCNetwork.
1089 :return:
1090 """
1091 assert (len(GK.dcs) > 0)
1092 return GK.dcs.values()[0].net
1093
1094
1095 def parse_interface(interface_name):
1096 """
1097 convert an interface name from the NSD to the corresponding vnf_id and vnf_interface names
1098 :param interface_name:
1099 :return:
1100 """
1101 if ':' in interface_name:
1102 vnf_id, vnf_interface = interface_name.split(':')
1103 else:
1104 vnf_id = None
1105 vnf_interface = interface_name
1106 return vnf_id, vnf_interface
1107
1108
1109 def get_container_name(vnf_id, vdu_id):
1110 return "{}.{}".format(vnf_id, vdu_id)
1111
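# Illustrative examples (hypothetical identifiers):
#   parse_interface("vnf0:input")        -> ("vnf0", "input")
#   parse_interface("mgmt")              -> (None, "mgmt")   # NS-level connection point
#   get_container_name("vnf0", "vdu01")  -> "vnf0.vdu01"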
1112
1113 if __name__ == '__main__':
1114 """
1115 Let's allow running the API in standalone mode.
1116 """
1117 GK_STANDALONE_MODE = True
1118 logging.getLogger("werkzeug").setLevel(logging.INFO)
1119 start_rest_api("0.0.0.0", 8000)