Bug 113 add VNFD copy to VNFR
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.mano.dts as mano_dts
53
54
55 class VMResourceError(Exception):
56 """ VM resource Error"""
57 pass
58
59
60 class VnfRecordError(Exception):
61 """ VNF record instatiation failed"""
62 pass
63
64
65 class VduRecordError(Exception):
66 """ VDU record instatiation failed"""
67 pass
68
69
70 class NotImplemented(Exception):
71 """Not implemented """
72 pass
73
74
75 class VnfrRecordExistsError(Exception):
76 """VNFR record already exist with the same VNFR id"""
77 pass
78
79
80 class InternalVirtualLinkRecordError(Exception):
81 """Internal virtual link record error"""
82 pass
83
84
85 class VDUImageNotFound(Exception):
86 """VDU Image not found error"""
87 pass
88
89
90 class VirtualDeploymentUnitRecordError(Exception):
91 """VDU Instantiation failed"""
92 pass
93
94
95 class VMNotReadyError(Exception):
96 """ VM Not yet received from resource manager """
97 pass
98
99
100 class VDURecordNotFound(Exception):
101 """ Could not find a VDU record """
102 pass
103
104
105 class VirtualNetworkFunctionRecordDescNotFound(Exception):
106 """ Cannot find Virtual Network Function Record Descriptor """
107 pass
108
109
110 class VirtualNetworkFunctionDescriptorError(Exception):
111 """ Virtual Network Function Record Descriptor Error """
112 pass
113
114
115 class VirtualNetworkFunctionDescriptorNotFound(Exception):
116 """ Virtual Network Function Record Descriptor Not Found """
117 pass
118
119
120 class VirtualNetworkFunctionRecordNotFound(Exception):
121 """ Virtual Network Function Record Not Found """
122 pass
123
124
125 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
126 """ Virtual Network Funtion Descriptor reference count exists """
127 pass
128
129
130 class VnfrInstantiationFailed(Exception):
131 """ Virtual Network Funtion Instantiation failed"""
132 pass
133
134
135 class VNFMPlacementGroupError(Exception):
136 pass
137
138 class VirtualNetworkFunctionRecordState(enum.Enum):
139 """ VNFR state """
140 INIT = 1
141 VL_INIT_PHASE = 2
142 VM_INIT_PHASE = 3
143 READY = 4
144 TERMINATE = 5
145 VL_TERMINATE_PHASE = 6
146 VDU_TERMINATE_PHASE = 7
147 TERMINATED = 7
148 FAILED = 10
149
150
151 class VDURecordState(enum.Enum):
152 """VDU record state """
153 INIT = 1
154 INSTANTIATING = 2
155 RESOURCE_ALLOC_PENDING = 3
156 READY = 4
157 TERMINATING = 5
158 TERMINATED = 6
159 FAILED = 10
160
161
162 class VcsComponent(object):
163 """ VCS Component within the VNF descriptor """
164 def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name):
165 self._dts = dts
166 self._log = log
167 self._loop = loop
168 self._component = component
169 self._cluster_name = cluster_name
170 self._vcs_handler = vcs_handler
171 self._mangled_name = mangled_name
172
173 @staticmethod
174 def mangle_name(component_name, vnf_name, vnfd_id):
175 """ mangled component name """
176 return vnf_name + ":" + component_name + ":" + vnfd_id
177
178 @property
179 def name(self):
180 """ name of this component"""
181 return self._mangled_name
182
183 @property
184 def path(self):
185 """ The path for this object """
186 return("D,/rw-manifest:manifest" +
187 "/rw-manifest:operational-inventory" +
188 "/rw-manifest:component" +
189 "[rw-manifest:component-name = '{}']").format(self.name)
190
191 @property
192 def instance_xpath(self):
193 """ The path for this object """
194 return("D,/rw-base:vcs" +
195 "/instances" +
196 "/instance" +
197 "[instance-name = '{}']".format(self._cluster_name))
198
199 @property
200 def start_comp_xpath(self):
201 """ start component xpath """
202 return (self.instance_xpath +
203 "/child-n[instance-name = 'START-REQ']")
204
205 def get_start_comp_msg(self, ip_address):
206 """ start this component """
207 start_msg = RwBaseYang.VcsInstance_Instance_ChildN()
208 start_msg.instance_name = 'START-REQ'
209 start_msg.component_name = self.name
210 start_msg.admin_command = "START"
211 start_msg.ip_address = ip_address
212
213 return start_msg
214
215 @property
216 def msg(self):
217 """ Returns the message for this vcs component"""
218
219 vcs_comp_dict = self._component.as_dict()
220
221 def mangle_comp_names(comp_dict):
222 """ mangle component name with VNF name, id"""
223 for key, val in comp_dict.items():
224 if isinstance(val, dict):
225 comp_dict[key] = mangle_comp_names(val)
226 elif isinstance(val, list):
227 i = 0
228 for ent in val:
229 if isinstance(ent, dict):
230 val[i] = mangle_comp_names(ent)
231 else:
232 val[i] = ent
233 i += 1
234 elif key == "component_name":
235 comp_dict[key] = VcsComponent.mangle_name(val,
236 self._vnfd_name,
237 self._vnfd_id)
238 return comp_dict
239
240 mangled_dict = mangle_comp_names(vcs_comp_dict)
241 msg = RwManifestYang.OpInventory_Component.from_dict(mangled_dict)
242 return msg
243
244 @asyncio.coroutine
245 def publish(self, xact):
246 """ Publishes the VCS component """
247 self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
248 self.name, self.path, self.msg)
249 yield from self._vcs_handler.publish(xact, self.path, self.msg)
250
251 @asyncio.coroutine
252 def start(self, xact, parent, ip_addr=None):
253 """ Starts this VCS component """
254 # ATTN RV - replace with block add
255 start_msg = self.get_start_comp_msg(ip_addr)
256 self._log.debug("starting component %s %s",
257 self.start_comp_xpath, start_msg)
258 yield from self._dts.query_create(self.start_comp_xpath,
259 0,
260 start_msg)
261 self._log.debug("started component %s, %s",
262 self.start_comp_xpath, start_msg)
263
264
265 class VirtualDeploymentUnitRecord(object):
266 """ Virtual Deployment Unit Record """
267 def __init__(self,
268 dts,
269 log,
270 loop,
271 vdud,
272 vnfr,
273 mgmt_intf,
274 mgmt_network,
275 cloud_account_name,
276 vnfd_package_store,
277 vdur_id=None,
278 placement_groups=[]):
279 self._dts = dts
280 self._log = log
281 self._loop = loop
282 self._vdud = vdud
283 self._vnfr = vnfr
284 self._mgmt_intf = mgmt_intf
285 self._cloud_account_name = cloud_account_name
286 self._vnfd_package_store = vnfd_package_store
287 self._mgmt_network = mgmt_network
288
289 self._vdur_id = vdur_id or str(uuid.uuid4())
290 self._int_intf = []
291 self._ext_intf = []
292 self._state = VDURecordState.INIT
293 self._state_failed_reason = None
294 self._request_id = str(uuid.uuid4())
295 self._name = vnfr.name + "__" + vdud.id
296 self._placement_groups = placement_groups
297 self._rm_regh = None
298 self._vm_resp = None
299 self._vdud_cloud_init = None
300 self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id,self.vdu_id)
301
302 @asyncio.coroutine
303 def vdu_opdata_register(self):
304 yield from self._vdur_console_handler.register()
305
306 def cp_ip_addr(self, cp_name):
307 """ Find ip address by connection point name """
308 if self._vm_resp is not None:
309 for conn_point in self._vm_resp.connection_points:
310 if conn_point.name == cp_name:
311 return conn_point.ip_address
312 return "0.0.0.0"
313
314 def cp_id(self, cp_name):
315 """ Find connection point id by connection point name """
316 if self._vm_resp is not None:
317 for conn_point in self._vm_resp.connection_points:
318 if conn_point.name == cp_name:
319 return conn_point.connection_point_id
320 return ''
321
322 @property
323 def vdu_id(self):
324 return self._vdud.id
325
326 @property
327 def vm_resp(self):
328 return self._vm_resp
329
330 @property
331 def name(self):
332 """ Return this VDUR's name """
333 return self._name
334
335 @property
336 def cloud_account_name(self):
337 """ Cloud account this VDU should be created in """
338 return self._cloud_account_name
339
340 @property
341 def image_name(self):
342 """ name that should be used to lookup the image on the CMP """
343 if 'image' not in self._vdud:
344 return None
345 return os.path.basename(self._vdud.image)
346
347 @property
348 def image_checksum(self):
349 """ name that should be used to lookup the image on the CMP """
350 return self._vdud.image_checksum if self._vdud.has_field("image_checksum") else None
351
352 @property
353 def management_ip(self):
354 if not self.active:
355 return None
356 return self._vm_resp.public_ip if self._vm_resp.has_field('public_ip') else self._vm_resp.management_ip
357
358 @property
359 def vm_management_ip(self):
360 if not self.active:
361 return None
362 return self._vm_resp.management_ip
363
364 @property
365 def operational_status(self):
366 """ Operational status of this VDU"""
367 op_stats_dict = {"INIT": "init",
368 "INSTANTIATING": "vm_init_phase",
369 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
370 "READY": "running",
371 "FAILED": "failed",
372 "TERMINATING": "terminated",
373 "TERMINATED": "terminated",
374 }
375 return op_stats_dict[self._state.name]
376
377 @property
378 def msg(self):
379 """ VDU message """
380 vdu_fields = ["vm_flavor",
381 "guest_epa",
382 "vswitch_epa",
383 "hypervisor_epa",
384 "host_epa",
385 "volumes",
386 "name"]
387 vdu_copy_dict = {k: v for k, v in
388 self._vdud.as_dict().items() if k in vdu_fields}
389 vdur_dict = {"id": self._vdur_id,
390 "vdu_id_ref": self._vdud.id,
391 "operational_status": self.operational_status,
392 "operational_status_details": self._state_failed_reason,
393 }
394 if self.vm_resp is not None:
395 vdur_dict.update({"vim_id": self.vm_resp.vdu_id,
396 "flavor_id": self.vm_resp.flavor_id
397 })
398 if self._vm_resp.has_field('image_id'):
399 vdur_dict.update({ "image_id": self.vm_resp.image_id })
400
401 if self.management_ip is not None:
402 vdur_dict["management_ip"] = self.management_ip
403
404 if self.vm_management_ip is not None:
405 vdur_dict["vm_management_ip"] = self.vm_management_ip
406
407 vdur_dict.update(vdu_copy_dict)
408
409 if self.vm_resp is not None:
410 if self._vm_resp.has_field('volumes'):
411 for opvolume in self._vm_resp.volumes:
412 vdurvol_data = [vduvol for vduvol in vdur_dict['volumes'] if vduvol['name'] == opvolume.name]
413 if len(vdurvol_data) == 1:
414 vdurvol_data[0]["volume_id"] = opvolume.volume_id
415
416 icp_list = []
417 ii_list = []
418
419 for intf, cp_id, vlr in self._int_intf:
420 cp = self.find_internal_cp_by_cp_id(cp_id)
421
422 icp_list.append({"name": cp.name,
423 "id": cp.id,
424 "type_yang": "VPORT",
425 "ip_address": self.cp_ip_addr(cp.id)})
426
427 ii_list.append({"name": intf.name,
428 "vdur_internal_connection_point_ref": cp.id,
429 "virtual_interface": {}})
430
431 vdur_dict["internal_connection_point"] = icp_list
432 self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
433 vdur_dict["internal_interface"] = ii_list
434
435 ei_list = []
436 for intf, cp, vlr in self._ext_intf:
437 ei_list.append({"name": cp,
438 "vnfd_connection_point_ref": cp,
439 "virtual_interface": {}})
440 self._vnfr.update_cp(cp, self.cp_ip_addr(cp), self.cp_id(cp))
441
442 vdur_dict["external_interface"] = ei_list
443
444 placement_groups = []
445 for group in self._placement_groups:
446 placement_groups.append(group.as_dict())
447 vdur_dict['placement_groups_info'] = placement_groups
448
449 return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
450
451 @property
452 def resmgr_path(self):
453 """ path for resource-mgr"""
454 return ("D,/rw-resource-mgr:resource-mgmt" +
455 "/vdu-event" +
456 "/vdu-event-data[event-id='{}']".format(self._request_id))
457
458 @property
459 def vm_flavor_msg(self):
460 """ VM flavor message """
461 flavor = self._vdud.vm_flavor.__class__()
462 flavor.copy_from(self._vdud.vm_flavor)
463
464 return flavor
465
466 @property
467 def vdud_cloud_init(self):
468 """ Return the cloud-init contents for the VDU """
469 if self._vdud_cloud_init is None:
470 self._vdud_cloud_init = self.cloud_init()
471
472 return self._vdud_cloud_init
473
474 def cloud_init(self):
475 """ Populate cloud_init with cloud-config script from
476 either the inline contents or from the file provided
477 """
478 if self._vdud.cloud_init is not None:
479 self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
480 return self._vdud.cloud_init
481 elif self._vdud.cloud_init_file is not None:
482 # Get cloud-init script contents from the file provided in the cloud_init_file param
483 self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
484 filename = self._vdud.cloud_init_file
485 self._vnfd_package_store.refresh()
486 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
487 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
488 try:
489 return cloud_init_extractor.read_script(stored_package, filename)
490 except rift.package.cloud_init.CloudInitExtractionError as e:
491 raise VirtualDeploymentUnitRecordError(e)
492 else:
493 self._log.debug("VDU Instantiation: cloud-init script not provided")
494
495 def process_openstack_placement_group_construct(self, vm_create_msg_dict):
496 host_aggregates = []
497 availability_zones = []
498 server_groups = []
499 for group in self._placement_groups:
500 if group.has_field('host_aggregate'):
501 for aggregate in group.host_aggregate:
502 host_aggregates.append(aggregate.as_dict())
503 if group.has_field('availability_zone'):
504 availability_zones.append(group.availability_zone.as_dict())
505 if group.has_field('server_group'):
506 server_groups.append(group.server_group.as_dict())
507
508 if availability_zones:
509 if len(availability_zones) > 1:
510 self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
511 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self.name, availability_zones))
512 else:
513 vm_create_msg_dict['availability_zone'] = availability_zones[0]
514
515 if server_groups:
516 if len(server_groups) > 1:
517 self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
518 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self.name, server_groups))
519 else:
520 vm_create_msg_dict['server_group'] = server_groups[0]
521
522 if host_aggregates:
523 vm_create_msg_dict['host_aggregate'] = host_aggregates
524
525 return
526
527 def process_placement_groups(self, vm_create_msg_dict):
528 """Process the placement_groups and fill resource-mgr request"""
529 if not self._placement_groups:
530 return
531
532 cloud_set = set([group.cloud_type for group in self._placement_groups])
533 assert len(cloud_set) == 1
534 cloud_type = cloud_set.pop()
535
536 if cloud_type == 'openstack':
537 self.process_openstack_placement_group_construct(vm_create_msg_dict)
538
539 else:
540 self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
541 return
542
543 def resmgr_msg(self, config=None):
544 vdu_fields = ["vm_flavor",
545 "guest_epa",
546 "vswitch_epa",
547 "hypervisor_epa",
548 "host_epa"]
549
550 self._log.debug("Creating params based on VDUD: %s", self._vdud)
551 vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}
552
553 vm_create_msg_dict = {
554 "name": self.name,
555 }
556
557 if self.image_name is not None:
558 vm_create_msg_dict["image_name"] = self.image_name
559
560 if self.image_checksum is not None:
561 vm_create_msg_dict["image_checksum"] = self.image_checksum
562
563 vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
564 if self._vdud.has_field('mgmt_vpci'):
565 vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci
566
567 self._log.debug("VDUD: %s", self._vdud)
568 if config is not None:
569 vm_create_msg_dict['vdu_init'] = {'userdata': config}
570
571 if self._mgmt_network:
572 vm_create_msg_dict['mgmt_network'] = self._mgmt_network
573
574 cp_list = []
575 for intf, cp, vlr in self._ext_intf:
576 cp_info = {"name": cp,
577 "virtual_link_id": vlr.network_id,
578 "type_yang": intf.virtual_interface.type_yang}
579
580 if (intf.virtual_interface.has_field('vpci') and
581 intf.virtual_interface.vpci is not None):
582 cp_info["vpci"] = intf.virtual_interface.vpci
583
584 if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
585 cp_info['security_group'] = vlr.ip_profile_params.security_group
586
587 cp_list.append(cp_info)
588
589 for intf, cp, vlr in self._int_intf:
590 if (intf.virtual_interface.has_field('vpci') and
591 intf.virtual_interface.vpci is not None):
592 cp_list.append({"name": cp,
593 "virtual_link_id": vlr.network_id,
594 "type_yang": intf.virtual_interface.type_yang,
595 "vpci": intf.virtual_interface.vpci})
596 else:
597 cp_list.append({"name": cp,
598 "virtual_link_id": vlr.network_id,
599 "type_yang": intf.virtual_interface.type_yang})
600
601 vm_create_msg_dict["connection_points"] = cp_list
602 vm_create_msg_dict.update(vdu_copy_dict)
603
604 self.process_placement_groups(vm_create_msg_dict)
605
606 msg = RwResourceMgrYang.VDUEventData()
607 msg.event_id = self._request_id
608 msg.cloud_account = self.cloud_account_name
609 msg.request_info.from_dict(vm_create_msg_dict)
610
611 for volume in self._vdud.volumes:
612 v = msg.request_info.volumes.add()
613 v.from_dict(volume.as_dict())
614 return msg
615
616 @asyncio.coroutine
617 def terminate(self, xact):
618 """ Delete resource in VIM """
619 if self._state != VDURecordState.READY and self._state != VDURecordState.FAILED:
620 self._log.warning("VDU terminate in not ready state - Ignoring request")
621 return
622
623 self._state = VDURecordState.TERMINATING
624 if self._vm_resp is not None:
625 try:
626 with self._dts.transaction() as new_xact:
627 yield from self.delete_resource(new_xact)
628 except Exception:
629 self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)
630
631 if self._rm_regh is not None:
632 self._log.debug("Deregistering resource manager registration handle")
633 self._rm_regh.deregister()
634 self._rm_regh = None
635
636 if self._vdur_console_handler is not None:
637 self._log.error("Deregistering vnfr vdur registration handle")
638 self._vdur_console_handler._regh.deregister()
639 self._vdur_console_handler._regh = None
640
641 self._state = VDURecordState.TERMINATED
642
643 def find_internal_cp_by_cp_id(self, cp_id):
644 """ Find the CP corresponding to the connection point id"""
645 cp = None
646
647 self._log.debug("find_internal_cp_by_cp_id(%s) called",
648 cp_id)
649
650 for int_cp in self._vdud.internal_connection_point:
651 self._log.debug("Checking for int cp %s in internal connection points",
652 int_cp.id)
653 if int_cp.id == cp_id:
654 cp = int_cp
655 break
656
657 if cp is None:
658 self._log.debug("Failed to find cp %s in internal connection points",
659 cp_id)
660 msg = "Failed to find cp %s in internal connection points" % cp_id
661 raise VduRecordError(msg)
662
663 # return the VLR associated with the connection point
664 return cp
665
666 @asyncio.coroutine
667 def create_resource(self, xact, vnfr, config=None):
668 """ Request resource from ResourceMgr """
669 def find_cp_by_name(cp_name):
670 """ Find a connection point by name """
671 cp = None
672 self._log.debug("find_cp_by_name(%s) called", cp_name)
673 for ext_cp in vnfr._cprs:
674 self._log.debug("Checking ext cp (%s) called", ext_cp.name)
675 if ext_cp.name == cp_name:
676 cp = ext_cp
677 break
678 if cp is None:
679 self._log.debug("Failed to find cp %s in external connection points",
680 cp_name)
681 return cp
682
683 def find_internal_vlr_by_cp_name(cp_name):
684 """ Find the VLR corresponding to the connection point name"""
685 cp = None
686
687 self._log.debug("find_internal_vlr_by_cp_name(%s) called",
688 cp_name)
689
690 for int_cp in self._vdud.internal_connection_point:
691 self._log.debug("Checking for int cp %s in internal connection points",
692 int_cp.id)
693 if int_cp.id == cp_name:
694 cp = int_cp
695 break
696
697 if cp is None:
698 self._log.debug("Failed to find cp %s in internal connection points",
699 cp_name)
700 msg = "Failed to find cp %s in internal connection points" % cp_name
701 raise VduRecordError(msg)
702
703 # return the VLR associated with the connection point
704 return vnfr.find_vlr_by_cp(cp_name)
705
706 block = xact.block_create()
707
708 self._log.debug("Executing vm request id: %s, action: create",
709 self._request_id)
710
711 # Resolve the networks associated external interfaces
712 for ext_intf in self._vdud.external_interface:
713 self._log.debug("Resolving external interface name [%s], cp[%s]",
714 ext_intf.name, ext_intf.vnfd_connection_point_ref)
715 cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
716 if cp is None:
717 self._log.debug("Failed to find connection point - %s",
718 ext_intf.vnfd_connection_point_ref)
719 continue
720 self._log.debug("Connection point name [%s], type[%s]",
721 cp.name, cp.type_yang)
722
723 vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)
724
725 etuple = (ext_intf, cp.name, vlr)
726 self._ext_intf.append(etuple)
727
728 self._log.debug("Created external interface tuple : %s", etuple)
729
730 # Resolve the networks associated internal interfaces
731 for intf in self._vdud.internal_interface:
732 cp_id = intf.vdu_internal_connection_point_ref
733 self._log.debug("Resolving internal interface name [%s], cp[%s]",
734 intf.name, cp_id)
735
736 try:
737 vlr = find_internal_vlr_by_cp_name(cp_id)
738 except Exception as e:
739 self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
740 msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
741 raise VduRecordError(msg)
742
743 ituple = (intf, cp_id, vlr)
744 self._int_intf.append(ituple)
745
746 self._log.debug("Created internal interface tuple : %s", ituple)
747
748 resmgr_path = self.resmgr_path
749 resmgr_msg = self.resmgr_msg(config)
750
751 self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
752 block.add_query_create(resmgr_path, resmgr_msg)
753
754 res_iter = yield from block.execute(now=True)
755
756 resp = None
757
758 for i in res_iter:
759 r = yield from i
760 resp = r.result
761
762 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
763 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
764 self._log.debug("Got vm request response: %s", resp.resource_info)
765 return resp.resource_info
766
767 @asyncio.coroutine
768 def delete_resource(self, xact):
769 block = xact.block_create()
770
771 self._log.debug("Executing vm request id: %s, action: delete",
772 self._request_id)
773
774 block.add_query_delete(self.resmgr_path)
775
776 yield from block.execute(flags=0, now=True)
777
778 @asyncio.coroutine
779 def read_resource(self, xact):
780 block = xact.block_create()
781
782 self._log.debug("Executing vm request id: %s, action: delete",
783 self._request_id)
784
785 block.add_query_read(self.resmgr_path)
786
787 res_iter = yield from block.execute(flags=0, now=True)
788 for i in res_iter:
789 r = yield from i
790 resp = r.result
791
792 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
793 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
794 self._log.debug("Got vm request response: %s", resp.resource_info)
795 #self._vm_resp = resp.resource_info
796 return resp.resource_info
797
798
799 @asyncio.coroutine
800 def start_component(self):
801 """ This VDUR is active """
802 self._log.debug("Starting component %s for vdud %s vdur %s",
803 self._vdud.vcs_component_ref,
804 self._vdud,
805 self._vdur_id)
806 yield from self._vnfr.start_component(self._vdud.vcs_component_ref,
807 self.vm_resp.management_ip)
808
809 @property
810 def active(self):
811 """ Is this VDU active """
812 return True if self._state is VDURecordState.READY else False
813
814 @asyncio.coroutine
815 def instantiation_failed(self, failed_reason=None):
816 """ VDU instantiation failed """
817 self._log.debug("VDU %s instantiation failed ", self._vdur_id)
818 self._state = VDURecordState.FAILED
819 self._state_failed_reason = failed_reason
820 yield from self._vnfr.instantiation_failed(failed_reason)
821
822 @asyncio.coroutine
823 def vdu_is_active(self):
824 """ This VDU is active"""
825 if self.active:
826 self._log.warning("VDU %s was already marked as active", self._vdur_id)
827 return
828
829 self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)
830
831 if self._vdud.vcs_component_ref is not None:
832 yield from self.start_component()
833
834 self._state = VDURecordState.READY
835
836 if self._vnfr.all_vdus_active():
837 self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
838 yield from self._vnfr.is_ready()
839
840 @asyncio.coroutine
841 def instantiate(self, xact, vnfr, config=None):
842 """ Instantiate this VDU """
843 self._state = VDURecordState.INSTANTIATING
844
845 @asyncio.coroutine
846 def on_prepare(xact_info, query_action, ks_path, msg):
847 """ This VDUR is active """
848 self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
849 query_action,
850 ks_path,
851 msg)
852
853 if (query_action == rwdts.QueryAction.UPDATE or
854 query_action == rwdts.QueryAction.CREATE):
855 self._vm_resp = msg
856
857 if msg.resource_state == "active":
858 # Move this VDU to ready state
859 yield from self.vdu_is_active()
860 elif msg.resource_state == "failed":
861 yield from self.instantiation_failed(msg.resource_errors)
862 elif query_action == rwdts.QueryAction.DELETE:
863 self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
864 else:
865 raise NotImplementedError(
866 "%s action on VirtualDeployementUnitRecord not supported",
867 query_action)
868
869 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
870
871 try:
872 reg_event = asyncio.Event(loop=self._loop)
873
874 @asyncio.coroutine
875 def on_ready(regh, status):
876 reg_event.set()
877
878 handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
879 self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
880 flags=rwdts.Flag.SUBSCRIBER,
881 handler=handler)
882 yield from reg_event.wait()
883
884 vm_resp = yield from self.create_resource(xact, vnfr, config)
885 self._vm_resp = vm_resp
886
887 self._state = VDURecordState.RESOURCE_ALLOC_PENDING
888 self._log.debug("Requested VM from resource manager response %s",
889 vm_resp)
890 if vm_resp.resource_state == "active":
891 self._log.debug("Resourcemgr responded wih an active vm resp %s",
892 vm_resp)
893 yield from self.vdu_is_active()
894 self._state = VDURecordState.READY
895 elif (vm_resp.resource_state == "pending" or
896 vm_resp.resource_state == "inactive"):
897 self._log.debug("Resourcemgr responded wih a pending vm resp %s",
898 vm_resp)
899 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
900 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
901 # flags=rwdts.Flag.SUBSCRIBER,
902 # handler=handler)
903 else:
904 self._log.debug("Resourcemgr responded wih an error vm resp %s",
905 vm_resp)
906 raise VirtualDeploymentUnitRecordError(
907 "Failed VDUR instantiation %s " % vm_resp)
908
909 except Exception as e:
910 import traceback
911 traceback.print_exc()
912 self._log.exception(e)
913 self._log.error("Instantiation of VDU record failed: %s", str(e))
914 self._state = VDURecordState.FAILED
915 yield from self.instantiation_failed(str(e))
916
917
918 class VlRecordState(enum.Enum):
919 """ VL Record State """
920 INIT = 101
921 INSTANTIATION_PENDING = 102
922 ACTIVE = 103
923 TERMINATE_PENDING = 104
924 TERMINATED = 105
925 FAILED = 106
926
927
928 class InternalVirtualLinkRecord(object):
929 """ Internal Virtual Link record """
930 def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name):
931 self._dts = dts
932 self._log = log
933 self._loop = loop
934 self._ivld_msg = ivld_msg
935 self._vnfr_name = vnfr_name
936 self._cloud_account_name = cloud_account_name
937
938 self._vlr_req = self.create_vlr()
939 self._vlr = None
940 self._state = VlRecordState.INIT
941
942 @property
943 def vlr_id(self):
944 """ Find VLR by id """
945 return self._vlr_req.id
946
947 @property
948 def name(self):
949 """ Name of this VL """
950 return self._vnfr_name + "." + self._ivld_msg.name
951
952 @property
953 def network_id(self):
954 """ Find VLR by id """
955 return self._vlr.network_id if self._vlr else None
956
957 def vlr_path(self):
958 """ VLR path for this VLR instance"""
959 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self.vlr_id)
960
961 def create_vlr(self):
962 """ Create the VLR record which will be instantiated """
963
964 vld_fields = ["short_name",
965 "vendor",
966 "description",
967 "version",
968 "type_yang",
969 "provider_network"]
970
971 vld_copy_dict = {k: v for k, v in self._ivld_msg.as_dict().items() if k in vld_fields}
972
973 vlr_dict = {"id": str(uuid.uuid4()),
974 "name": self.name,
975 "cloud_account": self._cloud_account_name,
976 }
977 vlr_dict.update(vld_copy_dict)
978
979 vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
980 return vlr
981
982 @asyncio.coroutine
983 def instantiate(self, xact, restart_mode=False):
984 """ Instantiate VL """
985
986 @asyncio.coroutine
987 def instantiate_vlr():
988 """ Instantiate VLR"""
989 self._log.debug("Create VL with xpath %s and vlr %s",
990 self.vlr_path(), self._vlr_req)
991
992 with self._dts.transaction(flags=0) as xact:
993 block = xact.block_create()
994 block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
995 self._log.debug("Executing VL create path:%s msg:%s",
996 self.vlr_path(), self._vlr_req)
997
998 res_iter = None
999 try:
1000 res_iter = yield from block.execute()
1001 except Exception:
1002 self._state = VlRecordState.FAILED
1003 self._log.exception("Caught exception while instantial VL")
1004 raise
1005
1006 for ent in res_iter:
1007 res = yield from ent
1008 self._vlr = res.result
1009
1010 if self._vlr.operational_status == 'failed':
1011 self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
1012 self._state = VlRecordState.FAILED
1013 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))
1014
1015 self._log.info("Created VL with xpath %s and vlr %s",
1016 self.vlr_path(), self._vlr)
1017
1018 @asyncio.coroutine
1019 def get_vlr():
1020 """ Get the network id """
1021 res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
1022 vlr = None
1023 for ent in res_iter:
1024 res = yield from ent
1025 vlr = res.result
1026
1027 if vlr is None:
1028 err = "Failed to get VLR for path %s" % self.vlr_path()
1029 self._log.warn(err)
1030 raise InternalVirtualLinkRecordError(err)
1031 return vlr
1032
1033 self._state = VlRecordState.INSTANTIATION_PENDING
1034
1035 if restart_mode:
1036 vl = yield from get_vlr()
1037 if vl is None:
1038 yield from instantiate_vlr()
1039 else:
1040 yield from instantiate_vlr()
1041
1042 self._state = VlRecordState.ACTIVE
1043
1044 def vlr_in_vns(self):
1045 """ Is there a VLR record in VNS """
1046 if (self._state == VlRecordState.ACTIVE or
1047 self._state == VlRecordState.INSTANTIATION_PENDING or
1048 self._state == VlRecordState.FAILED):
1049 return True
1050
1051 return False
1052
1053 @asyncio.coroutine
1054 def terminate(self, xact):
1055 """Terminate this VL """
1056 if not self.vlr_in_vns():
1057 self._log.debug("Ignoring terminate request for id %s in state %s",
1058 self.vlr_id, self._state)
1059 return
1060
1061 self._log.debug("Terminating VL with path %s", self.vlr_path())
1062 self._state = VlRecordState.TERMINATE_PENDING
1063 block = xact.block_create()
1064 block.add_query_delete(self.vlr_path())
1065 yield from block.execute(flags=0, now=True)
1066 self._state = VlRecordState.TERMINATED
1067 self._log.debug("Terminated VL with path %s", self.vlr_path())
1068
1069
1070 class VirtualNetworkFunctionRecord(object):
1071 """ Virtual Network Function Record """
1072 def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
1073 self._dts = dts
1074 self._log = log
1075 self._loop = loop
1076 self._cluster_name = cluster_name
1077 self._vnfr_msg = vnfr_msg
1078 self._vnfr_id = vnfr_msg.id
1079 self._vnfd_id = vnfr_msg.vnfd.id
1080 self._vnfm = vnfm
1081 self._vcs_handler = vcs_handler
1082 self._vnfr = vnfr_msg
1083 self._mgmt_network = mgmt_network
1084
1085 self._vnfd = vnfr_msg.vnfd
1086 self._state = VirtualNetworkFunctionRecordState.INIT
1087 self._state_failed_reason = None
1088 self._ext_vlrs = {} # The list of external virtual links
1089 self._vlrs = [] # The list of internal virtual links
1090 self._vdus = [] # The list of vdu
1091 self._vlr_by_cp = {}
1092 self._cprs = []
1093 self._inventory = {}
1094 self._create_time = int(time.time())
1095 self._vnf_mon = None
1096 self._config_status = vnfr_msg.config_status
1097 self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
1098 self._rw_vnfd = None
1099 self._vnfd_ref_count = 0
1100
1101 def _get_vdur_from_vdu_id(self, vdu_id):
1102 self._log.debug("Finding vdur for vdu_id %s", vdu_id)
1103 self._log.debug("Searching through vdus: %s", self._vdus)
1104 for vdu in self._vdus:
1105 self._log.debug("vdu_id: %s", vdu.vdu_id)
1106 if vdu.vdu_id == vdu_id:
1107 return vdu
1108
1109 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id)
1110
1111 @property
1112 def operational_status(self):
1113 """ Operational status of this VNFR """
1114 op_status_map = {"INIT": "init",
1115 "VL_INIT_PHASE": "vl_init_phase",
1116 "VM_INIT_PHASE": "vm_init_phase",
1117 "READY": "running",
1118 "TERMINATE": "terminate",
1119 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1120 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1121 "TERMINATED": "terminated",
1122 "FAILED": "failed", }
1123 return op_status_map[self._state.name]
1124
1125 @staticmethod
1126 def vnfd_xpath(vnfd_id):
1127 """ VNFD xpath associated with this VNFR """
1128 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id)
1129
1130 @property
1131 def vnfd_ref_count(self):
1132 """ Returns the VNFD reference count associated with this VNFR """
1133 return self._vnfd_ref_count
1134
1135 def vnfd_in_use(self):
1136 """ Returns whether vnfd is in use or not """
1137 return True if self._vnfd_ref_count > 0 else False
1138
1139 def vnfd_ref(self):
1140 """ Take a reference on this object """
1141 self._vnfd_ref_count += 1
1142 return self._vnfd_ref_count
1143
1144 def vnfd_unref(self):
1145 """ Release reference on this object """
1146 if self._vnfd_ref_count < 1:
1147 msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
1148 (self.vnfd.id, self._vnfd_ref_count))
1149 self._log.critical(msg)
1150 raise VnfRecordError(msg)
1151 self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
1152 self.vnfd.id, self._vnfd_ref_count)
1153 self._vnfd_ref_count -= 1
1154 return self._vnfd_ref_count
1155
1156 @property
1157 def vnfd(self):
1158 """ VNFD for this VNFR """
1159 return self._vnfd
1160
1161 @property
1162 def vnf_name(self):
1163 """ VNFD name associated with this VNFR """
1164 return self.vnfd.name
1165
1166 @property
1167 def name(self):
1168 """ Name of this VNF in the record """
1169 return self._vnfr.name
1170
1171 @property
1172 def cloud_account_name(self):
1173 """ Name of the cloud account this VNFR is instantiated in """
1174 return self._vnfr.cloud_account
1175
1176 @property
1177 def vnfd_id(self):
1178 """ VNFD Id associated with this VNFR """
1179 return self.vnfd.id
1180
1181 @property
1182 def vnfr_id(self):
1183 """ VNFR Id associated with this VNFR """
1184 return self._vnfr_id
1185
1186 @property
1187 def member_vnf_index(self):
1188 """ Member VNF index associated with this VNFR """
1189 return self._vnfr.member_vnf_index_ref
1190
1191 @property
1192 def config_status(self):
1193 """ Config agent status for this VNFR """
1194 return self._config_status
1195
1196 def component_by_name(self, component_name):
1197 """ Find a component by name in the inventory list"""
1198 mangled_name = VcsComponent.mangle_name(component_name,
1199 self.vnf_name,
1200 self.vnfd_id)
1201 return self._inventory[mangled_name]
1202
1203
1204
1205 @asyncio.coroutine
1206 def get_nsr_config(self):
1207 ### Need access to NS instance configuration for runtime resolution.
1208 ### This shall be replaced when deployment flavors are implemented
1209 xpath = "C,/nsr:ns-instance-config"
1210 results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
1211
1212 for result in results:
1213 entry = yield from result
1214 ns_instance_config = entry.result
1215 for nsr in ns_instance_config.nsr:
1216 if nsr.id == self._vnfr_msg.nsr_id_ref:
1217 return nsr
1218 return None
1219
1220 @asyncio.coroutine
1221 def start_component(self, component_name, ip_addr):
1222 """ Start a component in the VNFR by name """
1223 comp = self.component_by_name(component_name)
1224 yield from comp.start(None, None, ip_addr)
1225
1226 def cp_ip_addr(self, cp_name):
1227 """ Get ip address for connection point """
1228 self._log.debug("cp_ip_addr()")
1229 for cp in self._cprs:
1230 if cp.name == cp_name and cp.ip_address is not None:
1231 return cp.ip_address
1232 return "0.0.0.0"
1233
1234 def mgmt_intf_info(self):
1235 """ Get Management interface info for this VNFR """
1236 mgmt_intf_desc = self.vnfd.mgmt_interface
1237 ip_addr = None
1238 if mgmt_intf_desc.has_field("cp"):
1239 ip_addr = self.cp_ip_addr(mgmt_intf_desc.cp)
1240 elif mgmt_intf_desc.has_field("vdu_id"):
1241 try:
1242 vdur = self._get_vdur_from_vdu_id(mgmt_intf_desc.vdu_id)
1243 ip_addr = vdur.management_ip
1244 except VDURecordNotFound:
1245 self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
1246 ip_addr = None
1247 else:
1248 ip_addr = mgmt_intf_desc.ip_address
1249 port = mgmt_intf_desc.port
1250
1251 return ip_addr, port
1252
1253 @property
1254 def msg(self):
1255 """ Message associated with this VNFR """
1256 vnfd_fields = ["short_name", "vendor", "description", "version"]
1257 vnfd_copy_dict = {k: v for k, v in self.vnfd.as_dict().items() if k in vnfd_fields}
1258
1259 mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1260 ip_address, port = self.mgmt_intf_info()
1261
1262 if ip_address is not None:
1263 mgmt_intf.ip_address = ip_address
1264 if port is not None:
1265 mgmt_intf.port = port
1266
1267 vnfr_dict = {"id": self._vnfr_id,
1268 "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
1269 "name": self.name,
1270 "member_vnf_index_ref": self.member_vnf_index,
1271 "operational_status": self.operational_status,
1272 "operational_status_details": self._state_failed_reason,
1273 "cloud_account": self.cloud_account_name,
1274 "config_status": self._config_status
1275 }
1276
1277 vnfr_dict.update(vnfd_copy_dict)
1278
1279 vnfr_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
1280 vnfr_msg.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())
1281
1282 vnfr_msg.create_time = self._create_time
1283 vnfr_msg.uptime = int(time.time()) - self._create_time
1284 vnfr_msg.mgmt_interface = mgmt_intf
1285
1286 # Add all the VLRs to VNFR
1287 for vlr in self._vlrs:
1288 ivlr = vnfr_msg.internal_vlr.add()
1289 ivlr.vlr_ref = vlr.vlr_id
1290
1291 # Add all the VDURs to VDUR
1292 if self._vdus is not None:
1293 for vdu in self._vdus:
1294 vdur = vnfr_msg.vdur.add()
1295 vdur.from_dict(vdu.msg.as_dict())
1296
1297 if self.vnfd.mgmt_interface.has_field('dashboard_params'):
1298 vnfr_msg.dashboard_url = self.dashboard_url
1299
1300 for cpr in self._cprs:
1301 new_cp = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
1302 vnfr_msg.connection_point.append(new_cp)
1303
1304 if self._vnf_mon is not None:
1305 for monp in self._vnf_mon.msg:
1306 vnfr_msg.monitoring_param.append(
1307 VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))
1308
1309 if self._vnfr.vnf_configuration is not None:
1310 vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
1311 if (ip_address is not None and
1312 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None):
1313 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address
1314
1315 for group in self._vnfr_msg.placement_groups_info:
1316 group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1317 group_info.from_dict(group.as_dict())
1318 vnfr_msg.placement_groups_info.append(group_info)
1319
1320 return vnfr_msg
1321
1322 @property
1323 def dashboard_url(self):
1324 ip, cfg_port = self.mgmt_intf_info()
1325 protocol = 'http'
1326 http_port = 80
1327 if self.vnfd.mgmt_interface.dashboard_params.has_field('https'):
1328 if self.vnfd.mgmt_interface.dashboard_params.https is True:
1329 protocol = 'https'
1330 http_port = 443
1331 if self.vnfd.mgmt_interface.dashboard_params.has_field('port'):
1332 http_port = self.vnfd.mgmt_interface.dashboard_params.port
1333
1334 url = "{protocol}://{ip_address}:{port}/{path}".format(
1335 protocol=protocol,
1336 ip_address=ip,
1337 port=http_port,
1338 path=self.vnfd.mgmt_interface.dashboard_params.path.lstrip("/"),
1339 )
1340
1341 return url
1342
1343 @property
1344 def xpath(self):
1345 """ path for this VNFR """
1346 return("D,/vnfr:vnfr-catalog"
1347 "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id))
1348
1349 @asyncio.coroutine
1350 def publish(self, xact):
1351 """ publish this VNFR """
1352 vnfr = self.msg
1353 self._log.debug("Publishing VNFR path = [%s], record = [%s]",
1354 self.xpath, self.msg)
1355 vnfr.create_time = self._create_time
1356 yield from self._vnfm.publish_vnfr(xact, self.xpath, self.msg)
1357 self._log.debug("Published VNFR path = [%s], record = [%s]",
1358 self.xpath, self.msg)
1359
1360 @asyncio.coroutine
1361 def create_vls(self):
1362 """ Publish The VLs associated with this VNF """
1363 self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
1364 self.vnfd_id)
1365 for ivld_msg in self.vnfd.internal_vld:
1366 self._log.debug("Creating internal vld:"
1367 " %s, int_cp_ref = %s",
1368 ivld_msg, ivld_msg.internal_connection_point
1369 )
1370 vlr = InternalVirtualLinkRecord(dts=self._dts,
1371 log=self._log,
1372 loop=self._loop,
1373 ivld_msg=ivld_msg,
1374 vnfr_name=self.name,
1375 cloud_account_name=self.cloud_account_name
1376 )
1377 self._vlrs.append(vlr)
1378
1379 for int_cp in ivld_msg.internal_connection_point:
1380 if int_cp.id_ref in self._vlr_by_cp:
1381 msg = ("Connection point %s already "
1382 " bound %s" % (int_cp.id_ref, self._vlr_by_cp[int_cp.id_ref]))
1383 raise InternalVirtualLinkRecordError(msg)
1384 self._log.debug("Setting vlr %s to internal cp = %s",
1385 vlr, int_cp.id_ref)
1386 self._vlr_by_cp[int_cp.id_ref] = vlr
1387
1388 @asyncio.coroutine
1389 def instantiate_vls(self, xact, restart_mode=False):
1390 """ Instantiate the VLs associated with this VNF """
1391 self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1392 self.vnfd_id)
1393
1394 for vlr in self._vlrs:
1395 self._log.debug("Instantiating VLR %s", vlr)
1396 yield from vlr.instantiate(xact, restart_mode)
1397
1398 def find_vlr_by_cp(self, cp_name):
1399 """ Find the VLR associated with the cp name """
1400 return self._vlr_by_cp[cp_name]
1401
1402 def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
1403 """
1404 Returns the cloud specific construct for placement group
1405 Arguments:
1406 input_group: VNFD PlacementGroup
1407 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1408 """
1409 copy_dict = ['name', 'requirement', 'strategy']
1410 for group_info in nsr_config.vnfd_placement_group_maps:
1411 if group_info.placement_group_ref == input_group.name and \
1412 group_info.vnfd_id_ref == self.vnfd_id:
1413 group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1414 group_dict = {k:v for k,v in
1415 group_info.as_dict().items()
1416 if (k != 'placement_group_ref' and k !='vnfd_id_ref')}
1417 for param in copy_dict:
1418 group_dict.update({param: getattr(input_group, param)})
1419 group.from_dict(group_dict)
1420 return group
1421 return None
1422
1423 @asyncio.coroutine
1424 def get_vdu_placement_groups(self, vdu):
1425 placement_groups = []
1426 ### Step-1: Get VNF level placement groups
1427 for group in self._vnfr_msg.placement_groups_info:
1428 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1429 #group_info.from_dict(group.as_dict())
1430 placement_groups.append(group)
1431
1432 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1433 nsr_config = yield from self.get_nsr_config()
1434
1435 ### Step-3: Get VDU level placement groups
1436 for group in self.vnfd.placement_groups:
1437 for member_vdu in group.member_vdus:
1438 if member_vdu.member_vdu_ref == vdu.id:
1439 group_info = self.resolve_placement_group_cloud_construct(group,
1440 nsr_config)
1441 if group_info is None:
1442 self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
1443 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1444 else:
1445 self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1446 str(group_info),
1447 vdu.name,
1448 self.vnf_name,
1449 self.member_vnf_index)
1450 placement_groups.append(group_info)
1451
1452 return placement_groups
1453
1454 @asyncio.coroutine
1455 def create_vdus(self, vnfr, restart_mode=False):
1456 """ Create the VDUs associated with this VNF """
1457
1458 def get_vdur_id(vdud):
1459 """Get the corresponding VDUR's id for the VDUD. This is useful in
1460 case of a restart.
1461
1462 In restart mode we check for exiting VDUR's ID and use them, if
1463 available. This way we don't end up creating duplicate VDURs
1464 """
1465 vdur_id = None
1466
1467 if restart_mode and vdud is not None:
1468 try:
1469 vdur = [vdur.id for vdur in vnfr._vnfr.vdur if vdur.vdu_id_ref == vdud.id]
1470 vdur_id = vdur[0]
1471 except IndexError:
1472 self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
1473
1474 return vdur_id
1475
1476
1477 self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
1478 for vdu in self._rw_vnfd.vdu:
1479 self._log.debug("Creating vdu: %s", vdu)
1480 vdur_id = get_vdur_id(vdu)
1481
1482 placement_groups = yield from self.get_vdu_placement_groups(vdu)
1483 self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1484 vdu.name,
1485 self.vnf_name,
1486 self.member_vnf_index,
1487 [ group.name for group in placement_groups])
1488
1489 vdur = VirtualDeploymentUnitRecord(
1490 dts=self._dts,
1491 log=self._log,
1492 loop=self._loop,
1493 vdud=vdu,
1494 vnfr=vnfr,
1495 mgmt_intf=self.has_mgmt_interface(vdu),
1496 mgmt_network=self._mgmt_network,
1497 cloud_account_name=self.cloud_account_name,
1498 vnfd_package_store=self._vnfd_package_store,
1499 vdur_id=vdur_id,
1500 placement_groups = placement_groups,
1501 )
1502 yield from vdur.vdu_opdata_register()
1503
1504 self._vdus.append(vdur)
1505
1506 @asyncio.coroutine
1507 def instantiate_vdus(self, xact, vnfr):
1508 """ Instantiate the VDUs associated with this VNF """
1509 self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)
1510
1511 lookup = {vdu.vdu_id: vdu for vdu in self._vdus}
1512
1513 # Identify any dependencies among the VDUs
1514 dependencies = collections.defaultdict(list)
1515 vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")
1516
1517 for vdu in self._vdus:
1518 if vdu.vdud_cloud_init is not None:
1519 for vdu_id in vdu_id_pattern.findall(vdu.vdud_cloud_init):
1520 if vdu_id != vdu.vdu_id:
1521 # This means that vdu.vdu_id depends upon vdu_id,
1522 # i.e. vdu_id must be instantiated before
1523 # vdu.vdu_id.
1524 dependencies[vdu.vdu_id].append(lookup[vdu_id])
1525
1526 # Define the terminal states of VDU instantiation
1527 terminal = (
1528 VDURecordState.READY,
1529 VDURecordState.TERMINATED,
1530 VDURecordState.FAILED,
1531 )
1532
1533 datastore = VdurDatastore()
1534 processed = set()
1535
1536 @asyncio.coroutine
1537 def instantiate_monitor(vdu):
1538 """Monitor the state of the VDU during instantiation
1539
1540 Arguments:
1541 vdu - a VirtualDeploymentUnitRecord
1542
1543 """
1544 # wait for the VDUR to enter a terminal state
1545 while vdu._state not in terminal:
1546 yield from asyncio.sleep(1, loop=self._loop)
1547
1548 # update the datastore
1549 datastore.update(vdu)
1550
1551 # add the VDU to the set of processed VDUs
1552 processed.add(vdu.vdu_id)
1553
1554 @asyncio.coroutine
1555 def instantiate(vdu):
1556 """Instantiate the specified VDU
1557
1558 Arguments:
1559 vdu - a VirtualDeploymentUnitRecord
1560
1561 Raises:
1562 if the VDU, or any of the VDUs this VDU depends upon, are
1563 terminated or fail to instantiate properly, a
1564 VirtualDeploymentUnitRecordError is raised.
1565
1566 """
1567 for dependency in dependencies[vdu.vdu_id]:
1568 self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))
1569
1570 while dependency.vdu_id not in processed:
1571 yield from asyncio.sleep(1, loop=self._loop)
1572
1573 if not dependency.active:
1574 raise VirtualDeploymentUnitRecordError()
1575
1576 self._log.debug('instantiating {}'.format(vdu.vdu_id))
1577
1578 # Populate the datastore with the current values of the VDU
1579 datastore.add(vdu)
1580
1581 # Substitute any variables contained in the cloud config script
1582 config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""
1583
1584 parts = re.split("\{\{ ([^\}]+) \}\}", config)
1585 if len(parts) > 1:
1586
1587 # Extract the variable names
1588 variables = list()
1589 for variable in parts[1::2]:
1590 variables.append(variable.lstrip('{{').rstrip('}}').strip())
1591
1592 # Iterate of the variables and substitute values from the
1593 # datastore.
1594 for variable in variables:
1595
1596 # Handle a reference to a VDU by ID
1597 if variable.startswith('vdu['):
1598 value = datastore.get(variable)
1599 if value is None:
1600 msg = "Unable to find a substitute for {} in {} cloud-init script"
1601 raise ValueError(msg.format(variable, vdu.vdu_id))
1602
1603 config = config.replace("{{ %s }}" % variable, value)
1604 continue
1605
1606 # Handle a reference to the current VDU
1607 if variable.startswith('vdu'):
1608 value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
1609 config = config.replace("{{ %s }}" % variable, value)
1610 continue
1611
1612 # Handle unrecognized variables
1613 msg = 'unrecognized cloud-config variable: {}'
1614 raise ValueError(msg.format(variable))
1615
1616 # Instantiate the VDU
1617 with self._dts.transaction() as xact:
1618 self._log.debug("Instantiating vdu: %s", vdu)
1619 yield from vdu.instantiate(xact, vnfr, config=config)
1620 if self._state == VirtualNetworkFunctionRecordState.FAILED:
1621 self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
1622 self.vnfr_id, vdu)
1623
1624 # First create a set of tasks to monitor the state of the VDUs and
1625 # report when they have entered a terminal state
1626 for vdu in self._vdus:
1627 self._loop.create_task(instantiate_monitor(vdu))
1628
1629 for vdu in self._vdus:
1630 self._loop.create_task(instantiate(vdu))
1631
1632 def has_mgmt_interface(self, vdu):
1633 # ## TODO: Support additional mgmt_interface type options
1634 if self.vnfd.mgmt_interface.vdu_id == vdu.id:
1635 return True
1636 return False
1637
1638 def vlr_xpath(self, vlr_id):
1639 """ vlr xpath """
1640 return(
1641 "D,/vlr:vlr-catalog/"
1642 "vlr:vlr[vlr:id = '{}']".format(vlr_id))
1643
1644 def ext_vlr_by_id(self, vlr_id):
1645 """ find ext vlr by id """
1646 return self._ext_vlrs[vlr_id]
1647
1648 @asyncio.coroutine
1649 def publish_inventory(self, xact):
1650 """ Publish the inventory associated with this VNF """
1651 self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)
1652
1653 for component in self._rw_vnfd.component:
1654 self._log.debug("Creating inventory component %s", component)
1655 mangled_name = VcsComponent.mangle_name(component.component_name,
1656 self.vnf_name,
1657 self.vnfd_id
1658 )
1659 comp = VcsComponent(dts=self._dts,
1660 log=self._log,
1661 loop=self._loop,
1662 cluster_name=self._cluster_name,
1663 vcs_handler=self._vcs_handler,
1664 component=component,
1665 mangled_name=mangled_name,
1666 )
1667 if comp.name in self._inventory:
1668 self._log.debug("Duplicate entries in inventory %s for vnfr %s",
1669 component, self._vnfd_id)
1670 return
1671 self._log.debug("Adding component %s for vnrf %s",
1672 comp.name, self._vnfr_id)
1673 self._inventory[comp.name] = comp
1674 yield from comp.publish(xact)
1675
1676 def all_vdus_active(self):
1677 """ Are all VDUS in this VNFR active? """
1678 for vdu in self._vdus:
1679 if not vdu.active:
1680 return False
1681
1682 self._log.debug("Inside all_vdus_active. Returning True")
1683 return True
1684
1685 @asyncio.coroutine
1686 def instantiation_failed(self, failed_reason=None):
1687 """ VNFR instantiation failed """
1688 self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
1689 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1690 self._state_failed_reason = failed_reason
1691
1692 # Update the VNFR with the changed status
1693 yield from self.publish(None)
1694
1695 @asyncio.coroutine
1696 def is_ready(self):
1697 """ This VNF is ready"""
1698 self._log.debug("VNFR id %s is ready", self.vnfr_id)
1699
1700 if self._state != VirtualNetworkFunctionRecordState.FAILED:
1701 self.set_state(VirtualNetworkFunctionRecordState.READY)
1702
1703 else:
1704 self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
1705
1706 # Update the VNFR with the changed status
1707 yield from self.publish(None)
1708
1709 def update_cp(self, cp_name, ip_address, cp_id):
1710 """Updated the connection point with ip address"""
1711 for cp in self._cprs:
1712 if cp.name == cp_name:
1713 self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1714 cp_name, cp, ip_address, cp_id)
1715 cp.ip_address = ip_address
1716 cp.connection_point_id = cp_id
1717 return
1718
1719 err = "No connection point %s found in VNFR id %s" % (cp.name, self._vnfr_id)
1720 self._log.debug(err)
1721 raise VirtualDeploymentUnitRecordError(err)
1722
1723 def set_state(self, state):
1724 """ Set state for this VNFR"""
1725 self._state = state
1726
1727 @asyncio.coroutine
1728 def instantiate(self, xact, restart_mode=False):
1729 """ instantiate this VNF """
1730 self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
1731 self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)
1732
1733 @asyncio.coroutine
1734 def fetch_vlrs():
1735 """ Fetch VLRs """
1736 # Iterate over all the connection points in VNFR and fetch the
1737 # associated VLRs
1738
1739 def cpr_from_cp(cp):
1740 """ Creates a record level connection point from the desciptor cp"""
1741 cp_fields = ["name", "image", "vm-flavor"]
1742 cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
1743 cpr_dict = {}
1744 cpr_dict.update(cp_copy_dict)
1745 return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)
1746
1747 self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1748 self._vnfr_id, self._vnfr.connection_point)
1749
1750 for cp in self._vnfr.connection_point:
1751 cpr = cpr_from_cp(cp)
1752 self._cprs.append(cpr)
1753 self._log.debug("Adding Connection point record %s ", cp)
1754
1755 vlr_path = self.vlr_xpath(cp.vlr_ref)
1756 self._log.debug("Fetching VLR with path = %s", vlr_path)
1757 res_iter = yield from self._dts.query_read(self.vlr_xpath(cp.vlr_ref),
1758 rwdts.XactFlag.MERGE)
1759 for i in res_iter:
1760 r = yield from i
1761 d = r.result
1762 self._ext_vlrs[cp.vlr_ref] = d
1763 cpr.vlr_ref = cp.vlr_ref
1764 self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)
1765
1766 # Increase the VNFD reference count
1767 self.vnfd_ref()
1768
1769 assert self.vnfd
1770
1771 # Fetch External VLRs
1772 self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
1773 yield from fetch_vlrs()
1774
1775 # Publish inventory
1776 self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
1777 yield from self.publish_inventory(xact)
1778
1779 # Publish inventory
1780 self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
1781 yield from self.create_vls()
1782
1783 # publish the VNFR
1784 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1785 yield from self.publish(xact)
1786
1787 # instantiate VLs
1788 self._log.debug("VNFR-ID %s: Instantiate VLs", self._vnfr_id)
1789 try:
1790 yield from self.instantiate_vls(xact, restart_mode)
1791 except Exception as e:
1792 self._log.exception("VL instantiation failed (%s)", str(e))
1793 yield from self.instantiation_failed(str(e))
1794 return
1795
1796 self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)
1797
1798 # instantiate VDUs
1799 self._log.debug("VNFR-ID %s: Create VDUs", self._vnfr_id)
1800 yield from self.create_vdus(self, restart_mode)
1801
1802 # publish the VNFR
1803 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1804 yield from self.publish(xact)
1805
1806 # instantiate VDUs
1807 # ToDo: Check if this should be prevented during restart
1808 self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
1809 _ = self._loop.create_task(self.instantiate_vdus(xact, self))
1810
1811 # publish the VNFR
1812 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1813 yield from self.publish(xact)
1814
1815 self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)
1816
1817 # create task updating uptime for this vnfr
1818 self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
1819 self._loop.create_task(self.vnfr_uptime_update(xact))
1820
1821 @asyncio.coroutine
1822 def terminate(self, xact):
1823 """ Terminate this virtual network function """
1824
1825 self._log.debug("Terminatng VNF id %s", self.vnfr_id)
1826
1827 self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)
1828
1829 # stop monitoring
1830 if self._vnf_mon is not None:
1831 self._vnf_mon.stop()
1832 self._vnf_mon.deregister()
1833 self._vnf_mon = None
1834
1835 @asyncio.coroutine
1836 def terminate_vls():
1837 """ Terminate VLs in this VNF """
1838 for vl in self._vlrs:
1839 yield from vl.terminate(xact)
1840
1841 @asyncio.coroutine
1842 def terminate_vdus():
1843 """ Terminate VDUS in this VNF """
1844 for vdu in self._vdus:
1845 yield from vdu.terminate(xact)
1846
1847 self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
1848 self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
1849 yield from terminate_vls()
1850
1851 self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
1852 self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
1853 yield from terminate_vdus()
1854
1855 self._log.debug("Terminated VNF id %s", self.vnfr_id)
1856 self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1857
1858 @asyncio.coroutine
1859 def vnfr_uptime_update(self, xact):
1860 while True:
1861 # Return when vnfr state is FAILED or TERMINATED etc
1862 if self._state not in [VirtualNetworkFunctionRecordState.INIT,
1863 VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
1864 VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
1865 VirtualNetworkFunctionRecordState.READY]:
1866 return
1867 yield from self.publish(xact)
1868 yield from asyncio.sleep(2, loop=self._loop)
1869
1870
1871
1872 class VnfdDtsHandler(object):
1873 """ DTS handler for VNFD config changes """
1874 XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1875
1876 def __init__(self, dts, log, loop, vnfm):
1877 self._dts = dts
1878 self._log = log
1879 self._loop = loop
1880 self._vnfm = vnfm
1881 self._regh = None
1882
1883 @asyncio.coroutine
1884 def regh(self):
1885 """ DTS registration handle """
1886 return self._regh
1887
1888 @asyncio.coroutine
1889 def register(self):
1890 """ Register for VNFD configuration"""
1891
1892 def on_apply(dts, acg, xact, action, scratch):
1893 """Apply the configuration"""
1894 self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1895 xact, action, scratch)
1896
1897 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
1898
1899 @asyncio.coroutine
1900 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
1901 """ on prepare callback """
1902 self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1903 ks_path.to_xpath(RwVnfmYang.get_schema()), msg)
1904 fref = ProtobufC.FieldReference.alloc()
1905 fref.goto_whole_message(msg.to_pbcm())
1906
1907 # Handle deletes in prepare_callback
1908 if fref.is_field_deleted():
1909 # Delete an VNFD record
1910 self._log.debug("Deleting VNFD with id %s", msg.id)
1911 if self._vnfm.vnfd_in_use(msg.id):
1912 self._log.debug("Cannot delete VNFD in use - %s", msg)
1913 err = "Cannot delete a VNFD in use - %s" % msg
1914 raise VirtualNetworkFunctionDescriptorRefCountExists(err)
1915 # Delete a VNFD record
1916 yield from self._vnfm.delete_vnfd(msg.id)
1917
1918 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
1919
1920 self._log.debug(
1921 "Registering for VNFD config using xpath: %s",
1922 VnfdDtsHandler.XPATH,
1923 )
1924 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
1925 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
1926 self._regh = acg.register(
1927 xpath=VnfdDtsHandler.XPATH,
1928 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
1929 on_prepare=on_prepare)
1930
1931
1932 class VcsComponentDtsHandler(object):
1933 """ Vcs Component DTS handler """
1934 XPATH = ("D,/rw-manifest:manifest" +
1935 "/rw-manifest:operational-inventory" +
1936 "/rw-manifest:component")
1937
1938 def __init__(self, dts, log, loop, vnfm):
1939 self._dts = dts
1940 self._log = log
1941 self._loop = loop
1942 self._regh = None
1943 self._vnfm = vnfm
1944
1945 @property
1946 def regh(self):
1947 """ DTS registration handle """
1948 return self._regh
1949
1950 @asyncio.coroutine
1951 def register(self):
1952 """ Registers VCS component dts publisher registration"""
1953 self._log.debug("VCS Comp publisher DTS handler registering path %s",
1954 VcsComponentDtsHandler.XPATH)
1955
1956 hdl = rift.tasklets.DTS.RegistrationHandler()
1957 handlers = rift.tasklets.Group.Handler()
1958 with self._dts.group_create(handler=handlers) as group:
1959 self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
1960 handler=hdl,
1961 flags=(rwdts.Flag.PUBLISHER |
1962 rwdts.Flag.NO_PREP_READ |
1963 rwdts.Flag.DATASTORE),)
1964
1965 @asyncio.coroutine
1966 def publish(self, xact, path, msg):
1967 """ Publishes the VCS component """
1968 self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
1969 xact, path, msg)
1970 self.regh.create_element(path, msg)
1971 self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
1972 VcsComponentDtsHandler.XPATH, xact, path, msg)
1973
1974 class VnfrConsoleOperdataDtsHandler(object):
1975 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
1976 @property
1977 def vnfr_vdu_console_xpath(self):
1978 """ path for resource-mgr"""
1979 return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self._vnfr_id,self._vdur_id))
1980
1981 def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
1982 self._dts = dts
1983 self._log = log
1984 self._loop = loop
1985 self._regh = None
1986 self._vnfm = vnfm
1987
1988 self._vnfr_id = vnfr_id
1989 self._vdur_id = vdur_id
1990 self._vdu_id = vdu_id
1991
1992 @asyncio.coroutine
1993 def register(self):
1994 """ Register for VNFR VDU Operational Data read from dts """
1995
1996 @asyncio.coroutine
1997 def on_prepare(xact_info, action, ks_path, msg):
1998 """ prepare callback from dts """
1999 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2000 self._log.debug(
2001 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2002 xact_info, action, xpath, msg
2003 )
2004
2005 if action == rwdts.QueryAction.READ:
2006 schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema()
2007 path_entry = schema.keyspec_to_entry(ks_path)
2008 self._log.debug("VDU Opdata path is {}".format(path_entry))
2009 try:
2010 vnfr = self._vnfm.get_vnfr(self._vnfr_id)
2011 except VnfRecordError as e:
2012 self._log.error("VNFR id %s not found", self._vnfr_id)
2013 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2014 return
2015 try:
2016 vdur= vnfr._get_vdur_from_vdu_id(self._vdu_id)
2017 if not vdur._state == VDURecordState.READY:
2018 self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
2019 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2020 return
2021 with self._dts.transaction() as new_xact:
2022 resp = yield from vdur.read_resource(new_xact)
2023 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2024 vdur_console.id = self._vdur_id
2025 if resp.console_url:
2026 vdur_console.console_url = resp.console_url
2027 else:
2028 vdur_console.console_url = 'none'
2029 self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
2030 except Exception:
2031 self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
2032 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2033 vdur_console.id = self._vdur_id
2034 vdur_console.console_url = 'none'
2035
2036 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
2037 xpath=self.vnfr_vdu_console_xpath,
2038 msg=vdur_console)
2039 else:
2040 #raise VnfRecordError("Not supported operation %s" % action)
2041 self._log.error("Not supported operation %s" % action)
2042 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2043 return
2044
2045
2046 self._log.debug("Registering for VNFR VDU using xpath: %s",
2047 self.vnfr_vdu_console_xpath)
2048 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2049 with self._dts.group_create() as group:
2050 self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
2051 handler=hdl,
2052 flags=rwdts.Flag.PUBLISHER,
2053 )
2054
2055
2056 class VnfrDtsHandler(object):
2057 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2058 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2059
2060 def __init__(self, dts, log, loop, vnfm):
2061 self._dts = dts
2062 self._log = log
2063 self._loop = loop
2064 self._vnfm = vnfm
2065
2066 self._regh = None
2067
2068 @property
2069 def regh(self):
2070 """ Return registration handle"""
2071 return self._regh
2072
2073 @property
2074 def vnfm(self):
2075 """ Return VNF manager instance """
2076 return self._vnfm
2077
2078 @asyncio.coroutine
2079 def register(self):
2080 """ Register for vnfr create/update/delete/read requests from dts """
2081 def on_commit(xact_info):
2082 """ The transaction has been committed """
2083 self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
2084 return rwdts.MemberRspCode.ACTION_OK
2085
2086 def on_abort(*args):
2087 """ Abort callback """
2088 self._log.debug("VNF transaction got aborted")
2089
2090 @asyncio.coroutine
2091 def on_event(dts, g_reg, xact, xact_event, scratch_data):
2092
2093 @asyncio.coroutine
2094 def instantiate_realloc_vnfr(vnfr):
2095 """Re-populate the vnfm after restart
2096
2097 Arguments:
2098 vlink
2099
2100 """
2101
2102 yield from vnfr.instantiate(None, restart_mode=True)
2103
2104 if xact_event == rwdts.MemberEvent.INSTALL:
2105 curr_cfg = self.regh.elements
2106 for cfg in curr_cfg:
2107 vnfr = self.vnfm.create_vnfr(cfg)
2108 self._loop.create_task(instantiate_realloc_vnfr(vnfr))
2109
2110 self._log.debug("Got on_event in vnfm")
2111
2112 return rwdts.MemberRspCode.ACTION_OK
2113
2114 @asyncio.coroutine
2115 def on_prepare(xact_info, action, ks_path, msg):
2116 """ prepare callback from dts """
2117 self._log.debug(
2118 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2119 xact_info, action, msg
2120 )
2121
2122 if action == rwdts.QueryAction.CREATE:
2123 if not msg.has_field("vnfd"):
2124 err = "Vnfd not provided"
2125 self._log.error(err)
2126 raise VnfRecordError(err)
2127
2128 vnfr = self.vnfm.create_vnfr(msg)
2129 try:
2130 # RIFT-9105: Unable to add a READ query under an existing transaction
2131 # xact = xact_info.xact
2132 yield from vnfr.instantiate(None)
2133 except Exception as e:
2134 self._log.exception(e)
2135 self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
2136 vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
2137 yield from vnfr.publish(None)
2138 elif action == rwdts.QueryAction.DELETE:
2139 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2140 path_entry = schema.keyspec_to_entry(ks_path)
2141 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2142
2143 if vnfr is None:
2144 self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
2145 raise VirtualNetworkFunctionRecordNotFound(
2146 "VNFR id %s", path_entry.key00.id)
2147
2148 try:
2149 yield from vnfr.terminate(xact_info.xact)
2150 # Unref the VNFD
2151 vnfr.vnfd_unref()
2152 yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
2153 except Exception as e:
2154 self._log.exception(e)
2155 self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)
2156
2157 elif action == rwdts.QueryAction.UPDATE:
2158 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2159 path_entry = schema.keyspec_to_entry(ks_path)
2160 vnfr = None
2161 try:
2162 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2163 except Exception as e:
2164 self._log.debug("No vnfr found with id %s", path_entry.key00.id)
2165 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2166 return
2167
2168 if vnfr is None:
2169 self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
2170 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2171 return
2172
2173 self._log.debug("VNFR {} update config status {} (current {})".
2174 format(vnfr.name, msg.config_status, vnfr.config_status))
2175 # Update the config status and publish
2176 vnfr._config_status = msg.config_status
2177 yield from vnfr.publish(None)
2178
2179 else:
2180 raise NotImplementedError(
2181 "%s action on VirtualNetworkFunctionRecord not supported",
2182 action)
2183
2184 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2185
2186 self._log.debug("Registering for VNFR using xpath: %s",
2187 VnfrDtsHandler.XPATH,)
2188
2189 hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
2190 on_prepare=on_prepare,)
2191 handlers = rift.tasklets.Group.Handler(on_event=on_event,)
2192 with self._dts.group_create(handler=handlers) as group:
2193 self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
2194 handler=hdl,
2195 flags=(rwdts.Flag.PUBLISHER |
2196 rwdts.Flag.NO_PREP_READ |
2197 rwdts.Flag.CACHE |
2198 rwdts.Flag.DATASTORE),)
2199
2200 @asyncio.coroutine
2201 def create(self, xact, path, msg):
2202 """
2203 Create a VNFR record in DTS with path and message
2204 """
2205 self._log.debug("Creating VNFR xact = %s, %s:%s",
2206 xact, path, msg)
2207
2208 self.regh.create_element(path, msg)
2209 self._log.debug("Created VNFR xact = %s, %s:%s",
2210 xact, path, msg)
2211
2212 @asyncio.coroutine
2213 def update(self, xact, path, msg):
2214 """
2215 Update a VNFR record in DTS with path and message
2216 """
2217 self._log.debug("Updating VNFR xact = %s, %s:%s",
2218 xact, path, msg)
2219 self.regh.update_element(path, msg)
2220 self._log.debug("Updated VNFR xact = %s, %s:%s",
2221 xact, path, msg)
2222
2223 @asyncio.coroutine
2224 def delete(self, xact, path):
2225 """
2226 Delete a VNFR record in DTS with path and message
2227 """
2228 self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
2229 self.regh.delete_element(path)
2230 self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2231
2232
2233 class VnfdRefCountDtsHandler(object):
2234 """ The VNFD Ref Count DTS handler """
2235 XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2236
2237 def __init__(self, dts, log, loop, vnfm):
2238 self._dts = dts
2239 self._log = log
2240 self._loop = loop
2241 self._vnfm = vnfm
2242
2243 self._regh = None
2244
2245 @property
2246 def regh(self):
2247 """ Return registration handle """
2248 return self._regh
2249
2250 @property
2251 def vnfm(self):
2252 """ Return the NS manager instance """
2253 return self._vnfm
2254
2255 @asyncio.coroutine
2256 def register(self):
2257 """ Register for VNFD ref count read from dts """
2258
2259 @asyncio.coroutine
2260 def on_prepare(xact_info, action, ks_path, msg):
2261 """ prepare callback from dts """
2262 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2263 self._log.debug(
2264 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2265 xact_info, action, xpath, msg
2266 )
2267
2268 if action == rwdts.QueryAction.READ:
2269 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
2270 path_entry = schema.keyspec_to_entry(ks_path)
2271 vnfd_list = yield from self._vnfm.get_vnfd_refcount(path_entry.key00.vnfd_id_ref)
2272 for xpath, msg in vnfd_list:
2273 self._log.debug("Responding to ref count query path:%s, msg:%s",
2274 xpath, msg)
2275 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
2276 xpath=xpath,
2277 msg=msg)
2278 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2279 else:
2280 raise VnfRecordError("Not supported operation %s" % action)
2281
2282 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2283 with self._dts.group_create() as group:
2284 self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH,
2285 handler=hdl,
2286 flags=rwdts.Flag.PUBLISHER,
2287 )
2288
2289
2290 class VdurDatastore(object):
2291 """
2292 This VdurDatastore is intended to expose select information about a VDUR
2293 such that it can be referenced in a cloud config file. The data that is
2294 exposed does not necessarily follow the structure of the data in the yang
2295 model. This is intentional. The data that are exposed are intended to be
2296 agnostic of the yang model so that changes in the model do not necessarily
2297 require changes to the interface provided to the user. It also means that
2298 the user does not need to be familiar with the RIFT.ware yang models.
2299 """
2300
2301 def __init__(self):
2302 """Create an instance of VdurDatastore"""
2303 self._vdur_data = dict()
2304 self._pattern = re.compile("vdu\[([^]]+)\]\.(.+)")
2305
2306 def add(self, vdur):
2307 """Add a new VDUR to the datastore
2308
2309 Arguments:
2310 vdur - a VirtualDeploymentUnitRecord instance
2311
2312 Raises:
2313 A ValueError is raised if the VDUR is (1) None or (2) already in
2314 the datastore.
2315
2316 """
2317 if vdur.vdu_id is None:
2318 raise ValueError('VDURs are required to have an ID')
2319
2320 if vdur.vdu_id in self._vdur_data:
2321 raise ValueError('cannot add a VDUR more than once')
2322
2323 self._vdur_data[vdur.vdu_id] = dict()
2324
2325 def set_if_not_none(key, attr):
2326 if attr is not None:
2327 self._vdur_data[vdur.vdu_id][key] = attr
2328
2329 set_if_not_none('name', vdur._vdud.name)
2330 set_if_not_none('mgmt.ip', vdur.vm_management_ip)
2331
2332 def update(self, vdur):
2333 """Update the VDUR information in the datastore
2334
2335 Arguments:
2336 vdur - a GI representation of a VDUR
2337
2338 Raises:
2339 A ValueError is raised if the VDUR is (1) None or (2) already in
2340 the datastore.
2341
2342 """
2343 if vdur.vdu_id is None:
2344 raise ValueError('VNFDs are required to have an ID')
2345
2346 if vdur.vdu_id not in self._vdur_data:
2347 raise ValueError('VNF is not recognized')
2348
2349 def set_or_delete(key, attr):
2350 if attr is None:
2351 if key in self._vdur_data[vdur.vdu_id]:
2352 del self._vdur_data[vdur.vdu_id][key]
2353
2354 else:
2355 self._vdur_data[vdur.vdu_id][key] = attr
2356
2357 set_or_delete('name', vdur._vdud.name)
2358 set_or_delete('mgmt.ip', vdur.vm_management_ip)
2359
2360 def remove(self, vdur_id):
2361 """Remove all of the data associated with specified VDUR
2362
2363 Arguments:
2364 vdur_id - the identifier of a VNFD in the datastore
2365
2366 Raises:
2367 A ValueError is raised if the VDUR is not contained in the
2368 datastore.
2369
2370 """
2371 if vdur_id not in self._vdur_data:
2372 raise ValueError('VNF is not recognized')
2373
2374 del self._vdur_data[vdur_id]
2375
2376 def get(self, expr):
2377 """Retrieve VDUR information from the datastore
2378
2379 An expression should be of the form,
2380
2381 vdu[<id>].<attr>
2382
2383 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2384 the exposed attribute that the user wishes to retrieve.
2385
2386 If the requested data is not available, None is returned.
2387
2388 Arguments:
2389 expr - a string that specifies the data to return
2390
2391 Raises:
2392 A ValueError is raised if the provided expression cannot be parsed.
2393
2394 Returns:
2395 The requested data or None
2396
2397 """
2398 result = self._pattern.match(expr)
2399 if result is None:
2400 raise ValueError('data expression not recognized ({})'.format(expr))
2401
2402 vdur_id, key = result.groups()
2403
2404 if vdur_id not in self._vdur_data:
2405 return None
2406
2407 return self._vdur_data[vdur_id].get(key, None)
2408
2409
2410 class VnfManager(object):
2411 """ The virtual network function manager class """
2412 def __init__(self, dts, log, loop, cluster_name):
2413 self._dts = dts
2414 self._log = log
2415 self._loop = loop
2416 self._cluster_name = cluster_name
2417
2418 self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
2419 self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
2420 self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
2421 self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(log, dts, loop, callback=self.handle_nsr)
2422
2423 self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
2424 self._vnfr_handler,
2425 self._vcs_handler,
2426 self._vnfr_ref_handler,
2427 self._nsr_handler]
2428 self._vnfrs = {}
2429 self._vnfds_to_vnfr = {}
2430 self._nsrs = {}
2431
2432 @property
2433 def vnfr_handler(self):
2434 """ VNFR dts handler """
2435 return self._vnfr_handler
2436
2437 @property
2438 def vcs_handler(self):
2439 """ VCS dts handler """
2440 return self._vcs_handler
2441
2442 @asyncio.coroutine
2443 def register(self):
2444 """ Register all static DTS handlers """
2445 for hdl in self._dts_handlers:
2446 yield from hdl.register()
2447
2448 @asyncio.coroutine
2449 def run(self):
2450 """ Run this VNFM instance """
2451 self._log.debug("Run VNFManager - registering static DTS handlers""")
2452 yield from self.register()
2453
2454 def handle_nsr(self, nsr, action):
2455 if action in [rwdts.QueryAction.CREATE]:
2456 self._nsrs[nsr.id] = nsr
2457 elif action == rwdts.QueryAction.DELETE:
2458 if nsr.id in self._nsrs:
2459 del self._nsrs[nsr.id]
2460
2461 def get_linked_mgmt_network(self, vnfr):
2462 """For the given VNFR get the related mgmt network from the NSD, if
2463 available.
2464 """
2465 vnfd_id = vnfr.vnfd.id
2466 nsr_id = vnfr.nsr_id_ref
2467
2468 # for the given related VNFR, get the corresponding NSR-config
2469 nsr_obj = None
2470 try:
2471 nsr_obj = self._nsrs[nsr_id]
2472 except KeyError:
2473 raise("Unable to find the NS with the ID: {}".format(nsr_id))
2474
2475 # for the related NSD check if a VLD exists such that it's a mgmt
2476 # network
2477 for vld in nsr_obj.nsd.vld:
2478 if vld.mgmt_network:
2479 return vld.name
2480
2481 return None
2482
2483 def get_vnfr(self, vnfr_id):
2484 """ get VNFR by vnfr id """
2485
2486 if vnfr_id not in self._vnfrs:
2487 raise VnfRecordError("VNFR id %s not found", vnfr_id)
2488
2489 return self._vnfrs[vnfr_id]
2490
2491 def create_vnfr(self, vnfr):
2492 """ Create a VNFR instance """
2493 if vnfr.id in self._vnfrs:
2494 msg = "Vnfr id %s already exists" % vnfr.id
2495 self._log.error(msg)
2496 raise VnfRecordError(msg)
2497
2498 self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
2499 vnfr.id,
2500 vnfr.vnfd.id)
2501
2502 mgmt_network = self.get_linked_mgmt_network(vnfr)
2503
2504 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2505 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
2506 mgmt_network=mgmt_network
2507 )
2508 self._vnfds_to_vnfr[vnfr.vnfd.id] = self._vnfrs[vnfr.id]
2509 return self._vnfrs[vnfr.id]
2510
2511 @asyncio.coroutine
2512 def delete_vnfr(self, xact, vnfr):
2513 """ Create a VNFR instance """
2514 if vnfr.vnfr_id in self._vnfrs:
2515 self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
2516 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2517 del self._vnfrs[vnfr.vnfr_id]
2518
2519 @asyncio.coroutine
2520 def fetch_vnfd(self, vnfd_id):
2521 """ Fetch VNFDs based with the vnfd id"""
2522 vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
2523 self._log.debug("Fetch vnfd with path %s", vnfd_path)
2524 vnfd = None
2525
2526 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2527
2528 for ent in res_iter:
2529 res = yield from ent
2530 vnfd = res.result
2531
2532 if vnfd is None:
2533 err = "Failed to get Vnfd %s" % vnfd_id
2534 self._log.error(err)
2535 raise VnfRecordError(err)
2536
2537 self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)
2538
2539 return vnfd
2540
2541 def vnfd_in_use(self, vnfd_id):
2542 """ Is this VNFD in use """
2543 self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
2544 if vnfd_id in self._vnfds_to_vnfr:
2545 return self._vnfds_to_vnfr[vnfd_id].in_use()
2546 return False
2547
2548 @asyncio.coroutine
2549 def publish_vnfr(self, xact, path, msg):
2550 """ Publish a VNFR """
2551 self._log.debug("publish_vnfr called with path %s, msg %s",
2552 path, msg)
2553 yield from self.vnfr_handler.update(xact, path, msg)
2554
2555 @asyncio.coroutine
2556 def delete_vnfd(self, vnfd_id):
2557 """ Delete the Virtual Network Function descriptor with the passed id """
2558 self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
2559 if vnfd_id not in self._vnfds_to_vnfr:
2560 self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
2561 raise VirtualNetworkFunctionDescriptorNotFound("Cannot find %s", vnfd_id)
2562
2563 if self._vnfds_to_vnfr[vnfd_id].in_use():
2564 self._log.debug("Cannot delete VNFD id %s reference exists %s",
2565 vnfd_id,
2566 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2567 raise VirtualNetworkFunctionDescriptorRefCountExists(
2568 "Cannot delete :%s, ref_count:%s",
2569 vnfd_id,
2570 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2571
2572 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2573 try:
2574 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2575 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2576 if os.path.exists(vnfd_dir):
2577 shutil.rmtree(vnfd_dir, ignore_errors=True)
2578 except Exception as e:
2579 self._log.error("Exception in cleaning up VNFD {}: {}".
2580 format(self._vnfds_to_vnfr[vnfd_id].vnfd.name, e))
2581 self._log.exception(e)
2582
2583 del self._vnfds_to_vnfr[vnfd_id]
2584
2585 def vnfd_refcount_xpath(self, vnfd_id):
2586 """ xpath for ref count entry """
2587 return (VnfdRefCountDtsHandler.XPATH +
2588 "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)
2589
2590 @asyncio.coroutine
2591 def get_vnfd_refcount(self, vnfd_id):
2592 """ Get the vnfd_list from this VNFM"""
2593 vnfd_list = []
2594 if vnfd_id is None or vnfd_id == "":
2595 for vnfr in self._vnfds_to_vnfr.values():
2596 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2597 vnfd_msg.vnfd_id_ref = vnfr.vnfd.id
2598 vnfd_msg.instance_ref_count = vnfr.vnfd_ref_count
2599 vnfd_list.append((self.vnfd_refcount_xpath(vnfr.vnfd.id), vnfd_msg))
2600 elif vnfd_id in self._vnfds_to_vnfr:
2601 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2602 vnfd_msg.vnfd_id_ref = self._vnfds_to_vnfr[vnfd_id].vnfd.id
2603 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count
2604 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2605
2606 return vnfd_list
2607
2608
2609 class VnfmTasklet(rift.tasklets.Tasklet):
2610 """ VNF Manager tasklet class """
2611 def __init__(self, *args, **kwargs):
2612 super(VnfmTasklet, self).__init__(*args, **kwargs)
2613 self.rwlog.set_category("rw-mano-log")
2614 self.rwlog.set_subcategory("vnfm")
2615
2616 self._dts = None
2617 self._vnfm = None
2618
2619 def start(self):
2620 try:
2621 super(VnfmTasklet, self).start()
2622 self.log.info("Starting VnfmTasklet")
2623
2624 self.log.setLevel(logging.DEBUG)
2625
2626 self.log.debug("Registering with dts")
2627 self._dts = rift.tasklets.DTS(self.tasklet_info,
2628 RwVnfmYang.get_schema(),
2629 self.loop,
2630 self.on_dts_state_change)
2631
2632 self.log.debug("Created DTS Api GI Object: %s", self._dts)
2633 except Exception:
2634 print("Caught Exception in VNFM start:", sys.exc_info()[0])
2635 raise
2636
2637 def on_instance_started(self):
2638 """ Task insance started callback """
2639 self.log.debug("Got instance started callback")
2640
2641 def stop(self):
2642 try:
2643 self._dts.deinit()
2644 except Exception:
2645 print("Caught Exception in VNFM stop:", sys.exc_info()[0])
2646 raise
2647
2648 @asyncio.coroutine
2649 def init(self):
2650 """ Task init callback """
2651 try:
2652 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2653 assert vm_parent_name is not None
2654 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2655 yield from self._vnfm.run()
2656 except Exception:
2657 print("Caught Exception in VNFM init:", sys.exc_info()[0])
2658 raise
2659
2660 @asyncio.coroutine
2661 def run(self):
2662 """ Task run callback """
2663 pass
2664
2665 @asyncio.coroutine
2666 def on_dts_state_change(self, state):
2667 """Take action according to current dts state to transition
2668 application into the corresponding application state
2669
2670 Arguments
2671 state - current dts state
2672 """
2673 switch = {
2674 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2675 rwdts.State.CONFIG: rwdts.State.RUN,
2676 }
2677
2678 handlers = {
2679 rwdts.State.INIT: self.init,
2680 rwdts.State.RUN: self.run,
2681 }
2682
2683 # Transition application to next state
2684 handler = handlers.get(state, None)
2685 if handler is not None:
2686 yield from handler()
2687
2688 # Transition dts to next state
2689 next_state = switch.get(state, None)
2690 if next_state is not None:
2691 self._dts.handle.set_state(next_state)