e39a8ae8eea90bf4b9c0b3b83ef6cbfc23051f9a
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.mano.dts as mano_dts
53
54
class VMResourceError(Exception):
    """Raised when a VM resource request does not yield a usable response."""
58
59
class VnfRecordError(Exception):
    """Raised when a VNF record cannot be instantiated."""
63
64
class VduRecordError(Exception):
    """Raised when a VDU record cannot be instantiated or resolved."""
68
69
class NotImplemented(Exception):
    """Raised for functionality that is not implemented.

    NOTE(review): this class name shadows the builtin ``NotImplemented``
    singleton for the rest of the module; it is kept for compatibility.
    """
73
74
class VnfrRecordExistsError(Exception):
    """Raised when a VNFR with the same VNFR id already exists."""
78
79
class InternalVirtualLinkRecordError(Exception):
    """Raised for failures while handling an internal virtual link record."""
83
84
class VDUImageNotFound(Exception):
    """Raised when the image for a VDU cannot be found."""
88
89
class VirtualDeploymentUnitRecordError(Exception):
    """Raised when instantiation of a VDU record fails."""
93
94
class VMNotReadyError(Exception):
    """Raised when the VM has not yet been received from the resource manager."""
98
99
class VDURecordNotFound(Exception):
    """Raised when a VDU record cannot be located."""
103
104
class VirtualNetworkFunctionRecordDescNotFound(Exception):
    """Raised when the descriptor of a VNF record cannot be found."""
108
109
class VirtualNetworkFunctionDescriptorError(Exception):
    """Raised for errors related to a VNF descriptor."""
113
114
class VirtualNetworkFunctionDescriptorNotFound(Exception):
    """Raised when a VNF descriptor cannot be found."""
118
119
class VirtualNetworkFunctionRecordNotFound(Exception):
    """Raised when a VNF record cannot be found."""
123
124
class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
    """Raised when a VNF descriptor still has a reference count."""
128
129
class VnfrInstantiationFailed(Exception):
    """Raised when instantiation of a virtual network function fails."""
133
134
class VNFMPlacementGroupError(Exception):
    """Raised when placement-group constraints for a VDU cannot be honoured."""
137
@enum.unique
class VirtualNetworkFunctionRecordState(enum.Enum):
    """Lifecycle states of a VNF record.

    Every member has a distinct value. Previously ``TERMINATED`` shared the
    value 7 with ``VDU_TERMINATE_PHASE``, which made it a silent alias: its
    ``.name`` reported "VDU_TERMINATE_PHASE" and the two states compared
    equal. ``@enum.unique`` guards against reintroducing such aliases.
    """
    INIT = 1
    VL_INIT_PHASE = 2
    VM_INIT_PHASE = 3
    READY = 4
    TERMINATE = 5
    VL_TERMINATE_PHASE = 6
    VDU_TERMINATE_PHASE = 7
    TERMINATED = 8
    FAILED = 10
149
150
class VDURecordState(enum.Enum):
    """Lifecycle states of a single VDU record."""
    # Bring-up
    INIT = 1
    INSTANTIATING = 2
    RESOURCE_ALLOC_PENDING = 3
    READY = 4
    # Tear-down
    TERMINATING = 5
    TERMINATED = 6
    # Error
    FAILED = 10
160
161
162 class VcsComponent(object):
163 """ VCS Component within the VNF descriptor """
def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name):
    """Store the collaborators and identity of one VCS component.

    Arguments:
        dts, log, loop: DTS handle, logger and event loop used by this object
        cluster_name: used as the instance name in the VCS instance xpath
        vcs_handler: handler through which the component is published
        component: component message from the VNF descriptor
        mangled_name: externally visible (mangled) name of this component
    """
    # Infrastructure handles.
    self._dts, self._log, self._loop = dts, log, loop

    # Identity and descriptor data of the component itself.
    self._mangled_name = mangled_name
    self._component = component
    self._cluster_name = cluster_name
    self._vcs_handler = vcs_handler
172
173 @staticmethod
174 def mangle_name(component_name, vnf_name, vnfd_id):
175 """ mangled component name """
176 return vnf_name + ":" + component_name + ":" + vnfd_id
177
@property
def name(self):
    """Mangled name this component was created with."""
    mangled = self._mangled_name
    return mangled
182
@property
def path(self):
    """Keyspec of this component in the manifest operational inventory."""
    template = ("D,/rw-manifest:manifest"
                "/rw-manifest:operational-inventory"
                "/rw-manifest:component"
                "[rw-manifest:component-name = '{}']")
    return template.format(self.name)
190
@property
def instance_xpath(self):
    """Keyspec of the VCS instance entry keyed by this component's cluster name."""
    instance_key = "[instance-name = '{}']".format(self._cluster_name)
    return "D,/rw-base:vcs" + "/instances" + "/instance" + instance_key
198
@property
def start_comp_xpath(self):
    """Keyspec of the 'START-REQ' child entry under this component's instance."""
    start_child = "/child-n[instance-name = 'START-REQ']"
    return self.instance_xpath + start_child
204
def get_start_comp_msg(self, ip_address):
    """Build the child-n message that asks VCS to start this component.

    Arguments:
        ip_address: value stored in the message's ``ip_address`` field

    Returns:
        A populated RwBaseYang.VcsInstance_Instance_ChildN message.
    """
    request = RwBaseYang.VcsInstance_Instance_ChildN()
    request.instance_name = 'START-REQ'
    request.component_name = self.name
    request.admin_command = "START"
    request.ip_address = ip_address
    return request
214
@property
def msg(self):
    """Manifest component message with every component name mangled.

    The descriptor component is converted to a dict, every value stored
    under a "component_name" key (at any nesting depth) is replaced by its
    mangled form, and the result is turned back into an
    RwManifestYang.OpInventory_Component message.

    NOTE(review): the mangling reads self._vnfd_name and self._vnfd_id,
    which this class's __init__ does not assign -- confirm where they are
    expected to come from.
    """
    def mangle_comp_names(comp_dict):
        """Mangle component names in place, recursing into dicts and lists."""
        for key, val in comp_dict.items():
            if isinstance(val, dict):
                comp_dict[key] = mangle_comp_names(val)
            elif isinstance(val, list):
                # Only dict entries can contain component names.
                for idx, entry in enumerate(val):
                    if isinstance(entry, dict):
                        val[idx] = mangle_comp_names(entry)
            elif key == "component_name":
                comp_dict[key] = VcsComponent.mangle_name(val,
                                                          self._vnfd_name,
                                                          self._vnfd_id)
        return comp_dict

    mangled = mangle_comp_names(self._component.as_dict())
    return RwManifestYang.OpInventory_Component.from_dict(mangled)
243
@asyncio.coroutine
def publish(self, xact):
    """Publish this component's message at its path via the VCS handler.

    Arguments:
        xact: transaction handed through to the handler's publish call
    """
    self._log.debug(
        "Publishing the VcsComponent %s, path = %s comp = %s",
        self.name, self.path, self.msg,
    )
    yield from self._vcs_handler.publish(xact, self.path, self.msg)
250
@asyncio.coroutine
def start(self, xact, parent, ip_addr=None):
    """Ask VCS to start this component by creating its START-REQ entry.

    Arguments:
        xact: not used by this method
        parent: not used by this method
        ip_addr: IP address placed in the start message
    """
    # ATTN RV - replace with block add
    request = self.get_start_comp_msg(ip_addr)
    self._log.debug("starting component %s %s", self.start_comp_xpath, request)
    yield from self._dts.query_create(self.start_comp_xpath, 0, request)
    self._log.debug("started component %s, %s", self.start_comp_xpath, request)
263
264
265 class VirtualDeploymentUnitRecord(object):
266 """ Virtual Deployment Unit Record """
def __init__(self,
             dts,
             log,
             loop,
             vdud,
             vnfr,
             mgmt_intf,
             mgmt_network,
             cloud_account_name,
             vnfd_package_store,
             vdur_id=None,
             placement_groups=None):
    """Create a VDU record for one VDU descriptor of a VNFR.

    Arguments:
        dts, log, loop: DTS handle, logger and event loop
        vdud: VDU descriptor this record is built from
        vnfr: owning VNF record
        mgmt_intf: sent as "allocate_public_address" in the VM request
        mgmt_network: management network for the VM request (if truthy)
        cloud_account_name: cloud account the VDU is created in
        vnfd_package_store: package store used to read cloud-init files
        vdur_id: id of this record; a fresh UUID is used when falsy
        placement_groups: placement groups applying to this VDU; None
            (the default) means no placement groups
    """
    self._dts = dts
    self._log = log
    self._loop = loop
    self._vdud = vdud
    self._vnfr = vnfr
    self._mgmt_intf = mgmt_intf
    self._cloud_account_name = cloud_account_name
    self._vnfd_package_store = vnfd_package_store
    self._mgmt_network = mgmt_network

    self._vdur_id = vdur_id or str(uuid.uuid4())
    self._int_intf = []
    self._ext_intf = []
    self._state = VDURecordState.INIT
    self._state_failed_reason = None
    self._request_id = str(uuid.uuid4())
    self._name = vnfr.name + "__" + vdud.id
    # A None default avoids sharing one mutable list between instances and
    # keeps later iteration over the groups safe when None is passed.
    self._placement_groups = [] if placement_groups is None else placement_groups
    self._rm_regh = None
    self._vm_resp = None
    self._vdud_cloud_init = None
    self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(
        dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id,
        self._vdur_id, self.vdu_id)
301
@asyncio.coroutine
def vdu_opdata_register(self):
    """Register this VDUR's console operational-data handler."""
    console_handler = self._vdur_console_handler
    yield from console_handler.register()
305
def cp_ip_addr(self, cp_name):
    """Return the IP address of the connection point named ``cp_name``.

    "0.0.0.0" is returned when no VM response is stored yet or when the
    response has no connection point with that name.
    """
    vm_info = self._vm_resp
    if vm_info is None:
        return "0.0.0.0"
    addresses = (point.ip_address
                 for point in vm_info.connection_points
                 if point.name == cp_name)
    return next(addresses, "0.0.0.0")
313
def cp_id(self, cp_name):
    """Return the connection point id of the connection point ``cp_name``.

    An empty string is returned when no VM response is stored yet or when
    the response has no connection point with that name.
    """
    vm_info = self._vm_resp
    if vm_info is None:
        return ''
    ids = (point.connection_point_id
           for point in vm_info.connection_points
           if point.name == cp_name)
    return next(ids, '')
321
@property
def vdu_id(self):
    """Id of the VDU descriptor this record was created from."""
    descriptor = self._vdud
    return descriptor.id
325
@property
def vm_resp(self):
    """Latest VM resource info stored on this record, or None if none yet."""
    latest = self._vm_resp
    return latest
329
@property
def name(self):
    """Name of this VDUR."""
    vdur_name = self._name
    return vdur_name
334
@property
def cloud_account_name(self):
    """Name of the cloud account in which this VDU should be created."""
    account = self._cloud_account_name
    return account
339
@property
def image_name(self):
    """Base name of the descriptor's image, used to look it up on the CMP.

    Returns None when the descriptor does not contain an 'image' entry.
    """
    if 'image' in self._vdud:
        return os.path.basename(self._vdud.image)
    return None
346
@property
def image_checksum(self):
    """Image checksum from the descriptor, or None when the field is unset."""
    descriptor = self._vdud
    if descriptor.has_field("image_checksum"):
        return descriptor.image_checksum
    return None
351
@property
def management_ip(self):
    """Management address of an active VDU, preferring the public IP.

    Returns None while the VDU is not active. Otherwise the VM response's
    public_ip is returned when that field is set, else its management_ip.
    """
    if not self.active:
        return None
    vm_info = self._vm_resp
    if vm_info.has_field('public_ip'):
        return vm_info.public_ip
    return vm_info.management_ip
357
@property
def vm_management_ip(self):
    """management_ip from the VM response, or None while the VDU is inactive."""
    return self._vm_resp.management_ip if self.active else None
