6df1c0af241a45d488faae88fc724b798703a7c1
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import (
26 Environment,
27 TemplateError,
28 TemplateNotFound,
29 StrictUndefined,
30 UndefinedError,
31 )
32
33 from osm_lcm import ROclient
34 from osm_lcm.data_utils.nsr import get_deployed_kdu
35 from osm_lcm.ng_ro import NgRoClient, NgRoException
36 from osm_lcm.lcm_utils import (
37 LcmException,
38 LcmExceptionNoMgmtIP,
39 LcmBase,
40 deep_get,
41 get_iterable,
42 populate_dict,
43 )
44 from osm_lcm.data_utils.nsd import get_vnf_profiles
45 from osm_lcm.data_utils.vnfd import (
46 get_vdu_list,
47 get_vdu_profile,
48 get_ee_sorted_initial_config_primitive_list,
49 get_ee_sorted_terminate_config_primitive_list,
50 get_kdu_list,
51 get_virtual_link_profiles,
52 get_vdu,
53 get_configuration,
54 get_vdu_index,
55 get_scaling_aspect,
56 get_number_of_instances,
57 get_juju_ee_ref,
58 get_kdu_profile,
59 )
60 from osm_lcm.data_utils.list_utils import find_in_list
61 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
62 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
63 from osm_lcm.data_utils.database.vim_account import VimAccountDB
64 from n2vc.k8s_helm_conn import K8sHelmConnector
65 from n2vc.k8s_helm3_conn import K8sHelm3Connector
66 from n2vc.k8s_juju_conn import K8sJujuConnector
67
68 from osm_common.dbbase import DbException
69 from osm_common.fsbase import FsException
70
71 from osm_lcm.data_utils.database.database import Database
72 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
73 from osm_lcm.data_utils.wim import (
74 get_sdn_ports,
75 get_target_wim_attrs,
76 select_feasible_wim_account,
77 )
78
79 from n2vc.n2vc_juju_conn import N2VCJujuConnector
80 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
81
82 from osm_lcm.lcm_helm_conn import LCMHelmConn
83
84 from copy import copy, deepcopy
85 from time import time
86 from uuid import uuid4
87
88 from random import randint
89
90 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
91
92
class NsLcm(LcmBase):
    # Grace period (seconds) from the first time a charm is seen in
    # blocked/error status until the operation is marked as failed
    timeout_vca_on_error = (
        5 * 60
    )  # Time for charm from first time at blocked,error status to mark as failed
    timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
    timeout_ns_terminate = 1800  # default global timeout for un deployment a ns
    timeout_charm_delete = 10 * 60  # seconds to wait for charm removal
    timeout_primitive = 30 * 60  # timeout for primitive execution
    timeout_progress_primitive = (
        10 * 60
    )  # timeout for some progress in a primitive execution

    # Sentinel status codes for sub-operation lookups (negative so they can
    # never collide with a valid sub-operation list index)
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    # Human-readable task label used when registering the VCA deploy task
    task_name_deploy_vca = "Deploying VCA"
