Bug 140
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.package.script
53 import rift.mano.dts as mano_dts
54
55
class VMResourceError(Exception):
    """Raised when no usable VM resource response is obtained."""
59
60
class VnfRecordError(Exception):
    """Raised when a VNF record instantiation fails."""
64
65
class VduRecordError(Exception):
    """Raised when a VDU record instantiation fails."""
69
70
# NOTE(review): this name shadows the builtin ``NotImplemented`` singleton for
# the whole module; it is kept unchanged because it is part of the module's
# public interface.
class NotImplemented(Exception):
    """Raised when a requested operation is not implemented."""
74
75
class VnfrRecordExistsError(Exception):
    """Raised when a VNFR record with the same VNFR id already exists."""
79
80
class InternalVirtualLinkRecordError(Exception):
    """Raised on an internal virtual link record error."""
84
85
class VDUImageNotFound(Exception):
    """Raised when a VDU image cannot be found."""
89
90
class VirtualDeploymentUnitRecordError(Exception):
    """Raised when a VDU instantiation fails."""
94
95
class VMNotReadyError(Exception):
    """Raised when the VM has not yet been received from the resource manager."""
99
100
class VDURecordNotFound(Exception):
    """Raised when a VDU record cannot be found."""
104
105
class VirtualNetworkFunctionRecordDescNotFound(Exception):
    """Raised when a Virtual Network Function Record descriptor cannot be found."""
109
110
class VirtualNetworkFunctionDescriptorError(Exception):
    """Raised on a Virtual Network Function descriptor error."""
114
115
class VirtualNetworkFunctionDescriptorNotFound(Exception):
    """Raised when a Virtual Network Function descriptor cannot be found."""
119
120
class VirtualNetworkFunctionRecordNotFound(Exception):
    """Raised when a Virtual Network Function Record cannot be found."""
124
125
class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
    """Raised when a Virtual Network Function descriptor still has references."""
129
130
class VnfrInstantiationFailed(Exception):
    """Raised when a Virtual Network Function instantiation fails."""
134
135
class VNFMPlacementGroupError(Exception):
    """Raised when the placement groups requested for a VDU cannot be honoured."""
138
@enum.unique
class VirtualNetworkFunctionRecordState(enum.Enum):
    """Lifecycle states of a VNF record (VNFR).

    Every member has a distinct value. ``TERMINATED`` previously shared the
    value 7 with ``VDU_TERMINATE_PHASE``, which made it an enum alias: its
    ``.name`` was ``"VDU_TERMINATE_PHASE"`` and the two states compared
    identical. ``@enum.unique`` now rejects any such duplicate at import time.
    """
    INIT = 1
    VL_INIT_PHASE = 2
    VM_INIT_PHASE = 3
    READY = 4
    TERMINATE = 5
    VL_TERMINATE_PHASE = 6
    VDU_TERMINATE_PHASE = 7
    TERMINATED = 8
    FAILED = 10
150
151
class VDURecordState(enum.Enum):
    """Lifecycle states of a VDU record (VDUR)."""
    # Bring-up path
    INIT = 1
    INSTANTIATING = 2
    RESOURCE_ALLOC_PENDING = 3
    READY = 4
    # Tear-down path
    TERMINATING = 5
    TERMINATED = 6
    # Error state
    FAILED = 10
161
162
class VcsComponent(object):
    """A VCS component declared inside a VNF descriptor.

    Holds the descriptor's component message together with the mangled name
    it is published under, and builds the DTS paths and messages used to
    publish and start the component.
    """

    def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._cluster_name = cluster_name
        self._vcs_handler = vcs_handler
        self._component = component
        self._mangled_name = mangled_name

    @staticmethod
    def mangle_name(component_name, vnf_name, vnfd_id):
        """Return ``<vnf_name>:<component_name>:<vnfd_id>``."""
        return ":".join((vnf_name, component_name, vnfd_id))

    @property
    def name(self):
        """The mangled name of this component."""
        return self._mangled_name

    @property
    def path(self):
        """Operational-inventory xpath of this component."""
        template = ("D,/rw-manifest:manifest"
                    "/rw-manifest:operational-inventory"
                    "/rw-manifest:component"
                    "[rw-manifest:component-name = '{}']")
        return template.format(self.name)

    @property
    def instance_xpath(self):
        """xpath of the VCS instance for this component's cluster."""
        template = ("D,/rw-base:vcs"
                    "/instances"
                    "/instance"
                    "[instance-name = '{}']")
        return template.format(self._cluster_name)

    @property
    def start_comp_xpath(self):
        """xpath the start request for this component is created at."""
        return self.instance_xpath + "/child-n[instance-name = 'START-REQ']"

    def get_start_comp_msg(self, ip_address):
        """Build the START request message for this component."""
        request = RwBaseYang.VcsInstance_Instance_ChildN()
        request.instance_name = 'START-REQ'
        request.component_name = self.name
        request.admin_command = "START"
        request.ip_address = ip_address
        return request

    @property
    def msg(self):
        """Return the operational-inventory message for this component.

        Every scalar ``component_name`` entry found anywhere in the
        component's dict form is replaced by its mangled name.
        """

        def mangle_comp_names(comp_dict):
            """Mangle component names in place and return the same dict."""
            for key, val in comp_dict.items():
                if isinstance(val, dict):
                    comp_dict[key] = mangle_comp_names(val)
                elif isinstance(val, list):
                    for idx, entry in enumerate(val):
                        if isinstance(entry, dict):
                            val[idx] = mangle_comp_names(entry)
                elif key == "component_name":
                    # NOTE(review): _vnfd_name/_vnfd_id are not assigned
                    # anywhere in this class; confirm who sets them before
                    # relying on this branch.
                    comp_dict[key] = VcsComponent.mangle_name(val,
                                                              self._vnfd_name,
                                                              self._vnfd_id)
            return comp_dict

        mangled = mangle_comp_names(self._component.as_dict())
        return RwManifestYang.OpInventory_Component.from_dict(mangled)

    @asyncio.coroutine
    def publish(self, xact):
        """Publish this component through the VCS handler."""
        self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
                        self.name, self.path, self.msg)
        yield from self._vcs_handler.publish(xact, self.path, self.msg)

    @asyncio.coroutine
    def start(self, xact, parent, ip_addr=None):
        """Create the START request for this component in DTS."""
        # ATTN RV - replace with block add
        request = self.get_start_comp_msg(ip_addr)
        xpath = self.start_comp_xpath
        self._log.debug("starting component %s %s", xpath, request)
        yield from self._dts.query_create(xpath, 0, request)
        self._log.debug("started component %s, %s", xpath, request)
