e600b9af38d2701101f09fe6f5f9c6de6ee3dab7
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / rwnsmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
18 import asyncio
19 import ncclient
20 import ncclient.asyncio_manager
21 import os
22 import shutil
23 import sys
24 import tempfile
25 import time
26 import uuid
27 import yaml
28 import requests
29 import json
30
31
32 from collections import deque
33 from collections import defaultdict
34 from enum import Enum
35
36 import gi
37 gi.require_version('RwYang', '1.0')
38 gi.require_version('RwNsdYang', '1.0')
39 gi.require_version('RwDts', '1.0')
40 gi.require_version('RwNsmYang', '1.0')
41 gi.require_version('RwNsrYang', '1.0')
42 gi.require_version('RwTypes', '1.0')
43 gi.require_version('RwVlrYang', '1.0')
44 gi.require_version('RwVnfrYang', '1.0')
45 from gi.repository import (
46 RwYang,
47 RwNsrYang,
48 NsrYang,
49 NsdYang,
50 RwVlrYang,
51 VnfrYang,
52 RwVnfrYang,
53 RwNsmYang,
54 RwsdnalYang,
55 RwDts as rwdts,
56 RwTypes,
57 ProtobufC,
58 )
59
60 import rift.tasklets
61 import rift.mano.ncclient
62 import rift.mano.config_data.config
63 import rift.mano.dts as mano_dts
64
65 from . import rwnsm_conman as conman
66 from . import cloud
67 from . import publisher
68 from . import xpath
69 from . import config_value_pool
70 from . import rwvnffgmgr
71 from . import scale_group
72
73
class NetworkServiceRecordState(Enum):
    """States a network service record (NSR) can be in.

    The numeric values are part of the definition and must stay stable;
    note that 105 is not assigned.
    """
    # Bring-up
    INIT = 101
    VL_INIT_PHASE = 102
    VNF_INIT_PHASE = 103
    VNFFG_INIT_PHASE = 104

    # Steady state and scaling
    RUNNING = 106
    SCALING_OUT = 107
    SCALING_IN = 108

    # Tear-down
    TERMINATE = 109
    TERMINATE_RCVD = 110
    VL_TERMINATE_PHASE = 111
    VNF_TERMINATE_PHASE = 112
    VNFFG_TERMINATE_PHASE = 113
    TERMINATED = 114

    FAILED = 115

    # Single virtual link operations
    VL_INSTANTIATE = 116
    VL_TERMINATE = 117
92
93
class NetworkServiceRecordError(Exception):
    """Error related to a network service record (NSR)."""
97
98
class NetworkServiceDescriptorError(Exception):
    """Error related to a network service descriptor (NSD)."""
102
103
class VirtualNetworkFunctionRecordError(Exception):
    """Error related to a virtual network function record (VNFR)."""
107
108
class NetworkServiceDescriptorNotFound(Exception):
    """The requested network service descriptor could not be found."""
112
113
class NetworkServiceDescriptorRefCountExists(Exception):
    """A network service descriptor still has a reference count.

    This class used to be declared under the name
    NetworkServiceDescriptorNotFound, silently replacing the "not found"
    exception declared just before it.  It now has its own name so that the
    two conditions are distinct types.
    """
117
class NsrInstantiationFailed(Exception):
    """Instantiation of a network service failed."""
121
122
class VnfInstantiationFailed(Exception):
    """Instantiation of a virtual network function failed."""
126
127
class VnffgInstantiationFailed(Exception):
    """Instantiation of a VNFFG failed."""
131
132
class VnfDescriptorError(Exception):
    """Error related to a VNF descriptor."""
136
137
class ScalingOperationError(Exception):
    """Scaling operation error."""
140
141
class ScaleGroupMissingError(Exception):
    """Scale group missing error."""
144
145
class PlacementGroupError(Exception):
    """Placement group error."""
148
149
class NsrNsdUpdateError(Exception):
    """NSR/NSD update error; base class of NsrVlUpdateError."""
152
153
class NsrVlUpdateError(NsrNsdUpdateError):
    """NSR virtual link update error (a kind of NsrNsdUpdateError)."""
156
157
class VlRecordState(Enum):
    """States of a virtual link record (VLR)."""
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    TERMINATE_PENDING = 104
    TERMINATED = 105
    # Terminal error state
    FAILED = 106
166
167
class VnffgRecordState(Enum):
    """States of a VNFFG record (VNFFGR)."""
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    TERMINATE_PENDING = 104
    TERMINATED = 105
    # Terminal error state
    FAILED = 106
176
177
class VnffgRecord(object):
    """VNFFG record (VNFFGR) that belongs to a network service record.

    The record is derived from a VNFFG descriptor message and is created in,
    fetched from and removed from the VNFFG manager handed to the constructor.
    """
    SFF_DP_PORT = 4790
    SFF_MGMT_PORT = 5000
    # Seconds to sleep between two polls of a VNFR that is not running yet.
    VNFR_POLL_INTERVAL = 2

    def __init__(self, dts, log, loop, vnffgmgr, nsr, nsr_name, vnffgd_msg, sdn_account_name):
        """Store the collaborators and allocate a new VNFFGR id.

        Arguments:
            dts - DTS handle (only stored by this class)
            log - logger
            loop - asyncio event loop used for the polling sleeps
            vnffgmgr - object providing create_vnffgr(), fetch_vnffgr() and
                       terminate_vnffgr()
            nsr - owning network service record
            nsr_name - name of the owning network service record
            vnffgd_msg - VNFFG descriptor message this record is built from
            sdn_account_name - SDN account name; None is stored as ''
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnffgmgr = vnffgmgr
        self._nsr = nsr
        self._nsr_name = nsr_name
        self._vnffgd_msg = vnffgd_msg
        self._sdn_account_name = '' if sdn_account_name is None else sdn_account_name

        self._vnffgr_id = str(uuid.uuid4())
        self._vnffgr_rsp_id = list()
        self._vnffgr_state = VnffgRecordState.INIT

    @property
    def id(self):
        """ VNFFGR id """
        return self._vnffgr_id

    @property
    def state(self):
        """ State of this VNFFGR """
        return self._vnffgr_state

    def _status_only_vnffgr(self, operational_status):
        """ Build a VNFFGR message holding only the ids and the given status """
        vnffgr_dict = {"id": self._vnffgr_id,
                       "vnffgd_id_ref": self._vnffgd_msg.id,
                       "vnffgd_name_ref": self._vnffgd_msg.name,
                       "sdn_account": self._sdn_account_name,
                       "operational_status": operational_status,
                       }
        return NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)

    @asyncio.coroutine
    def _fetch_running_vnfr(self, nsr_vnfr):
        """ Fetch the VNFR of nsr_vnfr, polling until it reports 'running'

        Raises:
            NsrInstantiationFailed - when the VNFR reports 'failed'
        """
        vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
        self._log.debug(" Received VNFR is %s", vnfr)
        while vnfr.operational_status != 'running':
            self._log.info("Received vnf op status is %s; retrying", vnfr.operational_status)
            if vnfr.operational_status == 'failed':
                self._log.error("Fetching VNFR for %s failed", vnfr.id)
                # Report the id of the network service, not of this VNFFGR
                raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" %
                                             (self._nsr.id, vnfr.id))
            yield from asyncio.sleep(VnffgRecord.VNFR_POLL_INTERVAL, loop=self._loop)
            vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
            self._log.debug("Received VNFR is %s", vnfr)
        return vnfr

    def fetch_vnffgr(self):
        """
        Get VNFFGR message to be published

        In the INIT and TERMINATED states a status-only message is built
        locally.  In every other state the message is fetched from the VNFFG
        manager; if that fetch raises, the record is moved to FAILED and a
        status-only 'failed' message is returned instead.
        """
        if self._vnffgr_state == VnffgRecordState.INIT:
            return self._status_only_vnffgr('init')

        if self._vnffgr_state == VnffgRecordState.TERMINATED:
            return self._status_only_vnffgr('terminated')

        try:
            return self._vnffgmgr.fetch_vnffgr(self._vnffgr_id)
        except Exception:
            self._log.exception("Fetching VNFFGR for VNFFG with id %s failed", self._vnffgr_id)
            self._vnffgr_state = VnffgRecordState.FAILED
            return self._status_only_vnffgr('failed')

    @asyncio.coroutine
    def vnffgr_create_msg(self):
        """ Build the VNFFGR message used to create this VNFFG

        One RSP entry is added per RSP of the descriptor and one classifier
        entry per descriptor classifier whose RSP was found.  For every
        matching VNFR this waits until the VNFR is running.

        Raises:
            NsrInstantiationFailed - when a referenced VNFR reports 'failed'
        """
        vnffgr_dict = {"id": self._vnffgr_id,
                       "vnffgd_id_ref": self._vnffgd_msg.id,
                       "vnffgd_name_ref": self._vnffgd_msg.name,
                       "sdn_account": self._sdn_account_name,
                       }
        vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)

        for rsp in self._vnffgd_msg.rsp:
            vnffgr_rsp = vnffgr.rsp.add()
            vnffgr_rsp.id = str(uuid.uuid4())
            vnffgr_rsp.name = self._nsr.name + '.' + rsp.name
            self._vnffgr_rsp_id.append(vnffgr_rsp.id)
            vnffgr_rsp.vnffgd_rsp_id_ref = rsp.id
            vnffgr_rsp.vnffgd_rsp_name_ref = rsp.name
            for rsp_cp_ref in rsp.vnfd_connection_point_ref:
                vnfd = [vnfr.vnfd for vnfr in self._nsr.vnfrs.values()
                        if vnfr.vnfd.id == rsp_cp_ref.vnfd_id_ref]
                self._log.debug("VNFD message during VNFFG instantiation is %s", vnfd)
                if vnfd and vnfd[0].has_field('service_function_type'):
                    self._log.debug("Service Function Type for VNFD ID %s is %s",
                                    rsp_cp_ref.vnfd_id_ref, vnfd[0].service_function_type)
                else:
                    self._log.error("Service Function Type not available for VNFD ID %s; Skipping in chain",
                                    rsp_cp_ref.vnfd_id_ref)
                    continue

                vnfr_cp_ref = vnffgr_rsp.vnfr_connection_point_ref.add()
                vnfr_cp_ref.member_vnf_index_ref = rsp_cp_ref.member_vnf_index_ref
                vnfr_cp_ref.hop_number = rsp_cp_ref.order
                vnfr_cp_ref.vnfd_id_ref = rsp_cp_ref.vnfd_id_ref
                vnfr_cp_ref.service_function_type = vnfd[0].service_function_type
                for nsr_vnfr in self._nsr.vnfrs.values():
                    if (nsr_vnfr.vnfd.id != vnfr_cp_ref.vnfd_id_ref or
                            nsr_vnfr.member_vnf_index != vnfr_cp_ref.member_vnf_index_ref):
                        continue

                    vnfr_cp_ref.vnfr_id_ref = nsr_vnfr.id
                    vnfr_cp_ref.vnfr_name_ref = nsr_vnfr.name
                    vnfr_cp_ref.vnfr_connection_point_ref = rsp_cp_ref.vnfd_connection_point_ref

                    vnfr = yield from self._fetch_running_vnfr(nsr_vnfr)

                    cp_params = vnfr_cp_ref.connection_point_params
                    cp_params.mgmt_address = vnfr.mgmt_interface.ip_address
                    for cp in vnfr.connection_point:
                        if cp.name != vnfr_cp_ref.vnfr_connection_point_ref:
                            continue

                        cp_params.port_id = cp.connection_point_id
                        cp_params.name = self._nsr.name + '.' + cp.name
                        for vdu in vnfr.vdur:
                            for ext_intf in vdu.external_interface:
                                if ext_intf.name == vnfr_cp_ref.vnfr_connection_point_ref:
                                    cp_params.vm_id = vdu.vim_id
                                    self._log.debug("VIM ID for CP %s in VNFR %s is %s", cp.name, nsr_vnfr.id,
                                                    cp_params.vm_id)
                                    # Only leaves the interface loop; remaining VDUs are still scanned
                                    break

                        cp_params.address = cp.ip_address
                        cp_params.port = VnffgRecord.SFF_DP_PORT

        for vnffgd_classifier in self._vnffgd_msg.classifier:
            _rsp = [rsp for rsp in vnffgr.rsp if rsp.vnffgd_rsp_id_ref == vnffgd_classifier.rsp_id_ref]
            if not _rsp:
                self._log.error("RSP with ID %s not found during classifier creation for classifier id %s",
                                vnffgd_classifier.rsp_id_ref, vnffgd_classifier.id)
                continue

            rsp_id_ref = _rsp[0].id
            rsp_name = _rsp[0].name
            vnffgr_classifier = vnffgr.classifier.add()
            vnffgr_classifier.id = vnffgd_classifier.id
            vnffgr_classifier.name = self._nsr.name + '.' + vnffgd_classifier.name
            _rsp[0].classifier_name = vnffgr_classifier.name
            vnffgr_classifier.rsp_id_ref = rsp_id_ref
            vnffgr_classifier.rsp_name = rsp_name
            for nsr_vnfr in self._nsr.vnfrs.values():
                if (nsr_vnfr.vnfd.id != vnffgd_classifier.vnfd_id_ref or
                        nsr_vnfr.member_vnf_index != vnffgd_classifier.member_vnf_index_ref):
                    continue

                vnffgr_classifier.vnfr_id_ref = nsr_vnfr.id
                vnffgr_classifier.vnfr_name_ref = nsr_vnfr.name
                vnffgr_classifier.vnfr_connection_point_ref = vnffgd_classifier.vnfd_connection_point_ref

                if nsr_vnfr.vnfd.service_function_chain == 'CLASSIFIER':
                    vnffgr_classifier.sff_name = nsr_vnfr.name

                vnfr = yield from self._fetch_running_vnfr(nsr_vnfr)

                for cp in vnfr.connection_point:
                    if cp.name != vnffgr_classifier.vnfr_connection_point_ref:
                        continue

                    vnffgr_classifier.port_id = cp.connection_point_id
                    vnffgr_classifier.ip_address = cp.ip_address
                    for vdu in vnfr.vdur:
                        for ext_intf in vdu.external_interface:
                            if ext_intf.name == vnffgr_classifier.vnfr_connection_point_ref:
                                vnffgr_classifier.vm_id = vdu.vim_id
                                # Log the classifier's own vm_id; the RSP loop
                                # variable may not even be bound at this point.
                                self._log.debug("VIM ID for CP %s in VNFR %s is %s", cp.name, nsr_vnfr.id,
                                                vnffgr_classifier.vm_id)
                                break

        self._log.info("VNFFGR msg to be sent is %s", vnffgr)
        return vnffgr

    @asyncio.coroutine
    def vnffgr_nsr_sff_list(self):
        """ Build the SFF list, keyed by VNFD id, for the VNFRs of the NSR

        Only VNFRs whose VNFD service_function_chain is 'CLASSIFIER' or 'SFF'
        contribute an entry; 'SFF' entries additionally list every 'SF' VNFR.

        Raises:
            NsrInstantiationFailed - when a polled VNFR reports 'failed'
        """
        sff_list = {}
        sf_list = [nsr_vnfr.name for nsr_vnfr in self._nsr.vnfrs.values()
                   if nsr_vnfr.vnfd.service_function_chain == 'SF']

        for nsr_vnfr in self._nsr.vnfrs.values():
            if nsr_vnfr.vnfd.service_function_chain not in ('CLASSIFIER', 'SFF'):
                continue

            vnfr = yield from self._fetch_running_vnfr(nsr_vnfr)

            sff = RwsdnalYang.VNFFGSff()
            sff_list[nsr_vnfr.vnfd.id] = sff
            sff.name = nsr_vnfr.name
            sff.function_type = nsr_vnfr.vnfd.service_function_chain

            sff.mgmt_address = vnfr.mgmt_interface.ip_address
            sff.mgmt_port = VnffgRecord.SFF_MGMT_PORT
            for cp in vnfr.connection_point:
                sff_dp = sff.dp_endpoints.add()
                sff_dp.name = self._nsr.name + '.' + cp.name
                sff_dp.address = cp.ip_address
                sff_dp.port = VnffgRecord.SFF_DP_PORT
            if nsr_vnfr.vnfd.service_function_chain == 'SFF':
                for sf_name in sf_list:
                    _sf = sff.vnfr_list.add()
                    _sf.vnfr_name = sf_name

        return sff_list

    @asyncio.coroutine
    def instantiate(self):
        """ Instantiate this VNFFG

        Raises:
            NsrInstantiationFailed - when the VNFFG manager fails to create
                                     the VNFFGR or a needed VNFR has failed
        """
        self._log.info("Instaniating VNFFGR with vnffgd %s",
                       self._vnffgd_msg)

        vnffgr_request = yield from self.vnffgr_create_msg()
        vnffg_sff_list = yield from self.vnffgr_nsr_sff_list()

        try:
            vnffgr = self._vnffgmgr.create_vnffgr(vnffgr_request,
                                                  self._vnffgd_msg.classifier,
                                                  vnffg_sff_list)
        except Exception as e:
            self._log.exception("VNFFG instantiation failed: %s", str(e))
            self._vnffgr_state = VnffgRecordState.FAILED
            # Report the id of the network service, not of this VNFFGR
            raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFFGR %s failure" %
                                         (self._nsr.id, vnffgr_request.id)) from e

        self._vnffgr_state = VnffgRecordState.INSTANTIATION_PENDING

        self._log.info("Instantiated VNFFGR :%s", vnffgr)
        self._vnffgr_state = VnffgRecordState.ACTIVE

        self._log.info("Invoking update_state to update NSR state for NSR ID: %s", self._nsr.id)
        yield from self._nsr.update_state()

    def vnffgr_in_vnffgrm(self):
        """ Is this VNFFGR in a state (ACTIVE, INSTANTIATION_PENDING or
        FAILED) in which terminate() will act on it """
        return self._vnffgr_state in (VnffgRecordState.ACTIVE,
                                      VnffgRecordState.INSTANTIATION_PENDING,
                                      VnffgRecordState.FAILED)

    @asyncio.coroutine
    def terminate(self):
        """ Terminate this VNFFGR

        The request is ignored (and logged) unless vnffgr_in_vnffgrm() holds.
        """
        if not self.vnffgr_in_vnffgrm():
            self._log.error("Ignoring terminate request for id %s in state %s",
                            self.id, self._vnffgr_state)
            return

        self._log.info("Terminating VNFFGR id:%s", self.id)
        self._vnffgr_state = VnffgRecordState.TERMINATE_PENDING

        self._vnffgmgr.terminate_vnffgr(self._vnffgr_id)

        self._vnffgr_state = VnffgRecordState.TERMINATED
        self._log.debug("Terminated VNFFGR id:%s", self.id)
449
450
class VirtualLinkRecord(object):
    """ Virtual Link Record (VLR) created from a virtual link descriptor

    The record creates and deletes its entry under XPATH through DTS
    transactions and tracks its own state as a VlRecordState.
    """
    XPATH = "D,/vlr:vlr-catalog/vlr:vlr"

    @staticmethod
    @asyncio.coroutine
    def create_record(dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id, restart_mode=False):
        """Creates a new VLR object based on the given data.

        If restart mode is enabled, then we look for existing records in the
        DTS and create a VLR records using the exiting data(ID)

        Returns:
            VirtualLinkRecord
        """
        vlr_obj = VirtualLinkRecord(
            dts,
            log,
            loop,
            nsr_name,
            vld_msg,
            cloud_account_name,
            om_datacenter,
            ip_profile,
            nsr_id,
        )

        if restart_mode:
            res_iter = yield from dts.query_read(
                VirtualLinkRecord.XPATH,
                rwdts.XactFlag.MERGE)

            for fut in res_iter:
                response = yield from fut
                vlr = response.result

                # Check if the record is already present, if so use the ID of
                # the existing record. Since the name of the record is uniquely
                # formed we can use it as a search key!
                if vlr.name == vlr_obj.name:
                    vlr_obj.reset_id(vlr.id)
                    break

        return vlr_obj

    def __init__(self, dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id):
        """Store the inputs, allocate a new VLR id and start in INIT state.

        Arguments:
            dts - DTS handle used for the create/delete transactions
            log - logger
            loop - asyncio event loop (only stored by this class)
            nsr_name - name of the owning network service record
            vld_msg - virtual link descriptor message
            cloud_account_name - cloud account the VLR is created in
            om_datacenter - datacenter the VLR is created in
            ip_profile - optional object with an ip_profile_params field
            nsr_id - id of the owning network service record
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsr_name = nsr_name
        self._vld_msg = vld_msg
        self._cloud_account_name = cloud_account_name
        self._om_datacenter_name = om_datacenter
        self._assigned_subnet = None
        self._nsr_id = nsr_id
        self._ip_profile = ip_profile
        self._vlr_id = str(uuid.uuid4())
        self._state = VlRecordState.INIT
        self._prev_state = None
        self._create_time = int(time.time())

    @property
    def xpath(self):
        """ path for this object """
        return (VirtualLinkRecord.XPATH + "[vlr:id = '{}']").format(self._vlr_id)

    @property
    def id(self):
        """ VLR id """
        return self._vlr_id

    @property
    def nsr_name(self):
        """ Get NSR name for this VL """
        # Return the stored attribute; returning self.nsr_name here would
        # re-enter this property and recurse until RecursionError.
        return self._nsr_name

    @property
    def vld_msg(self):
        """ Virtual Link Descriptor """
        return self._vld_msg

    @property
    def assigned_subnet(self):
        """ Subnet assigned to this VL (None until instantiate() succeeds) """
        return self._assigned_subnet

    @property
    def name(self):
        """
        Get the name for this VLR.

        The VIM network name of the VLD wins when it is set; a VLD named
        "multisite" keeps its own name; otherwise the name is
        "<nsr name>.<VLD name>".
        """
        if self.vld_msg.vim_network_name:
            return self.vld_msg.vim_network_name

        if self.vld_msg.name == "multisite":
            # This is a temporary hack to identify manually provisioned inter-site network
            return self.vld_msg.name

        return self._nsr_name + "." + self.vld_msg.name

    @property
    def cloud_account_name(self):
        """ Cloud account that this VLR should be created in """
        return self._cloud_account_name

    @property
    def om_datacenter_name(self):
        """ Datacenter that this VLR should be created in """
        return self._om_datacenter_name

    @staticmethod
    def vlr_xpath(vlr):
        """ Get the VLR path from VLR """
        return (VirtualLinkRecord.XPATH + "[vlr:id = '{}']").format(vlr.id)

    @property
    def state(self):
        """ VLR state """
        return self._state

    @state.setter
    def state(self, value):
        """ VLR set state """
        self._state = value

    @property
    def prev_state(self):
        """ VLR previous state """
        return self._prev_state

    @prev_state.setter
    def prev_state(self, value):
        """ VLR set previous state """
        self._prev_state = value

    @property
    def vlr_msg(self):
        """ Virtual Link Record message for Creating VLR in VNS """
        vld_fields = ("short_name",
                      "vendor",
                      "description",
                      "version",
                      "type_yang",
                      "vim_network_name",
                      "provider_network")

        vld_copy_dict = {k: v for k, v in self.vld_msg.as_dict().items()
                         if k in vld_fields}

        vlr_dict = {"id": self._vlr_id,
                    "nsr_id_ref": self._nsr_id,
                    "vld_ref": self.vld_msg.id,
                    "name": self.name,
                    "create_time": self._create_time,
                    "cloud_account": self.cloud_account_name,
                    "om_datacenter": self.om_datacenter_name,
                    }

        if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
            vlr_dict['ip_profile_params'] = self._ip_profile.ip_profile_params.as_dict()

        vlr_dict.update(vld_copy_dict)
        return RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)

    def reset_id(self, vlr_id):
        """ Replace the generated VLR id (used when an existing record is reused) """
        self._vlr_id = vlr_id

    def create_nsr_vlr_msg(self, vnfrs):
        """ The VLR message published inside the NSR

        A connection point reference is added for every VNFR in vnfrs that
        matches a VLD connection point by VNFD id and member index and that
        lives in the same cloud account and datacenter as this VLR.
        """
        nsr_vlr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vlr()
        nsr_vlr.vlr_ref = self._vlr_id
        nsr_vlr.assigned_subnet = self.assigned_subnet
        nsr_vlr.cloud_account = self.cloud_account_name
        nsr_vlr.om_datacenter = self.om_datacenter_name

        for conn in self.vld_msg.vnfd_connection_point_ref:
            for vnfr in vnfrs:
                if (vnfr.vnfd.id == conn.vnfd_id_ref and
                        vnfr.member_vnf_index == conn.member_vnf_index_ref and
                        self.cloud_account_name == vnfr.cloud_account_name and
                        self.om_datacenter_name == vnfr.om_datacenter_name):
                    cp_entry = nsr_vlr.vnfr_connection_point_ref.add()
                    cp_entry.vnfr_id = vnfr.id
                    cp_entry.connection_point = conn.vnfd_connection_point_ref

        return nsr_vlr

    @asyncio.coroutine
    def instantiate(self):
        """ Instantiate this VL

        Raises:
            NsrInstantiationFailed - when the create query returns no result
                                     or the returned VLR reports 'failed'
        """
        self._log.debug("Instaniating VLR key %s, vld %s",
                        self.xpath, self._vld_msg)
        vlr = None
        self._state = VlRecordState.INSTANTIATION_PENDING

        with self._dts.transaction(flags=0) as xact:
            block = xact.block_create()
            block.add_query_create(self.xpath, self.vlr_msg)
            self._log.debug("Executing VL create path:%s msg:%s",
                            self.xpath, self.vlr_msg)
            res_iter = yield from block.execute(now=True)
            for ent in res_iter:
                res = yield from ent
                vlr = res.result

            if vlr is None:
                self._state = VlRecordState.FAILED
                # Report the id of the network service, not of this VLR
                raise NsrInstantiationFailed("Failed NS %s instantiation due to empty response" % self._nsr_id)

        if vlr.operational_status == 'failed':
            self._log.debug("NS Id:%s VL creation failed for vlr id %s", self._nsr_id, vlr.id)
            self._state = VlRecordState.FAILED
            raise NsrInstantiationFailed("Failed VL %s instantiation (%s)" % (vlr.id, vlr.operational_status_details))

        self._log.info("Instantiated VL with xpath %s and vlr:%s",
                       self.xpath, vlr)
        self._state = VlRecordState.ACTIVE
        self._assigned_subnet = vlr.assigned_subnet

    def vlr_in_vns(self):
        """ Is there a VLR record in VNS """
        return self._state in (VlRecordState.ACTIVE,
                               VlRecordState.INSTANTIATION_PENDING,
                               VlRecordState.TERMINATE_PENDING,
                               VlRecordState.FAILED)

    @asyncio.coroutine
    def terminate(self):
        """ Terminate this VL

        The request is ignored (and logged) unless vlr_in_vns() holds.
        """
        if not self.vlr_in_vns():
            self._log.debug("Ignoring terminate request for id %s in state %s",
                            self.id, self._state)
            return

        self._log.debug("Terminating VL id:%s", self.id)
        self._state = VlRecordState.TERMINATE_PENDING

        with self._dts.transaction(flags=0) as xact:
            block = xact.block_create()
            block.add_query_delete(self.xpath)
            yield from block.execute(flags=0, now=True)

        self._state = VlRecordState.TERMINATED
        self._log.debug("Terminated VL id:%s", self.id)
