2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
29 gi
.require_version('RwDts', '1.0')
30 gi
.require_version('RwVnfrYang', '1.0')
31 gi
.require_version('RwVnfmYang', '1.0')
32 gi
.require_version('RwVlrYang', '1.0')
33 gi
.require_version('RwManifestYang', '1.0')
34 gi
.require_version('RwBaseYang', '1.0')
35 gi
.require_version('RwResourceMgrYang', '1.0')
37 from gi
.repository
import (
50 import rift
.package
.store
51 import rift
.package
.cloud_init
52 import rift
.package
.script
53 import rift
.mano
.dts
as mano_dts
# Exception types raised by the VNF manager records defined in this module.


class VMResourceError(Exception):
    """VM resource error."""


class VnfRecordError(Exception):
    """VNF record instantiation failed."""


class VduRecordError(Exception):
    """VDU record instantiation failed."""


# NOTE(review): this class name shadows the built-in ``NotImplemented``
# singleton for the rest of the module; renaming it would change the module's
# public interface, so it is left as-is.
class NotImplemented(Exception):
    """Not implemented."""


class VnfrRecordExistsError(Exception):
    """A VNFR record already exists with the same VNFR id."""


class InternalVirtualLinkRecordError(Exception):
    """Internal virtual link record error."""


class VDUImageNotFound(Exception):
    """VDU image not found."""


class VirtualDeploymentUnitRecordError(Exception):
    """VDU instantiation failed."""


class VMNotReadyError(Exception):
    """VM not yet received from the resource manager."""


class VDURecordNotFound(Exception):
    """Could not find a VDU record."""


class VirtualNetworkFunctionRecordDescNotFound(Exception):
    """Cannot find the Virtual Network Function Record descriptor."""


class VirtualNetworkFunctionDescriptorError(Exception):
    """Virtual Network Function Descriptor error."""


class VirtualNetworkFunctionDescriptorNotFound(Exception):
    """Virtual Network Function Descriptor not found."""


class VirtualNetworkFunctionRecordNotFound(Exception):
    """Virtual Network Function Record not found."""


class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
    """Virtual Network Function Descriptor reference count exists."""


class VnfrInstantiationFailed(Exception):
    """Virtual Network Function instantiation failed."""
136 class VNFMPlacementGroupError(Exception):
139 class VirtualNetworkFunctionRecordState(enum
.Enum
):
146 VL_TERMINATE_PHASE
= 6
147 VDU_TERMINATE_PHASE
= 7
152 class VDURecordState(enum
.Enum
):
153 """VDU record state """
156 RESOURCE_ALLOC_PENDING
= 3
163 class VcsComponent(object):
164 """ VCS Component within the VNF descriptor """
165 def __init__(self
, dts
, log
, loop
, cluster_name
, vcs_handler
, component
, mangled_name
):
169 self
._component
= component
170 self
._cluster
_name
= cluster_name
171 self
._vcs
_handler
= vcs_handler
172 self
._mangled
_name
= mangled_name
def mangle_name(component_name, vnf_name, vnfd_id):
    """Build the mangled name of a VCS component.

    The result is ``<vnf_name>:<component_name>:<vnfd_id>``; note that the
    VNF name comes first even though ``component_name`` is the first
    parameter.
    """
    name_parts = (vnf_name, component_name, vnfd_id)
    return ":".join(name_parts)
181 """ name of this component"""
182 return self
._mangled
_name
186 """ The path for this object """
187 return("D,/rw-manifest:manifest" +
188 "/rw-manifest:operational-inventory" +
189 "/rw-manifest:component" +
190 "[rw-manifest:component-name = '{}']").format(self
.name
)
193 def instance_xpath(self
):
194 """ The path for this object """
195 return("D,/rw-base:vcs" +
198 "[instance-name = '{}']".format(self
._cluster
_name
))
def start_comp_xpath(self):
    """Return the xpath used to start this component.

    It is this object's ``instance_xpath`` followed by the ``child-n``
    list entry keyed by the instance name ``START-REQ``.
    """
    start_req_entry = "/child-n[instance-name = 'START-REQ']"
    return self.instance_xpath + start_req_entry
206 def get_start_comp_msg(self
, ip_address
):
207 """ start this component """
208 start_msg
= RwBaseYang
.VcsInstance_Instance_ChildN()
209 start_msg
.instance_name
= 'START-REQ'
210 start_msg
.component_name
= self
.name
211 start_msg
.admin_command
= "START"
212 start_msg
.ip_address
= ip_address
218 """ Returns the message for this vcs component"""
220 vcs_comp_dict
= self
._component
.as_dict()
222 def mangle_comp_names(comp_dict
):
223 """ mangle component name with VNF name, id"""
224 for key
, val
in comp_dict
.items():
225 if isinstance(val
, dict):
226 comp_dict
[key
] = mangle_comp_names(val
)
227 elif isinstance(val
, list):
230 if isinstance(ent
, dict):
231 val
[i
] = mangle_comp_names(ent
)
235 elif key
== "component_name":
236 comp_dict
[key
] = VcsComponent
.mangle_name(val
,
241 mangled_dict
= mangle_comp_names(vcs_comp_dict
)
242 msg
= RwManifestYang
.OpInventory_Component
.from_dict(mangled_dict
)
def publish(self, xact):
    """Publish this VCS component through the VCS handler.

    Logs the component name, path and message at debug level, then
    delegates (``yield from``) to ``self._vcs_handler.publish`` with the
    given transaction, this component's path and its message.

    Arguments:
        xact - transaction handed through to the handler's publish call
    """
    logger = self._log
    logger.debug("Publishing the VcsComponent %s, path = %s comp = %s",
                 self.name, self.path, self.msg)

    # path/msg are deliberately read again for the publish call, exactly as
    # the log statement read them, rather than being cached in locals.
    do_publish = self._vcs_handler.publish
    yield from do_publish(xact, self.path, self.msg)
253 def start(self
, xact
, parent
, ip_addr
=None):
254 """ Starts this VCS component """
255 # ATTN RV - replace with block add
256 start_msg
= self
.get_start_comp_msg(ip_addr
)
257 self
._log
.debug("starting component %s %s",
258 self
.start_comp_xpath
, start_msg
)
259 yield from self
._dts
.query_create(self
.start_comp_xpath
,
262 self
._log
.debug("started component %s, %s",
263 self
.start_comp_xpath
, start_msg
)
266 class VirtualDeploymentUnitRecord(object):
267 """ Virtual Deployment Unit Record """
279 placement_groups
=[]):
285 self
._mgmt
_intf
= mgmt_intf
286 self
._cloud
_account
_name
= cloud_account_name
287 self
._vnfd
_package
_store
= vnfd_package_store
288 self
._mgmt
_network
= mgmt_network
290 self
._vdur
_id
= vdur_id
or str(uuid
.uuid4())
293 self
._state
= VDURecordState
.INIT
294 self
._state
_failed
_reason
= None
295 self
._request
_id
= str(uuid
.uuid4())
296 self
._name
= vnfr
.name
+ "__" + vdud
.id
297 self
._placement
_groups
= placement_groups
300 self
._vdud
_cloud
_init
= None
301 self
._vdur
_console
_handler
= VnfrConsoleOperdataDtsHandler(dts
, log
, loop
, self
._vnfr
._vnfm
, self
._vnfr
.vnfr_id
, self
._vdur
_id
,self
.vdu_id
)
def vdu_opdata_register(self):
    """Register the VDUR console operational-data handler.

    Delegates (``yield from``) to the ``register()`` call of the console
    handler created in the constructor.
    """
    console_handler = self._vdur_console_handler
    yield from console_handler.register()
307 def cp_ip_addr(self
, cp_name
):
308 """ Find ip address by connection point name """
309 if self
._vm
_resp
is not None:
310 for conn_point
in self
._vm
_resp
.connection_points
:
311 if conn_point
.name
== cp_name
:
312 return conn_point
.ip_address
def cp_mac_addr(self, cp_name):
    """Look up the MAC address of a connection point by its name.

    Arguments:
        cp_name - name of the connection point to look for

    Returns:
        The ``mac_addr`` of the first connection point in the VM response
        whose name equals ``cp_name``. The all-zero MAC
        ``"00:00:00:00:00:00"`` is returned when there is no VM response
        yet or when no connection point matches.
    """
    unknown_mac = "00:00:00:00:00:00"

    vm_resp = self._vm_resp
    if vm_resp is None:
        return unknown_mac

    matching_macs = (point.mac_addr
                     for point in vm_resp.connection_points
                     if point.name == cp_name)
    return next(matching_macs, unknown_mac)
323 def cp_id(self
, cp_name
):
324 """ Find connection point id by connection point name """
325 if self
._vm
_resp
is not None:
326 for conn_point
in self
._vm
_resp
.connection_points
:
327 if conn_point
.name
== cp_name
:
328 return conn_point
.connection_point_id
341 """ Return this VDUR's name """
def cloud_account_name(self):
    """Return the name of the cloud account this VDU is created in.

    This is the value that was handed to the constructor as
    ``cloud_account_name``.
    """
    account_name = self._cloud_account_name
    return account_name
350 def image_name(self
):
351 """ name that should be used to lookup the image on the CMP """
352 if 'image' not in self
._vdud
:
354 return os
.path
.basename(self
._vdud
.image
)
def image_checksum(self):
    """Return the image checksum from the VDU descriptor, if it has one.

    Returns:
        ``self._vdud.image_checksum`` when the descriptor reports the
        ``image_checksum`` field as set, otherwise ``None``.
    """
    if not self._vdud.has_field("image_checksum"):
        return None
    return self._vdud.image_checksum
362 def management_ip(self
):
365 return self
._vm
_resp
.public_ip
if self
._vm
_resp
.has_field('public_ip') else self
._vm
_resp
.management_ip
368 def vm_management_ip(self
):
371 return self
._vm
_resp
.management_ip
374 def operational_status(self
):
375 """ Operational status of this VDU"""
376 op_stats_dict
= {"INIT": "init",
377 "INSTANTIATING": "vm_init_phase",
378 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
381 "TERMINATING": "terminated",
382 "TERMINATED": "terminated",
384 return op_stats_dict
[self
._state
.name
]
388 """ Process VDU message from resmgr"""
389 vdu_fields
= ["vm_flavor",
396 vdu_copy_dict
= {k
: v
for k
, v
in
397 self
._vdud
.as_dict().items() if k
in vdu_fields
}
398 vdur_dict
= {"id": self
._vdur
_id
,
399 "vdu_id_ref": self
._vdud
.id,
400 "operational_status": self
.operational_status
,
401 "operational_status_details": self
._state
_failed
_reason
,
403 if self
.vm_resp
is not None:
404 vdur_dict
.update({"vim_id": self
.vm_resp
.vdu_id
,
405 "flavor_id": self
.vm_resp
.flavor_id
407 if self
._vm
_resp
.has_field('image_id'):
408 vdur_dict
.update({ "image_id": self
.vm_resp
.image_id
})
410 if self
.management_ip
is not None:
411 vdur_dict
["management_ip"] = self
.management_ip
413 if self
.vm_management_ip
is not None:
414 vdur_dict
["vm_management_ip"] = self
.vm_management_ip
416 vdur_dict
.update(vdu_copy_dict
)
418 if self
.vm_resp
is not None:
419 if self
._vm
_resp
.has_field('volumes'):
420 for opvolume
in self
._vm
_resp
.volumes
:
421 vdurvol_data
= [vduvol
for vduvol
in vdur_dict
['volumes'] if vduvol
['name'] == opvolume
.name
]
422 if len(vdurvol_data
) == 1:
423 vdurvol_data
[0]["volume_id"] = opvolume
.volume_id
424 if opvolume
.has_field('custom_meta_data'):
425 metadata_list
= list()
426 for metadata_item
in opvolume
.custom_meta_data
:
427 metadata_list
.append(metadata_item
.as_dict())
428 if 'guest_params' not in vdurvol_data
[0]:
429 vdurvol_data
[0]['guest_params'] = dict()
430 vdurvol_data
[0]['guest_params']['custom_meta_data'] = metadata_list
432 if self
._vm
_resp
.has_field('custom_boot_data'):
433 vdur_dict
['custom_boot_data'] = dict()
434 if self
._vm
_resp
.custom_boot_data
.has_field('custom_drive'):
435 vdur_dict
['custom_boot_data']['custom_drive'] = self
._vm
_resp
.custom_boot_data
.custom_drive
436 if self
._vm
_resp
.custom_boot_data
.has_field('custom_meta_data'):
437 metadata_list
= list()
438 for metadata_item
in self
._vm
_resp
.custom_boot_data
.custom_meta_data
:
439 metadata_list
.append(metadata_item
.as_dict())
440 vdur_dict
['custom_boot_data']['custom_meta_data'] = metadata_list
441 if self
._vm
_resp
.custom_boot_data
.has_field('custom_config_files'):
443 for file_item
in self
._vm
_resp
.custom_boot_data
.custom_config_files
:
444 file_list
.append(file_item
.as_dict())
445 vdur_dict
['custom_boot_data']['custom_config_files'] = file_list
450 for intf
, cp_id
, vlr
in self
._int
_intf
:
451 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
453 icp_list
.append({"name": cp
.name
,
455 "type_yang": "VPORT",
456 "ip_address": self
.cp_ip_addr(cp
.id),
457 "mac_address": self
.cp_mac_addr(cp
.id)})
459 ii_list
.append({"name": intf
.name
,
460 "vdur_internal_connection_point_ref": cp
.id,
461 "virtual_interface": {}})
463 vdur_dict
["internal_connection_point"] = icp_list
464 self
._log
.debug("internal_connection_point:%s", vdur_dict
["internal_connection_point"])
465 vdur_dict
["internal_interface"] = ii_list
468 for intf
, cp
, vlr
in self
._ext
_intf
:
469 ei_list
.append({"name": cp
.name
,
470 "vnfd_connection_point_ref": cp
.name
,
471 "virtual_interface": {}})
472 self
._vnfr
.update_cp(cp
.name
,
473 self
.cp_ip_addr(cp
.name
),
474 self
.cp_mac_addr(cp
.name
),
477 vdur_dict
["external_interface"] = ei_list
479 placement_groups
= []
480 for group
in self
._placement
_groups
:
481 placement_groups
.append(group
.as_dict())
482 vdur_dict
['placement_groups_info'] = placement_groups
484 return RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur
.from_dict(vdur_dict
)
487 def resmgr_path(self
):
488 """ path for resource-mgr"""
489 return ("D,/rw-resource-mgr:resource-mgmt" +
491 "/vdu-event-data[event-id='{}']".format(self
._request
_id
))
494 def vm_flavor_msg(self
):
495 """ VM flavor message """
496 flavor
= self
._vdud
.vm_flavor
.__class
__()
497 flavor
.copy_from(self
._vdud
.vm_flavor
)
502 def vdud_cloud_init(self
):
503 """ Return the cloud-init contents for the VDU """
504 if self
._vdud
_cloud
_init
is None:
505 self
._vdud
_cloud
_init
= self
.cloud_init()
507 return self
._vdud
_cloud
_init
509 def cloud_init(self
):
510 """ Populate cloud_init with cloud-config script from
511 either the inline contents or from the file provided
513 if self
._vdud
.cloud_init
is not None:
514 self
._log
.debug("cloud_init script provided inline %s", self
._vdud
.cloud_init
)
515 return self
._vdud
.cloud_init
516 elif self
._vdud
.cloud_init_file
is not None:
517 # Get cloud-init script contents from the file provided in the cloud_init_file param
518 self
._log
.debug("cloud_init script provided in file %s", self
._vdud
.cloud_init_file
)
519 filename
= self
._vdud
.cloud_init_file
520 self
._vnfd
_package
_store
.refresh()
521 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
522 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
524 return cloud_init_extractor
.read_script(stored_package
, filename
)
525 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
526 raise VirtualDeploymentUnitRecordError(e
)
528 self
._log
.debug("VDU Instantiation: cloud-init script not provided")
530 def process_openstack_placement_group_construct(self
, vm_create_msg_dict
):
532 availability_zones
= []
534 for group
in self
._placement
_groups
:
535 if group
.has_field('host_aggregate'):
536 for aggregate
in group
.host_aggregate
:
537 host_aggregates
.append(aggregate
.as_dict())
538 if group
.has_field('availability_zone'):
539 availability_zones
.append(group
.availability_zone
.as_dict())
540 if group
.has_field('server_group'):
541 server_groups
.append(group
.server_group
.as_dict())
543 if availability_zones
:
544 if len(availability_zones
) > 1:
545 self
._log
.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self
.name
, availability_zones
)
546 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self
.name
, availability_zones
))
548 vm_create_msg_dict
['availability_zone'] = availability_zones
[0]
551 if len(server_groups
) > 1:
552 self
._log
.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self
.name
, server_groups
)
553 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self
.name
, server_groups
))
555 vm_create_msg_dict
['server_group'] = server_groups
[0]
558 vm_create_msg_dict
['host_aggregate'] = host_aggregates
562 def process_placement_groups(self
, vm_create_msg_dict
):
563 """Process the placement_groups and fill resource-mgr request"""
564 if not self
._placement
_groups
:
567 cloud_set
= set([group
.cloud_type
for group
in self
._placement
_groups
])
568 assert len(cloud_set
) == 1
569 cloud_type
= cloud_set
.pop()
571 if cloud_type
== 'openstack':
572 self
.process_openstack_placement_group_construct(vm_create_msg_dict
)
575 self
._log
.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type
)
578 def process_custom_bootdata(self
, vm_create_msg_dict
):
579 """Process the custom boot data"""
580 if 'custom_config_files' not in vm_create_msg_dict
['custom_boot_data']:
583 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
584 script_extractor
= rift
.package
.script
.PackageScriptExtractor(self
._log
)
585 for custom_file_item
in vm_create_msg_dict
['custom_boot_data']['custom_config_files']:
586 if 'source' not in custom_file_item
or 'dest' not in custom_file_item
:
588 source
= custom_file_item
['source']
589 # Find source file in scripts dir of VNFD
590 self
._vnfd
_package
_store
.refresh()
591 self
._log
.debug("Checking for source config file at %s", source
)
593 source_file_str
= script_extractor
.read_script(stored_package
, source
)
594 except rift
.package
.script
.ScriptExtractionError
as e
:
595 raise VirtualDeploymentUnitRecordError(e
)
596 # Update source file location with file contents
597 custom_file_item
['source'] = source_file_str
601 def resmgr_msg(self
, config
=None):
602 vdu_fields
= ["vm_flavor",
610 self
._log
.debug("Creating params based on VDUD: %s", self
._vdud
)
611 vdu_copy_dict
= {k
: v
for k
, v
in self
._vdud
.as_dict().items() if k
in vdu_fields
}
613 vm_create_msg_dict
= {
617 if self
.image_name
is not None:
618 vm_create_msg_dict
["image_name"] = self
.image_name
620 if self
.image_checksum
is not None:
621 vm_create_msg_dict
["image_checksum"] = self
.image_checksum
623 vm_create_msg_dict
["allocate_public_address"] = self
._mgmt
_intf
624 if self
._vdud
.has_field('mgmt_vpci'):
625 vm_create_msg_dict
["mgmt_vpci"] = self
._vdud
.mgmt_vpci
627 self
._log
.debug("VDUD: %s", self
._vdud
)
628 if config
is not None:
629 vm_create_msg_dict
['vdu_init'] = {'userdata': config
}
631 if self
._mgmt
_network
:
632 vm_create_msg_dict
['mgmt_network'] = self
._mgmt
_network
635 for intf
, cp
, vlr
in self
._ext
_intf
:
636 cp_info
= {"name": cp
.name
,
637 "virtual_link_id": vlr
.network_id
,
638 "type_yang": intf
.virtual_interface
.type_yang
,
639 "port_security_enabled": cp
.port_security_enabled
}
641 if (intf
.virtual_interface
.has_field('vpci') and
642 intf
.virtual_interface
.vpci
is not None):
643 cp_info
["vpci"] = intf
.virtual_interface
.vpci
645 if (vlr
.has_field('ip_profile_params')) and (vlr
.ip_profile_params
.has_field('security_group')):
646 cp_info
['security_group'] = vlr
.ip_profile_params
.security_group
648 cp_list
.append(cp_info
)
650 for intf
, cp
, vlr
in self
._int
_intf
:
651 if (intf
.virtual_interface
.has_field('vpci') and
652 intf
.virtual_interface
.vpci
is not None):
653 cp_list
.append({"name": cp
,
654 "virtual_link_id": vlr
.network_id
,
655 "type_yang": intf
.virtual_interface
.type_yang
,
656 "vpci": intf
.virtual_interface
.vpci
})
658 cp_list
.append({"name": cp
,
659 "virtual_link_id": vlr
.network_id
,
660 "type_yang": intf
.virtual_interface
.type_yang
,
661 "port_security_enabled": cp
.port_security_enabled
})
663 vm_create_msg_dict
["connection_points"] = cp_list
664 vm_create_msg_dict
.update(vdu_copy_dict
)
666 self
.process_placement_groups(vm_create_msg_dict
)
667 if 'custom_boot_data' in vm_create_msg_dict
:
668 self
.process_custom_bootdata(vm_create_msg_dict
)
670 msg
= RwResourceMgrYang
.VDUEventData()
671 msg
.event_id
= self
._request
_id
672 msg
.cloud_account
= self
.cloud_account_name
673 msg
.request_info
.from_dict(vm_create_msg_dict
)
678 def terminate(self
, xact
):
679 """ Delete resource in VIM """
680 if self
._state
!= VDURecordState
.READY
and self
._state
!= VDURecordState
.FAILED
:
681 self
._log
.warning("VDU terminate in not ready state - Ignoring request")
684 self
._state
= VDURecordState
.TERMINATING
685 if self
._vm
_resp
is not None:
687 with self
._dts
.transaction() as new_xact
:
688 yield from self
.delete_resource(new_xact
)
690 self
._log
.exception("Caught exception while deleting VDU %s", self
.vdu_id
)
692 if self
._rm
_regh
is not None:
693 self
._log
.debug("Deregistering resource manager registration handle")
694 self
._rm
_regh
.deregister()
697 if self
._vdur
_console
_handler
is not None:
698 self
._log
.error("Deregistering vnfr vdur registration handle")
699 self
._vdur
_console
_handler
._regh
.deregister()
700 self
._vdur
_console
_handler
._regh
= None
702 self
._state
= VDURecordState
.TERMINATED
704 def find_internal_cp_by_cp_id(self
, cp_id
):
705 """ Find the CP corresponding to the connection point id"""
708 self
._log
.debug("find_internal_cp_by_cp_id(%s) called",
711 for int_cp
in self
._vdud
.internal_connection_point
:
712 self
._log
.debug("Checking for int cp %s in internal connection points",
714 if int_cp
.id == cp_id
:
719 self
._log
.debug("Failed to find cp %s in internal connection points",
721 msg
= "Failed to find cp %s in internal connection points" % cp_id
722 raise VduRecordError(msg
)
724 # return the VLR associated with the connection point
728 def create_resource(self
, xact
, vnfr
, config
=None):
729 """ Request resource from ResourceMgr """
730 def find_cp_by_name(cp_name
):
731 """ Find a connection point by name """
733 self
._log
.debug("find_cp_by_name(%s) called", cp_name
)
734 for ext_cp
in vnfr
._cprs
:
735 self
._log
.debug("Checking ext cp (%s) called", ext_cp
.name
)
736 if ext_cp
.name
== cp_name
:
740 self
._log
.debug("Failed to find cp %s in external connection points",
744 def find_internal_vlr_by_cp_name(cp_name
):
745 """ Find the VLR corresponding to the connection point name"""
748 self
._log
.debug("find_internal_vlr_by_cp_name(%s) called",
751 for int_cp
in self
._vdud
.internal_connection_point
:
752 self
._log
.debug("Checking for int cp %s in internal connection points",
754 if int_cp
.id == cp_name
:
759 self
._log
.debug("Failed to find cp %s in internal connection points",
761 msg
= "Failed to find cp %s in internal connection points" % cp_name
762 raise VduRecordError(msg
)
764 # return the VLR associated with the connection point
765 return vnfr
.find_vlr_by_cp(cp_name
)
767 block
= xact
.block_create()
769 self
._log
.debug("Executing vm request id: %s, action: create",
772 # Resolve the networks associated external interfaces
773 for ext_intf
in self
._vdud
.external_interface
:
774 self
._log
.debug("Resolving external interface name [%s], cp[%s]",
775 ext_intf
.name
, ext_intf
.vnfd_connection_point_ref
)
776 cp
= find_cp_by_name(ext_intf
.vnfd_connection_point_ref
)
778 self
._log
.debug("Failed to find connection point - %s",
779 ext_intf
.vnfd_connection_point_ref
)
781 self
._log
.debug("Connection point name [%s], type[%s]",
782 cp
.name
, cp
.type_yang
)
784 vlr
= vnfr
.ext_vlr_by_id(cp
.vlr_ref
)
786 etuple
= (ext_intf
, cp
, vlr
)
787 self
._ext
_intf
.append(etuple
)
789 self
._log
.debug("Created external interface tuple : %s", etuple
)
791 # Resolve the networks associated internal interfaces
792 for intf
in self
._vdud
.internal_interface
:
793 cp_id
= intf
.vdu_internal_connection_point_ref
794 self
._log
.debug("Resolving internal interface name [%s], cp[%s]",
798 vlr
= find_internal_vlr_by_cp_name(cp_id
)
799 except Exception as e
:
800 self
._log
.debug("Failed to find cp %s in internal VLR list", cp_id
)
801 msg
= "Failed to find cp %s in internal VLR list, e = %s" % (cp_id
, e
)
802 raise VduRecordError(msg
)
804 ituple
= (intf
, cp_id
, vlr
)
805 self
._int
_intf
.append(ituple
)
807 self
._log
.debug("Created internal interface tuple : %s", ituple
)
809 resmgr_path
= self
.resmgr_path
810 resmgr_msg
= self
.resmgr_msg(config
)
812 self
._log
.debug("Creating new VM request at: %s, params: %s", resmgr_path
, resmgr_msg
)
813 block
.add_query_create(resmgr_path
, resmgr_msg
)
815 res_iter
= yield from block
.execute(now
=True)
823 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
824 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
825 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
826 return resp
.resource_info
829 def delete_resource(self
, xact
):
830 block
= xact
.block_create()
832 self
._log
.debug("Executing vm request id: %s, action: delete",
835 block
.add_query_delete(self
.resmgr_path
)
837 yield from block
.execute(flags
=0, now
=True)
840 def read_resource(self
, xact
):
841 block
= xact
.block_create()
843 self
._log
.debug("Executing vm request id: %s, action: delete",
846 block
.add_query_read(self
.resmgr_path
)
848 res_iter
= yield from block
.execute(flags
=0, now
=True)
853 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
854 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
855 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
856 #self._vm_resp = resp.resource_info
857 return resp
.resource_info
861 def start_component(self
):
862 """ This VDUR is active """
863 self
._log
.debug("Starting component %s for vdud %s vdur %s",
864 self
._vdud
.vcs_component_ref
,
867 yield from self
._vnfr
.start_component(self
._vdud
.vcs_component_ref
,
868 self
.vm_resp
.management_ip
)
872 """ Is this VDU active """
873 return True if self
._state
is VDURecordState
.READY
else False
def instantiation_failed(self, failed_reason=None):
    """Mark this VDU as failed and propagate the failure to the VNFR.

    Arguments:
        failed_reason - optional failure description; it is stored on this
                        record and passed on unchanged to
                        ``self._vnfr.instantiation_failed``

    The state is switched to ``VDURecordState.FAILED`` before the VNFR is
    notified.
    """
    vdur_id = self._vdur_id
    self._log.debug("VDU %s instantiation failed ", vdur_id)

    # Record the failure locally first, then hand it to the VNFR.
    self._state, self._state_failed_reason = VDURecordState.FAILED, failed_reason

    yield from self._vnfr.instantiation_failed(failed_reason)
884 def vdu_is_active(self
):
885 """ This VDU is active"""
887 self
._log
.warning("VDU %s was already marked as active", self
._vdur
_id
)
890 self
._log
.debug("VDUR id %s in VNFR %s is active", self
._vdur
_id
, self
._vnfr
.vnfr_id
)
892 if self
._vdud
.vcs_component_ref
is not None:
893 yield from self
.start_component()
895 self
._state
= VDURecordState
.READY
897 if self
._vnfr
.all_vdus_active():
898 self
._log
.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self
._vnfr
)
899 yield from self
._vnfr
.is_ready()
902 def instantiate(self
, xact
, vnfr
, config
=None):
903 """ Instantiate this VDU """
904 self
._state
= VDURecordState
.INSTANTIATING
907 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
908 """ This VDUR is active """
909 self
._log
.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
914 if (query_action
== rwdts
.QueryAction
.UPDATE
or
915 query_action
== rwdts
.QueryAction
.CREATE
):
918 if msg
.resource_state
== "active":
919 # Move this VDU to ready state
920 yield from self
.vdu_is_active()
921 elif msg
.resource_state
== "failed":
922 yield from self
.instantiation_failed(msg
.resource_errors
)
923 elif query_action
== rwdts
.QueryAction
.DELETE
:
924 self
._log
.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
926 raise NotImplementedError(
927 "%s action on VirtualDeployementUnitRecord not supported",
930 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
933 reg_event
= asyncio
.Event(loop
=self
._loop
)
936 def on_ready(regh
, status
):
939 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
, on_ready
=on_ready
)
940 self
._rm
_regh
= yield from self
._dts
.register(self
.resmgr_path
+ '/resource-info',
941 flags
=rwdts
.Flag
.SUBSCRIBER
,
943 yield from reg_event
.wait()
945 vm_resp
= yield from self
.create_resource(xact
, vnfr
, config
)
946 self
._vm
_resp
= vm_resp
948 self
._state
= VDURecordState
.RESOURCE_ALLOC_PENDING
949 self
._log
.debug("Requested VM from resource manager response %s",
951 if vm_resp
.resource_state
== "active":
952 self
._log
.debug("Resourcemgr responded wih an active vm resp %s",
954 yield from self
.vdu_is_active()
955 self
._state
= VDURecordState
.READY
956 elif (vm_resp
.resource_state
== "pending" or
957 vm_resp
.resource_state
== "inactive"):
958 self
._log
.debug("Resourcemgr responded wih a pending vm resp %s",
960 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
961 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
962 # flags=rwdts.Flag.SUBSCRIBER,
965 self
._log
.debug("Resourcemgr responded wih an error vm resp %s",
967 raise VirtualDeploymentUnitRecordError(
968 "Failed VDUR instantiation %s " % vm_resp
)
970 except Exception as e
:
972 traceback
.print_exc()
973 self
._log
.exception(e
)
974 self
._log
.error("Instantiation of VDU record failed: %s", str(e
))
975 self
._state
= VDURecordState
.FAILED
976 yield from self
.instantiation_failed(str(e
))
979 class VlRecordState(enum
.Enum
):
980 """ VL Record State """
982 INSTANTIATION_PENDING
= 102
984 TERMINATE_PENDING
= 104
989 class InternalVirtualLinkRecord(object):
990 """ Internal Virtual Link record """
991 def __init__(self
, dts
, log
, loop
, ivld_msg
, vnfr_name
, cloud_account_name
, ip_profile
=None):
995 self
._ivld
_msg
= ivld_msg
996 self
._vnfr
_name
= vnfr_name
997 self
._cloud
_account
_name
= cloud_account_name
998 self
._ip
_profile
= ip_profile
1000 self
._vlr
_req
= self
.create_vlr()
1002 self
._state
= VlRecordState
.INIT
1006 """ Find VLR by id """
1007 return self
._vlr
_req
.id
1011 """ Name of this VL """
1012 if self
._ivld
_msg
.vim_network_name
:
1013 return self
._ivld
_msg
.vim_network_name
1015 return self
._vnfr
_name
+ "." + self
._ivld
_msg
.name
def network_id(self):
    """Return the network id of the VLR held by this record.

    Returns:
        ``self._vlr.network_id`` when ``self._vlr`` is truthy, otherwise
        ``None``.
    """
    vlr = self._vlr
    if not vlr:
        return None
    return vlr.network_id
1023 """ VLR path for this VLR instance"""
1024 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self
.vlr_id
)
1026 def create_vlr(self
):
1027 """ Create the VLR record which will be instantiated """
1029 vld_fields
= ["short_name",
1037 vld_copy_dict
= {k
: v
for k
, v
in self
._ivld
_msg
.as_dict().items() if k
in vld_fields
}
1039 vlr_dict
= {"id": str(uuid
.uuid4()),
1041 "cloud_account": self
._cloud
_account
_name
,
1044 if self
._ip
_profile
and self
._ip
_profile
.has_field('ip_profile_params'):
1045 vlr_dict
['ip_profile_params' ] = self
._ip
_profile
.ip_profile_params
.as_dict()
1047 vlr_dict
.update(vld_copy_dict
)
1049 vlr
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict(vlr_dict
)
1053 def instantiate(self
, xact
, restart_mode
=False):
1054 """ Instantiate VL """
1057 def instantiate_vlr():
1058 """ Instantiate VLR"""
1059 self
._log
.debug("Create VL with xpath %s and vlr %s",
1060 self
.vlr_path(), self
._vlr
_req
)
1062 with self
._dts
.transaction(flags
=0) as xact
:
1063 block
= xact
.block_create()
1064 block
.add_query_create(xpath
=self
.vlr_path(), msg
=self
._vlr
_req
)
1065 self
._log
.debug("Executing VL create path:%s msg:%s",
1066 self
.vlr_path(), self
._vlr
_req
)
1070 res_iter
= yield from block
.execute()
1072 self
._state
= VlRecordState
.FAILED
1073 self
._log
.exception("Caught exception while instantial VL")
1076 for ent
in res_iter
:
1077 res
= yield from ent
1078 self
._vlr
= res
.result
1080 if self
._vlr
.operational_status
== 'failed':
1081 self
._log
.debug("VL creation failed for vlr id %s", self
._vlr
.id)
1082 self
._state
= VlRecordState
.FAILED
1083 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self
._vlr
.id))
1085 self
._log
.info("Created VL with xpath %s and vlr %s",
1086 self
.vlr_path(), self
._vlr
)
1090 """ Get the network id """
1091 res_iter
= yield from self
._dts
.query_read(self
.vlr_path(), rwdts
.XactFlag
.MERGE
)
1093 for ent
in res_iter
:
1094 res
= yield from ent
1098 err
= "Failed to get VLR for path %s" % self
.vlr_path()
1100 raise InternalVirtualLinkRecordError(err
)
1103 self
._state
= VlRecordState
.INSTANTIATION_PENDING
1106 vl
= yield from get_vlr()
1108 yield from instantiate_vlr()
1110 yield from instantiate_vlr()
1112 self
._state
= VlRecordState
.ACTIVE
1114 def vlr_in_vns(self
):
1115 """ Is there a VLR record in VNS """
1116 if (self
._state
== VlRecordState
.ACTIVE
or
1117 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
1118 self
._state
== VlRecordState
.FAILED
):
1124 def terminate(self
, xact
):
1125 """Terminate this VL """
1126 if not self
.vlr_in_vns():
1127 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1128 self
.vlr_id
, self
._state
)
1131 self
._log
.debug("Terminating VL with path %s", self
.vlr_path())
1132 self
._state
= VlRecordState
.TERMINATE_PENDING
1133 block
= xact
.block_create()
1134 block
.add_query_delete(self
.vlr_path())
1135 yield from block
.execute(flags
=0, now
=True)
1136 self
._state
= VlRecordState
.TERMINATED
1137 self
._log
.debug("Terminated VL with path %s", self
.vlr_path())
1140 class VirtualNetworkFunctionRecord(object):
1141 """ Virtual Network Function Record """
1142 def __init__(self
, dts
, log
, loop
, cluster_name
, vnfm
, vcs_handler
, vnfr_msg
, mgmt_network
=None):
1146 self
._cluster
_name
= cluster_name
1147 self
._vnfr
_msg
= vnfr_msg
1148 self
._vnfr
_id
= vnfr_msg
.id
1149 self
._vnfd
_id
= vnfr_msg
.vnfd
.id
1151 self
._vcs
_handler
= vcs_handler
1152 self
._vnfr
= vnfr_msg
1153 self
._mgmt
_network
= mgmt_network
1155 self
._vnfd
= vnfr_msg
.vnfd
1156 self
._state
= VirtualNetworkFunctionRecordState
.INIT
1157 self
._state
_failed
_reason
= None
1158 self
._ext
_vlrs
= {} # The list of external virtual links
1159 self
._vlrs
= [] # The list of internal virtual links
1160 self
._vdus
= [] # The list of vdu
1161 self
._vlr
_by
_cp
= {}
1163 self
._inventory
= {}
1164 self
._create
_time
= int(time
.time())
1165 self
._vnf
_mon
= None
1166 self
._config
_status
= vnfr_msg
.config_status
1167 self
._vnfd
_package
_store
= rift
.package
.store
.VnfdPackageFilesystemStore(self
._log
)
1168 self
._rw
_vnfd
= None
1169 self
._vnfd
_ref
_count
= 0
1171 def _get_vdur_from_vdu_id(self
, vdu_id
):
1172 self
._log
.debug("Finding vdur for vdu_id %s", vdu_id
)
1173 self
._log
.debug("Searching through vdus: %s", self
._vdus
)
1174 for vdu
in self
._vdus
:
1175 self
._log
.debug("vdu_id: %s", vdu
.vdu_id
)
1176 if vdu
.vdu_id
== vdu_id
:
1179 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id
)
1182 def operational_status(self
):
1183 """ Operational status of this VNFR """
1184 op_status_map
= {"INIT": "init",
1185 "VL_INIT_PHASE": "vl_init_phase",
1186 "VM_INIT_PHASE": "vm_init_phase",
1188 "TERMINATE": "terminate",
1189 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1190 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1191 "TERMINATED": "terminated",
1192 "FAILED": "failed", }
1193 return op_status_map
[self
._state
.name
]
def vnfd_xpath(vnfd_id):
    """ Build the xpath of the VNFD catalog entry with the given id """
    template = "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']"
    return template.format(vnfd_id)
def vnfd_ref_count(self):
    """ Number of VNFD references currently counted on this VNFR """
    count = self._vnfd_ref_count
    return count
def vnfd_in_use(self):
    """ Returns whether the VNFD is in use, i.e. its reference count is positive """
    # The comparison already yields a bool; no conditional expression needed.
    return self._vnfd_ref_count > 0
1210 """ Take a reference on this object """
1211 self
._vnfd
_ref
_count
+= 1
1212 return self
._vnfd
_ref
_count
def vnfd_unref(self):
    """ Drop one reference on the VNFD and return the remaining count.

    Raises:
        VnfRecordError if the count is already below one, i.e. there is
        no outstanding reference left to release
    """
    if self._vnfd_ref_count < 1:
        msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
               (self.vnfd.id, self._vnfd_ref_count))
        self._log.critical(msg)
        raise VnfRecordError(msg)

    # Log the count as it stands before the decrement
    self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
                    self.vnfd.id, self._vnfd_ref_count)
    remaining = self._vnfd_ref_count - 1
    self._vnfd_ref_count = remaining
    return remaining
1228 """ VNFD for this VNFR """
1233 """ VNFD name associated with this VNFR """
1234 return self
.vnfd
.name
1238 """ Name of this VNF in the record """
1239 return self
._vnfr
.name
def cloud_account_name(self):
    """ Cloud account value carried by the VNFR message of this record """
    vnfr_record = self._vnfr
    return vnfr_record.cloud_account
1248 """ VNFD Id associated with this VNFR """
1253 """ VNFR Id associated with this VNFR """
1254 return self
._vnfr
_id
def member_vnf_index(self):
    """ Member VNF index reference carried by the VNFR message """
    vnfr_record = self._vnfr
    return vnfr_record.member_vnf_index_ref
def config_status(self):
    """ Config status stored on this VNFR """
    status = self._config_status
    return status
1266 def component_by_name(self
, component_name
):
1267 """ Find a component by name in the inventory list"""
1268 mangled_name
= VcsComponent
.mangle_name(component_name
,
1271 return self
._inventory
[mangled_name
]
1276 def get_nsr_config(self
):
1277 ### Need access to NS instance configuration for runtime resolution.
1278 ### This shall be replaced when deployment flavors are implemented
1279 xpath
= "C,/nsr:ns-instance-config"
1280 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
1282 for result
in results
:
1283 entry
= yield from result
1284 ns_instance_config
= entry
.result
1285 for nsr
in ns_instance_config
.nsr
:
1286 if nsr
.id == self
._vnfr
_msg
.nsr_id_ref
:
def start_component(self, component_name, ip_addr):
    """ Look up the named inventory component and start it.

    Arguments:
        component_name - name passed to component_by_name()
        ip_addr        - third argument handed to the component's start()
    """
    yield from self.component_by_name(component_name).start(None, None, ip_addr)
1296 def cp_ip_addr(self
, cp_name
):
1297 """ Get ip address for connection point """
1298 self
._log
.debug("cp_ip_addr()")
1299 for cp
in self
._cprs
:
1300 if cp
.name
== cp_name
and cp
.ip_address
is not None:
1301 return cp
.ip_address
1304 def mgmt_intf_info(self
):
1305 """ Get Management interface info for this VNFR """
1306 mgmt_intf_desc
= self
.vnfd
.mgmt_interface
1308 if mgmt_intf_desc
.has_field("cp"):
1309 ip_addr
= self
.cp_ip_addr(mgmt_intf_desc
.cp
)
1310 elif mgmt_intf_desc
.has_field("vdu_id"):
1312 vdur
= self
._get
_vdur
_from
_vdu
_id
(mgmt_intf_desc
.vdu_id
)
1313 ip_addr
= vdur
.management_ip
1314 except VDURecordNotFound
:
1315 self
._log
.debug("Did not find mgmt interface for vnfr id %s", self
._vnfr
_id
)
1318 ip_addr
= mgmt_intf_desc
.ip_address
1319 port
= mgmt_intf_desc
.port
1321 return ip_addr
, port
1325 """ Message associated with this VNFR """
1326 vnfd_fields
= ["short_name", "vendor", "description", "version"]
1327 vnfd_copy_dict
= {k
: v
for k
, v
in self
.vnfd
.as_dict().items() if k
in vnfd_fields
}
1329 mgmt_intf
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1330 ip_address
, port
= self
.mgmt_intf_info()
1332 if ip_address
is not None:
1333 mgmt_intf
.ip_address
= ip_address
1334 if port
is not None:
1335 mgmt_intf
.port
= port
1337 vnfr_dict
= {"id": self
._vnfr
_id
,
1338 "nsr_id_ref": self
._vnfr
_msg
.nsr_id_ref
,
1340 "member_vnf_index_ref": self
.member_vnf_index
,
1341 "operational_status": self
.operational_status
,
1342 "operational_status_details": self
._state
_failed
_reason
,
1343 "cloud_account": self
.cloud_account_name
,
1344 "config_status": self
._config
_status
1347 vnfr_dict
.update(vnfd_copy_dict
)
1349 vnfr_msg
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
1350 vnfr_msg
.vnfd
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd
.from_dict(self
.vnfd
.as_dict())
1352 vnfr_msg
.create_time
= self
._create
_time
1353 vnfr_msg
.uptime
= int(time
.time()) - self
._create
_time
1354 vnfr_msg
.mgmt_interface
= mgmt_intf
1356 # Add all the VLRs to VNFR
1357 for vlr
in self
._vlrs
:
1358 ivlr
= vnfr_msg
.internal_vlr
.add()
1359 ivlr
.vlr_ref
= vlr
.vlr_id
1361 # Add all the VDURs to VDUR
1362 if self
._vdus
is not None:
1363 for vdu
in self
._vdus
:
1364 vdur
= vnfr_msg
.vdur
.add()
1365 vdur
.from_dict(vdu
.msg
.as_dict())
1367 if self
.vnfd
.mgmt_interface
.has_field('dashboard_params'):
1368 vnfr_msg
.dashboard_url
= self
.dashboard_url
1370 for cpr
in self
._cprs
:
1371 new_cp
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr
.as_dict())
1372 vnfr_msg
.connection_point
.append(new_cp
)
1374 if self
._vnf
_mon
is not None:
1375 for monp
in self
._vnf
_mon
.msg
:
1376 vnfr_msg
.monitoring_param
.append(
1377 VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam
.from_dict(monp
.as_dict()))
1379 if self
._vnfr
.vnf_configuration
is not None:
1380 vnfr_msg
.vnf_configuration
.from_dict(self
._vnfr
.vnf_configuration
.as_dict())
1381 if (ip_address
is not None and
1382 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
is None):
1383 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
= ip_address
1385 for group
in self
._vnfr
_msg
.placement_groups_info
:
1386 group_info
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1387 group_info
.from_dict(group
.as_dict())
1388 vnfr_msg
.placement_groups_info
.append(group_info
)
1393 def dashboard_url(self
):
1394 ip
, cfg_port
= self
.mgmt_intf_info()
1397 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('https'):
1398 if self
.vnfd
.mgmt_interface
.dashboard_params
.https
is True:
1401 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('port'):
1402 http_port
= self
.vnfd
.mgmt_interface
.dashboard_params
.port
1404 url
= "{protocol}://{ip_address}:{port}/{path}".format(
1408 path
=self
.vnfd
.mgmt_interface
.dashboard_params
.path
.lstrip("/"),
1415 """ path for this VNFR """
1416 return("D,/vnfr:vnfr-catalog"
1417 "/vnfr:vnfr[vnfr:id='{}']".format(self
.vnfr_id
))
1420 def publish(self
, xact
):
1421 """ publish this VNFR """
1423 self
._log
.debug("Publishing VNFR path = [%s], record = [%s]",
1424 self
.xpath
, self
.msg
)
1425 vnfr
.create_time
= self
._create
_time
1426 yield from self
._vnfm
.publish_vnfr(xact
, self
.xpath
, self
.msg
)
1427 self
._log
.debug("Published VNFR path = [%s], record = [%s]",
1428 self
.xpath
, self
.msg
)
def resolve_vld_ip_profile(self, vnfd_msg, vld):
    """ Find the ip profile of vnfd_msg that the given vld refers to.

    Arguments:
        vnfd_msg - descriptor whose ip_profiles are searched
        vld      - virtual link descriptor carrying ip_profile_ref

    Returns:
        The first profile whose name equals vld.ip_profile_ref, or None
        when the vld has no ip_profile_ref or no profile matches.
    """
    self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
    if vld.has_field('ip_profile_ref'):
        for candidate in vnfd_msg.ip_profiles:
            if candidate.name == vld.ip_profile_ref:
                return candidate
    return None
1438 def create_vls(self
):
1439 """ Publish The VLs associated with this VNF """
1440 self
._log
.debug("Publishing Internal Virtual Links for vnfd id: %s",
1442 for ivld_msg
in self
.vnfd
.internal_vld
:
1443 self
._log
.debug("Creating internal vld:"
1444 " %s, int_cp_ref = %s",
1445 ivld_msg
, ivld_msg
.internal_connection_point
1447 vlr
= InternalVirtualLinkRecord(dts
=self
._dts
,
1451 vnfr_name
=self
.name
,
1452 cloud_account_name
=self
.cloud_account_name
,
1453 ip_profile
=self
.resolve_vld_ip_profile(self
.vnfd
, ivld_msg
)
1455 self
._vlrs
.append(vlr
)
1457 for int_cp
in ivld_msg
.internal_connection_point
:
1458 if int_cp
.id_ref
in self
._vlr
_by
_cp
:
1459 msg
= ("Connection point %s already "
1460 " bound %s" % (int_cp
.id_ref
, self
._vlr
_by
_cp
[int_cp
.id_ref
]))
1461 raise InternalVirtualLinkRecordError(msg
)
1462 self
._log
.debug("Setting vlr %s to internal cp = %s",
1464 self
._vlr
_by
_cp
[int_cp
.id_ref
] = vlr
1467 def instantiate_vls(self
, xact
, restart_mode
=False):
1468 """ Instantiate the VLs associated with this VNF """
1469 self
._log
.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1472 for vlr
in self
._vlrs
:
1473 self
._log
.debug("Instantiating VLR %s", vlr
)
1474 yield from vlr
.instantiate(xact
, restart_mode
)
def find_vlr_by_cp(self, cp_name):
    """ Return the VLR registered for cp_name in the connection-point map.

    The lookup is a plain subscript, so an unregistered name raises
    whatever the map raises for a missing key.
    """
    bindings = self._vlr_by_cp
    return bindings[cp_name]
1480 def resolve_placement_group_cloud_construct(self
, input_group
, nsr_config
):
1482 Returns the cloud specific construct for placement group
1484 input_group: VNFD PlacementGroup
1485 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1487 copy_dict
= ['name', 'requirement', 'strategy']
1488 for group_info
in nsr_config
.vnfd_placement_group_maps
:
1489 if group_info
.placement_group_ref
== input_group
.name
and \
1490 group_info
.vnfd_id_ref
== self
.vnfd_id
:
1491 group
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1492 group_dict
= {k
:v
for k
,v
in
1493 group_info
.as_dict().items()
1494 if (k
!= 'placement_group_ref' and k
!='vnfd_id_ref')}
1495 for param
in copy_dict
:
1496 group_dict
.update({param
: getattr(input_group
, param
)})
1497 group
.from_dict(group_dict
)
1502 def get_vdu_placement_groups(self
, vdu
):
1503 placement_groups
= []
1504 ### Step-1: Get VNF level placement groups
1505 for group
in self
._vnfr
_msg
.placement_groups_info
:
1506 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1507 #group_info.from_dict(group.as_dict())
1508 placement_groups
.append(group
)
1510 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1511 nsr_config
= yield from self
.get_nsr_config()
1513 ### Step-3: Get VDU level placement groups
1514 for group
in self
.vnfd
.placement_groups
:
1515 for member_vdu
in group
.member_vdus
:
1516 if member_vdu
.member_vdu_ref
== vdu
.id:
1517 group_info
= self
.resolve_placement_group_cloud_construct(group
,
1519 if group_info
is None:
1520 self
._log
.info("Could not resolve cloud-construct for placement group: %s", group
.name
)
1521 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1523 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1527 self
.member_vnf_index
)
1528 placement_groups
.append(group_info
)
1530 return placement_groups
1533 def create_vdus(self
, vnfr
, restart_mode
=False):
1534 """ Create the VDUs associated with this VNF """
1536 def get_vdur_id(vdud
):
1537 """Get the corresponding VDUR's id for the VDUD. This is useful in
1540 In restart mode we check for exiting VDUR's ID and use them, if
1541 available. This way we don't end up creating duplicate VDURs
1545 if restart_mode
and vdud
is not None:
1547 vdur
= [vdur
.id for vdur
in vnfr
._vnfr
.vdur
if vdur
.vdu_id_ref
== vdud
.id]
1550 self
._log
.error("Unable to find a VDUR for VDUD {}".format(vdud
))
1555 self
._log
.info("Creating VDU's for vnfd id: %s", self
.vnfd_id
)
1556 for vdu
in self
._rw
_vnfd
.vdu
:
1557 self
._log
.debug("Creating vdu: %s", vdu
)
1558 vdur_id
= get_vdur_id(vdu
)
1560 placement_groups
= yield from self
.get_vdu_placement_groups(vdu
)
1561 self
._log
.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1564 self
.member_vnf_index
,
1565 [ group
.name
for group
in placement_groups
])
1567 vdur
= VirtualDeploymentUnitRecord(
1573 mgmt_intf
=self
.has_mgmt_interface(vdu
),
1574 mgmt_network
=self
._mgmt
_network
,
1575 cloud_account_name
=self
.cloud_account_name
,
1576 vnfd_package_store
=self
._vnfd
_package
_store
,
1578 placement_groups
= placement_groups
,
1580 yield from vdur
.vdu_opdata_register()
1582 self
._vdus
.append(vdur
)
1585 def instantiate_vdus(self
, xact
, vnfr
):
1586 """ Instantiate the VDUs associated with this VNF """
1587 self
._log
.debug("Instantiating VDU's for vnfd id %s: %s", self
.vnfd_id
, self
._vdus
)
1589 lookup
= {vdu
.vdu_id
: vdu
for vdu
in self
._vdus
}
1591 # Identify any dependencies among the VDUs
1592 dependencies
= collections
.defaultdict(list)
1593 vdu_id_pattern
= re
.compile(r
"\{\{ vdu\[([^]]+)\]\S* \}\}")
1595 for vdu
in self
._vdus
:
1596 if vdu
.vdud_cloud_init
is not None:
1597 for vdu_id
in vdu_id_pattern
.findall(vdu
.vdud_cloud_init
):
1598 if vdu_id
!= vdu
.vdu_id
:
1599 # This means that vdu.vdu_id depends upon vdu_id,
1600 # i.e. vdu_id must be instantiated before
1602 dependencies
[vdu
.vdu_id
].append(lookup
[vdu_id
])
1604 # Define the terminal states of VDU instantiation
1606 VDURecordState
.READY
,
1607 VDURecordState
.TERMINATED
,
1608 VDURecordState
.FAILED
,
1611 datastore
= VdurDatastore()
1615 def instantiate_monitor(vdu
):
1616 """Monitor the state of the VDU during instantiation
1619 vdu - a VirtualDeploymentUnitRecord
1622 # wait for the VDUR to enter a terminal state
1623 while vdu
._state
not in terminal
:
1624 yield from asyncio
.sleep(1, loop
=self
._loop
)
1626 # update the datastore
1627 datastore
.update(vdu
)
1629 # add the VDU to the set of processed VDUs
1630 processed
.add(vdu
.vdu_id
)
1633 def instantiate(vdu
):
1634 """Instantiate the specified VDU
1637 vdu - a VirtualDeploymentUnitRecord
1640 if the VDU, or any of the VDUs this VDU depends upon, are
1641 terminated or fail to instantiate properly, a
1642 VirtualDeploymentUnitRecordError is raised.
1645 for dependency
in dependencies
[vdu
.vdu_id
]:
1646 self
._log
.debug("{}: waiting for {}".format(vdu
.vdu_id
, dependency
.vdu_id
))
1648 while dependency
.vdu_id
not in processed
:
1649 yield from asyncio
.sleep(1, loop
=self
._loop
)
1651 if not dependency
.active
:
1652 raise VirtualDeploymentUnitRecordError()
1654 self
._log
.debug('instantiating {}'.format(vdu
.vdu_id
))
1656 # Populate the datastore with the current values of the VDU
1659 # Substitute any variables contained in the cloud config script
1660 config
= str(vdu
.vdud_cloud_init
) if vdu
.vdud_cloud_init
is not None else ""
1662 parts
= re
.split("\{\{ ([^\}]+) \}\}", config
)
1665 # Extract the variable names
1667 for variable
in parts
[1::2]:
1668 variables
.append(variable
.lstrip('{{').rstrip('}}').strip())
1670 # Iterate of the variables and substitute values from the
1672 for variable
in variables
:
1674 # Handle a reference to a VDU by ID
1675 if variable
.startswith('vdu['):
1676 value
= datastore
.get(variable
)
1678 msg
= "Unable to find a substitute for {} in {} cloud-init script"
1679 raise ValueError(msg
.format(variable
, vdu
.vdu_id
))
1681 config
= config
.replace("{{ %s }}" % variable
, value
)
1684 # Handle a reference to the current VDU
1685 if variable
.startswith('vdu'):
1686 value
= datastore
.get('vdu[{}]'.format(vdu
.vdu_id
) + variable
[3:])
1687 config
= config
.replace("{{ %s }}" % variable
, value
)
1690 # Handle unrecognized variables
1691 msg
= 'unrecognized cloud-config variable: {}'
1692 raise ValueError(msg
.format(variable
))
1694 # Instantiate the VDU
1695 with self
._dts
.transaction() as xact
:
1696 self
._log
.debug("Instantiating vdu: %s", vdu
)
1697 yield from vdu
.instantiate(xact
, vnfr
, config
=config
)
1698 if self
._state
== VirtualNetworkFunctionRecordState
.FAILED
:
1699 self
._log
.error("Instatiation of VNF %s failed while instantiating vdu %s",
1702 # First create a set of tasks to monitor the state of the VDUs and
1703 # report when they have entered a terminal state
1704 for vdu
in self
._vdus
:
1705 self
._loop
.create_task(instantiate_monitor(vdu
))
1707 for vdu
in self
._vdus
:
1708 self
._loop
.create_task(instantiate(vdu
))
def has_mgmt_interface(self, vdu):
    """ Tell whether the VNFD management interface names this VDU by id """
    # ## TODO: Support additional mgmt_interface type options
    mgmt_vdu_id = self.vnfd.mgmt_interface.vdu_id
    return bool(mgmt_vdu_id == vdu.id)
1716 def vlr_xpath(self
, vlr_id
):
1719 "D,/vlr:vlr-catalog/"
1720 "vlr:vlr[vlr:id = '{}']".format(vlr_id
))
def ext_vlr_by_id(self, vlr_id):
    """ Look up an external VLR by its id in the external VLR map """
    external = self._ext_vlrs
    return external[vlr_id]
1727 def publish_inventory(self
, xact
):
1728 """ Publish the inventory associated with this VNF """
1729 self
._log
.debug("Publishing inventory for VNFR id: %s", self
._vnfr
_id
)
1731 for component
in self
._rw
_vnfd
.component
:
1732 self
._log
.debug("Creating inventory component %s", component
)
1733 mangled_name
= VcsComponent
.mangle_name(component
.component_name
,
1737 comp
= VcsComponent(dts
=self
._dts
,
1740 cluster_name
=self
._cluster
_name
,
1741 vcs_handler
=self
._vcs
_handler
,
1742 component
=component
,
1743 mangled_name
=mangled_name
,
1745 if comp
.name
in self
._inventory
:
1746 self
._log
.debug("Duplicate entries in inventory %s for vnfr %s",
1747 component
, self
._vnfd
_id
)
1749 self
._log
.debug("Adding component %s for vnrf %s",
1750 comp
.name
, self
._vnfr
_id
)
1751 self
._inventory
[comp
.name
] = comp
1752 yield from comp
.publish(xact
)
1754 def all_vdus_active(self
):
1755 """ Are all VDUS in this VNFR active? """
1756 for vdu
in self
._vdus
:
1760 self
._log
.debug("Inside all_vdus_active. Returning True")
def instantiation_failed(self, failed_reason=None):
    """ Mark this VNFR as FAILED, remember the reason and republish it.

    Arguments:
        failed_reason - optional text stored as the failure reason
    """
    self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)

    failed_state = VirtualNetworkFunctionRecordState.FAILED
    self.set_state(failed_state)
    self._state_failed_reason = failed_reason

    # Push the changed status out; no transaction is supplied here
    yield from self.publish(None)
1775 """ This VNF is ready"""
1776 self
._log
.debug("VNFR id %s is ready", self
.vnfr_id
)
1778 if self
._state
!= VirtualNetworkFunctionRecordState
.FAILED
:
1779 self
.set_state(VirtualNetworkFunctionRecordState
.READY
)
1782 self
._log
.debug("VNFR id %s ignoring state change", self
.vnfr_id
)
1784 # Update the VNFR with the changed status
1785 yield from self
.publish(None)
def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
    """ Record the addresses and id assigned to a connection point.

    Arguments:
        cp_name    - name of the connection point record to update
        ip_address - value stored in the record's ip_address
        mac_addr   - value stored in the record's mac_address
        cp_id      - value stored in the record's connection_point_id

    Raises:
        VirtualDeploymentUnitRecordError if this VNFR has no connection
        point record with that name
    """
    for cp in self._cprs:
        if cp.name == cp_name:
            self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
                            cp_name, cp, ip_address, cp_id)
            cp.ip_address = ip_address
            cp.mac_address = mac_addr
            cp.connection_point_id = cp_id
            return

    # Report the name that was asked for. The loop variable is unbound when
    # there are no records, and otherwise names an unrelated connection point.
    err = "No connection point %s found in VNFR id %s" % (cp_name, self._vnfr_id)
    self._log.debug(err)
    raise VirtualDeploymentUnitRecordError(err)
1802 def set_state(self
, state
):
1803 """ Set state for this VNFR"""
1807 def instantiate(self
, xact
, restart_mode
=False):
1808 """ instantiate this VNF """
1809 self
.set_state(VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
)
1810 self
._rw
_vnfd
= yield from self
._vnfm
.fetch_vnfd(self
._vnfd
_id
)
1815 # Iterate over all the connection points in VNFR and fetch the
1818 def cpr_from_cp(cp
):
1819 """ Creates a record level connection point from the desciptor cp"""
1820 cp_fields
= ["name", "image", "vm-flavor", "port_security_enabled"]
1821 cp_copy_dict
= {k
: v
for k
, v
in cp
.as_dict().items() if k
in cp_fields
}
1823 cpr_dict
.update(cp_copy_dict
)
1824 return VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr_dict
)
1826 self
._log
.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1827 self
._vnfr
_id
, self
._vnfr
.connection_point
)
1829 for cp
in self
._vnfr
.connection_point
:
1830 cpr
= cpr_from_cp(cp
)
1831 self
._cprs
.append(cpr
)
1832 self
._log
.debug("Adding Connection point record %s ", cp
)
1834 vlr_path
= self
.vlr_xpath(cp
.vlr_ref
)
1835 self
._log
.debug("Fetching VLR with path = %s", vlr_path
)
1836 res_iter
= yield from self
._dts
.query_read(self
.vlr_xpath(cp
.vlr_ref
),
1837 rwdts
.XactFlag
.MERGE
)
1841 self
._ext
_vlrs
[cp
.vlr_ref
] = d
1842 cpr
.vlr_ref
= cp
.vlr_ref
1843 self
._log
.debug("Fetched VLR [%s] with path = [%s]", d
, vlr_path
)
1845 # Increase the VNFD reference count
1850 # Fetch External VLRs
1851 self
._log
.debug("VNFR-ID %s: Fetching vlrs", self
._vnfr
_id
)
1852 yield from fetch_vlrs()
1855 self
._log
.debug("VNFR-ID %s: Publishing Inventory", self
._vnfr
_id
)
1856 yield from self
.publish_inventory(xact
)
1859 self
._log
.debug("VNFR-ID %s: Creating VLs", self
._vnfr
_id
)
1860 yield from self
.create_vls()
1863 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1864 yield from self
.publish(xact
)
1867 self
._log
.debug("VNFR-ID %s: Instantiate VLs", self
._vnfr
_id
)
1869 yield from self
.instantiate_vls(xact
, restart_mode
)
1870 except Exception as e
:
1871 self
._log
.exception("VL instantiation failed (%s)", str(e
))
1872 yield from self
.instantiation_failed(str(e
))
1875 self
.set_state(VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
)
1878 self
._log
.debug("VNFR-ID %s: Create VDUs", self
._vnfr
_id
)
1879 yield from self
.create_vdus(self
, restart_mode
)
1882 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1883 yield from self
.publish(xact
)
1886 # ToDo: Check if this should be prevented during restart
1887 self
._log
.debug("VNFR-ID %s: Instantiate VDUs", self
._vnfr
_id
)
1888 _
= self
._loop
.create_task(self
.instantiate_vdus(xact
, self
))
1891 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1892 yield from self
.publish(xact
)
1894 self
._log
.debug("VNFR-ID %s: Instantiation Done", self
._vnfr
_id
)
1896 # create task updating uptime for this vnfr
1897 self
._log
.debug("VNFR-ID %s: Starting task to update uptime", self
._vnfr
_id
)
1898 self
._loop
.create_task(self
.vnfr_uptime_update(xact
))
1901 def terminate(self
, xact
):
1902 """ Terminate this virtual network function """
1904 self
._log
.debug("Terminatng VNF id %s", self
.vnfr_id
)
1906 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATE
)
1909 if self
._vnf
_mon
is not None:
1910 self
._vnf
_mon
.stop()
1911 self
._vnf
_mon
.deregister()
1912 self
._vnf
_mon
= None
1915 def terminate_vls():
1916 """ Terminate VLs in this VNF """
1917 for vl
in self
._vlrs
:
1918 yield from vl
.terminate(xact
)
1921 def terminate_vdus():
1922 """ Terminate VDUS in this VNF """
1923 for vdu
in self
._vdus
:
1924 yield from vdu
.terminate(xact
)
1926 self
._log
.debug("Terminatng VLs in VNF id %s", self
.vnfr_id
)
1927 self
.set_state(VirtualNetworkFunctionRecordState
.VL_TERMINATE_PHASE
)
1928 yield from terminate_vls()
1930 self
._log
.debug("Terminatng VDUs in VNF id %s", self
.vnfr_id
)
1931 self
.set_state(VirtualNetworkFunctionRecordState
.VDU_TERMINATE_PHASE
)
1932 yield from terminate_vdus()
1934 self
._log
.debug("Terminated VNF id %s", self
.vnfr_id
)
1935 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATED
)
1938 def vnfr_uptime_update(self
, xact
):
1940 # Return when vnfr state is FAILED or TERMINATED etc
1941 if self
._state
not in [VirtualNetworkFunctionRecordState
.INIT
,
1942 VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
,
1943 VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
,
1944 VirtualNetworkFunctionRecordState
.READY
]:
1946 yield from self
.publish(xact
)
1947 yield from asyncio
.sleep(2, loop
=self
._loop
)
1951 class VnfdDtsHandler(object):
1952 """ DTS handler for VNFD config changes """
1953 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1955 def __init__(self
, dts
, log
, loop
, vnfm
):
1964 """ DTS registration handle """
1969 """ Register for VNFD configuration"""
1971 def on_apply(dts
, acg
, xact
, action
, scratch
):
1972 """Apply the configuration"""
1973 self
._log
.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1974 xact
, action
, scratch
)
1976 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
1979 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
1980 """ on prepare callback """
1981 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1982 ks_path
.to_xpath(RwVnfmYang
.get_schema()), msg
)
1983 fref
= ProtobufC
.FieldReference
.alloc()
1984 fref
.goto_whole_message(msg
.to_pbcm())
1986 # Handle deletes in prepare_callback
1987 if fref
.is_field_deleted():
1988 # Delete an VNFD record
1989 self
._log
.debug("Deleting VNFD with id %s", msg
.id)
1990 if self
._vnfm
.vnfd_in_use(msg
.id):
1991 self
._log
.debug("Cannot delete VNFD in use - %s", msg
)
1992 err
= "Cannot delete a VNFD in use - %s" % msg
1993 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
1994 # Delete a VNFD record
1995 yield from self
._vnfm
.delete_vnfd(msg
.id)
1997 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2000 "Registering for VNFD config using xpath: %s",
2001 VnfdDtsHandler
.XPATH
,
2003 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
2004 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
2005 self
._regh
= acg
.register(
2006 xpath
=VnfdDtsHandler
.XPATH
,
2007 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
2008 on_prepare
=on_prepare
)
2011 class VcsComponentDtsHandler(object):
2012 """ Vcs Component DTS handler """
2013 XPATH
= ("D,/rw-manifest:manifest" +
2014 "/rw-manifest:operational-inventory" +
2015 "/rw-manifest:component")
2017 def __init__(self
, dts
, log
, loop
, vnfm
):
2026 """ DTS registration handle """
2031 """ Registers VCS component dts publisher registration"""
2032 self
._log
.debug("VCS Comp publisher DTS handler registering path %s",
2033 VcsComponentDtsHandler
.XPATH
)
2035 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
2036 handlers
= rift
.tasklets
.Group
.Handler()
2037 with self
._dts
.group_create(handler
=handlers
) as group
:
2038 self
._regh
= group
.register(xpath
=VcsComponentDtsHandler
.XPATH
,
2040 flags
=(rwdts
.Flag
.PUBLISHER |
2041 rwdts
.Flag
.NO_PREP_READ |
2042 rwdts
.Flag
.DATASTORE
),)
2045 def publish(self
, xact
, path
, msg
):
2046 """ Publishes the VCS component """
2047 self
._log
.debug("Publishing the VcsComponent xact = %s, %s:%s",
2049 self
.regh
.create_element(path
, msg
)
2050 self
._log
.debug("Published the VcsComponent to %s xact = %s, %s:%s",
2051 VcsComponentDtsHandler
.XPATH
, xact
, path
, msg
)
2053 class VnfrConsoleOperdataDtsHandler(object):
2054 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
def vnfr_vdu_console_xpath(self):
    """ xpath of the console entry for this handler's VNFR id and VDUR id """
    template = ("D,/rw-vnfr:vnfr-console"
                "/rw-vnfr:vnfr[rw-vnfr:id='{}']"
                "/rw-vnfr:vdur[vnfr:id='{}']")
    return template.format(self._vnfr_id, self._vdur_id)
2060 def __init__(self
, dts
, log
, loop
, vnfm
, vnfr_id
, vdur_id
, vdu_id
):
2067 self
._vnfr
_id
= vnfr_id
2068 self
._vdur
_id
= vdur_id
2069 self
._vdu
_id
= vdu_id
2073 """ Register for VNFR VDU Operational Data read from dts """
2076 def on_prepare(xact_info
, action
, ks_path
, msg
):
2077 """ prepare callback from dts """
2078 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2080 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2081 xact_info
, action
, xpath
, msg
2084 if action
== rwdts
.QueryAction
.READ
:
2085 schema
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur
.schema()
2086 path_entry
= schema
.keyspec_to_entry(ks_path
)
2087 self
._log
.debug("VDU Opdata path is {}".format(path_entry
))
2089 vnfr
= self
._vnfm
.get_vnfr(self
._vnfr
_id
)
2090 except VnfRecordError
as e
:
2091 self
._log
.error("VNFR id %s not found", self
._vnfr
_id
)
2092 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2095 vdur
= vnfr
._get
_vdur
_from
_vdu
_id
(self
._vdu
_id
)
2096 if not vdur
._state
== VDURecordState
.READY
:
2097 self
._log
.debug("VDUR state is not READY. current state is {}".format(vdur
._state
))
2098 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2100 with self
._dts
.transaction() as new_xact
:
2101 resp
= yield from vdur
.read_resource(new_xact
)
2102 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2103 vdur_console
.id = self
._vdur
_id
2104 if resp
.console_url
:
2105 vdur_console
.console_url
= resp
.console_url
2107 vdur_console
.console_url
= 'none'
2108 self
._log
.debug("Recevied console URL for vdu {} is {}".format(self
._vdu
_id
,vdur_console
))
2110 self
._log
.exception("Caught exception while reading VDU %s", self
._vdu
_id
)
2111 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2112 vdur_console
.id = self
._vdur
_id
2113 vdur_console
.console_url
= 'none'
2115 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
,
2116 xpath
=self
.vnfr_vdu_console_xpath
,
2119 #raise VnfRecordError("Not supported operation %s" % action)
2120 self
._log
.error("Not supported operation %s" % action
)
2121 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2125 self
._log
.debug("Registering for VNFR VDU using xpath: %s",
2126 self
.vnfr_vdu_console_xpath
)
2127 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2128 with self
._dts
.group_create() as group
:
2129 self
._regh
= group
.register(xpath
=self
.vnfr_vdu_console_xpath
,
2131 flags
=rwdts
.Flag
.PUBLISHER
,
2135 class VnfrDtsHandler(object):
2136 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2137 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2139 def __init__(self
, dts
, log
, loop
, vnfm
):
2149 """ Return registration handle"""
2154 """ Return VNF manager instance """
2159 """ Register for vnfr create/update/delete/read requests from dts """
2160 def on_commit(xact_info
):
2161 """ The transaction has been committed """
2162 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
2163 return rwdts
.MemberRspCode
.ACTION_OK
2165 def on_abort(*args
):
2166 """ Abort callback """
2167 self
._log
.debug("VNF transaction got aborted")
2170 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
2173 def instantiate_realloc_vnfr(vnfr
):
2174 """Re-populate the vnfm after restart
2181 yield from vnfr
.instantiate(None, restart_mode
=True)
2183 if xact_event
== rwdts
.MemberEvent
.INSTALL
:
2184 curr_cfg
= self
.regh
.elements
2185 for cfg
in curr_cfg
:
2186 vnfr
= self
.vnfm
.create_vnfr(cfg
)
2187 self
._loop
.create_task(instantiate_realloc_vnfr(vnfr
))
2189 self
._log
.debug("Got on_event in vnfm")
2191 return rwdts
.MemberRspCode
.ACTION_OK
2194 def on_prepare(xact_info
, action
, ks_path
, msg
):
2195 """ prepare callback from dts """
2197 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2198 xact_info
, action
, msg
2201 if action
== rwdts
.QueryAction
.CREATE
:
2202 if not msg
.has_field("vnfd"):
2203 err
= "Vnfd not provided"
2204 self
._log
.error(err
)
2205 raise VnfRecordError(err
)
2207 vnfr
= self
.vnfm
.create_vnfr(msg
)
2209 # RIFT-9105: Unable to add a READ query under an existing transaction
2210 # xact = xact_info.xact
2211 yield from vnfr
.instantiate(None)
2212 except Exception as e
:
2213 self
._log
.exception(e
)
2214 self
._log
.error("Error while instantiating vnfr:%s", vnfr
.vnfr_id
)
2215 vnfr
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
2216 yield from vnfr
.publish(None)
2217 elif action
== rwdts
.QueryAction
.DELETE
:
2218 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2219 path_entry
= schema
.keyspec_to_entry(ks_path
)
2220 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2223 self
._log
.debug("VNFR id %s not found for delete", path_entry
.key00
.id)
2224 raise VirtualNetworkFunctionRecordNotFound(
2225 "VNFR id %s", path_entry
.key00
.id)
2228 yield from vnfr
.terminate(xact_info
.xact
)
2231 yield from self
._vnfm
.delete_vnfr(xact_info
.xact
, vnfr
)
2232 except Exception as e
:
2233 self
._log
.exception(e
)
2234 self
._log
.error("Caught exception while deleting vnfr %s", path_entry
.key00
.id)
2236 elif action
== rwdts
.QueryAction
.UPDATE
:
2237 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2238 path_entry
= schema
.keyspec_to_entry(ks_path
)
2241 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2242 except Exception as e
:
2243 self
._log
.debug("No vnfr found with id %s", path_entry
.key00
.id)
2244 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2248 self
._log
.debug("VNFR id %s not found for update", path_entry
.key00
.id)
2249 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2252 self
._log
.debug("VNFR {} update config status {} (current {})".
2253 format(vnfr
.name
, msg
.config_status
, vnfr
.config_status
))
2254 # Update the config status and publish
2255 vnfr
._config
_status
= msg
.config_status
2256 yield from vnfr
.publish(None)
2259 raise NotImplementedError(
2260 "%s action on VirtualNetworkFunctionRecord not supported",
2263 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2265 self
._log
.debug("Registering for VNFR using xpath: %s",
2266 VnfrDtsHandler
.XPATH
,)
2268 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
2269 on_prepare
=on_prepare
,)
2270 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
2271 with self
._dts
.group_create(handler
=handlers
) as group
:
2272 self
._regh
= group
.register(xpath
=VnfrDtsHandler
.XPATH
,
2274 flags
=(rwdts
.Flag
.PUBLISHER |
2275 rwdts
.Flag
.NO_PREP_READ |
2277 rwdts
.Flag
.DATASTORE
),)
def create(self, xact, path, msg):
    """
    Create a VNFR record in DTS with path and message

    Arguments:
        xact - transaction handle; only used in the log messages here
        path - path of the element to create on the registration handle
        msg  - message to store at that path
    """
    log_args = (xact, path, msg)

    self._log.debug("Creating VNFR xact = %s, %s:%s", *log_args)
    self.regh.create_element(path, msg)
    self._log.debug("Created VNFR xact = %s, %s:%s", *log_args)
def update(self, xact, path, msg):
    """
    Update a VNFR record in DTS with path and message

    Arguments:
        xact - transaction handle; only used in the log messages here
        path - path of the element to update on the registration handle
        msg  - new message for that path
    """
    log_args = (xact, path, msg)

    self._log.debug("Updating VNFR xact = %s, %s:%s", *log_args)
    self.regh.update_element(path, msg)
    self._log.debug("Updated VNFR xact = %s, %s:%s", *log_args)
def delete(self, xact, path):
    """
    Delete the VNFR record at the given path from DTS

    Arguments:
        xact - transaction handle; only used in the log messages here
        path - path of the element to delete on the registration handle
    """
    log_args = (xact, path)

    self._log.debug("Deleting VNFR xact = %s, %s", *log_args)
    self.regh.delete_element(path)
    self._log.debug("Deleted VNFR xact = %s, %s", *log_args)
2312 class VnfdRefCountDtsHandler(object):
2313 """ The VNFD Ref Count DTS handler """
2314 XPATH
= "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2316 def __init__(self
, dts
, log
, loop
, vnfm
):
2326 """ Return registration handle """
2331 """ Return the NS manager instance """
2336 """ Register for VNFD ref count read from dts """
2339 def on_prepare(xact_info
, action
, ks_path
, msg
):
2340 """ prepare callback from dts """
2341 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2343 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2344 xact_info
, action
, xpath
, msg
2347 if action
== rwdts
.QueryAction
.READ
:
2348 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_VnfdRefCount
.schema()
2349 path_entry
= schema
.keyspec_to_entry(ks_path
)
2350 vnfd_list
= yield from self
._vnfm
.get_vnfd_refcount(path_entry
.key00
.vnfd_id_ref
)
2351 for xpath
, msg
in vnfd_list
:
2352 self
._log
.debug("Responding to ref count query path:%s, msg:%s",
2354 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.MORE
,
2357 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2359 raise VnfRecordError("Not supported operation %s" % action
)
2361 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2362 with self
._dts
.group_create() as group
:
2363 self
._regh
= group
.register(xpath
=VnfdRefCountDtsHandler
.XPATH
,
2365 flags
=rwdts
.Flag
.PUBLISHER
,
class VdurDatastore(object):
    """
    This VdurDatastore is intended to expose select information about a VDUR
    such that it can be referenced in a cloud config file. The data that is
    exposed does not necessarily follow the structure of the data in the yang
    model. This is intentional. The data that are exposed are intended to be
    agnostic of the yang model so that changes in the model do not necessarily
    require changes to the interface provided to the user. It also means that
    the user does not need to be familiar with the RIFT.ware yang models.
    """

    def __init__(self):
        """Create an instance of VdurDatastore"""
        # Maps a VDU ID to the dict of attributes exposed for that VDUR.
        self._vdur_data = dict()
        # Raw string so the regex escapes (\[ \] \.) are not parsed as
        # (invalid) string escape sequences.
        self._pattern = re.compile(r"vdu\[([^]]+)\]\.(.+)")

    @staticmethod
    def _exposed_attributes(vdur):
        """Return the (key, value) pairs that are exposed for a VDUR"""
        return (
            ('name', vdur._vdud.name),
            ('mgmt.ip', vdur.vm_management_ip),
        )

    def add(self, vdur):
        """Add a new VDUR to the datastore

        Only attributes whose value is not None are stored.

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no ID or (2) is
            already in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id in self._vdur_data:
            raise ValueError('cannot add a VDUR more than once')

        self._vdur_data[vdur.vdu_id] = {
            key: value
            for key, value in self._exposed_attributes(vdur)
            if value is not None
        }

    def update(self, vdur):
        """Update the VDUR information in the datastore

        An attribute whose new value is None is removed from the datastore;
        any other value replaces the stored one.

        Arguments:
            vdur - a GI representation of a VDUR

        Raises:
            A ValueError is raised if the VDUR (1) has no ID or (2) is not
            in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VNFDs are required to have an ID')

        if vdur.vdu_id not in self._vdur_data:
            raise ValueError('VNF is not recognized')

        entry = self._vdur_data[vdur.vdu_id]
        for key, value in self._exposed_attributes(vdur):
            if value is None:
                # Never keep a None placeholder; absent and None read the
                # same through get().
                entry.pop(key, None)
            else:
                entry[key] = value

    def remove(self, vdur_id):
        """Remove all of the data associated with specified VDUR

        Arguments:
            vdur_id - the identifier of a VDUR in the datastore

        Raises:
            A ValueError is raised if the VDUR is not contained in the
            datastore.

        """
        if vdur_id not in self._vdur_data:
            raise ValueError('VNF is not recognized')

        del self._vdur_data[vdur_id]

    def get(self, expr):
        """Retrieve VDUR information from the datastore

        An expression should be of the form,

            vdu[<id>].<attr>

        where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
        the exposed attribute that the user wishes to retrieve.

        If the requested data is not available, None is returned.

        Arguments:
            expr - a string that specifies the data to return

        Raises:
            A ValueError is raised if the provided expression cannot be parsed.

        Returns:
            The requested data or None

        """
        result = self._pattern.match(expr)
        if result is None:
            raise ValueError('data expression not recognized ({})'.format(expr))

        vdur_id, key = result.groups()

        if vdur_id not in self._vdur_data:
            return None

        return self._vdur_data[vdur_id].get(key, None)
2489 class VnfManager(object):
2490 """ The virtual network function manager class """
2491 def __init__(self
, dts
, log
, loop
, cluster_name
):
2495 self
._cluster
_name
= cluster_name
2497 self
._vcs
_handler
= VcsComponentDtsHandler(dts
, log
, loop
, self
)
2498 self
._vnfr
_handler
= VnfrDtsHandler(dts
, log
, loop
, self
)
2499 self
._vnfr
_ref
_handler
= VnfdRefCountDtsHandler(dts
, log
, loop
, self
)
2500 self
._nsr
_handler
= mano_dts
.NsInstanceConfigSubscriber(log
, dts
, loop
, callback
=self
.handle_nsr
)
2502 self
._dts
_handlers
= [VnfdDtsHandler(dts
, log
, loop
, self
),
2505 self
._vnfr
_ref
_handler
,
2508 self
._vnfds
_to
_vnfr
= {}
def vnfr_handler(self):
    """ VNFR dts handler

    Returns the handler object stored in self._vnfr_handler.
    """
    return self._vnfr_handler
def vcs_handler(self):
    """ VCS dts handler

    Returns the handler object stored in self._vcs_handler.
    """
    return self._vcs_handler
2523 """ Register all static DTS handlers """
2524 for hdl
in self
._dts
_handlers
:
2525 yield from hdl
.register()
2529 """ Run this VNFM instance """
2530 self
._log
.debug("Run VNFManager - registering static DTS handlers""")
2531 yield from self.register()
def handle_nsr(self, nsr, action):
    """ Track NS records by id as they are created and deleted

    Arguments:
        nsr    - the NS record; stored/removed under nsr.id
        action - the DTS query action; CREATE stores the record, DELETE
                 drops it if present, any other action is ignored
    """
    if action == rwdts.QueryAction.CREATE:
        self._nsrs[nsr.id] = nsr
        return

    if action == rwdts.QueryAction.DELETE:
        # Deleting an unknown id is a no-op.
        self._nsrs.pop(nsr.id, None)
2540 def get_linked_mgmt_network(self, vnfr):
2541 """For the given VNFR get the related mgmt network from the NSD, if
2544 vnfd_id = vnfr.vnfd.id
2545 nsr_id = vnfr.nsr_id_ref
2547 # for the given related VNFR, get the corresponding NSR-config
2550 nsr_obj = self._nsrs[nsr_id]
2552 raise("Unable to find the NS with the ID
: {}".format(nsr_id))
2554 # for the related NSD check if a VLD exists such that it's a mgmt
2556 for vld in nsr_obj.nsd.vld:
2557 if vld.mgmt_network:
def get_vnfr(self, vnfr_id):
    """ get VNFR by vnfr id

    Arguments:
        vnfr_id - id of the VNFR to look up

    Raises:
        VnfRecordError if no VNFR with this id is known

    Returns:
        The record stored under vnfr_id
    """
    if vnfr_id not in self._vnfrs:
        # Interpolate the id here: exception constructors do not apply
        # logging-style arguments to the message.
        raise VnfRecordError("VNFR id %s not found" % vnfr_id)

    return self._vnfrs[vnfr_id]
2570 def create_vnfr(self, vnfr):
2571 """ Create a VNFR instance """
2572 if vnfr.id in self._vnfrs:
2573 msg = "Vnfr
id %s already exists
" % vnfr.id
2574 self._log.error(msg)
2575 raise VnfRecordError(msg)
2577 self._log.info("Create VirtualNetworkFunctionRecord
%s from vnfd_id
: %s",
2581 mgmt_network = self.get_linked_mgmt_network(vnfr)
2583 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2584 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
2585 mgmt_network=mgmt_network
2589 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2590 self._vnfds_to_vnfr[vnfr.vnfd.id] += 1
2592 self._vnfds_to_vnfr[vnfr.vnfd.id] = 1
2594 return self._vnfrs[vnfr.id]
def delete_vnfr(self, xact, vnfr):
    """ Delete a VNFR instance

    Removes the record from DTS through the VNFR handler, decrements the
    reference count kept for its VNFD (never below zero) and forgets the
    record. Does nothing when the VNFR id is not known.

    Arguments:
        xact - transaction passed on to the VNFR handler's delete
        vnfr - the record to delete
    """
    record_id = vnfr.vnfr_id
    if record_id not in self._vnfrs:
        return

    self._log.debug("Deleting VNFR id %s", record_id)
    yield from self._vnfr_handler.delete(xact, vnfr.xpath)

    descriptor_id = vnfr.vnfd.id
    # Only decrement a count that exists and is non-zero.
    if self._vnfds_to_vnfr.get(descriptor_id):
        self._vnfds_to_vnfr[descriptor_id] -= 1

    del self._vnfrs[record_id]
2610 def fetch_vnfd(self, vnfd_id):
2611 """ Fetch VNFDs based with the vnfd id"""
2612 vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
2613 self._log.debug("Fetch vnfd with path
%s", vnfd_path)
2616 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2618 for ent in res_iter:
2619 res = yield from ent
2623 err = "Failed to get Vnfd
%s" % vnfd_id
2624 self._log.error(err)
2625 raise VnfRecordError(err)
2627 self._log.debug("Fetched vnfd
for path
%s, vnfd
- %s", vnfd_path, vnfd)
def vnfd_in_use(self, vnfd_id):
    """ Is this VNFD in use

    A VNFD counts as in use when its reference count is greater than zero;
    an unknown id is not in use.
    """
    self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
    ref_count = self._vnfds_to_vnfr.get(vnfd_id, 0)
    return ref_count > 0
def publish_vnfr(self, xact, path, msg):
    """ Publish a VNFR

    Logs the request and delegates to the VNFR handler's update.

    Arguments:
        xact - transaction passed on to the handler
        path - path of the VNFR element
        msg  - message to publish at that path
    """
    self._log.debug("publish_vnfr called with path %s, msg %s", path, msg)
    handler = self.vnfr_handler
    yield from handler.update(xact, path, msg)
def delete_vnfd(self, vnfd_id):
    """ Delete the Virtual Network Function descriptor with the passed id

    Drops the reference-count entry kept for the VNFD and removes any files
    stored for it under $RIFT_ARTIFACTS/launchpad/libs/<id>. File cleanup is
    best effort: a failure is logged and does not propagate.

    Arguments:
        vnfd_id - id of the descriptor to delete

    Raises:
        VirtualNetworkFunctionDescriptorRefCountExists if VNFRs still
        reference this VNFD
    """
    self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
    if vnfd_id in self._vnfds_to_vnfr:
        # The map stores plain integer counts, so use the value directly.
        ref_count = self._vnfds_to_vnfr[vnfd_id]
        if ref_count:
            self._log.debug("Cannot delete VNFD id %s reference exists %s",
                            vnfd_id,
                            ref_count)
            raise VirtualNetworkFunctionDescriptorRefCountExists(
                "Cannot delete :%s, ref_count:%s" % (vnfd_id, ref_count))

        del self._vnfds_to_vnfr[vnfd_id]

    # Remove any files uploaded with VNFD and stored under
    # $RIFT_ARTIFACTS/launchpad/libs/<id>
    try:
        rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
        vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
        if os.path.exists(vnfd_dir):
            shutil.rmtree(vnfd_dir, ignore_errors=True)
    except Exception as e:
        # The ref-count entry is already gone at this point, so report the
        # id rather than looking the descriptor up again.
        self._log.error("Exception in cleaning up VNFD {}: {}".
                        format(vnfd_id, e))
        self._log.exception(e)
def vnfd_refcount_xpath(self, vnfd_id):
    """ xpath for ref count entry

    Builds the ref-count xpath keyed by the given VNFD id.
    """
    keyed_xpath = VnfdRefCountDtsHandler.XPATH + "[rw-vnfr:vnfd-id-ref = '{}']"
    return keyed_xpath.format(vnfd_id)
def get_vnfd_refcount(self, vnfd_id):
    """ Get the vnfd_list from this VNFM

    Arguments:
        vnfd_id - id of the VNFD to report on; None or "" selects every
                  VNFD that has a reference-count entry

    Returns:
        A list of (xpath, message) tuples, one per selected VNFD. The list
        is empty when vnfd_id is given but has no entry.
    """
    if vnfd_id is None or vnfd_id == "":
        selected_ids = list(self._vnfds_to_vnfr.keys())
    elif vnfd_id in self._vnfds_to_vnfr:
        selected_ids = [vnfd_id]
    else:
        selected_ids = []

    vnfd_list = []
    for ref_id in selected_ids:
        entry = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
        entry.vnfd_id_ref = ref_id
        entry.instance_ref_count = self._vnfds_to_vnfr[ref_id]
        vnfd_list.append((self.vnfd_refcount_xpath(ref_id), entry))

    return vnfd_list
2697 class VnfmTasklet(rift.tasklets.Tasklet):
2698 """ VNF Manager tasklet class """
2699 def __init__(self, *args, **kwargs):
2700 super(VnfmTasklet, self).__init__(*args, **kwargs)
2701 self.rwlog.set_category("rw
-mano
-log
")
2702 self.rwlog.set_subcategory("vnfm
")
2709 super(VnfmTasklet, self).start()
2710 self.log.info("Starting VnfmTasklet
")
2712 self.log.setLevel(logging.DEBUG)
2714 self.log.debug("Registering with dts
")
2715 self._dts = rift.tasklets.DTS(self.tasklet_info,
2716 RwVnfmYang.get_schema(),
2718 self.on_dts_state_change)
2720 self.log.debug("Created DTS Api GI Object
: %s", self._dts)
2722 print("Caught
Exception in VNFM start
:", sys.exc_info()[0])
def on_instance_started(self):
    """ Task instance started callback; only logs that it was invoked """
    self.log.debug("Got instance started callback")
2733 print("Caught
Exception in VNFM stop
:", sys.exc_info()[0])
2738 """ Task init callback """
2740 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2741 assert vm_parent_name is not None
2742 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2743 yield from self._vnfm.run()
2745 print("Caught
Exception in VNFM init
:", sys.exc_info()[0])
2750 """ Task run callback """
def on_dts_state_change(self, state):
    """Take action according to current dts state to transition
    application into the corresponding application state

    Arguments
        state - current dts state
    """
    # Application callback to run when DTS reports a given state.
    app_callbacks = {
        rwdts.State.INIT: self.init,
        rwdts.State.RUN: self.run,
    }
    # DTS state to request once the application callback has finished.
    dts_transitions = {
        rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
        rwdts.State.CONFIG: rwdts.State.RUN,
    }

    # Transition application to next state
    callback = app_callbacks.get(state)
    if callback is not None:
        yield from callback()

    # Transition dts to next state
    target_state = dts_transitions.get(state)
    if target_state is not None:
        self._dts.handle.set_state(target_state)