Shorten VDU names
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.package.script
53 import rift.mano.dts as mano_dts
54 import rift.mano.utils.short_name as mano_short_name
55
56
57 class VMResourceError(Exception):
58 """ VM resource Error"""
59 pass
60
61
62 class VnfRecordError(Exception):
63 """ VNF record instatiation failed"""
64 pass
65
66
67 class VduRecordError(Exception):
68 """ VDU record instatiation failed"""
69 pass
70
71
72 class NotImplemented(Exception):
73 """Not implemented """
74 pass
75
76
77 class VnfrRecordExistsError(Exception):
78 """VNFR record already exist with the same VNFR id"""
79 pass
80
81
82 class InternalVirtualLinkRecordError(Exception):
83 """Internal virtual link record error"""
84 pass
85
86
87 class VDUImageNotFound(Exception):
88 """VDU Image not found error"""
89 pass
90
91
92 class VirtualDeploymentUnitRecordError(Exception):
93 """VDU Instantiation failed"""
94 pass
95
96
97 class VMNotReadyError(Exception):
98 """ VM Not yet received from resource manager """
99 pass
100
101
102 class VDURecordNotFound(Exception):
103 """ Could not find a VDU record """
104 pass
105
106
107 class VirtualNetworkFunctionRecordDescNotFound(Exception):
108 """ Cannot find Virtual Network Function Record Descriptor """
109 pass
110
111
112 class VirtualNetworkFunctionDescriptorError(Exception):
113 """ Virtual Network Function Record Descriptor Error """
114 pass
115
116
117 class VirtualNetworkFunctionDescriptorNotFound(Exception):
118 """ Virtual Network Function Record Descriptor Not Found """
119 pass
120
121
122 class VirtualNetworkFunctionRecordNotFound(Exception):
123 """ Virtual Network Function Record Not Found """
124 pass
125
126
127 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
128 """ Virtual Network Funtion Descriptor reference count exists """
129 pass
130
131
132 class VnfrInstantiationFailed(Exception):
133 """ Virtual Network Funtion Instantiation failed"""
134 pass
135
136
137 class VNFMPlacementGroupError(Exception):
138 pass
139
140 class VirtualNetworkFunctionRecordState(enum.Enum):
141 """ VNFR state """
142 INIT = 1
143 VL_INIT_PHASE = 2
144 VM_INIT_PHASE = 3
145 READY = 4
146 TERMINATE = 5
147 VL_TERMINATE_PHASE = 6
148 VDU_TERMINATE_PHASE = 7
149 TERMINATED = 7
150 FAILED = 10
151
152
153 class VDURecordState(enum.Enum):
154 """VDU record state """
155 INIT = 1
156 INSTANTIATING = 2
157 RESOURCE_ALLOC_PENDING = 3
158 READY = 4
159 TERMINATING = 5
160 TERMINATED = 6
161 FAILED = 10
162
163
164 class VcsComponent(object):
165 """ VCS Component within the VNF descriptor """
166 def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name):
167 self._dts = dts
168 self._log = log
169 self._loop = loop
170 self._component = component
171 self._cluster_name = cluster_name
172 self._vcs_handler = vcs_handler
173 self._mangled_name = mangled_name
174
175 @staticmethod
176 def mangle_name(component_name, vnf_name, vnfd_id):
177 """ mangled component name """
178 return vnf_name + ":" + component_name + ":" + vnfd_id
179
180 @property
181 def name(self):
182 """ name of this component"""
183 return self._mangled_name
184
185 @property
186 def path(self):
187 """ The path for this object """
188 return("D,/rw-manifest:manifest" +
189 "/rw-manifest:operational-inventory" +
190 "/rw-manifest:component" +
191 "[rw-manifest:component-name = '{}']").format(self.name)
192
193 @property
194 def instance_xpath(self):
195 """ The path for this object """
196 return("D,/rw-base:vcs" +
197 "/instances" +
198 "/instance" +
199 "[instance-name = '{}']".format(self._cluster_name))
200
201 @property
202 def start_comp_xpath(self):
203 """ start component xpath """
204 return (self.instance_xpath +
205 "/child-n[instance-name = 'START-REQ']")
206
207 def get_start_comp_msg(self, ip_address):
208 """ start this component """
209 start_msg = RwBaseYang.VcsInstance_Instance_ChildN()
210 start_msg.instance_name = 'START-REQ'
211 start_msg.component_name = self.name
212 start_msg.admin_command = "START"
213 start_msg.ip_address = ip_address
214
215 return start_msg
216
217 @property
218 def msg(self):
219 """ Returns the message for this vcs component"""
220
221 vcs_comp_dict = self._component.as_dict()
222
223 def mangle_comp_names(comp_dict):
224 """ mangle component name with VNF name, id"""
225 for key, val in comp_dict.items():
226 if isinstance(val, dict):
227 comp_dict[key] = mangle_comp_names(val)
228 elif isinstance(val, list):
229 i = 0
230 for ent in val:
231 if isinstance(ent, dict):
232 val[i] = mangle_comp_names(ent)
233 else:
234 val[i] = ent
235 i += 1
236 elif key == "component_name":
237 comp_dict[key] = VcsComponent.mangle_name(val,
238 self._vnfd_name,
239 self._vnfd_id)
240 return comp_dict
241
242 mangled_dict = mangle_comp_names(vcs_comp_dict)
243 msg = RwManifestYang.OpInventory_Component.from_dict(mangled_dict)
244 return msg
245
246 @asyncio.coroutine
247 def publish(self, xact):
248 """ Publishes the VCS component """
249 self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
250 self.name, self.path, self.msg)
251 yield from self._vcs_handler.publish(xact, self.path, self.msg)
252
253 @asyncio.coroutine
254 def start(self, xact, parent, ip_addr=None):
255 """ Starts this VCS component """
256 # ATTN RV - replace with block add
257 start_msg = self.get_start_comp_msg(ip_addr)
258 self._log.debug("starting component %s %s",
259 self.start_comp_xpath, start_msg)
260 yield from self._dts.query_create(self.start_comp_xpath,
261 0,
262 start_msg)
263 self._log.debug("started component %s, %s",
264 self.start_comp_xpath, start_msg)
265
266
267 class VirtualDeploymentUnitRecord(object):
268 """ Virtual Deployment Unit Record """
269 def __init__(self,
270 dts,
271 log,
272 loop,
273 vdud,
274 vnfr,
275 nsr_config,
276 mgmt_intf,
277 mgmt_network,
278 cloud_account_name,
279 vnfd_package_store,
280 vdur_id=None,
281 placement_groups=[]):
282 self._dts = dts
283 self._log = log
284 self._loop = loop
285 self._vdud = vdud
286 self._vnfr = vnfr
287 self._nsr_config = nsr_config
288 self._mgmt_intf = mgmt_intf
289 self._cloud_account_name = cloud_account_name
290 self._vnfd_package_store = vnfd_package_store
291 self._mgmt_network = mgmt_network
292
293 self._vdur_id = vdur_id or str(uuid.uuid4())
294 self._int_intf = []
295 self._ext_intf = []
296 self._state = VDURecordState.INIT
297 self._state_failed_reason = None
298 self._request_id = str(uuid.uuid4())
299 self._name = vnfr.name + "__" + vdud.id
300 self._placement_groups = placement_groups
301 self._rm_regh = None
302 self._vm_resp = None
303 self._vdud_cloud_init = None
304 self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id,self.vdu_id)
305
306 @asyncio.coroutine
307 def vdu_opdata_register(self):
308 yield from self._vdur_console_handler.register()
309
310 def cp_ip_addr(self, cp_name):
311 """ Find ip address by connection point name """
312 if self._vm_resp is not None:
313 for conn_point in self._vm_resp.connection_points:
314 if conn_point.name == cp_name:
315 return conn_point.ip_address
316 return "0.0.0.0"
317
318 def cp_mac_addr(self, cp_name):
319 """ Find mac address by connection point name """
320 if self._vm_resp is not None:
321 for conn_point in self._vm_resp.connection_points:
322 if conn_point.name == cp_name:
323 return conn_point.mac_addr
324 return "00:00:00:00:00:00"
325
326 def cp_id(self, cp_name):
327 """ Find connection point id by connection point name """
328 if self._vm_resp is not None:
329 for conn_point in self._vm_resp.connection_points:
330 if conn_point.name == cp_name:
331 return conn_point.connection_point_id
332 return ''
333
334 @property
335 def vdu_id(self):
336 return self._vdud.id
337
338 @property
339 def vm_resp(self):
340 return self._vm_resp
341
342 @property
343 def name(self):
344 """ Return this VDUR's name """
345 return self._name
346
347 # Truncated name confirming to RFC 1123
348 @property
349 def unique_short_name(self):
350 """ Return this VDUR's unique short name """
351 # Impose these restrictions on Unique name
352 # Max 64
353 # - Max 10 of NSR name (remove all specialcharacters, only numbers and alphabets)
354 # - 6 chars of shortened name
355 # - Max 10 of VDU name (remove all specialcharacters, only numbers and alphabets)
356 #
357 def _restrict_tag(input_str):
358 # Exclude all characters except a-zA-Z0-9
359 outstr = re.sub('[^a-zA-Z0-9]', '', input_str)
360 # Take max of 10 chars
361 return outstr[-10:]
362
363 # Use NSR name for part1
364 part1 = _restrict_tag(self._nsr_config.name)
365 # Get unique short string (6 chars)
366 part2 = mano_short_name.StringShortner(self._name)
367 # Use VDU ID for part3
368 part3 = _restrict_tag(self._vdud.id)
369 shortstr = part1 + "-" + part2.short_string + "-" + part3
370 return shortstr
371
372 @property
373 def cloud_account_name(self):
374 """ Cloud account this VDU should be created in """
375 return self._cloud_account_name
376
377 @property
378 def image_name(self):
379 """ name that should be used to lookup the image on the CMP """
380 if 'image' not in self._vdud:
381 return None
382 return os.path.basename(self._vdud.image)
383
384 @property
385 def image_checksum(self):
386 """ name that should be used to lookup the image on the CMP """
387 return self._vdud.image_checksum if self._vdud.has_field("image_checksum") else None
388
389 @property
390 def management_ip(self):
391 if not self.active:
392 return None
393 return self._vm_resp.public_ip if self._vm_resp.has_field('public_ip') else self._vm_resp.management_ip
394
395 @property
396 def vm_management_ip(self):
397 if not self.active:
398 return None
399 return self._vm_resp.management_ip
400
401 @property
402 def operational_status(self):
403 """ Operational status of this VDU"""
404 op_stats_dict = {"INIT": "init",
405 "INSTANTIATING": "vm_init_phase",
406 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
407 "READY": "running",
408 "FAILED": "failed",
409 "TERMINATING": "terminated",
410 "TERMINATED": "terminated",
411 }
412 return op_stats_dict[self._state.name]
413
414 @property
415 def msg(self):
416 """ Process VDU message from resmgr"""
417 vdu_fields = ["vm_flavor",
418 "guest_epa",
419 "vswitch_epa",
420 "hypervisor_epa",
421 "host_epa",
422 "volumes",
423 ]
424 vdu_copy_dict = {k: v for k, v in
425 self._vdud.as_dict().items() if k in vdu_fields}
426 vdur_dict = {"id": self._vdur_id,
427 "vdu_id_ref": self._vdud.id,
428 "operational_status": self.operational_status,
429 "operational_status_details": self._state_failed_reason,
430 "name": self.name,
431 "unique_short_name": self.unique_short_name
432 }
433
434 if self.vm_resp is not None:
435 vdur_dict.update({"vim_id": self.vm_resp.vdu_id,
436 "flavor_id": self.vm_resp.flavor_id
437 })
438 if self._vm_resp.has_field('image_id'):
439 vdur_dict.update({ "image_id": self.vm_resp.image_id })
440
441 if self.management_ip is not None:
442 vdur_dict["management_ip"] = self.management_ip
443
444 if self.vm_management_ip is not None:
445 vdur_dict["vm_management_ip"] = self.vm_management_ip
446
447 vdur_dict.update(vdu_copy_dict)
448
449 if self.vm_resp is not None:
450 if self._vm_resp.has_field('volumes'):
451 for opvolume in self._vm_resp.volumes:
452 vdurvol_data = [vduvol for vduvol in vdur_dict['volumes'] if vduvol['name'] == opvolume.name]
453 if len(vdurvol_data) == 1:
454 vdurvol_data[0]["volume_id"] = opvolume.volume_id
455 if opvolume.has_field('custom_meta_data'):
456 metadata_list = list()
457 for metadata_item in opvolume.custom_meta_data:
458 metadata_list.append(metadata_item.as_dict())
459 vdurvol_data[0]['custom_meta_data'] = metadata_list
460
461 if self._vm_resp.has_field('supplemental_boot_data'):
462 vdur_dict['supplemental_boot_data'] = dict()
463 if self._vm_resp.supplemental_boot_data.has_field('boot_data_drive'):
464 vdur_dict['supplemental_boot_data']['boot_data_drive'] = self._vm_resp.supplemental_boot_data.boot_data_drive
465 if self._vm_resp.supplemental_boot_data.has_field('custom_meta_data'):
466 metadata_list = list()
467 for metadata_item in self._vm_resp.supplemental_boot_data.custom_meta_data:
468 metadata_list.append(metadata_item.as_dict())
469 vdur_dict['supplemental_boot_data']['custom_meta_data'] = metadata_list
470 if self._vm_resp.supplemental_boot_data.has_field('config_file'):
471 file_list = list()
472 for file_item in self._vm_resp.supplemental_boot_data.config_file:
473 file_list.append(file_item.as_dict())
474 vdur_dict['supplemental_boot_data']['config_file'] = file_list
475
476 icp_list = []
477 ii_list = []
478
479 for intf, cp_id, vlr in self._int_intf:
480 cp = self.find_internal_cp_by_cp_id(cp_id)
481
482 icp_list.append({"name": cp.name,
483 "id": cp.id,
484 "type_yang": "VPORT",
485 "ip_address": self.cp_ip_addr(cp.id),
486 "mac_address": self.cp_mac_addr(cp.id)})
487
488 ii_list.append({"name": intf.name,
489 "vdur_internal_connection_point_ref": cp.id,
490 "virtual_interface": {}})
491
492 vdur_dict["internal_connection_point"] = icp_list
493 self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
494 vdur_dict["internal_interface"] = ii_list
495
496 ei_list = []
497 for intf, cp, vlr in self._ext_intf:
498 ei_list.append({"name": cp.name,
499 "vnfd_connection_point_ref": cp.name,
500 "virtual_interface": {}})
501 self._vnfr.update_cp(cp.name,
502 self.cp_ip_addr(cp.name),
503 self.cp_mac_addr(cp.name),
504 self.cp_id(cp.name))
505
506 vdur_dict["external_interface"] = ei_list
507
508 placement_groups = []
509 for group in self._placement_groups:
510 placement_groups.append(group.as_dict())
511 vdur_dict['placement_groups_info'] = placement_groups
512
513 return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
514
515 @property
516 def resmgr_path(self):
517 """ path for resource-mgr"""
518 return ("D,/rw-resource-mgr:resource-mgmt" +
519 "/vdu-event" +
520 "/vdu-event-data[event-id='{}']".format(self._request_id))
521
522 @property
523 def vm_flavor_msg(self):
524 """ VM flavor message """
525 flavor = self._vdud.vm_flavor.__class__()
526 flavor.copy_from(self._vdud.vm_flavor)
527
528 return flavor
529
530 @property
531 def vdud_cloud_init(self):
532 """ Return the cloud-init contents for the VDU """
533 if self._vdud_cloud_init is None:
534 self._vdud_cloud_init = self.cloud_init()
535
536 return self._vdud_cloud_init
537
538 def cloud_init(self):
539 """ Populate cloud_init with cloud-config script from
540 either the inline contents or from the file provided
541 """
542 if self._vdud.cloud_init is not None:
543 self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
544 return self._vdud.cloud_init
545 elif self._vdud.cloud_init_file is not None:
546 # Get cloud-init script contents from the file provided in the cloud_init_file param
547 self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
548 filename = self._vdud.cloud_init_file
549 self._vnfd_package_store.refresh()
550 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
551 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
552 try:
553 return cloud_init_extractor.read_script(stored_package, filename)
554 except rift.package.cloud_init.CloudInitExtractionError as e:
555 self.instantiation_failed(str(e))
556 raise VirtualDeploymentUnitRecordError(e)
557 else:
558 self._log.debug("VDU Instantiation: cloud-init script not provided")
559
560 def process_openstack_placement_group_construct(self, vm_create_msg_dict):
561 host_aggregates = []
562 availability_zones = []
563 server_groups = []
564 for group in self._placement_groups:
565 if group.has_field('host_aggregate'):
566 for aggregate in group.host_aggregate:
567 host_aggregates.append(aggregate.as_dict())
568 if group.has_field('availability_zone'):
569 availability_zones.append(group.availability_zone.as_dict())
570 if group.has_field('server_group'):
571 server_groups.append(group.server_group.as_dict())
572
573 if availability_zones:
574 if len(availability_zones) > 1:
575 self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
576 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self.name, availability_zones))
577 else:
578 vm_create_msg_dict['availability_zone'] = availability_zones[0]
579
580 if server_groups:
581 if len(server_groups) > 1:
582 self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
583 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self.name, server_groups))
584 else:
585 vm_create_msg_dict['server_group'] = server_groups[0]
586
587 if host_aggregates:
588 vm_create_msg_dict['host_aggregate'] = host_aggregates
589
590 return
591
592 def process_placement_groups(self, vm_create_msg_dict):
593 """Process the placement_groups and fill resource-mgr request"""
594 if not self._placement_groups:
595 return
596
597 cloud_set = set([group.cloud_type for group in self._placement_groups])
598 assert len(cloud_set) == 1
599 cloud_type = cloud_set.pop()
600
601 if cloud_type == 'openstack':
602 self.process_openstack_placement_group_construct(vm_create_msg_dict)
603
604 else:
605 self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
606 return
607
608 def process_custom_bootdata(self, vm_create_msg_dict):
609 """Process the custom boot data"""
610 if 'config_file' not in vm_create_msg_dict['supplemental_boot_data']:
611 return
612
613 self._vnfd_package_store.refresh()
614 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
615 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
616 for file_item in vm_create_msg_dict['supplemental_boot_data']['config_file']:
617 if 'source' not in file_item or 'dest' not in file_item:
618 continue
619 source = file_item['source']
620 # Find source file in scripts dir of VNFD
621 self._log.debug("Checking for source config file at %s", source)
622 try:
623 source_file_str = cloud_init_extractor.read_script(stored_package, source)
624 except rift.package.cloud_init.CloudInitExtractionError as e:
625 raise VirtualDeploymentUnitRecordError(e)
626 # Update source file location with file contents
627 file_item['source'] = source_file_str
628
629 return
630
631 def resmgr_msg(self, config=None):
632 vdu_fields = ["vm_flavor",
633 "guest_epa",
634 "vswitch_epa",
635 "hypervisor_epa",
636 "host_epa",
637 "volumes",
638 "supplemental_boot_data"]
639
640 self._log.debug("Creating params based on VDUD: %s", self._vdud)
641 vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}
642
643 vm_create_msg_dict = {
644 "name": self.unique_short_name, # Truncated name confirming to RFC 1123
645 "node_id": self.name, # Rift assigned Id
646 }
647
648 if self.image_name is not None:
649 vm_create_msg_dict["image_name"] = self.image_name
650
651 if self.image_checksum is not None:
652 vm_create_msg_dict["image_checksum"] = self.image_checksum
653
654 vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
655 if self._vdud.has_field('mgmt_vpci'):
656 vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci
657
658 self._log.debug("VDUD: %s", self._vdud)
659 if config is not None:
660 vm_create_msg_dict['vdu_init'] = {'userdata': config}
661
662 if self._mgmt_network:
663 vm_create_msg_dict['mgmt_network'] = self._mgmt_network
664
665 cp_list = []
666 for intf, cp, vlr in self._ext_intf:
667 cp_info = { "name": cp.name,
668 "virtual_link_id": vlr.network_id,
669 "type_yang": intf.virtual_interface.type_yang }
670
671 if cp.has_field('port_security_enabled'):
672 cp_info["port_security_enabled"] = cp.port_security_enabled
673
674 if (intf.virtual_interface.has_field('vpci') and
675 intf.virtual_interface.vpci is not None):
676 cp_info["vpci"] = intf.virtual_interface.vpci
677
678 if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
679 cp_info['security_group'] = vlr.ip_profile_params.security_group
680
681 cp_list.append(cp_info)
682
683 for intf, cp, vlr in self._int_intf:
684 if (intf.virtual_interface.has_field('vpci') and
685 intf.virtual_interface.vpci is not None):
686 cp_list.append({"name": cp,
687 "virtual_link_id": vlr.network_id,
688 "type_yang": intf.virtual_interface.type_yang,
689 "vpci": intf.virtual_interface.vpci})
690 else:
691 if cp.has_field('port_security_enabled'):
692 cp_list.append({"name": cp,
693 "virtual_link_id": vlr.network_id,
694 "type_yang": intf.virtual_interface.type_yang,
695 "port_security_enabled": cp.port_security_enabled})
696 else:
697 cp_list.append({"name": cp,
698 "virtual_link_id": vlr.network_id,
699 "type_yang": intf.virtual_interface.type_yang})
700
701
702 vm_create_msg_dict["connection_points"] = cp_list
703 vm_create_msg_dict.update(vdu_copy_dict)
704
705 self.process_placement_groups(vm_create_msg_dict)
706 if 'supplemental_boot_data' in vm_create_msg_dict:
707 self.process_custom_bootdata(vm_create_msg_dict)
708
709 msg = RwResourceMgrYang.VDUEventData()
710 msg.event_id = self._request_id
711 msg.cloud_account = self.cloud_account_name
712 msg.request_info.from_dict(vm_create_msg_dict)
713
714 return msg
715
716 @asyncio.coroutine
717 def terminate(self, xact):
718 """ Delete resource in VIM """
719 if self._state != VDURecordState.READY and self._state != VDURecordState.FAILED:
720 self._log.warning("VDU terminate in not ready state - Ignoring request")
721 return
722
723 self._state = VDURecordState.TERMINATING
724 if self._vm_resp is not None:
725 try:
726 with self._dts.transaction() as new_xact:
727 yield from self.delete_resource(new_xact)
728 except Exception:
729 self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)
730
731 if self._rm_regh is not None:
732 self._log.debug("Deregistering resource manager registration handle")
733 self._rm_regh.deregister()
734 self._rm_regh = None
735
736 if self._vdur_console_handler is not None:
737 self._log.error("Deregistering vnfr vdur registration handle")
738 self._vdur_console_handler._regh.deregister()
739 self._vdur_console_handler._regh = None
740
741 self._state = VDURecordState.TERMINATED
742
743 def find_internal_cp_by_cp_id(self, cp_id):
744 """ Find the CP corresponding to the connection point id"""
745 cp = None
746
747 self._log.debug("find_internal_cp_by_cp_id(%s) called",
748 cp_id)
749
750 for int_cp in self._vdud.internal_connection_point:
751 self._log.debug("Checking for int cp %s in internal connection points",
752 int_cp.id)
753 if int_cp.id == cp_id:
754 cp = int_cp
755 break
756
757 if cp is None:
758 self._log.debug("Failed to find cp %s in internal connection points",
759 cp_id)
760 msg = "Failed to find cp %s in internal connection points" % cp_id
761 raise VduRecordError(msg)
762
763 # return the VLR associated with the connection point
764 return cp
765
766 @asyncio.coroutine
767 def create_resource(self, xact, vnfr, config=None):
768 """ Request resource from ResourceMgr """
769 def find_cp_by_name(cp_name):
770 """ Find a connection point by name """
771 cp = None
772 self._log.debug("find_cp_by_name(%s) called", cp_name)
773 for ext_cp in vnfr._cprs:
774 self._log.debug("Checking ext cp (%s) called", ext_cp.name)
775 if ext_cp.name == cp_name:
776 cp = ext_cp
777 break
778 if cp is None:
779 self._log.debug("Failed to find cp %s in external connection points",
780 cp_name)
781 return cp
782
783 def find_internal_vlr_by_cp_name(cp_name):
784 """ Find the VLR corresponding to the connection point name"""
785 cp = None
786
787 self._log.debug("find_internal_vlr_by_cp_name(%s) called",
788 cp_name)
789
790 for int_cp in self._vdud.internal_connection_point:
791 self._log.debug("Checking for int cp %s in internal connection points",
792 int_cp.id)
793 if int_cp.id == cp_name:
794 cp = int_cp
795 break
796
797 if cp is None:
798 self._log.debug("Failed to find cp %s in internal connection points",
799 cp_name)
800 msg = "Failed to find cp %s in internal connection points" % cp_name
801 raise VduRecordError(msg)
802
803 # return the VLR associated with the connection point
804 return vnfr.find_vlr_by_cp(cp_name)
805
806 block = xact.block_create()
807
808 self._log.debug("Executing vm request id: %s, action: create",
809 self._request_id)
810
811 # Resolve the networks associated external interfaces
812 for ext_intf in self._vdud.external_interface:
813 self._log.debug("Resolving external interface name [%s], cp[%s]",
814 ext_intf.name, ext_intf.vnfd_connection_point_ref)
815 cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
816 if cp is None:
817 self._log.debug("Failed to find connection point - %s",
818 ext_intf.vnfd_connection_point_ref)
819 continue
820 self._log.debug("Connection point name [%s], type[%s]",
821 cp.name, cp.type_yang)
822
823 vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)
824
825 etuple = (ext_intf, cp, vlr)
826 self._ext_intf.append(etuple)
827
828 self._log.debug("Created external interface tuple : %s", etuple)
829
830 # Resolve the networks associated internal interfaces
831 for intf in self._vdud.internal_interface:
832 cp_id = intf.vdu_internal_connection_point_ref
833 self._log.debug("Resolving internal interface name [%s], cp[%s]",
834 intf.name, cp_id)
835
836 try:
837 vlr = find_internal_vlr_by_cp_name(cp_id)
838 except Exception as e:
839 self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
840 msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
841 raise VduRecordError(msg)
842
843 ituple = (intf, cp_id, vlr)
844 self._int_intf.append(ituple)
845
846 self._log.debug("Created internal interface tuple : %s", ituple)
847
848 resmgr_path = self.resmgr_path
849 resmgr_msg = self.resmgr_msg(config)
850
851 self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
852 block.add_query_create(resmgr_path, resmgr_msg)
853
854 res_iter = yield from block.execute(now=True)
855
856 resp = None
857
858 for i in res_iter:
859 r = yield from i
860 resp = r.result
861
862 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
863 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
864 self._log.debug("Got vm request response: %s", resp.resource_info)
865 return resp.resource_info
866
867 @asyncio.coroutine
868 def delete_resource(self, xact):
869 block = xact.block_create()
870
871 self._log.debug("Executing vm request id: %s, action: delete",
872 self._request_id)
873
874 block.add_query_delete(self.resmgr_path)
875
876 yield from block.execute(flags=0, now=True)
877
878 @asyncio.coroutine
879 def read_resource(self, xact):
880 block = xact.block_create()
881
882 self._log.debug("Executing vm request id: %s, action: delete",
883 self._request_id)
884
885 block.add_query_read(self.resmgr_path)
886
887 res_iter = yield from block.execute(flags=0, now=True)
888 for i in res_iter:
889 r = yield from i
890 resp = r.result
891
892 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
893 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
894 self._log.debug("Got vm request response: %s", resp.resource_info)
895 #self._vm_resp = resp.resource_info
896 return resp.resource_info
897
898
899 @asyncio.coroutine
900 def start_component(self):
901 """ This VDUR is active """
902 self._log.debug("Starting component %s for vdud %s vdur %s",
903 self._vdud.vcs_component_ref,
904 self._vdud,
905 self._vdur_id)
906 yield from self._vnfr.start_component(self._vdud.vcs_component_ref,
907 self.vm_resp.management_ip)
908
909 @property
910 def active(self):
911 """ Is this VDU active """
912 return True if self._state is VDURecordState.READY else False
913
914 @asyncio.coroutine
915 def instantiation_failed(self, failed_reason=None):
916 """ VDU instantiation failed """
917 self._log.debug("VDU %s instantiation failed ", self._vdur_id)
918 self._state = VDURecordState.FAILED
919 self._state_failed_reason = failed_reason
920 yield from self._vnfr.instantiation_failed(failed_reason)
921
922 @asyncio.coroutine
923 def vdu_is_active(self):
924 """ This VDU is active"""
925 if self.active:
926 self._log.warning("VDU %s was already marked as active", self._vdur_id)
927 return
928
929 self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)
930
931 if self._vdud.vcs_component_ref is not None:
932 yield from self.start_component()
933
934 self._state = VDURecordState.READY
935
936 if self._vnfr.all_vdus_active():
937 self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
938 yield from self._vnfr.is_ready()
939
940 @asyncio.coroutine
941 def instantiate(self, xact, vnfr, config=None):
942 """ Instantiate this VDU """
943 self._state = VDURecordState.INSTANTIATING
944
945 @asyncio.coroutine
946 def on_prepare(xact_info, query_action, ks_path, msg):
947 """ This VDUR is active """
948 self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
949 query_action,
950 ks_path,
951 msg)
952
953 if (query_action == rwdts.QueryAction.UPDATE or
954 query_action == rwdts.QueryAction.CREATE):
955 self._vm_resp = msg
956
957 if msg.resource_state == "active":
958 # Move this VDU to ready state
959 yield from self.vdu_is_active()
960 elif msg.resource_state == "failed":
961 yield from self.instantiation_failed(msg.resource_errors)
962 elif query_action == rwdts.QueryAction.DELETE:
963 self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
964 else:
965 raise NotImplementedError(
966 "%s action on VirtualDeployementUnitRecord not supported",
967 query_action)
968
969 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
970
971 try:
972 reg_event = asyncio.Event(loop=self._loop)
973
974 @asyncio.coroutine
975 def on_ready(regh, status):
976 reg_event.set()
977
978 handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
979 self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
980 flags=rwdts.Flag.SUBSCRIBER,
981 handler=handler)
982 yield from reg_event.wait()
983
984 vm_resp = yield from self.create_resource(xact, vnfr, config)
985 self._vm_resp = vm_resp
986 self._state = VDURecordState.RESOURCE_ALLOC_PENDING
987
988 self._log.debug("Requested VM from resource manager response %s",
989 vm_resp)
990 if vm_resp.resource_state == "active":
991 self._log.debug("Resourcemgr responded wih an active vm resp %s",
992 vm_resp)
993 yield from self.vdu_is_active()
994 self._state = VDURecordState.READY
995 elif (vm_resp.resource_state == "pending" or
996 vm_resp.resource_state == "inactive"):
997 self._log.debug("Resourcemgr responded wih a pending vm resp %s",
998 vm_resp)
999 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
1000 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
1001 # flags=rwdts.Flag.SUBSCRIBER,
1002 # handler=handler)
1003 else:
1004 self._log.debug("Resourcemgr responded wih an error vm resp %s",
1005 vm_resp)
1006 raise VirtualDeploymentUnitRecordError(
1007 "Failed VDUR instantiation %s " % vm_resp)
1008
1009 except Exception as e:
1010 import traceback
1011 traceback.print_exc()
1012 self._log.exception(e)
1013 self._log.error("Instantiation of VDU record failed: %s", str(e))
1014 self._state = VDURecordState.FAILED
1015 yield from self.instantiation_failed(str(e))
1016
1017
1018 class VlRecordState(enum.Enum):
1019 """ VL Record State """
1020 INIT = 101
1021 INSTANTIATION_PENDING = 102
1022 ACTIVE = 103
1023 TERMINATE_PENDING = 104
1024 TERMINATED = 105
1025 FAILED = 106
1026
1027
1028 class InternalVirtualLinkRecord(object):
1029 """ Internal Virtual Link record """
1030 def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name, ip_profile=None):
1031 self._dts = dts
1032 self._log = log
1033 self._loop = loop
1034 self._ivld_msg = ivld_msg
1035 self._vnfr_name = vnfr_name
1036 self._cloud_account_name = cloud_account_name
1037 self._ip_profile = ip_profile
1038
1039 self._vlr_req = self.create_vlr()
1040 self._vlr = None
1041 self._state = VlRecordState.INIT
1042
1043 @property
1044 def vlr_id(self):
1045 """ Find VLR by id """
1046 return self._vlr_req.id
1047
1048 @property
1049 def name(self):
1050 """ Name of this VL """
1051 if self._ivld_msg.vim_network_name:
1052 return self._ivld_msg.vim_network_name
1053 else:
1054 return self._vnfr_name + "." + self._ivld_msg.name
1055
1056 @property
1057 def network_id(self):
1058 """ Find VLR by id """
1059 return self._vlr.network_id if self._vlr else None
1060
1061 def vlr_path(self):
1062 """ VLR path for this VLR instance"""
1063 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self.vlr_id)
1064
1065 def create_vlr(self):
1066 """ Create the VLR record which will be instantiated """
1067
1068 vld_fields = ["short_name",
1069 "vendor",
1070 "description",
1071 "version",
1072 "type_yang",
1073 "vim_network_name",
1074 "provider_network"]
1075
1076 vld_copy_dict = {k: v for k, v in self._ivld_msg.as_dict().items() if k in vld_fields}
1077
1078 vlr_dict = {"id": str(uuid.uuid4()),
1079 "name": self.name,
1080 "cloud_account": self._cloud_account_name,
1081 }
1082
1083 if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
1084 vlr_dict['ip_profile_params' ] = self._ip_profile.ip_profile_params.as_dict()
1085
1086 vlr_dict.update(vld_copy_dict)
1087
1088 vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
1089 return vlr
1090
1091 @asyncio.coroutine
1092 def instantiate(self, xact, restart_mode=False):
1093 """ Instantiate VL """
1094
1095 @asyncio.coroutine
1096 def instantiate_vlr():
1097 """ Instantiate VLR"""
1098 self._log.debug("Create VL with xpath %s and vlr %s",
1099 self.vlr_path(), self._vlr_req)
1100
1101 with self._dts.transaction(flags=0) as xact:
1102 block = xact.block_create()
1103 block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
1104 self._log.debug("Executing VL create path:%s msg:%s",
1105 self.vlr_path(), self._vlr_req)
1106
1107 res_iter = None
1108 try:
1109 res_iter = yield from block.execute()
1110 except Exception:
1111 self._state = VlRecordState.FAILED
1112 self._log.exception("Caught exception while instantial VL")
1113 raise
1114
1115 for ent in res_iter:
1116 res = yield from ent
1117 self._vlr = res.result
1118
1119 if self._vlr.operational_status == 'failed':
1120 self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
1121 self._state = VlRecordState.FAILED
1122 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))
1123
1124 self._log.info("Created VL with xpath %s and vlr %s",
1125 self.vlr_path(), self._vlr)
1126
1127 @asyncio.coroutine
1128 def get_vlr():
1129 """ Get the network id """
1130 res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
1131 vlr = None
1132 for ent in res_iter:
1133 res = yield from ent
1134 vlr = res.result
1135
1136 if vlr is None:
1137 err = "Failed to get VLR for path %s" % self.vlr_path()
1138 self._log.warn(err)
1139 raise InternalVirtualLinkRecordError(err)
1140 return vlr
1141
1142 self._state = VlRecordState.INSTANTIATION_PENDING
1143
1144 if restart_mode:
1145 vl = yield from get_vlr()
1146 if vl is None:
1147 yield from instantiate_vlr()
1148 else:
1149 yield from instantiate_vlr()
1150
1151 self._state = VlRecordState.ACTIVE
1152
1153 def vlr_in_vns(self):
1154 """ Is there a VLR record in VNS """
1155 if (self._state == VlRecordState.ACTIVE or
1156 self._state == VlRecordState.INSTANTIATION_PENDING or
1157 self._state == VlRecordState.FAILED):
1158 return True
1159
1160 return False
1161
1162 @asyncio.coroutine
1163 def terminate(self, xact):
1164 """Terminate this VL """
1165 if not self.vlr_in_vns():
1166 self._log.debug("Ignoring terminate request for id %s in state %s",
1167 self.vlr_id, self._state)
1168 return
1169
1170 self._log.debug("Terminating VL with path %s", self.vlr_path())
1171 self._state = VlRecordState.TERMINATE_PENDING
1172 block = xact.block_create()
1173 block.add_query_delete(self.vlr_path())
1174 yield from block.execute(flags=0, now=True)
1175 self._state = VlRecordState.TERMINATED
1176 self._log.debug("Terminated VL with path %s", self.vlr_path())
1177
1178
1179 class VirtualNetworkFunctionRecord(object):
1180 """ Virtual Network Function Record """
1181 def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
1182 self._dts = dts
1183 self._log = log
1184 self._loop = loop
1185 self._cluster_name = cluster_name
1186 self._vnfr_msg = vnfr_msg
1187 self._vnfr_id = vnfr_msg.id
1188 self._vnfd_id = vnfr_msg.vnfd.id
1189 self._vnfm = vnfm
1190 self._vcs_handler = vcs_handler
1191 self._vnfr = vnfr_msg
1192 self._mgmt_network = mgmt_network
1193
1194 self._vnfd = vnfr_msg.vnfd
1195 self._state = VirtualNetworkFunctionRecordState.INIT
1196 self._state_failed_reason = None
1197 self._ext_vlrs = {} # The list of external virtual links
1198 self._vlrs = [] # The list of internal virtual links
1199 self._vdus = [] # The list of vdu
1200 self._vlr_by_cp = {}
1201 self._cprs = []
1202 self._inventory = {}
1203 self._create_time = int(time.time())
1204 self._vnf_mon = None
1205 self._config_status = vnfr_msg.config_status
1206 self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
1207 self._rw_vnfd = None
1208 self._vnfd_ref_count = 0
1209
1210 def _get_vdur_from_vdu_id(self, vdu_id):
1211 self._log.debug("Finding vdur for vdu_id %s", vdu_id)
1212 self._log.debug("Searching through vdus: %s", self._vdus)
1213 for vdu in self._vdus:
1214 self._log.debug("vdu_id: %s", vdu.vdu_id)
1215 if vdu.vdu_id == vdu_id:
1216 return vdu
1217
1218 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id)
1219
1220 @property
1221 def operational_status(self):
1222 """ Operational status of this VNFR """
1223 op_status_map = {"INIT": "init",
1224 "VL_INIT_PHASE": "vl_init_phase",
1225 "VM_INIT_PHASE": "vm_init_phase",
1226 "READY": "running",
1227 "TERMINATE": "terminate",
1228 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1229 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1230 "TERMINATED": "terminated",
1231 "FAILED": "failed", }
1232 return op_status_map[self._state.name]
1233
1234 @staticmethod
1235 def vnfd_xpath(vnfd_id):
1236 """ VNFD xpath associated with this VNFR """
1237 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id)
1238
1239 @property
1240 def vnfd_ref_count(self):
1241 """ Returns the VNFD reference count associated with this VNFR """
1242 return self._vnfd_ref_count
1243
1244 def vnfd_in_use(self):
1245 """ Returns whether vnfd is in use or not """
1246 return True if self._vnfd_ref_count > 0 else False
1247
1248 def vnfd_ref(self):
1249 """ Take a reference on this object """
1250 self._vnfd_ref_count += 1
1251 return self._vnfd_ref_count
1252
1253 def vnfd_unref(self):
1254 """ Release reference on this object """
1255 if self._vnfd_ref_count < 1:
1256 msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
1257 (self.vnfd.id, self._vnfd_ref_count))
1258 self._log.critical(msg)
1259 raise VnfRecordError(msg)
1260 self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
1261 self.vnfd.id, self._vnfd_ref_count)
1262 self._vnfd_ref_count -= 1
1263 return self._vnfd_ref_count
1264
1265 @property
1266 def vnfd(self):
1267 """ VNFD for this VNFR """
1268 return self._vnfd
1269
1270 @property
1271 def vnf_name(self):
1272 """ VNFD name associated with this VNFR """
1273 return self.vnfd.name
1274
1275 @property
1276 def name(self):
1277 """ Name of this VNF in the record """
1278 return self._vnfr.name
1279
1280 @property
1281 def cloud_account_name(self):
1282 """ Name of the cloud account this VNFR is instantiated in """
1283 return self._vnfr.cloud_account
1284
1285 @property
1286 def vnfd_id(self):
1287 """ VNFD Id associated with this VNFR """
1288 return self.vnfd.id
1289
1290 @property
1291 def vnfr_id(self):
1292 """ VNFR Id associated with this VNFR """
1293 return self._vnfr_id
1294
1295 @property
1296 def member_vnf_index(self):
1297 """ Member VNF index associated with this VNFR """
1298 return self._vnfr.member_vnf_index_ref
1299
1300 @property
1301 def config_status(self):
1302 """ Config agent status for this VNFR """
1303 return self._config_status
1304
1305 def component_by_name(self, component_name):
1306 """ Find a component by name in the inventory list"""
1307 mangled_name = VcsComponent.mangle_name(component_name,
1308 self.vnf_name,
1309 self.vnfd_id)
1310 return self._inventory[mangled_name]
1311
1312
1313
1314 @asyncio.coroutine
1315 def get_nsr_config(self):
1316 ### Need access to NS instance configuration for runtime resolution.
1317 ### This shall be replaced when deployment flavors are implemented
1318 xpath = "C,/nsr:ns-instance-config"
1319 results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
1320
1321 for result in results:
1322 entry = yield from result
1323 ns_instance_config = entry.result
1324 for nsr in ns_instance_config.nsr:
1325 if nsr.id == self._vnfr_msg.nsr_id_ref:
1326 return nsr
1327 return None
1328
1329 @asyncio.coroutine
1330 def start_component(self, component_name, ip_addr):
1331 """ Start a component in the VNFR by name """
1332 comp = self.component_by_name(component_name)
1333 yield from comp.start(None, None, ip_addr)
1334
1335 def cp_ip_addr(self, cp_name):
1336 """ Get ip address for connection point """
1337 self._log.debug("cp_ip_addr()")
1338 for cp in self._cprs:
1339 if cp.name == cp_name and cp.ip_address is not None:
1340 return cp.ip_address
1341 return "0.0.0.0"
1342
1343 def mgmt_intf_info(self):
1344 """ Get Management interface info for this VNFR """
1345 mgmt_intf_desc = self.vnfd.mgmt_interface
1346 ip_addr = None
1347 if mgmt_intf_desc.has_field("cp"):
1348 ip_addr = self.cp_ip_addr(mgmt_intf_desc.cp)
1349 elif mgmt_intf_desc.has_field("vdu_id"):
1350 try:
1351 vdur = self._get_vdur_from_vdu_id(mgmt_intf_desc.vdu_id)
1352 ip_addr = vdur.management_ip
1353 except VDURecordNotFound:
1354 self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
1355 ip_addr = None
1356 else:
1357 ip_addr = mgmt_intf_desc.ip_address
1358 port = mgmt_intf_desc.port
1359
1360 return ip_addr, port
1361
1362 @property
1363 def msg(self):
1364 """ Message associated with this VNFR """
1365 vnfd_fields = ["short_name", "vendor", "description", "version"]
1366 vnfd_copy_dict = {k: v for k, v in self.vnfd.as_dict().items() if k in vnfd_fields}
1367
1368 mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1369 ip_address, port = self.mgmt_intf_info()
1370
1371 if ip_address is not None:
1372 mgmt_intf.ip_address = ip_address
1373 if port is not None:
1374 mgmt_intf.port = port
1375
1376 vnfr_dict = {"id": self._vnfr_id,
1377 "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
1378 "name": self.name,
1379 "member_vnf_index_ref": self.member_vnf_index,
1380 "operational_status": self.operational_status,
1381 "operational_status_details": self._state_failed_reason,
1382 "cloud_account": self.cloud_account_name,
1383 "config_status": self._config_status
1384 }
1385
1386 vnfr_dict.update(vnfd_copy_dict)
1387
1388 vnfr_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
1389 vnfr_msg.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())
1390
1391 vnfr_msg.create_time = self._create_time
1392 vnfr_msg.uptime = int(time.time()) - self._create_time
1393 vnfr_msg.mgmt_interface = mgmt_intf
1394
1395 # Add all the VLRs to VNFR
1396 for vlr in self._vlrs:
1397 ivlr = vnfr_msg.internal_vlr.add()
1398 ivlr.vlr_ref = vlr.vlr_id
1399
1400 # Add all the VDURs to VDUR
1401 if self._vdus is not None:
1402 for vdu in self._vdus:
1403 vdur = vnfr_msg.vdur.add()
1404 vdur.from_dict(vdu.msg.as_dict())
1405
1406 if self.vnfd.mgmt_interface.has_field('dashboard_params'):
1407 vnfr_msg.dashboard_url = self.dashboard_url
1408
1409 for cpr in self._cprs:
1410 new_cp = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
1411 vnfr_msg.connection_point.append(new_cp)
1412
1413 if self._vnf_mon is not None:
1414 for monp in self._vnf_mon.msg:
1415 vnfr_msg.monitoring_param.append(
1416 VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))
1417
1418 if self._vnfr.vnf_configuration is not None:
1419 vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
1420 if (ip_address is not None and
1421 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None):
1422 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address
1423
1424 for group in self._vnfr_msg.placement_groups_info:
1425 group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1426 group_info.from_dict(group.as_dict())
1427 vnfr_msg.placement_groups_info.append(group_info)
1428
1429 return vnfr_msg
1430
1431 @property
1432 def dashboard_url(self):
1433 ip, cfg_port = self.mgmt_intf_info()
1434 protocol = 'http'
1435 http_port = 80
1436 if self.vnfd.mgmt_interface.dashboard_params.has_field('https'):
1437 if self.vnfd.mgmt_interface.dashboard_params.https is True:
1438 protocol = 'https'
1439 http_port = 443
1440 if self.vnfd.mgmt_interface.dashboard_params.has_field('port'):
1441 http_port = self.vnfd.mgmt_interface.dashboard_params.port
1442
1443 url = "{protocol}://{ip_address}:{port}/{path}".format(
1444 protocol=protocol,
1445 ip_address=ip,
1446 port=http_port,
1447 path=self.vnfd.mgmt_interface.dashboard_params.path.lstrip("/"),
1448 )
1449
1450 return url
1451
1452 @property
1453 def xpath(self):
1454 """ path for this VNFR """
1455 return("D,/vnfr:vnfr-catalog"
1456 "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id))
1457
1458 @asyncio.coroutine
1459 def publish(self, xact):
1460 """ publish this VNFR """
1461 vnfr = self.msg
1462 self._log.debug("Publishing VNFR path = [%s], record = [%s]",
1463 self.xpath, self.msg)
1464 vnfr.create_time = self._create_time
1465 yield from self._vnfm.publish_vnfr(xact, self.xpath, self.msg)
1466 self._log.debug("Published VNFR path = [%s], record = [%s]",
1467 self.xpath, self.msg)
1468
1469 def resolve_vld_ip_profile(self, vnfd_msg, vld):
1470 self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
1471 if not vld.has_field('ip_profile_ref'):
1472 return None
1473 profile = [profile for profile in vnfd_msg.ip_profiles if profile.name == vld.ip_profile_ref]
1474 return profile[0] if profile else None
1475
1476 @asyncio.coroutine
1477 def create_vls(self):
1478 """ Publish The VLs associated with this VNF """
1479 self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
1480 self.vnfd_id)
1481 for ivld_msg in self.vnfd.internal_vld:
1482 self._log.debug("Creating internal vld:"
1483 " %s, int_cp_ref = %s",
1484 ivld_msg, ivld_msg.internal_connection_point
1485 )
1486 vlr = InternalVirtualLinkRecord(dts=self._dts,
1487 log=self._log,
1488 loop=self._loop,
1489 ivld_msg=ivld_msg,
1490 vnfr_name=self.name,
1491 cloud_account_name=self.cloud_account_name,
1492 ip_profile=self.resolve_vld_ip_profile(self.vnfd, ivld_msg)
1493 )
1494 self._vlrs.append(vlr)
1495
1496 for int_cp in ivld_msg.internal_connection_point:
1497 if int_cp.id_ref in self._vlr_by_cp:
1498 msg = ("Connection point %s already "
1499 " bound %s" % (int_cp.id_ref, self._vlr_by_cp[int_cp.id_ref]))
1500 raise InternalVirtualLinkRecordError(msg)
1501 self._log.debug("Setting vlr %s to internal cp = %s",
1502 vlr, int_cp.id_ref)
1503 self._vlr_by_cp[int_cp.id_ref] = vlr
1504
1505 @asyncio.coroutine
1506 def instantiate_vls(self, xact, restart_mode=False):
1507 """ Instantiate the VLs associated with this VNF """
1508 self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1509 self.vnfd_id)
1510
1511 for vlr in self._vlrs:
1512 self._log.debug("Instantiating VLR %s", vlr)
1513 yield from vlr.instantiate(xact, restart_mode)
1514
1515 def find_vlr_by_cp(self, cp_name):
1516 """ Find the VLR associated with the cp name """
1517 return self._vlr_by_cp[cp_name]
1518
1519 def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
1520 """
1521 Returns the cloud specific construct for placement group
1522 Arguments:
1523 input_group: VNFD PlacementGroup
1524 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1525 """
1526 copy_dict = ['name', 'requirement', 'strategy']
1527 for group_info in nsr_config.vnfd_placement_group_maps:
1528 if group_info.placement_group_ref == input_group.name and \
1529 group_info.vnfd_id_ref == self.vnfd_id:
1530 group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1531 group_dict = {k:v for k,v in
1532 group_info.as_dict().items()
1533 if (k != 'placement_group_ref' and k !='vnfd_id_ref')}
1534 for param in copy_dict:
1535 group_dict.update({param: getattr(input_group, param)})
1536 group.from_dict(group_dict)
1537 return group
1538 return None
1539
1540 @asyncio.coroutine
1541 def get_vdu_placement_groups(self, vdu, nsr_config):
1542 placement_groups = []
1543 ### Step-1: Get VNF level placement groups
1544 for group in self._vnfr_msg.placement_groups_info:
1545 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1546 #group_info.from_dict(group.as_dict())
1547 placement_groups.append(group)
1548
1549 ### Step-2: Get VDU level placement groups
1550 for group in self.vnfd.placement_groups:
1551 for member_vdu in group.member_vdus:
1552 if member_vdu.member_vdu_ref == vdu.id:
1553 group_info = self.resolve_placement_group_cloud_construct(group,
1554 nsr_config)
1555 if group_info is None:
1556 self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
1557 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1558 else:
1559 self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1560 str(group_info),
1561 vdu.name,
1562 self.vnf_name,
1563 self.member_vnf_index)
1564 placement_groups.append(group_info)
1565
1566 return placement_groups
1567
1568 @asyncio.coroutine
1569 def vdu_cloud_init_instantiation(self):
1570 [vdu.vdud_cloud_init for vdu in self._vdus]
1571
1572 @asyncio.coroutine
1573 def create_vdus(self, vnfr, restart_mode=False):
1574 """ Create the VDUs associated with this VNF """
1575
1576 def get_vdur_id(vdud):
1577 """Get the corresponding VDUR's id for the VDUD. This is useful in
1578 case of a restart.
1579
1580 In restart mode we check for exiting VDUR's ID and use them, if
1581 available. This way we don't end up creating duplicate VDURs
1582 """
1583 vdur_id = None
1584
1585 if restart_mode and vdud is not None:
1586 try:
1587 vdur = [vdur.id for vdur in vnfr._vnfr.vdur if vdur.vdu_id_ref == vdud.id]
1588 vdur_id = vdur[0]
1589 except IndexError:
1590 self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
1591
1592 return vdur_id
1593
1594
1595 self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
1596
1597 # Get NSR config - Needed for placement groups and to derive VDU short-name
1598 nsr_config = yield from self.get_nsr_config()
1599
1600 for vdu in self._rw_vnfd.vdu:
1601 self._log.debug("Creating vdu: %s", vdu)
1602 vdur_id = get_vdur_id(vdu)
1603
1604
1605 placement_groups = yield from self.get_vdu_placement_groups(vdu, nsr_config)
1606 self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s, Existing vdur_id %s",
1607 vdu.name,
1608 self.vnf_name,
1609 self.member_vnf_index,
1610 [ group.name for group in placement_groups],
1611 vdur_id)
1612
1613 vdur = VirtualDeploymentUnitRecord(
1614 dts=self._dts,
1615 log=self._log,
1616 loop=self._loop,
1617 vdud=vdu,
1618 vnfr=vnfr,
1619 nsr_config=nsr_config,
1620 mgmt_intf=self.has_mgmt_interface(vdu),
1621 mgmt_network=self._mgmt_network,
1622 cloud_account_name=self.cloud_account_name,
1623 vnfd_package_store=self._vnfd_package_store,
1624 vdur_id=vdur_id,
1625 placement_groups = placement_groups,
1626 )
1627 yield from vdur.vdu_opdata_register()
1628
1629 self._vdus.append(vdur)
1630
1631 @asyncio.coroutine
1632 def instantiate_vdus(self, xact, vnfr):
1633 """ Instantiate the VDUs associated with this VNF """
1634 self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)
1635
1636 lookup = {vdu.vdu_id: vdu for vdu in self._vdus}
1637
1638 # Identify any dependencies among the VDUs
1639 dependencies = collections.defaultdict(list)
1640 vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")
1641
1642 for vdu in self._vdus:
1643 if vdu._vdud_cloud_init is not None:
1644 for vdu_id in vdu_id_pattern.findall(vdu._vdud_cloud_init):
1645 if vdu_id != vdu.vdu_id:
1646 # This means that vdu.vdu_id depends upon vdu_id,
1647 # i.e. vdu_id must be instantiated before
1648 # vdu.vdu_id.
1649 dependencies[vdu.vdu_id].append(lookup[vdu_id])
1650
1651 # Define the terminal states of VDU instantiation
1652 terminal = (
1653 VDURecordState.READY,
1654 VDURecordState.TERMINATED,
1655 VDURecordState.FAILED,
1656 )
1657
1658 datastore = VdurDatastore()
1659 processed = set()
1660
1661 @asyncio.coroutine
1662 def instantiate_monitor(vdu):
1663 """Monitor the state of the VDU during instantiation
1664
1665 Arguments:
1666 vdu - a VirtualDeploymentUnitRecord
1667
1668 """
1669 # wait for the VDUR to enter a terminal state
1670 while vdu._state not in terminal:
1671 yield from asyncio.sleep(1, loop=self._loop)
1672 # update the datastore
1673 datastore.update(vdu)
1674
1675 # add the VDU to the set of processed VDUs
1676 processed.add(vdu.vdu_id)
1677
1678 @asyncio.coroutine
1679 def instantiate(vdu):
1680 """Instantiate the specified VDU
1681
1682 Arguments:
1683 vdu - a VirtualDeploymentUnitRecord
1684
1685 Raises:
1686 if the VDU, or any of the VDUs this VDU depends upon, are
1687 terminated or fail to instantiate properly, a
1688 VirtualDeploymentUnitRecordError is raised.
1689
1690 """
1691 for dependency in dependencies[vdu.vdu_id]:
1692 self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))
1693
1694 while dependency.vdu_id not in processed:
1695 yield from asyncio.sleep(1, loop=self._loop)
1696
1697 if not dependency.active:
1698 raise VirtualDeploymentUnitRecordError()
1699
1700 self._log.debug('instantiating {}'.format(vdu.vdu_id))
1701
1702 # Populate the datastore with the current values of the VDU
1703 datastore.add(vdu)
1704
1705 # Substitute any variables contained in the cloud config script
1706 config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""
1707
1708 parts = re.split("\{\{ ([^\}]+) \}\}", config)
1709 if len(parts) > 1:
1710
1711 # Extract the variable names
1712 variables = list()
1713 for variable in parts[1::2]:
1714 variables.append(variable.lstrip('{{').rstrip('}}').strip())
1715
1716 # Iterate of the variables and substitute values from the
1717 # datastore.
1718 for variable in variables:
1719
1720 # Handle a reference to a VDU by ID
1721 if variable.startswith('vdu['):
1722 value = datastore.get(variable)
1723 if value is None:
1724 msg = "Unable to find a substitute for {} in {} cloud-init script"
1725 raise ValueError(msg.format(variable, vdu.vdu_id))
1726
1727 config = config.replace("{{ %s }}" % variable, value)
1728 continue
1729
1730 # Handle a reference to the current VDU
1731 if variable.startswith('vdu'):
1732 value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
1733 config = config.replace("{{ %s }}" % variable, value)
1734 continue
1735
1736 # Handle unrecognized variables
1737 msg = 'unrecognized cloud-config variable: {}'
1738 raise ValueError(msg.format(variable))
1739
1740 # Instantiate the VDU
1741 with self._dts.transaction() as xact:
1742 self._log.debug("Instantiating vdu: %s", vdu)
1743 yield from vdu.instantiate(xact, vnfr, config=config)
1744 if self._state == VirtualNetworkFunctionRecordState.FAILED:
1745 self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
1746 self.vnfr_id, vdu)
1747
1748 # First create a set of tasks to monitor the state of the VDUs and
1749 # report when they have entered a terminal state
1750 for vdu in self._vdus:
1751 self._loop.create_task(instantiate_monitor(vdu))
1752
1753 for vdu in self._vdus:
1754 self._loop.create_task(instantiate(vdu))
1755
1756 def has_mgmt_interface(self, vdu):
1757 # ## TODO: Support additional mgmt_interface type options
1758 if self.vnfd.mgmt_interface.vdu_id == vdu.id:
1759 return True
1760 return False
1761
1762 def vlr_xpath(self, vlr_id):
1763 """ vlr xpath """
1764 return(
1765 "D,/vlr:vlr-catalog/"
1766 "vlr:vlr[vlr:id = '{}']".format(vlr_id))
1767
1768 def ext_vlr_by_id(self, vlr_id):
1769 """ find ext vlr by id """
1770 return self._ext_vlrs[vlr_id]
1771
1772 @asyncio.coroutine
1773 def publish_inventory(self, xact):
1774 """ Publish the inventory associated with this VNF """
1775 self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)
1776
1777 for component in self._rw_vnfd.component:
1778 self._log.debug("Creating inventory component %s", component)
1779 mangled_name = VcsComponent.mangle_name(component.component_name,
1780 self.vnf_name,
1781 self.vnfd_id
1782 )
1783 comp = VcsComponent(dts=self._dts,
1784 log=self._log,
1785 loop=self._loop,
1786 cluster_name=self._cluster_name,
1787 vcs_handler=self._vcs_handler,
1788 component=component,
1789 mangled_name=mangled_name,
1790 )
1791 if comp.name in self._inventory:
1792 self._log.debug("Duplicate entries in inventory %s for vnfr %s",
1793 component, self._vnfd_id)
1794 return
1795 self._log.debug("Adding component %s for vnrf %s",
1796 comp.name, self._vnfr_id)
1797 self._inventory[comp.name] = comp
1798 yield from comp.publish(xact)
1799
1800 def all_vdus_active(self):
1801 """ Are all VDUS in this VNFR active? """
1802 for vdu in self._vdus:
1803 if not vdu.active:
1804 return False
1805
1806 self._log.debug("Inside all_vdus_active. Returning True")
1807 return True
1808
1809 @asyncio.coroutine
1810 def instantiation_failed(self, failed_reason=None):
1811 """ VNFR instantiation failed """
1812 self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
1813 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1814 self._state_failed_reason = failed_reason
1815
1816 # Update the VNFR with the changed status
1817 yield from self.publish(None)
1818
1819 @asyncio.coroutine
1820 def is_ready(self):
1821 """ This VNF is ready"""
1822 self._log.debug("VNFR id %s is ready", self.vnfr_id)
1823
1824 if self._state != VirtualNetworkFunctionRecordState.FAILED:
1825 self.set_state(VirtualNetworkFunctionRecordState.READY)
1826
1827 else:
1828 self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
1829
1830 # Update the VNFR with the changed status
1831 yield from self.publish(None)
1832
1833 def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
1834 """Updated the connection point with ip address"""
1835 for cp in self._cprs:
1836 if cp.name == cp_name:
1837 self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1838 cp_name, cp, ip_address, cp_id)
1839 cp.ip_address = ip_address
1840 cp.mac_address = mac_addr
1841 cp.connection_point_id = cp_id
1842 return
1843
1844 err = "No connection point %s found in VNFR id %s" % (cp.name, self._vnfr_id)
1845 self._log.debug(err)
1846 raise VirtualDeploymentUnitRecordError(err)
1847
1848 def set_state(self, state):
1849 """ Set state for this VNFR"""
1850 self._state = state
1851
1852 @asyncio.coroutine
1853 def instantiate(self, xact, restart_mode=False):
1854 """ instantiate this VNF """
1855 self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
1856 self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)
1857
1858 @asyncio.coroutine
1859 def fetch_vlrs():
1860 """ Fetch VLRs """
1861 # Iterate over all the connection points in VNFR and fetch the
1862 # associated VLRs
1863
1864 def cpr_from_cp(cp):
1865 """ Creates a record level connection point from the desciptor cp"""
1866 cp_fields = ["name", "image", "vm-flavor", "port_security_enabled"]
1867 cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
1868 cpr_dict = {}
1869 cpr_dict.update(cp_copy_dict)
1870 return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)
1871
1872 self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1873 self._vnfr_id, self._vnfr.connection_point)
1874
1875 for cp in self._vnfr.connection_point:
1876 cpr = cpr_from_cp(cp)
1877 self._cprs.append(cpr)
1878 self._log.debug("Adding Connection point record %s ", cp)
1879
1880 vlr_path = self.vlr_xpath(cp.vlr_ref)
1881 self._log.debug("Fetching VLR with path = %s", vlr_path)
1882 res_iter = yield from self._dts.query_read(self.vlr_xpath(cp.vlr_ref),
1883 rwdts.XactFlag.MERGE)
1884 for i in res_iter:
1885 r = yield from i
1886 d = r.result
1887 self._ext_vlrs[cp.vlr_ref] = d
1888 cpr.vlr_ref = cp.vlr_ref
1889 self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)
1890
1891 # Increase the VNFD reference count
1892 self.vnfd_ref()
1893
1894 assert self.vnfd
1895
1896 # Fetch External VLRs
1897 self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
1898 yield from fetch_vlrs()
1899
1900 # Publish inventory
1901 self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
1902 yield from self.publish_inventory(xact)
1903
1904 # Publish inventory
1905 self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
1906 yield from self.create_vls()
1907
1908 # publish the VNFR
1909 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1910 yield from self.publish(xact)
1911
1912
1913 # instantiate VLs
1914 self._log.debug("VNFR-ID %s: Instantiate VLs, restart mode %s", self._vnfr_id, restart_mode)
1915 try:
1916 yield from self.instantiate_vls(xact, restart_mode)
1917 except Exception as e:
1918 self._log.exception("VL instantiation failed (%s)", str(e))
1919 yield from self.instantiation_failed(str(e))
1920 return
1921
1922 self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)
1923
1924 # instantiate VDUs
1925 self._log.debug("VNFR-ID %s: Create VDUs, restart mode %s", self._vnfr_id, restart_mode)
1926 yield from self.create_vdus(self, restart_mode)
1927
1928 try:
1929 yield from self.vdu_cloud_init_instantiation()
1930 except Exception as e:
1931 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1932 self._state_failed_reason = str(e)
1933 yield from self.publish(xact)
1934
1935 # publish the VNFR
1936 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1937 yield from self.publish(xact)
1938
1939 # instantiate VDUs
1940 # ToDo: Check if this should be prevented during restart
1941 self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
1942 _ = self._loop.create_task(self.instantiate_vdus(xact, self))
1943
1944 # publish the VNFR
1945 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1946 yield from self.publish(xact)
1947
1948 self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)
1949
1950 # create task updating uptime for this vnfr
1951 self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
1952 self._loop.create_task(self.vnfr_uptime_update(xact))
1953
1954 @asyncio.coroutine
1955 def terminate(self, xact):
1956 """ Terminate this virtual network function """
1957
1958 self._log.debug("Terminatng VNF id %s", self.vnfr_id)
1959
1960 self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)
1961
1962 # stop monitoring
1963 if self._vnf_mon is not None:
1964 self._vnf_mon.stop()
1965 self._vnf_mon.deregister()
1966 self._vnf_mon = None
1967
1968 @asyncio.coroutine
1969 def terminate_vls():
1970 """ Terminate VLs in this VNF """
1971 for vl in self._vlrs:
1972 yield from vl.terminate(xact)
1973
1974 @asyncio.coroutine
1975 def terminate_vdus():
1976 """ Terminate VDUS in this VNF """
1977 for vdu in self._vdus:
1978 yield from vdu.terminate(xact)
1979
1980 self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
1981 self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
1982 yield from terminate_vls()
1983
1984 self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
1985 self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
1986 yield from terminate_vdus()
1987
1988 self._log.debug("Terminated VNF id %s", self.vnfr_id)
1989 self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1990
1991 @asyncio.coroutine
1992 def vnfr_uptime_update(self, xact):
1993 while True:
1994 # Return when vnfr state is FAILED or TERMINATED etc
1995 if self._state not in [VirtualNetworkFunctionRecordState.INIT,
1996 VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
1997 VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
1998 VirtualNetworkFunctionRecordState.READY]:
1999 return
2000 yield from self.publish(xact)
2001 yield from asyncio.sleep(2, loop=self._loop)
2002
2003
2004
2005 class VnfdDtsHandler(object):
2006 """ DTS handler for VNFD config changes """
2007 XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"
2008
2009 def __init__(self, dts, log, loop, vnfm):
2010 self._dts = dts
2011 self._log = log
2012 self._loop = loop
2013 self._vnfm = vnfm
2014 self._regh = None
2015
2016 @asyncio.coroutine
2017 def regh(self):
2018 """ DTS registration handle """
2019 return self._regh
2020
2021 @asyncio.coroutine
2022 def register(self):
2023 """ Register for VNFD configuration"""
2024
2025 def on_apply(dts, acg, xact, action, scratch):
2026 """Apply the configuration"""
2027 self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
2028 xact, action, scratch)
2029
2030 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
2031
2032 @asyncio.coroutine
2033 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
2034 """ on prepare callback """
2035 self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
2036 ks_path.to_xpath(RwVnfmYang.get_schema()), msg)
2037 fref = ProtobufC.FieldReference.alloc()
2038 fref.goto_whole_message(msg.to_pbcm())
2039
2040 # Handle deletes in prepare_callback
2041 if fref.is_field_deleted():
2042 # Delete an VNFD record
2043 self._log.debug("Deleting VNFD with id %s", msg.id)
2044 if self._vnfm.vnfd_in_use(msg.id):
2045 self._log.debug("Cannot delete VNFD in use - %s", msg)
2046 err = "Cannot delete a VNFD in use - %s" % msg
2047 raise VirtualNetworkFunctionDescriptorRefCountExists(err)
2048 # Delete a VNFD record
2049 yield from self._vnfm.delete_vnfd(msg.id)
2050
2051 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2052
2053 self._log.debug(
2054 "Registering for VNFD config using xpath: %s",
2055 VnfdDtsHandler.XPATH,
2056 )
2057 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
2058 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
2059 self._regh = acg.register(
2060 xpath=VnfdDtsHandler.XPATH,
2061 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
2062 on_prepare=on_prepare)
2063
2064
2065 class VcsComponentDtsHandler(object):
2066 """ Vcs Component DTS handler """
2067 XPATH = ("D,/rw-manifest:manifest" +
2068 "/rw-manifest:operational-inventory" +
2069 "/rw-manifest:component")
2070
2071 def __init__(self, dts, log, loop, vnfm):
2072 self._dts = dts
2073 self._log = log
2074 self._loop = loop
2075 self._regh = None
2076 self._vnfm = vnfm
2077
2078 @property
2079 def regh(self):
2080 """ DTS registration handle """
2081 return self._regh
2082
2083 @asyncio.coroutine
2084 def register(self):
2085 """ Registers VCS component dts publisher registration"""
2086 self._log.debug("VCS Comp publisher DTS handler registering path %s",
2087 VcsComponentDtsHandler.XPATH)
2088
2089 hdl = rift.tasklets.DTS.RegistrationHandler()
2090 handlers = rift.tasklets.Group.Handler()
2091 with self._dts.group_create(handler=handlers) as group:
2092 self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
2093 handler=hdl,
2094 flags=(rwdts.Flag.PUBLISHER |
2095 rwdts.Flag.NO_PREP_READ |
2096 rwdts.Flag.DATASTORE),)
2097
2098 @asyncio.coroutine
2099 def publish(self, xact, path, msg):
2100 """ Publishes the VCS component """
2101 self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
2102 xact, path, msg)
2103 self.regh.create_element(path, msg)
2104 self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
2105 VcsComponentDtsHandler.XPATH, xact, path, msg)
2106
2107 class VnfrConsoleOperdataDtsHandler(object):
2108 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
2109 @property
2110 def vnfr_vdu_console_xpath(self):
2111 """ path for resource-mgr"""
2112 return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self._vnfr_id,self._vdur_id))
2113
2114 def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
2115 self._dts = dts
2116 self._log = log
2117 self._loop = loop
2118 self._regh = None
2119 self._vnfm = vnfm
2120
2121 self._vnfr_id = vnfr_id
2122 self._vdur_id = vdur_id
2123 self._vdu_id = vdu_id
2124
2125 @asyncio.coroutine
2126 def register(self):
2127 """ Register for VNFR VDU Operational Data read from dts """
2128
2129 @asyncio.coroutine
2130 def on_prepare(xact_info, action, ks_path, msg):
2131 """ prepare callback from dts """
2132 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2133 self._log.debug(
2134 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2135 xact_info, action, xpath, msg
2136 )
2137
2138 if action == rwdts.QueryAction.READ:
2139 schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema()
2140 path_entry = schema.keyspec_to_entry(ks_path)
2141 self._log.debug("VDU Opdata path is {}".format(path_entry.key00.id))
2142 try:
2143 vnfr = self._vnfm.get_vnfr(self._vnfr_id)
2144 except VnfRecordError as e:
2145 self._log.error("VNFR id %s not found", self._vnfr_id)
2146 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2147 return
2148 try:
2149 vdur= vnfr._get_vdur_from_vdu_id(self._vdu_id)
2150 if not vdur._state == VDURecordState.READY:
2151 self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
2152 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2153 return
2154 with self._dts.transaction() as new_xact:
2155 resp = yield from vdur.read_resource(new_xact)
2156 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2157 vdur_console.id = self._vdur_id
2158 if resp.console_url:
2159 vdur_console.console_url = resp.console_url
2160 else:
2161 vdur_console.console_url = 'none'
2162 self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
2163 except Exception:
2164 self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
2165 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2166 vdur_console.id = self._vdur_id
2167 vdur_console.console_url = 'none'
2168
2169 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
2170 xpath=self.vnfr_vdu_console_xpath,
2171 msg=vdur_console)
2172 else:
2173 #raise VnfRecordError("Not supported operation %s" % action)
2174 self._log.error("Not supported operation %s" % action)
2175 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2176 return
2177
2178
2179 self._log.debug("Registering for VNFR VDU using xpath: %s",
2180 self.vnfr_vdu_console_xpath)
2181 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2182 with self._dts.group_create() as group:
2183 self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
2184 handler=hdl,
2185 flags=rwdts.Flag.PUBLISHER,
2186 )
2187
2188
2189 class VnfrDtsHandler(object):
2190 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2191 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2192
2193 def __init__(self, dts, log, loop, vnfm):
2194 self._dts = dts
2195 self._log = log
2196 self._loop = loop
2197 self._vnfm = vnfm
2198
2199 self._regh = None
2200
2201 @property
2202 def regh(self):
2203 """ Return registration handle"""
2204 return self._regh
2205
2206 @property
2207 def vnfm(self):
2208 """ Return VNF manager instance """
2209 return self._vnfm
2210
2211 @asyncio.coroutine
2212 def register(self):
2213 """ Register for vnfr create/update/delete/read requests from dts """
2214 def on_commit(xact_info):
2215 """ The transaction has been committed """
2216 self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
2217 return rwdts.MemberRspCode.ACTION_OK
2218
2219 def on_abort(*args):
2220 """ Abort callback """
2221 self._log.debug("VNF transaction got aborted")
2222
2223 @asyncio.coroutine
2224 def on_event(dts, g_reg, xact, xact_event, scratch_data):
2225
2226 @asyncio.coroutine
2227 def instantiate_realloc_vnfr(vnfr):
2228 """Re-populate the vnfm after restart
2229
2230 Arguments:
2231 vlink
2232
2233 """
2234
2235 yield from vnfr.instantiate(None, restart_mode=True)
2236
2237 if xact_event == rwdts.MemberEvent.INSTALL:
2238 curr_cfg = self.regh.elements
2239 for cfg in curr_cfg:
2240 vnfr = self.vnfm.create_vnfr(cfg)
2241 self._loop.create_task(instantiate_realloc_vnfr(vnfr))
2242
2243 self._log.debug("Got on_event in vnfm")
2244
2245 return rwdts.MemberRspCode.ACTION_OK
2246
2247 @asyncio.coroutine
2248 def on_prepare(xact_info, action, ks_path, msg):
2249 """ prepare callback from dts """
2250 self._log.debug(
2251 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2252 xact_info, action, msg
2253 )
2254
2255 if action == rwdts.QueryAction.CREATE:
2256 if not msg.has_field("vnfd"):
2257 err = "Vnfd not provided"
2258 self._log.error(err)
2259 raise VnfRecordError(err)
2260
2261 vnfr = self.vnfm.create_vnfr(msg)
2262 try:
2263 # RIFT-9105: Unable to add a READ query under an existing transaction
2264 # xact = xact_info.xact
2265 yield from vnfr.instantiate(None)
2266 except Exception as e:
2267 self._log.exception(e)
2268 self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
2269 vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
2270 yield from vnfr.publish(None)
2271 elif action == rwdts.QueryAction.DELETE:
2272 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2273 path_entry = schema.keyspec_to_entry(ks_path)
2274 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2275
2276 if vnfr is None:
2277 self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
2278 raise VirtualNetworkFunctionRecordNotFound(
2279 "VNFR id %s", path_entry.key00.id)
2280
2281 try:
2282 yield from vnfr.terminate(xact_info.xact)
2283 # Unref the VNFD
2284 vnfr.vnfd_unref()
2285 yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
2286 except Exception as e:
2287 self._log.exception(e)
2288 self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)
2289
2290 elif action == rwdts.QueryAction.UPDATE:
2291 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2292 path_entry = schema.keyspec_to_entry(ks_path)
2293 vnfr = None
2294 try:
2295 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2296 except Exception as e:
2297 self._log.debug("No vnfr found with id %s", path_entry.key00.id)
2298 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2299 return
2300
2301 if vnfr is None:
2302 self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
2303 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2304 return
2305
2306 self._log.debug("VNFR {} update config status {} (current {})".
2307 format(vnfr.name, msg.config_status, vnfr.config_status))
2308 # Update the config status and publish
2309 vnfr._config_status = msg.config_status
2310 yield from vnfr.publish(None)
2311
2312 else:
2313 raise NotImplementedError(
2314 "%s action on VirtualNetworkFunctionRecord not supported",
2315 action)
2316
2317 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2318
2319 self._log.debug("Registering for VNFR using xpath: %s",
2320 VnfrDtsHandler.XPATH,)
2321
2322 hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
2323 on_prepare=on_prepare,)
2324 handlers = rift.tasklets.Group.Handler(on_event=on_event,)
2325 with self._dts.group_create(handler=handlers) as group:
2326 self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
2327 handler=hdl,
2328 flags=(rwdts.Flag.PUBLISHER |
2329 rwdts.Flag.NO_PREP_READ |
2330 rwdts.Flag.CACHE |
2331 rwdts.Flag.DATASTORE),)
2332
2333 @asyncio.coroutine
2334 def create(self, xact, path, msg):
2335 """
2336 Create a VNFR record in DTS with path and message
2337 """
2338 self._log.debug("Creating VNFR xact = %s, %s:%s",
2339 xact, path, msg)
2340
2341 self.regh.create_element(path, msg)
2342 self._log.debug("Created VNFR xact = %s, %s:%s",
2343 xact, path, msg)
2344
2345 @asyncio.coroutine
2346 def update(self, xact, path, msg):
2347 """
2348 Update a VNFR record in DTS with path and message
2349 """
2350 self._log.debug("Updating VNFR xact = %s, %s:%s",
2351 xact, path, msg)
2352 self.regh.update_element(path, msg)
2353 self._log.debug("Updated VNFR xact = %s, %s:%s",
2354 xact, path, msg)
2355
2356 @asyncio.coroutine
2357 def delete(self, xact, path):
2358 """
2359 Delete a VNFR record in DTS with path and message
2360 """
2361 self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
2362 self.regh.delete_element(path)
2363 self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2364
2365
2366 class VnfdRefCountDtsHandler(object):
2367 """ The VNFD Ref Count DTS handler """
2368 XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2369
2370 def __init__(self, dts, log, loop, vnfm):
2371 self._dts = dts
2372 self._log = log
2373 self._loop = loop
2374 self._vnfm = vnfm
2375
2376 self._regh = None
2377
2378 @property
2379 def regh(self):
2380 """ Return registration handle """
2381 return self._regh
2382
2383 @property
2384 def vnfm(self):
2385 """ Return the NS manager instance """
2386 return self._vnfm
2387
2388 @asyncio.coroutine
2389 def register(self):
2390 """ Register for VNFD ref count read from dts """
2391
2392 @asyncio.coroutine
2393 def on_prepare(xact_info, action, ks_path, msg):
2394 """ prepare callback from dts """
2395 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2396 self._log.debug(
2397 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2398 xact_info, action, xpath, msg
2399 )
2400
2401 if action == rwdts.QueryAction.READ:
2402 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
2403 path_entry = schema.keyspec_to_entry(ks_path)
2404 vnfd_list = yield from self._vnfm.get_vnfd_refcount(path_entry.key00.vnfd_id_ref)
2405 for xpath, msg in vnfd_list:
2406 self._log.debug("Responding to ref count query path:%s, msg:%s",
2407 xpath, msg)
2408 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
2409 xpath=xpath,
2410 msg=msg)
2411 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2412 else:
2413 raise VnfRecordError("Not supported operation %s" % action)
2414
2415 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2416 with self._dts.group_create() as group:
2417 self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH,
2418 handler=hdl,
2419 flags=rwdts.Flag.PUBLISHER,
2420 )
2421
2422
2423 class VdurDatastore(object):
2424 """
2425 This VdurDatastore is intended to expose select information about a VDUR
2426 such that it can be referenced in a cloud config file. The data that is
2427 exposed does not necessarily follow the structure of the data in the yang
2428 model. This is intentional. The data that are exposed are intended to be
2429 agnostic of the yang model so that changes in the model do not necessarily
2430 require changes to the interface provided to the user. It also means that
2431 the user does not need to be familiar with the RIFT.ware yang models.
2432 """
2433
2434 def __init__(self):
2435 """Create an instance of VdurDatastore"""
2436 self._vdur_data = dict()
2437 self._pattern = re.compile("vdu\[([^]]+)\]\.(.+)")
2438
2439 def add(self, vdur):
2440 """Add a new VDUR to the datastore
2441
2442 Arguments:
2443 vdur - a VirtualDeploymentUnitRecord instance
2444
2445 Raises:
2446 A ValueError is raised if the VDUR is (1) None or (2) already in
2447 the datastore.
2448
2449 """
2450 if vdur.vdu_id is None:
2451 raise ValueError('VDURs are required to have an ID')
2452
2453 if vdur.vdu_id in self._vdur_data:
2454 raise ValueError('cannot add a VDUR more than once')
2455
2456 self._vdur_data[vdur.vdu_id] = dict()
2457
2458 def set_if_not_none(key, attr):
2459 if attr is not None:
2460 self._vdur_data[vdur.vdu_id][key] = attr
2461
2462 set_if_not_none('name', vdur._vdud.name)
2463 set_if_not_none('mgmt.ip', vdur.vm_management_ip)
2464 # The below can be used for hostname
2465 set_if_not_none('vdur_name', vdur.unique_short_name)
2466
2467 def update(self, vdur):
2468 """Update the VDUR information in the datastore
2469
2470 Arguments:
2471 vdur - a GI representation of a VDUR
2472
2473 Raises:
2474 A ValueError is raised if the VDUR is (1) None or (2) already in
2475 the datastore.
2476
2477 """
2478 if vdur.vdu_id is None:
2479 raise ValueError('VNFDs are required to have an ID')
2480
2481 if vdur.vdu_id not in self._vdur_data:
2482 raise ValueError('VNF is not recognized')
2483
2484 def set_or_delete(key, attr):
2485 if attr is None:
2486 if key in self._vdur_data[vdur.vdu_id]:
2487 del self._vdur_data[vdur.vdu_id][key]
2488
2489 else:
2490 self._vdur_data[vdur.vdu_id][key] = attr
2491
2492 set_or_delete('name', vdur._vdud.name)
2493 set_or_delete('mgmt.ip', vdur.vm_management_ip)
2494 # The below can be used for hostname
2495 set_or_delete('vdur_name', vdur.unique_short_name)
2496
2497 def remove(self, vdur_id):
2498 """Remove all of the data associated with specified VDUR
2499
2500 Arguments:
2501 vdur_id - the identifier of a VNFD in the datastore
2502
2503 Raises:
2504 A ValueError is raised if the VDUR is not contained in the
2505 datastore.
2506
2507 """
2508 if vdur_id not in self._vdur_data:
2509 raise ValueError('VNF is not recognized')
2510
2511 del self._vdur_data[vdur_id]
2512
2513 def get(self, expr):
2514 """Retrieve VDUR information from the datastore
2515
2516 An expression should be of the form,
2517
2518 vdu[<id>].<attr>
2519
2520 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2521 the exposed attribute that the user wishes to retrieve.
2522
2523 If the requested data is not available, None is returned.
2524
2525 Arguments:
2526 expr - a string that specifies the data to return
2527
2528 Raises:
2529 A ValueError is raised if the provided expression cannot be parsed.
2530
2531 Returns:
2532 The requested data or None
2533
2534 """
2535 result = self._pattern.match(expr)
2536 if result is None:
2537 raise ValueError('data expression not recognized ({})'.format(expr))
2538
2539 vdur_id, key = result.groups()
2540
2541 if vdur_id not in self._vdur_data:
2542 return None
2543
2544 return self._vdur_data[vdur_id].get(key, None)
2545
2546
2547 class VnfManager(object):
2548 """ The virtual network function manager class """
2549 def __init__(self, dts, log, loop, cluster_name):
2550 self._dts = dts
2551 self._log = log
2552 self._loop = loop
2553 self._cluster_name = cluster_name
2554
2555 self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
2556 self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
2557 self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
2558 self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(log, dts, loop, callback=self.handle_nsr)
2559
2560 self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
2561 self._vnfr_handler,
2562 self._vcs_handler,
2563 self._vnfr_ref_handler,
2564 self._nsr_handler]
2565 self._vnfrs = {}
2566 self._vnfds_to_vnfr = {}
2567 self._nsrs = {}
2568
2569 @property
2570 def vnfr_handler(self):
2571 """ VNFR dts handler """
2572 return self._vnfr_handler
2573
2574 @property
2575 def vcs_handler(self):
2576 """ VCS dts handler """
2577 return self._vcs_handler
2578
2579 @asyncio.coroutine
2580 def register(self):
2581 """ Register all static DTS handlers """
2582 for hdl in self._dts_handlers:
2583 yield from hdl.register()
2584
2585 @asyncio.coroutine
2586 def run(self):
2587 """ Run this VNFM instance """
2588 self._log.debug("Run VNFManager - registering static DTS handlers""")
2589 yield from self.register()
2590
2591 def handle_nsr(self, nsr, action):
2592 if action in [rwdts.QueryAction.CREATE]:
2593 self._nsrs[nsr.id] = nsr
2594 elif action == rwdts.QueryAction.DELETE:
2595 if nsr.id in self._nsrs:
2596 del self._nsrs[nsr.id]
2597
2598 def get_linked_mgmt_network(self, vnfr):
2599 """For the given VNFR get the related mgmt network from the NSD, if
2600 available.
2601 """
2602 vnfd_id = vnfr.vnfd.id
2603 nsr_id = vnfr.nsr_id_ref
2604
2605 # for the given related VNFR, get the corresponding NSR-config
2606 nsr_obj = None
2607 try:
2608 nsr_obj = self._nsrs[nsr_id]
2609 except KeyError:
2610 raise("Unable to find the NS with the ID: {}".format(nsr_id))
2611
2612 # for the related NSD check if a VLD exists such that it's a mgmt
2613 # network
2614 for vld in nsr_obj.nsd.vld:
2615 if vld.mgmt_network:
2616 return vld.name
2617
2618 return None
2619
2620 def get_vnfr(self, vnfr_id):
2621 """ get VNFR by vnfr id """
2622
2623 if vnfr_id not in self._vnfrs:
2624 raise VnfRecordError("VNFR id %s not found", vnfr_id)
2625
2626 return self._vnfrs[vnfr_id]
2627
2628 def create_vnfr(self, vnfr):
2629 """ Create a VNFR instance """
2630 if vnfr.id in self._vnfrs:
2631 msg = "Vnfr id %s already exists" % vnfr.id
2632 self._log.error(msg)
2633 raise VnfRecordError(msg)
2634
2635 self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
2636 vnfr.id,
2637 vnfr.vnfd.id)
2638
2639 mgmt_network = self.get_linked_mgmt_network(vnfr)
2640
2641 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2642 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
2643 mgmt_network=mgmt_network
2644 )
2645
2646 #Update ref count
2647 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2648 self._vnfds_to_vnfr[vnfr.vnfd.id] += 1
2649 else:
2650 self._vnfds_to_vnfr[vnfr.vnfd.id] = 1
2651
2652 return self._vnfrs[vnfr.id]
2653
2654 @asyncio.coroutine
2655 def delete_vnfr(self, xact, vnfr):
2656 """ Create a VNFR instance """
2657 if vnfr.vnfr_id in self._vnfrs:
2658 self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
2659 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2660
2661 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2662 if self._vnfds_to_vnfr[vnfr.vnfd.id]:
2663 self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1
2664
2665 del self._vnfrs[vnfr.vnfr_id]
2666
2667 @asyncio.coroutine
2668 def fetch_vnfd(self, vnfd_id):
2669 """ Fetch VNFDs based with the vnfd id"""
2670 vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
2671 self._log.debug("Fetch vnfd with path %s", vnfd_path)
2672 vnfd = None
2673
2674 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2675
2676 for ent in res_iter:
2677 res = yield from ent
2678 vnfd = res.result
2679
2680 if vnfd is None:
2681 err = "Failed to get Vnfd %s" % vnfd_id
2682 self._log.error(err)
2683 raise VnfRecordError(err)
2684
2685 self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)
2686
2687 return vnfd
2688
2689 def vnfd_in_use(self, vnfd_id):
2690 """ Is this VNFD in use """
2691 self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
2692 if vnfd_id in self._vnfds_to_vnfr:
2693 return (self._vnfds_to_vnfr[vnfd_id] > 0)
2694 return False
2695
2696 @asyncio.coroutine
2697 def publish_vnfr(self, xact, path, msg):
2698 """ Publish a VNFR """
2699 self._log.debug("publish_vnfr called with path %s, msg %s",
2700 path, msg)
2701 yield from self.vnfr_handler.update(xact, path, msg)
2702
2703 @asyncio.coroutine
2704 def delete_vnfd(self, vnfd_id):
2705 """ Delete the Virtual Network Function descriptor with the passed id """
2706 self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
2707 if vnfd_id in self._vnfds_to_vnfr:
2708 if self._vnfds_to_vnfr[vnfd_id]:
2709 self._log.debug("Cannot delete VNFD id %s reference exists %s",
2710 vnfd_id,
2711 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2712 raise VirtualNetworkFunctionDescriptorRefCountExists(
2713 "Cannot delete :%s, ref_count:%s",
2714 vnfd_id,
2715 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2716
2717 del self._vnfds_to_vnfr[vnfd_id]
2718
2719 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2720 try:
2721 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2722 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2723 if os.path.exists(vnfd_dir):
2724 shutil.rmtree(vnfd_dir, ignore_errors=True)
2725 except Exception as e:
2726 self._log.error("Exception in cleaning up VNFD {}: {}".
2727 format(self._vnfds_to_vnfr[vnfd_id].vnfd.name, e))
2728 self._log.exception(e)
2729
2730
2731 def vnfd_refcount_xpath(self, vnfd_id):
2732 """ xpath for ref count entry """
2733 return (VnfdRefCountDtsHandler.XPATH +
2734 "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)
2735
2736 @asyncio.coroutine
2737 def get_vnfd_refcount(self, vnfd_id):
2738 """ Get the vnfd_list from this VNFM"""
2739 vnfd_list = []
2740 if vnfd_id is None or vnfd_id == "":
2741 for vnfd in self._vnfds_to_vnfr.keys():
2742 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2743 vnfd_msg.vnfd_id_ref = vnfd
2744 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd]
2745 vnfd_list.append((self.vnfd_refcount_xpath(vnfd), vnfd_msg))
2746 elif vnfd_id in self._vnfds_to_vnfr:
2747 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2748 vnfd_msg.vnfd_id_ref = vnfd_id
2749 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd_id]
2750 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2751
2752 return vnfd_list
2753
2754
2755 class VnfmTasklet(rift.tasklets.Tasklet):
2756 """ VNF Manager tasklet class """
2757 def __init__(self, *args, **kwargs):
2758 super(VnfmTasklet, self).__init__(*args, **kwargs)
2759 self.rwlog.set_category("rw-mano-log")
2760 self.rwlog.set_subcategory("vnfm")
2761
2762 self._dts = None
2763 self._vnfm = None
2764
2765 def start(self):
2766 try:
2767 super(VnfmTasklet, self).start()
2768 self.log.info("Starting VnfmTasklet")
2769
2770 self.log.setLevel(logging.DEBUG)
2771
2772 self.log.debug("Registering with dts")
2773 self._dts = rift.tasklets.DTS(self.tasklet_info,
2774 RwVnfmYang.get_schema(),
2775 self.loop,
2776 self.on_dts_state_change)
2777
2778 self.log.debug("Created DTS Api GI Object: %s", self._dts)
2779 except Exception:
2780 print("Caught Exception in VNFM start:", sys.exc_info()[0])
2781 raise
2782
2783 def on_instance_started(self):
2784 """ Task insance started callback """
2785 self.log.debug("Got instance started callback")
2786
2787 def stop(self):
2788 try:
2789 self._dts.deinit()
2790 except Exception:
2791 print("Caught Exception in VNFM stop:", sys.exc_info()[0])
2792 raise
2793
2794 @asyncio.coroutine
2795 def init(self):
2796 """ Task init callback """
2797 try:
2798 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2799 assert vm_parent_name is not None
2800 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2801 yield from self._vnfm.run()
2802 except Exception:
2803 print("Caught Exception in VNFM init:", sys.exc_info()[0])
2804 raise
2805
2806 @asyncio.coroutine
2807 def run(self):
2808 """ Task run callback """
2809 pass
2810
2811 @asyncio.coroutine
2812 def on_dts_state_change(self, state):
2813 """Take action according to current dts state to transition
2814 application into the corresponding application state
2815
2816 Arguments
2817 state - current dts state
2818 """
2819 switch = {
2820 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2821 rwdts.State.CONFIG: rwdts.State.RUN,
2822 }
2823
2824 handlers = {
2825 rwdts.State.INIT: self.init,
2826 rwdts.State.RUN: self.run,
2827 }
2828
2829 # Transition application to next state
2830 handler = handlers.get(state, None)
2831 if handler is not None:
2832 yield from handler()
2833
2834 # Transition dts to next state
2835 next_state = switch.get(state, None)
2836 if next_state is not None:
2837 self._dts.handle.set_state(next_state)