Feature-9904: Enhancing NG-UI to enable Juju operational view dashboard
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from osm_lcm.data_utils.nsd import get_vnf_profiles
31 from osm_lcm.data_utils.vnfd import get_vdu_list, get_vdu_profile, \
32 get_ee_sorted_initial_config_primitive_list, get_ee_sorted_terminate_config_primitive_list, \
33 get_kdu_list, get_virtual_link_profiles, get_vdu, get_configuration, \
34 get_vdu_index, get_scaling_aspect, get_number_of_instances, get_juju_ee_ref
35 from osm_lcm.data_utils.list_utils import find_in_list
36 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index
37 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
38 from osm_lcm.data_utils.database.vim_account import VimAccountDB
39 from n2vc.k8s_helm_conn import K8sHelmConnector
40 from n2vc.k8s_helm3_conn import K8sHelm3Connector
41 from n2vc.k8s_juju_conn import K8sJujuConnector
42
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45
46 from osm_lcm.data_utils.database.database import Database
47 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
48
49 from n2vc.n2vc_juju_conn import N2VCJujuConnector
50 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
51
52 from osm_lcm.lcm_helm_conn import LCMHelmConn
53
54 from copy import copy, deepcopy
55 from time import time
56 from uuid import uuid4
57
58 from random import randint
59
60 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
61
62
63 class NsLcm(LcmBase):
# NOTE(review): the timeout values below are plain numbers; presumably seconds -- confirm against the callers
timeout_vca_on_error = 5 * 60  # Time for charm from first time at blocked,error status to mark as failed
timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
timeout_ns_terminate = 1800  # default global timeout for un deployment a ns
timeout_charm_delete = 10 * 60  # presumably the timeout for deleting a charm (inferred from the name; TODO confirm)
timeout_primitive = 30 * 60  # timeout for primitive execution
timeout_progress_primitive = 10 * 60  # timeout for some progress in a primitive execution

# Negative marker values for sub-operation handling; their consumers are outside this part of the file.
# NOTE(review): exact meaning of each marker is inferred from its name only -- confirm where they are used
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
# Label text; presumably used as the task name of the VCA deployment task (TODO confirm)
task_name_deploy_vca = "Deploying VCA"
75
def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
    """
    Set up the NS lifecycle manager: database and filesystem handles, the VCA (juju/helm)
    connectors, the K8s cluster connectors and the RO client.
    :param msg: messaging object, forwarded to the LcmBase constructor
    :param lcm_tasks: task registry, kept as self.lcm_tasks
    :param config: dictionary with configuration. The keys "timeout", "ro_config" and "VCA" are read here
    :param loop: event loop handed to the connectors and to the RO client
    :param prometheus: optional object kept as self.prometheus
    :return: None
    """
    ns_logger = logging.getLogger('lcm.ns')
    super().__init__(msg=msg, logger=ns_logger)

    self.db = Database().instance.db
    self.fs = Filesystem().instance.fs
    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    self.ng_ro = config["ro_config"].get("ng")
    # work on a private (shallow) copy of the VCA configuration
    self.vca_config = config["VCA"].copy()

    # create N2VC connector
    vca_url = '{}:{}'.format(self.vca_config['host'], self.vca_config['port'])
    self.n2vc = N2VCJujuConnector(
        log=self.logger,
        loop=self.loop,
        url=vca_url,
        username=self.vca_config.get('user', None),
        vca_config=self.vca_config,
        on_update_db=self._on_update_n2vc_db,
        fs=self.fs,
        db=self.db
    )

    # helm based execution environments: no url/username needed
    self.conn_helm_ee = LCMHelmConn(
        log=self.logger,
        loop=self.loop,
        url=None,
        username=None,
        vca_config=self.vca_config,
        on_update_db=self._on_update_n2vc_db
    )

    # the three K8s connectors share the same kubectl binary
    kubectl_path = self.vca_config.get("kubectlpath")

    self.k8sclusterhelm2 = K8sHelmConnector(
        kubectl_command=kubectl_path,
        helm_command=self.vca_config.get("helmpath"),
        log=self.logger,
        on_update_db=None,
        fs=self.fs,
        db=self.db
    )

    self.k8sclusterhelm3 = K8sHelm3Connector(
        kubectl_command=kubectl_path,
        helm_command=self.vca_config.get("helm3path"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=kubectl_path,
        juju_command=self.vca_config.get("jujupath"),
        log=self.logger,
        loop=self.loop,
        on_update_db=self._on_update_k8s_db,
        vca_config=self.vca_config,
        fs=self.fs,
        db=self.db
    )

    # KDU deployment type -> connector in charge of it
    helm2, helm3, juju = self.k8sclusterhelm2, self.k8sclusterhelm3, self.k8sclusterjuju
    self.k8scluster_map = {
        "helm-chart": helm2,
        "helm-chart-v3": helm3,
        "chart": helm3,
        "juju-bundle": juju,
        "juju": juju,
    }

    # execution environment type -> connector in charge of it
    self.vca_map = {
        "lxc_proxy_charm": self.n2vc,
        "native_charm": self.n2vc,
        "k8s_proxy_charm": self.n2vc,
        "helm": self.conn_helm_ee,
        "helm-v3": self.conn_helm_ee
    }

    self.prometheus = prometheus

    # create RO client
    self.RO = NgRoClient(self.loop, **self.ro_config)
166
@staticmethod
def increment_ip_mac(ip_mac, vm_index=1):
    """
    Add vm_index to the last group of an IPv4, IPv6 or MAC address given as text.
    :param ip_mac: address as string. Anything that is not a string is returned untouched
    :param vm_index: amount to add to the last group (default 1)
    :return: the new address as string, or None if the text has no usable '.' or ':' separator
        or the last group cannot be parsed
    """
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # IPv4: the last group (after the last dot) is decimal
        dot_pos = ip_mac.rfind(".")
        if dot_pos > 0:
            head, last_group = ip_mac[:dot_pos + 1], ip_mac[dot_pos + 1:]
            return "{}{}".format(head, int(last_group) + vm_index)
        # IPv6 or MAC: the last group (after the last colon) is hexadecimal
        colon_pos = ip_mac.rfind(":")
        if colon_pos > 0:
            cut = colon_pos + 1
            # keep the original width of the group: 2 digits for mac, up to 4 for ipv6
            width = len(ip_mac) - cut
            return "{}{:0{}x}".format(ip_mac[:cut], int(ip_mac[cut:], 16) + vm_index, width)
    except Exception:
        # best effort: an unparsable address ends up as None below
        pass
    return None
186
187 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
188
189 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
190
191 try:
192 # TODO filter RO descriptor fields...
193
194 # write to database
195 db_dict = dict()
196 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
197 db_dict['deploymentStatus'] = ro_descriptor
198 self.update_db_2("nsrs", nsrs_id, db_dict)
199
200 except Exception as e:
201 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
202
203 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
204
205 # remove last dot from path (if exists)
206 if path.endswith('.'):
207 path = path[:-1]
208
209 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
210 # .format(table, filter, path, updated_data))
211 try:
212
213 nsr_id = filter.get('_id')
214
215 # read ns record from database
216 nsr = self.db.get_one(table='nsrs', q_filter=filter)
217 current_ns_status = nsr.get('nsState')
218
219 # get vca status for NS
220 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
221
222 # vcaStatus
223 db_dict = dict()
224 db_dict['vcaStatus'] = status_dict
225 await self.n2vc.update_vca_status(db_dict['vcaStatus'])
226
227 # update configurationStatus for this VCA
228 try:
229 vca_index = int(path[path.rfind(".")+1:])
230
231 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
232 vca_status = vca_list[vca_index].get('status')
233
234 configuration_status_list = nsr.get('configurationStatus')
235 config_status = configuration_status_list[vca_index].get('status')
236
237 if config_status == 'BROKEN' and vca_status != 'failed':
238 db_dict['configurationStatus'][vca_index] = 'READY'
239 elif config_status != 'BROKEN' and vca_status == 'failed':
240 db_dict['configurationStatus'][vca_index] = 'BROKEN'
241 except Exception as e:
242 # not update configurationStatus
243 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
244
245 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
246 # if nsState = 'DEGRADED' check if all is OK
247 is_degraded = False
248 if current_ns_status in ('READY', 'DEGRADED'):
249 error_description = ''
250 # check machines
251 if status_dict.get('machines'):
252 for machine_id in status_dict.get('machines'):
253 machine = status_dict.get('machines').get(machine_id)
254 # check machine agent-status
255 if machine.get('agent-status'):
256 s = machine.get('agent-status').get('status')
257 if s != 'started':
258 is_degraded = True
259 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
260 # check machine instance status
261 if machine.get('instance-status'):
262 s = machine.get('instance-status').get('status')
263 if s != 'running':
264 is_degraded = True
265 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
266 # check applications
267 if status_dict.get('applications'):
268 for app_id in status_dict.get('applications'):
269 app = status_dict.get('applications').get(app_id)
270 # check application status
271 if app.get('status'):
272 s = app.get('status').get('status')
273 if s != 'active':
274 is_degraded = True
275 error_description += 'application {} status={} ; '.format(app_id, s)
276
277 if error_description:
278 db_dict['errorDescription'] = error_description
279 if current_ns_status == 'READY' and is_degraded:
280 db_dict['nsState'] = 'DEGRADED'
281 if current_ns_status == 'DEGRADED' and not is_degraded:
282 db_dict['nsState'] = 'READY'
283
284 # write to database
285 self.update_db_2("nsrs", nsr_id, db_dict)
286
287 except (asyncio.CancelledError, asyncio.TimeoutError):
288 raise
289 except Exception as e:
290 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
291
292 async def _on_update_k8s_db(self, cluster_uuid, kdu_instance, filter=None):
293 """
294 Updating vca status in NSR record
295 :param cluster_uuid: UUID of a k8s cluster
296 :param kdu_instance: The unique name of the KDU instance
297 :param filter: To get nsr_id
298 :return: none
299 """
300
301 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
302 # .format(cluster_uuid, kdu_instance, filter))
303
304 try:
305 nsr_id = filter.get('_id')
306
307 # get vca status for NS
308 vca_status = await self.k8sclusterjuju.status_kdu(cluster_uuid,
309 kdu_instance,
310 complete_status=True,
311 yaml_format=False)
312 # vcaStatus
313 db_dict = dict()
314 db_dict['vcaStatus'] = {nsr_id: vca_status}
315
316 await self.k8sclusterjuju.update_vca_status(db_dict['vcaStatus'], kdu_instance)
317
318 # write to database
319 self.update_db_2("nsrs", nsr_id, db_dict)
320
321 except (asyncio.CancelledError, asyncio.TimeoutError):
322 raise
323 except Exception as e:
324 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
325
@staticmethod
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """
    Render a cloud-init text as a Jinja2 template where every used variable must be defined.
    :param cloud_init_text: template text
    :param additional_params: dictionary with the variables for the template. Can be None
    :param vnfd_id: vnfd id, only used for the error messages
    :param vdu_id: vdu id, only used for the error messages
    :return: the rendered text. LcmException is raised on undefined variables or template errors
    """
    render_vars = additional_params or {}
    try:
        strict_env = Environment(undefined=StrictUndefined)
        return strict_env.from_string(cloud_init_text).render(render_vars)
    except UndefinedError as e:
        # must be caught before the generic template errors below
        message = ("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                   "file, must be provided in the instantiation parameters inside the "
                   "'additionalParamsForVnf/Vdu' block").format(e, vnfd_id, vdu_id)
        raise LcmException(message)
    except (TemplateError, TemplateNotFound) as e:
        message = "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
            vnfd_id, vdu_id, e)
        raise LcmException(message)
339
340 def _get_vdu_cloud_init_content(self, vdu, vnfd):
341 cloud_init_content = cloud_init_file = None
342 try:
343 if vdu.get("cloud-init-file"):
344 base_folder = vnfd["_admin"]["storage"]
345 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
346 vdu["cloud-init-file"])
347 with self.fs.file_open(cloud_init_file, "r") as ci_file:
348 cloud_init_content = ci_file.read()
349 elif vdu.get("cloud-init"):
350 cloud_init_content = vdu["cloud-init"]
351
352 return cloud_init_content
353 except FsException as e:
354 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
355 format(vnfd["id"], vdu["id"], cloud_init_file, e))
356
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """
    Get the 'additionalParams' of the first vdur of the vnfr that references the given vdu.
    :param db_vnfr: vnfr content; its 'vdur' list is searched
    :param vdu_id: value to match against the 'vdu-id-ref' of each vdur
    :return: what parse_yaml_strings returns for those params. StopIteration if no vdur matches
    """
    matching_vdurs = (item for item in db_vnfr.get("vdur") if vdu_id == item["vdu-id-ref"])
    first_match = next(matching_vdurs)
    return parse_yaml_strings(first_match.get("additionalParams"))
