Merge "Descriptor Unification"
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.mano.dts as mano_dts
53
54
class VMResourceError(Exception):
    """No usable VM resource response was obtained."""


class VnfRecordError(Exception):
    """VNF record instantiation failed."""


class VduRecordError(Exception):
    """VDU record instantiation failed."""


class NotImplemented(Exception):
    """Operation is not implemented.

    NOTE(review): inside this module the class name shadows the builtin
    ``NotImplemented`` singleton; it is kept for existing references.
    """


class VnfrRecordExistsError(Exception):
    """A VNFR record already exists with the same VNFR id."""


class InternalVirtualLinkRecordError(Exception):
    """Internal virtual link record error."""


class VDUImageNotFound(Exception):
    """VDU image not found."""


class VirtualDeploymentUnitRecordError(Exception):
    """VDU instantiation failed."""


class VMNotReadyError(Exception):
    """VM not yet received from the resource manager."""


class VDURecordNotFound(Exception):
    """Could not find a VDU record."""


class VirtualNetworkFunctionRecordDescNotFound(Exception):
    """Cannot find the Virtual Network Function Record descriptor."""


class VirtualNetworkFunctionDescriptorError(Exception):
    """Virtual Network Function descriptor error."""


class VirtualNetworkFunctionDescriptorNotFound(Exception):
    """Virtual Network Function descriptor not found."""


class VirtualNetworkFunctionRecordNotFound(Exception):
    """Virtual Network Function Record not found."""


class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
    """A reference count still exists on the Virtual Network Function descriptor."""


class VnfrInstantiationFailed(Exception):
    """Virtual Network Function instantiation failed."""


class VNFMPlacementGroupError(Exception):
    """Placement group constraints could not be satisfied."""
137
@enum.unique
class VirtualNetworkFunctionRecordState(enum.Enum):
    """ VNFR state

    Every member has a distinct value.  TERMINATED previously shared the
    value 7 with VDU_TERMINATE_PHASE, which made it an alias of that member
    (same object, same ``.name``), so the two states were indistinguishable.
    ``enum.unique`` guards against the same mistake being reintroduced.
    """
    INIT = 1
    VL_INIT_PHASE = 2
    VM_INIT_PHASE = 3
    READY = 4
    TERMINATE = 5
    VL_TERMINATE_PHASE = 6
    VDU_TERMINATE_PHASE = 7
    TERMINATED = 8
    FAILED = 10
149
150
class VDURecordState(enum.Enum):
    """Life-cycle states of a VDU record."""
    # Creation path
    INIT = 1
    INSTANTIATING = 2
    RESOURCE_ALLOC_PENDING = 3
    READY = 4
    # Tear-down path
    TERMINATING = 5
    TERMINATED = 6
    # Error state
    FAILED = 10
160
161
162 class VcsComponent(object):
163 """ VCS Component within the VNF descriptor """
def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name):
    """Remember the collaborators and descriptor data of one VCS component.

    All arguments are stored unchanged on private attributes of the same
    name (prefixed with an underscore).
    """
    # Infrastructure handles.
    self._dts, self._log, self._loop = dts, log, loop
    self._vcs_handler = vcs_handler
    # Descriptor data for this component.
    self._cluster_name = cluster_name
    self._component = component
    self._mangled_name = mangled_name
172
@staticmethod
def mangle_name(component_name, vnf_name, vnfd_id):
    """Return the name ``<vnf_name>:<component_name>:<vnfd_id>``."""
    return ":".join((vnf_name, component_name, vnfd_id))
177
@property
def name(self):
    """The mangled name this component was constructed with."""
    return self._mangled_name
182
@property
def path(self):
    """Keyspec of this component in the operational inventory."""
    template = ("D,/rw-manifest:manifest"
                "/rw-manifest:operational-inventory"
                "/rw-manifest:component"
                "[rw-manifest:component-name = '{}']")
    return template.format(self.name)
190
@property
def instance_xpath(self):
    """Keyspec of the VCS instance named after this component's cluster."""
    key = "[instance-name = '{}']".format(self._cluster_name)
    return "D,/rw-base:vcs" + "/instances" + "/instance" + key
198
@property
def start_comp_xpath(self):
    """Keyspec of the 'START-REQ' child entry under the instance xpath."""
    child_key = "/child-n[instance-name = 'START-REQ']"
    return self.instance_xpath + child_key
204
def get_start_comp_msg(self, ip_address):
    """Build the 'START-REQ' child message asking VCS to start this component.

    The message names this component, carries the admin command "START"
    and the supplied ip_address.
    """
    request = RwBaseYang.VcsInstance_Instance_ChildN()
    request.instance_name = 'START-REQ'
    request.component_name = self.name
    request.admin_command = "START"
    request.ip_address = ip_address
    return request
214
@property
def msg(self):
    """ Returns the message for this vcs component

    The component descriptor is converted to a dict, every
    ``component_name`` value found in it (at any nesting depth) is
    mangled with the VNF name and VNFD id, and the result is turned
    back into an OpInventory_Component message.
    """
    # The constructor never stores the VNF name or the VNFD id (the
    # original code read the non-existent self._vnfd_name/_vnfd_id and
    # raised AttributeError).  Recover both from the mangled name, whose
    # layout is "<vnf_name>:<component_name>:<vnfd_id>" (see mangle_name).
    # NOTE(review): assumes neither the VNF name nor the VNFD id
    # contains ':' - confirm against the callers.
    vnf_name, _, remainder = self._mangled_name.partition(":")
    vnfd_id = remainder.rpartition(":")[2]

    def mangle_comp_names(node):
        """ mangle component name with VNF name, id (in place)"""
        for key, val in node.items():
            if isinstance(val, dict):
                node[key] = mangle_comp_names(val)
            elif isinstance(val, list):
                # Only dict entries can hold component names; other
                # entries are left as they are.
                for idx, entry in enumerate(val):
                    if isinstance(entry, dict):
                        val[idx] = mangle_comp_names(entry)
            elif key == "component_name":
                node[key] = VcsComponent.mangle_name(val, vnf_name, vnfd_id)
        return node

    mangled_dict = mangle_comp_names(self._component.as_dict())
    return RwManifestYang.OpInventory_Component.from_dict(mangled_dict)
243
@asyncio.coroutine
def publish(self, xact):
    """Publish this component's message at its path through the VCS handler."""
    target = self.path
    self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
                    self.name, target, self.msg)
    yield from self._vcs_handler.publish(xact, target, self.msg)
250
@asyncio.coroutine
def start(self, xact, parent, ip_addr=None):
    """Issue a DTS create for the start request of this component.

    ``xact`` and ``parent`` are accepted but not used by this method.
    """
    # ATTN RV - replace with block add
    xpath = self.start_comp_xpath
    request = self.get_start_comp_msg(ip_addr)
    self._log.debug("starting component %s %s", xpath, request)
    yield from self._dts.query_create(xpath, 0, request)
    self._log.debug("started component %s, %s", xpath, request)
263
264
265 class VirtualDeploymentUnitRecord(object):
266 """ Virtual Deployment Unit Record """
def __init__(self,
             dts,
             log,
             loop,
             vdud,
             vnfr,
             mgmt_intf,
             mgmt_network,
             cloud_account_name,
             vnfd_package_store,
             vdur_id=None,
             placement_groups=None):
    """Create a VDU record in the INIT state.

    vdur_id defaults to a fresh UUID.  placement_groups defaults to a new
    empty list for every instance (the former ``[]`` default was one list
    object shared by all records created without the argument).
    """
    self._dts = dts
    self._log = log
    self._loop = loop
    self._vdud = vdud
    self._vnfr = vnfr
    self._mgmt_intf = mgmt_intf
    self._cloud_account_name = cloud_account_name
    self._vnfd_package_store = vnfd_package_store
    self._mgmt_network = mgmt_network

    self._vdur_id = vdur_id or str(uuid.uuid4())
    self._int_intf = []   # (interface, cp id, vlr) tuples
    self._ext_intf = []   # (interface, cp name, vlr) tuples
    self._state = VDURecordState.INIT
    self._state_failed_reason = None
    # Used as the event-id key of the resource manager request path.
    self._request_id = str(uuid.uuid4())
    self._name = vnfr.name + "__" + vdud.id
    self._placement_groups = [] if placement_groups is None else placement_groups
    self._rm_regh = None
    self._vm_resp = None
    self._vdud_cloud_init = None
    # Must come last: it reads self._vnfr, self._vdur_id and self.vdu_id.
    self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(
        dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id,
        self._vdur_id, self.vdu_id)
301
@asyncio.coroutine
def vdu_opdata_register(self):
    """Register the console operational-data handler of this VDUR."""
    console_handler = self._vdur_console_handler
    yield from console_handler.register()
305
def cp_ip_addr(self, cp_name):
    """Return the IP address of the named connection point.

    Falls back to "0.0.0.0" when there is no VM response yet or no
    connection point with that name.
    """
    response = self._vm_resp
    if response is None:
        return "0.0.0.0"
    matches = (point.ip_address
               for point in response.connection_points
               if point.name == cp_name)
    return next(matches, "0.0.0.0")
313
def cp_id(self, cp_name):
    """Return the connection point id of the named connection point.

    Falls back to '' when there is no VM response yet or no connection
    point with that name.
    """
    response = self._vm_resp
    if response is None:
        return ''
    matches = (point.connection_point_id
               for point in response.connection_points
               if point.name == cp_name)
    return next(matches, '')
321
@property
def vdu_id(self):
    """Id of the VDU descriptor this record was created from."""
    descriptor = self._vdud
    return descriptor.id
325
@property
def vm_resp(self):
    """Last stored VM response, or None if none has been stored yet."""
    return self._vm_resp
329
@property
def name(self):
    """Name of this VDUR."""
    return self._name
334
@property
def cloud_account_name(self):
    """Name of the cloud account this VDU should be created in."""
    return self._cloud_account_name
339
@property
def image_name(self):
    """Base name of the descriptor's image, used to look it up on the CMP."""
    image_path = self._vdud.image
    return os.path.basename(image_path)
344
@property
def image_checksum(self):
    """Checksum of the descriptor's image, or None when the field is unset."""
    if not self._vdud.has_field("image_checksum"):
        return None
    return self._vdud.image_checksum
349
@property
def management_ip(self):
    """Public IP of the VM if set, else its management IP; None when inactive."""
    if not self.active:
        return None
    response = self._vm_resp
    if response.has_field('public_ip'):
        return response.public_ip
    return response.management_ip
355
@property
def vm_management_ip(self):
    """Management IP from the VM response, or None while the VDU is inactive."""
    return self._vm_resp.management_ip if self.active else None
361
@property
def operational_status(self):
    """Operational status string for the current state of this VDU.

    The lookup is keyed by the state's name.  Both TERMINATING and
    TERMINATED are reported as "terminated".
    """
    status_by_state = dict(
        INIT="init",
        INSTANTIATING="vm_init_phase",
        RESOURCE_ALLOC_PENDING="vm_alloc_pending",
        READY="running",
        FAILED="failed",
        TERMINATING="terminated",
        TERMINATED="terminated",
    )
    state_name = self._state.name
    return status_by_state[state_name]
