ab25be3ea67ca913c6f5a972256789177ada7dcb
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 from typing import Any, Dict
21 import yaml
22 import logging
23 import logging.handlers
24 import traceback
25 import json
26 from jinja2 import (
27 Environment,
28 TemplateError,
29 TemplateNotFound,
30 StrictUndefined,
31 UndefinedError,
32 )
33
34 from osm_lcm import ROclient
35 from osm_lcm.data_utils.nsr import (
36 get_deployed_kdu,
37 get_deployed_vca,
38 get_deployed_vca_list,
39 get_nsd,
40 )
41 from osm_lcm.data_utils.vca import (
42 DeployedComponent,
43 DeployedK8sResource,
44 DeployedVCA,
45 EELevel,
46 Relation,
47 EERelation,
48 safe_get_ee_relation,
49 )
50 from osm_lcm.ng_ro import NgRoClient, NgRoException
51 from osm_lcm.lcm_utils import (
52 LcmException,
53 LcmExceptionNoMgmtIP,
54 LcmBase,
55 deep_get,
56 get_iterable,
57 populate_dict,
58 )
59 from osm_lcm.data_utils.nsd import (
60 get_ns_configuration_relation_list,
61 get_vnf_profile,
62 get_vnf_profiles,
63 )
64 from osm_lcm.data_utils.vnfd import (
65 get_relation_list,
66 get_vdu_list,
67 get_vdu_profile,
68 get_ee_sorted_initial_config_primitive_list,
69 get_ee_sorted_terminate_config_primitive_list,
70 get_kdu_list,
71 get_virtual_link_profiles,
72 get_vdu,
73 get_configuration,
74 get_vdu_index,
75 get_scaling_aspect,
76 get_number_of_instances,
77 get_juju_ee_ref,
78 get_kdu_resource_profile,
79 )
80 from osm_lcm.data_utils.list_utils import find_in_list
81 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
82 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
83 from osm_lcm.data_utils.database.vim_account import VimAccountDB
84 from n2vc.definitions import RelationEndpoint
85 from n2vc.k8s_helm_conn import K8sHelmConnector
86 from n2vc.k8s_helm3_conn import K8sHelm3Connector
87 from n2vc.k8s_juju_conn import K8sJujuConnector
88
89 from osm_common.dbbase import DbException
90 from osm_common.fsbase import FsException
91
92 from osm_lcm.data_utils.database.database import Database
93 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
94
95 from n2vc.n2vc_juju_conn import N2VCJujuConnector
96 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
97
98 from osm_lcm.lcm_helm_conn import LCMHelmConn
99
100 from copy import copy, deepcopy
101 from time import time
102 from uuid import uuid4
103
104 from random import randint
105
106 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
107
108
109 class NsLcm(LcmBase):
110 timeout_vca_on_error = (
111 5 * 60
112 ) # Time for charm from first time at blocked,error status to mark as failed
113 timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
114 timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
115 timeout_charm_delete = 10 * 60
116 timeout_primitive = 30 * 60 # timeout for primitive execution
117 timeout_progress_primitive = (
118 10 * 60
119 ) # timeout for some progress in a primitive execution
120
121 SUBOPERATION_STATUS_NOT_FOUND = -1
122 SUBOPERATION_STATUS_NEW = -2
123 SUBOPERATION_STATUS_SKIP = -3
124 task_name_deploy_vca = "Deploying VCA"
125
126 def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
127 """
128 Init, Connect to database, filesystem storage, and messaging
129 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
130 :return: None
131 """
132 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
133
134 self.db = Database().instance.db
135 self.fs = Filesystem().instance.fs
136 self.loop = loop
137 self.lcm_tasks = lcm_tasks
138 self.timeout = config["timeout"]
139 self.ro_config = config["ro_config"]
140 self.ng_ro = config["ro_config"].get("ng")
141 self.vca_config = config["VCA"].copy()
142
143 # create N2VC connector
144 self.n2vc = N2VCJujuConnector(
145 log=self.logger,
146 loop=self.loop,
147 on_update_db=self._on_update_n2vc_db,
148 fs=self.fs,
149 db=self.db,
150 )
151
152 self.conn_helm_ee = LCMHelmConn(
153 log=self.logger,
154 loop=self.loop,
155 vca_config=self.vca_config,
156 on_update_db=self._on_update_n2vc_db,
157 )
158
159 self.k8sclusterhelm2 = K8sHelmConnector(
160 kubectl_command=self.vca_config.get("kubectlpath"),
161 helm_command=self.vca_config.get("helmpath"),
162 log=self.logger,
163 on_update_db=None,
164 fs=self.fs,
165 db=self.db,
166 )
167
168 self.k8sclusterhelm3 = K8sHelm3Connector(
169 kubectl_command=self.vca_config.get("kubectlpath"),
170 helm_command=self.vca_config.get("helm3path"),
171 fs=self.fs,
172 log=self.logger,
173 db=self.db,
174 on_update_db=None,
175 )
176
177 self.k8sclusterjuju = K8sJujuConnector(
178 kubectl_command=self.vca_config.get("kubectlpath"),
179 juju_command=self.vca_config.get("jujupath"),
180 log=self.logger,
181 loop=self.loop,
182 on_update_db=self._on_update_k8s_db,
183 fs=self.fs,
184 db=self.db,
185 )
186
187 self.k8scluster_map = {
188 "helm-chart": self.k8sclusterhelm2,
189 "helm-chart-v3": self.k8sclusterhelm3,
190 "chart": self.k8sclusterhelm3,
191 "juju-bundle": self.k8sclusterjuju,
192 "juju": self.k8sclusterjuju,
193 }
194
195 self.vca_map = {
196 "lxc_proxy_charm": self.n2vc,
197 "native_charm": self.n2vc,
198 "k8s_proxy_charm": self.n2vc,
199 "helm": self.conn_helm_ee,
200 "helm-v3": self.conn_helm_ee,
201 }
202
203 self.prometheus = prometheus
204
205 # create RO client
206 self.RO = NgRoClient(self.loop, **self.ro_config)
207
208 @staticmethod
209 def increment_ip_mac(ip_mac, vm_index=1):
210 if not isinstance(ip_mac, str):
211 return ip_mac
212 try:
213 # try with ipv4 look for last dot
214 i = ip_mac.rfind(".")
215 if i > 0:
216 i += 1
217 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
218 # try with ipv6 or mac look for last colon. Operate in hex
219 i = ip_mac.rfind(":")
220 if i > 0:
221 i += 1
222 # format in hex, len can be 2 for mac or 4 for ipv6
223 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
224 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
225 )
226 except Exception:
227 pass
228 return None
229
230 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
231
232 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
233
234 try:
235 # TODO filter RO descriptor fields...
236
237 # write to database
238 db_dict = dict()
239 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
240 db_dict["deploymentStatus"] = ro_descriptor
241 self.update_db_2("nsrs", nsrs_id, db_dict)
242
243 except Exception as e:
244 self.logger.warn(
245 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
246 )
247
248 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
249
250 # remove last dot from path (if exists)
251 if path.endswith("."):
252 path = path[:-1]
253
254 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
255 # .format(table, filter, path, updated_data))
256 try:
257
258 nsr_id = filter.get("_id")
259
260 # read ns record from database
261 nsr = self.db.get_one(table="nsrs", q_filter=filter)
262 current_ns_status = nsr.get("nsState")
263
264 # get vca status for NS
265 status_dict = await self.n2vc.get_status(
266 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
267 )
268
269 # vcaStatus
270 db_dict = dict()
271 db_dict["vcaStatus"] = status_dict
272 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
273
274 # update configurationStatus for this VCA
275 try:
276 vca_index = int(path[path.rfind(".") + 1 :])
277
278 vca_list = deep_get(
279 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
280 )
281 vca_status = vca_list[vca_index].get("status")
282
283 configuration_status_list = nsr.get("configurationStatus")
284 config_status = configuration_status_list[vca_index].get("status")
285
286 if config_status == "BROKEN" and vca_status != "failed":
287 db_dict["configurationStatus"][vca_index] = "READY"
288 elif config_status != "BROKEN" and vca_status == "failed":
289 db_dict["configurationStatus"][vca_index] = "BROKEN"
290 except Exception as e:
291 # not update configurationStatus
292 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
293
294 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
295 # if nsState = 'DEGRADED' check if all is OK
296 is_degraded = False
297 if current_ns_status in ("READY", "DEGRADED"):
298 error_description = ""
299 # check machines
300 if status_dict.get("machines"):
301 for machine_id in status_dict.get("machines"):
302 machine = status_dict.get("machines").get(machine_id)
303 # check machine agent-status
304 if machine.get("agent-status"):
305 s = machine.get("agent-status").get("status")
306 if s != "started":
307 is_degraded = True
308 error_description += (
309 "machine {} agent-status={} ; ".format(
310 machine_id, s
311 )
312 )
313 # check machine instance status
314 if machine.get("instance-status"):
315 s = machine.get("instance-status").get("status")
316 if s != "running":
317 is_degraded = True
318 error_description += (
319 "machine {} instance-status={} ; ".format(
320 machine_id, s
321 )
322 )
323 # check applications
324 if status_dict.get("applications"):
325 for app_id in status_dict.get("applications"):
326 app = status_dict.get("applications").get(app_id)
327 # check application status
328 if app.get("status"):
329 s = app.get("status").get("status")
330 if s != "active":
331 is_degraded = True
332 error_description += (
333 "application {} status={} ; ".format(app_id, s)
334 )
335
336 if error_description:
337 db_dict["errorDescription"] = error_description
338 if current_ns_status == "READY" and is_degraded:
339 db_dict["nsState"] = "DEGRADED"
340 if current_ns_status == "DEGRADED" and not is_degraded:
341 db_dict["nsState"] = "READY"
342
343 # write to database
344 self.update_db_2("nsrs", nsr_id, db_dict)
345
346 except (asyncio.CancelledError, asyncio.TimeoutError):
347 raise
348 except Exception as e:
349 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
350
351 async def _on_update_k8s_db(
352 self, cluster_uuid, kdu_instance, filter=None, vca_id=None
353 ):
354 """
355 Updating vca status in NSR record
356 :param cluster_uuid: UUID of a k8s cluster
357 :param kdu_instance: The unique name of the KDU instance
358 :param filter: To get nsr_id
359 :return: none
360 """
361
362 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
363 # .format(cluster_uuid, kdu_instance, filter))
364
365 try:
366 nsr_id = filter.get("_id")
367
368 # get vca status for NS
369 vca_status = await self.k8sclusterjuju.status_kdu(
370 cluster_uuid,
371 kdu_instance,
372 complete_status=True,
373 yaml_format=False,
374 vca_id=vca_id,
375 )
376 # vcaStatus
377 db_dict = dict()
378 db_dict["vcaStatus"] = {nsr_id: vca_status}
379
380 await self.k8sclusterjuju.update_vca_status(
381 db_dict["vcaStatus"],
382 kdu_instance,
383 vca_id=vca_id,
384 )
385
386 # write to database
387 self.update_db_2("nsrs", nsr_id, db_dict)
388
389 except (asyncio.CancelledError, asyncio.TimeoutError):
390 raise
391 except Exception as e:
392 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
393
394 @staticmethod
395 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
396 try:
397 env = Environment(undefined=StrictUndefined)
398 template = env.from_string(cloud_init_text)
399 return template.render(additional_params or {})
400 except UndefinedError as e:
401 raise LcmException(
402 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
403 "file, must be provided in the instantiation parameters inside the "
404 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
405 )
406 except (TemplateError, TemplateNotFound) as e:
407 raise LcmException(
408 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
409 vnfd_id, vdu_id, e
410 )
411 )
412
413 def _get_vdu_cloud_init_content(self, vdu, vnfd):
414 cloud_init_content = cloud_init_file = None
415 try:
416 if vdu.get("cloud-init-file"):
417 base_folder = vnfd["_admin"]["storage"]
418 cloud_init_file = "{}/{}/cloud_init/{}".format(
419 base_folder["folder"],
420 base_folder["pkg-dir"],
421 vdu["cloud-init-file"],
422 )
423 with self.fs.file_open(cloud_init_file, "r") as ci_file:
424 cloud_init_content = ci_file.read()
425 elif vdu.get("cloud-init"):
426 cloud_init_content = vdu["cloud-init"]
427
428 return cloud_init_content
429 except FsException as e:
430 raise LcmException(
431 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
432 vnfd["id"], vdu["id"], cloud_init_file, e
433 )
434 )
435
436 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
437 vdur = next(
438 vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]
439 )
440 additional_params = vdur.get("additionalParams")
441 return parse_yaml_strings(additional_params)
442
443 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
444 """
445 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
446 :param vnfd: input vnfd
447 :param new_id: overrides vnf id if provided
448 :param additionalParams: Instantiation params for VNFs provided
449 :param nsrId: Id of the NSR
450 :return: copy of vnfd
451 """
452 vnfd_RO = deepcopy(vnfd)
453 # remove unused by RO configuration, monitoring, scaling and internal keys
454 vnfd_RO.pop("_id", None)
455 vnfd_RO.pop("_admin", None)
456 vnfd_RO.pop("monitoring-param", None)
457 vnfd_RO.pop("scaling-group-descriptor", None)
458 vnfd_RO.pop("kdu", None)
459 vnfd_RO.pop("k8s-cluster", None)
460 if new_id:
461 vnfd_RO["id"] = new_id
462
463 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
464 for vdu in get_iterable(vnfd_RO, "vdu"):
465 vdu.pop("cloud-init-file", None)
466 vdu.pop("cloud-init", None)
467 return vnfd_RO
468
469 @staticmethod
470 def ip_profile_2_RO(ip_profile):
471 RO_ip_profile = deepcopy(ip_profile)
472 if "dns-server" in RO_ip_profile:
473 if isinstance(RO_ip_profile["dns-server"], list):
474 RO_ip_profile["dns-address"] = []
475 for ds in RO_ip_profile.pop("dns-server"):
476 RO_ip_profile["dns-address"].append(ds["address"])
477 else:
478 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
479 if RO_ip_profile.get("ip-version") == "ipv4":
480 RO_ip_profile["ip-version"] = "IPv4"
481 if RO_ip_profile.get("ip-version") == "ipv6":
482 RO_ip_profile["ip-version"] = "IPv6"
483 if "dhcp-params" in RO_ip_profile:
484 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
485 return RO_ip_profile
486
487 def _get_ro_vim_id_for_vim_account(self, vim_account):
488 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
489 if db_vim["_admin"]["operationalState"] != "ENABLED":
490 raise LcmException(
491 "VIM={} is not available. operationalState={}".format(
492 vim_account, db_vim["_admin"]["operationalState"]
493 )
494 )
495 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
496 return RO_vim_id
497
498 def get_ro_wim_id_for_wim_account(self, wim_account):
499 if isinstance(wim_account, str):
500 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
501 if db_wim["_admin"]["operationalState"] != "ENABLED":
502 raise LcmException(
503 "WIM={} is not available. operationalState={}".format(
504 wim_account, db_wim["_admin"]["operationalState"]
505 )
506 )
507 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
508 return RO_wim_id
509 else:
510 return wim_account
511
512 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
513
514 db_vdu_push_list = []
515 db_update = {"_admin.modified": time()}
516 if vdu_create:
517 for vdu_id, vdu_count in vdu_create.items():
518 vdur = next(
519 (
520 vdur
521 for vdur in reversed(db_vnfr["vdur"])
522 if vdur["vdu-id-ref"] == vdu_id
523 ),
524 None,
525 )
526 if not vdur:
527 raise LcmException(
528 "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
529 vdu_id
530 )
531 )
532
533 for count in range(vdu_count):
534 vdur_copy = deepcopy(vdur)
535 vdur_copy["status"] = "BUILD"
536 vdur_copy["status-detailed"] = None
537 vdur_copy["ip-address"]: None
538 vdur_copy["_id"] = str(uuid4())
539 vdur_copy["count-index"] += count + 1
540 vdur_copy["id"] = "{}-{}".format(
541 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
542 )
543 vdur_copy.pop("vim_info", None)
544 for iface in vdur_copy["interfaces"]:
545 if iface.get("fixed-ip"):
546 iface["ip-address"] = self.increment_ip_mac(
547 iface["ip-address"], count + 1
548 )
549 else:
550 iface.pop("ip-address", None)
551 if iface.get("fixed-mac"):
552 iface["mac-address"] = self.increment_ip_mac(
553 iface["mac-address"], count + 1
554 )
555 else:
556 iface.pop("mac-address", None)
557 iface.pop(
558 "mgmt_vnf", None
559 ) # only first vdu can be managment of vnf
560 db_vdu_push_list.append(vdur_copy)
561 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
562 if vdu_delete:
563 for vdu_id, vdu_count in vdu_delete.items():
564 if mark_delete:
565 indexes_to_delete = [
566 iv[0]
567 for iv in enumerate(db_vnfr["vdur"])
568 if iv[1]["vdu-id-ref"] == vdu_id
569 ]
570 db_update.update(
571 {
572 "vdur.{}.status".format(i): "DELETING"
573 for i in indexes_to_delete[-vdu_count:]
574 }
575 )
576 else:
577 # it must be deleted one by one because common.db does not allow otherwise
578 vdus_to_delete = [
579 v
580 for v in reversed(db_vnfr["vdur"])
581 if v["vdu-id-ref"] == vdu_id
582 ]
583 for vdu in vdus_to_delete[:vdu_count]:
584 self.db.set_one(
585 "vnfrs",
586 {"_id": db_vnfr["_id"]},
587 None,
588 pull={"vdur": {"_id": vdu["_id"]}},
589 )
590 db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
591 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
592 # modify passed dictionary db_vnfr
593 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
594 db_vnfr["vdur"] = db_vnfr_["vdur"]
595
596 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
597 """
598 Updates database nsr with the RO info for the created vld
599 :param ns_update_nsr: dictionary to be filled with the updated info
600 :param db_nsr: content of db_nsr. This is also modified
601 :param nsr_desc_RO: nsr descriptor from RO
602 :return: Nothing, LcmException is raised on errors
603 """
604
605 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
606 for net_RO in get_iterable(nsr_desc_RO, "nets"):
607 if vld["id"] != net_RO.get("ns_net_osm_id"):
608 continue
609 vld["vim-id"] = net_RO.get("vim_net_id")
610 vld["name"] = net_RO.get("vim_name")
611 vld["status"] = net_RO.get("status")
612 vld["status-detailed"] = net_RO.get("error_msg")
613 ns_update_nsr["vld.{}".format(vld_index)] = vld
614 break
615 else:
616 raise LcmException(
617 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
618 )
619
620 def set_vnfr_at_error(self, db_vnfrs, error_text):
621 try:
622 for db_vnfr in db_vnfrs.values():
623 vnfr_update = {"status": "ERROR"}
624 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
625 if "status" not in vdur:
626 vdur["status"] = "ERROR"
627 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
628 if error_text:
629 vdur["status-detailed"] = str(error_text)
630 vnfr_update[
631 "vdur.{}.status-detailed".format(vdu_index)
632 ] = "ERROR"
633 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
634 except DbException as e:
635 self.logger.error("Cannot update vnf. {}".format(e))
636
637 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
638 """
639 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
640 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
641 :param nsr_desc_RO: nsr descriptor from RO
642 :return: Nothing, LcmException is raised on errors
643 """
644 for vnf_index, db_vnfr in db_vnfrs.items():
645 for vnf_RO in nsr_desc_RO["vnfs"]:
646 if vnf_RO["member_vnf_index"] != vnf_index:
647 continue
648 vnfr_update = {}
649 if vnf_RO.get("ip_address"):
650 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
651 "ip_address"
652 ].split(";")[0]
653 elif not db_vnfr.get("ip-address"):
654 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
655 raise LcmExceptionNoMgmtIP(
656 "ns member_vnf_index '{}' has no IP address".format(
657 vnf_index
658 )
659 )
660
661 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
662 vdur_RO_count_index = 0
663 if vdur.get("pdu-type"):
664 continue
665 for vdur_RO in get_iterable(vnf_RO, "vms"):
666 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
667 continue
668 if vdur["count-index"] != vdur_RO_count_index:
669 vdur_RO_count_index += 1
670 continue
671 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
672 if vdur_RO.get("ip_address"):
673 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
674 else:
675 vdur["ip-address"] = None
676 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
677 vdur["name"] = vdur_RO.get("vim_name")
678 vdur["status"] = vdur_RO.get("status")
679 vdur["status-detailed"] = vdur_RO.get("error_msg")
680 for ifacer in get_iterable(vdur, "interfaces"):
681 for interface_RO in get_iterable(vdur_RO, "interfaces"):
682 if ifacer["name"] == interface_RO.get("internal_name"):
683 ifacer["ip-address"] = interface_RO.get(
684 "ip_address"
685 )
686 ifacer["mac-address"] = interface_RO.get(
687 "mac_address"
688 )
689 break
690 else:
691 raise LcmException(
692 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
693 "from VIM info".format(
694 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
695 )
696 )
697 vnfr_update["vdur.{}".format(vdu_index)] = vdur
698 break
699 else:
700 raise LcmException(
701 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
702 "VIM info".format(
703 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
704 )
705 )
706
707 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
708 for net_RO in get_iterable(nsr_desc_RO, "nets"):
709 if vld["id"] != net_RO.get("vnf_net_osm_id"):
710 continue
711 vld["vim-id"] = net_RO.get("vim_net_id")
712 vld["name"] = net_RO.get("vim_name")
713 vld["status"] = net_RO.get("status")
714 vld["status-detailed"] = net_RO.get("error_msg")
715 vnfr_update["vld.{}".format(vld_index)] = vld
716 break
717 else:
718 raise LcmException(
719 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
720 vnf_index, vld["id"]
721 )
722 )
723
724 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
725 break
726
727 else:
728 raise LcmException(
729 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
730 vnf_index
731 )
732 )
733
734 def _get_ns_config_info(self, nsr_id):
735 """
736 Generates a mapping between vnf,vdu elements and the N2VC id
737 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
738 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
739 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
740 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
741 """
742 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
743 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
744 mapping = {}
745 ns_config_info = {"osm-config-mapping": mapping}
746 for vca in vca_deployed_list:
747 if not vca["member-vnf-index"]:
748 continue
749 if not vca["vdu_id"]:
750 mapping[vca["member-vnf-index"]] = vca["application"]
751 else:
752 mapping[
753 "{}.{}.{}".format(
754 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
755 )
756 ] = vca["application"]
757 return ns_config_info
758
759 async def _instantiate_ng_ro(
760 self,
761 logging_text,
762 nsr_id,
763 nsd,
764 db_nsr,
765 db_nslcmop,
766 db_vnfrs,
767 db_vnfds,
768 n2vc_key_list,
769 stage,
770 start_deploy,
771 timeout_ns_deploy,
772 ):
773
774 db_vims = {}
775
776 def get_vim_account(vim_account_id):
777 nonlocal db_vims
778 if vim_account_id in db_vims:
779 return db_vims[vim_account_id]
780 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
781 db_vims[vim_account_id] = db_vim
782 return db_vim
783
784 # modify target_vld info with instantiation parameters
785 def parse_vld_instantiation_params(
786 target_vim, target_vld, vld_params, target_sdn
787 ):
788 if vld_params.get("ip-profile"):
789 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
790 "ip-profile"
791 ]
792 if vld_params.get("provider-network"):
793 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
794 "provider-network"
795 ]
796 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
797 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
798 "provider-network"
799 ]["sdn-ports"]
800 if vld_params.get("wimAccountId"):
801 target_wim = "wim:{}".format(vld_params["wimAccountId"])
802 target_vld["vim_info"][target_wim] = {}
803 for param in ("vim-network-name", "vim-network-id"):
804 if vld_params.get(param):
805 if isinstance(vld_params[param], dict):
806 for vim, vim_net in vld_params[param].items():
807 other_target_vim = "vim:" + vim
808 populate_dict(
809 target_vld["vim_info"],
810 (other_target_vim, param.replace("-", "_")),
811 vim_net,
812 )
813 else: # isinstance str
814 target_vld["vim_info"][target_vim][
815 param.replace("-", "_")
816 ] = vld_params[param]
817 if vld_params.get("common_id"):
818 target_vld["common_id"] = vld_params.get("common_id")
819
820 nslcmop_id = db_nslcmop["_id"]
821 target = {
822 "name": db_nsr["name"],
823 "ns": {"vld": []},
824 "vnf": [],
825 "image": deepcopy(db_nsr["image"]),
826 "flavor": deepcopy(db_nsr["flavor"]),
827 "action_id": nslcmop_id,
828 "cloud_init_content": {},
829 }
830 for image in target["image"]:
831 image["vim_info"] = {}
832 for flavor in target["flavor"]:
833 flavor["vim_info"] = {}
834
835 if db_nslcmop.get("lcmOperationType") != "instantiate":
836 # get parameters of instantiation:
837 db_nslcmop_instantiate = self.db.get_list(
838 "nslcmops",
839 {
840 "nsInstanceId": db_nslcmop["nsInstanceId"],
841 "lcmOperationType": "instantiate",
842 },
843 )[-1]
844 ns_params = db_nslcmop_instantiate.get("operationParams")
845 else:
846 ns_params = db_nslcmop.get("operationParams")
847 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
848 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
849
850 cp2target = {}
851 for vld_index, vld in enumerate(db_nsr.get("vld")):
852 target_vim = "vim:{}".format(ns_params["vimAccountId"])
853 target_vld = {
854 "id": vld["id"],
855 "name": vld["name"],
856 "mgmt-network": vld.get("mgmt-network", False),
857 "type": vld.get("type"),
858 "vim_info": {
859 target_vim: {
860 "vim_network_name": vld.get("vim-network-name"),
861 "vim_account_id": ns_params["vimAccountId"],
862 }
863 },
864 }
865 # check if this network needs SDN assist
866 if vld.get("pci-interfaces"):
867 db_vim = get_vim_account(ns_params["vimAccountId"])
868 sdnc_id = db_vim["config"].get("sdn-controller")
869 if sdnc_id:
870 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
871 target_sdn = "sdn:{}".format(sdnc_id)
872 target_vld["vim_info"][target_sdn] = {
873 "sdn": True,
874 "target_vim": target_vim,
875 "vlds": [sdn_vld],
876 "type": vld.get("type"),
877 }
878
879 nsd_vnf_profiles = get_vnf_profiles(nsd)
880 for nsd_vnf_profile in nsd_vnf_profiles:
881 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
882 if cp["virtual-link-profile-id"] == vld["id"]:
883 cp2target[
884 "member_vnf:{}.{}".format(
885 cp["constituent-cpd-id"][0][
886 "constituent-base-element-id"
887 ],
888 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
889 )
890 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
891
892 # check at nsd descriptor, if there is an ip-profile
893 vld_params = {}
894 nsd_vlp = find_in_list(
895 get_virtual_link_profiles(nsd),
896 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
897 == vld["id"],
898 )
899 if (
900 nsd_vlp
901 and nsd_vlp.get("virtual-link-protocol-data")
902 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
903 ):
904 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
905 "l3-protocol-data"
906 ]
907 ip_profile_dest_data = {}
908 if "ip-version" in ip_profile_source_data:
909 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
910 "ip-version"
911 ]
912 if "cidr" in ip_profile_source_data:
913 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
914 "cidr"
915 ]
916 if "gateway-ip" in ip_profile_source_data:
917 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
918 "gateway-ip"
919 ]
920 if "dhcp-enabled" in ip_profile_source_data:
921 ip_profile_dest_data["dhcp-params"] = {
922 "enabled": ip_profile_source_data["dhcp-enabled"]
923 }
924 vld_params["ip-profile"] = ip_profile_dest_data
925
926 # update vld_params with instantiation params
927 vld_instantiation_params = find_in_list(
928 get_iterable(ns_params, "vld"),
929 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
930 )
931 if vld_instantiation_params:
932 vld_params.update(vld_instantiation_params)
933 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
934 target["ns"]["vld"].append(target_vld)
935
936 for vnfr in db_vnfrs.values():
937 vnfd = find_in_list(
938 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
939 )
940 vnf_params = find_in_list(
941 get_iterable(ns_params, "vnf"),
942 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
943 )
944 target_vnf = deepcopy(vnfr)
945 target_vim = "vim:{}".format(vnfr["vim-account-id"])
946 for vld in target_vnf.get("vld", ()):
947 # check if connected to a ns.vld, to fill target'
948 vnf_cp = find_in_list(
949 vnfd.get("int-virtual-link-desc", ()),
950 lambda cpd: cpd.get("id") == vld["id"],
951 )
952 if vnf_cp:
953 ns_cp = "member_vnf:{}.{}".format(
954 vnfr["member-vnf-index-ref"], vnf_cp["id"]
955 )
956 if cp2target.get(ns_cp):
957 vld["target"] = cp2target[ns_cp]
958
959 vld["vim_info"] = {
960 target_vim: {"vim_network_name": vld.get("vim-network-name")}
961 }
962 # check if this network needs SDN assist
963 target_sdn = None
964 if vld.get("pci-interfaces"):
965 db_vim = get_vim_account(vnfr["vim-account-id"])
966 sdnc_id = db_vim["config"].get("sdn-controller")
967 if sdnc_id:
968 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
969 target_sdn = "sdn:{}".format(sdnc_id)
970 vld["vim_info"][target_sdn] = {
971 "sdn": True,
972 "target_vim": target_vim,
973 "vlds": [sdn_vld],
974 "type": vld.get("type"),
975 }
976
977 # check at vnfd descriptor, if there is an ip-profile
978 vld_params = {}
979 vnfd_vlp = find_in_list(
980 get_virtual_link_profiles(vnfd),
981 lambda a_link_profile: a_link_profile["id"] == vld["id"],
982 )
983 if (
984 vnfd_vlp
985 and vnfd_vlp.get("virtual-link-protocol-data")
986 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
987 ):
988 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
989 "l3-protocol-data"
990 ]
991 ip_profile_dest_data = {}
992 if "ip-version" in ip_profile_source_data:
993 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
994 "ip-version"
995 ]
996 if "cidr" in ip_profile_source_data:
997 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
998 "cidr"
999 ]
1000 if "gateway-ip" in ip_profile_source_data:
1001 ip_profile_dest_data[
1002 "gateway-address"
1003 ] = ip_profile_source_data["gateway-ip"]
1004 if "dhcp-enabled" in ip_profile_source_data:
1005 ip_profile_dest_data["dhcp-params"] = {
1006 "enabled": ip_profile_source_data["dhcp-enabled"]
1007 }
1008
1009 vld_params["ip-profile"] = ip_profile_dest_data
1010 # update vld_params with instantiation params
1011 if vnf_params:
1012 vld_instantiation_params = find_in_list(
1013 get_iterable(vnf_params, "internal-vld"),
1014 lambda i_vld: i_vld["name"] == vld["id"],
1015 )
1016 if vld_instantiation_params:
1017 vld_params.update(vld_instantiation_params)
1018 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1019
1020 vdur_list = []
1021 for vdur in target_vnf.get("vdur", ()):
1022 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1023 continue # This vdu must not be created
1024 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1025
1026 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1027
1028 if ssh_keys_all:
1029 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1030 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1031 if (
1032 vdu_configuration
1033 and vdu_configuration.get("config-access")
1034 and vdu_configuration.get("config-access").get("ssh-access")
1035 ):
1036 vdur["ssh-keys"] = ssh_keys_all
1037 vdur["ssh-access-required"] = vdu_configuration[
1038 "config-access"
1039 ]["ssh-access"]["required"]
1040 elif (
1041 vnf_configuration
1042 and vnf_configuration.get("config-access")
1043 and vnf_configuration.get("config-access").get("ssh-access")
1044 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1045 ):
1046 vdur["ssh-keys"] = ssh_keys_all
1047 vdur["ssh-access-required"] = vnf_configuration[
1048 "config-access"
1049 ]["ssh-access"]["required"]
1050 elif ssh_keys_instantiation and find_in_list(
1051 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1052 ):
1053 vdur["ssh-keys"] = ssh_keys_instantiation
1054
1055 self.logger.debug("NS > vdur > {}".format(vdur))
1056
1057 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1058 # cloud-init
1059 if vdud.get("cloud-init-file"):
1060 vdur["cloud-init"] = "{}:file:{}".format(
1061 vnfd["_id"], vdud.get("cloud-init-file")
1062 )
1063 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1064 if vdur["cloud-init"] not in target["cloud_init_content"]:
1065 base_folder = vnfd["_admin"]["storage"]
1066 cloud_init_file = "{}/{}/cloud_init/{}".format(
1067 base_folder["folder"],
1068 base_folder["pkg-dir"],
1069 vdud.get("cloud-init-file"),
1070 )
1071 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1072 target["cloud_init_content"][
1073 vdur["cloud-init"]
1074 ] = ci_file.read()
1075 elif vdud.get("cloud-init"):
1076 vdur["cloud-init"] = "{}:vdu:{}".format(
1077 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1078 )
1079 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1080 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1081 "cloud-init"
1082 ]
1083 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1084 deploy_params_vdu = self._format_additional_params(
1085 vdur.get("additionalParams") or {}
1086 )
1087 deploy_params_vdu["OSM"] = get_osm_params(
1088 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1089 )
1090 vdur["additionalParams"] = deploy_params_vdu
1091
1092 # flavor
1093 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1094 if target_vim not in ns_flavor["vim_info"]:
1095 ns_flavor["vim_info"][target_vim] = {}
1096
1097 # deal with images
1098 # in case alternative images are provided we must check if they should be applied
1099 # for the vim_type, modify the vim_type taking into account
1100 ns_image_id = int(vdur["ns-image-id"])
1101 if vdur.get("alt-image-ids"):
1102 db_vim = get_vim_account(vnfr["vim-account-id"])
1103 vim_type = db_vim["vim_type"]
1104 for alt_image_id in vdur.get("alt-image-ids"):
1105 ns_alt_image = target["image"][int(alt_image_id)]
1106 if vim_type == ns_alt_image.get("vim-type"):
1107 # must use alternative image
1108 self.logger.debug(
1109 "use alternative image id: {}".format(alt_image_id)
1110 )
1111 ns_image_id = alt_image_id
1112 vdur["ns-image-id"] = ns_image_id
1113 break
1114 ns_image = target["image"][int(ns_image_id)]
1115 if target_vim not in ns_image["vim_info"]:
1116 ns_image["vim_info"][target_vim] = {}
1117
1118 vdur["vim_info"] = {target_vim: {}}
1119 # instantiation parameters
1120 # if vnf_params:
1121 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1122 # vdud["id"]), None)
1123 vdur_list.append(vdur)
1124 target_vnf["vdur"] = vdur_list
1125 target["vnf"].append(target_vnf)
1126
1127 desc = await self.RO.deploy(nsr_id, target)
1128 self.logger.debug("RO return > {}".format(desc))
1129 action_id = desc["action_id"]
1130 await self._wait_ng_ro(
1131 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1132 )
1133
1134 # Updating NSR
1135 db_nsr_update = {
1136 "_admin.deployed.RO.operational-status": "running",
1137 "detailed-status": " ".join(stage),
1138 }
1139 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1140 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1141 self._write_op_status(nslcmop_id, stage)
1142 self.logger.debug(
1143 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1144 )
1145 return
1146
1147 async def _wait_ng_ro(
1148 self,
1149 nsr_id,
1150 action_id,
1151 nslcmop_id=None,
1152 start_time=None,
1153 timeout=600,
1154 stage=None,
1155 ):
1156 detailed_status_old = None
1157 db_nsr_update = {}
1158 start_time = start_time or time()
1159 while time() <= start_time + timeout:
1160 desc_status = await self.RO.status(nsr_id, action_id)
1161 self.logger.debug("Wait NG RO > {}".format(desc_status))
1162 if desc_status["status"] == "FAILED":
1163 raise NgRoException(desc_status["details"])
1164 elif desc_status["status"] == "BUILD":
1165 if stage:
1166 stage[2] = "VIM: ({})".format(desc_status["details"])
1167 elif desc_status["status"] == "DONE":
1168 if stage:
1169 stage[2] = "Deployed at VIM"
1170 break
1171 else:
1172 assert False, "ROclient.check_ns_status returns unknown {}".format(
1173 desc_status["status"]
1174 )
1175 if stage and nslcmop_id and stage[2] != detailed_status_old:
1176 detailed_status_old = stage[2]
1177 db_nsr_update["detailed-status"] = " ".join(stage)
1178 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1179 self._write_op_status(nslcmop_id, stage)
1180 await asyncio.sleep(15, loop=self.loop)
1181 else: # timeout_ns_deploy
1182 raise NgRoException("Timeout waiting ns to deploy")
1183
1184 async def _terminate_ng_ro(
1185 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1186 ):
1187 db_nsr_update = {}
1188 failed_detail = []
1189 action_id = None
1190 start_deploy = time()
1191 try:
1192 target = {
1193 "ns": {"vld": []},
1194 "vnf": [],
1195 "image": [],
1196 "flavor": [],
1197 "action_id": nslcmop_id,
1198 }
1199 desc = await self.RO.deploy(nsr_id, target)
1200 action_id = desc["action_id"]
1201 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1202 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1203 self.logger.debug(
1204 logging_text
1205 + "ns terminate action at RO. action_id={}".format(action_id)
1206 )
1207
1208 # wait until done
1209 delete_timeout = 20 * 60 # 20 minutes
1210 await self._wait_ng_ro(
1211 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1212 )
1213
1214 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1215 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1216 # delete all nsr
1217 await self.RO.delete(nsr_id)
1218 except Exception as e:
1219 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1220 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1221 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1222 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1223 self.logger.debug(
1224 logging_text + "RO_action_id={} already deleted".format(action_id)
1225 )
1226 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1227 failed_detail.append("delete conflict: {}".format(e))
1228 self.logger.debug(
1229 logging_text
1230 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1231 )
1232 else:
1233 failed_detail.append("delete error: {}".format(e))
1234 self.logger.error(
1235 logging_text
1236 + "RO_action_id={} delete error: {}".format(action_id, e)
1237 )
1238
1239 if failed_detail:
1240 stage[2] = "Error deleting from VIM"
1241 else:
1242 stage[2] = "Deleted from VIM"
1243 db_nsr_update["detailed-status"] = " ".join(stage)
1244 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1245 self._write_op_status(nslcmop_id, stage)
1246
1247 if failed_detail:
1248 raise LcmException("; ".join(failed_detail))
1249 return
1250
1251 async def instantiate_RO(
1252 self,
1253 logging_text,
1254 nsr_id,
1255 nsd,
1256 db_nsr,
1257 db_nslcmop,
1258 db_vnfrs,
1259 db_vnfds,
1260 n2vc_key_list,
1261 stage,
1262 ):
1263 """
1264 Instantiate at RO
1265 :param logging_text: preffix text to use at logging
1266 :param nsr_id: nsr identity
1267 :param nsd: database content of ns descriptor
1268 :param db_nsr: database content of ns record
1269 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1270 :param db_vnfrs:
1271 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1272 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1273 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1274 :return: None or exception
1275 """
1276 try:
1277 start_deploy = time()
1278 ns_params = db_nslcmop.get("operationParams")
1279 if ns_params and ns_params.get("timeout_ns_deploy"):
1280 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1281 else:
1282 timeout_ns_deploy = self.timeout.get(
1283 "ns_deploy", self.timeout_ns_deploy
1284 )
1285
1286 # Check for and optionally request placement optimization. Database will be updated if placement activated
1287 stage[2] = "Waiting for Placement."
1288 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1289 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1290 for vnfr in db_vnfrs.values():
1291 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1292 break
1293 else:
1294 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1295
1296 return await self._instantiate_ng_ro(
1297 logging_text,
1298 nsr_id,
1299 nsd,
1300 db_nsr,
1301 db_nslcmop,
1302 db_vnfrs,
1303 db_vnfds,
1304 n2vc_key_list,
1305 stage,
1306 start_deploy,
1307 timeout_ns_deploy,
1308 )
1309 except Exception as e:
1310 stage[2] = "ERROR deploying at VIM"
1311 self.set_vnfr_at_error(db_vnfrs, str(e))
1312 self.logger.error(
1313 "Error deploying at VIM {}".format(e),
1314 exc_info=not isinstance(
1315 e,
1316 (
1317 ROclient.ROClientException,
1318 LcmException,
1319 DbException,
1320 NgRoException,
1321 ),
1322 ),
1323 )
1324 raise
1325
1326 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1327 """
1328 Wait for kdu to be up, get ip address
1329 :param logging_text: prefix use for logging
1330 :param nsr_id:
1331 :param vnfr_id:
1332 :param kdu_name:
1333 :return: IP address
1334 """
1335
1336 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1337 nb_tries = 0
1338
1339 while nb_tries < 360:
1340 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1341 kdur = next(
1342 (
1343 x
1344 for x in get_iterable(db_vnfr, "kdur")
1345 if x.get("kdu-name") == kdu_name
1346 ),
1347 None,
1348 )
1349 if not kdur:
1350 raise LcmException(
1351 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1352 )
1353 if kdur.get("status"):
1354 if kdur["status"] in ("READY", "ENABLED"):
1355 return kdur.get("ip-address")
1356 else:
1357 raise LcmException(
1358 "target KDU={} is in error state".format(kdu_name)
1359 )
1360
1361 await asyncio.sleep(10, loop=self.loop)
1362 nb_tries += 1
1363 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1364
1365 async def wait_vm_up_insert_key_ro(
1366 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1367 ):
1368 """
1369 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1370 :param logging_text: prefix use for logging
1371 :param nsr_id:
1372 :param vnfr_id:
1373 :param vdu_id:
1374 :param vdu_index:
1375 :param pub_key: public ssh key to inject, None to skip
1376 :param user: user to apply the public ssh key
1377 :return: IP address
1378 """
1379
1380 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1381 ro_nsr_id = None
1382 ip_address = None
1383 nb_tries = 0
1384 target_vdu_id = None
1385 ro_retries = 0
1386
1387 while True:
1388
1389 ro_retries += 1
1390 if ro_retries >= 360: # 1 hour
1391 raise LcmException(
1392 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1393 )
1394
1395 await asyncio.sleep(10, loop=self.loop)
1396
1397 # get ip address
1398 if not target_vdu_id:
1399 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1400
1401 if not vdu_id: # for the VNF case
1402 if db_vnfr.get("status") == "ERROR":
1403 raise LcmException(
1404 "Cannot inject ssh-key because target VNF is in error state"
1405 )
1406 ip_address = db_vnfr.get("ip-address")
1407 if not ip_address:
1408 continue
1409 vdur = next(
1410 (
1411 x
1412 for x in get_iterable(db_vnfr, "vdur")
1413 if x.get("ip-address") == ip_address
1414 ),
1415 None,
1416 )
1417 else: # VDU case
1418 vdur = next(
1419 (
1420 x
1421 for x in get_iterable(db_vnfr, "vdur")
1422 if x.get("vdu-id-ref") == vdu_id
1423 and x.get("count-index") == vdu_index
1424 ),
1425 None,
1426 )
1427
1428 if (
1429 not vdur and len(db_vnfr.get("vdur", ())) == 1
1430 ): # If only one, this should be the target vdu
1431 vdur = db_vnfr["vdur"][0]
1432 if not vdur:
1433 raise LcmException(
1434 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1435 vnfr_id, vdu_id, vdu_index
1436 )
1437 )
1438 # New generation RO stores information at "vim_info"
1439 ng_ro_status = None
1440 target_vim = None
1441 if vdur.get("vim_info"):
1442 target_vim = next(
1443 t for t in vdur["vim_info"]
1444 ) # there should be only one key
1445 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1446 if (
1447 vdur.get("pdu-type")
1448 or vdur.get("status") == "ACTIVE"
1449 or ng_ro_status == "ACTIVE"
1450 ):
1451 ip_address = vdur.get("ip-address")
1452 if not ip_address:
1453 continue
1454 target_vdu_id = vdur["vdu-id-ref"]
1455 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1456 raise LcmException(
1457 "Cannot inject ssh-key because target VM is in error state"
1458 )
1459
1460 if not target_vdu_id:
1461 continue
1462
1463 # inject public key into machine
1464 if pub_key and user:
1465 self.logger.debug(logging_text + "Inserting RO key")
1466 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1467 if vdur.get("pdu-type"):
1468 self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
1469 return ip_address
1470 try:
1471 ro_vm_id = "{}-{}".format(
1472 db_vnfr["member-vnf-index-ref"], target_vdu_id
1473 ) # TODO add vdu_index
1474 if self.ng_ro:
1475 target = {
1476 "action": {
1477 "action": "inject_ssh_key",
1478 "key": pub_key,
1479 "user": user,
1480 },
1481 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1482 }
1483 desc = await self.RO.deploy(nsr_id, target)
1484 action_id = desc["action_id"]
1485 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1486 break
1487 else:
1488 # wait until NS is deployed at RO
1489 if not ro_nsr_id:
1490 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1491 ro_nsr_id = deep_get(
1492 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1493 )
1494 if not ro_nsr_id:
1495 continue
1496 result_dict = await self.RO.create_action(
1497 item="ns",
1498 item_id_name=ro_nsr_id,
1499 descriptor={
1500 "add_public_key": pub_key,
1501 "vms": [ro_vm_id],
1502 "user": user,
1503 },
1504 )
1505 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1506 if not result_dict or not isinstance(result_dict, dict):
1507 raise LcmException(
1508 "Unknown response from RO when injecting key"
1509 )
1510 for result in result_dict.values():
1511 if result.get("vim_result") == 200:
1512 break
1513 else:
1514 raise ROclient.ROClientException(
1515 "error injecting key: {}".format(
1516 result.get("description")
1517 )
1518 )
1519 break
1520 except NgRoException as e:
1521 raise LcmException(
1522 "Reaching max tries injecting key. Error: {}".format(e)
1523 )
1524 except ROclient.ROClientException as e:
1525 if not nb_tries:
1526 self.logger.debug(
1527 logging_text
1528 + "error injecting key: {}. Retrying until {} seconds".format(
1529 e, 20 * 10
1530 )
1531 )
1532 nb_tries += 1
1533 if nb_tries >= 20:
1534 raise LcmException(
1535 "Reaching max tries injecting key. Error: {}".format(e)
1536 )
1537 else:
1538 break
1539
1540 return ip_address
1541
1542 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1543 """
1544 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1545 """
1546 my_vca = vca_deployed_list[vca_index]
1547 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1548 # vdu or kdu: no dependencies
1549 return
1550 timeout = 300
1551 while timeout >= 0:
1552 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1553 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1554 configuration_status_list = db_nsr["configurationStatus"]
1555 for index, vca_deployed in enumerate(configuration_status_list):
1556 if index == vca_index:
1557 # myself
1558 continue
1559 if not my_vca.get("member-vnf-index") or (
1560 vca_deployed.get("member-vnf-index")
1561 == my_vca.get("member-vnf-index")
1562 ):
1563 internal_status = configuration_status_list[index].get("status")
1564 if internal_status == "READY":
1565 continue
1566 elif internal_status == "BROKEN":
1567 raise LcmException(
1568 "Configuration aborted because dependent charm/s has failed"
1569 )
1570 else:
1571 break
1572 else:
1573 # no dependencies, return
1574 return
1575 await asyncio.sleep(10)
1576 timeout -= 1
1577
1578 raise LcmException("Configuration aborted because dependent charm/s timeout")
1579
1580 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1581 vca_id = None
1582 if db_vnfr:
1583 vca_id = deep_get(db_vnfr, ("vca-id",))
1584 elif db_nsr:
1585 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1586 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1587 return vca_id
1588
1589 async def instantiate_N2VC(
1590 self,
1591 logging_text,
1592 vca_index,
1593 nsi_id,
1594 db_nsr,
1595 db_vnfr,
1596 vdu_id,
1597 kdu_name,
1598 vdu_index,
1599 config_descriptor,
1600 deploy_params,
1601 base_folder,
1602 nslcmop_id,
1603 stage,
1604 vca_type,
1605 vca_name,
1606 ee_config_descriptor,
1607 ):
1608 nsr_id = db_nsr["_id"]
1609 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1610 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1611 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1612 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1613 db_dict = {
1614 "collection": "nsrs",
1615 "filter": {"_id": nsr_id},
1616 "path": db_update_entry,
1617 }
1618 step = ""
1619 try:
1620
1621 element_type = "NS"
1622 element_under_configuration = nsr_id
1623
1624 vnfr_id = None
1625 if db_vnfr:
1626 vnfr_id = db_vnfr["_id"]
1627 osm_config["osm"]["vnf_id"] = vnfr_id
1628
1629 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1630
1631 if vca_type == "native_charm":
1632 index_number = 0
1633 else:
1634 index_number = vdu_index or 0
1635
1636 if vnfr_id:
1637 element_type = "VNF"
1638 element_under_configuration = vnfr_id
1639 namespace += ".{}-{}".format(vnfr_id, index_number)
1640 if vdu_id:
1641 namespace += ".{}-{}".format(vdu_id, index_number)
1642 element_type = "VDU"
1643 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1644 osm_config["osm"]["vdu_id"] = vdu_id
1645 elif kdu_name:
1646 namespace += ".{}".format(kdu_name)
1647 element_type = "KDU"
1648 element_under_configuration = kdu_name
1649 osm_config["osm"]["kdu_name"] = kdu_name
1650
1651 # Get artifact path
1652 artifact_path = "{}/{}/{}/{}".format(
1653 base_folder["folder"],
1654 base_folder["pkg-dir"],
1655 "charms"
1656 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1657 else "helm-charts",
1658 vca_name,
1659 )
1660
1661 self.logger.debug("Artifact path > {}".format(artifact_path))
1662
1663 # get initial_config_primitive_list that applies to this element
1664 initial_config_primitive_list = config_descriptor.get(
1665 "initial-config-primitive"
1666 )
1667
1668 self.logger.debug(
1669 "Initial config primitive list > {}".format(
1670 initial_config_primitive_list
1671 )
1672 )
1673
1674 # add config if not present for NS charm
1675 ee_descriptor_id = ee_config_descriptor.get("id")
1676 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1677 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1678 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1679 )
1680
1681 self.logger.debug(
1682 "Initial config primitive list #2 > {}".format(
1683 initial_config_primitive_list
1684 )
1685 )
1686 # n2vc_redesign STEP 3.1
1687 # find old ee_id if exists
1688 ee_id = vca_deployed.get("ee_id")
1689
1690 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1691 # create or register execution environment in VCA
1692 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1693
1694 self._write_configuration_status(
1695 nsr_id=nsr_id,
1696 vca_index=vca_index,
1697 status="CREATING",
1698 element_under_configuration=element_under_configuration,
1699 element_type=element_type,
1700 )
1701
1702 step = "create execution environment"
1703 self.logger.debug(logging_text + step)
1704
1705 ee_id = None
1706 credentials = None
1707 if vca_type == "k8s_proxy_charm":
1708 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1709 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1710 namespace=namespace,
1711 artifact_path=artifact_path,
1712 db_dict=db_dict,
1713 vca_id=vca_id,
1714 )
1715 elif vca_type == "helm" or vca_type == "helm-v3":
1716 ee_id, credentials = await self.vca_map[
1717 vca_type
1718 ].create_execution_environment(
1719 namespace=namespace,
1720 reuse_ee_id=ee_id,
1721 db_dict=db_dict,
1722 config=osm_config,
1723 artifact_path=artifact_path,
1724 vca_type=vca_type,
1725 )
1726 else:
1727 ee_id, credentials = await self.vca_map[
1728 vca_type
1729 ].create_execution_environment(
1730 namespace=namespace,
1731 reuse_ee_id=ee_id,
1732 db_dict=db_dict,
1733 vca_id=vca_id,
1734 )
1735
1736 elif vca_type == "native_charm":
1737 step = "Waiting to VM being up and getting IP address"
1738 self.logger.debug(logging_text + step)
1739 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1740 logging_text,
1741 nsr_id,
1742 vnfr_id,
1743 vdu_id,
1744 vdu_index,
1745 user=None,
1746 pub_key=None,
1747 )
1748 credentials = {"hostname": rw_mgmt_ip}
1749 # get username
1750 username = deep_get(
1751 config_descriptor, ("config-access", "ssh-access", "default-user")
1752 )
1753 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1754 # merged. Meanwhile let's get username from initial-config-primitive
1755 if not username and initial_config_primitive_list:
1756 for config_primitive in initial_config_primitive_list:
1757 for param in config_primitive.get("parameter", ()):
1758 if param["name"] == "ssh-username":
1759 username = param["value"]
1760 break
1761 if not username:
1762 raise LcmException(
1763 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1764 "'config-access.ssh-access.default-user'"
1765 )
1766 credentials["username"] = username
1767 # n2vc_redesign STEP 3.2
1768
1769 self._write_configuration_status(
1770 nsr_id=nsr_id,
1771 vca_index=vca_index,
1772 status="REGISTERING",
1773 element_under_configuration=element_under_configuration,
1774 element_type=element_type,
1775 )
1776
1777 step = "register execution environment {}".format(credentials)
1778 self.logger.debug(logging_text + step)
1779 ee_id = await self.vca_map[vca_type].register_execution_environment(
1780 credentials=credentials,
1781 namespace=namespace,
1782 db_dict=db_dict,
1783 vca_id=vca_id,
1784 )
1785
1786 # for compatibility with MON/POL modules, the need model and application name at database
1787 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1788 ee_id_parts = ee_id.split(".")
1789 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1790 if len(ee_id_parts) >= 2:
1791 model_name = ee_id_parts[0]
1792 application_name = ee_id_parts[1]
1793 db_nsr_update[db_update_entry + "model"] = model_name
1794 db_nsr_update[db_update_entry + "application"] = application_name
1795
1796 # n2vc_redesign STEP 3.3
1797 step = "Install configuration Software"
1798
1799 self._write_configuration_status(
1800 nsr_id=nsr_id,
1801 vca_index=vca_index,
1802 status="INSTALLING SW",
1803 element_under_configuration=element_under_configuration,
1804 element_type=element_type,
1805 other_update=db_nsr_update,
1806 )
1807
1808 # TODO check if already done
1809 self.logger.debug(logging_text + step)
1810 config = None
1811 if vca_type == "native_charm":
1812 config_primitive = next(
1813 (p for p in initial_config_primitive_list if p["name"] == "config"),
1814 None,
1815 )
1816 if config_primitive:
1817 config = self._map_primitive_params(
1818 config_primitive, {}, deploy_params
1819 )
1820 num_units = 1
1821 if vca_type == "lxc_proxy_charm":
1822 if element_type == "NS":
1823 num_units = db_nsr.get("config-units") or 1
1824 elif element_type == "VNF":
1825 num_units = db_vnfr.get("config-units") or 1
1826 elif element_type == "VDU":
1827 for v in db_vnfr["vdur"]:
1828 if vdu_id == v["vdu-id-ref"]:
1829 num_units = v.get("config-units") or 1
1830 break
1831 if vca_type != "k8s_proxy_charm":
1832 await self.vca_map[vca_type].install_configuration_sw(
1833 ee_id=ee_id,
1834 artifact_path=artifact_path,
1835 db_dict=db_dict,
1836 config=config,
1837 num_units=num_units,
1838 vca_id=vca_id,
1839 vca_type=vca_type,
1840 )
1841
1842 # write in db flag of configuration_sw already installed
1843 self.update_db_2(
1844 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1845 )
1846
1847 # add relations for this VCA (wait for other peers related with this VCA)
1848 await self._add_vca_relations(
1849 logging_text=logging_text,
1850 nsr_id=nsr_id,
1851 vca_type=vca_type,
1852 vca_index=vca_index,
1853 )
1854
1855 # if SSH access is required, then get execution environment SSH public
1856 # if native charm we have waited already to VM be UP
1857 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1858 pub_key = None
1859 user = None
1860 # self.logger.debug("get ssh key block")
1861 if deep_get(
1862 config_descriptor, ("config-access", "ssh-access", "required")
1863 ):
1864 # self.logger.debug("ssh key needed")
1865 # Needed to inject a ssh key
1866 user = deep_get(
1867 config_descriptor,
1868 ("config-access", "ssh-access", "default-user"),
1869 )
1870 step = "Install configuration Software, getting public ssh key"
1871 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1872 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1873 )
1874
1875 step = "Insert public key into VM user={} ssh_key={}".format(
1876 user, pub_key
1877 )
1878 else:
1879 # self.logger.debug("no need to get ssh key")
1880 step = "Waiting to VM being up and getting IP address"
1881 self.logger.debug(logging_text + step)
1882
1883 # n2vc_redesign STEP 5.1
1884 # wait for RO (ip-address) Insert pub_key into VM
1885 if vnfr_id:
1886 if kdu_name:
1887 rw_mgmt_ip = await self.wait_kdu_up(
1888 logging_text, nsr_id, vnfr_id, kdu_name
1889 )
1890 else:
1891 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1892 logging_text,
1893 nsr_id,
1894 vnfr_id,
1895 vdu_id,
1896 vdu_index,
1897 user=user,
1898 pub_key=pub_key,
1899 )
1900 else:
1901 rw_mgmt_ip = None # This is for a NS configuration
1902
1903 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1904
1905 # store rw_mgmt_ip in deploy params for later replacement
1906 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1907
1908 # n2vc_redesign STEP 6 Execute initial config primitive
1909 step = "execute initial config primitive"
1910
1911 # wait for dependent primitives execution (NS -> VNF -> VDU)
1912 if initial_config_primitive_list:
1913 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1914
1915 # stage, in function of element type: vdu, kdu, vnf or ns
1916 my_vca = vca_deployed_list[vca_index]
1917 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1918 # VDU or KDU
1919 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
1920 elif my_vca.get("member-vnf-index"):
1921 # VNF
1922 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
1923 else:
1924 # NS
1925 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
1926
1927 self._write_configuration_status(
1928 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
1929 )
1930
1931 self._write_op_status(op_id=nslcmop_id, stage=stage)
1932
1933 check_if_terminated_needed = True
1934 for initial_config_primitive in initial_config_primitive_list:
1935 # adding information on the vca_deployed if it is a NS execution environment
1936 if not vca_deployed["member-vnf-index"]:
1937 deploy_params["ns_config_info"] = json.dumps(
1938 self._get_ns_config_info(nsr_id)
1939 )
1940 # TODO check if already done
1941 primitive_params_ = self._map_primitive_params(
1942 initial_config_primitive, {}, deploy_params
1943 )
1944
1945 step = "execute primitive '{}' params '{}'".format(
1946 initial_config_primitive["name"], primitive_params_
1947 )
1948 self.logger.debug(logging_text + step)
1949 await self.vca_map[vca_type].exec_primitive(
1950 ee_id=ee_id,
1951 primitive_name=initial_config_primitive["name"],
1952 params_dict=primitive_params_,
1953 db_dict=db_dict,
1954 vca_id=vca_id,
1955 vca_type=vca_type,
1956 )
1957 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1958 if check_if_terminated_needed:
1959 if config_descriptor.get("terminate-config-primitive"):
1960 self.update_db_2(
1961 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
1962 )
1963 check_if_terminated_needed = False
1964
1965 # TODO register in database that primitive is done
1966
1967 # STEP 7 Configure metrics
1968 if vca_type == "helm" or vca_type == "helm-v3":
1969 prometheus_jobs = await self.add_prometheus_metrics(
1970 ee_id=ee_id,
1971 artifact_path=artifact_path,
1972 ee_config_descriptor=ee_config_descriptor,
1973 vnfr_id=vnfr_id,
1974 nsr_id=nsr_id,
1975 target_ip=rw_mgmt_ip,
1976 )
1977 if prometheus_jobs:
1978 self.update_db_2(
1979 "nsrs",
1980 nsr_id,
1981 {db_update_entry + "prometheus_jobs": prometheus_jobs},
1982 )
1983
1984 step = "instantiated at VCA"
1985 self.logger.debug(logging_text + step)
1986
1987 self._write_configuration_status(
1988 nsr_id=nsr_id, vca_index=vca_index, status="READY"
1989 )
1990
1991 except Exception as e: # TODO not use Exception but N2VC exception
1992 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1993 if not isinstance(
1994 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
1995 ):
1996 self.logger.error(
1997 "Exception while {} : {}".format(step, e), exc_info=True
1998 )
1999 self._write_configuration_status(
2000 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2001 )
2002 raise LcmException("{} {}".format(step, e)) from e
2003
2004 def _write_ns_status(
2005 self,
2006 nsr_id: str,
2007 ns_state: str,
2008 current_operation: str,
2009 current_operation_id: str,
2010 error_description: str = None,
2011 error_detail: str = None,
2012 other_update: dict = None,
2013 ):
2014 """
2015 Update db_nsr fields.
2016 :param nsr_id:
2017 :param ns_state:
2018 :param current_operation:
2019 :param current_operation_id:
2020 :param error_description:
2021 :param error_detail:
2022 :param other_update: Other required changes at database if provided, will be cleared
2023 :return:
2024 """
2025 try:
2026 db_dict = other_update or {}
2027 db_dict[
2028 "_admin.nslcmop"
2029 ] = current_operation_id # for backward compatibility
2030 db_dict["_admin.current-operation"] = current_operation_id
2031 db_dict["_admin.operation-type"] = (
2032 current_operation if current_operation != "IDLE" else None
2033 )
2034 db_dict["currentOperation"] = current_operation
2035 db_dict["currentOperationID"] = current_operation_id
2036 db_dict["errorDescription"] = error_description
2037 db_dict["errorDetail"] = error_detail
2038
2039 if ns_state:
2040 db_dict["nsState"] = ns_state
2041 self.update_db_2("nsrs", nsr_id, db_dict)
2042 except DbException as e:
2043 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2044
2045 def _write_op_status(
2046 self,
2047 op_id: str,
2048 stage: list = None,
2049 error_message: str = None,
2050 queuePosition: int = 0,
2051 operation_state: str = None,
2052 other_update: dict = None,
2053 ):
2054 try:
2055 db_dict = other_update or {}
2056 db_dict["queuePosition"] = queuePosition
2057 if isinstance(stage, list):
2058 db_dict["stage"] = stage[0]
2059 db_dict["detailed-status"] = " ".join(stage)
2060 elif stage is not None:
2061 db_dict["stage"] = str(stage)
2062
2063 if error_message is not None:
2064 db_dict["errorMessage"] = error_message
2065 if operation_state is not None:
2066 db_dict["operationState"] = operation_state
2067 db_dict["statusEnteredTime"] = time()
2068 self.update_db_2("nslcmops", op_id, db_dict)
2069 except DbException as e:
2070 self.logger.warn(
2071 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2072 )
2073
2074 def _write_all_config_status(self, db_nsr: dict, status: str):
2075 try:
2076 nsr_id = db_nsr["_id"]
2077 # configurationStatus
2078 config_status = db_nsr.get("configurationStatus")
2079 if config_status:
2080 db_nsr_update = {
2081 "configurationStatus.{}.status".format(index): status
2082 for index, v in enumerate(config_status)
2083 if v
2084 }
2085 # update status
2086 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2087
2088 except DbException as e:
2089 self.logger.warn(
2090 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2091 )
2092
2093 def _write_configuration_status(
2094 self,
2095 nsr_id: str,
2096 vca_index: int,
2097 status: str = None,
2098 element_under_configuration: str = None,
2099 element_type: str = None,
2100 other_update: dict = None,
2101 ):
2102
2103 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2104 # .format(vca_index, status))
2105
2106 try:
2107 db_path = "configurationStatus.{}.".format(vca_index)
2108 db_dict = other_update or {}
2109 if status:
2110 db_dict[db_path + "status"] = status
2111 if element_under_configuration:
2112 db_dict[
2113 db_path + "elementUnderConfiguration"
2114 ] = element_under_configuration
2115 if element_type:
2116 db_dict[db_path + "elementType"] = element_type
2117 self.update_db_2("nsrs", nsr_id, db_dict)
2118 except DbException as e:
2119 self.logger.warn(
2120 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2121 status, nsr_id, vca_index, e
2122 )
2123 )
2124
2125 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2126 """
2127 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2128 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2129 Database is used because the result can be obtained from a different LCM worker in case of HA.
2130 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2131 :param db_nslcmop: database content of nslcmop
2132 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2133 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2134 computed 'vim-account-id'
2135 """
2136 modified = False
2137 nslcmop_id = db_nslcmop["_id"]
2138 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2139 if placement_engine == "PLA":
2140 self.logger.debug(
2141 logging_text + "Invoke and wait for placement optimization"
2142 )
2143 await self.msg.aiowrite(
2144 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2145 )
2146 db_poll_interval = 5
2147 wait = db_poll_interval * 10
2148 pla_result = None
2149 while not pla_result and wait >= 0:
2150 await asyncio.sleep(db_poll_interval)
2151 wait -= db_poll_interval
2152 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2153 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2154
2155 if not pla_result:
2156 raise LcmException(
2157 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2158 )
2159
2160 for pla_vnf in pla_result["vnf"]:
2161 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2162 if not pla_vnf.get("vimAccountId") or not vnfr:
2163 continue
2164 modified = True
2165 self.db.set_one(
2166 "vnfrs",
2167 {"_id": vnfr["_id"]},
2168 {"vim-account-id": pla_vnf["vimAccountId"]},
2169 )
2170 # Modifies db_vnfrs
2171 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2172 return modified
2173
2174 def update_nsrs_with_pla_result(self, params):
2175 try:
2176 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2177 self.update_db_2(
2178 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2179 )
2180 except Exception as e:
2181 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2182
2183 async def instantiate(self, nsr_id, nslcmop_id):
2184 """
2185
2186 :param nsr_id: ns instance to deploy
2187 :param nslcmop_id: operation to run
2188 :return:
2189 """
2190
2191 # Try to lock HA task here
2192 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2193 if not task_is_locked_by_me:
2194 self.logger.debug(
2195 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2196 )
2197 return
2198
2199 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2200 self.logger.debug(logging_text + "Enter")
2201
2202 # get all needed from database
2203
2204 # database nsrs record
2205 db_nsr = None
2206
2207 # database nslcmops record
2208 db_nslcmop = None
2209
2210 # update operation on nsrs
2211 db_nsr_update = {}
2212 # update operation on nslcmops
2213 db_nslcmop_update = {}
2214
2215 nslcmop_operation_state = None
2216 db_vnfrs = {} # vnf's info indexed by member-index
2217 # n2vc_info = {}
2218 tasks_dict_info = {} # from task to info text
2219 exc = None
2220 error_list = []
2221 stage = [
2222 "Stage 1/5: preparation of the environment.",
2223 "Waiting for previous operations to terminate.",
2224 "",
2225 ]
2226 # ^ stage, step, VIM progress
2227 try:
2228 # wait for any previous tasks in process
2229 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2230
2231 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2232 stage[1] = "Reading from database."
2233 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2234 db_nsr_update["detailed-status"] = "creating"
2235 db_nsr_update["operational-status"] = "init"
2236 self._write_ns_status(
2237 nsr_id=nsr_id,
2238 ns_state="BUILDING",
2239 current_operation="INSTANTIATING",
2240 current_operation_id=nslcmop_id,
2241 other_update=db_nsr_update,
2242 )
2243 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2244
2245 # read from db: operation
2246 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2247 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2248 ns_params = db_nslcmop.get("operationParams")
2249 if ns_params and ns_params.get("timeout_ns_deploy"):
2250 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2251 else:
2252 timeout_ns_deploy = self.timeout.get(
2253 "ns_deploy", self.timeout_ns_deploy
2254 )
2255
2256 # read from db: ns
2257 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2258 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2259 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2260 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2261 self.fs.sync(db_nsr["nsd-id"])
2262 db_nsr["nsd"] = nsd
2263 # nsr_name = db_nsr["name"] # TODO short-name??
2264
2265 # read from db: vnf's of this ns
2266 stage[1] = "Getting vnfrs from db."
2267 self.logger.debug(logging_text + stage[1])
2268 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2269
2270 # read from db: vnfd's for every vnf
2271 db_vnfds = [] # every vnfd data
2272
2273 # for each vnf in ns, read vnfd
2274 for vnfr in db_vnfrs_list:
2275 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2276 vnfd_id = vnfr["vnfd-id"]
2277 vnfd_ref = vnfr["vnfd-ref"]
2278 self.fs.sync(vnfd_id)
2279
2280 # if we haven't this vnfd, read it from db
2281 if vnfd_id not in db_vnfds:
2282 # read from db
2283 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2284 vnfd_id, vnfd_ref
2285 )
2286 self.logger.debug(logging_text + stage[1])
2287 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2288
2289 # store vnfd
2290 db_vnfds.append(vnfd)
2291
2292 # Get or generates the _admin.deployed.VCA list
2293 vca_deployed_list = None
2294 if db_nsr["_admin"].get("deployed"):
2295 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2296 if vca_deployed_list is None:
2297 vca_deployed_list = []
2298 configuration_status_list = []
2299 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2300 db_nsr_update["configurationStatus"] = configuration_status_list
2301 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2302 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2303 elif isinstance(vca_deployed_list, dict):
2304 # maintain backward compatibility. Change a dict to list at database
2305 vca_deployed_list = list(vca_deployed_list.values())
2306 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2307 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2308
2309 if not isinstance(
2310 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2311 ):
2312 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2313 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2314
2315 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2316 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2317 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2318 self.db.set_list(
2319 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2320 )
2321
2322 # n2vc_redesign STEP 2 Deploy Network Scenario
2323 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2324 self._write_op_status(op_id=nslcmop_id, stage=stage)
2325
2326 stage[1] = "Deploying KDUs."
2327 # self.logger.debug(logging_text + "Before deploy_kdus")
2328 # Call to deploy_kdus in case exists the "vdu:kdu" param
2329 await self.deploy_kdus(
2330 logging_text=logging_text,
2331 nsr_id=nsr_id,
2332 nslcmop_id=nslcmop_id,
2333 db_vnfrs=db_vnfrs,
2334 db_vnfds=db_vnfds,
2335 task_instantiation_info=tasks_dict_info,
2336 )
2337
2338 stage[1] = "Getting VCA public key."
2339 # n2vc_redesign STEP 1 Get VCA public ssh-key
2340 # feature 1429. Add n2vc public key to needed VMs
2341 n2vc_key = self.n2vc.get_public_key()
2342 n2vc_key_list = [n2vc_key]
2343 if self.vca_config.get("public_key"):
2344 n2vc_key_list.append(self.vca_config["public_key"])
2345
2346 stage[1] = "Deploying NS at VIM."
2347 task_ro = asyncio.ensure_future(
2348 self.instantiate_RO(
2349 logging_text=logging_text,
2350 nsr_id=nsr_id,
2351 nsd=nsd,
2352 db_nsr=db_nsr,
2353 db_nslcmop=db_nslcmop,
2354 db_vnfrs=db_vnfrs,
2355 db_vnfds=db_vnfds,
2356 n2vc_key_list=n2vc_key_list,
2357 stage=stage,
2358 )
2359 )
2360 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2361 tasks_dict_info[task_ro] = "Deploying at VIM"
2362
2363 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2364 stage[1] = "Deploying Execution Environments."
2365 self.logger.debug(logging_text + stage[1])
2366
2367 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2368 for vnf_profile in get_vnf_profiles(nsd):
2369 vnfd_id = vnf_profile["vnfd-id"]
2370 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2371 member_vnf_index = str(vnf_profile["id"])
2372 db_vnfr = db_vnfrs[member_vnf_index]
2373 base_folder = vnfd["_admin"]["storage"]
2374 vdu_id = None
2375 vdu_index = 0
2376 vdu_name = None
2377 kdu_name = None
2378
2379 # Get additional parameters
2380 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2381 if db_vnfr.get("additionalParamsForVnf"):
2382 deploy_params.update(
2383 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2384 )
2385
2386 descriptor_config = get_configuration(vnfd, vnfd["id"])
2387 if descriptor_config:
2388 self._deploy_n2vc(
2389 logging_text=logging_text
2390 + "member_vnf_index={} ".format(member_vnf_index),
2391 db_nsr=db_nsr,
2392 db_vnfr=db_vnfr,
2393 nslcmop_id=nslcmop_id,
2394 nsr_id=nsr_id,
2395 nsi_id=nsi_id,
2396 vnfd_id=vnfd_id,
2397 vdu_id=vdu_id,
2398 kdu_name=kdu_name,
2399 member_vnf_index=member_vnf_index,
2400 vdu_index=vdu_index,
2401 vdu_name=vdu_name,
2402 deploy_params=deploy_params,
2403 descriptor_config=descriptor_config,
2404 base_folder=base_folder,
2405 task_instantiation_info=tasks_dict_info,
2406 stage=stage,
2407 )
2408
2409 # Deploy charms for each VDU that supports one.
2410 for vdud in get_vdu_list(vnfd):
2411 vdu_id = vdud["id"]
2412 descriptor_config = get_configuration(vnfd, vdu_id)
2413 vdur = find_in_list(
2414 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2415 )
2416
2417 if vdur.get("additionalParams"):
2418 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2419 else:
2420 deploy_params_vdu = deploy_params
2421 deploy_params_vdu["OSM"] = get_osm_params(
2422 db_vnfr, vdu_id, vdu_count_index=0
2423 )
2424 vdud_count = get_number_of_instances(vnfd, vdu_id)
2425
2426 self.logger.debug("VDUD > {}".format(vdud))
2427 self.logger.debug(
2428 "Descriptor config > {}".format(descriptor_config)
2429 )
2430 if descriptor_config:
2431 vdu_name = None
2432 kdu_name = None
2433 for vdu_index in range(vdud_count):
2434 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2435 self._deploy_n2vc(
2436 logging_text=logging_text
2437 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2438 member_vnf_index, vdu_id, vdu_index
2439 ),
2440 db_nsr=db_nsr,
2441 db_vnfr=db_vnfr,
2442 nslcmop_id=nslcmop_id,
2443 nsr_id=nsr_id,
2444 nsi_id=nsi_id,
2445 vnfd_id=vnfd_id,
2446 vdu_id=vdu_id,
2447 kdu_name=kdu_name,
2448 member_vnf_index=member_vnf_index,
2449 vdu_index=vdu_index,
2450 vdu_name=vdu_name,
2451 deploy_params=deploy_params_vdu,
2452 descriptor_config=descriptor_config,
2453 base_folder=base_folder,
2454 task_instantiation_info=tasks_dict_info,
2455 stage=stage,
2456 )
2457 for kdud in get_kdu_list(vnfd):
2458 kdu_name = kdud["name"]
2459 descriptor_config = get_configuration(vnfd, kdu_name)
2460 if descriptor_config:
2461 vdu_id = None
2462 vdu_index = 0
2463 vdu_name = None
2464 kdur = next(
2465 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2466 )
2467 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2468 if kdur.get("additionalParams"):
2469 deploy_params_kdu = parse_yaml_strings(
2470 kdur["additionalParams"]
2471 )
2472
2473 self._deploy_n2vc(
2474 logging_text=logging_text,
2475 db_nsr=db_nsr,
2476 db_vnfr=db_vnfr,
2477 nslcmop_id=nslcmop_id,
2478 nsr_id=nsr_id,
2479 nsi_id=nsi_id,
2480 vnfd_id=vnfd_id,
2481 vdu_id=vdu_id,
2482 kdu_name=kdu_name,
2483 member_vnf_index=member_vnf_index,
2484 vdu_index=vdu_index,
2485 vdu_name=vdu_name,
2486 deploy_params=deploy_params_kdu,
2487 descriptor_config=descriptor_config,
2488 base_folder=base_folder,
2489 task_instantiation_info=tasks_dict_info,
2490 stage=stage,
2491 )
2492
2493 # Check if this NS has a charm configuration
2494 descriptor_config = nsd.get("ns-configuration")
2495 if descriptor_config and descriptor_config.get("juju"):
2496 vnfd_id = None
2497 db_vnfr = None
2498 member_vnf_index = None
2499 vdu_id = None
2500 kdu_name = None
2501 vdu_index = 0
2502 vdu_name = None
2503
2504 # Get additional parameters
2505 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2506 if db_nsr.get("additionalParamsForNs"):
2507 deploy_params.update(
2508 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2509 )
2510 base_folder = nsd["_admin"]["storage"]
2511 self._deploy_n2vc(
2512 logging_text=logging_text,
2513 db_nsr=db_nsr,
2514 db_vnfr=db_vnfr,
2515 nslcmop_id=nslcmop_id,
2516 nsr_id=nsr_id,
2517 nsi_id=nsi_id,
2518 vnfd_id=vnfd_id,
2519 vdu_id=vdu_id,
2520 kdu_name=kdu_name,
2521 member_vnf_index=member_vnf_index,
2522 vdu_index=vdu_index,
2523 vdu_name=vdu_name,
2524 deploy_params=deploy_params,
2525 descriptor_config=descriptor_config,
2526 base_folder=base_folder,
2527 task_instantiation_info=tasks_dict_info,
2528 stage=stage,
2529 )
2530
2531 # rest of staff will be done at finally
2532
2533 except (
2534 ROclient.ROClientException,
2535 DbException,
2536 LcmException,
2537 N2VCException,
2538 ) as e:
2539 self.logger.error(
2540 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2541 )
2542 exc = e
2543 except asyncio.CancelledError:
2544 self.logger.error(
2545 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2546 )
2547 exc = "Operation was cancelled"
2548 except Exception as e:
2549 exc = traceback.format_exc()
2550 self.logger.critical(
2551 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2552 exc_info=True,
2553 )
2554 finally:
2555 if exc:
2556 error_list.append(str(exc))
2557 try:
2558 # wait for pending tasks
2559 if tasks_dict_info:
2560 stage[1] = "Waiting for instantiate pending tasks."
2561 self.logger.debug(logging_text + stage[1])
2562 error_list += await self._wait_for_tasks(
2563 logging_text,
2564 tasks_dict_info,
2565 timeout_ns_deploy,
2566 stage,
2567 nslcmop_id,
2568 nsr_id=nsr_id,
2569 )
2570 stage[1] = stage[2] = ""
2571 except asyncio.CancelledError:
2572 error_list.append("Cancelled")
2573 # TODO cancel all tasks
2574 except Exception as exc:
2575 error_list.append(str(exc))
2576
2577 # update operation-status
2578 db_nsr_update["operational-status"] = "running"
2579 # let's begin with VCA 'configured' status (later we can change it)
2580 db_nsr_update["config-status"] = "configured"
2581 for task, task_name in tasks_dict_info.items():
2582 if not task.done() or task.cancelled() or task.exception():
2583 if task_name.startswith(self.task_name_deploy_vca):
2584 # A N2VC task is pending
2585 db_nsr_update["config-status"] = "failed"
2586 else:
2587 # RO or KDU task is pending
2588 db_nsr_update["operational-status"] = "failed"
2589
2590 # update status at database
2591 if error_list:
2592 error_detail = ". ".join(error_list)
2593 self.logger.error(logging_text + error_detail)
2594 error_description_nslcmop = "{} Detail: {}".format(
2595 stage[0], error_detail
2596 )
2597 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2598 nslcmop_id, stage[0]
2599 )
2600
2601 db_nsr_update["detailed-status"] = (
2602 error_description_nsr + " Detail: " + error_detail
2603 )
2604 db_nslcmop_update["detailed-status"] = error_detail
2605 nslcmop_operation_state = "FAILED"
2606 ns_state = "BROKEN"
2607 else:
2608 error_detail = None
2609 error_description_nsr = error_description_nslcmop = None
2610 ns_state = "READY"
2611 db_nsr_update["detailed-status"] = "Done"
2612 db_nslcmop_update["detailed-status"] = "Done"
2613 nslcmop_operation_state = "COMPLETED"
2614
2615 if db_nsr:
2616 self._write_ns_status(
2617 nsr_id=nsr_id,
2618 ns_state=ns_state,
2619 current_operation="IDLE",
2620 current_operation_id=None,
2621 error_description=error_description_nsr,
2622 error_detail=error_detail,
2623 other_update=db_nsr_update,
2624 )
2625 self._write_op_status(
2626 op_id=nslcmop_id,
2627 stage="",
2628 error_message=error_description_nslcmop,
2629 operation_state=nslcmop_operation_state,
2630 other_update=db_nslcmop_update,
2631 )
2632
2633 if nslcmop_operation_state:
2634 try:
2635 await self.msg.aiowrite(
2636 "ns",
2637 "instantiated",
2638 {
2639 "nsr_id": nsr_id,
2640 "nslcmop_id": nslcmop_id,
2641 "operationState": nslcmop_operation_state,
2642 },
2643 loop=self.loop,
2644 )
2645 except Exception as e:
2646 self.logger.error(
2647 logging_text + "kafka_write notification Exception {}".format(e)
2648 )
2649
2650 self.logger.debug(logging_text + "Exit")
2651 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2652
2653 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2654 if vnfd_id not in cached_vnfds:
2655 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2656 return cached_vnfds[vnfd_id]
2657
2658 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2659 if vnf_profile_id not in cached_vnfrs:
2660 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2661 "vnfrs",
2662 {
2663 "member-vnf-index-ref": vnf_profile_id,
2664 "nsr-id-ref": nsr_id,
2665 },
2666 )
2667 return cached_vnfrs[vnf_profile_id]
2668
2669 def _is_deployed_vca_in_relation(
2670 self, vca: DeployedVCA, relation: Relation
2671 ) -> bool:
2672 found = False
2673 for endpoint in (relation.provider, relation.requirer):
2674 if endpoint["kdu-resource-profile-id"]:
2675 continue
2676 found = (
2677 vca.vnf_profile_id == endpoint.vnf_profile_id
2678 and vca.vdu_profile_id == endpoint.vdu_profile_id
2679 and vca.execution_environment_ref == endpoint.execution_environment_ref
2680 )
2681 if found:
2682 break
2683 return found
2684
2685 def _update_ee_relation_data_with_implicit_data(
2686 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2687 ):
2688 ee_relation_data = safe_get_ee_relation(
2689 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2690 )
2691 ee_relation_level = EELevel.get_level(ee_relation_data)
2692 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2693 "execution-environment-ref"
2694 ]:
2695 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2696 vnfd_id = vnf_profile["vnfd-id"]
2697 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2698 entity_id = (
2699 vnfd_id
2700 if ee_relation_level == EELevel.VNF
2701 else ee_relation_data["vdu-profile-id"]
2702 )
2703 ee = get_juju_ee_ref(db_vnfd, entity_id)
2704 if not ee:
2705 raise Exception(
2706 f"not execution environments found for ee_relation {ee_relation_data}"
2707 )
2708 ee_relation_data["execution-environment-ref"] = ee["id"]
2709 return ee_relation_data
2710
2711 def _get_ns_relations(
2712 self,
2713 nsr_id: str,
2714 nsd: Dict[str, Any],
2715 vca: DeployedVCA,
2716 cached_vnfds: Dict[str, Any],
2717 ):
2718 relations = []
2719 db_ns_relations = get_ns_configuration_relation_list(nsd)
2720 for r in db_ns_relations:
2721 relation_provider = self._update_ee_relation_data_with_implicit_data(
2722 nsr_id, nsd, r["provider"], cached_vnfds
2723 )
2724 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2725 nsr_id, nsd, r["requirer"], cached_vnfds
2726 )
2727 provider = EERelation(relation_provider)
2728 requirer = EERelation(relation_requirer)
2729 relation = Relation(r["name"], provider, requirer)
2730 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2731 if vca_in_relation:
2732 relations.append(relation)
2733 return relations
2734
2735 def _get_vnf_relations(
2736 self,
2737 nsr_id: str,
2738 nsd: Dict[str, Any],
2739 vca: DeployedVCA,
2740 cached_vnfds: Dict[str, Any],
2741 ):
2742 relations = []
2743 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2744 vnf_profile_id = vnf_profile["id"]
2745 vnfd_id = vnf_profile["vnfd-id"]
2746 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2747 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2748 for r in db_vnf_relations:
2749 relation_provider = self._update_ee_relation_data_with_implicit_data(
2750 nsr_id, nsd, r["provider"], cached_vnfds, vnf_profile_id=vnf_profile_id
2751 )
2752 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2753 nsr_id, nsd, r["requirer"], cached_vnfds, vnf_profile_id=vnf_profile_id
2754 )
2755 provider = EERelation(relation_provider)
2756 requirer = EERelation(relation_requirer)
2757 relation = Relation(r["name"], provider, requirer)
2758 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2759 if vca_in_relation:
2760 relations.append(relation)
2761 return relations
2762
2763 def _get_kdu_resource_data(
2764 self,
2765 ee_relation: EERelation,
2766 db_nsr: Dict[str, Any],
2767 cached_vnfds: Dict[str, Any],
2768 ) -> DeployedK8sResource:
2769 nsd = get_nsd(db_nsr)
2770 vnf_profiles = get_vnf_profiles(nsd)
2771 vnfd_id = find_in_list(
2772 vnf_profiles,
2773 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2774 )["vnfd-id"]
2775 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2776 kdu_resource_profile = get_kdu_resource_profile(
2777 db_vnfd, ee_relation.kdu_resource_profile_id
2778 )
2779 kdu_name = kdu_resource_profile["kdu-name"]
2780 deployed_kdu, _ = get_deployed_kdu(
2781 db_nsr.get("_admin", ()).get("deployed", ()),
2782 kdu_name,
2783 ee_relation.vnf_profile_id,
2784 )
2785 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
2786 return deployed_kdu
2787
2788 def _get_deployed_component(
2789 self,
2790 ee_relation: EERelation,
2791 db_nsr: Dict[str, Any],
2792 cached_vnfds: Dict[str, Any],
2793 ) -> DeployedComponent:
2794 nsr_id = db_nsr["_id"]
2795 deployed_component = None
2796 ee_level = EELevel.get_level(ee_relation)
2797 if ee_level == EELevel.NS:
2798 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
2799 if vca:
2800 deployed_component = DeployedVCA(nsr_id, vca)
2801 elif ee_level == EELevel.VNF:
2802 vca = get_deployed_vca(
2803 db_nsr,
2804 {
2805 "vdu_id": None,
2806 "member-vnf-index": ee_relation.vnf_profile_id,
2807 "ee_descriptor_id": ee_relation.execution_environment_ref,
2808 },
2809 )
2810 if vca:
2811 deployed_component = DeployedVCA(nsr_id, vca)
2812 elif ee_level == EELevel.VDU:
2813 vca = get_deployed_vca(
2814 db_nsr,
2815 {
2816 "vdu_id": ee_relation.vdu_profile_id,
2817 "member-vnf-index": ee_relation.vnf_profile_id,
2818 "ee_descriptor_id": ee_relation.execution_environment_ref,
2819 },
2820 )
2821 if vca:
2822 deployed_component = DeployedVCA(nsr_id, vca)
2823 elif ee_level == EELevel.KDU:
2824 kdu_resource_data = self._get_kdu_resource_data(
2825 ee_relation, db_nsr, cached_vnfds
2826 )
2827 if kdu_resource_data:
2828 deployed_component = DeployedK8sResource(kdu_resource_data)
2829 return deployed_component
2830
2831 async def _add_relation(
2832 self,
2833 relation: Relation,
2834 vca_type: str,
2835 db_nsr: Dict[str, Any],
2836 cached_vnfds: Dict[str, Any],
2837 cached_vnfrs: Dict[str, Any],
2838 ) -> bool:
2839 deployed_provider = self._get_deployed_component(
2840 relation.provider, db_nsr, cached_vnfds
2841 )
2842 deployed_requirer = self._get_deployed_component(
2843 relation.requirer, db_nsr, cached_vnfds
2844 )
2845 if (
2846 deployed_provider
2847 and deployed_requirer
2848 and deployed_provider.config_sw_installed
2849 and deployed_requirer.config_sw_installed
2850 ):
2851 provider_db_vnfr = (
2852 self._get_vnfr(
2853 relation.provider.nsr_id,
2854 relation.provider.vnf_profile_id,
2855 cached_vnfrs,
2856 )
2857 if relation.provider.vnf_profile_id
2858 else None
2859 )
2860 requirer_db_vnfr = (
2861 self._get_vnfr(
2862 relation.requirer.nsr_id,
2863 relation.requirer.vnf_profile_id,
2864 cached_vnfrs,
2865 )
2866 if relation.requirer.vnf_profile_id
2867 else None
2868 )
2869 provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
2870 requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
2871 provider_relation_endpoint = RelationEndpoint(
2872 deployed_provider.ee_id,
2873 provider_vca_id,
2874 relation.provider.endpoint,
2875 )
2876 requirer_relation_endpoint = RelationEndpoint(
2877 deployed_requirer.ee_id,
2878 requirer_vca_id,
2879 relation.requirer.endpoint,
2880 )
2881 await self.vca_map[vca_type].add_relation(
2882 provider=provider_relation_endpoint,
2883 requirer=requirer_relation_endpoint,
2884 )
2885 # remove entry from relations list
2886 return True
2887 return False
2888
2889 async def _add_vca_relations(
2890 self,
2891 logging_text,
2892 nsr_id,
2893 vca_type: str,
2894 vca_index: int,
2895 timeout: int = 3600,
2896 ) -> bool:
2897
2898 # steps:
2899 # 1. find all relations for this VCA
2900 # 2. wait for other peers related
2901 # 3. add relations
2902
2903 try:
2904 # STEP 1: find all relations for this VCA
2905
2906 # read nsr record
2907 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2908 nsd = get_nsd(db_nsr)
2909
2910 # this VCA data
2911 deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
2912 my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
2913
2914 cached_vnfds = {}
2915 cached_vnfrs = {}
2916 relations = []
2917 relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
2918 relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
2919
2920 # if no relations, terminate
2921 if not relations:
2922 self.logger.debug(logging_text + " No relations")
2923 return True
2924
2925 self.logger.debug(logging_text + " adding relations {}".format(relations))
2926
2927 # add all relations
2928 start = time()
2929 while True:
2930 # check timeout
2931 now = time()
2932 if now - start >= timeout:
2933 self.logger.error(logging_text + " : timeout adding relations")
2934 return False
2935
2936 # reload nsr from database (we need to update record: _admin.deployed.VCA)
2937 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2938
2939 # for each relation, find the VCA's related
2940 for relation in relations.copy():
2941 added = await self._add_relation(
2942 relation,
2943 vca_type,
2944 db_nsr,
2945 cached_vnfds,
2946 cached_vnfrs,
2947 )
2948 if added:
2949 relations.remove(relation)
2950
2951 if not relations:
2952 self.logger.debug("Relations added")
2953 break
2954 await asyncio.sleep(5.0)
2955
2956 return True
2957
2958 except Exception as e:
2959 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
2960 return False
2961
2962 async def _install_kdu(
2963 self,
2964 nsr_id: str,
2965 nsr_db_path: str,
2966 vnfr_data: dict,
2967 kdu_index: int,
2968 kdud: dict,
2969 vnfd: dict,
2970 k8s_instance_info: dict,
2971 k8params: dict = None,
2972 timeout: int = 600,
2973 vca_id: str = None,
2974 ):
2975
2976 try:
2977 k8sclustertype = k8s_instance_info["k8scluster-type"]
2978 # Instantiate kdu
2979 db_dict_install = {
2980 "collection": "nsrs",
2981 "filter": {"_id": nsr_id},
2982 "path": nsr_db_path,
2983 }
2984
2985 if k8s_instance_info.get("kdu-deployment-name"):
2986 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
2987 else:
2988 kdu_instance = self.k8scluster_map[
2989 k8sclustertype
2990 ].generate_kdu_instance_name(
2991 db_dict=db_dict_install,
2992 kdu_model=k8s_instance_info["kdu-model"],
2993 kdu_name=k8s_instance_info["kdu-name"],
2994 )
2995 self.update_db_2(
2996 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
2997 )
2998 await self.k8scluster_map[k8sclustertype].install(
2999 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3000 kdu_model=k8s_instance_info["kdu-model"],
3001 atomic=True,
3002 params=k8params,
3003 db_dict=db_dict_install,
3004 timeout=timeout,
3005 kdu_name=k8s_instance_info["kdu-name"],
3006 namespace=k8s_instance_info["namespace"],
3007 kdu_instance=kdu_instance,
3008 vca_id=vca_id,
3009 )
3010 self.update_db_2(
3011 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3012 )
3013
3014 # Obtain services to obtain management service ip
3015 services = await self.k8scluster_map[k8sclustertype].get_services(
3016 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3017 kdu_instance=kdu_instance,
3018 namespace=k8s_instance_info["namespace"],
3019 )
3020
3021 # Obtain management service info (if exists)
3022 vnfr_update_dict = {}
3023 kdu_config = get_configuration(vnfd, kdud["name"])
3024 if kdu_config:
3025 target_ee_list = kdu_config.get("execution-environment-list", [])
3026 else:
3027 target_ee_list = []
3028
3029 if services:
3030 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
3031 mgmt_services = [
3032 service
3033 for service in kdud.get("service", [])
3034 if service.get("mgmt-service")
3035 ]
3036 for mgmt_service in mgmt_services:
3037 for service in services:
3038 if service["name"].startswith(mgmt_service["name"]):
3039 # Mgmt service found, Obtain service ip
3040 ip = service.get("external_ip", service.get("cluster_ip"))
3041 if isinstance(ip, list) and len(ip) == 1:
3042 ip = ip[0]
3043
3044 vnfr_update_dict[
3045 "kdur.{}.ip-address".format(kdu_index)
3046 ] = ip
3047
3048 # Check if must update also mgmt ip at the vnf
3049 service_external_cp = mgmt_service.get(
3050 "external-connection-point-ref"
3051 )
3052 if service_external_cp:
3053 if (
3054 deep_get(vnfd, ("mgmt-interface", "cp"))
3055 == service_external_cp
3056 ):
3057 vnfr_update_dict["ip-address"] = ip
3058
3059 if find_in_list(
3060 target_ee_list,
3061 lambda ee: ee.get(
3062 "external-connection-point-ref", ""
3063 )
3064 == service_external_cp,
3065 ):
3066 vnfr_update_dict[
3067 "kdur.{}.ip-address".format(kdu_index)
3068 ] = ip
3069 break
3070 else:
3071 self.logger.warn(
3072 "Mgmt service name: {} not found".format(
3073 mgmt_service["name"]
3074 )
3075 )
3076
3077 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3078 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3079
3080 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3081 if (
3082 kdu_config
3083 and kdu_config.get("initial-config-primitive")
3084 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3085 ):
3086 initial_config_primitive_list = kdu_config.get(
3087 "initial-config-primitive"
3088 )
3089 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3090
3091 for initial_config_primitive in initial_config_primitive_list:
3092 primitive_params_ = self._map_primitive_params(
3093 initial_config_primitive, {}, {}
3094 )
3095
3096 await asyncio.wait_for(
3097 self.k8scluster_map[k8sclustertype].exec_primitive(
3098 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3099 kdu_instance=kdu_instance,
3100 primitive_name=initial_config_primitive["name"],
3101 params=primitive_params_,
3102 db_dict=db_dict_install,
3103 vca_id=vca_id,
3104 ),
3105 timeout=timeout,
3106 )
3107
3108 except Exception as e:
3109 # Prepare update db with error and raise exception
3110 try:
3111 self.update_db_2(
3112 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3113 )
3114 self.update_db_2(
3115 "vnfrs",
3116 vnfr_data.get("_id"),
3117 {"kdur.{}.status".format(kdu_index): "ERROR"},
3118 )
3119 except Exception:
3120 # ignore to keep original exception
3121 pass
3122 # reraise original error
3123 raise
3124
3125 return kdu_instance
3126
3127 async def deploy_kdus(
3128 self,
3129 logging_text,
3130 nsr_id,
3131 nslcmop_id,
3132 db_vnfrs,
3133 db_vnfds,
3134 task_instantiation_info,
3135 ):
3136 # Launch kdus if present in the descriptor
3137
3138 k8scluster_id_2_uuic = {
3139 "helm-chart-v3": {},
3140 "helm-chart": {},
3141 "juju-bundle": {},
3142 }
3143
3144 async def _get_cluster_id(cluster_id, cluster_type):
3145 nonlocal k8scluster_id_2_uuic
3146 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3147 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3148
3149 # check if K8scluster is creating and wait look if previous tasks in process
3150 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3151 "k8scluster", cluster_id
3152 )
3153 if task_dependency:
3154 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3155 task_name, cluster_id
3156 )
3157 self.logger.debug(logging_text + text)
3158 await asyncio.wait(task_dependency, timeout=3600)
3159
3160 db_k8scluster = self.db.get_one(
3161 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3162 )
3163 if not db_k8scluster:
3164 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3165
3166 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3167 if not k8s_id:
3168 if cluster_type == "helm-chart-v3":
3169 try:
3170 # backward compatibility for existing clusters that have not been initialized for helm v3
3171 k8s_credentials = yaml.safe_dump(
3172 db_k8scluster.get("credentials")
3173 )
3174 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3175 k8s_credentials, reuse_cluster_uuid=cluster_id
3176 )
3177 db_k8scluster_update = {}
3178 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3179 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3180 db_k8scluster_update[
3181 "_admin.helm-chart-v3.created"
3182 ] = uninstall_sw
3183 db_k8scluster_update[
3184 "_admin.helm-chart-v3.operationalState"
3185 ] = "ENABLED"
3186 self.update_db_2(
3187 "k8sclusters", cluster_id, db_k8scluster_update
3188 )
3189 except Exception as e:
3190 self.logger.error(
3191 logging_text
3192 + "error initializing helm-v3 cluster: {}".format(str(e))
3193 )
3194 raise LcmException(
3195 "K8s cluster '{}' has not been initialized for '{}'".format(
3196 cluster_id, cluster_type
3197 )
3198 )
3199 else:
3200 raise LcmException(
3201 "K8s cluster '{}' has not been initialized for '{}'".format(
3202 cluster_id, cluster_type
3203 )
3204 )
3205 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3206 return k8s_id
3207
3208 logging_text += "Deploy kdus: "
3209 step = ""
3210 try:
3211 db_nsr_update = {"_admin.deployed.K8s": []}
3212 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3213
3214 index = 0
3215 updated_cluster_list = []
3216 updated_v3_cluster_list = []
3217
3218 for vnfr_data in db_vnfrs.values():
3219 vca_id = self.get_vca_id(vnfr_data, {})
3220 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3221 # Step 0: Prepare and set parameters
3222 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3223 vnfd_id = vnfr_data.get("vnfd-id")
3224 vnfd_with_id = find_in_list(
3225 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3226 )
3227 kdud = next(
3228 kdud
3229 for kdud in vnfd_with_id["kdu"]
3230 if kdud["name"] == kdur["kdu-name"]
3231 )
3232 namespace = kdur.get("k8s-namespace")
3233 kdu_deployment_name = kdur.get("kdu-deployment-name")
3234 if kdur.get("helm-chart"):
3235 kdumodel = kdur["helm-chart"]
3236 # Default version: helm3, if helm-version is v2 assign v2
3237 k8sclustertype = "helm-chart-v3"
3238 self.logger.debug("kdur: {}".format(kdur))
3239 if (
3240 kdur.get("helm-version")
3241 and kdur.get("helm-version") == "v2"
3242 ):
3243 k8sclustertype = "helm-chart"
3244 elif kdur.get("juju-bundle"):
3245 kdumodel = kdur["juju-bundle"]
3246 k8sclustertype = "juju-bundle"
3247 else:
3248 raise LcmException(
3249 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3250 "juju-bundle. Maybe an old NBI version is running".format(
3251 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3252 )
3253 )
3254 # check if kdumodel is a file and exists
3255 try:
3256 vnfd_with_id = find_in_list(
3257 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3258 )
3259 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3260 if storage and storage.get(
3261 "pkg-dir"
3262 ): # may be not present if vnfd has not artifacts
3263 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3264 filename = "{}/{}/{}s/{}".format(
3265 storage["folder"],
3266 storage["pkg-dir"],
3267 k8sclustertype,
3268 kdumodel,
3269 )
3270 if self.fs.file_exists(
3271 filename, mode="file"
3272 ) or self.fs.file_exists(filename, mode="dir"):
3273 kdumodel = self.fs.path + filename
3274 except (asyncio.TimeoutError, asyncio.CancelledError):
3275 raise
3276 except Exception: # it is not a file
3277 pass
3278
3279 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3280 step = "Synchronize repos for k8s cluster '{}'".format(
3281 k8s_cluster_id
3282 )
3283 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3284
3285 # Synchronize repos
3286 if (
3287 k8sclustertype == "helm-chart"
3288 and cluster_uuid not in updated_cluster_list
3289 ) or (
3290 k8sclustertype == "helm-chart-v3"
3291 and cluster_uuid not in updated_v3_cluster_list
3292 ):
3293 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3294 self.k8scluster_map[k8sclustertype].synchronize_repos(
3295 cluster_uuid=cluster_uuid
3296 )
3297 )
3298 if del_repo_list or added_repo_dict:
3299 if k8sclustertype == "helm-chart":
3300 unset = {
3301 "_admin.helm_charts_added." + item: None
3302 for item in del_repo_list
3303 }
3304 updated = {
3305 "_admin.helm_charts_added." + item: name
3306 for item, name in added_repo_dict.items()
3307 }
3308 updated_cluster_list.append(cluster_uuid)
3309 elif k8sclustertype == "helm-chart-v3":
3310 unset = {
3311 "_admin.helm_charts_v3_added." + item: None
3312 for item in del_repo_list
3313 }
3314 updated = {
3315 "_admin.helm_charts_v3_added." + item: name
3316 for item, name in added_repo_dict.items()
3317 }
3318 updated_v3_cluster_list.append(cluster_uuid)
3319 self.logger.debug(
3320 logging_text + "repos synchronized on k8s cluster "
3321 "'{}' to_delete: {}, to_add: {}".format(
3322 k8s_cluster_id, del_repo_list, added_repo_dict
3323 )
3324 )
3325 self.db.set_one(
3326 "k8sclusters",
3327 {"_id": k8s_cluster_id},
3328 updated,
3329 unset=unset,
3330 )
3331
3332 # Instantiate kdu
3333 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3334 vnfr_data["member-vnf-index-ref"],
3335 kdur["kdu-name"],
3336 k8s_cluster_id,
3337 )
3338 k8s_instance_info = {
3339 "kdu-instance": None,
3340 "k8scluster-uuid": cluster_uuid,
3341 "k8scluster-type": k8sclustertype,
3342 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3343 "kdu-name": kdur["kdu-name"],
3344 "kdu-model": kdumodel,
3345 "namespace": namespace,
3346 "kdu-deployment-name": kdu_deployment_name,
3347 }
3348 db_path = "_admin.deployed.K8s.{}".format(index)
3349 db_nsr_update[db_path] = k8s_instance_info
3350 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3351 vnfd_with_id = find_in_list(
3352 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3353 )
3354 task = asyncio.ensure_future(
3355 self._install_kdu(
3356 nsr_id,
3357 db_path,
3358 vnfr_data,
3359 kdu_index,
3360 kdud,
3361 vnfd_with_id,
3362 k8s_instance_info,
3363 k8params=desc_params,
3364 timeout=600,
3365 vca_id=vca_id,
3366 )
3367 )
3368 self.lcm_tasks.register(
3369 "ns",
3370 nsr_id,
3371 nslcmop_id,
3372 "instantiate_KDU-{}".format(index),
3373 task,
3374 )
3375 task_instantiation_info[task] = "Deploying KDU {}".format(
3376 kdur["kdu-name"]
3377 )
3378
3379 index += 1
3380
3381 except (LcmException, asyncio.CancelledError):
3382 raise
3383 except Exception as e:
3384 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3385 if isinstance(e, (N2VCException, DbException)):
3386 self.logger.error(logging_text + msg)
3387 else:
3388 self.logger.critical(logging_text + msg, exc_info=True)
3389 raise LcmException(msg)
3390 finally:
3391 if db_nsr_update:
3392 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3393
3394 def _deploy_n2vc(
3395 self,
3396 logging_text,
3397 db_nsr,
3398 db_vnfr,
3399 nslcmop_id,
3400 nsr_id,
3401 nsi_id,
3402 vnfd_id,
3403 vdu_id,
3404 kdu_name,
3405 member_vnf_index,
3406 vdu_index,
3407 vdu_name,
3408 deploy_params,
3409 descriptor_config,
3410 base_folder,
3411 task_instantiation_info,
3412 stage,
3413 ):
3414 # launch instantiate_N2VC in a asyncio task and register task object
3415 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3416 # if not found, create one entry and update database
3417 # fill db_nsr._admin.deployed.VCA.<index>
3418
3419 self.logger.debug(
3420 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3421 )
3422 if "execution-environment-list" in descriptor_config:
3423 ee_list = descriptor_config.get("execution-environment-list", [])
3424 elif "juju" in descriptor_config:
3425 ee_list = [descriptor_config] # ns charms
3426 else: # other types as script are not supported
3427 ee_list = []
3428
3429 for ee_item in ee_list:
3430 self.logger.debug(
3431 logging_text
3432 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3433 ee_item.get("juju"), ee_item.get("helm-chart")
3434 )
3435 )
3436 ee_descriptor_id = ee_item.get("id")
3437 if ee_item.get("juju"):
3438 vca_name = ee_item["juju"].get("charm")
3439 vca_type = (
3440 "lxc_proxy_charm"
3441 if ee_item["juju"].get("charm") is not None
3442 else "native_charm"
3443 )
3444 if ee_item["juju"].get("cloud") == "k8s":
3445 vca_type = "k8s_proxy_charm"
3446 elif ee_item["juju"].get("proxy") is False:
3447 vca_type = "native_charm"
3448 elif ee_item.get("helm-chart"):
3449 vca_name = ee_item["helm-chart"]
3450 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
3451 vca_type = "helm"
3452 else:
3453 vca_type = "helm-v3"
3454 else:
3455 self.logger.debug(
3456 logging_text + "skipping non juju neither charm configuration"
3457 )
3458 continue
3459
3460 vca_index = -1
3461 for vca_index, vca_deployed in enumerate(
3462 db_nsr["_admin"]["deployed"]["VCA"]
3463 ):
3464 if not vca_deployed:
3465 continue
3466 if (
3467 vca_deployed.get("member-vnf-index") == member_vnf_index
3468 and vca_deployed.get("vdu_id") == vdu_id
3469 and vca_deployed.get("kdu_name") == kdu_name
3470 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3471 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3472 ):
3473 break
3474 else:
3475 # not found, create one.
3476 target = (
3477 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3478 )
3479 if vdu_id:
3480 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3481 elif kdu_name:
3482 target += "/kdu/{}".format(kdu_name)
3483 vca_deployed = {
3484 "target_element": target,
3485 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3486 "member-vnf-index": member_vnf_index,
3487 "vdu_id": vdu_id,
3488 "kdu_name": kdu_name,
3489 "vdu_count_index": vdu_index,
3490 "operational-status": "init", # TODO revise
3491 "detailed-status": "", # TODO revise
3492 "step": "initial-deploy", # TODO revise
3493 "vnfd_id": vnfd_id,
3494 "vdu_name": vdu_name,
3495 "type": vca_type,
3496 "ee_descriptor_id": ee_descriptor_id,
3497 }
3498 vca_index += 1
3499
3500 # create VCA and configurationStatus in db
3501 db_dict = {
3502 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3503 "configurationStatus.{}".format(vca_index): dict(),
3504 }
3505 self.update_db_2("nsrs", nsr_id, db_dict)
3506
3507 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3508
3509 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3510 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3511 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3512
3513 # Launch task
3514 task_n2vc = asyncio.ensure_future(
3515 self.instantiate_N2VC(
3516 logging_text=logging_text,
3517 vca_index=vca_index,
3518 nsi_id=nsi_id,
3519 db_nsr=db_nsr,
3520 db_vnfr=db_vnfr,
3521 vdu_id=vdu_id,
3522 kdu_name=kdu_name,
3523 vdu_index=vdu_index,
3524 deploy_params=deploy_params,
3525 config_descriptor=descriptor_config,
3526 base_folder=base_folder,
3527 nslcmop_id=nslcmop_id,
3528 stage=stage,
3529 vca_type=vca_type,
3530 vca_name=vca_name,
3531 ee_config_descriptor=ee_item,
3532 )
3533 )
3534 self.lcm_tasks.register(
3535 "ns",
3536 nsr_id,
3537 nslcmop_id,
3538 "instantiate_N2VC-{}".format(vca_index),
3539 task_n2vc,
3540 )
3541 task_instantiation_info[
3542 task_n2vc
3543 ] = self.task_name_deploy_vca + " {}.{}".format(
3544 member_vnf_index or "", vdu_id or ""
3545 )
3546
3547 @staticmethod
3548 def _create_nslcmop(nsr_id, operation, params):
3549 """
3550 Creates a ns-lcm-opp content to be stored at database.
3551 :param nsr_id: internal id of the instance
3552 :param operation: instantiate, terminate, scale, action, ...
3553 :param params: user parameters for the operation
3554 :return: dictionary following SOL005 format
3555 """
3556 # Raise exception if invalid arguments
3557 if not (nsr_id and operation and params):
3558 raise LcmException(
3559 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3560 )
3561 now = time()
3562 _id = str(uuid4())
3563 nslcmop = {
3564 "id": _id,
3565 "_id": _id,
3566 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3567 "operationState": "PROCESSING",
3568 "statusEnteredTime": now,
3569 "nsInstanceId": nsr_id,
3570 "lcmOperationType": operation,
3571 "startTime": now,
3572 "isAutomaticInvocation": False,
3573 "operationParams": params,
3574 "isCancelPending": False,
3575 "links": {
3576 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3577 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3578 },
3579 }
3580 return nslcmop
3581
3582 def _format_additional_params(self, params):
3583 params = params or {}
3584 for key, value in params.items():
3585 if str(value).startswith("!!yaml "):
3586 params[key] = yaml.safe_load(value[7:])
3587 return params
3588
3589 def _get_terminate_primitive_params(self, seq, vnf_index):
3590 primitive = seq.get("name")
3591 primitive_params = {}
3592 params = {
3593 "member_vnf_index": vnf_index,
3594 "primitive": primitive,
3595 "primitive_params": primitive_params,
3596 }
3597 desc_params = {}
3598 return self._map_primitive_params(seq, params, desc_params)
3599
3600 # sub-operations
3601
3602 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3603 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3604 if op.get("operationState") == "COMPLETED":
3605 # b. Skip sub-operation
3606 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3607 return self.SUBOPERATION_STATUS_SKIP
3608 else:
3609 # c. retry executing sub-operation
3610 # The sub-operation exists, and operationState != 'COMPLETED'
3611 # Update operationState = 'PROCESSING' to indicate a retry.
3612 operationState = "PROCESSING"
3613 detailed_status = "In progress"
3614 self._update_suboperation_status(
3615 db_nslcmop, op_index, operationState, detailed_status
3616 )
3617 # Return the sub-operation index
3618 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3619 # with arguments extracted from the sub-operation
3620 return op_index
3621
3622 # Find a sub-operation where all keys in a matching dictionary must match
3623 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3624 def _find_suboperation(self, db_nslcmop, match):
3625 if db_nslcmop and match:
3626 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3627 for i, op in enumerate(op_list):
3628 if all(op.get(k) == match[k] for k in match):
3629 return i
3630 return self.SUBOPERATION_STATUS_NOT_FOUND
3631
3632 # Update status for a sub-operation given its index
3633 def _update_suboperation_status(
3634 self, db_nslcmop, op_index, operationState, detailed_status
3635 ):
3636 # Update DB for HA tasks
3637 q_filter = {"_id": db_nslcmop["_id"]}
3638 update_dict = {
3639 "_admin.operations.{}.operationState".format(op_index): operationState,
3640 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3641 }
3642 self.db.set_one(
3643 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3644 )
3645
3646 # Add sub-operation, return the index of the added sub-operation
3647 # Optionally, set operationState, detailed-status, and operationType
3648 # Status and type are currently set for 'scale' sub-operations:
3649 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3650 # 'detailed-status' : status message
3651 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3652 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3653 def _add_suboperation(
3654 self,
3655 db_nslcmop,
3656 vnf_index,
3657 vdu_id,
3658 vdu_count_index,
3659 vdu_name,
3660 primitive,
3661 mapped_primitive_params,
3662 operationState=None,
3663 detailed_status=None,
3664 operationType=None,
3665 RO_nsr_id=None,
3666 RO_scaling_info=None,
3667 ):
3668 if not db_nslcmop:
3669 return self.SUBOPERATION_STATUS_NOT_FOUND
3670 # Get the "_admin.operations" list, if it exists
3671 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3672 op_list = db_nslcmop_admin.get("operations")
3673 # Create or append to the "_admin.operations" list
3674 new_op = {
3675 "member_vnf_index": vnf_index,
3676 "vdu_id": vdu_id,
3677 "vdu_count_index": vdu_count_index,
3678 "primitive": primitive,
3679 "primitive_params": mapped_primitive_params,
3680 }
3681 if operationState:
3682 new_op["operationState"] = operationState
3683 if detailed_status:
3684 new_op["detailed-status"] = detailed_status
3685 if operationType:
3686 new_op["lcmOperationType"] = operationType
3687 if RO_nsr_id:
3688 new_op["RO_nsr_id"] = RO_nsr_id
3689 if RO_scaling_info:
3690 new_op["RO_scaling_info"] = RO_scaling_info
3691 if not op_list:
3692 # No existing operations, create key 'operations' with current operation as first list element
3693 db_nslcmop_admin.update({"operations": [new_op]})
3694 op_list = db_nslcmop_admin.get("operations")
3695 else:
3696 # Existing operations, append operation to list
3697 op_list.append(new_op)
3698
3699 db_nslcmop_update = {"_admin.operations": op_list}
3700 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3701 op_index = len(op_list) - 1
3702 return op_index
3703
3704 # Helper methods for scale() sub-operations
3705
3706 # pre-scale/post-scale:
3707 # Check for 3 different cases:
3708 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3709 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3710 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3711 def _check_or_add_scale_suboperation(
3712 self,
3713 db_nslcmop,
3714 vnf_index,
3715 vnf_config_primitive,
3716 primitive_params,
3717 operationType,
3718 RO_nsr_id=None,
3719 RO_scaling_info=None,
3720 ):
3721 # Find this sub-operation
3722 if RO_nsr_id and RO_scaling_info:
3723 operationType = "SCALE-RO"
3724 match = {
3725 "member_vnf_index": vnf_index,
3726 "RO_nsr_id": RO_nsr_id,
3727 "RO_scaling_info": RO_scaling_info,
3728 }
3729 else:
3730 match = {
3731 "member_vnf_index": vnf_index,
3732 "primitive": vnf_config_primitive,
3733 "primitive_params": primitive_params,
3734 "lcmOperationType": operationType,
3735 }
3736 op_index = self._find_suboperation(db_nslcmop, match)
3737 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3738 # a. New sub-operation
3739 # The sub-operation does not exist, add it.
3740 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3741 # The following parameters are set to None for all kind of scaling:
3742 vdu_id = None
3743 vdu_count_index = None
3744 vdu_name = None
3745 if RO_nsr_id and RO_scaling_info:
3746 vnf_config_primitive = None
3747 primitive_params = None
3748 else:
3749 RO_nsr_id = None
3750 RO_scaling_info = None
3751 # Initial status for sub-operation
3752 operationState = "PROCESSING"
3753 detailed_status = "In progress"
3754 # Add sub-operation for pre/post-scaling (zero or more operations)
3755 self._add_suboperation(
3756 db_nslcmop,
3757 vnf_index,
3758 vdu_id,
3759 vdu_count_index,
3760 vdu_name,
3761 vnf_config_primitive,
3762 primitive_params,
3763 operationState,
3764 detailed_status,
3765 operationType,
3766 RO_nsr_id,
3767 RO_scaling_info,
3768 )
3769 return self.SUBOPERATION_STATUS_NEW
3770 else:
3771 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3772 # or op_index (operationState != 'COMPLETED')
3773 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3774
3775 # Function to return execution_environment id
3776
3777 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3778 # TODO vdu_index_count
3779 for vca in vca_deployed_list:
3780 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3781 return vca["ee_id"]
3782
3783 async def destroy_N2VC(
3784 self,
3785 logging_text,
3786 db_nslcmop,
3787 vca_deployed,
3788 config_descriptor,
3789 vca_index,
3790 destroy_ee=True,
3791 exec_primitives=True,
3792 scaling_in=False,
3793 vca_id: str = None,
3794 ):
3795 """
3796 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
3797 :param logging_text:
3798 :param db_nslcmop:
3799 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
3800 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3801 :param vca_index: index in the database _admin.deployed.VCA
3802 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
3803 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
3804 not executed properly
3805 :param scaling_in: True destroys the application, False destroys the model
3806 :return: None or exception
3807 """
3808
3809 self.logger.debug(
3810 logging_text
3811 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3812 vca_index, vca_deployed, config_descriptor, destroy_ee
3813 )
3814 )
3815
3816 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3817
3818 # execute terminate_primitives
3819 if exec_primitives:
3820 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
3821 config_descriptor.get("terminate-config-primitive"),
3822 vca_deployed.get("ee_descriptor_id"),
3823 )
3824 vdu_id = vca_deployed.get("vdu_id")
3825 vdu_count_index = vca_deployed.get("vdu_count_index")
3826 vdu_name = vca_deployed.get("vdu_name")
3827 vnf_index = vca_deployed.get("member-vnf-index")
3828 if terminate_primitives and vca_deployed.get("needed_terminate"):
3829 for seq in terminate_primitives:
3830 # For each sequence in list, get primitive and call _ns_execute_primitive()
3831 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
3832 vnf_index, seq.get("name")
3833 )
3834 self.logger.debug(logging_text + step)
3835 # Create the primitive for each sequence, i.e. "primitive": "touch"
3836 primitive = seq.get("name")
3837 mapped_primitive_params = self._get_terminate_primitive_params(
3838 seq, vnf_index
3839 )
3840
3841 # Add sub-operation
3842 self._add_suboperation(
3843 db_nslcmop,
3844 vnf_index,
3845 vdu_id,
3846 vdu_count_index,
3847 vdu_name,
3848 primitive,
3849 mapped_primitive_params,
3850 )
3851 # Sub-operations: Call _ns_execute_primitive() instead of action()
3852 try:
3853 result, result_detail = await self._ns_execute_primitive(
3854 vca_deployed["ee_id"],
3855 primitive,
3856 mapped_primitive_params,
3857 vca_type=vca_type,
3858 vca_id=vca_id,
3859 )
3860 except LcmException:
3861 # this happens when VCA is not deployed. In this case it is not needed to terminate
3862 continue
3863 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
3864 if result not in result_ok:
3865 raise LcmException(
3866 "terminate_primitive {} for vnf_member_index={} fails with "
3867 "error {}".format(seq.get("name"), vnf_index, result_detail)
3868 )
3869 # set that this VCA do not need terminated
3870 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
3871 vca_index
3872 )
3873 self.update_db_2(
3874 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
3875 )
3876
3877 if vca_deployed.get("prometheus_jobs") and self.prometheus:
3878 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
3879
3880 if destroy_ee:
3881 await self.vca_map[vca_type].delete_execution_environment(
3882 vca_deployed["ee_id"],
3883 scaling_in=scaling_in,
3884 vca_type=vca_type,
3885 vca_id=vca_id,
3886 )
3887
3888 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
3889 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
3890 namespace = "." + db_nsr["_id"]
3891 try:
3892 await self.n2vc.delete_namespace(
3893 namespace=namespace,
3894 total_timeout=self.timeout_charm_delete,
3895 vca_id=vca_id,
3896 )
3897 except N2VCNotFound: # already deleted. Skip
3898 pass
3899 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
3900
3901 async def _terminate_RO(
3902 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
3903 ):
3904 """
3905 Terminates a deployment from RO
3906 :param logging_text:
3907 :param nsr_deployed: db_nsr._admin.deployed
3908 :param nsr_id:
3909 :param nslcmop_id:
3910 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
3911 this method will update only the index 2, but it will write on database the concatenated content of the list
3912 :return:
3913 """
3914 db_nsr_update = {}
3915 failed_detail = []
3916 ro_nsr_id = ro_delete_action = None
3917 if nsr_deployed and nsr_deployed.get("RO"):
3918 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
3919 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
3920 try:
3921 if ro_nsr_id:
3922 stage[2] = "Deleting ns from VIM."
3923 db_nsr_update["detailed-status"] = " ".join(stage)
3924 self._write_op_status(nslcmop_id, stage)
3925 self.logger.debug(logging_text + stage[2])
3926 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3927 self._write_op_status(nslcmop_id, stage)
3928 desc = await self.RO.delete("ns", ro_nsr_id)
3929 ro_delete_action = desc["action_id"]
3930 db_nsr_update[
3931 "_admin.deployed.RO.nsr_delete_action_id"
3932 ] = ro_delete_action
3933 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3934 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3935 if ro_delete_action:
3936 # wait until NS is deleted from VIM
3937 stage[2] = "Waiting ns deleted from VIM."
3938 detailed_status_old = None
3939 self.logger.debug(
3940 logging_text
3941 + stage[2]
3942 + " RO_id={} ro_delete_action={}".format(
3943 ro_nsr_id, ro_delete_action
3944 )
3945 )
3946 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3947 self._write_op_status(nslcmop_id, stage)
3948
3949 delete_timeout = 20 * 60 # 20 minutes
3950 while delete_timeout > 0:
3951 desc = await self.RO.show(
3952 "ns",
3953 item_id_name=ro_nsr_id,
3954 extra_item="action",
3955 extra_item_id=ro_delete_action,
3956 )
3957
3958 # deploymentStatus
3959 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3960
3961 ns_status, ns_status_info = self.RO.check_action_status(desc)
3962 if ns_status == "ERROR":
3963 raise ROclient.ROClientException(ns_status_info)
3964 elif ns_status == "BUILD":
3965 stage[2] = "Deleting from VIM {}".format(ns_status_info)
3966 elif ns_status == "ACTIVE":
3967 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3968 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3969 break
3970 else:
3971 assert (
3972 False
3973 ), "ROclient.check_action_status returns unknown {}".format(
3974 ns_status
3975 )
3976 if stage[2] != detailed_status_old:
3977 detailed_status_old = stage[2]
3978 db_nsr_update["detailed-status"] = " ".join(stage)
3979 self._write_op_status(nslcmop_id, stage)
3980 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3981 await asyncio.sleep(5, loop=self.loop)
3982 delete_timeout -= 5
3983 else: # delete_timeout <= 0:
3984 raise ROclient.ROClientException(
3985 "Timeout waiting ns deleted from VIM"
3986 )
3987
3988 except Exception as e:
3989 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3990 if (
3991 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3992 ): # not found
3993 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3994 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3995 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3996 self.logger.debug(
3997 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
3998 )
3999 elif (
4000 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4001 ): # conflict
4002 failed_detail.append("delete conflict: {}".format(e))
4003 self.logger.debug(
4004 logging_text
4005 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
4006 )
4007 else:
4008 failed_detail.append("delete error: {}".format(e))
4009 self.logger.error(
4010 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
4011 )
4012
4013 # Delete nsd
4014 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
4015 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
4016 try:
4017 stage[2] = "Deleting nsd from RO."
4018 db_nsr_update["detailed-status"] = " ".join(stage)
4019 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4020 self._write_op_status(nslcmop_id, stage)
4021 await self.RO.delete("nsd", ro_nsd_id)
4022 self.logger.debug(
4023 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
4024 )
4025 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4026 except Exception as e:
4027 if (
4028 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4029 ): # not found
4030 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4031 self.logger.debug(
4032 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
4033 )
4034 elif (
4035 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4036 ): # conflict
4037 failed_detail.append(
4038 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
4039 )
4040 self.logger.debug(logging_text + failed_detail[-1])
4041 else:
4042 failed_detail.append(
4043 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
4044 )
4045 self.logger.error(logging_text + failed_detail[-1])
4046
4047 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
4048 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
4049 if not vnf_deployed or not vnf_deployed["id"]:
4050 continue
4051 try:
4052 ro_vnfd_id = vnf_deployed["id"]
4053 stage[
4054 2
4055 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4056 vnf_deployed["member-vnf-index"], ro_vnfd_id
4057 )
4058 db_nsr_update["detailed-status"] = " ".join(stage)
4059 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4060 self._write_op_status(nslcmop_id, stage)
4061 await self.RO.delete("vnfd", ro_vnfd_id)
4062 self.logger.debug(
4063 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
4064 )
4065 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
4066 except Exception as e:
4067 if (
4068 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4069 ): # not found
4070 db_nsr_update[
4071 "_admin.deployed.RO.vnfd.{}.id".format(index)
4072 ] = None
4073 self.logger.debug(
4074 logging_text
4075 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
4076 )
4077 elif (
4078 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4079 ): # conflict
4080 failed_detail.append(
4081 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
4082 )
4083 self.logger.debug(logging_text + failed_detail[-1])
4084 else:
4085 failed_detail.append(
4086 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
4087 )
4088 self.logger.error(logging_text + failed_detail[-1])
4089
4090 if failed_detail:
4091 stage[2] = "Error deleting from VIM"
4092 else:
4093 stage[2] = "Deleted from VIM"
4094 db_nsr_update["detailed-status"] = " ".join(stage)
4095 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4096 self._write_op_status(nslcmop_id, stage)
4097
4098 if failed_detail:
4099 raise LcmException("; ".join(failed_detail))
4100
4101 async def terminate(self, nsr_id, nslcmop_id):
4102 # Try to lock HA task here
4103 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4104 if not task_is_locked_by_me:
4105 return
4106
4107 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
4108 self.logger.debug(logging_text + "Enter")
4109 timeout_ns_terminate = self.timeout_ns_terminate
4110 db_nsr = None
4111 db_nslcmop = None
4112 operation_params = None
4113 exc = None
4114 error_list = [] # annotates all failed error messages
4115 db_nslcmop_update = {}
4116 autoremove = False # autoremove after terminated
4117 tasks_dict_info = {}
4118 db_nsr_update = {}
4119 stage = [
4120 "Stage 1/3: Preparing task.",
4121 "Waiting for previous operations to terminate.",
4122 "",
4123 ]
4124 # ^ contains [stage, step, VIM-status]
4125 try:
4126 # wait for any previous tasks in process
4127 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4128
4129 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
4130 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4131 operation_params = db_nslcmop.get("operationParams") or {}
4132 if operation_params.get("timeout_ns_terminate"):
4133 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
4134 stage[1] = "Getting nsr={} from db.".format(nsr_id)
4135 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4136
4137 db_nsr_update["operational-status"] = "terminating"
4138 db_nsr_update["config-status"] = "terminating"
4139 self._write_ns_status(
4140 nsr_id=nsr_id,
4141 ns_state="TERMINATING",
4142 current_operation="TERMINATING",
4143 current_operation_id=nslcmop_id,
4144 other_update=db_nsr_update,
4145 )
4146 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4147 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4148 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4149 return
4150
4151 stage[1] = "Getting vnf descriptors from db."
4152 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4153 db_vnfrs_dict = {
4154 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4155 }
4156 db_vnfds_from_id = {}
4157 db_vnfds_from_member_index = {}
4158 # Loop over VNFRs
4159 for vnfr in db_vnfrs_list:
4160 vnfd_id = vnfr["vnfd-id"]
4161 if vnfd_id not in db_vnfds_from_id:
4162 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4163 db_vnfds_from_id[vnfd_id] = vnfd
4164 db_vnfds_from_member_index[
4165 vnfr["member-vnf-index-ref"]
4166 ] = db_vnfds_from_id[vnfd_id]
4167
4168 # Destroy individual execution environments when there are terminating primitives.
4169 # Rest of EE will be deleted at once
4170 # TODO - check before calling _destroy_N2VC
4171 # if not operation_params.get("skip_terminate_primitives"):#
4172 # or not vca.get("needed_terminate"):
4173 stage[0] = "Stage 2/3 execute terminating primitives."
4174 self.logger.debug(logging_text + stage[0])
4175 stage[1] = "Looking execution environment that needs terminate."
4176 self.logger.debug(logging_text + stage[1])
4177
4178 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4179 config_descriptor = None
4180 vca_member_vnf_index = vca.get("member-vnf-index")
4181 vca_id = self.get_vca_id(
4182 db_vnfrs_dict.get(vca_member_vnf_index)
4183 if vca_member_vnf_index
4184 else None,
4185 db_nsr,
4186 )
4187 if not vca or not vca.get("ee_id"):
4188 continue
4189 if not vca.get("member-vnf-index"):
4190 # ns
4191 config_descriptor = db_nsr.get("ns-configuration")
4192 elif vca.get("vdu_id"):
4193 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4194 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4195 elif vca.get("kdu_name"):
4196 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4197 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4198 else:
4199 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4200 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4201 vca_type = vca.get("type")
4202 exec_terminate_primitives = not operation_params.get(
4203 "skip_terminate_primitives"
4204 ) and vca.get("needed_terminate")
4205 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4206 # pending native charms
4207 destroy_ee = (
4208 True if vca_type in ("helm", "helm-v3", "native_charm") else False
4209 )
4210 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4211 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4212 task = asyncio.ensure_future(
4213 self.destroy_N2VC(
4214 logging_text,
4215 db_nslcmop,
4216 vca,
4217 config_descriptor,
4218 vca_index,
4219 destroy_ee,
4220 exec_terminate_primitives,
4221 vca_id=vca_id,
4222 )
4223 )
4224 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4225
4226 # wait for pending tasks of terminate primitives
4227 if tasks_dict_info:
4228 self.logger.debug(
4229 logging_text
4230 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4231 )
4232 error_list = await self._wait_for_tasks(
4233 logging_text,
4234 tasks_dict_info,
4235 min(self.timeout_charm_delete, timeout_ns_terminate),
4236 stage,
4237 nslcmop_id,
4238 )
4239 tasks_dict_info.clear()
4240 if error_list:
4241 return # raise LcmException("; ".join(error_list))
4242
4243 # remove All execution environments at once
4244 stage[0] = "Stage 3/3 delete all."
4245
4246 if nsr_deployed.get("VCA"):
4247 stage[1] = "Deleting all execution environments."
4248 self.logger.debug(logging_text + stage[1])
4249 vca_id = self.get_vca_id({}, db_nsr)
4250 task_delete_ee = asyncio.ensure_future(
4251 asyncio.wait_for(
4252 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4253 timeout=self.timeout_charm_delete,
4254 )
4255 )
4256 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4257 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4258
4259 # Delete from k8scluster
4260 stage[1] = "Deleting KDUs."
4261 self.logger.debug(logging_text + stage[1])
4262 # print(nsr_deployed)
4263 for kdu in get_iterable(nsr_deployed, "K8s"):
4264 if not kdu or not kdu.get("kdu-instance"):
4265 continue
4266 kdu_instance = kdu.get("kdu-instance")
4267 if kdu.get("k8scluster-type") in self.k8scluster_map:
4268 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4269 vca_id = self.get_vca_id({}, db_nsr)
4270 task_delete_kdu_instance = asyncio.ensure_future(
4271 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4272 cluster_uuid=kdu.get("k8scluster-uuid"),
4273 kdu_instance=kdu_instance,
4274 vca_id=vca_id,
4275 )
4276 )
4277 else:
4278 self.logger.error(
4279 logging_text
4280 + "Unknown k8s deployment type {}".format(
4281 kdu.get("k8scluster-type")
4282 )
4283 )
4284 continue
4285 tasks_dict_info[
4286 task_delete_kdu_instance
4287 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4288
4289 # remove from RO
4290 stage[1] = "Deleting ns from VIM."
4291 if self.ng_ro:
4292 task_delete_ro = asyncio.ensure_future(
4293 self._terminate_ng_ro(
4294 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4295 )
4296 )
4297 else:
4298 task_delete_ro = asyncio.ensure_future(
4299 self._terminate_RO(
4300 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4301 )
4302 )
4303 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4304
4305 # rest of staff will be done at finally
4306
4307 except (
4308 ROclient.ROClientException,
4309 DbException,
4310 LcmException,
4311 N2VCException,
4312 ) as e:
4313 self.logger.error(logging_text + "Exit Exception {}".format(e))
4314 exc = e
4315 except asyncio.CancelledError:
4316 self.logger.error(
4317 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4318 )
4319 exc = "Operation was cancelled"
4320 except Exception as e:
4321 exc = traceback.format_exc()
4322 self.logger.critical(
4323 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4324 exc_info=True,
4325 )
4326 finally:
4327 if exc:
4328 error_list.append(str(exc))
4329 try:
4330 # wait for pending tasks
4331 if tasks_dict_info:
4332 stage[1] = "Waiting for terminate pending tasks."
4333 self.logger.debug(logging_text + stage[1])
4334 error_list += await self._wait_for_tasks(
4335 logging_text,
4336 tasks_dict_info,
4337 timeout_ns_terminate,
4338 stage,
4339 nslcmop_id,
4340 )
4341 stage[1] = stage[2] = ""
4342 except asyncio.CancelledError:
4343 error_list.append("Cancelled")
4344 # TODO cancell all tasks
4345 except Exception as exc:
4346 error_list.append(str(exc))
4347 # update status at database
4348 if error_list:
4349 error_detail = "; ".join(error_list)
4350 # self.logger.error(logging_text + error_detail)
4351 error_description_nslcmop = "{} Detail: {}".format(
4352 stage[0], error_detail
4353 )
4354 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4355 nslcmop_id, stage[0]
4356 )
4357
4358 db_nsr_update["operational-status"] = "failed"
4359 db_nsr_update["detailed-status"] = (
4360 error_description_nsr + " Detail: " + error_detail
4361 )
4362 db_nslcmop_update["detailed-status"] = error_detail
4363 nslcmop_operation_state = "FAILED"
4364 ns_state = "BROKEN"
4365 else:
4366 error_detail = None
4367 error_description_nsr = error_description_nslcmop = None
4368 ns_state = "NOT_INSTANTIATED"
4369 db_nsr_update["operational-status"] = "terminated"
4370 db_nsr_update["detailed-status"] = "Done"
4371 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4372 db_nslcmop_update["detailed-status"] = "Done"
4373 nslcmop_operation_state = "COMPLETED"
4374
4375 if db_nsr:
4376 self._write_ns_status(
4377 nsr_id=nsr_id,
4378 ns_state=ns_state,
4379 current_operation="IDLE",
4380 current_operation_id=None,
4381 error_description=error_description_nsr,
4382 error_detail=error_detail,
4383 other_update=db_nsr_update,
4384 )
4385 self._write_op_status(
4386 op_id=nslcmop_id,
4387 stage="",
4388 error_message=error_description_nslcmop,
4389 operation_state=nslcmop_operation_state,
4390 other_update=db_nslcmop_update,
4391 )
4392 if ns_state == "NOT_INSTANTIATED":
4393 try:
4394 self.db.set_list(
4395 "vnfrs",
4396 {"nsr-id-ref": nsr_id},
4397 {"_admin.nsState": "NOT_INSTANTIATED"},
4398 )
4399 except DbException as e:
4400 self.logger.warn(
4401 logging_text
4402 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4403 nsr_id, e
4404 )
4405 )
4406 if operation_params:
4407 autoremove = operation_params.get("autoremove", False)
4408 if nslcmop_operation_state:
4409 try:
4410 await self.msg.aiowrite(
4411 "ns",
4412 "terminated",
4413 {
4414 "nsr_id": nsr_id,
4415 "nslcmop_id": nslcmop_id,
4416 "operationState": nslcmop_operation_state,
4417 "autoremove": autoremove,
4418 },
4419 loop=self.loop,
4420 )
4421 except Exception as e:
4422 self.logger.error(
4423 logging_text + "kafka_write notification Exception {}".format(e)
4424 )
4425
4426 self.logger.debug(logging_text + "Exit")
4427 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4428
4429 async def _wait_for_tasks(
4430 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4431 ):
4432 time_start = time()
4433 error_detail_list = []
4434 error_list = []
4435 pending_tasks = list(created_tasks_info.keys())
4436 num_tasks = len(pending_tasks)
4437 num_done = 0
4438 stage[1] = "{}/{}.".format(num_done, num_tasks)
4439 self._write_op_status(nslcmop_id, stage)
4440 while pending_tasks:
4441 new_error = None
4442 _timeout = timeout + time_start - time()
4443 done, pending_tasks = await asyncio.wait(
4444 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4445 )
4446 num_done += len(done)
4447 if not done: # Timeout
4448 for task in pending_tasks:
4449 new_error = created_tasks_info[task] + ": Timeout"
4450 error_detail_list.append(new_error)
4451 error_list.append(new_error)
4452 break
4453 for task in done:
4454 if task.cancelled():
4455 exc = "Cancelled"
4456 else:
4457 exc = task.exception()
4458 if exc:
4459 if isinstance(exc, asyncio.TimeoutError):
4460 exc = "Timeout"
4461 new_error = created_tasks_info[task] + ": {}".format(exc)
4462 error_list.append(created_tasks_info[task])
4463 error_detail_list.append(new_error)
4464 if isinstance(
4465 exc,
4466 (
4467 str,
4468 DbException,
4469 N2VCException,
4470 ROclient.ROClientException,
4471 LcmException,
4472 K8sException,
4473 NgRoException,
4474 ),
4475 ):
4476 self.logger.error(logging_text + new_error)
4477 else:
4478 exc_traceback = "".join(
4479 traceback.format_exception(None, exc, exc.__traceback__)
4480 )
4481 self.logger.error(
4482 logging_text
4483 + created_tasks_info[task]
4484 + " "
4485 + exc_traceback
4486 )
4487 else:
4488 self.logger.debug(
4489 logging_text + created_tasks_info[task] + ": Done"
4490 )
4491 stage[1] = "{}/{}.".format(num_done, num_tasks)
4492 if new_error:
4493 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4494 if nsr_id: # update also nsr
4495 self.update_db_2(
4496 "nsrs",
4497 nsr_id,
4498 {
4499 "errorDescription": "Error at: " + ", ".join(error_list),
4500 "errorDetail": ". ".join(error_detail_list),
4501 },
4502 )
4503 self._write_op_status(nslcmop_id, stage)
4504 return error_detail_list
4505
4506 @staticmethod
4507 def _map_primitive_params(primitive_desc, params, instantiation_params):
4508 """
4509 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4510 The default-value is used. If it is between < > it look for a value at instantiation_params
4511 :param primitive_desc: portion of VNFD/NSD that describes primitive
4512 :param params: Params provided by user
4513 :param instantiation_params: Instantiation params provided by user
4514 :return: a dictionary with the calculated params
4515 """
4516 calculated_params = {}
4517 for parameter in primitive_desc.get("parameter", ()):
4518 param_name = parameter["name"]
4519 if param_name in params:
4520 calculated_params[param_name] = params[param_name]
4521 elif "default-value" in parameter or "value" in parameter:
4522 if "value" in parameter:
4523 calculated_params[param_name] = parameter["value"]
4524 else:
4525 calculated_params[param_name] = parameter["default-value"]
4526 if (
4527 isinstance(calculated_params[param_name], str)
4528 and calculated_params[param_name].startswith("<")
4529 and calculated_params[param_name].endswith(">")
4530 ):
4531 if calculated_params[param_name][1:-1] in instantiation_params:
4532 calculated_params[param_name] = instantiation_params[
4533 calculated_params[param_name][1:-1]
4534 ]
4535 else:
4536 raise LcmException(
4537 "Parameter {} needed to execute primitive {} not provided".format(
4538 calculated_params[param_name], primitive_desc["name"]
4539 )
4540 )
4541 else:
4542 raise LcmException(
4543 "Parameter {} needed to execute primitive {} not provided".format(
4544 param_name, primitive_desc["name"]
4545 )
4546 )
4547
4548 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4549 calculated_params[param_name] = yaml.safe_dump(
4550 calculated_params[param_name], default_flow_style=True, width=256
4551 )
4552 elif isinstance(calculated_params[param_name], str) and calculated_params[
4553 param_name
4554 ].startswith("!!yaml "):
4555 calculated_params[param_name] = calculated_params[param_name][7:]
4556 if parameter.get("data-type") == "INTEGER":
4557 try:
4558 calculated_params[param_name] = int(calculated_params[param_name])
4559 except ValueError: # error converting string to int
4560 raise LcmException(
4561 "Parameter {} of primitive {} must be integer".format(
4562 param_name, primitive_desc["name"]
4563 )
4564 )
4565 elif parameter.get("data-type") == "BOOLEAN":
4566 calculated_params[param_name] = not (
4567 (str(calculated_params[param_name])).lower() == "false"
4568 )
4569
4570 # add always ns_config_info if primitive name is config
4571 if primitive_desc["name"] == "config":
4572 if "ns_config_info" in instantiation_params:
4573 calculated_params["ns_config_info"] = instantiation_params[
4574 "ns_config_info"
4575 ]
4576 return calculated_params
4577
4578 def _look_for_deployed_vca(
4579 self,
4580 deployed_vca,
4581 member_vnf_index,
4582 vdu_id,
4583 vdu_count_index,
4584 kdu_name=None,
4585 ee_descriptor_id=None,
4586 ):
4587 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4588 for vca in deployed_vca:
4589 if not vca:
4590 continue
4591 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4592 continue
4593 if (
4594 vdu_count_index is not None
4595 and vdu_count_index != vca["vdu_count_index"]
4596 ):
4597 continue
4598 if kdu_name and kdu_name != vca["kdu_name"]:
4599 continue
4600 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4601 continue
4602 break
4603 else:
4604 # vca_deployed not found
4605 raise LcmException(
4606 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4607 " is not deployed".format(
4608 member_vnf_index,
4609 vdu_id,
4610 vdu_count_index,
4611 kdu_name,
4612 ee_descriptor_id,
4613 )
4614 )
4615 # get ee_id
4616 ee_id = vca.get("ee_id")
4617 vca_type = vca.get(
4618 "type", "lxc_proxy_charm"
4619 ) # default value for backward compatibility - proxy charm
4620 if not ee_id:
4621 raise LcmException(
4622 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4623 "execution environment".format(
4624 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4625 )
4626 )
4627 return ee_id, vca_type
4628
4629 async def _ns_execute_primitive(
4630 self,
4631 ee_id,
4632 primitive,
4633 primitive_params,
4634 retries=0,
4635 retries_interval=30,
4636 timeout=None,
4637 vca_type=None,
4638 db_dict=None,
4639 vca_id: str = None,
4640 ) -> (str, str):
4641 try:
4642 if primitive == "config":
4643 primitive_params = {"params": primitive_params}
4644
4645 vca_type = vca_type or "lxc_proxy_charm"
4646
4647 while retries >= 0:
4648 try:
4649 output = await asyncio.wait_for(
4650 self.vca_map[vca_type].exec_primitive(
4651 ee_id=ee_id,
4652 primitive_name=primitive,
4653 params_dict=primitive_params,
4654 progress_timeout=self.timeout_progress_primitive,
4655 total_timeout=self.timeout_primitive,
4656 db_dict=db_dict,
4657 vca_id=vca_id,
4658 vca_type=vca_type,
4659 ),
4660 timeout=timeout or self.timeout_primitive,
4661 )
4662 # execution was OK
4663 break
4664 except asyncio.CancelledError:
4665 raise
4666 except Exception as e: # asyncio.TimeoutError
4667 if isinstance(e, asyncio.TimeoutError):
4668 e = "Timeout"
4669 retries -= 1
4670 if retries >= 0:
4671 self.logger.debug(
4672 "Error executing action {} on {} -> {}".format(
4673 primitive, ee_id, e
4674 )
4675 )
4676 # wait and retry
4677 await asyncio.sleep(retries_interval, loop=self.loop)
4678 else:
4679 return "FAILED", str(e)
4680
4681 return "COMPLETED", output
4682
4683 except (LcmException, asyncio.CancelledError):
4684 raise
4685 except Exception as e:
4686 return "FAIL", "Error executing action {}: {}".format(primitive, e)
4687
4688 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4689 """
4690 Updating the vca_status with latest juju information in nsrs record
4691 :param: nsr_id: Id of the nsr
4692 :param: nslcmop_id: Id of the nslcmop
4693 :return: None
4694 """
4695
4696 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4697 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4698 vca_id = self.get_vca_id({}, db_nsr)
4699 if db_nsr["_admin"]["deployed"]["K8s"]:
4700 for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4701 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
4702 await self._on_update_k8s_db(
4703 cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
4704 )
4705 else:
4706 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4707 table, filter = "nsrs", {"_id": nsr_id}
4708 path = "_admin.deployed.VCA.{}.".format(vca_index)
4709 await self._on_update_n2vc_db(table, filter, path, {})
4710
4711 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4712 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4713
4714 async def action(self, nsr_id, nslcmop_id):
4715 # Try to lock HA task here
4716 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4717 if not task_is_locked_by_me:
4718 return
4719
4720 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4721 self.logger.debug(logging_text + "Enter")
4722 # get all needed from database
4723 db_nsr = None
4724 db_nslcmop = None
4725 db_nsr_update = {}
4726 db_nslcmop_update = {}
4727 nslcmop_operation_state = None
4728 error_description_nslcmop = None
4729 exc = None
4730 try:
4731 # wait for any previous tasks in process
4732 step = "Waiting for previous operations to terminate"
4733 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4734
4735 self._write_ns_status(
4736 nsr_id=nsr_id,
4737 ns_state=None,
4738 current_operation="RUNNING ACTION",
4739 current_operation_id=nslcmop_id,
4740 )
4741
4742 step = "Getting information from database"
4743 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4744 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4745
4746 nsr_deployed = db_nsr["_admin"].get("deployed")
4747 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4748 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4749 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4750 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4751 primitive = db_nslcmop["operationParams"]["primitive"]
4752 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4753 timeout_ns_action = db_nslcmop["operationParams"].get(
4754 "timeout_ns_action", self.timeout_primitive
4755 )
4756
4757 if vnf_index:
4758 step = "Getting vnfr from database"
4759 db_vnfr = self.db.get_one(
4760 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4761 )
4762 step = "Getting vnfd from database"
4763 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4764 else:
4765 step = "Getting nsd from database"
4766 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4767
4768 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4769 # for backward compatibility
4770 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4771 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4772 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4773 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4774
4775 # look for primitive
4776 config_primitive_desc = descriptor_configuration = None
4777 if vdu_id:
4778 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4779 elif kdu_name:
4780 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4781 elif vnf_index:
4782 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4783 else:
4784 descriptor_configuration = db_nsd.get("ns-configuration")
4785
4786 if descriptor_configuration and descriptor_configuration.get(
4787 "config-primitive"
4788 ):
4789 for config_primitive in descriptor_configuration["config-primitive"]:
4790 if config_primitive["name"] == primitive:
4791 config_primitive_desc = config_primitive
4792 break
4793
4794 if not config_primitive_desc:
4795 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4796 raise LcmException(
4797 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4798 primitive
4799 )
4800 )
4801 primitive_name = primitive
4802 ee_descriptor_id = None
4803 else:
4804 primitive_name = config_primitive_desc.get(
4805 "execution-environment-primitive", primitive
4806 )
4807 ee_descriptor_id = config_primitive_desc.get(
4808 "execution-environment-ref"
4809 )
4810
4811 if vnf_index:
4812 if vdu_id:
4813 vdur = next(
4814 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
4815 )
4816 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
4817 elif kdu_name:
4818 kdur = next(
4819 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
4820 )
4821 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
4822 else:
4823 desc_params = parse_yaml_strings(
4824 db_vnfr.get("additionalParamsForVnf")
4825 )
4826 else:
4827 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
4828 if kdu_name and get_configuration(db_vnfd, kdu_name):
4829 kdu_configuration = get_configuration(db_vnfd, kdu_name)
4830 actions = set()
4831 for primitive in kdu_configuration.get("initial-config-primitive", []):
4832 actions.add(primitive["name"])
4833 for primitive in kdu_configuration.get("config-primitive", []):
4834 actions.add(primitive["name"])
4835 kdu_action = True if primitive_name in actions else False
4836
4837 # TODO check if ns is in a proper status
4838 if kdu_name and (
4839 primitive_name in ("upgrade", "rollback", "status") or kdu_action
4840 ):
4841 # kdur and desc_params already set from before
4842 if primitive_params:
4843 desc_params.update(primitive_params)
4844 # TODO Check if we will need something at vnf level
4845 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
4846 if (
4847 kdu_name == kdu["kdu-name"]
4848 and kdu["member-vnf-index"] == vnf_index
4849 ):
4850 break
4851 else:
4852 raise LcmException(
4853 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
4854 )
4855
4856 if kdu.get("k8scluster-type") not in self.k8scluster_map:
4857 msg = "unknown k8scluster-type '{}'".format(
4858 kdu.get("k8scluster-type")
4859 )
4860 raise LcmException(msg)
4861
4862 db_dict = {
4863 "collection": "nsrs",
4864 "filter": {"_id": nsr_id},
4865 "path": "_admin.deployed.K8s.{}".format(index),
4866 }
4867 self.logger.debug(
4868 logging_text
4869 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
4870 )
4871 step = "Executing kdu {}".format(primitive_name)
4872 if primitive_name == "upgrade":
4873 if desc_params.get("kdu_model"):
4874 kdu_model = desc_params.get("kdu_model")
4875 del desc_params["kdu_model"]
4876 else:
4877 kdu_model = kdu.get("kdu-model")
4878 parts = kdu_model.split(sep=":")
4879 if len(parts) == 2:
4880 kdu_model = parts[0]
4881
4882 detailed_status = await asyncio.wait_for(
4883 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
4884 cluster_uuid=kdu.get("k8scluster-uuid"),
4885 kdu_instance=kdu.get("kdu-instance"),
4886 atomic=True,
4887 kdu_model=kdu_model,
4888 params=desc_params,
4889 db_dict=db_dict,
4890 timeout=timeout_ns_action,
4891 ),
4892 timeout=timeout_ns_action + 10,
4893 )
4894 self.logger.debug(
4895 logging_text + " Upgrade of kdu {} done".format(detailed_status)
4896 )
4897 elif primitive_name == "rollback":
4898 detailed_status = await asyncio.wait_for(
4899 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
4900 cluster_uuid=kdu.get("k8scluster-uuid"),
4901 kdu_instance=kdu.get("kdu-instance"),
4902 db_dict=db_dict,
4903 ),
4904 timeout=timeout_ns_action,
4905 )
4906 elif primitive_name == "status":
4907 detailed_status = await asyncio.wait_for(
4908 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
4909 cluster_uuid=kdu.get("k8scluster-uuid"),
4910 kdu_instance=kdu.get("kdu-instance"),
4911 vca_id=vca_id,
4912 ),
4913 timeout=timeout_ns_action,
4914 )
4915 else:
4916 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
4917 kdu["kdu-name"], nsr_id
4918 )
4919 params = self._map_primitive_params(
4920 config_primitive_desc, primitive_params, desc_params
4921 )
4922
4923 detailed_status = await asyncio.wait_for(
4924 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
4925 cluster_uuid=kdu.get("k8scluster-uuid"),
4926 kdu_instance=kdu_instance,
4927 primitive_name=primitive_name,
4928 params=params,
4929 db_dict=db_dict,
4930 timeout=timeout_ns_action,
4931 vca_id=vca_id,
4932 ),
4933 timeout=timeout_ns_action,
4934 )
4935
4936 if detailed_status:
4937 nslcmop_operation_state = "COMPLETED"
4938 else:
4939 detailed_status = ""
4940 nslcmop_operation_state = "FAILED"
4941 else:
4942 ee_id, vca_type = self._look_for_deployed_vca(
4943 nsr_deployed["VCA"],
4944 member_vnf_index=vnf_index,
4945 vdu_id=vdu_id,
4946 vdu_count_index=vdu_count_index,
4947 ee_descriptor_id=ee_descriptor_id,
4948 )
4949 for vca_index, vca_deployed in enumerate(
4950 db_nsr["_admin"]["deployed"]["VCA"]
4951 ):
4952 if vca_deployed.get("member-vnf-index") == vnf_index:
4953 db_dict = {
4954 "collection": "nsrs",
4955 "filter": {"_id": nsr_id},
4956 "path": "_admin.deployed.VCA.{}.".format(vca_index),
4957 }
4958 break
4959 (
4960 nslcmop_operation_state,
4961 detailed_status,
4962 ) = await self._ns_execute_primitive(
4963 ee_id,
4964 primitive=primitive_name,
4965 primitive_params=self._map_primitive_params(
4966 config_primitive_desc, primitive_params, desc_params
4967 ),
4968 timeout=timeout_ns_action,
4969 vca_type=vca_type,
4970 db_dict=db_dict,
4971 vca_id=vca_id,
4972 )
4973
4974 db_nslcmop_update["detailed-status"] = detailed_status
4975 error_description_nslcmop = (
4976 detailed_status if nslcmop_operation_state == "FAILED" else ""
4977 )
4978 self.logger.debug(
4979 logging_text
4980 + " task Done with result {} {}".format(
4981 nslcmop_operation_state, detailed_status
4982 )
4983 )
4984 return # database update is called inside finally
4985
4986 except (DbException, LcmException, N2VCException, K8sException) as e:
4987 self.logger.error(logging_text + "Exit Exception {}".format(e))
4988 exc = e
4989 except asyncio.CancelledError:
4990 self.logger.error(
4991 logging_text + "Cancelled Exception while '{}'".format(step)
4992 )
4993 exc = "Operation was cancelled"
4994 except asyncio.TimeoutError:
4995 self.logger.error(logging_text + "Timeout while '{}'".format(step))
4996 exc = "Timeout"
4997 except Exception as e:
4998 exc = traceback.format_exc()
4999 self.logger.critical(
5000 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5001 exc_info=True,
5002 )
5003 finally:
5004 if exc:
5005 db_nslcmop_update[
5006 "detailed-status"
5007 ] = (
5008 detailed_status
5009 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5010 nslcmop_operation_state = "FAILED"
5011 if db_nsr:
5012 self._write_ns_status(
5013 nsr_id=nsr_id,
5014 ns_state=db_nsr[
5015 "nsState"
5016 ], # TODO check if degraded. For the moment use previous status
5017 current_operation="IDLE",
5018 current_operation_id=None,
5019 # error_description=error_description_nsr,
5020 # error_detail=error_detail,
5021 other_update=db_nsr_update,
5022 )
5023
5024 self._write_op_status(
5025 op_id=nslcmop_id,
5026 stage="",
5027 error_message=error_description_nslcmop,
5028 operation_state=nslcmop_operation_state,
5029 other_update=db_nslcmop_update,
5030 )
5031
5032 if nslcmop_operation_state:
5033 try:
5034 await self.msg.aiowrite(
5035 "ns",
5036 "actioned",
5037 {
5038 "nsr_id": nsr_id,
5039 "nslcmop_id": nslcmop_id,
5040 "operationState": nslcmop_operation_state,
5041 },
5042 loop=self.loop,
5043 )
5044 except Exception as e:
5045 self.logger.error(
5046 logging_text + "kafka_write notification Exception {}".format(e)
5047 )
5048 self.logger.debug(logging_text + "Exit")
5049 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5050 return nslcmop_operation_state, detailed_status
5051
5052 async def scale(self, nsr_id, nslcmop_id):
5053 # Try to lock HA task here
5054 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5055 if not task_is_locked_by_me:
5056 return
5057
5058 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5059 stage = ["", "", ""]
5060 tasks_dict_info = {}
5061 # ^ stage, step, VIM progress
5062 self.logger.debug(logging_text + "Enter")
5063 # get all needed from database
5064 db_nsr = None
5065 db_nslcmop_update = {}
5066 db_nsr_update = {}
5067 exc = None
5068 # in case of error, indicates what part of scale was failed to put nsr at error status
5069 scale_process = None
5070 old_operational_status = ""
5071 old_config_status = ""
5072 nsi_id = None
5073 try:
5074 # wait for any previous tasks in process
5075 step = "Waiting for previous operations to terminate"
5076 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5077 self._write_ns_status(
5078 nsr_id=nsr_id,
5079 ns_state=None,
5080 current_operation="SCALING",
5081 current_operation_id=nslcmop_id,
5082 )
5083
5084 step = "Getting nslcmop from database"
5085 self.logger.debug(
5086 step + " after having waited for previous tasks to be completed"
5087 )
5088 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5089
5090 step = "Getting nsr from database"
5091 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5092 old_operational_status = db_nsr["operational-status"]
5093 old_config_status = db_nsr["config-status"]
5094
5095 step = "Parsing scaling parameters"
5096 db_nsr_update["operational-status"] = "scaling"
5097 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5098 nsr_deployed = db_nsr["_admin"].get("deployed")
5099
5100 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5101 "scaleByStepData"
5102 ]["member-vnf-index"]
5103 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5104 "scaleByStepData"
5105 ]["scaling-group-descriptor"]
5106 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5107 # for backward compatibility
5108 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5109 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5110 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5111 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5112
5113 step = "Getting vnfr from database"
5114 db_vnfr = self.db.get_one(
5115 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5116 )
5117
5118 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5119
5120 step = "Getting vnfd from database"
5121 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5122
5123 base_folder = db_vnfd["_admin"]["storage"]
5124
5125 step = "Getting scaling-group-descriptor"
5126 scaling_descriptor = find_in_list(
5127 get_scaling_aspect(db_vnfd),
5128 lambda scale_desc: scale_desc["name"] == scaling_group,
5129 )
5130 if not scaling_descriptor:
5131 raise LcmException(
5132 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5133 "at vnfd:scaling-group-descriptor".format(scaling_group)
5134 )
5135
5136 step = "Sending scale order to VIM"
5137 # TODO check if ns is in a proper status
5138 nb_scale_op = 0
5139 if not db_nsr["_admin"].get("scaling-group"):
5140 self.update_db_2(
5141 "nsrs",
5142 nsr_id,
5143 {
5144 "_admin.scaling-group": [
5145 {"name": scaling_group, "nb-scale-op": 0}
5146 ]
5147 },
5148 )
5149 admin_scale_index = 0
5150 else:
5151 for admin_scale_index, admin_scale_info in enumerate(
5152 db_nsr["_admin"]["scaling-group"]
5153 ):
5154 if admin_scale_info["name"] == scaling_group:
5155 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5156 break
5157 else: # not found, set index one plus last element and add new entry with the name
5158 admin_scale_index += 1
5159 db_nsr_update[
5160 "_admin.scaling-group.{}.name".format(admin_scale_index)
5161 ] = scaling_group
5162
5163 vca_scaling_info = []
5164 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5165 if scaling_type == "SCALE_OUT":
5166 if "aspect-delta-details" not in scaling_descriptor:
5167 raise LcmException(
5168 "Aspect delta details not fount in scaling descriptor {}".format(
5169 scaling_descriptor["name"]
5170 )
5171 )
5172 # count if max-instance-count is reached
5173 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5174
5175 scaling_info["scaling_direction"] = "OUT"
5176 scaling_info["vdu-create"] = {}
5177 scaling_info["kdu-create"] = {}
5178 for delta in deltas:
5179 for vdu_delta in delta.get("vdu-delta", {}):
5180 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5181 # vdu_index also provides the number of instance of the targeted vdu
5182 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5183 cloud_init_text = self._get_vdu_cloud_init_content(
5184 vdud, db_vnfd
5185 )
5186 if cloud_init_text:
5187 additional_params = (
5188 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5189 or {}
5190 )
5191 cloud_init_list = []
5192
5193 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5194 max_instance_count = 10
5195 if vdu_profile and "max-number-of-instances" in vdu_profile:
5196 max_instance_count = vdu_profile.get(
5197 "max-number-of-instances", 10
5198 )
5199
5200 default_instance_num = get_number_of_instances(
5201 db_vnfd, vdud["id"]
5202 )
5203 instances_number = vdu_delta.get("number-of-instances", 1)
5204 nb_scale_op += instances_number
5205
5206 new_instance_count = nb_scale_op + default_instance_num
5207 # Control if new count is over max and vdu count is less than max.
5208 # Then assign new instance count
5209 if new_instance_count > max_instance_count > vdu_count:
5210 instances_number = new_instance_count - max_instance_count
5211 else:
5212 instances_number = instances_number
5213
5214 if new_instance_count > max_instance_count:
5215 raise LcmException(
5216 "reached the limit of {} (max-instance-count) "
5217 "scaling-out operations for the "
5218 "scaling-group-descriptor '{}'".format(
5219 nb_scale_op, scaling_group
5220 )
5221 )
5222 for x in range(vdu_delta.get("number-of-instances", 1)):
5223 if cloud_init_text:
5224 # TODO Information of its own ip is not available because db_vnfr is not updated.
5225 additional_params["OSM"] = get_osm_params(
5226 db_vnfr, vdu_delta["id"], vdu_index + x
5227 )
5228 cloud_init_list.append(
5229 self._parse_cloud_init(
5230 cloud_init_text,
5231 additional_params,
5232 db_vnfd["id"],
5233 vdud["id"],
5234 )
5235 )
5236 vca_scaling_info.append(
5237 {
5238 "osm_vdu_id": vdu_delta["id"],
5239 "member-vnf-index": vnf_index,
5240 "type": "create",
5241 "vdu_index": vdu_index + x,
5242 }
5243 )
5244 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5245 for kdu_delta in delta.get("kdu-resource-delta", {}):
5246 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5247 kdu_name = kdu_profile["kdu-name"]
5248 resource_name = kdu_profile["resource-name"]
5249
5250 # Might have different kdus in the same delta
5251 # Should have list for each kdu
5252 if not scaling_info["kdu-create"].get(kdu_name, None):
5253 scaling_info["kdu-create"][kdu_name] = []
5254
5255 kdur = get_kdur(db_vnfr, kdu_name)
5256 if kdur.get("helm-chart"):
5257 k8s_cluster_type = "helm-chart-v3"
5258 self.logger.debug("kdur: {}".format(kdur))
5259 if (
5260 kdur.get("helm-version")
5261 and kdur.get("helm-version") == "v2"
5262 ):
5263 k8s_cluster_type = "helm-chart"
5264 raise NotImplementedError
5265 elif kdur.get("juju-bundle"):
5266 k8s_cluster_type = "juju-bundle"
5267 else:
5268 raise LcmException(
5269 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5270 "juju-bundle. Maybe an old NBI version is running".format(
5271 db_vnfr["member-vnf-index-ref"], kdu_name
5272 )
5273 )
5274
5275 max_instance_count = 10
5276 if kdu_profile and "max-number-of-instances" in kdu_profile:
5277 max_instance_count = kdu_profile.get(
5278 "max-number-of-instances", 10
5279 )
5280
5281 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5282 deployed_kdu, _ = get_deployed_kdu(
5283 nsr_deployed, kdu_name, vnf_index
5284 )
5285 if deployed_kdu is None:
5286 raise LcmException(
5287 "KDU '{}' for vnf '{}' not deployed".format(
5288 kdu_name, vnf_index
5289 )
5290 )
5291 kdu_instance = deployed_kdu.get("kdu-instance")
5292 instance_num = await self.k8scluster_map[
5293 k8s_cluster_type
5294 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5295 kdu_replica_count = instance_num + kdu_delta.get(
5296 "number-of-instances", 1
5297 )
5298
5299 # Control if new count is over max and instance_num is less than max.
5300 # Then assign max instance number to kdu replica count
5301 if kdu_replica_count > max_instance_count > instance_num:
5302 kdu_replica_count = max_instance_count
5303 if kdu_replica_count > max_instance_count:
5304 raise LcmException(
5305 "reached the limit of {} (max-instance-count) "
5306 "scaling-out operations for the "
5307 "scaling-group-descriptor '{}'".format(
5308 instance_num, scaling_group
5309 )
5310 )
5311
5312 for x in range(kdu_delta.get("number-of-instances", 1)):
5313 vca_scaling_info.append(
5314 {
5315 "osm_kdu_id": kdu_name,
5316 "member-vnf-index": vnf_index,
5317 "type": "create",
5318 "kdu_index": instance_num + x - 1,
5319 }
5320 )
5321 scaling_info["kdu-create"][kdu_name].append(
5322 {
5323 "member-vnf-index": vnf_index,
5324 "type": "create",
5325 "k8s-cluster-type": k8s_cluster_type,
5326 "resource-name": resource_name,
5327 "scale": kdu_replica_count,
5328 }
5329 )
5330 elif scaling_type == "SCALE_IN":
5331 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5332
5333 scaling_info["scaling_direction"] = "IN"
5334 scaling_info["vdu-delete"] = {}
5335 scaling_info["kdu-delete"] = {}
5336
5337 for delta in deltas:
5338 for vdu_delta in delta.get("vdu-delta", {}):
5339 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5340 min_instance_count = 0
5341 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5342 if vdu_profile and "min-number-of-instances" in vdu_profile:
5343 min_instance_count = vdu_profile["min-number-of-instances"]
5344
5345 default_instance_num = get_number_of_instances(
5346 db_vnfd, vdu_delta["id"]
5347 )
5348 instance_num = vdu_delta.get("number-of-instances", 1)
5349 nb_scale_op -= instance_num
5350
5351 new_instance_count = nb_scale_op + default_instance_num
5352
5353 if new_instance_count < min_instance_count < vdu_count:
5354 instances_number = min_instance_count - new_instance_count
5355 else:
5356 instances_number = instance_num
5357
5358 if new_instance_count < min_instance_count:
5359 raise LcmException(
5360 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5361 "scaling-group-descriptor '{}'".format(
5362 nb_scale_op, scaling_group
5363 )
5364 )
5365 for x in range(vdu_delta.get("number-of-instances", 1)):
5366 vca_scaling_info.append(
5367 {
5368 "osm_vdu_id": vdu_delta["id"],
5369 "member-vnf-index": vnf_index,
5370 "type": "delete",
5371 "vdu_index": vdu_index - 1 - x,
5372 }
5373 )
5374 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5375 for kdu_delta in delta.get("kdu-resource-delta", {}):
5376 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5377 kdu_name = kdu_profile["kdu-name"]
5378 resource_name = kdu_profile["resource-name"]
5379
5380 if not scaling_info["kdu-delete"].get(kdu_name, None):
5381 scaling_info["kdu-delete"][kdu_name] = []
5382
5383 kdur = get_kdur(db_vnfr, kdu_name)
5384 if kdur.get("helm-chart"):
5385 k8s_cluster_type = "helm-chart-v3"
5386 self.logger.debug("kdur: {}".format(kdur))
5387 if (
5388 kdur.get("helm-version")
5389 and kdur.get("helm-version") == "v2"
5390 ):
5391 k8s_cluster_type = "helm-chart"
5392 raise NotImplementedError
5393 elif kdur.get("juju-bundle"):
5394 k8s_cluster_type = "juju-bundle"
5395 else:
5396 raise LcmException(
5397 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5398 "juju-bundle. Maybe an old NBI version is running".format(
5399 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5400 )
5401 )
5402
5403 min_instance_count = 0
5404 if kdu_profile and "min-number-of-instances" in kdu_profile:
5405 min_instance_count = kdu_profile["min-number-of-instances"]
5406
5407 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5408 deployed_kdu, _ = get_deployed_kdu(
5409 nsr_deployed, kdu_name, vnf_index
5410 )
5411 if deployed_kdu is None:
5412 raise LcmException(
5413 "KDU '{}' for vnf '{}' not deployed".format(
5414 kdu_name, vnf_index
5415 )
5416 )
5417 kdu_instance = deployed_kdu.get("kdu-instance")
5418 instance_num = await self.k8scluster_map[
5419 k8s_cluster_type
5420 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5421 kdu_replica_count = instance_num - kdu_delta.get(
5422 "number-of-instances", 1
5423 )
5424
5425 if kdu_replica_count < min_instance_count < instance_num:
5426 kdu_replica_count = min_instance_count
5427 if kdu_replica_count < min_instance_count:
5428 raise LcmException(
5429 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5430 "scaling-group-descriptor '{}'".format(
5431 instance_num, scaling_group
5432 )
5433 )
5434
5435 for x in range(kdu_delta.get("number-of-instances", 1)):
5436 vca_scaling_info.append(
5437 {
5438 "osm_kdu_id": kdu_name,
5439 "member-vnf-index": vnf_index,
5440 "type": "delete",
5441 "kdu_index": instance_num - x - 1,
5442 }
5443 )
5444 scaling_info["kdu-delete"][kdu_name].append(
5445 {
5446 "member-vnf-index": vnf_index,
5447 "type": "delete",
5448 "k8s-cluster-type": k8s_cluster_type,
5449 "resource-name": resource_name,
5450 "scale": kdu_replica_count,
5451 }
5452 )
5453
5454 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5455 vdu_delete = copy(scaling_info.get("vdu-delete"))
5456 if scaling_info["scaling_direction"] == "IN":
5457 for vdur in reversed(db_vnfr["vdur"]):
5458 if vdu_delete.get(vdur["vdu-id-ref"]):
5459 vdu_delete[vdur["vdu-id-ref"]] -= 1
5460 scaling_info["vdu"].append(
5461 {
5462 "name": vdur.get("name") or vdur.get("vdu-name"),
5463 "vdu_id": vdur["vdu-id-ref"],
5464 "interface": [],
5465 }
5466 )
5467 for interface in vdur["interfaces"]:
5468 scaling_info["vdu"][-1]["interface"].append(
5469 {
5470 "name": interface["name"],
5471 "ip_address": interface["ip-address"],
5472 "mac_address": interface.get("mac-address"),
5473 }
5474 )
5475 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5476
5477 # PRE-SCALE BEGIN
5478 step = "Executing pre-scale vnf-config-primitive"
5479 if scaling_descriptor.get("scaling-config-action"):
5480 for scaling_config_action in scaling_descriptor[
5481 "scaling-config-action"
5482 ]:
5483 if (
5484 scaling_config_action.get("trigger") == "pre-scale-in"
5485 and scaling_type == "SCALE_IN"
5486 ) or (
5487 scaling_config_action.get("trigger") == "pre-scale-out"
5488 and scaling_type == "SCALE_OUT"
5489 ):
5490 vnf_config_primitive = scaling_config_action[
5491 "vnf-config-primitive-name-ref"
5492 ]
5493 step = db_nslcmop_update[
5494 "detailed-status"
5495 ] = "executing pre-scale scaling-config-action '{}'".format(
5496 vnf_config_primitive
5497 )
5498
5499 # look for primitive
5500 for config_primitive in (
5501 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5502 ).get("config-primitive", ()):
5503 if config_primitive["name"] == vnf_config_primitive:
5504 break
5505 else:
5506 raise LcmException(
5507 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5508 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5509 "primitive".format(scaling_group, vnf_config_primitive)
5510 )
5511
5512 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5513 if db_vnfr.get("additionalParamsForVnf"):
5514 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5515
5516 scale_process = "VCA"
5517 db_nsr_update["config-status"] = "configuring pre-scaling"
5518 primitive_params = self._map_primitive_params(
5519 config_primitive, {}, vnfr_params
5520 )
5521
5522 # Pre-scale retry check: Check if this sub-operation has been executed before
5523 op_index = self._check_or_add_scale_suboperation(
5524 db_nslcmop,
5525 vnf_index,
5526 vnf_config_primitive,
5527 primitive_params,
5528 "PRE-SCALE",
5529 )
5530 if op_index == self.SUBOPERATION_STATUS_SKIP:
5531 # Skip sub-operation
5532 result = "COMPLETED"
5533 result_detail = "Done"
5534 self.logger.debug(
5535 logging_text
5536 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5537 vnf_config_primitive, result, result_detail
5538 )
5539 )
5540 else:
5541 if op_index == self.SUBOPERATION_STATUS_NEW:
5542 # New sub-operation: Get index of this sub-operation
5543 op_index = (
5544 len(db_nslcmop.get("_admin", {}).get("operations"))
5545 - 1
5546 )
5547 self.logger.debug(
5548 logging_text
5549 + "vnf_config_primitive={} New sub-operation".format(
5550 vnf_config_primitive
5551 )
5552 )
5553 else:
5554 # retry: Get registered params for this existing sub-operation
5555 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5556 op_index
5557 ]
5558 vnf_index = op.get("member_vnf_index")
5559 vnf_config_primitive = op.get("primitive")
5560 primitive_params = op.get("primitive_params")
5561 self.logger.debug(
5562 logging_text
5563 + "vnf_config_primitive={} Sub-operation retry".format(
5564 vnf_config_primitive
5565 )
5566 )
5567 # Execute the primitive, either with new (first-time) or registered (reintent) args
5568 ee_descriptor_id = config_primitive.get(
5569 "execution-environment-ref"
5570 )
5571 primitive_name = config_primitive.get(
5572 "execution-environment-primitive", vnf_config_primitive
5573 )
5574 ee_id, vca_type = self._look_for_deployed_vca(
5575 nsr_deployed["VCA"],
5576 member_vnf_index=vnf_index,
5577 vdu_id=None,
5578 vdu_count_index=None,
5579 ee_descriptor_id=ee_descriptor_id,
5580 )
5581 result, result_detail = await self._ns_execute_primitive(
5582 ee_id,
5583 primitive_name,
5584 primitive_params,
5585 vca_type=vca_type,
5586 vca_id=vca_id,
5587 )
5588 self.logger.debug(
5589 logging_text
5590 + "vnf_config_primitive={} Done with result {} {}".format(
5591 vnf_config_primitive, result, result_detail
5592 )
5593 )
5594 # Update operationState = COMPLETED | FAILED
5595 self._update_suboperation_status(
5596 db_nslcmop, op_index, result, result_detail
5597 )
5598
5599 if result == "FAILED":
5600 raise LcmException(result_detail)
5601 db_nsr_update["config-status"] = old_config_status
5602 scale_process = None
5603 # PRE-SCALE END
5604
5605 db_nsr_update[
5606 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5607 ] = nb_scale_op
5608 db_nsr_update[
5609 "_admin.scaling-group.{}.time".format(admin_scale_index)
5610 ] = time()
5611
5612 # SCALE-IN VCA - BEGIN
5613 if vca_scaling_info:
5614 step = db_nslcmop_update[
5615 "detailed-status"
5616 ] = "Deleting the execution environments"
5617 scale_process = "VCA"
5618 for vca_info in vca_scaling_info:
5619 if vca_info["type"] == "delete":
5620 member_vnf_index = str(vca_info["member-vnf-index"])
5621 self.logger.debug(
5622 logging_text + "vdu info: {}".format(vca_info)
5623 )
5624 if vca_info.get("osm_vdu_id"):
5625 vdu_id = vca_info["osm_vdu_id"]
5626 vdu_index = int(vca_info["vdu_index"])
5627 stage[
5628 1
5629 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5630 member_vnf_index, vdu_id, vdu_index
5631 )
5632 else:
5633 vdu_index = 0
5634 kdu_id = vca_info["osm_kdu_id"]
5635 stage[
5636 1
5637 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5638 member_vnf_index, kdu_id, vdu_index
5639 )
5640 stage[2] = step = "Scaling in VCA"
5641 self._write_op_status(op_id=nslcmop_id, stage=stage)
5642 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5643 config_update = db_nsr["configurationStatus"]
5644 for vca_index, vca in enumerate(vca_update):
5645 if (
5646 (vca or vca.get("ee_id"))
5647 and vca["member-vnf-index"] == member_vnf_index
5648 and vca["vdu_count_index"] == vdu_index
5649 ):
5650 if vca.get("vdu_id"):
5651 config_descriptor = get_configuration(
5652 db_vnfd, vca.get("vdu_id")
5653 )
5654 elif vca.get("kdu_name"):
5655 config_descriptor = get_configuration(
5656 db_vnfd, vca.get("kdu_name")
5657 )
5658 else:
5659 config_descriptor = get_configuration(
5660 db_vnfd, db_vnfd["id"]
5661 )
5662 operation_params = (
5663 db_nslcmop.get("operationParams") or {}
5664 )
5665 exec_terminate_primitives = not operation_params.get(
5666 "skip_terminate_primitives"
5667 ) and vca.get("needed_terminate")
5668 task = asyncio.ensure_future(
5669 asyncio.wait_for(
5670 self.destroy_N2VC(
5671 logging_text,
5672 db_nslcmop,
5673 vca,
5674 config_descriptor,
5675 vca_index,
5676 destroy_ee=True,
5677 exec_primitives=exec_terminate_primitives,
5678 scaling_in=True,
5679 vca_id=vca_id,
5680 ),
5681 timeout=self.timeout_charm_delete,
5682 )
5683 )
5684 tasks_dict_info[task] = "Terminating VCA {}".format(
5685 vca.get("ee_id")
5686 )
5687 del vca_update[vca_index]
5688 del config_update[vca_index]
5689 # wait for pending tasks of terminate primitives
5690 if tasks_dict_info:
5691 self.logger.debug(
5692 logging_text
5693 + "Waiting for tasks {}".format(
5694 list(tasks_dict_info.keys())
5695 )
5696 )
5697 error_list = await self._wait_for_tasks(
5698 logging_text,
5699 tasks_dict_info,
5700 min(
5701 self.timeout_charm_delete, self.timeout_ns_terminate
5702 ),
5703 stage,
5704 nslcmop_id,
5705 )
5706 tasks_dict_info.clear()
5707 if error_list:
5708 raise LcmException("; ".join(error_list))
5709
5710 db_vca_and_config_update = {
5711 "_admin.deployed.VCA": vca_update,
5712 "configurationStatus": config_update,
5713 }
5714 self.update_db_2(
5715 "nsrs", db_nsr["_id"], db_vca_and_config_update
5716 )
5717 scale_process = None
5718 # SCALE-IN VCA - END
5719
5720 # SCALE RO - BEGIN
5721 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5722 scale_process = "RO"
5723 if self.ro_config.get("ng"):
5724 await self._scale_ng_ro(
5725 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5726 )
5727 scaling_info.pop("vdu-create", None)
5728 scaling_info.pop("vdu-delete", None)
5729
5730 scale_process = None
5731 # SCALE RO - END
5732
5733 # SCALE KDU - BEGIN
5734 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5735 scale_process = "KDU"
5736 await self._scale_kdu(
5737 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5738 )
5739 scaling_info.pop("kdu-create", None)
5740 scaling_info.pop("kdu-delete", None)
5741
5742 scale_process = None
5743 # SCALE KDU - END
5744
5745 if db_nsr_update:
5746 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5747
5748 # SCALE-UP VCA - BEGIN
5749 if vca_scaling_info:
5750 step = db_nslcmop_update[
5751 "detailed-status"
5752 ] = "Creating new execution environments"
5753 scale_process = "VCA"
5754 for vca_info in vca_scaling_info:
5755 if vca_info["type"] == "create":
5756 member_vnf_index = str(vca_info["member-vnf-index"])
5757 self.logger.debug(
5758 logging_text + "vdu info: {}".format(vca_info)
5759 )
5760 vnfd_id = db_vnfr["vnfd-ref"]
5761 if vca_info.get("osm_vdu_id"):
5762 vdu_index = int(vca_info["vdu_index"])
5763 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5764 if db_vnfr.get("additionalParamsForVnf"):
5765 deploy_params.update(
5766 parse_yaml_strings(
5767 db_vnfr["additionalParamsForVnf"].copy()
5768 )
5769 )
5770 descriptor_config = get_configuration(
5771 db_vnfd, db_vnfd["id"]
5772 )
5773 if descriptor_config:
5774 vdu_id = None
5775 vdu_name = None
5776 kdu_name = None
5777 self._deploy_n2vc(
5778 logging_text=logging_text
5779 + "member_vnf_index={} ".format(member_vnf_index),
5780 db_nsr=db_nsr,
5781 db_vnfr=db_vnfr,
5782 nslcmop_id=nslcmop_id,
5783 nsr_id=nsr_id,
5784 nsi_id=nsi_id,
5785 vnfd_id=vnfd_id,
5786 vdu_id=vdu_id,
5787 kdu_name=kdu_name,
5788 member_vnf_index=member_vnf_index,
5789 vdu_index=vdu_index,
5790 vdu_name=vdu_name,
5791 deploy_params=deploy_params,
5792 descriptor_config=descriptor_config,
5793 base_folder=base_folder,
5794 task_instantiation_info=tasks_dict_info,
5795 stage=stage,
5796 )
5797 vdu_id = vca_info["osm_vdu_id"]
5798 vdur = find_in_list(
5799 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5800 )
5801 descriptor_config = get_configuration(db_vnfd, vdu_id)
5802 if vdur.get("additionalParams"):
5803 deploy_params_vdu = parse_yaml_strings(
5804 vdur["additionalParams"]
5805 )
5806 else:
5807 deploy_params_vdu = deploy_params
5808 deploy_params_vdu["OSM"] = get_osm_params(
5809 db_vnfr, vdu_id, vdu_count_index=vdu_index
5810 )
5811 if descriptor_config:
5812 vdu_name = None
5813 kdu_name = None
5814 stage[
5815 1
5816 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5817 member_vnf_index, vdu_id, vdu_index
5818 )
5819 stage[2] = step = "Scaling out VCA"
5820 self._write_op_status(op_id=nslcmop_id, stage=stage)
5821 self._deploy_n2vc(
5822 logging_text=logging_text
5823 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5824 member_vnf_index, vdu_id, vdu_index
5825 ),
5826 db_nsr=db_nsr,
5827 db_vnfr=db_vnfr,
5828 nslcmop_id=nslcmop_id,
5829 nsr_id=nsr_id,
5830 nsi_id=nsi_id,
5831 vnfd_id=vnfd_id,
5832 vdu_id=vdu_id,
5833 kdu_name=kdu_name,
5834 member_vnf_index=member_vnf_index,
5835 vdu_index=vdu_index,
5836 vdu_name=vdu_name,
5837 deploy_params=deploy_params_vdu,
5838 descriptor_config=descriptor_config,
5839 base_folder=base_folder,
5840 task_instantiation_info=tasks_dict_info,
5841 stage=stage,
5842 )
5843 else:
5844 kdu_name = vca_info["osm_kdu_id"]
5845 descriptor_config = get_configuration(db_vnfd, kdu_name)
5846 if descriptor_config:
5847 vdu_id = None
5848 kdu_index = int(vca_info["kdu_index"])
5849 vdu_name = None
5850 kdur = next(
5851 x
5852 for x in db_vnfr["kdur"]
5853 if x["kdu-name"] == kdu_name
5854 )
5855 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
5856 if kdur.get("additionalParams"):
5857 deploy_params_kdu = parse_yaml_strings(
5858 kdur["additionalParams"]
5859 )
5860
5861 self._deploy_n2vc(
5862 logging_text=logging_text,
5863 db_nsr=db_nsr,
5864 db_vnfr=db_vnfr,
5865 nslcmop_id=nslcmop_id,
5866 nsr_id=nsr_id,
5867 nsi_id=nsi_id,
5868 vnfd_id=vnfd_id,
5869 vdu_id=vdu_id,
5870 kdu_name=kdu_name,
5871 member_vnf_index=member_vnf_index,
5872 vdu_index=kdu_index,
5873 vdu_name=vdu_name,
5874 deploy_params=deploy_params_kdu,
5875 descriptor_config=descriptor_config,
5876 base_folder=base_folder,
5877 task_instantiation_info=tasks_dict_info,
5878 stage=stage,
5879 )
5880 # SCALE-UP VCA - END
5881 scale_process = None
5882
5883 # POST-SCALE BEGIN
5884 # execute primitive service POST-SCALING
5885 step = "Executing post-scale vnf-config-primitive"
5886 if scaling_descriptor.get("scaling-config-action"):
5887 for scaling_config_action in scaling_descriptor[
5888 "scaling-config-action"
5889 ]:
5890 if (
5891 scaling_config_action.get("trigger") == "post-scale-in"
5892 and scaling_type == "SCALE_IN"
5893 ) or (
5894 scaling_config_action.get("trigger") == "post-scale-out"
5895 and scaling_type == "SCALE_OUT"
5896 ):
5897 vnf_config_primitive = scaling_config_action[
5898 "vnf-config-primitive-name-ref"
5899 ]
5900 step = db_nslcmop_update[
5901 "detailed-status"
5902 ] = "executing post-scale scaling-config-action '{}'".format(
5903 vnf_config_primitive
5904 )
5905
5906 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5907 if db_vnfr.get("additionalParamsForVnf"):
5908 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5909
5910 # look for primitive
5911 for config_primitive in (
5912 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5913 ).get("config-primitive", ()):
5914 if config_primitive["name"] == vnf_config_primitive:
5915 break
5916 else:
5917 raise LcmException(
5918 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5919 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5920 "config-primitive".format(
5921 scaling_group, vnf_config_primitive
5922 )
5923 )
5924 scale_process = "VCA"
5925 db_nsr_update["config-status"] = "configuring post-scaling"
5926 primitive_params = self._map_primitive_params(
5927 config_primitive, {}, vnfr_params
5928 )
5929
5930 # Post-scale retry check: Check if this sub-operation has been executed before
5931 op_index = self._check_or_add_scale_suboperation(
5932 db_nslcmop,
5933 vnf_index,
5934 vnf_config_primitive,
5935 primitive_params,
5936 "POST-SCALE",
5937 )
5938 if op_index == self.SUBOPERATION_STATUS_SKIP:
5939 # Skip sub-operation
5940 result = "COMPLETED"
5941 result_detail = "Done"
5942 self.logger.debug(
5943 logging_text
5944 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5945 vnf_config_primitive, result, result_detail
5946 )
5947 )
5948 else:
5949 if op_index == self.SUBOPERATION_STATUS_NEW:
5950 # New sub-operation: Get index of this sub-operation
5951 op_index = (
5952 len(db_nslcmop.get("_admin", {}).get("operations"))
5953 - 1
5954 )
5955 self.logger.debug(
5956 logging_text
5957 + "vnf_config_primitive={} New sub-operation".format(
5958 vnf_config_primitive
5959 )
5960 )
5961 else:
5962 # retry: Get registered params for this existing sub-operation
5963 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5964 op_index
5965 ]
5966 vnf_index = op.get("member_vnf_index")
5967 vnf_config_primitive = op.get("primitive")
5968 primitive_params = op.get("primitive_params")
5969 self.logger.debug(
5970 logging_text
5971 + "vnf_config_primitive={} Sub-operation retry".format(
5972 vnf_config_primitive
5973 )
5974 )
5975 # Execute the primitive, either with new (first-time) or registered (reintent) args
5976 ee_descriptor_id = config_primitive.get(
5977 "execution-environment-ref"
5978 )
5979 primitive_name = config_primitive.get(
5980 "execution-environment-primitive", vnf_config_primitive
5981 )
5982 ee_id, vca_type = self._look_for_deployed_vca(
5983 nsr_deployed["VCA"],
5984 member_vnf_index=vnf_index,
5985 vdu_id=None,
5986 vdu_count_index=None,
5987 ee_descriptor_id=ee_descriptor_id,
5988 )
5989 result, result_detail = await self._ns_execute_primitive(
5990 ee_id,
5991 primitive_name,
5992 primitive_params,
5993 vca_type=vca_type,
5994 vca_id=vca_id,
5995 )
5996 self.logger.debug(
5997 logging_text
5998 + "vnf_config_primitive={} Done with result {} {}".format(
5999 vnf_config_primitive, result, result_detail
6000 )
6001 )
6002 # Update operationState = COMPLETED | FAILED
6003 self._update_suboperation_status(
6004 db_nslcmop, op_index, result, result_detail
6005 )
6006
6007 if result == "FAILED":
6008 raise LcmException(result_detail)
6009 db_nsr_update["config-status"] = old_config_status
6010 scale_process = None
6011 # POST-SCALE END
6012
6013 db_nsr_update[
6014 "detailed-status"
6015 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6016 db_nsr_update["operational-status"] = (
6017 "running"
6018 if old_operational_status == "failed"
6019 else old_operational_status
6020 )
6021 db_nsr_update["config-status"] = old_config_status
6022 return
6023 except (
6024 ROclient.ROClientException,
6025 DbException,
6026 LcmException,
6027 NgRoException,
6028 ) as e:
6029 self.logger.error(logging_text + "Exit Exception {}".format(e))
6030 exc = e
6031 except asyncio.CancelledError:
6032 self.logger.error(
6033 logging_text + "Cancelled Exception while '{}'".format(step)
6034 )
6035 exc = "Operation was cancelled"
6036 except Exception as e:
6037 exc = traceback.format_exc()
6038 self.logger.critical(
6039 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6040 exc_info=True,
6041 )
6042 finally:
6043 self._write_ns_status(
6044 nsr_id=nsr_id,
6045 ns_state=None,
6046 current_operation="IDLE",
6047 current_operation_id=None,
6048 )
6049 if tasks_dict_info:
6050 stage[1] = "Waiting for instantiate pending tasks."
6051 self.logger.debug(logging_text + stage[1])
6052 exc = await self._wait_for_tasks(
6053 logging_text,
6054 tasks_dict_info,
6055 self.timeout_ns_deploy,
6056 stage,
6057 nslcmop_id,
6058 nsr_id=nsr_id,
6059 )
6060 if exc:
6061 db_nslcmop_update[
6062 "detailed-status"
6063 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6064 nslcmop_operation_state = "FAILED"
6065 if db_nsr:
6066 db_nsr_update["operational-status"] = old_operational_status
6067 db_nsr_update["config-status"] = old_config_status
6068 db_nsr_update["detailed-status"] = ""
6069 if scale_process:
6070 if "VCA" in scale_process:
6071 db_nsr_update["config-status"] = "failed"
6072 if "RO" in scale_process:
6073 db_nsr_update["operational-status"] = "failed"
6074 db_nsr_update[
6075 "detailed-status"
6076 ] = "FAILED scaling nslcmop={} {}: {}".format(
6077 nslcmop_id, step, exc
6078 )
6079 else:
6080 error_description_nslcmop = None
6081 nslcmop_operation_state = "COMPLETED"
6082 db_nslcmop_update["detailed-status"] = "Done"
6083
6084 self._write_op_status(
6085 op_id=nslcmop_id,
6086 stage="",
6087 error_message=error_description_nslcmop,
6088 operation_state=nslcmop_operation_state,
6089 other_update=db_nslcmop_update,
6090 )
6091 if db_nsr:
6092 self._write_ns_status(
6093 nsr_id=nsr_id,
6094 ns_state=None,
6095 current_operation="IDLE",
6096 current_operation_id=None,
6097 other_update=db_nsr_update,
6098 )
6099
6100 if nslcmop_operation_state:
6101 try:
6102 msg = {
6103 "nsr_id": nsr_id,
6104 "nslcmop_id": nslcmop_id,
6105 "operationState": nslcmop_operation_state,
6106 }
6107 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6108 except Exception as e:
6109 self.logger.error(
6110 logging_text + "kafka_write notification Exception {}".format(e)
6111 )
6112 self.logger.debug(logging_text + "Exit")
6113 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6114
6115 async def _scale_kdu(
6116 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6117 ):
6118 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
6119 for kdu_name in _scaling_info:
6120 for kdu_scaling_info in _scaling_info[kdu_name]:
6121 deployed_kdu, index = get_deployed_kdu(
6122 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6123 )
6124 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6125 kdu_instance = deployed_kdu["kdu-instance"]
6126 scale = int(kdu_scaling_info["scale"])
6127 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6128
6129 db_dict = {
6130 "collection": "nsrs",
6131 "filter": {"_id": nsr_id},
6132 "path": "_admin.deployed.K8s.{}".format(index),
6133 }
6134
6135 step = "scaling application {}".format(
6136 kdu_scaling_info["resource-name"]
6137 )
6138 self.logger.debug(logging_text + step)
6139
6140 if kdu_scaling_info["type"] == "delete":
6141 kdu_config = get_configuration(db_vnfd, kdu_name)
6142 if (
6143 kdu_config
6144 and kdu_config.get("terminate-config-primitive")
6145 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6146 ):
6147 terminate_config_primitive_list = kdu_config.get(
6148 "terminate-config-primitive"
6149 )
6150 terminate_config_primitive_list.sort(
6151 key=lambda val: int(val["seq"])
6152 )
6153
6154 for (
6155 terminate_config_primitive
6156 ) in terminate_config_primitive_list:
6157 primitive_params_ = self._map_primitive_params(
6158 terminate_config_primitive, {}, {}
6159 )
6160 step = "execute terminate config primitive"
6161 self.logger.debug(logging_text + step)
6162 await asyncio.wait_for(
6163 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6164 cluster_uuid=cluster_uuid,
6165 kdu_instance=kdu_instance,
6166 primitive_name=terminate_config_primitive["name"],
6167 params=primitive_params_,
6168 db_dict=db_dict,
6169 vca_id=vca_id,
6170 ),
6171 timeout=600,
6172 )
6173
6174 await asyncio.wait_for(
6175 self.k8scluster_map[k8s_cluster_type].scale(
6176 kdu_instance,
6177 scale,
6178 kdu_scaling_info["resource-name"],
6179 vca_id=vca_id,
6180 ),
6181 timeout=self.timeout_vca_on_error,
6182 )
6183
6184 if kdu_scaling_info["type"] == "create":
6185 kdu_config = get_configuration(db_vnfd, kdu_name)
6186 if (
6187 kdu_config
6188 and kdu_config.get("initial-config-primitive")
6189 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6190 ):
6191 initial_config_primitive_list = kdu_config.get(
6192 "initial-config-primitive"
6193 )
6194 initial_config_primitive_list.sort(
6195 key=lambda val: int(val["seq"])
6196 )
6197
6198 for initial_config_primitive in initial_config_primitive_list:
6199 primitive_params_ = self._map_primitive_params(
6200 initial_config_primitive, {}, {}
6201 )
6202 step = "execute initial config primitive"
6203 self.logger.debug(logging_text + step)
6204 await asyncio.wait_for(
6205 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6206 cluster_uuid=cluster_uuid,
6207 kdu_instance=kdu_instance,
6208 primitive_name=initial_config_primitive["name"],
6209 params=primitive_params_,
6210 db_dict=db_dict,
6211 vca_id=vca_id,
6212 ),
6213 timeout=600,
6214 )
6215
6216 async def _scale_ng_ro(
6217 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6218 ):
6219 nsr_id = db_nslcmop["nsInstanceId"]
6220 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6221 db_vnfrs = {}
6222
6223 # read from db: vnfd's for every vnf
6224 db_vnfds = []
6225
6226 # for each vnf in ns, read vnfd
6227 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6228 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6229 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6230 # if we haven't this vnfd, read it from db
6231 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6232 # read from db
6233 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6234 db_vnfds.append(vnfd)
6235 n2vc_key = self.n2vc.get_public_key()
6236 n2vc_key_list = [n2vc_key]
6237 self.scale_vnfr(
6238 db_vnfr,
6239 vdu_scaling_info.get("vdu-create"),
6240 vdu_scaling_info.get("vdu-delete"),
6241 mark_delete=True,
6242 )
6243 # db_vnfr has been updated, update db_vnfrs to use it
6244 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6245 await self._instantiate_ng_ro(
6246 logging_text,
6247 nsr_id,
6248 db_nsd,
6249 db_nsr,
6250 db_nslcmop,
6251 db_vnfrs,
6252 db_vnfds,
6253 n2vc_key_list,
6254 stage=stage,
6255 start_deploy=time(),
6256 timeout_ns_deploy=self.timeout_ns_deploy,
6257 )
6258 if vdu_scaling_info.get("vdu-delete"):
6259 self.scale_vnfr(
6260 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6261 )
6262
6263 async def add_prometheus_metrics(
6264 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6265 ):
6266 if not self.prometheus:
6267 return
6268 # look if exist a file called 'prometheus*.j2' and
6269 artifact_content = self.fs.dir_ls(artifact_path)
6270 job_file = next(
6271 (
6272 f
6273 for f in artifact_content
6274 if f.startswith("prometheus") and f.endswith(".j2")
6275 ),
6276 None,
6277 )
6278 if not job_file:
6279 return
6280 with self.fs.file_open((artifact_path, job_file), "r") as f:
6281 job_data = f.read()
6282
6283 # TODO get_service
6284 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6285 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6286 host_port = "80"
6287 vnfr_id = vnfr_id.replace("-", "")
6288 variables = {
6289 "JOB_NAME": vnfr_id,
6290 "TARGET_IP": target_ip,
6291 "EXPORTER_POD_IP": host_name,
6292 "EXPORTER_POD_PORT": host_port,
6293 }
6294 job_list = self.prometheus.parse_job(job_data, variables)
6295 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6296 for job in job_list:
6297 if (
6298 not isinstance(job.get("job_name"), str)
6299 or vnfr_id not in job["job_name"]
6300 ):
6301 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6302 job["nsr_id"] = nsr_id
6303 job_dict = {jl["job_name"]: jl for jl in job_list}
6304 if await self.prometheus.update(job_dict):
6305 return list(job_dict.keys())
6306
6307 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6308 """
6309 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6310
6311 :param: vim_account_id: VIM Account ID
6312
6313 :return: (cloud_name, cloud_credential)
6314 """
6315 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6316 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6317
6318 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6319 """
6320 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6321
6322 :param: vim_account_id: VIM Account ID
6323
6324 :return: (cloud_name, cloud_credential)
6325 """
6326 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6327 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")