2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
29 gi
.require_version('RwDts', '1.0')
30 gi
.require_version('RwVnfrYang', '1.0')
31 gi
.require_version('RwVnfmYang', '1.0')
32 gi
.require_version('RwVlrYang', '1.0')
33 gi
.require_version('RwManifestYang', '1.0')
34 gi
.require_version('RwBaseYang', '1.0')
35 gi
.require_version('RwResourceMgrYang', '1.0')
37 from gi
.repository
import (
50 import rift
.package
.store
51 import rift
.package
.cloud_init
52 import rift
.mano
.dts
as mano_dts
class VMResourceError(Exception):
    """Error while obtaining a VM resource.

    Raised in this file when a resource-manager response is missing or
    carries no resource info / resource state.
    """
class VnfRecordError(Exception):
    """VNF record instantiation failed."""
class VduRecordError(Exception):
    """VDU record instantiation failed."""
class NotImplemented(Exception):
    """Requested operation is not implemented."""
    # NOTE(review): inside this module the class name shadows Python's builtin
    # ``NotImplemented`` constant; it is unrelated to ``NotImplementedError``.
    # The name is kept because callers may already reference it.
class VnfrRecordExistsError(Exception):
    """A VNFR record with the same VNFR id already exists."""
class InternalVirtualLinkRecordError(Exception):
    """Error concerning an internal virtual link record."""
class VDUImageNotFound(Exception):
    """The image for a VDU could not be found."""
class VirtualDeploymentUnitRecordError(Exception):
    """Instantiation of a VDU failed."""
class VMNotReadyError(Exception):
    """The VM has not yet been received from the resource manager."""
class VDURecordNotFound(Exception):
    """No matching VDU record could be found."""
class VirtualNetworkFunctionRecordDescNotFound(Exception):
    """The Virtual Network Function Record descriptor cannot be found."""
class VirtualNetworkFunctionDescriptorError(Exception):
    """Error concerning a Virtual Network Function Record descriptor."""
class VirtualNetworkFunctionDescriptorNotFound(Exception):
    """The Virtual Network Function Record descriptor was not found."""
class VirtualNetworkFunctionRecordNotFound(Exception):
    """The Virtual Network Function Record was not found."""
class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
    """A reference count still exists on the Virtual Network Function Descriptor."""
class VnfrInstantiationFailed(Exception):
    """Instantiation of a Virtual Network Function failed."""
135 class VNFMPlacementGroupError(Exception):
138 class VirtualNetworkFunctionRecordState(enum
.Enum
):
145 VL_TERMINATE_PHASE
= 6
146 VDU_TERMINATE_PHASE
= 7
151 class VDURecordState(enum
.Enum
):
152 """VDU record state """
155 RESOURCE_ALLOC_PENDING
= 3
162 class VcsComponent(object):
163 """ VCS Component within the VNF descriptor """
164 def __init__(self
, dts
, log
, loop
, cluster_name
, vcs_handler
, component
, mangled_name
):
168 self
._component
= component
169 self
._cluster
_name
= cluster_name
170 self
._vcs
_handler
= vcs_handler
171 self
._mangled
_name
= mangled_name
def mangle_name(component_name, vnf_name, vnfd_id):
    """Build the mangled component name.

    The result is the VNF name, the component name and the VNFD id, in that
    order, joined with ":".
    """
    return ":".join((vnf_name, component_name, vnfd_id))
180 """ name of this component"""
181 return self
._mangled
_name
185 """ The path for this object """
186 return("D,/rw-manifest:manifest" +
187 "/rw-manifest:operational-inventory" +
188 "/rw-manifest:component" +
189 "[rw-manifest:component-name = '{}']").format(self
.name
)
192 def instance_xpath(self
):
193 """ The path for this object """
194 return("D,/rw-base:vcs" +
197 "[instance-name = '{}']".format(self
._cluster
_name
))
def start_comp_xpath(self):
    """Return the xpath used to start this component.

    It is this object's instance xpath with the START-REQ ``child-n`` entry
    appended.
    """
    start_req_suffix = "/child-n[instance-name = 'START-REQ']"
    return self.instance_xpath + start_req_suffix
205 def get_start_comp_msg(self
, ip_address
):
206 """ start this component """
207 start_msg
= RwBaseYang
.VcsInstance_Instance_ChildN()
208 start_msg
.instance_name
= 'START-REQ'
209 start_msg
.component_name
= self
.name
210 start_msg
.admin_command
= "START"
211 start_msg
.ip_address
= ip_address
217 """ Returns the message for this vcs component"""
219 vcs_comp_dict
= self
._component
.as_dict()
221 def mangle_comp_names(comp_dict
):
222 """ mangle component name with VNF name, id"""
223 for key
, val
in comp_dict
.items():
224 if isinstance(val
, dict):
225 comp_dict
[key
] = mangle_comp_names(val
)
226 elif isinstance(val
, list):
229 if isinstance(ent
, dict):
230 val
[i
] = mangle_comp_names(ent
)
234 elif key
== "component_name":
235 comp_dict
[key
] = VcsComponent
.mangle_name(val
,
240 mangled_dict
= mangle_comp_names(vcs_comp_dict
)
241 msg
= RwManifestYang
.OpInventory_Component
.from_dict(mangled_dict
)
def publish(self, xact):
    """Publish this VCS component through the VCS handler.

    Logs the component name, path and message at debug level, then delegates
    to the handler's ``publish`` with the given transaction, this object's
    path and its message.
    """
    log_format = "Publishing the VcsComponent %s, path = %s comp = %s"
    self._log.debug(log_format, self.name, self.path, self.msg)

    publisher = self._vcs_handler
    yield from publisher.publish(xact, self.path, self.msg)
252 def start(self
, xact
, parent
, ip_addr
=None):
253 """ Starts this VCS component """
254 # ATTN RV - replace with block add
255 start_msg
= self
.get_start_comp_msg(ip_addr
)
256 self
._log
.debug("starting component %s %s",
257 self
.start_comp_xpath
, start_msg
)
258 yield from self
._dts
.query_create(self
.start_comp_xpath
,
261 self
._log
.debug("started component %s, %s",
262 self
.start_comp_xpath
, start_msg
)
265 class VirtualDeploymentUnitRecord(object):
266 """ Virtual Deployment Unit Record """
278 placement_groups
=[]):
284 self
._mgmt
_intf
= mgmt_intf
285 self
._cloud
_account
_name
= cloud_account_name
286 self
._vnfd
_package
_store
= vnfd_package_store
287 self
._mgmt
_network
= mgmt_network
289 self
._vdur
_id
= vdur_id
or str(uuid
.uuid4())
292 self
._state
= VDURecordState
.INIT
293 self
._state
_failed
_reason
= None
294 self
._request
_id
= str(uuid
.uuid4())
295 self
._name
= vnfr
.name
+ "__" + vdud
.id
296 self
._placement
_groups
= placement_groups
299 self
._vdud
_cloud
_init
= None
300 self
._vdur
_console
_handler
= VnfrConsoleOperdataDtsHandler(dts
, log
, loop
, self
._vnfr
._vnfm
, self
._vnfr
.vnfr_id
, self
._vdur
_id
,self
.vdu_id
)
def vdu_opdata_register(self):
    """Register the VDUR console handler by delegating to its ``register``."""
    console_handler = self._vdur_console_handler
    yield from console_handler.register()
306 def cp_ip_addr(self
, cp_name
):
307 """ Find ip address by connection point name """
308 if self
._vm
_resp
is not None:
309 for conn_point
in self
._vm
_resp
.connection_points
:
310 if conn_point
.name
== cp_name
:
311 return conn_point
.ip_address
314 def cp_id(self
, cp_name
):
315 """ Find connection point id by connection point name """
316 if self
._vm
_resp
is not None:
317 for conn_point
in self
._vm
_resp
.connection_points
:
318 if conn_point
.name
== cp_name
:
319 return conn_point
.connection_point_id
332 """ Return this VDUR's name """
def cloud_account_name(self):
    """Return the name of the cloud account this VDU should be created in."""
    account_name = self._cloud_account_name
    return account_name
341 def image_name(self
):
342 """ name that should be used to lookup the image on the CMP """
343 if 'image' not in self
._vdud
:
345 return os
.path
.basename(self
._vdud
.image
)
def image_checksum(self):
    """Return the VDU descriptor's image checksum.

    Returns None when the descriptor has no ``image_checksum`` field set.
    """
    if self._vdud.has_field("image_checksum"):
        return self._vdud.image_checksum
    return None
353 def management_ip(self
):
356 return self
._vm
_resp
.public_ip
if self
._vm
_resp
.has_field('public_ip') else self
._vm
_resp
.management_ip
359 def vm_management_ip(self
):
362 return self
._vm
_resp
.management_ip
365 def operational_status(self
):
366 """ Operational status of this VDU"""
367 op_stats_dict
= {"INIT": "init",
368 "INSTANTIATING": "vm_init_phase",
369 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
372 "TERMINATING": "terminated",
373 "TERMINATED": "terminated",
375 return op_stats_dict
[self
._state
.name
]
380 vdu_fields
= ["vm_flavor",
387 vdu_copy_dict
= {k
: v
for k
, v
in
388 self
._vdud
.as_dict().items() if k
in vdu_fields
}
389 vdur_dict
= {"id": self
._vdur
_id
,
390 "vdu_id_ref": self
._vdud
.id,
391 "operational_status": self
.operational_status
,
392 "operational_status_details": self
._state
_failed
_reason
,
394 if self
.vm_resp
is not None:
395 vdur_dict
.update({"vim_id": self
.vm_resp
.vdu_id
,
396 "flavor_id": self
.vm_resp
.flavor_id
398 if self
._vm
_resp
.has_field('image_id'):
399 vdur_dict
.update({ "image_id": self
.vm_resp
.image_id
})
401 if self
.management_ip
is not None:
402 vdur_dict
["management_ip"] = self
.management_ip
404 if self
.vm_management_ip
is not None:
405 vdur_dict
["vm_management_ip"] = self
.vm_management_ip
407 vdur_dict
.update(vdu_copy_dict
)
409 if self
.vm_resp
is not None:
410 if self
._vm
_resp
.has_field('volumes'):
411 for opvolume
in self
._vm
_resp
.volumes
:
412 vdurvol_data
= [vduvol
for vduvol
in vdur_dict
['volumes'] if vduvol
['name'] == opvolume
.name
]
413 if len(vdurvol_data
) == 1:
414 vdurvol_data
[0]["volume_id"] = opvolume
.volume_id
419 for intf
, cp_id
, vlr
in self
._int
_intf
:
420 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
422 icp_list
.append({"name": cp
.name
,
424 "type_yang": "VPORT",
425 "ip_address": self
.cp_ip_addr(cp
.name
)})
427 ii_list
.append({"name": intf
.name
,
428 "vdur_internal_connection_point_ref": cp
.id,
429 "virtual_interface": {}})
431 vdur_dict
["internal_connection_point"] = icp_list
432 self
._log
.debug("internal_connection_point:%s", vdur_dict
["internal_connection_point"])
433 vdur_dict
["internal_interface"] = ii_list
436 for intf
, cp
, vlr
in self
._ext
_intf
:
437 ei_list
.append({"name": cp
.name
,
438 "vnfd_connection_point_ref": cp
.name
,
439 "virtual_interface": {}})
440 self
._vnfr
.update_cp(cp
.name
, self
.cp_ip_addr(cp
.name
), self
.cp_id(cp
.name
))
442 vdur_dict
["external_interface"] = ei_list
444 placement_groups
= []
445 for group
in self
._placement
_groups
:
446 placement_groups
.append(group
.as_dict())
447 vdur_dict
['placement_groups_info'] = placement_groups
449 return RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur
.from_dict(vdur_dict
)
452 def resmgr_path(self
):
453 """ path for resource-mgr"""
454 return ("D,/rw-resource-mgr:resource-mgmt" +
456 "/vdu-event-data[event-id='{}']".format(self
._request
_id
))
459 def vm_flavor_msg(self
):
460 """ VM flavor message """
461 flavor
= self
._vdud
.vm_flavor
.__class
__()
462 flavor
.copy_from(self
._vdud
.vm_flavor
)
467 def vdud_cloud_init(self
):
468 """ Return the cloud-init contents for the VDU """
469 if self
._vdud
_cloud
_init
is None:
470 self
._vdud
_cloud
_init
= self
.cloud_init()
472 return self
._vdud
_cloud
_init
474 def cloud_init(self
):
475 """ Populate cloud_init with cloud-config script from
476 either the inline contents or from the file provided
478 if self
._vdud
.cloud_init
is not None:
479 self
._log
.debug("cloud_init script provided inline %s", self
._vdud
.cloud_init
)
480 return self
._vdud
.cloud_init
481 elif self
._vdud
.cloud_init_file
is not None:
482 # Get cloud-init script contents from the file provided in the cloud_init_file param
483 self
._log
.debug("cloud_init script provided in file %s", self
._vdud
.cloud_init_file
)
484 filename
= self
._vdud
.cloud_init_file
485 self
._vnfd
_package
_store
.refresh()
486 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
487 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
489 return cloud_init_extractor
.read_script(stored_package
, filename
)
490 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
491 raise VirtualDeploymentUnitRecordError(e
)
493 self
._log
.debug("VDU Instantiation: cloud-init script not provided")
495 def process_openstack_placement_group_construct(self
, vm_create_msg_dict
):
497 availability_zones
= []
499 for group
in self
._placement
_groups
:
500 if group
.has_field('host_aggregate'):
501 for aggregate
in group
.host_aggregate
:
502 host_aggregates
.append(aggregate
.as_dict())
503 if group
.has_field('availability_zone'):
504 availability_zones
.append(group
.availability_zone
.as_dict())
505 if group
.has_field('server_group'):
506 server_groups
.append(group
.server_group
.as_dict())
508 if availability_zones
:
509 if len(availability_zones
) > 1:
510 self
._log
.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self
.name
, availability_zones
)
511 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self
.name
, availability_zones
))
513 vm_create_msg_dict
['availability_zone'] = availability_zones
[0]
516 if len(server_groups
) > 1:
517 self
._log
.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self
.name
, server_groups
)
518 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self
.name
, server_groups
))
520 vm_create_msg_dict
['server_group'] = server_groups
[0]
523 vm_create_msg_dict
['host_aggregate'] = host_aggregates
527 def process_placement_groups(self
, vm_create_msg_dict
):
528 """Process the placement_groups and fill resource-mgr request"""
529 if not self
._placement
_groups
:
532 cloud_set
= set([group
.cloud_type
for group
in self
._placement
_groups
])
533 assert len(cloud_set
) == 1
534 cloud_type
= cloud_set
.pop()
536 if cloud_type
== 'openstack':
537 self
.process_openstack_placement_group_construct(vm_create_msg_dict
)
540 self
._log
.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type
)
543 def resmgr_msg(self
, config
=None):
544 vdu_fields
= ["vm_flavor",
550 self
._log
.debug("Creating params based on VDUD: %s", self
._vdud
)
551 vdu_copy_dict
= {k
: v
for k
, v
in self
._vdud
.as_dict().items() if k
in vdu_fields
}
553 vm_create_msg_dict
= {
557 if self
.image_name
is not None:
558 vm_create_msg_dict
["image_name"] = self
.image_name
560 if self
.image_checksum
is not None:
561 vm_create_msg_dict
["image_checksum"] = self
.image_checksum
563 vm_create_msg_dict
["allocate_public_address"] = self
._mgmt
_intf
564 if self
._vdud
.has_field('mgmt_vpci'):
565 vm_create_msg_dict
["mgmt_vpci"] = self
._vdud
.mgmt_vpci
567 self
._log
.debug("VDUD: %s", self
._vdud
)
568 if config
is not None:
569 vm_create_msg_dict
['vdu_init'] = {'userdata': config
}
571 if self
._mgmt
_network
:
572 vm_create_msg_dict
['mgmt_network'] = self
._mgmt
_network
575 for intf
, cp
, vlr
in self
._ext
_intf
:
576 cp_info
= {"name": cp
.name
,
577 "virtual_link_id": vlr
.network_id
,
578 "type_yang": intf
.virtual_interface
.type_yang
}
581 if cp
.static_ip_address
:
582 cp_info
["static_ip_address"] = cp
.static_ip_address
583 except AttributeError as e
:
586 if (intf
.virtual_interface
.has_field('vpci') and
587 intf
.virtual_interface
.vpci
is not None):
588 cp_info
["vpci"] = intf
.virtual_interface
.vpci
590 if (vlr
.has_field('ip_profile_params')) and (vlr
.ip_profile_params
.has_field('security_group')):
591 cp_info
['security_group'] = vlr
.ip_profile_params
.security_group
593 self
._log
.debug("External CP info {}".format(cp_info
))
594 cp_list
.append(cp_info
)
596 for intf
, cp_id
, vlr
in self
._int
_intf
:
597 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
599 cp_dict
= {"name": cp_id
,
600 "virtual_link_id": vlr
.network_id
,
601 "type_yang": intf
.virtual_interface
.type_yang
}
603 if (intf
.virtual_interface
.has_field('vpci') and
604 intf
.virtual_interface
.vpci
is not None):
605 cp_dict
["vpci"] = intf
.virtual_interface
.vpci
608 if cp
.static_ip_address
:
609 cp_dict
["static_ip_address"] = cp
.static_ip_address
610 except AttributeError as e
:
613 self
._log
.debug("Internal CP info {}".format(cp_info
))
614 cp_list
.append(cp_dict
)
616 vm_create_msg_dict
["connection_points"] = cp_list
617 vm_create_msg_dict
.update(vdu_copy_dict
)
619 self
.process_placement_groups(vm_create_msg_dict
)
621 msg
= RwResourceMgrYang
.VDUEventData()
622 msg
.event_id
= self
._request
_id
623 msg
.cloud_account
= self
.cloud_account_name
624 msg
.request_info
.from_dict(vm_create_msg_dict
)
626 for volume
in self
._vdud
.volumes
:
627 v
= msg
.request_info
.volumes
.add()
628 v
.from_dict(volume
.as_dict())
632 def terminate(self
, xact
):
633 """ Delete resource in VIM """
634 if self
._state
!= VDURecordState
.READY
and self
._state
!= VDURecordState
.FAILED
:
635 self
._log
.warning("VDU terminate in not ready state - Ignoring request")
638 self
._state
= VDURecordState
.TERMINATING
639 if self
._vm
_resp
is not None:
641 with self
._dts
.transaction() as new_xact
:
642 yield from self
.delete_resource(new_xact
)
644 self
._log
.exception("Caught exception while deleting VDU %s", self
.vdu_id
)
646 if self
._rm
_regh
is not None:
647 self
._log
.debug("Deregistering resource manager registration handle")
648 self
._rm
_regh
.deregister()
651 if self
._vdur
_console
_handler
is not None:
652 self
._log
.error("Deregistering vnfr vdur registration handle")
653 self
._vdur
_console
_handler
._regh
.deregister()
654 self
._vdur
_console
_handler
._regh
= None
656 self
._state
= VDURecordState
.TERMINATED
658 def find_internal_cp_by_cp_id(self
, cp_id
):
659 """ Find the CP corresponding to the connection point id"""
662 self
._log
.debug("find_internal_cp_by_cp_id(%s) called",
665 for int_cp
in self
._vdud
.internal_connection_point
:
666 self
._log
.debug("Checking for int cp %s in internal connection points",
668 if int_cp
.id == cp_id
:
673 self
._log
.debug("Failed to find cp %s in internal connection points",
675 msg
= "Failed to find cp %s in internal connection points" % cp_id
676 raise VduRecordError(msg
)
678 # return the VLR associated with the connection point
682 def create_resource(self
, xact
, vnfr
, config
=None):
683 """ Request resource from ResourceMgr """
684 def find_cp_by_name(cp_name
):
685 """ Find a connection point by name """
687 self
._log
.debug("find_cp_by_name(%s) called", cp_name
)
688 for ext_cp
in vnfr
._cprs
:
689 self
._log
.debug("Checking ext cp (%s) called", ext_cp
.name
)
690 if ext_cp
.name
== cp_name
:
694 self
._log
.debug("Failed to find cp %s in external connection points",
698 def find_internal_vlr_by_cp_id(cp_id
):
699 self
._log
.debug("find_internal_vlr_by_cp_id(%s) called",
703 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
705 # return the VLR associated with the connection point
706 return vnfr
.find_vlr_by_cp(cp_id
)
708 block
= xact
.block_create()
710 self
._log
.debug("Executing vm request id: %s, action: create",
713 # Resolve the networks associated external interfaces
714 for ext_intf
in self
._vdud
.external_interface
:
715 self
._log
.debug("Resolving external interface name [%s], cp[%s]",
716 ext_intf
.name
, ext_intf
.vnfd_connection_point_ref
)
717 cp
= find_cp_by_name(ext_intf
.vnfd_connection_point_ref
)
719 self
._log
.debug("Failed to find connection point - %s",
720 ext_intf
.vnfd_connection_point_ref
)
722 self
._log
.debug("Connection point name [%s], type[%s]",
723 cp
.name
, cp
.type_yang
)
725 vlr
= vnfr
.ext_vlr_by_id(cp
.vlr_ref
)
727 etuple
= (ext_intf
, cp
, vlr
)
728 self
._ext
_intf
.append(etuple
)
730 self
._log
.debug("Created external interface tuple : %s", etuple
)
732 # Resolve the networks associated internal interfaces
733 for intf
in self
._vdud
.internal_interface
:
734 cp_id
= intf
.vdu_internal_connection_point_ref
735 self
._log
.debug("Resolving internal interface name [%s], cp[%s]",
739 vlr
= find_internal_vlr_by_cp_id(cp_id
)
740 except Exception as e
:
741 self
._log
.debug("Failed to find cp %s in internal VLR list", cp_id
)
742 msg
= "Failed to find cp %s in internal VLR list, e = %s" % (cp_id
, e
)
743 raise VduRecordError(msg
)
745 ituple
= (intf
, cp_id
, vlr
)
746 self
._int
_intf
.append(ituple
)
748 self
._log
.debug("Created internal interface tuple : %s", ituple
)
750 resmgr_path
= self
.resmgr_path
751 resmgr_msg
= self
.resmgr_msg(config
)
753 self
._log
.debug("Creating new VM request at: %s, params: %s", resmgr_path
, resmgr_msg
)
754 block
.add_query_create(resmgr_path
, resmgr_msg
)
756 res_iter
= yield from block
.execute(now
=True)
764 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
765 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
766 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
767 return resp
.resource_info
770 def delete_resource(self
, xact
):
771 block
= xact
.block_create()
773 self
._log
.debug("Executing vm request id: %s, action: delete",
776 block
.add_query_delete(self
.resmgr_path
)
778 yield from block
.execute(flags
=0, now
=True)
781 def read_resource(self
, xact
):
782 block
= xact
.block_create()
784 self
._log
.debug("Executing vm request id: %s, action: delete",
787 block
.add_query_read(self
.resmgr_path
)
789 res_iter
= yield from block
.execute(flags
=0, now
=True)
794 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
795 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
796 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
797 #self._vm_resp = resp.resource_info
798 return resp
.resource_info
802 def start_component(self
):
803 """ This VDUR is active """
804 self
._log
.debug("Starting component %s for vdud %s vdur %s",
805 self
._vdud
.vcs_component_ref
,
808 yield from self
._vnfr
.start_component(self
._vdud
.vcs_component_ref
,
809 self
.vm_resp
.management_ip
)
813 """ Is this VDU active """
814 return True if self
._state
is VDURecordState
.READY
else False
def instantiation_failed(self, failed_reason=None):
    """Record that instantiation of this VDU failed.

    Moves the record to ``VDURecordState.FAILED``, stores ``failed_reason``
    and then forwards the failure to the owning VNFR.
    """
    self._log.debug("VDU %s instantiation failed ", self._vdur_id)
    self._state, self._state_failed_reason = VDURecordState.FAILED, failed_reason

    owner = self._vnfr
    yield from owner.instantiation_failed(failed_reason)
825 def vdu_is_active(self
):
826 """ This VDU is active"""
828 self
._log
.warning("VDU %s was already marked as active", self
._vdur
_id
)
831 self
._log
.debug("VDUR id %s in VNFR %s is active", self
._vdur
_id
, self
._vnfr
.vnfr_id
)
833 if self
._vdud
.vcs_component_ref
is not None:
834 yield from self
.start_component()
836 self
._state
= VDURecordState
.READY
838 if self
._vnfr
.all_vdus_active():
839 self
._log
.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self
._vnfr
)
840 yield from self
._vnfr
.is_ready()
843 def instantiate(self
, xact
, vnfr
, config
=None):
844 """ Instantiate this VDU """
845 self
._state
= VDURecordState
.INSTANTIATING
848 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
849 """ This VDUR is active """
850 self
._log
.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
855 if (query_action
== rwdts
.QueryAction
.UPDATE
or
856 query_action
== rwdts
.QueryAction
.CREATE
):
859 if msg
.resource_state
== "active":
860 # Move this VDU to ready state
861 yield from self
.vdu_is_active()
862 elif msg
.resource_state
== "failed":
863 yield from self
.instantiation_failed(msg
.resource_errors
)
864 elif query_action
== rwdts
.QueryAction
.DELETE
:
865 self
._log
.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
867 raise NotImplementedError(
868 "%s action on VirtualDeployementUnitRecord not supported",
871 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
874 reg_event
= asyncio
.Event(loop
=self
._loop
)
877 def on_ready(regh
, status
):
880 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
, on_ready
=on_ready
)
881 self
._rm
_regh
= yield from self
._dts
.register(self
.resmgr_path
+ '/resource-info',
882 flags
=rwdts
.Flag
.SUBSCRIBER
,
884 yield from reg_event
.wait()
886 vm_resp
= yield from self
.create_resource(xact
, vnfr
, config
)
887 self
._vm
_resp
= vm_resp
889 self
._state
= VDURecordState
.RESOURCE_ALLOC_PENDING
890 self
._log
.debug("Requested VM from resource manager response %s",
892 if vm_resp
.resource_state
== "active":
893 self
._log
.debug("Resourcemgr responded wih an active vm resp %s",
895 yield from self
.vdu_is_active()
896 self
._state
= VDURecordState
.READY
897 elif (vm_resp
.resource_state
== "pending" or
898 vm_resp
.resource_state
== "inactive"):
899 self
._log
.debug("Resourcemgr responded wih a pending vm resp %s",
901 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
902 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
903 # flags=rwdts.Flag.SUBSCRIBER,
906 self
._log
.debug("Resourcemgr responded wih an error vm resp %s",
908 raise VirtualDeploymentUnitRecordError(
909 "Failed VDUR instantiation %s " % vm_resp
)
911 except Exception as e
:
913 traceback
.print_exc()
914 self
._log
.exception(e
)
915 self
._log
.error("Instantiation of VDU record failed: %s", str(e
))
916 self
._state
= VDURecordState
.FAILED
917 yield from self
.instantiation_failed(str(e
))
920 class VlRecordState(enum
.Enum
):
921 """ VL Record State """
923 INSTANTIATION_PENDING
= 102
925 TERMINATE_PENDING
= 104
930 class InternalVirtualLinkRecord(object):
931 """ Internal Virtual Link record """
932 def __init__(self
, dts
, log
, loop
, ivld_msg
, vnfr_name
, cloud_account_name
):
936 self
._ivld
_msg
= ivld_msg
937 self
._vnfr
_name
= vnfr_name
938 self
._cloud
_account
_name
= cloud_account_name
940 self
._vlr
_req
= self
.create_vlr()
942 self
._state
= VlRecordState
.INIT
946 """ Find VLR by id """
947 return self
._vlr
_req
.id
951 """ Name of this VL """
952 return self
._vnfr
_name
+ "." + self
._ivld
_msg
.name
def network_id(self):
    """Return the network id of the stored VLR, or None when no VLR is set."""
    if not self._vlr:
        return None
    return self._vlr.network_id
960 """ VLR path for this VLR instance"""
961 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self
.vlr_id
)
963 def create_vlr(self
):
964 """ Create the VLR record which will be instantiated """
966 vld_fields
= ["short_name",
973 vld_copy_dict
= {k
: v
for k
, v
in self
._ivld
_msg
.as_dict().items() if k
in vld_fields
}
975 vlr_dict
= {"id": str(uuid
.uuid4()),
977 "cloud_account": self
._cloud
_account
_name
,
979 vlr_dict
.update(vld_copy_dict
)
981 vlr
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict(vlr_dict
)
985 def instantiate(self
, xact
, restart_mode
=False):
986 """ Instantiate VL """
989 def instantiate_vlr():
990 """ Instantiate VLR"""
991 self
._log
.debug("Create VL with xpath %s and vlr %s",
992 self
.vlr_path(), self
._vlr
_req
)
994 with self
._dts
.transaction(flags
=0) as xact
:
995 block
= xact
.block_create()
996 block
.add_query_create(xpath
=self
.vlr_path(), msg
=self
._vlr
_req
)
997 self
._log
.debug("Executing VL create path:%s msg:%s",
998 self
.vlr_path(), self
._vlr
_req
)
1002 res_iter
= yield from block
.execute()
1004 self
._state
= VlRecordState
.FAILED
1005 self
._log
.exception("Caught exception while instantial VL")
1008 for ent
in res_iter
:
1009 res
= yield from ent
1010 self
._vlr
= res
.result
1012 if self
._vlr
.operational_status
== 'failed':
1013 self
._log
.debug("VL creation failed for vlr id %s", self
._vlr
.id)
1014 self
._state
= VlRecordState
.FAILED
1015 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self
._vlr
.id))
1017 self
._log
.info("Created VL with xpath %s and vlr %s",
1018 self
.vlr_path(), self
._vlr
)
1022 """ Get the network id """
1023 res_iter
= yield from self
._dts
.query_read(self
.vlr_path(), rwdts
.XactFlag
.MERGE
)
1025 for ent
in res_iter
:
1026 res
= yield from ent
1030 err
= "Failed to get VLR for path %s" % self
.vlr_path()
1032 raise InternalVirtualLinkRecordError(err
)
1035 self
._state
= VlRecordState
.INSTANTIATION_PENDING
1038 vl
= yield from get_vlr()
1040 yield from instantiate_vlr()
1042 yield from instantiate_vlr()
1044 self
._state
= VlRecordState
.ACTIVE
1046 def vlr_in_vns(self
):
1047 """ Is there a VLR record in VNS """
1048 if (self
._state
== VlRecordState
.ACTIVE
or
1049 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
1050 self
._state
== VlRecordState
.FAILED
):
1056 def terminate(self
, xact
):
1057 """Terminate this VL """
1058 if not self
.vlr_in_vns():
1059 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1060 self
.vlr_id
, self
._state
)
1063 self
._log
.debug("Terminating VL with path %s", self
.vlr_path())
1064 self
._state
= VlRecordState
.TERMINATE_PENDING
1065 block
= xact
.block_create()
1066 block
.add_query_delete(self
.vlr_path())
1067 yield from block
.execute(flags
=0, now
=True)
1068 self
._state
= VlRecordState
.TERMINATED
1069 self
._log
.debug("Terminated VL with path %s", self
.vlr_path())
1072 class VirtualNetworkFunctionRecord(object):
1073 """ Virtual Network Function Record """
1074 def __init__(self
, dts
, log
, loop
, cluster_name
, vnfm
, vcs_handler
, vnfr_msg
, mgmt_network
=None):
1078 self
._cluster
_name
= cluster_name
1079 self
._vnfr
_msg
= vnfr_msg
1080 self
._vnfr
_id
= vnfr_msg
.id
1081 self
._vnfd
_id
= vnfr_msg
.vnfd
.id
1083 self
._vcs
_handler
= vcs_handler
1084 self
._vnfr
= vnfr_msg
1085 self
._mgmt
_network
= mgmt_network
1087 self
._vnfd
= vnfr_msg
.vnfd
1088 self
._state
= VirtualNetworkFunctionRecordState
.INIT
1089 self
._state
_failed
_reason
= None
1090 self
._ext
_vlrs
= {} # The list of external virtual links
1091 self
._vlrs
= [] # The list of internal virtual links
1092 self
._vdus
= [] # The list of vdu
1093 self
._vlr
_by
_cp
= {}
1095 self
._inventory
= {}
1096 self
._create
_time
= int(time
.time())
1097 self
._vnf
_mon
= None
1098 self
._config
_status
= vnfr_msg
.config_status
1099 self
._vnfd
_package
_store
= rift
.package
.store
.VnfdPackageFilesystemStore(self
._log
)
1100 self
._rw
_vnfd
= None
1101 self
._vnfd
_ref
_count
= 0
1103 def _get_vdur_from_vdu_id(self
, vdu_id
):
1104 self
._log
.debug("Finding vdur for vdu_id %s", vdu_id
)
1105 self
._log
.debug("Searching through vdus: %s", self
._vdus
)
1106 for vdu
in self
._vdus
:
1107 self
._log
.debug("vdu_id: %s", vdu
.vdu_id
)
1108 if vdu
.vdu_id
== vdu_id
:
1111 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id
)
1114 def operational_status(self
):
1115 """ Operational status of this VNFR """
1116 op_status_map
= {"INIT": "init",
1117 "VL_INIT_PHASE": "vl_init_phase",
1118 "VM_INIT_PHASE": "vm_init_phase",
1120 "TERMINATE": "terminate",
1121 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1122 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1123 "TERMINATED": "terminated",
1124 "FAILED": "failed", }
1125 return op_status_map
[self
._state
.name
]
def vnfd_xpath(vnfd_id):
    """Return the VNFD catalog xpath keyed by the given VNFD id."""
    xpath_template = "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']"
    return xpath_template.format(vnfd_id)
def vnfd_ref_count(self):
    """Return the current VNFD reference count held by this VNFR."""
    ref_count = self._vnfd_ref_count
    return ref_count
def vnfd_in_use(self):
    """Return True when the VNFD reference count is above zero, else False."""
    if self._vnfd_ref_count > 0:
        return True
    return False
1142 """ Take a reference on this object """
1143 self
._vnfd
_ref
_count
+= 1
1144 return self
._vnfd
_ref
_count
def vnfd_unref(self):
    """Release one reference on the VNFD and return the remaining count.

    Raises:
        VnfRecordError: if the reference count is already below one; the
            condition is logged at critical level first.
    """
    if self._vnfd_ref_count >= 1:
        self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
                        self.vnfd.id, self._vnfd_ref_count)
        self._vnfd_ref_count -= 1
        return self._vnfd_ref_count

    # Nothing left to release: report and refuse.
    msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
           (self.vnfd.id, self._vnfd_ref_count))
    self._log.critical(msg)
    raise VnfRecordError(msg)
1160 """ VNFD for this VNFR """
1165 """ VNFD name associated with this VNFR """
1166 return self
.vnfd
.name
1170 """ Name of this VNF in the record """
1171 return self
._vnfr
.name
def cloud_account_name(self):
    """Return the cloud account recorded on this VNFR's message."""
    vnfr_msg = self._vnfr
    return vnfr_msg.cloud_account
1180 """ VNFD Id associated with this VNFR """
1185 """ VNFR Id associated with this VNFR """
1186 return self
._vnfr
_id
def member_vnf_index(self):
    """ Member VNF index reference recorded on the VNFR """
    vnfr = self._vnfr
    return vnfr.member_vnf_index_ref
def config_status(self):
    """ Configuration status currently stored for this VNFR """
    status = self._config_status
    return status
1198 def component_by_name(self
, component_name
):
1199 """ Find a component by name in the inventory list"""
1200 mangled_name
= VcsComponent
.mangle_name(component_name
,
1203 return self
._inventory
[mangled_name
]
1208 def get_nsr_config(self
):
1209 ### Need access to NS instance configuration for runtime resolution.
1210 ### This shall be replaced when deployment flavors are implemented
1211 xpath
= "C,/nsr:ns-instance-config"
1212 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
1214 for result
in results
:
1215 entry
= yield from result
1216 ns_instance_config
= entry
.result
1217 for nsr
in ns_instance_config
.nsr
:
1218 if nsr
.id == self
._vnfr
_msg
.nsr_id_ref
:
def start_component(self, component_name, ip_addr):
    """ Look up a component of this VNFR by name and start it

    The component's start() is delegated to with ``yield from``, passing
    None for its first two arguments and ip_addr as the third.
    """
    yield from self.component_by_name(component_name).start(None, None, ip_addr)
1228 def cp_ip_addr(self
, cp_name
):
1229 """ Get ip address for connection point """
1230 self
._log
.debug("cp_ip_addr()")
1231 for cp
in self
._cprs
:
1232 if cp
.name
== cp_name
and cp
.ip_address
is not None:
1233 return cp
.ip_address
1236 def mgmt_intf_info(self
):
1237 """ Get Management interface info for this VNFR """
1238 mgmt_intf_desc
= self
.vnfd
.mgmt_interface
1240 if mgmt_intf_desc
.has_field("cp"):
1241 ip_addr
= self
.cp_ip_addr(mgmt_intf_desc
.cp
)
1242 elif mgmt_intf_desc
.has_field("vdu_id"):
1244 vdur
= self
._get
_vdur
_from
_vdu
_id
(mgmt_intf_desc
.vdu_id
)
1245 ip_addr
= vdur
.management_ip
1246 except VDURecordNotFound
:
1247 self
._log
.debug("Did not find mgmt interface for vnfr id %s", self
._vnfr
_id
)
1250 ip_addr
= mgmt_intf_desc
.ip_address
1251 port
= mgmt_intf_desc
.port
1253 return ip_addr
, port
1257 """ Message associated with this VNFR """
1258 vnfd_fields
= ["short_name", "vendor", "description", "version"]
1259 vnfd_copy_dict
= {k
: v
for k
, v
in self
.vnfd
.as_dict().items() if k
in vnfd_fields
}
1261 mgmt_intf
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1262 ip_address
, port
= self
.mgmt_intf_info()
1264 if ip_address
is not None:
1265 mgmt_intf
.ip_address
= ip_address
1266 if port
is not None:
1267 mgmt_intf
.port
= port
1269 vnfr_dict
= {"id": self
._vnfr
_id
,
1270 "nsr_id_ref": self
._vnfr
_msg
.nsr_id_ref
,
1272 "member_vnf_index_ref": self
.member_vnf_index
,
1273 "operational_status": self
.operational_status
,
1274 "operational_status_details": self
._state
_failed
_reason
,
1275 "cloud_account": self
.cloud_account_name
,
1276 "config_status": self
._config
_status
1279 vnfr_dict
.update(vnfd_copy_dict
)
1281 vnfr_msg
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
1282 vnfr_msg
.vnfd
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd
.from_dict(self
.vnfd
.as_dict())
1284 vnfr_msg
.create_time
= self
._create
_time
1285 vnfr_msg
.uptime
= int(time
.time()) - self
._create
_time
1286 vnfr_msg
.mgmt_interface
= mgmt_intf
1288 # Add all the VLRs to VNFR
1289 for vlr
in self
._vlrs
:
1290 ivlr
= vnfr_msg
.internal_vlr
.add()
1291 ivlr
.vlr_ref
= vlr
.vlr_id
1293 # Add all the VDURs to VDUR
1294 if self
._vdus
is not None:
1295 for vdu
in self
._vdus
:
1296 vdur
= vnfr_msg
.vdur
.add()
1297 vdur
.from_dict(vdu
.msg
.as_dict())
1299 if self
.vnfd
.mgmt_interface
.has_field('dashboard_params'):
1300 vnfr_msg
.dashboard_url
= self
.dashboard_url
1302 for cpr
in self
._cprs
:
1303 new_cp
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr
.as_dict())
1304 vnfr_msg
.connection_point
.append(new_cp
)
1306 if self
._vnf
_mon
is not None:
1307 for monp
in self
._vnf
_mon
.msg
:
1308 vnfr_msg
.monitoring_param
.append(
1309 VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam
.from_dict(monp
.as_dict()))
1311 if self
._vnfr
.vnf_configuration
is not None:
1312 vnfr_msg
.vnf_configuration
.from_dict(self
._vnfr
.vnf_configuration
.as_dict())
1313 if (ip_address
is not None and
1314 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
is None):
1315 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
= ip_address
1317 for group
in self
._vnfr
_msg
.placement_groups_info
:
1318 group_info
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1319 group_info
.from_dict(group
.as_dict())
1320 vnfr_msg
.placement_groups_info
.append(group_info
)
1325 def dashboard_url(self
):
1326 ip
, cfg_port
= self
.mgmt_intf_info()
1329 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('https'):
1330 if self
.vnfd
.mgmt_interface
.dashboard_params
.https
is True:
1333 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('port'):
1334 http_port
= self
.vnfd
.mgmt_interface
.dashboard_params
.port
1336 url
= "{protocol}://{ip_address}:{port}/{path}".format(
1340 path
=self
.vnfd
.mgmt_interface
.dashboard_params
.path
.lstrip("/"),
1347 """ path for this VNFR """
1348 return("D,/vnfr:vnfr-catalog"
1349 "/vnfr:vnfr[vnfr:id='{}']".format(self
.vnfr_id
))
1352 def publish(self
, xact
):
1353 """ publish this VNFR """
1355 self
._log
.debug("Publishing VNFR path = [%s], record = [%s]",
1356 self
.xpath
, self
.msg
)
1357 vnfr
.create_time
= self
._create
_time
1358 yield from self
._vnfm
.publish_vnfr(xact
, self
.xpath
, self
.msg
)
1359 self
._log
.debug("Published VNFR path = [%s], record = [%s]",
1360 self
.xpath
, self
.msg
)
1363 def create_vls(self
):
1364 """ Publish The VLs associated with this VNF """
1365 self
._log
.debug("Publishing Internal Virtual Links for vnfd id: %s",
1367 for ivld_msg
in self
.vnfd
.internal_vld
:
1368 self
._log
.debug("Creating internal vld:"
1369 " %s, int_cp_ref = %s",
1370 ivld_msg
, ivld_msg
.internal_connection_point
1372 vlr
= InternalVirtualLinkRecord(dts
=self
._dts
,
1376 vnfr_name
=self
.name
,
1377 cloud_account_name
=self
.cloud_account_name
1379 self
._vlrs
.append(vlr
)
1381 for int_cp
in ivld_msg
.internal_connection_point
:
1382 if int_cp
.id_ref
in self
._vlr
_by
_cp
:
1383 msg
= ("Connection point %s already "
1384 " bound %s" % (int_cp
.id_ref
, self
._vlr
_by
_cp
[int_cp
.id_ref
]))
1385 raise InternalVirtualLinkRecordError(msg
)
1386 self
._log
.debug("Setting vlr %s to internal cp = %s",
1388 self
._vlr
_by
_cp
[int_cp
.id_ref
] = vlr
1391 def instantiate_vls(self
, xact
, restart_mode
=False):
1392 """ Instantiate the VLs associated with this VNF """
1393 self
._log
.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1396 for vlr
in self
._vlrs
:
1397 self
._log
.debug("Instantiating VLR %s", vlr
)
1398 yield from vlr
.instantiate(xact
, restart_mode
)
def find_vlr_by_cp(self, cp_name):
    """ Return the VLR bound to the named connection point

    Raises KeyError when no VLR is recorded for cp_name.
    """
    bound_vlrs = self._vlr_by_cp
    return bound_vlrs[cp_name]
1404 def resolve_placement_group_cloud_construct(self
, input_group
, nsr_config
):
1406 Returns the cloud specific construct for placement group
1408 input_group: VNFD PlacementGroup
1409 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1411 copy_dict
= ['name', 'requirement', 'strategy']
1412 for group_info
in nsr_config
.vnfd_placement_group_maps
:
1413 if group_info
.placement_group_ref
== input_group
.name
and \
1414 group_info
.vnfd_id_ref
== self
.vnfd_id
:
1415 group
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1416 group_dict
= {k
:v
for k
,v
in
1417 group_info
.as_dict().items()
1418 if (k
!= 'placement_group_ref' and k
!='vnfd_id_ref')}
1419 for param
in copy_dict
:
1420 group_dict
.update({param
: getattr(input_group
, param
)})
1421 group
.from_dict(group_dict
)
1426 def get_vdu_placement_groups(self
, vdu
):
1427 placement_groups
= []
1428 ### Step-1: Get VNF level placement groups
1429 for group
in self
._vnfr
_msg
.placement_groups_info
:
1430 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1431 #group_info.from_dict(group.as_dict())
1432 placement_groups
.append(group
)
1434 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1435 nsr_config
= yield from self
.get_nsr_config()
1437 ### Step-3: Get VDU level placement groups
1438 for group
in self
.vnfd
.placement_groups
:
1439 for member_vdu
in group
.member_vdus
:
1440 if member_vdu
.member_vdu_ref
== vdu
.id:
1441 group_info
= self
.resolve_placement_group_cloud_construct(group
,
1443 if group_info
is None:
1444 self
._log
.info("Could not resolve cloud-construct for placement group: %s", group
.name
)
1445 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1447 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1451 self
.member_vnf_index
)
1452 placement_groups
.append(group_info
)
1454 return placement_groups
1457 def create_vdus(self
, vnfr
, restart_mode
=False):
1458 """ Create the VDUs associated with this VNF """
1460 def get_vdur_id(vdud
):
1461 """Get the corresponding VDUR's id for the VDUD. This is useful in
1464 In restart mode we check for exiting VDUR's ID and use them, if
1465 available. This way we don't end up creating duplicate VDURs
1469 if restart_mode
and vdud
is not None:
1471 vdur
= [vdur
.id for vdur
in vnfr
._vnfr
.vdur
if vdur
.vdu_id_ref
== vdud
.id]
1474 self
._log
.error("Unable to find a VDUR for VDUD {}".format(vdud
))
1479 self
._log
.info("Creating VDU's for vnfd id: %s", self
.vnfd_id
)
1480 for vdu
in self
._rw
_vnfd
.vdu
:
1481 self
._log
.debug("Creating vdu: %s", vdu
)
1482 vdur_id
= get_vdur_id(vdu
)
1484 placement_groups
= yield from self
.get_vdu_placement_groups(vdu
)
1485 self
._log
.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1488 self
.member_vnf_index
,
1489 [ group
.name
for group
in placement_groups
])
1491 vdur
= VirtualDeploymentUnitRecord(
1497 mgmt_intf
=self
.has_mgmt_interface(vdu
),
1498 mgmt_network
=self
._mgmt
_network
,
1499 cloud_account_name
=self
.cloud_account_name
,
1500 vnfd_package_store
=self
._vnfd
_package
_store
,
1502 placement_groups
= placement_groups
,
1504 yield from vdur
.vdu_opdata_register()
1506 self
._vdus
.append(vdur
)
1509 def instantiate_vdus(self
, xact
, vnfr
):
1510 """ Instantiate the VDUs associated with this VNF """
1511 self
._log
.debug("Instantiating VDU's for vnfd id %s: %s", self
.vnfd_id
, self
._vdus
)
1513 lookup
= {vdu
.vdu_id
: vdu
for vdu
in self
._vdus
}
1515 # Identify any dependencies among the VDUs
1516 dependencies
= collections
.defaultdict(list)
1517 vdu_id_pattern
= re
.compile(r
"\{\{ vdu\[([^]]+)\]\S* \}\}")
1519 for vdu
in self
._vdus
:
1520 if vdu
.vdud_cloud_init
is not None:
1521 for vdu_id
in vdu_id_pattern
.findall(vdu
.vdud_cloud_init
):
1522 if vdu_id
!= vdu
.vdu_id
:
1523 # This means that vdu.vdu_id depends upon vdu_id,
1524 # i.e. vdu_id must be instantiated before
1526 dependencies
[vdu
.vdu_id
].append(lookup
[vdu_id
])
1528 # Define the terminal states of VDU instantiation
1530 VDURecordState
.READY
,
1531 VDURecordState
.TERMINATED
,
1532 VDURecordState
.FAILED
,
1535 datastore
= VdurDatastore()
1539 def instantiate_monitor(vdu
):
1540 """Monitor the state of the VDU during instantiation
1543 vdu - a VirtualDeploymentUnitRecord
1546 # wait for the VDUR to enter a terminal state
1547 while vdu
._state
not in terminal
:
1548 yield from asyncio
.sleep(1, loop
=self
._loop
)
1550 # update the datastore
1551 datastore
.update(vdu
)
1553 # add the VDU to the set of processed VDUs
1554 processed
.add(vdu
.vdu_id
)
1557 def instantiate(vdu
):
1558 """Instantiate the specified VDU
1561 vdu - a VirtualDeploymentUnitRecord
1564 if the VDU, or any of the VDUs this VDU depends upon, are
1565 terminated or fail to instantiate properly, a
1566 VirtualDeploymentUnitRecordError is raised.
1569 for dependency
in dependencies
[vdu
.vdu_id
]:
1570 self
._log
.debug("{}: waiting for {}".format(vdu
.vdu_id
, dependency
.vdu_id
))
1572 while dependency
.vdu_id
not in processed
:
1573 yield from asyncio
.sleep(1, loop
=self
._loop
)
1575 if not dependency
.active
:
1576 raise VirtualDeploymentUnitRecordError()
1578 self
._log
.debug('instantiating {}'.format(vdu
.vdu_id
))
1580 # Populate the datastore with the current values of the VDU
1583 # Substitute any variables contained in the cloud config script
1584 config
= str(vdu
.vdud_cloud_init
) if vdu
.vdud_cloud_init
is not None else ""
1586 parts
= re
.split("\{\{ ([^\}]+) \}\}", config
)
1589 # Extract the variable names
1591 for variable
in parts
[1::2]:
1592 variables
.append(variable
.lstrip('{{').rstrip('}}').strip())
1594 # Iterate of the variables and substitute values from the
1596 for variable
in variables
:
1598 # Handle a reference to a VDU by ID
1599 if variable
.startswith('vdu['):
1600 value
= datastore
.get(variable
)
1602 msg
= "Unable to find a substitute for {} in {} cloud-init script"
1603 raise ValueError(msg
.format(variable
, vdu
.vdu_id
))
1605 config
= config
.replace("{{ %s }}" % variable
, value
)
1608 # Handle a reference to the current VDU
1609 if variable
.startswith('vdu'):
1610 value
= datastore
.get('vdu[{}]'.format(vdu
.vdu_id
) + variable
[3:])
1611 config
= config
.replace("{{ %s }}" % variable
, value
)
1614 # Handle unrecognized variables
1615 msg
= 'unrecognized cloud-config variable: {}'
1616 raise ValueError(msg
.format(variable
))
1618 # Instantiate the VDU
1619 with self
._dts
.transaction() as xact
:
1620 self
._log
.debug("Instantiating vdu: %s", vdu
)
1621 yield from vdu
.instantiate(xact
, vnfr
, config
=config
)
1622 if self
._state
== VirtualNetworkFunctionRecordState
.FAILED
:
1623 self
._log
.error("Instatiation of VNF %s failed while instantiating vdu %s",
1626 # First create a set of tasks to monitor the state of the VDUs and
1627 # report when they have entered a terminal state
1628 for vdu
in self
._vdus
:
1629 self
._loop
.create_task(instantiate_monitor(vdu
))
1631 for vdu
in self
._vdus
:
1632 self
._loop
.create_task(instantiate(vdu
))
1634 def has_mgmt_interface(self
, vdu
):
1635 # ## TODO: Support additional mgmt_interface type options
1636 if self
.vnfd
.mgmt_interface
.vdu_id
== vdu
.id:
1640 def vlr_xpath(self
, vlr_id
):
1643 "D,/vlr:vlr-catalog/"
1644 "vlr:vlr[vlr:id = '{}']".format(vlr_id
))
def ext_vlr_by_id(self, vlr_id):
    """ Return the external VLR stored under the given id

    Raises KeyError when no external VLR is stored for vlr_id.
    """
    external_vlrs = self._ext_vlrs
    return external_vlrs[vlr_id]
1651 def publish_inventory(self
, xact
):
1652 """ Publish the inventory associated with this VNF """
1653 self
._log
.debug("Publishing inventory for VNFR id: %s", self
._vnfr
_id
)
1655 for component
in self
._rw
_vnfd
.component
:
1656 self
._log
.debug("Creating inventory component %s", component
)
1657 mangled_name
= VcsComponent
.mangle_name(component
.component_name
,
1661 comp
= VcsComponent(dts
=self
._dts
,
1664 cluster_name
=self
._cluster
_name
,
1665 vcs_handler
=self
._vcs
_handler
,
1666 component
=component
,
1667 mangled_name
=mangled_name
,
1669 if comp
.name
in self
._inventory
:
1670 self
._log
.debug("Duplicate entries in inventory %s for vnfr %s",
1671 component
, self
._vnfd
_id
)
1673 self
._log
.debug("Adding component %s for vnrf %s",
1674 comp
.name
, self
._vnfr
_id
)
1675 self
._inventory
[comp
.name
] = comp
1676 yield from comp
.publish(xact
)
1678 def all_vdus_active(self
):
1679 """ Are all VDUS in this VNFR active? """
1680 for vdu
in self
._vdus
:
1684 self
._log
.debug("Inside all_vdus_active. Returning True")
def instantiation_failed(self, failed_reason=None):
    """ Mark this VNFR as failed and publish the record

    failed_reason is stored on the record as the failure detail
    (None when the caller gives no reason).
    """
    vnfr_id = self.vnfr_id
    self._log.debug("VNFR %s instantiation failed ", vnfr_id)

    failed_state = VirtualNetworkFunctionRecordState.FAILED
    self.set_state(failed_state)
    self._state_failed_reason = failed_reason

    # Push the changed status out; no transaction is passed to publish
    yield from self.publish(None)
1699 """ This VNF is ready"""
1700 self
._log
.debug("VNFR id %s is ready", self
.vnfr_id
)
1702 if self
._state
!= VirtualNetworkFunctionRecordState
.FAILED
:
1703 self
.set_state(VirtualNetworkFunctionRecordState
.READY
)
1706 self
._log
.debug("VNFR id %s ignoring state change", self
.vnfr_id
)
1708 # Update the VNFR with the changed status
1709 yield from self
.publish(None)
def update_cp(self, cp_name, ip_address, cp_id):
    """Update the connection point record with its ip address and id

    Arguments:
        cp_name - name of the connection point record to update
        ip_address - ip address to store on the record
        cp_id - connection point id to store on the record

    Raises:
        VirtualDeploymentUnitRecordError if this VNFR has no connection
        point record with the given name.
    """
    for cp in self._cprs:
        if cp.name == cp_name:
            self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
                            cp_name, cp, ip_address, cp_id)
            cp.ip_address = ip_address
            cp.connection_point_id = cp_id
            # Record updated: stop here so the not-found error below is
            # only reached when no record matched.
            return

    # Report the name that was searched for. The loop variable must not be
    # used here: it is unbound when there are no records, and otherwise
    # names the last record inspected rather than the missing one.
    err = "No connection point %s found in VNFR id %s" % (cp_name, self._vnfr_id)
    self._log.debug(err)
    raise VirtualDeploymentUnitRecordError(err)
1725 def set_state(self
, state
):
1726 """ Set state for this VNFR"""
1730 def instantiate(self
, xact
, restart_mode
=False):
1731 """ instantiate this VNF """
1732 self
.set_state(VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
)
1733 self
._rw
_vnfd
= yield from self
._vnfm
.fetch_vnfd(self
._vnfd
_id
)
1738 # Iterate over all the connection points in VNFR and fetch the
1741 def cpr_from_cp(cp
):
1742 """ Creates a record level connection point from the desciptor cp"""
1743 cp_fields
= ["name", "image", "vm-flavor", "static_ip_address"]
1744 cp_copy_dict
= {k
: v
for k
, v
in cp
.as_dict().items() if k
in cp_fields
}
1746 cpr_dict
.update(cp_copy_dict
)
1747 return VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr_dict
)
1749 self
._log
.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1750 self
._vnfr
_id
, self
._vnfr
.connection_point
)
1752 for cp
in self
._vnfr
.connection_point
:
1753 cpr
= cpr_from_cp(cp
)
1754 self
._cprs
.append(cpr
)
1755 self
._log
.debug("Adding Connection point record %s ", cp
)
1757 vlr_path
= self
.vlr_xpath(cp
.vlr_ref
)
1758 self
._log
.debug("Fetching VLR with path = %s", vlr_path
)
1759 res_iter
= yield from self
._dts
.query_read(self
.vlr_xpath(cp
.vlr_ref
),
1760 rwdts
.XactFlag
.MERGE
)
1764 self
._ext
_vlrs
[cp
.vlr_ref
] = d
1765 cpr
.vlr_ref
= cp
.vlr_ref
1766 self
._log
.debug("Fetched VLR [%s] with path = [%s]", d
, vlr_path
)
1768 # Increase the VNFD reference count
1773 # Fetch External VLRs
1774 self
._log
.debug("VNFR-ID %s: Fetching vlrs", self
._vnfr
_id
)
1775 yield from fetch_vlrs()
1778 self
._log
.debug("VNFR-ID %s: Publishing Inventory", self
._vnfr
_id
)
1779 yield from self
.publish_inventory(xact
)
1782 self
._log
.debug("VNFR-ID %s: Creating VLs", self
._vnfr
_id
)
1783 yield from self
.create_vls()
1786 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1787 yield from self
.publish(xact
)
1790 self
._log
.debug("VNFR-ID %s: Instantiate VLs", self
._vnfr
_id
)
1792 yield from self
.instantiate_vls(xact
, restart_mode
)
1793 except Exception as e
:
1794 self
._log
.exception("VL instantiation failed (%s)", str(e
))
1795 yield from self
.instantiation_failed(str(e
))
1798 self
.set_state(VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
)
1801 self
._log
.debug("VNFR-ID %s: Create VDUs", self
._vnfr
_id
)
1802 yield from self
.create_vdus(self
, restart_mode
)
1805 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1806 yield from self
.publish(xact
)
1809 # ToDo: Check if this should be prevented during restart
1810 self
._log
.debug("VNFR-ID %s: Instantiate VDUs", self
._vnfr
_id
)
1811 _
= self
._loop
.create_task(self
.instantiate_vdus(xact
, self
))
1814 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1815 yield from self
.publish(xact
)
1817 self
._log
.debug("VNFR-ID %s: Instantiation Done", self
._vnfr
_id
)
1819 # create task updating uptime for this vnfr
1820 self
._log
.debug("VNFR-ID %s: Starting task to update uptime", self
._vnfr
_id
)
1821 self
._loop
.create_task(self
.vnfr_uptime_update(xact
))
def terminate(self, xact):
    """ Terminate this virtual network function

    Walks the record through TERMINATE, VL_TERMINATE_PHASE,
    VDU_TERMINATE_PHASE and finally TERMINATED. The VNF monitor, when one
    is set, is stopped and deregistered first; then every VLR and every
    VDU is terminated in turn under the given transaction.
    """
    self._log.debug("Terminatng VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)

    # Shut the monitor down and forget it
    monitor = self._vnf_mon
    if monitor is not None:
        monitor.stop()
        monitor.deregister()
        self._vnf_mon = None

    # Virtual links go first ...
    self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
    for vlr in self._vlrs:
        yield from vlr.terminate(xact)

    # ... followed by the VDUs
    self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
    for vdur in self._vdus:
        yield from vdur.terminate(xact)

    self._log.debug("Terminated VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1861 def vnfr_uptime_update(self
, xact
):
1863 # Return when vnfr state is FAILED or TERMINATED etc
1864 if self
._state
not in [VirtualNetworkFunctionRecordState
.INIT
,
1865 VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
,
1866 VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
,
1867 VirtualNetworkFunctionRecordState
.READY
]:
1869 yield from self
.publish(xact
)
1870 yield from asyncio
.sleep(2, loop
=self
._loop
)
1874 class VnfdDtsHandler(object):
1875 """ DTS handler for VNFD config changes """
1876 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1878 def __init__(self
, dts
, log
, loop
, vnfm
):
1887 """ DTS registration handle """
1892 """ Register for VNFD configuration"""
1894 def on_apply(dts
, acg
, xact
, action
, scratch
):
1895 """Apply the configuration"""
1896 self
._log
.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1897 xact
, action
, scratch
)
1899 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
1902 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
1903 """ on prepare callback """
1904 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1905 ks_path
.to_xpath(RwVnfmYang
.get_schema()), msg
)
1906 fref
= ProtobufC
.FieldReference
.alloc()
1907 fref
.goto_whole_message(msg
.to_pbcm())
1909 # Handle deletes in prepare_callback
1910 if fref
.is_field_deleted():
1911 # Delete an VNFD record
1912 self
._log
.debug("Deleting VNFD with id %s", msg
.id)
1913 if self
._vnfm
.vnfd_in_use(msg
.id):
1914 self
._log
.debug("Cannot delete VNFD in use - %s", msg
)
1915 err
= "Cannot delete a VNFD in use - %s" % msg
1916 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
1917 # Delete a VNFD record
1918 yield from self
._vnfm
.delete_vnfd(msg
.id)
1920 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
1923 "Registering for VNFD config using xpath: %s",
1924 VnfdDtsHandler
.XPATH
,
1926 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
1927 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
1928 self
._regh
= acg
.register(
1929 xpath
=VnfdDtsHandler
.XPATH
,
1930 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
1931 on_prepare
=on_prepare
)
1934 class VcsComponentDtsHandler(object):
1935 """ Vcs Component DTS handler """
1936 XPATH
= ("D,/rw-manifest:manifest" +
1937 "/rw-manifest:operational-inventory" +
1938 "/rw-manifest:component")
1940 def __init__(self
, dts
, log
, loop
, vnfm
):
1949 """ DTS registration handle """
1954 """ Registers VCS component dts publisher registration"""
1955 self
._log
.debug("VCS Comp publisher DTS handler registering path %s",
1956 VcsComponentDtsHandler
.XPATH
)
1958 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
1959 handlers
= rift
.tasklets
.Group
.Handler()
1960 with self
._dts
.group_create(handler
=handlers
) as group
:
1961 self
._regh
= group
.register(xpath
=VcsComponentDtsHandler
.XPATH
,
1963 flags
=(rwdts
.Flag
.PUBLISHER |
1964 rwdts
.Flag
.NO_PREP_READ |
1965 rwdts
.Flag
.DATASTORE
),)
1968 def publish(self
, xact
, path
, msg
):
1969 """ Publishes the VCS component """
1970 self
._log
.debug("Publishing the VcsComponent xact = %s, %s:%s",
1972 self
.regh
.create_element(path
, msg
)
1973 self
._log
.debug("Published the VcsComponent to %s xact = %s, %s:%s",
1974 VcsComponentDtsHandler
.XPATH
, xact
, path
, msg
)
1976 class VnfrConsoleOperdataDtsHandler(object):
1977 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
def vnfr_vdu_console_xpath(self):
    """ Keyed vnfr-console xpath built from this handler's VNFR id and VDUR id """
    xpath_template = "D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']"
    return xpath_template.format(self._vnfr_id, self._vdur_id)
1983 def __init__(self
, dts
, log
, loop
, vnfm
, vnfr_id
, vdur_id
, vdu_id
):
1990 self
._vnfr
_id
= vnfr_id
1991 self
._vdur
_id
= vdur_id
1992 self
._vdu
_id
= vdu_id
1996 """ Register for VNFR VDU Operational Data read from dts """
1999 def on_prepare(xact_info
, action
, ks_path
, msg
):
2000 """ prepare callback from dts """
2001 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2003 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2004 xact_info
, action
, xpath
, msg
2007 if action
== rwdts
.QueryAction
.READ
:
2008 schema
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur
.schema()
2009 path_entry
= schema
.keyspec_to_entry(ks_path
)
2010 self
._log
.debug("VDU Opdata path is {}".format(path_entry
))
2012 vnfr
= self
._vnfm
.get_vnfr(self
._vnfr
_id
)
2013 except VnfRecordError
as e
:
2014 self
._log
.error("VNFR id %s not found", self
._vnfr
_id
)
2015 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2018 vdur
= vnfr
._get
_vdur
_from
_vdu
_id
(self
._vdu
_id
)
2019 if not vdur
._state
== VDURecordState
.READY
:
2020 self
._log
.debug("VDUR state is not READY. current state is {}".format(vdur
._state
))
2021 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2023 with self
._dts
.transaction() as new_xact
:
2024 resp
= yield from vdur
.read_resource(new_xact
)
2025 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2026 vdur_console
.id = self
._vdur
_id
2027 if resp
.console_url
:
2028 vdur_console
.console_url
= resp
.console_url
2030 vdur_console
.console_url
= 'none'
2031 self
._log
.debug("Recevied console URL for vdu {} is {}".format(self
._vdu
_id
,vdur_console
))
2033 self
._log
.exception("Caught exception while reading VDU %s", self
._vdu
_id
)
2034 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2035 vdur_console
.id = self
._vdur
_id
2036 vdur_console
.console_url
= 'none'
2038 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
,
2039 xpath
=self
.vnfr_vdu_console_xpath
,
2042 #raise VnfRecordError("Not supported operation %s" % action)
2043 self
._log
.error("Not supported operation %s" % action
)
2044 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2048 self
._log
.debug("Registering for VNFR VDU using xpath: %s",
2049 self
.vnfr_vdu_console_xpath
)
2050 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2051 with self
._dts
.group_create() as group
:
2052 self
._regh
= group
.register(xpath
=self
.vnfr_vdu_console_xpath
,
2054 flags
=rwdts
.Flag
.PUBLISHER
,
2058 class VnfrDtsHandler(object):
2059 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2060 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2062 def __init__(self
, dts
, log
, loop
, vnfm
):
2072 """ Return registration handle"""
2077 """ Return VNF manager instance """
2082 """ Register for vnfr create/update/delete/read requests from dts """
2083 def on_commit(xact_info
):
2084 """ The transaction has been committed """
2085 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
2086 return rwdts
.MemberRspCode
.ACTION_OK
2088 def on_abort(*args
):
2089 """ Abort callback """
2090 self
._log
.debug("VNF transaction got aborted")
2093 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
2096 def instantiate_realloc_vnfr(vnfr
):
2097 """Re-populate the vnfm after restart
2104 yield from vnfr
.instantiate(None, restart_mode
=True)
2106 if xact_event
== rwdts
.MemberEvent
.INSTALL
:
2107 curr_cfg
= self
.regh
.elements
2108 for cfg
in curr_cfg
:
2109 vnfr
= self
.vnfm
.create_vnfr(cfg
)
2110 self
._loop
.create_task(instantiate_realloc_vnfr(vnfr
))
2112 self
._log
.debug("Got on_event in vnfm")
2114 return rwdts
.MemberRspCode
.ACTION_OK
2117 def on_prepare(xact_info
, action
, ks_path
, msg
):
2118 """ prepare callback from dts """
2120 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2121 xact_info
, action
, msg
2124 if action
== rwdts
.QueryAction
.CREATE
:
2125 if not msg
.has_field("vnfd"):
2126 err
= "Vnfd not provided"
2127 self
._log
.error(err
)
2128 raise VnfRecordError(err
)
2130 vnfr
= self
.vnfm
.create_vnfr(msg
)
2132 # RIFT-9105: Unable to add a READ query under an existing transaction
2133 # xact = xact_info.xact
2134 yield from vnfr
.instantiate(None)
2135 except Exception as e
:
2136 self
._log
.exception(e
)
2137 self
._log
.error("Error while instantiating vnfr:%s", vnfr
.vnfr_id
)
2138 vnfr
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
2139 yield from vnfr
.publish(None)
2140 elif action
== rwdts
.QueryAction
.DELETE
:
2141 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2142 path_entry
= schema
.keyspec_to_entry(ks_path
)
2143 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2146 self
._log
.debug("VNFR id %s not found for delete", path_entry
.key00
.id)
2147 raise VirtualNetworkFunctionRecordNotFound(
2148 "VNFR id %s", path_entry
.key00
.id)
2151 yield from vnfr
.terminate(xact_info
.xact
)
2154 yield from self
._vnfm
.delete_vnfr(xact_info
.xact
, vnfr
)
2155 except Exception as e
:
2156 self
._log
.exception(e
)
2157 self
._log
.error("Caught exception while deleting vnfr %s", path_entry
.key00
.id)
2159 elif action
== rwdts
.QueryAction
.UPDATE
:
2160 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2161 path_entry
= schema
.keyspec_to_entry(ks_path
)
2164 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2165 except Exception as e
:
2166 self
._log
.debug("No vnfr found with id %s", path_entry
.key00
.id)
2167 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2171 self
._log
.debug("VNFR id %s not found for update", path_entry
.key00
.id)
2172 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2175 self
._log
.debug("VNFR {} update config status {} (current {})".
2176 format(vnfr
.name
, msg
.config_status
, vnfr
.config_status
))
2177 # Update the config status and publish
2178 vnfr
._config
_status
= msg
.config_status
2179 yield from vnfr
.publish(None)
2182 raise NotImplementedError(
2183 "%s action on VirtualNetworkFunctionRecord not supported",
2186 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2188 self
._log
.debug("Registering for VNFR using xpath: %s",
2189 VnfrDtsHandler
.XPATH
,)
2191 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
2192 on_prepare
=on_prepare
,)
2193 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
2194 with self
._dts
.group_create(handler
=handlers
) as group
:
2195 self
._regh
= group
.register(xpath
=VnfrDtsHandler
.XPATH
,
2197 flags
=(rwdts
.Flag
.PUBLISHER |
2198 rwdts
.Flag
.NO_PREP_READ |
2200 rwdts
.Flag
.DATASTORE
),)
2203 def create(self
, xact
, path
, msg
):
2205 Create a VNFR record in DTS with path and message
2207 self
._log
.debug("Creating VNFR xact = %s, %s:%s",
2210 self
.regh
.create_element(path
, msg
)
2211 self
._log
.debug("Created VNFR xact = %s, %s:%s",
def update(self, xact, path, msg):
    """
    Update a VNFR record in DTS with path and message

    Arguments:
        xact - the transaction; only logged here
        path - xpath of the element to update
        msg  - the VNFR message to store at path
    """
    log_args = (xact, path, msg)
    self._log.debug("Updating VNFR xact = %s, %s:%s", *log_args)

    registration = self.regh
    registration.update_element(path, msg)

    self._log.debug("Updated VNFR xact = %s, %s:%s", *log_args)
def delete(self, xact, path):
    """
    Delete the VNFR record stored in DTS at path

    Arguments:
        xact - the transaction; only logged here
        path - xpath of the element to delete
    """
    log_args = (xact, path)
    self._log.debug("Deleting VNFR xact = %s, %s", *log_args)

    registration = self.regh
    registration.delete_element(path)

    self._log.debug("Deleted VNFR xact = %s, %s", *log_args)
2235 class VnfdRefCountDtsHandler(object):
2236 """ The VNFD Ref Count DTS handler """
2237 XPATH
= "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2239 def __init__(self
, dts
, log
, loop
, vnfm
):
2249 """ Return registration handle """
2254 """ Return the NS manager instance """
2259 """ Register for VNFD ref count read from dts """
2262 def on_prepare(xact_info
, action
, ks_path
, msg
):
2263 """ prepare callback from dts """
2264 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2266 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2267 xact_info
, action
, xpath
, msg
2270 if action
== rwdts
.QueryAction
.READ
:
2271 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_VnfdRefCount
.schema()
2272 path_entry
= schema
.keyspec_to_entry(ks_path
)
2273 vnfd_list
= yield from self
._vnfm
.get_vnfd_refcount(path_entry
.key00
.vnfd_id_ref
)
2274 for xpath
, msg
in vnfd_list
:
2275 self
._log
.debug("Responding to ref count query path:%s, msg:%s",
2277 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.MORE
,
2280 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2282 raise VnfRecordError("Not supported operation %s" % action
)
2284 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2285 with self
._dts
.group_create() as group
:
2286 self
._regh
= group
.register(xpath
=VnfdRefCountDtsHandler
.XPATH
,
2288 flags
=rwdts
.Flag
.PUBLISHER
,
class VdurDatastore(object):
    """
    This VdurDatastore is intended to expose select information about a VDUR
    such that it can be referenced in a cloud config file. The data that is
    exposed does not necessarily follow the structure of the data in the yang
    model. This is intentional. The data that are exposed are intended to be
    agnostic of the yang model so that changes in the model do not necessarily
    require changes to the interface provided to the user. It also means that
    the user does not need to be familiar with the RIFT.ware yang models.
    """

    def __init__(self):
        """Create an instance of VdurDatastore"""
        # Maps a VDU id to the dict of attributes exposed for that VDUR.
        self._vdur_data = dict()
        # Raw string so the backslashes reach the regex engine untouched
        # instead of being treated as (invalid) Python string escapes.
        self._pattern = re.compile(r"vdu\[([^]]+)\]\.(.+)")

    def add(self, vdur):
        """Add a new VDUR to the datastore

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no ID or (2) is
            already in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id in self._vdur_data:
            raise ValueError('cannot add a VDUR more than once')

        record = self._vdur_data[vdur.vdu_id] = dict()

        exposed = (
            ('name', vdur._vdud.name),
            ('mgmt.ip', vdur.vm_management_ip),
        )
        # Only attributes that currently have a value are exposed.
        for key, attr in exposed:
            if attr is not None:
                record[key] = attr

    def update(self, vdur):
        """Update the VDUR information in the datastore

        Arguments:
            vdur - a GI representation of a VDUR

        Raises:
            A ValueError is raised if the VDUR (1) has no ID or (2) is not
            in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        record = self._vdur_data[vdur.vdu_id]

        exposed = (
            ('name', vdur._vdud.name),
            ('mgmt.ip', vdur.vm_management_ip),
        )
        # An attribute that has become None is withdrawn from the datastore
        # rather than stored as None.
        for key, attr in exposed:
            if attr is None:
                record.pop(key, None)
            else:
                record[key] = attr

    def remove(self, vdur_id):
        """Remove all of the data associated with specified VDUR

        Arguments:
            vdur_id - the identifier of a VDUR in the datastore

        Raises:
            A ValueError is raised if the VDUR is not contained in the
            datastore.

        """
        if vdur_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        del self._vdur_data[vdur_id]

    def get(self, expr):
        """Retrieve VDUR information from the datastore

        An expression should be of the form,

            vdu[<id>].<attr>

        where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
        the exposed attribute that the user wishes to retrieve.

        If the requested data is not available, None is returned.

        Arguments:
            expr - a string that specifies the data to return

        Raises:
            A ValueError is raised if the provided expression cannot be parsed.

        Returns:
            The requested data or None

        """
        result = self._pattern.match(expr)
        if result is None:
            raise ValueError('data expression not recognized ({})'.format(expr))

        vdur_id, key = result.groups()

        if vdur_id not in self._vdur_data:
            return None

        return self._vdur_data[vdur_id].get(key, None)
2412 class VnfManager(object):
2413 """ The virtual network function manager class """
2414 def __init__(self
, dts
, log
, loop
, cluster_name
):
2418 self
._cluster
_name
= cluster_name
2420 self
._vcs
_handler
= VcsComponentDtsHandler(dts
, log
, loop
, self
)
2421 self
._vnfr
_handler
= VnfrDtsHandler(dts
, log
, loop
, self
)
2422 self
._vnfr
_ref
_handler
= VnfdRefCountDtsHandler(dts
, log
, loop
, self
)
2423 self
._nsr
_handler
= mano_dts
.NsInstanceConfigSubscriber(log
, dts
, loop
, callback
=self
.handle_nsr
)
2425 self
._dts
_handlers
= [VnfdDtsHandler(dts
, log
, loop
, self
),
2428 self
._vnfr
_ref
_handler
,
2431 self
._vnfds
_to
_vnfr
= {}
def vnfr_handler(self):
    """ Return the VNFR dts handler held by this manager """
    handler = self._vnfr_handler
    return handler
def vcs_handler(self):
    """ Return the VCS dts handler held by this manager """
    handler = self._vcs_handler
    return handler
2446 """ Register all static DTS handlers """
2447 for hdl
in self
._dts
_handlers
:
2448 yield from hdl
.register()
2452 """ Run this VNFM instance """
2453 self
._log
.debug("Run VNFManager - registering static DTS handlers""")
2454 yield from self.register()
def handle_nsr(self, nsr, action):
    """ Keep the local NSR table in step with NSR create/delete events

    Arguments:
        nsr    - the NS record the event refers to; stored under nsr.id
        action - the rwdts.QueryAction of the event; anything other than
                 CREATE or DELETE is ignored
    """
    if action == rwdts.QueryAction.CREATE:
        self._nsrs[nsr.id] = nsr
        return

    if action == rwdts.QueryAction.DELETE:
        # A delete for an NSR that was never recorded is a no-op.
        self._nsrs.pop(nsr.id, None)
def get_linked_mgmt_network(self, vnfr):
    """For the given VNFR get the related mgmt network from the NSD, if
    available.

    Arguments:
        vnfr - the VNFR message; its nsr_id_ref selects the NSR to inspect

    Raises:
        VnfRecordError if no NSR with the referenced id is known. (The
        previous code attempted to raise a plain string, which Python 3
        turns into a TypeError that hides the intended message.)

    Returns:
        The first VLD of the NSD flagged as a mgmt network, or None when the
        NSD has no such VLD.
    """
    nsr_id = vnfr.nsr_id_ref

    # for the given related VNFR, get the corresponding NSR-config
    try:
        nsr_obj = self._nsrs[nsr_id]
    except KeyError:
        raise VnfRecordError(
            "Unable to find the NS with the ID: {}".format(nsr_id))

    # for the related NSD check if a VLD exists such that it's a mgmt
    # network
    for vld in nsr_obj.nsd.vld:
        if vld.mgmt_network:
            # NOTE(review): the return statements of this method were not
            # visible in the reviewed excerpt; returning the VLD name (and
            # None when nothing matches) is assumed -- confirm.
            return vld.name

    return None
def get_vnfr(self, vnfr_id):
    """ get VNFR by vnfr id

    Arguments:
        vnfr_id - id of the VNFR to look up

    Raises:
        VnfRecordError if no VNFR with that id is known

    Returns:
        The record stored for vnfr_id
    """
    if vnfr_id not in self._vnfrs:
        # Format the message here: exceptions do not interpolate
        # logging-style arguments, so passing the id separately left a raw
        # "%s" in the reported error.
        raise VnfRecordError("VNFR id %s not found" % vnfr_id)

    return self._vnfrs[vnfr_id]
def create_vnfr(self, vnfr):
    """ Create and register a VirtualNetworkFunctionRecord for a VNFR message

    Arguments:
        vnfr - the VNFR message; vnfr.id must not already be registered

    Raises:
        VnfRecordError if a record with the same id already exists

    Returns:
        The newly created record
    """
    already_known = vnfr.id in self._vnfrs
    if already_known:
        msg = "Vnfr id %s already exists" % vnfr.id
        self._log.error(msg)
        raise VnfRecordError(msg)

    self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
                   vnfr.id,
                   vnfr.vnfd.id)

    linked_network = self.get_linked_mgmt_network(vnfr)

    record = VirtualNetworkFunctionRecord(
        self._dts,
        self._log,
        self._loop,
        self._cluster_name,
        self,
        self.vcs_handler,
        vnfr,
        mgmt_network=linked_network,
    )

    # The same record is reachable both by VNFR id and by VNFD id.
    self._vnfrs[vnfr.id] = record
    self._vnfds_to_vnfr[vnfr.vnfd.id] = record
    return record
def delete_vnfr(self, xact, vnfr):
    """ Delete a VNFR instance

    Arguments:
        xact - transaction handed on to the VNFR dts handler
        vnfr - the record to delete; its vnfr_id and xpath are used

    A record whose id is not registered is silently ignored.
    """
    if vnfr.vnfr_id not in self._vnfrs:
        return

    self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
    # Remove the element from DTS first, then drop the local entry.
    yield from self._vnfr_handler.delete(xact, vnfr.xpath)
    del self._vnfrs[vnfr.vnfr_id]
2522 def fetch_vnfd(self, vnfd_id):
2523 """ Fetch VNFDs based with the vnfd id"""
2524 vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
2525 self._log.debug("Fetch vnfd with path
%s", vnfd_path)
2528 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2530 for ent in res_iter:
2531 res = yield from ent
2535 err = "Failed to get Vnfd
%s" % vnfd_id
2536 self._log.error(err)
2537 raise VnfRecordError(err)
2539 self._log.debug("Fetched vnfd
for path
%s, vnfd
- %s", vnfd_path, vnfd)
2543 def vnfd_in_use(self, vnfd_id):
2544 """ Is this VNFD in use """
2545 self._log.debug("Is this VNFD
in use
- msg
:%s", vnfd_id)
2546 if vnfd_id in self._vnfds_to_vnfr:
2547 return self._vnfds_to_vnfr[vnfd_id].in_use()
def publish_vnfr(self, xact, path, msg):
    """ Publish a VNFR by updating its element through the VNFR dts handler

    Arguments:
        xact - transaction handed on to the handler
        path - xpath of the VNFR element
        msg  - the VNFR message to publish
    """
    self._log.debug("publish_vnfr called with path %s, msg %s", path, msg)
    handler = self.vnfr_handler
    yield from handler.update(xact, path, msg)
def delete_vnfd(self, vnfd_id):
    """ Delete the Virtual Network Function descriptor with the passed id

    Arguments:
        vnfd_id - id of the VNFD to delete

    Raises:
        VirtualNetworkFunctionDescriptorNotFound if the id is not known
        VirtualNetworkFunctionDescriptorRefCountExists if the descriptor is
        still in use
    """
    self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
    if vnfd_id not in self._vnfds_to_vnfr:
        self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
        # Exceptions do not interpolate logging-style arguments; format the
        # message so the id actually appears in the error text.
        raise VirtualNetworkFunctionDescriptorNotFound("Cannot find %s" % vnfd_id)

    entry = self._vnfds_to_vnfr[vnfd_id]

    if entry.in_use():
        ref_count = entry.vnfd_ref_count
        self._log.debug("Cannot delete VNFD id %s reference exists %s",
                        vnfd_id,
                        ref_count)
        raise VirtualNetworkFunctionDescriptorRefCountExists(
            "Cannot delete :%s, ref_count:%s" % (vnfd_id, ref_count))

    # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
    # Cleanup is best effort: a failure is logged and the descriptor is
    # still dropped from the table below.
    try:
        rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
        vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
        if os.path.exists(vnfd_dir):
            shutil.rmtree(vnfd_dir, ignore_errors=True)
    except Exception as e:
        self._log.error("Exception in cleaning up VNFD {}: {}".
                        format(entry.vnfd.name, e))
        self._log.exception(e)

    del self._vnfds_to_vnfr[vnfd_id]
def vnfd_refcount_xpath(self, vnfd_id):
    """ Build the xpath of the ref count entry keyed by the given VNFD id """
    key_predicate = "[rw-vnfr:vnfd-id-ref = '{}']"
    template = VnfdRefCountDtsHandler.XPATH + key_predicate
    return template.format(vnfd_id)
def get_vnfd_refcount(self, vnfd_id):
    """ Get the vnfd ref count entries from this VNFM

    Arguments:
        vnfd_id - id of the VNFD to report on; None or "" selects every
                  VNFD known to this manager

    Returns:
        A list of (xpath, VnfdRefCount message) tuples; empty when vnfd_id
        is given but not known.
    """
    def ref_count_entry(xpath_id, vnfr):
        """ Build the (xpath, message) pair for one record """
        entry_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
        entry_msg.vnfd_id_ref = vnfr.vnfd.id
        entry_msg.instance_ref_count = vnfr.vnfd_ref_count
        return (self.vnfd_refcount_xpath(xpath_id), entry_msg)

    want_all = vnfd_id is None or vnfd_id == ""
    if want_all:
        return [ref_count_entry(vnfr.vnfd.id, vnfr)
                for vnfr in self._vnfds_to_vnfr.values()]

    if vnfd_id in self._vnfds_to_vnfr:
        # The xpath is keyed by the requested id, the message by the
        # record's own VNFD id.
        return [ref_count_entry(vnfd_id, self._vnfds_to_vnfr[vnfd_id])]

    return []
2611 class VnfmTasklet(rift.tasklets.Tasklet):
2612 """ VNF Manager tasklet class """
2613 def __init__(self, *args, **kwargs):
2614 super(VnfmTasklet, self).__init__(*args, **kwargs)
2615 self.rwlog.set_category("rw
-mano
-log
")
2616 self.rwlog.set_subcategory("vnfm
")
2623 super(VnfmTasklet, self).start()
2624 self.log.info("Starting VnfmTasklet
")
2626 self.log.setLevel(logging.DEBUG)
2628 self.log.debug("Registering with dts
")
2629 self._dts = rift.tasklets.DTS(self.tasklet_info,
2630 RwVnfmYang.get_schema(),
2632 self.on_dts_state_change)
2634 self.log.debug("Created DTS Api GI Object
: %s", self._dts)
2636 print("Caught
Exception in VNFM start
:", sys.exc_info()[0])
def on_instance_started(self):
    """ Task instance started callback; only logs that it was invoked """
    logger = self.log
    logger.debug("Got instance started callback")
2647 print("Caught
Exception in VNFM stop
:", sys.exc_info()[0])
2652 """ Task init callback """
2654 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2655 assert vm_parent_name is not None
2656 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2657 yield from self._vnfm.run()
2659 print("Caught
Exception in VNFM init
:", sys.exc_info()[0])
2664 """ Task run callback """
def on_dts_state_change(self, state):
    """Take action according to current dts state to transition
    application into the corresponding application state

    Arguments
        state - current dts state

    """
    # DTS state to move to once the application handler has finished.
    dts_transitions = {
        rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
        rwdts.State.CONFIG: rwdts.State.RUN,
    }

    # Application callback to run for a given DTS state.
    app_handlers = {
        rwdts.State.INIT: self.init,
        rwdts.State.RUN: self.run,
    }

    # Transition application to next state
    app_handler = app_handlers.get(state)
    if app_handler is not None:
        yield from app_handler()

    # Transition dts to next state
    upcoming = dts_transitions.get(state)
    if upcoming is not None:
        self._dts.handle.set_state(upcoming)