Revert "Full Juju Charm support"
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 # Copyright 2016 RIFT.IO Inc
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15
16 import asyncio
17 import collections
18 import enum
19 import gi
20 import logging
21 import os.path
22 import re
23 import shutil
24 import sys
25 import time
26 import uuid
27 import yaml
28
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('VnfrYang', '1.0')
32 gi.require_version('RwVnfmYang', '1.0')
33 gi.require_version('RwVnfdYang', '1.0')
34 gi.require_version('RwVlrYang', '1.0')
35 gi.require_version('RwManifestYang', '1.0')
36 gi.require_version('RwBaseYang', '1.0')
37 gi.require_version('RwResourceMgrYang', '1.0')
38
39 from gi.repository import (
40 RwDts as rwdts,
41 RwVnfrYang,
42 RwVnfdYang,
43 VnfdYang,
44 RwVnfmYang,
45 RwVlrYang,
46 VnfrYang,
47 RwManifestYang,
48 RwBaseYang,
49 RwResourceMgrYang,
50 ProtobufC,
51 RwTypes
52 )
53 gi.require_version('RwKeyspec', '1.0')
54 from gi.repository.RwKeyspec import quoted_key
55
56 import rift.tasklets
57 import rift.package.store
58 import rift.package.cloud_init
59 import rift.package.script
60 import rift.mano.dts as mano_dts
61 from rift.mano.utils.project import (
62 ManoProject,
63 ProjectHandler,
64 )
65 import rift.mano.utils.short_name as mano_short_name
66 from . import subscriber
67
# Keys kept when a virtual connection point's dict is copied into the
# connection-point information of a VDUR.
VCP_FIELDS = [
    'name',
    'id',
    'connection_point_id',
    'type_yang',
    'ip_address',
    'mac_address',
]
69
class VMResourceError(Exception):
    """Raised when a VM resource request does not produce a usable response."""
73
74
class VnfRecordError(Exception):
    """VNF record instantiation failed."""
78
79
class VduRecordError(Exception):
    """VDU record instantiation failed."""
83
84
# NOTE(review): this class name shadows the builtin ``NotImplemented``
# singleton for the rest of the module; it is kept because callers may
# already reference it by this name.
class NotImplemented(Exception):
    """Raised for functionality that is not implemented."""
88
89
class VnfrRecordExistsError(Exception):
    """A VNFR record with the same VNFR id already exists."""
93
94
class InternalVirtualLinkRecordError(Exception):
    """Error related to an internal virtual link record."""
98
99
class VDUImageNotFound(Exception):
    """The image for a VDU could not be found."""
103
104
class VirtualDeploymentUnitRecordError(Exception):
    """VDU instantiation failed."""
108
109
class VMNotReadyError(Exception):
    """The VM has not yet been received from the resource manager."""
113
114
class VDURecordNotFound(Exception):
    """A VDU record could not be found."""
118
119
class VirtualNetworkFunctionRecordDescNotFound(Exception):
    """The Virtual Network Function Record Descriptor cannot be found."""
123
124
class VirtualNetworkFunctionDescriptorError(Exception):
    """Error related to a Virtual Network Function Descriptor."""
128
129
class VirtualNetworkFunctionDescriptorNotFound(Exception):
    """The Virtual Network Function Descriptor was not found."""
133
134
class VirtualNetworkFunctionRecordNotFound(Exception):
    """The Virtual Network Function Record was not found."""
138
139
class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
    """A reference count still exists for the Virtual Network Function Descriptor."""
143
144
class VnfrInstantiationFailed(Exception):
    """Virtual Network Function instantiation failed."""
148
149
class VNFMPlacementGroupError(Exception):
    """Error while processing VNF placement groups."""
153
154
class VlrError(Exception):
    """Error related to a Virtual Link Record."""
158
159
class VirtualNetworkFunctionRecordState(enum.Enum):
    """Lifecycle states of a VNFR.

    Every member has a distinct value. ``TERMINATED`` previously shared the
    value 7 with ``VDU_TERMINATE_PHASE``, which made it an Enum alias: the two
    compared equal and ``TERMINATED.name`` reported ``"VDU_TERMINATE_PHASE"``.
    """
    PRE_INIT = 0
    INIT = 1
    VL_INIT_PHASE = 2
    VM_INIT_PHASE = 3
    READY = 4
    TERMINATE = 5
    VL_TERMINATE_PHASE = 6
    VDU_TERMINATE_PHASE = 7
    TERMINATED = 8
    FAILED = 10
172
173
class VDURecordState(enum.Enum):
    """Lifecycle states of a VDU record."""
    # Creation and resource allocation
    INIT = 1
    INSTANTIATING = 2
    RESOURCE_ALLOC_PENDING = 3
    # Running
    READY = 4
    # Teardown
    TERMINATING = 5
    TERMINATED = 6
    # Error
    FAILED = 10
183
184 class VirtualDeploymentUnitRecord(object):
185 """ Virtual Deployment Unit Record """
def __init__(self,
             dts,
             log,
             loop,
             project,
             vdud,
             vnfr,
             nsr_config,
             mgmt_intf,
             mgmt_network,
             datacenter_name,
             vnfd_package_store,
             vdur_id=None,
             placement_groups=None):
    """Create a VDU record for one VDU descriptor of a VNFR.

    Args:
        dts: DTS handle used for queries and registrations.
        log: Logger.
        loop: asyncio event loop.
        project: Project object used to build project-scoped xpaths.
        vdud: The VDU descriptor this record is created from.
        vnfr: The owning VNFR.
        nsr_config: NSR configuration (its name is used for the short name).
        mgmt_intf: Sent as ``allocate_public_address`` in the VM request.
        mgmt_network: Management network placed in the VM request when set.
        datacenter_name: Cloud account the VDU is created in.
        vnfd_package_store: Store used to look up the VNFD package.
        vdur_id: Id for this VDUR; a random UUID is generated when falsy.
        placement_groups: Placement groups for this VDU. ``None`` (the
            default) means no placement groups; a fresh list is used per
            instance so that records never share one default list object.
    """
    self._dts = dts
    self._log = log
    self._loop = loop
    self._project = project
    self._vdud = vdud
    self._vnfr = vnfr
    self._nsr_config = nsr_config
    self._mgmt_intf = mgmt_intf
    self._datacenter_name = datacenter_name
    self._vnfd_package_store = vnfd_package_store
    self._mgmt_network = mgmt_network

    self._vdur_id = vdur_id or str(uuid.uuid4())
    self._int_intf = []
    self._ext_intf = []
    self._state = VDURecordState.INIT
    self._state_failed_reason = None
    self._request_id = str(uuid.uuid4())
    self._name = vnfr.name + "__" + vdud.id
    self._placement_groups = [] if placement_groups is None else placement_groups
    self._rm_regh = None
    self._vm_resp = None
    self._vdud_cloud_init = None
    self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(
        dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id, self.vdu_id)
225
226
@asyncio.coroutine
def vdu_opdata_register(self):
    """Register the console operational-data handler created for this VDUR."""
    console_handler = self._vdur_console_handler
    yield from console_handler.register()
230
def vm_cp_info(self, cp_name):
    """Return the VM response's connection point named *cp_name*.

    Returns None when no VM response has been stored yet or when no
    connection point carries that name.
    """
    response = self._vm_resp
    if response is None:
        return None
    return next(
        (point for point in response.connection_points if point.name == cp_name),
        None,
    )
238
def cp_ip_addr(self, cp_name):
    """Return the IP address of connection point *cp_name*.

    Falls back to "0.0.0.0" when the connection point is not known.
    """
    info = self.vm_cp_info(cp_name)
    return info.ip_address if info else "0.0.0.0"
246
def cp_mac_addr(self, cp_name):
    """Return the MAC address of connection point *cp_name*.

    Falls back to "00:00:00:00:00:00" when the connection point is not known.
    """
    info = self.vm_cp_info(cp_name)
    return info.mac_addr if info else "00:00:00:00:00:00"
254
def cp_id(self, cp_name):
    """Return the connection point id of connection point *cp_name*.

    Falls back to an empty string when the connection point is not known.
    """
    info = self.vm_cp_info(cp_name)
    return info.connection_point_id if info else str()
262
263
@property
def vdu_id(self):
    """Id of the VDU descriptor this record was created from."""
    descriptor = self._vdud
    return descriptor.id
267
@property
def vm_resp(self):
    """Last stored VM response, or None when none has been stored."""
    return self._vm_resp
271
@property
def name(self):
    """Name of this VDUR (``<vnfr name>__<vdu id>`` as set in ``__init__``)."""
    return self._name
276
# Truncated name conforming to RFC 1123
@property
def unique_short_name(self):
    """Return this VDUR's unique short name.

    The name is ``<nsr tag>-<short string>-<vdu tag>`` where:
      - nsr tag: at most the trailing 10 alphanumeric characters of the
        NSR name,
      - short string: the shortened form of this VDUR's name produced by
        ``mano_short_name.StringShortner``,
      - vdu tag: at most the trailing 10 alphanumeric characters of the
        VDU id.
    """
    def _alnum_tail(text):
        # Keep only a-zA-Z0-9, then take at most the last 10 characters.
        return re.sub('[^a-zA-Z0-9]', '', text)[-10:]

    nsr_tag = _alnum_tail(self._nsr_config.name)
    shortener = mano_short_name.StringShortner(self._name)
    vdu_tag = _alnum_tail(self._vdud.id)
    return "-".join((nsr_tag, shortener.short_string, vdu_tag))
301
@property
def datacenter_name(self):
    """Name of the cloud account this VDU should be created in."""
    return self._datacenter_name
306
@property
def image_name(self):
    """Name used to look up the image on the CMP.

    This is the base name of the descriptor's ``image`` value, or None when
    the descriptor has no ``image``.
    """
    descriptor = self._vdud
    return os.path.basename(descriptor.image) if 'image' in descriptor else None
313
@property
def image_checksum(self):
    """Checksum of the VDU image, or None when the descriptor has none."""
    descriptor = self._vdud
    if not descriptor.has_field("image_checksum"):
        return None
    return descriptor.image_checksum
318
@property
def management_ip(self):
    """Management address of an active VDU, or None when it is not active.

    The VM response's ``public_ip`` is preferred when that field is set;
    otherwise its ``management_ip`` is returned.
    """
    if not self.active:
        return None
    response = self._vm_resp
    if response.has_field('public_ip'):
        return response.public_ip
    return response.management_ip
324
@property
def vm_management_ip(self):
    """The VM response's ``management_ip``, or None when the VDU is not active."""
    return self._vm_resp.management_ip if self.active else None
330
@property
def operational_status(self):
    """Operational status string for the current VDU record state.

    The lookup is keyed by the state's name; both TERMINATING and
    TERMINATED are reported as "terminated".
    """
    status_by_state = {
        "INIT": "init",
        "INSTANTIATING": "vm_init_phase",
        "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
        "READY": "running",
        "FAILED": "failed",
        "TERMINATING": "terminated",
        "TERMINATED": "terminated",
    }
    state_name = self._state.name
    return status_by_state[state_name]
343
@property
def msg(self):
    """Build the VDUR message describing this VDU.

    The message combines selected descriptor fields with, once a VM
    response has been stored, the VIM details (ids, management addresses,
    volume ids/metadata and supplemental boot data), plus the internal
    connection points and the external/internal interface lists.

    Side effect: for every external interface, the owning VNFR's
    connection point is refreshed through ``self._vnfr.update_cp``.

    Returns:
        RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur built
        from the assembled dictionary.

    Raises:
        VduRecordError: propagated from ``find_internal_cp_by_cp_id`` when
            an internal interface refers to an unknown connection point.
    """
    vdu_fields = ("vm_flavor",
                  "guest_epa",
                  "vswitch_epa",
                  "hypervisor_epa",
                  "host_epa",
                  "volumes")

    vdu_copy_dict = {k: v for k, v in
                     self._vdud.as_dict().items() if k in vdu_fields}
    vdur_dict = {"id": self._vdur_id,
                 "vdu_id_ref": self._vdud.id,
                 "operational_status": self.operational_status,
                 "operational_status_details": self._state_failed_reason,
                 "name": self.name,
                 "unique_short_name": self.unique_short_name
                 }

    vm_resp = self.vm_resp
    if vm_resp is not None:
        vdur_dict.update({"vim_id": vm_resp.vdu_id,
                          "flavor_id": vm_resp.flavor_id
                          })
        if vm_resp.has_field('image_id'):
            vdur_dict["image_id"] = vm_resp.image_id

    management_ip = self.management_ip
    if management_ip:
        vdur_dict["management_ip"] = management_ip

    vm_management_ip = self.vm_management_ip
    if vm_management_ip:
        vdur_dict["vm_management_ip"] = vm_management_ip

    vdur_dict.update(vdu_copy_dict)

    if vm_resp is not None:
        if vm_resp.has_field('volumes'):
            # The descriptor may define no volumes at all; in that case there
            # is nothing to annotate (and no 'volumes' key to index).
            descriptor_volumes = vdur_dict.get('volumes', [])
            for opvolume in vm_resp.volumes:
                vdurvol_data = [vduvol for vduvol in descriptor_volumes
                                if vduvol['name'] == opvolume.name]
                # Only annotate when the name matches exactly one volume.
                if len(vdurvol_data) == 1:
                    vdurvol_data[0]["volume_id"] = opvolume.volume_id
                    if opvolume.has_field('custom_meta_data'):
                        vdurvol_data[0]['custom_meta_data'] = [
                            metadata_item.as_dict()
                            for metadata_item in opvolume.custom_meta_data]

        if vm_resp.has_field('supplemental_boot_data'):
            boot_data = vm_resp.supplemental_boot_data
            vdur_dict['supplemental_boot_data'] = dict()
            if boot_data.has_field('boot_data_drive'):
                vdur_dict['supplemental_boot_data']['boot_data_drive'] = boot_data.boot_data_drive
            if boot_data.has_field('custom_meta_data'):
                # supplemental_boot_data below is returned by Openstack.
                # The VM response version of supplemental data defaults to
                # CLOUD_METADATA, as Openstack does not respond with the
                # 'destination' attribute of custom meta data elements.
                # Therefore the published vdur does not specify the
                # destination of the custom-meta-data.
                # Should this field (destination) be added explicitly here by
                # comparing the keys with the details already in self._vdud?
                vdur_dict['supplemental_boot_data']['custom_meta_data'] = [
                    metadata_item.as_dict()
                    for metadata_item in boot_data.custom_meta_data]

            if boot_data.has_field('config_file'):
                vdur_dict['supplemental_boot_data']['config_file'] = [
                    file_item.as_dict() for file_item in boot_data.config_file]

    icp_list = []
    ii_list = []

    for intf, cp_id, vlr in self._int_intf:
        cp = self.find_internal_cp_by_cp_id(cp_id)

        cp_info = dict(name=cp.name,
                       id=cp.id,
                       type_yang='VPORT',
                       ip_address=self.cp_ip_addr(cp.name),
                       mac_address=self.cp_mac_addr(cp.name),
                       connection_point_id=self.cp_id(cp.name))

        # Virtual connection points of the VLR that reference this cp.
        virtual_cps = [vcp for vcp in vlr._vlr.virtual_connection_points
                       if any(cp.name == cp_ref for cp_ref in vcp.associated_cps)]

        if virtual_cps:
            cp_info['virtual_cps'] = [
                {k: v for k, v in vcp.as_dict().items() if k in VCP_FIELDS}
                for vcp in virtual_cps]

        icp_list.append(cp_info)

        ii_dict = {"name": intf.name,
                   "internal_connection_point_ref": cp.id,
                   "virtual_interface": {}}

        if "position" in intf.as_dict():
            ii_dict["position"] = intf.position

        ii_list.append(ii_dict)

    vdur_dict["internal_connection_point"] = icp_list
    self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])

    ei_list = []
    for intf, cp, vlr in self._ext_intf:
        ei_dict = {"name": intf.name,
                   "external_connection_point_ref": cp.name,
                   "virtual_interface": {}}
        if "position" in intf.as_dict():
            ei_dict["position"] = intf.position

        ei_list.append(ei_dict)

        virtual_cps = [vcp for vcp in vlr.virtual_connection_points
                       if any(cp.name == cp_ref for cp_ref in vcp.associated_cps)]

        # Empty list when no virtual connection point references this cp.
        virtual_cp_info = [
            {k: v for k, v in vcp.as_dict().items() if k in VCP_FIELDS}
            for vcp in virtual_cps]

        self._vnfr.update_cp(cp.name,
                             self.cp_ip_addr(cp.name),
                             self.cp_mac_addr(cp.name),
                             self.cp_id(cp.name),
                             virtual_cp_info)

    vdur_dict["interface"] = ei_list + ii_list

    vdur_dict['placement_groups_info'] = [group.as_dict()
                                          for group in self._placement_groups]

    return RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
