RIFT-14481 No create-time on VLR
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / rwnsmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
18 import asyncio
19 import ncclient
20 import ncclient.asyncio_manager
21 import os
22 import shutil
23 import sys
24 import tempfile
25 import time
26 import uuid
27 import yaml
28 import requests
29 import json
30
31
32 from collections import deque
33 from collections import defaultdict
34 from enum import Enum
35
36 import gi
37 gi.require_version('RwYang', '1.0')
38 gi.require_version('RwNsdYang', '1.0')
39 gi.require_version('RwDts', '1.0')
40 gi.require_version('RwNsmYang', '1.0')
41 gi.require_version('RwNsrYang', '1.0')
42 gi.require_version('RwTypes', '1.0')
43 gi.require_version('RwVlrYang', '1.0')
44 gi.require_version('RwVnfrYang', '1.0')
45 from gi.repository import (
46 RwYang,
47 RwNsrYang,
48 NsrYang,
49 NsdYang,
50 RwVlrYang,
51 VnfrYang,
52 RwVnfrYang,
53 RwNsmYang,
54 RwsdnYang,
55 RwDts as rwdts,
56 RwTypes,
57 ProtobufC,
58 )
59
60 import rift.tasklets
61 import rift.mano.ncclient
62 import rift.mano.config_data.config
63 import rift.mano.dts as mano_dts
64
65 from . import rwnsm_conman as conman
66 from . import cloud
67 from . import publisher
68 from . import xpath
69 from . import config_value_pool
70 from . import rwvnffgmgr
71 from . import scale_group
72
73
class NetworkServiceRecordState(Enum):
    """ States a Network Service Record moves through """
    # Instantiation
    INIT = 101
    VL_INIT_PHASE = 102
    VNF_INIT_PHASE = 103
    VNFFG_INIT_PHASE = 104
    # Value 105 is not assigned
    RUNNING = 106
    # Scaling
    SCALING_OUT = 107
    SCALING_IN = 108
    # Termination
    TERMINATE = 109
    TERMINATE_RCVD = 110
    VL_TERMINATE_PHASE = 111
    VNF_TERMINATE_PHASE = 112
    VNFFG_TERMINATE_PHASE = 113
    TERMINATED = 114
    FAILED = 115
    # Individual virtual link operations
    VL_INSTANTIATE = 116
    VL_TERMINATE = 117
92
93
class NetworkServiceRecordError(Exception):
    """ Network Service Record Error """


class NetworkServiceDescriptorError(Exception):
    """ Network Service Descriptor Error """


class VirtualNetworkFunctionRecordError(Exception):
    """ Virtual Network Function Record Error """


class NetworkServiceDescriptorNotFound(Exception):
    """ Cannot find Network Service Descriptor """


class NetworkServiceDescriptorRefCountExists(Exception):
    """ Network Service Descriptor reference count exists """


class NetworkServiceDescriptorUnrefError(Exception):
    """ Failed to unref a network service descriptor """


class NsrInstantiationFailed(Exception):
    """ Failed to instantiate network service """


class VnfInstantiationFailed(Exception):
    """ Failed to instantiate virtual network function """


class VnffgInstantiationFailed(Exception):
    """ Failed to instantiate a VNFFG """


class VnfDescriptorError(Exception):
    """ Virtual Network Function Descriptor Error """


class ScalingOperationError(Exception):
    """ Scaling operation error """


class ScaleGroupMissingError(Exception):
    """ Scale group is missing """


class PlacementGroupError(Exception):
    """ Placement group error """


class NsrNsdUpdateError(Exception):
    """ Error while applying an NSD update to an NSR """


class NsrVlUpdateError(NsrNsdUpdateError):
    """ Virtual link specific NSR update error (a kind of NsrNsdUpdateError) """
162
163
class VlRecordState(Enum):
    """ States of a virtual link record """
    # Creation
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    # Removal
    TERMINATE_PENDING = 104
    TERMINATED = 105
    # Error
    FAILED = 106
172
173
class VnffgRecordState(Enum):
    """ States of a VNFFG record """
    # Creation
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    # Removal
    TERMINATE_PENDING = 104
    TERMINATED = 105
    # Error
    FAILED = 106
182
183
class VnffgRecord(object):
    """ Vnffg Records class

    Holds the state of one VNFFG record (VNFFGR) belonging to an NSR,
    builds the VNFFGR request out of the VNFFG descriptor and creates /
    terminates the record through the VNFFG manager.
    """
    SFF_DP_PORT = 4790
    SFF_MGMT_PORT = 5000
    # Seconds to sleep between two polls of a VNFR that is not running yet
    VNFR_POLL_INTERVAL = 2

    def __init__(self, dts, log, loop, vnffgmgr, nsr, nsr_name, vnffgd_msg, sdn_account_name):
        """ Create the record in INIT state with a newly generated id

        A missing (None) SDN account name is stored as an empty string.
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnffgmgr = vnffgmgr
        self._nsr = nsr
        self._nsr_name = nsr_name
        self._vnffgd_msg = vnffgd_msg
        self._sdn_account_name = '' if sdn_account_name is None else sdn_account_name

        self._vnffgr_id = str(uuid.uuid4())
        self._vnffgr_rsp_id = list()
        self._vnffgr_state = VnffgRecordState.INIT

    @property
    def id(self):
        """ VNFFGR id """
        return self._vnffgr_id

    @property
    def state(self):
        """ state of this VNFFGR """
        return self._vnffgr_state

    def _base_vnffgr_dict(self):
        """ Fields shared by every VNFFGR message built by this record """
        return {"id": self._vnffgr_id,
                "vnffgd_id_ref": self._vnffgd_msg.id,
                "vnffgd_name_ref": self._vnffgd_msg.name,
                "sdn_account": self._sdn_account_name,
                }

    def _vnffgr_status_msg(self, operational_status):
        """ Build a VNFFGR message carrying only the ids and the given status """
        vnffgr_dict = self._base_vnffgr_dict()
        vnffgr_dict["operational_status"] = operational_status
        return NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)

    def fetch_vnffgr(self):
        """
        Get VNFFGR message to be published

        In INIT and TERMINATED state a locally built message is returned.
        Otherwise the record is fetched from the VNFFG manager; if that
        fails the record is marked FAILED and a 'failed' message is returned.
        """
        if self._vnffgr_state == VnffgRecordState.INIT:
            return self._vnffgr_status_msg('init')

        if self._vnffgr_state == VnffgRecordState.TERMINATED:
            return self._vnffgr_status_msg('terminated')

        try:
            return self._vnffgmgr.fetch_vnffgr(self._vnffgr_id)
        except Exception:
            self._log.exception("Fetching VNFFGR for VNFFG with id %s failed", self._vnffgr_id)
            self._vnffgr_state = VnffgRecordState.FAILED
            return self._vnffgr_status_msg('failed')

    @asyncio.coroutine
    def _fetch_running_vnfr(self, nsr_vnfr):
        """ Fetch the VNFR, polling until its operational status is 'running'

        Returns:
            The VNFR as returned by the NSR's fetch_vnfr()

        Raises:
            NsrInstantiationFailed: the VNFR reported status 'failed'
        """
        vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
        self._log.debug(" Received VNFR is %s", vnfr)
        while vnfr.operational_status != 'running':
            self._log.info("Received vnf op status is %s; retrying", vnfr.operational_status)
            if vnfr.operational_status == 'failed':
                self._log.error("Fetching VNFR for %s failed", vnfr.id)
                # Report the id of the NSR, not the id of this VNFFGR
                raise NsrInstantiationFailed(
                    "Failed NS %s instantiation due to VNFR %s failure" % (self._nsr.id, vnfr.id))
            yield from asyncio.sleep(self.VNFR_POLL_INTERVAL, loop=self._loop)
            vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
            self._log.debug("Received VNFR is %s", vnfr)
        return vnfr

    @asyncio.coroutine
    def vnffgr_create_msg(self):
        """ VNFFGR message used for creating the VNFFGR in the VNFFG manager

        Adds one RSP entry per RSP of the descriptor (with one hop per
        connection point whose VNFD has a service function type) and one
        classifier entry per descriptor classifier whose RSP was created.
        Waits for every referenced VNFR to be running.

        Raises:
            NsrInstantiationFailed: a referenced VNFR reported status 'failed'
        """
        vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(self._base_vnffgr_dict())

        for rsp in self._vnffgd_msg.rsp:
            vnffgr_rsp = vnffgr.rsp.add()
            vnffgr_rsp.id = str(uuid.uuid4())
            vnffgr_rsp.name = self._nsr.name + '.' + rsp.name
            self._vnffgr_rsp_id.append(vnffgr_rsp.id)
            vnffgr_rsp.vnffgd_rsp_id_ref = rsp.id
            vnffgr_rsp.vnffgd_rsp_name_ref = rsp.name
            for rsp_cp_ref in rsp.vnfd_connection_point_ref:
                vnfd = [vnfr.vnfd for vnfr in self._nsr.vnfrs.values() if vnfr.vnfd.id == rsp_cp_ref.vnfd_id_ref]
                self._log.debug("VNFD message during VNFFG instantiation is %s", vnfd)
                if not vnfd or not vnfd[0].has_field('service_function_type'):
                    self._log.error("Service Function Type not available for VNFD ID %s; Skipping in chain", rsp_cp_ref.vnfd_id_ref)
                    continue
                self._log.debug("Service Function Type for VNFD ID %s is %s", rsp_cp_ref.vnfd_id_ref, vnfd[0].service_function_type)

                vnfr_cp_ref = vnffgr_rsp.vnfr_connection_point_ref.add()
                vnfr_cp_ref.member_vnf_index_ref = rsp_cp_ref.member_vnf_index_ref
                vnfr_cp_ref.hop_number = rsp_cp_ref.order
                vnfr_cp_ref.vnfd_id_ref = rsp_cp_ref.vnfd_id_ref
                vnfr_cp_ref.service_function_type = vnfd[0].service_function_type
                for nsr_vnfr in self._nsr.vnfrs.values():
                    if (nsr_vnfr.vnfd.id != vnfr_cp_ref.vnfd_id_ref or
                            nsr_vnfr.member_vnf_index != vnfr_cp_ref.member_vnf_index_ref):
                        continue

                    vnfr_cp_ref.vnfr_id_ref = nsr_vnfr.id
                    vnfr_cp_ref.vnfr_name_ref = nsr_vnfr.name
                    vnfr_cp_ref.vnfr_connection_point_ref = rsp_cp_ref.vnfd_connection_point_ref

                    vnfr = yield from self._fetch_running_vnfr(nsr_vnfr)

                    vnfr_cp_ref.connection_point_params.mgmt_address = vnfr.mgmt_interface.ip_address
                    for cp in vnfr.connection_point:
                        if cp.name != vnfr_cp_ref.vnfr_connection_point_ref:
                            continue

                        vnfr_cp_ref.connection_point_params.port_id = cp.connection_point_id
                        vnfr_cp_ref.connection_point_params.name = self._nsr.name + '.' + cp.name
                        for vdu in vnfr.vdur:
                            for ext_intf in vdu.external_interface:
                                if ext_intf.name == vnfr_cp_ref.vnfr_connection_point_ref:
                                    vnfr_cp_ref.connection_point_params.vm_id = vdu.vim_id
                                    self._log.debug("VIM ID for CP %s in VNFR %s is %s", cp.name, nsr_vnfr.id,
                                                    vnfr_cp_ref.connection_point_params.vm_id)
                                    break

                        vnfr_cp_ref.connection_point_params.address = cp.ip_address
                        vnfr_cp_ref.connection_point_params.port = VnffgRecord.SFF_DP_PORT

        for vnffgd_classifier in self._vnffgd_msg.classifier:
            _rsp = [rsp for rsp in vnffgr.rsp if rsp.vnffgd_rsp_id_ref == vnffgd_classifier.rsp_id_ref]
            if not _rsp:
                self._log.error("RSP with ID %s not found during classifier creation for classifier id %s", vnffgd_classifier.rsp_id_ref, vnffgd_classifier.id)
                continue

            vnffgr_classifier = vnffgr.classifier.add()
            vnffgr_classifier.id = vnffgd_classifier.id
            vnffgr_classifier.name = self._nsr.name + '.' + vnffgd_classifier.name
            _rsp[0].classifier_name = vnffgr_classifier.name
            vnffgr_classifier.rsp_id_ref = _rsp[0].id
            vnffgr_classifier.rsp_name = _rsp[0].name
            for nsr_vnfr in self._nsr.vnfrs.values():
                if (nsr_vnfr.vnfd.id != vnffgd_classifier.vnfd_id_ref or
                        nsr_vnfr.member_vnf_index != vnffgd_classifier.member_vnf_index_ref):
                    continue

                vnffgr_classifier.vnfr_id_ref = nsr_vnfr.id
                vnffgr_classifier.vnfr_name_ref = nsr_vnfr.name
                vnffgr_classifier.vnfr_connection_point_ref = vnffgd_classifier.vnfd_connection_point_ref

                if nsr_vnfr.vnfd.service_function_chain == 'CLASSIFIER':
                    vnffgr_classifier.sff_name = nsr_vnfr.name

                vnfr = yield from self._fetch_running_vnfr(nsr_vnfr)

                for cp in vnfr.connection_point:
                    if cp.name != vnffgr_classifier.vnfr_connection_point_ref:
                        continue

                    vnffgr_classifier.port_id = cp.connection_point_id
                    vnffgr_classifier.ip_address = cp.ip_address
                    for vdu in vnfr.vdur:
                        for ext_intf in vdu.external_interface:
                            if ext_intf.name == vnffgr_classifier.vnfr_connection_point_ref:
                                vnffgr_classifier.vm_id = vdu.vim_id
                                # Log the classifier's own vm_id; the RSP-loop
                                # variable may be unset or stale at this point.
                                self._log.debug("VIM ID for CP %s in VNFR %s is %s", cp.name, nsr_vnfr.id,
                                                vnffgr_classifier.vm_id)
                                break

        self._log.info("VNFFGR msg to be sent is %s", vnffgr)
        return vnffgr

    @asyncio.coroutine
    def vnffgr_nsr_sff_list(self):
        """ SFF List for VNFR

        Returns:
            dict mapping VNFD id to an RwsdnYang.VNFFGSff built for every
            VNFR of the NSR whose service function chain is 'CLASSIFIER'
            or 'SFF'. SFF entries also list all VNFRs of chain type 'SF'.

        Raises:
            NsrInstantiationFailed: a referenced VNFR reported status 'failed'
        """
        sff_list = {}
        sf_list = [nsr_vnfr.name for nsr_vnfr in self._nsr.vnfrs.values() if nsr_vnfr.vnfd.service_function_chain == 'SF']

        for nsr_vnfr in self._nsr.vnfrs.values():
            if nsr_vnfr.vnfd.service_function_chain not in ('CLASSIFIER', 'SFF'):
                continue

            vnfr = yield from self._fetch_running_vnfr(nsr_vnfr)

            sff = RwsdnYang.VNFFGSff()
            sff_list[nsr_vnfr.vnfd.id] = sff
            sff.name = nsr_vnfr.name
            sff.function_type = nsr_vnfr.vnfd.service_function_chain

            sff.mgmt_address = vnfr.mgmt_interface.ip_address
            sff.mgmt_port = VnffgRecord.SFF_MGMT_PORT
            for cp in vnfr.connection_point:
                sff_dp = sff.dp_endpoints.add()
                sff_dp.name = self._nsr.name + '.' + cp.name
                sff_dp.address = cp.ip_address
                sff_dp.port = VnffgRecord.SFF_DP_PORT
            if nsr_vnfr.vnfd.service_function_chain == 'SFF':
                for sf_name in sf_list:
                    _sf = sff.vnfr_list.add()
                    _sf.vnfr_name = sf_name

        return sff_list

    @asyncio.coroutine
    def instantiate(self):
        """ Instantiate this VNFFG

        Builds the request and SFF list, asks the VNFFG manager to create
        the VNFFGR and then triggers an NSR state update.

        Raises:
            NsrInstantiationFailed: the VNFFG manager failed to create the
                record, or a referenced VNFR reported status 'failed'
        """
        self._log.info("Instaniating VNFFGR with vnffgd %s",
                       self._vnffgd_msg)

        vnffgr_request = yield from self.vnffgr_create_msg()
        vnffg_sff_list = yield from self.vnffgr_nsr_sff_list()

        try:
            vnffgr = self._vnffgmgr.create_vnffgr(vnffgr_request, self._vnffgd_msg.classifier, vnffg_sff_list)
        except Exception as e:
            self._log.exception("VNFFG instantiation failed: %s", str(e))
            self._vnffgr_state = VnffgRecordState.FAILED
            # Report the id of the NSR and keep the original cause attached
            raise NsrInstantiationFailed(
                "Failed NS %s instantiation due to VNFFGR %s failure" % (self._nsr.id, vnffgr_request.id)) from e

        self._vnffgr_state = VnffgRecordState.INSTANTIATION_PENDING

        self._log.info("Instantiated VNFFGR :%s", vnffgr)
        self._vnffgr_state = VnffgRecordState.ACTIVE

        self._log.info("Invoking update_state to update NSR state for NSR ID: %s", self._nsr.id)
        yield from self._nsr.update_state()

    def vnffgr_in_vnffgrm(self):
        """ Is there a VNFFGR record in the VNFFG manager

        True while the record is ACTIVE, INSTANTIATION_PENDING or FAILED.
        """
        return self._vnffgr_state in (VnffgRecordState.ACTIVE,
                                      VnffgRecordState.INSTANTIATION_PENDING,
                                      VnffgRecordState.FAILED)

    @asyncio.coroutine
    def terminate(self):
        """ Terminate this VNFFGR

        The request is ignored (and logged) when the record is in a state
        for which vnffgr_in_vnffgrm() is False.
        """
        if not self.vnffgr_in_vnffgrm():
            self._log.error("Ignoring terminate request for id %s in state %s",
                            self.id, self._vnffgr_state)
            return

        self._log.info("Terminating VNFFGR id:%s", self.id)
        self._vnffgr_state = VnffgRecordState.TERMINATE_PENDING

        self._vnffgmgr.terminate_vnffgr(self._vnffgr_id)

        self._vnffgr_state = VnffgRecordState.TERMINATED
        self._log.debug("Terminated VNFFGR id:%s", self.id)
455
456
class VirtualLinkRecord(object):
    """ Virtual Link Records class

    Represents one VLR of an NSR: builds the VLR message from the VLD,
    creates/deletes it through DTS and tracks its state.
    """
    # Catalog path of all VLRs; used by xpath, vlr_xpath() and create_record()
    XPATH = "D,/vlr:vlr-catalog/vlr:vlr"

    @staticmethod
    @asyncio.coroutine
    def create_record(dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id, restart_mode=False):
        """Creates a new VLR object based on the given data.

        If restart mode is enabled, then we look for existing records in the
        DTS and create a VLR record using the existing data (ID)

        Returns:
            VirtualLinkRecord
        """
        vlr_obj = VirtualLinkRecord(
            dts,
            log,
            loop,
            nsr_name,
            vld_msg,
            cloud_account_name,
            om_datacenter,
            ip_profile,
            nsr_id,
        )

        if restart_mode:
            res_iter = yield from dts.query_read(
                VirtualLinkRecord.XPATH,
                rwdts.XactFlag.MERGE)

            for fut in res_iter:
                response = yield from fut
                vlr = response.result

                # Check if the record is already present, if so use the ID of
                # the existing record. Since the name of the record is uniquely
                # formed we can use it as a search key!
                if vlr.name == vlr_obj.name:
                    vlr_obj.reset_id(vlr.id)
                    break

        return vlr_obj

    def __init__(self, dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id):
        """ Create the record in INIT state with a newly generated id

        The creation time is taken from the local clock (whole seconds).
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsr_name = nsr_name
        self._vld_msg = vld_msg
        self._cloud_account_name = cloud_account_name
        self._om_datacenter_name = om_datacenter
        self._assigned_subnet = None
        self._nsr_id = nsr_id
        self._ip_profile = ip_profile
        self._vlr_id = str(uuid.uuid4())
        self._state = VlRecordState.INIT
        self._prev_state = None
        self._create_time = int(time.time())

    @property
    def xpath(self):
        """ path for this object """
        return (VirtualLinkRecord.XPATH + "[vlr:id = '{}']").format(self._vlr_id)

    @property
    def id(self):
        """ VLR id """
        return self._vlr_id

    @property
    def nsr_name(self):
        """ Get NSR name for this VL """
        # Must read the private attribute; reading self.nsr_name recurses forever
        return self._nsr_name

    @property
    def vld_msg(self):
        """ Virtual Link Descriptor """
        return self._vld_msg

    @property
    def assigned_subnet(self):
        """ Subnet assigned to this VL (None until instantiate() succeeded) """
        return self._assigned_subnet

    @property
    def name(self):
        """
        Get the name for this VLR.
        The VIM network name is used when the VLD has one; otherwise the
        VLR name is "<nsr name>.<VLD name>".
        """
        if self.vld_msg.vim_network_name:
            return self.vld_msg.vim_network_name
        elif self.vld_msg.name == "multisite":
            # This is a temporary hack to identify manually provisioned inter-site network
            return self.vld_msg.name
        else:
            return self._nsr_name + "." + self.vld_msg.name

    @property
    def cloud_account_name(self):
        """ Cloud account that this VLR should be created in """
        return self._cloud_account_name

    @property
    def om_datacenter_name(self):
        """ Datacenter that this VLR should be created in """
        return self._om_datacenter_name

    @staticmethod
    def vlr_xpath(vlr):
        """ Get the VLR path from VLR """
        return (VirtualLinkRecord.XPATH + "[vlr:id = '{}']").format(vlr.id)

    @property
    def state(self):
        """ VLR state """
        return self._state

    @state.setter
    def state(self, value):
        """ VLR set state """
        self._state = value

    @property
    def prev_state(self):
        """ VLR previous state """
        return self._prev_state

    @prev_state.setter
    def prev_state(self, value):
        """ VLR set previous state """
        self._prev_state = value

    @property
    def vlr_msg(self):
        """ Virtual Link Record message for Creating VLR in VNS

        A new message is built on every access. Fields copied from the VLD
        are applied last, so they win over the locally computed ones.
        """
        vld_fields = ["short_name",
                      "vendor",
                      "description",
                      "version",
                      "type_yang",
                      "vim_network_name",
                      "provider_network"]

        vld_copy_dict = {k: v for k, v in self.vld_msg.as_dict().items()
                         if k in vld_fields}

        vlr_dict = {"id": self._vlr_id,
                    "nsr_id_ref": self._nsr_id,
                    "vld_ref": self.vld_msg.id,
                    "name": self.name,
                    "create_time": self._create_time,
                    "cloud_account": self.cloud_account_name,
                    "om_datacenter": self.om_datacenter_name,
                    }

        if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
            vlr_dict['ip_profile_params'] = self._ip_profile.ip_profile_params.as_dict()

        vlr_dict.update(vld_copy_dict)
        vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
        return vlr

    def reset_id(self, vlr_id):
        """ Replace the generated VLR id (used when an existing record is adopted) """
        self._vlr_id = vlr_id

    def create_nsr_vlr_msg(self, vnfrs):
        """ The VLR message

        Lists, as connection point references, the given VNFRs that match a
        connection point of the VLD (VNFD id and member index) and live in
        the same cloud account and datacenter as this VLR.
        """
        nsr_vlr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vlr()
        nsr_vlr.vlr_ref = self._vlr_id
        nsr_vlr.assigned_subnet = self.assigned_subnet
        nsr_vlr.cloud_account = self.cloud_account_name
        nsr_vlr.om_datacenter = self.om_datacenter_name

        for conn in self.vld_msg.vnfd_connection_point_ref:
            for vnfr in vnfrs:
                if (vnfr.vnfd.id == conn.vnfd_id_ref and
                        vnfr.member_vnf_index == conn.member_vnf_index_ref and
                        self.cloud_account_name == vnfr.cloud_account_name and
                        self.om_datacenter_name == vnfr.om_datacenter_name):
                    cp_entry = nsr_vlr.vnfr_connection_point_ref.add()
                    cp_entry.vnfr_id = vnfr.id
                    cp_entry.connection_point = conn.vnfd_connection_point_ref

        return nsr_vlr

    @asyncio.coroutine
    def instantiate(self):
        """ Instantiate this VL

        Raises:
            NsrInstantiationFailed: the create query returned no result or
                the created VLR reports operational status 'failed'
        """
        self._log.debug("Instaniating VLR key %s, vld %s",
                        self.xpath, self._vld_msg)
        vlr = None
        self._state = VlRecordState.INSTANTIATION_PENDING

        # vlr_msg builds a new message on every access; build it once
        vlr_msg = self.vlr_msg
        self._log.debug("Executing VL create path:%s msg:%s",
                        self.xpath, vlr_msg)

        with self._dts.transaction(flags=0) as xact:
            block = xact.block_create()
            block.add_query_create(self.xpath, vlr_msg)
            res_iter = yield from block.execute(now=True)
            for ent in res_iter:
                res = yield from ent
                vlr = res.result

            if vlr is None:
                self._state = VlRecordState.FAILED
                # Report the id of the NSR, not the id of this VLR
                raise NsrInstantiationFailed("Failed NS %s instantiation due to empty response" % self._nsr_id)

        if vlr.operational_status == 'failed':
            self._log.debug("NS Id:%s VL creation failed for vlr id %s", self._nsr_id, vlr.id)
            self._state = VlRecordState.FAILED
            raise NsrInstantiationFailed("Failed VL %s instantiation (%s)" % (vlr.id, vlr.operational_status_details))

        self._log.info("Instantiated VL with xpath %s and vlr:%s",
                       self.xpath, vlr)
        self._state = VlRecordState.ACTIVE
        self._assigned_subnet = vlr.assigned_subnet

    def vlr_in_vns(self):
        """ Is there a VLR record in VNS

        True while the record is ACTIVE, INSTANTIATION_PENDING,
        TERMINATE_PENDING or FAILED.
        """
        return self._state in (VlRecordState.ACTIVE,
                               VlRecordState.INSTANTIATION_PENDING,
                               VlRecordState.TERMINATE_PENDING,
                               VlRecordState.FAILED)

    @asyncio.coroutine
    def terminate(self):
        """ Terminate this VL

        The request is ignored (and logged) when vlr_in_vns() is False.
        """
        if not self.vlr_in_vns():
            self._log.debug("Ignoring terminate request for id %s in state %s",
                            self.id, self._state)
            return

        self._log.debug("Terminating VL id:%s", self.id)
        self._state = VlRecordState.TERMINATE_PENDING

        with self._dts.transaction(flags=0) as xact:
            block = xact.block_create()
            block.add_query_delete(self.xpath)
            yield from block.execute(flags=0, now=True)

        self._state = VlRecordState.TERMINATED
        self._log.debug("Terminated VL id:%s", self.id)