361
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
    :param vnfd: input vnfd. It is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Not used by this method
    :param nsrId: Id of the NSR. Not used by this method
    :return: copy of vnfd without the keys RO does not use
    """
    vnfd_RO = deepcopy(vnfd)
    # remove unused by RO configuration, monitoring, scaling and internal keys
    for unused_key in ("_id", "_admin", "monitoring-param", "scaling-group-descriptor", "kdu", "k8s-cluster"):
        vnfd_RO.pop(unused_key, None)
    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init information is dropped from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        for cloud_init_key in ("cloud-init-file", "cloud-init"):
            vdu.pop(cloud_init_key, None)
    return vnfd_RO
387
@staticmethod
def ip_profile_2_RO(ip_profile):
    """
    Translate an ip-profile into the key names/values used towards RO.
    :param ip_profile: dictionary with the ip-profile. It is not modified
    :return: a deep copy where 'dns-server' becomes 'dns-address' (list of addresses when the input is
        a list), 'ip-version' ipv4/ipv6 becomes IPv4/IPv6 and 'dhcp-params' becomes 'dhcp'
    """
    profile = deepcopy(ip_profile)

    if "dns-server" in profile:
        dns_servers = profile.pop("dns-server")
        if isinstance(dns_servers, list):
            profile["dns-address"] = [server['address'] for server in dns_servers]
        else:
            profile["dns-address"] = dns_servers

    ip_version = profile.get("ip-version")
    if ip_version == "ipv4":
        profile["ip-version"] = "IPv4"
    elif ip_version == "ipv6":
        profile["ip-version"] = "IPv6"

    if "dhcp-params" in profile:
        profile["dhcp"] = profile.pop("dhcp-params")
    return profile
405
406 def _get_ro_vim_id_for_vim_account(self, vim_account):
407 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
408 if db_vim["_admin"]["operationalState"] != "ENABLED":
409 raise LcmException("VIM={} is not available. operationalState={}".format(
410 vim_account, db_vim["_admin"]["operationalState"]))
411 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
412 return RO_vim_id
413
def get_ro_wim_id_for_wim_account(self, wim_account):
    """
    Get the RO identifier stored at a wim account.
    :param wim_account: _id of the record at "wim_accounts". Any non string value is returned as it is
    :return: content of '_admin.deployed.RO-account'. LcmException if the operationalState is not ENABLED
    """
    if not isinstance(wim_account, str):
        return wim_account

    wim_admin = self.db.get_one("wim_accounts", {"_id": wim_account})["_admin"]
    operational_state = wim_admin["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException("WIM={} is not available. operationalState={}".format(
            wim_account, operational_state))
    return wim_admin["deployed"]["RO-account"]
424
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
    """
    Add and/or remove vdur entries of a vnfr, at database and at the provided dictionary.
    :param db_vnfr: vnfr content. Its 'vdur' is replaced at the end by the list read back from database
    :param vdu_create: dictionary {vdu-id: number of instances to add}. New entries are clones of the
        last existing vdur of that vdu, in "BUILD" status and without ip-address/vim_info
    :param vdu_delete: dictionary {vdu-id: number of instances to remove}, counted from the end
    :param mark_delete: if True the vdur are not removed, but their status is set to "DELETING"
    :return: None. LcmException if a vdu to scale out has no vdur to clone
    """

    db_vdu_push_list = []
    db_update = {"_admin.modified": time()}
    if vdu_create:
        for vdu_id, vdu_count in vdu_create.items():
            vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
            if not vdur:
                raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
                                   format(vdu_id))

            for count in range(vdu_count):
                vdur_copy = deepcopy(vdur)
                vdur_copy["status"] = "BUILD"
                vdur_copy["status-detailed"] = None
                # the clone must not inherit the address of the vdur it was copied from
                # (written with ':' instead of '=' this line is an annotation and assigns nothing)
                vdur_copy["ip-address"] = None
                vdur_copy["_id"] = str(uuid4())
                vdur_copy["count-index"] += count + 1
                vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
                vdur_copy.pop("vim_info", None)
                for iface in vdur_copy["interfaces"]:
                    # fixed addresses are derived from the cloned ones; the rest are discarded
                    if iface.get("fixed-ip"):
                        iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], count+1)
                    else:
                        iface.pop("ip-address", None)
                    if iface.get("fixed-mac"):
                        iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], count+1)
                    else:
                        iface.pop("mac-address", None)
                    iface.pop("mgmt_vnf", None)  # only first vdu can be managment of vnf
                db_vdu_push_list.append(vdur_copy)
                # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
    if vdu_delete:
        for vdu_id, vdu_count in vdu_delete.items():
            if mark_delete:
                indexes_to_delete = [index for index, vdur in enumerate(db_vnfr["vdur"])
                                     if vdur["vdu-id-ref"] == vdu_id]
                # a plain [-vdu_count:] slice would select every index when vdu_count is 0
                last_indexes = indexes_to_delete[-vdu_count:] if vdu_count > 0 else []
                db_update.update({"vdur.{}.status".format(i): "DELETING" for i in last_indexes})
            else:
                # it must be deleted one by one because common.db does not allow otherwise
                vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
                for vdu in vdus_to_delete[:vdu_count]:
                    self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})
    db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
    self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
    # modify passed dictionary db_vnfr
    db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
    db_vnfr["vdur"] = db_vnfr_["vdur"]
472
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copy into every vld of the nsr the information that RO reports for its net
    :param ns_update_nsr: dictionary to be filled with the updated info, one 'vld.<index>' entry per vld
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO; its 'nets' are matched by 'ns_net_osm_id'
    :return: Nothing, LcmException is raised when a vld has no net at the RO info
    """

    for position, vld in enumerate(get_iterable(db_nsr, "vld")):
        for ro_net in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] == ro_net.get("ns_net_osm_id"):
                vld.update({
                    "vim-id": ro_net.get("vim_net_id"),
                    "name": ro_net.get("vim_name"),
                    "status": ro_net.get("status"),
                    "status-detailed": ro_net.get("error_msg"),
                })
                ns_update_nsr["vld.{}".format(position)] = vld
                break
        else:
            # the loop over the RO nets ended without finding this vld
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
494
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Mark every vnfr as "ERROR", at database and at the provided dictionaries. Each vdur that has
    not any status yet is marked as "ERROR" too, with error_text as its detailed status.
    Database errors are logged and not propagated.
    :param db_vnfrs: dictionary whose values are the vnfr contents
    :param error_text: detail of the error. Can be empty
    :return: None
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    if error_text:
                        # store the same text at database and in memory (the database entry
                        # used to receive the literal "ERROR" instead of the detail)
                        status_detailed = str(error_text)
                        vdur["status-detailed"] = status_detailed
                        vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = status_detailed
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
509
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its 'vnfs' (with their 'vms') and 'nets' are read
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP is raised when a vnfr that
        has vdur gets no ip address from RO and has none stored
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # look for the RO vnf with the same member index; the 'else' of this loop handles "not found"
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                # only the text before the first ';' is kept
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
            elif not db_vnfr.get("ip-address"):
                if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                    raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # counts the RO vms of this vdu skipped so far, to pair each vdur with the vm
                # at the same position as its 'count-index'
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    # every interface of the vdur must have a RO interface with the same internal name
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "from VIM info"
                                               .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    # no RO vm matches this vdur (vdu id and position)
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                       "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            # internal vlds of the vnf are matched against the RO nets by 'vnf_net_osm_id'
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        vnf_index, vld["id"]))

            # one single database write per vnfr with everything collected above
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
582
583 def _get_ns_config_info(self, nsr_id):
584 """
585 Generates a mapping between vnf,vdu elements and the N2VC id
586 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
587 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
588 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
589 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
590 """
591 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
592 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
593 mapping = {}
594 ns_config_info = {"osm-config-mapping": mapping}
595 for vca in vca_deployed_list:
596 if not vca["member-vnf-index"]:
597 continue
598 if not vca["vdu_id"]:
599 mapping[vca["member-vnf-index"]] = vca["application"]
600 else:
601 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
602 vca["application"]
603 return ns_config_info
604
605 async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
606 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
607
608 db_vims = {}
609
610 def get_vim_account(vim_account_id):
611 nonlocal db_vims
612 if vim_account_id in db_vims:
613 return db_vims[vim_account_id]
614 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
615 db_vims[vim_account_id] = db_vim
616 return db_vim
617
618 # modify target_vld info with instantiation parameters
619 def parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn):
620 if vld_params.get("ip-profile"):
621 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params["ip-profile"]
622 if vld_params.get("provider-network"):
623 target_vld["vim_info"][target_vim]["provider_network"] = vld_params["provider-network"]
624 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
625 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params["provider-network"]["sdn-ports"]
626 if vld_params.get("wimAccountId"):
627 target_wim = "wim:{}".format(vld_params["wimAccountId"])
628 target_vld["vim_info"][target_wim] = {}
629 for param in ("vim-network-name", "vim-network-id"):
630 if vld_params.get(param):
631 if isinstance(vld_params[param], dict):
632 for vim, vim_net in vld_params[param].items():
633 other_target_vim = "vim:" + vim
634 populate_dict(target_vld["vim_info"], (other_target_vim, param.replace("-", "_")), vim_net)
635 else: # isinstance str
636 target_vld["vim_info"][target_vim][param.replace("-", "_")] = vld_params[param]
637 if vld_params.get("common_id"):
638 target_vld["common_id"] = vld_params.get("common_id")
639
640 nslcmop_id = db_nslcmop["_id"]
641 target = {
642 "name": db_nsr["name"],
643 "ns": {"vld": []},
644 "vnf": [],
645 "image": deepcopy(db_nsr["image"]),
646 "flavor": deepcopy(db_nsr["flavor"]),
647 "action_id": nslcmop_id,
648 "cloud_init_content": {},
649 }
650 for image in target["image"]:
651 image["vim_info"] = {}
652 for flavor in target["flavor"]:
653 flavor["vim_info"] = {}
654
655 if db_nslcmop.get("lcmOperationType") != "instantiate":
656 # get parameters of instantiation:
657 db_nslcmop_instantiate = self.db.get_list("nslcmops", {"nsInstanceId": db_nslcmop["nsInstanceId"],
658 "lcmOperationType": "instantiate"})[-1]
659 ns_params = db_nslcmop_instantiate.get("operationParams")
660 else:
661 ns_params = db_nslcmop.get("operationParams")
662 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
663 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
664
665 cp2target = {}
666 for vld_index, vld in enumerate(db_nsr.get("vld")):
667 target_vim = "vim:{}".format(ns_params["vimAccountId"])
668 target_vld = {
669 "id": vld["id"],
670 "name": vld["name"],
671 "mgmt-network": vld.get("mgmt-network", False),
672 "type": vld.get("type"),
673 "vim_info": {
674 target_vim: {
675 "vim_network_name": vld.get("vim-network-name"),
676 "vim_account_id": ns_params["vimAccountId"]
677 }
678 }
679 }
680 # check if this network needs SDN assist
681 if vld.get("pci-interfaces"):
682 db_vim = get_vim_account(ns_params["vimAccountId"])
683 sdnc_id = db_vim["config"].get("sdn-controller")
684 if sdnc_id:
685 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
686 target_sdn = "sdn:{}".format(sdnc_id)
687 target_vld["vim_info"][target_sdn] = {
688 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
689
690 nsd_vnf_profiles = get_vnf_profiles(nsd)
691 for nsd_vnf_profile in nsd_vnf_profiles:
692 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
693 if cp["virtual-link-profile-id"] == vld["id"]:
694 cp2target["member_vnf:{}.{}".format(
695 cp["constituent-cpd-id"][0]["constituent-base-element-id"],
696 cp["constituent-cpd-id"][0]["constituent-cpd-id"]
697 )] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
698
699 # check at nsd descriptor, if there is an ip-profile
700 vld_params = {}
701 virtual_link_profiles = get_virtual_link_profiles(nsd)
702
703 for vlp in virtual_link_profiles:
704 ip_profile = find_in_list(nsd["ip-profiles"],
705 lambda profile: profile["name"] == vlp["ip-profile-ref"])
706 vld_params["ip-profile"] = ip_profile["ip-profile-params"]
707 # update vld_params with instantiation params
708 vld_instantiation_params = find_in_list(get_iterable(ns_params, "vld"),
709 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]))
710 if vld_instantiation_params:
711 vld_params.update(vld_instantiation_params)
712 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
713 target["ns"]["vld"].append(target_vld)
714
715 for vnfr in db_vnfrs.values():
716 vnfd = find_in_list(db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"])
717 vnf_params = find_in_list(get_iterable(ns_params, "vnf"),
718 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"])
719 target_vnf = deepcopy(vnfr)
720 target_vim = "vim:{}".format(vnfr["vim-account-id"])
721 for vld in target_vnf.get("vld", ()):
722 # check if connected to a ns.vld, to fill target'
723 vnf_cp = find_in_list(vnfd.get("int-virtual-link-desc", ()),
724 lambda cpd: cpd.get("id") == vld["id"])
725 if vnf_cp:
726 ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
727 if cp2target.get(ns_cp):
728 vld["target"] = cp2target[ns_cp]
729
730 vld["vim_info"] = {target_vim: {"vim_network_name": vld.get("vim-network-name")}}
731 # check if this network needs SDN assist
732 target_sdn = None
733 if vld.get("pci-interfaces"):
734 db_vim = get_vim_account(vnfr["vim-account-id"])
735 sdnc_id = db_vim["config"].get("sdn-controller")
736 if sdnc_id:
737 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
738 target_sdn = "sdn:{}".format(sdnc_id)
739 vld["vim_info"][target_sdn] = {
740 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
741
742 # check at vnfd descriptor, if there is an ip-profile
743 vld_params = {}
744 vnfd_vlp = find_in_list(
745 get_virtual_link_profiles(vnfd),
746 lambda a_link_profile: a_link_profile["id"] == vld["id"]
747 )
748 if vnfd_vlp and vnfd_vlp.get("virtual-link-protocol-data") and \
749 vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data"):
750 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"]["l3-protocol-data"]
751 ip_profile_dest_data = {}
752 if "ip-version" in ip_profile_source_data:
753 ip_profile_dest_data["ip-version"] = ip_profile_source_data["ip-version"]
754 if "cidr" in ip_profile_source_data:
755 ip_profile_dest_data["subnet-address"] = ip_profile_source_data["cidr"]
756 if "gateway-ip" in ip_profile_source_data:
757 ip_profile_dest_data["gateway-address"] = ip_profile_source_data["gateway-ip"]
758 if "dhcp-enabled" in ip_profile_source_data:
759 ip_profile_dest_data["dhcp-params"] = {
760 "enabled": ip_profile_source_data["dhcp-enabled"]
761 }
762
763 vld_params["ip-profile"] = ip_profile_dest_data
764 # update vld_params with instantiation params
765 if vnf_params:
766 vld_instantiation_params = find_in_list(get_iterable(vnf_params, "internal-vld"),
767 lambda i_vld: i_vld["name"] == vld["id"])
768 if vld_instantiation_params:
769 vld_params.update(vld_instantiation_params)
770 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
771
772 vdur_list = []
773 for vdur in target_vnf.get("vdur", ()):
774 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
775 continue # This vdu must not be created
776 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
777
778 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
779
780 if ssh_keys_all:
781 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
782 vnf_configuration = get_configuration(vnfd, vnfd["id"])
783 if vdu_configuration and vdu_configuration.get("config-access") and \
784 vdu_configuration.get("config-access").get("ssh-access"):
785 vdur["ssh-keys"] = ssh_keys_all
786 vdur["ssh-access-required"] = vdu_configuration["config-access"]["ssh-access"]["required"]
787 elif vnf_configuration and vnf_configuration.get("config-access") and \
788 vnf_configuration.get("config-access").get("ssh-access") and \
789 any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
790 vdur["ssh-keys"] = ssh_keys_all
791 vdur["ssh-access-required"] = vnf_configuration["config-access"]["ssh-access"]["required"]
792 elif ssh_keys_instantiation and \
793 find_in_list(vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")):
794 vdur["ssh-keys"] = ssh_keys_instantiation
795
796 self.logger.debug("NS > vdur > {}".format(vdur))
797
798 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
799 # cloud-init
800 if vdud.get("cloud-init-file"):
801 vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
802 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
803 if vdur["cloud-init"] not in target["cloud_init_content"]:
804 base_folder = vnfd["_admin"]["storage"]
805 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
806 vdud.get("cloud-init-file"))
807 with self.fs.file_open(cloud_init_file, "r") as ci_file:
808 target["cloud_init_content"][vdur["cloud-init"]] = ci_file.read()
809 elif vdud.get("cloud-init"):
810 vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"]))
811 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
812 target["cloud_init_content"][vdur["cloud-init"]] = vdud["cloud-init"]
813 vdur["additionalParams"] = vdur.get("additionalParams") or {}
814 deploy_params_vdu = self._format_additional_params(vdur.get("additionalParams") or {})
815 deploy_params_vdu["OSM"] = get_osm_params(vnfr, vdur["vdu-id-ref"], vdur["count-index"])
816 vdur["additionalParams"] = deploy_params_vdu
817
818 # flavor
819 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
820 if target_vim not in ns_flavor["vim_info"]:
821 ns_flavor["vim_info"][target_vim] = {}
822
823 # deal with images
824 # in case alternative images are provided we must check if they should be applied
825 # for the vim_type, modify the vim_type taking into account
826 ns_image_id = int(vdur["ns-image-id"])
827 if vdur.get("alt-image-ids"):
828 db_vim = get_vim_account(vnfr["vim-account-id"])
829 vim_type = db_vim["vim_type"]
830 for alt_image_id in vdur.get("alt-image-ids"):
831 ns_alt_image = target["image"][int(alt_image_id)]
832 if vim_type == ns_alt_image.get("vim-type"):
833 # must use alternative image
834 self.logger.debug("use alternative image id: {}".format(alt_image_id))
835 ns_image_id = alt_image_id
836 vdur["ns-image-id"] = ns_image_id
837 break
838 ns_image = target["image"][int(ns_image_id)]
839 if target_vim not in ns_image["vim_info"]:
840 ns_image["vim_info"][target_vim] = {}
841
842 vdur["vim_info"] = {target_vim: {}}
843 # instantiation parameters
844 # if vnf_params:
845 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
846 # vdud["id"]), None)
847 vdur_list.append(vdur)
848 target_vnf["vdur"] = vdur_list
849 target["vnf"].append(target_vnf)
850
851 desc = await self.RO.deploy(nsr_id, target)
852 self.logger.debug("RO return > {}".format(desc))
853 action_id = desc["action_id"]
854 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
855
856 # Updating NSR
857 db_nsr_update = {
858 "_admin.deployed.RO.operational-status": "running",
859 "detailed-status": " ".join(stage)
860 }
861 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
862 self.update_db_2("nsrs", nsr_id, db_nsr_update)
863 self._write_op_status(nslcmop_id, stage)
864 self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
865 return
866
async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id=None, start_time=None, timeout=600, stage=None):
    """
    Poll new generation RO for the status of an action until it is DONE, FAILED or the timeout expires
    :param nsr_id: ns record identity, passed to self.RO.status
    :param action_id: identity of the RO action to wait for
    :param nslcmop_id: ns operation identity. Progress is written at database only if this and stage are provided
    :param start_time: epoch seconds from which the timeout is counted. Current time is used if not provided
    :param timeout: maximum number of seconds to wait, counted from start_time
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None when RO reports DONE. Raises NgRoException if RO reports FAILED or an unknown status, or when
        the timeout expires
    """
    detailed_status_old = None
    db_nsr_update = {}
    start_time = start_time or time()
    while time() <= start_time + timeout:
        desc_status = await self.RO.status(nsr_id, action_id)
        self.logger.debug("Wait NG RO > {}".format(desc_status))
        status = desc_status["status"]
        if status == "FAILED":
            raise NgRoException(desc_status["details"])
        elif status == "BUILD":
            if stage:
                stage[2] = "VIM: ({})".format(desc_status["details"])
        elif status == "DONE":
            if stage:
                stage[2] = "Deployed at VIM"
            # leave without writing at database; this last stage text is left for the caller to store
            break
        else:
            # explicit raise instead of 'assert False': asserts are removed when python runs with -O, and the
            # unknown status would then be silently polled until timeout
            raise NgRoException("ROclient.check_ns_status returns unknown {}".format(status))
        # write progress only when the vim specific text has changed since last iteration
        if stage and nslcmop_id and stage[2] != detailed_status_old:
            detailed_status_old = stage[2]
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
        # 'loop' argument is not passed: it is deprecated since python 3.8 and rejected since python 3.10
        await asyncio.sleep(15)
    else:  # timeout_ns_deploy
        raise NgRoException("Timeout waiting ns to deploy")
893
async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Request new generation RO to terminate the ns, wait until it is done and delete the ns at RO
    :param logging_text: prefix text to use at logging
    :param nsr_deployed: not used by this method
    :param nsr_id: ns record identity
    :param nslcmop_id: ns operation identity. It is also sent to RO as "action_id" of the target
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None. Raises LcmException if the deletion fails for a reason different from 'not found' (404)
    """
    ro_update = {}
    errors = []
    action_id = None
    started_at = time()
    try:
        # the terminate action is a deploy of a target without any vld, vnf, image nor flavor
        empty_target = {
            "ns": {"vld": []},
            "vnf": [],
            "image": [],
            "flavor": [],
            "action_id": nslcmop_id,
        }
        ro_answer = await self.RO.deploy(nsr_id, empty_target)
        action_id = ro_answer["action_id"]
        ro_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
        ro_update["_admin.deployed.RO.nsr_status"] = "DELETING"
        self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))

        # wait until done, 20 minutes as maximum
        await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, started_at, 20 * 60, stage)

        ro_update["_admin.deployed.RO.nsr_delete_action_id"] = None
        ro_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        # delete all nsr
        await self.RO.delete(nsr_id)
    except Exception as e:
        # only NgRoException provides a http code to tell apart 'not found' and 'conflict'
        http_code = e.http_code if isinstance(e, NgRoException) else None
        if http_code == 404:  # not found, nothing to delete
            ro_update["_admin.deployed.RO.nsr_id"] = None
            ro_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            ro_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
        elif http_code == 409:  # conflict
            errors.append("delete conflict: {}".format(e))
            self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
        else:
            errors.append("delete error: {}".format(e))
            self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))

    stage[2] = "Error deleting from VIM" if errors else "Deleted from VIM"
    ro_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, ro_update)
    self._write_op_status(nslcmop_id, stage)

    # the error is raised only after the final status has been written at database
    if errors:
        raise LcmException("; ".join(errors))
    return
945
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
                         n2vc_key_list, stage):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs. Only its values are iterated here
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: the result of self._instantiate_ng_ro. Any exception is logged, vnfrs are set at error and the
        exception is raised again
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        # timeout provided at the operation has preference over the configured one
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                # no vnfr uses the ns vim account: take the one of the last vnfr.
                # This was a comparison ('==') without any effect instead of an assignment
                ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
                                             db_vnfds, n2vc_key_list, stage, start_deploy, timeout_ns_deploy)
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # traceback is logged only for exceptions that are not one of the known ones
        self.logger.error("Error deploying at VIM {}".format(e),
                          exc_info=not isinstance(e, (ROclient.ROClientException, LcmException, DbException,
                                                      NgRoException)))
        raise
988
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging. Not used at this moment
    :param nsr_id: ns record identity. Not used at this moment
    :param vnfr_id: vnf record identity. The vnfr is read from database at every try
    :param kdu_name: "kdu-name" of the kdur to wait for
    :return: IP address, the "ip-address" of the kdur once its status is READY or ENABLED
    :raises LcmException: if the kdur is not found, if it gets any other status, or after 360 tries with a pause
        of 10 seconds among them
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    for _ in range(360):
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
        if not kdur:
            raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
        # an empty status means that it is not ready yet: keep on waiting
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address")
            else:
                raise LcmException("target KDU={} is in error state".format(kdu_name))

        # 'loop' argument is not passed: it is deprecated since python 3.8 and rejected since python 3.10
        await asyncio.sleep(10)
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1016
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for ip addres at RO, and optionally, insert public key in virtual machine
    :param logging_text: prefix use for logging
    :param nsr_id: ns record identity
    :param vnfr_id: vnf record identity
    :param vdu_id: "vdu-id-ref" of the target vdur. If empty, the vdur owning the vnf "ip-address" is used
    :param vdu_index: "count-index" of the target vdur
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: if the target is in error state, it is not found, the key cannot be injected, or after
        360 iterations with a pause of 10 seconds among them
    """

    self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        # NOTE(review): this counter limits every kind of waiting done at this loop (ip address, RO nsr_id and
        # key injection retries), although the error text only talks about RO.nsr_id
        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))

        # 'loop' argument is not passed: it is deprecated since python 3.8 and rejected since python 3.10
        await asyncio.sleep(10)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException("Cannot inject ssh-key because target VNF is in error state")
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur and len(db_vnfr.get("vdur", ())) == 1:  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
                                                                                          vdu_index))
            # New generation RO stores information at "vim_info"
            ng_ro_status = None
            target_vim = None
            if vdur.get("vim_info"):
                target_vim = next(t for t in vdur["vim_info"])  # there should be only one key
                ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
            if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE" or ng_ro_status == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            self.logger.debug(logging_text + "Inserting RO key")
            self.logger.debug("SSH > PubKey > {}".format(pub_key))
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                if self.ng_ro:
                    target = {"action": {"action": "inject_ssh_key", "key": pub_key, "user": user},
                              "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                              }
                    desc = await self.RO.deploy(nsr_id, target)
                    action_id = desc["action_id"]
                    await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                    break
                else:
                    # wait until NS is deployed at RO
                    if not ro_nsr_id:
                        db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                        ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
                    if not ro_nsr_id:
                        continue
                    result_dict = await self.RO.create_action(
                        item="ns",
                        item_id_name=ro_nsr_id,
                        descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                    )
                    # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                    if not result_dict or not isinstance(result_dict, dict):
                        raise LcmException("Unknown response from RO when injecting key")
                    # only the first result is evaluated: the loop is left at its first iteration
                    for result in result_dict.values():
                        if result.get("vim_result") == 200:
                            break
                        else:
                            raise ROclient.ROClientException("error injecting key: {}".format(
                                result.get("description")))
                    break
            except NgRoException as e:
                # with new generation RO there are not retries
                raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
            except ROclient.ROClientException as e:
                # retry at next iteration of the main loop, up to 20 tries
                if not nb_tries:
                    self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                      format(e, 20*10))
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
        else:
            break

    return ip_address
