First set of OSM model changes for multi-disk/config-files/etc.
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.package.script
53 import rift.mano.dts as mano_dts
54
55
class VMResourceError(Exception):
    """Raised when a VM resource request does not yield a usable response."""
59
60
class VnfRecordError(Exception):
    """Raised when instantiation of a VNF record fails."""
64
65
class VduRecordError(Exception):
    """Raised when instantiation of a VDU record fails."""
69
70
# NOTE(review): this name shadows the builtin ``NotImplemented`` singleton for
# the rest of the module. It is kept because other code may import or raise it
# by this name; prefer the builtin ``NotImplementedError`` in new code.
class NotImplemented(Exception):
    """Raised when a requested operation is not implemented."""
74
75
class VnfrRecordExistsError(Exception):
    """Raised when a VNFR with the same VNFR id already exists."""
79
80
class InternalVirtualLinkRecordError(Exception):
    """Raised on an internal virtual link record error."""
84
85
class VDUImageNotFound(Exception):
    """Raised when the image for a VDU cannot be found."""
89
90
class VirtualDeploymentUnitRecordError(Exception):
    """Raised when instantiation of a virtual deployment unit fails."""
94
95
class VMNotReadyError(Exception):
    """Raised when the VM has not yet been received from the resource manager."""
99
100
class VDURecordNotFound(Exception):
    """Raised when a VDU record cannot be found."""
104
105
class VirtualNetworkFunctionRecordDescNotFound(Exception):
    """Raised when a Virtual Network Function Record descriptor cannot be found."""
109
110
class VirtualNetworkFunctionDescriptorError(Exception):
    """Raised on a Virtual Network Function descriptor error."""
114
115
class VirtualNetworkFunctionDescriptorNotFound(Exception):
    """Raised when a Virtual Network Function descriptor cannot be found."""
119
120
class VirtualNetworkFunctionRecordNotFound(Exception):
    """Raised when a Virtual Network Function Record cannot be found."""
124
125
class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
    """Raised when a Virtual Network Function descriptor still has a reference count."""
129
130
class VnfrInstantiationFailed(Exception):
    """Raised when instantiation of a Virtual Network Function fails."""
134
135
class VNFMPlacementGroupError(Exception):
    """Raised when the placement groups requested for a VDU cannot be honoured."""
138
@enum.unique
class VirtualNetworkFunctionRecordState(enum.Enum):
    """Lifecycle states of a Virtual Network Function Record (VNFR).

    Every member carries a distinct value. ``TERMINATED`` previously shared
    the value 7 with ``VDU_TERMINATE_PHASE``, which made it an enum *alias*:
    the two states compared equal and ``TERMINATED.name`` reported
    ``'VDU_TERMINATE_PHASE'``. ``@enum.unique`` now rejects any such
    duplicate at import time.
    """
    INIT = 1
    VL_INIT_PHASE = 2
    VM_INIT_PHASE = 3
    READY = 4
    TERMINATE = 5
    VL_TERMINATE_PHASE = 6
    VDU_TERMINATE_PHASE = 7
    TERMINATED = 8
    FAILED = 10
150
151
@enum.unique
class VDURecordState(enum.Enum):
    """Lifecycle states of a VDU record."""
    INIT = 1                     # set when the record is constructed
    INSTANTIATING = 2            # set on entry to instantiate()
    RESOURCE_ALLOC_PENDING = 3   # VM requested from the resource manager
    READY = 4                    # VM reported active
    TERMINATING = 5              # terminate() in progress
    TERMINATED = 6               # terminate() finished
    FAILED = 10                  # instantiation failed
