Fix project delete failures
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.package.script
53 import rift.mano.dts as mano_dts
54 from rift.mano.utils.project import (
55 ManoProject,
56 ProjectHandler,
57 )
58
59
60 class VMResourceError(Exception):
61 """ VM resource Error"""
62 pass
63
64
65 class VnfRecordError(Exception):
66 """ VNF record instatiation failed"""
67 pass
68
69
70 class VduRecordError(Exception):
71 """ VDU record instatiation failed"""
72 pass
73
74
75 class NotImplemented(Exception):
76 """Not implemented """
77 pass
78
79
80 class VnfrRecordExistsError(Exception):
81 """VNFR record already exist with the same VNFR id"""
82 pass
83
84
85 class InternalVirtualLinkRecordError(Exception):
86 """Internal virtual link record error"""
87 pass
88
89
90 class VDUImageNotFound(Exception):
91 """VDU Image not found error"""
92 pass
93
94
95 class VirtualDeploymentUnitRecordError(Exception):
96 """VDU Instantiation failed"""
97 pass
98
99
100 class VMNotReadyError(Exception):
101 """ VM Not yet received from resource manager """
102 pass
103
104
105 class VDURecordNotFound(Exception):
106 """ Could not find a VDU record """
107 pass
108
109
110 class VirtualNetworkFunctionRecordDescNotFound(Exception):
111 """ Cannot find Virtual Network Function Record Descriptor """
112 pass
113
114
115 class VirtualNetworkFunctionDescriptorError(Exception):
116 """ Virtual Network Function Record Descriptor Error """
117 pass
118
119
120 class VirtualNetworkFunctionDescriptorNotFound(Exception):
121 """ Virtual Network Function Record Descriptor Not Found """
122 pass
123
124
125 class VirtualNetworkFunctionRecordNotFound(Exception):
126 """ Virtual Network Function Record Not Found """
127 pass
128
129
130 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
131 """ Virtual Network Funtion Descriptor reference count exists """
132 pass
133
134
135 class VnfrInstantiationFailed(Exception):
136 """ Virtual Network Funtion Instantiation failed"""
137 pass
138
139
140 class VNFMPlacementGroupError(Exception):
141 pass
142
143 class VirtualNetworkFunctionRecordState(enum.Enum):
144 """ VNFR state """
145 INIT = 1
146 VL_INIT_PHASE = 2
147 VM_INIT_PHASE = 3
148 READY = 4
149 TERMINATE = 5
150 VL_TERMINATE_PHASE = 6
151 VDU_TERMINATE_PHASE = 7
152 TERMINATED = 7
153 FAILED = 10
154
155
156 class VDURecordState(enum.Enum):
157 """VDU record state """
158 INIT = 1
159 INSTANTIATING = 2
160 RESOURCE_ALLOC_PENDING = 3
161 READY = 4
162 TERMINATING = 5
163 TERMINATED = 6
164 FAILED = 10
165
166
167 class VcsComponent(object):
168 """ VCS Component within the VNF descriptor """
169 def __init__(self, dts, log, loop, cluster_name,
170 vcs_handler, component, mangled_name):
171 self._dts = dts
172 self._log = log
173 self._loop = loop
174 self._component = component
175 self._cluster_name = cluster_name
176 self._vcs_handler = vcs_handler
177 self._mangled_name = mangled_name
178
179 @staticmethod
180 def mangle_name(component_name, vnf_name, vnfd_id):
181 """ mangled component name """
182 return vnf_name + ":" + component_name + ":" + vnfd_id
183
184 @property
185 def name(self):
186 """ name of this component"""
187 return self._mangled_name
188
189 @property
190 def path(self):
191 """ The path for this object """
192 return ("D,/rw-manifest:manifest" +
193 "/rw-manifest:operational-inventory" +
194 "/rw-manifest:component" +
195 "[rw-manifest:component-name = '{}']").format(self.name)
196
197 @property
198 def instance_xpath(self):
199 """ The path for this object """
200 return("D,/rw-base:vcs" +
201 "/instances" +
202 "/instance" +
203 "[instance-name = '{}']".format(self._cluster_name))
204
205 @property
206 def start_comp_xpath(self):
207 """ start component xpath """
208 return (self.instance_xpath +
209 "/child-n[instance-name = 'START-REQ']")
210
211 def get_start_comp_msg(self, ip_address):
212 """ start this component """
213 start_msg = RwBaseYang.VcsInstance_Instance_ChildN()
214 start_msg.instance_name = 'START-REQ'
215 start_msg.component_name = self.name
216 start_msg.admin_command = "START"
217 start_msg.ip_address = ip_address
218
219 return start_msg
220
221 @property
222 def msg(self):
223 """ Returns the message for this vcs component"""
224
225 vcs_comp_dict = self._component.as_dict()
226
227 def mangle_comp_names(comp_dict):
228 """ mangle component name with VNF name, id"""
229 for key, val in comp_dict.items():
230 if isinstance(val, dict):
231 comp_dict[key] = mangle_comp_names(val)
232 elif isinstance(val, list):
233 i = 0
234 for ent in val:
235 if isinstance(ent, dict):
236 val[i] = mangle_comp_names(ent)
237 else:
238 val[i] = ent
239 i += 1
240 elif key == "component_name":
241 comp_dict[key] = VcsComponent.mangle_name(val,
242 self._vnfd_name,
243 self._vnfd_id)
244 return comp_dict
245
246 mangled_dict = mangle_comp_names(vcs_comp_dict)
247 msg = RwManifestYang.OpInventory_Component.from_dict(mangled_dict)
248 return msg
249
250 @asyncio.coroutine
251 def publish(self, xact):
252 """ Publishes the VCS component """
253 self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
254 self.name, self.path, self.msg)
255 yield from self._vcs_handler.publish(xact, self.path, self.msg)
256
257 @asyncio.coroutine
258 def start(self, xact, parent, ip_addr=None):
259 """ Starts this VCS component """
260 # ATTN RV - replace with block add
261 start_msg = self.get_start_comp_msg(ip_addr)
262 self._log.debug("starting component %s %s",
263 self.start_comp_xpath, start_msg)
264 yield from self._dts.query_create(self.start_comp_xpath,
265 0,
266 start_msg)
267 self._log.debug("started component %s, %s",
268 self.start_comp_xpath, start_msg)
269
270
271 class VirtualDeploymentUnitRecord(object):
272 """ Virtual Deployment Unit Record """
273 def __init__(self,
274 dts,
275 log,
276 loop,
277 project,
278 vdud,
279 vnfr,
280 mgmt_intf,
281 mgmt_network,
282 cloud_account_name,
283 vnfd_package_store,
284 vdur_id=None,
285 placement_groups=[]):
286 self._dts = dts
287 self._log = log
288 self._loop = loop
289 self._project = project
290 self._vdud = vdud
291 self._vnfr = vnfr
292 self._mgmt_intf = mgmt_intf
293 self._cloud_account_name = cloud_account_name
294 self._vnfd_package_store = vnfd_package_store
295 self._mgmt_network = mgmt_network
296
297 self._vdur_id = vdur_id or str(uuid.uuid4())
298 self._int_intf = []
299 self._ext_intf = []
300 self._state = VDURecordState.INIT
301 self._state_failed_reason = None
302 self._request_id = str(uuid.uuid4())
303 self._name = vnfr.name + "__" + vdud.id
304 self._placement_groups = placement_groups
305 self._rm_regh = None
306 self._vm_resp = None
307 self._vdud_cloud_init = None
308 self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(
309 dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id,self.vdu_id)
310
311 @asyncio.coroutine
312 def vdu_opdata_register(self):
313 yield from self._vdur_console_handler.register()
314
315 def cp_ip_addr(self, cp_name):
316 """ Find ip address by connection point name """
317 if self._vm_resp is not None:
318 for conn_point in self._vm_resp.connection_points:
319 if conn_point.name == cp_name:
320 return conn_point.ip_address
321 return "0.0.0.0"
322
323 def cp_mac_addr(self, cp_name):
324 """ Find mac address by connection point name """
325 if self._vm_resp is not None:
326 for conn_point in self._vm_resp.connection_points:
327 if conn_point.name == cp_name:
328 return conn_point.mac_addr
329 return "00:00:00:00:00:00"
330
331 def cp_id(self, cp_name):
332 """ Find connection point id by connection point name """
333 if self._vm_resp is not None:
334 for conn_point in self._vm_resp.connection_points:
335 if conn_point.name == cp_name:
336 return conn_point.connection_point_id
337 return ''
338
339 @property
340 def vdu_id(self):
341 return self._vdud.id
342
343 @property
344 def vm_resp(self):
345 return self._vm_resp
346
347 @property
348 def name(self):
349 """ Return this VDUR's name """
350 return self._name
351
352 @property
353 def cloud_account_name(self):
354 """ Cloud account this VDU should be created in """
355 return self._cloud_account_name
356
357 @property
358 def image_name(self):
359 """ name that should be used to lookup the image on the CMP """
360 if 'image' not in self._vdud:
361 return None
362 return os.path.basename(self._vdud.image)
363
364 @property
365 def image_checksum(self):
366 """ name that should be used to lookup the image on the CMP """
367 return self._vdud.image_checksum if self._vdud.has_field("image_checksum") else None
368
369 @property
370 def management_ip(self):
371 if not self.active:
372 return None
373 return self._vm_resp.public_ip if self._vm_resp.has_field('public_ip') else self._vm_resp.management_ip
374
375 @property
376 def vm_management_ip(self):
377 if not self.active:
378 return None
379 return self._vm_resp.management_ip
380
381 @property
382 def operational_status(self):
383 """ Operational status of this VDU"""
384 op_stats_dict = {"INIT": "init",
385 "INSTANTIATING": "vm_init_phase",
386 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
387 "READY": "running",
388 "FAILED": "failed",
389 "TERMINATING": "terminated",
390 "TERMINATED": "terminated",
391 }
392 return op_stats_dict[self._state.name]
393
394 @property
395 def msg(self):
396 """ Process VDU message from resmgr"""
397 vdu_fields = ["vm_flavor",
398 "guest_epa",
399 "vswitch_epa",
400 "hypervisor_epa",
401 "host_epa",
402 "volumes",
403 "name"]
404 vdu_copy_dict = {k: v for k, v in
405 self._vdud.as_dict().items() if k in vdu_fields}
406 vdur_dict = {"id": self._vdur_id,
407 "vdu_id_ref": self._vdud.id,
408 "operational_status": self.operational_status,
409 "operational_status_details": self._state_failed_reason,
410 }
411 if self.vm_resp is not None:
412 vdur_dict.update({"vim_id": self.vm_resp.vdu_id,
413 "flavor_id": self.vm_resp.flavor_id
414 })
415 if self._vm_resp.has_field('image_id'):
416 vdur_dict.update({ "image_id": self.vm_resp.image_id })
417
418 if self.management_ip is not None:
419 vdur_dict["management_ip"] = self.management_ip
420
421 if self.vm_management_ip is not None:
422 vdur_dict["vm_management_ip"] = self.vm_management_ip
423
424 vdur_dict.update(vdu_copy_dict)
425
426 if self.vm_resp is not None:
427 if self._vm_resp.has_field('volumes'):
428 for opvolume in self._vm_resp.volumes:
429 vdurvol_data = [vduvol for vduvol in vdur_dict['volumes'] if vduvol['name'] == opvolume.name]
430 if len(vdurvol_data) == 1:
431 vdurvol_data[0]["volume_id"] = opvolume.volume_id
432 if opvolume.has_field('custom_meta_data'):
433 metadata_list = list()
434 for metadata_item in opvolume.custom_meta_data:
435 metadata_list.append(metadata_item.as_dict())
436 vdurvol_data[0]['custom_meta_data'] = metadata_list
437
438 if self._vm_resp.has_field('supplemental_boot_data'):
439 vdur_dict['supplemental_boot_data'] = dict()
440 if self._vm_resp.supplemental_boot_data.has_field('boot_data_drive'):
441 vdur_dict['supplemental_boot_data']['boot_data_drive'] = self._vm_resp.supplemental_boot_data.boot_data_drive
442 if self._vm_resp.supplemental_boot_data.has_field('custom_meta_data'):
443 metadata_list = list()
444 for metadata_item in self._vm_resp.supplemental_boot_data.custom_meta_data:
445 metadata_list.append(metadata_item.as_dict())
446 vdur_dict['supplemental_boot_data']['custom_meta_data'] = metadata_list
447 if self._vm_resp.supplemental_boot_data.has_field('config_file'):
448 file_list = list()
449 for file_item in self._vm_resp.supplemental_boot_data.config_file:
450 file_list.append(file_item.as_dict())
451 vdur_dict['supplemental_boot_data']['config_file'] = file_list
452
453 icp_list = []
454 ii_list = []
455
456 for intf, cp_id, vlr in self._int_intf:
457 cp = self.find_internal_cp_by_cp_id(cp_id)
458
459 icp_list.append({"name": cp.name,
460 "id": cp.id,
461 "type_yang": "VPORT",
462 "ip_address": self.cp_ip_addr(cp.id),
463 "mac_address": self.cp_mac_addr(cp.id)})
464
465 ii_list.append({"name": intf.name,
466 "vdur_internal_connection_point_ref": cp.id,
467 "virtual_interface": {}})
468
469 vdur_dict["internal_connection_point"] = icp_list
470 self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
471 vdur_dict["internal_interface"] = ii_list
472
473 ei_list = []
474 for intf, cp, vlr in self._ext_intf:
475 ei_list.append({"name": cp.name,
476 "vnfd_connection_point_ref": cp.name,
477 "virtual_interface": {}})
478 self._vnfr.update_cp(cp.name,
479 self.cp_ip_addr(cp.name),
480 self.cp_mac_addr(cp.name),
481 self.cp_id(cp.name))
482
483 vdur_dict["external_interface"] = ei_list
484
485 placement_groups = []
486 for group in self._placement_groups:
487 placement_groups.append(group.as_dict())
488 vdur_dict['placement_groups_info'] = placement_groups
489
490 return RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
491
492 @property
493 def resmgr_path(self):
494 """ path for resource-mgr"""
495 xpath = self._project.add_project("D,/rw-resource-mgr:resource-mgmt" +
496 "/vdu-event" +
497 "/vdu-event-data[event-id='{}']".format(self._request_id))
498 return xpath
499
500 @property
501 def vm_flavor_msg(self):
502 """ VM flavor message """
503 flavor = self._vdud.vm_flavor.__class__()
504 flavor.copy_from(self._vdud.vm_flavor)
505
506 return flavor
507
508 @property
509 def vdud_cloud_init(self):
510 """ Return the cloud-init contents for the VDU """
511 if self._vdud_cloud_init is None:
512 self._vdud_cloud_init = self.cloud_init()
513
514 return self._vdud_cloud_init
515
516 def cloud_init(self):
517 """ Populate cloud_init with cloud-config script from
518 either the inline contents or from the file provided
519 """
520 if self._vdud.cloud_init is not None:
521 self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
522 return self._vdud.cloud_init
523 elif self._vdud.cloud_init_file is not None:
524 # Get cloud-init script contents from the file provided in the cloud_init_file param
525 self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
526 filename = self._vdud.cloud_init_file
527 self._vnfd_package_store.refresh()
528 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
529 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
530 try:
531 return cloud_init_extractor.read_script(stored_package, filename)
532 except rift.package.cloud_init.CloudInitExtractionError as e:
533 self.instantiation_failed(str(e))
534 raise VirtualDeploymentUnitRecordError(e)
535 else:
536 self._log.debug("VDU Instantiation: cloud-init script not provided")
537
538 def process_openstack_placement_group_construct(self, vm_create_msg_dict):
539 host_aggregates = []
540 availability_zones = []
541 server_groups = []
542 for group in self._placement_groups:
543 if group.has_field('host_aggregate'):
544 for aggregate in group.host_aggregate:
545 host_aggregates.append(aggregate.as_dict())
546 if group.has_field('availability_zone'):
547 availability_zones.append(group.availability_zone.as_dict())
548 if group.has_field('server_group'):
549 server_groups.append(group.server_group.as_dict())
550
551 if availability_zones:
552 if len(availability_zones) > 1:
553 self._log.error("Can not launch VDU: %s in multiple availability zones. " +
554 "Requested Zones: %s", self.name, availability_zones)
555 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability" +
556 " zones. Requsted Zones".format(self.name, availability_zones))
557 else:
558 vm_create_msg_dict['availability_zone'] = availability_zones[0]
559
560 if server_groups:
561 if len(server_groups) > 1:
562 self._log.error("Can not launch VDU: %s in multiple Server Group. " +
563 "Requested Groups: %s", self.name, server_groups)
564 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple " +
565 "Server Groups. Requsted Groups".format(self.name, server_groups))
566 else:
567 vm_create_msg_dict['server_group'] = server_groups[0]
568
569 if host_aggregates:
570 vm_create_msg_dict['host_aggregate'] = host_aggregates
571
572 return
573
574 def process_placement_groups(self, vm_create_msg_dict):
575 """Process the placement_groups and fill resource-mgr request"""
576 if not self._placement_groups:
577 return
578
579 cloud_set = set([group.cloud_type for group in self._placement_groups])
580 assert len(cloud_set) == 1
581 cloud_type = cloud_set.pop()
582
583 if cloud_type == 'openstack':
584 self.process_openstack_placement_group_construct(vm_create_msg_dict)
585
586 else:
587 self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
588 return
589
590 def process_custom_bootdata(self, vm_create_msg_dict):
591 """Process the custom boot data"""
592 if 'config_file' not in vm_create_msg_dict['supplemental_boot_data']:
593 return
594
595 self._vnfd_package_store.refresh()
596 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
597 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
598 for file_item in vm_create_msg_dict['supplemental_boot_data']['config_file']:
599 if 'source' not in file_item or 'dest' not in file_item:
600 continue
601 source = file_item['source']
602 # Find source file in scripts dir of VNFD
603 self._log.debug("Checking for source config file at %s", source)
604 try:
605 source_file_str = cloud_init_extractor.read_script(stored_package, source)
606 except rift.package.cloud_init.CloudInitExtractionError as e:
607 raise VirtualDeploymentUnitRecordError(e)
608 # Update source file location with file contents
609 file_item['source'] = source_file_str
610
611 return
612
613 def resmgr_msg(self, config=None):
614 vdu_fields = ["vm_flavor",
615 "guest_epa",
616 "vswitch_epa",
617 "hypervisor_epa",
618 "host_epa",
619 "volumes",
620 "supplemental_boot_data"]
621
622 self._log.debug("Creating params based on VDUD: %s", self._vdud)
623 vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}
624
625 vm_create_msg_dict = {
626 "name": self.name,
627 }
628
629 if self.image_name is not None:
630 vm_create_msg_dict["image_name"] = self.image_name
631
632 if self.image_checksum is not None:
633 vm_create_msg_dict["image_checksum"] = self.image_checksum
634
635 vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
636 if self._vdud.has_field('mgmt_vpci'):
637 vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci
638
639 self._log.debug("VDUD: %s", self._vdud)
640 if config is not None:
641 vm_create_msg_dict['vdu_init'] = {'userdata': config}
642
643 if self._mgmt_network:
644 vm_create_msg_dict['mgmt_network'] = self._mgmt_network
645
646 cp_list = []
647 for intf, cp, vlr in self._ext_intf:
648 cp_info = { "name": cp.name,
649 "virtual_link_id": vlr.network_id,
650 "type_yang": intf.virtual_interface.type_yang }
651
652 if cp.has_field('port_security_enabled'):
653 cp_info["port_security_enabled"] = cp.port_security_enabled
654
655 if (intf.virtual_interface.has_field('vpci') and
656 intf.virtual_interface.vpci is not None):
657 cp_info["vpci"] = intf.virtual_interface.vpci
658
659 if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
660 cp_info['security_group'] = vlr.ip_profile_params.security_group
661
662 cp_list.append(cp_info)
663
664 for intf, cp, vlr in self._int_intf:
665 if (intf.virtual_interface.has_field('vpci') and
666 intf.virtual_interface.vpci is not None):
667 cp_list.append({"name": cp,
668 "virtual_link_id": vlr.network_id,
669 "type_yang": intf.virtual_interface.type_yang,
670 "vpci": intf.virtual_interface.vpci})
671 else:
672 if cp.has_field('port_security_enabled'):
673 cp_list.append({"name": cp,
674 "virtual_link_id": vlr.network_id,
675 "type_yang": intf.virtual_interface.type_yang,
676 "port_security_enabled": cp.port_security_enabled})
677 else:
678 cp_list.append({"name": cp,
679 "virtual_link_id": vlr.network_id,
680 "type_yang": intf.virtual_interface.type_yang})
681
682
683 vm_create_msg_dict["connection_points"] = cp_list
684 vm_create_msg_dict.update(vdu_copy_dict)
685
686 self.process_placement_groups(vm_create_msg_dict)
687 if 'supplemental_boot_data' in vm_create_msg_dict:
688 self.process_custom_bootdata(vm_create_msg_dict)
689
690 msg = RwResourceMgrYang.VDUEventData()
691 msg.event_id = self._request_id
692 msg.cloud_account = self.cloud_account_name
693 msg.request_info.from_dict(vm_create_msg_dict)
694
695 return msg
696
697 @asyncio.coroutine
698 def terminate(self, xact):
699 """ Delete resource in VIM """
700 if self._state != VDURecordState.READY and self._state != VDURecordState.FAILED:
701 self._log.warning("VDU terminate in not ready state - Ignoring request")
702 return
703
704 self._state = VDURecordState.TERMINATING
705 if self._vm_resp is not None:
706 try:
707 with self._dts.transaction() as new_xact:
708 yield from self.delete_resource(new_xact)
709 except Exception:
710 self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)
711
712 if self._rm_regh is not None:
713 self._log.debug("Deregistering resource manager registration handle")
714 self._rm_regh.deregister()
715 self._rm_regh = None
716
717 if self._vdur_console_handler is not None:
718 self._log.debug("Deregistering vnfr vdur registration handle")
719 self._vdur_console_handler._regh.deregister()
720 self._vdur_console_handler._regh = None
721
722 self._state = VDURecordState.TERMINATED
723
724 def find_internal_cp_by_cp_id(self, cp_id):
725 """ Find the CP corresponding to the connection point id"""
726 cp = None
727
728 self._log.debug("find_internal_cp_by_cp_id(%s) called",
729 cp_id)
730
731 for int_cp in self._vdud.internal_connection_point:
732 self._log.debug("Checking for int cp %s in internal connection points",
733 int_cp.id)
734 if int_cp.id == cp_id:
735 cp = int_cp
736 break
737
738 if cp is None:
739 self._log.debug("Failed to find cp %s in internal connection points",
740 cp_id)
741 msg = "Failed to find cp %s in internal connection points" % cp_id
742 raise VduRecordError(msg)
743
744 # return the VLR associated with the connection point
745 return cp
746
747 @asyncio.coroutine
748 def create_resource(self, xact, vnfr, config=None):
749 """ Request resource from ResourceMgr """
750 def find_cp_by_name(cp_name):
751 """ Find a connection point by name """
752 cp = None
753 self._log.debug("find_cp_by_name(%s) called", cp_name)
754 for ext_cp in vnfr._cprs:
755 self._log.debug("Checking ext cp (%s) called", ext_cp.name)
756 if ext_cp.name == cp_name:
757 cp = ext_cp
758 break
759 if cp is None:
760 self._log.debug("Failed to find cp %s in external connection points",
761 cp_name)
762 return cp
763
764 def find_internal_vlr_by_cp_name(cp_name):
765 """ Find the VLR corresponding to the connection point name"""
766 cp = None
767
768 self._log.debug("find_internal_vlr_by_cp_name(%s) called",
769 cp_name)
770
771 for int_cp in self._vdud.internal_connection_point:
772 self._log.debug("Checking for int cp %s in internal connection points",
773 int_cp.id)
774 if int_cp.id == cp_name:
775 cp = int_cp
776 break
777
778 if cp is None:
779 self._log.debug("Failed to find cp %s in internal connection points",
780 cp_name)
781 msg = "Failed to find cp %s in internal connection points" % cp_name
782 raise VduRecordError(msg)
783
784 # return the VLR associated with the connection point
785 return vnfr.find_vlr_by_cp(cp_name)
786
787 block = xact.block_create()
788
789 self._log.debug("Executing vm request id: %s, action: create",
790 self._request_id)
791
792 # Resolve the networks associated external interfaces
793 for ext_intf in self._vdud.external_interface:
794 self._log.debug("Resolving external interface name [%s], cp[%s]",
795 ext_intf.name, ext_intf.vnfd_connection_point_ref)
796 cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
797 if cp is None:
798 self._log.debug("Failed to find connection point - %s",
799 ext_intf.vnfd_connection_point_ref)
800 continue
801 self._log.debug("Connection point name [%s], type[%s]",
802 cp.name, cp.type_yang)
803
804 vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)
805
806 etuple = (ext_intf, cp, vlr)
807 self._ext_intf.append(etuple)
808
809 self._log.debug("Created external interface tuple : %s", etuple)
810
811 # Resolve the networks associated internal interfaces
812 for intf in self._vdud.internal_interface:
813 cp_id = intf.vdu_internal_connection_point_ref
814 self._log.debug("Resolving internal interface name [%s], cp[%s]",
815 intf.name, cp_id)
816
817 try:
818 vlr = find_internal_vlr_by_cp_name(cp_id)
819 except Exception as e:
820 self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
821 msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
822 raise VduRecordError(msg)
823
824 ituple = (intf, cp_id, vlr)
825 self._int_intf.append(ituple)
826
827 self._log.debug("Created internal interface tuple : %s", ituple)
828
829 resmgr_path = self.resmgr_path
830 resmgr_msg = self.resmgr_msg(config)
831
832 self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
833 block.add_query_create(resmgr_path, resmgr_msg)
834
835 res_iter = yield from block.execute(now=True)
836
837 resp = None
838
839 for i in res_iter:
840 r = yield from i
841 resp = r.result
842
843 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
844 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
845 self._log.debug("Got vm request response: %s", resp.resource_info)
846 return resp.resource_info
847
848 @asyncio.coroutine
849 def delete_resource(self, xact):
850 block = xact.block_create()
851
852 self._log.debug("Executing vm request id: %s, action: delete",
853 self._request_id)
854
855 block.add_query_delete(self.resmgr_path)
856
857 yield from block.execute(flags=0, now=True)
858
859 @asyncio.coroutine
860 def read_resource(self, xact):
861 block = xact.block_create()
862
863 self._log.debug("Executing vm request id: %s, action: delete",
864 self._request_id)
865
866 block.add_query_read(self.resmgr_path)
867
868 res_iter = yield from block.execute(flags=0, now=True)
869 for i in res_iter:
870 r = yield from i
871 resp = r.result
872
873 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
874 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
875 self._log.debug("Got vm request response: %s", resp.resource_info)
876 #self._vm_resp = resp.resource_info
877 return resp.resource_info
878
879
880 @asyncio.coroutine
881 def start_component(self):
882 """ This VDUR is active """
883 self._log.debug("Starting component %s for vdud %s vdur %s",
884 self._vdud.vcs_component_ref,
885 self._vdud,
886 self._vdur_id)
887 yield from self._vnfr.start_component(self._vdud.vcs_component_ref,
888 self.vm_resp.management_ip)
889
890 @property
891 def active(self):
892 """ Is this VDU active """
893 return True if self._state is VDURecordState.READY else False
894
895 @asyncio.coroutine
896 def instantiation_failed(self, failed_reason=None):
897 """ VDU instantiation failed """
898 self._log.debug("VDU %s instantiation failed ", self._vdur_id)
899 self._state = VDURecordState.FAILED
900 self._state_failed_reason = failed_reason
901 yield from self._vnfr.instantiation_failed(failed_reason)
902
903 @asyncio.coroutine
904 def vdu_is_active(self):
905 """ This VDU is active"""
906 if self.active:
907 self._log.warning("VDU %s was already marked as active", self._vdur_id)
908 return
909
910 self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)
911
912 if self._vdud.vcs_component_ref is not None:
913 yield from self.start_component()
914
915 self._state = VDURecordState.READY
916
917 if self._vnfr.all_vdus_active():
918 self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
919 yield from self._vnfr.is_ready()
920
921 @asyncio.coroutine
922 def instantiate(self, xact, vnfr, config=None):
923 """ Instantiate this VDU """
924 self._state = VDURecordState.INSTANTIATING
925
926 @asyncio.coroutine
927 def on_prepare(xact_info, query_action, ks_path, msg):
928 """ This VDUR is active """
929 self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
930 query_action,
931 ks_path,
932 msg)
933
934 if (query_action == rwdts.QueryAction.UPDATE or
935 query_action == rwdts.QueryAction.CREATE):
936 self._vm_resp = msg
937
938 if msg.resource_state == "active":
939 # Move this VDU to ready state
940 yield from self.vdu_is_active()
941 elif msg.resource_state == "failed":
942 yield from self.instantiation_failed(msg.resource_errors)
943 elif query_action == rwdts.QueryAction.DELETE:
944 self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
945 else:
946 raise NotImplementedError(
947 "%s action on VirtualDeployementUnitRecord not supported",
948 query_action)
949
950 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
951
952 try:
953 reg_event = asyncio.Event(loop=self._loop)
954
955 @asyncio.coroutine
956 def on_ready(regh, status):
957 reg_event.set()
958
959 handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
960 self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
961 flags=rwdts.Flag.SUBSCRIBER,
962 handler=handler)
963 yield from reg_event.wait()
964
965 vm_resp = yield from self.create_resource(xact, vnfr, config)
966 self._vm_resp = vm_resp
967 self._state = VDURecordState.RESOURCE_ALLOC_PENDING
968
969 self._log.debug("Requested VM from resource manager response %s",
970 vm_resp)
971 if vm_resp.resource_state == "active":
972 self._log.debug("Resourcemgr responded wih an active vm resp %s",
973 vm_resp)
974 yield from self.vdu_is_active()
975 self._state = VDURecordState.READY
976 elif (vm_resp.resource_state == "pending" or
977 vm_resp.resource_state == "inactive"):
978 self._log.debug("Resourcemgr responded wih a pending vm resp %s",
979 vm_resp)
980 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
981 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
982 # flags=rwdts.Flag.SUBSCRIBER,
983 # handler=handler)
984 else:
985 self._log.debug("Resourcemgr responded wih an error vm resp %s",
986 vm_resp)
987 raise VirtualDeploymentUnitRecordError(
988 "Failed VDUR instantiation %s " % vm_resp)
989
990 except Exception as e:
991 import traceback
992 traceback.print_exc()
993 self._log.exception(e)
994 self._log.error("Instantiation of VDU record failed: %s", str(e))
995 self._state = VDURecordState.FAILED
996 yield from self.instantiation_failed(str(e))
997
998
999 class VlRecordState(enum.Enum):
1000 """ VL Record State """
1001 INIT = 101
1002 INSTANTIATION_PENDING = 102
1003 ACTIVE = 103
1004 TERMINATE_PENDING = 104
1005 TERMINATED = 105
1006 FAILED = 106
1007
1008
1009 class InternalVirtualLinkRecord(object):
1010 """ Internal Virtual Link record """
1011 def __init__(self, dts, log, loop, project,
1012 ivld_msg, vnfr_name, cloud_account_name, ip_profile=None):
1013 self._dts = dts
1014 self._log = log
1015 self._loop = loop
1016 self._project = project
1017 self._ivld_msg = ivld_msg
1018 self._vnfr_name = vnfr_name
1019 self._cloud_account_name = cloud_account_name
1020 self._ip_profile = ip_profile
1021
1022 self._vlr_req = self.create_vlr()
1023 self._vlr = None
1024 self._state = VlRecordState.INIT
1025
1026 @property
1027 def vlr_id(self):
1028 """ Find VLR by id """
1029 return self._vlr_req.id
1030
1031 @property
1032 def name(self):
1033 """ Name of this VL """
1034 if self._ivld_msg.vim_network_name:
1035 return self._ivld_msg.vim_network_name
1036 else:
1037 return self._vnfr_name + "." + self._ivld_msg.name
1038
1039 @property
1040 def network_id(self):
1041 """ Find VLR by id """
1042 return self._vlr.network_id if self._vlr else None
1043
1044 def vlr_path(self):
1045 """ VLR path for this VLR instance"""
1046 return self._project.add_project("D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".
1047 format(self.vlr_id))
1048
1049 def create_vlr(self):
1050 """ Create the VLR record which will be instantiated """
1051
1052 vld_fields = ["short_name",
1053 "vendor",
1054 "description",
1055 "version",
1056 "type_yang",
1057 "vim_network_name",
1058 "provider_network"]
1059
1060 vld_copy_dict = {k: v for k, v in self._ivld_msg.as_dict().items() if k in vld_fields}
1061
1062 vlr_dict = {"id": str(uuid.uuid4()),
1063 "name": self.name,
1064 "cloud_account": self._cloud_account_name,
1065 }
1066
1067 if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
1068 vlr_dict['ip_profile_params' ] = self._ip_profile.ip_profile_params.as_dict()
1069
1070 vlr_dict.update(vld_copy_dict)
1071
1072 vlr = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr.from_dict(vlr_dict)
1073 return vlr
1074
1075 @asyncio.coroutine
1076 def instantiate(self, xact, restart_mode=False):
1077 """ Instantiate VL """
1078
1079 @asyncio.coroutine
1080 def instantiate_vlr():
1081 """ Instantiate VLR"""
1082 self._log.debug("Create VL with xpath %s and vlr %s",
1083 self.vlr_path(), self._vlr_req)
1084
1085 with self._dts.transaction(flags=0) as xact:
1086 block = xact.block_create()
1087 block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
1088 self._log.debug("Executing VL create path:%s msg:%s",
1089 self.vlr_path(), self._vlr_req)
1090
1091 res_iter = None
1092 try:
1093 res_iter = yield from block.execute()
1094 except Exception:
1095 self._state = VlRecordState.FAILED
1096 self._log.exception("Caught exception while instantial VL")
1097 raise
1098
1099 for ent in res_iter:
1100 res = yield from ent
1101 self._vlr = res.result
1102
1103 if self._vlr.operational_status == 'failed':
1104 self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
1105 self._state = VlRecordState.FAILED
1106 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))
1107
1108 self._log.info("Created VL with xpath %s and vlr %s",
1109 self.vlr_path(), self._vlr)
1110
1111 @asyncio.coroutine
1112 def get_vlr():
1113 """ Get the network id """
1114 res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
1115 vlr = None
1116 for ent in res_iter:
1117 res = yield from ent
1118 vlr = res.result
1119
1120 if vlr is None:
1121 err = "Failed to get VLR for path %s" % self.vlr_path()
1122 self._log.warn(err)
1123 raise InternalVirtualLinkRecordError(err)
1124 return vlr
1125
1126 self._state = VlRecordState.INSTANTIATION_PENDING
1127
1128 if restart_mode:
1129 vl = yield from get_vlr()
1130 if vl is None:
1131 yield from instantiate_vlr()
1132 else:
1133 yield from instantiate_vlr()
1134
1135 self._state = VlRecordState.ACTIVE
1136
1137 def vlr_in_vns(self):
1138 """ Is there a VLR record in VNS """
1139 if (self._state == VlRecordState.ACTIVE or
1140 self._state == VlRecordState.INSTANTIATION_PENDING or
1141 self._state == VlRecordState.FAILED):
1142 return True
1143
1144 return False
1145
1146 @asyncio.coroutine
1147 def terminate(self, xact):
1148 """Terminate this VL """
1149 if not self.vlr_in_vns():
1150 self._log.debug("Ignoring terminate request for id %s in state %s",
1151 self.vlr_id, self._state)
1152 return
1153
1154 self._log.debug("Terminating VL with path %s", self.vlr_path())
1155 self._state = VlRecordState.TERMINATE_PENDING
1156 block = xact.block_create()
1157 block.add_query_delete(self.vlr_path())
1158 yield from block.execute(flags=0, now=True)
1159 self._state = VlRecordState.TERMINATED
1160 self._log.debug("Terminated VL with path %s", self.vlr_path())
1161
1162
1163 class VirtualNetworkFunctionRecord(object):
1164 """ Virtual Network Function Record """
1165 def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
1166 self._dts = dts
1167 self._log = log
1168 self._loop = loop
1169 self._project = vnfm._project
1170 self._cluster_name = cluster_name
1171 self._vnfr_msg = vnfr_msg
1172 self._vnfr_id = vnfr_msg.id
1173 self._vnfd_id = vnfr_msg.vnfd.id
1174 self._vnfm = vnfm
1175 self._vcs_handler = vcs_handler
1176 self._vnfr = vnfr_msg
1177 self._mgmt_network = mgmt_network
1178
1179 self._vnfd = vnfr_msg.vnfd
1180 self._state = VirtualNetworkFunctionRecordState.INIT
1181 self._state_failed_reason = None
1182 self._ext_vlrs = {} # The list of external virtual links
1183 self._vlrs = [] # The list of internal virtual links
1184 self._vdus = [] # The list of vdu
1185 self._vlr_by_cp = {}
1186 self._cprs = []
1187 self._inventory = {}
1188 self._create_time = int(time.time())
1189 self._vnf_mon = None
1190 self._config_status = vnfr_msg.config_status
1191 self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
1192 self._rw_vnfd = None
1193 self._vnfd_ref_count = 0
1194
1195 def _get_vdur_from_vdu_id(self, vdu_id):
1196 self._log.debug("Finding vdur for vdu_id %s", vdu_id)
1197 self._log.debug("Searching through vdus: %s", self._vdus)
1198 for vdu in self._vdus:
1199 self._log.debug("vdu_id: %s", vdu.vdu_id)
1200 if vdu.vdu_id == vdu_id:
1201 return vdu
1202
1203 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id)
1204
1205 @property
1206 def operational_status(self):
1207 """ Operational status of this VNFR """
1208 op_status_map = {"INIT": "init",
1209 "VL_INIT_PHASE": "vl_init_phase",
1210 "VM_INIT_PHASE": "vm_init_phase",
1211 "READY": "running",
1212 "TERMINATE": "terminate",
1213 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1214 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1215 "TERMINATED": "terminated",
1216 "FAILED": "failed", }
1217 return op_status_map[self._state.name]
1218
1219 @staticmethod
1220 def vnfd_xpath(vnfd_id):
1221 """ VNFD xpath associated with this VNFR """
1222 return ("C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd[project-vnfd:id = '{}']".
1223 format(vnfd_id))
1224
1225 @property
1226 def vnfd_ref_count(self):
1227 """ Returns the VNFD reference count associated with this VNFR """
1228 return self._vnfd_ref_count
1229
1230 def vnfd_in_use(self):
1231 """ Returns whether vnfd is in use or not """
1232 return True if self._vnfd_ref_count > 0 else False
1233
1234 def vnfd_ref(self):
1235 """ Take a reference on this object """
1236 self._vnfd_ref_count += 1
1237 return self._vnfd_ref_count
1238
1239 def vnfd_unref(self):
1240 """ Release reference on this object """
1241 if self._vnfd_ref_count < 1:
1242 msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
1243 (self.vnfd.id, self._vnfd_ref_count))
1244 self._log.critical(msg)
1245 raise VnfRecordError(msg)
1246 self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
1247 self.vnfd.id, self._vnfd_ref_count)
1248 self._vnfd_ref_count -= 1
1249 return self._vnfd_ref_count
1250
1251 @property
1252 def vnfd(self):
1253 """ VNFD for this VNFR """
1254 return self._vnfd
1255
1256 @property
1257 def vnf_name(self):
1258 """ VNFD name associated with this VNFR """
1259 return self.vnfd.name
1260
1261 @property
1262 def name(self):
1263 """ Name of this VNF in the record """
1264 return self._vnfr.name
1265
1266 @property
1267 def cloud_account_name(self):
1268 """ Name of the cloud account this VNFR is instantiated in """
1269 return self._vnfr.cloud_account
1270
1271 @property
1272 def vnfd_id(self):
1273 """ VNFD Id associated with this VNFR """
1274 return self.vnfd.id
1275
1276 @property
1277 def vnfr_id(self):
1278 """ VNFR Id associated with this VNFR """
1279 return self._vnfr_id
1280
1281 @property
1282 def member_vnf_index(self):
1283 """ Member VNF index associated with this VNFR """
1284 return self._vnfr.member_vnf_index_ref
1285
1286 @property
1287 def config_status(self):
1288 """ Config agent status for this VNFR """
1289 return self._config_status
1290
1291 def component_by_name(self, component_name):
1292 """ Find a component by name in the inventory list"""
1293 mangled_name = VcsComponent.mangle_name(component_name,
1294 self.vnf_name,
1295 self.vnfd_id)
1296 return self._inventory[mangled_name]
1297
1298
1299
1300 @asyncio.coroutine
1301 def get_nsr_config(self):
1302 ### Need access to NS instance configuration for runtime resolution.
1303 ### This shall be replaced when deployment flavors are implemented
1304 xpath = self._project.add_project("C,/nsr:ns-instance-config")
1305 results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
1306
1307 for result in results:
1308 entry = yield from result
1309 ns_instance_config = entry.result
1310 for nsr in ns_instance_config.nsr:
1311 if nsr.id == self._vnfr_msg.nsr_id_ref:
1312 return nsr
1313 return None
1314
1315 @asyncio.coroutine
1316 def start_component(self, component_name, ip_addr):
1317 """ Start a component in the VNFR by name """
1318 comp = self.component_by_name(component_name)
1319 yield from comp.start(None, None, ip_addr)
1320
1321 def cp_ip_addr(self, cp_name):
1322 """ Get ip address for connection point """
1323 self._log.debug("cp_ip_addr()")
1324 for cp in self._cprs:
1325 if cp.name == cp_name and cp.ip_address is not None:
1326 return cp.ip_address
1327 return "0.0.0.0"
1328
1329 def mgmt_intf_info(self):
1330 """ Get Management interface info for this VNFR """
1331 mgmt_intf_desc = self.vnfd.mgmt_interface
1332 ip_addr = None
1333 if mgmt_intf_desc.has_field("cp"):
1334 ip_addr = self.cp_ip_addr(mgmt_intf_desc.cp)
1335 elif mgmt_intf_desc.has_field("vdu_id"):
1336 try:
1337 vdur = self._get_vdur_from_vdu_id(mgmt_intf_desc.vdu_id)
1338 ip_addr = vdur.management_ip
1339 except VDURecordNotFound:
1340 self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
1341 ip_addr = None
1342 else:
1343 ip_addr = mgmt_intf_desc.ip_address
1344 port = mgmt_intf_desc.port
1345
1346 return ip_addr, port
1347
1348 @property
1349 def msg(self):
1350 """ Message associated with this VNFR """
1351 vnfd_fields = ["short_name", "vendor", "description", "version"]
1352 vnfd_copy_dict = {k: v for k, v in self.vnfd.as_dict().items() if k in vnfd_fields}
1353
1354 mgmt_intf = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MgmtInterface()
1355 ip_address, port = self.mgmt_intf_info()
1356
1357 if ip_address is not None:
1358 mgmt_intf.ip_address = ip_address
1359 if port is not None:
1360 mgmt_intf.port = port
1361
1362 vnfr_dict = {"id": self._vnfr_id,
1363 "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
1364 "name": self.name,
1365 "member_vnf_index_ref": self.member_vnf_index,
1366 "operational_status": self.operational_status,
1367 "operational_status_details": self._state_failed_reason,
1368 "cloud_account": self.cloud_account_name,
1369 "config_status": self._config_status
1370 }
1371
1372 vnfr_dict.update(vnfd_copy_dict)
1373
1374 vnfr_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
1375 vnfr_msg.vnfd = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())
1376
1377 vnfr_msg.create_time = self._create_time
1378 vnfr_msg.uptime = int(time.time()) - self._create_time
1379 vnfr_msg.mgmt_interface = mgmt_intf
1380
1381 # Add all the VLRs to VNFR
1382 for vlr in self._vlrs:
1383 ivlr = vnfr_msg.internal_vlr.add()
1384 ivlr.vlr_ref = vlr.vlr_id
1385
1386 # Add all the VDURs to VDUR
1387 if self._vdus is not None:
1388 for vdu in self._vdus:
1389 vdur = vnfr_msg.vdur.add()
1390 vdur.from_dict(vdu.msg.as_dict())
1391
1392 if self.vnfd.mgmt_interface.has_field('dashboard_params'):
1393 vnfr_msg.dashboard_url = self.dashboard_url
1394
1395 for cpr in self._cprs:
1396 new_cp = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
1397 vnfr_msg.connection_point.append(new_cp)
1398
1399 if self._vnf_mon is not None:
1400 for monp in self._vnf_mon.msg:
1401 vnfr_msg.monitoring_param.append(
1402 VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))
1403
1404 if self._vnfr.vnf_configuration is not None:
1405 vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
1406 if (ip_address is not None and
1407 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None):
1408 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address
1409
1410 for group in self._vnfr_msg.placement_groups_info:
1411 group_info = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1412 group_info.from_dict(group.as_dict())
1413 vnfr_msg.placement_groups_info.append(group_info)
1414
1415 return vnfr_msg
1416
1417 @property
1418 def dashboard_url(self):
1419 ip, cfg_port = self.mgmt_intf_info()
1420 protocol = 'http'
1421 http_port = 80
1422 if self.vnfd.mgmt_interface.dashboard_params.has_field('https'):
1423 if self.vnfd.mgmt_interface.dashboard_params.https is True:
1424 protocol = 'https'
1425 http_port = 443
1426 if self.vnfd.mgmt_interface.dashboard_params.has_field('port'):
1427 http_port = self.vnfd.mgmt_interface.dashboard_params.port
1428
1429 url = "{protocol}://{ip_address}:{port}/{path}".format(
1430 protocol=protocol,
1431 ip_address=ip,
1432 port=http_port,
1433 path=self.vnfd.mgmt_interface.dashboard_params.path.lstrip("/"),
1434 )
1435
1436 return url
1437
1438 @property
1439 def xpath(self):
1440 """ path for this VNFR """
1441 return self._project.add_project("D,/vnfr:vnfr-catalog"
1442 "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id))
1443
1444 @asyncio.coroutine
1445 def publish(self, xact):
1446 """ publish this VNFR """
1447 vnfr = self.msg
1448 self._log.debug("Publishing VNFR path = [%s], record = [%s]",
1449 self.xpath, self.msg)
1450 vnfr.create_time = self._create_time
1451 yield from self._vnfm.publish_vnfr(xact, self.xpath, self.msg)
1452 self._log.debug("Published VNFR path = [%s], record = [%s]",
1453 self.xpath, self.msg)
1454
1455 def resolve_vld_ip_profile(self, vnfd_msg, vld):
1456 self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
1457 if not vld.has_field('ip_profile_ref'):
1458 return None
1459 profile = [profile for profile in vnfd_msg.ip_profiles if profile.name == vld.ip_profile_ref]
1460 return profile[0] if profile else None
1461
1462 @asyncio.coroutine
1463 def create_vls(self):
1464 """ Publish The VLs associated with this VNF """
1465 self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
1466 self.vnfd_id)
1467 for ivld_msg in self.vnfd.internal_vld:
1468 self._log.debug("Creating internal vld:"
1469 " %s, int_cp_ref = %s",
1470 ivld_msg, ivld_msg.internal_connection_point
1471 )
1472 vlr = InternalVirtualLinkRecord(dts=self._dts,
1473 log=self._log,
1474 loop=self._loop,
1475 ivld_msg=ivld_msg,
1476 vnfr_name=self.name,
1477 cloud_account_name=self.cloud_account_name,
1478 ip_profile=self.resolve_vld_ip_profile(self.vnfd, ivld_msg)
1479 )
1480 self._vlrs.append(vlr)
1481
1482 for int_cp in ivld_msg.internal_connection_point:
1483 if int_cp.id_ref in self._vlr_by_cp:
1484 msg = ("Connection point %s already "
1485 " bound %s" % (int_cp.id_ref, self._vlr_by_cp[int_cp.id_ref]))
1486 raise InternalVirtualLinkRecordError(msg)
1487 self._log.debug("Setting vlr %s to internal cp = %s",
1488 vlr, int_cp.id_ref)
1489 self._vlr_by_cp[int_cp.id_ref] = vlr
1490
1491 @asyncio.coroutine
1492 def instantiate_vls(self, xact, restart_mode=False):
1493 """ Instantiate the VLs associated with this VNF """
1494 self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1495 self.vnfd_id)
1496
1497 for vlr in self._vlrs:
1498 self._log.debug("Instantiating VLR %s", vlr)
1499 yield from vlr.instantiate(xact, restart_mode)
1500
1501 def find_vlr_by_cp(self, cp_name):
1502 """ Find the VLR associated with the cp name """
1503 return self._vlr_by_cp[cp_name]
1504
1505 def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
1506 """
1507 Returns the cloud specific construct for placement group
1508 Arguments:
1509 input_group: VNFD PlacementGroup
1510 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1511 """
1512 copy_dict = ['name', 'requirement', 'strategy']
1513 for group_info in nsr_config.vnfd_placement_group_maps:
1514 if group_info.placement_group_ref == input_group.name and \
1515 group_info.vnfd_id_ref == self.vnfd_id:
1516 group = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1517 group_dict = {k:v for k,v in
1518 group_info.as_dict().items()
1519 if (k != 'placement_group_ref' and k !='vnfd_id_ref')}
1520 for param in copy_dict:
1521 group_dict.update({param: getattr(input_group, param)})
1522 group.from_dict(group_dict)
1523 return group
1524 return None
1525
1526 @asyncio.coroutine
1527 def get_vdu_placement_groups(self, vdu):
1528 placement_groups = []
1529 ### Step-1: Get VNF level placement groups
1530 for group in self._vnfr_msg.placement_groups_info:
1531 #group_info = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1532 #group_info.from_dict(group.as_dict())
1533 placement_groups.append(group)
1534
1535 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1536 nsr_config = yield from self.get_nsr_config()
1537
1538 ### Step-3: Get VDU level placement groups
1539 for group in self.vnfd.placement_groups:
1540 for member_vdu in group.member_vdus:
1541 if member_vdu.member_vdu_ref == vdu.id:
1542 group_info = self.resolve_placement_group_cloud_construct(group,
1543 nsr_config)
1544 if group_info is None:
1545 self._log.info("Could not resolve cloud-construct for " +
1546 "placement group: %s", group.name)
1547 else:
1548 self._log.info("Successfully resolved cloud construct for " +
1549 "placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1550 str(group_info),
1551 vdu.name,
1552 self.vnf_name,
1553 self.member_vnf_index)
1554 placement_groups.append(group_info)
1555
1556 return placement_groups
1557
1558 @asyncio.coroutine
1559 def vdu_cloud_init_instantiation(self):
1560 [vdu.vdud_cloud_init for vdu in self._vdus]
1561
1562 @asyncio.coroutine
1563 def create_vdus(self, vnfr, restart_mode=False):
1564 """ Create the VDUs associated with this VNF """
1565
1566 def get_vdur_id(vdud):
1567 """Get the corresponding VDUR's id for the VDUD. This is useful in
1568 case of a restart.
1569
1570 In restart mode we check for exiting VDUR's ID and use them, if
1571 available. This way we don't end up creating duplicate VDURs
1572 """
1573 vdur_id = None
1574
1575 if restart_mode and vdud is not None:
1576 try:
1577 vdur = [vdur.id for vdur in vnfr._vnfr.vdur if vdur.vdu_id_ref == vdud.id]
1578 vdur_id = vdur[0]
1579 except IndexError:
1580 self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
1581
1582 return vdur_id
1583
1584
1585 self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
1586 for vdu in self._rw_vnfd.vdu:
1587 self._log.debug("Creating vdu: %s", vdu)
1588 vdur_id = get_vdur_id(vdu)
1589
1590 placement_groups = yield from self.get_vdu_placement_groups(vdu)
1591 self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1592 vdu.name,
1593 self.vnf_name,
1594 self.member_vnf_index,
1595 [ group.name for group in placement_groups])
1596
1597 vdur = VirtualDeploymentUnitRecord(
1598 dts=self._dts,
1599 log=self._log,
1600 loop=self._loop,
1601 project = self._project,
1602 vdud=vdu,
1603 vnfr=vnfr,
1604 mgmt_intf=self.has_mgmt_interface(vdu),
1605 mgmt_network=self._mgmt_network,
1606 cloud_account_name=self.cloud_account_name,
1607 vnfd_package_store=self._vnfd_package_store,
1608 vdur_id=vdur_id,
1609 placement_groups = placement_groups,
1610 )
1611 yield from vdur.vdu_opdata_register()
1612
1613 self._vdus.append(vdur)
1614
1615 @asyncio.coroutine
1616 def instantiate_vdus(self, xact, vnfr):
1617 """ Instantiate the VDUs associated with this VNF """
1618 self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)
1619
1620 lookup = {vdu.vdu_id: vdu for vdu in self._vdus}
1621
1622 # Identify any dependencies among the VDUs
1623 dependencies = collections.defaultdict(list)
1624 vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")
1625
1626 for vdu in self._vdus:
1627 if vdu._vdud_cloud_init is not None:
1628 for vdu_id in vdu_id_pattern.findall(vdu._vdud_cloud_init):
1629 if vdu_id != vdu.vdu_id:
1630 # This means that vdu.vdu_id depends upon vdu_id,
1631 # i.e. vdu_id must be instantiated before
1632 # vdu.vdu_id.
1633 dependencies[vdu.vdu_id].append(lookup[vdu_id])
1634
1635 # Define the terminal states of VDU instantiation
1636 terminal = (
1637 VDURecordState.READY,
1638 VDURecordState.TERMINATED,
1639 VDURecordState.FAILED,
1640 )
1641
1642 datastore = VdurDatastore()
1643 processed = set()
1644
1645 @asyncio.coroutine
1646 def instantiate_monitor(vdu):
1647 """Monitor the state of the VDU during instantiation
1648
1649 Arguments:
1650 vdu - a VirtualDeploymentUnitRecord
1651
1652 """
1653 # wait for the VDUR to enter a terminal state
1654 while vdu._state not in terminal:
1655 yield from asyncio.sleep(1, loop=self._loop)
1656 # update the datastore
1657 datastore.update(vdu)
1658
1659 # add the VDU to the set of processed VDUs
1660 processed.add(vdu.vdu_id)
1661
1662 @asyncio.coroutine
1663 def instantiate(vdu):
1664 """Instantiate the specified VDU
1665
1666 Arguments:
1667 vdu - a VirtualDeploymentUnitRecord
1668
1669 Raises:
1670 if the VDU, or any of the VDUs this VDU depends upon, are
1671 terminated or fail to instantiate properly, a
1672 VirtualDeploymentUnitRecordError is raised.
1673
1674 """
1675 for dependency in dependencies[vdu.vdu_id]:
1676 self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))
1677
1678 while dependency.vdu_id not in processed:
1679 yield from asyncio.sleep(1, loop=self._loop)
1680
1681 if not dependency.active:
1682 raise VirtualDeploymentUnitRecordError()
1683
1684 self._log.debug('instantiating {}'.format(vdu.vdu_id))
1685
1686 # Populate the datastore with the current values of the VDU
1687 datastore.add(vdu)
1688
1689 # Substitute any variables contained in the cloud config script
1690 config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""
1691
1692 parts = re.split("\{\{ ([^\}]+) \}\}", config)
1693 if len(parts) > 1:
1694
1695 # Extract the variable names
1696 variables = list()
1697 for variable in parts[1::2]:
1698 variables.append(variable.lstrip('{{').rstrip('}}').strip())
1699
1700 # Iterate of the variables and substitute values from the
1701 # datastore.
1702 for variable in variables:
1703
1704 # Handle a reference to a VDU by ID
1705 if variable.startswith('vdu['):
1706 value = datastore.get(variable)
1707 if value is None:
1708 msg = "Unable to find a substitute for {} in {} cloud-init script"
1709 raise ValueError(msg.format(variable, vdu.vdu_id))
1710
1711 config = config.replace("{{ %s }}" % variable, value)
1712 continue
1713
1714 # Handle a reference to the current VDU
1715 if variable.startswith('vdu'):
1716 value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
1717 config = config.replace("{{ %s }}" % variable, value)
1718 continue
1719
1720 # Handle unrecognized variables
1721 msg = 'unrecognized cloud-config variable: {}'
1722 raise ValueError(msg.format(variable))
1723
1724 # Instantiate the VDU
1725 with self._dts.transaction() as xact:
1726 self._log.debug("Instantiating vdu: %s", vdu)
1727 yield from vdu.instantiate(xact, vnfr, config=config)
1728 if self._state == VirtualNetworkFunctionRecordState.FAILED:
1729 self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
1730 self.vnfr_id, vdu)
1731
1732 # First create a set of tasks to monitor the state of the VDUs and
1733 # report when they have entered a terminal state
1734 for vdu in self._vdus:
1735 self._loop.create_task(instantiate_monitor(vdu))
1736
1737 for vdu in self._vdus:
1738 self._loop.create_task(instantiate(vdu))
1739
1740 def has_mgmt_interface(self, vdu):
1741 # ## TODO: Support additional mgmt_interface type options
1742 if self.vnfd.mgmt_interface.vdu_id == vdu.id:
1743 return True
1744 return False
1745
1746 def vlr_xpath(self, vlr_id):
1747 """ vlr xpath """
1748 return self._project.add_project("D,/vlr:vlr-catalog/"
1749 "vlr:vlr[vlr:id = '{}']".format(vlr_id))
1750
1751 def ext_vlr_by_id(self, vlr_id):
1752 """ find ext vlr by id """
1753 return self._ext_vlrs[vlr_id]
1754
1755 @asyncio.coroutine
1756 def publish_inventory(self, xact):
1757 """ Publish the inventory associated with this VNF """
1758 self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)
1759
1760 for component in self._rw_vnfd.component:
1761 self._log.debug("Creating inventory component %s", component)
1762 mangled_name = VcsComponent.mangle_name(component.component_name,
1763 self.vnf_name,
1764 self.vnfd_id
1765 )
1766 comp = VcsComponent(dts=self._dts,
1767 log=self._log,
1768 loop=self._loop,
1769 cluster_name=self._cluster_name,
1770 vcs_handler=self._vcs_handler,
1771 component=component,
1772 mangled_name=mangled_name,
1773 )
1774 if comp.name in self._inventory:
1775 self._log.debug("Duplicate entries in inventory %s for vnfr %s",
1776 component, self._vnfd_id)
1777 return
1778 self._log.debug("Adding component %s for vnrf %s",
1779 comp.name, self._vnfr_id)
1780 self._inventory[comp.name] = comp
1781 yield from comp.publish(xact)
1782
1783 def all_vdus_active(self):
1784 """ Are all VDUS in this VNFR active? """
1785 for vdu in self._vdus:
1786 if not vdu.active:
1787 return False
1788
1789 self._log.debug("Inside all_vdus_active. Returning True")
1790 return True
1791
1792 @asyncio.coroutine
1793 def instantiation_failed(self, failed_reason=None):
1794 """ VNFR instantiation failed """
1795 self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
1796 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1797 self._state_failed_reason = failed_reason
1798
1799 # Update the VNFR with the changed status
1800 yield from self.publish(None)
1801
1802 @asyncio.coroutine
1803 def is_ready(self):
1804 """ This VNF is ready"""
1805 self._log.debug("VNFR id %s is ready", self.vnfr_id)
1806
1807 if self._state != VirtualNetworkFunctionRecordState.FAILED:
1808 self.set_state(VirtualNetworkFunctionRecordState.READY)
1809
1810 else:
1811 self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
1812
1813 # Update the VNFR with the changed status
1814 yield from self.publish(None)
1815
1816 def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
1817 """Updated the connection point with ip address"""
1818 for cp in self._cprs:
1819 if cp.name == cp_name:
1820 self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1821 cp_name, cp, ip_address, cp_id)
1822 cp.ip_address = ip_address
1823 cp.mac_address = mac_addr
1824 cp.connection_point_id = cp_id
1825 return
1826
1827 err = "No connection point %s found in VNFR id %s" % (cp.name, self._vnfr_id)
1828 self._log.debug(err)
1829 raise VirtualDeploymentUnitRecordError(err)
1830
1831 def set_state(self, state):
1832 """ Set state for this VNFR"""
1833 self._state = state
1834
1835 @asyncio.coroutine
1836 def instantiate(self, xact, restart_mode=False):
1837 """ instantiate this VNF """
1838 self._log.info("Instantiate VNF {}: {}".format(self._vnfr_id, self._state))
1839 self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
1840 self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)
1841
1842 @asyncio.coroutine
1843 def fetch_vlrs():
1844 """ Fetch VLRs """
1845 # Iterate over all the connection points in VNFR and fetch the
1846 # associated VLRs
1847
1848 def cpr_from_cp(cp):
1849 """ Creates a record level connection point from the desciptor cp"""
1850 cp_fields = ["name", "image", "vm-flavor", "port_security_enabled"]
1851 cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
1852 cpr_dict = {}
1853 cpr_dict.update(cp_copy_dict)
1854 return VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)
1855
1856 self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1857 self._vnfr_id, self._vnfr.connection_point)
1858
1859 for cp in self._vnfr.connection_point:
1860 cpr = cpr_from_cp(cp)
1861 self._cprs.append(cpr)
1862 self._log.debug("Adding Connection point record %s ", cp)
1863
1864 vlr_path = self.vlr_xpath(cp.vlr_ref)
1865 self._log.debug("Fetching VLR with path = %s", vlr_path)
1866 res_iter = yield from self._dts.query_read(vlr_path,
1867 rwdts.XactFlag.MERGE)
1868 for i in res_iter:
1869 r = yield from i
1870 d = r.result
1871 self._ext_vlrs[cp.vlr_ref] = d
1872 cpr.vlr_ref = cp.vlr_ref
1873 self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)
1874
1875 # Increase the VNFD reference count
1876 self.vnfd_ref()
1877
1878 assert self.vnfd
1879
1880 # Fetch External VLRs
1881 self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
1882 yield from fetch_vlrs()
1883
1884 # Publish inventory
1885 self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
1886 yield from self.publish_inventory(xact)
1887
1888 # Publish inventory
1889 self._log.debug("Create VLs {}: {}".format(self._vnfr_id, self._state))
1890 yield from self.create_vls()
1891
1892 # publish the VNFR
1893 self._log.debug("Publish VNFR {}: {}".format(self._vnfr_id, self._state))
1894 yield from self.publish(xact)
1895
1896
1897 # instantiate VLs
1898 self._log.debug("Instantiate VLs {}: {}".format(self._vnfr_id, self._state))
1899 try:
1900 yield from self.instantiate_vls(xact, restart_mode)
1901 except Exception as e:
1902 self._log.exception("VL instantiation failed (%s)", str(e))
1903 yield from self.instantiation_failed(str(e))
1904 return
1905
1906 self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)
1907
1908 # instantiate VDUs
1909 self._log.debug("Create VDUs {}: {}".format(self._vnfr_id, self._state))
1910 yield from self.create_vdus(self, restart_mode)
1911
1912 try:
1913 yield from self.vdu_cloud_init_instantiation()
1914 except Exception as e:
1915 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1916 self._state_failed_reason = str(e)
1917 yield from self.publish(xact)
1918
1919 # publish the VNFR
1920 self._log.debug("VNFR {}: Publish VNFR with state {}".
1921 format(self._vnfr_id, self._state))
1922 yield from self.publish(xact)
1923
1924 # instantiate VDUs
1925 # ToDo: Check if this should be prevented during restart
1926 self._log.debug("Instantiate VDUs {}: {}".format(self._vnfr_id, self._state))
1927 _ = self._loop.create_task(self.instantiate_vdus(xact, self))
1928
1929 # publish the VNFR
1930 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1931 yield from self.publish(xact)
1932
1933 self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)
1934
1935 # create task updating uptime for this vnfr
1936 self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
1937 self._loop.create_task(self.vnfr_uptime_update(xact))
1938
1939 @asyncio.coroutine
1940 def terminate(self, xact):
1941 """ Terminate this virtual network function """
1942
1943 self._log.debug("Terminatng VNF id %s", self.vnfr_id)
1944
1945 self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)
1946
1947 # stop monitoring
1948 if self._vnf_mon is not None:
1949 self._vnf_mon.stop()
1950 self._vnf_mon.deregister()
1951 self._vnf_mon = None
1952
1953 @asyncio.coroutine
1954 def terminate_vls():
1955 """ Terminate VLs in this VNF """
1956 for vl in self._vlrs:
1957 yield from vl.terminate(xact)
1958
1959 @asyncio.coroutine
1960 def terminate_vdus():
1961 """ Terminate VDUS in this VNF """
1962 for vdu in self._vdus:
1963 yield from vdu.terminate(xact)
1964
1965 self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
1966 self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
1967 yield from terminate_vls()
1968
1969 self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
1970 self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
1971 yield from terminate_vdus()
1972
1973 self._log.debug("Terminated VNF id %s", self.vnfr_id)
1974 self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1975
1976 @asyncio.coroutine
1977 def vnfr_uptime_update(self, xact):
1978 while True:
1979 # Return when vnfr state is FAILED or TERMINATED etc
1980 if self._state not in [VirtualNetworkFunctionRecordState.INIT,
1981 VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
1982 VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
1983 VirtualNetworkFunctionRecordState.READY]:
1984 return
1985 yield from self.publish(xact)
1986 yield from asyncio.sleep(2, loop=self._loop)
1987
1988
1989
1990 class VnfdDtsHandler(object):
1991 """ DTS handler for VNFD config changes """
1992 XPATH = "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd"
1993
1994 def __init__(self, dts, log, loop, vnfm):
1995 self._dts = dts
1996 self._log = log
1997 self._loop = loop
1998 self._vnfm = vnfm
1999 self._regh = None
2000
2001 @asyncio.coroutine
2002 def regh(self):
2003 """ DTS registration handle """
2004 return self._regh
2005
2006 def deregister(self):
2007 '''De-register from DTS'''
2008 self._log.debug("De-register VNFD DTS handler for project {}".
2009 format(self._project))
2010 if self._regh:
2011 self._regh.deregister()
2012 self._regh = None
2013
2014 @asyncio.coroutine
2015 def register(self):
2016 """ Register for VNFD configuration"""
2017
2018 def on_apply(dts, acg, xact, action, scratch):
2019 """Apply the configuration"""
2020 self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
2021 xact, action, scratch)
2022
2023 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
2024
2025 @asyncio.coroutine
2026 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
2027 """ on prepare callback """
2028 self._log.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
2029 ks_path.to_xpath(RwVnfmYang.get_schema()),
2030 xact_info.query_action, msg)
2031 fref = ProtobufC.FieldReference.alloc()
2032 fref.goto_whole_message(msg.to_pbcm())
2033
2034 # Handle deletes in prepare_callback
2035 if fref.is_field_deleted():
2036 # Delete an VNFD record
2037 self._log.debug("Deleting VNFD with id %s", msg.id)
2038 if self._vnfm.vnfd_in_use(msg.id):
2039 self._log.debug("Cannot delete VNFD in use - %s", msg)
2040 err = "Cannot delete a VNFD in use - %s" % msg
2041 raise VirtualNetworkFunctionDescriptorRefCountExists(err)
2042 # Delete a VNFD record
2043 yield from self._vnfm.delete_vnfd(msg.id)
2044
2045 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2046
2047 xpath = self._vnfm._project.add_project(VnfdDtsHandler.XPATH)
2048 self._log.debug("Registering for VNFD config using xpath: {}".
2049 format(xpath))
2050
2051 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
2052 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
2053 self._regh = acg.register(
2054 xpath=xpath,
2055 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
2056 on_prepare=on_prepare)
2057
2058
2059 class VcsComponentDtsHandler(object):
2060 """ Vcs Component DTS handler """
2061 XPATH = ("D,/rw-manifest:manifest" +
2062 "/rw-manifest:operational-inventory" +
2063 "/rw-manifest:component")
2064
2065 def __init__(self, dts, log, loop, vnfm):
2066 self._dts = dts
2067 self._log = log
2068 self._loop = loop
2069 self._regh = None
2070 self._vnfm = vnfm
2071
2072 @property
2073 def regh(self):
2074 """ DTS registration handle """
2075 return self._regh
2076
2077 def deregister(self):
2078 '''De-register from DTS'''
2079 self._log.debug("De-register VCS DTS handler for project {}".
2080 format(self._project))
2081 if self._regh:
2082 self._regh.deregister()
2083 self._regh = None
2084
2085 @asyncio.coroutine
2086 def register(self):
2087 """ Registers VCS component dts publisher registration"""
2088 self._log.debug("VCS Comp publisher DTS handler registering path %s",
2089 VcsComponentDtsHandler.XPATH)
2090
2091 hdl = rift.tasklets.DTS.RegistrationHandler()
2092 handlers = rift.tasklets.Group.Handler()
2093 with self._dts.group_create(handler=handlers) as group:
2094 self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
2095 handler=hdl,
2096 flags=(rwdts.Flag.PUBLISHER |
2097 rwdts.Flag.NO_PREP_READ |
2098 rwdts.Flag.DATASTORE),)
2099
2100 @asyncio.coroutine
2101 def publish(self, xact, path, msg):
2102 """ Publishes the VCS component """
2103 self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
2104 xact, path, msg)
2105 self.regh.create_element(path, msg)
2106 self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
2107 VcsComponentDtsHandler.XPATH, xact, path, msg)
2108
2109 class VnfrConsoleOperdataDtsHandler(object):
2110 """
2111 Registers 'D,/rw-project:project/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]'
2112 and handles CRUD from DTS
2113 """
2114
2115 @property
2116 def vnfr_vdu_console_xpath(self):
2117 """ path for resource-mgr"""
2118 return self._project.add_project("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']" +
2119 "/rw-vnfr:vdur[vnfr:id='{}']".format(self._vnfr_id,self._vdur_id))
2120
2121 def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
2122 self._dts = dts
2123 self._log = log
2124 self._loop = loop
2125 self._regh = None
2126 self._vnfm = vnfm
2127
2128 self._vnfr_id = vnfr_id
2129 self._vdur_id = vdur_id
2130 self._vdu_id = vdu_id
2131
2132 self._project = vnfm._project
2133
2134 def deregister(self):
2135 '''De-register from DTS'''
2136 self._log.debug("De-register VNFR console DTS handler for project {}".
2137 format(self._project))
2138 if self._regh:
2139 self._regh.deregister()
2140 self._regh = None
2141
2142 @asyncio.coroutine
2143 def register(self):
2144 """ Register for VNFR VDU Operational Data read from dts """
2145
2146 @asyncio.coroutine
2147 def on_prepare(xact_info, action, ks_path, msg):
2148 """ prepare callback from dts """
2149 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2150 self._log.debug(
2151 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2152 xact_info, action, xpath, msg
2153 )
2154
2155 if action == rwdts.QueryAction.READ:
2156 schema = RwVnfrYang.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur.schema()
2157 path_entry = schema.keyspec_to_entry(ks_path)
2158 self._log.debug("VDU Opdata path is {}".format(path_entry))
2159 try:
2160 vnfr = self._vnfm.get_vnfr(self._vnfr_id)
2161 except VnfRecordError as e:
2162 self._log.error("VNFR id %s not found", self._vnfr_id)
2163 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2164 return
2165 try:
2166 vdur= vnfr._get_vdur_from_vdu_id(self._vdu_id)
2167 if not vdur._state == VDURecordState.READY:
2168 self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
2169 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2170 return
2171 with self._dts.transaction() as new_xact:
2172 resp = yield from vdur.read_resource(new_xact)
2173 vdur_console = RwVnfrYang.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur()
2174 vdur_console.id = self._vdur_id
2175 if resp.console_url:
2176 vdur_console.console_url = resp.console_url
2177 else:
2178 vdur_console.console_url = 'none'
2179 self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
2180 except Exception:
2181 self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
2182 vdur_console = RwVnfrYang.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur()
2183 vdur_console.id = self._vdur_id
2184 vdur_console.console_url = 'none'
2185
2186 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
2187 xpath=self.vnfr_vdu_console_xpath,
2188 msg=vdur_console)
2189 else:
2190 #raise VnfRecordError("Not supported operation %s" % action)
2191 self._log.error("Not supported operation %s" % action)
2192 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2193 return
2194
2195
2196 self._log.debug("Registering for VNFR VDU using xpath: %s",
2197 self.vnfr_vdu_console_xpath)
2198 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2199 with self._dts.group_create() as group:
2200 self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
2201 handler=hdl,
2202 flags=rwdts.Flag.PUBLISHER,
2203 )
2204
2205
2206 class VnfrDtsHandler(object):
2207 """ registers 'D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2208 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2209
2210 def __init__(self, dts, log, loop, vnfm):
2211 self._dts = dts
2212 self._log = log
2213 self._loop = loop
2214 self._vnfm = vnfm
2215
2216 self._regh = None
2217 self._project = vnfm._project
2218
2219 @property
2220 def regh(self):
2221 """ Return registration handle"""
2222 return self._regh
2223
2224 @property
2225 def vnfm(self):
2226 """ Return VNF manager instance """
2227 return self._vnfm
2228
2229 def deregister(self):
2230 '''De-register from DTS'''
2231 self._log.debug("De-register VNFR DTS handler for project {}".
2232 format(self._project))
2233 if self._regh:
2234 self._regh.deregister()
2235 self._regh = None
2236
2237 @asyncio.coroutine
2238 def register(self):
2239 """ Register for vnfr create/update/delete/read requests from dts """
2240 def on_commit(xact_info):
2241 """ The transaction has been committed """
2242 self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
2243 return rwdts.MemberRspCode.ACTION_OK
2244
2245 def on_abort(*args):
2246 """ Abort callback """
2247 self._log.debug("VNF transaction got aborted")
2248
2249 @asyncio.coroutine
2250 def on_event(dts, g_reg, xact, xact_event, scratch_data):
2251
2252 @asyncio.coroutine
2253 def instantiate_realloc_vnfr(vnfr):
2254 """Re-populate the vnfm after restart
2255
2256 Arguments:
2257 vlink
2258
2259 """
2260
2261 yield from vnfr.instantiate(None, restart_mode=True)
2262
2263 if xact_event == rwdts.MemberEvent.INSTALL:
2264 curr_cfg = self.regh.elements
2265 for cfg in curr_cfg:
2266 vnfr = self.vnfm.create_vnfr(cfg)
2267 self._loop.create_task(instantiate_realloc_vnfr(vnfr))
2268
2269 self._log.debug("Got on_event in vnfm")
2270
2271 return rwdts.MemberRspCode.ACTION_OK
2272
2273 @asyncio.coroutine
2274 def on_prepare(xact_info, action, ks_path, msg):
2275 """ prepare callback from dts """
2276 self._log.debug(
2277 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2278 xact_info, action, msg
2279 )
2280
2281 if action == rwdts.QueryAction.CREATE:
2282 if not msg.has_field("vnfd"):
2283 err = "Vnfd not provided"
2284 self._log.error(err)
2285 raise VnfRecordError(err)
2286
2287 vnfr = self.vnfm.create_vnfr(msg)
2288 try:
2289 # RIFT-9105: Unable to add a READ query under an existing transaction
2290 # xact = xact_info.xact
2291 yield from vnfr.instantiate(None)
2292 except Exception as e:
2293 self._log.exception(e)
2294 self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
2295 vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
2296 yield from vnfr.publish(None)
2297 elif action == rwdts.QueryAction.DELETE:
2298 schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.schema()
2299 path_entry = schema.keyspec_to_entry(ks_path)
2300 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2301
2302 if vnfr is None:
2303 self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
2304 raise VirtualNetworkFunctionRecordNotFound(
2305 "VNFR id %s", path_entry.key00.id)
2306
2307 try:
2308 yield from vnfr.terminate(xact_info.xact)
2309 # Unref the VNFD
2310 vnfr.vnfd_unref()
2311 yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
2312 except Exception as e:
2313 self._log.exception(e)
2314 self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)
2315
2316 elif action == rwdts.QueryAction.UPDATE:
2317 schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.schema()
2318 path_entry = schema.keyspec_to_entry(ks_path)
2319 vnfr = None
2320 try:
2321 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2322 except Exception as e:
2323 self._log.debug("No vnfr found with id %s", path_entry.key00.id)
2324 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2325 return
2326
2327 if vnfr is None:
2328 self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
2329 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2330 return
2331
2332 self._log.debug("VNFR {} update config status {} (current {})".
2333 format(vnfr.name, msg.config_status, vnfr.config_status))
2334 # Update the config status and publish
2335 vnfr._config_status = msg.config_status
2336 yield from vnfr.publish(None)
2337
2338 else:
2339 raise NotImplementedError(
2340 "%s action on VirtualNetworkFunctionRecord not supported",
2341 action)
2342
2343 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2344
2345 xpath = self._project.add_project(VnfrDtsHandler.XPATH)
2346 self._log.debug("Registering for VNFR using xpath: {}".
2347 format(xpath))
2348
2349 hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
2350 on_prepare=on_prepare,)
2351 handlers = rift.tasklets.Group.Handler(on_event=on_event,)
2352 with self._dts.group_create(handler=handlers) as group:
2353 self._regh = group.register(xpath=xpath,
2354 handler=hdl,
2355 flags=(rwdts.Flag.PUBLISHER |
2356 rwdts.Flag.NO_PREP_READ |
2357 rwdts.Flag.CACHE |
2358 rwdts.Flag.DATASTORE),)
2359
2360 @asyncio.coroutine
2361 def create(self, xact, xpath, msg):
2362 """
2363 Create a VNFR record in DTS with path and message
2364 """
2365 path = self._project.add_project(xpath)
2366 self._log.debug("Creating VNFR xact = %s, %s:%s",
2367 xact, path, msg)
2368
2369 self.regh.create_element(path, msg)
2370 self._log.debug("Created VNFR xact = %s, %s:%s",
2371 xact, path, msg)
2372
2373 @asyncio.coroutine
2374 def update(self, xact, xpath, msg):
2375 """
2376 Update a VNFR record in DTS with path and message
2377 """
2378 path = self._project.add_project(xpath)
2379 self._log.debug("Updating VNFR xact = %s, %s:%s",
2380 xact, path, msg)
2381 self.regh.update_element(path, msg)
2382 self._log.debug("Updated VNFR xact = %s, %s:%s",
2383 xact, path, msg)
2384
2385 @asyncio.coroutine
2386 def delete(self, xact, xpath):
2387 """
2388 Delete a VNFR record in DTS with path and message
2389 """
2390 path = self._project.add_project(xpath)
2391 self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
2392 self.regh.delete_element(path)
2393 self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2394
2395
2396 class VnfdRefCountDtsHandler(object):
2397 """ The VNFD Ref Count DTS handler """
2398 XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2399
2400 def __init__(self, dts, log, loop, vnfm):
2401 self._dts = dts
2402 self._log = log
2403 self._loop = loop
2404 self._vnfm = vnfm
2405
2406 self._regh = None
2407
2408 @property
2409 def regh(self):
2410 """ Return registration handle """
2411 return self._regh
2412
2413 @property
2414 def vnfm(self):
2415 """ Return the NS manager instance """
2416 return self._vnfm
2417
2418 def deregister(self):
2419 '''De-register from DTS'''
2420 self._log.debug("De-register VNFD Ref DTS handler for project {}".
2421 format(self._project))
2422 if self._regh:
2423 self._regh.deregister()
2424 self._regh = None
2425
2426 @asyncio.coroutine
2427 def register(self):
2428 """ Register for VNFD ref count read from dts """
2429
2430 @asyncio.coroutine
2431 def on_prepare(xact_info, action, ks_path, msg):
2432 """ prepare callback from dts """
2433 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2434 self._log.debug(
2435 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2436 xact_info, action, xpath, msg
2437 )
2438
2439 if action == rwdts.QueryAction.READ:
2440 schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount.schema()
2441 path_entry = schema.keyspec_to_entry(ks_path)
2442 vnfd_list = yield from self._vnfm.get_vnfd_refcount(path_entry.key00.vnfd_id_ref)
2443 for xpath, msg in vnfd_list:
2444 self._log.debug("Responding to ref count query path:%s, msg:%s",
2445 xpath, msg)
2446 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
2447 xpath=xpath,
2448 msg=msg)
2449 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2450 else:
2451 raise VnfRecordError("Not supported operation %s" % action)
2452
2453 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2454 with self._dts.group_create() as group:
2455 self._regh = group.register(xpath=self._vnfm._project.add_project(
2456 VnfdRefCountDtsHandler.XPATH),
2457 handler=hdl,
2458 flags=rwdts.Flag.PUBLISHER,
2459 )
2460
2461
2462 class VdurDatastore(object):
2463 """
2464 This VdurDatastore is intended to expose select information about a VDUR
2465 such that it can be referenced in a cloud config file. The data that is
2466 exposed does not necessarily follow the structure of the data in the yang
2467 model. This is intentional. The data that are exposed are intended to be
2468 agnostic of the yang model so that changes in the model do not necessarily
2469 require changes to the interface provided to the user. It also means that
2470 the user does not need to be familiar with the RIFT.ware yang models.
2471 """
2472
2473 def __init__(self):
2474 """Create an instance of VdurDatastore"""
2475 self._vdur_data = dict()
2476 self._pattern = re.compile("vdu\[([^]]+)\]\.(.+)")
2477
2478 def add(self, vdur):
2479 """Add a new VDUR to the datastore
2480
2481 Arguments:
2482 vdur - a VirtualDeploymentUnitRecord instance
2483
2484 Raises:
2485 A ValueError is raised if the VDUR is (1) None or (2) already in
2486 the datastore.
2487
2488 """
2489 if vdur.vdu_id is None:
2490 raise ValueError('VDURs are required to have an ID')
2491
2492 if vdur.vdu_id in self._vdur_data:
2493 raise ValueError('cannot add a VDUR more than once')
2494
2495 self._vdur_data[vdur.vdu_id] = dict()
2496
2497 def set_if_not_none(key, attr):
2498 if attr is not None:
2499 self._vdur_data[vdur.vdu_id][key] = attr
2500
2501 set_if_not_none('name', vdur._vdud.name)
2502 set_if_not_none('mgmt.ip', vdur.vm_management_ip)
2503
2504 def update(self, vdur):
2505 """Update the VDUR information in the datastore
2506
2507 Arguments:
2508 vdur - a GI representation of a VDUR
2509
2510 Raises:
2511 A ValueError is raised if the VDUR is (1) None or (2) already in
2512 the datastore.
2513
2514 """
2515 if vdur.vdu_id is None:
2516 raise ValueError('VNFDs are required to have an ID')
2517
2518 if vdur.vdu_id not in self._vdur_data:
2519 raise ValueError('VNF is not recognized')
2520
2521 def set_or_delete(key, attr):
2522 if attr is None:
2523 if key in self._vdur_data[vdur.vdu_id]:
2524 del self._vdur_data[vdur.vdu_id][key]
2525
2526 else:
2527 self._vdur_data[vdur.vdu_id][key] = attr
2528
2529 set_or_delete('name', vdur._vdud.name)
2530 set_or_delete('mgmt.ip', vdur.vm_management_ip)
2531
2532 def remove(self, vdur_id):
2533 """Remove all of the data associated with specified VDUR
2534
2535 Arguments:
2536 vdur_id - the identifier of a VNFD in the datastore
2537
2538 Raises:
2539 A ValueError is raised if the VDUR is not contained in the
2540 datastore.
2541
2542 """
2543 if vdur_id not in self._vdur_data:
2544 raise ValueError('VNF is not recognized')
2545
2546 del self._vdur_data[vdur_id]
2547
2548 def get(self, expr):
2549 """Retrieve VDUR information from the datastore
2550
2551 An expression should be of the form,
2552
2553 vdu[<id>].<attr>
2554
2555 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2556 the exposed attribute that the user wishes to retrieve.
2557
2558 If the requested data is not available, None is returned.
2559
2560 Arguments:
2561 expr - a string that specifies the data to return
2562
2563 Raises:
2564 A ValueError is raised if the provided expression cannot be parsed.
2565
2566 Returns:
2567 The requested data or None
2568
2569 """
2570 result = self._pattern.match(expr)
2571 if result is None:
2572 raise ValueError('data expression not recognized ({})'.format(expr))
2573
2574 vdur_id, key = result.groups()
2575
2576 if vdur_id not in self._vdur_data:
2577 return None
2578
2579 return self._vdur_data[vdur_id].get(key, None)
2580
2581
2582 class VnfManager(object):
2583 """ The virtual network function manager class """
2584 def __init__(self, dts, log, loop, project, cluster_name):
2585 self._dts = dts
2586 self._log = log
2587 self._loop = loop
2588 self._project = project
2589 self._cluster_name = cluster_name
2590
2591 self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
2592 self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
2593 self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
2594 self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(
2595 log, dts, loop, project, callback=self.handle_nsr)
2596
2597 self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
2598 self._vnfr_handler,
2599 self._vcs_handler,
2600 self._vnfr_ref_handler,
2601 self._nsr_handler]
2602 self._vnfrs = {}
2603 self._vnfds_to_vnfr = {}
2604 self._nsrs = {}
2605
2606 @property
2607 def vnfr_handler(self):
2608 """ VNFR dts handler """
2609 return self._vnfr_handler
2610
2611 @property
2612 def vcs_handler(self):
2613 """ VCS dts handler """
2614 return self._vcs_handler
2615
2616 @asyncio.coroutine
2617 def register(self):
2618 """ Register all static DTS handlers """
2619 for hdl in self._dts_handlers:
2620 yield from hdl.register()
2621
2622 def deregister(self):
2623 self.log.debug("De-register VNFM project {}".format(self.name))
2624 for hdl in self._dts_handlers:
2625 hdl.deregister()
2626
2627 @asyncio.coroutine
2628 def run(self):
2629 """ Run this VNFM instance """
2630 self._log.debug("Run VNFManager - registering static DTS handlers""")
2631 yield from self.register()
2632
2633 def handle_nsr(self, nsr, action):
2634 if action in [rwdts.QueryAction.CREATE]:
2635 self._nsrs[nsr.id] = nsr
2636 elif action == rwdts.QueryAction.DELETE:
2637 if nsr.id in self._nsrs:
2638 del self._nsrs[nsr.id]
2639
2640 def get_linked_mgmt_network(self, vnfr):
2641 """For the given VNFR get the related mgmt network from the NSD, if
2642 available.
2643 """
2644 vnfd_id = vnfr.vnfd.id
2645 nsr_id = vnfr.nsr_id_ref
2646
2647 # for the given related VNFR, get the corresponding NSR-config
2648 nsr_obj = None
2649 try:
2650 nsr_obj = self._nsrs[nsr_id]
2651 except KeyError:
2652 raise("Unable to find the NS with the ID: {}".format(nsr_id))
2653
2654 # for the related NSD check if a VLD exists such that it's a mgmt
2655 # network
2656 for vld in nsr_obj.nsd.vld:
2657 if vld.mgmt_network:
2658 return vld.name
2659
2660 return None
2661
2662 def get_vnfr(self, vnfr_id):
2663 """ get VNFR by vnfr id """
2664
2665 if vnfr_id not in self._vnfrs:
2666 raise VnfRecordError("VNFR id %s not found", vnfr_id)
2667
2668 return self._vnfrs[vnfr_id]
2669
2670 def create_vnfr(self, vnfr):
2671 """ Create a VNFR instance """
2672 if vnfr.id in self._vnfrs:
2673 msg = "Vnfr id %s already exists" % vnfr.id
2674 self._log.error(msg)
2675 raise VnfRecordError(msg)
2676
2677 self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
2678 vnfr.id,
2679 vnfr.vnfd.id)
2680
2681 mgmt_network = self.get_linked_mgmt_network(vnfr)
2682
2683 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2684 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
2685 mgmt_network=mgmt_network
2686 )
2687
2688 #Update ref count
2689 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2690 self._vnfds_to_vnfr[vnfr.vnfd.id] += 1
2691 else:
2692 self._vnfds_to_vnfr[vnfr.vnfd.id] = 1
2693
2694 return self._vnfrs[vnfr.id]
2695
2696 @asyncio.coroutine
2697 def delete_vnfr(self, xact, vnfr):
2698 """ Create a VNFR instance """
2699 if vnfr.vnfr_id in self._vnfrs:
2700 self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
2701 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2702
2703 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2704 if self._vnfds_to_vnfr[vnfr.vnfd.id]:
2705 self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1
2706
2707 del self._vnfrs[vnfr.vnfr_id]
2708
2709 @asyncio.coroutine
2710 def fetch_vnfd(self, vnfd_id):
2711 """ Fetch VNFDs based with the vnfd id"""
2712 vnfd_path = self._project.add_project(
2713 VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id))
2714 self._log.debug("Fetch vnfd with path %s", vnfd_path)
2715 vnfd = None
2716
2717 res_iter = yield from self._dts.query_read(vnfd_path,
2718 rwdts.XactFlag.MERGE)
2719
2720 for ent in res_iter:
2721 res = yield from ent
2722 vnfd = res.result
2723
2724 if vnfd is None:
2725 err = "Failed to get Vnfd %s" % vnfd_id
2726 self._log.error(err)
2727 raise VnfRecordError(err)
2728
2729 self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)
2730
2731 return vnfd
2732
2733 def vnfd_in_use(self, vnfd_id):
2734 """ Is this VNFD in use """
2735 self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
2736 if vnfd_id in self._vnfds_to_vnfr:
2737 return (self._vnfds_to_vnfr[vnfd_id] > 0)
2738 return False
2739
2740 @asyncio.coroutine
2741 def publish_vnfr(self, xact, path, msg):
2742 """ Publish a VNFR """
2743 self._log.debug("publish_vnfr called with path %s, msg %s",
2744 path, msg)
2745 yield from self.vnfr_handler.update(xact, path, msg)
2746
2747 @asyncio.coroutine
2748 def delete_vnfd(self, vnfd_id):
2749 """ Delete the Virtual Network Function descriptor with the passed id """
2750 self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
2751 if vnfd_id in self._vnfds_to_vnfr:
2752 if self._vnfds_to_vnfr[vnfd_id]:
2753 self._log.debug("Cannot delete VNFD id %s reference exists %s",
2754 vnfd_id,
2755 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2756 raise VirtualNetworkFunctionDescriptorRefCountExists(
2757 "Cannot delete :%s, ref_count:%s",
2758 vnfd_id,
2759 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2760
2761 del self._vnfds_to_vnfr[vnfd_id]
2762
2763 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2764 try:
2765 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2766 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2767 if os.path.exists(vnfd_dir):
2768 shutil.rmtree(vnfd_dir, ignore_errors=True)
2769 except Exception as e:
2770 self._log.error("Exception in cleaning up VNFD {}: {}".
2771 format(self._vnfds_to_vnfr[vnfd_id].vnfd.name, e))
2772 self._log.exception(e)
2773
2774
2775 def vnfd_refcount_xpath(self, vnfd_id):
2776 """ xpath for ref count entry """
2777 return self._project.add_project(VnfdRefCountDtsHandler.XPATH +
2778 "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)
2779
2780 @asyncio.coroutine
2781 def get_vnfd_refcount(self, vnfd_id):
2782 """ Get the vnfd_list from this VNFM"""
2783 vnfd_list = []
2784 if vnfd_id is None or vnfd_id == "":
2785 for vnfd in self._vnfds_to_vnfr.keys():
2786 vnfd_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount()
2787 vnfd_msg.vnfd_id_ref = vnfd
2788 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd]
2789 vnfd_list.append((self.vnfd_refcount_xpath(vnfd), vnfd_msg))
2790 elif vnfd_id in self._vnfds_to_vnfr:
2791 vnfd_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount()
2792 vnfd_msg.vnfd_id_ref = vnfd_id
2793 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd_id]
2794 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2795
2796 return vnfd_list
2797
2798
2799 class VnfmProject(ManoProject):
2800
2801 def __init__(self, name, tasklet, **kw):
2802 super(VnfmProject, self).__init__(tasklet.log, name)
2803 self.update(tasklet)
2804
2805 self._vnfm = None
2806
2807 @asyncio.coroutine
2808 def register (self):
2809 try:
2810 vm_parent_name = self._tasklet.tasklet_info.get_parent_vm_parent_instance_name()
2811 assert vm_parent_name is not None
2812 self._vnfm = VnfManager(self._dts, self.log, self.loop, self, vm_parent_name)
2813 yield from self._vnfm.run()
2814 except Exception:
2815 print("Caught Exception in VNFM init:", sys.exc_info()[0])
2816 raise
2817
2818 def deregister(self):
2819 self._log.debug("De-register project {} for VnfmProject".
2820 format(self.name))
2821 self._vnfm.deregister()
2822
2823
2824 class VnfmTasklet(rift.tasklets.Tasklet):
2825 """ VNF Manager tasklet class """
2826 def __init__(self, *args, **kwargs):
2827 super(VnfmTasklet, self).__init__(*args, **kwargs)
2828 self.rwlog.set_category("rw-mano-log")
2829 self.rwlog.set_subcategory("vnfm")
2830
2831 self._dts = None
2832 self._project_handler = None
2833 self.projects = {}
2834
2835 @property
2836 def dts(self):
2837 return self._dts
2838
2839 def start(self):
2840 try:
2841 super(VnfmTasklet, self).start()
2842 self.log.info("Starting VnfmTasklet")
2843
2844 self.log.setLevel(logging.DEBUG)
2845
2846 self.log.debug("Registering with dts")
2847 self._dts = rift.tasklets.DTS(self.tasklet_info,
2848 RwVnfmYang.get_schema(),
2849 self.loop,
2850 self.on_dts_state_change)
2851
2852 self.log.debug("Created DTS Api GI Object: %s", self._dts)
2853 except Exception:
2854 print("Caught Exception in VNFM start:", sys.exc_info()[0])
2855 raise
2856
2857 def on_instance_started(self):
2858 """ Task insance started callback """
2859 self.log.debug("Got instance started callback")
2860
2861 def stop(self):
2862 try:
2863 self._dts.deinit()
2864 except Exception:
2865 print("Caught Exception in VNFM stop:", sys.exc_info()[0])
2866 raise
2867
2868 @asyncio.coroutine
2869 def init(self):
2870 """ Task init callback """
2871 self.log.debug("creating project handler")
2872 self.project_handler = ProjectHandler(self, VnfmProject)
2873 self.project_handler.register()
2874
2875 @asyncio.coroutine
2876 def run(self):
2877 """ Task run callback """
2878 pass
2879
2880 @asyncio.coroutine
2881 def on_dts_state_change(self, state):
2882 """Take action according to current dts state to transition
2883 application into the corresponding application state
2884
2885 Arguments
2886 state - current dts state
2887 """
2888 switch = {
2889 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2890 rwdts.State.CONFIG: rwdts.State.RUN,
2891 }
2892
2893 handlers = {
2894 rwdts.State.INIT: self.init,
2895 rwdts.State.RUN: self.run,
2896 }
2897
2898 # Transition application to next state
2899 handler = handlers.get(state, None)
2900 if handler is not None:
2901 yield from handler()
2902
2903 # Transition dts to next state
2904 next_state = switch.get(state, None)
2905 if next_state is not None:
2906 self._dts.handle.set_state(next_state)