700
701
class VnfRecordState(Enum):
    """States of a virtual network function record (VNFR)."""
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    TERMINATE_PENDING = 104
    TERMINATED = 105
    # Terminal error state
    FAILED = 106
710
711
712 class VirtualNetworkFunctionRecord(object):
713 """ Virtual Network Function Record class"""
714 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
715
@staticmethod
@asyncio.coroutine
def create_record(dts, log, loop, vnfd, const_vnfd_msg, nsd_id, nsr_name,
                  cloud_account_name, om_datacenter_name, nsr_id, group_name, group_instance_id,
                  placement_groups, restart_mode=False):
    """Build a VNFR object, reusing a stored id when restarting.

    Without restart mode the freshly built record is returned as is.  With
    restart mode the VNFR catalog is read from DTS and, if a stored record
    carries the same name as the new one, the new record takes over its id.

    Returns:
        VirtualNetworkFunctionRecord
    """
    record = VirtualNetworkFunctionRecord(
        dts, log, loop,
        vnfd, const_vnfd_msg,
        nsd_id, nsr_name,
        cloud_account_name, om_datacenter_name,
        nsr_id,
        group_name, group_instance_id,
        placement_groups,
        restart_mode=restart_mode)

    if not restart_mode:
        return record

    query_results = yield from dts.query_read(
        "D,/vnfr:vnfr-catalog/vnfr:vnfr",
        rwdts.XactFlag.MERGE)

    for pending in query_results:
        reply = yield from pending
        stored = reply.result

        # The record name is used as the search key for the stored record
        if stored.name == record.name:
            record.reset_id(stored.id)
            break

    return record
759
def __init__(self,
             dts,
             log,
             loop,
             vnfd,
             const_vnfd_msg,
             nsd_id,
             nsr_name,
             cloud_account_name,
             om_datacenter_name,
             nsr_id,
             group_name=None,
             group_instance_id=None,
             placement_groups=None,
             restart_mode=False):
    """Initialise the VNFR, merge its config and build its VNFR message.

    Arguments:
        dts - DTS handle
        log - logger
        loop - asyncio event loop
        vnfd - VNF descriptor this record is created from
        const_vnfd_msg - constituent VNFD message (provides member_vnf_index)
        nsd_id - id of the network service descriptor
        nsr_name - name of the owning network service record
        cloud_account_name - cloud account the VNF is created in
        om_datacenter_name - datacenter the VNF is created in
        nsr_id - id of the owning network service record
        group_name - optional group name, becomes part of the VNFR name
        group_instance_id - optional group instance id; needs group_name
        placement_groups - optional iterable of placement group infos;
                           None means "no placement groups"
        restart_mode - stored on the instance as restart_mode

    Raises:
        ValueError - when group_instance_id is given without group_name
    """
    # Reject inconsistent arguments before doing any work
    if group_name is None and group_instance_id is not None:
        raise ValueError("Group instance id must not be provided with an empty group name")

    self._dts = dts
    self._log = log
    self._loop = loop
    self._vnfd = vnfd
    self._const_vnfd_msg = const_vnfd_msg
    self._nsd_id = nsd_id
    self._nsr_name = nsr_name
    self._nsr_id = nsr_id
    self._cloud_account_name = cloud_account_name
    self._om_datacenter_name = om_datacenter_name
    self._group_name = group_name
    self._group_instance_id = group_instance_id
    # A fresh list per instance; a [] default would be shared by all instances
    self._placement_groups = [] if placement_groups is None else placement_groups
    self._config_status = NsrYang.ConfigStates.INIT
    self._create_time = int(time.time())

    self._prev_state = VnfRecordState.INIT
    self._state = VnfRecordState.INIT
    self._state_failed_reason = None

    self.config_store = rift.mano.config_data.config.ConfigStore(self._log)
    self.configure()

    # id and name cache must exist before the VNFR message is built
    self._vnfr_id = str(uuid.uuid4())
    self._name = None
    self._vnfr_msg = self.create_vnfr_msg()
    self._log.debug("Set VNFR {} config type to {}".
                    format(self.name, self.config_type))
    self.restart_mode = restart_mode
808
@property
def id(self):
    """Identifier of this VNFR."""
    return self._vnfr_id
813
@property
def xpath(self):
    """Keyed xpath of this VNFR in the VNFR catalog."""
    template = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id = '{}']"
    return template.format(self.id)
818
@property
def vnfr_msg(self):
    """The VNFR message currently held by this record."""
    return self._vnfr_msg
823
@property
def const_vnfr_msg(self):
    """Constituent VNFR reference message (id, cloud account, datacenter)."""
    return RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConstituentVnfrRef(
        vnfr_id=self.id,
        cloud_account=self.cloud_account_name,
        om_datacenter=self._om_datacenter_name,
    )
828
@property
def vnfd(self):
    """The VNF descriptor this record was created from."""
    return self._vnfd
833
@property
def cloud_account_name(self):
    """Name of the cloud account this VNF should be created in."""
    return self._cloud_account_name
838
@property
def om_datacenter_name(self):
    """Name of the datacenter this VNF should be created in."""
    return self._om_datacenter_name
843
844
@property
def active(self):
    """True when this VNF is in the ACTIVE state, False otherwise."""
    return self._state == VnfRecordState.ACTIVE
849
@property
def state(self):
    """Current state of this VNF record."""
    return self._state
854
@property
def state_failed_reason(self):
    """Error message recorded for a failed VNF (None when unset)."""
    return self._state_failed_reason
859
@property
def member_vnf_index(self):
    """Member VNF index taken from the constituent VNFD message."""
    constituent = self._const_vnfd_msg
    return constituent.member_vnf_index
864
@property
def nsr_name(self):
    """Name of the network service record this VNFR belongs to."""
    return self._nsr_name
869
@property
def name(self):
    """Name of this VNFR, computed once and then cached.

    The name joins, with "__": the NSR name, the group name and group
    instance id when they are set, the VNFD name and the member VNF index.
    """
    if self._name is None:
        parts = [self._nsr_name]

        if self._group_name is not None:
            parts.append(self._group_name)

        if self._group_instance_id is not None:
            parts.append(str(self._group_instance_id))

        parts.append(self.vnfd.name)
        parts.append(str(self.member_vnf_index))

        self._name = "__".join(parts)

    return self._name
889
@staticmethod
def vnfr_xpath(vnfr):
    """Return the keyed VNFR catalog xpath for the given VNFR."""
    template = VirtualNetworkFunctionRecord.XPATH + "[vnfr:id = '{}']"
    return template.format(vnfr.id)
894
@property
def config_type(self):
    """First of 'netconf', 'juju', 'script' set on the VNFD's
    vnf_configuration, or 'none' when none of them is set."""
    vnf_config = self._vnfd.vnf_configuration
    candidates = ('netconf', 'juju', 'script')
    return next((kind for kind in candidates if vnf_config.has_field(kind)), 'none')
902
@property
def config_status(self):
    """Config status of this VNFR as a YANG enum string.

    Returns 'config_not_needed' when the config type is 'none', otherwise
    'configured' or 'failed' for those states and 'configuring' for
    everything else.
    """
    message = "Map VNFR {} config status {} ({})".format(
        self.name, self._config_status, self.config_type)
    self._log.debug(message)

    if self.config_type == 'none':
        return 'config_not_needed'

    if self._config_status == NsrYang.ConfigStates.CONFIGURED:
        return 'configured'

    if self._config_status == NsrYang.ConfigStates.FAILED:
        return 'failed'

    return 'configuring'
916
def set_state(self, state):
    """Move to the given state, remembering the current one as previous."""
    self._prev_state, self._state = self._state, state
921
def reset_id(self, vnfr_id):
    """Adopt the given VNFR id and rebuild the VNFR message with it."""
    self._vnfr_id = vnfr_id
    rebuilt = self.create_vnfr_msg()
    self._vnfr_msg = rebuilt
925
def configure(self):
    """Merge this VNF's descriptor config into the config store."""
    store = self.config_store
    store.merge_vnfd_config(self._nsd_id, self._vnfd, self.member_vnf_index)
932
def create_vnfr_msg(self):
    """Assemble and return the VNFR message for this record.

    The message combines the record's own identity fields with a handful
    of fields copied from the VNFD, the VNFD itself, the member index, the
    VNF configuration, the management port (when the VNFD sets one) and the
    placement groups.
    """
    copied_fields = (
        "short_name",
        "vendor",
        "description",
        "version",
        "type_yang",
    )
    from_vnfd = {}
    for key, value in self._vnfd.as_dict().items():
        if key in copied_fields:
            from_vnfd[key] = value

    vnfr_dict = {
        "id": self.id,
        "nsr_id_ref": self._nsr_id,
        "name": self.name,
        "cloud_account": self._cloud_account_name,
        "om_datacenter": self._om_datacenter_name,
        "config_status": self.config_status
    }
    vnfr_dict.update(from_vnfd)

    vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
    vnfr.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(
        self.vnfd.as_dict(),
        ignore_missing_keys=True)
    vnfr.member_vnf_index_ref = self.member_vnf_index
    vnfr.vnf_configuration.from_dict(self._vnfd.vnf_configuration.as_dict())

    mgmt = self._vnfd.mgmt_interface
    if mgmt.has_field("port"):
        vnfr.mgmt_interface.port = self._vnfd.mgmt_interface.port

    for placement in self._placement_groups:
        entry = vnfr.placement_groups_info.add()
        entry.from_dict(placement.as_dict())

    # UI expects the monitoring param field to exist
    vnfr.monitoring_param = []

    self._log.debug("Get vnfr_msg for VNFR {} : {}".format(self.name, vnfr))
    return vnfr
971
@asyncio.coroutine
def update_vnfm(self):
    """Send the current VNFR message to this record's xpath as a DTS update."""
    notice = "Send an update to VNFM for VNFR {} with {}".format(self.name, self.vnfr_msg)
    self._log.debug(notice)

    target = self.xpath
    yield from self._dts.query_update(target, rwdts.XactFlag.TRACE, self.vnfr_msg)
981
def get_config_status(self):
    """Return the raw config status value (the YANG enum, not its string)."""
    return self._config_status
985
@asyncio.coroutine
def set_config_status(self, status):
    """Record a new config status and, when it changed, publish it.

    Nothing is changed once the record is already CONFIGURED.  When the
    status differs from the current one it is stored, mirrored as a string
    into the VNFR message and, unless the new status is INIT, sent out via
    update_vnfm().  Exceptions from either step are logged, not raised.

    NOTE(review): the nesting of the last two steps under the
    "status changed" check is reconstructed - confirm against the original
    indentation.

    Arguments:
        status - NsrYang.ConfigStates value to move to
    """

    def status_to_string(status):
        # Map the YANG enum value to the string stored in the VNFR message
        status_dc = {
            NsrYang.ConfigStates.INIT : 'init',
            NsrYang.ConfigStates.CONFIGURING : 'configuring',
            NsrYang.ConfigStates.CONFIG_NOT_NEEDED : 'config_not_needed',
            NsrYang.ConfigStates.CONFIGURED : 'configured',
            NsrYang.ConfigStates.FAILED : 'failed',
        }

        return status_dc[status]

    self._log.debug("Update VNFR {} from {} ({}) to {}".
                    format(self.name, self._config_status,
                           self.config_type, status))
    # CONFIGURED is final here: later updates are logged and dropped
    if self._config_status == NsrYang.ConfigStates.CONFIGURED:
        self._log.error("Updating already configured VNFR {}".
                        format(self.name))
        return

    if self._config_status != status:
        try:
            self._config_status = status
            # I don't think this is used. Original implementor can check.
            # Caused Exception, so corrected it by status_to_string
            # But not sure whats the use of this variable?
            self.vnfr_msg.config_status = status_to_string(status)
        except Exception as e:
            # Best effort: the new status stays stored even if the message
            # could not be updated
            self._log.error("Exception=%s", str(e))
            pass

        self._log.debug("Updated VNFR {} status to {}".format(self.name, status))

        if self._config_status != NsrYang.ConfigStates.INIT:
            try:
                # Publish only after VNFM has the VNFR created
                yield from self.update_vnfm()
            except Exception as e:
                self._log.error("Exception updating VNFM with new status {} of VNFR {}: {}".
                            format(status, self.name, e))
                self._log.exception(e)
1029
def is_configured(self):
    """ True when the VNF needs no config or has reached CONFIGURED """
    return bool(self.config_type == 'none' or
                self._config_status == NsrYang.ConfigStates.CONFIGURED)
1038
@asyncio.coroutine
def instantiate(self, nsr):
    """ Instantiate this VNFR

    Moves the record to INSTANTIATION_PENDING, attaches a connection
    point entry for every VNFD connection point that has a matching VLR
    in the given NSR, and then publishes the VNFR through DTS (create
    normally, update when running in restart mode).
    """
    self._log.debug("Instaniating VNFR key %s, vnfd %s",
                    self.xpath, self._vnfd)
    self._log.debug("Create VNF with xpath %s and vnfr %s",
                    self.xpath, self.vnfr_msg)

    self.set_state(VnfRecordState.INSTANTIATION_PENDING)

    def find_vlr_for_cp(conn):
        """ VLR of the NSR referencing this connection point, or None """
        for candidate in nsr.vlrs:
            referenced = any(
                ref.vnfd_id_ref == self._vnfd.id and
                ref.vnfd_connection_point_ref == conn.name and
                ref.member_vnf_index_ref == self.member_vnf_index and
                candidate.cloud_account_name == self.cloud_account_name
                for ref in candidate.vld_msg.vnfd_connection_point_ref)
            if referenced:
                self._log.debug("Found VLR for cp_name:%s and vnf-index:%d",
                                conn.name, self.member_vnf_index)
                return candidate
        return None

    # Give every connection point of the VNFD the id of its VLR
    for conn_p in self._vnfd.connection_point:
        cpr = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint()
        cpr.name = conn_p.name
        cpr.type_yang = conn_p.type_yang
        if conn_p.has_field('port_security_enabled'):
            cpr.port_security_enabled = conn_p.port_security_enabled

        vlr_ref = find_vlr_for_cp(conn_p)
        if vlr_ref is not None:
            cpr.vlr_ref = vlr_ref.id
            self.vnfr_msg.connection_point.append(cpr)
            self._log.debug("Connection point [%s] added, vnf id=%s vnfd id=%s",
                            cpr, self.vnfr_msg.id, self.vnfr_msg.vnfd.id)
        else:
            # A connection point without a VLR is skipped, not an error
            msg = "Failed to find VLR for cp = %s" % conn_p.name
            self._log.debug("%s", msg)

    # Restart mode updates the already published record instead of creating it
    publish = self._dts.query_update if self.restart_mode else self._dts.query_create
    yield from publish(self.xpath, 0, self.vnfr_msg)

    self._log.info("Created VNF with xpath %s and vnfr %s",
                   self.xpath, self.vnfr_msg)

    self._log.info("Instantiated VNFR with xpath %s and vnfd %s, vnfr %s",
                   self.xpath, self._vnfd, self.vnfr_msg)
1098
@asyncio.coroutine
def update_state(self, vnfr_msg):
    """ React to the operational status reported in vnfr_msg

    "running" marks the record active unless its own message already
    reports running; "failed" marks the instantiation as failed with
    the reported details. Any other status is ignored.
    """
    reported = vnfr_msg.operational_status
    if reported == "failed":
        yield from self.instantiation_failed(
            failed_reason=vnfr_msg.operational_status_details)
    elif reported == "running" and self.vnfr_msg.operational_status != "running":
        yield from self.is_active()
1107
@asyncio.coroutine
def is_active(self):
    """ Mark this VNFR as ACTIVE """
    record_id = self._vnfr_id
    self._log.debug("VNFR %s is active", record_id)
    self.set_state(VnfRecordState.ACTIVE)
