merge from v1.0
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / rwnsmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
18 import asyncio
19 import ncclient
20 import ncclient.asyncio_manager
21 import os
22 import shutil
23 import sys
24 import tempfile
25 import time
26 import uuid
27 import yaml
28 import requests
29 import json
30
31
32 from collections import deque
33 from collections import defaultdict
34 from enum import Enum
35
36 import gi
37 gi.require_version('RwYang', '1.0')
38 gi.require_version('RwNsdYang', '1.0')
39 gi.require_version('RwDts', '1.0')
40 gi.require_version('RwNsmYang', '1.0')
41 gi.require_version('RwNsrYang', '1.0')
42 gi.require_version('RwTypes', '1.0')
43 gi.require_version('RwVlrYang', '1.0')
44 gi.require_version('RwVnfrYang', '1.0')
45 from gi.repository import (
46 RwYang,
47 RwNsrYang,
48 NsrYang,
49 NsdYang,
50 RwVlrYang,
51 VnfrYang,
52 RwVnfrYang,
53 RwNsmYang,
54 RwsdnYang,
55 RwDts as rwdts,
56 RwTypes,
57 ProtobufC,
58 )
59
60 import rift.tasklets
61 import rift.mano.ncclient
62 import rift.mano.config_data.config
63 import rift.mano.dts as mano_dts
64
65 from . import rwnsm_conman as conman
66 from . import cloud
67 from . import publisher
68 from . import xpath
69 from . import config_value_pool
70 from . import rwvnffgmgr
71 from . import scale_group
72
73
74 class NetworkServiceRecordState(Enum):
75 """ Network Service Record State """
76 INIT = 101
77 VL_INIT_PHASE = 102
78 VNF_INIT_PHASE = 103
79 VNFFG_INIT_PHASE = 104
80 RUNNING = 106
81 SCALING_OUT = 107
82 SCALING_IN = 108
83 TERMINATE = 109
84 TERMINATE_RCVD = 110
85 VL_TERMINATE_PHASE = 111
86 VNF_TERMINATE_PHASE = 112
87 VNFFG_TERMINATE_PHASE = 113
88 TERMINATED = 114
89 FAILED = 115
90 VL_INSTANTIATE = 116
91 VL_TERMINATE = 117
92
93
94 class NetworkServiceRecordError(Exception):
95 """ Network Service Record Error """
96 pass
97
98
99 class NetworkServiceDescriptorError(Exception):
100 """ Network Service Descriptor Error """
101 pass
102
103
104 class VirtualNetworkFunctionRecordError(Exception):
105 """ Virtual Network Function Record Error """
106 pass
107
108
109 class NetworkServiceDescriptorNotFound(Exception):
110 """ Cannot find Network Service Descriptor"""
111 pass
112
113
114 class NetworkServiceDescriptorRefCountExists(Exception):
115 """ Network Service Descriptor reference count exists """
116 pass
117
118
119 class NetworkServiceDescriptorUnrefError(Exception):
120 """ Failed to unref a network service descriptor """
121 pass
122
123
124 class NsrInstantiationFailed(Exception):
125 """ Failed to instantiate network service """
126 pass
127
128
129 class VnfInstantiationFailed(Exception):
130 """ Failed to instantiate virtual network function"""
131 pass
132
133
134 class VnffgInstantiationFailed(Exception):
135 """ Failed to instantiate virtual network function"""
136 pass
137
138
139 class VnfDescriptorError(Exception):
140 """Failed to instantiate virtual network function"""
141 pass
142
143
144 class ScalingOperationError(Exception):
145 pass
146
147
148 class ScaleGroupMissingError(Exception):
149 pass
150
151
152 class PlacementGroupError(Exception):
153 pass
154
155
156 class NsrNsdUpdateError(Exception):
157 pass
158
159
160 class NsrVlUpdateError(NsrNsdUpdateError):
161 pass
162
163
164 class VlRecordState(Enum):
165 """ VL Record State """
166 INIT = 101
167 INSTANTIATION_PENDING = 102
168 ACTIVE = 103
169 TERMINATE_PENDING = 104
170 TERMINATED = 105
171 FAILED = 106
172
173
174 class VnffgRecordState(Enum):
175 """ VNFFG Record State """
176 INIT = 101
177 INSTANTIATION_PENDING = 102
178 ACTIVE = 103
179 TERMINATE_PENDING = 104
180 TERMINATED = 105
181 FAILED = 106
182
183
184 class VnffgRecord(object):
185 """ Vnffg Records class"""
186 SFF_DP_PORT = 4790
187 SFF_MGMT_PORT = 5000
188 def __init__(self, dts, log, loop, vnffgmgr, nsr, nsr_name, vnffgd_msg, sdn_account_name):
189
190 self._dts = dts
191 self._log = log
192 self._loop = loop
193 self._vnffgmgr = vnffgmgr
194 self._nsr = nsr
195 self._nsr_name = nsr_name
196 self._vnffgd_msg = vnffgd_msg
197 if sdn_account_name is None:
198 self._sdn_account_name = ''
199 else:
200 self._sdn_account_name = sdn_account_name
201
202 self._vnffgr_id = str(uuid.uuid4())
203 self._vnffgr_rsp_id = list()
204 self._vnffgr_state = VnffgRecordState.INIT
205
206 @property
207 def id(self):
208 """ VNFFGR id """
209 return self._vnffgr_id
210
211 @property
212 def state(self):
213 """ state of this VNF """
214 return self._vnffgr_state
215
216 def fetch_vnffgr(self):
217 """
218 Get VNFFGR message to be published
219 """
220
221 if self._vnffgr_state == VnffgRecordState.INIT:
222 vnffgr_dict = {"id": self._vnffgr_id,
223 "vnffgd_id_ref": self._vnffgd_msg.id,
224 "vnffgd_name_ref": self._vnffgd_msg.name,
225 "sdn_account": self._sdn_account_name,
226 "operational_status": 'init',
227 }
228 vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)
229 elif self._vnffgr_state == VnffgRecordState.TERMINATED:
230 vnffgr_dict = {"id": self._vnffgr_id,
231 "vnffgd_id_ref": self._vnffgd_msg.id,
232 "vnffgd_name_ref": self._vnffgd_msg.name,
233 "sdn_account": self._sdn_account_name,
234 "operational_status": 'terminated',
235 }
236 vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)
237 else:
238 try:
239 vnffgr = self._vnffgmgr.fetch_vnffgr(self._vnffgr_id)
240 except Exception:
241 self._log.exception("Fetching VNFFGR for VNFFG with id %s failed", self._vnffgr_id)
242 self._vnffgr_state = VnffgRecordState.FAILED
243 vnffgr_dict = {"id": self._vnffgr_id,
244 "vnffgd_id_ref": self._vnffgd_msg.id,
245 "vnffgd_name_ref": self._vnffgd_msg.name,
246 "sdn_account": self._sdn_account_name,
247 "operational_status": 'failed',
248 }
249 vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)
250
251 return vnffgr
252
253 @asyncio.coroutine
254 def vnffgr_create_msg(self):
255 """ Virtual Link Record message for Creating VLR in VNS """
256 vnffgr_dict = {"id": self._vnffgr_id,
257 "vnffgd_id_ref": self._vnffgd_msg.id,
258 "vnffgd_name_ref": self._vnffgd_msg.name,
259 "sdn_account": self._sdn_account_name,
260 }
261 vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)
262 for rsp in self._vnffgd_msg.rsp:
263 vnffgr_rsp = vnffgr.rsp.add()
264 vnffgr_rsp.id = str(uuid.uuid4())
265 vnffgr_rsp.name = self._nsr.name + '.' + rsp.name
266 self._vnffgr_rsp_id.append(vnffgr_rsp.id)
267 vnffgr_rsp.vnffgd_rsp_id_ref = rsp.id
268 vnffgr_rsp.vnffgd_rsp_name_ref = rsp.name
269 for rsp_cp_ref in rsp.vnfd_connection_point_ref:
270 vnfd = [vnfr.vnfd for vnfr in self._nsr.vnfrs.values() if vnfr.vnfd.id == rsp_cp_ref.vnfd_id_ref]
271 self._log.debug("VNFD message during VNFFG instantiation is %s",vnfd)
272 if len(vnfd) > 0 and vnfd[0].has_field('service_function_type'):
273 self._log.debug("Service Function Type for VNFD ID %s is %s",rsp_cp_ref.vnfd_id_ref, vnfd[0].service_function_type)
274 else:
275 self._log.error("Service Function Type not available for VNFD ID %s; Skipping in chain",rsp_cp_ref.vnfd_id_ref)
276 continue
277
278 vnfr_cp_ref = vnffgr_rsp.vnfr_connection_point_ref.add()
279 vnfr_cp_ref.member_vnf_index_ref = rsp_cp_ref.member_vnf_index_ref
280 vnfr_cp_ref.hop_number = rsp_cp_ref.order
281 vnfr_cp_ref.vnfd_id_ref =rsp_cp_ref.vnfd_id_ref
282 vnfr_cp_ref.service_function_type = vnfd[0].service_function_type
283 for nsr_vnfr in self._nsr.vnfrs.values():
284 if (nsr_vnfr.vnfd.id == vnfr_cp_ref.vnfd_id_ref and
285 nsr_vnfr.member_vnf_index == vnfr_cp_ref.member_vnf_index_ref):
286 vnfr_cp_ref.vnfr_id_ref = nsr_vnfr.id
287 vnfr_cp_ref.vnfr_name_ref = nsr_vnfr.name
288 vnfr_cp_ref.vnfr_connection_point_ref = rsp_cp_ref.vnfd_connection_point_ref
289
290 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
291 self._log.debug(" Received VNFR is %s", vnfr)
292 while vnfr.operational_status != 'running':
293 self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status)
294 if vnfr.operational_status == 'failed':
295 self._log.error("Fetching VNFR for %s failed", vnfr.id)
296 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id))
297 yield from asyncio.sleep(2, loop=self._loop)
298 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
299 self._log.debug("Received VNFR is %s", vnfr)
300
301 vnfr_cp_ref.connection_point_params.mgmt_address = vnfr.mgmt_interface.ip_address
302 for cp in vnfr.connection_point:
303 if cp.name == vnfr_cp_ref.vnfr_connection_point_ref:
304 vnfr_cp_ref.connection_point_params.port_id = cp.connection_point_id
305 vnfr_cp_ref.connection_point_params.name = self._nsr.name + '.' + cp.name
306 for vdu in vnfr.vdur:
307 for ext_intf in vdu.external_interface:
308 if ext_intf.name == vnfr_cp_ref.vnfr_connection_point_ref:
309 vnfr_cp_ref.connection_point_params.vm_id = vdu.vim_id
310 self._log.debug("VIM ID for CP %s in VNFR %s is %s",cp.name,nsr_vnfr.id,
311 vnfr_cp_ref.connection_point_params.vm_id)
312 break
313
314 vnfr_cp_ref.connection_point_params.address = cp.ip_address
315 vnfr_cp_ref.connection_point_params.port = VnffgRecord.SFF_DP_PORT
316
317 for vnffgd_classifier in self._vnffgd_msg.classifier:
318 _rsp = [rsp for rsp in vnffgr.rsp if rsp.vnffgd_rsp_id_ref == vnffgd_classifier.rsp_id_ref]
319 if len(_rsp) > 0:
320 rsp_id_ref = _rsp[0].id
321 rsp_name = _rsp[0].name
322 else:
323 self._log.error("RSP with ID %s not found during classifier creation for classifier id %s",vnffgd_classifier.rsp_id_ref,vnffgd_classifier.id)
324 continue
325 vnffgr_classifier = vnffgr.classifier.add()
326 vnffgr_classifier.id = vnffgd_classifier.id
327 vnffgr_classifier.name = self._nsr.name + '.' + vnffgd_classifier.name
328 _rsp[0].classifier_name = vnffgr_classifier.name
329 vnffgr_classifier.rsp_id_ref = rsp_id_ref
330 vnffgr_classifier.rsp_name = rsp_name
331 for nsr_vnfr in self._nsr.vnfrs.values():
332 if (nsr_vnfr.vnfd.id == vnffgd_classifier.vnfd_id_ref and
333 nsr_vnfr.member_vnf_index == vnffgd_classifier.member_vnf_index_ref):
334 vnffgr_classifier.vnfr_id_ref = nsr_vnfr.id
335 vnffgr_classifier.vnfr_name_ref = nsr_vnfr.name
336 vnffgr_classifier.vnfr_connection_point_ref = vnffgd_classifier.vnfd_connection_point_ref
337
338 if nsr_vnfr.vnfd.service_function_chain == 'CLASSIFIER':
339 vnffgr_classifier.sff_name = nsr_vnfr.name
340
341 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
342 self._log.debug(" Received VNFR is %s", vnfr)
343 while vnfr.operational_status != 'running':
344 self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status)
345 if vnfr.operational_status == 'failed':
346 self._log.error("Fetching VNFR for %s failed", vnfr.id)
347 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id))
348 yield from asyncio.sleep(2, loop=self._loop)
349 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
350 self._log.debug("Received VNFR is %s", vnfr)
351
352 for cp in vnfr.connection_point:
353 if cp.name == vnffgr_classifier.vnfr_connection_point_ref:
354 vnffgr_classifier.port_id = cp.connection_point_id
355 vnffgr_classifier.ip_address = cp.ip_address
356 for vdu in vnfr.vdur:
357 for ext_intf in vdu.external_interface:
358 if ext_intf.name == vnffgr_classifier.vnfr_connection_point_ref:
359 vnffgr_classifier.vm_id = vdu.vim_id
360 self._log.debug("VIM ID for CP %s in VNFR %s is %s",cp.name,nsr_vnfr.id,
361 vnfr_cp_ref.connection_point_params.vm_id)
362 break
363
364 self._log.info("VNFFGR msg to be sent is %s", vnffgr)
365 return vnffgr
366
367 @asyncio.coroutine
368 def vnffgr_nsr_sff_list(self):
369 """ SFF List for VNFR """
370 sff_list = {}
371 sf_list = [nsr_vnfr.name for nsr_vnfr in self._nsr.vnfrs.values() if nsr_vnfr.vnfd.service_function_chain == 'SF']
372
373 for nsr_vnfr in self._nsr.vnfrs.values():
374 if (nsr_vnfr.vnfd.service_function_chain == 'CLASSIFIER' or nsr_vnfr.vnfd.service_function_chain == 'SFF'):
375 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
376 self._log.debug(" Received VNFR is %s", vnfr)
377 while vnfr.operational_status != 'running':
378 self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status)
379 if vnfr.operational_status == 'failed':
380 self._log.error("Fetching VNFR for %s failed", vnfr.id)
381 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id))
382 yield from asyncio.sleep(2, loop=self._loop)
383 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
384 self._log.debug("Received VNFR is %s", vnfr)
385
386 sff = RwsdnYang.VNFFGSff()
387 sff_list[nsr_vnfr.vnfd.id] = sff
388 sff.name = nsr_vnfr.name
389 sff.function_type = nsr_vnfr.vnfd.service_function_chain
390
391 sff.mgmt_address = vnfr.mgmt_interface.ip_address
392 sff.mgmt_port = VnffgRecord.SFF_MGMT_PORT
393 for cp in vnfr.connection_point:
394 sff_dp = sff.dp_endpoints.add()
395 sff_dp.name = self._nsr.name + '.' + cp.name
396 sff_dp.address = cp.ip_address
397 sff_dp.port = VnffgRecord.SFF_DP_PORT
398 if nsr_vnfr.vnfd.service_function_chain == 'SFF':
399 for sf_name in sf_list:
400 _sf = sff.vnfr_list.add()
401 _sf.vnfr_name = sf_name
402
403 return sff_list
404
405 @asyncio.coroutine
406 def instantiate(self):
407 """ Instantiate this VNFFG """
408
409 self._log.info("Instaniating VNFFGR with vnffgd %s",
410 self._vnffgd_msg)
411
412
413 vnffgr_request = yield from self.vnffgr_create_msg()
414 vnffg_sff_list = yield from self.vnffgr_nsr_sff_list()
415
416 try:
417 vnffgr = self._vnffgmgr.create_vnffgr(vnffgr_request,self._vnffgd_msg.classifier,vnffg_sff_list)
418 except Exception as e:
419 self._log.exception("VNFFG instantiation failed: %s", str(e))
420 self._vnffgr_state = VnffgRecordState.FAILED
421 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFFGR %s failure" % (self.id, vnffgr_request.id))
422
423 self._vnffgr_state = VnffgRecordState.INSTANTIATION_PENDING
424
425 self._log.info("Instantiated VNFFGR :%s", vnffgr)
426 self._vnffgr_state = VnffgRecordState.ACTIVE
427
428 self._log.info("Invoking update_state to update NSR state for NSR ID: %s", self._nsr.id)
429 yield from self._nsr.update_state()
430
431 def vnffgr_in_vnffgrm(self):
432 """ Is there a VNFR record in VNFM """
433 if (self._vnffgr_state == VnffgRecordState.ACTIVE or
434 self._vnffgr_state == VnffgRecordState.INSTANTIATION_PENDING or
435 self._vnffgr_state == VnffgRecordState.FAILED):
436 return True
437
438 return False
439
440 @asyncio.coroutine
441 def terminate(self):
442 """ Terminate this VNFFGR """
443 if not self.vnffgr_in_vnffgrm():
444 self._log.error("Ignoring terminate request for id %s in state %s",
445 self.id, self._vnffgr_state)
446 return
447
448 self._log.info("Terminating VNFFGR id:%s", self.id)
449 self._vnffgr_state = VnffgRecordState.TERMINATE_PENDING
450
451 self._vnffgmgr.terminate_vnffgr(self._vnffgr_id)
452
453 self._vnffgr_state = VnffgRecordState.TERMINATED
454 self._log.debug("Terminated VNFFGR id:%s", self.id)
455
456
457 class VirtualLinkRecord(object):
458 """ Virtual Link Records class"""
459 XPATH = "D,/vlr:vlr-catalog/vlr:vlr"
460 @staticmethod
461 @asyncio.coroutine
462 def create_record(dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id, restart_mode=False):
463 """Creates a new VLR object based on the given data.
464
465 If restart mode is enabled, then we look for existing records in the
466 DTS and create a VLR records using the exiting data(ID)
467
468 Returns:
469 VirtualLinkRecord
470 """
471 vlr_obj = VirtualLinkRecord(
472 dts,
473 log,
474 loop,
475 nsr_name,
476 vld_msg,
477 cloud_account_name,
478 om_datacenter,
479 ip_profile,
480 nsr_id,
481 )
482
483 if restart_mode:
484 res_iter = yield from dts.query_read(
485 "D,/vlr:vlr-catalog/vlr:vlr",
486 rwdts.XactFlag.MERGE)
487
488 for fut in res_iter:
489 response = yield from fut
490 vlr = response.result
491
492 # Check if the record is already present, if so use the ID of
493 # the existing record. Since the name of the record is uniquely
494 # formed we can use it as a search key!
495 if vlr.name == vlr_obj.name:
496 vlr_obj.reset_id(vlr.id)
497 break
498
499 return vlr_obj
500
501 def __init__(self, dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id):
502 self._dts = dts
503 self._log = log
504 self._loop = loop
505 self._nsr_name = nsr_name
506 self._vld_msg = vld_msg
507 self._cloud_account_name = cloud_account_name
508 self._om_datacenter_name = om_datacenter
509 self._assigned_subnet = None
510 self._nsr_id = nsr_id
511 self._ip_profile = ip_profile
512 self._vlr_id = str(uuid.uuid4())
513 self._state = VlRecordState.INIT
514 self._prev_state = None
515 self._create_time = int(time.time())
516
517 @property
518 def xpath(self):
519 """ path for this object """
520 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self._vlr_id)
521
522 @property
523 def id(self):
524 """ VLR id """
525 return self._vlr_id
526
527 @property
528 def nsr_name(self):
529 """ Get NSR name for this VL """
530 return self.nsr_name
531
532 @property
533 def vld_msg(self):
534 """ Virtual Link Desciptor """
535 return self._vld_msg
536
537 @property
538 def assigned_subnet(self):
539 """ Subnet assigned to this VL"""
540 return self._assigned_subnet
541
542 @property
543 def name(self):
544 """
545 Get the name for this VLR.
546 VLR name is "nsr name:VLD name"
547 """
548 if self.vld_msg.vim_network_name:
549 return self.vld_msg.vim_network_name
550 elif self.vld_msg.name == "multisite":
551 # This is a temporary hack to identify manually provisioned inter-site network
552 return self.vld_msg.name
553 else:
554 return self._nsr_name + "." + self.vld_msg.name
555
556 @property
557 def cloud_account_name(self):
558 """ Cloud account that this VLR should be created in """
559 return self._cloud_account_name
560
561 @property
562 def om_datacenter_name(self):
563 """ Datacenter that this VLR should be created in """
564 return self._om_datacenter_name
565
566 @staticmethod
567 def vlr_xpath(vlr):
568 """ Get the VLR path from VLR """
569 return (VirtualLinkRecord.XPATH + "[vlr:id = '{}']").format(vlr.id)
570
571 @property
572 def state(self):
573 """ VLR state """
574 return self._state
575
576 @state.setter
577 def state(self, value):
578 """ VLR set state """
579 self._state = value
580
581 @property
582 def prev_state(self):
583 """ VLR previous state """
584 return self._prev_state
585
586 @prev_state.setter
587 def prev_state(self, value):
588 """ VLR set previous state """
589 self._prev_state = value
590
591 @property
592 def vlr_msg(self):
593 """ Virtual Link Record message for Creating VLR in VNS """
594 vld_fields = ["short_name",
595 "vendor",
596 "description",
597 "version",
598 "type_yang",
599 "vim_network_name",
600 "provider_network"]
601
602 vld_copy_dict = {k: v for k, v in self.vld_msg.as_dict().items()
603 if k in vld_fields}
604
605 vlr_dict = {"id": self._vlr_id,
606 "nsr_id_ref": self._nsr_id,
607 "vld_ref": self.vld_msg.id,
608 "name": self.name,
609 "create_time": self._create_time,
610 "cloud_account": self.cloud_account_name,
611 "om_datacenter": self.om_datacenter_name,
612 }
613
614 if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
615 vlr_dict['ip_profile_params' ] = self._ip_profile.ip_profile_params.as_dict()
616
617 vlr_dict.update(vld_copy_dict)
618 vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
619 return vlr
620
621 def reset_id(self, vlr_id):
622 self._vlr_id = vlr_id
623
624 def create_nsr_vlr_msg(self, vnfrs):
625 """ The VLR message"""
626 nsr_vlr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vlr()
627 nsr_vlr.vlr_ref = self._vlr_id
628 nsr_vlr.assigned_subnet = self.assigned_subnet
629 nsr_vlr.cloud_account = self.cloud_account_name
630 nsr_vlr.om_datacenter = self.om_datacenter_name
631
632 for conn in self.vld_msg.vnfd_connection_point_ref:
633 for vnfr in vnfrs:
634 if (vnfr.vnfd.id == conn.vnfd_id_ref and
635 vnfr.member_vnf_index == conn.member_vnf_index_ref and
636 self.cloud_account_name == vnfr.cloud_account_name and
637 self.om_datacenter_name == vnfr.om_datacenter_name):
638 cp_entry = nsr_vlr.vnfr_connection_point_ref.add()
639 cp_entry.vnfr_id = vnfr.id
640 cp_entry.connection_point = conn.vnfd_connection_point_ref
641
642 return nsr_vlr
643
644 @asyncio.coroutine
645 def instantiate(self):
646 """ Instantiate this VL """
647 self._log.debug("Instaniating VLR key %s, vld %s",
648 self.xpath, self._vld_msg)
649 vlr = None
650 self._state = VlRecordState.INSTANTIATION_PENDING
651 self._log.debug("Executing VL create path:%s msg:%s",
652 self.xpath, self.vlr_msg)
653
654 with self._dts.transaction(flags=0) as xact:
655 block = xact.block_create()
656 block.add_query_create(self.xpath, self.vlr_msg)
657 self._log.debug("Executing VL create path:%s msg:%s",
658 self.xpath, self.vlr_msg)
659 res_iter = yield from block.execute(now=True)
660 for ent in res_iter:
661 res = yield from ent
662 vlr = res.result
663
664 if vlr is None:
665 self._state = VlRecordState.FAILED
666 raise NsrInstantiationFailed("Failed NS %s instantiation due to empty response" % self.id)
667
668 if vlr.operational_status == 'failed':
669 self._log.debug("NS Id:%s VL creation failed for vlr id %s", self.id, vlr.id)
670 self._state = VlRecordState.FAILED
671 raise NsrInstantiationFailed("Failed VL %s instantiation (%s)" % (vlr.id, vlr.operational_status_details))
672
673 self._log.info("Instantiated VL with xpath %s and vlr:%s",
674 self.xpath, vlr)
675 self._state = VlRecordState.ACTIVE
676 self._assigned_subnet = vlr.assigned_subnet
677
678 def vlr_in_vns(self):
679 """ Is there a VLR record in VNS """
680 if (self._state == VlRecordState.ACTIVE or
681 self._state == VlRecordState.INSTANTIATION_PENDING or
682 self._state == VlRecordState.TERMINATE_PENDING or
683 self._state == VlRecordState.FAILED):
684 return True
685
686 return False
687
688 @asyncio.coroutine
689 def terminate(self):
690 """ Terminate this VL """
691 if not self.vlr_in_vns():
692 self._log.debug("Ignoring terminate request for id %s in state %s",
693 self.id, self._state)
694 return
695
696 self._log.debug("Terminating VL id:%s", self.id)
697 self._state = VlRecordState.TERMINATE_PENDING
698
699 with self._dts.transaction(flags=0) as xact:
700 block = xact.block_create()
701 block.add_query_delete(self.xpath)
702 yield from block.execute(flags=0, now=True)
703
704 self._state = VlRecordState.TERMINATED
705 self._log.debug("Terminated VL id:%s", self.id)
706
707
708 class VnfRecordState(Enum):
709 """ Vnf Record State """
710 INIT = 101
711 INSTANTIATION_PENDING = 102
712 ACTIVE = 103
713 TERMINATE_PENDING = 104
714 TERMINATED = 105
715 FAILED = 106
716
717
718 class VirtualNetworkFunctionRecord(object):
719 """ Virtual Network Function Record class"""
720 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
721
722 @staticmethod
723 @asyncio.coroutine
724 def create_record(dts, log, loop, vnfd, const_vnfd_msg, nsd_id, nsr_name,
725 cloud_account_name, om_datacenter_name, nsr_id, group_name, group_instance_id,
726 placement_groups, restart_mode=False):
727 """Creates a new VNFR object based on the given data.
728
729 If restart mode is enabled, then we look for existing records in the
730 DTS and create a VNFR records using the exiting data(ID)
731
732 Returns:
733 VirtualNetworkFunctionRecord
734 """
735 vnfr_obj = VirtualNetworkFunctionRecord(
736 dts,
737 log,
738 loop,
739 vnfd,
740 const_vnfd_msg,
741 nsd_id,
742 nsr_name,
743 cloud_account_name,
744 om_datacenter_name,
745 nsr_id,
746 group_name,
747 group_instance_id,
748 placement_groups,
749 restart_mode=restart_mode)
750
751 if restart_mode:
752 res_iter = yield from dts.query_read(
753 "D,/vnfr:vnfr-catalog/vnfr:vnfr",
754 rwdts.XactFlag.MERGE)
755
756 for fut in res_iter:
757 response = yield from fut
758 vnfr = response.result
759
760 if vnfr.name == vnfr_obj.name:
761 vnfr_obj.reset_id(vnfr.id)
762 break
763
764 return vnfr_obj
765
766 def __init__(self,
767 dts,
768 log,
769 loop,
770 vnfd,
771 const_vnfd_msg,
772 nsd_id,
773 nsr_name,
774 cloud_account_name,
775 om_datacenter_name,
776 nsr_id,
777 group_name=None,
778 group_instance_id=None,
779 placement_groups = [],
780 restart_mode = False):
781 self._dts = dts
782 self._log = log
783 self._loop = loop
784 self._vnfd = vnfd
785 self._const_vnfd_msg = const_vnfd_msg
786 self._nsd_id = nsd_id
787 self._nsr_name = nsr_name
788 self._nsr_id = nsr_id
789 self._cloud_account_name = cloud_account_name
790 self._om_datacenter_name = om_datacenter_name
791 self._group_name = group_name
792 self._group_instance_id = group_instance_id
793 self._placement_groups = placement_groups
794 self._config_status = NsrYang.ConfigStates.INIT
795 self._create_time = int(time.time())
796
797 self._prev_state = VnfRecordState.INIT
798 self._state = VnfRecordState.INIT
799 self._state_failed_reason = None
800
801 self.config_store = rift.mano.config_data.config.ConfigStore(self._log)
802 self.configure()
803
804 self._vnfr_id = str(uuid.uuid4())
805 self._name = None
806 self._vnfr_msg = self.create_vnfr_msg()
807 self._log.debug("Set VNFR {} config type to {}".
808 format(self.name, self.config_type))
809 self.restart_mode = restart_mode
810
811
812 if group_name is None and group_instance_id is not None:
813 raise ValueError("Group instance id must not be provided with an empty group name")
814
815 @property
816 def id(self):
817 """ VNFR id """
818 return self._vnfr_id
819
820 @property
821 def xpath(self):
822 """ VNFR xpath """
823 return "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id = '{}']".format(self.id)
824
825 @property
826 def vnfr_msg(self):
827 """ VNFR message """
828 return self._vnfr_msg
829
830 @property
831 def const_vnfr_msg(self):
832 """ VNFR message """
833 return RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConstituentVnfrRef(vnfr_id=self.id,cloud_account=self.cloud_account_name,om_datacenter=self._om_datacenter_name)
834
835 @property
836 def vnfd(self):
837 """ vnfd """
838 return self._vnfd
839
840 @property
841 def cloud_account_name(self):
842 """ Cloud account that this VNF should be created in """
843 return self._cloud_account_name
844
845 @property
846 def om_datacenter_name(self):
847 """ Datacenter that this VNF should be created in """
848 return self._om_datacenter_name
849
850
851 @property
852 def active(self):
853 """ Is this VNF actve """
854 return True if self._state == VnfRecordState.ACTIVE else False
855
856 @property
857 def state(self):
858 """ state of this VNF """
859 return self._state
860
861 @property
862 def state_failed_reason(self):
863 """ Error message in case this VNF is in failed state """
864 return self._state_failed_reason
865
866 @property
867 def member_vnf_index(self):
868 """ Member VNF index """
869 return self._const_vnfd_msg.member_vnf_index
870
871 @property
872 def nsr_name(self):
873 """ NSR name"""
874 return self._nsr_name
875
876 @property
877 def name(self):
878 """ Name of this VNFR """
879 if self._name is not None:
880 return self._name
881
882 name_tags = [self._nsr_name]
883
884 if self._group_name is not None:
885 name_tags.append(self._group_name)
886
887 if self._group_instance_id is not None:
888 name_tags.append(str(self._group_instance_id))
889
890 name_tags.extend([self.vnfd.name, str(self.member_vnf_index)])
891
892 self._name = "__".join(name_tags)
893
894 return self._name
895
896 @staticmethod
897 def vnfr_xpath(vnfr):
898 """ Get the VNFR path from VNFR """
899 return (VirtualNetworkFunctionRecord.XPATH + "[vnfr:id = '{}']").format(vnfr.id)
900
901 @property
902 def config_type(self):
903 cfg_types = ['netconf', 'juju', 'script']
904 for method in cfg_types:
905 if self._vnfd.vnf_configuration.has_field(method):
906 return method
907 return 'none'
908
909 @property
910 def config_status(self):
911 """Return the config status as YANG ENUM string"""
912 self._log.debug("Map VNFR {} config status {} ({})".
913 format(self.name, self._config_status, self.config_type))
914 if self.config_type == 'none':
915 return 'config_not_needed'
916 elif self._config_status == NsrYang.ConfigStates.CONFIGURED:
917 return 'configured'
918 elif self._config_status == NsrYang.ConfigStates.FAILED:
919 return 'failed'
920
921 return 'configuring'
922
923 def set_state(self, state):
924 """ set the state of this object """
925 self._prev_state = self._state
926 self._state = state
927
928 def reset_id(self, vnfr_id):
929 self._vnfr_id = vnfr_id
930 self._vnfr_msg = self.create_vnfr_msg()
931
932 def configure(self):
933 self.config_store.merge_vnfd_config(
934 self._nsd_id,
935 self._vnfd,
936 self.member_vnf_index,
937 )
938
939 def create_vnfr_msg(self):
940 """ VNFR message for this VNFR """
941 vnfd_fields = [
942 "short_name",
943 "vendor",
944 "description",
945 "version",
946 "type_yang",
947 ]
948 vnfd_copy_dict = {k: v for k, v in self._vnfd.as_dict().items() if k in vnfd_fields}
949 vnfr_dict = {
950 "id": self.id,
951 "nsr_id_ref": self._nsr_id,
952 "name": self.name,
953 "cloud_account": self._cloud_account_name,
954 "om_datacenter": self._om_datacenter_name,
955 "config_status": self.config_status
956 }
957 vnfr_dict.update(vnfd_copy_dict)
958
959 vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
960
961 vnfr.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())
962 vnfr.member_vnf_index_ref = self.member_vnf_index
963 vnfr.vnf_configuration.from_dict(self._vnfd.vnf_configuration.as_dict())
964
965 if self._vnfd.mgmt_interface.has_field("port"):
966 vnfr.mgmt_interface.port = self._vnfd.mgmt_interface.port
967
968 for group_info in self._placement_groups:
969 group = vnfr.placement_groups_info.add()
970 group.from_dict(group_info.as_dict())
971
972 # UI expects the monitoring param field to exist
973 vnfr.monitoring_param = []
974
975 self._log.debug("Get vnfr_msg for VNFR {} : {}".format(self.name, vnfr))
976 return vnfr
977
978 @asyncio.coroutine
979 def update_vnfm(self):
980 self._log.debug("Send an update to VNFM for VNFR {} with {}".
981 format(self.name, self.vnfr_msg))
982 yield from self._dts.query_update(
983 self.xpath,
984 rwdts.XactFlag.TRACE,
985 self.vnfr_msg
986 )
987
988 def get_config_status(self):
989 """Return the config status as YANG ENUM"""
990 return self._config_status
991
992 @asyncio.coroutine
993 def set_config_status(self, status):
994
995 def status_to_string(status):
996 status_dc = {
997 NsrYang.ConfigStates.INIT : 'init',
998 NsrYang.ConfigStates.CONFIGURING : 'configuring',
999 NsrYang.ConfigStates.CONFIG_NOT_NEEDED : 'config_not_needed',
1000 NsrYang.ConfigStates.CONFIGURED : 'configured',
1001 NsrYang.ConfigStates.FAILED : 'failed',
1002 }
1003
1004 return status_dc[status]
1005
1006 self._log.debug("Update VNFR {} from {} ({}) to {}".
1007 format(self.name, self._config_status,
1008 self.config_type, status))
1009 if self._config_status == NsrYang.ConfigStates.CONFIGURED:
1010 self._log.error("Updating already configured VNFR {}".
1011 format(self.name))
1012 return
1013
1014 if self._config_status != status:
1015 try:
1016 self._config_status = status
1017 # I don't think this is used. Original implementor can check.
1018 # Caused Exception, so corrected it by status_to_string
1019 # But not sure whats the use of this variable?
1020 self.vnfr_msg.config_status = status_to_string(status)
1021 except Exception as e:
1022 self._log.error("Exception=%s", str(e))
1023 pass
1024
1025 self._log.debug("Updated VNFR {} status to {}".format(self.name, status))
1026
1027 if self._config_status != NsrYang.ConfigStates.INIT:
1028 try:
1029 # Publish only after VNFM has the VNFR created
1030 yield from self.update_vnfm()
1031 except Exception as e:
1032 self._log.error("Exception updating VNFM with new status {} of VNFR {}: {}".
1033 format(status, self.name, e))
1034 self._log.exception(e)
1035
1036 def is_configured(self):
1037 if self.config_type == 'none':
1038 return True
1039
1040 if self._config_status == NsrYang.ConfigStates.CONFIGURED:
1041 return True
1042
1043 return False
1044
1045 @asyncio.coroutine
1046 def instantiate(self, nsr):
1047 """ Instantiate this VNFR"""
1048
1049 self._log.debug("Instaniating VNFR key %s, vnfd %s",
1050 self.xpath, self._vnfd)
1051
1052 self._log.debug("Create VNF with xpath %s and vnfr %s",
1053 self.xpath, self.vnfr_msg)
1054
1055 self.set_state(VnfRecordState.INSTANTIATION_PENDING)
1056
1057 def find_vlr_for_cp(conn):
1058 """ Find VLR for the given connection point """
1059 for vlr in nsr.vlrs:
1060 for vnfd_cp in vlr.vld_msg.vnfd_connection_point_ref:
1061 if (vnfd_cp.vnfd_id_ref == self._vnfd.id and
1062 vnfd_cp.vnfd_connection_point_ref == conn.name and
1063 vnfd_cp.member_vnf_index_ref == self.member_vnf_index and
1064 vlr.cloud_account_name == self.cloud_account_name):
1065 self._log.debug("Found VLR for cp_name:%s and vnf-index:%d",
1066 conn.name, self.member_vnf_index)
1067 return vlr
1068 return None
1069
1070 # For every connection point in the VNFD fill in the identifier
1071 for conn_p in self._vnfd.connection_point:
1072 cpr = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint()
1073 cpr.name = conn_p.name
1074 cpr.type_yang = conn_p.type_yang
1075 vlr_ref = find_vlr_for_cp(conn_p)
1076 if vlr_ref is None:
1077 msg = "Failed to find VLR for cp = %s" % conn_p.name
1078 self._log.debug("%s", msg)
1079 # raise VirtualNetworkFunctionRecordError(msg)
1080 continue
1081
1082 cpr.vlr_ref = vlr_ref.id
1083 self.vnfr_msg.connection_point.append(cpr)
1084 self._log.debug("Connection point [%s] added, vnf id=%s vnfd id=%s",
1085 cpr, self.vnfr_msg.id, self.vnfr_msg.vnfd.id)
1086
1087 if not self.restart_mode:
1088 yield from self._dts.query_create(self.xpath,
1089 0, # this is sub
1090 self.vnfr_msg)
1091 else:
1092 yield from self._dts.query_update(self.xpath,
1093 0,
1094 self.vnfr_msg)
1095
1096 self._log.info("Created VNF with xpath %s and vnfr %s",
1097 self.xpath, self.vnfr_msg)
1098
1099 self._log.info("Instantiated VNFR with xpath %s and vnfd %s, vnfr %s",
1100 self.xpath, self._vnfd, self.vnfr_msg)
1101
1102 @asyncio.coroutine
1103 def update_state(self, vnfr_msg):
1104 """ Update this VNFR"""
1105 if vnfr_msg.operational_status == "running":
1106 if self.vnfr_msg.operational_status != "running":
1107 yield from self.is_active()
1108 elif vnfr_msg.operational_status == "failed":
1109 yield from self.instantiation_failed(failed_reason=vnfr_msg.operational_status_details)
1110
1111 @asyncio.coroutine
1112 def is_active(self):
1113 """ This VNFR is active """
1114 self._log.debug("VNFR %s is active", self._vnfr_id)
1115 self.set_state(VnfRecordState.ACTIVE)
1116
1117 @asyncio.coroutine
1118 def instantiation_failed(self, failed_reason=None):
1119 """ This VNFR instantiation failed"""
1120 self._log.error("VNFR %s instantiation failed", self._vnfr_id)
1121 self.set_state(VnfRecordState.FAILED)
1122 self._state_failed_reason = failed_reason
1123
1124 def vnfr_in_vnfm(self):
1125 """ Is there a VNFR record in VNFM """
1126 if (self._state == VnfRecordState.ACTIVE or
1127 self._state == VnfRecordState.INSTANTIATION_PENDING or
1128 self._state == VnfRecordState.FAILED):
1129 return True
1130
1131 return False
1132
1133 @asyncio.coroutine
1134 def terminate(self):
1135 """ Terminate this VNF """
1136 if not self.vnfr_in_vnfm():
1137 self._log.debug("Ignoring terminate request for id %s in state %s",
1138 self.id, self._state)
1139 return
1140
1141 self._log.debug("Terminating VNF id:%s", self.id)
1142 self.set_state(VnfRecordState.TERMINATE_PENDING)
1143 with self._dts.transaction(flags=0) as xact:
1144 block = xact.block_create()
1145 block.add_query_delete(self.xpath)
1146 yield from block.execute(flags=0)
1147 self.set_state(VnfRecordState.TERMINATED)
1148 self._log.debug("Terminated VNF id:%s", self.id)
1149
1150
1151 class NetworkServiceStatus(object):
1152 """ A class representing the Network service's status """
1153 MAX_EVENTS_RECORDED = 10
1154 """ Network service Status class"""
1155 def __init__(self, dts, log, loop):
1156 self._dts = dts
1157 self._log = log
1158 self._loop = loop
1159
1160 self._state = NetworkServiceRecordState.INIT
1161 self._events = deque([])
1162
1163 @asyncio.coroutine
1164 def create_notification(self, evt, evt_desc, evt_details):
1165 xp = "N,/rw-nsr:nsm-notification"
1166 notif = RwNsrYang.YangNotif_RwNsr_NsmNotification()
1167 notif.event = evt
1168 notif.description = evt_desc
1169 notif.details = evt_details if evt_details is not None else None
1170
1171 yield from self._dts.query_create(xp, rwdts.XactFlag.ADVISE, notif)
1172 self._log.info("Notification called by creating dts query: %s", notif)
1173
1174 def record_event(self, evt, evt_desc, evt_details):
1175 """ Record an event """
1176 self._log.debug("Recording event - evt %s, evt_descr %s len = %s",
1177 evt, evt_desc, len(self._events))
1178 if len(self._events) >= NetworkServiceStatus.MAX_EVENTS_RECORDED:
1179 self._events.popleft()
1180 self._events.append((int(time.time()), evt, evt_desc,
1181 evt_details if evt_details is not None else None))
1182
1183 self._loop.create_task(self.create_notification(evt,evt_desc,evt_details))
1184
1185 def set_state(self, state):
1186 """ set the state of this status object """
1187 self._state = state
1188
1189 def yang_str(self):
1190 """ Return the state as a yang enum string """
1191 state_to_str_map = {"INIT": "init",
1192 "VL_INIT_PHASE": "vl_init_phase",
1193 "VNF_INIT_PHASE": "vnf_init_phase",
1194 "VNFFG_INIT_PHASE": "vnffg_init_phase",
1195 "SCALING_GROUP_INIT_PHASE": "scaling_group_init_phase",
1196 "RUNNING": "running",
1197 "SCALING_OUT": "scaling_out",
1198 "SCALING_IN": "scaling_in",
1199 "TERMINATE_RCVD": "terminate_rcvd",
1200 "TERMINATE": "terminate",
1201 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1202 "VNF_TERMINATE_PHASE": "vnf_terminate_phase",
1203 "VNFFG_TERMINATE_PHASE": "vnffg_terminate_phase",
1204 "TERMINATED": "terminated",
1205 "FAILED": "failed",
1206 "VL_INSTANTIATE": "vl_instantiate",
1207 "VL_TERMINATE": "vl_terminate",
1208 }
1209 return state_to_str_map[self._state.name]
1210
1211 @property
1212 def state(self):
1213 """ State of this status object """
1214 return self._state
1215
1216 @property
1217 def msg(self):
1218 """ Network Service Record as a message"""
1219 event_list = []
1220 idx = 1
1221 for entry in self._events:
1222 event = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_OperationalEvents()
1223 event.id = idx
1224 idx += 1
1225 event.timestamp, event.event, event.description, event.details = entry
1226 event_list.append(event)
1227 return event_list
1228
1229
1230 class NetworkServiceRecord(object):
1231 """ Network service record """
1232 XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"
1233
1234 def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg, sdn_account_name, key_pairs, restart_mode=False,
1235 vlr_handler=None):
1236 self._dts = dts
1237 self._log = log
1238 self._loop = loop
1239 self._nsm = nsm
1240 self._nsr_cfg_msg = nsr_cfg_msg
1241 self._nsm_plugin = nsm_plugin
1242 self._sdn_account_name = sdn_account_name
1243 self._vlr_handler = vlr_handler
1244
1245 self._nsd = None
1246 self._nsr_msg = None
1247 self._nsr_regh = None
1248 self._key_pairs = key_pairs
1249 self._vlrs = []
1250 self._vnfrs = {}
1251 self._vnfds = {}
1252 self._vnffgrs = {}
1253 self._param_pools = {}
1254 self._scaling_groups = {}
1255 self._create_time = int(time.time())
1256 self._op_status = NetworkServiceStatus(dts, log, loop)
1257 self._config_status = NsrYang.ConfigStates.CONFIGURING
1258 self._config_status_details = None
1259 self._job_id = 0
1260 self.restart_mode = restart_mode
1261 self.config_store = rift.mano.config_data.config.ConfigStore(self._log)
1262 self._debug_running = False
1263 self._is_active = False
1264 self._vl_phase_completed = False
1265 self._vnf_phase_completed = False
1266 self.vlr_uptime_tasks = {}
1267
1268
1269 # Initalise the state to init
1270 # The NSR moves through the following transitions
1271 # 1. INIT -> VLS_READY once all the VLs in the NSD are created
1272 # 2. VLS_READY - VNFS_READY when all the VNFs in the NSD are created
1273 # 3. VNFS_READY - READY when the NSR is published
1274
1275 self.set_state(NetworkServiceRecordState.INIT)
1276
1277 self.substitute_input_parameters = InputParameterSubstitution(self._log)
1278
1279 @property
1280 def nsm_plugin(self):
1281 """ NSM Plugin """
1282 return self._nsm_plugin
1283
1284 def set_state(self, state):
1285 """ Set state for this NSR"""
1286 self._log.debug("Setting state to %s", state)
1287 # We are in init phase and is moving to the next state
1288 # The new state could be a FAILED state or VNF_INIIT_PHASE
1289 if self.state == NetworkServiceRecordState.VL_INIT_PHASE:
1290 self._vl_phase_completed = True
1291
1292 if self.state == NetworkServiceRecordState.VNF_INIT_PHASE:
1293 self._vnf_phase_completed = True
1294
1295 self._op_status.set_state(state)
1296
1297 @property
1298 def id(self):
1299 """ Get id for this NSR"""
1300 return self._nsr_cfg_msg.id
1301
1302 @property
1303 def name(self):
1304 """ Name of this network service record """
1305 return self._nsr_cfg_msg.name
1306
1307 @property
1308 def cloud_account_name(self):
1309 return self._nsr_cfg_msg.cloud_account
1310
1311 @property
1312 def om_datacenter_name(self):
1313 if self._nsr_cfg_msg.has_field('om_datacenter'):
1314 return self._nsr_cfg_msg.om_datacenter
1315 return None
1316
1317 @property
1318 def state(self):
1319 """State of this NetworkServiceRecord"""
1320 return self._op_status.state
1321
1322 @property
1323 def active(self):
1324 """ Is this NSR active ?"""
1325 return True if self._op_status.state == NetworkServiceRecordState.RUNNING else False
1326
1327 @property
1328 def vlrs(self):
1329 """ VLRs associated with this NSR"""
1330 return self._vlrs
1331
1332 @property
1333 def vnfrs(self):
1334 """ VNFRs associated with this NSR"""
1335 return self._vnfrs
1336
1337 @property
1338 def vnffgrs(self):
1339 """ VNFFGRs associated with this NSR"""
1340 return self._vnffgrs
1341
1342 @property
1343 def scaling_groups(self):
1344 """ Scaling groups associated with this NSR """
1345 return self._scaling_groups
1346
1347 @property
1348 def param_pools(self):
1349 """ Parameter value pools associated with this NSR"""
1350 return self._param_pools
1351
1352 @property
1353 def nsr_cfg_msg(self):
1354 return self._nsr_cfg_msg
1355
1356 @nsr_cfg_msg.setter
1357 def nsr_cfg_msg(self, msg):
1358 self._nsr_cfg_msg = msg
1359
1360 @property
1361 def nsd_msg(self):
1362 """ NSD Protobuf for this NSR """
1363 if self._nsd is not None:
1364 return self._nsd
1365 self._nsd = self._nsr_cfg_msg.nsd
1366 return self._nsd
1367
1368 @property
1369 def nsd_id(self):
1370 """ NSD ID for this NSR """
1371 return self.nsd_msg.id
1372
1373 @property
1374 def job_id(self):
1375 ''' Get a new job id for config primitive'''
1376 self._job_id += 1
1377 return self._job_id
1378
1379 @property
1380 def config_status(self):
1381 """ Config status for NSR """
1382 return self._config_status
1383
1384 def resolve_placement_group_cloud_construct(self, input_group):
1385 """
1386 Returns the cloud specific construct for placement group
1387 """
1388 copy_dict = ['name', 'requirement', 'strategy']
1389
1390 for group_info in self._nsr_cfg_msg.nsd_placement_group_maps:
1391 if group_info.placement_group_ref == input_group.name:
1392 group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1393 group_dict = {k:v for k,v in
1394 group_info.as_dict().items() if k != 'placement_group_ref'}
1395 for param in copy_dict:
1396 group_dict.update({param: getattr(input_group, param)})
1397 group.from_dict(group_dict)
1398 return group
1399 return None
1400
1401
1402 def __str__(self):
1403 return "NSR(name={}, nsd_id={}, cloud_account={})".format(
1404 self.name, self.nsd_id, self.cloud_account_name
1405 )
1406
1407 def _get_vnfd(self, vnfd_id, config_xact):
1408 """ Fetch vnfd msg for the passed vnfd id """
1409 return self._nsm.get_vnfd(vnfd_id, config_xact)
1410
1411 def _get_vnfd_cloud_account(self, vnfd_member_index):
1412 """ Fetch Cloud Account for the passed vnfd id """
1413 if self._nsr_cfg_msg.vnf_cloud_account_map:
1414 vim_accounts = [(vnf.cloud_account,vnf.om_datacenter) for vnf in self._nsr_cfg_msg.vnf_cloud_account_map \
1415 if vnfd_member_index == vnf.member_vnf_index_ref]
1416 if vim_accounts and vim_accounts[0]:
1417 return vim_accounts[0]
1418 return (self.cloud_account_name,self.om_datacenter_name)
1419
1420 def _get_constituent_vnfd_msg(self, vnf_index):
1421 for const_vnfd in self.nsd_msg.constituent_vnfd:
1422 if const_vnfd.member_vnf_index == vnf_index:
1423 return const_vnfd
1424
1425 raise ValueError("Constituent VNF index %s not found" % vnf_index)
1426
1427 def record_event(self, evt, evt_desc, evt_details=None, state=None):
1428 """ Record an event """
1429 self._op_status.record_event(evt, evt_desc, evt_details)
1430 if state is not None:
1431 self.set_state(state)
1432
1433 def scaling_trigger_str(self, trigger):
1434 SCALING_TRIGGER_STRS = {
1435 NsdYang.ScalingTrigger.PRE_SCALE_IN : 'pre-scale-in',
1436 NsdYang.ScalingTrigger.POST_SCALE_IN : 'post-scale-in',
1437 NsdYang.ScalingTrigger.PRE_SCALE_OUT : 'pre-scale-out',
1438 NsdYang.ScalingTrigger.POST_SCALE_OUT : 'post-scale-out',
1439 }
1440 try:
1441 return SCALING_TRIGGER_STRS[trigger]
1442 except Exception as e:
1443 self._log.error("Scaling trigger mapping error for {} : {}".
1444 format(trigger, e))
1445 self._log.exception(e)
1446 return "Unknown trigger"
1447
1448 @asyncio.coroutine
1449 def instantiate_vls(self):
1450 """
1451 This function instantiates VLs for every VL in this Network Service
1452 """
1453 self._log.debug("Instantiating %d VLs in NSD id %s", len(self._vlrs),
1454 self.id)
1455 for vlr in self._vlrs:
1456 yield from self.nsm_plugin.instantiate_vl(self, vlr)
1457 vlr.state = VlRecordState.ACTIVE
1458 self.vlr_uptime_tasks[vlr.id] = self._loop.create_task(self.vlr_uptime_update(vlr))
1459
1460
1461 def vlr_uptime_update(self, vlr):
1462 try:
1463
1464 vlr_ = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict({'id': vlr.id})
1465 while True:
1466 vlr_.uptime = int(time.time()) - vlr._create_time
1467 yield from self._vlr_handler.update(None, VirtualLinkRecord.vlr_xpath(vlr), vlr_)
1468 yield from asyncio.sleep(2, loop=self._loop)
1469 except asyncio.CancelledError:
1470 self._log.debug("Received cancellation request for vlr_uptime_update task")
1471 yield from self._vlr_handler.delete(None, VirtualLinkRecord.vlr_xpath(vlr))
1472
1473
1474 @asyncio.coroutine
1475 def create(self, config_xact):
1476 """ Create this network service"""
1477 # Create virtual links for all the external vnf
1478 # connection points in this NS
1479 yield from self.create_vls()
1480
1481 # Create VNFs in this network service
1482 yield from self.create_vnfs(config_xact)
1483
1484 # Create VNFFG for network service
1485 self.create_vnffgs()
1486
1487 # Create Scaling Groups for each scaling group in NSD
1488 self.create_scaling_groups()
1489
1490 # Create Parameter Pools
1491 self.create_param_pools()
1492
1493 @asyncio.coroutine
1494 def apply_scale_group_config_script(self, script, group, scale_instance, trigger, vnfrs=None):
1495 """ Apply config based on script for scale group """
1496
1497 @asyncio.coroutine
1498 def add_vnfrs_data(vnfrs_list):
1499 """ Add as a dict each of the VNFRs data """
1500 vnfrs_data = []
1501 for vnfr in vnfrs_list:
1502 self._log.debug("Add VNFR {} data".format(vnfr))
1503 vnfr_data = dict()
1504 vnfr_data['name'] = vnfr.name
1505 if trigger in [NsdYang.ScalingTrigger.PRE_SCALE_IN, NsdYang.ScalingTrigger.POST_SCALE_OUT]:
1506 # Get VNF management and other IPs, etc
1507 opdata = yield from self.fetch_vnfr(vnfr.xpath)
1508 self._log.debug("VNFR {} op data: {}".format(vnfr.name, opdata))
1509 try:
1510 vnfr_data['rw_mgmt_ip'] = opdata.mgmt_interface.ip_address
1511 vnfr_data['rw_mgmt_port'] = opdata.mgmt_interface.port
1512 except Exception as e:
1513 self._log.error("Unable to get management IP for vnfr {}:{}".
1514 format(vnfr.name, e))
1515
1516 try:
1517 vnfr_data['connection_points'] = []
1518 for cp in opdata.connection_point:
1519 con_pt = dict()
1520 con_pt['name'] = cp.name
1521 con_pt['ip_address'] = cp.ip_address
1522 vnfr_data['connection_points'].append(con_pt)
1523 except Exception as e:
1524 self._log.error("Exception getting connections points for VNFR {}: {}".
1525 format(vnfr.name, e))
1526
1527 vnfrs_data.append(vnfr_data)
1528 self._log.debug("VNFRs data: {}".format(vnfrs_data))
1529
1530 return vnfrs_data
1531
1532 def add_nsr_data(nsr):
1533 nsr_data = dict()
1534 nsr_data['name'] = nsr.name
1535 return nsr_data
1536
1537 if script is None or len(script) == 0:
1538 self._log.error("Script not provided for scale group config: {}".format(group.name))
1539 return False
1540
1541 if script[0] == '/':
1542 path = script
1543 else:
1544 path = os.path.join(os.environ['RIFT_INSTALL'], "usr/bin", script)
1545 if not os.path.exists(path):
1546 self._log.error("Config faled for scale group {}: Script does not exist at {}".
1547 format(group.name, path))
1548 return False
1549
1550 # Build a YAML file with all parameters for the script to execute
1551 # The data consists of 5 sections
1552 # 1. Trigger
1553 # 2. Scale group config
1554 # 3. VNFRs in the scale group
1555 # 4. VNFRs outside scale group
1556 # 5. NSR data
1557 data = dict()
1558 data['trigger'] = group.trigger_map(trigger)
1559 data['config'] = group.group_msg.as_dict()
1560
1561 if vnfrs:
1562 data["vnfrs_in_group"] = yield from add_vnfrs_data(vnfrs)
1563 else:
1564 data["vnfrs_in_group"] = yield from add_vnfrs_data(scale_instance.vnfrs)
1565
1566 data["vnfrs_others"] = yield from add_vnfrs_data(self.vnfrs.values())
1567 data["nsr"] = add_nsr_data(self)
1568
1569 tmp_file = None
1570 with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
1571 tmp_file.write(yaml.dump(data, default_flow_style=True)
1572 .encode("UTF-8"))
1573
1574 self._log.debug("Creating a temp file: {} with input data: {}".
1575 format(tmp_file.name, data))
1576
1577 cmd = "{} {}".format(path, tmp_file.name)
1578 self._log.debug("Running the CMD: {}".format(cmd))
1579 proc = yield from asyncio.create_subprocess_shell(cmd, loop=self._loop)
1580 rc = yield from proc.wait()
1581 if rc:
1582 self._log.error("The script {} for scale group {} config returned: {}".
1583 format(script, group.name, rc))
1584 return False
1585
1586 # Success
1587 return True
1588
1589
1590 @asyncio.coroutine
1591 def apply_scaling_group_config(self, trigger, group, scale_instance, vnfrs=None):
1592 """ Apply the config for the scaling group based on trigger """
1593 if group is None or scale_instance is None:
1594 return False
1595
1596 @asyncio.coroutine
1597 def update_config_status(success=True, err_msg=None):
1598 self._log.debug("Update %s config status to %r : %s",
1599 scale_instance, success, err_msg)
1600 if (scale_instance.config_status == "failed"):
1601 # Do not update the config status if it is already in failed state
1602 return
1603
1604 if scale_instance.config_status == "configured":
1605 # Update only to failed state an already configured scale instance
1606 if not success:
1607 scale_instance.config_status = "failed"
1608 scale_instance.config_err_msg = err_msg
1609 yield from self.update_state()
1610 else:
1611 # We are in configuring state
1612 # Only after post scale out mark instance as configured
1613 if trigger == NsdYang.ScalingTrigger.POST_SCALE_OUT:
1614 if success:
1615 scale_instance.config_status = "configured"
1616 else:
1617 scale_instance.config_status = "failed"
1618 scale_instance.config_err_msg = err_msg
1619 yield from self.update_state()
1620
1621 config = group.trigger_config(trigger)
1622 if config is None:
1623 return True
1624
1625 self._log.debug("Scaling group {} config: {}".format(group.name, config))
1626 if config.has_field("ns_config_primitive_name_ref"):
1627 config_name = config.ns_config_primitive_name_ref
1628 nsd_msg = self.nsd_msg
1629 config_primitive = None
1630 for ns_cfg_prim in nsd_msg.service_primitive:
1631 if ns_cfg_prim.name == config_name:
1632 config_primitive = ns_cfg_prim
1633 break
1634
1635 if config_primitive is None:
1636 raise ValueError("Could not find ns_cfg_prim %s in nsr %s" % (config_name, self.name))
1637
1638 self._log.debug("Scaling group {} config primitive: {}".format(group.name, config_primitive))
1639 if config_primitive.has_field("user_defined_script"):
1640 rc = yield from self.apply_scale_group_config_script(config_primitive.user_defined_script,
1641 group, scale_instance, trigger, vnfrs)
1642 err_msg = None
1643 if not rc:
1644 err_msg = "Failed config for trigger {} using config script '{}'". \
1645 format(self.scaling_trigger_str(trigger),
1646 config_primitive.user_defined_script)
1647 yield from update_config_status(success=rc, err_msg=err_msg)
1648 return rc
1649 else:
1650 err_msg = "Failed config for trigger {} as config script is not specified". \
1651 format(self.scaling_trigger_str(trigger))
1652 yield from update_config_status(success=False, err_msg=err_msg)
1653 raise NotImplementedError("Only script based config support for scale group for now: {}".
1654 format(group.name))
1655 else:
1656 err_msg = "Failed config for trigger {} as config primitive is not specified".\
1657 format(self.scaling_trigger_str(trigger))
1658 yield from update_config_status(success=False, err_msg=err_msg)
1659 self._log.error("Config primitive not specified for config action in scale group %s" %
1660 (group.name))
1661 return False
1662
1663 def create_scaling_groups(self):
1664 """ This function creates a NSScalingGroup for every scaling
1665 group defined in he NSD"""
1666
1667 for scaling_group_msg in self.nsd_msg.scaling_group_descriptor:
1668 self._log.debug("Found scaling_group %s in nsr id %s",
1669 scaling_group_msg.name, self.id)
1670
1671 group_record = scale_group.ScalingGroup(
1672 self._log,
1673 scaling_group_msg
1674 )
1675
1676 self._scaling_groups[group_record.name] = group_record
1677
1678 @asyncio.coroutine
1679 def create_scale_group_instance(self, group_name, index, config_xact, is_default=False):
1680 group = self._scaling_groups[group_name]
1681 scale_instance = group.create_instance(index, is_default)
1682
1683 @asyncio.coroutine
1684 def create_vnfs():
1685 self._log.debug("Creating %u VNFs associated with NS id %s scaling group %s",
1686 len(self.nsd_msg.constituent_vnfd), self.id, self)
1687
1688 vnfrs = []
1689 for vnf_index, count in group.vnf_index_count_map.items():
1690 const_vnfd_msg = self._get_constituent_vnfd_msg(vnf_index)
1691 vnfd_msg = self._get_vnfd(const_vnfd_msg.vnfd_id_ref, config_xact)
1692
1693 cloud_account_name, om_datacenter_name = self._get_vnfd_cloud_account(const_vnfd_msg.member_vnf_index)
1694 if cloud_account_name is None:
1695 cloud_account_name = self.cloud_account_name
1696 for _ in range(count):
1697 vnfr = yield from self.create_vnf_record(vnfd_msg, const_vnfd_msg, cloud_account_name, om_datacenter_name, group_name, index)
1698 scale_instance.add_vnfr(vnfr)
1699 vnfrs.append(vnfr)
1700
1701 return vnfrs
1702
1703 @asyncio.coroutine
1704 def instantiate_instance():
1705 self._log.debug("Creating %s VNFRS", scale_instance)
1706 vnfrs = yield from create_vnfs()
1707 yield from self.publish()
1708
1709 self._log.debug("Instantiating %s VNFRS for %s", len(vnfrs), scale_instance)
1710 scale_instance.operational_status = "vnf_init_phase"
1711 yield from self.update_state()
1712
1713 try:
1714 rc = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.PRE_SCALE_OUT,
1715 group, scale_instance, vnfrs)
1716 if not rc:
1717 self._log.error("Pre scale out config for scale group {} ({}) failed".
1718 format(group.name, index))
1719 scale_instance.operational_status = "failed"
1720 else:
1721 yield from self.instantiate_vnfs(vnfrs)
1722
1723 except Exception as e:
1724 self._log.exception("Failed to begin instantiatiation of vnfs for scale group {}: {}".
1725 format(group.name, e))
1726 self._log.exception(e)
1727 scale_instance.operational_status = "failed"
1728
1729 yield from self.update_state()
1730
1731 yield from instantiate_instance()
1732
1733 @asyncio.coroutine
1734 def delete_scale_group_instance(self, group_name, index):
1735 group = self._scaling_groups[group_name]
1736 scale_instance = group.get_instance(index)
1737 if scale_instance.is_default:
1738 raise ScalingOperationError("Cannot terminate a default scaling group instance")
1739
1740 scale_instance.operational_status = "terminate"
1741 yield from self.update_state()
1742
1743 @asyncio.coroutine
1744 def terminate_instance():
1745 self._log.debug("Terminating %s VNFRS" % scale_instance)
1746 rc = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.PRE_SCALE_IN,
1747 group, scale_instance)
1748 if not rc:
1749 self._log.error("Pre scale in config for scale group {} ({}) failed".
1750 format(group.name, index))
1751
1752 # Going ahead with terminate, even if there is an error in pre-scale-in config
1753 # as this could be result of scale out failure and we need to cleanup this group
1754 yield from self.terminate_vnfrs(scale_instance.vnfrs)
1755 group.delete_instance(index)
1756
1757 scale_instance.operational_status = "vnf_terminate_phase"
1758 yield from self.update_state()
1759
1760 yield from terminate_instance()
1761
1762 @asyncio.coroutine
1763 def _update_scale_group_instances_status(self):
1764 @asyncio.coroutine
1765 def post_scale_out_task(group, instance):
1766 # Apply post scale out config once all VNFRs are active
1767 rc = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.POST_SCALE_OUT,
1768 group, instance)
1769 instance.operational_status = "running"
1770 if rc:
1771 self._log.debug("Scale out for group {} and instance {} succeeded".
1772 format(group.name, instance.instance_id))
1773 else:
1774 self._log.error("Post scale out config for scale group {} ({}) failed".
1775 format(group.name, instance.instance_id))
1776
1777 yield from self.update_state()
1778
1779 group_instances = {group: group.instances for group in self._scaling_groups.values()}
1780 for group, instances in group_instances.items():
1781 self._log.debug("Updating %s instance status", group)
1782 for instance in instances:
1783 instance_vnf_state_list = [vnfr.state for vnfr in instance.vnfrs]
1784 self._log.debug("Got vnfr instance states: %s", instance_vnf_state_list)
1785 if instance.operational_status == "vnf_init_phase":
1786 if all([state == VnfRecordState.ACTIVE for state in instance_vnf_state_list]):
1787 instance.operational_status = "running"
1788
1789 # Create a task for post scale out to allow us to sleep before attempting
1790 # to configure newly created VM's
1791 self._loop.create_task(post_scale_out_task(group, instance))
1792
1793 elif any([state == VnfRecordState.FAILED for state in instance_vnf_state_list]):
1794 self._log.debug("Scale out for group {} and instance {} failed".
1795 format(group.name, instance.instance_id))
1796 instance.operational_status = "failed"
1797
1798 elif instance.operational_status == "vnf_terminate_phase":
1799 if all([state == VnfRecordState.TERMINATED for state in instance_vnf_state_list]):
1800 instance.operational_status = "terminated"
1801 rc = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.POST_SCALE_IN,
1802 group, instance)
1803 if rc:
1804 self._log.debug("Scale in for group {} and instance {} succeeded".
1805 format(group.name, instance.instance_id))
1806 else:
1807 self._log.error("Post scale in config for scale group {} ({}) failed".
1808 format(group.name, instance.instance_id))
1809
1810 def create_vnffgs(self):
1811 """ This function creates VNFFGs for every VNFFG in the NSD
1812 associated with this NSR"""
1813
1814 for vnffgd in self.nsd_msg.vnffgd:
1815 self._log.debug("Found vnffgd %s in nsr id %s", vnffgd, self.id)
1816 vnffgr = VnffgRecord(self._dts,
1817 self._log,
1818 self._loop,
1819 self._nsm._vnffgmgr,
1820 self,
1821 self.name,
1822 vnffgd,
1823 self._sdn_account_name
1824 )
1825 self._vnffgrs[vnffgr.id] = vnffgr
1826
1827 def resolve_vld_ip_profile(self, nsd_msg, vld):
1828 self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
1829 if not vld.has_field('ip_profile_ref'):
1830 return None
1831 profile = [profile for profile in nsd_msg.ip_profiles if profile.name == vld.ip_profile_ref]
1832 return profile[0] if profile else None
1833
1834 @asyncio.coroutine
1835 def _create_vls(self, vld, cloud_account,om_datacenter):
1836 """Create a VLR in the cloud account specified using the given VLD
1837
1838 Args:
1839 vld : VLD yang obj
1840 cloud_account : Cloud account name
1841
1842 Returns:
1843 VirtualLinkRecord
1844 """
1845 vlr = yield from VirtualLinkRecord.create_record(
1846 self._dts,
1847 self._log,
1848 self._loop,
1849 self.name,
1850 vld,
1851 cloud_account,
1852 om_datacenter,
1853 self.resolve_vld_ip_profile(self.nsd_msg, vld),
1854 self.id,
1855 restart_mode=self.restart_mode)
1856
1857 return vlr
1858
1859 def _extract_cloud_accounts_for_vl(self, vld):
1860 """
1861 Extracts the list of cloud accounts from the NS Config obj
1862
1863 Rules:
1864 1. Cloud accounts based connection point (vnf_cloud_account_map)
1865 Args:
1866 vld : VLD yang object
1867
1868 Returns:
1869 TYPE: Description
1870 """
1871 cloud_account_list = []
1872
1873 if self._nsr_cfg_msg.vnf_cloud_account_map:
1874 # Handle case where cloud_account is None
1875 vnf_cloud_map = {}
1876 for vnf in self._nsr_cfg_msg.vnf_cloud_account_map:
1877 if vnf.cloud_account is not None or vnf.om_datacenter is not None:
1878 vnf_cloud_map[vnf.member_vnf_index_ref] = (vnf.cloud_account,vnf.om_datacenter)
1879
1880 for vnfc in vld.vnfd_connection_point_ref:
1881 cloud_account = vnf_cloud_map.get(
1882 vnfc.member_vnf_index_ref,
1883 (self.cloud_account_name,self.om_datacenter_name))
1884
1885 cloud_account_list.append(cloud_account)
1886
1887 if self._nsr_cfg_msg.vl_cloud_account_map:
1888 for vld_map in self._nsr_cfg_msg.vl_cloud_account_map:
1889 if vld_map.vld_id_ref == vld.id:
1890 for cloud_account in vld_map.cloud_accounts:
1891 cloud_account_list.extend((cloud_account,None))
1892 for om_datacenter in vld_map.om_datacenters:
1893 cloud_account_list.extend((None,om_datacenter))
1894
1895 # If no config has been provided then fall-back to the default
1896 # account
1897 if not cloud_account_list:
1898 cloud_account_list = [(self.cloud_account_name,self.om_datacenter_name)]
1899
1900 self._log.debug("VL {} cloud accounts: {}".
1901 format(vld.name, cloud_account_list))
1902 return set(cloud_account_list)
1903
1904 @asyncio.coroutine
1905 def create_vls(self):
1906 """ This function creates VLs for every VLD in the NSD
1907 associated with this NSR"""
1908 for vld in self.nsd_msg.vld:
1909
1910 self._log.debug("Found vld %s in nsr id %s", vld, self.id)
1911 cloud_account_list = self._extract_cloud_accounts_for_vl(vld)
1912 for cloud_account,om_datacenter in cloud_account_list:
1913 vlr = yield from self._create_vls(vld, cloud_account,om_datacenter)
1914 self._vlrs.append(vlr)
1915
1916
1917 @asyncio.coroutine
1918 def create_vl_instance(self, vld):
1919 self._log.debug("Create VL for {}: {}".format(self.id, vld.as_dict()))
1920 # Check if the VL is already present
1921 vlr = None
1922 for vl in self._vlrs:
1923 if vl.vld_msg.id == vld.id:
1924 self._log.debug("The VLD %s already in NSR %s as VLR %s with status %s",
1925 vld.id, self.id, vl.id, vl.state)
1926 vlr = vl
1927 if vlr.state != VlRecordState.TERMINATED:
1928 err_msg = "VLR for VL %s in NSR %s already instantiated", \
1929 vld, self.id
1930 self._log.error(err_msg)
1931 raise NsrVlUpdateError(err_msg)
1932 break
1933
1934 if vlr is None:
1935 cloud_account_list = self._extract_cloud_accounts_for_vl(vld)
1936 for account,om_datacenter in cloud_account_list:
1937 vlr = yield from self._create_vls(vld, account,om_datacenter)
1938 self._vlrs.append(vlr)
1939
1940 vlr.state = VlRecordState.INSTANTIATION_PENDING
1941 yield from self.update_state()
1942
1943 try:
1944 yield from self.nsm_plugin.instantiate_vl(self, vlr)
1945 vlr.state = VlRecordState.ACTIVE
1946
1947 except Exception as e:
1948 err_msg = "Error instantiating VL for NSR {} and VLD {}: {}". \
1949 format(self.id, vld.id, e)
1950 self._log.error(err_msg)
1951 self._log.exception(e)
1952 vlr.state = VlRecordState.FAILED
1953
1954 yield from self.update_state()
1955
1956 @asyncio.coroutine
1957 def delete_vl_instance(self, vld):
1958 for vlr in self._vlrs:
1959 if vlr.vld_msg.id == vld.id:
1960 self._log.debug("Found VLR %s for VLD %s in NSR %s",
1961 vlr.id, vld.id, self.id)
1962 vlr.state = VlRecordState.TERMINATE_PENDING
1963 yield from self.update_state()
1964
1965 try:
1966 yield from self.nsm_plugin.terminate_vl(vlr)
1967 vlr.state = VlRecordState.TERMINATED
1968 self._vlrs.remove(vlr)
1969
1970 except Exception as e:
1971 err_msg = "Error terminating VL for NSR {} and VLD {}: {}". \
1972 format(self.id, vld.id, e)
1973 self._log.error(err_msg)
1974 self._log.exception(e)
1975 vlr.state = VlRecordState.FAILED
1976
1977 yield from self.update_state()
1978 break
1979
1980 @asyncio.coroutine
1981 def create_vnfs(self, config_xact):
1982 """
1983 This function creates VNFs for every VNF in the NSD
1984 associated with this NSR
1985 """
1986 self._log.debug("Creating %u VNFs associated with this NS id %s",
1987 len(self.nsd_msg.constituent_vnfd), self.id)
1988
1989 for const_vnfd in self.nsd_msg.constituent_vnfd:
1990 if not const_vnfd.start_by_default:
1991 self._log.debug("start_by_default set to False in constituent VNF (%s). Skipping start.",
1992 const_vnfd.member_vnf_index)
1993 continue
1994
1995 vnfd_msg = self._get_vnfd(const_vnfd.vnfd_id_ref, config_xact)
1996 cloud_account_name,om_datacenter_name = self._get_vnfd_cloud_account(const_vnfd.member_vnf_index)
1997 if cloud_account_name is None:
1998 cloud_account_name = self.cloud_account_name
1999 yield from self.create_vnf_record(vnfd_msg, const_vnfd, cloud_account_name, om_datacenter_name)
2000
2001
2002 def get_placement_groups(self, vnfd_msg, const_vnfd):
2003 placement_groups = []
2004 for group in self.nsd_msg.placement_groups:
2005 for member_vnfd in group.member_vnfd:
2006 if (member_vnfd.vnfd_id_ref == vnfd_msg.id) and \
2007 (member_vnfd.member_vnf_index_ref == const_vnfd.member_vnf_index):
2008 group_info = self.resolve_placement_group_cloud_construct(group)
2009 if group_info is None:
2010 self._log.error("Could not resolve cloud-construct for placement group: %s", group.name)
2011 ### raise PlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
2012 else:
2013 self._log.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)",
2014 str(group_info),
2015 vnfd_msg.name,
2016 const_vnfd.member_vnf_index)
2017 placement_groups.append(group_info)
2018 return placement_groups
2019
2020 @asyncio.coroutine
2021 def create_vnf_record(self, vnfd_msg, const_vnfd, cloud_account_name, om_datacenter_name, group_name=None, group_instance_id=None):
2022 # Fetch the VNFD associated with this VNF
2023 placement_groups = self.get_placement_groups(vnfd_msg, const_vnfd)
2024 self._log.info("Cloud Account for VNF %d is %s",const_vnfd.member_vnf_index,cloud_account_name)
2025 self._log.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s",
2026 vnfd_msg.name,
2027 const_vnfd.member_vnf_index,
2028 [ group.name for group in placement_groups])
2029 vnfr = yield from VirtualNetworkFunctionRecord.create_record(self._dts,
2030 self._log,
2031 self._loop,
2032 vnfd_msg,
2033 const_vnfd,
2034 self.nsd_id,
2035 self.name,
2036 cloud_account_name,
2037 om_datacenter_name,
2038 self.id,
2039 group_name,
2040 group_instance_id,
2041 placement_groups,
2042 restart_mode=self.restart_mode,
2043 )
2044 if vnfr.id in self._vnfrs:
2045 err = "VNF with VNFR id %s already in vnf list" % (vnfr.id,)
2046 raise NetworkServiceRecordError(err)
2047
2048 self._vnfrs[vnfr.id] = vnfr
2049 self._nsm.vnfrs[vnfr.id] = vnfr
2050
2051 yield from vnfr.set_config_status(NsrYang.ConfigStates.INIT)
2052
2053 self._log.debug("Added VNFR %s to NSM VNFR list with id %s",
2054 vnfr.name,
2055 vnfr.id)
2056
2057 return vnfr
2058
2059 def create_param_pools(self):
2060 for param_pool in self.nsd_msg.parameter_pool:
2061 self._log.debug("Found parameter pool %s in nsr id %s", param_pool, self.id)
2062
2063 start_value = param_pool.range.start_value
2064 end_value = param_pool.range.end_value
2065 if end_value < start_value:
2066 raise NetworkServiceRecordError(
2067 "Parameter pool %s has invalid range (start: {}, end: {})".format(
2068 start_value, end_value
2069 )
2070 )
2071
2072 self._param_pools[param_pool.name] = config_value_pool.ParameterValuePool(
2073 self._log,
2074 param_pool.name,
2075 range(start_value, end_value)
2076 )
2077
2078 @asyncio.coroutine
2079 def fetch_vnfr(self, vnfr_path):
2080 """ Fetch VNFR record """
2081 vnfr = None
2082 self._log.debug("Fetching VNFR with key %s while instantiating %s",
2083 vnfr_path, self.id)
2084 res_iter = yield from self._dts.query_read(vnfr_path, rwdts.XactFlag.MERGE)
2085
2086 for ent in res_iter:
2087 res = yield from ent
2088 vnfr = res.result
2089
2090 return vnfr
2091
2092 @asyncio.coroutine
2093 def instantiate_vnfs(self, vnfrs):
2094 """
2095 This function instantiates VNFs for every VNF in this Network Service
2096 """
2097 self._log.debug("Instantiating %u VNFs in NS %s", len(vnfrs), self.id)
2098 for vnf in vnfrs:
2099 self._log.debug("Instantiating VNF: %s in NS %s", vnf, self.id)
2100 yield from self.nsm_plugin.instantiate_vnf(self, vnf)
2101
2102 @asyncio.coroutine
2103 def instantiate_vnffgs(self):
2104 """
2105 This function instantiates VNFFGs for every VNFFG in this Network Service
2106 """
2107 self._log.debug("Instantiating %u VNFFGs in NS %s",
2108 len(self.nsd_msg.vnffgd), self.id)
2109 for _, vnfr in self.vnfrs.items():
2110 while vnfr.state in [VnfRecordState.INSTANTIATION_PENDING, VnfRecordState.INIT]:
2111 self._log.debug("Received vnfr state for vnfr %s is %s; retrying",vnfr.name,vnfr.state)
2112 yield from asyncio.sleep(2, loop=self._loop)
2113 if vnfr.state == VnfRecordState.ACTIVE:
2114 self._log.debug("Received vnfr state for vnfr %s is %s ",vnfr.name,vnfr.state)
2115 continue
2116 else:
2117 self._log.debug("Received vnfr state for vnfr %s is %s; failing vnffg creation",vnfr.name,vnfr.state)
2118 self._vnffgr_state = VnffgRecordState.FAILED
2119 return
2120
2121 self._log.info("Waiting for 90 seconds for VMs to come up")
2122 yield from asyncio.sleep(90, loop=self._loop)
2123 self._log.info("Starting VNFFG orchestration")
2124 for vnffg in self._vnffgrs.values():
2125 self._log.debug("Instantiating VNFFG: %s in NS %s", vnffg, self.id)
2126 yield from vnffg.instantiate()
2127
2128 @asyncio.coroutine
2129 def instantiate_scaling_instances(self, config_xact):
2130 """ Instantiate any default scaling instances in this Network Service """
2131 for group in self._scaling_groups.values():
2132 for i in range(group.min_instance_count):
2133 self._log.debug("Instantiating %s default scaling instance %s", group, i)
2134 yield from self.create_scale_group_instance(
2135 group.name, i, config_xact, is_default=True
2136 )
2137
2138 for group_msg in self._nsr_cfg_msg.scaling_group:
2139 if group_msg.scaling_group_name_ref != group.name:
2140 continue
2141
2142 for instance in group_msg.instance:
2143 self._log.debug("Reloading %s scaling instance %s", group_msg, instance.id)
2144 yield from self.create_scale_group_instance(
2145 group.name, instance.id, config_xact, is_default=False
2146 )
2147
2148 def has_scaling_instances(self):
2149 """ Return boolean indicating if the network service has default scaling groups """
2150 for group in self._scaling_groups.values():
2151 if group.min_instance_count > 0:
2152 return True
2153
2154 for group_msg in self._nsr_cfg_msg.scaling_group:
2155 if len(group_msg.instance) > 0:
2156 return True
2157
2158 return False
2159
2160 @asyncio.coroutine
2161 def publish(self):
2162 """ This function publishes this NSR """
2163 self._nsr_msg = self.create_msg()
2164
2165 self._log.debug("Publishing the NSR with xpath %s and nsr %s",
2166 self.nsr_xpath,
2167 self._nsr_msg)
2168
2169 if self._debug_running:
2170 self._log.debug("Publishing NSR in RUNNING state!")
2171 #raise()
2172
2173 with self._dts.transaction() as xact:
2174 yield from self._nsm.nsr_handler.update(xact, self.nsr_xpath, self._nsr_msg)
2175 if self._op_status.state == NetworkServiceRecordState.RUNNING:
2176 self._debug_running = True
2177
2178 @asyncio.coroutine
2179 def unpublish(self, xact):
2180 """ Unpublish this NSR object """
2181 self._log.debug("Unpublishing Network service id %s", self.id)
2182 yield from self._nsm.nsr_handler.delete(xact, self.nsr_xpath)
2183
2184 @property
2185 def nsr_xpath(self):
2186 """ Returns the xpath associated with this NSR """
2187 return(
2188 "D,/nsr:ns-instance-opdata" +
2189 "/nsr:nsr[nsr:ns-instance-config-ref = '{}']"
2190 ).format(self.id)
2191
2192 @staticmethod
2193 def xpath_from_nsr(nsr):
2194 """ Returns the xpath associated with this NSR op data"""
2195 return (NetworkServiceRecord.XPATH +
2196 "[nsr:ns-instance-config-ref = '{}']").format(nsr.id)
2197
2198 @property
2199 def nsd_xpath(self):
2200 """ Return NSD config xpath."""
2201 return(
2202 "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']"
2203 ).format(self.nsd_id)
2204
2205 @asyncio.coroutine
2206 def instantiate(self, config_xact):
2207 """"Instantiates a NetworkServiceRecord.
2208
2209 This function instantiates a Network service
2210 which involves the following steps,
2211
2212 * Instantiate every VL in NSD by sending create VLR request to DTS.
2213 * Instantiate every VNF in NSD by sending create VNF reuqest to DTS.
2214 * Publish the NSR details to DTS
2215
2216 Arguments:
2217 nsr: The NSR configuration request containing nsr-id and nsd
2218 config_xact: The configuration transaction which initiated the instatiation
2219
2220 Raises:
2221 NetworkServiceRecordError if the NSR creation fails
2222
2223 Returns:
2224 No return value
2225 """
2226
2227 self._log.debug("Instantiating NS - %s xact - %s", self, config_xact)
2228
2229 # Move the state to INIITALIZING
2230 self.set_state(NetworkServiceRecordState.INIT)
2231
2232 event_descr = "Instantiation Request Received NSR Id:%s" % self.id
2233 self.record_event("instantiating", event_descr)
2234
2235 # Find the NSD
2236 self._nsd = self._nsr_cfg_msg.nsd
2237
2238 try:
2239 # Update ref count if nsd present in catalog
2240 self._nsm.get_nsd_ref(self.nsd_id)
2241
2242 except NetworkServiceDescriptorError:
2243 # This could be an NSD not in the nsd-catalog
2244 pass
2245
2246 # Merge any config and initial config primitive values
2247 self.config_store.merge_nsd_config(self.nsd_msg)
2248 self._log.debug("Merged NSD: {}".format(self.nsd_msg.as_dict()))
2249
2250 event_descr = "Fetched NSD with descriptor id %s" % self.nsd_id
2251 self.record_event("nsd-fetched", event_descr)
2252
2253 if self._nsd is None:
2254 msg = "Failed to fetch NSD with nsd-id [%s] for nsr-id %s"
2255 self._log.debug(msg, self.nsd_id, self.id)
2256 raise NetworkServiceRecordError(self)
2257
2258 self._log.debug("Got nsd result %s", self._nsd)
2259
2260 # Substitute any input parameters
2261 self.substitute_input_parameters(self._nsd, self._nsr_cfg_msg)
2262
2263 # Create the record
2264 yield from self.create(config_xact)
2265
2266 # Publish the NSR to DTS
2267 yield from self.publish()
2268
2269 @asyncio.coroutine
2270 def do_instantiate():
2271 """
2272 Instantiate network service
2273 """
2274 self._log.debug("Instantiating VLs nsr id [%s] nsd id [%s]",
2275 self.id, self.nsd_id)
2276
2277 # instantiate the VLs
2278 event_descr = ("Instantiating %s external VLs for NSR id %s" %
2279 (len(self.nsd_msg.vld), self.id))
2280 self.record_event("begin-external-vls-instantiation", event_descr)
2281
2282 self.set_state(NetworkServiceRecordState.VL_INIT_PHASE)
2283
2284 yield from self.instantiate_vls()
2285
2286 # Publish the NSR to DTS
2287 yield from self.publish()
2288
2289 event_descr = ("Finished instantiating %s external VLs for NSR id %s" %
2290 (len(self.nsd_msg.vld), self.id))
2291 self.record_event("end-external-vls-instantiation", event_descr)
2292
2293 self.set_state(NetworkServiceRecordState.VNF_INIT_PHASE)
2294
2295 self._log.debug("Instantiating VNFs ...... nsr[%s], nsd[%s]",
2296 self.id, self.nsd_id)
2297
2298 # instantiate the VNFs
2299 event_descr = ("Instantiating %s VNFS for NSR id %s" %
2300 (len(self.nsd_msg.constituent_vnfd), self.id))
2301
2302 self.record_event("begin-vnf-instantiation", event_descr)
2303
2304 yield from self.instantiate_vnfs(self._vnfrs.values())
2305
2306 self._log.debug(" Finished instantiating %d VNFs for NSR id %s",
2307 len(self.nsd_msg.constituent_vnfd), self.id)
2308
2309 event_descr = ("Finished instantiating %s VNFs for NSR id %s" %
2310 (len(self.nsd_msg.constituent_vnfd), self.id))
2311 self.record_event("end-vnf-instantiation", event_descr)
2312
2313 if len(self.vnffgrs) > 0:
2314 #self.set_state(NetworkServiceRecordState.VNFFG_INIT_PHASE)
2315 event_descr = ("Instantiating %s VNFFGS for NSR id %s" %
2316 (len(self.nsd_msg.vnffgd), self.id))
2317
2318 self.record_event("begin-vnffg-instantiation", event_descr)
2319
2320 yield from self.instantiate_vnffgs()
2321
2322 event_descr = ("Finished instantiating %s VNFFGDs for NSR id %s" %
2323 (len(self.nsd_msg.vnffgd), self.id))
2324 self.record_event("end-vnffg-instantiation", event_descr)
2325
2326 if self.has_scaling_instances():
2327 event_descr = ("Instantiating %s Scaling Groups for NSR id %s" %
2328 (len(self._scaling_groups), self.id))
2329
2330 self.record_event("begin-scaling-group-instantiation", event_descr)
2331 yield from self.instantiate_scaling_instances(config_xact)
2332 self.record_event("end-scaling-group-instantiation", event_descr)
2333
2334 # Give the plugin a chance to deploy the network service now that all
2335 # virtual links and vnfs are instantiated
2336 yield from self.nsm_plugin.deploy(self._nsr_msg)
2337
2338 self._log.debug("Publishing NSR...... nsr[%s], nsd[%s]",
2339 self.id, self.nsd_id)
2340
2341 # Publish the NSR to DTS
2342 yield from self.publish()
2343
2344 self._log.debug("Published NSR...... nsr[%s], nsd[%s]",
2345 self.id, self.nsd_id)
2346
2347 def on_instantiate_done(fut):
2348 # If the do_instantiate fails, then publish NSR with failed result
2349 if fut.exception() is not None:
2350 self._log.error("NSR instantiation failed for NSR id %s: %s", self.id, str(fut.exception()))
2351 self._loop.create_task(self.instantiation_failed(failed_reason=str(fut.exception())))
2352
2353 instantiate_task = self._loop.create_task(do_instantiate())
2354 instantiate_task.add_done_callback(on_instantiate_done)
2355
2356 @asyncio.coroutine
2357 def set_config_status(self, status, status_details=None):
2358 if self.config_status != status:
2359 self._log.debug("Updating NSR {} status for {} to {}".
2360 format(self.name, self.config_status, status))
2361 self._config_status = status
2362 self._config_status_details = status_details
2363
2364 if self._config_status == NsrYang.ConfigStates.FAILED:
2365 self.record_event("config-failed", "NS configuration failed",
2366 evt_details=self._config_status_details)
2367
2368 yield from self.publish()
2369
2370 @asyncio.coroutine
2371 def is_active(self):
2372 """ This NS is active """
2373 self.set_state(NetworkServiceRecordState.RUNNING)
2374 if self._is_active:
2375 return
2376
2377 # Publish the NSR to DTS
2378 self._log.debug("Network service %s is active ", self.id)
2379 self._is_active = True
2380
2381 event_descr = "NSR in running state for NSR id %s" % self.id
2382 self.record_event("ns-running", event_descr)
2383
2384 yield from self.publish()
2385
2386 @asyncio.coroutine
2387 def instantiation_failed(self, failed_reason=None):
2388 """ The NS instantiation failed"""
2389 self._log.error("Network service id:%s, name:%s instantiation failed",
2390 self.id, self.name)
2391 self.set_state(NetworkServiceRecordState.FAILED)
2392
2393 event_descr = "Instantiation of NS %s failed" % self.id
2394 self.record_event("ns-failed", event_descr, evt_details=failed_reason)
2395
2396 # Publish the NSR to DTS
2397 yield from self.publish()
2398
2399 @asyncio.coroutine
2400 def terminate_vnfrs(self, vnfrs):
2401 """ Terminate VNFRS in this network service """
2402 self._log.debug("Terminating VNFs in network service %s", self.id)
2403 for vnfr in vnfrs:
2404 yield from self.nsm_plugin.terminate_vnf(vnfr)
2405
2406 @asyncio.coroutine
2407 def terminate(self):
2408 """ Terminate a NetworkServiceRecord."""
2409 def terminate_vnffgrs():
2410 """ Terminate VNFFGRS in this network service """
2411 self._log.debug("Terminating VNFFGRs in network service %s", self.id)
2412 for vnffgr in self.vnffgrs.values():
2413 yield from vnffgr.terminate()
2414
2415 def terminate_vlrs():
2416 """ Terminate VLRs in this netork service """
2417 self._log.debug("Terminating VLs in network service %s", self.id)
2418 for vlr in self.vlrs:
2419 yield from self.nsm_plugin.terminate_vl(vlr)
2420 vlr.state = VlRecordState.TERMINATED
2421 if vlr.id in self.vlr_uptime_tasks:
2422 self.vlr_uptime_tasks[vlr.id].cancel()
2423
2424 self._log.debug("Terminating network service id %s", self.id)
2425
2426 # Move the state to TERMINATE
2427 self.set_state(NetworkServiceRecordState.TERMINATE)
2428 event_descr = "Terminate being processed for NS Id:%s" % self.id
2429 self.record_event("terminate", event_descr)
2430
2431 # Move the state to VNF_TERMINATE_PHASE
2432 self._log.debug("Terminating VNFFGs in NS ID: %s", self.id)
2433 self.set_state(NetworkServiceRecordState.VNFFG_TERMINATE_PHASE)
2434 event_descr = "Terminating VNFFGS in NS Id:%s" % self.id
2435 self.record_event("terminating-vnffgss", event_descr)
2436 yield from terminate_vnffgrs()
2437
2438 # Move the state to VNF_TERMINATE_PHASE
2439 self.set_state(NetworkServiceRecordState.VNF_TERMINATE_PHASE)
2440 event_descr = "Terminating VNFS in NS Id:%s" % self.id
2441 self.record_event("terminating-vnfs", event_descr)
2442 yield from self.terminate_vnfrs(self.vnfrs.values())
2443
2444 # Move the state to VL_TERMINATE_PHASE
2445 self.set_state(NetworkServiceRecordState.VL_TERMINATE_PHASE)
2446 event_descr = "Terminating VLs in NS Id:%s" % self.id
2447 self.record_event("terminating-vls", event_descr)
2448 yield from terminate_vlrs()
2449
2450 yield from self.nsm_plugin.terminate_ns(self)
2451
2452 # Move the state to TERMINATED
2453 self.set_state(NetworkServiceRecordState.TERMINATED)
2454 event_descr = "Terminated NS Id:%s" % self.id
2455 self.record_event("terminated", event_descr)
2456
2457 def enable(self):
2458 """"Enable a NetworkServiceRecord."""
2459 pass
2460
2461 def disable(self):
2462 """"Disable a NetworkServiceRecord."""
2463 pass
2464
2465 def map_config_status(self):
2466 self._log.debug("Config status for ns {} is {}".
2467 format(self.name, self._config_status))
2468 if self._config_status == NsrYang.ConfigStates.CONFIGURING:
2469 return 'configuring'
2470 if self._config_status == NsrYang.ConfigStates.FAILED:
2471 return 'failed'
2472 return 'configured'
2473
2474 def vl_phase_completed(self):
2475 """ Are VLs created in this NS?"""
2476 return self._vl_phase_completed
2477
2478 def vnf_phase_completed(self):
2479 """ Are VLs created in this NS?"""
2480 return self._vnf_phase_completed
2481
2482 def create_msg(self):
2483 """ The network serice record as a message """
2484 nsr_dict = {"ns_instance_config_ref": self.id}
2485 nsr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.from_dict(nsr_dict)
2486 #nsr.cloud_account = self.cloud_account_name
2487 nsr.sdn_account = self._sdn_account_name
2488 nsr.name_ref = self.name
2489 nsr.nsd_ref = self.nsd_id
2490 nsr.nsd_name_ref = self.nsd_msg.name
2491 nsr.operational_events = self._op_status.msg
2492 nsr.operational_status = self._op_status.yang_str()
2493 nsr.config_status = self.map_config_status()
2494 nsr.config_status_details = self._config_status_details
2495 nsr.create_time = self._create_time
2496 nsr.uptime = int(time.time()) - self._create_time
2497
2498 for cfg_prim in self.nsd_msg.service_primitive:
2499 cfg_prim = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict(
2500 cfg_prim.as_dict())
2501 nsr.service_primitive.append(cfg_prim)
2502
2503 for init_cfg in self.nsd_msg.initial_config_primitive:
2504 prim = NsrYang.NsrInitialConfigPrimitive.from_dict(
2505 init_cfg.as_dict())
2506 nsr.initial_config_primitive.append(prim)
2507
2508 if self.vl_phase_completed():
2509 for vlr in self.vlrs:
2510 nsr.vlr.append(vlr.create_nsr_vlr_msg(self.vnfrs.values()))
2511
2512 if self.vnf_phase_completed():
2513 for vnfr_id in self.vnfrs:
2514 nsr.constituent_vnfr_ref.append(self.vnfrs[vnfr_id].const_vnfr_msg)
2515 for vnffgr in self.vnffgrs.values():
2516 nsr.vnffgr.append(vnffgr.fetch_vnffgr())
2517 for scaling_group in self._scaling_groups.values():
2518 nsr.scaling_group_record.append(scaling_group.create_record_msg())
2519
2520 return nsr
2521
2522 def all_vnfs_active(self):
2523 """ Are all VNFS in this NS active? """
2524 for _, vnfr in self.vnfrs.items():
2525 if vnfr.active is not True:
2526 return False
2527 return True
2528
2529 @asyncio.coroutine
2530 def update_state(self):
2531 """ Re-evaluate this NS's state """
2532 curr_state = self._op_status.state
2533
2534 if curr_state == NetworkServiceRecordState.TERMINATED:
2535 self._log.debug("NS (%s) in terminated state, not updating state", self.id)
2536 return
2537
2538 new_state = NetworkServiceRecordState.RUNNING
2539 self._log.info("Received update_state for nsr: %s, curr-state: %s",
2540 self.id, curr_state)
2541
2542 # Check all the VNFRs are present
2543 for _, vnfr in self.vnfrs.items():
2544 if vnfr.state in [VnfRecordState.ACTIVE, VnfRecordState.TERMINATED]:
2545 pass
2546 elif vnfr.state == VnfRecordState.FAILED:
2547 if vnfr._prev_state != vnfr.state:
2548 event_descr = "Instantiation of VNF %s failed" % vnfr.id
2549 event_error_details = vnfr.state_failed_reason
2550 self.record_event("vnf-failed", event_descr, evt_details=event_error_details)
2551 vnfr.set_state(VnfRecordState.FAILED)
2552 else:
2553 self._log.info("VNF state did not change, curr=%s, prev=%s",
2554 vnfr.state, vnfr._prev_state)
2555 new_state = NetworkServiceRecordState.FAILED
2556 break
2557 else:
2558 self._log.info("VNF %s in NSR %s is still not active; current state is: %s",
2559 vnfr.id, self.id, vnfr.state)
2560 new_state = curr_state
2561
2562 # If new state is RUNNING; check all VLs
2563 if new_state == NetworkServiceRecordState.RUNNING:
2564 for vl in self.vlrs:
2565
2566 if vl.state in [VlRecordState.ACTIVE, VlRecordState.TERMINATED]:
2567 pass
2568 elif vl.state == VlRecordState.FAILED:
2569 if vl.prev_state != vl.state:
2570 event_descr = "Instantiation of VL %s failed" % vl.id
2571 event_error_details = vl.state_failed_reason
2572 self.record_event("vl-failed", event_descr, evt_details=event_error_details)
2573 vl.prev_state = vl.state
2574 else:
2575 self._log.debug("VL %s already in failed state")
2576 else:
2577 if vl.state in [VlRecordState.INSTANTIATION_PENDING, VlRecordState.INIT]:
2578 new_state = NetworkServiceRecordState.VL_INSTANTIATE
2579 break
2580
2581 if vl.state in [VlRecordState.TERMINATE_PENDING]:
2582 new_state = NetworkServiceRecordState.VL_TERMINATE
2583 break
2584
2585 # If new state is RUNNING; check VNFFGRs are also active
2586 if new_state == NetworkServiceRecordState.RUNNING:
2587 for _, vnffgr in self.vnffgrs.items():
2588 self._log.info("Checking vnffgr state for nsr %s is: %s",
2589 self.id, vnffgr.state)
2590 if vnffgr.state == VnffgRecordState.ACTIVE:
2591 pass
2592 elif vnffgr.state == VnffgRecordState.FAILED:
2593 event_descr = "Instantiation of VNFFGR %s failed" % vnffgr.id
2594 self.record_event("vnffg-failed", event_descr)
2595 new_state = NetworkServiceRecordState.FAILED
2596 break
2597 else:
2598 self._log.info("VNFFGR %s in NSR %s is still not active; current state is: %s",
2599 vnffgr.id, self.id, vnffgr.state)
2600 new_state = curr_state
2601
2602 # Update all the scaling group instance operational status to
2603 # reflect the state of all VNFR within that instance
2604 yield from self._update_scale_group_instances_status()
2605
2606 for _, group in self._scaling_groups.items():
2607 if group.state == scale_group.ScaleGroupState.SCALING_OUT:
2608 new_state = NetworkServiceRecordState.SCALING_OUT
2609 break
2610 elif group.state == scale_group.ScaleGroupState.SCALING_IN:
2611 new_state = NetworkServiceRecordState.SCALING_IN
2612 break
2613
2614 if new_state != curr_state:
2615 self._log.debug("Changing state of Network service %s from %s to %s",
2616 self.id, curr_state, new_state)
2617 if new_state == NetworkServiceRecordState.RUNNING:
2618 yield from self.is_active()
2619 elif new_state == NetworkServiceRecordState.FAILED:
2620 # If the NS is already active and we entered scaling_in, scaling_out,
2621 # do not mark the NS as failing if scaling operation failed.
2622 if curr_state in [NetworkServiceRecordState.SCALING_OUT,
2623 NetworkServiceRecordState.SCALING_IN] and self._is_active:
2624 new_state = NetworkServiceRecordState.RUNNING
2625 self.set_state(new_state)
2626 else:
2627 yield from self.instantiation_failed()
2628 else:
2629 self.set_state(new_state)
2630
2631 yield from self.publish()
2632
2633
2634 class InputParameterSubstitution(object):
2635 """
2636 This class is responsible for substituting input parameters into an NSD.
2637 """
2638
2639 def __init__(self, log):
2640 """Create an instance of InputParameterSubstitution
2641
2642 Arguments:
2643 log - a logger for this object to use
2644
2645 """
2646 self.log = log
2647
2648 def __call__(self, nsd, nsr_config):
2649 """Substitutes input parameters from the NSR config into the NSD
2650
2651 This call modifies the provided NSD with the input parameters that are
2652 contained in the NSR config.
2653
2654 Arguments:
2655 nsd - a GI NSD object
2656 nsr_config - a GI NSR config object
2657
2658 """
2659 if nsd is None or nsr_config is None:
2660 return
2661
2662 # Create a lookup of the xpath elements that this descriptor allows
2663 # to be modified
2664 optional_input_parameters = set()
2665 for input_parameter in nsd.input_parameter_xpath:
2666 optional_input_parameters.add(input_parameter.xpath)
2667
2668 # Apply the input parameters to the descriptor
2669 if nsr_config.input_parameter:
2670 for param in nsr_config.input_parameter:
2671 if param.xpath not in optional_input_parameters:
2672 msg = "tried to set an invalid input parameter ({})"
2673 self.log.error(msg.format(param.xpath))
2674 continue
2675
2676 self.log.debug(
2677 "input-parameter:{} = {}".format(
2678 param.xpath,
2679 param.value,
2680 )
2681 )
2682
2683 try:
2684 xpath.setxattr(nsd, param.xpath, param.value)
2685
2686 except Exception as e:
2687 self.log.exception(e)
2688
2689
2690 class NetworkServiceDescriptor(object):
2691 """
2692 Network service descriptor class
2693 """
2694
2695 def __init__(self, dts, log, loop, nsd, nsm):
2696 self._dts = dts
2697 self._log = log
2698 self._loop = loop
2699
2700 self._nsd = nsd
2701 self._ref_count = 0
2702
2703 self._nsm = nsm
2704
2705 @property
2706 def id(self):
2707 """ Returns nsd id """
2708 return self._nsd.id
2709
2710 @property
2711 def name(self):
2712 """ Returns name of nsd """
2713 return self._nsd.name
2714
2715 @property
2716 def ref_count(self):
2717 """ Returns reference count"""
2718 return self._ref_count
2719
2720 def in_use(self):
2721 """ Returns whether nsd is in use or not """
2722 return True if self.ref_count > 0 else False
2723
2724 def ref(self):
2725 """ Take a reference on this object """
2726 self._ref_count += 1
2727
2728 def unref(self):
2729 """ Release reference on this object """
2730 if self.ref_count < 1:
2731 msg = ("Unref on a NSD object - nsd id %s, ref_count = %s" %
2732 (self.id, self.ref_count))
2733 self._log.critical(msg)
2734 raise NetworkServiceDescriptorError(msg)
2735 self._ref_count -= 1
2736
2737 @property
2738 def msg(self):
2739 """ Return the message associated with this NetworkServiceDescriptor"""
2740 return self._nsd
2741
2742 @staticmethod
2743 def path_for_id(nsd_id):
2744 """ Return path for the passed nsd_id"""
2745 return "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}'".format(nsd_id)
2746
2747 def path(self):
2748 """ Return the message associated with this NetworkServiceDescriptor"""
2749 return NetworkServiceDescriptor.path_for_id(self.id)
2750
2751 def update(self, nsd):
2752 """ Update the NSD descriptor """
2753 self._nsd = nsd
2754
2755
2756 class NsdDtsHandler(object):
2757 """ The network service descriptor DTS handler """
2758 XPATH = "C,/nsd:nsd-catalog/nsd:nsd"
2759
2760 def __init__(self, dts, log, loop, nsm):
2761 self._dts = dts
2762 self._log = log
2763 self._loop = loop
2764 self._nsm = nsm
2765
2766 self._regh = None
2767
2768 @property
2769 def regh(self):
2770 """ Return registration handle """
2771 return self._regh
2772
2773 @asyncio.coroutine
2774 def register(self):
2775 """ Register for Nsd create/update/delete/read requests from dts """
2776
2777 def on_apply(dts, acg, xact, action, scratch):
2778 """Apply the configuration"""
2779 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
2780 self._log.debug("Got nsd apply cfg (xact:%s) (action:%s)",
2781 xact, action)
2782 # Create/Update an NSD record
2783 for cfg in self._regh.get_xact_elements(xact):
2784 # Only interested in those NSD cfgs whose ID was received in prepare callback
2785 if cfg.id in scratch.get('nsds', []) or is_recovery:
2786 self._nsm.update_nsd(cfg)
2787
2788 scratch.pop('nsds', None)
2789
2790 return RwTypes.RwStatus.SUCCESS
2791
2792 @asyncio.coroutine
2793 def delete_nsd_libs(nsd_id):
2794 """ Remove any files uploaded with NSD and stored under $RIFT_ARTIFACTS/libs/<id> """
2795 try:
2796 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2797 nsd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', nsd_id)
2798
2799 if os.path.exists (nsd_dir):
2800 shutil.rmtree(nsd_dir, ignore_errors=True)
2801 except Exception as e:
2802 self._log.error("Exception in cleaning up NSD libs {}: {}".
2803 format(nsd_id, e))
2804 self._log.excpetion(e)
2805
2806 @asyncio.coroutine
2807 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
2808 """ Prepare callback from DTS for NSD config """
2809
2810 self._log.info("Got nsd prepare - config received nsd id %s, msg %s",
2811 msg.id, msg)
2812
2813 fref = ProtobufC.FieldReference.alloc()
2814 fref.goto_whole_message(msg.to_pbcm())
2815
2816 if fref.is_field_deleted():
2817 # Delete an NSD record
2818 self._log.debug("Deleting NSD with id %s", msg.id)
2819 if self._nsm.nsd_in_use(msg.id):
2820 self._log.debug("Cannot delete NSD in use - %s", msg.id)
2821 err = "Cannot delete an NSD in use - %s" % msg.id
2822 raise NetworkServiceDescriptorRefCountExists(err)
2823
2824 yield from delete_nsd_libs(msg.id)
2825 self._nsm.delete_nsd(msg.id)
2826 else:
2827 # Add this NSD to scratch to create/update in apply callback
2828 nsds = scratch.setdefault('nsds', [])
2829 nsds.append(msg.id)
2830 # acg._scratch['nsds'].append(msg.id)
2831
2832 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2833
2834 self._log.debug(
2835 "Registering for NSD config using xpath: %s",
2836 NsdDtsHandler.XPATH,
2837 )
2838
2839 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
2840 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
2841 # Need a list in scratch to store NSDs to create/update later
2842 # acg._scratch['nsds'] = list()
2843 self._regh = acg.register(
2844 xpath=NsdDtsHandler.XPATH,
2845 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
2846 on_prepare=on_prepare)
2847
2848
2849 class VnfdDtsHandler(object):
2850 """ DTS handler for VNFD config changes """
2851 XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"
2852
2853 def __init__(self, dts, log, loop, nsm):
2854 self._dts = dts
2855 self._log = log
2856 self._loop = loop
2857 self._nsm = nsm
2858 self._regh = None
2859
2860 @property
2861 def regh(self):
2862 """ DTS registration handle """
2863 return self._regh
2864
2865 @asyncio.coroutine
2866 def register(self):
2867 """ Register for VNFD configuration"""
2868
2869 @asyncio.coroutine
2870 def on_apply(dts, acg, xact, action, scratch):
2871 """Apply the configuration"""
2872 self._log.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)",
2873 xact, action, scratch)
2874
2875 # Create/Update a VNFD record
2876 for cfg in self._regh.get_xact_elements(xact):
2877 # Only interested in those VNFD cfgs whose ID was received in prepare callback
2878 if cfg.id in scratch.get('vnfds', []):
2879 self._nsm.update_vnfd(cfg)
2880
2881 for cfg in self._regh.elements:
2882 if cfg.id in scratch.get('deleted_vnfds', []):
2883 yield from self._nsm.delete_vnfd(cfg.id)
2884
2885 scratch.pop('vnfds', None)
2886 scratch.pop('deleted_vnfds', None)
2887
2888 @asyncio.coroutine
2889 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
2890 """ on prepare callback """
2891 self._log.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
2892 ks_path.to_xpath(RwNsmYang.get_schema()), xact_info.query_action, msg)
2893
2894 fref = ProtobufC.FieldReference.alloc()
2895 fref.goto_whole_message(msg.to_pbcm())
2896
2897 # Handle deletes in prepare_callback, but adds/updates in apply_callback
2898 if fref.is_field_deleted():
2899 self._log.debug("Adding msg to deleted field")
2900 deleted_vnfds = scratch.setdefault('deleted_vnfds', [])
2901 deleted_vnfds.append(msg.id)
2902 else:
2903 # Add this VNFD to scratch to create/update in apply callback
2904 vnfds = scratch.setdefault('vnfds', [])
2905 vnfds.append(msg.id)
2906
2907 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2908
2909 self._log.debug(
2910 "Registering for VNFD config using xpath: %s",
2911 VnfdDtsHandler.XPATH,
2912 )
2913 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
2914 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
2915 # Need a list in scratch to store VNFDs to create/update later
2916 # acg._scratch['vnfds'] = list()
2917 # acg._scratch['deleted_vnfds'] = list()
2918 self._regh = acg.register(
2919 xpath=VnfdDtsHandler.XPATH,
2920 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
2921 on_prepare=on_prepare)
2922
2923 class NsrRpcDtsHandler(object):
2924 """ The network service instantiation RPC DTS handler """
2925 EXEC_NSR_CONF_XPATH = "I,/nsr:start-network-service"
2926 EXEC_NSR_CONF_O_XPATH = "O,/nsr:start-network-service"
2927 NETCONF_IP_ADDRESS = "127.0.0.1"
2928 NETCONF_PORT = 2022
2929 RESTCONF_PORT = 8888
2930 NETCONF_USER = "admin"
2931 NETCONF_PW = "admin"
2932 REST_BASE_V2_URL = 'https://{}:{}/v2/api/'.format("127.0.0.1",8888)
2933
2934 def __init__(self, dts, log, loop, nsm):
2935 self._dts = dts
2936 self._log = log
2937 self._loop = loop
2938 self._nsm = nsm
2939 self._nsd = None
2940
2941 self._ns_regh = None
2942
2943 self._manager = None
2944 self._nsr_config_url = NsrRpcDtsHandler.REST_BASE_V2_URL + 'config/ns-instance-config'
2945
2946 self._model = RwYang.Model.create_libncx()
2947 self._model.load_schema_ypbc(RwNsrYang.get_schema())
2948
2949 @property
2950 def nsm(self):
2951 """ Return the NS manager instance """
2952 return self._nsm
2953
2954 @staticmethod
2955 def wrap_netconf_config_xml(xml):
2956 xml = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml)
2957 return xml
2958
2959 @asyncio.coroutine
2960 def _connect(self, timeout_secs=240):
2961
2962 start_time = time.time()
2963 while (time.time() - start_time) < timeout_secs:
2964
2965 try:
2966 self._log.debug("Attemping NsmTasklet netconf connection.")
2967
2968 manager = yield from ncclient.asyncio_manager.asyncio_connect(
2969 loop=self._loop,
2970 host=NsrRpcDtsHandler.NETCONF_IP_ADDRESS,
2971 port=NsrRpcDtsHandler.NETCONF_PORT,
2972 username=NsrRpcDtsHandler.NETCONF_USER,
2973 password=NsrRpcDtsHandler.NETCONF_PW,
2974 allow_agent=False,
2975 look_for_keys=False,
2976 hostkey_verify=False,
2977 )
2978
2979 return manager
2980
2981 except ncclient.transport.errors.SSHError as e:
2982 self._log.warning("Netconf connection to launchpad %s failed: %s",
2983 NsrRpcDtsHandler.NETCONF_IP_ADDRESS, str(e))
2984
2985 yield from asyncio.sleep(5, loop=self._loop)
2986
2987 raise NsrInstantiationFailed("Failed to connect to Launchpad within %s seconds" %
2988 timeout_secs)
2989
2990 def _apply_ns_instance_config(self,payload_dict):
2991 #self._log.debug("At apply NS instance config with payload %s",payload_dict)
2992 req_hdr= {'accept':'application/vnd.yang.data+json','content-type':'application/vnd.yang.data+json'}
2993 response=requests.post(self._nsr_config_url, headers=req_hdr, auth=('admin', 'admin'),data=payload_dict,verify=False)
2994 return response
2995
2996 @asyncio.coroutine
2997 def register(self):
2998 """ Register for NS monitoring read from dts """
2999 @asyncio.coroutine
3000 def on_ns_config_prepare(xact_info, action, ks_path, msg):
3001 """ prepare callback from dts start-network-service"""
3002 assert action == rwdts.QueryAction.RPC
3003 rpc_ip = msg
3004 rpc_op = NsrYang.YangOutput_Nsr_StartNetworkService.from_dict({
3005 "nsr_id":str(uuid.uuid4())
3006 })
3007
3008 if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)):
3009 self._log.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip))
3010
3011
3012 self._log.debug("start-network-service RPC input: {}".format(rpc_ip))
3013
3014 try:
3015 # Add used value to the pool
3016 self._log.debug("RPC output: {}".format(rpc_op))
3017
3018 nsd_copy = self.nsm.get_nsd(rpc_ip.nsd_ref)
3019
3020 #if not self._manager:
3021 # self._manager = yield from self._connect()
3022
3023 self._log.debug("Configuring ns-instance-config with name %s nsd-ref: %s",
3024 rpc_ip.name, rpc_ip.nsd_ref)
3025
3026 ns_instance_config_dict = {"id":rpc_op.nsr_id, "admin_status":"ENABLED"}
3027 ns_instance_config_copy_dict = {k:v for k, v in rpc_ip.as_dict().items()
3028 if k in RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr().fields}
3029 ns_instance_config_dict.update(ns_instance_config_copy_dict)
3030
3031 ns_instance_config = RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr.from_dict(ns_instance_config_dict)
3032 ns_instance_config.nsd = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_Nsd()
3033 ns_instance_config.nsd.from_dict(nsd_copy.msg.as_dict())
3034
3035 payload_dict = ns_instance_config.to_json(self._model)
3036 #xml = ns_instance_config.to_xml_v2(self._model)
3037 #netconf_xml = self.wrap_netconf_config_xml(xml)
3038
3039 #self._log.debug("Sending configure ns-instance-config xml to %s: %s",
3040 # netconf_xml, NsrRpcDtsHandler.NETCONF_IP_ADDRESS)
3041 self._log.debug("Sending configure ns-instance-config json to %s: %s",
3042 self._nsr_config_url,ns_instance_config)
3043
3044 #response = yield from self._manager.edit_config(
3045 # target="running",
3046 # config=netconf_xml,
3047 # )
3048 response = yield from self._loop.run_in_executor(
3049 None,
3050 self._apply_ns_instance_config,
3051 payload_dict
3052 )
3053 response.raise_for_status()
3054 self._log.debug("Received edit config response: %s", response.json())
3055
3056 xact_info.respond_xpath(rwdts.XactRspCode.ACK,
3057 NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH,
3058 rpc_op)
3059 except Exception as e:
3060 self._log.error("Exception processing the "
3061 "start-network-service: {}".format(e))
3062 self._log.exception(e)
3063 xact_info.respond_xpath(rwdts.XactRspCode.NACK,
3064 NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH)
3065
3066
3067 hdl_ns = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_ns_config_prepare,)
3068
3069 with self._dts.group_create() as group:
3070 self._ns_regh = group.register(xpath=NsrRpcDtsHandler.EXEC_NSR_CONF_XPATH,
3071 handler=hdl_ns,
3072 flags=rwdts.Flag.PUBLISHER,
3073 )
3074
3075
3076 class NsrDtsHandler(object):
3077 """ The network service DTS handler """
3078 NSR_XPATH = "C,/nsr:ns-instance-config/nsr:nsr"
3079 SCALE_INSTANCE_XPATH = "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
3080 KEY_PAIR_XPATH = "C,/nsr:key-pair"
3081
3082 def __init__(self, dts, log, loop, nsm):
3083 self._dts = dts
3084 self._log = log
3085 self._loop = loop
3086 self._nsm = nsm
3087
3088 self._nsr_regh = None
3089 self._scale_regh = None
3090 self._key_pair_regh = None
3091
3092 @property
3093 def nsm(self):
3094 """ Return the NS manager instance """
3095 return self._nsm
3096
3097 @asyncio.coroutine
3098 def register(self):
3099 """ Register for Nsr create/update/delete/read requests from dts """
3100
3101 def nsr_id_from_keyspec(ks):
3102 nsr_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks)
3103 nsr_id = nsr_path_entry.key00.id
3104 return nsr_id
3105
3106 def group_name_from_keyspec(ks):
3107 group_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks)
3108 group_name = group_path_entry.key00.scaling_group_name_ref
3109 return group_name
3110
3111 def is_instance_in_reg_elements(nsr_id, group_name, instance_id):
3112 """ Return boolean indicating if scaling group instance was already commited previously.
3113
3114 By looking at the existing elements in this registration handle (elements not part
3115 of this current xact), we can tell if the instance was configured previously without
3116 keeping any application state.
3117 """
3118 for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(include_keyspec=True):
3119 elem_nsr_id = nsr_id_from_keyspec(keyspec)
3120 elem_group_name = group_name_from_keyspec(keyspec)
3121
3122 if elem_nsr_id != nsr_id or group_name != elem_group_name:
3123 continue
3124
3125 if instance_cfg.id == instance_id:
3126 return True
3127
3128 return False
3129
3130 def get_scale_group_instance_delta(nsr_id, group_name, xact):
3131 delta = {"added": [], "deleted": []}
3132 for instance_cfg, keyspec in self._scale_regh.get_xact_elements(xact, include_keyspec=True):
3133 elem_nsr_id = nsr_id_from_keyspec(keyspec)
3134 if elem_nsr_id != nsr_id:
3135 continue
3136
3137 elem_group_name = group_name_from_keyspec(keyspec)
3138 if elem_group_name != group_name:
3139 continue
3140
3141 delta["added"].append(instance_cfg.id)
3142
3143 for instance_cfg, keyspec in self._scale_regh.get_xact_elements(include_keyspec=True):
3144 elem_nsr_id = nsr_id_from_keyspec(keyspec)
3145 if elem_nsr_id != nsr_id:
3146 continue
3147
3148 elem_group_name = group_name_from_keyspec(keyspec)
3149 if elem_group_name != group_name:
3150 continue
3151
3152 if instance_cfg.id in delta["added"]:
3153 delta["added"].remove(instance_cfg.id)
3154 else:
3155 delta["deleted"].append(instance_cfg.id)
3156
3157 return delta
3158
3159 @asyncio.coroutine
3160 def update_nsr_nsd(nsr_id, xact, scratch):
3161
3162 @asyncio.coroutine
3163 def get_nsr_vl_delta(nsr_id, xact, scratch):
3164 delta = {"added": [], "deleted": []}
3165 for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(xact, include_keyspec=True):
3166 elem_nsr_id = nsr_id_from_keyspec(keyspec)
3167 if elem_nsr_id != nsr_id:
3168 continue
3169
3170 if 'vld' in instance_cfg.nsd:
3171 for vld in instance_cfg.nsd.vld:
3172 delta["added"].append(vld)
3173
3174 for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(include_keyspec=True):
3175 self._log.debug("NSR update: %s", instance_cfg)
3176 elem_nsr_id = nsr_id_from_keyspec(keyspec)
3177 if elem_nsr_id != nsr_id:
3178 continue
3179
3180 if 'vld' in instance_cfg.nsd:
3181 for vld in instance_cfg.nsd.vld:
3182 if vld in delta["added"]:
3183 delta["added"].remove(vld)
3184 else:
3185 delta["deleted"].append(vld)
3186
3187 return delta
3188
3189 vl_delta = yield from get_nsr_vl_delta(nsr_id, xact, scratch)
3190 self._log.debug("Got NSR:%s VL instance delta: %s", nsr_id, vl_delta)
3191
3192 for vld in vl_delta["added"]:
3193 yield from self._nsm.nsr_instantiate_vl(nsr_id, vld)
3194
3195 for vld in vl_delta["deleted"]:
3196 yield from self._nsm.nsr_terminate_vl(nsr_id, vld)
3197
3198 def get_add_delete_update_cfgs(dts_member_reg, xact, key_name, scratch):
3199 # Unfortunately, it is currently difficult to figure out what has exactly
3200 # changed in this xact without Pbdelta support (RIFT-4916)
3201 # As a workaround, we can fetch the pre and post xact elements and
3202 # perform a comparison to figure out adds/deletes/updates
3203 xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
3204 curr_cfgs = list(dts_member_reg.elements)
3205
3206 xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
3207 curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}
3208
3209 # Find Adds
3210 added_keys = set(xact_key_map) - set(curr_key_map)
3211 added_cfgs = [xact_key_map[key] for key in added_keys]
3212
3213 # Find Deletes
3214 deleted_keys = set(curr_key_map) - set(xact_key_map)
3215 deleted_cfgs = [curr_key_map[key] for key in deleted_keys]
3216
3217 # Find Updates
3218 updated_keys = set(curr_key_map) & set(xact_key_map)
3219 updated_cfgs = [xact_key_map[key] for key in updated_keys
3220 if xact_key_map[key] != curr_key_map[key]]
3221
3222 return added_cfgs, deleted_cfgs, updated_cfgs
3223
3224 def get_nsr_key_pairs(dts_member_reg, xact):
3225 key_pairs = {}
3226 for instance_cfg, keyspec in dts_member_reg.get_xact_elements(xact, include_keyspec=True):
3227 self._log.debug("Key pair received is {} KS: {}".format(instance_cfg, keyspec))
3228 xpath = keyspec.to_xpath(RwNsrYang.get_schema())
3229 key_pairs[instance_cfg.name] = instance_cfg
3230 return key_pairs
3231
3232 def on_apply(dts, acg, xact, action, scratch):
3233 """Apply the configuration"""
3234 self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3235 xact, action, scratch)
3236
3237 def handle_create_nsr(msg, key_pairs=None, restart_mode=False):
3238 # Handle create nsr requests """
3239 # Do some validations
3240 if not msg.has_field("nsd"):
3241 err = "NSD not provided"
3242 self._log.error(err)
3243 raise NetworkServiceRecordError(err)
3244
3245 self._log.debug("Creating NetworkServiceRecord %s from nsr config %s",
3246 msg.id, msg.as_dict())
3247 nsr = self.nsm.create_nsr(msg, key_pairs=key_pairs, restart_mode=restart_mode)
3248 return nsr
3249
3250 def handle_delete_nsr(msg):
3251 @asyncio.coroutine
3252 def delete_instantiation(ns_id):
3253 """ Delete instantiation """
3254 with self._dts.transaction() as xact:
3255 yield from self._nsm.terminate_ns(ns_id, xact)
3256
3257 # Handle delete NSR requests
3258 self._log.info("Delete req for NSR Id: %s received", msg.id)
3259 # Terminate the NSR instance
3260 nsr = self._nsm.get_ns_by_nsr_id(msg.id)
3261
3262 nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD)
3263 event_descr = "Terminate rcvd for NS Id:%s" % msg.id
3264 nsr.record_event("terminate-rcvd", event_descr)
3265
3266 self._loop.create_task(delete_instantiation(msg.id))
3267
3268 @asyncio.coroutine
3269 def begin_instantiation(nsr):
3270 # Begin instantiation
3271 self._log.info("Beginning NS instantiation: %s", nsr.id)
3272 yield from self._nsm.instantiate_ns(nsr.id, xact)
3273
3274 self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3275 xact, action, scratch)
3276
3277 if action == rwdts.AppconfAction.INSTALL and xact.id is None:
3278 key_pairs = []
3279 for element in self._key_pair_regh.elements:
3280 key_pairs.append(element)
3281 for element in self._nsr_regh.elements:
3282 nsr = handle_create_nsr(element, key_pairs, restart_mode=True)
3283 self._loop.create_task(begin_instantiation(nsr))
3284
3285
3286 (added_msgs, deleted_msgs, updated_msgs) = get_add_delete_update_cfgs(self._nsr_regh,
3287 xact,
3288 "id",
3289 scratch)
3290 self._log.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs,
3291 deleted_msgs, updated_msgs)
3292
3293 for msg in added_msgs:
3294 if msg.id not in self._nsm.nsrs:
3295 self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
3296 key_pairs = get_nsr_key_pairs(self._key_pair_regh, xact)
3297 nsr = handle_create_nsr(msg,key_pairs)
3298 self._loop.create_task(begin_instantiation(nsr))
3299
3300 for msg in deleted_msgs:
3301 self._log.info("Delete NSR received in on_apply to terminate NS:%s", msg.id)
3302 try:
3303 handle_delete_nsr(msg)
3304 except Exception:
3305 self._log.exception("Failed to terminate NS:%s", msg.id)
3306
3307 for msg in updated_msgs:
3308 self._log.info("Update NSR received in on_apply: %s", msg)
3309
3310 self._nsm.nsr_update_cfg(msg.id, msg)
3311
3312 if 'nsd' in msg:
3313 self._loop.create_task(update_nsr_nsd(msg.id, xact, scratch))
3314
3315 for group in msg.scaling_group:
3316 instance_delta = get_scale_group_instance_delta(msg.id, group.scaling_group_name_ref, xact)
3317 self._log.debug("Got NSR:%s scale group instance delta: %s", msg.id, instance_delta)
3318
3319 for instance_id in instance_delta["added"]:
3320 self._nsm.scale_nsr_out(msg.id, group.scaling_group_name_ref, instance_id, xact)
3321
3322 for instance_id in instance_delta["deleted"]:
3323 self._nsm.scale_nsr_in(msg.id, group.scaling_group_name_ref, instance_id)
3324
3325
3326 return RwTypes.RwStatus.SUCCESS
3327
3328 @asyncio.coroutine
3329 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
3330 """ Prepare calllback from DTS for NSR """
3331
3332 xpath = ks_path.to_xpath(RwNsrYang.get_schema())
3333 action = xact_info.query_action
3334 self._log.debug(
3335 "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
3336 xact, action, xact_info, xpath, msg
3337 )
3338
3339 @asyncio.coroutine
3340 def delete_instantiation(ns_id):
3341 """ Delete instantiation """
3342 yield from self._nsm.terminate_ns(ns_id, None)
3343
3344 def handle_delete_nsr():
3345 """ Handle delete NSR requests """
3346 self._log.info("Delete req for NSR Id: %s received", msg.id)
3347 # Terminate the NSR instance
3348 nsr = self._nsm.get_ns_by_nsr_id(msg.id)
3349
3350 nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD)
3351 event_descr = "Terminate rcvd for NS Id:%s" % msg.id
3352 nsr.record_event("terminate-rcvd", event_descr)
3353
3354 self._loop.create_task(delete_instantiation(msg.id))
3355
3356 fref = ProtobufC.FieldReference.alloc()
3357 fref.goto_whole_message(msg.to_pbcm())
3358
3359 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE, rwdts.QueryAction.DELETE]:
3360 # if this is an NSR create
3361 if action != rwdts.QueryAction.DELETE and msg.id not in self._nsm.nsrs:
3362 # Ensure the Cloud account/datacenter has been specified
3363 if not msg.has_field("cloud_account") and not msg.has_field("om_datacenter"):
3364 raise NsrInstantiationFailed("Cloud account or datacenter not specified in NSR")
3365
3366 # Check if nsd is specified
3367 if not msg.has_field("nsd"):
3368 raise NsrInstantiationFailed("NSD not specified in NSR")
3369
3370 else:
3371 nsr = self._nsm.nsrs[msg.id]
3372
3373 if msg.has_field("nsd"):
3374 if nsr.state != NetworkServiceRecordState.RUNNING:
3375 raise NsrVlUpdateError("Unable to update VL when NSR not in running state")
3376 if 'vld' not in msg.nsd or len(msg.nsd.vld) == 0:
3377 raise NsrVlUpdateError("NS config NSD should have atleast 1 VLD defined")
3378
3379 if msg.has_field("scaling_group"):
3380 if nsr.state != NetworkServiceRecordState.RUNNING:
3381 raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")
3382
3383 if len(msg.scaling_group) > 1:
3384 raise ScalingOperationError("Only a single scaling group can be configured at a time")
3385
3386 for group_msg in msg.scaling_group:
3387 num_new_group_instances = len(group_msg.instance)
3388 if num_new_group_instances > 1:
3389 raise ScalingOperationError("Only a single scaling instance can be modified at a time")
3390
3391 elif num_new_group_instances == 1:
3392 scale_group = nsr.scaling_groups[group_msg.scaling_group_name_ref]
3393 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
3394 if len(scale_group.instances) == scale_group.max_instance_count:
3395 raise ScalingOperationError("Max instances for %s reached" % scale_group)
3396
3397 acg.handle.prepare_complete_ok(xact_info.handle)
3398
3399
3400 self._log.debug("Registering for NSR config using xpath: %s",
3401 NsrDtsHandler.NSR_XPATH)
3402
3403 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
3404 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
3405 self._nsr_regh = acg.register(xpath=NsrDtsHandler.NSR_XPATH,
3406 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
3407 on_prepare=on_prepare)
3408
3409 self._scale_regh = acg.register(
3410 xpath=NsrDtsHandler.SCALE_INSTANCE_XPATH,
3411 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY| rwdts.Flag.CACHE,
3412 )
3413
3414 self._key_pair_regh = acg.register(
3415 xpath=NsrDtsHandler.KEY_PAIR_XPATH,
3416 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
3417 )
3418
3419
3420 class NsrOpDataDtsHandler(object):
3421 """ The network service op data DTS handler """
3422 XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"
3423
3424 def __init__(self, dts, log, loop, nsm):
3425 self._dts = dts
3426 self._log = log
3427 self._loop = loop
3428 self._nsm = nsm
3429 self._regh = None
3430
3431 @property
3432 def regh(self):
3433 """ Return the registration handle"""
3434 return self._regh
3435
3436 @property
3437 def nsm(self):
3438 """ Return the NS manager instance """
3439 return self._nsm
3440
3441 @asyncio.coroutine
3442 def register(self):
3443 """ Register for Nsr op data publisher registration"""
3444 self._log.debug("Registering Nsr op data path %s as publisher",
3445 NsrOpDataDtsHandler.XPATH)
3446
3447 hdl = rift.tasklets.DTS.RegistrationHandler()
3448 handlers = rift.tasklets.Group.Handler()
3449 with self._dts.group_create(handler=handlers) as group:
3450 self._regh = group.register(xpath=NsrOpDataDtsHandler.XPATH,
3451 handler=hdl,
3452 flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ | rwdts.Flag.DATASTORE)
3453
3454 @asyncio.coroutine
3455 def create(self, path, msg):
3456 """
3457 Create an NS record in DTS with the path and message
3458 """
3459 self._log.debug("Creating NSR %s:%s", path, msg)
3460 self.regh.create_element(path, msg)
3461 self._log.debug("Created NSR, %s:%s", path, msg)
3462
3463 @asyncio.coroutine
3464 def update(self, path, msg, flags=rwdts.XactFlag.REPLACE):
3465 """
3466 Update an NS record in DTS with the path and message
3467 """
3468 self._log.debug("Updating NSR, %s:%s regh = %s", path, msg, self.regh)
3469 self.regh.update_element(path, msg, flags)
3470 self._log.debug("Updated NSR, %s:%s", path, msg)
3471
3472 @asyncio.coroutine
3473 def delete(self, path):
3474 """
3475 Update an NS record in DTS with the path and message
3476 """
3477 self._log.debug("Deleting NSR path:%s", path)
3478 self.regh.delete_element(path)
3479 self._log.debug("Deleted NSR path:%s", path)
3480
3481
3482 class VnfrDtsHandler(object):
3483 """ The virtual network service DTS handler """
3484 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
3485
3486 def __init__(self, dts, log, loop, nsm):
3487 self._dts = dts
3488 self._log = log
3489 self._loop = loop
3490 self._nsm = nsm
3491
3492 self._regh = None
3493
3494 @property
3495 def regh(self):
3496 """ Return registration handle """
3497 return self._regh
3498
3499 @property
3500 def nsm(self):
3501 """ Return the NS manager instance """
3502 return self._nsm
3503
3504 @asyncio.coroutine
3505 def register(self):
3506 """ Register for vnfr create/update/delete/ advises from dts """
3507
3508 def on_commit(xact_info):
3509 """ The transaction has been committed """
3510 self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
3511 return rwdts.MemberRspCode.ACTION_OK
3512
3513 @asyncio.coroutine
3514 def on_prepare(xact_info, action, ks_path, msg):
3515 """ prepare callback from dts """
3516 xpath = ks_path.to_xpath(RwNsrYang.get_schema())
3517 self._log.debug(
3518 "Got vnfr on_prepare cb (xact_info: %s, action: %s): %s:%s",
3519 xact_info, action, ks_path, msg
3520 )
3521
3522 schema = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
3523 path_entry = schema.keyspec_to_entry(ks_path)
3524 if path_entry.key00.id not in self._nsm._vnfrs:
3525 self._log.error("%s request for non existent record path %s",
3526 action, xpath)
3527 xact_info.respond_xpath(rwdts.XactRspCode.NA, xpath)
3528
3529 return
3530
3531 self._log.debug("Deleting VNFR with id %s", path_entry.key00.id)
3532 if action == rwdts.QueryAction.CREATE or action == rwdts.QueryAction.UPDATE:
3533 yield from self._nsm.update_vnfr(msg)
3534 elif action == rwdts.QueryAction.DELETE:
3535 self._log.debug("Deleting VNFR with id %s", path_entry.key00.id)
3536 self._nsm.delete_vnfr(path_entry.key00.id)
3537
3538 xact_info.respond_xpath(rwdts.XactRspCode.ACK, xpath)
3539
3540 self._log.debug("Registering for VNFR using xpath: %s",
3541 VnfrDtsHandler.XPATH,)
3542
3543 hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
3544 on_prepare=on_prepare,)
3545 with self._dts.group_create() as group:
3546 self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
3547 handler=hdl,
3548 flags=(rwdts.Flag.SUBSCRIBER),)
3549
3550
3551 class NsdRefCountDtsHandler(object):
3552 """ The NSD Ref Count DTS handler """
3553 XPATH = "D,/nsr:ns-instance-opdata/rw-nsr:nsd-ref-count"
3554
3555 def __init__(self, dts, log, loop, nsm):
3556 self._dts = dts
3557 self._log = log
3558 self._loop = loop
3559 self._nsm = nsm
3560
3561 self._regh = None
3562
3563 @property
3564 def regh(self):
3565 """ Return registration handle """
3566 return self._regh
3567
3568 @property
3569 def nsm(self):
3570 """ Return the NS manager instance """
3571 return self._nsm
3572
3573 @asyncio.coroutine
3574 def register(self):
3575 """ Register for NSD ref count read from dts """
3576
3577 @asyncio.coroutine
3578 def on_prepare(xact_info, action, ks_path, msg):
3579 """ prepare callback from dts """
3580 xpath = ks_path.to_xpath(RwNsrYang.get_schema())
3581
3582 if action == rwdts.QueryAction.READ:
3583 schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount.schema()
3584 path_entry = schema.keyspec_to_entry(ks_path)
3585 nsd_list = yield from self._nsm.get_nsd_refcount(path_entry.key00.nsd_id_ref)
3586 for xpath, msg in nsd_list:
3587 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
3588 xpath=xpath,
3589 msg=msg)
3590 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
3591 else:
3592 raise NetworkServiceRecordError("Not supported operation %s" % action)
3593
3594 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
3595 with self._dts.group_create() as group:
3596 self._regh = group.register(xpath=NsdRefCountDtsHandler.XPATH,
3597 handler=hdl,
3598 flags=rwdts.Flag.PUBLISHER,)
3599
3600
3601 class NsManager(object):
3602 """ The Network Service Manager class"""
3603 def __init__(self, dts, log, loop,
3604 nsr_handler, vnfr_handler, vlr_handler, ro_plugin_selector,
3605 vnffgmgr, vnfd_pub_handler, cloud_account_handler):
3606 self._dts = dts
3607 self._log = log
3608 self._loop = loop
3609 self._nsr_handler = nsr_handler
3610 self._vnfr_pub_handler = vnfr_handler
3611 self._vlr_pub_handler = vlr_handler
3612 self._vnffgmgr = vnffgmgr
3613 self._vnfd_pub_handler = vnfd_pub_handler
3614 self._cloud_account_handler = cloud_account_handler
3615
3616 self._ro_plugin_selector = ro_plugin_selector
3617 self._ncclient = rift.mano.ncclient.NcClient(
3618 host="127.0.0.1",
3619 port=2022,
3620 username="admin",
3621 password="admin",
3622 loop=self._loop)
3623
3624 self._nsrs = {}
3625 self._nsds = {}
3626 self._vnfds = {}
3627 self._vnfrs = {}
3628
3629 self.cfgmgr_obj = conman.ROConfigManager(log, loop, dts, self)
3630
3631 # TODO: All these handlers should move to tasklet level.
3632 # Passing self is often an indication of bad design
3633 self._nsd_dts_handler = NsdDtsHandler(dts, log, loop, self)
3634 self._vnfd_dts_handler = VnfdDtsHandler(dts, log, loop, self)
3635 self._dts_handlers = [self._nsd_dts_handler,
3636 VnfrDtsHandler(dts, log, loop, self),
3637 NsdRefCountDtsHandler(dts, log, loop, self),
3638 NsrDtsHandler(dts, log, loop, self),
3639 ScalingRpcHandler(log, dts, loop, self.scale_rpc_callback),
3640 NsrRpcDtsHandler(dts,log,loop,self),
3641 self._vnfd_dts_handler,
3642 self.cfgmgr_obj,
3643 ]
3644
3645
3646 @property
3647 def log(self):
3648 """ Log handle """
3649 return self._log
3650
3651 @property
3652 def loop(self):
3653 """ Loop """
3654 return self._loop
3655
3656 @property
3657 def dts(self):
3658 """ DTS handle """
3659 return self._dts
3660
3661 @property
3662 def nsr_handler(self):
3663 """" NSR handler """
3664 return self._nsr_handler
3665
3666 @property
3667 def so_obj(self):
3668 """" So Obj handler """
3669 return self._so_obj
3670
3671 @property
3672 def nsrs(self):
3673 """ NSRs in this NSM"""
3674 return self._nsrs
3675
3676 @property
3677 def nsds(self):
3678 """ NSDs in this NSM"""
3679 return self._nsds
3680
3681 @property
3682 def vnfds(self):
3683 """ VNFDs in this NSM"""
3684 return self._vnfds
3685
3686 @property
3687 def vnfrs(self):
3688 """ VNFRs in this NSM"""
3689 return self._vnfrs
3690
3691 @property
3692 def nsr_pub_handler(self):
3693 """ NSR publication handler """
3694 return self._nsr_handler
3695
3696 @property
3697 def vnfr_pub_handler(self):
3698 """ VNFR publication handler """
3699 return self._vnfr_pub_handler
3700
3701 @property
3702 def vlr_pub_handler(self):
3703 """ VLR publication handler """
3704 return self._vlr_pub_handler
3705
3706 @property
3707 def vnfd_pub_handler(self):
3708 return self._vnfd_pub_handler
3709
3710 @asyncio.coroutine
3711 def register(self):
3712 """ Register all static DTS handlers """
3713 for dts_handle in self._dts_handlers:
3714 yield from dts_handle.register()
3715
3716
3717 def get_ns_by_nsr_id(self, nsr_id):
3718 """ get NSR by nsr id """
3719 if nsr_id not in self._nsrs:
3720 raise NetworkServiceRecordError("NSR id %s not found" % nsr_id)
3721
3722 return self._nsrs[nsr_id]
3723
3724 def scale_nsr_out(self, nsr_id, scale_group_name, instance_id, config_xact):
3725 self.log.debug("Scale out NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
3726 nsr_id,
3727 scale_group_name,
3728 instance_id
3729 )
3730 nsr = self._nsrs[nsr_id]
3731 if nsr.state != NetworkServiceRecordState.RUNNING:
3732 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
3733
3734 self._loop.create_task(nsr.create_scale_group_instance(scale_group_name, instance_id, config_xact))
3735
3736 def scale_nsr_in(self, nsr_id, scale_group_name, instance_id):
3737 self.log.debug("Scale in NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
3738 nsr_id,
3739 scale_group_name,
3740 instance_id,
3741 )
3742 nsr = self._nsrs[nsr_id]
3743 if nsr.state != NetworkServiceRecordState.RUNNING:
3744 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
3745
3746 self._loop.create_task(nsr.delete_scale_group_instance(scale_group_name, instance_id))
3747
3748 def scale_rpc_callback(self, xact, msg, action):
3749 """Callback handler for RPC calls
3750 Args:
3751 xact : Transaction Handler
3752 msg : RPC input
3753 action : Scaling Action
3754 """
3755 ScalingGroupInstance = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance
3756 ScalingGroup = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup
3757
3758 xpath = ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]').format(
3759 msg.nsr_id_ref)
3760 instance = ScalingGroupInstance.from_dict({"id": msg.instance_id})
3761
3762 @asyncio.coroutine
3763 def get_nsr_scaling_group():
3764 results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
3765
3766 for result in results:
3767 res = yield from result
3768 nsr_config = res.result
3769
3770 for scaling_group in nsr_config.scaling_group:
3771 if scaling_group.scaling_group_name_ref == msg.scaling_group_name_ref:
3772 break
3773 else:
3774 scaling_group = nsr_config.scaling_group.add()
3775 scaling_group.scaling_group_name_ref = msg.scaling_group_name_ref
3776
3777 return (nsr_config, scaling_group)
3778
3779 @asyncio.coroutine
3780 def update_config(nsr_config):
3781 xml = self._ncclient.convert_to_xml(RwNsrYang, nsr_config)
3782 xml = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml)
3783 yield from self._ncclient.connect()
3784 yield from self._ncclient.manager.edit_config(target="running", config=xml, default_operation="replace")
3785
3786 @asyncio.coroutine
3787 def scale_out():
3788 nsr_config, scaling_group = yield from get_nsr_scaling_group()
3789 scaling_group.instance.append(instance)
3790 yield from update_config(nsr_config)
3791
3792 @asyncio.coroutine
3793 def scale_in():
3794 nsr_config, scaling_group = yield from get_nsr_scaling_group()
3795 scaling_group.instance.remove(instance)
3796 yield from update_config(nsr_config)
3797
3798 if action == ScalingRpcHandler.ACTION.SCALE_OUT:
3799 self._loop.create_task(scale_out())
3800 else:
3801 self._loop.create_task(scale_in())
3802
3803 # Opdata based calls, disabled for now!
3804 # if action == ScalingRpcHandler.ACTION.SCALE_OUT:
3805 # self.scale_nsr_out(
3806 # msg.nsr_id_ref,
3807 # msg.scaling_group_name_ref,
3808 # msg.instance_id,
3809 # xact)
3810 # else:
3811 # self.scale_nsr_in(
3812 # msg.nsr_id_ref,
3813 # msg.scaling_group_name_ref,
3814 # msg.instance_id)
3815
3816 def nsr_update_cfg(self, nsr_id, msg):
3817 nsr = self._nsrs[nsr_id]
3818 nsr.nsr_cfg_msg= msg
3819
3820 def nsr_instantiate_vl(self, nsr_id, vld):
3821 self.log.debug("NSR {} create VL {}".format(nsr_id, vld))
3822 nsr = self._nsrs[nsr_id]
3823 if nsr.state != NetworkServiceRecordState.RUNNING:
3824 raise NsrVlUpdateError("Cannot perform VL instantiate if NSR is not in running state")
3825
3826 # Not calling in a separate task as this is called from a separate task
3827 yield from nsr.create_vl_instance(vld)
3828
3829 def nsr_terminate_vl(self, nsr_id, vld):
3830 self.log.debug("NSR {} delete VL {}".format(nsr_id, vld.id))
3831 nsr = self._nsrs[nsr_id]
3832 if nsr.state != NetworkServiceRecordState.RUNNING:
3833 raise NsrVlUpdateError("Cannot perform VL terminate if NSR is not in running state")
3834
3835 # Not calling in a separate task as this is called from a separate task
3836 yield from nsr.delete_vl_instance(vld)
3837
3838 def create_nsr(self, nsr_msg, key_pairs=None,restart_mode=False):
3839 """ Create an NSR instance """
3840 if nsr_msg.id in self._nsrs:
3841 msg = "NSR id %s already exists" % nsr_msg.id
3842 self._log.error(msg)
3843 raise NetworkServiceRecordError(msg)
3844
3845 self._log.info("Create NetworkServiceRecord nsr id %s from nsd_id %s",
3846 nsr_msg.id,
3847 nsr_msg.nsd.id)
3848
3849 nsm_plugin = self._ro_plugin_selector.ro_plugin
3850 sdn_account_name = self._cloud_account_handler.get_cloud_account_sdn_name(nsr_msg.cloud_account)
3851
3852 nsr = NetworkServiceRecord(self._dts,
3853 self._log,
3854 self._loop,
3855 self,
3856 nsm_plugin,
3857 nsr_msg,
3858 sdn_account_name,
3859 key_pairs,
3860 restart_mode=restart_mode,
3861 vlr_handler=self._ro_plugin_selector._records_publisher._vlr_pub_hdlr
3862 )
3863 self._nsrs[nsr_msg.id] = nsr
3864 nsm_plugin.create_nsr(nsr_msg, nsr_msg.nsd, key_pairs)
3865
3866 return nsr
3867
3868 def delete_nsr(self, nsr_id):
3869 """
3870 Delete NSR with the passed nsr id
3871 """
3872 del self._nsrs[nsr_id]
3873
3874 @asyncio.coroutine
3875 def instantiate_ns(self, nsr_id, config_xact):
3876 """ Instantiate an NS instance """
3877 self._log.debug("Instantiating Network service id %s", nsr_id)
3878 if nsr_id not in self._nsrs:
3879 err = "NSR id %s not found " % nsr_id
3880 self._log.error(err)
3881 raise NetworkServiceRecordError(err)
3882
3883 nsr = self._nsrs[nsr_id]
3884 yield from nsr.nsm_plugin.instantiate_ns(nsr, config_xact)
3885
3886 @asyncio.coroutine
3887 def update_vnfr(self, vnfr):
3888 """Create/Update an VNFR """
3889
3890 vnfr_state = self._vnfrs[vnfr.id].state
3891 self._log.debug("Updating VNFR with state %s: vnfr %s", vnfr_state, vnfr)
3892
3893 yield from self._vnfrs[vnfr.id].update_state(vnfr)
3894 nsr = self.find_nsr_for_vnfr(vnfr.id)
3895 yield from nsr.update_state()
3896
3897 def find_nsr_for_vnfr(self, vnfr_id):
3898 """ Find the NSR which )has the passed vnfr id"""
3899 for nsr in list(self.nsrs.values()):
3900 for vnfr in list(nsr.vnfrs.values()):
3901 if vnfr.id == vnfr_id:
3902 return nsr
3903 return None
3904
3905 def delete_vnfr(self, vnfr_id):
3906 """ Delete VNFR with the passed id"""
3907 del self._vnfrs[vnfr_id]
3908
3909 def get_nsd_ref(self, nsd_id):
3910 """ Get network service descriptor for the passed nsd_id
3911 with a reference"""
3912 nsd = self.get_nsd(nsd_id)
3913 nsd.ref()
3914 return nsd
3915
3916 @asyncio.coroutine
3917 def get_nsr_config(self, nsd_id):
3918 xpath = "C,/nsr:ns-instance-config"
3919 results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
3920
3921 for result in results:
3922 entry = yield from result
3923 ns_instance_config = entry.result
3924
3925 for nsr in ns_instance_config.nsr:
3926 if nsr.nsd.id == nsd_id:
3927 return nsr
3928
3929 return None
3930
3931 @asyncio.coroutine
3932 def nsd_unref_by_nsr_id(self, nsr_id):
3933 """ Unref the network service descriptor based on NSR id """
3934 self._log.debug("NSR Unref called for Nsr Id:%s", nsr_id)
3935 if nsr_id in self._nsrs:
3936 nsr = self._nsrs[nsr_id]
3937
3938 try:
3939 nsd = self.get_nsd(nsr.nsd_id)
3940 self._log.debug("Releasing ref on NSD %s held by NSR %s - Curr %d",
3941 nsd.id, nsr.id, nsd.ref_count)
3942 nsd.unref()
3943 except NetworkServiceDescriptorError:
3944 # We store a copy of NSD in NSR and the NSD in nsd-catalog
3945 # could be deleted
3946 pass
3947
3948 else:
3949 self._log.error("Cannot find NSR with id %s", nsr_id)
3950 raise NetworkServiceDescriptorUnrefError("No NSR with id" % nsr_id)
3951
3952 @asyncio.coroutine
3953 def nsd_unref(self, nsd_id):
3954 """ Unref the network service descriptor associated with the id """
3955 nsd = self.get_nsd(nsd_id)
3956 nsd.unref()
3957
3958 def get_nsd(self, nsd_id):
3959 """ Get network service descriptor for the passed nsd_id"""
3960 if nsd_id not in self._nsds:
3961 self._log.error("Cannot find NSD id:%s", nsd_id)
3962 raise NetworkServiceDescriptorError("Cannot find NSD id:%s", nsd_id)
3963
3964 return self._nsds[nsd_id]
3965
3966 def create_nsd(self, nsd_msg):
3967 """ Create a network service descriptor """
3968 self._log.debug("Create network service descriptor - %s", nsd_msg)
3969 if nsd_msg.id in self._nsds:
3970 self._log.error("Cannot create NSD %s -NSD ID already exists", nsd_msg)
3971 raise NetworkServiceDescriptorError("NSD already exists-%s", nsd_msg.id)
3972
3973 nsd = NetworkServiceDescriptor(
3974 self._dts,
3975 self._log,
3976 self._loop,
3977 nsd_msg,
3978 self
3979 )
3980 self._nsds[nsd_msg.id] = nsd
3981
3982 return nsd
3983
3984 def update_nsd(self, nsd):
3985 """ update the Network service descriptor """
3986 self._log.debug("Update network service descriptor - %s", nsd)
3987 if nsd.id not in self._nsds:
3988 self._log.debug("No NSD found - creating NSD id = %s", nsd.id)
3989 self.create_nsd(nsd)
3990 else:
3991 self._log.debug("Updating NSD id = %s, nsd = %s", nsd.id, nsd)
3992 self._nsds[nsd.id].update(nsd)
3993
3994 def delete_nsd(self, nsd_id):
3995 """ Delete the Network service descriptor with the passed id """
3996 self._log.debug("Deleting the network service descriptor - %s", nsd_id)
3997 if nsd_id not in self._nsds:
3998 self._log.debug("Delete NSD failed - cannot find nsd-id %s", nsd_id)
3999 raise NetworkServiceDescriptorNotFound("Cannot find %s", nsd_id)
4000
4001 if nsd_id not in self._nsds:
4002 self._log.debug("Cannot delete NSD id %s reference exists %s",
4003 nsd_id,
4004 self._nsds[nsd_id].ref_count)
4005 raise NetworkServiceDescriptorRefCountExists(
4006 "Cannot delete :%s, ref_count:%s",
4007 nsd_id,
4008 self._nsds[nsd_id].ref_count)
4009
4010 del self._nsds[nsd_id]
4011
4012 def get_vnfd_config(self, xact):
4013 vnfd_dts_reg = self._vnfd_dts_handler.regh
4014 for cfg in vnfd_dts_reg.get_xact_elements(xact):
4015 if cfg.id not in self._vnfds:
4016 self.create_vnfd(cfg)
4017
4018 def get_vnfd(self, vnfd_id, xact):
4019 """ Get virtual network function descriptor for the passed vnfd_id"""
4020 if vnfd_id not in self._vnfds:
4021 self._log.error("Cannot find VNFD id:%s", vnfd_id)
4022 self.get_vnfd_config(xact)
4023
4024 if vnfd_id not in self._vnfds:
4025 self._log.error("Cannot find VNFD id:%s", vnfd_id)
4026 raise VnfDescriptorError("Cannot find VNFD id:%s", vnfd_id)
4027
4028 return self._vnfds[vnfd_id]
4029
4030 def create_vnfd(self, vnfd):
4031 """ Create a virtual network function descriptor """
4032 self._log.debug("Create virtual network function descriptor - %s", vnfd)
4033 if vnfd.id in self._vnfds:
4034 self._log.error("Cannot create VNFD %s -VNFD ID already exists", vnfd)
4035 raise VnfDescriptorError("VNFD already exists-%s", vnfd.id)
4036
4037 self._vnfds[vnfd.id] = vnfd
4038 return self._vnfds[vnfd.id]
4039
4040 def update_vnfd(self, vnfd):
4041 """ Update the virtual network function descriptor """
4042 self._log.debug("Update virtual network function descriptor- %s", vnfd)
4043
4044
4045 if vnfd.id not in self._vnfds:
4046 self._log.debug("No VNFD found - creating VNFD id = %s", vnfd.id)
4047 self.create_vnfd(vnfd)
4048 else:
4049 self._log.debug("Updating VNFD id = %s, vnfd = %s", vnfd.id, vnfd)
4050 self._vnfds[vnfd.id] = vnfd
4051
4052 @asyncio.coroutine
4053 def delete_vnfd(self, vnfd_id):
4054 """ Delete the virtual network function descriptor with the passed id """
4055 self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
4056 if vnfd_id not in self._vnfds:
4057 self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
4058 raise VnfDescriptorError("Cannot find %s", vnfd_id)
4059
4060 del self._vnfds[vnfd_id]
4061
4062 def nsd_in_use(self, nsd_id):
4063 """ Is the NSD with the passed id in use """
4064 self._log.debug("Is this NSD in use - msg:%s", nsd_id)
4065 if nsd_id in self._nsds:
4066 return self._nsds[nsd_id].in_use()
4067 return False
4068
4069 @asyncio.coroutine
4070 def publish_nsr(self, xact, path, msg):
4071 """ Publish a NSR """
4072 self._log.debug("Publish NSR with path %s, msg %s",
4073 path, msg)
4074 yield from self.nsr_handler.update(xact, path, msg)
4075
4076 @asyncio.coroutine
4077 def unpublish_nsr(self, xact, path):
4078 """ Un Publish an NSR """
4079 self._log.debug("Publishing delete NSR with path %s", path)
4080 yield from self.nsr_handler.delete(path, xact)
4081
4082 def vnfr_is_ready(self, vnfr_id):
4083 """ VNFR with the id is ready """
4084 self._log.debug("VNFR id %s ready", vnfr_id)
4085 if vnfr_id not in self._vnfds:
4086 err = "Did not find VNFR ID with id %s" % vnfr_id
4087 self._log.critical("err")
4088 raise VirtualNetworkFunctionRecordError(err)
4089 self._vnfrs[vnfr_id].is_ready()
4090
4091 @asyncio.coroutine
4092 def get_nsd_refcount(self, nsd_id):
4093 """ Get the nsd_list from this NSM"""
4094
4095 def nsd_refcount_xpath(nsd_id):
4096 """ xpath for ref count entry """
4097 return (NsdRefCountDtsHandler.XPATH +
4098 "[rw-nsr:nsd-id-ref = '{}']").format(nsd_id)
4099
4100 nsd_list = []
4101 if nsd_id is None or nsd_id == "":
4102 for nsd in self._nsds.values():
4103 nsd_msg = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount()
4104 nsd_msg.nsd_id_ref = nsd.id
4105 nsd_msg.instance_ref_count = nsd.ref_count
4106 nsd_list.append((nsd_refcount_xpath(nsd.id), nsd_msg))
4107 elif nsd_id in self._nsds:
4108 nsd_msg = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount()
4109 nsd_msg.nsd_id_ref = self._nsds[nsd_id].id
4110 nsd_msg.instance_ref_count = self._nsds[nsd_id].ref_count
4111 nsd_list.append((nsd_refcount_xpath(nsd_id), nsd_msg))
4112
4113 return nsd_list
4114
4115 @asyncio.coroutine
4116 def terminate_ns(self, nsr_id, xact):
4117 """
4118 Terminate network service for the given NSR Id
4119 """
4120
4121 # Terminate the instances/networks assocaited with this nw service
4122 self._log.debug("Terminating the network service %s", nsr_id)
4123 try :
4124 yield from self._nsrs[nsr_id].terminate()
4125 except Exception as e:
4126 self.log.exception("Failed to terminate NSR[id=%s]", nsr_id)
4127
4128 # Unref the NSD
4129 yield from self.nsd_unref_by_nsr_id(nsr_id)
4130
4131 # Unpublish the NSR record
4132 self._log.debug("Unpublishing the network service %s", nsr_id)
4133 yield from self._nsrs[nsr_id].unpublish(xact)
4134
4135 # Finaly delete the NS instance from this NS Manager
4136 self._log.debug("Deletng the network service %s", nsr_id)
4137 self.delete_nsr(nsr_id)
4138
4139
4140 class NsmRecordsPublisherProxy(object):
4141 """ This class provides a publisher interface that allows plugin objects
4142 to publish NSR/VNFR/VLR"""
4143
4144 def __init__(self, dts, log, loop, nsr_pub_hdlr, vnfr_pub_hdlr, vlr_pub_hdlr):
4145 self._dts = dts
4146 self._log = log
4147 self._loop = loop
4148 self._nsr_pub_hdlr = nsr_pub_hdlr
4149 self._vlr_pub_hdlr = vlr_pub_hdlr
4150 self._vnfr_pub_hdlr = vnfr_pub_hdlr
4151
4152 @asyncio.coroutine
4153 def publish_nsr(self, xact, nsr):
4154 """ Publish an NSR """
4155 path = NetworkServiceRecord.xpath_from_nsr(nsr)
4156 return (yield from self._nsr_pub_hdlr.update(xact, path, nsr))
4157
4158 @asyncio.coroutine
4159 def unpublish_nsr(self, xact, nsr):
4160 """ Unpublish an NSR """
4161 path = NetworkServiceRecord.xpath_from_nsr(nsr)
4162 return (yield from self._nsr_pub_hdlr.delete(xact, path))
4163
4164 @asyncio.coroutine
4165 def publish_vnfr(self, xact, vnfr):
4166 """ Publish an VNFR """
4167 path = VirtualNetworkFunctionRecord.vnfr_xpath(vnfr)
4168 return (yield from self._vnfr_pub_hdlr.update(xact, path, vnfr))
4169
4170 @asyncio.coroutine
4171 def unpublish_vnfr(self, xact, vnfr):
4172 """ Unpublish a VNFR """
4173 path = VirtualNetworkFunctionRecord.vnfr_xpath(vnfr)
4174 return (yield from self._vnfr_pub_hdlr.delete(xact, path))
4175
4176 @asyncio.coroutine
4177 def publish_vlr(self, xact, vlr):
4178 """ Publish a VLR """
4179 path = VirtualLinkRecord.vlr_xpath(vlr)
4180 return (yield from self._vlr_pub_hdlr.update(xact, path, vlr))
4181
4182 @asyncio.coroutine
4183 def unpublish_vlr(self, xact, vlr):
4184 """ Unpublish a VLR """
4185 path = VirtualLinkRecord.vlr_xpath(vlr)
4186 return (yield from self._vlr_pub_hdlr.delete(xact, path))
4187
4188
4189 class ScalingRpcHandler(mano_dts.DtsHandler):
4190 """ The Network service Monitor DTS handler """
4191 SCALE_IN_INPUT_XPATH = "I,/nsr:exec-scale-in"
4192 SCALE_IN_OUTPUT_XPATH = "O,/nsr:exec-scale-in"
4193
4194 SCALE_OUT_INPUT_XPATH = "I,/nsr:exec-scale-out"
4195 SCALE_OUT_OUTPUT_XPATH = "O,/nsr:exec-scale-out"
4196
4197 ACTION = Enum('ACTION', 'SCALE_IN SCALE_OUT')
4198
4199 def __init__(self, log, dts, loop, callback=None):
4200 super().__init__(log, dts, loop)
4201 self.callback = callback
4202 self.last_instance_id = defaultdict(int)
4203
4204 @asyncio.coroutine
4205 def register(self):
4206
4207 @asyncio.coroutine
4208 def on_scale_in_prepare(xact_info, action, ks_path, msg):
4209 assert action == rwdts.QueryAction.RPC
4210
4211 try:
4212 if self.callback:
4213 self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN)
4214
4215 rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({
4216 "instance_id": msg.instance_id})
4217
4218 xact_info.respond_xpath(
4219 rwdts.XactRspCode.ACK,
4220 self.__class__.SCALE_IN_OUTPUT_XPATH,
4221 rpc_op)
4222
4223 except Exception as e:
4224 self.log.exception(e)
4225 xact_info.respond_xpath(
4226 rwdts.XactRspCode.NACK,
4227 self.__class__.SCALE_IN_OUTPUT_XPATH)
4228
4229 @asyncio.coroutine
4230 def on_scale_out_prepare(xact_info, action, ks_path, msg):
4231 assert action == rwdts.QueryAction.RPC
4232
4233 try:
4234 scaling_group = msg.scaling_group_name_ref
4235 if not msg.instance_id:
4236 last_instance_id = self.last_instance_id[scale_group]
4237 msg.instance_id = last_instance_id + 1
4238 self.last_instance_id[scale_group] += 1
4239
4240 if self.callback:
4241 self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT)
4242
4243 rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({
4244 "instance_id": msg.instance_id})
4245
4246 xact_info.respond_xpath(
4247 rwdts.XactRspCode.ACK,
4248 self.__class__.SCALE_OUT_OUTPUT_XPATH,
4249 rpc_op)
4250
4251 except Exception as e:
4252 self.log.exception(e)
4253 xact_info.respond_xpath(
4254 rwdts.XactRspCode.NACK,
4255 self.__class__.SCALE_OUT_OUTPUT_XPATH)
4256
4257 scale_in_hdl = rift.tasklets.DTS.RegistrationHandler(
4258 on_prepare=on_scale_in_prepare)
4259 scale_out_hdl = rift.tasklets.DTS.RegistrationHandler(
4260 on_prepare=on_scale_out_prepare)
4261
4262 with self.dts.group_create() as group:
4263 group.register(
4264 xpath=self.__class__.SCALE_IN_INPUT_XPATH,
4265 handler=scale_in_hdl,
4266 flags=rwdts.Flag.PUBLISHER)
4267 group.register(
4268 xpath=self.__class__.SCALE_OUT_INPUT_XPATH,
4269 handler=scale_out_hdl,
4270 flags=rwdts.Flag.PUBLISHER)
4271
4272
4273 class NsmTasklet(rift.tasklets.Tasklet):
4274 """
4275 The network service manager tasklet
4276 """
4277 def __init__(self, *args, **kwargs):
4278 super(NsmTasklet, self).__init__(*args, **kwargs)
4279 self.rwlog.set_category("rw-mano-log")
4280 self.rwlog.set_subcategory("nsm")
4281
4282 self._dts = None
4283 self._nsm = None
4284
4285 self._ro_plugin_selector = None
4286 self._vnffgmgr = None
4287
4288 self._nsr_handler = None
4289 self._vnfr_pub_handler = None
4290 self._vlr_pub_handler = None
4291 self._vnfd_pub_handler = None
4292 self._scale_cfg_handler = None
4293
4294 self._records_publisher_proxy = None
4295
4296 def start(self):
4297 """ The task start callback """
4298 super(NsmTasklet, self).start()
4299 self.log.info("Starting NsmTasklet")
4300
4301 self.log.debug("Registering with dts")
4302 self._dts = rift.tasklets.DTS(self.tasklet_info,
4303 RwNsmYang.get_schema(),
4304 self.loop,
4305 self.on_dts_state_change)
4306
4307 self.log.debug("Created DTS Api GI Object: %s", self._dts)
4308
4309 def stop(self):
4310 try:
4311 self._dts.deinit()
4312 except Exception:
4313 print("Caught Exception in NSM stop:", sys.exc_info()[0])
4314 raise
4315
4316 def on_instance_started(self):
4317 """ Task instance started callback """
4318 self.log.debug("Got instance started callback")
4319
4320 @asyncio.coroutine
4321 def init(self):
4322 """ Task init callback """
4323 self.log.debug("Got instance started callback")
4324
4325 self.log.debug("creating config account handler")
4326
4327 self._nsr_pub_handler = publisher.NsrOpDataDtsHandler(self._dts, self.log, self.loop)
4328 yield from self._nsr_pub_handler.register()
4329
4330 self._vnfr_pub_handler = publisher.VnfrPublisherDtsHandler(self._dts, self.log, self.loop)
4331 yield from self._vnfr_pub_handler.register()
4332
4333 self._vlr_pub_handler = publisher.VlrPublisherDtsHandler(self._dts, self.log, self.loop)
4334 yield from self._vlr_pub_handler.register()
4335
4336 manifest = self.tasklet_info.get_pb_manifest()
4337 use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
4338 ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
4339 ssl_key = manifest.bootstrap_phase.rwsecurity.key
4340
4341 self._vnfd_pub_handler = publisher.VnfdPublisher(use_ssl, ssl_cert, ssl_key, self.loop)
4342
4343 self._records_publisher_proxy = NsmRecordsPublisherProxy(
4344 self._dts,
4345 self.log,
4346 self.loop,
4347 self._nsr_pub_handler,
4348 self._vnfr_pub_handler,
4349 self._vlr_pub_handler,
4350 )
4351
4352 # Register the NSM to receive the nsm plugin
4353 # when cloud account is configured
4354 self._ro_plugin_selector = cloud.ROAccountPluginSelector(
4355 self._dts,
4356 self.log,
4357 self.loop,
4358 self._records_publisher_proxy,
4359 )
4360 yield from self._ro_plugin_selector.register()
4361
4362 self._cloud_account_handler = cloud.CloudAccountConfigSubscriber(
4363 self._log,
4364 self._dts,
4365 self.log_hdl)
4366
4367 yield from self._cloud_account_handler.register()
4368
4369 self._vnffgmgr = rwvnffgmgr.VnffgMgr(self._dts, self.log, self.log_hdl, self.loop)
4370 yield from self._vnffgmgr.register()
4371
4372 self._nsm = NsManager(
4373 self._dts,
4374 self.log,
4375 self.loop,
4376 self._nsr_pub_handler,
4377 self._vnfr_pub_handler,
4378 self._vlr_pub_handler,
4379 self._ro_plugin_selector,
4380 self._vnffgmgr,
4381 self._vnfd_pub_handler,
4382 self._cloud_account_handler
4383 )
4384
4385 yield from self._nsm.register()
4386
4387 @asyncio.coroutine
4388 def run(self):
4389 """ Task run callback """
4390 pass
4391
4392 @asyncio.coroutine
4393 def on_dts_state_change(self, state):
4394 """Take action according to current dts state to transition
4395 application into the corresponding application state
4396
4397 Arguments
4398 state - current dts state
4399 """
4400 switch = {
4401 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
4402 rwdts.State.CONFIG: rwdts.State.RUN,
4403 }
4404
4405 handlers = {
4406 rwdts.State.INIT: self.init,
4407 rwdts.State.RUN: self.run,
4408 }
4409
4410 # Transition application to next state
4411 handler = handlers.get(state, None)
4412 if handler is not None:
4413 yield from handler()
4414
4415 # Transition dts to next state
4416 next_state = switch.get(state, None)
4417 if next_state is not None:
4418 self.log.debug("Changing state to %s", next_state)
4419 self._dts.handle.set_state(next_state)