374
@property
def msg(self):
    """Build the VDUR message describing this record.

    Combines the record's identity and status, the VIM ids and IPs from
    the VM response when available, selected fields copied from the VDU
    descriptor, the internal/external interfaces and the placement
    groups.  As a side effect every external connection point is pushed
    to the owning VNFR through ``update_cp``.
    """
    copied_fields = ("vm_flavor",
                     "guest_epa",
                     "vswitch_epa",
                     "hypervisor_epa",
                     "host_epa",
                     "name")
    descriptor_dict = self._vdud.as_dict()

    record = {
        "id": self._vdur_id,
        "vdu_id_ref": self._vdud.id,
        "operational_status": self.operational_status,
        "operational_status_details": self._state_failed_reason,
    }

    response = self.vm_resp
    if response is not None:
        record["vim_id"] = response.vdu_id
        record["flavor_id"] = response.flavor_id
        record["image_id"] = response.image_id

    mgmt_ip = self.management_ip
    if mgmt_ip is not None:
        record["management_ip"] = mgmt_ip

    vm_mgmt_ip = self.vm_management_ip
    if vm_mgmt_ip is not None:
        record["vm_management_ip"] = vm_mgmt_ip

    # Copy the selected descriptor fields that are actually present.
    for field in copied_fields:
        if field in descriptor_dict:
            record[field] = descriptor_dict[field]

    internal_cps = []
    internal_intfs = []
    for intf, cp_id, _vlr in self._int_intf:
        cp = self.find_internal_cp_by_cp_id(cp_id)
        internal_cps.append({"name": cp.name,
                             "id": cp.id,
                             "type_yang": "VPORT",
                             "ip_address": self.cp_ip_addr(cp.id)})
        internal_intfs.append({"name": intf.name,
                               "vdur_internal_connection_point_ref": cp.id,
                               "virtual_interface": {}})

    record["internal_connection_point"] = internal_cps
    self._log.debug("internal_connection_point:%s", record["internal_connection_point"])
    record["internal_interface"] = internal_intfs

    external_intfs = []
    for _intf, cp_name, _vlr in self._ext_intf:
        external_intfs.append({"name": cp_name,
                               "vnfd_connection_point_ref": cp_name,
                               "virtual_interface": {}})
        # Keep the VNFR's view of this connection point up to date.
        self._vnfr.update_cp(cp_name, self.cp_ip_addr(cp_name), self.cp_id(cp_name))
    record["external_interface"] = external_intfs

    record['placement_groups_info'] = [group.as_dict()
                                       for group in self._placement_groups]
    return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(record)
439
@property
def resmgr_path(self):
    """Resource manager keyspec of this VDU's event, keyed by the request id."""
    event_key = "/vdu-event-data[event-id='{}']".format(self._request_id)
    return "D,/rw-resource-mgr:resource-mgmt" + "/vdu-event" + event_key
446
@property
def vm_flavor_msg(self):
    """Return a new message of the descriptor's vm_flavor type, filled via copy_from."""
    source = self._vdud.vm_flavor
    duplicate = type(source)()
    duplicate.copy_from(source)
    return duplicate
454
@property
def vdud_cloud_init(self):
    """Cloud-init contents for the VDU, computed by cloud_init() and cached.

    A None result is not cached, so cloud_init() is called again on the
    next access.
    """
    cached = self._vdud_cloud_init
    if cached is None:
        cached = self._vdud_cloud_init = self.cloud_init()
    return cached
462
def cloud_init(self):
    """Return the cloud-config script for this VDU.

    The inline ``cloud_init`` contents win; otherwise the script is read
    from the file named by ``cloud_init_file`` inside the stored VNFD
    package.  Returns None when neither is provided.

    Raises VirtualDeploymentUnitRecordError when the script cannot be
    extracted from the package.
    """
    descriptor = self._vdud

    inline_script = descriptor.cloud_init
    if inline_script is not None:
        self._log.debug("cloud_init script provided inline %s", descriptor.cloud_init)
        return descriptor.cloud_init

    if descriptor.cloud_init_file is None:
        self._log.debug("VDU Instantiation: cloud-init script not provided")
        return None

    # Get cloud-init script contents from the file provided in the cloud_init_file param
    self._log.debug("cloud_init script provided in file %s", descriptor.cloud_init_file)
    script_name = descriptor.cloud_init_file
    self._vnfd_package_store.refresh()
    package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
    extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
    try:
        return extractor.read_script(package, script_name)
    except rift.package.cloud_init.CloudInitExtractionError as e:
        raise VirtualDeploymentUnitRecordError(e)
483
def process_openstack_placement_group_construct(self, vm_create_msg_dict):
    """Add the OpenStack placement constructs to the VM create request.

    Collects host aggregates, availability zones and server groups from
    all placement groups and writes them into vm_create_msg_dict under
    'host_aggregate', 'availability_zone' and 'server_group'.

    Raises VNFMPlacementGroupError when more than one availability zone
    or more than one server group is requested.
    """
    host_aggregates = []
    availability_zones = []
    server_groups = []
    for group in self._placement_groups:
        if group.has_field('host_aggregate'):
            host_aggregates.extend(aggregate.as_dict()
                                   for aggregate in group.host_aggregate)
        if group.has_field('availability_zone'):
            availability_zones.append(group.availability_zone.as_dict())
        if group.has_field('server_group'):
            server_groups.append(group.server_group.as_dict())

    if availability_zones:
        if len(availability_zones) > 1:
            self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
            # The message previously had a single placeholder, so the
            # requested zones were silently dropped from the exception.
            raise VNFMPlacementGroupError(
                "Can not launch VDU: {} in multiple availability zones. "
                "Requested Zones: {}".format(self.name, availability_zones))
        vm_create_msg_dict['availability_zone'] = availability_zones[0]

    if server_groups:
        if len(server_groups) > 1:
            self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
            raise VNFMPlacementGroupError(
                "Can not launch VDU: {} in multiple Server Groups. "
                "Requested Groups: {}".format(self.name, server_groups))
        vm_create_msg_dict['server_group'] = server_groups[0]

    if host_aggregates:
        vm_create_msg_dict['host_aggregate'] = host_aggregates
515
def process_placement_groups(self, vm_create_msg_dict):
    """Fill the resource-mgr request from the placement groups.

    Does nothing without placement groups.  All groups must share one
    cloud type (asserted); only 'openstack' is processed, any other
    cloud type is logged and ignored.
    """
    groups = self._placement_groups
    if not groups:
        return

    cloud_types = {group.cloud_type for group in groups}
    assert len(cloud_types) == 1
    (cloud_type,) = cloud_types

    if cloud_type != 'openstack':
        self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
        return

    self.process_openstack_placement_group_construct(vm_create_msg_dict)
531
def resmgr_msg(self, config=None):
    """Build the VDU event message sent to the resource manager.

    The request carries the VM name and image, optional image checksum,
    management vpci, user data (``config``) and management network, one
    connection point per external and internal interface, selected EPA /
    flavor fields copied from the descriptor, and placement constructs.
    """
    copied_fields = ("vm_flavor",
                     "guest_epa",
                     "vswitch_epa",
                     "hypervisor_epa",
                     "host_epa")

    self._log.debug("Creating params based on VDUD: %s", self._vdud)
    descriptor_params = {key: value
                         for key, value in self._vdud.as_dict().items()
                         if key in copied_fields}

    request = {"name": self.name, "image_name": self.image_name}

    checksum = self.image_checksum
    if checksum is not None:
        request["image_checksum"] = checksum

    request["allocate_public_address"] = self._mgmt_intf
    if self._vdud.has_field('mgmt_vpci'):
        request["mgmt_vpci"] = self._vdud.mgmt_vpci

    self._log.debug("VDUD: %s", self._vdud)
    if config is not None:
        request['vdu_init'] = {'userdata': config}

    if self._mgmt_network:
        request['mgmt_network'] = self._mgmt_network

    def connection_point(intf, cp, vlr):
        """Fields shared by external and internal connection points."""
        virtual_intf = intf.virtual_interface
        info = {"name": cp,
                "virtual_link_id": vlr.network_id,
                "type_yang": virtual_intf.type_yang}
        if virtual_intf.has_field('vpci') and virtual_intf.vpci is not None:
            info["vpci"] = virtual_intf.vpci
        return info

    connection_points = []
    for intf, cp, vlr in self._ext_intf:
        info = connection_point(intf, cp, vlr)
        # Only external connection points carry a security group.
        if vlr.has_field('ip_profile_params') and vlr.ip_profile_params.has_field('security_group'):
            info['security_group'] = vlr.ip_profile_params.security_group
        connection_points.append(info)

    for intf, cp, vlr in self._int_intf:
        connection_points.append(connection_point(intf, cp, vlr))

    request["connection_points"] = connection_points
    request.update(descriptor_params)

    self.process_placement_groups(request)

    event = RwResourceMgrYang.VDUEventData()
    event.event_id = self._request_id
    event.cloud_account = self.cloud_account_name
    event.request_info.from_dict(request)
    return event
598
@asyncio.coroutine
def terminate(self, xact):
    """ Delete resource in VIM

    Only acts when the record is READY or FAILED; any other state is
    logged and ignored.  The VM is deleted in a new transaction (``xact``
    is not used), failures to delete are logged and do not abort the
    teardown, then the registrations are dropped and the record is
    marked TERMINATED.
    """
    if self._state not in (VDURecordState.READY, VDURecordState.FAILED):
        self._log.warning("VDU terminate in not ready state - Ignoring request")
        return

    self._state = VDURecordState.TERMINATING
    if self._vm_resp is not None:
        try:
            with self._dts.transaction() as new_xact:
                yield from self.delete_resource(new_xact)
        except Exception:
            # Best effort: keep tearing down the local registrations.
            self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)

    if self._rm_regh is not None:
        self._log.debug("Deregistering resource manager registration handle")
        self._rm_regh.deregister()
        self._rm_regh = None

    # The console handler may never have been registered, in which case
    # there is no handle to deregister (this used to raise AttributeError).
    console_handler = self._vdur_console_handler
    console_regh = getattr(console_handler, "_regh", None)
    if console_regh is not None:
        # Routine teardown message; it was wrongly logged as an error.
        self._log.debug("Deregistering vnfr vdur registration handle")
        console_regh.deregister()
        console_handler._regh = None

    self._state = VDURecordState.TERMINATED
625
def find_internal_cp_by_cp_id(self, cp_id):
    """Return the descriptor's internal connection point with the given id.

    Raises VduRecordError when no internal connection point matches.
    """
    self._log.debug("find_internal_cp_by_cp_id(%s) called",
                    cp_id)

    for candidate in self._vdud.internal_connection_point:
        self._log.debug("Checking for int cp %s in internal connection points",
                        candidate.id)
        if candidate.id == cp_id:
            # First match wins.
            return candidate

    self._log.debug("Failed to find cp %s in internal connection points",
                    cp_id)
    raise VduRecordError("Failed to find cp %s in internal connection points" % cp_id)
648
@asyncio.coroutine
def create_resource(self, xact, vnfr, config=None):
    """ Request resource from ResourceMgr

    Resolves the VLR behind every external and internal interface of the
    descriptor (recording them in self._ext_intf / self._int_intf), sends
    the create request built by resmgr_msg() and returns the
    ``resource_info`` of the response.

    Raises:
        VduRecordError: an internal interface's connection point or VLR
            cannot be resolved.
        VMResourceError: no response carrying a resource state arrived.
    """
    def find_cp_by_name(cp_name):
        """ Find a connection point by name; None when absent """
        self._log.debug("find_cp_by_name(%s) called", cp_name)
        for ext_cp in vnfr._cprs:
            self._log.debug("Checking ext cp (%s) called", ext_cp.name)
            if ext_cp.name == cp_name:
                return ext_cp
        self._log.debug("Failed to find cp %s in external connection points",
                        cp_name)
        return None

    def find_internal_vlr_by_cp_name(cp_name):
        """ Find the VLR corresponding to the connection point name"""
        self._log.debug("find_internal_vlr_by_cp_name(%s) called",
                        cp_name)

        for int_cp in self._vdud.internal_connection_point:
            self._log.debug("Checking for int cp %s in internal connection points",
                            int_cp.id)
            if int_cp.id == cp_name:
                # return the VLR associated with the connection point
                return vnfr.find_vlr_by_cp(cp_name)

        self._log.debug("Failed to find cp %s in internal connection points",
                        cp_name)
        raise VduRecordError(
            "Failed to find cp %s in internal connection points" % cp_name)

    block = xact.block_create()

    self._log.debug("Executing vm request id: %s, action: create",
                    self._request_id)

    # Resolve the networks associated external interfaces
    for ext_intf in self._vdud.external_interface:
        self._log.debug("Resolving external interface name [%s], cp[%s]",
                        ext_intf.name, ext_intf.vnfd_connection_point_ref)
        cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
        if cp is None:
            # Unknown connection points are skipped, not treated as errors.
            self._log.debug("Failed to find connection point - %s",
                            ext_intf.vnfd_connection_point_ref)
            continue
        self._log.debug("Connection point name [%s], type[%s]",
                        cp.name, cp.type_yang)

        vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)

        etuple = (ext_intf, cp.name, vlr)
        self._ext_intf.append(etuple)

        self._log.debug("Created external interface tuple : %s", etuple)

    # Resolve the networks associated internal interfaces
    for intf in self._vdud.internal_interface:
        cp_id = intf.vdu_internal_connection_point_ref
        self._log.debug("Resolving internal interface name [%s], cp[%s]",
                        intf.name, cp_id)

        try:
            vlr = find_internal_vlr_by_cp_name(cp_id)
        except Exception as e:
            self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
            msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
            # Chain the cause so the original traceback is not lost.
            raise VduRecordError(msg) from e

        ituple = (intf, cp_id, vlr)
        self._int_intf.append(ituple)

        self._log.debug("Created internal interface tuple : %s", ituple)

    resmgr_path = self.resmgr_path
    resmgr_msg = self.resmgr_msg(config)

    self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
    block.add_query_create(resmgr_path, resmgr_msg)

    res_iter = yield from block.execute(now=True)

    # Keep the last result yielded by the query.
    resp = None
    for i in res_iter:
        r = yield from i
        resp = r.result

    if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
        # Exceptions do not interpolate logging-style arguments; format
        # the message explicitly.
        raise VMResourceError("Did not get a vm resource response (resp: %s)" % (resp,))
    self._log.debug("Got vm request response: %s", resp.resource_info)
    return resp.resource_info
