2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
29 gi
.require_version('RwDts', '1.0')
30 gi
.require_version('RwVnfrYang', '1.0')
31 gi
.require_version('RwVnfmYang', '1.0')
32 gi
.require_version('RwVlrYang', '1.0')
33 gi
.require_version('RwManifestYang', '1.0')
34 gi
.require_version('RwBaseYang', '1.0')
35 gi
.require_version('RwResourceMgrYang', '1.0')
37 from gi
.repository
import (
50 import rift
.package
.store
51 import rift
.package
.cloud_init
54 class VMResourceError(Exception):
55 """ VM resource Error"""
59 class VnfRecordError(Exception):
60 """ VNF record instatiation failed"""
64 class VduRecordError(Exception):
65 """ VDU record instatiation failed"""
69 class NotImplemented(Exception):
70 """Not implemented """
74 class VnfrRecordExistsError(Exception):
75 """VNFR record already exist with the same VNFR id"""
79 class InternalVirtualLinkRecordError(Exception):
80 """Internal virtual link record error"""
84 class VDUImageNotFound(Exception):
85 """VDU Image not found error"""
89 class VirtualDeploymentUnitRecordError(Exception):
90 """VDU Instantiation failed"""
94 class VMNotReadyError(Exception):
95 """ VM Not yet received from resource manager """
99 class VDURecordNotFound(Exception):
100 """ Could not find a VDU record """
104 class VirtualNetworkFunctionRecordDescNotFound(Exception):
105 """ Cannot find Virtual Network Function Record Descriptor """
109 class VirtualNetworkFunctionDescriptorError(Exception):
110 """ Virtual Network Function Record Descriptor Error """
114 class VirtualNetworkFunctionDescriptorNotFound(Exception):
115 """ Virtual Network Function Record Descriptor Not Found """
119 class VirtualNetworkFunctionRecordNotFound(Exception):
120 """ Virtual Network Function Record Not Found """
124 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
125 """ Virtual Network Funtion Descriptor reference count exists """
129 class VnfrInstantiationFailed(Exception):
130 """ Virtual Network Funtion Instantiation failed"""
134 class VNFMPlacementGroupError(Exception):
137 class VirtualNetworkFunctionRecordState(enum
.Enum
):
144 VL_TERMINATE_PHASE
= 6
145 VDU_TERMINATE_PHASE
= 7
150 class VDURecordState(enum
.Enum
):
151 """VDU record state """
154 RESOURCE_ALLOC_PENDING
= 3
161 class VcsComponent(object):
162 """ VCS Component within the VNF descriptor """
163 def __init__(self
, dts
, log
, loop
, cluster_name
, vcs_handler
, component
, mangled_name
):
167 self
._component
= component
168 self
._cluster
_name
= cluster_name
169 self
._vcs
_handler
= vcs_handler
170 self
._mangled
_name
= mangled_name
173 def mangle_name(component_name
, vnf_name
, vnfd_id
):
174 """ mangled component name """
175 return vnf_name
+ ":" + component_name
+ ":" + vnfd_id
179 """ name of this component"""
180 return self
._mangled
_name
184 """ The path for this object """
185 return("D,/rw-manifest:manifest" +
186 "/rw-manifest:operational-inventory" +
187 "/rw-manifest:component" +
188 "[rw-manifest:component-name = '{}']").format(self
.name
)
191 def instance_xpath(self
):
192 """ The path for this object """
193 return("D,/rw-base:vcs" +
196 "[instance-name = '{}']".format(self
._cluster
_name
))
199 def start_comp_xpath(self
):
200 """ start component xpath """
201 return (self
.instance_xpath
+
202 "/child-n[instance-name = 'START-REQ']")
204 def get_start_comp_msg(self
, ip_address
):
205 """ start this component """
206 start_msg
= RwBaseYang
.VcsInstance_Instance_ChildN()
207 start_msg
.instance_name
= 'START-REQ'
208 start_msg
.component_name
= self
.name
209 start_msg
.admin_command
= "START"
210 start_msg
.ip_address
= ip_address
216 """ Returns the message for this vcs component"""
218 vcs_comp_dict
= self
._component
.as_dict()
220 def mangle_comp_names(comp_dict
):
221 """ mangle component name with VNF name, id"""
222 for key
, val
in comp_dict
.items():
223 if isinstance(val
, dict):
224 comp_dict
[key
] = mangle_comp_names(val
)
225 elif isinstance(val
, list):
228 if isinstance(ent
, dict):
229 val
[i
] = mangle_comp_names(ent
)
233 elif key
== "component_name":
234 comp_dict
[key
] = VcsComponent
.mangle_name(val
,
239 mangled_dict
= mangle_comp_names(vcs_comp_dict
)
240 msg
= RwManifestYang
.OpInventory_Component
.from_dict(mangled_dict
)
244 def publish(self
, xact
):
245 """ Publishes the VCS component """
246 self
._log
.debug("Publishing the VcsComponent %s, path = %s comp = %s",
247 self
.name
, self
.path
, self
.msg
)
248 yield from self
._vcs
_handler
.publish(xact
, self
.path
, self
.msg
)
251 def start(self
, xact
, parent
, ip_addr
=None):
252 """ Starts this VCS component """
253 # ATTN RV - replace with block add
254 start_msg
= self
.get_start_comp_msg(ip_addr
)
255 self
._log
.debug("starting component %s %s",
256 self
.start_comp_xpath
, start_msg
)
257 yield from self
._dts
.query_create(self
.start_comp_xpath
,
260 self
._log
.debug("started component %s, %s",
261 self
.start_comp_xpath
, start_msg
)
264 class VirtualDeploymentUnitRecord(object):
265 """ Virtual Deployment Unit Record """
276 placement_groups
=[]):
282 self
._mgmt
_intf
= mgmt_intf
283 self
._cloud
_account
_name
= cloud_account_name
284 self
._vnfd
_package
_store
= vnfd_package_store
286 self
._vdur
_id
= vdur_id
or str(uuid
.uuid4())
289 self
._state
= VDURecordState
.INIT
290 self
._state
_failed
_reason
= None
291 self
._request
_id
= str(uuid
.uuid4())
292 self
._name
= vnfr
.name
+ "__" + vdud
.id
293 self
._placement
_groups
= placement_groups
296 self
._vdud
_cloud
_init
= None
297 self
._vdur
_console
_handler
= VnfrConsoleOperdataDtsHandler(dts
, log
, loop
, self
._vnfr
._vnfm
, self
._vnfr
.vnfr_id
, self
._vdur
_id
,self
.vdu_id
)
300 def vdu_opdata_register(self
):
301 yield from self
._vdur
_console
_handler
.register()
303 def cp_ip_addr(self
, cp_name
):
304 """ Find ip address by connection point name """
305 if self
._vm
_resp
is not None:
306 for conn_point
in self
._vm
_resp
.connection_points
:
307 if conn_point
.name
== cp_name
:
308 return conn_point
.ip_address
311 def cp_mac_addr(self
, cp_name
):
312 """ Find mac address by connection point name """
313 if self
._vm
_resp
is not None:
314 for conn_point
in self
._vm
_resp
.connection_points
:
315 if conn_point
.name
== cp_name
:
316 return conn_point
.mac_addr
317 return "00:00:00:00:00:00"
319 def cp_id(self
, cp_name
):
320 """ Find connection point id by connection point name """
321 if self
._vm
_resp
is not None:
322 for conn_point
in self
._vm
_resp
.connection_points
:
323 if conn_point
.name
== cp_name
:
324 return conn_point
.connection_point_id
337 """ Return this VDUR's name """
341 def cloud_account_name(self
):
342 """ Cloud account this VDU should be created in """
343 return self
._cloud
_account
_name
346 def image_name(self
):
347 """ name that should be used to lookup the image on the CMP """
348 return os
.path
.basename(self
._vdud
.image
)
351 def image_checksum(self
):
352 """ name that should be used to lookup the image on the CMP """
353 return self
._vdud
.image_checksum
if self
._vdud
.has_field("image_checksum") else None
356 def management_ip(self
):
359 return self
._vm
_resp
.public_ip
if self
._vm
_resp
.has_field('public_ip') else self
._vm
_resp
.management_ip
362 def vm_management_ip(self
):
365 return self
._vm
_resp
.management_ip
368 def operational_status(self
):
369 """ Operational status of this VDU"""
370 op_stats_dict
= {"INIT": "init",
371 "INSTANTIATING": "vm_init_phase",
372 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
375 "TERMINATING": "terminated",
376 "TERMINATED": "terminated",
378 return op_stats_dict
[self
._state
.name
]
383 vdu_fields
= ["vm_flavor",
389 vdu_copy_dict
= {k
: v
for k
, v
in
390 self
._vdud
.as_dict().items() if k
in vdu_fields
}
391 vdur_dict
= {"id": self
._vdur
_id
,
392 "vdu_id_ref": self
._vdud
.id,
393 "operational_status": self
.operational_status
,
394 "operational_status_details": self
._state
_failed
_reason
,
396 if self
.vm_resp
is not None:
397 vdur_dict
.update({"vim_id": self
.vm_resp
.vdu_id
,
398 "flavor_id": self
.vm_resp
.flavor_id
,
399 "image_id": self
.vm_resp
.image_id
,
402 if self
.management_ip
is not None:
403 vdur_dict
["management_ip"] = self
.management_ip
405 if self
.vm_management_ip
is not None:
406 vdur_dict
["vm_management_ip"] = self
.vm_management_ip
408 vdur_dict
.update(vdu_copy_dict
)
413 for intf
, cp_id
, vlr
in self
._int
_intf
:
414 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
416 icp_list
.append({"name": cp
.name
,
418 "type_yang": "VPORT",
419 "ip_address": self
.cp_ip_addr(cp
.id),
420 "mac_address": self
.cp_mac_addr(cp
.id)})
422 ii_list
.append({"name": intf
.name
,
423 "vdur_internal_connection_point_ref": cp
.id,
424 "virtual_interface": {}})
426 vdur_dict
["internal_connection_point"] = icp_list
427 self
._log
.debug("internal_connection_point:%s", vdur_dict
["internal_connection_point"])
428 vdur_dict
["internal_interface"] = ii_list
431 for intf
, cp
, vlr
in self
._ext
_intf
:
432 ei_list
.append({"name": cp
,
433 "vnfd_connection_point_ref": cp
,
434 "virtual_interface": {}})
435 self
._vnfr
.update_cp(cp
,
437 self
.cp_mac_addr(cp
),
440 vdur_dict
["external_interface"] = ei_list
442 placement_groups
= []
443 for group
in self
._placement
_groups
:
444 placement_groups
.append(group
.as_dict())
446 vdur_dict
['placement_groups_info'] = placement_groups
447 return RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur
.from_dict(vdur_dict
)
450 def resmgr_path(self
):
451 """ path for resource-mgr"""
452 return ("D,/rw-resource-mgr:resource-mgmt" +
454 "/vdu-event-data[event-id='{}']".format(self
._request
_id
))
457 def vm_flavor_msg(self
):
458 """ VM flavor message """
459 flavor
= self
._vdud
.vm_flavor
.__class
__()
460 flavor
.copy_from(self
._vdud
.vm_flavor
)
465 def vdud_cloud_init(self
):
466 """ Return the cloud-init contents for the VDU """
467 if self
._vdud
_cloud
_init
is None:
468 self
._vdud
_cloud
_init
= self
.cloud_init()
470 return self
._vdud
_cloud
_init
472 def cloud_init(self
):
473 """ Populate cloud_init with cloud-config script from
474 either the inline contents or from the file provided
476 if self
._vdud
.cloud_init
is not None:
477 self
._log
.debug("cloud_init script provided inline %s", self
._vdud
.cloud_init
)
478 return self
._vdud
.cloud_init
479 elif self
._vdud
.cloud_init_file
is not None:
480 # Get cloud-init script contents from the file provided in the cloud_init_file param
481 self
._log
.debug("cloud_init script provided in file %s", self
._vdud
.cloud_init_file
)
482 filename
= self
._vdud
.cloud_init_file
483 self
._vnfd
_package
_store
.refresh()
484 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
485 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
487 return cloud_init_extractor
.read_script(stored_package
, filename
)
488 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
489 raise VirtualDeploymentUnitRecordError(e
)
491 self
._log
.debug("VDU Instantiation: cloud-init script not provided")
493 def process_openstack_placement_group_construct(self
, vm_create_msg_dict
):
495 availability_zones
= []
497 for group
in self
._placement
_groups
:
498 if group
.has_field('host_aggregate'):
499 for aggregate
in group
.host_aggregate
:
500 host_aggregates
.append(aggregate
.as_dict())
501 if group
.has_field('availability_zone'):
502 availability_zones
.append(group
.availability_zone
.as_dict())
503 if group
.has_field('server_group'):
504 server_groups
.append(group
.server_group
.as_dict())
506 if availability_zones
:
507 if len(availability_zones
) > 1:
508 self
._log
.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self
.name
, availability_zones
)
509 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self
.name
, availability_zones
))
511 vm_create_msg_dict
['availability_zone'] = availability_zones
[0]
514 if len(server_groups
) > 1:
515 self
._log
.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self
.name
, server_groups
)
516 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self
.name
, server_groups
))
518 vm_create_msg_dict
['server_group'] = server_groups
[0]
521 vm_create_msg_dict
['host_aggregate'] = host_aggregates
525 def process_placement_groups(self
, vm_create_msg_dict
):
526 """Process the placement_groups and fill resource-mgr request"""
527 if not self
._placement
_groups
:
530 cloud_set
= set([group
.cloud_type
for group
in self
._placement
_groups
])
531 assert len(cloud_set
) == 1
532 cloud_type
= cloud_set
.pop()
534 if cloud_type
== 'openstack':
535 self
.process_openstack_placement_group_construct(vm_create_msg_dict
)
538 self
._log
.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type
)
541 def resmgr_msg(self
, config
=None):
542 vdu_fields
= ["vm_flavor",
548 self
._log
.debug("Creating params based on VDUD: %s", self
._vdud
)
549 vdu_copy_dict
= {k
: v
for k
, v
in self
._vdud
.as_dict().items() if k
in vdu_fields
}
551 vm_create_msg_dict
= {
553 "image_name": self
.image_name
,
556 if self
.image_checksum
is not None:
557 vm_create_msg_dict
["image_checksum"] = self
.image_checksum
559 vm_create_msg_dict
["allocate_public_address"] = self
._mgmt
_intf
560 if self
._vdud
.has_field('mgmt_vpci'):
561 vm_create_msg_dict
["mgmt_vpci"] = self
._vdud
.mgmt_vpci
563 self
._log
.debug("VDUD: %s", self
._vdud
)
564 if config
is not None:
565 vm_create_msg_dict
['vdu_init'] = {'userdata': config
}
568 for intf
, cp
, vlr
in self
._ext
_intf
:
569 cp_info
= {"name": cp
,
570 "virtual_link_id": vlr
.network_id
,
571 "type_yang": intf
.virtual_interface
.type_yang
}
573 if (intf
.virtual_interface
.has_field('vpci') and
574 intf
.virtual_interface
.vpci
is not None):
575 cp_info
["vpci"] = intf
.virtual_interface
.vpci
577 if (vlr
.has_field('ip_profile_params')) and (vlr
.ip_profile_params
.has_field('security_group')):
578 cp_info
['security_group'] = vlr
.ip_profile_params
.security_group
580 cp_list
.append(cp_info
)
582 for intf
, cp
, vlr
in self
._int
_intf
:
583 if (intf
.virtual_interface
.has_field('vpci') and
584 intf
.virtual_interface
.vpci
is not None):
585 cp_list
.append({"name": cp
,
586 "virtual_link_id": vlr
.network_id
,
587 "type_yang": intf
.virtual_interface
.type_yang
,
588 "vpci": intf
.virtual_interface
.vpci
})
590 cp_list
.append({"name": cp
,
591 "virtual_link_id": vlr
.network_id
,
592 "type_yang": intf
.virtual_interface
.type_yang
})
594 vm_create_msg_dict
["connection_points"] = cp_list
595 vm_create_msg_dict
.update(vdu_copy_dict
)
597 self
.process_placement_groups(vm_create_msg_dict
)
599 msg
= RwResourceMgrYang
.VDUEventData()
600 msg
.event_id
= self
._request
_id
601 msg
.cloud_account
= self
.cloud_account_name
602 msg
.request_info
.from_dict(vm_create_msg_dict
)
606 def terminate(self
, xact
):
607 """ Delete resource in VIM """
608 if self
._state
!= VDURecordState
.READY
and self
._state
!= VDURecordState
.FAILED
:
609 self
._log
.warning("VDU terminate in not ready state - Ignoring request")
612 self
._state
= VDURecordState
.TERMINATING
613 if self
._vm
_resp
is not None:
615 with self
._dts
.transaction() as new_xact
:
616 yield from self
.delete_resource(new_xact
)
618 self
._log
.exception("Caught exception while deleting VDU %s", self
.vdu_id
)
620 if self
._rm
_regh
is not None:
621 self
._log
.debug("Deregistering resource manager registration handle")
622 self
._rm
_regh
.deregister()
625 if self
._vdur
_console
_handler
is not None:
626 self
._log
.error("Deregistering vnfr vdur registration handle")
627 self
._vdur
_console
_handler
._regh
.deregister()
628 self
._vdur
_console
_handler
._regh
= None
630 self
._state
= VDURecordState
.TERMINATED
632 def find_internal_cp_by_cp_id(self
, cp_id
):
633 """ Find the CP corresponding to the connection point id"""
636 self
._log
.debug("find_internal_cp_by_cp_id(%s) called",
639 for int_cp
in self
._vdud
.internal_connection_point
:
640 self
._log
.debug("Checking for int cp %s in internal connection points",
642 if int_cp
.id == cp_id
:
647 self
._log
.debug("Failed to find cp %s in internal connection points",
649 msg
= "Failed to find cp %s in internal connection points" % cp_id
650 raise VduRecordError(msg
)
652 # return the VLR associated with the connection point
656 def create_resource(self
, xact
, vnfr
, config
=None):
657 """ Request resource from ResourceMgr """
658 def find_cp_by_name(cp_name
):
659 """ Find a connection point by name """
661 self
._log
.debug("find_cp_by_name(%s) called", cp_name
)
662 for ext_cp
in vnfr
._cprs
:
663 self
._log
.debug("Checking ext cp (%s) called", ext_cp
.name
)
664 if ext_cp
.name
== cp_name
:
668 self
._log
.debug("Failed to find cp %s in external connection points",
672 def find_internal_vlr_by_cp_name(cp_name
):
673 """ Find the VLR corresponding to the connection point name"""
676 self
._log
.debug("find_internal_vlr_by_cp_name(%s) called",
679 for int_cp
in self
._vdud
.internal_connection_point
:
680 self
._log
.debug("Checking for int cp %s in internal connection points",
682 if int_cp
.id == cp_name
:
687 self
._log
.debug("Failed to find cp %s in internal connection points",
689 msg
= "Failed to find cp %s in internal connection points" % cp_name
690 raise VduRecordError(msg
)
692 # return the VLR associated with the connection point
693 return vnfr
.find_vlr_by_cp(cp_name
)
695 block
= xact
.block_create()
697 self
._log
.debug("Executing vm request id: %s, action: create",
700 # Resolve the networks associated external interfaces
701 for ext_intf
in self
._vdud
.external_interface
:
702 self
._log
.debug("Resolving external interface name [%s], cp[%s]",
703 ext_intf
.name
, ext_intf
.vnfd_connection_point_ref
)
704 cp
= find_cp_by_name(ext_intf
.vnfd_connection_point_ref
)
706 self
._log
.debug("Failed to find connection point - %s",
707 ext_intf
.vnfd_connection_point_ref
)
709 self
._log
.debug("Connection point name [%s], type[%s]",
710 cp
.name
, cp
.type_yang
)
712 vlr
= vnfr
.ext_vlr_by_id(cp
.vlr_ref
)
714 etuple
= (ext_intf
, cp
.name
, vlr
)
715 self
._ext
_intf
.append(etuple
)
717 self
._log
.debug("Created external interface tuple : %s", etuple
)
719 # Resolve the networks associated internal interfaces
720 for intf
in self
._vdud
.internal_interface
:
721 cp_id
= intf
.vdu_internal_connection_point_ref
722 self
._log
.debug("Resolving internal interface name [%s], cp[%s]",
726 vlr
= find_internal_vlr_by_cp_name(cp_id
)
727 except Exception as e
:
728 self
._log
.debug("Failed to find cp %s in internal VLR list", cp_id
)
729 msg
= "Failed to find cp %s in internal VLR list, e = %s" % (cp_id
, e
)
730 raise VduRecordError(msg
)
732 ituple
= (intf
, cp_id
, vlr
)
733 self
._int
_intf
.append(ituple
)
735 self
._log
.debug("Created internal interface tuple : %s", ituple
)
737 resmgr_path
= self
.resmgr_path
738 resmgr_msg
= self
.resmgr_msg(config
)
740 self
._log
.debug("Creating new VM request at: %s, params: %s", resmgr_path
, resmgr_msg
)
741 block
.add_query_create(resmgr_path
, resmgr_msg
)
743 res_iter
= yield from block
.execute(now
=True)
751 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
752 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
753 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
754 return resp
.resource_info
757 def delete_resource(self
, xact
):
758 block
= xact
.block_create()
760 self
._log
.debug("Executing vm request id: %s, action: delete",
763 block
.add_query_delete(self
.resmgr_path
)
765 yield from block
.execute(flags
=0, now
=True)
768 def read_resource(self
, xact
):
769 block
= xact
.block_create()
771 self
._log
.debug("Executing vm request id: %s, action: delete",
774 block
.add_query_read(self
.resmgr_path
)
776 res_iter
= yield from block
.execute(flags
=0, now
=True)
781 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
782 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
783 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
784 #self._vm_resp = resp.resource_info
785 return resp
.resource_info
789 def start_component(self
):
790 """ This VDUR is active """
791 self
._log
.debug("Starting component %s for vdud %s vdur %s",
792 self
._vdud
.vcs_component_ref
,
795 yield from self
._vnfr
.start_component(self
._vdud
.vcs_component_ref
,
796 self
.vm_resp
.management_ip
)
800 """ Is this VDU active """
801 return True if self
._state
is VDURecordState
.READY
else False
804 def instantiation_failed(self
, failed_reason
=None):
805 """ VDU instantiation failed """
806 self
._log
.debug("VDU %s instantiation failed ", self
._vdur
_id
)
807 self
._state
= VDURecordState
.FAILED
808 self
._state
_failed
_reason
= failed_reason
809 yield from self
._vnfr
.instantiation_failed(failed_reason
)
812 def vdu_is_active(self
):
813 """ This VDU is active"""
815 self
._log
.warning("VDU %s was already marked as active", self
._vdur
_id
)
818 self
._log
.debug("VDUR id %s in VNFR %s is active", self
._vdur
_id
, self
._vnfr
.vnfr_id
)
820 if self
._vdud
.vcs_component_ref
is not None:
821 yield from self
.start_component()
823 self
._state
= VDURecordState
.READY
825 if self
._vnfr
.all_vdus_active():
826 self
._log
.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self
._vnfr
)
827 yield from self
._vnfr
.is_ready()
830 def instantiate(self
, xact
, vnfr
, config
=None):
831 """ Instantiate this VDU """
832 self
._state
= VDURecordState
.INSTANTIATING
835 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
836 """ This VDUR is active """
837 self
._log
.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
842 if (query_action
== rwdts
.QueryAction
.UPDATE
or
843 query_action
== rwdts
.QueryAction
.CREATE
):
846 if msg
.resource_state
== "active":
847 # Move this VDU to ready state
848 yield from self
.vdu_is_active()
849 elif msg
.resource_state
== "failed":
850 yield from self
.instantiation_failed(msg
.resource_errors
)
851 elif query_action
== rwdts
.QueryAction
.DELETE
:
852 self
._log
.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
854 raise NotImplementedError(
855 "%s action on VirtualDeployementUnitRecord not supported",
858 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
861 reg_event
= asyncio
.Event(loop
=self
._loop
)
864 def on_ready(regh
, status
):
867 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
, on_ready
=on_ready
)
868 self
._rm
_regh
= yield from self
._dts
.register(self
.resmgr_path
+ '/resource-info',
869 flags
=rwdts
.Flag
.SUBSCRIBER
,
871 yield from reg_event
.wait()
873 vm_resp
= yield from self
.create_resource(xact
, vnfr
, config
)
874 self
._vm
_resp
= vm_resp
876 self
._state
= VDURecordState
.RESOURCE_ALLOC_PENDING
877 self
._log
.debug("Requested VM from resource manager response %s",
879 if vm_resp
.resource_state
== "active":
880 self
._log
.debug("Resourcemgr responded wih an active vm resp %s",
882 yield from self
.vdu_is_active()
883 self
._state
= VDURecordState
.READY
884 elif (vm_resp
.resource_state
== "pending" or
885 vm_resp
.resource_state
== "inactive"):
886 self
._log
.debug("Resourcemgr responded wih a pending vm resp %s",
888 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
889 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
890 # flags=rwdts.Flag.SUBSCRIBER,
893 self
._log
.debug("Resourcemgr responded wih an error vm resp %s",
895 raise VirtualDeploymentUnitRecordError(
896 "Failed VDUR instantiation %s " % vm_resp
)
898 except Exception as e
:
900 traceback
.print_exc()
901 self
._log
.exception(e
)
902 self
._log
.error("Instantiation of VDU record failed: %s", str(e
))
903 self
._state
= VDURecordState
.FAILED
904 yield from self
.instantiation_failed(str(e
))
907 class VlRecordState(enum
.Enum
):
908 """ VL Record State """
910 INSTANTIATION_PENDING
= 102
912 TERMINATE_PENDING
= 104
917 class InternalVirtualLinkRecord(object):
918 """ Internal Virtual Link record """
919 def __init__(self
, dts
, log
, loop
, ivld_msg
, vnfr_name
, cloud_account_name
):
923 self
._ivld
_msg
= ivld_msg
924 self
._vnfr
_name
= vnfr_name
925 self
._cloud
_account
_name
= cloud_account_name
927 self
._vlr
_req
= self
.create_vlr()
929 self
._state
= VlRecordState
.INIT
933 """ Find VLR by id """
934 return self
._vlr
_req
.id
938 """ Name of this VL """
939 return self
._vnfr
_name
+ "." + self
._ivld
_msg
.name
942 def network_id(self
):
943 """ Find VLR by id """
944 return self
._vlr
.network_id
if self
._vlr
else None
947 """ VLR path for this VLR instance"""
948 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self
.vlr_id
)
950 def create_vlr(self
):
951 """ Create the VLR record which will be instantiated """
953 vld_fields
= ["short_name",
960 vld_copy_dict
= {k
: v
for k
, v
in self
._ivld
_msg
.as_dict().items() if k
in vld_fields
}
962 vlr_dict
= {"id": str(uuid
.uuid4()),
964 "cloud_account": self
._cloud
_account
_name
,
966 vlr_dict
.update(vld_copy_dict
)
968 vlr
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict(vlr_dict
)
972 def instantiate(self
, xact
, restart_mode
=False):
973 """ Instantiate VL """
976 def instantiate_vlr():
977 """ Instantiate VLR"""
978 self
._log
.debug("Create VL with xpath %s and vlr %s",
979 self
.vlr_path(), self
._vlr
_req
)
981 with self
._dts
.transaction(flags
=0) as xact
:
982 block
= xact
.block_create()
983 block
.add_query_create(xpath
=self
.vlr_path(), msg
=self
._vlr
_req
)
984 self
._log
.debug("Executing VL create path:%s msg:%s",
985 self
.vlr_path(), self
._vlr
_req
)
989 res_iter
= yield from block
.execute()
991 self
._state
= VlRecordState
.FAILED
992 self
._log
.exception("Caught exception while instantial VL")
997 self
._vlr
= res
.result
999 if self
._vlr
.operational_status
== 'failed':
1000 self
._log
.debug("VL creation failed for vlr id %s", self
._vlr
.id)
1001 self
._state
= VlRecordState
.FAILED
1002 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self
._vlr
.id))
1004 self
._log
.info("Created VL with xpath %s and vlr %s",
1005 self
.vlr_path(), self
._vlr
)
1009 """ Get the network id """
1010 res_iter
= yield from self
._dts
.query_read(self
.vlr_path(), rwdts
.XactFlag
.MERGE
)
1012 for ent
in res_iter
:
1013 res
= yield from ent
1017 err
= "Failed to get VLR for path %s" % self
.vlr_path()
1019 raise InternalVirtualLinkRecordError(err
)
1022 self
._state
= VlRecordState
.INSTANTIATION_PENDING
1025 vl
= yield from get_vlr()
1027 yield from instantiate_vlr()
1029 yield from instantiate_vlr()
1031 self
._state
= VlRecordState
.ACTIVE
1033 def vlr_in_vns(self
):
1034 """ Is there a VLR record in VNS """
1035 if (self
._state
== VlRecordState
.ACTIVE
or
1036 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
1037 self
._state
== VlRecordState
.FAILED
):
1043 def terminate(self
, xact
):
1044 """Terminate this VL """
1045 if not self
.vlr_in_vns():
1046 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1047 self
.vlr_id
, self
._state
)
1050 self
._log
.debug("Terminating VL with path %s", self
.vlr_path())
1051 self
._state
= VlRecordState
.TERMINATE_PENDING
1052 block
= xact
.block_create()
1053 block
.add_query_delete(self
.vlr_path())
1054 yield from block
.execute(flags
=0, now
=True)
1055 self
._state
= VlRecordState
.TERMINATED
1056 self
._log
.debug("Terminated VL with path %s", self
.vlr_path())
1059 class VirtualNetworkFunctionRecord(object):
1060 """ Virtual Network Function Record """
1061 def __init__(self
, dts
, log
, loop
, cluster_name
, vnfm
, vcs_handler
, vnfr_msg
):
1065 self
._cluster
_name
= cluster_name
1066 self
._vnfr
_msg
= vnfr_msg
1067 self
._vnfr
_id
= vnfr_msg
.id
1068 self
._vnfd
_id
= vnfr_msg
.vnfd_ref
1070 self
._vcs
_handler
= vcs_handler
1071 self
._vnfr
= vnfr_msg
1074 self
._state
= VirtualNetworkFunctionRecordState
.INIT
1075 self
._state
_failed
_reason
= None
1076 self
._ext
_vlrs
= {} # The list of external virtual links
1077 self
._vlrs
= [] # The list of internal virtual links
1078 self
._vdus
= [] # The list of vdu
1079 self
._vlr
_by
_cp
= {}
1081 self
._inventory
= {}
1082 self
._create
_time
= int(time
.time())
1083 self
._vnf
_mon
= None
1084 self
._config
_status
= vnfr_msg
.config_status
1085 self
._vnfd
_package
_store
= rift
.package
.store
.VnfdPackageFilesystemStore(self
._log
)
1087 def _get_vdur_from_vdu_id(self
, vdu_id
):
1088 self
._log
.debug("Finding vdur for vdu_id %s", vdu_id
)
1089 self
._log
.debug("Searching through vdus: %s", self
._vdus
)
1090 for vdu
in self
._vdus
:
1091 self
._log
.debug("vdu_id: %s", vdu
.vdu_id
)
1092 if vdu
.vdu_id
== vdu_id
:
1095 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id
)
1098 def operational_status(self
):
1099 """ Operational status of this VNFR """
1100 op_status_map
= {"INIT": "init",
1101 "VL_INIT_PHASE": "vl_init_phase",
1102 "VM_INIT_PHASE": "vm_init_phase",
1104 "TERMINATE": "terminate",
1105 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1106 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1107 "TERMINATED": "terminated",
1108 "FAILED": "failed", }
1109 return op_status_map
[self
._state
.name
]
1112 def vnfd_xpath(self
):
1113 """ VNFD xpath associated with this VNFR """
1114 return("C,/vnfd:vnfd-catalog/"
1115 "vnfd:vnfd[vnfd:id = '{}']".format(self
._vnfd
_id
))
1119 """ VNFD for this VNFR """
1124 """ VNFD name associated with this VNFR """
1125 return self
.vnfd
.name
1129 """ Name of this VNF in the record """
1130 return self
._vnfr
.name
1133 def cloud_account_name(self
):
1134 """ Name of the cloud account this VNFR is instantiated in """
1135 return self
._vnfr
.cloud_account
1139 """ VNFD Id associated with this VNFR """
1144 """ VNFR Id associated with this VNFR """
1145 return self
._vnfr
_id
1148 def member_vnf_index(self
):
1149 """ Member VNF index associated with this VNFR """
1150 return self
._vnfr
.member_vnf_index_ref
1153 def config_status(self
):
1154 """ Config agent status for this VNFR """
1155 return self
._config
_status
1157 def component_by_name(self
, component_name
):
1158 """ Find a component by name in the inventory list"""
1159 mangled_name
= VcsComponent
.mangle_name(component_name
,
1162 return self
._inventory
[mangled_name
]
1167 def get_nsr_config(self
):
1168 ### Need access to NS instance configuration for runtime resolution.
1169 ### This shall be replaced when deployment flavors are implemented
1170 xpath
= "C,/nsr:ns-instance-config"
1171 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
1173 for result
in results
:
1174 entry
= yield from result
1175 ns_instance_config
= entry
.result
1176 for nsr
in ns_instance_config
.nsr
:
1177 if nsr
.id == self
._vnfr
_msg
.nsr_id_ref
:
1182 def start_component(self
, component_name
, ip_addr
):
1183 """ Start a component in the VNFR by name """
1184 comp
= self
.component_by_name(component_name
)
1185 yield from comp
.start(None, None, ip_addr
)
1187 def cp_ip_addr(self
, cp_name
):
1188 """ Get ip address for connection point """
1189 self
._log
.debug("cp_ip_addr()")
1190 for cp
in self
._cprs
:
1191 if cp
.name
== cp_name
and cp
.ip_address
is not None:
1192 return cp
.ip_address
1195 def mgmt_intf_info(self
):
1196 """ Get Management interface info for this VNFR """
1197 mgmt_intf_desc
= self
.vnfd
.msg
.mgmt_interface
1199 if mgmt_intf_desc
.has_field("cp"):
1200 ip_addr
= self
.cp_ip_addr(mgmt_intf_desc
.cp
)
1201 elif mgmt_intf_desc
.has_field("vdu_id"):
1203 vdur
= self
._get
_vdur
_from
_vdu
_id
(mgmt_intf_desc
.vdu_id
)
1204 ip_addr
= vdur
.management_ip
1205 except VDURecordNotFound
:
1206 self
._log
.debug("Did not find mgmt interface for vnfr id %s", self
._vnfr
_id
)
1209 ip_addr
= mgmt_intf_desc
.ip_address
1210 port
= mgmt_intf_desc
.port
1212 return ip_addr
, port
1216 """ Message associated with this VNFR """
1217 vnfd_fields
= ["short_name", "vendor", "description", "version"]
1218 vnfd_copy_dict
= {k
: v
for k
, v
in self
.vnfd
.msg
.as_dict().items() if k
in vnfd_fields
}
1220 mgmt_intf
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1221 ip_address
, port
= self
.mgmt_intf_info()
1223 if ip_address
is not None:
1224 mgmt_intf
.ip_address
= ip_address
1225 if port
is not None:
1226 mgmt_intf
.port
= port
1228 vnfr_dict
= {"id": self
._vnfr
_id
,
1229 "nsr_id_ref": self
._vnfr
_msg
.nsr_id_ref
,
1231 "member_vnf_index_ref": self
.member_vnf_index
,
1232 "vnfd_ref": self
.vnfd_id
,
1233 "operational_status": self
.operational_status
,
1234 "operational_status_details": self
._state
_failed
_reason
,
1235 "cloud_account": self
.cloud_account_name
,
1236 "config_status": self
._config
_status
1239 vnfr_dict
.update(vnfd_copy_dict
)
1241 vnfr_msg
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
1242 vnfr_msg
.uptime
= int(time
.time()) - self
._create
_time
1243 vnfr_msg
.mgmt_interface
= mgmt_intf
1245 # Add all the VLRs to VNFR
1246 for vlr
in self
._vlrs
:
1247 ivlr
= vnfr_msg
.internal_vlr
.add()
1248 ivlr
.vlr_ref
= vlr
.vlr_id
1250 # Add all the VDURs to VDUR
1251 if self
._vdus
is not None:
1252 for vdu
in self
._vdus
:
1253 vdur
= vnfr_msg
.vdur
.add()
1254 vdur
.from_dict(vdu
.msg
.as_dict())
1256 if self
.vnfd
.msg
.mgmt_interface
.has_field('dashboard_params'):
1257 vnfr_msg
.dashboard_url
= self
.dashboard_url
1259 for cpr
in self
._cprs
:
1260 new_cp
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr
.as_dict())
1261 vnfr_msg
.connection_point
.append(new_cp
)
1263 if self
._vnf
_mon
is not None:
1264 for monp
in self
._vnf
_mon
.msg
:
1265 vnfr_msg
.monitoring_param
.append(
1266 VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam
.from_dict(monp
.as_dict()))
1268 if self
._vnfr
.vnf_configuration
is not None:
1269 vnfr_msg
.vnf_configuration
.from_dict(self
._vnfr
.vnf_configuration
.as_dict())
1270 if (ip_address
is not None and
1271 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
is None):
1272 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
= ip_address
1274 for group
in self
._vnfr
_msg
.placement_groups_info
:
1275 group_info
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1276 group_info
.from_dict(group
.as_dict())
1277 vnfr_msg
.placement_groups_info
.append(group_info
)
1282 def dashboard_url(self
):
1283 ip
, cfg_port
= self
.mgmt_intf_info()
1286 if self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.has_field('https'):
1287 if self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.https
is True:
1290 if self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.has_field('port'):
1291 http_port
= self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.port
1293 url
= "{protocol}://{ip_address}:{port}/{path}".format(
1297 path
=self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.path
.lstrip("/"),
1304 """ path for this VNFR """
1305 return("D,/vnfr:vnfr-catalog"
1306 "/vnfr:vnfr[vnfr:id='{}']".format(self
.vnfr_id
))
1309 def publish(self
, xact
):
1310 """ publish this VNFR """
1312 self
._log
.debug("Publishing VNFR path = [%s], record = [%s]",
1313 self
.xpath
, self
.msg
)
1314 vnfr
.create_time
= self
._create
_time
1315 yield from self
._vnfm
.publish_vnfr(xact
, self
.xpath
, self
.msg
)
1316 self
._log
.debug("Published VNFR path = [%s], record = [%s]",
1317 self
.xpath
, self
.msg
)
1320 def create_vls(self
):
1321 """ Publish The VLs associated with this VNF """
1322 self
._log
.debug("Publishing Internal Virtual Links for vnfd id: %s",
1324 for ivld_msg
in self
.vnfd
.msg
.internal_vld
:
1325 self
._log
.debug("Creating internal vld:"
1326 " %s, int_cp_ref = %s",
1327 ivld_msg
, ivld_msg
.internal_connection_point
1329 vlr
= InternalVirtualLinkRecord(dts
=self
._dts
,
1333 vnfr_name
=self
.name
,
1334 cloud_account_name
=self
.cloud_account_name
1336 self
._vlrs
.append(vlr
)
1338 for int_cp
in ivld_msg
.internal_connection_point
:
1339 if int_cp
.id_ref
in self
._vlr
_by
_cp
:
1340 msg
= ("Connection point %s already "
1341 " bound %s" % (int_cp
.id_ref
, self
._vlr
_by
_cp
[int_cp
.id_ref
]))
1342 raise InternalVirtualLinkRecordError(msg
)
1343 self
._log
.debug("Setting vlr %s to internal cp = %s",
1345 self
._vlr
_by
_cp
[int_cp
.id_ref
] = vlr
1348 def instantiate_vls(self
, xact
, restart_mode
=False):
1349 """ Instantiate the VLs associated with this VNF """
1350 self
._log
.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1353 for vlr
in self
._vlrs
:
1354 self
._log
.debug("Instantiating VLR %s", vlr
)
1355 yield from vlr
.instantiate(xact
, restart_mode
)
1357 def find_vlr_by_cp(self
, cp_name
):
1358 """ Find the VLR associated with the cp name """
1359 return self
._vlr
_by
_cp
[cp_name
]
1361 def resolve_placement_group_cloud_construct(self
, input_group
, nsr_config
):
1363 Returns the cloud specific construct for placement group
1365 input_group: VNFD PlacementGroup
1366 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1368 copy_dict
= ['name', 'requirement', 'strategy']
1369 for group_info
in nsr_config
.vnfd_placement_group_maps
:
1370 if group_info
.placement_group_ref
== input_group
.name
and \
1371 group_info
.vnfd_id_ref
== self
.vnfd_id
:
1372 group
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1373 group_dict
= {k
:v
for k
,v
in
1374 group_info
.as_dict().items()
1375 if (k
!= 'placement_group_ref' and k
!='vnfd_id_ref')}
1376 for param
in copy_dict
:
1377 group_dict
.update({param
: getattr(input_group
, param
)})
1378 group
.from_dict(group_dict
)
1383 def get_vdu_placement_groups(self
, vdu
):
1384 placement_groups
= []
1385 ### Step-1: Get VNF level placement groups
1386 for group
in self
._vnfr
_msg
.placement_groups_info
:
1387 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1388 #group_info.from_dict(group.as_dict())
1389 placement_groups
.append(group
)
1391 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1392 nsr_config
= yield from self
.get_nsr_config()
1394 ### Step-3: Get VDU level placement groups
1395 for group
in self
.vnfd
.msg
.placement_groups
:
1396 for member_vdu
in group
.member_vdus
:
1397 if member_vdu
.member_vdu_ref
== vdu
.id:
1398 group_info
= self
.resolve_placement_group_cloud_construct(group
,
1400 if group_info
is None:
1401 self
._log
.info("Could not resolve cloud-construct for placement group: %s", group
.name
)
1402 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1404 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1408 self
.member_vnf_index
)
1409 placement_groups
.append(group_info
)
1411 return placement_groups
1414 def create_vdus(self
, vnfr
, restart_mode
=False):
1415 """ Create the VDUs associated with this VNF """
1417 def get_vdur_id(vdud
):
1418 """Get the corresponding VDUR's id for the VDUD. This is useful in
1421 In restart mode we check for exiting VDUR's ID and use them, if
1422 available. This way we don't end up creating duplicate VDURs
1426 if restart_mode
and vdud
is not None:
1428 vdur
= [vdur
.id for vdur
in vnfr
._vnfr
.vdur
if vdur
.vdu_id_ref
== vdud
.id]
1431 self
._log
.error("Unable to find a VDUR for VDUD {}".format(vdud
))
1436 self
._log
.info("Creating VDU's for vnfd id: %s", self
.vnfd_id
)
1437 for vdu
in self
.vnfd
.msg
.vdu
:
1438 self
._log
.debug("Creating vdu: %s", vdu
)
1439 vdur_id
= get_vdur_id(vdu
)
1441 placement_groups
= yield from self
.get_vdu_placement_groups(vdu
)
1442 self
._log
.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1445 self
.member_vnf_index
,
1446 [ group
.name
for group
in placement_groups
])
1448 vdur
= VirtualDeploymentUnitRecord(
1454 mgmt_intf
=self
.has_mgmt_interface(vdu
),
1455 cloud_account_name
=self
.cloud_account_name
,
1456 vnfd_package_store
=self
._vnfd
_package
_store
,
1458 placement_groups
= placement_groups
,
1460 yield from vdur
.vdu_opdata_register()
1462 self
._vdus
.append(vdur
)
1465 def instantiate_vdus(self
, xact
, vnfr
):
1466 """ Instantiate the VDUs associated with this VNF """
1467 self
._log
.debug("Instantiating VDU's for vnfd id %s: %s", self
.vnfd_id
, self
._vdus
)
1469 lookup
= {vdu
.vdu_id
: vdu
for vdu
in self
._vdus
}
1471 # Identify any dependencies among the VDUs
1472 dependencies
= collections
.defaultdict(list)
1473 vdu_id_pattern
= re
.compile(r
"\{\{ vdu\[([^]]+)\]\S* \}\}")
1475 for vdu
in self
._vdus
:
1476 if vdu
.vdud_cloud_init
is not None:
1477 for vdu_id
in vdu_id_pattern
.findall(vdu
.vdud_cloud_init
):
1478 if vdu_id
!= vdu
.vdu_id
:
1479 # This means that vdu.vdu_id depends upon vdu_id,
1480 # i.e. vdu_id must be instantiated before
1482 dependencies
[vdu
.vdu_id
].append(lookup
[vdu_id
])
1484 # Define the terminal states of VDU instantiation
1486 VDURecordState
.READY
,
1487 VDURecordState
.TERMINATED
,
1488 VDURecordState
.FAILED
,
1491 datastore
= VdurDatastore()
1495 def instantiate_monitor(vdu
):
1496 """Monitor the state of the VDU during instantiation
1499 vdu - a VirtualDeploymentUnitRecord
1502 # wait for the VDUR to enter a terminal state
1503 while vdu
._state
not in terminal
:
1504 yield from asyncio
.sleep(1, loop
=self
._loop
)
1506 # update the datastore
1507 datastore
.update(vdu
)
1509 # add the VDU to the set of processed VDUs
1510 processed
.add(vdu
.vdu_id
)
1513 def instantiate(vdu
):
1514 """Instantiate the specified VDU
1517 vdu - a VirtualDeploymentUnitRecord
1520 if the VDU, or any of the VDUs this VDU depends upon, are
1521 terminated or fail to instantiate properly, a
1522 VirtualDeploymentUnitRecordError is raised.
1525 for dependency
in dependencies
[vdu
.vdu_id
]:
1526 self
._log
.debug("{}: waiting for {}".format(vdu
.vdu_id
, dependency
.vdu_id
))
1528 while dependency
.vdu_id
not in processed
:
1529 yield from asyncio
.sleep(1, loop
=self
._loop
)
1531 if not dependency
.active
:
1532 raise VirtualDeploymentUnitRecordError()
1534 self
._log
.debug('instantiating {}'.format(vdu
.vdu_id
))
1536 # Populate the datastore with the current values of the VDU
1539 # Substitute any variables contained in the cloud config script
1540 config
= str(vdu
.vdud_cloud_init
)
1542 parts
= re
.split("\{\{ ([^\}]+) \}\}", config
)
1545 # Extract the variable names
1547 for variable
in parts
[1::2]:
1548 variables
.append(variable
.lstrip('{{').rstrip('}}').strip())
1550 # Iterate of the variables and substitute values from the
1552 for variable
in variables
:
1554 # Handle a reference to a VDU by ID
1555 if variable
.startswith('vdu['):
1556 value
= datastore
.get(variable
)
1558 msg
= "Unable to find a substitute for {} in {} cloud-init script"
1559 raise ValueError(msg
.format(variable
, vdu
.vdu_id
))
1561 config
= config
.replace("{{ %s }}" % variable
, value
)
1564 # Handle a reference to the current VDU
1565 if variable
.startswith('vdu'):
1566 value
= datastore
.get('vdu[{}]'.format(vdu
.vdu_id
) + variable
[3:])
1567 config
= config
.replace("{{ %s }}" % variable
, value
)
1570 # Handle unrecognized variables
1571 msg
= 'unrecognized cloud-config variable: {}'
1572 raise ValueError(msg
.format(variable
))
1574 # Instantiate the VDU
1575 with self
._dts
.transaction() as xact
:
1576 self
._log
.debug("Instantiating vdu: %s", vdu
)
1577 yield from vdu
.instantiate(xact
, vnfr
, config
=config
)
1578 if self
._state
== VirtualNetworkFunctionRecordState
.FAILED
:
1579 self
._log
.error("Instatiation of VNF %s failed while instantiating vdu %s",
1582 # First create a set of tasks to monitor the state of the VDUs and
1583 # report when they have entered a terminal state
1584 for vdu
in self
._vdus
:
1585 self
._loop
.create_task(instantiate_monitor(vdu
))
1587 for vdu
in self
._vdus
:
1588 self
._loop
.create_task(instantiate(vdu
))
1590 def has_mgmt_interface(self
, vdu
):
1591 # ## TODO: Support additional mgmt_interface type options
1592 if self
.vnfd
.msg
.mgmt_interface
.vdu_id
== vdu
.id:
1596 def vlr_xpath(self
, vlr_id
):
1599 "D,/vlr:vlr-catalog/"
1600 "vlr:vlr[vlr:id = '{}']".format(vlr_id
))
1602 def ext_vlr_by_id(self
, vlr_id
):
1603 """ find ext vlr by id """
1604 return self
._ext
_vlrs
[vlr_id
]
1607 def publish_inventory(self
, xact
):
1608 """ Publish the inventory associated with this VNF """
1609 self
._log
.debug("Publishing inventory for VNFR id: %s", self
._vnfr
_id
)
1611 for component
in self
.vnfd
.msg
.component
:
1612 self
._log
.debug("Creating inventory component %s", component
)
1613 mangled_name
= VcsComponent
.mangle_name(component
.component_name
,
1617 comp
= VcsComponent(dts
=self
._dts
,
1620 cluster_name
=self
._cluster
_name
,
1621 vcs_handler
=self
._vcs
_handler
,
1622 component
=component
,
1623 mangled_name
=mangled_name
,
1625 if comp
.name
in self
._inventory
:
1626 self
._log
.debug("Duplicate entries in inventory %s for vnfr %s",
1627 component
, self
._vnfd
_id
)
1629 self
._log
.debug("Adding component %s for vnrf %s",
1630 comp
.name
, self
._vnfr
_id
)
1631 self
._inventory
[comp
.name
] = comp
1632 yield from comp
.publish(xact
)
1634 def all_vdus_active(self
):
1635 """ Are all VDUS in this VNFR active? """
1636 for vdu
in self
._vdus
:
1640 self
._log
.debug("Inside all_vdus_active. Returning True")
1644 def instantiation_failed(self
, failed_reason
=None):
1645 """ VNFR instantiation failed """
1646 self
._log
.debug("VNFR %s instantiation failed ", self
.vnfr_id
)
1647 self
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
1648 self
._state
_failed
_reason
= failed_reason
1650 # Update the VNFR with the changed status
1651 yield from self
.publish(None)
1655 """ This VNF is ready"""
1656 self
._log
.debug("VNFR id %s is ready", self
.vnfr_id
)
1658 if self
._state
!= VirtualNetworkFunctionRecordState
.FAILED
:
1659 self
.set_state(VirtualNetworkFunctionRecordState
.READY
)
1662 self
._log
.debug("VNFR id %s ignoring state change", self
.vnfr_id
)
1664 # Update the VNFR with the changed status
1665 yield from self
.publish(None)
1667 def update_cp(self
, cp_name
, ip_address
, mac_addr
, cp_id
):
1668 """Updated the connection point with ip address"""
1669 for cp
in self
._cprs
:
1670 if cp
.name
== cp_name
:
1671 self
._log
.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1672 cp_name
, cp
, ip_address
, cp_id
)
1673 cp
.ip_address
= ip_address
1674 cp
.mac_address
= mac_addr
1675 cp
.connection_point_id
= cp_id
1678 err
= "No connection point %s found in VNFR id %s" % (cp
.name
, self
._vnfr
_id
)
1679 self
._log
.debug(err
)
1680 raise VirtualDeploymentUnitRecordError(err
)
1682 def set_state(self
, state
):
1683 """ Set state for this VNFR"""
1687 def instantiate(self
, xact
, restart_mode
=False):
1688 """ instantiate this VNF """
1689 self
.set_state(VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
)
1694 # Iterate over all the connection points in VNFR and fetch the
1697 def cpr_from_cp(cp
):
1698 """ Creates a record level connection point from the desciptor cp"""
1699 cp_fields
= ["name", "image", "vm-flavor"]
1700 cp_copy_dict
= {k
: v
for k
, v
in cp
.as_dict().items() if k
in cp_fields
}
1702 cpr_dict
.update(cp_copy_dict
)
1703 return VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr_dict
)
1705 self
._log
.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1706 self
._vnfr
_id
, self
._vnfr
.connection_point
)
1708 for cp
in self
._vnfr
.connection_point
:
1709 cpr
= cpr_from_cp(cp
)
1710 self
._cprs
.append(cpr
)
1711 self
._log
.debug("Adding Connection point record %s ", cp
)
1713 vlr_path
= self
.vlr_xpath(cp
.vlr_ref
)
1714 self
._log
.debug("Fetching VLR with path = %s", vlr_path
)
1715 res_iter
= yield from self
._dts
.query_read(self
.vlr_xpath(cp
.vlr_ref
),
1716 rwdts
.XactFlag
.MERGE
)
1720 self
._ext
_vlrs
[cp
.vlr_ref
] = d
1721 cpr
.vlr_ref
= cp
.vlr_ref
1722 self
._log
.debug("Fetched VLR [%s] with path = [%s]", d
, vlr_path
)
1724 # Fetch the VNFD associated with the VNFR
1725 self
._log
.debug("VNFR-ID %s: Fetching vnfds", self
._vnfr
_id
)
1726 self
._vnfd
= yield from self
._vnfm
.get_vnfd_ref(self
._vnfd
_id
)
1727 self
._log
.debug("VNFR-ID %s: Fetched vnfd:%s", self
._vnfr
_id
, self
._vnfd
)
1729 assert self
.vnfd
is not None
1731 # Fetch External VLRs
1732 self
._log
.debug("VNFR-ID %s: Fetching vlrs", self
._vnfr
_id
)
1733 yield from fetch_vlrs()
1736 self
._log
.debug("VNFR-ID %s: Publishing Inventory", self
._vnfr
_id
)
1737 yield from self
.publish_inventory(xact
)
1740 self
._log
.debug("VNFR-ID %s: Creating VLs", self
._vnfr
_id
)
1741 yield from self
.create_vls()
1744 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1745 yield from self
.publish(xact
)
1748 self
._log
.debug("VNFR-ID %s: Instantiate VLs", self
._vnfr
_id
)
1750 yield from self
.instantiate_vls(xact
, restart_mode
)
1751 except Exception as e
:
1752 self
._log
.exception("VL instantiation failed (%s)", str(e
))
1753 yield from self
.instantiation_failed(str(e
))
1756 self
.set_state(VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
)
1759 self
._log
.debug("VNFR-ID %s: Create VDUs", self
._vnfr
_id
)
1760 yield from self
.create_vdus(self
, restart_mode
)
1763 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1764 yield from self
.publish(xact
)
1767 # ToDo: Check if this should be prevented during restart
1768 self
._log
.debug("VNFR-ID %s: Instantiate VDUs", self
._vnfr
_id
)
1769 _
= self
._loop
.create_task(self
.instantiate_vdus(xact
, self
))
1772 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1773 yield from self
.publish(xact
)
1775 self
._log
.debug("VNFR-ID %s: Instantiation Done", self
._vnfr
_id
)
1777 # create task updating uptime for this vnfr
1778 self
._log
.debug("VNFR-ID %s: Starting task to update uptime", self
._vnfr
_id
)
1779 self
._loop
.create_task(self
.vnfr_uptime_update(xact
))
1782 def terminate(self
, xact
):
1783 """ Terminate this virtual network function """
1785 self
._log
.debug("Terminatng VNF id %s", self
.vnfr_id
)
1787 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATE
)
1790 if self
._vnf
_mon
is not None:
1791 self
._vnf
_mon
.stop()
1792 self
._vnf
_mon
.deregister()
1793 self
._vnf
_mon
= None
1796 def terminate_vls():
1797 """ Terminate VLs in this VNF """
1798 for vl
in self
._vlrs
:
1799 yield from vl
.terminate(xact
)
1802 def terminate_vdus():
1803 """ Terminate VDUS in this VNF """
1804 for vdu
in self
._vdus
:
1805 yield from vdu
.terminate(xact
)
1807 self
._log
.debug("Terminatng VLs in VNF id %s", self
.vnfr_id
)
1808 self
.set_state(VirtualNetworkFunctionRecordState
.VL_TERMINATE_PHASE
)
1809 yield from terminate_vls()
1811 self
._log
.debug("Terminatng VDUs in VNF id %s", self
.vnfr_id
)
1812 self
.set_state(VirtualNetworkFunctionRecordState
.VDU_TERMINATE_PHASE
)
1813 yield from terminate_vdus()
1815 self
._log
.debug("Terminated VNF id %s", self
.vnfr_id
)
1816 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATED
)
1819 def vnfr_uptime_update(self
, xact
):
1821 # Return when vnfr state is FAILED or TERMINATED etc
1822 if self
._state
not in [VirtualNetworkFunctionRecordState
.INIT
,
1823 VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
,
1824 VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
,
1825 VirtualNetworkFunctionRecordState
.READY
]:
1827 yield from self
.publish(xact
)
1828 yield from asyncio
.sleep(2, loop
=self
._loop
)
1832 class VnfdDtsHandler(object):
1833 """ DTS handler for VNFD config changes """
1834 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1836 def __init__(self
, dts
, log
, loop
, vnfm
):
1845 """ DTS registration handle """
1850 """ Register for VNFD configuration"""
1852 def on_apply(dts
, acg
, xact
, action
, scratch
):
1853 """Apply the configuration"""
1854 self
._log
.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1855 xact
, action
, scratch
)
1857 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
1858 # Create/Update a VNFD record
1859 for cfg
in self
._regh
.get_xact_elements(xact
):
1860 # Only interested in those VNFD cfgs whose ID was received in prepare callback
1861 if cfg
.id in scratch
.get('vnfds', []) or is_recovery
:
1862 self
._vnfm
.update_vnfd(cfg
)
1864 scratch
.pop('vnfds', None)
1867 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
1868 """ on prepare callback """
1869 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1870 ks_path
.to_xpath(RwVnfmYang
.get_schema()), msg
)
1871 fref
= ProtobufC
.FieldReference
.alloc()
1872 fref
.goto_whole_message(msg
.to_pbcm())
1874 # Handle deletes in prepare_callback, but adds/updates in apply_callback
1875 if fref
.is_field_deleted():
1876 # Delete an VNFD record
1877 self
._log
.debug("Deleting VNFD with id %s", msg
.id)
1878 if self
._vnfm
.vnfd_in_use(msg
.id):
1879 self
._log
.debug("Cannot delete VNFD in use - %s", msg
)
1880 err
= "Cannot delete a VNFD in use - %s" % msg
1881 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
1882 # Delete a VNFD record
1883 yield from self
._vnfm
.delete_vnfd(msg
.id)
1885 # Handle actual adds/updates in apply_callback,
1886 # just check if VNFD in use in prepare_callback
1887 if self
._vnfm
.vnfd_in_use(msg
.id):
1888 self
._log
.debug("Cannot modify an VNFD in use - %s", msg
)
1889 err
= "Cannot modify an VNFD in use - %s" % msg
1890 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
1892 # Add this VNFD to scratch to create/update in apply callback
1893 vnfds
= scratch
.setdefault('vnfds', [])
1894 vnfds
.append(msg
.id)
1896 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
1899 "Registering for VNFD config using xpath: %s",
1900 VnfdDtsHandler
.XPATH
,
1902 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
1903 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
1904 self
._regh
= acg
.register(
1905 xpath
=VnfdDtsHandler
.XPATH
,
1906 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
1907 on_prepare
=on_prepare
)
1910 class VcsComponentDtsHandler(object):
1911 """ Vcs Component DTS handler """
1912 XPATH
= ("D,/rw-manifest:manifest" +
1913 "/rw-manifest:operational-inventory" +
1914 "/rw-manifest:component")
1916 def __init__(self
, dts
, log
, loop
, vnfm
):
1925 """ DTS registration handle """
1930 """ Registers VCS component dts publisher registration"""
1931 self
._log
.debug("VCS Comp publisher DTS handler registering path %s",
1932 VcsComponentDtsHandler
.XPATH
)
1934 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
1935 handlers
= rift
.tasklets
.Group
.Handler()
1936 with self
._dts
.group_create(handler
=handlers
) as group
:
1937 self
._regh
= group
.register(xpath
=VcsComponentDtsHandler
.XPATH
,
1939 flags
=(rwdts
.Flag
.PUBLISHER |
1940 rwdts
.Flag
.NO_PREP_READ |
1941 rwdts
.Flag
.DATASTORE
),)
1944 def publish(self
, xact
, path
, msg
):
1945 """ Publishes the VCS component """
1946 self
._log
.debug("Publishing the VcsComponent xact = %s, %s:%s",
1948 self
.regh
.create_element(path
, msg
)
1949 self
._log
.debug("Published the VcsComponent to %s xact = %s, %s:%s",
1950 VcsComponentDtsHandler
.XPATH
, xact
, path
, msg
)
1952 class VnfrConsoleOperdataDtsHandler(object):
1953 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
1955 def vnfr_vdu_console_xpath(self
):
1956 """ path for resource-mgr"""
1957 return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self
._vnfr
_id
,self
._vdur
_id
))
1959 def __init__(self
, dts
, log
, loop
, vnfm
, vnfr_id
, vdur_id
, vdu_id
):
1966 self
._vnfr
_id
= vnfr_id
1967 self
._vdur
_id
= vdur_id
1968 self
._vdu
_id
= vdu_id
1972 """ Register for VNFR VDU Operational Data read from dts """
1975 def on_prepare(xact_info
, action
, ks_path
, msg
):
1976 """ prepare callback from dts """
1977 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
1979 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
1980 xact_info
, action
, xpath
, msg
1983 if action
== rwdts
.QueryAction
.READ
:
1984 schema
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur
.schema()
1985 path_entry
= schema
.keyspec_to_entry(ks_path
)
1986 self
._log
.debug("VDU Opdata path is {}".format(path_entry
))
1988 vnfr
= self
._vnfm
.get_vnfr(self
._vnfr
_id
)
1989 except VnfRecordError
as e
:
1990 self
._log
.error("VNFR id %s not found", self
._vnfr
_id
)
1991 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
1994 vdur
= vnfr
._get
_vdur
_from
_vdu
_id
(self
._vdu
_id
)
1995 if not vdur
._state
== VDURecordState
.READY
:
1996 self
._log
.debug("VDUR state is not READY. current state is {}".format(vdur
._state
))
1997 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
1999 with self
._dts
.transaction() as new_xact
:
2000 resp
= yield from vdur
.read_resource(new_xact
)
2001 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2002 vdur_console
.id = self
._vdur
_id
2003 if resp
.console_url
:
2004 vdur_console
.console_url
= resp
.console_url
2006 vdur_console
.console_url
= 'none'
2007 self
._log
.debug("Recevied console URL for vdu {} is {}".format(self
._vdu
_id
,vdur_console
))
2009 self
._log
.exception("Caught exception while reading VDU %s", self
._vdu
_id
)
2010 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2011 vdur_console
.id = self
._vdur
_id
2012 vdur_console
.console_url
= 'none'
2014 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
,
2015 xpath
=self
.vnfr_vdu_console_xpath
,
2018 #raise VnfRecordError("Not supported operation %s" % action)
2019 self
._log
.error("Not supported operation %s" % action
)
2020 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2024 self
._log
.debug("Registering for VNFR VDU using xpath: %s",
2025 self
.vnfr_vdu_console_xpath
)
2026 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2027 with self
._dts
.group_create() as group
:
2028 self
._regh
= group
.register(xpath
=self
.vnfr_vdu_console_xpath
,
2030 flags
=rwdts
.Flag
.PUBLISHER
,
2034 class VnfrDtsHandler(object):
2035 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2036 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2038 def __init__(self
, dts
, log
, loop
, vnfm
):
2048 """ Return registration handle"""
2053 """ Return VNF manager instance """
2058 """ Register for vnfr create/update/delete/read requests from dts """
2059 def on_commit(xact_info
):
2060 """ The transaction has been committed """
2061 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
2062 return rwdts
.MemberRspCode
.ACTION_OK
2064 def on_abort(*args
):
2065 """ Abort callback """
2066 self
._log
.debug("VNF transaction got aborted")
2069 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
2072 def instantiate_realloc_vnfr(vnfr
):
2073 """Re-populate the vnfm after restart
2080 yield from vnfr
.instantiate(None, restart_mode
=True)
2082 if xact_event
== rwdts
.MemberEvent
.INSTALL
:
2083 curr_cfg
= self
.regh
.elements
2084 for cfg
in curr_cfg
:
2085 vnfr
= self
.vnfm
.create_vnfr(cfg
)
2086 self
._loop
.create_task(instantiate_realloc_vnfr(vnfr
))
2088 self
._log
.debug("Got on_event in vnfm")
2090 return rwdts
.MemberRspCode
.ACTION_OK
2093 def on_prepare(xact_info
, action
, ks_path
, msg
):
2094 """ prepare callback from dts """
2096 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2097 xact_info
, action
, msg
2100 if action
== rwdts
.QueryAction
.CREATE
:
2101 if not msg
.has_field("vnfd_ref"):
2102 err
= "Vnfd reference not provided"
2103 self
._log
.error(err
)
2104 raise VnfRecordError(err
)
2106 vnfr
= self
.vnfm
.create_vnfr(msg
)
2108 # RIFT-9105: Unable to add a READ query under an existing transaction
2109 # xact = xact_info.xact
2110 yield from vnfr
.instantiate(None)
2111 except Exception as e
:
2112 self
._log
.exception(e
)
2113 self
._log
.error("Error while instantiating vnfr:%s", vnfr
.vnfr_id
)
2114 vnfr
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
2115 yield from vnfr
.publish(None)
2116 elif action
== rwdts
.QueryAction
.DELETE
:
2117 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2118 path_entry
= schema
.keyspec_to_entry(ks_path
)
2119 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2122 self
._log
.debug("VNFR id %s not found for delete", path_entry
.key00
.id)
2123 raise VirtualNetworkFunctionRecordNotFound(
2124 "VNFR id %s", path_entry
.key00
.id)
2127 yield from vnfr
.terminate(xact_info
.xact
)
2130 yield from self
._vnfm
.delete_vnfr(xact_info
.xact
, vnfr
)
2131 except Exception as e
:
2132 self
._log
.exception(e
)
2133 self
._log
.error("Caught exception while deleting vnfr %s", path_entry
.key00
.id)
2135 elif action
== rwdts
.QueryAction
.UPDATE
:
2136 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2137 path_entry
= schema
.keyspec_to_entry(ks_path
)
2140 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2141 except Exception as e
:
2142 self
._log
.debug("No vnfr found with id %s", path_entry
.key00
.id)
2143 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2147 self
._log
.debug("VNFR id %s not found for update", path_entry
.key00
.id)
2148 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2151 self
._log
.debug("VNFR {} update config status {} (current {})".
2152 format(vnfr
.name
, msg
.config_status
, vnfr
.config_status
))
2153 # Update the config status and publish
2154 vnfr
._config
_status
= msg
.config_status
2155 yield from vnfr
.publish(None)
2158 raise NotImplementedError(
2159 "%s action on VirtualNetworkFunctionRecord not supported",
2162 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2164 self
._log
.debug("Registering for VNFR using xpath: %s",
2165 VnfrDtsHandler
.XPATH
,)
2167 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
2168 on_prepare
=on_prepare
,)
2169 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
2170 with self
._dts
.group_create(handler
=handlers
) as group
:
2171 self
._regh
= group
.register(xpath
=VnfrDtsHandler
.XPATH
,
2173 flags
=(rwdts
.Flag
.PUBLISHER |
2174 rwdts
.Flag
.NO_PREP_READ |
2176 rwdts
.Flag
.DATASTORE
),)
2179 def create(self
, xact
, path
, msg
):
2181 Create a VNFR record in DTS with path and message
2183 self
._log
.debug("Creating VNFR xact = %s, %s:%s",
2186 self
.regh
.create_element(path
, msg
)
2187 self
._log
.debug("Created VNFR xact = %s, %s:%s",
2191 def update(self
, xact
, path
, msg
):
2193 Update a VNFR record in DTS with path and message
2195 self
._log
.debug("Updating VNFR xact = %s, %s:%s",
2197 self
.regh
.update_element(path
, msg
)
2198 self
._log
.debug("Updated VNFR xact = %s, %s:%s",
2202 def delete(self
, xact
, path
):
2204 Delete a VNFR record in DTS with path and message
2206 self
._log
.debug("Deleting VNFR xact = %s, %s", xact
, path
)
2207 self
.regh
.delete_element(path
)
2208 self
._log
.debug("Deleted VNFR xact = %s, %s", xact
, path
)
2211 class VirtualNetworkFunctionDescriptor(object):
2213 Virtual Network Function descriptor class
2216 def __init__(self
, dts
, log
, loop
, vnfm
, vnfd
):
2226 def ref_count(self
):
2227 """ Returns the reference count associated with
2228 this Virtual Network Function Descriptor"""
2229 return self
._ref
_count
2233 """ Returns vnfd id """
2234 return self
._vnfd
.id
2238 """ Returns vnfd name """
2239 return self
._vnfd
.name
2242 """ Returns whether vnfd is in use or not """
2243 return True if self
._ref
_count
> 0 else False
2246 """ Take a reference on this object """
2247 self
._ref
_count
+= 1
2248 return self
._ref
_count
2251 """ Release reference on this object """
2252 if self
.ref_count
< 1:
2253 msg
= ("Unref on a VNFD object - vnfd id %s, ref_count = %s" %
2254 (self
.id, self
._ref
_count
))
2255 self
._log
.critical(msg
)
2256 raise VnfRecordError(msg
)
2257 self
._log
.debug("Releasing ref on VNFD %s - curr ref_count:%s",
2258 self
.id, self
.ref_count
)
2259 self
._ref
_count
-= 1
2260 return self
._ref
_count
2264 """ Return the message associated with this NetworkServiceDescriptor"""
2268 def path_for_id(vnfd_id
):
2269 """ Return path for the passed vnfd_id"""
2270 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id
)
2273 """ Return the path associated with this NetworkServiceDescriptor"""
2274 return VirtualNetworkFunctionDescriptor
.path_for_id(self
.id)
2276 def update(self
, vnfd
):
2277 """ Update the Virtual Network Function Descriptor """
2279 self
._log
.error("Cannot update descriptor %s in use refcnt=%d",
2280 self
.id, self
.ref_count
)
2282 # The following loop is added to debug RIFT-13284
2283 for vnf_rec
in self
._vnfm
._vnfrs
.values():
2284 if vnf_rec
.vnfd_id
== self
.id:
2285 self
._log
.error("descriptor %s in used by %s:%s",
2286 self
.id, vnf_rec
.vnfr_id
, vnf_rec
.msg
)
2287 raise VirtualNetworkFunctionDescriptorRefCountExists("Cannot update descriptor in use %s" % self
.id)
2291 """ Delete the Virtual Network Function Descriptor """
2293 self
._log
.error("Cannot delete descriptor %s in use refcnt=%d",
2296 # The following loop is added to debug RIFT-13284
2297 for vnf_rec
in self
._vnfm
._vnfrs
.values():
2298 if vnf_rec
.vnfd_id
== self
.id:
2299 self
._log
.error("descriptor %s in used by %s:%s",
2300 self
.id, vnf_rec
.vnfr_id
, vnf_rec
.msg
)
2301 raise VirtualNetworkFunctionDescriptorRefCountExists("Cannot delete descriptor in use %s" % self
.id)
2302 self
._vnfm
.delete_vnfd(self
.id)
2305 class VnfdRefCountDtsHandler(object):
2306 """ The VNFD Ref Count DTS handler """
2307 XPATH
= "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2309 def __init__(self
, dts
, log
, loop
, vnfm
):
2319 """ Return registration handle """
2324 """ Return the NS manager instance """
2329 """ Register for VNFD ref count read from dts """
2332 def on_prepare(xact_info
, action
, ks_path
, msg
):
2333 """ prepare callback from dts """
2334 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2336 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2337 xact_info
, action
, xpath
, msg
2340 if action
== rwdts
.QueryAction
.READ
:
2341 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_VnfdRefCount
.schema()
2342 path_entry
= schema
.keyspec_to_entry(ks_path
)
2343 vnfd_list
= yield from self
._vnfm
.get_vnfd_refcount(path_entry
.key00
.vnfd_id_ref
)
2344 for xpath
, msg
in vnfd_list
:
2345 self
._log
.debug("Responding to ref count query path:%s, msg:%s",
2347 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.MORE
,
2350 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2352 raise VnfRecordError("Not supported operation %s" % action
)
2354 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2355 with self
._dts
.group_create() as group
:
2356 self
._regh
= group
.register(xpath
=VnfdRefCountDtsHandler
.XPATH
,
2358 flags
=rwdts
.Flag
.PUBLISHER
,
2362 class VdurDatastore(object):
2364 This VdurDatastore is intended to expose select information about a VDUR
2365 such that it can be referenced in a cloud config file. The data that is
2366 exposed does not necessarily follow the structure of the data in the yang
2367 model. This is intentional. The data that are exposed are intended to be
2368 agnostic of the yang model so that changes in the model do not necessarily
2369 require changes to the interface provided to the user. It also means that
2370 the user does not need to be familiar with the RIFT.ware yang models.
2374 """Create an instance of VdurDatastore"""
2375 self
._vdur
_data
= dict()
2376 self
._pattern
= re
.compile("vdu\[([^]]+)\]\.(.+)")
2378 def add(self
, vdur
):
2379 """Add a new VDUR to the datastore
2382 vdur - a VirtualDeploymentUnitRecord instance
2385 A ValueError is raised if the VDUR is (1) None or (2) already in
2389 if vdur
.vdu_id
is None:
2390 raise ValueError('VDURs are required to have an ID')
2392 if vdur
.vdu_id
in self
._vdur
_data
:
2393 raise ValueError('cannot add a VDUR more than once')
2395 self
._vdur
_data
[vdur
.vdu_id
] = dict()
2397 def set_if_not_none(key
, attr
):
2398 if attr
is not None:
2399 self
._vdur
_data
[vdur
.vdu_id
][key
] = attr
2401 set_if_not_none('name', vdur
._vdud
.name
)
2402 set_if_not_none('mgmt.ip', vdur
.vm_management_ip
)
2404 def update(self
, vdur
):
2405 """Update the VDUR information in the datastore
2408 vdur - a GI representation of a VDUR
2411 A ValueError is raised if the VDUR is (1) None or (2) already in
2415 if vdur
.vdu_id
is None:
2416 raise ValueError('VNFDs are required to have an ID')
2418 if vdur
.vdu_id
not in self
._vdur
_data
:
2419 raise ValueError('VNF is not recognized')
2421 def set_or_delete(key
, attr
):
2423 if key
in self
._vdur
_data
[vdur
.vdu_id
]:
2424 del self
._vdur
_data
[vdur
.vdu_id
][key
]
2427 self
._vdur
_data
[vdur
.vdu_id
][key
] = attr
2429 set_or_delete('name', vdur
._vdud
.name
)
2430 set_or_delete('mgmt.ip', vdur
.vm_management_ip
)
2432 def remove(self
, vdur_id
):
2433 """Remove all of the data associated with specified VDUR
2436 vdur_id - the identifier of a VNFD in the datastore
2439 A ValueError is raised if the VDUR is not contained in the
2443 if vdur_id
not in self
._vdur
_data
:
2444 raise ValueError('VNF is not recognized')
2446 del self
._vdur
_data
[vdur_id
]
2448 def get(self
, expr
):
2449 """Retrieve VDUR information from the datastore
2451 An expression should be of the form,
2455 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2456 the exposed attribute that the user wishes to retrieve.
2458 If the requested data is not available, None is returned.
2461 expr - a string that specifies the data to return
2464 A ValueError is raised if the provided expression cannot be parsed.
2467 The requested data or None
2470 result
= self
._pattern
.match(expr
)
2472 raise ValueError('data expression not recognized ({})'.format(expr
))
2474 vdur_id
, key
= result
.groups()
2476 if vdur_id
not in self
._vdur
_data
:
2479 return self
._vdur
_data
[vdur_id
].get(key
, None)
2482 class VnfManager(object):
2483 """ The virtual network function manager class """
2484 def __init__(self
, dts
, log
, loop
, cluster_name
):
2488 self
._cluster
_name
= cluster_name
2490 self
._vcs
_handler
= VcsComponentDtsHandler(dts
, log
, loop
, self
)
2491 self
._vnfr
_handler
= VnfrDtsHandler(dts
, log
, loop
, self
)
2493 self
._dts
_handlers
= [VnfdDtsHandler(dts
, log
, loop
, self
),
2496 VnfdRefCountDtsHandler(dts
, log
, loop
, self
)]
2501 def vnfr_handler(self
):
2502 """ VNFR dts handler """
2503 return self
._vnfr
_handler
2506 def vcs_handler(self
):
2507 """ VCS dts handler """
2508 return self
._vcs
_handler
2512 """ Register all static DTS handlers """
2513 for hdl
in self
._dts
_handlers
:
2514 yield from hdl
.register()
2518 """ Run this VNFM instance """
2519 self
._log
.debug("Run VNFManager - registering static DTS handlers""")
2520 yield from self.register()
2522 def get_vnfr(self, vnfr_id):
2523 """ get VNFR by vnfr id """
2525 if vnfr_id not in self._vnfrs:
2526 raise VnfRecordError("VNFR
id %s not found
", vnfr_id)
2528 return self._vnfrs[vnfr_id]
2530 def create_vnfr(self, vnfr):
2531 """ Create a VNFR instance """
2532 if vnfr.id in self._vnfrs:
2533 msg = "Vnfr
id %s already exists
" % vnfr.id
2534 self._log.error(msg)
2535 raise VnfRecordError(msg)
2537 self._log.info("Create VirtualNetworkFunctionRecord
%s from vnfd_id
: %s",
2541 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2542 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr
2544 return self._vnfrs[vnfr.id]
2547 def delete_vnfr(self, xact, vnfr):
2548 """ Create a VNFR instance """
2549 if vnfr.vnfr_id in self._vnfrs:
2550 self._log.debug("Deleting VNFR
id %s", vnfr.vnfr_id)
2551 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2552 del self._vnfrs[vnfr.vnfr_id]
2555 def fetch_vnfd(self, vnfd_id):
2556 """ Fetch VNFDs based with the vnfd id"""
2557 vnfd_path = VirtualNetworkFunctionDescriptor.path_for_id(vnfd_id)
2558 self._log.debug("Fetch vnfd with path
%s", vnfd_path)
2561 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2563 for ent in res_iter:
2564 res = yield from ent
2568 err = "Failed to get Vnfd
%s" % vnfd_id
2569 self._log.error(err)
2570 raise VnfRecordError(err)
2572 self._log.debug("Fetched vnfd
for path
%s, vnfd
- %s", vnfd_path, vnfd)
2577 def get_vnfd_ref(self, vnfd_id):
2578 """ Get Virtual Network Function descriptor for the passed vnfd_id"""
2579 vnfd = yield from self.get_vnfd(vnfd_id)
2584 def get_vnfd(self, vnfd_id):
2585 """ Get Virtual Network Function descriptor for the passed vnfd_id"""
2587 if vnfd_id not in self._vnfds:
2588 self._log.error("Cannot find VNFD
id:%s", vnfd_id)
2589 vnfd = yield from self.fetch_vnfd(vnfd_id)
2592 self._log.error("Cannot find VNFD
id:%s", vnfd_id)
2593 raise VirtualNetworkFunctionDescriptorError("Cannot find VNFD
id:%s", vnfd_id)
2595 if vnfd.id != vnfd_id:
2596 self._log.error("Bad Recovery state
{} found
for {}".format(vnfd.id, vnfd_id))
2597 raise VirtualNetworkFunctionDescriptorError("Bad Recovery state
{} found
for {}".format(vnfd.id, vnfd_id))
2599 if vnfd.id not in self._vnfds:
2600 self.create_vnfd(vnfd)
2602 return self._vnfds[vnfd_id]
2604 def vnfd_in_use(self, vnfd_id):
2605 """ Is this VNFD in use """
2606 self._log.debug("Is this VNFD
in use
- msg
:%s", vnfd_id)
2607 if vnfd_id in self._vnfds:
2608 return self._vnfds[vnfd_id].in_use()
2612 def publish_vnfr(self, xact, path, msg):
2613 """ Publish a VNFR """
2614 self._log.debug("publish_vnfr called with path
%s, msg
%s",
2616 yield from self.vnfr_handler.update(xact, path, msg)
2618 def create_vnfd(self, vnfd):
2619 """ Create a virtual network function descriptor """
2620 self._log.debug("Create virtual networkfunction descriptor
- %s", vnfd)
2621 if vnfd.id in self._vnfds:
2622 self._log.error("Cannot create VNFD
%s -VNFD
id already exists
", vnfd)
2623 raise VirtualNetworkFunctionDescriptorError("VNFD already exists
-%s", vnfd.id)
2625 self._vnfds[vnfd.id] = VirtualNetworkFunctionDescriptor(self._dts,
2630 return self._vnfds[vnfd.id]
2632 def update_vnfd(self, vnfd):
2633 """ update the Virtual Network Function descriptor """
2634 self._log.debug("Update virtual network function descriptor
- %s", vnfd)
2636 if vnfd.id not in self._vnfds:
2637 self._log.debug("No VNFD found
- creating VNFD
id = %s", vnfd.id)
2638 self.create_vnfd(vnfd)
2640 self._log.debug("Updating VNFD
id = %s, vnfd
= %s", vnfd.id, vnfd)
2641 self._vnfds[vnfd.id].update(vnfd)
2644 def delete_vnfd(self, vnfd_id):
2645 """ Delete the Virtual Network Function descriptor with the passed id """
2646 self._log.debug("Deleting the virtual network function descriptor
- %s", vnfd_id)
2647 if vnfd_id not in self._vnfds:
2648 self._log.debug("Delete VNFD failed
- cannot find vnfd
-id %s", vnfd_id)
2649 raise VirtualNetworkFunctionDescriptorNotFound("Cannot find
%s", vnfd_id)
2651 if self._vnfds[vnfd_id].in_use():
2652 self._log.debug("Cannot delete VNFD
id %s reference exists
%s",
2654 self._vnfds[vnfd_id].ref_count)
2655 raise VirtualNetworkFunctionDescriptorRefCountExists(
2656 "Cannot delete
:%s, ref_count
:%s",
2658 self._vnfds[vnfd_id].ref_count)
2660 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2662 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2663 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2664 if os.path.exists(vnfd_dir):
2665 shutil.rmtree(vnfd_dir, ignore_errors=True)
2666 except Exception as e:
2667 self._log.error("Exception in cleaning up VNFD
{}: {}".
2668 format(self._vnfds[vnfd_id].name, e))
2669 self._log.exception(e)
2671 del self._vnfds[vnfd_id]
2673 def vnfd_refcount_xpath(self, vnfd_id):
2674 """ xpath for ref count entry """
2675 return (VnfdRefCountDtsHandler.XPATH +
2676 "[rw
-vnfr
:vnfd
-id-ref
= '{}']").format(vnfd_id)
2679 def get_vnfd_refcount(self, vnfd_id):
2680 """ Get the vnfd_list from this VNFM"""
2682 if vnfd_id is None or vnfd_id == "":
2683 for vnfd in self._vnfds.values():
2684 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2685 vnfd_msg.vnfd_id_ref = vnfd.id
2686 vnfd_msg.instance_ref_count = vnfd.ref_count
2687 vnfd_list.append((self.vnfd_refcount_xpath(vnfd.id), vnfd_msg))
2688 elif vnfd_id in self._vnfds:
2689 vnfd_msg.vnfd_id_ref = self._vnfds[vnfd_id].id
2690 vnfd_msg.instance_ref_count = self._vnfds[vnfd_id].ref_count
2691 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2696 class VnfmTasklet(rift.tasklets.Tasklet):
2697 """ VNF Manager tasklet class """
2698 def __init__(self, *args, **kwargs):
2699 super(VnfmTasklet, self).__init__(*args, **kwargs)
2700 self.rwlog.set_category("rw
-mano
-log
")
2701 self.rwlog.set_subcategory("vnfm
")
2708 super(VnfmTasklet, self).start()
2709 self.log.info("Starting VnfmTasklet
")
2711 self.log.setLevel(logging.DEBUG)
2713 self.log.debug("Registering with dts
")
2714 self._dts = rift.tasklets.DTS(self.tasklet_info,
2715 RwVnfmYang.get_schema(),
2717 self.on_dts_state_change)
2719 self.log.debug("Created DTS Api GI Object
: %s", self._dts)
2721 print("Caught
Exception in VNFM start
:", sys.exc_info()[0])
2724 def on_instance_started(self):
2725 """ Task insance started callback """
2726 self.log.debug("Got instance started callback
")
2732 print("Caught
Exception in VNFM stop
:", sys.exc_info()[0])
2737 """ Task init callback """
2739 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2740 assert vm_parent_name is not None
2741 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2742 yield from self._vnfm.run()
2744 print("Caught
Exception in VNFM init
:", sys.exc_info()[0])
2749 """ Task run callback """
2753 def on_dts_state_change(self, state):
2754 """Take action according to current dts state to transition
2755 application into the corresponding application state
2758 state - current dts state
2761 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2762 rwdts.State.CONFIG: rwdts.State.RUN,
2766 rwdts.State.INIT: self.init,
2767 rwdts.State.RUN: self.run,
2770 # Transition application to next state
2771 handler = handlers.get(state, None)
2772 if handler is not None:
2773 yield from handler()
2775 # Transition dts to next state
2776 next_state = switch.get(state, None)
2777 if next_state is not None:
2778 self._dts.handle.set_state(next_state)