363
@property
def operational_status(self):
    """Operational-status string for the current state of this VDU.

    The lookup is keyed by the state's name; note that both TERMINATING
    and TERMINATED are reported as "terminated".
    """
    status_by_state = {
        "INIT": "init",
        "INSTANTIATING": "vm_init_phase",
        "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
        "READY": "running",
        "FAILED": "failed",
        "TERMINATING": "terminated",
        "TERMINATED": "terminated",
    }
    state_name = self._state.name
    return status_by_state[state_name]
376
@property
def msg(self):
    """Build the VDUR message describing the current state of this VDU.

    The message combines selected descriptor fields, this record's id and
    operational status, VIM details from the VM response (when one has
    been received), the resolved internal/external interfaces and the
    placement groups.

    Side effect: for every external interface the owning VNFR's
    connection point is refreshed via ``update_cp``.

    Returns:
        RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur
    """
    vdu_fields = ["vm_flavor",
                  "guest_epa",
                  "vswitch_epa",
                  "hypervisor_epa",
                  "host_epa",
                  "volumes",
                  "name"]
    vdu_copy_dict = {k: v for k, v in
                     self._vdud.as_dict().items() if k in vdu_fields}
    vdur_dict = {"id": self._vdur_id,
                 "vdu_id_ref": self._vdud.id,
                 "operational_status": self.operational_status,
                 "operational_status_details": self._state_failed_reason,
                 }
    vm_resp = self._vm_resp
    if vm_resp is not None:
        vdur_dict.update({"vim_id": vm_resp.vdu_id,
                          "flavor_id": vm_resp.flavor_id
                          })
        if vm_resp.has_field('image_id'):
            vdur_dict.update({"image_id": vm_resp.image_id})

    if self.management_ip is not None:
        vdur_dict["management_ip"] = self.management_ip

    if self.vm_management_ip is not None:
        vdur_dict["vm_management_ip"] = self.vm_management_ip

    vdur_dict.update(vdu_copy_dict)

    if vm_resp is not None and vm_resp.has_field('volumes'):
        # The descriptor copy only has a "volumes" entry when the
        # descriptor dict contained one, so look it up defensively
        # instead of risking a KeyError.
        descriptor_volumes = vdur_dict.get('volumes', [])
        for opvolume in vm_resp.volumes:
            vdurvol_data = [vduvol for vduvol in descriptor_volumes
                            if vduvol['name'] == opvolume.name]
            # Only annotate when the name identifies exactly one volume.
            if len(vdurvol_data) == 1:
                vdurvol_data[0]["volume_id"] = opvolume.volume_id

    icp_list = []
    ii_list = []

    for intf, cp_id, vlr in self._int_intf:
        cp = self.find_internal_cp_by_cp_id(cp_id)

        icp_list.append({"name": cp.name,
                         "id": cp.id,
                         "type_yang": "VPORT",
                         "ip_address": self.cp_ip_addr(cp.name)})

        ii_list.append({"name": intf.name,
                        "vdur_internal_connection_point_ref": cp.id,
                        "virtual_interface": {}})

    vdur_dict["internal_connection_point"] = icp_list
    self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
    vdur_dict["internal_interface"] = ii_list

    ei_list = []
    for intf, cp, vlr in self._ext_intf:
        ei_list.append({"name": cp.name,
                        "vnfd_connection_point_ref": cp.name,
                        "virtual_interface": {}})
        self._vnfr.update_cp(cp.name, self.cp_ip_addr(cp.name), self.cp_id(cp.name))

    vdur_dict["external_interface"] = ei_list

    vdur_dict['placement_groups_info'] = [group.as_dict()
                                          for group in self._placement_groups]

    return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
450
@property
def resmgr_path(self):
    """Resource-manager keyspec of the VDU event keyed by this request id."""
    event_key = "/vdu-event-data[event-id='{}']".format(self._request_id)
    return "D,/rw-resource-mgr:resource-mgmt" + "/vdu-event" + event_key
457
@property
def vm_flavor_msg(self):
    """Return a copy of the descriptor's VM flavor message.

    A new object of the flavor's own class is created and filled with
    ``copy_from``, so the descriptor's flavor itself is not returned.
    """
    source = self._vdud.vm_flavor
    clone = source.__class__()
    clone.copy_from(self._vdud.vm_flavor)
    return clone
465
@property
def vdud_cloud_init(self):
    """Cloud-init contents for this VDU, computed on first use.

    The result of ``cloud_init()`` is stored and reused for as long as it
    is not None.
    """
    cached = self._vdud_cloud_init
    if cached is None:
        cached = self._vdud_cloud_init = self.cloud_init()
    return cached
473
def cloud_init(self):
    """Return the cloud-config script for this VDU.

    The inline ``cloud_init`` of the descriptor wins. Otherwise, when a
    ``cloud_init_file`` is named, the script is read from the stored VNFD
    package. None is returned when neither is provided.

    Raises:
        VirtualDeploymentUnitRecordError: the script could not be
            extracted from the package.
    """
    descriptor = self._vdud

    if descriptor.cloud_init is not None:
        self._log.debug("cloud_init script provided inline %s", descriptor.cloud_init)
        return descriptor.cloud_init

    if descriptor.cloud_init_file is None:
        self._log.debug("VDU Instantiation: cloud-init script not provided")
        return None

    # Get cloud-init script contents from the file named in cloud_init_file.
    self._log.debug("cloud_init script provided in file %s", descriptor.cloud_init_file)
    filename = descriptor.cloud_init_file
    self._vnfd_package_store.refresh()
    stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
    extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
    try:
        return extractor.read_script(stored_package, filename)
    except rift.package.cloud_init.CloudInitExtractionError as e:
        raise VirtualDeploymentUnitRecordError(e)
494
def process_openstack_placement_group_construct(self, vm_create_msg_dict):
    """Add OpenStack placement constraints to the VM create request.

    Host aggregates, availability zones and server groups are collected
    from this VDU's placement groups and written into
    ``vm_create_msg_dict`` under 'host_aggregate', 'availability_zone' and
    'server_group'. Keys are only added when something was collected.

    Arguments:
        vm_create_msg_dict: request dict, updated in place

    Raises:
        VNFMPlacementGroupError: more than one availability zone or more
            than one server group was requested.
    """
    host_aggregates = []
    availability_zones = []
    server_groups = []
    for group in self._placement_groups:
        if group.has_field('host_aggregate'):
            host_aggregates.extend(aggregate.as_dict()
                                   for aggregate in group.host_aggregate)
        if group.has_field('availability_zone'):
            availability_zones.append(group.availability_zone.as_dict())
        if group.has_field('server_group'):
            server_groups.append(group.server_group.as_dict())

    if len(availability_zones) > 1:
        self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
        # Include the offending zones in the exception text as well.
        raise VNFMPlacementGroupError(
            "Can not launch VDU: {} in multiple availability zones. Requested Zones: {}".format(
                self.name, availability_zones))
    if availability_zones:
        vm_create_msg_dict['availability_zone'] = availability_zones[0]

    if len(server_groups) > 1:
        self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
        raise VNFMPlacementGroupError(
            "Can not launch VDU: {} in multiple Server Groups. Requested Groups: {}".format(
                self.name, server_groups))
    if server_groups:
        vm_create_msg_dict['server_group'] = server_groups[0]

    if host_aggregates:
        vm_create_msg_dict['host_aggregate'] = host_aggregates

    return
526
def process_placement_groups(self, vm_create_msg_dict):
    """Process the placement groups and fill in the resource-mgr request.

    Nothing happens without placement groups. All groups must share one
    cloud type; only 'openstack' is processed, any other type is logged
    and ignored.

    Arguments:
        vm_create_msg_dict: request dict, updated in place
    """
    groups = self._placement_groups
    if not groups:
        return

    cloud_set = {group.cloud_type for group in groups}
    assert len(cloud_set) == 1
    cloud_type = cloud_set.pop()

    if cloud_type != 'openstack':
        self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
        return

    self.process_openstack_placement_group_construct(vm_create_msg_dict)
    return
542
def resmgr_msg(self, config=None):
    """Build the resource-manager request that creates this VDU's VM.

    The request carries the VM name, image details, selected descriptor
    fields, one connection point per resolved external and internal
    interface, placement constraints and the descriptor's volumes.

    Arguments:
        config: optional user data passed as ``vdu_init.userdata``

    Returns:
        RwResourceMgrYang.VDUEventData keyed by this record's request id.
    """
    vdu_fields = ["vm_flavor",
                  "guest_epa",
                  "vswitch_epa",
                  "hypervisor_epa",
                  "host_epa"]

    self._log.debug("Creating params based on VDUD: %s", self._vdud)
    vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}

    vm_create_msg_dict = {
        "name": self.name,
    }

    if self.image_name is not None:
        vm_create_msg_dict["image_name"] = self.image_name

    if self.image_checksum is not None:
        vm_create_msg_dict["image_checksum"] = self.image_checksum

    vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
    if self._vdud.has_field('mgmt_vpci'):
        vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci

    self._log.debug("VDUD: %s", self._vdud)
    if config is not None:
        vm_create_msg_dict['vdu_init'] = {'userdata': config}

    if self._mgmt_network:
        vm_create_msg_dict['mgmt_network'] = self._mgmt_network

    cp_list = []
    for intf, cp, vlr in self._ext_intf:
        cp_info = {"name": cp.name,
                   "virtual_link_id": vlr.network_id,
                   "type_yang": intf.virtual_interface.type_yang}

        # Not every connection point type exposes static_ip_address.
        try:
            if cp.static_ip_address:
                cp_info["static_ip_address"] = cp.static_ip_address
        except AttributeError:
            pass

        if (intf.virtual_interface.has_field('vpci') and
                intf.virtual_interface.vpci is not None):
            cp_info["vpci"] = intf.virtual_interface.vpci

        if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
            cp_info['security_group'] = vlr.ip_profile_params.security_group

        self._log.debug("External CP info %s", cp_info)
        cp_list.append(cp_info)

    for intf, cp_id, vlr in self._int_intf:
        cp = self.find_internal_cp_by_cp_id(cp_id)

        cp_dict = {"name": cp_id,
                   "virtual_link_id": vlr.network_id,
                   "type_yang": intf.virtual_interface.type_yang}

        if (intf.virtual_interface.has_field('vpci') and
                intf.virtual_interface.vpci is not None):
            cp_dict["vpci"] = intf.virtual_interface.vpci

        try:
            if cp.static_ip_address:
                cp_dict["static_ip_address"] = cp.static_ip_address
        except AttributeError:
            pass

        # Log the internal CP itself. The external loop's cp_info is stale
        # here, and unbound when there are no external interfaces.
        self._log.debug("Internal CP info %s", cp_dict)
        cp_list.append(cp_dict)

    vm_create_msg_dict["connection_points"] = cp_list
    vm_create_msg_dict.update(vdu_copy_dict)

    self.process_placement_groups(vm_create_msg_dict)

    msg = RwResourceMgrYang.VDUEventData()
    msg.event_id = self._request_id
    msg.cloud_account = self.cloud_account_name
    msg.request_info.from_dict(vm_create_msg_dict)

    for volume in self._vdud.volumes:
        v = msg.request_info.volumes.add()
        v.from_dict(volume.as_dict())
    return msg
630
@asyncio.coroutine
def terminate(self, xact):
    """Delete this VDU's resource in the VIM and drop its registrations.

    The request is ignored unless the record is READY or FAILED. Errors
    while deleting the resource are logged and do not stop the cleanup of
    the registration handles.

    Arguments:
        xact: not used; the delete runs in its own new transaction
    """
    if self._state not in (VDURecordState.READY, VDURecordState.FAILED):
        self._log.warning("VDU terminate in not ready state - Ignoring request")
        return

    self._state = VDURecordState.TERMINATING
    if self._vm_resp is not None:
        try:
            with self._dts.transaction() as new_xact:
                yield from self.delete_resource(new_xact)
        except Exception:
            # Best effort: keep going so the handles below are released.
            self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)

    if self._rm_regh is not None:
        self._log.debug("Deregistering resource manager registration handle")
        self._rm_regh.deregister()
        self._rm_regh = None

    if self._vdur_console_handler is not None:
        # The console handler is registered by a separate call
        # (vdu_opdata_register), so its handle may not exist yet.
        console_regh = self._vdur_console_handler._regh
        if console_regh is not None:
            self._log.debug("Deregistering vnfr vdur registration handle")
            console_regh.deregister()
            self._vdur_console_handler._regh = None

    self._state = VDURecordState.TERMINATED
