1 # Copyright (c) 2018 SONATA-NFV, 5GTANGO and Paderborn University
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # Neither the name of the SONATA-NFV, 5GTANGO, Paderborn University
17 # nor the names of its contributors may be used to endorse or promote
18 # products derived from this software without specific prior written
21 # This work has been performed in the framework of the SONATA project,
22 # funded by the European Commission under Grant number 671517 through
23 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
24 # acknowledge the contributions of their colleagues of the SONATA
25 # partner consortium (www.sonata-nfv.eu).
27 # This work has also been performed in the framework of the 5GTANGO project,
28 # funded by the European Commission under Grant number 761493 through
29 # the Horizon 2020 and 5G-PPP programmes. The authors would like to
30 # acknowledge the contributions of their colleagues of the 5GTANGO
31 # partner consortium (www.5gtango.eu).
39 from docker
import DockerClient
40 from flask
import Flask
, request
41 import flask_restful
as fr
42 from subprocess
import Popen
48 LOG
= logging
.getLogger("5gtango.llcm")
49 LOG
.setLevel(logging
.INFO
)
52 GK_STORAGE
= "/tmp/vim-emu-tango-llcm/"
53 UPLOAD_FOLDER
= os
.path
.join(GK_STORAGE
, "uploads/")
54 CATALOG_FOLDER
= os
.path
.join(GK_STORAGE
, "catalog/")
56 # Enable Dockerfile build functionality
57 BUILD_DOCKERFILE
= False
59 # flag to indicate that we run without the emulator (only the bare API for
60 # integration testing)
61 GK_STANDALONE_MODE
= False
63 # should a new version of an image be pulled even if its available
66 # flag to indicate if we use bidirectional forwarding rules in the
67 # automatic chaining process
68 BIDIRECTIONAL_CHAIN
= True
70 # override the management interfaces in the descriptors with default
71 # docker0 interfaces in the containers
72 USE_DOCKER_MGMT
= False
74 # automatically deploy uploaded packages (no need to execute son-access
75 # deploy --latest separately)
78 # and also automatically terminate any other running services
81 # global subnet definitions (see reset_subnets())
85 # Time in seconds to wait for vnf stop scripts to execute fully
86 VNF_STOP_WAIT_TIME
= 5
89 class OnBoardingException(BaseException
):
93 class Gatekeeper(object):
96 self
.services
= dict()
99 # used to generate short names for VNFs (Mininet limitation)
102 LOG
.info("Initialized 5GTANGO LLCM module.")
104 def register_service_package(self
, service_uuid
, service
):
106 register new service package
108 :param service object
110 self
.services
[service_uuid
] = service
111 # lets perform all steps needed to onboard the service
115 class Service(object):
117 This class represents a NS uploaded as a *.son package to the
119 Can have multiple running instances of this service.
126 self
.uuid
= service_uuid
127 self
.package_file_hash
= package_file_hash
128 self
.package_file_path
= package_file_path
129 self
.package_content_path
= os
.path
.join(
130 CATALOG_FOLDER
, "services/%s" % self
.uuid
)
134 self
.local_docker_files
= dict()
135 self
.remote_docker_image_urls
= dict()
136 self
.instances
= dict()
140 Do all steps to prepare this service to be instantiated
143 # 1. extract the contents of the package and store them in our catalog
144 self
._unpack
_service
_package
()
145 # 2. read in all descriptor files
146 self
._load
_package
_descriptor
()
150 raise OnBoardingException("No NSD found.")
151 if len(self
.vnfds
) < 1:
152 raise OnBoardingException("No VNFDs found.")
153 # 3. prepare container images (e.g. download or build Dockerfile)
155 self
._load
_docker
_files
()
156 self
._build
_images
_from
_dockerfiles
()
158 self
._load
_docker
_urls
()
159 self
._pull
_predefined
_dockerimages
()
160 LOG
.info("On-boarded service: %r" % self
.manifest
.get("name"))
162 def start_service(self
):
164 This methods creates and starts a new service instance.
165 It computes placements, iterates over all VNFDs, and starts
166 each VNFD as a Docker container in the data center selected
167 by the placement algorithm.
170 LOG
.info("Starting service %r" % self
.uuid
)
172 # 1. each service instance gets a new uuid to identify it
173 instance_uuid
= str(uuid
.uuid4())
174 # build a instances dict (a bit like a NSR :))
175 self
.instances
[instance_uuid
] = dict()
176 self
.instances
[instance_uuid
]["vnf_instances"] = list()
178 # 2. compute placement of this service instance (adds DC names to
180 # self._calculate_placement(FirstDcPlacement)
181 self
._calculate
_placement
(RoundRobinDcPlacement
)
182 # 3. start all vnfds that we have in the service
183 for vnf_id
in self
.vnfds
:
184 vnfd
= self
.vnfds
[vnf_id
]
185 # attention: returns a list of started deployment units
186 vnfis
= self
._start
_vnfd
(vnfd
, vnf_id
)
187 # add list of VNFIs to total VNFI list
188 self
.instances
[instance_uuid
]["vnf_instances"].extend(vnfis
)
190 # 4. Deploy E-Line, E-Tree and E-LAN links
191 # Attention: Only done if ""forwarding_graphs" section in NSD exists,
192 # even if "forwarding_graphs" are not used directly.
193 if "virtual_links" in self
.nsd
and "forwarding_graphs" in self
.nsd
:
194 vlinks
= self
.nsd
["virtual_links"]
195 # constituent virtual links are not checked
196 eline_fwd_links
= [l
for l
in vlinks
if (
197 l
["connectivity_type"] == "E-Line")]
198 elan_fwd_links
= [l
for l
in vlinks
if (
199 l
["connectivity_type"] == "E-LAN" or
200 l
["connectivity_type"] == "E-Tree")] # Treat E-Tree as E-LAN
202 # 5a. deploy E-Line links
203 GK
.net
.deployed_elines
.extend(eline_fwd_links
) # bookkeeping
204 self
._connect
_elines
(eline_fwd_links
, instance_uuid
)
205 # 5b. deploy E-Tree/E-LAN links
206 GK
.net
.deployed_elans
.extend(elan_fwd_links
) # bookkeeping
207 self
._connect
_elans
(elan_fwd_links
, instance_uuid
)
209 # 6. run the emulator specific entrypoint scripts in the VNFIs of this
211 self
._trigger
_emulator
_start
_scripts
_in
_vnfis
(
212 self
.instances
[instance_uuid
]["vnf_instances"])
214 LOG
.info("Service started. Instance id: %r" % instance_uuid
)
217 def stop_service(self
, instance_uuid
):
219 This method stops a running service instance.
220 It iterates over all VNF instances, stopping them each
221 and removing them from their data center.
222 :param instance_uuid: the uuid of the service instance to be stopped
224 LOG
.info("Stopping service %r" % self
.uuid
)
225 # get relevant information
226 # instance_uuid = str(self.uuid.uuid4())
227 vnf_instances
= self
.instances
[instance_uuid
]["vnf_instances"]
228 # trigger stop skripts in vnf instances and wait a few seconds for
230 self
._trigger
_emulator
_stop
_scripts
_in
_vnfis
(vnf_instances
)
231 time
.sleep(VNF_STOP_WAIT_TIME
)
233 for v
in vnf_instances
:
235 # last step: remove the instance from the list of all instances
236 del self
.instances
[instance_uuid
]
238 def _get_resource_limits(self
, deployment_unit
):
240 Extract resource limits from deployment units.
244 cpu_period
, cpu_quota
= self
._calculate
_cpu
_cfs
_values
(float(1.0))
246 # update from descriptor
247 if "resource_requirements" in deployment_unit
:
248 res_req
= deployment_unit
.get("resource_requirements")
249 cpu_list
= res_req
.get("cpu").get("cpuset")
251 cpu_list
= res_req
.get("cpu").get("vcpus")
252 if cpu_list
is not None:
253 # attention: docker expects list as string w/o spaces:
254 cpu_list
= str(cpu_list
).replace(" ", "").strip()
255 cpu_bw
= res_req
.get("cpu").get("cpu_bw")
258 cpu_period
, cpu_quota
= self
._calculate
_cpu
_cfs
_values
(float(cpu_bw
))
259 mem_limit
= res_req
.get("memory").get("size")
260 mem_unit
= str(res_req
.get("memory").get("size_unit", "GB"))
261 if mem_limit
is not None:
262 mem_limit
= int(mem_limit
)
265 mem_limit
= mem_limit
* 1024 * 1024 * 1024
266 elif "M" in mem_unit
:
267 mem_limit
= mem_limit
* 1024 * 1024
268 elif "K" in mem_unit
:
269 mem_limit
= mem_limit
* 1024
270 return cpu_list
, cpu_period
, cpu_quota
, mem_limit
272 def _start_vnfd(self
, vnfd
, vnf_id
, **kwargs
):
274 Start a single VNFD of this service
275 :param vnfd: vnfd descriptor dict
276 :param vnf_id: unique id of this vnf in the nsd
280 # the vnf_name refers to the container image to be deployed
281 vnf_name
= vnfd
.get("name")
282 # combine VDUs and CDUs
283 deployment_units
= (vnfd
.get("virtual_deployment_units", []) +
284 vnfd
.get("cloudnative_deployment_units", []))
285 # iterate over all deployment units within each VNFDs
286 for u
in deployment_units
:
287 # 0. vnf_container_name = vnf_id.vdu_id
288 vnf_container_name
= get_container_name(vnf_id
, u
.get("id"))
289 # 1. get the name of the docker image to star
290 if vnf_container_name
not in self
.remote_docker_image_urls
:
291 raise Exception("No image name for %r found. Abort." % vnf_container_name
)
292 docker_image_name
= self
.remote_docker_image_urls
.get(vnf_container_name
)
293 # 2. select datacenter to start the VNF in
294 target_dc
= vnfd
.get("dc")
295 # 3. perform some checks to ensure we can start the container
296 assert(docker_image_name
is not None)
297 assert(target_dc
is not None)
298 if not self
._check
_docker
_image
_exists
(docker_image_name
):
299 raise Exception("Docker image {} not found. Abort."
300 .format(docker_image_name
))
302 # 4. get the resource limits
303 cpu_list
, cpu_period
, cpu_quota
, mem_limit
= self
._get
_resource
_limits
(u
)
305 # get connection points defined for the DU
306 intfs
= u
.get("connection_points", [])
307 # do some re-naming of fields to be compatible to containernet
310 i
["ip"] = i
.get("address")
312 # get ports and port_bindings from the port and publish fields of CNFD
313 # see: https://github.com/containernet/containernet/wiki/Exposing-and-mapping-network-ports
314 ports
= list() # Containernet naming
315 port_bindings
= dict()
318 if not isinstance(i
.get("port"), int):
319 LOG
.error("Field 'port' is no int CP: {}".format(i
))
321 ports
.append(i
.get("port"))
323 if not isinstance(i
.get("publish"), dict):
324 LOG
.error("Field 'publish' is no dict CP: {}".format(i
))
326 port_bindings
.update(i
.get("publish"))
328 LOG
.info("{} exposes ports: {}".format(vnf_container_name
, ports
))
329 if len(port_bindings
) > 0:
330 LOG
.info("{} publishes ports: {}".format(vnf_container_name
, port_bindings
))
332 # 5. collect additional information to start container
335 # 5.1 inject descriptor based start/stop commands into env (overwrite)
336 VNFD_CMD_START
= u
.get("vm_cmd_start")
337 VNFD_CMD_STOP
= u
.get("vm_cmd_stop")
338 if VNFD_CMD_START
and not VNFD_CMD_START
== "None":
339 LOG
.info("Found 'vm_cmd_start'='{}' in VNFD.".format(VNFD_CMD_START
) +
340 " Overwriting SON_EMU_CMD.")
341 cenv
["SON_EMU_CMD"] = VNFD_CMD_START
342 if VNFD_CMD_STOP
and not VNFD_CMD_STOP
== "None":
343 LOG
.info("Found 'vm_cmd_start'='{}' in VNFD.".format(VNFD_CMD_STOP
) +
344 " Overwriting SON_EMU_CMD_STOP.")
345 cenv
["SON_EMU_CMD_STOP"] = VNFD_CMD_STOP
347 # 6. Start the container
348 LOG
.info("Starting %r as %r in DC %r" %
349 (vnf_name
, vnf_container_name
, vnfd
.get("dc")))
350 LOG
.debug("Interfaces for %r: %r" % (vnf_id
, intfs
))
351 # start the container
352 vnfi
= target_dc
.startCompute(
355 image
=docker_image_name
,
357 cpu_period
=cpu_period
,
358 cpuset_cpus
=cpu_list
,
361 properties
=cenv
, # environment
363 port_bindings
=port_bindings
,
364 type=kwargs
.get('type', 'docker'))
365 # add vnfd reference to vnfi
368 vnfi
.vnf_container_name
= vnf_container_name
373 def _stop_vnfi(self
, vnfi
):
376 :param vnfi: vnf instance to be stopped
378 # Find the correct datacenter
379 status
= vnfi
.getStatus()
382 LOG
.info("Stopping the vnf instance contained in %r in DC %r" %
383 (status
["name"], dc
))
384 dc
.stopCompute(status
["name"])
386 def _get_vnf_instance(self
, instance_uuid
, vnf_id
):
388 Returns VNFI object for a given "vnf_id" or "vnf_container_name" taken from an NSD.
389 :return: single object
391 for vnfi
in self
.instances
[instance_uuid
]["vnf_instances"]:
392 if str(vnfi
.name
) == str(vnf_id
):
394 LOG
.warning("No container with name: {0} found.".format(vnf_id
))
397 def _get_vnf_instance_units(self
, instance_uuid
, vnf_id
):
399 Returns a list of VNFI objects (all deployment units) for a given
400 "vnf_id" taken from an NSD.
406 for vnfi
in self
.instances
[instance_uuid
]["vnf_instances"]:
407 if vnf_id
in vnfi
.name
:
410 LOG
.debug("Found units: {} for vnf_id: {}"
411 .format([i
.name
for i
in r
], vnf_id
))
413 LOG
.warning("No container(s) with name: {0} found.".format(vnf_id
))
417 def _vnf_reconfigure_network(vnfi
, if_name
, net_str
=None, new_name
=None):
419 Reconfigure the network configuration of a specific interface
420 of a running container.
421 :param vnfi: container instance
422 :param if_name: interface name
423 :param net_str: network configuration string, e.g., 1.2.3.4/24
426 # assign new ip address
427 if net_str
is not None:
428 intf
= vnfi
.intf(intf
=if_name
)
431 LOG
.debug("Reconfigured network of %s:%s to %r" %
432 (vnfi
.name
, if_name
, net_str
))
434 LOG
.warning("Interface not found: %s:%s. Network reconfiguration skipped." % (
437 if new_name
is not None:
438 vnfi
.cmd('ip link set', if_name
, 'down')
439 vnfi
.cmd('ip link set', if_name
, 'name', new_name
)
440 vnfi
.cmd('ip link set', new_name
, 'up')
441 LOG
.debug("Reconfigured interface name of %s:%s to %s" %
442 (vnfi
.name
, if_name
, new_name
))
444 def _trigger_emulator_start_scripts_in_vnfis(self
, vnfi_list
):
445 for vnfi
in vnfi_list
:
446 config
= vnfi
.dcinfo
.get("Config", dict())
447 env
= config
.get("Env", list())
449 var
, cmd
= map(str.strip
, map(str, env_var
.split('=', 1)))
450 if var
== "SON_EMU_CMD" or var
== "VIM_EMU_CMD":
451 LOG
.info("Executing script in '{}': {}={}"
452 .format(vnfi
.name
, var
, cmd
))
453 # execute command in new thread to ensure that GK is not
455 t
= threading
.Thread(target
=vnfi
.cmdPrint
, args
=(cmd
,))
458 break # only execute one command
460 def _trigger_emulator_stop_scripts_in_vnfis(self
, vnfi_list
):
461 for vnfi
in vnfi_list
:
462 config
= vnfi
.dcinfo
.get("Config", dict())
463 env
= config
.get("Env", list())
465 var
, cmd
= map(str.strip
, map(str, env_var
.split('=', 1)))
466 if var
== "SON_EMU_CMD_STOP" or var
== "VIM_EMU_CMD_STOP":
467 LOG
.info("Executing script in '{}': {}={}"
468 .format(vnfi
.name
, var
, cmd
))
469 # execute command in new thread to ensure that GK is not
471 t
= threading
.Thread(target
=vnfi
.cmdPrint
, args
=(cmd
,))
474 break # only execute one command
476 def _unpack_service_package(self
):
478 unzip *.son file and store contents in CATALOG_FOLDER/services/<service_uuid>/
480 LOG
.info("Unzipping: %r" % self
.package_file_path
)
481 with zipfile
.ZipFile(self
.package_file_path
, "r") as z
:
482 z
.extractall(self
.package_content_path
)
484 def _load_package_descriptor(self
):
486 Load the main package descriptor YAML and keep it as dict.
489 self
.manifest
= load_yaml(
491 self
.package_content_path
, "TOSCA-Metadata/NAPD.yaml"))
495 Load the entry NSD YAML and keep it as dict.
498 if "package_content" in self
.manifest
:
500 for f
in self
.manifest
.get("package_content"):
501 if f
.get("content-type") == "application/vnd.5gtango.nsd":
502 nsd_path
= os
.path
.join(
503 self
.package_content_path
,
504 make_relative_path(f
.get("source")))
505 break # always use the first NSD for now
507 raise OnBoardingException("No NSD with type 'application/vnd.5gtango.nsd' found.")
508 self
.nsd
= load_yaml(nsd_path
)
509 GK
.net
.deployed_nsds
.append(self
.nsd
) # TODO this seems strange (remove?)
510 LOG
.debug("Loaded NSD: %r" % self
.nsd
.get("name"))
512 raise OnBoardingException(
513 "No 'package_content' section in package manifest:\n{}"
514 .format(self
.manifest
))
516 def _load_vnfd(self
):
518 Load all VNFD YAML files referenced in MANIFEST.MF and keep them in dict.
521 # first make a list of all the vnfds in the package
523 if "package_content" in self
.manifest
:
524 for pc
in self
.manifest
.get("package_content"):
526 "content-type") == "application/vnd.5gtango.vnfd":
527 vnfd_path
= os
.path
.join(
528 self
.package_content_path
,
529 make_relative_path(pc
.get("source")))
530 vnfd
= load_yaml(vnfd_path
)
531 vnfd_set
[vnfd
.get("name")] = vnfd
532 if len(vnfd_set
) < 1:
533 raise OnBoardingException("No VNFDs found.")
534 # then link each vnf_id in the nsd to its vnfd
535 for v
in self
.nsd
.get("network_functions"):
536 if v
.get("vnf_name") in vnfd_set
:
537 self
.vnfds
[v
.get("vnf_id")] = vnfd_set
[v
.get("vnf_name")]
538 LOG
.debug("Loaded VNFD: {0} id: {1}"
539 .format(v
.get("vnf_name"), v
.get("vnf_id")))
541 def _connect_elines(self
, eline_fwd_links
, instance_uuid
):
543 Connect all E-LINE links in the NSD
544 Attention: This method DOES NOT support multi V/CDU VNFs!
545 :param eline_fwd_links: list of E-LINE links in the NSD
546 :param: instance_uuid of the service
549 # cookie is used as identifier for the flowrules installed by the dummygatekeeper
550 # eg. different services get a unique cookie for their flowrules
552 for link
in eline_fwd_links
:
553 LOG
.info("Found E-Line: {}".format(link
))
554 src_id
, src_if_name
= parse_interface(
555 link
["connection_points_reference"][0])
556 dst_id
, dst_if_name
= parse_interface(
557 link
["connection_points_reference"][1])
558 LOG
.info("Searching C/VDU for E-Line: src={}, src_if={}, dst={}, dst_if={}"
559 .format(src_id
, src_if_name
, dst_id
, dst_if_name
))
560 # handle C/VDUs (ugly hack, only one V/CDU per VNF for now)
561 src_units
= self
._get
_vnf
_instance
_units
(instance_uuid
, src_id
)
562 dst_units
= self
._get
_vnf
_instance
_units
(instance_uuid
, dst_id
)
563 if src_units
is None or dst_units
is None:
564 LOG
.info("No VNF-VNF link. Skipping: src={}, src_if={}, dst={}, dst_if={}"
565 .format(src_id
, src_if_name
, dst_id
, dst_if_name
))
567 # we only support VNFs with one V/CDU right now
568 if len(src_units
) != 1 or len(dst_units
) != 1:
569 raise BaseException("LLCM does not support E-LINES for multi V/CDU VNFs.")
570 # get the full name from that C/VDU and use it as src_id and dst_id
571 src_id
= src_units
[0].name
572 dst_id
= dst_units
[0].name
573 # from here we have all info we need
574 LOG
.info("Creating E-Line for C/VDU: src={}, src_if={}, dst={}, dst_if={}"
575 .format(src_id
, src_if_name
, dst_id
, dst_if_name
))
577 src_vnfi
= src_units
[0]
578 dst_vnfi
= dst_units
[0]
579 # proceed with chaining setup
581 if src_vnfi
is not None and dst_vnfi
is not None:
583 # re-configure the VNFs IP assignment and ensure that a new
584 # subnet is used for each E-Link
585 eline_net
= ELINE_SUBNETS
.pop(0)
586 ip1
= "{0}/{1}".format(str(eline_net
[1]),
588 ip2
= "{0}/{1}".format(str(eline_net
[2]),
590 # check if VNFs have fixed IPs (ip/address field in VNFDs)
591 if (self
._get
_vnfd
_cp
_from
_vnfi
(
592 src_vnfi
, src_if_name
).get("ip") is None and
593 self
._get
_vnfd
_cp
_from
_vnfi
(
594 src_vnfi
, src_if_name
).get("address") is None):
595 self
._vnf
_reconfigure
_network
(src_vnfi
, src_if_name
, ip1
)
596 # check if VNFs have fixed IPs (ip field in VNFDs)
597 if (self
._get
_vnfd
_cp
_from
_vnfi
(
598 dst_vnfi
, dst_if_name
).get("ip") is None and
599 self
._get
_vnfd
_cp
_from
_vnfi
(
600 dst_vnfi
, dst_if_name
).get("address") is None):
601 self
._vnf
_reconfigure
_network
(dst_vnfi
, dst_if_name
, ip2
)
606 vnf_src_interface
=src_if_name
, vnf_dst_interface
=dst_if_name
,
607 bidirectional
=BIDIRECTIONAL_CHAIN
, cmd
="add-flow", cookie
=cookie
, priority
=10)
609 def _get_vnfd_cp_from_vnfi(self
, vnfi
, ifname
):
611 Gets the connection point data structure from the VNFD
612 of the given VNFI using ifname.
614 if vnfi
.vnfd
is None:
616 cps
= vnfi
.vnfd
.get("connection_points")
618 if cp
.get("id") == ifname
:
621 def _connect_elans(self
, elan_fwd_links
, instance_uuid
):
623 Connect all E-LAN/E-Tree links in the NSD
624 This method supports multi-V/CDU VNFs if the connection
625 point names of the DUs are the same as the ones in the NSD.
626 :param elan_fwd_links: list of E-LAN links in the NSD
627 :param: instance_uuid of the service
630 for link
in elan_fwd_links
:
633 lan_net
= ELAN_SUBNETS
.pop(0)
634 lan_hosts
= list(lan_net
.hosts())
636 # generate lan ip address for all interfaces (of all involved (V/CDUs))
637 for intf
in link
["connection_points_reference"]:
638 vnf_id
, intf_name
= parse_interface(intf
)
640 continue # skip references to NS connection points
641 units
= self
._get
_vnf
_instance
_units
(instance_uuid
, vnf_id
)
643 continue # skip if no deployment unit is present
644 # iterate over all involved deployment units
646 # Attention: we apply a simplification for multi DU VNFs here:
647 # the connection points of all involved DUs have to have the same
648 # name as the connection points of the surrounding VNF to be mapped.
649 # This is because we do not consider links specified in the VNFds
650 container_name
= uvnfi
.name
651 ip_address
= "{0}/{1}".format(str(lan_hosts
.pop(0)),
654 "Setting up E-LAN/E-Tree interface. (%s:%s) -> %s" % (
655 container_name
, intf_name
, ip_address
))
656 # re-configure the VNFs IP assignment and ensure that a new subnet is used for each E-LAN
657 # E-LAN relies on the learning switch capability of Ryu which has to be turned on in the topology
658 # (DCNetwork(controller=RemoteController, enable_learning=True)), so no explicit chaining is
660 vnfi
= self
._get
_vnf
_instance
(instance_uuid
, container_name
)
662 self
._vnf
_reconfigure
_network
(vnfi
, intf_name
, ip_address
)
663 # add this vnf and interface to the E-LAN for tagging
664 elan_vnf_list
.append(
665 {'name': container_name
, 'interface': intf_name
})
666 # install the VLAN tags for this E-LAN
667 GK
.net
.setLAN(elan_vnf_list
)
669 def _load_docker_files(self
):
671 Get all paths to Dockerfiles from VNFDs and store them in dict.
674 for vnf_id
, v
in self
.vnfds
.iteritems():
675 for vu
in v
.get("virtual_deployment_units", []):
676 vnf_container_name
= get_container_name(vnf_id
, vu
.get("id"))
677 if vu
.get("vm_image_format") == "docker":
678 vm_image
= vu
.get("vm_image")
679 docker_path
= os
.path
.join(
680 self
.package_content_path
,
681 make_relative_path(vm_image
))
682 self
.local_docker_files
[vnf_container_name
] = docker_path
683 LOG
.debug("Found Dockerfile (%r): %r" % (vnf_container_name
, docker_path
))
684 for cu
in v
.get("cloudnative_deployment_units", []):
685 vnf_container_name
= get_container_name(vnf_id
, cu
.get("id"))
686 image
= cu
.get("image")
687 docker_path
= os
.path
.join(
688 self
.package_content_path
,
689 make_relative_path(image
))
690 self
.local_docker_files
[vnf_container_name
] = docker_path
691 LOG
.debug("Found Dockerfile (%r): %r" % (vnf_container_name
, docker_path
))
693 def _load_docker_urls(self
):
695 Get all URLs to pre-build docker images in some repo.
698 for vnf_id
, v
in self
.vnfds
.iteritems():
699 for vu
in v
.get("virtual_deployment_units", []):
700 vnf_container_name
= get_container_name(vnf_id
, vu
.get("id"))
701 if vu
.get("vm_image_format") == "docker":
702 url
= vu
.get("vm_image")
704 url
= url
.replace("http://", "")
705 self
.remote_docker_image_urls
[vnf_container_name
] = url
706 LOG
.debug("Found Docker image URL (%r): %r" %
708 self
.remote_docker_image_urls
[vnf_container_name
]))
709 for cu
in v
.get("cloudnative_deployment_units", []):
710 vnf_container_name
= get_container_name(vnf_id
, cu
.get("id"))
711 url
= cu
.get("image")
713 url
= url
.replace("http://", "")
714 self
.remote_docker_image_urls
[vnf_container_name
] = url
715 LOG
.debug("Found Docker image URL (%r): %r" %
717 self
.remote_docker_image_urls
[vnf_container_name
]))
719 def _build_images_from_dockerfiles(self
):
721 Build Docker images for each local Dockerfile found in the package: self.local_docker_files
723 if GK_STANDALONE_MODE
:
724 return # do not build anything in standalone mode
726 LOG
.info("Building %d Docker images (this may take several minutes) ..." % len(
727 self
.local_docker_files
))
728 for k
, v
in self
.local_docker_files
.iteritems():
729 for line
in dc
.build(path
=v
.replace(
730 "Dockerfile", ""), tag
=k
, rm
=False, nocache
=False):
731 LOG
.debug("DOCKER BUILD: %s" % line
)
732 LOG
.info("Docker image created: %s" % k
)
734 def _pull_predefined_dockerimages(self
):
736 If the package contains URLs to pre-build Docker images, we download them with this method.
739 for url
in self
.remote_docker_image_urls
.itervalues():
740 # only pull if not present (speedup for development)
742 if len(dc
.images
.list(name
=url
)) > 0:
743 LOG
.debug("Image %r present. Skipping pull." % url
)
745 LOG
.info("Pulling image: %r" % url
)
746 # this seems to fail with latest docker api version 2.0.2
747 # dc.images.pull(url,
748 # insecure_registry=True)
749 # using docker cli instead
756 def _check_docker_image_exists(self
, image_name
):
758 Query the docker service and check if the given image exists
759 :param image_name: name of the docker image
762 return len(DockerClient().images
.list(name
=image_name
)) > 0
764 def _calculate_placement(self
, algorithm
):
766 Do placement by adding the a field "dc" to
767 each VNFD that points to one of our
768 data center objects known to the gatekeeper.
770 assert(len(self
.vnfds
) > 0)
771 assert(len(GK
.dcs
) > 0)
772 # instantiate algorithm an place
774 p
.place(self
.nsd
, self
.vnfds
, GK
.dcs
)
775 LOG
.info("Using placement algorithm: %r" % p
.__class
__.__name
__)
776 # lets print the placement result
777 for name
, vnfd
in self
.vnfds
.iteritems():
778 LOG
.info("Placed VNF %r on DC %r" % (name
, str(vnfd
.get("dc"))))
780 def _calculate_cpu_cfs_values(self
, cpu_time_percentage
):
782 Calculate cpu period and quota for CFS
783 :param cpu_time_percentage: percentage of overall CPU to be used
784 :return: cpu_period, cpu_quota
786 if cpu_time_percentage
is None:
788 if cpu_time_percentage
< 0:
790 # (see: https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt)
791 # Attention minimum cpu_quota is 1ms (micro)
792 cpu_period
= 1000000 # lets consider a fixed period of 1000000 microseconds for now
793 LOG
.debug("cpu_period is %r, cpu_percentage is %r" %
794 (cpu_period
, cpu_time_percentage
))
795 # calculate the fraction of cpu time for this container
796 cpu_quota
= cpu_period
* cpu_time_percentage
797 # ATTENTION >= 1000 to avoid a invalid argument system error ... no
800 LOG
.debug("cpu_quota before correcting: %r" % cpu_quota
)
802 LOG
.warning("Increased CPU quota to avoid system error.")
803 LOG
.debug("Calculated: cpu_period=%f / cpu_quota=%f" %
804 (cpu_period
, cpu_quota
))
805 return int(cpu_period
), int(cpu_quota
)
809 Some (simple) placement algorithms
813 class FirstDcPlacement(object):
815 Placement: Always use one and the same data center from the GK.dcs dict.
818 def place(self
, nsd
, vnfds
, dcs
):
819 for id, vnfd
in vnfds
.iteritems():
820 vnfd
["dc"] = list(dcs
.itervalues())[0]
823 class RoundRobinDcPlacement(object):
825 Placement: Distribute VNFs across all available DCs in a round robin fashion.
828 def place(self
, nsd
, vnfds
, dcs
):
830 dcs_list
= list(dcs
.itervalues())
831 for id, vnfd
in vnfds
.iteritems():
832 vnfd
["dc"] = dcs_list
[c
% len(dcs_list
)]
833 c
+= 1 # inc. c to use next DC
837 Resource definitions and API endpoints
841 class Packages(fr
.Resource
):
845 Upload a *.son service package to the dummy gatekeeper.
847 We expect request with a *.son file and store it in UPLOAD_FOLDER
852 LOG
.info("POST /packages called")
853 # lets search for the package in the request
854 is_file_object
= False # make API more robust: file can be in data or in files field
855 if "package" in request
.files
:
856 son_file
= request
.files
["package"]
857 is_file_object
= True
858 elif len(request
.data
) > 0:
859 son_file
= request
.data
861 return {"service_uuid": None, "size": 0, "sha1": None,
862 "error": "upload failed. file not found."}, 500
863 # generate a uuid to reference this package
864 service_uuid
= str(uuid
.uuid4())
865 file_hash
= hashlib
.sha1(str(son_file
)).hexdigest()
866 # ensure that upload folder exists
867 ensure_dir(UPLOAD_FOLDER
)
868 upload_path
= os
.path
.join(UPLOAD_FOLDER
, "%s.tgo" % service_uuid
)
869 # store *.son file to disk
871 son_file
.save(upload_path
)
873 with
open(upload_path
, 'wb') as f
:
875 size
= os
.path
.getsize(upload_path
)
877 # first stop and delete any other running services
879 service_list
= copy
.copy(GK
.services
)
880 for service_uuid
in service_list
:
881 instances_list
= copy
.copy(
882 GK
.services
[service_uuid
].instances
)
883 for instance_uuid
in instances_list
:
884 # valid service and instance UUID, stop service
885 GK
.services
.get(service_uuid
).stop_service(
887 LOG
.info("service instance with uuid %r stopped." %
890 # create a service object and register it
891 s
= Service(service_uuid
, file_hash
, upload_path
)
892 GK
.register_service_package(service_uuid
, s
)
894 # automatically deploy the service
896 # ok, we have a service uuid, lets start the service
898 GK
.services
.get(service_uuid
).start_service()
900 # generate the JSON result
901 return {"service_uuid": service_uuid
, "size": size
,
902 "sha1": file_hash
, "error": None}, 201
903 except BaseException
:
904 LOG
.exception("Service package upload failed:")
905 return {"service_uuid": None, "size": 0,
906 "sha1": None, "error": "upload failed"}, 500
910 Return a list of UUID's of uploaded service packages.
913 LOG
.info("GET /packages")
914 return {"service_uuid_list": list(GK
.services
.iterkeys())}
917 class Instantiations(fr
.Resource
):
921 Instantiate a service specified by its UUID.
922 Will return a new UUID to identify the running service instance.
925 LOG
.info("POST /instantiations (or /requests) called")
926 # try to extract the service uuid from the request
927 json_data
= request
.get_json(force
=True)
928 service_uuid
= json_data
.get("service_uuid")
929 service_name
= json_data
.get("service_name")
931 # first try to find by service_name
932 if service_name
is not None:
933 for s_uuid
, s
in GK
.services
.iteritems():
934 if s
.manifest
.get("name") == service_name
:
935 LOG
.info("Found service: {} with UUID: {}"
936 .format(service_name
, s_uuid
))
937 service_uuid
= s_uuid
938 # lets be a bit fuzzy here to make testing easier
939 if (service_uuid
is None or service_uuid
==
940 "latest") and len(GK
.services
) > 0:
941 # if we don't get a service uuid, we simple start the first service
943 service_uuid
= list(GK
.services
.iterkeys())[0]
944 if service_uuid
in GK
.services
:
945 # ok, we have a service uuid, lets start the service
946 service_instance_uuid
= GK
.services
.get(
947 service_uuid
).start_service()
948 return {"service_instance_uuid": service_instance_uuid
}, 201
949 return "Service not found", 404
953 Returns a list of UUIDs containing all running services.
956 LOG
.info("GET /instantiations")
957 return {"service_instantiations_list": [
958 list(s
.instances
.iterkeys()) for s
in GK
.services
.itervalues()]}
962 Stops a running service specified by its service and instance UUID.
964 # try to extract the service and instance UUID from the request
965 json_data
= request
.get_json(force
=True)
966 service_uuid_input
= json_data
.get("service_uuid")
967 instance_uuid_input
= json_data
.get("service_instance_uuid")
968 if len(GK
.services
) < 1:
969 return "No service on-boarded.", 404
971 if service_uuid_input
is None:
972 # if we don't get a service uuid we stop all services
973 service_uuid_list
= list(GK
.services
.iterkeys())
974 LOG
.info("No service_uuid given, stopping all.")
976 service_uuid_list
= [service_uuid_input
]
978 for service_uuid
in service_uuid_list
:
979 if instance_uuid_input
is None:
980 instance_uuid_list
= list(
981 GK
.services
[service_uuid
].instances
.iterkeys())
983 instance_uuid_list
= [instance_uuid_input
]
984 # for all service instances
985 for instance_uuid
in instance_uuid_list
:
986 if (service_uuid
in GK
.services
and
987 instance_uuid
in GK
.services
[service_uuid
].instances
):
988 # valid service and instance UUID, stop service
989 GK
.services
.get(service_uuid
).stop_service(instance_uuid
)
990 LOG
.info("Service instance with uuid %r stopped." % instance_uuid
)
991 return "Service(s) stopped.", 200
994 class Exit(fr
.Resource
):
998 Stop the running Containernet instance regardless of data transmitted
1000 list(GK
.dcs
.values())[0].net
.stop()
1003 def generate_subnets(prefix
, base
, subnet_size
=50, mask
=24):
1004 # Generate a list of ipaddress in subnets
1006 for net
in range(base
, base
+ subnet_size
):
1007 subnet
= "{0}.{1}.0/{2}".format(prefix
, net
, mask
)
1008 r
.append(ipaddress
.ip_network(unicode(subnet
)))
1012 def reset_subnets():
1013 global ELINE_SUBNETS
1015 # private subnet definitions for the generated interfaces
1017 ELAN_SUBNETS
= generate_subnets('30.0', 0, subnet_size
=50, mask
=24)
1019 ELINE_SUBNETS
= generate_subnets('20.0', 0, subnet_size
=50, mask
=24)
1022 def initialize_GK():
1027 # create a single, global GK object
1031 app
= Flask(__name__
)
1032 app
.config
['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024 # 512 MB max upload
1035 api
.add_resource(Packages
, '/packages', '/api/v2/packages')
1036 api
.add_resource(Instantiations
, '/instantiations',
1037 '/api/v2/instantiations', '/api/v2/requests')
1038 api
.add_resource(Exit
, '/emulator/exit')
1041 def start_rest_api(host
, port
, datacenters
=dict()):
1042 GK
.dcs
= datacenters
1043 GK
.net
= get_dc_network()
1044 # start the Flask server (not the best performance but ok for our use case)
1048 use_reloader
=False # this is needed to run Flask in a non-main thread
1052 def ensure_dir(name
):
1053 if not os
.path
.exists(name
):
1057 def load_yaml(path
):
1058 with
open(path
, "r") as f
:
1061 except yaml
.YAMLError
as exc
:
1062 LOG
.exception("YAML parse error: %r" % str(exc
))
1067 def make_relative_path(path
):
1068 if path
.startswith("file://"):
1069 path
= path
.replace("file://", "", 1)
1070 if path
.startswith("/"):
1071 path
= path
.replace("/", "", 1)
1075 def get_dc_network():
1077 retrieve the DCnetwork where this dummygatekeeper (GK) connects to.
1078 Assume at least 1 datacenter is connected to this GK, and that all datacenters belong to the same DCNetwork
1081 assert (len(GK
.dcs
) > 0)
1082 return GK
.dcs
.values()[0].net
1085 def parse_interface(interface_name
):
1087 convert the interface name in the nsd to the according vnf_id, vnf_interface names
1088 :param interface_name:
1091 if ':' in interface_name
:
1092 vnf_id
, vnf_interface
= interface_name
.split(':')
1095 vnf_interface
= interface_name
1096 return vnf_id
, vnf_interface
1099 def get_container_name(vnf_id
, vdu_id
):
1100 return "{}.{}".format(vnf_id
, vdu_id
)
1103 if __name__
== '__main__':
1105 Lets allow to run the API in standalone mode.
1107 GK_STANDALONE_MODE
= True
1108 logging
.getLogger("werkzeug").setLevel(logging
.INFO
)
1109 start_rest_api("0.0.0.0", 8000)