1134
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
    :param nsr_id: ns record identity. The nsr is read from database at every iteration
    :param vca_deployed_list: list of deployed VCA. Only the entry at vca_index is used
    :param vca_index: position of the own VCA, both at vca_deployed_list and at nsr "configurationStatus"
    :return: None when there is nothing else to wait for
    :raises LcmException: if a dependent element is BROKEN, or after the last iteration
    """
    own_vca = vca_deployed_list[vca_index]
    if own_vca.get("vdu_id") or own_vca.get("kdu_name"):
        # vdu or kdu: no dependencies
        return
    # NOTE(review): this is a count of iterations and not seconds; it is decreased by one after every pause of
    # 10 seconds, so up to 301 iterations are done. Confirm whether 300 seconds was the intention
    remaining = 300
    while remaining >= 0:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # NOTE(review): re-read but not used afterwards; it is kept because it fails if the path does not exist
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        status_entries = db_nsr["configurationStatus"]
        own_vnf_index = own_vca.get("member-vnf-index")
        pending = False
        for position, entry in enumerate(status_entries):
            if position == vca_index:
                # myself
                continue
            # with an own member-vnf-index, entries with a different one are not dependencies
            # NOTE(review): "member-vnf-index" is looked for at the configurationStatus entry; confirm that
            # these entries really contain this key
            if own_vnf_index and entry.get("member-vnf-index") != own_vnf_index:
                continue
            entry_status = entry.get("status")
            if entry_status == 'BROKEN':
                raise LcmException("Configuration aborted because dependent charm/s has failed")
            if entry_status != 'READY':
                pending = True
                break
        if not pending:
            # no dependencies, return
            return
        await asyncio.sleep(10)
        remaining -= 1

    raise LcmException("Configuration aborted because dependent charm/s timeout")
1168
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
                           config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
                           ee_config_descriptor):
    """
    Create or register the execution environment of one element (NS, VNF, VDU or KDU), install its configuration
    software, wait for the management ip address and execute the initial config primitives
    :param logging_text: prefix text to use at logging
    :param vca_index: position of this VCA at db_nsr "_admin.deployed.VCA" and at nsr "configurationStatus"
    :param nsi_id: network slice instance identity, first part of the namespace. Empty text is used if not set
    :param db_nsr: database content of ns record
    :param db_vnfr: database content of the vnf record. Empty for a NS configuration
    :param vdu_id: vdu identity, for a VDU configuration
    :param kdu_name: kdu name, for a KDU configuration
    :param vdu_index: index used at namespace and to look for the vdur. 0 is used at namespace if not set
    :param config_descriptor: configuration descriptor. 'initial-config-primitive', 'config-access' and
        'terminate-config-primitive' are read from it
    :param deploy_params: parameters to map into primitives. This method writes "rw_mgmt_ip" and, for elements
        without "member-vnf-index", "ns_config_info" over it
    :param base_folder: dictionary with "folder" and "pkg-dir", used to compose the artifact path
    :param nslcmop_id: ns operation identity, where the stage is written
    :param stage: list of stage texts. This method writes over its first item
    :param vca_type: key of self.vca_map. The values handled here are "native_charm", "lxc_proxy_charm",
        "k8s_proxy_charm", "helm" and "helm-v3"
    :param vca_name: last part of the artifact path
    :param ee_config_descriptor: execution environment descriptor. Its "id" is read here
    :return: None. Any exception is raised as LcmException, after writing the BROKEN configuration status
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    # 'step' contains the description of the ongoing task. It is used at the text of the final exception
    step = ""
    try:

        element_type = 'NS'
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]
            osm_config["osm"]["vnf_id"] = vnfr_id

        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)

        if vnfr_id:
            element_type = 'VNF'
            element_under_configuration = vnfr_id
            namespace += ".{}-{}".format(vnfr_id, vdu_index or 0)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                element_type = 'VDU'
                element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
                osm_config["osm"]["vdu_id"] = vdu_id
            elif kdu_name:
                namespace += ".{}.{}".format(kdu_name, vdu_index or 0)
                element_type = 'KDU'
                element_under_configuration = kdu_name
                osm_config["osm"]["kdu_name"] = kdu_name

        # Get artifact path
        artifact_path = "{}/{}/{}/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
            vca_name
        )

        self.logger.debug("Artifact path > {}".format(artifact_path))

        # get initial_config_primitive_list that applies to this element
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        self.logger.debug("Initial config primitive list > {}".format(initial_config_primitive_list))

        # add config if not present for NS charm
        ee_descriptor_id = ee_config_descriptor.get("id")
        self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
        initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list,
                                                                                    vca_deployed, ee_descriptor_id)

        self.logger.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list))
        # n2vc_redesign STEP 3.1
        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        vim_account_id = (
            deep_get(db_vnfr, ("vim-account-id",)) or
            deep_get(deploy_params, ("OSM", "vim_account_id"))
        )
        vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
        vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
        # create or register execution environment in VCA
        if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='CREATING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)

            # NOTE(review): the old ee_id obtained above is discarded here, so 'reuse_ee_id' is always None at
            # the calls below. Confirm if this is intended
            ee_id = None
            credentials = None
            if vca_type == "k8s_proxy_charm":
                ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
                    charm_name=artifact_path[artifact_path.rfind("/") + 1:],
                    namespace=namespace,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    cloud_name=vca_k8s_cloud,
                    credential_name=vca_k8s_cloud_credential,
                )
            elif vca_type == "helm" or vca_type == "helm-v3":
                ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    config=osm_config,
                    artifact_path=artifact_path,
                    vca_type=vca_type
                )
            else:
                ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    cloud_name=vca_cloud,
                    credential_name=vca_cloud_credential,
                )

        elif vca_type == "native_charm":
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            # merged. Meanwhile let's get username from initial-config-primitive
            if not username and initial_config_primitive_list:
                for config_primitive in initial_config_primitive_list:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='REGISTERING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.vca_map[vca_type].register_execution_environment(
                credentials=credentials,
                namespace=namespace,
                db_dict=db_dict,
                cloud_name=vca_cloud,
                credential_name=vca_cloud_credential,
            )

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        db_nsr_update = {db_update_entry + "ee_id": ee_id}
        if len(ee_id_parts) >= 2:
            model_name = ee_id_parts[0]
            application_name = ee_id_parts[1]
            db_nsr_update[db_update_entry + "model"] = model_name
            db_nsr_update[db_update_entry + "application"] = application_name

        # n2vc_redesign STEP 3.3
        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='INSTALLING SW',
            element_under_configuration=element_under_configuration,
            element_type=element_type,
            other_update=db_nsr_update
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        config = None
        if vca_type == "native_charm":
            # for native charms the parameters of the 'config' primitive are provided at installation time
            config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
            if config_primitive:
                config = self._map_primitive_params(
                    config_primitive,
                    {},
                    deploy_params
                )
        # number of units is read from "config-units" of the nsr, vnfr or vdur, only for lxc proxy charms
        num_units = 1
        if vca_type == "lxc_proxy_charm":
            if element_type == "NS":
                num_units = db_nsr.get("config-units") or 1
            elif element_type == "VNF":
                num_units = db_vnfr.get("config-units") or 1
            elif element_type == "VDU":
                for v in db_vnfr["vdur"]:
                    if vdu_id == v["vdu-id-ref"]:
                        num_units = v.get("config-units") or 1
                        break
        if vca_type != "k8s_proxy_charm":
            await self.vca_map[vca_type].install_configuration_sw(
                ee_id=ee_id,
                artifact_path=artifact_path,
                db_dict=db_dict,
                config=config,
                num_units=num_units,
            )

        # write in db flag of configuration_sw already installed
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
                                      vca_index=vca_index, vca_type=vca_type)

        # if SSH access is required, then get execution environment SSH public
        # if native charm we have waited already to VM be UP
        if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
            pub_key = None
            user = None
            # self.logger.debug("get ssh key block")
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # self.logger.debug("ssh key needed")
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
            else:
                # self.logger.debug("no need to get ssh key")
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                if kdu_name:
                    rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
                else:
                    rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
                                                                     vdu_index, user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None  # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6 Execute initial config primitive
        step = 'execute initial config primitive'

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
        else:
            # NS
            stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='EXECUTING PRIMITIVE'
        )

        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        check_if_terminated_needed = True
        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            if not vca_deployed["member-vnf-index"]:
                deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.vca_map[vca_type].exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )
            # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
            if check_if_terminated_needed:
                if config_descriptor.get('terminate-config-primitive'):
                    self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
                check_if_terminated_needed = False

            # TODO register in database that primitive is done

        # STEP 7 Configure metrics
        if vca_type == "helm" or vca_type == "helm-v3":
            prometheus_jobs = await self.add_prometheus_metrics(
                ee_id=ee_id,
                artifact_path=artifact_path,
                ee_config_descriptor=ee_config_descriptor,
                vnfr_id=vnfr_id,
                nsr_id=nsr_id,
                target_ip=rw_mgmt_ip,
            )
            if prometheus_jobs:
                self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='READY'
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        # traceback is logged only for exceptions that are not one of the known ones
        if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
            self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='BROKEN'
        )
        raise LcmException("{} {}".format(step, e)) from e
1508
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
                     error_description: str = None, error_detail: str = None, other_update: dict = None):
    """
    Update db_nsr fields.
    :param nsr_id: ns record identity
    :param ns_state: new value for "nsState". It is not modified if empty
    :param current_operation: new value for "currentOperation". With "IDLE", "_admin.operation-type" is set to
        None
    :param current_operation_id: identity of the current operation
    :param error_description: new value for "errorDescription"
    :param error_detail: new value for "errorDetail"
    :param other_update: Other required changes at database if provided, will be cleared. The status fields are
        added over this same dictionary (presumably it is update_db_2 who clears it; confirm)
    :return: None. A DbException is logged as a warning, but not raised
    """
    try:
        db_dict = other_update or {}
        db_dict["_admin.nslcmop"] = current_operation_id  # for backward compatibility
        db_dict["_admin.current-operation"] = current_operation_id
        db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # 'warning' is used because 'warn' is a deprecated alias, not present at python 3.13
        self.logger.warning('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1537
def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
                     operation_state: str = None, other_update: dict = None):
    """
    Update the status fields of a ns operation at "nslcmops"
    :param op_id: ns operation identity
    :param stage: if it is a list, its first item is written at "stage" and all the items, joined by spaces, at
        "detailed-status". Any other value different from None is written at "stage" as text
    :param error_message: new value for "errorMessage". It is not modified if None
    :param queuePosition: new value for "queuePosition"
    :param operation_state: new value for "operationState", "statusEnteredTime" is set to current time as well.
        They are not modified if None
    :param other_update: other changes to write at database. The status fields are added over this same
        dictionary
    :return: None. A DbException is logged as a warning, but not raised
    """
    try:
        db_dict = other_update or {}
        db_dict['queuePosition'] = queuePosition
        if isinstance(stage, list):
            db_dict['stage'] = stage[0]
            db_dict['detailed-status'] = " ".join(stage)
        elif stage is not None:
            db_dict['stage'] = str(stage)

        if error_message is not None:
            db_dict['errorMessage'] = error_message
        if operation_state is not None:
            db_dict['operationState'] = operation_state
            db_dict["statusEnteredTime"] = time()
        self.update_db_2("nslcmops", op_id, db_dict)
    except DbException as e:
        # 'warning' is used because 'warn' is a deprecated alias, not present at python 3.13
        self.logger.warning('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1557
def _write_all_config_status(self, db_nsr: dict, status: str):
    """
    Write the same status at every non empty entry of nsr "configurationStatus"
    :param db_nsr: database content of ns record. "_id" and "configurationStatus" are read from it
    :param status: value to write at "configurationStatus.<index>.status"
    :return: None. A DbException is logged as a warning, but not raised
    """
    try:
        nsr_id = db_nsr["_id"]
        # configurationStatus
        config_status = db_nsr.get('configurationStatus')
        if config_status:
            # empty entries are skipped, but they count for the index of the following ones
            db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
                             enumerate(config_status) if v}
            # update status
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

    except DbException as e:
        # 'warning' is used because 'warn' is a deprecated alias, not present at python 3.13
        self.logger.warning('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1571
def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
                                element_under_configuration: str = None, element_type: str = None,
                                other_update: dict = None):
    """
    Update the entry of one VCA at nsr "configurationStatus"
    :param nsr_id: ns record identity
    :param vca_index: position of the entry at "configurationStatus"
    :param status: new value for "status". It is not modified if empty
    :param element_under_configuration: new value for "elementUnderConfiguration". It is not modified if empty
    :param element_type: new value for "elementType". It is not modified if empty
    :param other_update: other changes to write at database. The status fields are added over this same
        dictionary
    :return: None. A DbException is logged as a warning, but not raised
    """

    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    # .format(vca_index, status))

    try:
        db_path = 'configurationStatus.{}.'.format(vca_index)
        db_dict = other_update or {}
        if status:
            db_dict[db_path + 'status'] = status
        if element_under_configuration:
            db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
        if element_type:
            db_dict[db_path + 'elementType'] = element_type
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # 'warning' is used because 'warn' is a deprecated alias, not present at python 3.13
        self.logger.warning('Error writing configuration status={}, ns={}, vca_index={}: {}'
                            .format(status, nsr_id, vca_index, e))