657
def find_internal_cp_by_cp_id(self, cp_id):
    """Return the descriptor's internal connection point with id ``cp_id``.

    Raises:
        VduRecordError: no internal connection point has that id.
    """
    self._log.debug("find_internal_cp_by_cp_id(%s) called",
                    cp_id)

    for candidate in self._vdud.internal_connection_point:
        self._log.debug("Checking for int cp %s in internal connection points",
                        candidate.id)
        if candidate.id == cp_id:
            # First match wins.
            return candidate

    self._log.debug("Failed to find cp %s in internal connection points",
                    cp_id)
    msg = "Failed to find cp %s in internal connection points" % cp_id
    raise VduRecordError(msg)
680
@asyncio.coroutine
def create_resource(self, xact, vnfr, config=None):
    """Request this VDU's VM from the resource manager.

    External and internal interfaces of the descriptor are first resolved
    to (interface, connection point, VLR) tuples and appended to this
    record; the create request is then executed in a block of ``xact``.

    Arguments:
        xact: transaction in which the create block is executed
        vnfr: VNF record used to resolve connection points and VLRs
        config: optional user data forwarded to ``resmgr_msg``

    Returns:
        The ``resource_info`` of the resource manager's response.

    Raises:
        VduRecordError: an internal connection point or its VLR could not
            be resolved.
        VMResourceError: no response with a resource state was received.
    """
    def find_cp_by_name(cp_name):
        """ Find a connection point by name """
        cp = None
        self._log.debug("find_cp_by_name(%s) called", cp_name)
        for ext_cp in vnfr._cprs:
            self._log.debug("Checking ext cp (%s) called", ext_cp.name)
            if ext_cp.name == cp_name:
                cp = ext_cp
                break
        if cp is None:
            self._log.debug("Failed to find cp %s in external connection points",
                            cp_name)
        return cp

    def find_internal_vlr_by_cp_id(cp_id):
        """ Find the VLR for an internal connection point id """
        self._log.debug("find_internal_vlr_by_cp_id(%s) called",
                        cp_id)

        # Validate the cp; raises when the id is unknown.
        self.find_internal_cp_by_cp_id(cp_id)

        # return the VLR associated with the connection point
        return vnfr.find_vlr_by_cp(cp_id)

    block = xact.block_create()

    self._log.debug("Executing vm request id: %s, action: create",
                    self._request_id)

    # Resolve the networks associated external interfaces
    for ext_intf in self._vdud.external_interface:
        self._log.debug("Resolving external interface name [%s], cp[%s]",
                        ext_intf.name, ext_intf.vnfd_connection_point_ref)
        cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
        if cp is None:
            self._log.debug("Failed to find connection point - %s",
                            ext_intf.vnfd_connection_point_ref)
            continue
        self._log.debug("Connection point name [%s], type[%s]",
                        cp.name, cp.type_yang)

        vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)

        etuple = (ext_intf, cp, vlr)
        self._ext_intf.append(etuple)

        self._log.debug("Created external interface tuple : %s", etuple)

    # Resolve the networks associated internal interfaces
    for intf in self._vdud.internal_interface:
        cp_id = intf.vdu_internal_connection_point_ref
        self._log.debug("Resolving internal interface name [%s], cp[%s]",
                        intf.name, cp_id)

        try:
            vlr = find_internal_vlr_by_cp_id(cp_id)
        except Exception as e:
            self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
            msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
            raise VduRecordError(msg)

        ituple = (intf, cp_id, vlr)
        self._int_intf.append(ituple)

        self._log.debug("Created internal interface tuple : %s", ituple)

    resmgr_path = self.resmgr_path
    resmgr_msg = self.resmgr_msg(config)

    self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
    block.add_query_create(resmgr_path, resmgr_msg)

    res_iter = yield from block.execute(now=True)

    resp = None

    # Keep the last result produced by the block.
    for i in res_iter:
        r = yield from i
        resp = r.result

    if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
        # Format the message here; exceptions do not interpolate
        # logging-style arguments.
        raise VMResourceError("Did not get a vm resource response (resp: %s)" % (resp,))
    self._log.debug("Got vm request response: %s", resp.resource_info)
    return resp.resource_info
768
@asyncio.coroutine
def delete_resource(self, xact):
    """Issue a delete for this VDU's resource-manager entry.

    Arguments:
        xact: transaction in which the delete block is executed
    """
    delete_block = xact.block_create()

    self._log.debug("Executing vm request id: %s, action: delete",
                    self._request_id)

    delete_block.add_query_delete(self.resmgr_path)
    yield from delete_block.execute(flags=0, now=True)
779
@asyncio.coroutine
def read_resource(self, xact):
    """Read this VDU's resource info back from the resource manager.

    Arguments:
        xact: transaction in which the read block is executed

    Returns:
        The ``resource_info`` of the last result returned by the read.

    Raises:
        VMResourceError: no response with a resource state was received.
    """
    block = xact.block_create()

    self._log.debug("Executing vm request id: %s, action: read",
                    self._request_id)

    block.add_query_read(self.resmgr_path)

    res_iter = yield from block.execute(flags=0, now=True)

    # Stays None when the read yields no results, so the check below
    # raises VMResourceError instead of an unbound-variable error.
    resp = None
    for i in res_iter:
        r = yield from i
        resp = r.result

    if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
        raise VMResourceError("Did not get a vm resource response (resp: %s)" % (resp,))
    self._log.debug("Got vm request response: %s", resp.resource_info)
    return resp.resource_info
799
800
@asyncio.coroutine
def start_component(self):
    """Ask the VNFR to start this VDU's VCS component.

    The component referenced by the descriptor is started with the
    management IP taken from the VM response.
    """
    component_ref = self._vdud.vcs_component_ref
    self._log.debug("Starting component %s for vdud %s vdur %s",
                    component_ref,
                    self._vdud,
                    self._vdur_id)
    yield from self._vnfr.start_component(self._vdud.vcs_component_ref,
                                          self.vm_resp.management_ip)
810
@property
def active(self):
    """True when this VDU is in the READY state, False otherwise."""
    return self._state is VDURecordState.READY
815
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """Mark this VDU as FAILED and propagate the failure to the VNFR.

    Arguments:
        failed_reason: stored as the failure detail and passed on to the
            VNFR's ``instantiation_failed``
    """
    self._log.debug("VDU %s instantiation failed ", self._vdur_id)
    self._state, self._state_failed_reason = VDURecordState.FAILED, failed_reason
    yield from self._vnfr.instantiation_failed(failed_reason)
823
@asyncio.coroutine
def vdu_is_active(self):
    """Move this VDU to READY and notify the VNFR when all VDUs are active.

    A VDU that is already active only produces a warning. When the
    descriptor references a VCS component, that component is started
    before the state changes to READY.
    """
    if self.active:
        self._log.warning("VDU %s was already marked as active", self._vdur_id)
        return

    self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)

    component_ref = self._vdud.vcs_component_ref
    if component_ref is not None:
        yield from self.start_component()

    self._state = VDURecordState.READY

    if not self._vnfr.all_vdus_active():
        return

    self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
    yield from self._vnfr.is_ready()
841
@asyncio.coroutine
def instantiate(self, xact, vnfr, config=None):
    """Instantiate this VDU through the resource manager.

    A subscription on this VDU's resource-info path is registered first,
    so later state changes published by the resource manager are seen.
    Then the VM is requested. An "active" response makes the VDU READY,
    "pending"/"inactive" leaves it waiting for the subscription, anything
    else is a failure. Any exception marks the VDU (and, through
    ``instantiation_failed``, the VNFR) as failed instead of propagating.

    Arguments:
        xact: transaction used for the create request
        vnfr: owning VNF record
        config: optional user data forwarded to the create request
    """
    self._state = VDURecordState.INSTANTIATING

    @asyncio.coroutine
    def on_prepare(xact_info, query_action, ks_path, msg):
        """ Handle resource-info updates published for this VDUR """
        self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
                        query_action,
                        ks_path,
                        msg)

        if (query_action == rwdts.QueryAction.UPDATE or
                query_action == rwdts.QueryAction.CREATE):
            self._vm_resp = msg

            if msg.resource_state == "active":
                # Move this VDU to ready state
                yield from self.vdu_is_active()
            elif msg.resource_state == "failed":
                yield from self.instantiation_failed(msg.resource_errors)
        elif query_action == rwdts.QueryAction.DELETE:
            self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
        else:
            # Format here; exceptions do not interpolate logging-style args.
            raise NotImplementedError(
                "%s action on VirtualDeployementUnitRecord not supported" %
                (query_action,))

        xact_info.respond_xpath(rwdts.XactRspCode.ACK)

    try:
        reg_event = asyncio.Event(loop=self._loop)

        @asyncio.coroutine
        def on_ready(regh, status):
            reg_event.set()

        handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
        self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
                                                      flags=rwdts.Flag.SUBSCRIBER,
                                                      handler=handler)
        # Do not request the VM before the subscription is ready.
        yield from reg_event.wait()

        vm_resp = yield from self.create_resource(xact, vnfr, config)
        self._vm_resp = vm_resp

        self._state = VDURecordState.RESOURCE_ALLOC_PENDING
        self._log.debug("Requested VM from resource manager response %s",
                        vm_resp)
        if vm_resp.resource_state == "active":
            self._log.debug("Resourcemgr responded wih an active vm resp %s",
                            vm_resp)
            yield from self.vdu_is_active()
            self._state = VDURecordState.READY
        elif (vm_resp.resource_state == "pending" or
              vm_resp.resource_state == "inactive"):
            # Nothing more to do here: the subscription registered above
            # delivers the eventual state change to on_prepare.
            self._log.debug("Resourcemgr responded wih a pending vm resp %s",
                            vm_resp)
        else:
            self._log.debug("Resourcemgr responded wih an error vm resp %s",
                            vm_resp)
            raise VirtualDeploymentUnitRecordError(
                "Failed VDUR instantiation %s " % vm_resp)

    except Exception as e:
        # log.exception already records the traceback.
        self._log.exception(e)
        self._log.error("Instantiation of VDU record failed: %s", str(e))
        self._state = VDURecordState.FAILED
        yield from self.instantiation_failed(str(e))
918
919
class VlRecordState(enum.Enum):
    """Lifecycle states of a virtual link record."""
    # Bring-up
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    # Tear-down
    TERMINATE_PENDING = 104
    TERMINATED = 105
    # Error
    FAILED = 106
928
929
930 class InternalVirtualLinkRecord(object):
931 """ Internal Virtual Link record """
def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name):
    """Create the record for one internal virtual link of a VNFR.

    Arguments:
        dts, log, loop: DTS handle, logger and event loop
        ivld_msg: internal virtual link descriptor message
        vnfr_name: name of the owning VNFR, used as prefix of the VL name
        cloud_account_name: cloud account placed in the VLR request
    """
    # Infrastructure handles.
    self._dts, self._log, self._loop = dts, log, loop

    # Inputs needed by create_vlr() below.
    self._ivld_msg = ivld_msg
    self._vnfr_name = vnfr_name
    self._cloud_account_name = cloud_account_name

    # Request to be sent on instantiate; the resulting VLR is unknown yet.
    self._vlr_req = self.create_vlr()
    self._vlr = None
    self._state = VlRecordState.INIT
943
@property
def vlr_id(self):
    """Id of the VLR request created for this virtual link."""
    request = self._vlr_req
    return request.id
948
@property
def name(self):
    """Name of this VL: "<vnfr name>.<descriptor name>"."""
    return "".join((self._vnfr_name, ".", self._ivld_msg.name))
953
@property
def network_id(self):
    """Network id of the stored VLR, or None while no VLR is stored."""
    if not self._vlr:
        return None
    return self._vlr.network_id
958
def vlr_path(self):
    """Keyspec of this VLR instance in the VLR catalog."""
    template = "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']"
    return template.format(self.vlr_id)