487
@property
def resmgr_path(self):
    """Project-scoped xpath of this VDU's resource-manager event data.

    The path is keyed by this record's request id.
    """
    event_key = "/vdu-event-data[event-id={}]".format(quoted_key(self._request_id))
    base_path = "D,/rw-resource-mgr:resource-mgmt" + "/vdu-event"
    return self._project.add_project(base_path + event_key)
495
@property
def vm_flavor_msg(self):
    """Return a copy of the descriptor's VM flavor message.

    A new object of the same class is created and filled via ``copy_from``.
    """
    source = self._vdud.vm_flavor
    duplicate = source.__class__()
    duplicate.copy_from(source)
    return duplicate
503
@property
def vdud_cloud_init(self):
    """Return the cloud-init contents for the VDU.

    The value is computed on first access and cached. When the VNFR has a
    public key, it is appended as an ``ssh_authorized_keys`` entry (starting
    from a bare "#cloud-config" header when there is no other content).
    """
    if self._vdud_cloud_init is not None:
        return self._vdud_cloud_init

    base_config = self.cloud_init()

    # VNFR ssh public key, if available
    public_key = self._vnfr.public_key
    if public_key:
        if not base_config:
            base_config = "#cloud-config"
        # NOTE(review): the leading whitespace before "- {}" could not be
        # recovered from the reviewed copy of this file; confirm it matches
        # the original literal.
        template = "{}\nssh_authorized_keys:\n  - {}"
        self._vdud_cloud_init = template.format(base_config, public_key)
    else:
        self._vdud_cloud_init = base_config

    self._log.debug("Cloud init: {}".format(self._vdud_cloud_init))
    return self._vdud_cloud_init
524
def cloud_init(self):
    """Build the cloud-init content for this VDU.

    The base script comes from the descriptor's inline ``cloud_init`` or,
    failing that, from the ``cloud_init_file`` stored in the VNFD package.
    Key pairs and users from the VNFR's ``cloud_config`` are then merged
    into it.

    Returns:
        - None when there is no script and nothing to inject.
        - The base script unchanged when there is nothing to inject, or when
          it cannot be parsed as a YAML mapping.
        - Otherwise a "#cloud-config" document with the injected
          ``ssh_authorized_keys`` and ``users`` entries.

    Raises:
        VirtualDeploymentUnitRecordError: the cloud-init file could not be
            extracted from the package.
    """
    cloud_init_msg = None
    if self._vdud.cloud_init is not None:
        self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
        cloud_init_msg = self._vdud.cloud_init
    elif self._vdud.cloud_init_file is not None:
        # Get cloud-init script contents from the file provided in the cloud_init_file param
        self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
        filename = self._vdud.cloud_init_file
        self._vnfd_package_store.refresh()
        stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
        cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
        try:
            cloud_init_msg = cloud_init_extractor.read_script(stored_package, filename)
        except rift.package.cloud_init.CloudInitExtractionError as e:
            # NOTE(review): instantiation_failed is a coroutine; calling it
            # here without driving it only creates the generator object, so
            # the failed state is presumably never recorded by this call.
            self.instantiation_failed(str(e))
            raise VirtualDeploymentUnitRecordError(e)
    else:
        if not self._vnfr._vnfr_msg.cloud_config.key_pair and not self._vnfr._vnfr_msg.cloud_config.user:
            self._log.debug("VDU Instantiation: cloud-init script not provided")
            return

    cloud_config = self._vnfr._vnfr_msg.cloud_config

    self._log.debug("Current cloud init msg is {}".format(cloud_init_msg))
    if not cloud_config.key_pair and not cloud_config.user:
        return cloud_init_msg

    cloud_init_dict = {}
    if cloud_init_msg:
        try:
            # safe_load replaces the bare yaml.load call: the script comes
            # from the descriptor/package, so arbitrary-object YAML tags
            # must not be constructed, and newer PyYAML releases reject
            # yaml.load without an explicit Loader.
            loaded = yaml.safe_load(cloud_init_msg)
        except Exception as e:
            self._log.exception(e)
            self._log.error("Error loading cloud init Yaml file with exception %s", str(e))
            return cloud_init_msg

        if loaded is None:
            # Content with only comments (e.g. a bare "#cloud-config").
            loaded = {}
        elif not isinstance(loaded, dict):
            # Keys/users can only be merged into a mapping; hand back the
            # script untouched, as is done for unparsable content.
            self._log.error("Cloud init content is not a YAML mapping, "
                            "key pairs and users are not injected")
            return cloud_init_msg
        cloud_init_dict = loaded

    self._log.debug("Current cloud init dict is {}".format(cloud_init_dict))

    for key_pair in cloud_config.key_pair:
        if "ssh_authorized_keys" not in cloud_init_dict:
            cloud_init_dict["ssh_authorized_keys"] = list()
        cloud_init_dict["ssh_authorized_keys"].append(key_pair.key)

    for user_entry in cloud_config.user:
        if "users" not in cloud_init_dict:
            cloud_init_dict["users"] = list()
        user = {
            "name": user_entry.name,
            "gecos": user_entry.user_info,
            "sudo": "ALL=(ALL) NOPASSWD:ALL",
            "ssh-authorized-keys": [ssh_key.key for ssh_key in user_entry.key_pair],
        }
        cloud_init_dict["users"].append(user)

    cloud_msg = yaml.safe_dump(cloud_init_dict, width=1000, default_flow_style=False)
    cloud_init = "#cloud-config\n" + cloud_msg
    self._log.debug("Cloud init msg is {}".format(cloud_init))
    return cloud_init
587
def process_openstack_placement_group_construct(self, vm_create_msg_dict):
    """Translate OpenStack placement groups into VM-create request fields.

    Collects host aggregates, availability zones and server groups from
    every placement group and writes them into *vm_create_msg_dict* under
    'availability_zone', 'server_group' and 'host_aggregate'.

    Args:
        vm_create_msg_dict: Request dictionary, updated in place.

    Raises:
        VNFMPlacementGroupError: more than one availability zone or more
            than one server group was requested.
    """
    host_aggregates = []
    availability_zones = []
    server_groups = []
    for group in self._placement_groups:
        if group.has_field('host_aggregate'):
            host_aggregates.extend(aggregate.as_dict()
                                   for aggregate in group.host_aggregate)
        if group.has_field('availability_zone'):
            availability_zones.append(group.availability_zone.as_dict())
        if group.has_field('server_group'):
            server_groups.append(group.server_group.as_dict())

    if len(availability_zones) > 1:
        self._log.error("Can not launch VDU: %s in multiple availability zones. " +
                        "Requested Zones: %s", self.name, availability_zones)
        # The whole message is one literal before .format() is applied, so
        # both the VDU name and the zones are substituted.
        raise VNFMPlacementGroupError(
            "Can not launch VDU: {} in multiple availability zones. "
            "Requested Zones: {}".format(self.name, availability_zones))
    if availability_zones:
        vm_create_msg_dict['availability_zone'] = availability_zones[0]

    if len(server_groups) > 1:
        self._log.error("Can not launch VDU: %s in multiple Server Group. " +
                        "Requested Groups: %s", self.name, server_groups)
        raise VNFMPlacementGroupError(
            "Can not launch VDU: {} in multiple Server Groups. "
            "Requested Groups: {}".format(self.name, server_groups))
    if server_groups:
        vm_create_msg_dict['server_group'] = server_groups[0]

    if host_aggregates:
        vm_create_msg_dict['host_aggregate'] = host_aggregates

    return
623
def process_placement_groups(self, vm_create_msg_dict):
    """Fill the resource-manager request from this VDU's placement groups.

    Does nothing when there are no placement groups. All groups are
    expected to share a single cloud type (asserted); only 'openstack'
    groups are processed, any other cloud type is logged and ignored.
    """
    groups = self._placement_groups
    if not groups:
        return

    cloud_types = {group.cloud_type for group in groups}
    assert len(cloud_types) == 1
    (cloud_type,) = cloud_types

    if cloud_type != 'openstack':
        self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
        return

    self.process_openstack_placement_group_construct(vm_create_msg_dict)
639
def process_custom_bootdata(self, vm_create_msg_dict):
    """Inline the contents of custom boot-data config files.

    For every 'config_file' entry of the request's supplemental boot data
    that has both 'source' and 'dest', the 'source' value is replaced by the
    contents of that file read from the stored VNFD package. A package
    error is logged and the entry is left as is.

    Raises:
        VirtualDeploymentUnitRecordError: the file could not be extracted.
    """
    boot_data = vm_create_msg_dict['supplemental_boot_data']
    if 'config_file' not in boot_data:
        return

    self._vnfd_package_store.refresh()
    stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
    extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
    for entry in vm_create_msg_dict['supplemental_boot_data']['config_file']:
        if not ('source' in entry and 'dest' in entry):
            continue
        source = entry['source']
        # Find source file in scripts dir of VNFD
        self._log.debug("Checking for source config file at %s", source)
        try:
            # Update source file location with file contents
            entry['source'] = extractor.read_script(stored_package, source)
        except rift.package.package.PackageError as e:
            self._log.info("Invalid package with Package descriptor id")
        except rift.package.cloud_init.CloudInitExtractionError as e:
            raise VirtualDeploymentUnitRecordError(e)

    return
666
def resmgr_msg(self, config=None):
    """Build the resource-manager VDU event message for this VDU.

    Args:
        config: Optional user data; when given it is sent as
            ``vdu_init.userdata``.

    Returns:
        RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData
        keyed by this record's request id and addressed to the VDU's
        datacenter.

    Raises:
        VlrError: a connection point's virtual link has no network id.
        VduRecordError: an internal interface refers to an unknown
            connection point.
        VNFMPlacementGroupError, VirtualDeploymentUnitRecordError:
            propagated from placement-group / custom boot-data processing.
    """
    vdu_fields = ["vm_flavor",
                  "guest_epa",
                  "vswitch_epa",
                  "hypervisor_epa",
                  "host_epa",
                  "volumes",
                  "supplemental_boot_data"]

    def make_resmgr_cp_args(intf, cp, vlr):
        """Build the connection-point entry for one interface."""
        if vlr.network_id is None:
            # Format the message; previously the format string and its
            # arguments were passed to the exception as separate args.
            raise VlrError("Unresolved virtual link id for vlr id:%s, name:%s" %
                           (vlr.id, vlr.name))

        cp_info = dict(name=cp.name,
                       virtual_link_id=vlr.network_id,
                       type_yang=intf.virtual_interface.type_yang)

        if cp.has_field('port_security_enabled'):
            cp_info["port_security_enabled"] = cp.port_security_enabled

        try:
            if intf.static_ip_address:
                cp_info["static_ip_address"] = intf.static_ip_address
        except AttributeError as e:
            # This can happen because of model difference between OSM and
            # RIFT. Ignore exception
            self._log.debug(str(e))

        if (intf.virtual_interface.has_field('vpci') and
                intf.virtual_interface.vpci is not None):
            cp_info["vpci"] = intf.virtual_interface.vpci

        if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
            cp_info['security_group'] = vlr.ip_profile_params.security_group

        if vlr.has_field('virtual_connection_points'):
            # Virtual connection points of the VLR that reference this cp.
            virtual_cps = [vcp for vcp in vlr.virtual_connection_points
                           if any(cp.name == cp_ref for cp_ref in vcp.associated_cps)]
            if virtual_cps:
                fields = ['connection_point_id', 'name', 'ip_address', 'mac_address']
                cp_info['virtual_cps'] = [
                    {k: v for k, v in vcp.as_dict().items() if k in fields}
                    for vcp in virtual_cps]

        # Adding Port Sequence Information to cp_info
        intf_dict = intf.as_dict()
        if "position" in intf_dict:
            cp_info["port_order"] = intf.position

        self._log.debug("CP info {}".format(cp_info))
        return cp_info

    self._log.debug("Creating params based on VDUD: %s", self._vdud)
    vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}

    vm_create_msg_dict = {
        "name": self.unique_short_name,  # Truncated name conforming to RFC 1123
        "node_id": self.name,            # Rift assigned Id
    }

    if self.image_name is not None:
        vm_create_msg_dict["image_name"] = self.image_name

    if self.image_checksum is not None:
        vm_create_msg_dict["image_checksum"] = self.image_checksum

    vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
    if self._vdud.has_field('mgmt_vpci'):
        vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci

    self._log.debug("VDUD: %s", self._vdud)
    if config is not None:
        vm_create_msg_dict['vdu_init'] = {'userdata': config}

    if self._mgmt_network:
        vm_create_msg_dict['mgmt_network'] = self._mgmt_network

    cp_list = list()
    for intf, cp, vlr in self._ext_intf:
        cp_list.append(make_resmgr_cp_args(intf, cp, vlr))

    for intf, cp_id, vlr in self._int_intf:
        cp = self.find_internal_cp_by_cp_id(cp_id)
        cp_list.append(make_resmgr_cp_args(intf, cp, vlr.msg()))

    vm_create_msg_dict["connection_points"] = cp_list
    vm_create_msg_dict.update(vdu_copy_dict)

    self.process_placement_groups(vm_create_msg_dict)
    if 'supplemental_boot_data' in vm_create_msg_dict:
        self.process_custom_bootdata(vm_create_msg_dict)

    msg = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData()
    msg.event_id = self._request_id
    msg.cloud_account = self.datacenter_name

    msg.request_info.from_dict(vm_create_msg_dict)

    # NOTE(review): 'volumes' is also part of vm_create_msg_dict passed to
    # from_dict above; confirm these additions do not duplicate entries.
    for volume in self._vdud.volumes:
        v = msg.request_info.volumes.add()
        v.from_dict(volume.as_dict())

    return msg
770
@asyncio.coroutine
def terminate(self, xact):
    """Delete this VDU's resource in the VIM and drop its registrations.

    Only acts when the VDU is READY or FAILED; any other state is logged
    and the request is ignored. A failure while deleting the resource is
    logged and does not prevent the deregistration steps.

    Args:
        xact: Transaction supplied by the caller. It is not used here; a
            new transaction is opened for the delete request.
    """
    if self._state not in (VDURecordState.READY, VDURecordState.FAILED):
        self._log.warning("VDU terminate in not ready state - Ignoring request")
        return

    self._state = VDURecordState.TERMINATING
    if self._vm_resp is not None:
        try:
            with self._dts.transaction() as new_xact:
                yield from self.delete_resource(new_xact)
        except Exception:
            # Best effort: continue with deregistration even if the delete fails.
            self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)

    if self._rm_regh is not None:
        self._log.debug("Deregistering resource manager registration handle")
        self._rm_regh.deregister()
        self._rm_regh = None

    console_handler = self._vdur_console_handler
    # The handler's registration handle may be unset (this method itself
    # resets it to None), so check it before deregistering.
    if console_handler is not None and getattr(console_handler, "_regh", None) is not None:
        self._log.debug("Deregistering vnfr vdur console registration handle")
        console_handler._regh.deregister()
        console_handler._regh = None

    self._state = VDURecordState.TERMINATED
797
def find_internal_cp_by_cp_id(self, cp_id):
    """Return the descriptor's internal connection point with id *cp_id*.

    Raises:
        VduRecordError: no internal connection point has that id.
    """
    log = self._log
    log.debug("find_internal_cp_by_cp_id(%s) called",
              cp_id)

    for candidate in self._vdud.internal_connection_point:
        log.debug("Checking for int cp %s in internal connection points",
                  candidate.id)
        if candidate.id == cp_id:
            # The matching connection point itself is returned.
            return candidate

    log.debug("Failed to find cp %s in internal connection points",
              cp_id)
    raise VduRecordError("Failed to find cp %s in internal connection points" % cp_id)