1592
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and compute the placement (vim account where to deploy). If it is decided by an external tool, the
    request is sent via kafka and this method waits until the result is written at database (nslcmops
    _admin.pla). Database is used because the result can be obtained from a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
        computed 'vim-account-id'. Raises LcmException if the result is not available after the last poll
    """
    changed = False
    nslcmop_id = db_nslcmop['_id']
    if deep_get(db_nslcmop, ('operationParams', 'placement-engine')) != "PLA":
        # no external placement is requested
        return changed

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)

    # poll database every 5 seconds until the result is present or the waiting budget gets negative
    poll_interval = 5
    remaining = poll_interval * 10
    placement = None
    while not placement and remaining >= 0:
        await asyncio.sleep(poll_interval)
        remaining -= poll_interval
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        placement = deep_get(db_nslcmop, ('_admin', 'pla'))

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    for placed_vnf in placement['vnf']:
        vnfr = db_vnfrs.get(placed_vnf['member-vnf-index'])
        # only entries with a vim account, and that belong to a known vnfr, are applied
        if placed_vnf.get('vimAccountId') and vnfr:
            changed = True
            self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": placed_vnf['vimAccountId']})
            # Modifies db_vnfrs
            vnfr["vim-account-id"] = placed_vnf['vimAccountId']
    return changed
1631
def update_nsrs_with_pla_result(self, params):
    """
    Store a placement result at '_admin.pla' of the nslcmops record it refers to.
    :param params: content received; 'placement' is the result to store and 'placement.nslcmopId'
        the identifier of the operation to update
    :return: None. Any error is logged and not propagated
    """
    # bound before the 'try' so that the error message can always be composed
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        # Logger.warn() is a deprecated alias; Logger.warning() is the supported method
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1638
async def instantiate(self, nsr_id, nslcmop_id):
    """
    Run the 'instantiate' operation of a network service. It reads the needed records from database,
    deploys the KDUs, launches the deployment at VIM (instantiate_RO) and the deployment of the execution
    environments (_deploy_n2vc). At the 'finally' clause it waits for the launched tasks, writes the final
    status at the nsrs and nslcmops records and sends the 'instantiated' notification via kafka.
    Exceptions raised while deploying are caught and reported at those records.
    :param nsr_id: ns instance to deploy
    :param nslcmop_id: operation to run
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
        return

    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")

    # get all needed from database

    # database nsrs record
    db_nsr = None

    # database nslcmops record
    db_nslcmop = None

    # update operation on nsrs
    db_nsr_update = {}
    # update operation on nslcmops
    db_nslcmop_update = {}

    nslcmop_operation_state = None
    db_vnfrs = {}  # vnf's info indexed by member-index
    # n2vc_info = {}
    tasks_dict_info = {}  # from task to info text
    exc = None
    error_list = []
    stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
    # ^ stage, step, VIM progress
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        stage[1] = "Sync filesystem from database."
        self.fs.sync()  # TODO, make use of partial sync, only for the needed packages

        # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
        stage[1] = "Reading from database."
        # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
        db_nsr_update["detailed-status"] = "creating"
        db_nsr_update["operational-status"] = "init"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="BUILDING",
            current_operation="INSTANTIATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage,
            queuePosition=0
        )

        # read from db: operation
        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # read from db: ns
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
        db_nsr["nsd"] = nsd
        # nsr_name = db_nsr["name"]   # TODO short-name??

        # read from db: vnf's of this ns
        stage[1] = "Getting vnfrs from db."
        self.logger.debug(logging_text + stage[1])
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

        # read from db: vnfd's for every vnf
        db_vnfds = []  # every vnfd data

        # for each vnf in ns, read vnfd
        for vnfr in db_vnfrs_list:
            db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
            vnfd_id = vnfr["vnfd-id"]
            vnfd_ref = vnfr["vnfd-ref"]

            # if we haven't this vnfd, read it from db.
            # db_vnfds stores whole vnfd records, so the check must compare their '_id'; testing the
            # identifier against the list itself never matches and would read/store the vnfd repeatedly
            if not any(stored_vnfd.get("_id") == vnfd_id for stored_vnfd in db_vnfds):
                # read from db
                stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
                self.logger.debug(logging_text + stage[1])
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                # store vnfd
                db_vnfds.append(vnfd)

        # Get or generates the _admin.deployed.VCA list
        vca_deployed_list = None
        if db_nsr["_admin"].get("deployed"):
            vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
        if vca_deployed_list is None:
            vca_deployed_list = []
            configuration_status_list = []
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            db_nsr_update["configurationStatus"] = configuration_status_list
            # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
        elif isinstance(vca_deployed_list, dict):
            # maintain backward compatibility. Change a dict to list at database
            vca_deployed_list = list(vca_deployed_list.values())
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

        if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
            populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
            db_nsr_update["_admin.deployed.RO.vnfd"] = []

        # set state to INSTANTIATED. When instantiated NBI will not delete directly
        db_nsr_update["_admin.nsState"] = "INSTANTIATED"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})

        # n2vc_redesign STEP 2 Deploy Network Scenario
        stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        stage[1] = "Deploying KDUs."
        # self.logger.debug(logging_text + "Before deploy_kdus")
        # Call to deploy_kdus in case exists the "vdu:kdu" param
        await self.deploy_kdus(
            logging_text=logging_text,
            nsr_id=nsr_id,
            nslcmop_id=nslcmop_id,
            db_vnfrs=db_vnfrs,
            db_vnfds=db_vnfds,
            task_instantiation_info=tasks_dict_info,
        )

        stage[1] = "Getting VCA public key."
        # n2vc_redesign STEP 1 Get VCA public ssh-key
        # feature 1429. Add n2vc public key to needed VMs
        n2vc_key = self.n2vc.get_public_key()
        n2vc_key_list = [n2vc_key]
        if self.vca_config.get("public_key"):
            n2vc_key_list.append(self.vca_config["public_key"])

        stage[1] = "Deploying NS at VIM."
        task_ro = asyncio.ensure_future(
            self.instantiate_RO(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nsd=nsd,
                db_nsr=db_nsr,
                db_nslcmop=db_nslcmop,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                n2vc_key_list=n2vc_key_list,
                stage=stage
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
        tasks_dict_info[task_ro] = "Deploying at VIM"

        # n2vc_redesign STEP 3 to 6 Deploy N2VC
        stage[1] = "Deploying Execution Environments."
        self.logger.debug(logging_text + stage[1])

        nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
        for vnf_profile in get_vnf_profiles(nsd):
            vnfd_id = vnf_profile["vnfd-id"]
            vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
            member_vnf_index = str(vnf_profile["id"])
            db_vnfr = db_vnfrs[member_vnf_index]
            base_folder = vnfd["_admin"]["storage"]
            vdu_id = None
            vdu_index = 0
            vdu_name = None
            kdu_name = None

            # Get additional parameters
            deploy_params = {"OSM": get_osm_params(db_vnfr)}
            if db_vnfr.get("additionalParamsForVnf"):
                deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))

            descriptor_config = get_configuration(vnfd, vnfd["id"])
            if descriptor_config:
                self._deploy_n2vc(
                    logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage
                )

            # Deploy charms for each VDU that supports one.
            for vdud in get_vdu_list(vnfd):
                vdu_id = vdud["id"]
                descriptor_config = get_configuration(vnfd, vdu_id)
                vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)

                if vdur.get("additionalParams"):
                    deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                else:
                    # work on a copy: writing "OSM" below on the very same dictionary would overwrite
                    # the parameters already handed over to the vnf-level (and previous vdu) deployments
                    deploy_params_vdu = dict(deploy_params)
                deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
                vdud_count = get_vdu_profile(vnfd, vdu_id).get("max-number-of-instances", 1)

                self.logger.debug("VDUD > {}".format(vdud))
                self.logger.debug("Descriptor config > {}".format(descriptor_config))
                if descriptor_config:
                    vdu_name = None
                    kdu_name = None
                    for vdu_index in range(vdud_count):
                        # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                        self._deploy_n2vc(
                            logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                member_vnf_index, vdu_id, vdu_index),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage
                        )
            for kdud in get_kdu_list(vnfd):
                kdu_name = kdud["name"]
                descriptor_config = get_configuration(vnfd, kdu_name)
                if descriptor_config:
                    vdu_id = None
                    vdu_index = 0
                    vdu_name = None
                    kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
                    deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                    if kdur.get("additionalParams"):
                        # merge (as done above for the vnf-level parameters) instead of replacing,
                        # otherwise the "OSM" entry is lost when the kdu has additional params
                        deploy_params_kdu.update(parse_yaml_strings(kdur["additionalParams"].copy()))

                    self._deploy_n2vc(
                        logging_text=logging_text,
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params_kdu,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage
                    )

        # Check if this NS has a charm configuration
        descriptor_config = nsd.get("ns-configuration")
        if descriptor_config and descriptor_config.get("juju"):
            vnfd_id = None
            db_vnfr = None
            member_vnf_index = None
            vdu_id = None
            kdu_name = None
            vdu_index = 0
            vdu_name = None

            # Get additional parameters
            deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
            if db_nsr.get("additionalParamsForNs"):
                deploy_params.update(parse_yaml_strings(db_nsr["additionalParamsForNs"].copy()))
            base_folder = nsd["_admin"]["storage"]
            self._deploy_n2vc(
                logging_text=logging_text,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                nslcmop_id=nslcmop_id,
                nsr_id=nsr_id,
                nsi_id=nsi_id,
                vnfd_id=vnfd_id,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                member_vnf_index=member_vnf_index,
                vdu_index=vdu_index,
                vdu_name=vdu_name,
                deploy_params=deploy_params,
                descriptor_config=descriptor_config,
                base_folder=base_folder,
                task_instantiation_info=tasks_dict_info,
                stage=stage
            )

        # rest of stuff will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for instantiate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
                                                         stage, nslcmop_id, nsr_id=nsr_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as e:
            # a different name is used here so that the outer 'exc' variable is not shadowed
            error_list.append(str(e))

        # update operation-status
        db_nsr_update["operational-status"] = "running"
        # let's begin with VCA 'configured' status (later we can change it)
        db_nsr_update["config-status"] = "configured"
        for task, task_name in tasks_dict_info.items():
            if not task.done() or task.cancelled() or task.exception():
                if task_name.startswith(self.task_name_deploy_vca):
                    # A N2VC task is pending
                    db_nsr_update["config-status"] = "failed"
                else:
                    # RO or KDU task is pending
                    db_nsr_update["operational-status"] = "failed"

        # update status at database
        if error_list:
            error_detail = ". ".join(error_list)
            self.logger.error(logging_text + error_detail)
            error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])

            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "READY"
            db_nsr_update["detailed-status"] = "Done"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        # the nsr status is only written when the nsr record could be read
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                               "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2052
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
                             timeout: int = 3600, vca_type: str = None) -> bool:
    """
    Add the relations (declared at the nsd 'ns-configuration' and at the vnf-level configuration of the
    vnfds) in which the VCA at position 'vca_index' takes part. A relation is added once both peers have
    an 'ee_id' and 'config_sw_installed'; the nsr record is re-read every 5 seconds meanwhile.
    A relation is given up when one of its peers is found with status 'BROKEN'.
    :param logging_text: prefix for logging
    :param nsr_id: identifier of the nsr
    :param vca_index: position of this VCA at nsr '_admin.deployed.VCA'
    :param timeout: maximum time in seconds to wait for the peers
    :param vca_type: key of self.vca_map to use for adding the relation. By default "lxc_proxy_charm"
    :return: True when there is nothing left to relate; False on timeout or on any exception (it is logged)
    """

    # steps:
    # 1. find all relations for this VCA
    # 2. wait for other peers related
    # 3. add relations

    try:
        vca_type = vca_type or "lxc_proxy_charm"

        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # read all ns-configuration relations
        ns_relations = list()
        db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
        if db_ns_relations:
            for r in db_ns_relations:
                # check if this VCA is in the relation
                if my_vca.get('member-vnf-index') in\
                        (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
                    ns_relations.append(r)

        # read all vnf-configuration relations
        vnf_relations = list()
        db_vnfd_list = db_nsr.get('vnfd-id')
        if db_vnfd_list:
            for vnfd in db_vnfd_list:
                db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
                # a vnfd may lack a vnf-level configuration; treat it as "no relations" instead of
                # failing with AttributeError, which would abort the relations of the whole VCA
                db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
                db_vnf_relations = db_vnf_configuration.get("relation", []) if db_vnf_configuration else []
                if db_vnf_relations:
                    for r in db_vnf_relations:
                        # check if this VCA is in the relation
                        if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
                            vnf_relations.append(r)

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n    {}\n    {}'.format(ns_relations, vnf_relations))

        # add all relations
        start = time()
        while True:
            # check timeout
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            # same list for every relation checked in this iteration
            vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))

            # for each defined NS relation, find the VCA's related
            for r in ns_relations.copy():
                from_vca_ee_id = None
                to_vca_ee_id = None
                from_vca_endpoint = None
                to_vca_endpoint = None
                for vca in vca_list:
                    if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
                            and vca.get('config_sw_installed'):
                        from_vca_ee_id = vca.get('ee_id')
                        from_vca_endpoint = r.get('entities')[0].get('endpoint')
                    if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
                            and vca.get('config_sw_installed'):
                        to_vca_ee_id = vca.get('ee_id')
                        to_vca_endpoint = r.get('entities')[1].get('endpoint')
                if from_vca_ee_id and to_vca_ee_id:
                    # add relation
                    await self.vca_map[vca_type].add_relation(
                        ee_id_1=from_vca_ee_id,
                        ee_id_2=to_vca_ee_id,
                        endpoint_1=from_vca_endpoint,
                        endpoint_2=to_vca_endpoint)
                    # remove entry from relations list
                    ns_relations.remove(r)
                else:
                    # check failed peers
                    try:
                        peer_ids = (r.get('entities')[0].get('id'), r.get('entities')[1].get('id'))
                        vca_status_list = db_nsr.get('configurationStatus')
                        if vca_status_list:
                            for vca, vca_status in zip(vca_list, vca_status_list):
                                if vca.get('member-vnf-index') in peer_ids \
                                        and vca_status.get('status') == 'BROKEN':
                                    # peer broken: remove relation from list (only once) and stop looking
                                    ns_relations.remove(r)
                                    break
                    except Exception as e:
                        # best effort: a malformed record must not abort the relations
                        self.logger.debug(logging_text + ' ignored error checking failed peers: {}'.format(e))

            # for each defined VNF relation, find the VCA's related
            for r in vnf_relations.copy():
                from_vca_ee_id = None
                to_vca_ee_id = None
                from_vca_endpoint = None
                to_vca_endpoint = None
                for vca in vca_list:
                    # vnf-level VCAs have no vdu_id; they are matched by vnfd_id
                    key_to_check = "vdu_id"
                    if vca.get("vdu_id") is None:
                        key_to_check = "vnfd_id"
                    if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
                        from_vca_ee_id = vca.get('ee_id')
                        from_vca_endpoint = r.get('entities')[0].get('endpoint')
                    if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
                        to_vca_ee_id = vca.get('ee_id')
                        to_vca_endpoint = r.get('entities')[1].get('endpoint')
                if from_vca_ee_id and to_vca_ee_id:
                    # add relation
                    await self.vca_map[vca_type].add_relation(
                        ee_id_1=from_vca_ee_id,
                        ee_id_2=to_vca_ee_id,
                        endpoint_1=from_vca_endpoint,
                        endpoint_2=to_vca_endpoint)
                    # remove entry from relations list
                    vnf_relations.remove(r)
                else:
                    # check failed peers
                    # NOTE(review): peers are matched here by 'vdu_id' only, while the search above falls
                    # back to 'vnfd_id' for vnf-level VCAs - confirm whether broken vnf-level peers
                    # should be detected too
                    try:
                        peer_ids = (r.get('entities')[0].get('id'), r.get('entities')[1].get('id'))
                        vca_status_list = db_nsr.get('configurationStatus')
                        if vca_status_list:
                            for vca, vca_status in zip(vca_list, vca_status_list):
                                if vca.get('vdu_id') in peer_ids and vca_status.get('status') == 'BROKEN':
                                    # peer broken: remove relation from list (only once) and stop looking
                                    vnf_relations.remove(r)
                                    break
                    except Exception as e:
                        # best effort: a malformed record must not abort the relations
                        self.logger.debug(logging_text + ' ignored error checking failed peers: {}'.format(e))

            # wait for next try
            await asyncio.sleep(5.0)

            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                break

        return True

    except Exception as e:
        # Logger.warn() is a deprecated alias; Logger.warning() is the supported method
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
2218
async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
                       vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
    """
    Install one KDU at its k8s cluster, store its services and management ip at the vnfr record and run
    its initial config primitives (only when the KDU has no juju execution environment reference).
    :param nsr_id: identifier of the nsr
    :param nsr_db_path: path inside the nsr record where the info of this KDU is stored
    :param vnfr_data: database content of the vnfr that owns the KDU. Only its '_id' is read
    :param kdu_index: position of the KDU at the 'kdur' list of the vnfr
    :param kdud: descriptor of the KDU. Its 'service' list is used to locate the management services
    :param vnfd: descriptor of the vnf
    :param k8s_instance_info: dictionary with "k8scluster-type", "k8scluster-uuid", "kdu-model", "kdu-name"
        and "namespace"
    :param k8params: parameters passed to the install
    :param timeout: timeout in seconds for the install and for each initial config primitive
    :return: the name of the kdu instance
    :raises: any exception of the process is re-raised, after trying to write the error at database
    """

    try:
        k8sclustertype = k8s_instance_info["k8scluster-type"]
        # Instantiate kdu
        db_dict_install = {"collection": "nsrs",
                           "filter": {"_id": nsr_id},
                           "path": nsr_db_path}

        kdu_instance = self.k8scluster_map[k8sclustertype].generate_kdu_instance_name(
            db_dict=db_dict_install,
            kdu_model=k8s_instance_info["kdu-model"],
            kdu_name=k8s_instance_info["kdu-name"],
        )
        self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
        await self.k8scluster_map[k8sclustertype].install(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_model=k8s_instance_info["kdu-model"],
            atomic=True,
            params=k8params,
            db_dict=db_dict_install,
            timeout=timeout,
            kdu_name=k8s_instance_info["kdu-name"],
            namespace=k8s_instance_info["namespace"],
            kdu_instance=kdu_instance,
        )
        self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})

        # Obtain services to obtain management service ip
        services = await self.k8scluster_map[k8sclustertype].get_services(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_instance=kdu_instance,
            namespace=k8s_instance_info["namespace"])

        # Obtain management service info (if exists)
        vnfr_update_dict = {}
        if services:
            vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
            mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
            for mgmt_service in mgmt_services:
                for service in services:
                    if service["name"].startswith(mgmt_service["name"]):
                        # Mgmt service found, Obtain service ip
                        ip = service.get("external_ip", service.get("cluster_ip"))
                        if isinstance(ip, list) and len(ip) == 1:
                            ip = ip[0]

                        vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip

                        # Check if must update also mgmt ip at the vnf
                        service_external_cp = mgmt_service.get("external-connection-point-ref")
                        if service_external_cp:
                            if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
                                vnfr_update_dict["ip-address"] = ip

                        break
                else:
                    # for/else: no deployed service name starts with this management service name
                    self.logger.warning("Mgmt service name: {} not found".format(mgmt_service["name"]))

        vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
        self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

        kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
        if kdu_config and kdu_config.get("initial-config-primitive") and \
                get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None:
            # sorted() gives an ordered copy; the list of the descriptor is not reordered in place
            initial_config_primitive_list = sorted(kdu_config.get("initial-config-primitive"),
                                                   key=lambda val: int(val["seq"]))

            for initial_config_primitive in initial_config_primitive_list:
                primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})

                await asyncio.wait_for(
                    self.k8scluster_map[k8sclustertype].exec_primitive(
                        cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                        kdu_instance=kdu_instance,
                        primitive_name=initial_config_primitive["name"],
                        params=primitive_params_, db_dict=db_dict_install),
                    timeout=timeout)

    except Exception as e:
        # Prepare update db with error and raise exception
        try:
            self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
            self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
        except Exception:
            # ignore to keep original exception
            pass
        # reraise original error
        raise

    return kdu_instance