962
def create_vlr(self):
    """Create the VLR record that will be instantiated.

    The record gets a fresh UUID, this VL's name and cloud account, plus
    a fixed set of fields copied from the descriptor when present.

    Returns:
        RwVlrYang.YangData_Vlr_VlrCatalog_Vlr
    """
    copied_fields = ("short_name",
                     "vendor",
                     "description",
                     "version",
                     "type_yang",
                     "provider_network")

    descriptor = self._ivld_msg.as_dict()

    vlr_dict = {"id": str(uuid.uuid4()),
                "name": self.name,
                "cloud_account": self._cloud_account_name,
                }
    # Descriptor values override the defaults above on a key clash.
    for key, value in descriptor.items():
        if key in copied_fields:
            vlr_dict[key] = value

    return RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
983
@asyncio.coroutine
def instantiate(self, xact, restart_mode=False):
    """Instantiate this virtual link.

    Normally the VLR is created through DTS. In restart mode the existing
    VLR is read back instead and stored on this record, so that
    ``network_id`` is available after a restart as well.

    Arguments:
        xact: not used; creation runs in its own transaction
        restart_mode: read the existing VLR instead of creating one

    Raises:
        VnfrInstantiationFailed: the created VLR reports 'failed'.
        InternalVirtualLinkRecordError: restart mode and no VLR was found.
    """

    @asyncio.coroutine
    def instantiate_vlr():
        """ Instantiate VLR"""
        self._log.debug("Create VL with xpath %s and vlr %s",
                        self.vlr_path(), self._vlr_req)

        # A distinct name keeps the outer xact parameter unshadowed.
        with self._dts.transaction(flags=0) as create_xact:
            block = create_xact.block_create()
            block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
            self._log.debug("Executing VL create path:%s msg:%s",
                            self.vlr_path(), self._vlr_req)

            res_iter = None
            try:
                res_iter = yield from block.execute()
            except Exception:
                self._state = VlRecordState.FAILED
                self._log.exception("Caught exception while instantial VL")
                raise

            for ent in res_iter:
                res = yield from ent
                self._vlr = res.result

            if self._vlr.operational_status == 'failed':
                self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
                self._state = VlRecordState.FAILED
                raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))

        self._log.info("Created VL with xpath %s and vlr %s",
                       self.vlr_path(), self._vlr)

    @asyncio.coroutine
    def get_vlr():
        """ Read the existing VLR; raises when none is found """
        res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
        vlr = None
        for ent in res_iter:
            res = yield from ent
            vlr = res.result

        if vlr is None:
            err = "Failed to get VLR for path %s" % self.vlr_path()
            self._log.warning(err)
            raise InternalVirtualLinkRecordError(err)
        return vlr

    self._state = VlRecordState.INSTANTIATION_PENDING

    if restart_mode:
        # get_vlr() never returns None (it raises instead), so there is
        # no fall-back creation here. Keep the VLR that was read so that
        # network_id works after a restart.
        self._vlr = yield from get_vlr()
    else:
        yield from instantiate_vlr()

    self._state = VlRecordState.ACTIVE
1045
def vlr_in_vns(self):
    """ Is there a VLR record in VNS

    True while the record state is ACTIVE, INSTANTIATION_PENDING or FAILED.
    """
    states_with_vlr = (
        VlRecordState.ACTIVE,
        VlRecordState.INSTANTIATION_PENDING,
        VlRecordState.FAILED,
    )
    return self._state in states_with_vlr
1054
@asyncio.coroutine
def terminate(self, xact):
    """ Terminate this VL

    Issues a delete query for the VLR path on the given transaction, moving
    the state through TERMINATE_PENDING to TERMINATED. The request is
    ignored when vlr_in_vns() reports no VLR.
    """
    if not self.vlr_in_vns():
        self._log.debug("Ignoring terminate request for id %s in state %s",
                        self.vlr_id, self._state)
        return

    path = self.vlr_path()
    self._log.debug("Terminating VL with path %s", path)

    self._state = VlRecordState.TERMINATE_PENDING
    delete_block = xact.block_create()
    delete_block.add_query_delete(path)
    yield from delete_block.execute(flags=0, now=True)
    self._state = VlRecordState.TERMINATED

    self._log.debug("Terminated VL with path %s", path)
1070
1071
1072 class VirtualNetworkFunctionRecord(object):
1073 """ Virtual Network Function Record """
def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
    # Handles and collaborators supplied by the caller
    self._dts = dts
    self._log = log
    self._loop = loop
    self._cluster_name = cluster_name
    self._vnfm = vnfm
    self._vcs_handler = vcs_handler
    self._mgmt_network = mgmt_network

    # The incoming VNFR message is kept under two names, and the
    # identifiers and descriptor are taken from it.
    self._vnfr_msg = vnfr_msg
    self._vnfr = vnfr_msg
    self._vnfr_id = vnfr_msg.id
    self._vnfd = vnfr_msg.vnfd
    self._vnfd_id = vnfr_msg.vnfd.id
    self._config_status = vnfr_msg.config_status

    # Lifecycle state
    self._state = VirtualNetworkFunctionRecordState.INIT
    self._state_failed_reason = None
    self._create_time = int(time.time())

    # Child records and lookup tables
    self._ext_vlrs = {}  # The list of external virtual links
    self._vlrs = []  # The list of internal virtual links
    self._vdus = []  # The list of vdu
    self._vlr_by_cp = {}
    self._cprs = []
    self._inventory = {}

    self._vnf_mon = None
    self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
    self._rw_vnfd = None
    self._vnfd_ref_count = 0
1102
def _get_vdur_from_vdu_id(self, vdu_id):
    """ Find the VDU record whose vdu_id matches the given id

    Arguments:
        vdu_id - the VDU id to look for in this VNFR's VDU records

    Returns:
        The first matching record from self._vdus.

    Raises:
        VDURecordNotFound - no record has that vdu_id.
    """
    self._log.debug("Finding vdur for vdu_id %s", vdu_id)
    self._log.debug("Searching through vdus: %s", self._vdus)
    for vdu in self._vdus:
        self._log.debug("vdu_id: %s", vdu.vdu_id)
        if vdu.vdu_id == vdu_id:
            return vdu

    # Format the message here: exceptions do not interpolate extra args the
    # way logging calls do.
    raise VDURecordNotFound("Could not find vdu record from id: %s" % vdu_id)
1112
@property
def operational_status(self):
    """ Operational status string for the current state of this VNFR

    Looks the status up by the name of the state; a state name missing from
    the table raises KeyError.
    """
    state_name = self._state.name
    status_by_state = dict((
        ("INIT", "init"),
        ("VL_INIT_PHASE", "vl_init_phase"),
        ("VM_INIT_PHASE", "vm_init_phase"),
        ("READY", "running"),
        ("TERMINATE", "terminate"),
        ("VL_TERMINATE_PHASE", "vl_terminate_phase"),
        ("VDU_TERMINATE_PHASE", "vm_terminate_phase"),
        ("TERMINATED", "terminated"),
        ("FAILED", "failed"),
    ))
    return status_by_state[state_name]
1126
@staticmethod
def vnfd_xpath(vnfd_id):
    """ Config xpath of the VNFD catalog entry with the given id """
    template = "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{vnfd_id}']"
    return template.format(vnfd_id=vnfd_id)
1131
@property
def vnfd_ref_count(self):
    """ Current value of the VNFD reference counter kept on this VNFR

    The counter is changed by vnfd_ref() and vnfd_unref().
    """
    return self._vnfd_ref_count
1136
def vnfd_in_use(self):
    """ True when at least one reference is held on the VNFD """
    return self._vnfd_ref_count > 0
1140
def vnfd_ref(self):
    """ Take one reference on the VNFD and return the new count """
    updated_count = self._vnfd_ref_count + 1
    self._vnfd_ref_count = updated_count
    return updated_count
1145
def vnfd_unref(self):
    """ Release one reference on the VNFD and return the new count

    Raises:
        VnfRecordError - the counter is already below one.
    """
    current = self._vnfd_ref_count
    if current >= 1:
        self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
                        self.vnfd.id, current)
        self._vnfd_ref_count = current - 1
        return self._vnfd_ref_count

    # Releasing a reference that was never taken is a caller error.
    msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
           (self.vnfd.id, current))
    self._log.critical(msg)
    raise VnfRecordError(msg)
1157
@property
def vnfd(self):
    """ VNFD for this VNFR (the descriptor taken from the VNFR message) """
    return self._vnfd
1162
@property
def vnf_name(self):
    """ Name of the VNFD associated with this VNFR """
    return self.vnfd.name
1167
@property
def name(self):
    """ Name of this VNF, as carried in the VNFR message """
    return self._vnfr.name
1172
@property
def cloud_account_name(self):
    """ Name of the cloud account this VNFR is instantiated in

    Read from the cloud_account field of the VNFR message.
    """
    return self._vnfr.cloud_account
1177
@property
def vnfd_id(self):
    """ Id of the VNFD associated with this VNFR """
    return self.vnfd.id
1182
@property
def vnfr_id(self):
    """ Id of this VNFR, captured from the VNFR message at construction """
    return self._vnfr_id
1187
@property
def member_vnf_index(self):
    """ Member VNF index associated with this VNFR

    Read from the member_vnf_index_ref field of the VNFR message.
    """
    return self._vnfr.member_vnf_index_ref
1192
@property
def config_status(self):
    """ Config status for this VNFR

    Initialised from the config_status field of the VNFR message.
    """
    return self._config_status
1197
def component_by_name(self, component_name):
    """ Look up an inventory component by its descriptor name

    The inventory is keyed by the mangled name, so the same mangling is
    applied here. A missing component raises KeyError.
    """
    inventory_key = VcsComponent.mangle_name(component_name,
                                             self.vnf_name,
                                             self.vnfd_id)
    return self._inventory[inventory_key]
1204
1205
1206
@asyncio.coroutine
def get_nsr_config(self):
    """ Fetch the NS instance configuration entry for this VNFR's NSR

    Returns:
        The nsr entry whose id equals the VNFR message's nsr_id_ref, or
        None when no entry matches.
    """
    ### Need access to NS instance configuration for runtime resolution.
    ### This shall be replaced when deployment flavors are implemented
    responses = yield from self._dts.query_read("C,/nsr:ns-instance-config",
                                                rwdts.XactFlag.MERGE)

    for response in responses:
        reply = yield from response
        for candidate in reply.result.nsr:
            if candidate.id == self._vnfr_msg.nsr_id_ref:
                return candidate
    return None
1221
@asyncio.coroutine
def start_component(self, component_name, ip_addr):
    """ Start the named inventory component on the given ip address

    The component is resolved with component_by_name() and started with
    its first two start() arguments left as None.
    """
    component = self.component_by_name(component_name)
    yield from component.start(None, None, ip_addr)
1227
def cp_ip_addr(self, cp_name):
    """ Get the ip address recorded for a connection point

    Returns the address of the first connection point record with that name
    and a non-None address, or "0.0.0.0" when there is none.
    """
    self._log.debug("cp_ip_addr()")
    known_addresses = (cpr.ip_address
                       for cpr in self._cprs
                       if cpr.name == cp_name and cpr.ip_address is not None)
    return next(known_addresses, "0.0.0.0")
1235
def mgmt_intf_info(self):
    """ Get Management interface info for this VNFR

    The address comes from, in order of preference: the connection point
    named by the descriptor, the management ip of the VDU named by the
    descriptor (None when that VDU record is not found), or the
    descriptor's own ip_address field.

    Returns:
        (ip_address, port) where port is the descriptor's port field.
    """
    descriptor = self.vnfd.mgmt_interface

    if descriptor.has_field("cp"):
        address = self.cp_ip_addr(descriptor.cp)
    elif descriptor.has_field("vdu_id"):
        try:
            address = self._get_vdur_from_vdu_id(descriptor.vdu_id).management_ip
        except VDURecordNotFound:
            self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
            address = None
    else:
        address = descriptor.ip_address

    return address, descriptor.port