1113
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """ Mark this VNFR as FAILED and remember why

    failed_reason is stored as given (None when not supplied).
    """
    record_id = self._vnfr_id
    self._log.error("VNFR %s instantiation failed", record_id)
    self.set_state(VnfRecordState.FAILED)
    self._state_failed_reason = failed_reason
1120
def vnfr_in_vnfm(self):
    """ Whether this record's state is one of ACTIVE, INSTANTIATION_PENDING or FAILED

    terminate() uses this to decide whether there is a VNFR record in
    the VNFM to delete.
    """
    states_known_to_vnfm = (
        VnfRecordState.ACTIVE,
        VnfRecordState.INSTANTIATION_PENDING,
        VnfRecordState.FAILED,
    )
    return any(self._state == candidate for candidate in states_known_to_vnfm)
1129
@asyncio.coroutine
def terminate(self):
    """ Terminate this VNF

    When vnfr_in_vnfm() reports that the VNFM holds a record, the state
    moves to TERMINATE_PENDING, a DTS delete is executed on this
    record's xpath and the state becomes TERMINATED. Otherwise the
    request is logged and ignored.
    """
    if self.vnfr_in_vnfm():
        self._log.debug("Terminating VNF id:%s", self.id)
        self.set_state(VnfRecordState.TERMINATE_PENDING)

        with self._dts.transaction(flags=0) as xact:
            delete_block = xact.block_create()
            delete_block.add_query_delete(self.xpath)
            yield from delete_block.execute(flags=0)

        self.set_state(VnfRecordState.TERMINATED)
        self._log.debug("Terminated VNF id:%s", self.id)
    else:
        self._log.debug("Ignoring terminate request for id %s in state %s",
                        self.id, self._state)
1146
1147
class NetworkServiceStatus(object):
    """ Operational state of a network service plus its most recent events """

    # Only this many of the latest events are retained
    MAX_EVENTS_RECORDED = 10

    def __init__(self, dts, log, loop):
        self._dts, self._log, self._loop = dts, log, loop

        self._state = NetworkServiceRecordState.INIT
        # Bounded queue: appending to a full deque discards the oldest entry
        self._events = deque(maxlen=NetworkServiceStatus.MAX_EVENTS_RECORDED)

    @asyncio.coroutine
    def create_notification(self, evt, evt_desc, evt_details):
        """ Send an nsm-notification for the event through a DTS create query """
        notification = RwNsrYang.YangNotif_RwNsr_NsmNotification()
        notification.event = evt
        notification.description = evt_desc
        notification.details = evt_details

        yield from self._dts.query_create("N,/rw-nsr:nsm-notification",
                                          rwdts.XactFlag.ADVISE,
                                          notification)
        self._log.info("Notification called by creating dts query: %s", notification)

    def record_event(self, evt, evt_desc, evt_details):
        """ Store an event with the current time and schedule its notification """
        self._log.debug("Recording event - evt %s, evt_descr %s len = %s",
                        evt, evt_desc, len(self._events))
        stamp = int(time.time())
        self._events.append((stamp, evt, evt_desc, evt_details))

        self._loop.create_task(self.create_notification(evt, evt_desc, evt_details))

    def set_state(self, state):
        """ Replace the state held by this status object """
        self._state = state

    def yang_str(self):
        """ Yang enum string of the current state

        Raises KeyError when the state name is not one of the names
        listed here.
        """
        known_names = (
            "INIT",
            "VL_INIT_PHASE",
            "VNF_INIT_PHASE",
            "VNFFG_INIT_PHASE",
            "SCALING_GROUP_INIT_PHASE",
            "RUNNING",
            "SCALING_OUT",
            "SCALING_IN",
            "TERMINATE_RCVD",
            "TERMINATE",
            "VL_TERMINATE_PHASE",
            "VNF_TERMINATE_PHASE",
            "VNFFG_TERMINATE_PHASE",
            "TERMINATED",
            "FAILED",
            "VL_INSTANTIATE",
            "VL_TERMINATE",
        )
        # Every yang string is the lower-cased state name
        yang_names = {label: label.lower() for label in known_names}
        return yang_names[self._state.name]

    @property
    def state(self):
        """ Current state held by this status object """
        return self._state

    @property
    def msg(self):
        """ Recorded events as operational-events messages, ids counting from 1 """
        event_list = []
        for position, (stamp, evt, description, details) in enumerate(self._events, start=1):
            event = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_OperationalEvents()
            event.id = position
            event.timestamp = stamp
            event.event = evt
            event.description = description
            event.details = details
            event_list.append(event)
        return event_list
1225
1226
1227 class NetworkServiceRecord(object):
1228 """ Network service record """
1229 XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"
1230
def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg, sdn_account_name, key_pairs, restart_mode=False,
             vlr_handler=None):
    """ Set up an empty record for the NS described by nsr_cfg_msg

    No VLs, VNFs, VNFFGs or scaling groups are created here; the
    record only stores its collaborators and starts in the INIT state.
    """
    # Collaborators and configuration handed in by the caller
    self._dts = dts
    self._log = log
    self._loop = loop
    self._nsm = nsm
    self._nsm_plugin = nsm_plugin
    self._nsr_cfg_msg = nsr_cfg_msg
    self._sdn_account_name = sdn_account_name
    self._key_pairs = key_pairs
    self._vlr_handler = vlr_handler
    self.restart_mode = restart_mode

    # Lazily filled / published data
    self._nsd = None
    self._nsr_msg = None
    self._nsr_regh = None

    # Child records and helpers owned by this NSR
    self._vlrs = []
    self._vnfrs = {}
    self._vnfds = {}
    self._vnffgrs = {}
    self._param_pools = {}
    self._scaling_groups = {}

    # Status bookkeeping
    self._create_time = int(time.time())
    self._op_status = NetworkServiceStatus(dts, log, loop)
    self._config_status = NsrYang.ConfigStates.CONFIGURING
    self._config_status_details = None
    self._job_id = 0
    self.config_store = rift.mano.config_data.config.ConfigStore(self._log)
    self._debug_running = False
    self._is_active = False
    self._vl_phase_completed = False
    self._vnf_phase_completed = False

    # Initialise the state to init
    # The NSR moves through the following transitions
    # 1. INIT -> VLS_READY once all the VLs in the NSD are created
    # 2. VLS_READY - VNFS_READY when all the VNFs in the NSD are created
    # 3. VNFS_READY - READY when the NSR is published
    self.set_state(NetworkServiceRecordState.INIT)

    self.substitute_input_parameters = InputParameterSubstitution(self._log)
1273
@property
def nsm_plugin(self):
    """ The NSM plugin given to this record at construction """
    plugin = self._nsm_plugin
    return plugin
1278
def set_state(self, state):
    """ Move this NSR to a new state

    Leaving VL_INIT_PHASE or VNF_INIT_PHASE (for any next state,
    including a failure) marks that phase as completed. The new state
    is stored in the status object and passed on to the NSM plugin.
    """
    self._log.debug("Setting state to %s", state)

    leaving = self.state
    if leaving == NetworkServiceRecordState.VL_INIT_PHASE:
        self._vl_phase_completed = True
    if leaving == NetworkServiceRecordState.VNF_INIT_PHASE:
        self._vnf_phase_completed = True

    self._op_status.set_state(state)
    self._nsm_plugin.set_state(self.id, state)
1292
@property
def id(self):
    """ Id of this NSR, read from its config message """
    cfg = self._nsr_cfg_msg
    return cfg.id
1297
@property
def name(self):
    """ Name of this NSR, read from its config message """
    cfg = self._nsr_cfg_msg
    return cfg.name
1302
@property
def cloud_account_name(self):
    """ Cloud account set in the NSR config message """
    cfg = self._nsr_cfg_msg
    return cfg.cloud_account
1306
@property
def om_datacenter_name(self):
    """ om_datacenter from the NSR config message, or None when the field is not set """
    cfg = self._nsr_cfg_msg
    if not cfg.has_field('om_datacenter'):
        return None
    return cfg.om_datacenter
1312
@property
def state(self):
    """ Current state of this NSR, as held by its status object """
    status = self._op_status
    return status.state
1317
@property
def active(self):
    """ True only while the NSR state is RUNNING """
    return bool(self._op_status.state == NetworkServiceRecordState.RUNNING)
1322
@property
def vlrs(self):
    """ List of VLRs held by this NSR (the internal list itself, not a copy) """
    records = self._vlrs
    return records
1327
@property
def vnfrs(self):
    """ Dict of VNFRs held by this NSR (the internal dict itself, not a copy) """
    records = self._vnfrs
    return records
1332
@property
def vnffgrs(self):
    """ Dict of VNFFGRs held by this NSR (the internal dict itself, not a copy) """
    records = self._vnffgrs
    return records
1337
@property
def scaling_groups(self):
    """ Dict of scaling groups held by this NSR, keyed as stored by create_scaling_groups """
    groups = self._scaling_groups
    return groups
1342
@property
def param_pools(self):
    """ Dict of parameter value pools held by this NSR """
    pools = self._param_pools
    return pools
1347
@property
def nsr_cfg_msg(self):
    """ NSR config message this record is built from """
    current = self._nsr_cfg_msg
    return current

@nsr_cfg_msg.setter
def nsr_cfg_msg(self, msg):
    # Replaces the stored config message; nothing else is refreshed here
    self._nsr_cfg_msg = msg
1355
@property
def nsd_msg(self):
    """ NSD protobuf for this NSR

    Taken from the NSR config message on first access and cached.
    """
    if self._nsd is None:
        self._nsd = self._nsr_cfg_msg.nsd
    return self._nsd
1363
@property
def nsd_id(self):
    """ Id of the NSD behind this NSR """
    descriptor = self.nsd_msg
    return descriptor.id
1368
@property
def job_id(self):
    ''' Allocate the next job id for a config primitive

    Every read increments the internal counter and returns the new
    value, so two reads never return the same id.
    '''
    allocated = self._job_id + 1
    self._job_id = allocated
    return allocated
1374
@property
def config_status(self):
    """ Config status currently stored on this NSR """
    current = self._config_status
    return current
1379
def resolve_placement_group_cloud_construct(self, input_group):
    """
    Build the cloud specific construct for a placement group

    The first entry of nsd_placement_group_maps in the NSR config whose
    placement_group_ref equals input_group.name supplies the fields
    (minus the reference itself); name, requirement and strategy are
    then taken from input_group. Returns a PlacementGroupsInfo message,
    or None when no entry references the group.
    """
    taken_from_input = ('name', 'requirement', 'strategy')

    for mapping in self._nsr_cfg_msg.nsd_placement_group_maps:
        if mapping.placement_group_ref != input_group.name:
            continue

        fields = dict(mapping.as_dict())
        fields.pop('placement_group_ref', None)
        for attr in taken_from_input:
            fields[attr] = getattr(input_group, attr)

        resolved = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
        resolved.from_dict(fields)
        return resolved

    return None
1396
1397
def __str__(self):
    """ Short description: name, NSD id and cloud account """
    template = "NSR(name={}, nsd_id={}, cloud_account={})"
    return template.format(self.name, self.nsd_id, self.cloud_account_name)
1402
def _get_vnfd(self, vnfd_id, config_xact):
    """ Ask the NSM for the VNFD message with the given id """
    manager = self._nsm
    return manager.get_vnfd(vnfd_id, config_xact)
1406
def _get_vnfd_cloud_account(self, vnfd_member_index):
    """ (cloud account, OM datacenter) pair for a member VNF index

    The first vnf_cloud_account_map entry of the NSR config that
    references the index wins; without such an entry the NSR level
    defaults are returned. Either element of the pair may be None.
    """
    overrides = self._nsr_cfg_msg.vnf_cloud_account_map
    if overrides:
        for entry in overrides:
            if vnfd_member_index == entry.member_vnf_index_ref:
                return (entry.cloud_account, entry.om_datacenter)
    return (self.cloud_account_name, self.om_datacenter_name)
1415
def _get_constituent_vnfd_msg(self, vnf_index):
    """ Constituent VNFD entry of the NSD with the given member index

    Raises ValueError when the NSD has no constituent with that index.
    """
    candidates = (entry for entry in self.nsd_msg.constituent_vnfd
                  if entry.member_vnf_index == vnf_index)
    found = next(candidates, None)
    if found is None:
        raise ValueError("Constituent VNF index %s not found" % vnf_index)
    return found
1422
def record_event(self, evt, evt_desc, evt_details=None, state=None):
    """ Record an event and, when a state is given, move the NSR to it """
    self._op_status.record_event(evt, evt_desc, evt_details)
    if state is None:
        return
    self.set_state(state)
1428
def scaling_trigger_str(self, trigger):
    """ Display string for a scaling trigger

    Returns "Unknown trigger" (after logging the lookup failure) when
    the trigger is not one of the four known ScalingTrigger values.
    """
    triggers = NsdYang.ScalingTrigger
    names = dict([
        (triggers.PRE_SCALE_IN, 'pre-scale-in'),
        (triggers.POST_SCALE_IN, 'post-scale-in'),
        (triggers.PRE_SCALE_OUT, 'pre-scale-out'),
        (triggers.POST_SCALE_OUT, 'post-scale-out'),
    ])
    try:
        label = names[trigger]
    except Exception as e:
        self._log.error("Scaling trigger mapping error for {} : {}".
                        format(trigger, e))
        self._log.exception(e)
        return "Unknown trigger"
    return label
1443
@asyncio.coroutine
def instantiate_vls(self):
    """
    Instantiate every VLR of this Network Service through the NSM
    plugin, one after the other, marking each ACTIVE once the plugin
    call returns.
    """
    pending = self._vlrs
    self._log.debug("Instantiating %d VLs in NSD id %s", len(pending),
                    self.id)
    for link in pending:
        yield from self.nsm_plugin.instantiate_vl(self, link)
        link.state = VlRecordState.ACTIVE
1454
1455
@asyncio.coroutine
def create(self, config_xact):
    """ Create the records that make up this network service

    Order: virtual links, VNFs, then VNFFGs, scaling groups and
    parameter pools.
    """
    # Virtual links for all the external vnf connection points in this NS
    yield from self.create_vls()

    # VNFs of this network service
    yield from self.create_vnfs(config_xact)

    # The remaining builders are plain calls, not coroutines:
    # VNFFGs, one scaling group per NSD scaling group, parameter pools
    self.create_vnffgs()
    self.create_scaling_groups()
    self.create_param_pools()
1474
@asyncio.coroutine
def apply_scale_group_config_script(self, script, group, scale_instance, trigger, vnfrs=None):
    """ Apply config based on script for scale group

    The script is run as "<script> <yaml-file>", where the YAML file
    describes the trigger, the scale group config, the VNFRs in the
    group, all VNFRs of the NSR and the NSR itself.

    Args:
        script : absolute path, or a name relative to
                 $RIFT_INSTALL/usr/bin
        group : scaling group the config applies to
        scale_instance : scale group instance being configured
        trigger : NsdYang.ScalingTrigger value causing this config
        vnfrs : VNFRs to report as "in group"; when empty or None the
                VNFRs of scale_instance are used

    Returns:
        True when the script exited with status 0; False when no script
        was given, the script does not exist, or it exited non-zero.
    """
    # Function-scope import: only this method needs shell quoting
    import shlex

    @asyncio.coroutine
    def add_vnfrs_data(vnfrs_list):
        """ Add as a dict each of the VNFRs data """
        vnfrs_data = []
        for vnfr in vnfrs_list:
            self._log.debug("Add VNFR {} data".format(vnfr))
            vnfr_data = dict()
            vnfr_data['name'] = vnfr.name
            if trigger in [NsdYang.ScalingTrigger.PRE_SCALE_IN, NsdYang.ScalingTrigger.POST_SCALE_OUT]:
                # Get VNF management and other IPs, etc
                opdata = yield from self.fetch_vnfr(vnfr.xpath)
                self._log.debug("VNFR {} op data: {}".format(vnfr.name, opdata))
                try:
                    vnfr_data['rw_mgmt_ip'] = opdata.mgmt_interface.ip_address
                    vnfr_data['rw_mgmt_port'] = opdata.mgmt_interface.port
                except Exception as e:
                    self._log.error("Unable to get management IP for vnfr {}:{}".
                                    format(vnfr.name, e))

                try:
                    vnfr_data['connection_points'] = []
                    for cp in opdata.connection_point:
                        con_pt = dict()
                        con_pt['name'] = cp.name
                        con_pt['ip_address'] = cp.ip_address
                        vnfr_data['connection_points'].append(con_pt)
                except Exception as e:
                    self._log.error("Exception getting connections points for VNFR {}: {}".
                                    format(vnfr.name, e))

            vnfrs_data.append(vnfr_data)

        self._log.debug("VNFRs data: {}".format(vnfrs_data))
        return vnfrs_data

    def add_nsr_data(nsr):
        """ NSR section of the script input """
        nsr_data = dict()
        nsr_data['name'] = nsr.name
        return nsr_data

    if script is None or len(script) == 0:
        self._log.error("Script not provided for scale group config: {}".format(group.name))
        return False

    if script[0] == '/':
        path = script
    else:
        path = os.path.join(os.environ['RIFT_INSTALL'], "usr/bin", script)
    if not os.path.exists(path):
        self._log.error("Config faled for scale group {}: Script does not exist at {}".
                        format(group.name, path))
        return False

    # Build a YAML file with all parameters for the script to execute
    # The data consists of 5 sections
    # 1. Trigger
    # 2. Scale group config
    # 3. VNFRs in the scale group
    # 4. VNFRs outside scale group
    # 5. NSR data
    data = dict()
    data['trigger'] = group.trigger_map(trigger)
    data['config'] = group.group_msg.as_dict()

    if vnfrs:
        data["vnfrs_in_group"] = yield from add_vnfrs_data(vnfrs)
    else:
        data["vnfrs_in_group"] = yield from add_vnfrs_data(scale_instance.vnfrs)

    data["vnfrs_others"] = yield from add_vnfrs_data(self.vnfrs.values())
    data["nsr"] = add_nsr_data(self)

    # delete=False keeps the file on disk after the handle is closed so
    # the script can read it; it is removed explicitly once the script
    # has finished.
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        tmp_file.write(yaml.dump(data, default_flow_style=True)
                       .encode("UTF-8"))

    try:
        self._log.debug("Creating a temp file: {} with input data: {}".
                        format(tmp_file.name, data))

        # Quote both paths: the command is interpreted by a shell, so
        # spaces or metacharacters in a path must not be split or expanded
        cmd = "{} {}".format(shlex.quote(path), shlex.quote(tmp_file.name))
        self._log.debug("Running the CMD: {}".format(cmd))
        proc = yield from asyncio.create_subprocess_shell(cmd, loop=self._loop)
        rc = yield from proc.wait()
    finally:
        # Do not leave one input file behind per scaling event
        try:
            os.remove(tmp_file.name)
        except OSError as e:
            self._log.error("Unable to remove temp file {}: {}".
                            format(tmp_file.name, e))

    if rc:
        self._log.error("The script {} for scale group {} config returned: {}".
                        format(script, group.name, rc))
        return False

    # Success
    return True
1570
1571
@asyncio.coroutine
def apply_scaling_group_config(self, trigger, group, scale_instance, vnfrs=None):
    """ Apply the scaling group config that belongs to a trigger

    Returns False when group or scale_instance is None, True when the
    group defines no config for the trigger, and otherwise the result
    of the user defined script of the referenced NS service primitive.
    A config without a primitive reference yields False; a primitive
    without a script raises NotImplementedError; a reference to an
    unknown primitive raises ValueError.
    """
    if group is None or scale_instance is None:
        return False

    @asyncio.coroutine
    def update_config_status(success=True, err_msg=None):
        """ Fold the config outcome into the instance's config status """
        self._log.debug("Update %s config status to %r : %s",
                        scale_instance, success, err_msg)
        current = scale_instance.config_status

        if current == "failed":
            # A failed instance stays failed
            return

        if current == "configured":
            # A configured instance may only move to failed
            if success:
                return
            scale_instance.config_status = "failed"
            scale_instance.config_err_msg = err_msg
        elif trigger == NsdYang.ScalingTrigger.POST_SCALE_OUT:
            # Still configuring: only post scale out settles the status
            if success:
                scale_instance.config_status = "configured"
            else:
                scale_instance.config_status = "failed"
                scale_instance.config_err_msg = err_msg
        else:
            return

        yield from self.update_state()

    config = group.trigger_config(trigger)
    if config is None:
        return True

    self._log.debug("Scaling group {} config: {}".format(group.name, config))

    if not config.has_field("ns_config_primitive_name_ref"):
        err_msg = "Failed config for trigger {} as config primitive is not specified".\
            format(self.scaling_trigger_str(trigger))
        yield from update_config_status(success=False, err_msg=err_msg)
        self._log.error("Config primitive not specified for config action in scale group %s" %
                        (group.name))
        return False

    config_name = config.ns_config_primitive_name_ref
    config_primitive = next(
        (prim for prim in self.nsd_msg.service_primitive if prim.name == config_name),
        None)
    if config_primitive is None:
        raise ValueError("Could not find ns_cfg_prim %s in nsr %s" % (config_name, self.name))

    self._log.debug("Scaling group {} config primitive: {}".format(group.name, config_primitive))

    if not config_primitive.has_field("user_defined_script"):
        err_msg = "Failed config for trigger {} as config script is not specified". \
            format(self.scaling_trigger_str(trigger))
        yield from update_config_status(success=False, err_msg=err_msg)
        raise NotImplementedError("Only script based config support for scale group for now: {}".
                                  format(group.name))

    rc = yield from self.apply_scale_group_config_script(config_primitive.user_defined_script,
                                                         group, scale_instance, trigger, vnfrs)
    err_msg = None
    if not rc:
        err_msg = "Failed config for trigger {} using config script '{}'". \
            format(self.scaling_trigger_str(trigger),
                   config_primitive.user_defined_script)
    yield from update_config_status(success=rc, err_msg=err_msg)
    return rc