2311
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
    """
    Launch the deployment of every KDU found at the 'kdur' list of the vnfrs. For each one it obtains the
    identifier of the k8s cluster, synchronizes the helm repos of the cluster (once per cluster), stores
    the KDU info at nsr '_admin.deployed.K8s.<index>' and creates a task running _install_kdu.
    :param logging_text: prefix for logging
    :param nsr_id: identifier of the nsr
    :param nslcmop_id: identifier of the operation, used for registering the tasks
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index
    :param db_vnfds: list with the database content of the vnfds
    :param task_instantiation_info: dictionary where the created tasks are added: {task: description text}
    :return: None
    :raises LcmException: on any error. asyncio.CancelledError is propagated untouched
    """
    # Launch kdus if present in the descriptor

    # cache of cluster identifiers already resolved: {cluster_type: {cluster_id: k8s_id}}
    k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}

    async def _get_cluster_id(cluster_id, cluster_type):
        # returns the content of '_admin.<cluster_type>.id' of the k8scluster record, using the cache above
        nonlocal k8scluster_id_2_uuic
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        # check if K8scluster is creating and wait look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
        if task_dependency:
            text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
            self.logger.debug(logging_text + text)
            await asyncio.wait(task_dependency, timeout=3600)

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            if cluster_type == "helm-chart-v3":
                try:
                    # backward compatibility for existing clusters that have not been initialized for helm v3
                    k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
                    k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
                                                                               reuse_cluster_uuid=cluster_id)
                    db_k8scluster_update = {}
                    db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                    db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                    db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
                    db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
                    self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
                except Exception as e:
                    self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
                    # keep the original error chained as the cause
                    raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
                                                                                                   cluster_type)) from e
            else:
                raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
                                   format(cluster_id, cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    step = ""
    try:
        db_nsr_update = {"_admin.deployed.K8s": []}
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        index = 0
        # helm clusters whose repos have already been synchronized in this call, per cluster type
        synchronized_clusters = {"helm-chart": set(), "helm-chart-v3": set()}
        # prefix of the k8sclusters record keys where the synchronized repos are stored
        repo_db_prefix = {"helm-chart": '_admin.helm_charts_added.',
                          "helm-chart-v3": '_admin.helm_charts_v3_added.'}

        for vnfr_data in db_vnfrs.values():
            for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                # Step 0: Prepare and set parameters
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                vnfd_id = vnfr_data.get('vnfd-id')
                vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
                kdud = next(kdud for kdud in vnfd_with_id["kdu"] if kdud["name"] == kdur["kdu-name"])
                namespace = kdur.get("k8s-namespace")
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    # Default version: helm3, if helm-version is v2 assign v2
                    k8sclustertype = "helm-chart-v3"
                    self.logger.debug("kdur: {}".format(kdur))
                    if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
                        k8sclustertype = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju-bundle"
                else:
                    raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
                                       "juju-bundle. Maybe an old NBI version is running".
                                       format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
                # check if kdumodel is a file and exists
                try:
                    storage = deep_get(vnfd_with_id, ('_admin', 'storage'))
                    if storage and storage.get('pkg-dir'):  # may be not present if vnfd has not artifacts
                        # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                        filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
                                                         kdumodel)
                        if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
                            kdumodel = self.fs.path + filename
                except (asyncio.TimeoutError, asyncio.CancelledError):
                    raise
                except Exception:  # it is not a file
                    pass

                k8s_cluster_id = kdur["k8s-cluster"]["id"]
                step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
                cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                # Synchronize repos (only for helm clusters and only once per cluster)
                if k8sclustertype in synchronized_clusters \
                        and cluster_uuid not in synchronized_clusters[k8sclustertype]:
                    del_repo_list, added_repo_dict = await asyncio.ensure_future(
                        self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
                    # mark it as done even when nothing changed, so that the next KDU of the same
                    # cluster does not trigger another synchronization
                    synchronized_clusters[k8sclustertype].add(cluster_uuid)
                    if del_repo_list or added_repo_dict:
                        prefix = repo_db_prefix[k8sclustertype]
                        unset = {prefix + item: None for item in del_repo_list}
                        updated = {prefix + item: name for item, name in added_repo_dict.items()}
                        self.logger.debug(logging_text + "repos synchronized on k8s cluster "
                                                         "'{}' to_delete: {}, to_add: {}".
                                          format(k8s_cluster_id, del_repo_list, added_repo_dict))
                        self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)

                # Instantiate kdu
                step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
                                                                          kdur["kdu-name"], k8s_cluster_id)
                k8s_instance_info = {"kdu-instance": None,
                                     "k8scluster-uuid": cluster_uuid,
                                     "k8scluster-type": k8sclustertype,
                                     "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                                     "kdu-name": kdur["kdu-name"],
                                     "kdu-model": kdumodel,
                                     "namespace": namespace}
                db_path = "_admin.deployed.K8s.{}".format(index)
                db_nsr_update[db_path] = k8s_instance_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                task = asyncio.ensure_future(
                    self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id,
                                      k8s_instance_info, k8params=desc_params, timeout=600))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])

                index += 1

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
        if isinstance(e, (N2VCException, DbException)):
            self.logger.error(logging_text + msg)
        else:
            self.logger.critical(logging_text + msg, exc_info=True)
        raise LcmException(msg)
    finally:
        # write whatever is still pending for the nsr record
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
2463
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                 base_folder, task_instantiation_info, stage):
    """
    Launch one instantiate_N2VC asyncio task for every supported execution environment declared at
    descriptor_config["execution-environment-list"], and register each task.
    For each item, the matching entry at db_nsr["_admin"]["deployed"]["VCA"] is looked for (same
    member-vnf-index, vdu_id, kdu_name, vdu_count_index and ee_descriptor_id). If there is none, a new
    entry is created, written to database "nsrs" and appended to db_nsr.
    :param logging_text: prefix used at every log message
    :param db_nsr: nsr content. Its _admin.deployed.VCA list is read and, for new entries, extended
    :param db_vnfr: passed as is to instantiate_N2VC
    :param nslcmop_id: operation id, used to register the task and passed to instantiate_N2VC
    :param nsr_id: ns instance id, used for the database update and the task registration
    :param nsi_id: passed as is to instantiate_N2VC
    :param vnfd_id: stored at the new VCA entry
    :param vdu_id: used to match/create the VCA entry. Passed to instantiate_N2VC
    :param kdu_name: used to match/create the VCA entry. Passed to instantiate_N2VC
    :param member_vnf_index: used to match/create the VCA entry. Falsy when the target is the ns
    :param vdu_index: used to match/create the VCA entry. Passed to instantiate_N2VC
    :param vdu_name: stored at the new VCA entry
    :param deploy_params: passed as is to instantiate_N2VC
    :param descriptor_config: configuration descriptor containing "execution-environment-list"
    :param base_folder: passed as is to instantiate_N2VC
    :param task_instantiation_info: dictionary where each launched task is annotated with a description
    :param stage: passed as is to instantiate_N2VC
    :return: None
    """
    # launch instantiate_N2VC in a asyncio task and register task object
    # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
    # if not found, create one entry and update database
    # fill db_nsr._admin.deployed.VCA.<index>

    self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
    if "execution-environment-list" in descriptor_config:
        ee_list = descriptor_config.get("execution-environment-list", [])
    else:  # other types as script are not supported
        ee_list = []

    for ee_item in ee_list:
        self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
                                                                                        ee_item.get("helm-chart")))
        ee_descriptor_id = ee_item.get("id")
        # Obtain vca_name and vca_type from the execution environment item
        if ee_item.get("juju"):
            vca_name = ee_item['juju'].get('charm')
            # a juju item without 'charm' is considered a native charm
            vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
            # 'cloud' and 'proxy' override the type selected above
            if ee_item['juju'].get('cloud') == "k8s":
                vca_type = "k8s_proxy_charm"
            elif ee_item['juju'].get('proxy') is False:
                vca_type = "native_charm"
        elif ee_item.get("helm-chart"):
            vca_name = ee_item['helm-chart']
            # helm v2 is used only when explicitly requested with "helm-version"
            if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                vca_type = "helm"
            else:
                vca_type = "helm-v3"
        else:
            self.logger.debug(logging_text + "skipping non juju neither charm configuration")
            continue

        # vca_index ends at the position of the matching entry (loop exits with break), or at the last
        # position of the list, -1 if empty, when nothing matches (else clause of the for loop)
        vca_index = -1
        for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
            if not vca_deployed:
                continue
            if vca_deployed.get("member-vnf-index") == member_vnf_index and \
                    vca_deployed.get("vdu_id") == vdu_id and \
                    vca_deployed.get("kdu_name") == kdu_name and \
                    vca_deployed.get("vdu_count_index", 0) == vdu_index and \
                    vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
                break
        else:
            # not found, create one.
            target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
            if vdu_id:
                target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
            elif kdu_name:
                target += "/kdu/{}".format(kdu_name)
            vca_deployed = {
                "target_element": target,
                # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                "member-vnf-index": member_vnf_index,
                "vdu_id": vdu_id,
                "kdu_name": kdu_name,
                "vdu_count_index": vdu_index,
                "operational-status": "init",  # TODO revise
                "detailed-status": "",  # TODO revise
                "step": "initial-deploy",  # TODO revise
                "vnfd_id": vnfd_id,
                "vdu_name": vdu_name,
                "type": vca_type,
                "ee_descriptor_id": ee_descriptor_id
            }
            # the new entry is placed just after the last existing one
            vca_index += 1

            # create VCA and configurationStatus in db
            db_dict = {
                "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                "configurationStatus.{}".format(vca_index): dict()
            }
            self.update_db_2("nsrs", nsr_id, db_dict)

            # keep the in-memory nsr aligned with the database
            db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

        self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
        self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
        self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

        # Launch task
        task_n2vc = asyncio.ensure_future(
            self.instantiate_N2VC(
                logging_text=logging_text,
                vca_index=vca_index,
                nsi_id=nsi_id,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                vdu_index=vdu_index,
                deploy_params=deploy_params,
                config_descriptor=descriptor_config,
                base_folder=base_folder,
                nslcmop_id=nslcmop_id,
                stage=stage,
                vca_type=vca_type,
                vca_name=vca_name,
                ee_config_descriptor=ee_item
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
        # description of the task, indexed by the task object itself
        task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
            member_vnf_index or "", vdu_id or "")
2570
2571 @staticmethod
2572 def _create_nslcmop(nsr_id, operation, params):
2573 """
2574 Creates a ns-lcm-opp content to be stored at database.
2575 :param nsr_id: internal id of the instance
2576 :param operation: instantiate, terminate, scale, action, ...
2577 :param params: user parameters for the operation
2578 :return: dictionary following SOL005 format
2579 """
2580 # Raise exception if invalid arguments
2581 if not (nsr_id and operation and params):
2582 raise LcmException(
2583 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2584 now = time()
2585 _id = str(uuid4())
2586 nslcmop = {
2587 "id": _id,
2588 "_id": _id,
2589 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2590 "operationState": "PROCESSING",
2591 "statusEnteredTime": now,
2592 "nsInstanceId": nsr_id,
2593 "lcmOperationType": operation,
2594 "startTime": now,
2595 "isAutomaticInvocation": False,
2596 "operationParams": params,
2597 "isCancelPending": False,
2598 "links": {
2599 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2600 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2601 }
2602 }
2603 return nslcmop
2604
2605 def _format_additional_params(self, params):
2606 params = params or {}
2607 for key, value in params.items():
2608 if str(value).startswith("!!yaml "):
2609 params[key] = yaml.safe_load(value[7:])
2610 return params
2611
2612 def _get_terminate_primitive_params(self, seq, vnf_index):
2613 primitive = seq.get('name')
2614 primitive_params = {}
2615 params = {
2616 "member_vnf_index": vnf_index,
2617 "primitive": primitive,
2618 "primitive_params": primitive_params,
2619 }
2620 desc_params = {}
2621 return self._map_primitive_params(seq, params, desc_params)
2622
2623 # sub-operations
2624
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decides what to do with an already existing sub-operation.
    :param db_nslcmop: operation content. Sub-operations are read from _admin.operations
    :param op_index: position of the sub-operation at _admin.operations
    :return: SUBOPERATION_STATUS_SKIP if the sub-operation is 'COMPLETED'. Otherwise op_index, after
        setting the sub-operation back to 'PROCESSING'
    """
    suboperations = deep_get(db_nslcmop, ('_admin', 'operations'), [])
    if suboperations[op_index].get('operationState') != 'COMPLETED':
        # c. retry executing sub-operation
        # The sub-operation exists, and operationState != 'COMPLETED'
        # Update operationState = 'PROCESSING' to indicate a retry.
        self._update_suboperation_status(db_nslcmop, op_index, 'PROCESSING', 'In progress')
        # Return the sub-operation index
        # _ns_execute_primitive() or RO.create_action() will be called from scale()
        # with arguments extracted from the sub-operation
        return op_index
    # b. Skip sub-operation
    # _ns_execute_primitive() or RO.create_action() will NOT be executed
    return self.SUBOPERATION_STATUS_SKIP
2643
2644 # Find a sub-operation where all keys in a matching dictionary must match
2645 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2646 def _find_suboperation(self, db_nslcmop, match):
2647 if db_nslcmop and match:
2648 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2649 for i, op in enumerate(op_list):
2650 if all(op.get(k) == match[k] for k in match):
2651 return i
2652 return self.SUBOPERATION_STATUS_NOT_FOUND
2653
2654 # Update status for a sub-operation given its index
2655 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2656 # Update DB for HA tasks
2657 q_filter = {'_id': db_nslcmop['_id']}
2658 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2659 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2660 self.db.set_one("nslcmops",
2661 q_filter=q_filter,
2662 update_dict=update_dict,
2663 fail_on_empty=False)
2664
2665 # Add sub-operation, return the index of the added sub-operation
2666 # Optionally, set operationState, detailed-status, and operationType
2667 # Status and type are currently set for 'scale' sub-operations:
2668 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2669 # 'detailed-status' : status message
2670 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2671 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2672 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2673 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2674 RO_nsr_id=None, RO_scaling_info=None):
2675 if not db_nslcmop:
2676 return self.SUBOPERATION_STATUS_NOT_FOUND
2677 # Get the "_admin.operations" list, if it exists
2678 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2679 op_list = db_nslcmop_admin.get('operations')
2680 # Create or append to the "_admin.operations" list
2681 new_op = {'member_vnf_index': vnf_index,
2682 'vdu_id': vdu_id,
2683 'vdu_count_index': vdu_count_index,
2684 'primitive': primitive,
2685 'primitive_params': mapped_primitive_params}
2686 if operationState:
2687 new_op['operationState'] = operationState
2688 if detailed_status:
2689 new_op['detailed-status'] = detailed_status
2690 if operationType:
2691 new_op['lcmOperationType'] = operationType
2692 if RO_nsr_id:
2693 new_op['RO_nsr_id'] = RO_nsr_id
2694 if RO_scaling_info:
2695 new_op['RO_scaling_info'] = RO_scaling_info
2696 if not op_list:
2697 # No existing operations, create key 'operations' with current operation as first list element
2698 db_nslcmop_admin.update({'operations': [new_op]})
2699 op_list = db_nslcmop_admin.get('operations')
2700 else:
2701 # Existing operations, append operation to list
2702 op_list.append(new_op)
2703
2704 db_nslcmop_update = {'_admin.operations': op_list}
2705 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2706 op_index = len(op_list) - 1
2707 return op_index
2708
2709 # Helper methods for scale() sub-operations
2710
2711 # pre-scale/post-scale:
2712 # Check for 3 different cases:
2713 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2714 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2715 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2716 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2717 operationType, RO_nsr_id=None, RO_scaling_info=None):
2718 # Find this sub-operation
2719 if RO_nsr_id and RO_scaling_info:
2720 operationType = 'SCALE-RO'
2721 match = {
2722 'member_vnf_index': vnf_index,
2723 'RO_nsr_id': RO_nsr_id,
2724 'RO_scaling_info': RO_scaling_info,
2725 }
2726 else:
2727 match = {
2728 'member_vnf_index': vnf_index,
2729 'primitive': vnf_config_primitive,
2730 'primitive_params': primitive_params,
2731 'lcmOperationType': operationType
2732 }
2733 op_index = self._find_suboperation(db_nslcmop, match)
2734 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
2735 # a. New sub-operation
2736 # The sub-operation does not exist, add it.
2737 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2738 # The following parameters are set to None for all kind of scaling:
2739 vdu_id = None
2740 vdu_count_index = None
2741 vdu_name = None
2742 if RO_nsr_id and RO_scaling_info:
2743 vnf_config_primitive = None
2744 primitive_params = None
2745 else:
2746 RO_nsr_id = None
2747 RO_scaling_info = None
2748 # Initial status for sub-operation
2749 operationState = 'PROCESSING'
2750 detailed_status = 'In progress'
2751 # Add sub-operation for pre/post-scaling (zero or more operations)
2752 self._add_suboperation(db_nslcmop,
2753 vnf_index,
2754 vdu_id,
2755 vdu_count_index,
2756 vdu_name,
2757 vnf_config_primitive,
2758 primitive_params,
2759 operationState,
2760 detailed_status,
2761 operationType,
2762 RO_nsr_id,
2763 RO_scaling_info)
2764 return self.SUBOPERATION_STATUS_NEW
2765 else:
2766 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2767 # or op_index (operationState != 'COMPLETED')
2768 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
2769
2770 # Function to return execution_environment id
2771
2772 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2773 # TODO vdu_index_count
2774 for vca in vca_deployed_list:
2775 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2776 return vca["ee_id"]
2777
async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
                       vca_index, destroy_ee=True, exec_primitives=True, scaling_in=False):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text: prefix used at every log message
    :param db_nslcmop: operation content. Sub-operations are added to it, and its "nsInstanceId" is used to
        update the nsr
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to do not destroy, because it will be destroyed all of them at once
    :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
        not executed properly
    :param scaling_in: True destroys the application, False destroys the model
    :return: None or exception
    :raises LcmException: if a terminate primitive ends with a result different from 'COMPLETED' or
        'PARTIALLY_COMPLETED'
    """

    self.logger.debug(
        logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    # entries without "type" are treated as lxc proxy charms
    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            config_descriptor.get("terminate-config-primitive"), vca_deployed.get("ee_descriptor_id"))
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name"))
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get('name')
                mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

                # Add sub-operation
                self._add_suboperation(db_nslcmop,
                                       vnf_index,
                                       vdu_id,
                                       vdu_count_index,
                                       vdu_name,
                                       primitive,
                                       mapped_primitive_params)
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
                                                                             mapped_primitive_params,
                                                                             vca_type=vca_type)
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
                if result not in result_ok:
                    raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                       "error {}".format(seq.get("name"), vnf_index, result_detail))
            # set that this VCA do not need terminated
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
            self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})

    # remove the prometheus jobs annotated at this VCA, only when there is a prometheus handler
    if vca_deployed.get("prometheus_jobs") and self.prometheus:
        await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

    if destroy_ee:
        # the connector is selected by the type of the VCA
        await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"], scaling_in=scaling_in)