820
@asyncio.coroutine
def create_resource(self, xact, vnfr, config=None):
    """Request this VDU's resource from the resource manager.

    Resolves every interface of the descriptor into an external or internal
    interface tuple (stored on this record), then issues a create query
    with the message built by ``resmgr_msg``.

    Args:
        xact: Transaction used to create the query block.
        vnfr: The owning VNFR, used to resolve connection points and VLRs.
        config: Optional user data forwarded to ``resmgr_msg``.

    Returns:
        The ``resource_info`` of the resource manager's response.

    Raises:
        VduRecordError: an interface could not be resolved.
        VMResourceError: no usable response was received.
    """
    def find_cp_by_name(cp_name):
        """ Find a connection point by name """
        cp = None
        self._log.debug("find_cp_by_name(%s) called", cp_name)
        for ext_cp in vnfr._cprs:
            self._log.debug("Checking ext cp (%s) called", ext_cp.name)
            if ext_cp.name == cp_name:
                cp = ext_cp
                break
        if cp is None:
            self._log.debug("Failed to find cp %s in external connection points",
                            cp_name)
        return cp

    def find_internal_vlr_by_cp_id(cp_id):
        self._log.debug("find_internal_vlr_by_cp_id(%s) called",
                        cp_id)

        # Validate the cp (raises VduRecordError when it is unknown)
        self.find_internal_cp_by_cp_id(cp_id)

        # return the VLR associated with the connection point
        return vnfr.find_vlr_by_cp(cp_id)

    def add_external_interface(interface):
        # Add an external interface from vdu interface list
        cp = find_cp_by_name(interface.external_connection_point_ref)
        if cp is None:
            self._log.debug("Failed to find connection point - %s",
                            interface.external_connection_point_ref)
            return

        self._log.debug("Connection point name [%s], type[%s]",
                        cp.name, cp.type_yang)

        vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)

        etuple = (interface, cp, vlr)
        self._ext_intf.append(etuple)

        self._log.debug("Created external interface tuple : %s", etuple)

    @asyncio.coroutine
    def add_internal_interface(interface):
        # Add an internal interface from vdu interface list
        cp_id = interface.internal_connection_point_ref
        self._log.debug("Resolving internal interface name [%s], cp[%s]",
                        interface.name, cp_id)

        if cp_id is None:
            msg = "The Internal Interface : %s is not mapped to an internal connection point." % (interface.name)
            self._log.error(msg)
            raise VduRecordError(msg)

        try:
            vlr = find_internal_vlr_by_cp_id(cp_id)
            query_iter = yield from self._dts.query_read(vlr.vlr_path())
            for pending in query_iter:
                vlr._vlr = (yield from pending).result
        except Exception as e:
            self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
            msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
            raise VduRecordError(msg) from e

        ituple = (interface, cp_id, vlr)
        self._int_intf.append(ituple)

        self._log.debug("Created internal interface tuple : %s", ituple)

    block = xact.block_create()

    self._log.debug("Executing vm request id: %s, action: create",
                    self._request_id)

    # Resolve the networks associated with interfaces ( both internal and external)
    for intf in self._vdud.interface:
        if intf.type_yang == 'EXTERNAL':
            self._log.debug("Resolving external interface name [%s], cp[%s]",
                            intf.name, intf.external_connection_point_ref)
            try:
                add_external_interface(intf)
            except Exception as e:
                msg = "Failed to add external interface %s from vdu interface list, e = %s" % (intf.name, e)
                self._log.error(msg)
                raise VduRecordError(msg) from e
        elif intf.type_yang == 'INTERNAL':
            self._log.debug("Resolving internal interface name [%s], cp[%s]",
                            intf.name, intf.internal_connection_point_ref)
            try:
                yield from add_internal_interface(intf)
            except Exception as e:
                msg = "Failed to add internal interface %s from vdu interface list, e = %s" % (intf.name, e)
                self._log.error(msg)
                raise VduRecordError(msg) from e

    resmgr_path = self.resmgr_path
    resmgr_msg = self.resmgr_msg(config)

    self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
    block.add_query_create(resmgr_path, resmgr_msg)

    res_iter = yield from block.execute(now=True)

    resp = None

    # Keep the last result delivered by the query.
    for i in res_iter:
        r = yield from i
        resp = r.result

    if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
        # Format the message; previously the format string and the response
        # were passed to the exception as separate, unformatted args.
        raise VMResourceError("Did not get a vm resource response (resp: {})".format(resp))
    self._log.debug("Got vm request response: %s", resp.resource_info)
    return resp.resource_info
942
@asyncio.coroutine
def delete_resource(self, xact):
    """Issue a delete query for this VDU's resource-manager event path.

    Args:
        xact: Transaction used to create the query block.
    """
    query_block = xact.block_create()

    self._log.debug("Executing vm request id: %s, action: delete",
                    self._request_id)

    target_path = self.resmgr_path
    query_block.add_query_delete(target_path)

    yield from query_block.execute(flags=0, now=True)
953
@asyncio.coroutine
def read_resource(self, xact):
    """Read this VDU's resource information from the resource manager.

    Args:
        xact: Transaction used to create the query block.

    Returns:
        The ``resource_info`` of the last response. The stored VM response
        of this record is not updated.

    Raises:
        VMResourceError: no response was received, or it carries no
            resource state.
    """
    block = xact.block_create()

    self._log.debug("Executing vm request id: %s, action: read",
                    self._request_id)

    block.add_query_read(self.resmgr_path)

    res_iter = yield from block.execute(flags=0, now=True)

    # Start from None so that an empty result set is reported through
    # VMResourceError below instead of failing on an unbound name.
    resp = None
    for i in res_iter:
        r = yield from i
        resp = r.result

    if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
        raise VMResourceError("Did not get a vm resource response (resp: {})".format(resp))
    self._log.debug("Got vm request response: %s", resp.resource_info)
    return resp.resource_info
973
@property
def active(self):
    """True when this VDU record is in the READY state, False otherwise."""
    return self._state is VDURecordState.READY
978
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """Mark this VDU as FAILED and report the failure to the owning VNFR.

    Args:
        failed_reason: Optional reason, stored on the record and passed on
            to the VNFR.
    """
    self._log.debug("VDU %s instantiation failed ", self._vdur_id)
    self._state_failed_reason = failed_reason
    self._state = VDURecordState.FAILED
    owner = self._vnfr
    yield from owner.instantiation_failed(failed_reason)
986
@asyncio.coroutine
def vdu_is_active(self):
    """ Move this VDU to READY and tell the VNFR when all its VDUs are active """
    if not self.active:
        self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)

        self._state = VDURecordState.READY

        # This VDU may have been the last one the VNFR was waiting for.
        if self._vnfr.all_vdus_active():
            self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
            yield from self._vnfr.is_ready()
        return

    # Nothing to do a second time round.
    self._log.warning("VDU %s was already marked as active", self._vdur_id)
1001
@asyncio.coroutine
def instantiate(self, xact, vnfr, config=None):
    """ Instantiate this VDU

    Registers as a subscriber on this VDU's resource-info path, issues the
    VM create request and moves the VDU state according to the response.
    Any exception is logged and reported through instantiation_failed().

    Arguments:
        xact   - DTS transaction, passed to create_resource()
        vnfr   - passed to create_resource()
        config - optional config, passed to create_resource()
    """
    self._state = VDURecordState.INSTANTIATING

    @asyncio.coroutine
    def on_prepare(xact_info, query_action, ks_path, msg):
        """ Handle a resource-info notification for this VDUR """
        self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
                        query_action,
                        ks_path,
                        msg)

        if (query_action == rwdts.QueryAction.UPDATE or
                query_action == rwdts.QueryAction.CREATE):
            self._vm_resp = msg

            if msg.resource_state == "active":
                # Move this VDU to ready state
                yield from self.vdu_is_active()
            elif msg.resource_state == "failed":
                yield from self.instantiation_failed(msg.resource_errors)
        elif query_action == rwdts.QueryAction.DELETE:
            self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
        else:
            # Interpolate the action into the message; passing it as a
            # second exception argument left the "%s" unformatted.
            raise NotImplementedError(
                "%s action on VirtualDeployementUnitRecord not supported" %
                query_action)

        xact_info.respond_xpath(rwdts.XactRspCode.ACK)

    try:
        # Check if resource orchestrator is not rift so that resource manager tasklet is not invoked
        if self._nsr_config.resource_orchestrator is not None:
            return

        reg_event = asyncio.Event(loop=self._loop)

        @asyncio.coroutine
        def on_ready(regh, status):
            """ Signal that the subscription below is in place """
            reg_event.set()

        # Subscribe before sending the create request, and wait for the
        # registration to be ready.
        handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
        self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
                                                      flags=rwdts.Flag.SUBSCRIBER,
                                                      handler=handler)
        yield from reg_event.wait()

        vm_resp = yield from self.create_resource(xact, vnfr, config)
        self._vm_resp = vm_resp
        self._state = VDURecordState.RESOURCE_ALLOC_PENDING

        self._log.debug("Requested VM from resource manager response %s",
                        vm_resp)
        if vm_resp.resource_state == "active":
            self._log.debug("Resourcemgr responded wih an active vm resp %s",
                            vm_resp)
            yield from self.vdu_is_active()
            self._state = VDURecordState.READY
        elif (vm_resp.resource_state == "pending" or
              vm_resp.resource_state == "inactive"):
            # Nothing more to do here: on_prepare() picks up the later
            # "active"/"failed" notification.
            self._log.debug("Resourcemgr responded wih a pending vm resp %s",
                            vm_resp)
        else:
            self._log.debug("Resourcemgr responded wih an error vm resp %s",
                            vm_resp)
            raise VirtualDeploymentUnitRecordError(
                "Failed VDUR instantiation %s " % vm_resp)

    except Exception as e:
        import traceback
        traceback.print_exc()
        self._log.exception(e)
        self._log.error("Instantiation of VDU record failed: %s", str(e))
        self._state = VDURecordState.FAILED
        yield from self.instantiation_failed(str(e))
1082
1083
class VlRecordState(enum.Enum):
    """ States of a virtual link record """
    # Not yet instantiated
    INIT = 101
    # Create request issued, VL not yet running
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    # Delete request issued
    TERMINATE_PENDING = 104
    TERMINATED = 105
    FAILED = 106