1644
def create_scaling_groups(self):
    """ Create a ScalingGroup for every scaling group descriptor in
    the NSD and store it under the group's name """
    for descriptor in self.nsd_msg.scaling_group_descriptor:
        self._log.debug("Found scaling_group %s in nsr id %s",
                        descriptor.name, self.id)

        record = scale_group.ScalingGroup(self._log, descriptor)
        self._scaling_groups[record.name] = record
1659
@asyncio.coroutine
def create_scale_group_instance(self, group_name, index, config_xact, is_default=False):
    """ Create and start instantiating one instance of a scaling group

    Creates the VNF records of the instance, publishes the NSR, applies
    the pre-scale-out config and, when that succeeds, starts the VNF
    instantiation. A failing config or any exception marks the instance
    as "failed"; exceptions are logged and not propagated.

    Args:
        group_name : name of a scaling group known to this NSR
                     (KeyError when unknown)
        index : instance index inside the group
        config_xact : config transaction used to look up VNFDs
        is_default : passed to the group when creating the instance
    """
    group = self._scaling_groups[group_name]
    scale_instance = group.create_instance(index, is_default)

    @asyncio.coroutine
    def create_vnfs():
        """ Create the VNF records of this scale instance """
        # The last argument is the scaling group, not the NSR itself
        self._log.debug("Creating %u VNFs associated with NS id %s scaling group %s",
                        len(self.nsd_msg.constituent_vnfd), self.id, group_name)

        vnfrs = []
        for vnf_index, count in group.vnf_index_count_map.items():
            const_vnfd_msg = self._get_constituent_vnfd_msg(vnf_index)
            vnfd_msg = self._get_vnfd(const_vnfd_msg.vnfd_id_ref, config_xact)

            cloud_account_name, om_datacenter_name = self._get_vnfd_cloud_account(const_vnfd_msg.member_vnf_index)
            if cloud_account_name is None:
                cloud_account_name = self.cloud_account_name
            for _ in range(count):
                vnfr = yield from self.create_vnf_record(vnfd_msg, const_vnfd_msg, cloud_account_name, om_datacenter_name, group_name, index)
                scale_instance.add_vnfr(vnfr)
                vnfrs.append(vnfr)
        return vnfrs

    @asyncio.coroutine
    def instantiate_instance():
        """ Create, configure and start the VNFs of the instance """
        self._log.debug("Creating %s VNFRS", scale_instance)
        vnfrs = yield from create_vnfs()
        yield from self.publish()

        self._log.debug("Instantiating %s VNFRS for %s", len(vnfrs), scale_instance)
        scale_instance.operational_status = "vnf_init_phase"
        yield from self.update_state()

        try:
            rc = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.PRE_SCALE_OUT,
                                                            group, scale_instance, vnfrs)
            if not rc:
                self._log.error("Pre scale out config for scale group {} ({}) failed".
                                format(group.name, index))
                scale_instance.operational_status = "failed"
            else:
                yield from self.instantiate_vnfs(vnfrs, scaleout=True)

        except Exception as e:
            # error() for the summary, exception() once for the traceback
            self._log.error("Failed to begin instantiatiation of vnfs for scale group {}: {}".
                            format(group.name, e))
            self._log.exception(e)
            scale_instance.operational_status = "failed"

        yield from self.update_state()

    yield from instantiate_instance()
1714
@asyncio.coroutine
def delete_scale_group_instance(self, group_name, index):
    """ Terminate one instance of a scaling group

    Raises ScalingOperationError for a default instance. Otherwise the
    pre-scale-in config is applied, the VNFRs of the instance are
    terminated, the instance is removed from its group and its status
    is set to "vnf_terminate_phase".
    """
    group = self._scaling_groups[group_name]
    scale_instance = group.get_instance(index)
    if scale_instance.is_default:
        raise ScalingOperationError("Cannot terminate a default scaling group instance")

    scale_instance.operational_status = "terminate"
    yield from self.update_state()

    self._log.debug("Terminating %s VNFRS" % scale_instance)
    config_ok = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.PRE_SCALE_IN,
                                                           group, scale_instance)
    if not config_ok:
        self._log.error("Pre scale in config for scale group {} ({}) failed".
                        format(group.name, index))

    # The terminate continues even after a pre-scale-in config error:
    # the error may stem from a failed scale out, and the group still
    # has to be cleaned up
    yield from self.terminate_vnfrs(scale_instance.vnfrs, scalein=True)
    group.delete_instance(index)

    scale_instance.operational_status = "vnf_terminate_phase"
    yield from self.update_state()
1743
@asyncio.coroutine
def _update_scale_group_instances_status(self):
    """ Advance scale group instances according to their VNFR states

    An instance in "vnf_init_phase" becomes "running" once all its
    VNFRs are ACTIVE (the post-scale-out config is then scheduled as a
    separate task) or "failed" as soon as one VNFR is FAILED. An
    instance in "vnf_terminate_phase" becomes "terminated" once all its
    VNFRs are TERMINATED, followed by the post-scale-in config.
    """
    @asyncio.coroutine
    def post_scale_out_task(group, instance):
        """ Post scale out config, applied once all VNFRs are active """
        config_ok = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.POST_SCALE_OUT,
                                                               group, instance)
        instance.operational_status = "running"
        if config_ok:
            self._log.debug("Scale out for group {} and instance {} succeeded".
                            format(group.name, instance.instance_id))
        else:
            self._log.error("Post scale out config for scale group {} ({}) failed".
                            format(group.name, instance.instance_id))

        yield from self.update_state()

    # Snapshot groups and their instances before walking them
    snapshot = [(grp, grp.instances) for grp in self._scaling_groups.values()]
    for group, instances in snapshot:
        self._log.debug("Updating %s instance status", group)
        for instance in instances:
            vnf_states = [vnfr.state for vnfr in instance.vnfrs]
            self._log.debug("Got vnfr instance states: %s", vnf_states)
            phase = instance.operational_status

            if phase == "vnf_init_phase":
                if all(st == VnfRecordState.ACTIVE for st in vnf_states):
                    instance.operational_status = "running"

                    # Run post scale out as its own task so that it can
                    # sleep before configuring the newly created VM's
                    self._loop.create_task(post_scale_out_task(group, instance))

                elif any(st == VnfRecordState.FAILED for st in vnf_states):
                    self._log.debug("Scale out for group {} and instance {} failed".
                                    format(group.name, instance.instance_id))
                    instance.operational_status = "failed"

            elif phase == "vnf_terminate_phase":
                if not all(st == VnfRecordState.TERMINATED for st in vnf_states):
                    continue

                instance.operational_status = "terminated"
                config_ok = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.POST_SCALE_IN,
                                                                       group, instance)
                if config_ok:
                    self._log.debug("Scale in for group {} and instance {} succeeded".
                                    format(group.name, instance.instance_id))
                else:
                    self._log.error("Post scale in config for scale group {} ({}) failed".
                                    format(group.name, instance.instance_id))
1791
def create_vnffgs(self):
    """ Create a VnffgRecord for every VNFFGD in the NSD of this NSR
    and store it under the record's id """
    for descriptor in self.nsd_msg.vnffgd:
        self._log.debug("Found vnffgd %s in nsr id %s", descriptor, self.id)
        record = VnffgRecord(
            self._dts,
            self._log,
            self._loop,
            self._nsm._vnffgmgr,
            self,
            self.name,
            descriptor,
            self._sdn_account_name,
        )
        self._vnffgrs[record.id] = record
1808
def resolve_vld_ip_profile(self, nsd_msg, vld):
    """ IP profile of the NSD referenced by the VLD

    Returns None when the VLD has no ip_profile_ref or when no profile
    of the NSD carries the referenced name; otherwise the first profile
    with that name.
    """
    self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
    if not vld.has_field('ip_profile_ref'):
        return None

    wanted = vld.ip_profile_ref
    return next((candidate for candidate in nsd_msg.ip_profiles
                 if candidate.name == wanted), None)
1815
@asyncio.coroutine
def _create_vls(self, vld, cloud_account,om_datacenter):
    """Create a VLR for the given VLD in one cloud account / datacenter

    Args:
        vld : VLD yang obj
        cloud_account : Cloud account name
        om_datacenter : OM datacenter name

    Returns:
        VirtualLinkRecord
    """
    ip_profile = self.resolve_vld_ip_profile(self.nsd_msg, vld)
    record = yield from VirtualLinkRecord.create_record(
        self._dts,
        self._log,
        self._loop,
        self.name,
        vld,
        cloud_account,
        om_datacenter,
        ip_profile,
        self.id,
        restart_mode=self.restart_mode)
    return record
1840
def _extract_cloud_accounts_for_vl(self, vld):
    """
    Collect the (cloud account, OM datacenter) pairs in which a VL has
    to be created, based on the NS config obj

    Rules:
        1. When the config has a vnf_cloud_account_map: one pair per
           VNF connection point of the VLD, taken from the map entry of
           that VNF (entries with neither account nor datacenter are
           ignored) and defaulting to the NSR level pair.
        2. When the config has a vl_cloud_account_map: for every entry
           referencing this VLD, one (account, None) pair per listed
           cloud account and one (None, datacenter) pair per listed
           OM datacenter.
        3. When neither rule produced a pair: the NSR level pair.

    Args:
        vld : VLD yang object

    Returns:
        set of (cloud_account, om_datacenter) tuples, never empty
    """
    cloud_account_list = []
    default_pair = (self.cloud_account_name, self.om_datacenter_name)

    if self._nsr_cfg_msg.vnf_cloud_account_map:
        # Handle case where cloud_account is None
        vnf_cloud_map = {}
        for vnf in self._nsr_cfg_msg.vnf_cloud_account_map:
            if vnf.cloud_account is not None or vnf.om_datacenter is not None:
                vnf_cloud_map[vnf.member_vnf_index_ref] = (vnf.cloud_account, vnf.om_datacenter)

        for vnfc in vld.vnfd_connection_point_ref:
            cloud_account_list.append(
                vnf_cloud_map.get(vnfc.member_vnf_index_ref, default_pair))

    if self._nsr_cfg_msg.vl_cloud_account_map:
        for vld_map in self._nsr_cfg_msg.vl_cloud_account_map:
            if vld_map.vld_id_ref == vld.id:
                # append, not extend: every entry of the list has to be
                # one (account, datacenter) pair, callers unpack it
                for cloud_account in vld_map.cloud_accounts:
                    cloud_account_list.append((cloud_account, None))
                for om_datacenter in vld_map.om_datacenters:
                    cloud_account_list.append((None, om_datacenter))

    # If no config has been provided then fall-back to the default
    # account
    if not cloud_account_list:
        cloud_account_list = [default_pair]

    self._log.debug("VL {} cloud accounts: {}".
                    format(vld.name, cloud_account_list))
    return set(cloud_account_list)
1885
@asyncio.coroutine
def create_vls(self):
    """ Create a VLR for every VLD of the NSD in each cloud account /
    datacenter pair selected for that VLD, and keep the records on
    this NSR """
    for vld in self.nsd_msg.vld:
        self._log.debug("Found vld %s in nsr id %s", vld, self.id)

        targets = self._extract_cloud_accounts_for_vl(vld)
        for cloud_account, om_datacenter in targets:
            record = yield from self._create_vls(vld, cloud_account, om_datacenter)
            self._vlrs.append(record)
1897
1898
@asyncio.coroutine
def create_vl_instance(self, vld):
    """ Create (when needed) and instantiate the VL for a VLD

    An existing VLR for the VLD is reused only when it is TERMINATED;
    otherwise NsrVlUpdateError is raised. Without an existing VLR, one
    record is created per cloud account / datacenter pair selected for
    the VLD. Only the reused record, or the last record created, is
    then instantiated through the NSM plugin; an instantiation error is
    logged and leaves that record in FAILED state.

    Args:
        vld : VLD yang object

    Raises:
        NsrVlUpdateError: a VLR for the VLD exists and is not terminated
    """
    self._log.debug("Create VL for {}: {}".format(self.id, vld.as_dict()))
    # Check if the VL is already present
    vlr = None
    for vl in self._vlrs:
        if vl.vld_msg.id == vld.id:
            self._log.debug("The VLD %s already in NSR %s as VLR %s with status %s",
                            vld.id, self.id, vl.id, vl.state)
            vlr = vl
            if vlr.state != VlRecordState.TERMINATED:
                # Build a real message: the previous code created a
                # (format, args...) tuple and logged / raised that
                err_msg = "VLR for VL %s in NSR %s already instantiated" % \
                    (vld, self.id)
                self._log.error(err_msg)
                raise NsrVlUpdateError(err_msg)
            break

    if vlr is None:
        cloud_account_list = self._extract_cloud_accounts_for_vl(vld)
        for account, om_datacenter in cloud_account_list:
            vlr = yield from self._create_vls(vld, account, om_datacenter)
            self._vlrs.append(vlr)

    vlr.state = VlRecordState.INSTANTIATION_PENDING
    yield from self.update_state()

    try:
        yield from self.nsm_plugin.instantiate_vl(self, vlr)
        vlr.state = VlRecordState.ACTIVE

    except Exception as e:
        err_msg = "Error instantiating VL for NSR {} and VLD {}: {}". \
            format(self.id, vld.id, e)
        self._log.error(err_msg)
        self._log.exception(e)
        vlr.state = VlRecordState.FAILED

    yield from self.update_state()
1937
@asyncio.coroutine
def delete_vl_instance(self, vld):
    """Terminate and remove the first VLR whose VLD id matches ``vld.id``.

    Nothing happens when no VLR matches. Otherwise the VLR goes through
    TERMINATE_PENDING, is terminated by the NSM plugin and removed from
    self._vlrs; a plugin failure leaves it in self._vlrs in state FAILED.
    The NS state is re-evaluated before and after the plugin call.
    """
    target = None
    for candidate in self._vlrs:
        if candidate.vld_msg.id == vld.id:
            target = candidate
            break

    if target is None:
        return

    self._log.debug("Found VLR %s for VLD %s in NSR %s",
                    target.id, vld.id, self.id)
    target.state = VlRecordState.TERMINATE_PENDING
    yield from self.update_state()

    try:
        yield from self.nsm_plugin.terminate_vl(target)
        target.state = VlRecordState.TERMINATED
        self._vlrs.remove(target)

    except Exception as e:
        err_msg = "Error terminating VL for NSR {} and VLD {}: {}".format(
            self.id, vld.id, e)
        self._log.error(err_msg)
        self._log.exception(e)
        target.state = VlRecordState.FAILED

    yield from self.update_state()
1961
@asyncio.coroutine
def create_vnfs(self, config_xact):
    """
    Create a VNF record for each constituent VNFD of the NSD.

    Constituents with start_by_default unset are skipped. When no cloud
    account is mapped to a constituent, the NSR-level cloud account name
    is used instead.
    """
    constituents = self.nsd_msg.constituent_vnfd
    self._log.debug("Creating %u VNFs associated with this NS id %s",
                    len(constituents), self.id)

    for const_vnfd in constituents:
        if not const_vnfd.start_by_default:
            self._log.debug("start_by_default set to False in constituent VNF (%s). Skipping start.",
                            const_vnfd.member_vnf_index)
            continue

        vnfd_msg = self._get_vnfd(const_vnfd.vnfd_id_ref, config_xact)
        account_pair = self._get_vnfd_cloud_account(const_vnfd.member_vnf_index)
        cloud_account_name, om_datacenter_name = account_pair
        if cloud_account_name is None:
            # Fall back to the account configured for the whole NSR
            cloud_account_name = self.cloud_account_name

        yield from self.create_vnf_record(vnfd_msg, const_vnfd,
                                          cloud_account_name, om_datacenter_name)
1982
1983
def get_placement_groups(self, vnfd_msg, const_vnfd):
    """Return the resolved placement groups that the given VNF belongs to.

    A placement group of the NSD applies when one of its members references
    both ``vnfd_msg.id`` and ``const_vnfd.member_vnf_index``. Groups whose
    cloud construct cannot be resolved are logged and left out.
    """
    resolved = []
    for group in self.nsd_msg.placement_groups:
        for member in group.member_vnfd:
            is_member = (member.vnfd_id_ref == vnfd_msg.id and
                         member.member_vnf_index_ref == const_vnfd.member_vnf_index)
            if not is_member:
                continue

            group_info = self.resolve_placement_group_cloud_construct(group)
            if group_info is None:
                # Deliberately not fatal: the group is only skipped
                self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
                continue

            self._log.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)",
                           str(group_info),
                           vnfd_msg.name,
                           const_vnfd.member_vnf_index)
            resolved.append(group_info)
    return resolved
2001
@asyncio.coroutine
def create_vnf_record(self, vnfd_msg, const_vnfd, cloud_account_name, om_datacenter_name, group_name=None, group_instance_id=None):
    """Create a VNFR for one constituent VNF and register it.

    The record is stored in both self._vnfrs and self._nsm.vnfrs, keyed by
    its id, and its config status is set to INIT.

    Raises:
        NetworkServiceRecordError if a VNFR with the same id is already
        registered in this NSR

    Returns:
        The new VirtualNetworkFunctionRecord
    """
    groups = self.get_placement_groups(vnfd_msg, const_vnfd)
    group_names = [group.name for group in groups]

    self._log.info("Cloud Account for VNF %d is %s", const_vnfd.member_vnf_index, cloud_account_name)
    self._log.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s",
                   vnfd_msg.name,
                   const_vnfd.member_vnf_index,
                   group_names)

    record = yield from VirtualNetworkFunctionRecord.create_record(
        self._dts,
        self._log,
        self._loop,
        vnfd_msg,
        const_vnfd,
        self.nsd_id,
        self.name,
        cloud_account_name,
        om_datacenter_name,
        self.id,
        group_name,
        group_instance_id,
        groups,
        restart_mode=self.restart_mode,
    )

    record_id = record.id
    if record_id in self._vnfrs:
        raise NetworkServiceRecordError(
            "VNF with VNFR id %s already in vnf list" % (record_id,))

    self._vnfrs[record_id] = record
    self._nsm.vnfrs[record_id] = record

    yield from record.set_config_status(NsrYang.ConfigStates.INIT)

    self._log.debug("Added VNFR %s to NSM VNFR list with id %s",
                    record.name,
                    record_id)

    return record
2040
def create_param_pools(self):
    """Create a ParameterValuePool for every parameter pool in the NSD.

    Each pool is stored in self._param_pools under the pool's name and is
    backed by ``range(start_value, end_value)``.

    Raises:
        NetworkServiceRecordError if a pool's end value is smaller than its
        start value
    """
    for param_pool in self.nsd_msg.parameter_pool:
        self._log.debug("Found parameter pool %s in nsr id %s", param_pool, self.id)

        start_value = param_pool.range.start_value
        end_value = param_pool.range.end_value
        if end_value < start_value:
            # The message used to mix "%s" with str.format(), so the pool
            # name was never filled in.
            raise NetworkServiceRecordError(
                "Parameter pool {} has invalid range (start: {}, end: {})".format(
                    param_pool.name, start_value, end_value
                )
            )

        # NOTE(review): range() excludes end_value itself -- confirm the
        # pool is meant to be half-open.
        self._param_pools[param_pool.name] = config_value_pool.ParameterValuePool(
            self._log,
            param_pool.name,
            range(start_value, end_value)
        )
2059
@asyncio.coroutine
def fetch_vnfr(self, vnfr_path):
    """ Read the VNFR stored at ``vnfr_path`` from DTS.

    Returns the result of the last entry yielded by the query, or None
    when the query yields nothing.
    """
    self._log.debug("Fetching VNFR with key %s while instantiating %s",
                    vnfr_path, self.id)
    query_results = yield from self._dts.query_read(vnfr_path, rwdts.XactFlag.MERGE)

    latest = None
    for pending in query_results:
        response = yield from pending
        latest = response.result

    return latest
2073
@asyncio.coroutine
def instantiate_vnfs(self, vnfrs, scaleout=False):
    """
    Instantiate each of the given VNF records, one after the other,
    through the NSM plugin.
    """
    self._log.debug("Instantiating %u VNFs in NS %s", len(vnfrs), self.id)
    plugin = self.nsm_plugin
    for record in vnfrs:
        self._log.debug("Instantiating VNF: %s in NS %s", record, self.id)
        yield from plugin.instantiate_vnf(self, record, scaleout)
2083
@asyncio.coroutine
def instantiate_vnffgs(self):
    """
    Instantiate every VNFFG record of this Network Service.

    Polls each VNFR every 2 seconds until it leaves the INIT and
    INSTANTIATION_PENDING states. If any VNFR ends up in a state other
    than ACTIVE, the VNFFG record state is set to FAILED and nothing is
    instantiated. Otherwise waits a further 90 seconds and then
    instantiates the VNFFG records in turn.
    """
    self._log.debug("Instantiating %u VNFFGs in NS %s",
                    len(self.nsd_msg.vnffgd), self.id)

    for vnfr in self.vnfrs.values():
        while vnfr.state in [VnfRecordState.INSTANTIATION_PENDING, VnfRecordState.INIT]:
            self._log.debug("Received vnfr state for vnfr %s is %s; retrying",vnfr.name,vnfr.state)
            yield from asyncio.sleep(2, loop=self._loop)

        if vnfr.state != VnfRecordState.ACTIVE:
            self._log.debug("Received vnfr state for vnfr %s is %s; failing vnffg creation",vnfr.name,vnfr.state)
            self._vnffgr_state = VnffgRecordState.FAILED
            return

        self._log.debug("Received vnfr state for vnfr %s is %s ",vnfr.name,vnfr.state)

    self._log.info("Waiting for 90 seconds for VMs to come up")
    yield from asyncio.sleep(90, loop=self._loop)

    self._log.info("Starting VNFFG orchestration")
    for vnffg_record in self._vnffgrs.values():
        self._log.debug("Instantiating VNFFG: %s in NS %s", vnffg_record, self.id)
        yield from vnffg_record.instantiate()