264
265
class VirtualDeploymentUnitRecord(object):
    """Virtual Deployment Unit Record (VDUR).

    Tracks one VDU of a VNF record: it builds the resource-manager request
    from the VDU descriptor, subscribes to the resource-manager response,
    and drives the VDU through the ``VDURecordState`` lifecycle.
    """

    def __init__(self,
                 dts,
                 log,
                 loop,
                 vdud,
                 vnfr,
                 mgmt_intf,
                 mgmt_network,
                 cloud_account_name,
                 vnfd_package_store,
                 vdur_id=None,
                 placement_groups=None):
        """Create the record in the INIT state.

        ``vdur_id`` defaults to a fresh UUID. ``placement_groups`` defaults
        to a new empty list per instance (a shared mutable default is
        avoided on purpose).
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vdud = vdud
        self._vnfr = vnfr
        self._mgmt_intf = mgmt_intf
        self._cloud_account_name = cloud_account_name
        self._vnfd_package_store = vnfd_package_store
        self._mgmt_network = mgmt_network

        self._vdur_id = vdur_id or str(uuid.uuid4())
        # Lists of (interface, connection point, vlr) tuples filled in by
        # create_resource(); for internal interfaces the middle element is
        # the connection point *id*, not the connection point object.
        self._int_intf = []
        self._ext_intf = []
        self._state = VDURecordState.INIT
        self._state_failed_reason = None
        self._request_id = str(uuid.uuid4())
        self._name = vnfr.name + "__" + vdud.id
        self._placement_groups = [] if placement_groups is None else placement_groups
        self._rm_regh = None
        self._vm_resp = None
        self._vdud_cloud_init = None
        self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(
            dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id, self.vdu_id)

    @asyncio.coroutine
    def vdu_opdata_register(self):
        """Register the console operational-data handler of this VDUR."""
        yield from self._vdur_console_handler.register()

    def _find_conn_point(self, cp_name):
        """Return the VM-response connection point named cp_name, or None."""
        if self._vm_resp is None:
            return None
        for conn_point in self._vm_resp.connection_points:
            if conn_point.name == cp_name:
                return conn_point
        return None

    def cp_ip_addr(self, cp_name):
        """ Find ip address by connection point name ("0.0.0.0" if unknown) """
        conn_point = self._find_conn_point(cp_name)
        return "0.0.0.0" if conn_point is None else conn_point.ip_address

    def cp_mac_addr(self, cp_name):
        """ Find mac address by connection point name (all zeros if unknown) """
        conn_point = self._find_conn_point(cp_name)
        return "00:00:00:00:00:00" if conn_point is None else conn_point.mac_addr

    def cp_id(self, cp_name):
        """ Find connection point id by connection point name ('' if unknown) """
        conn_point = self._find_conn_point(cp_name)
        return '' if conn_point is None else conn_point.connection_point_id

    @property
    def vdu_id(self):
        """ Id of the VDU descriptor this record was created from """
        return self._vdud.id

    @property
    def vm_resp(self):
        """ Latest resource info received from the resource manager, or None """
        return self._vm_resp

    @property
    def name(self):
        """ Return this VDUR's name """
        return self._name

    @property
    def cloud_account_name(self):
        """ Cloud account this VDU should be created in """
        return self._cloud_account_name

    @property
    def image_name(self):
        """ name that should be used to lookup the image on the CMP """
        if 'image' not in self._vdud:
            return None
        return os.path.basename(self._vdud.image)

    @property
    def image_checksum(self):
        """ checksum of the image named in the descriptor, or None if unset """
        if self._vdud.has_field("image_checksum"):
            return self._vdud.image_checksum
        return None

    @property
    def management_ip(self):
        """ Public ip if the VM response has one, else its management ip.

        None while the VDU is not active.
        """
        if not self.active:
            return None
        if self._vm_resp.has_field('public_ip'):
            return self._vm_resp.public_ip
        return self._vm_resp.management_ip

    @property
    def vm_management_ip(self):
        """ Management ip from the VM response; None while not active """
        if not self.active:
            return None
        return self._vm_resp.management_ip

    @property
    def operational_status(self):
        """ Operational status of this VDU"""
        op_stats_dict = {"INIT": "init",
                         "INSTANTIATING": "vm_init_phase",
                         "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
                         "READY": "running",
                         "FAILED": "failed",
                         "TERMINATING": "terminated",
                         "TERMINATED": "terminated",
                         }
        return op_stats_dict[self._state.name]

    @property
    def msg(self):
        """ Build the VDUR message from the descriptor and the resmgr response.

        Side effect: every external connection point is pushed to the VNFR
        through ``update_cp`` while the message is being built.
        """
        vdu_fields = ["vm_flavor",
                      "guest_epa",
                      "vswitch_epa",
                      "hypervisor_epa",
                      "host_epa",
                      "volumes",
                      "name"]
        vdu_copy_dict = {k: v for k, v in
                         self._vdud.as_dict().items() if k in vdu_fields}
        vdur_dict = {"id": self._vdur_id,
                     "vdu_id_ref": self._vdud.id,
                     "operational_status": self.operational_status,
                     "operational_status_details": self._state_failed_reason,
                     }
        if self.vm_resp is not None:
            vdur_dict.update({"vim_id": self.vm_resp.vdu_id,
                              "flavor_id": self.vm_resp.flavor_id
                              })
            if self._vm_resp.has_field('image_id'):
                vdur_dict.update({"image_id": self.vm_resp.image_id})

        if self.management_ip is not None:
            vdur_dict["management_ip"] = self.management_ip

        if self.vm_management_ip is not None:
            vdur_dict["vm_management_ip"] = self.vm_management_ip

        vdur_dict.update(vdu_copy_dict)

        if self.vm_resp is not None:
            if self._vm_resp.has_field('volumes'):
                # The descriptor may carry no volumes at all; treat that as
                # "nothing to annotate" instead of failing on a missing key.
                descriptor_volumes = vdur_dict.get('volumes', [])
                for opvolume in self._vm_resp.volumes:
                    vdurvol_data = [vduvol for vduvol in descriptor_volumes
                                    if vduvol['name'] == opvolume.name]
                    # Only annotate when the name matches exactly one volume
                    if len(vdurvol_data) != 1:
                        continue
                    vdurvol = vdurvol_data[0]
                    vdurvol["volume_id"] = opvolume.volume_id
                    if opvolume.has_field('custom_meta_data'):
                        metadata_list = [item.as_dict()
                                         for item in opvolume.custom_meta_data]
                        if 'guest_params' not in vdurvol:
                            vdurvol['guest_params'] = dict()
                        vdurvol['guest_params']['custom_meta_data'] = metadata_list

            if self._vm_resp.has_field('custom_boot_data'):
                boot_data = self._vm_resp.custom_boot_data
                vdur_dict['custom_boot_data'] = dict()
                if boot_data.has_field('custom_drive'):
                    vdur_dict['custom_boot_data']['custom_drive'] = boot_data.custom_drive
                if boot_data.has_field('custom_meta_data'):
                    vdur_dict['custom_boot_data']['custom_meta_data'] = [
                        item.as_dict() for item in boot_data.custom_meta_data]
                if boot_data.has_field('custom_config_files'):
                    vdur_dict['custom_boot_data']['custom_config_files'] = [
                        item.as_dict() for item in boot_data.custom_config_files]

        icp_list = []
        ii_list = []

        for intf, cp_id, vlr in self._int_intf:
            cp = self.find_internal_cp_by_cp_id(cp_id)

            icp_list.append({"name": cp.name,
                             "id": cp.id,
                             "type_yang": "VPORT",
                             "ip_address": self.cp_ip_addr(cp.id),
                             "mac_address": self.cp_mac_addr(cp.id)})

            ii_list.append({"name": intf.name,
                            "vdur_internal_connection_point_ref": cp.id,
                            "virtual_interface": {}})

        vdur_dict["internal_connection_point"] = icp_list
        self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
        vdur_dict["internal_interface"] = ii_list

        ei_list = []
        for intf, cp, vlr in self._ext_intf:
            ei_list.append({"name": cp.name,
                            "vnfd_connection_point_ref": cp.name,
                            "virtual_interface": {}})
            self._vnfr.update_cp(cp.name,
                                 self.cp_ip_addr(cp.name),
                                 self.cp_mac_addr(cp.name),
                                 self.cp_id(cp.name))

        vdur_dict["external_interface"] = ei_list

        vdur_dict['placement_groups_info'] = [group.as_dict()
                                              for group in self._placement_groups]

        return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)

    @property
    def resmgr_path(self):
        """ path for resource-mgr"""
        return ("D,/rw-resource-mgr:resource-mgmt" +
                "/vdu-event" +
                "/vdu-event-data[event-id='{}']".format(self._request_id))

    @property
    def vm_flavor_msg(self):
        """ VM flavor message (a copy of the descriptor's vm_flavor) """
        flavor = self._vdud.vm_flavor.__class__()
        flavor.copy_from(self._vdud.vm_flavor)

        return flavor

    @property
    def vdud_cloud_init(self):
        """ Return the cloud-init contents for the VDU (cached once non-None) """
        if self._vdud_cloud_init is None:
            self._vdud_cloud_init = self.cloud_init()

        return self._vdud_cloud_init

    def cloud_init(self):
        """ Populate cloud_init with cloud-config script from
            either the inline contents or from the file provided

        Returns None when the descriptor provides neither.
        Raises VirtualDeploymentUnitRecordError if the file cannot be read
        from the stored VNFD package.
        """
        if self._vdud.cloud_init is not None:
            self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
            return self._vdud.cloud_init

        if self._vdud.cloud_init_file is not None:
            # Get cloud-init script contents from the file provided in the cloud_init_file param
            self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
            filename = self._vdud.cloud_init_file
            self._vnfd_package_store.refresh()
            stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
            cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
            try:
                return cloud_init_extractor.read_script(stored_package, filename)
            except rift.package.cloud_init.CloudInitExtractionError as e:
                raise VirtualDeploymentUnitRecordError(e) from e

        self._log.debug("VDU Instantiation: cloud-init script not provided")
        return None

    def process_openstack_placement_group_construct(self, vm_create_msg_dict):
        """Copy OpenStack placement constructs into the VM create request.

        At most one availability zone and one server group may be requested
        across all placement groups; host aggregates are accumulated.
        Raises VNFMPlacementGroupError otherwise.
        """
        host_aggregates = []
        availability_zones = []
        server_groups = []
        for group in self._placement_groups:
            if group.has_field('host_aggregate'):
                for aggregate in group.host_aggregate:
                    host_aggregates.append(aggregate.as_dict())
            if group.has_field('availability_zone'):
                availability_zones.append(group.availability_zone.as_dict())
            if group.has_field('server_group'):
                server_groups.append(group.server_group.as_dict())

        if availability_zones:
            if len(availability_zones) > 1:
                self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
                # The offending zones are part of the message (the original
                # template had no placeholder for them).
                raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requested Zones: {}".format(self.name, availability_zones))
            vm_create_msg_dict['availability_zone'] = availability_zones[0]

        if server_groups:
            if len(server_groups) > 1:
                self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
                raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requested Groups: {}".format(self.name, server_groups))
            vm_create_msg_dict['server_group'] = server_groups[0]

        if host_aggregates:
            vm_create_msg_dict['host_aggregate'] = host_aggregates

    def process_placement_groups(self, vm_create_msg_dict):
        """Process the placement_groups and fill resource-mgr request

        Raises VNFMPlacementGroupError if the groups do not all target the
        same cloud type.
        """
        if not self._placement_groups:
            return

        cloud_types = {group.cloud_type for group in self._placement_groups}
        # Explicit check instead of ``assert`` so it still runs under -O
        if len(cloud_types) != 1:
            raise VNFMPlacementGroupError(
                "Placement groups of VDU: {} span multiple cloud types: {}".format(
                    self.name, list(cloud_types)))
        cloud_type = cloud_types.pop()

        if cloud_type == 'openstack':
            self.process_openstack_placement_group_construct(vm_create_msg_dict)
        else:
            self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)

    def process_custom_bootdata(self, vm_create_msg_dict):
        """Process the custom boot data

        Replaces the ``source`` of every custom config file that has both a
        ``source`` and a ``dest`` with the file's contents, read from the
        stored VNFD package.
        Raises VirtualDeploymentUnitRecordError if a file cannot be read.
        """
        if 'custom_config_files' not in vm_create_msg_dict['custom_boot_data']:
            return

        stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
        script_extractor = rift.package.script.PackageScriptExtractor(self._log)
        for custom_file_item in vm_create_msg_dict['custom_boot_data']['custom_config_files']:
            if 'source' not in custom_file_item or 'dest' not in custom_file_item:
                continue
            source = custom_file_item['source']
            # Find source file in scripts dir of VNFD
            self._vnfd_package_store.refresh()
            self._log.debug("Checking for source config file at %s", source)
            try:
                source_file_str = script_extractor.read_script(stored_package, source)
            except rift.package.script.ScriptExtractionError as e:
                raise VirtualDeploymentUnitRecordError(e) from e
            # Update source file location with file contents
            custom_file_item['source'] = source_file_str

    def resmgr_msg(self, config=None):
        """Build the resource-manager VDU event request for this VDU.

        ``config``, when given, is sent as the VDU's userdata.
        """
        vdu_fields = ["vm_flavor",
                      "guest_epa",
                      "vswitch_epa",
                      "hypervisor_epa",
                      "host_epa",
                      "volumes",
                      "custom_boot_data"]

        self._log.debug("Creating params based on VDUD: %s", self._vdud)
        vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}

        vm_create_msg_dict = {
            "name": self.name,
        }

        if self.image_name is not None:
            vm_create_msg_dict["image_name"] = self.image_name

        if self.image_checksum is not None:
            vm_create_msg_dict["image_checksum"] = self.image_checksum

        vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
        if self._vdud.has_field('mgmt_vpci'):
            vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci

        self._log.debug("VDUD: %s", self._vdud)
        if config is not None:
            vm_create_msg_dict['vdu_init'] = {'userdata': config}

        if self._mgmt_network:
            vm_create_msg_dict['mgmt_network'] = self._mgmt_network

        cp_list = []
        for intf, cp, vlr in self._ext_intf:
            cp_info = {"name": cp.name,
                       "virtual_link_id": vlr.network_id,
                       "type_yang": intf.virtual_interface.type_yang,
                       "port_security_enabled": cp.port_security_enabled}

            if (intf.virtual_interface.has_field('vpci') and
                    intf.virtual_interface.vpci is not None):
                cp_info["vpci"] = intf.virtual_interface.vpci

            if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
                cp_info['security_group'] = vlr.ip_profile_params.security_group

            cp_list.append(cp_info)

        for intf, cp_id, vlr in self._int_intf:
            cp_info = {"name": cp_id,
                       "virtual_link_id": vlr.network_id,
                       "type_yang": intf.virtual_interface.type_yang}
            if (intf.virtual_interface.has_field('vpci') and
                    intf.virtual_interface.vpci is not None):
                cp_info["vpci"] = intf.virtual_interface.vpci
            else:
                # The tuple only carries the connection point id (a string),
                # so resolve the descriptor's connection point before reading
                # its attributes.
                # NOTE(review): assumes internal connection points carry
                # port_security_enabled like external ones do - confirm
                # against the VNFD model.
                int_cp = self.find_internal_cp_by_cp_id(cp_id)
                cp_info["port_security_enabled"] = int_cp.port_security_enabled
            cp_list.append(cp_info)

        vm_create_msg_dict["connection_points"] = cp_list
        vm_create_msg_dict.update(vdu_copy_dict)

        self.process_placement_groups(vm_create_msg_dict)
        if 'custom_boot_data' in vm_create_msg_dict:
            self.process_custom_bootdata(vm_create_msg_dict)

        msg = RwResourceMgrYang.VDUEventData()
        msg.event_id = self._request_id
        msg.cloud_account = self.cloud_account_name
        msg.request_info.from_dict(vm_create_msg_dict)

        return msg

    @asyncio.coroutine
    def terminate(self, xact):
        """ Delete resource in VIM

        Ignored unless the VDU is READY or FAILED. Failure to delete the VIM
        resource is logged and does not prevent the handles from being
        deregistered.
        """
        if self._state not in (VDURecordState.READY, VDURecordState.FAILED):
            self._log.warning("VDU terminate in not ready state - Ignoring request")
            return

        self._state = VDURecordState.TERMINATING
        if self._vm_resp is not None:
            try:
                with self._dts.transaction() as new_xact:
                    yield from self.delete_resource(new_xact)
            except Exception:
                # Best effort: keep tearing down the registrations
                self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)

        if self._rm_regh is not None:
            self._log.debug("Deregistering resource manager registration handle")
            self._rm_regh.deregister()
            self._rm_regh = None

        if self._vdur_console_handler is not None:
            # The handler only has a live registration once
            # vdu_opdata_register() has run, so guard against a missing one.
            console_regh = getattr(self._vdur_console_handler, "_regh", None)
            if console_regh is not None:
                self._log.debug("Deregistering vnfr vdur registration handle")
                console_regh.deregister()
                self._vdur_console_handler._regh = None

        self._state = VDURecordState.TERMINATED

    def find_internal_cp_by_cp_id(self, cp_id):
        """ Find the CP corresponding to the connection point id

        Raises VduRecordError if the descriptor has no such internal
        connection point.
        """
        self._log.debug("find_internal_cp_by_cp_id(%s) called",
                        cp_id)

        for int_cp in self._vdud.internal_connection_point:
            self._log.debug("Checking for int cp %s in internal connection points",
                            int_cp.id)
            if int_cp.id == cp_id:
                return int_cp

        self._log.debug("Failed to find cp %s in internal connection points",
                        cp_id)
        msg = "Failed to find cp %s in internal connection points" % cp_id
        raise VduRecordError(msg)

    @asyncio.coroutine
    def create_resource(self, xact, vnfr, config=None):
        """ Request resource from ResourceMgr

        Resolves the descriptor's external and internal interfaces to their
        connection points and VLRs, sends the create request, and returns
        the ``resource_info`` of the response.
        Raises VduRecordError if an internal connection point cannot be
        resolved, and VMResourceError if no usable response is received.
        """
        def find_cp_by_name(cp_name):
            """ Find an external connection point of the VNFR by name """
            self._log.debug("find_cp_by_name(%s) called", cp_name)
            for ext_cp in vnfr._cprs:
                self._log.debug("Checking ext cp (%s) called", ext_cp.name)
                if ext_cp.name == cp_name:
                    return ext_cp
            self._log.debug("Failed to find cp %s in external connection points",
                            cp_name)
            return None

        def find_internal_vlr_by_cp_name(cp_name):
            """ Find the VLR corresponding to the connection point name"""
            self._log.debug("find_internal_vlr_by_cp_name(%s) called",
                            cp_name)
            # Validates that the descriptor declares the connection point;
            # raises VduRecordError otherwise.
            self.find_internal_cp_by_cp_id(cp_name)
            # return the VLR associated with the connection point
            return vnfr.find_vlr_by_cp(cp_name)

        block = xact.block_create()

        self._log.debug("Executing vm request id: %s, action: create",
                        self._request_id)

        # Resolve the networks associated external interfaces
        for ext_intf in self._vdud.external_interface:
            self._log.debug("Resolving external interface name [%s], cp[%s]",
                            ext_intf.name, ext_intf.vnfd_connection_point_ref)
            cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
            if cp is None:
                self._log.debug("Failed to find connection point - %s",
                                ext_intf.vnfd_connection_point_ref)
                continue
            self._log.debug("Connection point name [%s], type[%s]",
                            cp.name, cp.type_yang)

            vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)

            etuple = (ext_intf, cp, vlr)
            self._ext_intf.append(etuple)

            self._log.debug("Created external interface tuple : %s", etuple)

        # Resolve the networks associated internal interfaces
        for intf in self._vdud.internal_interface:
            cp_id = intf.vdu_internal_connection_point_ref
            self._log.debug("Resolving internal interface name [%s], cp[%s]",
                            intf.name, cp_id)

            try:
                vlr = find_internal_vlr_by_cp_name(cp_id)
            except Exception as e:
                self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
                msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
                raise VduRecordError(msg) from e

            ituple = (intf, cp_id, vlr)
            self._int_intf.append(ituple)

            self._log.debug("Created internal interface tuple : %s", ituple)

        resmgr_path = self.resmgr_path
        resmgr_msg = self.resmgr_msg(config)

        self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
        block.add_query_create(resmgr_path, resmgr_msg)

        res_iter = yield from block.execute(now=True)

        resp = None

        for i in res_iter:
            r = yield from i
            resp = r.result

        if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
            raise VMResourceError("Did not get a vm resource response (resp: %s)" % (resp,))
        self._log.debug("Got vm request response: %s", resp.resource_info)
        return resp.resource_info

    @asyncio.coroutine
    def delete_resource(self, xact):
        """ Ask the resource manager to delete this VDU's resource """
        block = xact.block_create()

        self._log.debug("Executing vm request id: %s, action: delete",
                        self._request_id)

        block.add_query_delete(self.resmgr_path)

        yield from block.execute(flags=0, now=True)

    @asyncio.coroutine
    def read_resource(self, xact):
        """ Read this VDU's resource info from the resource manager

        Raises VMResourceError if no usable response is received.
        """
        block = xact.block_create()

        self._log.debug("Executing vm request id: %s, action: read",
                        self._request_id)

        block.add_query_read(self.resmgr_path)

        res_iter = yield from block.execute(flags=0, now=True)

        # Stays None when the query yields no result at all
        resp = None
        for i in res_iter:
            r = yield from i
            resp = r.result

        if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
            raise VMResourceError("Did not get a vm resource response (resp: %s)" % (resp,))
        self._log.debug("Got vm request response: %s", resp.resource_info)
        return resp.resource_info

    @asyncio.coroutine
    def start_component(self):
        """ Start the VCS component referenced by the descriptor """
        self._log.debug("Starting component %s for vdud %s vdur %s",
                        self._vdud.vcs_component_ref,
                        self._vdud,
                        self._vdur_id)
        yield from self._vnfr.start_component(self._vdud.vcs_component_ref,
                                              self.vm_resp.management_ip)

    @property
    def active(self):
        """ Is this VDU active """
        return self._state is VDURecordState.READY

    @asyncio.coroutine
    def instantiation_failed(self, failed_reason=None):
        """ VDU instantiation failed; the failure is propagated to the VNFR """
        self._log.debug("VDU %s instantiation failed ", self._vdur_id)
        self._state = VDURecordState.FAILED
        self._state_failed_reason = failed_reason
        yield from self._vnfr.instantiation_failed(failed_reason)

    @asyncio.coroutine
    def vdu_is_active(self):
        """ This VDU is active

        Starts the VCS component if the descriptor references one, marks the
        VDU READY, and notifies the VNFR once all of its VDUs are active.
        """
        if self.active:
            self._log.warning("VDU %s was already marked as active", self._vdur_id)
            return

        self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)

        if self._vdud.vcs_component_ref is not None:
            yield from self.start_component()

        self._state = VDURecordState.READY

        if self._vnfr.all_vdus_active():
            self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
            yield from self._vnfr.is_ready()

    @asyncio.coroutine
    def instantiate(self, xact, vnfr, config=None):
        """ Instantiate this VDU

        Subscribes to the resource-manager response before sending the
        create request, so a response published in between is not missed.
        Any failure is logged and reported through instantiation_failed();
        it is not re-raised.
        """
        self._state = VDURecordState.INSTANTIATING

        @asyncio.coroutine
        def on_prepare(xact_info, query_action, ks_path, msg):
            """ Handle a resource-info update published by the resource manager """
            self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
                            query_action,
                            ks_path,
                            msg)

            if query_action in (rwdts.QueryAction.UPDATE, rwdts.QueryAction.CREATE):
                self._vm_resp = msg

                if msg.resource_state == "active":
                    # Move this VDU to ready state
                    yield from self.vdu_is_active()
                elif msg.resource_state == "failed":
                    yield from self.instantiation_failed(msg.resource_errors)
            elif query_action == rwdts.QueryAction.DELETE:
                self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
            else:
                raise NotImplementedError(
                    "%s action on VirtualDeployementUnitRecord not supported" %
                    (query_action,))

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        try:
            reg_event = asyncio.Event(loop=self._loop)

            @asyncio.coroutine
            def on_ready(regh, status):
                reg_event.set()

            handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
            self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
                                                          flags=rwdts.Flag.SUBSCRIBER,
                                                          handler=handler)
            yield from reg_event.wait()

            vm_resp = yield from self.create_resource(xact, vnfr, config)
            self._vm_resp = vm_resp

            self._state = VDURecordState.RESOURCE_ALLOC_PENDING
            self._log.debug("Requested VM from resource manager response %s",
                            vm_resp)
            if vm_resp.resource_state == "active":
                self._log.debug("Resourcemgr responded wih an active vm resp %s",
                                vm_resp)
                yield from self.vdu_is_active()
                self._state = VDURecordState.READY
            elif vm_resp.resource_state in ("pending", "inactive"):
                # The subscription registered above moves the VDU forward
                # once the resource manager publishes the final state.
                self._log.debug("Resourcemgr responded wih a pending vm resp %s",
                                vm_resp)
            else:
                self._log.debug("Resourcemgr responded wih an error vm resp %s",
                                vm_resp)
                raise VirtualDeploymentUnitRecordError(
                    "Failed VDUR instantiation %s " % vm_resp)

        except Exception as e:
            # log.exception records the traceback; instantiation_failed()
            # moves this VDU to FAILED and informs the VNFR.
            self._log.exception("Instantiation of VDU record failed: %s", str(e))
            yield from self.instantiation_failed(str(e))
