Model changes to change leaf-lists to lists
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52
53
54 class VMResourceError(Exception):
55 """ VM resource Error"""
56 pass
57
58
59 class VnfRecordError(Exception):
60 """ VNF record instatiation failed"""
61 pass
62
63
64 class VduRecordError(Exception):
65 """ VDU record instatiation failed"""
66 pass
67
68
69 class NotImplemented(Exception):
70 """Not implemented """
71 pass
72
73
74 class VnfrRecordExistsError(Exception):
75 """VNFR record already exist with the same VNFR id"""
76 pass
77
78
79 class InternalVirtualLinkRecordError(Exception):
80 """Internal virtual link record error"""
81 pass
82
83
84 class VDUImageNotFound(Exception):
85 """VDU Image not found error"""
86 pass
87
88
89 class VirtualDeploymentUnitRecordError(Exception):
90 """VDU Instantiation failed"""
91 pass
92
93
94 class VMNotReadyError(Exception):
95 """ VM Not yet received from resource manager """
96 pass
97
98
99 class VDURecordNotFound(Exception):
100 """ Could not find a VDU record """
101 pass
102
103
104 class VirtualNetworkFunctionRecordDescNotFound(Exception):
105 """ Cannot find Virtual Network Function Record Descriptor """
106 pass
107
108
109 class VirtualNetworkFunctionDescriptorError(Exception):
110 """ Virtual Network Function Record Descriptor Error """
111 pass
112
113
114 class VirtualNetworkFunctionDescriptorNotFound(Exception):
115 """ Virtual Network Function Record Descriptor Not Found """
116 pass
117
118
119 class VirtualNetworkFunctionRecordNotFound(Exception):
120 """ Virtual Network Function Record Not Found """
121 pass
122
123
124 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
125 """ Virtual Network Funtion Descriptor reference count exists """
126 pass
127
128
129 class VnfrInstantiationFailed(Exception):
130 """ Virtual Network Funtion Instantiation failed"""
131 pass
132
133
134 class VNFMPlacementGroupError(Exception):
135 pass
136
137 class VirtualNetworkFunctionRecordState(enum.Enum):
138 """ VNFR state """
139 INIT = 1
140 VL_INIT_PHASE = 2
141 VM_INIT_PHASE = 3
142 READY = 4
143 TERMINATE = 5
144 VL_TERMINATE_PHASE = 6
145 VDU_TERMINATE_PHASE = 7
146 TERMINATED = 7
147 FAILED = 10
148
149
150 class VDURecordState(enum.Enum):
151 """VDU record state """
152 INIT = 1
153 INSTANTIATING = 2
154 RESOURCE_ALLOC_PENDING = 3
155 READY = 4
156 TERMINATING = 5
157 TERMINATED = 6
158 FAILED = 10
159
160
161 class VcsComponent(object):
162 """ VCS Component within the VNF descriptor """
163 def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name):
164 self._dts = dts
165 self._log = log
166 self._loop = loop
167 self._component = component
168 self._cluster_name = cluster_name
169 self._vcs_handler = vcs_handler
170 self._mangled_name = mangled_name
171
172 @staticmethod
173 def mangle_name(component_name, vnf_name, vnfd_id):
174 """ mangled component name """
175 return vnf_name + ":" + component_name + ":" + vnfd_id
176
177 @property
178 def name(self):
179 """ name of this component"""
180 return self._mangled_name
181
182 @property
183 def path(self):
184 """ The path for this object """
185 return("D,/rw-manifest:manifest" +
186 "/rw-manifest:operational-inventory" +
187 "/rw-manifest:component" +
188 "[rw-manifest:component-name = '{}']").format(self.name)
189
190 @property
191 def instance_xpath(self):
192 """ The path for this object """
193 return("D,/rw-base:vcs" +
194 "/instances" +
195 "/instance" +
196 "[instance-name = '{}']".format(self._cluster_name))
197
198 @property
199 def start_comp_xpath(self):
200 """ start component xpath """
201 return (self.instance_xpath +
202 "/child-n[instance-name = 'START-REQ']")
203
204 def get_start_comp_msg(self, ip_address):
205 """ start this component """
206 start_msg = RwBaseYang.VcsInstance_Instance_ChildN()
207 start_msg.instance_name = 'START-REQ'
208 start_msg.component_name = self.name
209 start_msg.admin_command = "START"
210 start_msg.ip_address = ip_address
211
212 return start_msg
213
214 @property
215 def msg(self):
216 """ Returns the message for this vcs component"""
217
218 vcs_comp_dict = self._component.as_dict()
219
220 def mangle_comp_names(comp_dict):
221 """ mangle component name with VNF name, id"""
222 for key, val in comp_dict.items():
223 if isinstance(val, dict):
224 comp_dict[key] = mangle_comp_names(val)
225 elif isinstance(val, list):
226 i = 0
227 for ent in val:
228 if isinstance(ent, dict):
229 val[i] = mangle_comp_names(ent)
230 else:
231 val[i] = ent
232 i += 1
233 elif key == "component_name":
234 comp_dict[key] = VcsComponent.mangle_name(val,
235 self._vnfd_name,
236 self._vnfd_id)
237 return comp_dict
238
239 mangled_dict = mangle_comp_names(vcs_comp_dict)
240 msg = RwManifestYang.OpInventory_Component.from_dict(mangled_dict)
241 return msg
242
243 @asyncio.coroutine
244 def publish(self, xact):
245 """ Publishes the VCS component """
246 self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
247 self.name, self.path, self.msg)
248 yield from self._vcs_handler.publish(xact, self.path, self.msg)
249
250 @asyncio.coroutine
251 def start(self, xact, parent, ip_addr=None):
252 """ Starts this VCS component """
253 # ATTN RV - replace with block add
254 start_msg = self.get_start_comp_msg(ip_addr)
255 self._log.debug("starting component %s %s",
256 self.start_comp_xpath, start_msg)
257 yield from self._dts.query_create(self.start_comp_xpath,
258 0,
259 start_msg)
260 self._log.debug("started component %s, %s",
261 self.start_comp_xpath, start_msg)
262
263
264 class VirtualDeploymentUnitRecord(object):
265 """ Virtual Deployment Unit Record """
266 def __init__(self,
267 dts,
268 log,
269 loop,
270 vdud,
271 vnfr,
272 mgmt_intf,
273 cloud_account_name,
274 vnfd_package_store,
275 vdur_id=None,
276 placement_groups=[]):
277 self._dts = dts
278 self._log = log
279 self._loop = loop
280 self._vdud = vdud
281 self._vnfr = vnfr
282 self._mgmt_intf = mgmt_intf
283 self._cloud_account_name = cloud_account_name
284 self._vnfd_package_store = vnfd_package_store
285
286 self._vdur_id = vdur_id or str(uuid.uuid4())
287 self._int_intf = []
288 self._ext_intf = []
289 self._state = VDURecordState.INIT
290 self._state_failed_reason = None
291 self._request_id = str(uuid.uuid4())
292 self._name = vnfr.name + "__" + vdud.id
293 self._placement_groups = placement_groups
294 self._rm_regh = None
295 self._vm_resp = None
296 self._vdud_cloud_init = None
297 self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id,self.vdu_id)
298
299 @asyncio.coroutine
300 def vdu_opdata_register(self):
301 yield from self._vdur_console_handler.register()
302
303 def cp_ip_addr(self, cp_name):
304 """ Find ip address by connection point name """
305 if self._vm_resp is not None:
306 for conn_point in self._vm_resp.connection_points:
307 if conn_point.name == cp_name:
308 return conn_point.ip_address
309 return "0.0.0.0"
310
311 def cp_id(self, cp_name):
312 """ Find connection point id by connection point name """
313 if self._vm_resp is not None:
314 for conn_point in self._vm_resp.connection_points:
315 if conn_point.name == cp_name:
316 return conn_point.connection_point_id
317 return ''
318
319 @property
320 def vdu_id(self):
321 return self._vdud.id
322
323 @property
324 def vm_resp(self):
325 return self._vm_resp
326
327 @property
328 def name(self):
329 """ Return this VDUR's name """
330 return self._name
331
332 @property
333 def cloud_account_name(self):
334 """ Cloud account this VDU should be created in """
335 return self._cloud_account_name
336
337 @property
338 def image_name(self):
339 """ name that should be used to lookup the image on the CMP """
340 return os.path.basename(self._vdud.image)
341
342 @property
343 def image_checksum(self):
344 """ name that should be used to lookup the image on the CMP """
345 return self._vdud.image_checksum if self._vdud.has_field("image_checksum") else None
346
347 @property
348 def management_ip(self):
349 if not self.active:
350 return None
351 return self._vm_resp.public_ip if self._vm_resp.has_field('public_ip') else self._vm_resp.management_ip
352
353 @property
354 def vm_management_ip(self):
355 if not self.active:
356 return None
357 return self._vm_resp.management_ip
358
359 @property
360 def operational_status(self):
361 """ Operational status of this VDU"""
362 op_stats_dict = {"INIT": "init",
363 "INSTANTIATING": "vm_init_phase",
364 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
365 "READY": "running",
366 "FAILED": "failed",
367 "TERMINATING": "terminated",
368 "TERMINATED": "terminated",
369 }
370 return op_stats_dict[self._state.name]
371
372 @property
373 def msg(self):
374 """ VDU message """
375 vdu_fields = ["vm_flavor",
376 "guest_epa",
377 "vswitch_epa",
378 "hypervisor_epa",
379 "host_epa",
380 "name"]
381 vdu_copy_dict = {k: v for k, v in
382 self._vdud.as_dict().items() if k in vdu_fields}
383 vdur_dict = {"id": self._vdur_id,
384 "vdu_id_ref": self._vdud.id,
385 "operational_status": self.operational_status,
386 "operational_status_details": self._state_failed_reason,
387 }
388 if self.vm_resp is not None:
389 vdur_dict.update({"vim_id": self.vm_resp.vdu_id,
390 "flavor_id": self.vm_resp.flavor_id,
391 "image_id": self.vm_resp.image_id,
392 })
393
394 if self.management_ip is not None:
395 vdur_dict["management_ip"] = self.management_ip
396
397 if self.vm_management_ip is not None:
398 vdur_dict["vm_management_ip"] = self.vm_management_ip
399
400 vdur_dict.update(vdu_copy_dict)
401
402 icp_list = []
403 ii_list = []
404
405 for intf, cp_id, vlr in self._int_intf:
406 cp = self.find_internal_cp_by_cp_id(cp_id)
407
408 icp_list.append({"name": cp.name,
409 "id": cp.id,
410 "type_yang": "VPORT",
411 "ip_address": self.cp_ip_addr(cp.id)})
412
413 ii_list.append({"name": intf.name,
414 "vdur_internal_connection_point_ref": cp.id,
415 "virtual_interface": {}})
416
417 vdur_dict["internal_connection_point"] = icp_list
418 self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
419 vdur_dict["internal_interface"] = ii_list
420
421 ei_list = []
422 for intf, cp, vlr in self._ext_intf:
423 ei_list.append({"name": cp,
424 "vnfd_connection_point_ref": cp,
425 "virtual_interface": {}})
426 self._vnfr.update_cp(cp, self.cp_ip_addr(cp), self.cp_id(cp))
427
428 vdur_dict["external_interface"] = ei_list
429
430 placement_groups = []
431 for group in self._placement_groups:
432 placement_groups.append(group.as_dict())
433
434 vdur_dict['placement_groups_info'] = placement_groups
435 return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
436
437 @property
438 def resmgr_path(self):
439 """ path for resource-mgr"""
440 return ("D,/rw-resource-mgr:resource-mgmt" +
441 "/vdu-event" +
442 "/vdu-event-data[event-id='{}']".format(self._request_id))
443
444 @property
445 def vm_flavor_msg(self):
446 """ VM flavor message """
447 flavor = self._vdud.vm_flavor.__class__()
448 flavor.copy_from(self._vdud.vm_flavor)
449
450 return flavor
451
452 @property
453 def vdud_cloud_init(self):
454 """ Return the cloud-init contents for the VDU """
455 if self._vdud_cloud_init is None:
456 self._vdud_cloud_init = self.cloud_init()
457
458 return self._vdud_cloud_init
459
460 def cloud_init(self):
461 """ Populate cloud_init with cloud-config script from
462 either the inline contents or from the file provided
463 """
464 if self._vdud.cloud_init is not None:
465 self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
466 return self._vdud.cloud_init
467 elif self._vdud.cloud_init_file is not None:
468 # Get cloud-init script contents from the file provided in the cloud_init_file param
469 self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
470 filename = self._vdud.cloud_init_file
471 self._vnfd_package_store.refresh()
472 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
473 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
474 try:
475 return cloud_init_extractor.read_script(stored_package, filename)
476 except rift.package.cloud_init.CloudInitExtractionError as e:
477 raise VirtualDeploymentUnitRecordError(e)
478 else:
479 self._log.debug("VDU Instantiation: cloud-init script not provided")
480
481 def process_openstack_placement_group_construct(self, vm_create_msg_dict):
482 host_aggregates = []
483 availability_zones = []
484 server_groups = []
485 for group in self._placement_groups:
486 if group.has_field('host_aggregate'):
487 for aggregate in group.host_aggregate:
488 host_aggregates.append(aggregate.as_dict())
489 if group.has_field('availability_zone'):
490 availability_zones.append(group.availability_zone.as_dict())
491 if group.has_field('server_group'):
492 server_groups.append(group.server_group.as_dict())
493
494 if availability_zones:
495 if len(availability_zones) > 1:
496 self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
497 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self.name, availability_zones))
498 else:
499 vm_create_msg_dict['availability_zone'] = availability_zones[0]
500
501 if server_groups:
502 if len(server_groups) > 1:
503 self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
504 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self.name, server_groups))
505 else:
506 vm_create_msg_dict['server_group'] = server_groups[0]
507
508 if host_aggregates:
509 vm_create_msg_dict['host_aggregate'] = host_aggregates
510
511 return
512
513 def process_placement_groups(self, vm_create_msg_dict):
514 """Process the placement_groups and fill resource-mgr request"""
515 if not self._placement_groups:
516 return
517
518 cloud_set = set([group.cloud_type for group in self._placement_groups])
519 assert len(cloud_set) == 1
520 cloud_type = cloud_set.pop()
521
522 if cloud_type == 'openstack':
523 self.process_openstack_placement_group_construct(vm_create_msg_dict)
524
525 else:
526 self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
527 return
528
529 def resmgr_msg(self, config=None):
530 vdu_fields = ["vm_flavor",
531 "guest_epa",
532 "vswitch_epa",
533 "hypervisor_epa",
534 "host_epa"]
535
536 self._log.debug("Creating params based on VDUD: %s", self._vdud)
537 vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}
538
539 vm_create_msg_dict = {
540 "name": self.name,
541 "image_name": self.image_name,
542 }
543
544 if self.image_checksum is not None:
545 vm_create_msg_dict["image_checksum"] = self.image_checksum
546
547 vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
548 if self._vdud.has_field('mgmt_vpci'):
549 vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci
550
551 self._log.debug("VDUD: %s", self._vdud)
552 if config is not None:
553 vm_create_msg_dict['vdu_init'] = {'userdata': config}
554
555 cp_list = []
556 for intf, cp, vlr in self._ext_intf:
557 cp_info = {"name": cp,
558 "virtual_link_id": vlr.network_id,
559 "type_yang": intf.virtual_interface.type_yang}
560
561 if (intf.virtual_interface.has_field('vpci') and
562 intf.virtual_interface.vpci is not None):
563 cp_info["vpci"] = intf.virtual_interface.vpci
564
565 if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
566 cp_info['security_group'] = vlr.ip_profile_params.security_group
567
568 cp_list.append(cp_info)
569
570 for intf, cp, vlr in self._int_intf:
571 if (intf.virtual_interface.has_field('vpci') and
572 intf.virtual_interface.vpci is not None):
573 cp_list.append({"name": cp,
574 "virtual_link_id": vlr.network_id,
575 "type_yang": intf.virtual_interface.type_yang,
576 "vpci": intf.virtual_interface.vpci})
577 else:
578 cp_list.append({"name": cp,
579 "virtual_link_id": vlr.network_id,
580 "type_yang": intf.virtual_interface.type_yang})
581
582 vm_create_msg_dict["connection_points"] = cp_list
583 vm_create_msg_dict.update(vdu_copy_dict)
584
585 self.process_placement_groups(vm_create_msg_dict)
586
587 msg = RwResourceMgrYang.VDUEventData()
588 msg.event_id = self._request_id
589 msg.cloud_account = self.cloud_account_name
590 msg.request_info.from_dict(vm_create_msg_dict)
591 return msg
592
593 @asyncio.coroutine
594 def terminate(self, xact):
595 """ Delete resource in VIM """
596 if self._state != VDURecordState.READY and self._state != VDURecordState.FAILED:
597 self._log.warning("VDU terminate in not ready state - Ignoring request")
598 return
599
600 self._state = VDURecordState.TERMINATING
601 if self._vm_resp is not None:
602 try:
603 with self._dts.transaction() as new_xact:
604 yield from self.delete_resource(new_xact)
605 except Exception:
606 self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)
607
608 if self._rm_regh is not None:
609 self._log.debug("Deregistering resource manager registration handle")
610 self._rm_regh.deregister()
611 self._rm_regh = None
612
613 if self._vdur_console_handler is not None:
614 self._log.error("Deregistering vnfr vdur registration handle")
615 self._vdur_console_handler._regh.deregister()
616 self._vdur_console_handler._regh = None
617
618 self._state = VDURecordState.TERMINATED
619
620 def find_internal_cp_by_cp_id(self, cp_id):
621 """ Find the CP corresponding to the connection point id"""
622 cp = None
623
624 self._log.debug("find_internal_cp_by_cp_id(%s) called",
625 cp_id)
626
627 for int_cp in self._vdud.internal_connection_point:
628 self._log.debug("Checking for int cp %s in internal connection points",
629 int_cp.id)
630 if int_cp.id == cp_id:
631 cp = int_cp
632 break
633
634 if cp is None:
635 self._log.debug("Failed to find cp %s in internal connection points",
636 cp_id)
637 msg = "Failed to find cp %s in internal connection points" % cp_id
638 raise VduRecordError(msg)
639
640 # return the VLR associated with the connection point
641 return cp
642
643 @asyncio.coroutine
644 def create_resource(self, xact, vnfr, config=None):
645 """ Request resource from ResourceMgr """
646 def find_cp_by_name(cp_name):
647 """ Find a connection point by name """
648 cp = None
649 self._log.debug("find_cp_by_name(%s) called", cp_name)
650 for ext_cp in vnfr._cprs:
651 self._log.debug("Checking ext cp (%s) called", ext_cp.name)
652 if ext_cp.name == cp_name:
653 cp = ext_cp
654 break
655 if cp is None:
656 self._log.debug("Failed to find cp %s in external connection points",
657 cp_name)
658 return cp
659
660 def find_internal_vlr_by_cp_name(cp_name):
661 """ Find the VLR corresponding to the connection point name"""
662 cp = None
663
664 self._log.debug("find_internal_vlr_by_cp_name(%s) called",
665 cp_name)
666
667 for int_cp in self._vdud.internal_connection_point:
668 self._log.debug("Checking for int cp %s in internal connection points",
669 int_cp.id)
670 if int_cp.id == cp_name:
671 cp = int_cp
672 break
673
674 if cp is None:
675 self._log.debug("Failed to find cp %s in internal connection points",
676 cp_name)
677 msg = "Failed to find cp %s in internal connection points" % cp_name
678 raise VduRecordError(msg)
679
680 # return the VLR associated with the connection point
681 return vnfr.find_vlr_by_cp(cp_name)
682
683 block = xact.block_create()
684
685 self._log.debug("Executing vm request id: %s, action: create",
686 self._request_id)
687
688 # Resolve the networks associated external interfaces
689 for ext_intf in self._vdud.external_interface:
690 self._log.debug("Resolving external interface name [%s], cp[%s]",
691 ext_intf.name, ext_intf.vnfd_connection_point_ref)
692 cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
693 if cp is None:
694 self._log.debug("Failed to find connection point - %s",
695 ext_intf.vnfd_connection_point_ref)
696 continue
697 self._log.debug("Connection point name [%s], type[%s]",
698 cp.name, cp.type_yang)
699
700 vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)
701
702 etuple = (ext_intf, cp.name, vlr)
703 self._ext_intf.append(etuple)
704
705 self._log.debug("Created external interface tuple : %s", etuple)
706
707 # Resolve the networks associated internal interfaces
708 for intf in self._vdud.internal_interface:
709 cp_id = intf.vdu_internal_connection_point_ref
710 self._log.debug("Resolving internal interface name [%s], cp[%s]",
711 intf.name, cp_id)
712
713 try:
714 vlr = find_internal_vlr_by_cp_name(cp_id)
715 except Exception as e:
716 self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
717 msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
718 raise VduRecordError(msg)
719
720 ituple = (intf, cp_id, vlr)
721 self._int_intf.append(ituple)
722
723 self._log.debug("Created internal interface tuple : %s", ituple)
724
725 resmgr_path = self.resmgr_path
726 resmgr_msg = self.resmgr_msg(config)
727
728 self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
729 block.add_query_create(resmgr_path, resmgr_msg)
730
731 res_iter = yield from block.execute(now=True)
732
733 resp = None
734
735 for i in res_iter:
736 r = yield from i
737 resp = r.result
738
739 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
740 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
741 self._log.debug("Got vm request response: %s", resp.resource_info)
742 return resp.resource_info
743
744 @asyncio.coroutine
745 def delete_resource(self, xact):
746 block = xact.block_create()
747
748 self._log.debug("Executing vm request id: %s, action: delete",
749 self._request_id)
750
751 block.add_query_delete(self.resmgr_path)
752
753 yield from block.execute(flags=0, now=True)
754
755 @asyncio.coroutine
756 def read_resource(self, xact):
757 block = xact.block_create()
758
759 self._log.debug("Executing vm request id: %s, action: delete",
760 self._request_id)
761
762 block.add_query_read(self.resmgr_path)
763
764 res_iter = yield from block.execute(flags=0, now=True)
765 for i in res_iter:
766 r = yield from i
767 resp = r.result
768
769 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
770 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
771 self._log.debug("Got vm request response: %s", resp.resource_info)
772 #self._vm_resp = resp.resource_info
773 return resp.resource_info
774
775
776 @asyncio.coroutine
777 def start_component(self):
778 """ This VDUR is active """
779 self._log.debug("Starting component %s for vdud %s vdur %s",
780 self._vdud.vcs_component_ref,
781 self._vdud,
782 self._vdur_id)
783 yield from self._vnfr.start_component(self._vdud.vcs_component_ref,
784 self.vm_resp.management_ip)
785
786 @property
787 def active(self):
788 """ Is this VDU active """
789 return True if self._state is VDURecordState.READY else False
790
791 @asyncio.coroutine
792 def instantiation_failed(self, failed_reason=None):
793 """ VDU instantiation failed """
794 self._log.debug("VDU %s instantiation failed ", self._vdur_id)
795 self._state = VDURecordState.FAILED
796 self._state_failed_reason = failed_reason
797 yield from self._vnfr.instantiation_failed(failed_reason)
798
799 @asyncio.coroutine
800 def vdu_is_active(self):
801 """ This VDU is active"""
802 if self.active:
803 self._log.warning("VDU %s was already marked as active", self._vdur_id)
804 return
805
806 self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)
807
808 if self._vdud.vcs_component_ref is not None:
809 yield from self.start_component()
810
811 self._state = VDURecordState.READY
812
813 if self._vnfr.all_vdus_active():
814 self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
815 yield from self._vnfr.is_ready()
816
817 @asyncio.coroutine
818 def instantiate(self, xact, vnfr, config=None):
819 """ Instantiate this VDU """
820 self._state = VDURecordState.INSTANTIATING
821
822 @asyncio.coroutine
823 def on_prepare(xact_info, query_action, ks_path, msg):
824 """ This VDUR is active """
825 self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
826 query_action,
827 ks_path,
828 msg)
829
830 if (query_action == rwdts.QueryAction.UPDATE or
831 query_action == rwdts.QueryAction.CREATE):
832 self._vm_resp = msg
833
834 if msg.resource_state == "active":
835 # Move this VDU to ready state
836 yield from self.vdu_is_active()
837 elif msg.resource_state == "failed":
838 yield from self.instantiation_failed(msg.resource_errors)
839 elif query_action == rwdts.QueryAction.DELETE:
840 self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
841 else:
842 raise NotImplementedError(
843 "%s action on VirtualDeployementUnitRecord not supported",
844 query_action)
845
846 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
847
848 try:
849 reg_event = asyncio.Event(loop=self._loop)
850
851 @asyncio.coroutine
852 def on_ready(regh, status):
853 reg_event.set()
854
855 handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
856 self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
857 flags=rwdts.Flag.SUBSCRIBER,
858 handler=handler)
859 yield from reg_event.wait()
860
861 vm_resp = yield from self.create_resource(xact, vnfr, config)
862 self._vm_resp = vm_resp
863
864 self._state = VDURecordState.RESOURCE_ALLOC_PENDING
865 self._log.debug("Requested VM from resource manager response %s",
866 vm_resp)
867 if vm_resp.resource_state == "active":
868 self._log.debug("Resourcemgr responded wih an active vm resp %s",
869 vm_resp)
870 yield from self.vdu_is_active()
871 self._state = VDURecordState.READY
872 elif (vm_resp.resource_state == "pending" or
873 vm_resp.resource_state == "inactive"):
874 self._log.debug("Resourcemgr responded wih a pending vm resp %s",
875 vm_resp)
876 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
877 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
878 # flags=rwdts.Flag.SUBSCRIBER,
879 # handler=handler)
880 else:
881 self._log.debug("Resourcemgr responded wih an error vm resp %s",
882 vm_resp)
883 raise VirtualDeploymentUnitRecordError(
884 "Failed VDUR instantiation %s " % vm_resp)
885
886 except Exception as e:
887 import traceback
888 traceback.print_exc()
889 self._log.exception(e)
890 self._log.error("Instantiation of VDU record failed: %s", str(e))
891 self._state = VDURecordState.FAILED
892 yield from self.instantiation_failed(str(e))
893
894
895 class VlRecordState(enum.Enum):
896 """ VL Record State """
897 INIT = 101
898 INSTANTIATION_PENDING = 102
899 ACTIVE = 103
900 TERMINATE_PENDING = 104
901 TERMINATED = 105
902 FAILED = 106
903
904
905 class InternalVirtualLinkRecord(object):
906 """ Internal Virtual Link record """
907 def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name):
908 self._dts = dts
909 self._log = log
910 self._loop = loop
911 self._ivld_msg = ivld_msg
912 self._vnfr_name = vnfr_name
913 self._cloud_account_name = cloud_account_name
914
915 self._vlr_req = self.create_vlr()
916 self._vlr = None
917 self._state = VlRecordState.INIT
918
919 @property
920 def vlr_id(self):
921 """ Find VLR by id """
922 return self._vlr_req.id
923
924 @property
925 def name(self):
926 """ Name of this VL """
927 return self._vnfr_name + "." + self._ivld_msg.name
928
929 @property
930 def network_id(self):
931 """ Find VLR by id """
932 return self._vlr.network_id if self._vlr else None
933
934 def vlr_path(self):
935 """ VLR path for this VLR instance"""
936 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self.vlr_id)
937
938 def create_vlr(self):
939 """ Create the VLR record which will be instantiated """
940
941 vld_fields = ["short_name",
942 "vendor",
943 "description",
944 "version",
945 "type_yang",
946 "provider_network"]
947
948 vld_copy_dict = {k: v for k, v in self._ivld_msg.as_dict().items() if k in vld_fields}
949
950 vlr_dict = {"id": str(uuid.uuid4()),
951 "name": self.name,
952 "cloud_account": self._cloud_account_name,
953 }
954 vlr_dict.update(vld_copy_dict)
955
956 vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
957 return vlr
958
959 @asyncio.coroutine
960 def instantiate(self, xact, restart_mode=False):
961 """ Instantiate VL """
962
963 @asyncio.coroutine
964 def instantiate_vlr():
965 """ Instantiate VLR"""
966 self._log.debug("Create VL with xpath %s and vlr %s",
967 self.vlr_path(), self._vlr_req)
968
969 with self._dts.transaction(flags=0) as xact:
970 block = xact.block_create()
971 block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
972 self._log.debug("Executing VL create path:%s msg:%s",
973 self.vlr_path(), self._vlr_req)
974
975 res_iter = None
976 try:
977 res_iter = yield from block.execute()
978 except Exception:
979 self._state = VlRecordState.FAILED
980 self._log.exception("Caught exception while instantial VL")
981 raise
982
983 for ent in res_iter:
984 res = yield from ent
985 self._vlr = res.result
986
987 if self._vlr.operational_status == 'failed':
988 self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
989 self._state = VlRecordState.FAILED
990 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))
991
992 self._log.info("Created VL with xpath %s and vlr %s",
993 self.vlr_path(), self._vlr)
994
995 @asyncio.coroutine
996 def get_vlr():
997 """ Get the network id """
998 res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
999 vlr = None
1000 for ent in res_iter:
1001 res = yield from ent
1002 vlr = res.result
1003
1004 if vlr is None:
1005 err = "Failed to get VLR for path %s" % self.vlr_path()
1006 self._log.warn(err)
1007 raise InternalVirtualLinkRecordError(err)
1008 return vlr
1009
1010 self._state = VlRecordState.INSTANTIATION_PENDING
1011
1012 if restart_mode:
1013 vl = yield from get_vlr()
1014 if vl is None:
1015 yield from instantiate_vlr()
1016 else:
1017 yield from instantiate_vlr()
1018
1019 self._state = VlRecordState.ACTIVE
1020
1021 def vlr_in_vns(self):
1022 """ Is there a VLR record in VNS """
1023 if (self._state == VlRecordState.ACTIVE or
1024 self._state == VlRecordState.INSTANTIATION_PENDING or
1025 self._state == VlRecordState.FAILED):
1026 return True
1027
1028 return False
1029
1030 @asyncio.coroutine
1031 def terminate(self, xact):
1032 """Terminate this VL """
1033 if not self.vlr_in_vns():
1034 self._log.debug("Ignoring terminate request for id %s in state %s",
1035 self.vlr_id, self._state)
1036 return
1037
1038 self._log.debug("Terminating VL with path %s", self.vlr_path())
1039 self._state = VlRecordState.TERMINATE_PENDING
1040 block = xact.block_create()
1041 block.add_query_delete(self.vlr_path())
1042 yield from block.execute(flags=0, now=True)
1043 self._state = VlRecordState.TERMINATED
1044 self._log.debug("Terminated VL with path %s", self.vlr_path())
1045
1046
1047 class VirtualNetworkFunctionRecord(object):
1048 """ Virtual Network Function Record """
1049 def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg):
1050 self._dts = dts
1051 self._log = log
1052 self._loop = loop
1053 self._cluster_name = cluster_name
1054 self._vnfr_msg = vnfr_msg
1055 self._vnfr_id = vnfr_msg.id
1056 self._vnfd_id = vnfr_msg.vnfd_ref
1057 self._vnfm = vnfm
1058 self._vcs_handler = vcs_handler
1059 self._vnfr = vnfr_msg
1060
1061 self._vnfd = None
1062 self._state = VirtualNetworkFunctionRecordState.INIT
1063 self._state_failed_reason = None
1064 self._ext_vlrs = {} # The list of external virtual links
1065 self._vlrs = [] # The list of internal virtual links
1066 self._vdus = [] # The list of vdu
1067 self._vlr_by_cp = {}
1068 self._cprs = []
1069 self._inventory = {}
1070 self._create_time = int(time.time())
1071 self._vnf_mon = None
1072 self._config_status = vnfr_msg.config_status
1073 self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
1074
1075 def _get_vdur_from_vdu_id(self, vdu_id):
1076 self._log.debug("Finding vdur for vdu_id %s", vdu_id)
1077 self._log.debug("Searching through vdus: %s", self._vdus)
1078 for vdu in self._vdus:
1079 self._log.debug("vdu_id: %s", vdu.vdu_id)
1080 if vdu.vdu_id == vdu_id:
1081 return vdu
1082
1083 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id)
1084
1085 @property
1086 def operational_status(self):
1087 """ Operational status of this VNFR """
1088 op_status_map = {"INIT": "init",
1089 "VL_INIT_PHASE": "vl_init_phase",
1090 "VM_INIT_PHASE": "vm_init_phase",
1091 "READY": "running",
1092 "TERMINATE": "terminate",
1093 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1094 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1095 "TERMINATED": "terminated",
1096 "FAILED": "failed", }
1097 return op_status_map[self._state.name]
1098
1099 @property
1100 def vnfd_xpath(self):
1101 """ VNFD xpath associated with this VNFR """
1102 return("C,/vnfd:vnfd-catalog/"
1103 "vnfd:vnfd[vnfd:id = '{}']".format(self._vnfd_id))
1104
1105 @property
1106 def vnfd(self):
1107 """ VNFD for this VNFR """
1108 return self._vnfd
1109
1110 @property
1111 def vnf_name(self):
1112 """ VNFD name associated with this VNFR """
1113 return self.vnfd.name
1114
1115 @property
1116 def name(self):
1117 """ Name of this VNF in the record """
1118 return self._vnfr.name
1119
1120 @property
1121 def cloud_account_name(self):
1122 """ Name of the cloud account this VNFR is instantiated in """
1123 return self._vnfr.cloud_account
1124
1125 @property
1126 def vnfd_id(self):
1127 """ VNFD Id associated with this VNFR """
1128 return self.vnfd.id
1129
1130 @property
1131 def vnfr_id(self):
1132 """ VNFR Id associated with this VNFR """
1133 return self._vnfr_id
1134
1135 @property
1136 def member_vnf_index(self):
1137 """ Member VNF index associated with this VNFR """
1138 return self._vnfr.member_vnf_index_ref
1139
1140 @property
1141 def config_status(self):
1142 """ Config agent status for this VNFR """
1143 return self._config_status
1144
1145 def component_by_name(self, component_name):
1146 """ Find a component by name in the inventory list"""
1147 mangled_name = VcsComponent.mangle_name(component_name,
1148 self.vnf_name,
1149 self.vnfd_id)
1150 return self._inventory[mangled_name]
1151
1152
1153
1154 @asyncio.coroutine
1155 def get_nsr_config(self):
1156 ### Need access to NS instance configuration for runtime resolution.
1157 ### This shall be replaced when deployment flavors are implemented
1158 xpath = "C,/nsr:ns-instance-config"
1159 results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
1160
1161 for result in results:
1162 entry = yield from result
1163 ns_instance_config = entry.result
1164 for nsr in ns_instance_config.nsr:
1165 if nsr.id == self._vnfr_msg.nsr_id_ref:
1166 return nsr
1167 return None
1168
1169 @asyncio.coroutine
1170 def start_component(self, component_name, ip_addr):
1171 """ Start a component in the VNFR by name """
1172 comp = self.component_by_name(component_name)
1173 yield from comp.start(None, None, ip_addr)
1174
1175 def cp_ip_addr(self, cp_name):
1176 """ Get ip address for connection point """
1177 self._log.debug("cp_ip_addr()")
1178 for cp in self._cprs:
1179 if cp.name == cp_name and cp.ip_address is not None:
1180 return cp.ip_address
1181 return "0.0.0.0"
1182
1183 def mgmt_intf_info(self):
1184 """ Get Management interface info for this VNFR """
1185 mgmt_intf_desc = self.vnfd.msg.mgmt_interface
1186 ip_addr = None
1187 if mgmt_intf_desc.has_field("cp"):
1188 ip_addr = self.cp_ip_addr(mgmt_intf_desc.cp)
1189 elif mgmt_intf_desc.has_field("vdu_id"):
1190 try:
1191 vdur = self._get_vdur_from_vdu_id(mgmt_intf_desc.vdu_id)
1192 ip_addr = vdur.management_ip
1193 except VDURecordNotFound:
1194 self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
1195 ip_addr = None
1196 else:
1197 ip_addr = mgmt_intf_desc.ip_address
1198 port = mgmt_intf_desc.port
1199
1200 return ip_addr, port
1201
1202 @property
1203 def msg(self):
1204 """ Message associated with this VNFR """
1205 vnfd_fields = ["short_name", "vendor", "description", "version"]
1206 vnfd_copy_dict = {k: v for k, v in self.vnfd.msg.as_dict().items() if k in vnfd_fields}
1207
1208 mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1209 ip_address, port = self.mgmt_intf_info()
1210
1211 if ip_address is not None:
1212 mgmt_intf.ip_address = ip_address
1213 if port is not None:
1214 mgmt_intf.port = port
1215
1216 vnfr_dict = {"id": self._vnfr_id,
1217 "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
1218 "name": self.name,
1219 "member_vnf_index_ref": self.member_vnf_index,
1220 "vnfd_ref": self.vnfd_id,
1221 "operational_status": self.operational_status,
1222 "operational_status_details": self._state_failed_reason,
1223 "cloud_account": self.cloud_account_name,
1224 "config_status": self._config_status
1225 }
1226
1227 vnfr_dict.update(vnfd_copy_dict)
1228
1229 vnfr_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
1230 vnfr_msg.mgmt_interface = mgmt_intf
1231
1232 # Add all the VLRs to VNFR
1233 for vlr in self._vlrs:
1234 ivlr = vnfr_msg.internal_vlr.add()
1235 ivlr.vlr_ref = vlr.vlr_id
1236
1237 # Add all the VDURs to VDUR
1238 if self._vdus is not None:
1239 for vdu in self._vdus:
1240 vdur = vnfr_msg.vdur.add()
1241 vdur.from_dict(vdu.msg.as_dict())
1242
1243 if self.vnfd.msg.mgmt_interface.has_field('dashboard_params'):
1244 vnfr_msg.dashboard_url = self.dashboard_url
1245
1246 for cpr in self._cprs:
1247 new_cp = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
1248 vnfr_msg.connection_point.append(new_cp)
1249
1250 if self._vnf_mon is not None:
1251 for monp in self._vnf_mon.msg:
1252 vnfr_msg.monitoring_param.append(
1253 VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))
1254
1255 if self._vnfr.vnf_configuration is not None:
1256 vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
1257 if (ip_address is not None and
1258 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None):
1259 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address
1260
1261 for group in self._vnfr_msg.placement_groups_info:
1262 group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1263 group_info.from_dict(group.as_dict())
1264 vnfr_msg.placement_groups_info.append(group_info)
1265
1266 return vnfr_msg
1267
1268 @property
1269 def dashboard_url(self):
1270 ip, cfg_port = self.mgmt_intf_info()
1271 protocol = 'http'
1272 http_port = 80
1273 if self.vnfd.msg.mgmt_interface.dashboard_params.has_field('https'):
1274 if self.vnfd.msg.mgmt_interface.dashboard_params.https is True:
1275 protocol = 'https'
1276 http_port = 443
1277 if self.vnfd.msg.mgmt_interface.dashboard_params.has_field('port'):
1278 http_port = self.vnfd.msg.mgmt_interface.dashboard_params.port
1279
1280 url = "{protocol}://{ip_address}:{port}/{path}".format(
1281 protocol=protocol,
1282 ip_address=ip,
1283 port=http_port,
1284 path=self.vnfd.msg.mgmt_interface.dashboard_params.path.lstrip("/"),
1285 )
1286
1287 return url
1288
1289 @property
1290 def xpath(self):
1291 """ path for this VNFR """
1292 return("D,/vnfr:vnfr-catalog"
1293 "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id))
1294
1295 @asyncio.coroutine
1296 def publish(self, xact):
1297 """ publish this VNFR """
1298 vnfr = self.msg
1299 self._log.debug("Publishing VNFR path = [%s], record = [%s]",
1300 self.xpath, self.msg)
1301 vnfr.create_time = self._create_time
1302 yield from self._vnfm.publish_vnfr(xact, self.xpath, self.msg)
1303 self._log.debug("Published VNFR path = [%s], record = [%s]",
1304 self.xpath, self.msg)
1305
1306 @asyncio.coroutine
1307 def create_vls(self):
1308 """ Publish The VLs associated with this VNF """
1309 self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
1310 self.vnfd_id)
1311 for ivld_msg in self.vnfd.msg.internal_vld:
1312 self._log.debug("Creating internal vld:"
1313 " %s, int_cp_ref = %s",
1314 ivld_msg, ivld_msg.internal_connection_point
1315 )
1316 vlr = InternalVirtualLinkRecord(dts=self._dts,
1317 log=self._log,
1318 loop=self._loop,
1319 ivld_msg=ivld_msg,
1320 vnfr_name=self.name,
1321 cloud_account_name=self.cloud_account_name
1322 )
1323 self._vlrs.append(vlr)
1324
1325 for int_cp in ivld_msg.internal_connection_point:
1326 if int_cp.id_ref in self._vlr_by_cp:
1327 msg = ("Connection point %s already "
1328 " bound %s" % (int_cp.id_ref, self._vlr_by_cp[int_cp.id_ref]))
1329 raise InternalVirtualLinkRecordError(msg)
1330 self._log.debug("Setting vlr %s to internal cp = %s",
1331 vlr, int_cp.id_ref)
1332 self._vlr_by_cp[int_cp.id_ref] = vlr
1333
1334 @asyncio.coroutine
1335 def instantiate_vls(self, xact, restart_mode=False):
1336 """ Instantiate the VLs associated with this VNF """
1337 self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1338 self.vnfd_id)
1339
1340 for vlr in self._vlrs:
1341 self._log.debug("Instantiating VLR %s", vlr)
1342 yield from vlr.instantiate(xact, restart_mode)
1343
1344 def find_vlr_by_cp(self, cp_name):
1345 """ Find the VLR associated with the cp name """
1346 return self._vlr_by_cp[cp_name]
1347
1348 def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
1349 """
1350 Returns the cloud specific construct for placement group
1351 Arguments:
1352 input_group: VNFD PlacementGroup
1353 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1354 """
1355 copy_dict = ['name', 'requirement', 'strategy']
1356 for group_info in nsr_config.vnfd_placement_group_maps:
1357 if group_info.placement_group_ref == input_group.name and \
1358 group_info.vnfd_id_ref == self.vnfd_id:
1359 group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1360 group_dict = {k:v for k,v in
1361 group_info.as_dict().items()
1362 if (k != 'placement_group_ref' and k !='vnfd_id_ref')}
1363 for param in copy_dict:
1364 group_dict.update({param: getattr(input_group, param)})
1365 group.from_dict(group_dict)
1366 return group
1367 return None
1368
1369 @asyncio.coroutine
1370 def get_vdu_placement_groups(self, vdu):
1371 placement_groups = []
1372 ### Step-1: Get VNF level placement groups
1373 for group in self._vnfr_msg.placement_groups_info:
1374 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1375 #group_info.from_dict(group.as_dict())
1376 placement_groups.append(group)
1377
1378 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1379 nsr_config = yield from self.get_nsr_config()
1380
1381 ### Step-3: Get VDU level placement groups
1382 for group in self.vnfd.msg.placement_groups:
1383 for member_vdu in group.member_vdus:
1384 if member_vdu.member_vdu_ref == vdu.id:
1385 group_info = self.resolve_placement_group_cloud_construct(group,
1386 nsr_config)
1387 if group_info is None:
1388 self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
1389 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1390 else:
1391 self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1392 str(group_info),
1393 vdu.name,
1394 self.vnf_name,
1395 self.member_vnf_index)
1396 placement_groups.append(group_info)
1397
1398 return placement_groups
1399
1400 @asyncio.coroutine
1401 def create_vdus(self, vnfr, restart_mode=False):
1402 """ Create the VDUs associated with this VNF """
1403
1404 def get_vdur_id(vdud):
1405 """Get the corresponding VDUR's id for the VDUD. This is useful in
1406 case of a restart.
1407
1408 In restart mode we check for exiting VDUR's ID and use them, if
1409 available. This way we don't end up creating duplicate VDURs
1410 """
1411 vdur_id = None
1412
1413 if restart_mode and vdud is not None:
1414 try:
1415 vdur = [vdur.id for vdur in vnfr._vnfr.vdur if vdur.vdu_id_ref == vdud.id]
1416 vdur_id = vdur[0]
1417 except IndexError:
1418 self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
1419
1420 return vdur_id
1421
1422
1423 self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
1424 for vdu in self.vnfd.msg.vdu:
1425 self._log.debug("Creating vdu: %s", vdu)
1426 vdur_id = get_vdur_id(vdu)
1427
1428 placement_groups = yield from self.get_vdu_placement_groups(vdu)
1429 self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1430 vdu.name,
1431 self.vnf_name,
1432 self.member_vnf_index,
1433 [ group.name for group in placement_groups])
1434
1435 vdur = VirtualDeploymentUnitRecord(
1436 dts=self._dts,
1437 log=self._log,
1438 loop=self._loop,
1439 vdud=vdu,
1440 vnfr=vnfr,
1441 mgmt_intf=self.has_mgmt_interface(vdu),
1442 cloud_account_name=self.cloud_account_name,
1443 vnfd_package_store=self._vnfd_package_store,
1444 vdur_id=vdur_id,
1445 placement_groups = placement_groups,
1446 )
1447 yield from vdur.vdu_opdata_register()
1448
1449 self._vdus.append(vdur)
1450
1451 @asyncio.coroutine
1452 def instantiate_vdus(self, xact, vnfr):
1453 """ Instantiate the VDUs associated with this VNF """
1454 self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)
1455
1456 lookup = {vdu.vdu_id: vdu for vdu in self._vdus}
1457
1458 # Identify any dependencies among the VDUs
1459 dependencies = collections.defaultdict(list)
1460 vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")
1461
1462 for vdu in self._vdus:
1463 if vdu.vdud_cloud_init is not None:
1464 for vdu_id in vdu_id_pattern.findall(vdu.vdud_cloud_init):
1465 if vdu_id != vdu.vdu_id:
1466 # This means that vdu.vdu_id depends upon vdu_id,
1467 # i.e. vdu_id must be instantiated before
1468 # vdu.vdu_id.
1469 dependencies[vdu.vdu_id].append(lookup[vdu_id])
1470
1471 # Define the terminal states of VDU instantiation
1472 terminal = (
1473 VDURecordState.READY,
1474 VDURecordState.TERMINATED,
1475 VDURecordState.FAILED,
1476 )
1477
1478 datastore = VdurDatastore()
1479 processed = set()
1480
1481 @asyncio.coroutine
1482 def instantiate_monitor(vdu):
1483 """Monitor the state of the VDU during instantiation
1484
1485 Arguments:
1486 vdu - a VirtualDeploymentUnitRecord
1487
1488 """
1489 # wait for the VDUR to enter a terminal state
1490 while vdu._state not in terminal:
1491 yield from asyncio.sleep(1, loop=self._loop)
1492
1493 # update the datastore
1494 datastore.update(vdu)
1495
1496 # add the VDU to the set of processed VDUs
1497 processed.add(vdu.vdu_id)
1498
1499 @asyncio.coroutine
1500 def instantiate(vdu):
1501 """Instantiate the specified VDU
1502
1503 Arguments:
1504 vdu - a VirtualDeploymentUnitRecord
1505
1506 Raises:
1507 if the VDU, or any of the VDUs this VDU depends upon, are
1508 terminated or fail to instantiate properly, a
1509 VirtualDeploymentUnitRecordError is raised.
1510
1511 """
1512 for dependency in dependencies[vdu.vdu_id]:
1513 self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))
1514
1515 while dependency.vdu_id not in processed:
1516 yield from asyncio.sleep(1, loop=self._loop)
1517
1518 if not dependency.active:
1519 raise VirtualDeploymentUnitRecordError()
1520
1521 self._log.debug('instantiating {}'.format(vdu.vdu_id))
1522
1523 # Populate the datastore with the current values of the VDU
1524 datastore.add(vdu)
1525
1526 # Substitute any variables contained in the cloud config script
1527 config = str(vdu.vdud_cloud_init)
1528
1529 parts = re.split("\{\{ ([^\}]+) \}\}", config)
1530 if len(parts) > 1:
1531
1532 # Extract the variable names
1533 variables = list()
1534 for variable in parts[1::2]:
1535 variables.append(variable.lstrip('{{').rstrip('}}').strip())
1536
1537 # Iterate of the variables and substitute values from the
1538 # datastore.
1539 for variable in variables:
1540
1541 # Handle a reference to a VDU by ID
1542 if variable.startswith('vdu['):
1543 value = datastore.get(variable)
1544 if value is None:
1545 msg = "Unable to find a substitute for {} in {} cloud-init script"
1546 raise ValueError(msg.format(variable, vdu.vdu_id))
1547
1548 config = config.replace("{{ %s }}" % variable, value)
1549 continue
1550
1551 # Handle a reference to the current VDU
1552 if variable.startswith('vdu'):
1553 value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
1554 config = config.replace("{{ %s }}" % variable, value)
1555 continue
1556
1557 # Handle unrecognized variables
1558 msg = 'unrecognized cloud-config variable: {}'
1559 raise ValueError(msg.format(variable))
1560
1561 # Instantiate the VDU
1562 with self._dts.transaction() as xact:
1563 self._log.debug("Instantiating vdu: %s", vdu)
1564 yield from vdu.instantiate(xact, vnfr, config=config)
1565 if self._state == VirtualNetworkFunctionRecordState.FAILED:
1566 self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
1567 self.vnfr_id, vdu)
1568
1569 # First create a set of tasks to monitor the state of the VDUs and
1570 # report when they have entered a terminal state
1571 for vdu in self._vdus:
1572 self._loop.create_task(instantiate_monitor(vdu))
1573
1574 for vdu in self._vdus:
1575 self._loop.create_task(instantiate(vdu))
1576
1577 def has_mgmt_interface(self, vdu):
1578 # ## TODO: Support additional mgmt_interface type options
1579 if self.vnfd.msg.mgmt_interface.vdu_id == vdu.id:
1580 return True
1581 return False
1582
1583 def vlr_xpath(self, vlr_id):
1584 """ vlr xpath """
1585 return(
1586 "D,/vlr:vlr-catalog/"
1587 "vlr:vlr[vlr:id = '{}']".format(vlr_id))
1588
1589 def ext_vlr_by_id(self, vlr_id):
1590 """ find ext vlr by id """
1591 return self._ext_vlrs[vlr_id]
1592
1593 @asyncio.coroutine
1594 def publish_inventory(self, xact):
1595 """ Publish the inventory associated with this VNF """
1596 self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)
1597
1598 for component in self.vnfd.msg.component:
1599 self._log.debug("Creating inventory component %s", component)
1600 mangled_name = VcsComponent.mangle_name(component.component_name,
1601 self.vnf_name,
1602 self.vnfd_id
1603 )
1604 comp = VcsComponent(dts=self._dts,
1605 log=self._log,
1606 loop=self._loop,
1607 cluster_name=self._cluster_name,
1608 vcs_handler=self._vcs_handler,
1609 component=component,
1610 mangled_name=mangled_name,
1611 )
1612 if comp.name in self._inventory:
1613 self._log.debug("Duplicate entries in inventory %s for vnfr %s",
1614 component, self._vnfd_id)
1615 return
1616 self._log.debug("Adding component %s for vnrf %s",
1617 comp.name, self._vnfr_id)
1618 self._inventory[comp.name] = comp
1619 yield from comp.publish(xact)
1620
1621 def all_vdus_active(self):
1622 """ Are all VDUS in this VNFR active? """
1623 for vdu in self._vdus:
1624 if not vdu.active:
1625 return False
1626
1627 self._log.debug("Inside all_vdus_active. Returning True")
1628 return True
1629
1630 @asyncio.coroutine
1631 def instantiation_failed(self, failed_reason=None):
1632 """ VNFR instantiation failed """
1633 self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
1634 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1635 self._state_failed_reason = failed_reason
1636
1637 # Update the VNFR with the changed status
1638 yield from self.publish(None)
1639
1640 @asyncio.coroutine
1641 def is_ready(self):
1642 """ This VNF is ready"""
1643 self._log.debug("VNFR id %s is ready", self.vnfr_id)
1644
1645 if self._state != VirtualNetworkFunctionRecordState.FAILED:
1646 self.set_state(VirtualNetworkFunctionRecordState.READY)
1647
1648 else:
1649 self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
1650
1651 # Update the VNFR with the changed status
1652 yield from self.publish(None)
1653
1654 def update_cp(self, cp_name, ip_address, cp_id):
1655 """Updated the connection point with ip address"""
1656 for cp in self._cprs:
1657 if cp.name == cp_name:
1658 self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1659 cp_name, cp, ip_address, cp_id)
1660 cp.ip_address = ip_address
1661 cp.connection_point_id = cp_id
1662 return
1663
1664 err = "No connection point %s found in VNFR id %s" % (cp.name, self._vnfr_id)
1665 self._log.debug(err)
1666 raise VirtualDeploymentUnitRecordError(err)
1667
1668 def set_state(self, state):
1669 """ Set state for this VNFR"""
1670 self._state = state
1671
1672 @asyncio.coroutine
1673 def instantiate(self, xact, restart_mode=False):
1674 """ instantiate this VNF """
1675 self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
1676
1677 @asyncio.coroutine
1678 def fetch_vlrs():
1679 """ Fetch VLRs """
1680 # Iterate over all the connection points in VNFR and fetch the
1681 # associated VLRs
1682
1683 def cpr_from_cp(cp):
1684 """ Creates a record level connection point from the desciptor cp"""
1685 cp_fields = ["name", "image", "vm-flavor"]
1686 cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
1687 cpr_dict = {}
1688 cpr_dict.update(cp_copy_dict)
1689 return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)
1690
1691 self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1692 self._vnfr_id, self._vnfr.connection_point)
1693
1694 for cp in self._vnfr.connection_point:
1695 cpr = cpr_from_cp(cp)
1696 self._cprs.append(cpr)
1697 self._log.debug("Adding Connection point record %s ", cp)
1698
1699 vlr_path = self.vlr_xpath(cp.vlr_ref)
1700 self._log.debug("Fetching VLR with path = %s", vlr_path)
1701 res_iter = yield from self._dts.query_read(self.vlr_xpath(cp.vlr_ref),
1702 rwdts.XactFlag.MERGE)
1703 for i in res_iter:
1704 r = yield from i
1705 d = r.result
1706 self._ext_vlrs[cp.vlr_ref] = d
1707 cpr.vlr_ref = cp.vlr_ref
1708 self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)
1709
1710 # Fetch the VNFD associated with the VNFR
1711 self._log.debug("VNFR-ID %s: Fetching vnfds", self._vnfr_id)
1712 self._vnfd = yield from self._vnfm.get_vnfd_ref(self._vnfd_id)
1713 self._log.debug("VNFR-ID %s: Fetched vnfd:%s", self._vnfr_id, self._vnfd)
1714
1715 assert self.vnfd is not None
1716
1717 # Fetch External VLRs
1718 self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
1719 yield from fetch_vlrs()
1720
1721 # Publish inventory
1722 self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
1723 yield from self.publish_inventory(xact)
1724
1725 # Publish inventory
1726 self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
1727 yield from self.create_vls()
1728
1729 # publish the VNFR
1730 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1731 yield from self.publish(xact)
1732
1733 # instantiate VLs
1734 self._log.debug("VNFR-ID %s: Instantiate VLs", self._vnfr_id)
1735 try:
1736 yield from self.instantiate_vls(xact, restart_mode)
1737 except Exception as e:
1738 self._log.exception("VL instantiation failed (%s)", str(e))
1739 yield from self.instantiation_failed(str(e))
1740 return
1741
1742 self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)
1743
1744 # instantiate VDUs
1745 self._log.debug("VNFR-ID %s: Create VDUs", self._vnfr_id)
1746 yield from self.create_vdus(self, restart_mode)
1747
1748 # publish the VNFR
1749 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1750 yield from self.publish(xact)
1751
1752 # instantiate VDUs
1753 # ToDo: Check if this should be prevented during restart
1754 self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
1755 _ = self._loop.create_task(self.instantiate_vdus(xact, self))
1756
1757 # publish the VNFR
1758 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1759 yield from self.publish(xact)
1760
1761 self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)
1762
1763 @asyncio.coroutine
1764 def terminate(self, xact):
1765 """ Terminate this virtual network function """
1766
1767 self._log.debug("Terminatng VNF id %s", self.vnfr_id)
1768
1769 self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)
1770
1771 # stop monitoring
1772 if self._vnf_mon is not None:
1773 self._vnf_mon.stop()
1774 self._vnf_mon.deregister()
1775 self._vnf_mon = None
1776
1777 @asyncio.coroutine
1778 def terminate_vls():
1779 """ Terminate VLs in this VNF """
1780 for vl in self._vlrs:
1781 yield from vl.terminate(xact)
1782
1783 @asyncio.coroutine
1784 def terminate_vdus():
1785 """ Terminate VDUS in this VNF """
1786 for vdu in self._vdus:
1787 yield from vdu.terminate(xact)
1788
1789 self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
1790 self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
1791 yield from terminate_vls()
1792
1793 self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
1794 self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
1795 yield from terminate_vdus()
1796
1797 self._log.debug("Terminated VNF id %s", self.vnfr_id)
1798 self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1799
1800
1801 class VnfdDtsHandler(object):
1802 """ DTS handler for VNFD config changes """
1803 XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1804
1805 def __init__(self, dts, log, loop, vnfm):
1806 self._dts = dts
1807 self._log = log
1808 self._loop = loop
1809 self._vnfm = vnfm
1810 self._regh = None
1811
1812 @asyncio.coroutine
1813 def regh(self):
1814 """ DTS registration handle """
1815 return self._regh
1816
1817 @asyncio.coroutine
1818 def register(self):
1819 """ Register for VNFD configuration"""
1820
1821 def on_apply(dts, acg, xact, action, scratch):
1822 """Apply the configuration"""
1823 self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1824 xact, action, scratch)
1825
1826 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
1827 # Create/Update a VNFD record
1828 for cfg in self._regh.get_xact_elements(xact):
1829 # Only interested in those VNFD cfgs whose ID was received in prepare callback
1830 if cfg.id in scratch.get('vnfds', []) or is_recovery:
1831 self._vnfm.update_vnfd(cfg)
1832
1833 scratch.pop('vnfds', None)
1834
1835 @asyncio.coroutine
1836 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
1837 """ on prepare callback """
1838 self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1839 ks_path.to_xpath(RwVnfmYang.get_schema()), msg)
1840 fref = ProtobufC.FieldReference.alloc()
1841 fref.goto_whole_message(msg.to_pbcm())
1842
1843 # Handle deletes in prepare_callback, but adds/updates in apply_callback
1844 if fref.is_field_deleted():
1845 # Delete an VNFD record
1846 self._log.debug("Deleting VNFD with id %s", msg.id)
1847 if self._vnfm.vnfd_in_use(msg.id):
1848 self._log.debug("Cannot delete VNFD in use - %s", msg)
1849 err = "Cannot delete a VNFD in use - %s" % msg
1850 raise VirtualNetworkFunctionDescriptorRefCountExists(err)
1851 # Delete a VNFD record
1852 yield from self._vnfm.delete_vnfd(msg.id)
1853 else:
1854 # Handle actual adds/updates in apply_callback,
1855 # just check if VNFD in use in prepare_callback
1856 if self._vnfm.vnfd_in_use(msg.id):
1857 self._log.debug("Cannot modify an VNFD in use - %s", msg)
1858 err = "Cannot modify an VNFD in use - %s" % msg
1859 raise VirtualNetworkFunctionDescriptorRefCountExists(err)
1860
1861 # Add this VNFD to scratch to create/update in apply callback
1862 vnfds = scratch.setdefault('vnfds', [])
1863 vnfds.append(msg.id)
1864
1865 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
1866
1867 self._log.debug(
1868 "Registering for VNFD config using xpath: %s",
1869 VnfdDtsHandler.XPATH,
1870 )
1871 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
1872 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
1873 self._regh = acg.register(
1874 xpath=VnfdDtsHandler.XPATH,
1875 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
1876 on_prepare=on_prepare)
1877
1878
1879 class VcsComponentDtsHandler(object):
1880 """ Vcs Component DTS handler """
1881 XPATH = ("D,/rw-manifest:manifest" +
1882 "/rw-manifest:operational-inventory" +
1883 "/rw-manifest:component")
1884
1885 def __init__(self, dts, log, loop, vnfm):
1886 self._dts = dts
1887 self._log = log
1888 self._loop = loop
1889 self._regh = None
1890 self._vnfm = vnfm
1891
1892 @property
1893 def regh(self):
1894 """ DTS registration handle """
1895 return self._regh
1896
1897 @asyncio.coroutine
1898 def register(self):
1899 """ Registers VCS component dts publisher registration"""
1900 self._log.debug("VCS Comp publisher DTS handler registering path %s",
1901 VcsComponentDtsHandler.XPATH)
1902
1903 hdl = rift.tasklets.DTS.RegistrationHandler()
1904 handlers = rift.tasklets.Group.Handler()
1905 with self._dts.group_create(handler=handlers) as group:
1906 self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
1907 handler=hdl,
1908 flags=(rwdts.Flag.PUBLISHER |
1909 rwdts.Flag.NO_PREP_READ |
1910 rwdts.Flag.DATASTORE),)
1911
1912 @asyncio.coroutine
1913 def publish(self, xact, path, msg):
1914 """ Publishes the VCS component """
1915 self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
1916 xact, path, msg)
1917 self.regh.create_element(path, msg)
1918 self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
1919 VcsComponentDtsHandler.XPATH, xact, path, msg)
1920
1921 class VnfrConsoleOperdataDtsHandler(object):
1922 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
1923 @property
1924 def vnfr_vdu_console_xpath(self):
1925 """ path for resource-mgr"""
1926 return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self._vnfr_id,self._vdur_id))
1927
1928 def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
1929 self._dts = dts
1930 self._log = log
1931 self._loop = loop
1932 self._regh = None
1933 self._vnfm = vnfm
1934
1935 self._vnfr_id = vnfr_id
1936 self._vdur_id = vdur_id
1937 self._vdu_id = vdu_id
1938
1939 @asyncio.coroutine
1940 def register(self):
1941 """ Register for VNFR VDU Operational Data read from dts """
1942
1943 @asyncio.coroutine
1944 def on_prepare(xact_info, action, ks_path, msg):
1945 """ prepare callback from dts """
1946 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
1947 self._log.debug(
1948 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
1949 xact_info, action, xpath, msg
1950 )
1951
1952 if action == rwdts.QueryAction.READ:
1953 schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema()
1954 path_entry = schema.keyspec_to_entry(ks_path)
1955 self._log.debug("VDU Opdata path is {}".format(path_entry))
1956 try:
1957 vnfr = self._vnfm.get_vnfr(self._vnfr_id)
1958 except VnfRecordError as e:
1959 self._log.error("VNFR id %s not found", self._vnfr_id)
1960 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
1961 return
1962 try:
1963 vdur= vnfr._get_vdur_from_vdu_id(self._vdu_id)
1964 if not vdur._state == VDURecordState.READY:
1965 self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
1966 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
1967 return
1968 with self._dts.transaction() as new_xact:
1969 resp = yield from vdur.read_resource(new_xact)
1970 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
1971 vdur_console.id = self._vdur_id
1972 if resp.console_url:
1973 vdur_console.console_url = resp.console_url
1974 else:
1975 vdur_console.console_url = 'none'
1976 self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
1977 except Exception:
1978 self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
1979 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
1980 vdur_console.id = self._vdur_id
1981 vdur_console.console_url = 'none'
1982
1983 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
1984 xpath=self.vnfr_vdu_console_xpath,
1985 msg=vdur_console)
1986 else:
1987 #raise VnfRecordError("Not supported operation %s" % action)
1988 self._log.error("Not supported operation %s" % action)
1989 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
1990 return
1991
1992
1993 self._log.debug("Registering for VNFR VDU using xpath: %s",
1994 self.vnfr_vdu_console_xpath)
1995 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
1996 with self._dts.group_create() as group:
1997 self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
1998 handler=hdl,
1999 flags=rwdts.Flag.PUBLISHER,
2000 )
2001
2002
2003 class VnfrDtsHandler(object):
2004 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2005 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2006
2007 def __init__(self, dts, log, loop, vnfm):
2008 self._dts = dts
2009 self._log = log
2010 self._loop = loop
2011 self._vnfm = vnfm
2012
2013 self._regh = None
2014
2015 @property
2016 def regh(self):
2017 """ Return registration handle"""
2018 return self._regh
2019
2020 @property
2021 def vnfm(self):
2022 """ Return VNF manager instance """
2023 return self._vnfm
2024
2025 @asyncio.coroutine
2026 def register(self):
2027 """ Register for vnfr create/update/delete/read requests from dts """
2028 def on_commit(xact_info):
2029 """ The transaction has been committed """
2030 self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
2031 return rwdts.MemberRspCode.ACTION_OK
2032
2033 def on_abort(*args):
2034 """ Abort callback """
2035 self._log.debug("VNF transaction got aborted")
2036
2037 @asyncio.coroutine
2038 def on_event(dts, g_reg, xact, xact_event, scratch_data):
2039
2040 @asyncio.coroutine
2041 def instantiate_realloc_vnfr(vnfr):
2042 """Re-populate the vnfm after restart
2043
2044 Arguments:
2045 vlink
2046
2047 """
2048
2049 yield from vnfr.instantiate(None, restart_mode=True)
2050
2051 if xact_event == rwdts.MemberEvent.INSTALL:
2052 curr_cfg = self.regh.elements
2053 for cfg in curr_cfg:
2054 vnfr = self.vnfm.create_vnfr(cfg)
2055 self._loop.create_task(instantiate_realloc_vnfr(vnfr))
2056
2057 self._log.debug("Got on_event in vnfm")
2058
2059 return rwdts.MemberRspCode.ACTION_OK
2060
2061 @asyncio.coroutine
2062 def on_prepare(xact_info, action, ks_path, msg):
2063 """ prepare callback from dts """
2064 self._log.debug(
2065 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2066 xact_info, action, msg
2067 )
2068
2069 if action == rwdts.QueryAction.CREATE:
2070 if not msg.has_field("vnfd_ref"):
2071 err = "Vnfd reference not provided"
2072 self._log.error(err)
2073 raise VnfRecordError(err)
2074
2075 vnfr = self.vnfm.create_vnfr(msg)
2076 try:
2077 # RIFT-9105: Unable to add a READ query under an existing transaction
2078 # xact = xact_info.xact
2079 yield from vnfr.instantiate(None)
2080 except Exception as e:
2081 self._log.exception(e)
2082 self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
2083 vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
2084 yield from vnfr.publish(None)
2085 elif action == rwdts.QueryAction.DELETE:
2086 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2087 path_entry = schema.keyspec_to_entry(ks_path)
2088 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2089
2090 if vnfr is None:
2091 self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
2092 raise VirtualNetworkFunctionRecordNotFound(
2093 "VNFR id %s", path_entry.key00.id)
2094
2095 try:
2096 yield from vnfr.terminate(xact_info.xact)
2097 # Unref the VNFD
2098 vnfr.vnfd.unref()
2099 yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
2100 except Exception as e:
2101 self._log.exception(e)
2102 self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)
2103
2104 elif action == rwdts.QueryAction.UPDATE:
2105 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2106 path_entry = schema.keyspec_to_entry(ks_path)
2107 vnfr = None
2108 try:
2109 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2110 except Exception as e:
2111 self._log.debug("No vnfr found with id %s", path_entry.key00.id)
2112 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2113 return
2114
2115 if vnfr is None:
2116 self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
2117 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2118 return
2119
2120 self._log.debug("VNFR {} update config status {} (current {})".
2121 format(vnfr.name, msg.config_status, vnfr.config_status))
2122 # Update the config status and publish
2123 vnfr._config_status = msg.config_status
2124 yield from vnfr.publish(None)
2125
2126 else:
2127 raise NotImplementedError(
2128 "%s action on VirtualNetworkFunctionRecord not supported",
2129 action)
2130
2131 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2132
2133 self._log.debug("Registering for VNFR using xpath: %s",
2134 VnfrDtsHandler.XPATH,)
2135
2136 hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
2137 on_prepare=on_prepare,)
2138 handlers = rift.tasklets.Group.Handler(on_event=on_event,)
2139 with self._dts.group_create(handler=handlers) as group:
2140 self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
2141 handler=hdl,
2142 flags=(rwdts.Flag.PUBLISHER |
2143 rwdts.Flag.NO_PREP_READ |
2144 rwdts.Flag.CACHE |
2145 rwdts.Flag.DATASTORE),)
2146
2147 @asyncio.coroutine
2148 def create(self, xact, path, msg):
2149 """
2150 Create a VNFR record in DTS with path and message
2151 """
2152 self._log.debug("Creating VNFR xact = %s, %s:%s",
2153 xact, path, msg)
2154
2155 self.regh.create_element(path, msg)
2156 self._log.debug("Created VNFR xact = %s, %s:%s",
2157 xact, path, msg)
2158
2159 @asyncio.coroutine
2160 def update(self, xact, path, msg):
2161 """
2162 Update a VNFR record in DTS with path and message
2163 """
2164 self._log.debug("Updating VNFR xact = %s, %s:%s",
2165 xact, path, msg)
2166 self.regh.update_element(path, msg)
2167 self._log.debug("Updated VNFR xact = %s, %s:%s",
2168 xact, path, msg)
2169
2170 @asyncio.coroutine
2171 def delete(self, xact, path):
2172 """
2173 Delete a VNFR record in DTS with path and message
2174 """
2175 self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
2176 self.regh.delete_element(path)
2177 self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2178
2179
2180 class VirtualNetworkFunctionDescriptor(object):
2181 """
2182 Virtual Network Function descriptor class
2183 """
2184
2185 def __init__(self, dts, log, loop, vnfm, vnfd):
2186 self._dts = dts
2187 self._log = log
2188 self._loop = loop
2189
2190 self._vnfm = vnfm
2191 self._vnfd = vnfd
2192 self._ref_count = 0
2193
2194 @property
2195 def ref_count(self):
2196 """ Returns the reference count associated with
2197 this Virtual Network Function Descriptor"""
2198 return self._ref_count
2199
2200 @property
2201 def id(self):
2202 """ Returns vnfd id """
2203 return self._vnfd.id
2204
2205 @property
2206 def name(self):
2207 """ Returns vnfd name """
2208 return self._vnfd.name
2209
2210 def in_use(self):
2211 """ Returns whether vnfd is in use or not """
2212 return True if self._ref_count > 0 else False
2213
2214 def ref(self):
2215 """ Take a reference on this object """
2216 self._ref_count += 1
2217 return self._ref_count
2218
2219 def unref(self):
2220 """ Release reference on this object """
2221 if self.ref_count < 1:
2222 msg = ("Unref on a VNFD object - vnfd id %s, ref_count = %s" %
2223 (self.id, self._ref_count))
2224 self._log.critical(msg)
2225 raise VnfRecordError(msg)
2226 self._log.debug("Releasing ref on VNFD %s - curr ref_count:%s",
2227 self.id, self.ref_count)
2228 self._ref_count -= 1
2229 return self._ref_count
2230
2231 @property
2232 def msg(self):
2233 """ Return the message associated with this NetworkServiceDescriptor"""
2234 return self._vnfd
2235
2236 @staticmethod
2237 def path_for_id(vnfd_id):
2238 """ Return path for the passed vnfd_id"""
2239 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id)
2240
2241 def path(self):
2242 """ Return the path associated with this NetworkServiceDescriptor"""
2243 return VirtualNetworkFunctionDescriptor.path_for_id(self.id)
2244
2245 def update(self, vnfd):
2246 """ Update the Virtual Network Function Descriptor """
2247 if self.in_use():
2248 self._log.error("Cannot update descriptor %s in use refcnt=%d",
2249 self.id, self.ref_count)
2250
2251 # The following loop is added to debug RIFT-13284
2252 for vnf_rec in self._vnfm._vnfrs.values():
2253 if vnf_rec.vnfd_id == self.id:
2254 self._log.error("descriptor %s in used by %s:%s",
2255 self.id, vnf_rec.vnfr_id, vnf_rec.msg)
2256 raise VirtualNetworkFunctionDescriptorRefCountExists("Cannot update descriptor in use %s" % self.id)
2257 self._vnfd = vnfd
2258
2259 def delete(self):
2260 """ Delete the Virtual Network Function Descriptor """
2261 if self.in_use():
2262 self._log.error("Cannot delete descriptor %s in use refcnt=%d",
2263 self.id)
2264
2265 # The following loop is added to debug RIFT-13284
2266 for vnf_rec in self._vnfm._vnfrs.values():
2267 if vnf_rec.vnfd_id == self.id:
2268 self._log.error("descriptor %s in used by %s:%s",
2269 self.id, vnf_rec.vnfr_id, vnf_rec.msg)
2270 raise VirtualNetworkFunctionDescriptorRefCountExists("Cannot delete descriptor in use %s" % self.id)
2271 self._vnfm.delete_vnfd(self.id)
2272
2273
2274 class VnfdRefCountDtsHandler(object):
2275 """ The VNFD Ref Count DTS handler """
2276 XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2277
2278 def __init__(self, dts, log, loop, vnfm):
2279 self._dts = dts
2280 self._log = log
2281 self._loop = loop
2282 self._vnfm = vnfm
2283
2284 self._regh = None
2285
2286 @property
2287 def regh(self):
2288 """ Return registration handle """
2289 return self._regh
2290
2291 @property
2292 def vnfm(self):
2293 """ Return the NS manager instance """
2294 return self._vnfm
2295
2296 @asyncio.coroutine
2297 def register(self):
2298 """ Register for VNFD ref count read from dts """
2299
2300 @asyncio.coroutine
2301 def on_prepare(xact_info, action, ks_path, msg):
2302 """ prepare callback from dts """
2303 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2304 self._log.debug(
2305 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2306 xact_info, action, xpath, msg
2307 )
2308
2309 if action == rwdts.QueryAction.READ:
2310 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
2311 path_entry = schema.keyspec_to_entry(ks_path)
2312 vnfd_list = yield from self._vnfm.get_vnfd_refcount(path_entry.key00.vnfd_id_ref)
2313 for xpath, msg in vnfd_list:
2314 self._log.debug("Responding to ref count query path:%s, msg:%s",
2315 xpath, msg)
2316 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
2317 xpath=xpath,
2318 msg=msg)
2319 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2320 else:
2321 raise VnfRecordError("Not supported operation %s" % action)
2322
2323 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2324 with self._dts.group_create() as group:
2325 self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH,
2326 handler=hdl,
2327 flags=rwdts.Flag.PUBLISHER,
2328 )
2329
2330
2331 class VdurDatastore(object):
2332 """
2333 This VdurDatastore is intended to expose select information about a VDUR
2334 such that it can be referenced in a cloud config file. The data that is
2335 exposed does not necessarily follow the structure of the data in the yang
2336 model. This is intentional. The data that are exposed are intended to be
2337 agnostic of the yang model so that changes in the model do not necessarily
2338 require changes to the interface provided to the user. It also means that
2339 the user does not need to be familiar with the RIFT.ware yang models.
2340 """
2341
2342 def __init__(self):
2343 """Create an instance of VdurDatastore"""
2344 self._vdur_data = dict()
2345 self._pattern = re.compile("vdu\[([^]]+)\]\.(.+)")
2346
2347 def add(self, vdur):
2348 """Add a new VDUR to the datastore
2349
2350 Arguments:
2351 vdur - a VirtualDeploymentUnitRecord instance
2352
2353 Raises:
2354 A ValueError is raised if the VDUR is (1) None or (2) already in
2355 the datastore.
2356
2357 """
2358 if vdur.vdu_id is None:
2359 raise ValueError('VDURs are required to have an ID')
2360
2361 if vdur.vdu_id in self._vdur_data:
2362 raise ValueError('cannot add a VDUR more than once')
2363
2364 self._vdur_data[vdur.vdu_id] = dict()
2365
2366 def set_if_not_none(key, attr):
2367 if attr is not None:
2368 self._vdur_data[vdur.vdu_id][key] = attr
2369
2370 set_if_not_none('name', vdur._vdud.name)
2371 set_if_not_none('mgmt.ip', vdur.vm_management_ip)
2372
2373 def update(self, vdur):
2374 """Update the VDUR information in the datastore
2375
2376 Arguments:
2377 vdur - a GI representation of a VDUR
2378
2379 Raises:
2380 A ValueError is raised if the VDUR is (1) None or (2) already in
2381 the datastore.
2382
2383 """
2384 if vdur.vdu_id is None:
2385 raise ValueError('VNFDs are required to have an ID')
2386
2387 if vdur.vdu_id not in self._vdur_data:
2388 raise ValueError('VNF is not recognized')
2389
2390 def set_or_delete(key, attr):
2391 if attr is None:
2392 if key in self._vdur_data[vdur.vdu_id]:
2393 del self._vdur_data[vdur.vdu_id][key]
2394
2395 else:
2396 self._vdur_data[vdur.vdu_id][key] = attr
2397
2398 set_or_delete('name', vdur._vdud.name)
2399 set_or_delete('mgmt.ip', vdur.vm_management_ip)
2400
2401 def remove(self, vdur_id):
2402 """Remove all of the data associated with specified VDUR
2403
2404 Arguments:
2405 vdur_id - the identifier of a VNFD in the datastore
2406
2407 Raises:
2408 A ValueError is raised if the VDUR is not contained in the
2409 datastore.
2410
2411 """
2412 if vdur_id not in self._vdur_data:
2413 raise ValueError('VNF is not recognized')
2414
2415 del self._vdur_data[vdur_id]
2416
2417 def get(self, expr):
2418 """Retrieve VDUR information from the datastore
2419
2420 An expression should be of the form,
2421
2422 vdu[<id>].<attr>
2423
2424 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2425 the exposed attribute that the user wishes to retrieve.
2426
2427 If the requested data is not available, None is returned.
2428
2429 Arguments:
2430 expr - a string that specifies the data to return
2431
2432 Raises:
2433 A ValueError is raised if the provided expression cannot be parsed.
2434
2435 Returns:
2436 The requested data or None
2437
2438 """
2439 result = self._pattern.match(expr)
2440 if result is None:
2441 raise ValueError('data expression not recognized ({})'.format(expr))
2442
2443 vdur_id, key = result.groups()
2444
2445 if vdur_id not in self._vdur_data:
2446 return None
2447
2448 return self._vdur_data[vdur_id].get(key, None)
2449
2450
2451 class VnfManager(object):
2452 """ The virtual network function manager class """
2453 def __init__(self, dts, log, loop, cluster_name):
2454 self._dts = dts
2455 self._log = log
2456 self._loop = loop
2457 self._cluster_name = cluster_name
2458
2459 self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
2460 self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
2461
2462 self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
2463 self._vnfr_handler,
2464 self._vcs_handler,
2465 VnfdRefCountDtsHandler(dts, log, loop, self)]
2466 self._vnfrs = {}
2467 self._vnfds = {}
2468
2469 @property
2470 def vnfr_handler(self):
2471 """ VNFR dts handler """
2472 return self._vnfr_handler
2473
2474 @property
2475 def vcs_handler(self):
2476 """ VCS dts handler """
2477 return self._vcs_handler
2478
2479 @asyncio.coroutine
2480 def register(self):
2481 """ Register all static DTS handlers """
2482 for hdl in self._dts_handlers:
2483 yield from hdl.register()
2484
2485 @asyncio.coroutine
2486 def run(self):
2487 """ Run this VNFM instance """
2488 self._log.debug("Run VNFManager - registering static DTS handlers""")
2489 yield from self.register()
2490
2491 def get_vnfr(self, vnfr_id):
2492 """ get VNFR by vnfr id """
2493
2494 if vnfr_id not in self._vnfrs:
2495 raise VnfRecordError("VNFR id %s not found", vnfr_id)
2496
2497 return self._vnfrs[vnfr_id]
2498
2499 def create_vnfr(self, vnfr):
2500 """ Create a VNFR instance """
2501 if vnfr.id in self._vnfrs:
2502 msg = "Vnfr id %s already exists" % vnfr.id
2503 self._log.error(msg)
2504 raise VnfRecordError(msg)
2505
2506 self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
2507 vnfr.id,
2508 vnfr.vnfd_ref)
2509
2510 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2511 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr
2512 )
2513 return self._vnfrs[vnfr.id]
2514
2515 @asyncio.coroutine
2516 def delete_vnfr(self, xact, vnfr):
2517 """ Create a VNFR instance """
2518 if vnfr.vnfr_id in self._vnfrs:
2519 self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
2520 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2521 del self._vnfrs[vnfr.vnfr_id]
2522
2523 @asyncio.coroutine
2524 def fetch_vnfd(self, vnfd_id):
2525 """ Fetch VNFDs based with the vnfd id"""
2526 vnfd_path = VirtualNetworkFunctionDescriptor.path_for_id(vnfd_id)
2527 self._log.debug("Fetch vnfd with path %s", vnfd_path)
2528 vnfd = None
2529
2530 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2531
2532 for ent in res_iter:
2533 res = yield from ent
2534 vnfd = res.result
2535
2536 if vnfd is None:
2537 err = "Failed to get Vnfd %s" % vnfd_id
2538 self._log.error(err)
2539 raise VnfRecordError(err)
2540
2541 self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)
2542
2543 return vnfd
2544
2545 @asyncio.coroutine
2546 def get_vnfd_ref(self, vnfd_id):
2547 """ Get Virtual Network Function descriptor for the passed vnfd_id"""
2548 vnfd = yield from self.get_vnfd(vnfd_id)
2549 vnfd.ref()
2550 return vnfd
2551
2552 @asyncio.coroutine
2553 def get_vnfd(self, vnfd_id):
2554 """ Get Virtual Network Function descriptor for the passed vnfd_id"""
2555 vnfd = None
2556 if vnfd_id not in self._vnfds:
2557 self._log.error("Cannot find VNFD id:%s", vnfd_id)
2558 vnfd = yield from self.fetch_vnfd(vnfd_id)
2559
2560 if vnfd is None:
2561 self._log.error("Cannot find VNFD id:%s", vnfd_id)
2562 raise VirtualNetworkFunctionDescriptorError("Cannot find VNFD id:%s", vnfd_id)
2563
2564 if vnfd.id != vnfd_id:
2565 self._log.error("Bad Recovery state {} found for {}".format(vnfd.id, vnfd_id))
2566 raise VirtualNetworkFunctionDescriptorError("Bad Recovery state {} found for {}".format(vnfd.id, vnfd_id))
2567
2568 if vnfd.id not in self._vnfds:
2569 self.create_vnfd(vnfd)
2570
2571 return self._vnfds[vnfd_id]
2572
2573 def vnfd_in_use(self, vnfd_id):
2574 """ Is this VNFD in use """
2575 self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
2576 if vnfd_id in self._vnfds:
2577 return self._vnfds[vnfd_id].in_use()
2578 return False
2579
2580 @asyncio.coroutine
2581 def publish_vnfr(self, xact, path, msg):
2582 """ Publish a VNFR """
2583 self._log.debug("publish_vnfr called with path %s, msg %s",
2584 path, msg)
2585 yield from self.vnfr_handler.update(xact, path, msg)
2586
2587 def create_vnfd(self, vnfd):
2588 """ Create a virtual network function descriptor """
2589 self._log.debug("Create virtual networkfunction descriptor - %s", vnfd)
2590 if vnfd.id in self._vnfds:
2591 self._log.error("Cannot create VNFD %s -VNFD id already exists", vnfd)
2592 raise VirtualNetworkFunctionDescriptorError("VNFD already exists-%s", vnfd.id)
2593
2594 self._vnfds[vnfd.id] = VirtualNetworkFunctionDescriptor(self._dts,
2595 self._log,
2596 self._loop,
2597 self,
2598 vnfd)
2599 return self._vnfds[vnfd.id]
2600
2601 def update_vnfd(self, vnfd):
2602 """ update the Virtual Network Function descriptor """
2603 self._log.debug("Update virtual network function descriptor - %s", vnfd)
2604
2605 if vnfd.id not in self._vnfds:
2606 self._log.debug("No VNFD found - creating VNFD id = %s", vnfd.id)
2607 self.create_vnfd(vnfd)
2608 else:
2609 self._log.debug("Updating VNFD id = %s, vnfd = %s", vnfd.id, vnfd)
2610 self._vnfds[vnfd.id].update(vnfd)
2611
2612 @asyncio.coroutine
2613 def delete_vnfd(self, vnfd_id):
2614 """ Delete the Virtual Network Function descriptor with the passed id """
2615 self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
2616 if vnfd_id not in self._vnfds:
2617 self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
2618 raise VirtualNetworkFunctionDescriptorNotFound("Cannot find %s", vnfd_id)
2619
2620 if self._vnfds[vnfd_id].in_use():
2621 self._log.debug("Cannot delete VNFD id %s reference exists %s",
2622 vnfd_id,
2623 self._vnfds[vnfd_id].ref_count)
2624 raise VirtualNetworkFunctionDescriptorRefCountExists(
2625 "Cannot delete :%s, ref_count:%s",
2626 vnfd_id,
2627 self._vnfds[vnfd_id].ref_count)
2628
2629 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2630 try:
2631 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2632 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2633 if os.path.exists(vnfd_dir):
2634 shutil.rmtree(vnfd_dir, ignore_errors=True)
2635 except Exception as e:
2636 self._log.error("Exception in cleaning up VNFD {}: {}".
2637 format(self._vnfds[vnfd_id].name, e))
2638 self._log.exception(e)
2639
2640 del self._vnfds[vnfd_id]
2641
2642 def vnfd_refcount_xpath(self, vnfd_id):
2643 """ xpath for ref count entry """
2644 return (VnfdRefCountDtsHandler.XPATH +
2645 "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)
2646
2647 @asyncio.coroutine
2648 def get_vnfd_refcount(self, vnfd_id):
2649 """ Get the vnfd_list from this VNFM"""
2650 vnfd_list = []
2651 if vnfd_id is None or vnfd_id == "":
2652 for vnfd in self._vnfds.values():
2653 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2654 vnfd_msg.vnfd_id_ref = vnfd.id
2655 vnfd_msg.instance_ref_count = vnfd.ref_count
2656 vnfd_list.append((self.vnfd_refcount_xpath(vnfd.id), vnfd_msg))
2657 elif vnfd_id in self._vnfds:
2658 vnfd_msg.vnfd_id_ref = self._vnfds[vnfd_id].id
2659 vnfd_msg.instance_ref_count = self._vnfds[vnfd_id].ref_count
2660 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2661
2662 return vnfd_list
2663
2664
2665 class VnfmTasklet(rift.tasklets.Tasklet):
2666 """ VNF Manager tasklet class """
2667 def __init__(self, *args, **kwargs):
2668 super(VnfmTasklet, self).__init__(*args, **kwargs)
2669 self.rwlog.set_category("rw-mano-log")
2670 self.rwlog.set_subcategory("vnfm")
2671
2672 self._dts = None
2673 self._vnfm = None
2674
2675 def start(self):
2676 try:
2677 super(VnfmTasklet, self).start()
2678 self.log.info("Starting VnfmTasklet")
2679
2680 self.log.setLevel(logging.DEBUG)
2681
2682 self.log.debug("Registering with dts")
2683 self._dts = rift.tasklets.DTS(self.tasklet_info,
2684 RwVnfmYang.get_schema(),
2685 self.loop,
2686 self.on_dts_state_change)
2687
2688 self.log.debug("Created DTS Api GI Object: %s", self._dts)
2689 except Exception:
2690 print("Caught Exception in VNFM start:", sys.exc_info()[0])
2691 raise
2692
2693 def on_instance_started(self):
2694 """ Task insance started callback """
2695 self.log.debug("Got instance started callback")
2696
2697 def stop(self):
2698 try:
2699 self._dts.deinit()
2700 except Exception:
2701 print("Caught Exception in VNFM stop:", sys.exc_info()[0])
2702 raise
2703
2704 @asyncio.coroutine
2705 def init(self):
2706 """ Task init callback """
2707 try:
2708 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2709 assert vm_parent_name is not None
2710 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2711 yield from self._vnfm.run()
2712 except Exception:
2713 print("Caught Exception in VNFM init:", sys.exc_info()[0])
2714 raise
2715
2716 @asyncio.coroutine
2717 def run(self):
2718 """ Task run callback """
2719 pass
2720
2721 @asyncio.coroutine
2722 def on_dts_state_change(self, state):
2723 """Take action according to current dts state to transition
2724 application into the corresponding application state
2725
2726 Arguments
2727 state - current dts state
2728 """
2729 switch = {
2730 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2731 rwdts.State.CONFIG: rwdts.State.RUN,
2732 }
2733
2734 handlers = {
2735 rwdts.State.INIT: self.init,
2736 rwdts.State.RUN: self.run,
2737 }
2738
2739 # Transition application to next state
2740 handler = handlers.get(state, None)
2741 if handler is not None:
2742 yield from handler()
2743
2744 # Transition dts to next state
2745 next_state = switch.get(state, None)
2746 if next_state is not None:
2747 self._dts.handle.set_state(next_state)