1254
@property
def msg(self):
    """ Message associated with this VNFR

    Builds a new RwVnfrYang VNFR message on every access from the current
    state of this record: identifiers and status, a copy of the VNFD, the
    management interface, internal VLR references, VDU records, connection
    points, monitoring params, VNF configuration and placement groups.
    The uptime field is computed from the current time on each access.
    """
    # Descriptive fields copied verbatim from the VNFD into the VNFR
    vnfd_fields = ["short_name", "vendor", "description", "version"]
    vnfd_copy_dict = {k: v for k, v in self.vnfd.as_dict().items() if k in vnfd_fields}

    mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
    ip_address, port = self.mgmt_intf_info()

    # Only set the fields that are actually known
    if ip_address is not None:
        mgmt_intf.ip_address = ip_address
    if port is not None:
        mgmt_intf.port = port

    vnfr_dict = {"id": self._vnfr_id,
                 "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
                 "name": self.name,
                 "member_vnf_index_ref": self.member_vnf_index,
                 "operational_status": self.operational_status,
                 "operational_status_details": self._state_failed_reason,
                 "cloud_account": self.cloud_account_name,
                 "config_status": self._config_status
                 }

    # VNFD-derived fields are applied on top of the values above
    vnfr_dict.update(vnfd_copy_dict)

    vnfr_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
    vnfr_msg.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())

    vnfr_msg.create_time = self._create_time
    vnfr_msg.uptime = int(time.time()) - self._create_time
    vnfr_msg.mgmt_interface = mgmt_intf

    # Add all the VLRs to VNFR
    for vlr in self._vlrs:
        ivlr = vnfr_msg.internal_vlr.add()
        ivlr.vlr_ref = vlr.vlr_id

    # Add all the VDURs to VDUR
    if self._vdus is not None:
        for vdu in self._vdus:
            vdur = vnfr_msg.vdur.add()
            vdur.from_dict(vdu.msg.as_dict())

    # The dashboard url is only published when the descriptor carries
    # dashboard parameters
    if self.vnfd.mgmt_interface.has_field('dashboard_params'):
        vnfr_msg.dashboard_url = self.dashboard_url

    for cpr in self._cprs:
        new_cp = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
        vnfr_msg.connection_point.append(new_cp)

    if self._vnf_mon is not None:
        for monp in self._vnf_mon.msg:
            vnfr_msg.monitoring_param.append(
                VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))

    if self._vnfr.vnf_configuration is not None:
        vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
        # Fill in the management address for config access only when the
        # configuration does not already provide one
        if (ip_address is not None and
                vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None):
            vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address

    for group in self._vnfr_msg.placement_groups_info:
        group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
        group_info.from_dict(group.as_dict())
        vnfr_msg.placement_groups_info.append(group_info)

    return vnfr_msg
1323
@property
def dashboard_url(self):
    """ Dashboard URL built from the descriptor's dashboard parameters

    The scheme is https (default port 443) only when the 'https' field is
    present and is True; otherwise http (default port 80). An explicit
    'port' field overrides the default port. The host is the management
    address from mgmt_intf_info() and the path has leading slashes removed.
    """
    address, _ = self.mgmt_intf_info()
    params = self.vnfd.mgmt_interface.dashboard_params

    secure = params.has_field('https') and params.https is True
    if secure:
        scheme, port = 'https', 443
    else:
        scheme, port = 'http', 80

    if params.has_field('port'):
        port = params.port

    return "{protocol}://{ip_address}:{port}/{path}".format(
        protocol=scheme,
        ip_address=address,
        port=port,
        path=params.path.lstrip("/"),
    )
1344
@property
def xpath(self):
    """ Operational-data xpath of this VNFR in the VNFR catalog """
    template = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id='{}']"
    return template.format(self.vnfr_id)
1350
@asyncio.coroutine
def publish(self, xact):
    """ publish this VNFR

    Builds the VNFR message once and hands that same message to the VNFM
    for publishing at this record's xpath.

    Arguments:
        xact - transaction passed through to the VNFM's publish_vnfr();
               callers in this class also pass None.
    """
    # self.msg rebuilds the whole record on every access, so build it once
    # and publish exactly the message that was prepared and logged.
    vnfr = self.msg
    vnfr.create_time = self._create_time
    path = self.xpath

    self._log.debug("Publishing VNFR path = [%s], record = [%s]",
                    path, vnfr)
    yield from self._vnfm.publish_vnfr(xact, path, vnfr)
    self._log.debug("Published VNFR path = [%s], record = [%s]",
                    path, vnfr)
1361
@asyncio.coroutine
def create_vls(self):
    """ Create the internal virtual link records for this VNF

    One InternalVirtualLinkRecord is created per internal VLD in the VNFD
    and every internal connection point of that VLD is bound to it.

    Raises:
        InternalVirtualLinkRecordError - a connection point is referenced
        by more than one internal VLD.
    """
    self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)

    for ivld in self.vnfd.internal_vld:
        self._log.debug("Creating internal vld:"
                        " %s, int_cp_ref = %s",
                        ivld, ivld.internal_connection_point
                        )
        link_record = InternalVirtualLinkRecord(dts=self._dts,
                                                log=self._log,
                                                loop=self._loop,
                                                ivld_msg=ivld,
                                                vnfr_name=self.name,
                                                cloud_account_name=self.cloud_account_name
                                                )
        self._vlrs.append(link_record)

        for connection_point in ivld.internal_connection_point:
            cp_ref = connection_point.id_ref
            if cp_ref in self._vlr_by_cp:
                msg = ("Connection point %s already "
                       " bound %s" % (cp_ref, self._vlr_by_cp[cp_ref]))
                raise InternalVirtualLinkRecordError(msg)
            self._log.debug("Setting vlr %s to internal cp = %s",
                            link_record, cp_ref)
            self._vlr_by_cp[cp_ref] = link_record
1389
@asyncio.coroutine
def instantiate_vls(self, xact, restart_mode=False):
    """ Instantiate, one after another, the internal VLs of this VNF

    Arguments:
        xact - passed through to each link record's instantiate()
        restart_mode - passed through to each link record's instantiate()
    """
    self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)

    for link_record in self._vlrs:
        self._log.debug("Instantiating VLR %s", link_record)
        yield from link_record.instantiate(xact, restart_mode)
1399
def find_vlr_by_cp(self, cp_name):
    """ Find the VLR associated with the cp name

    Plain dictionary lookup in the connection-point map filled in by
    create_vls(); an unknown cp_name raises KeyError.
    """
    return self._vlr_by_cp[cp_name]
1403
def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
    """
    Returns the cloud specific construct for placement group
    Arguments:
        input_group: VNFD PlacementGroup
        nsr_config: Configuration for VNFDGroup MAP in the NSR config

    The first map entry that references both this placement group's name
    and this VNFD's id is used. Its fields, minus the two reference
    fields, are combined with name/requirement/strategy taken from
    input_group. Returns None when no entry matches.
    """
    reference_keys = ('placement_group_ref', 'vnfd_id_ref')
    inherited_attrs = ('name', 'requirement', 'strategy')

    for mapping in nsr_config.vnfd_placement_group_maps:
        if mapping.placement_group_ref != input_group.name:
            continue
        if mapping.vnfd_id_ref != self.vnfd_id:
            continue

        resolved = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
        fields = {key: value
                  for key, value in mapping.as_dict().items()
                  if key not in reference_keys}
        for attr in inherited_attrs:
            fields[attr] = getattr(input_group, attr)
        resolved.from_dict(fields)
        return resolved

    return None
1424
@asyncio.coroutine
def get_vdu_placement_groups(self, vdu):
    """ Collect the placement groups that apply to the given VDU

    The result starts with every VNF level group from the VNFR message and
    is extended with each VNFD placement group that lists this VDU as a
    member and can be resolved against the NSR config. Groups that cannot
    be resolved are logged and left out.
    """
    ### Step-1: Get VNF level placement groups
    collected = list(self._vnfr_msg.placement_groups_info)

    ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
    nsr_config = yield from self.get_nsr_config()

    ### Step-3: Get VDU level placement groups
    for vnfd_group in self.vnfd.placement_groups:
        for member in vnfd_group.member_vdus:
            if member.member_vdu_ref != vdu.id:
                continue

            resolved = self.resolve_placement_group_cloud_construct(vnfd_group,
                                                                    nsr_config)
            if resolved is None:
                self._log.info("Could not resolve cloud-construct for placement group: %s", vnfd_group.name)
                continue

            self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
                           str(resolved),
                           vdu.name,
                           self.vnf_name,
                           self.member_vnf_index)
            collected.append(resolved)

    return collected
1455
@asyncio.coroutine
def create_vdus(self, vnfr, restart_mode=False):
    """ Create the VDU records for every VDU in the fetched VNFD

    Arguments:
        vnfr - the VNFR the new VDU records belong to
        restart_mode - when True, ids of VDURs already present in the VNFR
                       message are reused
    """

    def get_vdur_id(vdud):
        """Return the id of the existing VDUR for this VDUD, if any.

        Only consulted in restart mode, so that a restart reuses the VDUR
        ids instead of creating duplicate VDURs. Returns None otherwise,
        or when no VDUR references the VDUD.
        """
        if not restart_mode or vdud is None:
            return None

        existing_ids = [vdur.id for vdur in vnfr._vnfr.vdur
                        if vdur.vdu_id_ref == vdud.id]
        if existing_ids:
            return existing_ids[0]

        self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
        return None

    self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
    for vdud in self._rw_vnfd.vdu:
        self._log.debug("Creating vdu: %s", vdud)
        reused_id = get_vdur_id(vdud)

        groups = yield from self.get_vdu_placement_groups(vdud)
        self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
                       vdud.name,
                       self.vnf_name,
                       self.member_vnf_index,
                       [group.name for group in groups])

        record = VirtualDeploymentUnitRecord(
            dts=self._dts,
            log=self._log,
            loop=self._loop,
            vdud=vdud,
            vnfr=vnfr,
            mgmt_intf=self.has_mgmt_interface(vdud),
            mgmt_network=self._mgmt_network,
            cloud_account_name=self.cloud_account_name,
            vnfd_package_store=self._vnfd_package_store,
            vdur_id=reused_id,
            placement_groups=groups,
            )
        yield from record.vdu_opdata_register()

        self._vdus.append(record)
1507
@asyncio.coroutine
def instantiate_vdus(self, xact, vnfr):
    """ Instantiate the VDUs associated with this VNF

    For every VDU record two tasks are scheduled on the event loop: one
    that waits for the VDU to reach a terminal state and one that
    instantiates it once the VDUs it depends on have been processed. A VDU
    depends on another when its cloud-init script references
    "{{ vdu[<id>]... }}" for a different VDU id. This coroutine returns
    after scheduling; it does not wait for the tasks.

    Arguments:
        xact - not used here; each VDU is instantiated in its own DTS
               transaction
        vnfr - passed through to each VDU record's instantiate()
    """
    self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)

    lookup = {vdu.vdu_id: vdu for vdu in self._vdus}

    # Identify any dependencies among the VDUs
    dependencies = collections.defaultdict(list)
    vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")

    # Matches any "{{ variable }}" reference in a cloud-init script. Kept as
    # a raw string so the backslashes are not treated as string escapes.
    variable_pattern = re.compile(r"\{\{ ([^\}]+) \}\}")

    for vdu in self._vdus:
        if vdu.vdud_cloud_init is not None:
            for vdu_id in vdu_id_pattern.findall(vdu.vdud_cloud_init):
                if vdu_id != vdu.vdu_id:
                    # This means that vdu.vdu_id depends upon vdu_id,
                    # i.e. vdu_id must be instantiated before
                    # vdu.vdu_id.
                    dependencies[vdu.vdu_id].append(lookup[vdu_id])

    # Define the terminal states of VDU instantiation
    terminal = (
        VDURecordState.READY,
        VDURecordState.TERMINATED,
        VDURecordState.FAILED,
    )

    datastore = VdurDatastore()
    processed = set()

    @asyncio.coroutine
    def instantiate_monitor(vdu):
        """Monitor the state of the VDU during instantiation

        Arguments:
            vdu - a VirtualDeploymentUnitRecord

        """
        # wait for the VDUR to enter a terminal state
        while vdu._state not in terminal:
            yield from asyncio.sleep(1, loop=self._loop)

        # update the datastore
        datastore.update(vdu)

        # add the VDU to the set of processed VDUs
        processed.add(vdu.vdu_id)

    @asyncio.coroutine
    def instantiate(vdu):
        """Instantiate the specified VDU

        Arguments:
            vdu - a VirtualDeploymentUnitRecord

        Raises:
            if the VDU, or any of the VDUs this VDU depends upon, are
            terminated or fail to instantiate properly, a
            VirtualDeploymentUnitRecordError is raised.
            A ValueError is raised when a cloud-init variable is not
            recognized or has no value in the datastore.

        """
        for dependency in dependencies[vdu.vdu_id]:
            self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))

            while dependency.vdu_id not in processed:
                yield from asyncio.sleep(1, loop=self._loop)

            if not dependency.active:
                raise VirtualDeploymentUnitRecordError()

        self._log.debug('instantiating {}'.format(vdu.vdu_id))

        # Populate the datastore with the current values of the VDU
        datastore.add(vdu)

        # Substitute any variables contained in the cloud config script
        config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""

        parts = variable_pattern.split(config)
        if len(parts) > 1:

            # Extract the variable names
            variables = list()
            for variable in parts[1::2]:
                variables.append(variable.lstrip('{{').rstrip('}}').strip())

            # Iterate of the variables and substitute values from the
            # datastore.
            for variable in variables:

                # Handle a reference to a VDU by ID
                if variable.startswith('vdu['):
                    value = datastore.get(variable)
                    if value is None:
                        msg = "Unable to find a substitute for {} in {} cloud-init script"
                        raise ValueError(msg.format(variable, vdu.vdu_id))

                    config = config.replace("{{ %s }}" % variable, value)
                    continue

                # Handle a reference to the current VDU
                if variable.startswith('vdu'):
                    value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
                    if value is None:
                        # Same failure as for a reference by ID, instead of
                        # a TypeError from str.replace().
                        msg = "Unable to find a substitute for {} in {} cloud-init script"
                        raise ValueError(msg.format(variable, vdu.vdu_id))

                    config = config.replace("{{ %s }}" % variable, value)
                    continue

                # Handle unrecognized variables
                msg = 'unrecognized cloud-config variable: {}'
                raise ValueError(msg.format(variable))

        # Instantiate the VDU in its own transaction (named distinctly so it
        # does not shadow the xact argument of the enclosing coroutine)
        with self._dts.transaction() as vdu_xact:
            self._log.debug("Instantiating vdu: %s", vdu)
            yield from vdu.instantiate(vdu_xact, vnfr, config=config)
            if self._state == VirtualNetworkFunctionRecordState.FAILED:
                self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
                                self.vnfr_id, vdu)

    # First create a set of tasks to monitor the state of the VDUs and
    # report when they have entered a terminal state
    for vdu in self._vdus:
        self._loop.create_task(instantiate_monitor(vdu))

    for vdu in self._vdus:
        self._loop.create_task(instantiate(vdu))
1633
def has_mgmt_interface(self, vdu):
    """ True when the VNFD's management interface names this VDU's id """
    # ## TODO: Support additional mgmt_interface type options
    return bool(self.vnfd.mgmt_interface.vdu_id == vdu.id)
1639
def vlr_xpath(self, vlr_id):
    """ Operational-data xpath of the VLR catalog entry with the given id """
    template = "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']"
    return template.format(vlr_id)
1645
def ext_vlr_by_id(self, vlr_id):
    """ find ext vlr by id

    Plain dictionary lookup in the external VLR map; an unknown vlr_id
    raises KeyError.
    """
    return self._ext_vlrs[vlr_id]
1649
@asyncio.coroutine
def publish_inventory(self, xact):
    """ Publish the inventory associated with this VNF

    A VcsComponent is created, recorded in the inventory and published for
    each component of the fetched VNFD. Processing stops at the first
    component whose name is already in the inventory.
    """
    self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)

    for descriptor in self._rw_vnfd.component:
        self._log.debug("Creating inventory component %s", descriptor)
        inventory_name = VcsComponent.mangle_name(descriptor.component_name,
                                                  self.vnf_name,
                                                  self.vnfd_id
                                                  )
        vcs_component = VcsComponent(dts=self._dts,
                                     log=self._log,
                                     loop=self._loop,
                                     cluster_name=self._cluster_name,
                                     vcs_handler=self._vcs_handler,
                                     component=descriptor,
                                     mangled_name=inventory_name,
                                     )
        if vcs_component.name in self._inventory:
            self._log.debug("Duplicate entries in inventory  %s for vnfr %s",
                            descriptor, self._vnfd_id)
            return

        self._log.debug("Adding component %s for vnrf %s",
                        vcs_component.name, self._vnfr_id)
        self._inventory[vcs_component.name] = vcs_component
        yield from vcs_component.publish(xact)
1677
def all_vdus_active(self):
    """ Are all VDUS in this VNFR active?

    Also True when there are no VDU records at all.
    """
    if not all(vdu.active for vdu in self._vdus):
        return False

    self._log.debug("Inside all_vdus_active. Returning True")
    return True
1686
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """ Mark this VNFR as FAILED, record the reason and republish it """
    self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)

    self._state_failed_reason = failed_reason
    self.set_state(VirtualNetworkFunctionRecordState.FAILED)

    # Update the VNFR with the changed status
    yield from self.publish(None)
1696
@asyncio.coroutine
def is_ready(self):
    """ Move this VNFR to READY, unless it already FAILED, and republish """
    self._log.debug("VNFR id %s is ready", self.vnfr_id)

    if self._state == VirtualNetworkFunctionRecordState.FAILED:
        # A failed VNFR stays failed
        self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
    else:
        self.set_state(VirtualNetworkFunctionRecordState.READY)

    # Update the VNFR with the changed status
    yield from self.publish(None)
1710
def update_cp(self, cp_name, ip_address, cp_id):
    """Update the connection point record with ip address and id

    Arguments:
        cp_name - name of the connection point record to update
        ip_address - value stored in the record's ip_address
        cp_id - value stored in the record's connection_point_id

    Raises:
        VirtualDeploymentUnitRecordError - no connection point record has
        that name.
    """
    for cp in self._cprs:
        if cp.name == cp_name:
            self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
                            cp_name, cp, ip_address, cp_id)
            cp.ip_address = ip_address
            cp.connection_point_id = cp_id
            return

    # Report the name that was asked for. The loop variable would hold the
    # last record's name, or be undefined when there are no records.
    err = "No connection point %s found in VNFR id %s" % (cp_name, self._vnfr_id)
    self._log.debug(err)
    raise VirtualDeploymentUnitRecordError(err)
1724
def set_state(self, state):
    """ Set state for this VNFR

    Stores the given state as-is; no validation or publishing happens here.
    """
    self._state = state
1728
@asyncio.coroutine
def instantiate(self, xact, restart_mode=False):
    """ instantiate this VNF

    Runs the instantiation sequence: fetch the VNFD, collect the external
    VLRs for the connection points, publish the inventory, create and
    instantiate the internal VLs, create the VDU records and schedule
    their instantiation, publishing the VNFR between steps. A failure
    while instantiating the VLs marks the VNFR as failed and stops the
    sequence. VDU instantiation and the uptime updater run as separate
    tasks that are not awaited here.

    Arguments:
        xact - transaction passed through to the publish/instantiate steps
        restart_mode - passed through to VL instantiation and VDU creation
    """
    self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
    self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)

    @asyncio.coroutine
    def fetch_vlrs():
        """ Fetch VLRs """
        # Iterate over all the connection points in VNFR and fetch the
        # associated VLRs

        def cpr_from_cp(cp):
            """ Creates a record level connection point from the descriptor cp"""
            cp_fields = ["name", "image", "vm-flavor", "static_ip_address"]
            cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
            cpr_dict = {}
            cpr_dict.update(cp_copy_dict)
            return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)

        self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
                        self._vnfr_id, self._vnfr.connection_point)

        for cp in self._vnfr.connection_point:
            cpr = cpr_from_cp(cp)
            self._cprs.append(cpr)
            self._log.debug("Adding Connection point record  %s ", cp)

            vlr_path = self.vlr_xpath(cp.vlr_ref)
            self._log.debug("Fetching VLR with path = %s", vlr_path)
            res_iter = yield from self._dts.query_read(self.vlr_xpath(cp.vlr_ref),
                                                       rwdts.XactFlag.MERGE)
            # Each result found is remembered as an external VLR and
            # referenced from the connection point record
            for i in res_iter:
                r = yield from i
                d = r.result
                self._ext_vlrs[cp.vlr_ref] = d
                cpr.vlr_ref = cp.vlr_ref
                self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)

    # Increase the VNFD reference count
    self.vnfd_ref()

    assert self.vnfd

    # Fetch External VLRs
    self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
    yield from fetch_vlrs()

    # Publish inventory
    self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
    yield from self.publish_inventory(xact)

    # Create the internal VL records
    self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
    yield from self.create_vls()

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    # instantiate VLs
    self._log.debug("VNFR-ID %s: Instantiate VLs", self._vnfr_id)
    try:
        yield from self.instantiate_vls(xact, restart_mode)
    except Exception as e:
        self._log.exception("VL instantiation failed (%s)", str(e))
        yield from self.instantiation_failed(str(e))
        return

    self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)

    # Create the VDU records (they are instantiated further below)
    self._log.debug("VNFR-ID %s: Create VDUs", self._vnfr_id)
    yield from self.create_vdus(self, restart_mode)

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    # instantiate VDUs
    # ToDo: Check if this should be prevented during restart
    self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
    _ = self._loop.create_task(self.instantiate_vdus(xact, self))

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)

    # create task updating uptime for this vnfr
    self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
    self._loop.create_task(self.vnfr_uptime_update(xact))