977
978
class VlRecordState(enum.Enum):
    """Lifecycle states of a virtual link record."""
    # Bring-up path
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    # Tear-down path
    TERMINATE_PENDING = 104
    TERMINATED = 105
    # Error state
    FAILED = 106
987
988
989 class InternalVirtualLinkRecord(object):
990 """ Internal Virtual Link record """
def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name, ip_profile=None):
    """ Store the collaborators and build the VLR request for this internal VL """
    self._dts, self._log, self._loop = dts, log, loop

    # Descriptor level inputs; create_vlr() below reads all of them
    self._ivld_msg = ivld_msg
    self._vnfr_name = vnfr_name
    self._cloud_account_name = cloud_account_name
    self._ip_profile = ip_profile

    # The request is built once here; the resulting VLR is only known
    # after instantiate() has run
    self._vlr_req = self.create_vlr()
    self._vlr = None
    self._state = VlRecordState.INIT
1003
@property
def vlr_id(self):
    """ Id carried by the VLR request built for this VL """
    request = self._vlr_req
    return request.id
1008
@property
def name(self):
    """ Name of this VL: the VIM network name when the descriptor sets one,
    otherwise "<vnfr name>.<vld name>" """
    vim_name = self._ivld_msg.vim_network_name
    if not vim_name:
        return self._vnfr_name + "." + self._ivld_msg.name
    return vim_name
1016
@property
def network_id(self):
    """ Network id of the VLR, or None while no VLR is known """
    if not self._vlr:
        return None
    return self._vlr.network_id
1021
def vlr_path(self):
    """ Xpath of this VLR in the VLR catalog, keyed by vlr_id """
    template = "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']"
    return template.format(self.vlr_id)
1025
def create_vlr(self):
    """ Build the VLR message that instantiate() will later create

    The record gets a fresh uuid, this VL's name and cloud account, the
    ip profile parameters when an ip profile carrying them was supplied,
    and a fixed set of fields copied from the internal VLD.
    """
    copied_fields = ("short_name",
                     "vendor",
                     "description",
                     "version",
                     "type_yang",
                     "vim_network_name",
                     "provider_network")

    from_descriptor = {}
    for field, value in self._ivld_msg.as_dict().items():
        if field in copied_fields:
            from_descriptor[field] = value

    record = {
        "id": str(uuid.uuid4()),
        "name": self.name,
        "cloud_account": self._cloud_account_name,
    }

    profile = self._ip_profile
    if profile and profile.has_field('ip_profile_params'):
        record['ip_profile_params'] = profile.ip_profile_params.as_dict()

    # Descriptor values are applied last, as before
    record.update(from_descriptor)

    return RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(record)