2109
@asyncio.coroutine
def instantiate_scaling_instances(self, config_xact):
    """ Instantiate the scaling instances of this Network Service.

    For each scaling group, first create its ``min_instance_count`` default
    instances, then re-create every instance listed for that group in the
    NSR configuration.
    """
    for group in self._scaling_groups.values():
        for index in range(group.min_instance_count):
            self._log.debug("Instantiating %s default scaling instance %s", group, index)
            yield from self.create_scale_group_instance(
                group.name, index, config_xact, is_default=True
            )

        for group_msg in self._nsr_cfg_msg.scaling_group:
            if group_msg.scaling_group_name_ref == group.name:
                for instance in group_msg.instance:
                    self._log.debug("Reloading %s scaling instance %s", group_msg, instance.id)
                    yield from self.create_scale_group_instance(
                        group.name, instance.id, config_xact, is_default=False
                    )
2129
def has_scaling_instances(self):
    """ Tell whether any scaling instance has to be created.

    True when a scaling group has a positive min_instance_count, or when
    the NSR configuration lists at least one instance for a scaling group.
    """
    has_default = any(
        group.min_instance_count > 0
        for group in self._scaling_groups.values()
    )
    if has_default:
        return True

    return any(
        len(group_msg.instance) > 0
        for group_msg in self._nsr_cfg_msg.scaling_group
    )
2141
@asyncio.coroutine
def publish(self):
    """ Publish this NSR to DTS.

    Rebuilds the operational-data message with create_msg(), caches it in
    self._nsr_msg and hands it to the NSM's NSR handler under
    self.nsr_xpath inside a DTS transaction. Once a publish has happened
    while the operational state is RUNNING, self._debug_running is set so
    that later publishes emit an extra debug line.
    """
    self._nsr_msg = self.create_msg()

    self._log.debug("Publishing the NSR with xpath %s and nsr %s",
                    self.nsr_xpath,
                    self._nsr_msg)

    # Debug aid only: flags a re-publish after RUNNING was already published
    if self._debug_running:
        self._log.debug("Publishing NSR in RUNNING state!")
        #raise()

    # NOTE(review): nesting of the RUNNING check inside the transaction was
    # reconstructed from a listing without indentation -- confirm.
    with self._dts.transaction() as xact:
        yield from self._nsm.nsr_handler.update(xact, self.nsr_xpath, self._nsr_msg)
        if self._op_status.state == NetworkServiceRecordState.RUNNING:
            self._debug_running = True
2159
@asyncio.coroutine
def unpublish(self, xact):
    """ Delete this NSR's published record, using the given transaction """
    self._log.debug("Unpublishing Network service id %s", self.id)
    nsr_handler = self._nsm.nsr_handler
    yield from nsr_handler.delete(xact, self.nsr_xpath)
2165
@property
def nsr_xpath(self):
    """ Operational-data xpath of this NSR, keyed by self.id """
    container = "D,/nsr:ns-instance-opdata"
    entry = "/nsr:nsr[nsr:ns-instance-config-ref = '{}']"
    template = container + entry
    return template.format(self.id)
2173
@staticmethod
def xpath_from_nsr(nsr):
    """ Build the op-data xpath for the given NSR from the class XPATH """
    key_predicate = "[nsr:ns-instance-config-ref = '{}']"
    template = NetworkServiceRecord.XPATH + key_predicate
    return template.format(nsr.id)
2179
@property
def nsd_xpath(self):
    """ Config xpath of the NSD referenced by this NSR (keyed by nsd_id) """
    template = "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']"
    return template.format(self.nsd_id)