749
@asyncio.coroutine
def delete_resource(self, xact):
    """Send a delete query for this VDU's resource manager path."""
    self._log.debug("Executing vm request id: %s, action: delete",
                    self._request_id)

    query_block = xact.block_create()
    query_block.add_query_delete(self.resmgr_path)
    yield from query_block.execute(flags=0, now=True)
760
@asyncio.coroutine
def read_resource(self, xact):
    """Read this VDU's resource from the resource manager.

    Returns the ``resource_info`` of the last result.  The stored VM
    response (self._vm_resp) is not updated.

    Raises VMResourceError when no response carrying a resource state is
    received.
    """
    block = xact.block_create()

    # The original message said "delete" although this is a read.
    self._log.debug("Executing vm request id: %s, action: read",
                    self._request_id)

    block.add_query_read(self.resmgr_path)

    res_iter = yield from block.execute(flags=0, now=True)

    # Initialise so that an empty result set is reported as a
    # VMResourceError instead of an UnboundLocalError.
    resp = None
    for i in res_iter:
        r = yield from i
        resp = r.result

    if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
        raise VMResourceError("Did not get a vm resource response (resp: %s)" % (resp,))
    self._log.debug("Got vm request response: %s", resp.resource_info)
    return resp.resource_info
780
781
@asyncio.coroutine
def start_component(self):
    """Ask the VNFR to start this VDU's VCS component at the VM's management IP."""
    component_ref = self._vdud.vcs_component_ref
    self._log.debug("Starting component %s for vdud %s vdur %s",
                    component_ref,
                    self._vdud,
                    self._vdur_id)
    mgmt_ip = self.vm_resp.management_ip
    yield from self._vnfr.start_component(component_ref, mgmt_ip)
791
@property
def active(self):
    """True exactly when this VDU is in the READY state."""
    return self._state is VDURecordState.READY
796
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """Mark this VDU as FAILED, record the reason and notify the VNFR."""
    self._log.debug("VDU %s instantiation failed ", self._vdur_id)
    self._state, self._state_failed_reason = VDURecordState.FAILED, failed_reason
    yield from self._vnfr.instantiation_failed(failed_reason)
804
@asyncio.coroutine
def vdu_is_active(self):
    """Move this VDU to READY and tell the VNFR when all its VDUs are active.

    A VDU that is already active is only logged.  When the descriptor
    references a VCS component, it is started before the state changes.
    """
    if self.active:
        self._log.warning("VDU %s was already marked as active", self._vdur_id)
        return

    self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)

    has_component = self._vdud.vcs_component_ref is not None
    if has_component:
        yield from self.start_component()

    self._state = VDURecordState.READY

    if not self._vnfr.all_vdus_active():
        return

    self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
    yield from self._vnfr.is_ready()
822
@asyncio.coroutine
def instantiate(self, xact, vnfr, config=None):
    """ Instantiate this VDU

    Subscribes to the resource-info of this VDU's resource manager path,
    waits for the registration to become ready, then requests the VM.
    An "active" response marks the VDU ready immediately; "pending" and
    "inactive" responses leave it waiting for a notification handled by
    on_prepare.  Any failure is logged and reported through
    instantiation_failed() instead of being raised.
    """
    self._state = VDURecordState.INSTANTIATING

    @asyncio.coroutine
    def on_prepare(xact_info, query_action, ks_path, msg):
        """ Resource manager notification for this VDUR """
        self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
                        query_action,
                        ks_path,
                        msg)

        if query_action in (rwdts.QueryAction.UPDATE, rwdts.QueryAction.CREATE):
            self._vm_resp = msg

            if msg.resource_state == "active":
                # Move this VDU to ready state
                yield from self.vdu_is_active()
            elif msg.resource_state == "failed":
                yield from self.instantiation_failed(msg.resource_errors)
        elif query_action == rwdts.QueryAction.DELETE:
            self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
        else:
            # Exceptions do not interpolate logging-style arguments.
            raise NotImplementedError(
                "%s action on VirtualDeployementUnitRecord not supported" %
                (query_action,))

        xact_info.respond_xpath(rwdts.XactRspCode.ACK)

    try:
        reg_event = asyncio.Event(loop=self._loop)

        @asyncio.coroutine
        def on_ready(regh, status):
            reg_event.set()

        # Subscribe before sending the request so that no notification
        # can be missed.
        handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
        self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
                                                      flags=rwdts.Flag.SUBSCRIBER,
                                                      handler=handler)
        yield from reg_event.wait()

        vm_resp = yield from self.create_resource(xact, vnfr, config)
        self._vm_resp = vm_resp

        self._state = VDURecordState.RESOURCE_ALLOC_PENDING
        self._log.debug("Requested VM from resource manager response %s",
                        vm_resp)
        if vm_resp.resource_state == "active":
            self._log.debug("Resourcemgr responded wih an active vm resp %s",
                            vm_resp)
            yield from self.vdu_is_active()
            self._state = VDURecordState.READY
        elif vm_resp.resource_state in ("pending", "inactive"):
            # Nothing more to do here; on_prepare completes the
            # transition when the resource manager reports the state.
            self._log.debug("Resourcemgr responded wih a pending vm resp %s",
                            vm_resp)
        else:
            self._log.debug("Resourcemgr responded wih an error vm resp %s",
                            vm_resp)
            raise VirtualDeploymentUnitRecordError(
                "Failed VDUR instantiation %s " % vm_resp)

    except Exception as e:
        # log.exception records the traceback; the extra
        # traceback.print_exc() to stderr was redundant.
        self._log.exception("Instantiation of VDU record failed: %s", str(e))
        self._state = VDURecordState.FAILED
        yield from self.instantiation_failed(str(e))
899
900
class VlRecordState(enum.Enum):
    """Life-cycle states of a virtual link record."""
    # Creation path
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    # Tear-down path
    TERMINATE_PENDING = 104
    TERMINATED = 105
    # Error state
    FAILED = 106
909
910
911 class InternalVirtualLinkRecord(object):
912 """ Internal Virtual Link record """
def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name):
    """Create an internal VL record in the INIT state.

    The VLR request message is built immediately via create_vlr(); the
    VLR itself (self._vlr) stays None until it is instantiated.
    """
    self._dts, self._log, self._loop = dts, log, loop
    self._ivld_msg = ivld_msg
    self._vnfr_name = vnfr_name
    self._cloud_account_name = cloud_account_name

    # create_vlr() reads the three attributes assigned just above.
    self._vlr_req = self.create_vlr()
    self._vlr = None
    self._state = VlRecordState.INIT
924
@property
def vlr_id(self):
    """Id of the VLR request created for this virtual link."""
    request = self._vlr_req
    return request.id
929
@property
def name(self):
    """Name of this VL: ``<vnfr name>.<internal VLD name>``."""
    return ".".join((self._vnfr_name, self._ivld_msg.name))
934
@property
def network_id(self):
    """Network id of the VLR, or None while no VLR is stored."""
    if not self._vlr:
        return None
    return self._vlr.network_id
939
def vlr_path(self):
    """Keyspec of this VLR instance in the VLR catalog."""
    template = "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']"
    return template.format(self.vlr_id)
943
def create_vlr(self):
    """Build the VLR message that will be instantiated for this link.

    The message gets a fresh UUID, this record's name and cloud account,
    plus the selected descriptive fields present in the internal VLD.
    """
    copied_fields = ("short_name",
                     "vendor",
                     "description",
                     "version",
                     "type_yang",
                     "provider_network")

    vlr_fields = {
        "id": str(uuid.uuid4()),
        "name": self.name,
        "cloud_account": self._cloud_account_name,
    }
    # Descriptor fields are applied last.
    for key, value in self._ivld_msg.as_dict().items():
        if key in copied_fields:
            vlr_fields[key] = value

    return RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_fields)
964
@asyncio.coroutine
def instantiate(self, xact, restart_mode=False):
    """ Instantiate VL

    In restart mode an existing VLR is looked up first and reused; it is
    only created when the lookup finds nothing.  Otherwise the VLR is
    always created.  ``xact`` is not used: the create runs in its own
    transaction.

    Raises:
        VnfrInstantiationFailed: the created VLR reports 'failed'.
        InternalVirtualLinkRecordError: the create query returned no VLR.
    """

    @asyncio.coroutine
    def instantiate_vlr():
        """ Instantiate VLR"""
        self._log.debug("Create VL with xpath %s and vlr %s",
                        self.vlr_path(), self._vlr_req)

        with self._dts.transaction(flags=0) as new_xact:
            block = new_xact.block_create()
            block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
            self._log.debug("Executing VL create path:%s msg:%s",
                            self.vlr_path(), self._vlr_req)

            try:
                res_iter = yield from block.execute()
            except Exception:
                self._state = VlRecordState.FAILED
                self._log.exception("Caught exception while instantial VL")
                raise

            for ent in res_iter:
                res = yield from ent
                self._vlr = res.result

        if self._vlr is None:
            # An empty result used to crash on None.operational_status.
            self._state = VlRecordState.FAILED
            raise InternalVirtualLinkRecordError(
                "No VLR returned when creating path %s" % self.vlr_path())

        if self._vlr.operational_status == 'failed':
            self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
            self._state = VlRecordState.FAILED
            raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))

        self._log.info("Created VL with xpath %s and vlr %s",
                       self.vlr_path(), self._vlr)

    @asyncio.coroutine
    def get_vlr():
        """ Read the existing VLR; None when it does not exist """
        res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
        vlr = None
        for ent in res_iter:
            res = yield from ent
            vlr = res.result

        if vlr is None:
            # Returning None (instead of raising, as before) lets the
            # restart path below fall back to creating the VLR; the
            # fallback branch was unreachable otherwise.
            self._log.warning("Failed to get VLR for path %s", self.vlr_path())
        return vlr

    self._state = VlRecordState.INSTANTIATION_PENDING

    if restart_mode:
        vl = yield from get_vlr()
        if vl is None:
            yield from instantiate_vlr()
        else:
            # Keep the recovered VLR so that network_id is available.
            self._vlr = vl
    else:
        yield from instantiate_vlr()

    self._state = VlRecordState.ACTIVE
1026
def vlr_in_vns(self):
    """Tell whether a VLR record may exist in VNS for this link.

    True in the ACTIVE, INSTANTIATION_PENDING and FAILED states.
    """
    states_with_vlr = (VlRecordState.ACTIVE,
                       VlRecordState.INSTANTIATION_PENDING,
                       VlRecordState.FAILED)
    return self._state in states_with_vlr
1035
@asyncio.coroutine
def terminate(self, xact):
    """Delete the VLR of this virtual link using the given transaction.

    When vlr_in_vns() reports no record, the request is only logged and
    nothing is deleted. Otherwise the record moves through
    TERMINATE_PENDING to TERMINATED around the delete query.
    """
    if not self.vlr_in_vns():
        self._log.debug("Ignoring terminate request for id %s in state %s",
                        self.vlr_id, self._state)
        return

    self._log.debug("Terminating VL with path %s", self.vlr_path())
    self._state = VlRecordState.TERMINATE_PENDING

    delete_block = xact.block_create()
    delete_block.add_query_delete(self.vlr_path())
    yield from delete_block.execute(flags=0, now=True)

    self._state = VlRecordState.TERMINATED
    self._log.debug("Terminated VL with path %s", self.vlr_path())
