1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import (
26 Environment,
27 TemplateError,
28 TemplateNotFound,
29 StrictUndefined,
30 UndefinedError,
31 )
32
33 from osm_lcm import ROclient
34 from osm_lcm.data_utils.nsr import get_deployed_kdu
35 from osm_lcm.ng_ro import NgRoClient, NgRoException
36 from osm_lcm.lcm_utils import (
37 LcmException,
38 LcmExceptionNoMgmtIP,
39 LcmBase,
40 deep_get,
41 get_iterable,
42 populate_dict,
43 )
44 from osm_lcm.data_utils.nsd import get_vnf_profiles
45 from osm_lcm.data_utils.vnfd import (
46 get_vdu_list,
47 get_vdu_profile,
48 get_ee_sorted_initial_config_primitive_list,
49 get_ee_sorted_terminate_config_primitive_list,
50 get_kdu_list,
51 get_virtual_link_profiles,
52 get_vdu,
53 get_configuration,
54 get_vdu_index,
55 get_scaling_aspect,
56 get_number_of_instances,
57 get_juju_ee_ref,
58 get_kdu_profile,
59 )
60 from osm_lcm.data_utils.list_utils import find_in_list
61 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
62 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
63 from osm_lcm.data_utils.database.vim_account import VimAccountDB
64 from n2vc.k8s_helm_conn import K8sHelmConnector
65 from n2vc.k8s_helm3_conn import K8sHelm3Connector
66 from n2vc.k8s_juju_conn import K8sJujuConnector
67
68 from osm_common.dbbase import DbException
69 from osm_common.fsbase import FsException
70
71 from osm_lcm.data_utils.database.database import Database
72 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
73
74 from n2vc.n2vc_juju_conn import N2VCJujuConnector
75 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
76
77 from osm_lcm.lcm_helm_conn import LCMHelmConn
78
79 from copy import copy, deepcopy
80 from time import time
81 from uuid import uuid4
82
83 from random import randint
84
85 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
86
87
88 class NsLcm(LcmBase):
89 timeout_vca_on_error = (
90 5 * 60
91 ) # Time since a charm first reaches blocked or error status until it is marked as failed
92 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying a ns
93 timeout_ns_terminate = 1800 # default global timeout for undeploying a ns
94 timeout_charm_delete = 10 * 60
95 timeout_primitive = 30 * 60 # timeout for primitive execution
96 timeout_progress_primitive = (
97 10 * 60
98 ) # timeout for some progress in a primitive execution
99
100 SUBOPERATION_STATUS_NOT_FOUND = -1
101 SUBOPERATION_STATUS_NEW = -2
102 SUBOPERATION_STATUS_SKIP = -3
103 task_name_deploy_vca = "Deploying VCA"
104
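# Note: the class-level timeouts above are defaults only; instantiate_RO()
# prefers a value from the "timeout" section of the LCM configuration when one
# is present, e.g. (illustrative configuration fragment, not a required setting):
#   config = {"timeout": {"ns_deploy": 3 * 3600}, ...}
#   timeout_ns_deploy = config["timeout"].get("ns_deploy", NsLcm.timeout_ns_deploy)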
105 def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
106 """
107 Init: connect to database, filesystem storage and messaging
108 :param config: two-level dictionary with configuration. Top level should contain 'database', 'storage',
109 :return: None
110 """
111 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
112
113 self.db = Database().instance.db
114 self.fs = Filesystem().instance.fs
115 self.loop = loop
116 self.lcm_tasks = lcm_tasks
117 self.timeout = config["timeout"]
118 self.ro_config = config["ro_config"]
119 self.ng_ro = config["ro_config"].get("ng")
120 self.vca_config = config["VCA"].copy()
121
122 # create N2VC connector
123 self.n2vc = N2VCJujuConnector(
124 log=self.logger,
125 loop=self.loop,
126 on_update_db=self._on_update_n2vc_db,
127 fs=self.fs,
128 db=self.db,
129 )
130
131 self.conn_helm_ee = LCMHelmConn(
132 log=self.logger,
133 loop=self.loop,
134 vca_config=self.vca_config,
135 on_update_db=self._on_update_n2vc_db,
136 )
137
138 self.k8sclusterhelm2 = K8sHelmConnector(
139 kubectl_command=self.vca_config.get("kubectlpath"),
140 helm_command=self.vca_config.get("helmpath"),
141 log=self.logger,
142 on_update_db=None,
143 fs=self.fs,
144 db=self.db,
145 )
146
147 self.k8sclusterhelm3 = K8sHelm3Connector(
148 kubectl_command=self.vca_config.get("kubectlpath"),
149 helm_command=self.vca_config.get("helm3path"),
150 fs=self.fs,
151 log=self.logger,
152 db=self.db,
153 on_update_db=None,
154 )
155
156 self.k8sclusterjuju = K8sJujuConnector(
157 kubectl_command=self.vca_config.get("kubectlpath"),
158 juju_command=self.vca_config.get("jujupath"),
159 log=self.logger,
160 loop=self.loop,
161 on_update_db=self._on_update_k8s_db,
162 fs=self.fs,
163 db=self.db,
164 )
165
166 self.k8scluster_map = {
167 "helm-chart": self.k8sclusterhelm2,
168 "helm-chart-v3": self.k8sclusterhelm3,
169 "chart": self.k8sclusterhelm3,
170 "juju-bundle": self.k8sclusterjuju,
171 "juju": self.k8sclusterjuju,
172 }
173
174 self.vca_map = {
175 "lxc_proxy_charm": self.n2vc,
176 "native_charm": self.n2vc,
177 "k8s_proxy_charm": self.n2vc,
178 "helm": self.conn_helm_ee,
179 "helm-v3": self.conn_helm_ee,
180 }
181
182 self.prometheus = prometheus
183
184 # create RO client
185 self.RO = NgRoClient(self.loop, **self.ro_config)
186
187 @staticmethod
188 def increment_ip_mac(ip_mac, vm_index=1):
189 if not isinstance(ip_mac, str):
190 return ip_mac
191 try:
192 # try with ipv4: look for last dot
193 i = ip_mac.rfind(".")
194 if i > 0:
195 i += 1
196 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
197 # try with ipv6 or mac: look for last colon. Operate in hex
198 i = ip_mac.rfind(":")
199 if i > 0:
200 i += 1
201 # format in hex, len can be 2 for mac or 4 for ipv6
202 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
203 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
204 )
205 except Exception:
206 pass
207 return None
208
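# Illustrative behaviour of increment_ip_mac (doctest-style sketch, invented
# addresses; the helper increments the last IPv4 octet, or the last hex group
# of a MAC/IPv6 address, by vm_index):
#   >>> NsLcm.increment_ip_mac("10.0.0.4", vm_index=2)
#   '10.0.0.6'
#   >>> NsLcm.increment_ip_mac("fa:16:3e:05:05:0a", vm_index=1)
#   'fa:16:3e:05:05:0b'
#   >>> NsLcm.increment_ip_mac("not-an-address") is None
#   True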
209 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
210
211 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
212
213 try:
214 # TODO filter RO descriptor fields...
215
216 # write to database
217 db_dict = dict()
218 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
219 db_dict["deploymentStatus"] = ro_descriptor
220 self.update_db_2("nsrs", nsrs_id, db_dict)
221
222 except Exception as e:
223 self.logger.warn(
224 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
225 )
226
227 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
228
229 # remove last dot from path (if exists)
230 if path.endswith("."):
231 path = path[:-1]
232
233 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
234 # .format(table, filter, path, updated_data))
235 try:
236
237 nsr_id = filter.get("_id")
238
239 # read ns record from database
240 nsr = self.db.get_one(table="nsrs", q_filter=filter)
241 current_ns_status = nsr.get("nsState")
242
243 # get vca status for NS
244 status_dict = await self.n2vc.get_status(
245 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
246 )
247
248 # vcaStatus
249 db_dict = dict()
250 db_dict["vcaStatus"] = status_dict
251 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
252
253 # update configurationStatus for this VCA
254 try:
255 vca_index = int(path[path.rfind(".") + 1 :])
256
257 vca_list = deep_get(
258 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
259 )
260 vca_status = vca_list[vca_index].get("status")
261
262 configuration_status_list = nsr.get("configurationStatus")
263 config_status = configuration_status_list[vca_index].get("status")
264
265 if config_status == "BROKEN" and vca_status != "failed":
266 db_dict["configurationStatus"][vca_index] = "READY"
267 elif config_status != "BROKEN" and vca_status == "failed":
268 db_dict["configurationStatus"][vca_index] = "BROKEN"
269 except Exception as e:
270 # do not update configurationStatus
271 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
272
273 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
274 # if nsState = 'DEGRADED' check if all is OK
275 is_degraded = False
276 if current_ns_status in ("READY", "DEGRADED"):
277 error_description = ""
278 # check machines
279 if status_dict.get("machines"):
280 for machine_id in status_dict.get("machines"):
281 machine = status_dict.get("machines").get(machine_id)
282 # check machine agent-status
283 if machine.get("agent-status"):
284 s = machine.get("agent-status").get("status")
285 if s != "started":
286 is_degraded = True
287 error_description += (
288 "machine {} agent-status={} ; ".format(
289 machine_id, s
290 )
291 )
292 # check machine instance status
293 if machine.get("instance-status"):
294 s = machine.get("instance-status").get("status")
295 if s != "running":
296 is_degraded = True
297 error_description += (
298 "machine {} instance-status={} ; ".format(
299 machine_id, s
300 )
301 )
302 # check applications
303 if status_dict.get("applications"):
304 for app_id in status_dict.get("applications"):
305 app = status_dict.get("applications").get(app_id)
306 # check application status
307 if app.get("status"):
308 s = app.get("status").get("status")
309 if s != "active":
310 is_degraded = True
311 error_description += (
312 "application {} status={} ; ".format(app_id, s)
313 )
314
315 if error_description:
316 db_dict["errorDescription"] = error_description
317 if current_ns_status == "READY" and is_degraded:
318 db_dict["nsState"] = "DEGRADED"
319 if current_ns_status == "DEGRADED" and not is_degraded:
320 db_dict["nsState"] = "READY"
321
322 # write to database
323 self.update_db_2("nsrs", nsr_id, db_dict)
324
325 except (asyncio.CancelledError, asyncio.TimeoutError):
326 raise
327 except Exception as e:
328 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
329
330 async def _on_update_k8s_db(
331 self, cluster_uuid, kdu_instance, filter=None, vca_id=None
332 ):
333 """
334 Updating vca status in NSR record
335 :param cluster_uuid: UUID of a k8s cluster
336 :param kdu_instance: The unique name of the KDU instance
337 :param filter: To get nsr_id
338 :return: none
339 """
340
341 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
342 # .format(cluster_uuid, kdu_instance, filter))
343
344 try:
345 nsr_id = filter.get("_id")
346
347 # get vca status for NS
348 vca_status = await self.k8sclusterjuju.status_kdu(
349 cluster_uuid,
350 kdu_instance,
351 complete_status=True,
352 yaml_format=False,
353 vca_id=vca_id,
354 )
355 # vcaStatus
356 db_dict = dict()
357 db_dict["vcaStatus"] = {nsr_id: vca_status}
358
359 await self.k8sclusterjuju.update_vca_status(
360 db_dict["vcaStatus"],
361 kdu_instance,
362 vca_id=vca_id,
363 )
364
365 # write to database
366 self.update_db_2("nsrs", nsr_id, db_dict)
367
368 except (asyncio.CancelledError, asyncio.TimeoutError):
369 raise
370 except Exception as e:
371 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
372
373 @staticmethod
374 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
375 try:
376 env = Environment(undefined=StrictUndefined)
377 template = env.from_string(cloud_init_text)
378 return template.render(additional_params or {})
379 except UndefinedError as e:
380 raise LcmException(
381 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
382 "file, must be provided in the instantiation parameters inside the "
383 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
384 )
385 except (TemplateError, TemplateNotFound) as e:
386 raise LcmException(
387 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
388 vnfd_id, vdu_id, e
389 )
390 )
391
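# Minimal sketch of _parse_cloud_init with a hypothetical template and params,
# only to illustrate the StrictUndefined behaviour described above:
#   >>> text = "#cloud-config\npassword: {{ password }}"
#   >>> NsLcm._parse_cloud_init(text, {"password": "osm4u"}, "vnfd1", "vdu1")
#   '#cloud-config\npassword: osm4u'
# If a referenced variable is missing from additional_params, an LcmException is
# raised asking for it inside the 'additionalParamsForVnf/Vdu' block.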
392 def _get_vdu_cloud_init_content(self, vdu, vnfd):
393 cloud_init_content = cloud_init_file = None
394 try:
395 if vdu.get("cloud-init-file"):
396 base_folder = vnfd["_admin"]["storage"]
397 cloud_init_file = "{}/{}/cloud_init/{}".format(
398 base_folder["folder"],
399 base_folder["pkg-dir"],
400 vdu["cloud-init-file"],
401 )
402 with self.fs.file_open(cloud_init_file, "r") as ci_file:
403 cloud_init_content = ci_file.read()
404 elif vdu.get("cloud-init"):
405 cloud_init_content = vdu["cloud-init"]
406
407 return cloud_init_content
408 except FsException as e:
409 raise LcmException(
410 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
411 vnfd["id"], vdu["id"], cloud_init_file, e
412 )
413 )
414
415 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
416 vdur = next(
417 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
418 {}
419 )
420 additional_params = vdur.get("additionalParams")
421 return parse_yaml_strings(additional_params)
422
423 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
424 """
425 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
426 :param vnfd: input vnfd
427 :param new_id: overrides vnf id if provided
428 :param additionalParams: Instantiation params for VNFs provided
429 :param nsrId: Id of the NSR
430 :return: copy of vnfd
431 """
432 vnfd_RO = deepcopy(vnfd)
433 # remove configuration, monitoring, scaling and internal keys that RO does not use
434 vnfd_RO.pop("_id", None)
435 vnfd_RO.pop("_admin", None)
436 vnfd_RO.pop("monitoring-param", None)
437 vnfd_RO.pop("scaling-group-descriptor", None)
438 vnfd_RO.pop("kdu", None)
439 vnfd_RO.pop("k8s-cluster", None)
440 if new_id:
441 vnfd_RO["id"] = new_id
442
443 # cloud-init / cloud-init-file are parsed with Jinja2 by LCM, so remove them from the RO descriptor
444 for vdu in get_iterable(vnfd_RO, "vdu"):
445 vdu.pop("cloud-init-file", None)
446 vdu.pop("cloud-init", None)
447 return vnfd_RO
448
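# Rough sketch of the effect of vnfd2RO (descriptor fragment invented for
# illustration; only the keys popped above are relevant):
#   vnfd_ro = self.vnfd2RO({"_id": "x", "id": "hackfest-vnf", "kdu": []}, new_id="hackfest-vnf-ro")
#   # -> deep copy with "id" == "hackfest-vnf-ro" and without "_id", "_admin",
#   #    "monitoring-param", "scaling-group-descriptor", "kdu", "k8s-cluster"
#   #    or the per-vdu cloud-init / cloud-init-file fields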
449 @staticmethod
450 def ip_profile_2_RO(ip_profile):
451 RO_ip_profile = deepcopy(ip_profile)
452 if "dns-server" in RO_ip_profile:
453 if isinstance(RO_ip_profile["dns-server"], list):
454 RO_ip_profile["dns-address"] = []
455 for ds in RO_ip_profile.pop("dns-server"):
456 RO_ip_profile["dns-address"].append(ds["address"])
457 else:
458 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
459 if RO_ip_profile.get("ip-version") == "ipv4":
460 RO_ip_profile["ip-version"] = "IPv4"
461 if RO_ip_profile.get("ip-version") == "ipv6":
462 RO_ip_profile["ip-version"] = "IPv6"
463 if "dhcp-params" in RO_ip_profile:
464 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
465 return RO_ip_profile
466
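# Illustrative input/output of ip_profile_2_RO (values invented):
#   >>> NsLcm.ip_profile_2_RO(
#   ...     {"ip-version": "ipv4", "subnet-address": "192.168.0.0/24",
#   ...      "dns-server": [{"address": "8.8.8.8"}], "dhcp-params": {"enabled": True}}
#   ... )
#   {'ip-version': 'IPv4', 'subnet-address': '192.168.0.0/24', 'dns-address': ['8.8.8.8'], 'dhcp': {'enabled': True}}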
467 def _get_ro_vim_id_for_vim_account(self, vim_account):
468 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
469 if db_vim["_admin"]["operationalState"] != "ENABLED":
470 raise LcmException(
471 "VIM={} is not available. operationalState={}".format(
472 vim_account, db_vim["_admin"]["operationalState"]
473 )
474 )
475 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
476 return RO_vim_id
477
478 def get_ro_wim_id_for_wim_account(self, wim_account):
479 if isinstance(wim_account, str):
480 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
481 if db_wim["_admin"]["operationalState"] != "ENABLED":
482 raise LcmException(
483 "WIM={} is not available. operationalState={}".format(
484 wim_account, db_wim["_admin"]["operationalState"]
485 )
486 )
487 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
488 return RO_wim_id
489 else:
490 return wim_account
491
492 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
493
494 db_vdu_push_list = []
495 template_vdur = []
496 db_update = {"_admin.modified": time()}
497 if vdu_create:
498 for vdu_id, vdu_count in vdu_create.items():
499 vdur = next(
500 (
501 vdur
502 for vdur in reversed(db_vnfr["vdur"])
503 if vdur["vdu-id-ref"] == vdu_id
504 ),
505 None,
506 )
507 if not vdur:
508 # Read the template saved in the db:
509 self.logger.debug(f"No vdur in the database. Using the vdur-template to scale")
510 vdur_template = db_vnfr.get("vdur-template")
511 if not vdur_template:
512 raise LcmException(
513 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
514 vdu_id
515 )
516 )
517 vdur = vdur_template[0]
518 # Delete the template from the database after using it
519 self.db.set_one("vnfrs",
520 {"_id": db_vnfr["_id"]},
521 None,
522 pull={"vdur-template": {"_id": vdur['_id']}}
523 )
524 for count in range(vdu_count):
525 vdur_copy = deepcopy(vdur)
526 vdur_copy["status"] = "BUILD"
527 vdur_copy["status-detailed"] = None
528 vdur_copy["ip-address"] = None
529 vdur_copy["_id"] = str(uuid4())
530 vdur_copy["count-index"] += count + 1
531 vdur_copy["id"] = "{}-{}".format(
532 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
533 )
534 vdur_copy.pop("vim_info", None)
535 for iface in vdur_copy["interfaces"]:
536 if iface.get("fixed-ip"):
537 iface["ip-address"] = self.increment_ip_mac(
538 iface["ip-address"], count + 1
539 )
540 else:
541 iface.pop("ip-address", None)
542 if iface.get("fixed-mac"):
543 iface["mac-address"] = self.increment_ip_mac(
544 iface["mac-address"], count + 1
545 )
546 else:
547 iface.pop("mac-address", None)
548 if db_vnfr["vdur"]:
549 iface.pop(
550 "mgmt_vnf", None
551 ) # only the first vdu can be the management vdu of the vnf
552 db_vdu_push_list.append(vdur_copy)
553 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
554 if vdu_delete:
555 if len(db_vnfr["vdur"]) == 1:
556 # The scale will move to 0 instances
557 self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur")
558 template_vdur = [db_vnfr["vdur"][0]]
559 for vdu_id, vdu_count in vdu_delete.items():
560 if mark_delete:
561 indexes_to_delete = [
562 iv[0]
563 for iv in enumerate(db_vnfr["vdur"])
564 if iv[1]["vdu-id-ref"] == vdu_id
565 ]
566 db_update.update(
567 {
568 "vdur.{}.status".format(i): "DELETING"
569 for i in indexes_to_delete[-vdu_count:]
570 }
571 )
572 else:
573 # it must be deleted one by one because common.db does not allow otherwise
574 vdus_to_delete = [
575 v
576 for v in reversed(db_vnfr["vdur"])
577 if v["vdu-id-ref"] == vdu_id
578 ]
579 for vdu in vdus_to_delete[:vdu_count]:
580 self.db.set_one(
581 "vnfrs",
582 {"_id": db_vnfr["_id"]},
583 None,
584 pull={"vdur": {"_id": vdu["_id"]}},
585 )
586 db_push = {}
587 if db_vdu_push_list:
588 db_push["vdur"] = db_vdu_push_list
589 if template_vdur:
590 db_push["vdur-template"] = template_vdur
591 if not db_push:
592 db_push = None
593 db_vnfr["vdur-template"] = template_vdur
594 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
595 # modify passed dictionary db_vnfr
596 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
597 db_vnfr["vdur"] = db_vnfr_["vdur"]
598
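# Usage sketch for scale_vnfr (vdu ids are invented examples):
#   # scale out: add two more instances of vdu "dataVM" to this VNF record
#   self.scale_vnfr(db_vnfr, vdu_create={"dataVM": 2})
#   # scale in: mark the last "dataVM" instance as DELETING so RO removes it later
#   self.scale_vnfr(db_vnfr, vdu_delete={"dataVM": 1}, mark_delete=True)
# db_vnfr["vdur"] is re-read from the database before returning, so the caller
# sees the updated vdur list.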
599 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
600 """
601 Updates database nsr with the RO info for the created vld
602 :param ns_update_nsr: dictionary to be filled with the updated info
603 :param db_nsr: content of db_nsr. This is also modified
604 :param nsr_desc_RO: nsr descriptor from RO
605 :return: Nothing, LcmException is raised on errors
606 """
607
608 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
609 for net_RO in get_iterable(nsr_desc_RO, "nets"):
610 if vld["id"] != net_RO.get("ns_net_osm_id"):
611 continue
612 vld["vim-id"] = net_RO.get("vim_net_id")
613 vld["name"] = net_RO.get("vim_name")
614 vld["status"] = net_RO.get("status")
615 vld["status-detailed"] = net_RO.get("error_msg")
616 ns_update_nsr["vld.{}".format(vld_index)] = vld
617 break
618 else:
619 raise LcmException(
620 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
621 )
622
623 def set_vnfr_at_error(self, db_vnfrs, error_text):
624 try:
625 for db_vnfr in db_vnfrs.values():
626 vnfr_update = {"status": "ERROR"}
627 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
628 if "status" not in vdur:
629 vdur["status"] = "ERROR"
630 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
631 if error_text:
632 vdur["status-detailed"] = str(error_text)
633 vnfr_update[
634 "vdur.{}.status-detailed".format(vdu_index)
635 ] = "ERROR"
636 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
637 except DbException as e:
638 self.logger.error("Cannot update vnf. {}".format(e))
639
640 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
641 """
642 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
643 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
644 :param nsr_desc_RO: nsr descriptor from RO
645 :return: Nothing, LcmException is raised on errors
646 """
647 for vnf_index, db_vnfr in db_vnfrs.items():
648 for vnf_RO in nsr_desc_RO["vnfs"]:
649 if vnf_RO["member_vnf_index"] != vnf_index:
650 continue
651 vnfr_update = {}
652 if vnf_RO.get("ip_address"):
653 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
654 "ip_address"
655 ].split(";")[0]
656 elif not db_vnfr.get("ip-address"):
657 if db_vnfr.get("vdur"): # if there are no VDUs, there is no ip_address
658 raise LcmExceptionNoMgmtIP(
659 "ns member_vnf_index '{}' has no IP address".format(
660 vnf_index
661 )
662 )
663
664 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
665 vdur_RO_count_index = 0
666 if vdur.get("pdu-type"):
667 continue
668 for vdur_RO in get_iterable(vnf_RO, "vms"):
669 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
670 continue
671 if vdur["count-index"] != vdur_RO_count_index:
672 vdur_RO_count_index += 1
673 continue
674 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
675 if vdur_RO.get("ip_address"):
676 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
677 else:
678 vdur["ip-address"] = None
679 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
680 vdur["name"] = vdur_RO.get("vim_name")
681 vdur["status"] = vdur_RO.get("status")
682 vdur["status-detailed"] = vdur_RO.get("error_msg")
683 for ifacer in get_iterable(vdur, "interfaces"):
684 for interface_RO in get_iterable(vdur_RO, "interfaces"):
685 if ifacer["name"] == interface_RO.get("internal_name"):
686 ifacer["ip-address"] = interface_RO.get(
687 "ip_address"
688 )
689 ifacer["mac-address"] = interface_RO.get(
690 "mac_address"
691 )
692 break
693 else:
694 raise LcmException(
695 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
696 "from VIM info".format(
697 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
698 )
699 )
700 vnfr_update["vdur.{}".format(vdu_index)] = vdur
701 break
702 else:
703 raise LcmException(
704 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
705 "VIM info".format(
706 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
707 )
708 )
709
710 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
711 for net_RO in get_iterable(nsr_desc_RO, "nets"):
712 if vld["id"] != net_RO.get("vnf_net_osm_id"):
713 continue
714 vld["vim-id"] = net_RO.get("vim_net_id")
715 vld["name"] = net_RO.get("vim_name")
716 vld["status"] = net_RO.get("status")
717 vld["status-detailed"] = net_RO.get("error_msg")
718 vnfr_update["vld.{}".format(vld_index)] = vld
719 break
720 else:
721 raise LcmException(
722 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
723 vnf_index, vld["id"]
724 )
725 )
726
727 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
728 break
729
730 else:
731 raise LcmException(
732 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
733 vnf_index
734 )
735 )
736
737 def _get_ns_config_info(self, nsr_id):
738 """
739 Generates a mapping between vnf,vdu elements and the N2VC id
740 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
741 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
742 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
743 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
744 """
745 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
746 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
747 mapping = {}
748 ns_config_info = {"osm-config-mapping": mapping}
749 for vca in vca_deployed_list:
750 if not vca["member-vnf-index"]:
751 continue
752 if not vca["vdu_id"]:
753 mapping[vca["member-vnf-index"]] = vca["application"]
754 else:
755 mapping[
756 "{}.{}.{}".format(
757 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
758 )
759 ] = vca["application"]
760 return ns_config_info
761
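# Example of the structure returned by _get_ns_config_info (application names
# invented; the key format follows the docstring above):
#   {
#       "osm-config-mapping": {
#           "1": "app-vnf-1",              # vnf-level charm for member-vnf-index 1
#           "2.mgmtVM.0": "app-vnf-2-vdu"  # vdu-level charm, replica 0
#       }
#   }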
762 async def _instantiate_ng_ro(
763 self,
764 logging_text,
765 nsr_id,
766 nsd,
767 db_nsr,
768 db_nslcmop,
769 db_vnfrs,
770 db_vnfds,
771 n2vc_key_list,
772 stage,
773 start_deploy,
774 timeout_ns_deploy,
775 ):
776
777 db_vims = {}
778
779 def get_vim_account(vim_account_id):
780 nonlocal db_vims
781 if vim_account_id in db_vims:
782 return db_vims[vim_account_id]
783 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
784 db_vims[vim_account_id] = db_vim
785 return db_vim
786
787 # modify target_vld info with instantiation parameters
788 def parse_vld_instantiation_params(
789 target_vim, target_vld, vld_params, target_sdn
790 ):
791 if vld_params.get("ip-profile"):
792 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
793 "ip-profile"
794 ]
795 if vld_params.get("provider-network"):
796 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
797 "provider-network"
798 ]
799 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
800 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
801 "provider-network"
802 ]["sdn-ports"]
803 if vld_params.get("wimAccountId"):
804 target_wim = "wim:{}".format(vld_params["wimAccountId"])
805 target_vld["vim_info"][target_wim] = {}
806 for param in ("vim-network-name", "vim-network-id"):
807 if vld_params.get(param):
808 if isinstance(vld_params[param], dict):
809 for vim, vim_net in vld_params[param].items():
810 other_target_vim = "vim:" + vim
811 populate_dict(
812 target_vld["vim_info"],
813 (other_target_vim, param.replace("-", "_")),
814 vim_net,
815 )
816 else: # isinstance str
817 target_vld["vim_info"][target_vim][
818 param.replace("-", "_")
819 ] = vld_params[param]
820 if vld_params.get("common_id"):
821 target_vld["common_id"] = vld_params.get("common_id")
822
823 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
824 def update_ns_vld_target(target, ns_params):
825 for vnf_params in ns_params.get("vnf", ()):
826 if vnf_params.get("vimAccountId"):
827 target_vnf = next(
828 (
829 vnfr
830 for vnfr in db_vnfrs.values()
831 if vnf_params["member-vnf-index"]
832 == vnfr["member-vnf-index-ref"]
833 ),
834 None,
835 )
836 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
837 for a_index, a_vld in enumerate(target["ns"]["vld"]):
838 target_vld = find_in_list(
839 get_iterable(vdur, "interfaces"),
840 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
841 )
842 if target_vld:
843 if vnf_params.get("vimAccountId") not in a_vld.get(
844 "vim_info", {}
845 ):
846 target["ns"]["vld"][a_index].get("vim_info").update(
847 {
848 "vim:{}".format(vnf_params["vimAccountId"]): {
849 "vim_network_name": ""
850 }
851 }
852 )
853
854 nslcmop_id = db_nslcmop["_id"]
855 target = {
856 "name": db_nsr["name"],
857 "ns": {"vld": []},
858 "vnf": [],
859 "image": deepcopy(db_nsr["image"]),
860 "flavor": deepcopy(db_nsr["flavor"]),
861 "action_id": nslcmop_id,
862 "cloud_init_content": {},
863 }
864 for image in target["image"]:
865 image["vim_info"] = {}
866 for flavor in target["flavor"]:
867 flavor["vim_info"] = {}
868
869 if db_nslcmop.get("lcmOperationType") != "instantiate":
870 # get parameters of instantiation:
871 db_nslcmop_instantiate = self.db.get_list(
872 "nslcmops",
873 {
874 "nsInstanceId": db_nslcmop["nsInstanceId"],
875 "lcmOperationType": "instantiate",
876 },
877 )[-1]
878 ns_params = db_nslcmop_instantiate.get("operationParams")
879 else:
880 ns_params = db_nslcmop.get("operationParams")
881 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
882 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
883
884 cp2target = {}
885 for vld_index, vld in enumerate(db_nsr.get("vld")):
886 target_vim = "vim:{}".format(ns_params["vimAccountId"])
887 target_vld = {
888 "id": vld["id"],
889 "name": vld["name"],
890 "mgmt-network": vld.get("mgmt-network", False),
891 "type": vld.get("type"),
892 "vim_info": {
893 target_vim: {
894 "vim_network_name": vld.get("vim-network-name"),
895 "vim_account_id": ns_params["vimAccountId"],
896 }
897 },
898 }
899 # check if this network needs SDN assist
900 if vld.get("pci-interfaces"):
901 db_vim = get_vim_account(ns_params["vimAccountId"])
902 sdnc_id = db_vim["config"].get("sdn-controller")
903 if sdnc_id:
904 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
905 target_sdn = "sdn:{}".format(sdnc_id)
906 target_vld["vim_info"][target_sdn] = {
907 "sdn": True,
908 "target_vim": target_vim,
909 "vlds": [sdn_vld],
910 "type": vld.get("type"),
911 }
912
913 nsd_vnf_profiles = get_vnf_profiles(nsd)
914 for nsd_vnf_profile in nsd_vnf_profiles:
915 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
916 if cp["virtual-link-profile-id"] == vld["id"]:
917 cp2target[
918 "member_vnf:{}.{}".format(
919 cp["constituent-cpd-id"][0][
920 "constituent-base-element-id"
921 ],
922 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
923 )
924 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
925
926 # check at nsd descriptor, if there is an ip-profile
927 vld_params = {}
928 nsd_vlp = find_in_list(
929 get_virtual_link_profiles(nsd),
930 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
931 == vld["id"],
932 )
933 if (
934 nsd_vlp
935 and nsd_vlp.get("virtual-link-protocol-data")
936 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
937 ):
938 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
939 "l3-protocol-data"
940 ]
941 ip_profile_dest_data = {}
942 if "ip-version" in ip_profile_source_data:
943 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
944 "ip-version"
945 ]
946 if "cidr" in ip_profile_source_data:
947 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
948 "cidr"
949 ]
950 if "gateway-ip" in ip_profile_source_data:
951 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
952 "gateway-ip"
953 ]
954 if "dhcp-enabled" in ip_profile_source_data:
955 ip_profile_dest_data["dhcp-params"] = {
956 "enabled": ip_profile_source_data["dhcp-enabled"]
957 }
958 vld_params["ip-profile"] = ip_profile_dest_data
959
960 # update vld_params with instantiation params
961 vld_instantiation_params = find_in_list(
962 get_iterable(ns_params, "vld"),
963 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
964 )
965 if vld_instantiation_params:
966 vld_params.update(vld_instantiation_params)
967 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
968 target["ns"]["vld"].append(target_vld)
969 # Update the target ns_vld if the vnf vim_account is overridden by instantiation params
970 update_ns_vld_target(target, ns_params)
971
972 for vnfr in db_vnfrs.values():
973 vnfd = find_in_list(
974 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
975 )
976 vnf_params = find_in_list(
977 get_iterable(ns_params, "vnf"),
978 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
979 )
980 target_vnf = deepcopy(vnfr)
981 target_vim = "vim:{}".format(vnfr["vim-account-id"])
982 for vld in target_vnf.get("vld", ()):
983 # check if connected to a ns.vld, to fill target
984 vnf_cp = find_in_list(
985 vnfd.get("int-virtual-link-desc", ()),
986 lambda cpd: cpd.get("id") == vld["id"],
987 )
988 if vnf_cp:
989 ns_cp = "member_vnf:{}.{}".format(
990 vnfr["member-vnf-index-ref"], vnf_cp["id"]
991 )
992 if cp2target.get(ns_cp):
993 vld["target"] = cp2target[ns_cp]
994
995 vld["vim_info"] = {
996 target_vim: {"vim_network_name": vld.get("vim-network-name")}
997 }
998 # check if this network needs SDN assist
999 target_sdn = None
1000 if vld.get("pci-interfaces"):
1001 db_vim = get_vim_account(vnfr["vim-account-id"])
1002 sdnc_id = db_vim["config"].get("sdn-controller")
1003 if sdnc_id:
1004 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1005 target_sdn = "sdn:{}".format(sdnc_id)
1006 vld["vim_info"][target_sdn] = {
1007 "sdn": True,
1008 "target_vim": target_vim,
1009 "vlds": [sdn_vld],
1010 "type": vld.get("type"),
1011 }
1012
1013 # check at vnfd descriptor, if there is an ip-profile
1014 vld_params = {}
1015 vnfd_vlp = find_in_list(
1016 get_virtual_link_profiles(vnfd),
1017 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1018 )
1019 if (
1020 vnfd_vlp
1021 and vnfd_vlp.get("virtual-link-protocol-data")
1022 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1023 ):
1024 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1025 "l3-protocol-data"
1026 ]
1027 ip_profile_dest_data = {}
1028 if "ip-version" in ip_profile_source_data:
1029 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1030 "ip-version"
1031 ]
1032 if "cidr" in ip_profile_source_data:
1033 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1034 "cidr"
1035 ]
1036 if "gateway-ip" in ip_profile_source_data:
1037 ip_profile_dest_data[
1038 "gateway-address"
1039 ] = ip_profile_source_data["gateway-ip"]
1040 if "dhcp-enabled" in ip_profile_source_data:
1041 ip_profile_dest_data["dhcp-params"] = {
1042 "enabled": ip_profile_source_data["dhcp-enabled"]
1043 }
1044
1045 vld_params["ip-profile"] = ip_profile_dest_data
1046 # update vld_params with instantiation params
1047 if vnf_params:
1048 vld_instantiation_params = find_in_list(
1049 get_iterable(vnf_params, "internal-vld"),
1050 lambda i_vld: i_vld["name"] == vld["id"],
1051 )
1052 if vld_instantiation_params:
1053 vld_params.update(vld_instantiation_params)
1054 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1055
1056 vdur_list = []
1057 for vdur in target_vnf.get("vdur", ()):
1058 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1059 continue # This vdu must not be created
1060 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1061
1062 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1063
1064 if ssh_keys_all:
1065 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1066 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1067 if (
1068 vdu_configuration
1069 and vdu_configuration.get("config-access")
1070 and vdu_configuration.get("config-access").get("ssh-access")
1071 ):
1072 vdur["ssh-keys"] = ssh_keys_all
1073 vdur["ssh-access-required"] = vdu_configuration[
1074 "config-access"
1075 ]["ssh-access"]["required"]
1076 elif (
1077 vnf_configuration
1078 and vnf_configuration.get("config-access")
1079 and vnf_configuration.get("config-access").get("ssh-access")
1080 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1081 ):
1082 vdur["ssh-keys"] = ssh_keys_all
1083 vdur["ssh-access-required"] = vnf_configuration[
1084 "config-access"
1085 ]["ssh-access"]["required"]
1086 elif ssh_keys_instantiation and find_in_list(
1087 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1088 ):
1089 vdur["ssh-keys"] = ssh_keys_instantiation
1090
1091 self.logger.debug("NS > vdur > {}".format(vdur))
1092
1093 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1094 # cloud-init
1095 if vdud.get("cloud-init-file"):
1096 vdur["cloud-init"] = "{}:file:{}".format(
1097 vnfd["_id"], vdud.get("cloud-init-file")
1098 )
1099 # read file and put content at target.cloud_init_content, so that ng_ro does not need the shared package system
1100 if vdur["cloud-init"] not in target["cloud_init_content"]:
1101 base_folder = vnfd["_admin"]["storage"]
1102 cloud_init_file = "{}/{}/cloud_init/{}".format(
1103 base_folder["folder"],
1104 base_folder["pkg-dir"],
1105 vdud.get("cloud-init-file"),
1106 )
1107 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1108 target["cloud_init_content"][
1109 vdur["cloud-init"]
1110 ] = ci_file.read()
1111 elif vdud.get("cloud-init"):
1112 vdur["cloud-init"] = "{}:vdu:{}".format(
1113 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1114 )
1115 # put content at target.cloud_init_content, so that ng_ro does not need to read the vnfd descriptor
1116 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1117 "cloud-init"
1118 ]
1119 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1120 deploy_params_vdu = self._format_additional_params(
1121 vdur.get("additionalParams") or {}
1122 )
1123 deploy_params_vdu["OSM"] = get_osm_params(
1124 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1125 )
1126 vdur["additionalParams"] = deploy_params_vdu
1127
1128 # flavor
1129 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1130 if target_vim not in ns_flavor["vim_info"]:
1131 ns_flavor["vim_info"][target_vim] = {}
1132
1133 # deal with images
1134 # in case alternative images are provided, check if they should be applied
1135 # for this vim_type and, if so, switch to the alternative image id
1136 ns_image_id = int(vdur["ns-image-id"])
1137 if vdur.get("alt-image-ids"):
1138 db_vim = get_vim_account(vnfr["vim-account-id"])
1139 vim_type = db_vim["vim_type"]
1140 for alt_image_id in vdur.get("alt-image-ids"):
1141 ns_alt_image = target["image"][int(alt_image_id)]
1142 if vim_type == ns_alt_image.get("vim-type"):
1143 # must use alternative image
1144 self.logger.debug(
1145 "use alternative image id: {}".format(alt_image_id)
1146 )
1147 ns_image_id = alt_image_id
1148 vdur["ns-image-id"] = ns_image_id
1149 break
1150 ns_image = target["image"][int(ns_image_id)]
1151 if target_vim not in ns_image["vim_info"]:
1152 ns_image["vim_info"][target_vim] = {}
1153
1154 vdur["vim_info"] = {target_vim: {}}
1155 # instantiation parameters
1156 # if vnf_params:
1157 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1158 # vdud["id"]), None)
1159 vdur_list.append(vdur)
1160 target_vnf["vdur"] = vdur_list
1161 target["vnf"].append(target_vnf)
1162
1163 desc = await self.RO.deploy(nsr_id, target)
1164 self.logger.debug("RO return > {}".format(desc))
1165 action_id = desc["action_id"]
1166 await self._wait_ng_ro(
1167 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1168 )
1169
1170 # Updating NSR
1171 db_nsr_update = {
1172 "_admin.deployed.RO.operational-status": "running",
1173 "detailed-status": " ".join(stage),
1174 }
1175 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1176 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1177 self._write_op_status(nslcmop_id, stage)
1178 self.logger.debug(
1179 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1180 )
1181 return
1182
1183 async def _wait_ng_ro(
1184 self,
1185 nsr_id,
1186 action_id,
1187 nslcmop_id=None,
1188 start_time=None,
1189 timeout=600,
1190 stage=None,
1191 ):
1192 detailed_status_old = None
1193 db_nsr_update = {}
1194 start_time = start_time or time()
1195 while time() <= start_time + timeout:
1196 desc_status = await self.RO.status(nsr_id, action_id)
1197 self.logger.debug("Wait NG RO > {}".format(desc_status))
1198 if desc_status["status"] == "FAILED":
1199 raise NgRoException(desc_status["details"])
1200 elif desc_status["status"] == "BUILD":
1201 if stage:
1202 stage[2] = "VIM: ({})".format(desc_status["details"])
1203 elif desc_status["status"] == "DONE":
1204 if stage:
1205 stage[2] = "Deployed at VIM"
1206 break
1207 else:
1208 assert False, "ROclient.check_ns_status returns unknown {}".format(
1209 desc_status["status"]
1210 )
1211 if stage and nslcmop_id and stage[2] != detailed_status_old:
1212 detailed_status_old = stage[2]
1213 db_nsr_update["detailed-status"] = " ".join(stage)
1214 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1215 self._write_op_status(nslcmop_id, stage)
1216 await asyncio.sleep(15, loop=self.loop)
1217 else: # timeout_ns_deploy
1218 raise NgRoException("Timeout waiting for ns to deploy")
1219
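# Typical call pattern for _wait_ng_ro (sketch; action_id comes from a previous
# self.RO.deploy() call, as in _instantiate_ng_ro above):
#   desc = await self.RO.deploy(nsr_id, target)
#   await self._wait_ng_ro(
#       nsr_id, desc["action_id"], nslcmop_id, start_deploy, timeout_ns_deploy, stage
#   )
# It polls self.RO.status() every 15 seconds, writing "detailed-status" to the
# nsr record while the RO status is BUILD, and raises NgRoException on FAILED
# or on timeout.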
1220 async def _terminate_ng_ro(
1221 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1222 ):
1223 db_nsr_update = {}
1224 failed_detail = []
1225 action_id = None
1226 start_deploy = time()
1227 try:
1228 target = {
1229 "ns": {"vld": []},
1230 "vnf": [],
1231 "image": [],
1232 "flavor": [],
1233 "action_id": nslcmop_id,
1234 }
1235 desc = await self.RO.deploy(nsr_id, target)
1236 action_id = desc["action_id"]
1237 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1238 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1239 self.logger.debug(
1240 logging_text
1241 + "ns terminate action at RO. action_id={}".format(action_id)
1242 )
1243
1244 # wait until done
1245 delete_timeout = 20 * 60 # 20 minutes
1246 await self._wait_ng_ro(
1247 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1248 )
1249
1250 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1251 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1252 # delete all nsr
1253 await self.RO.delete(nsr_id)
1254 except Exception as e:
1255 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1256 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1257 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1258 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1259 self.logger.debug(
1260 logging_text + "RO_action_id={} already deleted".format(action_id)
1261 )
1262 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1263 failed_detail.append("delete conflict: {}".format(e))
1264 self.logger.debug(
1265 logging_text
1266 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1267 )
1268 else:
1269 failed_detail.append("delete error: {}".format(e))
1270 self.logger.error(
1271 logging_text
1272 + "RO_action_id={} delete error: {}".format(action_id, e)
1273 )
1274
1275 if failed_detail:
1276 stage[2] = "Error deleting from VIM"
1277 else:
1278 stage[2] = "Deleted from VIM"
1279 db_nsr_update["detailed-status"] = " ".join(stage)
1280 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1281 self._write_op_status(nslcmop_id, stage)
1282
1283 if failed_detail:
1284 raise LcmException("; ".join(failed_detail))
1285 return
1286
1287 async def instantiate_RO(
1288 self,
1289 logging_text,
1290 nsr_id,
1291 nsd,
1292 db_nsr,
1293 db_nslcmop,
1294 db_vnfrs,
1295 db_vnfds,
1296 n2vc_key_list,
1297 stage,
1298 ):
1299 """
1300 Instantiate at RO
1301 :param logging_text: prefix text to use for logging
1302 :param nsr_id: nsr identity
1303 :param nsd: database content of ns descriptor
1304 :param db_nsr: database content of ns record
1305 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1306 :param db_vnfrs:
1307 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1308 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1309 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1310 :return: None or exception
1311 """
1312 try:
1313 start_deploy = time()
1314 ns_params = db_nslcmop.get("operationParams")
1315 if ns_params and ns_params.get("timeout_ns_deploy"):
1316 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1317 else:
1318 timeout_ns_deploy = self.timeout.get(
1319 "ns_deploy", self.timeout_ns_deploy
1320 )
1321
1322 # Check for and optionally request placement optimization. Database will be updated if placement activated
1323 stage[2] = "Waiting for Placement."
1324 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1325 # in case of placement, change ns_params["vimAccountId"] if it does not match any vnfr vim-account-id
1326 for vnfr in db_vnfrs.values():
1327 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1328 break
1329 else:
1330 ns_params["vimAccountId"] = vnfr["vim-account-id"]
1331
1332 return await self._instantiate_ng_ro(
1333 logging_text,
1334 nsr_id,
1335 nsd,
1336 db_nsr,
1337 db_nslcmop,
1338 db_vnfrs,
1339 db_vnfds,
1340 n2vc_key_list,
1341 stage,
1342 start_deploy,
1343 timeout_ns_deploy,
1344 )
1345 except Exception as e:
1346 stage[2] = "ERROR deploying at VIM"
1347 self.set_vnfr_at_error(db_vnfrs, str(e))
1348 self.logger.error(
1349 "Error deploying at VIM {}".format(e),
1350 exc_info=not isinstance(
1351 e,
1352 (
1353 ROclient.ROClientException,
1354 LcmException,
1355 DbException,
1356 NgRoException,
1357 ),
1358 ),
1359 )
1360 raise
1361
1362 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1363 """
1364 Wait for kdu to be up, get ip address
1365 :param logging_text: prefix use for logging
1366 :param nsr_id:
1367 :param vnfr_id:
1368 :param kdu_name:
1369 :return: IP address
1370 """
1371
1372 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1373 nb_tries = 0
1374
1375 while nb_tries < 360:
1376 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1377 kdur = next(
1378 (
1379 x
1380 for x in get_iterable(db_vnfr, "kdur")
1381 if x.get("kdu-name") == kdu_name
1382 ),
1383 None,
1384 )
1385 if not kdur:
1386 raise LcmException(
1387 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1388 )
1389 if kdur.get("status"):
1390 if kdur["status"] in ("READY", "ENABLED"):
1391 return kdur.get("ip-address")
1392 else:
1393 raise LcmException(
1394 "target KDU={} is in error state".format(kdu_name)
1395 )
1396
1397 await asyncio.sleep(10, loop=self.loop)
1398 nb_tries += 1
1399 raise LcmException("Timeout waiting for KDU={} to be instantiated".format(kdu_name))
1400
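# Usage sketch for wait_kdu_up (kdu name invented for illustration):
#   kdu_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, "ldap")
# The vnfr is re-read every 10 seconds, up to 360 tries (one hour), until the
# kdur status is READY or ENABLED; its "ip-address" is then returned.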
1401 async def wait_vm_up_insert_key_ro(
1402 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1403 ):
1404 """
1405 Wait for ip address at RO and, optionally, insert a public key in the virtual machine
1406 :param logging_text: prefix use for logging
1407 :param nsr_id:
1408 :param vnfr_id:
1409 :param vdu_id:
1410 :param vdu_index:
1411 :param pub_key: public ssh key to inject, None to skip
1412 :param user: user to apply the public ssh key
1413 :return: IP address
1414 """
1415
1416 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1417 ro_nsr_id = None
1418 ip_address = None
1419 nb_tries = 0
1420 target_vdu_id = None
1421 ro_retries = 0
1422
1423 while True:
1424
1425 ro_retries += 1
1426 if ro_retries >= 360: # 1 hour
1427 raise LcmException(
1428 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1429 )
1430
1431 await asyncio.sleep(10, loop=self.loop)
1432
1433 # get ip address
1434 if not target_vdu_id:
1435 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1436
1437 if not vdu_id: # for the VNF case
1438 if db_vnfr.get("status") == "ERROR":
1439 raise LcmException(
1440 "Cannot inject ssh-key because target VNF is in error state"
1441 )
1442 ip_address = db_vnfr.get("ip-address")
1443 if not ip_address:
1444 continue
1445 vdur = next(
1446 (
1447 x
1448 for x in get_iterable(db_vnfr, "vdur")
1449 if x.get("ip-address") == ip_address
1450 ),
1451 None,
1452 )
1453 else: # VDU case
1454 vdur = next(
1455 (
1456 x
1457 for x in get_iterable(db_vnfr, "vdur")
1458 if x.get("vdu-id-ref") == vdu_id
1459 and x.get("count-index") == vdu_index
1460 ),
1461 None,
1462 )
1463
1464 if (
1465 not vdur and len(db_vnfr.get("vdur", ())) == 1
1466 ): # If only one, this should be the target vdu
1467 vdur = db_vnfr["vdur"][0]
1468 if not vdur:
1469 raise LcmException(
1470 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1471 vnfr_id, vdu_id, vdu_index
1472 )
1473 )
1474 # New generation RO stores information at "vim_info"
1475 ng_ro_status = None
1476 target_vim = None
1477 if vdur.get("vim_info"):
1478 target_vim = next(
1479 t for t in vdur["vim_info"]
1480 ) # there should be only one key
1481 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1482 if (
1483 vdur.get("pdu-type")
1484 or vdur.get("status") == "ACTIVE"
1485 or ng_ro_status == "ACTIVE"
1486 ):
1487 ip_address = vdur.get("ip-address")
1488 if not ip_address:
1489 continue
1490 target_vdu_id = vdur["vdu-id-ref"]
1491 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1492 raise LcmException(
1493 "Cannot inject ssh-key because target VM is in error state"
1494 )
1495
1496 if not target_vdu_id:
1497 continue
1498
1499 # inject public key into machine
1500 if pub_key and user:
1501 self.logger.debug(logging_text + "Inserting RO key")
1502 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1503 if vdur.get("pdu-type"):
1504 self.logger.error(logging_text + "Cannot inject ssh-key to a PDU")
1505 return ip_address
1506 try:
1507 ro_vm_id = "{}-{}".format(
1508 db_vnfr["member-vnf-index-ref"], target_vdu_id
1509 ) # TODO add vdu_index
1510 if self.ng_ro:
1511 target = {
1512 "action": {
1513 "action": "inject_ssh_key",
1514 "key": pub_key,
1515 "user": user,
1516 },
1517 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1518 }
1519 desc = await self.RO.deploy(nsr_id, target)
1520 action_id = desc["action_id"]
1521 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1522 break
1523 else:
1524 # wait until NS is deployed at RO
1525 if not ro_nsr_id:
1526 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1527 ro_nsr_id = deep_get(
1528 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1529 )
1530 if not ro_nsr_id:
1531 continue
1532 result_dict = await self.RO.create_action(
1533 item="ns",
1534 item_id_name=ro_nsr_id,
1535 descriptor={
1536 "add_public_key": pub_key,
1537 "vms": [ro_vm_id],
1538 "user": user,
1539 },
1540 )
1541 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1542 if not result_dict or not isinstance(result_dict, dict):
1543 raise LcmException(
1544 "Unknown response from RO when injecting key"
1545 )
1546 for result in result_dict.values():
1547 if result.get("vim_result") == 200:
1548 break
1549 else:
1550 raise ROclient.ROClientException(
1551 "error injecting key: {}".format(
1552 result.get("description")
1553 )
1554 )
1555 break
1556 except NgRoException as e:
1557 raise LcmException(
1558 "Reaching max tries injecting key. Error: {}".format(e)
1559 )
1560 except ROclient.ROClientException as e:
1561 if not nb_tries:
1562 self.logger.debug(
1563 logging_text
1564 + "error injecting key: {}. Retrying until {} seconds".format(
1565 e, 20 * 10
1566 )
1567 )
1568 nb_tries += 1
1569 if nb_tries >= 20:
1570 raise LcmException(
1571 "Reaching max tries injecting key. Error: {}".format(e)
1572 )
1573 else:
1574 break
1575
1576 return ip_address
1577
1578 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1579 """
1580 Wait until dependent VCA deployments have finished. NS waits for VNFs and VDUs; VNFs wait for VDUs
1581 """
1582 my_vca = vca_deployed_list[vca_index]
1583 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1584 # vdu or kdu: no dependencies
1585 return
1586 timeout = 300
1587 while timeout >= 0:
1588 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1589 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1590 configuration_status_list = db_nsr["configurationStatus"]
1591 for index, vca_deployed in enumerate(configuration_status_list):
1592 if index == vca_index:
1593 # myself
1594 continue
1595 if not my_vca.get("member-vnf-index") or (
1596 vca_deployed.get("member-vnf-index")
1597 == my_vca.get("member-vnf-index")
1598 ):
1599 internal_status = configuration_status_list[index].get("status")
1600 if internal_status == "READY":
1601 continue
1602 elif internal_status == "BROKEN":
1603 raise LcmException(
1604 "Configuration aborted because dependent charm/s has failed"
1605 )
1606 else:
1607 break
1608 else:
1609 # no dependencies, return
1610 return
1611 await asyncio.sleep(10)
1612 timeout -= 1
1613
1614 raise LcmException("Configuration aborted because dependent charm/s timeout")
1615
1616 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1617 vca_id = None
1618 if db_vnfr:
1619 vca_id = deep_get(db_vnfr, ("vca-id",))
1620 elif db_nsr:
1621 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1622 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1623 return vca_id
1624
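# Resolution order in get_vca_id (sketch): a "vca-id" stored in the vnfr wins;
# otherwise the "vca" registered for the vim account used at NS instantiation:
#   vca_id = self.get_vca_id(db_vnfr=None, db_nsr=db_nsr)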
1625 async def instantiate_N2VC(
1626 self,
1627 logging_text,
1628 vca_index,
1629 nsi_id,
1630 db_nsr,
1631 db_vnfr,
1632 vdu_id,
1633 kdu_name,
1634 vdu_index,
1635 config_descriptor,
1636 deploy_params,
1637 base_folder,
1638 nslcmop_id,
1639 stage,
1640 vca_type,
1641 vca_name,
1642 ee_config_descriptor,
1643 ):
1644 nsr_id = db_nsr["_id"]
1645 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1646 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1647 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1648 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1649 db_dict = {
1650 "collection": "nsrs",
1651 "filter": {"_id": nsr_id},
1652 "path": db_update_entry,
1653 }
1654 step = ""
1655 try:
1656
1657 element_type = "NS"
1658 element_under_configuration = nsr_id
1659
1660 vnfr_id = None
1661 if db_vnfr:
1662 vnfr_id = db_vnfr["_id"]
1663 osm_config["osm"]["vnf_id"] = vnfr_id
1664
1665 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1666
1667 if vca_type == "native_charm":
1668 index_number = 0
1669 else:
1670 index_number = vdu_index or 0
1671
1672 if vnfr_id:
1673 element_type = "VNF"
1674 element_under_configuration = vnfr_id
1675 namespace += ".{}-{}".format(vnfr_id, index_number)
1676 if vdu_id:
1677 namespace += ".{}-{}".format(vdu_id, index_number)
1678 element_type = "VDU"
1679 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1680 osm_config["osm"]["vdu_id"] = vdu_id
1681 elif kdu_name:
1682 namespace += ".{}".format(kdu_name)
1683 element_type = "KDU"
1684 element_under_configuration = kdu_name
1685 osm_config["osm"]["kdu_name"] = kdu_name
1686
1687 # Get artifact path
1688 artifact_path = "{}/{}/{}/{}".format(
1689 base_folder["folder"],
1690 base_folder["pkg-dir"],
1691 "charms"
1692 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1693 else "helm-charts",
1694 vca_name,
1695 )
1696
1697 self.logger.debug("Artifact path > {}".format(artifact_path))
1698
1699 # get initial_config_primitive_list that applies to this element
1700 initial_config_primitive_list = config_descriptor.get(
1701 "initial-config-primitive"
1702 )
1703
1704 self.logger.debug(
1705 "Initial config primitive list > {}".format(
1706 initial_config_primitive_list
1707 )
1708 )
1709
1710 # add config if not present for NS charm
1711 ee_descriptor_id = ee_config_descriptor.get("id")
1712 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1713 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1714 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1715 )
1716
1717 self.logger.debug(
1718 "Initial config primitive list #2 > {}".format(
1719 initial_config_primitive_list
1720 )
1721 )
1722 # n2vc_redesign STEP 3.1
1723 # find old ee_id if exists
1724 ee_id = vca_deployed.get("ee_id")
1725
1726 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1727 # create or register execution environment in VCA
1728 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1729
1730 self._write_configuration_status(
1731 nsr_id=nsr_id,
1732 vca_index=vca_index,
1733 status="CREATING",
1734 element_under_configuration=element_under_configuration,
1735 element_type=element_type,
1736 )
1737
1738 step = "create execution environment"
1739 self.logger.debug(logging_text + step)
1740
1741 ee_id = None
1742 credentials = None
1743 if vca_type == "k8s_proxy_charm":
1744 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1745 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1746 namespace=namespace,
1747 artifact_path=artifact_path,
1748 db_dict=db_dict,
1749 vca_id=vca_id,
1750 )
1751 elif vca_type == "helm" or vca_type == "helm-v3":
1752 ee_id, credentials = await self.vca_map[
1753 vca_type
1754 ].create_execution_environment(
1755 namespace=namespace,
1756 reuse_ee_id=ee_id,
1757 db_dict=db_dict,
1758 config=osm_config,
1759 artifact_path=artifact_path,
1760 vca_type=vca_type,
1761 )
1762 else:
1763 ee_id, credentials = await self.vca_map[
1764 vca_type
1765 ].create_execution_environment(
1766 namespace=namespace,
1767 reuse_ee_id=ee_id,
1768 db_dict=db_dict,
1769 vca_id=vca_id,
1770 )
1771
1772 elif vca_type == "native_charm":
1773 step = "Waiting to VM being up and getting IP address"
1774 self.logger.debug(logging_text + step)
1775 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1776 logging_text,
1777 nsr_id,
1778 vnfr_id,
1779 vdu_id,
1780 vdu_index,
1781 user=None,
1782 pub_key=None,
1783 )
1784 credentials = {"hostname": rw_mgmt_ip}
1785 # get username
1786 username = deep_get(
1787 config_descriptor, ("config-access", "ssh-access", "default-user")
1788 )
1789 # TODO remove this when the IM changes regarding config-access:ssh-access:default-user are
1790 # merged. Meanwhile, get the username from the initial-config-primitive
1791 if not username and initial_config_primitive_list:
1792 for config_primitive in initial_config_primitive_list:
1793 for param in config_primitive.get("parameter", ()):
1794 if param["name"] == "ssh-username":
1795 username = param["value"]
1796 break
1797 if not username:
1798 raise LcmException(
1799 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1800 "'config-access.ssh-access.default-user'"
1801 )
1802 credentials["username"] = username
1803 # n2vc_redesign STEP 3.2
1804
1805 self._write_configuration_status(
1806 nsr_id=nsr_id,
1807 vca_index=vca_index,
1808 status="REGISTERING",
1809 element_under_configuration=element_under_configuration,
1810 element_type=element_type,
1811 )
1812
1813 step = "register execution environment {}".format(credentials)
1814 self.logger.debug(logging_text + step)
1815 ee_id = await self.vca_map[vca_type].register_execution_environment(
1816 credentials=credentials,
1817 namespace=namespace,
1818 db_dict=db_dict,
1819 vca_id=vca_id,
1820 )
1821
1822 # for compatibility with MON/POL modules, they need the model and application name at database
1823 # TODO ask MON/POL whether it is still needed to assume the format "model_name.application_name"
1824 ee_id_parts = ee_id.split(".")
1825 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1826 if len(ee_id_parts) >= 2:
1827 model_name = ee_id_parts[0]
1828 application_name = ee_id_parts[1]
1829 db_nsr_update[db_update_entry + "model"] = model_name
1830 db_nsr_update[db_update_entry + "application"] = application_name
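# The ee_id is assumed to follow the "model_name.application_name[.machine_id]" convention of the
# Juju-based connectors; only the first two components are persisted, e.g. (illustrative)
# "myns-abc123.app-vnf-1-mgmt" -> model "myns-abc123", application "app-vnf-1-mgmt".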
1831
1832 # n2vc_redesign STEP 3.3
1833 step = "Install configuration Software"
1834
1835 self._write_configuration_status(
1836 nsr_id=nsr_id,
1837 vca_index=vca_index,
1838 status="INSTALLING SW",
1839 element_under_configuration=element_under_configuration,
1840 element_type=element_type,
1841 other_update=db_nsr_update,
1842 )
1843
1844 # TODO check if already done
1845 self.logger.debug(logging_text + step)
1846 config = None
1847 if vca_type == "native_charm":
1848 config_primitive = next(
1849 (p for p in initial_config_primitive_list if p["name"] == "config"),
1850 None,
1851 )
1852 if config_primitive:
1853 config = self._map_primitive_params(
1854 config_primitive, {}, deploy_params
1855 )
1856 num_units = 1
1857 if vca_type == "lxc_proxy_charm":
1858 if element_type == "NS":
1859 num_units = db_nsr.get("config-units") or 1
1860 elif element_type == "VNF":
1861 num_units = db_vnfr.get("config-units") or 1
1862 elif element_type == "VDU":
1863 for v in db_vnfr["vdur"]:
1864 if vdu_id == v["vdu-id-ref"]:
1865 num_units = v.get("config-units") or 1
1866 break
1867 if vca_type != "k8s_proxy_charm":
1868 await self.vca_map[vca_type].install_configuration_sw(
1869 ee_id=ee_id,
1870 artifact_path=artifact_path,
1871 db_dict=db_dict,
1872 config=config,
1873 num_units=num_units,
1874 vca_id=vca_id,
1875 vca_type=vca_type,
1876 )
1877
1878 # write in db flag of configuration_sw already installed
1879 self.update_db_2(
1880 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1881 )
1882
1883 # add relations for this VCA (wait for other peers related with this VCA)
1884 await self._add_vca_relations(
1885 logging_text=logging_text,
1886 nsr_id=nsr_id,
1887 vca_index=vca_index,
1888 vca_id=vca_id,
1889 vca_type=vca_type,
1890 )
1891
1892 # if SSH access is required, then get the execution environment SSH public key
1893 # if native charm, we have already waited for the VM to be UP
1894 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1895 pub_key = None
1896 user = None
1897 # self.logger.debug("get ssh key block")
1898 if deep_get(
1899 config_descriptor, ("config-access", "ssh-access", "required")
1900 ):
1901 # self.logger.debug("ssh key needed")
1902 # Need to inject an ssh key
1903 user = deep_get(
1904 config_descriptor,
1905 ("config-access", "ssh-access", "default-user"),
1906 )
1907 step = "Install configuration Software, getting public ssh key"
1908 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1909 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1910 )
1911
1912 step = "Insert public key into VM user={} ssh_key={}".format(
1913 user, pub_key
1914 )
1915 else:
1916 # self.logger.debug("no need to get ssh key")
1917 step = "Waiting to VM being up and getting IP address"
1918 self.logger.debug(logging_text + step)
1919
1920 # n2vc_redesign STEP 5.1
1921 # wait for RO (ip-address) and insert pub_key into VM
1922 if vnfr_id:
1923 if kdu_name:
1924 rw_mgmt_ip = await self.wait_kdu_up(
1925 logging_text, nsr_id, vnfr_id, kdu_name
1926 )
1927 else:
1928 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1929 logging_text,
1930 nsr_id,
1931 vnfr_id,
1932 vdu_id,
1933 vdu_index,
1934 user=user,
1935 pub_key=pub_key,
1936 )
1937 else:
1938 rw_mgmt_ip = None # This is for a NS configuration
1939
1940 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1941
1942 # store rw_mgmt_ip in deploy params for later replacement
1943 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
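# Hedged note: deploy_params is later passed to _map_primitive_params(), so primitives whose
# parameters reference the management IP (for instance a "<rw_mgmt_ip>"-style placeholder defined
# in the descriptor) can pick this value up; the exact placeholder syntax comes from the descriptor.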
1944
1945 # n2vc_redesign STEP 6 Execute initial config primitive
1946 step = "execute initial config primitive"
1947
1948 # wait for dependent primitives execution (NS -> VNF -> VDU)
1949 if initial_config_primitive_list:
1950 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1951
1952 # stage, depending on the element type: vdu, kdu, vnf or ns
1953 my_vca = vca_deployed_list[vca_index]
1954 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1955 # VDU or KDU
1956 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
1957 elif my_vca.get("member-vnf-index"):
1958 # VNF
1959 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
1960 else:
1961 # NS
1962 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
1963
1964 self._write_configuration_status(
1965 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
1966 )
1967
1968 self._write_op_status(op_id=nslcmop_id, stage=stage)
1969
1970 check_if_terminated_needed = True
1971 for initial_config_primitive in initial_config_primitive_list:
1972 # adding information on the vca_deployed if it is a NS execution environment
1973 if not vca_deployed["member-vnf-index"]:
1974 deploy_params["ns_config_info"] = json.dumps(
1975 self._get_ns_config_info(nsr_id)
1976 )
1977 # TODO check if already done
1978 primitive_params_ = self._map_primitive_params(
1979 initial_config_primitive, {}, deploy_params
1980 )
1981
1982 step = "execute primitive '{}' params '{}'".format(
1983 initial_config_primitive["name"], primitive_params_
1984 )
1985 self.logger.debug(logging_text + step)
1986 await self.vca_map[vca_type].exec_primitive(
1987 ee_id=ee_id,
1988 primitive_name=initial_config_primitive["name"],
1989 params_dict=primitive_params_,
1990 db_dict=db_dict,
1991 vca_id=vca_id,
1992 vca_type=vca_type,
1993 )
1994 # Once some primitive has been executed, check and write at db whether terminate primitives need to be executed
1995 if check_if_terminated_needed:
1996 if config_descriptor.get("terminate-config-primitive"):
1997 self.update_db_2(
1998 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
1999 )
2000 check_if_terminated_needed = False
2001
2002 # TODO register in database that primitive is done
2003
2004 # STEP 7 Configure metrics
2005 if vca_type == "helm" or vca_type == "helm-v3":
2006 prometheus_jobs = await self.add_prometheus_metrics(
2007 ee_id=ee_id,
2008 artifact_path=artifact_path,
2009 ee_config_descriptor=ee_config_descriptor,
2010 vnfr_id=vnfr_id,
2011 nsr_id=nsr_id,
2012 target_ip=rw_mgmt_ip,
2013 )
2014 if prometheus_jobs:
2015 self.update_db_2(
2016 "nsrs",
2017 nsr_id,
2018 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2019 )
2020
2021 step = "instantiated at VCA"
2022 self.logger.debug(logging_text + step)
2023
2024 self._write_configuration_status(
2025 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2026 )
2027
2028 except Exception as e: # TODO not use Exception but N2VC exception
2029 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2030 if not isinstance(
2031 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2032 ):
2033 self.logger.error(
2034 "Exception while {} : {}".format(step, e), exc_info=True
2035 )
2036 self._write_configuration_status(
2037 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2038 )
2039 raise LcmException("{} {}".format(step, e)) from e
2040
2041 def _write_ns_status(
2042 self,
2043 nsr_id: str,
2044 ns_state: str,
2045 current_operation: str,
2046 current_operation_id: str,
2047 error_description: str = None,
2048 error_detail: str = None,
2049 other_update: dict = None,
2050 ):
2051 """
2052 Update db_nsr fields.
2053 :param nsr_id:
2054 :param ns_state:
2055 :param current_operation:
2056 :param current_operation_id:
2057 :param error_description:
2058 :param error_detail:
2059 :param other_update: Other required changes at database; if provided, they are applied together with the status update
2060 :return:
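Example (illustrative, mirroring the call made from instantiate() below):
    self._write_ns_status(nsr_id=nsr_id, ns_state="BUILDING",
                          current_operation="INSTANTIATING",
                          current_operation_id=nslcmop_id,
                          other_update=db_nsr_update)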
2061 """
2062 try:
2063 db_dict = other_update or {}
2064 db_dict[
2065 "_admin.nslcmop"
2066 ] = current_operation_id # for backward compatibility
2067 db_dict["_admin.current-operation"] = current_operation_id
2068 db_dict["_admin.operation-type"] = (
2069 current_operation if current_operation != "IDLE" else None
2070 )
2071 db_dict["currentOperation"] = current_operation
2072 db_dict["currentOperationID"] = current_operation_id
2073 db_dict["errorDescription"] = error_description
2074 db_dict["errorDetail"] = error_detail
2075
2076 if ns_state:
2077 db_dict["nsState"] = ns_state
2078 self.update_db_2("nsrs", nsr_id, db_dict)
2079 except DbException as e:
2080 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2081
2082 def _write_op_status(
2083 self,
2084 op_id: str,
2085 stage: list = None,
2086 error_message: str = None,
2087 queuePosition: int = 0,
2088 operation_state: str = None,
2089 other_update: dict = None,
2090 ):
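# Illustrative behaviour, assuming stage == ["Stage 1/5: ...", "Reading from database.", ""]:
#   db_dict["stage"] becomes stage[0] and db_dict["detailed-status"] becomes " ".join(stage);
#   a plain string stage is stored as-is in db_dict["stage"].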
2091 try:
2092 db_dict = other_update or {}
2093 db_dict["queuePosition"] = queuePosition
2094 if isinstance(stage, list):
2095 db_dict["stage"] = stage[0]
2096 db_dict["detailed-status"] = " ".join(stage)
2097 elif stage is not None:
2098 db_dict["stage"] = str(stage)
2099
2100 if error_message is not None:
2101 db_dict["errorMessage"] = error_message
2102 if operation_state is not None:
2103 db_dict["operationState"] = operation_state
2104 db_dict["statusEnteredTime"] = time()
2105 self.update_db_2("nslcmops", op_id, db_dict)
2106 except DbException as e:
2107 self.logger.warn(
2108 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2109 )
2110
2111 def _write_all_config_status(self, db_nsr: dict, status: str):
2112 try:
2113 nsr_id = db_nsr["_id"]
2114 # configurationStatus
2115 config_status = db_nsr.get("configurationStatus")
2116 if config_status:
2117 db_nsr_update = {
2118 "configurationStatus.{}.status".format(index): status
2119 for index, v in enumerate(config_status)
2120 if v
2121 }
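# e.g. (illustrative) with two deployed VCAs this yields
# {"configurationStatus.0.status": status, "configurationStatus.1.status": status}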
2122 # update status
2123 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2124
2125 except DbException as e:
2126 self.logger.warn(
2127 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2128 )
2129
2130 def _write_configuration_status(
2131 self,
2132 nsr_id: str,
2133 vca_index: int,
2134 status: str = None,
2135 element_under_configuration: str = None,
2136 element_type: str = None,
2137 other_update: dict = None,
2138 ):
2139
2140 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2141 # .format(vca_index, status))
2142
2143 try:
2144 db_path = "configurationStatus.{}.".format(vca_index)
2145 db_dict = other_update or {}
2146 if status:
2147 db_dict[db_path + "status"] = status
2148 if element_under_configuration:
2149 db_dict[
2150 db_path + "elementUnderConfiguration"
2151 ] = element_under_configuration
2152 if element_type:
2153 db_dict[db_path + "elementType"] = element_type
2154 self.update_db_2("nsrs", nsr_id, db_dict)
2155 except DbException as e:
2156 self.logger.warn(
2157 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2158 status, nsr_id, vca_index, e
2159 )
2160 )
2161
2162 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2163 """
2164 Checks and computes the placement (VIM account where to deploy). If it is decided by an external tool, it
2165 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
2166 The database is used because the result can be obtained from a different LCM worker in case of HA.
2167 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2168 :param db_nslcmop: database content of nslcmop
2169 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2170 :return: True if some modification is done. Modifies database vnfrs and the db_vnfrs parameter with the
2171 computed 'vim-account-id'
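Expected shape of the placement result read from _admin.pla (illustrative):
    {"vnf": [{"member-vnf-index": "1", "vimAccountId": "<vim account uuid>"}, ...]}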
2172 """
2173 modified = False
2174 nslcmop_id = db_nslcmop["_id"]
2175 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2176 if placement_engine == "PLA":
2177 self.logger.debug(
2178 logging_text + "Invoke and wait for placement optimization"
2179 )
2180 await self.msg.aiowrite(
2181 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2182 )
2183 db_poll_interval = 5
2184 wait = db_poll_interval * 10
2185 pla_result = None
2186 while not pla_result and wait >= 0:
2187 await asyncio.sleep(db_poll_interval)
2188 wait -= db_poll_interval
2189 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2190 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2191
2192 if not pla_result:
2193 raise LcmException(
2194 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2195 )
2196
2197 for pla_vnf in pla_result["vnf"]:
2198 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2199 if not pla_vnf.get("vimAccountId") or not vnfr:
2200 continue
2201 modified = True
2202 self.db.set_one(
2203 "vnfrs",
2204 {"_id": vnfr["_id"]},
2205 {"vim-account-id": pla_vnf["vimAccountId"]},
2206 )
2207 # Modifies db_vnfrs
2208 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2209 return modified
2210
2211 def update_nsrs_with_pla_result(self, params):
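# params is expected to carry the PLA kafka message payload, e.g. (illustrative):
#   {"placement": {"nslcmopId": "<operation id>",
#                  "vnf": [{"member-vnf-index": "1", "vimAccountId": "<vim account uuid>"}]}}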
2212 try:
2213 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2214 self.update_db_2(
2215 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2216 )
2217 except Exception as e:
2218 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2219
2220 async def instantiate(self, nsr_id, nslcmop_id):
2221 """
2222
2223 :param nsr_id: ns instance to deploy
2224 :param nslcmop_id: operation to run
2225 :return:
2226 """
2227
2228 # Try to lock HA task here
2229 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2230 if not task_is_locked_by_me:
2231 self.logger.debug(
2232 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2233 )
2234 return
2235
2236 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2237 self.logger.debug(logging_text + "Enter")
2238
2239 # get all needed from database
2240
2241 # database nsrs record
2242 db_nsr = None
2243
2244 # database nslcmops record
2245 db_nslcmop = None
2246
2247 # update operation on nsrs
2248 db_nsr_update = {}
2249 # update operation on nslcmops
2250 db_nslcmop_update = {}
2251
2252 nslcmop_operation_state = None
2253 db_vnfrs = {} # vnf's info indexed by member-index
2254 # n2vc_info = {}
2255 tasks_dict_info = {} # from task to info text
2256 exc = None
2257 error_list = []
2258 stage = [
2259 "Stage 1/5: preparation of the environment.",
2260 "Waiting for previous operations to terminate.",
2261 "",
2262 ]
2263 # ^ stage, step, VIM progress
2264 try:
2265 # wait for any previous tasks in process
2266 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2267
2268 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2269 stage[1] = "Reading from database."
2270 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2271 db_nsr_update["detailed-status"] = "creating"
2272 db_nsr_update["operational-status"] = "init"
2273 self._write_ns_status(
2274 nsr_id=nsr_id,
2275 ns_state="BUILDING",
2276 current_operation="INSTANTIATING",
2277 current_operation_id=nslcmop_id,
2278 other_update=db_nsr_update,
2279 )
2280 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2281
2282 # read from db: operation
2283 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2284 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2285 if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
2286 db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
2287 db_nslcmop["operationParams"]["additionalParamsForVnf"]
2288 )
2289 ns_params = db_nslcmop.get("operationParams")
2290 if ns_params and ns_params.get("timeout_ns_deploy"):
2291 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2292 else:
2293 timeout_ns_deploy = self.timeout.get(
2294 "ns_deploy", self.timeout_ns_deploy
2295 )
2296
2297 # read from db: ns
2298 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2299 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2300 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2301 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2302 self.fs.sync(db_nsr["nsd-id"])
2303 db_nsr["nsd"] = nsd
2304 # nsr_name = db_nsr["name"] # TODO short-name??
2305
2306 # read from db: vnf's of this ns
2307 stage[1] = "Getting vnfrs from db."
2308 self.logger.debug(logging_text + stage[1])
2309 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2310
2311 # read from db: vnfd's for every vnf
2312 db_vnfds = [] # every vnfd data
2313
2314 # for each vnf in ns, read vnfd
2315 for vnfr in db_vnfrs_list:
2316 if vnfr.get("kdur"):
2317 kdur_list = []
2318 for kdur in vnfr["kdur"]:
2319 if kdur.get("additionalParams"):
2320 kdur["additionalParams"] = json.loads(
2321 kdur["additionalParams"]
2322 )
2323 kdur_list.append(kdur)
2324 vnfr["kdur"] = kdur_list
2325
2326 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2327 vnfd_id = vnfr["vnfd-id"]
2328 vnfd_ref = vnfr["vnfd-ref"]
2329 self.fs.sync(vnfd_id)
2330
2331 # if we do not have this vnfd yet, read it from db (db_vnfds holds vnfd dicts, so compare by "_id")
2332 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
2333 # read from db
2334 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2335 vnfd_id, vnfd_ref
2336 )
2337 self.logger.debug(logging_text + stage[1])
2338 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2339
2340 # store vnfd
2341 db_vnfds.append(vnfd)
2342
2343 # Get or generate the _admin.deployed.VCA list
2344 vca_deployed_list = None
2345 if db_nsr["_admin"].get("deployed"):
2346 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2347 if vca_deployed_list is None:
2348 vca_deployed_list = []
2349 configuration_status_list = []
2350 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2351 db_nsr_update["configurationStatus"] = configuration_status_list
2352 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2353 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2354 elif isinstance(vca_deployed_list, dict):
2355 # maintain backward compatibility. Change a dict to list at database
2356 vca_deployed_list = list(vca_deployed_list.values())
2357 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2358 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2359
2360 if not isinstance(
2361 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2362 ):
2363 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2364 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2365
2366 # set state to INSTANTIATED. Once instantiated, NBI will not delete it directly
2367 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2368 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2369 self.db.set_list(
2370 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2371 )
2372
2373 # n2vc_redesign STEP 2 Deploy Network Scenario
2374 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2375 self._write_op_status(op_id=nslcmop_id, stage=stage)
2376
2377 stage[1] = "Deploying KDUs."
2378 # self.logger.debug(logging_text + "Before deploy_kdus")
2379 # Call deploy_kdus in case the "vdu:kdu" param exists
2380 await self.deploy_kdus(
2381 logging_text=logging_text,
2382 nsr_id=nsr_id,
2383 nslcmop_id=nslcmop_id,
2384 db_vnfrs=db_vnfrs,
2385 db_vnfds=db_vnfds,
2386 task_instantiation_info=tasks_dict_info,
2387 )
2388
2389 stage[1] = "Getting VCA public key."
2390 # n2vc_redesign STEP 1 Get VCA public ssh-key
2391 # feature 1429. Add n2vc public key to needed VMs
2392 n2vc_key = self.n2vc.get_public_key()
2393 n2vc_key_list = [n2vc_key]
2394 if self.vca_config.get("public_key"):
2395 n2vc_key_list.append(self.vca_config["public_key"])
2396
2397 stage[1] = "Deploying NS at VIM."
2398 task_ro = asyncio.ensure_future(
2399 self.instantiate_RO(
2400 logging_text=logging_text,
2401 nsr_id=nsr_id,
2402 nsd=nsd,
2403 db_nsr=db_nsr,
2404 db_nslcmop=db_nslcmop,
2405 db_vnfrs=db_vnfrs,
2406 db_vnfds=db_vnfds,
2407 n2vc_key_list=n2vc_key_list,
2408 stage=stage,
2409 )
2410 )
2411 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2412 tasks_dict_info[task_ro] = "Deploying at VIM"
2413
2414 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2415 stage[1] = "Deploying Execution Environments."
2416 self.logger.debug(logging_text + stage[1])
2417
2418 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2419 for vnf_profile in get_vnf_profiles(nsd):
2420 vnfd_id = vnf_profile["vnfd-id"]
2421 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2422 member_vnf_index = str(vnf_profile["id"])
2423 db_vnfr = db_vnfrs[member_vnf_index]
2424 base_folder = vnfd["_admin"]["storage"]
2425 vdu_id = None
2426 vdu_index = 0
2427 vdu_name = None
2428 kdu_name = None
2429
2430 # Get additional parameters
2431 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2432 if db_vnfr.get("additionalParamsForVnf"):
2433 deploy_params.update(
2434 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2435 )
2436
2437 descriptor_config = get_configuration(vnfd, vnfd["id"])
2438 if descriptor_config:
2439 self._deploy_n2vc(
2440 logging_text=logging_text
2441 + "member_vnf_index={} ".format(member_vnf_index),
2442 db_nsr=db_nsr,
2443 db_vnfr=db_vnfr,
2444 nslcmop_id=nslcmop_id,
2445 nsr_id=nsr_id,
2446 nsi_id=nsi_id,
2447 vnfd_id=vnfd_id,
2448 vdu_id=vdu_id,
2449 kdu_name=kdu_name,
2450 member_vnf_index=member_vnf_index,
2451 vdu_index=vdu_index,
2452 vdu_name=vdu_name,
2453 deploy_params=deploy_params,
2454 descriptor_config=descriptor_config,
2455 base_folder=base_folder,
2456 task_instantiation_info=tasks_dict_info,
2457 stage=stage,
2458 )
2459
2460 # Deploy charms for each VDU that supports one.
2461 for vdud in get_vdu_list(vnfd):
2462 vdu_id = vdud["id"]
2463 descriptor_config = get_configuration(vnfd, vdu_id)
2464 vdur = find_in_list(
2465 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2466 )
2467
2468 if vdur.get("additionalParams"):
2469 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2470 else:
2471 deploy_params_vdu = deploy_params
2472 deploy_params_vdu["OSM"] = get_osm_params(
2473 db_vnfr, vdu_id, vdu_count_index=0
2474 )
2475 vdud_count = get_number_of_instances(vnfd, vdu_id)
2476
2477 self.logger.debug("VDUD > {}".format(vdud))
2478 self.logger.debug(
2479 "Descriptor config > {}".format(descriptor_config)
2480 )
2481 if descriptor_config:
2482 vdu_name = None
2483 kdu_name = None
2484 for vdu_index in range(vdud_count):
2485 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2486 self._deploy_n2vc(
2487 logging_text=logging_text
2488 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2489 member_vnf_index, vdu_id, vdu_index
2490 ),
2491 db_nsr=db_nsr,
2492 db_vnfr=db_vnfr,
2493 nslcmop_id=nslcmop_id,
2494 nsr_id=nsr_id,
2495 nsi_id=nsi_id,
2496 vnfd_id=vnfd_id,
2497 vdu_id=vdu_id,
2498 kdu_name=kdu_name,
2499 member_vnf_index=member_vnf_index,
2500 vdu_index=vdu_index,
2501 vdu_name=vdu_name,
2502 deploy_params=deploy_params_vdu,
2503 descriptor_config=descriptor_config,
2504 base_folder=base_folder,
2505 task_instantiation_info=tasks_dict_info,
2506 stage=stage,
2507 )
2508 for kdud in get_kdu_list(vnfd):
2509 kdu_name = kdud["name"]
2510 descriptor_config = get_configuration(vnfd, kdu_name)
2511 if descriptor_config:
2512 vdu_id = None
2513 vdu_index = 0
2514 vdu_name = None
2515 kdur = next(
2516 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2517 )
2518 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2519 if kdur.get("additionalParams"):
2520 deploy_params_kdu.update(
2521 parse_yaml_strings(kdur["additionalParams"].copy())
2522 )
2523
2524 self._deploy_n2vc(
2525 logging_text=logging_text,
2526 db_nsr=db_nsr,
2527 db_vnfr=db_vnfr,
2528 nslcmop_id=nslcmop_id,
2529 nsr_id=nsr_id,
2530 nsi_id=nsi_id,
2531 vnfd_id=vnfd_id,
2532 vdu_id=vdu_id,
2533 kdu_name=kdu_name,
2534 member_vnf_index=member_vnf_index,
2535 vdu_index=vdu_index,
2536 vdu_name=vdu_name,
2537 deploy_params=deploy_params_kdu,
2538 descriptor_config=descriptor_config,
2539 base_folder=base_folder,
2540 task_instantiation_info=tasks_dict_info,
2541 stage=stage,
2542 )
2543
2544 # Check if this NS has a charm configuration
2545 descriptor_config = nsd.get("ns-configuration")
2546 if descriptor_config and descriptor_config.get("juju"):
2547 vnfd_id = None
2548 db_vnfr = None
2549 member_vnf_index = None
2550 vdu_id = None
2551 kdu_name = None
2552 vdu_index = 0
2553 vdu_name = None
2554
2555 # Get additional parameters
2556 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2557 if db_nsr.get("additionalParamsForNs"):
2558 deploy_params.update(
2559 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2560 )
2561 base_folder = nsd["_admin"]["storage"]
2562 self._deploy_n2vc(
2563 logging_text=logging_text,
2564 db_nsr=db_nsr,
2565 db_vnfr=db_vnfr,
2566 nslcmop_id=nslcmop_id,
2567 nsr_id=nsr_id,
2568 nsi_id=nsi_id,
2569 vnfd_id=vnfd_id,
2570 vdu_id=vdu_id,
2571 kdu_name=kdu_name,
2572 member_vnf_index=member_vnf_index,
2573 vdu_index=vdu_index,
2574 vdu_name=vdu_name,
2575 deploy_params=deploy_params,
2576 descriptor_config=descriptor_config,
2577 base_folder=base_folder,
2578 task_instantiation_info=tasks_dict_info,
2579 stage=stage,
2580 )
2581
2582 # rest of the steps will be done in the finally block
2583
2584 except (
2585 ROclient.ROClientException,
2586 DbException,
2587 LcmException,
2588 N2VCException,
2589 ) as e:
2590 self.logger.error(
2591 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2592 )
2593 exc = e
2594 except asyncio.CancelledError:
2595 self.logger.error(
2596 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2597 )
2598 exc = "Operation was cancelled"
2599 except Exception as e:
2600 exc = traceback.format_exc()
2601 self.logger.critical(
2602 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2603 exc_info=True,
2604 )
2605 finally:
2606 if exc:
2607 error_list.append(str(exc))
2608 try:
2609 # wait for pending tasks
2610 if tasks_dict_info:
2611 stage[1] = "Waiting for instantiate pending tasks."
2612 self.logger.debug(logging_text + stage[1])
2613 error_list += await self._wait_for_tasks(
2614 logging_text,
2615 tasks_dict_info,
2616 timeout_ns_deploy,
2617 stage,
2618 nslcmop_id,
2619 nsr_id=nsr_id,
2620 )
2621 stage[1] = stage[2] = ""
2622 except asyncio.CancelledError:
2623 error_list.append("Cancelled")
2624 # TODO cancel all tasks
2625 except Exception as exc:
2626 error_list.append(str(exc))
2627
2628 # update operation-status
2629 db_nsr_update["operational-status"] = "running"
2630 # let's begin with VCA 'configured' status (later we can change it)
2631 db_nsr_update["config-status"] = "configured"
2632 for task, task_name in tasks_dict_info.items():
2633 if not task.done() or task.cancelled() or task.exception():
2634 if task_name.startswith(self.task_name_deploy_vca):
2635 # A N2VC task is pending
2636 db_nsr_update["config-status"] = "failed"
2637 else:
2638 # RO or KDU task is pending
2639 db_nsr_update["operational-status"] = "failed"
2640
2641 # update status at database
2642 if error_list:
2643 error_detail = ". ".join(error_list)
2644 self.logger.error(logging_text + error_detail)
2645 error_description_nslcmop = "{} Detail: {}".format(
2646 stage[0], error_detail
2647 )
2648 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2649 nslcmop_id, stage[0]
2650 )
2651
2652 db_nsr_update["detailed-status"] = (
2653 error_description_nsr + " Detail: " + error_detail
2654 )
2655 db_nslcmop_update["detailed-status"] = error_detail
2656 nslcmop_operation_state = "FAILED"
2657 ns_state = "BROKEN"
2658 else:
2659 error_detail = None
2660 error_description_nsr = error_description_nslcmop = None
2661 ns_state = "READY"
2662 db_nsr_update["detailed-status"] = "Done"
2663 db_nslcmop_update["detailed-status"] = "Done"
2664 nslcmop_operation_state = "COMPLETED"
2665
2666 if db_nsr:
2667 self._write_ns_status(
2668 nsr_id=nsr_id,
2669 ns_state=ns_state,
2670 current_operation="IDLE",
2671 current_operation_id=None,
2672 error_description=error_description_nsr,
2673 error_detail=error_detail,
2674 other_update=db_nsr_update,
2675 )
2676 self._write_op_status(
2677 op_id=nslcmop_id,
2678 stage="",
2679 error_message=error_description_nslcmop,
2680 operation_state=nslcmop_operation_state,
2681 other_update=db_nslcmop_update,
2682 )
2683
2684 if nslcmop_operation_state:
2685 try:
2686 await self.msg.aiowrite(
2687 "ns",
2688 "instantiated",
2689 {
2690 "nsr_id": nsr_id,
2691 "nslcmop_id": nslcmop_id,
2692 "operationState": nslcmop_operation_state,
2693 },
2694 loop=self.loop,
2695 )
2696 except Exception as e:
2697 self.logger.error(
2698 logging_text + "kafka_write notification Exception {}".format(e)
2699 )
2700
2701 self.logger.debug(logging_text + "Exit")
2702 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2703
2704 async def _add_vca_relations(
2705 self,
2706 logging_text,
2707 nsr_id,
2708 vca_index: int,
2709 timeout: int = 3600,
2710 vca_type: str = None,
2711 vca_id: str = None,
2712 ) -> bool:
2713
2714 # steps:
2715 # 1. find all relations for this VCA
2716 # 2. wait for other peers related
2717 # 3. add relations
2718
2719 try:
2720 vca_type = vca_type or "lxc_proxy_charm"
2721
2722 # STEP 1: find all relations for this VCA
2723
2724 # read nsr record
2725 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2726 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2727
2728 # this VCA data
2729 my_vca = deep_get(db_nsr, ("_admin", "deployed", "VCA"))[vca_index]
2730
2731 # read all ns-configuration relations
2732 ns_relations = list()
2733 db_ns_relations = deep_get(nsd, ("ns-configuration", "relation"))
2734 if db_ns_relations:
2735 for r in db_ns_relations:
2736 # check if this VCA is in the relation
2737 if my_vca.get("member-vnf-index") in (
2738 r.get("entities")[0].get("id"),
2739 r.get("entities")[1].get("id"),
2740 ):
2741 ns_relations.append(r)
2742
2743 # read all vnf-configuration relations
2744 vnf_relations = list()
2745 db_vnfd_list = db_nsr.get("vnfd-id")
2746 if db_vnfd_list:
2747 for vnfd in db_vnfd_list:
2748 db_vnf_relations = None
2749 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2750 db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
2751 if db_vnf_configuration:
2752 db_vnf_relations = db_vnf_configuration.get("relation", [])
2753 if db_vnf_relations:
2754 for r in db_vnf_relations:
2755 # check if this VCA is in the relation
2756 if my_vca.get("vdu_id") in (
2757 r.get("entities")[0].get("id"),
2758 r.get("entities")[1].get("id"),
2759 ):
2760 vnf_relations.append(r)
2761
2762 # if no relations, terminate
2763 if not ns_relations and not vnf_relations:
2764 self.logger.debug(logging_text + " No relations")
2765 return True
2766
2767 self.logger.debug(
2768 logging_text
2769 + " adding relations\n {}\n {}".format(
2770 ns_relations, vnf_relations
2771 )
2772 )
2773
2774 # add all relations
2775 start = time()
2776 while True:
2777 # check timeout
2778 now = time()
2779 if now - start >= timeout:
2780 self.logger.error(logging_text + " : timeout adding relations")
2781 return False
2782
2783 # reload nsr from database (we need an updated record: _admin.deployed.VCA)
2784 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2785
2786 # for each defined NS relation, find the related VCAs
2787 for r in ns_relations.copy():
2788 from_vca_ee_id = None
2789 to_vca_ee_id = None
2790 from_vca_endpoint = None
2791 to_vca_endpoint = None
2792 vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
2793 for vca in vca_list:
2794 if vca.get("member-vnf-index") == r.get("entities")[0].get(
2795 "id"
2796 ) and vca.get("config_sw_installed"):
2797 from_vca_ee_id = vca.get("ee_id")
2798 from_vca_endpoint = r.get("entities")[0].get("endpoint")
2799 if vca.get("member-vnf-index") == r.get("entities")[1].get(
2800 "id"
2801 ) and vca.get("config_sw_installed"):
2802 to_vca_ee_id = vca.get("ee_id")
2803 to_vca_endpoint = r.get("entities")[1].get("endpoint")
2804 if from_vca_ee_id and to_vca_ee_id:
2805 # add relation
2806 await self.vca_map[vca_type].add_relation(
2807 ee_id_1=from_vca_ee_id,
2808 ee_id_2=to_vca_ee_id,
2809 endpoint_1=from_vca_endpoint,
2810 endpoint_2=to_vca_endpoint,
2811 vca_id=vca_id,
2812 )
2813 # remove entry from relations list
2814 ns_relations.remove(r)
2815 else:
2816 # check failed peers
2817 try:
2818 vca_status_list = db_nsr.get("configurationStatus")
2819 if vca_status_list:
2820 for i in range(len(vca_list)):
2821 vca = vca_list[i]
2822 vca_status = vca_status_list[i]
2823 if vca.get("member-vnf-index") == r.get("entities")[
2824 0
2825 ].get("id"):
2826 if vca_status.get("status") == "BROKEN":
2827 # peer broken: remove relation from list
2828 ns_relations.remove(r)
2829 if vca.get("member-vnf-index") == r.get("entities")[
2830 1
2831 ].get("id"):
2832 if vca_status.get("status") == "BROKEN":
2833 # peer broken: remove relation from list
2834 ns_relations.remove(r)
2835 except Exception:
2836 # ignore
2837 pass
2838
2839 # for each defined VNF relation, find the related VCAs
2840 for r in vnf_relations.copy():
2841 from_vca_ee_id = None
2842 to_vca_ee_id = None
2843 from_vca_endpoint = None
2844 to_vca_endpoint = None
2845 vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
2846 for vca in vca_list:
2847 key_to_check = "vdu_id"
2848 if vca.get("vdu_id") is None:
2849 key_to_check = "vnfd_id"
2850 if vca.get(key_to_check) == r.get("entities")[0].get(
2851 "id"
2852 ) and vca.get("config_sw_installed"):
2853 from_vca_ee_id = vca.get("ee_id")
2854 from_vca_endpoint = r.get("entities")[0].get("endpoint")
2855 if vca.get(key_to_check) == r.get("entities")[1].get(
2856 "id"
2857 ) and vca.get("config_sw_installed"):
2858 to_vca_ee_id = vca.get("ee_id")
2859 to_vca_endpoint = r.get("entities")[1].get("endpoint")
2860 if from_vca_ee_id and to_vca_ee_id:
2861 # add relation
2862 await self.vca_map[vca_type].add_relation(
2863 ee_id_1=from_vca_ee_id,
2864 ee_id_2=to_vca_ee_id,
2865 endpoint_1=from_vca_endpoint,
2866 endpoint_2=to_vca_endpoint,
2867 vca_id=vca_id,
2868 )
2869 # remove entry from relations list
2870 vnf_relations.remove(r)
2871 else:
2872 # check failed peers
2873 try:
2874 vca_status_list = db_nsr.get("configurationStatus")
2875 if vca_status_list:
2876 for i in range(len(vca_list)):
2877 vca = vca_list[i]
2878 vca_status = vca_status_list[i]
2879 if vca.get("vdu_id") == r.get("entities")[0].get(
2880 "id"
2881 ):
2882 if vca_status.get("status") == "BROKEN":
2883 # peer broken: remove relation from list
2884 vnf_relations.remove(r)
2885 if vca.get("vdu_id") == r.get("entities")[1].get(
2886 "id"
2887 ):
2888 if vca_status.get("status") == "BROKEN":
2889 # peer broken: remove relation from list
2890 vnf_relations.remove(r)
2891 except Exception:
2892 # ignore
2893 pass
2894
2895 # wait for next try
2896 await asyncio.sleep(5.0)
2897
2898 if not ns_relations and not vnf_relations:
2899 self.logger.debug("Relations added")
2900 break
2901
2902 return True
2903
2904 except Exception as e:
2905 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
2906 return False
2907
2908 async def _install_kdu(
2909 self,
2910 nsr_id: str,
2911 nsr_db_path: str,
2912 vnfr_data: dict,
2913 kdu_index: int,
2914 kdud: dict,
2915 vnfd: dict,
2916 k8s_instance_info: dict,
2917 k8params: dict = None,
2918 timeout: int = 600,
2919 vca_id: str = None,
2920 ):
2921
2922 try:
2923 k8sclustertype = k8s_instance_info["k8scluster-type"]
2924 # Instantiate kdu
2925 db_dict_install = {
2926 "collection": "nsrs",
2927 "filter": {"_id": nsr_id},
2928 "path": nsr_db_path,
2929 }
2930
2931 if k8s_instance_info.get("kdu-deployment-name"):
2932 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
2933 else:
2934 kdu_instance = self.k8scluster_map[
2935 k8sclustertype
2936 ].generate_kdu_instance_name(
2937 db_dict=db_dict_install,
2938 kdu_model=k8s_instance_info["kdu-model"],
2939 kdu_name=k8s_instance_info["kdu-name"],
2940 )
2941 self.update_db_2(
2942 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
2943 )
2944 await self.k8scluster_map[k8sclustertype].install(
2945 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2946 kdu_model=k8s_instance_info["kdu-model"],
2947 atomic=True,
2948 params=k8params,
2949 db_dict=db_dict_install,
2950 timeout=timeout,
2951 kdu_name=k8s_instance_info["kdu-name"],
2952 namespace=k8s_instance_info["namespace"],
2953 kdu_instance=kdu_instance,
2954 vca_id=vca_id,
2955 )
2956 self.update_db_2(
2957 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
2958 )
2959
2960 # Obtain services to get the management service IP
2961 services = await self.k8scluster_map[k8sclustertype].get_services(
2962 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2963 kdu_instance=kdu_instance,
2964 namespace=k8s_instance_info["namespace"],
2965 )
2966
2967 # Obtain management service info (if exists)
2968 vnfr_update_dict = {}
2969 kdu_config = get_configuration(vnfd, kdud["name"])
2970 if kdu_config:
2971 target_ee_list = kdu_config.get("execution-environment-list", [])
2972 else:
2973 target_ee_list = []
2974
2975 if services:
2976 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2977 mgmt_services = [
2978 service
2979 for service in kdud.get("service", [])
2980 if service.get("mgmt-service")
2981 ]
2982 for mgmt_service in mgmt_services:
2983 for service in services:
2984 if service["name"].startswith(mgmt_service["name"]):
2985 # Mgmt service found, Obtain service ip
2986 ip = service.get("external_ip", service.get("cluster_ip"))
2987 if isinstance(ip, list) and len(ip) == 1:
2988 ip = ip[0]
2989
2990 vnfr_update_dict[
2991 "kdur.{}.ip-address".format(kdu_index)
2992 ] = ip
2993
2994 # Check whether the mgmt IP must also be updated at the vnf
2995 service_external_cp = mgmt_service.get(
2996 "external-connection-point-ref"
2997 )
2998 if service_external_cp:
2999 if (
3000 deep_get(vnfd, ("mgmt-interface", "cp"))
3001 == service_external_cp
3002 ):
3003 vnfr_update_dict["ip-address"] = ip
3004
3005 if find_in_list(
3006 target_ee_list,
3007 lambda ee: ee.get(
3008 "external-connection-point-ref", ""
3009 )
3010 == service_external_cp,
3011 ):
3012 vnfr_update_dict[
3013 "kdur.{}.ip-address".format(kdu_index)
3014 ] = ip
3015 break
3016 else:
3017 self.logger.warn(
3018 "Mgmt service name: {} not found".format(
3019 mgmt_service["name"]
3020 )
3021 )
3022
3023 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3024 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3025
3026 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3027 if (
3028 kdu_config
3029 and kdu_config.get("initial-config-primitive")
3030 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3031 ):
3032 initial_config_primitive_list = kdu_config.get(
3033 "initial-config-primitive"
3034 )
3035 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3036
3037 for initial_config_primitive in initial_config_primitive_list:
3038 primitive_params_ = self._map_primitive_params(
3039 initial_config_primitive, {}, {}
3040 )
3041
3042 await asyncio.wait_for(
3043 self.k8scluster_map[k8sclustertype].exec_primitive(
3044 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3045 kdu_instance=kdu_instance,
3046 primitive_name=initial_config_primitive["name"],
3047 params=primitive_params_,
3048 db_dict=db_dict_install,
3049 vca_id=vca_id,
3050 ),
3051 timeout=timeout,
3052 )
3053
3054 except Exception as e:
3055 # Prepare update db with error and raise exception
3056 try:
3057 self.update_db_2(
3058 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3059 )
3060 self.update_db_2(
3061 "vnfrs",
3062 vnfr_data.get("_id"),
3063 {"kdur.{}.status".format(kdu_index): "ERROR"},
3064 )
3065 except Exception:
3066 # ignore to keep original exception
3067 pass
3068 # reraise original error
3069 raise
3070
3071 return kdu_instance
3072
3073 async def deploy_kdus(
3074 self,
3075 logging_text,
3076 nsr_id,
3077 nslcmop_id,
3078 db_vnfrs,
3079 db_vnfds,
3080 task_instantiation_info,
3081 ):
3082 # Launch kdus if present in the descriptor
3083
3084 k8scluster_id_2_uuic = {
3085 "helm-chart-v3": {},
3086 "helm-chart": {},
3087 "juju-bundle": {},
3088 }
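# cache of OSM k8s-cluster id -> connector cluster uuid, kept per cluster type so that
# _get_cluster_id() below resolves (and, when needed, initializes) each cluster only once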
3089
3090 async def _get_cluster_id(cluster_id, cluster_type):
3091 nonlocal k8scluster_id_2_uuic
3092 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3093 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3094
3095 # check if the K8s cluster is being created and wait for related previous tasks in process
3096 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3097 "k8scluster", cluster_id
3098 )
3099 if task_dependency:
3100 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3101 task_name, cluster_id
3102 )
3103 self.logger.debug(logging_text + text)
3104 await asyncio.wait(task_dependency, timeout=3600)
3105
3106 db_k8scluster = self.db.get_one(
3107 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3108 )
3109 if not db_k8scluster:
3110 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3111
3112 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3113 if not k8s_id:
3114 if cluster_type == "helm-chart-v3":
3115 try:
3116 # backward compatibility for existing clusters that have not been initialized for helm v3
3117 k8s_credentials = yaml.safe_dump(
3118 db_k8scluster.get("credentials")
3119 )
3120 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3121 k8s_credentials, reuse_cluster_uuid=cluster_id
3122 )
3123 db_k8scluster_update = {}
3124 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3125 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3126 db_k8scluster_update[
3127 "_admin.helm-chart-v3.created"
3128 ] = uninstall_sw
3129 db_k8scluster_update[
3130 "_admin.helm-chart-v3.operationalState"
3131 ] = "ENABLED"
3132 self.update_db_2(
3133 "k8sclusters", cluster_id, db_k8scluster_update
3134 )
3135 except Exception as e:
3136 self.logger.error(
3137 logging_text
3138 + "error initializing helm-v3 cluster: {}".format(str(e))
3139 )
3140 raise LcmException(
3141 "K8s cluster '{}' has not been initialized for '{}'".format(
3142 cluster_id, cluster_type
3143 )
3144 )
3145 else:
3146 raise LcmException(
3147 "K8s cluster '{}' has not been initialized for '{}'".format(
3148 cluster_id, cluster_type
3149 )
3150 )
3151 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3152 return k8s_id
3153
3154 logging_text += "Deploy kdus: "
3155 step = ""
3156 try:
3157 db_nsr_update = {"_admin.deployed.K8s": []}
3158 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3159
3160 index = 0
3161 updated_cluster_list = []
3162 updated_v3_cluster_list = []
3163
3164 for vnfr_data in db_vnfrs.values():
3165 vca_id = self.get_vca_id(vnfr_data, {})
3166 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3167 # Step 0: Prepare and set parameters
3168 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3169 vnfd_id = vnfr_data.get("vnfd-id")
3170 vnfd_with_id = find_in_list(
3171 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3172 )
3173 kdud = next(
3174 kdud
3175 for kdud in vnfd_with_id["kdu"]
3176 if kdud["name"] == kdur["kdu-name"]
3177 )
3178 namespace = kdur.get("k8s-namespace")
3179 kdu_deployment_name = kdur.get("kdu-deployment-name")
3180 if kdur.get("helm-chart"):
3181 kdumodel = kdur["helm-chart"]
3182 # Default version: helm3; if helm-version is v2, assign v2
3183 k8sclustertype = "helm-chart-v3"
3184 self.logger.debug("kdur: {}".format(kdur))
3185 if (
3186 kdur.get("helm-version")
3187 and kdur.get("helm-version") == "v2"
3188 ):
3189 k8sclustertype = "helm-chart"
3190 elif kdur.get("juju-bundle"):
3191 kdumodel = kdur["juju-bundle"]
3192 k8sclustertype = "juju-bundle"
3193 else:
3194 raise LcmException(
3195 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3196 "juju-bundle. Maybe an old NBI version is running".format(
3197 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3198 )
3199 )
3200 # check if kdumodel is a file and exists
3201 try:
3202 vnfd_with_id = find_in_list(
3203 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3204 )
3205 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3206 if storage and storage.get(
3207 "pkg-dir"
3208 ): # may not be present if the vnfd has no artifacts
3209 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
3210 filename = "{}/{}/{}s/{}".format(
3211 storage["folder"],
3212 storage["pkg-dir"],
3213 k8sclustertype,
3214 kdumodel,
3215 )
3216 if self.fs.file_exists(
3217 filename, mode="file"
3218 ) or self.fs.file_exists(filename, mode="dir"):
3219 kdumodel = self.fs.path + filename
3220 except (asyncio.TimeoutError, asyncio.CancelledError):
3221 raise
3222 except Exception: # it is not a file
3223 pass
3224
3225 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3226 step = "Synchronize repos for k8s cluster '{}'".format(
3227 k8s_cluster_id
3228 )
3229 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3230
3231 # Synchronize repos
3232 if (
3233 k8sclustertype == "helm-chart"
3234 and cluster_uuid not in updated_cluster_list
3235 ) or (
3236 k8sclustertype == "helm-chart-v3"
3237 and cluster_uuid not in updated_v3_cluster_list
3238 ):
3239 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3240 self.k8scluster_map[k8sclustertype].synchronize_repos(
3241 cluster_uuid=cluster_uuid
3242 )
3243 )
3244 if del_repo_list or added_repo_dict:
3245 if k8sclustertype == "helm-chart":
3246 unset = {
3247 "_admin.helm_charts_added." + item: None
3248 for item in del_repo_list
3249 }
3250 updated = {
3251 "_admin.helm_charts_added." + item: name
3252 for item, name in added_repo_dict.items()
3253 }
3254 updated_cluster_list.append(cluster_uuid)
3255 elif k8sclustertype == "helm-chart-v3":
3256 unset = {
3257 "_admin.helm_charts_v3_added." + item: None
3258 for item in del_repo_list
3259 }
3260 updated = {
3261 "_admin.helm_charts_v3_added." + item: name
3262 for item, name in added_repo_dict.items()
3263 }
3264 updated_v3_cluster_list.append(cluster_uuid)
3265 self.logger.debug(
3266 logging_text + "repos synchronized on k8s cluster "
3267 "'{}' to_delete: {}, to_add: {}".format(
3268 k8s_cluster_id, del_repo_list, added_repo_dict
3269 )
3270 )
3271 self.db.set_one(
3272 "k8sclusters",
3273 {"_id": k8s_cluster_id},
3274 updated,
3275 unset=unset,
3276 )
3277
3278 # Instantiate kdu
3279 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3280 vnfr_data["member-vnf-index-ref"],
3281 kdur["kdu-name"],
3282 k8s_cluster_id,
3283 )
3284 k8s_instance_info = {
3285 "kdu-instance": None,
3286 "k8scluster-uuid": cluster_uuid,
3287 "k8scluster-type": k8sclustertype,
3288 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3289 "kdu-name": kdur["kdu-name"],
3290 "kdu-model": kdumodel,
3291 "namespace": namespace,
3292 "kdu-deployment-name": kdu_deployment_name,
3293 }
3294 db_path = "_admin.deployed.K8s.{}".format(index)
3295 db_nsr_update[db_path] = k8s_instance_info
3296 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3297 vnfd_with_id = find_in_list(
3298 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3299 )
3300 task = asyncio.ensure_future(
3301 self._install_kdu(
3302 nsr_id,
3303 db_path,
3304 vnfr_data,
3305 kdu_index,
3306 kdud,
3307 vnfd_with_id,
3308 k8s_instance_info,
3309 k8params=desc_params,
3310 timeout=600,
3311 vca_id=vca_id,
3312 )
3313 )
3314 self.lcm_tasks.register(
3315 "ns",
3316 nsr_id,
3317 nslcmop_id,
3318 "instantiate_KDU-{}".format(index),
3319 task,
3320 )
3321 task_instantiation_info[task] = "Deploying KDU {}".format(
3322 kdur["kdu-name"]
3323 )
3324
3325 index += 1
3326
3327 except (LcmException, asyncio.CancelledError):
3328 raise
3329 except Exception as e:
3330 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3331 if isinstance(e, (N2VCException, DbException)):
3332 self.logger.error(logging_text + msg)
3333 else:
3334 self.logger.critical(logging_text + msg, exc_info=True)
3335 raise LcmException(msg)
3336 finally:
3337 if db_nsr_update:
3338 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3339
3340 def _deploy_n2vc(
3341 self,
3342 logging_text,
3343 db_nsr,
3344 db_vnfr,
3345 nslcmop_id,
3346 nsr_id,
3347 nsi_id,
3348 vnfd_id,
3349 vdu_id,
3350 kdu_name,
3351 member_vnf_index,
3352 vdu_index,
3353 vdu_name,
3354 deploy_params,
3355 descriptor_config,
3356 base_folder,
3357 task_instantiation_info,
3358 stage,
3359 ):
3360 # launch instantiate_N2VC in an asyncio task and register the task object
3361 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3362 # if not found, create one entry and update database
3363 # fill db_nsr._admin.deployed.VCA.<index>
3364
3365 self.logger.debug(
3366 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3367 )
3368 if "execution-environment-list" in descriptor_config:
3369 ee_list = descriptor_config.get("execution-environment-list", [])
3370 elif "juju" in descriptor_config:
3371 ee_list = [descriptor_config] # ns charms
3372 else: # other types such as script are not supported
3373 ee_list = []
3374
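# Each ee_item is expected to look roughly like one of the following (illustrative values):
#   {"id": "monitor-ee", "juju": {"charm": "simple", "proxy": True, "cloud": "k8s"}}
#   {"id": "monitor-ee", "helm-chart": "mychart", "helm-version": "v3"}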
3375 for ee_item in ee_list:
3376 self.logger.debug(
3377 logging_text
3378 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3379 ee_item.get("juju"), ee_item.get("helm-chart")
3380 )
3381 )
3382 ee_descriptor_id = ee_item.get("id")
3383 if ee_item.get("juju"):
3384 vca_name = ee_item["juju"].get("charm")
3385 vca_type = (
3386 "lxc_proxy_charm"
3387 if ee_item["juju"].get("charm") is not None
3388 else "native_charm"
3389 )
3390 if ee_item["juju"].get("cloud") == "k8s":
3391 vca_type = "k8s_proxy_charm"
3392 elif ee_item["juju"].get("proxy") is False:
3393 vca_type = "native_charm"
3394 elif ee_item.get("helm-chart"):
3395 vca_name = ee_item["helm-chart"]
3396 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
3397 vca_type = "helm"
3398 else:
3399 vca_type = "helm-v3"
3400 else:
3401 self.logger.debug(
3402 logging_text + "skipping non juju neither charm configuration"
3403 )
3404 continue
3405
3406 vca_index = -1
3407 for vca_index, vca_deployed in enumerate(
3408 db_nsr["_admin"]["deployed"]["VCA"]
3409 ):
3410 if not vca_deployed:
3411 continue
3412 if (
3413 vca_deployed.get("member-vnf-index") == member_vnf_index
3414 and vca_deployed.get("vdu_id") == vdu_id
3415 and vca_deployed.get("kdu_name") == kdu_name
3416 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3417 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3418 ):
3419 break
3420 else:
3421 # not found, create one.
3422 target = (
3423 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3424 )
3425 if vdu_id:
3426 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3427 elif kdu_name:
3428 target += "/kdu/{}".format(kdu_name)
3429 vca_deployed = {
3430 "target_element": target,
3431 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3432 "member-vnf-index": member_vnf_index,
3433 "vdu_id": vdu_id,
3434 "kdu_name": kdu_name,
3435 "vdu_count_index": vdu_index,
3436 "operational-status": "init", # TODO revise
3437 "detailed-status": "", # TODO revise
3438 "step": "initial-deploy", # TODO revise
3439 "vnfd_id": vnfd_id,
3440 "vdu_name": vdu_name,
3441 "type": vca_type,
3442 "ee_descriptor_id": ee_descriptor_id,
3443 }
3444 vca_index += 1
3445
3446 # create VCA and configurationStatus in db
3447 db_dict = {
3448 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3449 "configurationStatus.{}".format(vca_index): dict(),
3450 }
3451 self.update_db_2("nsrs", nsr_id, db_dict)
3452
3453 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3454
3455 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3456 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3457 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3458
3459 # Launch task
3460 task_n2vc = asyncio.ensure_future(
3461 self.instantiate_N2VC(
3462 logging_text=logging_text,
3463 vca_index=vca_index,
3464 nsi_id=nsi_id,
3465 db_nsr=db_nsr,
3466 db_vnfr=db_vnfr,
3467 vdu_id=vdu_id,
3468 kdu_name=kdu_name,
3469 vdu_index=vdu_index,
3470 deploy_params=deploy_params,
3471 config_descriptor=descriptor_config,
3472 base_folder=base_folder,
3473 nslcmop_id=nslcmop_id,
3474 stage=stage,
3475 vca_type=vca_type,
3476 vca_name=vca_name,
3477 ee_config_descriptor=ee_item,
3478 )
3479 )
3480 self.lcm_tasks.register(
3481 "ns",
3482 nsr_id,
3483 nslcmop_id,
3484 "instantiate_N2VC-{}".format(vca_index),
3485 task_n2vc,
3486 )
3487 task_instantiation_info[
3488 task_n2vc
3489 ] = self.task_name_deploy_vca + " {}.{}".format(
3490 member_vnf_index or "", vdu_id or ""
3491 )
3492
3493 @staticmethod
3494 def _create_nslcmop(nsr_id, operation, params):
3495 """
3496 Creates an ns-lcm-op (nslcmop) content to be stored at database.
3497 :param nsr_id: internal id of the instance
3498 :param operation: instantiate, terminate, scale, action, ...
3499 :param params: user parameters for the operation
3500 :return: dictionary following SOL005 format
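Example (illustrative):
    nslcmop = NsLcm._create_nslcmop(nsr_id, "action",
                                    {"member_vnf_index": "1",
                                     "primitive": "touch",
                                     "primitive_params": {}})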
3501 """
3502 # Raise exception if invalid arguments
3503 if not (nsr_id and operation and params):
3504 raise LcmException(
3505 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3506 )
3507 now = time()
3508 _id = str(uuid4())
3509 nslcmop = {
3510 "id": _id,
3511 "_id": _id,
3512 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3513 "operationState": "PROCESSING",
3514 "statusEnteredTime": now,
3515 "nsInstanceId": nsr_id,
3516 "lcmOperationType": operation,
3517 "startTime": now,
3518 "isAutomaticInvocation": False,
3519 "operationParams": params,
3520 "isCancelPending": False,
3521 "links": {
3522 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3523 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3524 },
3525 }
3526 return nslcmop
3527
3528 def _format_additional_params(self, params):
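# Values prefixed with "!!yaml " are parsed as YAML; everything else is returned untouched.
# e.g. (illustrative) {"replicas": "!!yaml 3", "name": "x"} -> {"replicas": 3, "name": "x"}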
3529 params = params or {}
3530 for key, value in params.items():
3531 if str(value).startswith("!!yaml "):
3532 params[key] = yaml.safe_load(value[7:])
3533 return params
3534
3535 def _get_terminate_primitive_params(self, seq, vnf_index):
3536 primitive = seq.get("name")
3537 primitive_params = {}
3538 params = {
3539 "member_vnf_index": vnf_index,
3540 "primitive": primitive,
3541 "primitive_params": primitive_params,
3542 }
3543 desc_params = {}
3544 return self._map_primitive_params(seq, params, desc_params)
3545
3546 # sub-operations
3547
3548 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3549 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3550 if op.get("operationState") == "COMPLETED":
3551 # b. Skip sub-operation
3552 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3553 return self.SUBOPERATION_STATUS_SKIP
3554 else:
3555 # c. retry executing sub-operation
3556 # The sub-operation exists, and operationState != 'COMPLETED'
3557 # Update operationState = 'PROCESSING' to indicate a retry.
3558 operationState = "PROCESSING"
3559 detailed_status = "In progress"
3560 self._update_suboperation_status(
3561 db_nslcmop, op_index, operationState, detailed_status
3562 )
3563 # Return the sub-operation index
3564 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3565 # with arguments extracted from the sub-operation
3566 return op_index
3567
3568 # Find a sub-operation whose fields match all keys of a given match dictionary
3569 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if there is no match
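# Illustrative match dictionary (assumed values): {"member_vnf_index": "1", "primitive": "touch"}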
3570 def _find_suboperation(self, db_nslcmop, match):
3571 if db_nslcmop and match:
3572 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3573 for i, op in enumerate(op_list):
3574 if all(op.get(k) == match[k] for k in match):
3575 return i
3576 return self.SUBOPERATION_STATUS_NOT_FOUND
3577
3578 # Update status for a sub-operation given its index
3579 def _update_suboperation_status(
3580 self, db_nslcmop, op_index, operationState, detailed_status
3581 ):
3582 # Update DB for HA tasks
3583 q_filter = {"_id": db_nslcmop["_id"]}
3584 update_dict = {
3585 "_admin.operations.{}.operationState".format(op_index): operationState,
3586 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3587 }
3588 self.db.set_one(
3589 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3590 )
3591
3592 # Add sub-operation, return the index of the added sub-operation
3593 # Optionally, set operationState, detailed-status, and operationType
3594 # Status and type are currently set for 'scale' sub-operations:
3595 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3596 # 'detailed-status' : status message
3597 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3598 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
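# The sub-operation entries are persisted under nslcmops "_admin.operations" and addressed by their list index.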
3599 def _add_suboperation(
3600 self,
3601 db_nslcmop,
3602 vnf_index,
3603 vdu_id,
3604 vdu_count_index,
3605 vdu_name,
3606 primitive,
3607 mapped_primitive_params,
3608 operationState=None,
3609 detailed_status=None,
3610 operationType=None,
3611 RO_nsr_id=None,
3612 RO_scaling_info=None,
3613 ):
3614 if not db_nslcmop:
3615 return self.SUBOPERATION_STATUS_NOT_FOUND
3616 # Get the "_admin.operations" list, if it exists
3617 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3618 op_list = db_nslcmop_admin.get("operations")
3619 # Create or append to the "_admin.operations" list
3620 new_op = {
3621 "member_vnf_index": vnf_index,
3622 "vdu_id": vdu_id,
3623 "vdu_count_index": vdu_count_index,
3624 "primitive": primitive,
3625 "primitive_params": mapped_primitive_params,
3626 }
3627 if operationState:
3628 new_op["operationState"] = operationState
3629 if detailed_status:
3630 new_op["detailed-status"] = detailed_status
3631 if operationType:
3632 new_op["lcmOperationType"] = operationType
3633 if RO_nsr_id:
3634 new_op["RO_nsr_id"] = RO_nsr_id
3635 if RO_scaling_info:
3636 new_op["RO_scaling_info"] = RO_scaling_info
3637 if not op_list:
3638 # No existing operations, create key 'operations' with current operation as first list element
3639 db_nslcmop_admin.update({"operations": [new_op]})
3640 op_list = db_nslcmop_admin.get("operations")
3641 else:
3642 # Existing operations, append operation to list
3643 op_list.append(new_op)
3644
3645 db_nslcmop_update = {"_admin.operations": op_list}
3646 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3647 op_index = len(op_list) - 1
3648 return op_index
3649
3650 # Helper methods for scale() sub-operations
3651
3652 # pre-scale/post-scale:
3653 # Check for 3 different cases:
3654 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3655 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3656 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
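# Illustrative caller pattern (simplified sketch, not the exact scale() code):
#   op_index = self._check_or_add_scale_suboperation(
#       db_nslcmop, vnf_index, vnf_config_primitive, primitive_params, "PRE-SCALE")
#   if op_index == self.SUBOPERATION_STATUS_SKIP:
#       pass  # already COMPLETED in a previous run, nothing to re-execute
#   elif op_index == self.SUBOPERATION_STATUS_NEW:
#       ...   # first execution, run the primitive with the given arguments
#   else:
#       ...   # retry, re-run with the arguments stored at _admin.operations[op_index]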
3657 def _check_or_add_scale_suboperation(
3658 self,
3659 db_nslcmop,
3660 vnf_index,
3661 vnf_config_primitive,
3662 primitive_params,
3663 operationType,
3664 RO_nsr_id=None,
3665 RO_scaling_info=None,
3666 ):
3667 # Find this sub-operation
3668 if RO_nsr_id and RO_scaling_info:
3669 operationType = "SCALE-RO"
3670 match = {
3671 "member_vnf_index": vnf_index,
3672 "RO_nsr_id": RO_nsr_id,
3673 "RO_scaling_info": RO_scaling_info,
3674 }
3675 else:
3676 match = {
3677 "member_vnf_index": vnf_index,
3678 "primitive": vnf_config_primitive,
3679 "primitive_params": primitive_params,
3680 "lcmOperationType": operationType,
3681 }
3682 op_index = self._find_suboperation(db_nslcmop, match)
3683 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3684 # a. New sub-operation
3685 # The sub-operation does not exist, add it.
3686 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3687 # The following parameters are set to None for all kinds of scaling:
3688 vdu_id = None
3689 vdu_count_index = None
3690 vdu_name = None
3691 if RO_nsr_id and RO_scaling_info:
3692 vnf_config_primitive = None
3693 primitive_params = None
3694 else:
3695 RO_nsr_id = None
3696 RO_scaling_info = None
3697 # Initial status for sub-operation
3698 operationState = "PROCESSING"
3699 detailed_status = "In progress"
3700 # Add sub-operation for pre/post-scaling (zero or more operations)
3701 self._add_suboperation(
3702 db_nslcmop,
3703 vnf_index,
3704 vdu_id,
3705 vdu_count_index,
3706 vdu_name,
3707 vnf_config_primitive,
3708 primitive_params,
3709 operationState,
3710 detailed_status,
3711 operationType,
3712 RO_nsr_id,
3713 RO_scaling_info,
3714 )
3715 return self.SUBOPERATION_STATUS_NEW
3716 else:
3717 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3718 # or op_index (operationState != 'COMPLETED')
3719 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3720
3721 # Function to return execution_environment id
3722
3723 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3724 # TODO vdu_index_count
3725 for vca in vca_deployed_list:
3726 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3727 return vca["ee_id"]
3728
3729 async def destroy_N2VC(
3730 self,
3731 logging_text,
3732 db_nslcmop,
3733 vca_deployed,
3734 config_descriptor,
3735 vca_index,
3736 destroy_ee=True,
3737 exec_primitives=True,
3738 scaling_in=False,
3739 vca_id: str = None,
3740 ):
3741 """
3742 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
3743 :param logging_text:
3744 :param db_nslcmop:
3745 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3746 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3747 :param vca_index: index in the database _admin.deployed.VCA
3748 :param destroy_ee: False to not destroy the execution environment here, because all of them will be destroyed at once
3749 :param exec_primitives: False to not execute the terminate primitives, because the configuration was not completed
3750 or did not execute properly
3751 :param scaling_in: True destroys the application, False destroys the model
3752 :return: None or exception
3753 """
3754
3755 self.logger.debug(
3756 logging_text
3757 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3758 vca_index, vca_deployed, config_descriptor, destroy_ee
3759 )
3760 )
3761
3762 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3763
3764 # execute terminate_primitives
3765 if exec_primitives:
3766 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
3767 config_descriptor.get("terminate-config-primitive"),
3768 vca_deployed.get("ee_descriptor_id"),
3769 )
3770 vdu_id = vca_deployed.get("vdu_id")
3771 vdu_count_index = vca_deployed.get("vdu_count_index")
3772 vdu_name = vca_deployed.get("vdu_name")
3773 vnf_index = vca_deployed.get("member-vnf-index")
3774 if terminate_primitives and vca_deployed.get("needed_terminate"):
3775 for seq in terminate_primitives:
3776 # For each sequence in list, get primitive and call _ns_execute_primitive()
3777 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
3778 vnf_index, seq.get("name")
3779 )
3780 self.logger.debug(logging_text + step)
3781 # Create the primitive for each sequence, i.e. "primitive": "touch"
3782 primitive = seq.get("name")
3783 mapped_primitive_params = self._get_terminate_primitive_params(
3784 seq, vnf_index
3785 )
3786
3787 # Add sub-operation
3788 self._add_suboperation(
3789 db_nslcmop,
3790 vnf_index,
3791 vdu_id,
3792 vdu_count_index,
3793 vdu_name,
3794 primitive,
3795 mapped_primitive_params,
3796 )
3797 # Sub-operations: Call _ns_execute_primitive() instead of action()
3798 try:
3799 result, result_detail = await self._ns_execute_primitive(
3800 vca_deployed["ee_id"],
3801 primitive,
3802 mapped_primitive_params,
3803 vca_type=vca_type,
3804 vca_id=vca_id,
3805 )
3806 except LcmException:
3807 # this happens when VCA is not deployed. In this case it is not needed to terminate
3808 continue
3809 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
3810 if result not in result_ok:
3811 raise LcmException(
3812 "terminate_primitive {} for vnf_member_index={} fails with "
3813 "error {}".format(seq.get("name"), vnf_index, result_detail)
3814 )
3815 # mark that this VCA does not need to be terminated
3816 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
3817 vca_index
3818 )
3819 self.update_db_2(
3820 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
3821 )
3822
3823 if vca_deployed.get("prometheus_jobs") and self.prometheus:
3824 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
3825
3826 if destroy_ee:
3827 await self.vca_map[vca_type].delete_execution_environment(
3828 vca_deployed["ee_id"],
3829 scaling_in=scaling_in,
3830 vca_type=vca_type,
3831 vca_id=vca_id,
3832 )
3833
3834 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
3835 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
3836 namespace = "." + db_nsr["_id"]
3837 try:
3838 await self.n2vc.delete_namespace(
3839 namespace=namespace,
3840 total_timeout=self.timeout_charm_delete,
3841 vca_id=vca_id,
3842 )
3843 except N2VCNotFound: # already deleted. Skip
3844 pass
3845 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
3846
3847 async def _terminate_RO(
3848 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
3849 ):
3850 """
3851 Terminates a deployment from RO
3852 :param logging_text:
3853 :param nsr_deployed: db_nsr._admin.deployed
3854 :param nsr_id:
3855 :param nslcmop_id:
3856 :param stage: list of strings with the content to write to db_nslcmop.detailed-status.
3857 This method updates only index 2, but it writes the concatenated content of the list to the database
3858 :return:
3859 """
3860 db_nsr_update = {}
3861 failed_detail = []
3862 ro_nsr_id = ro_delete_action = None
3863 if nsr_deployed and nsr_deployed.get("RO"):
3864 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
3865 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
3866 try:
3867 if ro_nsr_id:
3868 stage[2] = "Deleting ns from VIM."
3869 db_nsr_update["detailed-status"] = " ".join(stage)
3870 self._write_op_status(nslcmop_id, stage)
3871 self.logger.debug(logging_text + stage[2])
3872 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3873 self._write_op_status(nslcmop_id, stage)
3874 desc = await self.RO.delete("ns", ro_nsr_id)
3875 ro_delete_action = desc["action_id"]
3876 db_nsr_update[
3877 "_admin.deployed.RO.nsr_delete_action_id"
3878 ] = ro_delete_action
3879 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3880 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3881 if ro_delete_action:
3882 # wait until NS is deleted from VIM
3883 stage[2] = "Waiting ns deleted from VIM."
3884 detailed_status_old = None
3885 self.logger.debug(
3886 logging_text
3887 + stage[2]
3888 + " RO_id={} ro_delete_action={}".format(
3889 ro_nsr_id, ro_delete_action
3890 )
3891 )
3892 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3893 self._write_op_status(nslcmop_id, stage)
3894
3895 delete_timeout = 20 * 60 # 20 minutes
3896 while delete_timeout > 0:
3897 desc = await self.RO.show(
3898 "ns",
3899 item_id_name=ro_nsr_id,
3900 extra_item="action",
3901 extra_item_id=ro_delete_action,
3902 )
3903
3904 # deploymentStatus
3905 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3906
3907 ns_status, ns_status_info = self.RO.check_action_status(desc)
3908 if ns_status == "ERROR":
3909 raise ROclient.ROClientException(ns_status_info)
3910 elif ns_status == "BUILD":
3911 stage[2] = "Deleting from VIM {}".format(ns_status_info)
3912 elif ns_status == "ACTIVE":
3913 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3914 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3915 break
3916 else:
3917 assert (
3918 False
3919 ), "ROclient.check_action_status returns unknown {}".format(
3920 ns_status
3921 )
3922 if stage[2] != detailed_status_old:
3923 detailed_status_old = stage[2]
3924 db_nsr_update["detailed-status"] = " ".join(stage)
3925 self._write_op_status(nslcmop_id, stage)
3926 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3927 await asyncio.sleep(5, loop=self.loop)
3928 delete_timeout -= 5
3929 else: # delete_timeout <= 0:
3930 raise ROclient.ROClientException(
3931 "Timeout waiting ns deleted from VIM"
3932 )
3933
3934 except Exception as e:
3935 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3936 if (
3937 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3938 ): # not found
3939 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3940 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3941 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3942 self.logger.debug(
3943 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
3944 )
3945 elif (
3946 isinstance(e, ROclient.ROClientException) and e.http_code == 409
3947 ): # conflict
3948 failed_detail.append("delete conflict: {}".format(e))
3949 self.logger.debug(
3950 logging_text
3951 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
3952 )
3953 else:
3954 failed_detail.append("delete error: {}".format(e))
3955 self.logger.error(
3956 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
3957 )
3958
3959 # Delete nsd
3960 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
3961 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
3962 try:
3963 stage[2] = "Deleting nsd from RO."
3964 db_nsr_update["detailed-status"] = " ".join(stage)
3965 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3966 self._write_op_status(nslcmop_id, stage)
3967 await self.RO.delete("nsd", ro_nsd_id)
3968 self.logger.debug(
3969 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
3970 )
3971 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3972 except Exception as e:
3973 if (
3974 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3975 ): # not found
3976 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3977 self.logger.debug(
3978 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
3979 )
3980 elif (
3981 isinstance(e, ROclient.ROClientException) and e.http_code == 409
3982 ): # conflict
3983 failed_detail.append(
3984 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
3985 )
3986 self.logger.debug(logging_text + failed_detail[-1])
3987 else:
3988 failed_detail.append(
3989 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
3990 )
3991 self.logger.error(logging_text + failed_detail[-1])
3992
3993 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
3994 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
3995 if not vnf_deployed or not vnf_deployed["id"]:
3996 continue
3997 try:
3998 ro_vnfd_id = vnf_deployed["id"]
3999 stage[
4000 2
4001 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4002 vnf_deployed["member-vnf-index"], ro_vnfd_id
4003 )
4004 db_nsr_update["detailed-status"] = " ".join(stage)
4005 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4006 self._write_op_status(nslcmop_id, stage)
4007 await self.RO.delete("vnfd", ro_vnfd_id)
4008 self.logger.debug(
4009 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
4010 )
4011 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
4012 except Exception as e:
4013 if (
4014 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4015 ): # not found
4016 db_nsr_update[
4017 "_admin.deployed.RO.vnfd.{}.id".format(index)
4018 ] = None
4019 self.logger.debug(
4020 logging_text
4021 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
4022 )
4023 elif (
4024 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4025 ): # conflict
4026 failed_detail.append(
4027 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
4028 )
4029 self.logger.debug(logging_text + failed_detail[-1])
4030 else:
4031 failed_detail.append(
4032 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
4033 )
4034 self.logger.error(logging_text + failed_detail[-1])
4035
4036 if failed_detail:
4037 stage[2] = "Error deleting from VIM"
4038 else:
4039 stage[2] = "Deleted from VIM"
4040 db_nsr_update["detailed-status"] = " ".join(stage)
4041 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4042 self._write_op_status(nslcmop_id, stage)
4043
4044 if failed_detail:
4045 raise LcmException("; ".join(failed_detail))
4046
4047 async def terminate(self, nsr_id, nslcmop_id):
4048 # Try to lock HA task here
4049 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4050 if not task_is_locked_by_me:
4051 return
4052
4053 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
4054 self.logger.debug(logging_text + "Enter")
4055 timeout_ns_terminate = self.timeout_ns_terminate
4056 db_nsr = None
4057 db_nslcmop = None
4058 operation_params = None
4059 exc = None
4060 error_list = []  # accumulates all error messages
4061 db_nslcmop_update = {}
4062 autoremove = False  # autoremove after termination
4063 tasks_dict_info = {}
4064 db_nsr_update = {}
4065 stage = [
4066 "Stage 1/3: Preparing task.",
4067 "Waiting for previous operations to terminate.",
4068 "",
4069 ]
4070 # ^ contains [stage, step, VIM-status]
4071 try:
4072 # wait for any previous tasks in process
4073 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4074
4075 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
4076 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4077 operation_params = db_nslcmop.get("operationParams") or {}
4078 if operation_params.get("timeout_ns_terminate"):
4079 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
4080 stage[1] = "Getting nsr={} from db.".format(nsr_id)
4081 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4082
4083 db_nsr_update["operational-status"] = "terminating"
4084 db_nsr_update["config-status"] = "terminating"
4085 self._write_ns_status(
4086 nsr_id=nsr_id,
4087 ns_state="TERMINATING",
4088 current_operation="TERMINATING",
4089 current_operation_id=nslcmop_id,
4090 other_update=db_nsr_update,
4091 )
4092 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4093 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4094 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4095 return
4096
4097 stage[1] = "Getting vnf descriptors from db."
4098 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4099 db_vnfrs_dict = {
4100 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4101 }
4102 db_vnfds_from_id = {}
4103 db_vnfds_from_member_index = {}
4104 # Loop over VNFRs
4105 for vnfr in db_vnfrs_list:
4106 vnfd_id = vnfr["vnfd-id"]
4107 if vnfd_id not in db_vnfds_from_id:
4108 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4109 db_vnfds_from_id[vnfd_id] = vnfd
4110 db_vnfds_from_member_index[
4111 vnfr["member-vnf-index-ref"]
4112 ] = db_vnfds_from_id[vnfd_id]
4113
4114 # Destroy individual execution environments when there are terminating primitives.
4115 # The rest of the EEs will be deleted at once
4116 # TODO - check before calling _destroy_N2VC
4117 # if not operation_params.get("skip_terminate_primitives"):#
4118 # or not vca.get("needed_terminate"):
4119 stage[0] = "Stage 2/3 execute terminating primitives."
4120 self.logger.debug(logging_text + stage[0])
4121 stage[1] = "Looking execution environment that needs terminate."
4122 self.logger.debug(logging_text + stage[1])
4123
4124 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4125 config_descriptor = None
4126 vca_member_vnf_index = vca.get("member-vnf-index")
4127 vca_id = self.get_vca_id(
4128 db_vnfrs_dict.get(vca_member_vnf_index)
4129 if vca_member_vnf_index
4130 else None,
4131 db_nsr,
4132 )
4133 if not vca or not vca.get("ee_id"):
4134 continue
4135 if not vca.get("member-vnf-index"):
4136 # ns
4137 config_descriptor = db_nsr.get("ns-configuration")
4138 elif vca.get("vdu_id"):
4139 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4140 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4141 elif vca.get("kdu_name"):
4142 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4143 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4144 else:
4145 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4146 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4147 vca_type = vca.get("type")
4148 exec_terminate_primitives = not operation_params.get(
4149 "skip_terminate_primitives"
4150 ) and vca.get("needed_terminate")
4151 # For helm we must destroy the EE. Also for native_charm, as the juju model cannot be deleted while there are
4152 # pending native charms
4153 destroy_ee = vca_type in ("helm", "helm-v3", "native_charm")
4156 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4157 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4158 task = asyncio.ensure_future(
4159 self.destroy_N2VC(
4160 logging_text,
4161 db_nslcmop,
4162 vca,
4163 config_descriptor,
4164 vca_index,
4165 destroy_ee,
4166 exec_terminate_primitives,
4167 vca_id=vca_id,
4168 )
4169 )
4170 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4171
4172 # wait for pending tasks of terminate primitives
4173 if tasks_dict_info:
4174 self.logger.debug(
4175 logging_text
4176 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4177 )
4178 error_list = await self._wait_for_tasks(
4179 logging_text,
4180 tasks_dict_info,
4181 min(self.timeout_charm_delete, timeout_ns_terminate),
4182 stage,
4183 nslcmop_id,
4184 )
4185 tasks_dict_info.clear()
4186 if error_list:
4187 return # raise LcmException("; ".join(error_list))
4188
4189 # remove all execution environments at once
4190 stage[0] = "Stage 3/3 delete all."
4191
4192 if nsr_deployed.get("VCA"):
4193 stage[1] = "Deleting all execution environments."
4194 self.logger.debug(logging_text + stage[1])
4195 vca_id = self.get_vca_id({}, db_nsr)
4196 task_delete_ee = asyncio.ensure_future(
4197 asyncio.wait_for(
4198 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4199 timeout=self.timeout_charm_delete,
4200 )
4201 )
4202 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4203 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4204
4205 # Delete from k8scluster
4206 stage[1] = "Deleting KDUs."
4207 self.logger.debug(logging_text + stage[1])
4208 # print(nsr_deployed)
4209 for kdu in get_iterable(nsr_deployed, "K8s"):
4210 if not kdu or not kdu.get("kdu-instance"):
4211 continue
4212 kdu_instance = kdu.get("kdu-instance")
4213 if kdu.get("k8scluster-type") in self.k8scluster_map:
4214 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4215 vca_id = self.get_vca_id({}, db_nsr)
4216 task_delete_kdu_instance = asyncio.ensure_future(
4217 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4218 cluster_uuid=kdu.get("k8scluster-uuid"),
4219 kdu_instance=kdu_instance,
4220 vca_id=vca_id,
4221 )
4222 )
4223 else:
4224 self.logger.error(
4225 logging_text
4226 + "Unknown k8s deployment type {}".format(
4227 kdu.get("k8scluster-type")
4228 )
4229 )
4230 continue
4231 tasks_dict_info[
4232 task_delete_kdu_instance
4233 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4234
4235 # remove from RO
4236 stage[1] = "Deleting ns from VIM."
4237 if self.ng_ro:
4238 task_delete_ro = asyncio.ensure_future(
4239 self._terminate_ng_ro(
4240 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4241 )
4242 )
4243 else:
4244 task_delete_ro = asyncio.ensure_future(
4245 self._terminate_RO(
4246 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4247 )
4248 )
4249 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4250
4251 # the rest of the work will be done in the finally block
4252
4253 except (
4254 ROclient.ROClientException,
4255 DbException,
4256 LcmException,
4257 N2VCException,
4258 ) as e:
4259 self.logger.error(logging_text + "Exit Exception {}".format(e))
4260 exc = e
4261 except asyncio.CancelledError:
4262 self.logger.error(
4263 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4264 )
4265 exc = "Operation was cancelled"
4266 except Exception as e:
4267 exc = traceback.format_exc()
4268 self.logger.critical(
4269 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4270 exc_info=True,
4271 )
4272 finally:
4273 if exc:
4274 error_list.append(str(exc))
4275 try:
4276 # wait for pending tasks
4277 if tasks_dict_info:
4278 stage[1] = "Waiting for terminate pending tasks."
4279 self.logger.debug(logging_text + stage[1])
4280 error_list += await self._wait_for_tasks(
4281 logging_text,
4282 tasks_dict_info,
4283 timeout_ns_terminate,
4284 stage,
4285 nslcmop_id,
4286 )
4287 stage[1] = stage[2] = ""
4288 except asyncio.CancelledError:
4289 error_list.append("Cancelled")
4290 # TODO cancel all tasks
4291 except Exception as exc:
4292 error_list.append(str(exc))
4293 # update status at database
4294 if error_list:
4295 error_detail = "; ".join(error_list)
4296 # self.logger.error(logging_text + error_detail)
4297 error_description_nslcmop = "{} Detail: {}".format(
4298 stage[0], error_detail
4299 )
4300 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4301 nslcmop_id, stage[0]
4302 )
4303
4304 db_nsr_update["operational-status"] = "failed"
4305 db_nsr_update["detailed-status"] = (
4306 error_description_nsr + " Detail: " + error_detail
4307 )
4308 db_nslcmop_update["detailed-status"] = error_detail
4309 nslcmop_operation_state = "FAILED"
4310 ns_state = "BROKEN"
4311 else:
4312 error_detail = None
4313 error_description_nsr = error_description_nslcmop = None
4314 ns_state = "NOT_INSTANTIATED"
4315 db_nsr_update["operational-status"] = "terminated"
4316 db_nsr_update["detailed-status"] = "Done"
4317 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4318 db_nslcmop_update["detailed-status"] = "Done"
4319 nslcmop_operation_state = "COMPLETED"
4320
4321 if db_nsr:
4322 self._write_ns_status(
4323 nsr_id=nsr_id,
4324 ns_state=ns_state,
4325 current_operation="IDLE",
4326 current_operation_id=None,
4327 error_description=error_description_nsr,
4328 error_detail=error_detail,
4329 other_update=db_nsr_update,
4330 )
4331 self._write_op_status(
4332 op_id=nslcmop_id,
4333 stage="",
4334 error_message=error_description_nslcmop,
4335 operation_state=nslcmop_operation_state,
4336 other_update=db_nslcmop_update,
4337 )
4338 if ns_state == "NOT_INSTANTIATED":
4339 try:
4340 self.db.set_list(
4341 "vnfrs",
4342 {"nsr-id-ref": nsr_id},
4343 {"_admin.nsState": "NOT_INSTANTIATED"},
4344 )
4345 except DbException as e:
4346 self.logger.warn(
4347 logging_text
4348 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4349 nsr_id, e
4350 )
4351 )
4352 if operation_params:
4353 autoremove = operation_params.get("autoremove", False)
4354 if nslcmop_operation_state:
4355 try:
4356 await self.msg.aiowrite(
4357 "ns",
4358 "terminated",
4359 {
4360 "nsr_id": nsr_id,
4361 "nslcmop_id": nslcmop_id,
4362 "operationState": nslcmop_operation_state,
4363 "autoremove": autoremove,
4364 },
4365 loop=self.loop,
4366 )
4367 except Exception as e:
4368 self.logger.error(
4369 logging_text + "kafka_write notification Exception {}".format(e)
4370 )
4371
4372 self.logger.debug(logging_text + "Exit")
4373 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4374
4375 async def _wait_for_tasks(
4376 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4377 ):
4378 time_start = time()
4379 error_detail_list = []
4380 error_list = []
4381 pending_tasks = list(created_tasks_info.keys())
4382 num_tasks = len(pending_tasks)
4383 num_done = 0
4384 stage[1] = "{}/{}.".format(num_done, num_tasks)
4385 self._write_op_status(nslcmop_id, stage)
4386 while pending_tasks:
4387 new_error = None
4388 _timeout = timeout + time_start - time()
4389 done, pending_tasks = await asyncio.wait(
4390 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4391 )
4392 num_done += len(done)
4393 if not done: # Timeout
4394 for task in pending_tasks:
4395 new_error = created_tasks_info[task] + ": Timeout"
4396 error_detail_list.append(new_error)
4397 error_list.append(new_error)
4398 break
4399 for task in done:
4400 if task.cancelled():
4401 exc = "Cancelled"
4402 else:
4403 exc = task.exception()
4404 if exc:
4405 if isinstance(exc, asyncio.TimeoutError):
4406 exc = "Timeout"
4407 new_error = created_tasks_info[task] + ": {}".format(exc)
4408 error_list.append(created_tasks_info[task])
4409 error_detail_list.append(new_error)
4410 if isinstance(
4411 exc,
4412 (
4413 str,
4414 DbException,
4415 N2VCException,
4416 ROclient.ROClientException,
4417 LcmException,
4418 K8sException,
4419 NgRoException,
4420 ),
4421 ):
4422 self.logger.error(logging_text + new_error)
4423 else:
4424 exc_traceback = "".join(
4425 traceback.format_exception(None, exc, exc.__traceback__)
4426 )
4427 self.logger.error(
4428 logging_text
4429 + created_tasks_info[task]
4430 + " "
4431 + exc_traceback
4432 )
4433 else:
4434 self.logger.debug(
4435 logging_text + created_tasks_info[task] + ": Done"
4436 )
4437 stage[1] = "{}/{}.".format(num_done, num_tasks)
4438 if new_error:
4439 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4440 if nsr_id: # update also nsr
4441 self.update_db_2(
4442 "nsrs",
4443 nsr_id,
4444 {
4445 "errorDescription": "Error at: " + ", ".join(error_list),
4446 "errorDetail": ". ".join(error_detail_list),
4447 },
4448 )
4449 self._write_op_status(nslcmop_id, stage)
4450 return error_detail_list
4451
4452 @staticmethod
4453 def _map_primitive_params(primitive_desc, params, instantiation_params):
4454 """
4455 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a
4456 parameter, the default-value is used. If it is between < >, it looks for a value in instantiation_params.
4457 :param primitive_desc: portion of VNFD/NSD that describes primitive
4458 :param params: Params provided by user
4459 :param instantiation_params: Instantiation params provided by user
4460 :return: a dictionary with the calculated params
4461 """
4462 calculated_params = {}
4463 for parameter in primitive_desc.get("parameter", ()):
4464 param_name = parameter["name"]
4465 if param_name in params:
4466 calculated_params[param_name] = params[param_name]
4467 elif "default-value" in parameter or "value" in parameter:
4468 if "value" in parameter:
4469 calculated_params[param_name] = parameter["value"]
4470 else:
4471 calculated_params[param_name] = parameter["default-value"]
4472 if (
4473 isinstance(calculated_params[param_name], str)
4474 and calculated_params[param_name].startswith("<")
4475 and calculated_params[param_name].endswith(">")
4476 ):
4477 if calculated_params[param_name][1:-1] in instantiation_params:
4478 calculated_params[param_name] = instantiation_params[
4479 calculated_params[param_name][1:-1]
4480 ]
4481 else:
4482 raise LcmException(
4483 "Parameter {} needed to execute primitive {} not provided".format(
4484 calculated_params[param_name], primitive_desc["name"]
4485 )
4486 )
4487 else:
4488 raise LcmException(
4489 "Parameter {} needed to execute primitive {} not provided".format(
4490 param_name, primitive_desc["name"]
4491 )
4492 )
4493
4494 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4495 calculated_params[param_name] = yaml.safe_dump(
4496 calculated_params[param_name], default_flow_style=True, width=256
4497 )
4498 elif isinstance(calculated_params[param_name], str) and calculated_params[
4499 param_name
4500 ].startswith("!!yaml "):
4501 calculated_params[param_name] = calculated_params[param_name][7:]
4502 if parameter.get("data-type") == "INTEGER":
4503 try:
4504 calculated_params[param_name] = int(calculated_params[param_name])
4505 except ValueError: # error converting string to int
4506 raise LcmException(
4507 "Parameter {} of primitive {} must be integer".format(
4508 param_name, primitive_desc["name"]
4509 )
4510 )
4511 elif parameter.get("data-type") == "BOOLEAN":
4512 calculated_params[param_name] = not (
4513 (str(calculated_params[param_name])).lower() == "false"
4514 )
4515
4516 # always add ns_config_info if the primitive name is config
4517 if primitive_desc["name"] == "config":
4518 if "ns_config_info" in instantiation_params:
4519 calculated_params["ns_config_info"] = instantiation_params[
4520 "ns_config_info"
4521 ]
4522 return calculated_params
4523
4524 def _look_for_deployed_vca(
4525 self,
4526 deployed_vca,
4527 member_vnf_index,
4528 vdu_id,
4529 vdu_count_index,
4530 kdu_name=None,
4531 ee_descriptor_id=None,
4532 ):
4533 # find the vca_deployed record for this action. Raise LcmException if not found or there is no ee_id.
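# On success, returns a tuple (ee_id, vca_type) for the matching record.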
4534 for vca in deployed_vca:
4535 if not vca:
4536 continue
4537 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4538 continue
4539 if (
4540 vdu_count_index is not None
4541 and vdu_count_index != vca["vdu_count_index"]
4542 ):
4543 continue
4544 if kdu_name and kdu_name != vca["kdu_name"]:
4545 continue
4546 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4547 continue
4548 break
4549 else:
4550 # vca_deployed not found
4551 raise LcmException(
4552 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4553 " is not deployed".format(
4554 member_vnf_index,
4555 vdu_id,
4556 vdu_count_index,
4557 kdu_name,
4558 ee_descriptor_id,
4559 )
4560 )
4561 # get ee_id
4562 ee_id = vca.get("ee_id")
4563 vca_type = vca.get(
4564 "type", "lxc_proxy_charm"
4565 ) # default value for backward compatibility - proxy charm
4566 if not ee_id:
4567 raise LcmException(
4568 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4569 "execution environment".format(
4570 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4571 )
4572 )
4573 return ee_id, vca_type
4574
4575 async def _ns_execute_primitive(
4576 self,
4577 ee_id,
4578 primitive,
4579 primitive_params,
4580 retries=0,
4581 retries_interval=30,
4582 timeout=None,
4583 vca_type=None,
4584 db_dict=None,
4585 vca_id: str = None,
4586 ) -> (str, str):
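# Returns a (state, detail) tuple, e.g. ("COMPLETED", <output>) or ("FAILED", <error text>)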
4587 try:
4588 if primitive == "config":
4589 primitive_params = {"params": primitive_params}
4590
4591 vca_type = vca_type or "lxc_proxy_charm"
4592
4593 while retries >= 0:
4594 try:
4595 output = await asyncio.wait_for(
4596 self.vca_map[vca_type].exec_primitive(
4597 ee_id=ee_id,
4598 primitive_name=primitive,
4599 params_dict=primitive_params,
4600 progress_timeout=self.timeout_progress_primitive,
4601 total_timeout=self.timeout_primitive,
4602 db_dict=db_dict,
4603 vca_id=vca_id,
4604 vca_type=vca_type,
4605 ),
4606 timeout=timeout or self.timeout_primitive,
4607 )
4608 # execution was OK
4609 break
4610 except asyncio.CancelledError:
4611 raise
4612 except Exception as e: # asyncio.TimeoutError
4613 if isinstance(e, asyncio.TimeoutError):
4614 e = "Timeout"
4615 retries -= 1
4616 if retries >= 0:
4617 self.logger.debug(
4618 "Error executing action {} on {} -> {}".format(
4619 primitive, ee_id, e
4620 )
4621 )
4622 # wait and retry
4623 await asyncio.sleep(retries_interval, loop=self.loop)
4624 else:
4625 return "FAILED", str(e)
4626
4627 return "COMPLETED", output
4628
4629 except (LcmException, asyncio.CancelledError):
4630 raise
4631 except Exception as e:
4632 return "FAIL", "Error executing action {}: {}".format(primitive, e)
4633
4634 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4635 """
4636 Update the vca_status with the latest juju information in the nsrs record
4637 :param: nsr_id: Id of the nsr
4638 :param: nslcmop_id: Id of the nslcmop
4639 :return: None
4640 """
4641
4642 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4643 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4644 vca_id = self.get_vca_id({}, db_nsr)
4645 if db_nsr["_admin"]["deployed"]["K8s"]:
4646 for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4647 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
4648 await self._on_update_k8s_db(
4649 cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
4650 )
4651 else:
4652 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4653 table, filter = "nsrs", {"_id": nsr_id}
4654 path = "_admin.deployed.VCA.{}.".format(vca_index)
4655 await self._on_update_n2vc_db(table, filter, path, {})
4656
4657 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4658 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4659
4660 async def action(self, nsr_id, nslcmop_id):
4661 # Try to lock HA task here
4662 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4663 if not task_is_locked_by_me:
4664 return
4665
4666 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4667 self.logger.debug(logging_text + "Enter")
4668 # get all needed from database
4669 db_nsr = None
4670 db_nslcmop = None
4671 db_nsr_update = {}
4672 db_nslcmop_update = {}
4673 nslcmop_operation_state = None
4674 error_description_nslcmop = None
4675 exc = None
4676 try:
4677 # wait for any previous tasks in process
4678 step = "Waiting for previous operations to terminate"
4679 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4680
4681 self._write_ns_status(
4682 nsr_id=nsr_id,
4683 ns_state=None,
4684 current_operation="RUNNING ACTION",
4685 current_operation_id=nslcmop_id,
4686 )
4687
4688 step = "Getting information from database"
4689 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4690 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4691 if db_nslcmop["operationParams"].get("primitive_params"):
4692 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4693 db_nslcmop["operationParams"]["primitive_params"]
4694 )
4695
4696 nsr_deployed = db_nsr["_admin"].get("deployed")
4697 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4698 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4699 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4700 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4701 primitive = db_nslcmop["operationParams"]["primitive"]
4702 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4703 timeout_ns_action = db_nslcmop["operationParams"].get(
4704 "timeout_ns_action", self.timeout_primitive
4705 )
4706
4707 if vnf_index:
4708 step = "Getting vnfr from database"
4709 db_vnfr = self.db.get_one(
4710 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4711 )
4712 if db_vnfr.get("kdur"):
4713 kdur_list = []
4714 for kdur in db_vnfr["kdur"]:
4715 if kdur.get("additionalParams"):
4716 kdur["additionalParams"] = json.loads(
4717 kdur["additionalParams"]
4718 )
4719 kdur_list.append(kdur)
4720 db_vnfr["kdur"] = kdur_list
4721 step = "Getting vnfd from database"
4722 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4723 else:
4724 step = "Getting nsd from database"
4725 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4726
4727 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4728 # for backward compatibility
4729 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4730 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4731 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4732 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4733
4734 # look for primitive
4735 config_primitive_desc = descriptor_configuration = None
4736 if vdu_id:
4737 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4738 elif kdu_name:
4739 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4740 elif vnf_index:
4741 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4742 else:
4743 descriptor_configuration = db_nsd.get("ns-configuration")
4744
4745 if descriptor_configuration and descriptor_configuration.get(
4746 "config-primitive"
4747 ):
4748 for config_primitive in descriptor_configuration["config-primitive"]:
4749 if config_primitive["name"] == primitive:
4750 config_primitive_desc = config_primitive
4751 break
4752
4753 if not config_primitive_desc:
4754 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4755 raise LcmException(
4756 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4757 primitive
4758 )
4759 )
4760 primitive_name = primitive
4761 ee_descriptor_id = None
4762 else:
4763 primitive_name = config_primitive_desc.get(
4764 "execution-environment-primitive", primitive
4765 )
4766 ee_descriptor_id = config_primitive_desc.get(
4767 "execution-environment-ref"
4768 )
4769
4770 if vnf_index:
4771 if vdu_id:
4772 vdur = next(
4773 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
4774 )
4775 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
4776 elif kdu_name:
4777 kdur = next(
4778 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
4779 )
4780 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
4781 else:
4782 desc_params = parse_yaml_strings(
4783 db_vnfr.get("additionalParamsForVnf")
4784 )
4785 else:
4786 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
4787 if kdu_name and get_configuration(db_vnfd, kdu_name):
4788 kdu_configuration = get_configuration(db_vnfd, kdu_name)
4789 actions = set()
4790 for primitive in kdu_configuration.get("initial-config-primitive", []):
4791 actions.add(primitive["name"])
4792 for primitive in kdu_configuration.get("config-primitive", []):
4793 actions.add(primitive["name"])
4794 kdu_action = primitive_name in actions
4795
4796 # TODO check if ns is in a proper status
4797 if kdu_name and (
4798 primitive_name in ("upgrade", "rollback", "status") or kdu_action
4799 ):
4800 # kdur and desc_params already set from before
4801 if primitive_params:
4802 desc_params.update(primitive_params)
4803 # TODO Check if we will need something at vnf level
4804 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
4805 if (
4806 kdu_name == kdu["kdu-name"]
4807 and kdu["member-vnf-index"] == vnf_index
4808 ):
4809 break
4810 else:
4811 raise LcmException(
4812 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
4813 )
4814
4815 if kdu.get("k8scluster-type") not in self.k8scluster_map:
4816 msg = "unknown k8scluster-type '{}'".format(
4817 kdu.get("k8scluster-type")
4818 )
4819 raise LcmException(msg)
4820
4821 db_dict = {
4822 "collection": "nsrs",
4823 "filter": {"_id": nsr_id},
4824 "path": "_admin.deployed.K8s.{}".format(index),
4825 }
4826 self.logger.debug(
4827 logging_text
4828 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
4829 )
4830 step = "Executing kdu {}".format(primitive_name)
4831 if primitive_name == "upgrade":
4832 if desc_params.get("kdu_model"):
4833 kdu_model = desc_params.get("kdu_model")
4834 del desc_params["kdu_model"]
4835 else:
4836 kdu_model = kdu.get("kdu-model")
4837 parts = kdu_model.split(sep=":")
4838 if len(parts) == 2:
4839 kdu_model = parts[0]
4840
4841 detailed_status = await asyncio.wait_for(
4842 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
4843 cluster_uuid=kdu.get("k8scluster-uuid"),
4844 kdu_instance=kdu.get("kdu-instance"),
4845 atomic=True,
4846 kdu_model=kdu_model,
4847 params=desc_params,
4848 db_dict=db_dict,
4849 timeout=timeout_ns_action,
4850 ),
4851 timeout=timeout_ns_action + 10,
4852 )
4853 self.logger.debug(
4854 logging_text + " Upgrade of kdu {} done".format(detailed_status)
4855 )
4856 elif primitive_name == "rollback":
4857 detailed_status = await asyncio.wait_for(
4858 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
4859 cluster_uuid=kdu.get("k8scluster-uuid"),
4860 kdu_instance=kdu.get("kdu-instance"),
4861 db_dict=db_dict,
4862 ),
4863 timeout=timeout_ns_action,
4864 )
4865 elif primitive_name == "status":
4866 detailed_status = await asyncio.wait_for(
4867 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
4868 cluster_uuid=kdu.get("k8scluster-uuid"),
4869 kdu_instance=kdu.get("kdu-instance"),
4870 vca_id=vca_id,
4871 ),
4872 timeout=timeout_ns_action,
4873 )
4874 else:
4875 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
4876 kdu["kdu-name"], nsr_id
4877 )
4878 params = self._map_primitive_params(
4879 config_primitive_desc, primitive_params, desc_params
4880 )
4881
4882 detailed_status = await asyncio.wait_for(
4883 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
4884 cluster_uuid=kdu.get("k8scluster-uuid"),
4885 kdu_instance=kdu_instance,
4886 primitive_name=primitive_name,
4887 params=params,
4888 db_dict=db_dict,
4889 timeout=timeout_ns_action,
4890 vca_id=vca_id,
4891 ),
4892 timeout=timeout_ns_action,
4893 )
4894
4895 if detailed_status:
4896 nslcmop_operation_state = "COMPLETED"
4897 else:
4898 detailed_status = ""
4899 nslcmop_operation_state = "FAILED"
4900 else:
4901 ee_id, vca_type = self._look_for_deployed_vca(
4902 nsr_deployed["VCA"],
4903 member_vnf_index=vnf_index,
4904 vdu_id=vdu_id,
4905 vdu_count_index=vdu_count_index,
4906 ee_descriptor_id=ee_descriptor_id,
4907 )
4908 for vca_index, vca_deployed in enumerate(
4909 db_nsr["_admin"]["deployed"]["VCA"]
4910 ):
4911 if vca_deployed.get("member-vnf-index") == vnf_index:
4912 db_dict = {
4913 "collection": "nsrs",
4914 "filter": {"_id": nsr_id},
4915 "path": "_admin.deployed.VCA.{}.".format(vca_index),
4916 }
4917 break
4918 (
4919 nslcmop_operation_state,
4920 detailed_status,
4921 ) = await self._ns_execute_primitive(
4922 ee_id,
4923 primitive=primitive_name,
4924 primitive_params=self._map_primitive_params(
4925 config_primitive_desc, primitive_params, desc_params
4926 ),
4927 timeout=timeout_ns_action,
4928 vca_type=vca_type,
4929 db_dict=db_dict,
4930 vca_id=vca_id,
4931 )
4932
4933 db_nslcmop_update["detailed-status"] = detailed_status
4934 error_description_nslcmop = (
4935 detailed_status if nslcmop_operation_state == "FAILED" else ""
4936 )
4937 self.logger.debug(
4938 logging_text
4939 + " task Done with result {} {}".format(
4940 nslcmop_operation_state, detailed_status
4941 )
4942 )
4943 return # database update is called inside finally
4944
4945 except (DbException, LcmException, N2VCException, K8sException) as e:
4946 self.logger.error(logging_text + "Exit Exception {}".format(e))
4947 exc = e
4948 except asyncio.CancelledError:
4949 self.logger.error(
4950 logging_text + "Cancelled Exception while '{}'".format(step)
4951 )
4952 exc = "Operation was cancelled"
4953 except asyncio.TimeoutError:
4954 self.logger.error(logging_text + "Timeout while '{}'".format(step))
4955 exc = "Timeout"
4956 except Exception as e:
4957 exc = traceback.format_exc()
4958 self.logger.critical(
4959 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
4960 exc_info=True,
4961 )
4962 finally:
4963 if exc:
4964 db_nslcmop_update[
4965 "detailed-status"
4966 ] = (
4967 detailed_status
4968 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4969 nslcmop_operation_state = "FAILED"
4970 if db_nsr:
4971 self._write_ns_status(
4972 nsr_id=nsr_id,
4973 ns_state=db_nsr[
4974 "nsState"
4975 ], # TODO check if degraded. For the moment use previous status
4976 current_operation="IDLE",
4977 current_operation_id=None,
4978 # error_description=error_description_nsr,
4979 # error_detail=error_detail,
4980 other_update=db_nsr_update,
4981 )
4982
4983 self._write_op_status(
4984 op_id=nslcmop_id,
4985 stage="",
4986 error_message=error_description_nslcmop,
4987 operation_state=nslcmop_operation_state,
4988 other_update=db_nslcmop_update,
4989 )
4990
4991 if nslcmop_operation_state:
4992 try:
4993 await self.msg.aiowrite(
4994 "ns",
4995 "actioned",
4996 {
4997 "nsr_id": nsr_id,
4998 "nslcmop_id": nslcmop_id,
4999 "operationState": nslcmop_operation_state,
5000 },
5001 loop=self.loop,
5002 )
5003 except Exception as e:
5004 self.logger.error(
5005 logging_text + "kafka_write notification Exception {}".format(e)
5006 )
5007 self.logger.debug(logging_text + "Exit")
5008 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5009 return nslcmop_operation_state, detailed_status
5010
5011 async def scale(self, nsr_id, nslcmop_id):
5012 # Try to lock HA task here
5013 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5014 if not task_is_locked_by_me:
5015 return
5016
5017 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5018 stage = ["", "", ""]
5019 tasks_dict_info = {}
5020 # ^ stage, step, VIM progress
5021 self.logger.debug(logging_text + "Enter")
5022 # get all needed from database
5023 db_nsr = None
5024 db_nslcmop_update = {}
5025 db_nsr_update = {}
5026 exc = None
5027 # in case of error, indicates which part of the scale operation failed, in order to set the nsr error status
5028 scale_process = None
5029 old_operational_status = ""
5030 old_config_status = ""
5031 nsi_id = None
5032 try:
5033 # wait for any previous tasks in process
5034 step = "Waiting for previous operations to terminate"
5035 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5036 self._write_ns_status(
5037 nsr_id=nsr_id,
5038 ns_state=None,
5039 current_operation="SCALING",
5040 current_operation_id=nslcmop_id,
5041 )
5042
5043 step = "Getting nslcmop from database"
5044 self.logger.debug(
5045 step + " after having waited for previous tasks to be completed"
5046 )
5047 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5048
5049 step = "Getting nsr from database"
5050 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5051 old_operational_status = db_nsr["operational-status"]
5052 old_config_status = db_nsr["config-status"]
5053
5054 step = "Parsing scaling parameters"
5055 db_nsr_update["operational-status"] = "scaling"
5056 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5057 nsr_deployed = db_nsr["_admin"].get("deployed")
5058
5059 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5060 "scaleByStepData"
5061 ]["member-vnf-index"]
5062 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5063 "scaleByStepData"
5064 ]["scaling-group-descriptor"]
5065 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5066 # for backward compatibility
5067 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5068 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5069 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5070 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5071
5072 step = "Getting vnfr from database"
5073 db_vnfr = self.db.get_one(
5074 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5075 )
5076
5077 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5078
5079 step = "Getting vnfd from database"
5080 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5081
5082 base_folder = db_vnfd["_admin"]["storage"]
5083
5084 step = "Getting scaling-group-descriptor"
5085 scaling_descriptor = find_in_list(
5086 get_scaling_aspect(db_vnfd),
5087 lambda scale_desc: scale_desc["name"] == scaling_group,
5088 )
5089 if not scaling_descriptor:
5090 raise LcmException(
5091 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5092 "at vnfd:scaling-group-descriptor".format(scaling_group)
5093 )
5094
5095 step = "Sending scale order to VIM"
5096 # TODO check if ns is in a proper status
5097 nb_scale_op = 0
5098 if not db_nsr["_admin"].get("scaling-group"):
5099 self.update_db_2(
5100 "nsrs",
5101 nsr_id,
5102 {
5103 "_admin.scaling-group": [
5104 {"name": scaling_group, "nb-scale-op": 0}
5105 ]
5106 },
5107 )
5108 admin_scale_index = 0
5109 else:
5110 for admin_scale_index, admin_scale_info in enumerate(
5111 db_nsr["_admin"]["scaling-group"]
5112 ):
5113 if admin_scale_info["name"] == scaling_group:
5114 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5115 break
5116 else:  # not found: set index to one past the last element and add a new entry with the name
5117 admin_scale_index += 1
5118 db_nsr_update[
5119 "_admin.scaling-group.{}.name".format(admin_scale_index)
5120 ] = scaling_group
5121
5122 vca_scaling_info = []
5123 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5124 if scaling_type == "SCALE_OUT":
5125 if "aspect-delta-details" not in scaling_descriptor:
5126 raise LcmException(
5127 "Aspect delta details not fount in scaling descriptor {}".format(
5128 scaling_descriptor["name"]
5129 )
5130 )
5131 # check whether max-instance-count is reached
5132 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5133
5134 scaling_info["scaling_direction"] = "OUT"
5135 scaling_info["vdu-create"] = {}
5136 scaling_info["kdu-create"] = {}
5137 for delta in deltas:
5138 for vdu_delta in delta.get("vdu-delta", {}):
5139 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5140 # vdu_index also provides the number of instances of the targeted vdu
5141 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5142 cloud_init_text = self._get_vdu_cloud_init_content(
5143 vdud, db_vnfd
5144 )
5145 if cloud_init_text:
5146 additional_params = (
5147 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5148 or {}
5149 )
5150 cloud_init_list = []
5151
5152 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5153 max_instance_count = 10
5154 if vdu_profile and "max-number-of-instances" in vdu_profile:
5155 max_instance_count = vdu_profile.get(
5156 "max-number-of-instances", 10
5157 )
5158
5159 default_instance_num = get_number_of_instances(
5160 db_vnfd, vdud["id"]
5161 )
5162 instances_number = vdu_delta.get("number-of-instances", 1)
5163 nb_scale_op += instances_number
5164
5165 new_instance_count = nb_scale_op + default_instance_num
5166 # Check whether the new count exceeds max-instance-count while the current vdu count is below it;
5167 # if so, adjust the number of instances to create
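# Illustrative numbers (assumed): with default_instance_num=1 and max_instance_count=3, a third
# scale-out step of one instance makes new_instance_count=4 > 3 and raises the LcmException below.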
5168 if new_instance_count > max_instance_count > vdu_count:
5169 instances_number = new_instance_count - max_instance_count
5172
5173 if new_instance_count > max_instance_count:
5174 raise LcmException(
5175 "reached the limit of {} (max-instance-count) "
5176 "scaling-out operations for the "
5177 "scaling-group-descriptor '{}'".format(
5178 nb_scale_op, scaling_group
5179 )
5180 )
5181 for x in range(vdu_delta.get("number-of-instances", 1)):
5182 if cloud_init_text:
5183 # TODO Information about its own IP is not available because db_vnfr is not updated.
5184 additional_params["OSM"] = get_osm_params(
5185 db_vnfr, vdu_delta["id"], vdu_index + x
5186 )
5187 cloud_init_list.append(
5188 self._parse_cloud_init(
5189 cloud_init_text,
5190 additional_params,
5191 db_vnfd["id"],
5192 vdud["id"],
5193 )
5194 )
5195 vca_scaling_info.append(
5196 {
5197 "osm_vdu_id": vdu_delta["id"],
5198 "member-vnf-index": vnf_index,
5199 "type": "create",
5200 "vdu_index": vdu_index + x,
5201 }
5202 )
5203 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5204 for kdu_delta in delta.get("kdu-resource-delta", {}):
5205 kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
5206 kdu_name = kdu_profile["kdu-name"]
5207 resource_name = kdu_profile["resource-name"]
5208
5209 # There might be different kdus in the same delta,
5210 # so keep a list for each kdu
5211 if not scaling_info["kdu-create"].get(kdu_name, None):
5212 scaling_info["kdu-create"][kdu_name] = []
5213
5214 kdur = get_kdur(db_vnfr, kdu_name)
5215 if kdur.get("helm-chart"):
5216 k8s_cluster_type = "helm-chart-v3"
5217 self.logger.debug("kdur: {}".format(kdur))
5218 if (
5219 kdur.get("helm-version")
5220 and kdur.get("helm-version") == "v2"
5221 ):
5222 k8s_cluster_type = "helm-chart"
5223 raise NotImplementedError
5224 elif kdur.get("juju-bundle"):
5225 k8s_cluster_type = "juju-bundle"
5226 else:
5227 raise LcmException(
5228 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5229 "juju-bundle. Maybe an old NBI version is running".format(
5230 db_vnfr["member-vnf-index-ref"], kdu_name
5231 )
5232 )
5233
5234 max_instance_count = 10
5235 if kdu_profile and "max-number-of-instances" in kdu_profile:
5236 max_instance_count = kdu_profile.get(
5237 "max-number-of-instances", 10
5238 )
5239
5240 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5241 deployed_kdu, _ = get_deployed_kdu(
5242 nsr_deployed, kdu_name, vnf_index
5243 )
5244 if deployed_kdu is None:
5245 raise LcmException(
5246 "KDU '{}' for vnf '{}' not deployed".format(
5247 kdu_name, vnf_index
5248 )
5249 )
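                        # The current number of replicas is read from the cluster so that the new
                        # scale value is computed relative to what is actually deployed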
5250 kdu_instance = deployed_kdu.get("kdu-instance")
5251 instance_num = await self.k8scluster_map[
5252 k8s_cluster_type
5253 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5254 kdu_replica_count = instance_num + kdu_delta.get(
5255 "number-of-instances", 1
5256 )
5257
                        # If the new replica count exceeds the maximum while the current count
                        # is still below it, cap the replica count at the maximum
5260 if kdu_replica_count > max_instance_count > instance_num:
5261 kdu_replica_count = max_instance_count
5262 if kdu_replica_count > max_instance_count:
5263 raise LcmException(
5264 "reached the limit of {} (max-instance-count) "
5265 "scaling-out operations for the "
5266 "scaling-group-descriptor '{}'".format(
5267 instance_num, scaling_group
5268 )
5269 )
5270
5271 for x in range(kdu_delta.get("number-of-instances", 1)):
5272 vca_scaling_info.append(
5273 {
5274 "osm_kdu_id": kdu_name,
5275 "member-vnf-index": vnf_index,
5276 "type": "create",
5277 "kdu_index": instance_num + x - 1,
5278 }
5279 )
5280 scaling_info["kdu-create"][kdu_name].append(
5281 {
5282 "member-vnf-index": vnf_index,
5283 "type": "create",
5284 "k8s-cluster-type": k8s_cluster_type,
5285 "resource-name": resource_name,
5286 "scale": kdu_replica_count,
5287 }
5288 )
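            # SCALE_IN walks the same deltas but removes instances, using
            # min-number-of-instances as the lower bound instead of the maximum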
5289 elif scaling_type == "SCALE_IN":
5290 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5291
5292 scaling_info["scaling_direction"] = "IN"
5293 scaling_info["vdu-delete"] = {}
5294 scaling_info["kdu-delete"] = {}
5295
5296 for delta in deltas:
5297 for vdu_delta in delta.get("vdu-delta", {}):
5298 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5299 min_instance_count = 0
5300 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5301 if vdu_profile and "min-number-of-instances" in vdu_profile:
5302 min_instance_count = vdu_profile["min-number-of-instances"]
5303
5304 default_instance_num = get_number_of_instances(
5305 db_vnfd, vdu_delta["id"]
5306 )
5307 instance_num = vdu_delta.get("number-of-instances", 1)
5308 nb_scale_op -= instance_num
5309
5310 new_instance_count = nb_scale_op + default_instance_num
5311
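                        # Scaling in below min-number-of-instances is not allowed and aborts the
                        # operation with an exception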
5312 if new_instance_count < min_instance_count < vdu_count:
5313 instances_number = min_instance_count - new_instance_count
5314 else:
5315 instances_number = instance_num
5316
5317 if new_instance_count < min_instance_count:
5318 raise LcmException(
5319 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5320 "scaling-group-descriptor '{}'".format(
5321 nb_scale_op, scaling_group
5322 )
5323 )
5324 for x in range(vdu_delta.get("number-of-instances", 1)):
5325 vca_scaling_info.append(
5326 {
5327 "osm_vdu_id": vdu_delta["id"],
5328 "member-vnf-index": vnf_index,
5329 "type": "delete",
5330 "vdu_index": vdu_index - 1 - x,
5331 }
5332 )
5333 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5334 for kdu_delta in delta.get("kdu-resource-delta", {}):
5335 kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
5336 kdu_name = kdu_profile["kdu-name"]
5337 resource_name = kdu_profile["resource-name"]
5338
5339 if not scaling_info["kdu-delete"].get(kdu_name, None):
5340 scaling_info["kdu-delete"][kdu_name] = []
5341
5342 kdur = get_kdur(db_vnfr, kdu_name)
5343 if kdur.get("helm-chart"):
5344 k8s_cluster_type = "helm-chart-v3"
5345 self.logger.debug("kdur: {}".format(kdur))
                            # scaling of Helm v2 KDUs is not implemented
                            if kdur.get("helm-version") == "v2":
                                k8s_cluster_type = "helm-chart"
                                raise NotImplementedError
5352 elif kdur.get("juju-bundle"):
5353 k8s_cluster_type = "juju-bundle"
5354 else:
5355 raise LcmException(
5356 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5357 "juju-bundle. Maybe an old NBI version is running".format(
5358 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5359 )
5360 )
5361
5362 min_instance_count = 0
5363 if kdu_profile and "min-number-of-instances" in kdu_profile:
5364 min_instance_count = kdu_profile["min-number-of-instances"]
5365
5366 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5367 deployed_kdu, _ = get_deployed_kdu(
5368 nsr_deployed, kdu_name, vnf_index
5369 )
5370 if deployed_kdu is None:
5371 raise LcmException(
5372 "KDU '{}' for vnf '{}' not deployed".format(
5373 kdu_name, vnf_index
5374 )
5375 )
5376 kdu_instance = deployed_kdu.get("kdu-instance")
5377 instance_num = await self.k8scluster_map[
5378 k8s_cluster_type
5379 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5380 kdu_replica_count = instance_num - kdu_delta.get(
5381 "number-of-instances", 1
5382 )
5383
5384 if kdu_replica_count < min_instance_count < instance_num:
5385 kdu_replica_count = min_instance_count
5386 if kdu_replica_count < min_instance_count:
5387 raise LcmException(
5388 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5389 "scaling-group-descriptor '{}'".format(
5390 instance_num, scaling_group
5391 )
5392 )
5393
5394 for x in range(kdu_delta.get("number-of-instances", 1)):
5395 vca_scaling_info.append(
5396 {
5397 "osm_kdu_id": kdu_name,
5398 "member-vnf-index": vnf_index,
5399 "type": "delete",
5400 "kdu_index": instance_num - x - 1,
5401 }
5402 )
5403 scaling_info["kdu-delete"][kdu_name].append(
5404 {
5405 "member-vnf-index": vnf_index,
5406 "type": "delete",
5407 "k8s-cluster-type": k8s_cluster_type,
5408 "resource-name": resource_name,
5409 "scale": kdu_replica_count,
5410 }
5411 )
5412
            # update scaling_info with the VDUs to delete, recording the IP and MAC
            # addresses of their interfaces
5414 vdu_delete = copy(scaling_info.get("vdu-delete"))
5415 if scaling_info["scaling_direction"] == "IN":
5416 for vdur in reversed(db_vnfr["vdur"]):
5417 if vdu_delete.get(vdur["vdu-id-ref"]):
5418 vdu_delete[vdur["vdu-id-ref"]] -= 1
5419 scaling_info["vdu"].append(
5420 {
5421 "name": vdur.get("name") or vdur.get("vdu-name"),
5422 "vdu_id": vdur["vdu-id-ref"],
5423 "interface": [],
5424 }
5425 )
5426 for interface in vdur["interfaces"]:
5427 scaling_info["vdu"][-1]["interface"].append(
5428 {
5429 "name": interface["name"],
5430 "ip_address": interface["ip-address"],
5431 "mac_address": interface.get("mac-address"),
5432 }
5433 )
5434 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5435
5436 # PRE-SCALE BEGIN
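            # Pre-scale config actions run the VNF config primitives declared in the scaling
            # descriptor before any resources are added or removed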
5437 step = "Executing pre-scale vnf-config-primitive"
5438 if scaling_descriptor.get("scaling-config-action"):
5439 for scaling_config_action in scaling_descriptor[
5440 "scaling-config-action"
5441 ]:
5442 if (
5443 scaling_config_action.get("trigger") == "pre-scale-in"
5444 and scaling_type == "SCALE_IN"
5445 ) or (
5446 scaling_config_action.get("trigger") == "pre-scale-out"
5447 and scaling_type == "SCALE_OUT"
5448 ):
5449 vnf_config_primitive = scaling_config_action[
5450 "vnf-config-primitive-name-ref"
5451 ]
5452 step = db_nslcmop_update[
5453 "detailed-status"
5454 ] = "executing pre-scale scaling-config-action '{}'".format(
5455 vnf_config_primitive
5456 )
5457
5458 # look for primitive
5459 for config_primitive in (
5460 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5461 ).get("config-primitive", ()):
5462 if config_primitive["name"] == vnf_config_primitive:
5463 break
5464 else:
5465 raise LcmException(
5466 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5467 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5468 "primitive".format(scaling_group, vnf_config_primitive)
5469 )
5470
5471 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5472 if db_vnfr.get("additionalParamsForVnf"):
5473 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5474
5475 scale_process = "VCA"
5476 db_nsr_update["config-status"] = "configuring pre-scaling"
5477 primitive_params = self._map_primitive_params(
5478 config_primitive, {}, vnfr_params
5479 )
5480
5481 # Pre-scale retry check: Check if this sub-operation has been executed before
5482 op_index = self._check_or_add_scale_suboperation(
5483 db_nslcmop,
5484 vnf_index,
5485 vnf_config_primitive,
5486 primitive_params,
5487 "PRE-SCALE",
5488 )
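                        # The sub-operation record makes the scale operation idempotent on retry:
                        # completed primitives are skipped and previously registered ones are
                        # re-executed with their stored parameters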
5489 if op_index == self.SUBOPERATION_STATUS_SKIP:
5490 # Skip sub-operation
5491 result = "COMPLETED"
5492 result_detail = "Done"
5493 self.logger.debug(
5494 logging_text
5495 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5496 vnf_config_primitive, result, result_detail
5497 )
5498 )
5499 else:
5500 if op_index == self.SUBOPERATION_STATUS_NEW:
5501 # New sub-operation: Get index of this sub-operation
5502 op_index = (
5503 len(db_nslcmop.get("_admin", {}).get("operations"))
5504 - 1
5505 )
5506 self.logger.debug(
5507 logging_text
5508 + "vnf_config_primitive={} New sub-operation".format(
5509 vnf_config_primitive
5510 )
5511 )
5512 else:
5513 # retry: Get registered params for this existing sub-operation
5514 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5515 op_index
5516 ]
5517 vnf_index = op.get("member_vnf_index")
5518 vnf_config_primitive = op.get("primitive")
5519 primitive_params = op.get("primitive_params")
5520 self.logger.debug(
5521 logging_text
5522 + "vnf_config_primitive={} Sub-operation retry".format(
5523 vnf_config_primitive
5524 )
5525 )
                            # Execute the primitive, either with new (first-time) or registered (retry) args
5527 ee_descriptor_id = config_primitive.get(
5528 "execution-environment-ref"
5529 )
5530 primitive_name = config_primitive.get(
5531 "execution-environment-primitive", vnf_config_primitive
5532 )
5533 ee_id, vca_type = self._look_for_deployed_vca(
5534 nsr_deployed["VCA"],
5535 member_vnf_index=vnf_index,
5536 vdu_id=None,
5537 vdu_count_index=None,
5538 ee_descriptor_id=ee_descriptor_id,
5539 )
5540 result, result_detail = await self._ns_execute_primitive(
5541 ee_id,
5542 primitive_name,
5543 primitive_params,
5544 vca_type=vca_type,
5545 vca_id=vca_id,
5546 )
5547 self.logger.debug(
5548 logging_text
5549 + "vnf_config_primitive={} Done with result {} {}".format(
5550 vnf_config_primitive, result, result_detail
5551 )
5552 )
5553 # Update operationState = COMPLETED | FAILED
5554 self._update_suboperation_status(
5555 db_nslcmop, op_index, result, result_detail
5556 )
5557
5558 if result == "FAILED":
5559 raise LcmException(result_detail)
5560 db_nsr_update["config-status"] = old_config_status
5561 scale_process = None
5562 # PRE-SCALE END
5563
5564 db_nsr_update[
5565 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5566 ] = nb_scale_op
5567 db_nsr_update[
5568 "_admin.scaling-group.{}.time".format(admin_scale_index)
5569 ] = time()
5570
5571 # SCALE-IN VCA - BEGIN
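            # For scale-in, the execution environments of the removed instances are destroyed
            # first, running their terminate primitives unless skipped, before the RO deletes
            # the corresponding VDUs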
5572 if vca_scaling_info:
5573 step = db_nslcmop_update[
5574 "detailed-status"
5575 ] = "Deleting the execution environments"
5576 scale_process = "VCA"
5577 for vca_info in vca_scaling_info:
5578 if vca_info["type"] == "delete":
5579 member_vnf_index = str(vca_info["member-vnf-index"])
5580 self.logger.debug(
5581 logging_text + "vdu info: {}".format(vca_info)
5582 )
5583 if vca_info.get("osm_vdu_id"):
5584 vdu_id = vca_info["osm_vdu_id"]
5585 vdu_index = int(vca_info["vdu_index"])
5586 stage[
5587 1
5588 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5589 member_vnf_index, vdu_id, vdu_index
5590 )
5591 else:
5592 vdu_index = 0
5593 kdu_id = vca_info["osm_kdu_id"]
5594 stage[
5595 1
5596 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5597 member_vnf_index, kdu_id, vdu_index
5598 )
5599 stage[2] = step = "Scaling in VCA"
5600 self._write_op_status(op_id=nslcmop_id, stage=stage)
5601 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5602 config_update = db_nsr["configurationStatus"]
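                        # Find the VCA record that matches this member-vnf-index and vdu index,
                        # run its terminate primitives if needed, destroy its execution
                        # environment and drop the record from the NS document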
5603 for vca_index, vca in enumerate(vca_update):
5604 if (
5605 (vca or vca.get("ee_id"))
5606 and vca["member-vnf-index"] == member_vnf_index
5607 and vca["vdu_count_index"] == vdu_index
5608 ):
5609 if vca.get("vdu_id"):
5610 config_descriptor = get_configuration(
5611 db_vnfd, vca.get("vdu_id")
5612 )
5613 elif vca.get("kdu_name"):
5614 config_descriptor = get_configuration(
5615 db_vnfd, vca.get("kdu_name")
5616 )
5617 else:
5618 config_descriptor = get_configuration(
5619 db_vnfd, db_vnfd["id"]
5620 )
5621 operation_params = (
5622 db_nslcmop.get("operationParams") or {}
5623 )
5624 exec_terminate_primitives = not operation_params.get(
5625 "skip_terminate_primitives"
5626 ) and vca.get("needed_terminate")
5627 task = asyncio.ensure_future(
5628 asyncio.wait_for(
5629 self.destroy_N2VC(
5630 logging_text,
5631 db_nslcmop,
5632 vca,
5633 config_descriptor,
5634 vca_index,
5635 destroy_ee=True,
5636 exec_primitives=exec_terminate_primitives,
5637 scaling_in=True,
5638 vca_id=vca_id,
5639 ),
5640 timeout=self.timeout_charm_delete,
5641 )
5642 )
5643 tasks_dict_info[task] = "Terminating VCA {}".format(
5644 vca.get("ee_id")
5645 )
5646 del vca_update[vca_index]
5647 del config_update[vca_index]
5648 # wait for pending tasks of terminate primitives
5649 if tasks_dict_info:
5650 self.logger.debug(
5651 logging_text
5652 + "Waiting for tasks {}".format(
5653 list(tasks_dict_info.keys())
5654 )
5655 )
5656 error_list = await self._wait_for_tasks(
5657 logging_text,
5658 tasks_dict_info,
5659 min(
5660 self.timeout_charm_delete, self.timeout_ns_terminate
5661 ),
5662 stage,
5663 nslcmop_id,
5664 )
5665 tasks_dict_info.clear()
5666 if error_list:
5667 raise LcmException("; ".join(error_list))
5668
5669 db_vca_and_config_update = {
5670 "_admin.deployed.VCA": vca_update,
5671 "configurationStatus": config_update,
5672 }
5673 self.update_db_2(
5674 "nsrs", db_nsr["_id"], db_vca_and_config_update
5675 )
5676 scale_process = None
5677 # SCALE-IN VCA - END
5678
5679 # SCALE RO - BEGIN
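            # The creation/deletion of the VDU instances themselves is delegated to NG-RO
            # (when configured) using the collected scaling_info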
5680 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5681 scale_process = "RO"
5682 if self.ro_config.get("ng"):
5683 await self._scale_ng_ro(
5684 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5685 )
5686 scaling_info.pop("vdu-create", None)
5687 scaling_info.pop("vdu-delete", None)
5688
5689 scale_process = None
5690 # SCALE RO - END
5691
5692 # SCALE KDU - BEGIN
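            # KDU scaling adjusts the replica count of the K8s resources through the
            # corresponding K8s connector (see _scale_kdu below)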
5693 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5694 scale_process = "KDU"
5695 await self._scale_kdu(
5696 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5697 )
5698 scaling_info.pop("kdu-create", None)
5699 scaling_info.pop("kdu-delete", None)
5700
5701 scale_process = None
5702 # SCALE KDU - END
5703
5704 if db_nsr_update:
5705 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5706
5707 # SCALE-UP VCA - BEGIN
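            # For scale-out, new execution environments are deployed for the added VDU and KDU
            # instances via _deploy_n2vc, passing per-instance OSM parameters and any
            # additionalParams from the VNF, VDU or KDU records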
5708 if vca_scaling_info:
5709 step = db_nslcmop_update[
5710 "detailed-status"
5711 ] = "Creating new execution environments"
5712 scale_process = "VCA"
5713 for vca_info in vca_scaling_info:
5714 if vca_info["type"] == "create":
5715 member_vnf_index = str(vca_info["member-vnf-index"])
5716 self.logger.debug(
5717 logging_text + "vdu info: {}".format(vca_info)
5718 )
5719 vnfd_id = db_vnfr["vnfd-ref"]
5720 if vca_info.get("osm_vdu_id"):
5721 vdu_index = int(vca_info["vdu_index"])
5722 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5723 if db_vnfr.get("additionalParamsForVnf"):
5724 deploy_params.update(
5725 parse_yaml_strings(
5726 db_vnfr["additionalParamsForVnf"].copy()
5727 )
5728 )
5729 descriptor_config = get_configuration(
5730 db_vnfd, db_vnfd["id"]
5731 )
5732 if descriptor_config:
5733 vdu_id = None
5734 vdu_name = None
5735 kdu_name = None
5736 self._deploy_n2vc(
5737 logging_text=logging_text
5738 + "member_vnf_index={} ".format(member_vnf_index),
5739 db_nsr=db_nsr,
5740 db_vnfr=db_vnfr,
5741 nslcmop_id=nslcmop_id,
5742 nsr_id=nsr_id,
5743 nsi_id=nsi_id,
5744 vnfd_id=vnfd_id,
5745 vdu_id=vdu_id,
5746 kdu_name=kdu_name,
5747 member_vnf_index=member_vnf_index,
5748 vdu_index=vdu_index,
5749 vdu_name=vdu_name,
5750 deploy_params=deploy_params,
5751 descriptor_config=descriptor_config,
5752 base_folder=base_folder,
5753 task_instantiation_info=tasks_dict_info,
5754 stage=stage,
5755 )
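                        # Deploy the VDU-level charm for the new instance, using the VDU's own
                        # additionalParams when available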
5756 vdu_id = vca_info["osm_vdu_id"]
5757 vdur = find_in_list(
5758 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5759 )
5760 descriptor_config = get_configuration(db_vnfd, vdu_id)
5761 if vdur.get("additionalParams"):
5762 deploy_params_vdu = parse_yaml_strings(
5763 vdur["additionalParams"]
5764 )
5765 else:
5766 deploy_params_vdu = deploy_params
5767 deploy_params_vdu["OSM"] = get_osm_params(
5768 db_vnfr, vdu_id, vdu_count_index=vdu_index
5769 )
5770 if descriptor_config:
5771 vdu_name = None
5772 kdu_name = None
5773 stage[
5774 1
5775 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5776 member_vnf_index, vdu_id, vdu_index
5777 )
5778 stage[2] = step = "Scaling out VCA"
5779 self._write_op_status(op_id=nslcmop_id, stage=stage)
5780 self._deploy_n2vc(
5781 logging_text=logging_text
5782 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5783 member_vnf_index, vdu_id, vdu_index
5784 ),
5785 db_nsr=db_nsr,
5786 db_vnfr=db_vnfr,
5787 nslcmop_id=nslcmop_id,
5788 nsr_id=nsr_id,
5789 nsi_id=nsi_id,
5790 vnfd_id=vnfd_id,
5791 vdu_id=vdu_id,
5792 kdu_name=kdu_name,
5793 member_vnf_index=member_vnf_index,
5794 vdu_index=vdu_index,
5795 vdu_name=vdu_name,
5796 deploy_params=deploy_params_vdu,
5797 descriptor_config=descriptor_config,
5798 base_folder=base_folder,
5799 task_instantiation_info=tasks_dict_info,
5800 stage=stage,
5801 )
5802 else:
5803 kdu_name = vca_info["osm_kdu_id"]
5804 descriptor_config = get_configuration(db_vnfd, kdu_name)
5805 if descriptor_config:
5806 vdu_id = None
5807 kdu_index = int(vca_info["kdu_index"])
5808 vdu_name = None
5809 kdur = next(
5810 x
5811 for x in db_vnfr["kdur"]
5812 if x["kdu-name"] == kdu_name
5813 )
5814 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
5815 if kdur.get("additionalParams"):
5816 deploy_params_kdu = parse_yaml_strings(
5817 kdur["additionalParams"]
5818 )
5819
5820 self._deploy_n2vc(
5821 logging_text=logging_text,
5822 db_nsr=db_nsr,
5823 db_vnfr=db_vnfr,
5824 nslcmop_id=nslcmop_id,
5825 nsr_id=nsr_id,
5826 nsi_id=nsi_id,
5827 vnfd_id=vnfd_id,
5828 vdu_id=vdu_id,
5829 kdu_name=kdu_name,
5830 member_vnf_index=member_vnf_index,
5831 vdu_index=kdu_index,
5832 vdu_name=vdu_name,
5833 deploy_params=deploy_params_kdu,
5834 descriptor_config=descriptor_config,
5835 base_folder=base_folder,
5836 task_instantiation_info=tasks_dict_info,
5837 stage=stage,
5838 )
5839 # SCALE-UP VCA - END
5840 scale_process = None
5841
5842 # POST-SCALE BEGIN
            # execute post-scale config primitives
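            # These mirror the pre-scale actions and run once the RO, KDU and VCA scaling
            # steps above have completed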
5844 step = "Executing post-scale vnf-config-primitive"
5845 if scaling_descriptor.get("scaling-config-action"):
5846 for scaling_config_action in scaling_descriptor[
5847 "scaling-config-action"
5848 ]:
5849 if (
5850 scaling_config_action.get("trigger") == "post-scale-in"
5851 and scaling_type == "SCALE_IN"
5852 ) or (
5853 scaling_config_action.get("trigger") == "post-scale-out"
5854 and scaling_type == "SCALE_OUT"
5855 ):
5856 vnf_config_primitive = scaling_config_action[
5857 "vnf-config-primitive-name-ref"
5858 ]
5859 step = db_nslcmop_update[
5860 "detailed-status"
5861 ] = "executing post-scale scaling-config-action '{}'".format(
5862 vnf_config_primitive
5863 )
5864
5865 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5866 if db_vnfr.get("additionalParamsForVnf"):
5867 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5868
5869 # look for primitive
5870 for config_primitive in (
5871 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5872 ).get("config-primitive", ()):
5873 if config_primitive["name"] == vnf_config_primitive:
5874 break
5875 else:
5876 raise LcmException(
5877 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5878 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5879 "config-primitive".format(
5880 scaling_group, vnf_config_primitive
5881 )
5882 )
5883 scale_process = "VCA"
5884 db_nsr_update["config-status"] = "configuring post-scaling"
5885 primitive_params = self._map_primitive_params(
5886 config_primitive, {}, vnfr_params
5887 )
5888
5889 # Post-scale retry check: Check if this sub-operation has been executed before
5890 op_index = self._check_or_add_scale_suboperation(
5891 db_nslcmop,
5892 vnf_index,
5893 vnf_config_primitive,
5894 primitive_params,
5895 "POST-SCALE",
5896 )
5897 if op_index == self.SUBOPERATION_STATUS_SKIP:
5898 # Skip sub-operation
5899 result = "COMPLETED"
5900 result_detail = "Done"
5901 self.logger.debug(
5902 logging_text
5903 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5904 vnf_config_primitive, result, result_detail
5905 )
5906 )
5907 else:
5908 if op_index == self.SUBOPERATION_STATUS_NEW:
5909 # New sub-operation: Get index of this sub-operation
5910 op_index = (
5911 len(db_nslcmop.get("_admin", {}).get("operations"))
5912 - 1
5913 )
5914 self.logger.debug(
5915 logging_text
5916 + "vnf_config_primitive={} New sub-operation".format(
5917 vnf_config_primitive
5918 )
5919 )
5920 else:
5921 # retry: Get registered params for this existing sub-operation
5922 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5923 op_index
5924 ]
5925 vnf_index = op.get("member_vnf_index")
5926 vnf_config_primitive = op.get("primitive")
5927 primitive_params = op.get("primitive_params")
5928 self.logger.debug(
5929 logging_text
5930 + "vnf_config_primitive={} Sub-operation retry".format(
5931 vnf_config_primitive
5932 )
5933 )
                            # Execute the primitive, either with new (first-time) or registered (retry) args
5935 ee_descriptor_id = config_primitive.get(
5936 "execution-environment-ref"
5937 )
5938 primitive_name = config_primitive.get(
5939 "execution-environment-primitive", vnf_config_primitive
5940 )
5941 ee_id, vca_type = self._look_for_deployed_vca(
5942 nsr_deployed["VCA"],
5943 member_vnf_index=vnf_index,
5944 vdu_id=None,
5945 vdu_count_index=None,
5946 ee_descriptor_id=ee_descriptor_id,
5947 )
5948 result, result_detail = await self._ns_execute_primitive(
5949 ee_id,
5950 primitive_name,
5951 primitive_params,
5952 vca_type=vca_type,
5953 vca_id=vca_id,
5954 )
5955 self.logger.debug(
5956 logging_text
5957 + "vnf_config_primitive={} Done with result {} {}".format(
5958 vnf_config_primitive, result, result_detail
5959 )
5960 )
5961 # Update operationState = COMPLETED | FAILED
5962 self._update_suboperation_status(
5963 db_nslcmop, op_index, result, result_detail
5964 )
5965
5966 if result == "FAILED":
5967 raise LcmException(result_detail)
5968 db_nsr_update["config-status"] = old_config_status
5969 scale_process = None
5970 # POST-SCALE END
5971
5972 db_nsr_update[
5973 "detailed-status"
5974 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
5975 db_nsr_update["operational-status"] = (
5976 "running"
5977 if old_operational_status == "failed"
5978 else old_operational_status
5979 )
5980 db_nsr_update["config-status"] = old_config_status
5981 return
5982 except (
5983 ROclient.ROClientException,
5984 DbException,
5985 LcmException,
5986 NgRoException,
5987 ) as e:
5988 self.logger.error(logging_text + "Exit Exception {}".format(e))
5989 exc = e
5990 except asyncio.CancelledError:
5991 self.logger.error(
5992 logging_text + "Cancelled Exception while '{}'".format(step)
5993 )
5994 exc = "Operation was cancelled"
5995 except Exception as e:
5996 exc = traceback.format_exc()
5997 self.logger.critical(
5998 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5999 exc_info=True,
6000 )
6001 finally:
6002 self._write_ns_status(
6003 nsr_id=nsr_id,
6004 ns_state=None,
6005 current_operation="IDLE",
6006 current_operation_id=None,
6007 )
6008 if tasks_dict_info:
6009 stage[1] = "Waiting for instantiate pending tasks."
6010 self.logger.debug(logging_text + stage[1])
6011 exc = await self._wait_for_tasks(
6012 logging_text,
6013 tasks_dict_info,
6014 self.timeout_ns_deploy,
6015 stage,
6016 nslcmop_id,
6017 nsr_id=nsr_id,
6018 )
6019 if exc:
6020 db_nslcmop_update[
6021 "detailed-status"
6022 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6023 nslcmop_operation_state = "FAILED"
6024 if db_nsr:
6025 db_nsr_update["operational-status"] = old_operational_status
6026 db_nsr_update["config-status"] = old_config_status
6027 db_nsr_update["detailed-status"] = ""
6028 if scale_process:
6029 if "VCA" in scale_process:
6030 db_nsr_update["config-status"] = "failed"
6031 if "RO" in scale_process:
6032 db_nsr_update["operational-status"] = "failed"
6033 db_nsr_update[
6034 "detailed-status"
6035 ] = "FAILED scaling nslcmop={} {}: {}".format(
6036 nslcmop_id, step, exc
6037 )
6038 else:
6039 error_description_nslcmop = None
6040 nslcmop_operation_state = "COMPLETED"
6041 db_nslcmop_update["detailed-status"] = "Done"
6042
6043 self._write_op_status(
6044 op_id=nslcmop_id,
6045 stage="",
6046 error_message=error_description_nslcmop,
6047 operation_state=nslcmop_operation_state,
6048 other_update=db_nslcmop_update,
6049 )
6050 if db_nsr:
6051 self._write_ns_status(
6052 nsr_id=nsr_id,
6053 ns_state=None,
6054 current_operation="IDLE",
6055 current_operation_id=None,
6056 other_update=db_nsr_update,
6057 )
6058
6059 if nslcmop_operation_state:
6060 try:
6061 msg = {
6062 "nsr_id": nsr_id,
6063 "nslcmop_id": nslcmop_id,
6064 "operationState": nslcmop_operation_state,
6065 }
6066 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6067 except Exception as e:
6068 self.logger.error(
6069 logging_text + "kafka_write notification Exception {}".format(e)
6070 )
6071 self.logger.debug(logging_text + "Exit")
6072 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6073
6074 async def _scale_kdu(
6075 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6076 ):
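        """
        Scale the K8s resources of the KDUs listed in scaling_info. For scale-in,
        any terminate config primitives of the KDU are executed before the resource
        is scaled; for scale-out, its initial config primitives are executed after
        scaling (in both cases only when the KDU has no Juju execution environment).
        """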
6077 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
6078 for kdu_name in _scaling_info:
6079 for kdu_scaling_info in _scaling_info[kdu_name]:
6080 deployed_kdu, index = get_deployed_kdu(
6081 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6082 )
6083 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6084 kdu_instance = deployed_kdu["kdu-instance"]
6085 scale = int(kdu_scaling_info["scale"])
6086 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6087
6088 db_dict = {
6089 "collection": "nsrs",
6090 "filter": {"_id": nsr_id},
6091 "path": "_admin.deployed.K8s.{}".format(index),
6092 }
6093
6094 step = "scaling application {}".format(
6095 kdu_scaling_info["resource-name"]
6096 )
6097 self.logger.debug(logging_text + step)
6098
6099 if kdu_scaling_info["type"] == "delete":
6100 kdu_config = get_configuration(db_vnfd, kdu_name)
6101 if (
6102 kdu_config
6103 and kdu_config.get("terminate-config-primitive")
6104 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6105 ):
6106 terminate_config_primitive_list = kdu_config.get(
6107 "terminate-config-primitive"
6108 )
6109 terminate_config_primitive_list.sort(
6110 key=lambda val: int(val["seq"])
6111 )
6112
6113 for (
6114 terminate_config_primitive
6115 ) in terminate_config_primitive_list:
6116 primitive_params_ = self._map_primitive_params(
6117 terminate_config_primitive, {}, {}
6118 )
6119 step = "execute terminate config primitive"
6120 self.logger.debug(logging_text + step)
6121 await asyncio.wait_for(
6122 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6123 cluster_uuid=cluster_uuid,
6124 kdu_instance=kdu_instance,
6125 primitive_name=terminate_config_primitive["name"],
6126 params=primitive_params_,
6127 db_dict=db_dict,
6128 vca_id=vca_id,
6129 ),
6130 timeout=600,
6131 )
6132
6133 await asyncio.wait_for(
6134 self.k8scluster_map[k8s_cluster_type].scale(
6135 kdu_instance,
6136 scale,
6137 kdu_scaling_info["resource-name"],
6138 vca_id=vca_id,
6139 ),
6140 timeout=self.timeout_vca_on_error,
6141 )
6142
6143 if kdu_scaling_info["type"] == "create":
6144 kdu_config = get_configuration(db_vnfd, kdu_name)
6145 if (
6146 kdu_config
6147 and kdu_config.get("initial-config-primitive")
6148 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6149 ):
6150 initial_config_primitive_list = kdu_config.get(
6151 "initial-config-primitive"
6152 )
6153 initial_config_primitive_list.sort(
6154 key=lambda val: int(val["seq"])
6155 )
6156
6157 for initial_config_primitive in initial_config_primitive_list:
6158 primitive_params_ = self._map_primitive_params(
6159 initial_config_primitive, {}, {}
6160 )
6161 step = "execute initial config primitive"
6162 self.logger.debug(logging_text + step)
6163 await asyncio.wait_for(
6164 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6165 cluster_uuid=cluster_uuid,
6166 kdu_instance=kdu_instance,
6167 primitive_name=initial_config_primitive["name"],
6168 params=primitive_params_,
6169 db_dict=db_dict,
6170 vca_id=vca_id,
6171 ),
6172 timeout=600,
6173 )
6174
6175 async def _scale_ng_ro(
6176 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6177 ):
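        """
        Scale the VDUs of a VNF through NG-RO: the VNF record is updated with the
        VDUs to create/delete and the NS is re-deployed through the same NG-RO
        instantiation call used at deployment time.
        """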
6178 nsr_id = db_nslcmop["nsInstanceId"]
6179 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6180 db_vnfrs = {}
6181
6182 # read from db: vnfd's for every vnf
6183 db_vnfds = []
6184
6185 # for each vnf in ns, read vnfd
6186 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6187 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6188 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
            # if we do not have this vnfd yet, read it from the db
6190 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6191 # read from db
6192 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6193 db_vnfds.append(vnfd)
6194 n2vc_key = self.n2vc.get_public_key()
6195 n2vc_key_list = [n2vc_key]
6196 self.scale_vnfr(
6197 db_vnfr,
6198 vdu_scaling_info.get("vdu-create"),
6199 vdu_scaling_info.get("vdu-delete"),
6200 mark_delete=True,
6201 )
6202 # db_vnfr has been updated, update db_vnfrs to use it
6203 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6204 await self._instantiate_ng_ro(
6205 logging_text,
6206 nsr_id,
6207 db_nsd,
6208 db_nsr,
6209 db_nslcmop,
6210 db_vnfrs,
6211 db_vnfds,
6212 n2vc_key_list,
6213 stage=stage,
6214 start_deploy=time(),
6215 timeout_ns_deploy=self.timeout_ns_deploy,
6216 )
6217 if vdu_scaling_info.get("vdu-delete"):
6218 self.scale_vnfr(
6219 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6220 )
6221
6222 async def add_prometheus_metrics(
6223 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6224 ):
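        """
        Render the 'prometheus*.j2' job template found among the execution environment
        artifacts and register the resulting scrape jobs with the Prometheus handler.
        Returns the list of job names when the jobs are added, otherwise None.
        """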
6225 if not self.prometheus:
6226 return
        # look for a file called 'prometheus*.j2' among the artifacts and use it as the job template
6228 artifact_content = self.fs.dir_ls(artifact_path)
6229 job_file = next(
6230 (
6231 f
6232 for f in artifact_content
6233 if f.startswith("prometheus") and f.endswith(".j2")
6234 ),
6235 None,
6236 )
6237 if not job_file:
6238 return
6239 with self.fs.file_open((artifact_path, job_file), "r") as f:
6240 job_data = f.read()
6241
6242 # TODO get_service
6243 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6244 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6245 host_port = "80"
6246 vnfr_id = vnfr_id.replace("-", "")
6247 variables = {
6248 "JOB_NAME": vnfr_id,
6249 "TARGET_IP": target_ip,
6250 "EXPORTER_POD_IP": host_name,
6251 "EXPORTER_POD_PORT": host_port,
6252 }
6253 job_list = self.prometheus.parse_job(job_data, variables)
        # ensure job_name contains the vnfr_id and add the nsr_id as metadata
6255 for job in job_list:
6256 if (
6257 not isinstance(job.get("job_name"), str)
6258 or vnfr_id not in job["job_name"]
6259 ):
6260 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6261 job["nsr_id"] = nsr_id
6262 job_dict = {jl["job_name"]: jl for jl in job_list}
6263 if await self.prometheus.update(job_dict):
6264 return list(job_dict.keys())
6265
6266 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6267 """
6268 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6269
6270 :param: vim_account_id: VIM Account ID
6271
6272 :return: (cloud_name, cloud_credential)
6273 """
6274 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6275 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6276
6277 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6278 """
6279 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6280
6281 :param: vim_account_id: VIM Account ID
6282
6283 :return: (cloud_name, cloud_credential)
6284 """
6285 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6286 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")