2186
@asyncio.coroutine
def instantiate(self, config_xact):
    """Instantiate this NetworkServiceRecord.

    Before this coroutine returns, the NSD is taken from the NSR config,
    merged with stored config, has input parameters substituted, and the
    record is created and published. The remaining steps run in a separate
    task scheduled on self._loop:

    * Instantiate the VLs (instantiate_vls)
    * Instantiate the VNFs (instantiate_vnfs)
    * Instantiate VNFFGs and scaling-group instances, when present
    * Let the NSM plugin deploy the network service
    * Publish the NSR details to DTS

    If that task fails, instantiation_failed() is scheduled with the
    failure reason.

    Arguments:
        config_xact: The configuration transaction which initiated the
                     instantiation

    Raises:
        NetworkServiceRecordError if the NSR configuration carries no NSD

    Returns:
        No return value
    """

    self._log.debug("Instantiating NS - %s xact - %s", self, config_xact)

    # Move the state to INIT
    self.set_state(NetworkServiceRecordState.INIT)

    event_descr = "Instantiation Request Received NSR Id:%s" % self.id
    self.record_event("instantiating", event_descr)

    # Find the NSD
    self._nsd = self._nsr_cfg_msg.nsd

    # Validate before the NSD is used; this check used to come after the
    # merge below, i.e. after the NSD had already been dereferenced.
    if self._nsd is None:
        err_msg = "Failed to fetch NSD for nsr-id %s" % self.id
        self._log.error(err_msg)
        raise NetworkServiceRecordError(err_msg)

    # Merge any config and initial config primitive values
    self.config_store.merge_nsd_config(self.nsd_msg)
    self._log.debug("Merged NSD: {}".format(self.nsd_msg.as_dict()))

    event_descr = "Fetched NSD with descriptor id %s" % self.nsd_id
    self.record_event("nsd-fetched", event_descr)

    self._log.debug("Got nsd result %s", self._nsd)

    # Substitute any input parameters
    self.substitute_input_parameters(self._nsd, self._nsr_cfg_msg)

    # Create the record
    yield from self.create(config_xact)

    # Publish the NSR to DTS
    yield from self.publish()

    @asyncio.coroutine
    def do_instantiate():
        """
        Instantiate network service
        """
        self._log.debug("Instantiating VLs nsr id [%s] nsd id [%s]",
                        self.id, self.nsd_id)

        # instantiate the VLs
        event_descr = ("Instantiating %s external VLs for NSR id %s" %
                       (len(self.nsd_msg.vld), self.id))
        self.record_event("begin-external-vls-instantiation", event_descr)

        self.set_state(NetworkServiceRecordState.VL_INIT_PHASE)

        yield from self.instantiate_vls()

        # Publish the NSR to DTS
        yield from self.publish()

        event_descr = ("Finished instantiating %s external VLs for NSR id %s" %
                       (len(self.nsd_msg.vld), self.id))
        self.record_event("end-external-vls-instantiation", event_descr)

        self.set_state(NetworkServiceRecordState.VNF_INIT_PHASE)

        self._log.debug("Instantiating VNFs ...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

        # instantiate the VNFs
        event_descr = ("Instantiating %s VNFS for NSR id %s" %
                       (len(self.nsd_msg.constituent_vnfd), self.id))

        self.record_event("begin-vnf-instantiation", event_descr)

        yield from self.instantiate_vnfs(self._vnfrs.values())

        self._log.debug(" Finished instantiating %d VNFs for NSR id %s",
                        len(self.nsd_msg.constituent_vnfd), self.id)

        event_descr = ("Finished instantiating %s VNFs for NSR id %s" %
                       (len(self.nsd_msg.constituent_vnfd), self.id))
        self.record_event("end-vnf-instantiation", event_descr)

        if len(self.vnffgrs) > 0:
            #self.set_state(NetworkServiceRecordState.VNFFG_INIT_PHASE)
            event_descr = ("Instantiating %s VNFFGS for NSR id %s" %
                           (len(self.nsd_msg.vnffgd), self.id))

            self.record_event("begin-vnffg-instantiation", event_descr)

            yield from self.instantiate_vnffgs()

            event_descr = ("Finished instantiating %s VNFFGDs for NSR id %s" %
                           (len(self.nsd_msg.vnffgd), self.id))
            self.record_event("end-vnffg-instantiation", event_descr)

        if self.has_scaling_instances():
            event_descr = ("Instantiating %s Scaling Groups for NSR id %s" %
                           (len(self._scaling_groups), self.id))

            self.record_event("begin-scaling-group-instantiation", event_descr)
            yield from self.instantiate_scaling_instances(config_xact)
            self.record_event("end-scaling-group-instantiation", event_descr)

        # Give the plugin a chance to deploy the network service now that all
        # virtual links and vnfs are instantiated
        yield from self.nsm_plugin.deploy(self._nsr_msg)

        self._log.debug("Publishing NSR...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

        # Publish the NSR to DTS
        yield from self.publish()

        self._log.debug("Published NSR...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

    def on_instantiate_done(fut):
        # Future.exception() raises CancelledError on a cancelled future,
        # so handle cancellation before asking for the exception.
        if fut.cancelled():
            self._log.error("NSR instantiation cancelled for NSR id %s", self.id)
            return

        # If the do_instantiate fails, then publish NSR with failed result
        e = fut.exception()
        if e is not None:
            import traceback
            print("".join(traceback.format_exception(None, e, e.__traceback__)),
                  file=sys.stderr, flush=True)
            self._log.error("NSR instantiation failed for NSR id %s: %s", self.id, str(e))
            self._loop.create_task(self.instantiation_failed(failed_reason=str(e)))

    instantiate_task = self._loop.create_task(do_instantiate())
    instantiate_task.add_done_callback(on_instantiate_done)
2332
@asyncio.coroutine
def set_config_status(self, status, status_details=None):
    """ Record a new configuration status for this NSR and publish it.

    Nothing is done when ``status`` equals the current config status.
    A "config-failed" event carrying ``status_details`` is recorded when
    the new status is FAILED.

    Arguments:
        status - the new config status (compared against
                 NsrYang.ConfigStates values)
        status_details - optional details stored alongside the status
    """
    # NOTE(review): nesting below (event and publish only on a status
    # change) was reconstructed from a listing without indentation -- confirm.
    if self.config_status != status:
        self._log.debug("Updating NSR {} status for {} to {}".
                        format(self.name, self.config_status, status))
        self._config_status = status
        self._config_status_details = status_details

        if self._config_status == NsrYang.ConfigStates.FAILED:
            self.record_event("config-failed", "NS configuration failed",
                              evt_details=self._config_status_details)

        yield from self.publish()
2346
@asyncio.coroutine
def is_active(self):
    """ Mark this NS as running.

    The state is always set to RUNNING; the "ns-running" event and the
    publish to DTS only happen the first time the NS becomes active.
    """
    self.set_state(NetworkServiceRecordState.RUNNING)

    if not self._is_active:
        self._log.debug("Network service %s is active ", self.id)
        self._is_active = True

        running_descr = "NSR in running state for NSR id %s" % self.id
        self.record_event("ns-running", running_descr)

        # Publish the NSR to DTS
        yield from self.publish()
2362
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """ Move the NS to FAILED, record an "ns-failed" event and publish.

    Arguments:
        failed_reason - optional text attached to the event as its details
    """
    self._log.error("Network service id:%s, name:%s instantiation failed",
                    self.id, self.name)
    self.set_state(NetworkServiceRecordState.FAILED)

    failure_descr = "Instantiation of NS %s failed" % self.id
    self.record_event("ns-failed", failure_descr, evt_details=failed_reason)

    # Make the failure visible in DTS
    yield from self.publish()
2375
@asyncio.coroutine
def terminate_vnfrs(self, vnfrs, scalein=False):
    """ Terminate the given VNFRs of this network service.

    The NSM plugin is only asked to terminate a VNFR when ``scalein`` is
    true; otherwise each VNFR is just logged.
    """
    self._log.debug("Terminating VNFs in network service %s", self.id)
    for record in vnfrs:
        self._log.debug("Terminating VNFs in network service %s %s", record.id, self.id)
        if not scalein:
            continue
        yield from self.nsm_plugin.terminate_vnf(self, record, scalein=True)
2384
@asyncio.coroutine
def terminate(self):
    """ Terminate this NetworkServiceRecord.

    Walks through the VNFFG, VNF and VL termination phases in that order,
    recording an event for each, then asks the NSM plugin to terminate the
    NS and moves the state to TERMINATED.
    """
    self._log.debug("Terminating network service id %s", self.id)

    # Enter the TERMINATE state
    self.set_state(NetworkServiceRecordState.TERMINATE)
    descr = "Terminate being processed for NS Id:%s" % self.id
    self.record_event("terminate", descr)

    # VNFFG_TERMINATE_PHASE: terminate every VNFFGR
    self._log.debug("Terminating VNFFGs in NS ID: %s", self.id)
    self.set_state(NetworkServiceRecordState.VNFFG_TERMINATE_PHASE)
    descr = "Terminating VNFFGS in NS Id:%s" % self.id
    self.record_event("terminating-vnffgss", descr)
    self._log.debug("Terminating VNFFGRs in network service %s", self.id)
    for vnffgr in self.vnffgrs.values():
        yield from vnffgr.terminate()

    # VNF_TERMINATE_PHASE: terminate the VNFRs
    self.set_state(NetworkServiceRecordState.VNF_TERMINATE_PHASE)
    descr = "Terminating VNFS in NS Id:%s" % self.id
    self.record_event("terminating-vnfs", descr)
    yield from self.terminate_vnfrs(self.vnfrs.values())

    # VL_TERMINATE_PHASE: terminate every VLR through the plugin
    self.set_state(NetworkServiceRecordState.VL_TERMINATE_PHASE)
    descr = "Terminating VLs in NS Id:%s" % self.id
    self.record_event("terminating-vls", descr)
    self._log.debug("Terminating VLs in network service %s", self.id)
    for vlr in self.vlrs:
        yield from self.nsm_plugin.terminate_vl(vlr)
        vlr.state = VlRecordState.TERMINATED

    yield from self.nsm_plugin.terminate_ns(self)

    # Done: TERMINATED
    self.set_state(NetworkServiceRecordState.TERMINATED)
    descr = "Terminated NS Id:%s" % self.id
    self.record_event("terminated", descr)
2431
def enable(self):
    """Enable a NetworkServiceRecord (currently a no-op)."""
    return None
2435
def disable(self):
    """Disable a NetworkServiceRecord (currently a no-op)."""
    return None
2439
def map_config_status(self):
    """Translate the internal config status into its published string.

    CONFIGURING maps to 'configuring', FAILED to 'failed' and every other
    status to 'configured'.
    """
    current = self._config_status
    self._log.debug("Config status for ns {} is {}".
                    format(self.name, current))

    if current == NsrYang.ConfigStates.CONFIGURING:
        mapped = 'configuring'
    elif current == NsrYang.ConfigStates.FAILED:
        mapped = 'failed'
    else:
        mapped = 'configured'
    return mapped
2448
def vl_phase_completed(self):
    """ Return the flag that tells whether the VL phase has completed """
    completed = self._vl_phase_completed
    return completed
2452
def vnf_phase_completed(self):
    """ Return the flag that tells whether the VNF phase has completed """
    completed = self._vnf_phase_completed
    return completed
2456
def create_msg(self):
    """ Build the operational-data message for this network service record.

    Returns:
        A RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr filled from this
        record's current state. VLR entries are only added once
        vl_phase_completed() is true; constituent VNFR references, VNFFGRs
        and scaling group records only once vnf_phase_completed() is true.
    """
    nsr_dict = {"ns_instance_config_ref": self.id}
    nsr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.from_dict(nsr_dict)
    #nsr.cloud_account = self.cloud_account_name
    nsr.sdn_account = self._sdn_account_name
    nsr.name_ref = self.name
    nsr.nsd_ref = self.nsd_id
    nsr.nsd_name_ref = self.nsd_msg.name
    nsr.operational_events = self._op_status.msg
    nsr.operational_status = self._op_status.yang_str()
    nsr.config_status = self.map_config_status()
    nsr.config_status_details = self._config_status_details
    nsr.create_time = self._create_time
    # Uptime in whole seconds relative to the recorded creation time
    nsr.uptime = int(time.time()) - self._create_time

    # Copy the NSD's service primitives into the op-data message types
    for cfg_prim in self.nsd_msg.service_primitive:
        cfg_prim = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict(
            cfg_prim.as_dict())
        nsr.service_primitive.append(cfg_prim)

    # Same for the initial config primitives
    for init_cfg in self.nsd_msg.initial_config_primitive:
        prim = NsrYang.NsrInitialConfigPrimitive.from_dict(
            init_cfg.as_dict())
        nsr.initial_config_primitive.append(prim)

    if self.vl_phase_completed():
        for vlr in self.vlrs:
            nsr.vlr.append(vlr.create_nsr_vlr_msg(self.vnfrs.values()))

    # NOTE(review): the three loops below are taken to belong to the
    # vnf_phase_completed() branch; nesting was reconstructed from a listing
    # without indentation -- confirm.
    if self.vnf_phase_completed():
        for vnfr_id in self.vnfrs:
            nsr.constituent_vnfr_ref.append(self.vnfrs[vnfr_id].const_vnfr_msg)
        for vnffgr in self.vnffgrs.values():
            nsr.vnffgr.append(vnffgr.fetch_vnffgr())
        for scaling_group in self._scaling_groups.values():
            nsr.scaling_group_record.append(scaling_group.create_record_msg())

    return nsr
2496
def all_vnfs_active(self):
    """ True when the ``active`` attribute of every VNFR is exactly True """
    return all(record.active is True for record in self.vnfrs.values())
2503
@asyncio.coroutine
def update_state(self):
    """ Re-evaluate this NS's state from its VNFRs, VLRs, VNFFGRs and
    scaling groups, apply the resulting state and publish the NSR.

    Nothing is done when the NS is already TERMINATED. The candidate state
    starts as RUNNING and is adjusted as follows:

    * a FAILED VNFR makes it FAILED; a VNFR that is neither ACTIVE nor
      TERMINATED keeps the current state
    * a VL in INIT/INSTANTIATION_PENDING makes it VL_INSTANTIATE, a VL in
      TERMINATE_PENDING makes it VL_TERMINATE (a FAILED VL only records
      an event)
    * a FAILED VNFFGR makes it FAILED; one that is not ACTIVE keeps the
      current state
    * a scaling group in SCALING_OUT/SCALING_IN overrides the above
    """
    curr_state = self._op_status.state

    if curr_state == NetworkServiceRecordState.TERMINATED:
        self._log.debug("NS (%s) in terminated state, not updating state", self.id)
        return

    new_state = NetworkServiceRecordState.RUNNING
    self._log.info("Received update_state for nsr: %s, curr-state: %s",
                   self.id, curr_state)

    # Check all the VNFRs are present
    for _, vnfr in self.vnfrs.items():
        if vnfr.state in [VnfRecordState.ACTIVE, VnfRecordState.TERMINATED]:
            pass
        elif vnfr.state == VnfRecordState.FAILED:
            if vnfr._prev_state != vnfr.state:
                # Record the failure event only on the transition to FAILED
                event_descr = "Instantiation of VNF %s failed" % vnfr.id
                event_error_details = vnfr.state_failed_reason
                self.record_event("vnf-failed", event_descr, evt_details=event_error_details)
                vnfr.set_state(VnfRecordState.FAILED)
            else:
                self._log.info("VNF state did not change, curr=%s, prev=%s",
                               vnfr.state, vnfr._prev_state)
            new_state = NetworkServiceRecordState.FAILED
            break
        else:
            self._log.info("VNF %s in NSR %s is still not active; current state is: %s",
                           vnfr.id, self.id, vnfr.state)
            new_state = curr_state

    # If new state is RUNNING; check all VLs
    if new_state == NetworkServiceRecordState.RUNNING:
        for vl in self.vlrs:

            if vl.state in [VlRecordState.ACTIVE, VlRecordState.TERMINATED]:
                pass
            elif vl.state == VlRecordState.FAILED:
                if vl.prev_state != vl.state:
                    event_descr = "Instantiation of VL %s failed" % vl.id
                    event_error_details = vl.state_failed_reason
                    self.record_event("vl-failed", event_descr, evt_details=event_error_details)
                    vl.prev_state = vl.state
                else:
                    # The "%s" placeholder previously had no argument
                    self._log.debug("VL %s already in failed state", vl.id)
            else:
                if vl.state in [VlRecordState.INSTANTIATION_PENDING, VlRecordState.INIT]:
                    new_state = NetworkServiceRecordState.VL_INSTANTIATE
                    break

                if vl.state in [VlRecordState.TERMINATE_PENDING]:
                    new_state = NetworkServiceRecordState.VL_TERMINATE
                    break

    # If new state is RUNNING; check VNFFGRs are also active
    if new_state == NetworkServiceRecordState.RUNNING:
        for _, vnffgr in self.vnffgrs.items():
            self._log.info("Checking vnffgr state for nsr %s is: %s",
                           self.id, vnffgr.state)
            if vnffgr.state == VnffgRecordState.ACTIVE:
                pass
            elif vnffgr.state == VnffgRecordState.FAILED:
                event_descr = "Instantiation of VNFFGR %s failed" % vnffgr.id
                self.record_event("vnffg-failed", event_descr)
                new_state = NetworkServiceRecordState.FAILED
                break
            else:
                self._log.info("VNFFGR %s in NSR %s is still not active; current state is: %s",
                               vnffgr.id, self.id, vnffgr.state)
                new_state = curr_state

    # Update all the scaling group instance operational status to
    # reflect the state of all VNFR within that instance
    yield from self._update_scale_group_instances_status()

    for _, group in self._scaling_groups.items():
        if group.state == scale_group.ScaleGroupState.SCALING_OUT:
            new_state = NetworkServiceRecordState.SCALING_OUT
            break
        elif group.state == scale_group.ScaleGroupState.SCALING_IN:
            new_state = NetworkServiceRecordState.SCALING_IN
            break

    if new_state != curr_state:
        self._log.debug("Changing state of Network service %s from %s to %s",
                        self.id, curr_state, new_state)
        if new_state == NetworkServiceRecordState.RUNNING:
            yield from self.is_active()
        elif new_state == NetworkServiceRecordState.FAILED:
            # If the NS is already active and we entered scaling_in, scaling_out,
            # do not mark the NS as failing if scaling operation failed.
            if curr_state in [NetworkServiceRecordState.SCALING_OUT,
                              NetworkServiceRecordState.SCALING_IN] and self._is_active:
                new_state = NetworkServiceRecordState.RUNNING
                self.set_state(new_state)
            else:
                yield from self.instantiation_failed()
        else:
            self.set_state(new_state)

    # NOTE(review): publish is taken to run unconditionally; nesting was
    # reconstructed from a listing without indentation -- confirm.
    yield from self.publish()
2607
2608
class InputParameterSubstitution(object):
    """
    Callable that applies the input parameters of an NSR config to an NSD.
    """

    def __init__(self, log):
        """Create an instance of InputParameterSubstitution

        Arguments:
            log - a logger for this object to use

        """
        self.log = log

    def __call__(self, nsd, nsr_config):
        """Write the NSR config's input parameters into the NSD

        The NSD is modified in place. Only xpaths listed in the NSD's
        input_parameter_xpath may be set; any other parameter is logged as
        an error and skipped. A failure while setting a value is logged and
        does not stop the remaining parameters.

        Arguments:
            nsd - a GI NSD object
            nsr_config - a GI NSR config object

        """
        if nsd is None or nsr_config is None:
            return

        # The xpaths this descriptor allows to be modified
        allowed_xpaths = {entry.xpath for entry in nsd.input_parameter_xpath}

        if not nsr_config.input_parameter:
            return

        for param in nsr_config.input_parameter:
            if param.xpath in allowed_xpaths:
                self.log.debug(
                    "input-parameter:{} = {}".format(param.xpath, param.value)
                )
                try:
                    xpath.setxattr(nsd, param.xpath, param.value)
                except Exception as e:
                    self.log.exception(e)
            else:
                msg = "tried to set an invalid input parameter ({})"
                self.log.error(msg.format(param.xpath))
2663
2664
class NetworkServiceDescriptor(object):
    """
    Network service descriptor class: wraps an NSD message together with
    the DTS handle, logger, event loop and NSM it was created with.
    """

    def __init__(self, dts, log, loop, nsd, nsm):
        """
        Arguments:
            dts - DTS handle
            log - logger
            loop - event loop
            nsd - the NSD message wrapped by this object
            nsm - the owning network service manager
        """
        self._dts = dts
        self._log = log
        self._loop = loop

        self._nsd = nsd
        self._nsm = nsm

    @property
    def id(self):
        """ Returns nsd id """
        return self._nsd.id

    @property
    def name(self):
        """ Returns name of nsd """
        return self._nsd.name

    @property
    def msg(self):
        """ Return the message associated with this NetworkServiceDescriptor"""
        return self._nsd

    @staticmethod
    def path_for_id(nsd_id):
        """ Return the config xpath for the passed nsd_id"""
        # The key predicate was previously left unterminated (missing "]")
        return "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']".format(nsd_id)

    def path(self):
        """ Return the config xpath of this NetworkServiceDescriptor"""
        return NetworkServiceDescriptor.path_for_id(self.id)

    def update(self, nsd):
        """ Update the NSD descriptor """
        self._nsd = nsd
2705
2706
class NsdDtsHandler(object):
    """ The network service descriptor DTS handler """
    XPATH = "C,/nsd:nsd-catalog/nsd:nsd"

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register for Nsd create/update/delete/read requests from dts

        Deletes are handled in the prepare callback; creates and updates
        are collected in the transaction scratch and applied in the apply
        callback.
        """

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration"""
            # An INSTALL action without a transaction is treated as recovery:
            # every element is (re)applied regardless of the scratch.
            is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
            self._log.debug("Got nsd apply cfg (xact:%s) (action:%s)",
                            xact, action)
            # Create/Update an NSD record
            for cfg in self._regh.get_xact_elements(xact):
                # Only interested in those NSD cfgs whose ID was received in prepare callback
                if cfg.id in scratch.get('nsds', []) or is_recovery:
                    self._nsm.update_nsd(cfg)

            scratch.pop('nsds', None)

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def delete_nsd_libs(nsd_id):
            """ Remove any files uploaded with NSD and stored under $RIFT_ARTIFACTS/libs/<id> """
            try:
                rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
                nsd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', nsd_id)

                if os.path.exists(nsd_dir):
                    shutil.rmtree(nsd_dir, ignore_errors=True)
            except Exception as e:
                # Best-effort cleanup: log and carry on with the delete
                self._log.error("Exception in cleaning up NSD libs {}: {}".
                                format(nsd_id, e))
                # Was misspelled "excpetion", which raised AttributeError here
                self._log.exception(e)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for NSD config """

            self._log.info("Got nsd prepare - config received nsd id %s, msg %s",
                           msg.id, msg)

            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            if fref.is_field_deleted():
                # Delete an NSD record
                self._log.debug("Deleting NSD with id %s", msg.id)
                yield from delete_nsd_libs(msg.id)
                self._nsm.delete_nsd(msg.id)
            else:
                # Add this NSD to scratch to create/update in apply callback
                nsds = scratch.setdefault('nsds', [])
                nsds.append(msg.id)
                # acg._scratch['nsds'].append(msg.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug(
            "Registering for NSD config using xpath: %s",
            NsdDtsHandler.XPATH,
        )

        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=acg_hdl) as acg:
            # Need a list in scratch to store NSDs to create/update later
            # acg._scratch['nsds'] = list()
            self._regh = acg.register(
                xpath=NsdDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                on_prepare=on_prepare)
2793
2794
class VnfdDtsHandler(object):
    """ DTS handler for VNFD config changes """
    XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm
        # Filled in by register()
        self._regh = None

    @property
    def regh(self):
        """ DTS registration handle """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register for VNFD configuration

        The prepare callback sorts incoming VNFD ids into the transaction
        scratch ('vnfds' for creates/updates, 'deleted_vnfds' for deletes);
        the apply callback then acts on both lists and clears them.
        """

        @asyncio.coroutine
        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration"""
            self._log.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            # Create/Update the VNFDs whose ids were seen in prepare
            updated_ids = scratch.get('vnfds', [])
            for cfg in self._regh.get_xact_elements(xact):
                if cfg.id in updated_ids:
                    self._nsm.update_vnfd(cfg)

            # Delete the VNFDs that prepare flagged as deleted
            for cfg in self._regh.elements:
                if cfg.id in scratch.get('deleted_vnfds', []):
                    yield from self._nsm.delete_vnfd(cfg.id)

            for key in ('vnfds', 'deleted_vnfds'):
                scratch.pop(key, None)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ on prepare callback """
            self._log.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
                            ks_path.to_xpath(RwNsmYang.get_schema()), xact_info.query_action, msg)

            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            # Deletes and adds/updates are both only recorded here; the
            # apply callback does the actual work
            if fref.is_field_deleted():
                self._log.debug("Adding msg to deleted field")
                scratch_key = 'deleted_vnfds'
            else:
                scratch_key = 'vnfds'
            scratch.setdefault(scratch_key, []).append(msg.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug(
            "Registering for VNFD config using xpath: %s",
            VnfdDtsHandler.XPATH,
        )
        handler = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=handler) as acg:
            self._regh = acg.register(
                xpath=VnfdDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                on_prepare=on_prepare)
2868
class NsrRpcDtsHandler(object):
    """ The network service instantiation RPC DTS handler

    Serves the start-network-service RPC: builds an ns-instance-config entry
    from the RPC input and the referenced NSD, and POSTs it to the local
    RESTCONF endpoint.
    """
    EXEC_NSR_CONF_XPATH = "I,/nsr:start-network-service"
    EXEC_NSR_CONF_O_XPATH = "O,/nsr:start-network-service"
    NETCONF_IP_ADDRESS = "127.0.0.1"
    NETCONF_PORT = 2022
    RESTCONF_PORT = 8888
    NETCONF_USER = "admin"
    NETCONF_PW = "admin"
    # Derived from the constants above so host/port are defined in one place
    REST_BASE_V2_URL = 'https://{}:{}/v2/api/'.format(NETCONF_IP_ADDRESS, RESTCONF_PORT)

    def __init__(self, dts, log, loop, nsm):
        """ Store collaborators and load the NSR yang schema used for JSON encoding """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm
        self._nsd = None

        self._ns_regh = None

        self._manager = None
        self._nsr_config_url = NsrRpcDtsHandler.REST_BASE_V2_URL + 'config/ns-instance-config'

        self._model = RwYang.Model.create_libncx()
        self._model.load_schema_ypbc(RwNsrYang.get_schema())

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @staticmethod
    def wrap_netconf_config_xml(xml):
        """ Wrap an XML fragment in a NETCONF <config> element """
        xml = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml)
        return xml

    @asyncio.coroutine
    def _connect(self, timeout_secs=240):
        """ Open a NETCONF session to the launchpad, retrying every 5 seconds

        Not called by register() in this class; configuration is pushed via
        RESTCONF instead.

        Arguments:
            timeout_secs - give up after this many seconds

        Returns:
            The connected ncclient asyncio manager

        Raises:
            NsrInstantiationFailed if no connection could be made in time
        """
        start_time = time.time()
        while (time.time() - start_time) < timeout_secs:

            try:
                self._log.debug("Attemping NsmTasklet netconf connection.")

                manager = yield from ncclient.asyncio_manager.asyncio_connect(
                    loop=self._loop,
                    host=NsrRpcDtsHandler.NETCONF_IP_ADDRESS,
                    port=NsrRpcDtsHandler.NETCONF_PORT,
                    username=NsrRpcDtsHandler.NETCONF_USER,
                    password=NsrRpcDtsHandler.NETCONF_PW,
                    allow_agent=False,
                    look_for_keys=False,
                    hostkey_verify=False,
                    )

                return manager

            except ncclient.transport.errors.SSHError as e:
                self._log.warning("Netconf connection to launchpad %s failed: %s",
                                  NsrRpcDtsHandler.NETCONF_IP_ADDRESS, str(e))

            yield from asyncio.sleep(5, loop=self._loop)

        raise NsrInstantiationFailed("Failed to connect to Launchpad within %s seconds" %
                                     timeout_secs)

    def _apply_ns_instance_config(self, payload_dict):
        """ POST the ns-instance-config payload to the RESTCONF endpoint

        Blocking call; register() runs it in the loop's default executor.

        Arguments:
            payload_dict - request body; register() passes the output of
                           to_json() (presumably a JSON string despite the name)

        Returns:
            The requests response object
        """
        req_hdr = {'accept': 'application/vnd.yang.data+json',
                   'content-type': 'application/vnd.yang.data+json'}
        # NOTE(review): verify=False disables TLS certificate verification;
        # the target here is the loopback address.
        response = requests.post(
            self._nsr_config_url,
            headers=req_hdr,
            auth=(NsrRpcDtsHandler.NETCONF_USER, NsrRpcDtsHandler.NETCONF_PW),
            data=payload_dict,
            verify=False)
        return response

    @asyncio.coroutine
    def register(self):
        """ Register the start-network-service RPC handler with DTS """
        @asyncio.coroutine
        def on_ns_config_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts start-network-service"""
            assert action == rwdts.QueryAction.RPC
            rpc_ip = msg

            self._log.debug("start-network-service RPC input: {}".format(rpc_ip))

            has_mandatory_params = (
                'name' in rpc_ip and
                'nsd_ref' in rpc_ip and
                ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)
            )
            if not has_mandatory_params:
                # Reject right away instead of failing later inside the
                # generic exception handler below
                self._log.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip))
                xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                        NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH)
                return

            # The RPC output carries the id generated for the new NSR
            rpc_op = NsrYang.YangOutput_Nsr_StartNetworkService.from_dict({
                "nsr_id": str(uuid.uuid4())
                })

            try:
                self._log.debug("RPC output: {}".format(rpc_op))

                nsd_copy = self.nsm.get_nsd(rpc_ip.nsd_ref)

                self._log.debug("Configuring ns-instance-config with name  %s nsd-ref: %s",
                                rpc_ip.name, rpc_ip.nsd_ref)

                # Copy over only the RPC input fields that also exist on the
                # ns-instance-config NSR message
                ns_instance_config_dict = {"id": rpc_op.nsr_id, "admin_status": "ENABLED"}
                ns_instance_config_copy_dict = {
                    k: v for k, v in rpc_ip.as_dict().items()
                    if k in RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr().fields}
                ns_instance_config_dict.update(ns_instance_config_copy_dict)

                ns_instance_config = RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr.from_dict(ns_instance_config_dict)
                ns_instance_config.nsd = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_Nsd()
                ns_instance_config.nsd.from_dict(nsd_copy.msg.as_dict())

                payload_dict = ns_instance_config.to_json(self._model)

                # The NETCONF edit-config path is disabled; the config is
                # pushed through RESTCONF instead.
                self._log.debug("Sending configure ns-instance-config json to %s: %s",
                                self._nsr_config_url, ns_instance_config)

                # requests is blocking, so keep it off the event loop
                response = yield from self._loop.run_in_executor(
                    None,
                    self._apply_ns_instance_config,
                    payload_dict
                    )
                response.raise_for_status()
                self._log.debug("Received edit config response: %s", response.json())

                xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                        NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH,
                                        rpc_op)
            except Exception as e:
                self._log.error("Exception processing the "
                                "start-network-service: {}".format(e))
                self._log.exception(e)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                        NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH)

        hdl_ns = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_ns_config_prepare,)

        with self._dts.group_create() as group:
            self._ns_regh = group.register(xpath=NsrRpcDtsHandler.EXEC_NSR_CONF_XPATH,
                                           handler=hdl_ns,
                                           flags=rwdts.Flag.PUBLISHER,
                                           )
3020
3021
class NsrDtsHandler(object):
    """ The network service DTS handler

    Subscribes to NSR config, scaling-group instance config and key-pair
    config. Changes are validated in the prepare callback; the apply callback
    drives NS create / terminate / update and scale operations on the NS
    manager.
    """
    NSR_XPATH = "C,/nsr:ns-instance-config/nsr:nsr"
    SCALE_INSTANCE_XPATH = "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
    KEY_PAIR_XPATH = "C,/nsr:key-pair"

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        # Registration handles, assigned by register()
        self._nsr_regh = None
        self._scale_regh = None
        self._key_pair_regh = None

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """ Register for Nsr create/update/delete/read requests from dts """

        def nsr_id_from_keyspec(ks):
            """ Extract the NSR id from a keyspec """
            nsr_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks)
            nsr_id = nsr_path_entry.key00.id
            return nsr_id

        def group_name_from_keyspec(ks):
            """ Extract the scaling group name from a keyspec """
            group_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks)
            group_name = group_path_entry.key00.scaling_group_name_ref
            return group_name

        def get_scale_group_instance_delta(nsr_id, group_name, xact):
            """ Return {"added": [...], "deleted": [...]} instance ids for a scaling group

            Compares the instances in this xact against the instances already
            held by the registration.
            """
            delta = {"added": [], "deleted": []}
            for instance_cfg, keyspec in self._scale_regh.get_xact_elements(xact, include_keyspec=True):
                elem_nsr_id = nsr_id_from_keyspec(keyspec)
                if elem_nsr_id != nsr_id:
                    continue

                elem_group_name = group_name_from_keyspec(keyspec)
                if elem_group_name != group_name:
                    continue

                delta["added"].append(instance_cfg.id)

            for instance_cfg, keyspec in self._scale_regh.get_xact_elements(include_keyspec=True):
                elem_nsr_id = nsr_id_from_keyspec(keyspec)
                if elem_nsr_id != nsr_id:
                    continue

                elem_group_name = group_name_from_keyspec(keyspec)
                if elem_group_name != group_name:
                    continue

                # Present on both sides means unchanged; present only in the
                # existing elements means it was deleted
                if instance_cfg.id in delta["added"]:
                    delta["added"].remove(instance_cfg.id)
                else:
                    delta["deleted"].append(instance_cfg.id)

            return delta

        @asyncio.coroutine
        def update_nsr_nsd(nsr_id, xact, scratch):
            """ Instantiate / terminate VLs according to the NSD change in this xact """

            @asyncio.coroutine
            def get_nsr_vl_delta(nsr_id, xact, scratch):
                """ Return {"added": [...], "deleted": [...]} VLDs for the NSR """
                delta = {"added": [], "deleted": []}
                for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(xact, include_keyspec=True):
                    elem_nsr_id = nsr_id_from_keyspec(keyspec)
                    if elem_nsr_id != nsr_id:
                        continue

                    if 'vld' in instance_cfg.nsd:
                        for vld in instance_cfg.nsd.vld:
                            delta["added"].append(vld)

                for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(include_keyspec=True):
                    self._log.debug("NSR update: %s", instance_cfg)
                    elem_nsr_id = nsr_id_from_keyspec(keyspec)
                    if elem_nsr_id != nsr_id:
                        continue

                    if 'vld' in instance_cfg.nsd:
                        for vld in instance_cfg.nsd.vld:
                            if vld in delta["added"]:
                                delta["added"].remove(vld)
                            else:
                                delta["deleted"].append(vld)

                return delta

            vl_delta = yield from get_nsr_vl_delta(nsr_id, xact, scratch)
            self._log.debug("Got NSR:%s VL instance delta: %s", nsr_id, vl_delta)

            for vld in vl_delta["added"]:
                yield from self._nsm.nsr_instantiate_vl(nsr_id, vld)

            for vld in vl_delta["deleted"]:
                yield from self._nsm.nsr_terminate_vl(nsr_id, vld)

        def get_add_delete_update_cfgs(dts_member_reg, xact, key_name, scratch):
            """ Return (added, deleted, updated) cfg lists for a registration """
            # Unfortunately, it is currently difficult to figure out what has exactly
            # changed in this xact without Pbdelta support (RIFT-4916)
            # As a workaround, we can fetch the pre and post xact elements and
            # perform a comparison to figure out adds/deletes/updates
            xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
            curr_cfgs = list(dts_member_reg.elements)

            xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
            curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}

            # Find Adds
            added_keys = set(xact_key_map) - set(curr_key_map)
            added_cfgs = [xact_key_map[key] for key in added_keys]

            # Find Deletes
            deleted_keys = set(curr_key_map) - set(xact_key_map)
            deleted_cfgs = [curr_key_map[key] for key in deleted_keys]

            # Find Updates
            updated_keys = set(curr_key_map) & set(xact_key_map)
            updated_cfgs = [xact_key_map[key] for key in updated_keys
                            if xact_key_map[key] != curr_key_map[key]]

            return added_cfgs, deleted_cfgs, updated_cfgs

        def get_nsr_key_pairs(dts_member_reg, xact):
            """ Return the key-pair cfgs of this xact as a dict keyed by name """
            key_pairs = {}
            for instance_cfg, keyspec in dts_member_reg.get_xact_elements(xact, include_keyspec=True):
                self._log.debug("Key pair received is {} KS: {}".format(instance_cfg, keyspec))
                key_pairs[instance_cfg.name] = instance_cfg
            return key_pairs

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the  configuration"""
            self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            def handle_create_nsr(msg, key_pairs=None, restart_mode=False):
                """ Validate the NSR config and create the NSR in the NS manager """
                if not msg.has_field("nsd"):
                    err = "NSD not provided"
                    self._log.error(err)
                    raise NetworkServiceRecordError(err)

                self._log.debug("Creating NetworkServiceRecord %s  from nsr config  %s",
                                msg.id, msg.as_dict())
                nsr = self.nsm.create_nsr(msg, key_pairs=key_pairs, restart_mode=restart_mode)
                return nsr

            def handle_delete_nsr(msg):
                """ Mark the NSR as terminating and schedule its termination """
                @asyncio.coroutine
                def delete_instantiation(ns_id):
                    """ Delete instantiation """
                    with self._dts.transaction() as xact:
                        yield from self._nsm.terminate_ns(ns_id, xact)

                self._log.info("Delete req for  NSR Id: %s received", msg.id)
                # Terminate the NSR instance
                nsr = self._nsm.get_ns_by_nsr_id(msg.id)

                nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD)
                event_descr = "Terminate rcvd for NS Id:%s" % msg.id
                nsr.record_event("terminate-rcvd", event_descr)

                self._loop.create_task(delete_instantiation(msg.id))

            @asyncio.coroutine
            def begin_instantiation(nsr):
                """ Instantiate the NS using the xact of this apply """
                self._log.info("Beginning NS instantiation: %s", nsr.id)
                yield from self._nsm.instantiate_ns(nsr.id, xact)

            def make_instantiate_done_cb(nsr_id):
                """ Build a task done-callback bound to one NSR id

                The id is bound here because the callback runs after on_apply
                has returned, when loop variables no longer refer to the NSR
                the task was started for.
                """
                def on_instantiate_done(fut):
                    # exception() raises CancelledError on a cancelled future
                    if fut.cancelled():
                        return

                    # If the instantiation failed, publish NSR with failed result
                    e = fut.exception()
                    if e is None:
                        return

                    import traceback
                    print("".join(traceback.format_exception(None, e, e.__traceback__)),
                          file=sys.stderr, flush=True)
                    self._log.error("NSR instantiation failed for NSR id %s: %s", nsr_id, str(e))
                    failed_nsr = self._nsm.nsrs[nsr_id]
                    self._loop.create_task(failed_nsr.instantiation_failed(failed_reason=str(e)))

                return on_instantiate_done

            def start_instantiation(nsr):
                """ Schedule instantiation of nsr and attach the failure callback """
                instantiate_task = self._loop.create_task(begin_instantiation(nsr))
                instantiate_task.add_done_callback(make_instantiate_done_cb(nsr.id))

            # INSTALL with no xact id: re-create every NSR already held by the
            # registration in restart mode
            if action == rwdts.AppconfAction.INSTALL and xact.id is None:
                key_pairs = list(self._key_pair_regh.elements)
                for element in self._nsr_regh.elements:
                    nsr = handle_create_nsr(element, key_pairs, restart_mode=True)
                    start_instantiation(nsr)

            (added_msgs, deleted_msgs, updated_msgs) = get_add_delete_update_cfgs(self._nsr_regh,
                                                                                  xact,
                                                                                  "id",
                                                                                  scratch)
            self._log.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs,
                            deleted_msgs, updated_msgs)

            for msg in added_msgs:
                if msg.id not in self._nsm.nsrs:
                    self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
                    key_pairs = get_nsr_key_pairs(self._key_pair_regh, xact)
                    nsr = handle_create_nsr(msg, key_pairs)
                    start_instantiation(nsr)

            for msg in deleted_msgs:
                self._log.info("Delete NSR received in on_apply to terminate NS:%s", msg.id)
                try:
                    handle_delete_nsr(msg)
                except Exception:
                    self._log.exception("Failed to terminate NS:%s", msg.id)

            for msg in updated_msgs:
                self._log.info("Update NSR received in on_apply: %s", msg)

                self._nsm.nsr_update_cfg(msg.id, msg)

                if 'nsd' in msg:
                    self._loop.create_task(update_nsr_nsd(msg.id, xact, scratch))

                for group in msg.scaling_group:
                    instance_delta = get_scale_group_instance_delta(msg.id, group.scaling_group_name_ref, xact)
                    self._log.debug("Got NSR:%s scale group instance delta: %s", msg.id, instance_delta)

                    for instance_id in instance_delta["added"]:
                        self._nsm.scale_nsr_out(msg.id, group.scaling_group_name_ref, instance_id, xact)

                    for instance_id in instance_delta["deleted"]:
                        self._nsm.scale_nsr_in(msg.id, group.scaling_group_name_ref, instance_id)

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare calllback from DTS for NSR

            Validates the change and raises NsrInstantiationFailed,
            NsrVlUpdateError or ScalingOperationError when it is not acceptable.
            """

            xpath = ks_path.to_xpath(RwNsrYang.get_schema())
            action = xact_info.query_action
            self._log.debug(
                "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
                xact, action, xact_info, xpath, msg
                )

            if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE, rwdts.QueryAction.DELETE]:
                # if this is an NSR create
                if action != rwdts.QueryAction.DELETE and msg.id not in self._nsm.nsrs:
                    # Ensure the Cloud account/datacenter has been specified
                    if not msg.has_field("cloud_account") and not msg.has_field("om_datacenter"):
                        raise NsrInstantiationFailed("Cloud account or datacenter not specified in NSR")

                    # Check if nsd is specified
                    if not msg.has_field("nsd"):
                        raise NsrInstantiationFailed("NSD not specified in NSR")

                else:
                    nsr = self._nsm.nsrs[msg.id]

                    if msg.has_field("nsd"):
                        if nsr.state != NetworkServiceRecordState.RUNNING:
                            raise NsrVlUpdateError("Unable to update VL when NSR not in running state")
                        if 'vld' not in msg.nsd or len(msg.nsd.vld) == 0:
                            raise NsrVlUpdateError("NS config NSD should have atleast 1 VLD defined")

                    if msg.has_field("scaling_group"):
                        self._log.debug("ScaleMsg %s", msg)
                        self._log.debug("NSSCALINGSTATE %s", nsr.state)
                        if nsr.state != NetworkServiceRecordState.RUNNING:
                            raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")

                        if len(msg.scaling_group) > 1:
                            raise ScalingOperationError("Only a single scaling group can be configured at a time")

                        for group_msg in msg.scaling_group:
                            num_new_group_instances = len(group_msg.instance)
                            if num_new_group_instances > 1:
                                raise ScalingOperationError("Only a single scaling instance can be modified at a time")

                            elif num_new_group_instances == 1:
                                scale_group = nsr.scaling_groups[group_msg.scaling_group_name_ref]
                                if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
                                    if len(scale_group.instances) == scale_group.max_instance_count:
                                        raise ScalingOperationError("Max instances for %s reached" % scale_group)

            acg.handle.prepare_complete_ok(xact_info.handle)

        self._log.debug("Registering for NSR config using xpath: %s",
                        NsrDtsHandler.NSR_XPATH)

        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=acg_hdl) as acg:
            self._nsr_regh = acg.register(xpath=NsrDtsHandler.NSR_XPATH,
                                          flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                                          on_prepare=on_prepare)

            self._scale_regh = acg.register(
                xpath=NsrDtsHandler.SCALE_INSTANCE_XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                )

            self._key_pair_regh = acg.register(
                xpath=NsrDtsHandler.KEY_PAIR_XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                )
3379
3380
class NsrOpDataDtsHandler(object):
    """ The network service op data DTS handler """
    XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"

    def __init__(self, dts, log, loop, nsm):
        """ Keep references to the DTS handle, logger, event loop and NS manager """
        self._dts, self._log, self._loop, self._nsm = dts, log, loop, nsm
        # Assigned by register()
        self._regh = None

    @property
    def regh(self):
        """ Return the registration handle (None until register() has run) """
        return self._regh

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """ Register as publisher of the NSR op data path """
        self._log.debug("Registering Nsr op data path %s as publisher",
                        NsrOpDataDtsHandler.XPATH)

        reg_handler = rift.tasklets.DTS.RegistrationHandler()
        group_handler = rift.tasklets.Group.Handler()
        publisher_flags = rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ | rwdts.Flag.DATASTORE
        with self._dts.group_create(handler=group_handler) as group:
            self._regh = group.register(xpath=NsrOpDataDtsHandler.XPATH,
                                        handler=reg_handler,
                                        flags=publisher_flags)

    @asyncio.coroutine
    def create(self, path, msg):
        """
        Create an NS record in DTS with the path and message
        """
        self._log.debug("Creating NSR %s:%s", path, msg)
        registration = self.regh
        registration.create_element(path, msg)
        self._log.debug("Created NSR, %s:%s", path, msg)

    @asyncio.coroutine
    def update(self, path, msg, flags=rwdts.XactFlag.REPLACE):
        """
        Update an NS record in DTS with the path and message

        flags are passed through to update_element (REPLACE by default)
        """
        registration = self.regh
        self._log.debug("Updating NSR, %s:%s regh = %s", path, msg, registration)
        registration.update_element(path, msg, flags)
        self._log.debug("Updated NSR, %s:%s", path, msg)

    @asyncio.coroutine
    def delete(self, path):
        """
        Delete the NS record at the given path from DTS
        """
        self._log.debug("Deleting NSR path:%s", path)
        registration = self.regh
        registration.delete_element(path)
        self._log.debug("Deleted NSR path:%s", path)
3441
3442
class VnfrDtsHandler(object):
    """ The VNFR DTS handler

    Subscribes to the VNFR catalog and forwards create/update/delete of VNFRs
    known to the NS manager.
    """
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        # Assigned by register()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """ Register for vnfr create/update/delete/ advises from dts """

        def on_commit(xact_info):
            """ The transaction has been committed """
            self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts

            Responds NA for VNFRs the NS manager does not hold; otherwise
            applies the create/update or delete and responds ACK.
            """
            xpath = ks_path.to_xpath(RwNsrYang.get_schema())
            self._log.debug(
                "Got vnfr on_prepare cb (xact_info: %s, action: %s): %s:%s",
                xact_info, action, ks_path, msg
                )

            schema = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
            path_entry = schema.keyspec_to_entry(ks_path)
            vnfr_id = path_entry.key00.id
            if vnfr_id not in self._nsm.vnfrs:
                self._log.error("%s request for non existent record path %s",
                                action, xpath)
                xact_info.respond_xpath(rwdts.XactRspCode.NA, xpath)

                return

            if action == rwdts.QueryAction.CREATE or action == rwdts.QueryAction.UPDATE:
                yield from self._nsm.update_vnfr(msg)
            elif action == rwdts.QueryAction.DELETE:
                # Only log the delete when the action really is a delete
                self._log.debug("Deleting VNFR with id %s", vnfr_id)
                self._nsm.delete_vnfr(vnfr_id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK, xpath)

        self._log.debug("Registering for VNFR using xpath: %s",
                        VnfrDtsHandler.XPATH,)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
                                                    on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=(rwdts.Flag.SUBSCRIBER),)