706
707
class VnfRecordState(Enum):
    """ States of a VNF record """
    # Creation
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    # Removal
    TERMINATE_PENDING = 104
    TERMINATED = 105
    # Error
    FAILED = 106
716
717
718 class VirtualNetworkFunctionRecord(object):
719 """ Virtual Network Function Record class"""
720 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
721
@staticmethod
@asyncio.coroutine
def create_record(dts, log, loop, vnfd, const_vnfd_msg, nsd_id, nsr_name,
                  cloud_account_name, om_datacenter_name, nsr_id, group_name, group_instance_id,
                  placement_groups, restart_mode=False):
    """Build a VNFR object from the given data.

    In restart mode the VNFR catalog is read from DTS and, when a record
    with the same name already exists there, its id is taken over by the
    new object.

    Returns:
        VirtualNetworkFunctionRecord
    """
    record = VirtualNetworkFunctionRecord(
        dts, log, loop,
        vnfd, const_vnfd_msg,
        nsd_id, nsr_name,
        cloud_account_name, om_datacenter_name,
        nsr_id,
        group_name, group_instance_id,
        placement_groups,
        restart_mode=restart_mode)

    if not restart_mode:
        return record

    catalog = yield from dts.query_read(
        "D,/vnfr:vnfr-catalog/vnfr:vnfr",
        rwdts.XactFlag.MERGE)

    for pending in catalog:
        reply = yield from pending
        existing = reply.result

        # Records are matched by name; the first match wins
        if existing.name == record.name:
            record.reset_id(existing.id)
            break

    return record
765
def __init__(self,
             dts,
             log,
             loop,
             vnfd,
             const_vnfd_msg,
             nsd_id,
             nsr_name,
             cloud_account_name,
             om_datacenter_name,
             nsr_id,
             group_name=None,
             group_instance_id=None,
             placement_groups=None,
             restart_mode=False):
    """ Create the VNFR object in INIT state with a newly generated id

    Merges the VNFD configuration into the config store and builds the
    initial VNFR message.

    Arguments:
        placement_groups - iterable of placement group messages copied into
                           the VNFR message; None means no placement groups

    Raises:
        ValueError: a group instance id was given without a group name
    """
    # Validate the arguments first so that a bad call has no side effects
    if group_name is None and group_instance_id is not None:
        raise ValueError("Group instance id must not be provided with an empty group name")

    self._dts = dts
    self._log = log
    self._loop = loop
    self._vnfd = vnfd
    self._const_vnfd_msg = const_vnfd_msg
    self._nsd_id = nsd_id
    self._nsr_name = nsr_name
    self._nsr_id = nsr_id
    self._cloud_account_name = cloud_account_name
    self._om_datacenter_name = om_datacenter_name
    self._group_name = group_name
    self._group_instance_id = group_instance_id
    # A list default in the signature would be shared by every instance
    self._placement_groups = [] if placement_groups is None else placement_groups
    self._config_status = NsrYang.ConfigStates.INIT
    self.restart_mode = restart_mode

    self._prev_state = VnfRecordState.INIT
    self._state = VnfRecordState.INIT
    self._state_failed_reason = None

    self.config_store = rift.mano.config_data.config.ConfigStore(self._log)
    self.configure()

    # id and name cache must be set before the message is built from them
    self._vnfr_id = str(uuid.uuid4())
    self._name = None
    self._vnfr_msg = self.create_vnfr_msg()
    self._log.debug("Set VNFR {} config type to {}".
                    format(self.name, self.config_type))
813
@property
def id(self):
    """ Identifier of this VNFR """
    return self._vnfr_id

@property
def xpath(self):
    """ Keyed catalog path of this VNFR """
    return "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id = '{}']".format(self.id)

@property
def vnfr_msg(self):
    """ The stored VNFR message """
    return self._vnfr_msg

@property
def const_vnfr_msg(self):
    """ Constituent VNFR reference message (id, cloud account, datacenter) """
    return RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConstituentVnfrRef(
        vnfr_id=self.id,
        cloud_account=self.cloud_account_name,
        om_datacenter=self._om_datacenter_name)

@property
def vnfd(self):
    """ The VNFD this record was created from """
    return self._vnfd

@property
def cloud_account_name(self):
    """ Name of the cloud account this VNF should be created in """
    return self._cloud_account_name

@property
def om_datacenter_name(self):
    """ Name of the datacenter this VNF should be created in """
    return self._om_datacenter_name

@property
def active(self):
    """ True when this VNF is in ACTIVE state """
    return self._state == VnfRecordState.ACTIVE

@property
def state(self):
    """ Current state of this VNF """
    return self._state

@property
def state_failed_reason(self):
    """ Error message recorded for a failed VNF """
    return self._state_failed_reason

@property
def member_vnf_index(self):
    """ Member VNF index taken from the constituent VNFD message """
    return self._const_vnfd_msg.member_vnf_index

@property
def nsr_name(self):
    """ Name of the owning NSR """
    return self._nsr_name
874
@property
def name(self):
    """ Name of this VNFR

    Computed once and cached: NSR name, optional group name, optional
    group instance id, VNFD name and member VNF index joined by "__".
    """
    if self._name is None:
        tags = [self._nsr_name]

        if self._group_name is not None:
            tags.append(self._group_name)

        if self._group_instance_id is not None:
            tags.append(str(self._group_instance_id))

        tags += [self.vnfd.name, str(self.member_vnf_index)]
        self._name = "__".join(tags)

    return self._name
894
@staticmethod
def vnfr_xpath(vnfr):
    """ Keyed catalog path for the given VNFR (catalog XPATH plus id key) """
    keyed_path = VirtualNetworkFunctionRecord.XPATH + "[vnfr:id = '{}']"
    return keyed_path.format(vnfr.id)
899
@property
def config_type(self):
    """ First of 'netconf', 'juju', 'script' set in the VNF configuration, else 'none' """
    candidates = ('netconf', 'juju', 'script')
    return next(
        (kind for kind in candidates
         if self._vnfd.vnf_configuration.has_field(kind)),
        'none')
907
@property
def config_status(self):
    """ Config status as YANG enum string

    'config_not_needed' when there is no config type; otherwise
    'configured' or 'failed' for those states and 'configuring' for
    every other state.
    """
    self._log.debug("Map VNFR {} config status {} ({})".
                    format(self.name, self._config_status, self.config_type))

    if self.config_type == 'none':
        return 'config_not_needed'

    if self._config_status == NsrYang.ConfigStates.CONFIGURED:
        return 'configured'

    if self._config_status == NsrYang.ConfigStates.FAILED:
        return 'failed'

    return 'configuring'
921
def set_state(self, state):
    """ Move to the given state, remembering the current one as previous """
    self._prev_state, self._state = self._state, state
926
def reset_id(self, vnfr_id):
    """ Take over the given VNFR id and rebuild the VNFR message with it """
    self._vnfr_id = vnfr_id
    # The stored message embeds the id, so it has to be regenerated
    rebuilt_msg = self.create_vnfr_msg()
    self._vnfr_msg = rebuilt_msg
930
def configure(self):
    """ Merge this VNFD's configuration into the config store """
    store = self.config_store
    store.merge_vnfd_config(self._nsd_id, self._vnfd, self.member_vnf_index)
937
def create_vnfr_msg(self):
    """ Build a new VNFR message for this VNFR

    Combines the record's own ids/names/status with a fixed set of fields
    copied from the VNFD, then adds member index, VNF configuration,
    management port (when the VNFD has one) and placement groups.
    """
    copied_fields = ("short_name", "vendor", "description", "version", "type_yang")
    descriptor = self._vnfd.as_dict()

    fields = {
        "id": self.id,
        "nsr_id_ref": self._nsr_id,
        "vnfd_ref": self.vnfd.id,
        "name": self.name,
        "cloud_account": self._cloud_account_name,
        "om_datacenter": self._om_datacenter_name,
        "config_status": self.config_status
    }
    # Descriptor fields are applied last, in descriptor order
    for key, value in descriptor.items():
        if key in copied_fields:
            fields[key] = value

    msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(fields)
    msg.member_vnf_index_ref = self.member_vnf_index
    msg.vnf_configuration.from_dict(self._vnfd.vnf_configuration.as_dict())

    mgmt = self._vnfd.mgmt_interface
    if mgmt.has_field("port"):
        msg.mgmt_interface.port = self._vnfd.mgmt_interface.port

    for placement in self._placement_groups:
        entry = msg.placement_groups_info.add()
        entry.from_dict(placement.as_dict())

    # UI expects the monitoring param field to exist
    msg.monitoring_param = []

    self._log.debug("Get vnfr_msg for VNFR {} : {}".format(self.name, msg))
    return msg
975
@asyncio.coroutine
def update_vnfm(self):
    """ Send the stored VNFR message as a DTS update on this VNFR's xpath """
    msg = self.vnfr_msg
    self._log.debug("Send an update to VNFM for VNFR {} with {}".
                    format(self.name, msg))
    yield from self._dts.query_update(self.xpath, rwdts.XactFlag.TRACE, self.vnfr_msg)
985
def get_config_status(self):
    """ Config status as stored on the record (YANG enum value, not a string) """
    current = self._config_status
    return current
989
@asyncio.coroutine
def set_config_status(self, status):
    """ Record a new config status and publish it to the VNFM

    Nothing happens (apart from an error log) when the VNFR is already
    configured. Otherwise the status is stored, mirrored as a string into
    the VNFR message, and the VNFR is published unless the stored status
    is INIT. Failures while storing or publishing are logged, not raised.
    """

    def yang_name(value):
        # Built on every call so that a lookup problem is caught by the
        # caller's try block
        names = {
            NsrYang.ConfigStates.INIT : 'init',
            NsrYang.ConfigStates.CONFIGURING : 'configuring',
            NsrYang.ConfigStates.CONFIG_NOT_NEEDED : 'config_not_needed',
            NsrYang.ConfigStates.CONFIGURED : 'configured',
            NsrYang.ConfigStates.FAILED : 'failed',
        }
        return names[value]

    self._log.debug("Update VNFR {} from {} ({}) to {}".
                    format(self.name, self._config_status,
                           self.config_type, status))

    already_configured = self._config_status == NsrYang.ConfigStates.CONFIGURED
    if already_configured:
        self._log.error("Updating already configured VNFR {}".
                        format(self.name))
        return

    if self._config_status != status:
        try:
            self._config_status = status
            # Keep the string form in the VNFR message in step with the
            # stored enum value
            self.vnfr_msg.config_status = yang_name(status)
        except Exception as err:
            self._log.error("Exception=%s", str(err))

    self._log.debug("Updated VNFR {} status to {}".format(self.name, status))

    if self._config_status == NsrYang.ConfigStates.INIT:
        return

    try:
        # Publish only after VNFM has the VNFR created
        yield from self.update_vnfm()
    except Exception as err:
        self._log.error("Exception updating VNFM with new status {} of VNFR {}: {}".
                        format(status, self.name, err))
        self._log.exception(err)
1033
def is_configured(self):
    """ True when this VNFR needs no configuration or is already CONFIGURED """
    # A config type of 'none' counts as configured without looking at the status
    return (self.config_type == 'none' or
            self._config_status == NsrYang.ConfigStates.CONFIGURED)
1042
@asyncio.coroutine
def instantiate(self, nsr):
    """ Publish this VNFR to DTS so that it gets instantiated

    Every connection point of the VNFD that has a matching VLR in the NSR
    is added to the VNFR message with a reference to that VLR; connection
    points without a VLR are skipped.  The record is then created in DTS,
    or updated when running in restart mode.

    Arguments:
        nsr - the network service record whose VLRs are searched
    """
    self._log.debug("Instaniating VNFR key %s, vnfd %s",
                    self.xpath, self._vnfd)

    self._log.debug("Create VNF with xpath %s and vnfr %s",
                    self.xpath, self.vnfr_msg)

    self.set_state(VnfRecordState.INSTANTIATION_PENDING)

    def find_vlr_for_cp(conn):
        """ Return the VLR of the NSR that serves connection point conn, or None """
        for candidate in nsr.vlrs:
            for cp_ref in candidate.vld_msg.vnfd_connection_point_ref:
                if cp_ref.vnfd_id_ref != self._vnfd.id:
                    continue
                if cp_ref.vnfd_connection_point_ref != conn.name:
                    continue
                if cp_ref.member_vnf_index_ref != self.member_vnf_index:
                    continue
                if candidate.cloud_account_name != self.cloud_account_name:
                    continue
                self._log.debug("Found VLR for cp_name:%s and vnf-index:%d",
                                conn.name, self.member_vnf_index)
                return candidate
        return None

    # Fill in the VLR identifier for every connection point of the VNFD
    for vnfd_cp in self._vnfd.connection_point:
        cp_record = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint()
        cp_record.name = vnfd_cp.name
        cp_record.type_yang = vnfd_cp.type_yang

        matching_vlr = find_vlr_for_cp(vnfd_cp)
        if matching_vlr is None:
            # A connection point without a VLR is not treated as an error
            msg = "Failed to find VLR for cp = %s" % vnfd_cp.name
            self._log.debug("%s", msg)
            continue

        cp_record.vlr_ref = matching_vlr.id
        self.vnfr_msg.connection_point.append(cp_record)
        self._log.debug("Connection point [%s] added, vnf id=%s vnfd id=%s",
                        cp_record, self.vnfr_msg.id, self.vnfr_msg.vnfd_ref)

    if self.restart_mode:
        yield from self._dts.query_update(self.xpath,
                                          0,
                                          self.vnfr_msg)
    else:
        yield from self._dts.query_create(self.xpath,
                                          0,   # this is sub
                                          self.vnfr_msg)

    self._log.info("Created VNF with xpath %s and vnfr %s",
                   self.xpath, self.vnfr_msg)

    self._log.info("Instantiated VNFR with xpath %s and vnfd %s, vnfr %s",
                   self.xpath, self._vnfd, self.vnfr_msg)
1099
@asyncio.coroutine
def update_state(self, vnfr_msg):
    """ React to the operational status carried by a received VNFR message

    A "running" status activates this VNFR unless its own message already
    reports "running"; a "failed" status marks the instantiation as failed
    with the status details as reason.  Any other status is ignored.
    """
    reported = vnfr_msg.operational_status
    if reported == "failed":
        yield from self.instantiation_failed(failed_reason=vnfr_msg.operational_status_details)
    elif reported == "running":
        if self.vnfr_msg.operational_status != "running":
            yield from self.is_active()
1108
@asyncio.coroutine
def is_active(self):
    """ Mark this VNFR as ACTIVE """
    self._log.debug("VNFR %s is active", self._vnfr_id)
    self.set_state(VnfRecordState.ACTIVE)
1114
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """ Mark this VNFR as FAILED and remember why

    Arguments:
        failed_reason - optional details stored as the failure reason
    """
    self._log.error("VNFR %s instantiation failed", self._vnfr_id)
    self.set_state(VnfRecordState.FAILED)
    self._state_failed_reason = failed_reason
1121
def vnfr_in_vnfm(self):
    """ Tell whether a VNFR record is expected to exist in the VNFM

    That is the case while this record is ACTIVE, INSTANTIATION_PENDING
    or FAILED.
    """
    states_with_record = (
        VnfRecordState.ACTIVE,
        VnfRecordState.INSTANTIATION_PENDING,
        VnfRecordState.FAILED,
    )
    return self._state in states_with_record
1130
@asyncio.coroutine
def terminate(self):
    """ Terminate this VNF by deleting its record through DTS

    Nothing is done when, according to vnfr_in_vnfm(), the VNFM holds no
    record for this VNFR.
    """
    if not self.vnfr_in_vnfm():
        self._log.debug("Ignoring terminate request for id %s in state %s",
                        self.id, self._state)
        return

    self._log.debug("Terminating VNF id:%s", self.id)
    self.set_state(VnfRecordState.TERMINATE_PENDING)

    # Issue the delete of the VNFR xpath inside its own DTS transaction
    with self._dts.transaction(flags=0) as txn:
        delete_block = txn.block_create()
        delete_block.add_query_delete(self.xpath)
        yield from delete_block.execute(flags=0)

    self.set_state(VnfRecordState.TERMINATED)
    self._log.debug("Terminated VNF id:%s", self.id)
1147
1148
class NetworkServiceStatus(object):
    """ Operational state and recent event history of a network service

    Holds the current state of the network service and the most recent
    MAX_EVENTS_RECORDED events.  Each recorded event is also sent out as
    an nsm-notification through DTS.
    """
    MAX_EVENTS_RECORDED = 10

    # State member name -> yang enum string used when publishing the state
    _STATE_TO_YANG_STR = {
        "INIT": "init",
        "VL_INIT_PHASE": "vl_init_phase",
        "VNF_INIT_PHASE": "vnf_init_phase",
        "VNFFG_INIT_PHASE": "vnffg_init_phase",
        "SCALING_GROUP_INIT_PHASE": "scaling_group_init_phase",
        "RUNNING": "running",
        "SCALING_OUT": "scaling_out",
        "SCALING_IN": "scaling_in",
        "TERMINATE_RCVD": "terminate_rcvd",
        "TERMINATE": "terminate",
        "VL_TERMINATE_PHASE": "vl_terminate_phase",
        "VNF_TERMINATE_PHASE": "vnf_terminate_phase",
        "VNFFG_TERMINATE_PHASE": "vnffg_terminate_phase",
        "TERMINATED": "terminated",
        "FAILED": "failed",
        "VL_INSTANTIATE": "vl_instantiate",
        "VL_TERMINATE": "vl_terminate",
    }

    def __init__(self, dts, log, loop):
        self._dts = dts
        self._log = log
        self._loop = loop

        self._state = NetworkServiceRecordState.INIT
        # Bounded history: once full, appending drops the oldest event
        self._events = deque(maxlen=NetworkServiceStatus.MAX_EVENTS_RECORDED)

    @asyncio.coroutine
    def create_notification(self, evt, evt_desc, evt_details):
        """ Send an nsm-notification for an event through a DTS create query """
        xp = "N,/rw-nsr:nsm-notification"
        notif = RwNsrYang.YangNotif_RwNsr_NsmNotification()
        notif.event = evt
        notif.description = evt_desc
        notif.details = evt_details

        yield from self._dts.query_create(xp, rwdts.XactFlag.ADVISE, notif)
        self._log.info("Notification called by creating dts query: %s", notif)

    def record_event(self, evt, evt_desc, evt_details):
        """ Record an event and schedule its notification

        The event is stored with the current time in whole seconds; the
        notification is created as a task on the event loop.
        """
        self._log.debug("Recording event - evt %s, evt_descr %s len = %s",
                        evt, evt_desc, len(self._events))
        self._events.append((int(time.time()), evt, evt_desc, evt_details))

        self._loop.create_task(self.create_notification(evt, evt_desc, evt_details))

    def set_state(self, state):
        """ set the state of this status object """
        self._state = state

    def yang_str(self):
        """ Return the state as a yang enum string

        Raises KeyError when the name of the current state has no mapping.
        """
        return self._STATE_TO_YANG_STR[self._state.name]

    @property
    def state(self):
        """ State of this status object """
        return self._state

    @property
    def msg(self):
        """ Recorded events as a list of operational-events messages

        Events are numbered from 1 in the order they were recorded.
        """
        event_list = []
        for idx, entry in enumerate(self._events, start=1):
            event = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_OperationalEvents()
            event.id = idx
            event.timestamp, event.event, event.description, event.details = entry
            event_list.append(event)
        return event_list
1226
1227
1228 class NetworkServiceRecord(object):
1229 """ Network service record """
1230 XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"
1231
def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg, sdn_account_name, key_pairs, restart_mode=False):
    """ Set up an empty network service record in the INIT state

    The arguments are stored as given; restart_mode is kept as a public
    attribute.  No VLRs, VNFRs, VNFFGRs, scaling groups or parameter pools
    exist yet.
    """
    # Collaborators and configuration handed in by the caller
    self._dts = dts
    self._log = log
    self._loop = loop
    self._nsm = nsm
    self._nsm_plugin = nsm_plugin
    self._nsr_cfg_msg = nsr_cfg_msg
    self._sdn_account_name = sdn_account_name
    self._key_pairs = key_pairs
    self.restart_mode = restart_mode

    # Descriptor / message caches and the DTS registration handle
    self._nsd = None
    self._nsr_msg = None
    self._nsr_regh = None

    # Records owned by this network service
    self._vlrs = []
    self._vnfrs = {}
    self._vnfds = {}
    self._vnffgrs = {}
    self._param_pools = {}
    self._scaling_groups = {}

    # Status bookkeeping
    self._create_time = int(time.time())
    self._op_status = NetworkServiceStatus(dts, log, loop)
    self._config_status = NsrYang.ConfigStates.CONFIGURING
    self._config_status_details = None
    self._job_id = 0
    self._debug_running = False
    self._is_active = False
    self._vl_phase_completed = False
    self._vnf_phase_completed = False

    self.config_store = rift.mano.config_data.config.ConfigStore(self._log)

    # Start in the INIT state.  According to the original notes the NSR
    # then moves through:
    #   1. INIT -> VLS_READY once all the VLs in the NSD are created
    #   2. VLS_READY -> VNFS_READY when all the VNFs in the NSD are created
    #   3. VNFS_READY -> READY when the NSR is published
    self.set_state(NetworkServiceRecordState.INIT)

    self.substitute_input_parameters = InputParameterSubstitution(self._log)
1273
@property
def nsm_plugin(self):
    """ The NSM plugin this NSR was constructed with """
    return self._nsm_plugin
1278
def set_state(self, state):
    """ Move this NSR to a new state

    The init phase being left is flagged as completed before the new
    state is stored in the operational status object.
    """
    self._log.debug("Setting state to %s", state)

    # We may be leaving an init phase; the new state could be a FAILED
    # state or the next init phase
    leaving = self.state
    if leaving == NetworkServiceRecordState.VL_INIT_PHASE:
        self._vl_phase_completed = True

    if leaving == NetworkServiceRecordState.VNF_INIT_PHASE:
        self._vnf_phase_completed = True

    self._op_status.set_state(state)
1291
@property
def id(self):
    """ Identifier of this NSR, taken from the NSR config message """
    return self._nsr_cfg_msg.id
1296
@property
def name(self):
    """ Name of this NSR, taken from the NSR config message """
    return self._nsr_cfg_msg.name
1301
@property
def cloud_account_name(self):
    """ Cloud account named in the NSR config message """
    return self._nsr_cfg_msg.cloud_account
1305
@property
def om_datacenter_name(self):
    """ OM datacenter from the NSR config message, or None when not set """
    cfg = self._nsr_cfg_msg
    return cfg.om_datacenter if cfg.has_field('om_datacenter') else None
1311
@property
def state(self):
    """ Current state of this NSR as held by its operational status object """
    return self._op_status.state
1316
@property
def active(self):
    """ True when the operational state of this NSR is RUNNING """
    return self._op_status.state == NetworkServiceRecordState.RUNNING
1321
@property
def vlrs(self):
    """ List of virtual link records belonging to this NSR """
    return self._vlrs
1326
@property
def vnfrs(self):
    """ Dictionary of VNF records belonging to this NSR """
    return self._vnfrs
1331
@property
def vnffgrs(self):
    """ Dictionary of VNF forwarding graph records belonging to this NSR """
    return self._vnffgrs
1336
@property
def scaling_groups(self):
    """ Dictionary of scaling groups belonging to this NSR """
    return self._scaling_groups
1341
@property
def param_pools(self):
    """ Dictionary of parameter value pools belonging to this NSR """
    return self._param_pools
1346
@property
def nsr_cfg_msg(self):
    """ NSR config message this record is based on """
    return self._nsr_cfg_msg

@nsr_cfg_msg.setter
def nsr_cfg_msg(self, msg):
    """ Replace the NSR config message held by this record """
    self._nsr_cfg_msg = msg
1354
@property
def nsd_msg(self):
    """ NSD message of this NSR

    Taken from the NSR config message on first access and cached afterwards.
    """
    if self._nsd is None:
        self._nsd = self._nsr_cfg_msg.nsd
    return self._nsd
1362
@property
def nsd_id(self):
    """ Identifier of the NSD used by this NSR """
    return self.nsd_msg.id
1367
@property
def job_id(self):
    ''' Hand out a fresh job id for a config primitive

    Every read increments the internal counter, so two reads never
    return the same value.
    '''
    self._job_id += 1
    return self._job_id
1373
@property
def config_status(self):
    """ Configuration status currently recorded for this NSR """
    return self._config_status
1378
def resolve_placement_group_cloud_construct(self, input_group):
    """
    Build the cloud specific construct for a placement group

    The first entry of the NSR config's nsd_placement_group_maps that
    refers to input_group by name is turned into a VNFR placement group
    info message.  All its fields except 'placement_group_ref' are copied
    and 'name', 'requirement' and 'strategy' are taken from input_group.

    Returns None when no map entry refers to the group.
    """
    inherited_attrs = ('name', 'requirement', 'strategy')

    for mapping in self._nsr_cfg_msg.nsd_placement_group_maps:
        if mapping.placement_group_ref != input_group.name:
            continue

        resolved = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
        fields = dict(mapping.as_dict())
        fields.pop('placement_group_ref', None)
        for attr in inherited_attrs:
            fields[attr] = getattr(input_group, attr)
        resolved.from_dict(fields)
        return resolved

    return None
1395
1396
def __str__(self):
    """ Short description made of the NSR name, NSD id and cloud account """
    summary = "NSR(name={}, nsd_id={}, cloud_account={})"
    return summary.format(self.name, self.nsd_id, self.cloud_account_name)
1401
def _get_vnfd(self, vnfd_id, config_xact):
    """ Ask the NSM for the VNFD message with the given id """
    return self._nsm.get_vnfd(vnfd_id, config_xact)
1405
def _get_vnfd_cloud_account(self, vnfd_member_index):
    """ Find the VIM account to use for a constituent VNF

    Returns the (cloud_account, om_datacenter) pair of the first entry in
    the NSR config's vnf_cloud_account_map whose member index matches
    vnfd_member_index, or the NSR level pair when there is no such entry.
    """
    account_map = self._nsr_cfg_msg.vnf_cloud_account_map
    if account_map:
        for entry in account_map:
            if vnfd_member_index == entry.member_vnf_index_ref:
                return (entry.cloud_account, entry.om_datacenter)
    return (self.cloud_account_name, self.om_datacenter_name)
1414
def _get_constituent_vnfd_msg(self, vnf_index):
    """ Return the constituent VNFD of the NSD with the given member index

    Raises ValueError when the NSD has no constituent VNFD with that index.
    """
    match = next((entry for entry in self.nsd_msg.constituent_vnfd
                  if entry.member_vnf_index == vnf_index), None)
    if match is None:
        raise ValueError("Constituent VNF index %s not found" % vnf_index)
    return match
1421
def record_event(self, evt, evt_desc, evt_details=None, state=None):
    """ Record an event and optionally move the NSR to a new state

    The event goes to the operational status object; when state is not
    None it is applied with set_state() afterwards.
    """
    self._op_status.record_event(evt, evt_desc, evt_details)
    if state is None:
        return
    self.set_state(state)
1427
def scaling_trigger_str(self, trigger):
    """ Readable name of a scaling trigger

    Returns "Unknown trigger", after logging the failure, when the
    trigger cannot be looked up.
    """
    trigger_names = {
        NsdYang.ScalingTrigger.PRE_SCALE_IN: 'pre-scale-in',
        NsdYang.ScalingTrigger.POST_SCALE_IN: 'post-scale-in',
        NsdYang.ScalingTrigger.PRE_SCALE_OUT: 'pre-scale-out',
        NsdYang.ScalingTrigger.POST_SCALE_OUT: 'post-scale-out',
    }
    try:
        label = trigger_names[trigger]
    except Exception as e:
        self._log.error("Scaling trigger mapping error for {} : {}".
                        format(trigger, e))
        self._log.exception(e)
        return "Unknown trigger"
    return label
1442
@asyncio.coroutine
def instantiate_vls(self):
    """
    Instantiate every VLR of this network service through the NSM plugin

    The VLRs are handled one after the other; each is marked ACTIVE as
    soon as the plugin call for it has returned.
    """
    self._log.debug("Instantiating %d VLs in NSD id %s", len(self._vlrs),
                    self.id)
    plugin = self.nsm_plugin
    for link_record in self._vlrs:
        yield from plugin.instantiate_vl(self, link_record)
        link_record.state = VlRecordState.ACTIVE
1453
@asyncio.coroutine
def create(self, config_xact):
    """ Create the records that make up this network service

    Builds, in this order: virtual links, VNFs, VNF forwarding graphs,
    scaling groups and parameter pools.

    Arguments:
        config_xact - passed on to create_vnfs()
    """
    # Virtual links for all the external VNF connection points in this NS
    yield from self.create_vls()

    # VNFs of this network service
    yield from self.create_vnfs(config_xact)

    # The remaining records are created synchronously:
    # VNFFGs, one scaling group per NSD scaling group, parameter pools
    self.create_vnffgs()
    self.create_scaling_groups()
    self.create_param_pools()
1472
@asyncio.coroutine
def apply_scale_group_config_script(self, script, group, scale_instance, trigger, vnfrs=None):
    """ Apply config based on script for scale group

    The script is run with a single argument: the path of a temporary
    YAML file describing the trigger, the scale group config, the VNFRs
    of the scale instance, the VNFRs of the NSR and the NSR itself.  The
    temporary file is removed once the script has finished.

    Arguments:
        script - absolute path of the script, or a name looked up under
                 $RIFT_INSTALL/usr/bin
        group - scaling group the config belongs to
        scale_instance - scale group instance being configured
        trigger - NsdYang.ScalingTrigger value that caused the config
        vnfrs - VNFRs to report as "vnfrs_in_group"; defaults to the
                VNFRs of scale_instance

    Returns:
        True when the script exited with status 0, False when no script
        was given, the script does not exist or it exited with a
        non-zero status
    """
    # Local import: shlex is only needed here to quote the command line
    import shlex

    @asyncio.coroutine
    def add_vnfrs_data(vnfrs_list):
        """ Add as a dict each of the VNFRs data """
        vnfrs_data = []
        for vnfr in vnfrs_list:
            self._log.debug("Add VNFR {} data".format(vnfr))
            vnfr_data = dict()
            vnfr_data['name'] = vnfr.name
            if trigger in [NsdYang.ScalingTrigger.PRE_SCALE_IN, NsdYang.ScalingTrigger.POST_SCALE_OUT]:
                # Get VNF management and other IPs, etc
                opdata = yield from self.fetch_vnfr(vnfr.xpath)
                self._log.debug("VNFR {} op data: {}".format(vnfr.name, opdata))
                try:
                    vnfr_data['rw_mgmt_ip'] = opdata.mgmt_interface.ip_address
                    vnfr_data['rw_mgmt_port'] = opdata.mgmt_interface.port
                except Exception as e:
                    self._log.error("Unable to get management IP for vnfr {}:{}".
                                    format(vnfr.name, e))

                try:
                    vnfr_data['connection_points'] = []
                    for cp in opdata.connection_point:
                        con_pt = dict()
                        con_pt['name'] = cp.name
                        con_pt['ip_address'] = cp.ip_address
                        vnfr_data['connection_points'].append(con_pt)
                except Exception as e:
                    self._log.error("Exception getting connections points for VNFR {}: {}".
                                    format(vnfr.name, e))

            vnfrs_data.append(vnfr_data)
            self._log.debug("VNFRs data: {}".format(vnfrs_data))

        return vnfrs_data

    def add_nsr_data(nsr):
        """ NSR section of the script input """
        nsr_data = dict()
        nsr_data['name'] = nsr.name
        return nsr_data

    if not script:
        self._log.error("Script not provided for scale group config: {}".format(group.name))
        return False

    if os.path.isabs(script):
        path = script
    else:
        path = os.path.join(os.environ['RIFT_INSTALL'], "usr/bin", script)
    if not os.path.exists(path):
        self._log.error("Config faled for scale group {}: Script does not exist at {}".
                        format(group.name, path))
        return False

    # Build a YAML file with all parameters for the script to execute
    # The data consists of 5 sections
    # 1. Trigger
    # 2. Scale group config
    # 3. VNFRs in the scale group
    # 4. VNFRs outside scale group
    # 5. NSR data
    data = dict()
    data['trigger'] = group.trigger_map(trigger)
    data['config'] = group.group_msg.as_dict()

    if vnfrs:
        data["vnfrs_in_group"] = yield from add_vnfrs_data(vnfrs)
    else:
        data["vnfrs_in_group"] = yield from add_vnfrs_data(scale_instance.vnfrs)

    data["vnfrs_others"] = yield from add_vnfrs_data(self.vnfrs.values())
    data["nsr"] = add_nsr_data(self)

    # delete=False because the file has to outlive the "with" block: it is
    # closed here and read by the script afterwards
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        tmp_file.write(yaml.dump(data, default_flow_style=True)
                       .encode("UTF-8"))

    try:
        self._log.debug("Creating a temp file: {} with input data: {}".
                        format(tmp_file.name, data))

        # Quote both paths so that blanks or shell metacharacters in them
        # cannot change the command that the shell runs
        cmd = "{} {}".format(shlex.quote(path), shlex.quote(tmp_file.name))
        self._log.debug("Running the CMD: {}".format(cmd))
        proc = yield from asyncio.create_subprocess_shell(cmd, loop=self._loop)
        rc = yield from proc.wait()
    finally:
        # The input file is of no use once the script is done (or could
        # not be started); do not leave it behind
        try:
            os.remove(tmp_file.name)
        except OSError as e:
            self._log.debug("Could not remove temp file {}: {}".
                            format(tmp_file.name, e))

    if rc:
        self._log.error("The script {} for scale group {} config returned: {}".
                        format(script, group.name, rc))
        return False

    # Success
    return True
1568
1569
@asyncio.coroutine
def apply_scaling_group_config(self, trigger, group, scale_instance, vnfrs=None):
    """ Apply the scaling group config that is tied to a trigger

    Returns True when the group has no config for the trigger or the
    config script succeeded, False when group or scale_instance is
    missing, no config primitive is referenced or the script failed.

    Raises ValueError when the referenced primitive is not a service
    primitive of the NSD and NotImplementedError when the primitive has
    no user defined script.
    """
    if group is None or scale_instance is None:
        return False

    @asyncio.coroutine
    def update_config_status(success=True, err_msg=None):
        """ Reflect the outcome of the config in the scale instance """
        self._log.debug("Update %s config status to %r : %s",
                        scale_instance, success, err_msg)

        def mark_failed():
            scale_instance.config_status = "failed"
            scale_instance.config_err_msg = err_msg

        current = scale_instance.config_status
        if current == "failed":
            # Do not update the config status if it is already in failed state
            return

        if current == "configured":
            # An already configured scale instance can only move to failed
            if success:
                return
            mark_failed()
        elif trigger != NsdYang.ScalingTrigger.POST_SCALE_OUT:
            # Still configuring: the instance is only marked once the post
            # scale out config has been applied
            return
        elif success:
            scale_instance.config_status = "configured"
        else:
            mark_failed()

        yield from self.update_state()

    config = group.trigger_config(trigger)
    if config is None:
        return True

    self._log.debug("Scaling group {} config: {}".format(group.name, config))

    if not config.has_field("ns_config_primitive_name_ref"):
        err_msg = "Failed config for trigger {} as config primitive is not specified".\
                  format(self.scaling_trigger_str(trigger))
        yield from update_config_status(success=False, err_msg=err_msg)
        self._log.error("Config primitive not specified for config action in scale group %s" %
                        (group.name))
        return False

    config_name = config.ns_config_primitive_name_ref
    config_primitive = next((primitive for primitive in self.nsd_msg.service_primitive
                             if primitive.name == config_name), None)
    if config_primitive is None:
        raise ValueError("Could not find ns_cfg_prim %s in nsr %s" % (config_name, self.name))

    self._log.debug("Scaling group {} config primitive: {}".format(group.name, config_primitive))

    if not config_primitive.has_field("user_defined_script"):
        err_msg = "Failed config for trigger {} as config script is not specified". \
                  format(self.scaling_trigger_str(trigger))
        yield from update_config_status(success=False, err_msg=err_msg)
        raise NotImplementedError("Only script based config support for scale group for now: {}".
                                  format(group.name))

    rc = yield from self.apply_scale_group_config_script(config_primitive.user_defined_script,
                                                         group, scale_instance, trigger, vnfrs)
    err_msg = None
    if not rc:
        err_msg = "Failed config for trigger {} using config script '{}'". \
                  format(self.scaling_trigger_str(trigger),
                         config_primitive.user_defined_script)
    yield from update_config_status(success=rc, err_msg=err_msg)
    return rc
1642
def create_scaling_groups(self):
    """ Create a ScalingGroup for every scaling group descriptor of
    the NSD and register it under its name """
    for descriptor in self.nsd_msg.scaling_group_descriptor:
        self._log.debug("Found scaling_group %s in nsr id %s",
                        descriptor.name, self.id)

        new_group = scale_group.ScalingGroup(self._log, descriptor)
        self._scaling_groups[new_group.name] = new_group