1051
1052
1053 class VirtualNetworkFunctionRecord(object):
1054 """ Virtual Network Function Record """
def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
    # Collaborators handed in by the creator of this record
    self._dts = dts
    self._log = log
    self._loop = loop
    self._vnfm = vnfm
    self._vcs_handler = vcs_handler
    self._cluster_name = cluster_name
    self._mgmt_network = mgmt_network

    # The VNFR message this record was created from, and values read from it
    self._vnfr_msg = vnfr_msg
    self._vnfr = vnfr_msg
    self._vnfr_id = vnfr_msg.id
    self._vnfd_id = vnfr_msg.vnfd_ref
    self._config_status = vnfr_msg.config_status

    # Lifecycle state
    self._vnfd = None
    self._state = VirtualNetworkFunctionRecordState.INIT
    self._state_failed_reason = None
    self._create_time = int(time.time())
    self._vnf_mon = None

    # Child records and lookup tables, filled in during instantiation
    self._ext_vlrs = {}     # external virtual links, keyed by vlr id
    self._vlrs = []         # internal virtual links
    self._vdus = []         # vdu records
    self._vlr_by_cp = {}    # internal vlr keyed by connection point id
    self._cprs = []         # connection point records
    self._inventory = {}    # VCS components keyed by component name

    self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
1081
1082 def _get_vdur_from_vdu_id(self, vdu_id):
1083 self._log.debug("Finding vdur for vdu_id %s", vdu_id)
1084 self._log.debug("Searching through vdus: %s", self._vdus)
1085 for vdu in self._vdus:
1086 self._log.debug("vdu_id: %s", vdu.vdu_id)
1087 if vdu.vdu_id == vdu_id:
1088 return vdu
1089
1090 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id)
1091
1092 @property
1093 def operational_status(self):
1094 """ Operational status of this VNFR """
1095 op_status_map = {"INIT": "init",
1096 "VL_INIT_PHASE": "vl_init_phase",
1097 "VM_INIT_PHASE": "vm_init_phase",
1098 "READY": "running",
1099 "TERMINATE": "terminate",
1100 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1101 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1102 "TERMINATED": "terminated",
1103 "FAILED": "failed", }
1104 return op_status_map[self._state.name]
1105
1106 @property
1107 def vnfd_xpath(self):
1108 """ VNFD xpath associated with this VNFR """
1109 return("C,/vnfd:vnfd-catalog/"
1110 "vnfd:vnfd[vnfd:id = '{}']".format(self._vnfd_id))
1111
1112 @property
1113 def vnfd(self):
1114 """ VNFD for this VNFR """
1115 return self._vnfd
1116
1117 @property
1118 def vnf_name(self):
1119 """ VNFD name associated with this VNFR """
1120 return self.vnfd.name
1121
1122 @property
1123 def name(self):
1124 """ Name of this VNF in the record """
1125 return self._vnfr.name
1126
1127 @property
1128 def cloud_account_name(self):
1129 """ Name of the cloud account this VNFR is instantiated in """
1130 return self._vnfr.cloud_account
1131
1132 @property
1133 def vnfd_id(self):
1134 """ VNFD Id associated with this VNFR """
1135 return self.vnfd.id
1136
1137 @property
1138 def vnfr_id(self):
1139 """ VNFR Id associated with this VNFR """
1140 return self._vnfr_id
1141
1142 @property
1143 def member_vnf_index(self):
1144 """ Member VNF index associated with this VNFR """
1145 return self._vnfr.member_vnf_index_ref
1146
1147 @property
1148 def config_status(self):
1149 """ Config agent status for this VNFR """
1150 return self._config_status
1151
def component_by_name(self, component_name):
    """Look up an inventory component by its descriptor name.

    The name is mangled with the VNF name and VNFD id before the lookup;
    a component that is not in the inventory raises KeyError.
    """
    inventory_key = VcsComponent.mangle_name(
        component_name, self.vnf_name, self.vnfd_id)
    return self._inventory[inventory_key]
1158
1159
1160
@asyncio.coroutine
def get_nsr_config(self):
    """Read the NS instance config and return the NSR this VNFR refers to.

    Returns the first nsr entry whose id equals the nsr_id_ref of the VNFR
    message, or None when no entry matches.
    """
    # Need access to NS instance configuration for runtime resolution.
    # This shall be replaced when deployment flavors are implemented.
    replies = yield from self._dts.query_read("C,/nsr:ns-instance-config",
                                              rwdts.XactFlag.MERGE)

    for pending in replies:
        reply = yield from pending
        match = next(
            (candidate for candidate in reply.result.nsr
             if candidate.id == self._vnfr_msg.nsr_id_ref),
            None)
        if match is not None:
            return match

    return None
1175
@asyncio.coroutine
def start_component(self, component_name, ip_addr):
    """Start the named inventory component, passing it ip_addr."""
    component = self.component_by_name(component_name)
    yield from component.start(None, None, ip_addr)
1181
1182 def cp_ip_addr(self, cp_name):
1183 """ Get ip address for connection point """
1184 self._log.debug("cp_ip_addr()")
1185 for cp in self._cprs:
1186 if cp.name == cp_name and cp.ip_address is not None:
1187 return cp.ip_address
1188 return "0.0.0.0"
1189
1190 def mgmt_intf_info(self):
1191 """ Get Management interface info for this VNFR """
1192 mgmt_intf_desc = self.vnfd.msg.mgmt_interface
1193 ip_addr = None
1194 if mgmt_intf_desc.has_field("cp"):
1195 ip_addr = self.cp_ip_addr(mgmt_intf_desc.cp)
1196 elif mgmt_intf_desc.has_field("vdu_id"):
1197 try:
1198 vdur = self._get_vdur_from_vdu_id(mgmt_intf_desc.vdu_id)
1199 ip_addr = vdur.management_ip
1200 except VDURecordNotFound:
1201 self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
1202 ip_addr = None
1203 else:
1204 ip_addr = mgmt_intf_desc.ip_address
1205 port = mgmt_intf_desc.port
1206
1207 return ip_addr, port
1208
@property
def msg(self):
    """Build and return a fresh VNFR message describing this record.

    A new RwVnfrYang VNFR message is assembled on every access from the
    record's ids and status, a few fields copied from the VNFD, the
    management interface, internal VLR references, VDU records, connection
    points, monitoring parameters, VNF configuration and placement groups.
    """
    # Only these descriptor fields are copied verbatim into the record
    vnfd_fields = ["short_name", "vendor", "description", "version"]
    vnfd_copy_dict = {k: v for k, v in self.vnfd.msg.as_dict().items() if k in vnfd_fields}

    mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
    ip_address, port = self.mgmt_intf_info()

    # Leave the fields unset when no value is known
    if ip_address is not None:
        mgmt_intf.ip_address = ip_address
    if port is not None:
        mgmt_intf.port = port

    vnfr_dict = {"id": self._vnfr_id,
                 "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
                 "name": self.name,
                 "member_vnf_index_ref": self.member_vnf_index,
                 "vnfd_ref": self.vnfd_id,
                 "operational_status": self.operational_status,
                 "operational_status_details": self._state_failed_reason,
                 "cloud_account": self.cloud_account_name,
                 "config_status": self._config_status
                 }

    vnfr_dict.update(vnfd_copy_dict)

    vnfr_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
    # Uptime is measured from the moment this record object was created
    vnfr_msg.uptime = int(time.time()) - self._create_time
    vnfr_msg.mgmt_interface = mgmt_intf

    # Add all the VLRs to VNFR
    for vlr in self._vlrs:
        ivlr = vnfr_msg.internal_vlr.add()
        ivlr.vlr_ref = vlr.vlr_id

    # Add all the VDURs to VNFR
    if self._vdus is not None:
        for vdu in self._vdus:
            vdur = vnfr_msg.vdur.add()
            vdur.from_dict(vdu.msg.as_dict())

    # The dashboard url is only published when the descriptor defines
    # dashboard parameters
    if self.vnfd.msg.mgmt_interface.has_field('dashboard_params'):
        vnfr_msg.dashboard_url = self.dashboard_url

    for cpr in self._cprs:
        new_cp = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
        vnfr_msg.connection_point.append(new_cp)

    if self._vnf_mon is not None:
        for monp in self._vnf_mon.msg:
            vnfr_msg.monitoring_param.append(
                VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))

    if self._vnfr.vnf_configuration is not None:
        vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
        # Fill in the config access address from the management address
        # when the configuration does not carry one
        if (ip_address is not None and
                vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None):
            vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address

    for group in self._vnfr_msg.placement_groups_info:
        group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
        group_info.from_dict(group.as_dict())
        vnfr_msg.placement_groups_info.append(group_info)

    return vnfr_msg
1275
1276 @property
1277 def dashboard_url(self):
1278 ip, cfg_port = self.mgmt_intf_info()
1279 protocol = 'http'
1280 http_port = 80
1281 if self.vnfd.msg.mgmt_interface.dashboard_params.has_field('https'):
1282 if self.vnfd.msg.mgmt_interface.dashboard_params.https is True:
1283 protocol = 'https'
1284 http_port = 443
1285 if self.vnfd.msg.mgmt_interface.dashboard_params.has_field('port'):
1286 http_port = self.vnfd.msg.mgmt_interface.dashboard_params.port
1287
1288 url = "{protocol}://{ip_address}:{port}/{path}".format(
1289 protocol=protocol,
1290 ip_address=ip,
1291 port=http_port,
1292 path=self.vnfd.msg.mgmt_interface.dashboard_params.path.lstrip("/"),
1293 )
1294
1295 return url
1296
1297 @property
1298 def xpath(self):
1299 """ path for this VNFR """
1300 return("D,/vnfr:vnfr-catalog"
1301 "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id))
1302
@asyncio.coroutine
def publish(self, xact):
    """Publish this VNFR through the owning VNFM.

    Arguments:
        xact - transaction passed on to the VNFM's publish_vnfr (callers in
               this class also pass None)
    """
    # The msg property assembles a new message on every access. Build it
    # once and publish that same object, otherwise the create_time set
    # here would never reach the published record.
    vnfr = self.msg
    vnfr.create_time = self._create_time

    self._log.debug("Publishing VNFR path = [%s], record = [%s]",
                    self.xpath, vnfr)
    yield from self._vnfm.publish_vnfr(xact, self.xpath, vnfr)
    self._log.debug("Published VNFR path = [%s], record = [%s]",
                    self.xpath, vnfr)
1313
@asyncio.coroutine
def create_vls(self):
    """Create a record for every internal VLD of the VNFD.

    Each record is appended to self._vlrs and registered in self._vlr_by_cp
    under every internal connection point it references.

    Raises:
        InternalVirtualLinkRecordError if a connection point is referenced
        by more than one internal VLD
    """
    self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)

    for ivld_msg in self.vnfd.msg.internal_vld:
        self._log.debug("Creating internal vld:"
                        " %s, int_cp_ref = %s",
                        ivld_msg, ivld_msg.internal_connection_point
                        )
        link_record = InternalVirtualLinkRecord(
            dts=self._dts,
            log=self._log,
            loop=self._loop,
            ivld_msg=ivld_msg,
            vnfr_name=self.name,
            cloud_account_name=self.cloud_account_name
        )
        self._vlrs.append(link_record)

        for cp_ref in ivld_msg.internal_connection_point:
            cp_id = cp_ref.id_ref
            if cp_id in self._vlr_by_cp:
                # A connection point may belong to one internal link only
                raise InternalVirtualLinkRecordError(
                    "Connection point %s already "
                    " bound %s" % (cp_id, self._vlr_by_cp[cp_id]))

            self._log.debug("Setting vlr %s to internal cp = %s",
                            link_record, cp_id)
            self._vlr_by_cp[cp_id] = link_record
1341
@asyncio.coroutine
def instantiate_vls(self, xact, restart_mode=False):
    """Instantiate every internal virtual link record, one after another.

    xact and restart_mode are handed unchanged to each record's
    instantiate().
    """
    self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)

    for link_record in self._vlrs:
        self._log.debug("Instantiating VLR %s", link_record)
        yield from link_record.instantiate(xact, restart_mode)
