New feature: Code changes for project support
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.package.script
53 import rift.mano.dts as mano_dts
54 from rift.mano.utils.project import (
55 ManoProject,
56 ProjectHandler,
57 )
58
59
60 class VMResourceError(Exception):
61 """ VM resource Error"""
62 pass
63
64
65 class VnfRecordError(Exception):
66 """ VNF record instatiation failed"""
67 pass
68
69
70 class VduRecordError(Exception):
71 """ VDU record instatiation failed"""
72 pass
73
74
75 class NotImplemented(Exception):
76 """Not implemented """
77 pass
78
79
80 class VnfrRecordExistsError(Exception):
81 """VNFR record already exist with the same VNFR id"""
82 pass
83
84
85 class InternalVirtualLinkRecordError(Exception):
86 """Internal virtual link record error"""
87 pass
88
89
90 class VDUImageNotFound(Exception):
91 """VDU Image not found error"""
92 pass
93
94
95 class VirtualDeploymentUnitRecordError(Exception):
96 """VDU Instantiation failed"""
97 pass
98
99
100 class VMNotReadyError(Exception):
101 """ VM Not yet received from resource manager """
102 pass
103
104
105 class VDURecordNotFound(Exception):
106 """ Could not find a VDU record """
107 pass
108
109
110 class VirtualNetworkFunctionRecordDescNotFound(Exception):
111 """ Cannot find Virtual Network Function Record Descriptor """
112 pass
113
114
115 class VirtualNetworkFunctionDescriptorError(Exception):
116 """ Virtual Network Function Record Descriptor Error """
117 pass
118
119
120 class VirtualNetworkFunctionDescriptorNotFound(Exception):
121 """ Virtual Network Function Record Descriptor Not Found """
122 pass
123
124
125 class VirtualNetworkFunctionRecordNotFound(Exception):
126 """ Virtual Network Function Record Not Found """
127 pass
128
129
130 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
131 """ Virtual Network Funtion Descriptor reference count exists """
132 pass
133
134
135 class VnfrInstantiationFailed(Exception):
136 """ Virtual Network Funtion Instantiation failed"""
137 pass
138
139
140 class VNFMPlacementGroupError(Exception):
141 pass
142
143 class VirtualNetworkFunctionRecordState(enum.Enum):
144 """ VNFR state """
145 INIT = 1
146 VL_INIT_PHASE = 2
147 VM_INIT_PHASE = 3
148 READY = 4
149 TERMINATE = 5
150 VL_TERMINATE_PHASE = 6
151 VDU_TERMINATE_PHASE = 7
152 TERMINATED = 7
153 FAILED = 10
154
155
156 class VDURecordState(enum.Enum):
157 """VDU record state """
158 INIT = 1
159 INSTANTIATING = 2
160 RESOURCE_ALLOC_PENDING = 3
161 READY = 4
162 TERMINATING = 5
163 TERMINATED = 6
164 FAILED = 10
165
166
167 class VcsComponent(object):
168 """ VCS Component within the VNF descriptor """
169 def __init__(self, dts, log, loop, cluster_name,
170 vcs_handler, component, mangled_name):
171 self._dts = dts
172 self._log = log
173 self._loop = loop
174 self._component = component
175 self._cluster_name = cluster_name
176 self._vcs_handler = vcs_handler
177 self._mangled_name = mangled_name
178
179 @staticmethod
180 def mangle_name(component_name, vnf_name, vnfd_id):
181 """ mangled component name """
182 return vnf_name + ":" + component_name + ":" + vnfd_id
183
184 @property
185 def name(self):
186 """ name of this component"""
187 return self._mangled_name
188
189 @property
190 def path(self):
191 """ The path for this object """
192 return ("D,/rw-manifest:manifest" +
193 "/rw-manifest:operational-inventory" +
194 "/rw-manifest:component" +
195 "[rw-manifest:component-name = '{}']").format(self.name)
196
197 @property
198 def instance_xpath(self):
199 """ The path for this object """
200 return("D,/rw-base:vcs" +
201 "/instances" +
202 "/instance" +
203 "[instance-name = '{}']".format(self._cluster_name))
204
205 @property
206 def start_comp_xpath(self):
207 """ start component xpath """
208 return (self.instance_xpath +
209 "/child-n[instance-name = 'START-REQ']")
210
211 def get_start_comp_msg(self, ip_address):
212 """ start this component """
213 start_msg = RwBaseYang.VcsInstance_Instance_ChildN()
214 start_msg.instance_name = 'START-REQ'
215 start_msg.component_name = self.name
216 start_msg.admin_command = "START"
217 start_msg.ip_address = ip_address
218
219 return start_msg
220
221 @property
222 def msg(self):
223 """ Returns the message for this vcs component"""
224
225 vcs_comp_dict = self._component.as_dict()
226
227 def mangle_comp_names(comp_dict):
228 """ mangle component name with VNF name, id"""
229 for key, val in comp_dict.items():
230 if isinstance(val, dict):
231 comp_dict[key] = mangle_comp_names(val)
232 elif isinstance(val, list):
233 i = 0
234 for ent in val:
235 if isinstance(ent, dict):
236 val[i] = mangle_comp_names(ent)
237 else:
238 val[i] = ent
239 i += 1
240 elif key == "component_name":
241 comp_dict[key] = VcsComponent.mangle_name(val,
242 self._vnfd_name,
243 self._vnfd_id)
244 return comp_dict
245
246 mangled_dict = mangle_comp_names(vcs_comp_dict)
247 msg = RwManifestYang.OpInventory_Component.from_dict(mangled_dict)
248 return msg
249
250 @asyncio.coroutine
251 def publish(self, xact):
252 """ Publishes the VCS component """
253 self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
254 self.name, self.path, self.msg)
255 yield from self._vcs_handler.publish(xact, self.path, self.msg)
256
257 @asyncio.coroutine
258 def start(self, xact, parent, ip_addr=None):
259 """ Starts this VCS component """
260 # ATTN RV - replace with block add
261 start_msg = self.get_start_comp_msg(ip_addr)
262 self._log.debug("starting component %s %s",
263 self.start_comp_xpath, start_msg)
264 yield from self._dts.query_create(self.start_comp_xpath,
265 0,
266 start_msg)
267 self._log.debug("started component %s, %s",
268 self.start_comp_xpath, start_msg)
269
270
271 class VirtualDeploymentUnitRecord(object):
272 """ Virtual Deployment Unit Record """
273 def __init__(self,
274 dts,
275 log,
276 loop,
277 project,
278 vdud,
279 vnfr,
280 mgmt_intf,
281 mgmt_network,
282 cloud_account_name,
283 vnfd_package_store,
284 vdur_id=None,
285 placement_groups=[]):
286 self._dts = dts
287 self._log = log
288 self._loop = loop
289 self._project = project
290 self._vdud = vdud
291 self._vnfr = vnfr
292 self._mgmt_intf = mgmt_intf
293 self._cloud_account_name = cloud_account_name
294 self._vnfd_package_store = vnfd_package_store
295 self._mgmt_network = mgmt_network
296
297 self._vdur_id = vdur_id or str(uuid.uuid4())
298 self._int_intf = []
299 self._ext_intf = []
300 self._state = VDURecordState.INIT
301 self._state_failed_reason = None
302 self._request_id = str(uuid.uuid4())
303 self._name = vnfr.name + "__" + vdud.id
304 self._placement_groups = placement_groups
305 self._rm_regh = None
306 self._vm_resp = None
307 self._vdud_cloud_init = None
308 self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(
309 dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id,self.vdu_id)
310
311 @asyncio.coroutine
312 def vdu_opdata_register(self):
313 yield from self._vdur_console_handler.register()
314
315 def cp_ip_addr(self, cp_name):
316 """ Find ip address by connection point name """
317 if self._vm_resp is not None:
318 for conn_point in self._vm_resp.connection_points:
319 if conn_point.name == cp_name:
320 return conn_point.ip_address
321 return "0.0.0.0"
322
323 def cp_mac_addr(self, cp_name):
324 """ Find mac address by connection point name """
325 if self._vm_resp is not None:
326 for conn_point in self._vm_resp.connection_points:
327 if conn_point.name == cp_name:
328 return conn_point.mac_addr
329 return "00:00:00:00:00:00"
330
331 def cp_id(self, cp_name):
332 """ Find connection point id by connection point name """
333 if self._vm_resp is not None:
334 for conn_point in self._vm_resp.connection_points:
335 if conn_point.name == cp_name:
336 return conn_point.connection_point_id
337 return ''
338
339 @property
340 def vdu_id(self):
341 return self._vdud.id
342
343 @property
344 def vm_resp(self):
345 return self._vm_resp
346
347 @property
348 def name(self):
349 """ Return this VDUR's name """
350 return self._name
351
352 @property
353 def cloud_account_name(self):
354 """ Cloud account this VDU should be created in """
355 return self._cloud_account_name
356
357 @property
358 def image_name(self):
359 """ name that should be used to lookup the image on the CMP """
360 if 'image' not in self._vdud:
361 return None
362 return os.path.basename(self._vdud.image)
363
364 @property
365 def image_checksum(self):
366 """ name that should be used to lookup the image on the CMP """
367 return self._vdud.image_checksum if self._vdud.has_field("image_checksum") else None
368
369 @property
370 def management_ip(self):
371 if not self.active:
372 return None
373 return self._vm_resp.public_ip if self._vm_resp.has_field('public_ip') else self._vm_resp.management_ip
374
375 @property
376 def vm_management_ip(self):
377 if not self.active:
378 return None
379 return self._vm_resp.management_ip
380
381 @property
382 def operational_status(self):
383 """ Operational status of this VDU"""
384 op_stats_dict = {"INIT": "init",
385 "INSTANTIATING": "vm_init_phase",
386 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
387 "READY": "running",
388 "FAILED": "failed",
389 "TERMINATING": "terminated",
390 "TERMINATED": "terminated",
391 }
392 return op_stats_dict[self._state.name]
393
394 @property
395 def msg(self):
396 """ Process VDU message from resmgr"""
397 vdu_fields = ["vm_flavor",
398 "guest_epa",
399 "vswitch_epa",
400 "hypervisor_epa",
401 "host_epa",
402 "volumes",
403 "name"]
404 vdu_copy_dict = {k: v for k, v in
405 self._vdud.as_dict().items() if k in vdu_fields}
406 vdur_dict = {"id": self._vdur_id,
407 "vdu_id_ref": self._vdud.id,
408 "operational_status": self.operational_status,
409 "operational_status_details": self._state_failed_reason,
410 }
411 if self.vm_resp is not None:
412 vdur_dict.update({"vim_id": self.vm_resp.vdu_id,
413 "flavor_id": self.vm_resp.flavor_id
414 })
415 if self._vm_resp.has_field('image_id'):
416 vdur_dict.update({ "image_id": self.vm_resp.image_id })
417
418 if self.management_ip is not None:
419 vdur_dict["management_ip"] = self.management_ip
420
421 if self.vm_management_ip is not None:
422 vdur_dict["vm_management_ip"] = self.vm_management_ip
423
424 vdur_dict.update(vdu_copy_dict)
425
426 if self.vm_resp is not None:
427 if self._vm_resp.has_field('volumes'):
428 for opvolume in self._vm_resp.volumes:
429 vdurvol_data = [vduvol for vduvol in vdur_dict['volumes'] if vduvol['name'] == opvolume.name]
430 if len(vdurvol_data) == 1:
431 vdurvol_data[0]["volume_id"] = opvolume.volume_id
432 if opvolume.has_field('custom_meta_data'):
433 metadata_list = list()
434 for metadata_item in opvolume.custom_meta_data:
435 metadata_list.append(metadata_item.as_dict())
436 vdurvol_data[0]['custom_meta_data'] = metadata_list
437
438 if self._vm_resp.has_field('supplemental_boot_data'):
439 vdur_dict['supplemental_boot_data'] = dict()
440 if self._vm_resp.supplemental_boot_data.has_field('boot_data_drive'):
441 vdur_dict['supplemental_boot_data']['boot_data_drive'] = self._vm_resp.supplemental_boot_data.boot_data_drive
442 if self._vm_resp.supplemental_boot_data.has_field('custom_meta_data'):
443 metadata_list = list()
444 for metadata_item in self._vm_resp.supplemental_boot_data.custom_meta_data:
445 metadata_list.append(metadata_item.as_dict())
446 vdur_dict['supplemental_boot_data']['custom_meta_data'] = metadata_list
447 if self._vm_resp.supplemental_boot_data.has_field('config_file'):
448 file_list = list()
449 for file_item in self._vm_resp.supplemental_boot_data.config_file:
450 file_list.append(file_item.as_dict())
451 vdur_dict['supplemental_boot_data']['config_file'] = file_list
452
453 icp_list = []
454 ii_list = []
455
456 for intf, cp_id, vlr in self._int_intf:
457 cp = self.find_internal_cp_by_cp_id(cp_id)
458
459 icp_list.append({"name": cp.name,
460 "id": cp.id,
461 "type_yang": "VPORT",
462 "ip_address": self.cp_ip_addr(cp.id),
463 "mac_address": self.cp_mac_addr(cp.id)})
464
465 ii_list.append({"name": intf.name,
466 "vdur_internal_connection_point_ref": cp.id,
467 "virtual_interface": {}})
468
469 vdur_dict["internal_connection_point"] = icp_list
470 self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
471 vdur_dict["internal_interface"] = ii_list
472
473 ei_list = []
474 for intf, cp, vlr in self._ext_intf:
475 ei_list.append({"name": cp.name,
476 "vnfd_connection_point_ref": cp.name,
477 "virtual_interface": {}})
478 self._vnfr.update_cp(cp.name,
479 self.cp_ip_addr(cp.name),
480 self.cp_mac_addr(cp.name),
481 self.cp_id(cp.name))
482
483 vdur_dict["external_interface"] = ei_list
484
485 placement_groups = []
486 for group in self._placement_groups:
487 placement_groups.append(group.as_dict())
488 vdur_dict['placement_groups_info'] = placement_groups
489
490 return RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
491
492 @property
493 def resmgr_path(self):
494 """ path for resource-mgr"""
495 xpath = self._project.add_project("D,/rw-resource-mgr:resource-mgmt" +
496 "/vdu-event" +
497 "/vdu-event-data[event-id='{}']".format(self._request_id))
498 return xpath
499
500 @property
501 def vm_flavor_msg(self):
502 """ VM flavor message """
503 flavor = self._vdud.vm_flavor.__class__()
504 flavor.copy_from(self._vdud.vm_flavor)
505
506 return flavor
507
508 @property
509 def vdud_cloud_init(self):
510 """ Return the cloud-init contents for the VDU """
511 if self._vdud_cloud_init is None:
512 self._vdud_cloud_init = self.cloud_init()
513
514 return self._vdud_cloud_init
515
516 def cloud_init(self):
517 """ Populate cloud_init with cloud-config script from
518 either the inline contents or from the file provided
519 """
520 if self._vdud.cloud_init is not None:
521 self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
522 return self._vdud.cloud_init
523 elif self._vdud.cloud_init_file is not None:
524 # Get cloud-init script contents from the file provided in the cloud_init_file param
525 self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
526 filename = self._vdud.cloud_init_file
527 self._vnfd_package_store.refresh()
528 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
529 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
530 try:
531 return cloud_init_extractor.read_script(stored_package, filename)
532 except rift.package.cloud_init.CloudInitExtractionError as e:
533 self.instantiation_failed(str(e))
534 raise VirtualDeploymentUnitRecordError(e)
535 else:
536 self._log.debug("VDU Instantiation: cloud-init script not provided")
537
538 def process_openstack_placement_group_construct(self, vm_create_msg_dict):
539 host_aggregates = []
540 availability_zones = []
541 server_groups = []
542 for group in self._placement_groups:
543 if group.has_field('host_aggregate'):
544 for aggregate in group.host_aggregate:
545 host_aggregates.append(aggregate.as_dict())
546 if group.has_field('availability_zone'):
547 availability_zones.append(group.availability_zone.as_dict())
548 if group.has_field('server_group'):
549 server_groups.append(group.server_group.as_dict())
550
551 if availability_zones:
552 if len(availability_zones) > 1:
553 self._log.error("Can not launch VDU: %s in multiple availability zones. " +
554 "Requested Zones: %s", self.name, availability_zones)
555 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability" +
556 " zones. Requsted Zones".format(self.name, availability_zones))
557 else:
558 vm_create_msg_dict['availability_zone'] = availability_zones[0]
559
560 if server_groups:
561 if len(server_groups) > 1:
562 self._log.error("Can not launch VDU: %s in multiple Server Group. " +
563 "Requested Groups: %s", self.name, server_groups)
564 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple " +
565 "Server Groups. Requsted Groups".format(self.name, server_groups))
566 else:
567 vm_create_msg_dict['server_group'] = server_groups[0]
568
569 if host_aggregates:
570 vm_create_msg_dict['host_aggregate'] = host_aggregates
571
572 return
573
574 def process_placement_groups(self, vm_create_msg_dict):
575 """Process the placement_groups and fill resource-mgr request"""
576 if not self._placement_groups:
577 return
578
579 cloud_set = set([group.cloud_type for group in self._placement_groups])
580 assert len(cloud_set) == 1
581 cloud_type = cloud_set.pop()
582
583 if cloud_type == 'openstack':
584 self.process_openstack_placement_group_construct(vm_create_msg_dict)
585
586 else:
587 self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
588 return
589
590 def process_custom_bootdata(self, vm_create_msg_dict):
591 """Process the custom boot data"""
592 if 'config_file' not in vm_create_msg_dict['supplemental_boot_data']:
593 return
594
595 self._vnfd_package_store.refresh()
596 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
597 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
598 for file_item in vm_create_msg_dict['supplemental_boot_data']['config_file']:
599 if 'source' not in file_item or 'dest' not in file_item:
600 continue
601 source = file_item['source']
602 # Find source file in scripts dir of VNFD
603 self._log.debug("Checking for source config file at %s", source)
604 try:
605 source_file_str = cloud_init_extractor.read_script(stored_package, source)
606 except rift.package.cloud_init.CloudInitExtractionError as e:
607 raise VirtualDeploymentUnitRecordError(e)
608 # Update source file location with file contents
609 file_item['source'] = source_file_str
610
611 return
612
613 def resmgr_msg(self, config=None):
614 vdu_fields = ["vm_flavor",
615 "guest_epa",
616 "vswitch_epa",
617 "hypervisor_epa",
618 "host_epa",
619 "volumes",
620 "supplemental_boot_data"]
621
622 self._log.debug("Creating params based on VDUD: %s", self._vdud)
623 vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}
624
625 vm_create_msg_dict = {
626 "name": self.name,
627 }
628
629 if self.image_name is not None:
630 vm_create_msg_dict["image_name"] = self.image_name
631
632 if self.image_checksum is not None:
633 vm_create_msg_dict["image_checksum"] = self.image_checksum
634
635 vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
636 if self._vdud.has_field('mgmt_vpci'):
637 vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci
638
639 self._log.debug("VDUD: %s", self._vdud)
640 if config is not None:
641 vm_create_msg_dict['vdu_init'] = {'userdata': config}
642
643 if self._mgmt_network:
644 vm_create_msg_dict['mgmt_network'] = self._mgmt_network
645
646 cp_list = []
647 for intf, cp, vlr in self._ext_intf:
648 cp_info = { "name": cp.name,
649 "virtual_link_id": vlr.network_id,
650 "type_yang": intf.virtual_interface.type_yang }
651
652 if cp.has_field('port_security_enabled'):
653 cp_info["port_security_enabled"] = cp.port_security_enabled
654
655 if (intf.virtual_interface.has_field('vpci') and
656 intf.virtual_interface.vpci is not None):
657 cp_info["vpci"] = intf.virtual_interface.vpci
658
659 if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
660 cp_info['security_group'] = vlr.ip_profile_params.security_group
661
662 cp_list.append(cp_info)
663
664 for intf, cp, vlr in self._int_intf:
665 if (intf.virtual_interface.has_field('vpci') and
666 intf.virtual_interface.vpci is not None):
667 cp_list.append({"name": cp,
668 "virtual_link_id": vlr.network_id,
669 "type_yang": intf.virtual_interface.type_yang,
670 "vpci": intf.virtual_interface.vpci})
671 else:
672 if cp.has_field('port_security_enabled'):
673 cp_list.append({"name": cp,
674 "virtual_link_id": vlr.network_id,
675 "type_yang": intf.virtual_interface.type_yang,
676 "port_security_enabled": cp.port_security_enabled})
677 else:
678 cp_list.append({"name": cp,
679 "virtual_link_id": vlr.network_id,
680 "type_yang": intf.virtual_interface.type_yang})
681
682
683 vm_create_msg_dict["connection_points"] = cp_list
684 vm_create_msg_dict.update(vdu_copy_dict)
685
686 self.process_placement_groups(vm_create_msg_dict)
687 if 'supplemental_boot_data' in vm_create_msg_dict:
688 self.process_custom_bootdata(vm_create_msg_dict)
689
690 msg = RwResourceMgrYang.VDUEventData()
691 msg.event_id = self._request_id
692 msg.cloud_account = self.cloud_account_name
693 msg.request_info.from_dict(vm_create_msg_dict)
694
695 return msg
696
697 @asyncio.coroutine
698 def terminate(self, xact):
699 """ Delete resource in VIM """
700 if self._state != VDURecordState.READY and self._state != VDURecordState.FAILED:
701 self._log.warning("VDU terminate in not ready state - Ignoring request")
702 return
703
704 self._state = VDURecordState.TERMINATING
705 if self._vm_resp is not None:
706 try:
707 with self._dts.transaction() as new_xact:
708 yield from self.delete_resource(new_xact)
709 except Exception:
710 self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)
711
712 if self._rm_regh is not None:
713 self._log.debug("Deregistering resource manager registration handle")
714 self._rm_regh.deregister()
715 self._rm_regh = None
716
717 if self._vdur_console_handler is not None:
718 self._log.debug("Deregistering vnfr vdur registration handle")
719 self._vdur_console_handler._regh.deregister()
720 self._vdur_console_handler._regh = None
721
722 self._state = VDURecordState.TERMINATED
723
724 def find_internal_cp_by_cp_id(self, cp_id):
725 """ Find the CP corresponding to the connection point id"""
726 cp = None
727
728 self._log.debug("find_internal_cp_by_cp_id(%s) called",
729 cp_id)
730
731 for int_cp in self._vdud.internal_connection_point:
732 self._log.debug("Checking for int cp %s in internal connection points",
733 int_cp.id)
734 if int_cp.id == cp_id:
735 cp = int_cp
736 break
737
738 if cp is None:
739 self._log.debug("Failed to find cp %s in internal connection points",
740 cp_id)
741 msg = "Failed to find cp %s in internal connection points" % cp_id
742 raise VduRecordError(msg)
743
744 # return the VLR associated with the connection point
745 return cp
746
747 @asyncio.coroutine
748 def create_resource(self, xact, vnfr, config=None):
749 """ Request resource from ResourceMgr """
750 def find_cp_by_name(cp_name):
751 """ Find a connection point by name """
752 cp = None
753 self._log.debug("find_cp_by_name(%s) called", cp_name)
754 for ext_cp in vnfr._cprs:
755 self._log.debug("Checking ext cp (%s) called", ext_cp.name)
756 if ext_cp.name == cp_name:
757 cp = ext_cp
758 break
759 if cp is None:
760 self._log.debug("Failed to find cp %s in external connection points",
761 cp_name)
762 return cp
763
764 def find_internal_vlr_by_cp_name(cp_name):
765 """ Find the VLR corresponding to the connection point name"""
766 cp = None
767
768 self._log.debug("find_internal_vlr_by_cp_name(%s) called",
769 cp_name)
770
771 for int_cp in self._vdud.internal_connection_point:
772 self._log.debug("Checking for int cp %s in internal connection points",
773 int_cp.id)
774 if int_cp.id == cp_name:
775 cp = int_cp
776 break
777
778 if cp is None:
779 self._log.debug("Failed to find cp %s in internal connection points",
780 cp_name)
781 msg = "Failed to find cp %s in internal connection points" % cp_name
782 raise VduRecordError(msg)
783
784 # return the VLR associated with the connection point
785 return vnfr.find_vlr_by_cp(cp_name)
786
787 block = xact.block_create()
788
789 self._log.debug("Executing vm request id: %s, action: create",
790 self._request_id)
791
792 # Resolve the networks associated external interfaces
793 for ext_intf in self._vdud.external_interface:
794 self._log.debug("Resolving external interface name [%s], cp[%s]",
795 ext_intf.name, ext_intf.vnfd_connection_point_ref)
796 cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
797 if cp is None:
798 self._log.debug("Failed to find connection point - %s",
799 ext_intf.vnfd_connection_point_ref)
800 continue
801 self._log.debug("Connection point name [%s], type[%s]",
802 cp.name, cp.type_yang)
803
804 vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)
805
806 etuple = (ext_intf, cp, vlr)
807 self._ext_intf.append(etuple)
808
809 self._log.debug("Created external interface tuple : %s", etuple)
810
811 # Resolve the networks associated internal interfaces
812 for intf in self._vdud.internal_interface:
813 cp_id = intf.vdu_internal_connection_point_ref
814 self._log.debug("Resolving internal interface name [%s], cp[%s]",
815 intf.name, cp_id)
816
817 try:
818 vlr = find_internal_vlr_by_cp_name(cp_id)
819 except Exception as e:
820 self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
821 msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
822 raise VduRecordError(msg)
823
824 ituple = (intf, cp_id, vlr)
825 self._int_intf.append(ituple)
826
827 self._log.debug("Created internal interface tuple : %s", ituple)
828
829 resmgr_path = self.resmgr_path
830 resmgr_msg = self.resmgr_msg(config)
831
832 self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
833 block.add_query_create(resmgr_path, resmgr_msg)
834
835 res_iter = yield from block.execute(now=True)
836
837 resp = None
838
839 for i in res_iter:
840 r = yield from i
841 resp = r.result
842
843 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
844 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
845 self._log.debug("Got vm request response: %s", resp.resource_info)
846 return resp.resource_info
847
848 @asyncio.coroutine
849 def delete_resource(self, xact):
850 block = xact.block_create()
851
852 self._log.debug("Executing vm request id: %s, action: delete",
853 self._request_id)
854
855 block.add_query_delete(self.resmgr_path)
856
857 yield from block.execute(flags=0, now=True)
858
859 @asyncio.coroutine
860 def read_resource(self, xact):
861 block = xact.block_create()
862
863 self._log.debug("Executing vm request id: %s, action: delete",
864 self._request_id)
865
866 block.add_query_read(self.resmgr_path)
867
868 res_iter = yield from block.execute(flags=0, now=True)
869 for i in res_iter:
870 r = yield from i
871 resp = r.result
872
873 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
874 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
875 self._log.debug("Got vm request response: %s", resp.resource_info)
876 #self._vm_resp = resp.resource_info
877 return resp.resource_info
878
879
880 @asyncio.coroutine
881 def start_component(self):
882 """ This VDUR is active """
883 self._log.debug("Starting component %s for vdud %s vdur %s",
884 self._vdud.vcs_component_ref,
885 self._vdud,
886 self._vdur_id)
887 yield from self._vnfr.start_component(self._vdud.vcs_component_ref,
888 self.vm_resp.management_ip)
889
890 @property
891 def active(self):
892 """ Is this VDU active """
893 return True if self._state is VDURecordState.READY else False
894
895 @asyncio.coroutine
896 def instantiation_failed(self, failed_reason=None):
897 """ VDU instantiation failed """
898 self._log.debug("VDU %s instantiation failed ", self._vdur_id)
899 self._state = VDURecordState.FAILED
900 self._state_failed_reason = failed_reason
901 yield from self._vnfr.instantiation_failed(failed_reason)
902
903 @asyncio.coroutine
904 def vdu_is_active(self):
905 """ This VDU is active"""
906 if self.active:
907 self._log.warning("VDU %s was already marked as active", self._vdur_id)
908 return
909
910 self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)
911
912 if self._vdud.vcs_component_ref is not None:
913 yield from self.start_component()
914
915 self._state = VDURecordState.READY
916
917 if self._vnfr.all_vdus_active():
918 self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
919 yield from self._vnfr.is_ready()
920
921 @asyncio.coroutine
922 def instantiate(self, xact, vnfr, config=None):
923 """ Instantiate this VDU """
924 self._state = VDURecordState.INSTANTIATING
925
926 @asyncio.coroutine
927 def on_prepare(xact_info, query_action, ks_path, msg):
928 """ This VDUR is active """
929 self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
930 query_action,
931 ks_path,
932 msg)
933
934 if (query_action == rwdts.QueryAction.UPDATE or
935 query_action == rwdts.QueryAction.CREATE):
936 self._vm_resp = msg
937
938 if msg.resource_state == "active":
939 # Move this VDU to ready state
940 yield from self.vdu_is_active()
941 elif msg.resource_state == "failed":
942 yield from self.instantiation_failed(msg.resource_errors)
943 elif query_action == rwdts.QueryAction.DELETE:
944 self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
945 else:
946 raise NotImplementedError(
947 "%s action on VirtualDeployementUnitRecord not supported",
948 query_action)
949
950 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
951
952 try:
953 reg_event = asyncio.Event(loop=self._loop)
954
955 @asyncio.coroutine
956 def on_ready(regh, status):
957 reg_event.set()
958
959 handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
960 self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
961 flags=rwdts.Flag.SUBSCRIBER,
962 handler=handler)
963 yield from reg_event.wait()
964
965 vm_resp = yield from self.create_resource(xact, vnfr, config)
966 self._vm_resp = vm_resp
967 self._state = VDURecordState.RESOURCE_ALLOC_PENDING
968
969 self._log.debug("Requested VM from resource manager response %s",
970 vm_resp)
971 if vm_resp.resource_state == "active":
972 self._log.debug("Resourcemgr responded wih an active vm resp %s",
973 vm_resp)
974 yield from self.vdu_is_active()
975 self._state = VDURecordState.READY
976 elif (vm_resp.resource_state == "pending" or
977 vm_resp.resource_state == "inactive"):
978 self._log.debug("Resourcemgr responded wih a pending vm resp %s",
979 vm_resp)
980 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
981 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
982 # flags=rwdts.Flag.SUBSCRIBER,
983 # handler=handler)
984 else:
985 self._log.debug("Resourcemgr responded wih an error vm resp %s",
986 vm_resp)
987 raise VirtualDeploymentUnitRecordError(
988 "Failed VDUR instantiation %s " % vm_resp)
989
990 except Exception as e:
991 import traceback
992 traceback.print_exc()
993 self._log.exception(e)
994 self._log.error("Instantiation of VDU record failed: %s", str(e))
995 self._state = VDURecordState.FAILED
996 yield from self.instantiation_failed(str(e))
997
998
999 class VlRecordState(enum.Enum):
1000 """ VL Record State """
1001 INIT = 101
1002 INSTANTIATION_PENDING = 102
1003 ACTIVE = 103
1004 TERMINATE_PENDING = 104
1005 TERMINATED = 105
1006 FAILED = 106
1007
1008
1009 class InternalVirtualLinkRecord(object):
1010 """ Internal Virtual Link record """
1011 def __init__(self, dts, log, loop, project,
1012 ivld_msg, vnfr_name, cloud_account_name, ip_profile=None):
1013 self._dts = dts
1014 self._log = log
1015 self._loop = loop
1016 self._project = project
1017 self._ivld_msg = ivld_msg
1018 self._vnfr_name = vnfr_name
1019 self._cloud_account_name = cloud_account_name
1020 self._ip_profile = ip_profile
1021
1022 self._vlr_req = self.create_vlr()
1023 self._vlr = None
1024 self._state = VlRecordState.INIT
1025
1026 @property
1027 def vlr_id(self):
1028 """ Find VLR by id """
1029 return self._vlr_req.id
1030
1031 @property
1032 def name(self):
1033 """ Name of this VL """
1034 if self._ivld_msg.vim_network_name:
1035 return self._ivld_msg.vim_network_name
1036 else:
1037 return self._vnfr_name + "." + self._ivld_msg.name
1038
1039 @property
1040 def network_id(self):
1041 """ Find VLR by id """
1042 return self._vlr.network_id if self._vlr else None
1043
1044 def vlr_path(self):
1045 """ VLR path for this VLR instance"""
1046 return self._project.add_project("D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".
1047 format(self.vlr_id))
1048
1049 def create_vlr(self):
1050 """ Create the VLR record which will be instantiated """
1051
1052 vld_fields = ["short_name",
1053 "vendor",
1054 "description",
1055 "version",
1056 "type_yang",
1057 "vim_network_name",
1058 "provider_network"]
1059
1060 vld_copy_dict = {k: v for k, v in self._ivld_msg.as_dict().items() if k in vld_fields}
1061
1062 vlr_dict = {"id": str(uuid.uuid4()),
1063 "name": self.name,
1064 "cloud_account": self._cloud_account_name,
1065 }
1066
1067 if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
1068 vlr_dict['ip_profile_params' ] = self._ip_profile.ip_profile_params.as_dict()
1069
1070 vlr_dict.update(vld_copy_dict)
1071
1072 vlr = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr.from_dict(vlr_dict)
1073 return vlr
1074
1075 @asyncio.coroutine
1076 def instantiate(self, xact, restart_mode=False):
1077 """ Instantiate VL """
1078
1079 @asyncio.coroutine
1080 def instantiate_vlr():
1081 """ Instantiate VLR"""
1082 self._log.debug("Create VL with xpath %s and vlr %s",
1083 self.vlr_path(), self._vlr_req)
1084
1085 with self._dts.transaction(flags=0) as xact:
1086 block = xact.block_create()
1087 block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
1088 self._log.debug("Executing VL create path:%s msg:%s",
1089 self.vlr_path(), self._vlr_req)
1090
1091 res_iter = None
1092 try:
1093 res_iter = yield from block.execute()
1094 except Exception:
1095 self._state = VlRecordState.FAILED
1096 self._log.exception("Caught exception while instantial VL")
1097 raise
1098
1099 for ent in res_iter:
1100 res = yield from ent
1101 self._vlr = res.result
1102
1103 if self._vlr.operational_status == 'failed':
1104 self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
1105 self._state = VlRecordState.FAILED
1106 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))
1107
1108 self._log.info("Created VL with xpath %s and vlr %s",
1109 self.vlr_path(), self._vlr)
1110
1111 @asyncio.coroutine
1112 def get_vlr():
1113 """ Get the network id """
1114 res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
1115 vlr = None
1116 for ent in res_iter:
1117 res = yield from ent
1118 vlr = res.result
1119
1120 if vlr is None:
1121 err = "Failed to get VLR for path %s" % self.vlr_path()
1122 self._log.warn(err)
1123 raise InternalVirtualLinkRecordError(err)
1124 return vlr
1125
1126 self._state = VlRecordState.INSTANTIATION_PENDING
1127
1128 if restart_mode:
1129 vl = yield from get_vlr()
1130 if vl is None:
1131 yield from instantiate_vlr()
1132 else:
1133 yield from instantiate_vlr()
1134
1135 self._state = VlRecordState.ACTIVE
1136
1137 def vlr_in_vns(self):
1138 """ Is there a VLR record in VNS """
1139 if (self._state == VlRecordState.ACTIVE or
1140 self._state == VlRecordState.INSTANTIATION_PENDING or
1141 self._state == VlRecordState.FAILED):
1142 return True
1143
1144 return False
1145
1146 @asyncio.coroutine
1147 def terminate(self, xact):
1148 """Terminate this VL """
1149 if not self.vlr_in_vns():
1150 self._log.debug("Ignoring terminate request for id %s in state %s",
1151 self.vlr_id, self._state)
1152 return
1153
1154 self._log.debug("Terminating VL with path %s", self.vlr_path())
1155 self._state = VlRecordState.TERMINATE_PENDING
1156 block = xact.block_create()
1157 block.add_query_delete(self.vlr_path())
1158 yield from block.execute(flags=0, now=True)
1159 self._state = VlRecordState.TERMINATED
1160 self._log.debug("Terminated VL with path %s", self.vlr_path())
1161
1162
1163 class VirtualNetworkFunctionRecord(object):
1164 """ Virtual Network Function Record """
1165 def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
1166 self._dts = dts
1167 self._log = log
1168 self._loop = loop
1169 self._project = vnfm._project
1170 self._cluster_name = cluster_name
1171 self._vnfr_msg = vnfr_msg
1172 self._vnfr_id = vnfr_msg.id
1173 self._vnfd_id = vnfr_msg.vnfd.id
1174 self._vnfm = vnfm
1175 self._vcs_handler = vcs_handler
1176 self._vnfr = vnfr_msg
1177 self._mgmt_network = mgmt_network
1178
1179 self._vnfd = vnfr_msg.vnfd
1180 self._state = VirtualNetworkFunctionRecordState.INIT
1181 self._state_failed_reason = None
1182 self._ext_vlrs = {} # The list of external virtual links
1183 self._vlrs = [] # The list of internal virtual links
1184 self._vdus = [] # The list of vdu
1185 self._vlr_by_cp = {}
1186 self._cprs = []
1187 self._inventory = {}
1188 self._create_time = int(time.time())
1189 self._vnf_mon = None
1190 self._config_status = vnfr_msg.config_status
1191 self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
1192 self._rw_vnfd = None
1193 self._vnfd_ref_count = 0
1194
1195 def _get_vdur_from_vdu_id(self, vdu_id):
1196 self._log.debug("Finding vdur for vdu_id %s", vdu_id)
1197 self._log.debug("Searching through vdus: %s", self._vdus)
1198 for vdu in self._vdus:
1199 self._log.debug("vdu_id: %s", vdu.vdu_id)
1200 if vdu.vdu_id == vdu_id:
1201 return vdu
1202
1203 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id)
1204
1205 @property
1206 def operational_status(self):
1207 """ Operational status of this VNFR """
1208 op_status_map = {"INIT": "init",
1209 "VL_INIT_PHASE": "vl_init_phase",
1210 "VM_INIT_PHASE": "vm_init_phase",
1211 "READY": "running",
1212 "TERMINATE": "terminate",
1213 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1214 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1215 "TERMINATED": "terminated",
1216 "FAILED": "failed", }
1217 return op_status_map[self._state.name]
1218
1219 @staticmethod
1220 def vnfd_xpath(vnfd_id):
1221 """ VNFD xpath associated with this VNFR """
1222 return ("C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".
1223 format(vnfd_id))
1224
1225 @property
1226 def vnfd_ref_count(self):
1227 """ Returns the VNFD reference count associated with this VNFR """
1228 return self._vnfd_ref_count
1229
1230 def vnfd_in_use(self):
1231 """ Returns whether vnfd is in use or not """
1232 return True if self._vnfd_ref_count > 0 else False
1233
1234 def vnfd_ref(self):
1235 """ Take a reference on this object """
1236 self._vnfd_ref_count += 1
1237 return self._vnfd_ref_count
1238
1239 def vnfd_unref(self):
1240 """ Release reference on this object """
1241 if self._vnfd_ref_count < 1:
1242 msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
1243 (self.vnfd.id, self._vnfd_ref_count))
1244 self._log.critical(msg)
1245 raise VnfRecordError(msg)
1246 self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
1247 self.vnfd.id, self._vnfd_ref_count)
1248 self._vnfd_ref_count -= 1
1249 return self._vnfd_ref_count
1250
1251 @property
1252 def vnfd(self):
1253 """ VNFD for this VNFR """
1254 return self._vnfd
1255
1256 @property
1257 def vnf_name(self):
1258 """ VNFD name associated with this VNFR """
1259 return self.vnfd.name
1260
1261 @property
1262 def name(self):
1263 """ Name of this VNF in the record """
1264 return self._vnfr.name
1265
1266 @property
1267 def cloud_account_name(self):
1268 """ Name of the cloud account this VNFR is instantiated in """
1269 return self._vnfr.cloud_account
1270
1271 @property
1272 def vnfd_id(self):
1273 """ VNFD Id associated with this VNFR """
1274 return self.vnfd.id
1275
1276 @property
1277 def vnfr_id(self):
1278 """ VNFR Id associated with this VNFR """
1279 return self._vnfr_id
1280
1281 @property
1282 def member_vnf_index(self):
1283 """ Member VNF index associated with this VNFR """
1284 return self._vnfr.member_vnf_index_ref
1285
1286 @property
1287 def config_status(self):
1288 """ Config agent status for this VNFR """
1289 return self._config_status
1290
1291 def component_by_name(self, component_name):
1292 """ Find a component by name in the inventory list"""
1293 mangled_name = VcsComponent.mangle_name(component_name,
1294 self.vnf_name,
1295 self.vnfd_id)
1296 return self._inventory[mangled_name]
1297
1298
1299
1300 @asyncio.coroutine
1301 def get_nsr_config(self):
1302 ### Need access to NS instance configuration for runtime resolution.
1303 ### This shall be replaced when deployment flavors are implemented
1304 xpath = self._project.add_project("C,/nsr:ns-instance-config")
1305 results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
1306
1307 for result in results:
1308 entry = yield from result
1309 ns_instance_config = entry.result
1310 for nsr in ns_instance_config.nsr:
1311 if nsr.id == self._vnfr_msg.nsr_id_ref:
1312 return nsr
1313 return None
1314
1315 @asyncio.coroutine
1316 def start_component(self, component_name, ip_addr):
1317 """ Start a component in the VNFR by name """
1318 comp = self.component_by_name(component_name)
1319 yield from comp.start(None, None, ip_addr)
1320
1321 def cp_ip_addr(self, cp_name):
1322 """ Get ip address for connection point """
1323 self._log.debug("cp_ip_addr()")
1324 for cp in self._cprs:
1325 if cp.name == cp_name and cp.ip_address is not None:
1326 return cp.ip_address
1327 return "0.0.0.0"
1328
1329 def mgmt_intf_info(self):
1330 """ Get Management interface info for this VNFR """
1331 mgmt_intf_desc = self.vnfd.mgmt_interface
1332 ip_addr = None
1333 if mgmt_intf_desc.has_field("cp"):
1334 ip_addr = self.cp_ip_addr(mgmt_intf_desc.cp)
1335 elif mgmt_intf_desc.has_field("vdu_id"):
1336 try:
1337 vdur = self._get_vdur_from_vdu_id(mgmt_intf_desc.vdu_id)
1338 ip_addr = vdur.management_ip
1339 except VDURecordNotFound:
1340 self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
1341 ip_addr = None
1342 else:
1343 ip_addr = mgmt_intf_desc.ip_address
1344 port = mgmt_intf_desc.port
1345
1346 return ip_addr, port
1347
1348 @property
1349 def msg(self):
1350 """ Message associated with this VNFR """
1351 vnfd_fields = ["short_name", "vendor", "description", "version"]
1352 vnfd_copy_dict = {k: v for k, v in self.vnfd.as_dict().items() if k in vnfd_fields}
1353
1354 mgmt_intf = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MgmtInterface()
1355 ip_address, port = self.mgmt_intf_info()
1356
1357 if ip_address is not None:
1358 mgmt_intf.ip_address = ip_address
1359 if port is not None:
1360 mgmt_intf.port = port
1361
1362 vnfr_dict = {"id": self._vnfr_id,
1363 "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
1364 "name": self.name,
1365 "member_vnf_index_ref": self.member_vnf_index,
1366 "operational_status": self.operational_status,
1367 "operational_status_details": self._state_failed_reason,
1368 "cloud_account": self.cloud_account_name,
1369 "config_status": self._config_status
1370 }
1371
1372 vnfr_dict.update(vnfd_copy_dict)
1373
1374 vnfr_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
1375 vnfr_msg.vnfd = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())
1376
1377 vnfr_msg.create_time = self._create_time
1378 vnfr_msg.uptime = int(time.time()) - self._create_time
1379 vnfr_msg.mgmt_interface = mgmt_intf
1380
1381 # Add all the VLRs to VNFR
1382 for vlr in self._vlrs:
1383 ivlr = vnfr_msg.internal_vlr.add()
1384 ivlr.vlr_ref = vlr.vlr_id
1385
1386 # Add all the VDURs to VDUR
1387 if self._vdus is not None:
1388 for vdu in self._vdus:
1389 vdur = vnfr_msg.vdur.add()
1390 vdur.from_dict(vdu.msg.as_dict())
1391
1392 if self.vnfd.mgmt_interface.has_field('dashboard_params'):
1393 vnfr_msg.dashboard_url = self.dashboard_url
1394
1395 for cpr in self._cprs:
1396 new_cp = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
1397 vnfr_msg.connection_point.append(new_cp)
1398
1399 if self._vnf_mon is not None:
1400 for monp in self._vnf_mon.msg:
1401 vnfr_msg.monitoring_param.append(
1402 VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))
1403
1404 if self._vnfr.vnf_configuration is not None:
1405 vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
1406 if (ip_address is not None and
1407 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None):
1408 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address
1409
1410 for group in self._vnfr_msg.placement_groups_info:
1411 group_info = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1412 group_info.from_dict(group.as_dict())
1413 vnfr_msg.placement_groups_info.append(group_info)
1414
1415 return vnfr_msg
1416
1417 @property
1418 def dashboard_url(self):
1419 ip, cfg_port = self.mgmt_intf_info()
1420 protocol = 'http'
1421 http_port = 80
1422 if self.vnfd.mgmt_interface.dashboard_params.has_field('https'):
1423 if self.vnfd.mgmt_interface.dashboard_params.https is True:
1424 protocol = 'https'
1425 http_port = 443
1426 if self.vnfd.mgmt_interface.dashboard_params.has_field('port'):
1427 http_port = self.vnfd.mgmt_interface.dashboard_params.port
1428
1429 url = "{protocol}://{ip_address}:{port}/{path}".format(
1430 protocol=protocol,
1431 ip_address=ip,
1432 port=http_port,
1433 path=self.vnfd.mgmt_interface.dashboard_params.path.lstrip("/"),
1434 )
1435
1436 return url
1437
1438 @property
1439 def xpath(self):
1440 """ path for this VNFR """
1441 return self._project.add_project("D,/vnfr:vnfr-catalog"
1442 "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id))
1443
1444 @asyncio.coroutine
1445 def publish(self, xact):
1446 """ publish this VNFR """
1447 vnfr = self.msg
1448 self._log.debug("Publishing VNFR path = [%s], record = [%s]",
1449 self.xpath, self.msg)
1450 vnfr.create_time = self._create_time
1451 yield from self._vnfm.publish_vnfr(xact, self.xpath, self.msg)
1452 self._log.debug("Published VNFR path = [%s], record = [%s]",
1453 self.xpath, self.msg)
1454
1455 def resolve_vld_ip_profile(self, vnfd_msg, vld):
1456 self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
1457 if not vld.has_field('ip_profile_ref'):
1458 return None
1459 profile = [profile for profile in vnfd_msg.ip_profiles if profile.name == vld.ip_profile_ref]
1460 return profile[0] if profile else None
1461
1462 @asyncio.coroutine
1463 def create_vls(self):
1464 """ Publish The VLs associated with this VNF """
1465 self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
1466 self.vnfd_id)
1467 for ivld_msg in self.vnfd.internal_vld:
1468 self._log.debug("Creating internal vld:"
1469 " %s, int_cp_ref = %s",
1470 ivld_msg, ivld_msg.internal_connection_point
1471 )
1472 vlr = InternalVirtualLinkRecord(dts=self._dts,
1473 log=self._log,
1474 loop=self._loop,
1475 ivld_msg=ivld_msg,
1476 vnfr_name=self.name,
1477 cloud_account_name=self.cloud_account_name,
1478 ip_profile=self.resolve_vld_ip_profile(self.vnfd, ivld_msg)
1479 )
1480 self._vlrs.append(vlr)
1481
1482 for int_cp in ivld_msg.internal_connection_point:
1483 if int_cp.id_ref in self._vlr_by_cp:
1484 msg = ("Connection point %s already "
1485 " bound %s" % (int_cp.id_ref, self._vlr_by_cp[int_cp.id_ref]))
1486 raise InternalVirtualLinkRecordError(msg)
1487 self._log.debug("Setting vlr %s to internal cp = %s",
1488 vlr, int_cp.id_ref)
1489 self._vlr_by_cp[int_cp.id_ref] = vlr
1490
1491 @asyncio.coroutine
1492 def instantiate_vls(self, xact, restart_mode=False):
1493 """ Instantiate the VLs associated with this VNF """
1494 self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1495 self.vnfd_id)
1496
1497 for vlr in self._vlrs:
1498 self._log.debug("Instantiating VLR %s", vlr)
1499 yield from vlr.instantiate(xact, restart_mode)
1500
1501 def find_vlr_by_cp(self, cp_name):
1502 """ Find the VLR associated with the cp name """
1503 return self._vlr_by_cp[cp_name]
1504
1505 def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
1506 """
1507 Returns the cloud specific construct for placement group
1508 Arguments:
1509 input_group: VNFD PlacementGroup
1510 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1511 """
1512 copy_dict = ['name', 'requirement', 'strategy']
1513 for group_info in nsr_config.vnfd_placement_group_maps:
1514 if group_info.placement_group_ref == input_group.name and \
1515 group_info.vnfd_id_ref == self.vnfd_id:
1516 group = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1517 group_dict = {k:v for k,v in
1518 group_info.as_dict().items()
1519 if (k != 'placement_group_ref' and k !='vnfd_id_ref')}
1520 for param in copy_dict:
1521 group_dict.update({param: getattr(input_group, param)})
1522 group.from_dict(group_dict)
1523 return group
1524 return None
1525
1526 @asyncio.coroutine
1527 def get_vdu_placement_groups(self, vdu):
1528 placement_groups = []
1529 ### Step-1: Get VNF level placement groups
1530 for group in self._vnfr_msg.placement_groups_info:
1531 #group_info = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1532 #group_info.from_dict(group.as_dict())
1533 placement_groups.append(group)
1534
1535 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1536 nsr_config = yield from self.get_nsr_config()
1537
1538 ### Step-3: Get VDU level placement groups
1539 for group in self.vnfd.placement_groups:
1540 for member_vdu in group.member_vdus:
1541 if member_vdu.member_vdu_ref == vdu.id:
1542 group_info = self.resolve_placement_group_cloud_construct(group,
1543 nsr_config)
1544 if group_info is None:
1545 self._log.info("Could not resolve cloud-construct for " +
1546 "placement group: %s", group.name)
1547 else:
1548 self._log.info("Successfully resolved cloud construct for " +
1549 "placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1550 str(group_info),
1551 vdu.name,
1552 self.vnf_name,
1553 self.member_vnf_index)
1554 placement_groups.append(group_info)
1555
1556 return placement_groups
1557
1558 @asyncio.coroutine
1559 def vdu_cloud_init_instantiation(self):
1560 [vdu.vdud_cloud_init for vdu in self._vdus]
1561
1562 @asyncio.coroutine
1563 def create_vdus(self, vnfr, restart_mode=False):
1564 """ Create the VDUs associated with this VNF """
1565
1566 def get_vdur_id(vdud):
1567 """Get the corresponding VDUR's id for the VDUD. This is useful in
1568 case of a restart.
1569
1570 In restart mode we check for exiting VDUR's ID and use them, if
1571 available. This way we don't end up creating duplicate VDURs
1572 """
1573 vdur_id = None
1574
1575 if restart_mode and vdud is not None:
1576 try:
1577 vdur = [vdur.id for vdur in vnfr._vnfr.vdur if vdur.vdu_id_ref == vdud.id]
1578 vdur_id = vdur[0]
1579 except IndexError:
1580 self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
1581
1582 return vdur_id
1583
1584
1585 self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
1586 for vdu in self._rw_vnfd.vdu:
1587 self._log.debug("Creating vdu: %s", vdu)
1588 vdur_id = get_vdur_id(vdu)
1589
1590 placement_groups = yield from self.get_vdu_placement_groups(vdu)
1591 self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1592 vdu.name,
1593 self.vnf_name,
1594 self.member_vnf_index,
1595 [ group.name for group in placement_groups])
1596
1597 vdur = VirtualDeploymentUnitRecord(
1598 dts=self._dts,
1599 log=self._log,
1600 loop=self._loop,
1601 project = self._project,
1602 vdud=vdu,
1603 vnfr=vnfr,
1604 mgmt_intf=self.has_mgmt_interface(vdu),
1605 mgmt_network=self._mgmt_network,
1606 cloud_account_name=self.cloud_account_name,
1607 vnfd_package_store=self._vnfd_package_store,
1608 vdur_id=vdur_id,
1609 placement_groups = placement_groups,
1610 )
1611 yield from vdur.vdu_opdata_register()
1612
1613 self._vdus.append(vdur)
1614
1615 @asyncio.coroutine
1616 def instantiate_vdus(self, xact, vnfr):
1617 """ Instantiate the VDUs associated with this VNF """
1618 self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)
1619
1620 lookup = {vdu.vdu_id: vdu for vdu in self._vdus}
1621
1622 # Identify any dependencies among the VDUs
1623 dependencies = collections.defaultdict(list)
1624 vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")
1625
1626 for vdu in self._vdus:
1627 if vdu._vdud_cloud_init is not None:
1628 for vdu_id in vdu_id_pattern.findall(vdu._vdud_cloud_init):
1629 if vdu_id != vdu.vdu_id:
1630 # This means that vdu.vdu_id depends upon vdu_id,
1631 # i.e. vdu_id must be instantiated before
1632 # vdu.vdu_id.
1633 dependencies[vdu.vdu_id].append(lookup[vdu_id])
1634
1635 # Define the terminal states of VDU instantiation
1636 terminal = (
1637 VDURecordState.READY,
1638 VDURecordState.TERMINATED,
1639 VDURecordState.FAILED,
1640 )
1641
1642 datastore = VdurDatastore()
1643 processed = set()
1644
1645 @asyncio.coroutine
1646 def instantiate_monitor(vdu):
1647 """Monitor the state of the VDU during instantiation
1648
1649 Arguments:
1650 vdu - a VirtualDeploymentUnitRecord
1651
1652 """
1653 # wait for the VDUR to enter a terminal state
1654 while vdu._state not in terminal:
1655 yield from asyncio.sleep(1, loop=self._loop)
1656 # update the datastore
1657 datastore.update(vdu)
1658
1659 # add the VDU to the set of processed VDUs
1660 processed.add(vdu.vdu_id)
1661
1662 @asyncio.coroutine
1663 def instantiate(vdu):
1664 """Instantiate the specified VDU
1665
1666 Arguments:
1667 vdu - a VirtualDeploymentUnitRecord
1668
1669 Raises:
1670 if the VDU, or any of the VDUs this VDU depends upon, are
1671 terminated or fail to instantiate properly, a
1672 VirtualDeploymentUnitRecordError is raised.
1673
1674 """
1675 for dependency in dependencies[vdu.vdu_id]:
1676 self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))
1677
1678 while dependency.vdu_id not in processed:
1679 yield from asyncio.sleep(1, loop=self._loop)
1680
1681 if not dependency.active:
1682 raise VirtualDeploymentUnitRecordError()
1683
1684 self._log.debug('instantiating {}'.format(vdu.vdu_id))
1685
1686 # Populate the datastore with the current values of the VDU
1687 datastore.add(vdu)
1688
1689 # Substitute any variables contained in the cloud config script
1690 config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""
1691
1692 parts = re.split("\{\{ ([^\}]+) \}\}", config)
1693 if len(parts) > 1:
1694
1695 # Extract the variable names
1696 variables = list()
1697 for variable in parts[1::2]:
1698 variables.append(variable.lstrip('{{').rstrip('}}').strip())
1699
1700 # Iterate of the variables and substitute values from the
1701 # datastore.
1702 for variable in variables:
1703
1704 # Handle a reference to a VDU by ID
1705 if variable.startswith('vdu['):
1706 value = datastore.get(variable)
1707 if value is None:
1708 msg = "Unable to find a substitute for {} in {} cloud-init script"
1709 raise ValueError(msg.format(variable, vdu.vdu_id))
1710
1711 config = config.replace("{{ %s }}" % variable, value)
1712 continue
1713
1714 # Handle a reference to the current VDU
1715 if variable.startswith('vdu'):
1716 value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
1717 config = config.replace("{{ %s }}" % variable, value)
1718 continue
1719
1720 # Handle unrecognized variables
1721 msg = 'unrecognized cloud-config variable: {}'
1722 raise ValueError(msg.format(variable))
1723
1724 # Instantiate the VDU
1725 with self._dts.transaction() as xact:
1726 self._log.debug("Instantiating vdu: %s", vdu)
1727 yield from vdu.instantiate(xact, vnfr, config=config)
1728 if self._state == VirtualNetworkFunctionRecordState.FAILED:
1729 self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
1730 self.vnfr_id, vdu)
1731
1732 # First create a set of tasks to monitor the state of the VDUs and
1733 # report when they have entered a terminal state
1734 for vdu in self._vdus:
1735 self._loop.create_task(instantiate_monitor(vdu))
1736
1737 for vdu in self._vdus:
1738 self._loop.create_task(instantiate(vdu))
1739
1740 def has_mgmt_interface(self, vdu):
1741 # ## TODO: Support additional mgmt_interface type options
1742 if self.vnfd.mgmt_interface.vdu_id == vdu.id:
1743 return True
1744 return False
1745
1746 def vlr_xpath(self, vlr_id):
1747 """ vlr xpath """
1748 return self._project.add_project("D,/vlr:vlr-catalog/"
1749 "vlr:vlr[vlr:id = '{}']".format(vlr_id))
1750
1751 def ext_vlr_by_id(self, vlr_id):
1752 """ find ext vlr by id """
1753 return self._ext_vlrs[vlr_id]
1754
1755 @asyncio.coroutine
1756 def publish_inventory(self, xact):
1757 """ Publish the inventory associated with this VNF """
1758 self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)
1759
1760 for component in self._rw_vnfd.component:
1761 self._log.debug("Creating inventory component %s", component)
1762 mangled_name = VcsComponent.mangle_name(component.component_name,
1763 self.vnf_name,
1764 self.vnfd_id
1765 )
1766 comp = VcsComponent(dts=self._dts,
1767 log=self._log,
1768 loop=self._loop,
1769 cluster_name=self._cluster_name,
1770 vcs_handler=self._vcs_handler,
1771 component=component,
1772 mangled_name=mangled_name,
1773 )
1774 if comp.name in self._inventory:
1775 self._log.debug("Duplicate entries in inventory %s for vnfr %s",
1776 component, self._vnfd_id)
1777 return
1778 self._log.debug("Adding component %s for vnrf %s",
1779 comp.name, self._vnfr_id)
1780 self._inventory[comp.name] = comp
1781 yield from comp.publish(xact)
1782
1783 def all_vdus_active(self):
1784 """ Are all VDUS in this VNFR active? """
1785 for vdu in self._vdus:
1786 if not vdu.active:
1787 return False
1788
1789 self._log.debug("Inside all_vdus_active. Returning True")
1790 return True
1791
1792 @asyncio.coroutine
1793 def instantiation_failed(self, failed_reason=None):
1794 """ VNFR instantiation failed """
1795 self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
1796 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1797 self._state_failed_reason = failed_reason
1798
1799 # Update the VNFR with the changed status
1800 yield from self.publish(None)
1801
1802 @asyncio.coroutine
1803 def is_ready(self):
1804 """ This VNF is ready"""
1805 self._log.debug("VNFR id %s is ready", self.vnfr_id)
1806
1807 if self._state != VirtualNetworkFunctionRecordState.FAILED:
1808 self.set_state(VirtualNetworkFunctionRecordState.READY)
1809
1810 else:
1811 self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
1812
1813 # Update the VNFR with the changed status
1814 yield from self.publish(None)
1815
1816 def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
1817 """Updated the connection point with ip address"""
1818 for cp in self._cprs:
1819 if cp.name == cp_name:
1820 self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1821 cp_name, cp, ip_address, cp_id)
1822 cp.ip_address = ip_address
1823 cp.mac_address = mac_addr
1824 cp.connection_point_id = cp_id
1825 return
1826
1827 err = "No connection point %s found in VNFR id %s" % (cp.name, self._vnfr_id)
1828 self._log.debug(err)
1829 raise VirtualDeploymentUnitRecordError(err)
1830
1831 def set_state(self, state):
1832 """ Set state for this VNFR"""
1833 self._state = state
1834
1835 @asyncio.coroutine
1836 def instantiate(self, xact, restart_mode=False):
1837 """ instantiate this VNF """
1838 self._log.info("Instantiate VNF {}: {}".format(self._vnfr_id, self._state))
1839 self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
1840 self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)
1841
1842 @asyncio.coroutine
1843 def fetch_vlrs():
1844 """ Fetch VLRs """
1845 # Iterate over all the connection points in VNFR and fetch the
1846 # associated VLRs
1847
1848 def cpr_from_cp(cp):
1849 """ Creates a record level connection point from the desciptor cp"""
1850 cp_fields = ["name", "image", "vm-flavor", "port_security_enabled"]
1851 cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
1852 cpr_dict = {}
1853 cpr_dict.update(cp_copy_dict)
1854 return VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)
1855
1856 self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1857 self._vnfr_id, self._vnfr.connection_point)
1858
1859 for cp in self._vnfr.connection_point:
1860 cpr = cpr_from_cp(cp)
1861 self._cprs.append(cpr)
1862 self._log.debug("Adding Connection point record %s ", cp)
1863
1864 vlr_path = self.vlr_xpath(cp.vlr_ref)
1865 self._log.debug("Fetching VLR with path = %s", vlr_path)
1866 res_iter = yield from self._dts.query_read(vlr_path,
1867 rwdts.XactFlag.MERGE)
1868 for i in res_iter:
1869 r = yield from i
1870 d = r.result
1871 self._ext_vlrs[cp.vlr_ref] = d
1872 cpr.vlr_ref = cp.vlr_ref
1873 self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)
1874
1875 # Increase the VNFD reference count
1876 self.vnfd_ref()
1877
1878 assert self.vnfd
1879
1880 # Fetch External VLRs
1881 self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
1882 yield from fetch_vlrs()
1883
1884 # Publish inventory
1885 self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
1886 yield from self.publish_inventory(xact)
1887
1888 # Publish inventory
1889 self._log.debug("Create VLs {}: {}".format(self._vnfr_id, self._state))
1890 yield from self.create_vls()
1891
1892 # publish the VNFR
1893 self._log.debug("Publish VNFR {}: {}".format(self._vnfr_id, self._state))
1894 yield from self.publish(xact)
1895
1896
1897 # instantiate VLs
1898 self._log.debug("Instantiate VLs {}: {}".format(self._vnfr_id, self._state))
1899 try:
1900 yield from self.instantiate_vls(xact, restart_mode)
1901 except Exception as e:
1902 self._log.exception("VL instantiation failed (%s)", str(e))
1903 yield from self.instantiation_failed(str(e))
1904 return
1905
1906 self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)
1907
1908 # instantiate VDUs
1909 self._log.debug("Create VDUs {}: {}".format(self._vnfr_id, self._state))
1910 yield from self.create_vdus(self, restart_mode)
1911
1912 try:
1913 yield from self.vdu_cloud_init_instantiation()
1914 except Exception as e:
1915 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1916 self._state_failed_reason = str(e)
1917 yield from self.publish(xact)
1918
1919 # publish the VNFR
1920 self._log.debug("VNFR {}: Publish VNFR with state {}".
1921 format(self._vnfr_id, self._state))
1922 yield from self.publish(xact)
1923
1924 # instantiate VDUs
1925 # ToDo: Check if this should be prevented during restart
1926 self._log.debug("Instantiate VDUs {}: {}".format(self._vnfr_id, self._state))
1927 _ = self._loop.create_task(self.instantiate_vdus(xact, self))
1928
1929 # publish the VNFR
1930 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1931 yield from self.publish(xact)
1932
1933 self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)
1934
1935 # create task updating uptime for this vnfr
1936 self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
1937 self._loop.create_task(self.vnfr_uptime_update(xact))
1938
1939 @asyncio.coroutine
1940 def terminate(self, xact):
1941 """ Terminate this virtual network function """
1942
1943 self._log.debug("Terminatng VNF id %s", self.vnfr_id)
1944
1945 self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)
1946
1947 # stop monitoring
1948 if self._vnf_mon is not None:
1949 self._vnf_mon.stop()
1950 self._vnf_mon.deregister()
1951 self._vnf_mon = None
1952
1953 @asyncio.coroutine
1954 def terminate_vls():
1955 """ Terminate VLs in this VNF """
1956 for vl in self._vlrs:
1957 yield from vl.terminate(xact)
1958
1959 @asyncio.coroutine
1960 def terminate_vdus():
1961 """ Terminate VDUS in this VNF """
1962 for vdu in self._vdus:
1963 yield from vdu.terminate(xact)
1964
1965 self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
1966 self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
1967 yield from terminate_vls()
1968
1969 self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
1970 self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
1971 yield from terminate_vdus()
1972
1973 self._log.debug("Terminated VNF id %s", self.vnfr_id)
1974 self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1975
1976 @asyncio.coroutine
1977 def vnfr_uptime_update(self, xact):
1978 while True:
1979 # Return when vnfr state is FAILED or TERMINATED etc
1980 if self._state not in [VirtualNetworkFunctionRecordState.INIT,
1981 VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
1982 VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
1983 VirtualNetworkFunctionRecordState.READY]:
1984 return
1985 yield from self.publish(xact)
1986 yield from asyncio.sleep(2, loop=self._loop)
1987
1988
1989
1990 class VnfdDtsHandler(object):
1991 """ DTS handler for VNFD config changes """
1992 XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1993
1994 def __init__(self, dts, log, loop, vnfm):
1995 self._dts = dts
1996 self._log = log
1997 self._loop = loop
1998 self._vnfm = vnfm
1999 self._regh = None
2000
2001 @asyncio.coroutine
2002 def regh(self):
2003 """ DTS registration handle """
2004 return self._regh
2005
2006 def deregister(self):
2007 '''De-register from DTS'''
2008 self._log.debug("De-register VNFD DTS handler for project {}".
2009 format(self._project))
2010 if self._regh:
2011 self._regh.deregister()
2012 self._regh = None
2013
2014 @asyncio.coroutine
2015 def register(self):
2016 """ Register for VNFD configuration"""
2017
2018 def on_apply(dts, acg, xact, action, scratch):
2019 """Apply the configuration"""
2020 self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
2021 xact, action, scratch)
2022
2023 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
2024
2025 @asyncio.coroutine
2026 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
2027 """ on prepare callback """
2028 self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
2029 ks_path.to_xpath(RwVnfmYang.get_schema()), msg)
2030 fref = ProtobufC.FieldReference.alloc()
2031 fref.goto_whole_message(msg.to_pbcm())
2032
2033 # Handle deletes in prepare_callback
2034 if fref.is_field_deleted():
2035 # Delete an VNFD record
2036 self._log.debug("Deleting VNFD with id %s", msg.id)
2037 if self._vnfm.vnfd_in_use(msg.id):
2038 self._log.debug("Cannot delete VNFD in use - %s", msg)
2039 err = "Cannot delete a VNFD in use - %s" % msg
2040 raise VirtualNetworkFunctionDescriptorRefCountExists(err)
2041 # Delete a VNFD record
2042 yield from self._vnfm.delete_vnfd(msg.id)
2043
2044 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2045
2046 xpath = self._vnfm._project.add_project(VnfdDtsHandler.XPATH)
2047 self._log.debug("Registering for VNFD config using xpath: {}".
2048 format(xpath))
2049
2050 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
2051 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
2052 self._regh = acg.register(
2053 xpath=xpath,
2054 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
2055 on_prepare=on_prepare)
2056
2057
2058 class VcsComponentDtsHandler(object):
2059 """ Vcs Component DTS handler """
2060 XPATH = ("D,/rw-manifest:manifest" +
2061 "/rw-manifest:operational-inventory" +
2062 "/rw-manifest:component")
2063
2064 def __init__(self, dts, log, loop, vnfm):
2065 self._dts = dts
2066 self._log = log
2067 self._loop = loop
2068 self._regh = None
2069 self._vnfm = vnfm
2070
2071 @property
2072 def regh(self):
2073 """ DTS registration handle """
2074 return self._regh
2075
2076 def deregister(self):
2077 '''De-register from DTS'''
2078 self._log.debug("De-register VCS DTS handler for project {}".
2079 format(self._project))
2080 if self._regh:
2081 self._regh.deregister()
2082 self._regh = None
2083
2084 @asyncio.coroutine
2085 def register(self):
2086 """ Registers VCS component dts publisher registration"""
2087 self._log.debug("VCS Comp publisher DTS handler registering path %s",
2088 VcsComponentDtsHandler.XPATH)
2089
2090 hdl = rift.tasklets.DTS.RegistrationHandler()
2091 handlers = rift.tasklets.Group.Handler()
2092 with self._dts.group_create(handler=handlers) as group:
2093 self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
2094 handler=hdl,
2095 flags=(rwdts.Flag.PUBLISHER |
2096 rwdts.Flag.NO_PREP_READ |
2097 rwdts.Flag.DATASTORE),)
2098
2099 @asyncio.coroutine
2100 def publish(self, xact, path, msg):
2101 """ Publishes the VCS component """
2102 self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
2103 xact, path, msg)
2104 self.regh.create_element(path, msg)
2105 self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
2106 VcsComponentDtsHandler.XPATH, xact, path, msg)
2107
2108 class VnfrConsoleOperdataDtsHandler(object):
2109 """
2110 Registers 'D,/rw-project:project/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]'
2111 and handles CRUD from DTS
2112 """
2113
2114 @property
2115 def vnfr_vdu_console_xpath(self):
2116 """ path for resource-mgr"""
2117 return self._project.add_project("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']" +
2118 "/rw-vnfr:vdur[vnfr:id='{}']".format(self._vnfr_id,self._vdur_id))
2119
2120 def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
2121 self._dts = dts
2122 self._log = log
2123 self._loop = loop
2124 self._regh = None
2125 self._vnfm = vnfm
2126
2127 self._vnfr_id = vnfr_id
2128 self._vdur_id = vdur_id
2129 self._vdu_id = vdu_id
2130
2131 self._project = vnfm._project
2132
2133 def deregister(self):
2134 '''De-register from DTS'''
2135 self._log.debug("De-register VNFR console DTS handler for project {}".
2136 format(self._project))
2137 if self._regh:
2138 self._regh.deregister()
2139 self._regh = None
2140
2141 @asyncio.coroutine
2142 def register(self):
2143 """ Register for VNFR VDU Operational Data read from dts """
2144
2145 @asyncio.coroutine
2146 def on_prepare(xact_info, action, ks_path, msg):
2147 """ prepare callback from dts """
2148 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2149 self._log.debug(
2150 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2151 xact_info, action, xpath, msg
2152 )
2153
2154 if action == rwdts.QueryAction.READ:
2155 schema = RwVnfrYang.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur.schema()
2156 path_entry = schema.keyspec_to_entry(ks_path)
2157 self._log.debug("VDU Opdata path is {}".format(path_entry))
2158 try:
2159 vnfr = self._vnfm.get_vnfr(self._vnfr_id)
2160 except VnfRecordError as e:
2161 self._log.error("VNFR id %s not found", self._vnfr_id)
2162 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2163 return
2164 try:
2165 vdur= vnfr._get_vdur_from_vdu_id(self._vdu_id)
2166 if not vdur._state == VDURecordState.READY:
2167 self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
2168 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2169 return
2170 with self._dts.transaction() as new_xact:
2171 resp = yield from vdur.read_resource(new_xact)
2172 vdur_console = RwVnfrYang.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur()
2173 vdur_console.id = self._vdur_id
2174 if resp.console_url:
2175 vdur_console.console_url = resp.console_url
2176 else:
2177 vdur_console.console_url = 'none'
2178 self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
2179 except Exception:
2180 self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
2181 vdur_console = RwVnfrYang.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur()
2182 vdur_console.id = self._vdur_id
2183 vdur_console.console_url = 'none'
2184
2185 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
2186 xpath=self.vnfr_vdu_console_xpath,
2187 msg=vdur_console)
2188 else:
2189 #raise VnfRecordError("Not supported operation %s" % action)
2190 self._log.error("Not supported operation %s" % action)
2191 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2192 return
2193
2194
2195 self._log.debug("Registering for VNFR VDU using xpath: %s",
2196 self.vnfr_vdu_console_xpath)
2197 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2198 with self._dts.group_create() as group:
2199 self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
2200 handler=hdl,
2201 flags=rwdts.Flag.PUBLISHER,
2202 )
2203
2204
2205 class VnfrDtsHandler(object):
2206 """ registers 'D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2207 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2208
2209 def __init__(self, dts, log, loop, vnfm):
2210 self._dts = dts
2211 self._log = log
2212 self._loop = loop
2213 self._vnfm = vnfm
2214
2215 self._regh = None
2216 self._project = vnfm._project
2217
2218 @property
2219 def regh(self):
2220 """ Return registration handle"""
2221 return self._regh
2222
2223 @property
2224 def vnfm(self):
2225 """ Return VNF manager instance """
2226 return self._vnfm
2227
2228 def deregister(self):
2229 '''De-register from DTS'''
2230 self._log.debug("De-register VNFR DTS handler for project {}".
2231 format(self._project))
2232 if self._regh:
2233 self._regh.deregister()
2234 self._regh = None
2235
2236 @asyncio.coroutine
2237 def register(self):
2238 """ Register for vnfr create/update/delete/read requests from dts """
2239 def on_commit(xact_info):
2240 """ The transaction has been committed """
2241 self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
2242 return rwdts.MemberRspCode.ACTION_OK
2243
2244 def on_abort(*args):
2245 """ Abort callback """
2246 self._log.debug("VNF transaction got aborted")
2247
2248 @asyncio.coroutine
2249 def on_event(dts, g_reg, xact, xact_event, scratch_data):
2250
2251 @asyncio.coroutine
2252 def instantiate_realloc_vnfr(vnfr):
2253 """Re-populate the vnfm after restart
2254
2255 Arguments:
2256 vlink
2257
2258 """
2259
2260 yield from vnfr.instantiate(None, restart_mode=True)
2261
2262 if xact_event == rwdts.MemberEvent.INSTALL:
2263 curr_cfg = self.regh.elements
2264 for cfg in curr_cfg:
2265 vnfr = self.vnfm.create_vnfr(cfg)
2266 self._loop.create_task(instantiate_realloc_vnfr(vnfr))
2267
2268 self._log.debug("Got on_event in vnfm")
2269
2270 return rwdts.MemberRspCode.ACTION_OK
2271
2272 @asyncio.coroutine
2273 def on_prepare(xact_info, action, ks_path, msg):
2274 """ prepare callback from dts """
2275 self._log.debug(
2276 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2277 xact_info, action, msg
2278 )
2279
2280 if action == rwdts.QueryAction.CREATE:
2281 if not msg.has_field("vnfd"):
2282 err = "Vnfd not provided"
2283 self._log.error(err)
2284 raise VnfRecordError(err)
2285
2286 vnfr = self.vnfm.create_vnfr(msg)
2287 try:
2288 # RIFT-9105: Unable to add a READ query under an existing transaction
2289 # xact = xact_info.xact
2290 yield from vnfr.instantiate(None)
2291 except Exception as e:
2292 self._log.exception(e)
2293 self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
2294 vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
2295 yield from vnfr.publish(None)
2296 elif action == rwdts.QueryAction.DELETE:
2297 schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.schema()
2298 path_entry = schema.keyspec_to_entry(ks_path)
2299 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2300
2301 if vnfr is None:
2302 self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
2303 raise VirtualNetworkFunctionRecordNotFound(
2304 "VNFR id %s", path_entry.key00.id)
2305
2306 try:
2307 yield from vnfr.terminate(xact_info.xact)
2308 # Unref the VNFD
2309 vnfr.vnfd_unref()
2310 yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
2311 except Exception as e:
2312 self._log.exception(e)
2313 self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)
2314
2315 elif action == rwdts.QueryAction.UPDATE:
2316 schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.schema()
2317 path_entry = schema.keyspec_to_entry(ks_path)
2318 vnfr = None
2319 try:
2320 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2321 except Exception as e:
2322 self._log.debug("No vnfr found with id %s", path_entry.key00.id)
2323 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2324 return
2325
2326 if vnfr is None:
2327 self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
2328 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2329 return
2330
2331 self._log.debug("VNFR {} update config status {} (current {})".
2332 format(vnfr.name, msg.config_status, vnfr.config_status))
2333 # Update the config status and publish
2334 vnfr._config_status = msg.config_status
2335 yield from vnfr.publish(None)
2336
2337 else:
2338 raise NotImplementedError(
2339 "%s action on VirtualNetworkFunctionRecord not supported",
2340 action)
2341
2342 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2343
2344 xpath = self._project.add_project(VnfrDtsHandler.XPATH)
2345 self._log.debug("Registering for VNFR using xpath: {}".
2346 format(xpath))
2347
2348 hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
2349 on_prepare=on_prepare,)
2350 handlers = rift.tasklets.Group.Handler(on_event=on_event,)
2351 with self._dts.group_create(handler=handlers) as group:
2352 self._regh = group.register(xpath=xpath,
2353 handler=hdl,
2354 flags=(rwdts.Flag.PUBLISHER |
2355 rwdts.Flag.NO_PREP_READ |
2356 rwdts.Flag.CACHE |
2357 rwdts.Flag.DATASTORE),)
2358
2359 @asyncio.coroutine
2360 def create(self, xact, xpath, msg):
2361 """
2362 Create a VNFR record in DTS with path and message
2363 """
2364 path = self._project.add_project(xpath)
2365 self._log.debug("Creating VNFR xact = %s, %s:%s",
2366 xact, path, msg)
2367
2368 self.regh.create_element(path, msg)
2369 self._log.debug("Created VNFR xact = %s, %s:%s",
2370 xact, path, msg)
2371
2372 @asyncio.coroutine
2373 def update(self, xact, xpath, msg):
2374 """
2375 Update a VNFR record in DTS with path and message
2376 """
2377 path = self._project.add_project(xpath)
2378 self._log.debug("Updating VNFR xact = %s, %s:%s",
2379 xact, path, msg)
2380 self.regh.update_element(path, msg)
2381 self._log.debug("Updated VNFR xact = %s, %s:%s",
2382 xact, path, msg)
2383
2384 @asyncio.coroutine
2385 def delete(self, xact, xpath):
2386 """
2387 Delete a VNFR record in DTS with path and message
2388 """
2389 path = self._project.add_project(xpath)
2390 self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
2391 self.regh.delete_element(path)
2392 self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2393
2394
2395 class VnfdRefCountDtsHandler(object):
2396 """ The VNFD Ref Count DTS handler """
2397 XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2398
2399 def __init__(self, dts, log, loop, vnfm):
2400 self._dts = dts
2401 self._log = log
2402 self._loop = loop
2403 self._vnfm = vnfm
2404
2405 self._regh = None
2406
2407 @property
2408 def regh(self):
2409 """ Return registration handle """
2410 return self._regh
2411
2412 @property
2413 def vnfm(self):
2414 """ Return the NS manager instance """
2415 return self._vnfm
2416
2417 def deregister(self):
2418 '''De-register from DTS'''
2419 self._log.debug("De-register VNFD Ref DTS handler for project {}".
2420 format(self._project))
2421 if self._regh:
2422 self._regh.deregister()
2423 self._regh = None
2424
2425 @asyncio.coroutine
2426 def register(self):
2427 """ Register for VNFD ref count read from dts """
2428
2429 @asyncio.coroutine
2430 def on_prepare(xact_info, action, ks_path, msg):
2431 """ prepare callback from dts """
2432 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2433 self._log.debug(
2434 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2435 xact_info, action, xpath, msg
2436 )
2437
2438 if action == rwdts.QueryAction.READ:
2439 schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount.schema()
2440 path_entry = schema.keyspec_to_entry(ks_path)
2441 vnfd_list = yield from self._vnfm.get_vnfd_refcount(path_entry.key00.vnfd_id_ref)
2442 for xpath, msg in vnfd_list:
2443 self._log.debug("Responding to ref count query path:%s, msg:%s",
2444 xpath, msg)
2445 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
2446 xpath=xpath,
2447 msg=msg)
2448 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2449 else:
2450 raise VnfRecordError("Not supported operation %s" % action)
2451
2452 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2453 with self._dts.group_create() as group:
2454 self._regh = group.register(xpath=self._vnfm._project.add_project(
2455 VnfdRefCountDtsHandler.XPATH),
2456 handler=hdl,
2457 flags=rwdts.Flag.PUBLISHER,
2458 )
2459
2460
2461 class VdurDatastore(object):
2462 """
2463 This VdurDatastore is intended to expose select information about a VDUR
2464 such that it can be referenced in a cloud config file. The data that is
2465 exposed does not necessarily follow the structure of the data in the yang
2466 model. This is intentional. The data that are exposed are intended to be
2467 agnostic of the yang model so that changes in the model do not necessarily
2468 require changes to the interface provided to the user. It also means that
2469 the user does not need to be familiar with the RIFT.ware yang models.
2470 """
2471
2472 def __init__(self):
2473 """Create an instance of VdurDatastore"""
2474 self._vdur_data = dict()
2475 self._pattern = re.compile("vdu\[([^]]+)\]\.(.+)")
2476
2477 def add(self, vdur):
2478 """Add a new VDUR to the datastore
2479
2480 Arguments:
2481 vdur - a VirtualDeploymentUnitRecord instance
2482
2483 Raises:
2484 A ValueError is raised if the VDUR is (1) None or (2) already in
2485 the datastore.
2486
2487 """
2488 if vdur.vdu_id is None:
2489 raise ValueError('VDURs are required to have an ID')
2490
2491 if vdur.vdu_id in self._vdur_data:
2492 raise ValueError('cannot add a VDUR more than once')
2493
2494 self._vdur_data[vdur.vdu_id] = dict()
2495
2496 def set_if_not_none(key, attr):
2497 if attr is not None:
2498 self._vdur_data[vdur.vdu_id][key] = attr
2499
2500 set_if_not_none('name', vdur._vdud.name)
2501 set_if_not_none('mgmt.ip', vdur.vm_management_ip)
2502
2503 def update(self, vdur):
2504 """Update the VDUR information in the datastore
2505
2506 Arguments:
2507 vdur - a GI representation of a VDUR
2508
2509 Raises:
2510 A ValueError is raised if the VDUR is (1) None or (2) already in
2511 the datastore.
2512
2513 """
2514 if vdur.vdu_id is None:
2515 raise ValueError('VNFDs are required to have an ID')
2516
2517 if vdur.vdu_id not in self._vdur_data:
2518 raise ValueError('VNF is not recognized')
2519
2520 def set_or_delete(key, attr):
2521 if attr is None:
2522 if key in self._vdur_data[vdur.vdu_id]:
2523 del self._vdur_data[vdur.vdu_id][key]
2524
2525 else:
2526 self._vdur_data[vdur.vdu_id][key] = attr
2527
2528 set_or_delete('name', vdur._vdud.name)
2529 set_or_delete('mgmt.ip', vdur.vm_management_ip)
2530
2531 def remove(self, vdur_id):
2532 """Remove all of the data associated with specified VDUR
2533
2534 Arguments:
2535 vdur_id - the identifier of a VNFD in the datastore
2536
2537 Raises:
2538 A ValueError is raised if the VDUR is not contained in the
2539 datastore.
2540
2541 """
2542 if vdur_id not in self._vdur_data:
2543 raise ValueError('VNF is not recognized')
2544
2545 del self._vdur_data[vdur_id]
2546
2547 def get(self, expr):
2548 """Retrieve VDUR information from the datastore
2549
2550 An expression should be of the form,
2551
2552 vdu[<id>].<attr>
2553
2554 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2555 the exposed attribute that the user wishes to retrieve.
2556
2557 If the requested data is not available, None is returned.
2558
2559 Arguments:
2560 expr - a string that specifies the data to return
2561
2562 Raises:
2563 A ValueError is raised if the provided expression cannot be parsed.
2564
2565 Returns:
2566 The requested data or None
2567
2568 """
2569 result = self._pattern.match(expr)
2570 if result is None:
2571 raise ValueError('data expression not recognized ({})'.format(expr))
2572
2573 vdur_id, key = result.groups()
2574
2575 if vdur_id not in self._vdur_data:
2576 return None
2577
2578 return self._vdur_data[vdur_id].get(key, None)
2579
2580
2581 class VnfManager(object):
2582 """ The virtual network function manager class """
2583 def __init__(self, dts, log, loop, project, cluster_name):
2584 self._dts = dts
2585 self._log = log
2586 self._loop = loop
2587 self._project = project
2588 self._cluster_name = cluster_name
2589
2590 self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
2591 self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
2592 self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
2593 self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(
2594 log, dts, loop, project, callback=self.handle_nsr)
2595
2596 self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
2597 self._vnfr_handler,
2598 self._vcs_handler,
2599 self._vnfr_ref_handler,
2600 self._nsr_handler]
2601 self._vnfrs = {}
2602 self._vnfds_to_vnfr = {}
2603 self._nsrs = {}
2604
2605 @property
2606 def vnfr_handler(self):
2607 """ VNFR dts handler """
2608 return self._vnfr_handler
2609
2610 @property
2611 def vcs_handler(self):
2612 """ VCS dts handler """
2613 return self._vcs_handler
2614
2615 @asyncio.coroutine
2616 def register(self):
2617 """ Register all static DTS handlers """
2618 for hdl in self._dts_handlers:
2619 yield from hdl.register()
2620
2621 def deregister(self):
2622 self.log.debug("De-register VNFM project {}".format(self.name))
2623 for hdl in self._dts_handlers:
2624 yield from hdl.deregister()
2625
2626 @asyncio.coroutine
2627 def run(self):
2628 """ Run this VNFM instance """
2629 self._log.debug("Run VNFManager - registering static DTS handlers""")
2630 yield from self.register()
2631
2632 def handle_nsr(self, nsr, action):
2633 if action in [rwdts.QueryAction.CREATE]:
2634 self._nsrs[nsr.id] = nsr
2635 elif action == rwdts.QueryAction.DELETE:
2636 if nsr.id in self._nsrs:
2637 del self._nsrs[nsr.id]
2638
2639 def get_linked_mgmt_network(self, vnfr):
2640 """For the given VNFR get the related mgmt network from the NSD, if
2641 available.
2642 """
2643 vnfd_id = vnfr.vnfd.id
2644 nsr_id = vnfr.nsr_id_ref
2645
2646 # for the given related VNFR, get the corresponding NSR-config
2647 nsr_obj = None
2648 try:
2649 nsr_obj = self._nsrs[nsr_id]
2650 except KeyError:
2651 raise("Unable to find the NS with the ID: {}".format(nsr_id))
2652
2653 # for the related NSD check if a VLD exists such that it's a mgmt
2654 # network
2655 for vld in nsr_obj.nsd.vld:
2656 if vld.mgmt_network:
2657 return vld.name
2658
2659 return None
2660
2661 def get_vnfr(self, vnfr_id):
2662 """ get VNFR by vnfr id """
2663
2664 if vnfr_id not in self._vnfrs:
2665 raise VnfRecordError("VNFR id %s not found", vnfr_id)
2666
2667 return self._vnfrs[vnfr_id]
2668
2669 def create_vnfr(self, vnfr):
2670 """ Create a VNFR instance """
2671 if vnfr.id in self._vnfrs:
2672 msg = "Vnfr id %s already exists" % vnfr.id
2673 self._log.error(msg)
2674 raise VnfRecordError(msg)
2675
2676 self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
2677 vnfr.id,
2678 vnfr.vnfd.id)
2679
2680 mgmt_network = self.get_linked_mgmt_network(vnfr)
2681
2682 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2683 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
2684 mgmt_network=mgmt_network
2685 )
2686
2687 #Update ref count
2688 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2689 self._vnfds_to_vnfr[vnfr.vnfd.id] += 1
2690 else:
2691 self._vnfds_to_vnfr[vnfr.vnfd.id] = 1
2692
2693 return self._vnfrs[vnfr.id]
2694
2695 @asyncio.coroutine
2696 def delete_vnfr(self, xact, vnfr):
2697 """ Create a VNFR instance """
2698 if vnfr.vnfr_id in self._vnfrs:
2699 self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
2700 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2701
2702 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2703 if self._vnfds_to_vnfr[vnfr.vnfd.id]:
2704 self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1
2705
2706 del self._vnfrs[vnfr.vnfr_id]
2707
2708 @asyncio.coroutine
2709 def fetch_vnfd(self, vnfd_id):
2710 """ Fetch VNFDs based with the vnfd id"""
2711 vnfd_path = self._project.add_project(
2712 VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id))
2713 self._log.debug("Fetch vnfd with path %s", vnfd_path)
2714 vnfd = None
2715
2716 res_iter = yield from self._dts.query_read(vnfd_path,
2717 rwdts.XactFlag.MERGE)
2718
2719 for ent in res_iter:
2720 res = yield from ent
2721 vnfd = res.result
2722
2723 if vnfd is None:
2724 err = "Failed to get Vnfd %s" % vnfd_id
2725 self._log.error(err)
2726 raise VnfRecordError(err)
2727
2728 self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)
2729
2730 return vnfd
2731
2732 def vnfd_in_use(self, vnfd_id):
2733 """ Is this VNFD in use """
2734 self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
2735 if vnfd_id in self._vnfds_to_vnfr:
2736 return (self._vnfds_to_vnfr[vnfd_id] > 0)
2737 return False
2738
2739 @asyncio.coroutine
2740 def publish_vnfr(self, xact, path, msg):
2741 """ Publish a VNFR """
2742 self._log.debug("publish_vnfr called with path %s, msg %s",
2743 path, msg)
2744 yield from self.vnfr_handler.update(xact, path, msg)
2745
2746 @asyncio.coroutine
2747 def delete_vnfd(self, vnfd_id):
2748 """ Delete the Virtual Network Function descriptor with the passed id """
2749 self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
2750 if vnfd_id in self._vnfds_to_vnfr:
2751 if self._vnfds_to_vnfr[vnfd_id]:
2752 self._log.debug("Cannot delete VNFD id %s reference exists %s",
2753 vnfd_id,
2754 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2755 raise VirtualNetworkFunctionDescriptorRefCountExists(
2756 "Cannot delete :%s, ref_count:%s",
2757 vnfd_id,
2758 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2759
2760 del self._vnfds_to_vnfr[vnfd_id]
2761
2762 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2763 try:
2764 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2765 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2766 if os.path.exists(vnfd_dir):
2767 shutil.rmtree(vnfd_dir, ignore_errors=True)
2768 except Exception as e:
2769 self._log.error("Exception in cleaning up VNFD {}: {}".
2770 format(self._vnfds_to_vnfr[vnfd_id].vnfd.name, e))
2771 self._log.exception(e)
2772
2773
2774 def vnfd_refcount_xpath(self, vnfd_id):
2775 """ xpath for ref count entry """
2776 return self._project.add_project(VnfdRefCountDtsHandler.XPATH +
2777 "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)
2778
2779 @asyncio.coroutine
2780 def get_vnfd_refcount(self, vnfd_id):
2781 """ Get the vnfd_list from this VNFM"""
2782 vnfd_list = []
2783 if vnfd_id is None or vnfd_id == "":
2784 for vnfd in self._vnfds_to_vnfr.keys():
2785 vnfd_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount()
2786 vnfd_msg.vnfd_id_ref = vnfd
2787 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd]
2788 vnfd_list.append((self.vnfd_refcount_xpath(vnfd), vnfd_msg))
2789 elif vnfd_id in self._vnfds_to_vnfr:
2790 vnfd_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount()
2791 vnfd_msg.vnfd_id_ref = vnfd_id
2792 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd_id]
2793 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2794
2795 return vnfd_list
2796
2797
2798 class VnfmProject(ManoProject):
2799
2800 def __init__(self, name, tasklet, **kw):
2801 super(VnfmProject, self).__init__(tasklet.log, name)
2802 self.update(tasklet)
2803
2804 self._vnfm = None
2805
2806 @asyncio.coroutine
2807 def register (self):
2808 try:
2809 vm_parent_name = self._tasklet.tasklet_info.get_parent_vm_parent_instance_name()
2810 assert vm_parent_name is not None
2811 self._vnfm = VnfManager(self._dts, self.log, self.loop, self, vm_parent_name)
2812 yield from self._vnfm.run()
2813 except Exception:
2814 print("Caught Exception in VNFM init:", sys.exc_info()[0])
2815 raise
2816
2817 def deregister(self):
2818 self._log.debug("De-register project {} for VnfmProject".
2819 format(self.name))
2820 self._vnfm.deregister()
2821
2822
2823 class VnfmTasklet(rift.tasklets.Tasklet):
2824 """ VNF Manager tasklet class """
2825 def __init__(self, *args, **kwargs):
2826 super(VnfmTasklet, self).__init__(*args, **kwargs)
2827 self.rwlog.set_category("rw-mano-log")
2828 self.rwlog.set_subcategory("vnfm")
2829
2830 self._dts = None
2831 self._project_handler = None
2832 self.projects = {}
2833
2834 @property
2835 def dts(self):
2836 return self._dts
2837
2838 def start(self):
2839 try:
2840 super(VnfmTasklet, self).start()
2841 self.log.info("Starting VnfmTasklet")
2842
2843 self.log.setLevel(logging.DEBUG)
2844
2845 self.log.debug("Registering with dts")
2846 self._dts = rift.tasklets.DTS(self.tasklet_info,
2847 RwVnfmYang.get_schema(),
2848 self.loop,
2849 self.on_dts_state_change)
2850
2851 self.log.debug("Created DTS Api GI Object: %s", self._dts)
2852 except Exception:
2853 print("Caught Exception in VNFM start:", sys.exc_info()[0])
2854 raise
2855
2856 def on_instance_started(self):
2857 """ Task insance started callback """
2858 self.log.debug("Got instance started callback")
2859
2860 def stop(self):
2861 try:
2862 self._dts.deinit()
2863 except Exception:
2864 print("Caught Exception in VNFM stop:", sys.exc_info()[0])
2865 raise
2866
2867 @asyncio.coroutine
2868 def init(self):
2869 """ Task init callback """
2870 self.log.debug("creating project handler")
2871 self.project_handler = ProjectHandler(self, VnfmProject)
2872 self.project_handler.register()
2873
2874 @asyncio.coroutine
2875 def run(self):
2876 """ Task run callback """
2877 pass
2878
2879 @asyncio.coroutine
2880 def on_dts_state_change(self, state):
2881 """Take action according to current dts state to transition
2882 application into the corresponding application state
2883
2884 Arguments
2885 state - current dts state
2886 """
2887 switch = {
2888 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2889 rwdts.State.CONFIG: rwdts.State.RUN,
2890 }
2891
2892 handlers = {
2893 rwdts.State.INIT: self.init,
2894 rwdts.State.RUN: self.run,
2895 }
2896
2897 # Transition application to next state
2898 handler = handlers.get(state, None)
2899 if handler is not None:
2900 yield from handler()
2901
2902 # Transition dts to next state
2903 next_state = switch.get(state, None)
2904 if next_state is not None:
2905 self._dts.handle.set_state(next_state)