1092
1093
class InternalVirtualLinkRecord(object):
    """ Internal Virtual Link record

    Wraps one internal VLD of a VNFD: builds the VLR request message,
    creates/deletes it over DTS and tracks the resulting VlRecordState.
    """
    def __init__(self, dts, log, loop, project, vnfm,
                 ivld_msg, vnfr_name, datacenter_name, ip_profile=None):
        """
        Arguments:
            dts             - DTS handle used for transactions and reads
            log             - logger
            loop            - asyncio event loop
            project         - project object; used to prefix xpaths
            vnfm            - owning VNF manager
            ivld_msg        - internal VLD message this record is built from
            vnfr_name       - name of the owning VNFR (used in the VL name)
            datacenter_name - datacenter copied into the VLR request
            ip_profile      - optional ip profile; its ip_profile_params are
                              copied into the VLR request when present
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._project = project
        self._vnfm = vnfm
        self._ivld_msg = ivld_msg
        self._vnfr_name = vnfr_name
        self._datacenter_name = datacenter_name
        self._ip_profile = ip_profile

        # create_vlr() reads the attributes assigned above, so it must
        # run after them.
        self._vlr_req = self.create_vlr()
        self._vlr = None
        self._network_id = None
        self._state = VlRecordState.INIT
        self._state_details = ""

    @property
    def vlr_id(self):
        """ Id of the VLR request built for this VL """
        return self._vlr_req.id

    @property
    def name(self):
        """ Name of this VL """
        if self._ivld_msg.vim_network_name:
            return self._ivld_msg.vim_network_name
        else:
            return self._vnfr_name + "." + self._ivld_msg.name

    @property
    def network_id(self):
        """ Network id stored on this VL (None until set) """
        return self._network_id

    @network_id.setter
    def network_id(self, network_id):
        """ network id setter"""
        self._network_id = network_id

    @property
    def active(self):
        """ True when this VL is in the ACTIVE state """
        return self._state == VlRecordState.ACTIVE

    @property
    def state(self):
        """ state for this VLR """
        return self._state

    @property
    def state_details(self):
        """ state details for this VLR """
        return self._state_details

    def vlr_path(self):
        """ VLR path for this VLR instance"""
        return self._project.add_project("D,/vlr:vlr-catalog/vlr:vlr[vlr:id={}]".
                                         format(quoted_key(self.vlr_id)))

    def create_vlr(self):
        """ Create the VLR record which will be instantiated """

        vld_fields = ["short_name",
                      "vendor",
                      "description",
                      "version",
                      "type_yang",
                      "vim_network_name",
                      "provider_network"]

        vld_copy_dict = {k: v for k, v in self._ivld_msg.as_dict().items() if k in vld_fields}

        # Every VLR request gets a freshly generated id.
        vlr_dict = {"id": str(uuid.uuid4()),
                    "name": self.name,
                    "datacenter": self._datacenter_name,
                    }

        if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
            vlr_dict['ip_profile_params'] = self._ip_profile.ip_profile_params.as_dict()

        vlr_dict.update(vld_copy_dict)

        vlr = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr.from_dict(vlr_dict)

        if self._ivld_msg.has_field('virtual_connection_points'):
            for cp in self._ivld_msg.virtual_connection_points:
                vcp = vlr.virtual_connection_points.add()
                vcp.from_dict(cp.as_dict())

        return vlr

    @asyncio.coroutine
    def instantiate(self, xact, restart_mode=False):
        """ Instantiate VL

        Arguments:
            xact         - not used here; the create runs in its own
                           transaction
            restart_mode - when True the existing VLR is read back instead
                           of issuing a create
        """

        @asyncio.coroutine
        def instantiate_vlr():
            """ Instantiate VLR"""
            self._log.debug("Create VL with xpath %s and vlr %s",
                            self.vlr_path(), self._vlr_req)

            try:
                with self._dts.transaction(flags=0) as xact:
                    block = xact.block_create()
                    block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
                    self._log.debug("Executing VL create path:%s msg:%s",
                                    self.vlr_path(), self._vlr_req)

                    self._state = VlRecordState.INSTANTIATION_PENDING
                    # %-interpolation: str.format() has no effect on a
                    # "%s" placeholder, so the path was never inserted.
                    self._state_details = "Oustanding VL create request:%s" % self.vlr_path()
                    res_iter = None
                    try:
                        res_iter = yield from block.execute()
                    except Exception as e:
                        self._state = VlRecordState.FAILED
                        self._state_details = str(e)
                        self._log.exception("Caught exception while instantial VL")
                        raise

                    for ent in res_iter:
                        res = yield from ent
                        self._vlr = res.result

                    if self._vlr.operational_status == 'failed':
                        self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
                        self._state = VlRecordState.FAILED
                        self._state_details = self._vlr.operational_status_details
                        raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))

            except Exception as e:
                # self._vlr is still None when the failure happened before
                # any response arrived; fall back to the requested name so
                # the handler does not mask the original error.
                vlr_name = self._vlr.name if self._vlr is not None else self.name
                self._log.error("Caught exception while instantiating VL:%s:%s, e:%s",
                                self.vlr_id, vlr_name, e)
                self._state_details = str(e)
                raise

            self._log.info("Created VL with xpath %s and vlr %s",
                           self.vlr_path(), self._vlr)

        @asyncio.coroutine
        def get_vlr():
            """ Read the VLR at vlr_path(); raises if none is returned """
            res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
            vlr = None
            for ent in res_iter:
                res = yield from ent
                vlr = res.result

            if vlr is None:
                err = "Failed to get VLR for path %s" % self.vlr_path()
                self._log.warning(err)
                raise InternalVirtualLinkRecordError(err)
            return vlr

        self._state = VlRecordState.INSTANTIATION_PENDING

        if restart_mode:
            vl = yield from get_vlr()
            # NOTE(review): get_vlr() raises rather than returning None, so
            # this branch is currently unreachable - confirm whether a
            # missing VLR should be re-created on restart.
            if vl is None:
                yield from instantiate_vlr()
        else:
            yield from instantiate_vlr()

    def vlr_in_vns(self):
        """ Is there a VLR record in VNS """
        return self._state in (VlRecordState.ACTIVE,
                               VlRecordState.INSTANTIATION_PENDING,
                               VlRecordState.FAILED)

    @asyncio.coroutine
    def terminate(self, xact):
        """Terminate this VL

        Arguments:
            xact - DTS transaction in which the delete query is issued
        """
        if not self.vlr_in_vns():
            self._log.debug("Ignoring terminate request for id %s in state %s",
                            self.vlr_id, self._state)
            return

        self._log.debug("Terminating VL with path %s", self.vlr_path())
        self._state = VlRecordState.TERMINATE_PENDING
        self._state_details = "VL Terminate pending"
        block = xact.block_create()
        block.add_query_delete(self.vlr_path())
        yield from block.execute(flags=0, now=True)
        self._state = VlRecordState.TERMINATED
        self._state_details = "VL Terminated"
        self._log.debug("Terminated VL with path %s", self.vlr_path())

    def set_state_from_op_status(self, operational_status, operational_status_details):
        """ Set the state of this VL based on operational_status

        Raises:
            VirtualLinkRecordError - operational_status is not one of
                                     'running', 'failed', 'vl_alloc_pending'
        """

        self._state_details = operational_status_details

        if operational_status == 'running':
            self._log.info("VL %s moved to active state", self.vlr_id)
            self._state = VlRecordState.ACTIVE
        elif operational_status == 'failed':
            self._log.info("VL %s moved to failed state", self.vlr_id)
            self._state = VlRecordState.FAILED
        elif operational_status == 'vl_alloc_pending':
            self._log.debug("VL %s is in alloc pending state", self.vlr_id)
            self._state = VlRecordState.INSTANTIATION_PENDING
        else:
            raise VirtualLinkRecordError("Unknown operational_status %s" % (operational_status))

    def msg(self):
        """ Get a proto corresponding to this VLR (None until created) """
        return self._vlr
1309
1310
1311 class VirtualNetworkFunctionRecord(object):
1312 """ Virtual Network Function Record """
def __init__(self, dts, log, loop, cluster_name, vnfm, vnfr_msg,
             mgmt_network=None, external_ro=False):
    """ Initialise the record from the VNFR message it was created for

    Arguments:
        dts          - DTS handle
        log          - logger
        loop         - asyncio event loop
        cluster_name - stored as-is
        vnfm         - owning VNF manager; also supplies the project
        vnfr_msg     - VNFR message (id, vnfd, config_status are read here)
        mgmt_network - optional management network, stored as-is
        external_ro  - stored and exposed through the external_ro property
    """
    # Framework handles
    self._dts = dts
    self._log = log
    self._loop = loop
    self._vnfm = vnfm
    self._project = vnfm._project
    self._cluster_name = cluster_name
    self._mgmt_network = mgmt_network
    self._external_ro = external_ro

    # The request message and the pieces of it that are used directly
    self._vnfr_msg = vnfr_msg
    self._vnfr = vnfr_msg
    self._vnfr_id = vnfr_msg.id
    self._vnfd = vnfr_msg.vnfd
    self._vnfd_id = vnfr_msg.vnfd.id
    self._config_status = vnfr_msg.config_status

    # Record state
    self._state = VirtualNetworkFunctionRecordState.INIT
    self._state_failed_reason = None
    self._create_time = int(time.time())

    # Children and lookup tables
    self._ext_vlrs = {}  # The list of external virtual links
    self._vlrs = {}  # The list of internal virtual links
    self._vdus = []  # The list of vdu
    self._vlr_by_cp = {}
    self._cprs = []
    self._inventory = {}
    self._vnf_mon = None

    # VNFD package access and reference counting
    self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(
        self._log, project=self._project.name)
    self._rw_vnfd = None
    self._vnfd_ref_count = 0

    self._ssh_pub_key = None
    self._ssh_key_file = None
    self._task = None

    # Event that is set once the virtual links are ready
    self._vls_ready = asyncio.Event(loop=self._loop)

    # Flag for the pre-init VNFR state update DTS query
    self._init = False
1352
def _get_vdur_from_vdu_id(self, vdu_id):
    """ Return the VDU record whose vdu_id matches

    Arguments:
        vdu_id - id to look for among self._vdus

    Raises:
        VDURecordNotFound - no VDU record has this id
    """
    self._log.debug("Finding vdur for vdu_id %s", vdu_id)
    self._log.debug("Searching through vdus: %s", self._vdus)
    for vdu in self._vdus:
        self._log.debug("vdu_id: %s", vdu.vdu_id)
        if vdu.vdu_id == vdu_id:
            return vdu

    # Interpolate the id into the message; passing it as a second
    # exception argument left the "%s" unformatted.
    raise VDURecordNotFound("Could not find vdu record from id: %s" % vdu_id)
1362
@property
def operational_status(self):
    """ Operational status string matching the current state of this VNFR """
    # Keyed by the state's enum member name.
    status_by_state = dict(
        PRE_INIT="pre_init",
        INIT="init",
        VL_INIT_PHASE="vl_init_phase",
        VM_INIT_PHASE="vm_init_phase",
        READY="running",
        TERMINATE="terminate",
        VL_TERMINATE_PHASE="vl_terminate_phase",
        VDU_TERMINATE_PHASE="vm_terminate_phase",
        TERMINATED="terminated",
        FAILED="failed",
    )
    return status_by_state[self._state.name]
1377
@staticmethod
def vnfd_xpath(vnfd_id):
    """ Build the VNFD catalog xpath for the given VNFD id """
    template = "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd[project-vnfd:id={}]"
    return template.format(quoted_key(vnfd_id))
1383
@property
def external_ro(self):
    """ The external_ro value this VNFR was created with """
    return self._external_ro
1387
@property
def task(self):
    """ Task object stored on this VNFR (None until set) """
    return self._task

@task.setter
def task(self, task):
    """ Store the task object on this VNFR """
    self._task = task
1395
@property
def vnfd_ref_count(self):
    """ Current number of references held on this VNFR's VNFD """
    return self._vnfd_ref_count
1400
def vnfd_in_use(self):
    """ True while at least one reference is held on the VNFD """
    return self._vnfd_ref_count > 0
1404
def vnfd_ref(self):
    """ Take one reference and return the new reference count """
    updated = self._vnfd_ref_count + 1
    self._vnfd_ref_count = updated
    return updated
1409
def vnfd_unref(self):
    """ Drop one reference and return the new reference count

    Raises:
        VnfRecordError - no reference is currently held
    """
    held = self._vnfd_ref_count
    if held < 1:
        msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
               (self.vnfd.id, held))
        self._log.critical(msg)
        raise VnfRecordError(msg)

    # The count logged here is the value before the decrement.
    self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
                    self.vnfd.id, held)
    self._vnfd_ref_count = held - 1
    return self._vnfd_ref_count
1421
@property
def vnfd(self):
    """ The VNFD message this VNFR is built from """
    return self._vnfd
1426
@property
def vnf_name(self):
    """ Name taken from the VNFD of this VNFR """
    descriptor = self.vnfd
    return descriptor.name
1431
@property
def name(self):
    """ Name carried by the VNFR message """
    record = self._vnfr
    return record.name
1436
@property
def datacenter_name(self):
    """ Datacenter named in the VNFR message """
    record = self._vnfr
    return record.datacenter
1441
@property
def vnfd_id(self):
    """ Id of the VNFD this VNFR is built from """
    descriptor = self.vnfd
    return descriptor.id
1446
@property
def vnfr_id(self):
    """ Id of this VNFR, as captured at construction """
    return self._vnfr_id
1451
@property
def member_vnf_index(self):
    """ member_vnf_index_ref carried by the VNFR message """
    record = self._vnfr
    return record.member_vnf_index_ref
1456
@property
def config_status(self):
    """ Last config status recorded for this VNFR """
    return self._config_status
1461
@property
def public_key(self):
    """ SSH public key stored on this VNFR (None until set) """
    return self._ssh_pub_key
1465
@asyncio.coroutine
def get_nsr_config(self):
    """ Return the NSR config entry this VNFR refers to, or None

    Reads the ns-instance-config of the project and returns the first
    nsr whose id equals the nsr_id_ref of the VNFR message.
    """
    ### Need access to NS instance configuration for runtime resolution.
    ### This shall be replaced when deployment flavors are implemented
    config_xpath = self._project.add_project("C,/nsr:ns-instance-config")
    pending_results = yield from self._dts.query_read(config_xpath, rwdts.XactFlag.MERGE)

    for pending in pending_results:
        ns_instance_config = (yield from pending).result
        for candidate in ns_instance_config.nsr:
            if candidate.id == self._vnfr_msg.nsr_id_ref:
                return candidate

    return None
1480
@asyncio.coroutine
def get_nsr_opdata(self):
    """ Return the first NSR opdata entry for this VNFR's NSR, or None """
    template = ("D,/nsr:ns-instance-opdata/nsr:nsr"
                "[nsr:ns-instance-config-ref={}]")
    opdata_xpath = self._project.add_project(
        template.format(quoted_key(self._vnfr_msg.nsr_id_ref)))

    pending_results = yield from self._dts.query_read(opdata_xpath, rwdts.XactFlag.MERGE)

    # Only the first response is used.
    for pending in pending_results:
        first = yield from pending
        return first.result

    return None
1497
1498
def cp_ip_addr(self, cp_name):
    """ Return the ip address of the named connection point

    The first connection point record with this name and a non-None
    ip_address wins; "0.0.0.0" is returned when there is none.
    """
    self._log.debug("cp_ip_addr()")
    addresses = (
        record.ip_address
        for record in self._cprs
        if record.name == cp_name and record.ip_address is not None
    )
    return next(addresses, "0.0.0.0")
1506
def mgmt_intf_info(self):
    """ Return (ip_address, port) of the management interface of this VNFR

    The address comes from, in order of preference: the connection point
    named by the descriptor, the management ip of the VDU named by the
    descriptor (None when that VDU record is not found), or the static
    ip_address of the descriptor. The port is always the descriptor's.
    """
    descriptor = self.vnfd.mgmt_interface

    if descriptor.has_field("cp"):
        address = self.cp_ip_addr(descriptor.cp)
    elif descriptor.has_field("vdu_id"):
        try:
            address = self._get_vdur_from_vdu_id(descriptor.vdu_id).management_ip
        except VDURecordNotFound:
            self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
            address = None
    else:
        address = descriptor.ip_address

    return address, descriptor.port
1525
@property
def msg(self):
    """ Build the VNFR message describing the current state of this record """
    vnfr_cls = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr
    vnfd_cls = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd

    # Descriptor fields that are mirrored at the top level of the VNFR
    mirrored = ("short_name", "vendor", "description", "version")
    vnfd_copy_dict = {key: value
                      for key, value in self.vnfd.as_dict().items()
                      if key in mirrored}

    # Management interface: only fill in what is actually known
    mgmt_intf = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MgmtInterface()
    ip_address, port = self.mgmt_intf_info()
    if ip_address:
        mgmt_intf.ip_address = ip_address
    if port is not None:
        mgmt_intf.port = port
    if self._ssh_pub_key:
        mgmt_intf.ssh_key.public_key = self._ssh_pub_key
        mgmt_intf.ssh_key.private_key_file = self._ssh_key_file

    vnfr_dict = {
        "id": self._vnfr_id,
        "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
        "name": self.name,
        "member_vnf_index_ref": self.member_vnf_index,
        "operational_status": self.operational_status,
        "operational_status_details": self._state_failed_reason,
        "datacenter": self.datacenter_name,
        "config_status": self._config_status,
    }
    # Mirrored descriptor fields override nothing above: the key sets
    # are disjoint.
    vnfr_dict.update(vnfd_copy_dict)

    vnfr_msg = vnfr_cls.from_dict(vnfr_dict)
    vnfr_msg.vnfd = vnfd_cls.from_dict(self.vnfd.as_dict())

    vnfr_msg.create_time = self._create_time
    vnfr_msg.uptime = int(time.time()) - self._create_time
    vnfr_msg.mgmt_interface = mgmt_intf

    # Add all the VLRs to VNFR
    for internal_vl in self._vlrs.values():
        vnfr_msg.internal_vlr.add().vlr_ref = internal_vl.vlr_id

    # Add all the VDUs to VDUR
    if self._vdus is not None:
        for vdu in self._vdus:
            vnfr_msg.vdur.add().from_dict(vdu.msg.as_dict())

    if self.vnfd.mgmt_interface.has_field('dashboard_params'):
        vnfr_msg.dashboard_url = self.dashboard_url

    cp_cls = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint
    for cpr in self._cprs:
        vnfr_msg.connection_point.append(cp_cls.from_dict(cpr.as_dict()))

    if self._vnf_mon is not None:
        monp_cls = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam
        for monp in self._vnf_mon.msg:
            vnfr_msg.monitoring_param.append(monp_cls.from_dict(monp.as_dict()))

    if self._vnfr.vnf_configuration is not None:
        vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())

    for group in self._vnfr_msg.placement_groups_info:
        group_info = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_PlacementGroupsInfo()
        group_info.from_dict(group.as_dict())
        vnfr_msg.placement_groups_info.append(group_info)

    return vnfr_msg
1595
@asyncio.coroutine
def update_config(self, msg, xact):
    """ Take over config status and VNFR content from msg, then publish

    Arguments:
        msg  - VNFR message carrying the new config status/configuration
        xact - DTS transaction passed to publish()
    """
    incoming_config = msg.vnf_configuration.as_dict()
    self._log.debug("VNFM vnf config: {}".format(incoming_config))

    self._config_status = msg.config_status
    # Keep a private copy of the message rather than the message itself.
    vnfr_cls = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr
    self._vnfr = vnfr_cls.from_dict(msg.as_dict())

    self._log.debug("VNFR msg config: {}".format(self._vnfr.as_dict()))

    yield from self.publish(xact)
1607
@asyncio.coroutine
def update_vnfr_after_substitution(self, msg, xact):
    """ Reset this VNFR to INIT from a substituted VNFR message and publish

    Arguments:
        msg  - VNFR message after input parameter substitution; its
               operational_status is set to 'init' here
        xact - DTS transaction passed to publish()
    """
    self._log.debug("Updating VNFR after Input Param Substitution: {}".format(msg.as_dict()))

    self._state = VirtualNetworkFunctionRecordState.INIT
    self._vnfd = msg.vnfd

    # The stored copy is taken after the status has been reset on msg.
    msg.operational_status = 'init'
    vnfr_cls = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr
    self._vnfr = vnfr_cls.from_dict(msg.as_dict())

    self._log.debug("VNFR updated: {}".format(self._vnfr.as_dict()))
    yield from self.publish(xact)
1621
@property
def dashboard_url(self):
    """ Dashboard URL built from the management ip and dashboard_params

    http on port 80 by default, https on port 443 when the https field is
    present and True; an explicit port in dashboard_params wins in both
    cases.
    """
    ip, cfg_port = self.mgmt_intf_info()
    params = self.vnfd.mgmt_interface.dashboard_params

    use_https = params.has_field('https') and params.https is True
    protocol, http_port = ('https', 443) if use_https else ('http', 80)

    if params.has_field('port'):
        http_port = params.port

    return "{protocol}://{ip_address}:{port}/{path}".format(
        protocol=protocol,
        ip_address=ip,
        port=http_port,
        path=params.path.lstrip("/"),
    )
1642
@property
def xpath(self):
    """ Project-qualified DTS path of this VNFR in the vnfr catalog """
    template = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]"
    return self._project.add_project(template.format(quoted_key(self.vnfr_id)))
1648
@asyncio.coroutine
def publish(self, xact):
    """ publish this VNFR

    Arguments:
        xact - DTS transaction passed to the VNFM's publish_vnfr()
    """
    # Build the record and its path once: every access to self.msg
    # assembles a complete new message, and the record that gets logged
    # should be the one that gets published.
    vnfr = self.msg
    path = self.xpath

    self._log.debug("Publishing VNFR path = [%s], record = [%s]",
                    path, vnfr)
    vnfr.create_time = self._create_time
    yield from self._vnfm.publish_vnfr(xact, path, vnfr)
    self._log.debug("Published VNFR path = [%s], record = [%s]",
                    path, vnfr)