1351
1352 def find_vlr_by_cp(self, cp_name):
1353 """ Find the VLR associated with the cp name """
1354 return self._vlr_by_cp[cp_name]
1355
def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
    """Return the cloud specific construct for a placement group.

    Arguments:
        input_group: VNFD PlacementGroup
        nsr_config: Configuration for VNFDGroup MAP in the NSR config;
                    may be None when no NSR configuration was found

    Returns:
        A VDUR placement group info message built from the first map entry
        that references this group and this VNFD, with 'name',
        'requirement' and 'strategy' taken from input_group; None when
        there is no NSR config or no entry matches.
    """
    if nsr_config is None:
        # get_nsr_config() returns None when no NSR matches; treat that as
        # "nothing to resolve" rather than failing on attribute access.
        return None

    copy_dict = ['name', 'requirement', 'strategy']
    for group_info in nsr_config.vnfd_placement_group_maps:
        if (group_info.placement_group_ref == input_group.name and
                group_info.vnfd_id_ref == self.vnfd_id):
            group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
            # Everything except the two reference keys is carried over
            group_dict = {k: v for k, v in group_info.as_dict().items()
                          if k not in ('placement_group_ref', 'vnfd_id_ref')}
            for param in copy_dict:
                group_dict.update({param: getattr(input_group, param)})
            group.from_dict(group_dict)
            return group
    return None
1376
@asyncio.coroutine
def get_vdu_placement_groups(self, vdu):
    """Collect the placement groups that apply to the given VDU.

    The result starts with the placement groups of the VNFR message and
    is extended with every VNFD placement group that lists this VDU as a
    member and can be resolved against the NSR configuration.
    """
    # Step 1: the VNF level placement groups
    placement_groups = list(self._vnfr_msg.placement_groups_info)

    # Step 2: the NSR config, needed to resolve the cloud constructs
    nsr_config = yield from self.get_nsr_config()

    # Step 3: the VDU level placement groups
    for group in self.vnfd.msg.placement_groups:
        for member_vdu in group.member_vdus:
            if member_vdu.member_vdu_ref != vdu.id:
                continue

            resolved = self.resolve_placement_group_cloud_construct(group,
                                                                    nsr_config)
            if resolved is None:
                # An unresolved group is only reported, not treated as an error
                self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
                continue

            self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
                           str(resolved),
                           vdu.name,
                           self.vnf_name,
                           self.member_vnf_index)
            placement_groups.append(resolved)

    return placement_groups
1407
@asyncio.coroutine
def create_vdus(self, vnfr, restart_mode=False):
    """Create a VDU record for every VDU in the VNFD.

    Each record has its operational data registered and is then appended
    to self._vdus.
    """

    def existing_vdur_id(vdud):
        """Id of the VDUR already present for this VDUD, if any.

        Only consulted in restart mode, so that existing VDUR ids are
        reused instead of creating duplicate VDURs.
        """
        if not restart_mode or vdud is None:
            return None

        try:
            return [rec.id for rec in vnfr._vnfr.vdur if rec.vdu_id_ref == vdud.id][0]
        except IndexError:
            self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
            return None

    self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)

    for vdu in self.vnfd.msg.vdu:
        self._log.debug("Creating vdu: %s", vdu)
        reused_id = existing_vdur_id(vdu)

        placement_groups = yield from self.get_vdu_placement_groups(vdu)
        group_names = [group.name for group in placement_groups]
        self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
                       vdu.name,
                       self.vnf_name,
                       self.member_vnf_index,
                       group_names)

        record = VirtualDeploymentUnitRecord(
            dts=self._dts,
            log=self._log,
            loop=self._loop,
            vdud=vdu,
            vnfr=vnfr,
            mgmt_intf=self.has_mgmt_interface(vdu),
            mgmt_network=self._mgmt_network,
            cloud_account_name=self.cloud_account_name,
            vnfd_package_store=self._vnfd_package_store,
            vdur_id=reused_id,
            placement_groups=placement_groups,
        )
        yield from record.vdu_opdata_register()

        self._vdus.append(record)
1459
@asyncio.coroutine
def instantiate_vdus(self, xact, vnfr):
    """Instantiate the VDUs associated with this VNF.

    A VDU whose cloud-init script references another VDU through a
    "{{ vdu[<id>]... }}" variable is only instantiated after that VDU has
    reached a terminal state. Two tasks are scheduled per VDU on the
    event loop (a state monitor and the instantiation itself); this
    coroutine returns once they are scheduled.

    Arguments:
        xact - not used here; every VDU is instantiated on its own
               transaction
        vnfr - passed through to each VDU's instantiate()
    """
    self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)

    lookup = {vdu.vdu_id: vdu for vdu in self._vdus}

    # Identify any dependencies among the VDUs
    dependencies = collections.defaultdict(list)
    vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")

    for vdu in self._vdus:
        if vdu.vdud_cloud_init is not None:
            for vdu_id in vdu_id_pattern.findall(vdu.vdud_cloud_init):
                if vdu_id != vdu.vdu_id:
                    # This means that vdu.vdu_id depends upon vdu_id,
                    # i.e. vdu_id must be instantiated before
                    # vdu.vdu_id.
                    dependencies[vdu.vdu_id].append(lookup[vdu_id])

    # Define the terminal states of VDU instantiation
    terminal = (
        VDURecordState.READY,
        VDURecordState.TERMINATED,
        VDURecordState.FAILED,
    )

    datastore = VdurDatastore()
    processed = set()

    @asyncio.coroutine
    def instantiate_monitor(vdu):
        """Monitor the state of the VDU during instantiation

        Arguments:
            vdu - a VirtualDeploymentUnitRecord

        """
        # wait for the VDUR to enter a terminal state
        while vdu._state not in terminal:
            yield from asyncio.sleep(1, loop=self._loop)

        # update the datastore
        datastore.update(vdu)

        # add the VDU to the set of processed VDUs
        processed.add(vdu.vdu_id)

    @asyncio.coroutine
    def instantiate(vdu):
        """Instantiate the specified VDU

        Arguments:
            vdu - a VirtualDeploymentUnitRecord

        Raises:
            VirtualDeploymentUnitRecordError if any VDU this VDU depends
            upon is not active once it has been processed.
            ValueError if a cloud-init variable is unrecognized or has no
            value in the datastore.

        """
        for dependency in dependencies[vdu.vdu_id]:
            self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))

            while dependency.vdu_id not in processed:
                yield from asyncio.sleep(1, loop=self._loop)

            if not dependency.active:
                raise VirtualDeploymentUnitRecordError()

        self._log.debug('instantiating {}'.format(vdu.vdu_id))

        # Populate the datastore with the current values of the VDU
        datastore.add(vdu)

        # Substitute any variables contained in the cloud config script
        # NOTE(review): a missing cloud-init script becomes the text "None"
        # here - confirm that vdu.instantiate() expects that.
        config = str(vdu.vdud_cloud_init)

        # Raw string: "\{" and "\}" are not valid string escapes
        parts = re.split(r"\{\{ ([^\}]+) \}\}", config)
        if len(parts) > 1:

            # Extract the variable names (the odd elements of the split
            # are the captured groups)
            variables = list()
            for variable in parts[1::2]:
                variables.append(variable.lstrip('{{').rstrip('}}').strip())

            # Iterate over the variables and substitute values from the
            # datastore.
            missing_msg = "Unable to find a substitute for {} in {} cloud-init script"
            for variable in variables:

                # Handle a reference to a VDU by ID
                if variable.startswith('vdu['):
                    value = datastore.get(variable)
                    if value is None:
                        raise ValueError(missing_msg.format(variable, vdu.vdu_id))

                    config = config.replace("{{ %s }}" % variable, value)
                    continue

                # Handle a reference to the current VDU
                if variable.startswith('vdu'):
                    value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
                    if value is None:
                        # Same failure as for a reference by ID, instead
                        # of a TypeError from str.replace()
                        raise ValueError(missing_msg.format(variable, vdu.vdu_id))

                    config = config.replace("{{ %s }}" % variable, value)
                    continue

                # Handle unrecognized variables
                msg = 'unrecognized cloud-config variable: {}'
                raise ValueError(msg.format(variable))

        # Instantiate the VDU
        with self._dts.transaction() as xact:
            self._log.debug("Instantiating vdu: %s", vdu)
            yield from vdu.instantiate(xact, vnfr, config=config)
            if self._state == VirtualNetworkFunctionRecordState.FAILED:
                self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
                                self.vnfr_id, vdu)

    # First create a set of tasks to monitor the state of the VDUs and
    # report when they have entered a terminal state
    for vdu in self._vdus:
        self._loop.create_task(instantiate_monitor(vdu))

    for vdu in self._vdus:
        self._loop.create_task(instantiate(vdu))
1585
1586 def has_mgmt_interface(self, vdu):
1587 # ## TODO: Support additional mgmt_interface type options
1588 if self.vnfd.msg.mgmt_interface.vdu_id == vdu.id:
1589 return True
1590 return False
1591
def vlr_xpath(self, vlr_id):
    """Operational-data xpath of the VLR catalog entry with the given id."""
    template = "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']"
    return template.format(vlr_id)
1597
1598 def ext_vlr_by_id(self, vlr_id):
1599 """ find ext vlr by id """
1600 return self._ext_vlrs[vlr_id]
1601
@asyncio.coroutine
def publish_inventory(self, xact):
    """Create and publish a VCS component for every VNFD component.

    Each new component is stored in self._inventory under its name before
    it is published. Processing stops at the first component whose name is
    already in the inventory.
    """
    self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)

    for component in self.vnfd.msg.component:
        self._log.debug("Creating inventory component %s", component)
        inventory_name = VcsComponent.mangle_name(component.component_name,
                                                  self.vnf_name,
                                                  self.vnfd_id
                                                  )
        entry = VcsComponent(dts=self._dts,
                             log=self._log,
                             loop=self._loop,
                             cluster_name=self._cluster_name,
                             vcs_handler=self._vcs_handler,
                             component=component,
                             mangled_name=inventory_name,
                             )

        already_known = entry.name in self._inventory
        if already_known:
            self._log.debug("Duplicate entries in inventory %s for vnfr %s",
                            component, self._vnfd_id)
            return

        self._log.debug("Adding component %s for vnrf %s",
                        entry.name, self._vnfr_id)
        self._inventory[entry.name] = entry
        yield from entry.publish(xact)
1629
1630 def all_vdus_active(self):
1631 """ Are all VDUS in this VNFR active? """
1632 for vdu in self._vdus:
1633 if not vdu.active:
1634 return False
1635
1636 self._log.debug("Inside all_vdus_active. Returning True")
1637 return True
1638
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """Mark this VNFR as FAILED, remember the reason and republish it."""
    self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)

    failed_state = VirtualNetworkFunctionRecordState.FAILED
    self.set_state(failed_state)
    self._state_failed_reason = failed_reason

    # Make the new status visible to readers of the VNFR
    yield from self.publish(None)
1648
@asyncio.coroutine
def is_ready(self):
    """Move this VNFR to READY, unless it has FAILED, and republish it."""
    self._log.debug("VNFR id %s is ready", self.vnfr_id)

    if self._state == VirtualNetworkFunctionRecordState.FAILED:
        # A failed VNFR keeps its FAILED state
        self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
    else:
        self.set_state(VirtualNetworkFunctionRecordState.READY)

    # Make the current status visible to readers of the VNFR
    yield from self.publish(None)
