2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
29 gi
.require_version('RwDts', '1.0')
30 gi
.require_version('RwVnfrYang', '1.0')
31 gi
.require_version('RwVnfmYang', '1.0')
32 gi
.require_version('RwVlrYang', '1.0')
33 gi
.require_version('RwManifestYang', '1.0')
34 gi
.require_version('RwBaseYang', '1.0')
35 gi
.require_version('RwResourceMgrYang', '1.0')
37 from gi
.repository
import (
50 import rift
.package
.store
51 import rift
.package
.cloud_init
52 import rift
.mano
.dts
as mano_dts
class VMResourceError(Exception):
    """VM resource error."""
class VnfRecordError(Exception):
    """VNF record instantiation failed."""
class VduRecordError(Exception):
    """VDU record instantiation failed."""
# NOTE(review): this class name shadows Python's built-in ``NotImplemented``
# constant for the rest of this module.
class NotImplemented(Exception):
    """Not implemented."""
class VnfrRecordExistsError(Exception):
    """A VNFR record already exists with the same VNFR id."""
class InternalVirtualLinkRecordError(Exception):
    """Internal virtual link record error."""
class VDUImageNotFound(Exception):
    """VDU image not found error."""
class VirtualDeploymentUnitRecordError(Exception):
    """VDU instantiation failed."""
class VMNotReadyError(Exception):
    """VM not yet received from the resource manager."""
class VDURecordNotFound(Exception):
    """Could not find a VDU record."""
class VirtualNetworkFunctionRecordDescNotFound(Exception):
    """Cannot find the Virtual Network Function Record Descriptor."""
class VirtualNetworkFunctionDescriptorError(Exception):
    """Virtual Network Function Record Descriptor error."""
class VirtualNetworkFunctionDescriptorNotFound(Exception):
    """Virtual Network Function Record Descriptor not found."""
class VirtualNetworkFunctionRecordNotFound(Exception):
    """Virtual Network Function Record not found."""
class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
    """Virtual Network Function Descriptor reference count exists."""
class VnfrInstantiationFailed(Exception):
    """Virtual Network Function instantiation failed."""
135 class VNFMPlacementGroupError(Exception):
138 class VirtualNetworkFunctionRecordState(enum
.Enum
):
145 VL_TERMINATE_PHASE
= 6
146 VDU_TERMINATE_PHASE
= 7
151 class VDURecordState(enum
.Enum
):
152 """VDU record state """
155 RESOURCE_ALLOC_PENDING
= 3
162 class VcsComponent(object):
163 """ VCS Component within the VNF descriptor """
164 def __init__(self
, dts
, log
, loop
, cluster_name
, vcs_handler
, component
, mangled_name
):
168 self
._component
= component
169 self
._cluster
_name
= cluster_name
170 self
._vcs
_handler
= vcs_handler
171 self
._mangled
_name
= mangled_name
def mangle_name(component_name, vnf_name, vnfd_id):
    """Build the mangled component name.

    The result is the VNF name, the component name and the VNFD id,
    in that order, separated by ':'.
    """
    return ":".join((vnf_name, component_name, vnfd_id))
180 """ name of this component"""
181 return self
._mangled
_name
185 """ The path for this object """
186 return("D,/rw-manifest:manifest" +
187 "/rw-manifest:operational-inventory" +
188 "/rw-manifest:component" +
189 "[rw-manifest:component-name = '{}']").format(self
.name
)
192 def instance_xpath(self
):
193 """ The path for this object """
194 return("D,/rw-base:vcs" +
197 "[instance-name = '{}']".format(self
._cluster
_name
))
def start_comp_xpath(self):
    """Return the instance xpath extended with the 'START-REQ' child-n key."""
    start_req_suffix = "/child-n[instance-name = 'START-REQ']"
    return self.instance_xpath + start_req_suffix
205 def get_start_comp_msg(self
, ip_address
):
206 """ start this component """
207 start_msg
= RwBaseYang
.VcsInstance_Instance_ChildN()
208 start_msg
.instance_name
= 'START-REQ'
209 start_msg
.component_name
= self
.name
210 start_msg
.admin_command
= "START"
211 start_msg
.ip_address
= ip_address
217 """ Returns the message for this vcs component"""
219 vcs_comp_dict
= self
._component
.as_dict()
221 def mangle_comp_names(comp_dict
):
222 """ mangle component name with VNF name, id"""
223 for key
, val
in comp_dict
.items():
224 if isinstance(val
, dict):
225 comp_dict
[key
] = mangle_comp_names(val
)
226 elif isinstance(val
, list):
229 if isinstance(ent
, dict):
230 val
[i
] = mangle_comp_names(ent
)
234 elif key
== "component_name":
235 comp_dict
[key
] = VcsComponent
.mangle_name(val
,
240 mangled_dict
= mangle_comp_names(vcs_comp_dict
)
241 msg
= RwManifestYang
.OpInventory_Component
.from_dict(mangled_dict
)
def publish(self, xact):
    """Publish this VCS component through the VCS handler.

    Logs the component name, path and message at debug level, then
    delegates to ``self._vcs_handler.publish`` using the given
    transaction, this component's path and its message.
    """
    log_args = (self.name, self.path, self.msg)
    self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
                    *log_args)
    handler = self._vcs_handler
    yield from handler.publish(xact, self.path, self.msg)
252 def start(self
, xact
, parent
, ip_addr
=None):
253 """ Starts this VCS component """
254 # ATTN RV - replace with block add
255 start_msg
= self
.get_start_comp_msg(ip_addr
)
256 self
._log
.debug("starting component %s %s",
257 self
.start_comp_xpath
, start_msg
)
258 yield from self
._dts
.query_create(self
.start_comp_xpath
,
261 self
._log
.debug("started component %s, %s",
262 self
.start_comp_xpath
, start_msg
)
265 class VirtualDeploymentUnitRecord(object):
266 """ Virtual Deployment Unit Record """
278 placement_groups
=[]):
284 self
._mgmt
_intf
= mgmt_intf
285 self
._cloud
_account
_name
= cloud_account_name
286 self
._vnfd
_package
_store
= vnfd_package_store
287 self
._mgmt
_network
= mgmt_network
289 self
._vdur
_id
= vdur_id
or str(uuid
.uuid4())
292 self
._state
= VDURecordState
.INIT
293 self
._state
_failed
_reason
= None
294 self
._request
_id
= str(uuid
.uuid4())
295 self
._name
= vnfr
.name
+ "__" + vdud
.id
296 self
._placement
_groups
= placement_groups
299 self
._vdud
_cloud
_init
= None
300 self
._vdur
_console
_handler
= VnfrConsoleOperdataDtsHandler(dts
, log
, loop
, self
._vnfr
._vnfm
, self
._vnfr
.vnfr_id
, self
._vdur
_id
,self
.vdu_id
)
def vdu_opdata_register(self):
    """Register the VDUR console handler by delegating to its ``register()``."""
    console_handler = self._vdur_console_handler
    yield from console_handler.register()
306 def cp_ip_addr(self
, cp_name
):
307 """ Find ip address by connection point name """
308 if self
._vm
_resp
is not None:
309 for conn_point
in self
._vm
_resp
.connection_points
:
310 if conn_point
.name
== cp_name
:
311 return conn_point
.ip_address
314 def cp_id(self
, cp_name
):
315 """ Find connection point id by connection point name """
316 if self
._vm
_resp
is not None:
317 for conn_point
in self
._vm
_resp
.connection_points
:
318 if conn_point
.name
== cp_name
:
319 return conn_point
.connection_point_id
332 """ Return this VDUR's name """
def cloud_account_name(self):
    """Return the name of the cloud account this VDU should be created in."""
    account_name = self._cloud_account_name
    return account_name
341 def image_name(self
):
342 """ name that should be used to lookup the image on the CMP """
343 if 'image' not in self
._vdud
:
345 return os
.path
.basename(self
._vdud
.image
)
def image_checksum(self):
    """Return the VDU descriptor's image checksum, or None when the field is unset."""
    if not self._vdud.has_field("image_checksum"):
        return None
    return self._vdud.image_checksum
353 def management_ip(self
):
356 return self
._vm
_resp
.public_ip
if self
._vm
_resp
.has_field('public_ip') else self
._vm
_resp
.management_ip
359 def vm_management_ip(self
):
362 return self
._vm
_resp
.management_ip
365 def operational_status(self
):
366 """ Operational status of this VDU"""
367 op_stats_dict
= {"INIT": "init",
368 "INSTANTIATING": "vm_init_phase",
369 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
372 "TERMINATING": "terminated",
373 "TERMINATED": "terminated",
375 return op_stats_dict
[self
._state
.name
]
380 vdu_fields
= ["vm_flavor",
387 vdu_copy_dict
= {k
: v
for k
, v
in
388 self
._vdud
.as_dict().items() if k
in vdu_fields
}
389 vdur_dict
= {"id": self
._vdur
_id
,
390 "vdu_id_ref": self
._vdud
.id,
391 "operational_status": self
.operational_status
,
392 "operational_status_details": self
._state
_failed
_reason
,
394 if self
.vm_resp
is not None:
395 vdur_dict
.update({"vim_id": self
.vm_resp
.vdu_id
,
396 "flavor_id": self
.vm_resp
.flavor_id
398 if self
._vm
_resp
.has_field('image_id'):
399 vdur_dict
.update({ "image_id": self
.vm_resp
.image_id
})
401 if self
.management_ip
is not None:
402 vdur_dict
["management_ip"] = self
.management_ip
404 if self
.vm_management_ip
is not None:
405 vdur_dict
["vm_management_ip"] = self
.vm_management_ip
407 vdur_dict
.update(vdu_copy_dict
)
409 if self
.vm_resp
is not None:
410 if self
._vm
_resp
.has_field('volumes'):
411 for opvolume
in self
._vm
_resp
.volumes
:
412 vdurvol_data
= [vduvol
for vduvol
in vdur_dict
['volumes'] if vduvol
['name'] == opvolume
.name
]
413 if len(vdurvol_data
) == 1:
414 vdurvol_data
[0]["volume_id"] = opvolume
.volume_id
419 for intf
, cp_id
, vlr
in self
._int
_intf
:
420 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
422 icp_list
.append({"name": cp
.name
,
424 "type_yang": "VPORT",
425 "ip_address": self
.cp_ip_addr(cp
.name
)})
427 ii_list
.append({"name": intf
.name
,
428 "vdur_internal_connection_point_ref": cp
.id,
429 "virtual_interface": {}})
431 vdur_dict
["internal_connection_point"] = icp_list
432 self
._log
.debug("internal_connection_point:%s", vdur_dict
["internal_connection_point"])
433 vdur_dict
["internal_interface"] = ii_list
436 for intf
, cp
, vlr
in self
._ext
_intf
:
437 ei_list
.append({"name": cp
.name
,
438 "vnfd_connection_point_ref": cp
.name
,
439 "virtual_interface": {}})
440 self
._vnfr
.update_cp(cp
.name
, self
.cp_ip_addr(cp
.name
), self
.cp_id(cp
.name
))
442 vdur_dict
["external_interface"] = ei_list
444 placement_groups
= []
445 for group
in self
._placement
_groups
:
446 placement_groups
.append(group
.as_dict())
447 vdur_dict
['placement_groups_info'] = placement_groups
449 return RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur
.from_dict(vdur_dict
)
452 def resmgr_path(self
):
453 """ path for resource-mgr"""
454 return ("D,/rw-resource-mgr:resource-mgmt" +
456 "/vdu-event-data[event-id='{}']".format(self
._request
_id
))
459 def vm_flavor_msg(self
):
460 """ VM flavor message """
461 flavor
= self
._vdud
.vm_flavor
.__class
__()
462 flavor
.copy_from(self
._vdud
.vm_flavor
)
def vdud_cloud_init(self):
    """Return the cloud-init contents for the VDU.

    The value is obtained from ``self.cloud_init()`` the first time it is
    needed and kept in ``self._vdud_cloud_init`` for later calls.
    """
    cached = self._vdud_cloud_init
    if cached is None:
        cached = self.cloud_init()
        self._vdud_cloud_init = cached
    return cached
474 def cloud_init(self
):
475 """ Populate cloud_init with cloud-config script from
476 either the inline contents or from the file provided
478 if self
._vdud
.cloud_init
is not None:
479 self
._log
.debug("cloud_init script provided inline %s", self
._vdud
.cloud_init
)
480 return self
._vdud
.cloud_init
481 elif self
._vdud
.cloud_init_file
is not None:
482 # Get cloud-init script contents from the file provided in the cloud_init_file param
483 self
._log
.debug("cloud_init script provided in file %s", self
._vdud
.cloud_init_file
)
484 filename
= self
._vdud
.cloud_init_file
485 self
._vnfd
_package
_store
.refresh()
486 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
487 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
489 return cloud_init_extractor
.read_script(stored_package
, filename
)
490 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
491 raise VirtualDeploymentUnitRecordError(e
)
493 self
._log
.debug("VDU Instantiation: cloud-init script not provided")
495 def process_openstack_placement_group_construct(self
, vm_create_msg_dict
):
497 availability_zones
= []
499 for group
in self
._placement
_groups
:
500 if group
.has_field('host_aggregate'):
501 for aggregate
in group
.host_aggregate
:
502 host_aggregates
.append(aggregate
.as_dict())
503 if group
.has_field('availability_zone'):
504 availability_zones
.append(group
.availability_zone
.as_dict())
505 if group
.has_field('server_group'):
506 server_groups
.append(group
.server_group
.as_dict())
508 if availability_zones
:
509 if len(availability_zones
) > 1:
510 self
._log
.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self
.name
, availability_zones
)
511 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self
.name
, availability_zones
))
513 vm_create_msg_dict
['availability_zone'] = availability_zones
[0]
516 if len(server_groups
) > 1:
517 self
._log
.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self
.name
, server_groups
)
518 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self
.name
, server_groups
))
520 vm_create_msg_dict
['server_group'] = server_groups
[0]
523 vm_create_msg_dict
['host_aggregate'] = host_aggregates
527 def process_placement_groups(self
, vm_create_msg_dict
):
528 """Process the placement_groups and fill resource-mgr request"""
529 if not self
._placement
_groups
:
532 cloud_set
= set([group
.cloud_type
for group
in self
._placement
_groups
])
533 assert len(cloud_set
) == 1
534 cloud_type
= cloud_set
.pop()
536 if cloud_type
== 'openstack':
537 self
.process_openstack_placement_group_construct(vm_create_msg_dict
)
540 self
._log
.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type
)
543 def resmgr_msg(self
, config
=None):
544 vdu_fields
= ["vm_flavor",
550 self
._log
.debug("Creating params based on VDUD: %s", self
._vdud
)
551 vdu_copy_dict
= {k
: v
for k
, v
in self
._vdud
.as_dict().items() if k
in vdu_fields
}
553 vm_create_msg_dict
= {
557 if self
.image_name
is not None:
558 vm_create_msg_dict
["image_name"] = self
.image_name
560 if self
.image_checksum
is not None:
561 vm_create_msg_dict
["image_checksum"] = self
.image_checksum
563 vm_create_msg_dict
["allocate_public_address"] = self
._mgmt
_intf
564 if self
._vdud
.has_field('mgmt_vpci'):
565 vm_create_msg_dict
["mgmt_vpci"] = self
._vdud
.mgmt_vpci
567 self
._log
.debug("VDUD: %s", self
._vdud
)
568 if config
is not None:
569 vm_create_msg_dict
['vdu_init'] = {'userdata': config
}
571 if self
._mgmt
_network
:
572 vm_create_msg_dict
['mgmt_network'] = self
._mgmt
_network
575 for intf
, cp
, vlr
in self
._ext
_intf
:
576 cp_info
= {"name": cp
.name
,
577 "virtual_link_id": vlr
.network_id
,
578 "type_yang": intf
.virtual_interface
.type_yang
}
581 if cp
.static_ip_address
:
582 cp_info
["static_ip_address"] = cp
.static_ip_address
583 except AttributeError as e
:
586 if (intf
.virtual_interface
.has_field('vpci') and
587 intf
.virtual_interface
.vpci
is not None):
588 cp_info
["vpci"] = intf
.virtual_interface
.vpci
590 if (vlr
.has_field('ip_profile_params')) and (vlr
.ip_profile_params
.has_field('security_group')):
591 cp_info
['security_group'] = vlr
.ip_profile_params
.security_group
593 self
._log
.debug("External CP info {}".format(cp_info
))
594 cp_list
.append(cp_info
)
596 for intf
, cp_id
, vlr
in self
._int
_intf
:
597 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
599 cp_dict
= {"name": cp_id
,
600 "virtual_link_id": vlr
.network_id
,
601 "type_yang": intf
.virtual_interface
.type_yang
}
603 if (intf
.virtual_interface
.has_field('vpci') and
604 intf
.virtual_interface
.vpci
is not None):
605 cp_dict
["vpci"] = intf
.virtual_interface
.vpci
608 if cp
.static_ip_address
:
609 cp_dict
["static_ip_address"] = cp
.static_ip_address
610 except AttributeError as e
:
613 self
._log
.debug("Internal CP info {}".format(cp_info
))
614 cp_list
.append(cp_dict
)
616 vm_create_msg_dict
["connection_points"] = cp_list
617 vm_create_msg_dict
.update(vdu_copy_dict
)
619 self
.process_placement_groups(vm_create_msg_dict
)
621 msg
= RwResourceMgrYang
.VDUEventData()
622 msg
.event_id
= self
._request
_id
623 msg
.cloud_account
= self
.cloud_account_name
624 msg
.request_info
.from_dict(vm_create_msg_dict
)
626 for volume
in self
._vdud
.volumes
:
627 v
= msg
.request_info
.volumes
.add()
628 v
.from_dict(volume
.as_dict())
632 def terminate(self
, xact
):
633 """ Delete resource in VIM """
634 if self
._state
!= VDURecordState
.READY
and self
._state
!= VDURecordState
.FAILED
:
635 self
._log
.warning("VDU terminate in not ready state - Ignoring request")
638 self
._state
= VDURecordState
.TERMINATING
639 if self
._vm
_resp
is not None:
641 with self
._dts
.transaction() as new_xact
:
642 yield from self
.delete_resource(new_xact
)
644 self
._log
.exception("Caught exception while deleting VDU %s", self
.vdu_id
)
646 if self
._rm
_regh
is not None:
647 self
._log
.debug("Deregistering resource manager registration handle")
648 self
._rm
_regh
.deregister()
651 if self
._vdur
_console
_handler
is not None:
652 self
._log
.error("Deregistering vnfr vdur registration handle")
653 self
._vdur
_console
_handler
._regh
.deregister()
654 self
._vdur
_console
_handler
._regh
= None
656 self
._state
= VDURecordState
.TERMINATED
658 def find_internal_cp_by_cp_id(self
, cp_id
):
659 """ Find the CP corresponding to the connection point id"""
662 self
._log
.debug("find_internal_cp_by_cp_id(%s) called",
665 for int_cp
in self
._vdud
.internal_connection_point
:
666 self
._log
.debug("Checking for int cp %s in internal connection points",
668 if int_cp
.id == cp_id
:
673 self
._log
.debug("Failed to find cp %s in internal connection points",
675 msg
= "Failed to find cp %s in internal connection points" % cp_id
676 raise VduRecordError(msg
)
678 # return the VLR associated with the connection point
682 def create_resource(self
, xact
, vnfr
, config
=None):
683 """ Request resource from ResourceMgr """
684 def find_cp_by_name(cp_name
):
685 """ Find a connection point by name """
687 self
._log
.debug("find_cp_by_name(%s) called", cp_name
)
688 for ext_cp
in vnfr
._cprs
:
689 self
._log
.debug("Checking ext cp (%s) called", ext_cp
.name
)
690 if ext_cp
.name
== cp_name
:
694 self
._log
.debug("Failed to find cp %s in external connection points",
698 def find_internal_vlr_by_cp_id(cp_id
):
699 self
._log
.debug("find_internal_vlr_by_cp_id(%s) called",
703 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
705 # return the VLR associated with the connection point
706 return vnfr
.find_vlr_by_cp(cp_id
)
708 block
= xact
.block_create()
710 self
._log
.debug("Executing vm request id: %s, action: create",
713 # Resolve the networks associated external interfaces
714 for ext_intf
in self
._vdud
.external_interface
:
715 self
._log
.debug("Resolving external interface name [%s], cp[%s]",
716 ext_intf
.name
, ext_intf
.vnfd_connection_point_ref
)
717 cp
= find_cp_by_name(ext_intf
.vnfd_connection_point_ref
)
719 self
._log
.debug("Failed to find connection point - %s",
720 ext_intf
.vnfd_connection_point_ref
)
722 self
._log
.debug("Connection point name [%s], type[%s]",
723 cp
.name
, cp
.type_yang
)
725 vlr
= vnfr
.ext_vlr_by_id(cp
.vlr_ref
)
727 etuple
= (ext_intf
, cp
, vlr
)
728 self
._ext
_intf
.append(etuple
)
730 self
._log
.debug("Created external interface tuple : %s", etuple
)
732 # Resolve the networks associated internal interfaces
733 for intf
in self
._vdud
.internal_interface
:
734 cp_id
= intf
.vdu_internal_connection_point_ref
735 self
._log
.debug("Resolving internal interface name [%s], cp[%s]",
739 vlr
= find_internal_vlr_by_cp_id(cp_id
)
740 except Exception as e
:
741 self
._log
.debug("Failed to find cp %s in internal VLR list", cp_id
)
742 msg
= "Failed to find cp %s in internal VLR list, e = %s" % (cp_id
, e
)
743 raise VduRecordError(msg
)
745 ituple
= (intf
, cp_id
, vlr
)
746 self
._int
_intf
.append(ituple
)
748 self
._log
.debug("Created internal interface tuple : %s", ituple
)
750 resmgr_path
= self
.resmgr_path
751 resmgr_msg
= self
.resmgr_msg(config
)
753 self
._log
.debug("Creating new VM request at: %s, params: %s", resmgr_path
, resmgr_msg
)
754 block
.add_query_create(resmgr_path
, resmgr_msg
)
756 res_iter
= yield from block
.execute(now
=True)
764 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
765 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
766 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
767 return resp
.resource_info
770 def delete_resource(self
, xact
):
771 block
= xact
.block_create()
773 self
._log
.debug("Executing vm request id: %s, action: delete",
776 block
.add_query_delete(self
.resmgr_path
)
778 yield from block
.execute(flags
=0, now
=True)
781 def read_resource(self
, xact
):
782 block
= xact
.block_create()
784 self
._log
.debug("Executing vm request id: %s, action: delete",
787 block
.add_query_read(self
.resmgr_path
)
789 res_iter
= yield from block
.execute(flags
=0, now
=True)
794 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
795 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
796 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
797 #self._vm_resp = resp.resource_info
798 return resp
.resource_info
802 def start_component(self
):
803 """ This VDUR is active """
804 self
._log
.debug("Starting component %s for vdud %s vdur %s",
805 self
._vdud
.vcs_component_ref
,
808 yield from self
._vnfr
.start_component(self
._vdud
.vcs_component_ref
,
809 self
.vm_resp
.management_ip
)
813 """ Is this VDU active """
814 return True if self
._state
is VDURecordState
.READY
else False
def instantiation_failed(self, failed_reason=None):
    """Mark this VDU as failed and report the failure to the owning VNFR.

    The state becomes ``VDURecordState.FAILED``, the reason is stored, and
    the VNFR's ``instantiation_failed`` is then driven with the same reason.
    """
    self._log.debug("VDU %s instantiation failed ", self._vdur_id)
    self._state, self._state_failed_reason = VDURecordState.FAILED, failed_reason
    owner = self._vnfr
    yield from owner.instantiation_failed(failed_reason)
825 def vdu_is_active(self
):
826 """ This VDU is active"""
828 self
._log
.warning("VDU %s was already marked as active", self
._vdur
_id
)
831 self
._log
.debug("VDUR id %s in VNFR %s is active", self
._vdur
_id
, self
._vnfr
.vnfr_id
)
833 if self
._vdud
.vcs_component_ref
is not None:
834 yield from self
.start_component()
836 self
._state
= VDURecordState
.READY
838 if self
._vnfr
.all_vdus_active():
839 self
._log
.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self
._vnfr
)
840 yield from self
._vnfr
.is_ready()
843 def instantiate(self
, xact
, vnfr
, config
=None):
844 """ Instantiate this VDU """
845 self
._state
= VDURecordState
.INSTANTIATING
848 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
849 """ This VDUR is active """
850 self
._log
.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
855 if (query_action
== rwdts
.QueryAction
.UPDATE
or
856 query_action
== rwdts
.QueryAction
.CREATE
):
859 if msg
.resource_state
== "active":
860 # Move this VDU to ready state
861 yield from self
.vdu_is_active()
862 elif msg
.resource_state
== "failed":
863 yield from self
.instantiation_failed(msg
.resource_errors
)
864 elif query_action
== rwdts
.QueryAction
.DELETE
:
865 self
._log
.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
867 raise NotImplementedError(
868 "%s action on VirtualDeployementUnitRecord not supported",
870 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
873 reg_event
= asyncio
.Event(loop
=self
._loop
)
876 def on_ready(regh
, status
):
879 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
, on_ready
=on_ready
)
880 self
._rm
_regh
= yield from self
._dts
.register(self
.resmgr_path
+ '/resource-info',
881 flags
=rwdts
.Flag
.SUBSCRIBER
,
883 yield from reg_event
.wait()
885 vm_resp
= yield from self
.create_resource(xact
, vnfr
, config
)
886 self
._vm
_resp
= vm_resp
888 self
._state
= VDURecordState
.RESOURCE_ALLOC_PENDING
889 self
._log
.debug("Requested VM from resource manager response %s",
891 if vm_resp
.resource_state
== "active":
892 self
._log
.debug("Resourcemgr responded wih an active vm resp %s",
894 yield from self
.vdu_is_active()
895 self
._state
= VDURecordState
.READY
896 elif (vm_resp
.resource_state
== "pending" or
897 vm_resp
.resource_state
== "inactive"):
898 self
._log
.debug("Resourcemgr responded wih a pending vm resp %s",
900 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
901 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
902 # flags=rwdts.Flag.SUBSCRIBER,
905 self
._log
.debug("Resourcemgr responded wih an error vm resp %s",
907 raise VirtualDeploymentUnitRecordError(
908 "Failed VDUR instantiation %s " % vm_resp
)
910 except Exception as e
:
912 traceback
.print_exc()
913 self
._log
.exception(e
)
914 self
._log
.error("Instantiation of VDU record failed: %s", str(e
))
915 self
._state
= VDURecordState
.FAILED
916 yield from self
.instantiation_failed(str(e
))
919 class VlRecordState(enum
.Enum
):
920 """ VL Record State """
922 INSTANTIATION_PENDING
= 102
924 TERMINATE_PENDING
= 104
929 class InternalVirtualLinkRecord(object):
930 """ Internal Virtual Link record """
931 def __init__(self
, dts
, log
, loop
, ivld_msg
, vnfr_name
, cloud_account_name
):
935 self
._ivld
_msg
= ivld_msg
936 self
._vnfr
_name
= vnfr_name
937 self
._cloud
_account
_name
= cloud_account_name
939 self
._vlr
_req
= self
.create_vlr()
941 self
._state
= VlRecordState
.INIT
945 """ Find VLR by id """
946 return self
._vlr
_req
.id
950 """ Name of this VL """
951 return self
._vnfr
_name
+ "." + self
._ivld
_msg
.name
def network_id(self):
    """Return the network id of the stored VLR, or None when no VLR is set."""
    if self._vlr:
        return self._vlr.network_id
    return None
959 """ VLR path for this VLR instance"""
960 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self
.vlr_id
)
962 def create_vlr(self
):
963 """ Create the VLR record which will be instantiated """
965 vld_fields
= ["short_name",
972 vld_copy_dict
= {k
: v
for k
, v
in self
._ivld
_msg
.as_dict().items() if k
in vld_fields
}
974 vlr_dict
= {"id": str(uuid
.uuid4()),
976 "cloud_account": self
._cloud
_account
_name
,
978 vlr_dict
.update(vld_copy_dict
)
980 vlr
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict(vlr_dict
)
984 def instantiate(self
, xact
, restart_mode
=False):
985 """ Instantiate VL """
988 def instantiate_vlr():
989 """ Instantiate VLR"""
990 self
._log
.debug("Create VL with xpath %s and vlr %s",
991 self
.vlr_path(), self
._vlr
_req
)
993 with self
._dts
.transaction(flags
=0) as xact
:
994 block
= xact
.block_create()
995 block
.add_query_create(xpath
=self
.vlr_path(), msg
=self
._vlr
_req
)
996 self
._log
.debug("Executing VL create path:%s msg:%s",
997 self
.vlr_path(), self
._vlr
_req
)
1001 res_iter
= yield from block
.execute()
1003 self
._state
= VlRecordState
.FAILED
1004 self
._log
.exception("Caught exception while instantial VL")
1007 for ent
in res_iter
:
1008 res
= yield from ent
1009 self
._vlr
= res
.result
1011 if self
._vlr
.operational_status
== 'failed':
1012 self
._log
.debug("VL creation failed for vlr id %s", self
._vlr
.id)
1013 self
._state
= VlRecordState
.FAILED
1014 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self
._vlr
.id))
1016 self
._log
.info("Created VL with xpath %s and vlr %s",
1017 self
.vlr_path(), self
._vlr
)
1021 """ Get the network id """
1022 res_iter
= yield from self
._dts
.query_read(self
.vlr_path(), rwdts
.XactFlag
.MERGE
)
1024 for ent
in res_iter
:
1025 res
= yield from ent
1029 err
= "Failed to get VLR for path %s" % self
.vlr_path()
1031 raise InternalVirtualLinkRecordError(err
)
1034 self
._state
= VlRecordState
.INSTANTIATION_PENDING
1037 vl
= yield from get_vlr()
1039 yield from instantiate_vlr()
1041 yield from instantiate_vlr()
1043 self
._state
= VlRecordState
.ACTIVE
1045 def vlr_in_vns(self
):
1046 """ Is there a VLR record in VNS """
1047 if (self
._state
== VlRecordState
.ACTIVE
or
1048 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
1049 self
._state
== VlRecordState
.FAILED
):
1055 def terminate(self
, xact
):
1056 """Terminate this VL """
1057 if not self
.vlr_in_vns():
1058 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1059 self
.vlr_id
, self
._state
)
1062 self
._log
.debug("Terminating VL with path %s", self
.vlr_path())
1063 self
._state
= VlRecordState
.TERMINATE_PENDING
1064 block
= xact
.block_create()
1065 block
.add_query_delete(self
.vlr_path())
1066 yield from block
.execute(flags
=0, now
=True)
1067 self
._state
= VlRecordState
.TERMINATED
1068 self
._log
.debug("Terminated VL with path %s", self
.vlr_path())
1071 class VirtualNetworkFunctionRecord(object):
1072 """ Virtual Network Function Record """
1073 def __init__(self
, dts
, log
, loop
, cluster_name
, vnfm
, vcs_handler
, vnfr_msg
, mgmt_network
=None):
1077 self
._cluster
_name
= cluster_name
1078 self
._vnfr
_msg
= vnfr_msg
1079 self
._vnfr
_id
= vnfr_msg
.id
1080 self
._vnfd
_id
= vnfr_msg
.vnfd
.id
1082 self
._vcs
_handler
= vcs_handler
1083 self
._vnfr
= vnfr_msg
1084 self
._mgmt
_network
= mgmt_network
1086 self
._vnfd
= vnfr_msg
.vnfd
1087 self
._state
= VirtualNetworkFunctionRecordState
.INIT
1088 self
._state
_failed
_reason
= None
1089 self
._ext
_vlrs
= {} # The list of external virtual links
1090 self
._vlrs
= [] # The list of internal virtual links
1091 self
._vdus
= [] # The list of vdu
1092 self
._vlr
_by
_cp
= {}
1094 self
._inventory
= {}
1095 self
._create
_time
= int(time
.time())
1096 self
._vnf
_mon
= None
1097 self
._config
_status
= vnfr_msg
.config_status
1098 self
._vnfd
_package
_store
= rift
.package
.store
.VnfdPackageFilesystemStore(self
._log
)
1099 self
._rw
_vnfd
= None
1100 self
._vnfd
_ref
_count
= 0
1102 def _get_vdur_from_vdu_id(self
, vdu_id
):
1103 self
._log
.debug("Finding vdur for vdu_id %s", vdu_id
)
1104 self
._log
.debug("Searching through vdus: %s", self
._vdus
)
1105 for vdu
in self
._vdus
:
1106 self
._log
.debug("vdu_id: %s", vdu
.vdu_id
)
1107 if vdu
.vdu_id
== vdu_id
:
1110 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id
)
1113 def operational_status(self
):
1114 """ Operational status of this VNFR """
1115 op_status_map
= {"INIT": "init",
1116 "VL_INIT_PHASE": "vl_init_phase",
1117 "VM_INIT_PHASE": "vm_init_phase",
1119 "TERMINATE": "terminate",
1120 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1121 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1122 "TERMINATED": "terminated",
1123 "FAILED": "failed", }
1124 return op_status_map
[self
._state
.name
]
def vnfd_xpath(vnfd_id):
    """Return the xpath of the VNFD catalog entry keyed by ``vnfd_id``."""
    template = "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']"
    return template.format(vnfd_id)
def vnfd_ref_count(self):
    """Return the current VNFD reference count held by this VNFR."""
    count = self._vnfd_ref_count
    return count
def vnfd_in_use(self):
    """Return True when the VNFD reference count is above zero, else False."""
    if self._vnfd_ref_count > 0:
        return True
    return False
1141 """ Take a reference on this object """
1142 self
._vnfd
_ref
_count
+= 1
1143 return self
._vnfd
_ref
_count
def vnfd_unref(self):
    """Release one reference on the VNFD and return the remaining count.

    Raises:
        VnfRecordError: if the reference count is already below 1; the
            condition is also logged at critical level.
    """
    current = self._vnfd_ref_count
    if current < 1:
        msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
               (self.vnfd.id, current))
        self._log.critical(msg)
        raise VnfRecordError(msg)

    self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
                    self.vnfd.id, current)
    self._vnfd_ref_count = current - 1
    return self._vnfd_ref_count
1159 """ VNFD for this VNFR """
1164 """ VNFD name associated with this VNFR """
1165 return self
.vnfd
.name
1169 """ Name of this VNF in the record """
1170 return self
._vnfr
.name
def cloud_account_name(self):
    """Return the cloud account recorded on this VNFR's message."""
    vnfr_msg = self._vnfr
    return vnfr_msg.cloud_account
1179 """ VNFD Id associated with this VNFR """
1184 """ VNFR Id associated with this VNFR """
1185 return self
._vnfr
_id
def member_vnf_index(self):
    """ Member VNF index of this VNFR.

    Returns:
        The ``member_vnf_index_ref`` field of the underlying VNFR message
    """
    vnfr_record = self._vnfr
    return vnfr_record.member_vnf_index_ref
def config_status(self):
    """ Config agent status recorded for this VNFR.

    Returns:
        The value currently stored in ``_config_status``
    """
    status = self._config_status
    return status
1197 def component_by_name(self
, component_name
):
1198 """ Find a component by name in the inventory list"""
1199 mangled_name
= VcsComponent
.mangle_name(component_name
,
1202 return self
._inventory
[mangled_name
]
1207 def get_nsr_config(self
):
1208 ### Need access to NS instance configuration for runtime resolution.
1209 ### This shall be replaced when deployment flavors are implemented
1210 xpath
= "C,/nsr:ns-instance-config"
1211 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
1213 for result
in results
:
1214 entry
= yield from result
1215 ns_instance_config
= entry
.result
1216 for nsr
in ns_instance_config
.nsr
:
1217 if nsr
.id == self
._vnfr
_msg
.nsr_id_ref
:
def start_component(self, component_name, ip_addr):
    """ Start one of this VNFR's components, looked up by name.

    Arguments:
        component_name - name handed to component_by_name() for the lookup
        ip_addr        - forwarded as the third argument of the
                         component's start() call

    The first two arguments of start() are always passed as None.
    """
    yield from self.component_by_name(component_name).start(None, None, ip_addr)
1227 def cp_ip_addr(self
, cp_name
):
1228 """ Get ip address for connection point """
1229 self
._log
.debug("cp_ip_addr()")
1230 for cp
in self
._cprs
:
1231 if cp
.name
== cp_name
and cp
.ip_address
is not None:
1232 return cp
.ip_address
1235 def mgmt_intf_info(self
):
1236 """ Get Management interface info for this VNFR """
1237 mgmt_intf_desc
= self
.vnfd
.mgmt_interface
1239 if mgmt_intf_desc
.has_field("cp"):
1240 ip_addr
= self
.cp_ip_addr(mgmt_intf_desc
.cp
)
1241 elif mgmt_intf_desc
.has_field("vdu_id"):
1243 vdur
= self
._get
_vdur
_from
_vdu
_id
(mgmt_intf_desc
.vdu_id
)
1244 ip_addr
= vdur
.management_ip
1245 except VDURecordNotFound
:
1246 self
._log
.debug("Did not find mgmt interface for vnfr id %s", self
._vnfr
_id
)
1249 ip_addr
= mgmt_intf_desc
.ip_address
1250 port
= mgmt_intf_desc
.port
1252 return ip_addr
, port
1256 """ Message associated with this VNFR """
1257 vnfd_fields
= ["short_name", "vendor", "description", "version"]
1258 vnfd_copy_dict
= {k
: v
for k
, v
in self
.vnfd
.as_dict().items() if k
in vnfd_fields
}
1260 mgmt_intf
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1261 ip_address
, port
= self
.mgmt_intf_info()
1263 if ip_address
is not None:
1264 mgmt_intf
.ip_address
= ip_address
1265 if port
is not None:
1266 mgmt_intf
.port
= port
1268 vnfr_dict
= {"id": self
._vnfr
_id
,
1269 "nsr_id_ref": self
._vnfr
_msg
.nsr_id_ref
,
1271 "member_vnf_index_ref": self
.member_vnf_index
,
1272 "operational_status": self
.operational_status
,
1273 "operational_status_details": self
._state
_failed
_reason
,
1274 "cloud_account": self
.cloud_account_name
,
1275 "config_status": self
._config
_status
1278 vnfr_dict
.update(vnfd_copy_dict
)
1280 vnfr_msg
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
1281 vnfr_msg
.vnfd
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd
.from_dict(self
.vnfd
.as_dict())
1283 vnfr_msg
.create_time
= self
._create
_time
1284 vnfr_msg
.uptime
= int(time
.time()) - self
._create
_time
1285 vnfr_msg
.mgmt_interface
= mgmt_intf
1287 # Add all the VLRs to VNFR
1288 for vlr
in self
._vlrs
:
1289 ivlr
= vnfr_msg
.internal_vlr
.add()
1290 ivlr
.vlr_ref
= vlr
.vlr_id
1292 # Add all the VDURs to VDUR
1293 if self
._vdus
is not None:
1294 for vdu
in self
._vdus
:
1295 vdur
= vnfr_msg
.vdur
.add()
1296 vdur
.from_dict(vdu
.msg
.as_dict())
1298 if self
.vnfd
.mgmt_interface
.has_field('dashboard_params'):
1299 vnfr_msg
.dashboard_url
= self
.dashboard_url
1301 for cpr
in self
._cprs
:
1302 new_cp
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr
.as_dict())
1303 vnfr_msg
.connection_point
.append(new_cp
)
1305 if self
._vnf
_mon
is not None:
1306 for monp
in self
._vnf
_mon
.msg
:
1307 vnfr_msg
.monitoring_param
.append(
1308 VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam
.from_dict(monp
.as_dict()))
1310 if self
._vnfr
.vnf_configuration
is not None:
1311 vnfr_msg
.vnf_configuration
.from_dict(self
._vnfr
.vnf_configuration
.as_dict())
1312 if (ip_address
is not None and
1313 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
is None):
1314 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
= ip_address
1316 for group
in self
._vnfr
_msg
.placement_groups_info
:
1317 group_info
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1318 group_info
.from_dict(group
.as_dict())
1319 vnfr_msg
.placement_groups_info
.append(group_info
)
1324 def dashboard_url(self
):
1325 ip
, cfg_port
= self
.mgmt_intf_info()
1328 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('https'):
1329 if self
.vnfd
.mgmt_interface
.dashboard_params
.https
is True:
1332 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('port'):
1333 http_port
= self
.vnfd
.mgmt_interface
.dashboard_params
.port
1335 url
= "{protocol}://{ip_address}:{port}/{path}".format(
1339 path
=self
.vnfd
.mgmt_interface
.dashboard_params
.path
.lstrip("/"),
1346 """ path for this VNFR """
1347 return("D,/vnfr:vnfr-catalog"
1348 "/vnfr:vnfr[vnfr:id='{}']".format(self
.vnfr_id
))
1351 def publish(self
, xact
):
1352 """ publish this VNFR """
1354 self
._log
.debug("Publishing VNFR path = [%s], record = [%s]",
1355 self
.xpath
, self
.msg
)
1356 vnfr
.create_time
= self
._create
_time
1357 yield from self
._vnfm
.publish_vnfr(xact
, self
.xpath
, self
.msg
)
1358 self
._log
.debug("Published VNFR path = [%s], record = [%s]",
1359 self
.xpath
, self
.msg
)
1362 def create_vls(self
):
1363 """ Publish The VLs associated with this VNF """
1364 self
._log
.debug("Publishing Internal Virtual Links for vnfd id: %s",
1366 for ivld_msg
in self
.vnfd
.internal_vld
:
1367 self
._log
.debug("Creating internal vld:"
1368 " %s, int_cp_ref = %s",
1369 ivld_msg
, ivld_msg
.internal_connection_point
1371 vlr
= InternalVirtualLinkRecord(dts
=self
._dts
,
1375 vnfr_name
=self
.name
,
1376 cloud_account_name
=self
.cloud_account_name
1378 self
._vlrs
.append(vlr
)
1380 for int_cp
in ivld_msg
.internal_connection_point
:
1381 if int_cp
.id_ref
in self
._vlr
_by
_cp
:
1382 msg
= ("Connection point %s already "
1383 " bound %s" % (int_cp
.id_ref
, self
._vlr
_by
_cp
[int_cp
.id_ref
]))
1384 raise InternalVirtualLinkRecordError(msg
)
1385 self
._log
.debug("Setting vlr %s to internal cp = %s",
1387 self
._vlr
_by
_cp
[int_cp
.id_ref
] = vlr
1390 def instantiate_vls(self
, xact
, restart_mode
=False):
1391 """ Instantiate the VLs associated with this VNF """
1392 self
._log
.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1395 for vlr
in self
._vlrs
:
1396 self
._log
.debug("Instantiating VLR %s", vlr
)
1397 yield from vlr
.instantiate(xact
, restart_mode
)
def find_vlr_by_cp(self, cp_name):
    """ Look up the VLR bound to a connection point name.

    Arguments:
        cp_name - connection point name used as the key

    Returns:
        The entry stored under cp_name in the cp-to-VLR mapping

    Raises:
        KeyError - if nothing is bound to cp_name
    """
    vlr_by_cp = self._vlr_by_cp
    return vlr_by_cp[cp_name]
1403 def resolve_placement_group_cloud_construct(self
, input_group
, nsr_config
):
1405 Returns the cloud specific construct for placement group
1407 input_group: VNFD PlacementGroup
1408 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1410 copy_dict
= ['name', 'requirement', 'strategy']
1411 for group_info
in nsr_config
.vnfd_placement_group_maps
:
1412 if group_info
.placement_group_ref
== input_group
.name
and \
1413 group_info
.vnfd_id_ref
== self
.vnfd_id
:
1414 group
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1415 group_dict
= {k
:v
for k
,v
in
1416 group_info
.as_dict().items()
1417 if (k
!= 'placement_group_ref' and k
!='vnfd_id_ref')}
1418 for param
in copy_dict
:
1419 group_dict
.update({param
: getattr(input_group
, param
)})
1420 group
.from_dict(group_dict
)
1425 def get_vdu_placement_groups(self
, vdu
):
1426 placement_groups
= []
1427 ### Step-1: Get VNF level placement groups
1428 for group
in self
._vnfr
_msg
.placement_groups_info
:
1429 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1430 #group_info.from_dict(group.as_dict())
1431 placement_groups
.append(group
)
1433 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1434 nsr_config
= yield from self
.get_nsr_config()
1436 ### Step-3: Get VDU level placement groups
1437 for group
in self
.vnfd
.placement_groups
:
1438 for member_vdu
in group
.member_vdus
:
1439 if member_vdu
.member_vdu_ref
== vdu
.id:
1440 group_info
= self
.resolve_placement_group_cloud_construct(group
,
1442 if group_info
is None:
1443 self
._log
.info("Could not resolve cloud-construct for placement group: %s", group
.name
)
1444 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1446 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1450 self
.member_vnf_index
)
1451 placement_groups
.append(group_info
)
1453 return placement_groups
1456 def create_vdus(self
, vnfr
, restart_mode
=False):
1457 """ Create the VDUs associated with this VNF """
1459 def get_vdur_id(vdud
):
1460 """Get the corresponding VDUR's id for the VDUD. This is useful in
1463 In restart mode we check for exiting VDUR's ID and use them, if
1464 available. This way we don't end up creating duplicate VDURs
1468 if restart_mode
and vdud
is not None:
1470 vdur
= [vdur
.id for vdur
in vnfr
._vnfr
.vdur
if vdur
.vdu_id_ref
== vdud
.id]
1473 self
._log
.error("Unable to find a VDUR for VDUD {}".format(vdud
))
1478 self
._log
.info("Creating VDU's for vnfd id: %s", self
.vnfd_id
)
1479 for vdu
in self
._rw
_vnfd
.vdu
:
1480 self
._log
.debug("Creating vdu: %s", vdu
)
1481 vdur_id
= get_vdur_id(vdu
)
1483 placement_groups
= yield from self
.get_vdu_placement_groups(vdu
)
1484 self
._log
.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1487 self
.member_vnf_index
,
1488 [ group
.name
for group
in placement_groups
])
1490 vdur
= VirtualDeploymentUnitRecord(
1496 mgmt_intf
=self
.has_mgmt_interface(vdu
),
1497 mgmt_network
=self
._mgmt
_network
,
1498 cloud_account_name
=self
.cloud_account_name
,
1499 vnfd_package_store
=self
._vnfd
_package
_store
,
1501 placement_groups
= placement_groups
,
1503 yield from vdur
.vdu_opdata_register()
1505 self
._vdus
.append(vdur
)
1508 def instantiate_vdus(self
, xact
, vnfr
):
1509 """ Instantiate the VDUs associated with this VNF """
1510 self
._log
.debug("Instantiating VDU's for vnfd id %s: %s", self
.vnfd_id
, self
._vdus
)
1512 lookup
= {vdu
.vdu_id
: vdu
for vdu
in self
._vdus
}
1514 # Identify any dependencies among the VDUs
1515 dependencies
= collections
.defaultdict(list)
1516 vdu_id_pattern
= re
.compile(r
"\{\{ vdu\[([^]]+)\]\S* \}\}")
1518 for vdu
in self
._vdus
:
1519 if vdu
.vdud_cloud_init
is not None:
1520 for vdu_id
in vdu_id_pattern
.findall(vdu
.vdud_cloud_init
):
1521 if vdu_id
!= vdu
.vdu_id
:
1522 # This means that vdu.vdu_id depends upon vdu_id,
1523 # i.e. vdu_id must be instantiated before
1525 dependencies
[vdu
.vdu_id
].append(lookup
[vdu_id
])
1527 # Define the terminal states of VDU instantiation
1529 VDURecordState
.READY
,
1530 VDURecordState
.TERMINATED
,
1531 VDURecordState
.FAILED
,
1534 datastore
= VdurDatastore()
1538 def instantiate_monitor(vdu
):
1539 """Monitor the state of the VDU during instantiation
1542 vdu - a VirtualDeploymentUnitRecord
1545 # wait for the VDUR to enter a terminal state
1546 while vdu
._state
not in terminal
:
1547 yield from asyncio
.sleep(1, loop
=self
._loop
)
1549 # update the datastore
1550 datastore
.update(vdu
)
1552 # add the VDU to the set of processed VDUs
1553 processed
.add(vdu
.vdu_id
)
1556 def instantiate(vdu
):
1557 """Instantiate the specified VDU
1560 vdu - a VirtualDeploymentUnitRecord
1563 if the VDU, or any of the VDUs this VDU depends upon, are
1564 terminated or fail to instantiate properly, a
1565 VirtualDeploymentUnitRecordError is raised.
1568 for dependency
in dependencies
[vdu
.vdu_id
]:
1569 self
._log
.debug("{}: waiting for {}".format(vdu
.vdu_id
, dependency
.vdu_id
))
1571 while dependency
.vdu_id
not in processed
:
1572 yield from asyncio
.sleep(1, loop
=self
._loop
)
1574 if not dependency
.active
:
1575 raise VirtualDeploymentUnitRecordError()
1577 self
._log
.debug('instantiating {}'.format(vdu
.vdu_id
))
1579 # Populate the datastore with the current values of the VDU
1582 # Substitute any variables contained in the cloud config script
1583 config
= str(vdu
.vdud_cloud_init
) if vdu
.vdud_cloud_init
is not None else ""
1585 parts
= re
.split("\{\{ ([^\}]+) \}\}", config
)
1588 # Extract the variable names
1590 for variable
in parts
[1::2]:
1591 variables
.append(variable
.lstrip('{{').rstrip('}}').strip())
1593 # Iterate of the variables and substitute values from the
1595 for variable
in variables
:
1597 # Handle a reference to a VDU by ID
1598 if variable
.startswith('vdu['):
1599 value
= datastore
.get(variable
)
1601 msg
= "Unable to find a substitute for {} in {} cloud-init script"
1602 raise ValueError(msg
.format(variable
, vdu
.vdu_id
))
1604 config
= config
.replace("{{ %s }}" % variable
, value
)
1607 # Handle a reference to the current VDU
1608 if variable
.startswith('vdu'):
1609 value
= datastore
.get('vdu[{}]'.format(vdu
.vdu_id
) + variable
[3:])
1610 config
= config
.replace("{{ %s }}" % variable
, value
)
1613 # Handle unrecognized variables
1614 msg
= 'unrecognized cloud-config variable: {}'
1615 raise ValueError(msg
.format(variable
))
1617 # Instantiate the VDU
1618 with self
._dts
.transaction() as xact
:
1619 self
._log
.debug("Instantiating vdu: %s", vdu
)
1620 yield from vdu
.instantiate(xact
, vnfr
, config
=config
)
1621 if self
._state
== VirtualNetworkFunctionRecordState
.FAILED
:
1622 self
._log
.error("Instatiation of VNF %s failed while instantiating vdu %s",
1625 # First create a set of tasks to monitor the state of the VDUs and
1626 # report when they have entered a terminal state
1627 for vdu
in self
._vdus
:
1628 self
._loop
.create_task(instantiate_monitor(vdu
))
1630 for vdu
in self
._vdus
:
1631 self
._loop
.create_task(instantiate(vdu
))
1633 def has_mgmt_interface(self
, vdu
):
1634 # ## TODO: Support additional mgmt_interface type options
1635 if self
.vnfd
.mgmt_interface
.vdu_id
== vdu
.id:
1639 def vlr_xpath(self
, vlr_id
):
1642 "D,/vlr:vlr-catalog/"
1643 "vlr:vlr[vlr:id = '{}']".format(vlr_id
))
def ext_vlr_by_id(self, vlr_id):
    """ Look up an external VLR by its id.

    Arguments:
        vlr_id - id used as the key into the external VLR mapping

    Returns:
        The entry stored under vlr_id

    Raises:
        KeyError - if no external VLR is stored under vlr_id
    """
    external_vlrs = self._ext_vlrs
    return external_vlrs[vlr_id]
1650 def publish_inventory(self
, xact
):
1651 """ Publish the inventory associated with this VNF """
1652 self
._log
.debug("Publishing inventory for VNFR id: %s", self
._vnfr
_id
)
1654 for component
in self
._rw
_vnfd
.component
:
1655 self
._log
.debug("Creating inventory component %s", component
)
1656 mangled_name
= VcsComponent
.mangle_name(component
.component_name
,
1660 comp
= VcsComponent(dts
=self
._dts
,
1663 cluster_name
=self
._cluster
_name
,
1664 vcs_handler
=self
._vcs
_handler
,
1665 component
=component
,
1666 mangled_name
=mangled_name
,
1668 if comp
.name
in self
._inventory
:
1669 self
._log
.debug("Duplicate entries in inventory %s for vnfr %s",
1670 component
, self
._vnfd
_id
)
1672 self
._log
.debug("Adding component %s for vnrf %s",
1673 comp
.name
, self
._vnfr
_id
)
1674 self
._inventory
[comp
.name
] = comp
1675 yield from comp
.publish(xact
)
1677 def all_vdus_active(self
):
1678 """ Are all VDUS in this VNFR active? """
1679 for vdu
in self
._vdus
:
1683 self
._log
.debug("Inside all_vdus_active. Returning True")
def instantiation_failed(self, failed_reason=None):
    """ Mark this VNFR as failed and publish the new status.

    Arguments:
        failed_reason - optional description of the failure; stored in
                        ``_state_failed_reason`` (defaults to None)
    """
    self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)

    # Record the failure first so the publish below carries both the
    # FAILED state and its reason.
    self.set_state(VirtualNetworkFunctionRecordState.FAILED)
    self._state_failed_reason = failed_reason

    # Publish outside of any caller transaction (xact is None)
    yield from self.publish(None)
1698 """ This VNF is ready"""
1699 self
._log
.debug("VNFR id %s is ready", self
.vnfr_id
)
1701 if self
._state
!= VirtualNetworkFunctionRecordState
.FAILED
:
1702 self
.set_state(VirtualNetworkFunctionRecordState
.READY
)
1705 self
._log
.debug("VNFR id %s ignoring state change", self
.vnfr_id
)
1707 # Update the VNFR with the changed status
1708 yield from self
.publish(None)
def update_cp(self, cp_name, ip_address, cp_id):
    """ Update a connection point record with its ip address and id.

    Arguments:
        cp_name    - name of the connection point to update
        ip_address - value stored in the record's ``ip_address``
        cp_id      - value stored in the record's ``connection_point_id``

    Only the first record in ``_cprs`` whose name matches is updated.

    Raises:
        VirtualDeploymentUnitRecordError - if no connection point record
                                           is named cp_name
    """
    for cp in self._cprs:
        if cp.name == cp_name:
            self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
                            cp_name, cp, ip_address, cp_id)
            cp.ip_address = ip_address
            cp.connection_point_id = cp_id
            # Stop here: falling through would report a successful
            # update as "not found".
            return

    # Report the name that was searched for. The loop variable is unbound
    # when there are no records and otherwise refers to an unrelated entry.
    err = "No connection point %s found in VNFR id %s" % (cp_name, self._vnfr_id)
    self._log.debug(err)
    raise VirtualDeploymentUnitRecordError(err)
1724 def set_state(self
, state
):
1725 """ Set state for this VNFR"""
1729 def instantiate(self
, xact
, restart_mode
=False):
1730 """ instantiate this VNF """
1731 self
.set_state(VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
)
1732 self
._rw
_vnfd
= yield from self
._vnfm
.fetch_vnfd(self
._vnfd
_id
)
1737 # Iterate over all the connection points in VNFR and fetch the
1740 def cpr_from_cp(cp
):
1741 """ Creates a record level connection point from the desciptor cp"""
1742 cp_fields
= ["name", "image", "vm-flavor", "static_ip_address"]
1743 cp_copy_dict
= {k
: v
for k
, v
in cp
.as_dict().items() if k
in cp_fields
}
1745 cpr_dict
.update(cp_copy_dict
)
1746 return VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr_dict
)
1748 self
._log
.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1749 self
._vnfr
_id
, self
._vnfr
.connection_point
)
1751 for cp
in self
._vnfr
.connection_point
:
1752 cpr
= cpr_from_cp(cp
)
1753 self
._cprs
.append(cpr
)
1754 self
._log
.debug("Adding Connection point record %s ", cp
)
1756 vlr_path
= self
.vlr_xpath(cp
.vlr_ref
)
1757 self
._log
.debug("Fetching VLR with path = %s", vlr_path
)
1758 res_iter
= yield from self
._dts
.query_read(self
.vlr_xpath(cp
.vlr_ref
),
1759 rwdts
.XactFlag
.MERGE
)
1763 self
._ext
_vlrs
[cp
.vlr_ref
] = d
1764 cpr
.vlr_ref
= cp
.vlr_ref
1765 self
._log
.debug("Fetched VLR [%s] with path = [%s]", d
, vlr_path
)
1767 # Increase the VNFD reference count
1772 # Fetch External VLRs
1773 self
._log
.debug("VNFR-ID %s: Fetching vlrs", self
._vnfr
_id
)
1774 yield from fetch_vlrs()
1777 self
._log
.debug("VNFR-ID %s: Publishing Inventory", self
._vnfr
_id
)
1778 yield from self
.publish_inventory(xact
)
1781 self
._log
.debug("VNFR-ID %s: Creating VLs", self
._vnfr
_id
)
1782 yield from self
.create_vls()
1785 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1786 yield from self
.publish(xact
)
1789 self
._log
.debug("VNFR-ID %s: Instantiate VLs", self
._vnfr
_id
)
1791 yield from self
.instantiate_vls(xact
, restart_mode
)
1792 except Exception as e
:
1793 self
._log
.exception("VL instantiation failed (%s)", str(e
))
1794 yield from self
.instantiation_failed(str(e
))
1797 self
.set_state(VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
)
1800 self
._log
.debug("VNFR-ID %s: Create VDUs", self
._vnfr
_id
)
1801 yield from self
.create_vdus(self
, restart_mode
)
1804 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1805 yield from self
.publish(xact
)
1808 # ToDo: Check if this should be prevented during restart
1809 self
._log
.debug("VNFR-ID %s: Instantiate VDUs", self
._vnfr
_id
)
1810 _
= self
._loop
.create_task(self
.instantiate_vdus(xact
, self
))
1813 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1814 yield from self
.publish(xact
)
1816 self
._log
.debug("VNFR-ID %s: Instantiation Done", self
._vnfr
_id
)
1818 # create task updating uptime for this vnfr
1819 self
._log
.debug("VNFR-ID %s: Starting task to update uptime", self
._vnfr
_id
)
1820 self
._loop
.create_task(self
.vnfr_uptime_update(xact
))
1823 def terminate(self
, xact
):
1824 """ Terminate this virtual network function """
1826 self
._log
.debug("Terminatng VNF id %s", self
.vnfr_id
)
1828 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATE
)
1831 if self
._vnf
_mon
is not None:
1832 self
._vnf
_mon
.stop()
1833 self
._vnf
_mon
.deregister()
1834 self
._vnf
_mon
= None
1837 def terminate_vls():
1838 """ Terminate VLs in this VNF """
1839 for vl
in self
._vlrs
:
1840 yield from vl
.terminate(xact
)
1843 def terminate_vdus():
1844 """ Terminate VDUS in this VNF """
1845 for vdu
in self
._vdus
:
1846 yield from vdu
.terminate(xact
)
1848 self
._log
.debug("Terminatng VLs in VNF id %s", self
.vnfr_id
)
1849 self
.set_state(VirtualNetworkFunctionRecordState
.VL_TERMINATE_PHASE
)
1850 yield from terminate_vls()
1852 self
._log
.debug("Terminatng VDUs in VNF id %s", self
.vnfr_id
)
1853 self
.set_state(VirtualNetworkFunctionRecordState
.VDU_TERMINATE_PHASE
)
1854 yield from terminate_vdus()
1856 self
._log
.debug("Terminated VNF id %s", self
.vnfr_id
)
1857 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATED
)
1860 def vnfr_uptime_update(self
, xact
):
1862 # Return when vnfr state is FAILED or TERMINATED etc
1863 if self
._state
not in [VirtualNetworkFunctionRecordState
.INIT
,
1864 VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
,
1865 VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
,
1866 VirtualNetworkFunctionRecordState
.READY
]:
1868 yield from self
.publish(xact
)
1869 yield from asyncio
.sleep(2, loop
=self
._loop
)
1873 class VnfdDtsHandler(object):
1874 """ DTS handler for VNFD config changes """
1875 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1877 def __init__(self
, dts
, log
, loop
, vnfm
):
1886 """ DTS registration handle """
1891 """ Register for VNFD configuration"""
1893 def on_apply(dts
, acg
, xact
, action
, scratch
):
1894 """Apply the configuration"""
1895 self
._log
.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1896 xact
, action
, scratch
)
1898 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
1901 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
1902 """ on prepare callback """
1903 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1904 ks_path
.to_xpath(RwVnfmYang
.get_schema()), msg
)
1905 fref
= ProtobufC
.FieldReference
.alloc()
1906 fref
.goto_whole_message(msg
.to_pbcm())
1908 # Handle deletes in prepare_callback
1909 if fref
.is_field_deleted():
1910 # Delete an VNFD record
1911 self
._log
.debug("Deleting VNFD with id %s", msg
.id)
1912 if self
._vnfm
.vnfd_in_use(msg
.id):
1913 self
._log
.debug("Cannot delete VNFD in use - %s", msg
)
1914 err
= "Cannot delete a VNFD in use - %s" % msg
1915 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
1916 # Delete a VNFD record
1917 yield from self
._vnfm
.delete_vnfd(msg
.id)
1919 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
1922 "Registering for VNFD config using xpath: %s",
1923 VnfdDtsHandler
.XPATH
,
1925 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
1926 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
1927 self
._regh
= acg
.register(
1928 xpath
=VnfdDtsHandler
.XPATH
,
1929 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
1930 on_prepare
=on_prepare
)
1933 class VcsComponentDtsHandler(object):
1934 """ Vcs Component DTS handler """
1935 XPATH
= ("D,/rw-manifest:manifest" +
1936 "/rw-manifest:operational-inventory" +
1937 "/rw-manifest:component")
1939 def __init__(self
, dts
, log
, loop
, vnfm
):
1948 """ DTS registration handle """
1953 """ Registers VCS component dts publisher registration"""
1954 self
._log
.debug("VCS Comp publisher DTS handler registering path %s",
1955 VcsComponentDtsHandler
.XPATH
)
1957 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
1958 handlers
= rift
.tasklets
.Group
.Handler()
1959 with self
._dts
.group_create(handler
=handlers
) as group
:
1960 self
._regh
= group
.register(xpath
=VcsComponentDtsHandler
.XPATH
,
1962 flags
=(rwdts
.Flag
.PUBLISHER |
1963 rwdts
.Flag
.NO_PREP_READ |
1964 rwdts
.Flag
.DATASTORE
),)
1967 def publish(self
, xact
, path
, msg
):
1968 """ Publishes the VCS component """
1969 self
._log
.debug("Publishing the VcsComponent xact = %s, %s:%s",
1971 self
.regh
.create_element(path
, msg
)
1972 self
._log
.debug("Published the VcsComponent to %s xact = %s, %s:%s",
1973 VcsComponentDtsHandler
.XPATH
, xact
, path
, msg
)
1975 class VnfrConsoleOperdataDtsHandler(object):
1976 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
def vnfr_vdu_console_xpath(self):
    """ Build the vnfr-console xpath for this handler's VNFR/VDUR pair.

    Returns:
        The xpath string keyed by ``_vnfr_id`` and ``_vdur_id``
    """
    xpath_template = ("D,/rw-vnfr:vnfr-console"
                      "/rw-vnfr:vnfr[rw-vnfr:id='{}']"
                      "/rw-vnfr:vdur[vnfr:id='{}']")
    return xpath_template.format(self._vnfr_id, self._vdur_id)
1982 def __init__(self
, dts
, log
, loop
, vnfm
, vnfr_id
, vdur_id
, vdu_id
):
1989 self
._vnfr
_id
= vnfr_id
1990 self
._vdur
_id
= vdur_id
1991 self
._vdu
_id
= vdu_id
1995 """ Register for VNFR VDU Operational Data read from dts """
1998 def on_prepare(xact_info
, action
, ks_path
, msg
):
1999 """ prepare callback from dts """
2000 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2002 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2003 xact_info
, action
, xpath
, msg
2006 if action
== rwdts
.QueryAction
.READ
:
2007 schema
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur
.schema()
2008 path_entry
= schema
.keyspec_to_entry(ks_path
)
2009 self
._log
.debug("VDU Opdata path is {}".format(path_entry
))
2011 vnfr
= self
._vnfm
.get_vnfr(self
._vnfr
_id
)
2012 except VnfRecordError
as e
:
2013 self
._log
.error("VNFR id %s not found", self
._vnfr
_id
)
2014 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2017 vdur
= vnfr
._get
_vdur
_from
_vdu
_id
(self
._vdu
_id
)
2018 if not vdur
._state
== VDURecordState
.READY
:
2019 self
._log
.debug("VDUR state is not READY. current state is {}".format(vdur
._state
))
2020 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2022 with self
._dts
.transaction() as new_xact
:
2023 resp
= yield from vdur
.read_resource(new_xact
)
2024 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2025 vdur_console
.id = self
._vdur
_id
2026 if resp
.console_url
:
2027 vdur_console
.console_url
= resp
.console_url
2029 vdur_console
.console_url
= 'none'
2030 self
._log
.debug("Recevied console URL for vdu {} is {}".format(self
._vdu
_id
,vdur_console
))
2032 self
._log
.exception("Caught exception while reading VDU %s", self
._vdu
_id
)
2033 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2034 vdur_console
.id = self
._vdur
_id
2035 vdur_console
.console_url
= 'none'
2037 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
,
2038 xpath
=self
.vnfr_vdu_console_xpath
,
2041 #raise VnfRecordError("Not supported operation %s" % action)
2042 self
._log
.error("Not supported operation %s" % action
)
2043 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2047 self
._log
.debug("Registering for VNFR VDU using xpath: %s",
2048 self
.vnfr_vdu_console_xpath
)
2049 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2050 with self
._dts
.group_create() as group
:
2051 self
._regh
= group
.register(xpath
=self
.vnfr_vdu_console_xpath
,
2053 flags
=rwdts
.Flag
.PUBLISHER
,
2057 class VnfrDtsHandler(object):
2058 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2059 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2061 def __init__(self
, dts
, log
, loop
, vnfm
):
2071 """ Return registration handle"""
2076 """ Return VNF manager instance """
2081 """ Register for vnfr create/update/delete/read requests from dts """
2082 def on_commit(xact_info
):
2083 """ The transaction has been committed """
2084 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
2085 return rwdts
.MemberRspCode
.ACTION_OK
2087 def on_abort(*args
):
2088 """ Abort callback """
2089 self
._log
.debug("VNF transaction got aborted")
2092 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
2095 def instantiate_realloc_vnfr(vnfr
):
2096 """Re-populate the vnfm after restart
2103 yield from vnfr
.instantiate(None, restart_mode
=True)
2105 if xact_event
== rwdts
.MemberEvent
.INSTALL
:
2106 curr_cfg
= self
.regh
.elements
2107 for cfg
in curr_cfg
:
2108 vnfr
= self
.vnfm
.create_vnfr(cfg
)
2109 self
._loop
.create_task(instantiate_realloc_vnfr(vnfr
))
2111 self
._log
.debug("Got on_event in vnfm")
2113 return rwdts
.MemberRspCode
.ACTION_OK
2116 def on_prepare(xact_info
, action
, ks_path
, msg
):
2117 """ prepare callback from dts """
2119 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2120 xact_info
, action
, msg
2123 if action
== rwdts
.QueryAction
.CREATE
:
2124 if not msg
.has_field("vnfd"):
2125 err
= "Vnfd not provided"
2126 self
._log
.error(err
)
2127 raise VnfRecordError(err
)
2129 vnfr
= self
.vnfm
.create_vnfr(msg
)
2131 # RIFT-9105: Unable to add a READ query under an existing transaction
2132 # xact = xact_info.xact
2133 yield from vnfr
.instantiate(None)
2134 except Exception as e
:
2135 self
._log
.exception(e
)
2136 self
._log
.error("Error while instantiating vnfr:%s", vnfr
.vnfr_id
)
2137 vnfr
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
2138 yield from vnfr
.publish(None)
2139 elif action
== rwdts
.QueryAction
.DELETE
:
2140 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2141 path_entry
= schema
.keyspec_to_entry(ks_path
)
2142 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2145 self
._log
.debug("VNFR id %s not found for delete", path_entry
.key00
.id)
2146 raise VirtualNetworkFunctionRecordNotFound(
2147 "VNFR id %s", path_entry
.key00
.id)
2150 yield from vnfr
.terminate(xact_info
.xact
)
2153 yield from self
._vnfm
.delete_vnfr(xact_info
.xact
, vnfr
)
2154 except Exception as e
:
2155 self
._log
.exception(e
)
2156 self
._log
.error("Caught exception while deleting vnfr %s", path_entry
.key00
.id)
2158 elif action
== rwdts
.QueryAction
.UPDATE
:
2159 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2160 path_entry
= schema
.keyspec_to_entry(ks_path
)
2163 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2164 except Exception as e
:
2165 self
._log
.debug("No vnfr found with id %s", path_entry
.key00
.id)
2166 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2170 self
._log
.debug("VNFR id %s not found for update", path_entry
.key00
.id)
2171 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2174 self
._log
.debug("VNFR {} update config status {} (current {})".
2175 format(vnfr
.name
, msg
.config_status
, vnfr
.config_status
))
2176 # Update the config status and publish
2177 vnfr
._config
_status
= msg
.config_status
2178 yield from vnfr
.publish(None)
2181 raise NotImplementedError(
2182 "%s action on VirtualNetworkFunctionRecord not supported",
2185 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2187 self
._log
.debug("Registering for VNFR using xpath: %s",
2188 VnfrDtsHandler
.XPATH
,)
2190 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
2191 on_prepare
=on_prepare
,)
2192 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
2193 with self
._dts
.group_create(handler
=handlers
) as group
:
2194 self
._regh
= group
.register(xpath
=VnfrDtsHandler
.XPATH
,
2196 flags
=(rwdts
.Flag
.PUBLISHER |
2197 rwdts
.Flag
.NO_PREP_READ |
2199 rwdts
.Flag
.DATASTORE
),)
2202 def create(self
, xact
, path
, msg
):
2204 Create a VNFR record in DTS with path and message
2206 self
._log
.debug("Creating VNFR xact = %s, %s:%s",
2209 self
.regh
.create_element(path
, msg
)
2210 self
._log
.debug("Created VNFR xact = %s, %s:%s",
@asyncio.coroutine
def update(self, xact, path, msg):
    """
    Update a VNFR record in DTS with path and message

    Arguments:
        xact - the transaction; only used for logging here
        path - handed to the registration handle's update_element()
        msg  - handed to the registration handle's update_element()
    """
    # This method is consumed with `yield from`, so it has to be a coroutine;
    # a plain function would hand None to `yield from`.
    # NOTE(review): confirm that asyncio is imported at module level.
    self._log.debug("Updating VNFR xact = %s, %s:%s",
                    xact, path, msg)
    self.regh.update_element(path, msg)
    self._log.debug("Updated VNFR xact = %s, %s:%s",
                    xact, path, msg)
@asyncio.coroutine
def delete(self, xact, path):
    """
    Delete a VNFR record in DTS with path

    Arguments:
        xact - the transaction; only used for logging here
        path - handed to the registration handle's delete_element()
    """
    # This method is consumed with `yield from`, so it has to be a coroutine;
    # a plain function would hand None to `yield from`.
    # NOTE(review): confirm that asyncio is imported at module level.
    self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
    self.regh.delete_element(path)
    self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
class VnfdRefCountDtsHandler(object):
    """ The VNFD Ref Count DTS handler

    Publishes the entries returned by the VNF manager's get_vnfd_refcount()
    in response to DTS READ queries on XPATH.
    """
    XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"

    def __init__(self, dts, log, loop, vnfm):
        # NOTE(review): attribute names below are taken from how the rest of
        # this class reads them (self._dts, self._log, self._vnfm, self._regh).
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm

        self._regh = None

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @property
    def vnfm(self):
        """ Return the NS manager instance """
        return self._vnfm

    @asyncio.coroutine
    def register(self):
        """ Register for VNFD ref count read from dts """

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts """
            xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
                xact_info, action, xpath, msg
                )

            # Only READ is served by this handler.
            if action != rwdts.QueryAction.READ:
                raise VnfRecordError("Not supported operation %s" % action)

            schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
            path_entry = schema.keyspec_to_entry(ks_path)
            vnfd_id_ref = path_entry.key00.vnfd_id_ref
            ref_counts = yield from self._vnfm.get_vnfd_refcount(vnfd_id_ref)

            # One MORE response per entry, then a final ACK.
            for ref_xpath, ref_msg in ref_counts:
                self._log.debug("Responding to ref count query path:%s, msg:%s",
                                ref_xpath, ref_msg)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
                                        xpath=ref_xpath,
                                        msg=ref_msg)
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=rwdts.Flag.PUBLISHER,
                                        )
class VdurDatastore(object):
    """
    This VdurDatastore is intended to expose select information about a VDUR
    such that it can be referenced in a cloud config file. The data that is
    exposed does not necessarily follow the structure of the data in the yang
    model. This is intentional. The data that are exposed are intended to be
    agnostic of the yang model so that changes in the model do not necessarily
    require changes to the interface provided to the user. It also means that
    the user does not need to be familiar with the RIFT.ware yang models.
    """

    def __init__(self):
        """Create an instance of VdurDatastore"""
        # Maps a VDU id to a dict of the attributes exposed for that VDUR.
        self._vdur_data = dict()
        # Raw string: "\[" and "\." are regex escapes, not string escapes.
        # Group 1 is the id between the brackets, group 2 the attribute name.
        self._pattern = re.compile(r"vdu\[([^]]+)\]\.(.+)")

    def add(self, vdur):
        """Add a new VDUR to the datastore

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR is (1) None, (2) has no ID or
            (3) is already in the datastore.

        """
        if vdur is None or vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id in self._vdur_data:
            raise ValueError('cannot add a VDUR more than once')

        # Attributes that are None are simply left out of the entry.
        candidates = (
            ('name', vdur._vdud.name),
            ('mgmt.ip', vdur.vm_management_ip),
        )
        self._vdur_data[vdur.vdu_id] = {
            key: attr for key, attr in candidates if attr is not None
        }

    def update(self, vdur):
        """Update the VDUR information in the datastore

        An attribute whose new value is None is removed from the datastore
        entry; any other value overwrites the stored one.

        Arguments:
            vdur - a GI representation of a VDUR

        Raises:
            A ValueError is raised if the VDUR is (1) None, (2) has no ID or
            (3) is not in the datastore.

        """
        if vdur is None or vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        entry = self._vdur_data[vdur.vdu_id]

        def set_or_delete(key, attr):
            if attr is None:
                entry.pop(key, None)
            else:
                entry[key] = attr

        set_or_delete('name', vdur._vdud.name)
        set_or_delete('mgmt.ip', vdur.vm_management_ip)

    def remove(self, vdur_id):
        """Remove all of the data associated with specified VDUR

        Arguments:
            vdur_id - the identifier of a VDUR in the datastore

        Raises:
            A ValueError is raised if the VDUR is not contained in the
            datastore.

        """
        if vdur_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        del self._vdur_data[vdur_id]

    def get(self, expr):
        """Retrieve VDUR information from the datastore

        An expression should be of the form,

            vdu[<id>].<attr>

        where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
        the exposed attribute that the user wishes to retrieve.

        If the requested data is not available, None is returned.

        Arguments:
            expr - a string that specifies the data to return

        Raises:
            A ValueError is raised if the provided expression cannot be parsed.

        Returns:
            The requested data or None

        """
        result = self._pattern.match(expr)
        if result is None:
            raise ValueError('data expression not recognized ({})'.format(expr))

        vdur_id, key = result.groups()

        if vdur_id not in self._vdur_data:
            return None

        return self._vdur_data[vdur_id].get(key, None)
class VnfManager(object):
    """ The virtual network function manager class

    Keeps the VNFRs created through this manager (by VNFR id), a map from
    VNFD id to VNFR, and the NSRs reported through handle_nsr().

    NOTE(review): methods that are consumed with `yield from` are declared as
    generator-based coroutines; this relies on asyncio being imported at
    module level - confirm.
    """
    def __init__(self, dts, log, loop, cluster_name):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._cluster_name = cluster_name

        self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
        self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
        self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
        self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(
            log, dts, loop, callback=self.handle_nsr)

        # Every handler in this list is registered by register().
        self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
                              self._vnfr_handler,
                              self._vcs_handler,
                              self._vnfr_ref_handler,
                              self._nsr_handler]
        self._vnfrs = {}
        self._vnfds_to_vnfr = {}
        self._nsrs = {}

    @property
    def vnfr_handler(self):
        """ VNFR dts handler """
        return self._vnfr_handler

    @property
    def vcs_handler(self):
        """ VCS dts handler """
        return self._vcs_handler

    @asyncio.coroutine
    def register(self):
        """ Register all static DTS handlers """
        for hdl in self._dts_handlers:
            yield from hdl.register()

    @asyncio.coroutine
    def run(self):
        """ Run this VNFM instance """
        self._log.debug("Run VNFManager - registering static DTS handlers")
        yield from self.register()

    def handle_nsr(self, nsr, action):
        """ Track NSRs: remember on CREATE, forget on DELETE """
        if action in [rwdts.QueryAction.CREATE]:
            self._nsrs[nsr.id] = nsr
        elif action == rwdts.QueryAction.DELETE:
            if nsr.id in self._nsrs:
                del self._nsrs[nsr.id]

    def get_linked_mgmt_network(self, vnfr):
        """For the given VNFR get the related mgmt network from the NSD, if
        available.

        Returns:
            The vim_network_name of the first VLD flagged as mgmt_network, or
            None when the NSD has no such VLD.

        Raises:
            VnfRecordError if the NSR referenced by the VNFR is not known.
        """
        nsr_id = vnfr.nsr_id_ref

        # for the given related VNFR, get the corresponding NSR-config
        try:
            nsr_obj = self._nsrs[nsr_id]
        except KeyError:
            # Raising a bare string is itself a TypeError; raise a real
            # exception that carries the message instead.
            raise VnfRecordError(
                "Unable to find the NS with the ID: {}".format(nsr_id))

        # for the related NSD check if a VLD exists such that it's a mgmt
        # network
        for vld in nsr_obj.nsd.vld:
            if vld.mgmt_network:
                return vld.vim_network_name

        return None

    def get_vnfr(self, vnfr_id):
        """ get VNFR by vnfr id

        Raises:
            VnfRecordError if no VNFR with that id is known.
        """
        if vnfr_id not in self._vnfrs:
            raise VnfRecordError("VNFR id %s not found" % vnfr_id)

        return self._vnfrs[vnfr_id]

    def create_vnfr(self, vnfr):
        """ Create a VNFR instance

        Raises:
            VnfRecordError if a VNFR with the same id already exists.
        """
        if vnfr.id in self._vnfrs:
            msg = "Vnfr id %s already exists" % vnfr.id
            self._log.error(msg)
            raise VnfRecordError(msg)

        self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
                       vnfr.id,
                       vnfr.vnfd.id)

        mgmt_network = self.get_linked_mgmt_network(vnfr)

        self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
            self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
            mgmt_network=mgmt_network
            )
        self._vnfds_to_vnfr[vnfr.vnfd.id] = self._vnfrs[vnfr.id]
        return self._vnfrs[vnfr.id]

    @asyncio.coroutine
    def delete_vnfr(self, xact, vnfr):
        """ Delete a VNFR instance (no-op when the VNFR is not known) """
        if vnfr.vnfr_id in self._vnfrs:
            self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
            yield from self._vnfr_handler.delete(xact, vnfr.xpath)
            del self._vnfrs[vnfr.vnfr_id]

    @asyncio.coroutine
    def fetch_vnfd(self, vnfd_id):
        """ Fetch VNFDs based with the vnfd id

        Raises:
            VnfRecordError if the DTS read produced no result.
        """
        vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
        self._log.debug("Fetch vnfd with path %s", vnfd_path)
        vnfd = None

        res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)

        # The last result in the iterator wins.
        for ent in res_iter:
            res = yield from ent
            vnfd = res.result

        if vnfd is None:
            err = "Failed to get Vnfd %s" % vnfd_id
            self._log.error(err)
            raise VnfRecordError(err)

        self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)

        return vnfd

    def vnfd_in_use(self, vnfd_id):
        """ Is this VNFD in use """
        self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
        if vnfd_id in self._vnfds_to_vnfr:
            return self._vnfds_to_vnfr[vnfd_id].in_use()
        return False

    @asyncio.coroutine
    def publish_vnfr(self, xact, path, msg):
        """ Publish a VNFR """
        self._log.debug("publish_vnfr called with path %s, msg %s",
                        path, msg)
        yield from self.vnfr_handler.update(xact, path, msg)

    def delete_vnfd(self, vnfd_id):
        """ Delete the Virtual Network Function descriptor with the passed id

        Raises:
            VirtualNetworkFunctionDescriptorNotFound if the id is unknown.
            VirtualNetworkFunctionDescriptorRefCountExists if it is in use.
        """
        self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
        if vnfd_id not in self._vnfds_to_vnfr:
            self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
            raise VirtualNetworkFunctionDescriptorNotFound("Cannot find %s" % vnfd_id)

        if self._vnfds_to_vnfr[vnfd_id].in_use():
            self._log.debug("Cannot delete VNFD id %s reference exists %s",
                            vnfd_id,
                            self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
            raise VirtualNetworkFunctionDescriptorRefCountExists(
                "Cannot delete :%s, ref_count:%s" % (
                    vnfd_id,
                    self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count))

        # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
        # Best effort: a failure here is logged and must not block the delete.
        try:
            rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
            vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
            if os.path.exists(vnfd_dir):
                shutil.rmtree(vnfd_dir, ignore_errors=True)
        except Exception as e:
            self._log.error("Exception in cleaning up VNFD {}: {}".
                            format(self._vnfds_to_vnfr[vnfd_id].vnfd.name, e))
            self._log.exception(e)

        del self._vnfds_to_vnfr[vnfd_id]

    def vnfd_refcount_xpath(self, vnfd_id):
        """ xpath for ref count entry """
        return (VnfdRefCountDtsHandler.XPATH +
                "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)

    @asyncio.coroutine
    def get_vnfd_refcount(self, vnfd_id):
        """ Get the vnfd_list from this VNFM

        Returns:
            A list of (xpath, VnfdRefCount message) tuples: one per known
            VNFD when vnfd_id is None or empty, one for the given id when it
            is known, otherwise an empty list.
        """
        vnfd_list = []
        if vnfd_id is None or vnfd_id == "":
            for vnfr in self._vnfds_to_vnfr.values():
                vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
                vnfd_msg.vnfd_id_ref = vnfr.vnfd.id
                vnfd_msg.instance_ref_count = vnfr.vnfd_ref_count
                vnfd_list.append((self.vnfd_refcount_xpath(vnfr.vnfd.id), vnfd_msg))
        elif vnfd_id in self._vnfds_to_vnfr:
            vnfr = self._vnfds_to_vnfr[vnfd_id]
            vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
            vnfd_msg.vnfd_id_ref = vnfr.vnfd.id
            vnfd_msg.instance_ref_count = vnfr.vnfd_ref_count
            vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))

        return vnfd_list
class VnfmTasklet(rift.tasklets.Tasklet):
    """ VNF Manager tasklet class

    NOTE(review): coroutine methods rely on asyncio being imported at module
    level - confirm.
    """
    def __init__(self, *args, **kwargs):
        super(VnfmTasklet, self).__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("vnfm")

        self._dts = None
        self._vnfm = None

    def start(self):
        """ Start the tasklet and create the DTS API object """
        try:
            super(VnfmTasklet, self).start()
            self.log.info("Starting VnfmTasklet")

            self.log.setLevel(logging.DEBUG)

            self.log.debug("Registering with dts")
            self._dts = rift.tasklets.DTS(self.tasklet_info,
                                          RwVnfmYang.get_schema(),
                                          self.loop,
                                          self.on_dts_state_change)

            self.log.debug("Created DTS Api GI Object: %s", self._dts)
        except Exception:
            # Report the exception type, then let it propagate.
            print("Caught Exception in VNFM start:", sys.exc_info()[0])
            raise

    def on_instance_started(self):
        """ Task instance started callback """
        self.log.debug("Got instance started callback")

    def stop(self):
        """ Stop the tasklet """
        try:
            self._dts.deinit()
        except Exception:
            print("Caught Exception in VNFM stop:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def init(self):
        """ Task init callback """
        try:
            vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
            assert vm_parent_name is not None
            self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
            yield from self._vnfm.run()
        except Exception:
            print("Caught Exception in VNFM init:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def run(self):
        """ Task run callback """
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """
        # DTS state to move to after the given state has been handled.
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Application callback to run for the given DTS state.
        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self._dts.handle.set_state(next_state)