1051
@asyncio.coroutine
def instantiate(self, xact, restart_mode=False):
    """ Instantiate VL

    Arguments:
        xact         - transaction supplied by the caller; not used here,
                       the create runs in a DTS transaction of its own
        restart_mode - when True, an already published VLR is looked up
                       first and a new one is created only if none exists

    Raises:
        VnfrInstantiationFailed when the create returns no VLR or a VLR
        whose operational status is 'failed'. An exception raised while
        executing the create query is re-raised after the record has been
        marked FAILED.
    """

    @asyncio.coroutine
    def instantiate_vlr():
        """ Instantiate VLR"""
        self._log.debug("Create VL with xpath %s and vlr %s",
                        self.vlr_path(), self._vlr_req)

        # Named vl_xact so that the xact argument of instantiate() is not
        # shadowed
        with self._dts.transaction(flags=0) as vl_xact:
            block = vl_xact.block_create()
            block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
            self._log.debug("Executing VL create path:%s msg:%s",
                            self.vlr_path(), self._vlr_req)

            res_iter = None
            try:
                res_iter = yield from block.execute()
            except Exception:
                self._state = VlRecordState.FAILED
                self._log.exception("Caught exception while instantial VL")
                raise

            for ent in res_iter:
                res = yield from ent
                self._vlr = res.result

        # An empty result used to end in an AttributeError on None; report
        # it as an instantiation failure instead
        if self._vlr is None:
            self._log.debug("VL creation returned no VLR for vlr id %s", self.vlr_id)
            self._state = VlRecordState.FAILED
            raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self.vlr_id))

        if self._vlr.operational_status == 'failed':
            self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
            self._state = VlRecordState.FAILED
            raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))

        self._log.info("Created VL with xpath %s and vlr %s",
                       self.vlr_path(), self._vlr)

    @asyncio.coroutine
    def get_vlr():
        """ Read the published VLR; returns None when there is none """
        res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
        vlr = None
        for ent in res_iter:
            res = yield from ent
            vlr = res.result

        if vlr is None:
            # Not an error here: the caller falls back to creating the VLR
            self._log.warn("Failed to get VLR for path %s" % self.vlr_path())
        return vlr

    self._state = VlRecordState.INSTANTIATION_PENDING

    if restart_mode:
        vl = yield from get_vlr()
        if vl is None:
            yield from instantiate_vlr()
        else:
            # Keep the VLR that was found so that network_id is available
            # after a restart
            self._vlr = vl
    else:
        yield from instantiate_vlr()

    self._state = VlRecordState.ACTIVE
1113
def vlr_in_vns(self):
    """ True when this record is in a state in which a VLR may exist in VNS """
    states_with_vlr = (VlRecordState.ACTIVE,
                       VlRecordState.INSTANTIATION_PENDING,
                       VlRecordState.FAILED)
    return self._state in states_with_vlr
1122
@asyncio.coroutine
def terminate(self, xact):
    """ Delete the VLR of this VL using the supplied transaction

    Nothing is done when vlr_in_vns() reports that no VLR can exist.
    """
    if self.vlr_in_vns():
        path = self.vlr_path()
        self._log.debug("Terminating VL with path %s", path)
        self._state = VlRecordState.TERMINATE_PENDING

        delete_block = xact.block_create()
        delete_block.add_query_delete(path)
        yield from delete_block.execute(flags=0, now=True)

        self._state = VlRecordState.TERMINATED
        self._log.debug("Terminated VL with path %s", path)
    else:
        self._log.debug("Ignoring terminate request for id %s in state %s",
                        self.vlr_id, self._state)
1138
1139
1140 class VirtualNetworkFunctionRecord(object):
1141 """ Virtual Network Function Record """
def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
    """ Set up an empty record around the supplied VNFR message """
    # Collaborators handed in by the creator
    self._dts, self._log, self._loop = dts, log, loop
    self._cluster_name = cluster_name
    self._vnfm = vnfm
    self._vcs_handler = vcs_handler
    self._mgmt_network = mgmt_network

    # The incoming message is kept under two attribute names
    self._vnfr_msg = vnfr_msg
    self._vnfr = vnfr_msg
    self._vnfr_id = vnfr_msg.id
    self._vnfd_id = vnfr_msg.vnfd.id
    self._vnfd = vnfr_msg.vnfd
    self._config_status = vnfr_msg.config_status

    # Lifecycle
    self._state = VirtualNetworkFunctionRecordState.INIT
    self._state_failed_reason = None
    self._create_time = int(time.time())

    # Child records, filled in during instantiate()
    self._ext_vlrs = {}     # external VLRs keyed by vlr id
    self._vlrs = []         # internal virtual link records
    self._vdus = []         # VDU records
    self._vlr_by_cp = {}    # internal connection point id -> VL record
    self._cprs = []         # connection point records
    self._inventory = {}    # components keyed by name
    self._vnf_mon = None

    self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
    self._rw_vnfd = None
    self._vnfd_ref_count = 0
1170
def _get_vdur_from_vdu_id(self, vdu_id):
    """ Return the VDU record whose vdu_id equals the given id

    Raises:
        VDURecordNotFound when no VDU record of this VNFR has that id
    """
    self._log.debug("Finding vdur for vdu_id %s", vdu_id)
    self._log.debug("Searching through vdus: %s", self._vdus)
    for vdu in self._vdus:
        self._log.debug("vdu_id: %s", vdu.vdu_id)
        if vdu.vdu_id == vdu_id:
            return vdu

    # Exceptions do not interpolate logging style arguments, so the id has
    # to be formatted into the message explicitly
    raise VDURecordNotFound("Could not find vdu record from id: %s" % vdu_id)
1180
@property
def operational_status(self):
    """ Operational status string published for the current state

    The lookup is done on the name of the state; a name without an entry
    raises KeyError.
    """
    status_by_state = dict((
        ("INIT", "init"),
        ("VL_INIT_PHASE", "vl_init_phase"),
        ("VM_INIT_PHASE", "vm_init_phase"),
        ("READY", "running"),
        ("TERMINATE", "terminate"),
        ("VL_TERMINATE_PHASE", "vl_terminate_phase"),
        ("VDU_TERMINATE_PHASE", "vm_terminate_phase"),
        ("TERMINATED", "terminated"),
        ("FAILED", "failed"),
    ))
    state_name = self._state.name
    return status_by_state[state_name]
1194
@staticmethod
def vnfd_xpath(vnfd_id):
    """ Config xpath of the VNFD with the given id """
    template = "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']"
    return template.format(vnfd_id)
1199
@property
def vnfd_ref_count(self):
    """ Current number of references held on the VNFD by this VNFR """
    count = self._vnfd_ref_count
    return count
1204
def vnfd_in_use(self):
    """ True while at least one reference is held on the VNFD """
    if self._vnfd_ref_count > 0:
        return True
    return False
1208
def vnfd_ref(self):
    """ Take one reference on the VNFD and return the new count """
    self._vnfd_ref_count = self._vnfd_ref_count + 1
    return self._vnfd_ref_count
1213
def vnfd_unref(self):
    """ Release one reference on the VNFD and return the new count

    Raises:
        VnfRecordError when no reference is currently held
    """
    if self._vnfd_ref_count >= 1:
        self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
                        self.vnfd.id, self._vnfd_ref_count)
        self._vnfd_ref_count -= 1
        return self._vnfd_ref_count

    # Releasing more often than referencing is a programming error
    msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
           (self.vnfd.id, self._vnfd_ref_count))
    self._log.critical(msg)
    raise VnfRecordError(msg)
1225
@property
def vnfd(self):
    """ The VNFD message taken from the VNFR message at construction """
    descriptor = self._vnfd
    return descriptor
1230
@property
def vnf_name(self):
    """ Name field of the VNFD of this VNFR """
    descriptor = self.vnfd
    return descriptor.name
1235
@property
def name(self):
    """ Name field of the VNFR message """
    record = self._vnfr
    return record.name
1240
@property
def cloud_account_name(self):
    """ Cloud account field of the VNFR message """
    record = self._vnfr
    return record.cloud_account
1245
@property
def vnfd_id(self):
    """ Id field of the VNFD of this VNFR """
    descriptor = self.vnfd
    return descriptor.id
1250
@property
def vnfr_id(self):
    """ Id of this VNFR, as read from the VNFR message at construction """
    record_id = self._vnfr_id
    return record_id
1255
@property
def member_vnf_index(self):
    """ member_vnf_index_ref field of the VNFR message """
    record = self._vnfr
    return record.member_vnf_index_ref
1260
@property
def config_status(self):
    """ Config status stored on this VNFR """
    status = self._config_status
    return status
1265
def component_by_name(self, component_name):
    """ Look up an inventory component by its unmangled name

    The name is mangled with the VNF name and VNFD id before the lookup;
    a missing entry raises KeyError.
    """
    inventory_key = VcsComponent.mangle_name(component_name,
                                             self.vnf_name,
                                             self.vnfd_id)
    return self._inventory[inventory_key]
1272
1273
1274
@asyncio.coroutine
def get_nsr_config(self):
    """ Return the NSR config entry this VNFR belongs to, or None

    Reads the NS instance configuration and returns the entry whose id
    equals nsr_id_ref of the VNFR message.
    """
    # Need access to NS instance configuration for runtime resolution.
    # This shall be replaced when deployment flavors are implemented
    xpath = "C,/nsr:ns-instance-config"
    query_results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)

    for pending in query_results:
        reply = yield from pending
        for nsr_cfg in reply.result.nsr:
            if nsr_cfg.id == self._vnfr_msg.nsr_id_ref:
                return nsr_cfg
    return None
1289
@asyncio.coroutine
def start_component(self, component_name, ip_addr):
    """ Start the named inventory component with the given ip address """
    component = self.component_by_name(component_name)
    yield from component.start(None, None, ip_addr)
1295
def cp_ip_addr(self, cp_name):
    """ Ip address of the first connection point record with the given name
    that has an address; "0.0.0.0" when there is none """
    self._log.debug("cp_ip_addr()")
    addresses = (cpr.ip_address
                 for cpr in self._cprs
                 if cpr.name == cp_name and cpr.ip_address is not None)
    return next(addresses, "0.0.0.0")
1303
def mgmt_intf_info(self):
    """ Return (ip address, port) of the management interface

    The address comes from the connection point named in the descriptor,
    else from the VDU record named in the descriptor (None when that VDU
    record does not exist), else from the descriptor's own ip address.
    """
    descriptor = self.vnfd.mgmt_interface

    if descriptor.has_field("cp"):
        address = self.cp_ip_addr(descriptor.cp)
    elif descriptor.has_field("vdu_id"):
        try:
            address = self._get_vdur_from_vdu_id(descriptor.vdu_id).management_ip
        except VDURecordNotFound:
            self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
            address = None
    else:
        address = descriptor.ip_address

    return address, descriptor.port