1662
1663 def update_cp(self, cp_name, ip_address, cp_id):
1664 """Updated the connection point with ip address"""
1665 for cp in self._cprs:
1666 if cp.name == cp_name:
1667 self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1668 cp_name, cp, ip_address, cp_id)
1669 cp.ip_address = ip_address
1670 cp.connection_point_id = cp_id
1671 return
1672
1673 err = "No connection point %s found in VNFR id %s" % (cp.name, self._vnfr_id)
1674 self._log.debug(err)
1675 raise VirtualDeploymentUnitRecordError(err)
1676
1677 def set_state(self, state):
1678 """ Set state for this VNFR"""
1679 self._state = state
1680
@asyncio.coroutine
def instantiate(self, xact, restart_mode=False):
    """Instantiate this VNF.

    Runs, in order: fetch the VNFD, fetch the external VLRs, publish the
    inventory, create and instantiate the internal VLs, create the VDU
    records, schedule VDU instantiation and start the uptime update task.
    The VNFR is republished between the major steps.

    A failure while instantiating the VLs is recorded through
    instantiation_failed() and ends the method without raising.

    Arguments:
        xact - transaction passed on to the publish/instantiate steps
        restart_mode - passed on to VL instantiation and VDU creation
    """
    self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)

    @asyncio.coroutine
    def fetch_vlrs():
        """ Fetch VLRs """
        # Iterate over all the connection points in VNFR and fetch the
        # associated VLRs

        def cpr_from_cp(cp):
            """ Creates a record level connection point from the descriptor cp"""
            # Only the listed keys are carried over from the connection point
            cp_fields = ["name", "image", "vm-flavor"]
            cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
            cpr_dict = {}
            cpr_dict.update(cp_copy_dict)
            return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)

        self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
                        self._vnfr_id, self._vnfr.connection_point)

        for cp in self._vnfr.connection_point:
            cpr = cpr_from_cp(cp)
            self._cprs.append(cpr)
            self._log.debug("Adding Connection point record %s ", cp)

            vlr_path = self.vlr_xpath(cp.vlr_ref)
            self._log.debug("Fetching VLR with path = %s", vlr_path)
            res_iter = yield from self._dts.query_read(self.vlr_xpath(cp.vlr_ref),
                                                       rwdts.XactFlag.MERGE)
            for i in res_iter:
                r = yield from i
                d = r.result
                # Remember the external VLR and link the connection point
                # record to it
                self._ext_vlrs[cp.vlr_ref] = d
                cpr.vlr_ref = cp.vlr_ref
                self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)

    # Fetch the VNFD associated with the VNFR
    self._log.debug("VNFR-ID %s: Fetching vnfds", self._vnfr_id)
    self._vnfd = yield from self._vnfm.get_vnfd_ref(self._vnfd_id)
    self._log.debug("VNFR-ID %s: Fetched vnfd:%s", self._vnfr_id, self._vnfd)

    assert self.vnfd is not None

    # Fetch External VLRs
    self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
    yield from fetch_vlrs()

    # Publish inventory
    self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
    yield from self.publish_inventory(xact)

    # Create the internal VL records
    self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
    yield from self.create_vls()

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    # instantiate VLs
    self._log.debug("VNFR-ID %s: Instantiate VLs", self._vnfr_id)
    try:
        yield from self.instantiate_vls(xact, restart_mode)
    except Exception as e:
        self._log.exception("VL instantiation failed (%s)", str(e))
        yield from self.instantiation_failed(str(e))
        return

    self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)

    # create the VDU records
    self._log.debug("VNFR-ID %s: Create VDUs", self._vnfr_id)
    yield from self.create_vdus(self, restart_mode)

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    # instantiate VDUs
    # ToDo: Check if this should be prevented during restart
    # Scheduled as a task, so this method does not wait for the VDUs
    self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
    _ = self._loop.create_task(self.instantiate_vdus(xact, self))

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)

    # create task updating uptime for this vnfr
    self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
    self._loop.create_task(self.vnfr_uptime_update(xact))
1775
@asyncio.coroutine
def terminate(self, xact):
    """Terminate this virtual network function.

    Monitoring is stopped first (when present), then the internal VLs and
    finally the VDUs are terminated one after another using xact. The
    state moves through TERMINATE, VL_TERMINATE_PHASE and
    VDU_TERMINATE_PHASE to TERMINATED.
    """
    self._log.debug("Terminatng VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)

    # stop monitoring
    monitor = self._vnf_mon
    if monitor is not None:
        monitor.stop()
        monitor.deregister()
        self._vnf_mon = None

    # internal virtual links
    self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
    for link_record in self._vlrs:
        yield from link_record.terminate(xact)

    # virtual deployment units
    self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
    for vdu_record in self._vdus:
        yield from vdu_record.terminate(xact)

    self._log.debug("Terminated VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1812
@asyncio.coroutine
def vnfr_uptime_update(self, xact):
    """Republish this VNFR every two seconds while it is coming up or ready.

    The loop ends as soon as the state is anything other than INIT,
    VL_INIT_PHASE, VM_INIT_PHASE or READY (for example FAILED or
    TERMINATED).
    """
    live_states = (
        VirtualNetworkFunctionRecordState.INIT,
        VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
        VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
        VirtualNetworkFunctionRecordState.READY,
    )

    while self._state in live_states:
        yield from self.publish(xact)
        yield from asyncio.sleep(2, loop=self._loop)
1824
1825
1826
class VnfdDtsHandler(object):
    """Subscribes to VNFD configuration and forwards changes to the VNFM."""
    XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"

    def __init__(self, dts, log, loop, vnfm):
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm
        self._regh = None   # set by register()

    # NOTE(review): this accessor is a coroutine, while the equivalent
    # accessor on VcsComponentDtsHandler is a property - confirm which
    # form callers expect.
    @asyncio.coroutine
    def regh(self):
        """The DTS registration handle (None before register())."""
        return self._regh

    @asyncio.coroutine
    def register(self):
        """Register as a subscriber for VNFD configuration.

        Deletes are carried out in the prepare callback; additions and
        updates are only noted there and carried out in the apply
        callback.
        """

        def on_apply(dts, acg, xact, action, scratch):
            """Create or update the VNFDs noted during prepare."""
            self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL

            for vnfd_cfg in self._regh.get_xact_elements(xact):
                # Outside of recovery, only the VNFDs whose id was seen in
                # the prepare callback are of interest
                if vnfd_cfg.id in scratch.get('vnfds', []) or is_recovery:
                    self._vnfm.update_vnfd(vnfd_cfg)

            scratch.pop('vnfds', None)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """Handle a delete, or note an add/update for the apply callback.

            Raises VirtualNetworkFunctionDescriptorRefCountExists when the
            VNFD is in use.
            """
            self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
                            ks_path.to_xpath(RwVnfmYang.get_schema()), msg)

            field_ref = ProtobufC.FieldReference.alloc()
            field_ref.goto_whole_message(msg.to_pbcm())

            if field_ref.is_field_deleted():
                # A delete: refuse it while the VNFD is in use
                self._log.debug("Deleting VNFD with id %s", msg.id)
                if self._vnfm.vnfd_in_use(msg.id):
                    self._log.debug("Cannot delete VNFD in use - %s", msg)
                    raise VirtualNetworkFunctionDescriptorRefCountExists(
                        "Cannot delete a VNFD in use - %s" % msg)

                yield from self._vnfm.delete_vnfd(msg.id)
            else:
                # An add/update: only check here that the VNFD is not in
                # use, the change itself happens in the apply callback
                if self._vnfm.vnfd_in_use(msg.id):
                    self._log.debug("Cannot modify an VNFD in use - %s", msg)
                    raise VirtualNetworkFunctionDescriptorRefCountExists(
                        "Cannot modify an VNFD in use - %s" % msg)

                # Note the id so that the apply callback picks it up
                scratch.setdefault('vnfds', []).append(msg.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug(
            "Registering for VNFD config using xpath: %s",
            VnfdDtsHandler.XPATH,
        )

        apply_handler = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=apply_handler) as acg:
            self._regh = acg.register(
                xpath=VnfdDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                on_prepare=on_prepare)
1903
1904
class VcsComponentDtsHandler(object):
    """Publishes VCS components into the manifest operational inventory."""
    XPATH = ("D,/rw-manifest:manifest"
             "/rw-manifest:operational-inventory"
             "/rw-manifest:component")

    def __init__(self, dts, log, loop, vnfm):
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm
        self._regh = None   # set by register()

    @property
    def regh(self):
        """The DTS registration handle (None before register())."""
        return self._regh

    @asyncio.coroutine
    def register(self):
        """Register as the publisher of the VCS component path."""
        self._log.debug("VCS Comp publisher DTS handler registering path %s",
                        VcsComponentDtsHandler.XPATH)

        registration_handler = rift.tasklets.DTS.RegistrationHandler()
        group_handler = rift.tasklets.Group.Handler()
        publisher_flags = (rwdts.Flag.PUBLISHER |
                           rwdts.Flag.NO_PREP_READ |
                           rwdts.Flag.DATASTORE)

        with self._dts.group_create(handler=group_handler) as group:
            self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
                                        handler=registration_handler,
                                        flags=publisher_flags)

    @asyncio.coroutine
    def publish(self, xact, path, msg):
        """Create the element msg at path on the registration handle.

        xact only appears in the log output.
        """
        self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.create_element(path, msg)
        self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
                        VcsComponentDtsHandler.XPATH, xact, path, msg)
1946
class VnfrConsoleOperdataDtsHandler(object):
    """Serves console operational data for one VDUR of one VNFR.

    Registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and answers READ
    queries from DTS with the console URL of the VDU.
    """

    def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
        """Remember the collaborators and the VNFR/VDUR/VDU ids served."""
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm
        # Filled in by register().
        self._regh = None

        self._vnfr_id, self._vdur_id, self._vdu_id = vnfr_id, vdur_id, vdu_id

    @property
    def vnfr_vdu_console_xpath(self):
        """Keyspec of the console entry for this handler's VNFR/VDUR pair."""
        template = ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']"
                    "/rw-vnfr:vdur[vnfr:id='{}']")
        return template.format(self._vnfr_id, self._vdur_id)

    @asyncio.coroutine
    def register(self):
        """Register as publisher of this VDUR's console operational data."""

        def build_console_msg(console_url):
            """Build the console reply for this VDUR carrying `console_url`."""
            console_msg = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
            console_msg.id = self._vdur_id
            console_msg.console_url = console_url
            return console_msg

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """DTS prepare callback; only READ produces console data."""
            query_xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
                xact_info, action, query_xpath, msg
            )

            if action != rwdts.QueryAction.READ:
                # Anything but READ is logged and acknowledged without data.
                self._log.error("Not supported operation %s" % action)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                return

            vdur_schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema()
            entry = vdur_schema.keyspec_to_entry(ks_path)
            self._log.debug("VDU Opdata path is {}".format(entry))

            try:
                vnfr = self._vnfm.get_vnfr(self._vnfr_id)
            except VnfRecordError:
                # Unknown VNFR: acknowledge with no payload.
                self._log.error("VNFR id %s not found", self._vnfr_id)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                return

            try:
                vdur = vnfr._get_vdur_from_vdu_id(self._vdu_id)
                if not vdur._state == VDURecordState.READY:
                    self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
                    xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                    return
                with self._dts.transaction() as read_xact:
                    resource = yield from vdur.read_resource(read_xact)
                    # An empty/absent URL is reported as the literal 'none'.
                    console_reply = build_console_msg(resource.console_url or 'none')
                    self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,console_reply))
            except Exception:
                # Any failure while reading still yields a reply, with 'none'.
                self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
                console_reply = build_console_msg('none')

            xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
                                    xpath=self.vnfr_vdu_console_xpath,
                                    msg=console_reply)

        self._log.debug("Registering for VNFR VDU using xpath: %s",
                        self.vnfr_vdu_console_xpath)
        reg_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as reg_group:
            self._regh = reg_group.register(
                xpath=self.vnfr_vdu_console_xpath,
                handler=reg_handler,
                flags=rwdts.Flag.PUBLISHER,
            )