2849
2850 async def _delete_all_N2VC(self, db_nsr: dict):
2851 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
2852 namespace = "." + db_nsr["_id"]
2853 try:
2854 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
2855 except N2VCNotFound: # already deleted. Skip
2856 pass
2857 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
2858
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Terminates a deployment from RO
    Deletes the ns from RO and waits, up to 20 minutes, until the delete action finishes. If this works,
    it also deletes the nsd and the vnfds from RO. Progress is written at database on every step.
    :param logging_text: prefix used at every log message
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: ns instance id, used to update database "nsrs"
    :param nslcmop_id: operation id, used to write the operation status
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None
    :raises LcmException: with all the collected error details, if something could not be deleted
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                                ro_delete_action))
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action)

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # explicit exception instead of assert, that is removed when running with -O
                    raise ROclient.ROClientException(
                        "ROclient.check_action_status returns unknown {}".format(ns_status))
                # write the status only when it changes
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # 'loop' argument is not passed: it is deprecated since python 3.8 and removed in 3.10
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
        elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))

    # Delete nsd
    # nsd and vnfds are deleted only if nothing has failed before
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id)
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                    self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
3001
3002 async def terminate(self, nsr_id, nslcmop_id):
3003 # Try to lock HA task here
3004 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3005 if not task_is_locked_by_me:
3006 return
3007
3008 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
3009 self.logger.debug(logging_text + "Enter")
3010 timeout_ns_terminate = self.timeout_ns_terminate
3011 db_nsr = None
3012 db_nslcmop = None
3013 operation_params = None
3014 exc = None
3015 error_list = [] # annotates all failed error messages
3016 db_nslcmop_update = {}
3017 autoremove = False # autoremove after terminated
3018 tasks_dict_info = {}
3019 db_nsr_update = {}
3020 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3021 # ^ contains [stage, step, VIM-status]
3022 try:
3023 # wait for any previous tasks in process
3024 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
3025
3026 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
3027 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3028 operation_params = db_nslcmop.get("operationParams") or {}
3029 if operation_params.get("timeout_ns_terminate"):
3030 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
3031 stage[1] = "Getting nsr={} from db.".format(nsr_id)
3032 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3033
3034 db_nsr_update["operational-status"] = "terminating"
3035 db_nsr_update["config-status"] = "terminating"
3036 self._write_ns_status(
3037 nsr_id=nsr_id,
3038 ns_state="TERMINATING",
3039 current_operation="TERMINATING",
3040 current_operation_id=nslcmop_id,
3041 other_update=db_nsr_update
3042 )
3043 self._write_op_status(
3044 op_id=nslcmop_id,
3045 queuePosition=0,
3046 stage=stage
3047 )
3048 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
3049 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
3050 return
3051
3052 stage[1] = "Getting vnf descriptors from db."
3053 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
3054 db_vnfds_from_id = {}
3055 db_vnfds_from_member_index = {}
3056 # Loop over VNFRs
3057 for vnfr in db_vnfrs_list:
3058 vnfd_id = vnfr["vnfd-id"]
3059 if vnfd_id not in db_vnfds_from_id:
3060 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
3061 db_vnfds_from_id[vnfd_id] = vnfd
3062 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
3063
3064 # Destroy individual execution environments when there are terminating primitives.
3065 # Rest of EE will be deleted at once
3066 # TODO - check before calling _destroy_N2VC
3067 # if not operation_params.get("skip_terminate_primitives"):#
3068 # or not vca.get("needed_terminate"):
3069 stage[0] = "Stage 2/3 execute terminating primitives."
3070 self.logger.debug(logging_text + stage[0])
3071 stage[1] = "Looking execution environment that needs terminate."
3072 self.logger.debug(logging_text + stage[1])
3073
3074 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
3075 config_descriptor = None
3076 if not vca or not vca.get("ee_id"):
3077 continue
3078 if not vca.get("member-vnf-index"):
3079 # ns
3080 config_descriptor = db_nsr.get("ns-configuration")
3081 elif vca.get("vdu_id"):
3082 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3083 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
3084 elif vca.get("kdu_name"):
3085 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3086 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
3087 else:
3088 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3089 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
3090 vca_type = vca.get("type")
3091 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3092 vca.get("needed_terminate"))
3093 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3094 # pending native charms
3095 destroy_ee = True if vca_type in ("helm", "helm-v3", "native_charm") else False
3096 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3097 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3098 task = asyncio.ensure_future(
3099 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
3100 destroy_ee, exec_terminate_primitives))
3101 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3102
3103 # wait for pending tasks of terminate primitives
3104 if tasks_dict_info:
3105 self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3106 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3107 min(self.timeout_charm_delete, timeout_ns_terminate),
3108 stage, nslcmop_id)
3109 tasks_dict_info.clear()
3110 if error_list:
3111 return # raise LcmException("; ".join(error_list))
3112
3113 # remove All execution environments at once
3114 stage[0] = "Stage 3/3 delete all."
3115
3116 if nsr_deployed.get("VCA"):
3117 stage[1] = "Deleting all execution environments."
3118 self.logger.debug(logging_text + stage[1])
3119 task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
3120 timeout=self.timeout_charm_delete))
3121 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3122 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
3123
3124 # Delete from k8scluster
3125 stage[1] = "Deleting KDUs."
3126 self.logger.debug(logging_text + stage[1])
3127 # print(nsr_deployed)
3128 for kdu in get_iterable(nsr_deployed, "K8s"):
3129 if not kdu or not kdu.get("kdu-instance"):
3130 continue
3131 kdu_instance = kdu.get("kdu-instance")
3132 if kdu.get("k8scluster-type") in self.k8scluster_map:
3133 task_delete_kdu_instance = asyncio.ensure_future(
3134 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
3135 cluster_uuid=kdu.get("k8scluster-uuid"),
3136 kdu_instance=kdu_instance))
3137 else:
3138 self.logger.error(logging_text + "Unknown k8s deployment type {}".
3139 format(kdu.get("k8scluster-type")))
3140 continue
3141 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
3142
3143 # remove from RO
3144 stage[1] = "Deleting ns from VIM."
3145 if self.ng_ro:
3146 task_delete_ro = asyncio.ensure_future(
3147 self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3148 else:
3149 task_delete_ro = asyncio.ensure_future(
3150 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3151 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
3152
3153 # rest of staff will be done at finally
3154
3155 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
3156 self.logger.error(logging_text + "Exit Exception {}".format(e))
3157 exc = e
3158 except asyncio.CancelledError:
3159 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
3160 exc = "Operation was cancelled"
3161 except Exception as e:
3162 exc = traceback.format_exc()
3163 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
3164 finally:
3165 if exc:
3166 error_list.append(str(exc))
3167 try:
3168 # wait for pending tasks
3169 if tasks_dict_info:
3170 stage[1] = "Waiting for terminate pending tasks."
3171 self.logger.debug(logging_text + stage[1])
3172 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
3173 stage, nslcmop_id)
3174 stage[1] = stage[2] = ""
3175 except asyncio.CancelledError:
3176 error_list.append("Cancelled")
3177 # TODO cancell all tasks
3178 except Exception as exc:
3179 error_list.append(str(exc))
3180 # update status at database
3181 if error_list:
3182 error_detail = "; ".join(error_list)
3183 # self.logger.error(logging_text + error_detail)
3184 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
3185 error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])
3186
3187 db_nsr_update["operational-status"] = "failed"
3188 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
3189 db_nslcmop_update["detailed-status"] = error_detail
3190 nslcmop_operation_state = "FAILED"
3191 ns_state = "BROKEN"
3192 else:
3193 error_detail = None
3194 error_description_nsr = error_description_nslcmop = None
3195 ns_state = "NOT_INSTANTIATED"
3196 db_nsr_update["operational-status"] = "terminated"
3197 db_nsr_update["detailed-status"] = "Done"
3198 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
3199 db_nslcmop_update["detailed-status"] = "Done"
3200 nslcmop_operation_state = "COMPLETED"
3201
3202 if db_nsr:
3203 self._write_ns_status(
3204 nsr_id=nsr_id,
3205 ns_state=ns_state,
3206 current_operation="IDLE",
3207 current_operation_id=None,
3208 error_description=error_description_nsr,
3209 error_detail=error_detail,
3210 other_update=db_nsr_update
3211 )
3212 self._write_op_status(
3213 op_id=nslcmop_id,
3214 stage="",
3215 error_message=error_description_nslcmop,
3216 operation_state=nslcmop_operation_state,
3217 other_update=db_nslcmop_update,
3218 )
3219 if ns_state == "NOT_INSTANTIATED":
3220 try:
3221 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
3222 except DbException as e:
3223 self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3224 format(nsr_id, e))
3225 if operation_params:
3226 autoremove = operation_params.get("autoremove", False)
3227 if nslcmop_operation_state:
3228 try:
3229 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3230 "operationState": nslcmop_operation_state,
3231 "autoremove": autoremove},
3232 loop=self.loop)
3233 except Exception as e:
3234 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3235
3236 self.logger.debug(logging_text + "Exit")
3237 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3238
3239 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
3240 time_start = time()
3241 error_detail_list = []
3242 error_list = []
3243 pending_tasks = list(created_tasks_info.keys())
3244 num_tasks = len(pending_tasks)
3245 num_done = 0
3246 stage[1] = "{}/{}.".format(num_done, num_tasks)
3247 self._write_op_status(nslcmop_id, stage)
3248 while pending_tasks:
3249 new_error = None
3250 _timeout = timeout + time_start - time()
3251 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
3252 return_when=asyncio.FIRST_COMPLETED)
3253 num_done += len(done)
3254 if not done: # Timeout
3255 for task in pending_tasks:
3256 new_error = created_tasks_info[task] + ": Timeout"
3257 error_detail_list.append(new_error)
3258 error_list.append(new_error)
3259 break
3260 for task in done:
3261 if task.cancelled():
3262 exc = "Cancelled"
3263 else:
3264 exc = task.exception()
3265 if exc:
3266 if isinstance(exc, asyncio.TimeoutError):
3267 exc = "Timeout"
3268 new_error = created_tasks_info[task] + ": {}".format(exc)
3269 error_list.append(created_tasks_info[task])
3270 error_detail_list.append(new_error)
3271 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
3272 K8sException, NgRoException)):
3273 self.logger.error(logging_text + new_error)
3274 else:
3275 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
3276 self.logger.error(logging_text + created_tasks_info[task] + " " + exc_traceback)
3277 else:
3278 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
3279 stage[1] = "{}/{}.".format(num_done, num_tasks)
3280 if new_error:
3281 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
3282 if nsr_id: # update also nsr
3283 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
3284 "errorDetail": ". ".join(error_detail_list)})
3285 self._write_op_status(nslcmop_id, stage)
3286 return error_detail_list
3287
3288 @staticmethod
3289 def _map_primitive_params(primitive_desc, params, instantiation_params):
3290 """
3291 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3292 The default-value is used. If it is between < > it look for a value at instantiation_params
3293 :param primitive_desc: portion of VNFD/NSD that describes primitive
3294 :param params: Params provided by user
3295 :param instantiation_params: Instantiation params provided by user
3296 :return: a dictionary with the calculated params
3297 """
3298 calculated_params = {}
3299 for parameter in primitive_desc.get("parameter", ()):
3300 param_name = parameter["name"]
3301 if param_name in params:
3302 calculated_params[param_name] = params[param_name]
3303 elif "default-value" in parameter or "value" in parameter:
3304 if "value" in parameter:
3305 calculated_params[param_name] = parameter["value"]
3306 else:
3307 calculated_params[param_name] = parameter["default-value"]
3308 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3309 and calculated_params[param_name].endswith(">"):
3310 if calculated_params[param_name][1:-1] in instantiation_params:
3311 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3312 else:
3313 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3314 format(calculated_params[param_name], primitive_desc["name"]))
3315 else:
3316 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3317 format(param_name, primitive_desc["name"]))
3318
3319 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3320 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name],
3321 default_flow_style=True, width=256)
3322 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3323 calculated_params[param_name] = calculated_params[param_name][7:]
3324 if parameter.get("data-type") == "INTEGER":
3325 try:
3326 calculated_params[param_name] = int(calculated_params[param_name])
3327 except ValueError: # error converting string to int
3328 raise LcmException(
3329 "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
3330 elif parameter.get("data-type") == "BOOLEAN":
3331 calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false')
3332
3333 # add always ns_config_info if primitive name is config
3334 if primitive_desc["name"] == "config":
3335 if "ns_config_info" in instantiation_params:
3336 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3337 return calculated_params
3338
3339 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
3340 ee_descriptor_id=None):
3341 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3342 for vca in deployed_vca:
3343 if not vca:
3344 continue
3345 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3346 continue
3347 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3348 continue
3349 if kdu_name and kdu_name != vca["kdu_name"]:
3350 continue
3351 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
3352 continue
3353 break
3354 else:
3355 # vca_deployed not found
3356 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3357 " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
3358 ee_descriptor_id))
3359 # get ee_id
3360 ee_id = vca.get("ee_id")
3361 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3362 if not ee_id:
3363 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3364 "execution environment"
3365 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3366 return ee_id, vca_type
3367
3368 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0, retries_interval=30,
3369 timeout=None, vca_type=None, db_dict=None) -> (str, str):
3370 try:
3371 if primitive == "config":
3372 primitive_params = {"params": primitive_params}
3373
3374 vca_type = vca_type or "lxc_proxy_charm"
3375
3376 while retries >= 0:
3377 try:
3378 output = await asyncio.wait_for(
3379 self.vca_map[vca_type].exec_primitive(
3380 ee_id=ee_id,
3381 primitive_name=primitive,
3382 params_dict=primitive_params,
3383 progress_timeout=self.timeout_progress_primitive,
3384 total_timeout=self.timeout_primitive,
3385 db_dict=db_dict),
3386 timeout=timeout or self.timeout_primitive)
3387 # execution was OK
3388 break
3389 except asyncio.CancelledError:
3390 raise
3391 except Exception as e: # asyncio.TimeoutError
3392 if isinstance(e, asyncio.TimeoutError):
3393 e = "Timeout"
3394 retries -= 1
3395 if retries >= 0:
3396 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3397 # wait and retry
3398 await asyncio.sleep(retries_interval, loop=self.loop)
3399 else:
3400 return 'FAILED', str(e)
3401
3402 return 'COMPLETED', output
3403
3404 except (LcmException, asyncio.CancelledError):
3405 raise
3406 except Exception as e:
3407 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3408
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Updating the vca_status with latest juju information in nsrs record.
    If the nsr has deployed KDUs (_admin.deployed.K8s) every KDU with an instance is refreshed with
    _on_update_k8s_db; otherwise every entry of _admin.deployed.VCA is refreshed with _on_update_n2vc_db.
    The task is always removed from lcm_tasks, even if the refresh fails (the exception is propagated).
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """

    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    try:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # a nsr without deployed KDUs may lack the "K8s" key; a missing section means nothing to refresh
        nsr_deployed = (db_nsr.get("_admin") or {}).get("deployed") or {}
        deployed_kdus = nsr_deployed.get("K8s")
        if deployed_kdus:
            for kdu in deployed_kdus:
                # skip empty entries and KDUs without instance, as done when terminating
                if not kdu or not kdu.get("kdu-instance"):
                    continue
                await self._on_update_k8s_db(kdu["k8scluster-uuid"], kdu["kdu-instance"],
                                             filter={'_id': nsr_id})
        else:
            for vca_index, _ in enumerate(nsr_deployed.get("VCA") or ()):
                vca_path = "_admin.deployed.VCA.{}.".format(vca_index)
                await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, vca_path, {})
    finally:
        # done at finally so that a failure does not leave the task registered forever
        self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
3431
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested by an action operation.
    The request is read from operationParams of the nslcmop record. When kdu_name is provided and the primitive
    is upgrade, rollback, status or one declared at the kdu configuration, it is sent to the connector of
    self.k8scluster_map for the cluster type of the KDU; otherwise it is executed with _ns_execute_primitive at
    the execution environment found by _look_for_deployed_vca.
    The result is written at database and notified with self.msg ("ns", "actioned") at the finally clause,
    whatever the outcome is; exceptions are not propagated to the caller.
    :param nsr_id: Id of the nsr
    :param nslcmop_id: Id of the nslcmop
    :return: None if the HA lock is not obtained. Otherwise the tuple (operation state, detailed status)
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    detailed_status = ""  # bound from the start because it is returned at the finally clause
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        if vdu_id:
            descriptor_configuration = get_configuration(db_vnfd, vdu_id)
        elif kdu_name:
            descriptor_configuration = get_configuration(db_vnfd, kdu_name)
        elif vnf_index:
            descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get("config-primitive"):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # upgrade, rollback and status of a KDU do not need to be declared at the descriptor
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                                   format(primitive))
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
            ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")

        if vnf_index:
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                if vdur is None:
                    raise LcmException("vdu_id '{}' not found at vnfr of member_vnf_index '{}'".
                                       format(vdu_id, vnf_index))
                desc_params = parse_yaml_strings(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
                if kdur is None:
                    raise LcmException("kdu_name '{}' not found at vnfr of member_vnf_index '{}'".
                                       format(kdu_name, vnf_index))
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
            else:
                desc_params = parse_yaml_strings(db_vnfr.get("additionalParamsForVnf"))
        else:
            desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))

        # kdu_action must be always bound: it is evaluated below whenever kdu_name is provided and the
        # primitive is not upgrade/rollback/status, even when the KDU has not any configuration
        kdu_action = False
        kdu_configuration = get_configuration(db_vnfd, kdu_name) if kdu_name else None
        if kdu_configuration:
            # names of all the primitives declared for the KDU. A dedicated loop variable is used in order
            # not to overwrite 'primitive', the name requested by the user
            actions = {
                kdu_primitive["name"]
                for section in ("initial-config-primitive", "config-primitive")
                for kdu_primitive in kdu_configuration.get(section, [])
            }
            kdu_action = primitive_name in actions

        # TODO check if ns is in a proper status
        if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
                    break
            else:
                raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
                raise LcmException(msg)

            db_dict = {"collection": "nsrs",
                       "filter": {"_id": nsr_id},
                       "path": "_admin.deployed.K8s.{}".format(index)}
            self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    # model provided by the user: it is used as it is and it is not sent as a param
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    # deployed model: what follows ':' is removed when there is exactly one ':'
                    kdu_model = kdu.get("kdu-model")
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        kdu_model = parts[0]

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=True, kdu_model=kdu_model,
                        params=desc_params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action + 10)
                self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict),
                    timeout=timeout_ns_action)
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance")),
                    timeout=timeout_ns_action)
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
                params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action)

            # an empty answer of the connector is considered a failure
            if detailed_status:
                nslcmop_operation_state = 'COMPLETED'
            else:
                detailed_status = ''
                nslcmop_operation_state = 'FAILED'
        else:
            ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index,
                                                          vdu_id=vdu_id, vdu_count_index=vdu_count_index,
                                                          ee_descriptor_id=ee_descriptor_id)
            db_nslcmop_notif = {"collection": "nslcmops",
                                "filter": {"_id": nslcmop_id},
                                "path": "admin.VCA"}
            nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_nslcmop_notif)

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
        self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
                                                                               detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc:
            db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
                "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr["nsState"],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update
            )

        self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
                              operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # this return replaces the one inside try and discards any exception still in flight
        return nslcmop_operation_state, detailed_status
3668
3669 async def scale(self, nsr_id, nslcmop_id):
3670 # Try to lock HA task here
3671 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3672 if not task_is_locked_by_me:
3673 return
3674
3675 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3676 stage = ['', '', '']
3677 tasks_dict_info = {}
3678 # ^ stage, step, VIM progress
3679 self.logger.debug(logging_text + "Enter")
3680 # get all needed from database
3681 db_nsr = None
3682 db_nslcmop_update = {}
3683 db_nsr_update = {}
3684 exc = None
3685 # in case of error, indicates what part of scale was failed to put nsr at error status
3686 scale_process = None
3687 old_operational_status = ""
3688 old_config_status = ""
3689 nsi_id = None
3690 try:
3691 # wait for any previous tasks in process
3692 step = "Waiting for previous operations to terminate"
3693 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3694 self._write_ns_status(nsr_id=nsr_id, ns_state=None,
3695 current_operation="SCALING", current_operation_id=nslcmop_id)
3696
3697 step = "Getting nslcmop from database"
3698 self.logger.debug(step + " after having waited for previous tasks to be completed")
3699 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3700
3701 step = "Getting nsr from database"
3702 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3703 old_operational_status = db_nsr["operational-status"]
3704 old_config_status = db_nsr["config-status"]
3705
3706 step = "Parsing scaling parameters"
3707 db_nsr_update["operational-status"] = "scaling"
3708 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3709 nsr_deployed = db_nsr["_admin"].get("deployed")
3710
3711 #######
3712 nsr_deployed = db_nsr["_admin"].get("deployed")
3713 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3714 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3715 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3716 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3717 #######
3718
3719 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3720 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3721 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3722 # for backward compatibility
3723 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3724 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3725 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3726 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3727
3728 step = "Getting vnfr from database"
3729 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3730
3731 step = "Getting vnfd from database"
3732 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3733
3734 base_folder = db_vnfd["_admin"]["storage"]
3735
3736 step = "Getting scaling-group-descriptor"
3737 scaling_descriptor = find_in_list(
3738 get_scaling_aspect(
3739 db_vnfd
3740 ),
3741 lambda scale_desc: scale_desc["name"] == scaling_group
3742 )
3743 if not scaling_descriptor:
3744 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3745 "at vnfd:scaling-group-descriptor".format(scaling_group))
3746
3747 step = "Sending scale order to VIM"
3748 # TODO check if ns is in a proper status
3749 nb_scale_op = 0
3750 if not db_nsr["_admin"].get("scaling-group"):
3751 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3752 admin_scale_index = 0
3753 else:
3754 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3755 if admin_scale_info["name"] == scaling_group:
3756 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3757 break
3758 else: # not found, set index one plus last element and add new entry with the name
3759 admin_scale_index += 1
3760 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3761 RO_scaling_info = []
3762 VCA_scaling_info = []
3763 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3764 if scaling_type == "SCALE_OUT":
3765 if "aspect-delta-details" not in scaling_descriptor:
3766 raise LcmException(
3767 "Aspect delta details not fount in scaling descriptor {}".format(
3768 scaling_descriptor["name"]
3769 )
3770 )
3771 # count if max-instance-count is reached
3772 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3773
3774 vdu_scaling_info["scaling_direction"] = "OUT"
3775 vdu_scaling_info["vdu-create"] = {}
3776 for delta in deltas:
3777 for vdu_delta in delta["vdu-delta"]:
3778 vdud = get_vdu(db_vnfd, vdu_delta["id"])
3779 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
3780 cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
3781 if cloud_init_text:
3782 additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
3783 cloud_init_list = []
3784
3785 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3786 max_instance_count = 10
3787 if vdu_profile and "max-number-of-instances" in vdu_profile:
3788 max_instance_count = vdu_profile.get("max-number-of-instances", 10)
3789
3790 default_instance_num = get_number_of_instances(db_vnfd, vdud["id"])
3791
3792 nb_scale_op += vdu_delta.get("number-of-instances", 1)
3793
3794 if nb_scale_op + default_instance_num > max_instance_count:
3795 raise LcmException(
3796 "reached the limit of {} (max-instance-count) "
3797 "scaling-out operations for the "
3798 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3799 )
3800 for x in range(vdu_delta.get("number-of-instances", 1)):
3801 if cloud_init_text:
3802 # TODO Information of its own ip is not available because db_vnfr is not updated.
3803 additional_params["OSM"] = get_osm_params(
3804 db_vnfr,
3805 vdu_delta["id"],
3806 vdu_index + x
3807 )
3808 cloud_init_list.append(
3809 self._parse_cloud_init(
3810 cloud_init_text,
3811 additional_params,
3812 db_vnfd["id"],
3813 vdud["id"]
3814 )
3815 )
3816 VCA_scaling_info.append(
3817 {
3818 "osm_vdu_id": vdu_delta["id"],
3819 "member-vnf-index": vnf_index,
3820 "type": "create",
3821 "vdu_index": vdu_index + x
3822 }
3823 )
3824 RO_scaling_info.append(
3825 {
3826 "osm_vdu_id": vdu_delta["id"],
3827 "member-vnf-index": vnf_index,
3828 "type": "create",
3829 "count": vdu_delta.get("number-of-instances", 1)
3830 }
3831 )
3832 if cloud_init_list:
3833 RO_scaling_info[-1]["cloud_init"] = cloud_init_list
3834 vdu_scaling_info["vdu-create"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3835
3836 elif scaling_type == "SCALE_IN":
3837 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3838 min_instance_count = int(scaling_descriptor["min-instance-count"])
3839
3840 vdu_scaling_info["scaling_direction"] = "IN"
3841 vdu_scaling_info["vdu-delete"] = {}
3842 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3843 for delta in deltas:
3844 for vdu_delta in delta["vdu-delta"]:
3845 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
3846 min_instance_count = 0
3847 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3848 if vdu_profile and "min-number-of-instances" in vdu_profile:
3849 min_instance_count = vdu_profile["min-number-of-instances"]
3850
3851 default_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"])
3852
3853 nb_scale_op -= vdu_delta.get("number-of-instances", 1)
3854 if nb_scale_op + default_instance_num < min_instance_count:
3855 raise LcmException(
3856 "reached the limit of {} (min-instance-count) scaling-in operations for the "
3857 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3858 )
3859 RO_scaling_info.append({"osm_vdu_id": vdu_delta["id"], "member-vnf-index": vnf_index,
3860 "type": "delete", "count": vdu_delta.get("number-of-instances", 1),
3861 "vdu_index": vdu_index - 1})
3862 for x in range(vdu_delta.get("number-of-instances", 1)):
3863 VCA_scaling_info.append(
3864 {
3865 "osm_vdu_id": vdu_delta["id"],
3866 "member-vnf-index": vnf_index,
3867 "type": "delete",
3868 "vdu_index": vdu_index - 1 - x
3869 }
3870 )
3871 vdu_scaling_info["vdu-delete"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3872
3873 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3874 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3875 if vdu_scaling_info["scaling_direction"] == "IN":
3876 for vdur in reversed(db_vnfr["vdur"]):
3877 if vdu_delete.get(vdur["vdu-id-ref"]):
3878 vdu_delete[vdur["vdu-id-ref"]] -= 1
3879 vdu_scaling_info["vdu"].append({
3880 "name": vdur.get("name") or vdur.get("vdu-name"),
3881 "vdu_id": vdur["vdu-id-ref"],
3882 "interface": []
3883 })
3884 for interface in vdur["interfaces"]:
3885 vdu_scaling_info["vdu"][-1]["interface"].append({
3886 "name": interface["name"],
3887 "ip_address": interface["ip-address"],
3888 "mac_address": interface.get("mac-address"),
3889 })
3890 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
3891
3892 # PRE-SCALE BEGIN
3893 step = "Executing pre-scale vnf-config-primitive"
3894 if scaling_descriptor.get("scaling-config-action"):
3895 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3896 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3897 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3898 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3899 step = db_nslcmop_update["detailed-status"] = \
3900 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3901
3902 # look for primitive
3903 for config_primitive in (get_configuration(
3904 db_vnfd, db_vnfd["id"]
3905 ) or {}).get("config-primitive", ()):
3906 if config_primitive["name"] == vnf_config_primitive:
3907 break
3908 else:
3909 raise LcmException(
3910 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3911 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3912 "primitive".format(scaling_group, vnf_config_primitive))
3913
3914 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3915 if db_vnfr.get("additionalParamsForVnf"):
3916 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3917
3918 scale_process = "VCA"
3919 db_nsr_update["config-status"] = "configuring pre-scaling"
3920 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3921
3922 # Pre-scale retry check: Check if this sub-operation has been executed before
3923 op_index = self._check_or_add_scale_suboperation(
3924 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3925 if op_index == self.SUBOPERATION_STATUS_SKIP:
3926 # Skip sub-operation
3927 result = 'COMPLETED'
3928 result_detail = 'Done'
3929 self.logger.debug(logging_text +
3930 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3931 vnf_config_primitive, result, result_detail))
3932 else:
3933 if op_index == self.SUBOPERATION_STATUS_NEW:
3934 # New sub-operation: Get index of this sub-operation
3935 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3936 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3937 format(vnf_config_primitive))
3938 else:
3939 # retry: Get registered params for this existing sub-operation
3940 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3941 vnf_index = op.get('member_vnf_index')
3942 vnf_config_primitive = op.get('primitive')
3943 primitive_params = op.get('primitive_params')
3944 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
3945 format(vnf_config_primitive))
3946 # Execute the primitive, either with new (first-time) or registered (reintent) args
3947 ee_descriptor_id = config_primitive.get("execution-environment-ref")
3948 primitive_name = config_primitive.get("execution-environment-primitive",
3949 vnf_config_primitive)
3950 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3951 member_vnf_index=vnf_index,
3952 vdu_id=None,
3953 vdu_count_index=None,
3954 ee_descriptor_id=ee_descriptor_id)
3955 result, result_detail = await self._ns_execute_primitive(
3956 ee_id, primitive_name, primitive_params, vca_type=vca_type)
3957 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3958 vnf_config_primitive, result, result_detail))
3959 # Update operationState = COMPLETED | FAILED
3960 self._update_suboperation_status(
3961 db_nslcmop, op_index, result, result_detail)
3962
3963 if result == "FAILED":
3964 raise LcmException(result_detail)
3965 db_nsr_update["config-status"] = old_config_status
3966 scale_process = None
3967 # PRE-SCALE END
3968
3969 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3970 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3971
3972 # SCALE-IN VCA - BEGIN
3973 if VCA_scaling_info:
3974 step = db_nslcmop_update["detailed-status"] = \
3975 "Deleting the execution environments"
3976 scale_process = "VCA"
3977 for vdu_info in VCA_scaling_info:
3978 if vdu_info["type"] == "delete":
3979 member_vnf_index = str(vdu_info["member-vnf-index"])
3980 self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
3981 vdu_id = vdu_info["osm_vdu_id"]
3982 vdu_index = int(vdu_info["vdu_index"])
3983 stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
3984 member_vnf_index, vdu_id, vdu_index)
3985 stage[2] = step = "Scaling in VCA"
3986 self._write_op_status(
3987 op_id=nslcmop_id,
3988 stage=stage
3989 )
3990 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
3991 config_update = db_nsr["configurationStatus"]
3992 for vca_index, vca in enumerate(vca_update):
3993 if (vca or vca.get("ee_id")) and vca["member-vnf-index"] == member_vnf_index and \
3994 vca["vdu_count_index"] == vdu_index:
3995 if vca.get("vdu_id"):
3996 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
3997 elif vca.get("kdu_name"):
3998 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
3999 else:
4000 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4001 operation_params = db_nslcmop.get("operationParams") or {}
4002 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
4003 vca.get("needed_terminate"))
4004 task = asyncio.ensure_future(asyncio.wait_for(
4005 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
4006 vca_index, destroy_ee=True,
4007 exec_primitives=exec_terminate_primitives,
4008 scaling_in=True), timeout=self.timeout_charm_delete))
4009 # wait before next removal
4010 await asyncio.sleep(30)
4011 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4012 del vca_update[vca_index]
4013 del config_update[vca_index]
4014 # wait for pending tasks of terminate primitives
4015 if tasks_dict_info:
4016 self.logger.debug(logging_text +
4017 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
4018 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
4019 min(self.timeout_charm_delete,
4020 self.timeout_ns_terminate),
4021 stage, nslcmop_id)
4022 tasks_dict_info.clear()
4023 if error_list:
4024 raise LcmException("; ".join(error_list))
4025
4026 db_vca_and_config_update = {
4027 "_admin.deployed.VCA": vca_update,
4028 "configurationStatus": config_update
4029 }
4030 self.update_db_2("nsrs", db_nsr["_id"], db_vca_and_config_update)
4031 scale_process = None
4032 # SCALE-IN VCA - END
4033
4034 # SCALE RO - BEGIN
4035 if RO_scaling_info:
4036 scale_process = "RO"
4037 if self.ro_config.get("ng"):
4038 await self._scale_ng_ro(logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage)
4039 vdu_scaling_info.pop("vdu-create", None)
4040 vdu_scaling_info.pop("vdu-delete", None)
4041
4042 scale_process = None
4043 if db_nsr_update:
4044 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4045 # SCALE RO - END
4046
4047 # SCALE-UP VCA - BEGIN
4048 if VCA_scaling_info:
4049 step = db_nslcmop_update["detailed-status"] = \
4050 "Creating new execution environments"
4051 scale_process = "VCA"
4052 for vdu_info in VCA_scaling_info:
4053 if vdu_info["type"] == "create":
4054 member_vnf_index = str(vdu_info["member-vnf-index"])
4055 self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
4056 vnfd_id = db_vnfr["vnfd-ref"]
4057 vdu_index = int(vdu_info["vdu_index"])
4058 deploy_params = {"OSM": get_osm_params(db_vnfr)}
4059 if db_vnfr.get("additionalParamsForVnf"):
4060 deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))
4061 descriptor_config = get_configuration(db_vnfd, db_vnfd["id"])
4062 if descriptor_config:
4063 vdu_id = None
4064 vdu_name = None
4065 kdu_name = None
4066 self._deploy_n2vc(
4067 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
4068 db_nsr=db_nsr,
4069 db_vnfr=db_vnfr,
4070 nslcmop_id=nslcmop_id,
4071 nsr_id=nsr_id,
4072 nsi_id=nsi_id,
4073 vnfd_id=vnfd_id,
4074 vdu_id=vdu_id,
4075 kdu_name=kdu_name,
4076 member_vnf_index=member_vnf_index,
4077 vdu_index=vdu_index,
4078 vdu_name=vdu_name,
4079 deploy_params=deploy_params,
4080 descriptor_config=descriptor_config,
4081 base_folder=base_folder,
4082 task_instantiation_info=tasks_dict_info,
4083 stage=stage
4084 )
4085 vdu_id = vdu_info["osm_vdu_id"]
4086 vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)
4087 descriptor_config = get_configuration(db_vnfd, vdu_id)
4088 if vdur.get("additionalParams"):
4089 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
4090 else:
4091 deploy_params_vdu = deploy_params
4092 deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=vdu_index)
4093 if descriptor_config:
4094 vdu_name = None
4095 kdu_name = None
4096 stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4097 member_vnf_index, vdu_id, vdu_index)
4098 stage[2] = step = "Scaling out VCA"
4099 self._write_op_status(
4100 op_id=nslcmop_id,
4101 stage=stage
4102 )
4103 self._deploy_n2vc(
4104 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4105 member_vnf_index, vdu_id, vdu_index),
4106 db_nsr=db_nsr,
4107 db_vnfr=db_vnfr,
4108 nslcmop_id=nslcmop_id,
4109 nsr_id=nsr_id,
4110 nsi_id=nsi_id,
4111 vnfd_id=vnfd_id,
4112 vdu_id=vdu_id,
4113 kdu_name=kdu_name,
4114 member_vnf_index=member_vnf_index,
4115 vdu_index=vdu_index,
4116 vdu_name=vdu_name,
4117 deploy_params=deploy_params_vdu,
4118 descriptor_config=descriptor_config,
4119 base_folder=base_folder,
4120 task_instantiation_info=tasks_dict_info,
4121 stage=stage
4122 )
4123 # TODO: scaling for kdu is not implemented yet.
4124 kdu_name = vdu_info["osm_vdu_id"]
4125 descriptor_config = get_configuration(db_vnfd, kdu_name)
4126 if descriptor_config:
4127 vdu_id = None
4128 vdu_index = vdu_index
4129 vdu_name = None
4130 kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
4131 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
4132 if kdur.get("additionalParams"):
4133 deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"])
4134
4135 self._deploy_n2vc(
4136 logging_text=logging_text,
4137 db_nsr=db_nsr,
4138 db_vnfr=db_vnfr,
4139 nslcmop_id=nslcmop_id,
4140 nsr_id=nsr_id,
4141 nsi_id=nsi_id,
4142 vnfd_id=vnfd_id,
4143 vdu_id=vdu_id,
4144 kdu_name=kdu_name,
4145 member_vnf_index=member_vnf_index,
4146 vdu_index=vdu_index,
4147 vdu_name=vdu_name,
4148 deploy_params=deploy_params_kdu,
4149 descriptor_config=descriptor_config,
4150 base_folder=base_folder,
4151 task_instantiation_info=tasks_dict_info,
4152 stage=stage
4153 )
4154 # SCALE-UP VCA - END
4155 scale_process = None
4156
4157 # POST-SCALE BEGIN
4158 # execute primitive service POST-SCALING
4159 step = "Executing post-scale vnf-config-primitive"
4160 if scaling_descriptor.get("scaling-config-action"):
4161 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4162 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
4163 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
4164 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4165 step = db_nslcmop_update["detailed-status"] = \
4166 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
4167
4168 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4169 if db_vnfr.get("additionalParamsForVnf"):
4170 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4171
4172 # look for primitive
4173 for config_primitive in (
4174 get_configuration(db_vnfd, db_vnfd["id"]) or {}
4175 ).get("config-primitive", ()):
4176 if config_primitive["name"] == vnf_config_primitive:
4177 break
4178 else:
4179 raise LcmException(
4180 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4181 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4182 "config-primitive".format(scaling_group, vnf_config_primitive))
4183 scale_process = "VCA"
4184 db_nsr_update["config-status"] = "configuring post-scaling"
4185 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4186
4187 # Post-scale retry check: Check if this sub-operation has been executed before
4188 op_index = self._check_or_add_scale_suboperation(
4189 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
4190 if op_index == self.SUBOPERATION_STATUS_SKIP:
4191 # Skip sub-operation
4192 result = 'COMPLETED'
4193 result_detail = 'Done'
4194 self.logger.debug(logging_text +
4195 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4196 format(vnf_config_primitive, result, result_detail))
4197 else:
4198 if op_index == self.SUBOPERATION_STATUS_NEW:
4199 # New sub-operation: Get index of this sub-operation
4200 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4201 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4202 format(vnf_config_primitive))
4203 else:
4204 # retry: Get registered params for this existing sub-operation
4205 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4206 vnf_index = op.get('member_vnf_index')
4207 vnf_config_primitive = op.get('primitive')
4208 primitive_params = op.get('primitive_params')
4209 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4210 format(vnf_config_primitive))
4211 # Execute the primitive, either with new (first-time) or registered (reintent) args
4212 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4213 primitive_name = config_primitive.get("execution-environment-primitive",
4214 vnf_config_primitive)
4215 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4216 member_vnf_index=vnf_index,
4217 vdu_id=None,
4218 vdu_count_index=None,
4219 ee_descriptor_id=ee_descriptor_id)
4220 result, result_detail = await self._ns_execute_primitive(
4221 ee_id, primitive_name, primitive_params, vca_type=vca_type)
4222 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4223 vnf_config_primitive, result, result_detail))
4224 # Update operationState = COMPLETED | FAILED
4225 self._update_suboperation_status(
4226 db_nslcmop, op_index, result, result_detail)
4227
4228 if result == "FAILED":
4229 raise LcmException(result_detail)
4230 db_nsr_update["config-status"] = old_config_status
4231 scale_process = None
4232 # POST-SCALE END
4233
4234 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4235 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
4236 else old_operational_status
4237 db_nsr_update["config-status"] = old_config_status
4238 return
4239 except (ROclient.ROClientException, DbException, LcmException, NgRoException) as e:
4240 self.logger.error(logging_text + "Exit Exception {}".format(e))
4241 exc = e
4242 except asyncio.CancelledError:
4243 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
4244 exc = "Operation was cancelled"
4245 except Exception as e:
4246 exc = traceback.format_exc()
4247 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
4248 finally:
4249 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE", current_operation_id=None)
4250 if tasks_dict_info:
4251 stage[1] = "Waiting for instantiate pending tasks."
4252 self.logger.debug(logging_text + stage[1])
4253 exc = await self._wait_for_tasks(logging_text, tasks_dict_info, self.timeout_ns_deploy,
4254 stage, nslcmop_id, nsr_id=nsr_id)
4255 if exc:
4256 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4257 nslcmop_operation_state = "FAILED"
4258 if db_nsr:
4259 db_nsr_update["operational-status"] = old_operational_status
4260 db_nsr_update["config-status"] = old_config_status
4261 db_nsr_update["detailed-status"] = ""
4262 if scale_process:
4263 if "VCA" in scale_process:
4264 db_nsr_update["config-status"] = "failed"
4265 if "RO" in scale_process:
4266 db_nsr_update["operational-status"] = "failed"
4267 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
4268 exc)
4269 else:
4270 error_description_nslcmop = None
4271 nslcmop_operation_state = "COMPLETED"
4272 db_nslcmop_update["detailed-status"] = "Done"
4273
4274 self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
4275 operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
4276 if db_nsr:
4277 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE",
4278 current_operation_id=None, other_update=db_nsr_update)
4279
4280 if nslcmop_operation_state:
4281 try:
4282 msg = {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state}
4283 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
4284 except Exception as e:
4285 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4286 self.logger.debug(logging_text + "Exit")
4287 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4288
async def _scale_ng_ro(self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage):
    """
    Run a scale operation through NG-RO.

    Reads the nsd, every vnfr of the NS and one copy of each vnfd they reference, applies the
    requested changes to db_vnfr by means of self.scale_vnfr and then awaits
    self._instantiate_ng_ro with the whole NS content.

    :param logging_text: prefix for log messages, handed over to _instantiate_ng_ro
    :param db_nsr: nsr content; its "nsd-id" is used to read the nsd from database
    :param db_nslcmop: nslcmop content; its "nsInstanceId" is used as the NS id
    :param db_vnfr: vnfr being scaled; it replaces the database copy with the same
        "member-vnf-index-ref" before calling _instantiate_ng_ro
    :param vdu_scaling_info: dict whose optional "vdu-create" and "vdu-delete" entries are
        handed over to self.scale_vnfr
    :param stage: stage list handed over to _instantiate_ng_ro
    :return: None
    """
    nsr_id = db_nslcmop["nsInstanceId"]
    db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

    db_vnfrs = {}   # vnfr content indexed by member-vnf-index-ref
    db_vnfds = []   # one entry per different vnfd used by the NS
    # "vnfd-id" of a vnfr is the database "_id" of the vnfd (it is the key used by get_one below),
    # so already loaded descriptors are tracked by that "_id" and not by the descriptor "id" field
    loaded_vnfd_ids = set()

    # for each vnf in ns, read vnfd
    for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
        db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
        vnfd_id = vnfr["vnfd-id"]  # vnfd uuid for this vnf
        # if we haven't this vnfd, read it from db
        if vnfd_id not in loaded_vnfd_ids:
            db_vnfds.append(self.db.get_one("vnfds", {"_id": vnfd_id}))
            loaded_vnfd_ids.add(vnfd_id)

    n2vc_key_list = [self.n2vc.get_public_key()]

    # NOTE(review): mark_delete=True presumably only flags the vdur to remove so that RO still
    # sees them during the deployment; confirm against scale_vnfr
    self.scale_vnfr(db_vnfr, vdu_scaling_info.get("vdu-create"), vdu_scaling_info.get("vdu-delete"),
                    mark_delete=True)
    # db_vnfr has been updated, update db_vnfrs to use it
    db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr

    await self._instantiate_ng_ro(logging_text, nsr_id, db_nsd, db_nsr, db_nslcmop, db_vnfrs,
                                  db_vnfds, n2vc_key_list, stage=stage, start_deploy=time(),
                                  timeout_ns_deploy=self.timeout_ns_deploy)

    # second pass for deletions once RO has finished, this time with mark_delete=False
    if vdu_scaling_info.get("vdu-delete"):
        self.scale_vnfr(db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False)
4317
async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
    """
    Look for a 'prometheus*.j2' job template among the artifacts and register its jobs.

    :param ee_id: execution environment id; the text after the first "." is used as service name
    :param artifact_path: folder listed through self.fs where the template is searched
    :param ee_config_descriptor: descriptor that must contain "metric-service"
    :param vnfr_id: vnfr id; it is used without dashes as job name
    :param nsr_id: value stored under "nsr_id" at every job
    :param target_ip: value for the TARGET_IP template variable
    :return: list of job names if self.prometheus.update() returns a true value. None when there
        is no prometheus, no template is found or the update returns a false value
    """
    if not self.prometheus:
        return

    # take the first artifact named 'prometheus*.j2'
    template_name = None
    for file_name in self.fs.dir_ls(artifact_path):
        if file_name.startswith("prometheus") and file_name.endswith(".j2"):
            template_name = file_name
            break
    if not template_name:
        return

    with self.fs.file_open((artifact_path, template_name), "r") as template_file:
        job_data = template_file.read()

    # TODO get_service
    service = ee_id.partition(".")[2]  # remove prefix "namespace."
    exporter_host = "{}-{}".format(service, ee_config_descriptor["metric-service"])
    exporter_port = "80"
    vnfr_id = vnfr_id.replace("-", "")

    job_list = self.prometheus.parse_job(job_data, {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": exporter_host,
        "EXPORTER_POD_PORT": exporter_port,
    })

    # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
    for job in job_list:
        current_name = job.get("job_name")
        if not (isinstance(current_name, str) and vnfr_id in current_name):
            job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id

    job_dict = {}
    for job in job_list:
        job_dict[job["job_name"]] = job

    updated = await self.prometheus.update(job_dict)
    if updated:
        return list(job_dict)
4349
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Obtain the VCA cloud and the VCA cloud credential stored at the "config" of a VIM account

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential). Any of them is None if it is not at the config
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
4360
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Obtain the VCA K8s cloud and the VCA K8s cloud credential stored at the "config" of a VIM account

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential). Any of them is None if it is not at the config
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential