SO Multidisk changes
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.mano.dts as mano_dts
53
54
55 class VMResourceError(Exception):
56 """ VM resource Error"""
57 pass
58
59
60 class VnfRecordError(Exception):
61 """ VNF record instatiation failed"""
62 pass
63
64
65 class VduRecordError(Exception):
66 """ VDU record instatiation failed"""
67 pass
68
69
70 class NotImplemented(Exception):
71 """Not implemented """
72 pass
73
74
75 class VnfrRecordExistsError(Exception):
76 """VNFR record already exist with the same VNFR id"""
77 pass
78
79
80 class InternalVirtualLinkRecordError(Exception):
81 """Internal virtual link record error"""
82 pass
83
84
85 class VDUImageNotFound(Exception):
86 """VDU Image not found error"""
87 pass
88
89
90 class VirtualDeploymentUnitRecordError(Exception):
91 """VDU Instantiation failed"""
92 pass
93
94
95 class VMNotReadyError(Exception):
96 """ VM Not yet received from resource manager """
97 pass
98
99
100 class VDURecordNotFound(Exception):
101 """ Could not find a VDU record """
102 pass
103
104
105 class VirtualNetworkFunctionRecordDescNotFound(Exception):
106 """ Cannot find Virtual Network Function Record Descriptor """
107 pass
108
109
110 class VirtualNetworkFunctionDescriptorError(Exception):
111 """ Virtual Network Function Record Descriptor Error """
112 pass
113
114
115 class VirtualNetworkFunctionDescriptorNotFound(Exception):
116 """ Virtual Network Function Record Descriptor Not Found """
117 pass
118
119
120 class VirtualNetworkFunctionRecordNotFound(Exception):
121 """ Virtual Network Function Record Not Found """
122 pass
123
124
125 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
126 """ Virtual Network Funtion Descriptor reference count exists """
127 pass
128
129
130 class VnfrInstantiationFailed(Exception):
131 """ Virtual Network Funtion Instantiation failed"""
132 pass
133
134
135 class VNFMPlacementGroupError(Exception):
136 pass
137
138 class VirtualNetworkFunctionRecordState(enum.Enum):
139 """ VNFR state """
140 INIT = 1
141 VL_INIT_PHASE = 2
142 VM_INIT_PHASE = 3
143 READY = 4
144 TERMINATE = 5
145 VL_TERMINATE_PHASE = 6
146 VDU_TERMINATE_PHASE = 7
147 TERMINATED = 7
148 FAILED = 10
149
150
151 class VDURecordState(enum.Enum):
152 """VDU record state """
153 INIT = 1
154 INSTANTIATING = 2
155 RESOURCE_ALLOC_PENDING = 3
156 READY = 4
157 TERMINATING = 5
158 TERMINATED = 6
159 FAILED = 10
160
161
162 class VcsComponent(object):
163 """ VCS Component within the VNF descriptor """
164 def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name):
165 self._dts = dts
166 self._log = log
167 self._loop = loop
168 self._component = component
169 self._cluster_name = cluster_name
170 self._vcs_handler = vcs_handler
171 self._mangled_name = mangled_name
172
173 @staticmethod
174 def mangle_name(component_name, vnf_name, vnfd_id):
175 """ mangled component name """
176 return vnf_name + ":" + component_name + ":" + vnfd_id
177
178 @property
179 def name(self):
180 """ name of this component"""
181 return self._mangled_name
182
183 @property
184 def path(self):
185 """ The path for this object """
186 return("D,/rw-manifest:manifest" +
187 "/rw-manifest:operational-inventory" +
188 "/rw-manifest:component" +
189 "[rw-manifest:component-name = '{}']").format(self.name)
190
191 @property
192 def instance_xpath(self):
193 """ The path for this object """
194 return("D,/rw-base:vcs" +
195 "/instances" +
196 "/instance" +
197 "[instance-name = '{}']".format(self._cluster_name))
198
199 @property
200 def start_comp_xpath(self):
201 """ start component xpath """
202 return (self.instance_xpath +
203 "/child-n[instance-name = 'START-REQ']")
204
205 def get_start_comp_msg(self, ip_address):
206 """ start this component """
207 start_msg = RwBaseYang.VcsInstance_Instance_ChildN()
208 start_msg.instance_name = 'START-REQ'
209 start_msg.component_name = self.name
210 start_msg.admin_command = "START"
211 start_msg.ip_address = ip_address
212
213 return start_msg
214
215 @property
216 def msg(self):
217 """ Returns the message for this vcs component"""
218
219 vcs_comp_dict = self._component.as_dict()
220
221 def mangle_comp_names(comp_dict):
222 """ mangle component name with VNF name, id"""
223 for key, val in comp_dict.items():
224 if isinstance(val, dict):
225 comp_dict[key] = mangle_comp_names(val)
226 elif isinstance(val, list):
227 i = 0
228 for ent in val:
229 if isinstance(ent, dict):
230 val[i] = mangle_comp_names(ent)
231 else:
232 val[i] = ent
233 i += 1
234 elif key == "component_name":
235 comp_dict[key] = VcsComponent.mangle_name(val,
236 self._vnfd_name,
237 self._vnfd_id)
238 return comp_dict
239
240 mangled_dict = mangle_comp_names(vcs_comp_dict)
241 msg = RwManifestYang.OpInventory_Component.from_dict(mangled_dict)
242 return msg
243
244 @asyncio.coroutine
245 def publish(self, xact):
246 """ Publishes the VCS component """
247 self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
248 self.name, self.path, self.msg)
249 yield from self._vcs_handler.publish(xact, self.path, self.msg)
250
251 @asyncio.coroutine
252 def start(self, xact, parent, ip_addr=None):
253 """ Starts this VCS component """
254 # ATTN RV - replace with block add
255 start_msg = self.get_start_comp_msg(ip_addr)
256 self._log.debug("starting component %s %s",
257 self.start_comp_xpath, start_msg)
258 yield from self._dts.query_create(self.start_comp_xpath,
259 0,
260 start_msg)
261 self._log.debug("started component %s, %s",
262 self.start_comp_xpath, start_msg)
263
264
265 class VirtualDeploymentUnitRecord(object):
266 """ Virtual Deployment Unit Record """
267 def __init__(self,
268 dts,
269 log,
270 loop,
271 vdud,
272 vnfr,
273 mgmt_intf,
274 mgmt_network,
275 cloud_account_name,
276 vnfd_package_store,
277 vdur_id=None,
278 placement_groups=[]):
279 self._dts = dts
280 self._log = log
281 self._loop = loop
282 self._vdud = vdud
283 self._vnfr = vnfr
284 self._mgmt_intf = mgmt_intf
285 self._cloud_account_name = cloud_account_name
286 self._vnfd_package_store = vnfd_package_store
287 self._mgmt_network = mgmt_network
288
289 self._vdur_id = vdur_id or str(uuid.uuid4())
290 self._int_intf = []
291 self._ext_intf = []
292 self._state = VDURecordState.INIT
293 self._state_failed_reason = None
294 self._request_id = str(uuid.uuid4())
295 self._name = vnfr.name + "__" + vdud.id
296 self._placement_groups = placement_groups
297 self._rm_regh = None
298 self._vm_resp = None
299 self._vdud_cloud_init = None
300 self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id,self.vdu_id)
301
302 @asyncio.coroutine
303 def vdu_opdata_register(self):
304 yield from self._vdur_console_handler.register()
305
306 def cp_ip_addr(self, cp_name):
307 """ Find ip address by connection point name """
308 if self._vm_resp is not None:
309 for conn_point in self._vm_resp.connection_points:
310 if conn_point.name == cp_name:
311 return conn_point.ip_address
312 return "0.0.0.0"
313
314 def cp_id(self, cp_name):
315 """ Find connection point id by connection point name """
316 if self._vm_resp is not None:
317 for conn_point in self._vm_resp.connection_points:
318 if conn_point.name == cp_name:
319 return conn_point.connection_point_id
320 return ''
321
322 @property
323 def vdu_id(self):
324 return self._vdud.id
325
326 @property
327 def vm_resp(self):
328 return self._vm_resp
329
330 @property
331 def name(self):
332 """ Return this VDUR's name """
333 return self._name
334
335 @property
336 def cloud_account_name(self):
337 """ Cloud account this VDU should be created in """
338 return self._cloud_account_name
339
340 @property
341 def image_name(self):
342 """ name that should be used to lookup the image on the CMP """
343 if 'image' not in self._vdud:
344 return None
345 return os.path.basename(self._vdud.image)
346
347 @property
348 def image_checksum(self):
349 """ name that should be used to lookup the image on the CMP """
350 return self._vdud.image_checksum if self._vdud.has_field("image_checksum") else None
351
352 @property
353 def management_ip(self):
354 if not self.active:
355 return None
356 return self._vm_resp.public_ip if self._vm_resp.has_field('public_ip') else self._vm_resp.management_ip
357
358 @property
359 def vm_management_ip(self):
360 if not self.active:
361 return None
362 return self._vm_resp.management_ip
363
364 @property
365 def operational_status(self):
366 """ Operational status of this VDU"""
367 op_stats_dict = {"INIT": "init",
368 "INSTANTIATING": "vm_init_phase",
369 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
370 "READY": "running",
371 "FAILED": "failed",
372 "TERMINATING": "terminated",
373 "TERMINATED": "terminated",
374 }
375 return op_stats_dict[self._state.name]
376
377 @property
378 def msg(self):
379 """ VDU message """
380 vdu_fields = ["vm_flavor",
381 "guest_epa",
382 "vswitch_epa",
383 "hypervisor_epa",
384 "host_epa",
385 "volumes",
386 "name"]
387 vdu_copy_dict = {k: v for k, v in
388 self._vdud.as_dict().items() if k in vdu_fields}
389 vdur_dict = {"id": self._vdur_id,
390 "vdu_id_ref": self._vdud.id,
391 "operational_status": self.operational_status,
392 "operational_status_details": self._state_failed_reason,
393 }
394 if self.vm_resp is not None:
395 vdur_dict.update({"vim_id": self.vm_resp.vdu_id,
396 "flavor_id": self.vm_resp.flavor_id
397 })
398 if self._vm_resp.has_field('image_id'):
399 vdur_dict.update({ "image_id": self.vm_resp.image_id })
400
401 if self.management_ip is not None:
402 vdur_dict["management_ip"] = self.management_ip
403
404 if self.vm_management_ip is not None:
405 vdur_dict["vm_management_ip"] = self.vm_management_ip
406
407 vdur_dict.update(vdu_copy_dict)
408
409 if self.vm_resp is not None:
410 if self._vm_resp.has_field('volumes'):
411 for opvolume in self._vm_resp.volumes:
412 vdurvol_data = [vduvol for vduvol in vdur_dict['volumes'] if vduvol['name'] == opvolume.name]
413 if len(vdurvol_data) == 1:
414 vdurvol_data[0]["volume_id"] = opvolume.volume_id
415
416 icp_list = []
417 ii_list = []
418
419 for intf, cp_id, vlr in self._int_intf:
420 cp = self.find_internal_cp_by_cp_id(cp_id)
421
422 icp_list.append({"name": cp.name,
423 "id": cp.id,
424 "type_yang": "VPORT",
425 "ip_address": self.cp_ip_addr(cp.id)})
426
427 ii_list.append({"name": intf.name,
428 "vdur_internal_connection_point_ref": cp.id,
429 "virtual_interface": {}})
430
431 vdur_dict["internal_connection_point"] = icp_list
432 self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
433 vdur_dict["internal_interface"] = ii_list
434
435 ei_list = []
436 for intf, cp, vlr in self._ext_intf:
437 ei_list.append({"name": cp,
438 "vnfd_connection_point_ref": cp,
439 "virtual_interface": {}})
440 self._vnfr.update_cp(cp, self.cp_ip_addr(cp), self.cp_id(cp))
441
442 vdur_dict["external_interface"] = ei_list
443
444 placement_groups = []
445 for group in self._placement_groups:
446 placement_groups.append(group.as_dict())
447 vdur_dict['placement_groups_info'] = placement_groups
448
449 return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
450
451 @property
452 def resmgr_path(self):
453 """ path for resource-mgr"""
454 return ("D,/rw-resource-mgr:resource-mgmt" +
455 "/vdu-event" +
456 "/vdu-event-data[event-id='{}']".format(self._request_id))
457
458 @property
459 def vm_flavor_msg(self):
460 """ VM flavor message """
461 flavor = self._vdud.vm_flavor.__class__()
462 flavor.copy_from(self._vdud.vm_flavor)
463
464 return flavor
465
466 @property
467 def vdud_cloud_init(self):
468 """ Return the cloud-init contents for the VDU """
469 if self._vdud_cloud_init is None:
470 self._vdud_cloud_init = self.cloud_init()
471
472 return self._vdud_cloud_init
473
474 def cloud_init(self):
475 """ Populate cloud_init with cloud-config script from
476 either the inline contents or from the file provided
477 """
478 if self._vdud.cloud_init is not None:
479 self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
480 return self._vdud.cloud_init
481 elif self._vdud.cloud_init_file is not None:
482 # Get cloud-init script contents from the file provided in the cloud_init_file param
483 self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
484 filename = self._vdud.cloud_init_file
485 self._vnfd_package_store.refresh()
486 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
487 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
488 try:
489 return cloud_init_extractor.read_script(stored_package, filename)
490 except rift.package.cloud_init.CloudInitExtractionError as e:
491 raise VirtualDeploymentUnitRecordError(e)
492 else:
493 self._log.debug("VDU Instantiation: cloud-init script not provided")
494
495 def process_openstack_placement_group_construct(self, vm_create_msg_dict):
496 host_aggregates = []
497 availability_zones = []
498 server_groups = []
499 for group in self._placement_groups:
500 if group.has_field('host_aggregate'):
501 for aggregate in group.host_aggregate:
502 host_aggregates.append(aggregate.as_dict())
503 if group.has_field('availability_zone'):
504 availability_zones.append(group.availability_zone.as_dict())
505 if group.has_field('server_group'):
506 server_groups.append(group.server_group.as_dict())
507
508 if availability_zones:
509 if len(availability_zones) > 1:
510 self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
511 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self.name, availability_zones))
512 else:
513 vm_create_msg_dict['availability_zone'] = availability_zones[0]
514
515 if server_groups:
516 if len(server_groups) > 1:
517 self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
518 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self.name, server_groups))
519 else:
520 vm_create_msg_dict['server_group'] = server_groups[0]
521
522 if host_aggregates:
523 vm_create_msg_dict['host_aggregate'] = host_aggregates
524
525 return
526
527 def process_placement_groups(self, vm_create_msg_dict):
528 """Process the placement_groups and fill resource-mgr request"""
529 if not self._placement_groups:
530 return
531
532 cloud_set = set([group.cloud_type for group in self._placement_groups])
533 assert len(cloud_set) == 1
534 cloud_type = cloud_set.pop()
535
536 if cloud_type == 'openstack':
537 self.process_openstack_placement_group_construct(vm_create_msg_dict)
538
539 else:
540 self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
541 return
542
543 def resmgr_msg(self, config=None):
544 vdu_fields = ["vm_flavor",
545 "guest_epa",
546 "vswitch_epa",
547 "hypervisor_epa",
548 "host_epa"]
549
550 self._log.debug("Creating params based on VDUD: %s", self._vdud)
551 vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}
552
553 vm_create_msg_dict = {
554 "name": self.name,
555 }
556
557 if self.image_name is not None:
558 vm_create_msg_dict["image_name"] = self.image_name
559
560 if self.image_checksum is not None:
561 vm_create_msg_dict["image_checksum"] = self.image_checksum
562
563 vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
564 if self._vdud.has_field('mgmt_vpci'):
565 vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci
566
567 self._log.debug("VDUD: %s", self._vdud)
568 if config is not None:
569 vm_create_msg_dict['vdu_init'] = {'userdata': config}
570
571 if self._mgmt_network:
572 vm_create_msg_dict['mgmt_network'] = self._mgmt_network
573
574 cp_list = []
575 for intf, cp, vlr in self._ext_intf:
576 cp_info = {"name": cp,
577 "virtual_link_id": vlr.network_id,
578 "type_yang": intf.virtual_interface.type_yang}
579
580 if (intf.virtual_interface.has_field('vpci') and
581 intf.virtual_interface.vpci is not None):
582 cp_info["vpci"] = intf.virtual_interface.vpci
583
584 if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
585 cp_info['security_group'] = vlr.ip_profile_params.security_group
586
587 cp_list.append(cp_info)
588
589 for intf, cp, vlr in self._int_intf:
590 if (intf.virtual_interface.has_field('vpci') and
591 intf.virtual_interface.vpci is not None):
592 cp_list.append({"name": cp,
593 "virtual_link_id": vlr.network_id,
594 "type_yang": intf.virtual_interface.type_yang,
595 "vpci": intf.virtual_interface.vpci})
596 else:
597 cp_list.append({"name": cp,
598 "virtual_link_id": vlr.network_id,
599 "type_yang": intf.virtual_interface.type_yang})
600
601 vm_create_msg_dict["connection_points"] = cp_list
602 vm_create_msg_dict.update(vdu_copy_dict)
603
604 self.process_placement_groups(vm_create_msg_dict)
605
606 msg = RwResourceMgrYang.VDUEventData()
607 msg.event_id = self._request_id
608 msg.cloud_account = self.cloud_account_name
609 msg.request_info.from_dict(vm_create_msg_dict)
610
611 for volume in self._vdud.volumes:
612 v = msg.request_info.volumes.add()
613 v.from_dict(volume.as_dict())
614 return msg
615
616 @asyncio.coroutine
617 def terminate(self, xact):
618 """ Delete resource in VIM """
619 if self._state != VDURecordState.READY and self._state != VDURecordState.FAILED:
620 self._log.warning("VDU terminate in not ready state - Ignoring request")
621 return
622
623 self._state = VDURecordState.TERMINATING
624 if self._vm_resp is not None:
625 try:
626 with self._dts.transaction() as new_xact:
627 yield from self.delete_resource(new_xact)
628 except Exception:
629 self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)
630
631 if self._rm_regh is not None:
632 self._log.debug("Deregistering resource manager registration handle")
633 self._rm_regh.deregister()
634 self._rm_regh = None
635
636 if self._vdur_console_handler is not None:
637 self._log.error("Deregistering vnfr vdur registration handle")
638 self._vdur_console_handler._regh.deregister()
639 self._vdur_console_handler._regh = None
640
641 self._state = VDURecordState.TERMINATED
642
643 def find_internal_cp_by_cp_id(self, cp_id):
644 """ Find the CP corresponding to the connection point id"""
645 cp = None
646
647 self._log.debug("find_internal_cp_by_cp_id(%s) called",
648 cp_id)
649
650 for int_cp in self._vdud.internal_connection_point:
651 self._log.debug("Checking for int cp %s in internal connection points",
652 int_cp.id)
653 if int_cp.id == cp_id:
654 cp = int_cp
655 break
656
657 if cp is None:
658 self._log.debug("Failed to find cp %s in internal connection points",
659 cp_id)
660 msg = "Failed to find cp %s in internal connection points" % cp_id
661 raise VduRecordError(msg)
662
663 # return the VLR associated with the connection point
664 return cp
665
666 @asyncio.coroutine
667 def create_resource(self, xact, vnfr, config=None):
668 """ Request resource from ResourceMgr """
669 def find_cp_by_name(cp_name):
670 """ Find a connection point by name """
671 cp = None
672 self._log.debug("find_cp_by_name(%s) called", cp_name)
673 for ext_cp in vnfr._cprs:
674 self._log.debug("Checking ext cp (%s) called", ext_cp.name)
675 if ext_cp.name == cp_name:
676 cp = ext_cp
677 break
678 if cp is None:
679 self._log.debug("Failed to find cp %s in external connection points",
680 cp_name)
681 return cp
682
683 def find_internal_vlr_by_cp_name(cp_name):
684 """ Find the VLR corresponding to the connection point name"""
685 cp = None
686
687 self._log.debug("find_internal_vlr_by_cp_name(%s) called",
688 cp_name)
689
690 for int_cp in self._vdud.internal_connection_point:
691 self._log.debug("Checking for int cp %s in internal connection points",
692 int_cp.id)
693 if int_cp.id == cp_name:
694 cp = int_cp
695 break
696
697 if cp is None:
698 self._log.debug("Failed to find cp %s in internal connection points",
699 cp_name)
700 msg = "Failed to find cp %s in internal connection points" % cp_name
701 raise VduRecordError(msg)
702
703 # return the VLR associated with the connection point
704 return vnfr.find_vlr_by_cp(cp_name)
705
706 block = xact.block_create()
707
708 self._log.debug("Executing vm request id: %s, action: create",
709 self._request_id)
710
711 # Resolve the networks associated external interfaces
712 for ext_intf in self._vdud.external_interface:
713 self._log.debug("Resolving external interface name [%s], cp[%s]",
714 ext_intf.name, ext_intf.vnfd_connection_point_ref)
715 cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
716 if cp is None:
717 self._log.debug("Failed to find connection point - %s",
718 ext_intf.vnfd_connection_point_ref)
719 continue
720 self._log.debug("Connection point name [%s], type[%s]",
721 cp.name, cp.type_yang)
722
723 vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)
724
725 etuple = (ext_intf, cp.name, vlr)
726 self._ext_intf.append(etuple)
727
728 self._log.debug("Created external interface tuple : %s", etuple)
729
730 # Resolve the networks associated internal interfaces
731 for intf in self._vdud.internal_interface:
732 cp_id = intf.vdu_internal_connection_point_ref
733 self._log.debug("Resolving internal interface name [%s], cp[%s]",
734 intf.name, cp_id)
735
736 try:
737 vlr = find_internal_vlr_by_cp_name(cp_id)
738 except Exception as e:
739 self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
740 msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
741 raise VduRecordError(msg)
742
743 ituple = (intf, cp_id, vlr)
744 self._int_intf.append(ituple)
745
746 self._log.debug("Created internal interface tuple : %s", ituple)
747
748 resmgr_path = self.resmgr_path
749 resmgr_msg = self.resmgr_msg(config)
750
751 self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
752 block.add_query_create(resmgr_path, resmgr_msg)
753
754 res_iter = yield from block.execute(now=True)
755
756 resp = None
757
758 for i in res_iter:
759 r = yield from i
760 resp = r.result
761
762 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
763 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
764 self._log.debug("Got vm request response: %s", resp.resource_info)
765 return resp.resource_info
766
767 @asyncio.coroutine
768 def delete_resource(self, xact):
769 block = xact.block_create()
770
771 self._log.debug("Executing vm request id: %s, action: delete",
772 self._request_id)
773
774 block.add_query_delete(self.resmgr_path)
775
776 yield from block.execute(flags=0, now=True)
777
778 @asyncio.coroutine
779 def read_resource(self, xact):
780 block = xact.block_create()
781
782 self._log.debug("Executing vm request id: %s, action: delete",
783 self._request_id)
784
785 block.add_query_read(self.resmgr_path)
786
787 res_iter = yield from block.execute(flags=0, now=True)
788 for i in res_iter:
789 r = yield from i
790 resp = r.result
791
792 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
793 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
794 self._log.debug("Got vm request response: %s", resp.resource_info)
795 #self._vm_resp = resp.resource_info
796 return resp.resource_info
797
798
799 @asyncio.coroutine
800 def start_component(self):
801 """ This VDUR is active """
802 self._log.debug("Starting component %s for vdud %s vdur %s",
803 self._vdud.vcs_component_ref,
804 self._vdud,
805 self._vdur_id)
806 yield from self._vnfr.start_component(self._vdud.vcs_component_ref,
807 self.vm_resp.management_ip)
808
809 @property
810 def active(self):
811 """ Is this VDU active """
812 return True if self._state is VDURecordState.READY else False
813
814 @asyncio.coroutine
815 def instantiation_failed(self, failed_reason=None):
816 """ VDU instantiation failed """
817 self._log.debug("VDU %s instantiation failed ", self._vdur_id)
818 self._state = VDURecordState.FAILED
819 self._state_failed_reason = failed_reason
820 yield from self._vnfr.instantiation_failed(failed_reason)
821
822 @asyncio.coroutine
823 def vdu_is_active(self):
824 """ This VDU is active"""
825 if self.active:
826 self._log.warning("VDU %s was already marked as active", self._vdur_id)
827 return
828
829 self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)
830
831 if self._vdud.vcs_component_ref is not None:
832 yield from self.start_component()
833
834 self._state = VDURecordState.READY
835
836 if self._vnfr.all_vdus_active():
837 self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
838 yield from self._vnfr.is_ready()
839
840 @asyncio.coroutine
841 def instantiate(self, xact, vnfr, config=None):
842 """ Instantiate this VDU """
843 self._state = VDURecordState.INSTANTIATING
844
845 @asyncio.coroutine
846 def on_prepare(xact_info, query_action, ks_path, msg):
847 """ This VDUR is active """
848 self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
849 query_action,
850 ks_path,
851 msg)
852
853 if (query_action == rwdts.QueryAction.UPDATE or
854 query_action == rwdts.QueryAction.CREATE):
855 self._vm_resp = msg
856
857 if msg.resource_state == "active":
858 # Move this VDU to ready state
859 yield from self.vdu_is_active()
860 elif msg.resource_state == "failed":
861 yield from self.instantiation_failed(msg.resource_errors)
862 elif query_action == rwdts.QueryAction.DELETE:
863 self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
864 else:
865 raise NotImplementedError(
866 "%s action on VirtualDeployementUnitRecord not supported",
867 query_action)
868
869 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
870
871 try:
872 reg_event = asyncio.Event(loop=self._loop)
873
874 @asyncio.coroutine
875 def on_ready(regh, status):
876 reg_event.set()
877
878 handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
879 self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
880 flags=rwdts.Flag.SUBSCRIBER,
881 handler=handler)
882 yield from reg_event.wait()
883
884 vm_resp = yield from self.create_resource(xact, vnfr, config)
885 self._vm_resp = vm_resp
886
887 self._state = VDURecordState.RESOURCE_ALLOC_PENDING
888 self._log.debug("Requested VM from resource manager response %s",
889 vm_resp)
890 if vm_resp.resource_state == "active":
891 self._log.debug("Resourcemgr responded wih an active vm resp %s",
892 vm_resp)
893 yield from self.vdu_is_active()
894 self._state = VDURecordState.READY
895 elif (vm_resp.resource_state == "pending" or
896 vm_resp.resource_state == "inactive"):
897 self._log.debug("Resourcemgr responded wih a pending vm resp %s",
898 vm_resp)
899 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
900 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
901 # flags=rwdts.Flag.SUBSCRIBER,
902 # handler=handler)
903 else:
904 self._log.debug("Resourcemgr responded wih an error vm resp %s",
905 vm_resp)
906 raise VirtualDeploymentUnitRecordError(
907 "Failed VDUR instantiation %s " % vm_resp)
908
909 except Exception as e:
910 import traceback
911 traceback.print_exc()
912 self._log.exception(e)
913 self._log.error("Instantiation of VDU record failed: %s", str(e))
914 self._state = VDURecordState.FAILED
915 yield from self.instantiation_failed(str(e))
916
917
918 class VlRecordState(enum.Enum):
919 """ VL Record State """
920 INIT = 101
921 INSTANTIATION_PENDING = 102
922 ACTIVE = 103
923 TERMINATE_PENDING = 104
924 TERMINATED = 105
925 FAILED = 106
926
927
928 class InternalVirtualLinkRecord(object):
929 """ Internal Virtual Link record """
930 def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name):
931 self._dts = dts
932 self._log = log
933 self._loop = loop
934 self._ivld_msg = ivld_msg
935 self._vnfr_name = vnfr_name
936 self._cloud_account_name = cloud_account_name
937
938 self._vlr_req = self.create_vlr()
939 self._vlr = None
940 self._state = VlRecordState.INIT
941
942 @property
943 def vlr_id(self):
944 """ Find VLR by id """
945 return self._vlr_req.id
946
947 @property
948 def name(self):
949 """ Name of this VL """
950 return self._vnfr_name + "." + self._ivld_msg.name
951
952 @property
953 def network_id(self):
954 """ Find VLR by id """
955 return self._vlr.network_id if self._vlr else None
956
957 def vlr_path(self):
958 """ VLR path for this VLR instance"""
959 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self.vlr_id)
960
961 def create_vlr(self):
962 """ Create the VLR record which will be instantiated """
963
964 vld_fields = ["short_name",
965 "vendor",
966 "description",
967 "version",
968 "type_yang",
969 "provider_network"]
970
971 vld_copy_dict = {k: v for k, v in self._ivld_msg.as_dict().items() if k in vld_fields}
972
973 vlr_dict = {"id": str(uuid.uuid4()),
974 "name": self.name,
975 "cloud_account": self._cloud_account_name,
976 }
977 vlr_dict.update(vld_copy_dict)
978
979 vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
980 return vlr
981
982 @asyncio.coroutine
983 def instantiate(self, xact, restart_mode=False):
984 """ Instantiate VL """
985
986 @asyncio.coroutine
987 def instantiate_vlr():
988 """ Instantiate VLR"""
989 self._log.debug("Create VL with xpath %s and vlr %s",
990 self.vlr_path(), self._vlr_req)
991
992 with self._dts.transaction(flags=0) as xact:
993 block = xact.block_create()
994 block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
995 self._log.debug("Executing VL create path:%s msg:%s",
996 self.vlr_path(), self._vlr_req)
997
998 res_iter = None
999 try:
1000 res_iter = yield from block.execute()
1001 except Exception:
1002 self._state = VlRecordState.FAILED
1003 self._log.exception("Caught exception while instantial VL")
1004 raise
1005
1006 for ent in res_iter:
1007 res = yield from ent
1008 self._vlr = res.result
1009
1010 if self._vlr.operational_status == 'failed':
1011 self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
1012 self._state = VlRecordState.FAILED
1013 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))
1014
1015 self._log.info("Created VL with xpath %s and vlr %s",
1016 self.vlr_path(), self._vlr)
1017
1018 @asyncio.coroutine
1019 def get_vlr():
1020 """ Get the network id """
1021 res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
1022 vlr = None
1023 for ent in res_iter:
1024 res = yield from ent
1025 vlr = res.result
1026
1027 if vlr is None:
1028 err = "Failed to get VLR for path %s" % self.vlr_path()
1029 self._log.warn(err)
1030 raise InternalVirtualLinkRecordError(err)
1031 return vlr
1032
1033 self._state = VlRecordState.INSTANTIATION_PENDING
1034
1035 if restart_mode:
1036 vl = yield from get_vlr()
1037 if vl is None:
1038 yield from instantiate_vlr()
1039 else:
1040 yield from instantiate_vlr()
1041
1042 self._state = VlRecordState.ACTIVE
1043
1044 def vlr_in_vns(self):
1045 """ Is there a VLR record in VNS """
1046 if (self._state == VlRecordState.ACTIVE or
1047 self._state == VlRecordState.INSTANTIATION_PENDING or
1048 self._state == VlRecordState.FAILED):
1049 return True
1050
1051 return False
1052
1053 @asyncio.coroutine
1054 def terminate(self, xact):
1055 """Terminate this VL """
1056 if not self.vlr_in_vns():
1057 self._log.debug("Ignoring terminate request for id %s in state %s",
1058 self.vlr_id, self._state)
1059 return
1060
1061 self._log.debug("Terminating VL with path %s", self.vlr_path())
1062 self._state = VlRecordState.TERMINATE_PENDING
1063 block = xact.block_create()
1064 block.add_query_delete(self.vlr_path())
1065 yield from block.execute(flags=0, now=True)
1066 self._state = VlRecordState.TERMINATED
1067 self._log.debug("Terminated VL with path %s", self.vlr_path())
1068
1069
1070 class VirtualNetworkFunctionRecord(object):
1071 """ Virtual Network Function Record """
1072 def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
1073 self._dts = dts
1074 self._log = log
1075 self._loop = loop
1076 self._cluster_name = cluster_name
1077 self._vnfr_msg = vnfr_msg
1078 self._vnfr_id = vnfr_msg.id
1079 self._vnfd_id = vnfr_msg.vnfd_ref
1080 self._vnfm = vnfm
1081 self._vcs_handler = vcs_handler
1082 self._vnfr = vnfr_msg
1083 self._mgmt_network = mgmt_network
1084
1085 self._vnfd = None
1086 self._state = VirtualNetworkFunctionRecordState.INIT
1087 self._state_failed_reason = None
1088 self._ext_vlrs = {} # The list of external virtual links
1089 self._vlrs = [] # The list of internal virtual links
1090 self._vdus = [] # The list of vdu
1091 self._vlr_by_cp = {}
1092 self._cprs = []
1093 self._inventory = {}
1094 self._create_time = int(time.time())
1095 self._vnf_mon = None
1096 self._config_status = vnfr_msg.config_status
1097 self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
1098
1099 def _get_vdur_from_vdu_id(self, vdu_id):
1100 self._log.debug("Finding vdur for vdu_id %s", vdu_id)
1101 self._log.debug("Searching through vdus: %s", self._vdus)
1102 for vdu in self._vdus:
1103 self._log.debug("vdu_id: %s", vdu.vdu_id)
1104 if vdu.vdu_id == vdu_id:
1105 return vdu
1106
1107 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id)
1108
1109 @property
1110 def operational_status(self):
1111 """ Operational status of this VNFR """
1112 op_status_map = {"INIT": "init",
1113 "VL_INIT_PHASE": "vl_init_phase",
1114 "VM_INIT_PHASE": "vm_init_phase",
1115 "READY": "running",
1116 "TERMINATE": "terminate",
1117 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1118 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1119 "TERMINATED": "terminated",
1120 "FAILED": "failed", }
1121 return op_status_map[self._state.name]
1122
1123 @property
1124 def vnfd_xpath(self):
1125 """ VNFD xpath associated with this VNFR """
1126 return("C,/vnfd:vnfd-catalog/"
1127 "vnfd:vnfd[vnfd:id = '{}']".format(self._vnfd_id))
1128
1129 @property
1130 def vnfd(self):
1131 """ VNFD for this VNFR """
1132 return self._vnfd
1133
1134 @property
1135 def vnf_name(self):
1136 """ VNFD name associated with this VNFR """
1137 return self.vnfd.name
1138
1139 @property
1140 def name(self):
1141 """ Name of this VNF in the record """
1142 return self._vnfr.name
1143
1144 @property
1145 def cloud_account_name(self):
1146 """ Name of the cloud account this VNFR is instantiated in """
1147 return self._vnfr.cloud_account
1148
1149 @property
1150 def vnfd_id(self):
1151 """ VNFD Id associated with this VNFR """
1152 return self.vnfd.id
1153
1154 @property
1155 def vnfr_id(self):
1156 """ VNFR Id associated with this VNFR """
1157 return self._vnfr_id
1158
1159 @property
1160 def member_vnf_index(self):
1161 """ Member VNF index associated with this VNFR """
1162 return self._vnfr.member_vnf_index_ref
1163
1164 @property
1165 def config_status(self):
1166 """ Config agent status for this VNFR """
1167 return self._config_status
1168
1169 def component_by_name(self, component_name):
1170 """ Find a component by name in the inventory list"""
1171 mangled_name = VcsComponent.mangle_name(component_name,
1172 self.vnf_name,
1173 self.vnfd_id)
1174 return self._inventory[mangled_name]
1175
1176
1177
1178 @asyncio.coroutine
1179 def get_nsr_config(self):
1180 ### Need access to NS instance configuration for runtime resolution.
1181 ### This shall be replaced when deployment flavors are implemented
1182 xpath = "C,/nsr:ns-instance-config"
1183 results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
1184
1185 for result in results:
1186 entry = yield from result
1187 ns_instance_config = entry.result
1188 for nsr in ns_instance_config.nsr:
1189 if nsr.id == self._vnfr_msg.nsr_id_ref:
1190 return nsr
1191 return None
1192
1193 @asyncio.coroutine
1194 def start_component(self, component_name, ip_addr):
1195 """ Start a component in the VNFR by name """
1196 comp = self.component_by_name(component_name)
1197 yield from comp.start(None, None, ip_addr)
1198
1199 def cp_ip_addr(self, cp_name):
1200 """ Get ip address for connection point """
1201 self._log.debug("cp_ip_addr()")
1202 for cp in self._cprs:
1203 if cp.name == cp_name and cp.ip_address is not None:
1204 return cp.ip_address
1205 return "0.0.0.0"
1206
1207 def mgmt_intf_info(self):
1208 """ Get Management interface info for this VNFR """
1209 mgmt_intf_desc = self.vnfd.msg.mgmt_interface
1210 ip_addr = None
1211 if mgmt_intf_desc.has_field("cp"):
1212 ip_addr = self.cp_ip_addr(mgmt_intf_desc.cp)
1213 elif mgmt_intf_desc.has_field("vdu_id"):
1214 try:
1215 vdur = self._get_vdur_from_vdu_id(mgmt_intf_desc.vdu_id)
1216 ip_addr = vdur.management_ip
1217 except VDURecordNotFound:
1218 self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
1219 ip_addr = None
1220 else:
1221 ip_addr = mgmt_intf_desc.ip_address
1222 port = mgmt_intf_desc.port
1223
1224 return ip_addr, port
1225
1226 @property
1227 def msg(self):
1228 """ Message associated with this VNFR """
1229 vnfd_fields = ["short_name", "vendor", "description", "version"]
1230 vnfd_copy_dict = {k: v for k, v in self.vnfd.msg.as_dict().items() if k in vnfd_fields}
1231
1232 mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1233 ip_address, port = self.mgmt_intf_info()
1234
1235 if ip_address is not None:
1236 mgmt_intf.ip_address = ip_address
1237 if port is not None:
1238 mgmt_intf.port = port
1239
1240 vnfr_dict = {"id": self._vnfr_id,
1241 "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
1242 "name": self.name,
1243 "member_vnf_index_ref": self.member_vnf_index,
1244 "vnfd_ref": self.vnfd_id,
1245 "operational_status": self.operational_status,
1246 "operational_status_details": self._state_failed_reason,
1247 "cloud_account": self.cloud_account_name,
1248 "config_status": self._config_status
1249 }
1250
1251 vnfr_dict.update(vnfd_copy_dict)
1252
1253 vnfr_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
1254 vnfr_msg.uptime = int(time.time()) - self._create_time
1255 vnfr_msg.mgmt_interface = mgmt_intf
1256
1257 # Add all the VLRs to VNFR
1258 for vlr in self._vlrs:
1259 ivlr = vnfr_msg.internal_vlr.add()
1260 ivlr.vlr_ref = vlr.vlr_id
1261
1262 # Add all the VDURs to VDUR
1263 if self._vdus is not None:
1264 for vdu in self._vdus:
1265 vdur = vnfr_msg.vdur.add()
1266 vdur.from_dict(vdu.msg.as_dict())
1267
1268 if self.vnfd.msg.mgmt_interface.has_field('dashboard_params'):
1269 vnfr_msg.dashboard_url = self.dashboard_url
1270
1271 for cpr in self._cprs:
1272 new_cp = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
1273 vnfr_msg.connection_point.append(new_cp)
1274
1275 if self._vnf_mon is not None:
1276 for monp in self._vnf_mon.msg:
1277 vnfr_msg.monitoring_param.append(
1278 VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))
1279
1280 if self._vnfr.vnf_configuration is not None:
1281 vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
1282 if (ip_address is not None and
1283 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None):
1284 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address
1285
1286 for group in self._vnfr_msg.placement_groups_info:
1287 group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1288 group_info.from_dict(group.as_dict())
1289 vnfr_msg.placement_groups_info.append(group_info)
1290
1291 return vnfr_msg
1292
1293 @property
1294 def dashboard_url(self):
1295 ip, cfg_port = self.mgmt_intf_info()
1296 protocol = 'http'
1297 http_port = 80
1298 if self.vnfd.msg.mgmt_interface.dashboard_params.has_field('https'):
1299 if self.vnfd.msg.mgmt_interface.dashboard_params.https is True:
1300 protocol = 'https'
1301 http_port = 443
1302 if self.vnfd.msg.mgmt_interface.dashboard_params.has_field('port'):
1303 http_port = self.vnfd.msg.mgmt_interface.dashboard_params.port
1304
1305 url = "{protocol}://{ip_address}:{port}/{path}".format(
1306 protocol=protocol,
1307 ip_address=ip,
1308 port=http_port,
1309 path=self.vnfd.msg.mgmt_interface.dashboard_params.path.lstrip("/"),
1310 )
1311
1312 return url
1313
1314 @property
1315 def xpath(self):
1316 """ path for this VNFR """
1317 return("D,/vnfr:vnfr-catalog"
1318 "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id))
1319
1320 @asyncio.coroutine
1321 def publish(self, xact):
1322 """ publish this VNFR """
1323 vnfr = self.msg
1324 self._log.debug("Publishing VNFR path = [%s], record = [%s]",
1325 self.xpath, self.msg)
1326 vnfr.create_time = self._create_time
1327 yield from self._vnfm.publish_vnfr(xact, self.xpath, self.msg)
1328 self._log.debug("Published VNFR path = [%s], record = [%s]",
1329 self.xpath, self.msg)
1330
1331 @asyncio.coroutine
1332 def create_vls(self):
1333 """ Publish The VLs associated with this VNF """
1334 self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
1335 self.vnfd_id)
1336 for ivld_msg in self.vnfd.msg.internal_vld:
1337 self._log.debug("Creating internal vld:"
1338 " %s, int_cp_ref = %s",
1339 ivld_msg, ivld_msg.internal_connection_point
1340 )
1341 vlr = InternalVirtualLinkRecord(dts=self._dts,
1342 log=self._log,
1343 loop=self._loop,
1344 ivld_msg=ivld_msg,
1345 vnfr_name=self.name,
1346 cloud_account_name=self.cloud_account_name
1347 )
1348 self._vlrs.append(vlr)
1349
1350 for int_cp in ivld_msg.internal_connection_point:
1351 if int_cp.id_ref in self._vlr_by_cp:
1352 msg = ("Connection point %s already "
1353 " bound %s" % (int_cp.id_ref, self._vlr_by_cp[int_cp.id_ref]))
1354 raise InternalVirtualLinkRecordError(msg)
1355 self._log.debug("Setting vlr %s to internal cp = %s",
1356 vlr, int_cp.id_ref)
1357 self._vlr_by_cp[int_cp.id_ref] = vlr
1358
1359 @asyncio.coroutine
1360 def instantiate_vls(self, xact, restart_mode=False):
1361 """ Instantiate the VLs associated with this VNF """
1362 self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1363 self.vnfd_id)
1364
1365 for vlr in self._vlrs:
1366 self._log.debug("Instantiating VLR %s", vlr)
1367 yield from vlr.instantiate(xact, restart_mode)
1368
1369 def find_vlr_by_cp(self, cp_name):
1370 """ Find the VLR associated with the cp name """
1371 return self._vlr_by_cp[cp_name]
1372
1373 def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
1374 """
1375 Returns the cloud specific construct for placement group
1376 Arguments:
1377 input_group: VNFD PlacementGroup
1378 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1379 """
1380 copy_dict = ['name', 'requirement', 'strategy']
1381 for group_info in nsr_config.vnfd_placement_group_maps:
1382 if group_info.placement_group_ref == input_group.name and \
1383 group_info.vnfd_id_ref == self.vnfd_id:
1384 group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1385 group_dict = {k:v for k,v in
1386 group_info.as_dict().items()
1387 if (k != 'placement_group_ref' and k !='vnfd_id_ref')}
1388 for param in copy_dict:
1389 group_dict.update({param: getattr(input_group, param)})
1390 group.from_dict(group_dict)
1391 return group
1392 return None
1393
1394 @asyncio.coroutine
1395 def get_vdu_placement_groups(self, vdu):
1396 placement_groups = []
1397 ### Step-1: Get VNF level placement groups
1398 for group in self._vnfr_msg.placement_groups_info:
1399 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1400 #group_info.from_dict(group.as_dict())
1401 placement_groups.append(group)
1402
1403 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1404 nsr_config = yield from self.get_nsr_config()
1405
1406 ### Step-3: Get VDU level placement groups
1407 for group in self.vnfd.msg.placement_groups:
1408 for member_vdu in group.member_vdus:
1409 if member_vdu.member_vdu_ref == vdu.id:
1410 group_info = self.resolve_placement_group_cloud_construct(group,
1411 nsr_config)
1412 if group_info is None:
1413 self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
1414 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1415 else:
1416 self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1417 str(group_info),
1418 vdu.name,
1419 self.vnf_name,
1420 self.member_vnf_index)
1421 placement_groups.append(group_info)
1422
1423 return placement_groups
1424
1425 @asyncio.coroutine
1426 def create_vdus(self, vnfr, restart_mode=False):
1427 """ Create the VDUs associated with this VNF """
1428
1429 def get_vdur_id(vdud):
1430 """Get the corresponding VDUR's id for the VDUD. This is useful in
1431 case of a restart.
1432
1433 In restart mode we check for exiting VDUR's ID and use them, if
1434 available. This way we don't end up creating duplicate VDURs
1435 """
1436 vdur_id = None
1437
1438 if restart_mode and vdud is not None:
1439 try:
1440 vdur = [vdur.id for vdur in vnfr._vnfr.vdur if vdur.vdu_id_ref == vdud.id]
1441 vdur_id = vdur[0]
1442 except IndexError:
1443 self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
1444
1445 return vdur_id
1446
1447
1448 self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
1449 for vdu in self.vnfd.msg.vdu:
1450 self._log.debug("Creating vdu: %s", vdu)
1451 vdur_id = get_vdur_id(vdu)
1452
1453 placement_groups = yield from self.get_vdu_placement_groups(vdu)
1454 self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1455 vdu.name,
1456 self.vnf_name,
1457 self.member_vnf_index,
1458 [ group.name for group in placement_groups])
1459
1460 vdur = VirtualDeploymentUnitRecord(
1461 dts=self._dts,
1462 log=self._log,
1463 loop=self._loop,
1464 vdud=vdu,
1465 vnfr=vnfr,
1466 mgmt_intf=self.has_mgmt_interface(vdu),
1467 mgmt_network=self._mgmt_network,
1468 cloud_account_name=self.cloud_account_name,
1469 vnfd_package_store=self._vnfd_package_store,
1470 vdur_id=vdur_id,
1471 placement_groups = placement_groups,
1472 )
1473 yield from vdur.vdu_opdata_register()
1474
1475 self._vdus.append(vdur)
1476
1477 @asyncio.coroutine
1478 def instantiate_vdus(self, xact, vnfr):
1479 """ Instantiate the VDUs associated with this VNF """
1480 self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)
1481
1482 lookup = {vdu.vdu_id: vdu for vdu in self._vdus}
1483
1484 # Identify any dependencies among the VDUs
1485 dependencies = collections.defaultdict(list)
1486 vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")
1487
1488 for vdu in self._vdus:
1489 if vdu.vdud_cloud_init is not None:
1490 for vdu_id in vdu_id_pattern.findall(vdu.vdud_cloud_init):
1491 if vdu_id != vdu.vdu_id:
1492 # This means that vdu.vdu_id depends upon vdu_id,
1493 # i.e. vdu_id must be instantiated before
1494 # vdu.vdu_id.
1495 dependencies[vdu.vdu_id].append(lookup[vdu_id])
1496
1497 # Define the terminal states of VDU instantiation
1498 terminal = (
1499 VDURecordState.READY,
1500 VDURecordState.TERMINATED,
1501 VDURecordState.FAILED,
1502 )
1503
1504 datastore = VdurDatastore()
1505 processed = set()
1506
1507 @asyncio.coroutine
1508 def instantiate_monitor(vdu):
1509 """Monitor the state of the VDU during instantiation
1510
1511 Arguments:
1512 vdu - a VirtualDeploymentUnitRecord
1513
1514 """
1515 # wait for the VDUR to enter a terminal state
1516 while vdu._state not in terminal:
1517 yield from asyncio.sleep(1, loop=self._loop)
1518
1519 # update the datastore
1520 datastore.update(vdu)
1521
1522 # add the VDU to the set of processed VDUs
1523 processed.add(vdu.vdu_id)
1524
1525 @asyncio.coroutine
1526 def instantiate(vdu):
1527 """Instantiate the specified VDU
1528
1529 Arguments:
1530 vdu - a VirtualDeploymentUnitRecord
1531
1532 Raises:
1533 if the VDU, or any of the VDUs this VDU depends upon, are
1534 terminated or fail to instantiate properly, a
1535 VirtualDeploymentUnitRecordError is raised.
1536
1537 """
1538 for dependency in dependencies[vdu.vdu_id]:
1539 self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))
1540
1541 while dependency.vdu_id not in processed:
1542 yield from asyncio.sleep(1, loop=self._loop)
1543
1544 if not dependency.active:
1545 raise VirtualDeploymentUnitRecordError()
1546
1547 self._log.debug('instantiating {}'.format(vdu.vdu_id))
1548
1549 # Populate the datastore with the current values of the VDU
1550 datastore.add(vdu)
1551
1552 # Substitute any variables contained in the cloud config script
1553 config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""
1554
1555 parts = re.split("\{\{ ([^\}]+) \}\}", config)
1556 if len(parts) > 1:
1557
1558 # Extract the variable names
1559 variables = list()
1560 for variable in parts[1::2]:
1561 variables.append(variable.lstrip('{{').rstrip('}}').strip())
1562
1563 # Iterate of the variables and substitute values from the
1564 # datastore.
1565 for variable in variables:
1566
1567 # Handle a reference to a VDU by ID
1568 if variable.startswith('vdu['):
1569 value = datastore.get(variable)
1570 if value is None:
1571 msg = "Unable to find a substitute for {} in {} cloud-init script"
1572 raise ValueError(msg.format(variable, vdu.vdu_id))
1573
1574 config = config.replace("{{ %s }}" % variable, value)
1575 continue
1576
1577 # Handle a reference to the current VDU
1578 if variable.startswith('vdu'):
1579 value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
1580 config = config.replace("{{ %s }}" % variable, value)
1581 continue
1582
1583 # Handle unrecognized variables
1584 msg = 'unrecognized cloud-config variable: {}'
1585 raise ValueError(msg.format(variable))
1586
1587 # Instantiate the VDU
1588 with self._dts.transaction() as xact:
1589 self._log.debug("Instantiating vdu: %s", vdu)
1590 yield from vdu.instantiate(xact, vnfr, config=config)
1591 if self._state == VirtualNetworkFunctionRecordState.FAILED:
1592 self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
1593 self.vnfr_id, vdu)
1594
1595 # First create a set of tasks to monitor the state of the VDUs and
1596 # report when they have entered a terminal state
1597 for vdu in self._vdus:
1598 self._loop.create_task(instantiate_monitor(vdu))
1599
1600 for vdu in self._vdus:
1601 self._loop.create_task(instantiate(vdu))
1602
1603 def has_mgmt_interface(self, vdu):
1604 # ## TODO: Support additional mgmt_interface type options
1605 if self.vnfd.msg.mgmt_interface.vdu_id == vdu.id:
1606 return True
1607 return False
1608
1609 def vlr_xpath(self, vlr_id):
1610 """ vlr xpath """
1611 return(
1612 "D,/vlr:vlr-catalog/"
1613 "vlr:vlr[vlr:id = '{}']".format(vlr_id))
1614
1615 def ext_vlr_by_id(self, vlr_id):
1616 """ find ext vlr by id """
1617 return self._ext_vlrs[vlr_id]
1618
1619 @asyncio.coroutine
1620 def publish_inventory(self, xact):
1621 """ Publish the inventory associated with this VNF """
1622 self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)
1623
1624 for component in self.vnfd.msg.component:
1625 self._log.debug("Creating inventory component %s", component)
1626 mangled_name = VcsComponent.mangle_name(component.component_name,
1627 self.vnf_name,
1628 self.vnfd_id
1629 )
1630 comp = VcsComponent(dts=self._dts,
1631 log=self._log,
1632 loop=self._loop,
1633 cluster_name=self._cluster_name,
1634 vcs_handler=self._vcs_handler,
1635 component=component,
1636 mangled_name=mangled_name,
1637 )
1638 if comp.name in self._inventory:
1639 self._log.debug("Duplicate entries in inventory %s for vnfr %s",
1640 component, self._vnfd_id)
1641 return
1642 self._log.debug("Adding component %s for vnrf %s",
1643 comp.name, self._vnfr_id)
1644 self._inventory[comp.name] = comp
1645 yield from comp.publish(xact)
1646
1647 def all_vdus_active(self):
1648 """ Are all VDUS in this VNFR active? """
1649 for vdu in self._vdus:
1650 if not vdu.active:
1651 return False
1652
1653 self._log.debug("Inside all_vdus_active. Returning True")
1654 return True
1655
1656 @asyncio.coroutine
1657 def instantiation_failed(self, failed_reason=None):
1658 """ VNFR instantiation failed """
1659 self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
1660 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1661 self._state_failed_reason = failed_reason
1662
1663 # Update the VNFR with the changed status
1664 yield from self.publish(None)
1665
1666 @asyncio.coroutine
1667 def is_ready(self):
1668 """ This VNF is ready"""
1669 self._log.debug("VNFR id %s is ready", self.vnfr_id)
1670
1671 if self._state != VirtualNetworkFunctionRecordState.FAILED:
1672 self.set_state(VirtualNetworkFunctionRecordState.READY)
1673
1674 else:
1675 self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
1676
1677 # Update the VNFR with the changed status
1678 yield from self.publish(None)
1679
1680 def update_cp(self, cp_name, ip_address, cp_id):
1681 """Updated the connection point with ip address"""
1682 for cp in self._cprs:
1683 if cp.name == cp_name:
1684 self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1685 cp_name, cp, ip_address, cp_id)
1686 cp.ip_address = ip_address
1687 cp.connection_point_id = cp_id
1688 return
1689
1690 err = "No connection point %s found in VNFR id %s" % (cp.name, self._vnfr_id)
1691 self._log.debug(err)
1692 raise VirtualDeploymentUnitRecordError(err)
1693
1694 def set_state(self, state):
1695 """ Set state for this VNFR"""
1696 self._state = state
1697
1698 @asyncio.coroutine
1699 def instantiate(self, xact, restart_mode=False):
1700 """ instantiate this VNF """
1701 self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
1702
1703 @asyncio.coroutine
1704 def fetch_vlrs():
1705 """ Fetch VLRs """
1706 # Iterate over all the connection points in VNFR and fetch the
1707 # associated VLRs
1708
1709 def cpr_from_cp(cp):
1710 """ Creates a record level connection point from the desciptor cp"""
1711 cp_fields = ["name", "image", "vm-flavor"]
1712 cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
1713 cpr_dict = {}
1714 cpr_dict.update(cp_copy_dict)
1715 return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)
1716
1717 self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1718 self._vnfr_id, self._vnfr.connection_point)
1719
1720 for cp in self._vnfr.connection_point:
1721 cpr = cpr_from_cp(cp)
1722 self._cprs.append(cpr)
1723 self._log.debug("Adding Connection point record %s ", cp)
1724
1725 vlr_path = self.vlr_xpath(cp.vlr_ref)
1726 self._log.debug("Fetching VLR with path = %s", vlr_path)
1727 res_iter = yield from self._dts.query_read(self.vlr_xpath(cp.vlr_ref),
1728 rwdts.XactFlag.MERGE)
1729 for i in res_iter:
1730 r = yield from i
1731 d = r.result
1732 self._ext_vlrs[cp.vlr_ref] = d
1733 cpr.vlr_ref = cp.vlr_ref
1734 self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)
1735
1736 # Fetch the VNFD associated with the VNFR
1737 self._log.debug("VNFR-ID %s: Fetching vnfds", self._vnfr_id)
1738 self._vnfd = yield from self._vnfm.get_vnfd_ref(self._vnfd_id)
1739 self._log.debug("VNFR-ID %s: Fetched vnfd:%s", self._vnfr_id, self._vnfd)
1740
1741 assert self.vnfd is not None
1742
1743 # Fetch External VLRs
1744 self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
1745 yield from fetch_vlrs()
1746
1747 # Publish inventory
1748 self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
1749 yield from self.publish_inventory(xact)
1750
1751 # Publish inventory
1752 self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
1753 yield from self.create_vls()
1754
1755 # publish the VNFR
1756 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1757 yield from self.publish(xact)
1758
1759 # instantiate VLs
1760 self._log.debug("VNFR-ID %s: Instantiate VLs", self._vnfr_id)
1761 try:
1762 yield from self.instantiate_vls(xact, restart_mode)
1763 except Exception as e:
1764 self._log.exception("VL instantiation failed (%s)", str(e))
1765 yield from self.instantiation_failed(str(e))
1766 return
1767
1768 self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)
1769
1770 # instantiate VDUs
1771 self._log.debug("VNFR-ID %s: Create VDUs", self._vnfr_id)
1772 yield from self.create_vdus(self, restart_mode)
1773
1774 # publish the VNFR
1775 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1776 yield from self.publish(xact)
1777
1778 # instantiate VDUs
1779 # ToDo: Check if this should be prevented during restart
1780 self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
1781 _ = self._loop.create_task(self.instantiate_vdus(xact, self))
1782
1783 # publish the VNFR
1784 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1785 yield from self.publish(xact)
1786
1787 self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)
1788
1789 # create task updating uptime for this vnfr
1790 self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
1791 self._loop.create_task(self.vnfr_uptime_update(xact))
1792
1793 @asyncio.coroutine
1794 def terminate(self, xact):
1795 """ Terminate this virtual network function """
1796
1797 self._log.debug("Terminatng VNF id %s", self.vnfr_id)
1798
1799 self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)
1800
1801 # stop monitoring
1802 if self._vnf_mon is not None:
1803 self._vnf_mon.stop()
1804 self._vnf_mon.deregister()
1805 self._vnf_mon = None
1806
1807 @asyncio.coroutine
1808 def terminate_vls():
1809 """ Terminate VLs in this VNF """
1810 for vl in self._vlrs:
1811 yield from vl.terminate(xact)
1812
1813 @asyncio.coroutine
1814 def terminate_vdus():
1815 """ Terminate VDUS in this VNF """
1816 for vdu in self._vdus:
1817 yield from vdu.terminate(xact)
1818
1819 self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
1820 self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
1821 yield from terminate_vls()
1822
1823 self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
1824 self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
1825 yield from terminate_vdus()
1826
1827 self._log.debug("Terminated VNF id %s", self.vnfr_id)
1828 self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1829
1830 @asyncio.coroutine
1831 def vnfr_uptime_update(self, xact):
1832 while True:
1833 # Return when vnfr state is FAILED or TERMINATED etc
1834 if self._state not in [VirtualNetworkFunctionRecordState.INIT,
1835 VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
1836 VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
1837 VirtualNetworkFunctionRecordState.READY]:
1838 return
1839 yield from self.publish(xact)
1840 yield from asyncio.sleep(2, loop=self._loop)
1841
1842
1843
1844 class VnfdDtsHandler(object):
1845 """ DTS handler for VNFD config changes """
1846 XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1847
1848 def __init__(self, dts, log, loop, vnfm):
1849 self._dts = dts
1850 self._log = log
1851 self._loop = loop
1852 self._vnfm = vnfm
1853 self._regh = None
1854
1855 @asyncio.coroutine
1856 def regh(self):
1857 """ DTS registration handle """
1858 return self._regh
1859
1860 @asyncio.coroutine
1861 def register(self):
1862 """ Register for VNFD configuration"""
1863
1864 def on_apply(dts, acg, xact, action, scratch):
1865 """Apply the configuration"""
1866 self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1867 xact, action, scratch)
1868
1869 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
1870 # Create/Update a VNFD record
1871 for cfg in self._regh.get_xact_elements(xact):
1872 # Only interested in those VNFD cfgs whose ID was received in prepare callback
1873 if cfg.id in scratch.get('vnfds', []) or is_recovery:
1874 self._vnfm.update_vnfd(cfg)
1875
1876 scratch.pop('vnfds', None)
1877
1878 @asyncio.coroutine
1879 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
1880 """ on prepare callback """
1881 self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1882 ks_path.to_xpath(RwVnfmYang.get_schema()), msg)
1883 fref = ProtobufC.FieldReference.alloc()
1884 fref.goto_whole_message(msg.to_pbcm())
1885
1886 # Handle deletes in prepare_callback, but adds/updates in apply_callback
1887 if fref.is_field_deleted():
1888 # Delete an VNFD record
1889 self._log.debug("Deleting VNFD with id %s", msg.id)
1890 if self._vnfm.vnfd_in_use(msg.id):
1891 self._log.debug("Cannot delete VNFD in use - %s", msg)
1892 err = "Cannot delete a VNFD in use - %s" % msg
1893 raise VirtualNetworkFunctionDescriptorRefCountExists(err)
1894 # Delete a VNFD record
1895 yield from self._vnfm.delete_vnfd(msg.id)
1896 else:
1897 # Handle actual adds/updates in apply_callback,
1898 # just check if VNFD in use in prepare_callback
1899 if self._vnfm.vnfd_in_use(msg.id):
1900 self._log.debug("Cannot modify an VNFD in use - %s", msg)
1901 err = "Cannot modify an VNFD in use - %s" % msg
1902 raise VirtualNetworkFunctionDescriptorRefCountExists(err)
1903
1904 # Add this VNFD to scratch to create/update in apply callback
1905 vnfds = scratch.setdefault('vnfds', [])
1906 vnfds.append(msg.id)
1907
1908 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
1909
1910 self._log.debug(
1911 "Registering for VNFD config using xpath: %s",
1912 VnfdDtsHandler.XPATH,
1913 )
1914 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
1915 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
1916 self._regh = acg.register(
1917 xpath=VnfdDtsHandler.XPATH,
1918 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
1919 on_prepare=on_prepare)
1920
1921
1922 class VcsComponentDtsHandler(object):
1923 """ Vcs Component DTS handler """
1924 XPATH = ("D,/rw-manifest:manifest" +
1925 "/rw-manifest:operational-inventory" +
1926 "/rw-manifest:component")
1927
1928 def __init__(self, dts, log, loop, vnfm):
1929 self._dts = dts
1930 self._log = log
1931 self._loop = loop
1932 self._regh = None
1933 self._vnfm = vnfm
1934
1935 @property
1936 def regh(self):
1937 """ DTS registration handle """
1938 return self._regh
1939
1940 @asyncio.coroutine
1941 def register(self):
1942 """ Registers VCS component dts publisher registration"""
1943 self._log.debug("VCS Comp publisher DTS handler registering path %s",
1944 VcsComponentDtsHandler.XPATH)
1945
1946 hdl = rift.tasklets.DTS.RegistrationHandler()
1947 handlers = rift.tasklets.Group.Handler()
1948 with self._dts.group_create(handler=handlers) as group:
1949 self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
1950 handler=hdl,
1951 flags=(rwdts.Flag.PUBLISHER |
1952 rwdts.Flag.NO_PREP_READ |
1953 rwdts.Flag.DATASTORE),)
1954
1955 @asyncio.coroutine
1956 def publish(self, xact, path, msg):
1957 """ Publishes the VCS component """
1958 self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
1959 xact, path, msg)
1960 self.regh.create_element(path, msg)
1961 self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
1962 VcsComponentDtsHandler.XPATH, xact, path, msg)
1963
1964 class VnfrConsoleOperdataDtsHandler(object):
1965 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
1966 @property
1967 def vnfr_vdu_console_xpath(self):
1968 """ path for resource-mgr"""
1969 return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self._vnfr_id,self._vdur_id))
1970
1971 def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
1972 self._dts = dts
1973 self._log = log
1974 self._loop = loop
1975 self._regh = None
1976 self._vnfm = vnfm
1977
1978 self._vnfr_id = vnfr_id
1979 self._vdur_id = vdur_id
1980 self._vdu_id = vdu_id
1981
1982 @asyncio.coroutine
1983 def register(self):
1984 """ Register for VNFR VDU Operational Data read from dts """
1985
1986 @asyncio.coroutine
1987 def on_prepare(xact_info, action, ks_path, msg):
1988 """ prepare callback from dts """
1989 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
1990 self._log.debug(
1991 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
1992 xact_info, action, xpath, msg
1993 )
1994
1995 if action == rwdts.QueryAction.READ:
1996 schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema()
1997 path_entry = schema.keyspec_to_entry(ks_path)
1998 self._log.debug("VDU Opdata path is {}".format(path_entry))
1999 try:
2000 vnfr = self._vnfm.get_vnfr(self._vnfr_id)
2001 except VnfRecordError as e:
2002 self._log.error("VNFR id %s not found", self._vnfr_id)
2003 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2004 return
2005 try:
2006 vdur= vnfr._get_vdur_from_vdu_id(self._vdu_id)
2007 if not vdur._state == VDURecordState.READY:
2008 self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
2009 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2010 return
2011 with self._dts.transaction() as new_xact:
2012 resp = yield from vdur.read_resource(new_xact)
2013 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2014 vdur_console.id = self._vdur_id
2015 if resp.console_url:
2016 vdur_console.console_url = resp.console_url
2017 else:
2018 vdur_console.console_url = 'none'
2019 self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
2020 except Exception:
2021 self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
2022 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2023 vdur_console.id = self._vdur_id
2024 vdur_console.console_url = 'none'
2025
2026 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
2027 xpath=self.vnfr_vdu_console_xpath,
2028 msg=vdur_console)
2029 else:
2030 #raise VnfRecordError("Not supported operation %s" % action)
2031 self._log.error("Not supported operation %s" % action)
2032 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2033 return
2034
2035
2036 self._log.debug("Registering for VNFR VDU using xpath: %s",
2037 self.vnfr_vdu_console_xpath)
2038 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2039 with self._dts.group_create() as group:
2040 self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
2041 handler=hdl,
2042 flags=rwdts.Flag.PUBLISHER,
2043 )
2044
2045
2046 class VnfrDtsHandler(object):
2047 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2048 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2049
2050 def __init__(self, dts, log, loop, vnfm):
2051 self._dts = dts
2052 self._log = log
2053 self._loop = loop
2054 self._vnfm = vnfm
2055
2056 self._regh = None
2057
2058 @property
2059 def regh(self):
2060 """ Return registration handle"""
2061 return self._regh
2062
2063 @property
2064 def vnfm(self):
2065 """ Return VNF manager instance """
2066 return self._vnfm
2067
2068 @asyncio.coroutine
2069 def register(self):
2070 """ Register for vnfr create/update/delete/read requests from dts """
2071 def on_commit(xact_info):
2072 """ The transaction has been committed """
2073 self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
2074 return rwdts.MemberRspCode.ACTION_OK
2075
2076 def on_abort(*args):
2077 """ Abort callback """
2078 self._log.debug("VNF transaction got aborted")
2079
2080 @asyncio.coroutine
2081 def on_event(dts, g_reg, xact, xact_event, scratch_data):
2082
2083 @asyncio.coroutine
2084 def instantiate_realloc_vnfr(vnfr):
2085 """Re-populate the vnfm after restart
2086
2087 Arguments:
2088 vlink
2089
2090 """
2091
2092 yield from vnfr.instantiate(None, restart_mode=True)
2093
2094 if xact_event == rwdts.MemberEvent.INSTALL:
2095 curr_cfg = self.regh.elements
2096 for cfg in curr_cfg:
2097 vnfr = self.vnfm.create_vnfr(cfg)
2098 self._loop.create_task(instantiate_realloc_vnfr(vnfr))
2099
2100 self._log.debug("Got on_event in vnfm")
2101
2102 return rwdts.MemberRspCode.ACTION_OK
2103
2104 @asyncio.coroutine
2105 def on_prepare(xact_info, action, ks_path, msg):
2106 """ prepare callback from dts """
2107 self._log.debug(
2108 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2109 xact_info, action, msg
2110 )
2111
2112 if action == rwdts.QueryAction.CREATE:
2113 if not msg.has_field("vnfd_ref"):
2114 err = "Vnfd reference not provided"
2115 self._log.error(err)
2116 raise VnfRecordError(err)
2117
2118 vnfr = self.vnfm.create_vnfr(msg)
2119 try:
2120 # RIFT-9105: Unable to add a READ query under an existing transaction
2121 # xact = xact_info.xact
2122 yield from vnfr.instantiate(None)
2123 except Exception as e:
2124 self._log.exception(e)
2125 self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
2126 vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
2127 yield from vnfr.publish(None)
2128 elif action == rwdts.QueryAction.DELETE:
2129 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2130 path_entry = schema.keyspec_to_entry(ks_path)
2131 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2132
2133 if vnfr is None:
2134 self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
2135 raise VirtualNetworkFunctionRecordNotFound(
2136 "VNFR id %s", path_entry.key00.id)
2137
2138 try:
2139 yield from vnfr.terminate(xact_info.xact)
2140 # Unref the VNFD
2141 vnfr.vnfd.unref()
2142 yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
2143 except Exception as e:
2144 self._log.exception(e)
2145 self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)
2146
2147 elif action == rwdts.QueryAction.UPDATE:
2148 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2149 path_entry = schema.keyspec_to_entry(ks_path)
2150 vnfr = None
2151 try:
2152 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2153 except Exception as e:
2154 self._log.debug("No vnfr found with id %s", path_entry.key00.id)
2155 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2156 return
2157
2158 if vnfr is None:
2159 self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
2160 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2161 return
2162
2163 self._log.debug("VNFR {} update config status {} (current {})".
2164 format(vnfr.name, msg.config_status, vnfr.config_status))
2165 # Update the config status and publish
2166 vnfr._config_status = msg.config_status
2167 yield from vnfr.publish(None)
2168
2169 else:
2170 raise NotImplementedError(
2171 "%s action on VirtualNetworkFunctionRecord not supported",
2172 action)
2173
2174 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2175
2176 self._log.debug("Registering for VNFR using xpath: %s",
2177 VnfrDtsHandler.XPATH,)
2178
2179 hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
2180 on_prepare=on_prepare,)
2181 handlers = rift.tasklets.Group.Handler(on_event=on_event,)
2182 with self._dts.group_create(handler=handlers) as group:
2183 self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
2184 handler=hdl,
2185 flags=(rwdts.Flag.PUBLISHER |
2186 rwdts.Flag.NO_PREP_READ |
2187 rwdts.Flag.CACHE |
2188 rwdts.Flag.DATASTORE),)
2189
2190 @asyncio.coroutine
2191 def create(self, xact, path, msg):
2192 """
2193 Create a VNFR record in DTS with path and message
2194 """
2195 self._log.debug("Creating VNFR xact = %s, %s:%s",
2196 xact, path, msg)
2197
2198 self.regh.create_element(path, msg)
2199 self._log.debug("Created VNFR xact = %s, %s:%s",
2200 xact, path, msg)
2201
2202 @asyncio.coroutine
2203 def update(self, xact, path, msg):
2204 """
2205 Update a VNFR record in DTS with path and message
2206 """
2207 self._log.debug("Updating VNFR xact = %s, %s:%s",
2208 xact, path, msg)
2209 self.regh.update_element(path, msg)
2210 self._log.debug("Updated VNFR xact = %s, %s:%s",
2211 xact, path, msg)
2212
2213 @asyncio.coroutine
2214 def delete(self, xact, path):
2215 """
2216 Delete a VNFR record in DTS with path and message
2217 """
2218 self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
2219 self.regh.delete_element(path)
2220 self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2221
2222
2223 class VirtualNetworkFunctionDescriptor(object):
2224 """
2225 Virtual Network Function descriptor class
2226 """
2227
2228 def __init__(self, dts, log, loop, vnfm, vnfd):
2229 self._dts = dts
2230 self._log = log
2231 self._loop = loop
2232
2233 self._vnfm = vnfm
2234 self._vnfd = vnfd
2235 self._ref_count = 0
2236
2237 @property
2238 def ref_count(self):
2239 """ Returns the reference count associated with
2240 this Virtual Network Function Descriptor"""
2241 return self._ref_count
2242
2243 @property
2244 def id(self):
2245 """ Returns vnfd id """
2246 return self._vnfd.id
2247
2248 @property
2249 def name(self):
2250 """ Returns vnfd name """
2251 return self._vnfd.name
2252
2253 def in_use(self):
2254 """ Returns whether vnfd is in use or not """
2255 return True if self._ref_count > 0 else False
2256
2257 def ref(self):
2258 """ Take a reference on this object """
2259 self._ref_count += 1
2260 return self._ref_count
2261
2262 def unref(self):
2263 """ Release reference on this object """
2264 if self.ref_count < 1:
2265 msg = ("Unref on a VNFD object - vnfd id %s, ref_count = %s" %
2266 (self.id, self._ref_count))
2267 self._log.critical(msg)
2268 raise VnfRecordError(msg)
2269 self._log.debug("Releasing ref on VNFD %s - curr ref_count:%s",
2270 self.id, self.ref_count)
2271 self._ref_count -= 1
2272 return self._ref_count
2273
2274 @property
2275 def msg(self):
2276 """ Return the message associated with this NetworkServiceDescriptor"""
2277 return self._vnfd
2278
2279 @staticmethod
2280 def path_for_id(vnfd_id):
2281 """ Return path for the passed vnfd_id"""
2282 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id)
2283
2284 def path(self):
2285 """ Return the path associated with this NetworkServiceDescriptor"""
2286 return VirtualNetworkFunctionDescriptor.path_for_id(self.id)
2287
2288 def update(self, vnfd):
2289 """ Update the Virtual Network Function Descriptor """
2290 if self.in_use():
2291 self._log.error("Cannot update descriptor %s in use refcnt=%d",
2292 self.id, self.ref_count)
2293
2294 # The following loop is added to debug RIFT-13284
2295 for vnf_rec in self._vnfm._vnfrs.values():
2296 if vnf_rec.vnfd_id == self.id:
2297 self._log.error("descriptor %s in used by %s:%s",
2298 self.id, vnf_rec.vnfr_id, vnf_rec.msg)
2299 raise VirtualNetworkFunctionDescriptorRefCountExists("Cannot update descriptor in use %s" % self.id)
2300 self._vnfd = vnfd
2301
2302 def delete(self):
2303 """ Delete the Virtual Network Function Descriptor """
2304 if self.in_use():
2305 self._log.error("Cannot delete descriptor %s in use refcnt=%d",
2306 self.id)
2307
2308 # The following loop is added to debug RIFT-13284
2309 for vnf_rec in self._vnfm._vnfrs.values():
2310 if vnf_rec.vnfd_id == self.id:
2311 self._log.error("descriptor %s in used by %s:%s",
2312 self.id, vnf_rec.vnfr_id, vnf_rec.msg)
2313 raise VirtualNetworkFunctionDescriptorRefCountExists("Cannot delete descriptor in use %s" % self.id)
2314 self._vnfm.delete_vnfd(self.id)
2315
2316
2317 class VnfdRefCountDtsHandler(object):
2318 """ The VNFD Ref Count DTS handler """
2319 XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2320
2321 def __init__(self, dts, log, loop, vnfm):
2322 self._dts = dts
2323 self._log = log
2324 self._loop = loop
2325 self._vnfm = vnfm
2326
2327 self._regh = None
2328
2329 @property
2330 def regh(self):
2331 """ Return registration handle """
2332 return self._regh
2333
2334 @property
2335 def vnfm(self):
2336 """ Return the NS manager instance """
2337 return self._vnfm
2338
2339 @asyncio.coroutine
2340 def register(self):
2341 """ Register for VNFD ref count read from dts """
2342
2343 @asyncio.coroutine
2344 def on_prepare(xact_info, action, ks_path, msg):
2345 """ prepare callback from dts """
2346 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2347 self._log.debug(
2348 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2349 xact_info, action, xpath, msg
2350 )
2351
2352 if action == rwdts.QueryAction.READ:
2353 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
2354 path_entry = schema.keyspec_to_entry(ks_path)
2355 vnfd_list = yield from self._vnfm.get_vnfd_refcount(path_entry.key00.vnfd_id_ref)
2356 for xpath, msg in vnfd_list:
2357 self._log.debug("Responding to ref count query path:%s, msg:%s",
2358 xpath, msg)
2359 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
2360 xpath=xpath,
2361 msg=msg)
2362 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2363 else:
2364 raise VnfRecordError("Not supported operation %s" % action)
2365
2366 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2367 with self._dts.group_create() as group:
2368 self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH,
2369 handler=hdl,
2370 flags=rwdts.Flag.PUBLISHER,
2371 )
2372
2373
2374 class VdurDatastore(object):
2375 """
2376 This VdurDatastore is intended to expose select information about a VDUR
2377 such that it can be referenced in a cloud config file. The data that is
2378 exposed does not necessarily follow the structure of the data in the yang
2379 model. This is intentional. The data that are exposed are intended to be
2380 agnostic of the yang model so that changes in the model do not necessarily
2381 require changes to the interface provided to the user. It also means that
2382 the user does not need to be familiar with the RIFT.ware yang models.
2383 """
2384
2385 def __init__(self):
2386 """Create an instance of VdurDatastore"""
2387 self._vdur_data = dict()
2388 self._pattern = re.compile("vdu\[([^]]+)\]\.(.+)")
2389
2390 def add(self, vdur):
2391 """Add a new VDUR to the datastore
2392
2393 Arguments:
2394 vdur - a VirtualDeploymentUnitRecord instance
2395
2396 Raises:
2397 A ValueError is raised if the VDUR is (1) None or (2) already in
2398 the datastore.
2399
2400 """
2401 if vdur.vdu_id is None:
2402 raise ValueError('VDURs are required to have an ID')
2403
2404 if vdur.vdu_id in self._vdur_data:
2405 raise ValueError('cannot add a VDUR more than once')
2406
2407 self._vdur_data[vdur.vdu_id] = dict()
2408
2409 def set_if_not_none(key, attr):
2410 if attr is not None:
2411 self._vdur_data[vdur.vdu_id][key] = attr
2412
2413 set_if_not_none('name', vdur._vdud.name)
2414 set_if_not_none('mgmt.ip', vdur.vm_management_ip)
2415
2416 def update(self, vdur):
2417 """Update the VDUR information in the datastore
2418
2419 Arguments:
2420 vdur - a GI representation of a VDUR
2421
2422 Raises:
2423 A ValueError is raised if the VDUR is (1) None or (2) already in
2424 the datastore.
2425
2426 """
2427 if vdur.vdu_id is None:
2428 raise ValueError('VNFDs are required to have an ID')
2429
2430 if vdur.vdu_id not in self._vdur_data:
2431 raise ValueError('VNF is not recognized')
2432
2433 def set_or_delete(key, attr):
2434 if attr is None:
2435 if key in self._vdur_data[vdur.vdu_id]:
2436 del self._vdur_data[vdur.vdu_id][key]
2437
2438 else:
2439 self._vdur_data[vdur.vdu_id][key] = attr
2440
2441 set_or_delete('name', vdur._vdud.name)
2442 set_or_delete('mgmt.ip', vdur.vm_management_ip)
2443
2444 def remove(self, vdur_id):
2445 """Remove all of the data associated with specified VDUR
2446
2447 Arguments:
2448 vdur_id - the identifier of a VNFD in the datastore
2449
2450 Raises:
2451 A ValueError is raised if the VDUR is not contained in the
2452 datastore.
2453
2454 """
2455 if vdur_id not in self._vdur_data:
2456 raise ValueError('VNF is not recognized')
2457
2458 del self._vdur_data[vdur_id]
2459
2460 def get(self, expr):
2461 """Retrieve VDUR information from the datastore
2462
2463 An expression should be of the form,
2464
2465 vdu[<id>].<attr>
2466
2467 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2468 the exposed attribute that the user wishes to retrieve.
2469
2470 If the requested data is not available, None is returned.
2471
2472 Arguments:
2473 expr - a string that specifies the data to return
2474
2475 Raises:
2476 A ValueError is raised if the provided expression cannot be parsed.
2477
2478 Returns:
2479 The requested data or None
2480
2481 """
2482 result = self._pattern.match(expr)
2483 if result is None:
2484 raise ValueError('data expression not recognized ({})'.format(expr))
2485
2486 vdur_id, key = result.groups()
2487
2488 if vdur_id not in self._vdur_data:
2489 return None
2490
2491 return self._vdur_data[vdur_id].get(key, None)
2492
2493
2494 class VnfManager(object):
2495 """ The virtual network function manager class """
2496 def __init__(self, dts, log, loop, cluster_name):
2497 self._dts = dts
2498 self._log = log
2499 self._loop = loop
2500 self._cluster_name = cluster_name
2501
2502 self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
2503 self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
2504 self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
2505 self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(log, dts, loop, callback=self.handle_nsr)
2506
2507 self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
2508 self._vcs_handler,
2509 self._nsr_handler]
2510 self._vnfrs = {}
2511 self._vnfds = {}
2512 self._nsrs = {}
2513
2514 @property
2515 def vnfr_handler(self):
2516 """ VNFR dts handler """
2517 return self._vnfr_handler
2518
2519 @property
2520 def vcs_handler(self):
2521 """ VCS dts handler """
2522 return self._vcs_handler
2523
2524 @asyncio.coroutine
2525 def register(self):
2526 """ Register all static DTS handlers """
2527 for hdl in self._dts_handlers:
2528 yield from hdl.register()
2529
2530 @asyncio.coroutine
2531 def run(self):
2532 """ Run this VNFM instance """
2533 self._log.debug("Run VNFManager - registering static DTS handlers""")
2534 yield from self.register()
2535
2536 def handle_nsr(self, nsr, action):
2537 if action in [rwdts.QueryAction.CREATE]:
2538 self._nsrs[nsr.id] = nsr
2539 elif action == rwdts.QueryAction.DELETE:
2540 if nsr.id in self._nsrs:
2541 del self._nsrs[nsr.id]
2542
2543 def get_linked_mgmt_network(self, vnfr):
2544 """For the given VNFR get the related mgmt network from the NSD, if
2545 available.
2546 """
2547 vnfd_id = vnfr.vnfd_ref
2548 nsr_id = vnfr.nsr_id_ref
2549
2550 # for the given related VNFR, get the corresponding NSR-config
2551 nsr_obj = None
2552 try:
2553 nsr_obj = self._nsrs[nsr_id]
2554 except KeyError:
2555 raise("Unable to find the NS with the ID: {}".format(nsr_id))
2556
2557 # for the related NSD check if a VLD exists such that it's a mgmt
2558 # network
2559 for vld in nsr_obj.nsd.vld:
2560 if vld.mgmt_network:
2561 return vld.name
2562
2563 return None
2564
2565 def get_vnfr(self, vnfr_id):
2566 """ get VNFR by vnfr id """
2567
2568 if vnfr_id not in self._vnfrs:
2569 raise VnfRecordError("VNFR id %s not found", vnfr_id)
2570
2571 return self._vnfrs[vnfr_id]
2572
2573 def create_vnfr(self, vnfr):
2574 """ Create a VNFR instance """
2575 if vnfr.id in self._vnfrs:
2576 msg = "Vnfr id %s already exists" % vnfr.id
2577 self._log.error(msg)
2578 raise VnfRecordError(msg)
2579
2580 self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
2581 vnfr.id,
2582 vnfr.vnfd_ref)
2583
2584 mgmt_network = self.get_linked_mgmt_network(vnfr)
2585
2586 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2587 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
2588 mgmt_network=mgmt_network
2589 )
2590 return self._vnfrs[vnfr.id]
2591
2592 @asyncio.coroutine
2593 def delete_vnfr(self, xact, vnfr):
2594 """ Create a VNFR instance """
2595 if vnfr.vnfr_id in self._vnfrs:
2596 self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
2597 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2598 del self._vnfrs[vnfr.vnfr_id]
2599
2600 @asyncio.coroutine
2601 def fetch_vnfd(self, vnfd_id):
2602 """ Fetch VNFDs based with the vnfd id"""
2603 vnfd_path = VirtualNetworkFunctionDescriptor.path_for_id(vnfd_id)
2604 self._log.debug("Fetch vnfd with path %s", vnfd_path)
2605 vnfd = None
2606
2607 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2608
2609 for ent in res_iter:
2610 res = yield from ent
2611 vnfd = res.result
2612
2613 if vnfd is None:
2614 err = "Failed to get Vnfd %s" % vnfd_id
2615 self._log.error(err)
2616 raise VnfRecordError(err)
2617
2618 self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)
2619
2620 return vnfd
2621
2622 @asyncio.coroutine
2623 def get_vnfd_ref(self, vnfd_id):
2624 """ Get Virtual Network Function descriptor for the passed vnfd_id"""
2625 vnfd = yield from self.get_vnfd(vnfd_id)
2626 vnfd.ref()
2627 return vnfd
2628
2629 @asyncio.coroutine
2630 def get_vnfd(self, vnfd_id):
2631 """ Get Virtual Network Function descriptor for the passed vnfd_id"""
2632 vnfd = None
2633 if vnfd_id not in self._vnfds:
2634 self._log.error("Cannot find VNFD id:%s", vnfd_id)
2635 vnfd = yield from self.fetch_vnfd(vnfd_id)
2636
2637 if vnfd is None:
2638 self._log.error("Cannot find VNFD id:%s", vnfd_id)
2639 raise VirtualNetworkFunctionDescriptorError("Cannot find VNFD id:%s", vnfd_id)
2640
2641 if vnfd.id != vnfd_id:
2642 self._log.error("Bad Recovery state {} found for {}".format(vnfd.id, vnfd_id))
2643 raise VirtualNetworkFunctionDescriptorError("Bad Recovery state {} found for {}".format(vnfd.id, vnfd_id))
2644
2645 if vnfd.id not in self._vnfds:
2646 self.create_vnfd(vnfd)
2647
2648 return self._vnfds[vnfd_id]
2649
2650 def vnfd_in_use(self, vnfd_id):
2651 """ Is this VNFD in use """
2652 self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
2653 if vnfd_id in self._vnfds:
2654 return self._vnfds[vnfd_id].in_use()
2655 return False
2656
2657 @asyncio.coroutine
2658 def publish_vnfr(self, xact, path, msg):
2659 """ Publish a VNFR """
2660 self._log.debug("publish_vnfr called with path %s, msg %s",
2661 path, msg)
2662 yield from self.vnfr_handler.update(xact, path, msg)
2663
2664 def create_vnfd(self, vnfd):
2665 """ Create a virtual network function descriptor """
2666 self._log.debug("Create virtual networkfunction descriptor - %s", vnfd)
2667 if vnfd.id in self._vnfds:
2668 self._log.error("Cannot create VNFD %s -VNFD id already exists", vnfd)
2669 raise VirtualNetworkFunctionDescriptorError("VNFD already exists-%s", vnfd.id)
2670
2671 self._vnfds[vnfd.id] = VirtualNetworkFunctionDescriptor(self._dts,
2672 self._log,
2673 self._loop,
2674 self,
2675 vnfd)
2676 return self._vnfds[vnfd.id]
2677
2678 def update_vnfd(self, vnfd):
2679 """ update the Virtual Network Function descriptor """
2680 self._log.debug("Update virtual network function descriptor - %s", vnfd)
2681
2682 if vnfd.id not in self._vnfds:
2683 self._log.debug("No VNFD found - creating VNFD id = %s", vnfd.id)
2684 self.create_vnfd(vnfd)
2685 else:
2686 self._log.debug("Updating VNFD id = %s, vnfd = %s", vnfd.id, vnfd)
2687 self._vnfds[vnfd.id].update(vnfd)
2688
2689 @asyncio.coroutine
2690 def delete_vnfd(self, vnfd_id):
2691 """ Delete the Virtual Network Function descriptor with the passed id """
2692 self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
2693 if vnfd_id not in self._vnfds:
2694 self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
2695 raise VirtualNetworkFunctionDescriptorNotFound("Cannot find %s", vnfd_id)
2696
2697 if self._vnfds[vnfd_id].in_use():
2698 self._log.debug("Cannot delete VNFD id %s reference exists %s",
2699 vnfd_id,
2700 self._vnfds[vnfd_id].ref_count)
2701 raise VirtualNetworkFunctionDescriptorRefCountExists(
2702 "Cannot delete :%s, ref_count:%s",
2703 vnfd_id,
2704 self._vnfds[vnfd_id].ref_count)
2705
2706 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2707 try:
2708 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2709 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2710 if os.path.exists(vnfd_dir):
2711 shutil.rmtree(vnfd_dir, ignore_errors=True)
2712 except Exception as e:
2713 self._log.error("Exception in cleaning up VNFD {}: {}".
2714 format(self._vnfds[vnfd_id].name, e))
2715 self._log.exception(e)
2716
2717 del self._vnfds[vnfd_id]
2718
2719 def vnfd_refcount_xpath(self, vnfd_id):
2720 """ xpath for ref count entry """
2721 return (VnfdRefCountDtsHandler.XPATH +
2722 "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)
2723
2724 @asyncio.coroutine
2725 def get_vnfd_refcount(self, vnfd_id):
2726 """ Get the vnfd_list from this VNFM"""
2727 vnfd_list = []
2728 if vnfd_id is None or vnfd_id == "":
2729 for vnfd in self._vnfds.values():
2730 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2731 vnfd_msg.vnfd_id_ref = vnfd.id
2732 vnfd_msg.instance_ref_count = vnfd.ref_count
2733 vnfd_list.append((self.vnfd_refcount_xpath(vnfd.id), vnfd_msg))
2734 elif vnfd_id in self._vnfds:
2735 vnfd_msg.vnfd_id_ref = self._vnfds[vnfd_id].id
2736 vnfd_msg.instance_ref_count = self._vnfds[vnfd_id].ref_count
2737 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2738
2739 return vnfd_list
2740
2741
2742 class VnfmTasklet(rift.tasklets.Tasklet):
2743 """ VNF Manager tasklet class """
2744 def __init__(self, *args, **kwargs):
2745 super(VnfmTasklet, self).__init__(*args, **kwargs)
2746 self.rwlog.set_category("rw-mano-log")
2747 self.rwlog.set_subcategory("vnfm")
2748
2749 self._dts = None
2750 self._vnfm = None
2751
2752 def start(self):
2753 try:
2754 super(VnfmTasklet, self).start()
2755 self.log.info("Starting VnfmTasklet")
2756
2757 self.log.setLevel(logging.DEBUG)
2758
2759 self.log.debug("Registering with dts")
2760 self._dts = rift.tasklets.DTS(self.tasklet_info,
2761 RwVnfmYang.get_schema(),
2762 self.loop,
2763 self.on_dts_state_change)
2764
2765 self.log.debug("Created DTS Api GI Object: %s", self._dts)
2766 except Exception:
2767 print("Caught Exception in VNFM start:", sys.exc_info()[0])
2768 raise
2769
2770 def on_instance_started(self):
2771 """ Task insance started callback """
2772 self.log.debug("Got instance started callback")
2773
2774 def stop(self):
2775 try:
2776 self._dts.deinit()
2777 except Exception:
2778 print("Caught Exception in VNFM stop:", sys.exc_info()[0])
2779 raise
2780
2781 @asyncio.coroutine
2782 def init(self):
2783 """ Task init callback """
2784 try:
2785 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2786 assert vm_parent_name is not None
2787 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2788 yield from self._vnfm.run()
2789 except Exception:
2790 print("Caught Exception in VNFM init:", sys.exc_info()[0])
2791 raise
2792
2793 @asyncio.coroutine
2794 def run(self):
2795 """ Task run callback """
2796 pass
2797
2798 @asyncio.coroutine
2799 def on_dts_state_change(self, state):
2800 """Take action according to current dts state to transition
2801 application into the corresponding application state
2802
2803 Arguments
2804 state - current dts state
2805 """
2806 switch = {
2807 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2808 rwdts.State.CONFIG: rwdts.State.RUN,
2809 }
2810
2811 handlers = {
2812 rwdts.State.INIT: self.init,
2813 rwdts.State.RUN: self.run,
2814 }
2815
2816 # Transition application to next state
2817 handler = handlers.get(state, None)
2818 if handler is not None:
2819 yield from handler()
2820
2821 # Transition dts to next state
2822 next_state = switch.get(state, None)
2823 if next_state is not None:
2824 self._dts.handle.set_state(next_state)