1657
@asyncio.coroutine
def create_scale_group_instance(self, group_name, index, config_xact, is_default=False):
    """ Create an instance of a scaling group and start its VNFs

    The VNF records of the instance are created and the NSR is published,
    then the pre scale out config is applied and, when that succeeds, the
    VNFs are instantiated.  A failing config or an exception along the way
    leaves the instance with operational status "failed".

    Arguments:
        group_name - name of a scaling group of this NSR
        index - index passed on to create the scale instance
        config_xact - used when fetching the VNFDs
        is_default - passed on to the scaling group when creating the instance
    """
    group = self._scaling_groups[group_name]
    scale_instance = group.create_instance(index, is_default)

    @asyncio.coroutine
    def create_vnfs():
        """ Create the VNF records of the scale instance """
        # The last argument names the scaling group; the NSR itself used to
        # be logged in its place
        self._log.debug("Creating %u VNFs associated with NS id %s scaling group %s",
                        len(self.nsd_msg.constituent_vnfd), self.id, group_name)

        vnfrs = []
        for vnf_index, count in group.vnf_index_count_map.items():
            const_vnfd_msg = self._get_constituent_vnfd_msg(vnf_index)
            vnfd_msg = self._get_vnfd(const_vnfd_msg.vnfd_id_ref, config_xact)

            cloud_account_name, om_datacenter_name = self._get_vnfd_cloud_account(const_vnfd_msg.member_vnf_index)
            if cloud_account_name is None:
                cloud_account_name = self.cloud_account_name
            for _ in range(count):
                vnfr = yield from self.create_vnf_record(vnfd_msg, const_vnfd_msg, cloud_account_name, om_datacenter_name, group_name, index)
                scale_instance.add_vnfr(vnfr)
                vnfrs.append(vnfr)

        return vnfrs

    @asyncio.coroutine
    def instantiate_instance():
        """ Create, configure and instantiate the VNFs of the scale instance """
        self._log.debug("Creating %s VNFRS", scale_instance)
        vnfrs = yield from create_vnfs()
        yield from self.publish()

        self._log.debug("Instantiating %s VNFRS for %s", len(vnfrs), scale_instance)
        scale_instance.operational_status = "vnf_init_phase"
        yield from self.update_state()

        try:
            rc = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.PRE_SCALE_OUT,
                                                            group, scale_instance, vnfrs)
            if not rc:
                self._log.error("Pre scale out config for scale group {} ({}) failed".
                                format(group.name, index))
                scale_instance.operational_status = "failed"
            else:
                yield from self.instantiate_vnfs(vnfrs)

        except Exception as e:
            self._log.exception("Failed to begin instantiatiation of vnfs for scale group {}: {}".
                                format(group.name, e))
            self._log.exception(e)
            scale_instance.operational_status = "failed"

        yield from self.update_state()

    yield from instantiate_instance()
1712
@asyncio.coroutine
def delete_scale_group_instance(self, group_name, index):
    """ Terminate an instance of a scaling group

    Applies the pre scale in config, terminates the VNFRs of the instance
    and removes the instance from its group.

    Raises ScalingOperationError for a default scaling group instance.
    """
    group = self._scaling_groups[group_name]
    scale_instance = group.get_instance(index)
    if scale_instance.is_default:
        raise ScalingOperationError("Cannot terminate a default scaling group instance")

    scale_instance.operational_status = "terminate"
    yield from self.update_state()

    self._log.debug("Terminating %s VNFRS" % scale_instance)
    pre_scale_in_ok = yield from self.apply_scaling_group_config(
        NsdYang.ScalingTrigger.PRE_SCALE_IN, group, scale_instance)
    if not pre_scale_in_ok:
        self._log.error("Pre scale in config for scale group {} ({}) failed".
                        format(group.name, index))

    # Going ahead with terminate, even if there is an error in pre-scale-in config
    # as this could be result of scale out failure and we need to cleanup this group
    yield from self.terminate_vnfrs(scale_instance.vnfrs)
    group.delete_instance(index)

    scale_instance.operational_status = "vnf_terminate_phase"
    yield from self.update_state()
1741
@asyncio.coroutine
def _update_scale_group_instances_status(self):
    """ Bring the status of all scale group instances in line with the
    state of their VNFRs

    An instance in "vnf_init_phase" becomes "running" once all its VNFRs
    are ACTIVE (the post scale out config is then applied in a separate
    task) or "failed" as soon as one VNFR is FAILED.  An instance in
    "vnf_terminate_phase" becomes "terminated" once all its VNFRs are
    TERMINATED, after which the post scale in config is applied.
    """
    @asyncio.coroutine
    def post_scale_out_task(group, instance):
        # Apply post scale out config once all VNFRs are active
        configured = yield from self.apply_scaling_group_config(
            NsdYang.ScalingTrigger.POST_SCALE_OUT, group, instance)
        instance.operational_status = "running"
        if not configured:
            self._log.error("Post scale out config for scale group {} ({}) failed".
                            format(group.name, instance.instance_id))
        else:
            self._log.debug("Scale out for group {} and instance {} succeeded".
                            format(group.name, instance.instance_id))

        yield from self.update_state()

    # Take the instances of every group up front, before walking them
    instances_by_group = {group: group.instances for group in self._scaling_groups.values()}
    for group, instances in instances_by_group.items():
        self._log.debug("Updating %s instance status", group)
        for instance in instances:
            vnf_states = [vnfr.state for vnfr in instance.vnfrs]
            self._log.debug("Got vnfr instance states: %s", vnf_states)

            status = instance.operational_status
            if status == "vnf_init_phase":
                if all(vnf_state == VnfRecordState.ACTIVE for vnf_state in vnf_states):
                    instance.operational_status = "running"

                    # Create a task for post scale out to allow us to sleep before attempting
                    # to configure newly created VM's
                    self._loop.create_task(post_scale_out_task(group, instance))

                elif any(vnf_state == VnfRecordState.FAILED for vnf_state in vnf_states):
                    self._log.debug("Scale out for group {} and instance {} failed".
                                    format(group.name, instance.instance_id))
                    instance.operational_status = "failed"

            elif status == "vnf_terminate_phase":
                if not all(vnf_state == VnfRecordState.TERMINATED for vnf_state in vnf_states):
                    continue

                instance.operational_status = "terminated"
                configured = yield from self.apply_scaling_group_config(
                    NsdYang.ScalingTrigger.POST_SCALE_IN, group, instance)
                if configured:
                    self._log.debug("Scale in for group {} and instance {} succeeded".
                                    format(group.name, instance.instance_id))
                else:
                    self._log.error("Post scale in config for scale group {} ({}) failed".
                                    format(group.name, instance.instance_id))
1789
def create_vnffgs(self):
    """ Create a VnffgRecord for every VNFFGD of the NSD and register
    it under its id """
    for descriptor in self.nsd_msg.vnffgd:
        self._log.debug("Found vnffgd %s in nsr id %s", descriptor, self.id)
        record = VnffgRecord(self._dts,
                             self._log,
                             self._loop,
                             self._nsm._vnffgmgr,
                             self,
                             self.name,
                             descriptor,
                             self._sdn_account_name)
        self._vnffgrs[record.id] = record
1806
def resolve_vld_ip_profile(self, nsd_msg, vld):
    """ Look up the IP profile that a VLD refers to

    Returns the first IP profile of nsd_msg whose name equals the VLD's
    ip_profile_ref, or None when the VLD has no such reference or no
    profile carries that name.
    """
    self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
    if not vld.has_field('ip_profile_ref'):
        return None

    for candidate in nsd_msg.ip_profiles:
        if candidate.name == vld.ip_profile_ref:
            return candidate
    return None
1813
@asyncio.coroutine
def _create_vls(self, vld, cloud_account,om_datacenter):
    """Create a VLR for the given VLD in the given account

    Args:
        vld : VLD yang obj
        cloud_account : Cloud account name
        om_datacenter : OM datacenter name

    Returns:
        VirtualLinkRecord
    """
    ip_profile = self.resolve_vld_ip_profile(self.nsd_msg, vld)
    record = yield from VirtualLinkRecord.create_record(
        self._dts,
        self._log,
        self._loop,
        self.name,
        vld,
        cloud_account,
        om_datacenter,
        ip_profile,
        self.id,
        restart_mode=self.restart_mode)

    return record
1838
def _extract_cloud_accounts_for_vl(self, vld):
    """
    Work out the accounts in which a VL has to be created

    Rules:
        1. For every VNF connection point of the VLD, the account mapped
           to that VNF in vnf_cloud_account_map (only when that map is
           not empty); VNFs without a usable mapping contribute the NSR
           level account
        2. The cloud accounts and OM datacenters listed for this VLD in
           vl_cloud_account_map
        3. The NSR level account when rules 1 and 2 yield nothing

    Args:
        vld : VLD yang object

    Returns:
        set of (cloud_account, om_datacenter) tuples
    """
    cloud_account_list = []

    if self._nsr_cfg_msg.vnf_cloud_account_map:
        # Only keep mappings that name a cloud account or a datacenter
        vnf_cloud_map = {}
        for vnf in self._nsr_cfg_msg.vnf_cloud_account_map:
            if vnf.cloud_account is not None or vnf.om_datacenter is not None:
                vnf_cloud_map[vnf.member_vnf_index_ref] = (vnf.cloud_account, vnf.om_datacenter)

        for vnfc in vld.vnfd_connection_point_ref:
            cloud_account = vnf_cloud_map.get(
                vnfc.member_vnf_index_ref,
                (self.cloud_account_name, self.om_datacenter_name))

            cloud_account_list.append(cloud_account)

    if self._nsr_cfg_msg.vl_cloud_account_map:
        for vld_map in self._nsr_cfg_msg.vl_cloud_account_map:
            if vld_map.vld_id_ref == vld.id:
                # Each entry must stay a (cloud_account, om_datacenter)
                # pair: append the tuple, do not extend the list with its
                # two members
                for cloud_account in vld_map.cloud_accounts:
                    cloud_account_list.append((cloud_account, None))
                for om_datacenter in vld_map.om_datacenters:
                    cloud_account_list.append((None, om_datacenter))

    # If no config has been provided then fall-back to the default
    # account
    if not cloud_account_list:
        cloud_account_list = [(self.cloud_account_name, self.om_datacenter_name)]

    self._log.debug("VL {} cloud accounts: {}".
                    format(vld.name, cloud_account_list))
    return set(cloud_account_list)
1883
@asyncio.coroutine
def create_vls(self):
    """ Create the VLRs of this NSR: one per VLD of the NSD and per
    account the VLD has to be created in """
    for vld in self.nsd_msg.vld:
        self._log.debug("Found vld %s in nsr id %s", vld, self.id)
        accounts = self._extract_cloud_accounts_for_vl(vld)
        for account, datacenter in accounts:
            new_vlr = yield from self._create_vls(vld, account, datacenter)
            self._vlrs.append(new_vlr)
1894
1895
@asyncio.coroutine
def create_vl_instance(self, vld):
    """ Create (when needed) and instantiate the VL for a VLD

    A VLR that already exists for the VLD is reused, but only when it is
    TERMINATED.  Otherwise VLRs are created for every account the VLD
    maps to.  The VLR is then instantiated through the NSM plugin and
    ends up ACTIVE, or FAILED when the plugin raises.

    Raises NsrVlUpdateError when a VLR for the VLD exists and is not
    TERMINATED.
    """
    self._log.debug("Create VL for {}: {}".format(self.id, vld.as_dict()))
    # Check if the VL is already present
    vlr = None
    for vl in self._vlrs:
        if vl.vld_msg.id == vld.id:
            self._log.debug("The VLD %s already in NSR %s as VLR %s with status %s",
                            vld.id, self.id, vl.id, vl.state)
            vlr = vl
            if vlr.state != VlRecordState.TERMINATED:
                # Build the message as a string; a bare "fmt, a, b" would
                # log and raise a tuple instead
                err_msg = "VLR for VL %s in NSR %s already instantiated" % \
                          (vld, self.id)
                self._log.error(err_msg)
                raise NsrVlUpdateError(err_msg)
            break

    if vlr is None:
        cloud_account_list = self._extract_cloud_accounts_for_vl(vld)
        for account, om_datacenter in cloud_account_list:
            vlr = yield from self._create_vls(vld, account, om_datacenter)
            self._vlrs.append(vlr)

    # NOTE(review): when several VLRs were created above, only the last
    # one is instantiated below - confirm that this is intended
    vlr.state = VlRecordState.INSTANTIATION_PENDING
    yield from self.update_state()

    try:
        yield from self.nsm_plugin.instantiate_vl(self, vlr)
        vlr.state = VlRecordState.ACTIVE

    except Exception as e:
        err_msg = "Error instantiating VL for NSR {} and VLD {}: {}". \
                  format(self.id, vld.id, e)
        self._log.error(err_msg)
        self._log.exception(e)
        vlr.state = VlRecordState.FAILED

    yield from self.update_state()
1934
1935 @asyncio.coroutine
def delete_vl_instance(self, vld):
    """Terminate and drop the first VLR that belongs to ``vld``.

    The matching VLR is marked TERMINATE_PENDING, terminated through the NSM
    plugin and removed from ``self._vlrs``. If termination raises, the error
    is logged and the VLR is marked FAILED (and kept). The NS state is
    re-evaluated before and after the attempt. Nothing happens when no VLR
    matches the VLD.
    """
    for record in self._vlrs:
        if record.vld_msg.id != vld.id:
            continue

        self._log.debug("Found VLR %s for VLD %s in NSR %s",
                        record.id, vld.id, self.id)
        record.state = VlRecordState.TERMINATE_PENDING
        yield from self.update_state()

        try:
            yield from self.nsm_plugin.terminate_vl(record)
            record.state = VlRecordState.TERMINATED
            self._vlrs.remove(record)
        except Exception as exc:
            failure = "Error terminating VL for NSR {} and VLD {}: {}". \
                      format(self.id, vld.id, exc)
            self._log.error(failure)
            self._log.exception(exc)
            record.state = VlRecordState.FAILED

        yield from self.update_state()
        # Only the first matching VLR is handled
        return
1958
1959 @asyncio.coroutine
def create_vnfs(self, config_xact):
    """
    Create a VNF record for each constituent VNFD of this NSR's NSD.

    Constituent VNFDs with a false ``start_by_default`` are skipped. When no
    cloud account is mapped to the member VNF index, the NSR's own cloud
    account name is used.
    """
    constituents = self.nsd_msg.constituent_vnfd
    self._log.debug("Creating %u VNFs associated with this NS id %s",
                    len(constituents), self.id)

    for member in constituents:
        if member.start_by_default:
            descriptor = self._get_vnfd(member.vnfd_id_ref, config_xact)
            account, datacenter = self._get_vnfd_cloud_account(member.member_vnf_index)
            if account is None:
                account = self.cloud_account_name
            yield from self.create_vnf_record(descriptor, member, account, datacenter)
        else:
            self._log.debug("start_by_default set to False in constituent VNF (%s). Skipping start.",
                            member.member_vnf_index)
1979
1980
def get_placement_groups(self, vnfd_msg, const_vnfd):
    """Return resolved cloud constructs of the NSD placement groups for a VNF.

    A group applies when one of its members references both ``vnfd_msg.id``
    and ``const_vnfd.member_vnf_index``. Groups whose cloud construct cannot
    be resolved are logged as errors and left out of the result.
    """
    resolved = []
    for group in self.nsd_msg.placement_groups:
        for member in group.member_vnfd:
            if member.vnfd_id_ref != vnfd_msg.id or \
               member.member_vnf_index_ref != const_vnfd.member_vnf_index:
                continue

            construct = self.resolve_placement_group_cloud_construct(group)
            if construct is not None:
                self._log.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)",
                               str(construct),
                               vnfd_msg.name,
                               const_vnfd.member_vnf_index)
                resolved.append(construct)
            else:
                # Unresolved groups are reported but do not abort the lookup
                self._log.error("Could not resolve cloud-construct for placement group: %s", group.name)
    return resolved
1998
1999 @asyncio.coroutine
def create_vnf_record(self, vnfd_msg, const_vnfd, cloud_account_name, om_datacenter_name, group_name=None, group_instance_id=None):
    """Create a VNFR for one constituent VNF and register it with the NSR/NSM.

    The new record is stored in ``self._vnfrs`` and ``self._nsm.vnfrs`` under
    its id and its config status is set to INIT.

    Raises:
        NetworkServiceRecordError if a VNFR with the same id is already held

    Returns:
        The newly created VNFR
    """
    groups = self.get_placement_groups(vnfd_msg, const_vnfd)
    self._log.info("Cloud Account for VNF %d is %s",const_vnfd.member_vnf_index,cloud_account_name)
    self._log.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s",
                   vnfd_msg.name,
                   const_vnfd.member_vnf_index,
                   [ group.name for group in groups])

    record = yield from VirtualNetworkFunctionRecord.create_record(
        self._dts, self._log, self._loop,
        vnfd_msg, const_vnfd,
        self.nsd_id, self.name,
        cloud_account_name, om_datacenter_name,
        self.id,
        group_name, group_instance_id,
        groups,
        restart_mode=self.restart_mode,
    )

    if record.id in self._vnfrs:
        raise NetworkServiceRecordError(
            "VNF with VNFR id %s already in vnf list" % (record.id,))

    # Track the record both on this NSR and on the owning NSM
    self._vnfrs[record.id] = record
    self._nsm.vnfrs[record.id] = record

    yield from record.set_config_status(NsrYang.ConfigStates.INIT)

    self._log.debug("Added VNFR %s to NSM VNFR list with id %s",
                    record.name,
                    record.id)

    return record
2037
def create_param_pools(self):
    """Create a ParameterValuePool for every parameter pool in the NSD.

    Pools are stored in ``self._param_pools`` keyed by pool name.

    Raises:
        NetworkServiceRecordError if a pool's end value is below its start
        value
    """
    for param_pool in self.nsd_msg.parameter_pool:
        self._log.debug("Found parameter pool %s in nsr id %s", param_pool, self.id)

        start_value = param_pool.range.start_value
        end_value = param_pool.range.end_value
        if end_value < start_value:
            # Use str.format for every field; mixing a "%s" placeholder with
            # .format() left the pool name out of the message.
            raise NetworkServiceRecordError(
                "Parameter pool {} has invalid range (start: {}, end: {})".format(
                    param_pool.name, start_value, end_value
                )
            )

        # NOTE(review): range() excludes end_value, so the pool holds
        # start_value .. end_value - 1 -- confirm the end is meant to be
        # exclusive.
        self._param_pools[param_pool.name] = config_value_pool.ParameterValuePool(
            self._log,
            param_pool.name,
            range(start_value, end_value)
        )
2056
2057 @asyncio.coroutine
def fetch_vnfr(self, vnfr_path):
    """Read the VNFR at ``vnfr_path`` from DTS.

    Returns the result of the last response received, or None when the
    query produces no responses.
    """
    self._log.debug("Fetching VNFR with key %s while instantiating %s",
                    vnfr_path, self.id)
    found = None
    responses = yield from self._dts.query_read(vnfr_path, rwdts.XactFlag.MERGE)

    for pending in responses:
        response = yield from pending
        found = response.result

    return found
2070
2071 @asyncio.coroutine
def instantiate_vnfs(self, vnfrs):
    """
    Instantiate each of the given VNFRs, one after another, through the
    NSM plugin.
    """
    self._log.debug("Instantiating %u VNFs in NS %s", len(vnfrs), self.id)
    plugin_owner = self
    for record in vnfrs:
        self._log.debug("Instantiating VNF: %s in NS %s", record, self.id)
        yield from self.nsm_plugin.instantiate_vnf(plugin_owner, record)
2080
2081 @asyncio.coroutine
def instantiate_vnffgs(self):
    """
    Instantiate every VNFFG record of this Network Service.

    Polls every 2 seconds until each VNFR has left the INIT and
    INSTANTIATION_PENDING states. If any VNFR then is not ACTIVE, the VNFFGR
    state is set to FAILED and nothing is instantiated. Otherwise waits a
    further 90 seconds before instantiating the VNFFGRs in turn.
    """
    self._log.debug("Instantiating %u VNFFGs in NS %s",
                    len(self.nsd_msg.vnffgd), self.id)

    for vnfr in self.vnfrs.values():
        while vnfr.state in (VnfRecordState.INSTANTIATION_PENDING, VnfRecordState.INIT):
            self._log.debug("Received vnfr state for vnfr %s is %s; retrying",vnfr.name,vnfr.state)
            yield from asyncio.sleep(2, loop=self._loop)

        if vnfr.state != VnfRecordState.ACTIVE:
            self._log.debug("Received vnfr state for vnfr %s is %s; failing vnffg creation",vnfr.name,vnfr.state)
            self._vnffgr_state = VnffgRecordState.FAILED
            return

        self._log.debug("Received vnfr state for vnfr %s is %s ",vnfr.name,vnfr.state)

    self._log.info("Waiting for 90 seconds for VMs to come up")
    yield from asyncio.sleep(90, loop=self._loop)
    self._log.info("Starting VNFFG orchestration")
    for vnffgr in self._vnffgrs.values():
        self._log.debug("Instantiating VNFFG: %s in NS %s", vnffgr, self.id)
        yield from vnffgr.instantiate()
2106
2107 @asyncio.coroutine
def instantiate_scaling_instances(self, config_xact):
    """Instantiate the default and configured scaling instances.

    For every scaling group, ``min_instance_count`` default instances are
    created (ids 0..count-1), followed by the instances listed for that
    group in the NSR config message.
    """
    for group in self._scaling_groups.values():
        for index in range(group.min_instance_count):
            self._log.debug("Instantiating %s default scaling instance %s", group, index)
            yield from self.create_scale_group_instance(
                group.name, index, config_xact, is_default=True
            )

        for group_cfg in self._nsr_cfg_msg.scaling_group:
            if group_cfg.scaling_group_name_ref == group.name:
                for configured in group_cfg.instance:
                    self._log.debug("Reloading %s scaling instance %s", group_cfg, configured.id)
                    yield from self.create_scale_group_instance(
                        group.name, configured.id, config_xact, is_default=False
                    )
2126
def has_scaling_instances(self):
    """Return True when any scaling instance would be instantiated.

    That is the case when a scaling group has a positive
    ``min_instance_count`` or the NSR config lists at least one instance
    for a scaling group.
    """
    if any(group.min_instance_count > 0
           for group in self._scaling_groups.values()):
        return True

    return any(len(group_cfg.instance) > 0
               for group_cfg in self._nsr_cfg_msg.scaling_group)
2138
2139 @asyncio.coroutine
def publish(self):
    """Rebuild this NSR's message and publish it to DTS.

    The message is cached in ``self._nsr_msg``. ``_debug_running`` is a
    debugging aid: it is set once the NSR has been published while in the
    RUNNING state, and later publishes then emit an extra debug line.
    """
    message = self.create_msg()
    self._nsr_msg = message

    self._log.debug("Publishing the NSR with xpath %s and nsr %s",
                    self.nsr_xpath,
                    message)

    if self._debug_running:
        self._log.debug("Publishing NSR in RUNNING state!")

    with self._dts.transaction() as xact:
        yield from self._nsm.nsr_handler.update(xact, self.nsr_xpath, message)
        if self._op_status.state == NetworkServiceRecordState.RUNNING:
            self._debug_running = True