161
162
class VcsComponent(object):
    """A VCS component declared in a VNF descriptor.

    Wraps the descriptor's component message together with the DTS handles
    needed to publish it into the manifest inventory and to start it inside a
    VCS cluster instance.
    """

    def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name):
        """Store the collaborators; no DTS traffic happens here.

        Arguments:
            dts          - DTS handle used to issue the start request
            log          - logger
            loop         - asyncio event loop
            cluster_name - instance name used in the VCS instance xpath
            vcs_handler  - publisher used by publish()
            component    - component message from the descriptor
            mangled_name - unique (mangled) name of this component
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._cluster_name = cluster_name
        self._vcs_handler = vcs_handler
        self._component = component
        self._mangled_name = mangled_name

    @staticmethod
    def mangle_name(component_name, vnf_name, vnfd_id):
        """Return '<vnf_name>:<component_name>:<vnfd_id>'."""
        return ":".join((vnf_name, component_name, vnfd_id))

    @property
    def name(self):
        """Mangled name of this component."""
        return self._mangled_name

    @property
    def path(self):
        """Keyed xpath of this component in the manifest operational inventory."""
        template = ("D,/rw-manifest:manifest"
                    "/rw-manifest:operational-inventory"
                    "/rw-manifest:component"
                    "[rw-manifest:component-name = '{}']")
        return template.format(self.name)

    @property
    def instance_xpath(self):
        """Keyed xpath of the VCS instance (cluster) this component runs in."""
        prefix = "D,/rw-base:vcs" "/instances" "/instance"
        return prefix + "[instance-name = '{}']".format(self._cluster_name)

    @property
    def start_comp_xpath(self):
        """Xpath of the START-REQ child entry under the VCS instance."""
        return self.instance_xpath + "/child-n[instance-name = 'START-REQ']"

    def get_start_comp_msg(self, ip_address):
        """Build the START request message for this component.

        Arguments:
            ip_address - address written into the request's ip_address field
        """
        request = RwBaseYang.VcsInstance_Instance_ChildN()
        request.instance_name = 'START-REQ'
        request.component_name = self.name
        request.admin_command = "START"
        request.ip_address = ip_address
        return request

    @property
    def msg(self):
        """Inventory message for this component with component names mangled.

        Every 'component_name' value found anywhere in the component's dict
        form (including inside nested dicts and lists of dicts) is replaced
        by its mangled form before the message is rebuilt.
        """
        # NOTE(review): self._vnfd_name and self._vnfd_id are read below but
        # are never assigned in __init__; unless they are set elsewhere, the
        # first 'component_name' key encountered raises AttributeError.
        def mangle_comp_names(node):
            """Mangle component names in-place throughout `node`; return it."""
            for field, value in node.items():
                if isinstance(value, dict):
                    node[field] = mangle_comp_names(value)
                elif isinstance(value, list):
                    for position, entry in enumerate(value):
                        if isinstance(entry, dict):
                            value[position] = mangle_comp_names(entry)
                elif field == "component_name":
                    node[field] = VcsComponent.mangle_name(value,
                                                           self._vnfd_name,
                                                           self._vnfd_id)
            return node

        component_dict = mangle_comp_names(self._component.as_dict())
        return RwManifestYang.OpInventory_Component.from_dict(component_dict)

    @asyncio.coroutine
    def publish(self, xact):
        """Publish this component at `path` through the VCS handler."""
        self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
                        self.name, self.path, self.msg)
        yield from self._vcs_handler.publish(xact, self.path, self.msg)

    @asyncio.coroutine
    def start(self, xact, parent, ip_addr=None):
        """Issue the DTS create that asks VCS to start this component.

        `xact` and `parent` are accepted but not used by this method.
        """
        # ATTN RV - replace with block add
        request = self.get_start_comp_msg(ip_addr)
        target = self.start_comp_xpath
        self._log.debug("starting component %s %s", target, request)
        yield from self._dts.query_create(target, 0, request)
        self._log.debug("started component %s, %s", target, request)
262 self._log.debug("started component %s, %s",
263 self.start_comp_xpath, start_msg)
264
265
class VirtualDeploymentUnitRecord(object):
    """Runtime record (VDUR) of one virtual deployment unit inside a VNFR.

    The record builds the VM create request sent to the resource manager,
    subscribes to the resource manager's resource-info for that request,
    tracks the VDU through the VDURecordState lifecycle and renders the
    VDUR message for the VNFR catalog.
    """

    # VDURecordState member name -> operational-status string of the VDUR.
    # NOTE(review): TERMINATING is reported as "terminated", as in the
    # original code; confirm whether a distinct value exists in the model.
    _OP_STATUS_BY_STATE = {
        "INIT": "init",
        "INSTANTIATING": "vm_init_phase",
        "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
        "READY": "running",
        "FAILED": "failed",
        "TERMINATING": "terminated",
        "TERMINATED": "terminated",
    }

    def __init__(self,
                 dts,
                 log,
                 loop,
                 vdud,
                 vnfr,
                 mgmt_intf,
                 mgmt_network,
                 cloud_account_name,
                 vnfd_package_store,
                 vdur_id=None,
                 placement_groups=None):
        """Create the record in the INIT state.

        Arguments:
            dts                - DTS handle
            log                - logger
            loop               - asyncio event loop
            vdud               - VDU descriptor message
            vnfr               - owning VNFR object
            mgmt_intf          - sent as 'allocate_public_address' in the
                                 VM create request
            mgmt_network       - sent as 'mgmt_network' when truthy
            cloud_account_name - cloud account the VM is requested in
            vnfd_package_store - package store used to read cloud-init and
                                 config files of the VNFD package
            vdur_id            - id of the record; a new uuid4 when omitted
            placement_groups   - placement group messages; none when omitted
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vdud = vdud
        self._vnfr = vnfr
        self._mgmt_intf = mgmt_intf
        self._cloud_account_name = cloud_account_name
        self._vnfd_package_store = vnfd_package_store
        self._mgmt_network = mgmt_network

        self._vdur_id = vdur_id or str(uuid.uuid4())
        self._int_intf = []
        self._ext_intf = []
        self._state = VDURecordState.INIT
        self._state_failed_reason = None
        self._request_id = str(uuid.uuid4())
        self._name = vnfr.name + "__" + vdud.id
        # A fresh list per instance: a mutable default argument would be
        # shared by every record constructed without placement groups.
        self._placement_groups = [] if placement_groups is None else placement_groups
        self._rm_regh = None
        self._vm_resp = None
        self._vdud_cloud_init = None
        self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(
            dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id, self.vdu_id)

    @asyncio.coroutine
    def vdu_opdata_register(self):
        """Register the console operational-data handler of this VDUR."""
        yield from self._vdur_console_handler.register()

    def _vm_conn_point(self, cp_name):
        """Return the VM response connection point named `cp_name`, or None."""
        if self._vm_resp is None:
            return None
        for conn_point in self._vm_resp.connection_points:
            if conn_point.name == cp_name:
                return conn_point
        return None

    def cp_ip_addr(self, cp_name):
        """IP address of connection point `cp_name`; "0.0.0.0" when unknown."""
        conn_point = self._vm_conn_point(cp_name)
        return "0.0.0.0" if conn_point is None else conn_point.ip_address

    def cp_mac_addr(self, cp_name):
        """MAC address of connection point `cp_name`; all zeros when unknown."""
        conn_point = self._vm_conn_point(cp_name)
        return "00:00:00:00:00:00" if conn_point is None else conn_point.mac_addr

    def cp_id(self, cp_name):
        """Connection point id of `cp_name`; empty string when unknown."""
        conn_point = self._vm_conn_point(cp_name)
        return '' if conn_point is None else conn_point.connection_point_id

    @property
    def vdu_id(self):
        """Id of the VDU descriptor this record was created from."""
        return self._vdud.id

    @property
    def vm_resp(self):
        """Latest resource-info received from the resource manager, or None."""
        return self._vm_resp

    @property
    def name(self):
        """ Return this VDUR's name """
        return self._name

    @property
    def cloud_account_name(self):
        """ Cloud account this VDU should be created in """
        return self._cloud_account_name

    @property
    def image_name(self):
        """ name that should be used to lookup the image on the CMP """
        if 'image' not in self._vdud:
            return None
        return os.path.basename(self._vdud.image)

    @property
    def image_checksum(self):
        """Image checksum from the descriptor, or None when it is not set."""
        if self._vdud.has_field("image_checksum"):
            return self._vdud.image_checksum
        return None

    @property
    def management_ip(self):
        """Public IP of the VM when present, else its management IP.

        None while the VDU is not active.
        """
        if not self.active:
            return None
        if self._vm_resp.has_field('public_ip'):
            return self._vm_resp.public_ip
        return self._vm_resp.management_ip

    @property
    def vm_management_ip(self):
        """Management IP reported for the VM; None while not active."""
        if not self.active:
            return None
        return self._vm_resp.management_ip

    @property
    def operational_status(self):
        """ Operational status of this VDU"""
        return self._OP_STATUS_BY_STATE[self._state.name]

    @staticmethod
    def _merge_allocated_volumes(vdur_dict, op_volumes):
        """Copy volume ids/metadata from the VM response into vdur_dict.

        A response volume is merged only when exactly one descriptor volume
        carries the same name.
        """
        # .get(): the descriptor may define no volumes at all.
        descriptor_volumes = vdur_dict.get('volumes', [])
        for op_volume in op_volumes:
            matches = [vol for vol in descriptor_volumes
                       if vol['name'] == op_volume.name]
            if len(matches) != 1:
                continue
            matches[0]["volume_id"] = op_volume.volume_id
            if op_volume.has_field('custom_meta_data'):
                matches[0]['custom_meta_data'] = [
                    item.as_dict() for item in op_volume.custom_meta_data]

    @staticmethod
    def _boot_data_as_dict(boot_data):
        """Render the supplemental boot data of the VM response as a dict."""
        rendered = {}
        if boot_data.has_field('boot_data_drive'):
            rendered['boot_data_drive'] = boot_data.boot_data_drive
        if boot_data.has_field('custom_meta_data'):
            rendered['custom_meta_data'] = [
                item.as_dict() for item in boot_data.custom_meta_data]
        if boot_data.has_field('config_file'):
            rendered['config_file'] = [
                item.as_dict() for item in boot_data.config_file]
        return rendered

    @property
    def msg(self):
        """Build the VDUR message from the descriptor and the VM response.

        Side effect: every external connection point is pushed to the owning
        VNFR through update_cp() with its current ip/mac/id.
        """
        vdu_fields = ["vm_flavor",
                      "guest_epa",
                      "vswitch_epa",
                      "hypervisor_epa",
                      "host_epa",
                      "volumes",
                      "name"]
        vdu_copy_dict = {k: v for k, v in
                         self._vdud.as_dict().items() if k in vdu_fields}
        vdur_dict = {"id": self._vdur_id,
                     "vdu_id_ref": self._vdud.id,
                     "operational_status": self.operational_status,
                     "operational_status_details": self._state_failed_reason,
                     }
        vm_resp = self._vm_resp
        if vm_resp is not None:
            vdur_dict["vim_id"] = vm_resp.vdu_id
            vdur_dict["flavor_id"] = vm_resp.flavor_id
            if vm_resp.has_field('image_id'):
                vdur_dict["image_id"] = vm_resp.image_id

        if self.management_ip is not None:
            vdur_dict["management_ip"] = self.management_ip

        if self.vm_management_ip is not None:
            vdur_dict["vm_management_ip"] = self.vm_management_ip

        vdur_dict.update(vdu_copy_dict)

        if vm_resp is not None:
            if vm_resp.has_field('volumes'):
                self._merge_allocated_volumes(vdur_dict, vm_resp.volumes)
            if vm_resp.has_field('supplemental_boot_data'):
                vdur_dict['supplemental_boot_data'] = self._boot_data_as_dict(
                    vm_resp.supplemental_boot_data)

        icp_list = []
        ii_list = []

        # Internal interface tuples hold the connection point *id*.
        for intf, cp_id, vlr in self._int_intf:
            cp = self.find_internal_cp_by_cp_id(cp_id)

            icp_list.append({"name": cp.name,
                             "id": cp.id,
                             "type_yang": "VPORT",
                             "ip_address": self.cp_ip_addr(cp.id),
                             "mac_address": self.cp_mac_addr(cp.id)})

            ii_list.append({"name": intf.name,
                            "vdur_internal_connection_point_ref": cp.id,
                            "virtual_interface": {}})

        vdur_dict["internal_connection_point"] = icp_list
        self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
        vdur_dict["internal_interface"] = ii_list

        ei_list = []
        for intf, cp, vlr in self._ext_intf:
            ei_list.append({"name": cp.name,
                            "vnfd_connection_point_ref": cp.name,
                            "virtual_interface": {}})
            self._vnfr.update_cp(cp.name,
                                 self.cp_ip_addr(cp.name),
                                 self.cp_mac_addr(cp.name),
                                 self.cp_id(cp.name))

        vdur_dict["external_interface"] = ei_list

        vdur_dict['placement_groups_info'] = [
            group.as_dict() for group in self._placement_groups]

        return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)

    @property
    def resmgr_path(self):
        """ path for resource-mgr"""
        return ("D,/rw-resource-mgr:resource-mgmt" +
                "/vdu-event" +
                "/vdu-event-data[event-id='{}']".format(self._request_id))

    @property
    def vm_flavor_msg(self):
        """Return a copy of the descriptor's vm_flavor message."""
        flavor = self._vdud.vm_flavor.__class__()
        flavor.copy_from(self._vdud.vm_flavor)

        return flavor

    @property
    def vdud_cloud_init(self):
        """Cloud-init contents for the VDU.

        The result of cloud_init() is cached once it is not None.
        """
        if self._vdud_cloud_init is None:
            self._vdud_cloud_init = self.cloud_init()

        return self._vdud_cloud_init

    def cloud_init(self):
        """Return the cloud-init script of the VDU.

        The inline script is preferred; otherwise the file named by
        cloud_init_file is read from the stored VNFD package. Returns None
        when the descriptor provides neither.

        Raises:
            VirtualDeploymentUnitRecordError - the script file could not be
                                               extracted from the package
        """
        if self._vdud.cloud_init is not None:
            self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
            return self._vdud.cloud_init

        if self._vdud.cloud_init_file is not None:
            # Get cloud-init script contents from the file provided in the cloud_init_file param
            self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
            filename = self._vdud.cloud_init_file
            self._vnfd_package_store.refresh()
            stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
            cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
            try:
                return cloud_init_extractor.read_script(stored_package, filename)
            except rift.package.cloud_init.CloudInitExtractionError as e:
                raise VirtualDeploymentUnitRecordError(e) from e

        self._log.debug("VDU Instantiation: cloud-init script not provided")
        return None

    def process_openstack_placement_group_construct(self, vm_create_msg_dict):
        """Translate the placement groups into OpenStack request fields.

        Fills 'availability_zone', 'server_group' and 'host_aggregate' of
        vm_create_msg_dict from the placement groups of this record.

        Raises:
            VNFMPlacementGroupError - more than one availability zone or
                                      more than one server group requested
        """
        host_aggregates = []
        availability_zones = []
        server_groups = []
        for group in self._placement_groups:
            if group.has_field('host_aggregate'):
                host_aggregates.extend(
                    aggregate.as_dict() for aggregate in group.host_aggregate)
            if group.has_field('availability_zone'):
                availability_zones.append(group.availability_zone.as_dict())
            if group.has_field('server_group'):
                server_groups.append(group.server_group.as_dict())

        if availability_zones:
            if len(availability_zones) > 1:
                self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
                # The message previously had a single placeholder, so the
                # requested zones were silently dropped from the error.
                raise VNFMPlacementGroupError(
                    "Can not launch VDU: {} in multiple availability zones. "
                    "Requested Zones: {}".format(self.name, availability_zones))
            vm_create_msg_dict['availability_zone'] = availability_zones[0]

        if server_groups:
            if len(server_groups) > 1:
                self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
                raise VNFMPlacementGroupError(
                    "Can not launch VDU: {} in multiple Server Groups. "
                    "Requested Groups: {}".format(self.name, server_groups))
            vm_create_msg_dict['server_group'] = server_groups[0]

        if host_aggregates:
            vm_create_msg_dict['host_aggregate'] = host_aggregates

    def process_placement_groups(self, vm_create_msg_dict):
        """Process the placement_groups and fill resource-mgr request

        Raises:
            VNFMPlacementGroupError - the groups span several cloud types
        """
        if not self._placement_groups:
            return

        cloud_types = {group.cloud_type for group in self._placement_groups}
        # Explicit raise instead of assert: asserts vanish under python -O.
        if len(cloud_types) != 1:
            raise VNFMPlacementGroupError(
                "Placement groups of VDU: {} span multiple cloud types: {}".format(
                    self.name, sorted(str(cloud_type) for cloud_type in cloud_types)))
        cloud_type = cloud_types.pop()

        if cloud_type == 'openstack':
            self.process_openstack_placement_group_construct(vm_create_msg_dict)
        else:
            self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)

    def process_custom_bootdata(self, vm_create_msg_dict):
        """Inline the config files of the supplemental boot data.

        For every config_file entry that has both 'source' and 'dest', the
        'source' file name is replaced by the contents of that file read
        from the stored VNFD package.

        Raises:
            VirtualDeploymentUnitRecordError - a source file could not be
                                               extracted from the package
        """
        if 'config_file' not in vm_create_msg_dict['supplemental_boot_data']:
            return

        self._vnfd_package_store.refresh()
        stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
        cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
        for file_item in vm_create_msg_dict['supplemental_boot_data']['config_file']:
            if 'source' not in file_item or 'dest' not in file_item:
                continue
            source = file_item['source']
            # Find source file in scripts dir of VNFD
            self._log.debug("Checking for source config file at %s", source)
            try:
                source_file_str = cloud_init_extractor.read_script(stored_package, source)
            except rift.package.cloud_init.CloudInitExtractionError as e:
                raise VirtualDeploymentUnitRecordError(e) from e
            # Update source file location with file contents
            file_item['source'] = source_file_str

    def resmgr_msg(self, config=None):
        """Build the VDU event (VM create request) for the resource manager.

        Arguments:
            config - user data placed under vdu_init.userdata when not None
        """
        vdu_fields = ["vm_flavor",
                      "guest_epa",
                      "vswitch_epa",
                      "hypervisor_epa",
                      "host_epa",
                      "volumes",
                      "supplemental_boot_data"]

        self._log.debug("Creating params based on VDUD: %s", self._vdud)
        vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}

        vm_create_msg_dict = {
            "name": self.name,
        }

        if self.image_name is not None:
            vm_create_msg_dict["image_name"] = self.image_name

        if self.image_checksum is not None:
            vm_create_msg_dict["image_checksum"] = self.image_checksum

        vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
        if self._vdud.has_field('mgmt_vpci'):
            vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci

        self._log.debug("VDUD: %s", self._vdud)
        if config is not None:
            vm_create_msg_dict['vdu_init'] = {'userdata': config}

        if self._mgmt_network:
            vm_create_msg_dict['mgmt_network'] = self._mgmt_network

        cp_list = []
        for intf, cp, vlr in self._ext_intf:
            cp_info = {"name": cp.name,
                       "virtual_link_id": vlr.network_id,
                       "type_yang": intf.virtual_interface.type_yang,
                       "port_security_enabled": cp.port_security_enabled}

            if (intf.virtual_interface.has_field('vpci') and
                    intf.virtual_interface.vpci is not None):
                cp_info["vpci"] = intf.virtual_interface.vpci

            if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
                cp_info['security_group'] = vlr.ip_profile_params.security_group

            cp_list.append(cp_info)

        # Internal interface tuples hold the connection point *id* (a
        # string), not the descriptor object. Reading port_security_enabled
        # off the id raised AttributeError, so the descriptor is resolved.
        for intf, cp_id, vlr in self._int_intf:
            cp_info = {"name": cp_id,
                       "virtual_link_id": vlr.network_id,
                       "type_yang": intf.virtual_interface.type_yang}
            if (intf.virtual_interface.has_field('vpci') and
                    intf.virtual_interface.vpci is not None):
                cp_info["vpci"] = intf.virtual_interface.vpci
            else:
                int_cp = self.find_internal_cp_by_cp_id(cp_id)
                # NOTE(review): assumes internal connection points may carry
                # port_security_enabled; the key is omitted when they do not.
                port_security = getattr(int_cp, "port_security_enabled", None)
                if port_security is not None:
                    cp_info["port_security_enabled"] = port_security
            cp_list.append(cp_info)

        vm_create_msg_dict["connection_points"] = cp_list
        vm_create_msg_dict.update(vdu_copy_dict)

        self.process_placement_groups(vm_create_msg_dict)
        if 'supplemental_boot_data' in vm_create_msg_dict:
            self.process_custom_bootdata(vm_create_msg_dict)

        msg = RwResourceMgrYang.VDUEventData()
        msg.event_id = self._request_id
        msg.cloud_account = self.cloud_account_name
        msg.request_info.from_dict(vm_create_msg_dict)

        return msg

    @asyncio.coroutine
    def terminate(self, xact):
        """Delete the VM resource and drop the DTS registrations.

        Ignored unless the record is READY or FAILED. A failure while
        deleting the resource is logged and does not stop the cleanup.
        """
        if self._state != VDURecordState.READY and self._state != VDURecordState.FAILED:
            self._log.warning("VDU terminate in not ready state - Ignoring request")
            return

        self._state = VDURecordState.TERMINATING
        if self._vm_resp is not None:
            try:
                with self._dts.transaction() as new_xact:
                    yield from self.delete_resource(new_xact)
            except Exception:
                self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)

        if self._rm_regh is not None:
            self._log.debug("Deregistering resource manager registration handle")
            self._rm_regh.deregister()
            self._rm_regh = None

        if self._vdur_console_handler is not None:
            # The handle is unset when the console handler never registered.
            console_regh = getattr(self._vdur_console_handler, "_regh", None)
            if console_regh is not None:
                self._log.debug("Deregistering vnfr vdur registration handle")
                console_regh.deregister()
                self._vdur_console_handler._regh = None

        self._state = VDURecordState.TERMINATED

    def find_internal_cp_by_cp_id(self, cp_id):
        """Return the descriptor's internal connection point with id `cp_id`.

        Raises:
            VduRecordError - the descriptor has no such connection point
        """
        self._log.debug("find_internal_cp_by_cp_id(%s) called",
                        cp_id)

        for int_cp in self._vdud.internal_connection_point:
            self._log.debug("Checking for int cp %s in internal connection points",
                            int_cp.id)
            if int_cp.id == cp_id:
                return int_cp

        self._log.debug("Failed to find cp %s in internal connection points",
                        cp_id)
        raise VduRecordError(
            "Failed to find cp %s in internal connection points" % cp_id)

    def _checked_resource_info(self, resp):
        """Return resp.resource_info, raising VMResourceError when unusable."""
        if resp is None or not (resp.has_field('resource_info') and
                                resp.resource_info.has_field('resource_state')):
            # Format eagerly: exception arguments are not %-interpolated.
            raise VMResourceError(
                "Did not get a vm resource response (resp: %s)" % (resp,))
        self._log.debug("Got vm request response: %s", resp.resource_info)
        return resp.resource_info

    @asyncio.coroutine
    def create_resource(self, xact, vnfr, config=None):
        """Request the VM for this VDU from the resource manager.

        Resolves the external and internal interfaces of the descriptor to
        (interface, connection point, VLR) tuples, stores them on the record
        and issues the create query.

        Returns the resource_info of the response.

        Raises:
            VduRecordError  - an internal connection point has no VLR
            VMResourceError - no usable response was received
        """
        def find_cp_by_name(cp_name):
            """ Find a connection point by name """
            self._log.debug("find_cp_by_name(%s) called", cp_name)
            for ext_cp in vnfr._cprs:
                self._log.debug("Checking ext cp (%s) called", ext_cp.name)
                if ext_cp.name == cp_name:
                    return ext_cp
            self._log.debug("Failed to find cp %s in external connection points",
                            cp_name)
            return None

        def find_internal_vlr_by_cp_name(cp_name):
            """ Find the VLR corresponding to the connection point name"""
            self._log.debug("find_internal_vlr_by_cp_name(%s) called",
                            cp_name)
            # Raises VduRecordError when the descriptor has no such CP.
            self.find_internal_cp_by_cp_id(cp_name)
            return vnfr.find_vlr_by_cp(cp_name)

        block = xact.block_create()

        self._log.debug("Executing vm request id: %s, action: create",
                        self._request_id)

        # Resolve the networks associated external interfaces
        for ext_intf in self._vdud.external_interface:
            self._log.debug("Resolving external interface name [%s], cp[%s]",
                            ext_intf.name, ext_intf.vnfd_connection_point_ref)
            cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
            if cp is None:
                self._log.debug("Failed to find connection point - %s",
                                ext_intf.vnfd_connection_point_ref)
                continue
            self._log.debug("Connection point name [%s], type[%s]",
                            cp.name, cp.type_yang)

            vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)

            etuple = (ext_intf, cp, vlr)
            self._ext_intf.append(etuple)

            self._log.debug("Created external interface tuple : %s", etuple)

        # Resolve the networks associated internal interfaces
        for intf in self._vdud.internal_interface:
            cp_id = intf.vdu_internal_connection_point_ref
            self._log.debug("Resolving internal interface name [%s], cp[%s]",
                            intf.name, cp_id)

            try:
                vlr = find_internal_vlr_by_cp_name(cp_id)
            except Exception as e:
                self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
                msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
                raise VduRecordError(msg) from e

            ituple = (intf, cp_id, vlr)
            self._int_intf.append(ituple)

            self._log.debug("Created internal interface tuple : %s", ituple)

        resmgr_path = self.resmgr_path
        resmgr_msg = self.resmgr_msg(config)

        self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
        block.add_query_create(resmgr_path, resmgr_msg)

        res_iter = yield from block.execute(now=True)

        resp = None
        for i in res_iter:
            r = yield from i
            resp = r.result

        return self._checked_resource_info(resp)

    @asyncio.coroutine
    def delete_resource(self, xact):
        """Issue the delete query for this VDU's resource-manager request."""
        block = xact.block_create()

        self._log.debug("Executing vm request id: %s, action: delete",
                        self._request_id)

        block.add_query_delete(self.resmgr_path)

        yield from block.execute(flags=0, now=True)

    @asyncio.coroutine
    def read_resource(self, xact):
        """Read this VDU's request back from the resource manager.

        Returns the resource_info of the response.

        Raises:
            VMResourceError - no usable response was received
        """
        block = xact.block_create()

        self._log.debug("Executing vm request id: %s, action: read",
                        self._request_id)

        block.add_query_read(self.resmgr_path)

        res_iter = yield from block.execute(flags=0, now=True)

        # Stays None when the query yields no result at all.
        resp = None
        for i in res_iter:
            r = yield from i
            resp = r.result

        return self._checked_resource_info(resp)

    @asyncio.coroutine
    def start_component(self):
        """Start the VCS component of this VDU at the VM's management IP."""
        self._log.debug("Starting component %s for vdud %s vdur %s",
                        self._vdud.vcs_component_ref,
                        self._vdud,
                        self._vdur_id)
        yield from self._vnfr.start_component(self._vdud.vcs_component_ref,
                                              self.vm_resp.management_ip)

    @property
    def active(self):
        """ Is this VDU active """
        return self._state is VDURecordState.READY

    @asyncio.coroutine
    def instantiation_failed(self, failed_reason=None):
        """Mark the VDU FAILED, record the reason and notify the VNFR."""
        self._log.debug("VDU %s instantiation failed ", self._vdur_id)
        self._state = VDURecordState.FAILED
        self._state_failed_reason = failed_reason
        yield from self._vnfr.instantiation_failed(failed_reason)

    @asyncio.coroutine
    def vdu_is_active(self):
        """Move the VDU to READY.

        Starts the VCS component when the descriptor references one and
        notifies the VNFR once all of its VDUs are active. Does nothing when
        the VDU is already active.
        """
        if self.active:
            self._log.warning("VDU %s was already marked as active", self._vdur_id)
            return

        self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)

        if self._vdud.vcs_component_ref is not None:
            yield from self.start_component()

        self._state = VDURecordState.READY

        if self._vnfr.all_vdus_active():
            self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
            yield from self._vnfr.is_ready()

    @asyncio.coroutine
    def instantiate(self, xact, vnfr, config=None):
        """Instantiate this VDU.

        Subscribes to the resource-info of this VDU's request, then asks the
        resource manager for the VM. Any failure is logged and reported
        through instantiation_failed() rather than raised.
        """
        self._state = VDURecordState.INSTANTIATING

        @asyncio.coroutine
        def on_prepare(xact_info, query_action, ks_path, msg):
            """Handle a resource-info notification for this VDU."""
            self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
                            query_action,
                            ks_path,
                            msg)

            if (query_action == rwdts.QueryAction.UPDATE or
                    query_action == rwdts.QueryAction.CREATE):
                self._vm_resp = msg

                if msg.resource_state == "active":
                    # Move this VDU to ready state
                    yield from self.vdu_is_active()
                elif msg.resource_state == "failed":
                    yield from self.instantiation_failed(msg.resource_errors)
            elif query_action == rwdts.QueryAction.DELETE:
                self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
            else:
                raise NotImplementedError(
                    "%s action on VirtualDeployementUnitRecord not supported"
                    % (query_action,))

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        try:
            reg_event = asyncio.Event(loop=self._loop)

            @asyncio.coroutine
            def on_ready(regh, status):
                reg_event.set()

            handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
            self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
                                                          flags=rwdts.Flag.SUBSCRIBER,
                                                          handler=handler)
            # The registration must be ready before the request goes out,
            # otherwise a notification could be missed.
            yield from reg_event.wait()

            vm_resp = yield from self.create_resource(xact, vnfr, config)
            self._vm_resp = vm_resp

            self._state = VDURecordState.RESOURCE_ALLOC_PENDING
            self._log.debug("Requested VM from resource manager response %s",
                            vm_resp)
            if vm_resp.resource_state == "active":
                self._log.debug("Resourcemgr responded wih an active vm resp %s",
                                vm_resp)
                yield from self.vdu_is_active()
                self._state = VDURecordState.READY
            elif (vm_resp.resource_state == "pending" or
                  vm_resp.resource_state == "inactive"):
                # The subscription registered above delivers the final state.
                self._log.debug("Resourcemgr responded wih a pending vm resp %s",
                                vm_resp)
            else:
                self._log.debug("Resourcemgr responded wih an error vm resp %s",
                                vm_resp)
                raise VirtualDeploymentUnitRecordError(
                    "Failed VDUR instantiation %s " % vm_resp)

        except Exception as e:
            # One log call carries both the message and the traceback.
            self._log.exception("Instantiation of VDU record failed: %s", str(e))
            yield from self.instantiation_failed(str(e))