1322
@property
def msg(self):
    """ Build the VNFR message describing the current state of this record

    A new message is assembled on every access from the VNFD, the
    management interface information and the child records held here.
    """
    copied_vnfd_fields = ("short_name", "vendor", "description", "version")
    from_vnfd = {}
    for field, value in self.vnfd.as_dict().items():
        if field in copied_vnfd_fields:
            from_vnfd[field] = value

    # Management interface: only known values are set
    mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
    ip_address, port = self.mgmt_intf_info()
    if ip_address is not None:
        mgmt_intf.ip_address = ip_address
    if port is not None:
        mgmt_intf.port = port

    record = {
        "id": self._vnfr_id,
        "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
        "name": self.name,
        "member_vnf_index_ref": self.member_vnf_index,
        "operational_status": self.operational_status,
        "operational_status_details": self._state_failed_reason,
        "cloud_account": self.cloud_account_name,
        "config_status": self._config_status,
    }
    record.update(from_vnfd)

    vnfr_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(record)
    vnfr_msg.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())

    vnfr_msg.create_time = self._create_time
    vnfr_msg.uptime = int(time.time()) - self._create_time
    vnfr_msg.mgmt_interface = mgmt_intf

    # References to the internal VLRs
    for vl_record in self._vlrs:
        vnfr_msg.internal_vlr.add().vlr_ref = vl_record.vlr_id

    # One VDUR entry per VDU record
    if self._vdus is not None:
        for vdu_record in self._vdus:
            vnfr_msg.vdur.add().from_dict(vdu_record.msg.as_dict())

    if self.vnfd.mgmt_interface.has_field('dashboard_params'):
        vnfr_msg.dashboard_url = self.dashboard_url

    # Connection point records
    for cpr in self._cprs:
        vnfr_msg.connection_point.append(
            VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict()))

    # Monitoring parameters, once a monitor is attached
    if self._vnf_mon is not None:
        for monp in self._vnf_mon.msg:
            vnfr_msg.monitoring_param.append(
                VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))

    if self._vnfr.vnf_configuration is not None:
        vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
        config_access = vnfr_msg.vnf_configuration.config_access
        # Fill in the management ip only when the configuration has none
        if ip_address is not None and config_access.mgmt_ip_address is None:
            vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address

    for group in self._vnfr_msg.placement_groups_info:
        placement = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
        placement.from_dict(group.as_dict())
        vnfr_msg.placement_groups_info.append(placement)

    return vnfr_msg
1391
@property
def dashboard_url(self):
    """ Dashboard URL built from the management ip and the dashboard params

    http on port 80 is used unless the params set https to True (https on
    443); an explicit port in the params overrides either default.
    """
    ip, cfg_port = self.mgmt_intf_info()
    params = self.vnfd.mgmt_interface.dashboard_params

    if params.has_field('https') and params.https is True:
        protocol, http_port = 'https', 443
    else:
        protocol, http_port = 'http', 80

    if params.has_field('port'):
        http_port = params.port

    return "{protocol}://{ip_address}:{port}/{path}".format(
        protocol=protocol,
        ip_address=ip,
        port=http_port,
        path=params.path.lstrip("/"),
    )
1412
@property
def xpath(self):
    """ Xpath of this VNFR in the VNFR catalog, keyed by vnfr_id """
    template = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id='{}']"
    return template.format(self.vnfr_id)
1418
@asyncio.coroutine
def publish(self, xact):
    """ publish this VNFR

    Arguments:
        xact - transaction to publish in; passed through to the VNFM as is
               (callers in this class also pass None)
    """
    # self.msg assembles a complete new message on every access, so build
    # it once and use that same message for logging and publishing
    vnfr = self.msg
    path = self.xpath
    self._log.debug("Publishing VNFR path = [%s], record = [%s]",
                    path, vnfr)
    vnfr.create_time = self._create_time
    yield from self._vnfm.publish_vnfr(xact, path, vnfr)
    self._log.debug("Published VNFR path = [%s], record = [%s]",
                    path, vnfr)
1429
def resolve_vld_ip_profile(self, vnfd_msg, vld):
    """ Return the ip profile of the VNFD that the VLD references by name,
    or None when the VLD has no reference or no profile has that name """
    self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
    if vld.has_field('ip_profile_ref'):
        for candidate in vnfd_msg.ip_profiles:
            if candidate.name == vld.ip_profile_ref:
                return candidate
    return None
1436
@asyncio.coroutine
def create_vls(self):
    """ Create a record for every internal VLD of the VNFD

    Each record is appended to the list of internal VL records and bound
    to the internal connection points of its VLD.

    Raises:
        InternalVirtualLinkRecordError when a connection point is already
        bound to a VL record
    """
    self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)
    for ivld_msg in self.vnfd.internal_vld:
        self._log.debug("Creating internal vld: %s, int_cp_ref = %s",
                        ivld_msg, ivld_msg.internal_connection_point)

        vl_record = InternalVirtualLinkRecord(
            dts=self._dts,
            log=self._log,
            loop=self._loop,
            ivld_msg=ivld_msg,
            vnfr_name=self.name,
            cloud_account_name=self.cloud_account_name,
            ip_profile=self.resolve_vld_ip_profile(self.vnfd, ivld_msg),
        )
        self._vlrs.append(vl_record)

        for int_cp in ivld_msg.internal_connection_point:
            cp_ref = int_cp.id_ref
            if cp_ref in self._vlr_by_cp:
                msg = ("Connection point %s already "
                       " bound %s" % (cp_ref, self._vlr_by_cp[cp_ref]))
                raise InternalVirtualLinkRecordError(msg)
            self._log.debug("Setting vlr %s to internal cp = %s",
                            vl_record, cp_ref)
            self._vlr_by_cp[cp_ref] = vl_record
1465
@asyncio.coroutine
def instantiate_vls(self, xact, restart_mode=False):
    """ Instantiate the internal VL records one after the other """
    self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)

    for vl_record in self._vlrs:
        self._log.debug("Instantiating VLR %s", vl_record)
        yield from vl_record.instantiate(xact, restart_mode)
1475
def find_vlr_by_cp(self, cp_name):
    """ Return the VL record bound to the connection point; an unbound
    connection point raises KeyError """
    bindings = self._vlr_by_cp
    return bindings[cp_name]
1479
def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
    """
    Returns the cloud specific construct for placement group, or None
    when the NSR config holds no map entry for it
    Arguments:
        input_group: VNFD PlacementGroup
        nsr_config: Configuration for VNFDGroup MAP in the NSR config
    """
    # Taken from the VNFD group rather than from the NSR map entry
    descriptor_fields = ['name', 'requirement', 'strategy']
    reference_keys = ('placement_group_ref', 'vnfd_id_ref')

    for mapping in nsr_config.vnfd_placement_group_maps:
        if mapping.placement_group_ref != input_group.name:
            continue
        if mapping.vnfd_id_ref != self.vnfd_id:
            continue

        resolved = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
        fields = {}
        for key, value in mapping.as_dict().items():
            if key not in reference_keys:
                fields[key] = value
        for field in descriptor_fields:
            fields[field] = getattr(input_group, field)
        resolved.from_dict(fields)
        return resolved

    return None
1500
@asyncio.coroutine
def get_vdu_placement_groups(self, vdu):
    """ Collect the placement groups that apply to the given VDU

    Arguments:
        vdu - VDU descriptor; its id is matched against the member VDUs
              of the VNFD placement groups

    Returns:
        The VNF level placement groups of the VNFR message followed by
        every VDU level group whose cloud construct could be resolved.
        Groups that cannot be resolved are logged and left out.
    """
    placement_groups = []
    ### Step-1: Get VNF level placement groups
    for group in self._vnfr_msg.placement_groups_info:
        placement_groups.append(group)

    ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
    nsr_config = yield from self.get_nsr_config()

    ### Step-3: Get VDU level placement groups
    for group in self.vnfd.placement_groups:
        for member_vdu in group.member_vdus:
            if member_vdu.member_vdu_ref != vdu.id:
                continue

            # get_nsr_config() returns None when no NSR config matches this
            # VNFR; without a config nothing can be resolved
            group_info = None
            if nsr_config is not None:
                group_info = self.resolve_placement_group_cloud_construct(group,
                                                                          nsr_config)
            if group_info is None:
                self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
            else:
                self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
                               str(group_info),
                               vdu.name,
                               self.vnf_name,
                               self.member_vnf_index)
                placement_groups.append(group_info)

    return placement_groups
1531
@asyncio.coroutine
def create_vdus(self, vnfr, restart_mode=False):
    """ Create a VDU record for every VDU of the fetched VNFD

    Each record is registered for operational data and appended to the
    list of VDU records of this VNFR.
    """

    def get_vdur_id(vdud):
        """Get the corresponding VDUR's id for the VDUD. This is useful in
        case of a restart.

        In restart mode the id of an existing VDUR is reused when there is
        one, so that no duplicate VDURs are created. Returns None in all
        other cases.
        """
        if not restart_mode or vdud is None:
            return None

        existing_ids = [vdur.id for vdur in vnfr._vnfr.vdur if vdur.vdu_id_ref == vdud.id]
        if existing_ids:
            return existing_ids[0]

        self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
        return None

    self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
    for vdud in self._rw_vnfd.vdu:
        self._log.debug("Creating vdu: %s", vdud)
        reused_id = get_vdur_id(vdud)

        groups = yield from self.get_vdu_placement_groups(vdud)
        self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
                       vdud.name,
                       self.vnf_name,
                       self.member_vnf_index,
                       [group.name for group in groups])

        vdu_record = VirtualDeploymentUnitRecord(
            dts=self._dts,
            log=self._log,
            loop=self._loop,
            vdud=vdud,
            vnfr=vnfr,
            mgmt_intf=self.has_mgmt_interface(vdud),
            mgmt_network=self._mgmt_network,
            cloud_account_name=self.cloud_account_name,
            vnfd_package_store=self._vnfd_package_store,
            vdur_id=reused_id,
            placement_groups=groups,
        )
        yield from vdu_record.vdu_opdata_register()

        self._vdus.append(vdu_record)