1659
def resolve_vld_ip_profile(self, vnfd_msg, vld):
    """ Return the ip profile of vnfd_msg that vld refers to, or None

    Arguments:
        vnfd_msg - descriptor whose ip_profiles are searched by name
        vld      - virtual link descriptor carrying ip_profile_ref
    """
    self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
    if not vld.has_field('ip_profile_ref'):
        return None

    matching = (candidate for candidate in vnfd_msg.ip_profiles
                if candidate.name == vld.ip_profile_ref)
    return next(matching, None)
1666
@asyncio.coroutine
def create_vls(self):
    """ Create a record for every internal VLD of the VNFD

    Each record is stored by VLR id, registered with the VNFM, and bound
    to the internal connection points it references.

    Raises:
        InternalVirtualLinkRecordError - a connection point is referenced
                                         by more than one internal VLD
    """
    self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)

    for ivld_msg in self.vnfd.internal_vld:
        self._log.debug("Creating internal vld:"
                        " %s, int_cp_ref = %s",
                        ivld_msg, ivld_msg.internal_connection_point)

        record = InternalVirtualLinkRecord(
            dts=self._dts,
            log=self._log,
            loop=self._loop,
            project=self._project,
            vnfm=self._vnfm,
            ivld_msg=ivld_msg,
            vnfr_name=self.name,
            datacenter_name=self.datacenter_name,
            ip_profile=self.resolve_vld_ip_profile(self.vnfd, ivld_msg),
        )
        self._vlrs[record.vlr_id] = record
        self._vnfm.add_vlr_id_vnfr_map(record.vlr_id, self)

        for int_cp in ivld_msg.internal_connection_point:
            cp_ref = int_cp.id_ref
            if cp_ref in self._vlr_by_cp:
                # A connection point can belong to one internal VL only.
                msg = ("Connection point %s already "
                       " bound %s" % (cp_ref, self._vlr_by_cp[cp_ref]))
                raise InternalVirtualLinkRecordError(msg)
            self._log.debug("Setting vlr %s to internal cp = %s",
                            record, cp_ref)
            self._vlr_by_cp[cp_ref] = record
1698
@asyncio.coroutine
def instantiate_vls(self, xact, restart_mode=False):
    """ Instantiate every internal VL and wait until they are ready

    Arguments:
        xact         - DTS transaction passed to each VL's instantiate()
        restart_mode - passed to each VL's instantiate()
    """
    self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)

    for record in self._vlrs.values():
        self._log.debug("Instantiating VLR %s", record)
        yield from record.instantiate(xact, restart_mode)

    if not self._vlrs:
        # Nothing to wait for: mark the links as ready straight away.
        self._log.debug("VNFR id:%s, name:%s, No virtual links found",
                        self.vnfr_id, self.name)
        self._vls_ready.set()
        return

    # Wait for the VLs to be ready before yielding control out
    self._log.debug("VNFR id:%s, name:%s - Waiting for %d VLs to be ready",
                    self.vnfr_id, self.name, len(self._vlrs))
    yield from self._vls_ready.wait()
1718
def find_vlr_by_cp(self, cp_name):
    """ Return the VLR bound to the named connection point

    Raises:
        KeyError - no VLR is bound to cp_name
    """
    bound = self._vlr_by_cp
    return bound[cp_name]
1722
def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
    """
    Returns the cloud specific construct for placement group
    Arguments:
        input_group: VNFD PlacementGroup
        nsr_config: Configuration for VNFDGroup MAP in the NSR config
    Returns None when the NSR config has no map entry for this group and
    this VNFD.
    """
    taken_from_group = ('name', 'requirement', 'strategy')
    map_only_keys = ('placement_group_ref', 'vnfd_id_ref')

    for group_info in nsr_config.vnfd_placement_group_maps:
        if (group_info.placement_group_ref != input_group.name or
                group_info.vnfd_id_ref != self.vnfd_id):
            continue

        group = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
        # Everything from the map entry except its two reference keys ...
        group_dict = {key: value
                      for key, value in group_info.as_dict().items()
                      if key not in map_only_keys}
        # ... plus the identifying fields of the VNFD placement group.
        for param in taken_from_group:
            group_dict[param] = getattr(input_group, param)
        group.from_dict(group_dict)
        return group

    return None
1743
@asyncio.coroutine
def get_vdu_placement_groups(self, vdu, nsr_config):
    """ Collect the placement groups that apply to a VDU

    Arguments:
        vdu        - VDU descriptor (id and name are read)
        nsr_config - NSR config used to resolve VDU level groups

    Returns:
        List holding the VNF level groups of the VNFR message followed by
        every resolvable VNFD group that lists this VDU as a member
    """
    ### Step-1: Get VNF level placement groups
    placement_groups = list(self._vnfr_msg.placement_groups_info)

    ### Step-2: Get VDU level placement groups
    for group in self.vnfd.placement_groups:
        for member_vdu in group.member_vdus:
            if member_vdu.member_vdu_ref != vdu.id:
                continue

            group_info = self.resolve_placement_group_cloud_construct(group,
                                                                      nsr_config)
            if group_info is None:
                self._log.info("Could not resolve cloud-construct for "
                               "placement group: %s", group.name)
                continue

            self._log.info("Successfully resolved cloud construct for "
                           "placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
                           str(group_info),
                           vdu.name,
                           self.vnf_name,
                           self.member_vnf_index)
            placement_groups.append(group_info)

    return placement_groups
1772
@asyncio.coroutine
def substitute_vdu_input_parameters(self, vdu):
    """ Return the VDU descriptor, preferring the copy held in self.vnfd

    Arguments:
        vdu - VDU as a dict; its "id" selects the VDU in self.vnfd

    Returns:
        A Vdu message built from the matching VDU of self.vnfd, or from
        the dict passed in when no VDU matches
    """
    match = next((candidate for candidate in self.vnfd.vdu
                  if vdu["id"] == candidate.id), None)
    result = vdu if match is None else match.as_dict()

    return RwVnfdYang.YangData_Vnfd_VnfdCatalog_Vnfd_Vdu.from_dict(result)
1782
1783
@asyncio.coroutine
def vdu_cloud_init_instantiation(self):
    """ Access vdud_cloud_init on every VDU of this VNFR

    The values are discarded; presumably the attribute access itself
    prepares the cloud-init data - confirm against vdud_cloud_init.
    """
    for vdu in self._vdus:
        vdu.vdud_cloud_init
1787
@asyncio.coroutine
def create_vdus(self, vnfr, restart_mode=False):
    """ Create the VDUs associated with this VNF

    Arguments:
        vnfr         - VNFR handed to every VDU record; in restart mode
                       its existing vdur entries supply the VDUR ids
        restart_mode - reuse the ids of existing VDURs where available
    """

    def get_vdur_id(vdud):
        """Get the corresponding VDUR's id for the VDUD. This is useful in
        case of a restart.

        In restart mode we check for exiting VDUR's ID and use them, if
        available. This way we don't end up creating duplicate VDURs
        """
        if not restart_mode or vdud is None:
            return None

        existing_ids = [rec.id for rec in vnfr._vnfr.vdur
                        if rec.vdu_id_ref == vdud.id]
        if not existing_ids:
            self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
            return None

        return existing_ids[0]

    self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)

    # Get NSR config - Needed for placement groups and to derive VDU short-name
    nsr_config = yield from self.get_nsr_config()

    for vdu in self._rw_vnfd.vdu:
        self._log.debug("Creating vdu: %s", vdu)
        vdur_id = get_vdur_id(vdu)

        placement_groups = yield from self.get_vdu_placement_groups(vdu, nsr_config)
        group_names = [group.name for group in placement_groups]
        self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s, Existing vdur_id %s",
                       vdu.name,
                       self.vnf_name,
                       self.member_vnf_index,
                       group_names,
                       vdur_id)

        # Update VDU Info from VNFR (It contains the input parameter for VDUs as well)
        vdu_updated = yield from self.substitute_vdu_input_parameters(vdu.as_dict())

        vdur = VirtualDeploymentUnitRecord(
            dts=self._dts,
            log=self._log,
            loop=self._loop,
            project=self._project,
            vdud=vdu_updated,
            vnfr=vnfr,
            nsr_config=nsr_config,
            mgmt_intf=self.has_mgmt_interface(vdu),
            mgmt_network=self._mgmt_network,
            datacenter_name=self.datacenter_name,
            vnfd_package_store=self._vnfd_package_store,
            vdur_id=vdur_id,
            placement_groups=placement_groups,
        )
        yield from vdur.vdu_opdata_register()

        self._vdus.append(vdur)
1850
@asyncio.coroutine
def instantiate_vdus(self, xact, vnfr):
    """ Instantiate the VDUs associated with this VNF

    For every VDU two tasks are scheduled on the loop: one that waits for
    the VDU to reach a terminal state, and one that instantiates it once
    every VDU referenced from its cloud-init script has been processed.
    The tasks are only scheduled here; they are not awaited.

    Arguments:
        xact - not used; each VDU is instantiated in its own transaction
        vnfr - passed to each VDU's instantiate()
    """
    self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)

    lookup = {vdu.vdu_id: vdu for vdu in self._vdus}

    # Identify any dependencies among the VDUs
    dependencies = collections.defaultdict(list)
    vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")
    # Raw string: "\{" and "\}" are invalid escape sequences in an
    # ordinary string literal. Compiled once for all VDUs.
    variable_pattern = re.compile(r"\{\{ ([^\}]+) \}\}")

    for vdu in self._vdus:
        if vdu._vdud_cloud_init is not None:
            for vdu_id in vdu_id_pattern.findall(vdu._vdud_cloud_init):
                if vdu_id != vdu.vdu_id:
                    # This means that vdu.vdu_id depends upon vdu_id,
                    # i.e. vdu_id must be instantiated before
                    # vdu.vdu_id.
                    dependencies[vdu.vdu_id].append(lookup[vdu_id])

    # Define the terminal states of VDU instantiation
    terminal = (
        VDURecordState.READY,
        VDURecordState.TERMINATED,
        VDURecordState.FAILED,
    )

    datastore = VdurDatastore()
    processed = set()

    @asyncio.coroutine
    def instantiate_monitor(vdu):
        """Monitor the state of the VDU during instantiation

        Arguments:
            vdu - a VirtualDeploymentUnitRecord

        """
        # wait for the VDUR to enter a terminal state
        while vdu._state not in terminal:
            yield from asyncio.sleep(1, loop=self._loop)
        # update the datastore
        datastore.update(vdu)

        # add the VDU to the set of processed VDUs
        processed.add(vdu.vdu_id)

    @asyncio.coroutine
    def instantiate(vdu):
        """Instantiate the specified VDU

        Arguments:
            vdu - a VirtualDeploymentUnitRecord

        Raises:
            if the VDU, or any of the VDUs this VDU depends upon, are
            terminated or fail to instantiate properly, a
            VirtualDeploymentUnitRecordError is raised.
            ValueError is raised when a cloud-init variable cannot be
            substituted.

        """

        for dependency in dependencies[vdu.vdu_id]:
            self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))

            while dependency.vdu_id not in processed:
                yield from asyncio.sleep(1, loop=self._loop)

            if not dependency.active:
                raise VirtualDeploymentUnitRecordError()

        self._log.debug('instantiating {}'.format(vdu.vdu_id))

        # Populate the datastore with the current values of the VDU
        datastore.add(vdu)

        # Substitute any variables contained in the cloud config script
        config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""

        # split() with one capture group alternates literal text and
        # variable names, so the names sit at the odd indices.
        parts = variable_pattern.split(config)

        if len(parts) > 1:

            # Extract the variable names
            variables = list()
            for variable in parts[1::2]:
                variables.append(variable.lstrip('{{').rstrip('}}').strip())

            # Iterate of the variables and substitute values from the
            # datastore.
            missing_msg = "Unable to find a substitute for {} in {} cloud-init script"

            for variable in variables:

                # Handle a reference to a VDU by ID
                if variable.startswith('vdu['):
                    value = datastore.get(variable)
                    if value is None:
                        raise ValueError(missing_msg.format(variable, vdu.vdu_id))

                    config = config.replace("{{ %s }}" % variable, value)
                    continue

                # Handle a reference to the current VDU
                if variable.startswith('vdu'):
                    value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
                    # Same check as for references by id: without it a
                    # missing value reaches str.replace() as None and
                    # fails with an unhelpful TypeError.
                    if value is None:
                        raise ValueError(missing_msg.format(variable, vdu.vdu_id))

                    config = config.replace("{{ %s }}" % variable, value)
                    continue

                # Handle a reference to Cloud Init Variables: Start with 'CI'
                if variable.startswith('CI'):
                    custom_meta_data = datastore.get('vdu[{}]'.format(vdu.vdu_id) + ".custom_meta_data")
                    try:
                        for meta_data in custom_meta_data:
                            if meta_data.destination == 'CLOUD_INIT':
                                if meta_data.name == variable:
                                    config = config.replace("{{ %s }}" % variable, meta_data.value)
                    except Exception:
                        raise ValueError("Unrecognized Cloud Init Variable")

                    continue

                # Handle unrecognized variables
                msg = 'unrecognized cloud-config variable: {}'
                raise ValueError(msg.format(variable))

        # Instantiate the VDU
        with self._dts.transaction() as xact:
            self._log.debug("Instantiating vdu: %s", vdu)
            yield from vdu.instantiate(xact, vnfr, config=config)
            if self._state == VirtualNetworkFunctionRecordState.FAILED:
                self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
                                self.vnfr_id, vdu)

    # First create a set of tasks to monitor the state of the VDUs and
    # report when they have entered a terminal state
    for vdu in self._vdus:
        self._loop.create_task(instantiate_monitor(vdu))

    for vdu in self._vdus:
        self._loop.create_task(instantiate(vdu))
1991
def has_mgmt_interface(self, vdu):
    """Tell whether the given VDU carries the VNFD management interface.

    Arguments:
        vdu - an object whose ``id`` is compared with
              ``self.vnfd.mgmt_interface.vdu_id``

    Returns:
        True if the two ids are equal, False otherwise.

    """
    # ## TODO: Support additional mgmt_interface type options
    mgmt_vdu_id = self.vnfd.mgmt_interface.vdu_id
    return True if mgmt_vdu_id == vdu.id else False
1997
def vlr_xpath(self, vlr_id):
    """Build the project-scoped xpath of one VLR catalog entry.

    Arguments:
        vlr_id - id of the VLR; it is passed through quoted_key() before
                 being placed in the key predicate

    Returns:
        Whatever self._project.add_project() returns for the VLR path.

    """
    catalog_path = "D,/vlr:vlr-catalog/vlr:vlr[vlr:id={}]".format(quoted_key(vlr_id))
    return self._project.add_project(catalog_path)
2002
def ext_vlr_by_id(self, vlr_id):
    """Return the external VLR stored under ``vlr_id``.

    This is a plain index into ``self._ext_vlrs``; an unknown id is not
    handled here, so the container's own lookup error propagates.

    """
    ext_vlrs = self._ext_vlrs
    return ext_vlrs[vlr_id]
2006
def all_vdus_active(self):
    """Report whether every VDU in ``self._vdus`` is active.

    Returns:
        False as soon as one VDU has a falsy ``active`` attribute; True
        (after a debug log line) when none does, including when there are
        no VDUs at all.

    """
    if any(not vdu.active for vdu in self._vdus):
        return False

    self._log.debug("Inside all_vdus_active. Returning True")
    return True
2015
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """Move this VNFR to the FAILED state and publish it.

    Arguments:
        failed_reason - stored in ``self._state_failed_reason`` (defaults
                        to None)

    """
    self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)

    failed_state = VirtualNetworkFunctionRecordState.FAILED
    self.set_state(failed_state)
    self._state_failed_reason = failed_reason

    # Push the changed status out; no transaction is supplied here
    yield from self.publish(None)