973 self._state = VDURecordState.FAILED
974 yield from self.instantiation_failed(str(e))
975
976
@enum.unique
class VlRecordState(enum.Enum):
    """Lifecycle states of a virtual link record."""
    INIT = 101                    # state assigned when a record is constructed
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    TERMINATE_PENDING = 104
    TERMINATED = 105
    FAILED = 106
985
986
class InternalVirtualLinkRecord(object):
    """ Internal Virtual Link record

    Builds a VLR request from an internal VLD of a VNF descriptor and
    creates / deletes that VLR through DTS.
    """
    def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._ivld_msg = ivld_msg
        self._vnfr_name = vnfr_name
        self._cloud_account_name = cloud_account_name

        # The request is built eagerly so that vlr_id is available before
        # the VL is instantiated.
        self._vlr_req = self.create_vlr()
        self._vlr = None
        self._state = VlRecordState.INIT

    @property
    def vlr_id(self):
        """ Id of the VLR request created for this VL """
        return self._vlr_req.id

    @property
    def name(self):
        """ Name of this VL (<vnfr name>.<internal vld name>) """
        return self._vnfr_name + "." + self._ivld_msg.name

    @property
    def network_id(self):
        """ Network id of the instantiated VLR, None if there is no VLR yet """
        return self._vlr.network_id if self._vlr else None

    def vlr_path(self):
        """ VLR path for this VLR instance"""
        return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self.vlr_id)

    def create_vlr(self):
        """ Create the VLR record which will be instantiated """

        vld_fields = ["short_name",
                      "vendor",
                      "description",
                      "version",
                      "type_yang",
                      "provider_network"]

        vld_copy_dict = {k: v for k, v in self._ivld_msg.as_dict().items() if k in vld_fields}

        vlr_dict = {"id": str(uuid.uuid4()),
                    "name": self.name,
                    "cloud_account": self._cloud_account_name,
                    }
        vlr_dict.update(vld_copy_dict)

        vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
        return vlr

    @asyncio.coroutine
    def instantiate(self, xact, restart_mode=False):
        """ Instantiate VL

        Arguments:
            xact - transaction passed in by the caller (the VLR create runs
                   in its own DTS transaction)
            restart_mode - when True, reuse the VLR if one is already
                   published at vlr_path() and only create it when missing

        Raises:
            VnfrInstantiationFailed if the VLR comes back in 'failed' state
            or no VLR is returned at all. Exceptions raised while executing
            the create query are re-raised after marking the record FAILED.
        """

        @asyncio.coroutine
        def instantiate_vlr():
            """ Instantiate VLR"""
            self._log.debug("Create VL with xpath %s and vlr %s",
                            self.vlr_path(), self._vlr_req)

            with self._dts.transaction(flags=0) as xact:
                block = xact.block_create()
                block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
                self._log.debug("Executing VL create path:%s msg:%s",
                                self.vlr_path(), self._vlr_req)

                res_iter = None
                try:
                    res_iter = yield from block.execute()
                except Exception:
                    self._state = VlRecordState.FAILED
                    self._log.exception("Caught exception while instantial VL")
                    raise

                for ent in res_iter:
                    res = yield from ent
                    self._vlr = res.result

            # An empty response would otherwise surface as an AttributeError
            # on None below; report it as an instantiation failure instead.
            if self._vlr is None:
                self._log.debug("VL creation returned no VLR for vlr id %s", self.vlr_id)
                self._state = VlRecordState.FAILED
                raise VnfrInstantiationFailed(
                    "instantiation due to VL failure %s (no VLR returned)" % (self.vlr_id))

            if self._vlr.operational_status == 'failed':
                self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
                self._state = VlRecordState.FAILED
                raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))

            self._log.info("Created VL with xpath %s and vlr %s",
                           self.vlr_path(), self._vlr)

        @asyncio.coroutine
        def get_vlr():
            """ Read the VLR at vlr_path(); returns None when it is not found """
            res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
            vlr = None
            for ent in res_iter:
                res = yield from ent
                vlr = res.result

            if vlr is None:
                # Returning None (rather than raising) lets the restart path
                # below fall back to creating the VLR.
                self._log.warning("Failed to get VLR for path %s" % self.vlr_path())
            return vlr

        self._state = VlRecordState.INSTANTIATION_PENDING

        if restart_mode:
            vl = yield from get_vlr()
            if vl is None:
                yield from instantiate_vlr()
            else:
                # Keep the recovered VLR so that network_id is usable.
                self._vlr = vl
        else:
            yield from instantiate_vlr()

        self._state = VlRecordState.ACTIVE

    def vlr_in_vns(self):
        """ Is there a VLR record in VNS """
        return self._state in (VlRecordState.ACTIVE,
                               VlRecordState.INSTANTIATION_PENDING,
                               VlRecordState.FAILED)

    @asyncio.coroutine
    def terminate(self, xact):
        """Terminate this VL

        The delete query is added to a block created on the supplied
        transaction. The request is ignored when vlr_in_vns() is False.
        """
        if not self.vlr_in_vns():
            self._log.debug("Ignoring terminate request for id %s in state %s",
                            self.vlr_id, self._state)
            return

        self._log.debug("Terminating VL with path %s", self.vlr_path())
        self._state = VlRecordState.TERMINATE_PENDING
        block = xact.block_create()
        block.add_query_delete(self.vlr_path())
        yield from block.execute(flags=0, now=True)
        self._state = VlRecordState.TERMINATED
        self._log.debug("Terminated VL with path %s", self.vlr_path())
