1 # Copyright 2016 RIFT.IO Inc
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
29 gi
.require_version('RwDts', '1.0')
30 gi
.require_version('RwVnfrYang', '1.0')
31 gi
.require_version('VnfrYang', '1.0')
32 gi
.require_version('RwVnfmYang', '1.0')
33 gi
.require_version('RwVnfdYang', '1.0')
34 gi
.require_version('RwVlrYang', '1.0')
35 gi
.require_version('RwManifestYang', '1.0')
36 gi
.require_version('RwBaseYang', '1.0')
37 gi
.require_version('RwResourceMgrYang', '1.0')
39 from gi
.repository
import (
53 gi
.require_version('RwKeyspec', '1.0')
54 from gi
.repository
.RwKeyspec
import quoted_key
57 import rift
.package
.store
58 import rift
.package
.cloud_init
59 import rift
.package
.script
60 import rift
.mano
.dts
as mano_dts
61 from rift
.mano
.utils
.project
import (
65 import rift
.mano
.utils
.short_name
as mano_short_name
66 from . import subscriber
68 VCP_FIELDS
= ['name', 'id', 'connection_point_id', 'type_yang', 'ip_address', 'mac_address']
70 class VMResourceError(Exception):
71 """ VM resource Error"""
75 class VnfRecordError(Exception):
76 """ VNF record instatiation failed"""
80 class VduRecordError(Exception):
81 """ VDU record instatiation failed"""
85 class NotImplemented(Exception):
86 """Not implemented """
90 class VnfrRecordExistsError(Exception):
91 """VNFR record already exist with the same VNFR id"""
95 class InternalVirtualLinkRecordError(Exception):
96 """Internal virtual link record error"""
100 class VDUImageNotFound(Exception):
101 """VDU Image not found error"""
105 class VirtualDeploymentUnitRecordError(Exception):
106 """VDU Instantiation failed"""
110 class VMNotReadyError(Exception):
111 """ VM Not yet received from resource manager """
115 class VDURecordNotFound(Exception):
116 """ Could not find a VDU record """
120 class VirtualNetworkFunctionRecordDescNotFound(Exception):
121 """ Cannot find Virtual Network Function Record Descriptor """
125 class VirtualNetworkFunctionDescriptorError(Exception):
126 """ Virtual Network Function Record Descriptor Error """
130 class VirtualNetworkFunctionDescriptorNotFound(Exception):
131 """ Virtual Network Function Record Descriptor Not Found """
135 class VirtualNetworkFunctionRecordNotFound(Exception):
136 """ Virtual Network Function Record Not Found """
140 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
141 """ Virtual Network Funtion Descriptor reference count exists """
145 class VnfrInstantiationFailed(Exception):
146 """ Virtual Network Funtion Instantiation failed"""
150 class VNFMPlacementGroupError(Exception):
151 """ VNF placement group Error """
155 class VlrError(Exception):
156 """ Virtual Link Record Error """
160 class VirtualNetworkFunctionRecordState(enum
.Enum
):
168 VL_TERMINATE_PHASE
= 6
169 VDU_TERMINATE_PHASE
= 7
174 class VDURecordState(enum
.Enum
):
175 """VDU record state """
178 RESOURCE_ALLOC_PENDING
= 3
184 class VirtualDeploymentUnitRecord(object):
185 """ Virtual Deployment Unit Record """
199 placement_groups
=[]):
203 self
._project
= project
206 self
._nsr
_config
= nsr_config
207 self
._mgmt
_intf
= mgmt_intf
208 self
._datacenter
_name
= datacenter_name
209 self
._vnfd
_package
_store
= vnfd_package_store
210 self
._mgmt
_network
= mgmt_network
212 self
._vdur
_id
= vdur_id
or str(uuid
.uuid4())
215 self
._state
= VDURecordState
.INIT
216 self
._state
_failed
_reason
= None
217 self
._request
_id
= str(uuid
.uuid4())
218 self
._name
= vnfr
.name
+ "__" + vdud
.id
219 self
._placement
_groups
= placement_groups
222 self
._vdud
_cloud
_init
= None
223 self
._vdur
_console
_handler
= VnfrConsoleOperdataDtsHandler(
224 dts
, log
, loop
, self
._vnfr
._vnfm
, self
._vnfr
.vnfr_id
, self
._vdur
_id
,self
.vdu_id
)
228 def vdu_opdata_register(self
):
229 yield from self
._vdur
_console
_handler
.register()
231 def vm_cp_info(self
, cp_name
):
232 """ Find the VM Connection info by connection point name """
233 if self
._vm
_resp
is not None:
234 for conn_point
in self
._vm
_resp
.connection_points
:
235 if conn_point
.name
== cp_name
:
239 def cp_ip_addr(self
, cp_name
):
240 """ Find ip address by connection point name """
241 vm_cp_info
= self
.vm_cp_info(cp_name
)
243 return vm_cp_info
.ip_address
247 def cp_mac_addr(self
, cp_name
):
248 """ Find mac address by connection point name """
249 vm_cp_info
= self
.vm_cp_info(cp_name
)
251 return vm_cp_info
.mac_addr
253 return "00:00:00:00:00:00"
255 def cp_id(self
, cp_name
):
256 """ Find connection point id by connection point name """
257 vm_cp_info
= self
.vm_cp_info(cp_name
)
259 return vm_cp_info
.connection_point_id
274 """ Return this VDUR's name """
277 # Truncated name confirming to RFC 1123
279 def unique_short_name(self
):
280 """ Return this VDUR's unique short name """
281 # Impose these restrictions on Unique name
283 # - Max trailing 10 chars of NSR name (remove all specialcharacters, only numbers and alphabets)
284 # - 9 chars of shortened name
285 # - Max trailing 10 of VDU name (remove all specialcharacters, only numbers and alphabets)
287 def _restrict_tag(input_str
):
288 # Exclude all characters except a-zA-Z0-9
289 outstr
= re
.sub('[^a-zA-Z0-9]', '', input_str
)
290 # Take max of 10 chars
293 # Use NSR name for part1
294 part1
= _restrict_tag(self
._nsr
_config
.name
)
295 # Get unique short string (6 chars)
296 part2
= mano_short_name
.StringShortner(self
._name
)
297 # Use VDU ID for part3
298 part3
= _restrict_tag(self
._vdud
.id)
299 shortstr
= part1
+ "-" + part2
.short_string
+ "-" + part3
303 def datacenter_name(self
):
304 """ Cloud account this VDU should be created in """
305 return self
._datacenter
_name
308 def image_name(self
):
309 """ name that should be used to lookup the image on the CMP """
310 if 'image' not in self
._vdud
:
312 return os
.path
.basename(self
._vdud
.image
)
315 def image_checksum(self
):
316 """ name that should be used to lookup the image on the CMP """
317 return self
._vdud
.image_checksum
if self
._vdud
.has_field("image_checksum") else None
320 def management_ip(self
):
323 return self
._vm
_resp
.public_ip
if self
._vm
_resp
.has_field('public_ip') else self
._vm
_resp
.management_ip
326 def vm_management_ip(self
):
329 return self
._vm
_resp
.management_ip
332 def operational_status(self
):
333 """ Operational status of this VDU"""
334 op_stats_dict
= {"INIT": "init",
335 "INSTANTIATING": "vm_init_phase",
336 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
339 "TERMINATING": "terminated",
340 "TERMINATED": "terminated",
342 return op_stats_dict
[self
._state
.name
]
346 """ Process VDU message from resmgr"""
347 vdu_fields
= ["vm_flavor",
355 vdu_copy_dict
= {k
: v
for k
, v
in
356 self
._vdud
.as_dict().items() if k
in vdu_fields
}
357 vdur_dict
= {"id": self
._vdur
_id
,
358 "vdu_id_ref": self
._vdud
.id,
359 "operational_status": self
.operational_status
,
360 "operational_status_details": self
._state
_failed
_reason
,
362 "unique_short_name": self
.unique_short_name
366 if self
.vm_resp
is not None:
367 vdur_dict
.update({"vim_id": self
.vm_resp
.vdu_id
,
368 "flavor_id": self
.vm_resp
.flavor_id
370 if self
._vm
_resp
.has_field('image_id'):
371 vdur_dict
.update({ "image_id": self
.vm_resp
.image_id
})
373 if self
.management_ip
:
374 vdur_dict
["management_ip"] = self
.management_ip
376 if self
.vm_management_ip
:
377 vdur_dict
["vm_management_ip"] = self
.vm_management_ip
379 vdur_dict
.update(vdu_copy_dict
)
382 if self
.vm_resp
is not None:
383 if self
._vm
_resp
.has_field('volumes'):
384 for opvolume
in self
._vm
_resp
.volumes
:
385 vdurvol_data
= [vduvol
for vduvol
in vdur_dict
['volumes'] if vduvol
['name'] == opvolume
.name
]
386 if len(vdurvol_data
) == 1:
387 vdurvol_data
[0]["volume_id"] = opvolume
.volume_id
388 if opvolume
.has_field('custom_meta_data'):
389 metadata_list
= list()
390 for metadata_item
in opvolume
.custom_meta_data
:
391 metadata_list
.append(metadata_item
.as_dict())
392 vdurvol_data
[0]['custom_meta_data'] = metadata_list
394 if self
._vm
_resp
.has_field('supplemental_boot_data'):
395 vdur_dict
['supplemental_boot_data'] = dict()
396 if self
._vm
_resp
.supplemental_boot_data
.has_field('boot_data_drive'):
397 vdur_dict
['supplemental_boot_data']['boot_data_drive'] = self
._vm
_resp
.supplemental_boot_data
.boot_data_drive
398 if self
._vm
_resp
.supplemental_boot_data
.has_field('custom_meta_data'):
399 metadata_list
= list()
401 # supplemental_boot_data below is returned by Openstack.
402 # The self._vm_resp version of supplemental data is defaulting to CLOUD_METADATA
403 # as Openstack does not repond with 'destination' attribute of custom meta data elements.
404 # Therefore the vdur when published does not specify the destination of the custom-meta-data.
405 # Should we add this field (destination) explicitly here by comparig the keys with the already obtained
406 # details in self._vdud ?
408 for metadata_item
in self
._vm
_resp
.supplemental_boot_data
.custom_meta_data
:
409 metadata_list
.append(metadata_item
.as_dict())
410 vdur_dict
['supplemental_boot_data']['custom_meta_data'] = metadata_list
412 if self
._vm
_resp
.supplemental_boot_data
.has_field('config_file'):
414 for file_item
in self
._vm
_resp
.supplemental_boot_data
.config_file
:
415 file_list
.append(file_item
.as_dict())
416 vdur_dict
['supplemental_boot_data']['config_file'] = file_list
421 for intf
, cp_id
, vlr
in self
._int
_intf
:
422 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
424 cp_info
= dict(name
=cp
.name
,
427 ip_address
=self
.cp_ip_addr(cp
.name
),
428 mac_address
=self
.cp_mac_addr(cp
.name
),
429 connection_point_id
=self
.cp_id(cp
.name
))
431 virtual_cps
= [ vcp
for vcp
in vlr
._vlr
.virtual_connection_points
432 if [ True for cp_ref
in vcp
.associated_cps
if cp
.name
== cp_ref
]]
435 for vcp
in virtual_cps
:
436 cp_info
['virtual_cps'] = [ {k
:v
for k
,v
in vcp
.as_dict().items() if k
in VCP_FIELDS
}
437 for vcp
in virtual_cps
]
439 icp_list
.append(cp_info
)
441 ii_dict
= {"name": intf
.name
,
442 "internal_connection_point_ref": cp
.id,
443 "virtual_interface": {}}
445 if "position" in intf
.as_dict():
446 ii_dict
["position"] = intf
.position
448 ii_list
.append(ii_dict
)
450 vdur_dict
["internal_connection_point"] = icp_list
451 self
._log
.debug("internal_connection_point:%s", vdur_dict
["internal_connection_point"])
455 for intf
, cp
, vlr
in self
._ext
_intf
:
456 ei_dict
= {"name": intf
.name
,
457 "external_connection_point_ref": cp
.name
,
458 "virtual_interface": {}}
459 if "position" in intf
.as_dict():
460 ei_dict
["position"] = intf
.position
462 ei_list
.append(ei_dict
)
464 virtual_cps
= [ vcp
for vcp
in vlr
.virtual_connection_points
465 if [ True for cp_ref
in vcp
.associated_cps
if cp
.name
== cp_ref
]]
468 for vcp
in virtual_cps
:
469 virtual_cp_info
= [ {k
:v
for k
,v
in vcp
.as_dict().items() if k
in VCP_FIELDS
}
470 for vcp
in virtual_cps
]
474 self
._vnfr
.update_cp(cp
.name
,
475 self
.cp_ip_addr(cp
.name
),
476 self
.cp_mac_addr(cp
.name
),
480 vdur_dict
["interface"] = ei_list
+ ii_list
483 vdur_dict
['placement_groups_info'] = [group
.as_dict()
484 for group
in self
._placement
_groups
]
486 return RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur
.from_dict(vdur_dict
)
489 def resmgr_path(self
):
490 """ path for resource-mgr"""
491 xpath
= self
._project
.add_project("D,/rw-resource-mgr:resource-mgmt" +
493 "/vdu-event-data[event-id={}]".format(quoted_key(self
._request
_id
)))
497 def vm_flavor_msg(self
):
498 """ VM flavor message """
499 flavor
= self
._vdud
.vm_flavor
.__class
__()
500 flavor
.copy_from(self
._vdud
.vm_flavor
)
505 def vdud_cloud_init(self
):
506 """ Return the cloud-init contents for the VDU """
507 if self
._vdud
_cloud
_init
is None:
508 ci
= self
.cloud_init()
510 # VNFR ssh public key, if available
511 if self
._vnfr
.public_key
:
514 self
._vdud
_cloud
_init
= """{}
517 format(ci
, self
._vnfr
.public_key
)
519 self
._vdud
_cloud
_init
= ci
521 self
._log
.debug("Cloud init: {}".format(self
._vdud
_cloud
_init
))
523 return self
._vdud
_cloud
_init
525 def cloud_init(self
):
526 """ Populate cloud_init with cloud-config script from
527 either the inline contents or from the file provided
529 cloud_init_msg
= None
530 if self
._vdud
.cloud_init
is not None:
531 self
._log
.debug("cloud_init script provided inline %s", self
._vdud
.cloud_init
)
532 cloud_init_msg
= self
._vdud
.cloud_init
533 elif self
._vdud
.cloud_init_file
is not None:
534 # Get cloud-init script contents from the file provided in the cloud_init_file param
535 self
._log
.debug("cloud_init script provided in file %s", self
._vdud
.cloud_init_file
)
536 filename
= self
._vdud
.cloud_init_file
537 self
._vnfd
_package
_store
.refresh()
538 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
539 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
541 cloud_init_msg
= cloud_init_extractor
.read_script(stored_package
, filename
)
542 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
543 self
.instantiation_failed(str(e
))
544 raise VirtualDeploymentUnitRecordError(e
)
546 if not self
._vnfr
._vnfr
_msg
.cloud_config
.key_pair
and not self
._vnfr
._vnfr
_msg
.cloud_config
.user
:
547 self
._log
.debug("VDU Instantiation: cloud-init script not provided")
550 self
._log
.debug("Current cloud init msg is {}".format(cloud_init_msg
))
551 if not self
._vnfr
._vnfr
_msg
.cloud_config
.key_pair
and not self
._vnfr
._vnfr
_msg
.cloud_config
.user
:
552 return cloud_init_msg
557 cloud_init_dict
= yaml
.load(cloud_init_msg
)
558 except Exception as e
:
559 self
._log
.exception(e
)
560 self
._log
.error("Error loading cloud init Yaml file with exception %s", str(e
))
561 return cloud_init_msg
563 self
._log
.debug("Current cloud init dict is {}".format(cloud_init_dict
))
565 for key_pair
in self
._vnfr
._vnfr
_msg
.cloud_config
.key_pair
:
566 if "ssh_authorized_keys" not in cloud_init_dict
:
567 cloud_init_dict
["ssh_authorized_keys"] = list()
568 cloud_init_dict
["ssh_authorized_keys"].append(key_pair
.key
)
571 for user_entry
in self
._vnfr
._vnfr
_msg
.cloud_config
.user
:
572 if "users" not in cloud_init_dict
:
573 cloud_init_dict
["users"] = list()
575 user
["name"] = user_entry
.name
576 user
["gecos"] = user_entry
.user_info
577 user
["sudo"] = "ALL=(ALL) NOPASSWD:ALL"
578 user
["ssh-authorized-keys"] = list()
579 for ssh_key
in user_entry
.key_pair
:
580 user
["ssh-authorized-keys"].append(ssh_key
.key
)
581 cloud_init_dict
["users"].append(user
)
583 cloud_msg
= yaml
.safe_dump(cloud_init_dict
,width
=1000,default_flow_style
=False)
584 cloud_init
= "#cloud-config\n"+cloud_msg
585 self
._log
.debug("Cloud init msg is {}".format(cloud_init
))
588 def process_openstack_placement_group_construct(self
, vm_create_msg_dict
):
590 availability_zones
= []
592 for group
in self
._placement
_groups
:
593 if group
.has_field('host_aggregate'):
594 for aggregate
in group
.host_aggregate
:
595 host_aggregates
.append(aggregate
.as_dict())
596 if group
.has_field('availability_zone'):
597 availability_zones
.append(group
.availability_zone
.as_dict())
598 if group
.has_field('server_group'):
599 server_groups
.append(group
.server_group
.as_dict())
601 if availability_zones
:
602 if len(availability_zones
) > 1:
603 self
._log
.error("Can not launch VDU: %s in multiple availability zones. " +
604 "Requested Zones: %s", self
.name
, availability_zones
)
605 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability" +
606 " zones. Requsted Zones".format(self
.name
, availability_zones
))
608 vm_create_msg_dict
['availability_zone'] = availability_zones
[0]
611 if len(server_groups
) > 1:
612 self
._log
.error("Can not launch VDU: %s in multiple Server Group. " +
613 "Requested Groups: %s", self
.name
, server_groups
)
614 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple " +
615 "Server Groups. Requsted Groups".format(self
.name
, server_groups
))
617 vm_create_msg_dict
['server_group'] = server_groups
[0]
620 vm_create_msg_dict
['host_aggregate'] = host_aggregates
624 def process_placement_groups(self
, vm_create_msg_dict
):
625 """Process the placement_groups and fill resource-mgr request"""
626 if not self
._placement
_groups
:
629 cloud_set
= set([group
.cloud_type
for group
in self
._placement
_groups
])
630 assert len(cloud_set
) == 1
631 cloud_type
= cloud_set
.pop()
633 if cloud_type
== 'openstack':
634 self
.process_openstack_placement_group_construct(vm_create_msg_dict
)
637 self
._log
.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type
)
640 def process_custom_bootdata(self
, vm_create_msg_dict
):
641 """Process the custom boot data"""
642 if 'config_file' not in vm_create_msg_dict
['supplemental_boot_data']:
645 self
._vnfd
_package
_store
.refresh()
646 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
647 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
648 for file_item
in vm_create_msg_dict
['supplemental_boot_data']['config_file']:
649 if 'source' not in file_item
or 'dest' not in file_item
:
651 source
= file_item
['source']
652 # Find source file in scripts dir of VNFD
653 self
._log
.debug("Checking for source config file at %s", source
)
656 source_file_str
= cloud_init_extractor
.read_script(stored_package
, source
)
657 file_item
['source'] = source_file_str
658 except rift
.package
.package
.PackageError
as e
:
659 self
._log
.info("Invalid package with Package descriptor id")
661 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
662 raise VirtualDeploymentUnitRecordError(e
)
663 # Update source file location with file contents
667 def resmgr_msg(self
, config
=None):
668 vdu_fields
= ["vm_flavor",
674 "supplemental_boot_data"]
676 def make_resmgr_cp_args(intf
, cp
, vlr
):
677 cp_info
= dict(name
= cp
.name
,
678 virtual_link_id
= vlr
.network_id
,
679 type_yang
= intf
.virtual_interface
.type_yang
)
681 if vlr
.network_id
is None:
682 raise VlrError("Unresolved virtual link id for vlr id:%s, name:%s",
685 if cp
.has_field('port_security_enabled'):
686 cp_info
["port_security_enabled"] = cp
.port_security_enabled
689 if intf
.static_ip_address
:
690 cp_info
["static_ip_address"] = intf
.static_ip_address
691 except AttributeError as e
:
692 ### This can happen because of model difference between OSM and RIFT. Ignore exception
693 self
._log
.debug(str(e
))
695 if (intf
.virtual_interface
.has_field('vpci') and
696 intf
.virtual_interface
.vpci
is not None):
697 cp_info
["vpci"] = intf
.virtual_interface
.vpci
699 if (vlr
.has_field('ip_profile_params')) and (vlr
.ip_profile_params
.has_field('security_group')):
700 cp_info
['security_group'] = vlr
.ip_profile_params
.security_group
702 if vlr
.has_field('virtual_connection_points'):
703 virtual_cps
= [ vcp
for vcp
in vlr
.virtual_connection_points
704 if [ True for cp_ref
in vcp
.associated_cps
if cp
.name
== cp_ref
]]
706 fields
= ['connection_point_id', 'name', 'ip_address', 'mac_address']
707 cp_info
['virtual_cps'] = [ {k
:v
for k
,v
in vcp
.as_dict().items() if k
in fields
}
708 for vcp
in virtual_cps
]
710 # Adding Port Sequence Information to cp_info
711 intf_dict
= intf
.as_dict()
712 if "position" in intf_dict
:
713 cp_info
["port_order"] = intf
.position
715 self
._log
.debug("CP info {}".format(cp_info
))
718 self
._log
.debug("Creating params based on VDUD: %s", self
._vdud
)
719 vdu_copy_dict
= {k
: v
for k
, v
in self
._vdud
.as_dict().items() if k
in vdu_fields
}
721 vm_create_msg_dict
= {
722 "name": self
.unique_short_name
, # Truncated name confirming to RFC 1123
723 "node_id": self
.name
, # Rift assigned Id
726 if self
.image_name
is not None:
727 vm_create_msg_dict
["image_name"] = self
.image_name
729 if self
.image_checksum
is not None:
730 vm_create_msg_dict
["image_checksum"] = self
.image_checksum
732 vm_create_msg_dict
["allocate_public_address"] = self
._mgmt
_intf
733 if self
._vdud
.has_field('mgmt_vpci'):
734 vm_create_msg_dict
["mgmt_vpci"] = self
._vdud
.mgmt_vpci
736 self
._log
.debug("VDUD: %s", self
._vdud
)
737 if config
is not None:
738 vm_create_msg_dict
['vdu_init'] = {'userdata': config
}
740 if self
._mgmt
_network
:
741 vm_create_msg_dict
['mgmt_network'] = self
._mgmt
_network
744 for intf
, cp
, vlr
in self
._ext
_intf
:
745 cp_list
.append(make_resmgr_cp_args(intf
, cp
, vlr
))
747 for intf
, cp_id
, vlr
in self
._int
_intf
:
748 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
749 cp_list
.append(make_resmgr_cp_args(intf
, cp
, vlr
.msg()))
752 vm_create_msg_dict
["connection_points"] = cp_list
753 vm_create_msg_dict
.update(vdu_copy_dict
)
755 self
.process_placement_groups(vm_create_msg_dict
)
756 if 'supplemental_boot_data' in vm_create_msg_dict
:
757 self
.process_custom_bootdata(vm_create_msg_dict
)
759 msg
= RwResourceMgrYang
.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData()
760 msg
.event_id
= self
._request
_id
761 msg
.cloud_account
= self
.datacenter_name
763 msg
.request_info
.from_dict(vm_create_msg_dict
)
765 for volume
in self
._vdud
.volumes
:
766 v
= msg
.request_info
.volumes
.add()
767 v
.from_dict(volume
.as_dict())
772 def terminate(self
, xact
):
773 """ Delete resource in VIM """
774 if self
._state
!= VDURecordState
.READY
and self
._state
!= VDURecordState
.FAILED
:
775 self
._log
.warning("VDU terminate in not ready state - Ignoring request")
778 self
._state
= VDURecordState
.TERMINATING
779 if self
._vm
_resp
is not None:
781 with self
._dts
.transaction() as new_xact
:
782 yield from self
.delete_resource(new_xact
)
784 self
._log
.exception("Caught exception while deleting VDU %s", self
.vdu_id
)
786 if self
._rm
_regh
is not None:
787 self
._log
.debug("Deregistering resource manager registration handle")
788 self
._rm
_regh
.deregister()
791 if self
._vdur
_console
_handler
is not None:
792 self
._log
.debug("Deregistering vnfr vdur console registration handle")
793 self
._vdur
_console
_handler
._regh
.deregister()
794 self
._vdur
_console
_handler
._regh
= None
796 self
._state
= VDURecordState
.TERMINATED
798 def find_internal_cp_by_cp_id(self
, cp_id
):
799 """ Find the CP corresponding to the connection point id"""
802 self
._log
.debug("find_internal_cp_by_cp_id(%s) called",
805 for int_cp
in self
._vdud
.internal_connection_point
:
806 self
._log
.debug("Checking for int cp %s in internal connection points",
808 if int_cp
.id == cp_id
:
813 self
._log
.debug("Failed to find cp %s in internal connection points",
815 msg
= "Failed to find cp %s in internal connection points" % cp_id
816 raise VduRecordError(msg
)
818 # return the VLR associated with the connection point
822 def create_resource(self
, xact
, vnfr
, config
=None):
823 """ Request resource from ResourceMgr """
824 def find_cp_by_name(cp_name
):
825 """ Find a connection point by name """
827 self
._log
.debug("find_cp_by_name(%s) called", cp_name
)
828 for ext_cp
in vnfr
._cprs
:
829 self
._log
.debug("Checking ext cp (%s) called", ext_cp
.name
)
830 if ext_cp
.name
== cp_name
:
834 self
._log
.debug("Failed to find cp %s in external connection points",
838 def find_internal_vlr_by_cp_id(cp_id
):
839 self
._log
.debug("find_internal_vlr_by_cp_id(%s) called",
843 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
845 # return the VLR associated with the connection point
846 return vnfr
.find_vlr_by_cp(cp_id
)
849 def add_external_interface(interface
):
850 # Add an external interface from vdu interface list
851 cp
= find_cp_by_name(interface
.external_connection_point_ref
)
853 self
._log
.debug("Failed to find connection point - %s",
854 interface
.external_connection_point_ref
)
857 self
._log
.debug("Connection point name [%s], type[%s]",
858 cp
.name
, cp
.type_yang
)
860 vlr
= vnfr
.ext_vlr_by_id(cp
.vlr_ref
)
862 etuple
= (interface
, cp
, vlr
)
863 self
._ext
_intf
.append(etuple
)
865 self
._log
.debug("Created external interface tuple : %s", etuple
)
868 def add_internal_interface(interface
):
869 # Add an internal interface from vdu interface list
870 cp_id
= interface
.internal_connection_point_ref
871 self
._log
.debug("Resolving internal interface name [%s], cp[%s]",
872 interface
.name
, cp_id
)
875 msg
= "The Internal Interface : %s is not mapped to an internal connection point." % (interface
.name
)
877 raise VduRecordError(msg
)
880 vlr
= find_internal_vlr_by_cp_id(cp_id
)
881 iter = yield from self
._dts
.query_read(vlr
.vlr_path())
883 vlr
._vlr
= (yield from itr
).result
884 except Exception as e
:
885 self
._log
.debug("Failed to find cp %s in internal VLR list", cp_id
)
886 msg
= "Failed to find cp %s in internal VLR list, e = %s" % (cp_id
, e
)
887 raise VduRecordError(msg
)
889 ituple
= (interface
, cp_id
, vlr
)
890 self
._int
_intf
.append(ituple
)
892 self
._log
.debug("Created internal interface tuple : %s", ituple
)
895 block
= xact
.block_create()
897 self
._log
.debug("Executing vm request id: %s, action: create",
900 # Resolve the networks associated with interfaces ( both internal and external)
902 for intf
in self
._vdud
.interface
:
903 if intf
.type_yang
== 'EXTERNAL':
904 self
._log
.debug("Resolving external interface name [%s], cp[%s]",
905 intf
.name
, intf
.external_connection_point_ref
)
907 add_external_interface(intf
)
908 except Exception as e
:
909 msg
= "Failed to add external interface %s from vdu interface list, e = %s" % (intf
.name
, e
)
911 raise VduRecordError(msg
)
912 elif intf
.type_yang
== 'INTERNAL':
913 self
._log
.debug("Resolving internal interface name [%s], cp[%s]",
914 intf
.name
, intf
.internal_connection_point_ref
)
916 yield from add_internal_interface(intf
)
917 except Exception as e
:
918 msg
= "Failed to add internal interface %s from vdu interface list, e = %s" % (intf
.name
, e
)
920 raise VduRecordError(msg
)
924 resmgr_path
= self
.resmgr_path
925 resmgr_msg
= self
.resmgr_msg(config
)
927 self
._log
.debug("Creating new VM request at: %s, params: %s", resmgr_path
, resmgr_msg
)
928 block
.add_query_create(resmgr_path
, resmgr_msg
)
930 res_iter
= yield from block
.execute(now
=True)
938 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
939 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
940 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
941 return resp
.resource_info
944 def delete_resource(self
, xact
):
945 block
= xact
.block_create()
947 self
._log
.debug("Executing vm request id: %s, action: delete",
950 block
.add_query_delete(self
.resmgr_path
)
952 yield from block
.execute(flags
=0, now
=True)
955 def read_resource(self
, xact
):
956 block
= xact
.block_create()
958 self
._log
.debug("Executing vm request id: %s, action: delete",
961 block
.add_query_read(self
.resmgr_path
)
963 res_iter
= yield from block
.execute(flags
=0, now
=True)
968 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
969 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
970 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
971 #self._vm_resp = resp.resource_info
972 return resp
.resource_info
976 """ Is this VDU active """
977 return True if self
._state
is VDURecordState
.READY
else False
980 def instantiation_failed(self
, failed_reason
=None):
981 """ VDU instantiation failed """
982 self
._log
.debug("VDU %s instantiation failed ", self
._vdur
_id
)
983 self
._state
= VDURecordState
.FAILED
984 self
._state
_failed
_reason
= failed_reason
985 yield from self
._vnfr
.instantiation_failed(failed_reason
)
988 def vdu_is_active(self
):
989 """ This VDU is active"""
991 self
._log
.warning("VDU %s was already marked as active", self
._vdur
_id
)
994 self
._log
.debug("VDUR id %s in VNFR %s is active", self
._vdur
_id
, self
._vnfr
.vnfr_id
)
996 self
._state
= VDURecordState
.READY
998 if self
._vnfr
.all_vdus_active():
999 self
._log
.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self
._vnfr
)
1000 yield from self
._vnfr
.is_ready()
1003 def instantiate(self
, xact
, vnfr
, config
=None):
1004 """ Instantiate this VDU """
1005 self
._state
= VDURecordState
.INSTANTIATING
1008 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
1009 """ This VDUR is active """
1010 self
._log
.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
1015 if (query_action
== rwdts
.QueryAction
.UPDATE
or
1016 query_action
== rwdts
.QueryAction
.CREATE
):
1019 if msg
.resource_state
== "active":
1020 # Move this VDU to ready state
1021 yield from self
.vdu_is_active()
1022 elif msg
.resource_state
== "failed":
1023 yield from self
.instantiation_failed(msg
.resource_errors
)
1024 elif query_action
== rwdts
.QueryAction
.DELETE
:
1025 self
._log
.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
1027 raise NotImplementedError(
1028 "%s action on VirtualDeployementUnitRecord not supported",
1031 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
1034 #Check if resource orchestrator is not rift so that resource manager tasklet is not invoked
1035 if self
._nsr
_config
.resource_orchestrator
is not None:
1038 reg_event
= asyncio
.Event(loop
=self
._loop
)
1041 def on_ready(regh
, status
):
1044 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
, on_ready
=on_ready
)
1045 self
._rm
_regh
= yield from self
._dts
.register(self
.resmgr_path
+ '/resource-info',
1046 flags
=rwdts
.Flag
.SUBSCRIBER
,
1048 yield from reg_event
.wait()
1050 vm_resp
= yield from self
.create_resource(xact
, vnfr
, config
)
1051 self
._vm
_resp
= vm_resp
1052 self
._state
= VDURecordState
.RESOURCE_ALLOC_PENDING
1054 self
._log
.debug("Requested VM from resource manager response %s",
1056 if vm_resp
.resource_state
== "active":
1057 self
._log
.debug("Resourcemgr responded wih an active vm resp %s",
1059 yield from self
.vdu_is_active()
1060 self
._state
= VDURecordState
.READY
1061 elif (vm_resp
.resource_state
== "pending" or
1062 vm_resp
.resource_state
== "inactive"):
1063 self
._log
.debug("Resourcemgr responded wih a pending vm resp %s",
1065 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
1066 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
1067 # flags=rwdts.Flag.SUBSCRIBER,
1070 self
._log
.debug("Resourcemgr responded wih an error vm resp %s",
1072 raise VirtualDeploymentUnitRecordError(
1073 "Failed VDUR instantiation %s " % vm_resp
)
1075 except Exception as e
:
1077 traceback
.print_exc()
1078 self
._log
.exception(e
)
1079 self
._log
.error("Instantiation of VDU record failed: %s", str(e
))
1080 self
._state
= VDURecordState
.FAILED
1081 yield from self
.instantiation_failed(str(e
))
1084 class VlRecordState(enum
.Enum
):
1085 """ VL Record State """
1087 INSTANTIATION_PENDING
= 102
1089 TERMINATE_PENDING
= 104
1094 class InternalVirtualLinkRecord(object):
1095 """ Internal Virtual Link record """
1096 def __init__(self
, dts
, log
, loop
, project
, vnfm
,
1097 ivld_msg
, vnfr_name
, datacenter_name
, ip_profile
=None):
1101 self
._project
= project
1103 self
._ivld
_msg
= ivld_msg
1104 self
._vnfr
_name
= vnfr_name
1105 self
._datacenter
_name
= datacenter_name
1106 self
._ip
_profile
= ip_profile
1108 self
._vlr
_req
= self
.create_vlr()
1110 self
._network
_id
= None
1111 self
._state
= VlRecordState
.INIT
1112 self
._state
_details
= ""
1116 """ Find VLR by id """
1117 return self
._vlr
_req
.id
1121 """ Name of this VL """
1122 if self
._ivld
_msg
.vim_network_name
:
1123 return self
._ivld
_msg
.vim_network_name
1125 return self
._vnfr
_name
+ "." + self
._ivld
_msg
.name
1128 def network_id(self
):
1129 """ Find VLR by id """
1130 return self
._network
_id
1133 def network_id(self
, network_id
):
1134 """ network id setter"""
1135 self
._network
_id
= network_id
1140 return self
._state
== VlRecordState
.ACTIVE
1144 """ state for this VLR """
1148 def state_details(self
):
1149 """ state details for this VLR """
1150 return self
._state
_details
1153 """ VLR path for this VLR instance"""
1154 return self
._project
.add_project("D,/vlr:vlr-catalog/vlr:vlr[vlr:id={}]".
1155 format(quoted_key(self
.vlr_id
)))
1157 def create_vlr(self
):
1158 """ Create the VLR record which will be instantiated """
1160 vld_fields
= ["short_name",
1168 vld_copy_dict
= {k
: v
for k
, v
in self
._ivld
_msg
.as_dict().items() if k
in vld_fields
}
1170 vlr_dict
= {"id": str(uuid
.uuid4()),
1172 "datacenter": self
._datacenter
_name
,
1175 if self
._ip
_profile
and self
._ip
_profile
.has_field('ip_profile_params'):
1176 vlr_dict
['ip_profile_params' ] = self
._ip
_profile
.ip_profile_params
.as_dict()
1178 vlr_dict
.update(vld_copy_dict
)
1180 vlr
= RwVlrYang
.YangData_RwProject_Project_VlrCatalog_Vlr
.from_dict(vlr_dict
)
1182 if self
._ivld
_msg
.has_field('virtual_connection_points'):
1183 for cp
in self
._ivld
_msg
.virtual_connection_points
:
1184 vcp
= vlr
.virtual_connection_points
.add()
1185 vcp
.from_dict(cp
.as_dict())
1190 def instantiate(self
, xact
, restart_mode
=False):
1191 """ Instantiate VL """
1194 def instantiate_vlr():
1195 """ Instantiate VLR"""
1196 self
._log
.debug("Create VL with xpath %s and vlr %s",
1197 self
.vlr_path(), self
._vlr
_req
)
1200 with self
._dts
.transaction(flags
=0) as xact
:
1201 block
= xact
.block_create()
1202 block
.add_query_create(xpath
=self
.vlr_path(), msg
=self
._vlr
_req
)
1203 self
._log
.debug("Executing VL create path:%s msg:%s",
1204 self
.vlr_path(), self
._vlr
_req
)
1206 self
._state
= VlRecordState
.INSTANTIATION_PENDING
1207 self
._state
_details
= "Oustanding VL create request:%s".format(self
.vlr_path())
1210 res_iter
= yield from block
.execute()
1211 except Exception as e
:
1212 self
._state
= VlRecordState
.FAILED
1213 self
._state
_details
= str(e
)
1214 self
._log
.exception("Caught exception while instantial VL")
1217 for ent
in res_iter
:
1218 res
= yield from ent
1219 self
._vlr
= res
.result
1221 if self
._vlr
.operational_status
== 'failed':
1222 self
._log
.debug("VL creation failed for vlr id %s", self
._vlr
.id)
1223 self
._state
= VlRecordState
.FAILED
1224 self
._state
_details
= self
._vlr
.operational_status_details
1225 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self
._vlr
.id))
1227 except Exception as e
:
1228 self
._log
.error("Caught exception while instantiating VL:%s:%s, e:%s",
1229 self
.vlr_id
, self
._vlr
.name
, e
)
1230 self
._state
_details
= str(e
)
1233 self
._log
.info("Created VL with xpath %s and vlr %s",
1234 self
.vlr_path(), self
._vlr
)
1238 """ Get the network id """
1239 res_iter
= yield from self
._dts
.query_read(self
.vlr_path(), rwdts
.XactFlag
.MERGE
)
1241 for ent
in res_iter
:
1242 res
= yield from ent
1246 err
= "Failed to get VLR for path %s" % self
.vlr_path()
1248 raise InternalVirtualLinkRecordError(err
)
1251 self
._state
= VlRecordState
.INSTANTIATION_PENDING
1254 vl
= yield from get_vlr()
1256 yield from instantiate_vlr()
1258 yield from instantiate_vlr()
1261 def vlr_in_vns(self
):
1262 """ Is there a VLR record in VNS """
1263 if (self
._state
== VlRecordState
.ACTIVE
or
1264 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
1265 self
._state
== VlRecordState
.FAILED
):
1271 def terminate(self
, xact
):
1272 """Terminate this VL """
1273 if not self
.vlr_in_vns():
1274 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1275 self
.vlr_id
, self
._state
)
1278 self
._log
.debug("Terminating VL with path %s", self
.vlr_path())
1279 self
._state
= VlRecordState
.TERMINATE_PENDING
1280 self
._state
_details
= "VL Terminate pending"
1281 block
= xact
.block_create()
1282 block
.add_query_delete(self
.vlr_path())
1283 yield from block
.execute(flags
=0, now
=True)
1284 self
._state
= VlRecordState
.TERMINATED
1285 self
._state
_details
= "VL Terminated"
1286 self
._log
.debug("Terminated VL with path %s", self
.vlr_path())
1288 def set_state_from_op_status(self
, operational_status
, operational_status_details
):
1289 """ Set the state of this VL based on operational_status"""
1291 self
._state
_details
= operational_status_details
1293 if operational_status
== 'running':
1294 self
._log
.info("VL %s moved to active state", self
.vlr_id
)
1295 self
._state
= VlRecordState
.ACTIVE
1296 elif operational_status
== 'failed':
1297 self
._log
.info("VL %s moved to failed state", self
.vlr_id
)
1298 self
._state
= VlRecordState
.FAILED
1299 elif operational_status
== 'vl_alloc_pending':
1300 self
._log
.debug("VL %s is in alloc pending state", self
.vlr_id
)
1301 self
._state
= VlRecordState
.INSTANTIATION_PENDING
1303 raise VirtualLinkRecordError("Unknown operational_status %s" % (operational_status
))
1306 """ Get a proto corresponding to this VLR """
1311 class VirtualNetworkFunctionRecord(object):
1312 """ Virtual Network Function Record """
1313 def __init__(self
, dts
, log
, loop
, cluster_name
, vnfm
, vnfr_msg
,
1314 mgmt_network
=None, external_ro
=False):
1317 self
._loop
= loop
###
1318 self
._project
= vnfm
._project
1319 self
._cluster
_name
= cluster_name
1320 self
._vnfr
_msg
= vnfr_msg
1321 self
._vnfr
_id
= vnfr_msg
.id
1322 self
._vnfd
_id
= vnfr_msg
.vnfd
.id
1324 self
._vnfr
= vnfr_msg
1325 self
._mgmt
_network
= mgmt_network
1327 self
._vnfd
= vnfr_msg
.vnfd
1328 self
._state
= VirtualNetworkFunctionRecordState
.INIT
1329 self
._state
_failed
_reason
= None
1330 self
._ext
_vlrs
= {} # The list of external virtual links
1331 self
._vlrs
= {} # The list of internal virtual links
1332 self
._vdus
= [] # The list of vdu
1333 self
._vlr
_by
_cp
= {}
1335 self
._inventory
= {}
1336 self
._create
_time
= int(time
.time())
1337 self
._vnf
_mon
= None
1338 self
._config
_status
= vnfr_msg
.config_status
1339 self
._vnfd
_package
_store
= rift
.package
.store
.VnfdPackageFilesystemStore(self
._log
, project
=self
._project
.name
)
1340 self
._rw
_vnfd
= None
1341 self
._vnfd
_ref
_count
= 0
1343 self
._ssh
_pub
_key
= None
1344 self
._ssh
_key
_file
= None
1346 # Create an asyncio loop to know when the virtual links are ready
1347 self
._vls
_ready
= asyncio
.Event(loop
=self
._loop
)
1349 # Counter for pre-init VNFR State Update DTS Query
1351 self
._external
_ro
= external_ro
1353 def _get_vdur_from_vdu_id(self
, vdu_id
):
1354 self
._log
.debug("Finding vdur for vdu_id %s", vdu_id
)
1355 self
._log
.debug("Searching through vdus: %s", self
._vdus
)
1356 for vdu
in self
._vdus
:
1357 self
._log
.debug("vdu_id: %s", vdu
.vdu_id
)
1358 if vdu
.vdu_id
== vdu_id
:
1361 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id
)
1364 def operational_status(self
):
1365 """ Operational status of this VNFR """
1366 op_status_map
= {"PRE_INIT": "pre_init",
1368 "VL_INIT_PHASE": "vl_init_phase",
1369 "VM_INIT_PHASE": "vm_init_phase",
1371 "TERMINATE": "terminate",
1372 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1373 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1374 "TERMINATED": "terminated",
1375 "FAILED": "failed", }
1376 return op_status_map
[self
._state
.name
]
1379 def vnfd_xpath(vnfd_id
):
1380 """ VNFD xpath associated with this VNFR """
1381 return ("C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd[project-vnfd:id={}]".
1382 format(quoted_key(vnfd_id
)))
1385 def external_ro(self
):
1386 return self
._external
_ro
1393 def task(self
, task
):
1397 def vnfd_ref_count(self
):
1398 """ Returns the VNFD reference count associated with this VNFR """
1399 return self
._vnfd
_ref
_count
1401 def vnfd_in_use(self
):
1402 """ Returns whether vnfd is in use or not """
1403 return True if self
._vnfd
_ref
_count
> 0 else False
1406 """ Take a reference on this object """
1407 self
._vnfd
_ref
_count
+= 1
1408 return self
._vnfd
_ref
_count
1410 def vnfd_unref(self
):
1411 """ Release reference on this object """
1412 if self
._vnfd
_ref
_count
< 1:
1413 msg
= ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
1414 (self
.vnfd
.id, self
._vnfd
_ref
_count
))
1415 self
._log
.critical(msg
)
1416 raise VnfRecordError(msg
)
1417 self
._log
.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
1418 self
.vnfd
.id, self
._vnfd
_ref
_count
)
1419 self
._vnfd
_ref
_count
-= 1
1420 return self
._vnfd
_ref
_count
1424 """ VNFD for this VNFR """
1429 """ VNFD name associated with this VNFR """
1430 return self
.vnfd
.name
1434 """ Name of this VNF in the record """
1435 return self
._vnfr
.name
1438 def datacenter_name(self
):
1439 """ Name of the cloud account this VNFR is instantiated in """
1440 return self
._vnfr
.datacenter
1444 """ VNFD Id associated with this VNFR """
1449 """ VNFR Id associated with this VNFR """
1450 return self
._vnfr
_id
1453 def member_vnf_index(self
):
1454 """ Member VNF index associated with this VNFR """
1455 return self
._vnfr
.member_vnf_index_ref
1458 def config_status(self
):
1459 """ Config agent status for this VNFR """
1460 return self
._config
_status
1463 def public_key(self
):
1464 return self
._ssh
_pub
_key
1467 def get_nsr_config(self
):
1468 ### Need access to NS instance configuration for runtime resolution.
1469 ### This shall be replaced when deployment flavors are implemented
1470 xpath
= self
._project
.add_project("C,/nsr:ns-instance-config")
1471 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
1473 for result
in results
:
1474 entry
= yield from result
1475 ns_instance_config
= entry
.result
1476 for nsr
in ns_instance_config
.nsr
:
1477 if nsr
.id == self
._vnfr
_msg
.nsr_id_ref
:
1482 def get_nsr_opdata(self
):
1483 """ NSR opdata associated with this VNFR """
1484 xpath
= self
._project
.add_project(
1485 "D,/nsr:ns-instance-opdata/nsr:nsr" \
1486 "[nsr:ns-instance-config-ref={}]". \
1487 format(quoted_key(self
._vnfr
_msg
.nsr_id_ref
)))
1489 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
1491 for result
in results
:
1492 entry
= yield from result
1493 nsr_op
= entry
.result
1499 def cp_ip_addr(self
, cp_name
):
1500 """ Get ip address for connection point """
1501 self
._log
.debug("cp_ip_addr()")
1502 for cp
in self
._cprs
:
1503 if cp
.name
== cp_name
and cp
.ip_address
is not None:
1504 return cp
.ip_address
1507 def mgmt_intf_info(self
):
1508 """ Get Management interface info for this VNFR """
1509 mgmt_intf_desc
= self
.vnfd
.mgmt_interface
1511 if mgmt_intf_desc
.has_field("cp"):
1512 ip_addr
= self
.cp_ip_addr(mgmt_intf_desc
.cp
)
1513 elif mgmt_intf_desc
.has_field("vdu_id"):
1515 vdur
= self
._get
_vdur
_from
_vdu
_id
(mgmt_intf_desc
.vdu_id
)
1516 ip_addr
= vdur
.management_ip
1517 except VDURecordNotFound
:
1518 self
._log
.debug("Did not find mgmt interface for vnfr id %s", self
._vnfr
_id
)
1521 ip_addr
= mgmt_intf_desc
.ip_address
1522 port
= mgmt_intf_desc
.port
1524 return ip_addr
, port
1528 """ Message associated with this VNFR """
1529 vnfd_fields
= ["short_name", "vendor", "description", "version"]
1530 vnfd_copy_dict
= {k
: v
for k
, v
in self
.vnfd
.as_dict().items() if k
in vnfd_fields
}
1532 mgmt_intf
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_MgmtInterface()
1533 ip_address
, port
= self
.mgmt_intf_info()
1536 mgmt_intf
.ip_address
= ip_address
1537 if port
is not None:
1538 mgmt_intf
.port
= port
1540 if self
._ssh
_pub
_key
:
1541 mgmt_intf
.ssh_key
.public_key
= self
._ssh
_pub
_key
1542 mgmt_intf
.ssh_key
.private_key_file
= self
._ssh
_key
_file
1544 vnfr_dict
= {"id": self
._vnfr
_id
,
1545 "nsr_id_ref": self
._vnfr
_msg
.nsr_id_ref
,
1547 "member_vnf_index_ref": self
.member_vnf_index
,
1548 "operational_status": self
.operational_status
,
1549 "operational_status_details": self
._state
_failed
_reason
,
1550 "datacenter": self
.datacenter_name
,
1551 "config_status": self
._config
_status
1554 vnfr_dict
.update(vnfd_copy_dict
)
1556 vnfr_msg
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
1557 vnfr_msg
.vnfd
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd
.from_dict(self
.vnfd
.as_dict())
1559 vnfr_msg
.create_time
= self
._create
_time
1560 vnfr_msg
.uptime
= int(time
.time()) - self
._create
_time
1561 vnfr_msg
.mgmt_interface
= mgmt_intf
1563 # Add all the VLRs to VNFR
1564 for vlr_id
, vlr
in self
._vlrs
.items():
1565 ivlr
= vnfr_msg
.internal_vlr
.add()
1566 ivlr
.vlr_ref
= vlr
.vlr_id
1568 # Add all the VDUs to VDUR
1569 if self
._vdus
is not None:
1570 for vdu
in self
._vdus
:
1571 vdur
= vnfr_msg
.vdur
.add()
1572 vdur
.from_dict(vdu
.msg
.as_dict())
1574 if self
.vnfd
.mgmt_interface
.has_field('dashboard_params'):
1575 vnfr_msg
.dashboard_url
= self
.dashboard_url
1577 for cpr
in self
._cprs
:
1578 new_cp
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr
.as_dict())
1579 vnfr_msg
.connection_point
.append(new_cp
)
1581 if self
._vnf
_mon
is not None:
1582 for monp
in self
._vnf
_mon
.msg
:
1583 vnfr_msg
.monitoring_param
.append(
1584 VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam
.from_dict(monp
.as_dict()))
1586 if self
._vnfr
.vnf_configuration
is not None:
1587 vnfr_msg
.vnf_configuration
.from_dict(self
._vnfr
.vnf_configuration
.as_dict())
1589 for group
in self
._vnfr
_msg
.placement_groups_info
:
1590 group_info
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1591 group_info
.from_dict(group
.as_dict())
1592 vnfr_msg
.placement_groups_info
.append(group_info
)
1597 def update_config(self
, msg
, xact
):
1598 self
._log
.debug("VNFM vnf config: {}".
1599 format(msg
.vnf_configuration
.as_dict()))
1600 self
._config
_status
= msg
.config_status
1601 self
._vnfr
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr
.from_dict(
1603 self
._log
.debug("VNFR msg config: {}".
1604 format(self
._vnfr
.as_dict()))
1606 yield from self
.publish(xact
)
1609 def update_vnfr_after_substitution(self
, msg
, xact
):
1610 self
._log
.debug("Updating VNFR after Input Param Substitution: {}".
1611 format(msg
.as_dict()))
1612 self
._state
= VirtualNetworkFunctionRecordState
.INIT
1613 self
._vnfd
= msg
.vnfd
1614 msg
.operational_status
= 'init'
1615 self
._vnfr
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr
.from_dict(
1618 self
._log
.debug("VNFR updated: {}".
1619 format(self
._vnfr
.as_dict()))
1620 yield from self
.publish(xact
)
1623 def dashboard_url(self
):
1624 ip
, cfg_port
= self
.mgmt_intf_info()
1627 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('https'):
1628 if self
.vnfd
.mgmt_interface
.dashboard_params
.https
is True:
1631 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('port'):
1632 http_port
= self
.vnfd
.mgmt_interface
.dashboard_params
.port
1634 url
= "{protocol}://{ip_address}:{port}/{path}".format(
1638 path
=self
.vnfd
.mgmt_interface
.dashboard_params
.path
.lstrip("/"),
1645 """ path for this VNFR """
1646 return self
._project
.add_project("D,/vnfr:vnfr-catalog"
1647 "/vnfr:vnfr[vnfr:id={}]".format(quoted_key(self
.vnfr_id
)))
1650 def publish(self
, xact
):
1651 """ publish this VNFR """
1653 self
._log
.debug("Publishing VNFR path = [%s], record = [%s]",
1654 self
.xpath
, self
.msg
)
1655 vnfr
.create_time
= self
._create
_time
1656 yield from self
._vnfm
.publish_vnfr(xact
, self
.xpath
, self
.msg
)
1657 self
._log
.debug("Published VNFR path = [%s], record = [%s]",
1658 self
.xpath
, self
.msg
)
1660 def resolve_vld_ip_profile(self
, vnfd_msg
, vld
):
1661 self
._log
.debug("Receieved ip profile ref is %s",vld
.ip_profile_ref
)
1662 if not vld
.has_field('ip_profile_ref'):
1664 profile
= [profile
for profile
in vnfd_msg
.ip_profiles
if profile
.name
== vld
.ip_profile_ref
]
1665 return profile
[0] if profile
else None
1668 def create_vls(self
):
1669 """ Publish The VLs associated with this VNF """
1670 self
._log
.debug("Publishing Internal Virtual Links for vnfd id: %s",
1672 for ivld_msg
in self
.vnfd
.internal_vld
:
1673 self
._log
.debug("Creating internal vld:"
1674 " %s, int_cp_ref = %s",
1675 ivld_msg
, ivld_msg
.internal_connection_point
1677 vlr
= InternalVirtualLinkRecord(dts
=self
._dts
,
1680 project
=self
._project
,
1683 vnfr_name
=self
.name
,
1684 datacenter_name
=self
.datacenter_name
,
1685 ip_profile
=self
.resolve_vld_ip_profile(self
.vnfd
, ivld_msg
)
1687 self
._vlrs
[vlr
.vlr_id
] = vlr
1688 self
._vnfm
.add_vlr_id_vnfr_map(vlr
.vlr_id
, self
)
1690 for int_cp
in ivld_msg
.internal_connection_point
:
1691 if int_cp
.id_ref
in self
._vlr
_by
_cp
:
1692 msg
= ("Connection point %s already "
1693 " bound %s" % (int_cp
.id_ref
, self
._vlr
_by
_cp
[int_cp
.id_ref
]))
1694 raise InternalVirtualLinkRecordError(msg
)
1695 self
._log
.debug("Setting vlr %s to internal cp = %s",
1697 self
._vlr
_by
_cp
[int_cp
.id_ref
] = vlr
1700 def instantiate_vls(self
, xact
, restart_mode
=False):
1701 """ Instantiate the VLs associated with this VNF """
1702 self
._log
.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1705 for vlr_id
, vlr
in self
._vlrs
.items():
1706 self
._log
.debug("Instantiating VLR %s", vlr
)
1707 yield from vlr
.instantiate(xact
, restart_mode
)
1709 # Wait for the VLs to be ready before yielding control out
1711 self
._log
.debug("VNFR id:%s, name:%s - Waiting for %d VLs to be ready",
1712 self
.vnfr_id
, self
.name
, len(self
._vlrs
))
1713 yield from self
._vls
_ready
.wait()
1715 self
._log
.debug("VNFR id:%s, name:%s, No virtual links found",
1716 self
.vnfr_id
, self
.name
)
1717 self
._vls
_ready
.set()
1719 def find_vlr_by_cp(self
, cp_name
):
1720 """ Find the VLR associated with the cp name """
1721 return self
._vlr
_by
_cp
[cp_name
]
1723 def resolve_placement_group_cloud_construct(self
, input_group
, nsr_config
):
1725 Returns the cloud specific construct for placement group
1727 input_group: VNFD PlacementGroup
1728 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1730 copy_dict
= ['name', 'requirement', 'strategy']
1731 for group_info
in nsr_config
.vnfd_placement_group_maps
:
1732 if group_info
.placement_group_ref
== input_group
.name
and \
1733 group_info
.vnfd_id_ref
== self
.vnfd_id
:
1734 group
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1735 group_dict
= {k
:v
for k
,v
in
1736 group_info
.as_dict().items()
1737 if (k
!= 'placement_group_ref' and k
!='vnfd_id_ref')}
1738 for param
in copy_dict
:
1739 group_dict
.update({param
: getattr(input_group
, param
)})
1740 group
.from_dict(group_dict
)
1745 def get_vdu_placement_groups(self
, vdu
, nsr_config
):
1746 placement_groups
= []
1747 ### Step-1: Get VNF level placement groups
1748 for group
in self
._vnfr
_msg
.placement_groups_info
:
1749 #group_info = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1750 #group_info.from_dict(group.as_dict())
1751 placement_groups
.append(group
)
1753 ### Step-2: Get VDU level placement groups
1754 for group
in self
.vnfd
.placement_groups
:
1755 for member_vdu
in group
.member_vdus
:
1756 if member_vdu
.member_vdu_ref
== vdu
.id:
1757 group_info
= self
.resolve_placement_group_cloud_construct(group
,
1759 if group_info
is None:
1760 self
._log
.info("Could not resolve cloud-construct for " +
1761 "placement group: %s", group
.name
)
1763 self
._log
.info("Successfully resolved cloud construct for " +
1764 "placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1768 self
.member_vnf_index
)
1769 placement_groups
.append(group_info
)
1771 return placement_groups
1774 def substitute_vdu_input_parameters(self
, vdu
):
1776 for vdu_vnfr
in self
.vnfd
.vdu
:
1777 if vdu
["id"] == vdu_vnfr
.id:
1778 result
= vdu_vnfr
.as_dict()
1781 return RwVnfdYang
.YangData_Vnfd_VnfdCatalog_Vnfd_Vdu
.from_dict(result
)
1785 def vdu_cloud_init_instantiation(self
):
1786 [vdu
.vdud_cloud_init
for vdu
in self
._vdus
]
1789 def create_vdus(self
, vnfr
, restart_mode
=False):
1790 """ Create the VDUs associated with this VNF """
1792 def get_vdur_id(vdud
):
1793 """Get the corresponding VDUR's id for the VDUD. This is useful in
1796 In restart mode we check for exiting VDUR's ID and use them, if
1797 available. This way we don't end up creating duplicate VDURs
1801 if restart_mode
and vdud
is not None:
1803 vdur
= [vdur
.id for vdur
in vnfr
._vnfr
.vdur
if vdur
.vdu_id_ref
== vdud
.id]
1806 self
._log
.error("Unable to find a VDUR for VDUD {}".format(vdud
))
1811 self
._log
.info("Creating VDU's for vnfd id: %s", self
.vnfd_id
)
1813 # Get NSR config - Needed for placement groups and to derive VDU short-name
1814 nsr_config
= yield from self
.get_nsr_config()
1816 for vdu
in self
._rw
_vnfd
.vdu
:
1817 self
._log
.debug("Creating vdu: %s", vdu
)
1818 vdur_id
= get_vdur_id(vdu
)
1821 placement_groups
= yield from self
.get_vdu_placement_groups(vdu
, nsr_config
)
1822 self
._log
.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s, Existing vdur_id %s",
1825 self
.member_vnf_index
,
1826 [ group
.name
for group
in placement_groups
],
1829 # Update VDU Info from VNFR (It contains the input parameter for VDUs as well)
1830 vdu_updated
= yield from self
.substitute_vdu_input_parameters(vdu
.as_dict())
1832 vdur
= VirtualDeploymentUnitRecord(
1836 project
= self
._project
,
1839 nsr_config
=nsr_config
,
1840 mgmt_intf
=self
.has_mgmt_interface(vdu
),
1841 mgmt_network
=self
._mgmt
_network
,
1842 datacenter_name
=self
.datacenter_name
,
1843 vnfd_package_store
=self
._vnfd
_package
_store
,
1845 placement_groups
= placement_groups
,
1847 yield from vdur
.vdu_opdata_register()
1849 self
._vdus
.append(vdur
)
1852 def instantiate_vdus(self
, xact
, vnfr
):
1853 """ Instantiate the VDUs associated with this VNF """
1854 self
._log
.debug("Instantiating VDU's for vnfd id %s: %s", self
.vnfd_id
, self
._vdus
)
1856 lookup
= {vdu
.vdu_id
: vdu
for vdu
in self
._vdus
}
1858 # Identify any dependencies among the VDUs
1859 dependencies
= collections
.defaultdict(list)
1860 vdu_id_pattern
= re
.compile(r
"\{\{ vdu\[([^]]+)\]\S* \}\}")
1862 for vdu
in self
._vdus
:
1863 if vdu
._vdud
_cloud
_init
is not None:
1864 for vdu_id
in vdu_id_pattern
.findall(vdu
._vdud
_cloud
_init
):
1865 if vdu_id
!= vdu
.vdu_id
:
1866 # This means that vdu.vdu_id depends upon vdu_id,
1867 # i.e. vdu_id must be instantiated before
1869 dependencies
[vdu
.vdu_id
].append(lookup
[vdu_id
])
1871 # Define the terminal states of VDU instantiation
1873 VDURecordState
.READY
,
1874 VDURecordState
.TERMINATED
,
1875 VDURecordState
.FAILED
,
1878 datastore
= VdurDatastore()
1882 def instantiate_monitor(vdu
):
1883 """Monitor the state of the VDU during instantiation
1886 vdu - a VirtualDeploymentUnitRecord
1889 # wait for the VDUR to enter a terminal state
1890 while vdu
._state
not in terminal
:
1891 yield from asyncio
.sleep(1, loop
=self
._loop
)
1892 # update the datastore
1893 datastore
.update(vdu
)
1895 # add the VDU to the set of processed VDUs
1896 processed
.add(vdu
.vdu_id
)
1899 def instantiate(vdu
):
1900 """Instantiate the specified VDU
1903 vdu - a VirtualDeploymentUnitRecord
1906 if the VDU, or any of the VDUs this VDU depends upon, are
1907 terminated or fail to instantiate properly, a
1908 VirtualDeploymentUnitRecordError is raised.
1912 for dependency
in dependencies
[vdu
.vdu_id
]:
1913 self
._log
.debug("{}: waiting for {}".format(vdu
.vdu_id
, dependency
.vdu_id
))
1915 while dependency
.vdu_id
not in processed
:
1916 yield from asyncio
.sleep(1, loop
=self
._loop
)
1918 if not dependency
.active
:
1919 raise VirtualDeploymentUnitRecordError()
1921 self
._log
.debug('instantiating {}'.format(vdu
.vdu_id
))
1923 # Populate the datastore with the current values of the VDU
1926 # Substitute any variables contained in the cloud config script
1927 config
= str(vdu
.vdud_cloud_init
) if vdu
.vdud_cloud_init
is not None else ""
1929 parts
= re
.split("\{\{ ([^\}]+) \}\}", config
)
1933 # Extract the variable names
1935 for variable
in parts
[1::2]:
1936 variables
.append(variable
.lstrip('{{').rstrip('}}').strip())
1938 # Iterate of the variables and substitute values from the
1941 for variable
in variables
:
1943 # Handle a reference to a VDU by ID
1944 if variable
.startswith('vdu['):
1945 value
= datastore
.get(variable
)
1947 msg
= "Unable to find a substitute for {} in {} cloud-init script"
1948 raise ValueError(msg
.format(variable
, vdu
.vdu_id
))
1950 config
= config
.replace("{{ %s }}" % variable
, value
)
1953 # Handle a reference to the current VDU
1954 if variable
.startswith('vdu'):
1955 value
= datastore
.get('vdu[{}]'.format(vdu
.vdu_id
) + variable
[3:])
1956 config
= config
.replace("{{ %s }}" % variable
, value
)
1959 # Handle a reference to Cloud Init Variables: Start with 'CI'
1960 if variable
.startswith('CI'):
1961 custom_meta_data
= datastore
.get('vdu[{}]'.format(vdu
.vdu_id
) + ".custom_meta_data")
1963 for meta_data
in custom_meta_data
:
1964 if meta_data
.destination
== 'CLOUD_INIT':
1965 if meta_data
.name
== variable
:
1966 config
= config
.replace("{{ %s }}" % variable
, meta_data
.value
)
1968 raise ValueError("Unrecognized Cloud Init Variable")
1972 # Handle unrecognized variables
1973 msg
= 'unrecognized cloud-config variable: {}'
1974 raise ValueError(msg
.format(variable
))
1976 # Instantiate the VDU
1977 with self
._dts
.transaction() as xact
:
1978 self
._log
.debug("Instantiating vdu: %s", vdu
)
1979 yield from vdu
.instantiate(xact
, vnfr
, config
=config
)
1980 if self
._state
== VirtualNetworkFunctionRecordState
.FAILED
:
1981 self
._log
.error("Instatiation of VNF %s failed while instantiating vdu %s",
1984 # First create a set of tasks to monitor the state of the VDUs and
1985 # report when they have entered a terminal state
1986 for vdu
in self
._vdus
:
1987 self
._loop
.create_task(instantiate_monitor(vdu
))
1989 for vdu
in self
._vdus
:
1990 self
._loop
.create_task(instantiate(vdu
))
1992 def has_mgmt_interface(self
, vdu
):
1993 # ## TODO: Support additional mgmt_interface type options
1994 if self
.vnfd
.mgmt_interface
.vdu_id
== vdu
.id:
1998 def vlr_xpath(self
, vlr_id
):
2000 return self
._project
.add_project("D,/vlr:vlr-catalog/"
2001 "vlr:vlr[vlr:id={}]".format(quoted_key(vlr_id
)))
2003 def ext_vlr_by_id(self
, vlr_id
):
2004 """ find ext vlr by id """
2005 return self
._ext
_vlrs
[vlr_id
]
2007 def all_vdus_active(self
):
2008 """ Are all VDUS in this VNFR active? """
2009 for vdu
in self
._vdus
:
2013 self
._log
.debug("Inside all_vdus_active. Returning True")
2017 def instantiation_failed(self
, failed_reason
=None):
2018 """ VNFR instantiation failed """
2019 self
._log
.debug("VNFR %s instantiation failed ", self
.vnfr_id
)
2020 self
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
2021 self
._state
_failed
_reason
= failed_reason
2023 # Update the VNFR with the changed status
2024 yield from self
.publish(None)
2028 """ This VNF is ready"""
2029 self
._log
.debug("VNFR id %s is ready", self
.vnfr_id
)
2031 if self
._state
!= VirtualNetworkFunctionRecordState
.FAILED
:
2032 self
.set_state(VirtualNetworkFunctionRecordState
.READY
)
2035 self
._log
.debug("VNFR id %s ignoring state change", self
.vnfr_id
)
2037 # Update the VNFR with the changed status
2038 yield from self
.publish(None)
2040 def update_cp(self
, cp_name
, ip_address
, mac_addr
, cp_id
, virtual_cps
= list()):
2041 """Updated the connection point with ip address"""
2042 for cp
in self
._cprs
:
2043 if cp
.name
== cp_name
:
2044 self
._log
.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
2045 cp_name
, cp
, ip_address
, cp_id
)
2046 cp
.ip_address
= ip_address
2047 cp
.mac_address
= mac_addr
2048 cp
.connection_point_id
= cp_id
2050 cp
.virtual_cps
= [VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint_VirtualCps
.from_dict(v
) for v
in virtual_cps
]
2053 err
= "No connection point %s found in VNFR id %s" % (cp
.name
, self
._vnfr
_id
)
2054 self
._log
.debug(err
)
2055 raise VirtualDeploymentUnitRecordError(err
)
2057 def set_state(self
, state
):
2058 """ Set state for this VNFR"""
2062 def instantiate(self
, xact
, restart_mode
=False):
2063 """ instantiate this VNF """
2064 self
._log
.info("Instantiate VNF {}: {}".format(self
._vnfr
_id
, self
._state
))
2065 self
.set_state(VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
)
2066 self
._rw
_vnfd
= yield from self
._vnfm
.fetch_vnfd(self
._vnfd
_id
)
2068 nsr_op
= yield from self
.get_nsr_opdata()
2070 self
._ssh
_key
_file
= nsr_op
.ssh_key_generated
.private_key_file
2071 self
._ssh
_pub
_key
= nsr_op
.ssh_key_generated
.public_key
2076 # Iterate over all the connection points in VNFR and fetch the
2079 def cpr_from_cp(cp
):
2080 """ Creates a record level connection point from the desciptor cp"""
2081 cp_fields
= ["name", "image", "vm-flavor", "port_security_enabled", "type_yang"]
2082 cp_copy_dict
= {k
: v
for k
, v
in cp
.as_dict().items() if k
in cp_fields
}
2084 cpr_dict
.update(cp_copy_dict
)
2085 return VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr_dict
)
2087 self
._log
.debug("Fetching VLRs for VNFR id = %s, cps = %s",
2088 self
._vnfr
_id
, self
._vnfr
.connection_point
)
2090 for cp
in self
._vnfr
.connection_point
:
2091 cpr
= cpr_from_cp(cp
)
2092 self
._cprs
.append(cpr
)
2093 self
._log
.debug("Adding Connection point record %s ", cp
)
2095 vlr_path
= self
.vlr_xpath(cp
.vlr_ref
)
2096 self
._log
.debug("Fetching VLR with path = %s", vlr_path
)
2097 res_iter
= yield from self
._dts
.query_read(vlr_path
,
2098 rwdts
.XactFlag
.MERGE
)
2102 self
._ext
_vlrs
[cp
.vlr_ref
] = d
2103 cpr
.vlr_ref
= cp
.vlr_ref
2104 self
._log
.debug("Fetched VLR [%s] with path = [%s]", d
, vlr_path
)
2106 # Increase the VNFD reference count
2111 # Fetch External VLRs
2112 self
._log
.debug("VNFR-ID %s: Fetching vlrs", self
._vnfr
_id
)
2113 yield from fetch_vlrs()
2116 self
._log
.debug("VNFR-ID %s: Creating VLs", self
._vnfr
_id
)
2117 yield from self
.create_vls()
2120 self
._log
.debug("Publish VNFR {}: {}".format(self
._vnfr
_id
, self
._state
))
2121 yield from self
.publish(xact
)
2125 self
._log
.debug("VNFR-ID %s: Instantiate VLs, restart mode %s", self
._vnfr
_id
, restart_mode
)
2127 yield from self
.instantiate_vls(xact
, restart_mode
)
2128 except Exception as e
:
2129 self
._log
.exception("VL instantiation failed (%s)", str(e
))
2130 yield from self
.instantiation_failed(str(e
))
2133 vl_state
, failed_vl
= self
.vl_instantiation_state()
2134 if vl_state
== VlRecordState
.FAILED
:
2135 self
._log
.error("VL Instantiation failed for one or more of the internal virtual links, vl:%s",failed_vl
)
2136 yield from self
.instantiation_failed(failed_vl
.state_details
)
2139 self
.set_state(VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
)
2142 self
._log
.debug("VNFR-ID %s: Create VDUs, restart mode %s", self
._vnfr
_id
, restart_mode
)
2143 yield from self
.create_vdus(self
, restart_mode
)
2146 yield from self
.vdu_cloud_init_instantiation()
2147 except Exception as e
:
2148 self
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
2149 self
._state
_failed
_reason
= str(e
)
2150 yield from self
.publish(xact
)
2153 self
._log
.debug("VNFR {}: Publish VNFR with state {}".
2154 format(self
._vnfr
_id
, self
._state
))
2155 yield from self
.publish(xact
)
2158 # ToDo: Check if this should be prevented during restart
2159 self
._log
.debug("Instantiate VDUs {}: {}".format(self
._vnfr
_id
, self
._state
))
2160 _
= self
._loop
.create_task(self
.instantiate_vdus(xact
, self
))
2163 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
2164 yield from self
.publish(xact
)
2166 self
._log
.debug("VNFR-ID %s: Instantiation Done", self
._vnfr
_id
)
2169 def terminate(self
, xact
):
2170 """ Terminate this virtual network function """
2173 self
._log
.debug("Canceling scheduled tasks for VNFR %s", self
._vnfr
_id
)
2176 self
._log
.debug("Terminatng VNF id %s", self
.vnfr_id
)
2178 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATE
)
2181 if self
._vnf
_mon
is not None:
2182 self
._vnf
_mon
.stop()
2183 self
._vnf
_mon
.deregister()
2184 self
._vnf
_mon
= None
2187 def terminate_vls():
2188 """ Terminate VLs in this VNF """
2189 for vlr_id
, vl
in self
._vlrs
.items():
2190 self
._vnfm
.remove_vlr_id_vnfr_map(vlr_id
)
2191 yield from vl
.terminate(xact
)
2194 def terminate_vdus():
2195 """ Terminate VDUS in this VNF """
2196 for vdu
in self
._vdus
:
2197 yield from vdu
.terminate(xact
)
2199 self
._log
.debug("Terminatng VLs in VNF id %s", self
.vnfr_id
)
2200 self
.set_state(VirtualNetworkFunctionRecordState
.VL_TERMINATE_PHASE
)
2201 yield from terminate_vls()
2203 self
._log
.debug("Terminatng VDUs in VNF id %s", self
.vnfr_id
)
2204 self
.set_state(VirtualNetworkFunctionRecordState
.VDU_TERMINATE_PHASE
)
2205 yield from terminate_vdus()
2207 self
._log
.debug("Terminated VNF id %s", self
.vnfr_id
)
2208 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATED
)
2213 def vl_instantiation_state(self
):
2214 """ Get the state of VL instantiation of this VNF """
2216 for vl_id
, vlr
in self
._vlrs
.items():
2217 if vlr
.state
== VlRecordState
.ACTIVE
:
2219 elif vlr
.state
== VlRecordState
.FAILED
:
2221 return VlRecordState
.FAILED
, failed_vl
2222 elif vlr
.state
== VlRecordState
.INSTANTIATION_PENDING
:
2223 failed_vl
= vlr
, failed_vl
2224 return VlRecordState
.INSTANTIATION_PENDING
, failed_vl
2226 self
._log
.debug("vlr %s still in state %s", vlr
, vlr
.state
)
2227 raise VlRecordError("Invalid state %s", vlr
.state
)
2228 return VlRecordState
.ACTIVE
, failed_vl
2230 def vl_instantiation_successful(self
):
2231 """ Mark that all VLs in this VNF are active """
2232 if self
._vls
_ready
.is_set():
2233 self
._log
.debug("VNFR id %s, vls_ready is already set", self
.id)
2235 vl_state
, failed_vl
= self
.vl_instantiation_state()
2237 if vl_state
== VlRecordState
.ACTIVE
:
2238 self
._log
.info("VNFR id:%s name:%s has all Virtual Links in active state, Ready to orchestrate VDUs",
2239 self
.vnfr_id
, self
.name
)
2240 self
._vls
_ready
.set()
2242 elif vl_state
== VlRecordState
.FAILED
:
2243 self
._log
.error("VNFR id:%s name:%s One of the Virtual Links failed to reach active state.Failed to orchestrate VNF",
2244 self
.vnfr_id
, self
.name
)
2245 self
.instantiation_failed("VNFR id %s: failed since VL %s did not come up".format(self
.vnfr_id
, failed_vl
.name
))
2246 self
._vls
_ready
.set()
2248 def find_vlr(self
, vlr_id
):
2249 """ Find VLR matching the passed VLR id """
2251 if vlr_id
in self
._vlrs
:
2252 return self
._vlrs
[vlr_id
]
2255 def vlr_event(self
, vlr
, action
):
2256 self
._log
.debug("Received VLR %s with action:%s", vlr
, action
)
2258 vlr_local
= self
.find_vlr(vlr
.id)
2259 if vlr_local
is None:
2260 self
._log
.error("VLR %s:%s received for unknown id, state:%s ignoring event",
2261 vlr
.id, vlr
.name
, vlr
.state
)
2264 if action
== rwdts
.QueryAction
.CREATE
or action
== rwdts
.QueryAction
.UPDATE
:
2265 if vlr
.operational_status
== 'running':
2266 vlr_local
.set_state_from_op_status(vlr
.operational_status
, vlr
.operational_status_details
)
2267 self
._log
.info("VLR %s:%s moving to active state",
2269 elif vlr
.operational_status
== 'failed':
2270 vlr_local
.set_state_from_op_status(vlr
.operational_status
, vlr
.operational_status_details
)
2271 self
._log
.info("VLR %s:%s moving to failed state",
2274 self
._log
.warning("VLR %s:%s received state:%s",
2275 vlr
.id, vlr
.name
, vlr
.operational_status
)
2277 if vlr
.has_field('network_id'):
2278 vlr_local
.network_id
= vlr
.network_id
2280 # Check if vl instantiation successful for this VNFR
2281 self
.vl_instantiation_successful()
2284 class VnfdDtsHandler(object):
2285 """ DTS handler for VNFD config changes """
2286 XPATH
= "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd"
2288 def __init__(self
, dts
, log
, loop
, vnfm
):
2298 """ DTS registration handle """
2301 def deregister(self
):
2302 '''De-register from DTS'''
2303 self
._log
.debug("De-register VNFD DTS handler for project {}".
2304 format(self
._vnfm
._project
.name
))
2306 self
._regh
.deregister()
2311 """ Register for VNFD configuration"""
2314 def on_apply(dts
, acg
, xact
, action
, scratch
):
2315 """Apply the configuration"""
2316 self
._log
.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
2317 xact
, action
, scratch
)
2319 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
2320 # Create/Update a VNFD record
2322 for cfg
in self
._regh
.get_xact_elements(xact
):
2323 # Only interested in those VNFD cfgs whose ID was received in prepare callback
2324 if cfg
.id in scratch
.get('vnfds', []) or is_recovery
:
2325 self
._vnfm
.update_vnfd(cfg
)
2327 self
._log
.warning("Reg handle none for {} in project {}".
2328 format(self
.__class
__, self
._vnfm
._project
))
2330 scratch
.pop('vnfds', None)
2333 #yield from self._vnfm.vnfr_handler.register()
2334 #yield from self._vnfm.vnfr_ref_handler.register()
2338 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
2339 """ on prepare callback """
2340 xpath
= ks_path
.to_xpath(RwVnfmYang
.get_schema())
2341 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
2343 xact_info
.query_action
, msg
)
2344 fref
= ProtobufC
.FieldReference
.alloc()
2345 fref
.goto_whole_message(msg
.to_pbcm())
2347 # Handle deletes in prepare_callback
2348 if fref
.is_field_deleted():
2349 # Delete an VNFD record
2350 self
._log
.debug("Deleting VNFD with id %s", msg
.id)
2351 if self
._vnfm
.vnfd_in_use(msg
.id):
2352 self
._log
.debug("Cannot delete VNFD in use - %s", msg
)
2353 err_msg
= "Cannot delete a VNFD in use - %s" % msg
2354 xact_info
.send_error_xpath(RwTypes
.RwStatus
.FAILURE
, xpath
, err_msg
)
2355 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
, xpath
)
2357 # Delete a VNFD record
2358 yield from self
._vnfm
.delete_vnfd(msg
.id)
2361 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2362 except rift
.tasklets
.dts
.ResponseError
as e
:
2364 "VnfdDtsHandler in project {} with path {} for action {} failed: {}".
2365 format(self
._vnfm
._project
, xpath
, xact_info
.query_action
, e
))
2367 xpath
= self
._vnfm
._project
.add_project(VnfdDtsHandler
.XPATH
)
2368 self
._log
.debug("Registering for VNFD config using xpath: {}".
2371 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
2372 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
2373 self
._regh
= acg
.register(
2375 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
2376 on_prepare
=on_prepare
)
2378 class VnfrConsoleOperdataDtsHandler(object):
2380 Registers 'D,/rw-project:project/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]'
2381 and handles CRUD from DTS
2385 def vnfr_vdu_console_xpath(self
):
2386 """ path for resource-mgr"""
2387 return self
._project
.add_project(
2388 "D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id={}]".format(quoted_key(self
._vnfr
_id
)) +
2389 "/rw-vnfr:vdur[vnfr:id={}]".format(quoted_key(self
._vdur
_id
)))
2391 def __init__(self
, dts
, log
, loop
, vnfm
, vnfr_id
, vdur_id
, vdu_id
):
2398 self
._vnfr
_id
= vnfr_id
2399 self
._vdur
_id
= vdur_id
2400 self
._vdu
_id
= vdu_id
2402 self
._project
= vnfm
._project
2404 def deregister(self
):
2405 '''De-register from DTS'''
2406 self
._log
.debug("De-register VNFR console DTS handler for project {}".
2407 format(self
._project
))
2409 self
._regh
.deregister()
2414 """ Register for VNFR VDU Operational Data read from dts """
2417 def on_prepare(xact_info
, action
, ks_path
, msg
):
2418 """ prepare callback from dts """
2419 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2421 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2422 xact_info
, action
, xpath
, msg
2425 if action
== rwdts
.QueryAction
.READ
:
2426 schema
= RwVnfrYang
.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur
.schema()
2427 path_entry
= schema
.keyspec_to_entry(ks_path
)
2428 self
._log
.debug("VDU Opdata path is {}".format(path_entry
.key00
.id))
2430 vnfr
= self
._vnfm
.get_vnfr(self
._vnfr
_id
)
2431 except VnfRecordError
as e
:
2432 self
._log
.error("VNFR id %s not found", self
._vnfr
_id
)
2433 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2436 vdur
= vnfr
._get
_vdur
_from
_vdu
_id
(self
._vdu
_id
)
2437 if not vdur
._state
== VDURecordState
.READY
:
2438 self
._log
.debug("VDUR state is not READY. current state is {}".format(vdur
._state
))
2439 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2441 with self
._dts
.transaction() as new_xact
:
2442 resp
= yield from vdur
.read_resource(new_xact
)
2443 vdur_console
= RwVnfrYang
.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur()
2444 vdur_console
.id = self
._vdur
_id
2445 if resp
.console_url
:
2446 vdur_console
.console_url
= resp
.console_url
2448 vdur_console
.console_url
= 'none'
2449 self
._log
.debug("Recevied console URL for vdu {} is {}".format(self
._vdu
_id
,vdur_console
))
2451 self
._log
.exception("Caught exception while reading VDU %s", self
._vdu
_id
)
2452 vdur_console
= RwVnfrYang
.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur()
2453 vdur_console
.id = self
._vdur
_id
2454 vdur_console
.console_url
= 'none'
2456 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
,
2457 xpath
=self
.vnfr_vdu_console_xpath
,
2460 #raise VnfRecordError("Not supported operation %s" % action)
2461 self
._log
.error("Not supported operation %s" % action
)
2462 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2466 self
._log
.debug("Registering for VNFR VDU using xpath: %s",
2467 self
.vnfr_vdu_console_xpath
)
2468 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2469 with self
._dts
.group_create() as group
:
2470 self
._regh
= group
.register(xpath
=self
.vnfr_vdu_console_xpath
,
2472 flags
=rwdts
.Flag
.PUBLISHER
,
2476 class VnfrDtsHandler(object):
2477 """ registers 'D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2478 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2480 def __init__(self
, dts
, log
, loop
, vnfm
):
2487 self
._project
= vnfm
._project
2491 """ Return registration handle"""
2496 """ Return VNF manager instance """
2499 def deregister(self
):
2500 '''De-register from DTS'''
2501 self
._log
.debug("De-register VNFR DTS handler for project {}".
2502 format(self
._project
))
2504 self
._regh
.deregister()
2509 """ Register for vnfr create/update/delete/read requests from dts """
2512 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
2515 def instantiate_realloc_vnfr(vnfr
):
2516 """Re-populate the vnfm after restart
2523 yield from vnfr
.instantiate(None, restart_mode
=True)
2525 self
._log
.debug("Got on_event in vnfm: {}".format(xact_event
))
2527 if xact_event
== rwdts
.MemberEvent
.INSTALL
:
2528 curr_cfg
= self
.regh
.elements
2529 for cfg
in curr_cfg
:
2531 vnfr
= self
.vnfm
.create_vnfr(cfg
, restart_mode
= True)
2533 self
._log
.error("Not Creating VNFR {} as corresponding NS is terminated".format(cfg
.id))
2535 self
._log
.debug("Creating VNFR {}".format(vnfr
.vnfr_id
))
2536 except Exception as e
:
2537 self
._log
.exception(e
)
2540 self
._loop
.create_task(instantiate_realloc_vnfr(vnfr
))
2542 return rwdts
.MemberRspCode
.ACTION_OK
2545 def on_prepare(xact_info
, action
, ks_path
, msg
):
2546 """ prepare callback from dts """
2548 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2549 xact_info
, action
, msg
2553 def create_vnf(vnfr
):
2555 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2557 if msg
.operational_status
== 'pre_init':
2558 vnfr
.set_state(VirtualNetworkFunctionRecordState
.PRE_INIT
)
2559 yield from vnfr
.publish(None)
2561 if vnfr
.external_ro
:
2564 if msg
.operational_status
== 'init':
2566 def on_instantiate_done(fut
):
2567 # If the do_instantiate fails, then publish NSR with failed result
2570 import traceback
, sys
2571 print(traceback
.format_exception(None,e
, e
.__traceback
__), file=sys
.stderr
, flush
=True)
2572 self
._log
.exception("VNFR instantiation failed for VNFR id %s: %s", vnfr
.vnfr_id
, str(e
))
2573 self
._loop
.create_task(vnfr
.instantiation_failed(failed_reason
=str(e
)))
2576 # RIFT-9105: Unable to add a READ query under an existing transaction
2577 # xact = xact_info.xact
2578 assert vnfr
.task
is None
2579 vnfr
.task
= self
._loop
.create_task(vnfr
.instantiate(None))
2580 vnfr
.task
.add_done_callback(on_instantiate_done
)
2583 except Exception as e
:
2584 self
._log
.exception(e
)
2585 self
._log
.error("Error while instantiating vnfr:%s", vnfr
.vnfr_id
)
2586 vnfr
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
2587 yield from vnfr
.publish(None)
2591 if action
== rwdts
.QueryAction
.CREATE
:
2592 if not msg
.has_field("vnfd"):
2593 err
= "Vnfd not provided"
2594 self
._log
.error(err
)
2595 raise VnfRecordError(err
)
2596 vnfr
= self
.vnfm
.create_vnfr(msg
)
2598 self
._log
.error("Not Creating VNFR {} as corresponding NS is terminated".format(msg
.id))
2599 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2601 yield from create_vnf(vnfr
)
2604 elif action
== rwdts
.QueryAction
.DELETE
:
2605 schema
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr
.schema()
2606 path_entry
= schema
.keyspec_to_entry(ks_path
)
2607 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2610 self
._log
.error("VNFR id %s not found for delete", path_entry
.key00
.id)
2611 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2613 # Preventing exception here if VNFR id is not found. This means delete is
2614 # invoked before Creation.
2615 # raise VirtualNetworkFunctionRecordNotFound(
2616 # "VNFR id %s", path_entry.key00.id)
2619 if not vnfr
.external_ro
:
2620 yield from vnfr
.terminate(xact_info
.xact
)
2621 yield from self
._vnfm
.delete_vnfr(xact_info
.xact
, vnfr
)
2622 except Exception as e
:
2623 self
._log
.exception(e
)
2624 self
._log
.error("Caught exception while deleting vnfr %s", path_entry
.key00
.id)
2626 elif action
== rwdts
.QueryAction
.UPDATE
:
2627 schema
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr
.schema()
2628 path_entry
= schema
.keyspec_to_entry(ks_path
)
2631 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2634 # This means one of two things : The VNFR has been deleted or its a Launchpad restart.
2635 if msg
.id in self
._vnfm
._deleted
_vnfrs
:
2637 self
._log
.error("Not Creating VNFR {} as corresponding NS is terminated".format(msg
.id))
2640 self
._log
.debug("Launchpad Restart - Recreating VNFR - %s", msg
.id)
2641 vnfr
= self
.vnfm
.create_vnfr(msg
)
2643 self
._log
.error("Not Creating VNFR {} as corresponding NS is terminated".format(msg
.id))
2645 yield from create_vnf(vnfr
)
2649 except Exception as e
:
2650 self
._log
.error("Exception in VNFR Update : %s", str(e
))
2651 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2654 if vnfr
.external_ro
:
2655 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2658 if (msg
.operational_status
== 'pre_init' and not vnfr
._init
):
2659 # Creating VNFR INSTANTIATION TASK
2660 self
._log
.debug("VNFR {} update after substitution {} (operational_status {})".
2661 format(vnfr
.name
, msg
.vnfd
, msg
.operational_status
))
2662 yield from vnfr
.update_vnfr_after_substitution(msg
, xact_info
)
2663 yield from create_vnf(vnfr
)
2667 self
._log
.debug("VNFR {} update config status {} (current {})".
2668 format(vnfr
.name
, msg
.config_status
, vnfr
.config_status
))
2669 # Update the config and publish
2670 yield from vnfr
.update_config(msg
, xact_info
)
2673 raise NotImplementedError(
2674 "%s action on VirtualNetworkFunctionRecord not supported",
2677 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2679 xpath
= self
._project
.add_project(VnfrDtsHandler
.XPATH
)
2680 self
._log
.debug("Registering for VNFR using xpath: {}".
2683 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2684 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
2685 with self
._dts
.group_create(handler
=handlers
) as group
:
2686 self
._regh
= group
.register(xpath
=xpath
,
2688 flags
=(rwdts
.Flag
.PUBLISHER |
2690 rwdts
.Flag
.NO_PREP_READ |
2691 rwdts
.Flag
.DATASTORE
),)
2694 def create(self
, xact
, xpath
, msg
):
2696 Create a VNFR record in DTS with path and message
2698 path
= self
._project
.add_project(xpath
)
2699 self
._log
.debug("Creating VNFR xact = %s, %s:%s",
2702 self
.regh
.create_element(path
, msg
)
2703 self
._log
.debug("Created VNFR xact = %s, %s:%s",
2707 def update(self
, xact
, xpath
, msg
, flags
=rwdts
.XactFlag
.REPLACE
):
2709 Update a VNFR record in DTS with path and message
2711 path
= self
._project
.add_project(xpath
)
2712 self
._log
.debug("Updating VNFR xact = %s, %s:%s",
2714 self
.regh
.update_element(path
, msg
, flags
)
2715 self
._log
.debug("Updated VNFR xact = %s, %s:%s",
2719 def delete(self
, xact
, xpath
):
2721 Delete a VNFR record in DTS with path and message
2723 path
= self
._project
.add_project(xpath
)
2724 self
._log
.debug("Deleting VNFR xact = %s, %s", xact
, path
)
2725 self
.regh
.delete_element(path
)
2726 self
._log
.debug("Deleted VNFR xact = %s, %s", xact
, path
)
2729 class VnfdRefCountDtsHandler(object):
2730 """ The VNFD Ref Count DTS handler """
2731 XPATH
= "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2733 def __init__(self
, dts
, log
, loop
, vnfm
):
2743 """ Return registration handle """
2748 """ Return the NS manager instance """
2751 def deregister(self
):
2752 '''De-register from DTS'''
2753 self
._log
.debug("De-register VNFD Ref DTS handler for project {}".
2754 format(self
._vnfm
._project
))
2756 self
._regh
.deregister()
2761 """ Register for VNFD ref count read from dts """
2764 def on_prepare(xact_info
, action
, ks_path
, msg
):
2765 """ prepare callback from dts """
2766 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2768 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2769 xact_info
, action
, xpath
, msg
2772 if action
== rwdts
.QueryAction
.READ
:
2773 schema
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount
.schema()
2774 path_entry
= schema
.keyspec_to_entry(ks_path
)
2775 vnfd_list
= yield from self
._vnfm
.get_vnfd_refcount(path_entry
.key00
.vnfd_id_ref
)
2776 for xpath
, msg
in vnfd_list
:
2777 self
._log
.debug("Responding to ref count query path:%s, msg:%s",
2779 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.MORE
,
2782 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2784 raise VnfRecordError("Not supported operation %s" % action
)
2786 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2787 with self
._dts
.group_create() as group
:
2788 self
._regh
= group
.register(xpath
=self
._vnfm
._project
.add_project(
2789 VnfdRefCountDtsHandler
.XPATH
),
2791 flags
=rwdts
.Flag
.PUBLISHER
,
2795 class VdurDatastore(object):
2797 This VdurDatastore is intended to expose select information about a VDUR
2798 such that it can be referenced in a cloud config file. The data that is
2799 exposed does not necessarily follow the structure of the data in the yang
2800 model. This is intentional. The data that are exposed are intended to be
2801 agnostic of the yang model so that changes in the model do not necessarily
2802 require changes to the interface provided to the user. It also means that
2803 the user does not need to be familiar with the RIFT.ware yang models.
2807 """Create an instance of VdurDatastore"""
2808 self
._vdur
_data
= dict()
2809 self
._pattern
= re
.compile("vdu\[([^]]+)\]\.(.+)")
2811 def add(self
, vdur
):
2812 """Add a new VDUR to the datastore
2815 vdur - a VirtualDeploymentUnitRecord instance
2818 A ValueError is raised if the VDUR is (1) None or (2) already in
2822 if vdur
.vdu_id
is None:
2823 raise ValueError('VDURs are required to have an ID')
2825 if vdur
.vdu_id
in self
._vdur
_data
:
2826 raise ValueError('cannot add a VDUR more than once')
2828 self
._vdur
_data
[vdur
.vdu_id
] = dict()
2830 def set_if_not_none(key
, attr
):
2831 if attr
is not None:
2832 self
._vdur
_data
[vdur
.vdu_id
][key
] = attr
2834 set_if_not_none('name', vdur
._vdud
.name
)
2835 set_if_not_none('mgmt.ip', vdur
.vm_management_ip
)
2836 # The below can be used for hostname
2837 set_if_not_none('vdur_name', vdur
.unique_short_name
)
2838 set_if_not_none('custom_meta_data', vdur
._vdud
.supplemental_boot_data
.custom_meta_data
)
2840 def update(self
, vdur
):
2841 """Update the VDUR information in the datastore
2844 vdur - a GI representation of a VDUR
2847 A ValueError is raised if the VDUR is (1) None or (2) already in
2851 if vdur
.vdu_id
is None:
2852 raise ValueError('VNFDs are required to have an ID')
2854 if vdur
.vdu_id
not in self
._vdur
_data
:
2855 raise ValueError('VNF is not recognized')
2857 def set_or_delete(key
, attr
):
2859 if key
in self
._vdur
_data
[vdur
.vdu_id
]:
2860 del self
._vdur
_data
[vdur
.vdu_id
][key
]
2863 self
._vdur
_data
[vdur
.vdu_id
][key
] = attr
2865 set_or_delete('name', vdur
._vdud
.name
)
2866 set_or_delete('mgmt.ip', vdur
.vm_management_ip
)
2867 # The below can be used for hostname
2868 set_or_delete('vdur_name', vdur
.unique_short_name
)
2869 set_or_delete('custom_meta_data', vdur
._vdud
.supplemental_boot_data
.custom_meta_data
)
2871 def remove(self
, vdur_id
):
2872 """Remove all of the data associated with specified VDUR
2875 vdur_id - the identifier of a VNFD in the datastore
2878 A ValueError is raised if the VDUR is not contained in the
2882 if vdur_id
not in self
._vdur
_data
:
2883 raise ValueError('VNF is not recognized')
2885 del self
._vdur
_data
[vdur_id
]
2887 def get(self
, expr
):
2888 """Retrieve VDUR information from the datastore
2890 An expression should be of the form,
2894 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2895 the exposed attribute that the user wishes to retrieve.
2897 If the requested data is not available, None is returned.
2900 expr - a string that specifies the data to return
2903 A ValueError is raised if the provided expression cannot be parsed.
2906 The requested data or None
2910 result
= self
._pattern
.match(expr
)
2912 raise ValueError('data expression not recognized ({})'.format(expr
))
2914 vdur_id
, key
= result
.groups()
2916 if vdur_id
not in self
._vdur
_data
:
2919 return self
._vdur
_data
[vdur_id
].get(key
, None)
2922 class VnfManager(object):
2923 """ The virtual network function manager class """
2924 def __init__(self
, dts
, log
, loop
, project
, cluster_name
):
2928 self
._project
= project
2929 self
._cluster
_name
= cluster_name
2931 # This list maintains a list of all the deleted vnfrs' ids. This is done to be able to determine
2932 # if the vnfr is not found because of restart or simply because it was deleted. In the first case we
2933 # recreate the vnfr while in the latter we do not.
2934 self
._deleted
_vnfrs
= []
2936 self
._vnfr
_handler
= VnfrDtsHandler(dts
, log
, loop
, self
)
2937 self
._vnfd
_handler
= VnfdDtsHandler(dts
, log
, loop
, self
)
2938 self
._vnfr
_ref
_handler
= VnfdRefCountDtsHandler(dts
, log
, loop
, self
)
2939 self
._nsr
_handler
= mano_dts
.NsInstanceConfigSubscriber(
2940 log
, dts
, loop
, project
, callback
=self
.handle_nsr
)
2941 self
._vlr
_handler
= subscriber
.VlrSubscriberDtsHandler(log
, dts
, loop
, project
,
2942 callback
=self
.vlr_event
)
2944 self
._dts
_handlers
= [self
._vnfd
_handler
,
2946 self
._vnfr
_ref
_handler
,
2951 self
._vnfds
_to
_vnfr
= {}
2953 self
._vnfr
_for
_vlr
= {}
2956 def vnfr_handler(self
):
2957 """ VNFR dts handler """
2958 return self
._vnfr
_handler
2961 def vnfr_ref_handler(self
):
2962 """ VNFR dts handler """
2963 return self
._vnfr
_ref
_handler
2967 """ Register all static DTS handlers """
2968 for hdl
in self
._dts
_handlers
:
2969 yield from hdl
.register()
2971 def deregister(self
):
2972 self
._log
.debug("De-register VNFM project {}".format(self
._project
.name
))
2973 for hdl
in self
._dts
_handlers
:
2978 """ Run this VNFM instance """
2979 self
._log
.debug("Run VNFManager - registering static DTS handlers""")
2980 yield from self.register()
2982 def handle_nsr(self, nsr, action):
2983 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
2984 self._nsrs[nsr.id] = nsr
2985 elif action == rwdts.QueryAction.DELETE:
2986 if nsr.id in self._nsrs:
2987 del self._nsrs[nsr.id]
2989 def get_nsr_config(self, nsr_id):
2991 Gets the NSR config from the DTS cache.
2992 Called in recovery mode only.
2994 if nsr_id in self._nsrs:
2995 return self._nsrs[nsr_id]
2998 self._log.error("VNFR with
id {} not found
".format(nsr_id))
3001 curr_cfgs = list(self._nsr_handler.reg.elements)
3002 key_map = { getattr(cfg, self._nsr_handler.key_name()): cfg for cfg in curr_cfgs }
3003 curr_cfgs = [key_map[key] for key in key_map]
3005 for cfg in curr_cfgs:
3006 self._nsrs[cfg.id] = cfg
3008 if nsr_id in self._nsrs:
3009 return self._nsrs[nsr_id]
3011 self._log.error("VNFR with
id {} not found
in DTS cache
".format(nsr_id))
3015 def get_linked_mgmt_network(self, vnfr, restart_mode=False):
3016 """For the given VNFR get the related mgmt network from the NSD, if
3019 vnfd_id = vnfr.vnfd.id
3020 nsr_id = vnfr.nsr_id_ref
3023 self._nsrs[nsr_id] = self.get_nsr_config(vnfr.nsr_id_ref)
3025 # for the given related VNFR, get the corresponding NSR-config
3028 nsr_obj = self._nsrs[nsr_id]
3030 raise("Unable to find the NS with the ID
: {}".format(nsr_id))
3032 # for the related NSD check if a VLD exists such that it's a mgmt
3034 for vld in nsr_obj.nsd.vld:
3035 if vld.mgmt_network:
3036 for vnfd in vld.vnfd_connection_point_ref:
3037 if vnfd.vnfd_id_ref == vnfd_id:
3038 if vld.vim_network_name is not None:
3039 mgmt_net = vld.vim_network_name
3041 mgmt_net = self._project.name + "." + nsr_obj.name + "." + vld.name
3046 def get_vnfr(self, vnfr_id):
3047 """ get VNFR by vnfr id """
3049 if vnfr_id not in self._vnfrs:
3050 self._log.error("VNFR
id {} not found
".format(vnfr_id))
3052 # Returning None to prevent exception here. The caller raises the exception.
3053 # raise VnfRecordError("VNFR
id %s not found
", vnfr_id)
3055 return self._vnfrs[vnfr_id]
3057 def create_vnfr(self, vnfr, restart_mode=False):
3058 # Check if NSR is present. This is a situation where the NS has been deleted before
3059 # VNFR Create starts.
3060 if vnfr.nsr_id_ref not in self._nsrs:
3063 """ Create a VNFR instance """
3064 if vnfr.id in self._vnfrs:
3065 msg = "Vnfr
id %s already exists
" % vnfr.id
3066 self._log.error(msg)
3067 raise VnfRecordError(msg)
3069 self._log.info("Create VirtualNetworkFunctionRecord
%s from vnfd_id
: %s",
3074 mgmt_network = self.get_linked_mgmt_network(vnfr, restart_mode)
3075 except Exception as e:
3076 self._log.exception(e)
3079 # Identify if we are using Rift RO or external RO
3081 nsr = self._nsrs[vnfr.nsr_id_ref]
3082 if (nsr.resource_orchestrator and
3083 nsr.resource_orchestrator != 'rift'):
3084 self._log.debug("VNFR
{} using external RO
".
3088 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
3089 self._dts, self._log, self._loop, self._cluster_name, self, vnfr,
3090 mgmt_network=mgmt_network, external_ro=external_ro,
3094 if vnfr.vnfd.id in self._vnfds_to_vnfr:
3095 self._vnfds_to_vnfr[vnfr.vnfd.id] += 1
3097 self._vnfds_to_vnfr[vnfr.vnfd.id] = 1
3099 return self._vnfrs[vnfr.id]
3102 def delete_vnfr(self, xact, vnfr):
3103 """ Create a VNFR instance """
3104 if vnfr.vnfr_id in self._vnfrs:
3105 self._log.debug("Deleting VNFR
id %s", vnfr.vnfr_id)
3106 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
3108 if vnfr.vnfd.id in self._vnfds_to_vnfr:
3109 if self._vnfds_to_vnfr[vnfr.vnfd.id]:
3110 self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1
3112 del self._vnfrs[vnfr.vnfr_id]
3113 self._deleted_vnfrs.append(vnfr.vnfr_id)
3116 def fetch_vnfd(self, vnfd_id):
3117 """ Fetch VNFDs based with the vnfd id"""
3118 vnfd_path = self._project.add_project(
3119 VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id))
3120 self._log.debug("Fetch vnfd with path
%s", vnfd_path)
3123 res_iter = yield from self._dts.query_read(vnfd_path,
3124 rwdts.XactFlag.MERGE)
3126 for ent in res_iter:
3127 res = yield from ent
3131 err = "Failed to get Vnfd
%s" % vnfd_id
3132 self._log.error(err)
3133 raise VnfRecordError(err)
3135 self._log.debug("Fetched vnfd
for path
%s, vnfd
- %s", vnfd_path, vnfd)
3139 def vnfd_in_use(self, vnfd_id):
3140 """ Is this VNFD in use """
3141 self._log.debug("Is this VNFD
in use
- msg
:%s", vnfd_id)
3142 if vnfd_id in self._vnfds_to_vnfr:
3143 return (self._vnfds_to_vnfr[vnfd_id] > 0)
3147 def publish_vnfr(self, xact, path, msg):
3148 """ Publish a VNFR """
3149 self._log.debug("publish_vnfr called with path
%s, msg
%s",
3151 yield from self.vnfr_handler.update(xact, path, msg)
3154 def delete_vnfd(self, vnfd_id):
3155 """ Delete the Virtual Network Function descriptor with the passed id """
3156 self._log.debug("Deleting the virtual network function descriptor
- %s", vnfd_id)
3157 if vnfd_id in self._vnfds_to_vnfr:
3158 if self._vnfds_to_vnfr[vnfd_id]:
3159 self._log.debug("Cannot delete VNFD
id %s reference exists
%s",
3161 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
3162 raise VirtualNetworkFunctionDescriptorRefCountExists(
3163 "Cannot delete
:%s, ref_count
:%s",
3165 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
3167 del self._vnfds_to_vnfr[vnfd_id]
3169 def vnfd_refcount_xpath(self, vnfd_id):
3170 """ xpath for ref count entry """
3171 return self._project.add_project(VnfdRefCountDtsHandler.XPATH +
3172 "[rw
-vnfr
:vnfd
-id-ref
={}]").format(quoted_key(vnfd_id))
3175 def get_vnfd_refcount(self, vnfd_id):
3176 """ Get the vnfd_list from this VNFM"""
3178 if vnfd_id is None or vnfd_id == "":
3179 for vnfd in self._vnfds_to_vnfr.keys():
3180 vnfd_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount()
3181 vnfd_msg.vnfd_id_ref = vnfd
3182 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd]
3183 vnfd_list.append((self.vnfd_refcount_xpath(vnfd), vnfd_msg))
3184 elif vnfd_id in self._vnfds_to_vnfr:
3185 vnfd_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount()
3186 vnfd_msg.vnfd_id_ref = vnfd_id
3187 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd_id]
3188 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
3192 def add_vlr_id_vnfr_map(self, vlr_id, vnfr):
3193 """ Add a mapping for vlr_id into VNFR """
3194 self._vnfr_for_vlr[vlr_id] = vnfr
3196 def remove_vlr_id_vnfr_map(self, vlr_id):
3197 """ Remove a mapping for vlr_id into VNFR """
3198 del self._vnfr_for_vlr[vlr_id]
3200 def find_vnfr_for_vlr_id(self, vlr_id):
3201 """ Find VNFR for VLR id """
3203 if vlr_id in self._vnfr_for_vlr:
3204 vnfr = self._vnfr_for_vlr[vlr_id]
3206 def vlr_event(self, vlr, action):
3207 """ VLR event handler """
3208 self._log.debug("VnfManager
: Received VLR
%s with action
:%s", vlr, action)
3210 if vlr.id not in self._vnfr_for_vlr:
3211 self._log.warning("VLR
%s:%s received
for unknown
id; %s",
3212 vlr.id, vlr.name, vlr)
3214 vnfr = self._vnfr_for_vlr[vlr.id]
3216 vnfr.vlr_event(vlr, action)
3219 class VnfmProject(ManoProject):
3221 def __init__(self, name, tasklet, **kw):
3222 super(VnfmProject, self).__init__(tasklet.log, name)
3223 self.update(tasklet)
3228 def register (self):
3230 vm_parent_name = self._tasklet.tasklet_info.get_parent_vm_parent_instance_name()
3231 assert vm_parent_name is not None
3232 self._vnfm = VnfManager(self._dts, self.log, self.loop, self, vm_parent_name)
3233 yield from self._vnfm.run()
3235 print("Caught
Exception in VNFM init
:", sys.exc_info()[0])
3238 def deregister(self):
3239 self._log.debug("De
-register project
{} for VnfmProject
".
3241 self._vnfm.deregister()
3244 def delete_prepare(self):
3245 if self._vnfm and self._vnfm._vnfrs:
3246 delete_msg = "Project has VNFR associated with it
. Delete all Project NSR
and try again
."
3247 return False, delete_msg
3250 class VnfmTasklet(rift.tasklets.Tasklet):
3251 """ VNF Manager tasklet class """
3252 def __init__(self, *args, **kwargs):
3253 super(VnfmTasklet, self).__init__(*args, **kwargs)
3254 self.rwlog.set_category("rw
-mano
-log
")
3255 self.rwlog.set_subcategory("vnfm
")
3258 self._project_handler = None
3267 super(VnfmTasklet, self).start()
3268 self.log.info("Starting VnfmTasklet
")
3270 self.log.setLevel(logging.DEBUG)
3272 self.log.debug("Registering with dts
")
3273 self._dts = rift.tasklets.DTS(self.tasklet_info,
3274 RwVnfmYang.get_schema(),
3276 self.on_dts_state_change)
3278 self.log.debug("Created DTS Api GI Object
: %s", self._dts)
3280 self._log.error("Caught
Exception in VNFM start
:", sys.exc_info()[0])
3283 def on_instance_started(self):
3284 """ Task insance started callback """
3285 self.log.debug("Got instance started callback
")
3291 self._log.error("Caught
Exception in VNFM stop
:", sys.exc_info()[0])
3296 """ Task init callback """
3297 self.log.debug("creating project handler
")
3298 self.project_handler = ProjectHandler(self, VnfmProject)
3299 self.project_handler.register()
3303 """ Task run callback """
3307 def on_dts_state_change(self, state):
3308 """Take action according to current dts state to transition
3309 application into the corresponding application state
3312 state - current dts state
3315 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
3316 rwdts.State.CONFIG: rwdts.State.RUN,
3320 rwdts.State.INIT: self.init,
3321 rwdts.State.RUN: self.run,
3324 # Transition application to next state
3325 handler = handlers.get(state, None)
3326 if handler is not None:
3327 yield from handler()
3329 # Transition dts to next state
3330 next_state = switch.get(state, None)
3331 if next_state is not None:
3332 self._dts.handle.set_state(next_state)