2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
29 gi
.require_version('RwDts', '1.0')
30 gi
.require_version('RwVnfrYang', '1.0')
31 gi
.require_version('RwVnfmYang', '1.0')
32 gi
.require_version('RwVlrYang', '1.0')
33 gi
.require_version('RwManifestYang', '1.0')
34 gi
.require_version('RwBaseYang', '1.0')
35 gi
.require_version('RwResourceMgrYang', '1.0')
37 from gi
.repository
import (
50 import rift
.package
.store
51 import rift
.package
.cloud_init
52 import rift
.package
.script
53 import rift
.mano
.dts
as mano_dts
class VMResourceError(Exception):
    """ Raised when no usable VM resource response is obtained from the resource manager """
class VnfRecordError(Exception):
    """ VNF record instantiation failed """
class VduRecordError(Exception):
    """ VDU record instantiation failed (e.g. a connection point could not be resolved) """
class NotImplemented(Exception):
    """ The requested operation is not implemented

    NOTE(review): this class name shadows the builtin ``NotImplemented``
    constant inside this module; it is kept unchanged for compatibility.
    """
class VnfrRecordExistsError(Exception):
    """ A VNFR record with the same VNFR id already exists """
class InternalVirtualLinkRecordError(Exception):
    """ Error while handling an internal virtual link record """
class VDUImageNotFound(Exception):
    """ The image for a VDU could not be found """
class VirtualDeploymentUnitRecordError(Exception):
    """ Instantiation of a virtual deployment unit (VDU) failed """
class VMNotReadyError(Exception):
    """ The VM has not yet been received from the resource manager """
class VDURecordNotFound(Exception):
    """ No VDU record matches the requested VDU """
class VirtualNetworkFunctionRecordDescNotFound(Exception):
    """ The Virtual Network Function Record Descriptor cannot be found """
class VirtualNetworkFunctionDescriptorError(Exception):
    """ Error related to a Virtual Network Function Record Descriptor """
class VirtualNetworkFunctionDescriptorNotFound(Exception):
    """ The Virtual Network Function Record Descriptor was not found """
class VirtualNetworkFunctionRecordNotFound(Exception):
    """ The Virtual Network Function Record was not found """
class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
    """ A reference count still exists for the Virtual Network Function Descriptor """
class VnfrInstantiationFailed(Exception):
    """ Virtual Network Function instantiation failed """
136 class VNFMPlacementGroupError(Exception):
139 class VirtualNetworkFunctionRecordState(enum
.Enum
):
146 VL_TERMINATE_PHASE
= 6
147 VDU_TERMINATE_PHASE
= 7
152 class VDURecordState(enum
.Enum
):
153 """VDU record state """
156 RESOURCE_ALLOC_PENDING
= 3
163 class VcsComponent(object):
164 """ VCS Component within the VNF descriptor """
165 def __init__(self
, dts
, log
, loop
, cluster_name
, vcs_handler
, component
, mangled_name
):
169 self
._component
= component
170 self
._cluster
_name
= cluster_name
171 self
._vcs
_handler
= vcs_handler
172 self
._mangled
_name
= mangled_name
def mangle_name(component_name, vnf_name, vnfd_id):
    """ Build the mangled component name

    The parts are joined with ':' in the order VNF name, component name,
    VNFD id.
    """
    parts = (vnf_name, component_name, vnfd_id)
    return ":".join(parts)
181 """ name of this component"""
182 return self
._mangled
_name
186 """ The path for this object """
187 return("D,/rw-manifest:manifest" +
188 "/rw-manifest:operational-inventory" +
189 "/rw-manifest:component" +
190 "[rw-manifest:component-name = '{}']").format(self
.name
)
193 def instance_xpath(self
):
194 """ The path for this object """
195 return("D,/rw-base:vcs" +
198 "[instance-name = '{}']".format(self
._cluster
_name
))
def start_comp_xpath(self):
    """ Xpath of the 'START-REQ' child-n entry below this component's instance xpath """
    child_selector = "/child-n[instance-name = 'START-REQ']"
    return self.instance_xpath + child_selector
206 def get_start_comp_msg(self
, ip_address
):
207 """ start this component """
208 start_msg
= RwBaseYang
.VcsInstance_Instance_ChildN()
209 start_msg
.instance_name
= 'START-REQ'
210 start_msg
.component_name
= self
.name
211 start_msg
.admin_command
= "START"
212 start_msg
.ip_address
= ip_address
218 """ Returns the message for this vcs component"""
220 vcs_comp_dict
= self
._component
.as_dict()
222 def mangle_comp_names(comp_dict
):
223 """ mangle component name with VNF name, id"""
224 for key
, val
in comp_dict
.items():
225 if isinstance(val
, dict):
226 comp_dict
[key
] = mangle_comp_names(val
)
227 elif isinstance(val
, list):
230 if isinstance(ent
, dict):
231 val
[i
] = mangle_comp_names(ent
)
235 elif key
== "component_name":
236 comp_dict
[key
] = VcsComponent
.mangle_name(val
,
241 mangled_dict
= mangle_comp_names(vcs_comp_dict
)
242 msg
= RwManifestYang
.OpInventory_Component
.from_dict(mangled_dict
)
def publish(self, xact):
    """ Publish this VCS component through the VCS handler

    Logs the component name, path and message, then delegates to the
    handler's publish() within the given transaction.
    """
    log_debug = self._log.debug
    log_debug("Publishing the VcsComponent %s, path = %s comp = %s",
              self.name, self.path, self.msg)
    handler = self._vcs_handler
    yield from handler.publish(xact, self.path, self.msg)
253 def start(self
, xact
, parent
, ip_addr
=None):
254 """ Starts this VCS component """
255 # ATTN RV - replace with block add
256 start_msg
= self
.get_start_comp_msg(ip_addr
)
257 self
._log
.debug("starting component %s %s",
258 self
.start_comp_xpath
, start_msg
)
259 yield from self
._dts
.query_create(self
.start_comp_xpath
,
262 self
._log
.debug("started component %s, %s",
263 self
.start_comp_xpath
, start_msg
)
266 class VirtualDeploymentUnitRecord(object):
267 """ Virtual Deployment Unit Record """
279 placement_groups
=[]):
285 self
._mgmt
_intf
= mgmt_intf
286 self
._cloud
_account
_name
= cloud_account_name
287 self
._vnfd
_package
_store
= vnfd_package_store
288 self
._mgmt
_network
= mgmt_network
290 self
._vdur
_id
= vdur_id
or str(uuid
.uuid4())
293 self
._state
= VDURecordState
.INIT
294 self
._state
_failed
_reason
= None
295 self
._request
_id
= str(uuid
.uuid4())
296 self
._name
= vnfr
.name
+ "__" + vdud
.id
297 self
._placement
_groups
= placement_groups
300 self
._vdud
_cloud
_init
= None
301 self
._vdur
_console
_handler
= VnfrConsoleOperdataDtsHandler(dts
, log
, loop
, self
._vnfr
._vnfm
, self
._vnfr
.vnfr_id
, self
._vdur
_id
,self
.vdu_id
)
def vdu_opdata_register(self):
    """ Register the VDUR console operational-data handler """
    console_handler = self._vdur_console_handler
    yield from console_handler.register()
307 def cp_ip_addr(self
, cp_name
):
308 """ Find ip address by connection point name """
309 if self
._vm
_resp
is not None:
310 for conn_point
in self
._vm
_resp
.connection_points
:
311 if conn_point
.name
== cp_name
:
312 return conn_point
.ip_address
def cp_mac_addr(self, cp_name):
    """ Look up the MAC address of the connection point named cp_name

    Falls back to the all-zero MAC address when there is no VM response
    yet or no connection point carries that name.
    """
    fallback_mac = "00:00:00:00:00:00"
    if self._vm_resp is None:
        return fallback_mac

    for point in self._vm_resp.connection_points:
        if point.name == cp_name:
            return point.mac_addr
    return fallback_mac
323 def cp_id(self
, cp_name
):
324 """ Find connection point id by connection point name """
325 if self
._vm
_resp
is not None:
326 for conn_point
in self
._vm
_resp
.connection_points
:
327 if conn_point
.name
== cp_name
:
328 return conn_point
.connection_point_id
341 """ Return this VDUR's name """
def cloud_account_name(self):
    """ Name of the cloud account in which this VDU should be created """
    account_name = self._cloud_account_name
    return account_name
350 def image_name(self
):
351 """ name that should be used to lookup the image on the CMP """
352 if 'image' not in self
._vdud
:
354 return os
.path
.basename(self
._vdud
.image
)
def image_checksum(self):
    """ Image checksum from the VDU descriptor

    Returns None when the descriptor does not have the "image_checksum"
    field set.
    """
    if not self._vdud.has_field("image_checksum"):
        return None
    return self._vdud.image_checksum
362 def management_ip(self
):
365 return self
._vm
_resp
.public_ip
if self
._vm
_resp
.has_field('public_ip') else self
._vm
_resp
.management_ip
368 def vm_management_ip(self
):
371 return self
._vm
_resp
.management_ip
374 def operational_status(self
):
375 """ Operational status of this VDU"""
376 op_stats_dict
= {"INIT": "init",
377 "INSTANTIATING": "vm_init_phase",
378 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
381 "TERMINATING": "terminated",
382 "TERMINATED": "terminated",
384 return op_stats_dict
[self
._state
.name
]
388 """ Process VDU message from resmgr"""
389 vdu_fields
= ["vm_flavor",
396 vdu_copy_dict
= {k
: v
for k
, v
in
397 self
._vdud
.as_dict().items() if k
in vdu_fields
}
398 vdur_dict
= {"id": self
._vdur
_id
,
399 "vdu_id_ref": self
._vdud
.id,
400 "operational_status": self
.operational_status
,
401 "operational_status_details": self
._state
_failed
_reason
,
403 if self
.vm_resp
is not None:
404 vdur_dict
.update({"vim_id": self
.vm_resp
.vdu_id
,
405 "flavor_id": self
.vm_resp
.flavor_id
407 if self
._vm
_resp
.has_field('image_id'):
408 vdur_dict
.update({ "image_id": self
.vm_resp
.image_id
})
410 if self
.management_ip
is not None:
411 vdur_dict
["management_ip"] = self
.management_ip
413 if self
.vm_management_ip
is not None:
414 vdur_dict
["vm_management_ip"] = self
.vm_management_ip
416 vdur_dict
.update(vdu_copy_dict
)
418 if self
.vm_resp
is not None:
419 if self
._vm
_resp
.has_field('volumes'):
420 for opvolume
in self
._vm
_resp
.volumes
:
421 vdurvol_data
= [vduvol
for vduvol
in vdur_dict
['volumes'] if vduvol
['name'] == opvolume
.name
]
422 if len(vdurvol_data
) == 1:
423 vdurvol_data
[0]["volume_id"] = opvolume
.volume_id
424 if opvolume
.has_field('custom_meta_data'):
425 metadata_list
= list()
426 for metadata_item
in opvolume
.custom_meta_data
:
427 metadata_list
.append(metadata_item
.as_dict())
428 vdurvol_data
[0]['custom_meta_data'] = metadata_list
430 if self
._vm
_resp
.has_field('supplemental_boot_data'):
431 vdur_dict
['supplemental_boot_data'] = dict()
432 if self
._vm
_resp
.supplemental_boot_data
.has_field('boot_data_drive'):
433 vdur_dict
['supplemental_boot_data']['boot_data_drive'] = self
._vm
_resp
.supplemental_boot_data
.boot_data_drive
434 if self
._vm
_resp
.supplemental_boot_data
.has_field('custom_meta_data'):
435 metadata_list
= list()
436 for metadata_item
in self
._vm
_resp
.supplemental_boot_data
.custom_meta_data
:
437 metadata_list
.append(metadata_item
.as_dict())
438 vdur_dict
['supplemental_boot_data']['custom_meta_data'] = metadata_list
439 if self
._vm
_resp
.supplemental_boot_data
.has_field('config_file'):
441 for file_item
in self
._vm
_resp
.supplemental_boot_data
.config_file
:
442 file_list
.append(file_item
.as_dict())
443 vdur_dict
['supplemental_boot_data']['config_file'] = file_list
448 for intf
, cp_id
, vlr
in self
._int
_intf
:
449 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
451 icp_list
.append({"name": cp
.name
,
453 "type_yang": "VPORT",
454 "ip_address": self
.cp_ip_addr(cp
.id),
455 "mac_address": self
.cp_mac_addr(cp
.id)})
457 ii_list
.append({"name": intf
.name
,
458 "vdur_internal_connection_point_ref": cp
.id,
459 "virtual_interface": {}})
461 vdur_dict
["internal_connection_point"] = icp_list
462 self
._log
.debug("internal_connection_point:%s", vdur_dict
["internal_connection_point"])
463 vdur_dict
["internal_interface"] = ii_list
466 for intf
, cp
, vlr
in self
._ext
_intf
:
467 ei_list
.append({"name": cp
.name
,
468 "vnfd_connection_point_ref": cp
.name
,
469 "virtual_interface": {}})
470 self
._vnfr
.update_cp(cp
.name
,
471 self
.cp_ip_addr(cp
.name
),
472 self
.cp_mac_addr(cp
.name
),
475 vdur_dict
["external_interface"] = ei_list
477 placement_groups
= []
478 for group
in self
._placement
_groups
:
479 placement_groups
.append(group
.as_dict())
480 vdur_dict
['placement_groups_info'] = placement_groups
482 return RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur
.from_dict(vdur_dict
)
485 def resmgr_path(self
):
486 """ path for resource-mgr"""
487 return ("D,/rw-resource-mgr:resource-mgmt" +
489 "/vdu-event-data[event-id='{}']".format(self
._request
_id
))
492 def vm_flavor_msg(self
):
493 """ VM flavor message """
494 flavor
= self
._vdud
.vm_flavor
.__class
__()
495 flavor
.copy_from(self
._vdud
.vm_flavor
)
def vdud_cloud_init(self):
    """ Cloud-init contents for this VDU

    The value is obtained from cloud_init() on first use and kept in
    self._vdud_cloud_init for later calls.
    """
    if self._vdud_cloud_init is not None:
        return self._vdud_cloud_init

    self._vdud_cloud_init = self.cloud_init()
    return self._vdud_cloud_init
507 def cloud_init(self
):
508 """ Populate cloud_init with cloud-config script from
509 either the inline contents or from the file provided
511 if self
._vdud
.cloud_init
is not None:
512 self
._log
.debug("cloud_init script provided inline %s", self
._vdud
.cloud_init
)
513 return self
._vdud
.cloud_init
514 elif self
._vdud
.cloud_init_file
is not None:
515 # Get cloud-init script contents from the file provided in the cloud_init_file param
516 self
._log
.debug("cloud_init script provided in file %s", self
._vdud
.cloud_init_file
)
517 filename
= self
._vdud
.cloud_init_file
518 self
._vnfd
_package
_store
.refresh()
519 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
520 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
522 return cloud_init_extractor
.read_script(stored_package
, filename
)
523 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
524 raise VirtualDeploymentUnitRecordError(e
)
526 self
._log
.debug("VDU Instantiation: cloud-init script not provided")
528 def process_openstack_placement_group_construct(self
, vm_create_msg_dict
):
530 availability_zones
= []
532 for group
in self
._placement
_groups
:
533 if group
.has_field('host_aggregate'):
534 for aggregate
in group
.host_aggregate
:
535 host_aggregates
.append(aggregate
.as_dict())
536 if group
.has_field('availability_zone'):
537 availability_zones
.append(group
.availability_zone
.as_dict())
538 if group
.has_field('server_group'):
539 server_groups
.append(group
.server_group
.as_dict())
541 if availability_zones
:
542 if len(availability_zones
) > 1:
543 self
._log
.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self
.name
, availability_zones
)
544 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self
.name
, availability_zones
))
546 vm_create_msg_dict
['availability_zone'] = availability_zones
[0]
549 if len(server_groups
) > 1:
550 self
._log
.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self
.name
, server_groups
)
551 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self
.name
, server_groups
))
553 vm_create_msg_dict
['server_group'] = server_groups
[0]
556 vm_create_msg_dict
['host_aggregate'] = host_aggregates
560 def process_placement_groups(self
, vm_create_msg_dict
):
561 """Process the placement_groups and fill resource-mgr request"""
562 if not self
._placement
_groups
:
565 cloud_set
= set([group
.cloud_type
for group
in self
._placement
_groups
])
566 assert len(cloud_set
) == 1
567 cloud_type
= cloud_set
.pop()
569 if cloud_type
== 'openstack':
570 self
.process_openstack_placement_group_construct(vm_create_msg_dict
)
573 self
._log
.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type
)
576 def process_custom_bootdata(self
, vm_create_msg_dict
):
577 """Process the custom boot data"""
578 if 'config_file' not in vm_create_msg_dict
['supplemental_boot_data']:
581 self
._vnfd
_package
_store
.refresh()
582 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
583 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
584 for file_item
in vm_create_msg_dict
['supplemental_boot_data']['config_file']:
585 if 'source' not in file_item
or 'dest' not in file_item
:
587 source
= file_item
['source']
588 # Find source file in scripts dir of VNFD
589 self
._log
.debug("Checking for source config file at %s", source
)
591 source_file_str
= cloud_init_extractor
.read_script(stored_package
, source
)
592 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
593 raise VirtualDeploymentUnitRecordError(e
)
594 # Update source file location with file contents
595 file_item
['source'] = source_file_str
599 def resmgr_msg(self
, config
=None):
600 vdu_fields
= ["vm_flavor",
606 "supplemental_boot_data"]
608 self
._log
.debug("Creating params based on VDUD: %s", self
._vdud
)
609 vdu_copy_dict
= {k
: v
for k
, v
in self
._vdud
.as_dict().items() if k
in vdu_fields
}
611 vm_create_msg_dict
= {
615 if self
.image_name
is not None:
616 vm_create_msg_dict
["image_name"] = self
.image_name
618 if self
.image_checksum
is not None:
619 vm_create_msg_dict
["image_checksum"] = self
.image_checksum
621 vm_create_msg_dict
["allocate_public_address"] = self
._mgmt
_intf
622 if self
._vdud
.has_field('mgmt_vpci'):
623 vm_create_msg_dict
["mgmt_vpci"] = self
._vdud
.mgmt_vpci
625 self
._log
.debug("VDUD: %s", self
._vdud
)
626 if config
is not None:
627 vm_create_msg_dict
['vdu_init'] = {'userdata': config
}
629 if self
._mgmt
_network
:
630 vm_create_msg_dict
['mgmt_network'] = self
._mgmt
_network
633 for intf
, cp
, vlr
in self
._ext
_intf
:
634 cp_info
= {"name": cp
.name
,
635 "virtual_link_id": vlr
.network_id
,
636 "type_yang": intf
.virtual_interface
.type_yang
,
637 "port_security_enabled": cp
.port_security_enabled
}
639 if (intf
.virtual_interface
.has_field('vpci') and
640 intf
.virtual_interface
.vpci
is not None):
641 cp_info
["vpci"] = intf
.virtual_interface
.vpci
643 if (vlr
.has_field('ip_profile_params')) and (vlr
.ip_profile_params
.has_field('security_group')):
644 cp_info
['security_group'] = vlr
.ip_profile_params
.security_group
646 cp_list
.append(cp_info
)
648 for intf
, cp
, vlr
in self
._int
_intf
:
649 if (intf
.virtual_interface
.has_field('vpci') and
650 intf
.virtual_interface
.vpci
is not None):
651 cp_list
.append({"name": cp
,
652 "virtual_link_id": vlr
.network_id
,
653 "type_yang": intf
.virtual_interface
.type_yang
,
654 "vpci": intf
.virtual_interface
.vpci
})
656 cp_list
.append({"name": cp
,
657 "virtual_link_id": vlr
.network_id
,
658 "type_yang": intf
.virtual_interface
.type_yang
,
659 "port_security_enabled": cp
.port_security_enabled
})
661 vm_create_msg_dict
["connection_points"] = cp_list
662 vm_create_msg_dict
.update(vdu_copy_dict
)
664 self
.process_placement_groups(vm_create_msg_dict
)
665 if 'supplemental_boot_data' in vm_create_msg_dict
:
666 self
.process_custom_bootdata(vm_create_msg_dict
)
668 msg
= RwResourceMgrYang
.VDUEventData()
669 msg
.event_id
= self
._request
_id
670 msg
.cloud_account
= self
.cloud_account_name
671 msg
.request_info
.from_dict(vm_create_msg_dict
)
676 def terminate(self
, xact
):
677 """ Delete resource in VIM """
678 if self
._state
!= VDURecordState
.READY
and self
._state
!= VDURecordState
.FAILED
:
679 self
._log
.warning("VDU terminate in not ready state - Ignoring request")
682 self
._state
= VDURecordState
.TERMINATING
683 if self
._vm
_resp
is not None:
685 with self
._dts
.transaction() as new_xact
:
686 yield from self
.delete_resource(new_xact
)
688 self
._log
.exception("Caught exception while deleting VDU %s", self
.vdu_id
)
690 if self
._rm
_regh
is not None:
691 self
._log
.debug("Deregistering resource manager registration handle")
692 self
._rm
_regh
.deregister()
695 if self
._vdur
_console
_handler
is not None:
696 self
._log
.error("Deregistering vnfr vdur registration handle")
697 self
._vdur
_console
_handler
._regh
.deregister()
698 self
._vdur
_console
_handler
._regh
= None
700 self
._state
= VDURecordState
.TERMINATED
702 def find_internal_cp_by_cp_id(self
, cp_id
):
703 """ Find the CP corresponding to the connection point id"""
706 self
._log
.debug("find_internal_cp_by_cp_id(%s) called",
709 for int_cp
in self
._vdud
.internal_connection_point
:
710 self
._log
.debug("Checking for int cp %s in internal connection points",
712 if int_cp
.id == cp_id
:
717 self
._log
.debug("Failed to find cp %s in internal connection points",
719 msg
= "Failed to find cp %s in internal connection points" % cp_id
720 raise VduRecordError(msg
)
722 # return the VLR associated with the connection point
726 def create_resource(self
, xact
, vnfr
, config
=None):
727 """ Request resource from ResourceMgr """
728 def find_cp_by_name(cp_name
):
729 """ Find a connection point by name """
731 self
._log
.debug("find_cp_by_name(%s) called", cp_name
)
732 for ext_cp
in vnfr
._cprs
:
733 self
._log
.debug("Checking ext cp (%s) called", ext_cp
.name
)
734 if ext_cp
.name
== cp_name
:
738 self
._log
.debug("Failed to find cp %s in external connection points",
742 def find_internal_vlr_by_cp_name(cp_name
):
743 """ Find the VLR corresponding to the connection point name"""
746 self
._log
.debug("find_internal_vlr_by_cp_name(%s) called",
749 for int_cp
in self
._vdud
.internal_connection_point
:
750 self
._log
.debug("Checking for int cp %s in internal connection points",
752 if int_cp
.id == cp_name
:
757 self
._log
.debug("Failed to find cp %s in internal connection points",
759 msg
= "Failed to find cp %s in internal connection points" % cp_name
760 raise VduRecordError(msg
)
762 # return the VLR associated with the connection point
763 return vnfr
.find_vlr_by_cp(cp_name
)
765 block
= xact
.block_create()
767 self
._log
.debug("Executing vm request id: %s, action: create",
770 # Resolve the networks associated external interfaces
771 for ext_intf
in self
._vdud
.external_interface
:
772 self
._log
.debug("Resolving external interface name [%s], cp[%s]",
773 ext_intf
.name
, ext_intf
.vnfd_connection_point_ref
)
774 cp
= find_cp_by_name(ext_intf
.vnfd_connection_point_ref
)
776 self
._log
.debug("Failed to find connection point - %s",
777 ext_intf
.vnfd_connection_point_ref
)
779 self
._log
.debug("Connection point name [%s], type[%s]",
780 cp
.name
, cp
.type_yang
)
782 vlr
= vnfr
.ext_vlr_by_id(cp
.vlr_ref
)
784 etuple
= (ext_intf
, cp
, vlr
)
785 self
._ext
_intf
.append(etuple
)
787 self
._log
.debug("Created external interface tuple : %s", etuple
)
789 # Resolve the networks associated internal interfaces
790 for intf
in self
._vdud
.internal_interface
:
791 cp_id
= intf
.vdu_internal_connection_point_ref
792 self
._log
.debug("Resolving internal interface name [%s], cp[%s]",
796 vlr
= find_internal_vlr_by_cp_name(cp_id
)
797 except Exception as e
:
798 self
._log
.debug("Failed to find cp %s in internal VLR list", cp_id
)
799 msg
= "Failed to find cp %s in internal VLR list, e = %s" % (cp_id
, e
)
800 raise VduRecordError(msg
)
802 ituple
= (intf
, cp_id
, vlr
)
803 self
._int
_intf
.append(ituple
)
805 self
._log
.debug("Created internal interface tuple : %s", ituple
)
807 resmgr_path
= self
.resmgr_path
808 resmgr_msg
= self
.resmgr_msg(config
)
810 self
._log
.debug("Creating new VM request at: %s, params: %s", resmgr_path
, resmgr_msg
)
811 block
.add_query_create(resmgr_path
, resmgr_msg
)
813 res_iter
= yield from block
.execute(now
=True)
821 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
822 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
823 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
824 return resp
.resource_info
827 def delete_resource(self
, xact
):
828 block
= xact
.block_create()
830 self
._log
.debug("Executing vm request id: %s, action: delete",
833 block
.add_query_delete(self
.resmgr_path
)
835 yield from block
.execute(flags
=0, now
=True)
838 def read_resource(self
, xact
):
839 block
= xact
.block_create()
841 self
._log
.debug("Executing vm request id: %s, action: delete",
844 block
.add_query_read(self
.resmgr_path
)
846 res_iter
= yield from block
.execute(flags
=0, now
=True)
851 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
852 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
853 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
854 #self._vm_resp = resp.resource_info
855 return resp
.resource_info
859 def start_component(self
):
860 """ This VDUR is active """
861 self
._log
.debug("Starting component %s for vdud %s vdur %s",
862 self
._vdud
.vcs_component_ref
,
865 yield from self
._vnfr
.start_component(self
._vdud
.vcs_component_ref
,
866 self
.vm_resp
.management_ip
)
870 """ Is this VDU active """
871 return True if self
._state
is VDURecordState
.READY
else False
def instantiation_failed(self, failed_reason=None):
    """ Record that instantiation of this VDU failed

    Moves the VDU to the FAILED state, stores the failure reason and then
    reports the failure to the owning VNFR.
    """
    self._log.debug("VDU %s instantiation failed ", self._vdur_id)
    self._state, self._state_failed_reason = VDURecordState.FAILED, failed_reason
    owning_vnfr = self._vnfr
    yield from owning_vnfr.instantiation_failed(failed_reason)
882 def vdu_is_active(self
):
883 """ This VDU is active"""
885 self
._log
.warning("VDU %s was already marked as active", self
._vdur
_id
)
888 self
._log
.debug("VDUR id %s in VNFR %s is active", self
._vdur
_id
, self
._vnfr
.vnfr_id
)
890 if self
._vdud
.vcs_component_ref
is not None:
891 yield from self
.start_component()
893 self
._state
= VDURecordState
.READY
895 if self
._vnfr
.all_vdus_active():
896 self
._log
.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self
._vnfr
)
897 yield from self
._vnfr
.is_ready()
900 def instantiate(self
, xact
, vnfr
, config
=None):
901 """ Instantiate this VDU """
902 self
._state
= VDURecordState
.INSTANTIATING
905 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
906 """ This VDUR is active """
907 self
._log
.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
912 if (query_action
== rwdts
.QueryAction
.UPDATE
or
913 query_action
== rwdts
.QueryAction
.CREATE
):
916 if msg
.resource_state
== "active":
917 # Move this VDU to ready state
918 yield from self
.vdu_is_active()
919 elif msg
.resource_state
== "failed":
920 yield from self
.instantiation_failed(msg
.resource_errors
)
921 elif query_action
== rwdts
.QueryAction
.DELETE
:
922 self
._log
.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
924 raise NotImplementedError(
925 "%s action on VirtualDeployementUnitRecord not supported",
928 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
931 reg_event
= asyncio
.Event(loop
=self
._loop
)
934 def on_ready(regh
, status
):
937 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
, on_ready
=on_ready
)
938 self
._rm
_regh
= yield from self
._dts
.register(self
.resmgr_path
+ '/resource-info',
939 flags
=rwdts
.Flag
.SUBSCRIBER
,
941 yield from reg_event
.wait()
943 vm_resp
= yield from self
.create_resource(xact
, vnfr
, config
)
944 self
._vm
_resp
= vm_resp
946 self
._state
= VDURecordState
.RESOURCE_ALLOC_PENDING
947 self
._log
.debug("Requested VM from resource manager response %s",
949 if vm_resp
.resource_state
== "active":
950 self
._log
.debug("Resourcemgr responded wih an active vm resp %s",
952 yield from self
.vdu_is_active()
953 self
._state
= VDURecordState
.READY
954 elif (vm_resp
.resource_state
== "pending" or
955 vm_resp
.resource_state
== "inactive"):
956 self
._log
.debug("Resourcemgr responded wih a pending vm resp %s",
958 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
959 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
960 # flags=rwdts.Flag.SUBSCRIBER,
963 self
._log
.debug("Resourcemgr responded wih an error vm resp %s",
965 raise VirtualDeploymentUnitRecordError(
966 "Failed VDUR instantiation %s " % vm_resp
)
968 except Exception as e
:
970 traceback
.print_exc()
971 self
._log
.exception(e
)
972 self
._log
.error("Instantiation of VDU record failed: %s", str(e
))
973 self
._state
= VDURecordState
.FAILED
974 yield from self
.instantiation_failed(str(e
))
977 class VlRecordState(enum
.Enum
):
978 """ VL Record State """
980 INSTANTIATION_PENDING
= 102
982 TERMINATE_PENDING
= 104
987 class InternalVirtualLinkRecord(object):
988 """ Internal Virtual Link record """
989 def __init__(self
, dts
, log
, loop
, ivld_msg
, vnfr_name
, cloud_account_name
, ip_profile
=None):
993 self
._ivld
_msg
= ivld_msg
994 self
._vnfr
_name
= vnfr_name
995 self
._cloud
_account
_name
= cloud_account_name
996 self
._ip
_profile
= ip_profile
998 self
._vlr
_req
= self
.create_vlr()
1000 self
._state
= VlRecordState
.INIT
1004 """ Find VLR by id """
1005 return self
._vlr
_req
.id
1009 """ Name of this VL """
1010 if self
._ivld
_msg
.vim_network_name
:
1011 return self
._ivld
_msg
.vim_network_name
1013 return self
._vnfr
_name
+ "." + self
._ivld
_msg
.name
def network_id(self):
    """ Network id of the VLR held in self._vlr, or None when self._vlr is falsy """
    if not self._vlr:
        return None
    return self._vlr.network_id
1021 """ VLR path for this VLR instance"""
1022 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self
.vlr_id
)
1024 def create_vlr(self
):
1025 """ Create the VLR record which will be instantiated """
1027 vld_fields
= ["short_name",
1035 vld_copy_dict
= {k
: v
for k
, v
in self
._ivld
_msg
.as_dict().items() if k
in vld_fields
}
1037 vlr_dict
= {"id": str(uuid
.uuid4()),
1039 "cloud_account": self
._cloud
_account
_name
,
1042 if self
._ip
_profile
and self
._ip
_profile
.has_field('ip_profile_params'):
1043 vlr_dict
['ip_profile_params' ] = self
._ip
_profile
.ip_profile_params
.as_dict()
1045 vlr_dict
.update(vld_copy_dict
)
1047 vlr
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict(vlr_dict
)
1051 def instantiate(self
, xact
, restart_mode
=False):
1052 """ Instantiate VL """
1055 def instantiate_vlr():
1056 """ Instantiate VLR"""
1057 self
._log
.debug("Create VL with xpath %s and vlr %s",
1058 self
.vlr_path(), self
._vlr
_req
)
1060 with self
._dts
.transaction(flags
=0) as xact
:
1061 block
= xact
.block_create()
1062 block
.add_query_create(xpath
=self
.vlr_path(), msg
=self
._vlr
_req
)
1063 self
._log
.debug("Executing VL create path:%s msg:%s",
1064 self
.vlr_path(), self
._vlr
_req
)
1068 res_iter
= yield from block
.execute()
1070 self
._state
= VlRecordState
.FAILED
1071 self
._log
.exception("Caught exception while instantial VL")
1074 for ent
in res_iter
:
1075 res
= yield from ent
1076 self
._vlr
= res
.result
1078 if self
._vlr
.operational_status
== 'failed':
1079 self
._log
.debug("VL creation failed for vlr id %s", self
._vlr
.id)
1080 self
._state
= VlRecordState
.FAILED
1081 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self
._vlr
.id))
1083 self
._log
.info("Created VL with xpath %s and vlr %s",
1084 self
.vlr_path(), self
._vlr
)
1088 """ Get the network id """
1089 res_iter
= yield from self
._dts
.query_read(self
.vlr_path(), rwdts
.XactFlag
.MERGE
)
1091 for ent
in res_iter
:
1092 res
= yield from ent
1096 err
= "Failed to get VLR for path %s" % self
.vlr_path()
1098 raise InternalVirtualLinkRecordError(err
)
1101 self
._state
= VlRecordState
.INSTANTIATION_PENDING
1104 vl
= yield from get_vlr()
1106 yield from instantiate_vlr()
1108 yield from instantiate_vlr()
1110 self
._state
= VlRecordState
.ACTIVE
1112 def vlr_in_vns(self
):
1113 """ Is there a VLR record in VNS """
1114 if (self
._state
== VlRecordState
.ACTIVE
or
1115 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
1116 self
._state
== VlRecordState
.FAILED
):
1122 def terminate(self
, xact
):
1123 """Terminate this VL """
1124 if not self
.vlr_in_vns():
1125 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1126 self
.vlr_id
, self
._state
)
1129 self
._log
.debug("Terminating VL with path %s", self
.vlr_path())
1130 self
._state
= VlRecordState
.TERMINATE_PENDING
1131 block
= xact
.block_create()
1132 block
.add_query_delete(self
.vlr_path())
1133 yield from block
.execute(flags
=0, now
=True)
1134 self
._state
= VlRecordState
.TERMINATED
1135 self
._log
.debug("Terminated VL with path %s", self
.vlr_path())
1138 class VirtualNetworkFunctionRecord(object):
1139 """ Virtual Network Function Record """
1140 def __init__(self
, dts
, log
, loop
, cluster_name
, vnfm
, vcs_handler
, vnfr_msg
, mgmt_network
=None):
1144 self
._cluster
_name
= cluster_name
1145 self
._vnfr
_msg
= vnfr_msg
1146 self
._vnfr
_id
= vnfr_msg
.id
1147 self
._vnfd
_id
= vnfr_msg
.vnfd
.id
1149 self
._vcs
_handler
= vcs_handler
1150 self
._vnfr
= vnfr_msg
1151 self
._mgmt
_network
= mgmt_network
1153 self
._vnfd
= vnfr_msg
.vnfd
1154 self
._state
= VirtualNetworkFunctionRecordState
.INIT
1155 self
._state
_failed
_reason
= None
1156 self
._ext
_vlrs
= {} # The list of external virtual links
1157 self
._vlrs
= [] # The list of internal virtual links
1158 self
._vdus
= [] # The list of vdu
1159 self
._vlr
_by
_cp
= {}
1161 self
._inventory
= {}
1162 self
._create
_time
= int(time
.time())
1163 self
._vnf
_mon
= None
1164 self
._config
_status
= vnfr_msg
.config_status
1165 self
._vnfd
_package
_store
= rift
.package
.store
.VnfdPackageFilesystemStore(self
._log
)
1166 self
._rw
_vnfd
= None
1167 self
._vnfd
_ref
_count
= 0
def _get_vdur_from_vdu_id(self, vdu_id):
    """ Find the VDU record for the given VDU id

    Arguments:
        vdu_id - id compared against each record's ``vdu_id`` attribute

    Returns:
        The first entry of ``self._vdus`` whose ``vdu_id`` equals *vdu_id*.

    Raises:
        VDURecordNotFound - when no entry matches
    """
    self._log.debug("Finding vdur for vdu_id %s", vdu_id)
    self._log.debug("Searching through vdus: %s", self._vdus)
    for vdu in self._vdus:
        self._log.debug("vdu_id: %s", vdu.vdu_id)
        if vdu.vdu_id == vdu_id:
            return vdu

    # Exceptions do not interpolate logging-style arguments, so format the
    # message here; otherwise str(exc) shows the raw "%s" template.
    raise VDURecordNotFound("Could not find vdu record from id: %s" % vdu_id)
1180 def operational_status(self
):
1181 """ Operational status of this VNFR """
1182 op_status_map
= {"INIT": "init",
1183 "VL_INIT_PHASE": "vl_init_phase",
1184 "VM_INIT_PHASE": "vm_init_phase",
1186 "TERMINATE": "terminate",
1187 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1188 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1189 "TERMINATED": "terminated",
1190 "FAILED": "failed", }
1191 return op_status_map
[self
._state
.name
]
def vnfd_xpath(vnfd_id):
    """ VNFD xpath associated with this VNFR

    Arguments:
        vnfd_id - id substituted into the ``vnfd:id`` key of the xpath

    Returns:
        The vnfd-catalog xpath string for *vnfd_id*.
    """
    xpath_template = "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']"
    return xpath_template.format(vnfd_id)
def vnfd_ref_count(self):
    """ Current VNFD reference count held by this VNFR

    Returns:
        The value of the internal ``_vnfd_ref_count`` counter.
    """
    ref_count = self._vnfd_ref_count
    return ref_count
def vnfd_in_use(self):
    """ Returns whether vnfd is in use or not

    Returns:
        True when the VNFD reference count is greater than zero,
        False otherwise.
    """
    # The comparison already yields a bool; no conditional expression needed.
    return self._vnfd_ref_count > 0
1208 """ Take a reference on this object """
1209 self
._vnfd
_ref
_count
+= 1
1210 return self
._vnfd
_ref
_count
def vnfd_unref(self):
    """ Release reference on this object

    Returns:
        The VNFD reference count after it has been decremented.

    Raises:
        VnfRecordError - when the count is already below 1; the failure is
        also logged at critical level.
    """
    current = self._vnfd_ref_count
    if current < 1:
        msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
               (self.vnfd.id, current))
        self._log.critical(msg)
        raise VnfRecordError(msg)

    # Log the count as it stands before the decrement.
    self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
                    self.vnfd.id, current)
    self._vnfd_ref_count = current - 1
    return self._vnfd_ref_count
1226 """ VNFD for this VNFR """
1231 """ VNFD name associated with this VNFR """
1232 return self
.vnfd
.name
1236 """ Name of this VNF in the record """
1237 return self
._vnfr
.name
def cloud_account_name(self):
    """ Cloud account in which this VNFR is instantiated

    Returns:
        The ``cloud_account`` field of the stored VNFR message.
    """
    vnfr_record = self._vnfr
    return vnfr_record.cloud_account
1246 """ VNFD Id associated with this VNFR """
1251 """ VNFR Id associated with this VNFR """
1252 return self
._vnfr
_id
def member_vnf_index(self):
    """ Member VNF index of this VNFR

    Returns:
        The ``member_vnf_index_ref`` field of the stored VNFR message.
    """
    vnfr_record = self._vnfr
    return vnfr_record.member_vnf_index_ref
def config_status(self):
    """ Config agent status recorded for this VNFR

    Returns:
        The value held in the internal ``_config_status`` attribute.
    """
    status = self._config_status
    return status
1264 def component_by_name(self
, component_name
):
1265 """ Find a component by name in the inventory list"""
1266 mangled_name
= VcsComponent
.mangle_name(component_name
,
1269 return self
._inventory
[mangled_name
]
1274 def get_nsr_config(self
):
1275 ### Need access to NS instance configuration for runtime resolution.
1276 ### This shall be replaced when deployment flavors are implemented
1277 xpath
= "C,/nsr:ns-instance-config"
1278 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
1280 for result
in results
:
1281 entry
= yield from result
1282 ns_instance_config
= entry
.result
1283 for nsr
in ns_instance_config
.nsr
:
1284 if nsr
.id == self
._vnfr
_msg
.nsr_id_ref
:
def start_component(self, component_name, ip_addr):
    """ Start the named component of this VNFR

    Arguments:
        component_name - name passed to ``component_by_name`` for lookup
        ip_addr - forwarded as the third argument of the component's
                  ``start``; the first two arguments are passed as None
    """
    component = self.component_by_name(component_name)
    yield from component.start(None, None, ip_addr)
1294 def cp_ip_addr(self
, cp_name
):
1295 """ Get ip address for connection point """
1296 self
._log
.debug("cp_ip_addr()")
1297 for cp
in self
._cprs
:
1298 if cp
.name
== cp_name
and cp
.ip_address
is not None:
1299 return cp
.ip_address
1302 def mgmt_intf_info(self
):
1303 """ Get Management interface info for this VNFR """
1304 mgmt_intf_desc
= self
.vnfd
.mgmt_interface
1306 if mgmt_intf_desc
.has_field("cp"):
1307 ip_addr
= self
.cp_ip_addr(mgmt_intf_desc
.cp
)
1308 elif mgmt_intf_desc
.has_field("vdu_id"):
1310 vdur
= self
._get
_vdur
_from
_vdu
_id
(mgmt_intf_desc
.vdu_id
)
1311 ip_addr
= vdur
.management_ip
1312 except VDURecordNotFound
:
1313 self
._log
.debug("Did not find mgmt interface for vnfr id %s", self
._vnfr
_id
)
1316 ip_addr
= mgmt_intf_desc
.ip_address
1317 port
= mgmt_intf_desc
.port
1319 return ip_addr
, port
1323 """ Message associated with this VNFR """
1324 vnfd_fields
= ["short_name", "vendor", "description", "version"]
1325 vnfd_copy_dict
= {k
: v
for k
, v
in self
.vnfd
.as_dict().items() if k
in vnfd_fields
}
1327 mgmt_intf
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1328 ip_address
, port
= self
.mgmt_intf_info()
1330 if ip_address
is not None:
1331 mgmt_intf
.ip_address
= ip_address
1332 if port
is not None:
1333 mgmt_intf
.port
= port
1335 vnfr_dict
= {"id": self
._vnfr
_id
,
1336 "nsr_id_ref": self
._vnfr
_msg
.nsr_id_ref
,
1338 "member_vnf_index_ref": self
.member_vnf_index
,
1339 "operational_status": self
.operational_status
,
1340 "operational_status_details": self
._state
_failed
_reason
,
1341 "cloud_account": self
.cloud_account_name
,
1342 "config_status": self
._config
_status
1345 vnfr_dict
.update(vnfd_copy_dict
)
1347 vnfr_msg
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
1348 vnfr_msg
.vnfd
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd
.from_dict(self
.vnfd
.as_dict())
1350 vnfr_msg
.create_time
= self
._create
_time
1351 vnfr_msg
.uptime
= int(time
.time()) - self
._create
_time
1352 vnfr_msg
.mgmt_interface
= mgmt_intf
1354 # Add all the VLRs to VNFR
1355 for vlr
in self
._vlrs
:
1356 ivlr
= vnfr_msg
.internal_vlr
.add()
1357 ivlr
.vlr_ref
= vlr
.vlr_id
1359 # Add all the VDURs to VDUR
1360 if self
._vdus
is not None:
1361 for vdu
in self
._vdus
:
1362 vdur
= vnfr_msg
.vdur
.add()
1363 vdur
.from_dict(vdu
.msg
.as_dict())
1365 if self
.vnfd
.mgmt_interface
.has_field('dashboard_params'):
1366 vnfr_msg
.dashboard_url
= self
.dashboard_url
1368 for cpr
in self
._cprs
:
1369 new_cp
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr
.as_dict())
1370 vnfr_msg
.connection_point
.append(new_cp
)
1372 if self
._vnf
_mon
is not None:
1373 for monp
in self
._vnf
_mon
.msg
:
1374 vnfr_msg
.monitoring_param
.append(
1375 VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam
.from_dict(monp
.as_dict()))
1377 if self
._vnfr
.vnf_configuration
is not None:
1378 vnfr_msg
.vnf_configuration
.from_dict(self
._vnfr
.vnf_configuration
.as_dict())
1379 if (ip_address
is not None and
1380 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
is None):
1381 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
= ip_address
1383 for group
in self
._vnfr
_msg
.placement_groups_info
:
1384 group_info
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1385 group_info
.from_dict(group
.as_dict())
1386 vnfr_msg
.placement_groups_info
.append(group_info
)
1391 def dashboard_url(self
):
1392 ip
, cfg_port
= self
.mgmt_intf_info()
1395 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('https'):
1396 if self
.vnfd
.mgmt_interface
.dashboard_params
.https
is True:
1399 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('port'):
1400 http_port
= self
.vnfd
.mgmt_interface
.dashboard_params
.port
1402 url
= "{protocol}://{ip_address}:{port}/{path}".format(
1406 path
=self
.vnfd
.mgmt_interface
.dashboard_params
.path
.lstrip("/"),
1413 """ path for this VNFR """
1414 return("D,/vnfr:vnfr-catalog"
1415 "/vnfr:vnfr[vnfr:id='{}']".format(self
.vnfr_id
))
1418 def publish(self
, xact
):
1419 """ publish this VNFR """
1421 self
._log
.debug("Publishing VNFR path = [%s], record = [%s]",
1422 self
.xpath
, self
.msg
)
1423 vnfr
.create_time
= self
._create
_time
1424 yield from self
._vnfm
.publish_vnfr(xact
, self
.xpath
, self
.msg
)
1425 self
._log
.debug("Published VNFR path = [%s], record = [%s]",
1426 self
.xpath
, self
.msg
)
def resolve_vld_ip_profile(self, vnfd_msg, vld):
    """ Find the IP profile that a VLD refers to

    Arguments:
        vnfd_msg - descriptor whose ``ip_profiles`` entries are searched
        vld - virtual link descriptor carrying ``ip_profile_ref``

    Returns:
        The first profile in ``vnfd_msg.ip_profiles`` whose ``name`` equals
        ``vld.ip_profile_ref``, or None when the VLD has no
        ``ip_profile_ref`` field or no profile matches.
    """
    self._log.debug("Receieved ip profile ref is %s", vld.ip_profile_ref)
    if vld.has_field('ip_profile_ref'):
        for candidate in vnfd_msg.ip_profiles:
            if candidate.name == vld.ip_profile_ref:
                return candidate
    return None
1436 def create_vls(self
):
1437 """ Publish The VLs associated with this VNF """
1438 self
._log
.debug("Publishing Internal Virtual Links for vnfd id: %s",
1440 for ivld_msg
in self
.vnfd
.internal_vld
:
1441 self
._log
.debug("Creating internal vld:"
1442 " %s, int_cp_ref = %s",
1443 ivld_msg
, ivld_msg
.internal_connection_point
1445 vlr
= InternalVirtualLinkRecord(dts
=self
._dts
,
1449 vnfr_name
=self
.name
,
1450 cloud_account_name
=self
.cloud_account_name
,
1451 ip_profile
=self
.resolve_vld_ip_profile(self
.vnfd
, ivld_msg
)
1453 self
._vlrs
.append(vlr
)
1455 for int_cp
in ivld_msg
.internal_connection_point
:
1456 if int_cp
.id_ref
in self
._vlr
_by
_cp
:
1457 msg
= ("Connection point %s already "
1458 " bound %s" % (int_cp
.id_ref
, self
._vlr
_by
_cp
[int_cp
.id_ref
]))
1459 raise InternalVirtualLinkRecordError(msg
)
1460 self
._log
.debug("Setting vlr %s to internal cp = %s",
1462 self
._vlr
_by
_cp
[int_cp
.id_ref
] = vlr
1465 def instantiate_vls(self
, xact
, restart_mode
=False):
1466 """ Instantiate the VLs associated with this VNF """
1467 self
._log
.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1470 for vlr
in self
._vlrs
:
1471 self
._log
.debug("Instantiating VLR %s", vlr
)
1472 yield from vlr
.instantiate(xact
, restart_mode
)
def find_vlr_by_cp(self, cp_name):
    """ Look up the VLR bound to a connection point name

    Arguments:
        cp_name - key into the internal connection-point-to-VLR mapping

    Returns:
        The VLR stored under *cp_name*; an unknown name raises KeyError
        from the dictionary lookup.
    """
    vlr_map = self._vlr_by_cp
    return vlr_map[cp_name]
def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
    """
    Returns the cloud specific construct for placement group

    Arguments:
        input_group: VNFD PlacementGroup
        nsr_config: Configuration for VNFDGroup MAP in the NSR config

    Returns:
        A VDUR PlacementGroupsInfo message built from the first map entry
        matching both the group name and this VNFR's vnfd id, or None when
        no entry matches.
    """
    # Fields taken from the descriptor group rather than the map entry.
    descriptor_fields = ['name', 'requirement', 'strategy']
    # Map-entry keys that are only used for matching and are not copied.
    match_only_keys = ('placement_group_ref', 'vnfd_id_ref')

    for map_entry in nsr_config.vnfd_placement_group_maps:
        if map_entry.placement_group_ref != input_group.name:
            continue
        if map_entry.vnfd_id_ref != self.vnfd_id:
            continue

        resolved = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
        fields = {}
        for key, value in map_entry.as_dict().items():
            if key not in match_only_keys:
                fields[key] = value
        for field_name in descriptor_fields:
            fields[field_name] = getattr(input_group, field_name)
        resolved.from_dict(fields)
        return resolved

    return None
1500 def get_vdu_placement_groups(self
, vdu
):
1501 placement_groups
= []
1502 ### Step-1: Get VNF level placement groups
1503 for group
in self
._vnfr
_msg
.placement_groups_info
:
1504 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1505 #group_info.from_dict(group.as_dict())
1506 placement_groups
.append(group
)
1508 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1509 nsr_config
= yield from self
.get_nsr_config()
1511 ### Step-3: Get VDU level placement groups
1512 for group
in self
.vnfd
.placement_groups
:
1513 for member_vdu
in group
.member_vdus
:
1514 if member_vdu
.member_vdu_ref
== vdu
.id:
1515 group_info
= self
.resolve_placement_group_cloud_construct(group
,
1517 if group_info
is None:
1518 self
._log
.info("Could not resolve cloud-construct for placement group: %s", group
.name
)
1519 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1521 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1525 self
.member_vnf_index
)
1526 placement_groups
.append(group_info
)
1528 return placement_groups
1531 def create_vdus(self
, vnfr
, restart_mode
=False):
1532 """ Create the VDUs associated with this VNF """
1534 def get_vdur_id(vdud
):
1535 """Get the corresponding VDUR's id for the VDUD. This is useful in
1538 In restart mode we check for exiting VDUR's ID and use them, if
1539 available. This way we don't end up creating duplicate VDURs
1543 if restart_mode
and vdud
is not None:
1545 vdur
= [vdur
.id for vdur
in vnfr
._vnfr
.vdur
if vdur
.vdu_id_ref
== vdud
.id]
1548 self
._log
.error("Unable to find a VDUR for VDUD {}".format(vdud
))
1553 self
._log
.info("Creating VDU's for vnfd id: %s", self
.vnfd_id
)
1554 for vdu
in self
._rw
_vnfd
.vdu
:
1555 self
._log
.debug("Creating vdu: %s", vdu
)
1556 vdur_id
= get_vdur_id(vdu
)
1558 placement_groups
= yield from self
.get_vdu_placement_groups(vdu
)
1559 self
._log
.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1562 self
.member_vnf_index
,
1563 [ group
.name
for group
in placement_groups
])
1565 vdur
= VirtualDeploymentUnitRecord(
1571 mgmt_intf
=self
.has_mgmt_interface(vdu
),
1572 mgmt_network
=self
._mgmt
_network
,
1573 cloud_account_name
=self
.cloud_account_name
,
1574 vnfd_package_store
=self
._vnfd
_package
_store
,
1576 placement_groups
= placement_groups
,
1578 yield from vdur
.vdu_opdata_register()
1580 self
._vdus
.append(vdur
)
1583 def instantiate_vdus(self
, xact
, vnfr
):
1584 """ Instantiate the VDUs associated with this VNF """
1585 self
._log
.debug("Instantiating VDU's for vnfd id %s: %s", self
.vnfd_id
, self
._vdus
)
1587 lookup
= {vdu
.vdu_id
: vdu
for vdu
in self
._vdus
}
1589 # Identify any dependencies among the VDUs
1590 dependencies
= collections
.defaultdict(list)
1591 vdu_id_pattern
= re
.compile(r
"\{\{ vdu\[([^]]+)\]\S* \}\}")
1593 for vdu
in self
._vdus
:
1594 if vdu
.vdud_cloud_init
is not None:
1595 for vdu_id
in vdu_id_pattern
.findall(vdu
.vdud_cloud_init
):
1596 if vdu_id
!= vdu
.vdu_id
:
1597 # This means that vdu.vdu_id depends upon vdu_id,
1598 # i.e. vdu_id must be instantiated before
1600 dependencies
[vdu
.vdu_id
].append(lookup
[vdu_id
])
1602 # Define the terminal states of VDU instantiation
1604 VDURecordState
.READY
,
1605 VDURecordState
.TERMINATED
,
1606 VDURecordState
.FAILED
,
1609 datastore
= VdurDatastore()
1613 def instantiate_monitor(vdu
):
1614 """Monitor the state of the VDU during instantiation
1617 vdu - a VirtualDeploymentUnitRecord
1620 # wait for the VDUR to enter a terminal state
1621 while vdu
._state
not in terminal
:
1622 yield from asyncio
.sleep(1, loop
=self
._loop
)
1624 # update the datastore
1625 datastore
.update(vdu
)
1627 # add the VDU to the set of processed VDUs
1628 processed
.add(vdu
.vdu_id
)
1631 def instantiate(vdu
):
1632 """Instantiate the specified VDU
1635 vdu - a VirtualDeploymentUnitRecord
1638 if the VDU, or any of the VDUs this VDU depends upon, are
1639 terminated or fail to instantiate properly, a
1640 VirtualDeploymentUnitRecordError is raised.
1643 for dependency
in dependencies
[vdu
.vdu_id
]:
1644 self
._log
.debug("{}: waiting for {}".format(vdu
.vdu_id
, dependency
.vdu_id
))
1646 while dependency
.vdu_id
not in processed
:
1647 yield from asyncio
.sleep(1, loop
=self
._loop
)
1649 if not dependency
.active
:
1650 raise VirtualDeploymentUnitRecordError()
1652 self
._log
.debug('instantiating {}'.format(vdu
.vdu_id
))
1654 # Populate the datastore with the current values of the VDU
1657 # Substitute any variables contained in the cloud config script
1658 config
= str(vdu
.vdud_cloud_init
) if vdu
.vdud_cloud_init
is not None else ""
1660 parts
= re
.split("\{\{ ([^\}]+) \}\}", config
)
1663 # Extract the variable names
1665 for variable
in parts
[1::2]:
1666 variables
.append(variable
.lstrip('{{').rstrip('}}').strip())
1668 # Iterate of the variables and substitute values from the
1670 for variable
in variables
:
1672 # Handle a reference to a VDU by ID
1673 if variable
.startswith('vdu['):
1674 value
= datastore
.get(variable
)
1676 msg
= "Unable to find a substitute for {} in {} cloud-init script"
1677 raise ValueError(msg
.format(variable
, vdu
.vdu_id
))
1679 config
= config
.replace("{{ %s }}" % variable
, value
)
1682 # Handle a reference to the current VDU
1683 if variable
.startswith('vdu'):
1684 value
= datastore
.get('vdu[{}]'.format(vdu
.vdu_id
) + variable
[3:])
1685 config
= config
.replace("{{ %s }}" % variable
, value
)
1688 # Handle unrecognized variables
1689 msg
= 'unrecognized cloud-config variable: {}'
1690 raise ValueError(msg
.format(variable
))
1692 # Instantiate the VDU
1693 with self
._dts
.transaction() as xact
:
1694 self
._log
.debug("Instantiating vdu: %s", vdu
)
1695 yield from vdu
.instantiate(xact
, vnfr
, config
=config
)
1696 if self
._state
== VirtualNetworkFunctionRecordState
.FAILED
:
1697 self
._log
.error("Instatiation of VNF %s failed while instantiating vdu %s",
1700 # First create a set of tasks to monitor the state of the VDUs and
1701 # report when they have entered a terminal state
1702 for vdu
in self
._vdus
:
1703 self
._loop
.create_task(instantiate_monitor(vdu
))
1705 for vdu
in self
._vdus
:
1706 self
._loop
.create_task(instantiate(vdu
))
def has_mgmt_interface(self, vdu):
    """ Does the given VDU carry the VNFD's management interface

    Arguments:
        vdu - VDU descriptor whose ``id`` is compared

    Returns:
        True when ``vnfd.mgmt_interface.vdu_id`` equals ``vdu.id``,
        False otherwise.
    """
    # ## TODO: Support additional mgmt_interface type options
    mgmt_vdu_id = self.vnfd.mgmt_interface.vdu_id
    return mgmt_vdu_id == vdu.id
def vlr_xpath(self, vlr_id):
    """ VLR xpath for the given VLR id

    Arguments:
        vlr_id - id substituted into the ``vlr:id`` key of the xpath

    Returns:
        The vlr-catalog xpath string for *vlr_id*.
    """
    xpath_template = ("D,/vlr:vlr-catalog/"
                      "vlr:vlr[vlr:id = '{}']")
    return xpath_template.format(vlr_id)
def ext_vlr_by_id(self, vlr_id):
    """ Find an external VLR by its id

    Arguments:
        vlr_id - key into the internal external-VLR mapping

    Returns:
        The external VLR stored under *vlr_id*; an unknown id raises
        KeyError from the dictionary lookup.
    """
    external_vlrs = self._ext_vlrs
    return external_vlrs[vlr_id]
1725 def publish_inventory(self
, xact
):
1726 """ Publish the inventory associated with this VNF """
1727 self
._log
.debug("Publishing inventory for VNFR id: %s", self
._vnfr
_id
)
1729 for component
in self
._rw
_vnfd
.component
:
1730 self
._log
.debug("Creating inventory component %s", component
)
1731 mangled_name
= VcsComponent
.mangle_name(component
.component_name
,
1735 comp
= VcsComponent(dts
=self
._dts
,
1738 cluster_name
=self
._cluster
_name
,
1739 vcs_handler
=self
._vcs
_handler
,
1740 component
=component
,
1741 mangled_name
=mangled_name
,
1743 if comp
.name
in self
._inventory
:
1744 self
._log
.debug("Duplicate entries in inventory %s for vnfr %s",
1745 component
, self
._vnfd
_id
)
1747 self
._log
.debug("Adding component %s for vnrf %s",
1748 comp
.name
, self
._vnfr
_id
)
1749 self
._inventory
[comp
.name
] = comp
1750 yield from comp
.publish(xact
)
1752 def all_vdus_active(self
):
1753 """ Are all VDUS in this VNFR active? """
1754 for vdu
in self
._vdus
:
1758 self
._log
.debug("Inside all_vdus_active. Returning True")
def instantiation_failed(self, failed_reason=None):
    """ Mark this VNFR as failed and publish the change

    Arguments:
        failed_reason - optional text stored as the failure reason
    """
    self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)

    failed_state = VirtualNetworkFunctionRecordState.FAILED
    self.set_state(failed_state)
    self._state_failed_reason = failed_reason

    # Publish (without a transaction) so the new status becomes visible.
    yield from self.publish(None)
1773 """ This VNF is ready"""
1774 self
._log
.debug("VNFR id %s is ready", self
.vnfr_id
)
1776 if self
._state
!= VirtualNetworkFunctionRecordState
.FAILED
:
1777 self
.set_state(VirtualNetworkFunctionRecordState
.READY
)
1780 self
._log
.debug("VNFR id %s ignoring state change", self
.vnfr_id
)
1782 # Update the VNFR with the changed status
1783 yield from self
.publish(None)
def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
    """ Update a connection point record with its addresses and id

    Arguments:
        cp_name - name of the connection point record to update
        ip_address - value stored in the record's ``ip_address``
        mac_addr - value stored in the record's ``mac_address``
        cp_id - value stored in the record's ``connection_point_id``

    Raises:
        VirtualDeploymentUnitRecordError - when no record in ``self._cprs``
        is named *cp_name*
    """
    for cp in self._cprs:
        if cp.name == cp_name:
            self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
                            cp_name, cp, ip_address, cp_id)
            cp.ip_address = ip_address
            cp.mac_address = mac_addr
            cp.connection_point_id = cp_id
            return

    # Report the name that was searched for.  The loop variable must not be
    # used here: it is unbound when there are no records, and otherwise
    # refers to the last record inspected rather than the requested one.
    err = "No connection point %s found in VNFR id %s" % (cp_name, self._vnfr_id)
    self._log.debug(err)
    raise VirtualDeploymentUnitRecordError(err)
1800 def set_state(self
, state
):
1801 """ Set state for this VNFR"""
1805 def instantiate(self
, xact
, restart_mode
=False):
1806 """ instantiate this VNF """
1807 self
.set_state(VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
)
1808 self
._rw
_vnfd
= yield from self
._vnfm
.fetch_vnfd(self
._vnfd
_id
)
1813 # Iterate over all the connection points in VNFR and fetch the
1816 def cpr_from_cp(cp
):
1817 """ Creates a record level connection point from the desciptor cp"""
1818 cp_fields
= ["name", "image", "vm-flavor", "port_security_enabled"]
1819 cp_copy_dict
= {k
: v
for k
, v
in cp
.as_dict().items() if k
in cp_fields
}
1821 cpr_dict
.update(cp_copy_dict
)
1822 return VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr_dict
)
1824 self
._log
.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1825 self
._vnfr
_id
, self
._vnfr
.connection_point
)
1827 for cp
in self
._vnfr
.connection_point
:
1828 cpr
= cpr_from_cp(cp
)
1829 self
._cprs
.append(cpr
)
1830 self
._log
.debug("Adding Connection point record %s ", cp
)
1832 vlr_path
= self
.vlr_xpath(cp
.vlr_ref
)
1833 self
._log
.debug("Fetching VLR with path = %s", vlr_path
)
1834 res_iter
= yield from self
._dts
.query_read(self
.vlr_xpath(cp
.vlr_ref
),
1835 rwdts
.XactFlag
.MERGE
)
1839 self
._ext
_vlrs
[cp
.vlr_ref
] = d
1840 cpr
.vlr_ref
= cp
.vlr_ref
1841 self
._log
.debug("Fetched VLR [%s] with path = [%s]", d
, vlr_path
)
1843 # Increase the VNFD reference count
1848 # Fetch External VLRs
1849 self
._log
.debug("VNFR-ID %s: Fetching vlrs", self
._vnfr
_id
)
1850 yield from fetch_vlrs()
1853 self
._log
.debug("VNFR-ID %s: Publishing Inventory", self
._vnfr
_id
)
1854 yield from self
.publish_inventory(xact
)
1857 self
._log
.debug("VNFR-ID %s: Creating VLs", self
._vnfr
_id
)
1858 yield from self
.create_vls()
1861 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1862 yield from self
.publish(xact
)
1865 self
._log
.debug("VNFR-ID %s: Instantiate VLs", self
._vnfr
_id
)
1867 yield from self
.instantiate_vls(xact
, restart_mode
)
1868 except Exception as e
:
1869 self
._log
.exception("VL instantiation failed (%s)", str(e
))
1870 yield from self
.instantiation_failed(str(e
))
1873 self
.set_state(VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
)
1876 self
._log
.debug("VNFR-ID %s: Create VDUs", self
._vnfr
_id
)
1877 yield from self
.create_vdus(self
, restart_mode
)
1880 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1881 yield from self
.publish(xact
)
1884 # ToDo: Check if this should be prevented during restart
1885 self
._log
.debug("VNFR-ID %s: Instantiate VDUs", self
._vnfr
_id
)
1886 _
= self
._loop
.create_task(self
.instantiate_vdus(xact
, self
))
1889 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1890 yield from self
.publish(xact
)
1892 self
._log
.debug("VNFR-ID %s: Instantiation Done", self
._vnfr
_id
)
1894 # create task updating uptime for this vnfr
1895 self
._log
.debug("VNFR-ID %s: Starting task to update uptime", self
._vnfr
_id
)
1896 self
._loop
.create_task(self
.vnfr_uptime_update(xact
))
def terminate(self, xact):
    """ Terminate this virtual network function

    Sets the TERMINATE state, stops and deregisters the VNF monitor if one
    is present, then terminates every internal VL followed by every VDU,
    updating the state before each phase and to TERMINATED at the end.

    Arguments:
        xact - transaction passed to each VL and VDU ``terminate`` call
    """
    self._log.debug("Terminatng VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)

    # Shut down monitoring before tearing down resources.
    monitor = self._vnf_mon
    if monitor is not None:
        monitor.stop()
        monitor.deregister()
        self._vnf_mon = None

    # Phase 1: internal virtual links.
    self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
    for vlr in self._vlrs:
        yield from vlr.terminate(xact)

    # Phase 2: virtual deployment units.
    self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
    for vdur in self._vdus:
        yield from vdur.terminate(xact)

    self._log.debug("Terminated VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
def vnfr_uptime_update(self, xact):
    """ Periodically republish this VNFR while it is still live

    Publishes the record and then sleeps for two seconds, repeating for as
    long as the state is INIT, VL_INIT_PHASE, VM_INIT_PHASE or READY.

    Arguments:
        xact - transaction passed to every ``publish`` call
    """
    # Any other state (FAILED, TERMINATED etc) ends the loop.
    live_states = [VirtualNetworkFunctionRecordState.INIT,
                   VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
                   VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
                   VirtualNetworkFunctionRecordState.READY]
    while self._state in live_states:
        yield from self.publish(xact)
        yield from asyncio.sleep(2, loop=self._loop)
1949 class VnfdDtsHandler(object):
1950 """ DTS handler for VNFD config changes """
1951 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1953 def __init__(self
, dts
, log
, loop
, vnfm
):
1962 """ DTS registration handle """
1967 """ Register for VNFD configuration"""
1969 def on_apply(dts
, acg
, xact
, action
, scratch
):
1970 """Apply the configuration"""
1971 self
._log
.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1972 xact
, action
, scratch
)
1974 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
1977 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
1978 """ on prepare callback """
1979 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1980 ks_path
.to_xpath(RwVnfmYang
.get_schema()), msg
)
1981 fref
= ProtobufC
.FieldReference
.alloc()
1982 fref
.goto_whole_message(msg
.to_pbcm())
1984 # Handle deletes in prepare_callback
1985 if fref
.is_field_deleted():
1986 # Delete an VNFD record
1987 self
._log
.debug("Deleting VNFD with id %s", msg
.id)
1988 if self
._vnfm
.vnfd_in_use(msg
.id):
1989 self
._log
.debug("Cannot delete VNFD in use - %s", msg
)
1990 err
= "Cannot delete a VNFD in use - %s" % msg
1991 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
1992 # Delete a VNFD record
1993 yield from self
._vnfm
.delete_vnfd(msg
.id)
1995 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
1998 "Registering for VNFD config using xpath: %s",
1999 VnfdDtsHandler
.XPATH
,
2001 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
2002 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
2003 self
._regh
= acg
.register(
2004 xpath
=VnfdDtsHandler
.XPATH
,
2005 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
2006 on_prepare
=on_prepare
)
2009 class VcsComponentDtsHandler(object):
2010 """ Vcs Component DTS handler """
2011 XPATH
= ("D,/rw-manifest:manifest" +
2012 "/rw-manifest:operational-inventory" +
2013 "/rw-manifest:component")
2015 def __init__(self
, dts
, log
, loop
, vnfm
):
2024 """ DTS registration handle """
2029 """ Registers VCS component dts publisher registration"""
2030 self
._log
.debug("VCS Comp publisher DTS handler registering path %s",
2031 VcsComponentDtsHandler
.XPATH
)
2033 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
2034 handlers
= rift
.tasklets
.Group
.Handler()
2035 with self
._dts
.group_create(handler
=handlers
) as group
:
2036 self
._regh
= group
.register(xpath
=VcsComponentDtsHandler
.XPATH
,
2038 flags
=(rwdts
.Flag
.PUBLISHER |
2039 rwdts
.Flag
.NO_PREP_READ |
2040 rwdts
.Flag
.DATASTORE
),)
2043 def publish(self
, xact
, path
, msg
):
2044 """ Publishes the VCS component """
2045 self
._log
.debug("Publishing the VcsComponent xact = %s, %s:%s",
2047 self
.regh
.create_element(path
, msg
)
2048 self
._log
.debug("Published the VcsComponent to %s xact = %s, %s:%s",
2049 VcsComponentDtsHandler
.XPATH
, xact
, path
, msg
)
2051 class VnfrConsoleOperdataDtsHandler(object):
2052 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
def vnfr_vdu_console_xpath(self):
    """ xpath of this handler's VDUR entry under vnfr-console

    Returns:
        The vnfr-console xpath string keyed by this handler's VNFR id and
        VDUR id.
    """
    xpath_template = "D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']"
    return xpath_template.format(self._vnfr_id, self._vdur_id)
2058 def __init__(self
, dts
, log
, loop
, vnfm
, vnfr_id
, vdur_id
, vdu_id
):
2065 self
._vnfr
_id
= vnfr_id
2066 self
._vdur
_id
= vdur_id
2067 self
._vdu
_id
= vdu_id
2071 """ Register for VNFR VDU Operational Data read from dts """
2074 def on_prepare(xact_info
, action
, ks_path
, msg
):
2075 """ prepare callback from dts """
2076 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2078 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2079 xact_info
, action
, xpath
, msg
2082 if action
== rwdts
.QueryAction
.READ
:
2083 schema
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur
.schema()
2084 path_entry
= schema
.keyspec_to_entry(ks_path
)
2085 self
._log
.debug("VDU Opdata path is {}".format(path_entry
))
2087 vnfr
= self
._vnfm
.get_vnfr(self
._vnfr
_id
)
2088 except VnfRecordError
as e
:
2089 self
._log
.error("VNFR id %s not found", self
._vnfr
_id
)
2090 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2093 vdur
= vnfr
._get
_vdur
_from
_vdu
_id
(self
._vdu
_id
)
2094 if not vdur
._state
== VDURecordState
.READY
:
2095 self
._log
.debug("VDUR state is not READY. current state is {}".format(vdur
._state
))
2096 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2098 with self
._dts
.transaction() as new_xact
:
2099 resp
= yield from vdur
.read_resource(new_xact
)
2100 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2101 vdur_console
.id = self
._vdur
_id
2102 if resp
.console_url
:
2103 vdur_console
.console_url
= resp
.console_url
2105 vdur_console
.console_url
= 'none'
2106 self
._log
.debug("Recevied console URL for vdu {} is {}".format(self
._vdu
_id
,vdur_console
))
2108 self
._log
.exception("Caught exception while reading VDU %s", self
._vdu
_id
)
2109 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2110 vdur_console
.id = self
._vdur
_id
2111 vdur_console
.console_url
= 'none'
2113 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
,
2114 xpath
=self
.vnfr_vdu_console_xpath
,
2117 #raise VnfRecordError("Not supported operation %s" % action)
2118 self
._log
.error("Not supported operation %s" % action
)
2119 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2123 self
._log
.debug("Registering for VNFR VDU using xpath: %s",
2124 self
.vnfr_vdu_console_xpath
)
2125 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2126 with self
._dts
.group_create() as group
:
2127 self
._regh
= group
.register(xpath
=self
.vnfr_vdu_console_xpath
,
2129 flags
=rwdts
.Flag
.PUBLISHER
,
2133 class VnfrDtsHandler(object):
2134 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2135 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2137 def __init__(self
, dts
, log
, loop
, vnfm
):
2147 """ Return registration handle"""
2152 """ Return VNF manager instance """
2157 """ Register for vnfr create/update/delete/read requests from dts """
2158 def on_commit(xact_info
):
2159 """ The transaction has been committed """
2160 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
2161 return rwdts
.MemberRspCode
.ACTION_OK
2163 def on_abort(*args
):
2164 """ Abort callback """
2165 self
._log
.debug("VNF transaction got aborted")
2168 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
2171 def instantiate_realloc_vnfr(vnfr
):
2172 """Re-populate the vnfm after restart
2179 yield from vnfr
.instantiate(None, restart_mode
=True)
2181 if xact_event
== rwdts
.MemberEvent
.INSTALL
:
2182 curr_cfg
= self
.regh
.elements
2183 for cfg
in curr_cfg
:
2184 vnfr
= self
.vnfm
.create_vnfr(cfg
)
2185 self
._loop
.create_task(instantiate_realloc_vnfr(vnfr
))
2187 self
._log
.debug("Got on_event in vnfm")
2189 return rwdts
.MemberRspCode
.ACTION_OK
2192 def on_prepare(xact_info
, action
, ks_path
, msg
):
2193 """ prepare callback from dts """
2195 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2196 xact_info
, action
, msg
2199 if action
== rwdts
.QueryAction
.CREATE
:
2200 if not msg
.has_field("vnfd"):
2201 err
= "Vnfd not provided"
2202 self
._log
.error(err
)
2203 raise VnfRecordError(err
)
2205 vnfr
= self
.vnfm
.create_vnfr(msg
)
2207 # RIFT-9105: Unable to add a READ query under an existing transaction
2208 # xact = xact_info.xact
2209 yield from vnfr
.instantiate(None)
2210 except Exception as e
:
2211 self
._log
.exception(e
)
2212 self
._log
.error("Error while instantiating vnfr:%s", vnfr
.vnfr_id
)
2213 vnfr
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
2214 yield from vnfr
.publish(None)
2215 elif action
== rwdts
.QueryAction
.DELETE
:
2216 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2217 path_entry
= schema
.keyspec_to_entry(ks_path
)
2218 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2221 self
._log
.debug("VNFR id %s not found for delete", path_entry
.key00
.id)
2222 raise VirtualNetworkFunctionRecordNotFound(
2223 "VNFR id %s", path_entry
.key00
.id)
2226 yield from vnfr
.terminate(xact_info
.xact
)
2229 yield from self
._vnfm
.delete_vnfr(xact_info
.xact
, vnfr
)
2230 except Exception as e
:
2231 self
._log
.exception(e
)
2232 self
._log
.error("Caught exception while deleting vnfr %s", path_entry
.key00
.id)
2234 elif action
== rwdts
.QueryAction
.UPDATE
:
2235 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2236 path_entry
= schema
.keyspec_to_entry(ks_path
)
2239 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2240 except Exception as e
:
2241 self
._log
.debug("No vnfr found with id %s", path_entry
.key00
.id)
2242 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2246 self
._log
.debug("VNFR id %s not found for update", path_entry
.key00
.id)
2247 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2250 self
._log
.debug("VNFR {} update config status {} (current {})".
2251 format(vnfr
.name
, msg
.config_status
, vnfr
.config_status
))
2252 # Update the config status and publish
2253 vnfr
._config
_status
= msg
.config_status
2254 yield from vnfr
.publish(None)
2257 raise NotImplementedError(
2258 "%s action on VirtualNetworkFunctionRecord not supported",
2261 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2263 self
._log
.debug("Registering for VNFR using xpath: %s",
2264 VnfrDtsHandler
.XPATH
,)
2266 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
2267 on_prepare
=on_prepare
,)
2268 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
2269 with self
._dts
.group_create(handler
=handlers
) as group
:
2270 self
._regh
= group
.register(xpath
=VnfrDtsHandler
.XPATH
,
2272 flags
=(rwdts
.Flag
.PUBLISHER |
2273 rwdts
.Flag
.NO_PREP_READ |
2275 rwdts
.Flag
.DATASTORE
),)
2278 def create(self
, xact
, path
, msg
):
2280 Create a VNFR record in DTS with path and message
2282 self
._log
.debug("Creating VNFR xact = %s, %s:%s",
2285 self
.regh
.create_element(path
, msg
)
2286 self
._log
.debug("Created VNFR xact = %s, %s:%s",
2290 def update(self
, xact
, path
, msg
):
2292 Update a VNFR record in DTS with path and message
2294 self
._log
.debug("Updating VNFR xact = %s, %s:%s",
2296 self
.regh
.update_element(path
, msg
)
2297 self
._log
.debug("Updated VNFR xact = %s, %s:%s",
2301 def delete(self
, xact
, path
):
2303 Delete a VNFR record in DTS with path and message
2305 self
._log
.debug("Deleting VNFR xact = %s, %s", xact
, path
)
2306 self
.regh
.delete_element(path
)
2307 self
._log
.debug("Deleted VNFR xact = %s, %s", xact
, path
)
2310 class VnfdRefCountDtsHandler(object):
2311 """ The VNFD Ref Count DTS handler """
2312 XPATH
= "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2314 def __init__(self
, dts
, log
, loop
, vnfm
):
2324 """ Return registration handle """
2329 """ Return the NS manager instance """
2334 """ Register for VNFD ref count read from dts """
2337 def on_prepare(xact_info
, action
, ks_path
, msg
):
2338 """ prepare callback from dts """
2339 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2341 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2342 xact_info
, action
, xpath
, msg
2345 if action
== rwdts
.QueryAction
.READ
:
2346 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_VnfdRefCount
.schema()
2347 path_entry
= schema
.keyspec_to_entry(ks_path
)
2348 vnfd_list
= yield from self
._vnfm
.get_vnfd_refcount(path_entry
.key00
.vnfd_id_ref
)
2349 for xpath
, msg
in vnfd_list
:
2350 self
._log
.debug("Responding to ref count query path:%s, msg:%s",
2352 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.MORE
,
2355 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2357 raise VnfRecordError("Not supported operation %s" % action
)
2359 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2360 with self
._dts
.group_create() as group
:
2361 self
._regh
= group
.register(xpath
=VnfdRefCountDtsHandler
.XPATH
,
2363 flags
=rwdts
.Flag
.PUBLISHER
,
class VdurDatastore(object):
    """Expose selected VDUR information for use in cloud config files.

    This VdurDatastore is intended to expose select information about a VDUR
    such that it can be referenced in a cloud config file. The data that is
    exposed does not necessarily follow the structure of the data in the yang
    model. This is intentional. The data that are exposed are intended to be
    agnostic of the yang model so that changes in the model do not necessarily
    require changes to the interface provided to the user. It also means that
    the user does not need to be familiar with the RIFT.ware yang models.
    """

    def __init__(self):
        """Create an instance of VdurDatastore"""
        # Maps a VDU id to a dict of the attributes exposed for that VDUR.
        self._vdur_data = dict()
        # Raw string: the backslashes are regex escapes, not string escapes.
        self._pattern = re.compile(r"vdu\[([^]]+)\]\.(.+)")

    def add(self, vdur):
        """Add a new VDUR to the datastore

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR is (1) None, (2) has no ID, or
            (3) is already in the datastore.

        """
        if vdur is None or vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id in self._vdur_data:
            raise ValueError('cannot add a VDUR more than once')

        entry = dict()
        self._vdur_data[vdur.vdu_id] = entry

        # Only attributes that currently have a value are exposed.
        for key, attr in (('name', vdur._vdud.name),
                          ('mgmt.ip', vdur.vm_management_ip)):
            if attr is not None:
                entry[key] = attr

    def update(self, vdur):
        """Update the VDUR information in the datastore

        Attributes whose current value is None are removed from the
        datastore entry; all others are overwritten.

        Arguments:
            vdur - a GI representation of a VDUR

        Raises:
            A ValueError is raised if the VDUR is (1) None, (2) has no ID, or
            (3) is not in the datastore.

        """
        # Message texts are kept exactly as they were for compatibility.
        if vdur is None or vdur.vdu_id is None:
            raise ValueError('VNFDs are required to have an ID')

        if vdur.vdu_id not in self._vdur_data:
            raise ValueError('VNF is not recognized')

        entry = self._vdur_data[vdur.vdu_id]

        for key, attr in (('name', vdur._vdud.name),
                          ('mgmt.ip', vdur.vm_management_ip)):
            if attr is None:
                entry.pop(key, None)
            else:
                entry[key] = attr

    def remove(self, vdur_id):
        """Remove all of the data associated with specified VDUR

        Arguments:
            vdur_id - the identifier of a VDUR in the datastore

        Raises:
            A ValueError is raised if the VDUR is not contained in the
            datastore.

        """
        if vdur_id not in self._vdur_data:
            raise ValueError('VNF is not recognized')

        del self._vdur_data[vdur_id]

    def get(self, expr):
        """Retrieve VDUR information from the datastore

        An expression should be of the form,

            vdu[<id>].<attr>

        where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
        the exposed attribute that the user wishes to retrieve.

        If the requested data is not available, None is returned.

        Arguments:
            expr - a string that specifies the data to return

        Raises:
            A ValueError is raised if the provided expression cannot be parsed.

        Returns:
            The requested data or None

        """
        result = self._pattern.match(expr)
        if result is None:
            raise ValueError('data expression not recognized ({})'.format(expr))

        vdur_id, key = result.groups()

        if vdur_id not in self._vdur_data:
            return None

        return self._vdur_data[vdur_id].get(key, None)
2487 class VnfManager(object):
2488 """ The virtual network function manager class """
2489 def __init__(self
, dts
, log
, loop
, cluster_name
):
2493 self
._cluster
_name
= cluster_name
2495 self
._vcs
_handler
= VcsComponentDtsHandler(dts
, log
, loop
, self
)
2496 self
._vnfr
_handler
= VnfrDtsHandler(dts
, log
, loop
, self
)
2497 self
._vnfr
_ref
_handler
= VnfdRefCountDtsHandler(dts
, log
, loop
, self
)
2498 self
._nsr
_handler
= mano_dts
.NsInstanceConfigSubscriber(log
, dts
, loop
, callback
=self
.handle_nsr
)
2500 self
._dts
_handlers
= [VnfdDtsHandler(dts
, log
, loop
, self
),
2503 self
._vnfr
_ref
_handler
,
2506 self
._vnfds
_to
_vnfr
= {}
2510 def vnfr_handler(self
):
2511 """ VNFR dts handler """
2512 return self
._vnfr
_handler
2515 def vcs_handler(self
):
2516 """ VCS dts handler """
2517 return self
._vcs
_handler
2521 """ Register all static DTS handlers """
2522 for hdl
in self
._dts
_handlers
:
2523 yield from hdl
.register()
2527 """ Run this VNFM instance """
2528 self
._log
.debug("Run VNFManager - registering static DTS handlers""")
2529 yield from self.register()
def handle_nsr(self, nsr, action):
    """Track NS instance config records as they are created and deleted.

    Arguments:
        nsr - the NSR config message; its ``id`` is used as the cache key
        action - the rwdts.QueryAction reported for the NSR

    A CREATE stores the NSR, a DELETE drops it if it is cached, and any
    other action is ignored.
    """
    if action == rwdts.QueryAction.CREATE:
        self._nsrs[nsr.id] = nsr
        return

    if action == rwdts.QueryAction.DELETE:
        self._nsrs.pop(nsr.id, None)
def get_linked_mgmt_network(self, vnfr):
    """For the given VNFR get the related mgmt network from the NSD, if
    available.

    Arguments:
        vnfr - the VNFR message; its ``nsr_id_ref`` selects the cached NSR

    Returns:
        The name of the first VLD in the NSD flagged as a mgmt network, or
        None when the NSD has no such VLD.

    Raises:
        VnfRecordError if no NSR with the referenced ID has been seen.
    """
    nsr_id = vnfr.nsr_id_ref

    # for the given related VNFR, get the corresponding NSR-config
    try:
        nsr_obj = self._nsrs[nsr_id]
    except KeyError:
        # Raising a bare string is itself a TypeError; raise a real
        # exception so the message reaches the caller.
        raise VnfRecordError(
            "Unable to find the NS with the ID: {}".format(nsr_id))

    # for the related NSD check if a VLD exists such that it's a mgmt
    # network
    for vld in nsr_obj.nsd.vld:
        if vld.mgmt_network:
            return vld.name

    return None
def get_vnfr(self, vnfr_id):
    """ get VNFR by vnfr id

    Arguments:
        vnfr_id - the id of the VNFR to look up

    Returns:
        The record stored for ``vnfr_id``.

    Raises:
        VnfRecordError if no VNFR with that id is known.
    """
    if vnfr_id not in self._vnfrs:
        # Interpolate here: exceptions do not apply %-formatting to their
        # arguments the way logging calls do.
        raise VnfRecordError("VNFR id %s not found" % vnfr_id)

    return self._vnfrs[vnfr_id]
def create_vnfr(self, vnfr):
    """ Create a VNFR instance

    Builds a VirtualNetworkFunctionRecord for the given VNFR message,
    stores it under the VNFR id and bumps the reference count of the
    VNFD it was created from.

    Raises:
        VnfRecordError if a VNFR with the same id already exists.
    """
    vnfr_id = vnfr.id
    if vnfr_id in self._vnfrs:
        msg = "Vnfr id %s already exists" % vnfr_id
        self._log.error(msg)
        raise VnfRecordError(msg)

    self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
                   vnfr_id,
                   vnfr.vnfd.id)

    mgmt_network = self.get_linked_mgmt_network(vnfr)

    record = VirtualNetworkFunctionRecord(
        self._dts, self._log, self._loop, self._cluster_name, self,
        self.vcs_handler, vnfr,
        mgmt_network=mgmt_network,
    )
    self._vnfrs[vnfr_id] = record

    # Update ref count of the VNFD this record was instantiated from
    vnfd_id = vnfr.vnfd.id
    self._vnfds_to_vnfr[vnfd_id] = self._vnfds_to_vnfr.get(vnfd_id, 0) + 1

    return record
def delete_vnfr(self, xact, vnfr):
    """ Delete a VNFR instance

    Removes the VNFR through the VNFR DTS handler, decrements the
    reference count of its VNFD (never below zero) and forgets the record.
    A VNFR that is not known to this manager is ignored.

    Arguments:
        xact - the transaction passed on to the VNFR handler
        vnfr - the record to delete (provides vnfr_id, xpath and vnfd)
    """
    if vnfr.vnfr_id not in self._vnfrs:
        return

    self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
    yield from self._vnfr_handler.delete(xact, vnfr.xpath)

    vnfd_id = vnfr.vnfd.id
    # Only decrement a positive count so it never goes negative.
    if self._vnfds_to_vnfr.get(vnfd_id):
        self._vnfds_to_vnfr[vnfd_id] -= 1

    del self._vnfrs[vnfr.vnfr_id]
2608 def fetch_vnfd(self, vnfd_id):
2609 """ Fetch VNFDs based with the vnfd id"""
2610 vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
2611 self._log.debug("Fetch vnfd with path
%s", vnfd_path)
2614 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2616 for ent in res_iter:
2617 res = yield from ent
2621 err = "Failed to get Vnfd
%s" % vnfd_id
2622 self._log.error(err)
2623 raise VnfRecordError(err)
2625 self._log.debug("Fetched vnfd
for path
%s, vnfd
- %s", vnfd_path, vnfd)
def vnfd_in_use(self, vnfd_id):
    """ Is this VNFD in use

    Returns True when at least one VNFR currently references the VNFD,
    False otherwise (including when the VNFD is unknown).
    """
    self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
    if vnfd_id not in self._vnfds_to_vnfr:
        return False
    ref_count = self._vnfds_to_vnfr[vnfd_id]
    return (ref_count > 0)
def publish_vnfr(self, xact, path, msg):
    """ Publish a VNFR

    Logs the request and delegates to the VNFR DTS handler's update.
    """
    self._log.debug("publish_vnfr called with path %s, msg %s", path, msg)
    handler = self.vnfr_handler
    yield from handler.update(xact, path, msg)
def delete_vnfd(self, vnfd_id):
    """ Delete the Virtual Network Function descriptor with the passed id

    Drops the reference-count entry for the VNFD and removes any files that
    were uploaded with it under $RIFT_ARTIFACTS/launchpad/libs/<id>. File
    cleanup is best effort: failures are logged and not raised.

    Arguments:
        vnfd_id - the id of the VNFD to delete

    Raises:
        VirtualNetworkFunctionDescriptorRefCountExists if VNFRs still
        reference the VNFD.
    """
    self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
    if vnfd_id in self._vnfds_to_vnfr:
        # The map stores plain integer reference counts, so the count is
        # the value itself rather than an attribute of it.
        ref_count = self._vnfds_to_vnfr[vnfd_id]
        if ref_count:
            self._log.debug("Cannot delete VNFD id %s reference exists %s",
                            vnfd_id,
                            ref_count)
            raise VirtualNetworkFunctionDescriptorRefCountExists(
                "Cannot delete :%s, ref_count:%s" % (vnfd_id, ref_count))

        del self._vnfds_to_vnfr[vnfd_id]

    # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
    try:
        rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
        vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
        if os.path.exists(vnfd_dir):
            shutil.rmtree(vnfd_dir, ignore_errors=True)
    except Exception as e:
        # The ref-count entry is already gone here, so identify the VNFD
        # by its id instead of looking it up again.
        self._log.error("Exception in cleaning up VNFD {}: {}".
                        format(vnfd_id, e))
        self._log.exception(e)
def vnfd_refcount_xpath(self, vnfd_id):
    """ xpath for ref count entry

    Appends a key predicate selecting ``vnfd_id`` to the ref-count
    handler's base xpath.
    """
    template = VnfdRefCountDtsHandler.XPATH + "[rw-vnfr:vnfd-id-ref = '{}']"
    return template.format(vnfd_id)
def get_vnfd_refcount(self, vnfd_id):
    """ Get the vnfd_list from this VNFM

    Arguments:
        vnfd_id - the VNFD to report on; None or "" selects every VNFD
                  that has a ref-count entry

    Returns:
        A list of (xpath, message) tuples, one per selected VNFD; empty
        when ``vnfd_id`` is given but unknown.
    """
    if vnfd_id is None or vnfd_id == "":
        selected = list(self._vnfds_to_vnfr.keys())
    elif vnfd_id in self._vnfds_to_vnfr:
        selected = [vnfd_id]
    else:
        selected = []

    vnfd_list = []
    for ref_id in selected:
        vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
        vnfd_msg.vnfd_id_ref = ref_id
        vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[ref_id]
        vnfd_list.append((self.vnfd_refcount_xpath(ref_id), vnfd_msg))

    return vnfd_list
2695 class VnfmTasklet(rift.tasklets.Tasklet):
2696 """ VNF Manager tasklet class """
2697 def __init__(self, *args, **kwargs):
2698 super(VnfmTasklet, self).__init__(*args, **kwargs)
2699 self.rwlog.set_category("rw
-mano
-log
")
2700 self.rwlog.set_subcategory("vnfm
")
2707 super(VnfmTasklet, self).start()
2708 self.log.info("Starting VnfmTasklet
")
2710 self.log.setLevel(logging.DEBUG)
2712 self.log.debug("Registering with dts
")
2713 self._dts = rift.tasklets.DTS(self.tasklet_info,
2714 RwVnfmYang.get_schema(),
2716 self.on_dts_state_change)
2718 self.log.debug("Created DTS Api GI Object
: %s", self._dts)
2720 print("Caught
Exception in VNFM start
:", sys.exc_info()[0])
2723 def on_instance_started(self):
2724 """ Task insance started callback """
2725 self.log.debug("Got instance started callback
")
2731 print("Caught
Exception in VNFM stop
:", sys.exc_info()[0])
2736 """ Task init callback """
2738 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2739 assert vm_parent_name is not None
2740 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2741 yield from self._vnfm.run()
2743 print("Caught
Exception in VNFM init
:", sys.exc_info()[0])
2748 """ Task run callback """
2752 def on_dts_state_change(self, state):
2753 """Take action according to current dts state to transition
2754 application into the corresponding application state
2757 state - current dts state
2760 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2761 rwdts.State.CONFIG: rwdts.State.RUN,
2765 rwdts.State.INIT: self.init,
2766 rwdts.State.RUN: self.run,
2769 # Transition application to next state
2770 handler = handlers.get(state, None)
2771 if handler is not None:
2772 yield from handler()
2774 # Transition dts to next state
2775 next_state = switch.get(state, None)
2776 if next_state is not None:
2777 self._dts.handle.set_state(next_state)