109
    def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: message bus handler, forwarded to LcmBase
        :param lcm_tasks: registry of running LCM tasks
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :param loop: asyncio event loop shared by all connectors
        :param prometheus: optional prometheus client for metrics
        :return: None
        """
        super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))

        # shared singletons for persistence and package storage
        self.db = Database().instance.db
        self.fs = Filesystem().instance.fs
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.timeout = config["timeout"]
        self.ro_config = config["ro_config"]
        self.ng_ro = config["ro_config"].get("ng")  # flag: next-generation RO API
        self.vca_config = config["VCA"].copy()

        # create N2VC connector
        self.n2vc = N2VCJujuConnector(
            log=self.logger,
            loop=self.loop,
            on_update_db=self._on_update_n2vc_db,
            fs=self.fs,
            db=self.db,
        )

        # helm-based execution environments use their own connector
        self.conn_helm_ee = LCMHelmConn(
            log=self.logger,
            loop=self.loop,
            vca_config=self.vca_config,
            on_update_db=self._on_update_n2vc_db,
        )

        # K8s connectors: helm v2, helm v3 and juju bundles
        self.k8sclusterhelm2 = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            log=self.logger,
            on_update_db=None,
            fs=self.fs,
            db=self.db,
        )

        self.k8sclusterhelm3 = K8sHelm3Connector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helm3path"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

        self.k8sclusterjuju = K8sJujuConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            juju_command=self.vca_config.get("jujupath"),
            log=self.logger,
            loop=self.loop,
            on_update_db=self._on_update_k8s_db,
            fs=self.fs,
            db=self.db,
        )

        # dispatch table: descriptor deployment-type string -> K8s connector
        # NOTE(review): "chart" maps to the helm v3 connector while
        # "helm-chart" maps to v2 — confirm this asymmetry is intentional
        self.k8scluster_map = {
            "helm-chart": self.k8sclusterhelm2,
            "helm-chart-v3": self.k8sclusterhelm3,
            "chart": self.k8sclusterhelm3,
            "juju-bundle": self.k8sclusterjuju,
            "juju": self.k8sclusterjuju,
        }

        # dispatch table: VCA type -> execution-environment connector
        self.vca_map = {
            "lxc_proxy_charm": self.n2vc,
            "native_charm": self.n2vc,
            "k8s_proxy_charm": self.n2vc,
            "helm": self.conn_helm_ee,
            "helm-v3": self.conn_helm_ee,
        }

        self.prometheus = prometheus

        # create RO client
        self.RO = NgRoClient(self.loop, **self.ro_config)
191
192 @staticmethod
193 def increment_ip_mac(ip_mac, vm_index=1):
194 if not isinstance(ip_mac, str):
195 return ip_mac
196 try:
197 # try with ipv4 look for last dot
198 i = ip_mac.rfind(".")
199 if i > 0:
200 i += 1
201 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
202 # try with ipv6 or mac look for last colon. Operate in hex
203 i = ip_mac.rfind(":")
204 if i > 0:
205 i += 1
206 # format in hex, len can be 2 for mac or 4 for ipv6
207 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
208 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
209 )
210 except Exception:
211 pass
212 return None
213
214 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
215
216 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
217
218 try:
219 # TODO filter RO descriptor fields...
220
221 # write to database
222 db_dict = dict()
223 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
224 db_dict["deploymentStatus"] = ro_descriptor
225 self.update_db_2("nsrs", nsrs_id, db_dict)
226
227 except Exception as e:
228 self.logger.warn(
229 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
230 )
231
    async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
        """Callback fired by N2VC when juju model data changes.

        Re-reads the nsr, refreshes 'vcaStatus', tries to update the
        configurationStatus of the VCA addressed by *path*, and toggles
        nsState between READY and DEGRADED based on machine/application
        health reported by juju. Errors (other than cancellation/timeout)
        are logged and swallowed.

        :param table: changed db table (the nsr is always re-read from "nsrs")
        :param filter: db filter; must contain the nsr "_id"
        :param path: dotted path of the changed element; its last component
            is interpreted as the VCA index inside _admin.deployed.VCA
        :param updated_data: changed data (unused; full status is re-queried)
        :param vca_id: optional VCA id to query status against
        """

        # remove last dot from path (if exists)
        if path.endswith("."):
            path = path[:-1]

        try:
            nsr_id = filter.get("_id")

            # read ns record from database
            nsr = self.db.get_one(table="nsrs", q_filter=filter)
            current_ns_status = nsr.get("nsState")

            # get vca status for NS
            status_dict = await self.n2vc.get_status(
                namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
            )

            # vcaStatus
            db_dict = dict()
            db_dict["vcaStatus"] = status_dict
            await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)

            # update configurationStatus for this VCA
            try:
                # VCA index is the last dotted component of *path*
                vca_index = int(path[path.rfind(".") + 1 :])

                vca_list = deep_get(
                    target_dict=nsr, key_list=("_admin", "deployed", "VCA")
                )
                vca_status = vca_list[vca_index].get("status")

                configuration_status_list = nsr.get("configurationStatus")
                config_status = configuration_status_list[vca_index].get("status")

                # NOTE(review): db_dict has no "configurationStatus" key at
                # this point, so both assignments below raise KeyError and are
                # swallowed by the except clause — confirm the intended write.
                if config_status == "BROKEN" and vca_status != "failed":
                    db_dict["configurationStatus"][vca_index] = "READY"
                elif config_status != "BROKEN" and vca_status == "failed":
                    db_dict["configurationStatus"][vca_index] = "BROKEN"
            except Exception as e:
                # not update configurationStatus
                self.logger.debug("Error updating vca_index (ignore): {}".format(e))

            # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
            # if nsState = 'DEGRADED' check if all is OK
            is_degraded = False
            if current_ns_status in ("READY", "DEGRADED"):
                error_description = ""
                # check machines
                if status_dict.get("machines"):
                    for machine_id in status_dict.get("machines"):
                        machine = status_dict.get("machines").get(machine_id)
                        # check machine agent-status
                        if machine.get("agent-status"):
                            s = machine.get("agent-status").get("status")
                            if s != "started":
                                is_degraded = True
                                error_description += (
                                    "machine {} agent-status={} ; ".format(
                                        machine_id, s
                                    )
                                )
                        # check machine instance status
                        if machine.get("instance-status"):
                            s = machine.get("instance-status").get("status")
                            if s != "running":
                                is_degraded = True
                                error_description += (
                                    "machine {} instance-status={} ; ".format(
                                        machine_id, s
                                    )
                                )
                # check applications
                if status_dict.get("applications"):
                    for app_id in status_dict.get("applications"):
                        app = status_dict.get("applications").get(app_id)
                        # check application status
                        if app.get("status"):
                            s = app.get("status").get("status")
                            if s != "active":
                                is_degraded = True
                                error_description += (
                                    "application {} status={} ; ".format(app_id, s)
                                )

                if error_description:
                    db_dict["errorDescription"] = error_description
                if current_ns_status == "READY" and is_degraded:
                    db_dict["nsState"] = "DEGRADED"
                if current_ns_status == "DEGRADED" and not is_degraded:
                    db_dict["nsState"] = "READY"

            # write to database
            self.update_db_2("nsrs", nsr_id, db_dict)

        except (asyncio.CancelledError, asyncio.TimeoutError):
            raise
        except Exception as e:
            self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
334
335 async def _on_update_k8s_db(
336 self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju"
337 ):
338 """
339 Updating vca status in NSR record
340 :param cluster_uuid: UUID of a k8s cluster
341 :param kdu_instance: The unique name of the KDU instance
342 :param filter: To get nsr_id
343 :cluster_type: The cluster type (juju, k8s)
344 :return: none
345 """
346
347 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
348 # .format(cluster_uuid, kdu_instance, filter))
349
350 nsr_id = filter.get("_id")
351 try:
352 vca_status = await self.k8scluster_map[cluster_type].status_kdu(
353 cluster_uuid=cluster_uuid,
354 kdu_instance=kdu_instance,
355 yaml_format=False,
356 complete_status=True,
357 vca_id=vca_id,
358 )
359
360 # vcaStatus
361 db_dict = dict()
362 db_dict["vcaStatus"] = {nsr_id: vca_status}
363
364 if cluster_type in ("juju-bundle", "juju"):
365 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
366 # status in a similar way between Juju Bundles and Helm Charts on this side
367 await self.k8sclusterjuju.update_vca_status(
368 db_dict["vcaStatus"],
369 kdu_instance,
370 vca_id=vca_id,
371 )
372
373 self.logger.debug(
374 f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
375 )
376
377 # write to database
378 self.update_db_2("nsrs", nsr_id, db_dict)
379 except (asyncio.CancelledError, asyncio.TimeoutError):
380 raise
381 except Exception as e:
382 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
383
384 @staticmethod
385 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
386 try:
387 env = Environment(undefined=StrictUndefined)
388 template = env.from_string(cloud_init_text)
389 return template.render(additional_params or {})
390 except UndefinedError as e:
391 raise LcmException(
392 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
393 "file, must be provided in the instantiation parameters inside the "
394 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
395 )
396 except (TemplateError, TemplateNotFound) as e:
397 raise LcmException(
398 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
399 vnfd_id, vdu_id, e
400 )
401 )
402
403 def _get_vdu_cloud_init_content(self, vdu, vnfd):
404 cloud_init_content = cloud_init_file = None
405 try:
406 if vdu.get("cloud-init-file"):
407 base_folder = vnfd["_admin"]["storage"]
408 cloud_init_file = "{}/{}/cloud_init/{}".format(
409 base_folder["folder"],
410 base_folder["pkg-dir"],
411 vdu["cloud-init-file"],
412 )
413 with self.fs.file_open(cloud_init_file, "r") as ci_file:
414 cloud_init_content = ci_file.read()
415 elif vdu.get("cloud-init"):
416 cloud_init_content = vdu["cloud-init"]
417
418 return cloud_init_content
419 except FsException as e:
420 raise LcmException(
421 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
422 vnfd["id"], vdu["id"], cloud_init_file, e
423 )
424 )
425
426 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
427 vdur = next(
428 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
429 {}
430 )
431 additional_params = vdur.get("additionalParams")
432 return parse_yaml_strings(additional_params)
433
434 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
435 """
436 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
437 :param vnfd: input vnfd
438 :param new_id: overrides vnf id if provided
439 :param additionalParams: Instantiation params for VNFs provided
440 :param nsrId: Id of the NSR
441 :return: copy of vnfd
442 """
443 vnfd_RO = deepcopy(vnfd)
444 # remove unused by RO configuration, monitoring, scaling and internal keys
445 vnfd_RO.pop("_id", None)
446 vnfd_RO.pop("_admin", None)
447 vnfd_RO.pop("monitoring-param", None)
448 vnfd_RO.pop("scaling-group-descriptor", None)
449 vnfd_RO.pop("kdu", None)
450 vnfd_RO.pop("k8s-cluster", None)
451 if new_id:
452 vnfd_RO["id"] = new_id
453
454 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
455 for vdu in get_iterable(vnfd_RO, "vdu"):
456 vdu.pop("cloud-init-file", None)
457 vdu.pop("cloud-init", None)
458 return vnfd_RO
459
460 @staticmethod
461 def ip_profile_2_RO(ip_profile):
462 RO_ip_profile = deepcopy(ip_profile)
463 if "dns-server" in RO_ip_profile:
464 if isinstance(RO_ip_profile["dns-server"], list):
465 RO_ip_profile["dns-address"] = []
466 for ds in RO_ip_profile.pop("dns-server"):
467 RO_ip_profile["dns-address"].append(ds["address"])
468 else:
469 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
470 if RO_ip_profile.get("ip-version") == "ipv4":
471 RO_ip_profile["ip-version"] = "IPv4"
472 if RO_ip_profile.get("ip-version") == "ipv6":
473 RO_ip_profile["ip-version"] = "IPv6"
474 if "dhcp-params" in RO_ip_profile:
475 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
476 return RO_ip_profile
477
478 def _get_ro_vim_id_for_vim_account(self, vim_account):
479 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
480 if db_vim["_admin"]["operationalState"] != "ENABLED":
481 raise LcmException(
482 "VIM={} is not available. operationalState={}".format(
483 vim_account, db_vim["_admin"]["operationalState"]
484 )
485 )
486 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
487 return RO_vim_id
488
489 def get_ro_wim_id_for_wim_account(self, wim_account):
490 if isinstance(wim_account, str):
491 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
492 if db_wim["_admin"]["operationalState"] != "ENABLED":
493 raise LcmException(
494 "WIM={} is not available. operationalState={}".format(
495 wim_account, db_wim["_admin"]["operationalState"]
496 )
497 )
498 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
499 return RO_wim_id
500 else:
501 return wim_account
502
    def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
        """Scale the vdur list of a vnfr in the database (and in *db_vnfr*).

        Scale-out clones the last existing vdur of each vdu (or the saved
        vdur-template when scaling up from 0 instances); scale-in either
        marks vdurs as DELETING or pulls them from the database. After the
        update, *db_vnfr*["vdur"] is refreshed from the database.

        :param db_vnfr: vnfr content read from the db; modified in place
        :param vdu_create: dict vdu-id-ref -> number of instances to add
        :param vdu_delete: dict vdu-id-ref -> number of instances to remove
        :param mark_delete: when True, mark vdurs as DELETING instead of
            removing them from the database
        """

        db_vdu_push_list = []
        template_vdur = []
        db_update = {"_admin.modified": time()}
        if vdu_create:
            for vdu_id, vdu_count in vdu_create.items():
                # use the last existing vdur of this vdu as clone source
                vdur = next(
                    (
                        vdur
                        for vdur in reversed(db_vnfr["vdur"])
                        if vdur["vdu-id-ref"] == vdu_id
                    ),
                    None,
                )
                if not vdur:
                    # Read the template saved in the db:
                    self.logger.debug(f"No vdur in the database. Using the vdur-template to scale")
                    vdur_template = db_vnfr.get("vdur-template")
                    if not vdur_template:
                        raise LcmException(
                            "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
                                vdu_id
                            )
                        )
                    vdur = vdur_template[0]
                    # Delete a template from the database after using it
                    self.db.set_one("vnfrs",
                                    {"_id": db_vnfr["_id"]},
                                    None,
                                    pull={"vdur-template": {"_id": vdur['_id']}}
                                    )
                for count in range(vdu_count):
                    # each clone gets a fresh _id, BUILD status and its own
                    # count-index derived id
                    vdur_copy = deepcopy(vdur)
                    vdur_copy["status"] = "BUILD"
                    vdur_copy["status-detailed"] = None
                    vdur_copy["ip-address"] = None
                    vdur_copy["_id"] = str(uuid4())
                    vdur_copy["count-index"] += count + 1
                    vdur_copy["id"] = "{}-{}".format(
                        vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
                    )
                    vdur_copy.pop("vim_info", None)
                    for iface in vdur_copy["interfaces"]:
                        # fixed addresses are incremented per new instance;
                        # otherwise the VIM assigns them
                        if iface.get("fixed-ip"):
                            iface["ip-address"] = self.increment_ip_mac(
                                iface["ip-address"], count + 1
                            )
                        else:
                            iface.pop("ip-address", None)
                        if iface.get("fixed-mac"):
                            iface["mac-address"] = self.increment_ip_mac(
                                iface["mac-address"], count + 1
                            )
                        else:
                            iface.pop("mac-address", None)
                        if db_vnfr["vdur"]:
                            iface.pop(
                                "mgmt_vnf", None
                            )  # only first vdu can be managment of vnf
                    db_vdu_push_list.append(vdur_copy)
                    # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
        if vdu_delete:
            if len(db_vnfr["vdur"]) == 1:
                # The scale will move to 0 instances
                self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur")
                template_vdur = [db_vnfr["vdur"][0]]
            for vdu_id, vdu_count in vdu_delete.items():
                if mark_delete:
                    # mark the last vdu_count matching vdurs as DELETING
                    indexes_to_delete = [
                        iv[0]
                        for iv in enumerate(db_vnfr["vdur"])
                        if iv[1]["vdu-id-ref"] == vdu_id
                    ]
                    db_update.update(
                        {
                            "vdur.{}.status".format(i): "DELETING"
                            for i in indexes_to_delete[-vdu_count:]
                        }
                    )
                else:
                    # it must be deleted one by one because common.db does not allow otherwise
                    vdus_to_delete = [
                        v
                        for v in reversed(db_vnfr["vdur"])
                        if v["vdu-id-ref"] == vdu_id
                    ]
                    for vdu in vdus_to_delete[:vdu_count]:
                        self.db.set_one(
                            "vnfrs",
                            {"_id": db_vnfr["_id"]},
                            None,
                            pull={"vdur": {"_id": vdu["_id"]}},
                        )
        db_push = {}
        if db_vdu_push_list:
            db_push["vdur"] = db_vdu_push_list
        if template_vdur:
            db_push["vdur-template"] = template_vdur
        if not db_push:
            db_push = None
        db_vnfr["vdur-template"] = template_vdur
        self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
        # modify passed dictionary db_vnfr
        db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
        db_vnfr["vdur"] = db_vnfr_["vdur"]
609
610 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
611 """
612 Updates database nsr with the RO info for the created vld
613 :param ns_update_nsr: dictionary to be filled with the updated info
614 :param db_nsr: content of db_nsr. This is also modified
615 :param nsr_desc_RO: nsr descriptor from RO
616 :return: Nothing, LcmException is raised on errors
617 """
618
619 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
620 for net_RO in get_iterable(nsr_desc_RO, "nets"):
621 if vld["id"] != net_RO.get("ns_net_osm_id"):
622 continue
623 vld["vim-id"] = net_RO.get("vim_net_id")
624 vld["name"] = net_RO.get("vim_name")
625 vld["status"] = net_RO.get("status")
626 vld["status-detailed"] = net_RO.get("error_msg")
627 ns_update_nsr["vld.{}".format(vld_index)] = vld
628 break
629 else:
630 raise LcmException(
631 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
632 )
633
634 def set_vnfr_at_error(self, db_vnfrs, error_text):
635 try:
636 for db_vnfr in db_vnfrs.values():
637 vnfr_update = {"status": "ERROR"}
638 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
639 if "status" not in vdur:
640 vdur["status"] = "ERROR"
641 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
642 if error_text:
643 vdur["status-detailed"] = str(error_text)
644 vnfr_update[
645 "vdur.{}.status-detailed".format(vdu_index)
646 ] = "ERROR"
647 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
648 except DbException as e:
649 self.logger.error("Cannot update vnf. {}".format(e))
650
    def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
        """
        Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
        :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
        :param nsr_desc_RO: nsr descriptor from RO
        :return: Nothing, LcmException is raised on errors
        """
        for vnf_index, db_vnfr in db_vnfrs.items():
            # find the matching RO vnf; for/else raises when none matches
            for vnf_RO in nsr_desc_RO["vnfs"]:
                if vnf_RO["member_vnf_index"] != vnf_index:
                    continue
                vnfr_update = {}
                if vnf_RO.get("ip_address"):
                    # RO may return a ";"-separated list; keep the first address
                    db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
                        "ip_address"
                    ].split(";")[0]
                elif not db_vnfr.get("ip-address"):
                    if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                        raise LcmExceptionNoMgmtIP(
                            "ns member_vnf_index '{}' has no IP address".format(
                                vnf_index
                            )
                        )

                # update each vdur with the matching RO vm info, pairing by
                # vdu osm id and count-index
                for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                    vdur_RO_count_index = 0
                    if vdur.get("pdu-type"):
                        # physical deployment units are not reported by RO
                        continue
                    for vdur_RO in get_iterable(vnf_RO, "vms"):
                        if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                            continue
                        if vdur["count-index"] != vdur_RO_count_index:
                            # same vdu id but different replica; keep counting
                            vdur_RO_count_index += 1
                            continue
                        vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                        if vdur_RO.get("ip_address"):
                            vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                        else:
                            vdur["ip-address"] = None
                        vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                        vdur["name"] = vdur_RO.get("vim_name")
                        vdur["status"] = vdur_RO.get("status")
                        vdur["status-detailed"] = vdur_RO.get("error_msg")
                        # copy per-interface addresses, matched by internal name
                        for ifacer in get_iterable(vdur, "interfaces"):
                            for interface_RO in get_iterable(vdur_RO, "interfaces"):
                                if ifacer["name"] == interface_RO.get("internal_name"):
                                    ifacer["ip-address"] = interface_RO.get(
                                        "ip_address"
                                    )
                                    ifacer["mac-address"] = interface_RO.get(
                                        "mac_address"
                                    )
                                    break
                            else:
                                raise LcmException(
                                    "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                    "from VIM info".format(
                                        vnf_index, vdur["vdu-id-ref"], ifacer["name"]
                                    )
                                )
                        vnfr_update["vdur.{}".format(vdu_index)] = vdur
                        break
                    else:
                        raise LcmException(
                            "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                            "VIM info".format(
                                vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
                            )
                        )

                # update internal vlds with the RO net info
                for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                    for net_RO in get_iterable(nsr_desc_RO, "nets"):
                        if vld["id"] != net_RO.get("vnf_net_osm_id"):
                            continue
                        vld["vim-id"] = net_RO.get("vim_net_id")
                        vld["name"] = net_RO.get("vim_name")
                        vld["status"] = net_RO.get("status")
                        vld["status-detailed"] = net_RO.get("error_msg")
                        vnfr_update["vld.{}".format(vld_index)] = vld
                        break
                    else:
                        raise LcmException(
                            "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                                vnf_index, vld["id"]
                            )
                        )

                self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
                break

            else:
                raise LcmException(
                    "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
                        vnf_index
                    )
                )
747
748 def _get_ns_config_info(self, nsr_id):
749 """
750 Generates a mapping between vnf,vdu elements and the N2VC id
751 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
752 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
753 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
754 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
755 """
756 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
757 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
758 mapping = {}
759 ns_config_info = {"osm-config-mapping": mapping}
760 for vca in vca_deployed_list:
761 if not vca["member-vnf-index"]:
762 continue
763 if not vca["vdu_id"]:
764 mapping[vca["member-vnf-index"]] = vca["application"]
765 else:
766 mapping[
767 "{}.{}.{}".format(
768 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
769 )
770 ] = vca["application"]
771 return ns_config_info
772
773 async def _instantiate_ng_ro(
774 self,
775 logging_text,
776 nsr_id,
777 nsd,
778 db_nsr,
779 db_nslcmop,
780 db_vnfrs,
781 db_vnfds,
782 n2vc_key_list,
783 stage,
784 start_deploy,
785 timeout_ns_deploy,
786 ):
787
788 db_vims = {}
789
790 def get_vim_account(vim_account_id):
791 nonlocal db_vims
792 if vim_account_id in db_vims:
793 return db_vims[vim_account_id]
794 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
795 db_vims[vim_account_id] = db_vim
796 return db_vim
797
798 # modify target_vld info with instantiation parameters
799 def parse_vld_instantiation_params(
800 target_vim, target_vld, vld_params, target_sdn
801 ):
802 if vld_params.get("ip-profile"):
803 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
804 "ip-profile"
805 ]
806 if vld_params.get("provider-network"):
807 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
808 "provider-network"
809 ]
810 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
811 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
812 "provider-network"
813 ]["sdn-ports"]
814
815 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
816 # if wim_account_id is specified in vld_params, validate if it is feasible.
817 wim_account_id, db_wim = select_feasible_wim_account(
818 db_nsr, db_vnfrs, target_vld, vld_params, self.logger
819 )
820
821 if wim_account_id:
822 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
823 self.logger.info("WIM selected: {:s}".format(str(wim_account_id)))
824 # update vld_params with correct WIM account Id
825 vld_params["wimAccountId"] = wim_account_id
826
827 target_wim = "wim:{}".format(wim_account_id)
828 target_wim_attrs = get_target_wim_attrs(nsr_id, target_vld, vld_params)
829 sdn_ports = get_sdn_ports(vld_params, db_wim)
830 if len(sdn_ports) > 0:
831 target_vld["vim_info"][target_wim] = target_wim_attrs
832 target_vld["vim_info"][target_wim]["sdn-ports"] = sdn_ports
833
834 self.logger.debug(
835 "Target VLD with WIM data: {:s}".format(str(target_vld))
836 )
837
838 for param in ("vim-network-name", "vim-network-id"):
839 if vld_params.get(param):
840 if isinstance(vld_params[param], dict):
841 for vim, vim_net in vld_params[param].items():
842 other_target_vim = "vim:" + vim
843 populate_dict(
844 target_vld["vim_info"],
845 (other_target_vim, param.replace("-", "_")),
846 vim_net,
847 )
848 else: # isinstance str
849 target_vld["vim_info"][target_vim][
850 param.replace("-", "_")
851 ] = vld_params[param]
852 if vld_params.get("common_id"):
853 target_vld["common_id"] = vld_params.get("common_id")
854
855 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
856 def update_ns_vld_target(target, ns_params):
857 for vnf_params in ns_params.get("vnf", ()):
858 if vnf_params.get("vimAccountId"):
859 target_vnf = next(
860 (
861 vnfr
862 for vnfr in db_vnfrs.values()
863 if vnf_params["member-vnf-index"]
864 == vnfr["member-vnf-index-ref"]
865 ),
866 None,
867 )
868 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
869 for a_index, a_vld in enumerate(target["ns"]["vld"]):
870 target_vld = find_in_list(
871 get_iterable(vdur, "interfaces"),
872 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
873 )
874 if target_vld:
875 if vnf_params.get("vimAccountId") not in a_vld.get(
876 "vim_info", {}
877 ):
878 target["ns"]["vld"][a_index].get("vim_info").update(
879 {
880 "vim:{}".format(vnf_params["vimAccountId"]): {
881 "vim_network_name": ""
882 }
883 }
884 )
885
886 nslcmop_id = db_nslcmop["_id"]
887 target = {
888 "name": db_nsr["name"],
889 "ns": {"vld": []},
890 "vnf": [],
891 "image": deepcopy(db_nsr["image"]),
892 "flavor": deepcopy(db_nsr["flavor"]),
893 "action_id": nslcmop_id,
894 "cloud_init_content": {},
895 }
896 for image in target["image"]:
897 image["vim_info"] = {}
898 for flavor in target["flavor"]:
899 flavor["vim_info"] = {}
900 if db_nsr.get("affinity-or-anti-affinity-group"):
901 target["affinity-or-anti-affinity-group"] = deepcopy(db_nsr["affinity-or-anti-affinity-group"])
902 for affinity_or_anti_affinity_group in target["affinity-or-anti-affinity-group"]:
903 affinity_or_anti_affinity_group["vim_info"] = {}
904
905 if db_nslcmop.get("lcmOperationType") != "instantiate":
906 # get parameters of instantiation:
907 db_nslcmop_instantiate = self.db.get_list(
908 "nslcmops",
909 {
910 "nsInstanceId": db_nslcmop["nsInstanceId"],
911 "lcmOperationType": "instantiate",
912 },
913 )[-1]
914 ns_params = db_nslcmop_instantiate.get("operationParams")
915 else:
916 ns_params = db_nslcmop.get("operationParams")
917 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
918 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
919
920 cp2target = {}
921 for vld_index, vld in enumerate(db_nsr.get("vld")):
922 target_vim = "vim:{}".format(ns_params["vimAccountId"])
923 target_vld = {
924 "id": vld["id"],
925 "name": vld["name"],
926 "mgmt-network": vld.get("mgmt-network", False),
927 "type": vld.get("type"),
928 "vim_info": {
929 target_vim: {
930 "vim_network_name": vld.get("vim-network-name"),
931 "vim_account_id": ns_params["vimAccountId"],
932 }
933 },
934 }
935 # check if this network needs SDN assist
936 if vld.get("pci-interfaces"):
937 db_vim = get_vim_account(ns_params["vimAccountId"])
938 sdnc_id = db_vim["config"].get("sdn-controller")
939 if sdnc_id:
940 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
941 target_sdn = "sdn:{}".format(sdnc_id)
942 target_vld["vim_info"][target_sdn] = {
943 "sdn": True,
944 "target_vim": target_vim,
945 "vlds": [sdn_vld],
946 "type": vld.get("type"),
947 }
948
949 nsd_vnf_profiles = get_vnf_profiles(nsd)
950 for nsd_vnf_profile in nsd_vnf_profiles:
951 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
952 if cp["virtual-link-profile-id"] == vld["id"]:
953 cp2target[
954 "member_vnf:{}.{}".format(
955 cp["constituent-cpd-id"][0][
956 "constituent-base-element-id"
957 ],
958 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
959 )
960 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
961
962 # check at nsd descriptor, if there is an ip-profile
963 vld_params = {}
964 nsd_vlp = find_in_list(
965 get_virtual_link_profiles(nsd),
966 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
967 == vld["id"],
968 )
969 if (
970 nsd_vlp
971 and nsd_vlp.get("virtual-link-protocol-data")
972 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
973 ):
974 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
975 "l3-protocol-data"
976 ]
977 ip_profile_dest_data = {}
978 if "ip-version" in ip_profile_source_data:
979 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
980 "ip-version"
981 ]
982 if "cidr" in ip_profile_source_data:
983 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
984 "cidr"
985 ]
986 if "gateway-ip" in ip_profile_source_data:
987 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
988 "gateway-ip"
989 ]
990 if "dhcp-enabled" in ip_profile_source_data:
991 ip_profile_dest_data["dhcp-params"] = {
992 "enabled": ip_profile_source_data["dhcp-enabled"]
993 }
994 vld_params["ip-profile"] = ip_profile_dest_data
995
996 # update vld_params with instantiation params
997 vld_instantiation_params = find_in_list(
998 get_iterable(ns_params, "vld"),
999 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
1000 )
1001 if vld_instantiation_params:
1002 vld_params.update(vld_instantiation_params)
1003 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
1004 target["ns"]["vld"].append(target_vld)
1005 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1006 update_ns_vld_target(target, ns_params)
1007
1008 for vnfr in db_vnfrs.values():
1009 vnfd = find_in_list(
1010 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
1011 )
1012 vnf_params = find_in_list(
1013 get_iterable(ns_params, "vnf"),
1014 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
1015 )
1016 target_vnf = deepcopy(vnfr)
1017 target_vim = "vim:{}".format(vnfr["vim-account-id"])
1018 for vld in target_vnf.get("vld", ()):
1019 # check if connected to a ns.vld, to fill target'
1020 vnf_cp = find_in_list(
1021 vnfd.get("int-virtual-link-desc", ()),
1022 lambda cpd: cpd.get("id") == vld["id"],
1023 )
1024 if vnf_cp:
1025 ns_cp = "member_vnf:{}.{}".format(
1026 vnfr["member-vnf-index-ref"], vnf_cp["id"]
1027 )
1028 if cp2target.get(ns_cp):
1029 vld["target"] = cp2target[ns_cp]
1030
1031 vld["vim_info"] = {
1032 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1033 }
1034 # check if this network needs SDN assist
1035 target_sdn = None
1036 if vld.get("pci-interfaces"):
1037 db_vim = get_vim_account(vnfr["vim-account-id"])
1038 sdnc_id = db_vim["config"].get("sdn-controller")
1039 if sdnc_id:
1040 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1041 target_sdn = "sdn:{}".format(sdnc_id)
1042 vld["vim_info"][target_sdn] = {
1043 "sdn": True,
1044 "target_vim": target_vim,
1045 "vlds": [sdn_vld],
1046 "type": vld.get("type"),
1047 }
1048
1049 # check at vnfd descriptor, if there is an ip-profile
1050 vld_params = {}
1051 vnfd_vlp = find_in_list(
1052 get_virtual_link_profiles(vnfd),
1053 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1054 )
1055 if (
1056 vnfd_vlp
1057 and vnfd_vlp.get("virtual-link-protocol-data")
1058 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1059 ):
1060 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1061 "l3-protocol-data"
1062 ]
1063 ip_profile_dest_data = {}
1064 if "ip-version" in ip_profile_source_data:
1065 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1066 "ip-version"
1067 ]
1068 if "cidr" in ip_profile_source_data:
1069 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1070 "cidr"
1071 ]
1072 if "gateway-ip" in ip_profile_source_data:
1073 ip_profile_dest_data[
1074 "gateway-address"
1075 ] = ip_profile_source_data["gateway-ip"]
1076 if "dhcp-enabled" in ip_profile_source_data:
1077 ip_profile_dest_data["dhcp-params"] = {
1078 "enabled": ip_profile_source_data["dhcp-enabled"]
1079 }
1080
1081 vld_params["ip-profile"] = ip_profile_dest_data
1082 # update vld_params with instantiation params
1083 if vnf_params:
1084 vld_instantiation_params = find_in_list(
1085 get_iterable(vnf_params, "internal-vld"),
1086 lambda i_vld: i_vld["name"] == vld["id"],
1087 )
1088 if vld_instantiation_params:
1089 vld_params.update(vld_instantiation_params)
1090 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1091
1092 vdur_list = []
1093 for vdur in target_vnf.get("vdur", ()):
1094 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1095 continue # This vdu must not be created
1096 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1097
1098 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1099
1100 if ssh_keys_all:
1101 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1102 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1103 if (
1104 vdu_configuration
1105 and vdu_configuration.get("config-access")
1106 and vdu_configuration.get("config-access").get("ssh-access")
1107 ):
1108 vdur["ssh-keys"] = ssh_keys_all
1109 vdur["ssh-access-required"] = vdu_configuration[
1110 "config-access"
1111 ]["ssh-access"]["required"]
1112 elif (
1113 vnf_configuration
1114 and vnf_configuration.get("config-access")
1115 and vnf_configuration.get("config-access").get("ssh-access")
1116 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1117 ):
1118 vdur["ssh-keys"] = ssh_keys_all
1119 vdur["ssh-access-required"] = vnf_configuration[
1120 "config-access"
1121 ]["ssh-access"]["required"]
1122 elif ssh_keys_instantiation and find_in_list(
1123 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1124 ):
1125 vdur["ssh-keys"] = ssh_keys_instantiation
1126
1127 self.logger.debug("NS > vdur > {}".format(vdur))
1128
1129 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1130 # cloud-init
1131 if vdud.get("cloud-init-file"):
1132 vdur["cloud-init"] = "{}:file:{}".format(
1133 vnfd["_id"], vdud.get("cloud-init-file")
1134 )
1135 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1136 if vdur["cloud-init"] not in target["cloud_init_content"]:
1137 base_folder = vnfd["_admin"]["storage"]
1138 cloud_init_file = "{}/{}/cloud_init/{}".format(
1139 base_folder["folder"],
1140 base_folder["pkg-dir"],
1141 vdud.get("cloud-init-file"),
1142 )
1143 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1144 target["cloud_init_content"][
1145 vdur["cloud-init"]
1146 ] = ci_file.read()
1147 elif vdud.get("cloud-init"):
1148 vdur["cloud-init"] = "{}:vdu:{}".format(
1149 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1150 )
1151 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1152 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1153 "cloud-init"
1154 ]
1155 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1156 deploy_params_vdu = self._format_additional_params(
1157 vdur.get("additionalParams") or {}
1158 )
1159 deploy_params_vdu["OSM"] = get_osm_params(
1160 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1161 )
1162 vdur["additionalParams"] = deploy_params_vdu
1163
1164 # flavor
1165 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1166 if target_vim not in ns_flavor["vim_info"]:
1167 ns_flavor["vim_info"][target_vim] = {}
1168
1169 # deal with images
1170 # in case alternative images are provided we must check if they should be applied
1171 # for the vim_type, modify the vim_type taking into account
1172 ns_image_id = int(vdur["ns-image-id"])
1173 if vdur.get("alt-image-ids"):
1174 db_vim = get_vim_account(vnfr["vim-account-id"])
1175 vim_type = db_vim["vim_type"]
1176 for alt_image_id in vdur.get("alt-image-ids"):
1177 ns_alt_image = target["image"][int(alt_image_id)]
1178 if vim_type == ns_alt_image.get("vim-type"):
1179 # must use alternative image
1180 self.logger.debug(
1181 "use alternative image id: {}".format(alt_image_id)
1182 )
1183 ns_image_id = alt_image_id
1184 vdur["ns-image-id"] = ns_image_id
1185 break
1186 ns_image = target["image"][int(ns_image_id)]
1187 if target_vim not in ns_image["vim_info"]:
1188 ns_image["vim_info"][target_vim] = {}
1189
1190 # Affinity groups
1191 if vdur.get("affinity-or-anti-affinity-group-id"):
1192 for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
1193 ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
1194 if target_vim not in ns_ags["vim_info"]:
1195 ns_ags["vim_info"][target_vim] = {}
1196
1197 vdur["vim_info"] = {target_vim: {}}
1198 # instantiation parameters
1199 # if vnf_params:
1200 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1201 # vdud["id"]), None)
1202 vdur_list.append(vdur)
1203 target_vnf["vdur"] = vdur_list
1204 target["vnf"].append(target_vnf)
1205
1206 desc = await self.RO.deploy(nsr_id, target)
1207 self.logger.debug("RO return > {}".format(desc))
1208 action_id = desc["action_id"]
1209 await self._wait_ng_ro(
1210 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1211 )
1212
1213 # Updating NSR
1214 db_nsr_update = {
1215 "_admin.deployed.RO.operational-status": "running",
1216 "detailed-status": " ".join(stage),
1217 }
1218 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1219 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1220 self._write_op_status(nslcmop_id, stage)
1221 self.logger.debug(
1222 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1223 )
1224 return
1225
1226 async def _wait_ng_ro(
1227 self,
1228 nsr_id,
1229 action_id,
1230 nslcmop_id=None,
1231 start_time=None,
1232 timeout=600,
1233 stage=None,
1234 ):
1235 detailed_status_old = None
1236 db_nsr_update = {}
1237 start_time = start_time or time()
1238 while time() <= start_time + timeout:
1239 desc_status = await self.RO.status(nsr_id, action_id)
1240 self.logger.debug("Wait NG RO > {}".format(desc_status))
1241 if desc_status["status"] == "FAILED":
1242 raise NgRoException(desc_status["details"])
1243 elif desc_status["status"] == "BUILD":
1244 if stage:
1245 stage[2] = "VIM: ({})".format(desc_status["details"])
1246 elif desc_status["status"] == "DONE":
1247 if stage:
1248 stage[2] = "Deployed at VIM"
1249 break
1250 else:
1251 assert False, "ROclient.check_ns_status returns unknown {}".format(
1252 desc_status["status"]
1253 )
1254 if stage and nslcmop_id and stage[2] != detailed_status_old:
1255 detailed_status_old = stage[2]
1256 db_nsr_update["detailed-status"] = " ".join(stage)
1257 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1258 self._write_op_status(nslcmop_id, stage)
1259 await asyncio.sleep(15, loop=self.loop)
1260 else: # timeout_ns_deploy
1261 raise NgRoException("Timeout waiting ns to deploy")
1262
1263 async def _terminate_ng_ro(
1264 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1265 ):
1266 db_nsr_update = {}
1267 failed_detail = []
1268 action_id = None
1269 start_deploy = time()
1270 try:
1271 target = {
1272 "ns": {"vld": []},
1273 "vnf": [],
1274 "image": [],
1275 "flavor": [],
1276 "action_id": nslcmop_id,
1277 }
1278 desc = await self.RO.deploy(nsr_id, target)
1279 action_id = desc["action_id"]
1280 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1281 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1282 self.logger.debug(
1283 logging_text
1284 + "ns terminate action at RO. action_id={}".format(action_id)
1285 )
1286
1287 # wait until done
1288 delete_timeout = 20 * 60 # 20 minutes
1289 await self._wait_ng_ro(
1290 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1291 )
1292
1293 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1294 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1295 # delete all nsr
1296 await self.RO.delete(nsr_id)
1297 except Exception as e:
1298 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1299 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1300 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1301 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1302 self.logger.debug(
1303 logging_text + "RO_action_id={} already deleted".format(action_id)
1304 )
1305 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1306 failed_detail.append("delete conflict: {}".format(e))
1307 self.logger.debug(
1308 logging_text
1309 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1310 )
1311 else:
1312 failed_detail.append("delete error: {}".format(e))
1313 self.logger.error(
1314 logging_text
1315 + "RO_action_id={} delete error: {}".format(action_id, e)
1316 )
1317
1318 if failed_detail:
1319 stage[2] = "Error deleting from VIM"
1320 else:
1321 stage[2] = "Deleted from VIM"
1322 db_nsr_update["detailed-status"] = " ".join(stage)
1323 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1324 self._write_op_status(nslcmop_id, stage)
1325
1326 if failed_detail:
1327 raise LcmException("; ".join(failed_detail))
1328 return
1329
1330 async def instantiate_RO(
1331 self,
1332 logging_text,
1333 nsr_id,
1334 nsd,
1335 db_nsr,
1336 db_nslcmop,
1337 db_vnfrs,
1338 db_vnfds,
1339 n2vc_key_list,
1340 stage,
1341 ):
1342 """
1343 Instantiate at RO
1344 :param logging_text: preffix text to use at logging
1345 :param nsr_id: nsr identity
1346 :param nsd: database content of ns descriptor
1347 :param db_nsr: database content of ns record
1348 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1349 :param db_vnfrs:
1350 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1351 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1352 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1353 :return: None or exception
1354 """
1355 try:
1356 start_deploy = time()
1357 ns_params = db_nslcmop.get("operationParams")
1358 if ns_params and ns_params.get("timeout_ns_deploy"):
1359 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1360 else:
1361 timeout_ns_deploy = self.timeout.get(
1362 "ns_deploy", self.timeout_ns_deploy
1363 )
1364
1365 # Check for and optionally request placement optimization. Database will be updated if placement activated
1366 stage[2] = "Waiting for Placement."
1367 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1368 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1369 for vnfr in db_vnfrs.values():
1370 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1371 break
1372 else:
1373 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1374
1375 return await self._instantiate_ng_ro(
1376 logging_text,
1377 nsr_id,
1378 nsd,
1379 db_nsr,
1380 db_nslcmop,
1381 db_vnfrs,
1382 db_vnfds,
1383 n2vc_key_list,
1384 stage,
1385 start_deploy,
1386 timeout_ns_deploy,
1387 )
1388 except Exception as e:
1389 stage[2] = "ERROR deploying at VIM"
1390 self.set_vnfr_at_error(db_vnfrs, str(e))
1391 self.logger.error(
1392 "Error deploying at VIM {}".format(e),
1393 exc_info=not isinstance(
1394 e,
1395 (
1396 ROclient.ROClientException,
1397 LcmException,
1398 DbException,
1399 NgRoException,
1400 ),
1401 ),
1402 )
1403 raise
1404
1405 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1406 """
1407 Wait for kdu to be up, get ip address
1408 :param logging_text: prefix use for logging
1409 :param nsr_id:
1410 :param vnfr_id:
1411 :param kdu_name:
1412 :return: IP address
1413 """
1414
1415 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1416 nb_tries = 0
1417
1418 while nb_tries < 360:
1419 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1420 kdur = next(
1421 (
1422 x
1423 for x in get_iterable(db_vnfr, "kdur")
1424 if x.get("kdu-name") == kdu_name
1425 ),
1426 None,
1427 )
1428 if not kdur:
1429 raise LcmException(
1430 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1431 )
1432 if kdur.get("status"):
1433 if kdur["status"] in ("READY", "ENABLED"):
1434 return kdur.get("ip-address")
1435 else:
1436 raise LcmException(
1437 "target KDU={} is in error state".format(kdu_name)
1438 )
1439
1440 await asyncio.sleep(10, loop=self.loop)
1441 nb_tries += 1
1442 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1443
1444 async def wait_vm_up_insert_key_ro(
1445 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1446 ):
1447 """
1448 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1449 :param logging_text: prefix use for logging
1450 :param nsr_id:
1451 :param vnfr_id:
1452 :param vdu_id:
1453 :param vdu_index:
1454 :param pub_key: public ssh key to inject, None to skip
1455 :param user: user to apply the public ssh key
1456 :return: IP address
1457 """
1458
1459 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1460 ro_nsr_id = None
1461 ip_address = None
1462 nb_tries = 0
1463 target_vdu_id = None
1464 ro_retries = 0
1465
1466 while True:
1467
1468 ro_retries += 1
1469 if ro_retries >= 360: # 1 hour
1470 raise LcmException(
1471 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1472 )
1473
1474 await asyncio.sleep(10, loop=self.loop)
1475
1476 # get ip address
1477 if not target_vdu_id:
1478 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1479
1480 if not vdu_id: # for the VNF case
1481 if db_vnfr.get("status") == "ERROR":
1482 raise LcmException(
1483 "Cannot inject ssh-key because target VNF is in error state"
1484 )
1485 ip_address = db_vnfr.get("ip-address")
1486 if not ip_address:
1487 continue
1488 vdur = next(
1489 (
1490 x
1491 for x in get_iterable(db_vnfr, "vdur")
1492 if x.get("ip-address") == ip_address
1493 ),
1494 None,
1495 )
1496 else: # VDU case
1497 vdur = next(
1498 (
1499 x
1500 for x in get_iterable(db_vnfr, "vdur")
1501 if x.get("vdu-id-ref") == vdu_id
1502 and x.get("count-index") == vdu_index
1503 ),
1504 None,
1505 )
1506
1507 if (
1508 not vdur and len(db_vnfr.get("vdur", ())) == 1
1509 ): # If only one, this should be the target vdu
1510 vdur = db_vnfr["vdur"][0]
1511 if not vdur:
1512 raise LcmException(
1513 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1514 vnfr_id, vdu_id, vdu_index
1515 )
1516 )
1517 # New generation RO stores information at "vim_info"
1518 ng_ro_status = None
1519 target_vim = None
1520 if vdur.get("vim_info"):
1521 target_vim = next(
1522 t for t in vdur["vim_info"]
1523 ) # there should be only one key
1524 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1525 if (
1526 vdur.get("pdu-type")
1527 or vdur.get("status") == "ACTIVE"
1528 or ng_ro_status == "ACTIVE"
1529 ):
1530 ip_address = vdur.get("ip-address")
1531 if not ip_address:
1532 continue
1533 target_vdu_id = vdur["vdu-id-ref"]
1534 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1535 raise LcmException(
1536 "Cannot inject ssh-key because target VM is in error state"
1537 )
1538
1539 if not target_vdu_id:
1540 continue
1541
1542 # inject public key into machine
1543 if pub_key and user:
1544 self.logger.debug(logging_text + "Inserting RO key")
1545 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1546 if vdur.get("pdu-type"):
1547 self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
1548 return ip_address
1549 try:
1550 ro_vm_id = "{}-{}".format(
1551 db_vnfr["member-vnf-index-ref"], target_vdu_id
1552 ) # TODO add vdu_index
1553 if self.ng_ro:
1554 target = {
1555 "action": {
1556 "action": "inject_ssh_key",
1557 "key": pub_key,
1558 "user": user,
1559 },
1560 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1561 }
1562 desc = await self.RO.deploy(nsr_id, target)
1563 action_id = desc["action_id"]
1564 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1565 break
1566 else:
1567 # wait until NS is deployed at RO
1568 if not ro_nsr_id:
1569 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1570 ro_nsr_id = deep_get(
1571 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1572 )
1573 if not ro_nsr_id:
1574 continue
1575 result_dict = await self.RO.create_action(
1576 item="ns",
1577 item_id_name=ro_nsr_id,
1578 descriptor={
1579 "add_public_key": pub_key,
1580 "vms": [ro_vm_id],
1581 "user": user,
1582 },
1583 )
1584 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1585 if not result_dict or not isinstance(result_dict, dict):
1586 raise LcmException(
1587 "Unknown response from RO when injecting key"
1588 )
1589 for result in result_dict.values():
1590 if result.get("vim_result") == 200:
1591 break
1592 else:
1593 raise ROclient.ROClientException(
1594 "error injecting key: {}".format(
1595 result.get("description")
1596 )
1597 )
1598 break
1599 except NgRoException as e:
1600 raise LcmException(
1601 "Reaching max tries injecting key. Error: {}".format(e)
1602 )
1603 except ROclient.ROClientException as e:
1604 if not nb_tries:
1605 self.logger.debug(
1606 logging_text
1607 + "error injecting key: {}. Retrying until {} seconds".format(
1608 e, 20 * 10
1609 )
1610 )
1611 nb_tries += 1
1612 if nb_tries >= 20:
1613 raise LcmException(
1614 "Reaching max tries injecting key. Error: {}".format(e)
1615 )
1616 else:
1617 break
1618
1619 return ip_address
1620
    async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
        """
        Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs

        :param nsr_id: NS record identifier, used to re-read status from database
        :param vca_deployed_list: list of deployed VCAs; entry at vca_index is this one
        :param vca_index: index of the VCA whose dependencies must be awaited
        :raises LcmException: if a dependent charm is BROKEN or the wait times out
        """
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # vdu or kdu: no dependencies
            return
        # NOTE(review): 'timeout' counts loop iterations, not seconds; with the
        # 10 s sleep below this waits up to ~50 minutes — confirm that is intended
        timeout = 300
        while timeout >= 0:
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            # NOTE(review): reassigned here but never read again in this method
            vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
            configuration_status_list = db_nsr["configurationStatus"]
            for index, vca_deployed in enumerate(configuration_status_list):
                if index == vca_index:
                    # myself
                    continue
                # NS-level VCA (no member-vnf-index) waits on every other VCA;
                # a VNF-level VCA waits only on peers of the same member-vnf-index
                if not my_vca.get("member-vnf-index") or (
                    vca_deployed.get("member-vnf-index")
                    == my_vca.get("member-vnf-index")
                ):
                    internal_status = configuration_status_list[index].get("status")
                    if internal_status == "READY":
                        continue
                    elif internal_status == "BROKEN":
                        raise LcmException(
                            "Configuration aborted because dependent charm/s has failed"
                        )
                    else:
                        # a dependency is still in progress: stop scanning and sleep
                        break
            else:
                # no dependencies, return
                return
            await asyncio.sleep(10)
            timeout -= 1

        raise LcmException("Configuration aborted because dependent charm/s timeout")
1658
1659 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1660 vca_id = None
1661 if db_vnfr:
1662 vca_id = deep_get(db_vnfr, ("vca-id",))
1663 elif db_nsr:
1664 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1665 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1666 return vca_id
1667
1668 async def instantiate_N2VC(
1669 self,
1670 logging_text,
1671 vca_index,
1672 nsi_id,
1673 db_nsr,
1674 db_vnfr,
1675 vdu_id,
1676 kdu_name,
1677 vdu_index,
1678 config_descriptor,
1679 deploy_params,
1680 base_folder,
1681 nslcmop_id,
1682 stage,
1683 vca_type,
1684 vca_name,
1685 ee_config_descriptor,
1686 ):
1687 nsr_id = db_nsr["_id"]
1688 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1689 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1690 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1691 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1692 db_dict = {
1693 "collection": "nsrs",
1694 "filter": {"_id": nsr_id},
1695 "path": db_update_entry,
1696 }
1697 step = ""
1698 try:
1699
1700 element_type = "NS"
1701 element_under_configuration = nsr_id
1702
1703 vnfr_id = None
1704 if db_vnfr:
1705 vnfr_id = db_vnfr["_id"]
1706 osm_config["osm"]["vnf_id"] = vnfr_id
1707
1708 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1709
1710 if vca_type == "native_charm":
1711 index_number = 0
1712 else:
1713 index_number = vdu_index or 0
1714
1715 if vnfr_id:
1716 element_type = "VNF"
1717 element_under_configuration = vnfr_id
1718 namespace += ".{}-{}".format(vnfr_id, index_number)
1719 if vdu_id:
1720 namespace += ".{}-{}".format(vdu_id, index_number)
1721 element_type = "VDU"
1722 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1723 osm_config["osm"]["vdu_id"] = vdu_id
1724 elif kdu_name:
1725 namespace += ".{}".format(kdu_name)
1726 element_type = "KDU"
1727 element_under_configuration = kdu_name
1728 osm_config["osm"]["kdu_name"] = kdu_name
1729
1730 # Get artifact path
1731 artifact_path = "{}/{}/{}/{}".format(
1732 base_folder["folder"],
1733 base_folder["pkg-dir"],
1734 "charms"
1735 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1736 else "helm-charts",
1737 vca_name,
1738 )
1739
1740 self.logger.debug("Artifact path > {}".format(artifact_path))
1741
1742 # get initial_config_primitive_list that applies to this element
1743 initial_config_primitive_list = config_descriptor.get(
1744 "initial-config-primitive"
1745 )
1746
1747 self.logger.debug(
1748 "Initial config primitive list > {}".format(
1749 initial_config_primitive_list
1750 )
1751 )
1752
1753 # add config if not present for NS charm
1754 ee_descriptor_id = ee_config_descriptor.get("id")
1755 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1756 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1757 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1758 )
1759
1760 self.logger.debug(
1761 "Initial config primitive list #2 > {}".format(
1762 initial_config_primitive_list
1763 )
1764 )
1765 # n2vc_redesign STEP 3.1
1766 # find old ee_id if exists
1767 ee_id = vca_deployed.get("ee_id")
1768
1769 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1770 # create or register execution environment in VCA
1771 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1772
1773 self._write_configuration_status(
1774 nsr_id=nsr_id,
1775 vca_index=vca_index,
1776 status="CREATING",
1777 element_under_configuration=element_under_configuration,
1778 element_type=element_type,
1779 )
1780
1781 step = "create execution environment"
1782 self.logger.debug(logging_text + step)
1783
1784 ee_id = None
1785 credentials = None
1786 if vca_type == "k8s_proxy_charm":
1787 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1788 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1789 namespace=namespace,
1790 artifact_path=artifact_path,
1791 db_dict=db_dict,
1792 vca_id=vca_id,
1793 )
1794 elif vca_type == "helm" or vca_type == "helm-v3":
1795 ee_id, credentials = await self.vca_map[
1796 vca_type
1797 ].create_execution_environment(
1798 namespace=namespace,
1799 reuse_ee_id=ee_id,
1800 db_dict=db_dict,
1801 config=osm_config,
1802 artifact_path=artifact_path,
1803 vca_type=vca_type,
1804 )
1805 else:
1806 ee_id, credentials = await self.vca_map[
1807 vca_type
1808 ].create_execution_environment(
1809 namespace=namespace,
1810 reuse_ee_id=ee_id,
1811 db_dict=db_dict,
1812 vca_id=vca_id,
1813 )
1814
1815 elif vca_type == "native_charm":
1816 step = "Waiting to VM being up and getting IP address"
1817 self.logger.debug(logging_text + step)
1818 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1819 logging_text,
1820 nsr_id,
1821 vnfr_id,
1822 vdu_id,
1823 vdu_index,
1824 user=None,
1825 pub_key=None,
1826 )
1827 credentials = {"hostname": rw_mgmt_ip}
1828 # get username
1829 username = deep_get(
1830 config_descriptor, ("config-access", "ssh-access", "default-user")
1831 )
1832 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1833 # merged. Meanwhile let's get username from initial-config-primitive
1834 if not username and initial_config_primitive_list:
1835 for config_primitive in initial_config_primitive_list:
1836 for param in config_primitive.get("parameter", ()):
1837 if param["name"] == "ssh-username":
1838 username = param["value"]
1839 break
1840 if not username:
1841 raise LcmException(
1842 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1843 "'config-access.ssh-access.default-user'"
1844 )
1845 credentials["username"] = username
1846 # n2vc_redesign STEP 3.2
1847
1848 self._write_configuration_status(
1849 nsr_id=nsr_id,
1850 vca_index=vca_index,
1851 status="REGISTERING",
1852 element_under_configuration=element_under_configuration,
1853 element_type=element_type,
1854 )
1855
1856 step = "register execution environment {}".format(credentials)
1857 self.logger.debug(logging_text + step)
1858 ee_id = await self.vca_map[vca_type].register_execution_environment(
1859 credentials=credentials,
1860 namespace=namespace,
1861 db_dict=db_dict,
1862 vca_id=vca_id,
1863 )
1864
1865 # for compatibility with MON/POL modules, the need model and application name at database
1866 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1867 ee_id_parts = ee_id.split(".")
1868 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1869 if len(ee_id_parts) >= 2:
1870 model_name = ee_id_parts[0]
1871 application_name = ee_id_parts[1]
1872 db_nsr_update[db_update_entry + "model"] = model_name
1873 db_nsr_update[db_update_entry + "application"] = application_name
1874
1875 # n2vc_redesign STEP 3.3
1876 step = "Install configuration Software"
1877
1878 self._write_configuration_status(
1879 nsr_id=nsr_id,
1880 vca_index=vca_index,
1881 status="INSTALLING SW",
1882 element_under_configuration=element_under_configuration,
1883 element_type=element_type,
1884 other_update=db_nsr_update,
1885 )
1886
1887 # TODO check if already done
1888 self.logger.debug(logging_text + step)
1889 config = None
1890 if vca_type == "native_charm":
1891 config_primitive = next(
1892 (p for p in initial_config_primitive_list if p["name"] == "config"),
1893 None,
1894 )
1895 if config_primitive:
1896 config = self._map_primitive_params(
1897 config_primitive, {}, deploy_params
1898 )
1899 num_units = 1
1900 if vca_type == "lxc_proxy_charm":
1901 if element_type == "NS":
1902 num_units = db_nsr.get("config-units") or 1
1903 elif element_type == "VNF":
1904 num_units = db_vnfr.get("config-units") or 1
1905 elif element_type == "VDU":
1906 for v in db_vnfr["vdur"]:
1907 if vdu_id == v["vdu-id-ref"]:
1908 num_units = v.get("config-units") or 1
1909 break
1910 if vca_type != "k8s_proxy_charm":
1911 await self.vca_map[vca_type].install_configuration_sw(
1912 ee_id=ee_id,
1913 artifact_path=artifact_path,
1914 db_dict=db_dict,
1915 config=config,
1916 num_units=num_units,
1917 vca_id=vca_id,
1918 vca_type=vca_type,
1919 )
1920
1921 # write in db flag of configuration_sw already installed
1922 self.update_db_2(
1923 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1924 )
1925
1926 # add relations for this VCA (wait for other peers related with this VCA)
1927 await self._add_vca_relations(
1928 logging_text=logging_text,
1929 nsr_id=nsr_id,
1930 vca_index=vca_index,
1931 vca_id=vca_id,
1932 vca_type=vca_type,
1933 )
1934
1935 # if SSH access is required, then get execution environment SSH public
1936 # if native charm we have waited already to VM be UP
1937 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1938 pub_key = None
1939 user = None
1940 # self.logger.debug("get ssh key block")
1941 if deep_get(
1942 config_descriptor, ("config-access", "ssh-access", "required")
1943 ):
1944 # self.logger.debug("ssh key needed")
1945 # Needed to inject a ssh key
1946 user = deep_get(
1947 config_descriptor,
1948 ("config-access", "ssh-access", "default-user"),
1949 )
1950 step = "Install configuration Software, getting public ssh key"
1951 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1952 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1953 )
1954
1955 step = "Insert public key into VM user={} ssh_key={}".format(
1956 user, pub_key
1957 )
1958 else:
1959 # self.logger.debug("no need to get ssh key")
1960 step = "Waiting to VM being up and getting IP address"
1961 self.logger.debug(logging_text + step)
1962
1963 # n2vc_redesign STEP 5.1
1964 # wait for RO (ip-address) Insert pub_key into VM
1965 if vnfr_id:
1966 if kdu_name:
1967 rw_mgmt_ip = await self.wait_kdu_up(
1968 logging_text, nsr_id, vnfr_id, kdu_name
1969 )
1970 else:
1971 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1972 logging_text,
1973 nsr_id,
1974 vnfr_id,
1975 vdu_id,
1976 vdu_index,
1977 user=user,
1978 pub_key=pub_key,
1979 )
1980 else:
1981 rw_mgmt_ip = None # This is for a NS configuration
1982
1983 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1984
1985 # store rw_mgmt_ip in deploy params for later replacement
1986 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1987
1988 # n2vc_redesign STEP 6 Execute initial config primitive
1989 step = "execute initial config primitive"
1990
1991 # wait for dependent primitives execution (NS -> VNF -> VDU)
1992 if initial_config_primitive_list:
1993 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1994
1995 # stage, in function of element type: vdu, kdu, vnf or ns
1996 my_vca = vca_deployed_list[vca_index]
1997 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1998 # VDU or KDU
1999 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
2000 elif my_vca.get("member-vnf-index"):
2001 # VNF
2002 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
2003 else:
2004 # NS
2005 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
2006
2007 self._write_configuration_status(
2008 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
2009 )
2010
2011 self._write_op_status(op_id=nslcmop_id, stage=stage)
2012
2013 check_if_terminated_needed = True
2014 for initial_config_primitive in initial_config_primitive_list:
2015 # adding information on the vca_deployed if it is a NS execution environment
2016 if not vca_deployed["member-vnf-index"]:
2017 deploy_params["ns_config_info"] = json.dumps(
2018 self._get_ns_config_info(nsr_id)
2019 )
2020 # TODO check if already done
2021 primitive_params_ = self._map_primitive_params(
2022 initial_config_primitive, {}, deploy_params
2023 )
2024
2025 step = "execute primitive '{}' params '{}'".format(
2026 initial_config_primitive["name"], primitive_params_
2027 )
2028 self.logger.debug(logging_text + step)
2029 await self.vca_map[vca_type].exec_primitive(
2030 ee_id=ee_id,
2031 primitive_name=initial_config_primitive["name"],
2032 params_dict=primitive_params_,
2033 db_dict=db_dict,
2034 vca_id=vca_id,
2035 vca_type=vca_type,
2036 )
2037 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2038 if check_if_terminated_needed:
2039 if config_descriptor.get("terminate-config-primitive"):
2040 self.update_db_2(
2041 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2042 )
2043 check_if_terminated_needed = False
2044
2045 # TODO register in database that primitive is done
2046
2047 # STEP 7 Configure metrics
2048 if vca_type == "helm" or vca_type == "helm-v3":
2049 prometheus_jobs = await self.add_prometheus_metrics(
2050 ee_id=ee_id,
2051 artifact_path=artifact_path,
2052 ee_config_descriptor=ee_config_descriptor,
2053 vnfr_id=vnfr_id,
2054 nsr_id=nsr_id,
2055 target_ip=rw_mgmt_ip,
2056 )
2057 if prometheus_jobs:
2058 self.update_db_2(
2059 "nsrs",
2060 nsr_id,
2061 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2062 )
2063
2064 step = "instantiated at VCA"
2065 self.logger.debug(logging_text + step)
2066
2067 self._write_configuration_status(
2068 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2069 )
2070
2071 except Exception as e: # TODO not use Exception but N2VC exception
2072 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2073 if not isinstance(
2074 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2075 ):
2076 self.logger.error(
2077 "Exception while {} : {}".format(step, e), exc_info=True
2078 )
2079 self._write_configuration_status(
2080 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2081 )
2082 raise LcmException("{} {}".format(step, e)) from e
2083
2084 def _write_ns_status(
2085 self,
2086 nsr_id: str,
2087 ns_state: str,
2088 current_operation: str,
2089 current_operation_id: str,
2090 error_description: str = None,
2091 error_detail: str = None,
2092 other_update: dict = None,
2093 ):
2094 """
2095 Update db_nsr fields.
2096 :param nsr_id:
2097 :param ns_state:
2098 :param current_operation:
2099 :param current_operation_id:
2100 :param error_description:
2101 :param error_detail:
2102 :param other_update: Other required changes at database if provided, will be cleared
2103 :return:
2104 """
2105 try:
2106 db_dict = other_update or {}
2107 db_dict[
2108 "_admin.nslcmop"
2109 ] = current_operation_id # for backward compatibility
2110 db_dict["_admin.current-operation"] = current_operation_id
2111 db_dict["_admin.operation-type"] = (
2112 current_operation if current_operation != "IDLE" else None
2113 )
2114 db_dict["currentOperation"] = current_operation
2115 db_dict["currentOperationID"] = current_operation_id
2116 db_dict["errorDescription"] = error_description
2117 db_dict["errorDetail"] = error_detail
2118
2119 if ns_state:
2120 db_dict["nsState"] = ns_state
2121 self.update_db_2("nsrs", nsr_id, db_dict)
2122 except DbException as e:
2123 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2124
2125 def _write_op_status(
2126 self,
2127 op_id: str,
2128 stage: list = None,
2129 error_message: str = None,
2130 queuePosition: int = 0,
2131 operation_state: str = None,
2132 other_update: dict = None,
2133 ):
2134 try:
2135 db_dict = other_update or {}
2136 db_dict["queuePosition"] = queuePosition
2137 if isinstance(stage, list):
2138 db_dict["stage"] = stage[0]
2139 db_dict["detailed-status"] = " ".join(stage)
2140 elif stage is not None:
2141 db_dict["stage"] = str(stage)
2142
2143 if error_message is not None:
2144 db_dict["errorMessage"] = error_message
2145 if operation_state is not None:
2146 db_dict["operationState"] = operation_state
2147 db_dict["statusEnteredTime"] = time()
2148 self.update_db_2("nslcmops", op_id, db_dict)
2149 except DbException as e:
2150 self.logger.warn(
2151 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2152 )
2153
2154 def _write_all_config_status(self, db_nsr: dict, status: str):
2155 try:
2156 nsr_id = db_nsr["_id"]
2157 # configurationStatus
2158 config_status = db_nsr.get("configurationStatus")
2159 if config_status:
2160 db_nsr_update = {
2161 "configurationStatus.{}.status".format(index): status
2162 for index, v in enumerate(config_status)
2163 if v
2164 }
2165 # update status
2166 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2167
2168 except DbException as e:
2169 self.logger.warn(
2170 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2171 )
2172
2173 def _write_configuration_status(
2174 self,
2175 nsr_id: str,
2176 vca_index: int,
2177 status: str = None,
2178 element_under_configuration: str = None,
2179 element_type: str = None,
2180 other_update: dict = None,
2181 ):
2182
2183 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2184 # .format(vca_index, status))
2185
2186 try:
2187 db_path = "configurationStatus.{}.".format(vca_index)
2188 db_dict = other_update or {}
2189 if status:
2190 db_dict[db_path + "status"] = status
2191 if element_under_configuration:
2192 db_dict[
2193 db_path + "elementUnderConfiguration"
2194 ] = element_under_configuration
2195 if element_type:
2196 db_dict[db_path + "elementType"] = element_type
2197 self.update_db_2("nsrs", nsr_id, db_dict)
2198 except DbException as e:
2199 self.logger.warn(
2200 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2201 status, nsr_id, vca_index, e
2202 )
2203 )
2204
2205 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2206 """
2207 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2208 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2209 Database is used because the result can be obtained from a different LCM worker in case of HA.
2210 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2211 :param db_nslcmop: database content of nslcmop
2212 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2213 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2214 computed 'vim-account-id'
2215 """
2216 modified = False
2217 nslcmop_id = db_nslcmop["_id"]
2218 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2219 if placement_engine == "PLA":
2220 self.logger.debug(
2221 logging_text + "Invoke and wait for placement optimization"
2222 )
2223 await self.msg.aiowrite(
2224 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2225 )
2226 db_poll_interval = 5
2227 wait = db_poll_interval * 10
2228 pla_result = None
2229 while not pla_result and wait >= 0:
2230 await asyncio.sleep(db_poll_interval)
2231 wait -= db_poll_interval
2232 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2233 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2234
2235 if not pla_result:
2236 raise LcmException(
2237 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2238 )
2239
2240 for pla_vnf in pla_result["vnf"]:
2241 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2242 if not pla_vnf.get("vimAccountId") or not vnfr:
2243 continue
2244 modified = True
2245 self.db.set_one(
2246 "vnfrs",
2247 {"_id": vnfr["_id"]},
2248 {"vim-account-id": pla_vnf["vimAccountId"]},
2249 )
2250 # Modifies db_vnfrs
2251 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2252 return modified
2253
2254 def update_nsrs_with_pla_result(self, params):
2255 try:
2256 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2257 self.update_db_2(
2258 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2259 )
2260 except Exception as e:
2261 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2262
2263 async def instantiate(self, nsr_id, nslcmop_id):
2264 """
2265
2266 :param nsr_id: ns instance to deploy
2267 :param nslcmop_id: operation to run
2268 :return:
2269 """
2270
2271 # Try to lock HA task here
2272 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2273 if not task_is_locked_by_me:
2274 self.logger.debug(
2275 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2276 )
2277 return
2278
2279 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2280 self.logger.debug(logging_text + "Enter")
2281
2282 # get all needed from database
2283
2284 # database nsrs record
2285 db_nsr = None
2286
2287 # database nslcmops record
2288 db_nslcmop = None
2289
2290 # update operation on nsrs
2291 db_nsr_update = {}
2292 # update operation on nslcmops
2293 db_nslcmop_update = {}
2294
2295 nslcmop_operation_state = None
2296 db_vnfrs = {} # vnf's info indexed by member-index
2297 # n2vc_info = {}
2298 tasks_dict_info = {} # from task to info text
2299 exc = None
2300 error_list = []
2301 stage = [
2302 "Stage 1/5: preparation of the environment.",
2303 "Waiting for previous operations to terminate.",
2304 "",
2305 ]
2306 # ^ stage, step, VIM progress
2307 try:
2308 # wait for any previous tasks in process
2309 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2310
2311 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2312 stage[1] = "Reading from database."
2313 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2314 db_nsr_update["detailed-status"] = "creating"
2315 db_nsr_update["operational-status"] = "init"
2316 self._write_ns_status(
2317 nsr_id=nsr_id,
2318 ns_state="BUILDING",
2319 current_operation="INSTANTIATING",
2320 current_operation_id=nslcmop_id,
2321 other_update=db_nsr_update,
2322 )
2323 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2324
2325 # read from db: operation
2326 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2327 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2328 if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
2329 db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
2330 db_nslcmop["operationParams"]["additionalParamsForVnf"]
2331 )
2332 ns_params = db_nslcmop.get("operationParams")
2333 if ns_params and ns_params.get("timeout_ns_deploy"):
2334 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2335 else:
2336 timeout_ns_deploy = self.timeout.get(
2337 "ns_deploy", self.timeout_ns_deploy
2338 )
2339
2340 # read from db: ns
2341 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2342 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2343 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2344 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2345 self.fs.sync(db_nsr["nsd-id"])
2346 db_nsr["nsd"] = nsd
2347 # nsr_name = db_nsr["name"] # TODO short-name??
2348
2349 # read from db: vnf's of this ns
2350 stage[1] = "Getting vnfrs from db."
2351 self.logger.debug(logging_text + stage[1])
2352 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2353
2354 # read from db: vnfd's for every vnf
2355 db_vnfds = [] # every vnfd data
2356
2357 # for each vnf in ns, read vnfd
2358 for vnfr in db_vnfrs_list:
2359 if vnfr.get("kdur"):
2360 kdur_list = []
2361 for kdur in vnfr["kdur"]:
2362 if kdur.get("additionalParams"):
2363 kdur["additionalParams"] = json.loads(
2364 kdur["additionalParams"]
2365 )
2366 kdur_list.append(kdur)
2367 vnfr["kdur"] = kdur_list
2368
2369 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2370 vnfd_id = vnfr["vnfd-id"]
2371 vnfd_ref = vnfr["vnfd-ref"]
2372 self.fs.sync(vnfd_id)
2373
2374 # if we haven't this vnfd, read it from db
2375 if vnfd_id not in db_vnfds:
2376 # read from db
2377 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2378 vnfd_id, vnfd_ref
2379 )
2380 self.logger.debug(logging_text + stage[1])
2381 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2382
2383 # store vnfd
2384 db_vnfds.append(vnfd)
2385
2386 # Get or generates the _admin.deployed.VCA list
2387 vca_deployed_list = None
2388 if db_nsr["_admin"].get("deployed"):
2389 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2390 if vca_deployed_list is None:
2391 vca_deployed_list = []
2392 configuration_status_list = []
2393 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2394 db_nsr_update["configurationStatus"] = configuration_status_list
2395 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2396 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2397 elif isinstance(vca_deployed_list, dict):
2398 # maintain backward compatibility. Change a dict to list at database
2399 vca_deployed_list = list(vca_deployed_list.values())
2400 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2401 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2402
2403 if not isinstance(
2404 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2405 ):
2406 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2407 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2408
2409 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2410 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2411 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2412 self.db.set_list(
2413 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2414 )
2415
2416 # n2vc_redesign STEP 2 Deploy Network Scenario
2417 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2418 self._write_op_status(op_id=nslcmop_id, stage=stage)
2419
2420 stage[1] = "Deploying KDUs."
2421 # self.logger.debug(logging_text + "Before deploy_kdus")
2422 # Call to deploy_kdus in case exists the "vdu:kdu" param
2423 await self.deploy_kdus(
2424 logging_text=logging_text,
2425 nsr_id=nsr_id,
2426 nslcmop_id=nslcmop_id,
2427 db_vnfrs=db_vnfrs,
2428 db_vnfds=db_vnfds,
2429 task_instantiation_info=tasks_dict_info,
2430 )
2431
2432 stage[1] = "Getting VCA public key."
2433 # n2vc_redesign STEP 1 Get VCA public ssh-key
2434 # feature 1429. Add n2vc public key to needed VMs
2435 n2vc_key = self.n2vc.get_public_key()
2436 n2vc_key_list = [n2vc_key]
2437 if self.vca_config.get("public_key"):
2438 n2vc_key_list.append(self.vca_config["public_key"])
2439
2440 stage[1] = "Deploying NS at VIM."
2441 task_ro = asyncio.ensure_future(
2442 self.instantiate_RO(
2443 logging_text=logging_text,
2444 nsr_id=nsr_id,
2445 nsd=nsd,
2446 db_nsr=db_nsr,
2447 db_nslcmop=db_nslcmop,
2448 db_vnfrs=db_vnfrs,
2449 db_vnfds=db_vnfds,
2450 n2vc_key_list=n2vc_key_list,
2451 stage=stage,
2452 )
2453 )
2454 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2455 tasks_dict_info[task_ro] = "Deploying at VIM"
2456
2457 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2458 stage[1] = "Deploying Execution Environments."
2459 self.logger.debug(logging_text + stage[1])
2460
2461 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2462 for vnf_profile in get_vnf_profiles(nsd):
2463 vnfd_id = vnf_profile["vnfd-id"]
2464 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2465 member_vnf_index = str(vnf_profile["id"])
2466 db_vnfr = db_vnfrs[member_vnf_index]
2467 base_folder = vnfd["_admin"]["storage"]
2468 vdu_id = None
2469 vdu_index = 0
2470 vdu_name = None
2471 kdu_name = None
2472
2473 # Get additional parameters
2474 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2475 if db_vnfr.get("additionalParamsForVnf"):
2476 deploy_params.update(
2477 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2478 )
2479
2480 descriptor_config = get_configuration(vnfd, vnfd["id"])
2481 if descriptor_config:
2482 self._deploy_n2vc(
2483 logging_text=logging_text
2484 + "member_vnf_index={} ".format(member_vnf_index),
2485 db_nsr=db_nsr,
2486 db_vnfr=db_vnfr,
2487 nslcmop_id=nslcmop_id,
2488 nsr_id=nsr_id,
2489 nsi_id=nsi_id,
2490 vnfd_id=vnfd_id,
2491 vdu_id=vdu_id,
2492 kdu_name=kdu_name,
2493 member_vnf_index=member_vnf_index,
2494 vdu_index=vdu_index,
2495 vdu_name=vdu_name,
2496 deploy_params=deploy_params,
2497 descriptor_config=descriptor_config,
2498 base_folder=base_folder,
2499 task_instantiation_info=tasks_dict_info,
2500 stage=stage,
2501 )
2502
2503 # Deploy charms for each VDU that supports one.
2504 for vdud in get_vdu_list(vnfd):
2505 vdu_id = vdud["id"]
2506 descriptor_config = get_configuration(vnfd, vdu_id)
2507 vdur = find_in_list(
2508 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2509 )
2510
2511 if vdur.get("additionalParams"):
2512 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2513 else:
2514 deploy_params_vdu = deploy_params
2515 deploy_params_vdu["OSM"] = get_osm_params(
2516 db_vnfr, vdu_id, vdu_count_index=0
2517 )
2518 vdud_count = get_number_of_instances(vnfd, vdu_id)
2519
2520 self.logger.debug("VDUD > {}".format(vdud))
2521 self.logger.debug(
2522 "Descriptor config > {}".format(descriptor_config)
2523 )
2524 if descriptor_config:
2525 vdu_name = None
2526 kdu_name = None
2527 for vdu_index in range(vdud_count):
2528 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2529 self._deploy_n2vc(
2530 logging_text=logging_text
2531 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2532 member_vnf_index, vdu_id, vdu_index
2533 ),
2534 db_nsr=db_nsr,
2535 db_vnfr=db_vnfr,
2536 nslcmop_id=nslcmop_id,
2537 nsr_id=nsr_id,
2538 nsi_id=nsi_id,
2539 vnfd_id=vnfd_id,
2540 vdu_id=vdu_id,
2541 kdu_name=kdu_name,
2542 member_vnf_index=member_vnf_index,
2543 vdu_index=vdu_index,
2544 vdu_name=vdu_name,
2545 deploy_params=deploy_params_vdu,
2546 descriptor_config=descriptor_config,
2547 base_folder=base_folder,
2548 task_instantiation_info=tasks_dict_info,
2549 stage=stage,
2550 )
2551 for kdud in get_kdu_list(vnfd):
2552 kdu_name = kdud["name"]
2553 descriptor_config = get_configuration(vnfd, kdu_name)
2554 if descriptor_config:
2555 vdu_id = None
2556 vdu_index = 0
2557 vdu_name = None
2558 kdur = next(
2559 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2560 )
2561 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2562 if kdur.get("additionalParams"):
2563 deploy_params_kdu.update(
2564 parse_yaml_strings(kdur["additionalParams"].copy())
2565 )
2566
2567 self._deploy_n2vc(
2568 logging_text=logging_text,
2569 db_nsr=db_nsr,
2570 db_vnfr=db_vnfr,
2571 nslcmop_id=nslcmop_id,
2572 nsr_id=nsr_id,
2573 nsi_id=nsi_id,
2574 vnfd_id=vnfd_id,
2575 vdu_id=vdu_id,
2576 kdu_name=kdu_name,
2577 member_vnf_index=member_vnf_index,
2578 vdu_index=vdu_index,
2579 vdu_name=vdu_name,
2580 deploy_params=deploy_params_kdu,
2581 descriptor_config=descriptor_config,
2582 base_folder=base_folder,
2583 task_instantiation_info=tasks_dict_info,
2584 stage=stage,
2585 )
2586
2587 # Check if this NS has a charm configuration
2588 descriptor_config = nsd.get("ns-configuration")
2589 if descriptor_config and descriptor_config.get("juju"):
2590 vnfd_id = None
2591 db_vnfr = None
2592 member_vnf_index = None
2593 vdu_id = None
2594 kdu_name = None
2595 vdu_index = 0
2596 vdu_name = None
2597
2598 # Get additional parameters
2599 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2600 if db_nsr.get("additionalParamsForNs"):
2601 deploy_params.update(
2602 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2603 )
2604 base_folder = nsd["_admin"]["storage"]
2605 self._deploy_n2vc(
2606 logging_text=logging_text,
2607 db_nsr=db_nsr,
2608 db_vnfr=db_vnfr,
2609 nslcmop_id=nslcmop_id,
2610 nsr_id=nsr_id,
2611 nsi_id=nsi_id,
2612 vnfd_id=vnfd_id,
2613 vdu_id=vdu_id,
2614 kdu_name=kdu_name,
2615 member_vnf_index=member_vnf_index,
2616 vdu_index=vdu_index,
2617 vdu_name=vdu_name,
2618 deploy_params=deploy_params,
2619 descriptor_config=descriptor_config,
2620 base_folder=base_folder,
2621 task_instantiation_info=tasks_dict_info,
2622 stage=stage,
2623 )
2624
2625 # rest of staff will be done at finally
2626
2627 except (
2628 ROclient.ROClientException,
2629 DbException,
2630 LcmException,
2631 N2VCException,
2632 ) as e:
2633 self.logger.error(
2634 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2635 )
2636 exc = e
2637 except asyncio.CancelledError:
2638 self.logger.error(
2639 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2640 )
2641 exc = "Operation was cancelled"
2642 except Exception as e:
2643 exc = traceback.format_exc()
2644 self.logger.critical(
2645 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2646 exc_info=True,
2647 )
2648 finally:
2649 if exc:
2650 error_list.append(str(exc))
2651 try:
2652 # wait for pending tasks
2653 if tasks_dict_info:
2654 stage[1] = "Waiting for instantiate pending tasks."
2655 self.logger.debug(logging_text + stage[1])
2656 error_list += await self._wait_for_tasks(
2657 logging_text,
2658 tasks_dict_info,
2659 timeout_ns_deploy,
2660 stage,
2661 nslcmop_id,
2662 nsr_id=nsr_id,
2663 )
2664 stage[1] = stage[2] = ""
2665 except asyncio.CancelledError:
2666 error_list.append("Cancelled")
2667 # TODO cancel all tasks
2668 except Exception as exc:
2669 error_list.append(str(exc))
2670
2671 # update operation-status
2672 db_nsr_update["operational-status"] = "running"
2673 # let's begin with VCA 'configured' status (later we can change it)
2674 db_nsr_update["config-status"] = "configured"
2675 for task, task_name in tasks_dict_info.items():
2676 if not task.done() or task.cancelled() or task.exception():
2677 if task_name.startswith(self.task_name_deploy_vca):
2678 # A N2VC task is pending
2679 db_nsr_update["config-status"] = "failed"
2680 else:
2681 # RO or KDU task is pending
2682 db_nsr_update["operational-status"] = "failed"
2683
2684 # update status at database
2685 if error_list:
2686 error_detail = ". ".join(error_list)
2687 self.logger.error(logging_text + error_detail)
2688 error_description_nslcmop = "{} Detail: {}".format(
2689 stage[0], error_detail
2690 )
2691 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2692 nslcmop_id, stage[0]
2693 )
2694
2695 db_nsr_update["detailed-status"] = (
2696 error_description_nsr + " Detail: " + error_detail
2697 )
2698 db_nslcmop_update["detailed-status"] = error_detail
2699 nslcmop_operation_state = "FAILED"
2700 ns_state = "BROKEN"
2701 else:
2702 error_detail = None
2703 error_description_nsr = error_description_nslcmop = None
2704 ns_state = "READY"
2705 db_nsr_update["detailed-status"] = "Done"
2706 db_nslcmop_update["detailed-status"] = "Done"
2707 nslcmop_operation_state = "COMPLETED"
2708
2709 if db_nsr:
2710 self._write_ns_status(
2711 nsr_id=nsr_id,
2712 ns_state=ns_state,
2713 current_operation="IDLE",
2714 current_operation_id=None,
2715 error_description=error_description_nsr,
2716 error_detail=error_detail,
2717 other_update=db_nsr_update,
2718 )
2719 self._write_op_status(
2720 op_id=nslcmop_id,
2721 stage="",
2722 error_message=error_description_nslcmop,
2723 operation_state=nslcmop_operation_state,
2724 other_update=db_nslcmop_update,
2725 )
2726
2727 if nslcmop_operation_state:
2728 try:
2729 await self.msg.aiowrite(
2730 "ns",
2731 "instantiated",
2732 {
2733 "nsr_id": nsr_id,
2734 "nslcmop_id": nslcmop_id,
2735 "operationState": nslcmop_operation_state,
2736 },
2737 loop=self.loop,
2738 )
2739 except Exception as e:
2740 self.logger.error(
2741 logging_text + "kafka_write notification Exception {}".format(e)
2742 )
2743
2744 self.logger.debug(logging_text + "Exit")
2745 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2746
    async def _add_vca_relations(
        self,
        logging_text,
        nsr_id,
        vca_index: int,
        timeout: int = 3600,
        vca_type: str = None,
        vca_id: str = None,
    ) -> bool:
        """
        Add the juju relations that involve the VCA at position vca_index,
        polling the nsr record until every related peer is ready.

        :param logging_text: prefix for logging
        :param nsr_id: nsr identifier at database
        :param vca_index: position of this VCA in _admin.deployed.VCA
        :param timeout: maximum seconds to wait for peers before giving up
        :param vca_type: key into self.vca_map; defaults to "lxc_proxy_charm"
        :param vca_id: VCA identifier forwarded to add_relation
        :return: True if all relations were added (or none/peers broken),
            False on timeout or any unexpected error
        """

        # steps:
        # 1. find all relations for this VCA
        # 2. wait for other peers related
        # 3. add relations

        try:
            vca_type = vca_type or "lxc_proxy_charm"

            # STEP 1: find all relations for this VCA

            # read nsr record
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

            # this VCA data
            my_vca = deep_get(db_nsr, ("_admin", "deployed", "VCA"))[vca_index]

            # read all ns-configuration relations
            ns_relations = list()
            db_ns_relations = deep_get(nsd, ("ns-configuration", "relation"))
            if db_ns_relations:
                for r in db_ns_relations:
                    # check if this VCA is in the relation
                    # NOTE(review): assumes each relation has exactly two
                    # "entities" entries — confirm against the descriptor model
                    if my_vca.get("member-vnf-index") in (
                        r.get("entities")[0].get("id"),
                        r.get("entities")[1].get("id"),
                    ):
                        ns_relations.append(r)

            # read all vnf-configuration relations
            vnf_relations = list()
            db_vnfd_list = db_nsr.get("vnfd-id")
            if db_vnfd_list:
                for vnfd in db_vnfd_list:
                    db_vnf_relations = None
                    db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
                    db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
                    if db_vnf_configuration:
                        db_vnf_relations = db_vnf_configuration.get("relation", [])
                    if db_vnf_relations:
                        for r in db_vnf_relations:
                            # check if this VCA is in the relation
                            if my_vca.get("vdu_id") in (
                                r.get("entities")[0].get("id"),
                                r.get("entities")[1].get("id"),
                            ):
                                vnf_relations.append(r)

            # if no relations, terminate
            if not ns_relations and not vnf_relations:
                self.logger.debug(logging_text + " No relations")
                return True

            self.logger.debug(
                logging_text
                + " adding relations\n {}\n {}".format(
                    ns_relations, vnf_relations
                )
            )

            # add all relations
            start = time()
            while True:
                # check timeout
                now = time()
                if now - start >= timeout:
                    self.logger.error(logging_text + " : timeout adding relations")
                    return False

                # reload nsr from database (we need to update record: _admin.deloyed.VCA)
                db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

                # for each defined NS relation, find the VCA's related
                # (iterate over a copy because entries are removed while looping)
                for r in ns_relations.copy():
                    from_vca_ee_id = None
                    to_vca_ee_id = None
                    from_vca_endpoint = None
                    to_vca_endpoint = None
                    vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
                    for vca in vca_list:
                        # a peer is ready when its charm software is installed
                        if vca.get("member-vnf-index") == r.get("entities")[0].get(
                            "id"
                        ) and vca.get("config_sw_installed"):
                            from_vca_ee_id = vca.get("ee_id")
                            from_vca_endpoint = r.get("entities")[0].get("endpoint")
                        if vca.get("member-vnf-index") == r.get("entities")[1].get(
                            "id"
                        ) and vca.get("config_sw_installed"):
                            to_vca_ee_id = vca.get("ee_id")
                            to_vca_endpoint = r.get("entities")[1].get("endpoint")
                    if from_vca_ee_id and to_vca_ee_id:
                        # add relation
                        await self.vca_map[vca_type].add_relation(
                            ee_id_1=from_vca_ee_id,
                            ee_id_2=to_vca_ee_id,
                            endpoint_1=from_vca_endpoint,
                            endpoint_2=to_vca_endpoint,
                            vca_id=vca_id,
                        )
                        # remove entry from relations list
                        ns_relations.remove(r)
                    else:
                        # check failed peers: drop relations whose peer is BROKEN
                        # so the loop does not wait for them until timeout
                        try:
                            vca_status_list = db_nsr.get("configurationStatus")
                            if vca_status_list:
                                for i in range(len(vca_list)):
                                    vca = vca_list[i]
                                    vca_status = vca_status_list[i]
                                    if vca.get("member-vnf-index") == r.get("entities")[
                                        0
                                    ].get("id"):
                                        if vca_status.get("status") == "BROKEN":
                                            # peer broken: remove relation from list
                                            ns_relations.remove(r)
                                    if vca.get("member-vnf-index") == r.get("entities")[
                                        1
                                    ].get("id"):
                                        if vca_status.get("status") == "BROKEN":
                                            # peer broken: remove relation from list
                                            ns_relations.remove(r)
                        except Exception:
                            # ignore (best-effort check; lists may be misaligned)
                            pass

                # for each defined VNF relation, find the VCA's related
                for r in vnf_relations.copy():
                    from_vca_ee_id = None
                    to_vca_ee_id = None
                    from_vca_endpoint = None
                    to_vca_endpoint = None
                    vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
                    for vca in vca_list:
                        # VNF-level charms have no vdu_id; match on vnfd_id then
                        key_to_check = "vdu_id"
                        if vca.get("vdu_id") is None:
                            key_to_check = "vnfd_id"
                        if vca.get(key_to_check) == r.get("entities")[0].get(
                            "id"
                        ) and vca.get("config_sw_installed"):
                            from_vca_ee_id = vca.get("ee_id")
                            from_vca_endpoint = r.get("entities")[0].get("endpoint")
                        if vca.get(key_to_check) == r.get("entities")[1].get(
                            "id"
                        ) and vca.get("config_sw_installed"):
                            to_vca_ee_id = vca.get("ee_id")
                            to_vca_endpoint = r.get("entities")[1].get("endpoint")
                    if from_vca_ee_id and to_vca_ee_id:
                        # add relation
                        await self.vca_map[vca_type].add_relation(
                            ee_id_1=from_vca_ee_id,
                            ee_id_2=to_vca_ee_id,
                            endpoint_1=from_vca_endpoint,
                            endpoint_2=to_vca_endpoint,
                            vca_id=vca_id,
                        )
                        # remove entry from relations list
                        vnf_relations.remove(r)
                    else:
                        # check failed peers
                        try:
                            vca_status_list = db_nsr.get("configurationStatus")
                            if vca_status_list:
                                for i in range(len(vca_list)):
                                    vca = vca_list[i]
                                    vca_status = vca_status_list[i]
                                    if vca.get("vdu_id") == r.get("entities")[0].get(
                                        "id"
                                    ):
                                        if vca_status.get("status") == "BROKEN":
                                            # peer broken: remove relation from list
                                            vnf_relations.remove(r)
                                    if vca.get("vdu_id") == r.get("entities")[1].get(
                                        "id"
                                    ):
                                        if vca_status.get("status") == "BROKEN":
                                            # peer broken: remove relation from list
                                            vnf_relations.remove(r)
                        except Exception:
                            # ignore
                            pass

                # wait for next try
                # NOTE(review): this sleep runs even when both lists are already
                # empty, adding ~5s to every successful call — confirm intended
                await asyncio.sleep(5.0)

                if not ns_relations and not vnf_relations:
                    self.logger.debug("Relations added")
                    break

            return True

        except Exception as e:
            self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
            return False
2950
    async def _install_kdu(
        self,
        nsr_id: str,
        nsr_db_path: str,
        vnfr_data: dict,
        kdu_index: int,
        kdud: dict,
        vnfd: dict,
        k8s_instance_info: dict,
        k8params: dict = None,
        timeout: int = 600,
        vca_id: str = None,
    ):
        """Install one KDU on its k8s cluster and record progress in the database.

        Installs the helm chart / juju bundle, collects the exposed k8s services
        to fill the kdur ip-address (and optionally the vnf mgmt ip), and runs
        the initial-config-primitives when the KDU has no juju execution
        environment of its own.

        :param nsr_id: _id of the nsr record to update with status/errors
        :param nsr_db_path: dotted db path of this KDU inside the nsr
            (e.g. "_admin.deployed.K8s.<index>")
        :param vnfr_data: vnfr record of the vnf that owns this KDU
        :param kdu_index: position of the kdur inside vnfr_data["kdur"]
        :param kdud: KDU descriptor (an entry of vnfd["kdu"])
        :param vnfd: full VNF descriptor
        :param k8s_instance_info: dict with k8scluster-type/-uuid, kdu-model,
            kdu-name, namespace and optional kdu-deployment-name
        :param k8params: parameters passed to the chart/bundle installation
        :param timeout: seconds allowed for the install and for each primitive
        :param vca_id: id of a dedicated VCA, when configured
        :return: the kdu instance name (generated or descriptor-provided)
        :raises: re-raises any install/primitive error after persisting it
        """

        try:
            k8sclustertype = k8s_instance_info["k8scluster-type"]
            # Instantiate kdu
            db_dict_install = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": nsr_db_path,
            }

            # Prefer the deployment name given in the descriptor; otherwise ask
            # the connector to generate a unique one
            if k8s_instance_info.get("kdu-deployment-name"):
                kdu_instance = k8s_instance_info.get("kdu-deployment-name")
            else:
                kdu_instance = self.k8scluster_map[
                    k8sclustertype
                ].generate_kdu_instance_name(
                    db_dict=db_dict_install,
                    kdu_model=k8s_instance_info["kdu-model"],
                    kdu_name=k8s_instance_info["kdu-name"],
                )
            # Persist the instance name before installing so the deployment can
            # be tracked/cleaned up even if the install fails midway
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
            )
            await self.k8scluster_map[k8sclustertype].install(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_model=k8s_instance_info["kdu-model"],
                atomic=True,
                params=k8params,
                db_dict=db_dict_install,
                timeout=timeout,
                kdu_name=k8s_instance_info["kdu-name"],
                namespace=k8s_instance_info["namespace"],
                kdu_instance=kdu_instance,
                vca_id=vca_id,
            )
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
            )

            # Obtain services to obtain management service ip
            services = await self.k8scluster_map[k8sclustertype].get_services(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_instance=kdu_instance,
                namespace=k8s_instance_info["namespace"],
            )

            # Obtain management service info (if exists)
            vnfr_update_dict = {}
            kdu_config = get_configuration(vnfd, kdud["name"])
            if kdu_config:
                target_ee_list = kdu_config.get("execution-environment-list", [])
            else:
                target_ee_list = []

            if services:
                vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
                # services of the KDU descriptor flagged as management services
                mgmt_services = [
                    service
                    for service in kdud.get("service", [])
                    if service.get("mgmt-service")
                ]
                for mgmt_service in mgmt_services:
                    for service in services:
                        if service["name"].startswith(mgmt_service["name"]):
                            # Mgmt service found, Obtain service ip
                            ip = service.get("external_ip", service.get("cluster_ip"))
                            if isinstance(ip, list) and len(ip) == 1:
                                ip = ip[0]

                            vnfr_update_dict[
                                "kdur.{}.ip-address".format(kdu_index)
                            ] = ip

                            # Check if must update also mgmt ip at the vnf
                            service_external_cp = mgmt_service.get(
                                "external-connection-point-ref"
                            )
                            if service_external_cp:
                                if (
                                    deep_get(vnfd, ("mgmt-interface", "cp"))
                                    == service_external_cp
                                ):
                                    vnfr_update_dict["ip-address"] = ip

                                if find_in_list(
                                    target_ee_list,
                                    lambda ee: ee.get(
                                        "external-connection-point-ref", ""
                                    )
                                    == service_external_cp,
                                ):
                                    vnfr_update_dict[
                                        "kdur.{}.ip-address".format(kdu_index)
                                    ] = ip
                            break
                    else:
                        # for/else: no deployed service matched this mgmt service
                        self.logger.warn(
                            "Mgmt service name: {} not found".format(
                                mgmt_service["name"]
                            )
                        )

            vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
            self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

            # Run initial-config-primitives directly on the KDU only when the
            # KDU has no juju execution environment handling them
            kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
            if (
                kdu_config
                and kdu_config.get("initial-config-primitive")
                and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
            ):
                initial_config_primitive_list = kdu_config.get(
                    "initial-config-primitive"
                )
                initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))

                for initial_config_primitive in initial_config_primitive_list:
                    primitive_params_ = self._map_primitive_params(
                        initial_config_primitive, {}, {}
                    )

                    await asyncio.wait_for(
                        self.k8scluster_map[k8sclustertype].exec_primitive(
                            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                            kdu_instance=kdu_instance,
                            primitive_name=initial_config_primitive["name"],
                            params=primitive_params_,
                            db_dict=db_dict_install,
                            vca_id=vca_id,
                        ),
                        timeout=timeout,
                    )

        except Exception as e:
            # Prepare update db with error and raise exception
            try:
                self.update_db_2(
                    "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
                )
                self.update_db_2(
                    "vnfrs",
                    vnfr_data.get("_id"),
                    {"kdur.{}.status".format(kdu_index): "ERROR"},
                )
            except Exception:
                # ignore to keep original exception
                pass
            # reraise original error
            raise

        return kdu_instance
3115
    async def deploy_kdus(
        self,
        logging_text,
        nsr_id,
        nslcmop_id,
        db_vnfrs,
        db_vnfds,
        task_instantiation_info,
    ):
        """Launch the installation of every KDU declared in the vnfrs.

        For each kdur of each vnfr this method resolves the target k8s cluster,
        synchronizes its helm repos (once per cluster), records the deployment
        entry in the nsr and spawns an asyncio task running self._install_kdu.
        It returns after launching the tasks; it does not await their result.

        :param logging_text: prefix for log messages
        :param nsr_id: nsr _id; its _admin.deployed.K8s list is (re)created here
        :param nslcmop_id: current operation id used to register the tasks
        :param db_vnfrs: dict of vnfr records, read for their "kdur" lists
        :param db_vnfds: list of vnf descriptors (matched by vnfr "vnfd-id")
        :param task_instantiation_info: dict filled with task -> description
        :raises LcmException: unknown kdu type, missing/uninitialized k8s
            cluster, or any unexpected failure while preparing the deployments
        """
        # Launch kdus if present in the descriptor

        # cache of cluster_id -> connector uuid, one map per cluster type
        k8scluster_id_2_uuic = {
            "helm-chart-v3": {},
            "helm-chart": {},
            "juju-bundle": {},
        }

        async def _get_cluster_id(cluster_id, cluster_type):
            """Resolve (and cache) the connector uuid of a k8s cluster,
            initializing helm-v3 on legacy clusters when needed."""
            nonlocal k8scluster_id_2_uuic
            if cluster_id in k8scluster_id_2_uuic[cluster_type]:
                return k8scluster_id_2_uuic[cluster_type][cluster_id]

            # check if K8scluster is creating and wait look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related(
                "k8scluster", cluster_id
            )
            if task_dependency:
                text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
                    task_name, cluster_id
                )
                self.logger.debug(logging_text + text)
                await asyncio.wait(task_dependency, timeout=3600)

            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
            )
            if not db_k8scluster:
                raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

            k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
            if not k8s_id:
                if cluster_type == "helm-chart-v3":
                    try:
                        # backward compatibility for existing clusters that have not been initialized for helm v3
                        k8s_credentials = yaml.safe_dump(
                            db_k8scluster.get("credentials")
                        )
                        k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
                            k8s_credentials, reuse_cluster_uuid=cluster_id
                        )
                        db_k8scluster_update = {}
                        db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                        db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.created"
                        ] = uninstall_sw
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.operationalState"
                        ] = "ENABLED"
                        self.update_db_2(
                            "k8sclusters", cluster_id, db_k8scluster_update
                        )
                    except Exception as e:
                        self.logger.error(
                            logging_text
                            + "error initializing helm-v3 cluster: {}".format(str(e))
                        )
                        raise LcmException(
                            "K8s cluster '{}' has not been initialized for '{}'".format(
                                cluster_id, cluster_type
                            )
                        )
                else:
                    raise LcmException(
                        "K8s cluster '{}' has not been initialized for '{}'".format(
                            cluster_id, cluster_type
                        )
                    )
            k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
            return k8s_id

        logging_text += "Deploy kdus: "
        step = ""
        try:
            db_nsr_update = {"_admin.deployed.K8s": []}
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

            index = 0
            # clusters whose helm repos were already synchronized in this call
            updated_cluster_list = []
            updated_v3_cluster_list = []

            for vnfr_data in db_vnfrs.values():
                vca_id = self.get_vca_id(vnfr_data, {})
                for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                    # Step 0: Prepare and set parameters
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                    vnfd_id = vnfr_data.get("vnfd-id")
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                    )
                    kdud = next(
                        kdud
                        for kdud in vnfd_with_id["kdu"]
                        if kdud["name"] == kdur["kdu-name"]
                    )
                    namespace = kdur.get("k8s-namespace")
                    kdu_deployment_name = kdur.get("kdu-deployment-name")
                    if kdur.get("helm-chart"):
                        kdumodel = kdur["helm-chart"]
                        # Default version: helm3, if helm-version is v2 assign v2
                        k8sclustertype = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8sclustertype = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        kdumodel = kdur["juju-bundle"]
                        k8sclustertype = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
                            )
                        )
                    # check if kdumodel is a file and exists
                    try:
                        vnfd_with_id = find_in_list(
                            db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                        )
                        storage = deep_get(vnfd_with_id, ("_admin", "storage"))
                        if storage and storage.get(
                            "pkg-dir"
                        ):  # may be not present if vnfd has not artifacts
                            # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                            filename = "{}/{}/{}s/{}".format(
                                storage["folder"],
                                storage["pkg-dir"],
                                k8sclustertype,
                                kdumodel,
                            )
                            if self.fs.file_exists(
                                filename, mode="file"
                            ) or self.fs.file_exists(filename, mode="dir"):
                                kdumodel = self.fs.path + filename
                    except (asyncio.TimeoutError, asyncio.CancelledError):
                        raise
                    except Exception:  # it is not a file
                        pass

                    k8s_cluster_id = kdur["k8s-cluster"]["id"]
                    step = "Synchronize repos for k8s cluster '{}'".format(
                        k8s_cluster_id
                    )
                    cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                    # Synchronize repos
                    if (
                        k8sclustertype == "helm-chart"
                        and cluster_uuid not in updated_cluster_list
                    ) or (
                        k8sclustertype == "helm-chart-v3"
                        and cluster_uuid not in updated_v3_cluster_list
                    ):
                        del_repo_list, added_repo_dict = await asyncio.ensure_future(
                            self.k8scluster_map[k8sclustertype].synchronize_repos(
                                cluster_uuid=cluster_uuid
                            )
                        )
                        if del_repo_list or added_repo_dict:
                            if k8sclustertype == "helm-chart":
                                unset = {
                                    "_admin.helm_charts_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_cluster_list.append(cluster_uuid)
                            elif k8sclustertype == "helm-chart-v3":
                                unset = {
                                    "_admin.helm_charts_v3_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_v3_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_v3_cluster_list.append(cluster_uuid)
                            self.logger.debug(
                                logging_text + "repos synchronized on k8s cluster "
                                "'{}' to_delete: {}, to_add: {}".format(
                                    k8s_cluster_id, del_repo_list, added_repo_dict
                                )
                            )
                            self.db.set_one(
                                "k8sclusters",
                                {"_id": k8s_cluster_id},
                                updated,
                                unset=unset,
                            )

                    # Instantiate kdu
                    step = "Instantiating KDU {}.{} in k8s cluster {}".format(
                        vnfr_data["member-vnf-index-ref"],
                        kdur["kdu-name"],
                        k8s_cluster_id,
                    )
                    k8s_instance_info = {
                        "kdu-instance": None,
                        "k8scluster-uuid": cluster_uuid,
                        "k8scluster-type": k8sclustertype,
                        "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                        "kdu-name": kdur["kdu-name"],
                        "kdu-model": kdumodel,
                        "namespace": namespace,
                        "kdu-deployment-name": kdu_deployment_name,
                    }
                    db_path = "_admin.deployed.K8s.{}".format(index)
                    db_nsr_update[db_path] = k8s_instance_info
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
                    )
                    task = asyncio.ensure_future(
                        self._install_kdu(
                            nsr_id,
                            db_path,
                            vnfr_data,
                            kdu_index,
                            kdud,
                            vnfd_with_id,
                            k8s_instance_info,
                            k8params=desc_params,
                            timeout=1800,
                            vca_id=vca_id,
                        )
                    )
                    self.lcm_tasks.register(
                        "ns",
                        nsr_id,
                        nslcmop_id,
                        "instantiate_KDU-{}".format(index),
                        task,
                    )
                    task_instantiation_info[task] = "Deploying KDU {}".format(
                        kdur["kdu-name"]
                    )

                    index += 1

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
            if isinstance(e, (N2VCException, DbException)):
                self.logger.error(logging_text + msg)
            else:
                self.logger.critical(logging_text + msg, exc_info=True)
            raise LcmException(msg)
        finally:
            if db_nsr_update:
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
3382
    def _deploy_n2vc(
        self,
        logging_text,
        db_nsr,
        db_vnfr,
        nslcmop_id,
        nsr_id,
        nsi_id,
        vnfd_id,
        vdu_id,
        kdu_name,
        member_vnf_index,
        vdu_index,
        vdu_name,
        deploy_params,
        descriptor_config,
        base_folder,
        task_instantiation_info,
        stage,
    ):
        """Create/reuse the VCA records of db_nsr and launch one
        instantiate_N2VC asyncio task per execution environment found in
        descriptor_config.

        Tasks are registered in lcm_tasks and in task_instantiation_info;
        nothing is awaited here. Execution environments that are neither juju
        charms nor helm charts are skipped with a debug log.
        """
        # launch instantiate_N2VC in a asyncio task and register task object
        # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
        # if not found, create one entry and update database
        # fill db_nsr._admin.deployed.VCA.<index>

        self.logger.debug(
            logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
        )
        if "execution-environment-list" in descriptor_config:
            ee_list = descriptor_config.get("execution-environment-list", [])
        elif "juju" in descriptor_config:
            ee_list = [descriptor_config]  # ns charms
        else:  # other types as script are not supported
            ee_list = []

        for ee_item in ee_list:
            self.logger.debug(
                logging_text
                + "_deploy_n2vc ee_item juju={}, helm={}".format(
                    ee_item.get("juju"), ee_item.get("helm-chart")
                )
            )
            ee_descriptor_id = ee_item.get("id")
            if ee_item.get("juju"):
                vca_name = ee_item["juju"].get("charm")
                # proxy charm when a charm name is given, otherwise native
                vca_type = (
                    "lxc_proxy_charm"
                    if ee_item["juju"].get("charm") is not None
                    else "native_charm"
                )
                if ee_item["juju"].get("cloud") == "k8s":
                    vca_type = "k8s_proxy_charm"
                elif ee_item["juju"].get("proxy") is False:
                    vca_type = "native_charm"
            elif ee_item.get("helm-chart"):
                vca_name = ee_item["helm-chart"]
                if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                    vca_type = "helm"
                else:
                    vca_type = "helm-v3"
            else:
                self.logger.debug(
                    logging_text + "skipping non juju neither charm configuration"
                )
                continue

            # look for an existing VCA entry matching this target and ee
            vca_index = -1
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if not vca_deployed:
                    continue
                if (
                    vca_deployed.get("member-vnf-index") == member_vnf_index
                    and vca_deployed.get("vdu_id") == vdu_id
                    and vca_deployed.get("kdu_name") == kdu_name
                    and vca_deployed.get("vdu_count_index", 0) == vdu_index
                    and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
                ):
                    break
            else:
                # not found, create one.
                target = (
                    "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
                )
                if vdu_id:
                    target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
                elif kdu_name:
                    target += "/kdu/{}".format(kdu_name)
                vca_deployed = {
                    "target_element": target,
                    # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                    "member-vnf-index": member_vnf_index,
                    "vdu_id": vdu_id,
                    "kdu_name": kdu_name,
                    "vdu_count_index": vdu_index,
                    "operational-status": "init",  # TODO revise
                    "detailed-status": "",  # TODO revise
                    "step": "initial-deploy",  # TODO revise
                    "vnfd_id": vnfd_id,
                    "vdu_name": vdu_name,
                    "type": vca_type,
                    "ee_descriptor_id": ee_descriptor_id,
                }
                vca_index += 1

                # create VCA and configurationStatus in db
                db_dict = {
                    "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                    "configurationStatus.{}".format(vca_index): dict(),
                }
                self.update_db_2("nsrs", nsr_id, db_dict)

                db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

            self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
            self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
            self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

            # Launch task
            task_n2vc = asyncio.ensure_future(
                self.instantiate_N2VC(
                    logging_text=logging_text,
                    vca_index=vca_index,
                    nsi_id=nsi_id,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    vdu_index=vdu_index,
                    deploy_params=deploy_params,
                    config_descriptor=descriptor_config,
                    base_folder=base_folder,
                    nslcmop_id=nslcmop_id,
                    stage=stage,
                    vca_type=vca_type,
                    vca_name=vca_name,
                    ee_config_descriptor=ee_item,
                )
            )
            self.lcm_tasks.register(
                "ns",
                nsr_id,
                nslcmop_id,
                "instantiate_N2VC-{}".format(vca_index),
                task_n2vc,
            )
            task_instantiation_info[
                task_n2vc
            ] = self.task_name_deploy_vca + " {}.{}".format(
                member_vnf_index or "", vdu_id or ""
            )
3535
3536 @staticmethod
3537 def _create_nslcmop(nsr_id, operation, params):
3538 """
3539 Creates a ns-lcm-opp content to be stored at database.
3540 :param nsr_id: internal id of the instance
3541 :param operation: instantiate, terminate, scale, action, ...
3542 :param params: user parameters for the operation
3543 :return: dictionary following SOL005 format
3544 """
3545 # Raise exception if invalid arguments
3546 if not (nsr_id and operation and params):
3547 raise LcmException(
3548 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3549 )
3550 now = time()
3551 _id = str(uuid4())
3552 nslcmop = {
3553 "id": _id,
3554 "_id": _id,
3555 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3556 "operationState": "PROCESSING",
3557 "statusEnteredTime": now,
3558 "nsInstanceId": nsr_id,
3559 "lcmOperationType": operation,
3560 "startTime": now,
3561 "isAutomaticInvocation": False,
3562 "operationParams": params,
3563 "isCancelPending": False,
3564 "links": {
3565 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3566 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3567 },
3568 }
3569 return nslcmop
3570
3571 def _format_additional_params(self, params):
3572 params = params or {}
3573 for key, value in params.items():
3574 if str(value).startswith("!!yaml "):
3575 params[key] = yaml.safe_load(value[7:])
3576 return params
3577
3578 def _get_terminate_primitive_params(self, seq, vnf_index):
3579 primitive = seq.get("name")
3580 primitive_params = {}
3581 params = {
3582 "member_vnf_index": vnf_index,
3583 "primitive": primitive,
3584 "primitive_params": primitive_params,
3585 }
3586 desc_params = {}
3587 return self._map_primitive_params(seq, params, desc_params)
3588
3589 # sub-operations
3590
3591 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3592 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3593 if op.get("operationState") == "COMPLETED":
3594 # b. Skip sub-operation
3595 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3596 return self.SUBOPERATION_STATUS_SKIP
3597 else:
3598 # c. retry executing sub-operation
3599 # The sub-operation exists, and operationState != 'COMPLETED'
3600 # Update operationState = 'PROCESSING' to indicate a retry.
3601 operationState = "PROCESSING"
3602 detailed_status = "In progress"
3603 self._update_suboperation_status(
3604 db_nslcmop, op_index, operationState, detailed_status
3605 )
3606 # Return the sub-operation index
3607 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3608 # with arguments extracted from the sub-operation
3609 return op_index
3610
3611 # Find a sub-operation where all keys in a matching dictionary must match
3612 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3613 def _find_suboperation(self, db_nslcmop, match):
3614 if db_nslcmop and match:
3615 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3616 for i, op in enumerate(op_list):
3617 if all(op.get(k) == match[k] for k in match):
3618 return i
3619 return self.SUBOPERATION_STATUS_NOT_FOUND
3620
3621 # Update status for a sub-operation given its index
3622 def _update_suboperation_status(
3623 self, db_nslcmop, op_index, operationState, detailed_status
3624 ):
3625 # Update DB for HA tasks
3626 q_filter = {"_id": db_nslcmop["_id"]}
3627 update_dict = {
3628 "_admin.operations.{}.operationState".format(op_index): operationState,
3629 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3630 }
3631 self.db.set_one(
3632 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3633 )
3634
3635 # Add sub-operation, return the index of the added sub-operation
3636 # Optionally, set operationState, detailed-status, and operationType
3637 # Status and type are currently set for 'scale' sub-operations:
3638 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3639 # 'detailed-status' : status message
3640 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3641 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3642 def _add_suboperation(
3643 self,
3644 db_nslcmop,
3645 vnf_index,
3646 vdu_id,
3647 vdu_count_index,
3648 vdu_name,
3649 primitive,
3650 mapped_primitive_params,
3651 operationState=None,
3652 detailed_status=None,
3653 operationType=None,
3654 RO_nsr_id=None,
3655 RO_scaling_info=None,
3656 ):
3657 if not db_nslcmop:
3658 return self.SUBOPERATION_STATUS_NOT_FOUND
3659 # Get the "_admin.operations" list, if it exists
3660 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3661 op_list = db_nslcmop_admin.get("operations")
3662 # Create or append to the "_admin.operations" list
3663 new_op = {
3664 "member_vnf_index": vnf_index,
3665 "vdu_id": vdu_id,
3666 "vdu_count_index": vdu_count_index,
3667 "primitive": primitive,
3668 "primitive_params": mapped_primitive_params,
3669 }
3670 if operationState:
3671 new_op["operationState"] = operationState
3672 if detailed_status:
3673 new_op["detailed-status"] = detailed_status
3674 if operationType:
3675 new_op["lcmOperationType"] = operationType
3676 if RO_nsr_id:
3677 new_op["RO_nsr_id"] = RO_nsr_id
3678 if RO_scaling_info:
3679 new_op["RO_scaling_info"] = RO_scaling_info
3680 if not op_list:
3681 # No existing operations, create key 'operations' with current operation as first list element
3682 db_nslcmop_admin.update({"operations": [new_op]})
3683 op_list = db_nslcmop_admin.get("operations")
3684 else:
3685 # Existing operations, append operation to list
3686 op_list.append(new_op)
3687
3688 db_nslcmop_update = {"_admin.operations": op_list}
3689 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3690 op_index = len(op_list) - 1
3691 return op_index
3692
3693 # Helper methods for scale() sub-operations
3694
3695 # pre-scale/post-scale:
3696 # Check for 3 different cases:
3697 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3698 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3699 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3700 def _check_or_add_scale_suboperation(
3701 self,
3702 db_nslcmop,
3703 vnf_index,
3704 vnf_config_primitive,
3705 primitive_params,
3706 operationType,
3707 RO_nsr_id=None,
3708 RO_scaling_info=None,
3709 ):
3710 # Find this sub-operation
3711 if RO_nsr_id and RO_scaling_info:
3712 operationType = "SCALE-RO"
3713 match = {
3714 "member_vnf_index": vnf_index,
3715 "RO_nsr_id": RO_nsr_id,
3716 "RO_scaling_info": RO_scaling_info,
3717 }
3718 else:
3719 match = {
3720 "member_vnf_index": vnf_index,
3721 "primitive": vnf_config_primitive,
3722 "primitive_params": primitive_params,
3723 "lcmOperationType": operationType,
3724 }
3725 op_index = self._find_suboperation(db_nslcmop, match)
3726 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3727 # a. New sub-operation
3728 # The sub-operation does not exist, add it.
3729 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3730 # The following parameters are set to None for all kind of scaling:
3731 vdu_id = None
3732 vdu_count_index = None
3733 vdu_name = None
3734 if RO_nsr_id and RO_scaling_info:
3735 vnf_config_primitive = None
3736 primitive_params = None
3737 else:
3738 RO_nsr_id = None
3739 RO_scaling_info = None
3740 # Initial status for sub-operation
3741 operationState = "PROCESSING"
3742 detailed_status = "In progress"
3743 # Add sub-operation for pre/post-scaling (zero or more operations)
3744 self._add_suboperation(
3745 db_nslcmop,
3746 vnf_index,
3747 vdu_id,
3748 vdu_count_index,
3749 vdu_name,
3750 vnf_config_primitive,
3751 primitive_params,
3752 operationState,
3753 detailed_status,
3754 operationType,
3755 RO_nsr_id,
3756 RO_scaling_info,
3757 )
3758 return self.SUBOPERATION_STATUS_NEW
3759 else:
3760 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3761 # or op_index (operationState != 'COMPLETED')
3762 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3763
3764 # Function to return execution_environment id
3765
3766 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3767 # TODO vdu_index_count
3768 for vca in vca_deployed_list:
3769 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3770 return vca["ee_id"]
3771
    async def destroy_N2VC(
        self,
        logging_text,
        db_nslcmop,
        vca_deployed,
        config_descriptor,
        vca_index,
        destroy_ee=True,
        exec_primitives=True,
        scaling_in=False,
        vca_id: str = None,
    ):
        """
        Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
        :param logging_text: prefix for log messages
        :param db_nslcmop: database content of the nslcmop being processed
        :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
        :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
        :param vca_index: index in the database _admin.deployed.VCA
        :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
        :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
            not executed properly
        :param scaling_in: True destroys the application, False destroys the model
        :param vca_id: id of a dedicated VCA to use, when configured
        :return: None or exception
        """

        self.logger.debug(
            logging_text
            + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
                vca_index, vca_deployed, config_descriptor, destroy_ee
            )
        )

        # default to proxy charm for entries recorded before "type" existed
        vca_type = vca_deployed.get("type", "lxc_proxy_charm")

        # execute terminate_primitives
        if exec_primitives:
            terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
                config_descriptor.get("terminate-config-primitive"),
                vca_deployed.get("ee_descriptor_id"),
            )
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            vnf_index = vca_deployed.get("member-vnf-index")
            if terminate_primitives and vca_deployed.get("needed_terminate"):
                for seq in terminate_primitives:
                    # For each sequence in list, get primitive and call _ns_execute_primitive()
                    step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                        vnf_index, seq.get("name")
                    )
                    self.logger.debug(logging_text + step)
                    # Create the primitive for each sequence, i.e. "primitive": "touch"
                    primitive = seq.get("name")
                    mapped_primitive_params = self._get_terminate_primitive_params(
                        seq, vnf_index
                    )

                    # Add sub-operation
                    self._add_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vdu_id,
                        vdu_count_index,
                        vdu_name,
                        primitive,
                        mapped_primitive_params,
                    )
                    # Sub-operations: Call _ns_execute_primitive() instead of action()
                    try:
                        result, result_detail = await self._ns_execute_primitive(
                            vca_deployed["ee_id"],
                            primitive,
                            mapped_primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                    except LcmException:
                        # this happens when VCA is not deployed. In this case it is not needed to terminate
                        continue
                    result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                    if result not in result_ok:
                        raise LcmException(
                            "terminate_primitive {}  for vnf_member_index={} fails with "
                            "error {}".format(seq.get("name"), vnf_index, result_detail)
                        )
                # set that this VCA do not need terminated
                db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                    vca_index
                )
                self.update_db_2(
                    "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
                )

        # drop any prometheus scrape jobs that this VCA registered
        if vca_deployed.get("prometheus_jobs") and self.prometheus:
            await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

        if destroy_ee:
            await self.vca_map[vca_type].delete_execution_environment(
                vca_deployed["ee_id"],
                scaling_in=scaling_in,
                vca_type=vca_type,
                vca_id=vca_id,
            )
3876
3877 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
3878 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
3879 namespace = "." + db_nsr["_id"]
3880 try:
3881 await self.n2vc.delete_namespace(
3882 namespace=namespace,
3883 total_timeout=self.timeout_charm_delete,
3884 vca_id=vca_id,
3885 )
3886 except N2VCNotFound: # already deleted. Skip
3887 pass
3888 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
3889
3890 async def _terminate_RO(
3891 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
3892 ):
3893 """
3894 Terminates a deployment from RO
3895 :param logging_text:
3896 :param nsr_deployed: db_nsr._admin.deployed
3897 :param nsr_id:
3898 :param nslcmop_id:
3899 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
3900 this method will update only the index 2, but it will write on database the concatenated content of the list
3901 :return:
3902 """
3903 db_nsr_update = {}
3904 failed_detail = []
3905 ro_nsr_id = ro_delete_action = None
3906 if nsr_deployed and nsr_deployed.get("RO"):
3907 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
3908 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
3909 try:
3910 if ro_nsr_id:
3911 stage[2] = "Deleting ns from VIM."
3912 db_nsr_update["detailed-status"] = " ".join(stage)
3913 self._write_op_status(nslcmop_id, stage)
3914 self.logger.debug(logging_text + stage[2])
3915 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3916 self._write_op_status(nslcmop_id, stage)
3917 desc = await self.RO.delete("ns", ro_nsr_id)
3918 ro_delete_action = desc["action_id"]
3919 db_nsr_update[
3920 "_admin.deployed.RO.nsr_delete_action_id"
3921 ] = ro_delete_action
3922 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3923 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3924 if ro_delete_action:
3925 # wait until NS is deleted from VIM
3926 stage[2] = "Waiting ns deleted from VIM."
3927 detailed_status_old = None
3928 self.logger.debug(
3929 logging_text
3930 + stage[2]
3931 + " RO_id={} ro_delete_action={}".format(
3932 ro_nsr_id, ro_delete_action
3933 )
3934 )
3935 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3936 self._write_op_status(nslcmop_id, stage)
3937
3938 delete_timeout = 20 * 60 # 20 minutes
3939 while delete_timeout > 0:
3940 desc = await self.RO.show(
3941 "ns",
3942 item_id_name=ro_nsr_id,
3943 extra_item="action",
3944 extra_item_id=ro_delete_action,
3945 )
3946
3947 # deploymentStatus
3948 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3949
3950 ns_status, ns_status_info = self.RO.check_action_status(desc)
3951 if ns_status == "ERROR":
3952 raise ROclient.ROClientException(ns_status_info)
3953 elif ns_status == "BUILD":
3954 stage[2] = "Deleting from VIM {}".format(ns_status_info)
3955 elif ns_status == "ACTIVE":
3956 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3957 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3958 break
3959 else:
3960 assert (
3961 False
3962 ), "ROclient.check_action_status returns unknown {}".format(
3963 ns_status
3964 )
3965 if stage[2] != detailed_status_old:
3966 detailed_status_old = stage[2]
3967 db_nsr_update["detailed-status"] = " ".join(stage)
3968 self._write_op_status(nslcmop_id, stage)
3969 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3970 await asyncio.sleep(5, loop=self.loop)
3971 delete_timeout -= 5
3972 else: # delete_timeout <= 0:
3973 raise ROclient.ROClientException(
3974 "Timeout waiting ns deleted from VIM"
3975 )
3976
3977 except Exception as e:
3978 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3979 if (
3980 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3981 ): # not found
3982 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3983 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3984 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3985 self.logger.debug(
3986 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
3987 )
3988 elif (
3989 isinstance(e, ROclient.ROClientException) and e.http_code == 409
3990 ): # conflict
3991 failed_detail.append("delete conflict: {}".format(e))
3992 self.logger.debug(
3993 logging_text
3994 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
3995 )
3996 else:
3997 failed_detail.append("delete error: {}".format(e))
3998 self.logger.error(
3999 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
4000 )
4001
4002 # Delete nsd
4003 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
4004 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
4005 try:
4006 stage[2] = "Deleting nsd from RO."
4007 db_nsr_update["detailed-status"] = " ".join(stage)
4008 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4009 self._write_op_status(nslcmop_id, stage)
4010 await self.RO.delete("nsd", ro_nsd_id)
4011 self.logger.debug(
4012 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
4013 )
4014 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4015 except Exception as e:
4016 if (
4017 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4018 ): # not found
4019 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4020 self.logger.debug(
4021 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
4022 )
4023 elif (
4024 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4025 ): # conflict
4026 failed_detail.append(
4027 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
4028 )
4029 self.logger.debug(logging_text + failed_detail[-1])
4030 else:
4031 failed_detail.append(
4032 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
4033 )
4034 self.logger.error(logging_text + failed_detail[-1])
4035
4036 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
4037 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
4038 if not vnf_deployed or not vnf_deployed["id"]:
4039 continue
4040 try:
4041 ro_vnfd_id = vnf_deployed["id"]
4042 stage[
4043 2
4044 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4045 vnf_deployed["member-vnf-index"], ro_vnfd_id
4046 )
4047 db_nsr_update["detailed-status"] = " ".join(stage)
4048 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4049 self._write_op_status(nslcmop_id, stage)
4050 await self.RO.delete("vnfd", ro_vnfd_id)
4051 self.logger.debug(
4052 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
4053 )
4054 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
4055 except Exception as e:
4056 if (
4057 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4058 ): # not found
4059 db_nsr_update[
4060 "_admin.deployed.RO.vnfd.{}.id".format(index)
4061 ] = None
4062 self.logger.debug(
4063 logging_text
4064 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
4065 )
4066 elif (
4067 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4068 ): # conflict
4069 failed_detail.append(
4070 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
4071 )
4072 self.logger.debug(logging_text + failed_detail[-1])
4073 else:
4074 failed_detail.append(
4075 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
4076 )
4077 self.logger.error(logging_text + failed_detail[-1])
4078
4079 if failed_detail:
4080 stage[2] = "Error deleting from VIM"
4081 else:
4082 stage[2] = "Deleted from VIM"
4083 db_nsr_update["detailed-status"] = " ".join(stage)
4084 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4085 self._write_op_status(nslcmop_id, stage)
4086
4087 if failed_detail:
4088 raise LcmException("; ".join(failed_detail))
4089
    async def terminate(self, nsr_id, nslcmop_id):
        """
        NS termination task: runs terminating primitives, deletes execution
        environments (VCA), KDU instances and the VIM deployment (RO/NG-RO),
        then records the final operation state in the database and notifies
        via kafka.

        :param nsr_id: id of the NS record to terminate
        :param nslcmop_id: id of the ns lcm operation driving this termination
        :return: None (all results are written to the database / kafka bus)
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        timeout_ns_terminate = self.timeout_ns_terminate
        db_nsr = None
        db_nslcmop = None
        operation_params = None
        exc = None
        error_list = []  # annotates all failed error messages
        db_nslcmop_update = {}
        autoremove = False  # autoremove after terminated
        tasks_dict_info = {}  # maps asyncio task -> human-readable description
        db_nsr_update = {}
        stage = [
            "Stage 1/3: Preparing task.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ contains [stage, step, VIM-status]
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            operation_params = db_nslcmop.get("operationParams") or {}
            if operation_params.get("timeout_ns_terminate"):
                timeout_ns_terminate = operation_params["timeout_ns_terminate"]
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            db_nsr_update["operational-status"] = "terminating"
            db_nsr_update["config-status"] = "terminating"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="TERMINATING",
                current_operation="TERMINATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
            # deepcopy so later mutations do not leak into the cached db record
            nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
            if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
                # nothing was ever deployed; finally-block still records "Done"
                return

            stage[1] = "Getting vnf descriptors from db."
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
            db_vnfrs_dict = {
                db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
            }
            db_vnfds_from_id = {}  # cache: vnfd _id -> vnfd (avoid re-reading db)
            db_vnfds_from_member_index = {}  # member-vnf-index -> vnfd
            # Loop over VNFRs
            for vnfr in db_vnfrs_list:
                vnfd_id = vnfr["vnfd-id"]
                if vnfd_id not in db_vnfds_from_id:
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                    db_vnfds_from_id[vnfd_id] = vnfd
                db_vnfds_from_member_index[
                    vnfr["member-vnf-index-ref"]
                ] = db_vnfds_from_id[vnfd_id]

            # Destroy individual execution environments when there are terminating primitives.
            # Rest of EE will be deleted at once
            # TODO - check before calling _destroy_N2VC
            # if not operation_params.get("skip_terminate_primitives"):#
            # or not vca.get("needed_terminate"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            self.logger.debug(logging_text + stage[0])
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])

            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                config_descriptor = None
                vca_member_vnf_index = vca.get("member-vnf-index")
                vca_id = self.get_vca_id(
                    db_vnfrs_dict.get(vca_member_vnf_index)
                    if vca_member_vnf_index
                    else None,
                    db_nsr,
                )
                if not vca or not vca.get("ee_id"):
                    continue
                # pick the configuration descriptor matching the VCA scope:
                # ns-level, vdu-level, kdu-level or vnf-level
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
                else:
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
                vca_type = vca.get("type")
                exec_terminate_primitives = not operation_params.get(
                    "skip_terminate_primitives"
                ) and vca.get("needed_terminate")
                # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
                # pending native charms
                destroy_ee = (
                    True if vca_type in ("helm", "helm-v3", "native_charm") else False
                )
                # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
                #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
                task = asyncio.ensure_future(
                    self.destroy_N2VC(
                        logging_text,
                        db_nslcmop,
                        vca,
                        config_descriptor,
                        vca_index,
                        destroy_ee,
                        exec_terminate_primitives,
                        vca_id=vca_id,
                    )
                )
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(
                    logging_text
                    + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
                )
                error_list = await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    min(self.timeout_charm_delete, timeout_ns_terminate),
                    stage,
                    nslcmop_id,
                )
                tasks_dict_info.clear()
                if error_list:
                    # abort stage 3; finally-block reports the failure
                    return  # raise LcmException("; ".join(error_list))

            # remove All execution environments at once
            stage[0] = "Stage 3/3 delete all."

            if nsr_deployed.get("VCA"):
                stage[1] = "Deleting all execution environments."
                self.logger.debug(logging_text + stage[1])
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_ee = asyncio.ensure_future(
                    asyncio.wait_for(
                        self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                        timeout=self.timeout_charm_delete,
                    )
                )
                # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
                tasks_dict_info[task_delete_ee] = "Terminating all VCA"

            # Delete from k8scluster
            stage[1] = "Deleting KDUs."
            self.logger.debug(logging_text + stage[1])
            # print(nsr_deployed)
            for kdu in get_iterable(nsr_deployed, "K8s"):
                if not kdu or not kdu.get("kdu-instance"):
                    continue
                kdu_instance = kdu.get("kdu-instance")
                if kdu.get("k8scluster-type") in self.k8scluster_map:
                    # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                    vca_id = self.get_vca_id({}, db_nsr)
                    task_delete_kdu_instance = asyncio.ensure_future(
                        self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            vca_id=vca_id,
                        )
                    )
                else:
                    self.logger.error(
                        logging_text
                        + "Unknown k8s deployment type {}".format(
                            kdu.get("k8scluster-type")
                        )
                    )
                    continue
                tasks_dict_info[
                    task_delete_kdu_instance
                ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

            # remove from RO
            stage[1] = "Deleting ns from VIM."
            if self.ng_ro:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_ng_ro(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            else:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_RO(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

            # rest of staff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            # the finally block always runs: it waits for any tasks launched
            # above and persists the final operation state, even on early return
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for terminate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_terminate,
                        stage,
                        nslcmop_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancell all tasks
            except Exception as exc:
                error_list.append(str(exc))
            # update status at database
            if error_list:
                error_detail = "; ".join(error_list)
                # self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["operational-status"] = "failed"
                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "NOT_INSTANTIATED"
                db_nsr_update["operational-status"] = "terminated"
                db_nsr_update["detailed-status"] = "Done"
                db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
                self._write_op_status(
                    op_id=nslcmop_id,
                    stage="",
                    error_message=error_description_nslcmop,
                    operation_state=nslcmop_operation_state,
                    other_update=db_nslcmop_update,
                )
                if ns_state == "NOT_INSTANTIATED":
                    try:
                        # propagate the final state to all member VNFRs as well
                        self.db.set_list(
                            "vnfrs",
                            {"nsr-id-ref": nsr_id},
                            {"_admin.nsState": "NOT_INSTANTIATED"},
                        )
                    except DbException as e:
                        self.logger.warn(
                            logging_text
                            + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                                nsr_id, e
                            )
                        )
            if operation_params:
                autoremove = operation_params.get("autoremove", False)
            if nslcmop_operation_state:
                try:
                    # notify NBI (and other subscribers) about termination outcome
                    await self.msg.aiowrite(
                        "ns",
                        "terminated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                            "autoremove": autoremove,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4417
    async def _wait_for_tasks(
        self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
    ):
        """
        Wait for a set of asyncio tasks to complete, reporting progress and
        collecting error details.

        :param logging_text: prefix for all log messages
        :param created_tasks_info: dict mapping asyncio task -> human-readable description
        :param timeout: overall timeout in seconds for the whole set of tasks
        :param stage: 3-element list; index 1 is updated with "done/total" progress
            (mutated in place)
        :param nslcmop_id: operation id used to persist progress via _write_op_status
        :param nsr_id: if provided, errors are also written to the nsr record
        :return: list of error-detail strings (empty when all tasks succeeded)
        """
        time_start = time()
        error_detail_list = []  # full messages: "<description>: <error>"
        error_list = []  # short entries (task description only, for nsr errorDescription)
        pending_tasks = list(created_tasks_info.keys())
        num_tasks = len(pending_tasks)
        num_done = 0
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        self._write_op_status(nslcmop_id, stage)
        while pending_tasks:
            new_error = None
            # remaining budget; shrinks on every iteration of the loop
            _timeout = timeout + time_start - time()
            done, pending_tasks = await asyncio.wait(
                pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
            )
            num_done += len(done)
            if not done:  # Timeout
                # mark every still-pending task as timed out and stop waiting
                for task in pending_tasks:
                    new_error = created_tasks_info[task] + ": Timeout"
                    error_detail_list.append(new_error)
                    error_list.append(new_error)
                break
            for task in done:
                if task.cancelled():
                    exc = "Cancelled"
                else:
                    exc = task.exception()
                if exc:
                    if isinstance(exc, asyncio.TimeoutError):
                        exc = "Timeout"
                    new_error = created_tasks_info[task] + ": {}".format(exc)
                    error_list.append(created_tasks_info[task])
                    error_detail_list.append(new_error)
                    # known/expected exception types get a one-line log;
                    # anything else is logged with its full traceback
                    if isinstance(
                        exc,
                        (
                            str,
                            DbException,
                            N2VCException,
                            ROclient.ROClientException,
                            LcmException,
                            K8sException,
                            NgRoException,
                        ),
                    ):
                        self.logger.error(logging_text + new_error)
                    else:
                        exc_traceback = "".join(
                            traceback.format_exception(None, exc, exc.__traceback__)
                        )
                        self.logger.error(
                            logging_text
                            + created_tasks_info[task]
                            + " "
                            + exc_traceback
                        )
                else:
                    self.logger.debug(
                        logging_text + created_tasks_info[task] + ": Done"
                    )
            stage[1] = "{}/{}.".format(num_done, num_tasks)
            if new_error:
                stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
                if nsr_id:  # update also nsr
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {
                            "errorDescription": "Error at: " + ", ".join(error_list),
                            "errorDetail": ". ".join(error_detail_list),
                        },
                    )
            self._write_op_status(nslcmop_id, stage)
        return error_detail_list
4494
4495 @staticmethod
4496 def _map_primitive_params(primitive_desc, params, instantiation_params):
4497 """
4498 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4499 The default-value is used. If it is between < > it look for a value at instantiation_params
4500 :param primitive_desc: portion of VNFD/NSD that describes primitive
4501 :param params: Params provided by user
4502 :param instantiation_params: Instantiation params provided by user
4503 :return: a dictionary with the calculated params
4504 """
4505 calculated_params = {}
4506 for parameter in primitive_desc.get("parameter", ()):
4507 param_name = parameter["name"]
4508 if param_name in params:
4509 calculated_params[param_name] = params[param_name]
4510 elif "default-value" in parameter or "value" in parameter:
4511 if "value" in parameter:
4512 calculated_params[param_name] = parameter["value"]
4513 else:
4514 calculated_params[param_name] = parameter["default-value"]
4515 if (
4516 isinstance(calculated_params[param_name], str)
4517 and calculated_params[param_name].startswith("<")
4518 and calculated_params[param_name].endswith(">")
4519 ):
4520 if calculated_params[param_name][1:-1] in instantiation_params:
4521 calculated_params[param_name] = instantiation_params[
4522 calculated_params[param_name][1:-1]
4523 ]
4524 else:
4525 raise LcmException(
4526 "Parameter {} needed to execute primitive {} not provided".format(
4527 calculated_params[param_name], primitive_desc["name"]
4528 )
4529 )
4530 else:
4531 raise LcmException(
4532 "Parameter {} needed to execute primitive {} not provided".format(
4533 param_name, primitive_desc["name"]
4534 )
4535 )
4536
4537 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4538 calculated_params[param_name] = yaml.safe_dump(
4539 calculated_params[param_name], default_flow_style=True, width=256
4540 )
4541 elif isinstance(calculated_params[param_name], str) and calculated_params[
4542 param_name
4543 ].startswith("!!yaml "):
4544 calculated_params[param_name] = calculated_params[param_name][7:]
4545 if parameter.get("data-type") == "INTEGER":
4546 try:
4547 calculated_params[param_name] = int(calculated_params[param_name])
4548 except ValueError: # error converting string to int
4549 raise LcmException(
4550 "Parameter {} of primitive {} must be integer".format(
4551 param_name, primitive_desc["name"]
4552 )
4553 )
4554 elif parameter.get("data-type") == "BOOLEAN":
4555 calculated_params[param_name] = not (
4556 (str(calculated_params[param_name])).lower() == "false"
4557 )
4558
4559 # add always ns_config_info if primitive name is config
4560 if primitive_desc["name"] == "config":
4561 if "ns_config_info" in instantiation_params:
4562 calculated_params["ns_config_info"] = instantiation_params[
4563 "ns_config_info"
4564 ]
4565 return calculated_params
4566
4567 def _look_for_deployed_vca(
4568 self,
4569 deployed_vca,
4570 member_vnf_index,
4571 vdu_id,
4572 vdu_count_index,
4573 kdu_name=None,
4574 ee_descriptor_id=None,
4575 ):
4576 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4577 for vca in deployed_vca:
4578 if not vca:
4579 continue
4580 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4581 continue
4582 if (
4583 vdu_count_index is not None
4584 and vdu_count_index != vca["vdu_count_index"]
4585 ):
4586 continue
4587 if kdu_name and kdu_name != vca["kdu_name"]:
4588 continue
4589 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4590 continue
4591 break
4592 else:
4593 # vca_deployed not found
4594 raise LcmException(
4595 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4596 " is not deployed".format(
4597 member_vnf_index,
4598 vdu_id,
4599 vdu_count_index,
4600 kdu_name,
4601 ee_descriptor_id,
4602 )
4603 )
4604 # get ee_id
4605 ee_id = vca.get("ee_id")
4606 vca_type = vca.get(
4607 "type", "lxc_proxy_charm"
4608 ) # default value for backward compatibility - proxy charm
4609 if not ee_id:
4610 raise LcmException(
4611 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4612 "execution environment".format(
4613 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4614 )
4615 )
4616 return ee_id, vca_type
4617
    async def _ns_execute_primitive(
        self,
        ee_id,
        primitive,
        primitive_params,
        retries=0,
        retries_interval=30,
        timeout=None,
        vca_type=None,
        db_dict=None,
        vca_id: str = None,
    ) -> (str, str):
        """
        Execute a primitive on an execution environment, retrying on failure.

        :param ee_id: id of the execution environment to run the primitive on
        :param primitive: primitive name; "config" wraps params as {"params": ...}
        :param primitive_params: params dict for the primitive
        :param retries: number of additional attempts after the first failure
        :param retries_interval: seconds to sleep between attempts
        :param timeout: per-attempt timeout; falls back to self.timeout_primitive
        :param vca_type: key into self.vca_map; defaults to "lxc_proxy_charm"
        :param db_dict: db location passed through to the connector for status updates
        :param vca_id: id of the VCA to use
        :return: tuple (operation_state, detailed_output)
        """
        try:
            if primitive == "config":
                # the "config" primitive expects its params nested under "params"
                primitive_params = {"params": primitive_params}

            vca_type = vca_type or "lxc_proxy_charm"

            # retry loop: first iteration plus `retries` extra attempts
            while retries >= 0:
                try:
                    output = await asyncio.wait_for(
                        self.vca_map[vca_type].exec_primitive(
                            ee_id=ee_id,
                            primitive_name=primitive,
                            params_dict=primitive_params,
                            progress_timeout=self.timeout_progress_primitive,
                            total_timeout=self.timeout_primitive,
                            db_dict=db_dict,
                            vca_id=vca_id,
                            vca_type=vca_type,
                        ),
                        timeout=timeout or self.timeout_primitive,
                    )
                    # execution was OK
                    break
                except asyncio.CancelledError:
                    raise
                except Exception as e:  # asyncio.TimeoutError
                    if isinstance(e, asyncio.TimeoutError):
                        e = "Timeout"
                    retries -= 1
                    if retries >= 0:
                        self.logger.debug(
                            "Error executing action {} on {} -> {}".format(
                                primitive, ee_id, e
                            )
                        )
                        # wait and retry
                        await asyncio.sleep(retries_interval, loop=self.loop)
                    else:
                        # retries exhausted
                        return "FAILED", str(e)

            return "COMPLETED", output

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            # NOTE(review): this branch returns "FAIL" while the retry path
            # returns "FAILED" — looks inconsistent; confirm what callers compare
            return "FAIL", "Error executing action {}: {}".format(primitive, e)
4676
4677 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4678 """
4679 Updating the vca_status with latest juju information in nsrs record
4680 :param: nsr_id: Id of the nsr
4681 :param: nslcmop_id: Id of the nslcmop
4682 :return: None
4683 """
4684
4685 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4686 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4687 vca_id = self.get_vca_id({}, db_nsr)
4688 if db_nsr["_admin"]["deployed"]["K8s"]:
4689 for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4690 cluster_uuid, kdu_instance, cluster_type = (
4691 k8s["k8scluster-uuid"],
4692 k8s["kdu-instance"],
4693 k8s["k8scluster-type"],
4694 )
4695 await self._on_update_k8s_db(
4696 cluster_uuid=cluster_uuid,
4697 kdu_instance=kdu_instance,
4698 filter={"_id": nsr_id},
4699 vca_id=vca_id,
4700 cluster_type=cluster_type,
4701 )
4702 else:
4703 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4704 table, filter = "nsrs", {"_id": nsr_id}
4705 path = "_admin.deployed.VCA.{}.".format(vca_index)
4706 await self._on_update_n2vc_db(table, filter, path, {})
4707
4708 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4709 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4710
4711 async def action(self, nsr_id, nslcmop_id):
4712 # Try to lock HA task here
4713 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4714 if not task_is_locked_by_me:
4715 return
4716
4717 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4718 self.logger.debug(logging_text + "Enter")
4719 # get all needed from database
4720 db_nsr = None
4721 db_nslcmop = None
4722 db_nsr_update = {}
4723 db_nslcmop_update = {}
4724 nslcmop_operation_state = None
4725 error_description_nslcmop = None
4726 exc = None
4727 try:
4728 # wait for any previous tasks in process
4729 step = "Waiting for previous operations to terminate"
4730 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4731
4732 self._write_ns_status(
4733 nsr_id=nsr_id,
4734 ns_state=None,
4735 current_operation="RUNNING ACTION",
4736 current_operation_id=nslcmop_id,
4737 )
4738
4739 step = "Getting information from database"
4740 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4741 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4742 if db_nslcmop["operationParams"].get("primitive_params"):
4743 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4744 db_nslcmop["operationParams"]["primitive_params"]
4745 )
4746
4747 nsr_deployed = db_nsr["_admin"].get("deployed")
4748 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4749 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4750 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4751 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4752 primitive = db_nslcmop["operationParams"]["primitive"]
4753 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4754 timeout_ns_action = db_nslcmop["operationParams"].get(
4755 "timeout_ns_action", self.timeout_primitive
4756 )
4757
4758 if vnf_index:
4759 step = "Getting vnfr from database"
4760 db_vnfr = self.db.get_one(
4761 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4762 )
4763 if db_vnfr.get("kdur"):
4764 kdur_list = []
4765 for kdur in db_vnfr["kdur"]:
4766 if kdur.get("additionalParams"):
4767 kdur["additionalParams"] = json.loads(
4768 kdur["additionalParams"]
4769 )
4770 kdur_list.append(kdur)
4771 db_vnfr["kdur"] = kdur_list
4772 step = "Getting vnfd from database"
4773 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4774 else:
4775 step = "Getting nsd from database"
4776 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4777
4778 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4779 # for backward compatibility
4780 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4781 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4782 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4783 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4784
4785 # look for primitive
4786 config_primitive_desc = descriptor_configuration = None
4787 if vdu_id:
4788 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4789 elif kdu_name:
4790 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4791 elif vnf_index:
4792 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4793 else:
4794 descriptor_configuration = db_nsd.get("ns-configuration")
4795
4796 if descriptor_configuration and descriptor_configuration.get(
4797 "config-primitive"
4798 ):
4799 for config_primitive in descriptor_configuration["config-primitive"]:
4800 if config_primitive["name"] == primitive:
4801 config_primitive_desc = config_primitive
4802 break
4803
4804 if not config_primitive_desc:
4805 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4806 raise LcmException(
4807 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4808 primitive
4809 )
4810 )
4811 primitive_name = primitive
4812 ee_descriptor_id = None
4813 else:
4814 primitive_name = config_primitive_desc.get(
4815 "execution-environment-primitive", primitive
4816 )
4817 ee_descriptor_id = config_primitive_desc.get(
4818 "execution-environment-ref"
4819 )
4820
4821 if vnf_index:
4822 if vdu_id:
4823 vdur = next(
4824 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
4825 )
4826 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
4827 elif kdu_name:
4828 kdur = next(
4829 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
4830 )
4831 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
4832 else:
4833 desc_params = parse_yaml_strings(
4834 db_vnfr.get("additionalParamsForVnf")
4835 )
4836 else:
4837 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
4838 if kdu_name and get_configuration(db_vnfd, kdu_name):
4839 kdu_configuration = get_configuration(db_vnfd, kdu_name)
4840 actions = set()
4841 for primitive in kdu_configuration.get("initial-config-primitive", []):
4842 actions.add(primitive["name"])
4843 for primitive in kdu_configuration.get("config-primitive", []):
4844 actions.add(primitive["name"])
4845 kdu_action = True if primitive_name in actions else False
4846
4847 # TODO check if ns is in a proper status
4848 if kdu_name and (
4849 primitive_name in ("upgrade", "rollback", "status") or kdu_action
4850 ):
4851 # kdur and desc_params already set from before
4852 if primitive_params:
4853 desc_params.update(primitive_params)
4854 # TODO Check if we will need something at vnf level
4855 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
4856 if (
4857 kdu_name == kdu["kdu-name"]
4858 and kdu["member-vnf-index"] == vnf_index
4859 ):
4860 break
4861 else:
4862 raise LcmException(
4863 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
4864 )
4865
4866 if kdu.get("k8scluster-type") not in self.k8scluster_map:
4867 msg = "unknown k8scluster-type '{}'".format(
4868 kdu.get("k8scluster-type")
4869 )
4870 raise LcmException(msg)
4871
4872 db_dict = {
4873 "collection": "nsrs",
4874 "filter": {"_id": nsr_id},
4875 "path": "_admin.deployed.K8s.{}".format(index),
4876 }
4877 self.logger.debug(
4878 logging_text
4879 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
4880 )
4881 step = "Executing kdu {}".format(primitive_name)
4882 if primitive_name == "upgrade":
4883 if desc_params.get("kdu_model"):
4884 kdu_model = desc_params.get("kdu_model")
4885 del desc_params["kdu_model"]
4886 else:
4887 kdu_model = kdu.get("kdu-model")
4888 parts = kdu_model.split(sep=":")
4889 if len(parts) == 2:
4890 kdu_model = parts[0]
4891
4892 detailed_status = await asyncio.wait_for(
4893 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
4894 cluster_uuid=kdu.get("k8scluster-uuid"),
4895 kdu_instance=kdu.get("kdu-instance"),
4896 atomic=True,
4897 kdu_model=kdu_model,
4898 params=desc_params,
4899 db_dict=db_dict,
4900 timeout=timeout_ns_action,
4901 ),
4902 timeout=timeout_ns_action + 10,
4903 )
4904 self.logger.debug(
4905 logging_text + " Upgrade of kdu {} done".format(detailed_status)
4906 )
4907 elif primitive_name == "rollback":
4908 detailed_status = await asyncio.wait_for(
4909 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
4910 cluster_uuid=kdu.get("k8scluster-uuid"),
4911 kdu_instance=kdu.get("kdu-instance"),
4912 db_dict=db_dict,
4913 ),
4914 timeout=timeout_ns_action,
4915 )
4916 elif primitive_name == "status":
4917 detailed_status = await asyncio.wait_for(
4918 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
4919 cluster_uuid=kdu.get("k8scluster-uuid"),
4920 kdu_instance=kdu.get("kdu-instance"),
4921 vca_id=vca_id,
4922 ),
4923 timeout=timeout_ns_action,
4924 )
4925 else:
4926 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
4927 kdu["kdu-name"], nsr_id
4928 )
4929 params = self._map_primitive_params(
4930 config_primitive_desc, primitive_params, desc_params
4931 )
4932
4933 detailed_status = await asyncio.wait_for(
4934 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
4935 cluster_uuid=kdu.get("k8scluster-uuid"),
4936 kdu_instance=kdu_instance,
4937 primitive_name=primitive_name,
4938 params=params,
4939 db_dict=db_dict,
4940 timeout=timeout_ns_action,
4941 vca_id=vca_id,
4942 ),
4943 timeout=timeout_ns_action,
4944 )
4945
4946 if detailed_status:
4947 nslcmop_operation_state = "COMPLETED"
4948 else:
4949 detailed_status = ""
4950 nslcmop_operation_state = "FAILED"
4951 else:
4952 ee_id, vca_type = self._look_for_deployed_vca(
4953 nsr_deployed["VCA"],
4954 member_vnf_index=vnf_index,
4955 vdu_id=vdu_id,
4956 vdu_count_index=vdu_count_index,
4957 ee_descriptor_id=ee_descriptor_id,
4958 )
4959 for vca_index, vca_deployed in enumerate(
4960 db_nsr["_admin"]["deployed"]["VCA"]
4961 ):
4962 if vca_deployed.get("member-vnf-index") == vnf_index:
4963 db_dict = {
4964 "collection": "nsrs",
4965 "filter": {"_id": nsr_id},
4966 "path": "_admin.deployed.VCA.{}.".format(vca_index),
4967 }
4968 break
4969 (
4970 nslcmop_operation_state,
4971 detailed_status,
4972 ) = await self._ns_execute_primitive(
4973 ee_id,
4974 primitive=primitive_name,
4975 primitive_params=self._map_primitive_params(
4976 config_primitive_desc, primitive_params, desc_params
4977 ),
4978 timeout=timeout_ns_action,
4979 vca_type=vca_type,
4980 db_dict=db_dict,
4981 vca_id=vca_id,
4982 )
4983
4984 db_nslcmop_update["detailed-status"] = detailed_status
4985 error_description_nslcmop = (
4986 detailed_status if nslcmop_operation_state == "FAILED" else ""
4987 )
4988 self.logger.debug(
4989 logging_text
4990 + " task Done with result {} {}".format(
4991 nslcmop_operation_state, detailed_status
4992 )
4993 )
4994 return # database update is called inside finally
4995
4996 except (DbException, LcmException, N2VCException, K8sException) as e:
4997 self.logger.error(logging_text + "Exit Exception {}".format(e))
4998 exc = e
4999 except asyncio.CancelledError:
5000 self.logger.error(
5001 logging_text + "Cancelled Exception while '{}'".format(step)
5002 )
5003 exc = "Operation was cancelled"
5004 except asyncio.TimeoutError:
5005 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5006 exc = "Timeout"
5007 except Exception as e:
5008 exc = traceback.format_exc()
5009 self.logger.critical(
5010 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5011 exc_info=True,
5012 )
5013 finally:
5014 if exc:
5015 db_nslcmop_update[
5016 "detailed-status"
5017 ] = (
5018 detailed_status
5019 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5020 nslcmop_operation_state = "FAILED"
5021 if db_nsr:
5022 self._write_ns_status(
5023 nsr_id=nsr_id,
5024 ns_state=db_nsr[
5025 "nsState"
5026 ], # TODO check if degraded. For the moment use previous status
5027 current_operation="IDLE",
5028 current_operation_id=None,
5029 # error_description=error_description_nsr,
5030 # error_detail=error_detail,
5031 other_update=db_nsr_update,
5032 )
5033
5034 self._write_op_status(
5035 op_id=nslcmop_id,
5036 stage="",
5037 error_message=error_description_nslcmop,
5038 operation_state=nslcmop_operation_state,
5039 other_update=db_nslcmop_update,
5040 )
5041
5042 if nslcmop_operation_state:
5043 try:
5044 await self.msg.aiowrite(
5045 "ns",
5046 "actioned",
5047 {
5048 "nsr_id": nsr_id,
5049 "nslcmop_id": nslcmop_id,
5050 "operationState": nslcmop_operation_state,
5051 },
5052 loop=self.loop,
5053 )
5054 except Exception as e:
5055 self.logger.error(
5056 logging_text + "kafka_write notification Exception {}".format(e)
5057 )
5058 self.logger.debug(logging_text + "Exit")
5059 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5060 return nslcmop_operation_state, detailed_status
5061
5062 async def scale(self, nsr_id, nslcmop_id):
5063 # Try to lock HA task here
5064 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5065 if not task_is_locked_by_me:
5066 return
5067
5068 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5069 stage = ["", "", ""]
5070 tasks_dict_info = {}
5071 # ^ stage, step, VIM progress
5072 self.logger.debug(logging_text + "Enter")
5073 # get all needed from database
5074 db_nsr = None
5075 db_nslcmop_update = {}
5076 db_nsr_update = {}
5077 exc = None
5078 # in case of error, indicates what part of scale was failed to put nsr at error status
5079 scale_process = None
5080 old_operational_status = ""
5081 old_config_status = ""
5082 nsi_id = None
5083 try:
5084 # wait for any previous tasks in process
5085 step = "Waiting for previous operations to terminate"
5086 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5087 self._write_ns_status(
5088 nsr_id=nsr_id,
5089 ns_state=None,
5090 current_operation="SCALING",
5091 current_operation_id=nslcmop_id,
5092 )
5093
5094 step = "Getting nslcmop from database"
5095 self.logger.debug(
5096 step + " after having waited for previous tasks to be completed"
5097 )
5098 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5099
5100 step = "Getting nsr from database"
5101 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5102 old_operational_status = db_nsr["operational-status"]
5103 old_config_status = db_nsr["config-status"]
5104
5105 step = "Parsing scaling parameters"
5106 db_nsr_update["operational-status"] = "scaling"
5107 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5108 nsr_deployed = db_nsr["_admin"].get("deployed")
5109
5110 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5111 "scaleByStepData"
5112 ]["member-vnf-index"]
5113 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5114 "scaleByStepData"
5115 ]["scaling-group-descriptor"]
5116 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5117 # for backward compatibility
5118 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5119 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5120 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5121 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5122
5123 step = "Getting vnfr from database"
5124 db_vnfr = self.db.get_one(
5125 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5126 )
5127
5128 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5129
5130 step = "Getting vnfd from database"
5131 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5132
5133 base_folder = db_vnfd["_admin"]["storage"]
5134
5135 step = "Getting scaling-group-descriptor"
5136 scaling_descriptor = find_in_list(
5137 get_scaling_aspect(db_vnfd),
5138 lambda scale_desc: scale_desc["name"] == scaling_group,
5139 )
5140 if not scaling_descriptor:
5141 raise LcmException(
5142 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5143 "at vnfd:scaling-group-descriptor".format(scaling_group)
5144 )
5145
5146 step = "Sending scale order to VIM"
5147 # TODO check if ns is in a proper status
5148 nb_scale_op = 0
5149 if not db_nsr["_admin"].get("scaling-group"):
5150 self.update_db_2(
5151 "nsrs",
5152 nsr_id,
5153 {
5154 "_admin.scaling-group": [
5155 {"name": scaling_group, "nb-scale-op": 0}
5156 ]
5157 },
5158 )
5159 admin_scale_index = 0
5160 else:
5161 for admin_scale_index, admin_scale_info in enumerate(
5162 db_nsr["_admin"]["scaling-group"]
5163 ):
5164 if admin_scale_info["name"] == scaling_group:
5165 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5166 break
5167 else: # not found, set index one plus last element and add new entry with the name
5168 admin_scale_index += 1
5169 db_nsr_update[
5170 "_admin.scaling-group.{}.name".format(admin_scale_index)
5171 ] = scaling_group
5172
5173 vca_scaling_info = []
5174 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5175 if scaling_type == "SCALE_OUT":
5176 if "aspect-delta-details" not in scaling_descriptor:
5177 raise LcmException(
5178 "Aspect delta details not fount in scaling descriptor {}".format(
5179 scaling_descriptor["name"]
5180 )
5181 )
5182 # count if max-instance-count is reached
5183 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5184
5185 scaling_info["scaling_direction"] = "OUT"
5186 scaling_info["vdu-create"] = {}
5187 scaling_info["kdu-create"] = {}
5188 for delta in deltas:
5189 for vdu_delta in delta.get("vdu-delta", {}):
5190 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5191 # vdu_index also provides the number of instance of the targeted vdu
5192 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5193 cloud_init_text = self._get_vdu_cloud_init_content(
5194 vdud, db_vnfd
5195 )
5196 if cloud_init_text:
5197 additional_params = (
5198 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5199 or {}
5200 )
5201 cloud_init_list = []
5202
5203 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5204 max_instance_count = 10
5205 if vdu_profile and "max-number-of-instances" in vdu_profile:
5206 max_instance_count = vdu_profile.get(
5207 "max-number-of-instances", 10
5208 )
5209
5210 default_instance_num = get_number_of_instances(
5211 db_vnfd, vdud["id"]
5212 )
5213 instances_number = vdu_delta.get("number-of-instances", 1)
5214 nb_scale_op += instances_number
5215
5216 new_instance_count = nb_scale_op + default_instance_num
5217 # Control if new count is over max and vdu count is less than max.
5218 # Then assign new instance count
5219 if new_instance_count > max_instance_count > vdu_count:
5220 instances_number = new_instance_count - max_instance_count
5221 else:
5222 instances_number = instances_number
5223
5224 if new_instance_count > max_instance_count:
5225 raise LcmException(
5226 "reached the limit of {} (max-instance-count) "
5227 "scaling-out operations for the "
5228 "scaling-group-descriptor '{}'".format(
5229 nb_scale_op, scaling_group
5230 )
5231 )
5232 for x in range(vdu_delta.get("number-of-instances", 1)):
5233 if cloud_init_text:
5234 # TODO Information of its own ip is not available because db_vnfr is not updated.
5235 additional_params["OSM"] = get_osm_params(
5236 db_vnfr, vdu_delta["id"], vdu_index + x
5237 )
5238 cloud_init_list.append(
5239 self._parse_cloud_init(
5240 cloud_init_text,
5241 additional_params,
5242 db_vnfd["id"],
5243 vdud["id"],
5244 )
5245 )
5246 vca_scaling_info.append(
5247 {
5248 "osm_vdu_id": vdu_delta["id"],
5249 "member-vnf-index": vnf_index,
5250 "type": "create",
5251 "vdu_index": vdu_index + x,
5252 }
5253 )
5254 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5255 for kdu_delta in delta.get("kdu-resource-delta", {}):
5256 kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
5257 kdu_name = kdu_profile["kdu-name"]
5258 resource_name = kdu_profile["resource-name"]
5259
5260 # Might have different kdus in the same delta
5261 # Should have list for each kdu
5262 if not scaling_info["kdu-create"].get(kdu_name, None):
5263 scaling_info["kdu-create"][kdu_name] = []
5264
5265 kdur = get_kdur(db_vnfr, kdu_name)
5266 if kdur.get("helm-chart"):
5267 k8s_cluster_type = "helm-chart-v3"
5268 self.logger.debug("kdur: {}".format(kdur))
5269 if (
5270 kdur.get("helm-version")
5271 and kdur.get("helm-version") == "v2"
5272 ):
5273 k8s_cluster_type = "helm-chart"
5274 raise NotImplementedError
5275 elif kdur.get("juju-bundle"):
5276 k8s_cluster_type = "juju-bundle"
5277 else:
5278 raise LcmException(
5279 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5280 "juju-bundle. Maybe an old NBI version is running".format(
5281 db_vnfr["member-vnf-index-ref"], kdu_name
5282 )
5283 )
5284
5285 max_instance_count = 10
5286 if kdu_profile and "max-number-of-instances" in kdu_profile:
5287 max_instance_count = kdu_profile.get(
5288 "max-number-of-instances", 10
5289 )
5290
5291 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5292 deployed_kdu, _ = get_deployed_kdu(
5293 nsr_deployed, kdu_name, vnf_index
5294 )
5295 if deployed_kdu is None:
5296 raise LcmException(
5297 "KDU '{}' for vnf '{}' not deployed".format(
5298 kdu_name, vnf_index
5299 )
5300 )
5301 kdu_instance = deployed_kdu.get("kdu-instance")
5302 instance_num = await self.k8scluster_map[
5303 k8s_cluster_type
5304 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5305 kdu_replica_count = instance_num + kdu_delta.get(
5306 "number-of-instances", 1
5307 )
5308
5309 # Control if new count is over max and instance_num is less than max.
5310 # Then assign max instance number to kdu replica count
5311 if kdu_replica_count > max_instance_count > instance_num:
5312 kdu_replica_count = max_instance_count
5313 if kdu_replica_count > max_instance_count:
5314 raise LcmException(
5315 "reached the limit of {} (max-instance-count) "
5316 "scaling-out operations for the "
5317 "scaling-group-descriptor '{}'".format(
5318 instance_num, scaling_group
5319 )
5320 )
5321
5322 for x in range(kdu_delta.get("number-of-instances", 1)):
5323 vca_scaling_info.append(
5324 {
5325 "osm_kdu_id": kdu_name,
5326 "member-vnf-index": vnf_index,
5327 "type": "create",
5328 "kdu_index": instance_num + x - 1,
5329 }
5330 )
5331 scaling_info["kdu-create"][kdu_name].append(
5332 {
5333 "member-vnf-index": vnf_index,
5334 "type": "create",
5335 "k8s-cluster-type": k8s_cluster_type,
5336 "resource-name": resource_name,
5337 "scale": kdu_replica_count,
5338 }
5339 )
5340 elif scaling_type == "SCALE_IN":
5341 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5342
5343 scaling_info["scaling_direction"] = "IN"
5344 scaling_info["vdu-delete"] = {}
5345 scaling_info["kdu-delete"] = {}
5346
5347 for delta in deltas:
5348 for vdu_delta in delta.get("vdu-delta", {}):
5349 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5350 min_instance_count = 0
5351 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5352 if vdu_profile and "min-number-of-instances" in vdu_profile:
5353 min_instance_count = vdu_profile["min-number-of-instances"]
5354
5355 default_instance_num = get_number_of_instances(
5356 db_vnfd, vdu_delta["id"]
5357 )
5358 instance_num = vdu_delta.get("number-of-instances", 1)
5359 nb_scale_op -= instance_num
5360
5361 new_instance_count = nb_scale_op + default_instance_num
5362
5363 if new_instance_count < min_instance_count < vdu_count:
5364 instances_number = min_instance_count - new_instance_count
5365 else:
5366 instances_number = instance_num
5367
5368 if new_instance_count < min_instance_count:
5369 raise LcmException(
5370 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5371 "scaling-group-descriptor '{}'".format(
5372 nb_scale_op, scaling_group
5373 )
5374 )
5375 for x in range(vdu_delta.get("number-of-instances", 1)):
5376 vca_scaling_info.append(
5377 {
5378 "osm_vdu_id": vdu_delta["id"],
5379 "member-vnf-index": vnf_index,
5380 "type": "delete",
5381 "vdu_index": vdu_index - 1 - x,
5382 }
5383 )
5384 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5385 for kdu_delta in delta.get("kdu-resource-delta", {}):
5386 kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
5387 kdu_name = kdu_profile["kdu-name"]
5388 resource_name = kdu_profile["resource-name"]
5389
5390 if not scaling_info["kdu-delete"].get(kdu_name, None):
5391 scaling_info["kdu-delete"][kdu_name] = []
5392
5393 kdur = get_kdur(db_vnfr, kdu_name)
5394 if kdur.get("helm-chart"):
5395 k8s_cluster_type = "helm-chart-v3"
5396 self.logger.debug("kdur: {}".format(kdur))
5397 if (
5398 kdur.get("helm-version")
5399 and kdur.get("helm-version") == "v2"
5400 ):
5401 k8s_cluster_type = "helm-chart"
5402 raise NotImplementedError
5403 elif kdur.get("juju-bundle"):
5404 k8s_cluster_type = "juju-bundle"
5405 else:
5406 raise LcmException(
5407 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5408 "juju-bundle. Maybe an old NBI version is running".format(
5409 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5410 )
5411 )
5412
5413 min_instance_count = 0
5414 if kdu_profile and "min-number-of-instances" in kdu_profile:
5415 min_instance_count = kdu_profile["min-number-of-instances"]
5416
5417 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5418 deployed_kdu, _ = get_deployed_kdu(
5419 nsr_deployed, kdu_name, vnf_index
5420 )
5421 if deployed_kdu is None:
5422 raise LcmException(
5423 "KDU '{}' for vnf '{}' not deployed".format(
5424 kdu_name, vnf_index
5425 )
5426 )
5427 kdu_instance = deployed_kdu.get("kdu-instance")
5428 instance_num = await self.k8scluster_map[
5429 k8s_cluster_type
5430 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5431 kdu_replica_count = instance_num - kdu_delta.get(
5432 "number-of-instances", 1
5433 )
5434
5435 if kdu_replica_count < min_instance_count < instance_num:
5436 kdu_replica_count = min_instance_count
5437 if kdu_replica_count < min_instance_count:
5438 raise LcmException(
5439 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5440 "scaling-group-descriptor '{}'".format(
5441 instance_num, scaling_group
5442 )
5443 )
5444
5445 for x in range(kdu_delta.get("number-of-instances", 1)):
5446 vca_scaling_info.append(
5447 {
5448 "osm_kdu_id": kdu_name,
5449 "member-vnf-index": vnf_index,
5450 "type": "delete",
5451 "kdu_index": instance_num - x - 1,
5452 }
5453 )
5454 scaling_info["kdu-delete"][kdu_name].append(
5455 {
5456 "member-vnf-index": vnf_index,
5457 "type": "delete",
5458 "k8s-cluster-type": k8s_cluster_type,
5459 "resource-name": resource_name,
5460 "scale": kdu_replica_count,
5461 }
5462 )
5463
5464 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5465 vdu_delete = copy(scaling_info.get("vdu-delete"))
5466 if scaling_info["scaling_direction"] == "IN":
5467 for vdur in reversed(db_vnfr["vdur"]):
5468 if vdu_delete.get(vdur["vdu-id-ref"]):
5469 vdu_delete[vdur["vdu-id-ref"]] -= 1
5470 scaling_info["vdu"].append(
5471 {
5472 "name": vdur.get("name") or vdur.get("vdu-name"),
5473 "vdu_id": vdur["vdu-id-ref"],
5474 "interface": [],
5475 }
5476 )
5477 for interface in vdur["interfaces"]:
5478 scaling_info["vdu"][-1]["interface"].append(
5479 {
5480 "name": interface["name"],
5481 "ip_address": interface["ip-address"],
5482 "mac_address": interface.get("mac-address"),
5483 }
5484 )
5485 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5486
5487 # PRE-SCALE BEGIN
5488 step = "Executing pre-scale vnf-config-primitive"
5489 if scaling_descriptor.get("scaling-config-action"):
5490 for scaling_config_action in scaling_descriptor[
5491 "scaling-config-action"
5492 ]:
5493 if (
5494 scaling_config_action.get("trigger") == "pre-scale-in"
5495 and scaling_type == "SCALE_IN"
5496 ) or (
5497 scaling_config_action.get("trigger") == "pre-scale-out"
5498 and scaling_type == "SCALE_OUT"
5499 ):
5500 vnf_config_primitive = scaling_config_action[
5501 "vnf-config-primitive-name-ref"
5502 ]
5503 step = db_nslcmop_update[
5504 "detailed-status"
5505 ] = "executing pre-scale scaling-config-action '{}'".format(
5506 vnf_config_primitive
5507 )
5508
5509 # look for primitive
5510 for config_primitive in (
5511 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5512 ).get("config-primitive", ()):
5513 if config_primitive["name"] == vnf_config_primitive:
5514 break
5515 else:
5516 raise LcmException(
5517 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5518 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5519 "primitive".format(scaling_group, vnf_config_primitive)
5520 )
5521
5522 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5523 if db_vnfr.get("additionalParamsForVnf"):
5524 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5525
5526 scale_process = "VCA"
5527 db_nsr_update["config-status"] = "configuring pre-scaling"
5528 primitive_params = self._map_primitive_params(
5529 config_primitive, {}, vnfr_params
5530 )
5531
5532 # Pre-scale retry check: Check if this sub-operation has been executed before
5533 op_index = self._check_or_add_scale_suboperation(
5534 db_nslcmop,
5535 vnf_index,
5536 vnf_config_primitive,
5537 primitive_params,
5538 "PRE-SCALE",
5539 )
5540 if op_index == self.SUBOPERATION_STATUS_SKIP:
5541 # Skip sub-operation
5542 result = "COMPLETED"
5543 result_detail = "Done"
5544 self.logger.debug(
5545 logging_text
5546 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5547 vnf_config_primitive, result, result_detail
5548 )
5549 )
5550 else:
5551 if op_index == self.SUBOPERATION_STATUS_NEW:
5552 # New sub-operation: Get index of this sub-operation
5553 op_index = (
5554 len(db_nslcmop.get("_admin", {}).get("operations"))
5555 - 1
5556 )
5557 self.logger.debug(
5558 logging_text
5559 + "vnf_config_primitive={} New sub-operation".format(
5560 vnf_config_primitive
5561 )
5562 )
5563 else:
5564 # retry: Get registered params for this existing sub-operation
5565 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5566 op_index
5567 ]
5568 vnf_index = op.get("member_vnf_index")
5569 vnf_config_primitive = op.get("primitive")
5570 primitive_params = op.get("primitive_params")
5571 self.logger.debug(
5572 logging_text
5573 + "vnf_config_primitive={} Sub-operation retry".format(
5574 vnf_config_primitive
5575 )
5576 )
5577 # Execute the primitive, either with new (first-time) or registered (reintent) args
5578 ee_descriptor_id = config_primitive.get(
5579 "execution-environment-ref"
5580 )
5581 primitive_name = config_primitive.get(
5582 "execution-environment-primitive", vnf_config_primitive
5583 )
5584 ee_id, vca_type = self._look_for_deployed_vca(
5585 nsr_deployed["VCA"],
5586 member_vnf_index=vnf_index,
5587 vdu_id=None,
5588 vdu_count_index=None,
5589 ee_descriptor_id=ee_descriptor_id,
5590 )
5591 result, result_detail = await self._ns_execute_primitive(
5592 ee_id,
5593 primitive_name,
5594 primitive_params,
5595 vca_type=vca_type,
5596 vca_id=vca_id,
5597 )
5598 self.logger.debug(
5599 logging_text
5600 + "vnf_config_primitive={} Done with result {} {}".format(
5601 vnf_config_primitive, result, result_detail
5602 )
5603 )
5604 # Update operationState = COMPLETED | FAILED
5605 self._update_suboperation_status(
5606 db_nslcmop, op_index, result, result_detail
5607 )
5608
5609 if result == "FAILED":
5610 raise LcmException(result_detail)
5611 db_nsr_update["config-status"] = old_config_status
5612 scale_process = None
5613 # PRE-SCALE END
5614
5615 db_nsr_update[
5616 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5617 ] = nb_scale_op
5618 db_nsr_update[
5619 "_admin.scaling-group.{}.time".format(admin_scale_index)
5620 ] = time()
5621
5622 # SCALE-IN VCA - BEGIN
5623 if vca_scaling_info:
5624 step = db_nslcmop_update[
5625 "detailed-status"
5626 ] = "Deleting the execution environments"
5627 scale_process = "VCA"
5628 for vca_info in vca_scaling_info:
5629 if vca_info["type"] == "delete":
5630 member_vnf_index = str(vca_info["member-vnf-index"])
5631 self.logger.debug(
5632 logging_text + "vdu info: {}".format(vca_info)
5633 )
5634 if vca_info.get("osm_vdu_id"):
5635 vdu_id = vca_info["osm_vdu_id"]
5636 vdu_index = int(vca_info["vdu_index"])
5637 stage[
5638 1
5639 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5640 member_vnf_index, vdu_id, vdu_index
5641 )
5642 else:
5643 vdu_index = 0
5644 kdu_id = vca_info["osm_kdu_id"]
5645 stage[
5646 1
5647 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5648 member_vnf_index, kdu_id, vdu_index
5649 )
5650 stage[2] = step = "Scaling in VCA"
5651 self._write_op_status(op_id=nslcmop_id, stage=stage)
5652 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5653 config_update = db_nsr["configurationStatus"]
5654 for vca_index, vca in enumerate(vca_update):
5655 if (
5656 (vca or vca.get("ee_id"))
5657 and vca["member-vnf-index"] == member_vnf_index
5658 and vca["vdu_count_index"] == vdu_index
5659 ):
5660 if vca.get("vdu_id"):
5661 config_descriptor = get_configuration(
5662 db_vnfd, vca.get("vdu_id")
5663 )
5664 elif vca.get("kdu_name"):
5665 config_descriptor = get_configuration(
5666 db_vnfd, vca.get("kdu_name")
5667 )
5668 else:
5669 config_descriptor = get_configuration(
5670 db_vnfd, db_vnfd["id"]
5671 )
5672 operation_params = (
5673 db_nslcmop.get("operationParams") or {}
5674 )
5675 exec_terminate_primitives = not operation_params.get(
5676 "skip_terminate_primitives"
5677 ) and vca.get("needed_terminate")
5678 task = asyncio.ensure_future(
5679 asyncio.wait_for(
5680 self.destroy_N2VC(
5681 logging_text,
5682 db_nslcmop,
5683 vca,
5684 config_descriptor,
5685 vca_index,
5686 destroy_ee=True,
5687 exec_primitives=exec_terminate_primitives,
5688 scaling_in=True,
5689 vca_id=vca_id,
5690 ),
5691 timeout=self.timeout_charm_delete,
5692 )
5693 )
5694 tasks_dict_info[task] = "Terminating VCA {}".format(
5695 vca.get("ee_id")
5696 )
5697 del vca_update[vca_index]
5698 del config_update[vca_index]
5699 # wait for pending tasks of terminate primitives
5700 if tasks_dict_info:
5701 self.logger.debug(
5702 logging_text
5703 + "Waiting for tasks {}".format(
5704 list(tasks_dict_info.keys())
5705 )
5706 )
5707 error_list = await self._wait_for_tasks(
5708 logging_text,
5709 tasks_dict_info,
5710 min(
5711 self.timeout_charm_delete, self.timeout_ns_terminate
5712 ),
5713 stage,
5714 nslcmop_id,
5715 )
5716 tasks_dict_info.clear()
5717 if error_list:
5718 raise LcmException("; ".join(error_list))
5719
5720 db_vca_and_config_update = {
5721 "_admin.deployed.VCA": vca_update,
5722 "configurationStatus": config_update,
5723 }
5724 self.update_db_2(
5725 "nsrs", db_nsr["_id"], db_vca_and_config_update
5726 )
5727 scale_process = None
5728 # SCALE-IN VCA - END
5729
5730 # SCALE RO - BEGIN
5731 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5732 scale_process = "RO"
5733 if self.ro_config.get("ng"):
5734 await self._scale_ng_ro(
5735 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5736 )
5737 scaling_info.pop("vdu-create", None)
5738 scaling_info.pop("vdu-delete", None)
5739
5740 scale_process = None
5741 # SCALE RO - END
5742
5743 # SCALE KDU - BEGIN
5744 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5745 scale_process = "KDU"
5746 await self._scale_kdu(
5747 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5748 )
5749 scaling_info.pop("kdu-create", None)
5750 scaling_info.pop("kdu-delete", None)
5751
5752 scale_process = None
5753 # SCALE KDU - END
5754
5755 if db_nsr_update:
5756 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5757
5758 # SCALE-UP VCA - BEGIN
5759 if vca_scaling_info:
5760 step = db_nslcmop_update[
5761 "detailed-status"
5762 ] = "Creating new execution environments"
5763 scale_process = "VCA"
5764 for vca_info in vca_scaling_info:
5765 if vca_info["type"] == "create":
5766 member_vnf_index = str(vca_info["member-vnf-index"])
5767 self.logger.debug(
5768 logging_text + "vdu info: {}".format(vca_info)
5769 )
5770 vnfd_id = db_vnfr["vnfd-ref"]
5771 if vca_info.get("osm_vdu_id"):
5772 vdu_index = int(vca_info["vdu_index"])
5773 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5774 if db_vnfr.get("additionalParamsForVnf"):
5775 deploy_params.update(
5776 parse_yaml_strings(
5777 db_vnfr["additionalParamsForVnf"].copy()
5778 )
5779 )
5780 descriptor_config = get_configuration(
5781 db_vnfd, db_vnfd["id"]
5782 )
5783 if descriptor_config:
5784 vdu_id = None
5785 vdu_name = None
5786 kdu_name = None
5787 self._deploy_n2vc(
5788 logging_text=logging_text
5789 + "member_vnf_index={} ".format(member_vnf_index),
5790 db_nsr=db_nsr,
5791 db_vnfr=db_vnfr,
5792 nslcmop_id=nslcmop_id,
5793 nsr_id=nsr_id,
5794 nsi_id=nsi_id,
5795 vnfd_id=vnfd_id,
5796 vdu_id=vdu_id,
5797 kdu_name=kdu_name,
5798 member_vnf_index=member_vnf_index,
5799 vdu_index=vdu_index,
5800 vdu_name=vdu_name,
5801 deploy_params=deploy_params,
5802 descriptor_config=descriptor_config,
5803 base_folder=base_folder,
5804 task_instantiation_info=tasks_dict_info,
5805 stage=stage,
5806 )
5807 vdu_id = vca_info["osm_vdu_id"]
5808 vdur = find_in_list(
5809 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5810 )
5811 descriptor_config = get_configuration(db_vnfd, vdu_id)
5812 if vdur.get("additionalParams"):
5813 deploy_params_vdu = parse_yaml_strings(
5814 vdur["additionalParams"]
5815 )
5816 else:
5817 deploy_params_vdu = deploy_params
5818 deploy_params_vdu["OSM"] = get_osm_params(
5819 db_vnfr, vdu_id, vdu_count_index=vdu_index
5820 )
5821 if descriptor_config:
5822 vdu_name = None
5823 kdu_name = None
5824 stage[
5825 1
5826 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5827 member_vnf_index, vdu_id, vdu_index
5828 )
5829 stage[2] = step = "Scaling out VCA"
5830 self._write_op_status(op_id=nslcmop_id, stage=stage)
5831 self._deploy_n2vc(
5832 logging_text=logging_text
5833 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5834 member_vnf_index, vdu_id, vdu_index
5835 ),
5836 db_nsr=db_nsr,
5837 db_vnfr=db_vnfr,
5838 nslcmop_id=nslcmop_id,
5839 nsr_id=nsr_id,
5840 nsi_id=nsi_id,
5841 vnfd_id=vnfd_id,
5842 vdu_id=vdu_id,
5843 kdu_name=kdu_name,
5844 member_vnf_index=member_vnf_index,
5845 vdu_index=vdu_index,
5846 vdu_name=vdu_name,
5847 deploy_params=deploy_params_vdu,
5848 descriptor_config=descriptor_config,
5849 base_folder=base_folder,
5850 task_instantiation_info=tasks_dict_info,
5851 stage=stage,
5852 )
5853 else:
5854 kdu_name = vca_info["osm_kdu_id"]
5855 descriptor_config = get_configuration(db_vnfd, kdu_name)
5856 if descriptor_config:
5857 vdu_id = None
5858 kdu_index = int(vca_info["kdu_index"])
5859 vdu_name = None
5860 kdur = next(
5861 x
5862 for x in db_vnfr["kdur"]
5863 if x["kdu-name"] == kdu_name
5864 )
5865 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
5866 if kdur.get("additionalParams"):
5867 deploy_params_kdu = parse_yaml_strings(
5868 kdur["additionalParams"]
5869 )
5870
5871 self._deploy_n2vc(
5872 logging_text=logging_text,
5873 db_nsr=db_nsr,
5874 db_vnfr=db_vnfr,
5875 nslcmop_id=nslcmop_id,
5876 nsr_id=nsr_id,
5877 nsi_id=nsi_id,
5878 vnfd_id=vnfd_id,
5879 vdu_id=vdu_id,
5880 kdu_name=kdu_name,
5881 member_vnf_index=member_vnf_index,
5882 vdu_index=kdu_index,
5883 vdu_name=vdu_name,
5884 deploy_params=deploy_params_kdu,
5885 descriptor_config=descriptor_config,
5886 base_folder=base_folder,
5887 task_instantiation_info=tasks_dict_info,
5888 stage=stage,
5889 )
5890 # SCALE-UP VCA - END
5891 scale_process = None
5892
5893 # POST-SCALE BEGIN
5894 # execute primitive service POST-SCALING
5895 step = "Executing post-scale vnf-config-primitive"
5896 if scaling_descriptor.get("scaling-config-action"):
5897 for scaling_config_action in scaling_descriptor[
5898 "scaling-config-action"
5899 ]:
5900 if (
5901 scaling_config_action.get("trigger") == "post-scale-in"
5902 and scaling_type == "SCALE_IN"
5903 ) or (
5904 scaling_config_action.get("trigger") == "post-scale-out"
5905 and scaling_type == "SCALE_OUT"
5906 ):
5907 vnf_config_primitive = scaling_config_action[
5908 "vnf-config-primitive-name-ref"
5909 ]
5910 step = db_nslcmop_update[
5911 "detailed-status"
5912 ] = "executing post-scale scaling-config-action '{}'".format(
5913 vnf_config_primitive
5914 )
5915
5916 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5917 if db_vnfr.get("additionalParamsForVnf"):
5918 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5919
5920 # look for primitive
5921 for config_primitive in (
5922 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5923 ).get("config-primitive", ()):
5924 if config_primitive["name"] == vnf_config_primitive:
5925 break
5926 else:
5927 raise LcmException(
5928 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5929 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5930 "config-primitive".format(
5931 scaling_group, vnf_config_primitive
5932 )
5933 )
5934 scale_process = "VCA"
5935 db_nsr_update["config-status"] = "configuring post-scaling"
5936 primitive_params = self._map_primitive_params(
5937 config_primitive, {}, vnfr_params
5938 )
5939
5940 # Post-scale retry check: Check if this sub-operation has been executed before
5941 op_index = self._check_or_add_scale_suboperation(
5942 db_nslcmop,
5943 vnf_index,
5944 vnf_config_primitive,
5945 primitive_params,
5946 "POST-SCALE",
5947 )
5948 if op_index == self.SUBOPERATION_STATUS_SKIP:
5949 # Skip sub-operation
5950 result = "COMPLETED"
5951 result_detail = "Done"
5952 self.logger.debug(
5953 logging_text
5954 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5955 vnf_config_primitive, result, result_detail
5956 )
5957 )
5958 else:
5959 if op_index == self.SUBOPERATION_STATUS_NEW:
5960 # New sub-operation: Get index of this sub-operation
5961 op_index = (
5962 len(db_nslcmop.get("_admin", {}).get("operations"))
5963 - 1
5964 )
5965 self.logger.debug(
5966 logging_text
5967 + "vnf_config_primitive={} New sub-operation".format(
5968 vnf_config_primitive
5969 )
5970 )
5971 else:
5972 # retry: Get registered params for this existing sub-operation
5973 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5974 op_index
5975 ]
5976 vnf_index = op.get("member_vnf_index")
5977 vnf_config_primitive = op.get("primitive")
5978 primitive_params = op.get("primitive_params")
5979 self.logger.debug(
5980 logging_text
5981 + "vnf_config_primitive={} Sub-operation retry".format(
5982 vnf_config_primitive
5983 )
5984 )
5985 # Execute the primitive, either with new (first-time) or registered (reintent) args
5986 ee_descriptor_id = config_primitive.get(
5987 "execution-environment-ref"
5988 )
5989 primitive_name = config_primitive.get(
5990 "execution-environment-primitive", vnf_config_primitive
5991 )
5992 ee_id, vca_type = self._look_for_deployed_vca(
5993 nsr_deployed["VCA"],
5994 member_vnf_index=vnf_index,
5995 vdu_id=None,
5996 vdu_count_index=None,
5997 ee_descriptor_id=ee_descriptor_id,
5998 )
5999 result, result_detail = await self._ns_execute_primitive(
6000 ee_id,
6001 primitive_name,
6002 primitive_params,
6003 vca_type=vca_type,
6004 vca_id=vca_id,
6005 )
6006 self.logger.debug(
6007 logging_text
6008 + "vnf_config_primitive={} Done with result {} {}".format(
6009 vnf_config_primitive, result, result_detail
6010 )
6011 )
6012 # Update operationState = COMPLETED | FAILED
6013 self._update_suboperation_status(
6014 db_nslcmop, op_index, result, result_detail
6015 )
6016
6017 if result == "FAILED":
6018 raise LcmException(result_detail)
6019 db_nsr_update["config-status"] = old_config_status
6020 scale_process = None
6021 # POST-SCALE END
6022
6023 db_nsr_update[
6024 "detailed-status"
6025 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6026 db_nsr_update["operational-status"] = (
6027 "running"
6028 if old_operational_status == "failed"
6029 else old_operational_status
6030 )
6031 db_nsr_update["config-status"] = old_config_status
6032 return
6033 except (
6034 ROclient.ROClientException,
6035 DbException,
6036 LcmException,
6037 NgRoException,
6038 ) as e:
6039 self.logger.error(logging_text + "Exit Exception {}".format(e))
6040 exc = e
6041 except asyncio.CancelledError:
6042 self.logger.error(
6043 logging_text + "Cancelled Exception while '{}'".format(step)
6044 )
6045 exc = "Operation was cancelled"
6046 except Exception as e:
6047 exc = traceback.format_exc()
6048 self.logger.critical(
6049 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6050 exc_info=True,
6051 )
6052 finally:
6053 self._write_ns_status(
6054 nsr_id=nsr_id,
6055 ns_state=None,
6056 current_operation="IDLE",
6057 current_operation_id=None,
6058 )
6059 if tasks_dict_info:
6060 stage[1] = "Waiting for instantiate pending tasks."
6061 self.logger.debug(logging_text + stage[1])
6062 exc = await self._wait_for_tasks(
6063 logging_text,
6064 tasks_dict_info,
6065 self.timeout_ns_deploy,
6066 stage,
6067 nslcmop_id,
6068 nsr_id=nsr_id,
6069 )
6070 if exc:
6071 db_nslcmop_update[
6072 "detailed-status"
6073 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6074 nslcmop_operation_state = "FAILED"
6075 if db_nsr:
6076 db_nsr_update["operational-status"] = old_operational_status
6077 db_nsr_update["config-status"] = old_config_status
6078 db_nsr_update["detailed-status"] = ""
6079 if scale_process:
6080 if "VCA" in scale_process:
6081 db_nsr_update["config-status"] = "failed"
6082 if "RO" in scale_process:
6083 db_nsr_update["operational-status"] = "failed"
6084 db_nsr_update[
6085 "detailed-status"
6086 ] = "FAILED scaling nslcmop={} {}: {}".format(
6087 nslcmop_id, step, exc
6088 )
6089 else:
6090 error_description_nslcmop = None
6091 nslcmop_operation_state = "COMPLETED"
6092 db_nslcmop_update["detailed-status"] = "Done"
6093
6094 self._write_op_status(
6095 op_id=nslcmop_id,
6096 stage="",
6097 error_message=error_description_nslcmop,
6098 operation_state=nslcmop_operation_state,
6099 other_update=db_nslcmop_update,
6100 )
6101 if db_nsr:
6102 self._write_ns_status(
6103 nsr_id=nsr_id,
6104 ns_state=None,
6105 current_operation="IDLE",
6106 current_operation_id=None,
6107 other_update=db_nsr_update,
6108 )
6109
6110 if nslcmop_operation_state:
6111 try:
6112 msg = {
6113 "nsr_id": nsr_id,
6114 "nslcmop_id": nslcmop_id,
6115 "operationState": nslcmop_operation_state,
6116 }
6117 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6118 except Exception as e:
6119 self.logger.error(
6120 logging_text + "kafka_write notification Exception {}".format(e)
6121 )
6122 self.logger.debug(logging_text + "Exit")
6123 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6124
6125 async def _scale_kdu(
6126 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6127 ):
6128 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
6129 for kdu_name in _scaling_info:
6130 for kdu_scaling_info in _scaling_info[kdu_name]:
6131 deployed_kdu, index = get_deployed_kdu(
6132 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6133 )
6134 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6135 kdu_instance = deployed_kdu["kdu-instance"]
6136 scale = int(kdu_scaling_info["scale"])
6137 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6138
6139 db_dict = {
6140 "collection": "nsrs",
6141 "filter": {"_id": nsr_id},
6142 "path": "_admin.deployed.K8s.{}".format(index),
6143 }
6144
6145 step = "scaling application {}".format(
6146 kdu_scaling_info["resource-name"]
6147 )
6148 self.logger.debug(logging_text + step)
6149
6150 if kdu_scaling_info["type"] == "delete":
6151 kdu_config = get_configuration(db_vnfd, kdu_name)
6152 if (
6153 kdu_config
6154 and kdu_config.get("terminate-config-primitive")
6155 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6156 ):
6157 terminate_config_primitive_list = kdu_config.get(
6158 "terminate-config-primitive"
6159 )
6160 terminate_config_primitive_list.sort(
6161 key=lambda val: int(val["seq"])
6162 )
6163
6164 for (
6165 terminate_config_primitive
6166 ) in terminate_config_primitive_list:
6167 primitive_params_ = self._map_primitive_params(
6168 terminate_config_primitive, {}, {}
6169 )
6170 step = "execute terminate config primitive"
6171 self.logger.debug(logging_text + step)
6172 await asyncio.wait_for(
6173 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6174 cluster_uuid=cluster_uuid,
6175 kdu_instance=kdu_instance,
6176 primitive_name=terminate_config_primitive["name"],
6177 params=primitive_params_,
6178 db_dict=db_dict,
6179 vca_id=vca_id,
6180 ),
6181 timeout=600,
6182 )
6183
6184 await asyncio.wait_for(
6185 self.k8scluster_map[k8s_cluster_type].scale(
6186 kdu_instance,
6187 scale,
6188 kdu_scaling_info["resource-name"],
6189 vca_id=vca_id,
6190 ),
6191 timeout=self.timeout_vca_on_error,
6192 )
6193
6194 if kdu_scaling_info["type"] == "create":
6195 kdu_config = get_configuration(db_vnfd, kdu_name)
6196 if (
6197 kdu_config
6198 and kdu_config.get("initial-config-primitive")
6199 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6200 ):
6201 initial_config_primitive_list = kdu_config.get(
6202 "initial-config-primitive"
6203 )
6204 initial_config_primitive_list.sort(
6205 key=lambda val: int(val["seq"])
6206 )
6207
6208 for initial_config_primitive in initial_config_primitive_list:
6209 primitive_params_ = self._map_primitive_params(
6210 initial_config_primitive, {}, {}
6211 )
6212 step = "execute initial config primitive"
6213 self.logger.debug(logging_text + step)
6214 await asyncio.wait_for(
6215 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6216 cluster_uuid=cluster_uuid,
6217 kdu_instance=kdu_instance,
6218 primitive_name=initial_config_primitive["name"],
6219 params=primitive_params_,
6220 db_dict=db_dict,
6221 vca_id=vca_id,
6222 ),
6223 timeout=600,
6224 )
6225
6226 async def _scale_ng_ro(
6227 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6228 ):
6229 nsr_id = db_nslcmop["nsInstanceId"]
6230 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6231 db_vnfrs = {}
6232
6233 # read from db: vnfd's for every vnf
6234 db_vnfds = []
6235
6236 # for each vnf in ns, read vnfd
6237 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6238 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6239 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6240 # if we haven't this vnfd, read it from db
6241 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6242 # read from db
6243 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6244 db_vnfds.append(vnfd)
6245 n2vc_key = self.n2vc.get_public_key()
6246 n2vc_key_list = [n2vc_key]
6247 self.scale_vnfr(
6248 db_vnfr,
6249 vdu_scaling_info.get("vdu-create"),
6250 vdu_scaling_info.get("vdu-delete"),
6251 mark_delete=True,
6252 )
6253 # db_vnfr has been updated, update db_vnfrs to use it
6254 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6255 await self._instantiate_ng_ro(
6256 logging_text,
6257 nsr_id,
6258 db_nsd,
6259 db_nsr,
6260 db_nslcmop,
6261 db_vnfrs,
6262 db_vnfds,
6263 n2vc_key_list,
6264 stage=stage,
6265 start_deploy=time(),
6266 timeout_ns_deploy=self.timeout_ns_deploy,
6267 )
6268 if vdu_scaling_info.get("vdu-delete"):
6269 self.scale_vnfr(
6270 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6271 )
6272
6273 async def add_prometheus_metrics(
6274 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6275 ):
6276 if not self.prometheus:
6277 return
6278 # look if exist a file called 'prometheus*.j2' and
6279 artifact_content = self.fs.dir_ls(artifact_path)
6280 job_file = next(
6281 (
6282 f
6283 for f in artifact_content
6284 if f.startswith("prometheus") and f.endswith(".j2")
6285 ),
6286 None,
6287 )
6288 if not job_file:
6289 return
6290 with self.fs.file_open((artifact_path, job_file), "r") as f:
6291 job_data = f.read()
6292
6293 # TODO get_service
6294 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6295 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6296 host_port = "80"
6297 vnfr_id = vnfr_id.replace("-", "")
6298 variables = {
6299 "JOB_NAME": vnfr_id,
6300 "TARGET_IP": target_ip,
6301 "EXPORTER_POD_IP": host_name,
6302 "EXPORTER_POD_PORT": host_port,
6303 }
6304 job_list = self.prometheus.parse_job(job_data, variables)
6305 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6306 for job in job_list:
6307 if (
6308 not isinstance(job.get("job_name"), str)
6309 or vnfr_id not in job["job_name"]
6310 ):
6311 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6312 job["nsr_id"] = nsr_id
6313 job_dict = {jl["job_name"]: jl for jl in job_list}
6314 if await self.prometheus.update(job_dict):
6315 return list(job_dict.keys())
6316
6317 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6318 """
6319 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6320
6321 :param: vim_account_id: VIM Account ID
6322
6323 :return: (cloud_name, cloud_credential)
6324 """
6325 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6326 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6327
6328 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6329 """
6330 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6331
6332 :param: vim_account_id: VIM Account ID
6333
6334 :return: (cloud_name, cloud_credential)
6335 """
6336 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6337 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")