3510
3511
3512 class NsManager(object):
3513 """ The Network Service Manager class"""
def __init__(self, dts, log, loop,
             nsr_handler, vnfr_handler, vlr_handler, ro_plugin_selector,
             vnffgmgr, vnfd_pub_handler, cloud_account_handler):
    """ Store collaborators, REST settings and create the static DTS handlers """
    self._dts, self._log, self._loop = dts, log, loop

    # Publication handlers and helpers supplied by the caller
    self._nsr_handler = nsr_handler
    self._vnfr_pub_handler = vnfr_handler
    self._vlr_pub_handler = vlr_handler
    self._vnffgmgr = vnffgmgr
    self._vnfd_pub_handler = vnfd_pub_handler
    self._cloud_account_handler = cloud_account_handler
    self._ro_plugin_selector = ro_plugin_selector

    # Intialize the set of variables for implementing Scaling RPC using REST.
    self._headers = {"content-type":"application/json", "accept":"application/json"}
    #This will break when we have rbac in the rift code and admin user password is changed or admin it self is removed.
    self._user, self._password = 'admin', 'admin'
    self._ip = 'localhost'
    self._rport = 8008
    self._conf_url = "https://{ip}:{port}/api/config".format(ip=self._ip, port=self._rport)

    # Records and descriptors, filled in outside this constructor
    self._nsrs = {}
    self._nsds = {}
    self._vnfds = {}
    self._vnfrs = {}

    self.cfgmgr_obj = conman.ROConfigManager(log, loop, dts, self)

    # TODO: All these handlers should move to tasklet level.
    # Passing self is often an indication of bad design
    self._nsd_dts_handler = NsdDtsHandler(dts, log, loop, self)
    self._vnfd_dts_handler = VnfdDtsHandler(dts, log, loop, self)

    # Order matters: register() walks this list front to back
    static_handlers = [self._nsd_dts_handler]
    static_handlers.append(VnfrDtsHandler(dts, log, loop, self))
    static_handlers.append(NsrDtsHandler(dts, log, loop, self))
    static_handlers.append(ScalingRpcHandler(log, dts, loop, self.scale_rpc_callback))
    static_handlers.append(NsrRpcDtsHandler(dts, log, loop, self))
    static_handlers.append(self._vnfd_dts_handler)
    static_handlers.append(self.cfgmgr_obj)
    self._dts_handlers = static_handlers
3559
3560
@property
def log(self):
    """ Logger handed to this NS manager at construction """
    return self._log
3565
@property
def loop(self):
    """ Event loop handed to this NS manager at construction """
    return self._loop
3570
@property
def dts(self):
    """ DTS handle handed to this NS manager at construction """
    return self._dts
3575
@property
def nsr_handler(self):
    """ NSR handler supplied at construction """
    return self._nsr_handler
3580
@property
def so_obj(self):
    """ So Obj handler

    NOTE(review): _so_obj is not assigned in the __init__ shown for this
    class; confirm it is set elsewhere before this property is read.
    """
    return self._so_obj
3585
@property
def nsrs(self):
    """ Dict of the NSRs held by this NSM """
    return self._nsrs
3590
@property
def nsds(self):
    """ Dict of the NSDs held by this NSM """
    return self._nsds
3595
@property
def vnfds(self):
    """ Dict of the VNFDs held by this NSM """
    return self._vnfds
3600
@property
def vnfrs(self):
    """ Dict of the VNFRs held by this NSM """
    return self._vnfrs
3605
@property
def nsr_pub_handler(self):
    """ NSR publication handler (the same object nsr_handler returns) """
    return self._nsr_handler
3610
@property
def vnfr_pub_handler(self):
    """ VNFR publication handler supplied at construction """
    return self._vnfr_pub_handler
3615
@property
def vlr_pub_handler(self):
    """ VLR publication handler supplied at construction """
    return self._vlr_pub_handler
3620
@property
def vnfd_pub_handler(self):
    """ VNFD publication handler supplied at construction """
    return self._vnfd_pub_handler
3624
@asyncio.coroutine
def register(self):
    """ Register every static DTS handler, one after the other, in list order """
    for static_handler in self._dts_handlers:
        yield from static_handler.register()
3630
3631
def get_ns_by_nsr_id(self, nsr_id):
    """ Return the NSR stored under nsr_id

    Raises NetworkServiceRecordError when no such NSR is held.
    """
    if nsr_id in self._nsrs:
        return self._nsrs[nsr_id]

    raise NetworkServiceRecordError("NSR id %s not found" % nsr_id)
3638
def scale_nsr_out(self, nsr_id, scale_group_name, instance_id, config_xact):
    """ Schedule creation of a scaling group instance on a running NSR

    Raises KeyError for an unknown nsr_id and ScalingOperationError when the
    NSR is not in the running state.
    """
    self.log.debug("Scale out NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
                   nsr_id, scale_group_name, instance_id)

    target_nsr = self._nsrs[nsr_id]
    required_state = NetworkServiceRecordState.RUNNING
    if target_nsr.state != required_state:
        raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")

    # Runs as its own task; this method does not wait for it
    scale_out_coro = target_nsr.create_scale_group_instance(scale_group_name, instance_id, config_xact)
    self._loop.create_task(scale_out_coro)
3650
def scale_nsr_in(self, nsr_id, scale_group_name, instance_id):
    """ Schedule deletion of a scaling group instance on a running NSR

    Raises KeyError for an unknown nsr_id and ScalingOperationError when the
    NSR is not in the running state.
    """
    self.log.debug("Scale in NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
                   nsr_id, scale_group_name, instance_id)

    target_nsr = self._nsrs[nsr_id]
    required_state = NetworkServiceRecordState.RUNNING
    if target_nsr.state != required_state:
        raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")

    # Runs as its own task; this method does not wait for it
    scale_in_coro = target_nsr.delete_scale_group_instance(scale_group_name, instance_id)
    self._loop.create_task(scale_in_coro)
3662
def scale_rpc_callback(self, xact, msg, action):
    """Callback handler for RPC calls

    Reads the NSR config over REST, adds or removes the requested scaling
    group instance in it and writes the config back. The REST calls are
    blocking, so the work is handed to the loop's default executor and this
    method returns without waiting for it.

    Args:
        xact : Transaction Handler (not used here)
        msg : RPC input; nsr_id_ref, scaling_group_name_ref and
              instance_id are read from it
        action : Scaling Action; SCALE_OUT adds an instance, anything
                 else removes one
    """
    def nsr_config_url():
        return "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref)

    def get_scaling_group_information():
        """ Fetch the NSR config as a dict, or None when the reply is empty """
        output = requests.get(nsr_config_url(), headers=self._headers,
                              auth=(self._user, self._password), verify=False)
        if not output.text:
            self.log.error("nsr id %s information not present", msg.nsr_id_ref)
            return None
        return json.loads(output.text)

    def config_scaling_group_information(scaling_group_info):
        """ Write the modified NSR config back; raises on an HTTP error status """
        data_str = json.dumps(scaling_group_info)
        self.log.debug("scaling group Info %s", data_str)

        response = requests.put(nsr_config_url(), data=data_str, verify=False,
                                auth=(self._user, self._password), headers=self._headers)
        response.raise_for_status()

    def scale_out():
        scaling_group_info = get_scaling_group_information()
        if scaling_group_info is None:
            return

        instance_id = int(msg.instance_id)
        nsr_cfg = scaling_group_info["nsr:nsr"]

        scaling_group_present = False
        for scaling_group in nsr_cfg.get("scaling-group", []):
            if scaling_group["scaling-group-name-ref"] != msg.scaling_group_name_ref:
                continue

            scaling_group_present = True
            instances = scaling_group.setdefault('instance', [])
            if any(instance["id"] == instance_id for instance in instances):
                self.log.error("scaling group with instance id %s exists for scale out", msg.instance_id)
                return
            instances.append({"id": instance_id})

        if not scaling_group_present:
            # Same id coercion as when appending to an existing group
            nsr_cfg["scaling-group"] = [{"scaling-group-name-ref": msg.scaling_group_name_ref,
                                         "instance": [{"id": instance_id}]}]

        config_scaling_group_information(scaling_group_info)

    def scale_in():
        scaling_group_info = get_scaling_group_information()
        if scaling_group_info is None:
            return

        instance_id = int(msg.instance_id)

        # An NSR without any scaling group is reported through the
        # "does not exist" branch below instead of raising KeyError
        scaling_group_array = scaling_group_info["nsr:nsr"].get("scaling-group", [])
        scaling_group_present = False
        instance_id_present = False
        for scaling_group in scaling_group_array:
            if scaling_group["scaling-group-name-ref"] != msg.scaling_group_name_ref:
                continue

            scaling_group_present = True
            instance_array = scaling_group.get("instance", [])
            for index, instance in enumerate(instance_array):
                if instance["id"] == instance_id:
                    instance_array.pop(index)
                    instance_id_present = True
                    break

        if not scaling_group_present:
            self.log.error("Scaling group %s doesnot exists for scale in", msg.scaling_group_name_ref)
            return

        if not instance_id_present:
            self.log.error("Instance id %s doesnot exists for scale in", msg.instance_id)
            return

        config_scaling_group_information(scaling_group_info)

    if action == ScalingRpcHandler.ACTION.SCALE_OUT:
        self._loop.run_in_executor(None, scale_out)
    else:
        self._loop.run_in_executor(None, scale_in)
3746
def nsr_update_cfg(self, nsr_id, msg):
    """ Store msg as the nsr_cfg_msg of the NSR with the passed id

    Arguments:
        nsr_id - key of the record in self._nsrs
        msg    - value assigned to the record's nsr_cfg_msg attribute

    Raises KeyError when nsr_id is not a key of self._nsrs
    """
    self._nsrs[nsr_id].nsr_cfg_msg = msg
3750
def nsr_instantiate_vl(self, nsr_id, vld):
    """ Create a VL instance on the NSR with the passed id

    Arguments:
        nsr_id - key of the record in self._nsrs
        vld    - descriptor handed to the record's create_vl_instance()

    Raises NsrVlUpdateError when the NSR is not in the RUNNING state.
    This is a generator; it delegates to nsr.create_vl_instance(vld).
    """
    self.log.debug("NSR {} create VL {}".format(nsr_id, vld))

    record = self._nsrs[nsr_id]
    if record.state != NetworkServiceRecordState.RUNNING:
        reason = "Cannot perform VL instantiate if NSR is not in running state"
        raise NsrVlUpdateError(reason)

    # Delegated inline (no new task): the caller already runs in its own task
    yield from record.create_vl_instance(vld)
3759
def nsr_terminate_vl(self, nsr_id, vld):
    """ Delete a VL instance from the NSR with the passed id

    Arguments:
        nsr_id - key of the record in self._nsrs
        vld    - descriptor handed to the record's delete_vl_instance()

    Raises NsrVlUpdateError when the NSR is not in the RUNNING state.
    This is a generator; it delegates to nsr.delete_vl_instance(vld).
    """
    self.log.debug("NSR {} delete VL {}".format(nsr_id, vld.id))

    record = self._nsrs[nsr_id]
    if record.state != NetworkServiceRecordState.RUNNING:
        reason = "Cannot perform VL terminate if NSR is not in running state"
        raise NsrVlUpdateError(reason)

    # Delegated inline (no new task): the caller already runs in its own task
    yield from record.delete_vl_instance(vld)
3768
def create_nsr(self, nsr_msg, key_pairs=None, restart_mode=False):
    """ Create an NSR instance and track it under nsr_msg.id

    Arguments:
        nsr_msg      - NSR message; its id, nsd and cloud_account are read
        key_pairs    - passed through to the record and to the plugin
        restart_mode - passed through to the record

    Returns the new NetworkServiceRecord.
    Raises NetworkServiceRecordError when the id is already tracked.
    """
    self._log.debug("NSRMSG %s", nsr_msg)

    # Refuse to overwrite a record that is already tracked
    if nsr_msg.id in self._nsrs:
        msg = "NSR id %s already exists" % nsr_msg.id
        self._log.error(msg)
        raise NetworkServiceRecordError(msg)

    self._log.info("Create NetworkServiceRecord nsr id %s from nsd_id %s",
                   nsr_msg.id,
                   nsr_msg.nsd.id)

    plugin = self._ro_plugin_selector.ro_plugin
    sdn_name = self._cloud_account_handler.get_cloud_account_sdn_name(nsr_msg.cloud_account)
    vlr_publisher = self._ro_plugin_selector._records_publisher._vlr_pub_hdlr

    record = NetworkServiceRecord(
        self._dts,
        self._log,
        self._loop,
        self,
        plugin,
        nsr_msg,
        sdn_name,
        key_pairs,
        restart_mode=restart_mode,
        vlr_handler=vlr_publisher,
    )

    # The record is registered before the plugin is told about it
    self._nsrs[nsr_msg.id] = record
    plugin.create_nsr(nsr_msg, nsr_msg.nsd, key_pairs)

    return record