1822
@asyncio.coroutine
def terminate(self, xact):
    """ Terminate this virtual network function

    Stops monitoring if it is running, then terminates the internal VLs
    followed by the VDUs, updating the VNFR state before each phase and
    ending in TERMINATED.
    """
    self._log.debug("Terminatng VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)

    # stop monitoring
    monitor = self._vnf_mon
    if monitor is not None:
        monitor.stop()
        monitor.deregister()
        self._vnf_mon = None

    # Phase 1: internal virtual links
    self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
    for link_record in self._vlrs:
        yield from link_record.terminate(xact)

    # Phase 2: deployment units
    self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
    for vdu_record in self._vdus:
        yield from vdu_record.terminate(xact)

    self._log.debug("Terminated VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1859
@asyncio.coroutine
def vnfr_uptime_update(self, xact):
    """ Republish this VNFR every two seconds while it is being brought up
    or is running

    The loop ends as soon as the state is anything other than INIT,
    VL_INIT_PHASE, VM_INIT_PHASE or READY (for example FAILED or
    TERMINATED).
    """
    live_states = (
        VirtualNetworkFunctionRecordState.INIT,
        VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
        VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
        VirtualNetworkFunctionRecordState.READY,
    )
    while self._state in live_states:
        yield from self.publish(xact)
        yield from asyncio.sleep(2, loop=self._loop)
1871
1872
1873
class VnfdDtsHandler(object):
    """ DTS handler for VNFD config changes

    Subscribes to the VNFD catalog config path and, when a VNFD is deleted,
    asks the VNFM to delete it unless the VNFM reports it as in use.
    """
    XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"

    def __init__(self, dts, log, loop, vnfm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm
        # Set by register()
        self._regh = None

    # NOTE(review): this accessor is a coroutine here, while the equivalent
    # accessor on VcsComponentDtsHandler is a property - confirm which form
    # the callers expect before unifying them.
    @asyncio.coroutine
    def regh(self):
        """ DTS registration handle """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register for VNFD configuration"""

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration"""
            self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            # NOTE(review): computed but not used anywhere in this callback
            is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ on prepare callback """
            self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
                            ks_path.to_xpath(RwVnfmYang.get_schema()), msg)
            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            # Handle deletes in prepare_callback
            if fref.is_field_deleted():
                # Delete an VNFD record
                self._log.debug("Deleting VNFD with id %s", msg.id)
                if self._vnfm.vnfd_in_use(msg.id):
                    # Refuse the delete; no response is sent on this path
                    self._log.debug("Cannot delete VNFD in use - %s", msg)
                    err = "Cannot delete a VNFD in use - %s" % msg
                    raise VirtualNetworkFunctionDescriptorRefCountExists(err)
                # Delete a VNFD record
                yield from self._vnfm.delete_vnfd(msg.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug(
            "Registering for VNFD config using xpath: %s",
            VnfdDtsHandler.XPATH,
            )
        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=acg_hdl) as acg:
            self._regh = acg.register(
                xpath=VnfdDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                on_prepare=on_prepare)
1932
1933
class VcsComponentDtsHandler(object):
    """ Vcs Component DTS handler

    Registers as publisher for the operational inventory components of the
    manifest and creates elements under that registration.
    """
    XPATH = ("D,/rw-manifest:manifest"
             "/rw-manifest:operational-inventory"
             "/rw-manifest:component")

    def __init__(self, dts, log, loop, vnfm):
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm
        # Filled in by register()
        self._regh = None

    @property
    def regh(self):
        """ DTS registration handle (None until register() has run) """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Registers VCS component dts publisher registration"""
        xpath = VcsComponentDtsHandler.XPATH
        self._log.debug("VCS Comp publisher DTS handler registering path %s",
                        xpath)

        registration_handler = rift.tasklets.DTS.RegistrationHandler()
        group_handler = rift.tasklets.Group.Handler()
        with self._dts.group_create(handler=group_handler) as group:
            publisher_flags = (rwdts.Flag.PUBLISHER |
                               rwdts.Flag.NO_PREP_READ |
                               rwdts.Flag.DATASTORE)
            self._regh = group.register(xpath=xpath,
                                        handler=registration_handler,
                                        flags=publisher_flags,)

    @asyncio.coroutine
    def publish(self, xact, path, msg):
        """ Publishes the VCS component

        The element is created through the registration handle; xact is
        only used for logging here.
        """
        self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
                        xact, path, msg)
        handle = self.regh
        handle.create_element(path, msg)
        self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
                        VcsComponentDtsHandler.XPATH, xact, path, msg)
1975
class VnfrConsoleOperdataDtsHandler(object):
    """Serves console-URL operational data for one VDUR of one VNFR.

    Registers as a publisher on the per-VDUR vnfr-console xpath (see
    vnfr_vdu_console_xpath) and answers DTS READ queries; every other
    action is logged and acknowledged without data.
    """

    def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm
        # Filled in by register(); stays None until then.
        self._regh = None

        self._vnfr_id = vnfr_id
        self._vdur_id = vdur_id
        self._vdu_id = vdu_id

    @property
    def vnfr_vdu_console_xpath(self):
        """Keyed xpath of the console entry for this VNFR/VDUR pair."""
        template = "D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']"
        return template.format(self._vnfr_id, self._vdur_id)

    @asyncio.coroutine
    def register(self):
        """Register the console READ handler for this VDUR with DTS."""

        def build_console(url):
            """Return a console record for this VDUR carrying `url`."""
            record = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
            record.id = self._vdur_id
            record.console_url = url
            return record

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """DTS prepare callback; replies with the console URL on READ."""
            query_xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
                xact_info, action, query_xpath, msg
                )

            if action != rwdts.QueryAction.READ:
                # Unsupported actions are reported in the log but still ACKed.
                self._log.error("Not supported operation %s" % action)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                return

            console_schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema()
            entry = console_schema.keyspec_to_entry(ks_path)
            self._log.debug("VDU Opdata path is {}".format(entry))

            try:
                vnfr = self._vnfm.get_vnfr(self._vnfr_id)
            except VnfRecordError:
                self._log.error("VNFR id %s not found", self._vnfr_id)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                return

            try:
                vdur = vnfr._get_vdur_from_vdu_id(self._vdu_id)
                if vdur._state != VDURecordState.READY:
                    self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
                    xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                    return

                with self._dts.transaction() as new_xact:
                    resp = yield from vdur.read_resource(new_xact)
                    # An empty/missing URL is reported as the literal 'none'.
                    vdur_console = build_console(resp.console_url or 'none')
                    self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
            except Exception:
                # Any failure while reading degrades to a 'none' console URL
                # instead of failing the query.
                self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
                vdur_console = build_console('none')

            xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
                                    xpath=self.vnfr_vdu_console_xpath,
                                    msg=vdur_console)

        console_xpath = self.vnfr_vdu_console_xpath
        self._log.debug("Registering for VNFR VDU using xpath: %s",
                        console_xpath)
        reg_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
                                        handler=reg_handler,
                                        flags=rwdts.Flag.PUBLISHER)