1583
@asyncio.coroutine
def instantiate_vdus(self, xact, vnfr):
    """ Instantiate the VDUs associated with this VNF

    Arguments:
        xact - transaction supplied by the caller; every VDU is
               instantiated in a DTS transaction of its own
        vnfr - the VNFR handed to each VDU record's instantiate()

    A VDU whose cloud-init script references another VDU through
    "{{ vdu[<id>]... }}" is instantiated only after that VDU has reached a
    terminal state. One monitor task and one instantiate task are scheduled
    per VDU; this coroutine returns once they are scheduled. Errors found
    while preparing a VDU are reported through instantiation_failed().
    """
    self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)

    lookup = {vdu.vdu_id: vdu for vdu in self._vdus}

    # Identify any dependencies among the VDUs
    dependencies = collections.defaultdict(list)
    vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")

    for vdu in self._vdus:
        if vdu.vdud_cloud_init is not None:
            for vdu_id in vdu_id_pattern.findall(vdu.vdud_cloud_init):
                if vdu_id == vdu.vdu_id:
                    continue
                if vdu_id not in lookup:
                    # A bare KeyError here would be lost, because this
                    # coroutine is run as a task nobody waits for
                    err = "VDU {} refers to unknown VDU {} in its cloud-init script".format(
                        vdu.vdu_id, vdu_id)
                    self._log.error(err)
                    yield from self.instantiation_failed(err)
                    return
                # This means that vdu.vdu_id depends upon vdu_id,
                # i.e. vdu_id must be instantiated before
                # vdu.vdu_id.
                dependencies[vdu.vdu_id].append(lookup[vdu_id])

    # Define the terminal states of VDU instantiation
    terminal = (
        VDURecordState.READY,
        VDURecordState.TERMINATED,
        VDURecordState.FAILED,
    )

    datastore = VdurDatastore()
    processed = set()

    @asyncio.coroutine
    def instantiate_monitor(vdu):
        """Monitor the state of the VDU during instantiation

        Arguments:
            vdu - a VirtualDeploymentUnitRecord

        """
        # wait for the VDUR to enter a terminal state
        while vdu._state not in terminal:
            yield from asyncio.sleep(1, loop=self._loop)

        # update the datastore
        datastore.update(vdu)

        # add the VDU to the set of processed VDUs
        processed.add(vdu.vdu_id)

    def render_cloud_init(vdu):
        """Return the cloud-init script of the VDU with every
        "{{ vdu... }}" variable replaced by its datastore value

        Raises:
            ValueError for an unrecognized variable or a variable without
            a value in the datastore
        """
        config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""

        parts = re.split(r"\{\{ ([^\}]+) \}\}", config)
        if len(parts) <= 1:
            return config

        # The odd entries of the split are the captured variable names
        variables = [part.lstrip('{{').rstrip('}}').strip() for part in parts[1::2]]

        for variable in variables:
            if variable.startswith('vdu['):
                # Reference to a VDU by ID
                key = variable
            elif variable.startswith('vdu'):
                # Reference to the current VDU
                key = 'vdu[{}]'.format(vdu.vdu_id) + variable[3:]
            else:
                msg = 'unrecognized cloud-config variable: {}'
                raise ValueError(msg.format(variable))

            value = datastore.get(key)
            if value is None:
                # str.replace() cannot take None, so both kinds of
                # reference need this check
                msg = "Unable to find a substitute for {} in {} cloud-init script"
                raise ValueError(msg.format(variable, vdu.vdu_id))

            config = config.replace("{{ %s }}" % variable, value)

        return config

    @asyncio.coroutine
    def instantiate(vdu):
        """Instantiate the specified VDU

        Arguments:
            vdu - a VirtualDeploymentUnitRecord

        If a VDU this VDU depends upon did not become active, or the
        cloud-init script cannot be rendered, the VNFR is marked as failed
        and the VDU is not instantiated.

        """
        try:
            for dependency in dependencies[vdu.vdu_id]:
                self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))

                while dependency.vdu_id not in processed:
                    yield from asyncio.sleep(1, loop=self._loop)

                if not dependency.active:
                    raise VirtualDeploymentUnitRecordError(
                        "VDU {} depends on VDU {} which is not active".format(
                            vdu.vdu_id, dependency.vdu_id))

            self._log.debug('instantiating {}'.format(vdu.vdu_id))

            # Populate the datastore with the current values of the VDU
            datastore.add(vdu)

            # Substitute any variables contained in the cloud config script
            config = render_cloud_init(vdu)
        except Exception as e:
            # This coroutine runs as a task nobody waits for; an exception
            # leaving it would go unnoticed and the VNFR would never be
            # marked as failed
            self._log.exception("Unable to prepare instantiation of vdu %s", vdu.vdu_id)
            yield from self.instantiation_failed(str(e))
            return

        # Instantiate the VDU
        with self._dts.transaction() as vdu_xact:
            self._log.debug("Instantiating vdu: %s", vdu)
            yield from vdu.instantiate(vdu_xact, vnfr, config=config)
            if self._state == VirtualNetworkFunctionRecordState.FAILED:
                self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
                                self.vnfr_id, vdu)

    # First create a set of tasks to monitor the state of the VDUs and
    # report when they have entered a terminal state
    for vdu in self._vdus:
        self._loop.create_task(instantiate_monitor(vdu))

    for vdu in self._vdus:
        self._loop.create_task(instantiate(vdu))
1709
def has_mgmt_interface(self, vdu):
    """ True when the management interface of the VNFD names this VDU """
    # TODO: Support additional mgmt_interface type options
    mgmt_vdu_id = self.vnfd.mgmt_interface.vdu_id
    return True if mgmt_vdu_id == vdu.id else False
1715
def vlr_xpath(self, vlr_id):
    """ Xpath of the VLR with the given id in the VLR catalog """
    template = "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']"
    return template.format(vlr_id)
1721
def ext_vlr_by_id(self, vlr_id):
    """ Return the external VLR stored under the id; an unknown id raises
    KeyError """
    external = self._ext_vlrs
    return external[vlr_id]
1725
@asyncio.coroutine
def publish_inventory(self, xact):
    """ Create and publish an inventory component for every component of
    the fetched VNFD

    Publishing stops at the first component whose name is already in the
    inventory.
    """
    self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)

    for component in self._rw_vnfd.component:
        self._log.debug("Creating inventory component %s", component)
        inventory_name = VcsComponent.mangle_name(component.component_name,
                                                  self.vnf_name,
                                                  self.vnfd_id)
        vcs_component = VcsComponent(dts=self._dts,
                                     log=self._log,
                                     loop=self._loop,
                                     cluster_name=self._cluster_name,
                                     vcs_handler=self._vcs_handler,
                                     component=component,
                                     mangled_name=inventory_name)

        if vcs_component.name in self._inventory:
            self._log.debug("Duplicate entries in inventory %s for vnfr %s",
                            component, self._vnfd_id)
            return

        self._log.debug("Adding component %s for vnrf %s",
                        vcs_component.name, self._vnfr_id)
        self._inventory[vcs_component.name] = vcs_component
        yield from vcs_component.publish(xact)
1753
def all_vdus_active(self):
    """ True when every VDU record of this VNFR reports active """
    if not all(vdu.active for vdu in self._vdus):
        return False

    self._log.debug("Inside all_vdus_active. Returning True")
    return True
1762
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """ Mark this VNFR as FAILED, remember the reason and republish it """
    self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.FAILED)
    self._state_failed_reason = failed_reason

    # Make the new status visible in the published VNFR
    yield from self.publish(None)
1772
@asyncio.coroutine
def is_ready(self):
    """ Move this VNFR to READY, unless it has already FAILED, and
    republish it """
    self._log.debug("VNFR id %s is ready", self.vnfr_id)

    if self._state == VirtualNetworkFunctionRecordState.FAILED:
        self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
    else:
        self.set_state(VirtualNetworkFunctionRecordState.READY)

    # Make the status visible in the published VNFR
    yield from self.publish(None)
1786
def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
    """ Update the connection point record named cp_name with its ip
    address, mac address and connection point id

    Raises:
        VirtualDeploymentUnitRecordError when this VNFR has no connection
        point record with that name
    """
    for cp in self._cprs:
        if cp.name == cp_name:
            self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
                            cp_name, cp, ip_address, cp_id)
            cp.ip_address = ip_address
            cp.mac_address = mac_addr
            cp.connection_point_id = cp_id
            return

    # Report the name that was asked for. The loop variable would name the
    # last record looked at, and does not exist at all for an empty list.
    err = "No connection point %s found in VNFR id %s" % (cp_name, self._vnfr_id)
    self._log.debug(err)
    raise VirtualDeploymentUnitRecordError(err)
1801
def set_state(self, state):
    """ Replace the current state of this VNFR with the given one """
    new_state = state
    self._state = new_state
1805
@asyncio.coroutine
def instantiate(self, xact, restart_mode=False):
    """ Instantiate this VNF

    Fetches the VNFD and the external VLRs, publishes the inventory,
    creates and instantiates the internal VLs, creates the VDU records and
    schedules their instantiation. The VNFR is republished after the main
    steps. A failure while instantiating the VLs marks the VNFR as failed
    and ends the instantiation.
    """
    self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
    self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)

    @asyncio.coroutine
    def fetch_vlrs():
        """ Create a connection point record for every connection point
        of the VNFR and fetch the VLR it refers to """

        def cpr_from_cp(cp):
            """ Creates a record level connection point from the descriptor cp"""
            cp_fields = ["name", "image", "vm-flavor", "port_security_enabled"]
            cpr_dict = {}
            for key, value in cp.as_dict().items():
                if key in cp_fields:
                    cpr_dict[key] = value
            return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)

        self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
                        self._vnfr_id, self._vnfr.connection_point)

        for cp in self._vnfr.connection_point:
            cpr = cpr_from_cp(cp)
            self._cprs.append(cpr)
            self._log.debug("Adding Connection point record %s ", cp)

            vlr_path = self.vlr_xpath(cp.vlr_ref)
            self._log.debug("Fetching VLR with path = %s", vlr_path)
            replies = yield from self._dts.query_read(vlr_path,
                                                      rwdts.XactFlag.MERGE)
            for pending in replies:
                reply = yield from pending
                ext_vlr = reply.result
                self._ext_vlrs[cp.vlr_ref] = ext_vlr
                cpr.vlr_ref = cp.vlr_ref
                self._log.debug("Fetched VLR [%s] with path = [%s]", ext_vlr, vlr_path)

    # Increase the VNFD reference count
    self.vnfd_ref()

    assert self.vnfd

    # Fetch External VLRs
    self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
    yield from fetch_vlrs()

    # Publish inventory
    self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
    yield from self.publish_inventory(xact)

    # Create the internal VL records
    self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
    yield from self.create_vls()

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    # instantiate VLs
    self._log.debug("VNFR-ID %s: Instantiate VLs", self._vnfr_id)
    try:
        yield from self.instantiate_vls(xact, restart_mode)
    except Exception as e:
        self._log.exception("VL instantiation failed (%s)", str(e))
        yield from self.instantiation_failed(str(e))
        return

    self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)

    # Create the VDU records
    self._log.debug("VNFR-ID %s: Create VDUs", self._vnfr_id)
    yield from self.create_vdus(self, restart_mode)

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    # Instantiate the VDUs in a task of their own
    # ToDo: Check if this should be prevented during restart
    self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
    _ = self._loop.create_task(self.instantiate_vdus(xact, self))

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)

    # create task updating uptime for this vnfr
    self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
    self._loop.create_task(self.vnfr_uptime_update(xact))
