2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
29 gi
.require_version('RwDts', '1.0')
30 gi
.require_version('RwVnfrYang', '1.0')
31 gi
.require_version('RwVnfmYang', '1.0')
32 gi
.require_version('RwVlrYang', '1.0')
33 gi
.require_version('RwManifestYang', '1.0')
34 gi
.require_version('RwBaseYang', '1.0')
35 gi
.require_version('RwResourceMgrYang', '1.0')
37 from gi
.repository
import (
50 import rift
.package
.store
51 import rift
.package
.cloud_init
52 import rift
.package
.script
53 import rift
.mano
.dts
as mano_dts
54 from rift
.mano
.utils
.project
import (
class VMResourceError(Exception):
    """VM resource error.

    Raised in this file when a resource-manager query does not return a
    usable VM resource response.
    """
class VnfRecordError(Exception):
    """VNF record instantiation failed."""
class VduRecordError(Exception):
    """VDU record instantiation failed.

    Raised in this file when a connection point (or the VLR behind it)
    cannot be resolved for a VDU.
    """
class NotImplemented(Exception):
    """Not implemented.

    NOTE(review): this class name shadows the builtin ``NotImplemented``
    constant for code that resolves the name through this module.
    """
class VnfrRecordExistsError(Exception):
    """A VNFR record already exists with the same VNFR id."""
class InternalVirtualLinkRecordError(Exception):
    """Internal virtual link record error.

    Raised in this file when the VLR for an internal virtual link cannot be
    read back from its path.
    """
class VDUImageNotFound(Exception):
    """VDU image not found."""
class VirtualDeploymentUnitRecordError(Exception):
    """VDU instantiation failed.

    Raised in this file when a cloud-init script or config file cannot be
    extracted from the VNFD package, and when the resource manager answers a
    VM request with an error state.
    """
class VMNotReadyError(Exception):
    """The VM has not yet been received from the resource manager."""
class VDURecordNotFound(Exception):
    """Could not find a VDU record."""
class VirtualNetworkFunctionRecordDescNotFound(Exception):
    """Cannot find the Virtual Network Function Record Descriptor."""
class VirtualNetworkFunctionDescriptorError(Exception):
    """Virtual Network Function Record Descriptor error."""
class VirtualNetworkFunctionDescriptorNotFound(Exception):
    """Virtual Network Function Record Descriptor not found."""
class VirtualNetworkFunctionRecordNotFound(Exception):
    """Virtual Network Function Record not found."""
class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
    """A Virtual Network Function Descriptor reference count exists."""
class VnfrInstantiationFailed(Exception):
    """Virtual Network Function instantiation failed.

    Raised in this file when creation of an internal virtual link reports a
    'failed' operational status.
    """
class VNFMPlacementGroupError(Exception):
    """Placement group request cannot be honoured.

    Raised in this file when a VDU's placement groups name more than one
    availability zone or more than one server group.
    """
143 class VirtualNetworkFunctionRecordState(enum
.Enum
):
150 VL_TERMINATE_PHASE
= 6
151 VDU_TERMINATE_PHASE
= 7
156 class VDURecordState(enum
.Enum
):
157 """VDU record state """
160 RESOURCE_ALLOC_PENDING
= 3
167 class VcsComponent(object):
168 """ VCS Component within the VNF descriptor """
169 def __init__(self
, dts
, log
, loop
, cluster_name
,
170 vcs_handler
, component
, mangled_name
):
174 self
._component
= component
175 self
._cluster
_name
= cluster_name
176 self
._vcs
_handler
= vcs_handler
177 self
._mangled
_name
= mangled_name
def mangle_name(component_name, vnf_name, vnfd_id):
    """Build the mangled component name.

    The three parts are joined with ':' in the order
    ``vnf_name``, ``component_name``, ``vnfd_id``.
    """
    ordered_parts = (vnf_name, component_name, vnfd_id)
    return ":".join(ordered_parts)
186 """ name of this component"""
187 return self
._mangled
_name
191 """ The path for this object """
192 return ("D,/rw-manifest:manifest" +
193 "/rw-manifest:operational-inventory" +
194 "/rw-manifest:component" +
195 "[rw-manifest:component-name = '{}']").format(self
.name
)
198 def instance_xpath(self
):
199 """ The path for this object """
200 return("D,/rw-base:vcs" +
203 "[instance-name = '{}']".format(self
._cluster
_name
))
def start_comp_xpath(self):
    """Return the xpath of the 'START-REQ' child entry of this instance.

    The value is this object's ``instance_xpath`` followed by the
    ``child-n`` key selecting ``START-REQ``.
    """
    start_req_suffix = "/child-n[instance-name = 'START-REQ']"
    return self.instance_xpath + start_req_suffix
211 def get_start_comp_msg(self
, ip_address
):
212 """ start this component """
213 start_msg
= RwBaseYang
.VcsInstance_Instance_ChildN()
214 start_msg
.instance_name
= 'START-REQ'
215 start_msg
.component_name
= self
.name
216 start_msg
.admin_command
= "START"
217 start_msg
.ip_address
= ip_address
223 """ Returns the message for this vcs component"""
225 vcs_comp_dict
= self
._component
.as_dict()
227 def mangle_comp_names(comp_dict
):
228 """ mangle component name with VNF name, id"""
229 for key
, val
in comp_dict
.items():
230 if isinstance(val
, dict):
231 comp_dict
[key
] = mangle_comp_names(val
)
232 elif isinstance(val
, list):
235 if isinstance(ent
, dict):
236 val
[i
] = mangle_comp_names(ent
)
240 elif key
== "component_name":
241 comp_dict
[key
] = VcsComponent
.mangle_name(val
,
246 mangled_dict
= mangle_comp_names(vcs_comp_dict
)
247 msg
= RwManifestYang
.OpInventory_Component
.from_dict(mangled_dict
)
def publish(self, xact):
    """Publish this VCS component through the VCS handler.

    Logs the component name, path and message, then delegates to
    ``self._vcs_handler.publish`` with the given transaction.

    Args:
        xact: transaction handed through to the handler.
    """
    log_args = (self.name, self.path, self.msg)
    self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
                    *log_args)
    handler = self._vcs_handler
    yield from handler.publish(xact, self.path, self.msg)
258 def start(self
, xact
, parent
, ip_addr
=None):
259 """ Starts this VCS component """
260 # ATTN RV - replace with block add
261 start_msg
= self
.get_start_comp_msg(ip_addr
)
262 self
._log
.debug("starting component %s %s",
263 self
.start_comp_xpath
, start_msg
)
264 yield from self
._dts
.query_create(self
.start_comp_xpath
,
267 self
._log
.debug("started component %s, %s",
268 self
.start_comp_xpath
, start_msg
)
271 class VirtualDeploymentUnitRecord(object):
272 """ Virtual Deployment Unit Record """
285 placement_groups
=[]):
289 self
._project
= project
292 self
._mgmt
_intf
= mgmt_intf
293 self
._cloud
_account
_name
= cloud_account_name
294 self
._vnfd
_package
_store
= vnfd_package_store
295 self
._mgmt
_network
= mgmt_network
297 self
._vdur
_id
= vdur_id
or str(uuid
.uuid4())
300 self
._state
= VDURecordState
.INIT
301 self
._state
_failed
_reason
= None
302 self
._request
_id
= str(uuid
.uuid4())
303 self
._name
= vnfr
.name
+ "__" + vdud
.id
304 self
._placement
_groups
= placement_groups
307 self
._vdud
_cloud
_init
= None
308 self
._vdur
_console
_handler
= VnfrConsoleOperdataDtsHandler(
309 dts
, log
, loop
, self
._vnfr
._vnfm
, self
._vnfr
.vnfr_id
, self
._vdur
_id
,self
.vdu_id
)
def vdu_opdata_register(self):
    """Register the VDUR console handler by delegating to its ``register()``."""
    console_handler = self._vdur_console_handler
    yield from console_handler.register()
315 def cp_ip_addr(self
, cp_name
):
316 """ Find ip address by connection point name """
317 if self
._vm
_resp
is not None:
318 for conn_point
in self
._vm
_resp
.connection_points
:
319 if conn_point
.name
== cp_name
:
320 return conn_point
.ip_address
def cp_mac_addr(self, cp_name):
    """Look up the MAC address of a connection point by name.

    Searches the connection points of the stored VM response and returns the
    ``mac_addr`` of the first one whose name equals ``cp_name``. When there
    is no VM response, or no connection point matches, the all-zero MAC
    address string is returned.
    """
    fallback_mac = "00:00:00:00:00:00"
    vm_resp = self._vm_resp
    if vm_resp is None:
        return fallback_mac

    matching_macs = (point.mac_addr
                     for point in vm_resp.connection_points
                     if point.name == cp_name)
    return next(matching_macs, fallback_mac)
331 def cp_id(self
, cp_name
):
332 """ Find connection point id by connection point name """
333 if self
._vm
_resp
is not None:
334 for conn_point
in self
._vm
_resp
.connection_points
:
335 if conn_point
.name
== cp_name
:
336 return conn_point
.connection_point_id
349 """ Return this VDUR's name """
def cloud_account_name(self):
    """Name of the cloud account this VDU should be created in."""
    account_name = self._cloud_account_name
    return account_name
358 def image_name(self
):
359 """ name that should be used to lookup the image on the CMP """
360 if 'image' not in self
._vdud
:
362 return os
.path
.basename(self
._vdud
.image
)
def image_checksum(self):
    """Checksum of the VDU image as given in the VDU descriptor.

    Returns ``None`` when the descriptor does not have the
    ``image_checksum`` field set.
    """
    descriptor = self._vdud
    if not descriptor.has_field("image_checksum"):
        return None
    return descriptor.image_checksum
370 def management_ip(self
):
373 return self
._vm
_resp
.public_ip
if self
._vm
_resp
.has_field('public_ip') else self
._vm
_resp
.management_ip
376 def vm_management_ip(self
):
379 return self
._vm
_resp
.management_ip
382 def operational_status(self
):
383 """ Operational status of this VDU"""
384 op_stats_dict
= {"INIT": "init",
385 "INSTANTIATING": "vm_init_phase",
386 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
389 "TERMINATING": "terminated",
390 "TERMINATED": "terminated",
392 return op_stats_dict
[self
._state
.name
]
396 """ Process VDU message from resmgr"""
397 vdu_fields
= ["vm_flavor",
404 vdu_copy_dict
= {k
: v
for k
, v
in
405 self
._vdud
.as_dict().items() if k
in vdu_fields
}
406 vdur_dict
= {"id": self
._vdur
_id
,
407 "vdu_id_ref": self
._vdud
.id,
408 "operational_status": self
.operational_status
,
409 "operational_status_details": self
._state
_failed
_reason
,
411 if self
.vm_resp
is not None:
412 vdur_dict
.update({"vim_id": self
.vm_resp
.vdu_id
,
413 "flavor_id": self
.vm_resp
.flavor_id
415 if self
._vm
_resp
.has_field('image_id'):
416 vdur_dict
.update({ "image_id": self
.vm_resp
.image_id
})
418 if self
.management_ip
is not None:
419 vdur_dict
["management_ip"] = self
.management_ip
421 if self
.vm_management_ip
is not None:
422 vdur_dict
["vm_management_ip"] = self
.vm_management_ip
424 vdur_dict
.update(vdu_copy_dict
)
426 if self
.vm_resp
is not None:
427 if self
._vm
_resp
.has_field('volumes'):
428 for opvolume
in self
._vm
_resp
.volumes
:
429 vdurvol_data
= [vduvol
for vduvol
in vdur_dict
['volumes'] if vduvol
['name'] == opvolume
.name
]
430 if len(vdurvol_data
) == 1:
431 vdurvol_data
[0]["volume_id"] = opvolume
.volume_id
432 if opvolume
.has_field('custom_meta_data'):
433 metadata_list
= list()
434 for metadata_item
in opvolume
.custom_meta_data
:
435 metadata_list
.append(metadata_item
.as_dict())
436 vdurvol_data
[0]['custom_meta_data'] = metadata_list
438 if self
._vm
_resp
.has_field('supplemental_boot_data'):
439 vdur_dict
['supplemental_boot_data'] = dict()
440 if self
._vm
_resp
.supplemental_boot_data
.has_field('boot_data_drive'):
441 vdur_dict
['supplemental_boot_data']['boot_data_drive'] = self
._vm
_resp
.supplemental_boot_data
.boot_data_drive
442 if self
._vm
_resp
.supplemental_boot_data
.has_field('custom_meta_data'):
443 metadata_list
= list()
444 for metadata_item
in self
._vm
_resp
.supplemental_boot_data
.custom_meta_data
:
445 metadata_list
.append(metadata_item
.as_dict())
446 vdur_dict
['supplemental_boot_data']['custom_meta_data'] = metadata_list
447 if self
._vm
_resp
.supplemental_boot_data
.has_field('config_file'):
449 for file_item
in self
._vm
_resp
.supplemental_boot_data
.config_file
:
450 file_list
.append(file_item
.as_dict())
451 vdur_dict
['supplemental_boot_data']['config_file'] = file_list
456 for intf
, cp_id
, vlr
in self
._int
_intf
:
457 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
459 icp_list
.append({"name": cp
.name
,
461 "type_yang": "VPORT",
462 "ip_address": self
.cp_ip_addr(cp
.id),
463 "mac_address": self
.cp_mac_addr(cp
.id)})
465 ii_list
.append({"name": intf
.name
,
466 "vdur_internal_connection_point_ref": cp
.id,
467 "virtual_interface": {}})
469 vdur_dict
["internal_connection_point"] = icp_list
470 self
._log
.debug("internal_connection_point:%s", vdur_dict
["internal_connection_point"])
471 vdur_dict
["internal_interface"] = ii_list
474 for intf
, cp
, vlr
in self
._ext
_intf
:
475 ei_list
.append({"name": cp
.name
,
476 "vnfd_connection_point_ref": cp
.name
,
477 "virtual_interface": {}})
478 self
._vnfr
.update_cp(cp
.name
,
479 self
.cp_ip_addr(cp
.name
),
480 self
.cp_mac_addr(cp
.name
),
483 vdur_dict
["external_interface"] = ei_list
485 placement_groups
= []
486 for group
in self
._placement
_groups
:
487 placement_groups
.append(group
.as_dict())
488 vdur_dict
['placement_groups_info'] = placement_groups
490 return RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur
.from_dict(vdur_dict
)
493 def resmgr_path(self
):
494 """ path for resource-mgr"""
495 xpath
= self
._project
.add_project("D,/rw-resource-mgr:resource-mgmt" +
497 "/vdu-event-data[event-id='{}']".format(self
._request
_id
))
501 def vm_flavor_msg(self
):
502 """ VM flavor message """
503 flavor
= self
._vdud
.vm_flavor
.__class
__()
504 flavor
.copy_from(self
._vdud
.vm_flavor
)
def vdud_cloud_init(self):
    """Return the cloud-init contents for the VDU.

    The contents are computed through ``self.cloud_init()`` while the cached
    value is still ``None`` and stored on ``self._vdud_cloud_init``; a
    non-None cached value is returned as-is.
    """
    contents = self._vdud_cloud_init
    if contents is None:
        contents = self.cloud_init()
        self._vdud_cloud_init = contents
    return contents
516 def cloud_init(self
):
517 """ Populate cloud_init with cloud-config script from
518 either the inline contents or from the file provided
520 if self
._vdud
.cloud_init
is not None:
521 self
._log
.debug("cloud_init script provided inline %s", self
._vdud
.cloud_init
)
522 return self
._vdud
.cloud_init
523 elif self
._vdud
.cloud_init_file
is not None:
524 # Get cloud-init script contents from the file provided in the cloud_init_file param
525 self
._log
.debug("cloud_init script provided in file %s", self
._vdud
.cloud_init_file
)
526 filename
= self
._vdud
.cloud_init_file
527 self
._vnfd
_package
_store
.refresh()
528 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
529 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
531 return cloud_init_extractor
.read_script(stored_package
, filename
)
532 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
533 self
.instantiation_failed(str(e
))
534 raise VirtualDeploymentUnitRecordError(e
)
536 self
._log
.debug("VDU Instantiation: cloud-init script not provided")
def process_openstack_placement_group_construct(self, vm_create_msg_dict):
    """Copy OpenStack placement constructs into the VM create request.

    Collects host aggregates, availability zones and server groups from
    ``self._placement_groups`` and writes them into ``vm_create_msg_dict``
    under the keys ``'availability_zone'``, ``'server_group'`` and
    ``'host_aggregate'``. A key is only written when at least one matching
    construct was found.

    Args:
        vm_create_msg_dict: request dictionary, updated in place.

    Raises:
        VNFMPlacementGroupError: when more than one availability zone or
            more than one server group is requested for this VDU.
    """
    host_aggregates = []
    availability_zones = []
    server_groups = []

    for group in self._placement_groups:
        if group.has_field('host_aggregate'):
            host_aggregates.extend(
                aggregate.as_dict() for aggregate in group.host_aggregate)
        if group.has_field('availability_zone'):
            availability_zones.append(group.availability_zone.as_dict())
        if group.has_field('server_group'):
            server_groups.append(group.server_group.as_dict())

    if len(availability_zones) > 1:
        self._log.error("Can not launch VDU: %s in multiple availability zones. " +
                        "Requested Zones: %s", self.name, availability_zones)
        # Format the complete message: previously .format() was bound only
        # to the trailing literal, so the VDU name and zones were never
        # substituted into the exception text.
        raise VNFMPlacementGroupError(
            "Can not launch VDU: {} in multiple availability zones. "
            "Requested Zones: {}".format(self.name, availability_zones))
    if availability_zones:
        vm_create_msg_dict['availability_zone'] = availability_zones[0]

    if len(server_groups) > 1:
        self._log.error("Can not launch VDU: %s in multiple Server Group. " +
                        "Requested Groups: %s", self.name, server_groups)
        raise VNFMPlacementGroupError(
            "Can not launch VDU: {} in multiple Server Groups. "
            "Requested Groups: {}".format(self.name, server_groups))
    if server_groups:
        vm_create_msg_dict['server_group'] = server_groups[0]

    if host_aggregates:
        vm_create_msg_dict['host_aggregate'] = host_aggregates
def process_placement_groups(self, vm_create_msg_dict):
    """Process the placement groups and fill the resource-mgr request.

    Does nothing when there are no placement groups. Otherwise all groups
    must share one ``cloud_type`` (enforced with an assertion); for
    ``'openstack'`` the OpenStack construct handler fills
    ``vm_create_msg_dict``, any other cloud type is only logged and ignored.
    """
    groups = self._placement_groups
    if not groups:
        return

    cloud_types = {group.cloud_type for group in groups}
    assert len(cloud_types) == 1
    (cloud_type,) = cloud_types

    if cloud_type != 'openstack':
        self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
        return

    self.process_openstack_placement_group_construct(vm_create_msg_dict)
590 def process_custom_bootdata(self
, vm_create_msg_dict
):
591 """Process the custom boot data"""
592 if 'config_file' not in vm_create_msg_dict
['supplemental_boot_data']:
595 self
._vnfd
_package
_store
.refresh()
596 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
597 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
598 for file_item
in vm_create_msg_dict
['supplemental_boot_data']['config_file']:
599 if 'source' not in file_item
or 'dest' not in file_item
:
601 source
= file_item
['source']
602 # Find source file in scripts dir of VNFD
603 self
._log
.debug("Checking for source config file at %s", source
)
605 source_file_str
= cloud_init_extractor
.read_script(stored_package
, source
)
606 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
607 raise VirtualDeploymentUnitRecordError(e
)
608 # Update source file location with file contents
609 file_item
['source'] = source_file_str
613 def resmgr_msg(self
, config
=None):
614 vdu_fields
= ["vm_flavor",
620 "supplemental_boot_data"]
622 self
._log
.debug("Creating params based on VDUD: %s", self
._vdud
)
623 vdu_copy_dict
= {k
: v
for k
, v
in self
._vdud
.as_dict().items() if k
in vdu_fields
}
625 vm_create_msg_dict
= {
629 if self
.image_name
is not None:
630 vm_create_msg_dict
["image_name"] = self
.image_name
632 if self
.image_checksum
is not None:
633 vm_create_msg_dict
["image_checksum"] = self
.image_checksum
635 vm_create_msg_dict
["allocate_public_address"] = self
._mgmt
_intf
636 if self
._vdud
.has_field('mgmt_vpci'):
637 vm_create_msg_dict
["mgmt_vpci"] = self
._vdud
.mgmt_vpci
639 self
._log
.debug("VDUD: %s", self
._vdud
)
640 if config
is not None:
641 vm_create_msg_dict
['vdu_init'] = {'userdata': config
}
643 if self
._mgmt
_network
:
644 vm_create_msg_dict
['mgmt_network'] = self
._mgmt
_network
647 for intf
, cp
, vlr
in self
._ext
_intf
:
648 cp_info
= { "name": cp
.name
,
649 "virtual_link_id": vlr
.network_id
,
650 "type_yang": intf
.virtual_interface
.type_yang
}
652 if cp
.has_field('port_security_enabled'):
653 cp_info
["port_security_enabled"] = cp
.port_security_enabled
655 if (intf
.virtual_interface
.has_field('vpci') and
656 intf
.virtual_interface
.vpci
is not None):
657 cp_info
["vpci"] = intf
.virtual_interface
.vpci
659 if (vlr
.has_field('ip_profile_params')) and (vlr
.ip_profile_params
.has_field('security_group')):
660 cp_info
['security_group'] = vlr
.ip_profile_params
.security_group
662 cp_list
.append(cp_info
)
664 for intf
, cp
, vlr
in self
._int
_intf
:
665 if (intf
.virtual_interface
.has_field('vpci') and
666 intf
.virtual_interface
.vpci
is not None):
667 cp_list
.append({"name": cp
,
668 "virtual_link_id": vlr
.network_id
,
669 "type_yang": intf
.virtual_interface
.type_yang
,
670 "vpci": intf
.virtual_interface
.vpci
})
672 if cp
.has_field('port_security_enabled'):
673 cp_list
.append({"name": cp
,
674 "virtual_link_id": vlr
.network_id
,
675 "type_yang": intf
.virtual_interface
.type_yang
,
676 "port_security_enabled": cp
.port_security_enabled
})
678 cp_list
.append({"name": cp
,
679 "virtual_link_id": vlr
.network_id
,
680 "type_yang": intf
.virtual_interface
.type_yang
})
683 vm_create_msg_dict
["connection_points"] = cp_list
684 vm_create_msg_dict
.update(vdu_copy_dict
)
686 self
.process_placement_groups(vm_create_msg_dict
)
687 if 'supplemental_boot_data' in vm_create_msg_dict
:
688 self
.process_custom_bootdata(vm_create_msg_dict
)
690 msg
= RwResourceMgrYang
.VDUEventData()
691 msg
.event_id
= self
._request
_id
692 msg
.cloud_account
= self
.cloud_account_name
693 msg
.request_info
.from_dict(vm_create_msg_dict
)
698 def terminate(self
, xact
):
699 """ Delete resource in VIM """
700 if self
._state
!= VDURecordState
.READY
and self
._state
!= VDURecordState
.FAILED
:
701 self
._log
.warning("VDU terminate in not ready state - Ignoring request")
704 self
._state
= VDURecordState
.TERMINATING
705 if self
._vm
_resp
is not None:
707 with self
._dts
.transaction() as new_xact
:
708 yield from self
.delete_resource(new_xact
)
710 self
._log
.exception("Caught exception while deleting VDU %s", self
.vdu_id
)
712 if self
._rm
_regh
is not None:
713 self
._log
.debug("Deregistering resource manager registration handle")
714 self
._rm
_regh
.deregister()
717 if self
._vdur
_console
_handler
is not None:
718 self
._log
.debug("Deregistering vnfr vdur registration handle")
719 self
._vdur
_console
_handler
._regh
.deregister()
720 self
._vdur
_console
_handler
._regh
= None
722 self
._state
= VDURecordState
.TERMINATED
724 def find_internal_cp_by_cp_id(self
, cp_id
):
725 """ Find the CP corresponding to the connection point id"""
728 self
._log
.debug("find_internal_cp_by_cp_id(%s) called",
731 for int_cp
in self
._vdud
.internal_connection_point
:
732 self
._log
.debug("Checking for int cp %s in internal connection points",
734 if int_cp
.id == cp_id
:
739 self
._log
.debug("Failed to find cp %s in internal connection points",
741 msg
= "Failed to find cp %s in internal connection points" % cp_id
742 raise VduRecordError(msg
)
744 # return the VLR associated with the connection point
748 def create_resource(self
, xact
, vnfr
, config
=None):
749 """ Request resource from ResourceMgr """
750 def find_cp_by_name(cp_name
):
751 """ Find a connection point by name """
753 self
._log
.debug("find_cp_by_name(%s) called", cp_name
)
754 for ext_cp
in vnfr
._cprs
:
755 self
._log
.debug("Checking ext cp (%s) called", ext_cp
.name
)
756 if ext_cp
.name
== cp_name
:
760 self
._log
.debug("Failed to find cp %s in external connection points",
764 def find_internal_vlr_by_cp_name(cp_name
):
765 """ Find the VLR corresponding to the connection point name"""
768 self
._log
.debug("find_internal_vlr_by_cp_name(%s) called",
771 for int_cp
in self
._vdud
.internal_connection_point
:
772 self
._log
.debug("Checking for int cp %s in internal connection points",
774 if int_cp
.id == cp_name
:
779 self
._log
.debug("Failed to find cp %s in internal connection points",
781 msg
= "Failed to find cp %s in internal connection points" % cp_name
782 raise VduRecordError(msg
)
784 # return the VLR associated with the connection point
785 return vnfr
.find_vlr_by_cp(cp_name
)
787 block
= xact
.block_create()
789 self
._log
.debug("Executing vm request id: %s, action: create",
792 # Resolve the networks associated external interfaces
793 for ext_intf
in self
._vdud
.external_interface
:
794 self
._log
.debug("Resolving external interface name [%s], cp[%s]",
795 ext_intf
.name
, ext_intf
.vnfd_connection_point_ref
)
796 cp
= find_cp_by_name(ext_intf
.vnfd_connection_point_ref
)
798 self
._log
.debug("Failed to find connection point - %s",
799 ext_intf
.vnfd_connection_point_ref
)
801 self
._log
.debug("Connection point name [%s], type[%s]",
802 cp
.name
, cp
.type_yang
)
804 vlr
= vnfr
.ext_vlr_by_id(cp
.vlr_ref
)
806 etuple
= (ext_intf
, cp
, vlr
)
807 self
._ext
_intf
.append(etuple
)
809 self
._log
.debug("Created external interface tuple : %s", etuple
)
811 # Resolve the networks associated internal interfaces
812 for intf
in self
._vdud
.internal_interface
:
813 cp_id
= intf
.vdu_internal_connection_point_ref
814 self
._log
.debug("Resolving internal interface name [%s], cp[%s]",
818 vlr
= find_internal_vlr_by_cp_name(cp_id
)
819 except Exception as e
:
820 self
._log
.debug("Failed to find cp %s in internal VLR list", cp_id
)
821 msg
= "Failed to find cp %s in internal VLR list, e = %s" % (cp_id
, e
)
822 raise VduRecordError(msg
)
824 ituple
= (intf
, cp_id
, vlr
)
825 self
._int
_intf
.append(ituple
)
827 self
._log
.debug("Created internal interface tuple : %s", ituple
)
829 resmgr_path
= self
.resmgr_path
830 resmgr_msg
= self
.resmgr_msg(config
)
832 self
._log
.debug("Creating new VM request at: %s, params: %s", resmgr_path
, resmgr_msg
)
833 block
.add_query_create(resmgr_path
, resmgr_msg
)
835 res_iter
= yield from block
.execute(now
=True)
843 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
844 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
845 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
846 return resp
.resource_info
849 def delete_resource(self
, xact
):
850 block
= xact
.block_create()
852 self
._log
.debug("Executing vm request id: %s, action: delete",
855 block
.add_query_delete(self
.resmgr_path
)
857 yield from block
.execute(flags
=0, now
=True)
860 def read_resource(self
, xact
):
861 block
= xact
.block_create()
863 self
._log
.debug("Executing vm request id: %s, action: delete",
866 block
.add_query_read(self
.resmgr_path
)
868 res_iter
= yield from block
.execute(flags
=0, now
=True)
873 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
874 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
875 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
876 #self._vm_resp = resp.resource_info
877 return resp
.resource_info
881 def start_component(self
):
882 """ This VDUR is active """
883 self
._log
.debug("Starting component %s for vdud %s vdur %s",
884 self
._vdud
.vcs_component_ref
,
887 yield from self
._vnfr
.start_component(self
._vdud
.vcs_component_ref
,
888 self
.vm_resp
.management_ip
)
892 """ Is this VDU active """
893 return True if self
._state
is VDURecordState
.READY
else False
def instantiation_failed(self, failed_reason=None):
    """Mark this VDU as failed and propagate the failure to the VNFR.

    Sets the record state to ``VDURecordState.FAILED``, stores
    ``failed_reason`` and then delegates to
    ``self._vnfr.instantiation_failed`` with the same reason.

    Args:
        failed_reason: optional description of the failure.
    """
    self._log.debug("VDU %s instantiation failed ", self._vdur_id)
    self._state, self._state_failed_reason = VDURecordState.FAILED, failed_reason
    owning_vnfr = self._vnfr
    yield from owning_vnfr.instantiation_failed(failed_reason)
904 def vdu_is_active(self
):
905 """ This VDU is active"""
907 self
._log
.warning("VDU %s was already marked as active", self
._vdur
_id
)
910 self
._log
.debug("VDUR id %s in VNFR %s is active", self
._vdur
_id
, self
._vnfr
.vnfr_id
)
912 if self
._vdud
.vcs_component_ref
is not None:
913 yield from self
.start_component()
915 self
._state
= VDURecordState
.READY
917 if self
._vnfr
.all_vdus_active():
918 self
._log
.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self
._vnfr
)
919 yield from self
._vnfr
.is_ready()
922 def instantiate(self
, xact
, vnfr
, config
=None):
923 """ Instantiate this VDU """
924 self
._state
= VDURecordState
.INSTANTIATING
927 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
928 """ This VDUR is active """
929 self
._log
.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
934 if (query_action
== rwdts
.QueryAction
.UPDATE
or
935 query_action
== rwdts
.QueryAction
.CREATE
):
938 if msg
.resource_state
== "active":
939 # Move this VDU to ready state
940 yield from self
.vdu_is_active()
941 elif msg
.resource_state
== "failed":
942 yield from self
.instantiation_failed(msg
.resource_errors
)
943 elif query_action
== rwdts
.QueryAction
.DELETE
:
944 self
._log
.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
946 raise NotImplementedError(
947 "%s action on VirtualDeployementUnitRecord not supported",
950 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
953 reg_event
= asyncio
.Event(loop
=self
._loop
)
956 def on_ready(regh
, status
):
959 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
, on_ready
=on_ready
)
960 self
._rm
_regh
= yield from self
._dts
.register(self
.resmgr_path
+ '/resource-info',
961 flags
=rwdts
.Flag
.SUBSCRIBER
,
963 yield from reg_event
.wait()
965 vm_resp
= yield from self
.create_resource(xact
, vnfr
, config
)
966 self
._vm
_resp
= vm_resp
967 self
._state
= VDURecordState
.RESOURCE_ALLOC_PENDING
969 self
._log
.debug("Requested VM from resource manager response %s",
971 if vm_resp
.resource_state
== "active":
972 self
._log
.debug("Resourcemgr responded wih an active vm resp %s",
974 yield from self
.vdu_is_active()
975 self
._state
= VDURecordState
.READY
976 elif (vm_resp
.resource_state
== "pending" or
977 vm_resp
.resource_state
== "inactive"):
978 self
._log
.debug("Resourcemgr responded wih a pending vm resp %s",
980 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
981 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
982 # flags=rwdts.Flag.SUBSCRIBER,
985 self
._log
.debug("Resourcemgr responded wih an error vm resp %s",
987 raise VirtualDeploymentUnitRecordError(
988 "Failed VDUR instantiation %s " % vm_resp
)
990 except Exception as e
:
992 traceback
.print_exc()
993 self
._log
.exception(e
)
994 self
._log
.error("Instantiation of VDU record failed: %s", str(e
))
995 self
._state
= VDURecordState
.FAILED
996 yield from self
.instantiation_failed(str(e
))
999 class VlRecordState(enum
.Enum
):
1000 """ VL Record State """
1002 INSTANTIATION_PENDING
= 102
1004 TERMINATE_PENDING
= 104
1009 class InternalVirtualLinkRecord(object):
1010 """ Internal Virtual Link record """
1011 def __init__(self
, dts
, log
, loop
, project
,
1012 ivld_msg
, vnfr_name
, cloud_account_name
, ip_profile
=None):
1016 self
._project
= project
1017 self
._ivld
_msg
= ivld_msg
1018 self
._vnfr
_name
= vnfr_name
1019 self
._cloud
_account
_name
= cloud_account_name
1020 self
._ip
_profile
= ip_profile
1022 self
._vlr
_req
= self
.create_vlr()
1024 self
._state
= VlRecordState
.INIT
1028 """ Find VLR by id """
1029 return self
._vlr
_req
.id
1033 """ Name of this VL """
1034 if self
._ivld
_msg
.vim_network_name
:
1035 return self
._ivld
_msg
.vim_network_name
1037 return self
._vnfr
_name
+ "." + self
._ivld
_msg
.name
def network_id(self):
    """Network id of the held VLR, or ``None`` when no VLR is set."""
    vlr = self._vlr
    if not vlr:
        return None
    return vlr.network_id
1045 """ VLR path for this VLR instance"""
1046 return self
._project
.add_project("D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".
1047 format(self
.vlr_id
))
1049 def create_vlr(self
):
1050 """ Create the VLR record which will be instantiated """
1052 vld_fields
= ["short_name",
1060 vld_copy_dict
= {k
: v
for k
, v
in self
._ivld
_msg
.as_dict().items() if k
in vld_fields
}
1062 vlr_dict
= {"id": str(uuid
.uuid4()),
1064 "cloud_account": self
._cloud
_account
_name
,
1067 if self
._ip
_profile
and self
._ip
_profile
.has_field('ip_profile_params'):
1068 vlr_dict
['ip_profile_params' ] = self
._ip
_profile
.ip_profile_params
.as_dict()
1070 vlr_dict
.update(vld_copy_dict
)
1072 vlr
= RwVlrYang
.YangData_RwProject_Project_VlrCatalog_Vlr
.from_dict(vlr_dict
)
1076 def instantiate(self
, xact
, restart_mode
=False):
1077 """ Instantiate VL """
1080 def instantiate_vlr():
1081 """ Instantiate VLR"""
1082 self
._log
.debug("Create VL with xpath %s and vlr %s",
1083 self
.vlr_path(), self
._vlr
_req
)
1085 with self
._dts
.transaction(flags
=0) as xact
:
1086 block
= xact
.block_create()
1087 block
.add_query_create(xpath
=self
.vlr_path(), msg
=self
._vlr
_req
)
1088 self
._log
.debug("Executing VL create path:%s msg:%s",
1089 self
.vlr_path(), self
._vlr
_req
)
1093 res_iter
= yield from block
.execute()
1095 self
._state
= VlRecordState
.FAILED
1096 self
._log
.exception("Caught exception while instantial VL")
1099 for ent
in res_iter
:
1100 res
= yield from ent
1101 self
._vlr
= res
.result
1103 if self
._vlr
.operational_status
== 'failed':
1104 self
._log
.debug("VL creation failed for vlr id %s", self
._vlr
.id)
1105 self
._state
= VlRecordState
.FAILED
1106 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self
._vlr
.id))
1108 self
._log
.info("Created VL with xpath %s and vlr %s",
1109 self
.vlr_path(), self
._vlr
)
1113 """ Get the network id """
1114 res_iter
= yield from self
._dts
.query_read(self
.vlr_path(), rwdts
.XactFlag
.MERGE
)
1116 for ent
in res_iter
:
1117 res
= yield from ent
1121 err
= "Failed to get VLR for path %s" % self
.vlr_path()
1123 raise InternalVirtualLinkRecordError(err
)
1126 self
._state
= VlRecordState
.INSTANTIATION_PENDING
1129 vl
= yield from get_vlr()
1131 yield from instantiate_vlr()
1133 yield from instantiate_vlr()
1135 self
._state
= VlRecordState
.ACTIVE
def vlr_in_vns(self):
    """Is there a VLR record in VNS?

    True while the record state is ACTIVE, INSTANTIATION_PENDING or FAILED;
    False for every other state.
    """
    states_with_vlr = (
        VlRecordState.ACTIVE,
        VlRecordState.INSTANTIATION_PENDING,
        VlRecordState.FAILED,
    )
    return self._state in states_with_vlr
def terminate(self, xact):
    """Terminate this VL by deleting its VLR entry.

    The request is ignored (with a debug log) when ``vlr_in_vns()`` is
    false. Otherwise the state moves to TERMINATE_PENDING, a delete query
    for ``vlr_path()`` is executed on a block created from ``xact``, and the
    state moves to TERMINATED.

    Args:
        xact: transaction used to create the query block.
    """
    if not self.vlr_in_vns():
        self._log.debug("Ignoring terminate request for id %s in state %s",
                        self.vlr_id, self._state)
        return

    self._log.debug("Terminating VL with path %s", self.vlr_path())
    self._state = VlRecordState.TERMINATE_PENDING

    # Issue the delete for the VLR path and wait for it to complete before
    # recording the terminal state.
    delete_block = xact.block_create()
    delete_block.add_query_delete(self.vlr_path())
    yield from delete_block.execute(flags=0, now=True)

    self._state = VlRecordState.TERMINATED
    self._log.debug("Terminated VL with path %s", self.vlr_path())
1163 class VirtualNetworkFunctionRecord(object):
1164 """ Virtual Network Function Record """
def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
    """Build the record for one VNFR message.

    Arguments:
        dts          - DTS handle used for queries and transactions
        log          - logger
        loop         - event loop used for sleeps and background tasks
        cluster_name - stored as ``_cluster_name``
        vnfm         - owning VNF manager (also supplies the project)
        vcs_handler  - stored as ``_vcs_handler``
        vnfr_msg     - VNFR message; its id, vnfd and config_status are read
        mgmt_network - optional management network, stored as given
    """
    # Handles used by the rest of the class; the logger must be set before
    # the package store below is created with it.
    self._dts = dts
    self._log = log
    self._loop = loop
    self._project = vnfm._project
    self._cluster_name = cluster_name
    self._vnfr_msg = vnfr_msg
    self._vnfr_id = vnfr_msg.id
    self._vnfd_id = vnfr_msg.vnfd.id
    self._vnfm = vnfm
    self._vcs_handler = vcs_handler
    self._vnfr = vnfr_msg
    self._mgmt_network = mgmt_network

    self._vnfd = vnfr_msg.vnfd
    self._state = VirtualNetworkFunctionRecordState.INIT
    self._state_failed_reason = None
    self._ext_vlrs = {}  # The list of external virtual links
    self._vlrs = []  # The list of internal virtual links
    self._vdus = []  # The list of vdu
    self._vlr_by_cp = {}
    self._cprs = []  # Connection point records, iterated by cp_ip_addr/update_cp
    self._inventory = {}
    self._create_time = int(time.time())
    self._vnf_mon = None
    self._config_status = vnfr_msg.config_status
    self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
    self._rw_vnfd = None
    self._vnfd_ref_count = 0
def _get_vdur_from_vdu_id(self, vdu_id):
    """Return the VDU record whose ``vdu_id`` equals ``vdu_id``.

    Raises:
        VDURecordNotFound - no record in ``self._vdus`` has that id
    """
    self._log.debug("Finding vdur for vdu_id %s", vdu_id)
    self._log.debug("Searching through vdus: %s", self._vdus)
    for vdu in self._vdus:
        self._log.debug("vdu_id: %s", vdu.vdu_id)
        if vdu.vdu_id == vdu_id:
            return vdu

    # Exceptions do not %-format their arguments the way loggers do, so
    # build the message explicitly.
    raise VDURecordNotFound("Could not find vdu record from id: %s" % vdu_id)
def operational_status(self):
    """Operational status of this VNFR.

    Maps the name of the current record state to the status string placed
    in the published VNFR message.
    """
    op_status_map = {"INIT": "init",
                     "VL_INIT_PHASE": "vl_init_phase",
                     "VM_INIT_PHASE": "vm_init_phase",
                     # NOTE(review): READY is set by this class but had no
                     # entry; "running" is assumed to be the published
                     # value - confirm against the vnfr YANG model.
                     "READY": "running",
                     "TERMINATE": "terminate",
                     "VL_TERMINATE_PHASE": "vl_terminate_phase",
                     "VDU_TERMINATE_PHASE": "vm_terminate_phase",
                     "TERMINATED": "terminated",
                     "FAILED": "failed", }
    return op_status_map[self._state.name]
def vnfd_xpath(vnfd_id):
    """VNFD xpath associated with this VNFR.

    Arguments:
        vnfd_id - id substituted into the catalog key predicate

    Returns:
        The config ("C,") xpath of the VNFD catalog entry with that id.
    """
    return ("C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd[project-vnfd:id = '{}']".
            format(vnfd_id))
def vnfd_ref_count(self):
    """Current number of references held on this record's VNFD."""
    count = self._vnfd_ref_count
    return count
def vnfd_in_use(self):
    """True when at least one reference is held on the VNFD, else False."""
    return self._vnfd_ref_count > 0
1235 """ Take a reference on this object """
1236 self
._vnfd
_ref
_count
+= 1
1237 return self
._vnfd
_ref
_count
def vnfd_unref(self):
    """Drop one reference on the VNFD and return the remaining count.

    Raises:
        VnfRecordError - the count is already below one
    """
    current = self._vnfd_ref_count
    if current < 1:
        msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
               (self.vnfd.id, current))
        self._log.critical(msg)
        raise VnfRecordError(msg)

    # The count logged is the value before the decrement.
    self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
                    self.vnfd.id, current)
    self._vnfd_ref_count = current - 1
    return self._vnfd_ref_count
1253 """ VNFD for this VNFR """
1258 """ VNFD name associated with this VNFR """
1259 return self
.vnfd
.name
1263 """ Name of this VNF in the record """
1264 return self
._vnfr
.name
def cloud_account_name(self):
    """Cloud account recorded on the VNFR message (``cloud_account``)."""
    record = self._vnfr
    return record.cloud_account
1273 """ VNFD Id associated with this VNFR """
1278 """ VNFR Id associated with this VNFR """
1279 return self
._vnfr
_id
def member_vnf_index(self):
    """Member VNF index recorded on the VNFR message (``member_vnf_index_ref``)."""
    record = self._vnfr
    return record.member_vnf_index_ref
def config_status(self):
    """Config status held for this VNFR (taken from the VNFR message at construction)."""
    status = self._config_status
    return status
1291 def component_by_name(self
, component_name
):
1292 """ Find a component by name in the inventory list"""
1293 mangled_name
= VcsComponent
.mangle_name(component_name
,
1296 return self
._inventory
[mangled_name
]
def get_nsr_config(self):
    """Look up the NS instance configuration this VNFR belongs to.

    Generator-based coroutine. Reads ``ns-instance-config`` for the project
    and scans its ``nsr`` entries for the one whose id equals the VNFR's
    ``nsr_id_ref``.

    Returns:
        The matching nsr entry, or None when no entry matches.
    """
    ### Need access to NS instance configuration for runtime resolution.
    ### This shall be replaced when deployment flavors are implemented
    xpath = self._project.add_project("C,/nsr:ns-instance-config")
    results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)

    for result in results:
        entry = yield from result
        ns_instance_config = entry.result
        for nsr in ns_instance_config.nsr:
            if nsr.id == self._vnfr_msg.nsr_id_ref:
                return nsr

    return None
def start_component(self, component_name, ip_addr):
    """Start the named inventory component, passing it ``ip_addr``.

    Generator-based coroutine; delegates to the component's ``start``.
    """
    yield from self.component_by_name(component_name).start(None, None, ip_addr)
def cp_ip_addr(self, cp_name):
    """Return the ip address of the first connection point record named
    ``cp_name`` that has an address assigned."""
    self._log.debug("cp_ip_addr()")
    for cpr in self._cprs:
        if cpr.name != cp_name or cpr.ip_address is None:
            continue
        return cpr.ip_address
def mgmt_intf_info(self):
    """Get management interface info for this VNFR.

    The address is taken, in order of preference, from the connection point
    named by the descriptor's ``cp`` field, from the VDU record named by
    its ``vdu_id`` field, or from the descriptor's static ``ip_address``.

    Returns:
        (ip_addr, port) - ip_addr is None when the referenced VDU record
        cannot be found.
    """
    mgmt_intf_desc = self.vnfd.mgmt_interface
    ip_addr = None

    if mgmt_intf_desc.has_field("cp"):
        ip_addr = self.cp_ip_addr(mgmt_intf_desc.cp)
    elif mgmt_intf_desc.has_field("vdu_id"):
        try:
            vdur = self._get_vdur_from_vdu_id(mgmt_intf_desc.vdu_id)
            ip_addr = vdur.management_ip
        except VDURecordNotFound:
            self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
            ip_addr = None
    else:
        # Only fall back to the static address when neither reference is set,
        # otherwise it would overwrite the address resolved above.
        ip_addr = mgmt_intf_desc.ip_address
    port = mgmt_intf_desc.port

    return ip_addr, port
1350 """ Message associated with this VNFR """
1351 vnfd_fields
= ["short_name", "vendor", "description", "version"]
1352 vnfd_copy_dict
= {k
: v
for k
, v
in self
.vnfd
.as_dict().items() if k
in vnfd_fields
}
1354 mgmt_intf
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_MgmtInterface()
1355 ip_address
, port
= self
.mgmt_intf_info()
1357 if ip_address
is not None:
1358 mgmt_intf
.ip_address
= ip_address
1359 if port
is not None:
1360 mgmt_intf
.port
= port
1362 vnfr_dict
= {"id": self
._vnfr
_id
,
1363 "nsr_id_ref": self
._vnfr
_msg
.nsr_id_ref
,
1365 "member_vnf_index_ref": self
.member_vnf_index
,
1366 "operational_status": self
.operational_status
,
1367 "operational_status_details": self
._state
_failed
_reason
,
1368 "cloud_account": self
.cloud_account_name
,
1369 "config_status": self
._config
_status
1372 vnfr_dict
.update(vnfd_copy_dict
)
1374 vnfr_msg
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
1375 vnfr_msg
.vnfd
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd
.from_dict(self
.vnfd
.as_dict())
1377 vnfr_msg
.create_time
= self
._create
_time
1378 vnfr_msg
.uptime
= int(time
.time()) - self
._create
_time
1379 vnfr_msg
.mgmt_interface
= mgmt_intf
1381 # Add all the VLRs to VNFR
1382 for vlr
in self
._vlrs
:
1383 ivlr
= vnfr_msg
.internal_vlr
.add()
1384 ivlr
.vlr_ref
= vlr
.vlr_id
1386 # Add all the VDURs to VDUR
1387 if self
._vdus
is not None:
1388 for vdu
in self
._vdus
:
1389 vdur
= vnfr_msg
.vdur
.add()
1390 vdur
.from_dict(vdu
.msg
.as_dict())
1392 if self
.vnfd
.mgmt_interface
.has_field('dashboard_params'):
1393 vnfr_msg
.dashboard_url
= self
.dashboard_url
1395 for cpr
in self
._cprs
:
1396 new_cp
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr
.as_dict())
1397 vnfr_msg
.connection_point
.append(new_cp
)
1399 if self
._vnf
_mon
is not None:
1400 for monp
in self
._vnf
_mon
.msg
:
1401 vnfr_msg
.monitoring_param
.append(
1402 VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam
.from_dict(monp
.as_dict()))
1404 if self
._vnfr
.vnf_configuration
is not None:
1405 vnfr_msg
.vnf_configuration
.from_dict(self
._vnfr
.vnf_configuration
.as_dict())
1406 if (ip_address
is not None and
1407 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
is None):
1408 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
= ip_address
1410 for group
in self
._vnfr
_msg
.placement_groups_info
:
1411 group_info
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1412 group_info
.from_dict(group
.as_dict())
1413 vnfr_msg
.placement_groups_info
.append(group_info
)
def dashboard_url(self):
    """Build the dashboard URL from the descriptor's ``dashboard_params``.

    The host is the management ip from ``mgmt_intf_info``. An explicit
    ``port`` in the dashboard params overrides the protocol default.
    """
    ip, cfg_port = self.mgmt_intf_info()
    dashboard_params = self.vnfd.mgmt_interface.dashboard_params

    # NOTE(review): defaults assumed to be the standard http/https ports -
    # confirm against the descriptor model.
    protocol = 'http'
    http_port = 80
    if dashboard_params.has_field('https'):
        if dashboard_params.https is True:
            protocol = 'https'
            http_port = 443

    if dashboard_params.has_field('port'):
        http_port = dashboard_params.port

    url = "{protocol}://{ip_address}:{port}/{path}".format(
        protocol=protocol,
        ip_address=ip,
        port=http_port,
        path=dashboard_params.path.lstrip("/"),
    )

    return url
1440 """ path for this VNFR """
1441 return self
._project
.add_project("D,/vnfr:vnfr-catalog"
1442 "/vnfr:vnfr[vnfr:id='{}']".format(self
.vnfr_id
))
def publish(self, xact):
    """Publish this VNFR.

    Generator-based coroutine. The record message is built once, stamped
    with the creation time and handed to the VNF manager, so the message
    that is logged is the one that is published.

    Arguments:
        xact - transaction passed through to the manager's ``publish_vnfr``
    """
    vnfr = self.msg
    path = self.xpath
    self._log.debug("Publishing VNFR path = [%s], record = [%s]",
                    path, vnfr)
    vnfr.create_time = self._create_time
    yield from self._vnfm.publish_vnfr(xact, path, vnfr)
    self._log.debug("Published VNFR path = [%s], record = [%s]",
                    path, vnfr)
def resolve_vld_ip_profile(self, vnfd_msg, vld):
    """Resolve the ip profile referenced by a virtual link descriptor.

    Arguments:
        vnfd_msg - descriptor whose ``ip_profiles`` are searched by name
        vld      - link descriptor carrying an optional ``ip_profile_ref``

    Returns:
        The first profile whose name equals the reference, or None when the
        link has no reference or no profile matches.
    """
    self._log.debug("Receieved ip profile ref is %s", vld.ip_profile_ref)
    if not vld.has_field('ip_profile_ref'):
        return None

    return next((profile for profile in vnfd_msg.ip_profiles
                 if profile.name == vld.ip_profile_ref), None)
1463 def create_vls(self
):
1464 """ Publish The VLs associated with this VNF """
1465 self
._log
.debug("Publishing Internal Virtual Links for vnfd id: %s",
1467 for ivld_msg
in self
.vnfd
.internal_vld
:
1468 self
._log
.debug("Creating internal vld:"
1469 " %s, int_cp_ref = %s",
1470 ivld_msg
, ivld_msg
.internal_connection_point
1472 vlr
= InternalVirtualLinkRecord(dts
=self
._dts
,
1476 vnfr_name
=self
.name
,
1477 cloud_account_name
=self
.cloud_account_name
,
1478 ip_profile
=self
.resolve_vld_ip_profile(self
.vnfd
, ivld_msg
)
1480 self
._vlrs
.append(vlr
)
1482 for int_cp
in ivld_msg
.internal_connection_point
:
1483 if int_cp
.id_ref
in self
._vlr
_by
_cp
:
1484 msg
= ("Connection point %s already "
1485 " bound %s" % (int_cp
.id_ref
, self
._vlr
_by
_cp
[int_cp
.id_ref
]))
1486 raise InternalVirtualLinkRecordError(msg
)
1487 self
._log
.debug("Setting vlr %s to internal cp = %s",
1489 self
._vlr
_by
_cp
[int_cp
.id_ref
] = vlr
def instantiate_vls(self, xact, restart_mode=False):
    """Instantiate the VLs associated with this VNF.

    Generator-based coroutine; instantiates each internal virtual link
    record in list order, one after the other.

    Arguments:
        xact         - transaction handed to every record
        restart_mode - forwarded unchanged to every record
    """
    self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
                    self._vnfd_id)

    for vlr in self._vlrs:
        self._log.debug("Instantiating VLR %s", vlr)
        yield from vlr.instantiate(xact, restart_mode)
def find_vlr_by_cp(self, cp_name):
    """Look up the VLR bound to connection point ``cp_name``.

    Raises:
        KeyError - no VLR is bound to that connection point
    """
    bindings = self._vlr_by_cp
    return bindings[cp_name]
def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
    """
    Returns the cloud specific construct for placement group

    Arguments:
        input_group: VNFD PlacementGroup
        nsr_config: Configuration for VNFDGroup MAP in the NSR config

    Returns:
        A VDUR placement-group message built from the first NSR map entry
        that references ``input_group`` for this VNFD, or None when there
        is no such entry.
    """
    copy_dict = ['name', 'requirement', 'strategy']
    for group_info in nsr_config.vnfd_placement_group_maps:
        if group_info.placement_group_ref == input_group.name and \
           group_info.vnfd_id_ref == self.vnfd_id:
            group = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
            # Everything in the map entry except its two reference keys.
            group_dict = {k: v for k, v in
                          group_info.as_dict().items()
                          if (k != 'placement_group_ref' and k != 'vnfd_id_ref')}
            # name/requirement/strategy come from the descriptor group.
            for param in copy_dict:
                group_dict.update({param: getattr(input_group, param)})
            group.from_dict(group_dict)
            return group
    return None
1527 def get_vdu_placement_groups(self
, vdu
):
1528 placement_groups
= []
1529 ### Step-1: Get VNF level placement groups
1530 for group
in self
._vnfr
_msg
.placement_groups_info
:
1531 #group_info = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1532 #group_info.from_dict(group.as_dict())
1533 placement_groups
.append(group
)
1535 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1536 nsr_config
= yield from self
.get_nsr_config()
1538 ### Step-3: Get VDU level placement groups
1539 for group
in self
.vnfd
.placement_groups
:
1540 for member_vdu
in group
.member_vdus
:
1541 if member_vdu
.member_vdu_ref
== vdu
.id:
1542 group_info
= self
.resolve_placement_group_cloud_construct(group
,
1544 if group_info
is None:
1545 self
._log
.info("Could not resolve cloud-construct for " +
1546 "placement group: %s", group
.name
)
1548 self
._log
.info("Successfully resolved cloud construct for " +
1549 "placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1553 self
.member_vnf_index
)
1554 placement_groups
.append(group_info
)
1556 return placement_groups
def vdu_cloud_init_instantiation(self):
    """Read ``vdud_cloud_init`` on every VDU record, discarding the values.

    Any exception raised while reading the attribute propagates to the
    caller.
    """
    for record in self._vdus:
        _ = record.vdud_cloud_init
1563 def create_vdus(self
, vnfr
, restart_mode
=False):
1564 """ Create the VDUs associated with this VNF """
1566 def get_vdur_id(vdud
):
1567 """Get the corresponding VDUR's id for the VDUD. This is useful in
1570 In restart mode we check for exiting VDUR's ID and use them, if
1571 available. This way we don't end up creating duplicate VDURs
1575 if restart_mode
and vdud
is not None:
1577 vdur
= [vdur
.id for vdur
in vnfr
._vnfr
.vdur
if vdur
.vdu_id_ref
== vdud
.id]
1580 self
._log
.error("Unable to find a VDUR for VDUD {}".format(vdud
))
1585 self
._log
.info("Creating VDU's for vnfd id: %s", self
.vnfd_id
)
1586 for vdu
in self
._rw
_vnfd
.vdu
:
1587 self
._log
.debug("Creating vdu: %s", vdu
)
1588 vdur_id
= get_vdur_id(vdu
)
1590 placement_groups
= yield from self
.get_vdu_placement_groups(vdu
)
1591 self
._log
.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1594 self
.member_vnf_index
,
1595 [ group
.name
for group
in placement_groups
])
1597 vdur
= VirtualDeploymentUnitRecord(
1601 project
= self
._project
,
1604 mgmt_intf
=self
.has_mgmt_interface(vdu
),
1605 mgmt_network
=self
._mgmt
_network
,
1606 cloud_account_name
=self
.cloud_account_name
,
1607 vnfd_package_store
=self
._vnfd
_package
_store
,
1609 placement_groups
= placement_groups
,
1611 yield from vdur
.vdu_opdata_register()
1613 self
._vdus
.append(vdur
)
1616 def instantiate_vdus(self
, xact
, vnfr
):
1617 """ Instantiate the VDUs associated with this VNF """
1618 self
._log
.debug("Instantiating VDU's for vnfd id %s: %s", self
.vnfd_id
, self
._vdus
)
1620 lookup
= {vdu
.vdu_id
: vdu
for vdu
in self
._vdus
}
1622 # Identify any dependencies among the VDUs
1623 dependencies
= collections
.defaultdict(list)
1624 vdu_id_pattern
= re
.compile(r
"\{\{ vdu\[([^]]+)\]\S* \}\}")
1626 for vdu
in self
._vdus
:
1627 if vdu
._vdud
_cloud
_init
is not None:
1628 for vdu_id
in vdu_id_pattern
.findall(vdu
._vdud
_cloud
_init
):
1629 if vdu_id
!= vdu
.vdu_id
:
1630 # This means that vdu.vdu_id depends upon vdu_id,
1631 # i.e. vdu_id must be instantiated before
1633 dependencies
[vdu
.vdu_id
].append(lookup
[vdu_id
])
1635 # Define the terminal states of VDU instantiation
1637 VDURecordState
.READY
,
1638 VDURecordState
.TERMINATED
,
1639 VDURecordState
.FAILED
,
1642 datastore
= VdurDatastore()
1646 def instantiate_monitor(vdu
):
1647 """Monitor the state of the VDU during instantiation
1650 vdu - a VirtualDeploymentUnitRecord
1653 # wait for the VDUR to enter a terminal state
1654 while vdu
._state
not in terminal
:
1655 yield from asyncio
.sleep(1, loop
=self
._loop
)
1656 # update the datastore
1657 datastore
.update(vdu
)
1659 # add the VDU to the set of processed VDUs
1660 processed
.add(vdu
.vdu_id
)
1663 def instantiate(vdu
):
1664 """Instantiate the specified VDU
1667 vdu - a VirtualDeploymentUnitRecord
1670 if the VDU, or any of the VDUs this VDU depends upon, are
1671 terminated or fail to instantiate properly, a
1672 VirtualDeploymentUnitRecordError is raised.
1675 for dependency
in dependencies
[vdu
.vdu_id
]:
1676 self
._log
.debug("{}: waiting for {}".format(vdu
.vdu_id
, dependency
.vdu_id
))
1678 while dependency
.vdu_id
not in processed
:
1679 yield from asyncio
.sleep(1, loop
=self
._loop
)
1681 if not dependency
.active
:
1682 raise VirtualDeploymentUnitRecordError()
1684 self
._log
.debug('instantiating {}'.format(vdu
.vdu_id
))
1686 # Populate the datastore with the current values of the VDU
1689 # Substitute any variables contained in the cloud config script
1690 config
= str(vdu
.vdud_cloud_init
) if vdu
.vdud_cloud_init
is not None else ""
1692 parts
= re
.split("\{\{ ([^\}]+) \}\}", config
)
1695 # Extract the variable names
1697 for variable
in parts
[1::2]:
1698 variables
.append(variable
.lstrip('{{').rstrip('}}').strip())
1700 # Iterate of the variables and substitute values from the
1702 for variable
in variables
:
1704 # Handle a reference to a VDU by ID
1705 if variable
.startswith('vdu['):
1706 value
= datastore
.get(variable
)
1708 msg
= "Unable to find a substitute for {} in {} cloud-init script"
1709 raise ValueError(msg
.format(variable
, vdu
.vdu_id
))
1711 config
= config
.replace("{{ %s }}" % variable
, value
)
1714 # Handle a reference to the current VDU
1715 if variable
.startswith('vdu'):
1716 value
= datastore
.get('vdu[{}]'.format(vdu
.vdu_id
) + variable
[3:])
1717 config
= config
.replace("{{ %s }}" % variable
, value
)
1720 # Handle unrecognized variables
1721 msg
= 'unrecognized cloud-config variable: {}'
1722 raise ValueError(msg
.format(variable
))
1724 # Instantiate the VDU
1725 with self
._dts
.transaction() as xact
:
1726 self
._log
.debug("Instantiating vdu: %s", vdu
)
1727 yield from vdu
.instantiate(xact
, vnfr
, config
=config
)
1728 if self
._state
== VirtualNetworkFunctionRecordState
.FAILED
:
1729 self
._log
.error("Instatiation of VNF %s failed while instantiating vdu %s",
1732 # First create a set of tasks to monitor the state of the VDUs and
1733 # report when they have entered a terminal state
1734 for vdu
in self
._vdus
:
1735 self
._loop
.create_task(instantiate_monitor(vdu
))
1737 for vdu
in self
._vdus
:
1738 self
._loop
.create_task(instantiate(vdu
))
def has_mgmt_interface(self, vdu):
    """Tell whether ``vdu`` is the VDU named by the descriptor's
    management interface (``mgmt_interface.vdu_id``)."""
    # ## TODO: Support additional mgmt_interface type options
    return self.vnfd.mgmt_interface.vdu_id == vdu.id
def vlr_xpath(self, vlr_id):
    """Project-qualified operational xpath of the VLR with id ``vlr_id``."""
    catalog_entry = "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(vlr_id)
    return self._project.add_project(catalog_entry)
def ext_vlr_by_id(self, vlr_id):
    """Return the external VLR stored under ``vlr_id``.

    Raises:
        KeyError - no external VLR is stored under that id
    """
    external_links = self._ext_vlrs
    return external_links[vlr_id]
1756 def publish_inventory(self
, xact
):
1757 """ Publish the inventory associated with this VNF """
1758 self
._log
.debug("Publishing inventory for VNFR id: %s", self
._vnfr
_id
)
1760 for component
in self
._rw
_vnfd
.component
:
1761 self
._log
.debug("Creating inventory component %s", component
)
1762 mangled_name
= VcsComponent
.mangle_name(component
.component_name
,
1766 comp
= VcsComponent(dts
=self
._dts
,
1769 cluster_name
=self
._cluster
_name
,
1770 vcs_handler
=self
._vcs
_handler
,
1771 component
=component
,
1772 mangled_name
=mangled_name
,
1774 if comp
.name
in self
._inventory
:
1775 self
._log
.debug("Duplicate entries in inventory %s for vnfr %s",
1776 component
, self
._vnfd
_id
)
1778 self
._log
.debug("Adding component %s for vnrf %s",
1779 comp
.name
, self
._vnfr
_id
)
1780 self
._inventory
[comp
.name
] = comp
1781 yield from comp
.publish(xact
)
def all_vdus_active(self):
    """Are all VDUs in this VNFR active?

    Returns:
        False as soon as one VDU record reports ``active`` as false,
        True otherwise (including when there are no VDU records).
    """
    if not all(vdu.active for vdu in self._vdus):
        return False

    self._log.debug("Inside all_vdus_active. Returning True")
    return True
def instantiation_failed(self, failed_reason=None):
    """Mark this VNFR as FAILED, remember why, and republish it.

    Generator-based coroutine.

    Arguments:
        failed_reason - text stored as the failure reason (may be None)
    """
    failed_state = VirtualNetworkFunctionRecordState.FAILED
    self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
    self.set_state(failed_state)
    self._state_failed_reason = failed_reason

    # Republish so the new status reaches subscribers.
    yield from self.publish(None)
1804 """ This VNF is ready"""
1805 self
._log
.debug("VNFR id %s is ready", self
.vnfr_id
)
1807 if self
._state
!= VirtualNetworkFunctionRecordState
.FAILED
:
1808 self
.set_state(VirtualNetworkFunctionRecordState
.READY
)
1811 self
._log
.debug("VNFR id %s ignoring state change", self
.vnfr_id
)
1813 # Update the VNFR with the changed status
1814 yield from self
.publish(None)
def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
    """Update the connection point record named ``cp_name``.

    Arguments:
        cp_name    - name of the connection point record to update
        ip_address - stored as the record's ``ip_address``
        mac_addr   - stored as the record's ``mac_address``
        cp_id      - stored as the record's ``connection_point_id``

    Raises:
        VirtualDeploymentUnitRecordError - no record has that name
    """
    for cp in self._cprs:
        if cp.name == cp_name:
            self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
                            cp_name, cp, ip_address, cp_id)
            cp.ip_address = ip_address
            cp.mac_address = mac_addr
            cp.connection_point_id = cp_id
            return

    # Report the requested name; the loop variable would name the last
    # record scanned, and is unbound when there are no records at all.
    err = "No connection point %s found in VNFR id %s" % (cp_name, self._vnfr_id)
    self._log.debug(err)
    raise VirtualDeploymentUnitRecordError(err)
def set_state(self, state):
    """Set state for this VNFR.

    Arguments:
        state - new record state, stored as given
    """
    self._state = state
1836 def instantiate(self
, xact
, restart_mode
=False):
1837 """ instantiate this VNF """
1838 self
._log
.info("Instantiate VNF {}: {}".format(self
._vnfr
_id
, self
._state
))
1839 self
.set_state(VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
)
1840 self
._rw
_vnfd
= yield from self
._vnfm
.fetch_vnfd(self
._vnfd
_id
)
1845 # Iterate over all the connection points in VNFR and fetch the
1848 def cpr_from_cp(cp
):
1849 """ Creates a record level connection point from the desciptor cp"""
1850 cp_fields
= ["name", "image", "vm-flavor", "port_security_enabled"]
1851 cp_copy_dict
= {k
: v
for k
, v
in cp
.as_dict().items() if k
in cp_fields
}
1853 cpr_dict
.update(cp_copy_dict
)
1854 return VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr_dict
)
1856 self
._log
.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1857 self
._vnfr
_id
, self
._vnfr
.connection_point
)
1859 for cp
in self
._vnfr
.connection_point
:
1860 cpr
= cpr_from_cp(cp
)
1861 self
._cprs
.append(cpr
)
1862 self
._log
.debug("Adding Connection point record %s ", cp
)
1864 vlr_path
= self
.vlr_xpath(cp
.vlr_ref
)
1865 self
._log
.debug("Fetching VLR with path = %s", vlr_path
)
1866 res_iter
= yield from self
._dts
.query_read(vlr_path
,
1867 rwdts
.XactFlag
.MERGE
)
1871 self
._ext
_vlrs
[cp
.vlr_ref
] = d
1872 cpr
.vlr_ref
= cp
.vlr_ref
1873 self
._log
.debug("Fetched VLR [%s] with path = [%s]", d
, vlr_path
)
1875 # Increase the VNFD reference count
1880 # Fetch External VLRs
1881 self
._log
.debug("VNFR-ID %s: Fetching vlrs", self
._vnfr
_id
)
1882 yield from fetch_vlrs()
1885 self
._log
.debug("VNFR-ID %s: Publishing Inventory", self
._vnfr
_id
)
1886 yield from self
.publish_inventory(xact
)
1889 self
._log
.debug("Create VLs {}: {}".format(self
._vnfr
_id
, self
._state
))
1890 yield from self
.create_vls()
1893 self
._log
.debug("Publish VNFR {}: {}".format(self
._vnfr
_id
, self
._state
))
1894 yield from self
.publish(xact
)
1898 self
._log
.debug("Instantiate VLs {}: {}".format(self
._vnfr
_id
, self
._state
))
1900 yield from self
.instantiate_vls(xact
, restart_mode
)
1901 except Exception as e
:
1902 self
._log
.exception("VL instantiation failed (%s)", str(e
))
1903 yield from self
.instantiation_failed(str(e
))
1906 self
.set_state(VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
)
1909 self
._log
.debug("Create VDUs {}: {}".format(self
._vnfr
_id
, self
._state
))
1910 yield from self
.create_vdus(self
, restart_mode
)
1913 yield from self
.vdu_cloud_init_instantiation()
1914 except Exception as e
:
1915 self
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
1916 self
._state
_failed
_reason
= str(e
)
1917 yield from self
.publish(xact
)
1920 self
._log
.debug("VNFR {}: Publish VNFR with state {}".
1921 format(self
._vnfr
_id
, self
._state
))
1922 yield from self
.publish(xact
)
1925 # ToDo: Check if this should be prevented during restart
1926 self
._log
.debug("Instantiate VDUs {}: {}".format(self
._vnfr
_id
, self
._state
))
1927 _
= self
._loop
.create_task(self
.instantiate_vdus(xact
, self
))
1930 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1931 yield from self
.publish(xact
)
1933 self
._log
.debug("VNFR-ID %s: Instantiation Done", self
._vnfr
_id
)
1935 # create task updating uptime for this vnfr
1936 self
._log
.debug("VNFR-ID %s: Starting task to update uptime", self
._vnfr
_id
)
1937 self
._loop
.create_task(self
.vnfr_uptime_update(xact
))
def terminate(self, xact):
    """Tear this virtual network function down.

    Generator-based coroutine. Stops and drops the monitor if there is one,
    then terminates the internal virtual links followed by the VDUs, each
    sequentially, moving the record state along at every phase.

    Arguments:
        xact - transaction handed to every link and VDU record
    """
    states = VirtualNetworkFunctionRecordState

    self._log.debug("Terminatng VNF id %s", self.vnfr_id)
    self.set_state(states.TERMINATE)

    monitor = self._vnf_mon
    if monitor is not None:
        monitor.stop()
        monitor.deregister()
        self._vnf_mon = None

    self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
    self.set_state(states.VL_TERMINATE_PHASE)
    for link in self._vlrs:
        yield from link.terminate(xact)

    self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
    self.set_state(states.VDU_TERMINATE_PHASE)
    for unit in self._vdus:
        yield from unit.terminate(xact)

    self._log.debug("Terminated VNF id %s", self.vnfr_id)
    self.set_state(states.TERMINATED)
def vnfr_uptime_update(self, xact):
    """Republish this VNFR every two seconds while it is starting or ready.

    Generator-based coroutine intended to run as a background task; it ends
    once the record leaves the INIT / VL_INIT_PHASE / VM_INIT_PHASE / READY
    states.

    Arguments:
        xact - transaction passed to ``publish``
    """
    while True:
        # Return when vnfr state is FAILED or TERMINATED etc
        if self._state not in [VirtualNetworkFunctionRecordState.INIT,
                               VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
                               VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
                               VirtualNetworkFunctionRecordState.READY]:
            return
        yield from self.publish(xact)
        yield from asyncio.sleep(2, loop=self._loop)
1990 class VnfdDtsHandler(object):
1991 """ DTS handler for VNFD config changes """
1992 XPATH
= "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd"
1994 def __init__(self
, dts
, log
, loop
, vnfm
):
2003 """ DTS registration handle """
def deregister(self):
    '''De-register the VNFD handler from DTS.

    Safe to call when nothing is registered and safe to call repeatedly:
    the handle is released once and then cleared.
    '''
    self._log.debug("De-register VNFD DTS handler for project {}".
                    format(self._project))
    if self._regh:
        self._regh.deregister()
        self._regh = None
2016 """ Register for VNFD configuration"""
2018 def on_apply(dts
, acg
, xact
, action
, scratch
):
2019 """Apply the configuration"""
2020 self
._log
.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
2021 xact
, action
, scratch
)
2023 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
2026 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
2027 """ on prepare callback """
2028 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s)",
2029 ks_path
.to_xpath(RwVnfmYang
.get_schema()), msg
)
2030 fref
= ProtobufC
.FieldReference
.alloc()
2031 fref
.goto_whole_message(msg
.to_pbcm())
2033 # Handle deletes in prepare_callback
2034 if fref
.is_field_deleted():
2035 # Delete an VNFD record
2036 self
._log
.debug("Deleting VNFD with id %s", msg
.id)
2037 if self
._vnfm
.vnfd_in_use(msg
.id):
2038 self
._log
.debug("Cannot delete VNFD in use - %s", msg
)
2039 err
= "Cannot delete a VNFD in use - %s" % msg
2040 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
2041 # Delete a VNFD record
2042 yield from self
._vnfm
.delete_vnfd(msg
.id)
2044 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2046 xpath
= self
._vnfm
._project
.add_project(VnfdDtsHandler
.XPATH
)
2047 self
._log
.debug("Registering for VNFD config using xpath: {}".
2050 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
2051 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
2052 self
._regh
= acg
.register(
2054 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
2055 on_prepare
=on_prepare
)
2058 class VcsComponentDtsHandler(object):
2059 """ Vcs Component DTS handler """
2060 XPATH
= ("D,/rw-manifest:manifest" +
2061 "/rw-manifest:operational-inventory" +
2062 "/rw-manifest:component")
2064 def __init__(self
, dts
, log
, loop
, vnfm
):
2073 """ DTS registration handle """
def deregister(self):
    '''De-register the VCS component handler from DTS.

    Safe to call when nothing is registered and safe to call repeatedly:
    the handle is released once and then cleared.
    '''
    self._log.debug("De-register VCS DTS handler for project {}".
                    format(self._project))
    if self._regh:
        self._regh.deregister()
        self._regh = None
2086 """ Registers VCS component dts publisher registration"""
2087 self
._log
.debug("VCS Comp publisher DTS handler registering path %s",
2088 VcsComponentDtsHandler
.XPATH
)
2090 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
2091 handlers
= rift
.tasklets
.Group
.Handler()
2092 with self
._dts
.group_create(handler
=handlers
) as group
:
2093 self
._regh
= group
.register(xpath
=VcsComponentDtsHandler
.XPATH
,
2095 flags
=(rwdts
.Flag
.PUBLISHER |
2096 rwdts
.Flag
.NO_PREP_READ |
2097 rwdts
.Flag
.DATASTORE
),)
def publish(self, xact, path, msg):
    """Publish a VCS component.

    Arguments:
        xact - transaction the publish belongs to (only logged here)
        path - xpath of the element to create
        msg  - component message stored at ``path``
    """
    self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
                    xact, path, msg)
    self.regh.create_element(path, msg)
    self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
                    VcsComponentDtsHandler.XPATH, xact, path, msg)
2108 class VnfrConsoleOperdataDtsHandler(object):
2110 Registers 'D,/rw-project:project/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]'
2111 and handles CRUD from DTS
def vnfr_vdu_console_xpath(self):
    """Project-qualified xpath of this VDUR's console entry.

    The VNFR id fills the first key predicate and the VDUR id the second.
    """
    # Adjacent literals are joined before .format() is applied, so both
    # placeholders are filled; with "+" the method call would bind to the
    # second literal only.
    return self._project.add_project(
        "D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']"
        "/rw-vnfr:vdur[vnfr:id='{}']".format(self._vnfr_id, self._vdur_id))
def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
    """Create the console handler for one VDUR.

    Arguments:
        dts     - DTS handle used to register and to open transactions
        log     - logger
        loop    - event loop
        vnfm    - VNF manager used to look the VNFR up; supplies the project
        vnfr_id - id of the owning VNFR
        vdur_id - id of the VDU record, used as the key in the xpath
        vdu_id  - descriptor-level VDU id used to find the record
    """
    self._dts = dts
    self._log = log
    self._loop = loop
    self._regh = None  # DTS registration handle; None until registered
    self._vnfm = vnfm

    self._vnfr_id = vnfr_id
    self._vdur_id = vdur_id
    self._vdu_id = vdu_id

    self._project = vnfm._project
def deregister(self):
    '''De-register the VNFR console handler from DTS.

    Safe to call when nothing is registered and safe to call repeatedly:
    the handle is released once and then cleared.
    '''
    self._log.debug("De-register VNFR console DTS handler for project {}".
                    format(self._project))
    if self._regh:
        self._regh.deregister()
        self._regh = None
2143 """ Register for VNFR VDU Operational Data read from dts """
2146 def on_prepare(xact_info
, action
, ks_path
, msg
):
2147 """ prepare callback from dts """
2148 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2150 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2151 xact_info
, action
, xpath
, msg
2154 if action
== rwdts
.QueryAction
.READ
:
2155 schema
= RwVnfrYang
.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur
.schema()
2156 path_entry
= schema
.keyspec_to_entry(ks_path
)
2157 self
._log
.debug("VDU Opdata path is {}".format(path_entry
))
2159 vnfr
= self
._vnfm
.get_vnfr(self
._vnfr
_id
)
2160 except VnfRecordError
as e
:
2161 self
._log
.error("VNFR id %s not found", self
._vnfr
_id
)
2162 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2165 vdur
= vnfr
._get
_vdur
_from
_vdu
_id
(self
._vdu
_id
)
2166 if not vdur
._state
== VDURecordState
.READY
:
2167 self
._log
.debug("VDUR state is not READY. current state is {}".format(vdur
._state
))
2168 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2170 with self
._dts
.transaction() as new_xact
:
2171 resp
= yield from vdur
.read_resource(new_xact
)
2172 vdur_console
= RwVnfrYang
.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur()
2173 vdur_console
.id = self
._vdur
_id
2174 if resp
.console_url
:
2175 vdur_console
.console_url
= resp
.console_url
2177 vdur_console
.console_url
= 'none'
2178 self
._log
.debug("Recevied console URL for vdu {} is {}".format(self
._vdu
_id
,vdur_console
))
2180 self
._log
.exception("Caught exception while reading VDU %s", self
._vdu
_id
)
2181 vdur_console
= RwVnfrYang
.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur()
2182 vdur_console
.id = self
._vdur
_id
2183 vdur_console
.console_url
= 'none'
2185 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
,
2186 xpath
=self
.vnfr_vdu_console_xpath
,
2189 #raise VnfRecordError("Not supported operation %s" % action)
2190 self
._log
.error("Not supported operation %s" % action
)
2191 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2195 self
._log
.debug("Registering for VNFR VDU using xpath: %s",
2196 self
.vnfr_vdu_console_xpath
)
2197 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2198 with self
._dts
.group_create() as group
:
2199 self
._regh
= group
.register(xpath
=self
.vnfr_vdu_console_xpath
,
2201 flags
=rwdts
.Flag
.PUBLISHER
,
2205 class VnfrDtsHandler(object):
2206 """ registers 'D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2207 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2209 def __init__(self
, dts
, log
, loop
, vnfm
):
2216 self
._project
= vnfm
._project
2220 """ Return registration handle"""
2225 """ Return VNF manager instance """
2228 def deregister(self
):
2229 '''De-register from DTS'''
2230 self
._log
.debug("De-register VNFR DTS handler for project {}".
2231 format(self
._project
))
2233 self
._regh
.deregister()
2238 """ Register for vnfr create/update/delete/read requests from dts """
2239 def on_commit(xact_info
):
2240 """ The transaction has been committed """
2241 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
2242 return rwdts
.MemberRspCode
.ACTION_OK
2244 def on_abort(*args
):
2245 """ Abort callback """
2246 self
._log
.debug("VNF transaction got aborted")
2249 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
2252 def instantiate_realloc_vnfr(vnfr
):
2253 """Re-populate the vnfm after restart
2260 yield from vnfr
.instantiate(None, restart_mode
=True)
2262 if xact_event
== rwdts
.MemberEvent
.INSTALL
:
2263 curr_cfg
= self
.regh
.elements
2264 for cfg
in curr_cfg
:
2265 vnfr
= self
.vnfm
.create_vnfr(cfg
)
2266 self
._loop
.create_task(instantiate_realloc_vnfr(vnfr
))
2268 self
._log
.debug("Got on_event in vnfm")
2270 return rwdts
.MemberRspCode
.ACTION_OK
2273 def on_prepare(xact_info
, action
, ks_path
, msg
):
2274 """ prepare callback from dts """
2276 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2277 xact_info
, action
, msg
2280 if action
== rwdts
.QueryAction
.CREATE
:
2281 if not msg
.has_field("vnfd"):
2282 err
= "Vnfd not provided"
2283 self
._log
.error(err
)
2284 raise VnfRecordError(err
)
2286 vnfr
= self
.vnfm
.create_vnfr(msg
)
2288 # RIFT-9105: Unable to add a READ query under an existing transaction
2289 # xact = xact_info.xact
2290 yield from vnfr
.instantiate(None)
2291 except Exception as e
:
2292 self
._log
.exception(e
)
2293 self
._log
.error("Error while instantiating vnfr:%s", vnfr
.vnfr_id
)
2294 vnfr
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
2295 yield from vnfr
.publish(None)
2296 elif action
== rwdts
.QueryAction
.DELETE
:
2297 schema
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr
.schema()
2298 path_entry
= schema
.keyspec_to_entry(ks_path
)
2299 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2302 self
._log
.debug("VNFR id %s not found for delete", path_entry
.key00
.id)
2303 raise VirtualNetworkFunctionRecordNotFound(
2304 "VNFR id %s", path_entry
.key00
.id)
2307 yield from vnfr
.terminate(xact_info
.xact
)
2310 yield from self
._vnfm
.delete_vnfr(xact_info
.xact
, vnfr
)
2311 except Exception as e
:
2312 self
._log
.exception(e
)
2313 self
._log
.error("Caught exception while deleting vnfr %s", path_entry
.key00
.id)
2315 elif action
== rwdts
.QueryAction
.UPDATE
:
2316 schema
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr
.schema()
2317 path_entry
= schema
.keyspec_to_entry(ks_path
)
2320 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2321 except Exception as e
:
2322 self
._log
.debug("No vnfr found with id %s", path_entry
.key00
.id)
2323 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2327 self
._log
.debug("VNFR id %s not found for update", path_entry
.key00
.id)
2328 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2331 self
._log
.debug("VNFR {} update config status {} (current {})".
2332 format(vnfr
.name
, msg
.config_status
, vnfr
.config_status
))
2333 # Update the config status and publish
2334 vnfr
._config
_status
= msg
.config_status
2335 yield from vnfr
.publish(None)
2338 raise NotImplementedError(
2339 "%s action on VirtualNetworkFunctionRecord not supported",
2342 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2344 xpath
= self
._project
.add_project(VnfrDtsHandler
.XPATH
)
2345 self
._log
.debug("Registering for VNFR using xpath: {}".
2348 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
2349 on_prepare
=on_prepare
,)
2350 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
2351 with self
._dts
.group_create(handler
=handlers
) as group
:
2352 self
._regh
= group
.register(xpath
=xpath
,
2354 flags
=(rwdts
.Flag
.PUBLISHER |
2355 rwdts
.Flag
.NO_PREP_READ |
2357 rwdts
.Flag
.DATASTORE
),)
2360 def create(self
, xact
, xpath
, msg
):
2362 Create a VNFR record in DTS with path and message
2364 path
= self
._project
.add_project(xpath
)
2365 self
._log
.debug("Creating VNFR xact = %s, %s:%s",
2368 self
.regh
.create_element(path
, msg
)
2369 self
._log
.debug("Created VNFR xact = %s, %s:%s",
2373 def update(self
, xact
, xpath
, msg
):
2375 Update a VNFR record in DTS with path and message
2377 path
= self
._project
.add_project(xpath
)
2378 self
._log
.debug("Updating VNFR xact = %s, %s:%s",
2380 self
.regh
.update_element(path
, msg
)
2381 self
._log
.debug("Updated VNFR xact = %s, %s:%s",
def delete(self, xact, xpath):
    """
    Delete the VNFR element stored at ``xpath`` through the registration
    handle.

    ``xpath`` is first scoped with ``self._project.add_project``.  ``xact``
    is only written to the debug log; the deletion itself goes through
    ``self.regh.delete_element``.
    """
    scoped_xpath = self._project.add_project(xpath)
    debug = self._log.debug

    debug("Deleting VNFR xact = %s, %s", xact, scoped_xpath)
    self.regh.delete_element(scoped_xpath)
    debug("Deleted VNFR xact = %s, %s", xact, scoped_xpath)
2395 class VnfdRefCountDtsHandler(object):
2396 """ The VNFD Ref Count DTS handler """
2397 XPATH
= "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2399 def __init__(self
, dts
, log
, loop
, vnfm
):
2409 """ Return registration handle """
2414 """ Return the NS manager instance """
2417 def deregister(self
):
2418 '''De-register from DTS'''
2419 self
._log
.debug("De-register VNFD Ref DTS handler for project {}".
2420 format(self
._project
))
2422 self
._regh
.deregister()
2427 """ Register for VNFD ref count read from dts """
2430 def on_prepare(xact_info
, action
, ks_path
, msg
):
2431 """ prepare callback from dts """
2432 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2434 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2435 xact_info
, action
, xpath
, msg
2438 if action
== rwdts
.QueryAction
.READ
:
2439 schema
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount
.schema()
2440 path_entry
= schema
.keyspec_to_entry(ks_path
)
2441 vnfd_list
= yield from self
._vnfm
.get_vnfd_refcount(path_entry
.key00
.vnfd_id_ref
)
2442 for xpath
, msg
in vnfd_list
:
2443 self
._log
.debug("Responding to ref count query path:%s, msg:%s",
2445 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.MORE
,
2448 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2450 raise VnfRecordError("Not supported operation %s" % action
)
2452 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2453 with self
._dts
.group_create() as group
:
2454 self
._regh
= group
.register(xpath
=self
._vnfm
._project
.add_project(
2455 VnfdRefCountDtsHandler
.XPATH
),
2457 flags
=rwdts
.Flag
.PUBLISHER
,
2461 class VdurDatastore(object):
2463 This VdurDatastore is intended to expose select information about a VDUR
2464 such that it can be referenced in a cloud config file. The data that is
2465 exposed does not necessarily follow the structure of the data in the yang
2466 model. This is intentional. The data that are exposed are intended to be
2467 agnostic of the yang model so that changes in the model do not necessarily
2468 require changes to the interface provided to the user. It also means that
2469 the user does not need to be familiar with the RIFT.ware yang models.
2473 """Create an instance of VdurDatastore"""
2474 self
._vdur
_data
= dict()
2475 self
._pattern
= re
.compile("vdu\[([^]]+)\]\.(.+)")
def add(self, vdur):
    """Register a VDUR and record the attributes exposed for it

    Arguments:
        vdur - a VirtualDeploymentUnitRecord instance

    Raises:
        A ValueError is raised if the VDUR has no ``vdu_id`` or if an entry
        with the same ``vdu_id`` is already in the datastore.

    """
    vdu_id = vdur.vdu_id

    if vdu_id is None:
        raise ValueError('VDURs are required to have an ID')

    if vdu_id in self._vdur_data:
        raise ValueError('cannot add a VDUR more than once')

    # The (initially empty) entry is installed before any attribute is read.
    entry = dict()
    self._vdur_data[vdu_id] = entry

    # Only attributes that actually have a value are exposed.
    name = vdur._vdud.name
    if name is not None:
        entry['name'] = name

    mgmt_ip = vdur.vm_management_ip
    if mgmt_ip is not None:
        entry['mgmt.ip'] = mgmt_ip
2503 def update(self
, vdur
):
2504 """Update the VDUR information in the datastore
2507 vdur - a GI representation of a VDUR
2510 A ValueError is raised if the VDUR is (1) None or (2) already in
2514 if vdur
.vdu_id
is None:
2515 raise ValueError('VNFDs are required to have an ID')
2517 if vdur
.vdu_id
not in self
._vdur
_data
:
2518 raise ValueError('VNF is not recognized')
2520 def set_or_delete(key
, attr
):
2522 if key
in self
._vdur
_data
[vdur
.vdu_id
]:
2523 del self
._vdur
_data
[vdur
.vdu_id
][key
]
2526 self
._vdur
_data
[vdur
.vdu_id
][key
] = attr
2528 set_or_delete('name', vdur
._vdud
.name
)
2529 set_or_delete('mgmt.ip', vdur
.vm_management_ip
)
def remove(self, vdur_id):
    """Drop every piece of data stored for the given VDUR

    Arguments:
        vdur_id - the key under which the VDUR was stored in the datastore

    Raises:
        A ValueError is raised if no entry exists for ``vdur_id``.

    """
    try:
        del self._vdur_data[vdur_id]
    except KeyError:
        raise ValueError('VNF is not recognized') from None
2547 def get(self
, expr
):
2548 """Retrieve VDUR information from the datastore
2550 An expression should be of the form,
2554 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2555 the exposed attribute that the user wishes to retrieve.
2557 If the requested data is not available, None is returned.
2560 expr - a string that specifies the data to return
2563 A ValueError is raised if the provided expression cannot be parsed.
2566 The requested data or None
2569 result
= self
._pattern
.match(expr
)
2571 raise ValueError('data expression not recognized ({})'.format(expr
))
2573 vdur_id
, key
= result
.groups()
2575 if vdur_id
not in self
._vdur
_data
:
2578 return self
._vdur
_data
[vdur_id
].get(key
, None)
2581 class VnfManager(object):
2582 """ The virtual network function manager class """
2583 def __init__(self
, dts
, log
, loop
, project
, cluster_name
):
2587 self
._project
= project
2588 self
._cluster
_name
= cluster_name
2590 self
._vcs
_handler
= VcsComponentDtsHandler(dts
, log
, loop
, self
)
2591 self
._vnfr
_handler
= VnfrDtsHandler(dts
, log
, loop
, self
)
2592 self
._vnfr
_ref
_handler
= VnfdRefCountDtsHandler(dts
, log
, loop
, self
)
2593 self
._nsr
_handler
= mano_dts
.NsInstanceConfigSubscriber(
2594 log
, dts
, loop
, project
, callback
=self
.handle_nsr
)
2596 self
._dts
_handlers
= [VnfdDtsHandler(dts
, log
, loop
, self
),
2599 self
._vnfr
_ref
_handler
,
2602 self
._vnfds
_to
_vnfr
= {}
def vnfr_handler(self):
    """ Return the VNFR DTS handler held by this manager """
    handler = self._vnfr_handler
    return handler
def vcs_handler(self):
    """ Return the VCS component DTS handler held by this manager """
    handler = self._vcs_handler
    return handler
2617 """ Register all static DTS handlers """
2618 for hdl
in self
._dts
_handlers
:
2619 yield from hdl
.register()
def deregister(self):
    """De-register every handler in ``self._dts_handlers``.

    This is a generator: the body runs only when the returned generator
    is driven (for example with ``yield from``).  Each handler's
    ``deregister()`` is called in list order; when it returns something,
    that result is delegated to with ``yield from``.
    """
    # The rest of this class logs through ``self._log`` and keeps the owning
    # project in ``self._project``; no ``log`` or ``name`` attribute is set
    # on the manager in the visible code, so reading them would raise
    # AttributeError.  Fall back to the project object itself if it has no
    # ``name``.
    project_label = getattr(self._project, "name", self._project)
    self._log.debug("De-register VNFM project {}".format(project_label))

    for hdl in self._dts_handlers:
        pending = hdl.deregister()
        # Handlers whose deregister() is a plain method return None, and
        # ``yield from None`` raises TypeError.
        if pending is not None:
            yield from pending
2628 """ Run this VNFM instance """
2629 self
._log
.debug("Run VNFManager - registering static DTS handlers""")
2630 yield from self.register()
def handle_nsr(self, nsr, action):
    """Keep the ``self._nsrs`` cache in step with NSR events.

    A CREATE action stores ``nsr`` under ``nsr.id``; a DELETE action drops
    that entry if it is present.  Any other action is ignored.
    """
    if action in (rwdts.QueryAction.CREATE,):
        self._nsrs[nsr.id] = nsr
        return

    if action == rwdts.QueryAction.DELETE:
        # Deleting an NSR that was never cached is not an error.
        self._nsrs.pop(nsr.id, None)
2639 def get_linked_mgmt_network(self, vnfr):
2640 """For the given VNFR get the related mgmt network from the NSD, if
2643 vnfd_id = vnfr.vnfd.id
2644 nsr_id = vnfr.nsr_id_ref
2646 # for the given related VNFR, get the corresponding NSR-config
2649 nsr_obj = self._nsrs[nsr_id]
2651 raise("Unable to find the NS with the ID
: {}".format(nsr_id))
2653 # for the related NSD check if a VLD exists such that it's a mgmt
2655 for vld in nsr_obj.nsd.vld:
2656 if vld.mgmt_network:
def get_vnfr(self, vnfr_id):
    """ Return the VNFR stored in ``self._vnfrs`` under ``vnfr_id``

    Raises:
        VnfRecordError: ``vnfr_id`` is not a key of ``self._vnfrs``.
    """
    if vnfr_id not in self._vnfrs:
        # Interpolate the id here: unlike logging calls, an exception does
        # not apply %-formatting to extra constructor arguments, so the
        # message would otherwise keep a literal "%s".
        raise VnfRecordError("VNFR id %s not found" % (vnfr_id,))

    return self._vnfrs[vnfr_id]
2669 def create_vnfr(self, vnfr):
2670 """ Create a VNFR instance """
2671 if vnfr.id in self._vnfrs:
2672 msg = "Vnfr
id %s already exists
" % vnfr.id
2673 self._log.error(msg)
2674 raise VnfRecordError(msg)
2676 self._log.info("Create VirtualNetworkFunctionRecord
%s from vnfd_id
: %s",
2680 mgmt_network = self.get_linked_mgmt_network(vnfr)
2682 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2683 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
2684 mgmt_network=mgmt_network
2688 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2689 self._vnfds_to_vnfr[vnfr.vnfd.id] += 1
2691 self._vnfds_to_vnfr[vnfr.vnfd.id] = 1
2693 return self._vnfrs[vnfr.id]
def delete_vnfr(self, xact, vnfr):
    """ Delete a VNFR instance

    Generator.  Does nothing when ``vnfr.vnfr_id`` is not in
    ``self._vnfrs``.  Otherwise it delegates to the VNFR handler's
    ``delete`` for ``vnfr.xpath``, decrements the non-zero reference
    counter kept for ``vnfr.vnfd.id`` and finally forgets the VNFR.
    """
    record_id = vnfr.vnfr_id
    if record_id not in self._vnfrs:
        return

    self._log.debug("Deleting VNFR id %s", record_id)
    yield from self._vnfr_handler.delete(xact, vnfr.xpath)

    # Only an existing, non-zero counter is decremented, so it never goes
    # below zero.
    ref_counts = self._vnfds_to_vnfr
    vnfd_id = vnfr.vnfd.id
    if ref_counts.get(vnfd_id):
        ref_counts[vnfd_id] -= 1

    del self._vnfrs[record_id]
2709 def fetch_vnfd(self, vnfd_id):
2710 """ Fetch VNFDs based with the vnfd id"""
2711 vnfd_path = self._project.add_project(
2712 VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id))
2713 self._log.debug("Fetch vnfd with path
%s", vnfd_path)
2716 res_iter = yield from self._dts.query_read(vnfd_path,
2717 rwdts.XactFlag.MERGE)
2719 for ent in res_iter:
2720 res = yield from ent
2724 err = "Failed to get Vnfd
%s" % vnfd_id
2725 self._log.error(err)
2726 raise VnfRecordError(err)
2728 self._log.debug("Fetched vnfd
for path
%s, vnfd
- %s", vnfd_path, vnfd)
2732 def vnfd_in_use(self, vnfd_id):
2733 """ Is this VNFD in use """
2734 self._log.debug("Is this VNFD
in use
- msg
:%s", vnfd_id)
2735 if vnfd_id in self._vnfds_to_vnfr:
2736 return (self._vnfds_to_vnfr[vnfd_id] > 0)
2740 def publish_vnfr(self, xact, path, msg):
2741 """ Publish a VNFR """
2742 self._log.debug("publish_vnfr called with path
%s, msg
%s",
2744 yield from self.vnfr_handler.update(xact, path, msg)
def delete_vnfd(self, vnfd_id):
    """ Delete the Virtual Network Function descriptor with the passed id

    Drops the reference counter kept for ``vnfd_id`` and then, on a
    best-effort basis, removes the directory
    ``$RIFT_ARTIFACTS/launchpad/libs/<vnfd_id>``.  A failure during that
    cleanup is logged and not propagated.

    Raises:
        VirtualNetworkFunctionDescriptorRefCountExists: the counter for
            ``vnfd_id`` is non-zero, i.e. VNFRs still reference it.
    """
    self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
    if vnfd_id in self._vnfds_to_vnfr:
        # The values of _vnfds_to_vnfr are plain integer counters (see the
        # ``+= 1`` / ``-= 1`` updates elsewhere in this class), so the count
        # is the value itself; it has no ``vnfd_ref_count`` attribute.
        ref_count = self._vnfds_to_vnfr[vnfd_id]
        if ref_count:
            self._log.debug("Cannot delete VNFD id %s reference exists %s",
                            vnfd_id,
                            ref_count)
            raise VirtualNetworkFunctionDescriptorRefCountExists(
                "Cannot delete :%s, ref_count:%s" % (vnfd_id, ref_count))

        del self._vnfds_to_vnfr[vnfd_id]

    # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
    try:
        rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
        vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
        if os.path.exists(vnfd_dir):
            shutil.rmtree(vnfd_dir, ignore_errors=True)
    except Exception as e:
        # The counter entry has already been removed above, so identify the
        # descriptor by its id instead of looking it up again.
        self._log.error("Exception in cleaning up VNFD {}: {}".
                        format(vnfd_id, e))
        self._log.exception(e)
def vnfd_refcount_xpath(self, vnfd_id):
    """ Build the project-scoped xpath of the ref count entry for vnfd_id """
    key_predicate = "[rw-vnfr:vnfd-id-ref = '{}']"
    # The placeholder is filled only after the project scoping has been
    # applied to the template.
    scoped_template = self._project.add_project(
        VnfdRefCountDtsHandler.XPATH + key_predicate)
    return scoped_template.format(vnfd_id)
2780 def get_vnfd_refcount(self, vnfd_id):
2781 """ Get the vnfd_list from this VNFM"""
2783 if vnfd_id is None or vnfd_id == "":
2784 for vnfd in self._vnfds_to_vnfr.keys():
2785 vnfd_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount()
2786 vnfd_msg.vnfd_id_ref = vnfd
2787 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd]
2788 vnfd_list.append((self.vnfd_refcount_xpath(vnfd), vnfd_msg))
2789 elif vnfd_id in self._vnfds_to_vnfr:
2790 vnfd_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount()
2791 vnfd_msg.vnfd_id_ref = vnfd_id
2792 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd_id]
2793 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2798 class VnfmProject(ManoProject):
2800 def __init__(self, name, tasklet, **kw):
2801 super(VnfmProject, self).__init__(tasklet.log, name)
2802 self.update(tasklet)
2807 def register (self):
2809 vm_parent_name = self._tasklet.tasklet_info.get_parent_vm_parent_instance_name()
2810 assert vm_parent_name is not None
2811 self._vnfm = VnfManager(self._dts, self.log, self.loop, self, vm_parent_name)
2812 yield from self._vnfm.run()
2814 print("Caught
Exception in VNFM init
:", sys.exc_info()[0])
2817 def deregister(self):
2818 self._log.debug("De
-register project
{} for VnfmProject
".
2820 self._vnfm.deregister()
2823 class VnfmTasklet(rift.tasklets.Tasklet):
2824 """ VNF Manager tasklet class """
2825 def __init__(self, *args, **kwargs):
2826 super(VnfmTasklet, self).__init__(*args, **kwargs)
2827 self.rwlog.set_category("rw
-mano
-log
")
2828 self.rwlog.set_subcategory("vnfm
")
2831 self._project_handler = None
2840 super(VnfmTasklet, self).start()
2841 self.log.info("Starting VnfmTasklet
")
2843 self.log.setLevel(logging.DEBUG)
2845 self.log.debug("Registering with dts
")
2846 self._dts = rift.tasklets.DTS(self.tasklet_info,
2847 RwVnfmYang.get_schema(),
2849 self.on_dts_state_change)
2851 self.log.debug("Created DTS Api GI Object
: %s", self._dts)
2853 print("Caught
Exception in VNFM start
:", sys.exc_info()[0])
def on_instance_started(self):
    """ Task instance started callback; only emits a debug log entry """
    self.log.debug("Got instance started callback")
2864 print("Caught
Exception in VNFM stop
:", sys.exc_info()[0])
2869 """ Task init callback """
2870 self.log.debug("creating project handler
")
2871 self.project_handler = ProjectHandler(self, VnfmProject)
2872 self.project_handler.register()
2876 """ Task run callback """
2880 def on_dts_state_change(self, state):
2881 """Take action according to current dts state to transition
2882 application into the corresponding application state
2885 state - current dts state
2888 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2889 rwdts.State.CONFIG: rwdts.State.RUN,
2893 rwdts.State.INIT: self.init,
2894 rwdts.State.RUN: self.run,
2897 # Transition application to next state
2898 handler = handlers.get(state, None)
2899 if handler is not None:
2900 yield from handler()
2902 # Transition dts to next state
2903 next_state = switch.get(state, None)
2904 if next_state is not None:
2905 self._dts.handle.set_state(next_state)