2027
2028
class VnfrDtsHandler(object):
    """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop, vnfm):
        """ Store the DTS handle, logger, event loop and owning VNF manager """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm

        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle"""
        return self._regh

    @property
    def vnfm(self):
        """ Return VNF manager instance """
        return self._vnfm

    @asyncio.coroutine
    def register(self):
        """ Register for vnfr create/update/delete/read requests from dts """
        def on_commit(xact_info):
            """ The transaction has been committed """
            self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
            return rwdts.MemberRspCode.ACTION_OK

        def on_abort(*args):
            """ Abort callback (defined here but not attached to a handler below) """
            self._log.debug("VNF transaction got aborted")

        @asyncio.coroutine
        def on_event(dts, g_reg, xact, xact_event, scratch_data):
            """ Group event callback; on INSTALL re-create the known VNFRs """

            @asyncio.coroutine
            def instantiate_realloc_vnfr(vnfr):
                """Re-populate the vnfm after restart

                Arguments:
                    vnfr - the record returned by VnfManager.create_vnfr

                """

                yield from vnfr.instantiate(None, restart_mode=True)

            if xact_event == rwdts.MemberEvent.INSTALL:
                # Every element already held by the registration gets a VNFR
                # object again and is instantiated in restart mode.
                for cfg in self.regh.elements:
                    vnfr = self.vnfm.create_vnfr(cfg)
                    self._loop.create_task(instantiate_realloc_vnfr(vnfr))

            self._log.debug("Got on_event in vnfm")

            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts

            Dispatches on the query action (CREATE / DELETE / UPDATE) and
            answers ACK once the action has been handled.
            """
            self._log.debug(
                "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
                xact_info, action, msg
            )

            if action == rwdts.QueryAction.CREATE:
                if not msg.has_field("vnfd_ref"):
                    err = "Vnfd reference not provided"
                    self._log.error(err)
                    raise VnfRecordError(err)

                vnfr = self.vnfm.create_vnfr(msg)
                try:
                    # RIFT-9105: Unable to add a READ query under an existing transaction
                    # xact = xact_info.xact
                    yield from vnfr.instantiate(None)
                except Exception as e:
                    # A failed instantiation is recorded on the VNFR and
                    # published; the query itself is still ACKed below.
                    self._log.exception(e)
                    self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
                    vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
                    yield from vnfr.publish(None)
            elif action == rwdts.QueryAction.DELETE:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr_id = path_entry.key00.id
                vnfr = self._vnfm.get_vnfr(vnfr_id)

                # NOTE(review): VnfManager.get_vnfr in this file raises
                # VnfRecordError for an unknown id rather than returning None,
                # so this guard is purely defensive.
                if vnfr is None:
                    self._log.debug("VNFR id %s not found for delete", vnfr_id)
                    raise VirtualNetworkFunctionRecordNotFound(
                        "VNFR id %s" % vnfr_id)

                try:
                    yield from vnfr.terminate(xact_info.xact)
                    # Unref the VNFD
                    vnfr.vnfd.unref()
                    yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
                except Exception as e:
                    # Failures are logged only; the query is still ACKed below.
                    self._log.exception(e)
                    self._log.error("Caught exception while deleting vnfr %s", vnfr_id)

            elif action == rwdts.QueryAction.UPDATE:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr_id = path_entry.key00.id
                vnfr = None
                try:
                    vnfr = self._vnfm.get_vnfr(vnfr_id)
                except Exception:
                    self._log.debug("No vnfr found with id %s", vnfr_id)
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                if vnfr is None:
                    self._log.debug("VNFR id %s not found for update", vnfr_id)
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                self._log.debug("VNFR {} update config status {} (current {})".
                                format(vnfr.name, msg.config_status, vnfr.config_status))
                # Update the config status and publish
                vnfr._config_status = msg.config_status
                yield from vnfr.publish(None)

            else:
                # Interpolate the action; passing it as a second constructor
                # argument would leave the "%s" placeholder in the message.
                raise NotImplementedError(
                    "%s action on VirtualNetworkFunctionRecord not supported" %
                    action)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug("Registering for VNFR using xpath: %s",
                        VnfrDtsHandler.XPATH,)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
                                                    on_prepare=on_prepare,)
        handlers = rift.tasklets.Group.Handler(on_event=on_event,)
        with self._dts.group_create(handler=handlers) as group:
            self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=(rwdts.Flag.PUBLISHER |
                                               rwdts.Flag.NO_PREP_READ |
                                               rwdts.Flag.CACHE |
                                               rwdts.Flag.DATASTORE),)

    @asyncio.coroutine
    def create(self, xact, path, msg):
        """
        Create a VNFR record in DTS with path and message

        xact is only logged; the element is created through the
        registration handle.
        """
        self._log.debug("Creating VNFR xact = %s, %s:%s",
                        xact, path, msg)

        self.regh.create_element(path, msg)
        self._log.debug("Created VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def update(self, xact, path, msg):
        """
        Update a VNFR record in DTS with path and message

        xact is only logged; the element is updated through the
        registration handle.
        """
        self._log.debug("Updating VNFR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.update_element(path, msg)
        self._log.debug("Updated VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, path):
        """
        Delete the VNFR record at path from DTS

        xact is only logged; the element is deleted through the
        registration handle.
        """
        self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
        self.regh.delete_element(path)
        self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2204
2205
class VirtualNetworkFunctionDescriptor(object):
    """
    Virtual Network Function descriptor class

    Wraps a VNFD message together with a reference count; update and delete
    are refused while the count is above zero.
    """

    def __init__(self, dts, log, loop, vnfm, vnfd):
        """ Store the collaborators and the VNFD message; ref count starts at 0 """
        self._dts = dts
        self._log = log
        self._loop = loop

        self._vnfm = vnfm
        self._vnfd = vnfd
        self._ref_count = 0

    @property
    def ref_count(self):
        """ Returns the reference count associated with
            this Virtual Network Function Descriptor"""
        return self._ref_count

    @property
    def id(self):
        """ Returns vnfd id """
        return self._vnfd.id

    @property
    def name(self):
        """ Returns vnfd name """
        return self._vnfd.name

    def in_use(self):
        """ Returns whether vnfd is in use or not """
        return self._ref_count > 0

    def ref(self):
        """ Take a reference on this object and return the new count """
        self._ref_count += 1
        return self._ref_count

    def unref(self):
        """ Release reference on this object and return the new count

        Raises:
            VnfRecordError if no reference is currently held.
        """
        if self.ref_count < 1:
            msg = ("Unref on a VNFD object - vnfd id %s, ref_count = %s" %
                   (self.id, self._ref_count))
            self._log.critical(msg)
            raise VnfRecordError(msg)
        self._log.debug("Releasing ref on VNFD %s - curr ref_count:%s",
                        self.id, self.ref_count)
        self._ref_count -= 1
        return self._ref_count

    @property
    def msg(self):
        """ Return the message associated with this VirtualNetworkFunctionDescriptor"""
        return self._vnfd

    @staticmethod
    def path_for_id(vnfd_id):
        """ Return path for the passed vnfd_id"""
        return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id)

    def path(self):
        """ Return the path associated with this VirtualNetworkFunctionDescriptor"""
        return VirtualNetworkFunctionDescriptor.path_for_id(self.id)

    def _log_referencing_vnfrs(self):
        """ Log every VNFR held by the manager that points at this descriptor """
        # The following loop is added to debug RIFT-13284
        for vnf_rec in self._vnfm._vnfrs.values():
            if vnf_rec.vnfd_id == self.id:
                self._log.error("descriptor %s in used by %s:%s",
                                self.id, vnf_rec.vnfr_id, vnf_rec.msg)

    def update(self, vnfd):
        """ Update the Virtual Network Function Descriptor

        Raises:
            VirtualNetworkFunctionDescriptorRefCountExists if the descriptor
            is in use.
        """
        if self.in_use():
            self._log.error("Cannot update descriptor %s in use refcnt=%d",
                            self.id, self.ref_count)
            self._log_referencing_vnfrs()
            raise VirtualNetworkFunctionDescriptorRefCountExists("Cannot update descriptor in use %s" % self.id)
        self._vnfd = vnfd

    def delete(self):
        """ Delete the Virtual Network Function Descriptor

        Raises:
            VirtualNetworkFunctionDescriptorRefCountExists if the descriptor
            is in use.
        """
        if self.in_use():
            # The format has two placeholders, so the ref count must be
            # supplied as well or the record cannot be formatted.
            self._log.error("Cannot delete descriptor %s in use refcnt=%d",
                            self.id, self.ref_count)
            self._log_referencing_vnfrs()
            raise VirtualNetworkFunctionDescriptorRefCountExists("Cannot delete descriptor in use %s" % self.id)
        # VnfManager.delete_vnfd is declared with @asyncio.coroutine, so a
        # bare call only builds the coroutine object and never runs it.
        # Schedule it on the loop so the delete actually happens.
        self._loop.create_task(self._vnfm.delete_vnfd(self.id))
2298
2299
class VnfdRefCountDtsHandler(object):
    """DTS handler that answers reads of the VNFD reference counts."""

    # Keyspec of the ref-count list served by this handler.
    XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"

    def __init__(self, dts, log, loop, vnfm):
        """Keep the DTS handle, logger, event loop and VNF manager."""
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm

        # Filled in by register().
        self._regh = None

    @property
    def regh(self):
        """Registration handle, or None until register() has run."""
        return self._regh

    @property
    def vnfm(self):
        """The VNF manager this handler reads ref counts from."""
        return self._vnfm

    @asyncio.coroutine
    def register(self):
        """Register as publisher of XPATH to serve ref-count reads."""

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """DTS prepare callback; only READ is supported."""
            query_xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
                xact_info, action, query_xpath, msg
            )

            if action != rwdts.QueryAction.READ:
                raise VnfRecordError("Not supported operation %s" % action)

            refcount_schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
            entry = refcount_schema.keyspec_to_entry(ks_path)
            refcounts = yield from self._vnfm.get_vnfd_refcount(entry.key00.vnfd_id_ref)

            # One MORE response per entry, then a closing ACK.
            for entry_xpath, entry_msg in refcounts:
                self._log.debug("Responding to ref count query path:%s, msg:%s",
                                entry_xpath, entry_msg)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
                                        xpath=entry_xpath,
                                        msg=entry_msg)
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        reg_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as reg_group:
            self._regh = reg_group.register(
                xpath=VnfdRefCountDtsHandler.XPATH,
                handler=reg_handler,
                flags=rwdts.Flag.PUBLISHER,
            )
2355
2356
class VdurDatastore(object):
    """
    This VdurDatastore is intended to expose select information about a VDUR
    such that it can be referenced in a cloud config file. The data that is
    exposed does not necessarily follow the structure of the data in the yang
    model. This is intentional. The data that are exposed are intended to be
    agnostic of the yang model so that changes in the model do not necessarily
    require changes to the interface provided to the user. It also means that
    the user does not need to be familiar with the RIFT.ware yang models.
    """

    def __init__(self):
        """Create an instance of VdurDatastore"""
        # vdu id -> {exposed attribute name -> value}
        self._vdur_data = dict()
        # Raw string: "\[" and "\." are regex escapes, not string escapes.
        self._pattern = re.compile(r"vdu\[([^]]+)\]\.(.+)")

    @staticmethod
    def _exposed_attrs(vdur):
        """Return the (key, value) pairs this datastore exposes for a VDUR"""
        return (
            ('name', vdur._vdud.name),
            ('mgmt.ip', vdur.vm_management_ip),
        )

    def add(self, vdur):
        """Add a new VDUR to the datastore

        Only attributes whose value is not None are stored.

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no vdu_id or (2) is
            already in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id in self._vdur_data:
            raise ValueError('cannot add a VDUR more than once')

        # Build the entry first so a failure while reading the attributes
        # does not leave a half-registered VDUR behind.
        self._vdur_data[vdur.vdu_id] = {
            key: value
            for key, value in self._exposed_attrs(vdur)
            if value is not None
        }

    def update(self, vdur):
        """Update the VDUR information in the datastore

        Attributes whose value is now None are removed from the entry; the
        others are overwritten.

        Arguments:
            vdur - a GI representation of a VDUR

        Raises:
            A ValueError is raised if the VDUR (1) has no vdu_id or (2) is
            not in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        entry = self._vdur_data[vdur.vdu_id]
        for key, value in self._exposed_attrs(vdur):
            if value is None:
                entry.pop(key, None)
            else:
                entry[key] = value

    def remove(self, vdur_id):
        """Remove all of the data associated with specified VDUR

        Arguments:
            vdur_id - the identifier the VDUR was stored under

        Raises:
            A ValueError is raised if the VDUR is not contained in the
            datastore.

        """
        if vdur_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        del self._vdur_data[vdur_id]

    def get(self, expr):
        """Retrieve VDUR information from the datastore

        An expression should be of the form,

            vdu[<id>].<attr>

        where <id> is the identifier the VDUR was stored under (unquoted),
        and <attr> is the name of the exposed attribute that the user wishes
        to retrieve.

        If the requested data is not available, None is returned.

        Arguments:
            expr - a string that specifies the data to return

        Raises:
            A ValueError is raised if the provided expression cannot be parsed.

        Returns:
            The requested data or None

        """
        result = self._pattern.match(expr)
        if result is None:
            raise ValueError('data expression not recognized ({})'.format(expr))

        vdur_id, key = result.groups()

        return self._vdur_data.get(vdur_id, {}).get(key, None)
2475
2476
class VnfManager(object):
    """ The virtual network function manager class

    Holds the in-memory tables of VNFRs, VNFDs and NS instance configs and
    owns the DTS handlers that feed them.
    """
    def __init__(self, dts, log, loop, cluster_name):
        """ Create the DTS handlers and the empty lookup tables """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._cluster_name = cluster_name

        self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
        self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
        self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
        self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(log, dts, loop, callback=self.handle_nsr)

        # Handlers registered by register().  The VNFR and VNFD ref-count
        # handlers created above are not part of this list.
        self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
                              self._vcs_handler,
                              self._nsr_handler]
        self._vnfrs = {}  # vnfr id -> VirtualNetworkFunctionRecord
        self._vnfds = {}  # vnfd id -> VirtualNetworkFunctionDescriptor
        self._nsrs = {}   # nsr id -> NS instance config

    @property
    def vnfr_handler(self):
        """ VNFR dts handler """
        return self._vnfr_handler

    @property
    def vcs_handler(self):
        """ VCS dts handler """
        return self._vcs_handler

    @asyncio.coroutine
    def register(self):
        """ Register all static DTS handlers """
        for hdl in self._dts_handlers:
            yield from hdl.register()

    @asyncio.coroutine
    def run(self):
        """ Run this VNFM instance """
        self._log.debug("Run VNFManager - registering static DTS handlers")
        yield from self.register()

    def handle_nsr(self, nsr, action):
        """ NS instance config callback: remember on CREATE, forget on DELETE """
        if action in [rwdts.QueryAction.CREATE]:
            self._nsrs[nsr.id] = nsr
        elif action == rwdts.QueryAction.DELETE:
            if nsr.id in self._nsrs:
                del self._nsrs[nsr.id]

    def get_linked_mgmt_network(self, vnfr):
        """For the given VNFR get the related mgmt network from the NSD, if
        available.

        Returns:
            The name of the first VLD flagged as mgmt network in the NSD of
            the NS referenced by the VNFR, or None if there is none.

        Raises:
            VnfRecordError if the referenced NS is not known.
        """
        nsr_id = vnfr.nsr_id_ref

        # for the given related VNFR, get the corresponding NSR-config
        try:
            nsr_obj = self._nsrs[nsr_id]
        except KeyError:
            # Raising a bare string is itself a TypeError; raise a real
            # exception carrying the message instead.
            raise VnfRecordError(
                "Unable to find the NS with the ID: {}".format(nsr_id))

        # for the related NSD check if a VLD exists such that it's a mgmt
        # network
        for vld in nsr_obj.nsd.vld:
            if vld.mgmt_network:
                return vld.name

        return None

    def get_vnfr(self, vnfr_id):
        """ get VNFR by vnfr id

        Raises:
            VnfRecordError if no VNFR with that id is known.
        """

        if vnfr_id not in self._vnfrs:
            raise VnfRecordError("VNFR id %s not found" % vnfr_id)

        return self._vnfrs[vnfr_id]

    def create_vnfr(self, vnfr):
        """ Create a VNFR instance

        Raises:
            VnfRecordError if a VNFR with the same id already exists.
        """
        if vnfr.id in self._vnfrs:
            msg = "Vnfr id %s already exists" % vnfr.id
            self._log.error(msg)
            raise VnfRecordError(msg)

        self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
                       vnfr.id,
                       vnfr.vnfd_ref)

        mgmt_network = self.get_linked_mgmt_network(vnfr)

        self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
            self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
            mgmt_network=mgmt_network
            )
        return self._vnfrs[vnfr.id]

    @asyncio.coroutine
    def delete_vnfr(self, xact, vnfr):
        """ Delete a VNFR instance; unknown VNFRs are ignored """
        if vnfr.vnfr_id in self._vnfrs:
            self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
            yield from self._vnfr_handler.delete(xact, vnfr.xpath)
            del self._vnfrs[vnfr.vnfr_id]

    @asyncio.coroutine
    def fetch_vnfd(self, vnfd_id):
        """ Fetch VNFDs based with the vnfd id

        Raises:
            VnfRecordError if the DTS read yields no VNFD.
        """
        vnfd_path = VirtualNetworkFunctionDescriptor.path_for_id(vnfd_id)
        self._log.debug("Fetch vnfd with path %s", vnfd_path)
        vnfd = None

        res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)

        # The result of the last entry wins.
        for ent in res_iter:
            res = yield from ent
            vnfd = res.result

        if vnfd is None:
            err = "Failed to get Vnfd %s" % vnfd_id
            self._log.error(err)
            raise VnfRecordError(err)

        self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)

        return vnfd

    @asyncio.coroutine
    def get_vnfd_ref(self, vnfd_id):
        """ Get Virtual Network Function descriptor for the passed vnfd_id
        and take a reference on it"""
        vnfd = yield from self.get_vnfd(vnfd_id)
        vnfd.ref()
        return vnfd

    @asyncio.coroutine
    def get_vnfd(self, vnfd_id):
        """ Get Virtual Network Function descriptor for the passed vnfd_id

        A descriptor that is not cached yet is fetched from DTS and added to
        the cache.

        Raises:
            VirtualNetworkFunctionDescriptorError if the fetched VNFD is
            missing or carries a different id.
        """
        if vnfd_id not in self._vnfds:
            self._log.error("Cannot find VNFD id:%s", vnfd_id)
            vnfd = yield from self.fetch_vnfd(vnfd_id)

            if vnfd is None:
                self._log.error("Cannot find VNFD id:%s", vnfd_id)
                raise VirtualNetworkFunctionDescriptorError("Cannot find VNFD id:%s" % vnfd_id)

            if vnfd.id != vnfd_id:
                self._log.error("Bad Recovery state {} found for {}".format(vnfd.id, vnfd_id))
                raise VirtualNetworkFunctionDescriptorError("Bad Recovery state {} found for {}".format(vnfd.id, vnfd_id))

            if vnfd.id not in self._vnfds:
                self.create_vnfd(vnfd)

        return self._vnfds[vnfd_id]

    def vnfd_in_use(self, vnfd_id):
        """ Is this VNFD in use """
        self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
        if vnfd_id in self._vnfds:
            return self._vnfds[vnfd_id].in_use()
        return False

    @asyncio.coroutine
    def publish_vnfr(self, xact, path, msg):
        """ Publish a VNFR """
        self._log.debug("publish_vnfr called with path %s, msg %s",
                        path, msg)
        yield from self.vnfr_handler.update(xact, path, msg)

    def create_vnfd(self, vnfd):
        """ Create a virtual network function descriptor

        Raises:
            VirtualNetworkFunctionDescriptorError if the id is already known.
        """
        self._log.debug("Create virtual networkfunction descriptor - %s", vnfd)
        if vnfd.id in self._vnfds:
            self._log.error("Cannot create VNFD %s -VNFD id already exists", vnfd)
            raise VirtualNetworkFunctionDescriptorError("VNFD already exists-%s" % vnfd.id)

        self._vnfds[vnfd.id] = VirtualNetworkFunctionDescriptor(self._dts,
                                                                self._log,
                                                                self._loop,
                                                                self,
                                                                vnfd)
        return self._vnfds[vnfd.id]

    def update_vnfd(self, vnfd):
        """ update the Virtual Network Function descriptor, creating it if unknown """
        self._log.debug("Update virtual network function descriptor - %s", vnfd)

        if vnfd.id not in self._vnfds:
            self._log.debug("No VNFD found - creating VNFD id = %s", vnfd.id)
            self.create_vnfd(vnfd)
        else:
            self._log.debug("Updating VNFD id = %s, vnfd = %s", vnfd.id, vnfd)
            self._vnfds[vnfd.id].update(vnfd)

    @asyncio.coroutine
    def delete_vnfd(self, vnfd_id):
        """ Delete the Virtual Network Function descriptor with the passed id

        Raises:
            VirtualNetworkFunctionDescriptorNotFound if the id is unknown.
            VirtualNetworkFunctionDescriptorRefCountExists if it is in use.
        """
        self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
        if vnfd_id not in self._vnfds:
            self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
            raise VirtualNetworkFunctionDescriptorNotFound("Cannot find %s" % vnfd_id)

        if self._vnfds[vnfd_id].in_use():
            self._log.debug("Cannot delete VNFD id %s reference exists %s",
                            vnfd_id,
                            self._vnfds[vnfd_id].ref_count)
            raise VirtualNetworkFunctionDescriptorRefCountExists(
                "Cannot delete :%s, ref_count:%s" %
                (vnfd_id, self._vnfds[vnfd_id].ref_count))

        # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
        # Best effort: a failure here is logged and the descriptor is still
        # dropped from the table below.
        try:
            rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
            vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
            if os.path.exists(vnfd_dir):
                shutil.rmtree(vnfd_dir, ignore_errors=True)
        except Exception as e:
            self._log.error("Exception in cleaning up VNFD {}: {}".
                            format(self._vnfds[vnfd_id].name, e))
            self._log.exception(e)

        del self._vnfds[vnfd_id]

    def vnfd_refcount_xpath(self, vnfd_id):
        """ xpath for ref count entry """
        return (VnfdRefCountDtsHandler.XPATH +
                "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)

    def _vnfd_refcount_entry(self, vnfd):
        """ Build the (xpath, message) ref-count pair for one descriptor """
        vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
        vnfd_msg.vnfd_id_ref = vnfd.id
        vnfd_msg.instance_ref_count = vnfd.ref_count
        return (self.vnfd_refcount_xpath(vnfd.id), vnfd_msg)

    @asyncio.coroutine
    def get_vnfd_refcount(self, vnfd_id):
        """ Get the vnfd_list from this VNFM

        Arguments:
            vnfd_id - id of one VNFD, or None/"" for all known VNFDs

        Returns:
            A list of (xpath, ref-count message) tuples; empty when the
            given id is not known.
        """
        if vnfd_id is None or vnfd_id == "":
            selected = self._vnfds.values()
        elif vnfd_id in self._vnfds:
            # A fresh message is built here too; the single-id case
            # previously used the message variable before assigning it.
            selected = [self._vnfds[vnfd_id]]
        else:
            selected = []

        return [self._vnfd_refcount_entry(vnfd) for vnfd in selected]
2723
2724
class VnfmTasklet(rift.tasklets.Tasklet):
    """Tasklet wrapper that hosts the VNF manager."""

    def __init__(self, *args, **kwargs):
        """Initialise the base tasklet and set the rwlog category."""
        super().__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("vnfm")

        # Both are created later: _dts in start(), _vnfm in init().
        self._dts = None
        self._vnfm = None

    def start(self):
        """Start the tasklet and create the DTS API object.

        Any exception is printed and re-raised.
        """
        try:
            super().start()
            self.log.info("Starting VnfmTasklet")

            self.log.setLevel(logging.DEBUG)

            self.log.debug("Registering with dts")
            self._dts = rift.tasklets.DTS(
                self.tasklet_info,
                RwVnfmYang.get_schema(),
                self.loop,
                self.on_dts_state_change,
            )

            self.log.debug("Created DTS Api GI Object: %s", self._dts)
        except Exception:
            print("Caught Exception in VNFM start:", sys.exc_info()[0])
            raise

    def on_instance_started(self):
        """Callback invoked once the task instance has started."""
        self.log.debug("Got instance started callback")

    def stop(self):
        """Deinitialise DTS; any exception is printed and re-raised."""
        try:
            self._dts.deinit()
        except Exception:
            print("Caught Exception in VNFM stop:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def init(self):
        """Create the VnfManager and run it (handler for the DTS INIT state).

        Any exception is printed and re-raised.
        """
        try:
            parent_vm_name = self.tasklet_info.get_parent_vm_parent_instance_name()
            assert parent_vm_name is not None
            self._vnfm = VnfManager(self._dts, self.log, self.loop, parent_vm_name)
            yield from self._vnfm.run()
        except Exception:
            print("Caught Exception in VNFM init:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def run(self):
        """Handler for the DTS RUN state; nothing to do."""
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """React to a DTS state change.

        Runs the application handler mapped to `state`, if any, and then
        moves DTS to the follow-up state mapped to `state`, if any.

        Arguments
            state - current dts state
        """
        # DTS state -> state to request next
        next_states = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # DTS state -> application coroutine to run on entry
        state_handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Application transition first ...
        state_handler = state_handlers.get(state)
        if state_handler is not None:
            yield from state_handler()

        # ... then the DTS transition.
        follow_up = next_states.get(state)
        if follow_up is not None:
            self._dts.handle.set_state(follow_up)