2156
2157 @asyncio.coroutine
def unpublish(self, xact):
    """Delete this NSR's published record from DTS within ``xact``."""
    self._log.debug("Unpublishing Network service id %s", self.id)
    handler = self._nsm.nsr_handler
    yield from handler.delete(xact, self.nsr_xpath)
2162
2163 @property
def nsr_xpath(self):
    """Return the operational-data xpath of this NSR, keyed on its id."""
    container = "D,/nsr:ns-instance-opdata"
    entry = "/nsr:nsr[nsr:ns-instance-config-ref = '{}']"
    return (container + entry).format(self.id)
2170
2171 @staticmethod
def xpath_from_nsr(nsr):
    """Return the NSR op-data xpath for ``nsr``, keyed on ``nsr.id``."""
    template = NetworkServiceRecord.XPATH + "[nsr:ns-instance-config-ref = '{}']"
    return template.format(nsr.id)
2176
2177 @property
def nsd_xpath(self):
    """Return the config xpath of the NSD referenced by this NSR."""
    template = "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']"
    return template.format(self.nsd_id)
2183
2184 @asyncio.coroutine
def instantiate(self, config_xact):
    """Instantiate this NetworkServiceRecord.

    The synchronous part of this coroutine moves the record to INIT, takes
    the NSD from the NSR config message, merges stored config into it,
    substitutes input parameters, creates the record and publishes it.

    The remaining work is scheduled as a separate task on the event loop
    and is NOT awaited here:

    * Instantiate every VL in the NSD.
    * Instantiate every VNF in the NSD.
    * Instantiate VNFFGs and scaling instances, when present.
    * Let the NSM plugin deploy the network service.
    * Publish the NSR details to DTS.

    If that task ends with an exception, instantiation_failed() is
    scheduled with the exception text as the failure reason.

    Arguments:
        config_xact: The configuration transaction which initiated the
                     instantiation

    Raises:
        NetworkServiceRecordError if the NSR config carries no NSD

    Returns:
        No return value
    """

    self._log.debug("Instantiating NS - %s xact - %s", self, config_xact)

    # Move the state to INIT
    self.set_state(NetworkServiceRecordState.INIT)

    event_descr = "Instantiation Request Received NSR Id:%s" % self.id
    self.record_event("instantiating", event_descr)

    # The NSD is taken from the NSR config message
    self._nsd = self._nsr_cfg_msg.nsd

    try:
        # Update ref count if nsd present in catalog
        self._nsm.get_nsd_ref(self.nsd_id)

    except NetworkServiceDescriptorError:
        # This could be an NSD not in the nsd-catalog
        pass

    # Merge any config and initial config primitive values
    self.config_store.merge_nsd_config(self.nsd_msg)
    self._log.debug("Merged NSD: {}".format(self.nsd_msg.as_dict()))

    event_descr = "Fetched NSD with descriptor id %s" % self.nsd_id
    self.record_event("nsd-fetched", event_descr)

    # NOTE(review): this None check runs after the NSD has already been
    # used for the merge above (assuming nsd_msg/nsd_id derive from
    # self._nsd) -- confirm whether it should move up.
    if self._nsd is None:
        msg = "Failed to fetch NSD with nsd-id [%s] for nsr-id %s"
        self._log.debug(msg, self.nsd_id, self.id)
        raise NetworkServiceRecordError(self)

    self._log.debug("Got nsd result %s", self._nsd)

    # Substitute any input parameters
    self.substitute_input_parameters(self._nsd, self._nsr_cfg_msg)

    # Create the record
    yield from self.create(config_xact)

    # Publish the NSR to DTS
    yield from self.publish()

    @asyncio.coroutine
    def do_instantiate():
        """
        Instantiate the VLs, VNFs, VNFFGs and scaling instances of this
        network service, then deploy and publish it
        """
        self._log.debug("Instantiating VLs nsr id [%s] nsd id [%s]",
                        self.id, self.nsd_id)

        # instantiate the VLs
        event_descr = ("Instantiating %s external VLs for NSR id %s" %
                       (len(self.nsd_msg.vld), self.id))
        self.record_event("begin-external-vls-instantiation", event_descr)

        self.set_state(NetworkServiceRecordState.VL_INIT_PHASE)

        yield from self.instantiate_vls()

        # Publish the NSR to DTS
        yield from self.publish()

        event_descr = ("Finished instantiating %s external VLs for NSR id %s" %
                       (len(self.nsd_msg.vld), self.id))
        self.record_event("end-external-vls-instantiation", event_descr)

        self.set_state(NetworkServiceRecordState.VNF_INIT_PHASE)

        self._log.debug("Instantiating VNFs  ...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

        # instantiate the VNFs
        event_descr = ("Instantiating %s VNFS for NSR id %s" %
                       (len(self.nsd_msg.constituent_vnfd), self.id))

        self.record_event("begin-vnf-instantiation", event_descr)

        yield from self.instantiate_vnfs(self._vnfrs.values())

        self._log.debug(" Finished instantiating %d VNFs for NSR id %s",
                        len(self.nsd_msg.constituent_vnfd), self.id)

        event_descr = ("Finished instantiating %s VNFs for NSR id %s" %
                       (len(self.nsd_msg.constituent_vnfd), self.id))
        self.record_event("end-vnf-instantiation", event_descr)

        # VNFFGs are only handled when the NSR holds VNFFG records
        if len(self.vnffgrs) > 0:
            #self.set_state(NetworkServiceRecordState.VNFFG_INIT_PHASE)
            event_descr = ("Instantiating %s VNFFGS for NSR id %s" %
                           (len(self.nsd_msg.vnffgd), self.id))

            self.record_event("begin-vnffg-instantiation", event_descr)

            yield from self.instantiate_vnffgs()

            event_descr = ("Finished instantiating %s VNFFGDs for NSR id %s" %
                           (len(self.nsd_msg.vnffgd), self.id))
            self.record_event("end-vnffg-instantiation", event_descr)

        if self.has_scaling_instances():
            event_descr = ("Instantiating %s Scaling Groups for NSR id %s" %
                           (len(self._scaling_groups), self.id))

            # The same description is recorded for the begin and end events
            self.record_event("begin-scaling-group-instantiation", event_descr)
            yield from self.instantiate_scaling_instances(config_xact)
            self.record_event("end-scaling-group-instantiation", event_descr)

        # Give the plugin a chance to deploy the network service now that all
        # virtual links and vnfs are instantiated
        yield from self.nsm_plugin.deploy(self._nsr_msg)

        self._log.debug("Publishing  NSR...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

        # Publish the NSR to DTS
        yield from self.publish()

        self._log.debug("Published  NSR...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

    def on_instantiate_done(fut):
        """ Done-callback of the instantiation task """
        # If the do_instantiate fails, then publish NSR with failed result
        if fut.exception() is not None:
            self._log.error("NSR instantiation failed for NSR id %s: %s", self.id, str(fut.exception()))
            self._loop.create_task(self.instantiation_failed(failed_reason=str(fut.exception())))

    # Run the long-lived part in the background; this coroutine returns
    # without waiting for it.
    instantiate_task = self._loop.create_task(do_instantiate())
    instantiate_task.add_done_callback(on_instantiate_done)
2334
2335 @asyncio.coroutine
def set_config_status(self, status, status_details=None):
    """Record a new config status for this NSR and publish it.

    Nothing happens when ``status`` equals the current config status. A
    change to FAILED additionally records a "config-failed" event carrying
    ``status_details``.
    """
    if self.config_status == status:
        return

    self._log.debug("Updating NSR {} status for {} to {}".
                    format(self.name, self.config_status, status))
    self._config_status = status
    self._config_status_details = status_details

    failed = self._config_status == NsrYang.ConfigStates.FAILED
    if failed:
        self.record_event("config-failed", "NS configuration failed",
                          evt_details=self._config_status_details)

    yield from self.publish()
2348
2349 @asyncio.coroutine
def is_active(self):
    """Mark this NS as RUNNING.

    The state is always set to RUNNING. The "ns-running" event and the
    publish to DTS happen only the first time, i.e. while ``_is_active``
    is still false.
    """
    self.set_state(NetworkServiceRecordState.RUNNING)
    if not self._is_active:
        self._log.debug("Network service %s is active ", self.id)
        self._is_active = True

        self.record_event("ns-running",
                          "NSR in running state for NSR id %s" % self.id)

        # Publish the NSR to DTS
        yield from self.publish()
2364
2365 @asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """Move the NS to FAILED, record an "ns-failed" event and publish.

    ``failed_reason`` is attached to the event as its details.
    """
    self._log.error("Network service id:%s, name:%s instantiation failed",
                    self.id, self.name)
    self.set_state(NetworkServiceRecordState.FAILED)

    self.record_event("ns-failed",
                      "Instantiation of NS %s failed" % self.id,
                      evt_details=failed_reason)

    # Publish the NSR to DTS
    yield from self.publish()
2377
2378 @asyncio.coroutine
def terminate_vnfrs(self, vnfrs):
    """Terminate the given VNFRs, one after another, via the NSM plugin."""
    self._log.debug("Terminating VNFs in network service %s", self.id)
    for record in vnfrs:
        yield from self.nsm_plugin.terminate_vnf(record)
2384
2385 @asyncio.coroutine
def terminate(self):
    """Terminate this NetworkServiceRecord.

    Tears down the VNFFGRs, then the VNFRs, then the VLRs (marking each VLR
    TERMINATED), asks the NSM plugin to terminate the NS and finally moves
    the record to TERMINATED. A state change and an operational event are
    recorded ahead of every phase.
    """
    self._log.debug("Terminating network service id %s", self.id)

    # Move the state to TERMINATE
    self.set_state(NetworkServiceRecordState.TERMINATE)
    self.record_event("terminate",
                      "Terminate being processed for NS Id:%s" % self.id)

    # Move the state to VNFFG_TERMINATE_PHASE and terminate the VNFFGRs
    self._log.debug("Terminating VNFFGs in NS ID: %s", self.id)
    self.set_state(NetworkServiceRecordState.VNFFG_TERMINATE_PHASE)
    self.record_event("terminating-vnffgss",
                      "Terminating VNFFGS in NS Id:%s" % self.id)
    self._log.debug("Terminating VNFFGRs in network service %s", self.id)
    for vnffgr in self.vnffgrs.values():
        yield from vnffgr.terminate()

    # Move the state to VNF_TERMINATE_PHASE and terminate the VNFRs
    self.set_state(NetworkServiceRecordState.VNF_TERMINATE_PHASE)
    self.record_event("terminating-vnfs",
                      "Terminating VNFS in NS Id:%s" % self.id)
    yield from self.terminate_vnfrs(self.vnfrs.values())

    # Move the state to VL_TERMINATE_PHASE and terminate the VLRs
    self.set_state(NetworkServiceRecordState.VL_TERMINATE_PHASE)
    self.record_event("terminating-vls",
                      "Terminating VLs in NS Id:%s" % self.id)
    self._log.debug("Terminating VLs in network service %s", self.id)
    for vlr in self.vlrs:
        yield from self.nsm_plugin.terminate_vl(vlr)
        vlr.state = VlRecordState.TERMINATED

    yield from self.nsm_plugin.terminate_ns(self)

    # Move the state to TERMINATED
    self.set_state(NetworkServiceRecordState.TERMINATED)
    self.record_event("terminated", "Terminated NS Id:%s" % self.id)
2433
def enable(self):
    """Enable a NetworkServiceRecord (currently a no-op)."""
    return None
2437
def disable(self):
    """Disable a NetworkServiceRecord (currently a no-op)."""
    return None
2441
def map_config_status(self):
    """Map the internal config status to its op-data string.

    CONFIGURING maps to 'configuring', FAILED to 'failed' and every other
    status to 'configured'.
    """
    self._log.debug("Config status for ns {} is {}".
                    format(self.name, self._config_status))
    status = self._config_status
    if status == NsrYang.ConfigStates.CONFIGURING:
        mapped = 'configuring'
    elif status == NsrYang.ConfigStates.FAILED:
        mapped = 'failed'
    else:
        mapped = 'configured'
    return mapped
2450
def vl_phase_completed(self):
    """Return the flag telling whether the VL phase of this NS is complete."""
    completed = self._vl_phase_completed
    return completed
2454
def vnf_phase_completed(self):
    """Return the flag telling whether the VNF phase of this NS is complete."""
    completed = self._vnf_phase_completed
    return completed
2458
def create_msg(self):
    """Build the operational-data message for this network service record.

    The message is keyed on this NSR's id and carries the names, NSD
    references, operational/config status and create time held on the
    record, plus copies of the NSD's service primitives and initial config
    primitives. VLRs are included only once the VL phase has completed;
    constituent VNFR references, VNFFGRs and scaling group records only
    once the VNF phase has completed.

    Returns:
        A RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr message
    """
    nsr_dict = {"ns_instance_config_ref": self.id}
    nsr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.from_dict(nsr_dict)
    #nsr.cloud_account = self.cloud_account_name
    nsr.sdn_account = self._sdn_account_name
    nsr.name_ref = self.name
    nsr.nsd_ref = self.nsd_id
    nsr.nsd_name_ref = self.nsd_msg.name
    nsr.operational_events = self._op_status.msg
    nsr.operational_status = self._op_status.yang_str()
    nsr.config_status = self.map_config_status()
    nsr.config_status_details = self._config_status_details
    nsr.create_time = self._create_time

    # Re-create each NSD primitive as the corresponding NSR message type
    for cfg_prim in self.nsd_msg.service_primitive:
        cfg_prim = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict(
                cfg_prim.as_dict())
        nsr.service_primitive.append(cfg_prim)

    for init_cfg in self.nsd_msg.initial_config_primitive:
        prim = NsrYang.NsrInitialConfigPrimitive.from_dict(
                init_cfg.as_dict())
        nsr.initial_config_primitive.append(prim)

    # Records are only reported once their creation phase is complete
    if self.vl_phase_completed():
        for vlr in self.vlrs:
            nsr.vlr.append(vlr.create_nsr_vlr_msg(self.vnfrs.values()))

    if self.vnf_phase_completed():
        for vnfr_id in self.vnfrs:
            nsr.constituent_vnfr_ref.append(self.vnfrs[vnfr_id].const_vnfr_msg)
        for vnffgr in self.vnffgrs.values():
            nsr.vnffgr.append(vnffgr.fetch_vnffgr())
        for scaling_group in self._scaling_groups.values():
            nsr.scaling_group_record.append(scaling_group.create_record_msg())

    return nsr
2497
def all_vnfs_active(self):
    """Return True when every VNFR of this NS reports ``active`` as True.

    An NS without VNFRs counts as all-active.
    """
    return all(vnfr.active is True for vnfr in self.vnfrs.values())
2504
2505 @asyncio.coroutine
def update_state(self):
    """Re-evaluate this NS's state from its VNFRs, VLRs, VNFFGRs and
    scaling groups, then publish the NSR.

    Starting from RUNNING as the candidate state:

    * a FAILED VNFR makes the candidate FAILED; a VNFR that is neither
      ACTIVE nor TERMINATED keeps the current state;
    * while the candidate is RUNNING, a VL still being instantiated or
      terminated switches it to VL_INSTANTIATE / VL_TERMINATE (a FAILED VL
      only records an event);
    * while the candidate is RUNNING, a FAILED VNFFGR makes it FAILED and a
      non-active VNFFGR keeps the current state;
    * a scaling group that is scaling out/in overrides the candidate.

    Nothing is done when the NS is already TERMINATED.
    """
    curr_state = self._op_status.state

    if curr_state == NetworkServiceRecordState.TERMINATED:
        self._log.debug("NS (%s) in terminated state, not updating state", self.id)
        return

    new_state = NetworkServiceRecordState.RUNNING
    self._log.info("Received update_state for nsr: %s, curr-state: %s",
                   self.id, curr_state)

    # Check all the VNFRs are present
    for _, vnfr in self.vnfrs.items():
        if vnfr.state in [VnfRecordState.ACTIVE, VnfRecordState.TERMINATED]:
            pass
        elif vnfr.state == VnfRecordState.FAILED:
            # Record the failure event only on the transition into FAILED
            if vnfr._prev_state != vnfr.state:
                event_descr = "Instantiation of VNF %s failed" % vnfr.id
                event_error_details = vnfr.state_failed_reason
                self.record_event("vnf-failed", event_descr, evt_details=event_error_details)
                vnfr.set_state(VnfRecordState.FAILED)
            else:
                self._log.info("VNF state did not change, curr=%s, prev=%s",
                               vnfr.state, vnfr._prev_state)
            new_state = NetworkServiceRecordState.FAILED
            break
        else:
            self._log.info("VNF %s in NSR %s is still not active; current state is: %s",
                           vnfr.id, self.id, vnfr.state)
            new_state = curr_state

    # If new state is RUNNING; check all VLs
    if new_state == NetworkServiceRecordState.RUNNING:
        for vl in self.vlrs:

            if vl.state in [VlRecordState.ACTIVE, VlRecordState.TERMINATED]:
                pass
            elif vl.state == VlRecordState.FAILED:
                if vl.prev_state != vl.state:
                    event_descr = "Instantiation of VL %s failed" % vl.id
                    event_error_details = vl.state_failed_reason
                    self.record_event("vl-failed", event_descr, evt_details=event_error_details)
                    vl.prev_state = vl.state
                else:
                    # Pass the VL id so the "%s" placeholder is filled in
                    self._log.debug("VL %s already in failed state", vl.id)
            else:
                if vl.state in [VlRecordState.INSTANTIATION_PENDING, VlRecordState.INIT]:
                    new_state = NetworkServiceRecordState.VL_INSTANTIATE
                    break

                if vl.state in [VlRecordState.TERMINATE_PENDING]:
                    new_state = NetworkServiceRecordState.VL_TERMINATE
                    break

    # If new state is RUNNING; check VNFFGRs are also active
    if new_state == NetworkServiceRecordState.RUNNING:
        for _, vnffgr in self.vnffgrs.items():
            self._log.info("Checking vnffgr state for nsr %s is: %s",
                           self.id, vnffgr.state)
            if vnffgr.state == VnffgRecordState.ACTIVE:
                pass
            elif vnffgr.state == VnffgRecordState.FAILED:
                event_descr = "Instantiation of VNFFGR %s failed" % vnffgr.id
                self.record_event("vnffg-failed", event_descr)
                new_state = NetworkServiceRecordState.FAILED
                break
            else:
                self._log.info("VNFFGR %s in NSR %s is still not active; current state is: %s",
                               vnffgr.id, self.id, vnffgr.state)
                new_state = curr_state

    # Update all the scaling group instance operational status to
    # reflect the state of all VNFR within that instance
    yield from self._update_scale_group_instances_status()

    for _, group in self._scaling_groups.items():
        if group.state == scale_group.ScaleGroupState.SCALING_OUT:
            new_state = NetworkServiceRecordState.SCALING_OUT
            break
        elif group.state == scale_group.ScaleGroupState.SCALING_IN:
            new_state = NetworkServiceRecordState.SCALING_IN
            break

    if new_state != curr_state:
        self._log.debug("Changing state of Network service %s from %s to %s",
                        self.id, curr_state, new_state)
        if new_state == NetworkServiceRecordState.RUNNING:
            yield from self.is_active()
        elif new_state == NetworkServiceRecordState.FAILED:
            # If the NS is already active and we entered scaling_in, scaling_out,
            # do not mark the NS as failing if scaling operation failed.
            if curr_state in [NetworkServiceRecordState.SCALING_OUT,
                              NetworkServiceRecordState.SCALING_IN] and self._is_active:
                new_state = NetworkServiceRecordState.RUNNING
                self.set_state(new_state)
            else:
                yield from self.instantiation_failed()
        else:
            self.set_state(new_state)

    yield from self.publish()
2608
2609
class InputParameterSubstitution(object):
    """
    Callable that applies the input parameters of an NSR config to an NSD.
    """

    def __init__(self, log):
        """Create an instance of InputParameterSubstitution

        Arguments:
            log - a logger for this object to use

        """
        self.log = log

    def __call__(self, nsd, nsr_config):
        """Substitute input parameters from the NSR config into the NSD

        The provided NSD is modified in place. Only xpaths listed in the
        NSD's input_parameter_xpath may be set; any other parameter is
        logged as an error and skipped. A failure while setting a value is
        logged and does not stop the remaining parameters.

        Arguments:
            nsd         - a GI NSD object
            nsr_config  - a GI NSR config object

        """
        if nsd is None or nsr_config is None:
            return

        # xpaths this descriptor allows to be overridden
        allowed_xpaths = {entry.xpath for entry in nsd.input_parameter_xpath}

        if not nsr_config.input_parameter:
            return

        for param in nsr_config.input_parameter:
            if param.xpath in allowed_xpaths:
                self.log.debug(
                    "input-parameter:{} = {}".format(
                        param.xpath,
                        param.value,
                    )
                )

                try:
                    xpath.setxattr(nsd, param.xpath, param.value)
                except Exception as exc:
                    self.log.exception(exc)
            else:
                template = "tried to set an invalid input parameter ({})"
                self.log.error(template.format(param.xpath))
2664
2665
class NetworkServiceDescriptor(object):
    """
    Network service descriptor class

    Wraps an NSD message together with a reference count of its users.
    """

    def __init__(self, dts, log, loop, nsd, nsm):
        """Create a descriptor wrapper.

        Arguments:
            dts  - DTS handle
            log  - logger used by this object
            loop - event loop
            nsd  - the NSD message being wrapped
            nsm  - the owning NS manager
        """
        self._dts = dts
        self._log = log
        self._loop = loop

        self._nsd = nsd
        self._ref_count = 0

        self._nsm = nsm

    @property
    def id(self):
        """ Returns nsd id """
        return self._nsd.id

    @property
    def name(self):
        """ Returns name of nsd """
        return self._nsd.name

    @property
    def ref_count(self):
        """ Returns reference count"""
        return self._ref_count

    def in_use(self):
        """ Returns whether nsd is in use or not """
        return self.ref_count > 0

    def ref(self):
        """ Take a reference on this object """
        self._ref_count += 1

    def unref(self):
        """ Release reference on this object

        Raises:
            NetworkServiceDescriptorError if no reference is currently held
        """
        if self.ref_count < 1:
            msg = ("Unref on a NSD object - nsd id %s, ref_count = %s" %
                   (self.id, self.ref_count))
            self._log.critical(msg)
            raise NetworkServiceDescriptorError(msg)
        self._ref_count -= 1

    @property
    def msg(self):
        """ Return the message associated with this NetworkServiceDescriptor"""
        return self._nsd

    @staticmethod
    def path_for_id(nsd_id):
        """ Return the config xpath for the passed nsd_id"""
        # The key predicate must be closed with "]" to form a valid xpath
        return "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']".format(nsd_id)

    def path(self):
        """ Return the config xpath of this NetworkServiceDescriptor"""
        return NetworkServiceDescriptor.path_for_id(self.id)

    def update(self, nsd):
        """ Update the NSD descriptor """
        self._nsd = nsd
2730
2731
class NsdDtsHandler(object):
    """ The network service descriptor DTS handler """
    XPATH = "C,/nsd:nsd-catalog/nsd:nsd"

    def __init__(self, dts, log, loop, nsm):
        """Store the DTS handle, logger, event loop and owning NS manager."""
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register for Nsd create/update/delete/read requests from dts """

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration"""
            # An INSTALL action without a transaction is treated as recovery,
            # in which case every NSD element is (re)applied.
            is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
            self._log.debug("Got nsd apply cfg (xact:%s) (action:%s)",
                            xact, action)
            # Create/Update an NSD record
            for cfg in self._regh.get_xact_elements(xact):
                # Only interested in those NSD cfgs whose ID was received in prepare callback
                if cfg.id in scratch.get('nsds', []) or is_recovery:
                    self._nsm.update_nsd(cfg)

            scratch.pop('nsds', None)

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def delete_nsd_libs(nsd_id):
            """ Remove any files uploaded with NSD and stored under $RIFT_ARTIFACTS/libs/<id> """
            # Best effort: a cleanup failure is logged, never raised
            try:
                rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
                nsd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', nsd_id)

                if os.path.exists(nsd_dir):
                    shutil.rmtree(nsd_dir, ignore_errors=True)
            except Exception as e:
                self._log.error("Exception in cleaning up NSD libs {}: {}".
                                format(nsd_id, e))
                # Logger.exception: the misspelt method name raised
                # AttributeError from inside this handler
                self._log.exception(e)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for NSD config """

            self._log.info("Got nsd prepare - config received nsd id %s, msg %s",
                           msg.id, msg)

            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            # Deletes are handled here; creates/updates are deferred to
            # on_apply through the scratch area.
            if fref.is_field_deleted():
                # Delete an NSD record
                self._log.debug("Deleting NSD with id %s", msg.id)
                if self._nsm.nsd_in_use(msg.id):
                    self._log.debug("Cannot delete NSD in use - %s", msg.id)
                    err = "Cannot delete an NSD in use - %s" % msg.id
                    raise NetworkServiceDescriptorRefCountExists(err)

                yield from delete_nsd_libs(msg.id)
                self._nsm.delete_nsd(msg.id)
            else:
                # Add this NSD to scratch to create/update in apply callback
                nsds = scratch.setdefault('nsds', [])
                nsds.append(msg.id)
                # acg._scratch['nsds'].append(msg.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug(
            "Registering for NSD config using xpath: %s",
            NsdDtsHandler.XPATH,
            )

        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=acg_hdl) as acg:
            # Need a list in scratch to store NSDs to create/update later
            # acg._scratch['nsds'] = list()
            self._regh = acg.register(
                xpath=NsdDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                on_prepare=on_prepare)
2823
2824
class VnfdDtsHandler(object):
    """Subscribe to VNFD catalog config and relay changes to the NS manager."""
    XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"

    def __init__(self, dts, log, loop, nsm):
        self._dts, self._log, self._loop, self._nsm = dts, log, loop, nsm
        # Set by register(); None until then.
        self._regh = None

    @property
    def regh(self):
        """The DTS registration handle (None before register() has run)."""
        return self._regh

    @asyncio.coroutine
    def register(self):
        """Create an appconf group and subscribe to VNFD config at XPATH."""

        @asyncio.coroutine
        def on_apply(dts, acg, xact, action, scratch):
            """Apply callback: hand recorded updates and deletes to the NS manager."""
            self._log.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            # Creates/updates: act only on ids recorded by the prepare callback.
            for vnfd_cfg in self._regh.get_xact_elements(xact):
                if vnfd_cfg.id not in scratch.get('vnfds', []):
                    continue
                self._nsm.update_vnfd(vnfd_cfg)

            # Deletes: same filtering, against the ids recorded as deleted.
            for vnfd_cfg in self._regh.elements:
                if vnfd_cfg.id not in scratch.get('deleted_vnfds', []):
                    continue
                yield from self._nsm.delete_vnfd(vnfd_cfg.id)

            # Drop both lists so they do not carry over to a later apply.
            for scratch_key in ('vnfds', 'deleted_vnfds'):
                scratch.pop(scratch_key, None)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """Prepare callback: record the VNFD id in scratch and ACK."""
            self._log.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
                            ks_path.to_xpath(RwNsmYang.get_schema()), xact_info.query_action, msg)

            field_ref = ProtobufC.FieldReference.alloc()
            field_ref.goto_whole_message(msg.to_pbcm())

            # Only the id is recorded here; the actual work happens in on_apply.
            if field_ref.is_field_deleted():
                self._log.debug("Adding msg to deleted field")
                scratch_key = 'deleted_vnfds'
            else:
                scratch_key = 'vnfds'
            scratch.setdefault(scratch_key, []).append(msg.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug(
            "Registering for VNFD config using xpath: %s",
            VnfdDtsHandler.XPATH,
            )
        apply_handler = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=apply_handler) as acg:
            # The scratch lists are created lazily by on_prepare (setdefault).
            self._regh = acg.register(
                xpath=VnfdDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                on_prepare=on_prepare)
2898
class NsrRpcDtsHandler(object):
    """ The network service instantiation RPC DTS handler

    Registers for the start-network-service RPC. For each request it builds
    an ns-instance-config entry (with a freshly generated NSR id and a copy
    of the referenced NSD) and POSTs it to the local REST endpoint.
    """
    EXEC_NSR_CONF_XPATH = "I,/nsr:start-network-service"
    EXEC_NSR_CONF_O_XPATH = "O,/nsr:start-network-service"
    NETCONF_IP_ADDRESS = "127.0.0.1"
    NETCONF_PORT = 2022
    RESTCONF_PORT = 8888
    NETCONF_USER = "admin"
    NETCONF_PW = "admin"
    # Derived from the constants above rather than repeating the literals.
    REST_BASE_V2_URL = 'https://{}:{}/v2/api/'.format(NETCONF_IP_ADDRESS, RESTCONF_PORT)

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm
        self._nsd = None

        self._ns_regh = None

        self._manager = None
        self._nsr_config_url = NsrRpcDtsHandler.REST_BASE_V2_URL + 'config/ns-instance-config'

        self._model = RwYang.Model.create_libncx()
        self._model.load_schema_ypbc(RwNsrYang.get_schema())

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @staticmethod
    def wrap_netconf_config_xml(xml):
        """ Wrap an XML fragment in a netconf <config> element """
        xml = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml)
        return xml

    @asyncio.coroutine
    def _connect(self, timeout_secs=240):
        """ Open a netconf session, retrying every 5 seconds

        Arguments:
            timeout_secs - give up once this many seconds have elapsed

        Returns:
            The connected ncclient asyncio manager

        Raises:
            NsrInstantiationFailed if no connection was made in time
        """
        start_time = time.time()
        while (time.time() - start_time) < timeout_secs:

            try:
                self._log.debug("Attemping NsmTasklet netconf connection.")

                manager = yield from ncclient.asyncio_manager.asyncio_connect(
                    loop=self._loop,
                    host=NsrRpcDtsHandler.NETCONF_IP_ADDRESS,
                    port=NsrRpcDtsHandler.NETCONF_PORT,
                    username=NsrRpcDtsHandler.NETCONF_USER,
                    password=NsrRpcDtsHandler.NETCONF_PW,
                    allow_agent=False,
                    look_for_keys=False,
                    hostkey_verify=False,
                    )

                return manager

            except ncclient.transport.errors.SSHError as e:
                self._log.warning("Netconf connection to launchpad %s failed: %s",
                                  NsrRpcDtsHandler.NETCONF_IP_ADDRESS, str(e))

            yield from asyncio.sleep(5, loop=self._loop)

        raise NsrInstantiationFailed("Failed to connect to Launchpad within %s seconds" %
                                     timeout_secs)

    def _apply_ns_instance_config(self, payload_dict):
        """ POST the serialized ns-instance-config to the REST endpoint

        This is a blocking call; register() runs it through the loop's
        executor.

        Arguments:
            payload_dict - request body (the caller passes the to_json() output)

        Returns:
            The requests response object
        """
        req_hdr = {'accept': 'application/vnd.yang.data+json',
                   'content-type': 'application/vnd.yang.data+json'}
        # NOTE(review): certificate verification is disabled (verify=False),
        # as in the original; confirm this is acceptable for the loopback
        # endpoint.
        response = requests.post(self._nsr_config_url,
                                 headers=req_hdr,
                                 auth=(NsrRpcDtsHandler.NETCONF_USER,
                                       NsrRpcDtsHandler.NETCONF_PW),
                                 data=payload_dict,
                                 verify=False)
        return response

    @asyncio.coroutine
    def register(self):
        """ Register for the start-network-service RPC with dts """
        @asyncio.coroutine
        def on_ns_config_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts start-network-service"""
            assert action == rwdts.QueryAction.RPC
            rpc_ip = msg
            rpc_op = NsrYang.YangOutput_Nsr_StartNetworkService.from_dict({
                "nsr_id": str(uuid.uuid4())
            })

            has_account = 'cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip
            if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and has_account):
                self._log.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip))
                # Reject the RPC instead of posting an incomplete config.
                xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                        NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH)
                return

            self._log.debug("start-network-service RPC input: {}".format(rpc_ip))

            try:
                self._log.debug("RPC output: {}".format(rpc_op))

                nsd_copy = self.nsm.get_nsd(rpc_ip.nsd_ref)

                self._log.debug("Configuring ns-instance-config with name %s nsd-ref: %s",
                                rpc_ip.name, rpc_ip.nsd_ref)

                # Start from the generated id, then copy across every RPC
                # input field that also exists on the NSR config message.
                ns_instance_config_dict = {"id": rpc_op.nsr_id, "admin_status": "ENABLED"}
                ns_instance_config_copy_dict = {k: v for k, v in rpc_ip.as_dict().items()
                                                if k in RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr().fields}
                ns_instance_config_dict.update(ns_instance_config_copy_dict)

                ns_instance_config = RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr.from_dict(ns_instance_config_dict)
                ns_instance_config.nsd = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_Nsd()
                ns_instance_config.nsd.from_dict(nsd_copy.msg.as_dict())

                payload_dict = ns_instance_config.to_json(self._model)

                self._log.debug("Sending configure ns-instance-config json to %s: %s",
                                self._nsr_config_url, ns_instance_config)

                # requests is blocking, so run the POST in the executor.
                response = yield from self._loop.run_in_executor(
                    None,
                    self._apply_ns_instance_config,
                    payload_dict
                    )
                response.raise_for_status()
                self._log.debug("Received edit config response: %s", response.json())

                xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                        NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH,
                                        rpc_op)
            except Exception as e:
                self._log.error("Exception processing the "
                                "start-network-service: {}".format(e))
                self._log.exception(e)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                        NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH)

        hdl_ns = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_ns_config_prepare,)

        with self._dts.group_create() as group:
            self._ns_regh = group.register(xpath=NsrRpcDtsHandler.EXEC_NSR_CONF_XPATH,
                                           handler=hdl_ns,
                                           flags=rwdts.Flag.PUBLISHER,
                                           )