2025
@asyncio.coroutine
def is_ready(self):
    """Mark this VNFR READY (unless it already FAILED) and publish it.

    A VNFR whose state is FAILED keeps that state; the record is published
    in either case.

    """
    self._log.debug("VNFR id %s is ready", self.vnfr_id)

    if self._state == VirtualNetworkFunctionRecordState.FAILED:
        # A failed record must not be flipped back to READY
        self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
    else:
        self.set_state(VirtualNetworkFunctionRecordState.READY)

    # Push the (possibly unchanged) status out
    yield from self.publish(None)
2039
def update_cp(self, cp_name, ip_address, mac_addr, cp_id, virtual_cps=None):
    """Update the connection point record named ``cp_name``.

    Arguments:
        cp_name     - name of the connection point record to update
        ip_address  - value stored in the record's ``ip_address``
        mac_addr    - value stored in the record's ``mac_address``
        cp_id       - value stored in the record's ``connection_point_id``
        virtual_cps - optional iterable of dicts; when non-empty each one is
                      converted with ``from_dict`` and the list replaces the
                      record's ``virtual_cps``

    Raises:
        VirtualDeploymentUnitRecordError if no record in ``self._cprs`` has
        the requested name.

    """
    for cp in self._cprs:
        if cp.name != cp_name:
            continue

        self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
                        cp_name, cp, ip_address, cp_id)
        cp.ip_address = ip_address
        cp.mac_address = mac_addr
        cp.connection_point_id = cp_id
        if virtual_cps:
            cp.virtual_cps = [VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint_VirtualCps.from_dict(v) for v in virtual_cps]
        return

    # Report the name that was asked for. The loop variable would name the
    # last record inspected, and does not exist when self._cprs is empty.
    err = "No connection point %s found in VNFR id %s" % (cp_name, self._vnfr_id)
    self._log.debug(err)
    raise VirtualDeploymentUnitRecordError(err)
2056
def set_state(self, state):
    """Record ``state`` as the current state of this VNFR.

    Arguments:
        state - the new state value; it is stored as-is in ``self._state``

    """
    self._state = state
2060
@asyncio.coroutine
def instantiate(self, xact, restart_mode=False):
    """Instantiate this VNF.

    The steps run in this order: enter VL_INIT_PHASE, fetch the VNFD and
    the NSR opdata (for the generated ssh key), take a VNFD reference,
    fetch the VLRs referenced by the connection points, create, publish
    and instantiate the VLs, enter VM_INIT_PHASE, create the VDUs, run the
    cloud-init instantiation step and finally schedule instantiate_vdus()
    as a separate task.

    Arguments:
        xact         - transaction handed to publish(), instantiate_vls()
                       and instantiate_vdus()
        restart_mode - forwarded to instantiate_vls() and create_vdus()

    A VL instantiation error, or a VL found in FAILED state afterwards,
    ends the coroutine through instantiation_failed().

    """
    self._log.info("Instantiate VNF {}: {}".format(self._vnfr_id, self._state))
    self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
    self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)

    # The ssh key material is only recorded when NSR opdata was returned
    nsr_op = yield from self.get_nsr_opdata()
    if nsr_op:
        self._ssh_key_file = nsr_op.ssh_key_generated.private_key_file
        self._ssh_pub_key = nsr_op.ssh_key_generated.public_key

    @asyncio.coroutine
    def fetch_vlrs():
        """Create a connection point record per VNFR connection point and
        read the VLR each one references."""
        # Iterate over all the connection points in VNFR and fetch the
        # associated VLRs

        def cpr_from_cp(cp):
            """Create a record-level connection point from the descriptor
            cp, copying only the keys listed in cp_fields."""
            cp_fields = ["name", "image", "vm-flavor", "port_security_enabled", "type_yang"]
            cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
            cpr_dict = {}
            cpr_dict.update(cp_copy_dict)
            return VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)

        self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
                        self._vnfr_id, self._vnfr.connection_point)

        for cp in self._vnfr.connection_point:
            cpr = cpr_from_cp(cp)
            self._cprs.append(cpr)
            self._log.debug("Adding Connection point record %s ", cp)

            vlr_path = self.vlr_xpath(cp.vlr_ref)
            self._log.debug("Fetching VLR with path = %s", vlr_path)
            res_iter = yield from self._dts.query_read(vlr_path,
                                                       rwdts.XactFlag.MERGE)
            # Each item of the iterator is itself awaited; the result is
            # stored as the external VLR for this connection point
            for i in res_iter:
                r = yield from i
                d = r.result
                self._ext_vlrs[cp.vlr_ref] = d
                cpr.vlr_ref = cp.vlr_ref
                self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)

    # Increase the VNFD reference count
    self.vnfd_ref()

    assert self.vnfd

    # Fetch External VLRs
    self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
    yield from fetch_vlrs()

    # Publish VLs
    self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
    yield from self.create_vls()

    # publish the VNFR
    self._log.debug("Publish VNFR {}: {}".format(self._vnfr_id, self._state))
    yield from self.publish(xact)


    # instantiate VLs
    self._log.debug("VNFR-ID %s: Instantiate VLs, restart mode %s", self._vnfr_id, restart_mode)
    try:
        yield from self.instantiate_vls(xact, restart_mode)
    except Exception as e:
        self._log.exception("VL instantiation failed (%s)", str(e))
        yield from self.instantiation_failed(str(e))
        return

    # Only the FAILED outcome stops the instantiation here; any other
    # state returned by vl_instantiation_state() lets it proceed
    vl_state, failed_vl = self.vl_instantiation_state()
    if vl_state == VlRecordState.FAILED:
        self._log.error("VL Instantiation failed for one or more of the internal virtual links, vl:%s",failed_vl)
        yield from self.instantiation_failed(failed_vl.state_details)
        return

    self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)

    # instantiate VDUs
    self._log.debug("VNFR-ID %s: Create VDUs, restart mode %s", self._vnfr_id, restart_mode)
    yield from self.create_vdus(self, restart_mode)

    # NOTE(review): on failure the record is marked FAILED and published,
    # but execution continues and the VDU instantiation task below is still
    # scheduled - confirm that not returning here is intended.
    try:
        yield from self.vdu_cloud_init_instantiation()
    except Exception as e:
        self.set_state(VirtualNetworkFunctionRecordState.FAILED)
        self._state_failed_reason = str(e)
        yield from self.publish(xact)

    # publish the VNFR
    self._log.debug("VNFR {}: Publish VNFR with state {}".
                    format(self._vnfr_id, self._state))
    yield from self.publish(xact)

    # instantiate VDUs
    # ToDo: Check if this should be prevented during restart
    self._log.debug("Instantiate VDUs {}: {}".format(self._vnfr_id, self._state))
    # Scheduled as its own task, so this coroutine does not wait for the
    # VDUs to finish instantiating
    _ = self._loop.create_task(self.instantiate_vdus(xact, self))

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)
2167
@asyncio.coroutine
def terminate(self, xact):
    """Terminate this virtual network function.

    Cancels the task held in ``self._task`` (if any), stops and
    deregisters the monitor, then terminates the VLs followed by the VDUs,
    moving the record through the TERMINATE, VL_TERMINATE_PHASE,
    VDU_TERMINATE_PHASE and TERMINATED states. The VNFD reference is
    dropped at the end.

    Arguments:
        xact - transaction handed to every VL and VDU terminate() call

    """
    if self._task:
        self._log.debug("Canceling scheduled tasks for VNFR %s", self._vnfr_id)
        self._task.cancel()

    self._log.debug("Terminatng VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)

    # stop monitoring
    if self._vnf_mon is not None:
        self._vnf_mon.stop()
        self._vnf_mon.deregister()
        self._vnf_mon = None

    # Phase 1: virtual links. Each VLR id is unmapped from the VNFM before
    # the link itself is terminated.
    self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
    for vlr_id, vl in self._vlrs.items():
        self._vnfm.remove_vlr_id_vnfr_map(vlr_id)
        yield from vl.terminate(xact)

    # Phase 2: deployment units, one after the other
    self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
    for vdu in self._vdus:
        yield from vdu.terminate(xact)

    self._log.debug("Terminated VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)

    # Unref the VNFD
    self.vnfd_unref()
2212
def vl_instantiation_state(self):
    """Get the state of VL instantiation of this VNF.

    The VLRs in ``self._vlrs`` are inspected in iteration order and the
    first one that is not ACTIVE decides the result.

    Returns:
        A ``(state, failed_vl)`` pair:
        (ACTIVE, None) when every VLR is active (or there are none),
        (FAILED, vlr) for the first failed VLR,
        (INSTANTIATION_PENDING, (vlr, None)) for the first pending VLR.

    Raises:
        VlRecordError if a VLR is in any other state.

    """
    failed_vl = None
    for vlr in self._vlrs.values():
        if vlr.state == VlRecordState.ACTIVE:
            continue

        if vlr.state == VlRecordState.FAILED:
            return VlRecordState.FAILED, vlr

        if vlr.state == VlRecordState.INSTANTIATION_PENDING:
            # NOTE(review): the second element has always been the tuple
            # (vlr, None) here, not the VLR itself. That looks accidental,
            # but the shape is kept so existing callers are unaffected.
            return VlRecordState.INSTANTIATION_PENDING, (vlr, failed_vl)

        self._log.debug("vlr %s still in state %s", vlr, vlr.state)
        # Interpolate the state; passing it as a second exception argument
        # leaves a literal "%s" in the message.
        raise VlRecordError("Invalid state %s" % vlr.state)

    return VlRecordState.ACTIVE, failed_vl
2229
def vl_instantiation_successful(self):
    """Re-evaluate the VL states and release ``self._vls_ready`` when done.

    When every VL is active the event is set. When a VL has failed, the
    VNFR failure handling is scheduled on the event loop and the event is
    set as well, so that waiters are not left blocked. For any other state
    nothing is changed.

    """
    if self._vls_ready.is_set():
        self._log.debug("VNFR id %s, vls_ready is already set", self.vnfr_id)

    vl_state, failed_vl = self.vl_instantiation_state()

    if vl_state == VlRecordState.ACTIVE:
        self._log.info("VNFR id:%s name:%s has all Virtual Links in active state, Ready to orchestrate VDUs",
                       self.vnfr_id, self.name)
        self._vls_ready.set()

    elif vl_state == VlRecordState.FAILED:
        self._log.error("VNFR id:%s name:%s One of the Virtual Links failed to reach active state.Failed to orchestrate VNF",
                        self.vnfr_id, self.name)
        reason = "VNFR id {}: failed since VL {} did not come up".format(
            self.vnfr_id, failed_vl.name)
        # instantiation_failed() is a coroutine: calling it only creates
        # the coroutine object. It has to be scheduled for the FAILED state
        # to be set and published.
        self._loop.create_task(self.instantiation_failed(reason))
        self._vls_ready.set()
2247
def find_vlr(self, vlr_id):
    """Look up a VLR of this VNFR by id.

    Arguments:
        vlr_id - key into ``self._vlrs``

    Returns:
        The matching VLR, or None when the id is not present.

    """
    return self._vlrs.get(vlr_id)
2254
def vlr_event(self, vlr, action):
    """Apply a VLR notification to the matching local VLR.

    Arguments:
        vlr    - the received VLR message
        action - the query action that delivered it

    Events for ids that are not known to this VNFR are logged and dropped.
    For CREATE and UPDATE the local state follows an operational status of
    'running' or 'failed', and the network id is copied when present.
    The overall VL instantiation state is re-evaluated afterwards.

    """
    self._log.debug("Received VLR %s with action:%s", vlr, action)

    vlr_local = self.find_vlr(vlr.id)
    if vlr_local is None:
        self._log.error("VLR %s:%s received for unknown id, state:%s ignoring event",
                        vlr.id, vlr.name, vlr.state)
        return

    if action in (rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE):
        # Statuses that drive a local state change, with their log line
        transitions = {
            'running': "VLR %s:%s moving to active state",
            'failed': "VLR %s:%s moving to failed state",
        }
        status = vlr.operational_status
        if status in transitions:
            vlr_local.set_state_from_op_status(status, vlr.operational_status_details)
            self._log.info(transitions[status], vlr.id, vlr.name)
        else:
            self._log.warning("VLR %s:%s received state:%s",
                              vlr.id, vlr.name, status)

        if vlr.has_field('network_id'):
            vlr_local.network_id = vlr.network_id

    # Check if vl instantiation successful for this VNFR
    self.vl_instantiation_successful()
2282
2283
class VnfdDtsHandler(object):
    """DTS handler for VNFD config changes.

    Subscribes to the project-scoped VNFD catalog path (XPATH) through an
    appconf group and forwards VNFD updates and deletes to the VNF manager
    passed to the constructor.
    """
    XPATH = "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd"

    def __init__(self, dts, log, loop, vnfm):
        """Store the collaborators; nothing is registered until register()."""
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm
        self._regh = None
        # Set to 1 by on_apply once a recovery apply has been processed
        self._reg_ready = 0

    # NOTE(review): unlike the other handlers in this file, regh is a
    # coroutine here rather than a property - confirm callers expect that.
    @asyncio.coroutine
    def regh(self):
        """ DTS registration handle """
        return self._regh

    def deregister(self):
        '''De-register from DTS'''
        self._log.debug("De-register VNFD DTS handler for project {}".
                        format(self._vnfm._project.name))
        if self._regh:
            self._regh.deregister()
            self._regh = None

    @asyncio.coroutine
    def register(self):
        """Register for VNFD configuration.

        Creates an appconf group with on_apply as the apply handler and
        registers the project-scoped XPATH with on_prepare as the prepare
        callback.
        """

        @asyncio.coroutine
        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration.

            Updates the VNF manager with every VNFD of the transaction whose
            id is listed in scratch['vnfds'], or with all of them during
            recovery (no underlying xact and an INSTALL action).
            """
            self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
            # Create/Update a VNFD record
            if self._regh:
                for cfg in self._regh.get_xact_elements(xact):
                    # Only interested in those VNFD cfgs whose ID was received in prepare callback
                    # NOTE(review): on_prepare below never adds ids to
                    # scratch['vnfds'], so outside recovery this condition
                    # looks like it can only be met if something else
                    # fills scratch - confirm.
                    if cfg.id in scratch.get('vnfds', []) or is_recovery:
                        self._vnfm.update_vnfd(cfg)
            else:
                self._log.warning("Reg handle none for {} in project {}".
                                  format(self.__class__, self._vnfm._project))

            scratch.pop('vnfds', None)

            if is_recovery:
                #yield from self._vnfm.vnfr_handler.register()
                #yield from self._vnfm.vnfr_ref_handler.register()
                self._reg_ready = 1

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """Prepare callback.

            Deletes are handled here: a VNFD that is still in use is
            rejected with an error and a NACK, otherwise it is deleted from
            the VNF manager. Everything else is simply ACKed.
            """
            xpath = ks_path.to_xpath(RwVnfmYang.get_schema())
            self._log.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
                            xpath,
                            xact_info.query_action, msg)
            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            # Handle deletes in prepare_callback
            if fref.is_field_deleted():
                # Delete an VNFD record
                self._log.debug("Deleting VNFD with id %s", msg.id)
                if self._vnfm.vnfd_in_use(msg.id):
                    self._log.debug("Cannot delete VNFD in use - %s", msg)
                    err_msg = "Cannot delete a VNFD in use - %s" % msg
                    xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE, xpath, err_msg)
                    xact_info.respond_xpath(rwdts.XactRspCode.NACK, xpath)
                    return
                # Delete a VNFD record
                yield from self._vnfm.delete_vnfd(msg.id)

            # A failure to respond is logged, not raised
            try:
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)
            except rift.tasklets.dts.ResponseError as e:
                self._log.warning(
                    "VnfdDtsHandler in project {} with path {} for action {} failed: {}".
                    format(self._vnfm._project, xpath, xact_info.query_action, e))

        xpath = self._vnfm._project.add_project(VnfdDtsHandler.XPATH)
        self._log.debug("Registering for VNFD config using xpath: {}".
                        format(xpath))

        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=acg_hdl) as acg:
            self._regh = acg.register(
                xpath=xpath,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                on_prepare=on_prepare)
2377
class VnfrConsoleOperdataDtsHandler(object):
    """
    Registers 'D,/rw-project:project/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]'
    and answers READ queries with the console URL of one VDUR.
    """

    @property
    def vnfr_vdu_console_xpath(self):
        """Project-scoped xpath of this handler's VDUR console entry."""
        vnfr_part = "D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id={}]".format(quoted_key(self._vnfr_id))
        vdur_part = "/rw-vnfr:vdur[vnfr:id={}]".format(quoted_key(self._vdur_id))
        return self._project.add_project(vnfr_part + vdur_part)

    def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
        """Remember the collaborators and the ids this handler serves."""
        self._dts = dts
        self._log = log
        self._loop = loop
        self._regh = None
        self._vnfm = vnfm

        self._vnfr_id = vnfr_id
        self._vdur_id = vdur_id
        self._vdu_id = vdu_id

        self._project = vnfm._project

    def deregister(self):
        '''De-register from DTS'''
        self._log.debug("De-register VNFR console DTS handler for project {}".
                        format(self._project))
        if self._regh:
            self._regh.deregister()
            self._regh = None

    @asyncio.coroutine
    def register(self):
        """ Register for VNFR VDU Operational Data read from dts """

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """Prepare callback: serve the console URL for READ queries.

            Every path ends with an ACK. A console message is attached only
            when the VDUR is READY; if reading the resource fails, the URL
            in that message is the string 'none'.
            """
            xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
                xact_info, action, xpath, msg
            )

            if action != rwdts.QueryAction.READ:
                #raise VnfRecordError("Not supported operation %s" % action)
                self._log.error("Not supported operation %s" % action)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                return

            schema = RwVnfrYang.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur.schema()
            path_entry = schema.keyspec_to_entry(ks_path)
            self._log.debug("VDU Opdata path is {}".format(path_entry.key00.id))

            try:
                vnfr = self._vnfm.get_vnfr(self._vnfr_id)
            except VnfRecordError:
                self._log.error("VNFR id %s not found", self._vnfr_id)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                return

            try:
                vdur = vnfr._get_vdur_from_vdu_id(self._vdu_id)
                if vdur._state != VDURecordState.READY:
                    self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
                    xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                    return

                with self._dts.transaction() as new_xact:
                    resp = yield from vdur.read_resource(new_xact)
                    vdur_console = RwVnfrYang.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur()
                    vdur_console.id = self._vdur_id
                    # An empty/missing URL is reported as the string 'none'
                    vdur_console.console_url = resp.console_url or 'none'
                    self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
            except Exception:
                # Any failure still produces an answer, with no usable URL
                self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
                vdur_console = RwVnfrYang.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur()
                vdur_console.id = self._vdur_id
                vdur_console.console_url = 'none'

            xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
                                    xpath=self.vnfr_vdu_console_xpath,
                                    msg=vdur_console)

        self._log.debug("Registering for VNFR VDU using xpath: %s",
                        self.vnfr_vdu_console_xpath)
        hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
                                        handler=hdl,
                                        flags=rwdts.Flag.PUBLISHER,
                                        )