3799
def delete_nsr(self, nsr_id):
    """ Stop tracking the NSR with the passed nsr id

    Raises KeyError when nsr_id is not a key of self._nsrs
    """
    self._nsrs.pop(nsr_id)
3805
3806 @asyncio.coroutine
def instantiate_ns(self, nsr_id, config_xact):
    """ Instantiate the NS instance tracked under nsr_id

    Delegates to the instantiate_ns() of the record's own nsm_plugin.
    Raises NetworkServiceRecordError when nsr_id is not tracked.
    """
    self._log.debug("Instantiating Network service id %s", nsr_id)

    if nsr_id in self._nsrs:
        record = self._nsrs[nsr_id]
        yield from record.nsm_plugin.instantiate_ns(record, config_xact)
        return

    err = "NSR id %s not found " % nsr_id
    self._log.error(err)
    raise NetworkServiceRecordError(err)
3817
3818 @asyncio.coroutine
def update_vnfr(self, vnfr):
    """ Apply a VNFR update, then refresh the state of the owning NSR

    Arguments:
        vnfr - VNFR message; vnfr.id must be a key of self._vnfrs
    """
    record = self._vnfrs[vnfr.id]
    self._log.debug("Updating VNFR with state %s: vnfr %s", record.state, vnfr)

    yield from self._vnfrs[vnfr.id].update_state(vnfr)

    # NOTE(review): find_nsr_for_vnfr() returns None when no NSR owns the
    # VNFR; that case is not handled here and would raise AttributeError.
    owner = self.find_nsr_for_vnfr(vnfr.id)
    yield from owner.update_state()
3828
def find_nsr_for_vnfr(self, vnfr_id):
    """ Find the NSR which has the passed vnfr id

    Returns the first NSR in self.nsrs owning a VNFR whose id equals
    vnfr_id, or None when there is no such NSR.
    """
    # Both mappings are snapshotted with list() before being walked
    for candidate in list(self.nsrs.values()):
        if any(vnfr.id == vnfr_id for vnfr in list(candidate.vnfrs.values())):
            return candidate
    return None
3836
def delete_vnfr(self, vnfr_id):
    """ Stop tracking the VNFR with the passed id

    Raises KeyError when vnfr_id is not a key of self._vnfrs
    """
    self._vnfrs.pop(vnfr_id)
3840
3841 @asyncio.coroutine
def get_nsr_config(self, nsd_id):
    """ Look up the configured NSR whose NSD has the passed id

    Reads "C,/nsr:ns-instance-config" through DTS and returns the first
    nsr entry whose nsd.id equals nsd_id, or None when nothing matches.
    """
    # Named config_path so the imported xpath module is not shadowed
    config_path = "C,/nsr:ns-instance-config"
    responses = yield from self._dts.query_read(config_path, rwdts.XactFlag.MERGE)

    for response in responses:
        entry = yield from response
        for configured_nsr in entry.result.nsr:
            if configured_nsr.nsd.id == nsd_id:
                return configured_nsr

    return None
3855
def get_nsd(self, nsd_id):
    """ Get network service descriptor for the passed nsd_id

    Returns the entry stored in self._nsds under nsd_id.
    Raises NetworkServiceDescriptorError when nsd_id is not known.
    """
    if nsd_id not in self._nsds:
        self._log.error("Cannot find NSD id:%s", nsd_id)
        # Exceptions do not interpolate their arguments the way logging
        # calls do, so the message is formatted before raising.
        raise NetworkServiceDescriptorError("Cannot find NSD id:%s" % nsd_id)

    return self._nsds[nsd_id]
3863
def create_nsd(self, nsd_msg):
    """ Create a network service descriptor

    Wraps nsd_msg in a NetworkServiceDescriptor, stores it in self._nsds
    under nsd_msg.id and returns it.
    Raises NetworkServiceDescriptorError when the id is already stored.
    """
    self._log.debug("Create network service descriptor - %s", nsd_msg)
    if nsd_msg.id in self._nsds:
        self._log.error("Cannot create NSD %s -NSD ID already exists", nsd_msg)
        # Exceptions do not interpolate their arguments the way logging
        # calls do, so the message is formatted before raising.
        raise NetworkServiceDescriptorError("NSD already exists-%s" % nsd_msg.id)

    nsd = NetworkServiceDescriptor(
        self._dts,
        self._log,
        self._loop,
        nsd_msg,
        self,
    )
    self._nsds[nsd_msg.id] = nsd

    return nsd
3881
def update_nsd(self, nsd):
    """ Update the network service descriptor, creating it when unknown

    A descriptor already stored under nsd.id has its update() called with
    nsd; otherwise create_nsd() is called.
    """
    self._log.debug("Update network service descriptor - %s", nsd)

    if nsd.id in self._nsds:
        self._log.debug("Updating NSD id = %s, nsd = %s", nsd.id, nsd)
        self._nsds[nsd.id].update(nsd)
        return

    self._log.debug("No NSD found - creating NSD id = %s", nsd.id)
    self.create_nsd(nsd)
3891
def delete_nsd(self, nsd_id):
    """ Delete the Network service descriptor with the passed id

    Raises NetworkServiceDescriptorNotFound when nsd_id is not stored in
    self._nsds.
    """
    self._log.debug("Deleting the network service descriptor - %s", nsd_id)
    if nsd_id not in self._nsds:
        self._log.debug("Delete NSD failed - cannot find nsd-id %s", nsd_id)
        # Exceptions do not interpolate their arguments the way logging
        # calls do, so the message is formatted before raising.
        raise NetworkServiceDescriptorNotFound("Cannot find %s" % nsd_id)
    del self._nsds[nsd_id]
3899
def get_vnfd_config(self, xact):
    """ Create every VNFD present in the transaction but not yet stored

    Walks the elements the VNFD DTS registration reports for xact and
    calls create_vnfd() for each one whose id is not in self._vnfds.
    """
    registration = self._vnfd_dts_handler.regh
    for vnfd_cfg in registration.get_xact_elements(xact):
        if vnfd_cfg.id in self._vnfds:
            continue
        self.create_vnfd(vnfd_cfg)
3905
def get_vnfd(self, vnfd_id, xact):
    """ Get virtual network function descriptor for the passed vnfd_id

    When the id is not stored yet, the VNFDs carried by xact are loaded
    through get_vnfd_config() before the lookup is retried.

    Returns the entry stored in self._vnfds under vnfd_id.
    Raises VnfDescriptorError when the id is still unknown afterwards.
    """
    if vnfd_id not in self._vnfds:
        self._log.error("Cannot find VNFD id:%s", vnfd_id)
        # Give the transaction a chance to supply the missing descriptor
        self.get_vnfd_config(xact)

        if vnfd_id not in self._vnfds:
            self._log.error("Cannot find VNFD id:%s", vnfd_id)
            # Exceptions do not interpolate their arguments the way
            # logging calls do, so the message is formatted first.
            raise VnfDescriptorError("Cannot find VNFD id:%s" % vnfd_id)

    return self._vnfds[vnfd_id]
3917
def create_vnfd(self, vnfd):
    """ Create a virtual network function descriptor

    Stores vnfd in self._vnfds under vnfd.id and returns the stored entry.
    Raises VnfDescriptorError when the id is already stored.
    """
    self._log.debug("Create virtual network function descriptor - %s", vnfd)
    if vnfd.id in self._vnfds:
        self._log.error("Cannot create VNFD %s -VNFD ID already exists", vnfd)
        # Exceptions do not interpolate their arguments the way logging
        # calls do, so the message is formatted before raising.
        raise VnfDescriptorError("VNFD already exists-%s" % vnfd.id)

    self._vnfds[vnfd.id] = vnfd
    return self._vnfds[vnfd.id]
3927
def update_vnfd(self, vnfd):
    """ Update the virtual network function descriptor

    An unknown id is handed to create_vnfd(); a known id has its stored
    entry replaced by vnfd.
    """
    self._log.debug("Update virtual network function descriptor- %s", vnfd)

    if vnfd.id in self._vnfds:
        self._log.debug("Updating VNFD id = %s, vnfd = %s", vnfd.id, vnfd)
        self._vnfds[vnfd.id] = vnfd
        return

    self._log.debug("No VNFD found - creating VNFD id = %s", vnfd.id)
    self.create_vnfd(vnfd)
3939
3940 @asyncio.coroutine
def delete_vnfd(self, vnfd_id):
    """ Delete the virtual network function descriptor with the passed id

    Raises VnfDescriptorError when vnfd_id is not stored in self._vnfds.
    """
    self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
    if vnfd_id not in self._vnfds:
        self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
        # Exceptions do not interpolate their arguments the way logging
        # calls do, so the message is formatted before raising.
        raise VnfDescriptorError("Cannot find %s" % vnfd_id)

    del self._vnfds[vnfd_id]
3949
3950 @asyncio.coroutine
def publish_nsr(self, xact, path, msg):
    """ Publish a NSR

    Delegates to self.nsr_handler.update(xact, path, msg).
    """
    self._log.debug("Publish NSR with path %s, msg %s", path, msg)
    handler = self.nsr_handler
    yield from handler.update(xact, path, msg)
3956
3957 @asyncio.coroutine
def unpublish_nsr(self, xact, path):
    """ Un Publish an NSR

    Delegates to self.nsr_handler.delete(path, xact).
    """
    self._log.debug("Publishing delete NSR with path %s", path)
    handler = self.nsr_handler
    # NOTE(review): delete() is called as (path, xact) here, whereas
    # NsmRecordsPublisherProxy.unpublish_nsr calls its handler's delete()
    # as (xact, path) - confirm which order this handler expects.
    yield from handler.delete(path, xact)
3962
def vnfr_is_ready(self, vnfr_id):
    """ VNFR with the id is ready

    Calls is_ready() on the record stored in self._vnfrs under vnfr_id.
    Raises VirtualNetworkFunctionRecordError when vnfr_id is not tracked.
    """
    self._log.debug("VNFR id %s ready", vnfr_id)
    # Membership is tested on the VNFR records (not the VNFD descriptors),
    # the same mapping that is indexed below.
    if vnfr_id not in self._vnfrs:
        err = "Did not find VNFR ID with id %s" % vnfr_id
        self._log.critical(err)
        raise VirtualNetworkFunctionRecordError(err)
    self._vnfrs[vnfr_id].is_ready()
3971
3972
3973 @asyncio.coroutine
def terminate_ns(self, nsr_id, xact):
    """ Terminate network service for the given NSR Id

    Three steps run in order: terminate the record, unpublish it within
    xact, then drop it from this manager. A failure while terminating is
    logged and does not stop the remaining two steps.
    """
    # Step 1: terminate the instances/networks associated with this NS
    self._log.debug("Terminating the network service %s", nsr_id)
    try:
        yield from self._nsrs[nsr_id].terminate()
    except Exception:
        self.log.exception("Failed to terminate NSR[id=%s]", nsr_id)

    # Step 2: unpublish the NSR record
    self._log.debug("Unpublishing the network service %s", nsr_id)
    yield from self._nsrs[nsr_id].unpublish(xact)

    # Step 3: finally delete the NS instance from this NS Manager
    self._log.debug("Deletng the network service %s", nsr_id)
    self.delete_nsr(nsr_id)
3993
3994
class NsmRecordsPublisherProxy(object):
    """ Publisher interface handed to plugin objects so that they can
    publish and unpublish NSR/VNFR/VLR records.

    Every method derives the record's xpath from the record itself and
    forwards to the matching publisher handler.
    """

    def __init__(self, dts, log, loop, nsr_pub_hdlr, vnfr_pub_hdlr, vlr_pub_hdlr):
        self._dts, self._log, self._loop = dts, log, loop
        # One publisher handler per record kind
        self._nsr_pub_hdlr = nsr_pub_hdlr
        self._vnfr_pub_hdlr = vnfr_pub_hdlr
        self._vlr_pub_hdlr = vlr_pub_hdlr

    @asyncio.coroutine
    def publish_nsr(self, xact, nsr):
        """ Publish an NSR """
        nsr_path = NetworkServiceRecord.xpath_from_nsr(nsr)
        outcome = yield from self._nsr_pub_hdlr.update(xact, nsr_path, nsr)
        return outcome

    @asyncio.coroutine
    def unpublish_nsr(self, xact, nsr):
        """ Unpublish an NSR """
        nsr_path = NetworkServiceRecord.xpath_from_nsr(nsr)
        outcome = yield from self._nsr_pub_hdlr.delete(xact, nsr_path)
        return outcome

    @asyncio.coroutine
    def publish_vnfr(self, xact, vnfr):
        """ Publish an VNFR """
        vnfr_path = VirtualNetworkFunctionRecord.vnfr_xpath(vnfr)
        outcome = yield from self._vnfr_pub_hdlr.update(xact, vnfr_path, vnfr)
        return outcome

    @asyncio.coroutine
    def unpublish_vnfr(self, xact, vnfr):
        """ Unpublish a VNFR """
        vnfr_path = VirtualNetworkFunctionRecord.vnfr_xpath(vnfr)
        outcome = yield from self._vnfr_pub_hdlr.delete(xact, vnfr_path)
        return outcome

    @asyncio.coroutine
    def publish_vlr(self, xact, vlr):
        """ Publish a VLR """
        vlr_path = VirtualLinkRecord.vlr_xpath(vlr)
        outcome = yield from self._vlr_pub_hdlr.update(xact, vlr_path, vlr)
        return outcome

    @asyncio.coroutine
    def unpublish_vlr(self, xact, vlr):
        """ Unpublish a VLR """
        vlr_path = VirtualLinkRecord.vlr_xpath(vlr)
        outcome = yield from self._vlr_pub_hdlr.delete(xact, vlr_path)
        return outcome
4042
4043
class ScalingRpcHandler(mano_dts.DtsHandler):
    """ DTS handler for the exec-scale-in and exec-scale-out RPCs

    Each RPC is answered with the instance id it applies to; the optional
    callback is then invoked as callback(xact, msg, action), where action
    is a member of ScalingRpcHandler.ACTION.
    """
    SCALE_IN_INPUT_XPATH = "I,/nsr:exec-scale-in"
    SCALE_IN_OUTPUT_XPATH = "O,/nsr:exec-scale-in"

    SCALE_OUT_INPUT_XPATH = "I,/nsr:exec-scale-out"
    SCALE_OUT_OUTPUT_XPATH = "O,/nsr:exec-scale-out"

    ACTION = Enum('ACTION', 'SCALE_IN SCALE_OUT')

    def __init__(self, log, dts, loop, callback=None):
        super().__init__(log, dts, loop)
        self.callback = callback
        # Last auto-assigned instance id, keyed by scaling group name
        self.last_instance_id = defaultdict(int)

    @asyncio.coroutine
    def register(self):
        """ Register the scale-in and scale-out RPC handlers with DTS """

        @asyncio.coroutine
        def on_scale_in_prepare(xact_info, action, ks_path, msg):
            assert action == rwdts.QueryAction.RPC

            try:
                rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({
                    "instance_id": msg.instance_id})

                xact_info.respond_xpath(
                    rwdts.XactRspCode.ACK,
                    self.__class__.SCALE_IN_OUTPUT_XPATH,
                    rpc_op)

                if self.callback:
                    self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN)
            except Exception as e:
                # NOTE(review): if the callback raises, a NACK is sent after
                # the ACK above has already gone out - confirm DTS accepts it.
                self.log.exception(e)
                xact_info.respond_xpath(
                    rwdts.XactRspCode.NACK,
                    self.__class__.SCALE_IN_OUTPUT_XPATH)

        @asyncio.coroutine
        def on_scale_out_prepare(xact_info, action, ks_path, msg):
            assert action == rwdts.QueryAction.RPC

            try:
                scaling_group = msg.scaling_group_name_ref
                if not msg.instance_id:
                    # No id supplied: hand out the next one for this group.
                    # The counter is keyed by the group name; keying it by
                    # the imported scale_group module made every scaling
                    # group share a single counter.
                    self.last_instance_id[scaling_group] += 1
                    msg.instance_id = self.last_instance_id[scaling_group]

                rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({
                    "instance_id": msg.instance_id})

                xact_info.respond_xpath(
                    rwdts.XactRspCode.ACK,
                    self.__class__.SCALE_OUT_OUTPUT_XPATH,
                    rpc_op)

                if self.callback:
                    self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT)
            except Exception as e:
                # NOTE(review): if the callback raises, a NACK is sent after
                # the ACK above has already gone out - confirm DTS accepts it.
                self.log.exception(e)
                xact_info.respond_xpath(
                    rwdts.XactRspCode.NACK,
                    self.__class__.SCALE_OUT_OUTPUT_XPATH)

        scale_in_hdl = rift.tasklets.DTS.RegistrationHandler(
            on_prepare=on_scale_in_prepare)
        scale_out_hdl = rift.tasklets.DTS.RegistrationHandler(
            on_prepare=on_scale_out_prepare)

        with self.dts.group_create() as group:
            group.register(
                xpath=self.__class__.SCALE_IN_INPUT_XPATH,
                handler=scale_in_hdl,
                flags=rwdts.Flag.PUBLISHER)
            group.register(
                xpath=self.__class__.SCALE_OUT_INPUT_XPATH,
                handler=scale_out_hdl,
                flags=rwdts.Flag.PUBLISHER)
4124
4125
class NsmTasklet(rift.tasklets.Tasklet):
    """
    The network service manager tasklet

    start() creates the DTS API object; on_dts_state_change() then drives
    init(), which registers the publisher handlers, the RO plugin
    selector, the cloud account subscriber, the VNFFG manager and finally
    the NsManager itself.
    """
    def __init__(self, *args, **kwargs):
        super(NsmTasklet, self).__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("nsm")

        self._dts = None
        self._nsm = None

        self._ro_plugin_selector = None
        self._vnffgmgr = None

        # Kept for backward compatibility; init() assigns _nsr_pub_handler.
        self._nsr_handler = None
        self._nsr_pub_handler = None
        self._vnfr_pub_handler = None
        self._vlr_pub_handler = None
        self._vnfd_pub_handler = None
        self._scale_cfg_handler = None
        self._cloud_account_handler = None

        self._records_publisher_proxy = None

    def start(self):
        """ The task start callback """
        super(NsmTasklet, self).start()
        self.log.info("Starting NsmTasklet")

        self.log.debug("Registering with dts")
        self._dts = rift.tasklets.DTS(self.tasklet_info,
                                      RwNsmYang.get_schema(),
                                      self.loop,
                                      self.on_dts_state_change)

        self.log.debug("Created DTS Api GI Object: %s", self._dts)

    def stop(self):
        """ The task stop callback; deinitializes DTS and re-raises on failure """
        try:
            self._dts.deinit()
        except Exception:
            print("Caught Exception in NSM stop:", sys.exc_info()[0])
            raise

    def on_instance_started(self):
        """ Task instance started callback """
        self.log.debug("Got instance started callback")

    @asyncio.coroutine
    def init(self):
        """ Task init callback """
        # NOTE(review): this message duplicates on_instance_started's.
        self.log.debug("Got instance started callback")

        self.log.debug("creating config account handler")

        self._nsr_pub_handler = publisher.NsrOpDataDtsHandler(self._dts, self.log, self.loop)
        yield from self._nsr_pub_handler.register()

        self._vnfr_pub_handler = publisher.VnfrPublisherDtsHandler(self._dts, self.log, self.loop)
        yield from self._vnfr_pub_handler.register()

        self._vlr_pub_handler = publisher.VlrPublisherDtsHandler(self._dts, self.log, self.loop)
        yield from self._vlr_pub_handler.register()

        # SSL settings for the VNFD publisher come from the tasklet manifest
        manifest = self.tasklet_info.get_pb_manifest()
        use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
        ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
        ssl_key = manifest.bootstrap_phase.rwsecurity.key

        self._vnfd_pub_handler = publisher.VnfdPublisher(use_ssl, ssl_cert, ssl_key, self.loop)

        self._records_publisher_proxy = NsmRecordsPublisherProxy(
            self._dts,
            self.log,
            self.loop,
            self._nsr_pub_handler,
            self._vnfr_pub_handler,
            self._vlr_pub_handler,
        )

        # Register the NSM to receive the nsm plugin
        # when cloud account is configured
        self._ro_plugin_selector = cloud.ROAccountPluginSelector(
            self._dts,
            self.log,
            self.loop,
            self._records_publisher_proxy,
        )
        yield from self._ro_plugin_selector.register()

        self._cloud_account_handler = cloud.CloudAccountConfigSubscriber(
            self._log,
            self._dts,
            self.log_hdl)

        yield from self._cloud_account_handler.register()

        self._vnffgmgr = rwvnffgmgr.VnffgMgr(self._dts, self.log, self.log_hdl, self.loop)
        yield from self._vnffgmgr.register()

        self._nsm = NsManager(
            self._dts,
            self.log,
            self.loop,
            self._nsr_pub_handler,
            self._vnfr_pub_handler,
            self._vlr_pub_handler,
            self._ro_plugin_selector,
            self._vnffgmgr,
            self._vnfd_pub_handler,
            self._cloud_account_handler
        )

        yield from self._nsm.register()

    @asyncio.coroutine
    def run(self):
        """ Task run callback """
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """
        # DTS state to move to once the handler for `state` has run
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Application callback to run for a given DTS state
        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self.log.debug("Changing state to %s", next_state)
            self._dts.handle.set_state(next_state)