3050
3051
class NsrDtsHandler(object):
    """ The network service DTS handler

    Subscribes to NSR config, scaling-group instance config and key-pair
    config, and turns config transactions into NS manager calls.
    """
    NSR_XPATH = "C,/nsr:ns-instance-config/nsr:nsr"
    SCALE_INSTANCE_XPATH = "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
    KEY_PAIR_XPATH = "C,/nsr:key-pair"

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        # Registration handles, set by register()
        self._nsr_regh = None
        self._scale_regh = None
        self._key_pair_regh = None

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """ Register for Nsr create/update/delete/read requests from dts """

        def nsr_id_from_keyspec(ks):
            """ Extract the NSR id from a keyspec """
            nsr_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks)
            nsr_id = nsr_path_entry.key00.id
            return nsr_id

        def group_name_from_keyspec(ks):
            """ Extract the scaling group name from a keyspec """
            group_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks)
            group_name = group_path_entry.key00.scaling_group_name_ref
            return group_name

        def is_instance_in_reg_elements(nsr_id, group_name, instance_id):
            """ Return boolean indicating if scaling group instance was already commited previously.

            By looking at the existing elements in this registration handle (elements not part
            of this current xact), we can tell if the instance was configured previously without
            keeping any application state.
            """
            # NOTE(review): nothing in register() calls this helper, and it
            # iterates the NSR registration rather than the scale-instance
            # one - confirm intent before using it.
            for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(include_keyspec=True):
                elem_nsr_id = nsr_id_from_keyspec(keyspec)
                elem_group_name = group_name_from_keyspec(keyspec)

                if elem_nsr_id != nsr_id or group_name != elem_group_name:
                    continue

                if instance_cfg.id == instance_id:
                    return True

            return False

        def get_scale_group_instance_delta(nsr_id, group_name, xact):
            """ Compare this xact's scale instances with the committed ones

            Returns:
                {"added": [ids], "deleted": [ids]} for the given NSR/group
            """
            delta = {"added": [], "deleted": []}
            # Everything seen in the xact starts out as "added" ...
            for instance_cfg, keyspec in self._scale_regh.get_xact_elements(xact, include_keyspec=True):
                elem_nsr_id = nsr_id_from_keyspec(keyspec)
                if elem_nsr_id != nsr_id:
                    continue

                elem_group_name = group_name_from_keyspec(keyspec)
                if elem_group_name != group_name:
                    continue

                delta["added"].append(instance_cfg.id)

            # ... then already-committed ids are cancelled out, and committed
            # ids missing from the xact are reported as deleted.
            for instance_cfg, keyspec in self._scale_regh.get_xact_elements(include_keyspec=True):
                elem_nsr_id = nsr_id_from_keyspec(keyspec)
                if elem_nsr_id != nsr_id:
                    continue

                elem_group_name = group_name_from_keyspec(keyspec)
                if elem_group_name != group_name:
                    continue

                if instance_cfg.id in delta["added"]:
                    delta["added"].remove(instance_cfg.id)
                else:
                    delta["deleted"].append(instance_cfg.id)

            return delta

        @asyncio.coroutine
        def update_nsr_nsd(nsr_id, xact, scratch):
            """ Instantiate/terminate VLs that changed in the NSR's NSD """

            @asyncio.coroutine
            def get_nsr_vl_delta(nsr_id, xact, scratch):
                """ Same add/delete diff as for scale instances, over VLDs """
                delta = {"added": [], "deleted": []}
                for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(xact, include_keyspec=True):
                    elem_nsr_id = nsr_id_from_keyspec(keyspec)
                    if elem_nsr_id != nsr_id:
                        continue

                    if 'vld' in instance_cfg.nsd:
                        for vld in instance_cfg.nsd.vld:
                            delta["added"].append(vld)

                for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(include_keyspec=True):
                    self._log.debug("NSR update: %s", instance_cfg)
                    elem_nsr_id = nsr_id_from_keyspec(keyspec)
                    if elem_nsr_id != nsr_id:
                        continue

                    if 'vld' in instance_cfg.nsd:
                        for vld in instance_cfg.nsd.vld:
                            if vld in delta["added"]:
                                delta["added"].remove(vld)
                            else:
                                delta["deleted"].append(vld)

                return delta

            vl_delta = yield from get_nsr_vl_delta(nsr_id, xact, scratch)
            self._log.debug("Got NSR:%s VL instance delta: %s", nsr_id, vl_delta)

            for vld in vl_delta["added"]:
                yield from self._nsm.nsr_instantiate_vl(nsr_id, vld)

            for vld in vl_delta["deleted"]:
                yield from self._nsm.nsr_terminate_vl(nsr_id, vld)

        def get_add_delete_update_cfgs(dts_member_reg, xact, key_name, scratch):
            """ Split a registration's elements into added/deleted/updated

            Returns:
                (added_cfgs, deleted_cfgs, updated_cfgs)
            """
            # Unfortunately, it is currently difficult to figure out what has exactly
            # changed in this xact without Pbdelta support (RIFT-4916)
            # As a workaround, we can fetch the pre and post xact elements and
            # perform a comparison to figure out adds/deletes/updates
            xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
            curr_cfgs = list(dts_member_reg.elements)

            xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
            curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}

            # Find Adds
            added_keys = set(xact_key_map) - set(curr_key_map)
            added_cfgs = [xact_key_map[key] for key in added_keys]

            # Find Deletes
            deleted_keys = set(curr_key_map) - set(xact_key_map)
            deleted_cfgs = [curr_key_map[key] for key in deleted_keys]

            # Find Updates
            updated_keys = set(curr_key_map) & set(xact_key_map)
            updated_cfgs = [xact_key_map[key] for key in updated_keys
                            if xact_key_map[key] != curr_key_map[key]]

            return added_cfgs, deleted_cfgs, updated_cfgs

        def get_nsr_key_pairs(dts_member_reg, xact):
            """ Return the xact's key-pair configs keyed by name """
            key_pairs = {}
            for instance_cfg, keyspec in dts_member_reg.get_xact_elements(xact, include_keyspec=True):
                self._log.debug("Key pair received is {} KS: {}".format(instance_cfg, keyspec))
                key_pairs[instance_cfg.name] = instance_cfg
            return key_pairs

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration"""
            self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            def handle_create_nsr(msg, key_pairs=None, restart_mode=False):
                """ Validate the config and create the NSR in the NS manager """
                if not msg.has_field("nsd"):
                    err = "NSD not provided"
                    self._log.error(err)
                    raise NetworkServiceRecordError(err)

                self._log.debug("Creating NetworkServiceRecord %s from nsr config %s",
                                msg.id, msg.as_dict())
                nsr = self.nsm.create_nsr(msg, key_pairs=key_pairs, restart_mode=restart_mode)
                return nsr

            def handle_delete_nsr(msg):
                """ Mark the NSR as terminating and schedule its termination """
                @asyncio.coroutine
                def delete_instantiation(ns_id):
                    """ Delete instantiation """
                    with self._dts.transaction() as xact:
                        yield from self._nsm.terminate_ns(ns_id, xact)

                # Handle delete NSR requests
                self._log.info("Delete req for NSR Id: %s received", msg.id)
                # Terminate the NSR instance
                nsr = self._nsm.get_ns_by_nsr_id(msg.id)

                nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD)
                event_descr = "Terminate rcvd for NS Id:%s" % msg.id
                nsr.record_event("terminate-rcvd", event_descr)

                self._loop.create_task(delete_instantiation(msg.id))

            @asyncio.coroutine
            def begin_instantiation(nsr):
                """ Begin instantiation """
                self._log.info("Beginning NS instantiation: %s", nsr.id)
                yield from self._nsm.instantiate_ns(nsr.id, xact)

            # INSTALL with no xact id: re-create every configured NSR in
            # restart mode and schedule its instantiation.
            if action == rwdts.AppconfAction.INSTALL and xact.id is None:
                key_pairs = []
                for element in self._key_pair_regh.elements:
                    key_pairs.append(element)
                for element in self._nsr_regh.elements:
                    nsr = handle_create_nsr(element, key_pairs, restart_mode=True)
                    self._loop.create_task(begin_instantiation(nsr))

            (added_msgs, deleted_msgs, updated_msgs) = get_add_delete_update_cfgs(self._nsr_regh,
                                                                                  xact,
                                                                                  "id",
                                                                                  scratch)
            self._log.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs,
                            deleted_msgs, updated_msgs)

            for msg in added_msgs:
                if msg.id not in self._nsm.nsrs:
                    self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
                    key_pairs = get_nsr_key_pairs(self._key_pair_regh, xact)
                    nsr = handle_create_nsr(msg, key_pairs)
                    self._loop.create_task(begin_instantiation(nsr))

            for msg in deleted_msgs:
                self._log.info("Delete NSR received in on_apply to terminate NS:%s", msg.id)
                try:
                    handle_delete_nsr(msg)
                except Exception:
                    self._log.exception("Failed to terminate NS:%s", msg.id)

            for msg in updated_msgs:
                self._log.info("Update NSR received in on_apply: %s", msg)

                self._nsm.nsr_update_cfg(msg.id, msg)

                if 'nsd' in msg:
                    self._loop.create_task(update_nsr_nsd(msg.id, xact, scratch))

                for group in msg.scaling_group:
                    instance_delta = get_scale_group_instance_delta(msg.id, group.scaling_group_name_ref, xact)
                    self._log.debug("Got NSR:%s scale group instance delta: %s", msg.id, instance_delta)

                    for instance_id in instance_delta["added"]:
                        self._nsm.scale_nsr_out(msg.id, group.scaling_group_name_ref, instance_id, xact)

                    for instance_id in instance_delta["deleted"]:
                        self._nsm.scale_nsr_in(msg.id, group.scaling_group_name_ref, instance_id)

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare calllback from DTS for NSR

            Only validates the request (raising on invalid config); the
            actual create/update/delete work is done in on_apply.
            """

            xpath = ks_path.to_xpath(RwNsrYang.get_schema())
            action = xact_info.query_action
            self._log.debug(
                "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
                xact, action, xact_info, xpath, msg
                )

            if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE, rwdts.QueryAction.DELETE]:
                # if this is an NSR create
                if action != rwdts.QueryAction.DELETE and msg.id not in self._nsm.nsrs:
                    # Ensure the Cloud account/datacenter has been specified
                    if not msg.has_field("cloud_account") and not msg.has_field("om_datacenter"):
                        raise NsrInstantiationFailed("Cloud account or datacenter not specified in NSR")

                    # Check if nsd is specified
                    if not msg.has_field("nsd"):
                        raise NsrInstantiationFailed("NSD not specified in NSR")

                else:
                    nsr = self._nsm.nsrs[msg.id]

                    if msg.has_field("nsd"):
                        if nsr.state != NetworkServiceRecordState.RUNNING:
                            raise NsrVlUpdateError("Unable to update VL when NSR not in running state")
                        if 'vld' not in msg.nsd or len(msg.nsd.vld) == 0:
                            raise NsrVlUpdateError("NS config NSD should have atleast 1 VLD defined")

                    if msg.has_field("scaling_group"):
                        if nsr.state != NetworkServiceRecordState.RUNNING:
                            raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")

                        if len(msg.scaling_group) > 1:
                            raise ScalingOperationError("Only a single scaling group can be configured at a time")

                        for group_msg in msg.scaling_group:
                            num_new_group_instances = len(group_msg.instance)
                            if num_new_group_instances > 1:
                                raise ScalingOperationError("Only a single scaling instance can be modified at a time")

                            elif num_new_group_instances == 1:
                                # Named group_rec so the imported scale_group
                                # module is not shadowed.
                                group_rec = nsr.scaling_groups[group_msg.scaling_group_name_ref]
                                if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
                                    # >= so a group already over its limit is
                                    # rejected as well.
                                    if len(group_rec.instances) >= group_rec.max_instance_count:
                                        raise ScalingOperationError("Max instances for %s reached" % group_rec)

            acg.handle.prepare_complete_ok(xact_info.handle)

        self._log.debug("Registering for NSR config using xpath: %s",
                        NsrDtsHandler.NSR_XPATH)

        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=acg_hdl) as acg:
            self._nsr_regh = acg.register(xpath=NsrDtsHandler.NSR_XPATH,
                                          flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                                          on_prepare=on_prepare)

            self._scale_regh = acg.register(
                xpath=NsrDtsHandler.SCALE_INSTANCE_XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                )

            self._key_pair_regh = acg.register(
                xpath=NsrDtsHandler.KEY_PAIR_XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                )