1127
1128
class VirtualNetworkFunctionRecord(object):
    """ Virtual Network Function Record

    Runtime record of one VNF instance. It tracks the internal virtual
    links, the VDU records, the connection point records and the inventory
    components, and builds the VNFR message that is published.
    """
    def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._cluster_name = cluster_name
        self._vnfr_msg = vnfr_msg
        self._vnfr_id = vnfr_msg.id
        self._vnfd_id = vnfr_msg.vnfd.id
        self._vnfm = vnfm
        self._vcs_handler = vcs_handler
        self._vnfr = vnfr_msg
        self._mgmt_network = mgmt_network

        self._vnfd = vnfr_msg.vnfd
        self._state = VirtualNetworkFunctionRecordState.INIT
        self._state_failed_reason = None
        self._ext_vlrs = {}  # The list of external virtual links
        self._vlrs = []      # The list of internal virtual links
        self._vdus = []      # The list of vdu
        self._vlr_by_cp = {}
        self._cprs = []
        self._inventory = {}
        self._create_time = int(time.time())
        self._vnf_mon = None
        self._config_status = vnfr_msg.config_status
        self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
        self._rw_vnfd = None
        self._vnfd_ref_count = 0

    def _get_vdur_from_vdu_id(self, vdu_id):
        """ Return the VDU record whose vdu_id matches

        Raises:
            VDURecordNotFound if no VDU record has the given id
        """
        self._log.debug("Finding vdur for vdu_id %s", vdu_id)
        self._log.debug("Searching through vdus: %s", self._vdus)
        for vdu in self._vdus:
            self._log.debug("vdu_id: %s", vdu.vdu_id)
            if vdu.vdu_id == vdu_id:
                return vdu

        # Exceptions do not interpolate logging-style arguments; format the
        # message explicitly.
        raise VDURecordNotFound("Could not find vdu record from id: %s" % vdu_id)

    @property
    def operational_status(self):
        """ Operational status of this VNFR """
        op_status_map = {"INIT": "init",
                         "VL_INIT_PHASE": "vl_init_phase",
                         "VM_INIT_PHASE": "vm_init_phase",
                         "READY": "running",
                         "TERMINATE": "terminate",
                         "VL_TERMINATE_PHASE": "vl_terminate_phase",
                         "VDU_TERMINATE_PHASE": "vm_terminate_phase",
                         "TERMINATED": "terminated",
                         "FAILED": "failed", }
        return op_status_map[self._state.name]

    @staticmethod
    def vnfd_xpath(vnfd_id):
        """ VNFD xpath associated with this VNFR """
        return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id)

    @property
    def vnfd_ref_count(self):
        """ Returns the VNFD reference count associated with this VNFR """
        return self._vnfd_ref_count

    def vnfd_in_use(self):
        """ Returns whether vnfd is in use or not """
        return self._vnfd_ref_count > 0

    def vnfd_ref(self):
        """ Take a reference on this object """
        self._vnfd_ref_count += 1
        return self._vnfd_ref_count

    def vnfd_unref(self):
        """ Release reference on this object

        Raises:
            VnfRecordError if there is no outstanding reference
        """
        if self._vnfd_ref_count < 1:
            msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
                   (self.vnfd.id, self._vnfd_ref_count))
            self._log.critical(msg)
            raise VnfRecordError(msg)
        self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
                        self.vnfd.id, self._vnfd_ref_count)
        self._vnfd_ref_count -= 1
        return self._vnfd_ref_count

    @property
    def vnfd(self):
        """ VNFD for this VNFR """
        return self._vnfd

    @property
    def vnf_name(self):
        """ VNFD name associated with this VNFR """
        return self.vnfd.name

    @property
    def name(self):
        """ Name of this VNF in the record """
        return self._vnfr.name

    @property
    def cloud_account_name(self):
        """ Name of the cloud account this VNFR is instantiated in """
        return self._vnfr.cloud_account

    @property
    def vnfd_id(self):
        """ VNFD Id associated with this VNFR """
        return self.vnfd.id

    @property
    def vnfr_id(self):
        """ VNFR Id associated with this VNFR """
        return self._vnfr_id

    @property
    def member_vnf_index(self):
        """ Member VNF index associated with this VNFR """
        return self._vnfr.member_vnf_index_ref

    @property
    def config_status(self):
        """ Config agent status for this VNFR """
        return self._config_status

    def component_by_name(self, component_name):
        """ Find a component by name in the inventory list"""
        mangled_name = VcsComponent.mangle_name(component_name,
                                                self.vnf_name,
                                                self.vnfd_id)
        return self._inventory[mangled_name]

    @asyncio.coroutine
    def get_nsr_config(self):
        """ Return the NSR config entry this VNFR belongs to, or None """
        ### Need access to NS instance configuration for runtime resolution.
        ### This shall be replaced when deployment flavors are implemented
        xpath = "C,/nsr:ns-instance-config"
        results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)

        for result in results:
            entry = yield from result
            ns_instance_config = entry.result
            for nsr in ns_instance_config.nsr:
                if nsr.id == self._vnfr_msg.nsr_id_ref:
                    return nsr
        return None

    @asyncio.coroutine
    def start_component(self, component_name, ip_addr):
        """ Start a component in the VNFR by name """
        comp = self.component_by_name(component_name)
        yield from comp.start(None, None, ip_addr)

    def cp_ip_addr(self, cp_name):
        """ Get ip address for connection point

        Returns "0.0.0.0" when the connection point is unknown or has no
        address yet.
        """
        self._log.debug("cp_ip_addr()")
        for cp in self._cprs:
            if cp.name == cp_name and cp.ip_address is not None:
                return cp.ip_address
        return "0.0.0.0"

    def mgmt_intf_info(self):
        """ Get Management interface info for this VNFR

        Returns:
            (ip_addr, port); ip_addr is None when the management VDU record
            cannot be found.
        """
        mgmt_intf_desc = self.vnfd.mgmt_interface
        ip_addr = None
        if mgmt_intf_desc.has_field("cp"):
            ip_addr = self.cp_ip_addr(mgmt_intf_desc.cp)
        elif mgmt_intf_desc.has_field("vdu_id"):
            try:
                vdur = self._get_vdur_from_vdu_id(mgmt_intf_desc.vdu_id)
                ip_addr = vdur.management_ip
            except VDURecordNotFound:
                self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
                ip_addr = None
        else:
            ip_addr = mgmt_intf_desc.ip_address
        port = mgmt_intf_desc.port

        return ip_addr, port

    @property
    def msg(self):
        """ Message associated with this VNFR

        A new message is built on every access from the current state.
        """
        vnfd_fields = ["short_name", "vendor", "description", "version"]
        vnfd_copy_dict = {k: v for k, v in self.vnfd.as_dict().items() if k in vnfd_fields}

        mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
        ip_address, port = self.mgmt_intf_info()

        if ip_address is not None:
            mgmt_intf.ip_address = ip_address
        if port is not None:
            mgmt_intf.port = port

        vnfr_dict = {"id": self._vnfr_id,
                     "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
                     "name": self.name,
                     "member_vnf_index_ref": self.member_vnf_index,
                     "operational_status": self.operational_status,
                     "operational_status_details": self._state_failed_reason,
                     "cloud_account": self.cloud_account_name,
                     "config_status": self._config_status
                     }

        vnfr_dict.update(vnfd_copy_dict)

        vnfr_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
        vnfr_msg.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())

        vnfr_msg.create_time = self._create_time
        vnfr_msg.uptime = int(time.time()) - self._create_time
        vnfr_msg.mgmt_interface = mgmt_intf

        # Add all the VLRs to VNFR
        for vlr in self._vlrs:
            ivlr = vnfr_msg.internal_vlr.add()
            ivlr.vlr_ref = vlr.vlr_id

        # Add all the VDURs to VDUR
        if self._vdus is not None:
            for vdu in self._vdus:
                vdur = vnfr_msg.vdur.add()
                vdur.from_dict(vdu.msg.as_dict())

        if self.vnfd.mgmt_interface.has_field('dashboard_params'):
            vnfr_msg.dashboard_url = self.dashboard_url

        for cpr in self._cprs:
            new_cp = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
            vnfr_msg.connection_point.append(new_cp)

        if self._vnf_mon is not None:
            for monp in self._vnf_mon.msg:
                vnfr_msg.monitoring_param.append(
                    VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))

        if self._vnfr.vnf_configuration is not None:
            vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
            # Fill in the management address only when the configuration
            # does not already carry one.
            if (ip_address is not None and
                    vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None):
                vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address

        for group in self._vnfr_msg.placement_groups_info:
            group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
            group_info.from_dict(group.as_dict())
            vnfr_msg.placement_groups_info.append(group_info)

        return vnfr_msg

    @property
    def dashboard_url(self):
        """ Dashboard URL built from the VNFD dashboard params and the mgmt ip """
        ip, _ = self.mgmt_intf_info()
        dashboard_params = self.vnfd.mgmt_interface.dashboard_params
        protocol = 'http'
        http_port = 80
        if dashboard_params.has_field('https'):
            if dashboard_params.https is True:
                protocol = 'https'
                http_port = 443
        # An explicit port overrides the protocol default.
        if dashboard_params.has_field('port'):
            http_port = dashboard_params.port

        url = "{protocol}://{ip_address}:{port}/{path}".format(
            protocol=protocol,
            ip_address=ip,
            port=http_port,
            path=dashboard_params.path.lstrip("/"),
        )

        return url

    @property
    def xpath(self):
        """ path for this VNFR """
        return("D,/vnfr:vnfr-catalog"
               "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id))

    @asyncio.coroutine
    def publish(self, xact):
        """ publish this VNFR """
        # self.msg builds a fresh message on every access (create_time
        # included), so build it once and publish that same object.
        vnfr = self.msg
        self._log.debug("Publishing VNFR path = [%s], record = [%s]",
                        self.xpath, vnfr)
        yield from self._vnfm.publish_vnfr(xact, self.xpath, vnfr)
        self._log.debug("Published VNFR path = [%s], record = [%s]",
                        self.xpath, vnfr)

    @asyncio.coroutine
    def create_vls(self):
        """ Publish The VLs associated with this VNF

        Raises:
            InternalVirtualLinkRecordError if an internal connection point
            is referenced by more than one internal VLD
        """
        self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
                        self.vnfd_id)
        for ivld_msg in self.vnfd.internal_vld:
            self._log.debug("Creating internal vld:"
                            " %s, int_cp_ref = %s",
                            ivld_msg, ivld_msg.internal_connection_point
                            )
            vlr = InternalVirtualLinkRecord(dts=self._dts,
                                            log=self._log,
                                            loop=self._loop,
                                            ivld_msg=ivld_msg,
                                            vnfr_name=self.name,
                                            cloud_account_name=self.cloud_account_name
                                            )
            self._vlrs.append(vlr)

            for int_cp in ivld_msg.internal_connection_point:
                if int_cp.id_ref in self._vlr_by_cp:
                    msg = ("Connection point %s already "
                           " bound %s" % (int_cp.id_ref, self._vlr_by_cp[int_cp.id_ref]))
                    raise InternalVirtualLinkRecordError(msg)
                self._log.debug("Setting vlr %s to internal cp = %s",
                                vlr, int_cp.id_ref)
                self._vlr_by_cp[int_cp.id_ref] = vlr

    @asyncio.coroutine
    def instantiate_vls(self, xact, restart_mode=False):
        """ Instantiate the VLs associated with this VNF """
        self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
                        self.vnfd_id)

        for vlr in self._vlrs:
            self._log.debug("Instantiating VLR %s", vlr)
            yield from vlr.instantiate(xact, restart_mode)

    def find_vlr_by_cp(self, cp_name):
        """ Find the VLR associated with the cp name """
        return self._vlr_by_cp[cp_name]

    def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
        """
        Returns the cloud specific construct for placement group
        Arguments:
            input_group: VNFD PlacementGroup
            nsr_config: Configuration for VNFDGroup MAP in the NSR config
        Returns:
            The resolved placement group info, or None when nsr_config is
            None or has no map entry for this group and VNFD.
        """
        # get_nsr_config() returns None when no matching NSR is found.
        if nsr_config is None:
            return None

        copy_dict = ['name', 'requirement', 'strategy']
        for group_info in nsr_config.vnfd_placement_group_maps:
            if group_info.placement_group_ref == input_group.name and \
               group_info.vnfd_id_ref == self.vnfd_id:
                group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
                group_dict = {k: v for k, v in
                              group_info.as_dict().items()
                              if (k != 'placement_group_ref' and k != 'vnfd_id_ref')}
                for param in copy_dict:
                    group_dict.update({param: getattr(input_group, param)})
                group.from_dict(group_dict)
                return group
        return None

    @asyncio.coroutine
    def get_vdu_placement_groups(self, vdu):
        """ Collect the VNF level and VDU level placement groups for a VDU """
        placement_groups = []
        ### Step-1: Get VNF level placement groups
        for group in self._vnfr_msg.placement_groups_info:
            #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
            #group_info.from_dict(group.as_dict())
            placement_groups.append(group)

        ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
        nsr_config = yield from self.get_nsr_config()

        ### Step-3: Get VDU level placement groups
        for group in self.vnfd.placement_groups:
            for member_vdu in group.member_vdus:
                if member_vdu.member_vdu_ref == vdu.id:
                    group_info = self.resolve_placement_group_cloud_construct(group,
                                                                              nsr_config)
                    if group_info is None:
                        self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
                        ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
                    else:
                        self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
                                       str(group_info),
                                       vdu.name,
                                       self.vnf_name,
                                       self.member_vnf_index)
                        placement_groups.append(group_info)

        return placement_groups

    @asyncio.coroutine
    def create_vdus(self, vnfr, restart_mode=False):
        """ Create the VDUs associated with this VNF """

        def get_vdur_id(vdud):
            """Get the corresponding VDUR's id for the VDUD. This is useful in
            case of a restart.

            In restart mode we check for exiting VDUR's ID and use them, if
            available. This way we don't end up creating duplicate VDURs
            """
            vdur_id = None

            if restart_mode and vdud is not None:
                try:
                    vdur = [vdur.id for vdur in vnfr._vnfr.vdur if vdur.vdu_id_ref == vdud.id]
                    vdur_id = vdur[0]
                except IndexError:
                    self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))

            return vdur_id

        self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
        for vdu in self._rw_vnfd.vdu:
            self._log.debug("Creating vdu: %s", vdu)
            vdur_id = get_vdur_id(vdu)

            placement_groups = yield from self.get_vdu_placement_groups(vdu)
            self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
                           vdu.name,
                           self.vnf_name,
                           self.member_vnf_index,
                           [group.name for group in placement_groups])

            vdur = VirtualDeploymentUnitRecord(
                dts=self._dts,
                log=self._log,
                loop=self._loop,
                vdud=vdu,
                vnfr=vnfr,
                mgmt_intf=self.has_mgmt_interface(vdu),
                mgmt_network=self._mgmt_network,
                cloud_account_name=self.cloud_account_name,
                vnfd_package_store=self._vnfd_package_store,
                vdur_id=vdur_id,
                placement_groups=placement_groups,
                )
            yield from vdur.vdu_opdata_register()

            self._vdus.append(vdur)

    @asyncio.coroutine
    def instantiate_vdus(self, xact, vnfr):
        """ Instantiate the VDUs associated with this VNF

        One monitor task and one instantiate task are scheduled per VDU on
        the event loop; this coroutine does not wait for them to finish.
        """
        self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)

        lookup = {vdu.vdu_id: vdu for vdu in self._vdus}

        # Identify any dependencies among the VDUs
        dependencies = collections.defaultdict(list)
        vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")

        for vdu in self._vdus:
            if vdu.vdud_cloud_init is not None:
                for vdu_id in vdu_id_pattern.findall(vdu.vdud_cloud_init):
                    if vdu_id != vdu.vdu_id:
                        # This means that vdu.vdu_id depends upon vdu_id,
                        # i.e. vdu_id must be instantiated before
                        # vdu.vdu_id.
                        dependencies[vdu.vdu_id].append(lookup[vdu_id])

        # Define the terminal states of VDU instantiation
        terminal = (
            VDURecordState.READY,
            VDURecordState.TERMINATED,
            VDURecordState.FAILED,
            )

        datastore = VdurDatastore()
        processed = set()

        @asyncio.coroutine
        def instantiate_monitor(vdu):
            """Monitor the state of the VDU during instantiation

            Arguments:
                vdu - a VirtualDeploymentUnitRecord

            """
            # wait for the VDUR to enter a terminal state
            while vdu._state not in terminal:
                yield from asyncio.sleep(1, loop=self._loop)

            # update the datastore
            datastore.update(vdu)

            # add the VDU to the set of processed VDUs
            processed.add(vdu.vdu_id)

        @asyncio.coroutine
        def instantiate(vdu):
            """Instantiate the specified VDU

            Arguments:
                vdu - a VirtualDeploymentUnitRecord

            Raises:
                if the VDU, or any of the VDUs this VDU depends upon, are
                terminated or fail to instantiate properly, a
                VirtualDeploymentUnitRecordError is raised.
                ValueError is raised when a cloud-init variable cannot be
                resolved or is not recognized.

            """
            for dependency in dependencies[vdu.vdu_id]:
                self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))

                while dependency.vdu_id not in processed:
                    yield from asyncio.sleep(1, loop=self._loop)

                if not dependency.active:
                    raise VirtualDeploymentUnitRecordError(
                        "VDU {} depends on VDU {} which is not active".format(
                            vdu.vdu_id, dependency.vdu_id))

            self._log.debug('instantiating {}'.format(vdu.vdu_id))

            # Populate the datastore with the current values of the VDU
            datastore.add(vdu)

            # Substitute any variables contained in the cloud config script
            config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""

            # With one capture group, the odd indices of the split result
            # are the variable names found between "{{ " and " }}".
            parts = re.split(r"\{\{ ([^\}]+) \}\}", config)
            if len(parts) > 1:

                # Extract the variable names
                variables = [variable.lstrip('{').rstrip('}').strip()
                             for variable in parts[1::2]]

                # Iterate of the variables and substitute values from the
                # datastore.
                for variable in variables:

                    # Handle a reference to a VDU by ID
                    if variable.startswith('vdu['):
                        value = datastore.get(variable)
                        if value is None:
                            msg = "Unable to find a substitute for {} in {} cloud-init script"
                            raise ValueError(msg.format(variable, vdu.vdu_id))

                        config = config.replace("{{ %s }}" % variable, value)
                        continue

                    # Handle a reference to the current VDU
                    if variable.startswith('vdu'):
                        value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
                        # Same check as for references by id; str.replace()
                        # would raise a TypeError on None.
                        if value is None:
                            msg = "Unable to find a substitute for {} in {} cloud-init script"
                            raise ValueError(msg.format(variable, vdu.vdu_id))

                        config = config.replace("{{ %s }}" % variable, value)
                        continue

                    # Handle unrecognized variables
                    msg = 'unrecognized cloud-config variable: {}'
                    raise ValueError(msg.format(variable))

            # Instantiate the VDU
            with self._dts.transaction() as xact:
                self._log.debug("Instantiating vdu: %s", vdu)
                yield from vdu.instantiate(xact, vnfr, config=config)
                if self._state == VirtualNetworkFunctionRecordState.FAILED:
                    self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
                                    self.vnfr_id, vdu)

        # First create a set of tasks to monitor the state of the VDUs and
        # report when they have entered a terminal state
        for vdu in self._vdus:
            self._loop.create_task(instantiate_monitor(vdu))

        for vdu in self._vdus:
            self._loop.create_task(instantiate(vdu))

    def has_mgmt_interface(self, vdu):
        """ Is this VDU the one named by the VNFD management interface? """
        # ## TODO: Support additional mgmt_interface type options
        return self.vnfd.mgmt_interface.vdu_id == vdu.id

    def vlr_xpath(self, vlr_id):
        """ vlr xpath """
        return(
            "D,/vlr:vlr-catalog/"
            "vlr:vlr[vlr:id = '{}']".format(vlr_id))

    def ext_vlr_by_id(self, vlr_id):
        """ find ext vlr by id """
        return self._ext_vlrs[vlr_id]

    @asyncio.coroutine
    def publish_inventory(self, xact):
        """ Publish the inventory associated with this VNF

        Publishing stops at the first component that is already present in
        the inventory.
        """
        self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)

        for component in self._rw_vnfd.component:
            self._log.debug("Creating inventory component %s", component)
            mangled_name = VcsComponent.mangle_name(component.component_name,
                                                    self.vnf_name,
                                                    self.vnfd_id
                                                    )
            comp = VcsComponent(dts=self._dts,
                                log=self._log,
                                loop=self._loop,
                                cluster_name=self._cluster_name,
                                vcs_handler=self._vcs_handler,
                                component=component,
                                mangled_name=mangled_name,
                                )
            if comp.name in self._inventory:
                self._log.debug("Duplicate entries in inventory  %s for vnfr %s",
                                component, self._vnfd_id)
                return
            self._log.debug("Adding component %s for vnrf %s",
                            comp.name, self._vnfr_id)
            self._inventory[comp.name] = comp
            yield from comp.publish(xact)

    def all_vdus_active(self):
        """ Are all VDUS in this VNFR active? """
        for vdu in self._vdus:
            if not vdu.active:
                return False

        self._log.debug("Inside all_vdus_active. Returning True")
        return True

    @asyncio.coroutine
    def instantiation_failed(self, failed_reason=None):
        """ VNFR instantiation failed """
        self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
        self.set_state(VirtualNetworkFunctionRecordState.FAILED)
        self._state_failed_reason = failed_reason

        # Update the VNFR with the changed status
        yield from self.publish(None)

    @asyncio.coroutine
    def is_ready(self):
        """ This VNF is ready"""
        self._log.debug("VNFR id %s is ready", self.vnfr_id)

        # A VNFR that has already failed stays FAILED.
        if self._state != VirtualNetworkFunctionRecordState.FAILED:
            self.set_state(VirtualNetworkFunctionRecordState.READY)

        else:
            self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)

        # Update the VNFR with the changed status
        yield from self.publish(None)

    def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
        """Updated the connection point with ip address

        Raises:
            VirtualDeploymentUnitRecordError if no connection point record
            with the given name exists
        """
        for cp in self._cprs:
            if cp.name == cp_name:
                self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
                                cp_name, cp, ip_address, cp_id)
                cp.ip_address = ip_address
                cp.mac_address = mac_addr
                cp.connection_point_id = cp_id
                return

        # Report the requested name; the loop variable is unbound when there
        # are no connection point records and otherwise names the last one.
        err = "No connection point %s found in VNFR id %s" % (cp_name, self._vnfr_id)
        self._log.debug(err)
        raise VirtualDeploymentUnitRecordError(err)

    def set_state(self, state):
        """ Set state for this VNFR"""
        self._state = state

    @asyncio.coroutine
    def instantiate(self, xact, restart_mode=False):
        """ instantiate this VNF

        Fetches the external VLRs, publishes the inventory, creates and
        instantiates the internal VLs, creates the VDU records and then
        schedules VDU instantiation and the uptime update as tasks. A VL
        instantiation failure marks the VNFR failed and returns.
        """
        self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
        self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)

        @asyncio.coroutine
        def fetch_vlrs():
            """ Fetch VLRs """
            # Iterate over all the connection points in VNFR and fetch the
            # associated VLRs

            def cpr_from_cp(cp):
                """ Creates a record level connection point from the desciptor cp"""
                cp_fields = ["name", "image", "vm-flavor", "port_security_enabled"]
                cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
                cpr_dict = {}
                cpr_dict.update(cp_copy_dict)
                return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)

            self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
                            self._vnfr_id, self._vnfr.connection_point)

            for cp in self._vnfr.connection_point:
                cpr = cpr_from_cp(cp)
                self._cprs.append(cpr)
                self._log.debug("Adding Connection point record  %s ", cp)

                vlr_path = self.vlr_xpath(cp.vlr_ref)
                self._log.debug("Fetching VLR with path = %s", vlr_path)
                res_iter = yield from self._dts.query_read(vlr_path,
                                                           rwdts.XactFlag.MERGE)
                for i in res_iter:
                    r = yield from i
                    d = r.result
                    self._ext_vlrs[cp.vlr_ref] = d
                    cpr.vlr_ref = cp.vlr_ref
                    self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)

        # Increase the VNFD reference count
        self.vnfd_ref()

        assert self.vnfd

        # Fetch External VLRs
        self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
        yield from fetch_vlrs()

        # Publish inventory
        self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
        yield from self.publish_inventory(xact)

        # Create the internal VL records
        self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
        yield from self.create_vls()

        # publish the VNFR
        self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
        yield from self.publish(xact)

        # instantiate VLs
        self._log.debug("VNFR-ID %s: Instantiate VLs", self._vnfr_id)
        try:
            yield from self.instantiate_vls(xact, restart_mode)
        except Exception as e:
            self._log.exception("VL instantiation failed (%s)", str(e))
            yield from self.instantiation_failed(str(e))
            return

        self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)

        # create VDU records
        self._log.debug("VNFR-ID %s: Create VDUs", self._vnfr_id)
        yield from self.create_vdus(self, restart_mode)

        # publish the VNFR
        self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
        yield from self.publish(xact)

        # instantiate VDUs
        # ToDo: Check if this should be prevented during restart
        self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
        _ = self._loop.create_task(self.instantiate_vdus(xact, self))

        # publish the VNFR
        self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
        yield from self.publish(xact)

        self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)

        # create task updating uptime for this vnfr
        self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
        self._loop.create_task(self.vnfr_uptime_update(xact))

    @asyncio.coroutine
    def terminate(self, xact):
        """ Terminate this virtual network function

        Stops monitoring, then terminates the internal VLs followed by the
        VDUs, updating the VNFR state at each phase.
        """

        self._log.debug("Terminatng VNF id %s", self.vnfr_id)

        self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)

        # stop monitoring
        if self._vnf_mon is not None:
            self._vnf_mon.stop()
            self._vnf_mon.deregister()
            self._vnf_mon = None

        @asyncio.coroutine
        def terminate_vls():
            """ Terminate VLs in this VNF """
            for vl in self._vlrs:
                yield from vl.terminate(xact)

        @asyncio.coroutine
        def terminate_vdus():
            """ Terminate VDUS in this VNF """
            for vdu in self._vdus:
                yield from vdu.terminate(xact)

        self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
        self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
        yield from terminate_vls()

        self._log.debug("Terminatng VDUs in  VNF id %s", self.vnfr_id)
        self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
        yield from terminate_vdus()

        self._log.debug("Terminated  VNF id %s", self.vnfr_id)
        self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)

    @asyncio.coroutine
    def vnfr_uptime_update(self, xact):
        """ Re-publish the VNFR every 2 seconds while it is initializing or ready """
        while True:
            # Return when vnfr state is FAILED or TERMINATED etc
            if self._state not in [VirtualNetworkFunctionRecordState.INIT,
                                   VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
                                   VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
                                   VirtualNetworkFunctionRecordState.READY]:
                return
            yield from self.publish(xact)
            yield from asyncio.sleep(2, loop=self._loop)
