Bug 2002 fixed: the namespace for the Juju Bundle is now updated within the KDU instantiation
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import (
26 Environment,
27 TemplateError,
28 TemplateNotFound,
29 StrictUndefined,
30 UndefinedError,
31 )
32
33 from osm_lcm import ROclient
34 from osm_lcm.data_utils.nsr import get_deployed_kdu
35 from osm_lcm.ng_ro import NgRoClient, NgRoException
36 from osm_lcm.lcm_utils import (
37 LcmException,
38 LcmExceptionNoMgmtIP,
39 LcmBase,
40 deep_get,
41 get_iterable,
42 populate_dict,
43 )
44 from osm_lcm.data_utils.nsd import get_vnf_profiles
45 from osm_lcm.data_utils.vnfd import (
46 get_vdu_list,
47 get_vdu_profile,
48 get_ee_sorted_initial_config_primitive_list,
49 get_ee_sorted_terminate_config_primitive_list,
50 get_kdu_list,
51 get_virtual_link_profiles,
52 get_vdu,
53 get_configuration,
54 get_vdu_index,
55 get_scaling_aspect,
56 get_number_of_instances,
57 get_juju_ee_ref,
58 get_kdu_profile,
59 )
60 from osm_lcm.data_utils.list_utils import find_in_list
61 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
62 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
63 from osm_lcm.data_utils.database.vim_account import VimAccountDB
64 from n2vc.k8s_helm_conn import K8sHelmConnector
65 from n2vc.k8s_helm3_conn import K8sHelm3Connector
66 from n2vc.k8s_juju_conn import K8sJujuConnector
67
68 from osm_common.dbbase import DbException
69 from osm_common.fsbase import FsException
70
71 from osm_lcm.data_utils.database.database import Database
72 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
73
74 from n2vc.n2vc_juju_conn import N2VCJujuConnector
75 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
76
77 from osm_lcm.lcm_helm_conn import LCMHelmConn
78
79 from copy import copy, deepcopy
80 from time import time
81 from uuid import uuid4
82
83 from random import randint
84
85 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
86
87
class NsLcm(LcmBase):
    """Network Service lifecycle manager.

    Drives NS instantiation, scaling and termination, coordinating the RO
    client, the N2VC (juju) connector and the k8s connectors (helm2/helm3/
    juju-bundle) created in ``__init__``.
    """

    # Time (seconds) a charm may stay at blocked/error status before the
    # operation is marked as failed.
    timeout_vca_on_error = (
        5 * 60
    )  # Time for charm from first time at blocked,error status to mark as failed
    timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
    timeout_ns_terminate = 1800  # default global timeout for un deployment a ns
    # Timeout (seconds) for deleting a charm during termination.
    timeout_charm_delete = 10 * 60
    timeout_primitive = 30 * 60  # timeout for primitive execution
    timeout_progress_primitive = (
        10 * 60
    )  # timeout for some progress in a primitive execution

    # Sentinel return values used by the sub-operation lookup helpers.
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    task_name_deploy_vca = "Deploying VCA"
104
    def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: message bus object (kafka) handed to LcmBase
        :param lcm_tasks: registry of running LCM asyncio tasks
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
            'timeout', 'ro_config' and 'VCA' sections
        :param loop: asyncio event loop shared by all connectors
        :param prometheus: optional prometheus helper for metric exporting
        :return: None
        """
        super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))

        # shared singleton accessors for database and package filesystem
        self.db = Database().instance.db
        self.fs = Filesystem().instance.fs
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.timeout = config["timeout"]
        self.ro_config = config["ro_config"]
        # "ng" flag selects the next-generation RO workflow
        self.ng_ro = config["ro_config"].get("ng")
        # copy so local mutations do not leak into the global config dict
        self.vca_config = config["VCA"].copy()

        # create N2VC connector
        self.n2vc = N2VCJujuConnector(
            log=self.logger,
            loop=self.loop,
            on_update_db=self._on_update_n2vc_db,
            fs=self.fs,
            db=self.db,
        )

        # execution-environment connector used by helm/helm-v3 charms
        self.conn_helm_ee = LCMHelmConn(
            log=self.logger,
            loop=self.loop,
            vca_config=self.vca_config,
            on_update_db=self._on_update_n2vc_db,
        )

        # helm v2 connector; no DB-update callback (status pushed elsewhere)
        self.k8sclusterhelm2 = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            log=self.logger,
            on_update_db=None,
            fs=self.fs,
            db=self.db,
        )

        # helm v3 connector
        self.k8sclusterhelm3 = K8sHelm3Connector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helm3path"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

        # juju-bundle connector; reports status changes through _on_update_k8s_db
        self.k8sclusterjuju = K8sJujuConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            juju_command=self.vca_config.get("jujupath"),
            log=self.logger,
            loop=self.loop,
            on_update_db=self._on_update_k8s_db,
            fs=self.fs,
            db=self.db,
        )

        # KDU deployment-type -> k8s connector. NOTE(review): "chart" maps to
        # the helm3 connector while "helm-chart" maps to helm2 — confirm this
        # asymmetry is intentional.
        self.k8scluster_map = {
            "helm-chart": self.k8sclusterhelm2,
            "helm-chart-v3": self.k8sclusterhelm3,
            "chart": self.k8sclusterhelm3,
            "juju-bundle": self.k8sclusterjuju,
            "juju": self.k8sclusterjuju,
        }

        # charm/ee deployment-type -> VCA connector
        self.vca_map = {
            "lxc_proxy_charm": self.n2vc,
            "native_charm": self.n2vc,
            "k8s_proxy_charm": self.n2vc,
            "helm": self.conn_helm_ee,
            "helm-v3": self.conn_helm_ee,
        }

        self.prometheus = prometheus

        # create RO client
        self.RO = NgRoClient(self.loop, **self.ro_config)
186
187 @staticmethod
188 def increment_ip_mac(ip_mac, vm_index=1):
189 if not isinstance(ip_mac, str):
190 return ip_mac
191 try:
192 # try with ipv4 look for last dot
193 i = ip_mac.rfind(".")
194 if i > 0:
195 i += 1
196 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
197 # try with ipv6 or mac look for last colon. Operate in hex
198 i = ip_mac.rfind(":")
199 if i > 0:
200 i += 1
201 # format in hex, len can be 2 for mac or 4 for ipv6
202 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
203 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
204 )
205 except Exception:
206 pass
207 return None
208
209 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
210
211 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
212
213 try:
214 # TODO filter RO descriptor fields...
215
216 # write to database
217 db_dict = dict()
218 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
219 db_dict["deploymentStatus"] = ro_descriptor
220 self.update_db_2("nsrs", nsrs_id, db_dict)
221
222 except Exception as e:
223 self.logger.warn(
224 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
225 )
226
227 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
228
229 # remove last dot from path (if exists)
230 if path.endswith("."):
231 path = path[:-1]
232
233 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
234 # .format(table, filter, path, updated_data))
235 try:
236
237 nsr_id = filter.get("_id")
238
239 # read ns record from database
240 nsr = self.db.get_one(table="nsrs", q_filter=filter)
241 current_ns_status = nsr.get("nsState")
242
243 # get vca status for NS
244 status_dict = await self.n2vc.get_status(
245 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
246 )
247
248 # vcaStatus
249 db_dict = dict()
250 db_dict["vcaStatus"] = status_dict
251 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
252
253 # update configurationStatus for this VCA
254 try:
255 vca_index = int(path[path.rfind(".") + 1 :])
256
257 vca_list = deep_get(
258 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
259 )
260 vca_status = vca_list[vca_index].get("status")
261
262 configuration_status_list = nsr.get("configurationStatus")
263 config_status = configuration_status_list[vca_index].get("status")
264
265 if config_status == "BROKEN" and vca_status != "failed":
266 db_dict["configurationStatus"][vca_index] = "READY"
267 elif config_status != "BROKEN" and vca_status == "failed":
268 db_dict["configurationStatus"][vca_index] = "BROKEN"
269 except Exception as e:
270 # not update configurationStatus
271 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
272
273 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
274 # if nsState = 'DEGRADED' check if all is OK
275 is_degraded = False
276 if current_ns_status in ("READY", "DEGRADED"):
277 error_description = ""
278 # check machines
279 if status_dict.get("machines"):
280 for machine_id in status_dict.get("machines"):
281 machine = status_dict.get("machines").get(machine_id)
282 # check machine agent-status
283 if machine.get("agent-status"):
284 s = machine.get("agent-status").get("status")
285 if s != "started":
286 is_degraded = True
287 error_description += (
288 "machine {} agent-status={} ; ".format(
289 machine_id, s
290 )
291 )
292 # check machine instance status
293 if machine.get("instance-status"):
294 s = machine.get("instance-status").get("status")
295 if s != "running":
296 is_degraded = True
297 error_description += (
298 "machine {} instance-status={} ; ".format(
299 machine_id, s
300 )
301 )
302 # check applications
303 if status_dict.get("applications"):
304 for app_id in status_dict.get("applications"):
305 app = status_dict.get("applications").get(app_id)
306 # check application status
307 if app.get("status"):
308 s = app.get("status").get("status")
309 if s != "active":
310 is_degraded = True
311 error_description += (
312 "application {} status={} ; ".format(app_id, s)
313 )
314
315 if error_description:
316 db_dict["errorDescription"] = error_description
317 if current_ns_status == "READY" and is_degraded:
318 db_dict["nsState"] = "DEGRADED"
319 if current_ns_status == "DEGRADED" and not is_degraded:
320 db_dict["nsState"] = "READY"
321
322 # write to database
323 self.update_db_2("nsrs", nsr_id, db_dict)
324
325 except (asyncio.CancelledError, asyncio.TimeoutError):
326 raise
327 except Exception as e:
328 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
329
    async def _on_update_k8s_db(
        self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju"
    ):
        """
        Updating vca status in NSR record
        :param cluster_uuid: UUID of a k8s cluster
        :param kdu_instance: The unique name of the KDU instance
        :param filter: To get nsr_id; must contain the nsr "_id"
        :param vca_id: optional VCA id forwarded to the k8s connector
        :cluster_type: The cluster type (juju, k8s)
        :return: none
        """

        # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
        #                   .format(cluster_uuid, kdu_instance, filter))

        nsr_id = filter.get("_id")
        try:
            # ask the connector matching this cluster type for the full KDU status
            vca_status = await self.k8scluster_map[cluster_type].status_kdu(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                yaml_format=False,
                complete_status=True,
                vca_id=vca_id,
            )

            # vcaStatus, keyed by the owning nsr id
            db_dict = dict()
            db_dict["vcaStatus"] = {nsr_id: vca_status}

            if cluster_type in ("juju-bundle", "juju"):
                # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
                # status in a similar way between Juju Bundles and Helm Charts on this side
                await self.k8sclusterjuju.update_vca_status(
                    db_dict["vcaStatus"],
                    kdu_instance,
                    vca_id=vca_id,
                )

            self.logger.debug(
                f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
            )

            # write to database
            self.update_db_2("nsrs", nsr_id, db_dict)
        except (asyncio.CancelledError, asyncio.TimeoutError):
            # cooperative cancellation/timeouts must propagate to the caller
            raise
        except Exception as e:
            self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
378
379 @staticmethod
380 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
381 try:
382 env = Environment(undefined=StrictUndefined)
383 template = env.from_string(cloud_init_text)
384 return template.render(additional_params or {})
385 except UndefinedError as e:
386 raise LcmException(
387 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
388 "file, must be provided in the instantiation parameters inside the "
389 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
390 )
391 except (TemplateError, TemplateNotFound) as e:
392 raise LcmException(
393 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
394 vnfd_id, vdu_id, e
395 )
396 )
397
398 def _get_vdu_cloud_init_content(self, vdu, vnfd):
399 cloud_init_content = cloud_init_file = None
400 try:
401 if vdu.get("cloud-init-file"):
402 base_folder = vnfd["_admin"]["storage"]
403 cloud_init_file = "{}/{}/cloud_init/{}".format(
404 base_folder["folder"],
405 base_folder["pkg-dir"],
406 vdu["cloud-init-file"],
407 )
408 with self.fs.file_open(cloud_init_file, "r") as ci_file:
409 cloud_init_content = ci_file.read()
410 elif vdu.get("cloud-init"):
411 cloud_init_content = vdu["cloud-init"]
412
413 return cloud_init_content
414 except FsException as e:
415 raise LcmException(
416 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
417 vnfd["id"], vdu["id"], cloud_init_file, e
418 )
419 )
420
421 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
422 vdur = next(
423 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
424 {}
425 )
426 additional_params = vdur.get("additionalParams")
427 return parse_yaml_strings(additional_params)
428
429 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
430 """
431 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
432 :param vnfd: input vnfd
433 :param new_id: overrides vnf id if provided
434 :param additionalParams: Instantiation params for VNFs provided
435 :param nsrId: Id of the NSR
436 :return: copy of vnfd
437 """
438 vnfd_RO = deepcopy(vnfd)
439 # remove unused by RO configuration, monitoring, scaling and internal keys
440 vnfd_RO.pop("_id", None)
441 vnfd_RO.pop("_admin", None)
442 vnfd_RO.pop("monitoring-param", None)
443 vnfd_RO.pop("scaling-group-descriptor", None)
444 vnfd_RO.pop("kdu", None)
445 vnfd_RO.pop("k8s-cluster", None)
446 if new_id:
447 vnfd_RO["id"] = new_id
448
449 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
450 for vdu in get_iterable(vnfd_RO, "vdu"):
451 vdu.pop("cloud-init-file", None)
452 vdu.pop("cloud-init", None)
453 return vnfd_RO
454
455 @staticmethod
456 def ip_profile_2_RO(ip_profile):
457 RO_ip_profile = deepcopy(ip_profile)
458 if "dns-server" in RO_ip_profile:
459 if isinstance(RO_ip_profile["dns-server"], list):
460 RO_ip_profile["dns-address"] = []
461 for ds in RO_ip_profile.pop("dns-server"):
462 RO_ip_profile["dns-address"].append(ds["address"])
463 else:
464 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
465 if RO_ip_profile.get("ip-version") == "ipv4":
466 RO_ip_profile["ip-version"] = "IPv4"
467 if RO_ip_profile.get("ip-version") == "ipv6":
468 RO_ip_profile["ip-version"] = "IPv6"
469 if "dhcp-params" in RO_ip_profile:
470 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
471 return RO_ip_profile
472
473 def _get_ro_vim_id_for_vim_account(self, vim_account):
474 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
475 if db_vim["_admin"]["operationalState"] != "ENABLED":
476 raise LcmException(
477 "VIM={} is not available. operationalState={}".format(
478 vim_account, db_vim["_admin"]["operationalState"]
479 )
480 )
481 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
482 return RO_vim_id
483
484 def get_ro_wim_id_for_wim_account(self, wim_account):
485 if isinstance(wim_account, str):
486 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
487 if db_wim["_admin"]["operationalState"] != "ENABLED":
488 raise LcmException(
489 "WIM={} is not available. operationalState={}".format(
490 wim_account, db_wim["_admin"]["operationalState"]
491 )
492 )
493 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
494 return RO_wim_id
495 else:
496 return wim_account
497
    def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
        """Apply a scale operation to the vnfr in the database.

        Adds (vdu_create) and/or removes (vdu_delete) vdur entries, then
        refreshes db_vnfr["vdur"] in place from the database.

        :param db_vnfr: vnfr record; its "vdur" list is refreshed in place
        :param vdu_create: dict vdu-id-ref -> number of instances to add
        :param vdu_delete: dict vdu-id-ref -> number of instances to remove
        :param mark_delete: when True, only mark vdurs with status DELETING
            instead of pulling them from the database
        :raises LcmException: scaling out with no existing vdur nor saved template
        """

        db_vdu_push_list = []
        template_vdur = []
        db_update = {"_admin.modified": time()}
        if vdu_create:
            for vdu_id, vdu_count in vdu_create.items():
                # clone from the newest existing vdur of this vdu-id-ref
                vdur = next(
                    (
                        vdur
                        for vdur in reversed(db_vnfr["vdur"])
                        if vdur["vdu-id-ref"] == vdu_id
                    ),
                    None,
                )
                if not vdur:
                    # Read the template saved in the db:
                    self.logger.debug(f"No vdur in the database. Using the vdur-template to scale")
                    vdur_template = db_vnfr.get("vdur-template")
                    if not vdur_template:
                        raise LcmException(
                            "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
                                vdu_id
                            )
                        )
                    vdur = vdur_template[0]
                    # Delete the template from the database after using it
                    self.db.set_one("vnfrs",
                        {"_id": db_vnfr["_id"]},
                        None,
                        pull={"vdur-template": {"_id": vdur['_id']}}
                    )
                for count in range(vdu_count):
                    # fresh copy per new instance: reset runtime state, new _id,
                    # bumped count-index and derived id
                    vdur_copy = deepcopy(vdur)
                    vdur_copy["status"] = "BUILD"
                    vdur_copy["status-detailed"] = None
                    vdur_copy["ip-address"] = None
                    vdur_copy["_id"] = str(uuid4())
                    vdur_copy["count-index"] += count + 1
                    vdur_copy["id"] = "{}-{}".format(
                        vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
                    )
                    vdur_copy.pop("vim_info", None)
                    for iface in vdur_copy["interfaces"]:
                        # fixed addresses are incremented per instance; dynamic
                        # ones are cleared so the VIM assigns new values
                        if iface.get("fixed-ip"):
                            iface["ip-address"] = self.increment_ip_mac(
                                iface["ip-address"], count + 1
                            )
                        else:
                            iface.pop("ip-address", None)
                        if iface.get("fixed-mac"):
                            iface["mac-address"] = self.increment_ip_mac(
                                iface["mac-address"], count + 1
                            )
                        else:
                            iface.pop("mac-address", None)
                        if db_vnfr["vdur"]:
                            iface.pop(
                                "mgmt_vnf", None
                            )  # only first vdu can be managment of vnf
                    db_vdu_push_list.append(vdur_copy)
                    # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
        if vdu_delete:
            if len(db_vnfr["vdur"]) == 1:
                # The scale will move to 0 instances
                self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur")
                template_vdur = [db_vnfr["vdur"][0]]
            for vdu_id, vdu_count in vdu_delete.items():
                if mark_delete:
                    # mark the newest vdu_count instances as DELETING
                    indexes_to_delete = [
                        iv[0]
                        for iv in enumerate(db_vnfr["vdur"])
                        if iv[1]["vdu-id-ref"] == vdu_id
                    ]
                    db_update.update(
                        {
                            "vdur.{}.status".format(i): "DELETING"
                            for i in indexes_to_delete[-vdu_count:]
                        }
                    )
                else:
                    # it must be deleted one by one because common.db does not allow otherwise
                    vdus_to_delete = [
                        v
                        for v in reversed(db_vnfr["vdur"])
                        if v["vdu-id-ref"] == vdu_id
                    ]
                    for vdu in vdus_to_delete[:vdu_count]:
                        self.db.set_one(
                            "vnfrs",
                            {"_id": db_vnfr["_id"]},
                            None,
                            pull={"vdur": {"_id": vdu["_id"]}},
                        )
        db_push = {}
        if db_vdu_push_list:
            db_push["vdur"] = db_vdu_push_list
        if template_vdur:
            db_push["vdur-template"] = template_vdur
        if not db_push:
            db_push = None
        db_vnfr["vdur-template"] = template_vdur
        self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
        # modify passed dictionary db_vnfr
        db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
        db_vnfr["vdur"] = db_vnfr_["vdur"]
604
605 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
606 """
607 Updates database nsr with the RO info for the created vld
608 :param ns_update_nsr: dictionary to be filled with the updated info
609 :param db_nsr: content of db_nsr. This is also modified
610 :param nsr_desc_RO: nsr descriptor from RO
611 :return: Nothing, LcmException is raised on errors
612 """
613
614 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
615 for net_RO in get_iterable(nsr_desc_RO, "nets"):
616 if vld["id"] != net_RO.get("ns_net_osm_id"):
617 continue
618 vld["vim-id"] = net_RO.get("vim_net_id")
619 vld["name"] = net_RO.get("vim_name")
620 vld["status"] = net_RO.get("status")
621 vld["status-detailed"] = net_RO.get("error_msg")
622 ns_update_nsr["vld.{}".format(vld_index)] = vld
623 break
624 else:
625 raise LcmException(
626 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
627 )
628
629 def set_vnfr_at_error(self, db_vnfrs, error_text):
630 try:
631 for db_vnfr in db_vnfrs.values():
632 vnfr_update = {"status": "ERROR"}
633 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
634 if "status" not in vdur:
635 vdur["status"] = "ERROR"
636 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
637 if error_text:
638 vdur["status-detailed"] = str(error_text)
639 vnfr_update[
640 "vdur.{}.status-detailed".format(vdu_index)
641 ] = "ERROR"
642 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
643 except DbException as e:
644 self.logger.error("Cannot update vnf. {}".format(e))
645
    def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
        """
        Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
        :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
        :param nsr_desc_RO: nsr descriptor from RO
        :return: Nothing, LcmException is raised on errors
        """
        for vnf_index, db_vnfr in db_vnfrs.items():
            # find the RO vnf entry for this member index; the trailing
            # for/else raises when no match exists
            for vnf_RO in nsr_desc_RO["vnfs"]:
                if vnf_RO["member_vnf_index"] != vnf_index:
                    continue
                vnfr_update = {}
                if vnf_RO.get("ip_address"):
                    # RO may report several addresses separated by ';'; keep the first
                    db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
                        "ip_address"
                    ].split(";")[0]
                elif not db_vnfr.get("ip-address"):
                    if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                        raise LcmExceptionNoMgmtIP(
                            "ns member_vnf_index '{}' has no IP address".format(
                                vnf_index
                            )
                        )

                for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                    vdur_RO_count_index = 0
                    if vdur.get("pdu-type"):
                        # physical DUs are not deployed by RO; skip
                        continue
                    # match by vdu id AND replica count-index; the counter is
                    # advanced only on id matches with a different count-index
                    for vdur_RO in get_iterable(vnf_RO, "vms"):
                        if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                            continue
                        if vdur["count-index"] != vdur_RO_count_index:
                            vdur_RO_count_index += 1
                            continue
                        vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                        if vdur_RO.get("ip_address"):
                            vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                        else:
                            vdur["ip-address"] = None
                        vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                        vdur["name"] = vdur_RO.get("vim_name")
                        vdur["status"] = vdur_RO.get("status")
                        vdur["status-detailed"] = vdur_RO.get("error_msg")
                        # copy per-interface addresses from the RO report
                        for ifacer in get_iterable(vdur, "interfaces"):
                            for interface_RO in get_iterable(vdur_RO, "interfaces"):
                                if ifacer["name"] == interface_RO.get("internal_name"):
                                    ifacer["ip-address"] = interface_RO.get(
                                        "ip_address"
                                    )
                                    ifacer["mac-address"] = interface_RO.get(
                                        "mac_address"
                                    )
                                    break
                            else:
                                raise LcmException(
                                    "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                    "from VIM info".format(
                                        vnf_index, vdur["vdu-id-ref"], ifacer["name"]
                                    )
                                )
                        vnfr_update["vdur.{}".format(vdu_index)] = vdur
                        break
                    else:
                        raise LcmException(
                            "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                            "VIM info".format(
                                vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
                            )
                        )

                # copy vnf-internal vld info from the RO nets
                for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                    for net_RO in get_iterable(nsr_desc_RO, "nets"):
                        if vld["id"] != net_RO.get("vnf_net_osm_id"):
                            continue
                        vld["vim-id"] = net_RO.get("vim_net_id")
                        vld["name"] = net_RO.get("vim_name")
                        vld["status"] = net_RO.get("status")
                        vld["status-detailed"] = net_RO.get("error_msg")
                        vnfr_update["vld.{}".format(vld_index)] = vld
                        break
                    else:
                        raise LcmException(
                            "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                                vnf_index, vld["id"]
                            )
                        )

                self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
                break

            else:
                raise LcmException(
                    "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
                        vnf_index
                    )
                )
742
743 def _get_ns_config_info(self, nsr_id):
744 """
745 Generates a mapping between vnf,vdu elements and the N2VC id
746 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
747 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
748 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
749 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
750 """
751 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
752 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
753 mapping = {}
754 ns_config_info = {"osm-config-mapping": mapping}
755 for vca in vca_deployed_list:
756 if not vca["member-vnf-index"]:
757 continue
758 if not vca["vdu_id"]:
759 mapping[vca["member-vnf-index"]] = vca["application"]
760 else:
761 mapping[
762 "{}.{}.{}".format(
763 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
764 )
765 ] = vca["application"]
766 return ns_config_info
767
768 async def _instantiate_ng_ro(
769 self,
770 logging_text,
771 nsr_id,
772 nsd,
773 db_nsr,
774 db_nslcmop,
775 db_vnfrs,
776 db_vnfds,
777 n2vc_key_list,
778 stage,
779 start_deploy,
780 timeout_ns_deploy,
781 ):
782
783 db_vims = {}
784
785 def get_vim_account(vim_account_id):
786 nonlocal db_vims
787 if vim_account_id in db_vims:
788 return db_vims[vim_account_id]
789 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
790 db_vims[vim_account_id] = db_vim
791 return db_vim
792
793 # modify target_vld info with instantiation parameters
794 def parse_vld_instantiation_params(
795 target_vim, target_vld, vld_params, target_sdn
796 ):
797 if vld_params.get("ip-profile"):
798 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
799 "ip-profile"
800 ]
801 if vld_params.get("provider-network"):
802 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
803 "provider-network"
804 ]
805 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
806 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
807 "provider-network"
808 ]["sdn-ports"]
809 if vld_params.get("wimAccountId"):
810 target_wim = "wim:{}".format(vld_params["wimAccountId"])
811 target_vld["vim_info"][target_wim] = {}
812 for param in ("vim-network-name", "vim-network-id"):
813 if vld_params.get(param):
814 if isinstance(vld_params[param], dict):
815 for vim, vim_net in vld_params[param].items():
816 other_target_vim = "vim:" + vim
817 populate_dict(
818 target_vld["vim_info"],
819 (other_target_vim, param.replace("-", "_")),
820 vim_net,
821 )
822 else: # isinstance str
823 target_vld["vim_info"][target_vim][
824 param.replace("-", "_")
825 ] = vld_params[param]
826 if vld_params.get("common_id"):
827 target_vld["common_id"] = vld_params.get("common_id")
828
829 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
830 def update_ns_vld_target(target, ns_params):
831 for vnf_params in ns_params.get("vnf", ()):
832 if vnf_params.get("vimAccountId"):
833 target_vnf = next(
834 (
835 vnfr
836 for vnfr in db_vnfrs.values()
837 if vnf_params["member-vnf-index"]
838 == vnfr["member-vnf-index-ref"]
839 ),
840 None,
841 )
842 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
843 for a_index, a_vld in enumerate(target["ns"]["vld"]):
844 target_vld = find_in_list(
845 get_iterable(vdur, "interfaces"),
846 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
847 )
848 if target_vld:
849 if vnf_params.get("vimAccountId") not in a_vld.get(
850 "vim_info", {}
851 ):
852 target["ns"]["vld"][a_index].get("vim_info").update(
853 {
854 "vim:{}".format(vnf_params["vimAccountId"]): {
855 "vim_network_name": ""
856 }
857 }
858 )
859
860 nslcmop_id = db_nslcmop["_id"]
861 target = {
862 "name": db_nsr["name"],
863 "ns": {"vld": []},
864 "vnf": [],
865 "image": deepcopy(db_nsr["image"]),
866 "flavor": deepcopy(db_nsr["flavor"]),
867 "action_id": nslcmop_id,
868 "cloud_init_content": {},
869 }
870 for image in target["image"]:
871 image["vim_info"] = {}
872 for flavor in target["flavor"]:
873 flavor["vim_info"] = {}
874 if db_nsr.get("affinity-or-anti-affinity-group"):
875 target["affinity-or-anti-affinity-group"] = deepcopy(db_nsr["affinity-or-anti-affinity-group"])
876 for affinity_or_anti_affinity_group in target["affinity-or-anti-affinity-group"]:
877 affinity_or_anti_affinity_group["vim_info"] = {}
878
879 if db_nslcmop.get("lcmOperationType") != "instantiate":
880 # get parameters of instantiation:
881 db_nslcmop_instantiate = self.db.get_list(
882 "nslcmops",
883 {
884 "nsInstanceId": db_nslcmop["nsInstanceId"],
885 "lcmOperationType": "instantiate",
886 },
887 )[-1]
888 ns_params = db_nslcmop_instantiate.get("operationParams")
889 else:
890 ns_params = db_nslcmop.get("operationParams")
891 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
892 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
893
894 cp2target = {}
895 for vld_index, vld in enumerate(db_nsr.get("vld")):
896 target_vim = "vim:{}".format(ns_params["vimAccountId"])
897 target_vld = {
898 "id": vld["id"],
899 "name": vld["name"],
900 "mgmt-network": vld.get("mgmt-network", False),
901 "type": vld.get("type"),
902 "vim_info": {
903 target_vim: {
904 "vim_network_name": vld.get("vim-network-name"),
905 "vim_account_id": ns_params["vimAccountId"],
906 }
907 },
908 }
909 # check if this network needs SDN assist
910 if vld.get("pci-interfaces"):
911 db_vim = get_vim_account(ns_params["vimAccountId"])
912 sdnc_id = db_vim["config"].get("sdn-controller")
913 if sdnc_id:
914 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
915 target_sdn = "sdn:{}".format(sdnc_id)
916 target_vld["vim_info"][target_sdn] = {
917 "sdn": True,
918 "target_vim": target_vim,
919 "vlds": [sdn_vld],
920 "type": vld.get("type"),
921 }
922
923 nsd_vnf_profiles = get_vnf_profiles(nsd)
924 for nsd_vnf_profile in nsd_vnf_profiles:
925 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
926 if cp["virtual-link-profile-id"] == vld["id"]:
927 cp2target[
928 "member_vnf:{}.{}".format(
929 cp["constituent-cpd-id"][0][
930 "constituent-base-element-id"
931 ],
932 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
933 )
934 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
935
936 # check at nsd descriptor, if there is an ip-profile
937 vld_params = {}
938 nsd_vlp = find_in_list(
939 get_virtual_link_profiles(nsd),
940 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
941 == vld["id"],
942 )
943 if (
944 nsd_vlp
945 and nsd_vlp.get("virtual-link-protocol-data")
946 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
947 ):
948 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
949 "l3-protocol-data"
950 ]
951 ip_profile_dest_data = {}
952 if "ip-version" in ip_profile_source_data:
953 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
954 "ip-version"
955 ]
956 if "cidr" in ip_profile_source_data:
957 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
958 "cidr"
959 ]
960 if "gateway-ip" in ip_profile_source_data:
961 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
962 "gateway-ip"
963 ]
964 if "dhcp-enabled" in ip_profile_source_data:
965 ip_profile_dest_data["dhcp-params"] = {
966 "enabled": ip_profile_source_data["dhcp-enabled"]
967 }
968 vld_params["ip-profile"] = ip_profile_dest_data
969
970 # update vld_params with instantiation params
971 vld_instantiation_params = find_in_list(
972 get_iterable(ns_params, "vld"),
973 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
974 )
975 if vld_instantiation_params:
976 vld_params.update(vld_instantiation_params)
977 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
978 target["ns"]["vld"].append(target_vld)
979 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
980 update_ns_vld_target(target, ns_params)
981
982 for vnfr in db_vnfrs.values():
983 vnfd = find_in_list(
984 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
985 )
986 vnf_params = find_in_list(
987 get_iterable(ns_params, "vnf"),
988 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
989 )
990 target_vnf = deepcopy(vnfr)
991 target_vim = "vim:{}".format(vnfr["vim-account-id"])
992 for vld in target_vnf.get("vld", ()):
993 # check if connected to a ns.vld, to fill target'
994 vnf_cp = find_in_list(
995 vnfd.get("int-virtual-link-desc", ()),
996 lambda cpd: cpd.get("id") == vld["id"],
997 )
998 if vnf_cp:
999 ns_cp = "member_vnf:{}.{}".format(
1000 vnfr["member-vnf-index-ref"], vnf_cp["id"]
1001 )
1002 if cp2target.get(ns_cp):
1003 vld["target"] = cp2target[ns_cp]
1004
1005 vld["vim_info"] = {
1006 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1007 }
1008 # check if this network needs SDN assist
1009 target_sdn = None
1010 if vld.get("pci-interfaces"):
1011 db_vim = get_vim_account(vnfr["vim-account-id"])
1012 sdnc_id = db_vim["config"].get("sdn-controller")
1013 if sdnc_id:
1014 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1015 target_sdn = "sdn:{}".format(sdnc_id)
1016 vld["vim_info"][target_sdn] = {
1017 "sdn": True,
1018 "target_vim": target_vim,
1019 "vlds": [sdn_vld],
1020 "type": vld.get("type"),
1021 }
1022
1023 # check at vnfd descriptor, if there is an ip-profile
1024 vld_params = {}
1025 vnfd_vlp = find_in_list(
1026 get_virtual_link_profiles(vnfd),
1027 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1028 )
1029 if (
1030 vnfd_vlp
1031 and vnfd_vlp.get("virtual-link-protocol-data")
1032 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1033 ):
1034 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1035 "l3-protocol-data"
1036 ]
1037 ip_profile_dest_data = {}
1038 if "ip-version" in ip_profile_source_data:
1039 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1040 "ip-version"
1041 ]
1042 if "cidr" in ip_profile_source_data:
1043 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1044 "cidr"
1045 ]
1046 if "gateway-ip" in ip_profile_source_data:
1047 ip_profile_dest_data[
1048 "gateway-address"
1049 ] = ip_profile_source_data["gateway-ip"]
1050 if "dhcp-enabled" in ip_profile_source_data:
1051 ip_profile_dest_data["dhcp-params"] = {
1052 "enabled": ip_profile_source_data["dhcp-enabled"]
1053 }
1054
1055 vld_params["ip-profile"] = ip_profile_dest_data
1056 # update vld_params with instantiation params
1057 if vnf_params:
1058 vld_instantiation_params = find_in_list(
1059 get_iterable(vnf_params, "internal-vld"),
1060 lambda i_vld: i_vld["name"] == vld["id"],
1061 )
1062 if vld_instantiation_params:
1063 vld_params.update(vld_instantiation_params)
1064 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1065
1066 vdur_list = []
1067 for vdur in target_vnf.get("vdur", ()):
1068 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1069 continue # This vdu must not be created
1070 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1071
1072 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1073
1074 if ssh_keys_all:
1075 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1076 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1077 if (
1078 vdu_configuration
1079 and vdu_configuration.get("config-access")
1080 and vdu_configuration.get("config-access").get("ssh-access")
1081 ):
1082 vdur["ssh-keys"] = ssh_keys_all
1083 vdur["ssh-access-required"] = vdu_configuration[
1084 "config-access"
1085 ]["ssh-access"]["required"]
1086 elif (
1087 vnf_configuration
1088 and vnf_configuration.get("config-access")
1089 and vnf_configuration.get("config-access").get("ssh-access")
1090 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1091 ):
1092 vdur["ssh-keys"] = ssh_keys_all
1093 vdur["ssh-access-required"] = vnf_configuration[
1094 "config-access"
1095 ]["ssh-access"]["required"]
1096 elif ssh_keys_instantiation and find_in_list(
1097 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1098 ):
1099 vdur["ssh-keys"] = ssh_keys_instantiation
1100
1101 self.logger.debug("NS > vdur > {}".format(vdur))
1102
1103 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1104 # cloud-init
1105 if vdud.get("cloud-init-file"):
1106 vdur["cloud-init"] = "{}:file:{}".format(
1107 vnfd["_id"], vdud.get("cloud-init-file")
1108 )
1109 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1110 if vdur["cloud-init"] not in target["cloud_init_content"]:
1111 base_folder = vnfd["_admin"]["storage"]
1112 cloud_init_file = "{}/{}/cloud_init/{}".format(
1113 base_folder["folder"],
1114 base_folder["pkg-dir"],
1115 vdud.get("cloud-init-file"),
1116 )
1117 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1118 target["cloud_init_content"][
1119 vdur["cloud-init"]
1120 ] = ci_file.read()
1121 elif vdud.get("cloud-init"):
1122 vdur["cloud-init"] = "{}:vdu:{}".format(
1123 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1124 )
1125 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1126 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1127 "cloud-init"
1128 ]
1129 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1130 deploy_params_vdu = self._format_additional_params(
1131 vdur.get("additionalParams") or {}
1132 )
1133 deploy_params_vdu["OSM"] = get_osm_params(
1134 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1135 )
1136 vdur["additionalParams"] = deploy_params_vdu
1137
1138 # flavor
1139 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1140 if target_vim not in ns_flavor["vim_info"]:
1141 ns_flavor["vim_info"][target_vim] = {}
1142
1143 # deal with images
1144 # in case alternative images are provided we must check if they should be applied
1145 # for the vim_type, modify the vim_type taking into account
1146 ns_image_id = int(vdur["ns-image-id"])
1147 if vdur.get("alt-image-ids"):
1148 db_vim = get_vim_account(vnfr["vim-account-id"])
1149 vim_type = db_vim["vim_type"]
1150 for alt_image_id in vdur.get("alt-image-ids"):
1151 ns_alt_image = target["image"][int(alt_image_id)]
1152 if vim_type == ns_alt_image.get("vim-type"):
1153 # must use alternative image
1154 self.logger.debug(
1155 "use alternative image id: {}".format(alt_image_id)
1156 )
1157 ns_image_id = alt_image_id
1158 vdur["ns-image-id"] = ns_image_id
1159 break
1160 ns_image = target["image"][int(ns_image_id)]
1161 if target_vim not in ns_image["vim_info"]:
1162 ns_image["vim_info"][target_vim] = {}
1163
1164 # Affinity groups
1165 if vdur.get("affinity-or-anti-affinity-group-id"):
1166 for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
1167 ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
1168 if target_vim not in ns_ags["vim_info"]:
1169 ns_ags["vim_info"][target_vim] = {}
1170
1171 vdur["vim_info"] = {target_vim: {}}
1172 # instantiation parameters
1173 # if vnf_params:
1174 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1175 # vdud["id"]), None)
1176 vdur_list.append(vdur)
1177 target_vnf["vdur"] = vdur_list
1178 target["vnf"].append(target_vnf)
1179
1180 desc = await self.RO.deploy(nsr_id, target)
1181 self.logger.debug("RO return > {}".format(desc))
1182 action_id = desc["action_id"]
1183 await self._wait_ng_ro(
1184 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1185 )
1186
1187 # Updating NSR
1188 db_nsr_update = {
1189 "_admin.deployed.RO.operational-status": "running",
1190 "detailed-status": " ".join(stage),
1191 }
1192 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1193 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1194 self._write_op_status(nslcmop_id, stage)
1195 self.logger.debug(
1196 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1197 )
1198 return
1199
1200 async def _wait_ng_ro(
1201 self,
1202 nsr_id,
1203 action_id,
1204 nslcmop_id=None,
1205 start_time=None,
1206 timeout=600,
1207 stage=None,
1208 ):
1209 detailed_status_old = None
1210 db_nsr_update = {}
1211 start_time = start_time or time()
1212 while time() <= start_time + timeout:
1213 desc_status = await self.RO.status(nsr_id, action_id)
1214 self.logger.debug("Wait NG RO > {}".format(desc_status))
1215 if desc_status["status"] == "FAILED":
1216 raise NgRoException(desc_status["details"])
1217 elif desc_status["status"] == "BUILD":
1218 if stage:
1219 stage[2] = "VIM: ({})".format(desc_status["details"])
1220 elif desc_status["status"] == "DONE":
1221 if stage:
1222 stage[2] = "Deployed at VIM"
1223 break
1224 else:
1225 assert False, "ROclient.check_ns_status returns unknown {}".format(
1226 desc_status["status"]
1227 )
1228 if stage and nslcmop_id and stage[2] != detailed_status_old:
1229 detailed_status_old = stage[2]
1230 db_nsr_update["detailed-status"] = " ".join(stage)
1231 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1232 self._write_op_status(nslcmop_id, stage)
1233 await asyncio.sleep(15, loop=self.loop)
1234 else: # timeout_ns_deploy
1235 raise NgRoException("Timeout waiting ns to deploy")
1236
    async def _terminate_ng_ro(
        self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
    ):
        """
        Terminate the ns at NG-RO: deploy an empty target (which removes all
        existing resources), wait for completion, then delete the nsr at RO.
        Errors are accumulated and raised at the end as a single LcmException.

        :param logging_text: prefix text to use at logging
        :param nsr_deployed: content of db_nsr "_admin.deployed" (currently unused here)
        :param nsr_id: ns record id to terminate
        :param nslcmop_id: ns operation id, used as RO action_id and for op status
        :param stage: 3-item status list; item [2] is overwritten with the result
        :raises LcmException: if RO reported a conflict or any other delete error
        """
        db_nsr_update = {}
        failed_detail = []
        action_id = None
        start_deploy = time()
        try:
            # an empty target makes NG-RO remove every deployed item of this ns
            target = {
                "ns": {"vld": []},
                "vnf": [],
                "image": [],
                "flavor": [],
                "action_id": nslcmop_id,
            }
            desc = await self.RO.deploy(nsr_id, target)
            action_id = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
            self.logger.debug(
                logging_text
                + "ns terminate action at RO. action_id={}".format(action_id)
            )

            # wait until done
            delete_timeout = 20 * 60  # 20 minutes
            await self._wait_ng_ro(
                nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
            )

            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            # delete all nsr
            await self.RO.delete(nsr_id)
        except Exception as e:
            if isinstance(e, NgRoException) and e.http_code == 404:  # not found
                # already gone at RO: treat as successful deletion
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(
                    logging_text + "RO_action_id={} already deleted".format(action_id)
                )
            elif isinstance(e, NgRoException) and e.http_code == 409:  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(
                    logging_text
                    + "RO_action_id={} delete conflict: {}".format(action_id, e)
                )
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(
                    logging_text
                    + "RO_action_id={} delete error: {}".format(action_id, e)
                )

        # write the final outcome to db regardless of success/failure
        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
        return
1303
1304 async def instantiate_RO(
1305 self,
1306 logging_text,
1307 nsr_id,
1308 nsd,
1309 db_nsr,
1310 db_nslcmop,
1311 db_vnfrs,
1312 db_vnfds,
1313 n2vc_key_list,
1314 stage,
1315 ):
1316 """
1317 Instantiate at RO
1318 :param logging_text: preffix text to use at logging
1319 :param nsr_id: nsr identity
1320 :param nsd: database content of ns descriptor
1321 :param db_nsr: database content of ns record
1322 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1323 :param db_vnfrs:
1324 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1325 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1326 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1327 :return: None or exception
1328 """
1329 try:
1330 start_deploy = time()
1331 ns_params = db_nslcmop.get("operationParams")
1332 if ns_params and ns_params.get("timeout_ns_deploy"):
1333 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1334 else:
1335 timeout_ns_deploy = self.timeout.get(
1336 "ns_deploy", self.timeout_ns_deploy
1337 )
1338
1339 # Check for and optionally request placement optimization. Database will be updated if placement activated
1340 stage[2] = "Waiting for Placement."
1341 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1342 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1343 for vnfr in db_vnfrs.values():
1344 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1345 break
1346 else:
1347 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1348
1349 return await self._instantiate_ng_ro(
1350 logging_text,
1351 nsr_id,
1352 nsd,
1353 db_nsr,
1354 db_nslcmop,
1355 db_vnfrs,
1356 db_vnfds,
1357 n2vc_key_list,
1358 stage,
1359 start_deploy,
1360 timeout_ns_deploy,
1361 )
1362 except Exception as e:
1363 stage[2] = "ERROR deploying at VIM"
1364 self.set_vnfr_at_error(db_vnfrs, str(e))
1365 self.logger.error(
1366 "Error deploying at VIM {}".format(e),
1367 exc_info=not isinstance(
1368 e,
1369 (
1370 ROclient.ROClientException,
1371 LcmException,
1372 DbException,
1373 NgRoException,
1374 ),
1375 ),
1376 )
1377 raise
1378
1379 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1380 """
1381 Wait for kdu to be up, get ip address
1382 :param logging_text: prefix use for logging
1383 :param nsr_id:
1384 :param vnfr_id:
1385 :param kdu_name:
1386 :return: IP address
1387 """
1388
1389 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1390 nb_tries = 0
1391
1392 while nb_tries < 360:
1393 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1394 kdur = next(
1395 (
1396 x
1397 for x in get_iterable(db_vnfr, "kdur")
1398 if x.get("kdu-name") == kdu_name
1399 ),
1400 None,
1401 )
1402 if not kdur:
1403 raise LcmException(
1404 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1405 )
1406 if kdur.get("status"):
1407 if kdur["status"] in ("READY", "ENABLED"):
1408 return kdur.get("ip-address")
1409 else:
1410 raise LcmException(
1411 "target KDU={} is in error state".format(kdu_name)
1412 )
1413
1414 await asyncio.sleep(10, loop=self.loop)
1415 nb_tries += 1
1416 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1417
    async def wait_vm_up_insert_key_ro(
        self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
    ):
        """
        Wait for ip addres at RO, and optionally, insert public key in virtual machine
        :param logging_text: prefix use for logging
        :param nsr_id:
        :param vnfr_id:
        :param vdu_id:
        :param vdu_index:
        :param pub_key: public ssh key to inject, None to skip
        :param user: user to apply the public ssh key
        :return: IP address
        """

        self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
        ro_nsr_id = None
        ip_address = None
        nb_tries = 0
        target_vdu_id = None
        ro_retries = 0

        while True:

            ro_retries += 1
            if ro_retries >= 360:  # 1 hour
                raise LcmException(
                    "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
                )

            await asyncio.sleep(10, loop=self.loop)

            # get ip address
            # phase 1: locate the target vdur and wait until it is ACTIVE with an ip
            if not target_vdu_id:
                db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

                if not vdu_id:  # for the VNF case
                    if db_vnfr.get("status") == "ERROR":
                        raise LcmException(
                            "Cannot inject ssh-key because target VNF is in error state"
                        )
                    ip_address = db_vnfr.get("ip-address")
                    if not ip_address:
                        continue
                    # pick the vdur that owns the vnf management ip
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("ip-address") == ip_address
                        ),
                        None,
                    )
                else:  # VDU case
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("vdu-id-ref") == vdu_id
                            and x.get("count-index") == vdu_index
                        ),
                        None,
                    )

                if (
                    not vdur and len(db_vnfr.get("vdur", ())) == 1
                ):  # If only one, this should be the target vdu
                    vdur = db_vnfr["vdur"][0]
                if not vdur:
                    raise LcmException(
                        "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                            vnfr_id, vdu_id, vdu_index
                        )
                    )
                # New generation RO stores information at "vim_info"
                ng_ro_status = None
                target_vim = None
                if vdur.get("vim_info"):
                    target_vim = next(
                        t for t in vdur["vim_info"]
                    )  # there should be only one key
                    ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
                # PDUs are considered up; otherwise require ACTIVE at old or new RO
                if (
                    vdur.get("pdu-type")
                    or vdur.get("status") == "ACTIVE"
                    or ng_ro_status == "ACTIVE"
                ):
                    ip_address = vdur.get("ip-address")
                    if not ip_address:
                        continue
                    target_vdu_id = vdur["vdu-id-ref"]
                elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VM is in error state"
                    )

            if not target_vdu_id:
                continue

            # inject public key into machine
            # phase 2: vm is up; inject the key (or just return the ip if none given)
            if pub_key and user:
                self.logger.debug(logging_text + "Inserting RO key")
                self.logger.debug("SSH > PubKey > {}".format(pub_key))
                if vdur.get("pdu-type"):
                    # key injection is not supported on physical deployment units
                    self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                    return ip_address
                try:
                    ro_vm_id = "{}-{}".format(
                        db_vnfr["member-vnf-index-ref"], target_vdu_id
                    )  # TODO add vdu_index
                    if self.ng_ro:
                        # new generation RO: injection is an 'action' deployment
                        target = {
                            "action": {
                                "action": "inject_ssh_key",
                                "key": pub_key,
                                "user": user,
                            },
                            "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                        }
                        desc = await self.RO.deploy(nsr_id, target)
                        action_id = desc["action_id"]
                        await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                        break
                    else:
                        # wait until NS is deployed at RO
                        if not ro_nsr_id:
                            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                            ro_nsr_id = deep_get(
                                db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                            )
                        if not ro_nsr_id:
                            continue
                        result_dict = await self.RO.create_action(
                            item="ns",
                            item_id_name=ro_nsr_id,
                            descriptor={
                                "add_public_key": pub_key,
                                "vms": [ro_vm_id],
                                "user": user,
                            },
                        )
                        # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                        if not result_dict or not isinstance(result_dict, dict):
                            raise LcmException(
                                "Unknown response from RO when injecting key"
                            )
                        for result in result_dict.values():
                            if result.get("vim_result") == 200:
                                break
                            else:
                                raise ROclient.ROClientException(
                                    "error injecting key: {}".format(
                                        result.get("description")
                                    )
                                )
                        break
                except NgRoException as e:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    )
                except ROclient.ROClientException as e:
                    # old RO injection may transiently fail: retry up to 20 times
                    if not nb_tries:
                        self.logger.debug(
                            logging_text
                            + "error injecting key: {}. Retrying until {} seconds".format(
                                e, 20 * 10
                            )
                        )
                    nb_tries += 1
                    if nb_tries >= 20:
                        raise LcmException(
                            "Reaching max tries injecting key. Error: {}".format(e)
                        )
            else:
                break

        return ip_address
1594
    async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
        """
        Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs

        :param nsr_id: ns record id to read configurationStatus from
        :param vca_deployed_list: list of deployed VCAs ("_admin.deployed.VCA")
        :param vca_index: index of this VCA within vca_deployed_list
        :raises LcmException: if any dependency is BROKEN or the wait times out
        """
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # vdu or kdu: no dependencies
            return
        # NOTE(review): 'timeout' counts 10-second polling iterations, not seconds,
        # so total wait is ~3000 s; confirm whether 300 s was intended
        timeout = 300
        while timeout >= 0:
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
            configuration_status_list = db_nsr["configurationStatus"]
            for index, vca_deployed in enumerate(configuration_status_list):
                if index == vca_index:
                    # myself
                    continue
                # only depend on VCAs of the same VNF (or on all, for a NS-level VCA)
                if not my_vca.get("member-vnf-index") or (
                    vca_deployed.get("member-vnf-index")
                    == my_vca.get("member-vnf-index")
                ):
                    internal_status = configuration_status_list[index].get("status")
                    if internal_status == "READY":
                        continue
                    elif internal_status == "BROKEN":
                        raise LcmException(
                            "Configuration aborted because dependent charm/s has failed"
                        )
                    else:
                        # still in progress: stop checking and sleep before retrying
                        break
            else:
                # no dependencies, return
                return
            await asyncio.sleep(10)
            timeout -= 1

        raise LcmException("Configuration aborted because dependent charm/s timeout")
1632
1633 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1634 vca_id = None
1635 if db_vnfr:
1636 vca_id = deep_get(db_vnfr, ("vca-id",))
1637 elif db_nsr:
1638 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1639 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1640 return vca_id
1641
1642 async def instantiate_N2VC(
1643 self,
1644 logging_text,
1645 vca_index,
1646 nsi_id,
1647 db_nsr,
1648 db_vnfr,
1649 vdu_id,
1650 kdu_name,
1651 vdu_index,
1652 config_descriptor,
1653 deploy_params,
1654 base_folder,
1655 nslcmop_id,
1656 stage,
1657 vca_type,
1658 vca_name,
1659 ee_config_descriptor,
1660 ):
1661 nsr_id = db_nsr["_id"]
1662 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1663 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1664 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1665 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1666 db_dict = {
1667 "collection": "nsrs",
1668 "filter": {"_id": nsr_id},
1669 "path": db_update_entry,
1670 }
1671 step = ""
1672 try:
1673
1674 element_type = "NS"
1675 element_under_configuration = nsr_id
1676
1677 vnfr_id = None
1678 if db_vnfr:
1679 vnfr_id = db_vnfr["_id"]
1680 osm_config["osm"]["vnf_id"] = vnfr_id
1681
1682 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1683
1684 if vca_type == "native_charm":
1685 index_number = 0
1686 else:
1687 index_number = vdu_index or 0
1688
1689 if vnfr_id:
1690 element_type = "VNF"
1691 element_under_configuration = vnfr_id
1692 namespace += ".{}-{}".format(vnfr_id, index_number)
1693 if vdu_id:
1694 namespace += ".{}-{}".format(vdu_id, index_number)
1695 element_type = "VDU"
1696 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1697 osm_config["osm"]["vdu_id"] = vdu_id
1698 elif kdu_name:
1699 namespace += ".{}".format(kdu_name)
1700 element_type = "KDU"
1701 element_under_configuration = kdu_name
1702 osm_config["osm"]["kdu_name"] = kdu_name
1703
1704 # Get artifact path
1705 artifact_path = "{}/{}/{}/{}".format(
1706 base_folder["folder"],
1707 base_folder["pkg-dir"],
1708 "charms"
1709 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1710 else "helm-charts",
1711 vca_name,
1712 )
1713
1714 self.logger.debug("Artifact path > {}".format(artifact_path))
1715
1716 # get initial_config_primitive_list that applies to this element
1717 initial_config_primitive_list = config_descriptor.get(
1718 "initial-config-primitive"
1719 )
1720
1721 self.logger.debug(
1722 "Initial config primitive list > {}".format(
1723 initial_config_primitive_list
1724 )
1725 )
1726
1727 # add config if not present for NS charm
1728 ee_descriptor_id = ee_config_descriptor.get("id")
1729 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1730 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1731 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1732 )
1733
1734 self.logger.debug(
1735 "Initial config primitive list #2 > {}".format(
1736 initial_config_primitive_list
1737 )
1738 )
1739 # n2vc_redesign STEP 3.1
1740 # find old ee_id if exists
1741 ee_id = vca_deployed.get("ee_id")
1742
1743 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1744 # create or register execution environment in VCA
1745 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1746
1747 self._write_configuration_status(
1748 nsr_id=nsr_id,
1749 vca_index=vca_index,
1750 status="CREATING",
1751 element_under_configuration=element_under_configuration,
1752 element_type=element_type,
1753 )
1754
1755 step = "create execution environment"
1756 self.logger.debug(logging_text + step)
1757
1758 ee_id = None
1759 credentials = None
1760 if vca_type == "k8s_proxy_charm":
1761 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1762 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1763 namespace=namespace,
1764 artifact_path=artifact_path,
1765 db_dict=db_dict,
1766 vca_id=vca_id,
1767 )
1768 elif vca_type == "helm" or vca_type == "helm-v3":
1769 ee_id, credentials = await self.vca_map[
1770 vca_type
1771 ].create_execution_environment(
1772 namespace=namespace,
1773 reuse_ee_id=ee_id,
1774 db_dict=db_dict,
1775 config=osm_config,
1776 artifact_path=artifact_path,
1777 vca_type=vca_type,
1778 )
1779 else:
1780 ee_id, credentials = await self.vca_map[
1781 vca_type
1782 ].create_execution_environment(
1783 namespace=namespace,
1784 reuse_ee_id=ee_id,
1785 db_dict=db_dict,
1786 vca_id=vca_id,
1787 )
1788
1789 elif vca_type == "native_charm":
1790 step = "Waiting to VM being up and getting IP address"
1791 self.logger.debug(logging_text + step)
1792 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1793 logging_text,
1794 nsr_id,
1795 vnfr_id,
1796 vdu_id,
1797 vdu_index,
1798 user=None,
1799 pub_key=None,
1800 )
1801 credentials = {"hostname": rw_mgmt_ip}
1802 # get username
1803 username = deep_get(
1804 config_descriptor, ("config-access", "ssh-access", "default-user")
1805 )
1806 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1807 # merged. Meanwhile let's get username from initial-config-primitive
1808 if not username and initial_config_primitive_list:
1809 for config_primitive in initial_config_primitive_list:
1810 for param in config_primitive.get("parameter", ()):
1811 if param["name"] == "ssh-username":
1812 username = param["value"]
1813 break
1814 if not username:
1815 raise LcmException(
1816 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1817 "'config-access.ssh-access.default-user'"
1818 )
1819 credentials["username"] = username
1820 # n2vc_redesign STEP 3.2
1821
1822 self._write_configuration_status(
1823 nsr_id=nsr_id,
1824 vca_index=vca_index,
1825 status="REGISTERING",
1826 element_under_configuration=element_under_configuration,
1827 element_type=element_type,
1828 )
1829
1830 step = "register execution environment {}".format(credentials)
1831 self.logger.debug(logging_text + step)
1832 ee_id = await self.vca_map[vca_type].register_execution_environment(
1833 credentials=credentials,
1834 namespace=namespace,
1835 db_dict=db_dict,
1836 vca_id=vca_id,
1837 )
1838
1839 # for compatibility with MON/POL modules, the need model and application name at database
1840 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1841 ee_id_parts = ee_id.split(".")
1842 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1843 if len(ee_id_parts) >= 2:
1844 model_name = ee_id_parts[0]
1845 application_name = ee_id_parts[1]
1846 db_nsr_update[db_update_entry + "model"] = model_name
1847 db_nsr_update[db_update_entry + "application"] = application_name
1848
1849 # n2vc_redesign STEP 3.3
1850 step = "Install configuration Software"
1851
1852 self._write_configuration_status(
1853 nsr_id=nsr_id,
1854 vca_index=vca_index,
1855 status="INSTALLING SW",
1856 element_under_configuration=element_under_configuration,
1857 element_type=element_type,
1858 other_update=db_nsr_update,
1859 )
1860
1861 # TODO check if already done
1862 self.logger.debug(logging_text + step)
1863 config = None
1864 if vca_type == "native_charm":
1865 config_primitive = next(
1866 (p for p in initial_config_primitive_list if p["name"] == "config"),
1867 None,
1868 )
1869 if config_primitive:
1870 config = self._map_primitive_params(
1871 config_primitive, {}, deploy_params
1872 )
1873 num_units = 1
1874 if vca_type == "lxc_proxy_charm":
1875 if element_type == "NS":
1876 num_units = db_nsr.get("config-units") or 1
1877 elif element_type == "VNF":
1878 num_units = db_vnfr.get("config-units") or 1
1879 elif element_type == "VDU":
1880 for v in db_vnfr["vdur"]:
1881 if vdu_id == v["vdu-id-ref"]:
1882 num_units = v.get("config-units") or 1
1883 break
1884 if vca_type != "k8s_proxy_charm":
1885 await self.vca_map[vca_type].install_configuration_sw(
1886 ee_id=ee_id,
1887 artifact_path=artifact_path,
1888 db_dict=db_dict,
1889 config=config,
1890 num_units=num_units,
1891 vca_id=vca_id,
1892 vca_type=vca_type,
1893 )
1894
1895 # write in db flag of configuration_sw already installed
1896 self.update_db_2(
1897 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1898 )
1899
1900 # add relations for this VCA (wait for other peers related with this VCA)
1901 await self._add_vca_relations(
1902 logging_text=logging_text,
1903 nsr_id=nsr_id,
1904 vca_index=vca_index,
1905 vca_id=vca_id,
1906 vca_type=vca_type,
1907 )
1908
1909 # if SSH access is required, then get execution environment SSH public
1910 # if native charm we have waited already to VM be UP
1911 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1912 pub_key = None
1913 user = None
1914 # self.logger.debug("get ssh key block")
1915 if deep_get(
1916 config_descriptor, ("config-access", "ssh-access", "required")
1917 ):
1918 # self.logger.debug("ssh key needed")
1919 # Needed to inject a ssh key
1920 user = deep_get(
1921 config_descriptor,
1922 ("config-access", "ssh-access", "default-user"),
1923 )
1924 step = "Install configuration Software, getting public ssh key"
1925 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1926 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1927 )
1928
1929 step = "Insert public key into VM user={} ssh_key={}".format(
1930 user, pub_key
1931 )
1932 else:
1933 # self.logger.debug("no need to get ssh key")
1934 step = "Waiting to VM being up and getting IP address"
1935 self.logger.debug(logging_text + step)
1936
1937 # n2vc_redesign STEP 5.1
1938 # wait for RO (ip-address) Insert pub_key into VM
1939 if vnfr_id:
1940 if kdu_name:
1941 rw_mgmt_ip = await self.wait_kdu_up(
1942 logging_text, nsr_id, vnfr_id, kdu_name
1943 )
1944 else:
1945 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1946 logging_text,
1947 nsr_id,
1948 vnfr_id,
1949 vdu_id,
1950 vdu_index,
1951 user=user,
1952 pub_key=pub_key,
1953 )
1954 else:
1955 rw_mgmt_ip = None # This is for a NS configuration
1956
1957 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1958
1959 # store rw_mgmt_ip in deploy params for later replacement
1960 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1961
1962 # n2vc_redesign STEP 6 Execute initial config primitive
1963 step = "execute initial config primitive"
1964
1965 # wait for dependent primitives execution (NS -> VNF -> VDU)
1966 if initial_config_primitive_list:
1967 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1968
1969 # stage, in function of element type: vdu, kdu, vnf or ns
1970 my_vca = vca_deployed_list[vca_index]
1971 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1972 # VDU or KDU
1973 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
1974 elif my_vca.get("member-vnf-index"):
1975 # VNF
1976 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
1977 else:
1978 # NS
1979 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
1980
1981 self._write_configuration_status(
1982 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
1983 )
1984
1985 self._write_op_status(op_id=nslcmop_id, stage=stage)
1986
1987 check_if_terminated_needed = True
1988 for initial_config_primitive in initial_config_primitive_list:
1989 # adding information on the vca_deployed if it is a NS execution environment
1990 if not vca_deployed["member-vnf-index"]:
1991 deploy_params["ns_config_info"] = json.dumps(
1992 self._get_ns_config_info(nsr_id)
1993 )
1994 # TODO check if already done
1995 primitive_params_ = self._map_primitive_params(
1996 initial_config_primitive, {}, deploy_params
1997 )
1998
1999 step = "execute primitive '{}' params '{}'".format(
2000 initial_config_primitive["name"], primitive_params_
2001 )
2002 self.logger.debug(logging_text + step)
2003 await self.vca_map[vca_type].exec_primitive(
2004 ee_id=ee_id,
2005 primitive_name=initial_config_primitive["name"],
2006 params_dict=primitive_params_,
2007 db_dict=db_dict,
2008 vca_id=vca_id,
2009 vca_type=vca_type,
2010 )
2011 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2012 if check_if_terminated_needed:
2013 if config_descriptor.get("terminate-config-primitive"):
2014 self.update_db_2(
2015 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2016 )
2017 check_if_terminated_needed = False
2018
2019 # TODO register in database that primitive is done
2020
2021 # STEP 7 Configure metrics
2022 if vca_type == "helm" or vca_type == "helm-v3":
2023 prometheus_jobs = await self.add_prometheus_metrics(
2024 ee_id=ee_id,
2025 artifact_path=artifact_path,
2026 ee_config_descriptor=ee_config_descriptor,
2027 vnfr_id=vnfr_id,
2028 nsr_id=nsr_id,
2029 target_ip=rw_mgmt_ip,
2030 )
2031 if prometheus_jobs:
2032 self.update_db_2(
2033 "nsrs",
2034 nsr_id,
2035 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2036 )
2037
2038 step = "instantiated at VCA"
2039 self.logger.debug(logging_text + step)
2040
2041 self._write_configuration_status(
2042 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2043 )
2044
2045 except Exception as e: # TODO not use Exception but N2VC exception
2046 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2047 if not isinstance(
2048 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2049 ):
2050 self.logger.error(
2051 "Exception while {} : {}".format(step, e), exc_info=True
2052 )
2053 self._write_configuration_status(
2054 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2055 )
2056 raise LcmException("{} {}".format(step, e)) from e
2057
2058 def _write_ns_status(
2059 self,
2060 nsr_id: str,
2061 ns_state: str,
2062 current_operation: str,
2063 current_operation_id: str,
2064 error_description: str = None,
2065 error_detail: str = None,
2066 other_update: dict = None,
2067 ):
2068 """
2069 Update db_nsr fields.
2070 :param nsr_id:
2071 :param ns_state:
2072 :param current_operation:
2073 :param current_operation_id:
2074 :param error_description:
2075 :param error_detail:
2076 :param other_update: Other required changes at database if provided, will be cleared
2077 :return:
2078 """
2079 try:
2080 db_dict = other_update or {}
2081 db_dict[
2082 "_admin.nslcmop"
2083 ] = current_operation_id # for backward compatibility
2084 db_dict["_admin.current-operation"] = current_operation_id
2085 db_dict["_admin.operation-type"] = (
2086 current_operation if current_operation != "IDLE" else None
2087 )
2088 db_dict["currentOperation"] = current_operation
2089 db_dict["currentOperationID"] = current_operation_id
2090 db_dict["errorDescription"] = error_description
2091 db_dict["errorDetail"] = error_detail
2092
2093 if ns_state:
2094 db_dict["nsState"] = ns_state
2095 self.update_db_2("nsrs", nsr_id, db_dict)
2096 except DbException as e:
2097 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2098
2099 def _write_op_status(
2100 self,
2101 op_id: str,
2102 stage: list = None,
2103 error_message: str = None,
2104 queuePosition: int = 0,
2105 operation_state: str = None,
2106 other_update: dict = None,
2107 ):
2108 try:
2109 db_dict = other_update or {}
2110 db_dict["queuePosition"] = queuePosition
2111 if isinstance(stage, list):
2112 db_dict["stage"] = stage[0]
2113 db_dict["detailed-status"] = " ".join(stage)
2114 elif stage is not None:
2115 db_dict["stage"] = str(stage)
2116
2117 if error_message is not None:
2118 db_dict["errorMessage"] = error_message
2119 if operation_state is not None:
2120 db_dict["operationState"] = operation_state
2121 db_dict["statusEnteredTime"] = time()
2122 self.update_db_2("nslcmops", op_id, db_dict)
2123 except DbException as e:
2124 self.logger.warn(
2125 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2126 )
2127
2128 def _write_all_config_status(self, db_nsr: dict, status: str):
2129 try:
2130 nsr_id = db_nsr["_id"]
2131 # configurationStatus
2132 config_status = db_nsr.get("configurationStatus")
2133 if config_status:
2134 db_nsr_update = {
2135 "configurationStatus.{}.status".format(index): status
2136 for index, v in enumerate(config_status)
2137 if v
2138 }
2139 # update status
2140 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2141
2142 except DbException as e:
2143 self.logger.warn(
2144 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2145 )
2146
2147 def _write_configuration_status(
2148 self,
2149 nsr_id: str,
2150 vca_index: int,
2151 status: str = None,
2152 element_under_configuration: str = None,
2153 element_type: str = None,
2154 other_update: dict = None,
2155 ):
2156
2157 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2158 # .format(vca_index, status))
2159
2160 try:
2161 db_path = "configurationStatus.{}.".format(vca_index)
2162 db_dict = other_update or {}
2163 if status:
2164 db_dict[db_path + "status"] = status
2165 if element_under_configuration:
2166 db_dict[
2167 db_path + "elementUnderConfiguration"
2168 ] = element_under_configuration
2169 if element_type:
2170 db_dict[db_path + "elementType"] = element_type
2171 self.update_db_2("nsrs", nsr_id, db_dict)
2172 except DbException as e:
2173 self.logger.warn(
2174 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2175 status, nsr_id, vca_index, e
2176 )
2177 )
2178
2179 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2180 """
2181 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2182 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2183 Database is used because the result can be obtained from a different LCM worker in case of HA.
2184 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2185 :param db_nslcmop: database content of nslcmop
2186 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2187 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2188 computed 'vim-account-id'
2189 """
2190 modified = False
2191 nslcmop_id = db_nslcmop["_id"]
2192 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2193 if placement_engine == "PLA":
2194 self.logger.debug(
2195 logging_text + "Invoke and wait for placement optimization"
2196 )
2197 await self.msg.aiowrite(
2198 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2199 )
2200 db_poll_interval = 5
2201 wait = db_poll_interval * 10
2202 pla_result = None
2203 while not pla_result and wait >= 0:
2204 await asyncio.sleep(db_poll_interval)
2205 wait -= db_poll_interval
2206 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2207 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2208
2209 if not pla_result:
2210 raise LcmException(
2211 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2212 )
2213
2214 for pla_vnf in pla_result["vnf"]:
2215 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2216 if not pla_vnf.get("vimAccountId") or not vnfr:
2217 continue
2218 modified = True
2219 self.db.set_one(
2220 "vnfrs",
2221 {"_id": vnfr["_id"]},
2222 {"vim-account-id": pla_vnf["vimAccountId"]},
2223 )
2224 # Modifies db_vnfrs
2225 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2226 return modified
2227
2228 def update_nsrs_with_pla_result(self, params):
2229 try:
2230 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2231 self.update_db_2(
2232 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2233 )
2234 except Exception as e:
2235 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2236
    async def instantiate(self, nsr_id, nslcmop_id):
        """
        Instantiate a Network Service: reads all needed records from the
        database, deploys KDUs, the VIM scenario (through RO) and the execution
        environments (through N2VC), then waits for all spawned tasks and
        writes the final operation result into nsrs/nslcmops, notifying the
        outcome via kafka.

        :param nsr_id: ns instance to deploy
        :param nslcmop_id: operation to run
        :return: None. Result is persisted in the nslcmops record and published
            on the "ns" kafka topic.
        """

        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            self.logger.debug(
                "instantiate() task is not locked by me, ns={}".format(nsr_id)
            )
            return

        logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")

        # get all needed from database

        # database nsrs record
        db_nsr = None

        # database nslcmops record
        db_nslcmop = None

        # update operation on nsrs
        db_nsr_update = {}
        # update operation on nslcmops
        db_nslcmop_update = {}

        nslcmop_operation_state = None
        db_vnfrs = {}  # vnf's info indexed by member-index
        # n2vc_info = {}
        tasks_dict_info = {}  # from task to info text
        exc = None
        error_list = []
        stage = [
            "Stage 1/5: preparation of the environment.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ stage, step, VIM progress
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
            stage[1] = "Reading from database."
            # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
            db_nsr_update["detailed-status"] = "creating"
            db_nsr_update["operational-status"] = "init"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="BUILDING",
                current_operation="INSTANTIATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

            # read from db: operation
            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            # additionalParamsForVnf is stored JSON-encoded; decode it in place
            if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
                db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
                    db_nslcmop["operationParams"]["additionalParamsForVnf"]
                )
            ns_params = db_nslcmop.get("operationParams")
            if ns_params and ns_params.get("timeout_ns_deploy"):
                timeout_ns_deploy = ns_params["timeout_ns_deploy"]
            else:
                timeout_ns_deploy = self.timeout.get(
                    "ns_deploy", self.timeout_ns_deploy
                )

            # read from db: ns
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
            nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
            self.fs.sync(db_nsr["nsd-id"])
            db_nsr["nsd"] = nsd
            # nsr_name = db_nsr["name"]   # TODO short-name??

            # read from db: vnf's of this ns
            stage[1] = "Getting vnfrs from db."
            self.logger.debug(logging_text + stage[1])
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

            # read from db: vnfd's for every vnf
            db_vnfds = []  # every vnfd data

            # for each vnf in ns, read vnfd
            for vnfr in db_vnfrs_list:
                if vnfr.get("kdur"):
                    kdur_list = []
                    for kdur in vnfr["kdur"]:
                        # kdur additionalParams are also stored JSON-encoded
                        if kdur.get("additionalParams"):
                            kdur["additionalParams"] = json.loads(
                                kdur["additionalParams"]
                            )
                        kdur_list.append(kdur)
                    vnfr["kdur"] = kdur_list

                db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
                vnfd_id = vnfr["vnfd-id"]
                vnfd_ref = vnfr["vnfd-ref"]
                self.fs.sync(vnfd_id)

                # if we haven't this vnfd, read it from db
                # NOTE(review): db_vnfds stores whole vnfd dicts, so the
                # membership test "vnfd_id not in db_vnfds" never matches and
                # shared vnfds may be re-read; confirm whether this is intended.
                if vnfd_id not in db_vnfds:
                    # read from db
                    stage[1] = "Getting vnfd={} id='{}' from db.".format(
                        vnfd_id, vnfd_ref
                    )
                    self.logger.debug(logging_text + stage[1])
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                    # store vnfd
                    db_vnfds.append(vnfd)

            # Get or generates the _admin.deployed.VCA list
            vca_deployed_list = None
            if db_nsr["_admin"].get("deployed"):
                vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
            if vca_deployed_list is None:
                vca_deployed_list = []
                configuration_status_list = []
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                db_nsr_update["configurationStatus"] = configuration_status_list
                # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
            elif isinstance(vca_deployed_list, dict):
                # maintain backward compatibility. Change a dict to list at database
                vca_deployed_list = list(vca_deployed_list.values())
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

            if not isinstance(
                deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
            ):
                populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
                db_nsr_update["_admin.deployed.RO.vnfd"] = []

            # set state to INSTANTIATED. When instantiated NBI will not delete directly
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self.db.set_list(
                "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
            )

            # n2vc_redesign STEP 2 Deploy Network Scenario
            stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
            self._write_op_status(op_id=nslcmop_id, stage=stage)

            stage[1] = "Deploying KDUs."
            # self.logger.debug(logging_text + "Before deploy_kdus")
            # Call to deploy_kdus in case exists the "vdu:kdu" param
            await self.deploy_kdus(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nslcmop_id=nslcmop_id,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                task_instantiation_info=tasks_dict_info,
            )

            stage[1] = "Getting VCA public key."
            # n2vc_redesign STEP 1 Get VCA public ssh-key
            # feature 1429. Add n2vc public key to needed VMs
            n2vc_key = self.n2vc.get_public_key()
            n2vc_key_list = [n2vc_key]
            if self.vca_config.get("public_key"):
                n2vc_key_list.append(self.vca_config["public_key"])

            stage[1] = "Deploying NS at VIM."
            # RO deployment runs concurrently; completion is awaited in finally
            task_ro = asyncio.ensure_future(
                self.instantiate_RO(
                    logging_text=logging_text,
                    nsr_id=nsr_id,
                    nsd=nsd,
                    db_nsr=db_nsr,
                    db_nslcmop=db_nslcmop,
                    db_vnfrs=db_vnfrs,
                    db_vnfds=db_vnfds,
                    n2vc_key_list=n2vc_key_list,
                    stage=stage,
                )
            )
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
            tasks_dict_info[task_ro] = "Deploying at VIM"

            # n2vc_redesign STEP 3 to 6 Deploy N2VC
            stage[1] = "Deploying Execution Environments."
            self.logger.debug(logging_text + stage[1])

            nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
            for vnf_profile in get_vnf_profiles(nsd):
                vnfd_id = vnf_profile["vnfd-id"]
                vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
                member_vnf_index = str(vnf_profile["id"])
                db_vnfr = db_vnfrs[member_vnf_index]
                base_folder = vnfd["_admin"]["storage"]
                vdu_id = None
                vdu_index = 0
                vdu_name = None
                kdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": get_osm_params(db_vnfr)}
                if db_vnfr.get("additionalParamsForVnf"):
                    deploy_params.update(
                        parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
                    )

                # VNF-level charm (if the vnfd declares a configuration)
                descriptor_config = get_configuration(vnfd, vnfd["id"])
                if descriptor_config:
                    self._deploy_n2vc(
                        logging_text=logging_text
                        + "member_vnf_index={} ".format(member_vnf_index),
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage,
                    )

                # Deploy charms for each VDU that supports one.
                for vdud in get_vdu_list(vnfd):
                    vdu_id = vdud["id"]
                    descriptor_config = get_configuration(vnfd, vdu_id)
                    vdur = find_in_list(
                        db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
                    )

                    if vdur.get("additionalParams"):
                        deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                    else:
                        deploy_params_vdu = deploy_params
                    deploy_params_vdu["OSM"] = get_osm_params(
                        db_vnfr, vdu_id, vdu_count_index=0
                    )
                    vdud_count = get_number_of_instances(vnfd, vdu_id)

                    self.logger.debug("VDUD > {}".format(vdud))
                    self.logger.debug(
                        "Descriptor config > {}".format(descriptor_config)
                    )
                    if descriptor_config:
                        vdu_name = None
                        kdu_name = None
                        # one execution environment per VDU instance
                        for vdu_index in range(vdud_count):
                            # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                            self._deploy_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_id, vdu_index
                                ),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )
                # Deploy charms for each KDU that declares a configuration
                for kdud in get_kdu_list(vnfd):
                    kdu_name = kdud["name"]
                    descriptor_config = get_configuration(vnfd, kdu_name)
                    if descriptor_config:
                        vdu_id = None
                        vdu_index = 0
                        vdu_name = None
                        kdur = next(
                            x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
                        )
                        deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                        if kdur.get("additionalParams"):
                            deploy_params_kdu.update(
                                parse_yaml_strings(kdur["additionalParams"].copy())
                            )

                        self._deploy_n2vc(
                            logging_text=logging_text,
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_kdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )

            # Check if this NS has a charm configuration
            descriptor_config = nsd.get("ns-configuration")
            if descriptor_config and descriptor_config.get("juju"):
                vnfd_id = None
                db_vnfr = None
                member_vnf_index = None
                vdu_id = None
                kdu_name = None
                vdu_index = 0
                vdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
                if db_nsr.get("additionalParamsForNs"):
                    deploy_params.update(
                        parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
                    )
                base_folder = nsd["_admin"]["storage"]
                self._deploy_n2vc(
                    logging_text=logging_text,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage,
                )

            # rest of staff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
            )
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for instantiate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_deploy,
                        stage,
                        nslcmop_id,
                        nsr_id=nsr_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancel all tasks
            # NOTE(review): "exc" here shadows the outer variable of the same
            # name; harmless because the outer value was consumed above, but
            # worth renaming.
            except Exception as exc:
                error_list.append(str(exc))

            # update operation-status
            db_nsr_update["operational-status"] = "running"
            # let's begin with VCA 'configured' status (later we can change it)
            db_nsr_update["config-status"] = "configured"
            for task, task_name in tasks_dict_info.items():
                if not task.done() or task.cancelled() or task.exception():
                    if task_name.startswith(self.task_name_deploy_vca):
                        # A N2VC task is pending
                        db_nsr_update["config-status"] = "failed"
                    else:
                        # RO or KDU task is pending
                        db_nsr_update["operational-status"] = "failed"

            # update status at database
            if error_list:
                error_detail = ". ".join(error_list)
                self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "READY"
                db_nsr_update["detailed-status"] = "Done"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
                self._write_op_status(
                    op_id=nslcmop_id,
                    stage="",
                    error_message=error_description_nslcmop,
                    operation_state=nslcmop_operation_state,
                    other_update=db_nslcmop_update,
                )

            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite(
                        "ns",
                        "instantiated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2720
    async def _add_vca_relations(
        self,
        logging_text,
        nsr_id,
        vca_index: int,
        timeout: int = 3600,
        vca_type: str = None,
        vca_id: str = None,
    ) -> bool:
        """
        Add the relations declared for one deployed VCA, waiting for the peer
        VCAs to be ready.

        :param logging_text: prefix for logging
        :param nsr_id: id of the nsrs record
        :param vca_index: position of this VCA inside _admin.deployed.VCA
        :param timeout: seconds to wait for all peers before giving up
        :param vca_type: key of self.vca_map to use; defaults to "lxc_proxy_charm"
        :param vca_id: id of the VCA, passed through to add_relation
        :return: True on success or when there is nothing to do; False on
            timeout or error (never raises)
        """

        # steps:
        # 1. find all relations for this VCA
        # 2. wait for other peers related
        # 3. add relations

        try:
            vca_type = vca_type or "lxc_proxy_charm"

            # STEP 1: find all relations for this VCA

            # read nsr record
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

            # this VCA data
            my_vca = deep_get(db_nsr, ("_admin", "deployed", "VCA"))[vca_index]

            # read all ns-configuration relations
            ns_relations = list()
            db_ns_relations = deep_get(nsd, ("ns-configuration", "relation"))
            if db_ns_relations:
                for r in db_ns_relations:
                    # check if this VCA is in the relation
                    if my_vca.get("member-vnf-index") in (
                        r.get("entities")[0].get("id"),
                        r.get("entities")[1].get("id"),
                    ):
                        ns_relations.append(r)

            # read all vnf-configuration relations
            vnf_relations = list()
            db_vnfd_list = db_nsr.get("vnfd-id")
            if db_vnfd_list:
                for vnfd in db_vnfd_list:
                    db_vnf_relations = None
                    db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
                    db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
                    if db_vnf_configuration:
                        db_vnf_relations = db_vnf_configuration.get("relation", [])
                    if db_vnf_relations:
                        for r in db_vnf_relations:
                            # check if this VCA is in the relation
                            if my_vca.get("vdu_id") in (
                                r.get("entities")[0].get("id"),
                                r.get("entities")[1].get("id"),
                            ):
                                vnf_relations.append(r)

            # if no relations, terminate
            if not ns_relations and not vnf_relations:
                self.logger.debug(logging_text + " No relations")
                return True

            self.logger.debug(
                logging_text
                + " adding relations\n {}\n {}".format(
                    ns_relations, vnf_relations
                )
            )

            # STEP 2-3: poll peer readiness and add each relation as soon as
            # both ends have their configuration software installed.
            # add all relations
            start = time()
            while True:
                # check timeout
                now = time()
                if now - start >= timeout:
                    self.logger.error(logging_text + " : timeout adding relations")
                    return False

                # reload nsr from database (we need to update record: _admin.deloyed.VCA)
                db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

                # for each defined NS relation, find the VCA's related
                # (iterate over a copy: completed/broken relations are removed in place)
                for r in ns_relations.copy():
                    from_vca_ee_id = None
                    to_vca_ee_id = None
                    from_vca_endpoint = None
                    to_vca_endpoint = None
                    vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
                    for vca in vca_list:
                        if vca.get("member-vnf-index") == r.get("entities")[0].get(
                            "id"
                        ) and vca.get("config_sw_installed"):
                            from_vca_ee_id = vca.get("ee_id")
                            from_vca_endpoint = r.get("entities")[0].get("endpoint")
                        if vca.get("member-vnf-index") == r.get("entities")[1].get(
                            "id"
                        ) and vca.get("config_sw_installed"):
                            to_vca_ee_id = vca.get("ee_id")
                            to_vca_endpoint = r.get("entities")[1].get("endpoint")
                    if from_vca_ee_id and to_vca_ee_id:
                        # add relation
                        await self.vca_map[vca_type].add_relation(
                            ee_id_1=from_vca_ee_id,
                            ee_id_2=to_vca_ee_id,
                            endpoint_1=from_vca_endpoint,
                            endpoint_2=to_vca_endpoint,
                            vca_id=vca_id,
                        )
                        # remove entry from relations list
                        ns_relations.remove(r)
                    else:
                        # check failed peers
                        try:
                            vca_status_list = db_nsr.get("configurationStatus")
                            if vca_status_list:
                                for i in range(len(vca_list)):
                                    vca = vca_list[i]
                                    vca_status = vca_status_list[i]
                                    if vca.get("member-vnf-index") == r.get("entities")[
                                        0
                                    ].get("id"):
                                        if vca_status.get("status") == "BROKEN":
                                            # peer broken: remove relation from list
                                            ns_relations.remove(r)
                                    if vca.get("member-vnf-index") == r.get("entities")[
                                        1
                                    ].get("id"):
                                        if vca_status.get("status") == "BROKEN":
                                            # peer broken: remove relation from list
                                            ns_relations.remove(r)
                        except Exception:
                            # ignore
                            pass

                # for each defined VNF relation, find the VCA's related
                for r in vnf_relations.copy():
                    from_vca_ee_id = None
                    to_vca_ee_id = None
                    from_vca_endpoint = None
                    to_vca_endpoint = None
                    vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
                    for vca in vca_list:
                        # VNF-level charms match on vnfd_id, VDU-level on vdu_id
                        key_to_check = "vdu_id"
                        if vca.get("vdu_id") is None:
                            key_to_check = "vnfd_id"
                        if vca.get(key_to_check) == r.get("entities")[0].get(
                            "id"
                        ) and vca.get("config_sw_installed"):
                            from_vca_ee_id = vca.get("ee_id")
                            from_vca_endpoint = r.get("entities")[0].get("endpoint")
                        if vca.get(key_to_check) == r.get("entities")[1].get(
                            "id"
                        ) and vca.get("config_sw_installed"):
                            to_vca_ee_id = vca.get("ee_id")
                            to_vca_endpoint = r.get("entities")[1].get("endpoint")
                    if from_vca_ee_id and to_vca_ee_id:
                        # add relation
                        await self.vca_map[vca_type].add_relation(
                            ee_id_1=from_vca_ee_id,
                            ee_id_2=to_vca_ee_id,
                            endpoint_1=from_vca_endpoint,
                            endpoint_2=to_vca_endpoint,
                            vca_id=vca_id,
                        )
                        # remove entry from relations list
                        vnf_relations.remove(r)
                    else:
                        # check failed peers
                        try:
                            vca_status_list = db_nsr.get("configurationStatus")
                            if vca_status_list:
                                for i in range(len(vca_list)):
                                    vca = vca_list[i]
                                    vca_status = vca_status_list[i]
                                    if vca.get("vdu_id") == r.get("entities")[0].get(
                                        "id"
                                    ):
                                        if vca_status.get("status") == "BROKEN":
                                            # peer broken: remove relation from list
                                            vnf_relations.remove(r)
                                    if vca.get("vdu_id") == r.get("entities")[1].get(
                                        "id"
                                    ):
                                        if vca_status.get("status") == "BROKEN":
                                            # peer broken: remove relation from list
                                            vnf_relations.remove(r)
                        except Exception:
                            # ignore
                            pass

                # wait for next try
                await asyncio.sleep(5.0)

                if not ns_relations and not vnf_relations:
                    self.logger.debug("Relations added")
                    break

            return True

        except Exception as e:
            self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
            return False
2924
    async def _install_kdu(
        self,
        nsr_id: str,
        nsr_db_path: str,
        vnfr_data: dict,
        kdu_index: int,
        kdud: dict,
        vnfd: dict,
        k8s_instance_info: dict,
        k8params: dict = None,
        timeout: int = 600,
        vca_id: str = None,
    ):
        """Install one KDU on its target K8s cluster and track progress in the DB.

        Generates (or reuses) the kdu_instance name, records it at *nsr_db_path*,
        installs the helm chart / juju bundle, stores the discovered services and
        management IP in the vnfr, and runs any initial-config-primitives that are
        not handled by an execution environment.

        :param nsr_id: _id of the nsrs record being instantiated
        :param nsr_db_path: dotted path inside the nsr record
            (e.g. "_admin.deployed.K8s.<index>") where this KDU's info is stored
        :param vnfr_data: vnfr record of the VNF owning this KDU
        :param kdu_index: position of the KDU inside the vnfr "kdur" list
        :param kdud: KDU descriptor (an entry of vnfd["kdu"])
        :param vnfd: VNF descriptor that contains the KDU
        :param k8s_instance_info: dict with k8scluster-uuid, k8scluster-type,
            kdu-model, kdu-name, namespace and optional kdu-deployment-name
        :param k8params: parameters passed to the install operation
        :param timeout: max seconds for the install and for each primitive
        :param vca_id: VCA identifier, if any
        :return: the kdu_instance name used for the deployment
        :raises: re-raises any failure after flagging ERROR status in the DB
        """
        try:
            k8sclustertype = k8s_instance_info["k8scluster-type"]
            # Instantiate kdu
            db_dict_install = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": nsr_db_path,
            }

            # A user-provided deployment name takes precedence over a generated one
            if k8s_instance_info.get("kdu-deployment-name"):
                kdu_instance = k8s_instance_info.get("kdu-deployment-name")
            else:
                kdu_instance = self.k8scluster_map[
                    k8sclustertype
                ].generate_kdu_instance_name(
                    db_dict=db_dict_install,
                    kdu_model=k8s_instance_info["kdu-model"],
                    kdu_name=k8s_instance_info["kdu-name"],
                )

            # Update the nsrs table with the kdu-instance value
            self.update_db_2(
                item="nsrs",
                _id=nsr_id,
                _desc={nsr_db_path + ".kdu-instance": kdu_instance},
            )

            # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
            # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
            # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
            # namespace, this first verification could be removed, and the next step would be done for any kind
            # of KNF.
            # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
            # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
            if k8sclustertype in ("juju", "juju-bundle"):
                # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
                # that the user passed a namespace which he wants its KDU to be deployed in)
                if (
                    self.db.count(
                        table="nsrs",
                        q_filter={
                            "_id": nsr_id,
                            "_admin.projects_write": k8s_instance_info["namespace"],
                            "_admin.projects_read": k8s_instance_info["namespace"],
                        },
                    )
                    > 0
                ):
                    # namespace matched the project name (the default), so switch the
                    # model to the kdu_instance name instead
                    self.logger.debug(
                        f"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
                    )
                    self.update_db_2(
                        item="nsrs",
                        _id=nsr_id,
                        _desc={f"{nsr_db_path}.namespace": kdu_instance},
                    )
                    k8s_instance_info["namespace"] = kdu_instance

            await self.k8scluster_map[k8sclustertype].install(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_model=k8s_instance_info["kdu-model"],
                atomic=True,
                params=k8params,
                db_dict=db_dict_install,
                timeout=timeout,
                kdu_name=k8s_instance_info["kdu-name"],
                namespace=k8s_instance_info["namespace"],
                kdu_instance=kdu_instance,
                vca_id=vca_id,
            )

            # Obtain services to obtain management service ip
            services = await self.k8scluster_map[k8sclustertype].get_services(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_instance=kdu_instance,
                namespace=k8s_instance_info["namespace"],
            )

            # Obtain management service info (if exists)
            vnfr_update_dict = {}
            kdu_config = get_configuration(vnfd, kdud["name"])
            if kdu_config:
                target_ee_list = kdu_config.get("execution-environment-list", [])
            else:
                target_ee_list = []

            if services:
                vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
                mgmt_services = [
                    service
                    for service in kdud.get("service", [])
                    if service.get("mgmt-service")
                ]
                for mgmt_service in mgmt_services:
                    for service in services:
                        if service["name"].startswith(mgmt_service["name"]):
                            # Mgmt service found, Obtain service ip
                            ip = service.get("external_ip", service.get("cluster_ip"))
                            if isinstance(ip, list) and len(ip) == 1:
                                ip = ip[0]

                            vnfr_update_dict[
                                "kdur.{}.ip-address".format(kdu_index)
                            ] = ip

                            # Check if must update also mgmt ip at the vnf
                            service_external_cp = mgmt_service.get(
                                "external-connection-point-ref"
                            )
                            if service_external_cp:
                                if (
                                    deep_get(vnfd, ("mgmt-interface", "cp"))
                                    == service_external_cp
                                ):
                                    vnfr_update_dict["ip-address"] = ip

                                if find_in_list(
                                    target_ee_list,
                                    lambda ee: ee.get(
                                        "external-connection-point-ref", ""
                                    )
                                    == service_external_cp,
                                ):
                                    vnfr_update_dict[
                                        "kdur.{}.ip-address".format(kdu_index)
                                    ] = ip
                            break
                    else:
                        # inner for/else: no deployed service matched this mgmt-service
                        self.logger.warn(
                            "Mgmt service name: {} not found".format(
                                mgmt_service["name"]
                            )
                        )

            vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
            self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

            # Run initial-config-primitives directly on the cluster only when there is
            # no juju execution environment in charge of them
            kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
            if (
                kdu_config
                and kdu_config.get("initial-config-primitive")
                and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
            ):
                initial_config_primitive_list = kdu_config.get(
                    "initial-config-primitive"
                )
                initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))

                for initial_config_primitive in initial_config_primitive_list:
                    primitive_params_ = self._map_primitive_params(
                        initial_config_primitive, {}, {}
                    )

                    await asyncio.wait_for(
                        self.k8scluster_map[k8sclustertype].exec_primitive(
                            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                            kdu_instance=kdu_instance,
                            primitive_name=initial_config_primitive["name"],
                            params=primitive_params_,
                            db_dict=db_dict_install,
                            vca_id=vca_id,
                        ),
                        timeout=timeout,
                    )

        except Exception as e:
            # Prepare update db with error and raise exception
            try:
                self.update_db_2(
                    "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
                )
                self.update_db_2(
                    "vnfrs",
                    vnfr_data.get("_id"),
                    {"kdur.{}.status".format(kdu_index): "ERROR"},
                )
            except Exception:
                # ignore to keep original exception
                pass
            # reraise original error
            raise

        return kdu_instance
3122
    async def deploy_kdus(
        self,
        logging_text,
        nsr_id,
        nslcmop_id,
        db_vnfrs,
        db_vnfds,
        task_instantiation_info,
    ):
        """Launch one background _install_kdu() task per KDU found in the vnfrs.

        For every kdur of every vnfr: resolves the target K8s cluster id (waiting
        for any in-progress k8scluster task), synchronizes helm repos once per
        cluster, writes the per-KDU info at nsr._admin.deployed.K8s.<index> and
        registers the install task in lcm_tasks / task_instantiation_info.

        :param logging_text: prefix for log messages
        :param nsr_id: _id of the nsrs record
        :param nslcmop_id: _id of the current operation (for task registration)
        :param db_vnfrs: dict of vnfr records
        :param db_vnfds: list of vnfd records
        :param task_instantiation_info: dict task -> description, filled here
        :raises LcmException: on descriptor errors or cluster initialization errors
        """
        # Launch kdus if present in the descriptor

        # cache of cluster_id -> internal uuid, one map per cluster type
        k8scluster_id_2_uuic = {
            "helm-chart-v3": {},
            "helm-chart": {},
            "juju-bundle": {},
        }

        async def _get_cluster_id(cluster_id, cluster_type):
            # Resolve (and cache) the internal id of a K8s cluster for the given
            # type, initializing helm-v3 on old clusters when needed.
            nonlocal k8scluster_id_2_uuic
            if cluster_id in k8scluster_id_2_uuic[cluster_type]:
                return k8scluster_id_2_uuic[cluster_type][cluster_id]

            # check if K8scluster is creating and wait look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related(
                "k8scluster", cluster_id
            )
            if task_dependency:
                text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
                    task_name, cluster_id
                )
                self.logger.debug(logging_text + text)
                await asyncio.wait(task_dependency, timeout=3600)

            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
            )
            if not db_k8scluster:
                raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

            k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
            if not k8s_id:
                if cluster_type == "helm-chart-v3":
                    try:
                        # backward compatibility for existing clusters that have not been initialized for helm v3
                        k8s_credentials = yaml.safe_dump(
                            db_k8scluster.get("credentials")
                        )
                        k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
                            k8s_credentials, reuse_cluster_uuid=cluster_id
                        )
                        db_k8scluster_update = {}
                        db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                        db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.created"
                        ] = uninstall_sw
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.operationalState"
                        ] = "ENABLED"
                        self.update_db_2(
                            "k8sclusters", cluster_id, db_k8scluster_update
                        )
                    except Exception as e:
                        self.logger.error(
                            logging_text
                            + "error initializing helm-v3 cluster: {}".format(str(e))
                        )
                        raise LcmException(
                            "K8s cluster '{}' has not been initialized for '{}'".format(
                                cluster_id, cluster_type
                            )
                        )
                else:
                    raise LcmException(
                        "K8s cluster '{}' has not been initialized for '{}'".format(
                            cluster_id, cluster_type
                        )
                    )
            k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
            return k8s_id

        logging_text += "Deploy kdus: "
        step = ""
        try:
            db_nsr_update = {"_admin.deployed.K8s": []}
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

            index = 0
            # track clusters whose helm repos were already synchronized this run
            updated_cluster_list = []
            updated_v3_cluster_list = []

            for vnfr_data in db_vnfrs.values():
                vca_id = self.get_vca_id(vnfr_data, {})
                for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                    # Step 0: Prepare and set parameters
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                    vnfd_id = vnfr_data.get("vnfd-id")
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                    )
                    kdud = next(
                        kdud
                        for kdud in vnfd_with_id["kdu"]
                        if kdud["name"] == kdur["kdu-name"]
                    )
                    namespace = kdur.get("k8s-namespace")
                    kdu_deployment_name = kdur.get("kdu-deployment-name")
                    if kdur.get("helm-chart"):
                        kdumodel = kdur["helm-chart"]
                        # Default version: helm3, if helm-version is v2 assign v2
                        k8sclustertype = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8sclustertype = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        kdumodel = kdur["juju-bundle"]
                        k8sclustertype = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
                            )
                        )
                    # check if kdumodel is a file and exists
                    try:
                        vnfd_with_id = find_in_list(
                            db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                        )
                        storage = deep_get(vnfd_with_id, ("_admin", "storage"))
                        if storage and storage.get(
                            "pkg-dir"
                        ):  # may be not present if vnfd has not artifacts
                            # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                            filename = "{}/{}/{}s/{}".format(
                                storage["folder"],
                                storage["pkg-dir"],
                                k8sclustertype,
                                kdumodel,
                            )
                            if self.fs.file_exists(
                                filename, mode="file"
                            ) or self.fs.file_exists(filename, mode="dir"):
                                kdumodel = self.fs.path + filename
                    except (asyncio.TimeoutError, asyncio.CancelledError):
                        raise
                    except Exception:  # it is not a file
                        pass

                    k8s_cluster_id = kdur["k8s-cluster"]["id"]
                    step = "Synchronize repos for k8s cluster '{}'".format(
                        k8s_cluster_id
                    )
                    cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                    # Synchronize repos
                    if (
                        k8sclustertype == "helm-chart"
                        and cluster_uuid not in updated_cluster_list
                    ) or (
                        k8sclustertype == "helm-chart-v3"
                        and cluster_uuid not in updated_v3_cluster_list
                    ):
                        del_repo_list, added_repo_dict = await asyncio.ensure_future(
                            self.k8scluster_map[k8sclustertype].synchronize_repos(
                                cluster_uuid=cluster_uuid
                            )
                        )
                        if del_repo_list or added_repo_dict:
                            if k8sclustertype == "helm-chart":
                                unset = {
                                    "_admin.helm_charts_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_cluster_list.append(cluster_uuid)
                            elif k8sclustertype == "helm-chart-v3":
                                unset = {
                                    "_admin.helm_charts_v3_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_v3_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_v3_cluster_list.append(cluster_uuid)
                            self.logger.debug(
                                logging_text + "repos synchronized on k8s cluster "
                                "'{}' to_delete: {}, to_add: {}".format(
                                    k8s_cluster_id, del_repo_list, added_repo_dict
                                )
                            )
                            self.db.set_one(
                                "k8sclusters",
                                {"_id": k8s_cluster_id},
                                updated,
                                unset=unset,
                            )

                    # Instantiate kdu
                    step = "Instantiating KDU {}.{} in k8s cluster {}".format(
                        vnfr_data["member-vnf-index-ref"],
                        kdur["kdu-name"],
                        k8s_cluster_id,
                    )
                    k8s_instance_info = {
                        "kdu-instance": None,
                        "k8scluster-uuid": cluster_uuid,
                        "k8scluster-type": k8sclustertype,
                        "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                        "kdu-name": kdur["kdu-name"],
                        "kdu-model": kdumodel,
                        "namespace": namespace,
                        "kdu-deployment-name": kdu_deployment_name,
                    }
                    db_path = "_admin.deployed.K8s.{}".format(index)
                    db_nsr_update[db_path] = k8s_instance_info
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
                    )
                    task = asyncio.ensure_future(
                        self._install_kdu(
                            nsr_id,
                            db_path,
                            vnfr_data,
                            kdu_index,
                            kdud,
                            vnfd_with_id,
                            k8s_instance_info,
                            k8params=desc_params,
                            timeout=1800,
                            vca_id=vca_id,
                        )
                    )
                    self.lcm_tasks.register(
                        "ns",
                        nsr_id,
                        nslcmop_id,
                        "instantiate_KDU-{}".format(index),
                        task,
                    )
                    task_instantiation_info[task] = "Deploying KDU {}".format(
                        kdur["kdu-name"]
                    )

                    index += 1

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
            if isinstance(e, (N2VCException, DbException)):
                self.logger.error(logging_text + msg)
            else:
                self.logger.critical(logging_text + msg, exc_info=True)
            raise LcmException(msg)
        finally:
            if db_nsr_update:
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
3389
    def _deploy_n2vc(
        self,
        logging_text,
        db_nsr,
        db_vnfr,
        nslcmop_id,
        nsr_id,
        nsi_id,
        vnfd_id,
        vdu_id,
        kdu_name,
        member_vnf_index,
        vdu_index,
        vdu_name,
        deploy_params,
        descriptor_config,
        base_folder,
        task_instantiation_info,
        stage,
    ):
        """Schedule one instantiate_N2VC task per execution environment (charm).

        For each charm declared in *descriptor_config* (either its
        execution-environment-list or a top-level "juju" entry for ns charms),
        looks up the matching record at db_nsr._admin.deployed.VCA.<index>,
        creating and persisting a new one when missing, then launches
        instantiate_N2VC as an asyncio task registered in lcm_tasks and
        described in *task_instantiation_info*.
        """
        # launch instantiate_N2VC in a asyncio task and register task object
        # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
        # if not found, create one entry and update database
        # fill db_nsr._admin.deployed.VCA.<index>

        self.logger.debug(
            logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
        )
        if "execution-environment-list" in descriptor_config:
            ee_list = descriptor_config.get("execution-environment-list", [])
        elif "juju" in descriptor_config:
            ee_list = [descriptor_config]  # ns charms
        else:  # other types as script are not supported
            ee_list = []

        for ee_item in ee_list:
            self.logger.debug(
                logging_text
                + "_deploy_n2vc ee_item juju={}, helm={}".format(
                    ee_item.get("juju"), ee_item.get("helm-chart")
                )
            )
            ee_descriptor_id = ee_item.get("id")
            if ee_item.get("juju"):
                vca_name = ee_item["juju"].get("charm")
                # proxy charm when a charm artifact is referenced, native otherwise;
                # refined below by the "cloud"/"proxy" descriptor fields
                vca_type = (
                    "lxc_proxy_charm"
                    if ee_item["juju"].get("charm") is not None
                    else "native_charm"
                )
                if ee_item["juju"].get("cloud") == "k8s":
                    vca_type = "k8s_proxy_charm"
                elif ee_item["juju"].get("proxy") is False:
                    vca_type = "native_charm"
            elif ee_item.get("helm-chart"):
                vca_name = ee_item["helm-chart"]
                if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                    vca_type = "helm"
                else:
                    vca_type = "helm-v3"
            else:
                self.logger.debug(
                    logging_text + "skipping non juju neither charm configuration"
                )
                continue

            vca_index = -1
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if not vca_deployed:
                    continue
                if (
                    vca_deployed.get("member-vnf-index") == member_vnf_index
                    and vca_deployed.get("vdu_id") == vdu_id
                    and vca_deployed.get("kdu_name") == kdu_name
                    and vca_deployed.get("vdu_count_index", 0) == vdu_index
                    and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
                ):
                    break
            else:
                # not found, create one.
                target = (
                    "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
                )
                if vdu_id:
                    target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
                elif kdu_name:
                    target += "/kdu/{}".format(kdu_name)
                vca_deployed = {
                    "target_element": target,
                    # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                    "member-vnf-index": member_vnf_index,
                    "vdu_id": vdu_id,
                    "kdu_name": kdu_name,
                    "vdu_count_index": vdu_index,
                    "operational-status": "init",  # TODO revise
                    "detailed-status": "",  # TODO revise
                    "step": "initial-deploy",  # TODO revise
                    "vnfd_id": vnfd_id,
                    "vdu_name": vdu_name,
                    "type": vca_type,
                    "ee_descriptor_id": ee_descriptor_id,
                }
                vca_index += 1

                # create VCA and configurationStatus in db
                db_dict = {
                    "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                    "configurationStatus.{}".format(vca_index): dict(),
                }
                self.update_db_2("nsrs", nsr_id, db_dict)

                db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

                self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
                self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
                self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

            # Launch task
            task_n2vc = asyncio.ensure_future(
                self.instantiate_N2VC(
                    logging_text=logging_text,
                    vca_index=vca_index,
                    nsi_id=nsi_id,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    vdu_index=vdu_index,
                    deploy_params=deploy_params,
                    config_descriptor=descriptor_config,
                    base_folder=base_folder,
                    nslcmop_id=nslcmop_id,
                    stage=stage,
                    vca_type=vca_type,
                    vca_name=vca_name,
                    ee_config_descriptor=ee_item,
                )
            )
            self.lcm_tasks.register(
                "ns",
                nsr_id,
                nslcmop_id,
                "instantiate_N2VC-{}".format(vca_index),
                task_n2vc,
            )
            task_instantiation_info[
                task_n2vc
            ] = self.task_name_deploy_vca + " {}.{}".format(
                member_vnf_index or "", vdu_id or ""
            )
3542
3543 @staticmethod
3544 def _create_nslcmop(nsr_id, operation, params):
3545 """
3546 Creates a ns-lcm-opp content to be stored at database.
3547 :param nsr_id: internal id of the instance
3548 :param operation: instantiate, terminate, scale, action, ...
3549 :param params: user parameters for the operation
3550 :return: dictionary following SOL005 format
3551 """
3552 # Raise exception if invalid arguments
3553 if not (nsr_id and operation and params):
3554 raise LcmException(
3555 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3556 )
3557 now = time()
3558 _id = str(uuid4())
3559 nslcmop = {
3560 "id": _id,
3561 "_id": _id,
3562 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3563 "operationState": "PROCESSING",
3564 "statusEnteredTime": now,
3565 "nsInstanceId": nsr_id,
3566 "lcmOperationType": operation,
3567 "startTime": now,
3568 "isAutomaticInvocation": False,
3569 "operationParams": params,
3570 "isCancelPending": False,
3571 "links": {
3572 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3573 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3574 },
3575 }
3576 return nslcmop
3577
3578 def _format_additional_params(self, params):
3579 params = params or {}
3580 for key, value in params.items():
3581 if str(value).startswith("!!yaml "):
3582 params[key] = yaml.safe_load(value[7:])
3583 return params
3584
3585 def _get_terminate_primitive_params(self, seq, vnf_index):
3586 primitive = seq.get("name")
3587 primitive_params = {}
3588 params = {
3589 "member_vnf_index": vnf_index,
3590 "primitive": primitive,
3591 "primitive_params": primitive_params,
3592 }
3593 desc_params = {}
3594 return self._map_primitive_params(seq, params, desc_params)
3595
3596 # sub-operations
3597
3598 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3599 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3600 if op.get("operationState") == "COMPLETED":
3601 # b. Skip sub-operation
3602 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3603 return self.SUBOPERATION_STATUS_SKIP
3604 else:
3605 # c. retry executing sub-operation
3606 # The sub-operation exists, and operationState != 'COMPLETED'
3607 # Update operationState = 'PROCESSING' to indicate a retry.
3608 operationState = "PROCESSING"
3609 detailed_status = "In progress"
3610 self._update_suboperation_status(
3611 db_nslcmop, op_index, operationState, detailed_status
3612 )
3613 # Return the sub-operation index
3614 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3615 # with arguments extracted from the sub-operation
3616 return op_index
3617
3618 # Find a sub-operation where all keys in a matching dictionary must match
3619 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3620 def _find_suboperation(self, db_nslcmop, match):
3621 if db_nslcmop and match:
3622 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3623 for i, op in enumerate(op_list):
3624 if all(op.get(k) == match[k] for k in match):
3625 return i
3626 return self.SUBOPERATION_STATUS_NOT_FOUND
3627
3628 # Update status for a sub-operation given its index
3629 def _update_suboperation_status(
3630 self, db_nslcmop, op_index, operationState, detailed_status
3631 ):
3632 # Update DB for HA tasks
3633 q_filter = {"_id": db_nslcmop["_id"]}
3634 update_dict = {
3635 "_admin.operations.{}.operationState".format(op_index): operationState,
3636 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3637 }
3638 self.db.set_one(
3639 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3640 )
3641
3642 # Add sub-operation, return the index of the added sub-operation
3643 # Optionally, set operationState, detailed-status, and operationType
3644 # Status and type are currently set for 'scale' sub-operations:
3645 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3646 # 'detailed-status' : status message
3647 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3648 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3649 def _add_suboperation(
3650 self,
3651 db_nslcmop,
3652 vnf_index,
3653 vdu_id,
3654 vdu_count_index,
3655 vdu_name,
3656 primitive,
3657 mapped_primitive_params,
3658 operationState=None,
3659 detailed_status=None,
3660 operationType=None,
3661 RO_nsr_id=None,
3662 RO_scaling_info=None,
3663 ):
3664 if not db_nslcmop:
3665 return self.SUBOPERATION_STATUS_NOT_FOUND
3666 # Get the "_admin.operations" list, if it exists
3667 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3668 op_list = db_nslcmop_admin.get("operations")
3669 # Create or append to the "_admin.operations" list
3670 new_op = {
3671 "member_vnf_index": vnf_index,
3672 "vdu_id": vdu_id,
3673 "vdu_count_index": vdu_count_index,
3674 "primitive": primitive,
3675 "primitive_params": mapped_primitive_params,
3676 }
3677 if operationState:
3678 new_op["operationState"] = operationState
3679 if detailed_status:
3680 new_op["detailed-status"] = detailed_status
3681 if operationType:
3682 new_op["lcmOperationType"] = operationType
3683 if RO_nsr_id:
3684 new_op["RO_nsr_id"] = RO_nsr_id
3685 if RO_scaling_info:
3686 new_op["RO_scaling_info"] = RO_scaling_info
3687 if not op_list:
3688 # No existing operations, create key 'operations' with current operation as first list element
3689 db_nslcmop_admin.update({"operations": [new_op]})
3690 op_list = db_nslcmop_admin.get("operations")
3691 else:
3692 # Existing operations, append operation to list
3693 op_list.append(new_op)
3694
3695 db_nslcmop_update = {"_admin.operations": op_list}
3696 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3697 op_index = len(op_list) - 1
3698 return op_index
3699
3700 # Helper methods for scale() sub-operations
3701
3702 # pre-scale/post-scale:
3703 # Check for 3 different cases:
3704 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3705 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3706 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
    def _check_or_add_scale_suboperation(
        self,
        db_nslcmop,
        vnf_index,
        vnf_config_primitive,
        primitive_params,
        operationType,
        RO_nsr_id=None,
        RO_scaling_info=None,
    ):
        """Find or create the sub-operation for one scale step (HA retry support).

        Three possible outcomes:
        a. not found -> a new sub-operation is added, returns SUBOPERATION_STATUS_NEW
        b. found and COMPLETED -> returns SUBOPERATION_STATUS_SKIP
        c. found, not COMPLETED -> state reset to PROCESSING, returns its index

        :param db_nslcmop: nslcmop record holding _admin.operations
        :param vnf_index: member-vnf-index the step applies to
        :param vnf_config_primitive: primitive name (unused for RO scaling)
        :param primitive_params: mapped primitive parameters (unused for RO scaling)
        :param operationType: 'PRE-SCALE' | 'POST-SCALE'; forced to 'SCALE-RO'
            when both RO_nsr_id and RO_scaling_info are provided
        :param RO_nsr_id: RO ns id, only for RO scaling sub-operations
        :param RO_scaling_info: RO scaling details, only for RO scaling sub-operations
        """
        # Find this sub-operation
        if RO_nsr_id and RO_scaling_info:
            operationType = "SCALE-RO"
            match = {
                "member_vnf_index": vnf_index,
                "RO_nsr_id": RO_nsr_id,
                "RO_scaling_info": RO_scaling_info,
            }
        else:
            match = {
                "member_vnf_index": vnf_index,
                "primitive": vnf_config_primitive,
                "primitive_params": primitive_params,
                "lcmOperationType": operationType,
            }
        op_index = self._find_suboperation(db_nslcmop, match)
        if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
            # a. New sub-operation
            # The sub-operation does not exist, add it.
            # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
            # The following parameters are set to None for all kind of scaling:
            vdu_id = None
            vdu_count_index = None
            vdu_name = None
            if RO_nsr_id and RO_scaling_info:
                vnf_config_primitive = None
                primitive_params = None
            else:
                RO_nsr_id = None
                RO_scaling_info = None
            # Initial status for sub-operation
            operationState = "PROCESSING"
            detailed_status = "In progress"
            # Add sub-operation for pre/post-scaling (zero or more operations)
            self._add_suboperation(
                db_nslcmop,
                vnf_index,
                vdu_id,
                vdu_count_index,
                vdu_name,
                vnf_config_primitive,
                primitive_params,
                operationState,
                detailed_status,
                operationType,
                RO_nsr_id,
                RO_scaling_info,
            )
            return self.SUBOPERATION_STATUS_NEW
        else:
            # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
            # or op_index (operationState != 'COMPLETED')
            return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3770
3771 # Function to return execution_environment id
3772
3773 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3774 # TODO vdu_index_count
3775 for vca in vca_deployed_list:
3776 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3777 return vca["ee_id"]
3778
    async def destroy_N2VC(
        self,
        logging_text,
        db_nslcmop,
        vca_deployed,
        config_descriptor,
        vca_index,
        destroy_ee=True,
        exec_primitives=True,
        scaling_in=False,
        vca_id: str = None,
    ):
        """
        Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
        :param logging_text: prefix for log messages
        :param db_nslcmop: nslcmop record, used to register terminate sub-operations
        :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
        :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
        :param vca_index: index in the database _admin.deployed.VCA
        :param destroy_ee: False to not destroy it, because all of them will be destroyed at once
        :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
            not executed properly
        :param scaling_in: True destroys the application, False destroys the model
        :param vca_id: VCA identifier, if any
        :return: None or exception
        """

        self.logger.debug(
            logging_text
            + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
                vca_index, vca_deployed, config_descriptor, destroy_ee
            )
        )

        vca_type = vca_deployed.get("type", "lxc_proxy_charm")

        # execute terminate_primitives
        if exec_primitives:
            terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
                config_descriptor.get("terminate-config-primitive"),
                vca_deployed.get("ee_descriptor_id"),
            )
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            vnf_index = vca_deployed.get("member-vnf-index")
            if terminate_primitives and vca_deployed.get("needed_terminate"):
                for seq in terminate_primitives:
                    # For each sequence in list, get primitive and call _ns_execute_primitive()
                    step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                        vnf_index, seq.get("name")
                    )
                    self.logger.debug(logging_text + step)
                    # Create the primitive for each sequence, i.e. "primitive": "touch"
                    primitive = seq.get("name")
                    mapped_primitive_params = self._get_terminate_primitive_params(
                        seq, vnf_index
                    )

                    # Add sub-operation
                    self._add_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vdu_id,
                        vdu_count_index,
                        vdu_name,
                        primitive,
                        mapped_primitive_params,
                    )
                    # Sub-operations: Call _ns_execute_primitive() instead of action()
                    try:
                        result, result_detail = await self._ns_execute_primitive(
                            vca_deployed["ee_id"],
                            primitive,
                            mapped_primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                    except LcmException:
                        # this happens when VCA is not deployed. In this case it is not needed to terminate
                        continue
                    result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                    if result not in result_ok:
                        raise LcmException(
                            "terminate_primitive {} for vnf_member_index={} fails with "
                            "error {}".format(seq.get("name"), vnf_index, result_detail)
                        )
                # set that this VCA do not need terminated
                db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                    vca_index
                )
                self.update_db_2(
                    "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
                )

        # remove the prometheus scrape jobs created for this VCA, if any
        if vca_deployed.get("prometheus_jobs") and self.prometheus:
            await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

        if destroy_ee:
            await self.vca_map[vca_type].delete_execution_environment(
                vca_deployed["ee_id"],
                scaling_in=scaling_in,
                vca_type=vca_type,
                vca_id=vca_id,
            )
3883
3884 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
3885 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
3886 namespace = "." + db_nsr["_id"]
3887 try:
3888 await self.n2vc.delete_namespace(
3889 namespace=namespace,
3890 total_timeout=self.timeout_charm_delete,
3891 vca_id=vca_id,
3892 )
3893 except N2VCNotFound: # already deleted. Skip
3894 pass
3895 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
3896
    async def _terminate_RO(
        self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
    ):
        """
        Terminates a deployment from RO (classic, non-NG flavor).

        Deletion proceeds in three sequential phases: the ns instance at the VIM,
        then the nsd, then each vnfd. Later phases are skipped as soon as any
        failure is recorded, and a single LcmException summarizing all failures
        is raised at the end.
        :param logging_text: prefix for every log line of this operation
        :param nsr_deployed: db_nsr._admin.deployed
        :param nsr_id:
        :param nslcmop_id:
        :param stage: list of string with the content to write on db_nslcmop.detailed-status.
            this method will update only the index 2, but it will write on database the concatenated content of the list
        :return: None. Raises LcmException if any deletion step failed.
        """
        db_nsr_update = {}
        failed_detail = []  # accumulated human-readable errors; non-empty => raise at end
        ro_nsr_id = ro_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
            # a pending delete action id may be left over from a previous, interrupted terminate
            ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if ro_nsr_id:
                stage[2] = "Deleting ns from VIM."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self._write_op_status(nslcmop_id, stage)
                self.logger.debug(logging_text + stage[2])
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                desc = await self.RO.delete("ns", ro_nsr_id)
                ro_delete_action = desc["action_id"]
                # record the delete action id immediately so a crash here can be resumed
                db_nsr_update[
                    "_admin.deployed.RO.nsr_delete_action_id"
                ] = ro_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if ro_delete_action:
                # wait until NS is deleted from VIM, polling the RO action status
                stage[2] = "Waiting ns deleted from VIM."
                detailed_status_old = None
                self.logger.debug(
                    logging_text
                    + stage[2]
                    + " RO_id={} ro_delete_action={}".format(
                        ro_nsr_id, ro_delete_action
                    )
                )
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)

                delete_timeout = 20 * 60  # 20 minutes
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=ro_nsr_id,
                        extra_item="action",
                        extra_item_id=ro_delete_action,
                    )

                    # mirror the RO deploymentStatus into the nsr record
                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        # still deleting; keep the operator informed via stage[2]
                        stage[2] = "Deleting from VIM {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        # deletion finished at the VIM
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        # unexpected contract violation from ROclient; fail loudly
                        assert (
                            False
                        ), "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    # persist progress only when the status text actually changed
                    if stage[2] != detailed_status_old:
                        detailed_status_old = stage[2]
                        db_nsr_update["detailed-status"] = " ".join(stage)
                        self._write_op_status(nslcmop_id, stage)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException(
                        "Timeout waiting ns deleted from VIM"
                    )

        except Exception as e:
            # flush whatever DB updates were gathered before classifying the error
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found: treat as already deleted, not an error
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(
                    logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict: something still references the ns at RO
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(
                    logging_text
                    + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
                )
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(
                    logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
                )

        # Delete nsd (only if the ns deletion above fully succeeded)
        if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
            ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                stage[2] = "Deleting nsd from RO."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("nsd", ro_nsd_id)
                self.logger.debug(
                    logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
                )
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found: fine, clear the reference
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(
                        logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete each vnfd (only while no failure has been recorded)
        if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    ro_vnfd_id = vnf_deployed["id"]
                    stage[
                        2
                    ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                        vnf_deployed["member-vnf-index"], ro_vnfd_id
                    )
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    self._write_op_status(nslcmop_id, stage)
                    await self.RO.delete("vnfd", ro_vnfd_id)
                    self.logger.debug(
                        logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                    )
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except Exception as e:
                    if (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 404
                    ):  # not found: fine, clear the reference
                        db_nsr_update[
                            "_admin.deployed.RO.vnfd.{}.id".format(index)
                        ] = None
                        self.logger.debug(
                            logging_text
                            + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                        )
                    elif (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 409
                    ):  # conflict
                        failed_detail.append(
                            "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append(
                            "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.error(logging_text + failed_detail[-1])

        # Final status: reflect overall success/failure in stage[2] and persist
        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
4096
    async def terminate(self, nsr_id, nslcmop_id):
        """
        Terminate a network service: run per-VCA terminate primitives, then delete
        all execution environments, KDU instances and the VIM deployment (via NG-RO
        or classic RO). Final status is always written to the database and a kafka
        "terminated" message is emitted from the finally block.
        :param nsr_id: Id of the nsr to terminate
        :param nslcmop_id: Id of the nslcmop driving this termination
        :return: None
        """
        # Try to lock HA task here; another LCM instance may already own this op
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        timeout_ns_terminate = self.timeout_ns_terminate
        db_nsr = None
        db_nslcmop = None
        operation_params = None
        exc = None
        error_list = []  # annotates all failed error messages
        db_nslcmop_update = {}
        autoremove = False  # autoremove after terminated
        tasks_dict_info = {}  # maps asyncio task -> description for progress/errors
        db_nsr_update = {}
        stage = [
            "Stage 1/3: Preparing task.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ contains [stage, step, VIM-status]
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            operation_params = db_nslcmop.get("operationParams") or {}
            if operation_params.get("timeout_ns_terminate"):
                # user-provided timeout overrides the configured default
                timeout_ns_terminate = operation_params["timeout_ns_terminate"]
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            db_nsr_update["operational-status"] = "terminating"
            db_nsr_update["config-status"] = "terminating"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="TERMINATING",
                current_operation="TERMINATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
            # deepcopy: the deployed record is mutated-free reference data from here on
            nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
            if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
                # nothing was ever deployed; finally-block still writes final status
                return

            stage[1] = "Getting vnf descriptors from db."
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
            db_vnfrs_dict = {
                db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
            }
            db_vnfds_from_id = {}  # cache: vnfd _id -> vnfd (avoid duplicate DB reads)
            db_vnfds_from_member_index = {}
            # Loop over VNFRs
            for vnfr in db_vnfrs_list:
                vnfd_id = vnfr["vnfd-id"]
                if vnfd_id not in db_vnfds_from_id:
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                    db_vnfds_from_id[vnfd_id] = vnfd
                db_vnfds_from_member_index[
                    vnfr["member-vnf-index-ref"]
                ] = db_vnfds_from_id[vnfd_id]

            # Destroy individual execution environments when there are terminating primitives.
            # Rest of EE will be deleted at once
            # TODO - check before calling _destroy_N2VC
            # if not operation_params.get("skip_terminate_primitives"):#
            # or not vca.get("needed_terminate"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            self.logger.debug(logging_text + stage[0])
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])

            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                config_descriptor = None
                vca_member_vnf_index = vca.get("member-vnf-index")
                vca_id = self.get_vca_id(
                    db_vnfrs_dict.get(vca_member_vnf_index)
                    if vca_member_vnf_index
                    else None,
                    db_nsr,
                )
                if not vca or not vca.get("ee_id"):
                    continue
                # pick the configuration descriptor matching the VCA scope:
                # ns-level, vdu-level, kdu-level or vnf-level
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
                else:
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
                vca_type = vca.get("type")
                exec_terminate_primitives = not operation_params.get(
                    "skip_terminate_primitives"
                ) and vca.get("needed_terminate")
                # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
                # pending native charms
                destroy_ee = (
                    True if vca_type in ("helm", "helm-v3", "native_charm") else False
                )
                # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
                #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
                task = asyncio.ensure_future(
                    self.destroy_N2VC(
                        logging_text,
                        db_nslcmop,
                        vca,
                        config_descriptor,
                        vca_index,
                        destroy_ee,
                        exec_terminate_primitives,
                        vca_id=vca_id,
                    )
                )
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(
                    logging_text
                    + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
                )
                error_list = await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    min(self.timeout_charm_delete, timeout_ns_terminate),
                    stage,
                    nslcmop_id,
                )
                tasks_dict_info.clear()
                if error_list:
                    # abort stage 3; the finally block reports the failure
                    return  # raise LcmException("; ".join(error_list))

            # remove All execution environments at once
            stage[0] = "Stage 3/3 delete all."

            if nsr_deployed.get("VCA"):
                stage[1] = "Deleting all execution environments."
                self.logger.debug(logging_text + stage[1])
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_ee = asyncio.ensure_future(
                    asyncio.wait_for(
                        self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                        timeout=self.timeout_charm_delete,
                    )
                )
                # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
                tasks_dict_info[task_delete_ee] = "Terminating all VCA"

            # Delete from k8scluster
            stage[1] = "Deleting KDUs."
            self.logger.debug(logging_text + stage[1])
            # print(nsr_deployed)
            for kdu in get_iterable(nsr_deployed, "K8s"):
                if not kdu or not kdu.get("kdu-instance"):
                    continue
                kdu_instance = kdu.get("kdu-instance")
                if kdu.get("k8scluster-type") in self.k8scluster_map:
                    # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                    vca_id = self.get_vca_id({}, db_nsr)
                    task_delete_kdu_instance = asyncio.ensure_future(
                        self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            vca_id=vca_id,
                        )
                    )
                else:
                    self.logger.error(
                        logging_text
                        + "Unknown k8s deployment type {}".format(
                            kdu.get("k8scluster-type")
                        )
                    )
                    continue
                tasks_dict_info[
                    task_delete_kdu_instance
                ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

            # remove from RO (new-generation or classic client, per configuration)
            stage[1] = "Deleting ns from VIM."
            if self.ng_ro:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_ng_ro(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            else:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_RO(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

            # rest of staff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            # expected operational errors: message only, no traceback
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            # unexpected error: keep the full traceback as the error detail
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks launched in stage 3
                if tasks_dict_info:
                    stage[1] = "Waiting for terminate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_terminate,
                        stage,
                        nslcmop_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancell all tasks
            except Exception as exc:
                error_list.append(str(exc))
            # update status at database
            if error_list:
                error_detail = "; ".join(error_list)
                # self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["operational-status"] = "failed"
                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "NOT_INSTANTIATED"
                db_nsr_update["operational-status"] = "terminated"
                db_nsr_update["detailed-status"] = "Done"
                db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            if ns_state == "NOT_INSTANTIATED":
                try:
                    # propagate the final ns state to every related vnfr
                    self.db.set_list(
                        "vnfrs",
                        {"nsr-id-ref": nsr_id},
                        {"_admin.nsState": "NOT_INSTANTIATED"},
                    )
                except DbException as e:
                    self.logger.warn(
                        logging_text
                        + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                            nsr_id, e
                        )
                    )
            if operation_params:
                autoremove = operation_params.get("autoremove", False)
            if nslcmop_operation_state:
                try:
                    # notify subscribers (e.g. NBI) about the terminate result
                    await self.msg.aiowrite(
                        "ns",
                        "terminated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                            "autoremove": autoremove,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4424
    async def _wait_for_tasks(
        self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
    ):
        """
        Wait for a set of asyncio tasks to finish, reporting progress and errors.

        Progress ("done/total") is written to db_nslcmop via stage[1] after each
        completed task; errors are also written to the nsr record when nsr_id is
        given. The overall timeout is measured from entry, not per wait cycle.
        :param logging_text: prefix for every log line
        :param created_tasks_info: dict mapping each asyncio task to a human-readable description
        :param timeout: overall timeout in seconds for all tasks together
        :param stage: 3-element status list; only index 1 is updated here
        :param nslcmop_id: operation whose detailed-status is updated
        :param nsr_id: optional; when provided, errors are also stored in the nsr record
        :return: list of error detail strings (empty if all tasks succeeded)
        """
        time_start = time()
        error_detail_list = []
        error_list = []
        pending_tasks = list(created_tasks_info.keys())
        num_tasks = len(pending_tasks)
        num_done = 0
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        self._write_op_status(nslcmop_id, stage)
        while pending_tasks:
            new_error = None
            # remaining budget of the overall timeout
            _timeout = timeout + time_start - time()
            # NOTE: asyncio.wait re-binds pending_tasks to a set here
            done, pending_tasks = await asyncio.wait(
                pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
            )
            num_done += len(done)
            if not done:  # Timeout: mark every still-pending task as timed out
                for task in pending_tasks:
                    new_error = created_tasks_info[task] + ": Timeout"
                    error_detail_list.append(new_error)
                    error_list.append(new_error)
                break
            for task in done:
                if task.cancelled():
                    exc = "Cancelled"
                else:
                    exc = task.exception()
                if exc:
                    if isinstance(exc, asyncio.TimeoutError):
                        exc = "Timeout"
                    new_error = created_tasks_info[task] + ": {}".format(exc)
                    error_list.append(created_tasks_info[task])
                    error_detail_list.append(new_error)
                    if isinstance(
                        exc,
                        (
                            str,
                            DbException,
                            N2VCException,
                            ROclient.ROClientException,
                            LcmException,
                            K8sException,
                            NgRoException,
                        ),
                    ):
                        # known error types: message is enough
                        self.logger.error(logging_text + new_error)
                    else:
                        # unexpected exception: log the full traceback
                        exc_traceback = "".join(
                            traceback.format_exception(None, exc, exc.__traceback__)
                        )
                        self.logger.error(
                            logging_text
                            + created_tasks_info[task]
                            + " "
                            + exc_traceback
                        )
                else:
                    self.logger.debug(
                        logging_text + created_tasks_info[task] + ": Done"
                    )
            stage[1] = "{}/{}.".format(num_done, num_tasks)
            if new_error:
                stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
                if nsr_id:  # update also nsr
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {
                            "errorDescription": "Error at: " + ", ".join(error_list),
                            "errorDetail": ". ".join(error_detail_list),
                        },
                    )
            self._write_op_status(nslcmop_id, stage)
        return error_detail_list
4501
4502 @staticmethod
4503 def _map_primitive_params(primitive_desc, params, instantiation_params):
4504 """
4505 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4506 The default-value is used. If it is between < > it look for a value at instantiation_params
4507 :param primitive_desc: portion of VNFD/NSD that describes primitive
4508 :param params: Params provided by user
4509 :param instantiation_params: Instantiation params provided by user
4510 :return: a dictionary with the calculated params
4511 """
4512 calculated_params = {}
4513 for parameter in primitive_desc.get("parameter", ()):
4514 param_name = parameter["name"]
4515 if param_name in params:
4516 calculated_params[param_name] = params[param_name]
4517 elif "default-value" in parameter or "value" in parameter:
4518 if "value" in parameter:
4519 calculated_params[param_name] = parameter["value"]
4520 else:
4521 calculated_params[param_name] = parameter["default-value"]
4522 if (
4523 isinstance(calculated_params[param_name], str)
4524 and calculated_params[param_name].startswith("<")
4525 and calculated_params[param_name].endswith(">")
4526 ):
4527 if calculated_params[param_name][1:-1] in instantiation_params:
4528 calculated_params[param_name] = instantiation_params[
4529 calculated_params[param_name][1:-1]
4530 ]
4531 else:
4532 raise LcmException(
4533 "Parameter {} needed to execute primitive {} not provided".format(
4534 calculated_params[param_name], primitive_desc["name"]
4535 )
4536 )
4537 else:
4538 raise LcmException(
4539 "Parameter {} needed to execute primitive {} not provided".format(
4540 param_name, primitive_desc["name"]
4541 )
4542 )
4543
4544 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4545 calculated_params[param_name] = yaml.safe_dump(
4546 calculated_params[param_name], default_flow_style=True, width=256
4547 )
4548 elif isinstance(calculated_params[param_name], str) and calculated_params[
4549 param_name
4550 ].startswith("!!yaml "):
4551 calculated_params[param_name] = calculated_params[param_name][7:]
4552 if parameter.get("data-type") == "INTEGER":
4553 try:
4554 calculated_params[param_name] = int(calculated_params[param_name])
4555 except ValueError: # error converting string to int
4556 raise LcmException(
4557 "Parameter {} of primitive {} must be integer".format(
4558 param_name, primitive_desc["name"]
4559 )
4560 )
4561 elif parameter.get("data-type") == "BOOLEAN":
4562 calculated_params[param_name] = not (
4563 (str(calculated_params[param_name])).lower() == "false"
4564 )
4565
4566 # add always ns_config_info if primitive name is config
4567 if primitive_desc["name"] == "config":
4568 if "ns_config_info" in instantiation_params:
4569 calculated_params["ns_config_info"] = instantiation_params[
4570 "ns_config_info"
4571 ]
4572 return calculated_params
4573
4574 def _look_for_deployed_vca(
4575 self,
4576 deployed_vca,
4577 member_vnf_index,
4578 vdu_id,
4579 vdu_count_index,
4580 kdu_name=None,
4581 ee_descriptor_id=None,
4582 ):
4583 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4584 for vca in deployed_vca:
4585 if not vca:
4586 continue
4587 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4588 continue
4589 if (
4590 vdu_count_index is not None
4591 and vdu_count_index != vca["vdu_count_index"]
4592 ):
4593 continue
4594 if kdu_name and kdu_name != vca["kdu_name"]:
4595 continue
4596 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4597 continue
4598 break
4599 else:
4600 # vca_deployed not found
4601 raise LcmException(
4602 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4603 " is not deployed".format(
4604 member_vnf_index,
4605 vdu_id,
4606 vdu_count_index,
4607 kdu_name,
4608 ee_descriptor_id,
4609 )
4610 )
4611 # get ee_id
4612 ee_id = vca.get("ee_id")
4613 vca_type = vca.get(
4614 "type", "lxc_proxy_charm"
4615 ) # default value for backward compatibility - proxy charm
4616 if not ee_id:
4617 raise LcmException(
4618 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4619 "execution environment".format(
4620 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4621 )
4622 )
4623 return ee_id, vca_type
4624
    async def _ns_execute_primitive(
        self,
        ee_id,
        primitive,
        primitive_params,
        retries=0,
        retries_interval=30,
        timeout=None,
        vca_type=None,
        db_dict=None,
        vca_id: str = None,
    ) -> (str, str):
        """
        Execute a primitive on an execution environment, with optional retries.

        :param ee_id: id of the execution environment to run the primitive on
        :param primitive: primitive name; "config" wraps the params under a "params" key
        :param primitive_params: dict of parameters for the primitive
        :param retries: number of additional attempts after a failure (0 = single attempt)
        :param retries_interval: seconds to sleep between attempts
        :param timeout: per-attempt timeout; defaults to self.timeout_primitive
        :param vca_type: VCA connector to use; defaults to "lxc_proxy_charm"
        :param db_dict: db location to write partial status updates
        :param vca_id: id of the VCA to target
        :return: tuple (state, detail): ("COMPLETED", output) on success,
            ("FAILED", error) when all retries are exhausted, or
            ("FAIL", error) on an unexpected exception outside the retry loop
        """
        try:
            if primitive == "config":
                # charms expect the config payload nested under "params"
                primitive_params = {"params": primitive_params}

            vca_type = vca_type or "lxc_proxy_charm"

            while retries >= 0:
                try:
                    output = await asyncio.wait_for(
                        self.vca_map[vca_type].exec_primitive(
                            ee_id=ee_id,
                            primitive_name=primitive,
                            params_dict=primitive_params,
                            progress_timeout=self.timeout_progress_primitive,
                            total_timeout=self.timeout_primitive,
                            db_dict=db_dict,
                            vca_id=vca_id,
                            vca_type=vca_type,
                        ),
                        timeout=timeout or self.timeout_primitive,
                    )
                    # execution was OK
                    break
                except asyncio.CancelledError:
                    raise
                except Exception as e:  # asyncio.TimeoutError
                    if isinstance(e, asyncio.TimeoutError):
                        e = "Timeout"
                    retries -= 1
                    if retries >= 0:
                        self.logger.debug(
                            "Error executing action {} on {} -> {}".format(
                                primitive, ee_id, e
                            )
                        )
                        # wait and retry
                        await asyncio.sleep(retries_interval, loop=self.loop)
                    else:
                        # retries exhausted: report failure instead of raising
                        return "FAILED", str(e)

            return "COMPLETED", output

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            return "FAIL", "Error executing action {}: {}".format(primitive, e)
4683
4684 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4685 """
4686 Updating the vca_status with latest juju information in nsrs record
4687 :param: nsr_id: Id of the nsr
4688 :param: nslcmop_id: Id of the nslcmop
4689 :return: None
4690 """
4691
4692 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4693 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4694 vca_id = self.get_vca_id({}, db_nsr)
4695 if db_nsr["_admin"]["deployed"]["K8s"]:
4696 for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4697 cluster_uuid, kdu_instance, cluster_type = (
4698 k8s["k8scluster-uuid"],
4699 k8s["kdu-instance"],
4700 k8s["k8scluster-type"],
4701 )
4702 await self._on_update_k8s_db(
4703 cluster_uuid=cluster_uuid,
4704 kdu_instance=kdu_instance,
4705 filter={"_id": nsr_id},
4706 vca_id=vca_id,
4707 cluster_type=cluster_type,
4708 )
4709 else:
4710 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4711 table, filter = "nsrs", {"_id": nsr_id}
4712 path = "_admin.deployed.VCA.{}.".format(vca_index)
4713 await self._on_update_n2vc_db(table, filter, path, {})
4714
4715 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4716 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4717
4718 async def action(self, nsr_id, nslcmop_id):
4719 # Try to lock HA task here
4720 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4721 if not task_is_locked_by_me:
4722 return
4723
4724 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4725 self.logger.debug(logging_text + "Enter")
4726 # get all needed from database
4727 db_nsr = None
4728 db_nslcmop = None
4729 db_nsr_update = {}
4730 db_nslcmop_update = {}
4731 nslcmop_operation_state = None
4732 error_description_nslcmop = None
4733 exc = None
4734 try:
4735 # wait for any previous tasks in process
4736 step = "Waiting for previous operations to terminate"
4737 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4738
4739 self._write_ns_status(
4740 nsr_id=nsr_id,
4741 ns_state=None,
4742 current_operation="RUNNING ACTION",
4743 current_operation_id=nslcmop_id,
4744 )
4745
4746 step = "Getting information from database"
4747 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4748 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4749 if db_nslcmop["operationParams"].get("primitive_params"):
4750 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4751 db_nslcmop["operationParams"]["primitive_params"]
4752 )
4753
4754 nsr_deployed = db_nsr["_admin"].get("deployed")
4755 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4756 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4757 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4758 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4759 primitive = db_nslcmop["operationParams"]["primitive"]
4760 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4761 timeout_ns_action = db_nslcmop["operationParams"].get(
4762 "timeout_ns_action", self.timeout_primitive
4763 )
4764
4765 if vnf_index:
4766 step = "Getting vnfr from database"
4767 db_vnfr = self.db.get_one(
4768 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4769 )
4770 if db_vnfr.get("kdur"):
4771 kdur_list = []
4772 for kdur in db_vnfr["kdur"]:
4773 if kdur.get("additionalParams"):
4774 kdur["additionalParams"] = json.loads(
4775 kdur["additionalParams"]
4776 )
4777 kdur_list.append(kdur)
4778 db_vnfr["kdur"] = kdur_list
4779 step = "Getting vnfd from database"
4780 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4781 else:
4782 step = "Getting nsd from database"
4783 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4784
4785 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4786 # for backward compatibility
4787 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4788 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4789 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4790 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4791
4792 # look for primitive
4793 config_primitive_desc = descriptor_configuration = None
4794 if vdu_id:
4795 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4796 elif kdu_name:
4797 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4798 elif vnf_index:
4799 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4800 else:
4801 descriptor_configuration = db_nsd.get("ns-configuration")
4802
4803 if descriptor_configuration and descriptor_configuration.get(
4804 "config-primitive"
4805 ):
4806 for config_primitive in descriptor_configuration["config-primitive"]:
4807 if config_primitive["name"] == primitive:
4808 config_primitive_desc = config_primitive
4809 break
4810
4811 if not config_primitive_desc:
4812 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4813 raise LcmException(
4814 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4815 primitive
4816 )
4817 )
4818 primitive_name = primitive
4819 ee_descriptor_id = None
4820 else:
4821 primitive_name = config_primitive_desc.get(
4822 "execution-environment-primitive", primitive
4823 )
4824 ee_descriptor_id = config_primitive_desc.get(
4825 "execution-environment-ref"
4826 )
4827
4828 if vnf_index:
4829 if vdu_id:
4830 vdur = next(
4831 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
4832 )
4833 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
4834 elif kdu_name:
4835 kdur = next(
4836 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
4837 )
4838 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
4839 else:
4840 desc_params = parse_yaml_strings(
4841 db_vnfr.get("additionalParamsForVnf")
4842 )
4843 else:
4844 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
4845 if kdu_name and get_configuration(db_vnfd, kdu_name):
4846 kdu_configuration = get_configuration(db_vnfd, kdu_name)
4847 actions = set()
4848 for primitive in kdu_configuration.get("initial-config-primitive", []):
4849 actions.add(primitive["name"])
4850 for primitive in kdu_configuration.get("config-primitive", []):
4851 actions.add(primitive["name"])
4852 kdu_action = True if primitive_name in actions else False
4853
4854 # TODO check if ns is in a proper status
4855 if kdu_name and (
4856 primitive_name in ("upgrade", "rollback", "status") or kdu_action
4857 ):
4858 # kdur and desc_params already set from before
4859 if primitive_params:
4860 desc_params.update(primitive_params)
4861 # TODO Check if we will need something at vnf level
4862 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
4863 if (
4864 kdu_name == kdu["kdu-name"]
4865 and kdu["member-vnf-index"] == vnf_index
4866 ):
4867 break
4868 else:
4869 raise LcmException(
4870 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
4871 )
4872
4873 if kdu.get("k8scluster-type") not in self.k8scluster_map:
4874 msg = "unknown k8scluster-type '{}'".format(
4875 kdu.get("k8scluster-type")
4876 )
4877 raise LcmException(msg)
4878
4879 db_dict = {
4880 "collection": "nsrs",
4881 "filter": {"_id": nsr_id},
4882 "path": "_admin.deployed.K8s.{}".format(index),
4883 }
4884 self.logger.debug(
4885 logging_text
4886 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
4887 )
4888 step = "Executing kdu {}".format(primitive_name)
4889 if primitive_name == "upgrade":
4890 if desc_params.get("kdu_model"):
4891 kdu_model = desc_params.get("kdu_model")
4892 del desc_params["kdu_model"]
4893 else:
4894 kdu_model = kdu.get("kdu-model")
4895 parts = kdu_model.split(sep=":")
4896 if len(parts) == 2:
4897 kdu_model = parts[0]
4898
4899 detailed_status = await asyncio.wait_for(
4900 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
4901 cluster_uuid=kdu.get("k8scluster-uuid"),
4902 kdu_instance=kdu.get("kdu-instance"),
4903 atomic=True,
4904 kdu_model=kdu_model,
4905 params=desc_params,
4906 db_dict=db_dict,
4907 timeout=timeout_ns_action,
4908 ),
4909 timeout=timeout_ns_action + 10,
4910 )
4911 self.logger.debug(
4912 logging_text + " Upgrade of kdu {} done".format(detailed_status)
4913 )
4914 elif primitive_name == "rollback":
4915 detailed_status = await asyncio.wait_for(
4916 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
4917 cluster_uuid=kdu.get("k8scluster-uuid"),
4918 kdu_instance=kdu.get("kdu-instance"),
4919 db_dict=db_dict,
4920 ),
4921 timeout=timeout_ns_action,
4922 )
4923 elif primitive_name == "status":
4924 detailed_status = await asyncio.wait_for(
4925 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
4926 cluster_uuid=kdu.get("k8scluster-uuid"),
4927 kdu_instance=kdu.get("kdu-instance"),
4928 vca_id=vca_id,
4929 ),
4930 timeout=timeout_ns_action,
4931 )
4932 else:
4933 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
4934 kdu["kdu-name"], nsr_id
4935 )
4936 params = self._map_primitive_params(
4937 config_primitive_desc, primitive_params, desc_params
4938 )
4939
4940 detailed_status = await asyncio.wait_for(
4941 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
4942 cluster_uuid=kdu.get("k8scluster-uuid"),
4943 kdu_instance=kdu_instance,
4944 primitive_name=primitive_name,
4945 params=params,
4946 db_dict=db_dict,
4947 timeout=timeout_ns_action,
4948 vca_id=vca_id,
4949 ),
4950 timeout=timeout_ns_action,
4951 )
4952
4953 if detailed_status:
4954 nslcmop_operation_state = "COMPLETED"
4955 else:
4956 detailed_status = ""
4957 nslcmop_operation_state = "FAILED"
4958 else:
4959 ee_id, vca_type = self._look_for_deployed_vca(
4960 nsr_deployed["VCA"],
4961 member_vnf_index=vnf_index,
4962 vdu_id=vdu_id,
4963 vdu_count_index=vdu_count_index,
4964 ee_descriptor_id=ee_descriptor_id,
4965 )
4966 for vca_index, vca_deployed in enumerate(
4967 db_nsr["_admin"]["deployed"]["VCA"]
4968 ):
4969 if vca_deployed.get("member-vnf-index") == vnf_index:
4970 db_dict = {
4971 "collection": "nsrs",
4972 "filter": {"_id": nsr_id},
4973 "path": "_admin.deployed.VCA.{}.".format(vca_index),
4974 }
4975 break
4976 (
4977 nslcmop_operation_state,
4978 detailed_status,
4979 ) = await self._ns_execute_primitive(
4980 ee_id,
4981 primitive=primitive_name,
4982 primitive_params=self._map_primitive_params(
4983 config_primitive_desc, primitive_params, desc_params
4984 ),
4985 timeout=timeout_ns_action,
4986 vca_type=vca_type,
4987 db_dict=db_dict,
4988 vca_id=vca_id,
4989 )
4990
4991 db_nslcmop_update["detailed-status"] = detailed_status
4992 error_description_nslcmop = (
4993 detailed_status if nslcmop_operation_state == "FAILED" else ""
4994 )
4995 self.logger.debug(
4996 logging_text
4997 + " task Done with result {} {}".format(
4998 nslcmop_operation_state, detailed_status
4999 )
5000 )
5001 return # database update is called inside finally
5002
5003 except (DbException, LcmException, N2VCException, K8sException) as e:
5004 self.logger.error(logging_text + "Exit Exception {}".format(e))
5005 exc = e
5006 except asyncio.CancelledError:
5007 self.logger.error(
5008 logging_text + "Cancelled Exception while '{}'".format(step)
5009 )
5010 exc = "Operation was cancelled"
5011 except asyncio.TimeoutError:
5012 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5013 exc = "Timeout"
5014 except Exception as e:
5015 exc = traceback.format_exc()
5016 self.logger.critical(
5017 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5018 exc_info=True,
5019 )
5020 finally:
5021 if exc:
5022 db_nslcmop_update[
5023 "detailed-status"
5024 ] = (
5025 detailed_status
5026 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5027 nslcmop_operation_state = "FAILED"
5028 if db_nsr:
5029 self._write_ns_status(
5030 nsr_id=nsr_id,
5031 ns_state=db_nsr[
5032 "nsState"
5033 ], # TODO check if degraded. For the moment use previous status
5034 current_operation="IDLE",
5035 current_operation_id=None,
5036 # error_description=error_description_nsr,
5037 # error_detail=error_detail,
5038 other_update=db_nsr_update,
5039 )
5040
5041 self._write_op_status(
5042 op_id=nslcmop_id,
5043 stage="",
5044 error_message=error_description_nslcmop,
5045 operation_state=nslcmop_operation_state,
5046 other_update=db_nslcmop_update,
5047 )
5048
5049 if nslcmop_operation_state:
5050 try:
5051 await self.msg.aiowrite(
5052 "ns",
5053 "actioned",
5054 {
5055 "nsr_id": nsr_id,
5056 "nslcmop_id": nslcmop_id,
5057 "operationState": nslcmop_operation_state,
5058 },
5059 loop=self.loop,
5060 )
5061 except Exception as e:
5062 self.logger.error(
5063 logging_text + "kafka_write notification Exception {}".format(e)
5064 )
5065 self.logger.debug(logging_text + "Exit")
5066 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5067 return nslcmop_operation_state, detailed_status
5068
5069 async def scale(self, nsr_id, nslcmop_id):
5070 # Try to lock HA task here
5071 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5072 if not task_is_locked_by_me:
5073 return
5074
5075 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5076 stage = ["", "", ""]
5077 tasks_dict_info = {}
5078 # ^ stage, step, VIM progress
5079 self.logger.debug(logging_text + "Enter")
5080 # get all needed from database
5081 db_nsr = None
5082 db_nslcmop_update = {}
5083 db_nsr_update = {}
5084 exc = None
5085 # in case of error, indicates what part of scale was failed to put nsr at error status
5086 scale_process = None
5087 old_operational_status = ""
5088 old_config_status = ""
5089 nsi_id = None
5090 try:
5091 # wait for any previous tasks in process
5092 step = "Waiting for previous operations to terminate"
5093 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5094 self._write_ns_status(
5095 nsr_id=nsr_id,
5096 ns_state=None,
5097 current_operation="SCALING",
5098 current_operation_id=nslcmop_id,
5099 )
5100
5101 step = "Getting nslcmop from database"
5102 self.logger.debug(
5103 step + " after having waited for previous tasks to be completed"
5104 )
5105 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5106
5107 step = "Getting nsr from database"
5108 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5109 old_operational_status = db_nsr["operational-status"]
5110 old_config_status = db_nsr["config-status"]
5111
5112 step = "Parsing scaling parameters"
5113 db_nsr_update["operational-status"] = "scaling"
5114 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5115 nsr_deployed = db_nsr["_admin"].get("deployed")
5116
5117 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5118 "scaleByStepData"
5119 ]["member-vnf-index"]
5120 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5121 "scaleByStepData"
5122 ]["scaling-group-descriptor"]
5123 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5124 # for backward compatibility
5125 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5126 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5127 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5128 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5129
5130 step = "Getting vnfr from database"
5131 db_vnfr = self.db.get_one(
5132 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5133 )
5134
5135 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5136
5137 step = "Getting vnfd from database"
5138 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5139
5140 base_folder = db_vnfd["_admin"]["storage"]
5141
5142 step = "Getting scaling-group-descriptor"
5143 scaling_descriptor = find_in_list(
5144 get_scaling_aspect(db_vnfd),
5145 lambda scale_desc: scale_desc["name"] == scaling_group,
5146 )
5147 if not scaling_descriptor:
5148 raise LcmException(
5149 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5150 "at vnfd:scaling-group-descriptor".format(scaling_group)
5151 )
5152
5153 step = "Sending scale order to VIM"
5154 # TODO check if ns is in a proper status
5155 nb_scale_op = 0
5156 if not db_nsr["_admin"].get("scaling-group"):
5157 self.update_db_2(
5158 "nsrs",
5159 nsr_id,
5160 {
5161 "_admin.scaling-group": [
5162 {"name": scaling_group, "nb-scale-op": 0}
5163 ]
5164 },
5165 )
5166 admin_scale_index = 0
5167 else:
5168 for admin_scale_index, admin_scale_info in enumerate(
5169 db_nsr["_admin"]["scaling-group"]
5170 ):
5171 if admin_scale_info["name"] == scaling_group:
5172 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5173 break
5174 else: # not found, set index one plus last element and add new entry with the name
5175 admin_scale_index += 1
5176 db_nsr_update[
5177 "_admin.scaling-group.{}.name".format(admin_scale_index)
5178 ] = scaling_group
5179
5180 vca_scaling_info = []
5181 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5182 if scaling_type == "SCALE_OUT":
5183 if "aspect-delta-details" not in scaling_descriptor:
5184 raise LcmException(
5185 "Aspect delta details not fount in scaling descriptor {}".format(
5186 scaling_descriptor["name"]
5187 )
5188 )
5189 # count if max-instance-count is reached
5190 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5191
5192 scaling_info["scaling_direction"] = "OUT"
5193 scaling_info["vdu-create"] = {}
5194 scaling_info["kdu-create"] = {}
5195 for delta in deltas:
5196 for vdu_delta in delta.get("vdu-delta", {}):
5197 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5198 # vdu_index also provides the number of instances of the targeted vdu
5199 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5200 cloud_init_text = self._get_vdu_cloud_init_content(
5201 vdud, db_vnfd
5202 )
5203 if cloud_init_text:
5204 additional_params = (
5205 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5206 or {}
5207 )
5208 cloud_init_list = []
5209
5210 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5211 max_instance_count = 10
5212 if vdu_profile and "max-number-of-instances" in vdu_profile:
5213 max_instance_count = vdu_profile.get(
5214 "max-number-of-instances", 10
5215 )
5216
5217 default_instance_num = get_number_of_instances(
5218 db_vnfd, vdud["id"]
5219 )
5220 instances_number = vdu_delta.get("number-of-instances", 1)
5221 nb_scale_op += instances_number
5222
5223 new_instance_count = nb_scale_op + default_instance_num
5224 # Check if the new count is over the max while the current vdu count is below it.
5225 # Then assign the new instance count
5226 if new_instance_count > max_instance_count > vdu_count:
5227 instances_number = new_instance_count - max_instance_count
5228 else:
5229 instances_number = instances_number
5230
5231 if new_instance_count > max_instance_count:
5232 raise LcmException(
5233 "reached the limit of {} (max-instance-count) "
5234 "scaling-out operations for the "
5235 "scaling-group-descriptor '{}'".format(
5236 nb_scale_op, scaling_group
5237 )
5238 )
5239 for x in range(vdu_delta.get("number-of-instances", 1)):
5240 if cloud_init_text:
5241 # TODO Information of its own ip is not available because db_vnfr is not updated.
5242 additional_params["OSM"] = get_osm_params(
5243 db_vnfr, vdu_delta["id"], vdu_index + x
5244 )
5245 cloud_init_list.append(
5246 self._parse_cloud_init(
5247 cloud_init_text,
5248 additional_params,
5249 db_vnfd["id"],
5250 vdud["id"],
5251 )
5252 )
5253 vca_scaling_info.append(
5254 {
5255 "osm_vdu_id": vdu_delta["id"],
5256 "member-vnf-index": vnf_index,
5257 "type": "create",
5258 "vdu_index": vdu_index + x,
5259 }
5260 )
5261 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5262 for kdu_delta in delta.get("kdu-resource-delta", {}):
5263 kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
5264 kdu_name = kdu_profile["kdu-name"]
5265 resource_name = kdu_profile["resource-name"]
5266
5267 # Might have different kdus in the same delta
5268 # Should have list for each kdu
5269 if not scaling_info["kdu-create"].get(kdu_name, None):
5270 scaling_info["kdu-create"][kdu_name] = []
5271
5272 kdur = get_kdur(db_vnfr, kdu_name)
5273 if kdur.get("helm-chart"):
5274 k8s_cluster_type = "helm-chart-v3"
5275 self.logger.debug("kdur: {}".format(kdur))
5276 if (
5277 kdur.get("helm-version")
5278 and kdur.get("helm-version") == "v2"
5279 ):
5280 k8s_cluster_type = "helm-chart"
5281 raise NotImplementedError
5282 elif kdur.get("juju-bundle"):
5283 k8s_cluster_type = "juju-bundle"
5284 else:
5285 raise LcmException(
5286 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5287 "juju-bundle. Maybe an old NBI version is running".format(
5288 db_vnfr["member-vnf-index-ref"], kdu_name
5289 )
5290 )
5291
5292 max_instance_count = 10
5293 if kdu_profile and "max-number-of-instances" in kdu_profile:
5294 max_instance_count = kdu_profile.get(
5295 "max-number-of-instances", 10
5296 )
5297
5298 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5299 deployed_kdu, _ = get_deployed_kdu(
5300 nsr_deployed, kdu_name, vnf_index
5301 )
5302 if deployed_kdu is None:
5303 raise LcmException(
5304 "KDU '{}' for vnf '{}' not deployed".format(
5305 kdu_name, vnf_index
5306 )
5307 )
5308 kdu_instance = deployed_kdu.get("kdu-instance")
5309 instance_num = await self.k8scluster_map[
5310 k8s_cluster_type
5311 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5312 kdu_replica_count = instance_num + kdu_delta.get(
5313 "number-of-instances", 1
5314 )
5315
5316 # Check if the new count is over the max while instance_num is below it.
5317 # Then assign the max instance number to the kdu replica count
5318 if kdu_replica_count > max_instance_count > instance_num:
5319 kdu_replica_count = max_instance_count
5320 if kdu_replica_count > max_instance_count:
5321 raise LcmException(
5322 "reached the limit of {} (max-instance-count) "
5323 "scaling-out operations for the "
5324 "scaling-group-descriptor '{}'".format(
5325 instance_num, scaling_group
5326 )
5327 )
5328
5329 for x in range(kdu_delta.get("number-of-instances", 1)):
5330 vca_scaling_info.append(
5331 {
5332 "osm_kdu_id": kdu_name,
5333 "member-vnf-index": vnf_index,
5334 "type": "create",
5335 "kdu_index": instance_num + x - 1,
5336 }
5337 )
5338 scaling_info["kdu-create"][kdu_name].append(
5339 {
5340 "member-vnf-index": vnf_index,
5341 "type": "create",
5342 "k8s-cluster-type": k8s_cluster_type,
5343 "resource-name": resource_name,
5344 "scale": kdu_replica_count,
5345 }
5346 )
5347 elif scaling_type == "SCALE_IN":
5348 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5349
5350 scaling_info["scaling_direction"] = "IN"
5351 scaling_info["vdu-delete"] = {}
5352 scaling_info["kdu-delete"] = {}
5353
5354 for delta in deltas:
5355 for vdu_delta in delta.get("vdu-delta", {}):
5356 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5357 min_instance_count = 0
5358 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5359 if vdu_profile and "min-number-of-instances" in vdu_profile:
5360 min_instance_count = vdu_profile["min-number-of-instances"]
5361
5362 default_instance_num = get_number_of_instances(
5363 db_vnfd, vdu_delta["id"]
5364 )
5365 instance_num = vdu_delta.get("number-of-instances", 1)
5366 nb_scale_op -= instance_num
5367
5368 new_instance_count = nb_scale_op + default_instance_num
5369
5370 if new_instance_count < min_instance_count < vdu_count:
5371 instances_number = min_instance_count - new_instance_count
5372 else:
5373 instances_number = instance_num
5374
5375 if new_instance_count < min_instance_count:
5376 raise LcmException(
5377 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5378 "scaling-group-descriptor '{}'".format(
5379 nb_scale_op, scaling_group
5380 )
5381 )
5382 for x in range(vdu_delta.get("number-of-instances", 1)):
5383 vca_scaling_info.append(
5384 {
5385 "osm_vdu_id": vdu_delta["id"],
5386 "member-vnf-index": vnf_index,
5387 "type": "delete",
5388 "vdu_index": vdu_index - 1 - x,
5389 }
5390 )
5391 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5392 for kdu_delta in delta.get("kdu-resource-delta", {}):
5393 kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
5394 kdu_name = kdu_profile["kdu-name"]
5395 resource_name = kdu_profile["resource-name"]
5396
5397 if not scaling_info["kdu-delete"].get(kdu_name, None):
5398 scaling_info["kdu-delete"][kdu_name] = []
5399
5400 kdur = get_kdur(db_vnfr, kdu_name)
5401 if kdur.get("helm-chart"):
5402 k8s_cluster_type = "helm-chart-v3"
5403 self.logger.debug("kdur: {}".format(kdur))
5404 if (
5405 kdur.get("helm-version")
5406 and kdur.get("helm-version") == "v2"
5407 ):
5408 k8s_cluster_type = "helm-chart"
5409 raise NotImplementedError
5410 elif kdur.get("juju-bundle"):
5411 k8s_cluster_type = "juju-bundle"
5412 else:
5413 raise LcmException(
5414 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5415 "juju-bundle. Maybe an old NBI version is running".format(
5416 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5417 )
5418 )
5419
5420 min_instance_count = 0
5421 if kdu_profile and "min-number-of-instances" in kdu_profile:
5422 min_instance_count = kdu_profile["min-number-of-instances"]
5423
5424 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5425 deployed_kdu, _ = get_deployed_kdu(
5426 nsr_deployed, kdu_name, vnf_index
5427 )
5428 if deployed_kdu is None:
5429 raise LcmException(
5430 "KDU '{}' for vnf '{}' not deployed".format(
5431 kdu_name, vnf_index
5432 )
5433 )
5434 kdu_instance = deployed_kdu.get("kdu-instance")
5435 instance_num = await self.k8scluster_map[
5436 k8s_cluster_type
5437 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5438 kdu_replica_count = instance_num - kdu_delta.get(
5439 "number-of-instances", 1
5440 )
5441
5442 if kdu_replica_count < min_instance_count < instance_num:
5443 kdu_replica_count = min_instance_count
5444 if kdu_replica_count < min_instance_count:
5445 raise LcmException(
5446 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5447 "scaling-group-descriptor '{}'".format(
5448 instance_num, scaling_group
5449 )
5450 )
5451
5452 for x in range(kdu_delta.get("number-of-instances", 1)):
5453 vca_scaling_info.append(
5454 {
5455 "osm_kdu_id": kdu_name,
5456 "member-vnf-index": vnf_index,
5457 "type": "delete",
5458 "kdu_index": instance_num - x - 1,
5459 }
5460 )
5461 scaling_info["kdu-delete"][kdu_name].append(
5462 {
5463 "member-vnf-index": vnf_index,
5464 "type": "delete",
5465 "k8s-cluster-type": k8s_cluster_type,
5466 "resource-name": resource_name,
5467 "scale": kdu_replica_count,
5468 }
5469 )
5470
5471 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5472 vdu_delete = copy(scaling_info.get("vdu-delete"))
5473 if scaling_info["scaling_direction"] == "IN":
5474 for vdur in reversed(db_vnfr["vdur"]):
5475 if vdu_delete.get(vdur["vdu-id-ref"]):
5476 vdu_delete[vdur["vdu-id-ref"]] -= 1
5477 scaling_info["vdu"].append(
5478 {
5479 "name": vdur.get("name") or vdur.get("vdu-name"),
5480 "vdu_id": vdur["vdu-id-ref"],
5481 "interface": [],
5482 }
5483 )
5484 for interface in vdur["interfaces"]:
5485 scaling_info["vdu"][-1]["interface"].append(
5486 {
5487 "name": interface["name"],
5488 "ip_address": interface["ip-address"],
5489 "mac_address": interface.get("mac-address"),
5490 }
5491 )
5492 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5493
5494 # PRE-SCALE BEGIN
5495 step = "Executing pre-scale vnf-config-primitive"
5496 if scaling_descriptor.get("scaling-config-action"):
5497 for scaling_config_action in scaling_descriptor[
5498 "scaling-config-action"
5499 ]:
5500 if (
5501 scaling_config_action.get("trigger") == "pre-scale-in"
5502 and scaling_type == "SCALE_IN"
5503 ) or (
5504 scaling_config_action.get("trigger") == "pre-scale-out"
5505 and scaling_type == "SCALE_OUT"
5506 ):
5507 vnf_config_primitive = scaling_config_action[
5508 "vnf-config-primitive-name-ref"
5509 ]
5510 step = db_nslcmop_update[
5511 "detailed-status"
5512 ] = "executing pre-scale scaling-config-action '{}'".format(
5513 vnf_config_primitive
5514 )
5515
5516 # look for primitive
5517 for config_primitive in (
5518 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5519 ).get("config-primitive", ()):
5520 if config_primitive["name"] == vnf_config_primitive:
5521 break
5522 else:
5523 raise LcmException(
5524 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5525 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5526 "primitive".format(scaling_group, vnf_config_primitive)
5527 )
5528
5529 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5530 if db_vnfr.get("additionalParamsForVnf"):
5531 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5532
5533 scale_process = "VCA"
5534 db_nsr_update["config-status"] = "configuring pre-scaling"
5535 primitive_params = self._map_primitive_params(
5536 config_primitive, {}, vnfr_params
5537 )
5538
5539 # Pre-scale retry check: Check if this sub-operation has been executed before
5540 op_index = self._check_or_add_scale_suboperation(
5541 db_nslcmop,
5542 vnf_index,
5543 vnf_config_primitive,
5544 primitive_params,
5545 "PRE-SCALE",
5546 )
5547 if op_index == self.SUBOPERATION_STATUS_SKIP:
5548 # Skip sub-operation
5549 result = "COMPLETED"
5550 result_detail = "Done"
5551 self.logger.debug(
5552 logging_text
5553 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5554 vnf_config_primitive, result, result_detail
5555 )
5556 )
5557 else:
5558 if op_index == self.SUBOPERATION_STATUS_NEW:
5559 # New sub-operation: Get index of this sub-operation
5560 op_index = (
5561 len(db_nslcmop.get("_admin", {}).get("operations"))
5562 - 1
5563 )
5564 self.logger.debug(
5565 logging_text
5566 + "vnf_config_primitive={} New sub-operation".format(
5567 vnf_config_primitive
5568 )
5569 )
5570 else:
5571 # retry: Get registered params for this existing sub-operation
5572 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5573 op_index
5574 ]
5575 vnf_index = op.get("member_vnf_index")
5576 vnf_config_primitive = op.get("primitive")
5577 primitive_params = op.get("primitive_params")
5578 self.logger.debug(
5579 logging_text
5580 + "vnf_config_primitive={} Sub-operation retry".format(
5581 vnf_config_primitive
5582 )
5583 )
5584 # Execute the primitive, either with new (first-time) or registered (retry) args
5585 ee_descriptor_id = config_primitive.get(
5586 "execution-environment-ref"
5587 )
5588 primitive_name = config_primitive.get(
5589 "execution-environment-primitive", vnf_config_primitive
5590 )
5591 ee_id, vca_type = self._look_for_deployed_vca(
5592 nsr_deployed["VCA"],
5593 member_vnf_index=vnf_index,
5594 vdu_id=None,
5595 vdu_count_index=None,
5596 ee_descriptor_id=ee_descriptor_id,
5597 )
5598 result, result_detail = await self._ns_execute_primitive(
5599 ee_id,
5600 primitive_name,
5601 primitive_params,
5602 vca_type=vca_type,
5603 vca_id=vca_id,
5604 )
5605 self.logger.debug(
5606 logging_text
5607 + "vnf_config_primitive={} Done with result {} {}".format(
5608 vnf_config_primitive, result, result_detail
5609 )
5610 )
5611 # Update operationState = COMPLETED | FAILED
5612 self._update_suboperation_status(
5613 db_nslcmop, op_index, result, result_detail
5614 )
5615
5616 if result == "FAILED":
5617 raise LcmException(result_detail)
5618 db_nsr_update["config-status"] = old_config_status
5619 scale_process = None
5620 # PRE-SCALE END
5621
5622 db_nsr_update[
5623 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5624 ] = nb_scale_op
5625 db_nsr_update[
5626 "_admin.scaling-group.{}.time".format(admin_scale_index)
5627 ] = time()
5628
5629 # SCALE-IN VCA - BEGIN
5630 if vca_scaling_info:
5631 step = db_nslcmop_update[
5632 "detailed-status"
5633 ] = "Deleting the execution environments"
5634 scale_process = "VCA"
5635 for vca_info in vca_scaling_info:
5636 if vca_info["type"] == "delete":
5637 member_vnf_index = str(vca_info["member-vnf-index"])
5638 self.logger.debug(
5639 logging_text + "vdu info: {}".format(vca_info)
5640 )
5641 if vca_info.get("osm_vdu_id"):
5642 vdu_id = vca_info["osm_vdu_id"]
5643 vdu_index = int(vca_info["vdu_index"])
5644 stage[
5645 1
5646 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5647 member_vnf_index, vdu_id, vdu_index
5648 )
5649 else:
5650 vdu_index = 0
5651 kdu_id = vca_info["osm_kdu_id"]
5652 stage[
5653 1
5654 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5655 member_vnf_index, kdu_id, vdu_index
5656 )
5657 stage[2] = step = "Scaling in VCA"
5658 self._write_op_status(op_id=nslcmop_id, stage=stage)
5659 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5660 config_update = db_nsr["configurationStatus"]
5661 for vca_index, vca in enumerate(vca_update):
5662 if (
5663 (vca or vca.get("ee_id"))
5664 and vca["member-vnf-index"] == member_vnf_index
5665 and vca["vdu_count_index"] == vdu_index
5666 ):
5667 if vca.get("vdu_id"):
5668 config_descriptor = get_configuration(
5669 db_vnfd, vca.get("vdu_id")
5670 )
5671 elif vca.get("kdu_name"):
5672 config_descriptor = get_configuration(
5673 db_vnfd, vca.get("kdu_name")
5674 )
5675 else:
5676 config_descriptor = get_configuration(
5677 db_vnfd, db_vnfd["id"]
5678 )
5679 operation_params = (
5680 db_nslcmop.get("operationParams") or {}
5681 )
5682 exec_terminate_primitives = not operation_params.get(
5683 "skip_terminate_primitives"
5684 ) and vca.get("needed_terminate")
5685 task = asyncio.ensure_future(
5686 asyncio.wait_for(
5687 self.destroy_N2VC(
5688 logging_text,
5689 db_nslcmop,
5690 vca,
5691 config_descriptor,
5692 vca_index,
5693 destroy_ee=True,
5694 exec_primitives=exec_terminate_primitives,
5695 scaling_in=True,
5696 vca_id=vca_id,
5697 ),
5698 timeout=self.timeout_charm_delete,
5699 )
5700 )
5701 tasks_dict_info[task] = "Terminating VCA {}".format(
5702 vca.get("ee_id")
5703 )
5704 del vca_update[vca_index]
5705 del config_update[vca_index]
5706 # wait for pending tasks of terminate primitives
5707 if tasks_dict_info:
5708 self.logger.debug(
5709 logging_text
5710 + "Waiting for tasks {}".format(
5711 list(tasks_dict_info.keys())
5712 )
5713 )
5714 error_list = await self._wait_for_tasks(
5715 logging_text,
5716 tasks_dict_info,
5717 min(
5718 self.timeout_charm_delete, self.timeout_ns_terminate
5719 ),
5720 stage,
5721 nslcmop_id,
5722 )
5723 tasks_dict_info.clear()
5724 if error_list:
5725 raise LcmException("; ".join(error_list))
5726
5727 db_vca_and_config_update = {
5728 "_admin.deployed.VCA": vca_update,
5729 "configurationStatus": config_update,
5730 }
5731 self.update_db_2(
5732 "nsrs", db_nsr["_id"], db_vca_and_config_update
5733 )
5734 scale_process = None
5735 # SCALE-IN VCA - END
5736
5737 # SCALE RO - BEGIN
5738 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5739 scale_process = "RO"
5740 if self.ro_config.get("ng"):
5741 await self._scale_ng_ro(
5742 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5743 )
5744 scaling_info.pop("vdu-create", None)
5745 scaling_info.pop("vdu-delete", None)
5746
5747 scale_process = None
5748 # SCALE RO - END
5749
5750 # SCALE KDU - BEGIN
5751 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5752 scale_process = "KDU"
5753 await self._scale_kdu(
5754 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5755 )
5756 scaling_info.pop("kdu-create", None)
5757 scaling_info.pop("kdu-delete", None)
5758
5759 scale_process = None
5760 # SCALE KDU - END
5761
5762 if db_nsr_update:
5763 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5764
5765 # SCALE-UP VCA - BEGIN
5766 if vca_scaling_info:
5767 step = db_nslcmop_update[
5768 "detailed-status"
5769 ] = "Creating new execution environments"
5770 scale_process = "VCA"
5771 for vca_info in vca_scaling_info:
5772 if vca_info["type"] == "create":
5773 member_vnf_index = str(vca_info["member-vnf-index"])
5774 self.logger.debug(
5775 logging_text + "vdu info: {}".format(vca_info)
5776 )
5777 vnfd_id = db_vnfr["vnfd-ref"]
5778 if vca_info.get("osm_vdu_id"):
5779 vdu_index = int(vca_info["vdu_index"])
5780 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5781 if db_vnfr.get("additionalParamsForVnf"):
5782 deploy_params.update(
5783 parse_yaml_strings(
5784 db_vnfr["additionalParamsForVnf"].copy()
5785 )
5786 )
5787 descriptor_config = get_configuration(
5788 db_vnfd, db_vnfd["id"]
5789 )
5790 if descriptor_config:
5791 vdu_id = None
5792 vdu_name = None
5793 kdu_name = None
5794 self._deploy_n2vc(
5795 logging_text=logging_text
5796 + "member_vnf_index={} ".format(member_vnf_index),
5797 db_nsr=db_nsr,
5798 db_vnfr=db_vnfr,
5799 nslcmop_id=nslcmop_id,
5800 nsr_id=nsr_id,
5801 nsi_id=nsi_id,
5802 vnfd_id=vnfd_id,
5803 vdu_id=vdu_id,
5804 kdu_name=kdu_name,
5805 member_vnf_index=member_vnf_index,
5806 vdu_index=vdu_index,
5807 vdu_name=vdu_name,
5808 deploy_params=deploy_params,
5809 descriptor_config=descriptor_config,
5810 base_folder=base_folder,
5811 task_instantiation_info=tasks_dict_info,
5812 stage=stage,
5813 )
5814 vdu_id = vca_info["osm_vdu_id"]
5815 vdur = find_in_list(
5816 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5817 )
5818 descriptor_config = get_configuration(db_vnfd, vdu_id)
5819 if vdur.get("additionalParams"):
5820 deploy_params_vdu = parse_yaml_strings(
5821 vdur["additionalParams"]
5822 )
5823 else:
5824 deploy_params_vdu = deploy_params
5825 deploy_params_vdu["OSM"] = get_osm_params(
5826 db_vnfr, vdu_id, vdu_count_index=vdu_index
5827 )
5828 if descriptor_config:
5829 vdu_name = None
5830 kdu_name = None
5831 stage[
5832 1
5833 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5834 member_vnf_index, vdu_id, vdu_index
5835 )
5836 stage[2] = step = "Scaling out VCA"
5837 self._write_op_status(op_id=nslcmop_id, stage=stage)
5838 self._deploy_n2vc(
5839 logging_text=logging_text
5840 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5841 member_vnf_index, vdu_id, vdu_index
5842 ),
5843 db_nsr=db_nsr,
5844 db_vnfr=db_vnfr,
5845 nslcmop_id=nslcmop_id,
5846 nsr_id=nsr_id,
5847 nsi_id=nsi_id,
5848 vnfd_id=vnfd_id,
5849 vdu_id=vdu_id,
5850 kdu_name=kdu_name,
5851 member_vnf_index=member_vnf_index,
5852 vdu_index=vdu_index,
5853 vdu_name=vdu_name,
5854 deploy_params=deploy_params_vdu,
5855 descriptor_config=descriptor_config,
5856 base_folder=base_folder,
5857 task_instantiation_info=tasks_dict_info,
5858 stage=stage,
5859 )
5860 else:
5861 kdu_name = vca_info["osm_kdu_id"]
5862 descriptor_config = get_configuration(db_vnfd, kdu_name)
5863 if descriptor_config:
5864 vdu_id = None
5865 kdu_index = int(vca_info["kdu_index"])
5866 vdu_name = None
5867 kdur = next(
5868 x
5869 for x in db_vnfr["kdur"]
5870 if x["kdu-name"] == kdu_name
5871 )
5872 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
5873 if kdur.get("additionalParams"):
5874 deploy_params_kdu = parse_yaml_strings(
5875 kdur["additionalParams"]
5876 )
5877
5878 self._deploy_n2vc(
5879 logging_text=logging_text,
5880 db_nsr=db_nsr,
5881 db_vnfr=db_vnfr,
5882 nslcmop_id=nslcmop_id,
5883 nsr_id=nsr_id,
5884 nsi_id=nsi_id,
5885 vnfd_id=vnfd_id,
5886 vdu_id=vdu_id,
5887 kdu_name=kdu_name,
5888 member_vnf_index=member_vnf_index,
5889 vdu_index=kdu_index,
5890 vdu_name=vdu_name,
5891 deploy_params=deploy_params_kdu,
5892 descriptor_config=descriptor_config,
5893 base_folder=base_folder,
5894 task_instantiation_info=tasks_dict_info,
5895 stage=stage,
5896 )
5897 # SCALE-UP VCA - END
5898 scale_process = None
5899
5900 # POST-SCALE BEGIN
5901 # execute primitive service POST-SCALING
5902 step = "Executing post-scale vnf-config-primitive"
5903 if scaling_descriptor.get("scaling-config-action"):
5904 for scaling_config_action in scaling_descriptor[
5905 "scaling-config-action"
5906 ]:
5907 if (
5908 scaling_config_action.get("trigger") == "post-scale-in"
5909 and scaling_type == "SCALE_IN"
5910 ) or (
5911 scaling_config_action.get("trigger") == "post-scale-out"
5912 and scaling_type == "SCALE_OUT"
5913 ):
5914 vnf_config_primitive = scaling_config_action[
5915 "vnf-config-primitive-name-ref"
5916 ]
5917 step = db_nslcmop_update[
5918 "detailed-status"
5919 ] = "executing post-scale scaling-config-action '{}'".format(
5920 vnf_config_primitive
5921 )
5922
5923 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5924 if db_vnfr.get("additionalParamsForVnf"):
5925 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5926
5927 # look for primitive
5928 for config_primitive in (
5929 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5930 ).get("config-primitive", ()):
5931 if config_primitive["name"] == vnf_config_primitive:
5932 break
5933 else:
5934 raise LcmException(
5935 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5936 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5937 "config-primitive".format(
5938 scaling_group, vnf_config_primitive
5939 )
5940 )
5941 scale_process = "VCA"
5942 db_nsr_update["config-status"] = "configuring post-scaling"
5943 primitive_params = self._map_primitive_params(
5944 config_primitive, {}, vnfr_params
5945 )
5946
5947 # Post-scale retry check: Check if this sub-operation has been executed before
5948 op_index = self._check_or_add_scale_suboperation(
5949 db_nslcmop,
5950 vnf_index,
5951 vnf_config_primitive,
5952 primitive_params,
5953 "POST-SCALE",
5954 )
5955 if op_index == self.SUBOPERATION_STATUS_SKIP:
5956 # Skip sub-operation
5957 result = "COMPLETED"
5958 result_detail = "Done"
5959 self.logger.debug(
5960 logging_text
5961 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5962 vnf_config_primitive, result, result_detail
5963 )
5964 )
5965 else:
5966 if op_index == self.SUBOPERATION_STATUS_NEW:
5967 # New sub-operation: Get index of this sub-operation
5968 op_index = (
5969 len(db_nslcmop.get("_admin", {}).get("operations"))
5970 - 1
5971 )
5972 self.logger.debug(
5973 logging_text
5974 + "vnf_config_primitive={} New sub-operation".format(
5975 vnf_config_primitive
5976 )
5977 )
5978 else:
5979 # retry: Get registered params for this existing sub-operation
5980 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5981 op_index
5982 ]
5983 vnf_index = op.get("member_vnf_index")
5984 vnf_config_primitive = op.get("primitive")
5985 primitive_params = op.get("primitive_params")
5986 self.logger.debug(
5987 logging_text
5988 + "vnf_config_primitive={} Sub-operation retry".format(
5989 vnf_config_primitive
5990 )
5991 )
5992 # Execute the primitive, either with new (first-time) or registered (reintent) args
5993 ee_descriptor_id = config_primitive.get(
5994 "execution-environment-ref"
5995 )
5996 primitive_name = config_primitive.get(
5997 "execution-environment-primitive", vnf_config_primitive
5998 )
5999 ee_id, vca_type = self._look_for_deployed_vca(
6000 nsr_deployed["VCA"],
6001 member_vnf_index=vnf_index,
6002 vdu_id=None,
6003 vdu_count_index=None,
6004 ee_descriptor_id=ee_descriptor_id,
6005 )
6006 result, result_detail = await self._ns_execute_primitive(
6007 ee_id,
6008 primitive_name,
6009 primitive_params,
6010 vca_type=vca_type,
6011 vca_id=vca_id,
6012 )
6013 self.logger.debug(
6014 logging_text
6015 + "vnf_config_primitive={} Done with result {} {}".format(
6016 vnf_config_primitive, result, result_detail
6017 )
6018 )
6019 # Update operationState = COMPLETED | FAILED
6020 self._update_suboperation_status(
6021 db_nslcmop, op_index, result, result_detail
6022 )
6023
6024 if result == "FAILED":
6025 raise LcmException(result_detail)
6026 db_nsr_update["config-status"] = old_config_status
6027 scale_process = None
6028 # POST-SCALE END
6029
6030 db_nsr_update[
6031 "detailed-status"
6032 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6033 db_nsr_update["operational-status"] = (
6034 "running"
6035 if old_operational_status == "failed"
6036 else old_operational_status
6037 )
6038 db_nsr_update["config-status"] = old_config_status
6039 return
6040 except (
6041 ROclient.ROClientException,
6042 DbException,
6043 LcmException,
6044 NgRoException,
6045 ) as e:
6046 self.logger.error(logging_text + "Exit Exception {}".format(e))
6047 exc = e
6048 except asyncio.CancelledError:
6049 self.logger.error(
6050 logging_text + "Cancelled Exception while '{}'".format(step)
6051 )
6052 exc = "Operation was cancelled"
6053 except Exception as e:
6054 exc = traceback.format_exc()
6055 self.logger.critical(
6056 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6057 exc_info=True,
6058 )
6059 finally:
6060 self._write_ns_status(
6061 nsr_id=nsr_id,
6062 ns_state=None,
6063 current_operation="IDLE",
6064 current_operation_id=None,
6065 )
6066 if tasks_dict_info:
6067 stage[1] = "Waiting for instantiate pending tasks."
6068 self.logger.debug(logging_text + stage[1])
6069 exc = await self._wait_for_tasks(
6070 logging_text,
6071 tasks_dict_info,
6072 self.timeout_ns_deploy,
6073 stage,
6074 nslcmop_id,
6075 nsr_id=nsr_id,
6076 )
6077 if exc:
6078 db_nslcmop_update[
6079 "detailed-status"
6080 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6081 nslcmop_operation_state = "FAILED"
6082 if db_nsr:
6083 db_nsr_update["operational-status"] = old_operational_status
6084 db_nsr_update["config-status"] = old_config_status
6085 db_nsr_update["detailed-status"] = ""
6086 if scale_process:
6087 if "VCA" in scale_process:
6088 db_nsr_update["config-status"] = "failed"
6089 if "RO" in scale_process:
6090 db_nsr_update["operational-status"] = "failed"
6091 db_nsr_update[
6092 "detailed-status"
6093 ] = "FAILED scaling nslcmop={} {}: {}".format(
6094 nslcmop_id, step, exc
6095 )
6096 else:
6097 error_description_nslcmop = None
6098 nslcmop_operation_state = "COMPLETED"
6099 db_nslcmop_update["detailed-status"] = "Done"
6100
6101 self._write_op_status(
6102 op_id=nslcmop_id,
6103 stage="",
6104 error_message=error_description_nslcmop,
6105 operation_state=nslcmop_operation_state,
6106 other_update=db_nslcmop_update,
6107 )
6108 if db_nsr:
6109 self._write_ns_status(
6110 nsr_id=nsr_id,
6111 ns_state=None,
6112 current_operation="IDLE",
6113 current_operation_id=None,
6114 other_update=db_nsr_update,
6115 )
6116
6117 if nslcmop_operation_state:
6118 try:
6119 msg = {
6120 "nsr_id": nsr_id,
6121 "nslcmop_id": nslcmop_id,
6122 "operationState": nslcmop_operation_state,
6123 }
6124 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6125 except Exception as e:
6126 self.logger.error(
6127 logging_text + "kafka_write notification Exception {}".format(e)
6128 )
6129 self.logger.debug(logging_text + "Exit")
6130 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6131
    async def _scale_kdu(
        self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
    ):
        """Scale KDU applications (in or out) as described by scaling_info.

        For every KDU instance to scale it locates the deployed KDU record,
        then, depending on the operation type:
        - "delete": runs the KDU's terminate-config-primitives (when defined
          and no Juju execution environment handles them) BEFORE scaling, then
          calls the K8s connector's scale().
        - "create": calls scale() first, then runs the KDU's
          initial-config-primitives (same Juju-EE exclusion rule).

        :param logging_text: prefix for log messages
        :param nsr_id: NS record id, used to address _admin.deployed.K8s in db
        :param nsr_deployed: _admin.deployed section of the NS record
        :param db_vnfd: VNF descriptor holding the KDU configuration
        :param vca_id: VCA id forwarded to the K8s connector calls
        :param scaling_info: dict with "kdu-create" and/or "kdu-delete" maps
            of kdu_name -> list of per-instance scaling data
        """
        # NOTE(review): "or" short-circuits, so if both "kdu-create" and
        # "kdu-delete" were ever present only "kdu-create" would be processed
        # — confirm a single operation never carries both.
        _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
        for kdu_name in _scaling_info:
            for kdu_scaling_info in _scaling_info[kdu_name]:
                deployed_kdu, index = get_deployed_kdu(
                    nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
                )
                cluster_uuid = deployed_kdu["k8scluster-uuid"]
                kdu_instance = deployed_kdu["kdu-instance"]
                scale = int(kdu_scaling_info["scale"])
                k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]

                # db path where the K8s connector writes status updates
                db_dict = {
                    "collection": "nsrs",
                    "filter": {"_id": nsr_id},
                    "path": "_admin.deployed.K8s.{}".format(index),
                }

                step = "scaling application {}".format(
                    kdu_scaling_info["resource-name"]
                )
                self.logger.debug(logging_text + step)

                if kdu_scaling_info["type"] == "delete":
                    kdu_config = get_configuration(db_vnfd, kdu_name)
                    # run terminate primitives only when they exist and no
                    # Juju execution environment is in charge of them
                    if (
                        kdu_config
                        and kdu_config.get("terminate-config-primitive")
                        and get_juju_ee_ref(db_vnfd, kdu_name) is None
                    ):
                        terminate_config_primitive_list = kdu_config.get(
                            "terminate-config-primitive"
                        )
                        # primitives are executed in ascending "seq" order
                        terminate_config_primitive_list.sort(
                            key=lambda val: int(val["seq"])
                        )

                        for (
                            terminate_config_primitive
                        ) in terminate_config_primitive_list:
                            primitive_params_ = self._map_primitive_params(
                                terminate_config_primitive, {}, {}
                            )
                            step = "execute terminate config primitive"
                            self.logger.debug(logging_text + step)
                            # fixed 10-minute cap per terminate primitive
                            await asyncio.wait_for(
                                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                    cluster_uuid=cluster_uuid,
                                    kdu_instance=kdu_instance,
                                    primitive_name=terminate_config_primitive["name"],
                                    params=primitive_params_,
                                    db_dict=db_dict,
                                    vca_id=vca_id,
                                ),
                                timeout=600,
                            )

                # the actual scale operation, common to both "delete" and
                # "create" paths
                await asyncio.wait_for(
                    self.k8scluster_map[k8s_cluster_type].scale(
                        kdu_instance,
                        scale,
                        kdu_scaling_info["resource-name"],
                        vca_id=vca_id,
                    ),
                    timeout=self.timeout_vca_on_error,
                )

                if kdu_scaling_info["type"] == "create":
                    kdu_config = get_configuration(db_vnfd, kdu_name)
                    # run initial primitives only when they exist and no
                    # Juju execution environment is in charge of them
                    if (
                        kdu_config
                        and kdu_config.get("initial-config-primitive")
                        and get_juju_ee_ref(db_vnfd, kdu_name) is None
                    ):
                        initial_config_primitive_list = kdu_config.get(
                            "initial-config-primitive"
                        )
                        # primitives are executed in ascending "seq" order
                        initial_config_primitive_list.sort(
                            key=lambda val: int(val["seq"])
                        )

                        for initial_config_primitive in initial_config_primitive_list:
                            primitive_params_ = self._map_primitive_params(
                                initial_config_primitive, {}, {}
                            )
                            step = "execute initial config primitive"
                            self.logger.debug(logging_text + step)
                            # fixed 10-minute cap per initial primitive
                            await asyncio.wait_for(
                                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                    cluster_uuid=cluster_uuid,
                                    kdu_instance=kdu_instance,
                                    primitive_name=initial_config_primitive["name"],
                                    params=primitive_params_,
                                    db_dict=db_dict,
                                    vca_id=vca_id,
                                ),
                                timeout=600,
                            )
6232
6233 async def _scale_ng_ro(
6234 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6235 ):
6236 nsr_id = db_nslcmop["nsInstanceId"]
6237 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6238 db_vnfrs = {}
6239
6240 # read from db: vnfd's for every vnf
6241 db_vnfds = []
6242
6243 # for each vnf in ns, read vnfd
6244 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6245 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6246 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6247 # if we haven't this vnfd, read it from db
6248 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6249 # read from db
6250 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6251 db_vnfds.append(vnfd)
6252 n2vc_key = self.n2vc.get_public_key()
6253 n2vc_key_list = [n2vc_key]
6254 self.scale_vnfr(
6255 db_vnfr,
6256 vdu_scaling_info.get("vdu-create"),
6257 vdu_scaling_info.get("vdu-delete"),
6258 mark_delete=True,
6259 )
6260 # db_vnfr has been updated, update db_vnfrs to use it
6261 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6262 await self._instantiate_ng_ro(
6263 logging_text,
6264 nsr_id,
6265 db_nsd,
6266 db_nsr,
6267 db_nslcmop,
6268 db_vnfrs,
6269 db_vnfds,
6270 n2vc_key_list,
6271 stage=stage,
6272 start_deploy=time(),
6273 timeout_ns_deploy=self.timeout_ns_deploy,
6274 )
6275 if vdu_scaling_info.get("vdu-delete"):
6276 self.scale_vnfr(
6277 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6278 )
6279
6280 async def add_prometheus_metrics(
6281 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6282 ):
6283 if not self.prometheus:
6284 return
6285 # look if exist a file called 'prometheus*.j2' and
6286 artifact_content = self.fs.dir_ls(artifact_path)
6287 job_file = next(
6288 (
6289 f
6290 for f in artifact_content
6291 if f.startswith("prometheus") and f.endswith(".j2")
6292 ),
6293 None,
6294 )
6295 if not job_file:
6296 return
6297 with self.fs.file_open((artifact_path, job_file), "r") as f:
6298 job_data = f.read()
6299
6300 # TODO get_service
6301 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6302 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6303 host_port = "80"
6304 vnfr_id = vnfr_id.replace("-", "")
6305 variables = {
6306 "JOB_NAME": vnfr_id,
6307 "TARGET_IP": target_ip,
6308 "EXPORTER_POD_IP": host_name,
6309 "EXPORTER_POD_PORT": host_port,
6310 }
6311 job_list = self.prometheus.parse_job(job_data, variables)
6312 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6313 for job in job_list:
6314 if (
6315 not isinstance(job.get("job_name"), str)
6316 or vnfr_id not in job["job_name"]
6317 ):
6318 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6319 job["nsr_id"] = nsr_id
6320 job_dict = {jl["job_name"]: jl for jl in job_list}
6321 if await self.prometheus.update(job_dict):
6322 return list(job_dict.keys())
6323
6324 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6325 """
6326 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6327
6328 :param: vim_account_id: VIM Account ID
6329
6330 :return: (cloud_name, cloud_credential)
6331 """
6332 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6333 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6334
6335 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6336 """
6337 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6338
6339 :param: vim_account_id: VIM Account ID
6340
6341 :return: (cloud_name, cloud_credential)
6342 """
6343 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6344 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")