3394
3395
class NsrOpDataDtsHandler(object):
    """Publisher for NSR operational data records under XPATH."""
    XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"

    def __init__(self, dts, log, loop, nsm):
        self._dts, self._log, self._loop, self._nsm = dts, log, loop, nsm
        # Set by register(); None until then.
        self._regh = None

    @property
    def regh(self):
        """The DTS registration handle (None before register() has run)."""
        return self._regh

    @property
    def nsm(self):
        """The NS manager instance given at construction."""
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """Register XPATH with DTS as a publisher and keep the handle."""
        self._log.debug("Registering Nsr op data path %s as publisher",
                        NsrOpDataDtsHandler.XPATH)

        reg_handler = rift.tasklets.DTS.RegistrationHandler()
        group_handler = rift.tasklets.Group.Handler()
        publisher_flags = (rwdts.Flag.PUBLISHER
                           | rwdts.Flag.NO_PREP_READ
                           | rwdts.Flag.DATASTORE)
        with self._dts.group_create(handler=group_handler) as group:
            self._regh = group.register(xpath=NsrOpDataDtsHandler.XPATH,
                                        handler=reg_handler,
                                        flags=publisher_flags)

    @asyncio.coroutine
    def create(self, path, msg):
        """Create the NS record `msg` at `path` through the registration handle."""
        self._log.debug("Creating NSR %s:%s", path, msg)
        self.regh.create_element(path, msg)
        self._log.debug("Created NSR, %s:%s", path, msg)

    @asyncio.coroutine
    def update(self, path, msg, flags=rwdts.XactFlag.REPLACE):
        """Update the NS record at `path` with `msg`, passing `flags` through."""
        self._log.debug("Updating NSR, %s:%s regh = %s", path, msg, self.regh)
        self.regh.update_element(path, msg, flags)
        self._log.debug("Updated NSR, %s:%s", path, msg)

    @asyncio.coroutine
    def delete(self, path):
        """Delete the NS record at `path` through the registration handle."""
        self._log.debug("Deleting NSR path:%s", path)
        self.regh.delete_element(path)
        self._log.debug("Deleted NSR path:%s", path)
3456
3457
class VnfrDtsHandler(object):
    """ The VNFR DTS handler

    Subscribes to the VNFR catalog and forwards create/update/delete
    notifications for known VNFRs to the NS manager.
    """
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        self._regh = None

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """ Register for vnfr create/update/delete/ advises from dts """

        def on_commit(xact_info):
            """ The transaction has been committed """
            self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts """
            xpath = ks_path.to_xpath(RwNsrYang.get_schema())
            self._log.debug(
                "Got vnfr on_prepare cb (xact_info: %s, action: %s): %s:%s",
                xact_info, action, ks_path, msg
                )

            schema = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
            path_entry = schema.keyspec_to_entry(ks_path)
            vnfr_id = path_entry.key00.id
            if vnfr_id not in self._nsm.vnfrs:
                # Not a VNFR this NS manager tracks: answer NA and stop.
                self._log.error("%s request for non existent record path %s",
                                action, xpath)
                xact_info.respond_xpath(rwdts.XactRspCode.NA, xpath)

                return

            # The delete message is logged only on the DELETE branch; it used
            # to be logged for every action.
            if action == rwdts.QueryAction.CREATE or action == rwdts.QueryAction.UPDATE:
                yield from self._nsm.update_vnfr(msg)
            elif action == rwdts.QueryAction.DELETE:
                self._log.debug("Deleting VNFR with id %s", vnfr_id)
                self._nsm.delete_vnfr(vnfr_id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK, xpath)

        self._log.debug("Registering for VNFR using xpath: %s",
                        VnfrDtsHandler.XPATH,)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
                                                    on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=(rwdts.Flag.SUBSCRIBER),)
3525
3526
class NsdRefCountDtsHandler(object):
    """Publisher that answers DTS reads of the NSD reference count."""
    XPATH = "D,/nsr:ns-instance-opdata/rw-nsr:nsd-ref-count"

    def __init__(self, dts, log, loop, nsm):
        self._dts, self._log, self._loop, self._nsm = dts, log, loop, nsm
        # Set by register(); None until then.
        self._regh = None

    @property
    def regh(self):
        """The DTS registration handle (None before register() has run)."""
        return self._regh

    @property
    def nsm(self):
        """The NS manager instance given at construction."""
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """Register XPATH with DTS as a publisher of NSD ref counts."""

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """Prepare callback: serve READ queries and reject anything else."""
            xpath = ks_path.to_xpath(RwNsrYang.get_schema())

            if action != rwdts.QueryAction.READ:
                raise NetworkServiceRecordError("Not supported operation %s" % action)

            refcount_schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount.schema()
            entry = refcount_schema.keyspec_to_entry(ks_path)
            ref_counts = yield from self._nsm.get_nsd_refcount(entry.key00.nsd_id_ref)

            # One MORE response per (xpath, msg) pair, then a final ACK.
            for ref_xpath, ref_msg in ref_counts:
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
                                        xpath=ref_xpath,
                                        msg=ref_msg)
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        read_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=NsdRefCountDtsHandler.XPATH,
                                        handler=read_handler,
                                        flags=rwdts.Flag.PUBLISHER,)
3575
3576
3577 class NsManager(object):
3578 """ The Network Service Manager class"""
def __init__(self, dts, log, loop,
             nsr_handler, vnfr_handler, vlr_handler, ro_plugin_selector,
             vnffgmgr, vnfd_pub_handler, cloud_account_handler):
    """Store the collaborators and build the static DTS handlers.

    Nothing is registered with DTS here; that is done by register().
    """
    self._dts, self._log, self._loop = dts, log, loop

    # Publication handlers and managers supplied by the caller.
    self._nsr_handler = nsr_handler
    self._vnfr_pub_handler = vnfr_handler
    self._vlr_pub_handler = vlr_handler
    self._vnffgmgr = vnffgmgr
    self._vnfd_pub_handler = vnfd_pub_handler
    self._cloud_account_handler = cloud_account_handler
    self._ro_plugin_selector = ro_plugin_selector

    self._ncclient = rift.mano.ncclient.NcClient(
        host="127.0.0.1",
        port=2022,
        username="admin",
        password="admin",
        loop=self._loop)

    # Records and descriptors held by this manager, all initially empty.
    self._nsrs = {}
    self._nsds = {}
    self._vnfds = {}
    self._vnfrs = {}

    self.cfgmgr_obj = conman.ROConfigManager(log, loop, dts, self)

    # TODO: All these handlers should move to tasklet level.
    # Passing self is often an indication of bad design
    handler_args = (dts, log, loop, self)
    self._nsd_dts_handler = NsdDtsHandler(*handler_args)
    self._vnfd_dts_handler = VnfdDtsHandler(*handler_args)

    # register() walks this list in order.
    self._dts_handlers = [
        self._nsd_dts_handler,
        VnfrDtsHandler(*handler_args),
        NsdRefCountDtsHandler(*handler_args),
        NsrDtsHandler(*handler_args),
        ScalingRpcHandler(log, dts, loop, self.scale_rpc_callback),
        NsrRpcDtsHandler(*handler_args),
        self._vnfd_dts_handler,
        self.cfgmgr_obj,
    ]
3620
3621
@property
def log(self):
    """Logger handle given at construction."""
    return self._log

@property
def loop(self):
    """Event loop given at construction."""
    return self._loop

@property
def dts(self):
    """DTS handle given at construction."""
    return self._dts

@property
def nsr_handler(self):
    """NSR handler given at construction."""
    return self._nsr_handler

@property
def so_obj(self):
    """Value of the `_so_obj` attribute."""
    return self._so_obj

@property
def nsrs(self):
    """The NSR mapping held by this NSM."""
    return self._nsrs

@property
def nsds(self):
    """The NSD mapping held by this NSM."""
    return self._nsds

@property
def vnfds(self):
    """The VNFD mapping held by this NSM."""
    return self._vnfds

@property
def vnfrs(self):
    """The VNFR mapping held by this NSM."""
    return self._vnfrs

@property
def nsr_pub_handler(self):
    """NSR publication handler (the same object as nsr_handler)."""
    return self._nsr_handler

@property
def vnfr_pub_handler(self):
    """VNFR publication handler given at construction."""
    return self._vnfr_pub_handler

@property
def vlr_pub_handler(self):
    """VLR publication handler given at construction."""
    return self._vlr_pub_handler

@property
def vnfd_pub_handler(self):
    """VNFD publication handler given at construction."""
    return self._vnfd_pub_handler
3685
@asyncio.coroutine
def register(self):
    """Register each static DTS handler in turn, in list order."""
    for handler in self._dts_handlers:
        yield from handler.register()
3691
3692
def get_ns_by_nsr_id(self, nsr_id):
    """Return the NSR stored under `nsr_id`.

    Raises NetworkServiceRecordError when no such NSR is known.
    """
    if nsr_id in self._nsrs:
        return self._nsrs[nsr_id]

    raise NetworkServiceRecordError("NSR id %s not found" % nsr_id)
3699
def scale_nsr_out(self, nsr_id, scale_group_name, instance_id, config_xact):
    """Schedule creation of a scaling-group instance on a running NSR.

    Raises ScalingOperationError if the NSR is not in the RUNNING state.
    """
    self.log.debug("Scale out NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
                   nsr_id,
                   scale_group_name,
                   instance_id
                   )
    record = self._nsrs[nsr_id]
    if record.state != NetworkServiceRecordState.RUNNING:
        raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")

    # Runs as its own task on the loop; this method does not wait for it.
    scale_out_coro = record.create_scale_group_instance(scale_group_name, instance_id, config_xact)
    self._loop.create_task(scale_out_coro)
3711
def scale_nsr_in(self, nsr_id, scale_group_name, instance_id):
    """Schedule deletion of a scaling-group instance on a running NSR.

    Raises ScalingOperationError if the NSR is not in the RUNNING state.
    """
    self.log.debug("Scale in NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
                   nsr_id,
                   scale_group_name,
                   instance_id,
                   )
    record = self._nsrs[nsr_id]
    if record.state != NetworkServiceRecordState.RUNNING:
        raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")

    # Runs as its own task on the loop; this method does not wait for it.
    scale_in_coro = record.delete_scale_group_instance(scale_group_name, instance_id)
    self._loop.create_task(scale_in_coro)
3723
def scale_rpc_callback(self, xact, msg, action):
    """Callback handler for RPC calls

    Reads the NSR's config from DTS, adds or removes the requested
    scaling-group instance, and writes the config back over netconf. The
    work is scheduled as a task on the loop; nothing is returned.

    Args:
        xact : Transaction Handler
        msg : RPC input
        action : Scaling Action
    """
    ScalingGroupInstance = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance

    xpath = ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]').format(
        msg.nsr_id_ref)
    instance = ScalingGroupInstance.from_dict({"id": msg.instance_id})

    @asyncio.coroutine
    def get_nsr_scaling_group():
        """Return (nsr_config, scaling_group), adding the group if absent."""
        results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)

        # Stays None when the read yields nothing, so that case can be
        # reported clearly instead of failing on an unbound local.
        nsr_config = None
        for result in results:
            res = yield from result
            nsr_config = res.result

        if nsr_config is None:
            raise NetworkServiceRecordError(
                "NSR config %s not found for scaling request" % msg.nsr_id_ref)

        for scaling_group in nsr_config.scaling_group:
            if scaling_group.scaling_group_name_ref == msg.scaling_group_name_ref:
                break
        else:
            # No matching group in the config yet: add one for this request.
            scaling_group = nsr_config.scaling_group.add()
            scaling_group.scaling_group_name_ref = msg.scaling_group_name_ref

        return (nsr_config, scaling_group)

    @asyncio.coroutine
    def update_config(nsr_config):
        """Replace the running NSR config with `nsr_config` over netconf."""
        xml = self._ncclient.convert_to_xml(RwNsrYang, nsr_config)
        xml = NsrRpcDtsHandler.wrap_netconf_config_xml(xml)
        yield from self._ncclient.connect()
        yield from self._ncclient.manager.edit_config(target="running", config=xml, default_operation="replace")

    @asyncio.coroutine
    def scale_out():
        nsr_config, scaling_group = yield from get_nsr_scaling_group()
        scaling_group.instance.append(instance)
        yield from update_config(nsr_config)

    @asyncio.coroutine
    def scale_in():
        nsr_config, scaling_group = yield from get_nsr_scaling_group()
        scaling_group.instance.remove(instance)
        yield from update_config(nsr_config)

    if action == ScalingRpcHandler.ACTION.SCALE_OUT:
        self._loop.create_task(scale_out())
    else:
        self._loop.create_task(scale_in())

    # Opdata based calls, disabled for now!
    # if action == ScalingRpcHandler.ACTION.SCALE_OUT:
    #     self.scale_nsr_out(
    #           msg.nsr_id_ref,
    #           msg.scaling_group_name_ref,
    #           msg.instance_id,
    #           xact)
    # else:
    #     self.scale_nsr_in(
    #           msg.nsr_id_ref,
    #           msg.scaling_group_name_ref,
    #           msg.instance_id)
3791
def nsr_update_cfg(self, nsr_id, msg):
    """ Store msg as the nsr_cfg_msg of the NSR registered under nsr_id

    Raises KeyError if no NSR is stored under nsr_id.
    """
    self._nsrs[nsr_id].nsr_cfg_msg = msg
3795
@asyncio.coroutine
def nsr_instantiate_vl(self, nsr_id, vld):
    """ Create a VL instance for the passed VLD in a running NSR

    Arguments:
        nsr_id - key of the NSR in self._nsrs
        vld - the VLD handed to the NSR's create_vl_instance

    Raises:
        KeyError - if no NSR is stored under nsr_id
        NsrVlUpdateError - if the NSR is not in the RUNNING state
    """
    # Lazy %-args: the VLD is only rendered when debug logging is enabled
    self.log.debug("NSR %s create VL %s", nsr_id, vld)
    nsr = self._nsrs[nsr_id]
    if nsr.state != NetworkServiceRecordState.RUNNING:
        raise NsrVlUpdateError("Cannot perform VL instantiate if NSR is not in running state")

    # Not calling in a separate task as this is called from a separate task
    yield from nsr.create_vl_instance(vld)
3804
@asyncio.coroutine
def nsr_terminate_vl(self, nsr_id, vld):
    """ Delete the VL instance for the passed VLD from a running NSR

    Arguments:
        nsr_id - key of the NSR in self._nsrs
        vld - the VLD handed to the NSR's delete_vl_instance

    Raises:
        KeyError - if no NSR is stored under nsr_id
        NsrVlUpdateError - if the NSR is not in the RUNNING state
    """
    self.log.debug("NSR %s delete VL %s", nsr_id, vld.id)
    nsr = self._nsrs[nsr_id]
    if nsr.state != NetworkServiceRecordState.RUNNING:
        raise NsrVlUpdateError("Cannot perform VL terminate if NSR is not in running state")

    # Not calling in a separate task as this is called from a separate task
    yield from nsr.delete_vl_instance(vld)
3813
def create_nsr(self, nsr_msg, key_pairs=None, restart_mode=False):
    """ Create an NSR instance and register it under nsr_msg.id

    Arguments:
        nsr_msg - NSR message; its id, nsd and cloud_account are read
        key_pairs - passed through to NetworkServiceRecord and the RO plugin
        restart_mode - passed through to NetworkServiceRecord

    Returns:
        the newly created NetworkServiceRecord

    Raises:
        NetworkServiceRecordError - if an NSR with this id already exists
    """
    already_known = nsr_msg.id in self._nsrs
    if already_known:
        msg = "NSR id %s already exists" % nsr_msg.id
        self._log.error(msg)
        raise NetworkServiceRecordError(msg)

    self._log.info(
        "Create NetworkServiceRecord nsr id %s from nsd_id %s",
        nsr_msg.id, nsr_msg.nsd.id)

    ro_plugin = self._ro_plugin_selector.ro_plugin
    sdn_account = self._cloud_account_handler.get_cloud_account_sdn_name(
        nsr_msg.cloud_account)

    record = NetworkServiceRecord(
        self._dts, self._log, self._loop, self,
        ro_plugin, nsr_msg, sdn_account, key_pairs,
        restart_mode=restart_mode)

    # The record is stored before the RO plugin is told about the new NSR
    self._nsrs[nsr_msg.id] = record
    ro_plugin.create_nsr(nsr_msg, nsr_msg.nsd, key_pairs)

    return record
3842
def delete_nsr(self, nsr_id):
    """ Drop the NSR stored under nsr_id; raises KeyError if there is none """
    self._nsrs.pop(nsr_id)
3848
@asyncio.coroutine
def instantiate_ns(self, nsr_id, config_xact):
    """ Instantiate the NS instance registered under nsr_id

    The instantiation is delegated to the NSR's own nsm_plugin.

    Raises:
        NetworkServiceRecordError - if nsr_id is not a known NSR
    """
    self._log.debug("Instantiating Network service id %s", nsr_id)

    if nsr_id in self._nsrs:
        record = self._nsrs[nsr_id]
        yield from record.nsm_plugin.instantiate_ns(record, config_xact)
        return

    err = "NSR id %s not found " % nsr_id
    self._log.error(err)
    raise NetworkServiceRecordError(err)
3860
@asyncio.coroutine
def update_vnfr(self, vnfr):
    """ Update the stored VNFR from the passed vnfr, then refresh its NSR

    The record is looked up in self._vnfrs by vnfr.id (KeyError if absent).
    find_nsr_for_vnfr returns None when no NSR holds this VNFR; this method
    does not check for that, so the final update_state call then fails with
    AttributeError.
    """
    current_state = self._vnfrs[vnfr.id].state
    self._log.debug("Updating VNFR with state %s: vnfr %s", current_state, vnfr)

    record = self._vnfrs[vnfr.id]
    yield from record.update_state(vnfr)

    owner = self.find_nsr_for_vnfr(vnfr.id)
    yield from owner.update_state()
3871
def find_nsr_for_vnfr(self, vnfr_id):
    """ Find the NSR which has the passed vnfr id

    Returns the first NSR in self.nsrs with a VNFR whose id equals vnfr_id,
    or None when no NSR has one.
    """
    for candidate in list(self.nsrs.values()):
        if any(rec.id == vnfr_id for rec in list(candidate.vnfrs.values())):
            return candidate
    return None
3879
def delete_vnfr(self, vnfr_id):
    """ Drop the VNFR stored under vnfr_id; raises KeyError if there is none """
    self._vnfrs.pop(vnfr_id)
3883
def get_nsd_ref(self, nsd_id):
    """ Look up the network service descriptor for nsd_id, call ref() on it
    and return it """
    descriptor = self.get_nsd(nsd_id)
    descriptor.ref()
    return descriptor
3890
@asyncio.coroutine
def get_nsr_config(self, nsd_id):
    """ Return the first configured NSR whose nsd.id equals nsd_id

    Reads "C,/nsr:ns-instance-config" through DTS and scans the nsr list of
    every result. Returns None when no NSR matches.
    """
    config_xpath = "C,/nsr:ns-instance-config"
    query = yield from self._dts.query_read(config_xpath, rwdts.XactFlag.MERGE)

    for pending in query:
        entry = yield from pending
        for nsr_cfg in entry.result.nsr:
            if nsr_cfg.nsd.id == nsd_id:
                return nsr_cfg

    return None
3905
@asyncio.coroutine
def nsd_unref_by_nsr_id(self, nsr_id):
    """ Unref the network service descriptor based on NSR id

    Raises:
        NetworkServiceDescriptorUnrefError - if nsr_id is not a known NSR
    """
    self._log.debug("NSR Unref called for Nsr Id:%s", nsr_id)
    if nsr_id not in self._nsrs:
        self._log.error("Cannot find NSR with id %s", nsr_id)
        # The format string previously had no placeholder, so building the
        # message raised TypeError instead of the intended error.
        raise NetworkServiceDescriptorUnrefError("No NSR with id %s" % nsr_id)

    nsr = self._nsrs[nsr_id]
    try:
        nsd = self.get_nsd(nsr.nsd_id)
        self._log.debug("Releasing ref on NSD %s held by NSR %s - Curr %d",
                        nsd.id, nsr.id, nsd.ref_count)
        nsd.unref()
    except NetworkServiceDescriptorError:
        # We store a copy of NSD in NSR and the NSD in nsd-catalog
        # could be deleted
        pass
3926
@asyncio.coroutine
def nsd_unref(self, nsd_id):
    """ Look up the network service descriptor for nsd_id and unref it """
    self.get_nsd(nsd_id).unref()
3932
def get_nsd(self, nsd_id):
    """ Get network service descriptor for the passed nsd_id

    Raises:
        NetworkServiceDescriptorError - if nsd_id is not a known NSD
    """
    if nsd_id not in self._nsds:
        self._log.error("Cannot find NSD id:%s", nsd_id)
        # Format the message here; exceptions do not apply logging-style args
        raise NetworkServiceDescriptorError("Cannot find NSD id:%s" % nsd_id)

    return self._nsds[nsd_id]
3940
def create_nsd(self, nsd_msg):
    """ Create a network service descriptor and store it under nsd_msg.id

    Returns:
        the new NetworkServiceDescriptor

    Raises:
        NetworkServiceDescriptorError - if an NSD with this id already exists
    """
    self._log.debug("Create network service descriptor - %s", nsd_msg)
    if nsd_msg.id in self._nsds:
        self._log.error("Cannot create NSD %s -NSD ID already exists", nsd_msg)
        # Format the message here; exceptions do not apply logging-style args
        raise NetworkServiceDescriptorError("NSD already exists-%s" % nsd_msg.id)

    nsd = NetworkServiceDescriptor(
        self._dts,
        self._log,
        self._loop,
        nsd_msg,
        self,
    )
    self._nsds[nsd_msg.id] = nsd

    return nsd
3958
def update_nsd(self, nsd):
    """ Update the network service descriptor, creating it when unknown

    A known nsd.id has update(nsd) called on its stored descriptor; an
    unknown one is handed to create_nsd.
    """
    self._log.debug("Update network service descriptor - %s", nsd)

    if nsd.id in self._nsds:
        self._log.debug("Updating NSD id = %s, nsd = %s", nsd.id, nsd)
        self._nsds[nsd.id].update(nsd)
        return

    self._log.debug("No NSD found - creating NSD id = %s", nsd.id)
    self.create_nsd(nsd)
3968
def delete_nsd(self, nsd_id):
    """ Delete the Network service descriptor with the passed id

    Raises:
        NetworkServiceDescriptorNotFound - if nsd_id is not a known NSD
    """
    self._log.debug("Deleting the network service descriptor - %s", nsd_id)
    if nsd_id not in self._nsds:
        self._log.debug("Delete NSD failed - cannot find nsd-id %s", nsd_id)
        # Format the message here; exceptions do not apply logging-style args
        raise NetworkServiceDescriptorNotFound("Cannot find %s" % nsd_id)

    # NOTE(review): a second guard used to follow that was meant to raise
    # NetworkServiceDescriptorRefCountExists, but it repeated the membership
    # test above and could never fire. It is removed, so behaviour is
    # unchanged: the NSD is deleted regardless of its ref_count. Callers
    # that must refuse deleting a referenced NSD can check nsd_in_use()
    # first - confirm whether the guard should be enforced here.
    del self._nsds[nsd_id]
3986
def get_vnfd_config(self, xact):
    """ Create a VNFD for every config element of the transaction whose id
    is not yet in self._vnfds

    The elements come from the VNFD DTS handler's registration handle.
    """
    registration = self._vnfd_dts_handler.regh
    for element in registration.get_xact_elements(xact):
        if element.id in self._vnfds:
            continue
        self.create_vnfd(element)
3992
def get_vnfd(self, vnfd_id, xact):
    """ Get virtual network function descriptor for the passed vnfd_id

    When the id is not in self._vnfds yet, the VNFD config elements of the
    passed transaction are loaded first (get_vnfd_config) and the lookup is
    retried.

    Raises:
        VnfDescriptorError - if the id is still unknown after the reload
    """
    if vnfd_id not in self._vnfds:
        self._log.error("Cannot find VNFD id:%s", vnfd_id)
        self.get_vnfd_config(xact)

        if vnfd_id not in self._vnfds:
            self._log.error("Cannot find VNFD id:%s", vnfd_id)
            # Format the message here; exceptions do not apply logging-style args
            raise VnfDescriptorError("Cannot find VNFD id:%s" % vnfd_id)

    return self._vnfds[vnfd_id]
4004
def create_vnfd(self, vnfd):
    """ Create a virtual network function descriptor

    Stores the passed vnfd under vnfd.id and returns it.

    Raises:
        VnfDescriptorError - if a VNFD with this id already exists
    """
    self._log.debug("Create virtual network function descriptor - %s", vnfd)
    if vnfd.id in self._vnfds:
        self._log.error("Cannot create VNFD %s -VNFD ID already exists", vnfd)
        # Format the message here; exceptions do not apply logging-style args
        raise VnfDescriptorError("VNFD already exists-%s" % vnfd.id)

    self._vnfds[vnfd.id] = vnfd
    return self._vnfds[vnfd.id]
4014
def update_vnfd(self, vnfd):
    """ Update the virtual network function descriptor, creating it when unknown

    A known vnfd.id has its stored descriptor replaced by the passed one; an
    unknown one is handed to create_vnfd.
    """
    self._log.debug("Update virtual network function descriptor- %s", vnfd)

    if vnfd.id in self._vnfds:
        self._log.debug("Updating VNFD id = %s, vnfd = %s", vnfd.id, vnfd)
        self._vnfds[vnfd.id] = vnfd
        return

    self._log.debug("No VNFD found - creating VNFD id = %s", vnfd.id)
    self.create_vnfd(vnfd)
4026
@asyncio.coroutine
def delete_vnfd(self, vnfd_id):
    """ Delete the virtual network function descriptor with the passed id

    Raises:
        VnfDescriptorError - if vnfd_id is not a known VNFD
    """
    self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
    if vnfd_id not in self._vnfds:
        self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
        # Format the message here; exceptions do not apply logging-style args
        raise VnfDescriptorError("Cannot find %s" % vnfd_id)

    del self._vnfds[vnfd_id]
4036
def nsd_in_use(self, nsd_id):
    """ Is the NSD with the passed id in use

    Returns the stored descriptor's in_use() result, or False when the id
    is unknown.
    """
    self._log.debug("Is this NSD in use - msg:%s", nsd_id)
    if nsd_id not in self._nsds:
        return False
    return self._nsds[nsd_id].in_use()
4043
@asyncio.coroutine
def publish_nsr(self, xact, path, msg):
    """ Publish a NSR: hand xact, path and msg to the NSR handler's update """
    self._log.debug("Publish NSR with path %s, msg %s", path, msg)
    handler = self.nsr_handler
    yield from handler.update(xact, path, msg)