1929
1930
1931
class VnfdDtsHandler(object):
    """ DTS handler for VNFD config changes """
    XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"

    def __init__(self, dts, log, loop, vnfm):
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm
        # Filled in by register()
        self._regh = None

    # NOTE(review): unlike the other DTS handlers in this file, regh is
    # declared as a coroutine rather than a property; kept as-is so that
    # existing callers are not affected.
    @asyncio.coroutine
    def regh(self):
        """ DTS registration handle """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register for VNFD configuration"""

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration"""
            self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            # Computed but not consumed anywhere in this callback
            is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ on prepare callback """
            self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
                            ks_path.to_xpath(RwVnfmYang.get_schema()), msg)
            field_ref = ProtobufC.FieldReference.alloc()
            field_ref.goto_whole_message(msg.to_pbcm())

            # Anything that is not a delete is simply acknowledged
            if not field_ref.is_field_deleted():
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)
                return

            # Deletes are handled here, in the prepare callback
            self._log.debug("Deleting VNFD with id %s", msg.id)
            if self._vnfm.vnfd_in_use(msg.id):
                self._log.debug("Cannot delete VNFD in use - %s", msg)
                raise VirtualNetworkFunctionDescriptorRefCountExists(
                    "Cannot delete a VNFD in use - %s" % msg)

            yield from self._vnfm.delete_vnfd(msg.id)
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug(
            "Registering for VNFD config using xpath: %s",
            VnfdDtsHandler.XPATH,
            )
        apply_handler = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        subscriber_flags = rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY
        with self._dts.appconf_group_create(handler=apply_handler) as acg:
            self._regh = acg.register(
                xpath=VnfdDtsHandler.XPATH,
                flags=subscriber_flags,
                on_prepare=on_prepare)
1990
1991
class VcsComponentDtsHandler(object):
    """ Vcs Component DTS handler """
    XPATH = ("D,/rw-manifest:manifest" +
             "/rw-manifest:operational-inventory" +
             "/rw-manifest:component")

    def __init__(self, dts, log, loop, vnfm):
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm
        # Filled in by register()
        self._regh = None

    @property
    def regh(self):
        """ DTS registration handle """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Registers VCS component dts publisher registration"""
        self._log.debug("VCS Comp publisher DTS handler registering path %s",
                        VcsComponentDtsHandler.XPATH)

        publisher_flags = (rwdts.Flag.PUBLISHER |
                           rwdts.Flag.NO_PREP_READ |
                           rwdts.Flag.DATASTORE)
        reg_handler = rift.tasklets.DTS.RegistrationHandler()
        group_handler = rift.tasklets.Group.Handler()
        with self._dts.group_create(handler=group_handler) as group:
            self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
                                        handler=reg_handler,
                                        flags=publisher_flags,)

    @asyncio.coroutine
    def publish(self, xact, path, msg):
        """ Publishes the VCS component

        Arguments:
            xact - transaction, only used for logging here
            path - xpath of the element to create
            msg  - the component message stored at that path
        """
        self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.create_element(path, msg)
        self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
                        VcsComponentDtsHandler.XPATH, xact, path, msg)