2474
2475
class VnfrDtsHandler(object):
    """ registers 'D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop, vnfm):
        """Store the collaborators; nothing is registered until register()."""
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm

        self._regh = None
        self._project = vnfm._project

    @property
    def regh(self):
        """ Return registration handle"""
        return self._regh

    @property
    def vnfm(self):
        """ Return VNF manager instance """
        return self._vnfm

    def deregister(self):
        '''De-register from DTS'''
        self._log.debug("De-register VNFR DTS handler for project {}".
                        format(self._project))
        if self._regh:
            self._regh.deregister()
            self._regh = None

    @asyncio.coroutine
    def register(self):
        """ Register for vnfr create/update/delete/read requests from dts """

        @asyncio.coroutine
        def on_event(dts, g_reg, xact, xact_event, scratch_data):
            """Group event callback.

            On INSTALL every element already held by the registration is
            turned back into a VNFR and its instantiation is scheduled in
            restart mode.
            """

            @asyncio.coroutine
            def instantiate_realloc_vnfr(vnfr):
                """Re-populate the vnfm after restart

                Arguments:
                    vnfr - the re-created VNFR to instantiate in restart mode

                """

                yield from vnfr.instantiate(None, restart_mode=True)

            self._log.debug("Got on_event in vnfm: {}".format(xact_event))

            if xact_event == rwdts.MemberEvent.INSTALL:
                curr_cfg = self.regh.elements
                for cfg in curr_cfg:
                    try:
                        vnfr = self.vnfm.create_vnfr(cfg, restart_mode = True)
                        if vnfr is None:
                            self._log.error("Not Creating VNFR {} as corresponding NS is terminated".format(cfg.id))
                        else:
                            self._log.debug("Creating VNFR {}".format(vnfr.vnfr_id))
                    except Exception as e:
                        self._log.exception(e)
                        raise

                    # create_vnfr() returns None when the NS is terminated.
                    # There is nothing to instantiate then, and scheduling
                    # the task would only fail on None.instantiate.
                    if vnfr is not None:
                        self._loop.create_task(instantiate_realloc_vnfr(vnfr))

            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """Prepare callback: dispatch CREATE, DELETE and UPDATE.

            Raises:
                VnfRecordError for a CREATE without a vnfd,
                NotImplementedError for any other action.
            """
            self._log.debug(
                "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
                xact_info, action, msg
            )

            @asyncio.coroutine
            def create_vnf(vnfr):
                """ACK the transaction, then drive the VNFR according to the
                operational status carried by the message."""

                xact_info.respond_xpath(rwdts.XactRspCode.ACK)

                if msg.operational_status == 'pre_init':
                    vnfr.set_state(VirtualNetworkFunctionRecordState.PRE_INIT)
                    yield from vnfr.publish(None)

                if vnfr.external_ro:
                    return

                if msg.operational_status == 'init':
                    vnfr._init = True
                    def on_instantiate_done(fut):
                        # A cancelled task has no exception to inspect:
                        # fut.exception() raises CancelledError for it, and
                        # that would escape from this callback.
                        if fut.cancelled():
                            return

                        # If the do_instantiate fails, then publish NSR with failed result
                        e = fut.exception()
                        if e is not None:
                            import traceback, sys
                            print(traceback.format_exception(None,e, e.__traceback__), file=sys.stderr, flush=True)
                            self._log.exception("VNFR instantiation failed for VNFR id %s: %s", vnfr.vnfr_id, str(e))
                            self._loop.create_task(vnfr.instantiation_failed(failed_reason=str(e)))

                    try:
                        # RIFT-9105: Unable to add a READ query under an existing transaction
                        # xact = xact_info.xact
                        assert vnfr.task is None
                        vnfr.task = self._loop.create_task(vnfr.instantiate(None))
                        vnfr.task.add_done_callback(on_instantiate_done)

                    except Exception as e:
                        self._log.exception(e)
                        self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
                        vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
                        yield from vnfr.publish(None)

                return

            if action == rwdts.QueryAction.CREATE:
                if not msg.has_field("vnfd"):
                    err = "Vnfd not provided"
                    self._log.error(err)
                    raise VnfRecordError(err)
                vnfr = self.vnfm.create_vnfr(msg)
                if vnfr is None:
                    self._log.error("Not Creating VNFR {} as corresponding NS is terminated".format(msg.id))
                    xact_info.respond_xpath(rwdts.XactRspCode.ACK)
                else:
                    yield from create_vnf(vnfr)
                return

            elif action == rwdts.QueryAction.DELETE:
                schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr = self._vnfm.get_vnfr(path_entry.key00.id)

                if vnfr is None:
                    self._log.error("VNFR id %s not found for delete", path_entry.key00.id)
                    xact_info.respond_xpath(rwdts.XactRspCode.ACK)
                    return
                    # Preventing exception here if VNFR id is not found. This means delete is
                    # invoked before Creation.
                    # raise VirtualNetworkFunctionRecordNotFound(
                    #     "VNFR id %s", path_entry.key00.id)

                # Best effort: a failure is logged and the delete is still
                # ACKed at the end of this callback
                try:
                    if not vnfr.external_ro:
                        yield from vnfr.terminate(xact_info.xact)
                    yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
                except Exception as e:
                    self._log.exception(e)
                    self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)

            elif action == rwdts.QueryAction.UPDATE:
                schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr = None
                try:
                    vnfr = self._vnfm.get_vnfr(path_entry.key00.id)

                    if vnfr is None:
                        # This means one of two things : The VNFR has been deleted or its a Launchpad restart.
                        # NOTE(review): the two "not creating" exits below
                        # return without responding on xact_info, unlike
                        # the CREATE branch - confirm that is intended.
                        if msg.id in self._vnfm._deleted_vnfrs:
                            # VNFR is deleted.
                            self._log.error("Not Creating VNFR {} as corresponding NS is terminated".format(msg.id))
                            return

                        self._log.debug("Launchpad Restart - Recreating VNFR - %s", msg.id)
                        vnfr = self.vnfm.create_vnfr(msg)
                        if vnfr is None:
                            self._log.error("Not Creating VNFR {} as corresponding NS is terminated".format(msg.id))
                        else:
                            yield from create_vnf(vnfr)

                        return

                except Exception as e:
                    self._log.error("Exception in VNFR Update : %s", str(e))
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                if vnfr.external_ro:
                    xact_info.respond_xpath(rwdts.XactRspCode.ACK)
                    return

                if (msg.operational_status == 'pre_init' and not vnfr._init):
                    # Creating VNFR INSTANTIATION TASK
                    self._log.debug("VNFR {} update after substitution {} (operational_status {})".
                                    format(vnfr.name, msg.vnfd, msg.operational_status))
                    yield from vnfr.update_vnfr_after_substitution(msg, xact_info)
                    yield from create_vnf(vnfr)
                    return

                else:
                    self._log.debug("VNFR {} update config status {} (current {})".
                                    format(vnfr.name, msg.config_status, vnfr.config_status))
                    # Update the config and publish
                    yield from vnfr.update_config(msg, xact_info)

            else:
                # Interpolate the action; passing it as a second exception
                # argument leaves a literal "%s" in the message.
                raise NotImplementedError(
                    "%s action on VirtualNetworkFunctionRecord not supported"
                    % action)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        xpath = self._project.add_project(VnfrDtsHandler.XPATH)
        self._log.debug("Registering for VNFR using xpath: {}".
                        format(xpath))

        hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        handlers = rift.tasklets.Group.Handler(on_event=on_event,)
        with self._dts.group_create(handler=handlers) as group:
            self._regh = group.register(xpath=xpath,
                                        handler=hdl,
                                        flags=(rwdts.Flag.PUBLISHER |
                                               rwdts.Flag.SHARED |
                                               rwdts.Flag.NO_PREP_READ |
                                               rwdts.Flag.DATASTORE),)

    @asyncio.coroutine
    def create(self, xact, xpath, msg):
        """
        Create a VNFR record in DTS with path and message

        The path is project-scoped before use; xact is only logged.
        """
        path = self._project.add_project(xpath)
        self._log.debug("Creating VNFR xact = %s, %s:%s",
                        xact, path, msg)

        self.regh.create_element(path, msg)
        self._log.debug("Created VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def update(self, xact, xpath, msg, flags=rwdts.XactFlag.REPLACE):
        """
        Update a VNFR record in DTS with path and message

        The path is project-scoped before use; xact is only logged and
        flags is handed to the registration handle unchanged.
        """
        path = self._project.add_project(xpath)
        self._log.debug("Updating VNFR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.update_element(path, msg, flags)
        self._log.debug("Updated VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, xpath):
        """
        Delete a VNFR record in DTS with path

        The path is project-scoped before use; xact is only logged.
        """
        path = self._project.add_project(xpath)
        self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
        self.regh.delete_element(path)
        self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2727
2728
class VnfdRefCountDtsHandler(object):
    """Publishes the VNFD reference counts kept by the VNF manager."""
    XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"

    def __init__(self, dts, log, loop, vnfm):
        """Store the collaborators; nothing is registered until register()."""
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm

        self._regh = None

    @property
    def regh(self):
        """Registration handle (None while not registered)."""
        return self._regh

    @property
    def vnfm(self):
        """The manager instance handed to the constructor."""
        return self._vnfm

    def deregister(self):
        '''De-register from DTS'''
        self._log.debug("De-register VNFD Ref DTS handler for project {}".
                        format(self._vnfm._project))
        if self._regh:
            self._regh.deregister()
            self._regh = None

    @asyncio.coroutine
    def register(self):
        """Register as publisher of the VNFD ref count path."""

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """Prepare callback: answer READ with one response per entry.

            Raises:
                VnfRecordError for every action other than READ.
            """
            xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
                xact_info, action, xpath, msg
            )

            if action != rwdts.QueryAction.READ:
                raise VnfRecordError("Not supported operation %s" % action)

            schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount.schema()
            path_entry = schema.keyspec_to_entry(ks_path)
            vnfd_list = yield from self._vnfm.get_vnfd_refcount(path_entry.key00.vnfd_id_ref)

            # Every entry goes out as MORE; the closing ACK ends the reply
            for ref_xpath, ref_msg in vnfd_list:
                self._log.debug("Responding to ref count query path:%s, msg:%s",
                                ref_xpath, ref_msg)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
                                        xpath=ref_xpath,
                                        msg=ref_msg)
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        ref_count_xpath = self._vnfm._project.add_project(VnfdRefCountDtsHandler.XPATH)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=ref_count_xpath,
                                        handler=hdl,
                                        flags=rwdts.Flag.PUBLISHER,
                                        )
2793
2794
class VdurDatastore(object):
    """
    This VdurDatastore is intended to expose select information about a VDUR
    such that it can be referenced in a cloud config file. The data that is
    exposed does not necessarily follow the structure of the data in the yang
    model. This is intentional. The data that are exposed are intended to be
    agnostic of the yang model so that changes in the model do not necessarily
    require changes to the interface provided to the user. It also means that
    the user does not need to be familiar with the RIFT.ware yang models.
    """

    def __init__(self):
        """Create an instance of VdurDatastore"""
        # Maps a VDU id to a dict of the attributes exposed for that VDUR.
        self._vdur_data = dict()
        # Raw string: '\[', '\]' and '\.' are regex escapes, not valid Python
        # string escapes, so a plain literal triggers an invalid-escape warning.
        self._pattern = re.compile(r"vdu\[([^]]+)\]\.(.+)")

    @staticmethod
    def _exposed_attributes(vdur):
        """Return the (key, value) pairs that are exposed for a VDUR

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Returns:
            A tuple of (key, value) pairs; a value may be None when the VDUR
            does not currently carry that piece of information.

        """
        return (
            ('name', vdur._vdud.name),
            ('mgmt.ip', vdur.vm_management_ip),
            # The below can be used for hostname
            ('vdur_name', vdur.unique_short_name),
            ('custom_meta_data',
             vdur._vdud.supplemental_boot_data.custom_meta_data),
        )

    def add(self, vdur):
        """Add a new VDUR to the datastore

        Only attributes whose value is not None are stored.

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no ID or (2) is already
            in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id in self._vdur_data:
            raise ValueError('cannot add a VDUR more than once')

        # Build the entry completely before registering it so that a failure
        # while reading the VDUR does not leave a partial entry behind.
        self._vdur_data[vdur.vdu_id] = {
            key: value
            for key, value in self._exposed_attributes(vdur)
            if value is not None
        }

    def update(self, vdur):
        """Update the VDUR information in the datastore

        Attributes whose value is now None are removed from the stored data;
        all others are overwritten with the current value.

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no ID or (2) is not in
            the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        entry = self._vdur_data[vdur.vdu_id]
        for key, value in self._exposed_attributes(vdur):
            if value is None:
                entry.pop(key, None)
            else:
                entry[key] = value

    def remove(self, vdur_id):
        """Remove all of the data associated with specified VDUR

        Arguments:
            vdur_id - the identifier of a VDUR in the datastore

        Raises:
            A ValueError is raised if the VDUR is not contained in the
            datastore.

        """
        if vdur_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        del self._vdur_data[vdur_id]

    def get(self, expr):
        """Retrieve VDUR information from the datastore

        An expression should be of the form,

            vdu[<id>].<attr>

        where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
        the exposed attribute that the user wishes to retrieve.

        If the requested data is not available, None is returned.

        Arguments:
            expr - a string that specifies the data to return

        Raises:
            A ValueError is raised if the provided expression cannot be parsed.

        Returns:
            The requested data or None

        """
        result = self._pattern.match(expr)
        if result is None:
            raise ValueError('data expression not recognized ({})'.format(expr))

        vdur_id, key = result.groups()

        if vdur_id not in self._vdur_data:
            return None

        return self._vdur_data[vdur_id].get(key, None)
2920
2921
class VnfManager(object):
    """ The virtual network function manager class

    Keeps the per-project VNFM state: the VNFR objects, a per-VNFD count of
    the VNFRs instantiated from it, the cached NSR configs and the mapping
    from VLR ids to the VNFR that uses them. It also owns the DTS handlers
    that are registered/deregistered with the project.
    """
    def __init__(self, dts, log, loop, project, cluster_name):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._project = project
        self._cluster_name = cluster_name

        # This list maintains a list of all the deleted vnfrs' ids. This is done to be able to determine
        # if the vnfr is not found because of restart or simply because it was deleted. In the first case we
        # recreate the vnfr while in the latter we do not.
        self._deleted_vnfrs = []

        self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
        self._vnfd_handler = VnfdDtsHandler(dts, log, loop, self)
        self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
        self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(
            log, dts, loop, project, callback=self.handle_nsr)
        self._vlr_handler = subscriber.VlrSubscriberDtsHandler(log, dts, loop, project,
                                                               callback=self.vlr_event)

        self._dts_handlers = [self._vnfd_handler,
                              self._vnfr_handler,
                              self._vnfr_ref_handler,
                              self._nsr_handler,
                              self._vlr_handler
                              ]
        # vnfr id -> VirtualNetworkFunctionRecord
        self._vnfrs = {}
        # vnfd id -> number of VNFRs created from that VNFD
        self._vnfds_to_vnfr = {}
        # nsr id -> NSR config
        self._nsrs = {}
        # vlr id -> VNFR using that VLR
        self._vnfr_for_vlr = {}

    @property
    def vnfr_handler(self):
        """ VNFR dts handler """
        return self._vnfr_handler

    @property
    def vnfr_ref_handler(self):
        """ VNFD ref count dts handler """
        return self._vnfr_ref_handler

    @asyncio.coroutine
    def register(self):
        """ Register all static DTS handlers """
        for hdl in self._dts_handlers:
            yield from hdl.register()

    def deregister(self):
        """ Deregister all static DTS handlers """
        self._log.debug("De-register VNFM project {}".format(self._project.name))
        for hdl in self._dts_handlers:
            hdl.deregister()

    @asyncio.coroutine
    def run(self):
        """ Run this VNFM instance """
        self._log.debug("Run VNFManager - registering static DTS handlers")
        yield from self.register()

    def handle_nsr(self, nsr, action):
        """ NSR config callback: keep the NSR cache in sync with DTS

        CREATE/UPDATE store the NSR under its id, DELETE drops it (a delete
        for an unknown id is ignored). Other actions are ignored.
        """
        if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
            self._nsrs[nsr.id] = nsr
        elif action == rwdts.QueryAction.DELETE:
            self._nsrs.pop(nsr.id, None)

    def get_nsr_config(self, nsr_id):
        """
        Gets the NSR config from the DTS cache.
        Called in recovery mode only.

        Returns the NSR config, or None if it cannot be found.
        """
        if nsr_id in self._nsrs:
            return self._nsrs[nsr_id]

        # A populated cache that lacks the id means the NSR does not exist;
        # only an empty cache is refilled from the DTS registration below.
        if self._nsrs:
            self._log.error("NSR with id {} not found".format(nsr_id))
            return None

        # Index by the handler's key so each key contributes one config
        # (the last one seen wins), then load those configs into the cache.
        key_name = self._nsr_handler.key_name()
        key_map = {getattr(cfg, key_name): cfg
                   for cfg in self._nsr_handler.reg.elements}

        for cfg in key_map.values():
            self._nsrs[cfg.id] = cfg

        if nsr_id in self._nsrs:
            return self._nsrs[nsr_id]

        self._log.error("NSR with id {} not found in DTS cache".format(nsr_id))
        return None

    def get_linked_mgmt_network(self, vnfr, restart_mode=False):
        """For the given VNFR get the related mgmt network from the NSD, if
        available.

        Arguments:
            vnfr - the VNFR message (provides vnfd.id and nsr_id_ref)
            restart_mode - when True the NSR config is first looked up
                           through get_nsr_config()

        Returns:
            The name of the management network, or None if the NSD has no
            mgmt VLD that references this VNFR's VNFD.

        Raises:
            VnfRecordError if no NSR config is known for the VNFR.
        """
        vnfd_id = vnfr.vnfd.id
        nsr_id = vnfr.nsr_id_ref

        if restart_mode:
            nsr_cfg = self.get_nsr_config(nsr_id)
            # Never cache a failed lookup: a None entry would make the NSR
            # look present to later membership checks.
            if nsr_cfg is not None:
                self._nsrs[nsr_id] = nsr_cfg

        # for the given related VNFR, get the corresponding NSR-config
        nsr_obj = self._nsrs.get(nsr_id)
        if nsr_obj is None:
            raise VnfRecordError(
                "Unable to find the NS with the ID: {}".format(nsr_id))

        # for the related NSD check if a VLD exists such that it's a mgmt
        # network
        for vld in nsr_obj.nsd.vld:
            if not vld.mgmt_network:
                continue

            for vnfd in vld.vnfd_connection_point_ref:
                if vnfd.vnfd_id_ref != vnfd_id:
                    continue

                if vld.vim_network_name is not None:
                    return vld.vim_network_name
                return self._project.name + "." + nsr_obj.name + "." + vld.name

        return None

    def get_vnfr(self, vnfr_id):
        """ get VNFR by vnfr id, or None if the id is not known """
        if vnfr_id not in self._vnfrs:
            self._log.error("VNFR id {} not found".format(vnfr_id))
            # Returning None to prevent exception here. The caller raises the exception.
            return None

        return self._vnfrs[vnfr_id]

    def create_vnfr(self, vnfr, restart_mode=False):
        """ Create a VNFR instance

        Arguments:
            vnfr - the VNFR message to create the record from
            restart_mode - passed on to get_linked_mgmt_network()

        Returns:
            The new VirtualNetworkFunctionRecord, or None when the NSR the
            VNFR refers to is not known.

        Raises:
            VnfRecordError if a VNFR with the same id already exists; any
            exception from get_linked_mgmt_network() is logged and re-raised.
        """
        # Check if NSR is present. This is a situation where the NS has been deleted before
        # VNFR Create starts.
        # NOTE(review): this check runs before the restart_mode NSR lookup in
        # get_linked_mgmt_network() - confirm that is intended for recovery.
        if vnfr.nsr_id_ref not in self._nsrs:
            return None

        if vnfr.id in self._vnfrs:
            msg = "Vnfr id %s already exists" % vnfr.id
            self._log.error(msg)
            raise VnfRecordError(msg)

        self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
                       vnfr.id,
                       vnfr.vnfd.id)

        try:
            mgmt_network = self.get_linked_mgmt_network(vnfr, restart_mode)
        except Exception as e:
            self._log.exception(e)
            raise

        # Identify if we are using Rift RO or external RO
        external_ro = False
        nsr = self._nsrs[vnfr.nsr_id_ref]
        if (nsr.resource_orchestrator and
                nsr.resource_orchestrator != 'rift'):
            self._log.debug("VNFR {} using external RO".
                            format(vnfr.name))
            external_ro = True

        self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
            self._dts, self._log, self._loop, self._cluster_name, self, vnfr,
            mgmt_network=mgmt_network, external_ro=external_ro,
        )

        # Update ref count
        self._vnfds_to_vnfr[vnfr.vnfd.id] = (
            self._vnfds_to_vnfr.get(vnfr.vnfd.id, 0) + 1)

        return self._vnfrs[vnfr.id]

    @asyncio.coroutine
    def delete_vnfr(self, xact, vnfr):
        """ Delete a VNFR instance

        Does nothing when the VNFR is not known. Otherwise the VNFR is
        deleted through the VNFR dts handler, the ref count of its VNFD is
        decremented (never below zero) and the id is remembered as deleted.
        """
        if vnfr.vnfr_id not in self._vnfrs:
            return

        self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
        yield from self._vnfr_handler.delete(xact, vnfr.xpath)

        if self._vnfds_to_vnfr.get(vnfr.vnfd.id):
            self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1

        del self._vnfrs[vnfr.vnfr_id]
        self._deleted_vnfrs.append(vnfr.vnfr_id)

    @asyncio.coroutine
    def fetch_vnfd(self, vnfd_id):
        """ Fetch the VNFD with the given vnfd id

        Raises:
            VnfRecordError if the query yields no VNFD.
        """
        vnfd_path = self._project.add_project(
            VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id))
        self._log.debug("Fetch vnfd with path %s", vnfd_path)
        vnfd = None

        res_iter = yield from self._dts.query_read(vnfd_path,
                                                   rwdts.XactFlag.MERGE)

        # If the query returns several results the last one is kept.
        for ent in res_iter:
            res = yield from ent
            vnfd = res.result

        if vnfd is None:
            err = "Failed to get Vnfd %s" % vnfd_id
            self._log.error(err)
            raise VnfRecordError(err)

        self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)

        return vnfd

    def vnfd_in_use(self, vnfd_id):
        """ Is this VNFD in use """
        self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
        return self._vnfds_to_vnfr.get(vnfd_id, 0) > 0

    @asyncio.coroutine
    def publish_vnfr(self, xact, path, msg):
        """ Publish a VNFR """
        self._log.debug("publish_vnfr called with path %s, msg %s",
                        path, msg)
        yield from self.vnfr_handler.update(xact, path, msg)

    @asyncio.coroutine
    def delete_vnfd(self, vnfd_id):
        """ Delete the Virtual Network Function descriptor with the passed id

        Raises:
            VirtualNetworkFunctionDescriptorRefCountExists if VNFRs created
            from this VNFD still exist.
        """
        self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
        if vnfd_id not in self._vnfds_to_vnfr:
            return

        # The map stores the plain integer ref count for each VNFD.
        ref_count = self._vnfds_to_vnfr[vnfd_id]
        if ref_count:
            self._log.debug("Cannot delete VNFD id %s reference exists %s",
                            vnfd_id,
                            ref_count)
            raise VirtualNetworkFunctionDescriptorRefCountExists(
                "Cannot delete :%s, ref_count:%s" % (vnfd_id, ref_count))

        del self._vnfds_to_vnfr[vnfd_id]

    def vnfd_refcount_xpath(self, vnfd_id):
        """ xpath for ref count entry """
        return self._project.add_project(VnfdRefCountDtsHandler.XPATH +
                                         "[rw-vnfr:vnfd-id-ref={}]").format(quoted_key(vnfd_id))

    @asyncio.coroutine
    def get_vnfd_refcount(self, vnfd_id):
        """ Get the vnfd_list from this VNFM

        Returns a list of (xpath, VnfdRefCount message) tuples: one per known
        VNFD when vnfd_id is None or empty, otherwise one for the given VNFD
        (or an empty list when that VNFD is not known).
        """
        if vnfd_id is None or vnfd_id == "":
            vnfd_ids = list(self._vnfds_to_vnfr)
        elif vnfd_id in self._vnfds_to_vnfr:
            vnfd_ids = [vnfd_id]
        else:
            vnfd_ids = []

        vnfd_list = []
        for ref_id in vnfd_ids:
            vnfd_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount()
            vnfd_msg.vnfd_id_ref = ref_id
            vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[ref_id]
            vnfd_list.append((self.vnfd_refcount_xpath(ref_id), vnfd_msg))

        return vnfd_list

    def add_vlr_id_vnfr_map(self, vlr_id, vnfr):
        """ Add a mapping for vlr_id into VNFR """
        self._vnfr_for_vlr[vlr_id] = vnfr

    def remove_vlr_id_vnfr_map(self, vlr_id):
        """ Remove a mapping for vlr_id into VNFR (KeyError if not mapped) """
        del self._vnfr_for_vlr[vlr_id]

    def find_vnfr_for_vlr_id(self, vlr_id):
        """ Find VNFR for VLR id; returns None when the VLR is not mapped """
        return self._vnfr_for_vlr.get(vlr_id)

    def vlr_event(self, vlr, action):
        """ VLR event handler: forward the event to the VNFR using the VLR """
        self._log.debug("VnfManager: Received VLR %s with action:%s", vlr, action)

        if vlr.id not in self._vnfr_for_vlr:
            self._log.warning("VLR %s:%s received for unknown id; %s",
                              vlr.id, vlr.name, vlr)
            return
        vnfr = self._vnfr_for_vlr[vlr.id]

        vnfr.vlr_event(vlr, action)
3217
3218
class VnfmProject(ManoProject):
    """ Per-project VNFM state: owns the VnfManager of one project """

    def __init__(self, name, tasklet, **kw):
        super(VnfmProject, self).__init__(tasklet.log, name)
        self.update(tasklet)

        # Created in register(); stays None until then.
        self._vnfm = None

    @asyncio.coroutine
    def register(self):
        """ Create the VnfManager for this project and run it

        Any exception is logged and re-raised.
        """
        try:
            vm_parent_name = self._tasklet.tasklet_info.get_parent_vm_parent_instance_name()
            assert vm_parent_name is not None
            self._vnfm = VnfManager(self._dts, self.log, self.loop, self, vm_parent_name)
            yield from self._vnfm.run()
        except Exception:
            self.log.error("Caught Exception in VNFM init: %s", sys.exc_info()[0])
            raise

    def deregister(self):
        """ Deregister the project's VnfManager, if one was created """
        self._log.debug("De-register project {} for VnfmProject".
                        format(self.name))
        # register() may never have run, or may have failed before the
        # manager was created.
        if self._vnfm is not None:
            self._vnfm.deregister()

    @asyncio.coroutine
    def delete_prepare(self):
        """ Check whether the project can be deleted

        Returns:
            (False, reason) while the project's VNFM still holds VNFRs,
            (True, "True") otherwise.
        """
        if self._vnfm and self._vnfm._vnfrs:
            delete_msg = "Project has VNFR associated with it. Delete all Project NSR and try again."
            return False, delete_msg
        return True, "True"