4050
@asyncio.coroutine
def unpublish_nsr(self, xact, path):
    """ Un Publish an NSR: ask the NSR handler to delete the passed path """
    self._log.debug("Publishing delete NSR with path %s", path)
    handler = self.nsr_handler
    # NOTE(review): delete is called as (path, xact) here, while
    # NsmRecordsPublisherProxy.unpublish_nsr calls its handler's delete as
    # (xact, path) - confirm against the handler's signature.
    yield from handler.delete(path, xact)
4056
def vnfr_is_ready(self, vnfr_id):
    """ VNFR with the id is ready

    Calls is_ready() on the VNFR stored under vnfr_id.

    Raises:
        VirtualNetworkFunctionRecordError - if vnfr_id is not a known VNFR
    """
    self._log.debug("VNFR id %s ready", vnfr_id)
    # Look the id up in the VNFR table (the check used to consult the VNFD
    # table, which is keyed by descriptor ids, not record ids).
    if vnfr_id not in self._vnfrs:
        err = "Did not find VNFR ID with id %s" % vnfr_id
        # Log the actual message rather than the literal string "err"
        self._log.critical(err)
        raise VirtualNetworkFunctionRecordError(err)
    self._vnfrs[vnfr_id].is_ready()
4065
@asyncio.coroutine
def get_nsd_refcount(self, nsd_id):
    """ Build the NSD ref count entries known to this NSM

    Arguments:
        nsd_id - None or "" selects every NSD; any other value selects the
                 NSD stored under that id, if there is one

    Returns:
        a list of (xpath, NsdRefCount message) tuples; empty when nsd_id is
        given but unknown
    """

    def refcount_entry(key_id, descriptor):
        """ (xpath, message) pair for one descriptor; key_id goes in the xpath """
        message = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount()
        message.nsd_id_ref = descriptor.id
        message.instance_ref_count = descriptor.ref_count
        entry_xpath = (NsdRefCountDtsHandler.XPATH +
                       "[rw-nsr:nsd-id-ref = '{}']").format(key_id)
        return (entry_xpath, message)

    if nsd_id is None or nsd_id == "":
        return [refcount_entry(nsd.id, nsd) for nsd in self._nsds.values()]

    if nsd_id in self._nsds:
        return [refcount_entry(nsd_id, self._nsds[nsd_id])]

    return []
4089
@asyncio.coroutine
def terminate_ns(self, nsr_id, xact):
    """ Terminate network service for the given NSR Id

    Steps, in order: terminate the NSR, unref its NSD, unpublish the NSR
    record with the passed xact, and delete the NSR from this manager.
    """
    # Terminate the instances/networks associated with this network service
    self._log.debug("Terminating the network service %s", nsr_id)
    try:
        yield from self._nsrs[nsr_id].terminate()
    except Exception:
        # A failed terminate is logged only; the cleanup below still runs
        self.log.exception("Failed to terminate NSR[id=%s]", nsr_id)

    # Unref the NSD
    yield from self.nsd_unref_by_nsr_id(nsr_id)

    # Unpublish the NSR record
    self._log.debug("Unpublishing the network service %s", nsr_id)
    yield from self._nsrs[nsr_id].unpublish(xact)

    # Finally delete the NS instance from this NS Manager
    self._log.debug("Deletng the network service %s", nsr_id)
    self.delete_nsr(nsr_id)
4113
4114
class NsmRecordsPublisherProxy(object):
    """ Publisher interface that lets plugin objects publish and unpublish
    NSR/VNFR/VLR records through the handlers given at construction """

    def __init__(self, dts, log, loop, nsr_pub_hdlr, vnfr_pub_hdlr, vlr_pub_hdlr):
        self._dts = dts
        self._log = log
        self._loop = loop
        # One publisher handler per record type
        self._nsr_pub_hdlr = nsr_pub_hdlr
        self._vnfr_pub_hdlr = vnfr_pub_hdlr
        self._vlr_pub_hdlr = vlr_pub_hdlr

    @asyncio.coroutine
    def publish_nsr(self, xact, nsr):
        """ Publish an NSR at the xpath derived from it """
        nsr_path = NetworkServiceRecord.xpath_from_nsr(nsr)
        outcome = yield from self._nsr_pub_hdlr.update(xact, nsr_path, nsr)
        return outcome

    @asyncio.coroutine
    def unpublish_nsr(self, xact, nsr):
        """ Unpublish an NSR from the xpath derived from it """
        nsr_path = NetworkServiceRecord.xpath_from_nsr(nsr)
        outcome = yield from self._nsr_pub_hdlr.delete(xact, nsr_path)
        return outcome

    @asyncio.coroutine
    def publish_vnfr(self, xact, vnfr):
        """ Publish a VNFR at the xpath derived from it """
        vnfr_path = VirtualNetworkFunctionRecord.vnfr_xpath(vnfr)
        outcome = yield from self._vnfr_pub_hdlr.update(xact, vnfr_path, vnfr)
        return outcome

    @asyncio.coroutine
    def unpublish_vnfr(self, xact, vnfr):
        """ Unpublish a VNFR from the xpath derived from it """
        vnfr_path = VirtualNetworkFunctionRecord.vnfr_xpath(vnfr)
        outcome = yield from self._vnfr_pub_hdlr.delete(xact, vnfr_path)
        return outcome

    @asyncio.coroutine
    def publish_vlr(self, xact, vlr):
        """ Publish a VLR at the xpath derived from it """
        vlr_path = VirtualLinkRecord.vlr_xpath(vlr)
        outcome = yield from self._vlr_pub_hdlr.update(xact, vlr_path, vlr)
        return outcome

    @asyncio.coroutine
    def unpublish_vlr(self, xact, vlr):
        """ Unpublish a VLR from the xpath derived from it """
        vlr_path = VirtualLinkRecord.vlr_xpath(vlr)
        outcome = yield from self._vlr_pub_hdlr.delete(xact, vlr_path)
        return outcome
4162
4163
class ScalingRpcHandler(mano_dts.DtsHandler):
    """ DTS handler for the exec-scale-in and exec-scale-out RPCs

    Each RPC is forwarded to the optional callback as
    callback(xact, msg, action) and answered with the instance id that was
    scaled. A failure in either step is logged and answered with a NACK.
    """
    SCALE_IN_INPUT_XPATH = "I,/nsr:exec-scale-in"
    SCALE_IN_OUTPUT_XPATH = "O,/nsr:exec-scale-in"

    SCALE_OUT_INPUT_XPATH = "I,/nsr:exec-scale-out"
    SCALE_OUT_OUTPUT_XPATH = "O,/nsr:exec-scale-out"

    ACTION = Enum('ACTION', 'SCALE_IN SCALE_OUT')

    def __init__(self, log, dts, loop, callback=None):
        super().__init__(log, dts, loop)
        self.callback = callback
        # Last auto-assigned instance id, keyed by scaling group name
        self.last_instance_id = defaultdict(int)

    @asyncio.coroutine
    def register(self):
        """ Register the prepare handlers for both scaling RPCs with DTS """

        @asyncio.coroutine
        def on_scale_in_prepare(xact_info, action, ks_path, msg):
            """ Handle exec-scale-in: run the callback and ACK the instance id """
            assert action == rwdts.QueryAction.RPC

            try:
                if self.callback:
                    self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN)

                rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({
                    "instance_id": msg.instance_id})

                xact_info.respond_xpath(
                    rwdts.XactRspCode.ACK,
                    self.__class__.SCALE_IN_OUTPUT_XPATH,
                    rpc_op)

            except Exception as e:
                self.log.exception(e)
                xact_info.respond_xpath(
                    rwdts.XactRspCode.NACK,
                    self.__class__.SCALE_IN_OUTPUT_XPATH)

        @asyncio.coroutine
        def on_scale_out_prepare(xact_info, action, ks_path, msg):
            """ Handle exec-scale-out: assign an instance id when the RPC
            carries none, run the callback and ACK the instance id """
            assert action == rwdts.QueryAction.RPC

            try:
                scaling_group = msg.scaling_group_name_ref
                if not msg.instance_id:
                    # Key the counter on the group name. The previous code
                    # used the name scale_group, which resolves to the
                    # imported module of that name, so every scaling group
                    # shared a single counter.
                    self.last_instance_id[scaling_group] += 1
                    msg.instance_id = self.last_instance_id[scaling_group]

                if self.callback:
                    self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT)

                rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({
                    "instance_id": msg.instance_id})

                xact_info.respond_xpath(
                    rwdts.XactRspCode.ACK,
                    self.__class__.SCALE_OUT_OUTPUT_XPATH,
                    rpc_op)

            except Exception as e:
                self.log.exception(e)
                xact_info.respond_xpath(
                    rwdts.XactRspCode.NACK,
                    self.__class__.SCALE_OUT_OUTPUT_XPATH)

        scale_in_hdl = rift.tasklets.DTS.RegistrationHandler(
            on_prepare=on_scale_in_prepare)
        scale_out_hdl = rift.tasklets.DTS.RegistrationHandler(
            on_prepare=on_scale_out_prepare)

        with self.dts.group_create() as group:
            group.register(
                xpath=self.__class__.SCALE_IN_INPUT_XPATH,
                handler=scale_in_hdl,
                flags=rwdts.Flag.PUBLISHER)
            group.register(
                xpath=self.__class__.SCALE_OUT_INPUT_XPATH,
                handler=scale_out_hdl,
                flags=rwdts.Flag.PUBLISHER)
4246
4247
class NsmTasklet(rift.tasklets.Tasklet):
    """
    The network service manager tasklet

    start() creates the DTS api object; the DTS state callback then runs
    init(), which registers the publishers, the RO plugin selector, the
    cloud account subscriber and the VNFFG manager, and finally builds and
    registers the NsManager.
    """
    def __init__(self, *args, **kwargs):
        super(NsmTasklet, self).__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("nsm")

        self._dts = None
        self._nsm = None

        self._ro_plugin_selector = None
        self._vnffgmgr = None
        # Assigned in init(); declared here so the attribute always exists
        self._cloud_account_handler = None

        self._nsr_handler = None
        # Assigned in init(); declared here so the attribute always exists
        self._nsr_pub_handler = None
        self._vnfr_pub_handler = None
        self._vlr_pub_handler = None
        self._vnfd_pub_handler = None
        self._scale_cfg_handler = None

        self._records_publisher_proxy = None

    def start(self):
        """ The task start callback """
        super(NsmTasklet, self).start()
        self.log.info("Starting NsmTasklet")

        self.log.debug("Registering with dts")
        self._dts = rift.tasklets.DTS(self.tasklet_info,
                                      RwNsmYang.get_schema(),
                                      self.loop,
                                      self.on_dts_state_change)

        self.log.debug("Created DTS Api GI Object: %s", self._dts)

    def stop(self):
        """ The task stop callback: deinit DTS, reporting and re-raising any failure """
        try:
            self._dts.deinit()
        except Exception:
            print("Caught Exception in NSM stop:", sys.exc_info()[0])
            raise

    def on_instance_started(self):
        """ Task instance started callback """
        self.log.debug("Got instance started callback")

    @asyncio.coroutine
    def init(self):
        """ Task init callback """
        # This message used to repeat the "instance started" text
        self.log.debug("Got init callback")

        self.log.debug("creating config account handler")

        self._nsr_pub_handler = publisher.NsrOpDataDtsHandler(self._dts, self.log, self.loop)
        yield from self._nsr_pub_handler.register()

        self._vnfr_pub_handler = publisher.VnfrPublisherDtsHandler(self._dts, self.log, self.loop)
        yield from self._vnfr_pub_handler.register()

        self._vlr_pub_handler = publisher.VlrPublisherDtsHandler(self._dts, self.log, self.loop)
        yield from self._vlr_pub_handler.register()

        # SSL settings for the VNFD publisher come from the tasklet manifest
        manifest = self.tasklet_info.get_pb_manifest()
        use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
        ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
        ssl_key = manifest.bootstrap_phase.rwsecurity.key

        self._vnfd_pub_handler = publisher.VnfdPublisher(use_ssl, ssl_cert, ssl_key, self.loop)

        self._records_publisher_proxy = NsmRecordsPublisherProxy(
            self._dts,
            self.log,
            self.loop,
            self._nsr_pub_handler,
            self._vnfr_pub_handler,
            self._vlr_pub_handler,
        )

        # Register the NSM to receive the nsm plugin
        # when cloud account is configured
        self._ro_plugin_selector = cloud.ROAccountPluginSelector(
            self._dts,
            self.log,
            self.loop,
            self._records_publisher_proxy,
        )
        yield from self._ro_plugin_selector.register()

        self._cloud_account_handler = cloud.CloudAccountConfigSubscriber(
            self._log,
            self._dts,
            self.log_hdl)

        yield from self._cloud_account_handler.register()

        self._vnffgmgr = rwvnffgmgr.VnffgMgr(self._dts, self.log, self.log_hdl, self.loop)
        yield from self._vnffgmgr.register()

        self._nsm = NsManager(
            self._dts,
            self.log,
            self.loop,
            self._nsr_pub_handler,
            self._vnfr_pub_handler,
            self._vlr_pub_handler,
            self._ro_plugin_selector,
            self._vnffgmgr,
            self._vnfd_pub_handler,
            self._cloud_account_handler
        )

        yield from self._nsm.register()

    @asyncio.coroutine
    def run(self):
        """ Task run callback """
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """
        # DTS state to move to once the handler for the current state is done
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self.log.debug("Changing state to %s", next_state)
            self._dts.handle.set_state(next_state)