Revert "Full Juju Charm support"
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / rwnsmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
18 import asyncio
19 import gi
20 import json
21 import ncclient
22 import ncclient.asyncio_manager
23 import os
24 import requests
25 import shutil
26 import sys
27 import tempfile
28 import time
29 import uuid
30 import yaml
31
32 from collections import defaultdict
33 from collections import deque
34 from enum import Enum
35 from urllib.parse import urlparse
36
37 # disable unsigned certificate warning
38 from requests.packages.urllib3.exceptions import InsecureRequestWarning
39 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
40
41 gi.require_version('RwYang', '1.0')
42 gi.require_version('NsdBaseYang', '1.0')
43 gi.require_version('ProjectNsdYang', '1.0')
44 gi.require_version('RwDts', '1.0')
45 gi.require_version('RwNsmYang', '1.0')
46 gi.require_version('RwNsrYang', '1.0')
47 gi.require_version('NsrYang', '1.0')
48 gi.require_version('RwTypes', '1.0')
49 gi.require_version('RwVlrYang', '1.0')
50 gi.require_version('RwVnfrYang', '1.0')
51 gi.require_version('VnfrYang', '1.0')
52 gi.require_version('ProjectVnfdYang', '1.0')
53 from gi.repository import (
54 RwYang,
55 RwNsrYang,
56 NsrYang,
57 NsdBaseYang,
58 ProjectNsdYang as NsdYang,
59 RwVlrYang,
60 VnfrYang,
61 RwVnfrYang,
62 RwNsmYang,
63 RwsdnalYang,
64 RwDts as rwdts,
65 RwTypes,
66 ProjectVnfdYang,
67 ProtobufC,
68 )
69 gi.require_version('RwKeyspec', '1.0')
70 from gi.repository.RwKeyspec import quoted_key
71
72 from rift.mano.utils.ssh_keys import ManoSshKey
73 import rift.mano.ncclient
74 import rift.mano.config_data.config
75 import rift.mano.dts as mano_dts
76 import rift.tasklets
77 from rift.mano.utils.project import (
78 ManoProject,
79 ProjectHandler,
80 get_add_delete_update_cfgs,
81 DEFAULT_PROJECT,
82 )
83
84 from . import rwnsm_conman as conman
85 from . import cloud
86 from . import publisher
87 from . import subscriber
88 from . import xpath
89 from . import config_value_pool
90 from . import rwvnffgmgr
91 from . import scale_group
92 from . import rwnsmplugin
93 from . import openmano_nsm
94 import functools
95 import collections
96
97 class NetworkServiceRecordState(Enum):
98 """ Network Service Record State """
99 INIT = 101
100 VL_INIT_PHASE = 102
101 VNF_INIT_PHASE = 103
102 VNFFG_INIT_PHASE = 104
103 RUNNING = 106
104 SCALING_OUT = 107
105 SCALING_IN = 108
106 TERMINATE = 109
107 TERMINATE_RCVD = 110
108 VL_TERMINATE_PHASE = 111
109 VNF_TERMINATE_PHASE = 112
110 VNFFG_TERMINATE_PHASE = 113
111 TERMINATED = 114
112 FAILED = 115
113 VL_INSTANTIATE = 116
114 VL_TERMINATE = 117
115
116
117 class NetworkServiceRecordError(Exception):
118 """ Network Service Record Error """
119 pass
120
121
122 class NetworkServiceDescriptorError(Exception):
123 """ Network Service Descriptor Error """
124 pass
125
126
127 class VirtualNetworkFunctionRecordError(Exception):
128 """ Virtual Network Function Record Error """
129 pass
130
131
132 class NetworkServiceDescriptorNotFound(Exception):
133 """ Cannot find Network Service Descriptor"""
134 pass
135
136
137 class NetworkServiceDescriptorNotFound(Exception):
138 """ Network Service Descriptor reference count exists """
139 pass
140
141 class NsrInstantiationFailed(Exception):
142 """ Failed to instantiate network service """
143 pass
144
145
146 class VnfInstantiationFailed(Exception):
147 """ Failed to instantiate virtual network function"""
148 pass
149
150
151 class VnffgInstantiationFailed(Exception):
152 """ Failed to instantiate virtual network function"""
153 pass
154
155
156 class VnfDescriptorError(Exception):
157 """Failed to instantiate virtual network function"""
158 pass
159
160
161 class ScalingOperationError(Exception):
162 pass
163
164
165 class ScaleGroupMissingError(Exception):
166 pass
167
168
169 class PlacementGroupError(Exception):
170 pass
171
172
173 class NsrNsdUpdateError(Exception):
174 pass
175
176
177 class NsrVlUpdateError(NsrNsdUpdateError):
178 pass
179
180 class VirtualLinkRecordError(Exception):
181 """ Virtual Links Record Error """
182 pass
183
184
185 class VlRecordState(Enum):
186 """ VL Record State """
187 INIT = 101
188 INSTANTIATION_PENDING = 102
189 ACTIVE = 103
190 TERMINATE_PENDING = 104
191 TERMINATED = 105
192 FAILED = 106
193
194
195 class VnffgRecordState(Enum):
196 """ VNFFG Record State """
197 INIT = 101
198 INSTANTIATION_PENDING = 102
199 ACTIVE = 103
200 TERMINATE_PENDING = 104
201 TERMINATED = 105
202 FAILED = 106
203
204
205 class VnffgRecord(object):
206 """ Vnffg Records class"""
207 SFF_DP_PORT = 4790
208 SFF_MGMT_PORT = 5000
209 def __init__(self, dts, log, loop, vnffgmgr, nsr, nsr_name, vnffgd_msg, sdn_account_name,cloud_account_name):
210
211 self._dts = dts
212 self._log = log
213 self._loop = loop
214 self._vnffgmgr = vnffgmgr
215 self._nsr = nsr
216 self._nsr_name = nsr_name
217 self._vnffgd_msg = vnffgd_msg
218 self._cloud_account_name = cloud_account_name
219 if sdn_account_name is None:
220 self._sdn_account_name = ''
221 else:
222 self._sdn_account_name = sdn_account_name
223
224 self._vnffgr_id = str(uuid.uuid4())
225 self._vnffgr_rsp_id = list()
226 self._vnffgr_state = VnffgRecordState.INIT
227
228 @property
229 def id(self):
230 """ VNFFGR id """
231 return self._vnffgr_id
232
233 @property
234 def state(self):
235 """ state of this VNF """
236 return self._vnffgr_state
237
238 def fetch_vnffgr(self):
239 """
240 Get VNFFGR message to be published
241 """
242
243 if self._vnffgr_state == VnffgRecordState.INIT:
244 vnffgr_dict = {"id": self._vnffgr_id,
245 "vnffgd_id_ref": self._vnffgd_msg.id,
246 "vnffgd_name_ref": self._vnffgd_msg.name,
247 "sdn_account": self._sdn_account_name,
248 "operational_status": 'init',
249 }
250 vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)
251 elif self._vnffgr_state == VnffgRecordState.TERMINATED:
252 vnffgr_dict = {"id": self._vnffgr_id,
253 "vnffgd_id_ref": self._vnffgd_msg.id,
254 "vnffgd_name_ref": self._vnffgd_msg.name,
255 "sdn_account": self._sdn_account_name,
256 "operational_status": 'terminated',
257 }
258 vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)
259 else:
260 try:
261 vnffgr = self._vnffgmgr.fetch_vnffgr(self._vnffgr_id)
262 except Exception:
263 self._log.exception("Fetching VNFFGR for VNFFG with id %s failed", self._vnffgr_id)
264 self._vnffgr_state = VnffgRecordState.FAILED
265 vnffgr_dict = {"id": self._vnffgr_id,
266 "vnffgd_id_ref": self._vnffgd_msg.id,
267 "vnffgd_name_ref": self._vnffgd_msg.name,
268 "sdn_account": self._sdn_account_name,
269 "operational_status": 'failed',
270 }
271 vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)
272
273 return vnffgr
274
275 @asyncio.coroutine
276 def vnffgr_create_msg(self):
277 """ Virtual Link Record message for Creating VLR in VNS """
278 vnffgr_dict = {"id": self._vnffgr_id,
279 "vnffgd_id_ref": self._vnffgd_msg.id,
280 "vnffgd_name_ref": self._vnffgd_msg.name,
281 "sdn_account": self._sdn_account_name,
282 "cloud_account": self._cloud_account_name,
283 }
284 vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)
285 for rsp in self._vnffgd_msg.rsp:
286 vnffgr_rsp = vnffgr.rsp.add()
287 vnffgr_rsp.id = str(uuid.uuid4())
288 vnffgr_rsp.name = self._nsr.name + '.' + rsp.name
289 self._vnffgr_rsp_id.append(vnffgr_rsp.id)
290 vnffgr_rsp.vnffgd_rsp_id_ref = rsp.id
291 vnffgr_rsp.vnffgd_rsp_name_ref = rsp.name
292 for rsp_cp_ref in rsp.vnfd_connection_point_ref:
293 vnfd = [vnfr.vnfd for vnfr in self._nsr.vnfrs.values() if vnfr.vnfd.id == rsp_cp_ref.vnfd_id_ref]
294 self._log.debug("VNFD message during VNFFG instantiation is %s",vnfd)
295 if len(vnfd) > 0 and vnfd[0].has_field('service_function_type'):
296 self._log.debug("Service Function Type for VNFD ID %s is %s",
297 rsp_cp_ref.vnfd_id_ref, vnfd[0].service_function_type)
298 else:
299 self._log.error("Service Function Type not available for VNFD ID %s; Skipping in chain",
300 rsp_cp_ref.vnfd_id_ref)
301 continue
302
303 vnfr_cp_ref = vnffgr_rsp.vnfr_connection_point_ref.add()
304 vnfr_cp_ref.member_vnf_index_ref = rsp_cp_ref.member_vnf_index_ref
305 vnfr_cp_ref.hop_number = rsp_cp_ref.order
306 vnfr_cp_ref.vnfd_id_ref =rsp_cp_ref.vnfd_id_ref
307 vnfr_cp_ref.service_function_type = vnfd[0].service_function_type
308 for nsr_vnfr in self._nsr.vnfrs.values():
309 if (nsr_vnfr.vnfd.id == vnfr_cp_ref.vnfd_id_ref and
310 nsr_vnfr.member_vnf_index == vnfr_cp_ref.member_vnf_index_ref):
311 vnfr_cp_ref.vnfr_id_ref = nsr_vnfr.id
312 vnfr_cp_ref.vnfr_name_ref = nsr_vnfr.name
313 vnfr_cp_ref.vnfr_connection_point_ref = rsp_cp_ref.vnfd_connection_point_ref
314
315 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
316 self._log.debug(" Received VNFR is %s", vnfr)
317 while vnfr.operational_status != 'running':
318 self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status)
319 if vnfr.operational_status == 'failed':
320 self._log.error("Fetching VNFR for %s failed", vnfr.id)
321 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" %
322 (self.id, vnfr.id))
323 yield from asyncio.sleep(2, loop=self._loop)
324 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
325 self._log.debug("Received VNFR is %s", vnfr)
326
327 vnfr_cp_ref.connection_point_params.mgmt_address = vnfr.mgmt_interface.ip_address
328 for cp in vnfr.connection_point:
329 if cp.name == vnfr_cp_ref.vnfr_connection_point_ref:
330 vnfr_cp_ref.connection_point_params.port_id = cp.connection_point_id
331 vnfr_cp_ref.connection_point_params.name = self._nsr.name + '.' + cp.name
332 for vdu in vnfr.vdur:
333 for intf in vdu.interface:
334 if intf.type_yang == "EXTERNAL" and intf.external_connection_point_ref == vnfr_cp_ref.vnfr_connection_point_ref:
335 vnfr_cp_ref.connection_point_params.vm_id = vdu.vim_id
336 self._log.debug("VIM ID for CP %s in VNFR %s is %s",cp.name,nsr_vnfr.id,
337 vnfr_cp_ref.connection_point_params.vm_id)
338 break
339
340 vnfr_cp_ref.connection_point_params.address = cp.ip_address
341 vnfr_cp_ref.connection_point_params.port = VnffgRecord.SFF_DP_PORT
342
343 for vnffgd_classifier in self._vnffgd_msg.classifier:
344 _rsp = [rsp for rsp in vnffgr.rsp if rsp.vnffgd_rsp_id_ref == vnffgd_classifier.rsp_id_ref]
345 if len(_rsp) > 0:
346 rsp_id_ref = _rsp[0].id
347 rsp_name = _rsp[0].name
348 else:
349 self._log.error("RSP with ID %s not found during classifier creation for classifier id %s",
350 vnffgd_classifier.rsp_id_ref,vnffgd_classifier.id)
351 continue
352 vnffgr_classifier = vnffgr.classifier.add()
353 vnffgr_classifier.id = vnffgd_classifier.id
354 vnffgr_classifier.name = self._nsr.name + '.' + vnffgd_classifier.name
355 _rsp[0].classifier_name = vnffgr_classifier.name
356 vnffgr_classifier.rsp_id_ref = rsp_id_ref
357 vnffgr_classifier.rsp_name = rsp_name
358 for nsr_vnfr in self._nsr.vnfrs.values():
359 if (nsr_vnfr.vnfd.id == vnffgd_classifier.vnfd_id_ref and
360 nsr_vnfr.member_vnf_index == vnffgd_classifier.member_vnf_index_ref):
361 vnffgr_classifier.vnfr_id_ref = nsr_vnfr.id
362 vnffgr_classifier.vnfr_name_ref = nsr_vnfr.name
363 vnffgr_classifier.vnfr_connection_point_ref = vnffgd_classifier.vnfd_connection_point_ref
364
365 if nsr_vnfr.vnfd.service_function_chain == 'CLASSIFIER':
366 vnffgr_classifier.sff_name = nsr_vnfr.name
367
368 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
369 self._log.debug(" Received VNFR is %s", vnfr)
370 while vnfr.operational_status != 'running':
371 self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status)
372 if vnfr.operational_status == 'failed':
373 self._log.error("Fetching VNFR for %s failed", vnfr.id)
374 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" %
375 (self.id, vnfr.id))
376 yield from asyncio.sleep(2, loop=self._loop)
377 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
378 self._log.debug("Received VNFR is %s", vnfr)
379
380 for cp in vnfr.connection_point:
381 if cp.name == vnffgr_classifier.vnfr_connection_point_ref:
382 vnffgr_classifier.port_id = cp.connection_point_id
383 vnffgr_classifier.ip_address = cp.ip_address
384 for vdu in vnfr.vdur:
385 for intf in vdu.interface:
386 if intf.type_yang == "EXTERNAL" and intf.external_connection_point_ref == vnffgr_classifier.vnfr_connection_point_ref:
387 vnffgr_classifier.vm_id = vdu.vim_id
388 self._log.debug("VIM ID for CP %s in VNFR %s is %s",
389 cp.name,nsr_vnfr.id,
390 vnfr_cp_ref.connection_point_params.vm_id)
391 break
392
393 self._log.info("VNFFGR msg to be sent is %s", vnffgr)
394 return vnffgr
395
396 @asyncio.coroutine
397 def vnffgr_nsr_sff_list(self):
398 """ SFF List for VNFR """
399 sff_list = {}
400 sf_list = [nsr_vnfr.name for nsr_vnfr in self._nsr.vnfrs.values() if nsr_vnfr.vnfd.service_function_chain == 'SF']
401
402 for nsr_vnfr in self._nsr.vnfrs.values():
403 if (nsr_vnfr.vnfd.service_function_chain == 'CLASSIFIER' or nsr_vnfr.vnfd.service_function_chain == 'SFF'):
404 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
405 self._log.debug(" Received VNFR is %s", vnfr)
406 while vnfr.operational_status != 'running':
407 self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status)
408 if vnfr.operational_status == 'failed':
409 self._log.error("Fetching VNFR for %s failed", vnfr.id)
410 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id))
411 yield from asyncio.sleep(2, loop=self._loop)
412 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
413 self._log.debug("Received VNFR is %s", vnfr)
414
415 sff = RwsdnalYang.YangData_RwProject_Project_Vnffgs_VnffgChain_Sff()
416 sff_list[nsr_vnfr.vnfd.id] = sff
417 sff.name = nsr_vnfr.name
418 sff.function_type = nsr_vnfr.vnfd.service_function_chain
419
420 sff.mgmt_address = vnfr.mgmt_interface.ip_address
421 sff.mgmt_port = VnffgRecord.SFF_MGMT_PORT
422 for cp in vnfr.connection_point:
423 sff_dp = sff.dp_endpoints.add()
424 sff_dp.name = self._nsr.name + '.' + cp.name
425 sff_dp.address = cp.ip_address
426 sff_dp.port = VnffgRecord.SFF_DP_PORT
427 if nsr_vnfr.vnfd.service_function_chain == 'SFF':
428 for sf_name in sf_list:
429 _sf = sff.vnfr_list.add()
430 _sf.vnfr_name = sf_name
431
432 return sff_list
433
434 @asyncio.coroutine
435 def instantiate(self):
436 """ Instantiate this VNFFG """
437
438 self._log.info("Instaniating VNFFGR with vnffgd %s",
439 self._vnffgd_msg)
440
441
442 vnffgr_request = yield from self.vnffgr_create_msg()
443 vnffg_sff_list = yield from self.vnffgr_nsr_sff_list()
444
445 try:
446 vnffgr = self._vnffgmgr.create_vnffgr(vnffgr_request,self._vnffgd_msg.classifier,vnffg_sff_list)
447 except Exception as e:
448 self._log.exception("VNFFG instantiation failed: %s", str(e))
449 self._vnffgr_state = VnffgRecordState.FAILED
450 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFFGR %s failure" % (self.id, vnffgr_request.id))
451
452 self._vnffgr_state = VnffgRecordState.INSTANTIATION_PENDING
453
454 self._log.info("Instantiated VNFFGR :%s", vnffgr)
455 self._vnffgr_state = VnffgRecordState.ACTIVE
456
457 self._log.info("Invoking update_state to update NSR state for NSR ID: %s", self._nsr.id)
458 yield from self._nsr.update_state()
459
460 def vnffgr_in_vnffgrm(self):
461 """ Is there a VNFR record in VNFM """
462 if (self._vnffgr_state == VnffgRecordState.ACTIVE or
463 self._vnffgr_state == VnffgRecordState.INSTANTIATION_PENDING or
464 self._vnffgr_state == VnffgRecordState.FAILED):
465 return True
466
467 return False
468
469 @asyncio.coroutine
470 def terminate(self):
471 """ Terminate this VNFFGR """
472 if not self.vnffgr_in_vnffgrm():
473 self._log.error("Ignoring terminate request for id %s in state %s",
474 self.id, self._vnffgr_state)
475 return
476
477 self._log.info("Terminating VNFFGR id:%s", self.id)
478 self._vnffgr_state = VnffgRecordState.TERMINATE_PENDING
479
480 self._vnffgmgr.terminate_vnffgr(self._vnffgr_id)
481
482 self._vnffgr_state = VnffgRecordState.TERMINATED
483 self._log.debug("Terminated VNFFGR id:%s", self.id)
484
485
486 class VirtualLinkRecord(object):
487 """ Virtual Link Records class"""
488 XPATH = "D,/vlr:vlr-catalog/vlr:vlr"
489 @staticmethod
490 @asyncio.coroutine
491 def create_record(dts, log, loop, project, nsr_name, vld_msg,
492 datacenter, ip_profile, nsr_id, restart_mode=False):
493 """Creates a new VLR object based on the given data.
494
495 If restart mode is enabled, then we look for existing records in the
496 DTS and create a VLR records using the exiting data(ID)
497
498 Returns:
499 VirtualLinkRecord
500 """
501 vlr_obj = VirtualLinkRecord(
502 dts,
503 log,
504 loop,
505 project,
506 nsr_name,
507 vld_msg,
508 datacenter,
509 ip_profile,
510 nsr_id,
511 )
512
513 if restart_mode:
514 res_iter = yield from dts.query_read(
515 project.add_project("D,/vlr:vlr-catalog/vlr:vlr"),
516 rwdts.XactFlag.MERGE)
517
518 for fut in res_iter:
519 response = yield from fut
520 vlr = response.result
521
522 # Check if the record is already present, if so use the ID of
523 # the existing record. Since the name of the record is uniquely
524 # formed we can use it as a search key!
525 if vlr.name == vlr_obj.name:
526 vlr_obj.reset_id(vlr.id)
527 break
528
529 return vlr_obj
530
531 def __init__(self, dts, log, loop, project, nsr_name, vld_msg,
532 datacenter, ip_profile, nsr_id):
533 self._dts = dts
534 self._log = log
535 self._loop = loop
536 self._project = project
537 self._nsr_name = nsr_name
538 self._vld_msg = vld_msg
539 self._datacenter_name = datacenter
540 self._assigned_subnet = None
541 self._nsr_id = nsr_id
542 self._ip_profile = ip_profile
543 self._vlr_id = str(uuid.uuid4())
544 self._state = VlRecordState.INIT
545 self._prev_state = None
546 self._create_time = int(time.time())
547 self.state_failed_reason = None
548
549 @property
550 def xpath(self):
551 """ path for this object """
552 return self._project.add_project("D,/vlr:vlr-catalog/vlr:vlr[vlr:id={}]".
553 format(quoted_key(self._vlr_id)))
554
555 @property
556 def id(self):
557 """ VLR id """
558 return self._vlr_id
559
560 @property
561 def nsr_name(self):
562 """ Get NSR name for this VL """
563 return self.nsr_name
564
565 @property
566 def vld_msg(self):
567 """ Virtual Link Desciptor """
568 return self._vld_msg
569
570 @property
571 def assigned_subnet(self):
572 """ Subnet assigned to this VL"""
573 return self._assigned_subnet
574
575 @property
576 def name(self):
577 """
578 Get the name for this VLR.
579 VLR name is "nsr name:VLD name"
580 """
581 if self.vld_msg.vim_network_name:
582 return self.vld_msg.vim_network_name
583 elif self.vld_msg.name == "multisite":
584 # This is a temporary hack to identify manually provisioned inter-site network
585 return self.vld_msg.name
586 else:
587 return self._project.name + "." +self._nsr_name + "." + self.vld_msg.name
588
589 @property
590 def datacenter_name(self):
591 """ Datacenter that this VLR should be created in """
592 return self._datacenter_name
593
594 @staticmethod
595 def vlr_xpath(vlr):
596 """ Get the VLR path from VLR """
597 return (VirtualLinkRecord.XPATH + "[vlr:id={}]").format(quoted_key(vlr.id))
598
599 @property
600 def state(self):
601 """ VLR state """
602 return self._state
603
604 @state.setter
605 def state(self, value):
606 """ VLR set state """
607 self._state = value
608
609 @property
610 def prev_state(self):
611 """ VLR previous state """
612 return self._prev_state
613
614 @prev_state.setter
615 def prev_state(self, value):
616 """ VLR set previous state """
617 self._prev_state = value
618
619 @property
620 def vlr_msg(self):
621 """ Virtual Link Record message for Creating VLR in VNS """
622 vld_fields = ["short_name",
623 "vendor",
624 "description",
625 "version",
626 "type_yang",
627 "vim_network_name",
628 "provider_network"]
629
630 vld_copy_dict = {k: v for k, v in self.vld_msg.as_dict().items()
631 if k in vld_fields}
632
633 vlr_dict = {"id": self._vlr_id,
634 "nsr_id_ref": self._nsr_id,
635 "vld_ref": self.vld_msg.id,
636 "name": self.name,
637 "create_time": self._create_time,
638 "datacenter": self._datacenter_name,
639 }
640
641 if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
642 vlr_dict['ip_profile_params' ] = self._ip_profile.ip_profile_params.as_dict()
643
644
645 vlr_dict.update(vld_copy_dict)
646 vlr = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr.from_dict(vlr_dict)
647
648 if self.vld_msg.has_field('virtual_connection_points'):
649 for cp in self.vld_msg.virtual_connection_points:
650 vcp = vlr.virtual_connection_points.add()
651 vcp.from_dict(cp.as_dict())
652 return vlr
653
654 def reset_id(self, vlr_id):
655 self._vlr_id = vlr_id
656
657 def create_nsr_vlr_msg(self, vnfrs):
658 """ The VLR message"""
659 nsr_vlr = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vlr()
660 nsr_vlr.vlr_ref = self._vlr_id
661 nsr_vlr.assigned_subnet = self.assigned_subnet
662 nsr_vlr.datacenter = self._datacenter_name
663
664 for conn in self.vld_msg.vnfd_connection_point_ref:
665 for vnfr in vnfrs:
666 if (vnfr.vnfd.id == conn.vnfd_id_ref and
667 vnfr.member_vnf_index == conn.member_vnf_index_ref and
668 self._datacenter_name == vnfr._datacenter_name):
669 cp_entry = nsr_vlr.vnfr_connection_point_ref.add()
670 cp_entry.vnfr_id = vnfr.id
671 cp_entry.connection_point = conn.vnfd_connection_point_ref
672
673 return nsr_vlr
674
675 @asyncio.coroutine
676 def instantiate(self):
677 """ Instantiate this VL """
678 self._log.debug("Instaniating VLR key %s, vld %s",
679 self.xpath, self._vld_msg)
680 vlr = None
681 self._state = VlRecordState.INSTANTIATION_PENDING
682 self._log.debug("Executing VL create path:%s msg:%s",
683 self.xpath, self.vlr_msg)
684
685 with self._dts.transaction(flags=0) as xact:
686 block = xact.block_create()
687 block.add_query_create(self.xpath, self.vlr_msg)
688 self._log.debug("Executing VL create path:%s msg:%s",
689 self.xpath, self.vlr_msg)
690 res_iter = yield from block.execute(now=True)
691 for ent in res_iter:
692 res = yield from ent
693 vlr = res.result
694
695 if vlr is None:
696 self._state = VlRecordState.FAILED
697 raise NsrInstantiationFailed("Failed NS %s instantiation due to empty response" % self.id)
698
699 if vlr.operational_status == 'failed':
700 self._log.debug("NS Id:%s VL creation failed for vlr id %s", self.id, vlr.id)
701 self._state = VlRecordState.FAILED
702 raise NsrInstantiationFailed("Failed VL %s instantiation (%s)" % (vlr.id, vlr.operational_status_details))
703
704 self._log.info("Instantiated VL with xpath %s and vlr:%s",
705 self.xpath, vlr)
706 self._assigned_subnet = vlr.assigned_subnet
707
708 def vlr_in_vns(self):
709 """ Is there a VLR record in VNS """
710 if (self._state == VlRecordState.ACTIVE or
711 self._state == VlRecordState.INSTANTIATION_PENDING or
712 self._state == VlRecordState.TERMINATE_PENDING or
713 self._state == VlRecordState.FAILED):
714 return True
715
716 return False
717
718 @asyncio.coroutine
719 def terminate(self):
720 """ Terminate this VL """
721 if not self.vlr_in_vns():
722 self._log.debug("Ignoring terminate request for id %s in state %s",
723 self.id, self._state)
724 return
725
726 self._log.debug("Terminating VL id:%s", self.id)
727 self._state = VlRecordState.TERMINATE_PENDING
728
729 with self._dts.transaction(flags=0) as xact:
730 block = xact.block_create()
731 block.add_query_delete(self.xpath)
732 yield from block.execute(flags=0, now=True)
733
734 self._state = VlRecordState.TERMINATED
735 self._log.debug("Terminated VL id:%s", self.id)
736
737 def set_state_from_op_status(self, operational_status):
738 """ Set the state of this VL based on operational_status"""
739
740 self._log.debug("set_state_from_op_status called for vlr id %s with value %s", self.id, operational_status)
741 if operational_status == 'running':
742 self._state = VlRecordState.ACTIVE
743 elif operational_status == 'failed':
744 self._state = VlRecordState.FAILED
745 elif operational_status == 'vl_alloc_pending':
746 self._state = VlRecordState.INSTANTIATION_PENDING
747 else:
748 raise VirtualLinkRecordError("Unknown operational_status %s" % (operational_status))
749
750 class VnfRecordState(Enum):
751 """ Vnf Record State """
752 INIT = 101
753 INSTANTIATION_PENDING = 102
754 ACTIVE = 103
755 TERMINATE_PENDING = 104
756 TERMINATED = 105
757 FAILED = 106
758
759
760 class VirtualNetworkFunctionRecord(object):
761 """ Virtual Network Function Record class"""
762 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
763
764 @staticmethod
765 @asyncio.coroutine
766 def create_record(dts, log, loop, project, vnfd, nsr_config, const_vnfd_msg, nsd_id, nsr_name,
767 datacenter_name, nsr_id, group_name, group_instance_id,
768 placement_groups, cloud_config, restart_mode=False):
769 """Creates a new VNFR object based on the given data.
770
771 If restart mode is enabled, then we look for existing records in the
772 DTS and create a VNFR records using the exiting data(ID)
773
774 Returns:
775 VirtualNetworkFunctionRecord
776 """
777
778 vnfr_obj = VirtualNetworkFunctionRecord(
779 dts,
780 log,
781 loop,
782 project,
783 vnfd,
784 nsr_config,
785 const_vnfd_msg,
786 nsd_id,
787 nsr_name,
788 datacenter_name,
789 nsr_id,
790 group_name,
791 group_instance_id,
792 placement_groups,
793 cloud_config,
794 restart_mode=restart_mode)
795
796 if restart_mode:
797 res_iter = yield from dts.query_read(
798 project.add_project("D,/vnfr:vnfr-catalog/vnfr:vnfr"),
799 rwdts.XactFlag.MERGE)
800
801 for fut in res_iter:
802 response = yield from fut
803 vnfr = response.result
804
805 if vnfr.name == vnfr_obj.name:
806 vnfr_obj.reset_id(vnfr.id)
807 break
808
809 return vnfr_obj
810
811 def __init__(self,
812 dts,
813 log,
814 loop,
815 project,
816 vnfd,
817 nsr_config,
818 const_vnfd_msg,
819 nsd_id,
820 nsr_name,
821 datacenter_name,
822 nsr_id,
823 group_name=None,
824 group_instance_id=None,
825 placement_groups = [],
826 cloud_config = None,
827 restart_mode = False):
828 self._dts = dts
829 self._log = log
830 self._loop = loop
831 self._project = project
832 self._vnfd = vnfd
833 self._nsr_config = nsr_config
834 self._const_vnfd_msg = const_vnfd_msg
835 self._nsd_id = nsd_id
836 self._nsr_name = nsr_name
837 self._nsr_id = nsr_id
838 self._datacenter_name = datacenter_name
839 self._group_name = group_name
840 self._group_instance_id = group_instance_id
841 self._placement_groups = placement_groups
842 self._cloud_config = cloud_config
843 self.restart_mode = restart_mode
844
845 self._config_status = NsrYang.ConfigStates.INIT
846 self._create_time = int(time.time())
847
848 self._prev_state = VnfRecordState.INIT
849 self._state = VnfRecordState.INIT
850 self._state_failed_reason = None
851
852 self._active_vdus = 0
853
854 self.config_store = rift.mano.config_data.config.ConfigStore(self._log)
855 self.configure()
856
857 self._vnfr_id = str(uuid.uuid4())
858 self._name = None
859
860 self.substitute_vnf_input_parameters = VnfInputParameterSubstitution(self._log,
861 self._const_vnfd_msg,
862 self._project)
863 self._vnfr_msg = self.create_vnfr_msg()
864 self._log.debug("Set VNFR {} config type to {}".
865 format(self.name, self.config_type))
866
867
868 if group_name is None and group_instance_id is not None:
869 raise ValueError("Group instance id must not be provided with an empty group name")
870
871 @property
872 def id(self):
873 """ VNFR id """
874 return self._vnfr_id
875
876 @property
877 def xpath(self):
878 """ VNFR xpath """
879 return self._project.add_project("D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]"
880 .format(quoted_key(self.id)))
881
882 @property
883 def vnfr_msg(self):
884 """ VNFR message """
885 return self._vnfr_msg
886
887 @property
888 def const_vnfr_msg(self):
889 """ VNFR message """
890 return RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConstituentVnfrRef(
891 vnfr_id=self.id, datacenter=self._datacenter_name)
892
893 @property
894 def vnfd(self):
895 """ vnfd """
896 return self._vnfd
897
898 @property
899 def datacenter_name(self):
900 """ Datacenter that this VNF should be created in """
901 return self._datacenter_name
902
903
904 @property
905 def active(self):
906 """ Is this VNF actve """
907 return True if self._state == VnfRecordState.ACTIVE else False
908
909 @property
910 def state(self):
911 """ state of this VNF """
912 return self._state
913
914 @property
915 def state_failed_reason(self):
916 """ Error message in case this VNF is in failed state """
917 return self._state_failed_reason
918
919 @property
920 def member_vnf_index(self):
921 """ Member VNF index """
922 return self._const_vnfd_msg.member_vnf_index
923
924 @property
925 def nsr_name(self):
926 """ NSR name"""
927 return self._nsr_name
928
929 @property
930 def name(self):
931 """ Name of this VNFR """
932 if self._name is not None:
933 return self._name
934
935 name_tags = [self._project.name, self._nsr_name]
936
937 if self._group_name is not None:
938 name_tags.append(self._group_name)
939
940 if self._group_instance_id is not None:
941 name_tags.append(str(self._group_instance_id))
942
943 name_tags.extend([self.vnfd.name, str(self.member_vnf_index)])
944
945 self._name = "__".join(name_tags)
946
947 return self._name
948
949 @staticmethod
950 def vnfr_xpath(vnfr):
951 """ Get the VNFR path from VNFR """
952 return (VirtualNetworkFunctionRecord.XPATH +
953 "[vnfr:id={}]").format(quoted_key(vnfr.id))
954
955 @property
956 def config_type(self):
957 cfg_types = ['netconf', 'juju', 'script']
958 for method in cfg_types:
959 if self._vnfd.vnf_configuration.has_field(method):
960 return method
961 return 'none'
962
963 @property
964 def config_status(self):
965 """Return the config status as YANG ENUM string"""
966 self._log.debug("Map VNFR {} config status {} ({})".
967 format(self.name, self._config_status, self.config_type))
968 if self.config_type == 'none':
969 return 'config_not_needed'
970 elif self._config_status == NsrYang.ConfigStates.CONFIGURED:
971 return 'configured'
972 elif self._config_status == NsrYang.ConfigStates.FAILED:
973 return 'failed'
974
975 return 'configuring'
976
977 def set_state(self, state):
978 """ set the state of this object """
979 self._prev_state = self._state
980 self._state = state
981
982 def reset_id(self, vnfr_id):
983 self._vnfr_id = vnfr_id
984 self._vnfr_msg = self.create_vnfr_msg()
985
986 def configure(self):
987 self.config_store.merge_vnfd_config(
988 self._project.name,
989 self._nsd_id,
990 self._vnfd,
991 self.member_vnf_index,
992 )
993
994 def create_vnfr_msg(self):
995 """ VNFR message for this VNFR """
996 vnfd_fields = [
997 "short_name",
998 "vendor",
999 "description",
1000 "version",
1001 "type_yang",
1002 ]
1003 vnfd_copy_dict = {k: v for k, v in self._vnfd.as_dict().items() if k in vnfd_fields}
1004 vnfr_dict = {
1005 "id": self.id,
1006 "nsr_id_ref": self._nsr_id,
1007 "name": self.name,
1008 "datacenter": self._datacenter_name,
1009 "config_status": self.config_status
1010 }
1011 vnfr_dict.update(vnfd_copy_dict)
1012
1013 vnfr = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
1014 vnfr.vnfd = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd. \
1015 from_dict(self.vnfd.as_dict())
1016 vnfr.member_vnf_index_ref = self.member_vnf_index
1017 vnfr.vnf_configuration.from_dict(self._vnfd.vnf_configuration.as_dict())
1018
1019 if self._vnfd.mgmt_interface.has_field("port"):
1020 vnfr.mgmt_interface.port = self._vnfd.mgmt_interface.port
1021
1022 for group_info in self._placement_groups:
1023 group = vnfr.placement_groups_info.add()
1024 group.from_dict(group_info.as_dict())
1025
1026 if self._cloud_config and len(self._cloud_config.as_dict()):
1027 self._log.debug("Cloud config during vnfr create is {}".format(self._cloud_config))
1028 vnfr.cloud_config = self._cloud_config
1029
1030 # UI expects the monitoring param field to exist
1031 vnfr.monitoring_param = []
1032
1033 self._log.debug("Get vnfr_msg for VNFR {} : {}".format(self.name, vnfr))
1034
1035 if self.restart_mode:
1036 vnfr.operational_status = 'init'
1037 else:
1038 # Set Operational Status as pre-init for Input Param Substitution
1039 if self._state not in [VnfRecordState.ACTIVE, VnfRecordState.TERMINATE_PENDING,
1040 VnfRecordState.TERMINATED, VnfRecordState.FAILED]:
1041 # To make sure that an active VNFR is not updated with a previous state.
1042 # This can happen during config state updates.
1043 vnfr.operational_status = 'pre_init'
1044 else:
1045 vnfr.operational_status = self._state
1046
1047 return vnfr
1048
1049 @asyncio.coroutine
1050 def update_vnfm(self):
1051 self._log.debug("Send an update to VNFM for VNFR {} with {}".
1052 format(self.name, self.vnfr_msg))
1053 yield from self._dts.query_update(
1054 self.xpath,
1055 rwdts.XactFlag.REPLACE,
1056 self.vnfr_msg
1057 )
1058
1059 def get_config_status(self):
1060 """Return the config status as YANG ENUM"""
1061 return self._config_status
1062
1063 @asyncio.coroutine
1064 def set_config_status(self, status):
1065
1066 def status_to_string(status):
1067 status_dc = {
1068 NsrYang.ConfigStates.INIT : 'init',
1069 NsrYang.ConfigStates.CONFIGURING : 'configuring',
1070 NsrYang.ConfigStates.CONFIG_NOT_NEEDED : 'config_not_needed',
1071 NsrYang.ConfigStates.CONFIGURED : 'configured',
1072 NsrYang.ConfigStates.FAILED : 'failed',
1073 }
1074
1075 return status_dc[status]
1076
1077 self._log.debug("Update VNFR {} from {} ({}) to {}".
1078 format(self.name, self._config_status,
1079 self.config_type, status))
1080 if self._config_status == NsrYang.ConfigStates.CONFIGURED:
1081 self._log.warning("Updating already configured VNFR {}".
1082 format(self.name))
1083 return
1084
1085 if self._config_status != status:
1086 try:
1087 self._config_status = status
1088 # I don't think this is used. Original implementor can check.
1089 # Caused Exception, so corrected it by status_to_string
1090 # But not sure whats the use of this variable?
1091 self.vnfr_msg.config_status = status_to_string(status)
1092 except Exception as e:
1093 self._log.exception("Exception=%s", str(e))
1094
1095 self._log.debug("Updated VNFR {} status to {}".format(self.name, status))
1096
1097 if self._config_status != NsrYang.ConfigStates.INIT:
1098 try:
1099 # Publish only after VNFM has the VNFR created
1100 yield from self.update_vnfm()
1101 except Exception as e:
1102 self._log.error("Exception updating VNFM with new status {} of VNFR {}: {}".
1103 format(status, self.name, e))
1104 self._log.exception(e)
1105
1106 def is_configured(self):
1107 if self.config_type == 'none':
1108 return True
1109
1110 if self._config_status == NsrYang.ConfigStates.CONFIGURED:
1111 return True
1112
1113 return False
1114
1115 @asyncio.coroutine
1116 def update_config_primitives(self, vnf_config, nsr):
1117 # Update only after we are configured
1118 if self._config_status == NsrYang.ConfigStates.INIT:
1119 return
1120
1121 if not vnf_config.as_dict():
1122 return
1123
1124 self._log.debug("Update VNFR {} config: {}".
1125 format(self.name, vnf_config.as_dict()))
1126
1127 # Update config primitive
1128 updated = False
1129 for prim in self._vnfd.vnf_configuration.config_primitive:
1130 for p in vnf_config.config_primitive:
1131 if prim.name == p.name:
1132 for param in prim.parameter:
1133 for pa in p.parameter:
1134 if pa.name == param.name:
1135 if pa.default_value and \
1136 (pa.default_value != param.default_value):
1137 param.default_value = pa.default_value
1138 param.read_only = pa.read_only
1139 updated = True
1140 break
1141 self._log.debug("Prim: {}".format(prim.as_dict()))
1142 break
1143
1144 if updated:
1145 self._log.debug("Updated VNFD {} config: {}".
1146 format(self._vnfd.name,
1147 self._vnfd.vnf_configuration))
1148 self._vnfr_msg = self.create_vnfr_msg()
1149
1150 try:
1151 yield from nsr.nsm_plugin.update_vnfr(self)
1152 except Exception as e:
1153 self._log.error("Exception updating VNFM with new config "
1154 "primitive for VNFR {}: {}".
1155 format(self.name, e))
1156 self._log.exception(e)
1157
1158 @asyncio.coroutine
1159 def instantiate(self, nsr):
1160 """ Instantiate this VNFR"""
1161
1162 self._log.debug("Instaniating VNFR key %s, vnfd %s",
1163 self.xpath, self._vnfd)
1164
1165 self._log.debug("Create VNF with xpath %s and vnfr %s",
1166 self.xpath, self.vnfr_msg)
1167
1168 self.set_state(VnfRecordState.INSTANTIATION_PENDING)
1169
1170 def find_vlr_for_cp(conn):
1171 """ Find VLR for the given connection point """
1172 for vlr_id, vlr in nsr.vlrs.items():
1173 for vnfd_cp in vlr.vld_msg.vnfd_connection_point_ref:
1174 if (vnfd_cp.vnfd_id_ref == self._vnfd.id and
1175 vnfd_cp.vnfd_connection_point_ref == conn.name and
1176 vnfd_cp.member_vnf_index_ref == self.member_vnf_index and
1177 vlr._datacenter_name == self._datacenter_name):
1178 self._log.debug("Found VLR for cp_name:%s and vnf-index:%d",
1179 conn.name, self.member_vnf_index)
1180 return vlr
1181 return None
1182
1183 # For every connection point in the VNFD fill in the identifier
1184 self._log.debug("Add connection point for VNF %s: %s",
1185 self.vnfr_msg.name, self._vnfd.connection_point)
1186 for conn_p in self._vnfd.connection_point:
1187 cpr = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint()
1188 cpr.name = conn_p.name
1189 cpr.type_yang = conn_p.type_yang
1190 if conn_p.has_field('port_security_enabled'):
1191 cpr.port_security_enabled = conn_p.port_security_enabled
1192
1193 vlr_ref = find_vlr_for_cp(conn_p)
1194 if vlr_ref is None:
1195 msg = "Failed to find VLR for cp = %s" % conn_p.name
1196 self._log.debug("%s", msg)
1197 # raise VirtualNetworkFunctionRecordError(msg)
1198 continue
1199
1200 cpr.vlr_ref = vlr_ref.id
1201
1202 self.vnfr_msg.connection_point.append(cpr)
1203 self._log.debug("Connection point [%s] added, vnf id=%s vnfd id=%s",
1204 cpr, self.vnfr_msg.id, self.vnfr_msg.vnfd.id)
1205
1206 self._log.debug("VNFR {} restart mode {}".
1207 format(self.vnfr_msg.id, self.restart_mode))
1208 if not self.restart_mode:
1209 # Checking for NS Terminate.
1210 if nsr._ns_terminate_received == False:
1211 # Create with pre-init operational state publishes the vnfr for substitution.
1212 yield from self._dts.query_create(self.xpath, 0, self.vnfr_msg)
1213 # Call to substitute VNF Input Parameter
1214 self.substitute_vnf_input_parameters(self.vnfr_msg, self._nsr_config)
1215 # Calling Update with pre-init operational data after Param substitution to instatntiate vnfr
1216 yield from self._dts.query_update(self.xpath, 0, self.vnfr_msg)
1217
1218 else:
1219 yield from self._dts.query_update(self.xpath,
1220 0,
1221 self.vnfr_msg)
1222
1223 self._log.info("Created VNF with xpath %s and vnfr %s",
1224 self.xpath, self.vnfr_msg)
1225
1226 @asyncio.coroutine
1227 def update_state(self, vnfr_msg):
1228 """ Update this VNFR"""
1229 if vnfr_msg.operational_status == "running":
1230 if self.vnfr_msg.operational_status != "running":
1231 yield from self.is_active()
1232 elif vnfr_msg.operational_status == "failed":
1233 yield from self.instantiation_failed(failed_reason=vnfr_msg.operational_status_details)
1234
1235 @asyncio.coroutine
1236 def is_active(self):
1237 """ This VNFR is active """
1238 self._log.debug("VNFR %s is active", self._vnfr_id)
1239 self.set_state(VnfRecordState.ACTIVE)
1240
1241 @asyncio.coroutine
1242 def instantiation_failed(self, failed_reason=None):
1243 """ This VNFR instantiation failed"""
1244 self._log.debug("VNFR %s instantiation failed", self._vnfr_id)
1245 self.set_state(VnfRecordState.FAILED)
1246 self._state_failed_reason = failed_reason
1247
1248 def vnfr_in_vnfm(self):
1249 """ Is there a VNFR record in VNFM """
1250 if (self._state == VnfRecordState.ACTIVE or
1251 self._state == VnfRecordState.INSTANTIATION_PENDING or
1252 self._state == VnfRecordState.FAILED):
1253 return True
1254
1255 return False
1256
1257 @asyncio.coroutine
1258 def terminate(self):
1259 """ Terminate this VNF """
1260 if not self.vnfr_in_vnfm():
1261 self._log.debug("Ignoring terminate request for id %s in state %s",
1262 self.id, self._state)
1263 return
1264
1265 self._log.debug("Terminating VNF id:%s", self.id)
1266 self.set_state(VnfRecordState.TERMINATE_PENDING)
1267 with self._dts.transaction(flags=0) as xact:
1268 block = xact.block_create()
1269 block.add_query_delete(self.xpath)
1270 yield from block.execute(flags=0)
1271 self.set_state(VnfRecordState.TERMINATED)
1272 self._log.debug("Terminated VNF id:%s", self.id)
1273
1274
1275 class NetworkServiceStatus(object):
1276 """ A class representing the Network service's status """
1277 MAX_EVENTS_RECORDED = 10
1278 """ Network service Status class"""
1279 def __init__(self, dts, log, loop):
1280 self._dts = dts
1281 self._log = log
1282 self._loop = loop
1283
1284 self._state = NetworkServiceRecordState.INIT
1285 self._events = deque([])
1286
1287 @asyncio.coroutine
1288 def create_notification(self, evt, evt_desc, evt_details):
1289 xp = "N,/rw-nsr:nsm-notification"
1290 notif = RwNsrYang.YangNotif_RwNsr_NsmNotification()
1291 notif.event = evt
1292 notif.description = evt_desc
1293 notif.details = evt_details if evt_details is not None else None
1294
1295 yield from self._dts.query_create(xp, rwdts.XactFlag.ADVISE, notif)
1296 self._log.info("Notification called by creating dts query: %s", notif)
1297
1298 def record_event(self, evt, evt_desc, evt_details):
1299 """ Record an event """
1300 self._log.debug("Recording event - evt %s, evt_descr %s len = %s",
1301 evt, evt_desc, len(self._events))
1302 if len(self._events) >= NetworkServiceStatus.MAX_EVENTS_RECORDED:
1303 self._events.popleft()
1304 self._events.append((int(time.time()), evt, evt_desc,
1305 evt_details if evt_details is not None else None))
1306
1307 self._loop.create_task(self.create_notification(evt,evt_desc,evt_details))
1308
1309 def set_state(self, state):
1310 """ set the state of this status object """
1311 self._state = state
1312
1313 def yang_str(self):
1314 """ Return the state as a yang enum string """
1315 state_to_str_map = {"INIT": "init",
1316 "VL_INIT_PHASE": "vl_init_phase",
1317 "VNF_INIT_PHASE": "vnf_init_phase",
1318 "VNFFG_INIT_PHASE": "vnffg_init_phase",
1319 "SCALING_GROUP_INIT_PHASE": "scaling_group_init_phase",
1320 "RUNNING": "running",
1321 "SCALING_OUT": "scaling_out",
1322 "SCALING_IN": "scaling_in",
1323 "TERMINATE_RCVD": "terminate_rcvd",
1324 "TERMINATE": "terminate",
1325 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1326 "VNF_TERMINATE_PHASE": "vnf_terminate_phase",
1327 "VNFFG_TERMINATE_PHASE": "vnffg_terminate_phase",
1328 "TERMINATED": "terminated",
1329 "FAILED": "failed",
1330 "VL_INSTANTIATE": "vl_instantiate",
1331 "VL_TERMINATE": "vl_terminate",
1332 }
1333 return state_to_str_map[self._state.name]
1334
1335 @property
1336 def state(self):
1337 """ State of this status object """
1338 return self._state
1339
1340 @property
1341 def msg(self):
1342 """ Network Service Record as a message"""
1343 event_list = []
1344 idx = 1
1345 for entry in self._events:
1346 event = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_OperationalEvents()
1347 event.id = idx
1348 idx += 1
1349 event.timestamp, event.event, event.description, event.details = entry
1350 event_list.append(event)
1351 return event_list
1352
1353
1354 class NetworkServiceRecord(object):
1355 """ Network service record """
1356 XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"
1357
1358 def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg,
1359 sdn_account_name, key_pairs, project, restart_mode=False,
1360 vlr_handler=None):
1361 self._dts = dts
1362 self._log = log
1363 self._loop = loop
1364 self._nsm = nsm
1365 self._nsr_cfg_msg = nsr_cfg_msg
1366 self._nsm_plugin = nsm_plugin
1367 self._sdn_account_name = sdn_account_name
1368 self._vlr_handler = vlr_handler
1369 self._project = project
1370
1371 self._nsd = None
1372 self._nsr_msg = None
1373 self._nsr_regh = None
1374 self._key_pairs = key_pairs
1375 self._ssh_key_file = None
1376 self._ssh_pub_key = None
1377 self._vlrs = {}
1378 self._vnfrs = {}
1379 self._vnfds = {}
1380 self._vnffgrs = {}
1381 self._param_pools = {}
1382 self._scaling_groups = {}
1383 self._create_time = int(time.time())
1384 self._op_status = NetworkServiceStatus(dts, log, loop)
1385 self._config_status = NsrYang.ConfigStates.CONFIGURING
1386 self._config_status_details = None
1387 self._job_id = 0
1388 self.restart_mode = restart_mode
1389 self.config_store = rift.mano.config_data.config.ConfigStore(self._log)
1390 self._debug_running = False
1391 self._is_active = False
1392 self._vl_phase_completed = False
1393 self._vnf_phase_completed = False
1394 self.instantiated = set()
1395
1396 # Used for orchestration_progress
1397 self._active_vms = 0
1398 self._active_networks = 0
1399
1400 # A flag to indicate if the NS has failed, currently it is recorded in
1401 # operational status, but at the time of termination this field is
1402 # over-written making it difficult to identify the failure.
1403 self._is_failed = False
1404
1405 # Initalise the state to init
1406 # The NSR moves through the following transitions
1407 # 1. INIT -> VLS_READY once all the VLs in the NSD are created
1408 # 2. VLS_READY - VNFS_READY when all the VNFs in the NSD are created
1409 # 3. VNFS_READY - READY when the NSR is published
1410
1411 self.set_state(NetworkServiceRecordState.INIT)
1412
1413 self.substitute_input_parameters = InputParameterSubstitution(self._log, self._project)
1414
1415 # Create an asyncio loop to know when the virtual links are ready
1416 self._vls_ready = asyncio.Event(loop=self._loop)
1417
1418 # This variable stores all the terminate events received per NS. This is then used to prevent any
1419 # further nsr non-terminate updates received in case of terminate being called bedore ns in in running state.
1420 self._ns_terminate_received = False
1421
1422 @property
1423 def nsm_plugin(self):
1424 """ NSM Plugin """
1425 return self._nsm_plugin
1426
1427 def set_state(self, state):
1428 """ Set state for this NSR"""
1429 # We are in init phase and is moving to the next state
1430 # The new state could be a FAILED state or VNF_INIIT_PHASE
1431 if self.state == NetworkServiceRecordState.VL_INIT_PHASE:
1432 self._vl_phase_completed = True
1433
1434 if self.state == NetworkServiceRecordState.VNF_INIT_PHASE:
1435 self._vnf_phase_completed = True
1436
1437 self._op_status.set_state(state)
1438
1439 self._nsm_plugin.set_state(self.id, state)
1440
1441 @property
1442 def id(self):
1443 """ Get id for this NSR"""
1444 return self._nsr_cfg_msg.id
1445
1446 @property
1447 def name(self):
1448 """ Name of this network service record """
1449 return self._nsr_cfg_msg.name
1450
1451 @property
1452 def _datacenter_name(self):
1453 if self._nsr_cfg_msg.has_field('datacenter'):
1454 return self._nsr_cfg_msg.datacenter
1455 return None
1456
1457 @property
1458 def state(self):
1459 """State of this NetworkServiceRecord"""
1460 return self._op_status.state
1461
1462 @property
1463 def active(self):
1464 """ Is this NSR active ?"""
1465 return True if self._op_status.state == NetworkServiceRecordState.RUNNING else False
1466
1467 @property
1468 def vlrs(self):
1469 """ VLRs associated with this NSR"""
1470 return self._vlrs
1471
1472 @property
1473 def vnfrs(self):
1474 """ VNFRs associated with this NSR"""
1475 return self._vnfrs
1476
1477 @property
1478 def vnffgrs(self):
1479 """ VNFFGRs associated with this NSR"""
1480 return self._vnffgrs
1481
1482 @property
1483 def scaling_groups(self):
1484 """ Scaling groups associated with this NSR """
1485 return self._scaling_groups
1486
1487 @property
1488 def param_pools(self):
1489 """ Parameter value pools associated with this NSR"""
1490 return self._param_pools
1491
1492 @property
1493 def nsr_cfg_msg(self):
1494 return self._nsr_cfg_msg
1495
1496 @nsr_cfg_msg.setter
1497 def nsr_cfg_msg(self, msg):
1498 self._nsr_cfg_msg = msg
1499
1500 @property
1501 def nsd_msg(self):
1502 """ NSD Protobuf for this NSR """
1503 if self._nsd is not None:
1504 return self._nsd
1505 self._nsd = self._nsr_cfg_msg.nsd
1506 return self._nsd
1507
1508 @property
1509 def nsd_id(self):
1510 """ NSD ID for this NSR """
1511 return self.nsd_msg.id
1512
1513 @property
1514 def job_id(self):
1515 ''' Get a new job id for config primitive'''
1516 self._job_id += 1
1517 return self._job_id
1518
1519 @property
1520 def config_status(self):
1521 """ Config status for NSR """
1522 return self._config_status
1523
1524 @property
1525 def nsm(self):
1526 """NS Manager"""
1527 return self._nsm
1528
1529 @property
1530 def is_failed(self):
1531 return self._is_failed
1532
1533 @property
1534 def public_key(self):
1535 return self._ssh_pub_key
1536
1537 @property
1538 def private_key(self):
1539 return self._ssh_key_file
1540
1541 def resolve_placement_group_cloud_construct(self, input_group):
1542 """
1543 Returns the cloud specific construct for placement group
1544 """
1545 copy_dict = ['name', 'requirement', 'strategy']
1546
1547 for group_info in self._nsr_cfg_msg.nsd_placement_group_maps:
1548 if group_info.placement_group_ref == input_group.name:
1549 group = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1550 group_dict = {k:v for k,v in
1551 group_info.as_dict().items() if k != 'placement_group_ref'}
1552 for param in copy_dict:
1553 group_dict.update({param: getattr(input_group, param)})
1554 group.from_dict(group_dict)
1555 return group
1556 return None
1557
1558
1559 def __str__(self):
1560 return "NSR(name={}, nsd_id={}, data center={})".format(
1561 self.name, self.nsd_id, self._datacenter_name
1562 )
1563
1564 def _get_vnfd(self, vnfd_id, config_xact):
1565 """ Fetch vnfd msg for the passed vnfd id """
1566 return self._nsm.get_vnfd(vnfd_id, config_xact)
1567
1568 def _get_vnfd_datacenter(self, vnfd_member_index):
1569 """ Fetch datacenter for the passed vnfd id """
1570 if self._nsr_cfg_msg.vnf_datacenter_map:
1571 vim_accounts = [vnf.datacenter for vnf in self._nsr_cfg_msg.vnf_datacenter_map \
1572 if str(vnfd_member_index) == str(vnf.member_vnf_index_ref)]
1573 if vim_accounts and vim_accounts[0]:
1574 return vim_accounts[0]
1575 return self._datacenter_name
1576
1577 def _get_constituent_vnfd_msg(self, vnf_index):
1578 for const_vnfd in self.nsd_msg.constituent_vnfd:
1579 if const_vnfd.member_vnf_index == vnf_index:
1580 return const_vnfd
1581
1582 raise ValueError("Constituent VNF index %s not found" % vnf_index)
1583
1584 def record_event(self, evt, evt_desc, evt_details=None, state=None):
1585 """ Record an event """
1586 self._op_status.record_event(evt, evt_desc, evt_details)
1587 if state is not None:
1588 self.set_state(state)
1589
1590 def scaling_trigger_str(self, trigger):
1591 SCALING_TRIGGER_STRS = {
1592 NsdBaseYang.ScalingTrigger.PRE_SCALE_IN : 'pre-scale-in',
1593 NsdBaseYang.ScalingTrigger.POST_SCALE_IN : 'post-scale-in',
1594 NsdBaseYang.ScalingTrigger.PRE_SCALE_OUT : 'pre-scale-out',
1595 NsdBaseYang.ScalingTrigger.POST_SCALE_OUT : 'post-scale-out',
1596 }
1597 try:
1598 return SCALING_TRIGGER_STRS[trigger]
1599 except Exception as e:
1600 self._log.error("Scaling trigger mapping error for {} : {}".
1601 format(trigger, e))
1602 self._log.exception(e)
1603 return "Unknown trigger"
1604
1605 def generate_ssh_key_pair(self, config_xact):
1606 '''Generate a ssh key pair if required'''
1607 if self._ssh_key_file:
1608 self._log.debug("Key pair already generated")
1609 return
1610
1611 gen_key = False
1612 for cv in self.nsd_msg.constituent_vnfd:
1613 vnfd = self._get_vnfd(cv.vnfd_id_ref, config_xact)
1614 if vnfd and vnfd.mgmt_interface.ssh_key:
1615 gen_key = True
1616 break
1617
1618 if not gen_key:
1619 return
1620
1621 try:
1622 key = ManoSshKey(self._log)
1623 path = tempfile.mkdtemp()
1624 key.write_to_disk(name=self.id, directory=path)
1625 self._ssh_key_file = "file://{}".format(key.private_key_file)
1626 self._ssh_pub_key = key.public_key
1627 except Exception as e:
1628 self._log.exception("Error generating ssh key for {}: {}".
1629 format(self.nsr_cfg_msg.name, e))
1630
1631 @asyncio.coroutine
1632 def instantiate_vls(self):
1633 """
1634 This function instantiates VLs for every VL in this Network Service
1635 """
1636 self._log.debug("Instantiating %d VLs in NSD id %s", len(self._vlrs),
1637 self.id)
1638 for vlr_id, vlr in self._vlrs.items():
1639 yield from self.nsm_plugin.instantiate_vl(self, vlr)
1640
1641 if not isinstance(self.nsm_plugin, rwnsmplugin.RwNsPlugin):
1642 self._vls_ready.set()
1643
1644 # Wait for the VLs to be ready before yielding control out
1645 self._log.debug("Waitng for %d VLs in NSR id %s to be active",
1646 len(self._vlrs), self.id)
1647 if self._vlrs:
1648 self._log.debug("NSR id:%s, name:%s - Waiting for %d VLs to be ready",
1649 self.id, self.name, len(self._vlrs))
1650 yield from self._vls_ready.wait()
1651 else:
1652 self._log.debug("NSR id:%s, name:%s, No virtual links found",
1653 self.id, self.name)
1654 self._vls_ready.set()
1655
1656 self._log.info("All %d VLs in NSR id %s are active, start the VNFs",
1657 len(self._vlrs), self.id)
1658 @asyncio.coroutine
1659 def create(self, config_xact):
1660 """ Create this network service"""
1661 self._log.debug("Create NS {} for {}".format(self.name, self._project.name))
1662 # Create virtual links for all the external vnf
1663 # connection points in this NS
1664 yield from self.create_vls()
1665
1666 # Create VNFs in this network service
1667 yield from self.create_vnfs(config_xact)
1668
1669 # Create VNFFG for network service
1670 self.create_vnffgs()
1671
1672 # Create Scaling Groups for each scaling group in NSD
1673 self.create_scaling_groups()
1674
1675 # Create Parameter Pools
1676 self.create_param_pools()
1677
1678 @asyncio.coroutine
1679 def apply_scale_group_config_script(self, script, group, scale_instance, trigger, vnfrs=None):
1680 """ Apply config based on script for scale group """
1681 rift_var_root_dir = os.environ['RIFT_VAR_ROOT']
1682
1683 @asyncio.coroutine
1684 def add_vnfrs_data(vnfrs_list):
1685 """ Add as a dict each of the VNFRs data """
1686 vnfrs_data = []
1687
1688 for vnfr in vnfrs_list:
1689 self._log.debug("Add VNFR {} data".format(vnfr))
1690 vnfr_data = dict()
1691 vnfr_data['name'] = vnfr.name
1692 if trigger in [NsdBaseYang.ScalingTrigger.PRE_SCALE_IN,
1693 NsdBaseYang.ScalingTrigger.POST_SCALE_OUT]:
1694 # Get VNF management and other IPs, etc
1695 opdata = yield from self.fetch_vnfr(vnfr.xpath)
1696 self._log.debug("VNFR {} op data: {}".format(vnfr.name, opdata))
1697 try:
1698 vnfr_data['rw_mgmt_ip'] = opdata.mgmt_interface.ip_address
1699 vnfr_data['rw_mgmt_port'] = opdata.mgmt_interface.port
1700 vnfr_data['member_vnf_index_ref'] = opdata.member_vnf_index_ref
1701 vnfr_data['vdur_data'] = []
1702 for vdur in opdata.vdur:
1703 vdur_data = dict()
1704 vdur_data['vm_name'] = vdur.name
1705 vdur_data['vm_mgmt_ip'] = vdur.vm_management_ip
1706 vnfr_data['vdur_data'].append(vdur_data)
1707 except Exception as e:
1708 self._log.error("Unable to get management IP for vnfr {}:{}".
1709 format(vnfr.name, e))
1710
1711 try:
1712 vnfr_data['connection_points'] = []
1713 for cp in opdata.connection_point:
1714 con_pt = dict()
1715 con_pt['name'] = cp.name
1716 con_pt['ip_address'] = cp.ip_address
1717 vnfr_data['connection_points'].append(con_pt)
1718 except Exception as e:
1719 self._log.error("Exception getting connections points for VNFR {}: {}".
1720 format(vnfr.name, e))
1721
1722 vnfrs_data.append(vnfr_data)
1723 self._log.debug("VNFRs data: {}".format(vnfrs_data))
1724
1725 return vnfrs_data
1726
1727 def add_nsr_data(nsr):
1728 nsr_data = dict()
1729 nsr_data['name'] = nsr.name
1730 return nsr_data
1731
1732 if script is None or len(script) == 0:
1733 self._log.error("Script not provided for scale group config: {}".format(group.name))
1734 return False
1735
1736 if script[0] == '/':
1737 path = script
1738 else:
1739 path = os.path.join(rift_var_root_dir,
1740 'launchpad/packages/nsd',
1741 self._project.name,
1742 self.nsd_id, 'scripts',
1743 script)
1744
1745 if not os.path.exists(path):
1746 self._log.error("Config failed for scale group {}: Script does not exist at {}".
1747 format(group.name, path))
1748 return False
1749
1750 # Build a YAML file with all parameters for the script to execute
1751 # The data consists of 5 sections
1752 # 1. Trigger
1753 # 2. Scale group config
1754 # 3. VNFRs in the scale group
1755 # 4. VNFRs outside scale group
1756 # 5. NSR data
1757 data = dict()
1758 data['trigger'] = group.trigger_map(trigger)
1759 data['config'] = group.group_msg.as_dict()
1760
1761 if vnfrs:
1762 data["vnfrs_in_group"] = yield from add_vnfrs_data(vnfrs)
1763 else:
1764 data["vnfrs_in_group"] = yield from add_vnfrs_data(scale_instance.vnfrs)
1765
1766 data["vnfrs_others"] = yield from add_vnfrs_data(self.vnfrs.values())
1767 data["nsr"] = add_nsr_data(self)
1768
1769 tmp_file = None
1770 with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
1771 tmp_file.write(yaml.dump(data, default_flow_style=True)
1772 .encode("UTF-8"))
1773
1774 self._log.debug("Creating a temp file: {} with input data: {}".
1775 format(tmp_file.name, data))
1776
1777 cmd = "{} {}".format(path, tmp_file.name)
1778 self._log.debug("Running the CMD: {}".format(cmd))
1779 proc = yield from asyncio.create_subprocess_shell(cmd, loop=self._loop)
1780 rc = yield from proc.wait()
1781 if rc:
1782 self._log.error("The script {} for scale group {} config returned: {}".
1783 format(script, group.name, rc))
1784 return False
1785
1786 # Success
1787 return True
1788
1789
1790 @asyncio.coroutine
1791 def apply_scaling_group_config(self, trigger, group, scale_instance, vnfrs=None):
1792 """ Apply the config for the scaling group based on trigger """
1793 if group is None or scale_instance is None:
1794 return False
1795
1796 @asyncio.coroutine
1797 def update_config_status(success=True, err_msg=None):
1798 """ This is ugly!!!
1799 We are trying to determine the scaling instance's config status
1800 as a collation of the config status associated with 4 different triggers
1801 """
1802 self._log.debug("Update %s scaling config status to %r : %s",
1803 scale_instance, success, err_msg)
1804 if (scale_instance.config_status == "failed"):
1805 # Do not update the config status if it is already in failed state
1806 return
1807
1808 if scale_instance.config_status == "configured":
1809 # Update only to failed state an already configured scale instance
1810 if not success:
1811 scale_instance.config_status = "failed"
1812 scale_instance.config_err_msg = err_msg
1813 yield from self.update_state()
1814 else:
1815 # We are in configuring state
1816 # Only after post scale out mark instance as configured
1817 if trigger == NsdBaseYang.ScalingTrigger.POST_SCALE_OUT:
1818 if success:
1819 scale_instance.config_status = "configured"
1820 for vnfr in scale_instance.vnfrs:
1821 if vnfr.config_status == "configuring":
1822 vnfr.vnfr_msg.config_status = "configured"
1823 yield from vnfr.update_vnfm()
1824 else:
1825 scale_instance.config_status = "failed"
1826 scale_instance.config_err_msg = err_msg
1827
1828 yield from self.update_state()
1829 # Publish config state as update_state seems to care only operational status
1830 yield from self.publish()
1831
1832 config = group.trigger_config(trigger)
1833 if config is None:
1834 if trigger == NsdBaseYang.ScalingTrigger.POST_SCALE_OUT:
1835 self._log.debug("No config needed, update %s scaling config status to configured",
1836 scale_instance)
1837 scale_instance.config_status = "configured"
1838 return True
1839
1840 self._log.debug("Scaling group {} config: {}".format(group.name, config))
1841 if config.has_field("ns_service_primitive_name_ref"):
1842 config_name = config.ns_service_primitive_name_ref
1843 nsd_msg = self.nsd_msg
1844 config_primitive = None
1845 for ns_cfg_prim in nsd_msg.service_primitive:
1846 if ns_cfg_prim.name == config_name:
1847 config_primitive = ns_cfg_prim
1848 break
1849
1850 if config_primitive is None:
1851 raise ValueError("Could not find ns_cfg_prim %s in nsr %s" % (config_name, self.name))
1852
1853 self._log.debug("Scaling group {} config primitive: {}".format(group.name, config_primitive))
1854 if config_primitive.has_field("user_defined_script"):
1855 script_path = '/'.join(["launchpad/packages/nsd", self._project.name, nsd_msg.id, "scripts", config_primitive.user_defined_script])
1856 rc = yield from self.apply_scale_group_config_script(script_path,
1857 group, scale_instance, trigger, vnfrs)
1858 err_msg = None
1859 if not rc:
1860 err_msg = "Failed config for trigger {} using config script '{}'". \
1861 format(self.scaling_trigger_str(trigger),
1862 config_primitive.user_defined_script)
1863 yield from update_config_status(success=rc, err_msg=err_msg)
1864 return rc
1865 else:
1866 err_msg = "Failed config for trigger {} as config script is not specified". \
1867 format(self.scaling_trigger_str(trigger))
1868 yield from update_config_status(success=False, err_msg=err_msg)
1869 raise NotImplementedError("Only script based config support for scale group for now: {}".
1870 format(group.name))
1871 else:
1872 err_msg = "Failed config for trigger {} as config primitive is not specified".\
1873 format(self.scaling_trigger_str(trigger))
1874 yield from update_config_status(success=False, err_msg=err_msg)
1875 self._log.error("Config primitive not specified for config action in scale group %s" %
1876 (group.name))
1877 return False
1878
1879 def create_scaling_groups(self):
1880 """ This function creates a NSScalingGroup for every scaling
1881 group defined in he NSD"""
1882
1883 for scaling_group_msg in self.nsd_msg.scaling_group_descriptor:
1884 self._log.debug("Found scaling_group %s in nsr id %s",
1885 scaling_group_msg.name, self.id)
1886
1887 group_record = scale_group.ScalingGroup(
1888 self._log,
1889 scaling_group_msg
1890 )
1891
1892 self._scaling_groups[group_record.name] = group_record
1893
1894 @asyncio.coroutine
1895 def create_scale_group_instance(self, group_name, index, config_xact, is_default=False):
1896 group = self._scaling_groups[group_name]
1897 scale_instance = group.create_instance(index, is_default)
1898
1899 @asyncio.coroutine
1900 def create_vnfs():
1901 self._log.debug("Creating %u VNFs associated with NS id %s scaling group %s",
1902 len(self.nsd_msg.constituent_vnfd), self.id, self)
1903
1904 vnfrs = []
1905 for vnf_index, count in group.vnf_index_count_map.items():
1906 const_vnfd_msg = self._get_constituent_vnfd_msg(vnf_index)
1907 vnfd_msg = self._get_vnfd(const_vnfd_msg.vnfd_id_ref, config_xact)
1908
1909 datacenter_name = self._get_vnfd_datacenter(const_vnfd_msg.member_vnf_index)
1910 if datacenter_name is None:
1911 datacenter_name = self._datacenter_name
1912 for _ in range(count):
1913 vnfr = yield from self.create_vnf_record(vnfd_msg, const_vnfd_msg, datacenter_name, group_name, index)
1914 scale_instance.add_vnfr(vnfr)
1915 vnfrs.append(vnfr)
1916 return vnfrs
1917
1918 @asyncio.coroutine
1919 def instantiate_instance():
1920 self._log.debug("Creating %s VNFRS", scale_instance)
1921 vnfrs = yield from create_vnfs()
1922 yield from self.publish()
1923
1924 self._log.debug("Instantiating %s VNFRS for %s", len(vnfrs), scale_instance)
1925 scale_instance.operational_status = "vnf_init_phase"
1926 yield from self.update_state()
1927
1928 try:
1929 rc = yield from self.apply_scaling_group_config(NsdBaseYang.ScalingTrigger.PRE_SCALE_OUT,
1930 group, scale_instance, vnfrs)
1931 if not rc:
1932 self._log.error("Pre scale out config for scale group {} ({}) failed".
1933 format(group.name, index))
1934 scale_instance.operational_status = "failed"
1935 else:
1936 yield from self.instantiate_vnfs(vnfrs, scaleout=True)
1937
1938
1939 except Exception as e:
1940 self._log.exception("Failed to begin instantiatiation of vnfs for scale group {}: {}".
1941 format(group.name, e))
1942 self._log.exception(e)
1943 scale_instance.operational_status = "failed"
1944
1945 yield from self.update_state()
1946
1947 yield from instantiate_instance()
1948
1949 @asyncio.coroutine
1950 def delete_scale_group_instance(self, group_name, index):
1951 group = self._scaling_groups[group_name]
1952 scale_instance = group.get_instance(index)
1953 if scale_instance.is_default:
1954 raise ScalingOperationError("Cannot terminate a default scaling group instance")
1955
1956 scale_instance.operational_status = "terminate"
1957 yield from self.update_state()
1958
1959 @asyncio.coroutine
1960 def terminate_instance():
1961 self._log.debug("Terminating scaling instance %s VNFRS" % scale_instance)
1962 rc = yield from self.apply_scaling_group_config(NsdBaseYang.ScalingTrigger.PRE_SCALE_IN,
1963 group, scale_instance)
1964 if not rc:
1965 self._log.error("Pre scale in config for scale group {} ({}) failed".
1966 format(group.name, index))
1967
1968 # Going ahead with terminate, even if there is an error in pre-scale-in config
1969 # as this could be result of scale out failure and we need to cleanup this group
1970 yield from self.terminate_vnfrs(scale_instance.vnfrs, scalein=True)
1971 group.delete_instance(index)
1972
1973 scale_instance.operational_status = "vnf_terminate_phase"
1974 yield from self.update_state()
1975
1976 yield from terminate_instance()
1977
1978 @asyncio.coroutine
1979 def _update_scale_group_instances_status(self):
1980 @asyncio.coroutine
1981 def post_scale_out_task(group, instance):
1982 # Apply post scale out config once all VNFRs are active
1983 rc = yield from self.apply_scaling_group_config(NsdBaseYang.ScalingTrigger.POST_SCALE_OUT,
1984 group, instance)
1985 instance.operational_status = "running"
1986 if rc:
1987 self._log.debug("Scale out for group {} and instance {} succeeded".
1988 format(group.name, instance.instance_id))
1989 else:
1990 self._log.error("Post scale out config for scale group {} ({}) failed".
1991 format(group.name, instance.instance_id))
1992
1993 yield from self.update_state()
1994
1995 group_instances = {group: group.instances for group in self._scaling_groups.values()}
1996 for group, instances in group_instances.items():
1997 self._log.debug("Updating %s instance status", group)
1998 for instance in instances:
1999 instance_vnf_state_list = [vnfr.state for vnfr in instance.vnfrs]
2000 self._log.debug("Got vnfr instance states: %s", instance_vnf_state_list)
2001 if instance.operational_status == "vnf_init_phase":
2002 if all([state == VnfRecordState.ACTIVE for state in instance_vnf_state_list]):
2003 instance.operational_status = "running"
2004
2005 # Create a task for post scale out to allow us to sleep before attempting
2006 # to configure newly created VM's
2007 self._loop.create_task(post_scale_out_task(group, instance))
2008
2009 elif any([state == VnfRecordState.FAILED for state in instance_vnf_state_list]):
2010 self._log.debug("Scale out for group {} and instance {} failed".
2011 format(group.name, instance.instance_id))
2012 instance.operational_status = "failed"
2013
2014 elif instance.operational_status == "vnf_terminate_phase":
2015 if all([state == VnfRecordState.TERMINATED for state in instance_vnf_state_list]):
2016 instance.operational_status = "terminated"
2017 rc = yield from self.apply_scaling_group_config(NsdBaseYang.ScalingTrigger.POST_SCALE_IN,
2018 group, instance)
2019 if rc:
2020 self._log.debug("Scale in for group {} and instance {} succeeded".
2021 format(group.name, instance.instance_id))
2022 else:
2023 self._log.error("Post scale in config for scale group {} ({}) failed".
2024 format(group.name, instance.instance_id))
2025
2026 def create_vnffgs(self):
2027 """ This function creates VNFFGs for every VNFFG in the NSD
2028 associated with this NSR"""
2029
2030 for vnffgd in self.nsd_msg.vnffgd:
2031 self._log.debug("Found vnffgd %s in nsr id %s", vnffgd, self.id)
2032 vnffgr = VnffgRecord(self._dts,
2033 self._log,
2034 self._loop,
2035 self._nsm._vnffgmgr,
2036 self,
2037 self.name,
2038 vnffgd,
2039 self._sdn_account_name,
2040 self._datacenter_name
2041 )
2042 self._vnffgrs[vnffgr.id] = vnffgr
2043
2044 def resolve_vld_ip_profile(self, nsd_msg, vld):
2045 self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
2046 if not vld.has_field('ip_profile_ref'):
2047 return None
2048 profile = [profile for profile in nsd_msg.ip_profiles if profile.name == vld.ip_profile_ref]
2049 return profile[0] if profile else None
2050
2051 @asyncio.coroutine
2052 def _create_vls(self, vld, datacenter):
2053 """Create a VLR in the cloud account specified using the given VLD
2054
2055 Args:
2056 vld : VLD yang obj
2057 datacenter : Cloud account name
2058
2059 Returns:
2060 VirtualLinkRecord
2061 """
2062 vlr = yield from VirtualLinkRecord.create_record(
2063 self._dts,
2064 self._log,
2065 self._loop,
2066 self._project,
2067 self.name,
2068 vld,
2069 datacenter,
2070 self.resolve_vld_ip_profile(self.nsd_msg, vld),
2071 self.id,
2072 restart_mode=self.restart_mode)
2073
2074 return vlr
2075
2076 def _extract_datacenters_for_vl(self, vld):
2077 """
2078 Extracts the list of cloud accounts from the NS Config obj
2079
2080 Rules:
2081 1. Cloud accounts based connection point (vnf_datacenter_map)
2082 Args:
2083 vld : VLD yang object
2084
2085 Returns:
2086 TYPE: Description
2087 """
2088 datacenter_list = []
2089
2090 if self._nsr_cfg_msg.vnf_datacenter_map:
2091 # Handle case where datacenter is None
2092 vnf_datacenter_map = {}
2093 for vnf in self._nsr_cfg_msg.vnf_datacenter_map:
2094 if vnf.datacenter is not None or vnf.datacenter is not None:
2095 vnf_datacenter_map[vnf.member_vnf_index_ref] = \
2096 vnf.datacenter
2097
2098 for vnfc in vld.vnfd_connection_point_ref:
2099 datacenter = vnf_datacenter_map.get(
2100 vnfc.member_vnf_index_ref, self._datacenter_name)
2101
2102 datacenter_list.append(datacenter)
2103
2104 if self._nsr_cfg_msg.vl_datacenter_map:
2105 for vld_map in self._nsr_cfg_msg.vl_datacenter_map:
2106 if vld_map.vld_id_ref == vld.id:
2107 for datacenter in vld_map.datacenters:
2108 datacenter_list.append(datacenter)
2109
2110 # If no config has been provided then fall-back to the default
2111 # account
2112 if not datacenter_list:
2113 datacenter_list.append(self._datacenter_name)
2114
2115 self._log.debug("VL {} data center list: {}".
2116 format(vld.name, datacenter_list))
2117 return set(datacenter_list)
2118
2119 @asyncio.coroutine
2120 def create_vls(self):
2121 """ This function creates VLs for every VLD in the NSD
2122 associated with this NSR"""
2123 for vld in self.nsd_msg.vld:
2124
2125 self._log.debug("Found vld %s in nsr id %s", vld, self.id)
2126 datacenter_list = self._extract_datacenters_for_vl(vld)
2127 for datacenter in datacenter_list:
2128 vlr = yield from self._create_vls(vld, datacenter)
2129 self._vlrs[vlr.id] = vlr
2130 self._nsm.add_vlr_id_nsr_map(vlr.id, self)
2131
2132 @asyncio.coroutine
2133 def create_vl_instance(self, vld):
2134 self._log.error("Create VL for {}: {}".format(self.id, vld.as_dict()))
2135 # Check if the VL is already present
2136 vlr = None
2137 for vl_id, vl in self._vlrs.items():
2138 if vl.vld_msg.id == vld.id:
2139 self._log.error("The VLD %s already in NSR %s as VLR %s with status %s",
2140 vld.id, self.id, vl.id, vl.state)
2141 vlr = vl
2142 if vlr.state != VlRecordState.TERMINATED:
2143 err_msg = "VLR for VL {} in NSR {} already instantiated". \
2144 format(vld, self.id)
2145 self._log.error(err_msg)
2146 raise NsrVlUpdateError(err_msg)
2147 break
2148
2149 if vlr is None:
2150 datacenter_list = self._extract_datacenters_for_vl(vld)
2151 for datacenter in datacenter_list:
2152 vlr = yield from self._create_vls(vld, account, datacenter)
2153 self._vlrs[vlr.id] = vlr
2154 self._nsm.add_vlr_id_nsr_map(vlr.id, self)
2155
2156 vlr.state = VlRecordState.INSTANTIATION_PENDING
2157 yield from self.update_state()
2158
2159 try:
2160 yield from self.nsm_plugin.instantiate_vl(self, vlr)
2161
2162 except Exception as e:
2163 err_msg = "Error instantiating VL for NSR {} and VLD {}: {}". \
2164 format(self.id, vld.id, e)
2165 self._log.error(err_msg)
2166 self._log.exception(e)
2167 vlr.state = VlRecordState.FAILED
2168
2169 yield from self.update_state()
2170
2171 @asyncio.coroutine
2172 def delete_vl_instance(self, vld):
2173 for vlr_id, vlr in self._vlrs.items():
2174 if vlr.vld_msg.id == vld.id:
2175 self._log.debug("Found VLR %s for VLD %s in NSR %s",
2176 vlr.id, vld.id, self.id)
2177 vlr.state = VlRecordState.TERMINATE_PENDING
2178 yield from self.update_state()
2179
2180 try:
2181 yield from self.nsm_plugin.terminate_vl(vlr)
2182 vlr.state = VlRecordState.TERMINATED
2183 del self._vlrs[vlr]
2184 self.remove_vlr_id_nsr_map(vlr.id)
2185
2186 except Exception as e:
2187 err_msg = "Error terminating VL for NSR {} and VLD {}: {}". \
2188 format(self.id, vld.id, e)
2189 self._log.error(err_msg)
2190 self._log.exception(e)
2191 vlr.state = VlRecordState.FAILED
2192
2193 yield from self.update_state()
2194 break
2195
2196 @asyncio.coroutine
2197 def create_vnfs(self, config_xact):
2198 """
2199 This function creates VNFs for every VNF in the NSD
2200 associated with this NSR
2201 """
2202 self._log.debug("Creating %u VNFs associated with this NS id %s",
2203 len(self.nsd_msg.constituent_vnfd), self.id)
2204
2205 for const_vnfd in self.nsd_msg.constituent_vnfd:
2206 if not const_vnfd.start_by_default:
2207 self._log.debug("start_by_default set to False in constituent VNF (%s). Skipping start.",
2208 const_vnfd.member_vnf_index)
2209 continue
2210
2211 vnfd_msg = self._get_vnfd(const_vnfd.vnfd_id_ref, config_xact)
2212 datacenter_name = self._get_vnfd_datacenter(const_vnfd.member_vnf_index)
2213 if datacenter_name is None:
2214 datacenter_name = self._datacenter_name
2215 yield from self.create_vnf_record(vnfd_msg, const_vnfd, datacenter_name)
2216
2217 def get_placement_groups(self, vnfd_msg, const_vnfd):
2218 placement_groups = []
2219 for group in self.nsd_msg.placement_groups:
2220 for member_vnfd in group.member_vnfd:
2221 if (member_vnfd.vnfd_id_ref == vnfd_msg.id) and \
2222 (member_vnfd.member_vnf_index_ref == str(const_vnfd.member_vnf_index)):
2223 group_info = self.resolve_placement_group_cloud_construct(group)
2224 if group_info is None:
2225 self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
2226 ### raise PlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
2227 else:
2228 self._log.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)",
2229 str(group_info),
2230 vnfd_msg.name,
2231 const_vnfd.member_vnf_index)
2232 placement_groups.append(group_info)
2233 return placement_groups
2234
2235 def get_cloud_config(self):
2236 cloud_config = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_CloudConfig()
2237 self._log.debug("Received key pair is {}".format(self._key_pairs))
2238
2239 for authorized_key in self.nsr_cfg_msg.ssh_authorized_key:
2240 if authorized_key.key_pair_ref in self._key_pairs:
2241 key_pair = cloud_config.key_pair.add()
2242 key_pair.from_dict(self._key_pairs[authorized_key.key_pair_ref].as_dict())
2243 for nsd_key_pair in self.nsd_msg.key_pair:
2244 key_pair = cloud_config.key_pair.add()
2245 key_pair.from_dict(key_pair.as_dict())
2246 for nsr_cfg_user in self.nsr_cfg_msg.user:
2247 user = cloud_config.user.add()
2248 user.name = nsr_cfg_user.name
2249 user.user_info = nsr_cfg_user.user_info
2250 for ssh_key in nsr_cfg_user.ssh_authorized_key:
2251 if ssh_key.key_pair_ref in self._key_pairs:
2252 key_pair = user.key_pair.add()
2253 key_pair.from_dict(self._key_pairs[ssh_key.key_pair_ref].as_dict())
2254 for nsd_user in self.nsd_msg.user:
2255 user = cloud_config.user.add()
2256 user.from_dict(nsd_user.as_dict())
2257
2258 self._log.debug("Formed cloud-config msg is {}".format(cloud_config))
2259 return cloud_config
2260
2261 @asyncio.coroutine
2262 def create_vnf_record(self, vnfd_msg, const_vnfd, datacenter_name, group_name=None, group_instance_id=None):
2263 # Fetch the VNFD associated with this VNF
2264 placement_groups = self.get_placement_groups(vnfd_msg, const_vnfd)
2265 cloud_config = self.get_cloud_config()
2266 self._log.info("Cloud Account for VNF %d is %s",const_vnfd.member_vnf_index,datacenter_name)
2267 self._log.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s",
2268 vnfd_msg.name,
2269 const_vnfd.member_vnf_index,
2270 [ group.name for group in placement_groups])
2271
2272 vnfr = yield from VirtualNetworkFunctionRecord.create_record(self._dts,
2273 self._log,
2274 self._loop,
2275 self._project,
2276 vnfd_msg,
2277 self._nsr_cfg_msg,
2278 const_vnfd,
2279 self.nsd_id,
2280 self.name,
2281 datacenter_name,
2282 self.id,
2283 group_name,
2284 group_instance_id,
2285 placement_groups,
2286 cloud_config,
2287 restart_mode=self.restart_mode,
2288 )
2289 if vnfr.id in self._vnfrs:
2290 err = "VNF with VNFR id %s already in vnf list" % (vnfr.id,)
2291 raise NetworkServiceRecordError(err)
2292
2293 self._vnfrs[vnfr.id] = vnfr
2294 self._nsm.vnfrs[vnfr.id] = vnfr
2295
2296 yield from vnfr.set_config_status(NsrYang.ConfigStates.INIT)
2297
2298 self._log.debug("Added VNFR %s to NSM VNFR list with id %s",
2299 vnfr.name,
2300 vnfr.id)
2301
2302 return vnfr
2303
2304 def create_param_pools(self):
2305 for param_pool in self.nsd_msg.parameter_pool:
2306 self._log.debug("Found parameter pool %s in nsr id %s", param_pool, self.id)
2307
2308 start_value = param_pool.range.start_value
2309 end_value = param_pool.range.end_value
2310 if end_value < start_value:
2311 raise NetworkServiceRecordError(
2312 "Parameter pool %s has invalid range (start: {}, end: {})".format(
2313 start_value, end_value
2314 )
2315 )
2316
2317 self._param_pools[param_pool.name] = config_value_pool.ParameterValuePool(
2318 self._log,
2319 param_pool.name,
2320 range(start_value, end_value)
2321 )
2322
2323 @asyncio.coroutine
2324 def fetch_vnfr(self, vnfr_path):
2325 """ Fetch VNFR record """
2326 vnfr = None
2327 self._log.debug("Fetching VNFR with key %s while instantiating %s",
2328 vnfr_path, self.id)
2329 res_iter = yield from self._dts.query_read(vnfr_path, rwdts.XactFlag.MERGE)
2330
2331 for ent in res_iter:
2332 res = yield from ent
2333 vnfr = res.result
2334
2335 return vnfr
2336
2337 @asyncio.coroutine
2338 def instantiate_vnfs(self, vnfrs, scaleout=False):
2339 """
2340 This function instantiates VNFs for every VNF in this Network Service
2341 """
2342 @asyncio.coroutine
2343 def instantiate_vnf(vnf):
2344 self._log.debug("Instantiating VNF: %s in NS %s", vnf, self.id)
2345 vnfd_id = vnf.vnfr_msg.vnfd.id
2346 for dependency_vnf in dependencies[vnfd_id]:
2347 while dependency_vnf not in self.instantiated:
2348 yield from asyncio.sleep(1, loop=self._loop)
2349
2350 yield from self.nsm_plugin.instantiate_vnf(self, vnf,scaleout)
2351 self.instantiated.add(vnfd_id)
2352
2353 self._log.debug("Instantiating %u VNFs in NS %s", len(vnfrs), self.id)
2354 dependencies = collections.defaultdict(list)
2355 for dependency_vnf in self._nsr_cfg_msg.nsd.vnf_dependency:
2356 dependencies[dependency_vnf.vnf_source_ref].append(dependency_vnf.vnf_depends_on_ref)
2357
2358 # The dictionary copy is to ensure that if a terminate is initiated right after instantiation, the
2359 # Runtime error for "dictionary changed size during iteration" does not occur.
2360 # vnfrs - 'dict_values' object
2361 # vnfrs_copy - list object
2362 vnfrs_copy = list(vnfrs)
2363 tasks = []
2364 for vnf in vnfrs_copy:
2365 vnf_task = self._loop.create_task(instantiate_vnf(vnf))
2366 tasks.append(vnf_task)
2367
2368 if len(tasks) > 0:
2369 self._log.debug("Waiting for %s instantiate_vnf tasks to complete", len(tasks))
2370 done, pending = yield from asyncio.wait(tasks, loop=self._loop, timeout=30)
2371 if pending:
2372 self._log.error("The Instantiate vnf task timed out after 30 seconds.")
2373 raise VirtualNetworkFunctionRecordError("Task tied out : ", pending)
2374
2375 @asyncio.coroutine
2376 def instantiate_vnffgs(self):
2377 """
2378 This function instantiates VNFFGs for every VNFFG in this Network Service
2379 """
2380 self._log.debug("Instantiating %u VNFFGs in NS %s",
2381 len(self.nsd_msg.vnffgd), self.id)
2382 for _, vnfr in self.vnfrs.items():
2383 while vnfr.state in [VnfRecordState.INSTANTIATION_PENDING, VnfRecordState.INIT]:
2384 self._log.debug("Received vnfr state for vnfr %s is %s; retrying",vnfr.name,vnfr.state)
2385 yield from asyncio.sleep(2, loop=self._loop)
2386 if vnfr.state == VnfRecordState.ACTIVE:
2387 self._log.debug("Received vnfr state for vnfr %s is %s ",vnfr.name,vnfr.state)
2388 continue
2389 else:
2390 self._log.debug("Received vnfr state for vnfr %s is %s; failing vnffg creation",vnfr.name,vnfr.state)
2391 self._vnffgr_state = VnffgRecordState.FAILED
2392 return
2393
2394 self._log.info("Waiting for 90 seconds for VMs to come up")
2395 yield from asyncio.sleep(90, loop=self._loop)
2396 self._log.info("Starting VNFFG orchestration")
2397 for vnffg in self._vnffgrs.values():
2398 self._log.debug("Instantiating VNFFG: %s in NS %s", vnffg, self.id)
2399 yield from vnffg.instantiate()
2400
2401 @asyncio.coroutine
2402 def instantiate_scaling_instances(self, config_xact):
2403 """ Instantiate any default scaling instances in this Network Service """
2404 for group in self._scaling_groups.values():
2405 for i in range(group.min_instance_count):
2406 self._log.debug("Instantiating %s default scaling instance %s", group, i)
2407 yield from self.create_scale_group_instance(
2408 group.name, i, config_xact, is_default=True
2409 )
2410
2411 for group_msg in self._nsr_cfg_msg.scaling_group:
2412 if group_msg.scaling_group_name_ref != group.name:
2413 continue
2414
2415 for instance in group_msg.instance:
2416 self._log.debug("Reloading %s scaling instance %s", group_msg, instance.id)
2417 yield from self.create_scale_group_instance(
2418 group.name, instance.id, config_xact, is_default=False
2419 )
2420
2421 def has_scaling_instances(self):
2422 """ Return boolean indicating if the network service has default scaling groups """
2423 for group in self._scaling_groups.values():
2424 if group.min_instance_count > 0:
2425 return True
2426
2427 for group_msg in self._nsr_cfg_msg.scaling_group:
2428 if len(group_msg.instance) > 0:
2429 return True
2430
2431 return False
2432
2433 @asyncio.coroutine
2434 def publish(self):
2435 """ This function publishes this NSR """
2436
2437 self._nsr_msg = self.create_msg()
2438
2439 self._log.debug("Publishing the NSR with xpath %s and nsr %s",
2440 self.nsr_xpath,
2441 self._nsr_msg)
2442
2443 if self._debug_running:
2444 self._log.debug("Publishing NSR in RUNNING state!")
2445 #raise()
2446
2447 yield from self._nsm.nsr_handler.update(None, self.nsr_xpath, self._nsr_msg)
2448 if self._op_status.state == NetworkServiceRecordState.RUNNING:
2449 self._debug_running = True
2450
2451 @asyncio.coroutine
2452 def unpublish(self, xact=None):
2453 """ Unpublish this NSR object """
2454 self._log.debug("Unpublishing Network service id %s", self.id)
2455
2456 yield from self._nsm.nsr_handler.delete(xact, self.nsr_xpath)
2457
2458 @property
2459 def nsr_xpath(self):
2460 """ Returns the xpath associated with this NSR """
2461 return self._project.add_project((
2462 "D,/nsr:ns-instance-opdata" +
2463 "/nsr:nsr[nsr:ns-instance-config-ref={}]"
2464 ).format(quoted_key(self.id)))
2465
2466 @staticmethod
2467 def xpath_from_nsr(nsr):
2468 """ Returns the xpath associated with this NSR op data"""
2469 return (NetworkServiceRecord.XPATH +
2470 "[nsr:ns-instance-config-ref={}]").format(quoted_key(nsr.id))
2471
2472 @property
2473 def nsd_xpath(self):
2474 """ Return NSD config xpath."""
2475 return self._project.add_project((
2476 "C,/project-nsd:nsd-catalog/project-nsd:nsd[project-nsd:id={}]"
2477 ).format(quoted_key(self.nsd_id)))
2478
2479 @asyncio.coroutine
2480 def instantiate(self, config_xact):
2481 """"Instantiates a NetworkServiceRecord.
2482
2483 This function instantiates a Network service
2484 which involves the following steps,
2485
2486 * Instantiate every VL in NSD by sending create VLR request to DTS.
2487 * Instantiate every VNF in NSD by sending create VNF reuqest to DTS.
2488 * Publish the NSR details to DTS
2489
2490 Arguments:
2491 nsr: The NSR configuration request containing nsr-id and nsd
2492 config_xact: The configuration transaction which initiated the instatiation
2493
2494 Raises:
2495 NetworkServiceRecordError if the NSR creation fails
2496
2497 Returns:
2498 No return value
2499 """
2500
2501 self._log.debug("Instantiating NS - %s xact - %s", self, config_xact)
2502
2503 # Move the state to INIITALIZING
2504 self.set_state(NetworkServiceRecordState.INIT)
2505
2506 event_descr = "Instantiation Request Received NSR Id: %s, NS Name: %s" % (self.id, self.name)
2507 self.record_event("instantiating", event_descr)
2508
2509 # Find the NSD
2510 self._nsd = self._nsr_cfg_msg.nsd
2511
2512 # Merge any config and initial config primitive values
2513 self.config_store.merge_nsd_config(self.nsd_msg, self._project.name)
2514 self._log.debug("Merged NSD: {}".format(self.nsd_msg.as_dict()))
2515
2516 event_descr = "Fetched NSD with descriptor id %s, NS Name: %s" % (self.nsd_id, self.name)
2517 self.record_event("nsd-fetched", event_descr)
2518
2519 if self._nsd is None:
2520 msg = "Failed to fetch NSD with nsd-id [%s] for nsr-id %s"
2521 self._log.debug(msg, self.nsd_id, self.id)
2522 raise NetworkServiceRecordError(self)
2523
2524 self._log.debug("Got nsd result %s", self._nsd)
2525
2526 # Substitute any input parameters
2527 self.substitute_input_parameters(self._nsd, self._nsr_cfg_msg)
2528
2529 # Create the record
2530 yield from self.create(config_xact)
2531
2532 # Publish the NSR to DTS
2533 yield from self.publish()
2534
2535 @asyncio.coroutine
2536 def do_instantiate():
2537 """
2538 Instantiate network service
2539 """
2540 self._log.debug("Instantiating VLs nsr id [%s] nsd id [%s]",
2541 self.id, self.nsd_id)
2542
2543 # instantiate the VLs
2544 event_descr = ("Instantiating %s external VLs for NSR id: %s, NS Name: %s " %
2545 (len(self.nsd_msg.vld), self.id, self.name))
2546 self.record_event("begin-external-vls-instantiation", event_descr)
2547
2548 self.set_state(NetworkServiceRecordState.VL_INIT_PHASE)
2549
2550 # Publish the NSR to DTS
2551 yield from self.publish()
2552
2553 if self._ns_terminate_received:
2554 self._log.debug("Terminate Received. Interrupting Instantiation at event : begin-external-vls-instantiation.")
2555 # Setting this flag as False again as this is a state where neither VL or VNF have been instantiated.
2556 self._ns_terminate_received = False
2557 # At this stage only ns-instance opdata is published. Cleaning up the record.
2558 yield from self.unpublish()
2559 return
2560
2561 yield from self.instantiate_vls()
2562
2563 event_descr = ("Finished instantiating %s external VLs for NSR id: %s, NS Name: %s " %
2564 (len(self.nsd_msg.vld), self.id, self.name))
2565 self.record_event("end-external-vls-instantiation", event_descr)
2566
2567 self.set_state(NetworkServiceRecordState.VNF_INIT_PHASE)
2568
2569 # Publish the NSR to DTS
2570 yield from self.publish()
2571
2572 self._log.debug("Instantiating VNFs ...... nsr[%s], nsd[%s]",
2573 self.id, self.nsd_id)
2574
2575 # instantiate the VNFs
2576 event_descr = ("Instantiating %s VNFS for NSR id: %s, NS Name: %s " %
2577 (len(self.nsd_msg.constituent_vnfd), self.id, self.name))
2578
2579 self.record_event("begin-vnf-instantiation", event_descr)
2580
2581 if self._ns_terminate_received:
2582 self._log.debug("Terminate Received. Interrupting Instantiation at event : end-external-vls-instantiation.")
2583 return
2584
2585 yield from self.instantiate_vnfs(self._vnfrs.values())
2586
2587 self._log.debug(" Finished instantiating %d VNFs for NSR id: %s, NS Name: %s",
2588 len(self.nsd_msg.constituent_vnfd), self.id, self.name)
2589
2590 event_descr = ("Finished instantiating %s VNFs for NSR id: %s, NS Name: %s" %
2591 (len(self.nsd_msg.constituent_vnfd), self.id, self.name))
2592 self.record_event("end-vnf-instantiation", event_descr)
2593
2594 # Publish the NSR to DTS
2595 yield from self.publish()
2596
2597 if len(self.vnffgrs) > 0:
2598 #self.set_state(NetworkServiceRecordState.VNFFG_INIT_PHASE)
2599 event_descr = ("Instantiating %s VNFFGS for NSR id: %s, NS Name: %s" %
2600 (len(self.nsd_msg.vnffgd), self.id, self.name))
2601
2602 self.record_event("begin-vnffg-instantiation", event_descr)
2603
2604 if self._ns_terminate_received:
2605 self._log.debug("Terminate Received. Interrupting Instantiation at event : begin-vnffg-instantiation.")
2606 return
2607
2608 yield from self.instantiate_vnffgs()
2609
2610 event_descr = ("Finished instantiating %s VNFFGDs for NSR id: %s, NS Name: %s" %
2611 (len(self.nsd_msg.vnffgd), self.id, self.name))
2612 self.record_event("end-vnffg-instantiation", event_descr)
2613
2614 if self.has_scaling_instances():
2615 event_descr = ("Instantiating %s Scaling Groups for NSR id: %s, NS Name: %s" %
2616 (len(self._scaling_groups), self.id, self.name))
2617
2618 self.record_event("begin-scaling-group-instantiation", event_descr)
2619
2620 if self._ns_terminate_received:
2621 self._log.debug("Terminate Received. Interrupting Instantiation at event : begin-scaling-group-instantiation.")
2622 return
2623
2624 yield from self.instantiate_scaling_instances(config_xact)
2625 self.record_event("end-scaling-group-instantiation", event_descr)
2626
2627 # Give the plugin a chance to deploy the network service now that all
2628 # virtual links and vnfs are instantiated
2629 yield from self.nsm_plugin.deploy(self._nsr_msg)
2630
2631 self._log.debug("Publishing NSR...... nsr[%s], nsd[%s], for NS[%s]",
2632 self.id, self.nsd_id, self.name)
2633
2634 # Publish the NSR to DTS
2635 yield from self.publish()
2636
2637 self._log.debug("Published NSR...... nsr[%s], nsd[%s], for NS[%s]",
2638 self.id, self.nsd_id, self.name)
2639
2640 def on_instantiate_done(fut):
2641 # If the do_instantiate fails, then publish NSR with failed result
2642 e = fut.exception()
2643 if e is not None:
2644 import traceback, sys
2645 print(traceback.format_exception(None,e, e.__traceback__), file=sys.stderr, flush=True)
2646 self._log.error("NSR instantiation failed for NSR id %s: %s", self.id, str(e))
2647 self._loop.create_task(self.instantiation_failed(failed_reason=str(e)))
2648
2649 instantiate_task = self._loop.create_task(do_instantiate())
2650 instantiate_task.add_done_callback(on_instantiate_done)
2651
2652 @asyncio.coroutine
2653 def set_config_status(self, status, status_details=None):
2654 if self.config_status != status:
2655 self._log.debug("Updating NSR {} status for {} to {}".
2656 format(self.name, self.config_status, status))
2657 self._config_status = status
2658 self._config_status_details = status_details
2659
2660 if self._config_status == NsrYang.ConfigStates.FAILED:
2661 self.record_event("config-failed", "NS configuration failed",
2662 evt_details=self._config_status_details)
2663
2664 yield from self.publish()
2665
2666 if status == NsrYang.ConfigStates.TERMINATE:
2667 yield from self.terminate_ns_cont()
2668
2669 @asyncio.coroutine
2670 def is_active(self):
2671 """ This NS is active """
2672 self.set_state(NetworkServiceRecordState.RUNNING)
2673 if self._is_active:
2674 return
2675
2676 # Publish the NSR to DTS
2677 self._log.debug("Network service %s is active ", self.id)
2678 self._is_active = True
2679
2680 event_descr = "NSR in running state for NSR id: %s, NS Name: %s" % (self.id, self.name)
2681 self.record_event("ns-running", event_descr)
2682
2683 yield from self.publish()
2684
2685 @asyncio.coroutine
2686 def instantiation_failed(self, failed_reason=None):
2687 """ The NS instantiation failed"""
2688 self._log.error("Network service id:%s, name:%s instantiation failed",
2689 self.id, self.name)
2690 self.set_state(NetworkServiceRecordState.FAILED)
2691 self._is_failed = True
2692
2693 event_descr = "Instantiation of NS %s - %s failed" % (self.id, self.name)
2694 self.record_event("ns-failed", event_descr, evt_details=failed_reason)
2695
2696 # Publish the NSR to DTS
2697 yield from self.publish()
2698
2699 @asyncio.coroutine
2700 def terminate_vnfrs(self, vnfrs, scalein=False):
2701 """ Terminate VNFRS in this network service """
2702 self._log.debug("Terminating VNFs in network service %s - %s", self.id, self.name)
2703 vnfr_ids = []
2704 scaleIn = scalein
2705 for vnfr in list(vnfrs):
2706 self._log.debug("Terminating VNFs in network service %s %s", vnfr.id, self.id)
2707 # The below check is added for determining which of the VNFRS are scaling ones
2708 # under OPENMANO. Need to pass scalein True when terminate received to OPENAMNO
2709 # Plugin.
2710 if isinstance(self.nsm_plugin, openmano_nsm.OpenmanoNsPlugin):
2711 for scaling_group in self._scaling_groups.values():
2712 scaling_instances = scaling_group.create_record_msg().instance
2713 for sc in scaling_instances:
2714 if vnfr.id in sc.vnfrs:
2715 scaleIn = True
2716 self._log.debug("Found a Scaling VNF for Openmano during Terminate")
2717
2718 yield from self.nsm_plugin.terminate_vnf(self, vnfr, scalein=scaleIn)
2719 scaleIn = scalein
2720 vnfr_ids.append(vnfr.id)
2721
2722 for vnfr_id in vnfr_ids:
2723 self._vnfrs.pop(vnfr_id, None)
2724
2725 @asyncio.coroutine
2726 def terminate(self):
2727 """Start terminate of a NetworkServiceRecord."""
2728 # Move the state to TERMINATE
2729 self.set_state(NetworkServiceRecordState.TERMINATE)
2730 event_descr = "Terminate being processed for NS Id: %s, NS Name: %s" % (self.id, self.name)
2731 self.record_event("terminate", event_descr)
2732 self._log.debug("Terminating network service id: %s, NS Name: %s", self.id, self.name)
2733
2734 # Adding the NSR ID on terminate Evet. This will be checked to halt the instantiation if not already finished.
2735 self._ns_terminate_received = True
2736
2737 yield from self.publish()
2738
2739 if self._is_failed:
2740 # IN case the instantiation failed, then trigger a cleanup immediately
2741 # don't wait for Cfg manager, as it will have no idea of this NSR.
2742 # Due to the failure
2743 yield from self.terminate_ns_cont()
2744
2745
2746 @asyncio.coroutine
2747 def terminate_ns_cont(self):
2748 """Config script related to terminate finished, continue termination"""
2749 def terminate_vnffgrs():
2750 """ Terminate VNFFGRS in this network service """
2751 self._log.debug("Terminating VNFFGRs in network service %s - %s", self.id, self.name)
2752 for vnffgr in self.vnffgrs.values():
2753 yield from vnffgr.terminate()
2754
2755 def terminate_vlrs():
2756 """ Terminate VLRs in this netork service """
2757 self._log.debug("Terminating VLs in network service %s - %s", self.id, self.name)
2758 for vlr_id, vlr in self.vlrs.items():
2759 yield from self.nsm_plugin.terminate_vl(vlr)
2760 vlr.state = VlRecordState.TERMINATED
2761
2762 # Move the state to VNF_TERMINATE_PHASE
2763 self._log.debug("Terminating VNFFGs in NS ID: %s, NS Name: %s", self.id, self.name)
2764 self.set_state(NetworkServiceRecordState.VNFFG_TERMINATE_PHASE)
2765 event_descr = "Terminating VNFFGS in NS Id: %s, NS Name: %s" % (self.id, self.name)
2766 self.record_event("terminating-vnffgss", event_descr)
2767 yield from terminate_vnffgrs()
2768
2769 # Move the state to VNF_TERMINATE_PHASE
2770 self.set_state(NetworkServiceRecordState.VNF_TERMINATE_PHASE)
2771 event_descr = "Terminating VNFS in NS Id: %s, NS Name: %s" % (self.id, self.name)
2772 self.record_event("terminating-vnfs", event_descr)
2773 yield from self.terminate_vnfrs(self.vnfrs.values())
2774
2775 # Move the state to VL_TERMINATE_PHASE
2776 self.set_state(NetworkServiceRecordState.VL_TERMINATE_PHASE)
2777 event_descr = "Terminating VLs in NS Id: %s, NS Name: %s" % (self.id, self.name)
2778 self.record_event("terminating-vls", event_descr)
2779 yield from terminate_vlrs()
2780 yield from self.nsm_plugin.terminate_ns(self)
2781 # Remove the generated SSH key
2782 if self._ssh_key_file:
2783 p = urlparse(self._ssh_key_file)
2784 if p[0] == 'file':
2785 path = os.path.dirname(p[2])
2786 self._log.debug("NSR {}: Removing keys in {}".format(self.name,
2787 path))
2788 shutil.rmtree(path, ignore_errors=True)
2789
2790 # Move the state to TERMINATED
2791 self.set_state(NetworkServiceRecordState.TERMINATED)
2792 event_descr = "Terminated NS Id: %s, NS Name: %s" % (self.id, self.name)
2793 self.record_event("terminated", event_descr)
2794
2795 # Unpublish the NSR record
2796 self._log.debug("Unpublishing the network service %s - %s", self.id, self.name)
2797 yield from self.unpublish()
2798
2799 # Finaly delete the NS instance from this NS Manager
2800 self._log.debug("Deleting the network service %s - %s", self.id, self.name)
2801 self.nsm.delete_nsr(self.id)
2802
2803 def enable(self):
2804 """"Enable a NetworkServiceRecord."""
2805 pass
2806
2807 def disable(self):
2808 """"Disable a NetworkServiceRecord."""
2809 pass
2810
2811 def map_config_status(self):
2812 self._log.debug("Config status for ns {} is {}".
2813 format(self.name, self._config_status))
2814 if self._config_status == NsrYang.ConfigStates.CONFIGURING:
2815 return 'configuring'
2816 if self._config_status == NsrYang.ConfigStates.FAILED:
2817 return 'failed'
2818 return 'configured'
2819
2820 def vl_phase_completed(self):
2821 """ Are VLs created in this NS?"""
2822 return self._vl_phase_completed
2823
2824 def vnf_phase_completed(self):
2825 """ Are VLs created in this NS?"""
2826 return self._vnf_phase_completed
2827
2828 def create_msg(self):
2829 """ The network serice record as a message """
2830 nsr_dict = {"ns_instance_config_ref": self.id}
2831 nsr = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.from_dict(nsr_dict)
2832 #nsr.datacenter = self.cloud_account_name
2833 nsr.sdn_account = self._sdn_account_name
2834 nsr.name_ref = self.name
2835 nsr.nsd_ref = self.nsd_id
2836 nsr.nsd_name_ref = self.nsd_msg.name
2837 nsr.operational_events = self._op_status.msg
2838 nsr.operational_status = self._op_status.yang_str()
2839 nsr.config_status = self.map_config_status()
2840 nsr.config_status_details = self._config_status_details
2841 nsr.create_time = self._create_time
2842 nsr.uptime = int(time.time()) - self._create_time
2843
2844 # Added for OpenMano
2845
2846 nsr.orchestration_progress.networks.total = len(self.nsd_msg.vld)
2847 if isinstance(self.nsm_plugin, openmano_nsm.OpenmanoNsPlugin):
2848 # Taking the last update by OpenMano
2849 nsr.orchestration_progress.networks.active = self.nsm_plugin._openmano_nsrs[self.id]._active_nets
2850 else:
2851 nsr.orchestration_progress.networks.active = self._active_networks
2852 no_of_vdus = 0
2853 for vnfr_id, vnfr in self._vnfrs.items():
2854 no_of_vdus += len(vnfr.vnfd.vdu)
2855
2856 nsr.orchestration_progress.vms.total = no_of_vdus
2857 if isinstance(self.nsm_plugin, openmano_nsm.OpenmanoNsPlugin):
2858 # Taking the last update by OpenMano
2859 nsr.orchestration_progress.vms.active = self.nsm_plugin._openmano_nsrs[self.id]._active_vms
2860 else:
2861 nsr.orchestration_progress.vms.active = self._active_vms
2862
2863 # Generated SSH key
2864 if self._ssh_pub_key:
2865 nsr.ssh_key_generated.private_key_file = self._ssh_key_file
2866 nsr.ssh_key_generated.public_key = self._ssh_pub_key
2867
2868 for cfg_prim in self.nsd_msg.service_primitive:
2869 cfg_prim = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict(
2870 cfg_prim.as_dict())
2871 nsr.service_primitive.append(cfg_prim)
2872
2873 for init_cfg in self.nsd_msg.initial_service_primitive:
2874 prim = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_InitialServicePrimitive.from_dict(
2875 init_cfg.as_dict())
2876 nsr.initial_service_primitive.append(prim)
2877
2878 for term_cfg in self.nsd_msg.terminate_service_primitive:
2879 prim = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_TerminateServicePrimitive.from_dict(
2880 term_cfg.as_dict())
2881 nsr.terminate_service_primitive.append(prim)
2882
2883 if self.vl_phase_completed():
2884 for vlr_id, vlr in self.vlrs.items():
2885 nsr.vlr.append(vlr.create_nsr_vlr_msg(self.vnfrs.values()))
2886
2887 if self.vnf_phase_completed():
2888 for vnfr_id in self.vnfrs:
2889 nsr.constituent_vnfr_ref.append(self.vnfrs[vnfr_id].const_vnfr_msg)
2890 for vnffgr in self.vnffgrs.values():
2891 nsr.vnffgr.append(vnffgr.fetch_vnffgr())
2892 for scaling_group in self._scaling_groups.values():
2893 nsr.scaling_group_record.append(scaling_group.create_record_msg())
2894
2895 return nsr
2896
2897 def all_vnfs_active(self):
2898 """ Are all VNFS in this NS active? """
2899 for _, vnfr in self.vnfrs.items():
2900 if vnfr.active is not True:
2901 return False
2902 return True
2903
2904 @asyncio.coroutine
2905 def update_state(self):
2906 """ Re-evaluate this NS's state """
2907 curr_state = self._op_status.state
2908
2909 # This means that the terminate has been fired before the NS was UP.
2910 if self._ns_terminate_received:
2911 # Resetting this flag so that terminate ns is not called via subsequent DTS Handlers after the intial call.
2912 self._ns_terminate_received = False
2913 yield from self.terminate_ns_cont()
2914 else:
2915 if curr_state == NetworkServiceRecordState.TERMINATED:
2916 self._log.debug("NS (%s - %s) in terminated state, not updating state", self.id, self.name)
2917 return
2918
2919 new_state = NetworkServiceRecordState.RUNNING
2920 self._log.debug("Received update_state for nsr: %s, curr-state: %s",
2921 self.id, curr_state)
2922
2923 # check all VLs
2924 if (isinstance(self.nsm_plugin, rwnsmplugin.RwNsPlugin)):
2925 for vlr_id, vl in self.vlrs.items():
2926 self._log.debug("VLR %s state %s", vlr_id, vl.state)
2927 if vl.state in [VlRecordState.ACTIVE, VlRecordState.TERMINATED]:
2928 continue
2929 elif vl.state == VlRecordState.FAILED:
2930 if vl.prev_state != vl.state:
2931 event_descr = "Instantiation of VL %s failed" % vl.id
2932 event_error_details = vl.state_failed_reason
2933 self.record_event("vl-failed", event_descr, evt_details=event_error_details)
2934 vl.prev_state = vl.state
2935 new_state = NetworkServiceRecordState.FAILED
2936 break
2937 else:
2938 self._log.debug("VL already in failed state")
2939 else:
2940 if vl.state in [VlRecordState.INSTANTIATION_PENDING, VlRecordState.INIT]:
2941 new_state = NetworkServiceRecordState.VL_INSTANTIATE
2942 break
2943
2944 if vl.state in [VlRecordState.TERMINATE_PENDING]:
2945 new_state = NetworkServiceRecordState.VL_TERMINATE
2946 break
2947
2948 # Check all the VNFRs are present
2949 if new_state == NetworkServiceRecordState.RUNNING:
2950 for _, vnfr in self.vnfrs.items():
2951 self._log.debug("VNFR state %s", vnfr.state)
2952 if vnfr.state in [VnfRecordState.ACTIVE, VnfRecordState.TERMINATED]:
2953 active_vdus = 0
2954 for vnfr in self.vnfrs:
2955 active_vdus += self.nsm._vnfrs[vnfr]._active_vdus
2956
2957 if self._active_vms != active_vdus:
2958 self._active_vms = active_vdus
2959 yield from self.publish()
2960
2961 continue
2962
2963 elif vnfr.state == VnfRecordState.FAILED:
2964 if vnfr._prev_state != vnfr.state:
2965 event_descr = "Instantiation of VNF %s for NS: %s failed" % (vnfr.id, self.name)
2966 event_error_details = vnfr.state_failed_reason
2967 self.record_event("vnf-failed", event_descr, evt_details=event_error_details)
2968 vnfr.set_state(VnfRecordState.FAILED)
2969 else:
2970 self._log.info("VNF state did not change, curr=%s, prev=%s",
2971 vnfr.state, vnfr._prev_state)
2972 new_state = NetworkServiceRecordState.FAILED
2973 break
2974 else:
2975 self._log.debug("VNF %s in NSR %s - %s is still not active; current state is: %s",
2976 vnfr.id, self.id, self.name, vnfr.state)
2977 new_state = curr_state
2978
2979 # If new state is RUNNING; check VNFFGRs are also active
2980 if new_state == NetworkServiceRecordState.RUNNING:
2981 for _, vnffgr in self.vnffgrs.items():
2982 self._log.debug("Checking vnffgr state for nsr %s is: %s",
2983 self.id, vnffgr.state)
2984 if vnffgr.state == VnffgRecordState.ACTIVE:
2985 continue
2986 elif vnffgr.state == VnffgRecordState.FAILED:
2987 event_descr = "Instantiation of VNFFGR %s failed" % vnffgr.id
2988 self.record_event("vnffg-failed", event_descr)
2989 new_state = NetworkServiceRecordState.FAILED
2990 break
2991 else:
2992 self._log.info("VNFFGR %s in NSR %s - %s is still not active; current state is: %s",
2993 vnffgr.id, self.id, self.name, vnffgr.state)
2994 new_state = curr_state
2995
2996 # Update all the scaling group instance operational status to
2997 # reflect the state of all VNFR within that instance
2998 yield from self._update_scale_group_instances_status()
2999
3000 for _, group in self._scaling_groups.items():
3001 if group.state == scale_group.ScaleGroupState.SCALING_OUT:
3002 new_state = NetworkServiceRecordState.SCALING_OUT
3003 break
3004 elif group.state == scale_group.ScaleGroupState.SCALING_IN:
3005 new_state = NetworkServiceRecordState.SCALING_IN
3006 break
3007
3008 if new_state != curr_state:
3009 self._log.debug("Changing state of Network service %s - %s from %s to %s",
3010 self.id, self.name, curr_state, new_state)
3011 if new_state == NetworkServiceRecordState.RUNNING:
3012 yield from self.is_active()
3013 elif new_state == NetworkServiceRecordState.FAILED:
3014 # If the NS is already active and we entered scaling_in, scaling_out,
3015 # do not mark the NS as failing if scaling operation failed.
3016 if curr_state in [NetworkServiceRecordState.SCALING_OUT,
3017 NetworkServiceRecordState.SCALING_IN] and self._is_active:
3018 new_state = NetworkServiceRecordState.RUNNING
3019 self.set_state(new_state)
3020 else:
3021 yield from self.instantiation_failed()
3022 else:
3023 self.set_state(new_state)
3024
3025 yield from self.publish()
3026
3027 def vl_instantiation_state(self):
3028 """ Check if all VLs in this NS are active """
3029 for vl_id, vlr in self.vlrs.items():
3030 if vlr.state == VlRecordState.ACTIVE:
3031 continue
3032 elif vlr.state == VlRecordState.FAILED:
3033 return VlRecordState.FAILED
3034 elif vlr.state == VlRecordState.TERMINATED:
3035 return VlRecordState.TERMINATED
3036 elif vlr.state == VlRecordState.INSTANTIATION_PENDING:
3037 return VlRecordState.INSTANTIATION_PENDING
3038 else:
3039 self._log.error("vlr %s still in state %s", vlr, vlr.state)
3040 raise VirtualLinkRecordError("Invalid state %s" %(vlr.state))
3041 return VlRecordState.ACTIVE
3042
3043 def vl_instantiation_successful(self):
3044 """ Mark that all VLs in this NS are active """
3045 if self._vls_ready.is_set():
3046 self._log.error("NSR id %s, vls_ready is already set", self.id)
3047
3048 if self.vl_instantiation_state() == VlRecordState.ACTIVE:
3049 self._log.debug("NSR id %s, All %d vlrs are in active state %s",
3050 self.id, len(self.vlrs), self.vl_instantiation_state)
3051 self._vls_ready.set()
3052
3053 def vlr_event(self, vlr, action):
3054 self._log.debug("Received VLR %s with action:%s", vlr, action)
3055
3056 if vlr.id not in self.vlrs:
3057 self._log.error("VLR %s:%s received for unknown id, state:%s",
3058 vlr.id, vlr.name, vlr.operational_status)
3059 return
3060
3061 vlr_local = self.vlrs[vlr.id]
3062
3063 if action == rwdts.QueryAction.CREATE or action == rwdts.QueryAction.UPDATE:
3064 if vlr.operational_status == 'running':
3065 vlr_local.set_state_from_op_status(vlr.operational_status)
3066 self._active_networks += 1
3067 self._log.info("VLR %s:%s moving to active state",
3068 vlr.id,vlr.name)
3069 elif vlr.operational_status == 'failed':
3070 vlr_local.set_state_from_op_status(vlr.operational_status)
3071 vlr_local.state_failed_reason = vlr.operational_status_details
3072 asyncio.ensure_future(self.update_state(), loop=self._loop)
3073 self._log.info("VLR %s:%s moving to failed state",
3074 vlr.id,vlr.name)
3075 else:
3076 self._log.warning("VLR %s:%s received state:%s",
3077 vlr.id, vlr.name, vlr.operational_status)
3078
3079 if isinstance(self.nsm_plugin, rwnsmplugin.RwNsPlugin):
3080 self.vl_instantiation_successful()
3081
3082 # self.update_state() is responsible for publishing the NSR state. Its being called by vlr_event and update_vnfr.
3083 # The call from vlr_event occurs only if vlr reaches a failed state. Hence implementing the check here to handle
3084 # ns terminate received after other vlr states as vl-alloc-pending, vl-init, running.
3085 if self._ns_terminate_received:
3086 # Resetting this flag so that terminate ns is not called via subsequent DTS Handlers after the intial call.
3087 if vlr.operational_status in ['running', 'failed']:
3088 self._ns_terminate_received = False
3089 asyncio.ensure_future(self.terminate_ns_cont(), loop=self._loop)
3090
3091
3092 class InputParameterSubstitution(object):
3093 """
3094 This class is responsible for substituting input parameters into an NSD.
3095 """
3096
3097 def __init__(self, log, project):
3098 """Create an instance of InputParameterSubstitution
3099
3100 Arguments:
3101 log - a logger for this object to use
3102
3103 """
3104 self.log = log
3105 self.project = project
3106
3107 def _fix_xpath(self, xpath):
3108 # Fix the parameter.xpath to include project and correct namespace
3109 self.log.debug("Provided xpath: {}".format(xpath))
3110 #Split the xpath at the /
3111 attrs = xpath.split('/')
3112 new_xp = attrs[0]
3113 for attr in attrs[1:]:
3114 new_ns = 'project-nsd'
3115 name = attr
3116 if ':' in attr:
3117 # Includes namespace
3118 ns, name = attr.split(':', 2)
3119 if ns == "rw-nsd":
3120 ns = "rw-project-nsd"
3121
3122 new_xp = new_xp + '/' + new_ns + ':' + name
3123
3124 updated_xpath = self.project.add_project(new_xp)
3125
3126 self.log.error("Updated xpath: {}".format(updated_xpath))
3127 return updated_xpath
3128
3129 def __call__(self, nsd, nsr_config):
3130 """Substitutes input parameters from the NSR config into the NSD
3131
3132 This call modifies the provided NSD with the input parameters that are
3133 contained in the NSR config.
3134
3135 Arguments:
3136 nsd - a GI NSD object
3137 nsr_config - a GI NSR config object
3138
3139 """
3140 if nsd is None or nsr_config is None:
3141 return
3142
3143 # Create a lookup of the xpath elements that this descriptor allows
3144 # to be modified
3145 optional_input_parameters = set()
3146 for input_parameter in nsd.input_parameter_xpath:
3147 optional_input_parameters.add(input_parameter.xpath)
3148
3149 # Apply the input parameters to the descriptor
3150 if nsr_config.input_parameter:
3151 for param in nsr_config.input_parameter:
3152 if param.xpath not in optional_input_parameters:
3153 msg = "tried to set an invalid input parameter ({})"
3154 self.log.error(msg.format(param.xpath))
3155 continue
3156
3157 self.log.debug(
3158 "input-parameter:{} = {}".format(
3159 param.xpath,
3160 param.value,
3161 )
3162 )
3163
3164 try:
3165 xp = self._fix_xpath(param.xpath)
3166 xpath.setxattr(nsd, xp, param.value)
3167
3168 except Exception as e:
3169 self.log.exception(e)
3170
3171
3172 class VnfInputParameterSubstitution(object):
3173 """
3174 This class is responsible for substituting input parameters into a VNFD.
3175 """
3176
3177 def __init__(self, log, const_vnfd, project):
3178 """Create an instance of VnfInputParameterSubstitution
3179
3180 Arguments:
3181 log - a logger for this object to use
3182 const_vnfd - id refs for vnfs in a ns
3183 project - project for the VNFs
3184 """
3185
3186 self.log = log
3187 self.member_vnf_index = const_vnfd.member_vnf_index
3188 self.vnfd_id_ref = const_vnfd.vnfd_id_ref
3189 self.project = project
3190
3191 def __call__(self, vnfr, nsr_config):
3192 """Substitutes vnf input parameters from the NSR config into the VNFD
3193
3194 This call modifies the provided VNFD with the input parameters that are
3195 contained in the NSR config.
3196
3197 Arguments:
3198 vnfr - a GI VNFR object
3199 nsr_config - a GI NSR Config object
3200
3201 """
3202
3203 def compose_xpath(xpath, id):
3204 prefix = "/rw-project:project[rw-project:name={}]".format(quoted_key(self.project.name)) + \
3205 "/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]/vnfr:vnfd/".format(quoted_key(id))
3206
3207 suffix = '/'.join(xpath.split('/')[3:]).replace('vnfd', 'vnfr')
3208 return prefix + suffix
3209
3210 def substitute_xpath(ip_xpath, substitute_value, vnfr):
3211 vnfr_xpath = compose_xpath(ip_xpath, vnfr.id)
3212
3213 try:
3214 verify_xpath_wildcarded = xpath.getxattr(vnfr, vnfr_xpath)
3215
3216 self.log.debug(
3217 "vnf-input-parameter:{} = {}, for VNF : [member-vnf-index : {}, vnfd-id-ref : {}]".format(
3218 ip_xpath,
3219 substitute_value,
3220 self.member_vnf_index,
3221 self.vnfd_id_ref
3222 )
3223 )
3224 try:
3225 xpath.setxattr(vnfr, vnfr_xpath, substitute_value)
3226
3227 except Exception as e:
3228 self.log.exception(e)
3229
3230 except Exception as e:
3231 self.log.exception("Wildcarded xpath {} is listy in nature. Can not update. Exception => {}"
3232 .format(ip_xpath, e))
3233
3234 if vnfr is None or nsr_config is None:
3235 return
3236
3237 optional_input_parameters = set()
3238 for input_parameter in nsr_config.nsd.input_parameter_xpath:
3239 optional_input_parameters.add(input_parameter.xpath)
3240
3241 # Apply the input parameters to the vnfr
3242 if nsr_config.vnf_input_parameter:
3243 for param in nsr_config.vnf_input_parameter:
3244 if (param.member_vnf_index_ref == self.member_vnf_index and param.vnfd_id_ref == self.vnfd_id_ref):
3245 if param.input_parameter:
3246 for ip in param.input_parameter:
3247 if ip.xpath not in optional_input_parameters:
3248 msg = "Substitution Failed. Tried to set an invalid vnf input parameter ({}) for vnf [member-vnf-index : {}, vnfd-id-ref : {}]"
3249 self.log.error(msg.format(ip.xpath, self.member_vnf_index, self.vnfd_id_ref))
3250 continue
3251
3252 try:
3253 substitute_xpath(ip.xpath, ip.value, vnfr)
3254 except Exception as e:
3255 self.log.exception(e)
3256 else:
3257 self.log.debug("Substituting Xpaths with default Values")
3258 for input_parameter in nsr_config.nsd.input_parameter_xpath:
3259 if input_parameter.default_value is not None:
3260 try:
3261 if "vnfd-catalog" in input_parameter.xpath:
3262 substitute_xpath(input_parameter.xpath, input_parameter.default_value, vnfr)
3263 except Exception as e:
3264 self.log.exception(e)
3265
3266
3267 class NetworkServiceDescriptor(object):
3268 """
3269 Network service descriptor class
3270 """
3271
3272 def __init__(self, dts, log, loop, nsd, nsm):
3273 self._dts = dts
3274 self._log = log
3275 self._loop = loop
3276
3277 self._nsd = nsd
3278 self._nsm = nsm
3279
3280 @property
3281 def id(self):
3282 """ Returns nsd id """
3283 return self._nsd.id
3284
3285 @property
3286 def name(self):
3287 """ Returns name of nsd """
3288 return self._nsd.name
3289
3290 @property
3291 def msg(self):
3292 """ Return the message associated with this NetworkServiceDescriptor"""
3293 return self._nsd
3294
3295 @staticmethod
3296 def path_for_id(nsd_id):
3297 """ Return path for the passed nsd_id"""
3298 return self._nsm._project.add_project(
3299 "C,/project-nsd:nsd-catalog/project-nsd:nsd[project-nsd:id = '{}'".
3300 format(nsd_id))
3301
3302 def path(self):
3303 """ Return the message associated with this NetworkServiceDescriptor"""
3304 return NetworkServiceDescriptor.path_for_id(self.id)
3305
3306 def update(self, nsd):
3307 """ Update the NSD descriptor """
3308 self._nsd = nsd
3309
3310
3311 class NsdDtsHandler(object):
3312 """ The network service descriptor DTS handler """
3313 XPATH = "C,/project-nsd:nsd-catalog/project-nsd:nsd"
3314
3315 def __init__(self, dts, log, loop, nsm):
3316 self._dts = dts
3317 self._log = log
3318 self._loop = loop
3319 self._nsm = nsm
3320
3321 self._regh = None
3322 self._project = nsm._project
3323
3324 @property
3325 def regh(self):
3326 """ Return registration handle """
3327 return self._regh
3328
3329 @asyncio.coroutine
3330 def register(self):
3331 """ Register for Nsd create/update/delete/read requests from dts """
3332
3333 if self._regh:
3334 self._log.warning("DTS handler already registered for project {}".
3335 format(self._project.name))
3336 return
3337
3338 def on_apply(dts, acg, xact, action, scratch):
3339 """Apply the configuration"""
3340 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
3341 self._log.debug("Got nsd apply cfg (xact:%s) (action:%s)",
3342 xact, action)
3343
3344 if self._regh:
3345 # Create/Update an NSD record
3346 for cfg in self._regh.get_xact_elements(xact):
3347 # Only interested in those NSD cfgs whose ID was received in prepare callback
3348 if cfg.id in scratch.get('nsds', []) or is_recovery:
3349 self._nsm.update_nsd(cfg)
3350
3351 else:
3352 # This can happen if we do the deregister
3353 # during project delete before this is called
3354 self._log.debug("No reg handle for {} for project {}".
3355 format(self.__class__, self._project.name))
3356
3357 scratch.pop('nsds', None)
3358
3359 return RwTypes.RwStatus.SUCCESS
3360
3361 @asyncio.coroutine
3362 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
3363 """ Prepare callback from DTS for NSD config """
3364
3365 self._log.info("Got nsd prepare - config received nsd id %s, msg %s",
3366 msg.id, msg)
3367
3368 fref = ProtobufC.FieldReference.alloc()
3369 fref.goto_whole_message(msg.to_pbcm())
3370
3371 if fref.is_field_deleted():
3372 # Delete an NSD record
3373 self._log.debug("Deleting NSD with id %s", msg.id)
3374 self._nsm.delete_nsd(msg.id)
3375 else:
3376 # Add this NSD to scratch to create/update in apply callback
3377 nsds = scratch.setdefault('nsds', [])
3378 nsds.append(msg.id)
3379 # acg._scratch['nsds'].append(msg.id)
3380
3381 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
3382
3383 xpath = self._project.add_project(NsdDtsHandler.XPATH)
3384 self._log.debug(
3385 "Registering for NSD config using xpath: %s",
3386 xpath,
3387 )
3388
3389 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
3390 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
3391 # Need a list in scratch to store NSDs to create/update later
3392 # acg._scratch['nsds'] = list()
3393 self._regh = acg.register(
3394 xpath=xpath,
3395 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
3396 on_prepare=on_prepare)
3397
3398 def deregister(self):
3399 self._log.debug("De-register NSD handler for project {}".
3400 format(self._project.name))
3401 if self._regh:
3402 self._regh.deregister()
3403 self._regh = None
3404
3405
3406 class VnfdDtsHandler(object):
3407 """ DTS handler for VNFD config changes """
3408 XPATH = "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd"
3409
3410 def __init__(self, dts, log, loop, nsm):
3411 self._dts = dts
3412 self._log = log
3413 self._loop = loop
3414 self._nsm = nsm
3415 self._regh = None
3416 self._project = nsm._project
3417
3418 @property
3419 def regh(self):
3420 """ DTS registration handle """
3421 return self._regh
3422
3423 @asyncio.coroutine
3424 def register(self):
3425 """ Register for VNFD configuration"""
3426
3427 if self._regh:
3428 self._log.warning("DTS handler already registered for project {}".
3429 format(self._project.name))
3430 return
3431
3432 @asyncio.coroutine
3433 def on_apply(dts, acg, xact, action, scratch):
3434 """Apply the configuration"""
3435 self._log.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)",
3436 xact, action, scratch)
3437
3438 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
3439
3440 if self._regh:
3441 # Create/Update a VNFD record
3442 for cfg in self._regh.get_xact_elements(xact):
3443 # Only interested in those VNFD cfgs whose ID was received in prepare callback
3444 if cfg.id in scratch.get('vnfds', []) or is_recovery:
3445 self._nsm.update_vnfd(cfg)
3446
3447 for cfg in self._regh.elements:
3448 if cfg.id in scratch.get('deleted_vnfds', []):
3449 yield from self._nsm.delete_vnfd(cfg.id)
3450
3451 else:
3452 self._log.warning("Reg handle none for {} in project {}".
3453 format(self.__class__, self._project))
3454
3455 scratch.pop('vnfds', None)
3456 scratch.pop('deleted_vnfds', None)
3457
3458 @asyncio.coroutine
3459 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
3460 """ on prepare callback """
3461 xpath = ks_path.to_xpath(NsdYang.get_schema())
3462 self._log.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
3463 xpath, xact_info.query_action, msg)
3464
3465 fref = ProtobufC.FieldReference.alloc()
3466 fref.goto_whole_message(msg.to_pbcm())
3467
3468 # Handle deletes in prepare_callback, but adds/updates in apply_callback
3469 if fref.is_field_deleted():
3470 self._log.debug("Adding msg to deleted field")
3471 deleted_vnfds = scratch.setdefault('deleted_vnfds', [])
3472 deleted_vnfds.append(msg.id)
3473 else:
3474 # Add this VNFD to scratch to create/update in apply callback
3475 vnfds = scratch.setdefault('vnfds', [])
3476 vnfds.append(msg.id)
3477
3478 try:
3479 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
3480 except rift.tasklets.dts.ResponseError as e:
3481 self._log.warning(
3482 "VnfdDtsHandler in project {} with path {} for action {} failed: {}".
3483 format(self._project, xpath, xact_info.query_action, e))
3484
3485
3486 xpath = self._project.add_project(VnfdDtsHandler.XPATH)
3487 self._log.debug(
3488 "Registering for VNFD config using xpath {} for project {}"
3489 .format(xpath, self._project))
3490 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
3491 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
3492 # Need a list in scratch to store VNFDs to create/update later
3493 # acg._scratch['vnfds'] = list()
3494 # acg._scratch['deleted_vnfds'] = list()
3495 self._regh = acg.register(
3496 xpath=xpath,
3497 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
3498 on_prepare=on_prepare)
3499
3500 def deregister(self):
3501 self._log.debug("De-register VNFD handler for project {}".
3502 format(self._project.name))
3503 if self._regh:
3504 self._regh.deregister()
3505 self._regh = None
3506
3507
3508 class NsrRpcDtsHandler(object):
3509 """ The network service instantiation RPC DTS handler """
3510 EXEC_NSR_CONF_XPATH = "I,/nsr:start-network-service"
3511 EXEC_NSR_CONF_O_XPATH = "O,/nsr:start-network-service"
3512 NETCONF_IP_ADDRESS = "127.0.0.1"
3513 NETCONF_PORT = 2022
3514 RESTCONF_PORT = 8008
3515 NETCONF_USER = "@rift"
3516 NETCONF_PW = "rift"
3517 REST_BASE_V2_URL = 'https://{}:{}/v2/api/'.format("127.0.0.1",
3518 RESTCONF_PORT)
3519
3520 def __init__(self, dts, log, loop, nsm):
3521 self._dts = dts
3522 self._log = log
3523 self._loop = loop
3524 self._nsm = nsm
3525 self._project = nsm._project
3526 self._nsd = None
3527
3528 self._ns_regh = None
3529
3530 self._manager = None
3531 self._nsr_config_url = NsrRpcDtsHandler.REST_BASE_V2_URL + \
3532 'project/{}/'.format(self._project) + \
3533 'config/ns-instance-config'
3534
3535 self._model = RwYang.Model.create_libncx()
3536 self._model.load_schema_ypbc(RwNsrYang.get_schema())
3537
3538 @property
3539 def nsm(self):
3540 """ Return the NS manager instance """
3541 return self._nsm
3542
3543 @staticmethod
3544 def wrap_netconf_config_xml(xml):
3545 xml = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml)
3546 return xml
3547
3548 @asyncio.coroutine
3549 def _connect(self, timeout_secs=240):
3550
3551 start_time = time.time()
3552 while (time.time() - start_time) < timeout_secs:
3553
3554 try:
3555 self._log.debug("Attemping NsmTasklet netconf connection.")
3556
3557 manager = yield from ncclient.asyncio_manager.asyncio_connect(
3558 loop=self._loop,
3559 host=NsrRpcDtsHandler.NETCONF_IP_ADDRESS,
3560 port=NsrRpcDtsHandler.NETCONF_PORT,
3561 username=NsrRpcDtsHandler.NETCONF_USER,
3562 password=NsrRpcDtsHandler.NETCONF_PW,
3563 allow_agent=False,
3564 look_for_keys=False,
3565 hostkey_verify=False,
3566 )
3567
3568 return manager
3569
3570 except ncclient.transport.errors.SSHError as e:
3571 self._log.warning("Netconf connection to launchpad %s failed: %s",
3572 NsrRpcDtsHandler.NETCONF_IP_ADDRESS, str(e))
3573
3574 yield from asyncio.sleep(5, loop=self._loop)
3575
3576 raise NsrInstantiationFailed("Failed to connect to Launchpad within %s seconds" %
3577 timeout_secs)
3578
3579 def _apply_ns_instance_config(self,payload_dict):
3580 req_hdr= {'accept':'application/vnd.yang.data+json','content-type':'application/vnd.yang.data+json'}
3581 response=requests.post(self._nsr_config_url,
3582 headers=req_hdr,
3583 auth=(NsrRpcDtsHandler.NETCONF_USER, NsrRpcDtsHandler.NETCONF_PW),
3584 data=payload_dict,
3585 verify=False)
3586 return response
3587
3588 @asyncio.coroutine
3589 def register(self):
3590 """ Register for NS monitoring read from dts """
3591
3592 @asyncio.coroutine
3593 def on_ns_config_prepare(xact_info, action, ks_path, msg):
3594 """ prepare callback from dts start-network-service"""
3595 assert action == rwdts.QueryAction.RPC
3596
3597 if not self._project.rpc_check(msg, xact_info):
3598 return
3599
3600 rpc_ip = msg
3601 rpc_op = NsrYang.YangOutput_Nsr_StartNetworkService.from_dict({
3602 "nsr_id":str(uuid.uuid4())
3603 })
3604
3605 if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and
3606 ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)):
3607 errmsg = (
3608 "Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".
3609 format(rpc_ip))
3610 self._log.error(errmsg)
3611 xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
3612 NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH,
3613 errmsg)
3614 xact_info.respond_xpath(rwdts.XactRspCode.NACK,
3615 NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH)
3616 return
3617
3618 self._log.debug("start-network-service RPC input: {}".format(rpc_ip))
3619
3620 try:
3621 # Add used value to the pool
3622 self._log.debug("RPC output: {}".format(rpc_op))
3623
3624 nsd_copy = self.nsm.get_nsd(rpc_ip.nsd_ref)
3625
3626 self._log.debug("Configuring ns-instance-config with name %s nsd-ref: %s",
3627 rpc_ip.name, rpc_ip.nsd_ref)
3628
3629 ns_instance_config_dict = {"id":rpc_op.nsr_id, "admin_status":"ENABLED"}
3630 ns_instance_config_copy_dict = {k:v for k, v in rpc_ip.as_dict().items()
3631 if k in RwNsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr().fields}
3632 ns_instance_config_dict.update(ns_instance_config_copy_dict)
3633
3634 ns_instance_config = RwNsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr.from_dict(ns_instance_config_dict)
3635 ns_instance_config.nsd = RwNsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_Nsd()
3636 ns_instance_config.nsd.from_dict(nsd_copy.msg.as_dict())
3637
3638 payload_dict = ns_instance_config.to_json(self._model)
3639
3640 self._log.debug("Sending configure ns-instance-config json to %s: %s",
3641 self._nsr_config_url,ns_instance_config)
3642
3643 response = yield from self._loop.run_in_executor(
3644 None,
3645 self._apply_ns_instance_config,
3646 payload_dict
3647 )
3648 response.raise_for_status()
3649 self._log.debug("Received edit config response: %s", response.json())
3650
3651 xact_info.respond_xpath(rwdts.XactRspCode.ACK,
3652 NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH,
3653 rpc_op)
3654 except Exception as e:
3655 errmsg = ("Exception processing the "
3656 "start-network-service: {}".format(e))
3657 self._log.exception(errmsg)
3658 xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
3659 NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH,
3660 errmsg)
3661 xact_info.respond_xpath(rwdts.XactRspCode.NACK,
3662 NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH)
3663
3664 self._ns_regh = yield from self._dts.register(
3665 xpath=NsrRpcDtsHandler.EXEC_NSR_CONF_XPATH,
3666 handler=rift.tasklets.DTS.RegistrationHandler(
3667 on_prepare=on_ns_config_prepare),
3668 flags=rwdts.Flag.PUBLISHER,
3669 )
3670
3671 def deregister(self):
3672 if self._ns_regh:
3673 self._ns_regh.deregister()
3674 self._ns_regh = None
3675
3676
3677 class NsrDtsHandler(object):
3678 """ The network service DTS handler """
3679 NSR_XPATH = "C,/nsr:ns-instance-config/nsr:nsr"
3680 SCALE_INSTANCE_XPATH = "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
3681 KEY_PAIR_XPATH = "C,/nsr:key-pair"
3682
3683 def __init__(self, dts, log, loop, nsm):
3684 self._dts = dts
3685 self._log = log
3686 self._loop = loop
3687 self._nsm = nsm
3688 self._project = self._nsm._project
3689
3690 self._nsr_regh = None
3691 self._scale_regh = None
3692 self._key_pair_regh = None
3693
3694 @property
3695 def nsm(self):
3696 """ Return the NS manager instance """
3697 return self._nsm
3698
3699 @asyncio.coroutine
3700 def register(self):
3701 """ Register for Nsr create/update/delete/read requests from dts """
3702
3703 if self._nsr_regh:
3704 self._log.warning("DTS handler already registered for project {}".
3705 format(self._project.name))
3706 return
3707
3708 def nsr_id_from_keyspec(ks):
3709 nsr_path_entry = RwNsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks)
3710 nsr_id = nsr_path_entry.key00.id
3711 return nsr_id
3712
3713 def group_name_from_keyspec(ks):
3714 group_path_entry = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks)
3715 group_name = group_path_entry.key00.scaling_group_name_ref
3716 return group_name
3717
3718 def is_instance_in_reg_elements(nsr_id, group_name, instance_id):
3719 """ Return boolean indicating if scaling group instance was already commited previously.
3720
3721 By looking at the existing elements in this registration handle (elements not part
3722 of this current xact), we can tell if the instance was configured previously without
3723 keeping any application state.
3724 """
3725 for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(include_keyspec=True):
3726 elem_nsr_id = nsr_id_from_keyspec(keyspec)
3727 elem_group_name = group_name_from_keyspec(keyspec)
3728
3729 if elem_nsr_id != nsr_id or group_name != elem_group_name:
3730 continue
3731
3732 if instance_cfg.id == instance_id:
3733 return True
3734
3735 return False
3736
3737 def get_scale_group_instance_delta(nsr_id, group_name, xact):
3738 delta = {"added": [], "deleted": []}
3739 for instance_cfg, keyspec in self._scale_regh.get_xact_elements(xact, include_keyspec=True):
3740 elem_nsr_id = nsr_id_from_keyspec(keyspec)
3741 if elem_nsr_id != nsr_id:
3742 continue
3743
3744 elem_group_name = group_name_from_keyspec(keyspec)
3745 if elem_group_name != group_name:
3746 continue
3747
3748 delta["added"].append(instance_cfg.id)
3749
3750 for instance_cfg, keyspec in self._scale_regh.get_xact_elements(include_keyspec=True):
3751 elem_nsr_id = nsr_id_from_keyspec(keyspec)
3752 if elem_nsr_id != nsr_id:
3753 continue
3754
3755 elem_group_name = group_name_from_keyspec(keyspec)
3756 if elem_group_name != group_name:
3757 continue
3758
3759 if instance_cfg.id in delta["added"]:
3760 delta["added"].remove(instance_cfg.id)
3761 else:
3762 delta["deleted"].append(instance_cfg.id)
3763
3764 return delta
3765
3766 @asyncio.coroutine
3767 def update_nsr_nsd(nsr_id, xact, scratch):
3768
3769 @asyncio.coroutine
3770 def get_nsr_vl_delta(nsr_id, xact, scratch):
3771 delta = {"added": [], "deleted": []}
3772 for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(xact, include_keyspec=True):
3773 elem_nsr_id = nsr_id_from_keyspec(keyspec)
3774 if elem_nsr_id != nsr_id:
3775 continue
3776
3777 if 'vld' in instance_cfg.nsd:
3778 for vld in instance_cfg.nsd.vld:
3779 delta["added"].append(vld)
3780
3781 for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(include_keyspec=True):
3782 self._log.debug("NSR update: %s", instance_cfg)
3783 elem_nsr_id = nsr_id_from_keyspec(keyspec)
3784 if elem_nsr_id != nsr_id:
3785 continue
3786
3787 if 'vld' in instance_cfg.nsd:
3788 for vld in instance_cfg.nsd.vld:
3789 if vld in delta["added"]:
3790 delta["added"].remove(vld)
3791 else:
3792 delta["deleted"].append(vld)
3793
3794 return delta
3795
3796 vl_delta = yield from get_nsr_vl_delta(nsr_id, xact, scratch)
3797 self._log.debug("Got NSR:%s VL instance delta: %s", nsr_id, vl_delta)
3798
3799 for vld in vl_delta["added"]:
3800 yield from self._nsm.nsr_instantiate_vl(nsr_id, vld)
3801
3802 for vld in vl_delta["deleted"]:
3803 yield from self._nsm.nsr_terminate_vl(nsr_id, vld)
3804
3805 def get_nsr_key_pairs(dts_member_reg, xact):
3806 key_pairs = {}
3807 for instance_cfg, keyspec in dts_member_reg.get_xact_elements(xact, include_keyspec=True):
3808 self._log.debug("Key pair received is {} KS: {}".format(instance_cfg, keyspec))
3809 xpath = keyspec.to_xpath(RwNsrYang.get_schema())
3810 key_pairs[instance_cfg.name] = instance_cfg
3811 return key_pairs
3812
3813 def on_apply(dts, acg, xact, action, scratch):
3814 """Apply the configuration"""
3815 self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3816 xact, action, scratch)
3817
3818 @asyncio.coroutine
3819 def handle_create_nsr(msg, key_pairs=None, restart_mode=False):
3820 # Handle create nsr requests """
3821 # Do some validations
3822 if not msg.has_field("nsd"):
3823 err = "NSD not provided"
3824 self._log.error(err)
3825 raise NetworkServiceRecordError(err)
3826
3827 self._log.debug("Creating NetworkServiceRecord %s from nsr config %s",
3828 msg.id, msg.as_dict())
3829 nsr = yield from self.nsm.create_nsr(msg,
3830 xact,
3831 key_pairs=key_pairs,
3832 restart_mode=restart_mode)
3833 return nsr
3834
3835 def handle_delete_nsr(msg):
3836 @asyncio.coroutine
3837 def delete_instantiation(ns_id):
3838 """ Delete instantiation """
3839 yield from self._nsm.terminate_ns(ns_id, None)
3840
3841 # Handle delete NSR requests
3842 self._log.info("Delete req for NSR Id: %s received", msg.id)
3843 # Terminate the NSR instance
3844 nsr = self._nsm.get_ns_by_nsr_id(msg.id)
3845
3846 nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD)
3847 event_descr = "Terminate rcvd for NS Id: %s, NS Name: %s" % (msg.id, msg.name)
3848 nsr.record_event("terminate-rcvd", event_descr)
3849
3850 self._loop.create_task(delete_instantiation(msg.id))
3851
3852 @asyncio.coroutine
3853 def begin_instantiation(nsr):
3854 # Begin instantiation
3855 self._log.info("Beginning NS instantiation: %s", nsr.id)
3856 try:
3857 yield from self._nsm.instantiate_ns(nsr.id, xact)
3858 except Exception as e:
3859 self._log.exception(e)
3860 raise e
3861
3862 @asyncio.coroutine
3863 def instantiate_ns(msg, key_pairs, restart_mode=False):
3864 nsr = yield from handle_create_nsr(msg, key_pairs, restart_mode=restart_mode)
3865 yield from begin_instantiation(nsr)
3866
3867 def on_instantiate_done(fut, msg):
3868 # If the do_instantiate fails, then publish NSR with failed result
3869 e = fut.exception()
3870 if e is not None:
3871 import traceback
3872 print(traceback.format_exception(None, e, e.__traceback__), file=sys.stderr, flush=True)
3873 self._log.error("NSR instantiation failed for NSR id %s: %s", msg.id, str(e))
3874 failed_nsr = self._nsm.nsrs[msg.id]
3875 self._loop.create_task(failed_nsr.instantiation_failed(failed_reason=str(e)))
3876
3877
3878 self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3879 xact, action, scratch)
3880
3881 if action == rwdts.AppconfAction.INSTALL and xact.id is None:
3882 key_pairs = []
3883 if self._key_pair_regh:
3884 for element in self._key_pair_regh.elements:
3885 key_pairs.append(element)
3886 else:
3887 self._log.error("Reg handle none for key pair in project {}".
3888 format(self._project))
3889
3890 if self._nsr_regh:
3891 for element in self._nsr_regh.elements:
3892 if element.id not in self.nsm._nsrs:
3893 instantiate_task = self._loop.create_task(instantiate_ns(element, key_pairs,
3894 restart_mode=True))
3895 instantiate_task.add_done_callback(functools.partial(on_instantiate_done, msg=element))
3896 else:
3897 self._log.error("Reg handle none for NSR in project {}".
3898 format(self._project))
3899
3900 return RwTypes.RwStatus.SUCCESS
3901
3902 (added_msgs, deleted_msgs, updated_msgs) = get_add_delete_update_cfgs(self._nsr_regh,
3903 xact,
3904 "id")
3905 self._log.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs,
3906 deleted_msgs, updated_msgs)
3907
3908 for msg in added_msgs:
3909 if msg.id not in self._nsm.nsrs:
3910 self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
3911 key_pairs = get_nsr_key_pairs(self._key_pair_regh, xact)
3912 instantiate_task = self._loop.create_task(instantiate_ns(msg,key_pairs))
3913 instantiate_task.add_done_callback(functools.partial(on_instantiate_done, msg=msg))
3914
3915 for msg in deleted_msgs:
3916 self._log.info("Delete NSR received in on_apply to terminate NS:%s", msg.id)
3917 try:
3918 handle_delete_nsr(msg)
3919 except Exception:
3920 self._log.exception("Failed to terminate NS:%s", msg.id)
3921
3922 for msg in updated_msgs:
3923 self._log.info("Update NSR received in on_apply: %s", msg)
3924 self._nsm.nsr_update_cfg(msg.id, msg)
3925
3926 if 'nsd' in msg:
3927 self._loop.create_task(update_nsr_nsd(msg.id, xact, scratch))
3928
3929 for group in msg.scaling_group:
3930 instance_delta = get_scale_group_instance_delta(msg.id, group.scaling_group_name_ref, xact)
3931 self._log.debug("Got NSR:%s scale group instance delta: %s", msg.id, instance_delta)
3932
3933 for instance_id in instance_delta["added"]:
3934 self._nsm.scale_nsr_out(msg.id, group.scaling_group_name_ref, instance_id, xact)
3935
3936 for instance_id in instance_delta["deleted"]:
3937 self._nsm.scale_nsr_in(msg.id, group.scaling_group_name_ref, instance_id)
3938
3939
3940 return RwTypes.RwStatus.SUCCESS
3941
3942 @asyncio.coroutine
3943 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
3944 """ Prepare calllback from DTS for NSR """
3945
3946 xpath = ks_path.to_xpath(RwNsrYang.get_schema())
3947 action = xact_info.query_action
3948 self._log.debug(
3949 "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
3950 xact, action, xact_info, xpath, msg
3951 )
3952
3953 fref = ProtobufC.FieldReference.alloc()
3954 fref.goto_whole_message(msg.to_pbcm())
3955
3956 def send_err_msg(err_msg):
3957 self._log.error(errmsg)
3958 xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
3959 xpath,
3960 errmsg)
3961 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
3962
3963
3964 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE, rwdts.QueryAction.DELETE]:
3965 # if this is an NSR create
3966 if action != rwdts.QueryAction.DELETE and msg.id not in self._nsm.nsrs:
3967 # Ensure the Cloud account/datacenter has been specified
3968 if not msg.has_field("datacenter") and not msg.has_field("datacenter"):
3969 errmsg = ("Cloud account or datacenter not specified in NS {}".
3970 format(msg.name))
3971 send_err_msg(errmsg)
3972 return
3973
3974 # Check if nsd is specified
3975 if not msg.has_field("nsd"):
3976 errmsg = ("NSD not specified in NS {}".
3977 format(msg.name))
3978 send_err_msg(errmsg)
3979 return
3980
3981 else:
3982 nsr = self._nsm.nsrs[msg.id]
3983 if msg.has_field("nsd"):
3984 if nsr.state != NetworkServiceRecordState.RUNNING:
3985 errmsg = ("Unable to update VL when NS {} not in running state".
3986 format(msg.name))
3987 send_err_msg(errmsg)
3988 return
3989
3990 if 'vld' not in msg.nsd or len(msg.nsd.vld) == 0:
3991 errmsg = ("NS config {} NSD should have atleast 1 VLD".
3992 format(msg.name))
3993 send_err_msg(errmsg)
3994 return
3995
3996 if msg.has_field("scaling_group"):
3997 self._log.debug("ScaleMsg %s", msg)
3998 self._log.debug("NSSCALINGSTATE %s", nsr.state)
3999 if nsr.state != NetworkServiceRecordState.RUNNING:
4000 errmsg = ("Unable to perform scaling action when NS {} not in running state".
4001 format(msg.name))
4002 send_err_msg(errmsg)
4003 return
4004
4005 if len(msg.scaling_group) > 1:
4006 errmsg = ("Only a single scaling group can be configured at a time for NS {}".
4007 format(msg.name))
4008 send_err_msg(errmsg)
4009 return
4010
4011 for group_msg in msg.scaling_group:
4012 num_new_group_instances = len(group_msg.instance)
4013 if num_new_group_instances > 1:
4014 errmsg = ("Only a single scaling instance can be modified at a time for NS {}".
4015 format(msg.name))
4016 send_err_msg(errmsg)
4017 return
4018
4019 elif num_new_group_instances == 1:
4020 scale_group = nsr.scaling_groups[group_msg.scaling_group_name_ref]
4021 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
4022 if len(scale_group.instances) == scale_group.max_instance_count:
4023 errmsg = (" Max instances for {} reached for NS {}".
4024 format(str(scale_group), msg.name))
4025 send_err_msg(errmsg)
4026 return
4027
4028 acg.handle.prepare_complete_ok(xact_info.handle)
4029
4030
4031 xpath = self._project.add_project(NsrDtsHandler.NSR_XPATH)
4032 self._log.debug("Registering for NSR config using xpath: {}".
4033 format(xpath))
4034
4035 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
4036 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
4037 self._nsr_regh = acg.register(
4038 xpath=xpath,
4039 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
4040 on_prepare=on_prepare
4041 )
4042
4043 self._scale_regh = acg.register(
4044 xpath=self._project.add_project(NsrDtsHandler.SCALE_INSTANCE_XPATH),
4045 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY| rwdts.Flag.CACHE,
4046 )
4047
4048 self._key_pair_regh = acg.register(
4049 xpath=self._project.add_project(NsrDtsHandler.KEY_PAIR_XPATH),
4050 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
4051 )
4052
4053 def deregister(self):
4054 self._log.debug("De-register NSR config for project {}".
4055 format(self._project.name))
4056 if self._nsr_regh:
4057 self._nsr_regh.deregister()
4058 self._nsr_regh = None
4059 if self._scale_regh:
4060 self._scale_regh.deregister()
4061 self._scale_regh = None
4062 if self._key_pair_regh:
4063 self._key_pair_regh.deregister()
4064 self._key_pair_regh = None
4065
4066
4067 class VnfrDtsHandler(object):
4068 """ The virtual network service DTS handler """
4069 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
4070
4071 def __init__(self, dts, log, loop, nsm):
4072 self._dts = dts
4073 self._log = log
4074 self._loop = loop
4075 self._nsm = nsm
4076
4077 self._regh = None
4078
4079 @property
4080 def regh(self):
4081 """ Return registration handle """
4082 return self._regh
4083
4084 @property
4085 def nsm(self):
4086 """ Return the NS manager instance """
4087 return self._nsm
4088
4089 @asyncio.coroutine
4090 def register(self):
4091 """ Register for vnfr create/update/delete/ advises from dts """
4092 if self._regh:
4093 self._log.warning("VNFR DTS handler already registered for project {}".
4094 format(self._project.name))
4095 return
4096
4097 @asyncio.coroutine
4098 def on_prepare(xact_info, action, ks_path, msg):
4099 """ prepare callback from dts """
4100 xpath = ks_path.to_xpath(RwNsrYang.get_schema())
4101 self._log.debug(
4102 "Got vnfr on_prepare cb (xact_info: %s, action: %s): %s:%s",
4103 xact_info, action, ks_path, msg
4104 )
4105
4106 schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.schema()
4107 path_entry = schema.keyspec_to_entry(ks_path)
4108 if not path_entry or (path_entry.key00.id not in self._nsm._vnfrs):
4109 # This can happen when using external RO or after delete with monitoring params
4110 self._log.debug("%s request for non existent record path %s",
4111 action, xpath)
4112 xact_info.respond_xpath(rwdts.XactRspCode.NA, xpath)
4113
4114 return
4115
4116 if action == rwdts.QueryAction.CREATE or action == rwdts.QueryAction.UPDATE:
4117 yield from self._nsm.update_vnfr(msg)
4118 elif action == rwdts.QueryAction.DELETE:
4119 self._log.debug("Deleting VNFR with id %s", path_entry.key00.id)
4120
4121 self._nsm.delete_vnfr(path_entry.key00.id)
4122
4123 xact_info.respond_xpath(rwdts.XactRspCode.ACK, xpath)
4124
4125 self._log.debug("Registering for VNFR using xpath: %s",
4126 VnfrDtsHandler.XPATH)
4127
4128 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
4129 with self._dts.group_create() as group:
4130 self._regh = group.register(xpath=self._nsm._project.add_project(
4131 VnfrDtsHandler.XPATH),
4132 handler=hdl,
4133 flags=(rwdts.Flag.SUBSCRIBER),)
4134
4135 def deregister(self):
4136 self._log.debug("De-register VNFR for project {}".
4137 format(self._nsm._project.name))
4138 if self._regh:
4139 self._regh.deregister()
4140 self._regh = None
4141
4142 class NsManager(object):
4143 """ The Network Service Manager class"""
4144 def __init__(self, dts, log, loop, project,
4145 nsr_handler, vnfr_handler, vlr_handler, ro_plugin_selector,
4146 vnffgmgr, vnfd_pub_handler, cloud_account_handler):
4147 self._dts = dts
4148 self._log = log
4149 self._loop = loop
4150 self._project = project
4151 self._nsr_handler = nsr_handler
4152 self._vnfr_pub_handler = vnfr_handler
4153 self._vlr_pub_handler = vlr_handler
4154 self._vnffgmgr = vnffgmgr
4155 self._vnfd_pub_handler = vnfd_pub_handler
4156 self._cloud_account_handler = cloud_account_handler
4157
4158 self._ro_plugin_selector = ro_plugin_selector
4159
4160 # Intialize the set of variables for implementing Scaling RPC using REST.
4161 self._headers = {"content-type":"application/json", "accept":"application/json"}
4162 self._user = '@rift'
4163 self._password = 'rift'
4164 self._ip = 'localhost'
4165 self._rport = 8008
4166 self._conf_url = "https://{ip}:{port}/api/config/project/{project}". \
4167 format(ip=self._ip,
4168 port=self._rport,
4169 project=self._project.name)
4170
4171 self._nsrs = {}
4172 self._nsds = {}
4173 self._vnfds = {}
4174 self._vnfrs = {}
4175 self._nsr_for_vlr = {}
4176
4177 self.cfgmgr_obj = conman.ROConfigManager(log, loop, dts, self)
4178
4179 # TODO: All these handlers should move to tasklet level.
4180 # Passing self is often an indication of bad design
4181 self._nsd_dts_handler = NsdDtsHandler(dts, log, loop, self)
4182 self._vnfd_dts_handler = VnfdDtsHandler(dts, log, loop, self)
4183 self._dts_handlers = [self._nsd_dts_handler,
4184 VnfrDtsHandler(dts, log, loop, self),
4185 NsrDtsHandler(dts, log, loop, self),
4186 ScalingRpcHandler(log, dts, loop, self, self.scale_rpc_callback),
4187 # NsrRpcDtsHandler(dts, log, loop, self),
4188 self._vnfd_dts_handler,
4189 self.cfgmgr_obj,
4190 ]
4191
4192
4193 @property
4194 def log(self):
4195 """ Log handle """
4196 return self._log
4197
4198 @property
4199 def loop(self):
4200 """ Loop """
4201 return self._loop
4202
4203 @property
4204 def dts(self):
4205 """ DTS handle """
4206 return self._dts
4207
4208 @property
4209 def nsr_handler(self):
4210 """" NSR handler """
4211 return self._nsr_handler
4212
4213 @property
4214 def so_obj(self):
4215 """" So Obj handler """
4216 return self._so_obj
4217
4218 @property
4219 def nsrs(self):
4220 """ NSRs in this NSM"""
4221 return self._nsrs
4222
4223 @property
4224 def nsds(self):
4225 """ NSDs in this NSM"""
4226 return self._nsds
4227
4228 @property
4229 def vnfds(self):
4230 """ VNFDs in this NSM"""
4231 return self._vnfds
4232
4233 @property
4234 def vnfrs(self):
4235 """ VNFRs in this NSM"""
4236 return self._vnfrs
4237
4238 @property
4239 def nsr_pub_handler(self):
4240 """ NSR publication handler """
4241 return self._nsr_handler
4242
4243 @property
4244 def vnfr_pub_handler(self):
4245 """ VNFR publication handler """
4246 return self._vnfr_pub_handler
4247
4248 @property
4249 def vlr_pub_handler(self):
4250 """ VLR publication handler """
4251 return self._vlr_pub_handler
4252
4253 @property
4254 def vnfd_pub_handler(self):
4255 return self._vnfd_pub_handler
4256
4257 @asyncio.coroutine
4258 def register(self):
4259 """ Register all static DTS handlers """
4260 self._log.debug("Register DTS handlers for project {}".format(self._project))
4261 for dts_handle in self._dts_handlers:
4262 if asyncio.iscoroutinefunction(dts_handle.register):
4263 yield from dts_handle.register()
4264 else:
4265 dts_handle.register()
4266
4267 def deregister(self):
4268 """ Register all static DTS handlers """
4269 for dts_handle in self._dts_handlers:
4270 dts_handle.deregister()
4271
4272
4273 def get_ns_by_nsr_id(self, nsr_id):
4274 """ get NSR by nsr id """
4275 if nsr_id not in self._nsrs:
4276 raise NetworkServiceRecordError("NSR id %s not found" % nsr_id)
4277
4278 return self._nsrs[nsr_id]
4279
4280 def scale_nsr_out(self, nsr_id, scale_group_name, instance_id, config_xact):
4281 self.log.debug("Scale out NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
4282 nsr_id,
4283 scale_group_name,
4284 instance_id
4285 )
4286 nsr = self._nsrs[nsr_id]
4287 if nsr.state != NetworkServiceRecordState.RUNNING:
4288 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
4289
4290 self._loop.create_task(nsr.create_scale_group_instance(scale_group_name, instance_id, config_xact))
4291
4292 def scale_nsr_in(self, nsr_id, scale_group_name, instance_id):
4293 self.log.debug("Scale in NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
4294 nsr_id,
4295 scale_group_name,
4296 instance_id,
4297 )
4298 nsr = self._nsrs[nsr_id]
4299 if nsr.state != NetworkServiceRecordState.RUNNING:
4300 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
4301
4302 self._loop.create_task(nsr.delete_scale_group_instance(scale_group_name, instance_id))
4303
4304 def scale_rpc_callback(self, xact, msg, action):
4305 """Callback handler for RPC calls
4306 Args:
4307 xact : Transaction Handler
4308 msg : RPC input
4309 action : Scaling Action
4310 """
4311 def get_scaling_group_information():
4312 scaling_group_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref)
4313 output = requests.get(scaling_group_url, headers=self._headers, auth=(self._user, self._password), verify=False)
4314 if output.text is None or len(output.text) == 0:
4315 self.log.error("nsr id %s information not present", self._nsr_id)
4316 return None
4317 scaling_group_info = json.loads(output.text)
4318 return scaling_group_info
4319
4320 def config_scaling_group_information(scaling_group_info):
4321 data_str = json.dumps(scaling_group_info)
4322
4323 scale_out_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref)
4324 response = requests.put(scale_out_url, data=data_str, verify=False,
4325 auth=(self._user, self._password), headers=self._headers)
4326 response.raise_for_status()
4327
4328 def scale_out():
4329 scaling_group_info = get_scaling_group_information()
4330 self._log.debug("Scale out info: {}".format(scaling_group_info))
4331 if scaling_group_info is None:
4332 return
4333
4334 scaling_group_present = False
4335 if "scaling-group" in scaling_group_info["nsr:nsr"]:
4336 scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"]
4337 for scaling_group in scaling_group_array:
4338 if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref:
4339 scaling_group_present = True
4340 if 'instance' not in scaling_group:
4341 scaling_group['instance'] = []
4342 for instance in scaling_group['instance']:
4343 if instance["id"] == int(msg.instance_id):
4344 self.log.error("scaling group with instance id %s exists for scale out", msg.instance_id)
4345 return
4346 scaling_group["instance"].append({"id": int(msg.instance_id)})
4347
4348 if not scaling_group_present:
4349 scaling_group_info["nsr:nsr"]["scaling-group"] = [{"scaling-group-name-ref": msg.scaling_group_name_ref,
4350 "instance": [{"id": msg.instance_id}]}]
4351
4352 config_scaling_group_information(scaling_group_info)
4353 return
4354
4355 def scale_in():
4356 scaling_group_info = get_scaling_group_information()
4357 if scaling_group_info is None:
4358 return
4359
4360 scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"]
4361 scaling_group_present = False
4362 instance_id_present = False
4363 for scaling_group in scaling_group_array:
4364 if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref:
4365 scaling_group_present = True
4366 if 'instance' in scaling_group:
4367 instance_array = scaling_group["instance"];
4368 for index in range(len(instance_array)):
4369 if instance_array[index]["id"] == int(msg.instance_id):
4370 instance_array.pop(index)
4371 instance_id_present = True
4372 break
4373
4374 if not scaling_group_present:
4375 self.log.error("Scaling group %s doesnot exists for scale in", msg.scaling_group_name_ref)
4376 return
4377
4378 if not instance_id_present:
4379 self.log.error("Instance id %s doesnot exists for scale in", msg.instance_id)
4380 return
4381
4382 config_scaling_group_information(scaling_group_info)
4383 return
4384
4385 if action == ScalingRpcHandler.ACTION.SCALE_OUT:
4386 self._loop.run_in_executor(None, scale_out)
4387 else:
4388 self._loop.run_in_executor(None, scale_in)
4389
4390 def nsr_update_cfg(self, nsr_id, msg):
4391 nsr = self._nsrs[nsr_id]
4392 nsr.nsr_cfg_msg= msg
4393
4394 def nsr_instantiate_vl(self, nsr_id, vld):
4395 self.log.error("NSR {} create VL {}".format(nsr_id, vld))
4396 nsr = self._nsrs[nsr_id]
4397 if nsr.state != NetworkServiceRecordState.RUNNING:
4398 raise NsrVlUpdateError("Cannot perform VL instantiate if NSR is not in running state")
4399
4400 # Not calling in a separate task as this is called from a separate task
4401 yield from nsr.create_vl_instance(vld)
4402
4403 def nsr_terminate_vl(self, nsr_id, vld):
4404 self.log.debug("NSR {} delete VL {}".format(nsr_id, vld.id))
4405 nsr = self._nsrs[nsr_id]
4406 if nsr.state != NetworkServiceRecordState.RUNNING:
4407 raise NsrVlUpdateError("Cannot perform VL terminate if NSR is not in running state")
4408
4409 # Not calling in a separate task as this is called from a separate task
4410 yield from nsr.delete_vl_instance(vld)
4411
4412 @asyncio.coroutine
4413 def create_nsr(self, nsr_msg, config_xact, key_pairs=None,restart_mode=False):
4414 """ Create an NSR instance """
4415 self._log.debug("NSRMSG %s", nsr_msg)
4416 if nsr_msg.id in self._nsrs:
4417 msg = "NSR id %s already exists" % nsr_msg.id
4418 self._log.error(msg)
4419 raise NetworkServiceRecordError(msg)
4420
4421 self._log.debug("Create NetworkServiceRecord nsr id %s from nsd_id %s",
4422 nsr_msg.id,
4423 nsr_msg.nsd.id)
4424
4425 nsm_plugin = self._ro_plugin_selector.get_ro_plugin(nsr_msg.resource_orchestrator)
4426 #Work Around - openmano expects datacenter id instead of datacenter name
4427 if isinstance(nsm_plugin, openmano_nsm.OpenmanoNsPlugin):
4428 for uuid, name in nsm_plugin._cli_api.datacenter_list():
4429 if name == nsr_msg.datacenter:
4430 nsr_msg.datacenter = uuid
4431
4432 sdn_account_name = self._cloud_account_handler.get_cloud_account_sdn_name(nsr_msg.datacenter)
4433
4434 nsr = NetworkServiceRecord(self._dts,
4435 self._log,
4436 self._loop,
4437 self,
4438 nsm_plugin,
4439 nsr_msg,
4440 sdn_account_name,
4441 key_pairs,
4442 self._project,
4443 restart_mode=restart_mode,
4444 vlr_handler=self._vlr_pub_handler
4445 )
4446 self._nsrs[nsr_msg.id] = nsr
4447
4448 try:
4449 # Generate ssh key pair if required
4450 nsr.generate_ssh_key_pair(config_xact)
4451 except Exception as e:
4452 self._log.exception("SSH key: {}".format(e))
4453
4454 self._log.debug("NSR {}: SSh key generated: {}".format(nsr_msg.name,
4455 nsr.public_key))
4456
4457 ssh_key = {'private_key': nsr.private_key,
4458 'public_key': nsr.public_key
4459 }
4460
4461 nsm_plugin.create_nsr(nsr_msg, nsr_msg.nsd, key_pairs, ssh_key=ssh_key)
4462
4463 return nsr
4464
4465 def delete_nsr(self, nsr_id):
4466 """
4467 Delete NSR with the passed nsr id
4468 """
4469 del self._nsrs[nsr_id]
4470
4471 @asyncio.coroutine
4472 def instantiate_ns(self, nsr_id, config_xact):
4473 """ Instantiate an NS instance """
4474 self._log.debug("Instantiating Network service id %s", nsr_id)
4475 if nsr_id not in self._nsrs:
4476 err = "NSR id %s not found " % nsr_id
4477 self._log.error(err)
4478 raise NetworkServiceRecordError(err)
4479
4480 nsr = self._nsrs[nsr_id]
4481 try:
4482 yield from nsr.nsm_plugin.instantiate_ns(nsr, config_xact)
4483 except Exception as e:
4484 self._log.exception("NS instantiate: {}".format(e))
4485 raise e
4486
4487 @asyncio.coroutine
4488 def update_vnfr(self, vnfr):
4489 """Create/Update an VNFR """
4490
4491 vnfr_state = self._vnfrs[vnfr.id].state
4492 self._log.debug("Updating VNFR with state %s: vnfr %s", vnfr_state, vnfr)
4493
4494 no_of_active_vms = 0
4495 for vdur in vnfr.vdur:
4496 if vdur.operational_status == 'running':
4497 no_of_active_vms += 1
4498
4499 self._vnfrs[vnfr.id]._active_vdus = no_of_active_vms
4500 yield from self._vnfrs[vnfr.id].update_state(vnfr)
4501 nsr = self.find_nsr_for_vnfr(vnfr.id)
4502 if nsr is not None:
4503 nsr._vnf_inst_started = False
4504 yield from nsr.update_state()
4505
4506 def find_nsr_for_vnfr(self, vnfr_id):
4507 """ Find the NSR which )has the passed vnfr id"""
4508 for nsr in list(self.nsrs.values()):
4509 for vnfr in list(nsr.vnfrs.values()):
4510 if vnfr.id == vnfr_id:
4511 return nsr
4512 return None
4513
4514 def delete_vnfr(self, vnfr_id):
4515 """ Delete VNFR with the passed id"""
4516 del self._vnfrs[vnfr_id]
4517
4518 @asyncio.coroutine
4519 def get_nsr_config(self, nsd_id):
4520 xpath = self._project.add_project("C,/nsr:ns-instance-config")
4521 results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
4522
4523 for result in results:
4524 entry = yield from result
4525 ns_instance_config = entry.result
4526
4527 for nsr in ns_instance_config.nsr:
4528 if nsr.nsd.id == nsd_id:
4529 return nsr
4530
4531 return None
4532
4533 def get_nsd(self, nsd_id):
4534 """ Get network service descriptor for the passed nsd_id"""
4535 if nsd_id not in self._nsds:
4536 self._log.error("Cannot find NSD id:%s", nsd_id)
4537 raise NetworkServiceDescriptorError("Cannot find NSD id:%s", nsd_id)
4538
4539 return self._nsds[nsd_id]
4540
4541 def create_nsd(self, nsd_msg):
4542 """ Create a network service descriptor """
4543 self._log.debug("Create network service descriptor - %s", nsd_msg)
4544 if nsd_msg.id in self._nsds:
4545 self._log.error("Cannot create NSD %s -NSD ID already exists", nsd_msg)
4546 raise NetworkServiceDescriptorError("NSD already exists-%s", nsd_msg.id)
4547
4548 nsd = NetworkServiceDescriptor(
4549 self._dts,
4550 self._log,
4551 self._loop,
4552 nsd_msg,
4553 self
4554 )
4555 self._nsds[nsd_msg.id] = nsd
4556
4557 return nsd
4558
4559 def update_nsd(self, nsd):
4560 """ update the Network service descriptor """
4561 self._log.debug("Update network service descriptor - %s", nsd)
4562 if nsd.id not in self._nsds:
4563 self._log.debug("No NSD found - creating NSD id = %s", nsd.id)
4564 self.create_nsd(nsd)
4565 else:
4566 self._log.debug("Updating NSD id = %s, nsd = %s", nsd.id, nsd)
4567 self._nsds[nsd.id].update(nsd)
4568
4569 def delete_nsd(self, nsd_id):
4570 """ Delete the Network service descriptor with the passed id """
4571 self._log.debug("Deleting the network service descriptor - %s", nsd_id)
4572 if nsd_id not in self._nsds:
4573 self._log.debug("Delete NSD failed - cannot find nsd-id %s", nsd_id)
4574 raise NetworkServiceDescriptorNotFound("Cannot find %s", nsd_id)
4575 del self._nsds[nsd_id]
4576
4577 def get_vnfd_config(self, xact):
4578 vnfd_dts_reg = self._vnfd_dts_handler.regh
4579 for cfg in vnfd_dts_reg.get_xact_elements(xact):
4580 if cfg.id not in self._vnfds:
4581 self.create_vnfd(cfg)
4582
4583 def get_vnfd(self, vnfd_id, xact):
4584 """ Get virtual network function descriptor for the passed vnfd_id"""
4585 if vnfd_id not in self._vnfds:
4586 self._log.error("Cannot find VNFD id:%s", vnfd_id)
4587 self.get_vnfd_config(xact)
4588
4589 if vnfd_id not in self._vnfds:
4590 self._log.error("Cannot find VNFD id:%s", vnfd_id)
4591 raise VnfDescriptorError("Cannot find VNFD id:%s", vnfd_id)
4592
4593 return self._vnfds[vnfd_id]
4594
4595 def create_vnfd(self, vnfd):
4596 """ Create a virtual network function descriptor """
4597 self._log.debug("Create virtual network function descriptor - %s", vnfd)
4598 if vnfd.id in self._vnfds:
4599 self._log.error("Cannot create VNFD %s -VNFD ID already exists", vnfd)
4600 raise VnfDescriptorError("VNFD already exists-%s", vnfd.id)
4601
4602 self._vnfds[vnfd.id] = vnfd
4603 return self._vnfds[vnfd.id]
4604
4605 def update_vnfd(self, vnfd):
4606 """ Update the virtual network function descriptor """
4607 self._log.debug("Update virtual network function descriptor- %s", vnfd)
4608
4609
4610 if vnfd.id not in self._vnfds:
4611 self._log.debug("No VNFD found - creating VNFD id = %s", vnfd.id)
4612 self.create_vnfd(vnfd)
4613 else:
4614 self._log.debug("Updating VNFD id = %s, vnfd = %s", vnfd.id, vnfd)
4615 self._vnfds[vnfd.id] = vnfd
4616
4617 @asyncio.coroutine
4618 def delete_vnfd(self, vnfd_id):
4619 """ Delete the virtual network function descriptor with the passed id """
4620 self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
4621 if vnfd_id not in self._vnfds:
4622 self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
4623 raise VnfDescriptorError("Cannot find %s", vnfd_id)
4624
4625 del self._vnfds[vnfd_id]
4626
4627 @asyncio.coroutine
4628 def publish_nsr(self, xact, path, msg):
4629 """ Publish a NSR """
4630 self._log.debug("Publish NSR with path %s, msg %s",
4631 path, msg)
4632 yield from self.nsr_handler.update(xact, path, msg)
4633
4634 @asyncio.coroutine
4635 def unpublish_nsr(self, xact, path):
4636 """ Un Publish an NSR """
4637 self._log.debug("Publishing delete NSR with path %s", path)
4638 yield from self.nsr_handler.delete(path, xact)
4639
4640 def vnfr_is_ready(self, vnfr_id):
4641 """ VNFR with the id is ready """
4642 self._log.debug("VNFR id %s ready", vnfr_id)
4643 if vnfr_id not in self._vnfds:
4644 err = "Did not find VNFR ID with id %s" % vnfr_id
4645 self._log.critical("err")
4646 raise VirtualNetworkFunctionRecordError(err)
4647 self._vnfrs[vnfr_id].is_ready()
4648
4649
4650 @asyncio.coroutine
4651 def terminate_ns(self, nsr_id, xact):
4652 """
4653 Terminate network service for the given NSR Id
4654 """
4655
4656 if nsr_id not in self._nsrs:
4657 return
4658
4659 # Terminate the instances/networks assocaited with this nw service
4660 self._log.debug("Terminating the network service %s", nsr_id)
4661 try :
4662 yield from self._nsrs[nsr_id].terminate()
4663 except Exception as e:
4664 self.log.exception("Failed to terminate NSR[id=%s]", nsr_id)
4665
4666 def vlr_event(self, vlr, action):
4667 self._log.debug("Received VLR %s with action:%s", vlr, action)
4668 # Find the NS and see if we can proceed
4669 nsr = self.find_nsr_for_vlr_id(vlr.id)
4670 if nsr is None:
4671 self._log.error("VLR %s:%s received for NSR, state:%s",
4672 vlr.id, vlr.name, vlr.operational_status)
4673 return
4674 nsr.vlr_event(vlr, action)
4675
4676 def add_vlr_id_nsr_map(self, vlr_id, nsr):
4677 """ Add a mapping for vlr_id into NSR """
4678 self._nsr_for_vlr[vlr_id] = nsr
4679
4680 def remove_vlr_id_nsr_map(self, vlr_id):
4681 """ Remove a mapping for vlr_id into NSR """
4682 if vlr_id in self._nsr_for_vlr:
4683 del self._nsr_for_vlr[vlr_id]
4684
4685 def find_nsr_for_vlr_id(self, vlr_id):
4686 """ Find NSR for VLR id """
4687 nsr = None
4688 if vlr_id in self._nsr_for_vlr:
4689 nsr = self._nsr_for_vlr[vlr_id]
4690 return nsr
4691
4692
4693 class NsmRecordsPublisherProxy(object):
4694 """ This class provides a publisher interface that allows plugin objects
4695 to publish NSR/VNFR/VLR"""
4696
4697 def __init__(self, dts, log, loop, project, nsr_pub_hdlr,
4698 vnfr_pub_hdlr, vlr_pub_hdlr,):
4699 self._dts = dts
4700 self._log = log
4701 self._loop = loop
4702 self._project = project
4703 self._nsr_pub_hdlr = nsr_pub_hdlr
4704 self._vlr_pub_hdlr = vlr_pub_hdlr
4705 self._vnfr_pub_hdlr = vnfr_pub_hdlr
4706
4707 @asyncio.coroutine
4708 def publish_nsr_opdata(self, xact, nsr):
4709 """ Publish an NSR """
4710 path = ("D,/nsr:ns-instance-opdata" + "/nsr:nsr[nsr:ns-instance-config-ref={}]"
4711 ).format(quoted_key(nsr.ns_instance_config_ref))
4712 return (yield from self._nsr_pub_hdlr.update(xact, path, nsr))
4713
4714 @asyncio.coroutine
4715 def publish_nsr(self, xact, nsr):
4716 """ Publish an NSR """
4717 path = self._project.add_project(NetworkServiceRecord.xpath_from_nsr(nsr))
4718 return (yield from self._nsr_pub_hdlr.update(xact, path, nsr))
4719
4720 @asyncio.coroutine
4721 def unpublish_nsr(self, xact, nsr):
4722 """ Unpublish an NSR """
4723 path = self._project.add_project(NetworkServiceRecord.xpath_from_nsr(nsr))
4724 return (yield from self._nsr_pub_hdlr.delete(xact, path))
4725
4726 @asyncio.coroutine
4727 def publish_vnfr(self, xact, vnfr):
4728 """ Publish an VNFR """
4729 path = self._project.add_project(VirtualNetworkFunctionRecord.vnfr_xpath(vnfr))
4730 return (yield from self._vnfr_pub_hdlr.update(xact, path, vnfr))
4731
4732 @asyncio.coroutine
4733 def unpublish_vnfr(self, xact, vnfr):
4734 """ Unpublish a VNFR """
4735 path = self._project.add_project(VirtualNetworkFunctionRecord.vnfr_xpath(vnfr))
4736 yield from self._vnfr_pub_hdlr.delete(xact, path)
4737 # NOTE: The regh delete does not send the on_prepare to VNFM tasklet as well
4738 # as remove all the VNFR elements. So need to send this additional delete block.
4739 with self._dts.transaction(flags = 0) as xact:
4740 block = xact.block_create()
4741 block.add_query_delete(path)
4742 yield from block.execute(flags=0, now=True)
4743
4744 @asyncio.coroutine
4745 def publish_vlr(self, xact, vlr):
4746 """ Publish a VLR """
4747 path = self._project.add_project(VirtualLinkRecord.vlr_xpath(vlr))
4748 return (yield from self._vlr_pub_hdlr.update(xact, path, vlr))
4749
4750 @asyncio.coroutine
4751 def unpublish_vlr(self, xact, vlr):
4752 """ Unpublish a VLR """
4753 path = self._project.add_project(VirtualLinkRecord.vlr_xpath(vlr))
4754 return (yield from self._vlr_pub_hdlr.delete(xact, path))
4755
4756 class ScalingRpcHandler(mano_dts.DtsHandler):
4757 """ The Network service Monitor DTS handler """
4758 SCALE_IN_INPUT_XPATH = "I,/nsr:exec-scale-in"
4759 SCALE_IN_OUTPUT_XPATH = "O,/nsr:exec-scale-in"
4760
4761 SCALE_OUT_INPUT_XPATH = "I,/nsr:exec-scale-out"
4762 SCALE_OUT_OUTPUT_XPATH = "O,/nsr:exec-scale-out"
4763
4764 ACTION = Enum('ACTION', 'SCALE_IN SCALE_OUT')
4765
4766 def __init__(self, log, dts, loop, nsm, callback=None):
4767 super().__init__(log, dts, loop, nsm._project)
4768 self._nsm = nsm
4769 self.callback = callback
4770 self.last_instance_id = defaultdict(int)
4771
4772 self._reg_in = None
4773 self._reg_out = None
4774
4775 @asyncio.coroutine
4776 def register(self):
4777
4778 def send_err_msg(err_msg, xact_info, ks_path, e=False):
4779 xpath = ks_path.to_xpath(NsrYang.get_schema())
4780 if e:
4781 self._log.exception(err_msg)
4782 else:
4783 self._log.error(err_msg)
4784 xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
4785 xpath,
4786 err_msg)
4787 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
4788
4789 @asyncio.coroutine
4790 def on_scale_in_prepare(xact_info, action, ks_path, msg):
4791 assert action == rwdts.QueryAction.RPC
4792
4793 self._log.debug("Scale in called: {}".format(msg.as_dict()))
4794 if not self.project.rpc_check(msg, xact_info):
4795 return
4796
4797 try:
4798 rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({
4799 "instance_id": msg.instance_id})
4800
4801 nsr = self._nsm.nsrs[msg.nsr_id_ref]
4802 if nsr.state != NetworkServiceRecordState.RUNNING:
4803 errmsg = ("Unable to perform scaling action when NS {}({}) not in running state".
4804 format(nsr.name, nsr.id))
4805 send_err_msg(errmsg, xact_info, ks_path)
4806 return
4807
4808 xact_info.respond_xpath(
4809 rwdts.XactRspCode.ACK,
4810 self.__class__.SCALE_IN_OUTPUT_XPATH,
4811 rpc_op)
4812
4813 if self.callback:
4814 self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN)
4815
4816 except Exception as e:
4817 errmsg = ("Exception doing scale in using {}: {}".
4818 format(msg, e))
4819 send_err_msg(errmsg, xact_info, ks_path, e=True)
4820
4821 @asyncio.coroutine
4822 def on_scale_out_prepare(xact_info, action, ks_path, msg):
4823 assert action == rwdts.QueryAction.RPC
4824
4825 self._log.debug("Scale out called: {}".format(msg.as_dict()))
4826 if not self.project.rpc_check(msg, xact_info):
4827 return
4828
4829 try:
4830 scaling_group = msg.scaling_group_name_ref
4831 if not msg.instance_id:
4832 last_instance_id = self.last_instance_id[scale_group]
4833 msg.instance_id = last_instance_id + 1
4834 self.last_instance_id[scale_group] += 1
4835
4836 nsr = self._nsm.nsrs[msg.nsr_id_ref]
4837 if nsr.state != NetworkServiceRecordState.RUNNING:
4838 errmsg = ("Unable to perform scaling action when NS {}({}) not in running state".
4839 format(nsr.name, nsr.id))
4840 send_err_msg(errmsg, xact_info, ks_path)
4841 return
4842
4843 rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({
4844 "instance_id": msg.instance_id})
4845
4846 xact_info.respond_xpath(
4847 rwdts.XactRspCode.ACK,
4848 self.__class__.SCALE_OUT_OUTPUT_XPATH,
4849 rpc_op)
4850
4851 if self.callback:
4852 self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT)
4853
4854 except Exception as e:
4855 errmsg = ("Exception doing scale in using {}: {}".
4856 format(msg, e))
4857 send_err_msg(errmsg, xact_info, ks_path, e=True)
4858
4859 self._reg_in = yield from self.dts.register(
4860 xpath=self.__class__.SCALE_IN_INPUT_XPATH,
4861 handler=rift.tasklets.DTS.RegistrationHandler(
4862 on_prepare=on_scale_in_prepare),
4863 flags=rwdts.Flag.PUBLISHER)
4864
4865 self._reg_out = yield from self.dts.register(
4866 xpath=self.__class__.SCALE_OUT_INPUT_XPATH,
4867 handler=rift.tasklets.DTS.RegistrationHandler(
4868 on_prepare=on_scale_out_prepare),
4869 flags=rwdts.Flag.PUBLISHER)
4870
4871 def deregister(self):
4872 if self._reg_in:
4873 self._reg_in.deregister()
4874 self._reg_in = None
4875
4876 if self._reg_out:
4877 self._reg_out.deregister()
4878 self._reg_out = None
4879
4880
4881 class NsmProject(ManoProject):
4882
4883 def __init__(self, name, tasklet, **kw):
4884 super(NsmProject, self).__init__(tasklet.log, name)
4885 self.update(tasklet)
4886 self._nsm = None
4887
4888 self._ro_plugin_selector = None
4889 self._vnffgmgr = None
4890
4891 self._nsr_pub_handler = None
4892 self._vnfr_pub_handler = None
4893 self._vlr_pub_handler = None
4894 self._vnfd_pub_handler = None
4895 self._scale_cfg_handler = None
4896
4897 self._records_publisher_proxy = None
4898
4899 def vlr_event(self, vlr, action):
4900 """ VLR Event callback """
4901 self.log.debug("VLR Event received for VLR %s with action %s", vlr, action)
4902 self._nsm.vlr_event(vlr, action)
4903
4904 @asyncio.coroutine
4905 def register(self):
4906 self.log.debug("Register NsmProject for {}".format(self.name))
4907
4908 self._nsr_pub_handler = publisher.NsrOpDataDtsHandler(
4909 self._dts, self.log, self.loop, self)
4910 yield from self._nsr_pub_handler.register()
4911
4912 self._vnfr_pub_handler = publisher.VnfrPublisherDtsHandler(
4913 self._dts, self.log, self.loop, self)
4914 yield from self._vnfr_pub_handler.register()
4915
4916 self._vlr_pub_handler = publisher.VlrPublisherDtsHandler(
4917 self._dts, self.log, self.loop, self)
4918 yield from self._vlr_pub_handler.register()
4919
4920 self._vlr_sub_handler = subscriber.VlrSubscriberDtsHandler(self.log,
4921 self._dts,
4922 self.loop,
4923 self,
4924 self.vlr_event,
4925 )
4926 yield from self._vlr_sub_handler.register()
4927
4928 manifest = self._tasklet.tasklet_info.get_pb_manifest()
4929 use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
4930 ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
4931 ssl_key = manifest.bootstrap_phase.rwsecurity.key
4932
4933 self._vnfd_pub_handler = publisher.VnfdPublisher(
4934 use_ssl, ssl_cert, ssl_key, self.loop, self)
4935
4936 self._records_publisher_proxy = NsmRecordsPublisherProxy(
4937 self._dts,
4938 self.log,
4939 self.loop,
4940 self,
4941 self._nsr_pub_handler,
4942 self._vnfr_pub_handler,
4943 self._vlr_pub_handler,
4944 )
4945
4946 # Register the NSM to receive the nsm plugin
4947 # when cloud account is configured
4948 self._ro_plugin_selector = cloud.ROAccountConfigSubscriber(
4949 self._dts,
4950 self.log,
4951 self.loop,
4952 self,
4953 self._records_publisher_proxy
4954 )
4955 yield from self._ro_plugin_selector.register()
4956
4957 self._cloud_account_handler = cloud.CloudAccountConfigSubscriber(
4958 self._log,
4959 self._dts,
4960 self.log_hdl,
4961 self,
4962 )
4963
4964 yield from self._cloud_account_handler.register()
4965
4966 self._vnffgmgr = rwvnffgmgr.VnffgMgr(self._dts, self.log, self.log_hdl, self.loop,
4967 self, self._cloud_account_handler)
4968 yield from self._vnffgmgr.register()
4969
4970 self._nsm = NsManager(
4971 self._dts,
4972 self.log,
4973 self.loop,
4974 self,
4975 self._nsr_pub_handler,
4976 self._vnfr_pub_handler,
4977 self._vlr_pub_handler,
4978 self._ro_plugin_selector,
4979 self._vnffgmgr,
4980 self._vnfd_pub_handler,
4981 self._cloud_account_handler,
4982 )
4983
4984 yield from self._nsm.register()
4985 self.log.debug("Register NsmProject for {} complete".format(self.name))
4986
4987 def deregister(self):
4988 self._log.debug("Project {} de-register".format(self.name))
4989 self._nsm.deregister()
4990 self._vnffgmgr.deregister()
4991 self._cloud_account_handler.deregister()
4992 self._ro_plugin_selector.deregister()
4993 self._nsr_pub_handler.deregister()
4994 self._vnfr_pub_handler.deregister()
4995 self._vlr_pub_handler.deregister()
4996 self._vlr_sub_handler.deregister()
4997 self._nsm = None
4998
4999 @asyncio.coroutine
5000 def delete_prepare(self):
5001 if self._nsm and self._nsm._nsrs:
5002 delete_msg = "Project has NSR associated with it. Delete all Project NSR and try again."
5003 return False, delete_msg
5004 return True, "True"
5005
5006
5007 class NsmTasklet(rift.tasklets.Tasklet):
5008 """
5009 The network service manager tasklet
5010 """
5011 def __init__(self, *args, **kwargs):
5012 super(NsmTasklet, self).__init__(*args, **kwargs)
5013 self.rwlog.set_category("rw-mano-log")
5014 self.rwlog.set_subcategory("nsm")
5015
5016 self._dts = None
5017 self.project_handler = None
5018 self.projects = {}
5019
5020 @property
5021 def dts(self):
5022 return self._dts
5023
5024 def start(self):
5025 """ The task start callback """
5026 super(NsmTasklet, self).start()
5027 self.log.info("Starting NsmTasklet")
5028
5029 self.log.debug("Registering with dts")
5030 self._dts = rift.tasklets.DTS(self.tasklet_info,
5031 RwNsmYang.get_schema(),
5032 self.loop,
5033 self.on_dts_state_change)
5034
5035 self.log.debug("Created DTS Api GI Object: %s", self._dts)
5036
5037 def stop(self):
5038 try:
5039 self._dts.deinit()
5040 except Exception:
5041 print("Caught Exception in NSM stop:", sys.exc_info()[0])
5042 raise
5043
5044 def on_instance_started(self):
5045 """ Task instance started callback """
5046 self.log.debug("Got instance started callback")
5047
5048 @asyncio.coroutine
5049 def init(self):
5050 """ Task init callback """
5051 self.log.debug("Got instance started callback")
5052
5053 self.log.debug("creating project handler")
5054 self.project_handler = ProjectHandler(self, NsmProject)
5055 self.project_handler.register()
5056
5057
5058
5059 @asyncio.coroutine
5060 def run(self):
5061 """ Task run callback """
5062 pass
5063
5064 @asyncio.coroutine
5065 def on_dts_state_change(self, state):
5066 """Take action according to current dts state to transition
5067 application into the corresponding application state
5068
5069 Arguments
5070 state - current dts state
5071 """
5072 switch = {
5073 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
5074 rwdts.State.CONFIG: rwdts.State.RUN,
5075 }
5076
5077 handlers = {
5078 rwdts.State.INIT: self.init,
5079 rwdts.State.RUN: self.run,
5080 }
5081
5082 # Transition application to next state
5083 handler = handlers.get(state, None)
5084 if handler is not None:
5085 yield from handler()
5086
5087 # Transition dts to next state
5088 next_state = switch.get(state, None)
5089 if next_state is not None:
5090 self.log.debug("Changing state to %s", next_state)
5091 self._dts.handle.set_state(next_state)