2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
29 gi
.require_version('RwDts', '1.0')
30 gi
.require_version('RwVnfrYang', '1.0')
31 gi
.require_version('RwVnfmYang', '1.0')
32 gi
.require_version('RwVlrYang', '1.0')
33 gi
.require_version('RwManifestYang', '1.0')
34 gi
.require_version('RwBaseYang', '1.0')
35 gi
.require_version('RwResourceMgrYang', '1.0')
37 from gi
.repository
import (
50 import rift
.package
.store
51 import rift
.package
.cloud_init
52 import rift
.package
.script
53 import rift
.mano
.dts
as mano_dts
54 import rift
.mano
.utils
.short_name
as mano_short_name
57 class VMResourceError(Exception):
58 """ VM resource Error"""
class VnfRecordError(Exception):
    """ VNF record instantiation failed"""
class VduRecordError(Exception):
    """ VDU record instantiation failed"""
class NotImplemented(Exception):
    """Not implemented

    NOTE(review): this class name shadows Python's builtin ``NotImplemented``
    singleton for the rest of this module. Renaming it would change the
    module's public names, so confirm usage before touching it.
    """
77 class VnfrRecordExistsError(Exception):
78 """VNFR record already exist with the same VNFR id"""
82 class InternalVirtualLinkRecordError(Exception):
83 """Internal virtual link record error"""
87 class VDUImageNotFound(Exception):
88 """VDU Image not found error"""
92 class VirtualDeploymentUnitRecordError(Exception):
93 """VDU Instantiation failed"""
97 class VMNotReadyError(Exception):
98 """ VM Not yet received from resource manager """
102 class VDURecordNotFound(Exception):
103 """ Could not find a VDU record """
107 class VirtualNetworkFunctionRecordDescNotFound(Exception):
108 """ Cannot find Virtual Network Function Record Descriptor """
112 class VirtualNetworkFunctionDescriptorError(Exception):
113 """ Virtual Network Function Record Descriptor Error """
117 class VirtualNetworkFunctionDescriptorNotFound(Exception):
118 """ Virtual Network Function Record Descriptor Not Found """
122 class VirtualNetworkFunctionRecordNotFound(Exception):
123 """ Virtual Network Function Record Not Found """
class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
    """ Virtual Network Function Descriptor reference count exists """
class VnfrInstantiationFailed(Exception):
    """ Virtual Network Function Instantiation failed"""
137 class VNFMPlacementGroupError(Exception):
140 class VirtualNetworkFunctionRecordState(enum
.Enum
):
147 VL_TERMINATE_PHASE
= 6
148 VDU_TERMINATE_PHASE
= 7
153 class VDURecordState(enum
.Enum
):
154 """VDU record state """
157 RESOURCE_ALLOC_PENDING
= 3
164 class VcsComponent(object):
165 """ VCS Component within the VNF descriptor """
166 def __init__(self
, dts
, log
, loop
, cluster_name
, vcs_handler
, component
, mangled_name
):
170 self
._component
= component
171 self
._cluster
_name
= cluster_name
172 self
._vcs
_handler
= vcs_handler
173 self
._mangled
_name
= mangled_name
def mangle_name(component_name, vnf_name, vnfd_id):
    """Build the mangled component name.

    The result is the three parts joined with ':' in the order
    VNF name, component name, VNFD id.
    """
    ordered_parts = (vnf_name, component_name, vnfd_id)
    return ":".join(ordered_parts)
182 """ name of this component"""
183 return self
._mangled
_name
187 """ The path for this object """
188 return("D,/rw-manifest:manifest" +
189 "/rw-manifest:operational-inventory" +
190 "/rw-manifest:component" +
191 "[rw-manifest:component-name = '{}']").format(self
.name
)
194 def instance_xpath(self
):
195 """ The path for this object """
196 return("D,/rw-base:vcs" +
199 "[instance-name = '{}']".format(self
._cluster
_name
))
def start_comp_xpath(self):
    """Return the xpath of the START-REQ child entry below this instance."""
    start_req_suffix = "/child-n[instance-name = 'START-REQ']"
    return self.instance_xpath + start_req_suffix
207 def get_start_comp_msg(self
, ip_address
):
208 """ start this component """
209 start_msg
= RwBaseYang
.VcsInstance_Instance_ChildN()
210 start_msg
.instance_name
= 'START-REQ'
211 start_msg
.component_name
= self
.name
212 start_msg
.admin_command
= "START"
213 start_msg
.ip_address
= ip_address
219 """ Returns the message for this vcs component"""
221 vcs_comp_dict
= self
._component
.as_dict()
223 def mangle_comp_names(comp_dict
):
224 """ mangle component name with VNF name, id"""
225 for key
, val
in comp_dict
.items():
226 if isinstance(val
, dict):
227 comp_dict
[key
] = mangle_comp_names(val
)
228 elif isinstance(val
, list):
231 if isinstance(ent
, dict):
232 val
[i
] = mangle_comp_names(ent
)
236 elif key
== "component_name":
237 comp_dict
[key
] = VcsComponent
.mangle_name(val
,
242 mangled_dict
= mangle_comp_names(vcs_comp_dict
)
243 msg
= RwManifestYang
.OpInventory_Component
.from_dict(mangled_dict
)
def publish(self, xact):
    """ Publishes the VCS component

    Logs the component name, path and message at debug level, then delegates
    to the VCS handler's ``publish`` with the caller's transaction.

    Arguments:
        xact - transaction passed straight through to the VCS handler
    """
    self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
                    self.name, self.path, self.msg)
    # Generator-style delegation: the caller must drive this with
    # ``yield from`` for the publish to actually run.
    yield from self._vcs_handler.publish(xact, self.path, self.msg)
254 def start(self
, xact
, parent
, ip_addr
=None):
255 """ Starts this VCS component """
256 # ATTN RV - replace with block add
257 start_msg
= self
.get_start_comp_msg(ip_addr
)
258 self
._log
.debug("starting component %s %s",
259 self
.start_comp_xpath
, start_msg
)
260 yield from self
._dts
.query_create(self
.start_comp_xpath
,
263 self
._log
.debug("started component %s, %s",
264 self
.start_comp_xpath
, start_msg
)
267 class VirtualDeploymentUnitRecord(object):
268 """ Virtual Deployment Unit Record """
281 placement_groups
=[]):
287 self
._nsr
_config
= nsr_config
288 self
._mgmt
_intf
= mgmt_intf
289 self
._cloud
_account
_name
= cloud_account_name
290 self
._vnfd
_package
_store
= vnfd_package_store
291 self
._mgmt
_network
= mgmt_network
293 self
._vdur
_id
= vdur_id
or str(uuid
.uuid4())
296 self
._state
= VDURecordState
.INIT
297 self
._state
_failed
_reason
= None
298 self
._request
_id
= str(uuid
.uuid4())
299 self
._name
= vnfr
.name
+ "__" + vdud
.id
300 self
._placement
_groups
= placement_groups
303 self
._vdud
_cloud
_init
= None
304 self
._vdur
_console
_handler
= VnfrConsoleOperdataDtsHandler(dts
, log
, loop
, self
._vnfr
._vnfm
, self
._vnfr
.vnfr_id
, self
._vdur
_id
,self
.vdu_id
)
def vdu_opdata_register(self):
    """ Register this VDUR's console operational-data handler

    Delegates to ``register()`` on the handler created in ``__init__``;
    must be driven with ``yield from`` by the caller.
    """
    yield from self._vdur_console_handler.register()
310 def cp_ip_addr(self
, cp_name
):
311 """ Find ip address by connection point name """
312 if self
._vm
_resp
is not None:
313 for conn_point
in self
._vm
_resp
.connection_points
:
314 if conn_point
.name
== cp_name
:
315 return conn_point
.ip_address
def cp_mac_addr(self, cp_name):
    """Look up the MAC address of the connection point named cp_name.

    Falls back to the all-zero MAC when no VM response has been stored yet
    or when none of its connection points carries that name.
    """
    unknown_mac = "00:00:00:00:00:00"
    if self._vm_resp is None:
        return unknown_mac

    matching_macs = (point.mac_addr
                     for point in self._vm_resp.connection_points
                     if point.name == cp_name)
    return next(matching_macs, unknown_mac)
326 def cp_id(self
, cp_name
):
327 """ Find connection point id by connection point name """
328 if self
._vm
_resp
is not None:
329 for conn_point
in self
._vm
_resp
.connection_points
:
330 if conn_point
.name
== cp_name
:
331 return conn_point
.connection_point_id
344 """ Return this VDUR's name """
# Truncated name conforming to RFC 1123
349 def unique_short_name(self
):
350 """ Return this VDUR's unique short name """
351 # Impose these restrictions on Unique name
353 # - Max 10 of NSR name (remove all specialcharacters, only numbers and alphabets)
354 # - 6 chars of shortened name
355 # - Max 10 of VDU name (remove all specialcharacters, only numbers and alphabets)
357 def _restrict_tag(input_str
):
358 # Exclude all characters except a-zA-Z0-9
359 outstr
= re
.sub('[^a-zA-Z0-9]', '', input_str
)
360 # Take max of 10 chars
363 # Use NSR name for part1
364 part1
= _restrict_tag(self
._nsr
_config
.name
)
365 # Get unique short string (6 chars)
366 part2
= mano_short_name
.StringShortner(self
._name
)
367 # Use VDU ID for part3
368 part3
= _restrict_tag(self
._vdud
.id)
369 shortstr
= part1
+ "-" + part2
.short_string
+ "-" + part3
def cloud_account_name(self):
    """ Cloud account this VDU should be created in """
    # Plain accessor: returns the value stored on the instance at
    # construction time, without any lookup or validation.
    return self._cloud_account_name
def image_name(self):
    """Return the image name used to look the image up on the CMP.

    Only the final path component of the VDUD's ``image`` value is returned;
    None is returned when the VDUD carries no ``image`` entry.
    """
    if 'image' in self._vdud:
        return os.path.basename(self._vdud.image)
    return None
def image_checksum(self):
    """Return the VDUD's image checksum, or None when that field is not set."""
    if self._vdud.has_field("image_checksum"):
        return self._vdud.image_checksum
    return None
390 def management_ip(self
):
393 return self
._vm
_resp
.public_ip
if self
._vm
_resp
.has_field('public_ip') else self
._vm
_resp
.management_ip
396 def vm_management_ip(self
):
399 return self
._vm
_resp
.management_ip
402 def operational_status(self
):
403 """ Operational status of this VDU"""
404 op_stats_dict
= {"INIT": "init",
405 "INSTANTIATING": "vm_init_phase",
406 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
409 "TERMINATING": "terminated",
410 "TERMINATED": "terminated",
412 return op_stats_dict
[self
._state
.name
]
416 """ Process VDU message from resmgr"""
417 vdu_fields
= ["vm_flavor",
424 vdu_copy_dict
= {k
: v
for k
, v
in
425 self
._vdud
.as_dict().items() if k
in vdu_fields
}
426 vdur_dict
= {"id": self
._vdur
_id
,
427 "vdu_id_ref": self
._vdud
.id,
428 "operational_status": self
.operational_status
,
429 "operational_status_details": self
._state
_failed
_reason
,
431 "unique_short_name": self
.unique_short_name
434 if self
.vm_resp
is not None:
435 vdur_dict
.update({"vim_id": self
.vm_resp
.vdu_id
,
436 "flavor_id": self
.vm_resp
.flavor_id
438 if self
._vm
_resp
.has_field('image_id'):
439 vdur_dict
.update({ "image_id": self
.vm_resp
.image_id
})
441 if self
.management_ip
is not None:
442 vdur_dict
["management_ip"] = self
.management_ip
444 if self
.vm_management_ip
is not None:
445 vdur_dict
["vm_management_ip"] = self
.vm_management_ip
447 vdur_dict
.update(vdu_copy_dict
)
449 if self
.vm_resp
is not None:
450 if self
._vm
_resp
.has_field('volumes'):
451 for opvolume
in self
._vm
_resp
.volumes
:
452 vdurvol_data
= [vduvol
for vduvol
in vdur_dict
['volumes'] if vduvol
['name'] == opvolume
.name
]
453 if len(vdurvol_data
) == 1:
454 vdurvol_data
[0]["volume_id"] = opvolume
.volume_id
455 if opvolume
.has_field('custom_meta_data'):
456 metadata_list
= list()
457 for metadata_item
in opvolume
.custom_meta_data
:
458 metadata_list
.append(metadata_item
.as_dict())
459 vdurvol_data
[0]['custom_meta_data'] = metadata_list
461 if self
._vm
_resp
.has_field('supplemental_boot_data'):
462 vdur_dict
['supplemental_boot_data'] = dict()
463 if self
._vm
_resp
.supplemental_boot_data
.has_field('boot_data_drive'):
464 vdur_dict
['supplemental_boot_data']['boot_data_drive'] = self
._vm
_resp
.supplemental_boot_data
.boot_data_drive
465 if self
._vm
_resp
.supplemental_boot_data
.has_field('custom_meta_data'):
466 metadata_list
= list()
467 for metadata_item
in self
._vm
_resp
.supplemental_boot_data
.custom_meta_data
:
468 metadata_list
.append(metadata_item
.as_dict())
469 vdur_dict
['supplemental_boot_data']['custom_meta_data'] = metadata_list
470 if self
._vm
_resp
.supplemental_boot_data
.has_field('config_file'):
472 for file_item
in self
._vm
_resp
.supplemental_boot_data
.config_file
:
473 file_list
.append(file_item
.as_dict())
474 vdur_dict
['supplemental_boot_data']['config_file'] = file_list
479 for intf
, cp_id
, vlr
in self
._int
_intf
:
480 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
482 icp_list
.append({"name": cp
.name
,
484 "type_yang": "VPORT",
485 "ip_address": self
.cp_ip_addr(cp
.id),
486 "mac_address": self
.cp_mac_addr(cp
.id)})
488 ii_list
.append({"name": intf
.name
,
489 "vdur_internal_connection_point_ref": cp
.id,
490 "virtual_interface": {}})
492 vdur_dict
["internal_connection_point"] = icp_list
493 self
._log
.debug("internal_connection_point:%s", vdur_dict
["internal_connection_point"])
494 vdur_dict
["internal_interface"] = ii_list
497 for intf
, cp
, vlr
in self
._ext
_intf
:
498 ei_list
.append({"name": cp
.name
,
499 "vnfd_connection_point_ref": cp
.name
,
500 "virtual_interface": {}})
501 self
._vnfr
.update_cp(cp
.name
,
502 self
.cp_ip_addr(cp
.name
),
503 self
.cp_mac_addr(cp
.name
),
506 vdur_dict
["external_interface"] = ei_list
508 placement_groups
= []
509 for group
in self
._placement
_groups
:
510 placement_groups
.append(group
.as_dict())
511 vdur_dict
['placement_groups_info'] = placement_groups
513 return RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur
.from_dict(vdur_dict
)
516 def resmgr_path(self
):
517 """ path for resource-mgr"""
518 return ("D,/rw-resource-mgr:resource-mgmt" +
520 "/vdu-event-data[event-id='{}']".format(self
._request
_id
))
523 def vm_flavor_msg(self
):
524 """ VM flavor message """
525 flavor
= self
._vdud
.vm_flavor
.__class
__()
526 flavor
.copy_from(self
._vdud
.vm_flavor
)
def vdud_cloud_init(self):
    """Return the cloud-init contents for the VDU, resolving them on first use.

    The value produced by ``cloud_init()`` is stored on the instance; while
    the stored value is still None the lookup is attempted again.
    """
    contents = self._vdud_cloud_init
    if contents is None:
        contents = self.cloud_init()
        self._vdud_cloud_init = contents
    return contents
def cloud_init(self):
    """ Populate cloud_init with cloud-config script from
        either the inline contents or from the file provided

    Returns:
        The inline ``cloud_init`` value of the VDUD when it is set; otherwise
        the script read from the VNFD package for ``cloud_init_file``; None
        when the VDUD provides neither.

    Raises:
        VirtualDeploymentUnitRecordError - the script named by
        ``cloud_init_file`` could not be extracted from the stored package.
    """
    if self._vdud.cloud_init is not None:
        self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
        return self._vdud.cloud_init
    elif self._vdud.cloud_init_file is not None:
        # Get cloud-init script contents from the file provided in the cloud_init_file param
        self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
        filename = self._vdud.cloud_init_file
        # Refresh first so a package stored after start-up is visible.
        self._vnfd_package_store.refresh()
        stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
        cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
        try:
            return cloud_init_extractor.read_script(stored_package, filename)
        except rift.package.cloud_init.CloudInitExtractionError as e:
            # NOTE(review): instantiation_failed() contains ``yield from``, so
            # this bare call only creates a generator and does not run it.
            # Kept as-is because this method is not itself a generator;
            # confirm the intended failure handling with the caller.
            self.instantiation_failed(str(e))
            # Chain explicitly so the extraction error stays attached as the cause.
            raise VirtualDeploymentUnitRecordError(e) from e
    else:
        self._log.debug("VDU Instantiation: cloud-init script not provided")
        return None
def process_openstack_placement_group_construct(self, vm_create_msg_dict):
    """Translate OpenStack placement groups into resource-mgr request fields.

    Collects host aggregates, availability zones and server groups from every
    placement group of this VDU and writes them into vm_create_msg_dict
    under 'host_aggregate', 'availability_zone' and 'server_group'.
    A key is only written when at least one matching entry was found.

    Arguments:
        vm_create_msg_dict - request dictionary, updated in place

    Raises:
        VNFMPlacementGroupError - more than one availability zone, or more
        than one server group, was requested for this VDU.
    """
    host_aggregates = []
    availability_zones = []
    server_groups = []

    for group in self._placement_groups:
        if group.has_field('host_aggregate'):
            host_aggregates.extend(aggregate.as_dict()
                                   for aggregate in group.host_aggregate)
        if group.has_field('availability_zone'):
            availability_zones.append(group.availability_zone.as_dict())
        if group.has_field('server_group'):
            server_groups.append(group.server_group.as_dict())

    if availability_zones:
        if len(availability_zones) > 1:
            self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
            # The message needs two placeholders; with only one the list of
            # requested zones never reached the exception text.
            raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requested Zones: {}".format(self.name, availability_zones))
        vm_create_msg_dict['availability_zone'] = availability_zones[0]

    if server_groups:
        if len(server_groups) > 1:
            self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
            raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requested Groups: {}".format(self.name, server_groups))
        vm_create_msg_dict['server_group'] = server_groups[0]

    if host_aggregates:
        vm_create_msg_dict['host_aggregate'] = host_aggregates

    return
def process_placement_groups(self, vm_create_msg_dict):
    """Fill the resource-mgr request from this VDU's placement groups.

    Does nothing when the VDU has no placement groups. All groups must share
    one cloud type; only 'openstack' groups are translated into the request,
    any other cloud type is logged and ignored.
    """
    if not self._placement_groups:
        return

    cloud_types = {placement.cloud_type for placement in self._placement_groups}
    assert len(cloud_types) == 1
    cloud_type = cloud_types.pop()

    if cloud_type != 'openstack':
        self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
        return

    self.process_openstack_placement_group_construct(vm_create_msg_dict)
def process_custom_bootdata(self, vm_create_msg_dict):
    """Process the custom boot data

    For every entry of vm_create_msg_dict['supplemental_boot_data']['config_file']
    that has both 'source' and 'dest', replaces the 'source' value (a file
    name inside the VNFD package) with that file's contents. Entries missing
    either key are left untouched. Nothing happens when there is no
    'config_file' list.

    Arguments:
        vm_create_msg_dict - request dictionary, updated in place; must
                             already contain 'supplemental_boot_data'

    Raises:
        VirtualDeploymentUnitRecordError - a source file could not be read
        from the stored VNFD package.
    """
    if 'config_file' not in vm_create_msg_dict['supplemental_boot_data']:
        return

    # Refresh first so a package stored after start-up is visible.
    self._vnfd_package_store.refresh()
    stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
    cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
    for file_item in vm_create_msg_dict['supplemental_boot_data']['config_file']:
        if 'source' not in file_item or 'dest' not in file_item:
            continue
        source = file_item['source']
        # Find source file in scripts dir of VNFD
        self._log.debug("Checking for source config file at %s", source)
        try:
            source_file_str = cloud_init_extractor.read_script(stored_package, source)
        except rift.package.cloud_init.CloudInitExtractionError as e:
            # Chain explicitly so the extraction error stays attached as the cause.
            raise VirtualDeploymentUnitRecordError(e) from e
        # Update source file location with file contents
        file_item['source'] = source_file_str

    return
631 def resmgr_msg(self
, config
=None):
632 vdu_fields
= ["vm_flavor",
638 "supplemental_boot_data"]
640 self
._log
.debug("Creating params based on VDUD: %s", self
._vdud
)
641 vdu_copy_dict
= {k
: v
for k
, v
in self
._vdud
.as_dict().items() if k
in vdu_fields
}
643 vm_create_msg_dict
= {
644 "name": self
.unique_short_name
, # Truncated name confirming to RFC 1123
645 "node_id": self
.name
, # Rift assigned Id
648 if self
.image_name
is not None:
649 vm_create_msg_dict
["image_name"] = self
.image_name
651 if self
.image_checksum
is not None:
652 vm_create_msg_dict
["image_checksum"] = self
.image_checksum
654 vm_create_msg_dict
["allocate_public_address"] = self
._mgmt
_intf
655 if self
._vdud
.has_field('mgmt_vpci'):
656 vm_create_msg_dict
["mgmt_vpci"] = self
._vdud
.mgmt_vpci
658 self
._log
.debug("VDUD: %s", self
._vdud
)
659 if config
is not None:
660 vm_create_msg_dict
['vdu_init'] = {'userdata': config
}
662 if self
._mgmt
_network
:
663 vm_create_msg_dict
['mgmt_network'] = self
._mgmt
_network
666 for intf
, cp
, vlr
in self
._ext
_intf
:
667 cp_info
= { "name": cp
.name
,
668 "virtual_link_id": vlr
.network_id
,
669 "type_yang": intf
.virtual_interface
.type_yang
}
671 if cp
.has_field('port_security_enabled'):
672 cp_info
["port_security_enabled"] = cp
.port_security_enabled
674 if (intf
.virtual_interface
.has_field('vpci') and
675 intf
.virtual_interface
.vpci
is not None):
676 cp_info
["vpci"] = intf
.virtual_interface
.vpci
678 if (vlr
.has_field('ip_profile_params')) and (vlr
.ip_profile_params
.has_field('security_group')):
679 cp_info
['security_group'] = vlr
.ip_profile_params
.security_group
681 cp_list
.append(cp_info
)
683 for intf
, cp
, vlr
in self
._int
_intf
:
684 if (intf
.virtual_interface
.has_field('vpci') and
685 intf
.virtual_interface
.vpci
is not None):
686 cp_list
.append({"name": cp
,
687 "virtual_link_id": vlr
.network_id
,
688 "type_yang": intf
.virtual_interface
.type_yang
,
689 "vpci": intf
.virtual_interface
.vpci
})
691 if cp
.has_field('port_security_enabled'):
692 cp_list
.append({"name": cp
,
693 "virtual_link_id": vlr
.network_id
,
694 "type_yang": intf
.virtual_interface
.type_yang
,
695 "port_security_enabled": cp
.port_security_enabled
})
697 cp_list
.append({"name": cp
,
698 "virtual_link_id": vlr
.network_id
,
699 "type_yang": intf
.virtual_interface
.type_yang
})
702 vm_create_msg_dict
["connection_points"] = cp_list
703 vm_create_msg_dict
.update(vdu_copy_dict
)
705 self
.process_placement_groups(vm_create_msg_dict
)
706 if 'supplemental_boot_data' in vm_create_msg_dict
:
707 self
.process_custom_bootdata(vm_create_msg_dict
)
709 msg
= RwResourceMgrYang
.VDUEventData()
710 msg
.event_id
= self
._request
_id
711 msg
.cloud_account
= self
.cloud_account_name
712 msg
.request_info
.from_dict(vm_create_msg_dict
)
def terminate(self, xact):
    """ Delete resource in VIM

    Only acts when the VDU is in READY or FAILED state; in any other state
    the request is logged and ignored. Otherwise the VDU moves to
    TERMINATING, the VM resource is deleted (when a VM response was ever
    received), the registration handles are released, and the VDU ends in
    TERMINATED.

    Arguments:
        xact - not used by this method; the delete runs in a new
               transaction obtained from self._dts
    """
    if self._state not in (VDURecordState.READY, VDURecordState.FAILED):
        self._log.warning("VDU terminate in not ready state - Ignoring request")
        return

    self._state = VDURecordState.TERMINATING
    if self._vm_resp is not None:
        try:
            with self._dts.transaction() as new_xact:
                yield from self.delete_resource(new_xact)
        except Exception:
            # Best effort: a failed delete must not stop the local cleanup.
            self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)

    if self._rm_regh is not None:
        self._log.debug("Deregistering resource manager registration handle")
        self._rm_regh.deregister()
        self._rm_regh = None

    if self._vdur_console_handler is not None:
        # The console handler may never have been registered, in which case
        # there is no handle to release.
        console_regh = getattr(self._vdur_console_handler, "_regh", None)
        if console_regh is not None:
            # Routine cleanup, so logged at debug level rather than error.
            self._log.debug("Deregistering vnfr vdur registration handle")
            console_regh.deregister()
            self._vdur_console_handler._regh = None

    self._state = VDURecordState.TERMINATED
743 def find_internal_cp_by_cp_id(self
, cp_id
):
744 """ Find the CP corresponding to the connection point id"""
747 self
._log
.debug("find_internal_cp_by_cp_id(%s) called",
750 for int_cp
in self
._vdud
.internal_connection_point
:
751 self
._log
.debug("Checking for int cp %s in internal connection points",
753 if int_cp
.id == cp_id
:
758 self
._log
.debug("Failed to find cp %s in internal connection points",
760 msg
= "Failed to find cp %s in internal connection points" % cp_id
761 raise VduRecordError(msg
)
763 # return the VLR associated with the connection point
767 def create_resource(self
, xact
, vnfr
, config
=None):
768 """ Request resource from ResourceMgr """
769 def find_cp_by_name(cp_name
):
770 """ Find a connection point by name """
772 self
._log
.debug("find_cp_by_name(%s) called", cp_name
)
773 for ext_cp
in vnfr
._cprs
:
774 self
._log
.debug("Checking ext cp (%s) called", ext_cp
.name
)
775 if ext_cp
.name
== cp_name
:
779 self
._log
.debug("Failed to find cp %s in external connection points",
783 def find_internal_vlr_by_cp_name(cp_name
):
784 """ Find the VLR corresponding to the connection point name"""
787 self
._log
.debug("find_internal_vlr_by_cp_name(%s) called",
790 for int_cp
in self
._vdud
.internal_connection_point
:
791 self
._log
.debug("Checking for int cp %s in internal connection points",
793 if int_cp
.id == cp_name
:
798 self
._log
.debug("Failed to find cp %s in internal connection points",
800 msg
= "Failed to find cp %s in internal connection points" % cp_name
801 raise VduRecordError(msg
)
803 # return the VLR associated with the connection point
804 return vnfr
.find_vlr_by_cp(cp_name
)
806 block
= xact
.block_create()
808 self
._log
.debug("Executing vm request id: %s, action: create",
811 # Resolve the networks associated external interfaces
812 for ext_intf
in self
._vdud
.external_interface
:
813 self
._log
.debug("Resolving external interface name [%s], cp[%s]",
814 ext_intf
.name
, ext_intf
.vnfd_connection_point_ref
)
815 cp
= find_cp_by_name(ext_intf
.vnfd_connection_point_ref
)
817 self
._log
.debug("Failed to find connection point - %s",
818 ext_intf
.vnfd_connection_point_ref
)
820 self
._log
.debug("Connection point name [%s], type[%s]",
821 cp
.name
, cp
.type_yang
)
823 vlr
= vnfr
.ext_vlr_by_id(cp
.vlr_ref
)
825 etuple
= (ext_intf
, cp
, vlr
)
826 self
._ext
_intf
.append(etuple
)
828 self
._log
.debug("Created external interface tuple : %s", etuple
)
830 # Resolve the networks associated internal interfaces
831 for intf
in self
._vdud
.internal_interface
:
832 cp_id
= intf
.vdu_internal_connection_point_ref
833 self
._log
.debug("Resolving internal interface name [%s], cp[%s]",
837 vlr
= find_internal_vlr_by_cp_name(cp_id
)
838 except Exception as e
:
839 self
._log
.debug("Failed to find cp %s in internal VLR list", cp_id
)
840 msg
= "Failed to find cp %s in internal VLR list, e = %s" % (cp_id
, e
)
841 raise VduRecordError(msg
)
843 ituple
= (intf
, cp_id
, vlr
)
844 self
._int
_intf
.append(ituple
)
846 self
._log
.debug("Created internal interface tuple : %s", ituple
)
848 resmgr_path
= self
.resmgr_path
849 resmgr_msg
= self
.resmgr_msg(config
)
851 self
._log
.debug("Creating new VM request at: %s, params: %s", resmgr_path
, resmgr_msg
)
852 block
.add_query_create(resmgr_path
, resmgr_msg
)
854 res_iter
= yield from block
.execute(now
=True)
862 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
863 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
864 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
865 return resp
.resource_info
868 def delete_resource(self
, xact
):
869 block
= xact
.block_create()
871 self
._log
.debug("Executing vm request id: %s, action: delete",
874 block
.add_query_delete(self
.resmgr_path
)
876 yield from block
.execute(flags
=0, now
=True)
def read_resource(self, xact):
    """Read this VDU's resource record back from the resource manager.

    Issues a read query on self.resmgr_path in a new block of xact and
    returns the ``resource_info`` of the last result received.

    Arguments:
        xact - transaction used to create the query block

    Returns:
        The ``resource_info`` part of the response.

    Raises:
        VMResourceError - no response was received, or the response has no
        ``resource_info.resource_state``.
    """
    block = xact.block_create()

    self._log.debug("Executing vm request id: %s, action: read",
                    self._request_id)

    block.add_query_read(self.resmgr_path)

    res_iter = yield from block.execute(flags=0, now=True)

    # Start from None so an empty result set reaches the explicit
    # VMResourceError below instead of an unbound-variable failure.
    resp = None
    for ent in res_iter:
        res = yield from ent
        resp = res.result

    if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
        # Exceptions do not interpolate logging-style arguments; format here.
        raise VMResourceError("Did not get a vm resource response (resp: %s)" % (resp,))
    self._log.debug("Got vm request response: %s", resp.resource_info)
    return resp.resource_info
900 def start_component(self
):
901 """ This VDUR is active """
902 self
._log
.debug("Starting component %s for vdud %s vdur %s",
903 self
._vdud
.vcs_component_ref
,
906 yield from self
._vnfr
.start_component(self
._vdud
.vcs_component_ref
,
907 self
.vm_resp
.management_ip
)
911 """ Is this VDU active """
912 return True if self
._state
is VDURecordState
.READY
else False
def instantiation_failed(self, failed_reason=None):
    """ VDU instantiation failed

    Marks this VDU as FAILED, records the reason, and forwards the failure
    to the owning VNFR.

    Arguments:
        failed_reason - optional description stored as the failure reason
                        and passed on to the VNFR
    """
    self._log.debug("VDU %s instantiation failed ", self._vdur_id)
    self._state = VDURecordState.FAILED
    self._state_failed_reason = failed_reason
    # Because of this ``yield from`` the method is a generator: nothing
    # above runs until a caller drives it with ``yield from``.
    yield from self._vnfr.instantiation_failed(failed_reason)
923 def vdu_is_active(self
):
924 """ This VDU is active"""
926 self
._log
.warning("VDU %s was already marked as active", self
._vdur
_id
)
929 self
._log
.debug("VDUR id %s in VNFR %s is active", self
._vdur
_id
, self
._vnfr
.vnfr_id
)
931 if self
._vdud
.vcs_component_ref
is not None:
932 yield from self
.start_component()
934 self
._state
= VDURecordState
.READY
936 if self
._vnfr
.all_vdus_active():
937 self
._log
.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self
._vnfr
)
938 yield from self
._vnfr
.is_ready()
941 def instantiate(self
, xact
, vnfr
, config
=None):
942 """ Instantiate this VDU """
943 self
._state
= VDURecordState
.INSTANTIATING
946 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
947 """ This VDUR is active """
948 self
._log
.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
953 if (query_action
== rwdts
.QueryAction
.UPDATE
or
954 query_action
== rwdts
.QueryAction
.CREATE
):
957 if msg
.resource_state
== "active":
958 # Move this VDU to ready state
959 yield from self
.vdu_is_active()
960 elif msg
.resource_state
== "failed":
961 yield from self
.instantiation_failed(msg
.resource_errors
)
962 elif query_action
== rwdts
.QueryAction
.DELETE
:
963 self
._log
.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
965 raise NotImplementedError(
966 "%s action on VirtualDeployementUnitRecord not supported",
969 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
972 reg_event
= asyncio
.Event(loop
=self
._loop
)
975 def on_ready(regh
, status
):
978 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
, on_ready
=on_ready
)
979 self
._rm
_regh
= yield from self
._dts
.register(self
.resmgr_path
+ '/resource-info',
980 flags
=rwdts
.Flag
.SUBSCRIBER
,
982 yield from reg_event
.wait()
984 vm_resp
= yield from self
.create_resource(xact
, vnfr
, config
)
985 self
._vm
_resp
= vm_resp
986 self
._state
= VDURecordState
.RESOURCE_ALLOC_PENDING
988 self
._log
.debug("Requested VM from resource manager response %s",
990 if vm_resp
.resource_state
== "active":
991 self
._log
.debug("Resourcemgr responded wih an active vm resp %s",
993 yield from self
.vdu_is_active()
994 self
._state
= VDURecordState
.READY
995 elif (vm_resp
.resource_state
== "pending" or
996 vm_resp
.resource_state
== "inactive"):
997 self
._log
.debug("Resourcemgr responded wih a pending vm resp %s",
999 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
1000 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
1001 # flags=rwdts.Flag.SUBSCRIBER,
1004 self
._log
.debug("Resourcemgr responded wih an error vm resp %s",
1006 raise VirtualDeploymentUnitRecordError(
1007 "Failed VDUR instantiation %s " % vm_resp
)
1009 except Exception as e
:
1011 traceback
.print_exc()
1012 self
._log
.exception(e
)
1013 self
._log
.error("Instantiation of VDU record failed: %s", str(e
))
1014 self
._state
= VDURecordState
.FAILED
1015 yield from self
.instantiation_failed(str(e
))
1018 class VlRecordState(enum
.Enum
):
1019 """ VL Record State """
1021 INSTANTIATION_PENDING
= 102
1023 TERMINATE_PENDING
= 104
1028 class InternalVirtualLinkRecord(object):
1029 """ Internal Virtual Link record """
1030 def __init__(self
, dts
, log
, loop
, ivld_msg
, vnfr_name
, cloud_account_name
, ip_profile
=None):
1034 self
._ivld
_msg
= ivld_msg
1035 self
._vnfr
_name
= vnfr_name
1036 self
._cloud
_account
_name
= cloud_account_name
1037 self
._ip
_profile
= ip_profile
1039 self
._vlr
_req
= self
.create_vlr()
1041 self
._state
= VlRecordState
.INIT
1045 """ Find VLR by id """
1046 return self
._vlr
_req
.id
1050 """ Name of this VL """
1051 if self
._ivld
_msg
.vim_network_name
:
1052 return self
._ivld
_msg
.vim_network_name
1054 return self
._vnfr
_name
+ "." + self
._ivld
_msg
.name
def network_id(self):
    """Return the network id of the stored VLR, or None when no VLR is set."""
    vlr = self._vlr
    if vlr:
        return vlr.network_id
    return None
1062 """ VLR path for this VLR instance"""
1063 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self
.vlr_id
)
1065 def create_vlr(self
):
1066 """ Create the VLR record which will be instantiated """
1068 vld_fields
= ["short_name",
1076 vld_copy_dict
= {k
: v
for k
, v
in self
._ivld
_msg
.as_dict().items() if k
in vld_fields
}
1078 vlr_dict
= {"id": str(uuid
.uuid4()),
1080 "cloud_account": self
._cloud
_account
_name
,
1083 if self
._ip
_profile
and self
._ip
_profile
.has_field('ip_profile_params'):
1084 vlr_dict
['ip_profile_params' ] = self
._ip
_profile
.ip_profile_params
.as_dict()
1086 vlr_dict
.update(vld_copy_dict
)
1088 vlr
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict(vlr_dict
)
1092 def instantiate(self
, xact
, restart_mode
=False):
1093 """ Instantiate VL """
1096 def instantiate_vlr():
1097 """ Instantiate VLR"""
1098 self
._log
.debug("Create VL with xpath %s and vlr %s",
1099 self
.vlr_path(), self
._vlr
_req
)
1101 with self
._dts
.transaction(flags
=0) as xact
:
1102 block
= xact
.block_create()
1103 block
.add_query_create(xpath
=self
.vlr_path(), msg
=self
._vlr
_req
)
1104 self
._log
.debug("Executing VL create path:%s msg:%s",
1105 self
.vlr_path(), self
._vlr
_req
)
1109 res_iter
= yield from block
.execute()
1111 self
._state
= VlRecordState
.FAILED
1112 self
._log
.exception("Caught exception while instantial VL")
1115 for ent
in res_iter
:
1116 res
= yield from ent
1117 self
._vlr
= res
.result
1119 if self
._vlr
.operational_status
== 'failed':
1120 self
._log
.debug("VL creation failed for vlr id %s", self
._vlr
.id)
1121 self
._state
= VlRecordState
.FAILED
1122 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self
._vlr
.id))
1124 self
._log
.info("Created VL with xpath %s and vlr %s",
1125 self
.vlr_path(), self
._vlr
)
1129 """ Get the network id """
1130 res_iter
= yield from self
._dts
.query_read(self
.vlr_path(), rwdts
.XactFlag
.MERGE
)
1132 for ent
in res_iter
:
1133 res
= yield from ent
1137 err
= "Failed to get VLR for path %s" % self
.vlr_path()
1139 raise InternalVirtualLinkRecordError(err
)
1142 self
._state
= VlRecordState
.INSTANTIATION_PENDING
1145 vl
= yield from get_vlr()
1147 yield from instantiate_vlr()
1149 yield from instantiate_vlr()
1151 self
._state
= VlRecordState
.ACTIVE
def vlr_in_vns(self):
    """ Tell whether this record is in a state that has a VLR in VNS

    Returns True when the current state is ACTIVE, INSTANTIATION_PENDING
    or FAILED, and False for every other state.
    """
    states_with_vlr = (
        VlRecordState.ACTIVE,
        VlRecordState.INSTANTIATION_PENDING,
        VlRecordState.FAILED,
    )
    return self._state in states_with_vlr
def terminate(self, xact):
    """ Terminate this VL by deleting its VLR at vlr_path()

    Arguments:
        xact - transaction used to create the delete query block

    The request is ignored (only logged) when vlr_in_vns() reports that
    there is no VLR to delete. Otherwise the state moves to
    TERMINATE_PENDING, the delete is executed, and the state ends up as
    TERMINATED.
    """
    if self.vlr_in_vns():
        self._log.debug("Terminating VL with path %s", self.vlr_path())
        self._state = VlRecordState.TERMINATE_PENDING

        delete_block = xact.block_create()
        delete_block.add_query_delete(self.vlr_path())
        yield from delete_block.execute(flags=0, now=True)

        self._state = VlRecordState.TERMINATED
        self._log.debug("Terminated VL with path %s", self.vlr_path())
    else:
        self._log.debug("Ignoring terminate request for id %s in state %s",
                        self.vlr_id, self._state)
1179 class VirtualNetworkFunctionRecord(object):
1180 """ Virtual Network Function Record """
1181 def __init__(self
, dts
, log
, loop
, cluster_name
, vnfm
, vcs_handler
, vnfr_msg
, mgmt_network
=None):
1185 self
._cluster
_name
= cluster_name
1186 self
._vnfr
_msg
= vnfr_msg
1187 self
._vnfr
_id
= vnfr_msg
.id
1188 self
._vnfd
_id
= vnfr_msg
.vnfd
.id
1190 self
._vcs
_handler
= vcs_handler
1191 self
._vnfr
= vnfr_msg
1192 self
._mgmt
_network
= mgmt_network
1194 self
._vnfd
= vnfr_msg
.vnfd
1195 self
._state
= VirtualNetworkFunctionRecordState
.INIT
1196 self
._state
_failed
_reason
= None
1197 self
._ext
_vlrs
= {} # The list of external virtual links
1198 self
._vlrs
= [] # The list of internal virtual links
1199 self
._vdus
= [] # The list of vdu
1200 self
._vlr
_by
_cp
= {}
1202 self
._inventory
= {}
1203 self
._create
_time
= int(time
.time())
1204 self
._vnf
_mon
= None
1205 self
._config
_status
= vnfr_msg
.config_status
1206 self
._vnfd
_package
_store
= rift
.package
.store
.VnfdPackageFilesystemStore(self
._log
)
1207 self
._rw
_vnfd
= None
1208 self
._vnfd
_ref
_count
= 0
1210 def _get_vdur_from_vdu_id(self
, vdu_id
):
1211 self
._log
.debug("Finding vdur for vdu_id %s", vdu_id
)
1212 self
._log
.debug("Searching through vdus: %s", self
._vdus
)
1213 for vdu
in self
._vdus
:
1214 self
._log
.debug("vdu_id: %s", vdu
.vdu_id
)
1215 if vdu
.vdu_id
== vdu_id
:
1218 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id
)
1221 def operational_status(self
):
1222 """ Operational status of this VNFR """
1223 op_status_map
= {"INIT": "init",
1224 "VL_INIT_PHASE": "vl_init_phase",
1225 "VM_INIT_PHASE": "vm_init_phase",
1227 "TERMINATE": "terminate",
1228 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1229 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1230 "TERMINATED": "terminated",
1231 "FAILED": "failed", }
1232 return op_status_map
[self
._state
.name
]
def vnfd_xpath(vnfd_id):
    """ Build the config xpath of the VNFD catalog entry with the given id

    Arguments:
        vnfd_id - value substituted into the vnfd:id key predicate
    """
    template = "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']"
    return template.format(vnfd_id)
def vnfd_ref_count(self):
    """ Current value of the VNFD reference counter kept by this VNFR """
    count = self._vnfd_ref_count
    return count
def vnfd_in_use(self):
    """ Tell whether the VNFD reference counter is above zero """
    return self._vnfd_ref_count > 0
1249 """ Take a reference on this object """
1250 self
._vnfd
_ref
_count
+= 1
1251 return self
._vnfd
_ref
_count
def vnfd_unref(self):
    """ Drop one reference from the VNFD reference counter

    Returns:
        The counter value after the decrement

    Raises:
        VnfRecordError - when the counter is already below 1; the problem
                         is logged at critical level first
    """
    current = self._vnfd_ref_count
    if current < 1:
        msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
               (self.vnfd.id, current))
        self._log.critical(msg)
        raise VnfRecordError(msg)

    # Log the value before the decrement, as the message says "curr"
    self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
                    self.vnfd.id, current)
    self._vnfd_ref_count = current - 1
    return self._vnfd_ref_count
1267 """ VNFD for this VNFR """
1272 """ VNFD name associated with this VNFR """
1273 return self
.vnfd
.name
1277 """ Name of this VNF in the record """
1278 return self
._vnfr
.name
def cloud_account_name(self):
    """ Cloud account recorded on the VNFR message (its cloud_account field) """
    record = self._vnfr
    return record.cloud_account
1287 """ VNFD Id associated with this VNFR """
1292 """ VNFR Id associated with this VNFR """
1293 return self
._vnfr
_id
def member_vnf_index(self):
    """ Member VNF index taken from the VNFR message (member_vnf_index_ref) """
    record = self._vnfr
    return record.member_vnf_index_ref
def config_status(self):
    """ Config status currently stored on this VNFR """
    status = self._config_status
    return status
1305 def component_by_name(self
, component_name
):
1306 """ Find a component by name in the inventory list"""
1307 mangled_name
= VcsComponent
.mangle_name(component_name
,
1310 return self
._inventory
[mangled_name
]
def get_nsr_config(self):
    """ Read the NS instance config and return the entry for this VNFR's NSR

    Returns:
        The nsr entry whose id equals the nsr_id_ref of the VNFR message,
        or None when no query result contains such an entry
    """
    # Access to the NS instance configuration is needed for runtime
    # resolution; to be replaced when deployment flavors are implemented.
    responses = yield from self._dts.query_read("C,/nsr:ns-instance-config",
                                                rwdts.XactFlag.MERGE)

    for pending in responses:
        reply = yield from pending
        for candidate in reply.result.nsr:
            if candidate.id != self._vnfr_msg.nsr_id_ref:
                continue
            return candidate

    return None
def start_component(self, component_name, ip_addr):
    """ Look up an inventory component by name and start it

    Arguments:
        component_name - name handed to component_by_name()
        ip_addr        - passed as the third argument of the component's
                         start(); the first two arguments are None
    """
    yield from self.component_by_name(component_name).start(None, None, ip_addr)
def cp_ip_addr(self, cp_name):
    """ IP address of the named connection point record

    Arguments:
        cp_name - connection point name to search for in self._cprs

    Returns:
        The ip_address of the first record with that name whose address is
        not None, or None when there is no such record
    """
    self._log.debug("cp_ip_addr()")
    addresses = (
        cpr.ip_address
        for cpr in self._cprs
        if cpr.name == cp_name and cpr.ip_address is not None
    )
    return next(addresses, None)
def mgmt_intf_info(self):
    """ Resolve the management address and port of this VNFR

    The address comes from, in order of precedence: the connection point
    named in the descriptor's "cp" field, the management_ip of the VDU
    record named in "vdu_id", or the descriptor's own ip_address.

    Returns:
        (ip_addr, port) - ip_addr is None when the referenced VDU record
        cannot be found; port is always the descriptor's port
    """
    descriptor = self.vnfd.mgmt_interface

    if descriptor.has_field("cp"):
        address = self.cp_ip_addr(descriptor.cp)
    elif not descriptor.has_field("vdu_id"):
        address = descriptor.ip_address
    else:
        try:
            vdur = self._get_vdur_from_vdu_id(descriptor.vdu_id)
            address = vdur.management_ip
        except VDURecordNotFound:
            self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
            address = None

    return address, descriptor.port
1364 """ Message associated with this VNFR """
1365 vnfd_fields
= ["short_name", "vendor", "description", "version"]
1366 vnfd_copy_dict
= {k
: v
for k
, v
in self
.vnfd
.as_dict().items() if k
in vnfd_fields
}
1368 mgmt_intf
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1369 ip_address
, port
= self
.mgmt_intf_info()
1371 if ip_address
is not None:
1372 mgmt_intf
.ip_address
= ip_address
1373 if port
is not None:
1374 mgmt_intf
.port
= port
1376 vnfr_dict
= {"id": self
._vnfr
_id
,
1377 "nsr_id_ref": self
._vnfr
_msg
.nsr_id_ref
,
1379 "member_vnf_index_ref": self
.member_vnf_index
,
1380 "operational_status": self
.operational_status
,
1381 "operational_status_details": self
._state
_failed
_reason
,
1382 "cloud_account": self
.cloud_account_name
,
1383 "config_status": self
._config
_status
1386 vnfr_dict
.update(vnfd_copy_dict
)
1388 vnfr_msg
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
1389 vnfr_msg
.vnfd
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd
.from_dict(self
.vnfd
.as_dict())
1391 vnfr_msg
.create_time
= self
._create
_time
1392 vnfr_msg
.uptime
= int(time
.time()) - self
._create
_time
1393 vnfr_msg
.mgmt_interface
= mgmt_intf
1395 # Add all the VLRs to VNFR
1396 for vlr
in self
._vlrs
:
1397 ivlr
= vnfr_msg
.internal_vlr
.add()
1398 ivlr
.vlr_ref
= vlr
.vlr_id
1400 # Add all the VDURs to VDUR
1401 if self
._vdus
is not None:
1402 for vdu
in self
._vdus
:
1403 vdur
= vnfr_msg
.vdur
.add()
1404 vdur
.from_dict(vdu
.msg
.as_dict())
1406 if self
.vnfd
.mgmt_interface
.has_field('dashboard_params'):
1407 vnfr_msg
.dashboard_url
= self
.dashboard_url
1409 for cpr
in self
._cprs
:
1410 new_cp
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr
.as_dict())
1411 vnfr_msg
.connection_point
.append(new_cp
)
1413 if self
._vnf
_mon
is not None:
1414 for monp
in self
._vnf
_mon
.msg
:
1415 vnfr_msg
.monitoring_param
.append(
1416 VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam
.from_dict(monp
.as_dict()))
1418 if self
._vnfr
.vnf_configuration
is not None:
1419 vnfr_msg
.vnf_configuration
.from_dict(self
._vnfr
.vnf_configuration
.as_dict())
1420 if (ip_address
is not None and
1421 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
is None):
1422 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
= ip_address
1424 for group
in self
._vnfr
_msg
.placement_groups_info
:
1425 group_info
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1426 group_info
.from_dict(group
.as_dict())
1427 vnfr_msg
.placement_groups_info
.append(group_info
)
1432 def dashboard_url(self
):
1433 ip
, cfg_port
= self
.mgmt_intf_info()
1436 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('https'):
1437 if self
.vnfd
.mgmt_interface
.dashboard_params
.https
is True:
1440 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('port'):
1441 http_port
= self
.vnfd
.mgmt_interface
.dashboard_params
.port
1443 url
= "{protocol}://{ip_address}:{port}/{path}".format(
1447 path
=self
.vnfd
.mgmt_interface
.dashboard_params
.path
.lstrip("/"),
1454 """ path for this VNFR """
1455 return("D,/vnfr:vnfr-catalog"
1456 "/vnfr:vnfr[vnfr:id='{}']".format(self
.vnfr_id
))
def publish(self, xact):
    """ Publish this VNFR at its xpath through the VNF manager

    Arguments:
        xact - transaction handle forwarded to the VNFM's publish_vnfr()

    The record message is built once and that same object is logged,
    stamped with the creation time and published. Reading self.msg again
    for each use would rebuild the message every time and would leave the
    create_time assignment on a copy that is never published.
    """
    vnfr_path = self.xpath
    vnfr_msg = self.msg
    vnfr_msg.create_time = self._create_time

    self._log.debug("Publishing VNFR path = [%s], record = [%s]",
                    vnfr_path, vnfr_msg)
    yield from self._vnfm.publish_vnfr(xact, vnfr_path, vnfr_msg)
    self._log.debug("Published VNFR path = [%s], record = [%s]",
                    vnfr_path, vnfr_msg)
def resolve_vld_ip_profile(self, vnfd_msg, vld):
    """ Look up the IP profile a VLD refers to

    Arguments:
        vnfd_msg - descriptor whose ip_profiles are searched
        vld      - virtual link descriptor carrying ip_profile_ref

    Returns:
        The first entry of vnfd_msg.ip_profiles whose name equals
        vld.ip_profile_ref, or None when the VLD has no ip_profile_ref
        field or no profile has that name
    """
    self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
    if not vld.has_field('ip_profile_ref'):
        return None

    matching = (candidate for candidate in vnfd_msg.ip_profiles
                if candidate.name == vld.ip_profile_ref)
    return next(matching, None)
1477 def create_vls(self
):
1478 """ Publish The VLs associated with this VNF """
1479 self
._log
.debug("Publishing Internal Virtual Links for vnfd id: %s",
1481 for ivld_msg
in self
.vnfd
.internal_vld
:
1482 self
._log
.debug("Creating internal vld:"
1483 " %s, int_cp_ref = %s",
1484 ivld_msg
, ivld_msg
.internal_connection_point
1486 vlr
= InternalVirtualLinkRecord(dts
=self
._dts
,
1490 vnfr_name
=self
.name
,
1491 cloud_account_name
=self
.cloud_account_name
,
1492 ip_profile
=self
.resolve_vld_ip_profile(self
.vnfd
, ivld_msg
)
1494 self
._vlrs
.append(vlr
)
1496 for int_cp
in ivld_msg
.internal_connection_point
:
1497 if int_cp
.id_ref
in self
._vlr
_by
_cp
:
1498 msg
= ("Connection point %s already "
1499 " bound %s" % (int_cp
.id_ref
, self
._vlr
_by
_cp
[int_cp
.id_ref
]))
1500 raise InternalVirtualLinkRecordError(msg
)
1501 self
._log
.debug("Setting vlr %s to internal cp = %s",
1503 self
._vlr
_by
_cp
[int_cp
.id_ref
] = vlr
1506 def instantiate_vls(self
, xact
, restart_mode
=False):
1507 """ Instantiate the VLs associated with this VNF """
1508 self
._log
.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1511 for vlr
in self
._vlrs
:
1512 self
._log
.debug("Instantiating VLR %s", vlr
)
1513 yield from vlr
.instantiate(xact
, restart_mode
)
def find_vlr_by_cp(self, cp_name):
    """ VLR bound to the given connection point name

    Plain dictionary lookup in self._vlr_by_cp, so an unknown name raises
    KeyError.
    """
    bindings = self._vlr_by_cp
    return bindings[cp_name]
def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
    """
    Returns the cloud specific construct for placement group
    Arguments:
        input_group: VNFD PlacementGroup
        nsr_config: Configuration for VNFDGroup MAP in the NSR config

    The first map entry whose placement_group_ref equals input_group.name
    and whose vnfd_id_ref equals this VNFR's vnfd_id is used. Its fields,
    minus the two reference keys, are combined with the name, requirement
    and strategy of input_group. None is returned when no entry matches.
    """
    reference_keys = ('placement_group_ref', 'vnfd_id_ref')
    inherited_attrs = ['name', 'requirement', 'strategy']

    for mapping in nsr_config.vnfd_placement_group_maps:
        is_match = (mapping.placement_group_ref == input_group.name and
                    mapping.vnfd_id_ref == self.vnfd_id)
        if not is_match:
            continue

        resolved = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
        fields = {}
        for key, value in mapping.as_dict().items():
            if key not in reference_keys:
                fields[key] = value
        # Values from the descriptor group take precedence over the map entry
        for attr in inherited_attrs:
            fields[attr] = getattr(input_group, attr)
        resolved.from_dict(fields)
        return resolved

    return None
1541 def get_vdu_placement_groups(self
, vdu
, nsr_config
):
1542 placement_groups
= []
1543 ### Step-1: Get VNF level placement groups
1544 for group
in self
._vnfr
_msg
.placement_groups_info
:
1545 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1546 #group_info.from_dict(group.as_dict())
1547 placement_groups
.append(group
)
1549 ### Step-2: Get VDU level placement groups
1550 for group
in self
.vnfd
.placement_groups
:
1551 for member_vdu
in group
.member_vdus
:
1552 if member_vdu
.member_vdu_ref
== vdu
.id:
1553 group_info
= self
.resolve_placement_group_cloud_construct(group
,
1555 if group_info
is None:
1556 self
._log
.info("Could not resolve cloud-construct for placement group: %s", group
.name
)
1557 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1559 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1563 self
.member_vnf_index
)
1564 placement_groups
.append(group_info
)
1566 return placement_groups
def vdu_cloud_init_instantiation(self):
    """ Read the vdud_cloud_init attribute of every VDU record

    The values are discarded; each VDU in self._vdus is only touched, in
    order, and nothing is returned.
    """
    for vdu in self._vdus:
        # Attribute read kept for its effect; the result is not used
        vdu.vdud_cloud_init
1573 def create_vdus(self
, vnfr
, restart_mode
=False):
1574 """ Create the VDUs associated with this VNF """
1576 def get_vdur_id(vdud
):
1577 """Get the corresponding VDUR's id for the VDUD. This is useful in
1580 In restart mode we check for exiting VDUR's ID and use them, if
1581 available. This way we don't end up creating duplicate VDURs
1585 if restart_mode
and vdud
is not None:
1587 vdur
= [vdur
.id for vdur
in vnfr
._vnfr
.vdur
if vdur
.vdu_id_ref
== vdud
.id]
1590 self
._log
.error("Unable to find a VDUR for VDUD {}".format(vdud
))
1595 self
._log
.info("Creating VDU's for vnfd id: %s", self
.vnfd_id
)
1597 # Get NSR config - Needed for placement groups and to derive VDU short-name
1598 nsr_config
= yield from self
.get_nsr_config()
1600 for vdu
in self
._rw
_vnfd
.vdu
:
1601 self
._log
.debug("Creating vdu: %s", vdu
)
1602 vdur_id
= get_vdur_id(vdu
)
1605 placement_groups
= yield from self
.get_vdu_placement_groups(vdu
, nsr_config
)
1606 self
._log
.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s, Existing vdur_id %s",
1609 self
.member_vnf_index
,
1610 [ group
.name
for group
in placement_groups
],
1613 vdur
= VirtualDeploymentUnitRecord(
1619 nsr_config
=nsr_config
,
1620 mgmt_intf
=self
.has_mgmt_interface(vdu
),
1621 mgmt_network
=self
._mgmt
_network
,
1622 cloud_account_name
=self
.cloud_account_name
,
1623 vnfd_package_store
=self
._vnfd
_package
_store
,
1625 placement_groups
= placement_groups
,
1627 yield from vdur
.vdu_opdata_register()
1629 self
._vdus
.append(vdur
)
1632 def instantiate_vdus(self
, xact
, vnfr
):
1633 """ Instantiate the VDUs associated with this VNF """
1634 self
._log
.debug("Instantiating VDU's for vnfd id %s: %s", self
.vnfd_id
, self
._vdus
)
1636 lookup
= {vdu
.vdu_id
: vdu
for vdu
in self
._vdus
}
1638 # Identify any dependencies among the VDUs
1639 dependencies
= collections
.defaultdict(list)
1640 vdu_id_pattern
= re
.compile(r
"\{\{ vdu\[([^]]+)\]\S* \}\}")
1642 for vdu
in self
._vdus
:
1643 if vdu
._vdud
_cloud
_init
is not None:
1644 for vdu_id
in vdu_id_pattern
.findall(vdu
._vdud
_cloud
_init
):
1645 if vdu_id
!= vdu
.vdu_id
:
1646 # This means that vdu.vdu_id depends upon vdu_id,
1647 # i.e. vdu_id must be instantiated before
1649 dependencies
[vdu
.vdu_id
].append(lookup
[vdu_id
])
1651 # Define the terminal states of VDU instantiation
1653 VDURecordState
.READY
,
1654 VDURecordState
.TERMINATED
,
1655 VDURecordState
.FAILED
,
1658 datastore
= VdurDatastore()
1662 def instantiate_monitor(vdu
):
1663 """Monitor the state of the VDU during instantiation
1666 vdu - a VirtualDeploymentUnitRecord
1669 # wait for the VDUR to enter a terminal state
1670 while vdu
._state
not in terminal
:
1671 yield from asyncio
.sleep(1, loop
=self
._loop
)
1672 # update the datastore
1673 datastore
.update(vdu
)
1675 # add the VDU to the set of processed VDUs
1676 processed
.add(vdu
.vdu_id
)
1679 def instantiate(vdu
):
1680 """Instantiate the specified VDU
1683 vdu - a VirtualDeploymentUnitRecord
1686 if the VDU, or any of the VDUs this VDU depends upon, are
1687 terminated or fail to instantiate properly, a
1688 VirtualDeploymentUnitRecordError is raised.
1691 for dependency
in dependencies
[vdu
.vdu_id
]:
1692 self
._log
.debug("{}: waiting for {}".format(vdu
.vdu_id
, dependency
.vdu_id
))
1694 while dependency
.vdu_id
not in processed
:
1695 yield from asyncio
.sleep(1, loop
=self
._loop
)
1697 if not dependency
.active
:
1698 raise VirtualDeploymentUnitRecordError()
1700 self
._log
.debug('instantiating {}'.format(vdu
.vdu_id
))
1702 # Populate the datastore with the current values of the VDU
1705 # Substitute any variables contained in the cloud config script
1706 config
= str(vdu
.vdud_cloud_init
) if vdu
.vdud_cloud_init
is not None else ""
1708 parts
= re
.split("\{\{ ([^\}]+) \}\}", config
)
1711 # Extract the variable names
1713 for variable
in parts
[1::2]:
1714 variables
.append(variable
.lstrip('{{').rstrip('}}').strip())
1716 # Iterate of the variables and substitute values from the
1718 for variable
in variables
:
1720 # Handle a reference to a VDU by ID
1721 if variable
.startswith('vdu['):
1722 value
= datastore
.get(variable
)
1724 msg
= "Unable to find a substitute for {} in {} cloud-init script"
1725 raise ValueError(msg
.format(variable
, vdu
.vdu_id
))
1727 config
= config
.replace("{{ %s }}" % variable
, value
)
1730 # Handle a reference to the current VDU
1731 if variable
.startswith('vdu'):
1732 value
= datastore
.get('vdu[{}]'.format(vdu
.vdu_id
) + variable
[3:])
1733 config
= config
.replace("{{ %s }}" % variable
, value
)
1736 # Handle unrecognized variables
1737 msg
= 'unrecognized cloud-config variable: {}'
1738 raise ValueError(msg
.format(variable
))
1740 # Instantiate the VDU
1741 with self
._dts
.transaction() as xact
:
1742 self
._log
.debug("Instantiating vdu: %s", vdu
)
1743 yield from vdu
.instantiate(xact
, vnfr
, config
=config
)
1744 if self
._state
== VirtualNetworkFunctionRecordState
.FAILED
:
1745 self
._log
.error("Instatiation of VNF %s failed while instantiating vdu %s",
1748 # First create a set of tasks to monitor the state of the VDUs and
1749 # report when they have entered a terminal state
1750 for vdu
in self
._vdus
:
1751 self
._loop
.create_task(instantiate_monitor(vdu
))
1753 for vdu
in self
._vdus
:
1754 self
._loop
.create_task(instantiate(vdu
))
def has_mgmt_interface(self, vdu):
    """ Tell whether the descriptor's management interface names this VDU

    Arguments:
        vdu - VDU whose id is compared with vnfd.mgmt_interface.vdu_id
    """
    # ## TODO: Support additional mgmt_interface type options
    mgmt_vdu_id = self.vnfd.mgmt_interface.vdu_id
    return bool(mgmt_vdu_id == vdu.id)
def vlr_xpath(self, vlr_id):
    """ Build the xpath of the VLR catalog entry with the given id

    Arguments:
        vlr_id - value substituted into the vlr:id key predicate
    """
    template = "D,/vlr:vlr-catalog/" + "vlr:vlr[vlr:id = '{}']"
    return template.format(vlr_id)
def ext_vlr_by_id(self, vlr_id):
    """ External VLR stored under the given id

    Plain dictionary lookup in self._ext_vlrs, so an unknown id raises
    KeyError.
    """
    external_vlrs = self._ext_vlrs
    return external_vlrs[vlr_id]
1773 def publish_inventory(self
, xact
):
1774 """ Publish the inventory associated with this VNF """
1775 self
._log
.debug("Publishing inventory for VNFR id: %s", self
._vnfr
_id
)
1777 for component
in self
._rw
_vnfd
.component
:
1778 self
._log
.debug("Creating inventory component %s", component
)
1779 mangled_name
= VcsComponent
.mangle_name(component
.component_name
,
1783 comp
= VcsComponent(dts
=self
._dts
,
1786 cluster_name
=self
._cluster
_name
,
1787 vcs_handler
=self
._vcs
_handler
,
1788 component
=component
,
1789 mangled_name
=mangled_name
,
1791 if comp
.name
in self
._inventory
:
1792 self
._log
.debug("Duplicate entries in inventory %s for vnfr %s",
1793 component
, self
._vnfd
_id
)
1795 self
._log
.debug("Adding component %s for vnrf %s",
1796 comp
.name
, self
._vnfr
_id
)
1797 self
._inventory
[comp
.name
] = comp
1798 yield from comp
.publish(xact
)
1800 def all_vdus_active(self
):
1801 """ Are all VDUS in this VNFR active? """
1802 for vdu
in self
._vdus
:
1806 self
._log
.debug("Inside all_vdus_active. Returning True")
def instantiation_failed(self, failed_reason=None):
    """ Record that instantiation of this VNFR failed and re-publish it

    Arguments:
        failed_reason - stored as the failure reason of this VNFR
                        (None when not given)
    """
    failed_state = VirtualNetworkFunctionRecordState.FAILED

    self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
    self.set_state(failed_state)
    self._state_failed_reason = failed_reason

    # Publish the changed status; no transaction handle is passed along
    yield from self.publish(None)
1821 """ This VNF is ready"""
1822 self
._log
.debug("VNFR id %s is ready", self
.vnfr_id
)
1824 if self
._state
!= VirtualNetworkFunctionRecordState
.FAILED
:
1825 self
.set_state(VirtualNetworkFunctionRecordState
.READY
)
1828 self
._log
.debug("VNFR id %s ignoring state change", self
.vnfr_id
)
1830 # Update the VNFR with the changed status
1831 yield from self
.publish(None)
def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
    """ Update the named connection point record with its network details

    Arguments:
        cp_name    - name of the connection point record to update
        ip_address - value stored in the record's ip_address
        mac_addr   - value stored in the record's mac_address
        cp_id      - value stored in the record's connection_point_id

    Only the first record in self._cprs with a matching name is updated.

    Raises:
        VirtualDeploymentUnitRecordError - when no record has that name
    """
    for cp in self._cprs:
        if cp.name == cp_name:
            self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
                            cp_name, cp, ip_address, cp_id)
            cp.ip_address = ip_address
            cp.mac_address = mac_addr
            cp.connection_point_id = cp_id
            return

    # Report the name that was asked for. The loop variable would name the
    # last record inspected, and is unbound when there are no records.
    err = "No connection point %s found in VNFR id %s" % (cp_name, self._vnfr_id)
    self._log.debug(err)
    raise VirtualDeploymentUnitRecordError(err)
1848 def set_state(self
, state
):
1849 """ Set state for this VNFR"""
1853 def instantiate(self
, xact
, restart_mode
=False):
1854 """ instantiate this VNF """
1855 self
.set_state(VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
)
1856 self
._rw
_vnfd
= yield from self
._vnfm
.fetch_vnfd(self
._vnfd
_id
)
1861 # Iterate over all the connection points in VNFR and fetch the
1864 def cpr_from_cp(cp
):
1865 """ Creates a record level connection point from the desciptor cp"""
1866 cp_fields
= ["name", "image", "vm-flavor", "port_security_enabled"]
1867 cp_copy_dict
= {k
: v
for k
, v
in cp
.as_dict().items() if k
in cp_fields
}
1869 cpr_dict
.update(cp_copy_dict
)
1870 return VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr_dict
)
1872 self
._log
.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1873 self
._vnfr
_id
, self
._vnfr
.connection_point
)
1875 for cp
in self
._vnfr
.connection_point
:
1876 cpr
= cpr_from_cp(cp
)
1877 self
._cprs
.append(cpr
)
1878 self
._log
.debug("Adding Connection point record %s ", cp
)
1880 vlr_path
= self
.vlr_xpath(cp
.vlr_ref
)
1881 self
._log
.debug("Fetching VLR with path = %s", vlr_path
)
1882 res_iter
= yield from self
._dts
.query_read(self
.vlr_xpath(cp
.vlr_ref
),
1883 rwdts
.XactFlag
.MERGE
)
1887 self
._ext
_vlrs
[cp
.vlr_ref
] = d
1888 cpr
.vlr_ref
= cp
.vlr_ref
1889 self
._log
.debug("Fetched VLR [%s] with path = [%s]", d
, vlr_path
)
1891 # Increase the VNFD reference count
1896 # Fetch External VLRs
1897 self
._log
.debug("VNFR-ID %s: Fetching vlrs", self
._vnfr
_id
)
1898 yield from fetch_vlrs()
1901 self
._log
.debug("VNFR-ID %s: Publishing Inventory", self
._vnfr
_id
)
1902 yield from self
.publish_inventory(xact
)
1905 self
._log
.debug("VNFR-ID %s: Creating VLs", self
._vnfr
_id
)
1906 yield from self
.create_vls()
1909 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1910 yield from self
.publish(xact
)
1914 self
._log
.debug("VNFR-ID %s: Instantiate VLs, restart mode %s", self
._vnfr
_id
, restart_mode
)
1916 yield from self
.instantiate_vls(xact
, restart_mode
)
1917 except Exception as e
:
1918 self
._log
.exception("VL instantiation failed (%s)", str(e
))
1919 yield from self
.instantiation_failed(str(e
))
1922 self
.set_state(VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
)
1925 self
._log
.debug("VNFR-ID %s: Create VDUs, restart mode %s", self
._vnfr
_id
, restart_mode
)
1926 yield from self
.create_vdus(self
, restart_mode
)
1929 yield from self
.vdu_cloud_init_instantiation()
1930 except Exception as e
:
1931 self
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
1932 self
._state
_failed
_reason
= str(e
)
1933 yield from self
.publish(xact
)
1936 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1937 yield from self
.publish(xact
)
1940 # ToDo: Check if this should be prevented during restart
1941 self
._log
.debug("VNFR-ID %s: Instantiate VDUs", self
._vnfr
_id
)
1942 _
= self
._loop
.create_task(self
.instantiate_vdus(xact
, self
))
1945 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1946 yield from self
.publish(xact
)
1948 self
._log
.debug("VNFR-ID %s: Instantiation Done", self
._vnfr
_id
)
1950 # create task updating uptime for this vnfr
1951 self
._log
.debug("VNFR-ID %s: Starting task to update uptime", self
._vnfr
_id
)
1952 self
._loop
.create_task(self
.vnfr_uptime_update(xact
))
def terminate(self, xact):
    """ Terminate this virtual network function

    Arguments:
        xact - transaction handed to every VL and VDU terminate() call

    Steps, in order: enter TERMINATE, stop and deregister the monitor if
    one is set, terminate each internal VL (VL_TERMINATE_PHASE), terminate
    each VDU (VDU_TERMINATE_PHASE), then enter TERMINATED.
    """
    states = VirtualNetworkFunctionRecordState

    self._log.debug("Terminatng VNF id %s", self.vnfr_id)
    self.set_state(states.TERMINATE)

    # stop monitoring
    if self._vnf_mon is not None:
        self._vnf_mon.stop()
        self._vnf_mon.deregister()
        self._vnf_mon = None

    self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
    self.set_state(states.VL_TERMINATE_PHASE)
    for vlr in self._vlrs:
        yield from vlr.terminate(xact)

    self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
    self.set_state(states.VDU_TERMINATE_PHASE)
    for vdur in self._vdus:
        yield from vdur.terminate(xact)

    self._log.debug("Terminated VNF id %s", self.vnfr_id)
    self.set_state(states.TERMINATED)
def vnfr_uptime_update(self, xact):
    """ Re-publish this VNFR every two seconds while it is still live

    Arguments:
        xact - transaction handle forwarded to publish()

    The loop ends as soon as the state is anything other than INIT,
    VL_INIT_PHASE, VM_INIT_PHASE or READY.
    """
    live_states = (
        VirtualNetworkFunctionRecordState.INIT,
        VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
        VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
        VirtualNetworkFunctionRecordState.READY,
    )

    # Stop when vnfr state is FAILED or TERMINATED etc
    while self._state in live_states:
        yield from self.publish(xact)
        yield from asyncio.sleep(2, loop=self._loop)
2005 class VnfdDtsHandler(object):
2006 """ DTS handler for VNFD config changes """
2007 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
2009 def __init__(self
, dts
, log
, loop
, vnfm
):
2018 """ DTS registration handle """
2023 """ Register for VNFD configuration"""
2025 def on_apply(dts
, acg
, xact
, action
, scratch
):
2026 """Apply the configuration"""
2027 self
._log
.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
2028 xact
, action
, scratch
)
2030 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
2033 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
2034 """ on prepare callback """
2035 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s)",
2036 ks_path
.to_xpath(RwVnfmYang
.get_schema()), msg
)
2037 fref
= ProtobufC
.FieldReference
.alloc()
2038 fref
.goto_whole_message(msg
.to_pbcm())
2040 # Handle deletes in prepare_callback
2041 if fref
.is_field_deleted():
2042 # Delete an VNFD record
2043 self
._log
.debug("Deleting VNFD with id %s", msg
.id)
2044 if self
._vnfm
.vnfd_in_use(msg
.id):
2045 self
._log
.debug("Cannot delete VNFD in use - %s", msg
)
2046 err
= "Cannot delete a VNFD in use - %s" % msg
2047 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
2048 # Delete a VNFD record
2049 yield from self
._vnfm
.delete_vnfd(msg
.id)
2051 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2054 "Registering for VNFD config using xpath: %s",
2055 VnfdDtsHandler
.XPATH
,
2057 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
2058 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
2059 self
._regh
= acg
.register(
2060 xpath
=VnfdDtsHandler
.XPATH
,
2061 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
2062 on_prepare
=on_prepare
)
2065 class VcsComponentDtsHandler(object):
2066 """ Vcs Component DTS handler """
2067 XPATH
= ("D,/rw-manifest:manifest" +
2068 "/rw-manifest:operational-inventory" +
2069 "/rw-manifest:component")
2071 def __init__(self
, dts
, log
, loop
, vnfm
):
2080 """ DTS registration handle """
2085 """ Registers VCS component dts publisher registration"""
2086 self
._log
.debug("VCS Comp publisher DTS handler registering path %s",
2087 VcsComponentDtsHandler
.XPATH
)
2089 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
2090 handlers
= rift
.tasklets
.Group
.Handler()
2091 with self
._dts
.group_create(handler
=handlers
) as group
:
2092 self
._regh
= group
.register(xpath
=VcsComponentDtsHandler
.XPATH
,
2094 flags
=(rwdts
.Flag
.PUBLISHER |
2095 rwdts
.Flag
.NO_PREP_READ |
2096 rwdts
.Flag
.DATASTORE
),)
2099 def publish(self
, xact
, path
, msg
):
2100 """ Publishes the VCS component """
2101 self
._log
.debug("Publishing the VcsComponent xact = %s, %s:%s",
2103 self
.regh
.create_element(path
, msg
)
2104 self
._log
.debug("Published the VcsComponent to %s xact = %s, %s:%s",
2105 VcsComponentDtsHandler
.XPATH
, xact
, path
, msg
)
2107 class VnfrConsoleOperdataDtsHandler(object):
2108 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
def vnfr_vdu_console_xpath(self):
    """ Console xpath of this handler's VDUR inside its VNFR

    Built from the handler's _vnfr_id and _vdur_id, in that order.
    """
    template = "D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']"
    return (template.format(self._vnfr_id, self._vdur_id))
2114 def __init__(self
, dts
, log
, loop
, vnfm
, vnfr_id
, vdur_id
, vdu_id
):
2121 self
._vnfr
_id
= vnfr_id
2122 self
._vdur
_id
= vdur_id
2123 self
._vdu
_id
= vdu_id
2127 """ Register for VNFR VDU Operational Data read from dts """
2130 def on_prepare(xact_info
, action
, ks_path
, msg
):
2131 """ prepare callback from dts """
2132 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2134 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2135 xact_info
, action
, xpath
, msg
2138 if action
== rwdts
.QueryAction
.READ
:
2139 schema
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur
.schema()
2140 path_entry
= schema
.keyspec_to_entry(ks_path
)
2141 self
._log
.debug("VDU Opdata path is {}".format(path_entry
.key00
.id))
2143 vnfr
= self
._vnfm
.get_vnfr(self
._vnfr
_id
)
2144 except VnfRecordError
as e
:
2145 self
._log
.error("VNFR id %s not found", self
._vnfr
_id
)
2146 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2149 vdur
= vnfr
._get
_vdur
_from
_vdu
_id
(self
._vdu
_id
)
2150 if not vdur
._state
== VDURecordState
.READY
:
2151 self
._log
.debug("VDUR state is not READY. current state is {}".format(vdur
._state
))
2152 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2154 with self
._dts
.transaction() as new_xact
:
2155 resp
= yield from vdur
.read_resource(new_xact
)
2156 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2157 vdur_console
.id = self
._vdur
_id
2158 if resp
.console_url
:
2159 vdur_console
.console_url
= resp
.console_url
2161 vdur_console
.console_url
= 'none'
2162 self
._log
.debug("Recevied console URL for vdu {} is {}".format(self
._vdu
_id
,vdur_console
))
2164 self
._log
.exception("Caught exception while reading VDU %s", self
._vdu
_id
)
2165 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2166 vdur_console
.id = self
._vdur
_id
2167 vdur_console
.console_url
= 'none'
2169 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
,
2170 xpath
=self
.vnfr_vdu_console_xpath
,
2173 #raise VnfRecordError("Not supported operation %s" % action)
2174 self
._log
.error("Not supported operation %s" % action
)
2175 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2179 self
._log
.debug("Registering for VNFR VDU using xpath: %s",
2180 self
.vnfr_vdu_console_xpath
)
2181 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2182 with self
._dts
.group_create() as group
:
2183 self
._regh
= group
.register(xpath
=self
.vnfr_vdu_console_xpath
,
2185 flags
=rwdts
.Flag
.PUBLISHER
,
2189 class VnfrDtsHandler(object):
2190 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2191 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2193 def __init__(self
, dts
, log
, loop
, vnfm
):
2203 """ Return registration handle"""
2208 """ Return VNF manager instance """
2213 """ Register for vnfr create/update/delete/read requests from dts """
2214 def on_commit(xact_info
):
2215 """ The transaction has been committed """
2216 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
2217 return rwdts
.MemberRspCode
.ACTION_OK
2219 def on_abort(*args
):
2220 """ Abort callback """
2221 self
._log
.debug("VNF transaction got aborted")
2224 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
2227 def instantiate_realloc_vnfr(vnfr
):
2228 """Re-populate the vnfm after restart
2235 yield from vnfr
.instantiate(None, restart_mode
=True)
2237 if xact_event
== rwdts
.MemberEvent
.INSTALL
:
2238 curr_cfg
= self
.regh
.elements
2239 for cfg
in curr_cfg
:
2240 vnfr
= self
.vnfm
.create_vnfr(cfg
)
2241 self
._loop
.create_task(instantiate_realloc_vnfr(vnfr
))
2243 self
._log
.debug("Got on_event in vnfm")
2245 return rwdts
.MemberRspCode
.ACTION_OK
2248 def on_prepare(xact_info
, action
, ks_path
, msg
):
2249 """ prepare callback from dts """
2251 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2252 xact_info
, action
, msg
2255 if action
== rwdts
.QueryAction
.CREATE
:
2256 if not msg
.has_field("vnfd"):
2257 err
= "Vnfd not provided"
2258 self
._log
.error(err
)
2259 raise VnfRecordError(err
)
2261 vnfr
= self
.vnfm
.create_vnfr(msg
)
2263 # RIFT-9105: Unable to add a READ query under an existing transaction
2264 # xact = xact_info.xact
2265 yield from vnfr
.instantiate(None)
2266 except Exception as e
:
2267 self
._log
.exception(e
)
2268 self
._log
.error("Error while instantiating vnfr:%s", vnfr
.vnfr_id
)
2269 vnfr
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
2270 yield from vnfr
.publish(None)
2271 elif action
== rwdts
.QueryAction
.DELETE
:
2272 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2273 path_entry
= schema
.keyspec_to_entry(ks_path
)
2274 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2277 self
._log
.debug("VNFR id %s not found for delete", path_entry
.key00
.id)
2278 raise VirtualNetworkFunctionRecordNotFound(
2279 "VNFR id %s", path_entry
.key00
.id)
2282 yield from vnfr
.terminate(xact_info
.xact
)
2285 yield from self
._vnfm
.delete_vnfr(xact_info
.xact
, vnfr
)
2286 except Exception as e
:
2287 self
._log
.exception(e
)
2288 self
._log
.error("Caught exception while deleting vnfr %s", path_entry
.key00
.id)
2290 elif action
== rwdts
.QueryAction
.UPDATE
:
2291 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2292 path_entry
= schema
.keyspec_to_entry(ks_path
)
2295 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2296 except Exception as e
:
2297 self
._log
.debug("No vnfr found with id %s", path_entry
.key00
.id)
2298 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2302 self
._log
.debug("VNFR id %s not found for update", path_entry
.key00
.id)
2303 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2306 self
._log
.debug("VNFR {} update config status {} (current {})".
2307 format(vnfr
.name
, msg
.config_status
, vnfr
.config_status
))
2308 # Update the config status and publish
2309 vnfr
._config
_status
= msg
.config_status
2310 yield from vnfr
.publish(None)
2313 raise NotImplementedError(
2314 "%s action on VirtualNetworkFunctionRecord not supported",
2317 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2319 self
._log
.debug("Registering for VNFR using xpath: %s",
2320 VnfrDtsHandler
.XPATH
,)
2322 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
2323 on_prepare
=on_prepare
,)
2324 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
2325 with self
._dts
.group_create(handler
=handlers
) as group
:
2326 self
._regh
= group
.register(xpath
=VnfrDtsHandler
.XPATH
,
2328 flags
=(rwdts
.Flag
.PUBLISHER |
2329 rwdts
.Flag
.NO_PREP_READ |
2331 rwdts
.Flag
.DATASTORE
),)
2334 def create(self
, xact
, path
, msg
):
2336 Create a VNFR record in DTS with path and message
2338 self
._log
.debug("Creating VNFR xact = %s, %s:%s",
2341 self
.regh
.create_element(path
, msg
)
2342 self
._log
.debug("Created VNFR xact = %s, %s:%s",
2346 def update(self
, xact
, path
, msg
):
2348 Update a VNFR record in DTS with path and message
2350 self
._log
.debug("Updating VNFR xact = %s, %s:%s",
2352 self
.regh
.update_element(path
, msg
)
2353 self
._log
.debug("Updated VNFR xact = %s, %s:%s",
2357 def delete(self
, xact
, path
):
2359 Delete a VNFR record in DTS with path and message
2361 self
._log
.debug("Deleting VNFR xact = %s, %s", xact
, path
)
2362 self
.regh
.delete_element(path
)
2363 self
._log
.debug("Deleted VNFR xact = %s, %s", xact
, path
)
2366 class VnfdRefCountDtsHandler(object):
2367 """ The VNFD Ref Count DTS handler """
2368 XPATH
= "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2370 def __init__(self
, dts
, log
, loop
, vnfm
):
2380 """ Return registration handle """
2385 """ Return the NS manager instance """
2390 """ Register for VNFD ref count read from dts """
2393 def on_prepare(xact_info
, action
, ks_path
, msg
):
2394 """ prepare callback from dts """
2395 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2397 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2398 xact_info
, action
, xpath
, msg
2401 if action
== rwdts
.QueryAction
.READ
:
2402 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_VnfdRefCount
.schema()
2403 path_entry
= schema
.keyspec_to_entry(ks_path
)
2404 vnfd_list
= yield from self
._vnfm
.get_vnfd_refcount(path_entry
.key00
.vnfd_id_ref
)
2405 for xpath
, msg
in vnfd_list
:
2406 self
._log
.debug("Responding to ref count query path:%s, msg:%s",
2408 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.MORE
,
2411 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2413 raise VnfRecordError("Not supported operation %s" % action
)
2415 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2416 with self
._dts
.group_create() as group
:
2417 self
._regh
= group
.register(xpath
=VnfdRefCountDtsHandler
.XPATH
,
2419 flags
=rwdts
.Flag
.PUBLISHER
,
class VdurDatastore(object):
    """
    This VdurDatastore is intended to expose select information about a VDUR
    such that it can be referenced in a cloud config file. The data that is
    exposed does not necessarily follow the structure of the data in the yang
    model. This is intentional. The data that are exposed are intended to be
    agnostic of the yang model so that changes in the model do not necessarily
    require changes to the interface provided to the user. It also means that
    the user does not need to be familiar with the RIFT.ware yang models.

    Entries are keyed by the ``vdu_id`` attribute of the records passed to
    add() and update().
    """

    def __init__(self):
        """Create an instance of VdurDatastore"""
        self._vdur_data = dict()
        # Raw string: the backslashes are regex escapes, not string escapes.
        self._pattern = re.compile(r"vdu\[([^]]+)\]\.(.+)")

    @staticmethod
    def _exposed_attributes(vdur):
        """Map each exposed key to the value currently held by the VDUR."""
        return {
            'name': vdur._vdud.name,
            'mgmt.ip': vdur.vm_management_ip,
            # The below can be used for hostname
            'vdur_name': vdur.unique_short_name,
        }

    def add(self, vdur):
        """Add a new VDUR to the datastore

        Only attributes that are not None are stored.

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR is (1) None, (2) has no ID, or
            (3) is already in the datastore.

        """
        if vdur is None or vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id in self._vdur_data:
            raise ValueError('cannot add a VDUR more than once')

        exposed = self._exposed_attributes(vdur)
        self._vdur_data[vdur.vdu_id] = {
            key: value for key, value in exposed.items() if value is not None
        }

    def update(self, vdur):
        """Update the VDUR information in the datastore

        Attributes that are now None are removed from the stored entry; all
        others overwrite the stored value.

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR is (1) None, (2) has no ID, or
            (3) is not in the datastore.

        """
        if vdur is None or vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        entry = self._vdur_data[vdur.vdu_id]
        for key, value in self._exposed_attributes(vdur).items():
            if value is None:
                entry.pop(key, None)
            else:
                entry[key] = value

    def remove(self, vdur_id):
        """Remove all of the data associated with specified VDUR

        Arguments:
            vdur_id - the key under which the VDUR was added to the datastore

        Raises:
            A ValueError is raised if the VDUR is not contained in the
            datastore.

        """
        if vdur_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        del self._vdur_data[vdur_id]

    def get(self, expr):
        """Retrieve VDUR information from the datastore

        An expression should be of the form,

            vdu[<id>].<attr>

        where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
        the exposed attribute that the user wishes to retrieve.

        If the requested data is not available, None is returned.

        Arguments:
            expr - a string that specifies the data to return

        Raises:
            A ValueError is raised if the provided expression cannot be parsed.

        Returns:
            The requested data or None

        """
        result = self._pattern.match(expr)
        if result is None:
            raise ValueError('data expression not recognized ({})'.format(expr))

        vdur_id, key = result.groups()

        if vdur_id not in self._vdur_data:
            return None

        return self._vdur_data[vdur_id].get(key, None)
2547 class VnfManager(object):
2548 """ The virtual network function manager class """
2549 def __init__(self
, dts
, log
, loop
, cluster_name
):
2553 self
._cluster
_name
= cluster_name
2555 self
._vcs
_handler
= VcsComponentDtsHandler(dts
, log
, loop
, self
)
2556 self
._vnfr
_handler
= VnfrDtsHandler(dts
, log
, loop
, self
)
2557 self
._vnfr
_ref
_handler
= VnfdRefCountDtsHandler(dts
, log
, loop
, self
)
2558 self
._nsr
_handler
= mano_dts
.NsInstanceConfigSubscriber(log
, dts
, loop
, callback
=self
.handle_nsr
)
2560 self
._dts
_handlers
= [VnfdDtsHandler(dts
, log
, loop
, self
),
2563 self
._vnfr
_ref
_handler
,
2566 self
._vnfds
_to
_vnfr
= {}
2570 def vnfr_handler(self
):
2571 """ VNFR dts handler """
2572 return self
._vnfr
_handler
2575 def vcs_handler(self
):
2576 """ VCS dts handler """
2577 return self
._vcs
_handler
2581 """ Register all static DTS handlers """
2582 for hdl
in self
._dts
_handlers
:
2583 yield from hdl
.register()
2587 """ Run this VNFM instance """
2588 self
._log
.debug("Run VNFManager - registering static DTS handlers""")
2589 yield from self.register()
def handle_nsr(self, nsr, action):
    """Keep the local NSR cache in step with NS instance config changes.

    A CREATE action stores ``nsr`` under its id; a DELETE action drops the
    entry for that id if one is present. Every other action is ignored.
    """
    if action == rwdts.QueryAction.CREATE:
        self._nsrs[nsr.id] = nsr
        return

    if action == rwdts.QueryAction.DELETE:
        # Deleting an id that was never cached is not an error.
        self._nsrs.pop(nsr.id, None)
2598 def get_linked_mgmt_network(self, vnfr):
2599 """For the given VNFR get the related mgmt network from the NSD, if
2602 vnfd_id = vnfr.vnfd.id
2603 nsr_id = vnfr.nsr_id_ref
2605 # for the given related VNFR, get the corresponding NSR-config
2608 nsr_obj = self._nsrs[nsr_id]
2610 raise("Unable to find the NS with the ID
: {}".format(nsr_id))
2612 # for the related NSD check if a VLD exists such that it's a mgmt
2614 for vld in nsr_obj.nsd.vld:
2615 if vld.mgmt_network:
def get_vnfr(self, vnfr_id):
    """ get VNFR by vnfr id

    Arguments:
        vnfr_id - the id the VNFR was registered under

    Returns:
        The record stored in this manager for vnfr_id

    Raises:
        VnfRecordError if no VNFR with that id is known to this manager
    """
    if vnfr_id not in self._vnfrs:
        # Interpolate the id explicitly: exception constructors do not apply
        # logging-style "%s" arguments, so the id was previously never
        # substituted into the message.
        raise VnfRecordError("VNFR id %s not found" % vnfr_id)

    return self._vnfrs[vnfr_id]
2628 def create_vnfr(self, vnfr):
2629 """ Create a VNFR instance """
2630 if vnfr.id in self._vnfrs:
2631 msg = "Vnfr
id %s already exists
" % vnfr.id
2632 self._log.error(msg)
2633 raise VnfRecordError(msg)
2635 self._log.info("Create VirtualNetworkFunctionRecord
%s from vnfd_id
: %s",
2639 mgmt_network = self.get_linked_mgmt_network(vnfr)
2641 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2642 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
2643 mgmt_network=mgmt_network
2647 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2648 self._vnfds_to_vnfr[vnfr.vnfd.id] += 1
2650 self._vnfds_to_vnfr[vnfr.vnfd.id] = 1
2652 return self._vnfrs[vnfr.id]
2655 def delete_vnfr(self, xact, vnfr):
2656 """ Create a VNFR instance """
2657 if vnfr.vnfr_id in self._vnfrs:
2658 self._log.debug("Deleting VNFR
id %s", vnfr.vnfr_id)
2659 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2661 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2662 if self._vnfds_to_vnfr[vnfr.vnfd.id]:
2663 self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1
2665 del self._vnfrs[vnfr.vnfr_id]
2668 def fetch_vnfd(self, vnfd_id):
2669 """ Fetch VNFDs based with the vnfd id"""
2670 vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
2671 self._log.debug("Fetch vnfd with path
%s", vnfd_path)
2674 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2676 for ent in res_iter:
2677 res = yield from ent
2681 err = "Failed to get Vnfd
%s" % vnfd_id
2682 self._log.error(err)
2683 raise VnfRecordError(err)
2685 self._log.debug("Fetched vnfd
for path
%s, vnfd
- %s", vnfd_path, vnfd)
2689 def vnfd_in_use(self, vnfd_id):
2690 """ Is this VNFD in use """
2691 self._log.debug("Is this VNFD
in use
- msg
:%s", vnfd_id)
2692 if vnfd_id in self._vnfds_to_vnfr:
2693 return (self._vnfds_to_vnfr[vnfd_id] > 0)
2697 def publish_vnfr(self, xact, path, msg):
2698 """ Publish a VNFR """
2699 self._log.debug("publish_vnfr called with path
%s, msg
%s",
2701 yield from self.vnfr_handler.update(xact, path, msg)
def delete_vnfd(self, vnfd_id):
    """ Delete the Virtual Network Function descriptor with the passed id

    Drops the reference-count entry for the descriptor and removes any files
    stored for it under $RIFT_ARTIFACTS/launchpad/libs/<id>. Failures during
    the file cleanup are logged and not propagated.

    Arguments:
        vnfd_id - id of the descriptor to delete

    Raises:
        VirtualNetworkFunctionDescriptorRefCountExists if the reference count
        recorded for the descriptor is non-zero
    """
    self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
    if vnfd_id in self._vnfds_to_vnfr:
        # The map holds plain integer counters, so the count is the value
        # itself; it has no vnfd_ref_count attribute.
        ref_count = self._vnfds_to_vnfr[vnfd_id]
        if ref_count:
            self._log.debug("Cannot delete VNFD id %s reference exists %s",
                            vnfd_id,
                            ref_count)
            raise VirtualNetworkFunctionDescriptorRefCountExists(
                "Cannot delete VNFD %s, ref_count: %s" % (vnfd_id, ref_count))

        del self._vnfds_to_vnfr[vnfd_id]

    # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
    # NOTE(review): the nesting of this cleanup section is not recoverable
    # from the reviewed view; it is placed at function level here - confirm.
    try:
        rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
        vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
        if os.path.exists(vnfd_dir):
            shutil.rmtree(vnfd_dir, ignore_errors=True)
    except Exception as e:
        # Report by id: the ref-count entry may already have been deleted
        # above, so it must not be looked up again here.
        self._log.error("Exception in cleaning up VNFD {}: {}".
                        format(vnfd_id, e))
        self._log.exception(e)
2731 def vnfd_refcount_xpath(self, vnfd_id):
2732 """ xpath for ref count entry """
2733 return (VnfdRefCountDtsHandler.XPATH +
2734 "[rw
-vnfr
:vnfd
-id-ref
= '{}']").format(vnfd_id)
2737 def get_vnfd_refcount(self, vnfd_id):
2738 """ Get the vnfd_list from this VNFM"""
2740 if vnfd_id is None or vnfd_id == "":
2741 for vnfd in self._vnfds_to_vnfr.keys():
2742 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2743 vnfd_msg.vnfd_id_ref = vnfd
2744 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd]
2745 vnfd_list.append((self.vnfd_refcount_xpath(vnfd), vnfd_msg))
2746 elif vnfd_id in self._vnfds_to_vnfr:
2747 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2748 vnfd_msg.vnfd_id_ref = vnfd_id
2749 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd_id]
2750 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2755 class VnfmTasklet(rift.tasklets.Tasklet):
2756 """ VNF Manager tasklet class """
2757 def __init__(self, *args, **kwargs):
2758 super(VnfmTasklet, self).__init__(*args, **kwargs)
2759 self.rwlog.set_category("rw
-mano
-log
")
2760 self.rwlog.set_subcategory("vnfm
")
2767 super(VnfmTasklet, self).start()
2768 self.log.info("Starting VnfmTasklet
")
2770 self.log.setLevel(logging.DEBUG)
2772 self.log.debug("Registering with dts
")
2773 self._dts = rift.tasklets.DTS(self.tasklet_info,
2774 RwVnfmYang.get_schema(),
2776 self.on_dts_state_change)
2778 self.log.debug("Created DTS Api GI Object
: %s", self._dts)
2780 print("Caught
Exception in VNFM start
:", sys.exc_info()[0])
2783 def on_instance_started(self):
2784 """ Task insance started callback """
2785 self.log.debug("Got instance started callback
")
2791 print("Caught
Exception in VNFM stop
:", sys.exc_info()[0])
2796 """ Task init callback """
2798 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2799 assert vm_parent_name is not None
2800 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2801 yield from self._vnfm.run()
2803 print("Caught
Exception in VNFM init
:", sys.exc_info()[0])
2808 """ Task run callback """
2812 def on_dts_state_change(self, state):
2813 """Take action according to current dts state to transition
2814 application into the corresponding application state
2817 state - current dts state
2820 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2821 rwdts.State.CONFIG: rwdts.State.RUN,
2825 rwdts.State.INIT: self.init,
2826 rwdts.State.RUN: self.run,
2829 # Transition application to next state
2830 handler = handlers.get(state, None)
2831 if handler is not None:
2832 yield from handler()
2834 # Transition dts to next state
2835 next_state = switch.get(state, None)
2836 if next_state is not None:
2837 self._dts.handle.set_state(next_state)