2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
29 gi
.require_version('RwDts', '1.0')
30 gi
.require_version('RwVnfrYang', '1.0')
31 gi
.require_version('RwVnfmYang', '1.0')
32 gi
.require_version('RwVlrYang', '1.0')
33 gi
.require_version('RwManifestYang', '1.0')
34 gi
.require_version('RwBaseYang', '1.0')
35 gi
.require_version('RwResourceMgrYang', '1.0')
37 from gi
.repository
import (
50 import rift
.package
.store
51 import rift
.package
.cloud_init
52 import rift
.package
.script
53 import rift
.mano
.dts
as mano_dts
56 class VMResourceError(Exception):
57 """ VM resource Error"""
61 class VnfRecordError(Exception):
62 """ VNF record instatiation failed"""
66 class VduRecordError(Exception):
67 """ VDU record instatiation failed"""
71 class NotImplemented(Exception):
72 """Not implemented """
76 class VnfrRecordExistsError(Exception):
77 """VNFR record already exist with the same VNFR id"""
81 class InternalVirtualLinkRecordError(Exception):
82 """Internal virtual link record error"""
86 class VDUImageNotFound(Exception):
87 """VDU Image not found error"""
91 class VirtualDeploymentUnitRecordError(Exception):
92 """VDU Instantiation failed"""
96 class VMNotReadyError(Exception):
97 """ VM Not yet received from resource manager """
101 class VDURecordNotFound(Exception):
102 """ Could not find a VDU record """
106 class VirtualNetworkFunctionRecordDescNotFound(Exception):
107 """ Cannot find Virtual Network Function Record Descriptor """
111 class VirtualNetworkFunctionDescriptorError(Exception):
112 """ Virtual Network Function Record Descriptor Error """
116 class VirtualNetworkFunctionDescriptorNotFound(Exception):
117 """ Virtual Network Function Record Descriptor Not Found """
121 class VirtualNetworkFunctionRecordNotFound(Exception):
122 """ Virtual Network Function Record Not Found """
126 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
127 """ Virtual Network Funtion Descriptor reference count exists """
131 class VnfrInstantiationFailed(Exception):
132 """ Virtual Network Funtion Instantiation failed"""
136 class VNFMPlacementGroupError(Exception):
139 class VirtualNetworkFunctionRecordState(enum
.Enum
):
146 VL_TERMINATE_PHASE
= 6
147 VDU_TERMINATE_PHASE
= 7
152 class VDURecordState(enum
.Enum
):
153 """VDU record state """
156 RESOURCE_ALLOC_PENDING
= 3
163 class VcsComponent(object):
164 """ VCS Component within the VNF descriptor """
165 def __init__(self
, dts
, log
, loop
, cluster_name
, vcs_handler
, component
, mangled_name
):
169 self
._component
= component
170 self
._cluster
_name
= cluster_name
171 self
._vcs
_handler
= vcs_handler
172 self
._mangled
_name
= mangled_name
175 def mangle_name(component_name
, vnf_name
, vnfd_id
):
176 """ mangled component name """
177 return vnf_name
+ ":" + component_name
+ ":" + vnfd_id
181 """ name of this component"""
182 return self
._mangled
_name
186 """ The path for this object """
187 return("D,/rw-manifest:manifest" +
188 "/rw-manifest:operational-inventory" +
189 "/rw-manifest:component" +
190 "[rw-manifest:component-name = '{}']").format(self
.name
)
193 def instance_xpath(self
):
194 """ The path for this object """
195 return("D,/rw-base:vcs" +
198 "[instance-name = '{}']".format(self
._cluster
_name
))
201 def start_comp_xpath(self
):
202 """ start component xpath """
203 return (self
.instance_xpath
+
204 "/child-n[instance-name = 'START-REQ']")
206 def get_start_comp_msg(self
, ip_address
):
207 """ start this component """
208 start_msg
= RwBaseYang
.VcsInstance_Instance_ChildN()
209 start_msg
.instance_name
= 'START-REQ'
210 start_msg
.component_name
= self
.name
211 start_msg
.admin_command
= "START"
212 start_msg
.ip_address
= ip_address
218 """ Returns the message for this vcs component"""
220 vcs_comp_dict
= self
._component
.as_dict()
222 def mangle_comp_names(comp_dict
):
223 """ mangle component name with VNF name, id"""
224 for key
, val
in comp_dict
.items():
225 if isinstance(val
, dict):
226 comp_dict
[key
] = mangle_comp_names(val
)
227 elif isinstance(val
, list):
230 if isinstance(ent
, dict):
231 val
[i
] = mangle_comp_names(ent
)
235 elif key
== "component_name":
236 comp_dict
[key
] = VcsComponent
.mangle_name(val
,
241 mangled_dict
= mangle_comp_names(vcs_comp_dict
)
242 msg
= RwManifestYang
.OpInventory_Component
.from_dict(mangled_dict
)
246 def publish(self
, xact
):
247 """ Publishes the VCS component """
248 self
._log
.debug("Publishing the VcsComponent %s, path = %s comp = %s",
249 self
.name
, self
.path
, self
.msg
)
250 yield from self
._vcs
_handler
.publish(xact
, self
.path
, self
.msg
)
253 def start(self
, xact
, parent
, ip_addr
=None):
254 """ Starts this VCS component """
255 # ATTN RV - replace with block add
256 start_msg
= self
.get_start_comp_msg(ip_addr
)
257 self
._log
.debug("starting component %s %s",
258 self
.start_comp_xpath
, start_msg
)
259 yield from self
._dts
.query_create(self
.start_comp_xpath
,
262 self
._log
.debug("started component %s, %s",
263 self
.start_comp_xpath
, start_msg
)
266 class VirtualDeploymentUnitRecord(object):
267 """ Virtual Deployment Unit Record """
279 placement_groups
=[]):
285 self
._mgmt
_intf
= mgmt_intf
286 self
._cloud
_account
_name
= cloud_account_name
287 self
._vnfd
_package
_store
= vnfd_package_store
288 self
._mgmt
_network
= mgmt_network
290 self
._vdur
_id
= vdur_id
or str(uuid
.uuid4())
293 self
._state
= VDURecordState
.INIT
294 self
._state
_failed
_reason
= None
295 self
._request
_id
= str(uuid
.uuid4())
296 self
._name
= vnfr
.name
+ "__" + vdud
.id
297 self
._placement
_groups
= placement_groups
300 self
._vdud
_cloud
_init
= None
301 self
._vdur
_console
_handler
= VnfrConsoleOperdataDtsHandler(dts
, log
, loop
, self
._vnfr
._vnfm
, self
._vnfr
.vnfr_id
, self
._vdur
_id
,self
.vdu_id
)
304 def vdu_opdata_register(self
):
305 yield from self
._vdur
_console
_handler
.register()
307 def cp_ip_addr(self
, cp_name
):
308 """ Find ip address by connection point name """
309 if self
._vm
_resp
is not None:
310 for conn_point
in self
._vm
_resp
.connection_points
:
311 if conn_point
.name
== cp_name
:
312 return conn_point
.ip_address
315 def cp_mac_addr(self
, cp_name
):
316 """ Find mac address by connection point name """
317 if self
._vm
_resp
is not None:
318 for conn_point
in self
._vm
_resp
.connection_points
:
319 if conn_point
.name
== cp_name
:
320 return conn_point
.mac_addr
321 return "00:00:00:00:00:00"
323 def cp_id(self
, cp_name
):
324 """ Find connection point id by connection point name """
325 if self
._vm
_resp
is not None:
326 for conn_point
in self
._vm
_resp
.connection_points
:
327 if conn_point
.name
== cp_name
:
328 return conn_point
.connection_point_id
341 """ Return this VDUR's name """
345 def cloud_account_name(self
):
346 """ Cloud account this VDU should be created in """
347 return self
._cloud
_account
_name
350 def image_name(self
):
351 """ name that should be used to lookup the image on the CMP """
352 if 'image' not in self
._vdud
:
354 return os
.path
.basename(self
._vdud
.image
)
357 def image_checksum(self
):
358 """ name that should be used to lookup the image on the CMP """
359 return self
._vdud
.image_checksum
if self
._vdud
.has_field("image_checksum") else None
362 def management_ip(self
):
365 return self
._vm
_resp
.public_ip
if self
._vm
_resp
.has_field('public_ip') else self
._vm
_resp
.management_ip
368 def vm_management_ip(self
):
371 return self
._vm
_resp
.management_ip
374 def operational_status(self
):
375 """ Operational status of this VDU"""
376 op_stats_dict
= {"INIT": "init",
377 "INSTANTIATING": "vm_init_phase",
378 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
381 "TERMINATING": "terminated",
382 "TERMINATED": "terminated",
384 return op_stats_dict
[self
._state
.name
]
388 """ Process VDU message from resmgr"""
389 vdu_fields
= ["vm_flavor",
396 vdu_copy_dict
= {k
: v
for k
, v
in
397 self
._vdud
.as_dict().items() if k
in vdu_fields
}
398 vdur_dict
= {"id": self
._vdur
_id
,
399 "vdu_id_ref": self
._vdud
.id,
400 "operational_status": self
.operational_status
,
401 "operational_status_details": self
._state
_failed
_reason
,
403 if self
.vm_resp
is not None:
404 vdur_dict
.update({"vim_id": self
.vm_resp
.vdu_id
,
405 "flavor_id": self
.vm_resp
.flavor_id
407 if self
._vm
_resp
.has_field('image_id'):
408 vdur_dict
.update({ "image_id": self
.vm_resp
.image_id
})
410 if self
.management_ip
is not None:
411 vdur_dict
["management_ip"] = self
.management_ip
413 if self
.vm_management_ip
is not None:
414 vdur_dict
["vm_management_ip"] = self
.vm_management_ip
416 vdur_dict
.update(vdu_copy_dict
)
418 if self
.vm_resp
is not None:
419 if self
._vm
_resp
.has_field('volumes'):
420 for opvolume
in self
._vm
_resp
.volumes
:
421 vdurvol_data
= [vduvol
for vduvol
in vdur_dict
['volumes'] if vduvol
['name'] == opvolume
.name
]
422 if len(vdurvol_data
) == 1:
423 vdurvol_data
[0]["volume_id"] = opvolume
.volume_id
424 if opvolume
.has_field('custom_meta_data'):
425 metadata_list
= list()
426 for metadata_item
in opvolume
.custom_meta_data
:
427 metadata_list
.append(metadata_item
.as_dict())
428 if 'guest_params' not in vdurvol_data
[0]:
429 vdurvol_data
[0]['guest_params'] = dict()
430 vdurvol_data
[0]['guest_params']['custom_meta_data'] = metadata_list
432 if self
._vm
_resp
.has_field('custom_boot_data'):
433 vdur_dict
['custom_boot_data'] = dict()
434 if self
._vm
_resp
.custom_boot_data
.has_field('custom_drive'):
435 vdur_dict
['custom_boot_data']['custom_drive'] = self
._vm
_resp
.custom_boot_data
.custom_drive
436 if self
._vm
_resp
.custom_boot_data
.has_field('custom_meta_data'):
437 metadata_list
= list()
438 for metadata_item
in self
._vm
_resp
.custom_boot_data
.custom_meta_data
:
439 metadata_list
.append(metadata_item
.as_dict())
440 vdur_dict
['custom_boot_data']['custom_meta_data'] = metadata_list
441 if self
._vm
_resp
.custom_boot_data
.has_field('custom_config_files'):
443 for file_item
in self
._vm
_resp
.custom_boot_data
.custom_config_files
:
444 file_list
.append(file_item
.as_dict())
445 vdur_dict
['custom_boot_data']['custom_config_files'] = file_list
450 for intf
, cp_id
, vlr
in self
._int
_intf
:
451 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
453 icp_list
.append({"name": cp
.name
,
455 "type_yang": "VPORT",
456 "ip_address": self
.cp_ip_addr(cp
.id),
457 "mac_address": self
.cp_mac_addr(cp
.id)})
459 ii_list
.append({"name": intf
.name
,
460 "vdur_internal_connection_point_ref": cp
.id,
461 "virtual_interface": {}})
463 vdur_dict
["internal_connection_point"] = icp_list
464 self
._log
.debug("internal_connection_point:%s", vdur_dict
["internal_connection_point"])
465 vdur_dict
["internal_interface"] = ii_list
468 for intf
, cp
, vlr
in self
._ext
_intf
:
469 ei_list
.append({"name": cp
,
470 "vnfd_connection_point_ref": cp
,
471 "virtual_interface": {}})
472 self
._vnfr
.update_cp(cp
,
474 self
.cp_mac_addr(cp
),
477 vdur_dict
["external_interface"] = ei_list
479 placement_groups
= []
480 for group
in self
._placement
_groups
:
481 placement_groups
.append(group
.as_dict())
482 vdur_dict
['placement_groups_info'] = placement_groups
484 return RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur
.from_dict(vdur_dict
)
487 def resmgr_path(self
):
488 """ path for resource-mgr"""
489 return ("D,/rw-resource-mgr:resource-mgmt" +
491 "/vdu-event-data[event-id='{}']".format(self
._request
_id
))
494 def vm_flavor_msg(self
):
495 """ VM flavor message """
496 flavor
= self
._vdud
.vm_flavor
.__class
__()
497 flavor
.copy_from(self
._vdud
.vm_flavor
)
502 def vdud_cloud_init(self
):
503 """ Return the cloud-init contents for the VDU """
504 if self
._vdud
_cloud
_init
is None:
505 self
._vdud
_cloud
_init
= self
.cloud_init()
507 return self
._vdud
_cloud
_init
509 def cloud_init(self
):
510 """ Populate cloud_init with cloud-config script from
511 either the inline contents or from the file provided
513 if self
._vdud
.cloud_init
is not None:
514 self
._log
.debug("cloud_init script provided inline %s", self
._vdud
.cloud_init
)
515 return self
._vdud
.cloud_init
516 elif self
._vdud
.cloud_init_file
is not None:
517 # Get cloud-init script contents from the file provided in the cloud_init_file param
518 self
._log
.debug("cloud_init script provided in file %s", self
._vdud
.cloud_init_file
)
519 filename
= self
._vdud
.cloud_init_file
520 self
._vnfd
_package
_store
.refresh()
521 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
522 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
524 return cloud_init_extractor
.read_script(stored_package
, filename
)
525 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
526 raise VirtualDeploymentUnitRecordError(e
)
528 self
._log
.debug("VDU Instantiation: cloud-init script not provided")
530 def process_openstack_placement_group_construct(self
, vm_create_msg_dict
):
532 availability_zones
= []
534 for group
in self
._placement
_groups
:
535 if group
.has_field('host_aggregate'):
536 for aggregate
in group
.host_aggregate
:
537 host_aggregates
.append(aggregate
.as_dict())
538 if group
.has_field('availability_zone'):
539 availability_zones
.append(group
.availability_zone
.as_dict())
540 if group
.has_field('server_group'):
541 server_groups
.append(group
.server_group
.as_dict())
543 if availability_zones
:
544 if len(availability_zones
) > 1:
545 self
._log
.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self
.name
, availability_zones
)
546 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self
.name
, availability_zones
))
548 vm_create_msg_dict
['availability_zone'] = availability_zones
[0]
551 if len(server_groups
) > 1:
552 self
._log
.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self
.name
, server_groups
)
553 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self
.name
, server_groups
))
555 vm_create_msg_dict
['server_group'] = server_groups
[0]
558 vm_create_msg_dict
['host_aggregate'] = host_aggregates
562 def process_placement_groups(self
, vm_create_msg_dict
):
563 """Process the placement_groups and fill resource-mgr request"""
564 if not self
._placement
_groups
:
567 cloud_set
= set([group
.cloud_type
for group
in self
._placement
_groups
])
568 assert len(cloud_set
) == 1
569 cloud_type
= cloud_set
.pop()
571 if cloud_type
== 'openstack':
572 self
.process_openstack_placement_group_construct(vm_create_msg_dict
)
575 self
._log
.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type
)
578 def process_custom_bootdata(self
, vm_create_msg_dict
):
579 """Process the custom boot data"""
580 if 'custom_config_files' not in vm_create_msg_dict
['custom_boot_data']:
583 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
584 script_extractor
= rift
.package
.script
.PackageScriptExtractor(self
._log
)
585 for custom_file_item
in vm_create_msg_dict
['custom_boot_data']['custom_config_files']:
586 if 'source' not in custom_file_item
or 'dest' not in custom_file_item
:
588 source
= custom_file_item
['source']
589 # Find source file in scripts dir of VNFD
590 self
._vnfd
_package
_store
.refresh()
591 self
._log
.debug("Checking for source config file at %s", source
)
593 source_file_str
= script_extractor
.read_script(stored_package
, source
)
594 except rift
.package
.script
.ScriptExtractionError
as e
:
595 raise VirtualDeploymentUnitRecordError(e
)
596 # Update source file location with file contents
597 custom_file_item
['source'] = source_file_str
601 def resmgr_msg(self
, config
=None):
602 vdu_fields
= ["vm_flavor",
610 self
._log
.debug("Creating params based on VDUD: %s", self
._vdud
)
611 vdu_copy_dict
= {k
: v
for k
, v
in self
._vdud
.as_dict().items() if k
in vdu_fields
}
613 vm_create_msg_dict
= {
617 if self
.image_name
is not None:
618 vm_create_msg_dict
["image_name"] = self
.image_name
620 if self
.image_checksum
is not None:
621 vm_create_msg_dict
["image_checksum"] = self
.image_checksum
623 vm_create_msg_dict
["allocate_public_address"] = self
._mgmt
_intf
624 if self
._vdud
.has_field('mgmt_vpci'):
625 vm_create_msg_dict
["mgmt_vpci"] = self
._vdud
.mgmt_vpci
627 self
._log
.debug("VDUD: %s", self
._vdud
)
628 if config
is not None:
629 vm_create_msg_dict
['vdu_init'] = {'userdata': config
}
631 if self
._mgmt
_network
:
632 vm_create_msg_dict
['mgmt_network'] = self
._mgmt
_network
635 for intf
, cp
, vlr
in self
._ext
_intf
:
636 cp_info
= {"name": cp
,
637 "virtual_link_id": vlr
.network_id
,
638 "type_yang": intf
.virtual_interface
.type_yang
}
640 if (intf
.virtual_interface
.has_field('vpci') and
641 intf
.virtual_interface
.vpci
is not None):
642 cp_info
["vpci"] = intf
.virtual_interface
.vpci
644 if (vlr
.has_field('ip_profile_params')) and (vlr
.ip_profile_params
.has_field('security_group')):
645 cp_info
['security_group'] = vlr
.ip_profile_params
.security_group
647 cp_list
.append(cp_info
)
649 for intf
, cp
, vlr
in self
._int
_intf
:
650 if (intf
.virtual_interface
.has_field('vpci') and
651 intf
.virtual_interface
.vpci
is not None):
652 cp_list
.append({"name": cp
,
653 "virtual_link_id": vlr
.network_id
,
654 "type_yang": intf
.virtual_interface
.type_yang
,
655 "vpci": intf
.virtual_interface
.vpci
})
657 cp_list
.append({"name": cp
,
658 "virtual_link_id": vlr
.network_id
,
659 "type_yang": intf
.virtual_interface
.type_yang
})
661 vm_create_msg_dict
["connection_points"] = cp_list
662 vm_create_msg_dict
.update(vdu_copy_dict
)
664 self
.process_placement_groups(vm_create_msg_dict
)
665 if 'custom_boot_data' in vm_create_msg_dict
:
666 self
.process_custom_bootdata(vm_create_msg_dict
)
668 msg
= RwResourceMgrYang
.VDUEventData()
669 msg
.event_id
= self
._request
_id
670 msg
.cloud_account
= self
.cloud_account_name
671 msg
.request_info
.from_dict(vm_create_msg_dict
)
676 def terminate(self
, xact
):
677 """ Delete resource in VIM """
678 if self
._state
!= VDURecordState
.READY
and self
._state
!= VDURecordState
.FAILED
:
679 self
._log
.warning("VDU terminate in not ready state - Ignoring request")
682 self
._state
= VDURecordState
.TERMINATING
683 if self
._vm
_resp
is not None:
685 with self
._dts
.transaction() as new_xact
:
686 yield from self
.delete_resource(new_xact
)
688 self
._log
.exception("Caught exception while deleting VDU %s", self
.vdu_id
)
690 if self
._rm
_regh
is not None:
691 self
._log
.debug("Deregistering resource manager registration handle")
692 self
._rm
_regh
.deregister()
695 if self
._vdur
_console
_handler
is not None:
696 self
._log
.error("Deregistering vnfr vdur registration handle")
697 self
._vdur
_console
_handler
._regh
.deregister()
698 self
._vdur
_console
_handler
._regh
= None
700 self
._state
= VDURecordState
.TERMINATED
702 def find_internal_cp_by_cp_id(self
, cp_id
):
703 """ Find the CP corresponding to the connection point id"""
706 self
._log
.debug("find_internal_cp_by_cp_id(%s) called",
709 for int_cp
in self
._vdud
.internal_connection_point
:
710 self
._log
.debug("Checking for int cp %s in internal connection points",
712 if int_cp
.id == cp_id
:
717 self
._log
.debug("Failed to find cp %s in internal connection points",
719 msg
= "Failed to find cp %s in internal connection points" % cp_id
720 raise VduRecordError(msg
)
722 # return the VLR associated with the connection point
726 def create_resource(self
, xact
, vnfr
, config
=None):
727 """ Request resource from ResourceMgr """
728 def find_cp_by_name(cp_name
):
729 """ Find a connection point by name """
731 self
._log
.debug("find_cp_by_name(%s) called", cp_name
)
732 for ext_cp
in vnfr
._cprs
:
733 self
._log
.debug("Checking ext cp (%s) called", ext_cp
.name
)
734 if ext_cp
.name
== cp_name
:
738 self
._log
.debug("Failed to find cp %s in external connection points",
742 def find_internal_vlr_by_cp_name(cp_name
):
743 """ Find the VLR corresponding to the connection point name"""
746 self
._log
.debug("find_internal_vlr_by_cp_name(%s) called",
749 for int_cp
in self
._vdud
.internal_connection_point
:
750 self
._log
.debug("Checking for int cp %s in internal connection points",
752 if int_cp
.id == cp_name
:
757 self
._log
.debug("Failed to find cp %s in internal connection points",
759 msg
= "Failed to find cp %s in internal connection points" % cp_name
760 raise VduRecordError(msg
)
762 # return the VLR associated with the connection point
763 return vnfr
.find_vlr_by_cp(cp_name
)
765 block
= xact
.block_create()
767 self
._log
.debug("Executing vm request id: %s, action: create",
770 # Resolve the networks associated external interfaces
771 for ext_intf
in self
._vdud
.external_interface
:
772 self
._log
.debug("Resolving external interface name [%s], cp[%s]",
773 ext_intf
.name
, ext_intf
.vnfd_connection_point_ref
)
774 cp
= find_cp_by_name(ext_intf
.vnfd_connection_point_ref
)
776 self
._log
.debug("Failed to find connection point - %s",
777 ext_intf
.vnfd_connection_point_ref
)
779 self
._log
.debug("Connection point name [%s], type[%s]",
780 cp
.name
, cp
.type_yang
)
782 vlr
= vnfr
.ext_vlr_by_id(cp
.vlr_ref
)
784 etuple
= (ext_intf
, cp
.name
, vlr
)
785 self
._ext
_intf
.append(etuple
)
787 self
._log
.debug("Created external interface tuple : %s", etuple
)
789 # Resolve the networks associated internal interfaces
790 for intf
in self
._vdud
.internal_interface
:
791 cp_id
= intf
.vdu_internal_connection_point_ref
792 self
._log
.debug("Resolving internal interface name [%s], cp[%s]",
796 vlr
= find_internal_vlr_by_cp_name(cp_id
)
797 except Exception as e
:
798 self
._log
.debug("Failed to find cp %s in internal VLR list", cp_id
)
799 msg
= "Failed to find cp %s in internal VLR list, e = %s" % (cp_id
, e
)
800 raise VduRecordError(msg
)
802 ituple
= (intf
, cp_id
, vlr
)
803 self
._int
_intf
.append(ituple
)
805 self
._log
.debug("Created internal interface tuple : %s", ituple
)
807 resmgr_path
= self
.resmgr_path
808 resmgr_msg
= self
.resmgr_msg(config
)
810 self
._log
.debug("Creating new VM request at: %s, params: %s", resmgr_path
, resmgr_msg
)
811 block
.add_query_create(resmgr_path
, resmgr_msg
)
813 res_iter
= yield from block
.execute(now
=True)
821 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
822 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
823 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
824 return resp
.resource_info
827 def delete_resource(self
, xact
):
828 block
= xact
.block_create()
830 self
._log
.debug("Executing vm request id: %s, action: delete",
833 block
.add_query_delete(self
.resmgr_path
)
835 yield from block
.execute(flags
=0, now
=True)
838 def read_resource(self
, xact
):
839 block
= xact
.block_create()
841 self
._log
.debug("Executing vm request id: %s, action: delete",
844 block
.add_query_read(self
.resmgr_path
)
846 res_iter
= yield from block
.execute(flags
=0, now
=True)
851 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
852 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
853 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
854 #self._vm_resp = resp.resource_info
855 return resp
.resource_info
859 def start_component(self
):
860 """ This VDUR is active """
861 self
._log
.debug("Starting component %s for vdud %s vdur %s",
862 self
._vdud
.vcs_component_ref
,
865 yield from self
._vnfr
.start_component(self
._vdud
.vcs_component_ref
,
866 self
.vm_resp
.management_ip
)
870 """ Is this VDU active """
871 return True if self
._state
is VDURecordState
.READY
else False
874 def instantiation_failed(self
, failed_reason
=None):
875 """ VDU instantiation failed """
876 self
._log
.debug("VDU %s instantiation failed ", self
._vdur
_id
)
877 self
._state
= VDURecordState
.FAILED
878 self
._state
_failed
_reason
= failed_reason
879 yield from self
._vnfr
.instantiation_failed(failed_reason
)
882 def vdu_is_active(self
):
883 """ This VDU is active"""
885 self
._log
.warning("VDU %s was already marked as active", self
._vdur
_id
)
888 self
._log
.debug("VDUR id %s in VNFR %s is active", self
._vdur
_id
, self
._vnfr
.vnfr_id
)
890 if self
._vdud
.vcs_component_ref
is not None:
891 yield from self
.start_component()
893 self
._state
= VDURecordState
.READY
895 if self
._vnfr
.all_vdus_active():
896 self
._log
.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self
._vnfr
)
897 yield from self
._vnfr
.is_ready()
900 def instantiate(self
, xact
, vnfr
, config
=None):
901 """ Instantiate this VDU """
902 self
._state
= VDURecordState
.INSTANTIATING
905 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
906 """ This VDUR is active """
907 self
._log
.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
912 if (query_action
== rwdts
.QueryAction
.UPDATE
or
913 query_action
== rwdts
.QueryAction
.CREATE
):
916 if msg
.resource_state
== "active":
917 # Move this VDU to ready state
918 yield from self
.vdu_is_active()
919 elif msg
.resource_state
== "failed":
920 yield from self
.instantiation_failed(msg
.resource_errors
)
921 elif query_action
== rwdts
.QueryAction
.DELETE
:
922 self
._log
.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
924 raise NotImplementedError(
925 "%s action on VirtualDeployementUnitRecord not supported",
928 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
931 reg_event
= asyncio
.Event(loop
=self
._loop
)
934 def on_ready(regh
, status
):
937 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
, on_ready
=on_ready
)
938 self
._rm
_regh
= yield from self
._dts
.register(self
.resmgr_path
+ '/resource-info',
939 flags
=rwdts
.Flag
.SUBSCRIBER
,
941 yield from reg_event
.wait()
943 vm_resp
= yield from self
.create_resource(xact
, vnfr
, config
)
944 self
._vm
_resp
= vm_resp
946 self
._state
= VDURecordState
.RESOURCE_ALLOC_PENDING
947 self
._log
.debug("Requested VM from resource manager response %s",
949 if vm_resp
.resource_state
== "active":
950 self
._log
.debug("Resourcemgr responded wih an active vm resp %s",
952 yield from self
.vdu_is_active()
953 self
._state
= VDURecordState
.READY
954 elif (vm_resp
.resource_state
== "pending" or
955 vm_resp
.resource_state
== "inactive"):
956 self
._log
.debug("Resourcemgr responded wih a pending vm resp %s",
958 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
959 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
960 # flags=rwdts.Flag.SUBSCRIBER,
963 self
._log
.debug("Resourcemgr responded wih an error vm resp %s",
965 raise VirtualDeploymentUnitRecordError(
966 "Failed VDUR instantiation %s " % vm_resp
)
968 except Exception as e
:
970 traceback
.print_exc()
971 self
._log
.exception(e
)
972 self
._log
.error("Instantiation of VDU record failed: %s", str(e
))
973 self
._state
= VDURecordState
.FAILED
974 yield from self
.instantiation_failed(str(e
))
977 class VlRecordState(enum
.Enum
):
978 """ VL Record State """
980 INSTANTIATION_PENDING
= 102
982 TERMINATE_PENDING
= 104
987 class InternalVirtualLinkRecord(object):
988 """ Internal Virtual Link record """
989 def __init__(self
, dts
, log
, loop
, ivld_msg
, vnfr_name
, cloud_account_name
):
993 self
._ivld
_msg
= ivld_msg
994 self
._vnfr
_name
= vnfr_name
995 self
._cloud
_account
_name
= cloud_account_name
997 self
._vlr
_req
= self
.create_vlr()
999 self
._state
= VlRecordState
.INIT
1003 """ Find VLR by id """
1004 return self
._vlr
_req
.id
1008 """ Name of this VL """
1009 return self
._vnfr
_name
+ "." + self
._ivld
_msg
.name
1012 def network_id(self
):
1013 """ Find VLR by id """
1014 return self
._vlr
.network_id
if self
._vlr
else None
1017 """ VLR path for this VLR instance"""
1018 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self
.vlr_id
)
1020 def create_vlr(self
):
1021 """ Create the VLR record which will be instantiated """
1023 vld_fields
= ["short_name",
1030 vld_copy_dict
= {k
: v
for k
, v
in self
._ivld
_msg
.as_dict().items() if k
in vld_fields
}
1032 vlr_dict
= {"id": str(uuid
.uuid4()),
1034 "cloud_account": self
._cloud
_account
_name
,
1036 vlr_dict
.update(vld_copy_dict
)
1038 vlr
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict(vlr_dict
)
1042 def instantiate(self
, xact
, restart_mode
=False):
1043 """ Instantiate VL """
1046 def instantiate_vlr():
1047 """ Instantiate VLR"""
1048 self
._log
.debug("Create VL with xpath %s and vlr %s",
1049 self
.vlr_path(), self
._vlr
_req
)
1051 with self
._dts
.transaction(flags
=0) as xact
:
1052 block
= xact
.block_create()
1053 block
.add_query_create(xpath
=self
.vlr_path(), msg
=self
._vlr
_req
)
1054 self
._log
.debug("Executing VL create path:%s msg:%s",
1055 self
.vlr_path(), self
._vlr
_req
)
1059 res_iter
= yield from block
.execute()
1061 self
._state
= VlRecordState
.FAILED
1062 self
._log
.exception("Caught exception while instantial VL")
1065 for ent
in res_iter
:
1066 res
= yield from ent
1067 self
._vlr
= res
.result
1069 if self
._vlr
.operational_status
== 'failed':
1070 self
._log
.debug("VL creation failed for vlr id %s", self
._vlr
.id)
1071 self
._state
= VlRecordState
.FAILED
1072 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self
._vlr
.id))
1074 self
._log
.info("Created VL with xpath %s and vlr %s",
1075 self
.vlr_path(), self
._vlr
)
1079 """ Get the network id """
1080 res_iter
= yield from self
._dts
.query_read(self
.vlr_path(), rwdts
.XactFlag
.MERGE
)
1082 for ent
in res_iter
:
1083 res
= yield from ent
1087 err
= "Failed to get VLR for path %s" % self
.vlr_path()
1089 raise InternalVirtualLinkRecordError(err
)
1092 self
._state
= VlRecordState
.INSTANTIATION_PENDING
1095 vl
= yield from get_vlr()
1097 yield from instantiate_vlr()
1099 yield from instantiate_vlr()
1101 self
._state
= VlRecordState
.ACTIVE
1103 def vlr_in_vns(self
):
1104 """ Is there a VLR record in VNS """
1105 if (self
._state
== VlRecordState
.ACTIVE
or
1106 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
1107 self
._state
== VlRecordState
.FAILED
):
1113 def terminate(self
, xact
):
1114 """Terminate this VL """
1115 if not self
.vlr_in_vns():
1116 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1117 self
.vlr_id
, self
._state
)
1120 self
._log
.debug("Terminating VL with path %s", self
.vlr_path())
1121 self
._state
= VlRecordState
.TERMINATE_PENDING
1122 block
= xact
.block_create()
1123 block
.add_query_delete(self
.vlr_path())
1124 yield from block
.execute(flags
=0, now
=True)
1125 self
._state
= VlRecordState
.TERMINATED
1126 self
._log
.debug("Terminated VL with path %s", self
.vlr_path())
1129 class VirtualNetworkFunctionRecord(object):
1130 """ Virtual Network Function Record """
1131 def __init__(self
, dts
, log
, loop
, cluster_name
, vnfm
, vcs_handler
, vnfr_msg
, mgmt_network
=None):
1135 self
._cluster
_name
= cluster_name
1136 self
._vnfr
_msg
= vnfr_msg
1137 self
._vnfr
_id
= vnfr_msg
.id
1138 self
._vnfd
_id
= vnfr_msg
.vnfd
.id
1140 self
._vcs
_handler
= vcs_handler
1141 self
._vnfr
= vnfr_msg
1142 self
._mgmt
_network
= mgmt_network
1144 self
._vnfd
= vnfr_msg
.vnfd
1145 self
._state
= VirtualNetworkFunctionRecordState
.INIT
1146 self
._state
_failed
_reason
= None
1147 self
._ext
_vlrs
= {} # The list of external virtual links
1148 self
._vlrs
= [] # The list of internal virtual links
1149 self
._vdus
= [] # The list of vdu
1150 self
._vlr
_by
_cp
= {}
1152 self
._inventory
= {}
1153 self
._create
_time
= int(time
.time())
1154 self
._vnf
_mon
= None
1155 self
._config
_status
= vnfr_msg
.config_status
1156 self
._vnfd
_package
_store
= rift
.package
.store
.VnfdPackageFilesystemStore(self
._log
)
1157 self
._rw
_vnfd
= None
1158 self
._vnfd
_ref
_count
= 0
1160 def _get_vdur_from_vdu_id(self
, vdu_id
):
1161 self
._log
.debug("Finding vdur for vdu_id %s", vdu_id
)
1162 self
._log
.debug("Searching through vdus: %s", self
._vdus
)
1163 for vdu
in self
._vdus
:
1164 self
._log
.debug("vdu_id: %s", vdu
.vdu_id
)
1165 if vdu
.vdu_id
== vdu_id
:
1168 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id
)
1171 def operational_status(self
):
1172 """ Operational status of this VNFR """
1173 op_status_map
= {"INIT": "init",
1174 "VL_INIT_PHASE": "vl_init_phase",
1175 "VM_INIT_PHASE": "vm_init_phase",
1177 "TERMINATE": "terminate",
1178 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1179 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1180 "TERMINATED": "terminated",
1181 "FAILED": "failed", }
1182 return op_status_map
[self
._state
.name
]
1185 def vnfd_xpath(vnfd_id
):
1186 """ VNFD xpath associated with this VNFR """
1187 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id
)
1190 def vnfd_ref_count(self
):
1191 """ Returns the VNFD reference count associated with this VNFR """
1192 return self
._vnfd
_ref
_count
1194 def vnfd_in_use(self
):
1195 """ Returns whether vnfd is in use or not """
1196 return True if self
._vnfd
_ref
_count
> 0 else False
1199 """ Take a reference on this object """
1200 self
._vnfd
_ref
_count
+= 1
1201 return self
._vnfd
_ref
_count
1203 def vnfd_unref(self
):
1204 """ Release reference on this object """
1205 if self
._vnfd
_ref
_count
< 1:
1206 msg
= ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
1207 (self
.vnfd
.id, self
._vnfd
_ref
_count
))
1208 self
._log
.critical(msg
)
1209 raise VnfRecordError(msg
)
1210 self
._log
.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
1211 self
.vnfd
.id, self
._vnfd
_ref
_count
)
1212 self
._vnfd
_ref
_count
-= 1
1213 return self
._vnfd
_ref
_count
1217 """ VNFD for this VNFR """
1222 """ VNFD name associated with this VNFR """
1223 return self
.vnfd
.name
1227 """ Name of this VNF in the record """
1228 return self
._vnfr
.name
1231 def cloud_account_name(self
):
1232 """ Name of the cloud account this VNFR is instantiated in """
1233 return self
._vnfr
.cloud_account
1237 """ VNFD Id associated with this VNFR """
1242 """ VNFR Id associated with this VNFR """
1243 return self
._vnfr
_id
1246 def member_vnf_index(self
):
1247 """ Member VNF index associated with this VNFR """
1248 return self
._vnfr
.member_vnf_index_ref
1251 def config_status(self
):
1252 """ Config agent status for this VNFR """
1253 return self
._config
_status
1255 def component_by_name(self
, component_name
):
1256 """ Find a component by name in the inventory list"""
1257 mangled_name
= VcsComponent
.mangle_name(component_name
,
1260 return self
._inventory
[mangled_name
]
1265 def get_nsr_config(self
):
1266 ### Need access to NS instance configuration for runtime resolution.
1267 ### This shall be replaced when deployment flavors are implemented
1268 xpath
= "C,/nsr:ns-instance-config"
1269 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
1271 for result
in results
:
1272 entry
= yield from result
1273 ns_instance_config
= entry
.result
1274 for nsr
in ns_instance_config
.nsr
:
1275 if nsr
.id == self
._vnfr
_msg
.nsr_id_ref
:
1280 def start_component(self
, component_name
, ip_addr
):
1281 """ Start a component in the VNFR by name """
1282 comp
= self
.component_by_name(component_name
)
1283 yield from comp
.start(None, None, ip_addr
)
1285 def cp_ip_addr(self
, cp_name
):
1286 """ Get ip address for connection point """
1287 self
._log
.debug("cp_ip_addr()")
1288 for cp
in self
._cprs
:
1289 if cp
.name
== cp_name
and cp
.ip_address
is not None:
1290 return cp
.ip_address
1293 def mgmt_intf_info(self
):
1294 """ Get Management interface info for this VNFR """
1295 mgmt_intf_desc
= self
.vnfd
.mgmt_interface
1297 if mgmt_intf_desc
.has_field("cp"):
1298 ip_addr
= self
.cp_ip_addr(mgmt_intf_desc
.cp
)
1299 elif mgmt_intf_desc
.has_field("vdu_id"):
1301 vdur
= self
._get
_vdur
_from
_vdu
_id
(mgmt_intf_desc
.vdu_id
)
1302 ip_addr
= vdur
.management_ip
1303 except VDURecordNotFound
:
1304 self
._log
.debug("Did not find mgmt interface for vnfr id %s", self
._vnfr
_id
)
1307 ip_addr
= mgmt_intf_desc
.ip_address
1308 port
= mgmt_intf_desc
.port
1310 return ip_addr
, port
1314 """ Message associated with this VNFR """
1315 vnfd_fields
= ["short_name", "vendor", "description", "version"]
1316 vnfd_copy_dict
= {k
: v
for k
, v
in self
.vnfd
.as_dict().items() if k
in vnfd_fields
}
1318 mgmt_intf
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1319 ip_address
, port
= self
.mgmt_intf_info()
1321 if ip_address
is not None:
1322 mgmt_intf
.ip_address
= ip_address
1323 if port
is not None:
1324 mgmt_intf
.port
= port
1326 vnfr_dict
= {"id": self
._vnfr
_id
,
1327 "nsr_id_ref": self
._vnfr
_msg
.nsr_id_ref
,
1329 "member_vnf_index_ref": self
.member_vnf_index
,
1330 "operational_status": self
.operational_status
,
1331 "operational_status_details": self
._state
_failed
_reason
,
1332 "cloud_account": self
.cloud_account_name
,
1333 "config_status": self
._config
_status
1336 vnfr_dict
.update(vnfd_copy_dict
)
1338 vnfr_msg
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
1339 vnfr_msg
.vnfd
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd
.from_dict(self
.vnfd
.as_dict())
1341 vnfr_msg
.create_time
= self
._create
_time
1342 vnfr_msg
.uptime
= int(time
.time()) - self
._create
_time
1343 vnfr_msg
.mgmt_interface
= mgmt_intf
1345 # Add all the VLRs to VNFR
1346 for vlr
in self
._vlrs
:
1347 ivlr
= vnfr_msg
.internal_vlr
.add()
1348 ivlr
.vlr_ref
= vlr
.vlr_id
1350 # Add all the VDURs to VDUR
1351 if self
._vdus
is not None:
1352 for vdu
in self
._vdus
:
1353 vdur
= vnfr_msg
.vdur
.add()
1354 vdur
.from_dict(vdu
.msg
.as_dict())
1356 if self
.vnfd
.mgmt_interface
.has_field('dashboard_params'):
1357 vnfr_msg
.dashboard_url
= self
.dashboard_url
1359 for cpr
in self
._cprs
:
1360 new_cp
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr
.as_dict())
1361 vnfr_msg
.connection_point
.append(new_cp
)
1363 if self
._vnf
_mon
is not None:
1364 for monp
in self
._vnf
_mon
.msg
:
1365 vnfr_msg
.monitoring_param
.append(
1366 VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam
.from_dict(monp
.as_dict()))
1368 if self
._vnfr
.vnf_configuration
is not None:
1369 vnfr_msg
.vnf_configuration
.from_dict(self
._vnfr
.vnf_configuration
.as_dict())
1370 if (ip_address
is not None and
1371 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
is None):
1372 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
= ip_address
1374 for group
in self
._vnfr
_msg
.placement_groups_info
:
1375 group_info
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1376 group_info
.from_dict(group
.as_dict())
1377 vnfr_msg
.placement_groups_info
.append(group_info
)
1382 def dashboard_url(self
):
1383 ip
, cfg_port
= self
.mgmt_intf_info()
1386 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('https'):
1387 if self
.vnfd
.mgmt_interface
.dashboard_params
.https
is True:
1390 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('port'):
1391 http_port
= self
.vnfd
.mgmt_interface
.dashboard_params
.port
1393 url
= "{protocol}://{ip_address}:{port}/{path}".format(
1397 path
=self
.vnfd
.mgmt_interface
.dashboard_params
.path
.lstrip("/"),
1404 """ path for this VNFR """
1405 return("D,/vnfr:vnfr-catalog"
1406 "/vnfr:vnfr[vnfr:id='{}']".format(self
.vnfr_id
))
1409 def publish(self
, xact
):
1410 """ publish this VNFR """
1412 self
._log
.debug("Publishing VNFR path = [%s], record = [%s]",
1413 self
.xpath
, self
.msg
)
1414 vnfr
.create_time
= self
._create
_time
1415 yield from self
._vnfm
.publish_vnfr(xact
, self
.xpath
, self
.msg
)
1416 self
._log
.debug("Published VNFR path = [%s], record = [%s]",
1417 self
.xpath
, self
.msg
)
1420 def create_vls(self
):
1421 """ Publish The VLs associated with this VNF """
1422 self
._log
.debug("Publishing Internal Virtual Links for vnfd id: %s",
1424 for ivld_msg
in self
.vnfd
.internal_vld
:
1425 self
._log
.debug("Creating internal vld:"
1426 " %s, int_cp_ref = %s",
1427 ivld_msg
, ivld_msg
.internal_connection_point
1429 vlr
= InternalVirtualLinkRecord(dts
=self
._dts
,
1433 vnfr_name
=self
.name
,
1434 cloud_account_name
=self
.cloud_account_name
1436 self
._vlrs
.append(vlr
)
1438 for int_cp
in ivld_msg
.internal_connection_point
:
1439 if int_cp
.id_ref
in self
._vlr
_by
_cp
:
1440 msg
= ("Connection point %s already "
1441 " bound %s" % (int_cp
.id_ref
, self
._vlr
_by
_cp
[int_cp
.id_ref
]))
1442 raise InternalVirtualLinkRecordError(msg
)
1443 self
._log
.debug("Setting vlr %s to internal cp = %s",
1445 self
._vlr
_by
_cp
[int_cp
.id_ref
] = vlr
1448 def instantiate_vls(self
, xact
, restart_mode
=False):
1449 """ Instantiate the VLs associated with this VNF """
1450 self
._log
.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1453 for vlr
in self
._vlrs
:
1454 self
._log
.debug("Instantiating VLR %s", vlr
)
1455 yield from vlr
.instantiate(xact
, restart_mode
)
1457 def find_vlr_by_cp(self
, cp_name
):
1458 """ Find the VLR associated with the cp name """
1459 return self
._vlr
_by
_cp
[cp_name
]
1461 def resolve_placement_group_cloud_construct(self
, input_group
, nsr_config
):
1463 Returns the cloud specific construct for placement group
1465 input_group: VNFD PlacementGroup
1466 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1468 copy_dict
= ['name', 'requirement', 'strategy']
1469 for group_info
in nsr_config
.vnfd_placement_group_maps
:
1470 if group_info
.placement_group_ref
== input_group
.name
and \
1471 group_info
.vnfd_id_ref
== self
.vnfd_id
:
1472 group
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1473 group_dict
= {k
:v
for k
,v
in
1474 group_info
.as_dict().items()
1475 if (k
!= 'placement_group_ref' and k
!='vnfd_id_ref')}
1476 for param
in copy_dict
:
1477 group_dict
.update({param
: getattr(input_group
, param
)})
1478 group
.from_dict(group_dict
)
1483 def get_vdu_placement_groups(self
, vdu
):
1484 placement_groups
= []
1485 ### Step-1: Get VNF level placement groups
1486 for group
in self
._vnfr
_msg
.placement_groups_info
:
1487 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1488 #group_info.from_dict(group.as_dict())
1489 placement_groups
.append(group
)
1491 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1492 nsr_config
= yield from self
.get_nsr_config()
1494 ### Step-3: Get VDU level placement groups
1495 for group
in self
.vnfd
.placement_groups
:
1496 for member_vdu
in group
.member_vdus
:
1497 if member_vdu
.member_vdu_ref
== vdu
.id:
1498 group_info
= self
.resolve_placement_group_cloud_construct(group
,
1500 if group_info
is None:
1501 self
._log
.info("Could not resolve cloud-construct for placement group: %s", group
.name
)
1502 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1504 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1508 self
.member_vnf_index
)
1509 placement_groups
.append(group_info
)
1511 return placement_groups
1514 def create_vdus(self
, vnfr
, restart_mode
=False):
1515 """ Create the VDUs associated with this VNF """
1517 def get_vdur_id(vdud
):
1518 """Get the corresponding VDUR's id for the VDUD. This is useful in
1521 In restart mode we check for exiting VDUR's ID and use them, if
1522 available. This way we don't end up creating duplicate VDURs
1526 if restart_mode
and vdud
is not None:
1528 vdur
= [vdur
.id for vdur
in vnfr
._vnfr
.vdur
if vdur
.vdu_id_ref
== vdud
.id]
1531 self
._log
.error("Unable to find a VDUR for VDUD {}".format(vdud
))
1536 self
._log
.info("Creating VDU's for vnfd id: %s", self
.vnfd_id
)
1537 for vdu
in self
._rw
_vnfd
.vdu
:
1538 self
._log
.debug("Creating vdu: %s", vdu
)
1539 vdur_id
= get_vdur_id(vdu
)
1541 placement_groups
= yield from self
.get_vdu_placement_groups(vdu
)
1542 self
._log
.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1545 self
.member_vnf_index
,
1546 [ group
.name
for group
in placement_groups
])
1548 vdur
= VirtualDeploymentUnitRecord(
1554 mgmt_intf
=self
.has_mgmt_interface(vdu
),
1555 mgmt_network
=self
._mgmt
_network
,
1556 cloud_account_name
=self
.cloud_account_name
,
1557 vnfd_package_store
=self
._vnfd
_package
_store
,
1559 placement_groups
= placement_groups
,
1561 yield from vdur
.vdu_opdata_register()
1563 self
._vdus
.append(vdur
)
1566 def instantiate_vdus(self
, xact
, vnfr
):
1567 """ Instantiate the VDUs associated with this VNF """
1568 self
._log
.debug("Instantiating VDU's for vnfd id %s: %s", self
.vnfd_id
, self
._vdus
)
1570 lookup
= {vdu
.vdu_id
: vdu
for vdu
in self
._vdus
}
1572 # Identify any dependencies among the VDUs
1573 dependencies
= collections
.defaultdict(list)
1574 vdu_id_pattern
= re
.compile(r
"\{\{ vdu\[([^]]+)\]\S* \}\}")
1576 for vdu
in self
._vdus
:
1577 if vdu
.vdud_cloud_init
is not None:
1578 for vdu_id
in vdu_id_pattern
.findall(vdu
.vdud_cloud_init
):
1579 if vdu_id
!= vdu
.vdu_id
:
1580 # This means that vdu.vdu_id depends upon vdu_id,
1581 # i.e. vdu_id must be instantiated before
1583 dependencies
[vdu
.vdu_id
].append(lookup
[vdu_id
])
1585 # Define the terminal states of VDU instantiation
1587 VDURecordState
.READY
,
1588 VDURecordState
.TERMINATED
,
1589 VDURecordState
.FAILED
,
1592 datastore
= VdurDatastore()
1596 def instantiate_monitor(vdu
):
1597 """Monitor the state of the VDU during instantiation
1600 vdu - a VirtualDeploymentUnitRecord
1603 # wait for the VDUR to enter a terminal state
1604 while vdu
._state
not in terminal
:
1605 yield from asyncio
.sleep(1, loop
=self
._loop
)
1607 # update the datastore
1608 datastore
.update(vdu
)
1610 # add the VDU to the set of processed VDUs
1611 processed
.add(vdu
.vdu_id
)
1614 def instantiate(vdu
):
1615 """Instantiate the specified VDU
1618 vdu - a VirtualDeploymentUnitRecord
1621 if the VDU, or any of the VDUs this VDU depends upon, are
1622 terminated or fail to instantiate properly, a
1623 VirtualDeploymentUnitRecordError is raised.
1626 for dependency
in dependencies
[vdu
.vdu_id
]:
1627 self
._log
.debug("{}: waiting for {}".format(vdu
.vdu_id
, dependency
.vdu_id
))
1629 while dependency
.vdu_id
not in processed
:
1630 yield from asyncio
.sleep(1, loop
=self
._loop
)
1632 if not dependency
.active
:
1633 raise VirtualDeploymentUnitRecordError()
1635 self
._log
.debug('instantiating {}'.format(vdu
.vdu_id
))
1637 # Populate the datastore with the current values of the VDU
1640 # Substitute any variables contained in the cloud config script
1641 config
= str(vdu
.vdud_cloud_init
) if vdu
.vdud_cloud_init
is not None else ""
1643 parts
= re
.split("\{\{ ([^\}]+) \}\}", config
)
1646 # Extract the variable names
1648 for variable
in parts
[1::2]:
1649 variables
.append(variable
.lstrip('{{').rstrip('}}').strip())
1651 # Iterate of the variables and substitute values from the
1653 for variable
in variables
:
1655 # Handle a reference to a VDU by ID
1656 if variable
.startswith('vdu['):
1657 value
= datastore
.get(variable
)
1659 msg
= "Unable to find a substitute for {} in {} cloud-init script"
1660 raise ValueError(msg
.format(variable
, vdu
.vdu_id
))
1662 config
= config
.replace("{{ %s }}" % variable
, value
)
1665 # Handle a reference to the current VDU
1666 if variable
.startswith('vdu'):
1667 value
= datastore
.get('vdu[{}]'.format(vdu
.vdu_id
) + variable
[3:])
1668 config
= config
.replace("{{ %s }}" % variable
, value
)
1671 # Handle unrecognized variables
1672 msg
= 'unrecognized cloud-config variable: {}'
1673 raise ValueError(msg
.format(variable
))
1675 # Instantiate the VDU
1676 with self
._dts
.transaction() as xact
:
1677 self
._log
.debug("Instantiating vdu: %s", vdu
)
1678 yield from vdu
.instantiate(xact
, vnfr
, config
=config
)
1679 if self
._state
== VirtualNetworkFunctionRecordState
.FAILED
:
1680 self
._log
.error("Instatiation of VNF %s failed while instantiating vdu %s",
1683 # First create a set of tasks to monitor the state of the VDUs and
1684 # report when they have entered a terminal state
1685 for vdu
in self
._vdus
:
1686 self
._loop
.create_task(instantiate_monitor(vdu
))
1688 for vdu
in self
._vdus
:
1689 self
._loop
.create_task(instantiate(vdu
))
1691 def has_mgmt_interface(self
, vdu
):
1692 # ## TODO: Support additional mgmt_interface type options
1693 if self
.vnfd
.mgmt_interface
.vdu_id
== vdu
.id:
1697 def vlr_xpath(self
, vlr_id
):
1700 "D,/vlr:vlr-catalog/"
1701 "vlr:vlr[vlr:id = '{}']".format(vlr_id
))
1703 def ext_vlr_by_id(self
, vlr_id
):
1704 """ find ext vlr by id """
1705 return self
._ext
_vlrs
[vlr_id
]
1708 def publish_inventory(self
, xact
):
1709 """ Publish the inventory associated with this VNF """
1710 self
._log
.debug("Publishing inventory for VNFR id: %s", self
._vnfr
_id
)
1712 for component
in self
._rw
_vnfd
.component
:
1713 self
._log
.debug("Creating inventory component %s", component
)
1714 mangled_name
= VcsComponent
.mangle_name(component
.component_name
,
1718 comp
= VcsComponent(dts
=self
._dts
,
1721 cluster_name
=self
._cluster
_name
,
1722 vcs_handler
=self
._vcs
_handler
,
1723 component
=component
,
1724 mangled_name
=mangled_name
,
1726 if comp
.name
in self
._inventory
:
1727 self
._log
.debug("Duplicate entries in inventory %s for vnfr %s",
1728 component
, self
._vnfd
_id
)
1730 self
._log
.debug("Adding component %s for vnrf %s",
1731 comp
.name
, self
._vnfr
_id
)
1732 self
._inventory
[comp
.name
] = comp
1733 yield from comp
.publish(xact
)
1735 def all_vdus_active(self
):
1736 """ Are all VDUS in this VNFR active? """
1737 for vdu
in self
._vdus
:
1741 self
._log
.debug("Inside all_vdus_active. Returning True")
1745 def instantiation_failed(self
, failed_reason
=None):
1746 """ VNFR instantiation failed """
1747 self
._log
.debug("VNFR %s instantiation failed ", self
.vnfr_id
)
1748 self
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
1749 self
._state
_failed
_reason
= failed_reason
1751 # Update the VNFR with the changed status
1752 yield from self
.publish(None)
1756 """ This VNF is ready"""
1757 self
._log
.debug("VNFR id %s is ready", self
.vnfr_id
)
1759 if self
._state
!= VirtualNetworkFunctionRecordState
.FAILED
:
1760 self
.set_state(VirtualNetworkFunctionRecordState
.READY
)
1763 self
._log
.debug("VNFR id %s ignoring state change", self
.vnfr_id
)
1765 # Update the VNFR with the changed status
1766 yield from self
.publish(None)
1768 def update_cp(self
, cp_name
, ip_address
, mac_addr
, cp_id
):
1769 """Updated the connection point with ip address"""
1770 for cp
in self
._cprs
:
1771 if cp
.name
== cp_name
:
1772 self
._log
.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1773 cp_name
, cp
, ip_address
, cp_id
)
1774 cp
.ip_address
= ip_address
1775 cp
.mac_address
= mac_addr
1776 cp
.connection_point_id
= cp_id
1779 err
= "No connection point %s found in VNFR id %s" % (cp
.name
, self
._vnfr
_id
)
1780 self
._log
.debug(err
)
1781 raise VirtualDeploymentUnitRecordError(err
)
1783 def set_state(self
, state
):
1784 """ Set state for this VNFR"""
1788 def instantiate(self
, xact
, restart_mode
=False):
1789 """ instantiate this VNF """
1790 self
.set_state(VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
)
1791 self
._rw
_vnfd
= yield from self
._vnfm
.fetch_vnfd(self
._vnfd
_id
)
1796 # Iterate over all the connection points in VNFR and fetch the
1799 def cpr_from_cp(cp
):
1800 """ Creates a record level connection point from the desciptor cp"""
1801 cp_fields
= ["name", "image", "vm-flavor"]
1802 cp_copy_dict
= {k
: v
for k
, v
in cp
.as_dict().items() if k
in cp_fields
}
1804 cpr_dict
.update(cp_copy_dict
)
1805 return VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr_dict
)
1807 self
._log
.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1808 self
._vnfr
_id
, self
._vnfr
.connection_point
)
1810 for cp
in self
._vnfr
.connection_point
:
1811 cpr
= cpr_from_cp(cp
)
1812 self
._cprs
.append(cpr
)
1813 self
._log
.debug("Adding Connection point record %s ", cp
)
1815 vlr_path
= self
.vlr_xpath(cp
.vlr_ref
)
1816 self
._log
.debug("Fetching VLR with path = %s", vlr_path
)
1817 res_iter
= yield from self
._dts
.query_read(self
.vlr_xpath(cp
.vlr_ref
),
1818 rwdts
.XactFlag
.MERGE
)
1822 self
._ext
_vlrs
[cp
.vlr_ref
] = d
1823 cpr
.vlr_ref
= cp
.vlr_ref
1824 self
._log
.debug("Fetched VLR [%s] with path = [%s]", d
, vlr_path
)
1826 # Increase the VNFD reference count
1831 # Fetch External VLRs
1832 self
._log
.debug("VNFR-ID %s: Fetching vlrs", self
._vnfr
_id
)
1833 yield from fetch_vlrs()
1836 self
._log
.debug("VNFR-ID %s: Publishing Inventory", self
._vnfr
_id
)
1837 yield from self
.publish_inventory(xact
)
1840 self
._log
.debug("VNFR-ID %s: Creating VLs", self
._vnfr
_id
)
1841 yield from self
.create_vls()
1844 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1845 yield from self
.publish(xact
)
1848 self
._log
.debug("VNFR-ID %s: Instantiate VLs", self
._vnfr
_id
)
1850 yield from self
.instantiate_vls(xact
, restart_mode
)
1851 except Exception as e
:
1852 self
._log
.exception("VL instantiation failed (%s)", str(e
))
1853 yield from self
.instantiation_failed(str(e
))
1856 self
.set_state(VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
)
1859 self
._log
.debug("VNFR-ID %s: Create VDUs", self
._vnfr
_id
)
1860 yield from self
.create_vdus(self
, restart_mode
)
1863 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1864 yield from self
.publish(xact
)
1867 # ToDo: Check if this should be prevented during restart
1868 self
._log
.debug("VNFR-ID %s: Instantiate VDUs", self
._vnfr
_id
)
1869 _
= self
._loop
.create_task(self
.instantiate_vdus(xact
, self
))
1872 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1873 yield from self
.publish(xact
)
1875 self
._log
.debug("VNFR-ID %s: Instantiation Done", self
._vnfr
_id
)
1877 # create task updating uptime for this vnfr
1878 self
._log
.debug("VNFR-ID %s: Starting task to update uptime", self
._vnfr
_id
)
1879 self
._loop
.create_task(self
.vnfr_uptime_update(xact
))
1882 def terminate(self
, xact
):
1883 """ Terminate this virtual network function """
1885 self
._log
.debug("Terminatng VNF id %s", self
.vnfr_id
)
1887 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATE
)
1890 if self
._vnf
_mon
is not None:
1891 self
._vnf
_mon
.stop()
1892 self
._vnf
_mon
.deregister()
1893 self
._vnf
_mon
= None
1896 def terminate_vls():
1897 """ Terminate VLs in this VNF """
1898 for vl
in self
._vlrs
:
1899 yield from vl
.terminate(xact
)
1902 def terminate_vdus():
1903 """ Terminate VDUS in this VNF """
1904 for vdu
in self
._vdus
:
1905 yield from vdu
.terminate(xact
)
1907 self
._log
.debug("Terminatng VLs in VNF id %s", self
.vnfr_id
)
1908 self
.set_state(VirtualNetworkFunctionRecordState
.VL_TERMINATE_PHASE
)
1909 yield from terminate_vls()
1911 self
._log
.debug("Terminatng VDUs in VNF id %s", self
.vnfr_id
)
1912 self
.set_state(VirtualNetworkFunctionRecordState
.VDU_TERMINATE_PHASE
)
1913 yield from terminate_vdus()
1915 self
._log
.debug("Terminated VNF id %s", self
.vnfr_id
)
1916 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATED
)
1919 def vnfr_uptime_update(self
, xact
):
1921 # Return when vnfr state is FAILED or TERMINATED etc
1922 if self
._state
not in [VirtualNetworkFunctionRecordState
.INIT
,
1923 VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
,
1924 VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
,
1925 VirtualNetworkFunctionRecordState
.READY
]:
1927 yield from self
.publish(xact
)
1928 yield from asyncio
.sleep(2, loop
=self
._loop
)
1932 class VnfdDtsHandler(object):
1933 """ DTS handler for VNFD config changes """
1934 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1936 def __init__(self
, dts
, log
, loop
, vnfm
):
1945 """ DTS registration handle """
1950 """ Register for VNFD configuration"""
1952 def on_apply(dts
, acg
, xact
, action
, scratch
):
1953 """Apply the configuration"""
1954 self
._log
.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1955 xact
, action
, scratch
)
1957 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
1960 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
1961 """ on prepare callback """
1962 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1963 ks_path
.to_xpath(RwVnfmYang
.get_schema()), msg
)
1964 fref
= ProtobufC
.FieldReference
.alloc()
1965 fref
.goto_whole_message(msg
.to_pbcm())
1967 # Handle deletes in prepare_callback
1968 if fref
.is_field_deleted():
1969 # Delete an VNFD record
1970 self
._log
.debug("Deleting VNFD with id %s", msg
.id)
1971 if self
._vnfm
.vnfd_in_use(msg
.id):
1972 self
._log
.debug("Cannot delete VNFD in use - %s", msg
)
1973 err
= "Cannot delete a VNFD in use - %s" % msg
1974 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
1975 # Delete a VNFD record
1976 yield from self
._vnfm
.delete_vnfd(msg
.id)
1978 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
1981 "Registering for VNFD config using xpath: %s",
1982 VnfdDtsHandler
.XPATH
,
1984 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
1985 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
1986 self
._regh
= acg
.register(
1987 xpath
=VnfdDtsHandler
.XPATH
,
1988 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
1989 on_prepare
=on_prepare
)
1992 class VcsComponentDtsHandler(object):
1993 """ Vcs Component DTS handler """
1994 XPATH
= ("D,/rw-manifest:manifest" +
1995 "/rw-manifest:operational-inventory" +
1996 "/rw-manifest:component")
1998 def __init__(self
, dts
, log
, loop
, vnfm
):
2007 """ DTS registration handle """
2012 """ Registers VCS component dts publisher registration"""
2013 self
._log
.debug("VCS Comp publisher DTS handler registering path %s",
2014 VcsComponentDtsHandler
.XPATH
)
2016 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
2017 handlers
= rift
.tasklets
.Group
.Handler()
2018 with self
._dts
.group_create(handler
=handlers
) as group
:
2019 self
._regh
= group
.register(xpath
=VcsComponentDtsHandler
.XPATH
,
2021 flags
=(rwdts
.Flag
.PUBLISHER |
2022 rwdts
.Flag
.NO_PREP_READ |
2023 rwdts
.Flag
.DATASTORE
),)
2026 def publish(self
, xact
, path
, msg
):
2027 """ Publishes the VCS component """
2028 self
._log
.debug("Publishing the VcsComponent xact = %s, %s:%s",
2030 self
.regh
.create_element(path
, msg
)
2031 self
._log
.debug("Published the VcsComponent to %s xact = %s, %s:%s",
2032 VcsComponentDtsHandler
.XPATH
, xact
, path
, msg
)
2034 class VnfrConsoleOperdataDtsHandler(object):
2035 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
2037 def vnfr_vdu_console_xpath(self
):
2038 """ path for resource-mgr"""
2039 return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self
._vnfr
_id
,self
._vdur
_id
))
2041 def __init__(self
, dts
, log
, loop
, vnfm
, vnfr_id
, vdur_id
, vdu_id
):
2048 self
._vnfr
_id
= vnfr_id
2049 self
._vdur
_id
= vdur_id
2050 self
._vdu
_id
= vdu_id
2054 """ Register for VNFR VDU Operational Data read from dts """
2057 def on_prepare(xact_info
, action
, ks_path
, msg
):
2058 """ prepare callback from dts """
2059 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2061 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2062 xact_info
, action
, xpath
, msg
2065 if action
== rwdts
.QueryAction
.READ
:
2066 schema
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur
.schema()
2067 path_entry
= schema
.keyspec_to_entry(ks_path
)
2068 self
._log
.debug("VDU Opdata path is {}".format(path_entry
))
2070 vnfr
= self
._vnfm
.get_vnfr(self
._vnfr
_id
)
2071 except VnfRecordError
as e
:
2072 self
._log
.error("VNFR id %s not found", self
._vnfr
_id
)
2073 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2076 vdur
= vnfr
._get
_vdur
_from
_vdu
_id
(self
._vdu
_id
)
2077 if not vdur
._state
== VDURecordState
.READY
:
2078 self
._log
.debug("VDUR state is not READY. current state is {}".format(vdur
._state
))
2079 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2081 with self
._dts
.transaction() as new_xact
:
2082 resp
= yield from vdur
.read_resource(new_xact
)
2083 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2084 vdur_console
.id = self
._vdur
_id
2085 if resp
.console_url
:
2086 vdur_console
.console_url
= resp
.console_url
2088 vdur_console
.console_url
= 'none'
2089 self
._log
.debug("Recevied console URL for vdu {} is {}".format(self
._vdu
_id
,vdur_console
))
2091 self
._log
.exception("Caught exception while reading VDU %s", self
._vdu
_id
)
2092 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2093 vdur_console
.id = self
._vdur
_id
2094 vdur_console
.console_url
= 'none'
2096 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
,
2097 xpath
=self
.vnfr_vdu_console_xpath
,
2100 #raise VnfRecordError("Not supported operation %s" % action)
2101 self
._log
.error("Not supported operation %s" % action
)
2102 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2106 self
._log
.debug("Registering for VNFR VDU using xpath: %s",
2107 self
.vnfr_vdu_console_xpath
)
2108 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2109 with self
._dts
.group_create() as group
:
2110 self
._regh
= group
.register(xpath
=self
.vnfr_vdu_console_xpath
,
2112 flags
=rwdts
.Flag
.PUBLISHER
,
2116 class VnfrDtsHandler(object):
2117 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2118 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2120 def __init__(self
, dts
, log
, loop
, vnfm
):
2130 """ Return registration handle"""
2135 """ Return VNF manager instance """
2140 """ Register for vnfr create/update/delete/read requests from dts """
2141 def on_commit(xact_info
):
2142 """ The transaction has been committed """
2143 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
2144 return rwdts
.MemberRspCode
.ACTION_OK
2146 def on_abort(*args
):
2147 """ Abort callback """
2148 self
._log
.debug("VNF transaction got aborted")
2151 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
2154 def instantiate_realloc_vnfr(vnfr
):
2155 """Re-populate the vnfm after restart
2162 yield from vnfr
.instantiate(None, restart_mode
=True)
2164 if xact_event
== rwdts
.MemberEvent
.INSTALL
:
2165 curr_cfg
= self
.regh
.elements
2166 for cfg
in curr_cfg
:
2167 vnfr
= self
.vnfm
.create_vnfr(cfg
)
2168 self
._loop
.create_task(instantiate_realloc_vnfr(vnfr
))
2170 self
._log
.debug("Got on_event in vnfm")
2172 return rwdts
.MemberRspCode
.ACTION_OK
2175 def on_prepare(xact_info
, action
, ks_path
, msg
):
2176 """ prepare callback from dts """
2178 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2179 xact_info
, action
, msg
2182 if action
== rwdts
.QueryAction
.CREATE
:
2183 if not msg
.has_field("vnfd"):
2184 err
= "Vnfd not provided"
2185 self
._log
.error(err
)
2186 raise VnfRecordError(err
)
2188 vnfr
= self
.vnfm
.create_vnfr(msg
)
2190 # RIFT-9105: Unable to add a READ query under an existing transaction
2191 # xact = xact_info.xact
2192 yield from vnfr
.instantiate(None)
2193 except Exception as e
:
2194 self
._log
.exception(e
)
2195 self
._log
.error("Error while instantiating vnfr:%s", vnfr
.vnfr_id
)
2196 vnfr
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
2197 yield from vnfr
.publish(None)
2198 elif action
== rwdts
.QueryAction
.DELETE
:
2199 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2200 path_entry
= schema
.keyspec_to_entry(ks_path
)
2201 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2204 self
._log
.debug("VNFR id %s not found for delete", path_entry
.key00
.id)
2205 raise VirtualNetworkFunctionRecordNotFound(
2206 "VNFR id %s", path_entry
.key00
.id)
2209 yield from vnfr
.terminate(xact_info
.xact
)
2212 yield from self
._vnfm
.delete_vnfr(xact_info
.xact
, vnfr
)
2213 except Exception as e
:
2214 self
._log
.exception(e
)
2215 self
._log
.error("Caught exception while deleting vnfr %s", path_entry
.key00
.id)
2217 elif action
== rwdts
.QueryAction
.UPDATE
:
2218 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2219 path_entry
= schema
.keyspec_to_entry(ks_path
)
2222 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2223 except Exception as e
:
2224 self
._log
.debug("No vnfr found with id %s", path_entry
.key00
.id)
2225 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2229 self
._log
.debug("VNFR id %s not found for update", path_entry
.key00
.id)
2230 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2233 self
._log
.debug("VNFR {} update config status {} (current {})".
2234 format(vnfr
.name
, msg
.config_status
, vnfr
.config_status
))
2235 # Update the config status and publish
2236 vnfr
._config
_status
= msg
.config_status
2237 yield from vnfr
.publish(None)
2240 raise NotImplementedError(
2241 "%s action on VirtualNetworkFunctionRecord not supported",
2244 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2246 self
._log
.debug("Registering for VNFR using xpath: %s",
2247 VnfrDtsHandler
.XPATH
,)
2249 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
2250 on_prepare
=on_prepare
,)
2251 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
2252 with self
._dts
.group_create(handler
=handlers
) as group
:
2253 self
._regh
= group
.register(xpath
=VnfrDtsHandler
.XPATH
,
2255 flags
=(rwdts
.Flag
.PUBLISHER |
2256 rwdts
.Flag
.NO_PREP_READ |
2258 rwdts
.Flag
.DATASTORE
),)
2261 def create(self
, xact
, path
, msg
):
2263 Create a VNFR record in DTS with path and message
2265 self
._log
.debug("Creating VNFR xact = %s, %s:%s",
2268 self
.regh
.create_element(path
, msg
)
2269 self
._log
.debug("Created VNFR xact = %s, %s:%s",
2273 def update(self
, xact
, path
, msg
):
2275 Update a VNFR record in DTS with path and message
2277 self
._log
.debug("Updating VNFR xact = %s, %s:%s",
2279 self
.regh
.update_element(path
, msg
)
2280 self
._log
.debug("Updated VNFR xact = %s, %s:%s",
2284 def delete(self
, xact
, path
):
2286 Delete a VNFR record in DTS with path and message
2288 self
._log
.debug("Deleting VNFR xact = %s, %s", xact
, path
)
2289 self
.regh
.delete_element(path
)
2290 self
._log
.debug("Deleted VNFR xact = %s, %s", xact
, path
)
2293 class VnfdRefCountDtsHandler(object):
2294 """ The VNFD Ref Count DTS handler """
2295 XPATH
= "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2297 def __init__(self
, dts
, log
, loop
, vnfm
):
2307 """ Return registration handle """
2312 """ Return the NS manager instance """
2317 """ Register for VNFD ref count read from dts """
2320 def on_prepare(xact_info
, action
, ks_path
, msg
):
2321 """ prepare callback from dts """
2322 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2324 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2325 xact_info
, action
, xpath
, msg
2328 if action
== rwdts
.QueryAction
.READ
:
2329 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_VnfdRefCount
.schema()
2330 path_entry
= schema
.keyspec_to_entry(ks_path
)
2331 vnfd_list
= yield from self
._vnfm
.get_vnfd_refcount(path_entry
.key00
.vnfd_id_ref
)
2332 for xpath
, msg
in vnfd_list
:
2333 self
._log
.debug("Responding to ref count query path:%s, msg:%s",
2335 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.MORE
,
2338 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2340 raise VnfRecordError("Not supported operation %s" % action
)
2342 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2343 with self
._dts
.group_create() as group
:
2344 self
._regh
= group
.register(xpath
=VnfdRefCountDtsHandler
.XPATH
,
2346 flags
=rwdts
.Flag
.PUBLISHER
,
2350 class VdurDatastore(object):
2352 This VdurDatastore is intended to expose select information about a VDUR
2353 such that it can be referenced in a cloud config file. The data that is
2354 exposed does not necessarily follow the structure of the data in the yang
2355 model. This is intentional. The data that are exposed are intended to be
2356 agnostic of the yang model so that changes in the model do not necessarily
2357 require changes to the interface provided to the user. It also means that
2358 the user does not need to be familiar with the RIFT.ware yang models.
2362 """Create an instance of VdurDatastore"""
2363 self
._vdur
_data
= dict()
2364 self
._pattern
= re
.compile("vdu\[([^]]+)\]\.(.+)")
2366 def add(self
, vdur
):
2367 """Add a new VDUR to the datastore
2370 vdur - a VirtualDeploymentUnitRecord instance
2373 A ValueError is raised if the VDUR is (1) None or (2) already in
2377 if vdur
.vdu_id
is None:
2378 raise ValueError('VDURs are required to have an ID')
2380 if vdur
.vdu_id
in self
._vdur
_data
:
2381 raise ValueError('cannot add a VDUR more than once')
2383 self
._vdur
_data
[vdur
.vdu_id
] = dict()
2385 def set_if_not_none(key
, attr
):
2386 if attr
is not None:
2387 self
._vdur
_data
[vdur
.vdu_id
][key
] = attr
2389 set_if_not_none('name', vdur
._vdud
.name
)
2390 set_if_not_none('mgmt.ip', vdur
.vm_management_ip
)
2392 def update(self
, vdur
):
2393 """Update the VDUR information in the datastore
2396 vdur - a GI representation of a VDUR
2399 A ValueError is raised if the VDUR is (1) None or (2) already in
2403 if vdur
.vdu_id
is None:
2404 raise ValueError('VNFDs are required to have an ID')
2406 if vdur
.vdu_id
not in self
._vdur
_data
:
2407 raise ValueError('VNF is not recognized')
2409 def set_or_delete(key
, attr
):
2411 if key
in self
._vdur
_data
[vdur
.vdu_id
]:
2412 del self
._vdur
_data
[vdur
.vdu_id
][key
]
2415 self
._vdur
_data
[vdur
.vdu_id
][key
] = attr
2417 set_or_delete('name', vdur
._vdud
.name
)
2418 set_or_delete('mgmt.ip', vdur
.vm_management_ip
)
2420 def remove(self
, vdur_id
):
2421 """Remove all of the data associated with specified VDUR
2424 vdur_id - the identifier of a VNFD in the datastore
2427 A ValueError is raised if the VDUR is not contained in the
2431 if vdur_id
not in self
._vdur
_data
:
2432 raise ValueError('VNF is not recognized')
2434 del self
._vdur
_data
[vdur_id
]
2436 def get(self
, expr
):
2437 """Retrieve VDUR information from the datastore
2439 An expression should be of the form,
2443 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2444 the exposed attribute that the user wishes to retrieve.
2446 If the requested data is not available, None is returned.
2449 expr - a string that specifies the data to return
2452 A ValueError is raised if the provided expression cannot be parsed.
2455 The requested data or None
2458 result
= self
._pattern
.match(expr
)
2460 raise ValueError('data expression not recognized ({})'.format(expr
))
2462 vdur_id
, key
= result
.groups()
2464 if vdur_id
not in self
._vdur
_data
:
2467 return self
._vdur
_data
[vdur_id
].get(key
, None)
2470 class VnfManager(object):
2471 """ The virtual network function manager class """
2472 def __init__(self
, dts
, log
, loop
, cluster_name
):
2476 self
._cluster
_name
= cluster_name
2478 self
._vcs
_handler
= VcsComponentDtsHandler(dts
, log
, loop
, self
)
2479 self
._vnfr
_handler
= VnfrDtsHandler(dts
, log
, loop
, self
)
2480 self
._vnfr
_ref
_handler
= VnfdRefCountDtsHandler(dts
, log
, loop
, self
)
2481 self
._nsr
_handler
= mano_dts
.NsInstanceConfigSubscriber(log
, dts
, loop
, callback
=self
.handle_nsr
)
2483 self
._dts
_handlers
= [VnfdDtsHandler(dts
, log
, loop
, self
),
2486 self
._vnfr
_ref
_handler
,
2489 self
._vnfds
_to
_vnfr
= {}
2493 def vnfr_handler(self
):
2494 """ VNFR dts handler """
2495 return self
._vnfr
_handler
2498 def vcs_handler(self
):
2499 """ VCS dts handler """
2500 return self
._vcs
_handler
2504 """ Register all static DTS handlers """
2505 for hdl
in self
._dts
_handlers
:
2506 yield from hdl
.register()
2510 """ Run this VNFM instance """
2511 self
._log
.debug("Run VNFManager - registering static DTS handlers""")
2512 yield from self.register()
2514 def handle_nsr(self, nsr, action):
2515 if action in [rwdts.QueryAction.CREATE]:
2516 self._nsrs[nsr.id] = nsr
2517 elif action == rwdts.QueryAction.DELETE:
2518 if nsr.id in self._nsrs:
2519 del self._nsrs[nsr.id]
2521 def get_linked_mgmt_network(self, vnfr):
2522 """For the given VNFR get the related mgmt network from the NSD, if
2525 vnfd_id = vnfr.vnfd.id
2526 nsr_id = vnfr.nsr_id_ref
2528 # for the given related VNFR, get the corresponding NSR-config
2531 nsr_obj = self._nsrs[nsr_id]
2533 raise("Unable to find the NS with the ID
: {}".format(nsr_id))
2535 # for the related NSD check if a VLD exists such that it's a mgmt
2537 for vld in nsr_obj.nsd.vld:
2538 if vld.mgmt_network:
2543 def get_vnfr(self, vnfr_id):
2544 """ get VNFR by vnfr id """
2546 if vnfr_id not in self._vnfrs:
2547 raise VnfRecordError("VNFR
id %s not found
", vnfr_id)
2549 return self._vnfrs[vnfr_id]
2551 def create_vnfr(self, vnfr):
2552 """ Create a VNFR instance """
2553 if vnfr.id in self._vnfrs:
2554 msg = "Vnfr
id %s already exists
" % vnfr.id
2555 self._log.error(msg)
2556 raise VnfRecordError(msg)
2558 self._log.info("Create VirtualNetworkFunctionRecord
%s from vnfd_id
: %s",
2562 mgmt_network = self.get_linked_mgmt_network(vnfr)
2564 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2565 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
2566 mgmt_network=mgmt_network
2570 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2571 self._vnfds_to_vnfr[vnfr.vnfd.id] += 1
2573 self._vnfds_to_vnfr[vnfr.vnfd.id] = 1
2575 return self._vnfrs[vnfr.id]
2578 def delete_vnfr(self, xact, vnfr):
2579 """ Create a VNFR instance """
2580 if vnfr.vnfr_id in self._vnfrs:
2581 self._log.debug("Deleting VNFR
id %s", vnfr.vnfr_id)
2582 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2584 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2585 if self._vnfds_to_vnfr[vnfr.vnfd.id]:
2586 self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1
2588 del self._vnfrs[vnfr.vnfr_id]
2591 def fetch_vnfd(self, vnfd_id):
2592 """ Fetch VNFDs based with the vnfd id"""
2593 vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
2594 self._log.debug("Fetch vnfd with path
%s", vnfd_path)
2597 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2599 for ent in res_iter:
2600 res = yield from ent
2604 err = "Failed to get Vnfd
%s" % vnfd_id
2605 self._log.error(err)
2606 raise VnfRecordError(err)
2608 self._log.debug("Fetched vnfd
for path
%s, vnfd
- %s", vnfd_path, vnfd)
2612 def vnfd_in_use(self, vnfd_id):
2613 """ Is this VNFD in use """
2614 self._log.debug("Is this VNFD
in use
- msg
:%s", vnfd_id)
2615 if vnfd_id in self._vnfds_to_vnfr:
2616 return (self._vnfds_to_vnfr[vnfd_id] > 0)
2620 def publish_vnfr(self, xact, path, msg):
2621 """ Publish a VNFR """
2622 self._log.debug("publish_vnfr called with path
%s, msg
%s",
2624 yield from self.vnfr_handler.update(xact, path, msg)
2627 def delete_vnfd(self, vnfd_id):
2628 """ Delete the Virtual Network Function descriptor with the passed id """
2629 self._log.debug("Deleting the virtual network function descriptor
- %s", vnfd_id)
2630 if vnfd_id in self._vnfds_to_vnfr:
2631 if self._vnfds_to_vnfr[vnfd_id]:
2632 self._log.debug("Cannot delete VNFD
id %s reference exists
%s",
2634 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2635 raise VirtualNetworkFunctionDescriptorRefCountExists(
2636 "Cannot delete
:%s, ref_count
:%s",
2638 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2640 del self._vnfds_to_vnfr[vnfd_id]
2642 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2644 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2645 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2646 if os.path.exists(vnfd_dir):
2647 shutil.rmtree(vnfd_dir, ignore_errors=True)
2648 except Exception as e:
2649 self._log.error("Exception in cleaning up VNFD
{}: {}".
2650 format(self._vnfds_to_vnfr[vnfd_id].vnfd.name, e))
2651 self._log.exception(e)
2654 def vnfd_refcount_xpath(self, vnfd_id):
2655 """ xpath for ref count entry """
2656 return (VnfdRefCountDtsHandler.XPATH +
2657 "[rw
-vnfr
:vnfd
-id-ref
= '{}']").format(vnfd_id)
2660 def get_vnfd_refcount(self, vnfd_id):
2661 """ Get the vnfd_list from this VNFM"""
2663 if vnfd_id is None or vnfd_id == "":
2664 for vnfd in self._vnfds_to_vnfr.keys():
2665 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2666 vnfd_msg.vnfd_id_ref = vnfd
2667 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd]
2668 vnfd_list.append((self.vnfd_refcount_xpath(vnfd), vnfd_msg))
2669 elif vnfd_id in self._vnfds_to_vnfr:
2670 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2671 vnfd_msg.vnfd_id_ref = vnfd_id
2672 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd_id]
2673 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2678 class VnfmTasklet(rift.tasklets.Tasklet):
2679 """ VNF Manager tasklet class """
2680 def __init__(self, *args, **kwargs):
2681 super(VnfmTasklet, self).__init__(*args, **kwargs)
2682 self.rwlog.set_category("rw
-mano
-log
")
2683 self.rwlog.set_subcategory("vnfm
")
2690 super(VnfmTasklet, self).start()
2691 self.log.info("Starting VnfmTasklet
")
2693 self.log.setLevel(logging.DEBUG)
2695 self.log.debug("Registering with dts
")
2696 self._dts = rift.tasklets.DTS(self.tasklet_info,
2697 RwVnfmYang.get_schema(),
2699 self.on_dts_state_change)
2701 self.log.debug("Created DTS Api GI Object
: %s", self._dts)
2703 print("Caught
Exception in VNFM start
:", sys.exc_info()[0])
2706 def on_instance_started(self):
2707 """ Task insance started callback """
2708 self.log.debug("Got instance started callback
")
2714 print("Caught
Exception in VNFM stop
:", sys.exc_info()[0])
2719 """ Task init callback """
2721 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2722 assert vm_parent_name is not None
2723 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2724 yield from self._vnfm.run()
2726 print("Caught
Exception in VNFM init
:", sys.exc_info()[0])
2731 """ Task run callback """
2735 def on_dts_state_change(self, state):
2736 """Take action according to current dts state to transition
2737 application into the corresponding application state
2740 state - current dts state
2743 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2744 rwdts.State.CONFIG: rwdts.State.RUN,
2748 rwdts.State.INIT: self.init,
2749 rwdts.State.RUN: self.run,
2752 # Transition application to next state
2753 handler = handlers.get(state, None)
2754 if handler is not None:
2755 yield from handler()
2757 # Transition dts to next state
2758 next_state = switch.get(state, None)
2759 if next_state is not None:
2760 self._dts.handle.set_state(next_state)