3249
class VnfmTasklet(rift.tasklets.Tasklet):
    """ VNF Manager tasklet class """
    def __init__(self, *args, **kwargs):
        super(VnfmTasklet, self).__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("vnfm")

        self._dts = None
        self._project_handler = None
        self.projects = {}

    @property
    def dts(self):
        """ The DTS API handle (None until start() has run) """
        return self._dts

    def start(self):
        """ Start the tasklet and create the DTS API object

        Any exception is logged and re-raised.
        """
        try:
            super(VnfmTasklet, self).start()
            self.log.info("Starting VnfmTasklet")

            self.log.setLevel(logging.DEBUG)

            self.log.debug("Registering with dts")
            self._dts = rift.tasklets.DTS(self.tasklet_info,
                                          RwVnfmYang.get_schema(),
                                          self.loop,
                                          self.on_dts_state_change)

            self.log.debug("Created DTS Api GI Object: %s", self._dts)
        except Exception:
            # The message needs a placeholder for the exception type,
            # otherwise the logging call itself fails to format.
            self.log.error("Caught Exception in VNFM start: %s", sys.exc_info()[0])
            raise

    def on_instance_started(self):
        """ Task instance started callback """
        self.log.debug("Got instance started callback")

    def stop(self):
        """ Stop the tasklet by de-initialising DTS

        Any exception is logged and re-raised.
        """
        try:
            self._dts.deinit()
        except Exception:
            self.log.error("Caught Exception in VNFM stop: %s", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def init(self):
        """ Task init callback """
        self.log.debug("creating project handler")
        # Keep the private attribute initialised in __init__ in sync with
        # the public name this method has always assigned.
        self._project_handler = ProjectHandler(self, VnfmProject)
        self.project_handler = self._project_handler
        self.project_handler.register()

    @asyncio.coroutine
    def run(self):
        """ Task run callback """
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """
        # DTS state to move to once the handler for `state` has completed.
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self._dts.handle.set_state(next_state)