2033
class VnfrConsoleOperdataDtsHandler(object):
    """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""

    def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
        self._dts, self._log, self._loop = dts, log, loop
        self._regh = None
        self._vnfm = vnfm

        self._vnfr_id, self._vdur_id, self._vdu_id = vnfr_id, vdur_id, vdu_id

    @property
    def vnfr_vdu_console_xpath(self):
        """ path for resource-mgr"""
        template = "D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']"
        return template.format(self._vnfr_id, self._vdur_id)

    @asyncio.coroutine
    def register(self):
        """ Register for VNFR VDU Operational Data read from dts """

        def make_console(url):
            """ Build the console message for this VDUR with the given URL """
            console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
            console.id = self._vdur_id
            console.console_url = url
            return console

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts """
            xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
                xact_info, action, xpath, msg
                )

            # Only reads are served; everything else is logged and ACKed
            if action != rwdts.QueryAction.READ:
                self._log.error("Not supported operation %s" % action)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                return

            console_schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema()
            path_entry = console_schema.keyspec_to_entry(ks_path)
            self._log.debug("VDU Opdata path is {}".format(path_entry))

            try:
                vnfr = self._vnfm.get_vnfr(self._vnfr_id)
            except VnfRecordError:
                self._log.error("VNFR id %s not found", self._vnfr_id)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                return

            try:
                vdur = vnfr._get_vdur_from_vdu_id(self._vdu_id)
                if vdur._state != VDURecordState.READY:
                    self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
                    xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                    return

                # The resource is read under a transaction of its own
                with self._dts.transaction() as new_xact:
                    resp = yield from vdur.read_resource(new_xact)
                    vdur_console = make_console(resp.console_url or 'none')
                    self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
            except Exception:
                # Any failure degrades to a console entry with URL 'none'
                self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
                vdur_console = make_console('none')

            xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
                                    xpath=self.vnfr_vdu_console_xpath,
                                    msg=vdur_console)

        self._log.debug("Registering for VNFR VDU using xpath: %s",
                        self.vnfr_vdu_console_xpath)
        reg_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
                                        handler=reg_handler,
                                        flags=rwdts.Flag.PUBLISHER,
                                        )
