2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
29 gi
.require_version('RwDts', '1.0')
30 gi
.require_version('RwVnfrYang', '1.0')
31 gi
.require_version('RwVnfmYang', '1.0')
32 gi
.require_version('RwVlrYang', '1.0')
33 gi
.require_version('RwManifestYang', '1.0')
34 gi
.require_version('RwBaseYang', '1.0')
35 gi
.require_version('RwResourceMgrYang', '1.0')
37 from gi
.repository
import (
50 import rift
.package
.store
51 import rift
.package
.cloud_init
52 import rift
.package
.script
53 import rift
.mano
.dts
as mano_dts
class VMResourceError(Exception):
    """Error related to a VM resource."""
class VnfRecordError(Exception):
    """VNF record instantiation failed."""
class VduRecordError(Exception):
    """VDU record instantiation failed."""
class NotImplemented(Exception):
    """Requested behaviour is not implemented."""
    # NOTE(review): this name shadows Python's built-in ``NotImplemented``
    # singleton for the whole module; it is kept because callers may import
    # it by this name.
class VnfrRecordExistsError(Exception):
    """A VNFR record with the same VNFR id already exists."""
class InternalVirtualLinkRecordError(Exception):
    """Error concerning an internal virtual link record."""
class VDUImageNotFound(Exception):
    """The VDU image was not found."""
class VirtualDeploymentUnitRecordError(Exception):
    """VDU instantiation failed."""
class VMNotReadyError(Exception):
    """The VM has not yet been received from the resource manager."""
class VDURecordNotFound(Exception):
    """A VDU record could not be found."""
class VirtualNetworkFunctionRecordDescNotFound(Exception):
    """The Virtual Network Function Record descriptor cannot be found."""
class VirtualNetworkFunctionDescriptorError(Exception):
    """Error concerning a Virtual Network Function Record descriptor."""
class VirtualNetworkFunctionDescriptorNotFound(Exception):
    """The Virtual Network Function Record descriptor was not found."""
class VirtualNetworkFunctionRecordNotFound(Exception):
    """The Virtual Network Function Record was not found."""
class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
    """A reference count exists for the Virtual Network Function descriptor."""
class VnfrInstantiationFailed(Exception):
    """Virtual Network Function instantiation failed."""
class VNFMPlacementGroupError(Exception):
    """VNFM placement group error."""
139 class VirtualNetworkFunctionRecordState(enum
.Enum
):
146 VL_TERMINATE_PHASE
= 6
147 VDU_TERMINATE_PHASE
= 7
152 class VDURecordState(enum
.Enum
):
153 """VDU record state """
156 RESOURCE_ALLOC_PENDING
= 3
163 class VcsComponent(object):
164 """ VCS Component within the VNF descriptor """
165 def __init__(self
, dts
, log
, loop
, cluster_name
, vcs_handler
, component
, mangled_name
):
169 self
._component
= component
170 self
._cluster
_name
= cluster_name
171 self
._vcs
_handler
= vcs_handler
172 self
._mangled
_name
= mangled_name
def mangle_name(component_name, vnf_name, vnfd_id):
    """Build the mangled component name.

    The three parts are joined with ':' in the order VNF name,
    component name, VNFD id.
    """
    return ":".join((vnf_name, component_name, vnfd_id))
181 """ name of this component"""
182 return self
._mangled
_name
186 """ The path for this object """
187 return("D,/rw-manifest:manifest" +
188 "/rw-manifest:operational-inventory" +
189 "/rw-manifest:component" +
190 "[rw-manifest:component-name = '{}']").format(self
.name
)
193 def instance_xpath(self
):
194 """ The path for this object """
195 return("D,/rw-base:vcs" +
198 "[instance-name = '{}']".format(self
._cluster
_name
))
def start_comp_xpath(self):
    """Return the xpath used to start this component.

    This is ``self.instance_xpath`` followed by the child-n entry whose
    instance-name is 'START-REQ'.
    """
    base_xpath = self.instance_xpath
    start_req_suffix = "/child-n[instance-name = 'START-REQ']"
    return base_xpath + start_req_suffix
206 def get_start_comp_msg(self
, ip_address
):
207 """ start this component """
208 start_msg
= RwBaseYang
.VcsInstance_Instance_ChildN()
209 start_msg
.instance_name
= 'START-REQ'
210 start_msg
.component_name
= self
.name
211 start_msg
.admin_command
= "START"
212 start_msg
.ip_address
= ip_address
218 """ Returns the message for this vcs component"""
220 vcs_comp_dict
= self
._component
.as_dict()
222 def mangle_comp_names(comp_dict
):
223 """ mangle component name with VNF name, id"""
224 for key
, val
in comp_dict
.items():
225 if isinstance(val
, dict):
226 comp_dict
[key
] = mangle_comp_names(val
)
227 elif isinstance(val
, list):
230 if isinstance(ent
, dict):
231 val
[i
] = mangle_comp_names(ent
)
235 elif key
== "component_name":
236 comp_dict
[key
] = VcsComponent
.mangle_name(val
,
241 mangled_dict
= mangle_comp_names(vcs_comp_dict
)
242 msg
= RwManifestYang
.OpInventory_Component
.from_dict(mangled_dict
)
def publish(self, xact):
    """Publish this VCS component through the VCS handler.

    Logs the component name, path and message, then delegates to
    ``self._vcs_handler.publish`` with the given transaction, this
    component's path and its message.
    """
    log = self._log
    log.debug(
        "Publishing the VcsComponent %s, path = %s comp = %s",
        self.name,
        self.path,
        self.msg,
    )
    handler = self._vcs_handler
    yield from handler.publish(xact, self.path, self.msg)
253 def start(self
, xact
, parent
, ip_addr
=None):
254 """ Starts this VCS component """
255 # ATTN RV - replace with block add
256 start_msg
= self
.get_start_comp_msg(ip_addr
)
257 self
._log
.debug("starting component %s %s",
258 self
.start_comp_xpath
, start_msg
)
259 yield from self
._dts
.query_create(self
.start_comp_xpath
,
262 self
._log
.debug("started component %s, %s",
263 self
.start_comp_xpath
, start_msg
)
266 class VirtualDeploymentUnitRecord(object):
267 """ Virtual Deployment Unit Record """
279 placement_groups
=[]):
285 self
._mgmt
_intf
= mgmt_intf
286 self
._cloud
_account
_name
= cloud_account_name
287 self
._vnfd
_package
_store
= vnfd_package_store
288 self
._mgmt
_network
= mgmt_network
290 self
._vdur
_id
= vdur_id
or str(uuid
.uuid4())
293 self
._state
= VDURecordState
.INIT
294 self
._state
_failed
_reason
= None
295 self
._request
_id
= str(uuid
.uuid4())
296 self
._name
= vnfr
.name
+ "__" + vdud
.id
297 self
._placement
_groups
= placement_groups
300 self
._vdud
_cloud
_init
= None
301 self
._vdur
_console
_handler
= VnfrConsoleOperdataDtsHandler(dts
, log
, loop
, self
._vnfr
._vnfm
, self
._vnfr
.vnfr_id
, self
._vdur
_id
,self
.vdu_id
)
def vdu_opdata_register(self):
    """Register the VDUR console handler by delegating to its register()."""
    console_handler = self._vdur_console_handler
    yield from console_handler.register()
307 def cp_ip_addr(self
, cp_name
):
308 """ Find ip address by connection point name """
309 if self
._vm
_resp
is not None:
310 for conn_point
in self
._vm
_resp
.connection_points
:
311 if conn_point
.name
== cp_name
:
312 return conn_point
.ip_address
def cp_mac_addr(self, cp_name):
    """Look up a connection point's MAC address by its name.

    Returns the ``mac_addr`` of the first connection point in the VM
    response whose name equals *cp_name*; when there is no VM response or
    no such connection point, the all-zero MAC address is returned.
    """
    default_mac = "00:00:00:00:00:00"
    if self._vm_resp is None:
        return default_mac

    matches = (
        candidate.mac_addr
        for candidate in self._vm_resp.connection_points
        if candidate.name == cp_name
    )
    return next(matches, default_mac)
323 def cp_id(self
, cp_name
):
324 """ Find connection point id by connection point name """
325 if self
._vm
_resp
is not None:
326 for conn_point
in self
._vm
_resp
.connection_points
:
327 if conn_point
.name
== cp_name
:
328 return conn_point
.connection_point_id
341 """ Return this VDUR's name """
def cloud_account_name(self):
    """Return the name of the cloud account this VDU should be created in."""
    account_name = self._cloud_account_name
    return account_name
350 def image_name(self
):
351 """ name that should be used to lookup the image on the CMP """
352 if 'image' not in self
._vdud
:
354 return os
.path
.basename(self
._vdud
.image
)
def image_checksum(self):
    """Return the VDUD's image checksum, or None when that field is not set."""
    if self._vdud.has_field("image_checksum"):
        return self._vdud.image_checksum
    return None
362 def management_ip(self
):
365 return self
._vm
_resp
.public_ip
if self
._vm
_resp
.has_field('public_ip') else self
._vm
_resp
.management_ip
368 def vm_management_ip(self
):
371 return self
._vm
_resp
.management_ip
374 def operational_status(self
):
375 """ Operational status of this VDU"""
376 op_stats_dict
= {"INIT": "init",
377 "INSTANTIATING": "vm_init_phase",
378 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
381 "TERMINATING": "terminated",
382 "TERMINATED": "terminated",
384 return op_stats_dict
[self
._state
.name
]
388 """ Process VDU message from resmgr"""
389 vdu_fields
= ["vm_flavor",
396 vdu_copy_dict
= {k
: v
for k
, v
in
397 self
._vdud
.as_dict().items() if k
in vdu_fields
}
398 vdur_dict
= {"id": self
._vdur
_id
,
399 "vdu_id_ref": self
._vdud
.id,
400 "operational_status": self
.operational_status
,
401 "operational_status_details": self
._state
_failed
_reason
,
403 if self
.vm_resp
is not None:
404 vdur_dict
.update({"vim_id": self
.vm_resp
.vdu_id
,
405 "flavor_id": self
.vm_resp
.flavor_id
407 if self
._vm
_resp
.has_field('image_id'):
408 vdur_dict
.update({ "image_id": self
.vm_resp
.image_id
})
410 if self
.management_ip
is not None:
411 vdur_dict
["management_ip"] = self
.management_ip
413 if self
.vm_management_ip
is not None:
414 vdur_dict
["vm_management_ip"] = self
.vm_management_ip
416 vdur_dict
.update(vdu_copy_dict
)
418 if self
.vm_resp
is not None:
419 if self
._vm
_resp
.has_field('volumes'):
420 for opvolume
in self
._vm
_resp
.volumes
:
421 vdurvol_data
= [vduvol
for vduvol
in vdur_dict
['volumes'] if vduvol
['name'] == opvolume
.name
]
422 if len(vdurvol_data
) == 1:
423 vdurvol_data
[0]["volume_id"] = opvolume
.volume_id
424 if opvolume
.has_field('custom_meta_data'):
425 metadata_list
= list()
426 for metadata_item
in opvolume
.custom_meta_data
:
427 metadata_list
.append(metadata_item
.as_dict())
428 vdurvol_data
[0]['custom_meta_data'] = metadata_list
430 if self
._vm
_resp
.has_field('supplemental_boot_data'):
431 vdur_dict
['supplemental_boot_data'] = dict()
432 if self
._vm
_resp
.supplemental_boot_data
.has_field('boot_data_drive'):
433 vdur_dict
['supplemental_boot_data']['boot_data_drive'] = self
._vm
_resp
.supplemental_boot_data
.boot_data_drive
434 if self
._vm
_resp
.supplemental_boot_data
.has_field('custom_meta_data'):
435 metadata_list
= list()
436 for metadata_item
in self
._vm
_resp
.supplemental_boot_data
.custom_meta_data
:
437 metadata_list
.append(metadata_item
.as_dict())
438 vdur_dict
['supplemental_boot_data']['custom_meta_data'] = metadata_list
439 if self
._vm
_resp
.supplemental_boot_data
.has_field('config_file'):
441 for file_item
in self
._vm
_resp
.supplemental_boot_data
.config_file
:
442 file_list
.append(file_item
.as_dict())
443 vdur_dict
['supplemental_boot_data']['config_file'] = file_list
448 for intf
, cp_id
, vlr
in self
._int
_intf
:
449 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
451 icp_list
.append({"name": cp
.name
,
453 "type_yang": "VPORT",
454 "ip_address": self
.cp_ip_addr(cp
.id),
455 "mac_address": self
.cp_mac_addr(cp
.id)})
457 ii_list
.append({"name": intf
.name
,
458 "vdur_internal_connection_point_ref": cp
.id,
459 "virtual_interface": {}})
461 vdur_dict
["internal_connection_point"] = icp_list
462 self
._log
.debug("internal_connection_point:%s", vdur_dict
["internal_connection_point"])
463 vdur_dict
["internal_interface"] = ii_list
466 for intf
, cp
, vlr
in self
._ext
_intf
:
467 ei_list
.append({"name": cp
.name
,
468 "vnfd_connection_point_ref": cp
.name
,
469 "virtual_interface": {}})
470 self
._vnfr
.update_cp(cp
.name
,
471 self
.cp_ip_addr(cp
.name
),
472 self
.cp_mac_addr(cp
.name
),
475 vdur_dict
["external_interface"] = ei_list
477 placement_groups
= []
478 for group
in self
._placement
_groups
:
479 placement_groups
.append(group
.as_dict())
480 vdur_dict
['placement_groups_info'] = placement_groups
482 return RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur
.from_dict(vdur_dict
)
485 def resmgr_path(self
):
486 """ path for resource-mgr"""
487 return ("D,/rw-resource-mgr:resource-mgmt" +
489 "/vdu-event-data[event-id='{}']".format(self
._request
_id
))
492 def vm_flavor_msg(self
):
493 """ VM flavor message """
494 flavor
= self
._vdud
.vm_flavor
.__class
__()
495 flavor
.copy_from(self
._vdud
.vm_flavor
)
def vdud_cloud_init(self):
    """Return the cloud-init contents for the VDU.

    The value is obtained from ``self.cloud_init()`` the first time it is
    needed and stored in ``self._vdud_cloud_init``; while the stored value
    is None, ``cloud_init()`` is called again on each access.
    """
    if self._vdud_cloud_init is not None:
        return self._vdud_cloud_init

    self._vdud_cloud_init = self.cloud_init()
    return self._vdud_cloud_init
507 def cloud_init(self
):
508 """ Populate cloud_init with cloud-config script from
509 either the inline contents or from the file provided
511 if self
._vdud
.cloud_init
is not None:
512 self
._log
.debug("cloud_init script provided inline %s", self
._vdud
.cloud_init
)
513 return self
._vdud
.cloud_init
514 elif self
._vdud
.cloud_init_file
is not None:
515 # Get cloud-init script contents from the file provided in the cloud_init_file param
516 self
._log
.debug("cloud_init script provided in file %s", self
._vdud
.cloud_init_file
)
517 filename
= self
._vdud
.cloud_init_file
518 self
._vnfd
_package
_store
.refresh()
519 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
520 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
522 return cloud_init_extractor
.read_script(stored_package
, filename
)
523 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
524 self
.instantiation_failed(str(e
))
525 raise VirtualDeploymentUnitRecordError(e
)
527 self
._log
.debug("VDU Instantiation: cloud-init script not provided")
def process_openstack_placement_group_construct(self, vm_create_msg_dict):
    """Fill OpenStack placement constructs into the VM create request.

    Collects host aggregates, availability zones and server groups from
    ``self._placement_groups`` and stores them in *vm_create_msg_dict*
    under the keys 'availability_zone', 'server_group' and
    'host_aggregate'. A key is only written when at least one matching
    construct was found.

    Raises:
        VNFMPlacementGroupError: if more than one availability zone or more
            than one server group is requested.
    """
    host_aggregates = []
    availability_zones = []
    server_groups = []

    for group in self._placement_groups:
        if group.has_field('host_aggregate'):
            host_aggregates.extend(
                aggregate.as_dict() for aggregate in group.host_aggregate
            )
        if group.has_field('availability_zone'):
            availability_zones.append(group.availability_zone.as_dict())
        if group.has_field('server_group'):
            server_groups.append(group.server_group.as_dict())

    if availability_zones:
        if len(availability_zones) > 1:
            self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
            # The message previously had a single placeholder, so the
            # requested zones were silently dropped from the exception text.
            raise VNFMPlacementGroupError(
                "Can not launch VDU: {} in multiple availability zones. Requested Zones: {}".format(
                    self.name, availability_zones))
        vm_create_msg_dict['availability_zone'] = availability_zones[0]

    if server_groups:
        if len(server_groups) > 1:
            self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
            # Same fix as above: include the requested groups in the message.
            raise VNFMPlacementGroupError(
                "Can not launch VDU: {} in multiple Server Groups. Requested Groups: {}".format(
                    self.name, server_groups))
        vm_create_msg_dict['server_group'] = server_groups[0]

    if host_aggregates:
        vm_create_msg_dict['host_aggregate'] = host_aggregates
561 def process_placement_groups(self
, vm_create_msg_dict
):
562 """Process the placement_groups and fill resource-mgr request"""
563 if not self
._placement
_groups
:
566 cloud_set
= set([group
.cloud_type
for group
in self
._placement
_groups
])
567 assert len(cloud_set
) == 1
568 cloud_type
= cloud_set
.pop()
570 if cloud_type
== 'openstack':
571 self
.process_openstack_placement_group_construct(vm_create_msg_dict
)
574 self
._log
.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type
)
577 def process_custom_bootdata(self
, vm_create_msg_dict
):
578 """Process the custom boot data"""
579 if 'config_file' not in vm_create_msg_dict
['supplemental_boot_data']:
582 self
._vnfd
_package
_store
.refresh()
583 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
584 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
585 for file_item
in vm_create_msg_dict
['supplemental_boot_data']['config_file']:
586 if 'source' not in file_item
or 'dest' not in file_item
:
588 source
= file_item
['source']
589 # Find source file in scripts dir of VNFD
590 self
._log
.debug("Checking for source config file at %s", source
)
592 source_file_str
= cloud_init_extractor
.read_script(stored_package
, source
)
593 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
594 raise VirtualDeploymentUnitRecordError(e
)
595 # Update source file location with file contents
596 file_item
['source'] = source_file_str
600 def resmgr_msg(self
, config
=None):
601 vdu_fields
= ["vm_flavor",
607 "supplemental_boot_data"]
609 self
._log
.debug("Creating params based on VDUD: %s", self
._vdud
)
610 vdu_copy_dict
= {k
: v
for k
, v
in self
._vdud
.as_dict().items() if k
in vdu_fields
}
612 vm_create_msg_dict
= {
616 if self
.image_name
is not None:
617 vm_create_msg_dict
["image_name"] = self
.image_name
619 if self
.image_checksum
is not None:
620 vm_create_msg_dict
["image_checksum"] = self
.image_checksum
622 vm_create_msg_dict
["allocate_public_address"] = self
._mgmt
_intf
623 if self
._vdud
.has_field('mgmt_vpci'):
624 vm_create_msg_dict
["mgmt_vpci"] = self
._vdud
.mgmt_vpci
626 self
._log
.debug("VDUD: %s", self
._vdud
)
627 if config
is not None:
628 vm_create_msg_dict
['vdu_init'] = {'userdata': config
}
630 if self
._mgmt
_network
:
631 vm_create_msg_dict
['mgmt_network'] = self
._mgmt
_network
634 for intf
, cp
, vlr
in self
._ext
_intf
:
635 cp_info
= { "name": cp
.name
,
636 "virtual_link_id": vlr
.network_id
,
637 "type_yang": intf
.virtual_interface
.type_yang
}
639 if cp
.has_field('port_security_enabled'):
640 cp_info
["port_security_enabled"] = cp
.port_security_enabled
642 if (intf
.virtual_interface
.has_field('vpci') and
643 intf
.virtual_interface
.vpci
is not None):
644 cp_info
["vpci"] = intf
.virtual_interface
.vpci
646 if (vlr
.has_field('ip_profile_params')) and (vlr
.ip_profile_params
.has_field('security_group')):
647 cp_info
['security_group'] = vlr
.ip_profile_params
.security_group
649 cp_list
.append(cp_info
)
651 for intf
, cp
, vlr
in self
._int
_intf
:
652 if (intf
.virtual_interface
.has_field('vpci') and
653 intf
.virtual_interface
.vpci
is not None):
654 cp_list
.append({"name": cp
,
655 "virtual_link_id": vlr
.network_id
,
656 "type_yang": intf
.virtual_interface
.type_yang
,
657 "vpci": intf
.virtual_interface
.vpci
})
659 if cp
.has_field('port_security_enabled'):
660 cp_list
.append({"name": cp
,
661 "virtual_link_id": vlr
.network_id
,
662 "type_yang": intf
.virtual_interface
.type_yang
,
663 "port_security_enabled": cp
.port_security_enabled
})
665 cp_list
.append({"name": cp
,
666 "virtual_link_id": vlr
.network_id
,
667 "type_yang": intf
.virtual_interface
.type_yang
})
670 vm_create_msg_dict
["connection_points"] = cp_list
671 vm_create_msg_dict
.update(vdu_copy_dict
)
673 self
.process_placement_groups(vm_create_msg_dict
)
674 if 'supplemental_boot_data' in vm_create_msg_dict
:
675 self
.process_custom_bootdata(vm_create_msg_dict
)
677 msg
= RwResourceMgrYang
.VDUEventData()
678 msg
.event_id
= self
._request
_id
679 msg
.cloud_account
= self
.cloud_account_name
680 msg
.request_info
.from_dict(vm_create_msg_dict
)
685 def terminate(self
, xact
):
686 """ Delete resource in VIM """
687 if self
._state
!= VDURecordState
.READY
and self
._state
!= VDURecordState
.FAILED
:
688 self
._log
.warning("VDU terminate in not ready state - Ignoring request")
691 self
._state
= VDURecordState
.TERMINATING
692 if self
._vm
_resp
is not None:
694 with self
._dts
.transaction() as new_xact
:
695 yield from self
.delete_resource(new_xact
)
697 self
._log
.exception("Caught exception while deleting VDU %s", self
.vdu_id
)
699 if self
._rm
_regh
is not None:
700 self
._log
.debug("Deregistering resource manager registration handle")
701 self
._rm
_regh
.deregister()
704 if self
._vdur
_console
_handler
is not None:
705 self
._log
.error("Deregistering vnfr vdur registration handle")
706 self
._vdur
_console
_handler
._regh
.deregister()
707 self
._vdur
_console
_handler
._regh
= None
709 self
._state
= VDURecordState
.TERMINATED
711 def find_internal_cp_by_cp_id(self
, cp_id
):
712 """ Find the CP corresponding to the connection point id"""
715 self
._log
.debug("find_internal_cp_by_cp_id(%s) called",
718 for int_cp
in self
._vdud
.internal_connection_point
:
719 self
._log
.debug("Checking for int cp %s in internal connection points",
721 if int_cp
.id == cp_id
:
726 self
._log
.debug("Failed to find cp %s in internal connection points",
728 msg
= "Failed to find cp %s in internal connection points" % cp_id
729 raise VduRecordError(msg
)
731 # return the VLR associated with the connection point
735 def create_resource(self
, xact
, vnfr
, config
=None):
736 """ Request resource from ResourceMgr """
737 def find_cp_by_name(cp_name
):
738 """ Find a connection point by name """
740 self
._log
.debug("find_cp_by_name(%s) called", cp_name
)
741 for ext_cp
in vnfr
._cprs
:
742 self
._log
.debug("Checking ext cp (%s) called", ext_cp
.name
)
743 if ext_cp
.name
== cp_name
:
747 self
._log
.debug("Failed to find cp %s in external connection points",
751 def find_internal_vlr_by_cp_name(cp_name
):
752 """ Find the VLR corresponding to the connection point name"""
755 self
._log
.debug("find_internal_vlr_by_cp_name(%s) called",
758 for int_cp
in self
._vdud
.internal_connection_point
:
759 self
._log
.debug("Checking for int cp %s in internal connection points",
761 if int_cp
.id == cp_name
:
766 self
._log
.debug("Failed to find cp %s in internal connection points",
768 msg
= "Failed to find cp %s in internal connection points" % cp_name
769 raise VduRecordError(msg
)
771 # return the VLR associated with the connection point
772 return vnfr
.find_vlr_by_cp(cp_name
)
774 block
= xact
.block_create()
776 self
._log
.debug("Executing vm request id: %s, action: create",
779 # Resolve the networks associated external interfaces
780 for ext_intf
in self
._vdud
.external_interface
:
781 self
._log
.debug("Resolving external interface name [%s], cp[%s]",
782 ext_intf
.name
, ext_intf
.vnfd_connection_point_ref
)
783 cp
= find_cp_by_name(ext_intf
.vnfd_connection_point_ref
)
785 self
._log
.debug("Failed to find connection point - %s",
786 ext_intf
.vnfd_connection_point_ref
)
788 self
._log
.debug("Connection point name [%s], type[%s]",
789 cp
.name
, cp
.type_yang
)
791 vlr
= vnfr
.ext_vlr_by_id(cp
.vlr_ref
)
793 etuple
= (ext_intf
, cp
, vlr
)
794 self
._ext
_intf
.append(etuple
)
796 self
._log
.debug("Created external interface tuple : %s", etuple
)
798 # Resolve the networks associated internal interfaces
799 for intf
in self
._vdud
.internal_interface
:
800 cp_id
= intf
.vdu_internal_connection_point_ref
801 self
._log
.debug("Resolving internal interface name [%s], cp[%s]",
805 vlr
= find_internal_vlr_by_cp_name(cp_id
)
806 except Exception as e
:
807 self
._log
.debug("Failed to find cp %s in internal VLR list", cp_id
)
808 msg
= "Failed to find cp %s in internal VLR list, e = %s" % (cp_id
, e
)
809 raise VduRecordError(msg
)
811 ituple
= (intf
, cp_id
, vlr
)
812 self
._int
_intf
.append(ituple
)
814 self
._log
.debug("Created internal interface tuple : %s", ituple
)
816 resmgr_path
= self
.resmgr_path
817 resmgr_msg
= self
.resmgr_msg(config
)
819 self
._log
.debug("Creating new VM request at: %s, params: %s", resmgr_path
, resmgr_msg
)
820 block
.add_query_create(resmgr_path
, resmgr_msg
)
822 res_iter
= yield from block
.execute(now
=True)
830 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
831 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
832 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
833 return resp
.resource_info
836 def delete_resource(self
, xact
):
837 block
= xact
.block_create()
839 self
._log
.debug("Executing vm request id: %s, action: delete",
842 block
.add_query_delete(self
.resmgr_path
)
844 yield from block
.execute(flags
=0, now
=True)
847 def read_resource(self
, xact
):
848 block
= xact
.block_create()
850 self
._log
.debug("Executing vm request id: %s, action: delete",
853 block
.add_query_read(self
.resmgr_path
)
855 res_iter
= yield from block
.execute(flags
=0, now
=True)
860 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
861 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
862 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
863 #self._vm_resp = resp.resource_info
864 return resp
.resource_info
868 def start_component(self
):
869 """ This VDUR is active """
870 self
._log
.debug("Starting component %s for vdud %s vdur %s",
871 self
._vdud
.vcs_component_ref
,
874 yield from self
._vnfr
.start_component(self
._vdud
.vcs_component_ref
,
875 self
.vm_resp
.management_ip
)
879 """ Is this VDU active """
880 return True if self
._state
is VDURecordState
.READY
else False
def instantiation_failed(self, failed_reason=None):
    """Record that this VDU failed to instantiate and notify the VNFR.

    Sets the state to ``VDURecordState.FAILED``, stores *failed_reason*
    in ``self._state_failed_reason`` and then delegates to the owning
    VNFR's ``instantiation_failed`` with the same reason.
    """
    self._log.debug("VDU %s instantiation failed ", self._vdur_id)
    self._state, self._state_failed_reason = VDURecordState.FAILED, failed_reason
    owner_vnfr = self._vnfr
    yield from owner_vnfr.instantiation_failed(failed_reason)
891 def vdu_is_active(self
):
892 """ This VDU is active"""
894 self
._log
.warning("VDU %s was already marked as active", self
._vdur
_id
)
897 self
._log
.debug("VDUR id %s in VNFR %s is active", self
._vdur
_id
, self
._vnfr
.vnfr_id
)
899 if self
._vdud
.vcs_component_ref
is not None:
900 yield from self
.start_component()
902 self
._state
= VDURecordState
.READY
904 if self
._vnfr
.all_vdus_active():
905 self
._log
.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self
._vnfr
)
906 yield from self
._vnfr
.is_ready()
909 def instantiate(self
, xact
, vnfr
, config
=None):
910 """ Instantiate this VDU """
911 self
._state
= VDURecordState
.INSTANTIATING
914 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
915 """ This VDUR is active """
916 self
._log
.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
921 if (query_action
== rwdts
.QueryAction
.UPDATE
or
922 query_action
== rwdts
.QueryAction
.CREATE
):
925 if msg
.resource_state
== "active":
926 # Move this VDU to ready state
927 yield from self
.vdu_is_active()
928 elif msg
.resource_state
== "failed":
929 yield from self
.instantiation_failed(msg
.resource_errors
)
930 elif query_action
== rwdts
.QueryAction
.DELETE
:
931 self
._log
.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
933 raise NotImplementedError(
934 "%s action on VirtualDeployementUnitRecord not supported",
937 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
940 reg_event
= asyncio
.Event(loop
=self
._loop
)
943 def on_ready(regh
, status
):
946 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
, on_ready
=on_ready
)
947 self
._rm
_regh
= yield from self
._dts
.register(self
.resmgr_path
+ '/resource-info',
948 flags
=rwdts
.Flag
.SUBSCRIBER
,
950 yield from reg_event
.wait()
952 vm_resp
= yield from self
.create_resource(xact
, vnfr
, config
)
953 self
._vm
_resp
= vm_resp
954 self
._state
= VDURecordState
.RESOURCE_ALLOC_PENDING
956 self
._log
.debug("Requested VM from resource manager response %s",
958 if vm_resp
.resource_state
== "active":
959 self
._log
.debug("Resourcemgr responded wih an active vm resp %s",
961 yield from self
.vdu_is_active()
962 self
._state
= VDURecordState
.READY
963 elif (vm_resp
.resource_state
== "pending" or
964 vm_resp
.resource_state
== "inactive"):
965 self
._log
.debug("Resourcemgr responded wih a pending vm resp %s",
967 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
968 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
969 # flags=rwdts.Flag.SUBSCRIBER,
972 self
._log
.debug("Resourcemgr responded wih an error vm resp %s",
974 raise VirtualDeploymentUnitRecordError(
975 "Failed VDUR instantiation %s " % vm_resp
)
977 except Exception as e
:
979 traceback
.print_exc()
980 self
._log
.exception(e
)
981 self
._log
.error("Instantiation of VDU record failed: %s", str(e
))
982 self
._state
= VDURecordState
.FAILED
983 yield from self
.instantiation_failed(str(e
))
986 class VlRecordState(enum
.Enum
):
987 """ VL Record State """
989 INSTANTIATION_PENDING
= 102
991 TERMINATE_PENDING
= 104
996 class InternalVirtualLinkRecord(object):
997 """ Internal Virtual Link record """
998 def __init__(self
, dts
, log
, loop
, ivld_msg
, vnfr_name
, cloud_account_name
, ip_profile
=None):
1002 self
._ivld
_msg
= ivld_msg
1003 self
._vnfr
_name
= vnfr_name
1004 self
._cloud
_account
_name
= cloud_account_name
1005 self
._ip
_profile
= ip_profile
1007 self
._vlr
_req
= self
.create_vlr()
1009 self
._state
= VlRecordState
.INIT
1013 """ Find VLR by id """
1014 return self
._vlr
_req
.id
1018 """ Name of this VL """
1019 if self
._ivld
_msg
.vim_network_name
:
1020 return self
._ivld
_msg
.vim_network_name
1022 return self
._vnfr
_name
+ "." + self
._ivld
_msg
.name
def network_id(self):
    """Return the network id of the held VLR, or None when no VLR is set."""
    vlr = self._vlr
    if not vlr:
        return None
    return vlr.network_id
1030 """ VLR path for this VLR instance"""
1031 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self
.vlr_id
)
1033 def create_vlr(self
):
1034 """ Create the VLR record which will be instantiated """
1036 vld_fields
= ["short_name",
1044 vld_copy_dict
= {k
: v
for k
, v
in self
._ivld
_msg
.as_dict().items() if k
in vld_fields
}
1046 vlr_dict
= {"id": str(uuid
.uuid4()),
1048 "cloud_account": self
._cloud
_account
_name
,
1051 if self
._ip
_profile
and self
._ip
_profile
.has_field('ip_profile_params'):
1052 vlr_dict
['ip_profile_params' ] = self
._ip
_profile
.ip_profile_params
.as_dict()
1054 vlr_dict
.update(vld_copy_dict
)
1056 vlr
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict(vlr_dict
)
1060 def instantiate(self
, xact
, restart_mode
=False):
1061 """ Instantiate VL """
1064 def instantiate_vlr():
1065 """ Instantiate VLR"""
1066 self
._log
.debug("Create VL with xpath %s and vlr %s",
1067 self
.vlr_path(), self
._vlr
_req
)
1069 with self
._dts
.transaction(flags
=0) as xact
:
1070 block
= xact
.block_create()
1071 block
.add_query_create(xpath
=self
.vlr_path(), msg
=self
._vlr
_req
)
1072 self
._log
.debug("Executing VL create path:%s msg:%s",
1073 self
.vlr_path(), self
._vlr
_req
)
1077 res_iter
= yield from block
.execute()
1079 self
._state
= VlRecordState
.FAILED
1080 self
._log
.exception("Caught exception while instantial VL")
1083 for ent
in res_iter
:
1084 res
= yield from ent
1085 self
._vlr
= res
.result
1087 if self
._vlr
.operational_status
== 'failed':
1088 self
._log
.debug("VL creation failed for vlr id %s", self
._vlr
.id)
1089 self
._state
= VlRecordState
.FAILED
1090 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self
._vlr
.id))
1092 self
._log
.info("Created VL with xpath %s and vlr %s",
1093 self
.vlr_path(), self
._vlr
)
1097 """ Get the network id """
1098 res_iter
= yield from self
._dts
.query_read(self
.vlr_path(), rwdts
.XactFlag
.MERGE
)
1100 for ent
in res_iter
:
1101 res
= yield from ent
1105 err
= "Failed to get VLR for path %s" % self
.vlr_path()
1107 raise InternalVirtualLinkRecordError(err
)
1110 self
._state
= VlRecordState
.INSTANTIATION_PENDING
1113 vl
= yield from get_vlr()
1115 yield from instantiate_vlr()
1117 yield from instantiate_vlr()
1119 self
._state
= VlRecordState
.ACTIVE
def vlr_in_vns(self):
    """Tell whether a VLR record exists in VNS for this VL.

    True when the current state is ACTIVE, INSTANTIATION_PENDING or
    FAILED; False for every other state.
    """
    current = self._state
    states_with_vlr = (
        VlRecordState.ACTIVE,
        VlRecordState.INSTANTIATION_PENDING,
        VlRecordState.FAILED,
    )
    return any(current == candidate for candidate in states_with_vlr)
def terminate(self, xact):
    """Terminate this VL

    Generator-based coroutine; drive it with ``yield from``.

    Arguments:
        xact - transaction used to create the block that carries the
               delete query for this VL's path

    When vlr_in_vns() is false the request is only logged: no query is
    issued and the state is left untouched.
    """
    if self.vlr_in_vns():
        self._log.debug("Terminating VL with path %s", self.vlr_path())
        self._state = VlRecordState.TERMINATE_PENDING

        delete_block = xact.block_create()
        delete_block.add_query_delete(self.vlr_path())
        yield from delete_block.execute(flags=0, now=True)

        # Only reached once the delete query has been executed
        self._state = VlRecordState.TERMINATED
        self._log.debug("Terminated VL with path %s", self.vlr_path())
    else:
        self._log.debug("Ignoring terminate request for id %s in state %s",
                        self.vlr_id, self._state)
1147 class VirtualNetworkFunctionRecord(object):
1148 """ Virtual Network Function Record """
def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
    """ Set up the record from the VNFR message that requested it

    Arguments:
        dts          - DTS handle used for queries and transactions
        log          - logger used by this record
        loop         - event loop used to schedule tasks and sleeps
        cluster_name - cluster name handed to the VCS components
        vnfm         - VNF manager used to fetch VNFDs and publish VNFRs
        vcs_handler  - handler handed to the VCS components
        vnfr_msg     - VNFR message; supplies the id, the embedded vnfd
                       and the initial config status
        mgmt_network - management network handed to the VDU records
    """
    # Collaborators
    self._dts = dts
    self._log = log
    self._loop = loop
    self._vnfm = vnfm
    self._vcs_handler = vcs_handler
    self._cluster_name = cluster_name
    self._mgmt_network = mgmt_network

    # The requesting message and the values derived from it
    self._vnfr_msg = vnfr_msg
    self._vnfr = vnfr_msg
    self._vnfr_id = vnfr_msg.id
    self._vnfd = vnfr_msg.vnfd
    self._vnfd_id = vnfr_msg.vnfd.id
    self._config_status = vnfr_msg.config_status

    # Life-cycle state
    self._state = VirtualNetworkFunctionRecordState.INIT
    self._state_failed_reason = None
    self._create_time = int(time.time())

    # Records owned by this VNFR
    self._ext_vlrs = {}  # The list of external virtual links
    self._vlrs = []  # The list of internal virtual links
    self._vdus = []  # The list of vdu
    self._vlr_by_cp = {}
    self._cprs = []
    self._inventory = {}
    self._vnf_mon = None

    self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
    self._rw_vnfd = None
    self._vnfd_ref_count = 0
def _get_vdur_from_vdu_id(self, vdu_id):
    """ Find the VDU record whose vdu_id matches the given id

    Arguments:
        vdu_id - id compared against each record's vdu_id

    Returns:
        The first matching entry of self._vdus

    Raises:
        VDURecordNotFound if no entry matches
    """
    self._log.debug("Finding vdur for vdu_id %s", vdu_id)
    self._log.debug("Searching through vdus: %s", self._vdus)
    for vdu in self._vdus:
        self._log.debug("vdu_id: %s", vdu.vdu_id)
        if vdu.vdu_id == vdu_id:
            return vdu

    # Exceptions do not interpolate logging-style arguments, so the id has
    # to be formatted into the message before it is raised.
    raise VDURecordNotFound(
        "Could not find vdu record from id: {}".format(vdu_id))
1189 def operational_status(self
):
1190 """ Operational status of this VNFR """
1191 op_status_map
= {"INIT": "init",
1192 "VL_INIT_PHASE": "vl_init_phase",
1193 "VM_INIT_PHASE": "vm_init_phase",
1195 "TERMINATE": "terminate",
1196 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1197 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1198 "TERMINATED": "terminated",
1199 "FAILED": "failed", }
1200 return op_status_map
[self
._state
.name
]
def vnfd_xpath(vnfd_id):
    """ VNFD xpath associated with this VNFR

    Arguments:
        vnfd_id - id substituted into the vnfd:id key predicate

    Returns:
        The config xpath string of the vnfd-catalog entry with that id
    """
    template = "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']"
    return template.format(vnfd_id)
def vnfd_ref_count(self):
    """ Current number of references held on the VNFD of this VNFR """
    current_refs = self._vnfd_ref_count
    return current_refs
def vnfd_in_use(self):
    """ Tell whether at least one reference is held on the VNFD

    Returns:
        True when the reference count is above zero, False otherwise
    """
    return self._vnfd_ref_count > 0
1217 """ Take a reference on this object """
1218 self
._vnfd
_ref
_count
+= 1
1219 return self
._vnfd
_ref
_count
def vnfd_unref(self):
    """ Drop one reference held on the VNFD

    Returns:
        The reference count left after the release

    Raises:
        VnfRecordError if no reference is currently held (count below 1);
        the problem is logged as critical first
    """
    if self._vnfd_ref_count >= 1:
        self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
                        self.vnfd.id, self._vnfd_ref_count)
        self._vnfd_ref_count -= 1
        return self._vnfd_ref_count

    # Releasing more often than acquiring is a caller error
    msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
           (self.vnfd.id, self._vnfd_ref_count))
    self._log.critical(msg)
    raise VnfRecordError(msg)
1235 """ VNFD for this VNFR """
1240 """ VNFD name associated with this VNFR """
1241 return self
.vnfd
.name
1245 """ Name of this VNF in the record """
1246 return self
._vnfr
.name
def cloud_account_name(self):
    """ Cloud account recorded in the VNFR message (its cloud_account field) """
    account = self._vnfr.cloud_account
    return account
1255 """ VNFD Id associated with this VNFR """
1260 """ VNFR Id associated with this VNFR """
1261 return self
._vnfr
_id
def member_vnf_index(self):
    """ Member VNF index recorded in the VNFR message (member_vnf_index_ref) """
    index_ref = self._vnfr.member_vnf_index_ref
    return index_ref
def config_status(self):
    """ Config status currently stored on this VNFR """
    status = self._config_status
    return status
1273 def component_by_name(self
, component_name
):
1274 """ Find a component by name in the inventory list"""
1275 mangled_name
= VcsComponent
.mangle_name(component_name
,
1278 return self
._inventory
[mangled_name
]
def get_nsr_config(self):
    """ Read the NS instance config entry of the NSR this VNFR refers to

    Generator-based coroutine; drive it with ``yield from``.

    Returns:
        The nsr entry whose id equals the VNFR message's nsr_id_ref, or
        None when no queried entry matches
    """
    ### Need access to NS instance configuration for runtime resolution.
    ### This shall be replaced when deployment flavors are implemented
    xpath = "C,/nsr:ns-instance-config"
    query_results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)

    for pending in query_results:
        response = yield from pending
        for candidate in response.result.nsr:
            if candidate.id == self._vnfr_msg.nsr_id_ref:
                return candidate
    return None
def start_component(self, component_name, ip_addr):
    """ Start a component in the VNFR by name

    Generator-based coroutine; drive it with ``yield from``.

    Arguments:
        component_name - name resolved through component_by_name()
        ip_addr        - third argument passed to the component's start()
    """
    yield from self.component_by_name(component_name).start(None, None, ip_addr)
1303 def cp_ip_addr(self
, cp_name
):
1304 """ Get ip address for connection point """
1305 self
._log
.debug("cp_ip_addr()")
1306 for cp
in self
._cprs
:
1307 if cp
.name
== cp_name
and cp
.ip_address
is not None:
1308 return cp
.ip_address
def mgmt_intf_info(self):
    """ Get Management interface info for this VNFR

    The address source is picked from the VNFD management interface
    descriptor: a connection point ("cp"), a VDU ("vdu_id"), or the
    descriptor's own ip_address when neither field is set.

    Returns:
        An (ip_addr, port) tuple; ip_addr is None when the referenced VDU
        record cannot be found
    """
    descriptor = self.vnfd.mgmt_interface

    if descriptor.has_field("cp"):
        ip_addr = self.cp_ip_addr(descriptor.cp)
    elif descriptor.has_field("vdu_id"):
        try:
            ip_addr = self._get_vdur_from_vdu_id(descriptor.vdu_id).management_ip
        except VDURecordNotFound:
            self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
            ip_addr = None
    else:
        ip_addr = descriptor.ip_address

    return ip_addr, descriptor.port
1332 """ Message associated with this VNFR """
1333 vnfd_fields
= ["short_name", "vendor", "description", "version"]
1334 vnfd_copy_dict
= {k
: v
for k
, v
in self
.vnfd
.as_dict().items() if k
in vnfd_fields
}
1336 mgmt_intf
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1337 ip_address
, port
= self
.mgmt_intf_info()
1339 if ip_address
is not None:
1340 mgmt_intf
.ip_address
= ip_address
1341 if port
is not None:
1342 mgmt_intf
.port
= port
1344 vnfr_dict
= {"id": self
._vnfr
_id
,
1345 "nsr_id_ref": self
._vnfr
_msg
.nsr_id_ref
,
1347 "member_vnf_index_ref": self
.member_vnf_index
,
1348 "operational_status": self
.operational_status
,
1349 "operational_status_details": self
._state
_failed
_reason
,
1350 "cloud_account": self
.cloud_account_name
,
1351 "config_status": self
._config
_status
1354 vnfr_dict
.update(vnfd_copy_dict
)
1356 vnfr_msg
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
1357 vnfr_msg
.vnfd
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd
.from_dict(self
.vnfd
.as_dict())
1359 vnfr_msg
.create_time
= self
._create
_time
1360 vnfr_msg
.uptime
= int(time
.time()) - self
._create
_time
1361 vnfr_msg
.mgmt_interface
= mgmt_intf
1363 # Add all the VLRs to VNFR
1364 for vlr
in self
._vlrs
:
1365 ivlr
= vnfr_msg
.internal_vlr
.add()
1366 ivlr
.vlr_ref
= vlr
.vlr_id
1368 # Add all the VDURs to VDUR
1369 if self
._vdus
is not None:
1370 for vdu
in self
._vdus
:
1371 vdur
= vnfr_msg
.vdur
.add()
1372 vdur
.from_dict(vdu
.msg
.as_dict())
1374 if self
.vnfd
.mgmt_interface
.has_field('dashboard_params'):
1375 vnfr_msg
.dashboard_url
= self
.dashboard_url
1377 for cpr
in self
._cprs
:
1378 new_cp
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr
.as_dict())
1379 vnfr_msg
.connection_point
.append(new_cp
)
1381 if self
._vnf
_mon
is not None:
1382 for monp
in self
._vnf
_mon
.msg
:
1383 vnfr_msg
.monitoring_param
.append(
1384 VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam
.from_dict(monp
.as_dict()))
1386 if self
._vnfr
.vnf_configuration
is not None:
1387 vnfr_msg
.vnf_configuration
.from_dict(self
._vnfr
.vnf_configuration
.as_dict())
1388 if (ip_address
is not None and
1389 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
is None):
1390 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
= ip_address
1392 for group
in self
._vnfr
_msg
.placement_groups_info
:
1393 group_info
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1394 group_info
.from_dict(group
.as_dict())
1395 vnfr_msg
.placement_groups_info
.append(group_info
)
1400 def dashboard_url(self
):
1401 ip
, cfg_port
= self
.mgmt_intf_info()
1404 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('https'):
1405 if self
.vnfd
.mgmt_interface
.dashboard_params
.https
is True:
1408 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('port'):
1409 http_port
= self
.vnfd
.mgmt_interface
.dashboard_params
.port
1411 url
= "{protocol}://{ip_address}:{port}/{path}".format(
1415 path
=self
.vnfd
.mgmt_interface
.dashboard_params
.path
.lstrip("/"),
1422 """ path for this VNFR """
1423 return("D,/vnfr:vnfr-catalog"
1424 "/vnfr:vnfr[vnfr:id='{}']".format(self
.vnfr_id
))
def publish(self, xact):
    """ publish this VNFR

    Generator-based coroutine; drive it with ``yield from``.

    Arguments:
        xact - transaction handed through to the VNF manager's
               publish_vnfr()
    """
    # Read self.msg and self.xpath once and reuse the results, so the record
    # that is logged, stamped with the creation time and published is one
    # and the same object.
    vnfr = self.msg
    path = self.xpath
    vnfr.create_time = self._create_time

    self._log.debug("Publishing VNFR path = [%s], record = [%s]",
                    path, vnfr)
    yield from self._vnfm.publish_vnfr(xact, path, vnfr)
    self._log.debug("Published VNFR path = [%s], record = [%s]",
                    path, vnfr)
def resolve_vld_ip_profile(self, vnfd_msg, vld):
    """ Find the IP profile a virtual link descriptor refers to

    Arguments:
        vnfd_msg - descriptor whose ip_profiles are searched
        vld      - virtual link descriptor carrying ip_profile_ref

    Returns:
        The first profile whose name equals vld.ip_profile_ref, or None
        when the reference is not set or nothing matches
    """
    self._log.debug("Receieved ip profile ref is %s", vld.ip_profile_ref)
    if not vld.has_field('ip_profile_ref'):
        return None

    for candidate in vnfd_msg.ip_profiles:
        if candidate.name == vld.ip_profile_ref:
            return candidate
    return None
1445 def create_vls(self
):
1446 """ Publish The VLs associated with this VNF """
1447 self
._log
.debug("Publishing Internal Virtual Links for vnfd id: %s",
1449 for ivld_msg
in self
.vnfd
.internal_vld
:
1450 self
._log
.debug("Creating internal vld:"
1451 " %s, int_cp_ref = %s",
1452 ivld_msg
, ivld_msg
.internal_connection_point
1454 vlr
= InternalVirtualLinkRecord(dts
=self
._dts
,
1458 vnfr_name
=self
.name
,
1459 cloud_account_name
=self
.cloud_account_name
,
1460 ip_profile
=self
.resolve_vld_ip_profile(self
.vnfd
, ivld_msg
)
1462 self
._vlrs
.append(vlr
)
1464 for int_cp
in ivld_msg
.internal_connection_point
:
1465 if int_cp
.id_ref
in self
._vlr
_by
_cp
:
1466 msg
= ("Connection point %s already "
1467 " bound %s" % (int_cp
.id_ref
, self
._vlr
_by
_cp
[int_cp
.id_ref
]))
1468 raise InternalVirtualLinkRecordError(msg
)
1469 self
._log
.debug("Setting vlr %s to internal cp = %s",
1471 self
._vlr
_by
_cp
[int_cp
.id_ref
] = vlr
def instantiate_vls(self, xact, restart_mode=False):
    """ Instantiate the VLs associated with this VNF

    Generator-based coroutine; drive it with ``yield from``.

    Arguments:
        xact         - transaction passed to each record's instantiate()
        restart_mode - passed through unchanged to each record
    """
    self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)

    # One record at a time, in the order the records were created
    for internal_vl in self._vlrs:
        self._log.debug("Instantiating VLR %s", internal_vl)
        yield from internal_vl.instantiate(xact, restart_mode)
def find_vlr_by_cp(self, cp_name):
    """ Find the VLR associated with the cp name

    Raises:
        KeyError if nothing is registered under cp_name
    """
    vlr_lookup = self._vlr_by_cp
    return vlr_lookup[cp_name]
def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
    """
    Returns the cloud specific construct for placement group

    Arguments:
        input_group: VNFD PlacementGroup
        nsr_config: Configuration for VNFDGroup MAP in the NSR config

    Returns:
        A populated Vdur PlacementGroupsInfo for the first map entry that
        names both this placement group and this VNFD, or None when no
        entry does
    """
    # Fields taken from the VNFD group itself rather than from the map entry
    vnfd_group_fields = ['name', 'requirement', 'strategy']
    # Keys of the map entry that only identify it and are not copied over
    lookup_keys = ('placement_group_ref', 'vnfd_id_ref')

    for group_info in nsr_config.vnfd_placement_group_maps:
        if group_info.placement_group_ref != input_group.name:
            continue
        if group_info.vnfd_id_ref != self.vnfd_id:
            continue

        group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
        group_dict = {}
        for key, value in group_info.as_dict().items():
            if key not in lookup_keys:
                group_dict[key] = value
        for field in vnfd_group_fields:
            group_dict[field] = getattr(input_group, field)
        group.from_dict(group_dict)
        return group

    return None
1509 def get_vdu_placement_groups(self
, vdu
):
1510 placement_groups
= []
1511 ### Step-1: Get VNF level placement groups
1512 for group
in self
._vnfr
_msg
.placement_groups_info
:
1513 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1514 #group_info.from_dict(group.as_dict())
1515 placement_groups
.append(group
)
1517 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1518 nsr_config
= yield from self
.get_nsr_config()
1520 ### Step-3: Get VDU level placement groups
1521 for group
in self
.vnfd
.placement_groups
:
1522 for member_vdu
in group
.member_vdus
:
1523 if member_vdu
.member_vdu_ref
== vdu
.id:
1524 group_info
= self
.resolve_placement_group_cloud_construct(group
,
1526 if group_info
is None:
1527 self
._log
.info("Could not resolve cloud-construct for placement group: %s", group
.name
)
1528 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1530 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1534 self
.member_vnf_index
)
1535 placement_groups
.append(group_info
)
1537 return placement_groups
def vdu_cloud_init_instantiation(self):
    """ Read vdud_cloud_init on every VDU record, discarding the values

    NOTE(review): the read is presumably wanted for the work done by the
    attribute access itself - confirm against the VDU record class.
    """
    for vdu in self._vdus:
        vdu.vdud_cloud_init
1544 def create_vdus(self
, vnfr
, restart_mode
=False):
1545 """ Create the VDUs associated with this VNF """
1547 def get_vdur_id(vdud
):
1548 """Get the corresponding VDUR's id for the VDUD. This is useful in
1551 In restart mode we check for exiting VDUR's ID and use them, if
1552 available. This way we don't end up creating duplicate VDURs
1556 if restart_mode
and vdud
is not None:
1558 vdur
= [vdur
.id for vdur
in vnfr
._vnfr
.vdur
if vdur
.vdu_id_ref
== vdud
.id]
1561 self
._log
.error("Unable to find a VDUR for VDUD {}".format(vdud
))
1566 self
._log
.info("Creating VDU's for vnfd id: %s", self
.vnfd_id
)
1567 for vdu
in self
._rw
_vnfd
.vdu
:
1568 self
._log
.debug("Creating vdu: %s", vdu
)
1569 vdur_id
= get_vdur_id(vdu
)
1571 placement_groups
= yield from self
.get_vdu_placement_groups(vdu
)
1572 self
._log
.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1575 self
.member_vnf_index
,
1576 [ group
.name
for group
in placement_groups
])
1578 vdur
= VirtualDeploymentUnitRecord(
1584 mgmt_intf
=self
.has_mgmt_interface(vdu
),
1585 mgmt_network
=self
._mgmt
_network
,
1586 cloud_account_name
=self
.cloud_account_name
,
1587 vnfd_package_store
=self
._vnfd
_package
_store
,
1589 placement_groups
= placement_groups
,
1591 yield from vdur
.vdu_opdata_register()
1593 self
._vdus
.append(vdur
)
1596 def instantiate_vdus(self
, xact
, vnfr
):
1597 """ Instantiate the VDUs associated with this VNF """
1598 self
._log
.debug("Instantiating VDU's for vnfd id %s: %s", self
.vnfd_id
, self
._vdus
)
1600 lookup
= {vdu
.vdu_id
: vdu
for vdu
in self
._vdus
}
1602 # Identify any dependencies among the VDUs
1603 dependencies
= collections
.defaultdict(list)
1604 vdu_id_pattern
= re
.compile(r
"\{\{ vdu\[([^]]+)\]\S* \}\}")
1606 for vdu
in self
._vdus
:
1607 if vdu
._vdud
_cloud
_init
is not None:
1608 for vdu_id
in vdu_id_pattern
.findall(vdu
._vdud
_cloud
_init
):
1609 if vdu_id
!= vdu
.vdu_id
:
1610 # This means that vdu.vdu_id depends upon vdu_id,
1611 # i.e. vdu_id must be instantiated before
1613 dependencies
[vdu
.vdu_id
].append(lookup
[vdu_id
])
1615 # Define the terminal states of VDU instantiation
1617 VDURecordState
.READY
,
1618 VDURecordState
.TERMINATED
,
1619 VDURecordState
.FAILED
,
1622 datastore
= VdurDatastore()
1626 def instantiate_monitor(vdu
):
1627 """Monitor the state of the VDU during instantiation
1630 vdu - a VirtualDeploymentUnitRecord
1633 # wait for the VDUR to enter a terminal state
1634 while vdu
._state
not in terminal
:
1635 yield from asyncio
.sleep(1, loop
=self
._loop
)
1636 # update the datastore
1637 datastore
.update(vdu
)
1639 # add the VDU to the set of processed VDUs
1640 processed
.add(vdu
.vdu_id
)
1643 def instantiate(vdu
):
1644 """Instantiate the specified VDU
1647 vdu - a VirtualDeploymentUnitRecord
1650 if the VDU, or any of the VDUs this VDU depends upon, are
1651 terminated or fail to instantiate properly, a
1652 VirtualDeploymentUnitRecordError is raised.
1655 for dependency
in dependencies
[vdu
.vdu_id
]:
1656 self
._log
.debug("{}: waiting for {}".format(vdu
.vdu_id
, dependency
.vdu_id
))
1658 while dependency
.vdu_id
not in processed
:
1659 yield from asyncio
.sleep(1, loop
=self
._loop
)
1661 if not dependency
.active
:
1662 raise VirtualDeploymentUnitRecordError()
1664 self
._log
.debug('instantiating {}'.format(vdu
.vdu_id
))
1666 # Populate the datastore with the current values of the VDU
1669 # Substitute any variables contained in the cloud config script
1670 config
= str(vdu
.vdud_cloud_init
) if vdu
.vdud_cloud_init
is not None else ""
1672 parts
= re
.split("\{\{ ([^\}]+) \}\}", config
)
1675 # Extract the variable names
1677 for variable
in parts
[1::2]:
1678 variables
.append(variable
.lstrip('{{').rstrip('}}').strip())
1680 # Iterate of the variables and substitute values from the
1682 for variable
in variables
:
1684 # Handle a reference to a VDU by ID
1685 if variable
.startswith('vdu['):
1686 value
= datastore
.get(variable
)
1688 msg
= "Unable to find a substitute for {} in {} cloud-init script"
1689 raise ValueError(msg
.format(variable
, vdu
.vdu_id
))
1691 config
= config
.replace("{{ %s }}" % variable
, value
)
1694 # Handle a reference to the current VDU
1695 if variable
.startswith('vdu'):
1696 value
= datastore
.get('vdu[{}]'.format(vdu
.vdu_id
) + variable
[3:])
1697 config
= config
.replace("{{ %s }}" % variable
, value
)
1700 # Handle unrecognized variables
1701 msg
= 'unrecognized cloud-config variable: {}'
1702 raise ValueError(msg
.format(variable
))
1704 # Instantiate the VDU
1705 with self
._dts
.transaction() as xact
:
1706 self
._log
.debug("Instantiating vdu: %s", vdu
)
1707 yield from vdu
.instantiate(xact
, vnfr
, config
=config
)
1708 if self
._state
== VirtualNetworkFunctionRecordState
.FAILED
:
1709 self
._log
.error("Instatiation of VNF %s failed while instantiating vdu %s",
1712 # First create a set of tasks to monitor the state of the VDUs and
1713 # report when they have entered a terminal state
1714 for vdu
in self
._vdus
:
1715 self
._loop
.create_task(instantiate_monitor(vdu
))
1717 for vdu
in self
._vdus
:
1718 self
._loop
.create_task(instantiate(vdu
))
def has_mgmt_interface(self, vdu):
    """ Tell whether the VNFD management interface points at this VDU

    Arguments:
        vdu - VDU descriptor whose id is compared with the management
              interface's vdu_id

    Returns:
        True when the two ids are equal, False otherwise
    """
    # ## TODO: Support additional mgmt_interface type options
    mgmt_vdu_id = self.vnfd.mgmt_interface.vdu_id
    return True if mgmt_vdu_id == vdu.id else False
def vlr_xpath(self, vlr_id):
    """ Build the vlr-catalog xpath of the VLR with the given id

    Arguments:
        vlr_id - id substituted into the vlr:id key predicate
    """
    template = "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']"
    return template.format(vlr_id)
def ext_vlr_by_id(self, vlr_id):
    """ find ext vlr by id

    Raises:
        KeyError if no external VLR is stored under vlr_id
    """
    external_vlrs = self._ext_vlrs
    return external_vlrs[vlr_id]
1737 def publish_inventory(self
, xact
):
1738 """ Publish the inventory associated with this VNF """
1739 self
._log
.debug("Publishing inventory for VNFR id: %s", self
._vnfr
_id
)
1741 for component
in self
._rw
_vnfd
.component
:
1742 self
._log
.debug("Creating inventory component %s", component
)
1743 mangled_name
= VcsComponent
.mangle_name(component
.component_name
,
1747 comp
= VcsComponent(dts
=self
._dts
,
1750 cluster_name
=self
._cluster
_name
,
1751 vcs_handler
=self
._vcs
_handler
,
1752 component
=component
,
1753 mangled_name
=mangled_name
,
1755 if comp
.name
in self
._inventory
:
1756 self
._log
.debug("Duplicate entries in inventory %s for vnfr %s",
1757 component
, self
._vnfd
_id
)
1759 self
._log
.debug("Adding component %s for vnrf %s",
1760 comp
.name
, self
._vnfr
_id
)
1761 self
._inventory
[comp
.name
] = comp
1762 yield from comp
.publish(xact
)
1764 def all_vdus_active(self
):
1765 """ Are all VDUS in this VNFR active? """
1766 for vdu
in self
._vdus
:
1770 self
._log
.debug("Inside all_vdus_active. Returning True")
def instantiation_failed(self, failed_reason=None):
    """ Mark this VNFR as failed and publish the updated record

    Generator-based coroutine; drive it with ``yield from``.

    Arguments:
        failed_reason - stored as the failure reason of this record
    """
    failed = VirtualNetworkFunctionRecordState.FAILED

    self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
    self.set_state(failed)
    self._state_failed_reason = failed_reason

    # Push the new status out; the publish is issued without a transaction
    yield from self.publish(None)
1785 """ This VNF is ready"""
1786 self
._log
.debug("VNFR id %s is ready", self
.vnfr_id
)
1788 if self
._state
!= VirtualNetworkFunctionRecordState
.FAILED
:
1789 self
.set_state(VirtualNetworkFunctionRecordState
.READY
)
1792 self
._log
.debug("VNFR id %s ignoring state change", self
.vnfr_id
)
1794 # Update the VNFR with the changed status
1795 yield from self
.publish(None)
def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
    """Update the named connection point record with its addresses and id

    Arguments:
        cp_name    - name of the connection point record to update
        ip_address - stored in the record's ip_address
        mac_addr   - stored in the record's mac_address
        cp_id      - stored in the record's connection_point_id

    Raises:
        VirtualDeploymentUnitRecordError if no record is named cp_name
    """
    for cp in self._cprs:
        if cp.name == cp_name:
            self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
                            cp_name, cp, ip_address, cp_id)
            cp.ip_address = ip_address
            cp.mac_address = mac_addr
            cp.connection_point_id = cp_id
            return

    # Report the name that was asked for. The loop variable is unbound when
    # there are no records at all, and otherwise names whichever record was
    # inspected last rather than the one that is missing.
    err = "No connection point %s found in VNFR id %s" % (cp_name, self._vnfr_id)
    self._log.debug(err)
    raise VirtualDeploymentUnitRecordError(err)
1812 def set_state(self
, state
):
1813 """ Set state for this VNFR"""
1817 def instantiate(self
, xact
, restart_mode
=False):
1818 """ instantiate this VNF """
1819 self
.set_state(VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
)
1820 self
._rw
_vnfd
= yield from self
._vnfm
.fetch_vnfd(self
._vnfd
_id
)
1825 # Iterate over all the connection points in VNFR and fetch the
1828 def cpr_from_cp(cp
):
1829 """ Creates a record level connection point from the desciptor cp"""
1830 cp_fields
= ["name", "image", "vm-flavor", "port_security_enabled"]
1831 cp_copy_dict
= {k
: v
for k
, v
in cp
.as_dict().items() if k
in cp_fields
}
1833 cpr_dict
.update(cp_copy_dict
)
1834 return VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr_dict
)
1836 self
._log
.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1837 self
._vnfr
_id
, self
._vnfr
.connection_point
)
1839 for cp
in self
._vnfr
.connection_point
:
1840 cpr
= cpr_from_cp(cp
)
1841 self
._cprs
.append(cpr
)
1842 self
._log
.debug("Adding Connection point record %s ", cp
)
1844 vlr_path
= self
.vlr_xpath(cp
.vlr_ref
)
1845 self
._log
.debug("Fetching VLR with path = %s", vlr_path
)
1846 res_iter
= yield from self
._dts
.query_read(self
.vlr_xpath(cp
.vlr_ref
),
1847 rwdts
.XactFlag
.MERGE
)
1851 self
._ext
_vlrs
[cp
.vlr_ref
] = d
1852 cpr
.vlr_ref
= cp
.vlr_ref
1853 self
._log
.debug("Fetched VLR [%s] with path = [%s]", d
, vlr_path
)
1855 # Increase the VNFD reference count
1860 # Fetch External VLRs
1861 self
._log
.debug("VNFR-ID %s: Fetching vlrs", self
._vnfr
_id
)
1862 yield from fetch_vlrs()
1865 self
._log
.debug("VNFR-ID %s: Publishing Inventory", self
._vnfr
_id
)
1866 yield from self
.publish_inventory(xact
)
1869 self
._log
.debug("VNFR-ID %s: Creating VLs", self
._vnfr
_id
)
1870 yield from self
.create_vls()
1873 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1874 yield from self
.publish(xact
)
1878 self
._log
.debug("VNFR-ID %s: Instantiate VLs", self
._vnfr
_id
)
1880 yield from self
.instantiate_vls(xact
, restart_mode
)
1881 except Exception as e
:
1882 self
._log
.exception("VL instantiation failed (%s)", str(e
))
1883 yield from self
.instantiation_failed(str(e
))
1886 self
.set_state(VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
)
1889 self
._log
.debug("VNFR-ID %s: Create VDUs", self
._vnfr
_id
)
1890 yield from self
.create_vdus(self
, restart_mode
)
1893 yield from self
.vdu_cloud_init_instantiation()
1894 except Exception as e
:
1895 self
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
1896 self
._state
_failed
_reason
= str(e
)
1897 yield from self
.publish(xact
)
1900 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1901 yield from self
.publish(xact
)
1904 # ToDo: Check if this should be prevented during restart
1905 self
._log
.debug("VNFR-ID %s: Instantiate VDUs", self
._vnfr
_id
)
1906 _
= self
._loop
.create_task(self
.instantiate_vdus(xact
, self
))
1909 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1910 yield from self
.publish(xact
)
1912 self
._log
.debug("VNFR-ID %s: Instantiation Done", self
._vnfr
_id
)
1914 # create task updating uptime for this vnfr
1915 self
._log
.debug("VNFR-ID %s: Starting task to update uptime", self
._vnfr
_id
)
1916 self
._loop
.create_task(self
.vnfr_uptime_update(xact
))
def terminate(self, xact):
    """ Terminate this virtual network function

    Generator-based coroutine; drive it with ``yield from``. The monitor
    (if any) is stopped first, then the internal VLs are terminated, then
    the VDUs; the record state is advanced before each phase and set to
    TERMINATED at the end.

    Arguments:
        xact - transaction passed to every VL and VDU terminate()
    """
    self._log.debug("Terminatng VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)

    # stop monitoring
    if self._vnf_mon is not None:
        self._vnf_mon.stop()
        self._vnf_mon.deregister()
        self._vnf_mon = None

    self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
    for vl in self._vlrs:
        yield from vl.terminate(xact)

    self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
    for vdu in self._vdus:
        yield from vdu.terminate(xact)

    self._log.debug("Terminated VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
def vnfr_uptime_update(self, xact):
    """ Keep republishing this VNFR while it is initialising or ready

    Generator-based coroutine; drive it with ``yield from``. Each round
    publishes the record and then sleeps for two seconds on the record's
    event loop.

    Arguments:
        xact - transaction passed to every publish()
    """
    live_states = (
        VirtualNetworkFunctionRecordState.INIT,
        VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
        VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
        VirtualNetworkFunctionRecordState.READY,
    )

    # Stop when vnfr state is FAILED or TERMINATED etc
    while self._state in live_states:
        yield from self.publish(xact)
        yield from asyncio.sleep(2, loop=self._loop)
1969 class VnfdDtsHandler(object):
1970 """ DTS handler for VNFD config changes """
1971 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1973 def __init__(self
, dts
, log
, loop
, vnfm
):
1982 """ DTS registration handle """
1987 """ Register for VNFD configuration"""
1989 def on_apply(dts
, acg
, xact
, action
, scratch
):
1990 """Apply the configuration"""
1991 self
._log
.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1992 xact
, action
, scratch
)
1994 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
1997 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
1998 """ on prepare callback """
1999 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s)",
2000 ks_path
.to_xpath(RwVnfmYang
.get_schema()), msg
)
2001 fref
= ProtobufC
.FieldReference
.alloc()
2002 fref
.goto_whole_message(msg
.to_pbcm())
2004 # Handle deletes in prepare_callback
2005 if fref
.is_field_deleted():
2006 # Delete an VNFD record
2007 self
._log
.debug("Deleting VNFD with id %s", msg
.id)
2008 if self
._vnfm
.vnfd_in_use(msg
.id):
2009 self
._log
.debug("Cannot delete VNFD in use - %s", msg
)
2010 err
= "Cannot delete a VNFD in use - %s" % msg
2011 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
2012 # Delete a VNFD record
2013 yield from self
._vnfm
.delete_vnfd(msg
.id)
2015 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2018 "Registering for VNFD config using xpath: %s",
2019 VnfdDtsHandler
.XPATH
,
2021 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
2022 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
2023 self
._regh
= acg
.register(
2024 xpath
=VnfdDtsHandler
.XPATH
,
2025 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
2026 on_prepare
=on_prepare
)
2029 class VcsComponentDtsHandler(object):
2030 """ Vcs Component DTS handler """
2031 XPATH
= ("D,/rw-manifest:manifest" +
2032 "/rw-manifest:operational-inventory" +
2033 "/rw-manifest:component")
2035 def __init__(self
, dts
, log
, loop
, vnfm
):
2044 """ DTS registration handle """
2049 """ Registers VCS component dts publisher registration"""
2050 self
._log
.debug("VCS Comp publisher DTS handler registering path %s",
2051 VcsComponentDtsHandler
.XPATH
)
2053 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
2054 handlers
= rift
.tasklets
.Group
.Handler()
2055 with self
._dts
.group_create(handler
=handlers
) as group
:
2056 self
._regh
= group
.register(xpath
=VcsComponentDtsHandler
.XPATH
,
2058 flags
=(rwdts
.Flag
.PUBLISHER |
2059 rwdts
.Flag
.NO_PREP_READ |
2060 rwdts
.Flag
.DATASTORE
),)
def publish(self, xact, path, msg):
    """ Publishes the VCS component

    Arguments:
        xact - transaction; only written to the debug log here
        path - path of the element created on the registration handle
        msg  - message stored at that path
    """
    log = self._log
    log.debug("Publishing the VcsComponent xact = %s, %s:%s",
              xact, path, msg)
    self.regh.create_element(path, msg)
    log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
              VcsComponentDtsHandler.XPATH, xact, path, msg)
2071 class VnfrConsoleOperdataDtsHandler(object):
2072 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
def vnfr_vdu_console_xpath(self):
    """ path for resource-mgr

    Returns:
        The vnfr-console xpath keyed by this handler's VNFR id and VDUR id
    """
    template = "D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']"
    return template.format(self._vnfr_id, self._vdur_id)
2078 def __init__(self
, dts
, log
, loop
, vnfm
, vnfr_id
, vdur_id
, vdu_id
):
2085 self
._vnfr
_id
= vnfr_id
2086 self
._vdur
_id
= vdur_id
2087 self
._vdu
_id
= vdu_id
2091 """ Register for VNFR VDU Operational Data read from dts """
2094 def on_prepare(xact_info
, action
, ks_path
, msg
):
2095 """ prepare callback from dts """
2096 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2098 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2099 xact_info
, action
, xpath
, msg
2102 if action
== rwdts
.QueryAction
.READ
:
2103 schema
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur
.schema()
2104 path_entry
= schema
.keyspec_to_entry(ks_path
)
2105 self
._log
.debug("VDU Opdata path is {}".format(path_entry
))
2107 vnfr
= self
._vnfm
.get_vnfr(self
._vnfr
_id
)
2108 except VnfRecordError
as e
:
2109 self
._log
.error("VNFR id %s not found", self
._vnfr
_id
)
2110 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2113 vdur
= vnfr
._get
_vdur
_from
_vdu
_id
(self
._vdu
_id
)
2114 if not vdur
._state
== VDURecordState
.READY
:
2115 self
._log
.debug("VDUR state is not READY. current state is {}".format(vdur
._state
))
2116 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2118 with self
._dts
.transaction() as new_xact
:
2119 resp
= yield from vdur
.read_resource(new_xact
)
2120 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2121 vdur_console
.id = self
._vdur
_id
2122 if resp
.console_url
:
2123 vdur_console
.console_url
= resp
.console_url
2125 vdur_console
.console_url
= 'none'
2126 self
._log
.debug("Recevied console URL for vdu {} is {}".format(self
._vdu
_id
,vdur_console
))
2128 self
._log
.exception("Caught exception while reading VDU %s", self
._vdu
_id
)
2129 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2130 vdur_console
.id = self
._vdur
_id
2131 vdur_console
.console_url
= 'none'
2133 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
,
2134 xpath
=self
.vnfr_vdu_console_xpath
,
2137 #raise VnfRecordError("Not supported operation %s" % action)
2138 self
._log
.error("Not supported operation %s" % action
)
2139 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2143 self
._log
.debug("Registering for VNFR VDU using xpath: %s",
2144 self
.vnfr_vdu_console_xpath
)
2145 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2146 with self
._dts
.group_create() as group
:
2147 self
._regh
= group
.register(xpath
=self
.vnfr_vdu_console_xpath
,
2149 flags
=rwdts
.Flag
.PUBLISHER
,
2153 class VnfrDtsHandler(object):
2154 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2155 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2157 def __init__(self
, dts
, log
, loop
, vnfm
):
2167 """ Return registration handle"""
2172 """ Return VNF manager instance """
2177 """ Register for vnfr create/update/delete/read requests from dts """
2178 def on_commit(xact_info
):
2179 """ The transaction has been committed """
2180 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
2181 return rwdts
.MemberRspCode
.ACTION_OK
2183 def on_abort(*args
):
2184 """ Abort callback """
2185 self
._log
.debug("VNF transaction got aborted")
2188 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
2191 def instantiate_realloc_vnfr(vnfr
):
2192 """Re-populate the vnfm after restart
2199 yield from vnfr
.instantiate(None, restart_mode
=True)
2201 if xact_event
== rwdts
.MemberEvent
.INSTALL
:
2202 curr_cfg
= self
.regh
.elements
2203 for cfg
in curr_cfg
:
2204 vnfr
= self
.vnfm
.create_vnfr(cfg
)
2205 self
._loop
.create_task(instantiate_realloc_vnfr(vnfr
))
2207 self
._log
.debug("Got on_event in vnfm")
2209 return rwdts
.MemberRspCode
.ACTION_OK
2212 def on_prepare(xact_info
, action
, ks_path
, msg
):
2213 """ prepare callback from dts """
2215 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2216 xact_info
, action
, msg
2219 if action
== rwdts
.QueryAction
.CREATE
:
2220 if not msg
.has_field("vnfd"):
2221 err
= "Vnfd not provided"
2222 self
._log
.error(err
)
2223 raise VnfRecordError(err
)
2225 vnfr
= self
.vnfm
.create_vnfr(msg
)
2227 # RIFT-9105: Unable to add a READ query under an existing transaction
2228 # xact = xact_info.xact
2229 yield from vnfr
.instantiate(None)
2230 except Exception as e
:
2231 self
._log
.exception(e
)
2232 self
._log
.error("Error while instantiating vnfr:%s", vnfr
.vnfr_id
)
2233 vnfr
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
2234 yield from vnfr
.publish(None)
2235 elif action
== rwdts
.QueryAction
.DELETE
:
2236 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2237 path_entry
= schema
.keyspec_to_entry(ks_path
)
2238 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2241 self
._log
.debug("VNFR id %s not found for delete", path_entry
.key00
.id)
2242 raise VirtualNetworkFunctionRecordNotFound(
2243 "VNFR id %s", path_entry
.key00
.id)
2246 yield from vnfr
.terminate(xact_info
.xact
)
2249 yield from self
._vnfm
.delete_vnfr(xact_info
.xact
, vnfr
)
2250 except Exception as e
:
2251 self
._log
.exception(e
)
2252 self
._log
.error("Caught exception while deleting vnfr %s", path_entry
.key00
.id)
2254 elif action
== rwdts
.QueryAction
.UPDATE
:
2255 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2256 path_entry
= schema
.keyspec_to_entry(ks_path
)
2259 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2260 except Exception as e
:
2261 self
._log
.debug("No vnfr found with id %s", path_entry
.key00
.id)
2262 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2266 self
._log
.debug("VNFR id %s not found for update", path_entry
.key00
.id)
2267 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2270 self
._log
.debug("VNFR {} update config status {} (current {})".
2271 format(vnfr
.name
, msg
.config_status
, vnfr
.config_status
))
2272 # Update the config status and publish
2273 vnfr
._config
_status
= msg
.config_status
2274 yield from vnfr
.publish(None)
2277 raise NotImplementedError(
2278 "%s action on VirtualNetworkFunctionRecord not supported",
2281 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2283 self
._log
.debug("Registering for VNFR using xpath: %s",
2284 VnfrDtsHandler
.XPATH
,)
2286 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
2287 on_prepare
=on_prepare
,)
2288 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
2289 with self
._dts
.group_create(handler
=handlers
) as group
:
2290 self
._regh
= group
.register(xpath
=VnfrDtsHandler
.XPATH
,
2292 flags
=(rwdts
.Flag
.PUBLISHER |
2293 rwdts
.Flag
.NO_PREP_READ |
2295 rwdts
.Flag
.DATASTORE
),)
def create(self, xact, path, msg):
    """Create a VNFR element at ``path`` on the registration handle.

    ``xact`` is only written to the debug log; the element itself is
    created through ``self.regh.create_element(path, msg)``.
    """
    trace = (xact, path, msg)
    self._log.debug("Creating VNFR xact = %s, %s:%s", *trace)
    self.regh.create_element(path, msg)
    self._log.debug("Created VNFR xact = %s, %s:%s", *trace)
def update(self, xact, path, msg):
    """Update the VNFR element at ``path`` on the registration handle.

    ``xact`` is only written to the debug log; the element itself is
    updated through ``self.regh.update_element(path, msg)``.
    """
    trace = (xact, path, msg)
    self._log.debug("Updating VNFR xact = %s, %s:%s", *trace)
    self.regh.update_element(path, msg)
    self._log.debug("Updated VNFR xact = %s, %s:%s", *trace)
def delete(self, xact, path):
    """Delete the VNFR element at ``path`` from the registration handle.

    ``xact`` is only written to the debug log; the element itself is
    removed through ``self.regh.delete_element(path)``.
    """
    trace = (xact, path)
    self._log.debug("Deleting VNFR xact = %s, %s", *trace)
    self.regh.delete_element(path)
    self._log.debug("Deleted VNFR xact = %s, %s", *trace)
class VnfdRefCountDtsHandler(object):
    """ The VNFD Ref Count DTS handler

    Registers as a publisher on XPATH and answers READ queries with the
    (xpath, message) pairs returned by the VNF manager's
    ``get_vnfd_refcount``.
    """
    XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"

    def __init__(self, dts, log, loop, vnfm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm

        # Populated by register()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @property
    def vnfm(self):
        """ Return the VNF manager instance """
        return self._vnfm

    @asyncio.coroutine
    def register(self):
        """ Register for VNFD ref count read from dts """

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts """
            xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
                xact_info, action, xpath, msg
                )

            if action == rwdts.QueryAction.READ:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfd_list = yield from self._vnfm.get_vnfd_refcount(
                    path_entry.key00.vnfd_id_ref)
                # Dedicated loop names: the callback's own `xpath` / `msg`
                # arguments are no longer overwritten while responding.
                for ref_xpath, ref_msg in vnfd_list:
                    self._log.debug("Responding to ref count query path:%s, msg:%s",
                                    ref_xpath, ref_msg)
                    xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
                                            xpath=ref_xpath,
                                            msg=ref_msg)
                # Final ACK closes the sequence of MORE responses.
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)
            else:
                raise VnfRecordError("Not supported operation %s" % action)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=rwdts.Flag.PUBLISHER,
                                        )
class VdurDatastore(object):
    """
    This VdurDatastore is intended to expose select information about a VDUR
    such that it can be referenced in a cloud config file. The data that is
    exposed does not necessarily follow the structure of the data in the yang
    model. This is intentional. The data that are exposed are intended to be
    agnostic of the yang model so that changes in the model do not necessarily
    require changes to the interface provided to the user. It also means that
    the user does not need to be familiar with the RIFT.ware yang models.
    """

    def __init__(self):
        """Create an instance of VdurDatastore"""
        self._vdur_data = dict()
        # Raw string: same pattern as before, but "\[" and "\." are no
        # longer invalid string escapes.
        self._pattern = re.compile(r"vdu\[([^]]+)\]\.(.+)")

    def add(self, vdur):
        """Add a new VDUR to the datastore

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR is (1) None, (2) has no ID,
            or (3) is already in the datastore.

        """
        # Check for None explicitly so the documented ValueError is raised
        # instead of an AttributeError on `None.vdu_id`.
        if vdur is None or vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id in self._vdur_data:
            raise ValueError('cannot add a VDUR more than once')

        entry = self._vdur_data[vdur.vdu_id] = dict()

        # Only attributes that currently have a value are exposed.
        for key, attr in (('name', vdur._vdud.name),
                          ('mgmt.ip', vdur.vm_management_ip)):
            if attr is not None:
                entry[key] = attr

    def update(self, vdur):
        """Update the VDUR information in the datastore

        Arguments:
            vdur - a GI representation of a VDUR

        Raises:
            A ValueError is raised if the VDUR is (1) None, (2) has no ID,
            or (3) is not in the datastore.

        """
        if vdur is None or vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        entry = self._vdur_data[vdur.vdu_id]

        # A None attribute removes any previously exposed value; anything
        # else overwrites it.
        for key, attr in (('name', vdur._vdud.name),
                          ('mgmt.ip', vdur.vm_management_ip)):
            if attr is None:
                entry.pop(key, None)
            else:
                entry[key] = attr

    def remove(self, vdur_id):
        """Remove all of the data associated with specified VDUR

        Arguments:
            vdur_id - the identifier of a VDUR in the datastore

        Raises:
            A ValueError is raised if the VDUR is not contained in the
            datastore.

        """
        if vdur_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        del self._vdur_data[vdur_id]

    def get(self, expr):
        """Retrieve VDUR information from the datastore

        An expression should be of the form,

            vdu[<id>].<attr>

        where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
        the exposed attribute that the user wishes to retrieve.

        If the requested data is not available, None is returned.

        Arguments:
            expr - a string that specifies the data to return

        Raises:
            A ValueError is raised if the provided expression cannot be parsed.

        Returns:
            The requested data or None

        """
        result = self._pattern.match(expr)
        if result is None:
            raise ValueError('data expression not recognized ({})'.format(expr))

        vdur_id, key = result.groups()

        if vdur_id not in self._vdur_data:
            return None

        return self._vdur_data[vdur_id].get(key, None)
2507 class VnfManager(object):
2508 """ The virtual network function manager class """
2509 def __init__(self
, dts
, log
, loop
, cluster_name
):
2513 self
._cluster
_name
= cluster_name
2515 self
._vcs
_handler
= VcsComponentDtsHandler(dts
, log
, loop
, self
)
2516 self
._vnfr
_handler
= VnfrDtsHandler(dts
, log
, loop
, self
)
2517 self
._vnfr
_ref
_handler
= VnfdRefCountDtsHandler(dts
, log
, loop
, self
)
2518 self
._nsr
_handler
= mano_dts
.NsInstanceConfigSubscriber(log
, dts
, loop
, callback
=self
.handle_nsr
)
2520 self
._dts
_handlers
= [VnfdDtsHandler(dts
, log
, loop
, self
),
2523 self
._vnfr
_ref
_handler
,
2526 self
._vnfds
_to
_vnfr
= {}
def vnfr_handler(self):
    """Return the VNFR DTS handler held by this manager."""
    handler = self._vnfr_handler
    return handler
def vcs_handler(self):
    """Return the VCS component DTS handler held by this manager."""
    handler = self._vcs_handler
    return handler
2541 """ Register all static DTS handlers """
2542 for hdl
in self
._dts
_handlers
:
2543 yield from hdl
.register()
2547 """ Run this VNFM instance """
2548 self
._log
.debug("Run VNFManager - registering static DTS handlers""")
2549 yield from self.register()
def handle_nsr(self, nsr, action):
    """Keep ``self._nsrs`` in step with NSR config events.

    CREATE stores ``nsr`` under its id and DELETE drops the entry if it is
    present. Any other action is ignored.
    """
    if action in (rwdts.QueryAction.CREATE,):
        self._nsrs[nsr.id] = nsr
        return

    if action == rwdts.QueryAction.DELETE:
        # Deleting an id that was never recorded is a no-op.
        self._nsrs.pop(nsr.id, None)
def get_linked_mgmt_network(self, vnfr):
    """For the given VNFR get the related mgmt network from the NSD, if
    available.

    Arguments:
        vnfr - VNFR message; its ``nsr_id_ref`` selects the NSR config
               recorded in ``self._nsrs``

    Raises:
        VnfRecordError if no NSR config is recorded for ``nsr_id_ref``.

    Returns:
        The name of the first VLD in the NSD flagged ``mgmt_network``, or
        None when the NSD has no such VLD.
    """
    nsr_id = vnfr.nsr_id_ref

    # for the given related VNFR, get the corresponding NSR-config
    try:
        nsr_obj = self._nsrs[nsr_id]
    except KeyError as e:
        # Raising a plain string is itself a TypeError in Python 3, which
        # hid the real problem; raise a proper exception instead.
        raise VnfRecordError(
            "Unable to find the NS with the ID: {}".format(nsr_id)) from e

    # for the related NSD check if a VLD exists such that it's a mgmt
    # network
    for vld in nsr_obj.nsd.vld:
        if vld.mgmt_network:
            return vld.name

    return None
def get_vnfr(self, vnfr_id):
    """ get VNFR by vnfr id

    Raises:
        VnfRecordError if no VNFR with this id is tracked.
    """
    try:
        return self._vnfrs[vnfr_id]
    except KeyError:
        # Interpolate the id into the message; it used to be passed as a
        # second exception argument and never formatted.
        raise VnfRecordError("VNFR id %s not found" % vnfr_id) from None
def create_vnfr(self, vnfr):
    """Create and track a VirtualNetworkFunctionRecord for ``vnfr``.

    Raises VnfRecordError if a record with the same id is already tracked.
    Otherwise the new record is stored in ``self._vnfrs``, the instance
    count for its VNFD in ``self._vnfds_to_vnfr`` is incremented, and the
    record is returned.
    """
    if vnfr.id in self._vnfrs:
        msg = "Vnfr id %s already exists" % vnfr.id
        self._log.error(msg)
        raise VnfRecordError(msg)

    self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
                   vnfr.id,
                   vnfr.vnfd.id)

    mgmt_network = self.get_linked_mgmt_network(vnfr)

    record = VirtualNetworkFunctionRecord(
        self._dts, self._log, self._loop, self._cluster_name, self,
        self.vcs_handler, vnfr,
        mgmt_network=mgmt_network,
    )
    self._vnfrs[vnfr.id] = record

    # One more live instance of this VNFD
    vnfd_id = vnfr.vnfd.id
    self._vnfds_to_vnfr[vnfd_id] = self._vnfds_to_vnfr.get(vnfd_id, 0) + 1

    return record
def delete_vnfr(self, xact, vnfr):
    """Delete a tracked VNFR.

    Does nothing if ``vnfr.vnfr_id`` is not tracked. Otherwise the record is
    deleted through the VNFR DTS handler, a non-zero instance count for its
    VNFD is decremented, and the record is dropped from ``self._vnfrs``.
    """
    if vnfr.vnfr_id not in self._vnfrs:
        return

    self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
    yield from self._vnfr_handler.delete(xact, vnfr.xpath)

    # Missing and zero counts are both left untouched.
    vnfd_id = vnfr.vnfd.id
    if self._vnfds_to_vnfr.get(vnfd_id):
        self._vnfds_to_vnfr[vnfd_id] -= 1

    del self._vnfrs[vnfr.vnfr_id]
2628 def fetch_vnfd(self, vnfd_id):
2629 """ Fetch VNFDs based with the vnfd id"""
2630 vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
2631 self._log.debug("Fetch vnfd with path
%s", vnfd_path)
2634 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2636 for ent in res_iter:
2637 res = yield from ent
2641 err = "Failed to get Vnfd
%s" % vnfd_id
2642 self._log.error(err)
2643 raise VnfRecordError(err)
2645 self._log.debug("Fetched vnfd
for path
%s, vnfd
- %s", vnfd_path, vnfd)
def vnfd_in_use(self, vnfd_id):
    """Return True when the VNFD has a positive instance count.

    An id with no entry in ``self._vnfds_to_vnfr`` is reported as not in use.
    """
    self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
    return self._vnfds_to_vnfr.get(vnfd_id, 0) > 0
def publish_vnfr(self, xact, path, msg):
    """Publish a VNFR by delegating to the VNFR DTS handler's ``update``."""
    self._log.debug("publish_vnfr called with path %s, msg %s", path, msg)
    handler = self.vnfr_handler
    yield from handler.update(xact, path, msg)
def delete_vnfd(self, vnfd_id):
    """ Delete the Virtual Network Function descriptor with the passed id

    Raises:
        VirtualNetworkFunctionDescriptorRefCountExists if the VNFD still has
        a non-zero instance count in ``self._vnfds_to_vnfr``.

    Removing the uploaded files under
    $RIFT_ARTIFACTS/launchpad/libs/<vnfd_id> is best effort: a failure is
    logged and not propagated.
    """
    self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
    if vnfd_id in self._vnfds_to_vnfr:
        # The stored value is the plain integer instance count.
        ref_count = self._vnfds_to_vnfr[vnfd_id]
        if ref_count:
            self._log.debug("Cannot delete VNFD id %s reference exists %s",
                            vnfd_id,
                            ref_count)
            raise VirtualNetworkFunctionDescriptorRefCountExists(
                "Cannot delete :%s, ref_count:%s" % (vnfd_id, ref_count))

        del self._vnfds_to_vnfr[vnfd_id]

    # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
    try:
        rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
        vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
        if os.path.exists(vnfd_dir):
            shutil.rmtree(vnfd_dir, ignore_errors=True)
    except Exception as e:
        # The ref-count entry is already gone here, so report the id rather
        # than looking the entry up again.
        self._log.error("Exception in cleaning up VNFD {}: {}".
                        format(vnfd_id, e))
        self._log.exception(e)
def vnfd_refcount_xpath(self, vnfd_id):
    """Build the ref-count entry xpath keyed by ``vnfd_id``."""
    template = VnfdRefCountDtsHandler.XPATH + "[rw-vnfr:vnfd-id-ref = '{}']"
    return template.format(vnfd_id)
def get_vnfd_refcount(self, vnfd_id):
    """Return (xpath, VnfdRefCount message) pairs from this VNFM.

    A ``vnfd_id`` of None or "" selects every VNFD in
    ``self._vnfds_to_vnfr``. A known id selects that VNFD only. An unknown
    id yields an empty list.
    """
    if vnfd_id is None or vnfd_id == "":
        selected = list(self._vnfds_to_vnfr.keys())
    elif vnfd_id in self._vnfds_to_vnfr:
        selected = [vnfd_id]
    else:
        selected = []

    vnfd_list = []
    for ref_id in selected:
        vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
        vnfd_msg.vnfd_id_ref = ref_id
        vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[ref_id]
        vnfd_list.append((self.vnfd_refcount_xpath(ref_id), vnfd_msg))

    return vnfd_list
2715 class VnfmTasklet(rift.tasklets.Tasklet):
2716 """ VNF Manager tasklet class """
2717 def __init__(self, *args, **kwargs):
2718 super(VnfmTasklet, self).__init__(*args, **kwargs)
2719 self.rwlog.set_category("rw
-mano
-log
")
2720 self.rwlog.set_subcategory("vnfm
")
2727 super(VnfmTasklet, self).start()
2728 self.log.info("Starting VnfmTasklet
")
2730 self.log.setLevel(logging.DEBUG)
2732 self.log.debug("Registering with dts
")
2733 self._dts = rift.tasklets.DTS(self.tasklet_info,
2734 RwVnfmYang.get_schema(),
2736 self.on_dts_state_change)
2738 self.log.debug("Created DTS Api GI Object
: %s", self._dts)
2740 print("Caught
Exception in VNFM start
:", sys.exc_info()[0])
def on_instance_started(self):
    """Instance-started callback; only emits a debug log line."""
    logger = self.log
    logger.debug("Got instance started callback")
2751 print("Caught
Exception in VNFM stop
:", sys.exc_info()[0])
2756 """ Task init callback """
2758 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2759 assert vm_parent_name is not None
2760 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2761 yield from self._vnfm.run()
2763 print("Caught
Exception in VNFM init
:", sys.exc_info()[0])
2768 """ Task run callback """
def on_dts_state_change(self, state):
    """React to a DTS state change.

    Runs the application handler mapped to ``state`` (``self.init`` for
    INIT, ``self.run`` for RUN), then moves DTS to the follow-on state
    (INIT -> REGN_COMPLETE, CONFIG -> RUN). States absent from a table are
    skipped for that step.

    Arguments
        state - current dts state
    """
    dts_transitions = {
        rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
        rwdts.State.CONFIG: rwdts.State.RUN,
    }
    app_actions = {
        rwdts.State.INIT: self.init,
        rwdts.State.RUN: self.run,
    }

    # Application step first
    action = app_actions.get(state)
    if action is not None:
        yield from action()

    # Then advance DTS itself
    target = dts_transitions.get(state)
    if target is not None:
        self._dts.handle.set_state(target)