1899
@asyncio.coroutine
def terminate(self, xact):
    """ Terminate this virtual network function

    Monitoring is stopped first, then the internal VLs and after them the
    VDUs are terminated, each group under its own state.
    """
    self._log.debug("Terminatng VNF id %s", self.vnfr_id)

    self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)

    # stop monitoring
    monitor = self._vnf_mon
    if monitor is not None:
        monitor.stop()
        monitor.deregister()
        self._vnf_mon = None

    # Terminate VLs in this VNF
    self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
    for vl_record in self._vlrs:
        yield from vl_record.terminate(xact)

    # Terminate VDUS in this VNF
    self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
    for vdu_record in self._vdus:
        yield from vdu_record.terminate(xact)

    self._log.debug("Terminated VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1936
@asyncio.coroutine
def vnfr_uptime_update(self, xact):
    """ Republish this VNFR every two seconds while it is still live

    The loop ends as soon as the record's state is anything other than
    INIT, VL_INIT_PHASE, VM_INIT_PHASE or READY (e.g. FAILED, TERMINATED).

    Arguments:
        xact - transaction handed to every publish call
    """
    live_states = (
        VirtualNetworkFunctionRecordState.INIT,
        VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
        VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
        VirtualNetworkFunctionRecordState.READY,
    )

    while self._state in live_states:
        yield from self.publish(xact)
        yield from asyncio.sleep(2, loop=self._loop)
1948
1949
1950
class VnfdDtsHandler(object):
    """ DTS handler for VNFD config changes

    Subscribes to the VNFD catalog. A VNFD delete seen in the prepare
    callback is forwarded to the VNF manager, and is refused while the
    manager reports the VNFD as in use.
    """
    XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"

    def __init__(self, dts, log, loop, vnfm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm
        # Set by register()
        self._regh = None

    # NOTE(review): the other DTS handlers in this file expose regh as a
    # property; here it is a coroutine - confirm this is intended
    @asyncio.coroutine
    def regh(self):
        """ DTS registration handle """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register for VNFD configuration"""

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration"""
            self._log.debug(
                "Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
                xact, action, scratch)

            # Computed but not used any further in this callback
            is_recovery = (xact.xact is None and
                           action == rwdts.AppconfAction.INSTALL)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ on prepare callback """
            self._log.debug(
                "Got on prepare for VNFD (path: %s) (action: %s)",
                ks_path.to_xpath(RwVnfmYang.get_schema()), msg)

            field_ref = ProtobufC.FieldReference.alloc()
            field_ref.goto_whole_message(msg.to_pbcm())

            # Deletes are handled here, in the prepare callback
            if field_ref.is_field_deleted():
                self._log.debug("Deleting VNFD with id %s", msg.id)

                # A VNFD that is still referenced must not go away
                if self._vnfm.vnfd_in_use(msg.id):
                    self._log.debug("Cannot delete VNFD in use - %s", msg)
                    err = "Cannot delete a VNFD in use - %s" % msg
                    raise VirtualNetworkFunctionDescriptorRefCountExists(err)

                yield from self._vnfm.delete_vnfd(msg.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug("Registering for VNFD config using xpath: %s",
                        VnfdDtsHandler.XPATH)

        apply_handler = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=apply_handler) as acg:
            self._regh = acg.register(
                xpath=VnfdDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                on_prepare=on_prepare)
2009
2010
class VcsComponentDtsHandler(object):
    """ Vcs Component DTS handler

    Registers as publisher on the manifest operational-inventory component
    path and creates component elements through that registration.
    """
    XPATH = ("D,/rw-manifest:manifest"
             "/rw-manifest:operational-inventory"
             "/rw-manifest:component")

    def __init__(self, dts, log, loop, vnfm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm
        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ DTS registration handle """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Registers VCS component dts publisher registration"""
        self._log.debug("VCS Comp publisher DTS handler registering path %s",
                        VcsComponentDtsHandler.XPATH)

        reg_handler = rift.tasklets.DTS.RegistrationHandler()
        group_handler = rift.tasklets.Group.Handler()
        publisher_flags = (rwdts.Flag.PUBLISHER |
                           rwdts.Flag.NO_PREP_READ |
                           rwdts.Flag.DATASTORE)

        with self._dts.group_create(handler=group_handler) as group:
            self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
                                        handler=reg_handler,
                                        flags=publisher_flags)

    @asyncio.coroutine
    def publish(self, xact, path, msg):
        """ Publishes the VCS component

        Arguments:
            xact - transaction, only used in the log messages
            path - xpath of the element to create
            msg  - message stored at that path
        """
        self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.create_element(path, msg)
        self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
                        VcsComponentDtsHandler.XPATH, xact, path, msg)
2052
class VnfrConsoleOperdataDtsHandler(object):
    """ Publishes the console URL of one VDUR

    Registers the vnfr-console xpath of a single VNFR/VDUR pair and answers
    READ queries on it with the console URL read from the VDUR ('none' when
    no URL is available or reading it failed).
    """

    def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm
        # Set by register()
        self._regh = None

        self._vnfr_id = vnfr_id
        self._vdur_id = vdur_id
        self._vdu_id = vdu_id

    @property
    def vnfr_vdu_console_xpath(self):
        """ path for resource-mgr"""
        return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self._vnfr_id,self._vdur_id))

    @asyncio.coroutine
    def register(self):
        """ Register for VNFR VDU Operational Data read from dts """

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts """
            xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
                xact_info, action, xpath, msg)

            if action == rwdts.QueryAction.READ:
                console_cls = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur
                path_entry = console_cls.schema().keyspec_to_entry(ks_path)
                self._log.debug("VDU Opdata path is {}".format(path_entry))

                # An unknown VNFR is acknowledged without any data
                try:
                    vnfr = self._vnfm.get_vnfr(self._vnfr_id)
                except VnfRecordError as e:
                    self._log.error("VNFR id %s not found", self._vnfr_id)
                    xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                    return

                try:
                    vdur = vnfr._get_vdur_from_vdu_id(self._vdu_id)

                    # Only a READY VDUR is asked for its console
                    if not vdur._state == VDURecordState.READY:
                        self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
                        xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                        return

                    with self._dts.transaction() as new_xact:
                        resp = yield from vdur.read_resource(new_xact)
                        vdur_console = console_cls()
                        vdur_console.id = self._vdur_id
                        if resp.console_url:
                            vdur_console.console_url = resp.console_url
                        else:
                            vdur_console.console_url = 'none'
                        self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
                except Exception:
                    # Any failure degrades to a 'none' console URL
                    self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
                    vdur_console = console_cls()
                    vdur_console.id = self._vdur_id
                    vdur_console.console_url = 'none'

                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
                                        xpath=self.vnfr_vdu_console_xpath,
                                        msg=vdur_console)
            else:
                # Anything but READ is logged and acknowledged, not raised
                self._log.error("Not supported operation %s" % action)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                return

        self._log.debug("Registering for VNFR VDU using xpath: %s",
                        self.vnfr_vdu_console_xpath)

        reg_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
                                        handler=reg_handler,
                                        flags=rwdts.Flag.PUBLISHER)