2114
2115
class VnfrDtsHandler(object):
    """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop, vnfm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm

        # Registration handle, populated by register()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle"""
        return self._regh

    @property
    def vnfm(self):
        """ Return VNF manager instance """
        return self._vnfm

    @asyncio.coroutine
    def register(self):
        """ Register for vnfr create/update/delete/read requests from dts """
        def on_commit(xact_info):
            """ The transaction has been committed """
            self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
            return rwdts.MemberRspCode.ACTION_OK

        # NOTE(review): on_abort is defined but is not passed to the
        # registration handler created below.
        def on_abort(*args):
            """ Abort callback """
            self._log.debug("VNF transaction got aborted")

        @asyncio.coroutine
        def on_event(dts, g_reg, xact, xact_event, scratch_data):
            """ Group event callback

            On an INSTALL event every element currently held by the
            registration is turned back into a VNFR and instantiated in
            restart mode as a separate task.
            """

            @asyncio.coroutine
            def instantiate_realloc_vnfr(vnfr):
                """Re-populate the vnfm after restart

                Arguments:
                    vnfr - the re-created VNFR to instantiate

                """

                yield from vnfr.instantiate(None, restart_mode=True)

            if xact_event == rwdts.MemberEvent.INSTALL:
                curr_cfg = self.regh.elements
                for cfg in curr_cfg:
                    vnfr = self.vnfm.create_vnfr(cfg)
                    self._loop.create_task(instantiate_realloc_vnfr(vnfr))

            self._log.debug("Got on_event in vnfm")

            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts

            CREATE - builds the VNFR from msg and instantiates it; on failure
                     the record is marked FAILED and published
            DELETE - terminates the VNFR, drops its VNFD reference and
                     removes the record; failures are logged only
            UPDATE - copies config_status from msg and re-publishes; an
                     unknown VNFR is answered with NA

            Raises:
                VnfRecordError - CREATE without a vnfd in the message
                VirtualNetworkFunctionRecordNotFound - DELETE of a VNFR that
                    the manager returned as None
                NotImplementedError - any other action
            """
            self._log.debug(
                "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
                xact_info, action, msg
                )

            if action == rwdts.QueryAction.CREATE:
                if not msg.has_field("vnfd"):
                    err = "Vnfd not provided"
                    self._log.error(err)
                    raise VnfRecordError(err)

                vnfr = self.vnfm.create_vnfr(msg)
                try:
                    # RIFT-9105: Unable to add a READ query under an existing transaction
                    # xact = xact_info.xact
                    yield from vnfr.instantiate(None)
                except Exception as e:
                    self._log.exception(e)
                    self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
                    vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
                    yield from vnfr.publish(None)
            elif action == rwdts.QueryAction.DELETE:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr = self._vnfm.get_vnfr(path_entry.key00.id)

                if vnfr is None:
                    self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
                    # The id is interpolated into the message; passing it as a
                    # second argument left the "%s" placeholder unformatted.
                    raise VirtualNetworkFunctionRecordNotFound(
                        "VNFR id %s not found" % path_entry.key00.id)

                try:
                    yield from vnfr.terminate(xact_info.xact)
                    # Unref the VNFD
                    vnfr.vnfd_unref()
                    yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
                except Exception as e:
                    self._log.exception(e)
                    self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)

            elif action == rwdts.QueryAction.UPDATE:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr = None
                try:
                    vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
                except Exception:
                    self._log.debug("No vnfr found with id %s", path_entry.key00.id)
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                if vnfr is None:
                    self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                self._log.debug("VNFR {} update config status {} (current {})".
                                format(vnfr.name, msg.config_status, vnfr.config_status))
                # Update the config status and publish
                vnfr._config_status = msg.config_status
                yield from vnfr.publish(None)

            else:
                # Format the message here; exception constructors do not
                # apply printf-style arguments.
                raise NotImplementedError(
                    "%s action on VirtualNetworkFunctionRecord not supported"
                    % action)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug("Registering for VNFR using xpath: %s",
                        VnfrDtsHandler.XPATH,)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
                                                    on_prepare=on_prepare,)
        handlers = rift.tasklets.Group.Handler(on_event=on_event,)
        with self._dts.group_create(handler=handlers) as group:
            self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=(rwdts.Flag.PUBLISHER |
                                               rwdts.Flag.NO_PREP_READ |
                                               rwdts.Flag.CACHE |
                                               rwdts.Flag.DATASTORE),)

    @asyncio.coroutine
    def create(self, xact, path, msg):
        """
        Create a VNFR record in DTS with path and message

        Arguments:
            xact - transaction, only used for logging here
            path - xpath of the record
            msg  - the VNFR message to store
        """
        self._log.debug("Creating VNFR xact = %s, %s:%s",
                        xact, path, msg)

        self.regh.create_element(path, msg)
        self._log.debug("Created VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def update(self, xact, path, msg):
        """
        Update a VNFR record in DTS with path and message

        Arguments:
            xact - transaction, only used for logging here
            path - xpath of the record
            msg  - the VNFR message to store
        """
        self._log.debug("Updating VNFR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.update_element(path, msg)
        self._log.debug("Updated VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, path):
        """
        Delete a VNFR record in DTS with the given path

        Arguments:
            xact - transaction, only used for logging here
            path - xpath of the record to remove
        """
        self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
        self.regh.delete_element(path)
        self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2291
2292
class VnfdRefCountDtsHandler(object):
    """ The VNFD Ref Count DTS handler """
    XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"

    def __init__(self, dts, log, loop, vnfm):
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm

        # Filled in by register()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @property
    def vnfm(self):
        """ Return the VNF manager instance """
        return self._vnfm

    @asyncio.coroutine
    def register(self):
        """ Register for VNFD ref count read from dts """

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts """
            xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
                xact_info, action, xpath, msg
                )

            # Reads are the only supported operation
            if action != rwdts.QueryAction.READ:
                raise VnfRecordError("Not supported operation %s" % action)

            refcount_schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
            path_entry = refcount_schema.keyspec_to_entry(ks_path)
            entries = yield from self._vnfm.get_vnfd_refcount(path_entry.key00.vnfd_id_ref)

            # One MORE response per entry, closed by a final ACK
            for entry_xpath, entry_msg in entries:
                self._log.debug("Responding to ref count query path:%s, msg:%s",
                                entry_xpath, entry_msg)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
                                        xpath=entry_xpath,
                                        msg=entry_msg)
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        reg_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH,
                                        handler=reg_handler,
                                        flags=rwdts.Flag.PUBLISHER,
                                        )