2056
2057
class VnfrDtsHandler(object):
    """Registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS.

    The prepare callback reacts to CREATE, DELETE and UPDATE actions on VNFR
    records; create()/update()/delete() write records through the DTS
    registration handle.
    """
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop, vnfm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm

        # Set by register(); None until then.
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle"""
        return self._regh

    @property
    def vnfm(self):
        """ Return VNF manager instance """
        return self._vnfm

    @asyncio.coroutine
    def register(self):
        """ Register for vnfr create/update/delete/read requests from dts """

        def on_commit(xact_info):
            """ The transaction has been committed """
            self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
            return rwdts.MemberRspCode.ACTION_OK

        # NOTE(review): on_abort is defined but not passed to any handler
        # below - confirm whether it is meant to be registered.
        def on_abort(*args):
            """ Abort callback """
            self._log.debug("VNF transaction got aborted")

        @asyncio.coroutine
        def on_event(dts, g_reg, xact, xact_event, scratch_data):
            """Group event callback.

            On the INSTALL event, re-creates a VNFR for every element held by
            the registration and schedules its instantiation in restart mode.
            """

            @asyncio.coroutine
            def instantiate_realloc_vnfr(vnfr):
                """Re-populate the vnfm after restart

                Arguments:
                    vnfr - the VNFR object returned by create_vnfr()

                """
                yield from vnfr.instantiate(None, restart_mode=True)

            if xact_event == rwdts.MemberEvent.INSTALL:
                for cfg in self.regh.elements:
                    vnfr = self.vnfm.create_vnfr(cfg)
                    # Fire-and-forget: instantiation runs as its own task.
                    self._loop.create_task(instantiate_realloc_vnfr(vnfr))

            self._log.debug("Got on_event in vnfm")

            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """Prepare callback from dts.

            Raises:
                VnfRecordError - CREATE without a "vnfd" field (and whatever
                    the VNF manager raises for unknown/duplicate ids).
                NotImplementedError - any action other than
                    CREATE/DELETE/UPDATE.
            """
            self._log.debug(
                "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
                xact_info, action, msg
                )

            if action == rwdts.QueryAction.CREATE:
                if not msg.has_field("vnfd"):
                    err = "Vnfd not provided"
                    self._log.error(err)
                    raise VnfRecordError(err)

                vnfr = self.vnfm.create_vnfr(msg)
                try:
                    # RIFT-9105: Unable to add a READ query under an existing transaction
                    # xact = xact_info.xact
                    yield from vnfr.instantiate(None)
                except Exception as e:
                    # Instantiation failures are recorded on the VNFR and
                    # published; the query itself is still ACKed below.
                    self._log.exception(e)
                    self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
                    vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
                    yield from vnfr.publish(None)

            elif action == rwdts.QueryAction.DELETE:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr_id = path_entry.key00.id
                # VnfManager.get_vnfr raises VnfRecordError for unknown ids,
                # so the None check below is purely defensive.
                vnfr = self._vnfm.get_vnfr(vnfr_id)

                if vnfr is None:
                    self._log.debug("VNFR id %s not found for delete", vnfr_id)
                    raise VirtualNetworkFunctionRecordNotFound(
                        "VNFR id %s" % vnfr_id)

                try:
                    yield from vnfr.terminate(xact_info.xact)
                    # Unref the VNFD
                    vnfr.vnfd_unref()
                    yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
                except Exception as e:
                    # Best effort: log and still ACK the delete below.
                    self._log.exception(e)
                    self._log.error("Caught exception while deleting vnfr %s", vnfr_id)

            elif action == rwdts.QueryAction.UPDATE:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr_id = path_entry.key00.id
                vnfr = None
                try:
                    vnfr = self._vnfm.get_vnfr(vnfr_id)
                except Exception:
                    self._log.debug("No vnfr found with id %s", vnfr_id)
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                if vnfr is None:
                    self._log.debug("VNFR id %s not found for update", vnfr_id)
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                self._log.debug("VNFR {} update config status {} (current {})".
                                format(vnfr.name, msg.config_status, vnfr.config_status))
                # Update the config status and publish
                vnfr._config_status = msg.config_status
                yield from vnfr.publish(None)

            else:
                # Interpolate the action into the message; passing it as a
                # second positional argument would leave "%s" unformatted.
                raise NotImplementedError(
                    "%s action on VirtualNetworkFunctionRecord not supported"
                    % action)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug("Registering for VNFR using xpath: %s",
                        VnfrDtsHandler.XPATH,)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
                                                    on_prepare=on_prepare,)
        handlers = rift.tasklets.Group.Handler(on_event=on_event,)
        with self._dts.group_create(handler=handlers) as group:
            self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=(rwdts.Flag.PUBLISHER |
                                               rwdts.Flag.NO_PREP_READ |
                                               rwdts.Flag.CACHE |
                                               rwdts.Flag.DATASTORE),)

    @asyncio.coroutine
    def create(self, xact, path, msg):
        """
        Create a VNFR record in DTS with path and message

        `xact` is only used for logging here.
        """
        self._log.debug("Creating VNFR xact = %s, %s:%s",
                        xact, path, msg)

        self.regh.create_element(path, msg)
        self._log.debug("Created VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def update(self, xact, path, msg):
        """
        Update a VNFR record in DTS with path and message

        `xact` is only used for logging here.
        """
        self._log.debug("Updating VNFR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.update_element(path, msg)
        self._log.debug("Updated VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, path):
        """
        Delete the VNFR record at path from DTS

        `xact` is only used for logging here.
        """
        self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
        self.regh.delete_element(path)
        self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2233
2234
class VnfdRefCountDtsHandler(object):
    """DTS handler answering READ queries for VNFD reference counts."""
    XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"

    def __init__(self, dts, log, loop, vnfm):
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm

        # Filled in by register(); stays None until then.
        self._regh = None

    @property
    def regh(self):
        """DTS registration handle (None before register())."""
        return self._regh

    @property
    def vnfm(self):
        """The VNF manager instance this handler queries."""
        return self._vnfm

    @asyncio.coroutine
    def register(self):
        """Register as publisher for the VNFD ref-count xpath."""

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """DTS prepare callback; only READ is supported.

            Sends one MORE response per ref-count entry obtained from the
            VNF manager, followed by a final ACK.
            """
            query_xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
                xact_info, action, query_xpath, msg
                )

            if action != rwdts.QueryAction.READ:
                raise VnfRecordError("Not supported operation %s" % action)

            refcount_schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
            key_entry = refcount_schema.keyspec_to_entry(ks_path)
            entries = yield from self._vnfm.get_vnfd_refcount(key_entry.key00.vnfd_id_ref)
            for entry_xpath, entry_msg in entries:
                self._log.debug("Responding to ref count query path:%s, msg:%s",
                                entry_xpath, entry_msg)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
                                        xpath=entry_xpath,
                                        msg=entry_msg)
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        reg_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH,
                                        handler=reg_handler,
                                        flags=rwdts.Flag.PUBLISHER)
2290
2291
class VdurDatastore(object):
    """
    This VdurDatastore is intended to expose select information about a VDUR
    such that it can be referenced in a cloud config file. The data that is
    exposed does not necessarily follow the structure of the data in the yang
    model. This is intentional. The data that are exposed are intended to be
    agnostic of the yang model so that changes in the model do not necessarily
    require changes to the interface provided to the user. It also means that
    the user does not need to be familiar with the RIFT.ware yang models.

    Entries are keyed by the VDUR's `vdu_id`; the exposed attributes are
    'name' and 'mgmt.ip'.
    """

    def __init__(self):
        """Create an instance of VdurDatastore"""
        self._vdur_data = dict()
        # Raw string so that "\[", "\]" and "\." reach the regex engine as
        # regex escapes instead of being (invalid) string escapes.
        self._pattern = re.compile(r"vdu\[([^]]+)\]\.(.+)")

    @staticmethod
    def _exposed_attrs(vdur):
        """Return the (key, value) pairs this datastore exposes for a VDUR."""
        return (
            ('name', vdur._vdud.name),
            ('mgmt.ip', vdur.vm_management_ip),
        )

    def add(self, vdur):
        """Add a new VDUR to the datastore

        Attributes whose value is None are not stored.

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no vdu_id or (2) is
            already in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id in self._vdur_data:
            raise ValueError('cannot add a VDUR more than once')

        # Build the entry before registering it so that a failure while
        # reading the VDUR does not leave an empty entry behind.
        entry = {
            key: value
            for key, value in self._exposed_attrs(vdur)
            if value is not None
        }
        self._vdur_data[vdur.vdu_id] = entry

    def update(self, vdur):
        """Update the VDUR information in the datastore

        Attributes whose value is now None are removed from the entry.

        Arguments:
            vdur - a VDUR object exposing vdu_id, _vdud.name and
                   vm_management_ip

        Raises:
            A ValueError is raised if the VDUR (1) has no vdu_id or (2) is
            not in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VNFDs are required to have an ID')

        if vdur.vdu_id not in self._vdur_data:
            raise ValueError('VNF is not recognized')

        entry = self._vdur_data[vdur.vdu_id]
        for key, value in self._exposed_attrs(vdur):
            if value is None:
                entry.pop(key, None)
            else:
                entry[key] = value

    def remove(self, vdur_id):
        """Remove all of the data associated with specified VDUR

        Arguments:
            vdur_id - the identifier (vdu_id) the VDUR was added under

        Raises:
            A ValueError is raised if the VDUR is not contained in the
            datastore.

        """
        if vdur_id not in self._vdur_data:
            raise ValueError('VNF is not recognized')

        del self._vdur_data[vdur_id]

    def get(self, expr):
        """Retrieve VDUR information from the datastore

        An expression should be of the form,

            vdu[<id>].<attr>

        where <id> is the identifier the VDUR was added under (unquoted), and
        <attr> is the name of the exposed attribute that the user wishes to
        retrieve.

        If the requested data is not available, None is returned.

        Arguments:
            expr - a string that specifies the data to return

        Raises:
            A ValueError is raised if the provided expression cannot be parsed.

        Returns:
            The requested data or None

        """
        result = self._pattern.match(expr)
        if result is None:
            raise ValueError('data expression not recognized ({})'.format(expr))

        vdur_id, key = result.groups()

        if vdur_id not in self._vdur_data:
            return None

        return self._vdur_data[vdur_id].get(key)
2410
2411
class VnfManager(object):
    """ The virtual network function manager class

    Owns the static DTS handlers and keeps three in-memory maps:
    VNFR id -> VNFR, VNFD id -> VNFR, and NSR id -> NSR config.
    """
    def __init__(self, dts, log, loop, cluster_name):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._cluster_name = cluster_name

        self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
        self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
        self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
        self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(log, dts, loop, callback=self.handle_nsr)

        # Handlers registered, in this order, by register().
        self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
                              self._vnfr_handler,
                              self._vcs_handler,
                              self._vnfr_ref_handler,
                              self._nsr_handler]
        self._vnfrs = {}
        self._vnfds_to_vnfr = {}
        self._nsrs = {}

    @property
    def vnfr_handler(self):
        """ VNFR dts handler """
        return self._vnfr_handler

    @property
    def vcs_handler(self):
        """ VCS dts handler """
        return self._vcs_handler

    @asyncio.coroutine
    def register(self):
        """ Register all static DTS handlers """
        for hdl in self._dts_handlers:
            yield from hdl.register()

    @asyncio.coroutine
    def run(self):
        """ Run this VNFM instance """
        self._log.debug("Run VNFManager - registering static DTS handlers")
        yield from self.register()

    def handle_nsr(self, nsr, action):
        """Track NSR configs: remember on CREATE, forget on DELETE.

        Other actions are ignored.
        """
        if action == rwdts.QueryAction.CREATE:
            self._nsrs[nsr.id] = nsr
        elif action == rwdts.QueryAction.DELETE:
            self._nsrs.pop(nsr.id, None)

    def get_linked_mgmt_network(self, vnfr):
        """For the given VNFR get the related mgmt network from the NSD, if
        available.

        Returns:
            The name of the first VLD in the NSD flagged as mgmt_network, or
            None when the NSD has no such VLD.

        Raises:
            VnfRecordError - no NSR is known for vnfr.nsr_id_ref
        """
        nsr_id = vnfr.nsr_id_ref

        # for the given related VNFR, get the corresponding NSR-config
        try:
            nsr_obj = self._nsrs[nsr_id]
        except KeyError:
            # A bare string cannot be raised; use the module's VNF error type.
            raise VnfRecordError(
                "Unable to find the NS with the ID: {}".format(nsr_id))

        # for the related NSD check if a VLD exists such that it's a mgmt
        # network
        for vld in nsr_obj.nsd.vld:
            if vld.mgmt_network:
                return vld.name

        return None

    def get_vnfr(self, vnfr_id):
        """ get VNFR by vnfr id

        Raises:
            VnfRecordError - the id is not known to this manager
        """
        if vnfr_id not in self._vnfrs:
            raise VnfRecordError("VNFR id %s not found" % vnfr_id)

        return self._vnfrs[vnfr_id]

    def create_vnfr(self, vnfr):
        """ Create a VNFR instance

        Arguments:
            vnfr - the VNFR message; its id, vnfd.id and nsr_id_ref are read

        Returns:
            The newly created VirtualNetworkFunctionRecord

        Raises:
            VnfRecordError - a VNFR with the same id already exists, or the
            referenced NSR is unknown
        """
        if vnfr.id in self._vnfrs:
            msg = "Vnfr id %s already exists" % vnfr.id
            self._log.error(msg)
            raise VnfRecordError(msg)

        self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
                       vnfr.id,
                       vnfr.vnfd.id)

        mgmt_network = self.get_linked_mgmt_network(vnfr)

        record = VirtualNetworkFunctionRecord(
            self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
            mgmt_network=mgmt_network
            )
        self._vnfrs[vnfr.id] = record
        # One entry per VNFD id: a later VNFR of the same VNFD replaces it.
        self._vnfds_to_vnfr[vnfr.vnfd.id] = record
        return record

    @asyncio.coroutine
    def delete_vnfr(self, xact, vnfr):
        """ Delete a VNFR instance

        Removes the record from DTS and from this manager; does nothing when
        the VNFR id is not known.
        """
        if vnfr.vnfr_id in self._vnfrs:
            self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
            yield from self._vnfr_handler.delete(xact, vnfr.xpath)
            del self._vnfrs[vnfr.vnfr_id]

    @asyncio.coroutine
    def fetch_vnfd(self, vnfd_id):
        """ Fetch VNFDs based with the vnfd id

        Returns:
            The result of the last entry answered by the DTS read query

        Raises:
            VnfRecordError - the query produced no VNFD
        """
        vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
        self._log.debug("Fetch vnfd with path %s", vnfd_path)
        vnfd = None

        res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)

        for ent in res_iter:
            res = yield from ent
            vnfd = res.result

        if vnfd is None:
            err = "Failed to get Vnfd %s" % vnfd_id
            self._log.error(err)
            raise VnfRecordError(err)

        self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)

        return vnfd

    def vnfd_in_use(self, vnfd_id):
        """ Is this VNFD in use """
        self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
        if vnfd_id in self._vnfds_to_vnfr:
            return self._vnfds_to_vnfr[vnfd_id].in_use()
        return False

    @asyncio.coroutine
    def publish_vnfr(self, xact, path, msg):
        """ Publish a VNFR """
        self._log.debug("publish_vnfr called with path %s, msg %s",
                        path, msg)
        yield from self.vnfr_handler.update(xact, path, msg)

    @asyncio.coroutine
    def delete_vnfd(self, vnfd_id):
        """ Delete the Virtual Network Function descriptor with the passed id

        Raises:
            VirtualNetworkFunctionDescriptorNotFound - unknown vnfd id
            VirtualNetworkFunctionDescriptorRefCountExists - still in use
        """
        self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
        if vnfd_id not in self._vnfds_to_vnfr:
            self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
            raise VirtualNetworkFunctionDescriptorNotFound("Cannot find %s" % vnfd_id)

        vnfr = self._vnfds_to_vnfr[vnfd_id]
        if vnfr.in_use():
            self._log.debug("Cannot delete VNFD id %s reference exists %s",
                            vnfd_id,
                            vnfr.vnfd_ref_count)
            raise VirtualNetworkFunctionDescriptorRefCountExists(
                "Cannot delete :%s, ref_count:%s"
                % (vnfd_id, vnfr.vnfd_ref_count))

        # Remove any files uploaded with VNFD and stored under
        # $RIFT_ARTIFACTS/launchpad/libs/<id>. Best effort: failures are
        # logged and the descriptor entry is dropped regardless.
        try:
            rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
            vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
            if os.path.exists(vnfd_dir):
                shutil.rmtree(vnfd_dir, ignore_errors=True)
        except Exception as e:
            self._log.error("Exception in cleaning up VNFD {}: {}".
                            format(vnfr.vnfd.name, e))
            self._log.exception(e)

        del self._vnfds_to_vnfr[vnfd_id]

    def vnfd_refcount_xpath(self, vnfd_id):
        """ xpath for ref count entry """
        return (VnfdRefCountDtsHandler.XPATH +
                "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)

    @asyncio.coroutine
    def get_vnfd_refcount(self, vnfd_id):
        """ Get the vnfd_list from this VNFM

        Arguments:
            vnfd_id - a VNFD id, or None/"" to list every known VNFD

        Returns:
            A list of (xpath, ref-count message) tuples; empty when the
            given id is unknown.
        """
        vnfd_list = []
        if vnfd_id is None or vnfd_id == "":
            vnfrs = list(self._vnfds_to_vnfr.values())
        elif vnfd_id in self._vnfds_to_vnfr:
            vnfrs = [self._vnfds_to_vnfr[vnfd_id]]
        else:
            vnfrs = []

        for vnfr in vnfrs:
            vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
            vnfd_msg.vnfd_id_ref = vnfr.vnfd.id
            vnfd_msg.instance_ref_count = vnfr.vnfd_ref_count
            # For a single-id query the xpath is keyed by the requested id.
            key = vnfr.vnfd.id if not vnfd_id else vnfd_id
            vnfd_list.append((self.vnfd_refcount_xpath(key), vnfd_msg))

        return vnfd_list
2609
2610
class VnfmTasklet(rift.tasklets.Tasklet):
    """Tasklet hosting the VNF manager."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("vnfm")

        # Both are created later: _dts in start(), _vnfm in init().
        self._dts = None
        self._vnfm = None

    def start(self):
        """Start the tasklet and create the DTS API object.

        Any exception is printed and re-raised.
        """
        try:
            super().start()
            self.log.info("Starting VnfmTasklet")

            self.log.setLevel(logging.DEBUG)

            self.log.debug("Registering with dts")
            schema = RwVnfmYang.get_schema()
            self._dts = rift.tasklets.DTS(self.tasklet_info,
                                          schema,
                                          self.loop,
                                          self.on_dts_state_change)

            self.log.debug("Created DTS Api GI Object: %s", self._dts)
        except Exception:
            print("Caught Exception in VNFM start:", sys.exc_info()[0])
            raise

    def on_instance_started(self):
        """Task instance started callback."""
        self.log.debug("Got instance started callback")

    def stop(self):
        """De-initialise the DTS API object; errors are printed and re-raised."""
        try:
            self._dts.deinit()
        except Exception:
            print("Caught Exception in VNFM stop:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def init(self):
        """Task init callback: build the VnfManager and run it.

        Any exception is printed and re-raised.
        """
        try:
            vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
            assert vm_parent_name is not None
            manager = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
            self._vnfm = manager
            yield from self._vnfm.run()
        except Exception:
            print("Caught Exception in VNFM init:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def run(self):
        """Task run callback (nothing to do)."""
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """
        # DTS state to move to once the current state has been handled.
        next_dts_state = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Application callback to run on entering a given DTS state.
        state_callbacks = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        callback = state_callbacks.get(state)
        if callback is not None:
            yield from callback()

        # Transition dts to next state
        target = next_dts_state.get(state)
        if target is not None:
            self._dts.handle.set_state(target)