2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
29 gi
.require_version('RwDts', '1.0')
30 gi
.require_version('RwVnfrYang', '1.0')
31 gi
.require_version('RwVnfmYang', '1.0')
32 gi
.require_version('RwVlrYang', '1.0')
33 gi
.require_version('RwManifestYang', '1.0')
34 gi
.require_version('RwBaseYang', '1.0')
35 gi
.require_version('RwResourceMgrYang', '1.0')
37 from gi
.repository
import (
50 import rift
.package
.store
51 import rift
.package
.cloud_init
54 class VMResourceError(Exception):
55 """ VM resource Error"""
59 class VnfRecordError(Exception):
60 """ VNF record instatiation failed"""
64 class VduRecordError(Exception):
65 """ VDU record instatiation failed"""
69 class NotImplemented(Exception):
70 """Not implemented """
74 class VnfrRecordExistsError(Exception):
75 """VNFR record already exist with the same VNFR id"""
79 class InternalVirtualLinkRecordError(Exception):
80 """Internal virtual link record error"""
84 class VDUImageNotFound(Exception):
85 """VDU Image not found error"""
89 class VirtualDeploymentUnitRecordError(Exception):
90 """VDU Instantiation failed"""
94 class VMNotReadyError(Exception):
95 """ VM Not yet received from resource manager """
99 class VDURecordNotFound(Exception):
100 """ Could not find a VDU record """
104 class VirtualNetworkFunctionRecordDescNotFound(Exception):
105 """ Cannot find Virtual Network Function Record Descriptor """
109 class VirtualNetworkFunctionDescriptorError(Exception):
110 """ Virtual Network Function Record Descriptor Error """
114 class VirtualNetworkFunctionDescriptorNotFound(Exception):
115 """ Virtual Network Function Record Descriptor Not Found """
119 class VirtualNetworkFunctionRecordNotFound(Exception):
120 """ Virtual Network Function Record Not Found """
124 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
125 """ Virtual Network Funtion Descriptor reference count exists """
129 class VnfrInstantiationFailed(Exception):
130 """ Virtual Network Funtion Instantiation failed"""
134 class VNFMPlacementGroupError(Exception):
137 class VirtualNetworkFunctionRecordState(enum
.Enum
):
144 VL_TERMINATE_PHASE
= 6
145 VDU_TERMINATE_PHASE
= 7
150 class VDURecordState(enum
.Enum
):
151 """VDU record state """
154 RESOURCE_ALLOC_PENDING
= 3
161 class VcsComponent(object):
162 """ VCS Component within the VNF descriptor """
163 def __init__(self
, dts
, log
, loop
, cluster_name
, vcs_handler
, component
, mangled_name
):
167 self
._component
= component
168 self
._cluster
_name
= cluster_name
169 self
._vcs
_handler
= vcs_handler
170 self
._mangled
_name
= mangled_name
173 def mangle_name(component_name
, vnf_name
, vnfd_id
):
174 """ mangled component name """
175 return vnf_name
+ ":" + component_name
+ ":" + vnfd_id
179 """ name of this component"""
180 return self
._mangled
_name
184 """ The path for this object """
185 return("D,/rw-manifest:manifest" +
186 "/rw-manifest:operational-inventory" +
187 "/rw-manifest:component" +
188 "[rw-manifest:component-name = '{}']").format(self
.name
)
191 def instance_xpath(self
):
192 """ The path for this object """
193 return("D,/rw-base:vcs" +
196 "[instance-name = '{}']".format(self
._cluster
_name
))
199 def start_comp_xpath(self
):
200 """ start component xpath """
201 return (self
.instance_xpath
+
202 "/child-n[instance-name = 'START-REQ']")
204 def get_start_comp_msg(self
, ip_address
):
205 """ start this component """
206 start_msg
= RwBaseYang
.VcsInstance_Instance_ChildN()
207 start_msg
.instance_name
= 'START-REQ'
208 start_msg
.component_name
= self
.name
209 start_msg
.admin_command
= "START"
210 start_msg
.ip_address
= ip_address
216 """ Returns the message for this vcs component"""
218 vcs_comp_dict
= self
._component
.as_dict()
220 def mangle_comp_names(comp_dict
):
221 """ mangle component name with VNF name, id"""
222 for key
, val
in comp_dict
.items():
223 if isinstance(val
, dict):
224 comp_dict
[key
] = mangle_comp_names(val
)
225 elif isinstance(val
, list):
228 if isinstance(ent
, dict):
229 val
[i
] = mangle_comp_names(ent
)
233 elif key
== "component_name":
234 comp_dict
[key
] = VcsComponent
.mangle_name(val
,
239 mangled_dict
= mangle_comp_names(vcs_comp_dict
)
240 msg
= RwManifestYang
.OpInventory_Component
.from_dict(mangled_dict
)
244 def publish(self
, xact
):
245 """ Publishes the VCS component """
246 self
._log
.debug("Publishing the VcsComponent %s, path = %s comp = %s",
247 self
.name
, self
.path
, self
.msg
)
248 yield from self
._vcs
_handler
.publish(xact
, self
.path
, self
.msg
)
251 def start(self
, xact
, parent
, ip_addr
=None):
252 """ Starts this VCS component """
253 # ATTN RV - replace with block add
254 start_msg
= self
.get_start_comp_msg(ip_addr
)
255 self
._log
.debug("starting component %s %s",
256 self
.start_comp_xpath
, start_msg
)
257 yield from self
._dts
.query_create(self
.start_comp_xpath
,
260 self
._log
.debug("started component %s, %s",
261 self
.start_comp_xpath
, start_msg
)
264 class VirtualDeploymentUnitRecord(object):
265 """ Virtual Deployment Unit Record """
276 placement_groups
=[]):
282 self
._mgmt
_intf
= mgmt_intf
283 self
._cloud
_account
_name
= cloud_account_name
284 self
._vnfd
_package
_store
= vnfd_package_store
286 self
._vdur
_id
= vdur_id
or str(uuid
.uuid4())
289 self
._state
= VDURecordState
.INIT
290 self
._state
_failed
_reason
= None
291 self
._request
_id
= str(uuid
.uuid4())
292 self
._name
= vnfr
.name
+ "__" + vdud
.id
293 self
._placement
_groups
= placement_groups
296 self
._vdud
_cloud
_init
= None
297 self
._vdur
_console
_handler
= VnfrConsoleOperdataDtsHandler(dts
, log
, loop
, self
._vnfr
._vnfm
, self
._vnfr
.vnfr_id
, self
._vdur
_id
,self
.vdu_id
)
300 def vdu_opdata_register(self
):
301 yield from self
._vdur
_console
_handler
.register()
303 def cp_ip_addr(self
, cp_name
):
304 """ Find ip address by connection point name """
305 if self
._vm
_resp
is not None:
306 for conn_point
in self
._vm
_resp
.connection_points
:
307 if conn_point
.name
== cp_name
:
308 return conn_point
.ip_address
311 def cp_mac_addr(self
, cp_name
):
312 """ Find mac address by connection point name """
313 if self
._vm
_resp
is not None:
314 for conn_point
in self
._vm
_resp
.connection_points
:
315 if conn_point
.name
== cp_name
:
316 return conn_point
.mac_addr
317 return "00:00:00:00:00:00"
319 def cp_id(self
, cp_name
):
320 """ Find connection point id by connection point name """
321 if self
._vm
_resp
is not None:
322 for conn_point
in self
._vm
_resp
.connection_points
:
323 if conn_point
.name
== cp_name
:
324 return conn_point
.connection_point_id
337 """ Return this VDUR's name """
341 def cloud_account_name(self
):
342 """ Cloud account this VDU should be created in """
343 return self
._cloud
_account
_name
346 def image_name(self
):
347 """ name that should be used to lookup the image on the CMP """
348 return os
.path
.basename(self
._vdud
.image
)
351 def image_checksum(self
):
352 """ name that should be used to lookup the image on the CMP """
353 return self
._vdud
.image_checksum
if self
._vdud
.has_field("image_checksum") else None
356 def management_ip(self
):
359 return self
._vm
_resp
.public_ip
if self
._vm
_resp
.has_field('public_ip') else self
._vm
_resp
.management_ip
362 def vm_management_ip(self
):
365 return self
._vm
_resp
.management_ip
368 def operational_status(self
):
369 """ Operational status of this VDU"""
370 op_stats_dict
= {"INIT": "init",
371 "INSTANTIATING": "vm_init_phase",
372 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
375 "TERMINATING": "terminated",
376 "TERMINATED": "terminated",
378 return op_stats_dict
[self
._state
.name
]
383 vdu_fields
= ["vm_flavor",
389 vdu_copy_dict
= {k
: v
for k
, v
in
390 self
._vdud
.as_dict().items() if k
in vdu_fields
}
391 vdur_dict
= {"id": self
._vdur
_id
,
392 "vdu_id_ref": self
._vdud
.id,
393 "operational_status": self
.operational_status
,
394 "operational_status_details": self
._state
_failed
_reason
,
396 if self
.vm_resp
is not None:
397 vdur_dict
.update({"vim_id": self
.vm_resp
.vdu_id
,
398 "flavor_id": self
.vm_resp
.flavor_id
,
399 "image_id": self
.vm_resp
.image_id
,
402 if self
.management_ip
is not None:
403 vdur_dict
["management_ip"] = self
.management_ip
405 if self
.vm_management_ip
is not None:
406 vdur_dict
["vm_management_ip"] = self
.vm_management_ip
408 vdur_dict
.update(vdu_copy_dict
)
413 for intf
, cp_id
, vlr
in self
._int
_intf
:
414 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
416 icp_list
.append({"name": cp
.name
,
418 "type_yang": "VPORT",
419 "ip_address": self
.cp_ip_addr(cp
.id),
420 "mac_address": self
.cp_mac_addr(cp
.id)})
422 ii_list
.append({"name": intf
.name
,
423 "vdur_internal_connection_point_ref": cp
.id,
424 "virtual_interface": {}})
426 vdur_dict
["internal_connection_point"] = icp_list
427 self
._log
.debug("internal_connection_point:%s", vdur_dict
["internal_connection_point"])
428 vdur_dict
["internal_interface"] = ii_list
431 for intf
, cp
, vlr
in self
._ext
_intf
:
432 ei_list
.append({"name": cp
,
433 "vnfd_connection_point_ref": cp
,
434 "virtual_interface": {}})
435 self
._vnfr
.update_cp(cp
,
437 self
.cp_mac_addr(cp
),
440 vdur_dict
["external_interface"] = ei_list
442 placement_groups
= []
443 for group
in self
._placement
_groups
:
444 placement_groups
.append(group
.as_dict())
446 vdur_dict
['placement_groups_info'] = placement_groups
447 return RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur
.from_dict(vdur_dict
)
450 def resmgr_path(self
):
451 """ path for resource-mgr"""
452 return ("D,/rw-resource-mgr:resource-mgmt" +
454 "/vdu-event-data[event-id='{}']".format(self
._request
_id
))
457 def vm_flavor_msg(self
):
458 """ VM flavor message """
459 flavor
= self
._vdud
.vm_flavor
.__class
__()
460 flavor
.copy_from(self
._vdud
.vm_flavor
)
465 def vdud_cloud_init(self
):
466 """ Return the cloud-init contents for the VDU """
467 if self
._vdud
_cloud
_init
is None:
468 self
._vdud
_cloud
_init
= self
.cloud_init()
470 return self
._vdud
_cloud
_init
472 def cloud_init(self
):
473 """ Populate cloud_init with cloud-config script from
474 either the inline contents or from the file provided
476 if self
._vdud
.cloud_init
is not None:
477 self
._log
.debug("cloud_init script provided inline %s", self
._vdud
.cloud_init
)
478 return self
._vdud
.cloud_init
479 elif self
._vdud
.cloud_init_file
is not None:
480 # Get cloud-init script contents from the file provided in the cloud_init_file param
481 self
._log
.debug("cloud_init script provided in file %s", self
._vdud
.cloud_init_file
)
482 filename
= self
._vdud
.cloud_init_file
483 self
._vnfd
_package
_store
.refresh()
484 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
485 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
487 return cloud_init_extractor
.read_script(stored_package
, filename
)
488 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
489 raise VirtualDeploymentUnitRecordError(e
)
491 self
._log
.debug("VDU Instantiation: cloud-init script not provided")
493 def process_openstack_placement_group_construct(self
, vm_create_msg_dict
):
495 availability_zones
= []
497 for group
in self
._placement
_groups
:
498 if group
.has_field('host_aggregate'):
499 for aggregate
in group
.host_aggregate
:
500 host_aggregates
.append(aggregate
.as_dict())
501 if group
.has_field('availability_zone'):
502 availability_zones
.append(group
.availability_zone
.as_dict())
503 if group
.has_field('server_group'):
504 server_groups
.append(group
.server_group
.as_dict())
506 if availability_zones
:
507 if len(availability_zones
) > 1:
508 self
._log
.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self
.name
, availability_zones
)
509 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self
.name
, availability_zones
))
511 vm_create_msg_dict
['availability_zone'] = availability_zones
[0]
514 if len(server_groups
) > 1:
515 self
._log
.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self
.name
, server_groups
)
516 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self
.name
, server_groups
))
518 vm_create_msg_dict
['server_group'] = server_groups
[0]
521 vm_create_msg_dict
['host_aggregate'] = host_aggregates
525 def process_placement_groups(self
, vm_create_msg_dict
):
526 """Process the placement_groups and fill resource-mgr request"""
527 if not self
._placement
_groups
:
530 cloud_set
= set([group
.cloud_type
for group
in self
._placement
_groups
])
531 assert len(cloud_set
) == 1
532 cloud_type
= cloud_set
.pop()
534 if cloud_type
== 'openstack':
535 self
.process_openstack_placement_group_construct(vm_create_msg_dict
)
538 self
._log
.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type
)
541 def resmgr_msg(self
, config
=None):
542 vdu_fields
= ["vm_flavor",
548 self
._log
.debug("Creating params based on VDUD: %s", self
._vdud
)
549 vdu_copy_dict
= {k
: v
for k
, v
in self
._vdud
.as_dict().items() if k
in vdu_fields
}
551 vm_create_msg_dict
= {
553 "image_name": self
.image_name
,
556 if self
.image_checksum
is not None:
557 vm_create_msg_dict
["image_checksum"] = self
.image_checksum
559 vm_create_msg_dict
["allocate_public_address"] = self
._mgmt
_intf
560 if self
._vdud
.has_field('mgmt_vpci'):
561 vm_create_msg_dict
["mgmt_vpci"] = self
._vdud
.mgmt_vpci
563 self
._log
.debug("VDUD: %s", self
._vdud
)
564 if config
is not None:
565 vm_create_msg_dict
['vdu_init'] = {'userdata': config
}
568 for intf
, cp
, vlr
in self
._ext
_intf
:
569 cp_info
= {"name": cp
,
570 "virtual_link_id": vlr
.network_id
,
571 "type_yang": intf
.virtual_interface
.type_yang
}
573 if (intf
.virtual_interface
.has_field('vpci') and
574 intf
.virtual_interface
.vpci
is not None):
575 cp_info
["vpci"] = intf
.virtual_interface
.vpci
577 if (vlr
.has_field('ip_profile_params')) and (vlr
.ip_profile_params
.has_field('security_group')):
578 cp_info
['security_group'] = vlr
.ip_profile_params
.security_group
580 cp_list
.append(cp_info
)
582 for intf
, cp
, vlr
in self
._int
_intf
:
583 if (intf
.virtual_interface
.has_field('vpci') and
584 intf
.virtual_interface
.vpci
is not None):
585 cp_list
.append({"name": cp
,
586 "virtual_link_id": vlr
.network_id
,
587 "type_yang": intf
.virtual_interface
.type_yang
,
588 "vpci": intf
.virtual_interface
.vpci
})
590 cp_list
.append({"name": cp
,
591 "virtual_link_id": vlr
.network_id
,
592 "type_yang": intf
.virtual_interface
.type_yang
})
594 vm_create_msg_dict
["connection_points"] = cp_list
595 vm_create_msg_dict
.update(vdu_copy_dict
)
597 self
.process_placement_groups(vm_create_msg_dict
)
599 msg
= RwResourceMgrYang
.VDUEventData()
600 msg
.event_id
= self
._request
_id
601 msg
.cloud_account
= self
.cloud_account_name
602 msg
.request_info
.from_dict(vm_create_msg_dict
)
606 def terminate(self
, xact
):
607 """ Delete resource in VIM """
608 if self
._state
!= VDURecordState
.READY
and self
._state
!= VDURecordState
.FAILED
:
609 self
._log
.warning("VDU terminate in not ready state - Ignoring request")
612 self
._state
= VDURecordState
.TERMINATING
613 if self
._vm
_resp
is not None:
615 with self
._dts
.transaction() as new_xact
:
616 yield from self
.delete_resource(new_xact
)
618 self
._log
.exception("Caught exception while deleting VDU %s", self
.vdu_id
)
620 if self
._rm
_regh
is not None:
621 self
._log
.debug("Deregistering resource manager registration handle")
622 self
._rm
_regh
.deregister()
625 if self
._vdur
_console
_handler
is not None:
626 self
._log
.error("Deregistering vnfr vdur registration handle")
627 self
._vdur
_console
_handler
._regh
.deregister()
628 self
._vdur
_console
_handler
._regh
= None
630 self
._state
= VDURecordState
.TERMINATED
632 def find_internal_cp_by_cp_id(self
, cp_id
):
633 """ Find the CP corresponding to the connection point id"""
636 self
._log
.debug("find_internal_cp_by_cp_id(%s) called",
639 for int_cp
in self
._vdud
.internal_connection_point
:
640 self
._log
.debug("Checking for int cp %s in internal connection points",
642 if int_cp
.id == cp_id
:
647 self
._log
.debug("Failed to find cp %s in internal connection points",
649 msg
= "Failed to find cp %s in internal connection points" % cp_id
650 raise VduRecordError(msg
)
652 # return the VLR associated with the connection point
656 def create_resource(self
, xact
, vnfr
, config
=None):
657 """ Request resource from ResourceMgr """
658 def find_cp_by_name(cp_name
):
659 """ Find a connection point by name """
661 self
._log
.debug("find_cp_by_name(%s) called", cp_name
)
662 for ext_cp
in vnfr
._cprs
:
663 self
._log
.debug("Checking ext cp (%s) called", ext_cp
.name
)
664 if ext_cp
.name
== cp_name
:
668 self
._log
.debug("Failed to find cp %s in external connection points",
672 def find_internal_vlr_by_cp_name(cp_name
):
673 """ Find the VLR corresponding to the connection point name"""
676 self
._log
.debug("find_internal_vlr_by_cp_name(%s) called",
679 for int_cp
in self
._vdud
.internal_connection_point
:
680 self
._log
.debug("Checking for int cp %s in internal connection points",
682 if int_cp
.id == cp_name
:
687 self
._log
.debug("Failed to find cp %s in internal connection points",
689 msg
= "Failed to find cp %s in internal connection points" % cp_name
690 raise VduRecordError(msg
)
692 # return the VLR associated with the connection point
693 return vnfr
.find_vlr_by_cp(cp_name
)
695 block
= xact
.block_create()
697 self
._log
.debug("Executing vm request id: %s, action: create",
700 # Resolve the networks associated external interfaces
701 for ext_intf
in self
._vdud
.external_interface
:
702 self
._log
.debug("Resolving external interface name [%s], cp[%s]",
703 ext_intf
.name
, ext_intf
.vnfd_connection_point_ref
)
704 cp
= find_cp_by_name(ext_intf
.vnfd_connection_point_ref
)
706 self
._log
.debug("Failed to find connection point - %s",
707 ext_intf
.vnfd_connection_point_ref
)
709 self
._log
.debug("Connection point name [%s], type[%s]",
710 cp
.name
, cp
.type_yang
)
712 vlr
= vnfr
.ext_vlr_by_id(cp
.vlr_ref
)
714 etuple
= (ext_intf
, cp
.name
, vlr
)
715 self
._ext
_intf
.append(etuple
)
717 self
._log
.debug("Created external interface tuple : %s", etuple
)
719 # Resolve the networks associated internal interfaces
720 for intf
in self
._vdud
.internal_interface
:
721 cp_id
= intf
.vdu_internal_connection_point_ref
722 self
._log
.debug("Resolving internal interface name [%s], cp[%s]",
726 vlr
= find_internal_vlr_by_cp_name(cp_id
)
727 except Exception as e
:
728 self
._log
.debug("Failed to find cp %s in internal VLR list", cp_id
)
729 msg
= "Failed to find cp %s in internal VLR list, e = %s" % (cp_id
, e
)
730 raise VduRecordError(msg
)
732 ituple
= (intf
, cp_id
, vlr
)
733 self
._int
_intf
.append(ituple
)
735 self
._log
.debug("Created internal interface tuple : %s", ituple
)
737 resmgr_path
= self
.resmgr_path
738 resmgr_msg
= self
.resmgr_msg(config
)
740 self
._log
.debug("Creating new VM request at: %s, params: %s", resmgr_path
, resmgr_msg
)
741 block
.add_query_create(resmgr_path
, resmgr_msg
)
743 res_iter
= yield from block
.execute(now
=True)
751 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
752 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
753 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
754 return resp
.resource_info
757 def delete_resource(self
, xact
):
758 block
= xact
.block_create()
760 self
._log
.debug("Executing vm request id: %s, action: delete",
763 block
.add_query_delete(self
.resmgr_path
)
765 yield from block
.execute(flags
=0, now
=True)
768 def read_resource(self
, xact
):
769 block
= xact
.block_create()
771 self
._log
.debug("Executing vm request id: %s, action: delete",
774 block
.add_query_read(self
.resmgr_path
)
776 res_iter
= yield from block
.execute(flags
=0, now
=True)
781 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
782 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
783 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
784 #self._vm_resp = resp.resource_info
785 return resp
.resource_info
789 def start_component(self
):
790 """ This VDUR is active """
791 self
._log
.debug("Starting component %s for vdud %s vdur %s",
792 self
._vdud
.vcs_component_ref
,
795 yield from self
._vnfr
.start_component(self
._vdud
.vcs_component_ref
,
796 self
.vm_resp
.management_ip
)
800 """ Is this VDU active """
801 return True if self
._state
is VDURecordState
.READY
else False
804 def instantiation_failed(self
, failed_reason
=None):
805 """ VDU instantiation failed """
806 self
._log
.debug("VDU %s instantiation failed ", self
._vdur
_id
)
807 self
._state
= VDURecordState
.FAILED
808 self
._state
_failed
_reason
= failed_reason
809 yield from self
._vnfr
.instantiation_failed(failed_reason
)
812 def vdu_is_active(self
):
813 """ This VDU is active"""
815 self
._log
.warning("VDU %s was already marked as active", self
._vdur
_id
)
818 self
._log
.debug("VDUR id %s in VNFR %s is active", self
._vdur
_id
, self
._vnfr
.vnfr_id
)
820 if self
._vdud
.vcs_component_ref
is not None:
821 yield from self
.start_component()
823 self
._state
= VDURecordState
.READY
825 if self
._vnfr
.all_vdus_active():
826 self
._log
.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self
._vnfr
)
827 yield from self
._vnfr
.is_ready()
830 def instantiate(self
, xact
, vnfr
, config
=None):
831 """ Instantiate this VDU """
832 self
._state
= VDURecordState
.INSTANTIATING
835 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
836 """ This VDUR is active """
837 self
._log
.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
842 if (query_action
== rwdts
.QueryAction
.UPDATE
or
843 query_action
== rwdts
.QueryAction
.CREATE
):
846 if msg
.resource_state
== "active":
847 # Move this VDU to ready state
848 yield from self
.vdu_is_active()
849 elif msg
.resource_state
== "failed":
850 yield from self
.instantiation_failed(msg
.resource_errors
)
851 elif query_action
== rwdts
.QueryAction
.DELETE
:
852 self
._log
.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
854 raise NotImplementedError(
855 "%s action on VirtualDeployementUnitRecord not supported",
858 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
861 reg_event
= asyncio
.Event(loop
=self
._loop
)
864 def on_ready(regh
, status
):
867 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
, on_ready
=on_ready
)
868 self
._rm
_regh
= yield from self
._dts
.register(self
.resmgr_path
+ '/resource-info',
869 flags
=rwdts
.Flag
.SUBSCRIBER
,
871 yield from reg_event
.wait()
873 vm_resp
= yield from self
.create_resource(xact
, vnfr
, config
)
874 self
._vm
_resp
= vm_resp
876 self
._state
= VDURecordState
.RESOURCE_ALLOC_PENDING
877 self
._log
.debug("Requested VM from resource manager response %s",
879 if vm_resp
.resource_state
== "active":
880 self
._log
.debug("Resourcemgr responded wih an active vm resp %s",
882 yield from self
.vdu_is_active()
883 self
._state
= VDURecordState
.READY
884 elif (vm_resp
.resource_state
== "pending" or
885 vm_resp
.resource_state
== "inactive"):
886 self
._log
.debug("Resourcemgr responded wih a pending vm resp %s",
888 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
889 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
890 # flags=rwdts.Flag.SUBSCRIBER,
893 self
._log
.debug("Resourcemgr responded wih an error vm resp %s",
895 raise VirtualDeploymentUnitRecordError(
896 "Failed VDUR instantiation %s " % vm_resp
)
898 except Exception as e
:
900 traceback
.print_exc()
901 self
._log
.exception(e
)
902 self
._log
.error("Instantiation of VDU record failed: %s", str(e
))
903 self
._state
= VDURecordState
.FAILED
904 yield from self
.instantiation_failed(str(e
))
907 class VlRecordState(enum
.Enum
):
908 """ VL Record State """
910 INSTANTIATION_PENDING
= 102
912 TERMINATE_PENDING
= 104
917 class InternalVirtualLinkRecord(object):
918 """ Internal Virtual Link record """
919 def __init__(self
, dts
, log
, loop
, ivld_msg
, vnfr_name
, cloud_account_name
):
923 self
._ivld
_msg
= ivld_msg
924 self
._vnfr
_name
= vnfr_name
925 self
._cloud
_account
_name
= cloud_account_name
927 self
._vlr
_req
= self
.create_vlr()
929 self
._state
= VlRecordState
.INIT
933 """ Find VLR by id """
934 return self
._vlr
_req
.id
938 """ Name of this VL """
939 return self
._vnfr
_name
+ "." + self
._ivld
_msg
.name
942 def network_id(self
):
943 """ Find VLR by id """
944 return self
._vlr
.network_id
if self
._vlr
else None
947 """ VLR path for this VLR instance"""
948 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self
.vlr_id
)
950 def create_vlr(self
):
951 """ Create the VLR record which will be instantiated """
953 vld_fields
= ["short_name",
960 vld_copy_dict
= {k
: v
for k
, v
in self
._ivld
_msg
.as_dict().items() if k
in vld_fields
}
962 vlr_dict
= {"id": str(uuid
.uuid4()),
964 "cloud_account": self
._cloud
_account
_name
,
966 vlr_dict
.update(vld_copy_dict
)
968 vlr
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict(vlr_dict
)
972 def instantiate(self
, xact
, restart_mode
=False):
973 """ Instantiate VL """
976 def instantiate_vlr():
977 """ Instantiate VLR"""
978 self
._log
.debug("Create VL with xpath %s and vlr %s",
979 self
.vlr_path(), self
._vlr
_req
)
981 with self
._dts
.transaction(flags
=0) as xact
:
982 block
= xact
.block_create()
983 block
.add_query_create(xpath
=self
.vlr_path(), msg
=self
._vlr
_req
)
984 self
._log
.debug("Executing VL create path:%s msg:%s",
985 self
.vlr_path(), self
._vlr
_req
)
989 res_iter
= yield from block
.execute()
991 self
._state
= VlRecordState
.FAILED
992 self
._log
.exception("Caught exception while instantial VL")
997 self
._vlr
= res
.result
999 if self
._vlr
.operational_status
== 'failed':
1000 self
._log
.debug("VL creation failed for vlr id %s", self
._vlr
.id)
1001 self
._state
= VlRecordState
.FAILED
1002 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self
._vlr
.id))
1004 self
._log
.info("Created VL with xpath %s and vlr %s",
1005 self
.vlr_path(), self
._vlr
)
1009 """ Get the network id """
1010 res_iter
= yield from self
._dts
.query_read(self
.vlr_path(), rwdts
.XactFlag
.MERGE
)
1012 for ent
in res_iter
:
1013 res
= yield from ent
1017 err
= "Failed to get VLR for path %s" % self
.vlr_path()
1019 raise InternalVirtualLinkRecordError(err
)
1022 self
._state
= VlRecordState
.INSTANTIATION_PENDING
1025 vl
= yield from get_vlr()
1027 yield from instantiate_vlr()
1029 yield from instantiate_vlr()
1031 self
._state
= VlRecordState
.ACTIVE
1033 def vlr_in_vns(self
):
1034 """ Is there a VLR record in VNS """
1035 if (self
._state
== VlRecordState
.ACTIVE
or
1036 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
1037 self
._state
== VlRecordState
.FAILED
):
1043 def terminate(self
, xact
):
1044 """Terminate this VL """
1045 if not self
.vlr_in_vns():
1046 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1047 self
.vlr_id
, self
._state
)
1050 self
._log
.debug("Terminating VL with path %s", self
.vlr_path())
1051 self
._state
= VlRecordState
.TERMINATE_PENDING
1052 block
= xact
.block_create()
1053 block
.add_query_delete(self
.vlr_path())
1054 yield from block
.execute(flags
=0, now
=True)
1055 self
._state
= VlRecordState
.TERMINATED
1056 self
._log
.debug("Terminated VL with path %s", self
.vlr_path())
1059 class VirtualNetworkFunctionRecord(object):
1060 """ Virtual Network Function Record """
1061 def __init__(self
, dts
, log
, loop
, cluster_name
, vnfm
, vcs_handler
, vnfr_msg
):
1065 self
._cluster
_name
= cluster_name
1066 self
._vnfr
_msg
= vnfr_msg
1067 self
._vnfr
_id
= vnfr_msg
.id
1068 self
._vnfd
_id
= vnfr_msg
.vnfd_ref
1070 self
._vcs
_handler
= vcs_handler
1071 self
._vnfr
= vnfr_msg
1074 self
._state
= VirtualNetworkFunctionRecordState
.INIT
1075 self
._state
_failed
_reason
= None
1076 self
._ext
_vlrs
= {} # The list of external virtual links
1077 self
._vlrs
= [] # The list of internal virtual links
1078 self
._vdus
= [] # The list of vdu
1079 self
._vlr
_by
_cp
= {}
1081 self
._inventory
= {}
1082 self
._create
_time
= int(time
.time())
1083 self
._vnf
_mon
= None
1084 self
._config
_status
= vnfr_msg
.config_status
1085 self
._vnfd
_package
_store
= rift
.package
.store
.VnfdPackageFilesystemStore(self
._log
)
1087 def _get_vdur_from_vdu_id(self
, vdu_id
):
1088 self
._log
.debug("Finding vdur for vdu_id %s", vdu_id
)
1089 self
._log
.debug("Searching through vdus: %s", self
._vdus
)
1090 for vdu
in self
._vdus
:
1091 self
._log
.debug("vdu_id: %s", vdu
.vdu_id
)
1092 if vdu
.vdu_id
== vdu_id
:
1095 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id
)
1098 def operational_status(self
):
1099 """ Operational status of this VNFR """
1100 op_status_map
= {"INIT": "init",
1101 "VL_INIT_PHASE": "vl_init_phase",
1102 "VM_INIT_PHASE": "vm_init_phase",
1104 "TERMINATE": "terminate",
1105 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1106 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1107 "TERMINATED": "terminated",
1108 "FAILED": "failed", }
1109 return op_status_map
[self
._state
.name
]
1112 def vnfd_xpath(self
):
1113 """ VNFD xpath associated with this VNFR """
1114 return("C,/vnfd:vnfd-catalog/"
1115 "vnfd:vnfd[vnfd:id = '{}']".format(self
._vnfd
_id
))
1119 """ VNFD for this VNFR """
1124 """ VNFD name associated with this VNFR """
1125 return self
.vnfd
.name
1129 """ Name of this VNF in the record """
1130 return self
._vnfr
.name
1133 def cloud_account_name(self
):
1134 """ Name of the cloud account this VNFR is instantiated in """
1135 return self
._vnfr
.cloud_account
1139 """ VNFD Id associated with this VNFR """
1144 """ VNFR Id associated with this VNFR """
1145 return self
._vnfr
_id
1148 def member_vnf_index(self
):
1149 """ Member VNF index associated with this VNFR """
1150 return self
._vnfr
.member_vnf_index_ref
1153 def config_status(self
):
1154 """ Config agent status for this VNFR """
1155 return self
._config
_status
1157 def component_by_name(self
, component_name
):
1158 """ Find a component by name in the inventory list"""
1159 mangled_name
= VcsComponent
.mangle_name(component_name
,
1162 return self
._inventory
[mangled_name
]
1167 def get_nsr_config(self
):
1168 ### Need access to NS instance configuration for runtime resolution.
1169 ### This shall be replaced when deployment flavors are implemented
1170 xpath
= "C,/nsr:ns-instance-config"
1171 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
1173 for result
in results
:
1174 entry
= yield from result
1175 ns_instance_config
= entry
.result
1176 for nsr
in ns_instance_config
.nsr
:
1177 if nsr
.id == self
._vnfr
_msg
.nsr_id_ref
:
1182 def start_component(self
, component_name
, ip_addr
):
1183 """ Start a component in the VNFR by name """
1184 comp
= self
.component_by_name(component_name
)
1185 yield from comp
.start(None, None, ip_addr
)
1187 def cp_ip_addr(self
, cp_name
):
1188 """ Get ip address for connection point """
1189 self
._log
.debug("cp_ip_addr()")
1190 for cp
in self
._cprs
:
1191 if cp
.name
== cp_name
and cp
.ip_address
is not None:
1192 return cp
.ip_address
1195 def mgmt_intf_info(self
):
1196 """ Get Management interface info for this VNFR """
1197 mgmt_intf_desc
= self
.vnfd
.msg
.mgmt_interface
1199 if mgmt_intf_desc
.has_field("cp"):
1200 ip_addr
= self
.cp_ip_addr(mgmt_intf_desc
.cp
)
1201 elif mgmt_intf_desc
.has_field("vdu_id"):
1203 vdur
= self
._get
_vdur
_from
_vdu
_id
(mgmt_intf_desc
.vdu_id
)
1204 ip_addr
= vdur
.management_ip
1205 except VDURecordNotFound
:
1206 self
._log
.debug("Did not find mgmt interface for vnfr id %s", self
._vnfr
_id
)
1209 ip_addr
= mgmt_intf_desc
.ip_address
1210 port
= mgmt_intf_desc
.port
1212 return ip_addr
, port
1216 """ Message associated with this VNFR """
1217 vnfd_fields
= ["short_name", "vendor", "description", "version"]
1218 vnfd_copy_dict
= {k
: v
for k
, v
in self
.vnfd
.msg
.as_dict().items() if k
in vnfd_fields
}
1220 mgmt_intf
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1221 ip_address
, port
= self
.mgmt_intf_info()
1223 if ip_address
is not None:
1224 mgmt_intf
.ip_address
= ip_address
1225 if port
is not None:
1226 mgmt_intf
.port
= port
1228 vnfr_dict
= {"id": self
._vnfr
_id
,
1229 "nsr_id_ref": self
._vnfr
_msg
.nsr_id_ref
,
1231 "member_vnf_index_ref": self
.member_vnf_index
,
1232 "vnfd_ref": self
.vnfd_id
,
1233 "operational_status": self
.operational_status
,
1234 "operational_status_details": self
._state
_failed
_reason
,
1235 "cloud_account": self
.cloud_account_name
,
1236 "config_status": self
._config
_status
1239 vnfr_dict
.update(vnfd_copy_dict
)
1241 vnfr_msg
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
1242 vnfr_msg
.mgmt_interface
= mgmt_intf
1244 # Add all the VLRs to VNFR
1245 for vlr
in self
._vlrs
:
1246 ivlr
= vnfr_msg
.internal_vlr
.add()
1247 ivlr
.vlr_ref
= vlr
.vlr_id
1249 # Add all the VDURs to VDUR
1250 if self
._vdus
is not None:
1251 for vdu
in self
._vdus
:
1252 vdur
= vnfr_msg
.vdur
.add()
1253 vdur
.from_dict(vdu
.msg
.as_dict())
1255 if self
.vnfd
.msg
.mgmt_interface
.has_field('dashboard_params'):
1256 vnfr_msg
.dashboard_url
= self
.dashboard_url
1258 for cpr
in self
._cprs
:
1259 new_cp
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr
.as_dict())
1260 vnfr_msg
.connection_point
.append(new_cp
)
1262 if self
._vnf
_mon
is not None:
1263 for monp
in self
._vnf
_mon
.msg
:
1264 vnfr_msg
.monitoring_param
.append(
1265 VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam
.from_dict(monp
.as_dict()))
1267 if self
._vnfr
.vnf_configuration
is not None:
1268 vnfr_msg
.vnf_configuration
.from_dict(self
._vnfr
.vnf_configuration
.as_dict())
1269 if (ip_address
is not None and
1270 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
is None):
1271 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
= ip_address
1273 for group
in self
._vnfr
_msg
.placement_groups_info
:
1274 group_info
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1275 group_info
.from_dict(group
.as_dict())
1276 vnfr_msg
.placement_groups_info
.append(group_info
)
1281 def dashboard_url(self
):
1282 ip
, cfg_port
= self
.mgmt_intf_info()
1285 if self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.has_field('https'):
1286 if self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.https
is True:
1289 if self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.has_field('port'):
1290 http_port
= self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.port
1292 url
= "{protocol}://{ip_address}:{port}/{path}".format(
1296 path
=self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.path
.lstrip("/"),
1303 """ path for this VNFR """
1304 return("D,/vnfr:vnfr-catalog"
1305 "/vnfr:vnfr[vnfr:id='{}']".format(self
.vnfr_id
))
1308 def publish(self
, xact
):
1309 """ publish this VNFR """
1311 self
._log
.debug("Publishing VNFR path = [%s], record = [%s]",
1312 self
.xpath
, self
.msg
)
1313 vnfr
.create_time
= self
._create
_time
1314 yield from self
._vnfm
.publish_vnfr(xact
, self
.xpath
, self
.msg
)
1315 self
._log
.debug("Published VNFR path = [%s], record = [%s]",
1316 self
.xpath
, self
.msg
)
1319 def create_vls(self
):
1320 """ Publish The VLs associated with this VNF """
1321 self
._log
.debug("Publishing Internal Virtual Links for vnfd id: %s",
1323 for ivld_msg
in self
.vnfd
.msg
.internal_vld
:
1324 self
._log
.debug("Creating internal vld:"
1325 " %s, int_cp_ref = %s",
1326 ivld_msg
, ivld_msg
.internal_connection_point
1328 vlr
= InternalVirtualLinkRecord(dts
=self
._dts
,
1332 vnfr_name
=self
.name
,
1333 cloud_account_name
=self
.cloud_account_name
1335 self
._vlrs
.append(vlr
)
1337 for int_cp
in ivld_msg
.internal_connection_point
:
1338 if int_cp
.id_ref
in self
._vlr
_by
_cp
:
1339 msg
= ("Connection point %s already "
1340 " bound %s" % (int_cp
.id_ref
, self
._vlr
_by
_cp
[int_cp
.id_ref
]))
1341 raise InternalVirtualLinkRecordError(msg
)
1342 self
._log
.debug("Setting vlr %s to internal cp = %s",
1344 self
._vlr
_by
_cp
[int_cp
.id_ref
] = vlr
1347 def instantiate_vls(self
, xact
, restart_mode
=False):
1348 """ Instantiate the VLs associated with this VNF """
1349 self
._log
.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1352 for vlr
in self
._vlrs
:
1353 self
._log
.debug("Instantiating VLR %s", vlr
)
1354 yield from vlr
.instantiate(xact
, restart_mode
)
1356 def find_vlr_by_cp(self
, cp_name
):
1357 """ Find the VLR associated with the cp name """
1358 return self
._vlr
_by
_cp
[cp_name
]
1360 def resolve_placement_group_cloud_construct(self
, input_group
, nsr_config
):
1362 Returns the cloud specific construct for placement group
1364 input_group: VNFD PlacementGroup
1365 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1367 copy_dict
= ['name', 'requirement', 'strategy']
1368 for group_info
in nsr_config
.vnfd_placement_group_maps
:
1369 if group_info
.placement_group_ref
== input_group
.name
and \
1370 group_info
.vnfd_id_ref
== self
.vnfd_id
:
1371 group
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1372 group_dict
= {k
:v
for k
,v
in
1373 group_info
.as_dict().items()
1374 if (k
!= 'placement_group_ref' and k
!='vnfd_id_ref')}
1375 for param
in copy_dict
:
1376 group_dict
.update({param
: getattr(input_group
, param
)})
1377 group
.from_dict(group_dict
)
1382 def get_vdu_placement_groups(self
, vdu
):
1383 placement_groups
= []
1384 ### Step-1: Get VNF level placement groups
1385 for group
in self
._vnfr
_msg
.placement_groups_info
:
1386 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1387 #group_info.from_dict(group.as_dict())
1388 placement_groups
.append(group
)
1390 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1391 nsr_config
= yield from self
.get_nsr_config()
1393 ### Step-3: Get VDU level placement groups
1394 for group
in self
.vnfd
.msg
.placement_groups
:
1395 for member_vdu
in group
.member_vdus
:
1396 if member_vdu
.member_vdu_ref
== vdu
.id:
1397 group_info
= self
.resolve_placement_group_cloud_construct(group
,
1399 if group_info
is None:
1400 self
._log
.info("Could not resolve cloud-construct for placement group: %s", group
.name
)
1401 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1403 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1407 self
.member_vnf_index
)
1408 placement_groups
.append(group_info
)
1410 return placement_groups
1413 def create_vdus(self
, vnfr
, restart_mode
=False):
1414 """ Create the VDUs associated with this VNF """
1416 def get_vdur_id(vdud
):
1417 """Get the corresponding VDUR's id for the VDUD. This is useful in
1420 In restart mode we check for exiting VDUR's ID and use them, if
1421 available. This way we don't end up creating duplicate VDURs
1425 if restart_mode
and vdud
is not None:
1427 vdur
= [vdur
.id for vdur
in vnfr
._vnfr
.vdur
if vdur
.vdu_id_ref
== vdud
.id]
1430 self
._log
.error("Unable to find a VDUR for VDUD {}".format(vdud
))
1435 self
._log
.info("Creating VDU's for vnfd id: %s", self
.vnfd_id
)
1436 for vdu
in self
.vnfd
.msg
.vdu
:
1437 self
._log
.debug("Creating vdu: %s", vdu
)
1438 vdur_id
= get_vdur_id(vdu
)
1440 placement_groups
= yield from self
.get_vdu_placement_groups(vdu
)
1441 self
._log
.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1444 self
.member_vnf_index
,
1445 [ group
.name
for group
in placement_groups
])
1447 vdur
= VirtualDeploymentUnitRecord(
1453 mgmt_intf
=self
.has_mgmt_interface(vdu
),
1454 cloud_account_name
=self
.cloud_account_name
,
1455 vnfd_package_store
=self
._vnfd
_package
_store
,
1457 placement_groups
= placement_groups
,
1459 yield from vdur
.vdu_opdata_register()
1461 self
._vdus
.append(vdur
)
1464 def instantiate_vdus(self
, xact
, vnfr
):
1465 """ Instantiate the VDUs associated with this VNF """
1466 self
._log
.debug("Instantiating VDU's for vnfd id %s: %s", self
.vnfd_id
, self
._vdus
)
1468 lookup
= {vdu
.vdu_id
: vdu
for vdu
in self
._vdus
}
1470 # Identify any dependencies among the VDUs
1471 dependencies
= collections
.defaultdict(list)
1472 vdu_id_pattern
= re
.compile(r
"\{\{ vdu\[([^]]+)\]\S* \}\}")
1474 for vdu
in self
._vdus
:
1475 if vdu
.vdud_cloud_init
is not None:
1476 for vdu_id
in vdu_id_pattern
.findall(vdu
.vdud_cloud_init
):
1477 if vdu_id
!= vdu
.vdu_id
:
1478 # This means that vdu.vdu_id depends upon vdu_id,
1479 # i.e. vdu_id must be instantiated before
1481 dependencies
[vdu
.vdu_id
].append(lookup
[vdu_id
])
1483 # Define the terminal states of VDU instantiation
1485 VDURecordState
.READY
,
1486 VDURecordState
.TERMINATED
,
1487 VDURecordState
.FAILED
,
1490 datastore
= VdurDatastore()
1494 def instantiate_monitor(vdu
):
1495 """Monitor the state of the VDU during instantiation
1498 vdu - a VirtualDeploymentUnitRecord
1501 # wait for the VDUR to enter a terminal state
1502 while vdu
._state
not in terminal
:
1503 yield from asyncio
.sleep(1, loop
=self
._loop
)
1505 # update the datastore
1506 datastore
.update(vdu
)
1508 # add the VDU to the set of processed VDUs
1509 processed
.add(vdu
.vdu_id
)
1512 def instantiate(vdu
):
1513 """Instantiate the specified VDU
1516 vdu - a VirtualDeploymentUnitRecord
1519 if the VDU, or any of the VDUs this VDU depends upon, are
1520 terminated or fail to instantiate properly, a
1521 VirtualDeploymentUnitRecordError is raised.
1524 for dependency
in dependencies
[vdu
.vdu_id
]:
1525 self
._log
.debug("{}: waiting for {}".format(vdu
.vdu_id
, dependency
.vdu_id
))
1527 while dependency
.vdu_id
not in processed
:
1528 yield from asyncio
.sleep(1, loop
=self
._loop
)
1530 if not dependency
.active
:
1531 raise VirtualDeploymentUnitRecordError()
1533 self
._log
.debug('instantiating {}'.format(vdu
.vdu_id
))
1535 # Populate the datastore with the current values of the VDU
1538 # Substitute any variables contained in the cloud config script
1539 config
= str(vdu
.vdud_cloud_init
)
1541 parts
= re
.split("\{\{ ([^\}]+) \}\}", config
)
1544 # Extract the variable names
1546 for variable
in parts
[1::2]:
1547 variables
.append(variable
.lstrip('{{').rstrip('}}').strip())
1549 # Iterate of the variables and substitute values from the
1551 for variable
in variables
:
1553 # Handle a reference to a VDU by ID
1554 if variable
.startswith('vdu['):
1555 value
= datastore
.get(variable
)
1557 msg
= "Unable to find a substitute for {} in {} cloud-init script"
1558 raise ValueError(msg
.format(variable
, vdu
.vdu_id
))
1560 config
= config
.replace("{{ %s }}" % variable
, value
)
1563 # Handle a reference to the current VDU
1564 if variable
.startswith('vdu'):
1565 value
= datastore
.get('vdu[{}]'.format(vdu
.vdu_id
) + variable
[3:])
1566 config
= config
.replace("{{ %s }}" % variable
, value
)
1569 # Handle unrecognized variables
1570 msg
= 'unrecognized cloud-config variable: {}'
1571 raise ValueError(msg
.format(variable
))
1573 # Instantiate the VDU
1574 with self
._dts
.transaction() as xact
:
1575 self
._log
.debug("Instantiating vdu: %s", vdu
)
1576 yield from vdu
.instantiate(xact
, vnfr
, config
=config
)
1577 if self
._state
== VirtualNetworkFunctionRecordState
.FAILED
:
1578 self
._log
.error("Instatiation of VNF %s failed while instantiating vdu %s",
1581 # First create a set of tasks to monitor the state of the VDUs and
1582 # report when they have entered a terminal state
1583 for vdu
in self
._vdus
:
1584 self
._loop
.create_task(instantiate_monitor(vdu
))
1586 for vdu
in self
._vdus
:
1587 self
._loop
.create_task(instantiate(vdu
))
1589 def has_mgmt_interface(self
, vdu
):
1590 # ## TODO: Support additional mgmt_interface type options
1591 if self
.vnfd
.msg
.mgmt_interface
.vdu_id
== vdu
.id:
1595 def vlr_xpath(self
, vlr_id
):
1598 "D,/vlr:vlr-catalog/"
1599 "vlr:vlr[vlr:id = '{}']".format(vlr_id
))
1601 def ext_vlr_by_id(self
, vlr_id
):
1602 """ find ext vlr by id """
1603 return self
._ext
_vlrs
[vlr_id
]
1606 def publish_inventory(self
, xact
):
1607 """ Publish the inventory associated with this VNF """
1608 self
._log
.debug("Publishing inventory for VNFR id: %s", self
._vnfr
_id
)
1610 for component
in self
.vnfd
.msg
.component
:
1611 self
._log
.debug("Creating inventory component %s", component
)
1612 mangled_name
= VcsComponent
.mangle_name(component
.component_name
,
1616 comp
= VcsComponent(dts
=self
._dts
,
1619 cluster_name
=self
._cluster
_name
,
1620 vcs_handler
=self
._vcs
_handler
,
1621 component
=component
,
1622 mangled_name
=mangled_name
,
1624 if comp
.name
in self
._inventory
:
1625 self
._log
.debug("Duplicate entries in inventory %s for vnfr %s",
1626 component
, self
._vnfd
_id
)
1628 self
._log
.debug("Adding component %s for vnrf %s",
1629 comp
.name
, self
._vnfr
_id
)
1630 self
._inventory
[comp
.name
] = comp
1631 yield from comp
.publish(xact
)
1633 def all_vdus_active(self
):
1634 """ Are all VDUS in this VNFR active? """
1635 for vdu
in self
._vdus
:
1639 self
._log
.debug("Inside all_vdus_active. Returning True")
1643 def instantiation_failed(self
, failed_reason
=None):
1644 """ VNFR instantiation failed """
1645 self
._log
.debug("VNFR %s instantiation failed ", self
.vnfr_id
)
1646 self
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
1647 self
._state
_failed
_reason
= failed_reason
1649 # Update the VNFR with the changed status
1650 yield from self
.publish(None)
1654 """ This VNF is ready"""
1655 self
._log
.debug("VNFR id %s is ready", self
.vnfr_id
)
1657 if self
._state
!= VirtualNetworkFunctionRecordState
.FAILED
:
1658 self
.set_state(VirtualNetworkFunctionRecordState
.READY
)
1661 self
._log
.debug("VNFR id %s ignoring state change", self
.vnfr_id
)
1663 # Update the VNFR with the changed status
1664 yield from self
.publish(None)
1666 def update_cp(self
, cp_name
, ip_address
, mac_addr
, cp_id
):
1667 """Updated the connection point with ip address"""
1668 for cp
in self
._cprs
:
1669 if cp
.name
== cp_name
:
1670 self
._log
.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1671 cp_name
, cp
, ip_address
, cp_id
)
1672 cp
.ip_address
= ip_address
1673 cp
.mac_address
= mac_addr
1674 cp
.connection_point_id
= cp_id
1677 err
= "No connection point %s found in VNFR id %s" % (cp
.name
, self
._vnfr
_id
)
1678 self
._log
.debug(err
)
1679 raise VirtualDeploymentUnitRecordError(err
)
1681 def set_state(self
, state
):
1682 """ Set state for this VNFR"""
1686 def instantiate(self
, xact
, restart_mode
=False):
1687 """ instantiate this VNF """
1688 self
.set_state(VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
)
1693 # Iterate over all the connection points in VNFR and fetch the
1696 def cpr_from_cp(cp
):
1697 """ Creates a record level connection point from the desciptor cp"""
1698 cp_fields
= ["name", "image", "vm-flavor"]
1699 cp_copy_dict
= {k
: v
for k
, v
in cp
.as_dict().items() if k
in cp_fields
}
1701 cpr_dict
.update(cp_copy_dict
)
1702 return VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr_dict
)
1704 self
._log
.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1705 self
._vnfr
_id
, self
._vnfr
.connection_point
)
1707 for cp
in self
._vnfr
.connection_point
:
1708 cpr
= cpr_from_cp(cp
)
1709 self
._cprs
.append(cpr
)
1710 self
._log
.debug("Adding Connection point record %s ", cp
)
1712 vlr_path
= self
.vlr_xpath(cp
.vlr_ref
)
1713 self
._log
.debug("Fetching VLR with path = %s", vlr_path
)
1714 res_iter
= yield from self
._dts
.query_read(self
.vlr_xpath(cp
.vlr_ref
),
1715 rwdts
.XactFlag
.MERGE
)
1719 self
._ext
_vlrs
[cp
.vlr_ref
] = d
1720 cpr
.vlr_ref
= cp
.vlr_ref
1721 self
._log
.debug("Fetched VLR [%s] with path = [%s]", d
, vlr_path
)
1723 # Fetch the VNFD associated with the VNFR
1724 self
._log
.debug("VNFR-ID %s: Fetching vnfds", self
._vnfr
_id
)
1725 self
._vnfd
= yield from self
._vnfm
.get_vnfd_ref(self
._vnfd
_id
)
1726 self
._log
.debug("VNFR-ID %s: Fetched vnfd:%s", self
._vnfr
_id
, self
._vnfd
)
1728 assert self
.vnfd
is not None
1730 # Fetch External VLRs
1731 self
._log
.debug("VNFR-ID %s: Fetching vlrs", self
._vnfr
_id
)
1732 yield from fetch_vlrs()
1735 self
._log
.debug("VNFR-ID %s: Publishing Inventory", self
._vnfr
_id
)
1736 yield from self
.publish_inventory(xact
)
1739 self
._log
.debug("VNFR-ID %s: Creating VLs", self
._vnfr
_id
)
1740 yield from self
.create_vls()
1743 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1744 yield from self
.publish(xact
)
1747 self
._log
.debug("VNFR-ID %s: Instantiate VLs", self
._vnfr
_id
)
1749 yield from self
.instantiate_vls(xact
, restart_mode
)
1750 except Exception as e
:
1751 self
._log
.exception("VL instantiation failed (%s)", str(e
))
1752 yield from self
.instantiation_failed(str(e
))
1755 self
.set_state(VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
)
1758 self
._log
.debug("VNFR-ID %s: Create VDUs", self
._vnfr
_id
)
1759 yield from self
.create_vdus(self
, restart_mode
)
1762 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1763 yield from self
.publish(xact
)
1766 # ToDo: Check if this should be prevented during restart
1767 self
._log
.debug("VNFR-ID %s: Instantiate VDUs", self
._vnfr
_id
)
1768 _
= self
._loop
.create_task(self
.instantiate_vdus(xact
, self
))
1771 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1772 yield from self
.publish(xact
)
1774 self
._log
.debug("VNFR-ID %s: Instantiation Done", self
._vnfr
_id
)
1777 def terminate(self
, xact
):
1778 """ Terminate this virtual network function """
1780 self
._log
.debug("Terminatng VNF id %s", self
.vnfr_id
)
1782 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATE
)
1785 if self
._vnf
_mon
is not None:
1786 self
._vnf
_mon
.stop()
1787 self
._vnf
_mon
.deregister()
1788 self
._vnf
_mon
= None
1791 def terminate_vls():
1792 """ Terminate VLs in this VNF """
1793 for vl
in self
._vlrs
:
1794 yield from vl
.terminate(xact
)
1797 def terminate_vdus():
1798 """ Terminate VDUS in this VNF """
1799 for vdu
in self
._vdus
:
1800 yield from vdu
.terminate(xact
)
1802 self
._log
.debug("Terminatng VLs in VNF id %s", self
.vnfr_id
)
1803 self
.set_state(VirtualNetworkFunctionRecordState
.VL_TERMINATE_PHASE
)
1804 yield from terminate_vls()
1806 self
._log
.debug("Terminatng VDUs in VNF id %s", self
.vnfr_id
)
1807 self
.set_state(VirtualNetworkFunctionRecordState
.VDU_TERMINATE_PHASE
)
1808 yield from terminate_vdus()
1810 self
._log
.debug("Terminated VNF id %s", self
.vnfr_id
)
1811 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATED
)
1814 class VnfdDtsHandler(object):
1815 """ DTS handler for VNFD config changes """
1816 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1818 def __init__(self
, dts
, log
, loop
, vnfm
):
1827 """ DTS registration handle """
1832 """ Register for VNFD configuration"""
1834 def on_apply(dts
, acg
, xact
, action
, scratch
):
1835 """Apply the configuration"""
1836 self
._log
.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1837 xact
, action
, scratch
)
1839 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
1840 # Create/Update a VNFD record
1841 for cfg
in self
._regh
.get_xact_elements(xact
):
1842 # Only interested in those VNFD cfgs whose ID was received in prepare callback
1843 if cfg
.id in scratch
.get('vnfds', []) or is_recovery
:
1844 self
._vnfm
.update_vnfd(cfg
)
1846 scratch
.pop('vnfds', None)
1849 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
1850 """ on prepare callback """
1851 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1852 ks_path
.to_xpath(RwVnfmYang
.get_schema()), msg
)
1853 fref
= ProtobufC
.FieldReference
.alloc()
1854 fref
.goto_whole_message(msg
.to_pbcm())
1856 # Handle deletes in prepare_callback, but adds/updates in apply_callback
1857 if fref
.is_field_deleted():
1858 # Delete an VNFD record
1859 self
._log
.debug("Deleting VNFD with id %s", msg
.id)
1860 if self
._vnfm
.vnfd_in_use(msg
.id):
1861 self
._log
.debug("Cannot delete VNFD in use - %s", msg
)
1862 err
= "Cannot delete a VNFD in use - %s" % msg
1863 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
1864 # Delete a VNFD record
1865 yield from self
._vnfm
.delete_vnfd(msg
.id)
1867 # Handle actual adds/updates in apply_callback,
1868 # just check if VNFD in use in prepare_callback
1869 if self
._vnfm
.vnfd_in_use(msg
.id):
1870 self
._log
.debug("Cannot modify an VNFD in use - %s", msg
)
1871 err
= "Cannot modify an VNFD in use - %s" % msg
1872 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
1874 # Add this VNFD to scratch to create/update in apply callback
1875 vnfds
= scratch
.setdefault('vnfds', [])
1876 vnfds
.append(msg
.id)
1878 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
1881 "Registering for VNFD config using xpath: %s",
1882 VnfdDtsHandler
.XPATH
,
1884 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
1885 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
1886 self
._regh
= acg
.register(
1887 xpath
=VnfdDtsHandler
.XPATH
,
1888 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
1889 on_prepare
=on_prepare
)
1892 class VcsComponentDtsHandler(object):
1893 """ Vcs Component DTS handler """
1894 XPATH
= ("D,/rw-manifest:manifest" +
1895 "/rw-manifest:operational-inventory" +
1896 "/rw-manifest:component")
1898 def __init__(self
, dts
, log
, loop
, vnfm
):
1907 """ DTS registration handle """
1912 """ Registers VCS component dts publisher registration"""
1913 self
._log
.debug("VCS Comp publisher DTS handler registering path %s",
1914 VcsComponentDtsHandler
.XPATH
)
1916 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
1917 handlers
= rift
.tasklets
.Group
.Handler()
1918 with self
._dts
.group_create(handler
=handlers
) as group
:
1919 self
._regh
= group
.register(xpath
=VcsComponentDtsHandler
.XPATH
,
1921 flags
=(rwdts
.Flag
.PUBLISHER |
1922 rwdts
.Flag
.NO_PREP_READ |
1923 rwdts
.Flag
.DATASTORE
),)
1926 def publish(self
, xact
, path
, msg
):
1927 """ Publishes the VCS component """
1928 self
._log
.debug("Publishing the VcsComponent xact = %s, %s:%s",
1930 self
.regh
.create_element(path
, msg
)
1931 self
._log
.debug("Published the VcsComponent to %s xact = %s, %s:%s",
1932 VcsComponentDtsHandler
.XPATH
, xact
, path
, msg
)
1934 class VnfrConsoleOperdataDtsHandler(object):
1935 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
1937 def vnfr_vdu_console_xpath(self
):
1938 """ path for resource-mgr"""
1939 return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self
._vnfr
_id
,self
._vdur
_id
))
1941 def __init__(self
, dts
, log
, loop
, vnfm
, vnfr_id
, vdur_id
, vdu_id
):
1948 self
._vnfr
_id
= vnfr_id
1949 self
._vdur
_id
= vdur_id
1950 self
._vdu
_id
= vdu_id
1954 """ Register for VNFR VDU Operational Data read from dts """
1957 def on_prepare(xact_info
, action
, ks_path
, msg
):
1958 """ prepare callback from dts """
1959 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
1961 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
1962 xact_info
, action
, xpath
, msg
1965 if action
== rwdts
.QueryAction
.READ
:
1966 schema
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur
.schema()
1967 path_entry
= schema
.keyspec_to_entry(ks_path
)
1968 self
._log
.debug("VDU Opdata path is {}".format(path_entry
))
1970 vnfr
= self
._vnfm
.get_vnfr(self
._vnfr
_id
)
1971 except VnfRecordError
as e
:
1972 self
._log
.error("VNFR id %s not found", self
._vnfr
_id
)
1973 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
1976 vdur
= vnfr
._get
_vdur
_from
_vdu
_id
(self
._vdu
_id
)
1977 if not vdur
._state
== VDURecordState
.READY
:
1978 self
._log
.debug("VDUR state is not READY. current state is {}".format(vdur
._state
))
1979 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
1981 with self
._dts
.transaction() as new_xact
:
1982 resp
= yield from vdur
.read_resource(new_xact
)
1983 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
1984 vdur_console
.id = self
._vdur
_id
1985 if resp
.console_url
:
1986 vdur_console
.console_url
= resp
.console_url
1988 vdur_console
.console_url
= 'none'
1989 self
._log
.debug("Recevied console URL for vdu {} is {}".format(self
._vdu
_id
,vdur_console
))
1991 self
._log
.exception("Caught exception while reading VDU %s", self
._vdu
_id
)
1992 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
1993 vdur_console
.id = self
._vdur
_id
1994 vdur_console
.console_url
= 'none'
1996 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
,
1997 xpath
=self
.vnfr_vdu_console_xpath
,
2000 #raise VnfRecordError("Not supported operation %s" % action)
2001 self
._log
.error("Not supported operation %s" % action
)
2002 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2006 self
._log
.debug("Registering for VNFR VDU using xpath: %s",
2007 self
.vnfr_vdu_console_xpath
)
2008 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2009 with self
._dts
.group_create() as group
:
2010 self
._regh
= group
.register(xpath
=self
.vnfr_vdu_console_xpath
,
2012 flags
=rwdts
.Flag
.PUBLISHER
,
2016 class VnfrDtsHandler(object):
2017 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2018 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2020 def __init__(self
, dts
, log
, loop
, vnfm
):
2030 """ Return registration handle"""
2035 """ Return VNF manager instance """
2040 """ Register for vnfr create/update/delete/read requests from dts """
2041 def on_commit(xact_info
):
2042 """ The transaction has been committed """
2043 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
2044 return rwdts
.MemberRspCode
.ACTION_OK
2046 def on_abort(*args
):
2047 """ Abort callback """
2048 self
._log
.debug("VNF transaction got aborted")
2051 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
2054 def instantiate_realloc_vnfr(vnfr
):
2055 """Re-populate the vnfm after restart
2062 yield from vnfr
.instantiate(None, restart_mode
=True)
2064 if xact_event
== rwdts
.MemberEvent
.INSTALL
:
2065 curr_cfg
= self
.regh
.elements
2066 for cfg
in curr_cfg
:
2067 vnfr
= self
.vnfm
.create_vnfr(cfg
)
2068 self
._loop
.create_task(instantiate_realloc_vnfr(vnfr
))
2070 self
._log
.debug("Got on_event in vnfm")
2072 return rwdts
.MemberRspCode
.ACTION_OK
2075 def on_prepare(xact_info
, action
, ks_path
, msg
):
2076 """ prepare callback from dts """
2078 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2079 xact_info
, action
, msg
2082 if action
== rwdts
.QueryAction
.CREATE
:
2083 if not msg
.has_field("vnfd_ref"):
2084 err
= "Vnfd reference not provided"
2085 self
._log
.error(err
)
2086 raise VnfRecordError(err
)
2088 vnfr
= self
.vnfm
.create_vnfr(msg
)
2090 # RIFT-9105: Unable to add a READ query under an existing transaction
2091 # xact = xact_info.xact
2092 yield from vnfr
.instantiate(None)
2093 except Exception as e
:
2094 self
._log
.exception(e
)
2095 self
._log
.error("Error while instantiating vnfr:%s", vnfr
.vnfr_id
)
2096 vnfr
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
2097 yield from vnfr
.publish(None)
2098 elif action
== rwdts
.QueryAction
.DELETE
:
2099 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2100 path_entry
= schema
.keyspec_to_entry(ks_path
)
2101 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2104 self
._log
.debug("VNFR id %s not found for delete", path_entry
.key00
.id)
2105 raise VirtualNetworkFunctionRecordNotFound(
2106 "VNFR id %s", path_entry
.key00
.id)
2109 yield from vnfr
.terminate(xact_info
.xact
)
2112 yield from self
._vnfm
.delete_vnfr(xact_info
.xact
, vnfr
)
2113 except Exception as e
:
2114 self
._log
.exception(e
)
2115 self
._log
.error("Caught exception while deleting vnfr %s", path_entry
.key00
.id)
2117 elif action
== rwdts
.QueryAction
.UPDATE
:
2118 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2119 path_entry
= schema
.keyspec_to_entry(ks_path
)
2122 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2123 except Exception as e
:
2124 self
._log
.debug("No vnfr found with id %s", path_entry
.key00
.id)
2125 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2129 self
._log
.debug("VNFR id %s not found for update", path_entry
.key00
.id)
2130 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2133 self
._log
.debug("VNFR {} update config status {} (current {})".
2134 format(vnfr
.name
, msg
.config_status
, vnfr
.config_status
))
2135 # Update the config status and publish
2136 vnfr
._config
_status
= msg
.config_status
2137 yield from vnfr
.publish(None)
2140 raise NotImplementedError(
2141 "%s action on VirtualNetworkFunctionRecord not supported",
2144 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2146 self
._log
.debug("Registering for VNFR using xpath: %s",
2147 VnfrDtsHandler
.XPATH
,)
2149 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
2150 on_prepare
=on_prepare
,)
2151 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
2152 with self
._dts
.group_create(handler
=handlers
) as group
:
2153 self
._regh
= group
.register(xpath
=VnfrDtsHandler
.XPATH
,
2155 flags
=(rwdts
.Flag
.PUBLISHER |
2156 rwdts
.Flag
.NO_PREP_READ |
2158 rwdts
.Flag
.DATASTORE
),)
2161 def create(self
, xact
, path
, msg
):
2163 Create a VNFR record in DTS with path and message
2165 self
._log
.debug("Creating VNFR xact = %s, %s:%s",
2168 self
.regh
.create_element(path
, msg
)
2169 self
._log
.debug("Created VNFR xact = %s, %s:%s",
2173 def update(self
, xact
, path
, msg
):
2175 Update a VNFR record in DTS with path and message
2177 self
._log
.debug("Updating VNFR xact = %s, %s:%s",
2179 self
.regh
.update_element(path
, msg
)
2180 self
._log
.debug("Updated VNFR xact = %s, %s:%s",
2184 def delete(self
, xact
, path
):
2186 Delete a VNFR record in DTS with path and message
2188 self
._log
.debug("Deleting VNFR xact = %s, %s", xact
, path
)
2189 self
.regh
.delete_element(path
)
2190 self
._log
.debug("Deleted VNFR xact = %s, %s", xact
, path
)
2193 class VirtualNetworkFunctionDescriptor(object):
2195 Virtual Network Function descriptor class
2198 def __init__(self
, dts
, log
, loop
, vnfm
, vnfd
):
2208 def ref_count(self
):
2209 """ Returns the reference count associated with
2210 this Virtual Network Function Descriptor"""
2211 return self
._ref
_count
2215 """ Returns vnfd id """
2216 return self
._vnfd
.id
2220 """ Returns vnfd name """
2221 return self
._vnfd
.name
2224 """ Returns whether vnfd is in use or not """
2225 return True if self
._ref
_count
> 0 else False
2228 """ Take a reference on this object """
2229 self
._ref
_count
+= 1
2230 return self
._ref
_count
2233 """ Release reference on this object """
2234 if self
.ref_count
< 1:
2235 msg
= ("Unref on a VNFD object - vnfd id %s, ref_count = %s" %
2236 (self
.id, self
._ref
_count
))
2237 self
._log
.critical(msg
)
2238 raise VnfRecordError(msg
)
2239 self
._log
.debug("Releasing ref on VNFD %s - curr ref_count:%s",
2240 self
.id, self
.ref_count
)
2241 self
._ref
_count
-= 1
2242 return self
._ref
_count
2246 """ Return the message associated with this NetworkServiceDescriptor"""
2250 def path_for_id(vnfd_id
):
2251 """ Return path for the passed vnfd_id"""
2252 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id
)
2255 """ Return the path associated with this NetworkServiceDescriptor"""
2256 return VirtualNetworkFunctionDescriptor
.path_for_id(self
.id)
2258 def update(self
, vnfd
):
2259 """ Update the Virtual Network Function Descriptor """
2261 self
._log
.error("Cannot update descriptor %s in use refcnt=%d",
2262 self
.id, self
.ref_count
)
2264 # The following loop is added to debug RIFT-13284
2265 for vnf_rec
in self
._vnfm
._vnfrs
.values():
2266 if vnf_rec
.vnfd_id
== self
.id:
2267 self
._log
.error("descriptor %s in used by %s:%s",
2268 self
.id, vnf_rec
.vnfr_id
, vnf_rec
.msg
)
2269 raise VirtualNetworkFunctionDescriptorRefCountExists("Cannot update descriptor in use %s" % self
.id)
2273 """ Delete the Virtual Network Function Descriptor """
2275 self
._log
.error("Cannot delete descriptor %s in use refcnt=%d",
2278 # The following loop is added to debug RIFT-13284
2279 for vnf_rec
in self
._vnfm
._vnfrs
.values():
2280 if vnf_rec
.vnfd_id
== self
.id:
2281 self
._log
.error("descriptor %s in used by %s:%s",
2282 self
.id, vnf_rec
.vnfr_id
, vnf_rec
.msg
)
2283 raise VirtualNetworkFunctionDescriptorRefCountExists("Cannot delete descriptor in use %s" % self
.id)
2284 self
._vnfm
.delete_vnfd(self
.id)
2287 class VnfdRefCountDtsHandler(object):
2288 """ The VNFD Ref Count DTS handler """
2289 XPATH
= "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2291 def __init__(self
, dts
, log
, loop
, vnfm
):
2301 """ Return registration handle """
2306 """ Return the NS manager instance """
2311 """ Register for VNFD ref count read from dts """
2314 def on_prepare(xact_info
, action
, ks_path
, msg
):
2315 """ prepare callback from dts """
2316 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2318 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2319 xact_info
, action
, xpath
, msg
2322 if action
== rwdts
.QueryAction
.READ
:
2323 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_VnfdRefCount
.schema()
2324 path_entry
= schema
.keyspec_to_entry(ks_path
)
2325 vnfd_list
= yield from self
._vnfm
.get_vnfd_refcount(path_entry
.key00
.vnfd_id_ref
)
2326 for xpath
, msg
in vnfd_list
:
2327 self
._log
.debug("Responding to ref count query path:%s, msg:%s",
2329 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.MORE
,
2332 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2334 raise VnfRecordError("Not supported operation %s" % action
)
2336 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2337 with self
._dts
.group_create() as group
:
2338 self
._regh
= group
.register(xpath
=VnfdRefCountDtsHandler
.XPATH
,
2340 flags
=rwdts
.Flag
.PUBLISHER
,
2344 class VdurDatastore(object):
2346 This VdurDatastore is intended to expose select information about a VDUR
2347 such that it can be referenced in a cloud config file. The data that is
2348 exposed does not necessarily follow the structure of the data in the yang
2349 model. This is intentional. The data that are exposed are intended to be
2350 agnostic of the yang model so that changes in the model do not necessarily
2351 require changes to the interface provided to the user. It also means that
2352 the user does not need to be familiar with the RIFT.ware yang models.
2356 """Create an instance of VdurDatastore"""
2357 self
._vdur
_data
= dict()
2358 self
._pattern
= re
.compile("vdu\[([^]]+)\]\.(.+)")
2360 def add(self
, vdur
):
2361 """Add a new VDUR to the datastore
2364 vdur - a VirtualDeploymentUnitRecord instance
2367 A ValueError is raised if the VDUR is (1) None or (2) already in
2371 if vdur
.vdu_id
is None:
2372 raise ValueError('VDURs are required to have an ID')
2374 if vdur
.vdu_id
in self
._vdur
_data
:
2375 raise ValueError('cannot add a VDUR more than once')
2377 self
._vdur
_data
[vdur
.vdu_id
] = dict()
2379 def set_if_not_none(key
, attr
):
2380 if attr
is not None:
2381 self
._vdur
_data
[vdur
.vdu_id
][key
] = attr
2383 set_if_not_none('name', vdur
._vdud
.name
)
2384 set_if_not_none('mgmt.ip', vdur
.vm_management_ip
)
2386 def update(self
, vdur
):
2387 """Update the VDUR information in the datastore
2390 vdur - a GI representation of a VDUR
2393 A ValueError is raised if the VDUR is (1) None or (2) already in
2397 if vdur
.vdu_id
is None:
2398 raise ValueError('VNFDs are required to have an ID')
2400 if vdur
.vdu_id
not in self
._vdur
_data
:
2401 raise ValueError('VNF is not recognized')
2403 def set_or_delete(key
, attr
):
2405 if key
in self
._vdur
_data
[vdur
.vdu_id
]:
2406 del self
._vdur
_data
[vdur
.vdu_id
][key
]
2409 self
._vdur
_data
[vdur
.vdu_id
][key
] = attr
2411 set_or_delete('name', vdur
._vdud
.name
)
2412 set_or_delete('mgmt.ip', vdur
.vm_management_ip
)
2414 def remove(self
, vdur_id
):
2415 """Remove all of the data associated with specified VDUR
2418 vdur_id - the identifier of a VNFD in the datastore
2421 A ValueError is raised if the VDUR is not contained in the
2425 if vdur_id
not in self
._vdur
_data
:
2426 raise ValueError('VNF is not recognized')
2428 del self
._vdur
_data
[vdur_id
]
2430 def get(self
, expr
):
2431 """Retrieve VDUR information from the datastore
2433 An expression should be of the form,
2437 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2438 the exposed attribute that the user wishes to retrieve.
2440 If the requested data is not available, None is returned.
2443 expr - a string that specifies the data to return
2446 A ValueError is raised if the provided expression cannot be parsed.
2449 The requested data or None
2452 result
= self
._pattern
.match(expr
)
2454 raise ValueError('data expression not recognized ({})'.format(expr
))
2456 vdur_id
, key
= result
.groups()
2458 if vdur_id
not in self
._vdur
_data
:
2461 return self
._vdur
_data
[vdur_id
].get(key
, None)
2464 class VnfManager(object):
2465 """ The virtual network function manager class """
2466 def __init__(self
, dts
, log
, loop
, cluster_name
):
2470 self
._cluster
_name
= cluster_name
2472 self
._vcs
_handler
= VcsComponentDtsHandler(dts
, log
, loop
, self
)
2473 self
._vnfr
_handler
= VnfrDtsHandler(dts
, log
, loop
, self
)
2475 self
._dts
_handlers
= [VnfdDtsHandler(dts
, log
, loop
, self
),
2478 VnfdRefCountDtsHandler(dts
, log
, loop
, self
)]
2483 def vnfr_handler(self
):
2484 """ VNFR dts handler """
2485 return self
._vnfr
_handler
2488 def vcs_handler(self
):
2489 """ VCS dts handler """
2490 return self
._vcs
_handler
2494 """ Register all static DTS handlers """
2495 for hdl
in self
._dts
_handlers
:
2496 yield from hdl
.register()
2500 """ Run this VNFM instance """
2501 self
._log
.debug("Run VNFManager - registering static DTS handlers""")
2502 yield from self.register()
2504 def get_vnfr(self, vnfr_id):
2505 """ get VNFR by vnfr id """
2507 if vnfr_id not in self._vnfrs:
2508 raise VnfRecordError("VNFR
id %s not found
", vnfr_id)
2510 return self._vnfrs[vnfr_id]
2512 def create_vnfr(self, vnfr):
2513 """ Create a VNFR instance """
2514 if vnfr.id in self._vnfrs:
2515 msg = "Vnfr
id %s already exists
" % vnfr.id
2516 self._log.error(msg)
2517 raise VnfRecordError(msg)
2519 self._log.info("Create VirtualNetworkFunctionRecord
%s from vnfd_id
: %s",
2523 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2524 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr
2526 return self._vnfrs[vnfr.id]
2529 def delete_vnfr(self, xact, vnfr):
2530 """ Create a VNFR instance """
2531 if vnfr.vnfr_id in self._vnfrs:
2532 self._log.debug("Deleting VNFR
id %s", vnfr.vnfr_id)
2533 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2534 del self._vnfrs[vnfr.vnfr_id]
2537 def fetch_vnfd(self, vnfd_id):
2538 """ Fetch VNFDs based with the vnfd id"""
2539 vnfd_path = VirtualNetworkFunctionDescriptor.path_for_id(vnfd_id)
2540 self._log.debug("Fetch vnfd with path
%s", vnfd_path)
2543 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2545 for ent in res_iter:
2546 res = yield from ent
2550 err = "Failed to get Vnfd
%s" % vnfd_id
2551 self._log.error(err)
2552 raise VnfRecordError(err)
2554 self._log.debug("Fetched vnfd
for path
%s, vnfd
- %s", vnfd_path, vnfd)
2559 def get_vnfd_ref(self, vnfd_id):
2560 """ Get Virtual Network Function descriptor for the passed vnfd_id"""
2561 vnfd = yield from self.get_vnfd(vnfd_id)
2566 def get_vnfd(self, vnfd_id):
2567 """ Get Virtual Network Function descriptor for the passed vnfd_id"""
2569 if vnfd_id not in self._vnfds:
2570 self._log.error("Cannot find VNFD
id:%s", vnfd_id)
2571 vnfd = yield from self.fetch_vnfd(vnfd_id)
2574 self._log.error("Cannot find VNFD
id:%s", vnfd_id)
2575 raise VirtualNetworkFunctionDescriptorError("Cannot find VNFD
id:%s", vnfd_id)
2577 if vnfd.id != vnfd_id:
2578 self._log.error("Bad Recovery state
{} found
for {}".format(vnfd.id, vnfd_id))
2579 raise VirtualNetworkFunctionDescriptorError("Bad Recovery state
{} found
for {}".format(vnfd.id, vnfd_id))
2581 if vnfd.id not in self._vnfds:
2582 self.create_vnfd(vnfd)
2584 return self._vnfds[vnfd_id]
2586 def vnfd_in_use(self, vnfd_id):
2587 """ Is this VNFD in use """
2588 self._log.debug("Is this VNFD
in use
- msg
:%s", vnfd_id)
2589 if vnfd_id in self._vnfds:
2590 return self._vnfds[vnfd_id].in_use()
2594 def publish_vnfr(self, xact, path, msg):
2595 """ Publish a VNFR """
2596 self._log.debug("publish_vnfr called with path
%s, msg
%s",
2598 yield from self.vnfr_handler.update(xact, path, msg)
2600 def create_vnfd(self, vnfd):
2601 """ Create a virtual network function descriptor """
2602 self._log.debug("Create virtual networkfunction descriptor
- %s", vnfd)
2603 if vnfd.id in self._vnfds:
2604 self._log.error("Cannot create VNFD
%s -VNFD
id already exists
", vnfd)
2605 raise VirtualNetworkFunctionDescriptorError("VNFD already exists
-%s", vnfd.id)
2607 self._vnfds[vnfd.id] = VirtualNetworkFunctionDescriptor(self._dts,
2612 return self._vnfds[vnfd.id]
2614 def update_vnfd(self, vnfd):
2615 """ update the Virtual Network Function descriptor """
2616 self._log.debug("Update virtual network function descriptor
- %s", vnfd)
2618 if vnfd.id not in self._vnfds:
2619 self._log.debug("No VNFD found
- creating VNFD
id = %s", vnfd.id)
2620 self.create_vnfd(vnfd)
2622 self._log.debug("Updating VNFD
id = %s, vnfd
= %s", vnfd.id, vnfd)
2623 self._vnfds[vnfd.id].update(vnfd)
2626 def delete_vnfd(self, vnfd_id):
2627 """ Delete the Virtual Network Function descriptor with the passed id """
2628 self._log.debug("Deleting the virtual network function descriptor
- %s", vnfd_id)
2629 if vnfd_id not in self._vnfds:
2630 self._log.debug("Delete VNFD failed
- cannot find vnfd
-id %s", vnfd_id)
2631 raise VirtualNetworkFunctionDescriptorNotFound("Cannot find
%s", vnfd_id)
2633 if self._vnfds[vnfd_id].in_use():
2634 self._log.debug("Cannot delete VNFD
id %s reference exists
%s",
2636 self._vnfds[vnfd_id].ref_count)
2637 raise VirtualNetworkFunctionDescriptorRefCountExists(
2638 "Cannot delete
:%s, ref_count
:%s",
2640 self._vnfds[vnfd_id].ref_count)
2642 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2644 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2645 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2646 if os.path.exists(vnfd_dir):
2647 shutil.rmtree(vnfd_dir, ignore_errors=True)
2648 except Exception as e:
2649 self._log.error("Exception in cleaning up VNFD
{}: {}".
2650 format(self._vnfds[vnfd_id].name, e))
2651 self._log.exception(e)
2653 del self._vnfds[vnfd_id]
2655 def vnfd_refcount_xpath(self, vnfd_id):
2656 """ xpath for ref count entry """
2657 return (VnfdRefCountDtsHandler.XPATH +
2658 "[rw
-vnfr
:vnfd
-id-ref
= '{}']").format(vnfd_id)
2661 def get_vnfd_refcount(self, vnfd_id):
2662 """ Get the vnfd_list from this VNFM"""
2664 if vnfd_id is None or vnfd_id == "":
2665 for vnfd in self._vnfds.values():
2666 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2667 vnfd_msg.vnfd_id_ref = vnfd.id
2668 vnfd_msg.instance_ref_count = vnfd.ref_count
2669 vnfd_list.append((self.vnfd_refcount_xpath(vnfd.id), vnfd_msg))
2670 elif vnfd_id in self._vnfds:
2671 vnfd_msg.vnfd_id_ref = self._vnfds[vnfd_id].id
2672 vnfd_msg.instance_ref_count = self._vnfds[vnfd_id].ref_count
2673 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2678 class VnfmTasklet(rift.tasklets.Tasklet):
2679 """ VNF Manager tasklet class """
2680 def __init__(self, *args, **kwargs):
2681 super(VnfmTasklet, self).__init__(*args, **kwargs)
2682 self.rwlog.set_category("rw
-mano
-log
")
2683 self.rwlog.set_subcategory("vnfm
")
2690 super(VnfmTasklet, self).start()
2691 self.log.info("Starting VnfmTasklet
")
2693 self.log.setLevel(logging.DEBUG)
2695 self.log.debug("Registering with dts
")
2696 self._dts = rift.tasklets.DTS(self.tasklet_info,
2697 RwVnfmYang.get_schema(),
2699 self.on_dts_state_change)
2701 self.log.debug("Created DTS Api GI Object
: %s", self._dts)
2703 print("Caught
Exception in VNFM start
:", sys.exc_info()[0])
2706 def on_instance_started(self):
2707 """ Task insance started callback """
2708 self.log.debug("Got instance started callback
")
2714 print("Caught
Exception in VNFM stop
:", sys.exc_info()[0])
2719 """ Task init callback """
2721 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2722 assert vm_parent_name is not None
2723 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2724 yield from self._vnfm.run()
2726 print("Caught
Exception in VNFM init
:", sys.exc_info()[0])
2731 """ Task run callback """
2735 def on_dts_state_change(self, state):
2736 """Take action according to current dts state to transition
2737 application into the corresponding application state
2740 state - current dts state
2743 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2744 rwdts.State.CONFIG: rwdts.State.RUN,
2748 rwdts.State.INIT: self.init,
2749 rwdts.State.RUN: self.run,
2752 # Transition application to next state
2753 handler = handlers.get(state, None)
2754 if handler is not None:
2755 yield from handler()
2757 # Transition dts to next state
2758 next_state = switch.get(state, None)
2759 if next_state is not None:
2760 self._dts.handle.set_state(next_state)