2348
2349
class VdurDatastore(object):
    """
    This VdurDatastore is intended to expose select information about a VDUR
    such that it can be referenced in a cloud config file. The data that is
    exposed does not necessarily follow the structure of the data in the yang
    model. This is intentional. The data that are exposed are intended to be
    agnostic of the yang model so that changes in the model do not necessarily
    require changes to the interface provided to the user. It also means that
    the user does not need to be familiar with the RIFT.ware yang models.

    Entries are keyed by the VDU id (vdur.vdu_id); the attributes currently
    exposed are 'name' and 'mgmt.ip'.
    """

    def __init__(self):
        """Create an instance of VdurDatastore"""
        self._vdur_data = dict()
        # Raw string so that "\[", "\]" and "\." reach the regex engine
        # untouched instead of being invalid string escape sequences.
        self._pattern = re.compile(r"vdu\[([^]]+)\]\.(.+)")

    @staticmethod
    def _exposed_attrs(vdur):
        """Return the (key, value) pairs that the datastore exposes for a VDUR"""
        return (
            ('name', vdur._vdud.name),
            ('mgmt.ip', vdur.vm_management_ip),
        )

    def add(self, vdur):
        """Add a new VDUR to the datastore

        Attributes whose value is None are not stored.

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no VDU id or (2) is
            already in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id in self._vdur_data:
            raise ValueError('cannot add a VDUR more than once')

        entry = self._vdur_data[vdur.vdu_id] = dict()

        for key, value in self._exposed_attrs(vdur):
            if value is not None:
                entry[key] = value

    def update(self, vdur):
        """Update the VDUR information in the datastore

        Attributes whose value is now None are removed from the entry.

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no VDU id or (2) is
            not in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        entry = self._vdur_data[vdur.vdu_id]

        for key, value in self._exposed_attrs(vdur):
            if value is None:
                entry.pop(key, None)
            else:
                entry[key] = value

    def remove(self, vdur_id):
        """Remove all of the data associated with specified VDUR

        Arguments:
            vdur_id - the identifier of a VDUR in the datastore

        Raises:
            A ValueError is raised if the VDUR is not contained in the
            datastore.

        """
        if vdur_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        del self._vdur_data[vdur_id]

    def get(self, expr):
        """Retrieve VDUR information from the datastore

        An expression should be of the form,

            vdu[<id>].<attr>

        where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
        the exposed attribute that the user wishes to retrieve.

        If the requested data is not available, None is returned.

        Arguments:
            expr - a string that specifies the data to return

        Raises:
            A ValueError is raised if the provided expression cannot be parsed.

        Returns:
            The requested data or None

        """
        result = self._pattern.match(expr)
        if result is None:
            raise ValueError('data expression not recognized ({})'.format(expr))

        vdur_id, key = result.groups()

        if vdur_id not in self._vdur_data:
            return None

        return self._vdur_data[vdur_id].get(key, None)
2468
2469
class VnfManager(object):
    """ The virtual network function manager class

    Keeps the VNFRs known to this tasklet (keyed by VNFR id), a reference
    count of VNFR instances per VNFD id, and the NSR configs received from
    the NS instance config subscriber.
    """
    def __init__(self, dts, log, loop, cluster_name):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._cluster_name = cluster_name

        self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
        self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
        self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
        self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(log, dts, loop, callback=self.handle_nsr)

        # Handlers registered by register(), in this order
        self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
                              self._vnfr_handler,
                              self._vcs_handler,
                              self._vnfr_ref_handler,
                              self._nsr_handler]
        self._vnfrs = {}           # VNFR id -> VirtualNetworkFunctionRecord
        self._vnfds_to_vnfr = {}   # VNFD id -> number of VNFRs using it
        self._nsrs = {}            # NSR id  -> NSR config message

    @property
    def vnfr_handler(self):
        """ VNFR dts handler """
        return self._vnfr_handler

    @property
    def vcs_handler(self):
        """ VCS dts handler """
        return self._vcs_handler

    @asyncio.coroutine
    def register(self):
        """ Register all static DTS handlers """
        for hdl in self._dts_handlers:
            yield from hdl.register()

    @asyncio.coroutine
    def run(self):
        """ Run this VNFM instance """
        self._log.debug("Run VNFManager - registering static DTS handlers")
        yield from self.register()

    def handle_nsr(self, nsr, action):
        """ Track NSR configs: remember them on CREATE, forget them on DELETE """
        if action in [rwdts.QueryAction.CREATE]:
            self._nsrs[nsr.id] = nsr
        elif action == rwdts.QueryAction.DELETE:
            if nsr.id in self._nsrs:
                del self._nsrs[nsr.id]

    def get_linked_mgmt_network(self, vnfr):
        """For the given VNFR get the related mgmt network from the NSD, if
        available.

        Returns:
            The name of the first VLD in the NSD flagged as a mgmt network,
            or None when there is none.

        Raises:
            VnfRecordError - the NSR referenced by the VNFR is not known
        """
        nsr_id = vnfr.nsr_id_ref

        # for the given related VNFR, get the corresponding NSR-config
        try:
            nsr_obj = self._nsrs[nsr_id]
        except KeyError:
            # A bare string cannot be raised (that is a TypeError); use the
            # module's VNF record error instead.
            raise VnfRecordError(
                "Unable to find the NS with the ID: {}".format(nsr_id))

        # for the related NSD check if a VLD exists such that it's a mgmt
        # network
        for vld in nsr_obj.nsd.vld:
            if vld.mgmt_network:
                return vld.name

        return None

    def get_vnfr(self, vnfr_id):
        """ get VNFR by vnfr id

        Raises:
            VnfRecordError - no VNFR with this id is known
        """

        if vnfr_id not in self._vnfrs:
            raise VnfRecordError("VNFR id %s not found" % vnfr_id)

        return self._vnfrs[vnfr_id]

    def create_vnfr(self, vnfr):
        """ Create a VNFR instance

        Builds a VirtualNetworkFunctionRecord for the given VNFR message,
        stores it under its id and bumps the reference count of its VNFD.

        Raises:
            VnfRecordError - a VNFR with the same id already exists, or the
                             referenced NSR is not known
        """
        if vnfr.id in self._vnfrs:
            msg = "Vnfr id %s already exists" % vnfr.id
            self._log.error(msg)
            raise VnfRecordError(msg)

        self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
                       vnfr.id,
                       vnfr.vnfd.id)

        mgmt_network = self.get_linked_mgmt_network(vnfr)

        self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
            self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
            mgmt_network=mgmt_network
            )

        # Update ref count
        self._vnfds_to_vnfr[vnfr.vnfd.id] = self._vnfds_to_vnfr.get(vnfr.vnfd.id, 0) + 1

        return self._vnfrs[vnfr.id]

    @asyncio.coroutine
    def delete_vnfr(self, xact, vnfr):
        """ Delete a VNFR instance

        Removes the record from DTS, decrements the reference count of its
        VNFD (never below zero) and forgets the VNFR.  Unknown VNFRs are
        ignored.
        """
        if vnfr.vnfr_id in self._vnfrs:
            self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
            yield from self._vnfr_handler.delete(xact, vnfr.xpath)

            if vnfr.vnfd.id in self._vnfds_to_vnfr:
                if self._vnfds_to_vnfr[vnfr.vnfd.id]:
                    self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1

            del self._vnfrs[vnfr.vnfr_id]

    @asyncio.coroutine
    def fetch_vnfd(self, vnfd_id):
        """ Fetch the VNFD with the given vnfd id

        Raises:
            VnfRecordError - the query produced no VNFD
        """
        vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
        self._log.debug("Fetch vnfd with path %s", vnfd_path)
        vnfd = None

        res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)

        # When several results arrive, the last one wins
        for ent in res_iter:
            res = yield from ent
            vnfd = res.result

        if vnfd is None:
            err = "Failed to get Vnfd %s" % vnfd_id
            self._log.error(err)
            raise VnfRecordError(err)

        self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)

        return vnfd

    def vnfd_in_use(self, vnfd_id):
        """ Is this VNFD in use """
        self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
        return self._vnfds_to_vnfr.get(vnfd_id, 0) > 0

    @asyncio.coroutine
    def publish_vnfr(self, xact, path, msg):
        """ Publish a VNFR """
        self._log.debug("publish_vnfr called with path %s, msg %s",
                        path, msg)
        yield from self.vnfr_handler.update(xact, path, msg)

    @asyncio.coroutine
    def delete_vnfd(self, vnfd_id):
        """ Delete the Virtual Network Function descriptor with the passed id

        Drops the reference count entry and removes any files stored for the
        VNFD under $RIFT_ARTIFACTS/launchpad/libs/<id>.  File cleanup is best
        effort: failures are logged and not propagated.

        Raises:
            VirtualNetworkFunctionDescriptorRefCountExists - the VNFD is
                still referenced by at least one VNFR
        """
        self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
        if vnfd_id in self._vnfds_to_vnfr:
            # The stored value is the plain instance count
            ref_count = self._vnfds_to_vnfr[vnfd_id]
            if ref_count:
                self._log.debug("Cannot delete VNFD id %s reference exists %s",
                                vnfd_id,
                                ref_count)
                raise VirtualNetworkFunctionDescriptorRefCountExists(
                    "Cannot delete :%s, ref_count:%s" % (vnfd_id, ref_count))

            del self._vnfds_to_vnfr[vnfd_id]

        # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
        try:
            rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
            vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
            if os.path.exists(vnfd_dir):
                shutil.rmtree(vnfd_dir, ignore_errors=True)
        except Exception as e:
            # The ref count entry is already gone at this point, so only the
            # id is available for the message.
            self._log.error("Exception in cleaning up VNFD {}: {}".
                            format(vnfd_id, e))
            self._log.exception(e)

    def vnfd_refcount_xpath(self, vnfd_id):
        """ xpath for ref count entry """
        return (VnfdRefCountDtsHandler.XPATH +
                "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)

    @asyncio.coroutine
    def get_vnfd_refcount(self, vnfd_id):
        """ Get the vnfd ref count list from this VNFM

        Arguments:
            vnfd_id - a VNFD id, or None / "" to report every known VNFD

        Returns:
            A list of (xpath, message) tuples; empty when the given id is
            not known.
        """
        if vnfd_id is None or vnfd_id == "":
            wanted = list(self._vnfds_to_vnfr.keys())
        elif vnfd_id in self._vnfds_to_vnfr:
            wanted = [vnfd_id]
        else:
            wanted = []

        vnfd_list = []
        for vnfd in wanted:
            vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
            vnfd_msg.vnfd_id_ref = vnfd
            vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd]
            vnfd_list.append((self.vnfd_refcount_xpath(vnfd), vnfd_msg))

        return vnfd_list
2676
2677
class VnfmTasklet(rift.tasklets.Tasklet):
    """ VNF Manager tasklet class """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("vnfm")

        # Both are created later: _dts in start(), _vnfm in init()
        self._dts = None
        self._vnfm = None

    def start(self):
        """ Start the tasklet and create the DTS API object """
        try:
            super().start()
            self.log.info("Starting VnfmTasklet")

            self.log.setLevel(logging.DEBUG)

            self.log.debug("Registering with dts")
            self._dts = rift.tasklets.DTS(
                self.tasklet_info,
                RwVnfmYang.get_schema(),
                self.loop,
                self.on_dts_state_change,
            )

            self.log.debug("Created DTS Api GI Object: %s", self._dts)
        except Exception:
            print("Caught Exception in VNFM start:", sys.exc_info()[0])
            raise

    def on_instance_started(self):
        """ Task instance started callback """
        self.log.debug("Got instance started callback")

    def stop(self):
        """ Stop the tasklet by de-initialising DTS """
        try:
            self._dts.deinit()
        except Exception:
            print("Caught Exception in VNFM stop:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def init(self):
        """ Task init callback """
        try:
            vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
            assert vm_parent_name is not None
            self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
            yield from self._vnfm.run()
        except Exception:
            print("Caught Exception in VNFM init:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def run(self):
        """ Task run callback """
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """
        # DTS state -> DTS state to move to once the application step is done
        dts_transitions = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # DTS state -> application coroutine to run for that state
        app_handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        callback = app_handlers.get(state)
        if callback is not None:
            yield from callback()

        # Transition dts to next state
        following_state = dts_transitions.get(state)
        if following_state is not None:
            self._dts.handle.set_state(following_state)