2133
2134
class VnfrDtsHandler(object):
    """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS

    CREATE instantiates a new VNFR through the VNF manager, DELETE
    terminates and removes it, UPDATE copies the config status from the
    message and republishes. Any other action raises NotImplementedError.
    """
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop, vnfm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm

        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle"""
        return self._regh

    @property
    def vnfm(self):
        """ Return VNF manager instance """
        return self._vnfm

    @asyncio.coroutine
    def register(self):
        """ Register for vnfr create/update/delete/read requests from dts """

        def on_commit(xact_info):
            """ The transaction has been committed """
            self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
            return rwdts.MemberRspCode.ACTION_OK

        # Defined for completeness; it is not passed to the registration
        # handler created at the end of this method.
        def on_abort(*args):
            """ Abort callback """
            self._log.debug("VNF transaction got aborted")

        @asyncio.coroutine
        def on_event(dts, g_reg, xact, xact_event, scratch_data):
            """ Group event callback

            On INSTALL, every element currently held by the registration is
            turned back into a VNFR and instantiated in restart mode.
            """

            @asyncio.coroutine
            def instantiate_realloc_vnfr(vnfr):
                """Re-populate the vnfm after restart

                Arguments:
                    vnfr - the VNFR object returned by create_vnfr
                """
                yield from vnfr.instantiate(None, restart_mode=True)

            if xact_event == rwdts.MemberEvent.INSTALL:
                curr_cfg = self.regh.elements
                for cfg in curr_cfg:
                    vnfr = self.vnfm.create_vnfr(cfg)
                    self._loop.create_task(instantiate_realloc_vnfr(vnfr))

            self._log.debug("Got on_event in vnfm")

            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts """
            self._log.debug(
                "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
                xact_info, action, msg
                )

            if action == rwdts.QueryAction.CREATE:
                if not msg.has_field("vnfd"):
                    err = "Vnfd not provided"
                    self._log.error(err)
                    raise VnfRecordError(err)

                vnfr = self.vnfm.create_vnfr(msg)
                try:
                    # RIFT-9105: Unable to add a READ query under an existing transaction
                    # xact = xact_info.xact
                    yield from vnfr.instantiate(None)
                except Exception as e:
                    # A failed instantiation is recorded on the VNFR and
                    # published; the query itself is still acknowledged.
                    self._log.exception(e)
                    self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
                    vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
                    yield from vnfr.publish(None)

            elif action == rwdts.QueryAction.DELETE:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr = self._vnfm.get_vnfr(path_entry.key00.id)

                if vnfr is None:
                    self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
                    # Interpolate the id; passing it as a second argument
                    # left the "%s" placeholder in the exception text.
                    raise VirtualNetworkFunctionRecordNotFound(
                        "VNFR id %s" % path_entry.key00.id)

                try:
                    yield from vnfr.terminate(xact_info.xact)
                    # Unref the VNFD
                    vnfr.vnfd_unref()
                    yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
                except Exception as e:
                    self._log.exception(e)
                    self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)

            elif action == rwdts.QueryAction.UPDATE:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr = None
                try:
                    vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
                except Exception:
                    self._log.debug("No vnfr found with id %s", path_entry.key00.id)
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                if vnfr is None:
                    self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                self._log.debug("VNFR {} update config status {} (current {})".
                                format(vnfr.name, msg.config_status, vnfr.config_status))
                # Update the config status and publish
                vnfr._config_status = msg.config_status
                yield from vnfr.publish(None)

            else:
                # Build the message before raising so the action shows up
                # in the exception text instead of as a stray argument.
                raise NotImplementedError(
                    "%s action on VirtualNetworkFunctionRecord not supported"
                    % action)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug("Registering for VNFR using xpath: %s",
                        VnfrDtsHandler.XPATH,)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
                                                    on_prepare=on_prepare,)
        handlers = rift.tasklets.Group.Handler(on_event=on_event,)
        with self._dts.group_create(handler=handlers) as group:
            self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=(rwdts.Flag.PUBLISHER |
                                               rwdts.Flag.NO_PREP_READ |
                                               rwdts.Flag.CACHE |
                                               rwdts.Flag.DATASTORE),)

    @asyncio.coroutine
    def create(self, xact, path, msg):
        """
        Create a VNFR record in DTS with path and message
        """
        self._log.debug("Creating VNFR xact = %s, %s:%s",
                        xact, path, msg)

        self.regh.create_element(path, msg)
        self._log.debug("Created VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def update(self, xact, path, msg):
        """
        Update a VNFR record in DTS with path and message
        """
        self._log.debug("Updating VNFR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.update_element(path, msg)
        self._log.debug("Updated VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, path):
        """
        Delete the VNFR record at the given path from DTS
        """
        self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
        self.regh.delete_element(path)
        self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2310
2311
class VnfdRefCountDtsHandler(object):
    """ The VNFD Ref Count DTS handler

    Publishes the vnfd-ref-count path and answers READ queries with the
    entries the VNF manager returns from get_vnfd_refcount.
    """
    XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"

    def __init__(self, dts, log, loop, vnfm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm

        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @property
    def vnfm(self):
        """ Return the VNF manager instance """
        return self._vnfm

    @asyncio.coroutine
    def register(self):
        """ Register for VNFD ref count read from dts """

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts """
            xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
                xact_info, action, xpath, msg)

            # Only reads are served on this path
            if action == rwdts.QueryAction.READ:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfd_list = yield from self._vnfm.get_vnfd_refcount(
                    path_entry.key00.vnfd_id_ref)

                # One MORE response per entry, closed by a final ACK
                for ref_xpath, ref_msg in vnfd_list:
                    self._log.debug("Responding to ref count query path:%s, msg:%s",
                                    ref_xpath, ref_msg)
                    xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
                                            xpath=ref_xpath,
                                            msg=ref_msg)
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)
            else:
                raise VnfRecordError("Not supported operation %s" % action)

        reg_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH,
                                        handler=reg_handler,
                                        flags=rwdts.Flag.PUBLISHER)
2367
2368
class VdurDatastore(object):
    """
    This VdurDatastore is intended to expose select information about a VDUR
    such that it can be referenced in a cloud config file. The data that is
    exposed does not necessarily follow the structure of the data in the yang
    model. This is intentional. The data that are exposed are intended to be
    agnostic of the yang model so that changes in the model do not necessarily
    require changes to the interface provided to the user. It also means that
    the user does not need to be familiar with the RIFT.ware yang models.

    Entries are keyed by the VDUR's vdu_id; the exposed attributes are
    'name' and 'mgmt.ip'.
    """

    def __init__(self):
        """Create an instance of VdurDatastore"""
        self._vdur_data = dict()
        # Raw string: the backslashes are regex escapes. In a plain string
        # literal they are invalid Python escape sequences.
        self._pattern = re.compile(r"vdu\[([^]]+)\]\.(.+)")

    def add(self, vdur):
        """Add a new VDUR to the datastore

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no vdu_id or (2) is
            already in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id in self._vdur_data:
            raise ValueError('cannot add a VDUR more than once')

        entry = self._vdur_data[vdur.vdu_id] = dict()

        def set_if_not_none(key, attr):
            # Attributes without a value are simply left out of the entry
            if attr is not None:
                entry[key] = attr

        set_if_not_none('name', vdur._vdud.name)
        set_if_not_none('mgmt.ip', vdur.vm_management_ip)

    def update(self, vdur):
        """Update the VDUR information in the datastore

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no vdu_id or (2) is
            not in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VNFDs are required to have an ID')

        if vdur.vdu_id not in self._vdur_data:
            raise ValueError('VNF is not recognized')

        entry = self._vdur_data[vdur.vdu_id]

        def set_or_delete(key, attr):
            # A None value removes the attribute; anything else replaces it
            if attr is None:
                entry.pop(key, None)
            else:
                entry[key] = attr

        set_or_delete('name', vdur._vdud.name)
        set_or_delete('mgmt.ip', vdur.vm_management_ip)

    def remove(self, vdur_id):
        """Remove all of the data associated with specified VDUR

        Arguments:
            vdur_id - the identifier the VDUR was added under

        Raises:
            A ValueError is raised if the VDUR is not contained in the
            datastore.

        """
        if vdur_id not in self._vdur_data:
            raise ValueError('VNF is not recognized')

        del self._vdur_data[vdur_id]

    def get(self, expr):
        """Retrieve VDUR information from the datastore

        An expression should be of the form,

            vdu[<id>].<attr>

        where <id> is the identifier the VDUR was added under (unquoted),
        and <attr> is the name of the exposed attribute that the user wishes
        to retrieve.

        If the requested data is not available, None is returned.

        Arguments:
            expr - a string that specifies the data to return

        Raises:
            A ValueError is raised if the provided expression cannot be parsed.

        Returns:
            The requested data or None

        """
        result = self._pattern.match(expr)
        if result is None:
            raise ValueError('data expression not recognized ({})'.format(expr))

        vdur_id, key = result.groups()

        if vdur_id not in self._vdur_data:
            return None

        return self._vdur_data[vdur_id].get(key, None)
2487
2488
class VnfManager(object):
    """ The virtual network function manager class

    Owns the static DTS handlers, the VNFR objects keyed by VNFR id, a
    per-VNFD count of VNFRs created from it, and the NSR configs reported
    through handle_nsr.
    """
    def __init__(self, dts, log, loop, cluster_name):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._cluster_name = cluster_name

        self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
        self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
        self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
        self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(log, dts, loop, callback=self.handle_nsr)

        # Everything in this list is registered by register()
        self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
                              self._vnfr_handler,
                              self._vcs_handler,
                              self._vnfr_ref_handler,
                              self._nsr_handler]
        self._vnfrs = {}            # vnfr id -> VirtualNetworkFunctionRecord
        self._vnfds_to_vnfr = {}    # vnfd id -> number of VNFRs using it
        self._nsrs = {}             # nsr id  -> NSR config message

    @property
    def vnfr_handler(self):
        """ VNFR dts handler """
        return self._vnfr_handler

    @property
    def vcs_handler(self):
        """ VCS dts handler """
        return self._vcs_handler

    @asyncio.coroutine
    def register(self):
        """ Register all static DTS handlers """
        for hdl in self._dts_handlers:
            yield from hdl.register()

    @asyncio.coroutine
    def run(self):
        """ Run this VNFM instance """
        self._log.debug("Run VNFManager - registering static DTS handlers")
        yield from self.register()

    def handle_nsr(self, nsr, action):
        """ Track NSR configs: remember on CREATE, forget on DELETE """
        if action in [rwdts.QueryAction.CREATE]:
            self._nsrs[nsr.id] = nsr
        elif action == rwdts.QueryAction.DELETE:
            if nsr.id in self._nsrs:
                del self._nsrs[nsr.id]

    def get_linked_mgmt_network(self, vnfr):
        """For the given VNFR get the related mgmt network from the NSD, if
        available.

        Returns:
            The name of the first VLD in the NSD flagged as mgmt network,
            or None when there is none.

        Raises:
            VnfRecordError if the NSR referenced by the VNFR is not known.
        """
        nsr_id = vnfr.nsr_id_ref

        # for the given related VNFR, get the corresponding NSR-config
        try:
            nsr_obj = self._nsrs[nsr_id]
        except KeyError:
            # Raising a bare string is itself a TypeError; raise a real
            # exception carrying the message instead.
            raise VnfRecordError(
                "Unable to find the NS with the ID: {}".format(nsr_id))

        # for the related NSD check if a VLD exists such that it's a mgmt
        # network
        for vld in nsr_obj.nsd.vld:
            if vld.mgmt_network:
                return vld.name

        return None

    def get_vnfr(self, vnfr_id):
        """ get VNFR by vnfr id

        Raises:
            VnfRecordError if no VNFR with that id is known.
        """
        if vnfr_id not in self._vnfrs:
            raise VnfRecordError("VNFR id %s not found" % vnfr_id)

        return self._vnfrs[vnfr_id]

    def create_vnfr(self, vnfr):
        """ Create a VNFR instance

        Arguments:
            vnfr - VNFR message; its id must not be in use yet

        Returns:
            The new VirtualNetworkFunctionRecord.

        Raises:
            VnfRecordError if a VNFR with the same id already exists.
        """
        if vnfr.id in self._vnfrs:
            msg = "Vnfr id %s already exists" % vnfr.id
            self._log.error(msg)
            raise VnfRecordError(msg)

        self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
                       vnfr.id,
                       vnfr.vnfd.id)

        mgmt_network = self.get_linked_mgmt_network(vnfr)

        self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
            self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
            mgmt_network=mgmt_network
            )

        # Update ref count
        vnfd_id = vnfr.vnfd.id
        self._vnfds_to_vnfr[vnfd_id] = self._vnfds_to_vnfr.get(vnfd_id, 0) + 1

        return self._vnfrs[vnfr.id]

    @asyncio.coroutine
    def delete_vnfr(self, xact, vnfr):
        """ Delete a VNFR instance

        Removes the record from DTS, decrements the ref count of its VNFD
        (never below zero) and forgets the VNFR. Unknown VNFRs are ignored.
        """
        if vnfr.vnfr_id in self._vnfrs:
            self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
            yield from self._vnfr_handler.delete(xact, vnfr.xpath)

            if vnfr.vnfd.id in self._vnfds_to_vnfr:
                if self._vnfds_to_vnfr[vnfr.vnfd.id]:
                    self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1

            del self._vnfrs[vnfr.vnfr_id]

    @asyncio.coroutine
    def fetch_vnfd(self, vnfd_id):
        """ Fetch VNFDs based with the vnfd id

        Raises:
            VnfRecordError if the read query yields no VNFD.
        """
        vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
        self._log.debug("Fetch vnfd with path %s", vnfd_path)
        vnfd = None

        res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)

        # The last result wins if the query returns more than one
        for ent in res_iter:
            res = yield from ent
            vnfd = res.result

        if vnfd is None:
            err = "Failed to get Vnfd %s" % vnfd_id
            self._log.error(err)
            raise VnfRecordError(err)

        self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)

        return vnfd

    def vnfd_in_use(self, vnfd_id):
        """ Is this VNFD in use """
        self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
        if vnfd_id in self._vnfds_to_vnfr:
            return (self._vnfds_to_vnfr[vnfd_id] > 0)
        return False

    @asyncio.coroutine
    def publish_vnfr(self, xact, path, msg):
        """ Publish a VNFR """
        self._log.debug("publish_vnfr called with path %s, msg %s",
                        path, msg)
        yield from self.vnfr_handler.update(xact, path, msg)

    @asyncio.coroutine
    def delete_vnfd(self, vnfd_id):
        """ Delete the Virtual Network Function descriptor with the passed id

        Raises:
            VirtualNetworkFunctionDescriptorRefCountExists if VNFRs still
            reference the VNFD.
        """
        self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
        if vnfd_id in self._vnfds_to_vnfr:
            # The mapping stores a plain integer count per VNFD id
            ref_count = self._vnfds_to_vnfr[vnfd_id]
            if ref_count:
                self._log.debug("Cannot delete VNFD id %s reference exists %s",
                                vnfd_id,
                                ref_count)
                raise VirtualNetworkFunctionDescriptorRefCountExists(
                    "Cannot delete :%s, ref_count:%s" % (vnfd_id, ref_count))

            del self._vnfds_to_vnfr[vnfd_id]

        # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
        try:
            rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
            vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
            if os.path.exists(vnfd_dir):
                shutil.rmtree(vnfd_dir, ignore_errors=True)
        except Exception as e:
            # Best effort: the ref-count entry is already gone at this
            # point, so identify the VNFD by its id.
            self._log.error("Exception in cleaning up VNFD {}: {}".
                            format(vnfd_id, e))
            self._log.exception(e)

    def vnfd_refcount_xpath(self, vnfd_id):
        """ xpath for ref count entry """
        return (VnfdRefCountDtsHandler.XPATH +
                "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)

    @asyncio.coroutine
    def get_vnfd_refcount(self, vnfd_id):
        """ Get the vnfd_list from this VNFM

        Arguments:
            vnfd_id - a VNFD id, or None / "" to list every known VNFD

        Returns:
            A list of (xpath, ref count message) tuples; empty when the
            requested VNFD id is not known.
        """
        if vnfd_id is None or vnfd_id == "":
            wanted = list(self._vnfds_to_vnfr.keys())
        elif vnfd_id in self._vnfds_to_vnfr:
            wanted = [vnfd_id]
        else:
            wanted = []

        vnfd_list = []
        for vnfd in wanted:
            vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
            vnfd_msg.vnfd_id_ref = vnfd
            vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd]
            vnfd_list.append((self.vnfd_refcount_xpath(vnfd), vnfd_msg))

        return vnfd_list
2695
2696
class VnfmTasklet(rift.tasklets.Tasklet):
    """ VNF Manager tasklet class

    Creates the DTS handle on start and, driven by DTS state changes,
    builds and runs the VnfManager.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("vnfm")

        # Both are filled in later: _dts by start(), _vnfm by init()
        self._dts = None
        self._vnfm = None

    def start(self):
        """ Start the tasklet and create its DTS handle """
        try:
            super().start()
            self.log.info("Starting VnfmTasklet")

            self.log.setLevel(logging.DEBUG)

            self.log.debug("Registering with dts")
            self._dts = rift.tasklets.DTS(
                self.tasklet_info,
                RwVnfmYang.get_schema(),
                self.loop,
                self.on_dts_state_change,
            )

            self.log.debug("Created DTS Api GI Object: %s", self._dts)
        except Exception:
            print("Caught Exception in VNFM start:", sys.exc_info()[0])
            raise

    def on_instance_started(self):
        """ Task instance started callback """
        self.log.debug("Got instance started callback")

    def stop(self):
        """ Release the DTS handle """
        try:
            self._dts.deinit()
        except Exception:
            print("Caught Exception in VNFM stop:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def init(self):
        """ Task init callback """
        try:
            vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
            assert vm_parent_name is not None
            self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
            yield from self._vnfm.run()
        except Exception:
            print("Caught Exception in VNFM init:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def run(self):
        """ Task run callback """
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """
        # DTS state to move to once the application handler has run
        dts_transitions = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Application callback per DTS state
        app_handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        app_handler = app_handlers.get(state, None)
        if app_handler is not None:
            yield from app_handler()

        # Transition dts to next state
        target_state = dts_transitions.get(state, None)
        if target_state is not None:
            self._dts.handle.set_state(target_state)